2

【Python多任务--进程池Pool】

 1 year ago
source link: https://blog.51cto.com/u_15874356/5938797
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

【Python多任务--进程池Pool】

推荐 原创

HI王小呆 2022-12-15 10:12:13 ©著作权

文章标签 进程池 子进程 浮点数 文章分类 IT职场 其它 yyds干货盘点 阅读数185

进程池Pool

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时就可以用到multiprocessing模块提供的Pool方法。
初始化Pool时,可以指定一个最大进程数,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

一:使用进程池

例1:非阻塞

from multiprocessing import Pool
import os, time, random


def worker(name):
t_start = time.time()
print("%s开始执行,进程号为%d" % (name, os.getpid()))
# random.random()随机生成0~1之间的浮点数
time.sleep(random.random() * 2)
t_stop = time.time()
print(name, "执行完毕,耗时%0.2f" % (t_stop - t_start))

def main():
po = Pool(5) # 定义一个进程池,最大进程数5

# 往进程池中添加任务
for i in range(10):
# Pool.apply_async(要调用的目标,(传递给目标的参数元祖,))
# 每次循环将会用空闲出来的子进程去调用目标
po.apply_async(worker, (f'liang{i}',))

print("----start----")
po.close() # 关闭进程池,关闭后po不再接收新的请求
po.join() # 等待po中所有子进程执行完成,必须放在close语句之后
print("----all_done----")

if __name__ == '__main__':
main()
----start----
liang0开始执行,进程号为10404
liang1开始执行,进程号为9920
liang2开始执行,进程号为13136
liang3开始执行,进程号为10180
liang4开始执行,进程号为7708
liang4 执行完毕,耗时0.57
liang5开始执行,进程号为7708
liang2 执行完毕,耗时1.20
liang6开始执行,进程号为13136
liang1 执行完毕,耗时1.33
liang7开始执行,进程号为9920
liang0 执行完毕,耗时1.34
liang8开始执行,进程号为10404
liang3 执行完毕,耗时1.96
liang9开始执行,进程号为10180
liang5 执行完毕,耗时1.73
liang9 执行完毕,耗时0.54
liang8 执行完毕,耗时1.28
liang7 执行完毕,耗时1.37
liang6 执行完毕,耗时1.88
----all_done----

函数解释:

  • apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表, kwds为传递给func的关键字参数列表;
  • apply(func[, args[, kwds]]):使用阻塞方式调用func
  • close():关闭Pool,使其不再接受新的任务;
  • terminate():不管任务是否完成,立即终止;
  • join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用;

执行说明:创建一个进程池pool,并设定进程的数量为5,range(10)会相继产生10个对象,10个对象被提交到pool中,因pool指定进程数为5,所以0、1、2、3、4会直接送到进程中执行,当其中一个执行完后才空出一个进程处理对象,继续去执行新的对象,所以会出现输出“liang5开始执行,进程号为7708”出现在"liang4 执行完毕,耗时0.57"后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出“----start----”,主程序在pool.join()处等待各个进程的结束。

例2:阻塞

from multiprocessing import Pool
import os, time, random


def worker(name):
t_start = time.time()
print("%s开始执行,进程号为%d" % (name, os.getpid()))
# random.random()随机生成0~1之间的浮点数
time.sleep(random.random() * 2)
t_stop = time.time()
print(name, "执行完毕,耗时%0.2f" % (t_stop - t_start))

def main():
po = Pool(3) # 定义一个进程池,最大进程数3

# 往进程池中添加任务
for i in range(0, 5):
# Pool.apply_async(要调用的目标,(传递给目标的参数元祖,))
# 每次循环将会用空闲出来的子进程去调用目标
po.apply(worker, (f'liang{i}',))

print("----start----")
po.close() # 关闭进程池,关闭后po不再接收新的请求
po.join() # 等待po中所有子进程执行完成,必须放在close语句之后
print("----all_done----")

if __name__ == '__main__':
main()
liang0开始执行,进程号为1976
liang0 执行完毕,耗时1.75
liang1开始执行,进程号为12624
liang1 执行完毕,耗时0.57
liang2开始执行,进程号为12444
liang2 执行完毕,耗时0.52
liang3开始执行,进程号为1976
liang3 执行完毕,耗时1.23
liang4开始执行,进程号为12624
liang4 执行完毕,耗时0.85
----start----
----all_done----

因为是阻塞,主函数会等待进程的执行,执行完之后才会继续往下,所以运行完所有进程后才输出“----start----”

例3、使用进程池,并返回结果

from multiprocessing import Pool
import os, time, random


def worker(name):
print("%s开始执行,进程号为%d" % (name, os.getpid()))
# random.random()随机生成0~1之间的浮点数
time.sleep(random.random() * 2)
return name,os.getpid()

def main():
po = Pool(3) # 定义一个进程池,最大进程数3

res=[]
# 往进程池中添加任务
for i in range(0, 5):
# Pool.apply_async(要调用的目标,(传递给目标的参数元祖,))
# 每次循环将会用空闲出来的子进程去调用目标
res.append(po.apply_async(worker, (f'liang{i}',)))

print("----start----")
po.close() # 关闭进程池,关闭后po不再接收新的请求
po.join() # 等待po中所有子进程执行完成,必须放在close语句之后
for result in res:
print(result.get()) #get()函数得出每个返回结果的值
print("----all_done----")

if __name__ == '__main__':
main()

输出结果:

----start----
liang0开始执行,进程号为14012
liang1开始执行,进程号为13000
liang2开始执行,进程号为14120
liang3开始执行,进程号为14012
liang4开始执行,进程号为14012
('liang0', 14012)
('liang1', 13000)
('liang2', 14120)
('liang3', 14012)
('liang4', 14012)
----all_done----

例4、多进程执行多个任务

from multiprocessing import Pool
import os, time, random


def worker1(name):
print("%s开始执行work1,进程号为%d" % (name, os.getpid()))
# random.random()随机生成0~1之间的浮点数
time.sleep(random.random() * 2)

def worker2(name):
print("%s开始执行work2,进程号为%d" % (name, os.getpid()))
# random.random()随机生成0~1之间的浮点数
time.sleep(random.random() * 2)

def worker3(name):
print("%s开始执行work3,进程号为%d" % (name, os.getpid()))
# random.random()随机生成0~1之间的浮点数
time.sleep(random.random() * 2)

def main():
po = Pool(4) # 定义一个进程池,最大进程数3

work_list=[worker1,worker2,worker3]
# 往进程池中添加任务
for work in work_list:
for i in range(3):
po.apply_async(work, (f'liang{i}',))

print("----start----")
po.close() # 关闭进程池,关闭后po不再接收新的请求
po.join() # 等待po中所有子进程执行完成,必须放在close语句之后

print("----all_done----")

if __name__ == '__main__':
main()

线程池4个线程执行3个任务,每个任务执行3次。
输出:

----start----
liang0开始执行work1,进程号为13088
liang1开始执行work1,进程号为4908
liang2开始执行work1,进程号为4200
liang0开始执行work2,进程号为8124
liang1开始执行work2,进程号为4908
liang2开始执行work2,进程号为13088
liang0开始执行work3,进程号为8124
liang1开始执行work3,进程号为4200
liang2开始执行work3,进程号为4908
----all_done----

二、进程池进程之间的通讯

进程池中进程的通讯队列
from multiprocessing import Pool, Manager
q = Manager().Queue()

import os
import time
from multiprocessing import Pool, Manager


def work(name, q):
time.sleep(1)
print(f"{name}:---{os.getpid()}---{q.get()}")

def main():
# 创建一个用于进程池通信的队列
q = Manager().Queue()

for i in range(1000):
q.put(f'data-{i}')

# 创建一个拥有五个进程的进程池
po = Pool(5)
# 往进程池中添加20个任务
for i in range(20):
po.apply_async(work, (f'liang{i}', q))

# close:关闭进程池(进程池停止接收任务)
po.close()
# 主进程等待进程池中的任务结束再往下执行
po.join()

if __name__ == '__main__':
main()

三、多进程+协程实现并发

小练习:

假设一个队列中有100000个URL地址,每个请求需要1秒钟,尝试用4个进程,每个进程中开启1000个协程去请求!统计运行时间

from gevent import monkey
monkey.patch_all(thread=False)
import gevent
import time
from multiprocessing import Process, Queue
import os


def time_count(func):
def wrapper(*args, **kwargs):
start_time = time.time()
func(*args, **kwargs)
end_time = time.time()
print('总耗时:', end_time - start_time)

return wrapper


class Myprocess(Process):
def __init__(self, que):
super().__init__()
self.que = que

#重写进程子类的run函数
def run(self):
cos = []
#开启多协程
for i in range(1000):
#调用工作函数
cor = gevent.spawn(self.work)
cos.append(cor)
gevent.joinall(cos)

#定义工作函数
def work(self):
while self.que.qsize() > 0:
try:
url = self.que.get(timeout=1)
time.sleep(1)
print(f"{os.getpid()}正在请求url:{url}")
except Exception as e:
print(e.__repr__())
break


@time_count
def main():
q = Queue()
for i in range(100000):
q.put(f'https://www.baidu.com--{i}')

process_list = []
for i in range(4):
p = Myprocess(q)
process_list.append(p)
p.start()
for p in process_list:
p.join()

print("任务结束")


if __name__ == '__main__':
main()

运行时间27秒

  • 收藏
  • 评论
  • 分享
  • 举报

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK