mulitporcessing共享变量
# 字典使用时深层修改失效
如果直接更新深层的字典,会不起作用:
manager = Manager()
pdict = manager.dict()
pdict['a'] = {'ss': 'init val'}
print('before: ', pdict)
pdict['a']['ss'] = 'something new'
print('after: ', pdict) #前后print出来都是一样的,修改未生效
2
3
4
5
6
7
可使用下面的上下文管理器:
class ProcessDictUpdate(object):
'''usage:
manager = Manager()
pdict = manager.dict()
pdict['a'] = {'ss': 'init val'}
print('before: ', pdict)
with ProcessDictUpdate(pdict) as newd:
newd['a']['ss'] = 'something new'
print('after: ', pdict) #修改生效
'''
def __init__(self, process_dict, lock:multiprocessing.Lock=None):
if lock:
lock.acquire()
self.lock = lock
self.process_dict = process_dict
self.cp = copy.deepcopy(self.process_dict)
def __enter__(self):
return self.cp
def __exit__(self, exc_type, exc_value, exc_traceback_obj):
for key in self.cp:
self.process_dict[key] = self.cp[key]
if self.lock:
self.lock.release()
return True #程序继续执行
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 加锁
https://www.jb51.net/article/171299.htm (opens new window)
python多进程和多线程是大家会重点了解的部分,因为很多工作如果并没有前后相互依赖关系的话其实顺序并不是非常的重要,采用顺序执行的话就必定会造成无谓的等待,任凭cpu和内存白白浪费,这是我们不想看到的。
为了解决这个问题,我们就可以采用多线程或者多进程的方式,(多线程我们之后再讲),而这两者之间是有本质区别的。就内存而言,已知进程是在执行过程中有独立的内存单元的,而多个线程是共享内存的,这是多进程和多线程的一大区别。
利用Value在不同进程中同步变量
在多进程中,由于进程之间内存相互是隔离的,所以无法在多个进程中用直接读取的方式共享变量,这时候就可以用multiprocessing库中的 Value在各自隔离的进程中共享变量。
下面是一个多进程的例子:
假设有一个counter用来记录程序经过的总循环次数,每调用一次count函数之后counter就会增加20,在主程序中用循环开10个进程分别调用count函数,那么理想状态下,在十个进程中共享的counter值到程序结束后应该是200。
from multiprocessing import Process, Value
import time
def count(v):
for i in range(20):
time.sleep(0.01)
v.value += 1
def main():
value = Value('i',0)
processes = [Process(target=count, args=(value,)) for i in range(10)]
for p in processes:
p.start()
for p in processes:
p.join()
print(value.value)
if __name__ == '__main__':
for i in range(10):
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
运行这个例子,会得到怎样的结果呢?
188 180 168 186 183 179 186 181 166 186
我在主程序里运行了十次这个程序,而最后的结果是160-180之间,总之,没有一次到200。这是什么原因呢?
相信很多人都已经明白了问题所在,那就是因为在multiprocessing库中的Value是细粒度的,Value中有一个ctypes类型的对象,拥有一个value属性来表征内存中实际的对象。Value可以保证同时只有一个单独的线程或进程在读或者写value值。这么看起来没有什么问题。
然而在第一个进程加载value值的时候,程序却不能阻止第二个进程加载旧的值。两个进程都会把value拷贝到自己的私有内存然后进行处理,并写回到共享值里。
那么这么会出现什么问题呢?
最后的共享值只接收到了一次值的增加,而非两次。
Python客栈送红包、纸质书
利用Lock在不同进程共享变量时加锁 上面的问题其实可以用一个非常简单的方法解决,我们只需要调用multiprocessing库中的Lock (锁)就可以保证一次只能有一个进程访问这个共享变量。修改后的代码如下:
from multiprocessing import Process, Value, Lock
from time import sleep
def count(x,lock):
for i in range(20):
sleep(0.01)
with lock:
x.value += 1
def main():
counter = Value('i',0)
lock = Lock()
processes = [Process(target=count,args=(counter,lock)) for i in range(10)]
for p in processes:
p.start()
for p in processes:
p.join()
print(counter.value)
if __name__ == '__main__':
for i in range(10):
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
这样一来,输出的结果就会恒定为200了。
一些补充
- 调用get_lock() 函数 其实Value这个包里已经包含了锁的概念,如果调用get_lock() 函数就可以自动给共享变量加锁。这样其实是比较推荐的方式,因为这样就不需要同时调用两个包。修改如下:
from multiprocessing import Process, Value
from time import sleep
def count(x):
for i in range(20):
global counter # 声明全局变量
sleep(0.01)
with counter.get_lock(): # 直接调用get_lock()函数获取锁
x.value += 1
def main():
processes = [Process(target=count, args=(counter,)) for i in range(10)]
for p in processes:
p.start()
for p in processes:
p.join()
print(counter.value)
if __name__ == '__main__':
counter = Value('i', 0) # 需要把全局变量移到主程序
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
上面的程序更加明确,且最终结果也是200。
- 使用 multiprocessing.RawValue 整个multiprocessing包里刚刚调用的Value和Lock还可以统一被 multiprocessing.RawValue取代。