Colderleo's blog Colderleo's blog
首页
Linux
C++
Python
前端
工具软件
mysql
索引
关于
GitHub (opens new window)

Colder Leo

热爱代码的打工人
首页
Linux
C++
Python
前端
工具软件
mysql
索引
关于
GitHub (opens new window)
  • 1-django使用
  • centos部署Nginx+uwsgi+django - blog
  • django-admin完全自定义某个模块的页面
  • easy_select2 编辑页面添加外键选择搜索
  • 使用vscode开发python
  • python自定义异常处理with,上下文context管理器
  • python读写excel,xlsx 模块选择
  • python多线程多进程协程
  • TemporaryFile
  • threading用法
  • ubantu设置python优先级
  • conda anacodna 命令行用法
  • 我写的常用工具函数
  • 0-python常用函数
  • smtp发送邮件
  • pandas用法
  • datetime类,时间差
  • format
  • enumerate遍历数组时获取index
  • argv
  • generator 生成器
  • GIL锁、多线程
  • linux用源文件安装python3
  • list sort排序
  • logging日志
  • mulitporcessing共享变量
    • OrderedDict
    • path
    • pip用法
    • pymysql用法 - python连接mysql
    • python bash解释器,脚本前两行,
    • python docstring格式 PEP 257
    • python logging获取logger信息
    • python交互式窗口如何进行多行输入
    • virtualenv用法
    • 标准差
    • 单例模式
    • 函数中定义static变量
    • 切片
    • 去掉字符串中emoji字符
    • 去掉字符串中的空行
    • 全局变量、global和nonlocal
    • 文字识别pytesseract
    • 析构函数和del
    • 用python制作游戏外挂
    • 正则表达式,函数替换字符串
    • 装饰器
    • pycharm中运行pyqt时不报错
    • python 写文件
    • Python
    gaoliu
    2021-10-06
    目录

    mulitporcessing共享变量

    # 字典使用时深层修改失效

    参考:https://stackoverflow.com/questions/59752003/multiprocessing-manager-dict-update-failed (opens new window)

    如果直接更新深层的字典,会不起作用:

    manager = Manager()
    pdict = manager.dict()
    pdict['a'] = {'ss': 'init val'}
    
    print('before: ', pdict)
    pdict['a']['ss'] = 'something new'
    print('after: ', pdict) #前后print出来都是一样的,修改未生效
    
    1
    2
    3
    4
    5
    6
    7

    可使用下面的上下文管理器:

    class ProcessDictUpdate(object):
        '''usage:
            manager = Manager()
            pdict = manager.dict()
            pdict['a'] = {'ss': 'init val'}
            
            print('before: ', pdict)
            with ProcessDictUpdate(pdict) as newd:
                newd['a']['ss'] = 'something new'
            print('after: ', pdict) #修改生效
        '''
        def __init__(self, process_dict, lock:multiprocessing.Lock=None):
            if lock:
                lock.acquire()
            self.lock = lock
            self.process_dict = process_dict
            self.cp = copy.deepcopy(self.process_dict)
    
        def __enter__(self):
            return self.cp
    
        def __exit__(self, exc_type, exc_value, exc_traceback_obj):
            for key in self.cp:
                self.process_dict[key] = self.cp[key]
            if self.lock:
                self.lock.release()
            return True #程序继续执行
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27

    # 加锁

    https://www.jb51.net/article/171299.htm (opens new window)

    python多进程和多线程是大家会重点了解的部分,因为很多工作如果并没有前后相互依赖关系的话其实顺序并不是非常的重要,采用顺序执行的话就必定会造成无谓的等待,任凭cpu和内存白白浪费,这是我们不想看到的。

    为了解决这个问题,我们就可以采用多线程或者多进程的方式,(多线程我们之后再讲),而这两者之间是有本质区别的。就内存而言,已知进程是在执行过程中有独立的内存单元的,而多个线程是共享内存的,这是多进程和多线程的一大区别。

    利用Value在不同进程中同步变量

    在多进程中,由于进程之间内存相互是隔离的,所以无法在多个进程中用直接读取的方式共享变量,这时候就可以用multiprocessing库中的 Value在各自隔离的进程中共享变量。

    下面是一个多进程的例子:

    假设有一个counter用来记录程序经过的总循环次数,每调用一次count函数之后counter就会增加20,在主程序中用循环开10个进程分别调用count函数,那么理想状态下,在十个进程中共享的counter值到程序结束后应该是200。

    from multiprocessing import Process, Value
    import time
     
    def count(v):
      for i in range(20):
        time.sleep(0.01)
        v.value += 1
     
    def main():
      value = Value('i',0)
      processes = [Process(target=count, args=(value,)) for i in range(10)]
     
      for p in processes:
        p.start()
      for p in processes:
        p.join()
     
      print(value.value)
     
    if __name__ == '__main__':
     
      for i in range(10):
        main()
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23

    运行这个例子,会得到怎样的结果呢?

    188 180 168 186 183 179 186 181 166 186

    我在主程序里运行了十次这个程序,而最后的结果是160-180之间,总之,没有一次到200。这是什么原因呢?

    相信很多人都已经明白了问题所在,那就是因为在multiprocessing库中的Value是细粒度的,Value中有一个ctypes类型的对象,拥有一个value属性来表征内存中实际的对象。Value可以保证同时只有一个单独的线程或进程在读或者写value值。这么看起来没有什么问题。

    然而在第一个进程加载value值的时候,程序却不能阻止第二个进程加载旧的值。两个进程都会把value拷贝到自己的私有内存然后进行处理,并写回到共享值里。

    那么这么会出现什么问题呢?

    最后的共享值只接收到了一次值的增加,而非两次。

    Python客栈送红包、纸质书

    利用Lock在不同进程共享变量时加锁 上面的问题其实可以用一个非常简单的方法解决,我们只需要调用multiprocessing库中的Lock (锁)就可以保证一次只能有一个进程访问这个共享变量。修改后的代码如下:

    from multiprocessing import Process, Value, Lock
    from time import sleep
     
    def count(x,lock):
      for i in range(20):
        sleep(0.01)
        with lock:
          x.value += 1
     
     
    def main():
      counter = Value('i',0)
      lock = Lock()
      processes = [Process(target=count,args=(counter,lock)) for i in range(10)]
      for p in processes:
        p.start()
      for p in processes:
        p.join()
     
      print(counter.value)
     
    if __name__ == '__main__':
      for i in range(10):
        main()
        
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25

    这样一来,输出的结果就会恒定为200了。

    一些补充

    1. 调用get_lock() 函数 其实Value这个包里已经包含了锁的概念,如果调用get_lock() 函数就可以自动给共享变量加锁。这样其实是比较推荐的方式,因为这样就不需要同时调用两个包。修改如下:
    from multiprocessing import Process, Value
    from time import sleep
     
    def count(x):
      for i in range(20):
        global counter # 声明全局变量
        sleep(0.01)
        with counter.get_lock(): # 直接调用get_lock()函数获取锁
          x.value += 1
     
    def main():
      processes = [Process(target=count, args=(counter,)) for i in range(10)]
      for p in processes:
        p.start()
      for p in processes:
        p.join()
     
      print(counter.value)
     
    if __name__ == '__main__':
      counter = Value('i', 0) # 需要把全局变量移到主程序
      main()
     
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23

    上面的程序更加明确,且最终结果也是200。

    1. 使用 multiprocessing.RawValue 整个multiprocessing包里刚刚调用的Value和Lock还可以统一被 multiprocessing.RawValue取代。
    编辑 (opens new window)
    上次更新: 2021/10/11, 12:43:58
    logging日志
    OrderedDict

    ← logging日志 OrderedDict→

    最近更新
    01
    通过模板实现结构体成员拷贝-按成员名匹配
    05-07
    02
    c++17通过模板获取类成员的个数
    05-01
    03
    avx-sse切换惩罚
    04-30
    更多文章>
    Theme by Vdoing | Copyright © 2019-2023 Colder Leo | MIT License
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式
    ×