2

【Python多任务--进程,协程】

 1 year ago
source link: https://blog.51cto.com/u_15874356/5938744
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

进程是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。
进程是线程的容器,一个进程可以有多个线程

  • 动态性:进程的实质是程序在多道程序系统中的一次执行过程,进程是动态产生,动态消亡的。
  • 并发性:任何进程都可以同其他进程一起并发执行。
  • 独立性:进程是一个能独立运行的基本单位,同时也是系统分配资源和调度的独立单位。
  • 异步性:由于进程间的相互制约,使进程具有执行的间断性,即进程按各自独立的、不可预知的速度向前推进。

二、python多进程

创建多进程的两种方式:

  • 调用multiprocessing.Process模块
  • 重写multiprocessing.Process类的run方法

1、调用multiprocessing.Process模块创建进程

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

  1. 需要使用关键字的方式来指定参数
  2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

1 group参数未使用,值始终为None
2 target表示调用对象,即子进程要执行的任务
3 args表示调用对象的位置参数元组,args=('liang',)
4 kwargs表示调用对象的字典,kwargs={'name':'anne','age':18}
5 name为子进程的名称

from multiprocessing import Process

def work1(name):
for i in range(4):
time.sleep(1)
print(f'{name}浇花的第{i + 1}秒')

def work2(name):
for i in range(3):
time.sleep(1)
print(f'{name}打墙的第{i + 1}秒')


if __name__ == '__main__':
p1 = Process(target=work1, args=('liang',))
p2 = Process(target=work2, args=('小狼',))

p1.start()
p2.start()

p1.join()
p2.join()
print('主线程执行完毕')

#输出
liang浇花的第1秒
小狼打墙的第1秒
小狼打墙的第2秒
liang浇花的第2秒
liang浇花的第3秒
小狼打墙的第3秒
liang浇花的第4秒
主线程执行完毕

2、重写multiprocessing.Process类的run方法

将要执行的任务写入run方法,同样的任务可以多线程并行执行

import time
from multiprocessing import Process

class MyProcess(Process):
"""自定义的进程类"""
def __init__(self, name):
super().__init__()
self.name = name

def run(self):
for i in range(3):
time.sleep(1)
print(f'{self.name}浇花的第{i + 1}秒')
if __name__ == '__main__':

q_list = []
for i in range(2): #创建4个线程并启动
p = MyProcess(f"liang{i}")
q_list.append(p)
p.start()
for q in q_list: #等待启动的线程执行结束
q.join()
print('主进程执行完毕')

#输出
liang0浇花的第1秒
liang1浇花的第1秒
liang0浇花的第2秒
liang1浇花的第2秒
liang0浇花的第3秒
liang1浇花的第3秒
主进程执行完毕

注意: 多进程 执行,必须在main函数下, if __name__ == '__main__':

三、多进程之间通讯

多进程之间所有资源都是独立的,不能共享全局变量

queue.Queue模块只能在一个进行中使用,可以实现一个进程中的多个线程相互通讯

多个进程之间的相互通讯,需要用到--multiprocessing.Queue:可以多个进程之间共用(通讯)

"""
进程之间通信:使用队列
multiprocessing.Queue:可以多个进程之间共用(通讯)
queue.Queue模块只能在一个进行中使用,一个进程中多个线程使用
"""
from multiprocessing import Process, Queue

def work1(q):
for i in range(5000):
n = q.get()
n += 1
q.put(n)

print("work1结束时候n的值:", n)


def work2(q):
for i in range(5000):
n = q.get()
n += 1
q.put(n)
print("work2结束时候n的值:", n)


if __name__ == '__main__':
q = Queue()
q.put(100)
p1 = Process(target=work1, args=(q,))
p2 = Process(target=work2, args=(q,))

p1.start()
p2.start()

p1.join()
p2.join()
print('两个子进程执行结束之后,主进程打印的n:', q.get())

#输出
work2结束时候n的值: 10090
work1结束时候n的值: 10100
两个子进程执行结束之后,主进程打印的n: 10100

===================================================================================================================================================================================================================================================

接上文,我们下面来讲一下:

python多任务--协程

协程 ,又称为微线程,它是实现多任务的另一种方式,只不过是比线程更小的执行单元。因为它自带CPU的上下文,这样只要在合适的时机,我们可以把一个协程切换到另一个协程。

协程的优势

  • 执行效率高,因为子程序切换函数,而不是线程,没有线程切换的开销,由程序自身控制切换。于多线程相比,线程数量越多,切换开销越大,协程的优势越明显
  • 不需要锁的机制,只有一个线程,也不存在同时写变量冲突,在控制共享资源时也不需要加锁。

二、实现协程的几种方式

1、yield(生成器)可以很容易的实现从一个函数切换到另外一个函数

def work1():
for i in range(5):
print(f"work1--befor----{i}")
yield
print(f"work1--after----{i}")
time.sleep(0.5)

def work2():
for i in range(5):
print(f"work2---befor---{i}")
yield
print(f"work2--after----{i}")
time.sleep(0.5)

def main():
g1 = work1()
g2 = work2()
while True:
try:
next(g1)
print('主程序')
next(g2)
except StopIteration:
break
main()

运行结果如下:

【Python多任务--进程,协程】_事件循环

2、原生的协程

import asyncio
# 定义一个协程函数
async def work1():
for i in range(10):
print(f"work1--浇花----{i}")

# 调用协程函数,返回的是一个协程对象
cor1 = work1()

# 执行协程
asyncio.run(cor1)
2.1、使用原生的协程实现多任务(不同任务)

协程中切换,通过await语法来挂起自身的协程。await后面跟上耗时操作,耗时操作一般指IO操作: 网络请求,文件读取等,使用asyncio.sleep模拟耗时操作。协程的目的也是让这些IO操作异步化。
sleep()需要用asyncio.sleep()
await必须要在 async def function(): 中用,否则会报错

import asyncio
async def work1():
for i in range(3):
print(f"work1--浇花----{i}")
await asyncio.sleep(1)

async def work2():
for i in range(5):
print(f"work2--打墙----{i}")
await asyncio.sleep(1)

if __name__ == '__main__':
loop = asyncio.get_event_loop()
# 创建两个协程任务
tasks = [
work1(),
work2(),
]
# 启动事件循环并将协程放进去执行
loop.run_until_complete(asyncio.wait(tasks))

#输出
work1--浇花----0
work2--打墙----0
work1--浇花----1
work2--打墙----1
work1--浇花----2
work2--打墙----2
work2--打墙----3
work2--打墙----4
2.2、使用原生的协程实现多任务(同一方法处理大量数据)
import asyncio
from queue import Queue
import time

def decorator(func):
def wrapper():
# 函数执行之前获取系统时间
start_time = time.time()
func()
# 函数执行之后获取系统时间
end_time = time.time()
print('执行时间为:', end_time - start_time)
return end_time - start_time
return wrapper

async def work1(q):
while q.qsize():
print(f"请求url:{q.get()}")
await asyncio.sleep(0.1)

@decorator
def main():
#创建一个包含有1000条url的队列
q = Queue()
for i in range(1000):
q.put(f"www.baidu.com.{i}")

loop = asyncio.get_event_loop()
# 创建100个协程任务
tasks = [work1(q) for i in range(100)]

# 启动事件循环并将协程放进去执行
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

if __name__ == '__main__':
main()

#输出
...
请求url:www.baidu.com.890
请求url:www.baidu.com.891
请求url:www.baidu.com.892
...
执行时间为: 1.060093641281128

100个协程执行1000个耗时0.1秒的请求只需要1秒

2.3、版本区别:

python 3.7 以前的版本调用异步函数的步骤:(如以上代码)

  • 1、调用asyncio.get_event_loop()函数获取事件循环loop对象
  • 2、通过不同的策略调用loop.run_forever()方法或者loop.run_until_complete()方法执行异步函数

python3.7 以后的版本

  • 1、asyncio.run() 函数用来运行最高层级的入口点,下例的main()函数。此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。
  • 2、await 等待一个协程,也可以启动一个协程。
  • 3、asyncio.create_task() 函数用来并发运行作为 asyncio 任务 的多个协程。下例并发运行两个work协程

改动后代码如下

#以上省略
async def main():
q = Queue()
for i in range(1000):
q.put(f"www.baidu.com.{i}")
#创建了任务
tasks = [asyncio.create_task(work1(q)) for i in range(100)]
#将任务丢到执行队列里面去
[await t for t in tasks]

if __name__ == '__main__':
m=main()
start_time = time.time()
asyncio.run(m)
end_time = time.time()
print('运行时间{}秒'.format(end_time - start_time))

3、greenlet模块

import time
import greenlet

"""
greenlet:在协程之间只能手动进行切换
"""
def work1():
for i in range(6):
time.sleep(1)
cor2.switch()
print(f'浇花的第{i + 1}秒')

def work2():
for i in range(5):
time.sleep(1)
cor1.switch()
print(f'打墙的第{i + 1}秒')

cor1 = greenlet.greenlet(work1)
cor2 = greenlet.greenlet(work2)
cor1.switch()

4、gevent模块实现多任务

  • gevent模块对greenlet又做了一层封装,当程序遇到IO耗时等待的时候会进行自动切换
  • gevent中默认是遇到gevent.sleep()会自动进行切换
  • 如果让gevent遇到io耗时自动切换:需要在程序的导包处加一个补丁monkey.patch_all(),该补丁不支持多线程
from gevent import monkey
monkey.patch_all()
import gevent

def work1():
for i in range(6):
gevent.sleep(1)
print(f'浇花的第{i + 1}秒')


def work2():
for i in range(5):
gevent.sleep(1)
print(f'打墙的第{i + 1}秒')


# 创建两个协程
g1 = gevent.spawn(work1)
g2 = gevent.spawn(work2)

# 等待所有协程任务运行完毕
gevent.joinall([g1, g2])

示例: 模拟50000个协程执行对100000个地址的请求

from gevent import monkey
monkey.patch_all()
import gevent
import time

#创建100000个地址
urls = ['http://www.baidu.com' for i in range(100000)]

#定义需要执行的任务函数
def work():
while urls:
url = urls.pop()
# res = requests.get(url)
time.sleep(0.5)
print(f"正在请求url:{url},请求结果:url")

def main():
cos = []

#创建50000个协程
for i in range(50000):
cor = gevent.spawn(work)
cos.append(cor)

# 等待所有协程任务运行完毕
gevent.joinall(cos)

main()

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK