Sharing Variables and Locks Between Python Threads (and Processes)

Source: https://yanbin.blog/python-multiple-thread-process-share-variables/

2022-09-24

In most programming languages, multithreaded access to shared variables raises synchronization and race-condition concerns, so locks or synchronized collections are needed to guarantee consistency. In Java, for example, ArrayList, HashMap, and the like are not thread-safe; the thread-safe counterparts are Vector and Hashtable, or the collections in the java.util.concurrent package.

However, Python has the GIL (Global Interpreter Lock), a global lock that prevents Python bytecode from being executed by multiple threads at the same time, which means access to Python objects is always serialized. As a result, individual operations on objects in a multithreaded Python program are thread-safe; in that sense, list, set, and dict are thread-safe in Python.

Consider the following Python code:

from concurrent.futures import ThreadPoolExecutor
def add(num):
    numbers.append(num)  # list.append is a single atomic operation under the GIL
if __name__ == '__main__':
    numbers = []
    with ThreadPoolExecutor(100) as executor:
        for i in range(100000):
            executor.submit(add, i)
    print(len(numbers))

Its output is always 100000: appending elements to the numbers list never goes wrong, and similar tests on set and dict also produce the expected results.
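
For completeness, here is a minimal sketch (my own, not from the original article) of the analogous test for set and dict; both counts come out as expected:

from concurrent.futures import ThreadPoolExecutor
unique = set()
mapping = {}
def add(num):
    unique.add(num)     # set.add is likewise atomic under the GIL
    mapping[num] = num  # as is a plain dict item assignment
if __name__ == '__main__':
    with ThreadPoolExecutor(100) as executor:
        for i in range(100000):
            executor.submit(add, i)
    print(len(unique), len(mapping))  # expected: 100000 100000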

In Java, by contrast, similar code looks like this:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
public class Test {
    public static void main(String[] args) throws Exception {
        List<Integer> numbers = new ArrayList<>();
        ExecutorService executor = Executors.newFixedThreadPool(100);
        IntStream.range(0, 100000).forEach(i -> executor.submit(() -> numbers.add(i)));
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.HOURS);
        System.out.println(numbers.size());
    }
}

No matter how many times I run it, I never get 100000 elements. Typical outputs are:

97645
99296
80915
99324
99362

Having compared with Java, back to Python multithreading. Given the GIL, why do threading.Lock and multiprocessing.Lock exist at all? The GIL only guarantees that a single bytecode instruction runs at a time, i.e. that access to an individual object is serialized; synchronizing a whole block of code still requires a Lock. For example:

item = {'count': 0}
def foo():
    count = item['count']      # read
    # do something here
    item['count'] = count + 1  # write back; a thread switch can happen in between
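
To see why (a side illustration of mine, not from the article), the dis module shows that this read-modify-write compiles to several bytecode instructions, and the GIL may hand control to another thread between any two of them:

import dis
item = {'count': 0}
def foo():
    count = item['count']
    item['count'] = count + 1
dis.dis(foo)  # several LOAD/STORE instructions; a switch can occur between any two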

If these lines are not wrapped in a with lock: block, the value of count will get scrambled. Let's test that right away:

import time
from concurrent.futures import ThreadPoolExecutor
item = {'count': 1}
def foo():
    global item    # this line can be omitted
    count = item['count']
    # do something here
    time.sleep(0.0001)
    item['count'] = count + 1
if __name__ == '__main__':
    with ThreadPoolExecutor(100) as executor:
        for i in range(100000):
            executor.submit(foo)
    print(item['count'])

The output is something like:

13783
11021

Given that result, add a Lock and run it again:

import time
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
item = {'count': 0}
lock = Lock()
def foo():
    with lock:
        global item   # this line can be omitted
        count = item['count']
        # do something here
        time.sleep(0.0001)
        item['count'] = count + 1
if __name__ == '__main__':
    with ThreadPoolExecutor(100) as executor:
        for i in range(100000):
            executor.submit(foo)
    print(item['count'])

Now every run prints 100000. Even with the GIL, locks are still needed.

Sharing variables between threads is simple: just use them directly. Sharing variables with child processes is where things get complicated. Run the following test code:

from concurrent.futures import ProcessPoolExecutor
import os
item = {'count': 0}
def foo():
    count = item['count']
    item['count'] = count + 1
    print('foo process: %s, id(item): %s, id(count): %s, count: %s' %
          (os.getpid(), id(item), id(count), count))
item['count'] = 1
print('outer process: %s, id(item): %s, id(count): %s' % (os.getpid(), id(item), id(item['count'])))
if __name__ == '__main__':
    print('main process: %s, id(item): %s, id(count): %s' % (os.getpid(), id(item), id(item['count'])))
    item['count'] = 10
    with ProcessPoolExecutor(2) as executor:
        for i in range(5):
            executor.submit(foo)
    print('main process: %s, id(item): %s, count: %s' % (os.getpid(), id(item), item['count']))

outer process: 23093, id(item): 4336193152, id(count): 4334862576
main process: 23093, id(item): 4336193152, id(count): 4334862576
outer process: 23097, id(item): 4381875520, id(count): 4377919728
outer process: 23096, id(item): 4455259456, id(count): 4451287280
foo process: 23096, id(item): 4455259456, id(count): 4451287280, count: 1
foo process: 23097, id(item): 4381875520, id(count): 4377919728, count: 1
foo process: 23096, id(item): 4455259456, id(count): 4451287312, count: 2
foo process: 23097, id(item): 4381875520, id(count): 4377919760, count: 2
foo process: 23096, id(item): 4455259456, id(count): 4451287344, count: 3
main process: 23093, id(item): 4336193152, count: 10


Track each process by its PID and the output becomes easy to understand:

  1. Every process executes the code that sits outside functions and outside the if __name__ == '__main__': block.
  2. Only the main process executes the code inside the if __name__ == '__main__': block.
  3. Child processes do not execute the code inside if __name__ == '__main__': that precedes their creation (the main-process print and the item['count'] = 10 assignment above); it takes effect only in the main process.
  4. Every process (the main process and each child) has its own item variable; nothing is shared at all.

This also explains why ProcessPoolExecutor must be used inside if __name__ == '__main__': the guard separates module-level code, which every process executes, from code that only the main process runs.
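
As an aside (my own sketch, not from the article): with the spawn start method, the default on macOS and Windows, omitting the guard makes every child re-import the module and try to start workers of its own, and Python aborts with a RuntimeError complaining that a new process was started before the bootstrapping phase finished:

from concurrent.futures import ProcessPoolExecutor
def foo():
    pass
# No __main__ guard: each spawned child re-runs this module-level code
executor = ProcessPoolExecutor(2)
executor.submit(foo)  # RuntimeError in the children; the pool ends up broken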

What happens if we try to pass the main process's variable into the child processes as an argument?

from concurrent.futures import ProcessPoolExecutor
import os
item = {'count': 0}
def foo(seq, share):
    count = share['count']
    share['count'] = count + 1
    print('#%s foo process: %s, id(share): %s, id(count): %s, count: %s' %
          (seq, os.getpid(), id(share), id(count), count))
item['count'] = 1
print('outer process: %s, id(item): %s, id(count): %s' % (os.getpid(), id(item), id(item['count'])))
if __name__ == '__main__':
    print('main process: %s, id(item): %s, id(count): %s' % (os.getpid(), id(item), id(item['count'])))
    item['count'] = 10
    with ProcessPoolExecutor(2) as executor:
        for i in range(5):
            executor.submit(foo, i, item)
    print('main process: %s, id(item): %s, count: %s' % (os.getpid(), id(item), item['count']))

outer process: 24180, id(item): 4433710720, id(count): 4432380144
main process: 24180, id(item): 4433710720, id(count): 4432380144
outer process: 24183, id(item): 4393803392, id(count): 4389830896
outer process: 24182, id(item): 4408990912, id(count): 4405035248
#0 foo process: 24183, id(share): 4395485376, id(count): 4389831184, count: 10
#1 foo process: 24182, id(share): 4410656960, id(count): 4405035536, count: 10
#2 foo process: 24183, id(share): 4395485760, id(count): 4389831184, count: 10
#3 foo process: 24182, id(share): 4410657280, id(count): 4405035536, count: 10
#4 foo process: 24183, id(share): 4395485376, id(count): 4389831184, count: 10
main process: 24180, id(item): 4433710720, count: 10


A main-process variable passed to a child process as an argument is pickled into a fresh copy (effectively a deep copy) on every call, so the child can read its value but cannot change the original's state, and even a reused worker process does not see one shared variable.

The way to actually share a variable between processes is multiprocessing.Manager():

from concurrent.futures import ProcessPoolExecutor
import os
import multiprocessing
item = {'count': 0}
def foo(seq, share):
    count = share['count']
    share['count'] = count + 1
    print('#%s foo process: %s, id(share): %s, id(count): %s, count: %s' % (seq, os.getpid(), id(share), id(count), count))
item['count'] = 1
print('outer process: %s, id(item): %s, id(count): %s' % (os.getpid(), id(item), id(item['count'])))
if __name__ == '__main__':
    print('main process: %s, id(item): %s, id(count): %s' % (os.getpid(), id(item), id(item['count'])))
    item['count'] = 10
    for_share = multiprocessing.Manager().dict(item)
    with ProcessPoolExecutor(2) as executor:
        for i in range(5):
            executor.submit(foo, i, for_share)
    print('main process: %s, id(item): %s, count: %s' % (os.getpid(), id(item), item['count']))
    print('main process: %s, type(for_share): %s, for_share: %s' % (os.getpid(), type(for_share), for_share))

outer process: 24801, id(item): 4304703104, id(count): 4303372528
main process: 24801, id(item): 4304703104, id(count): 4303372528
outer process: 24803, id(item): 4527087296, id(count): 4523114736
outer process: 24804, id(item): 4368555392, id(count): 4364599536
outer process: 24805, id(item): 4335853248, id(count): 4331880688
#1 foo process: 24805, id(share): 4337443664, id(count): 4331880976, count: 10
#0 foo process: 24804, id(share): 4370146128, id(count): 4364599856, count: 11
#3 foo process: 24804, id(share): 4370146080, id(count): 4364599888, count: 12
#2 foo process: 24805, id(share): 4337443616, id(count): 4331881040, count: 12
#4 foo process: 24804, id(share): 4370146272, id(count): 4364599920, count: 13
main process: 24801, id(item): 4304703104, count: 10
main process: 24801, type(for_share): <class 'multiprocessing.managers.DictProxy'>, for_share: {'count': 14}


multiprocessing.Manager().dict(item) gives us a genuinely shared variable; its actual type is DictProxy. Sharp-eyed readers will also notice that although we only meant to start three processes (one main process and two children), an extra process (24803) appears: that must be the manager process used to share the data between processes.
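
A quick way to confirm this (my own sketch): multiprocessing.active_children() lists the manager's server process as soon as the Manager is created:

import multiprocessing
if __name__ == '__main__':
    manager = multiprocessing.Manager()
    # the SyncManager's server process appears as a child of the main process
    print(multiprocessing.active_children())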

Besides dict, multiprocessing.Manager() also supports list, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, and Array.
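
Here is a small sketch (again my own illustration) of a few of those other proxy types:

import multiprocessing
if __name__ == '__main__':
    manager = multiprocessing.Manager()
    shared_list = manager.list([1, 2, 3])  # ListProxy
    ns = manager.Namespace()               # attribute-style shared container
    ns.total = 0
    queue = manager.Queue()                # process-safe queue proxy
    shared_list.append(4)
    ns.total += 1
    queue.put('done')
    print(list(shared_list), ns.total, queue.get())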

Next, let's check whether data wrapped by Manager() is still subject to race conditions:

import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor
def foo(share):
    count = share['count']
    time.sleep(0.0001)
    share['count'] = count + 1
if __name__ == '__main__':
    item = {'count': 0}
    for_share = multiprocessing.Manager().dict(item)
    with ProcessPoolExecutor(2) as executor:
        for i in range(10000):
            executor.submit(foo, for_share)
    print(for_share)

The output varies from run to run, for example:

{'count': 9248}
{'count': 8998}

To synchronize the lines inside foo(), we can use multiprocessing.Manager().Lock(); the lock needs to be passed into the child processes:

import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor
def foo(share, lock):
    with lock:
        count = share['count']
        time.sleep(0.0001)
        share['count'] = count + 1
if __name__ == '__main__':
    item = {'count': 0}
    for_share = multiprocessing.Manager().dict(item)
    lock = multiprocessing.Manager().Lock()
    with ProcessPoolExecutor(2) as executor:
        for i in range(10000):
            executor.submit(foo, for_share, lock)
    print(for_share)

This guarantees that every run prints:

{'count': 10000}

The multiprocessing.Manager().Lock() above cannot be replaced with multiprocessing.Lock(); doing so fails with:

Lock objects should only be shared between processes through inheritance

With executor.submit(foo, ...), any exception raised in a child process is silently swallowed, so there is simply no way to tell whether the program ran correctly. You can check each task for an exception through its returned future:

    futures = []
    with ProcessPoolExecutor(2) as executor:
        for i in range(10000):
            futures.append(executor.submit(foo, for_share, lock))
    print(futures[0].exception())  # check each future like this to see whether it raised an exception

Alternatively, when using executor.map(), exceptions from the child processes are reported:

    results = []
    with ProcessPoolExecutor(2) as executor:
        results.extend(executor.map(foo, *zip(*[(for_share, lock) for i in range(0, 10000)])))

When an exception occurs you will see the error in the console, so executor.map() is somewhat safer than executor.submit().

Note: the foo called via executor.map() above takes multiple arguments, which is why the *zip(*list_of_tuples) construct is needed.
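
An equivalent and arguably more readable spelling (my own variant) passes one iterable per parameter instead of unzipping tuples:

    n = 10000
    with ProcessPoolExecutor(2) as executor:
        # one iterable per foo() parameter; map() pairs them up element-wise
        list(executor.map(foo, [for_share] * n, [lock] * n))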

So when is multiprocessing.Lock() actually useful? When using multiprocessing.Process directly:

import multiprocessing
from multiprocessing import Process
import time
def foo(share, lock):
    with lock:
        count = share['count']
        time.sleep(0.0001)
        share['count'] = count + 1
if __name__ == '__main__':
    item = {'count': 0}
    for_share = multiprocessing.Manager().dict(item)
    lock = multiprocessing.Lock()  # multiprocessing.Manager().Lock() would also work here
    processes = [Process(target=foo, args=(for_share, lock)) for i in range(100)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    print(for_share)

The output is always:

{'count': 100}

If foo() does not take the lock:

import multiprocessing
from multiprocessing import Process
import time
def foo(share, lock):
    # with lock:
    count = share['count']
    time.sleep(0.0001)
    share['count'] = count + 1
if __name__ == '__main__':
    item = {'count': 0}
    for_share = multiprocessing.Manager().dict(item)
    lock = multiprocessing.Lock()
    processes = [Process(target=foo, args=(for_share, lock)) for i in range(100)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    print(for_share)

The count will fall short of 100, for example:

{'count': 83}

Creating too many child processes may also raise a Connection refused error:

ConnectionRefusedError: [Errno 61] Connection refused

Sharing data between processes still requires multiprocessing.Manager().

ProcessPoolExecutor() (from concurrent.futures) has to use multiprocessing.Manager().Lock(), whereas Process() works with either multiprocessing.Lock() or multiprocessing.Manager().Lock().

Sharing data and a lock in multiprocessing.Pool():

import multiprocessing
import time
def foo(share, lock):
    with lock:
        count = share['count']
        time.sleep(0.0001)
        share['count'] = count + 1
if __name__ == '__main__':
    item = {'count': 0}
    for_share = multiprocessing.Manager().dict(item)
    lock = multiprocessing.Manager().Lock()
    # lock = multiprocessing.Lock()  # this Lock cannot be used here
    with multiprocessing.Pool(2) as executor:
        executor.starmap(foo, list(zip([for_share] * 10000, [lock] * 10000)))
    print(for_share)

multiprocessing.Pool() likewise cannot use multiprocessing.Lock() directly, and shared data must also go through multiprocessing.Manager().
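
That said, a plain multiprocessing.Lock() can still reach pool workers through inheritance, which is exactly what the earlier error message allows: hand it to the pool's initializer. A sketch of mine, not from the article:

import multiprocessing
import time
def init_worker(shared_lock):
    global lock
    lock = shared_lock  # each worker receives the lock once, at startup
def foo(share):
    with lock:
        count = share['count']
        time.sleep(0.0001)
        share['count'] = count + 1
if __name__ == '__main__':
    for_share = multiprocessing.Manager().dict({'count': 0})
    plain_lock = multiprocessing.Lock()
    with multiprocessing.Pool(2, initializer=init_worker,
                              initargs=(plain_lock,)) as pool:
        pool.starmap(foo, [(for_share,)] * 10000)
    print(for_share)  # {'count': 10000}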

(Topics left for another time: how a child process sees instance attributes accessed through self, and how it sees objects brought in by import.)

Categories: Python
