3 \e@s8ddgZddlZddlZddlZddlZddlZddlZddlZddlm Z ddlm Z m Z dZ dZ dZejZdd Zd d ZGd d d eZGdddZddZGdddeZdfddfddZddZGdddeZGdddeZeZGdddeZGdddeZGd d!d!eZ Gd"ddeZ!dS)#Pool ThreadPoolN)util) get_context TimeoutErrorcCs tt|S)N)listmap)argsr /usr/lib64/python3.6/pool.pymapstar+srcCsttj|d|dS)Nrr)r itertoolsstarmap)r r r r starmapstar.src@seZdZddZddZdS)RemoteTracebackcCs ||_dS)N)tb)selfrr r r __init__6szRemoteTraceback.__init__cCs|jS)N)r)rr r r __str__8szRemoteTraceback.__str__N)__name__ __module__ __qualname__rrr r r r r5src@seZdZddZddZdS)ExceptionWithTracebackcCs0tjt|||}dj|}||_d||_dS)Nz """ %s""") tracebackformat_exceptiontypejoinexcr)rr rr r r r<s zExceptionWithTraceback.__init__cCst|j|jffS)N) rebuild_excr r)rr r r __reduce__Asz!ExceptionWithTraceback.__reduce__N)rrrrr"r r r r r;srcCst||_|S)N)r __cause__)r rr r r r!Ds r!cs0eZdZdZfddZddZddZZS)MaybeEncodingErrorzVWraps possible unpickleable errors, so they can be safely sent through the socket.cs.t||_t||_tt|j|j|jdS)N)reprr valuesuperr$r)rr r&) __class__r r rPs  zMaybeEncodingError.__init__cCsd|j|jfS)Nz(Error sending result: '%s'. Reason: '%s')r&r )rr r r rUszMaybeEncodingError.__str__cCsd|jj|fS)Nz<%s: %s>)r(r)rr r r __repr__YszMaybeEncodingError.__repr__)rrr__doc__rrr) __classcell__r r )r(r r$Ls r$Fc'Cs|j}|j}t|dr*|jj|jj|dk r:||d}xH|dksX|oT||kry |} Wn$ttfk rtj dPYnX| dkrtj dP| \} } } } }yd| | |f}WnBt k r}z&|r| t k rt ||j }d|f}WYdd}~XnXy|| | |fWnRt k rd}z4t||d}tj d||| | d|ffWYdd}~XnXd} } }} } }|d7}qBWtj d |dS) N_writerrz)worker got EOFError or OSError -- exitingzworker got sentinel -- exitingTFrz0Possible encoding error while sending result: %szworker exiting after %d tasks)putgethasattrr,close_readerEOFErrorOSErrorrdebug Exception_helper_reraises_exceptionr __traceback__r$)inqueueoutqueue initializerinitargsZmaxtasksZwrap_exceptionr-r.Z completedtaskjobifuncr kwdsresultewrappedr r r worker]sD        $ rDcCs|dS)z@Pickle-able helper function for use by _guarded_task_generation.Nr )Zexr r r r6sr6c@s6eZdZdZdZddZddfddfddZdd Zd d Zd d Z ddZ fifddZ d=ddZ d>ddZ d?ddZddZd@ddZdAddZfiddfdd ZdBd!d"ZdCd#d$Zed%d&Zed'd(Zed)d*Zed+d,Zd-d.Zd/d0Zd1d2Zd3d4Zed5d6Zed7d8Z d9d:Z!d;d<Z"dS)DrzS Class which supports an async version of applying functions to arguments. TcOs|jj||S)N)_ctxProcess)rr r@r r r rFsz Pool.ProcessNc Csn|pt|_|jtj|_i|_t|_||_ ||_ ||_ |dkrPt j pNd}|dkr`td|dk rzt| rztd||_g|_|jtjtj|fd|_d|j_t|j_|jjtjtj|j|j|j|j|jfd|_d|j_t|j_|jjtjtj|j|j |jfd|_!d|j!_t|j!_|j!jt"j#||j$|j|j%|j|j|j|j|j!|jfdd|_&dS)Nrz&Number of processes must be at least 1zinitializer must be a callable)targetr T)r Z exitpriority)'rrE _setup_queuesqueueQueue _taskqueue_cacheRUN_state_maxtasksperchild _initializer _initargsos cpu_count ValueErrorcallable TypeError _processes_pool_repopulate_pool threadingZThreadr_handle_workers_worker_handlerdaemonstart _handle_tasks _quick_put _outqueue _task_handler_handle_results _quick_get_result_handlerrZFinalize_terminate_pool_inqueue _terminate)r processesr:r;Zmaxtasksperchildcontextr r r rsT         z Pool.__init__cCsZd}xPttt|jD]:}|j|}|jdk rtjd||jd}|j|=qW|S)zCleanup after any worker processes which have exited due to reaching their specified lifetime. Returns True if any workers were cleaned up. FNzcleaning up worker %dT)reversedrangelenrYexitcoderr4r)rZcleanedr>rDr r r _join_exited_workerss   zPool._join_exited_workersc Cs|xvt|jt|jD]^}|jt|j|j|j|j |j |j fd}|jj ||j jdd|_ d|_|jtjdqWdS)zBring the number of pool processes up to the specified number, for use after reaping workers which have exited. )rGr rFZ PoolWorkerTz added workerN)rmrXrnrYrFrDrhrbrQrRrP_wrap_exceptionappendnamereplacer^r_rr4)rr>wr r r rZs  zPool._repopulate_poolcCs|jr|jdS)zEClean up any exited workers and start replacements for them. N)rprZ)rr r r _maintain_poolszPool._maintain_poolcCs4|jj|_|jj|_|jjj|_|jjj|_ dS)N) rEZ SimpleQueuerhrbr,sendrar1recvre)rr r r rIs   zPool._setup_queuescCs|j|||jS)z6 Equivalent of `func(*args, **kwds)`. ) apply_asyncr.)rr?r r@r r r applysz Pool.applycCs|j||t|jS)zx Apply `func` to each element in `iterable`, collecting the results in a list that is returned. ) _map_asyncrr.)rr?iterable chunksizer r r r szPool.mapcCs|j||t|jS)z Like `map()` method but the elements of the `iterable` are expected to be iterables as well and will be unpacked as arguments. Hence `func` and (a, b) becomes func(a, b). )r{rr.)rr?r|r}r r r r sz Pool.starmapcCs|j||t|||S)z= Asynchronous version of `starmap()` method. )r{r)rr?r|r}callbackerror_callbackr r r starmap_asyncs zPool.starmap_asyncccsny0d}x&t|D]\}}||||fifVqWWn8tk rh}z||dt|fifVWYdd}~XnXdS)zProvides a generator of tasks for imap and imap_unordered with appropriate handling for iterables which throw exceptions during iteration.rN) enumerater5r6)rZ result_jobr?r|r>xrBr r r _guarded_task_generations zPool._guarded_task_generationrcCs|jtkrtd|dkrFt|j}|jj|j|j|||j f|St j |||}t|j}|jj|j|jt ||j fdd|DSdS)zP Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. zPool not runningrcss|]}|D] }|Vq qdS)Nr ).0chunkitemr r r @szPool.imap..N) rOrNrU IMapIteratorrMrLr-r_job _set_lengthr _get_tasksr)rr?r|r}rA task_batchesr r r imap's      z Pool.imapcCs|jtkrtd|dkrFt|j}|jj|j|j|||j f|St j |||}t|j}|jj|j|jt ||j fdd|DSdS)zL Like `imap()` method but ordering of results is arbitrary. zPool not runningrcss|]}|D] }|Vq qdS)Nr )rrrr r r r[sz&Pool.imap_unordered..N) rOrNrUIMapUnorderedIteratorrMrLr-rrrrrr)rr?r|r}rArr r r imap_unorderedBs      zPool.imap_unorderedcCsB|jtkrtdt|j||}|jj|jd|||fgdf|S)z; Asynchronous version of `apply()` method. zPool not runningrN)rOrNrU ApplyResultrMrLr-r)rr?r r@r~rrAr r r ry]s  zPool.apply_asynccCs|j||t|||S)z9 Asynchronous version of `map()` method. )r{r)rr?r|r}r~rr r r map_asynchszPool.map_asyncc Cs|jtkrtdt|ds$t|}|dkrTtt|t|jd\}}|rT|d7}t|dkrdd}tj |||}t |j |t|||d} |j j |j| j||df| S)zY Helper function to implement map, starmap and their async counterparts. zPool not running__len__Nrr)r)rOrNrUr/r divmodrnrYrr MapResultrMrLr-rr) rr?r|Zmapperr}r~rZextrarrAr r r r{ps&   zPool._map_asynccCsTtj}x0|jtks$|jr8|jtkr8|jtjdq W|j j dt j ddS)Ng?zworker handler exiting) r[current_threadrOrNrM TERMINATErvtimesleeprLr-rr4)poolthreadr r r r\s  zPool._handle_workersc Csjtj}xt|jdD]\}}d}zx|D]}|jr@tjdPy ||Wq*tk r} zD|dd\} } y|| j| d| fWnt k rYnXWYdd} ~ Xq*Xq*W|rtjd|r|dnd } || dwPWdd}}} XqWtjdy:tjd|j dtjdx|D]} |dq&WWn t k rZtjd YnXtjd dS) Nz'task handler found thread._state != RUNrFzdoing set_length()rztask handler got sentinelz/task handler sending sentinel to result handlerz(task handler sending sentinel to workersz/task handler got OSError when sending sentinelsztask handler exitingr) r[riterr.rOrr4r5_setKeyErrorr-r3) taskqueuer-r9rcacherZtaskseqZ set_lengthr<rBr=idxpr r r r`sB          zPool._handle_taskscCstj}xy |}Wn"ttfk r6tjddSX|jrJtjdP|dkr^tjdP|\}}}y||j||Wntk rYnXd}}}q Wx|o|jt kr:y |}Wn"ttfk rtjddSX|dkrtjdq|\}}}y||j||Wntk r*YnXd}}}qWt |drtjdy,x&t dD]}|j j spP|q^WWnttfk rYnXtjdt||jdS) Nz.result handler got EOFError/OSError -- exitingz,result handler found thread._state=TERMINATEzresult handler got sentinelz&result handler ignoring extra sentinelr1z"ensuring that outqueue is not full z7result handler exiting: len(cache)=%s, thread._state=%s)r[rr3r2rr4rOrrrr/rmr1pollrn)r9r.rrr<r=r>objr r r rdsZ            zPool._handle_resultsccs4t|}x&ttj||}|s"dS||fVq WdS)N)rtuplerislice)r?itsizerr r r rs zPool._get_taskscCs tddS)Nz:pool objects cannot be passed between processes or pickled)NotImplementedError)rr r r r"szPool.__reduce__cCs&tjd|jtkr"t|_t|j_dS)Nz closing pool)rr4rOrNCLOSEr])rr r r r0s  z Pool.closecCs$tjdt|_t|j_|jdS)Nzterminating pool)rr4rrOr]ri)rr r r terminates zPool.terminatecCsDtjd|jj|jj|jjx|jD] }|jq0WdS)Nz joining pool)rr4r]rrcrfrY)rrr r r rs      z Pool.joincCsDtjd|jjx*|jr>|jjr>|jjtj dqWdS)Nz7removing tasks from inqueue until task handler finishedr) rr4Z_rlockacquireis_aliver1rrxrr)r8 task_handlerrr r r _help_stuff_finish(s    zPool._help_stuff_finishc Cs(tjdt|_t|_tjd|j||t|t|_|jdtjdtj|k r`|j |rt |ddrtjdx|D]} | j dkr| j qWtjdtj|k r|j tjdtj|k r|j |ot |ddr$tjd x,|D]$} | j rtjd | j| j qWdS) Nzfinalizing poolz&helping task handler/workers to finishzjoining worker handlerrrzterminating workerszjoining task handlerzjoining result handlerzjoining pool workerszcleaning up worker %d)rr4rrOrrnr-r[rrr/rorrpid) clsrr8r9rZworker_handlerrZresult_handlerrrr r r rg1s6               zPool._terminate_poolcCs|S)Nr )rr r r __enter___szPool.__enter__cCs |jdS)N)r)rexc_typeZexc_valZexc_tbr r r __exit__bsz Pool.__exit__)N)N)NNN)r)r)NNN)NNN)#rrrr*rqrFrrprZrvrIrzr rrrrrryrr{ staticmethodr\r`rdrr"r0rrr classmethodrgrrr r r r rsF8         . <  .c@s@eZdZddZddZddZddd Zdd d Zd d ZdS)rcCs4tj|_tt|_||_||_||_|||j<dS)N) r[ZEvent_eventnext job_counterrrM _callback_error_callback)rrr~rr r r rks   zApplyResult.__init__cCs |jjS)N)rZis_set)rr r r readysszApplyResult.readycCs|jS)N)_success)rr r r successfulvszApplyResult.successfulNcCs|jj|dS)N)rwait)rtimeoutr r r rzszApplyResult.waitcCs,|j||jst|jr"|jS|jdS)N)rrrr_value)rrr r r r.}s  zApplyResult.getcCsV|\|_|_|jr$|jr$|j|j|jr>|j r>|j|j|jj|j|j=dS)N)rrrrrsetrMr)rr>rr r r rs     zApplyResult._set)N)N) rrrrrrrr.rr r r r ris   rc@seZdZddZddZdS)rcCsftj||||dd|_dg||_||_|dkrLd|_|jj||j=n||t |||_dS)N)rTr) rrrr _chunksize _number_leftrrrbool)rrr}lengthr~rr r r rs    zMapResult.__init__cCs|jd8_|\}}|rp|jrp||j||j|d|j<|jdkr|jrZ|j|j|j|j=|jjnH| r|jrd|_||_|jdkr|j r|j |j|j|j=|jjdS)NrrF) rrrrrrMrrrr)rr>Zsuccess_resultsuccessrAr r r rs"         zMapResult._setN)rrrrrr r r r rs rc@s:eZdZddZddZd ddZeZdd Zd d ZdS) rcCsJtjtj|_tt|_||_tj |_ d|_ d|_ i|_ |||j<dS)Nr)r[Z ConditionZLock_condrrrrM collectionsdeque_items_index_length _unsorted)rrr r r rs  zIMapIterator.__init__cCs|S)Nr )rr r r __iter__szIMapIterator.__iter__NcCs|j~y|jj}Wnhtk r~|j|jkr6t|jj|y|jj}Wn(tk rx|j|jkrpttYnXYnXWdQRX|\}}|r|S|dS)N) rrpopleft IndexErrorrr StopIterationrr)rrrrr&r r r rs"   zIMapIterator.nextc Cs|j|j|krr|jj||jd7_x8|j|jkrd|jj|j}|jj||jd7_q.W|jjn ||j|<|j|jkr|j|j =WdQRXdS)Nr) rrrrrrpopnotifyrrMr)rr>rr r r rs      zIMapIterator._setc Cs<|j,||_|j|jkr.|jj|j|j=WdQRXdS)N)rrrrrMr)rrr r r rs   zIMapIterator._set_length)N) rrrrrr__next__rrr r r r rs   rc@seZdZddZdS)rc CsP|j@|jj||jd7_|jj|j|jkrB|j|j=WdQRXdS)Nr)rrrrrrrrMr)rr>rr r r rs    zIMapUnorderedIterator._setN)rrrrr r r r rsrc@s@eZdZdZeddZddffddZddZed d ZdS) rFcOsddlm}|||S)Nr)rF)ZdummyrF)r r@rFr r r rFs zThreadPool.ProcessNcCstj||||dS)N)rr)rrjr:r;r r r rszThreadPool.__init__cCs,tj|_tj|_|jj|_|jj|_dS)N)rJrKrhrbr-rar.re)rr r r rIs   zThreadPool._setup_queuesc Cs<|j,|jj|jjdg||jjWdQRXdS)N)Z not_emptyrJclearextendZ notify_all)r8rrr r r rs zThreadPool._help_stuff_finish) rrrrqrrFrrIrr r r r r s  )"__all__r[rJrrrSrrrrrrrNrrcountrrrr5rrr!r$rDr6objectrrZ AsyncResultrrrrr r r r  s@   *\&)@