2

python 包之 multiprocessing 多进程教程

 2 years ago
source link: https://blog.51cto.com/autofelix/5166197
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

python 包之 multiprocessing 多进程教程

原创

一、创建一个进程

  • 实例化 Process 类创建一个进程对象

  • 然后调用它的 start 方法即可生成一个子进程

from multiprocessing import Process

def func(s):
print(s)

if __name__ == '__main__':
p = Process(target=func, args=('autofelix', ))
p.start()
p.join()

二、创建多个进程

from multiprocessing import Process

def func(s):
print(s)

if __name__ == '__main__':
process = [
Process(target=func, args=('1', ))
Process(target=func, args=('2', ))
]

[p.start() for p in process]
[p.join() for p in process]

三、管道pipe进行进程间通信

  • Pipe(duplex=True):表示双工通信,也就是双向的,既可以接受也可以发送数据,默认为True

  • Pipe(duplex=False):表示单工通信,也就是单向的,只能进行接受或者发送数据

from multiprocessing import Process, Pipe

def func(conn):
print('send a list object ot other side...')
# 从管道对象的一端发送数据对象
conn.send(['33', 44, None])
conn.close()

if __name__ == '__main__':
# 默认创建一个双工管道对象,返回的两个对象代表管道的两端,
# 双工表示两端的对象都可以发送和接收数据,但是需要注意,
# 需要避免多个进程或线程从一端同时读或写数据
parent_conn, child_conn = Pipe()
p = Process(target=func, args=(child_conn, ))
p.start()
# 从管道的另一端接收数据对象
print(parent_conn.recv())
p.join()

四、队列Queue进行进程间通信

  • 当向队列中放入的数据较大时,就会在join()处卡死

  • 为了避免这种情况,常的做法是先使用get()将数据取出来,再使用join()方法

  • 如果不这样处理,队列进程将不能正常终止,造成死锁情况

from multiprocessing import Process, Queue

def func(q):
print('put a list object to queue...')
# 向Queue对象中添加一个对象
q.put(['33', 44, None])

if __name__ == '__main__':
# 创建一个队列
q = Queue()
p = Process(target=func, args=(q, ))
p.start()
# 从Queue对象中获取一个对象
print(q.get())
p.join()

五、进程间同步

  • 使用锁保证进程间的同步操作

from multiprocessing import Process, Lock

def func(lc, num):
# 使用锁保证以下代码同一时间只有一个进程在执行
lc.acquire()
print('process num: ', num)
lc.release()

if __name__ == '__main__':
lock = Lock()
for i in range(5):
Process(target=func, args=(lock, i)).start()

六、进程间共享数据

  • 使用共享内存的方式,共享值Value对象和数据Array对象

from multiprocessing import Process, Value, Array

def func(n, a):
n.value = 3.333
for i in range(len(a)):
a[i] = -a[i]

if __name__ == '__main__':
# 第一个参数d表示数据类型'double'双精度浮点类型
num = Value('d', 0.0)
# 第一个参数i表示数据类型'integer'整型
arr = Array('i', range(6))
p = Process(target=func, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])

七、进程池

  • 创建一个 Pool 进程池对象,并执行提交给它的任务

  • 进程池对象允许其中的进程以不同的方式运行

  • 但是需要注意,Pool 对象的方法只能是创建它的进程才能调用

from multiprocessing import Pool
import time

def f(x):
return x * x

if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
# 在进程池中开启一个新的进程并执行 f 函数
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow

# map会一直阻塞当前进程直到运行完可迭代对象中的所有元素,并返回结果。
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"

# imap是map方法的延迟执行版本,对于比较消耗内存的迭代,建议使用这个方法,
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow

result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
  • 1
  • 1收藏
  • 评论
  • 分享
  • 举报

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK