Python 2 Multiprocessing Pool: A Source Code Walkthrough
source link: https://perfectnewer.github.io/personal-note/post/python/python2-multiprocessing-pool%E6%BA%90%E7%A0%81%E8%A7%A3%E8%AF%BB/
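After running into the problem where Ctrl-C fails to exit a program that uses a multiprocessing pool, I took the opportunity to walk briefly through the relevant code.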
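Initialization flow
- Set up the queues used to communicate with the child processes
- Set up the queue that holds user tasks
- Start the child processes
- Start the worker-management thread
- Start the task-management thread
- Start the result-handling thread
- Register the cleanup logic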
class Pool(object):
    Process = Process

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        self._state = RUN
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs

        if processes is None:
            try:
                processes = cpu_count()
            except NotImplementedError:
                processes = 1
        if processes < 1:
            raise ValueError("Number of processes must be at least 1")

        if initializer is not None and not hasattr(initializer, '__call__'):
            raise TypeError('initializer must be a callable')

        self._processes = processes
        self._pool = []
        self._repopulate_pool()

        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self, )
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN
        self._worker_handler.start()

        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue,
                  self._pool, self._cache)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._worker_handler, self._task_handler,
                  self._result_handler, self._cache),
            exitpriority=15
            )
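To make the layout above easier to hold in mind, here is a minimal sketch that mirrors only the queue and handler structure. It is not the library's implementation: it uses threads instead of processes, it has no worker-management thread, and `MiniPool`, `MiniResult` and `close_and_join` are hypothetical names invented for this illustration. It is written for Python 3 so it can be run as is.

```python
import itertools
import queue
import threading


class MiniResult:
    """Hypothetical stand-in for ApplyResult: a single slot plus an Event."""

    def __init__(self):
        self._event = threading.Event()
        self._value = None

    def set(self, value):
        self._value = value
        self._event.set()

    def get(self, timeout=None):
        if not self._event.wait(timeout):
            raise TimeoutError("result not ready")
        return self._value


class MiniPool:
    """Toy pool: threads instead of processes, same queue/handler layout."""

    def __init__(self, workers=2):
        self._taskqueue = queue.Queue()  # tasks as submitted by the user
        self._inqueue = queue.Queue()    # tasks on their way to workers
        self._outqueue = queue.Queue()   # results on their way back
        self._cache = {}                 # job id -> MiniResult
        self._jobs = itertools.count()
        self._pool = [self._spawn(self._work) for _ in range(workers)]
        self._task_handler = self._spawn(self._handle_tasks)
        self._result_handler = self._spawn(self._handle_results)

    @staticmethod
    def _spawn(target):
        thread = threading.Thread(target=target, daemon=True)
        thread.start()
        return thread

    def _handle_tasks(self):
        # Forward user tasks to the workers until the None sentinel arrives.
        for task in iter(self._taskqueue.get, None):
            self._inqueue.put(task)
        for _ in self._pool:
            self._inqueue.put(None)      # one stop sentinel per worker

    def _work(self):
        for job, func, args in iter(self._inqueue.get, None):
            self._outqueue.put((job, func(*args)))

    def _handle_results(self):
        # Route each result to the object waiting for it in the cache.
        for job, value in iter(self._outqueue.get, None):
            self._cache.pop(job).set(value)

    def apply_async(self, func, args=()):
        job = next(self._jobs)
        result = self._cache[job] = MiniResult()
        self._taskqueue.put((job, func, args))
        return result

    def close_and_join(self):
        self._taskqueue.put(None)        # stop the task handler
        for thread in [self._task_handler] + self._pool:
            thread.join()
        self._outqueue.put(None)         # every result is queued by now
        self._result_handler.join()


pool = MiniPool(workers=2)
results = [pool.apply_async(pow, (2, n)) for n in range(5)]
values = [r.get(timeout=5) for r in results]
pool.close_and_join()
```

The toy deliberately ignores exceptions raised by the task function, which is exactly the case the real pool has to handle with more care.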
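Worker-management thread: _handle_workers
Its job is to keep the number of processes constant: it reaps child processes that have died and starts new ones.
Note that if the pool has only been closed, the remaining tasks in the pool are still executed, and this thread exits only once every one of them has a result.
Why "has a result" rather than "the worker has exited"? Because if a child process hits an exception that is not caught while running user code (for example KeyboardInterrupt, which the worker's except Exception clause does not catch), the result of that task is never set.
The maintenance thread then never exits until the user calls pool.terminate().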
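def _join_exited_workers(self):
    cleaned = False
    for i in reversed(range(len(self._pool))):
        worker = self._pool[i]
        if worker.exitcode is not None:
            # worker exited
            debug('cleaning up worker %d' % i)
            worker.join()
            cleaned = True
            del self._pool[i]
    return cleaned

def _repopulate_pool(self):
    for i in range(self._processes - len(self._pool)):
        w = self.Process(target=worker,
                         args=(self._inqueue, self._outqueue,
                               self._initializer,
                               self._initargs, self._maxtasksperchild)
                        )
        self._pool.append(w)
        w.name = w.name.replace('Process', 'PoolWorker')
        w.daemon = True
        w.start()
        debug('added worker')

def _maintain_pool(self):
    if self._join_exited_workers():
        self._repopulate_pool()

@staticmethod
def _handle_workers(pool):
    thread = threading.current_thread()

    # Keep maintaining workers until the cache is drained,
    # unless the pool is terminated.
    while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
        pool._maintain_pool()
        time.sleep(0.1)
    # send sentinel to stop the task handler
    pool._taskqueue.put(None)
    debug('worker handler exiting')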
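The exit condition of the `_handle_workers` loop is the part that matters for the hang, so it helps to evaluate it in isolation. The sketch below copies only that boolean expression into a hypothetical helper, `handler_keeps_running`; the state constants use the numeric values from Python 2.7's pool.py.

```python
RUN, CLOSE, TERMINATE = 0, 1, 2  # values used by Python 2.7's pool.py


def handler_keeps_running(state, cache):
    """Mirror of the while-condition in _handle_workers."""
    return state == RUN or (bool(cache) and state != TERMINATE)


# A cache entry stands for an ApplyResult whose value was never set.
pending = {0: "ApplyResult that never received a value"}

closed_with_pending = handler_keeps_running(CLOSE, pending)  # keeps looping
closed_and_drained = handler_keeps_running(CLOSE, {})        # loop ends
terminated = handler_keeps_running(TERMINATE, pending)       # loop ends
```

With the state at CLOSE, only an empty cache ends the loop, which is why a single lost result keeps the thread alive until terminate() is called.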
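Task-management thread: _handle_tasks
It moves the tasks submitted by the user onto the queue the child processes listen on. The core is simple: iterate over the task queue to fetch user tasks, then put each one onto the inqueue. The code is slightly more involved than that because it has to handle, in one place, the task formats produced by apply, map, imap and the other submission functions.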
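@staticmethod
def _handle_tasks(taskqueue, put, outqueue, pool, cache):
    thread = threading.current_thread()

    for taskseq, set_length in iter(taskqueue.get, None):
        task = None
        i = -1
        try:
            for i, task in enumerate(taskseq):
                if thread._state:
                    debug('task handler found thread._state != RUN')
                    break
                try:
                    put(task)
                except Exception as e:
                    job, ind = task[:2]
                    try:
                        cache[job]._set(ind, (False, e))
                    except KeyError:
                        pass
            else:
                if set_length:
                    debug('doing set_length()')
                    set_length(i+1)
                continue
            break
        except Exception as ex:
            job, ind = task[:2] if task else (0, 0)
            if job in cache:
                cache[job]._set(ind + 1, (False, ex))
            if set_length:
                debug('doing set_length()')
                set_length(i+1)
        finally:
            task = taskseq = job = None
    else:
        debug('task handler got sentinel')

    try:
        # tell result handler to finish when cache is empty
        debug('task handler sending sentinel to result handler')
        outqueue.put(None)

        # tell workers there is no more work
        debug('task handler sending sentinel to workers')
        for p in pool:
            put(None)
    except IOError:
        debug('task handler got IOError when sending sentinels')

    debug('task handler exiting')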
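The uniform format is easier to see with the error handling stripped away. The following sketch is a simplified version of the loop, assuming plain in-memory queues; `forward_tasks` and `square` are hypothetical names. Every entry on the task queue is a pair of a task sequence and an optional length callback, and `iter(taskqueue.get, None)` keeps pulling entries until the `None` sentinel shows up.

```python
import queue


def forward_tasks(taskqueue, put):
    """Simplified _handle_tasks loop: no error handling, no state checks."""
    for taskseq, set_length in iter(taskqueue.get, None):
        i = -1
        for i, task in enumerate(taskseq):
            put(task)
        if set_length:
            set_length(i + 1)  # report how many tasks the sequence held


def square(x):
    return x * x


taskqueue = queue.Queue()
forwarded, lengths = [], []

# apply_async style: a one-element list and no length callback.
taskqueue.put(([(0, None, square, (3,), {})], None))
# imap style: a lazy generator plus a length callback.
lazy = ((1, i, square, (x,), {}) for i, x in enumerate([4, 5]))
taskqueue.put((lazy, lengths.append))
taskqueue.put(None)  # sentinel that ends iter(taskqueue.get, None)

forward_tasks(taskqueue, forwarded.append)
```

Because the generator is consumed inside the handler thread, the number of tasks is only known after the last one was sent, hence the callback.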
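Result-handling thread: _handle_results
The logic here is simple: it reads the results produced by the child processes from the outqueue and sets each one on the matching AsyncResult in pool._cache.
The final reads from the outqueue are there to keep the _handle_tasks thread from blocking on a full queue when it sends its sentinel, which would prevent it from exiting.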
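@staticmethod
def _handle_results(outqueue, get, cache):
    thread = threading.current_thread()

    while 1:
        try:
            task = get()
        except (IOError, EOFError):
            debug('result handler got EOFError/IOError -- exiting')
            return

        if thread._state:
            assert thread._state == TERMINATE
            debug('result handler found thread._state=TERMINATE')
            break

        if task is None:
            debug('result handler got sentinel')
            break

        job, i, obj = task
        try:
            cache[job]._set(i, obj)
        except KeyError:
            pass
        task = job = obj = None

    while cache and thread._state != TERMINATE:
        try:
            task = get()
        except (IOError, EOFError):
            debug('result handler got EOFError/IOError -- exiting')
            return

        if task is None:
            debug('result handler ignoring extra sentinel')
            continue
        job, i, obj = task
        try:
            cache[job]._set(i, obj)
        except KeyError:
            pass
        task = job = obj = None

    if hasattr(outqueue, '_reader'):
        debug('ensuring that outqueue is not full')
        # Make room in outqueue so that putting the sentinel cannot block.
        try:
            for i in range(10):
                if not outqueue._reader.poll():
                    break
                get()
        except (IOError, EOFError):
            pass

    debug('result handler exiting: len(cache)=%s, thread._state=%s',
          len(cache), thread._state)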
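The dispatch step can be reduced to a few lines. This is a sketch under simplifying assumptions: `Slot` is a hypothetical, minimal stand-in for ApplyResult, and `dispatch_results` keeps only the first loop of the handler, without the state checks or I/O error handling.

```python
import queue


class Slot:
    """Hypothetical minimal stand-in for ApplyResult."""

    def __init__(self, cache, job):
        self.cache, self.job, self.outcome = cache, job, None
        cache[job] = self            # register so the handler can find us

    def _set(self, i, obj):
        self.outcome = obj
        del self.cache[self.job]     # a finished job leaves the cache


def dispatch_results(get, cache):
    while True:
        task = get()
        if task is None:             # sentinel: no more results
            break
        job, i, obj = task
        try:
            cache[job]._set(i, obj)
        except KeyError:
            pass                     # nobody is waiting for this job


cache = {}
slot = Slot(cache, job=7)
outqueue = queue.Queue()
outqueue.put((7, None, (True, 42)))   # result for a registered job
outqueue.put((99, None, (True, 0)))   # result for an unknown job
outqueue.put(None)

dispatch_results(outqueue.get, cache)
```

The cache shrinking to empty is the signal the other handler threads wait for after close().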
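User API: apply_async
Only apply_async is covered here; the other functions are much the same.
It simply wraps the user's call in the task format, puts it on the task queue, and returns an ApplyResult to the user as the handle for fetching the result.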
def apply_async(self, func, args=(), kwds={}, callback=None):
    '''
    Asynchronous equivalent of `apply()` builtin
    '''
    assert self._state == RUN
    result = ApplyResult(self._cache, callback)
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
    return result
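From the caller's side this looks as follows. To keep the example self-contained and runnable without pickling or a `__main__` guard, it uses `multiprocessing.pool.ThreadPool`, which exposes the same `apply_async` interface on top of threads; that substitution is a choice made for this sketch, and `square` is a hypothetical task function. The code targets Python 3.

```python
from multiprocessing.pool import ThreadPool


def square(x):
    return x * x


collected = []
pool = ThreadPool(2)
try:
    # The callback runs in the result-handling thread, before get() returns.
    result = pool.apply_async(square, (7,), callback=collected.append)
    value = result.get(timeout=10)   # a timeout keeps the wait bounded
finally:
    pool.close()
    pool.join()
```

Since the callback runs inside the result-handling thread, it should return quickly or it delays every other result.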
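worker
The worker code runs in a child process started via Popen. When writing the function (that is, the task), always keep in mind that it runs in a different process, and that this process normally keeps running indefinitely unless the maxtasksperchild parameter is set.
The worker logic is also straightforward:
- Run the user-defined initialization logic.
- Enter the task loop:
  - Get a task from the inqueue.
  - Run the user code.
  - Put the result on the outqueue.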
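def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
    assert maxtasks is None or (type(maxtasks) in (int, long) and maxtasks > 0)
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)
        try:
            put((job, i, result))
        except Exception as e:
            wrapped = MaybeEncodingError(e, result[1])
            debug("Possible encoding error while sending result: %s" % (
                wrapped))
            put((job, i, (False, wrapped)))

        task = job = result = func = args = kwds = None
        completed += 1
    debug('worker exiting after %d tasks' % completed)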
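Two details of the loop are worth trying out in isolation: an exception raised by user code is shipped back as a `(False, exception)` pair instead of killing the worker, and `maxtasks` bounds how many tasks one worker handles. The sketch below is a Python 3 reduction that runs on plain `queue.Queue` objects in the current thread; `worker_loop` is a hypothetical name, and the pipe handling and encoding-error path of the real function are left out.

```python
import queue


def worker_loop(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
    if initializer is not None:
        initializer(*initargs)

    completed = 0
    while maxtasks is None or completed < maxtasks:
        task = inqueue.get()
        if task is None:                 # sentinel: no more work
            break
        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as exc:         # user errors travel back as data
            result = (False, exc)
        outqueue.put((job, i, result))
        completed += 1
    return completed


inqueue, outqueue = queue.Queue(), queue.Queue()
inqueue.put((0, None, int, ("12",), {}))
inqueue.put((1, None, int, ("not a number",), {}))
inqueue.put((2, None, int, ("34",), {}))   # left over: maxtasks is 2

handled = worker_loop(inqueue, outqueue, maxtasks=2)
first = outqueue.get()
second = outqueue.get()
```

In the real pool, the worker-management thread notices that such a worker has exited and starts a replacement that picks up the remaining tasks.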
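ApplyResult
The handle through which the user obtains the result of a task.
Note that calling get or wait without a timeout blocks the process until the result is set, and while it is blocked it cannot respond to signals. This is a design bug in Python 2 that is not going to be fixed. For details see: threading.Condition.wait() is not interruptible in Python 2.7
This is one of the places that easily leaves a process unresponsive to Ctrl-C. There are several ways around it:
- Pass a timeout to get
- Make sure the child processes can exit normally, typically by having them ignore the relevant signals
- Find a suitable place to call pool.terminate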
class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self

    def ready(self):
        return self._ready

    def successful(self):
        assert self._ready
        return self._success

    def wait(self, timeout=None):
        self._cond.acquire()
        try:
            if not self._ready:
                self._cond.wait(timeout)
        finally:
            self._cond.release()

    def get(self, timeout=None):
        self.wait(timeout)
        if not self._ready:
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]

AsyncResult = ApplyResult
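The first two workarounds can be sketched in a few lines. Both helpers below are hypothetical names for this illustration: `get_interruptibly` waits in short slices by passing a timeout to `get`, and `ignore_sigint` is meant to be passed as the pool's `initializer` so that only the parent process reacts to Ctrl-C. `FlakyResult` is a stub used only to exercise the loop without starting a pool.

```python
import multiprocessing
import signal


def get_interruptibly(async_result, poll_seconds=0.5):
    """Wait in short slices so a pending KeyboardInterrupt can surface."""
    while True:
        try:
            return async_result.get(timeout=poll_seconds)
        except multiprocessing.TimeoutError:
            continue


def ignore_sigint():
    """Pool initializer: leave Ctrl-C handling to the parent process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


class FlakyResult:
    """Hypothetical stub that times out a few times before answering."""

    def __init__(self, value, misses):
        self.value, self.misses, self.calls = value, misses, 0

    def get(self, timeout=None):
        self.calls += 1
        if self.calls <= self.misses:
            raise multiprocessing.TimeoutError
        return self.value


stub = FlakyResult("done", misses=2)
answer = get_interruptibly(stub, poll_seconds=0.01)
```

With a real pool the pieces would be combined along the lines of `Pool(4, initializer=ignore_sigint)` in the parent, followed by `pool.terminate()` in an `except KeyboardInterrupt` block around the waiting code.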
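pool.close
close simply sets the state of the pool to CLOSE. From then on the pool accepts no new tasks, while the tasks already submitted continue to be executed.
Calling join after close also ends up in condition.wait, which again blocks the process so that it cannot handle signals.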
def close(self):
    debug('closing pool')
    if self._state == RUN:
        self._state = CLOSE
        self._worker_handler._state = CLOSE
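The effect of the state change can be observed from the outside. The sketch below again assumes `multiprocessing.pool.ThreadPool` on Python 3 as a stand-in with the same interface, and `double` is a hypothetical task function. Tasks submitted before `close()` still complete, while a submission afterwards is rejected.

```python
from multiprocessing.pool import ThreadPool


def double(x):
    return 2 * x


pool = ThreadPool(2)
pending = [pool.apply_async(double, (n,)) for n in range(3)]
pool.close()

try:
    pool.apply_async(double, (99,))
    rejected = False
except (AssertionError, ValueError):
    # Python 2 asserts on the state; Python 3 raises ValueError instead.
    rejected = True

pool.join()
finished = [r.get(timeout=10) for r in pending]
```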
pool.terminate
Sets the state and calls _terminate.
def terminate(self):
    debug('terminating pool')
    self._state = TERMINATE
    self._worker_handler._state = TERMINATE
    self._terminate()
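_terminate is a callable Finalize object. It is tied to the pool object through a weakref, and it runs pool._terminate_pool either when that object is destroyed or when the Finalize object itself is called.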
self._terminate = Finalize(
    self, self._terminate_pool,
    args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
          self._worker_handler, self._task_handler,
          self._result_handler, self._cache),
    exitpriority=15
    )
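The standard library's `weakref.finalize` (Python 3.4+) follows the same pattern and is convenient for experimenting with it. It is used here as an analogue, not as what the pool itself uses; `Resource` is a hypothetical class playing the role of the Pool instance.

```python
import weakref


class Resource:
    """Hypothetical owner object; plays the role of the Pool instance."""


log = []
owner = Resource()

# The callback and its arguments must not hold a strong reference to owner,
# otherwise owner could never be garbage collected.
cleanup = weakref.finalize(owner, log.append, "cleaned up")

cleanup()                   # explicit call, like terminate() calling _terminate()
cleanup()                   # a second call does nothing
still_armed = cleanup.alive
```

This is presumably also why the pool passes its queues and threads as separate arguments to a classmethod instead of handing over a bound method of the instance.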
_terminate_pool:
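@classmethod
def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                    worker_handler, task_handler, result_handler, cache):
    # this is guaranteed to only be called once
    debug('finalizing pool')

    worker_handler._state = TERMINATE
    task_handler._state = TERMINATE

    debug('helping task handler/workers to finish')
    cls._help_stuff_finish(inqueue, task_handler, len(pool))

    assert result_handler.is_alive() or len(cache) == 0

    result_handler._state = TERMINATE
    outqueue.put(None)                  # sentinel

    # Wait for the worker handler to exit first, so that workers
    # are not restarted while they are being terminated.
    debug('joining worker handler')
    if threading.current_thread() is not worker_handler:
        worker_handler.join(1e100)

    # Terminate workers which haven't already finished.
    if pool and hasattr(pool[0], 'terminate'):
        debug('terminating workers')
        for p in pool:
            if p.exitcode is None:
                p.terminate()

    debug('joining task handler')
    if threading.current_thread() is not task_handler:
        task_handler.join(1e100)

    debug('joining result handler')
    if threading.current_thread() is not result_handler:
        result_handler.join(1e100)

    if pool and hasattr(pool[0], 'terminate'):
        debug('joining pool workers')
        for p in pool:
            if p.is_alive():
                # worker has not yet exited
                debug('cleaning up worker %d' % p.pid)
                p.join()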
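pool.join
The logic of this method is blunt: it joins each handler thread in turn and then every child process in the pool.
Note that these joins have no timeout, so they block signal handling.
This is another reason a process may fail to exit after Ctrl-C.
If close was called and the child processes do not handle exceptions correctly, so that the results of some tasks are never set on their ApplyResult, pool._cache never becomes empty and the worker handler never exits. The main process then blocks forever in self._worker_handler.join().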
def join(self):
    debug('joining pool')
    assert self._state in (CLOSE, TERMINATE)
    debug('joining worker handler')
    self._worker_handler.join()
    debug("joining task handler")
    self._task_handler.join()
    debug("joining result handler")
    self._result_handler.join()
    debug("joining pools")
    for p in self._pool:
        p.join()
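One way to keep the waiting side responsive is to join in short slices instead of joining without a timeout. This is a generic sketch, not something the pool offers: `join_interruptibly` is a hypothetical helper, and to apply it to a pool one would have to reach into private attributes such as `_worker_handler`, which is an assumption about internals that may change between versions.

```python
import threading
import time


def join_interruptibly(thread, poll_seconds=0.1):
    """Join in short slices so the main thread regains control regularly."""
    while thread.is_alive():
        thread.join(poll_seconds)


background = threading.Thread(target=time.sleep, args=(0.3,))
background.start()
join_interruptibly(background)
still_running = background.is_alive()
```

A KeyboardInterrupt raised between two slices can then be caught by the caller, who can react with pool.terminate().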