
Python 2 Multiprocessing Pool Source Code Walkthrough

Source: https://perfectnewer.github.io/personal-note/post/python/python2-multiprocessing-pool%E6%BA%90%E7%A0%81%E8%A7%A3%E8%AF%BB/

Having been bitten by the problem that ctrl-c cannot exit a multiprocessing pool, I'm taking the opportunity to walk through the relevant code.

Structure overview

Logical architecture diagram (figure omitted)

Initialization flow

  1. Initialize the queues used to communicate with the child processes
  2. Initialize the queue that holds the user's tasks
  3. Spawn the child worker processes
  4. Start the worker-management thread
  5. Start the task-management thread
  6. Start the result-management thread
  7. Register the cleanup logic

class Pool


class Pool(object):
    '''
    Class which supports an async version of the `apply()` builtin
    '''
    Process = Process

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None):
        # queues used to talk to the worker processes
        self._setup_queues()
        # queue holding the tasks submitted by the user
        self._taskqueue = Queue.Queue()
        # maps job id -> ApplyResult (or IMapIterator) awaiting a result
        self._cache = {}
        # pool state
        self._state = RUN
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs

        if processes is None:
            try:
                processes = cpu_count()
            except NotImplementedError:
                processes = 1
        if processes < 1:
            raise ValueError("Number of processes must be at least 1")

        if initializer is not None and not hasattr(initializer, '__call__'):
            raise TypeError('initializer must be a callable')

        # spawn the child worker processes
        self._processes = processes
        self._pool = []
        self._repopulate_pool()

        # thread that keeps the pool of workers populated
        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self, )
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN
        self._worker_handler.start()

        # thread that moves user tasks from _taskqueue to the workers' inqueue
        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue,
                  self._pool, self._cache)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN
        self._task_handler.start()

        # thread that delivers results from the outqueue into _cache
        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

        # cleanup logic, run when the pool is collected or at interpreter exit
        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._worker_handler, self._task_handler,
                  self._result_handler, self._cache),
            exitpriority=15
            )
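To make the constructor parameters concrete, here is a minimal usage sketch (the names init_worker and square are made up for illustration):

    import multiprocessing

    def init_worker():
        # runs once inside every child process, right after it starts
        print 'initialized %s' % multiprocessing.current_process().name

    def square(x):
        return x * x

    if __name__ == '__main__':
        # 4 workers; each child is recycled after finishing 100 tasks
        pool = multiprocessing.Pool(processes=4, initializer=init_worker,
                                    maxtasksperchild=100)
        print pool.map(square, range(10))
        pool.close()
        pool.join()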

Worker-management thread logic: _handle_workers

Its job is to keep the number of worker processes constant: it reaps dead child processes and starts new ones to replace them.

Note that if the pool has merely been closed, the tasks still remaining in the pool will keep executing, and this thread only exits once all of them have produced results.

Why say "produced results" rather than "the workers exited"? Because if a child process hits an exception while running user code that cannot be caught (for example, the process dying outright, or an exception that does not derive from Exception), the result of that user task can never be set.

When that happens, this maintenance thread can never exit until the user calls pool.terminate()
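A contrived repro of the stuck state (assuming a POSIX system; os._exit kills the worker before any result is put on the outqueue, so the entry in pool._cache is never completed):

    import os
    import multiprocessing

    def bad_task():
        # the worker process dies here; no result is ever sent back
        os._exit(1)

    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes=1)
        res = pool.apply_async(bad_task)
        pool.close()
        # hangs forever: _handle_workers keeps waiting for pool._cache to drain
        pool.join()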

_handle_workers

    
    def _join_exited_workers(self):
        """Cleanup after any worker processes which have exited due to
        reaching their specified lifetime.  Returns True if any workers were
        cleaned up.
        """
        cleaned = False
        for i in reversed(range(len(self._pool))):
            worker = self._pool[i]
            if worker.exitcode is not None:
                # worker exited
                debug('cleaning up worker %d' % i)
                worker.join()
                cleaned = True
                del self._pool[i]
        return cleaned

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        for i in range(self._processes - len(self._pool)):
            w = self.Process(target=worker,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild)
                            )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')

    def _maintain_pool(self):
        """Clean up any exited workers and start replacements for them."""
        if self._join_exited_workers():
            self._repopulate_pool()

    @staticmethod
    def _handle_workers(pool):
        thread = threading.current_thread()

        # Keep maintaining workers until the cache gets drained, unless the
        # pool is terminated.
        while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
            pool._maintain_pool()
            time.sleep(0.1)
        # send sentinel to stop workers
        pool._taskqueue.put(None)
        debug('worker handler exiting')

Task-management thread: _handle_tasks

It moves the tasks the user has submitted into the queue the child processes listen on: the core is iterating over the task queue to fetch user tasks and then putting them onto the workers' inqueue (the put argument is bound to self._quick_put). The code is slightly involved because it has to uniformly handle the task format in which apply, map, imap, and the rest enqueue work; that format is illustrated after the code below.

_handle_tasks

    
    @staticmethod
    def _handle_tasks(taskqueue, put, outqueue, pool, cache):
        thread = threading.current_thread()

        # each entry on taskqueue is a (sequence_of_tasks, set_length) pair
        for taskseq, set_length in iter(taskqueue.get, None):
            task = None
            i = -1
            try:
                for i, task in enumerate(taskseq):
                    if thread._state:
                        debug('task handler found thread._state != RUN')
                        break
                    try:
                        put(task)
                    except Exception as e:
                        job, ind = task[:2]
                        try:
                            cache[job]._set(ind, (False, e))
                        except KeyError:
                            pass
                else:
                    if set_length:
                        debug('doing set_length()')
                        set_length(i+1)
                    continue
                break
            except Exception as ex:
                job, ind = task[:2] if task else (0, 0)
                if job in cache:
                    cache[job]._set(ind + 1, (False, ex))
                if set_length:
                    debug('doing set_length()')
                    set_length(i+1)
            finally:
                task = taskseq = job = None
        else:
            debug('task handler got sentinel')

        try:
            # tell result handler to finish when cache is empty
            debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except IOError:
            debug('task handler got IOError when sending sentinels')

        debug('task handler exiting')
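For reference, every entry the producer functions put on taskqueue is a (taskseq, set_length) pair. Roughly, based on the Python 2.7 source (mapstar is the internal chunk runner):

    # apply_async: a one-element task list, no length callback
    #   taskqueue.put(([(job, None, func, args, kwds)], None))
    #
    # map_async: one task per chunk of the input, still no length callback
    #   taskqueue.put((((job, i, mapstar, (chunk,), {})
    #                   for i, chunk in enumerate(task_batches)), None))
    #
    # imap: a task generator plus IMapIterator._set_length, so the iterator
    #   learns how many results to expect once the generator is exhausted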

Result-handling thread: _handle_results

The logic here is simple: it just fetches the child processes' results from the outqueue and sets each one on the corresponding AsyncResult in pool._cache.

The reads from the outqueue at the end exist to make room in the pipe, so that the _handle_tasks thread cannot block on its final sentinel put and fail to exit.

_handle_results


    
    @staticmethod
    def _handle_results(outqueue, get, cache):
        thread = threading.current_thread()

        while 1:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if thread._state:
                assert thread._state == TERMINATE
                debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass
            task = job = obj = None

        # drain the remaining results unless the pool was terminated
        while cache and thread._state != TERMINATE:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if task is None:
                debug('result handler ignoring extra sentinel')
                continue
            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass
            task = job = obj = None

        if hasattr(outqueue, '_reader'):
            debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block.  There is guaranteed to be no more than 2 sentinels.
            try:
                for i in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (IOError, EOFError):
                pass

        debug('result handler exiting: len(cache)=%s, thread._state=%s',
              len(cache), thread._state)
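One practical consequence: since cache[job]._set(i, obj) runs here, and _set is what invokes the user callback (see ApplyResult._set below), callbacks passed to apply_async/map_async execute on this result-handler thread in the parent process, never in a worker. A quick sketch to check:

    import threading
    import multiprocessing

    def task(x):
        return x + 1

    def on_done(value):
        # prints something like: callback got 2 on Thread-3
        print 'callback got %s on %s' % (value, threading.current_thread().name)

    if __name__ == '__main__':
        pool = multiprocessing.Pool(2)
        pool.apply_async(task, (1,), callback=on_done).wait()
        pool.close()
        pool.join()

This also means a slow callback stalls delivery of every later result.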

User API: apply_async

We'll look only at apply_async here; the other functions are all much alike.

This function simply wraps the user's task into the task format and puts it on the task queue, then returns an ApplyResult to the user as the bridge for fetching the result.

    def apply_async(self, func, args=(), kwds={}, callback=None):
        '''
        Asynchronous equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        result = ApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result

worker

The worker code is run in a child process by way of Popen. So when writing our function (that is, the task), we must keep in mind that the code executes in another process, and that under normal conditions this process keeps running indefinitely, unless the maxtasksperchild parameter is set (see the sketch after the list below).

The worker logic is also clear:

  1. Run the user-defined initialization logic.
  2. Enter the task loop.
  3. Fetch a task from the inqueue.
  4. Execute the user's code.
  5. Put the result onto the outqueue.
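The recycling driven by maxtasksperchild is easy to observe (a sketch: with one worker limited to two tasks, the reported pid changes every two results, because _handle_workers reaps the exited child and _repopulate_pool starts a fresh one):

    import os
    import multiprocessing

    def show_pid(i):
        return i, os.getpid()

    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes=1, maxtasksperchild=2)
        results = [pool.apply_async(show_pid, (i,)) for i in range(6)]
        for r in results:
            i, pid = r.get()
            print 'task %d ran in pid %d' % (i, pid)
        pool.close()
        pool.join()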

worker

def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
    assert maxtasks is None or (type(maxtasks) in (int, long) and maxtasks > 0)
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    # run the user-supplied initialization logic, if any
    if initializer is not None:
        initializer(*initargs)

    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            # run the user code
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)
        try:
            put((job, i, result))
        except Exception as e:
            wrapped = MaybeEncodingError(e, result[1])
            debug("Possible encoding error while sending result: %s" % (
                wrapped))
            put((job, i, (False, wrapped)))

        task = job = result = func = args = kwds = None
        completed += 1
    debug('worker exiting after %d tasks' % completed)

ApplyResult

The bridge through which the user fetches the execution result.

Note that if get or wait is called without a timeout, the process blocks until the result is set, and while blocked it cannot respond to signals. This is a design bug in Python 2 that will not be fixed. Details: threading.Condition.wait() is not interruptible in Python 2.7

This is one of the spots that commonly leaves a process unresponsive to ctrl-c. There are a few workarounds (a sketch follows the list):

  1. Pass a timeout to get
  2. Make sure the child processes can exit normally, usually by having them ignore the relevant signals
  3. Find an appropriate point to call pool.terminate
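A common shape for workaround 1: loop over get with a timeout, so the main thread keeps returning from Condition.wait and can see KeyboardInterrupt (a sketch; the 1-second timeout is arbitrary, and it falls back to workaround 3 on ctrl-c):

    import time
    import multiprocessing
    from multiprocessing import TimeoutError

    def slow(x):
        time.sleep(60)
        return x

    if __name__ == '__main__':
        pool = multiprocessing.Pool(2)
        res = pool.apply_async(slow, (1,))
        try:
            while True:
                try:
                    print res.get(timeout=1)  # wakes up every second
                    break
                except TimeoutError:
                    pass                      # not ready yet; wait again
        except KeyboardInterrupt:
            pool.terminate()                  # drop tasks, kill workers
            pool.join()
            raise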

class ApplyResult

class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        # register ourselves so the result handler can find us by job id
        cache[self._job] = self

    def ready(self):
        return self._ready

    def successful(self):
        assert self._ready
        return self._success

    def wait(self, timeout=None):
        self._cond.acquire()
        try:
            if not self._ready:
                self._cond.wait(timeout)
        finally:
            self._cond.release()

    def get(self, timeout=None):
        self.wait(timeout)
        if not self._ready:
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        # called by the result handler thread
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]

AsyncResult = ApplyResult       # create alias -- see #17805

pool.close

close simply sets the pool's state to CLOSE. From then on the pool accepts no new tasks, but the tasks already queued will still be executed.

After calling close, a subsequent join also ends up in condition.wait, which again blocks the process and prevents it from handling signals.

    def close(self):
        debug('closing pool')
        if self._state == RUN:
            self._state = CLOSE
            self._worker_handler._state = CLOSE
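To contrast close with terminate (a sketch; do_work is a placeholder, and note that the unconditional join below is exactly where ctrl-c can get stuck, as discussed above):

    import multiprocessing

    def do_work(x):
        return x * 2

    if __name__ == '__main__':
        pool = multiprocessing.Pool(4)
        results = [pool.apply_async(do_work, (i,)) for i in range(100)]

        pool.close()        # CLOSE: no new tasks, queued tasks still run
        pool.join()         # blocks until every queued task has a result

        # versus:
        # pool.terminate()  # TERMINATE: pending tasks dropped, workers killed
        # pool.join()       # returns quickly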

pool.terminate

It sets the state, then calls _terminate:

    def terminate(self):
        debug('terminating pool')
        self._state = TERMINATE  
        self._worker_handler._state = TERMINATE
        self._terminate()

_terminate is a callable Finalize object. It is bound to the pool object through a weakref, and it runs pool._terminate_pool when the pool is garbage-collected or when the Finalize object is called explicitly.

        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._worker_handler, self._task_handler,
                  self._result_handler, self._cache),
            exitpriority=15
            )
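Finalize comes from multiprocessing.util and can be watched in isolation; a minimal sketch of the weakref behavior (illustrative only):

    from multiprocessing.util import Finalize

    class Resource(object):
        pass

    def cleanup(name):
        print 'cleaning up %s' % name

    r = Resource()
    # fires when r is garbage-collected, when the finalizer is called
    # directly, or at interpreter exit (ordered by exitpriority)
    finalizer = Finalize(r, cleanup, args=('demo',), exitpriority=10)
    del r   # refcount hits zero -> weakref dies -> prints 'cleaning up demo'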

The actual cleanup happens in _terminate_pool:

_terminate_pool

    
    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                        worker_handler, task_handler, result_handler, cache):
        # this is guaranteed to only be called once
        debug('finalizing pool')

        worker_handler._state = TERMINATE
        task_handler._state = TERMINATE

        debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        assert result_handler.is_alive() or len(cache) == 0

        result_handler._state = TERMINATE
        outqueue.put(None)                  # sentinel

        # We must wait for the worker handler to exit before terminating
        # workers because we don't want workers to be restarted behind our back.
        debug('joining worker handler')
        if threading.current_thread() is not worker_handler:
            worker_handler.join(1e100)

        # Terminate workers which haven't already finished.
        if pool and hasattr(pool[0], 'terminate'):
            debug('terminating workers')
            for p in pool:
                if p.exitcode is None:
                    p.terminate()

        debug('joining task handler')
        if threading.current_thread() is not task_handler:
            task_handler.join(1e100)

        debug('joining result handler')
        if threading.current_thread() is not result_handler:
            result_handler.join(1e100)

        if pool and hasattr(pool[0], 'terminate'):
            debug('joining pool workers')
            for p in pool:
                if p.is_alive():
                    # worker has not yet exited
                    debug('cleaning up worker %d' % p.pid)
                    p.join()

pool.join

The logic of this method is simple and blunt: it joins each handler thread and then every child process in the pool, one after another.

Note that these joins carry no timeout, so they block signal handling.

This is another reason a process may fail to exit after ctrl-c.

If close was called at this point and the child processes cannot handle exceptions properly, so that some tasks' results are never set on their ApplyResult, then pool._cache never becomes empty, the worker handler can never exit, and the main process blocks forever in self._worker_handler.join()

    def join(self):
        debug('joining pool')
        assert self._state in (CLOSE, TERMINATE)
        debug('joining worker handler')
        self._worker_handler.join()
        debug("joining task handler")
        self._task_handler.join()
        debug("joiningg result handler")
        self._result_handler.join()
        debug("joining pools")
        for p in self._pool:
            p.join()
