
Process, Thread, and Coroutine

source link: https://maxyoung.fun//blog/%E5%B9%B6%E5%8F%91.html

Note: this article is still rough around the edges.


Process: the operating system's unit of resource allocation; each process has its own memory space. Multiple processes can run in parallel (Parallelism) on multiple cores, meaning several tasks truly execute at the same time.
Thread: a process can run multiple threads; multiple threads run concurrently (Concurrency), taking turns executing, and threads within one process share its memory.
Coroutine: coroutines let multiple subroutines take turns executing inside a single thread to achieve concurrency; since everything runs in one thread, sharing data is trivial.

Example code:

import asyncio
import threading
import time


async def foo():
    print('foo start')
    print("foo thread is {}".format(threading.current_thread()))
    await asyncio.sleep(1)  # stand-in for blocking IO
    print('foo finish')


async def bar():
    print('bar start')
    print("bar thread is {}".format(threading.current_thread()))
    await asyncio.sleep(1)
    print('bar finish')


print("main thread is {}".format(threading.current_thread()))
start = time.time()
loop = asyncio.get_event_loop()
# wrap the coroutines in Tasks; passing bare coroutines to asyncio.wait
# is deprecated since Python 3.8 and removed in 3.11
tasks = [loop.create_task(foo()), loop.create_task(bar())]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
end = time.time()
print("elapsed time: {}".format(end - start))

Output:

main thread is <_MainThread(MainThread, started 140735607858048)>
foo start
foo thread is <_MainThread(MainThread, started 140735607858048)>
bar start
bar thread is <_MainThread(MainThread, started 140735607858048)>
foo finish
bar finish
elapsed time: 1.005871295928955

We can see that foo and bar execute interleaved, that both coroutines run on the very same main thread, and that the total time is about 1 second; running the two functions sequentially would instead take about 2 seconds.

Compared with multithreading, coroutines are cheaper because there is no thread-switching overhead, and, importantly, they can share data freely (we will come back to sharing below).

One issue, though: here we used await asyncio.sleep(1) to simulate blocking (in real IO scenarios such as web requests or crawlers, aiohttp would play this role).
In real projects the blocking code varies widely: we actually need to await a function of our own, one that may contain complex logic, HTTP requests, or database reads and writes. asyncio should provide an interface for this, but splitting such code into fine-grained awaitables is often impractical. So what should we do?
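One answer, available since Python 3.9, is asyncio.to_thread, which hands an arbitrary blocking function to a worker thread and returns an awaitable. A minimal sketch; blocking_io here is a hypothetical stand-in for your own blocking logic:

```python
import asyncio
import time


def blocking_io():
    # stand-in for arbitrary blocking work: an HTTP request, a DB query, ...
    time.sleep(0.2)
    return "done"


async def main():
    # run two blocking calls concurrently on worker threads;
    # total time is ~0.2s rather than ~0.4s
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.to_thread(blocking_io),
    )
    print(results)


asyncio.run(main())
```

On older Python versions, loop.run_in_executor (used later in this article) achieves the same thing.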

Setting that question aside for now, let's talk about multithreading. We can use concurrent.futures.ThreadPoolExecutor. Example code:

import threading
import time
from concurrent.futures import ThreadPoolExecutor


def foo(array):
    print("foo start")
    print("foo thread {}".format(threading.current_thread()))
    time.sleep(1)
    array.append("foo")
    print("foo end time is {}".format(time.time()))


def bar(array):
    print("bar start")
    print("bar thread {}".format(threading.current_thread()))
    time.sleep(1)
    array.append("bar")
    print("bar end time is {}".format(time.time()))


start = time.time()
print("start time is {}".format(time.time()))
print("main thread {}".format(threading.current_thread()))
my_list = []
print("init list is {}".format(my_list))
executor = ThreadPoolExecutor(max_workers=2)
executor.submit(foo, my_list)
executor.submit(bar, my_list)
print("finally list is {}".format(my_list))
end = time.time()
print("elapsed time: {}".format(end - start))

Output:

start time is 1615809735.847754
main thread <_MainThread(MainThread, started 140735607858048)>
init list is []
foo start
foo thread <Thread(ThreadPoolExecutor-1_0, started daemon 123145423056896)>
bar start
finally list is []
bar thread <Thread(ThreadPoolExecutor-1_1, started daemon 123145428312064)>
elapsed time: 0.0009667873382568359
foo end time is 1615809736.848752
bar end time is 1615809736.8536499

We can see that the main thread and the threads running foo and bar are three different threads, and that the main thread does not wait for foo and bar before continuing; the total time is again about 1 second.
(If we configure the pool with a single worker, ThreadPoolExecutor(1), foo and bar run sequentially in the same thread and the total time becomes 2 seconds; we won't belabor that here.)
We can also see that my_list prints unchanged, as if no data were shared.

So here is the question: this approach gives us concurrency and better throughput, but what if we want foo, bar, and the main code to run in one thread, and to share data?
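In fact the pool threads do share my_list with the main thread; the list printed empty only because the main thread reached its print before the workers had finished. A minimal check, blocking on the returned futures before printing (the helper name work is illustrative):

```python
import time
from concurrent.futures import ThreadPoolExecutor


def work(array, item):
    time.sleep(0.1)
    array.append(item)


my_list = []
with ThreadPoolExecutor(max_workers=2) as executor:
    f1 = executor.submit(work, my_list, "foo")
    f2 = executor.submit(work, my_list, "bar")
    f1.result()  # block until finished
    f2.result()
print(sorted(my_list))  # ['bar', 'foo'] - the pool threads mutated our list
```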

Asyncio and ThreadPoolExecutor

To solve the problem above, we combine asyncio with ThreadPoolExecutor. Example code:

import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

_executor = ThreadPoolExecutor(2)


async def foo(array):
    print('foo start')
    print("foo thread is {}".format(threading.current_thread()))
    foo_loop = asyncio.get_event_loop()
    # hand the blocking function off to the thread pool and await its result
    await foo_loop.run_in_executor(_executor, my_func, array, "foo")
    print('foo finish')


async def bar(array):
    print('bar start')
    print("bar thread is {}".format(threading.current_thread()))
    bar_loop = asyncio.get_event_loop()
    await bar_loop.run_in_executor(_executor, my_func, array, "bar")
    print('bar finish')


def my_func(array, item):
    print("my_func thread is {}".format(threading.current_thread()))
    array.append(item)
    time.sleep(1)


print("main thread is {}".format(threading.current_thread()))
start = time.time()
my_list = []
print("init list is {}".format(my_list))
loop = asyncio.get_event_loop()
tasks = [loop.create_task(foo(my_list)), loop.create_task(bar(my_list))]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
end = time.time()
print("finally list is {}".format(my_list))
print("elapsed time: {}".format(end - start))

Output:

main thread is <_MainThread(MainThread, started 140735607858048)>
init list is []
bar start
bar thread is <_MainThread(MainThread, started 140735607858048)>
my_func thread is <Thread(ThreadPoolExecutor-0_0, started daemon 123145474777088)>
foo start
foo thread is <_MainThread(MainThread, started 140735607858048)>
my_func thread is <Thread(ThreadPoolExecutor-0_1, started daemon 123145480032256)>
bar finish
foo finish
finally list is ['bar', 'foo']
elapsed time: 1.0060360431671143

We can see that the main code and the foo/bar coroutines run in one thread, while my_func runs in pool threads, and yet the data is shared. This is no mystery: all threads within a process share the same memory, so the pool threads append to the very my_list the main thread holds. (The same was true in the previous example; the list only printed empty there because the main thread printed it before the workers had finished, whereas here run_until_complete waits for the coroutines to complete.)
This solves the problem posed above.

Let's optimize the code above a bit:

import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


async def foo(array, item):
    print('foo start')
    print("foo thread is {}".format(threading.current_thread()))
    foo_loop = asyncio.get_event_loop()
    _executor = ThreadPoolExecutor(1)
    await foo_loop.run_in_executor(_executor, my_func, array, item)
    print('foo finish')


def my_func(array, item):
    print("my_func thread is {}".format(threading.current_thread()))
    array.append(item)
    time.sleep(1)


print("main thread is {}".format(threading.current_thread()))
start = time.time()
my_list = []
print("init list is {}".format(my_list))
loop = asyncio.get_event_loop()
tasks = [loop.create_task(foo(my_list, i)) for i in range(10)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
end = time.time()
print("finally list is {}".format(my_list))
print("elapsed time: {}".format(end - start))

Two optimizations:

  1. tasks now adds the same function to the list in a loop, which is closer to real-world usage
  2. _executor is defined inside the function; max_workers can be set to 1 or left at the default, since one worker per pool is all we need here

Output:
    main thread is <_MainThread(MainThread, started 140735693562752)>
    init list is []
    foo start
    foo thread is <_MainThread(MainThread, started 140735693562752)>
    my_func thread is <Thread(ThreadPoolExecutor-0_0, started daemon 123145324679168)>
    foo start
    foo thread is <_MainThread(MainThread, started 140735693562752)>
    my_func thread is <Thread(ThreadPoolExecutor-1_0, started daemon 123145329934336)>
    foo start
    foo thread is <_MainThread(MainThread, started 140735693562752)>
    my_func thread is <Thread(ThreadPoolExecutor-2_0, started daemon 123145335189504)>
    foo start
    foo thread is <_MainThread(MainThread, started 140735693562752)>
    my_func thread is <Thread(ThreadPoolExecutor-3_0, started daemon 123145340444672)>
    foo start
    foo thread is <_MainThread(MainThread, started 140735693562752)>
    my_func thread is <Thread(ThreadPoolExecutor-4_0, started daemon 123145345699840)>
    foo start
    foo thread is <_MainThread(MainThread, started 140735693562752)>
    my_func thread is <Thread(ThreadPoolExecutor-5_0, started daemon 123145350955008)>
    foo start
    foo thread is <_MainThread(MainThread, started 140735693562752)>
    my_func thread is <Thread(ThreadPoolExecutor-6_0, started daemon 123145356210176)>
    foo start
    foo thread is <_MainThread(MainThread, started 140735693562752)>
    my_func thread is <Thread(ThreadPoolExecutor-7_0, started daemon 123145361465344)>
    foo start
    foo thread is <_MainThread(MainThread, started 140735693562752)>
    my_func thread is <Thread(ThreadPoolExecutor-8_0, started daemon 123145366720512)>
    foo start
    foo thread is <_MainThread(MainThread, started 140735693562752)>
    my_func thread is <Thread(ThreadPoolExecutor-9_0, started daemon 123145371975680)>
    foo finish
    foo finish
    foo finish
    foo finish
    foo finish
    foo finish
    foo finish
    foo finish
    foo finish
    foo finish
    finally list is [2, 6, 0, 7, 8, 3, 9, 4, 1, 5]
    elapsed time: 1.0114827156066895
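As an aside, the output above shows ten separate pools (ThreadPoolExecutor-0 through ThreadPoolExecutor-9), one per call. Passing None as the executor to run_in_executor reuses the event loop's single default thread pool instead; a sketch of the same program with that change, using the modern asyncio.run/gather idiom:

```python
import asyncio
import time


def my_func(array, item):
    array.append(item)
    time.sleep(0.2)


async def foo(array, item):
    loop = asyncio.get_running_loop()
    # None selects the loop's shared default ThreadPoolExecutor
    await loop.run_in_executor(None, my_func, array, item)


async def main():
    my_list = []
    await asyncio.gather(*(foo(my_list, i) for i in range(10)))
    print(sorted(my_list))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


asyncio.run(main())
```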
    

For IO-bound programs, we can use multithreading or coroutines to cut the efficiency loss caused by IO waits.
Compute-bound programs have no IO waits; there, the way to speed up is multiprocessing, which puts all the CPU cores to work.
For multiprocessing we can use concurrent.futures.ProcessPoolExecutor:

import concurrent.futures
import math
import time

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n, m=None):
    time.sleep(1)  # simulated extra latency; makes the parallel speedup visible
    print("{}-{}".format(n, m))

    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Three interchangeable styles; pick whichever fits:

        # 1. submit + as_completed
        # futures = {executor.submit(is_prime, i): i for i in PRIMES}
        # for f in concurrent.futures.as_completed(futures):
        #     i = futures[f]
        #     data = f.result()
        #     print("{} {}".format(i, data))

        # 2. bare submit; note: if is_prime takes two arguments, pass the extra one to submit
        # for i in PRIMES:
        #     executor.submit(is_prime, i, 2)

        # 3. map, which yields results in input order
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    t1 = time.time()
    main()
    print("elapsed {}".format(time.time() - t1))

Multiprocessing and asyncio

Let's combine multiprocessing with asyncio and see what happens:

import asyncio
import threading
import time
from concurrent.futures import ProcessPoolExecutor


async def foo(array):
    print('foo start')
    print("foo thread is {}".format(threading.current_thread()))
    foo_loop = asyncio.get_event_loop()
    _executor = ProcessPoolExecutor()
    # each worker process receives a pickled copy of array
    tasks = [foo_loop.run_in_executor(_executor, my_func, array, item) for item in range(10)]
    completed, pending = await asyncio.wait(tasks)
    results = [t.result() for t in completed]
    print('results: {!r}'.format(results))
    print('foo finish')
    return results


def my_func(array, item):
    print("my_func thread is {}".format(threading.current_thread()))
    array.append(item)
    time.sleep(1)
    return item


print("main thread is {}".format(threading.current_thread()))
start = time.time()
my_list = []
print("init my list is {}".format(my_list))
loop = asyncio.get_event_loop()
result = loop.run_until_complete(foo(my_list))
loop.close()
end = time.time()
print("finally my list is {}".format(my_list))
print("result list is {}".format(result))
print("elapsed time: {}".format(end - start))

Output:

main thread is <_MainThread(MainThread, started 140736046461824)>
init my list is []
foo start
foo thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
results: [4, 9, 5, 1, 6, 2, 7, 3, 8, 0]
foo finish
finally my list is []
result list is [4, 9, 5, 1, 6, 2, 7, 3, 8, 0]
elapsed time: 3.021937847137451

We can see the run took longer. sleep here stands in for an IO wait, which is really a job for multithreading; using multiprocessing instead, my machine has 4 CPUs and there are 10 tasks, so the 4 workers must run roughly three rounds, hence about 3 seconds in total.
We can also see that the list is unchanged between init and finally, which confirms that processes cannot share data this way: each worker appends to its own pickled copy. (my_func prints MainThread because each child process has its own main thread.)
Happily, though, we get results: [4, 9, 5, 1, 6, 2, 7, 3, 8, 0]: with completed, pending = await asyncio.wait(tasks) we can collect the return values of tasks executed in other processes. This link between processes is very useful in some scenarios.
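If we genuinely need worker processes to mutate one shared list, the standard-library escape hatch is a multiprocessing.Manager, whose list proxy is picklable and can be passed to pool workers. A minimal sketch under that assumption (the helper name work is illustrative):

```python
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager


def work(shared, item):
    # shared is a proxy object: appends are forwarded to the manager process
    shared.append(item)
    return item


if __name__ == "__main__":
    with Manager() as manager:
        shared = manager.list()
        with ProcessPoolExecutor() as executor:
            results = list(executor.map(work, [shared] * 4, range(4)))
        # appends may land in any order, so sort before printing
        print(sorted(shared), results)
```

The proxy indirection has real overhead (every append is an IPC round trip), so it pays off only when shared state is genuinely required.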

Multiprocessing and coroutines

We have now covered multiprocessing and coroutines separately, so how do we combine the two?

For one implementation, see: https://pypi.org/project/aiomultiprocess/

