3 \e@s8ddgZddlZddlZddlZddlZddlZddlZddlZddlm Z ddlm Z m Z dZ dZ dZejZdd Zd d ZGd d d eZGdddZddZGdddeZdfddfddZddZGdddeZGdddeZeZGdddeZGdddeZGd d!d!eZ Gd"ddeZ!dS)#Pool ThreadPoolN)util) get_context TimeoutErrorcCs tt|S)N)listmap)argsr /usr/lib64/python3.6/pool.pymapstar+srcCsttj|d|dS)Nrr)r itertoolsstarmap)r r r r starmapstar.src@seZdZddZddZdS)RemoteTracebackcCs ||_dS)N)tb)selfrr r r __init__6szRemoteTraceback.__init__cCs|jS)N)r)rr r r __str__8szRemoteTraceback.__str__N)__name__ __module__ __qualname__rrr r r r r5src@seZdZddZddZdS)ExceptionWithTracebackcCs0tjt|||}dj|}||_d||_dS)Nz """ %s""") tracebackformat_exceptiontypejoinexcr)rr rr r r r<s zExceptionWithTraceback.__init__cCst|j|jffS)N) rebuild_excr r)rr r r __reduce__Asz!ExceptionWithTraceback.__reduce__N)rrrrr"r r r r r;srcCst||_|S)N)r __cause__)r rr r r r!Ds r!cs0eZdZdZfddZddZddZZS)MaybeEncodingErrorzVWraps possible unpickleable errors, so they can be safely sent through the socket.cs.t||_t||_tt|j|j|jdS)N)reprr valuesuperr$r)rr r&) __class__r r rPs  zMaybeEncodingError.__init__cCsd|j|jfS)Nz(Error sending result: '%s'. Reason: '%s')r&r )rr r r rUszMaybeEncodingError.__str__cCsd|jj|fS)Nz<%s: %s>)r(r)rr r r __repr__YszMaybeEncodingError.__repr__)rrr__doc__rrr) __classcell__r r )r(r r$Ls r$Fc'Cs|dks t|tkr|dks t|j}|j}t|drJ|jj|jj|dk rZ||d}xL|dksx|ot||kry |} Wn$t t fk rt j dPYnX| dkrt j dP| \} } } } }yd| | |f}WnFt k r"}z(|o| tk r t||j}d|f}WYdd}~XnXy|| | |fWnRt k r}z4t||d}t j d||| | d|ffWYdd}~XnXd} } }} } }|d7}qbWt j d |dS) Nr_writerz)worker got EOFError or OSError -- exitingzworker got sentinel -- exitingTFrz0Possible encoding error while sending result: %szworker exiting after %d tasks)rintAssertionErrorputgethasattrr,close_readerEOFErrorOSErrorrdebug Exception_helper_reraises_exceptionr __traceback__r$)inqueueoutqueue initializerinitargsZmaxtasksZwrap_exceptionr/r0Z completedtaskjobifuncr kwdsresultewrappedr r r worker]sF        $ rFcCs|dS)z@Pickle-able helper function for use by _guarded_task_generation.Nr )Zexr r r r8sr8c@s6eZdZdZdZddZddfddfddZdd Zd d Zd d Z ddZ fifddZ d=ddZ d>ddZ d?ddZddZd@ddZdAddZfiddfdd ZdBd!d"ZdCd#d$Zed%d&Zed'd(Zed)d*Zed+d,Zd-d.Zd/d0Zd1d2Zd3d4Zed5d6Zed7d8Z d9d:Z!d;d<Z"dS)DrzS Class which supports an async version of applying functions to arguments. TcOs|jj||S)N)_ctxProcess)rr rBr r r rHsz Pool.ProcessNc Csn|pt|_|jtj|_i|_t|_||_ ||_ ||_ |dkrPt j pNd}|dkr`td|dk rzt| rztd||_g|_|jtjtj|fd|_d|j_t|j_|jjtjtj|j|j|j|j|jfd|_d|j_t|j_|jjtjtj|j|j |jfd|_!d|j!_t|j!_|j!jt"j#||j$|j|j%|j|j|j|j|j!|jfdd|_&dS)Nrz&Number of processes must be at least 1zinitializer must be a callable)targetr T)r Z exitpriority)'rrG _setup_queuesqueueQueue _taskqueue_cacheRUN_state_maxtasksperchild _initializer _initargsos cpu_count ValueErrorcallable TypeError _processes_pool_repopulate_pool threadingZThreadr_handle_workers_worker_handlerdaemonstart _handle_tasks _quick_put _outqueue _task_handler_handle_results _quick_get_result_handlerrZFinalize_terminate_pool_inqueue _terminate)r processesr<r=Zmaxtasksperchildcontextr r r rsT         z Pool.__init__cCsZd}xPttt|jD]:}|j|}|jdk rtjd||jd}|j|=qW|S)zCleanup after any worker processes which have exited due to reaching their specified lifetime. Returns True if any workers were cleaned up. FNzcleaning up worker %dT)reversedrangelenr[exitcoderr6r)rZcleanedr@rFr r r _join_exited_workerss   zPool._join_exited_workersc Cs|xvt|jt|jD]^}|jt|j|j|j|j |j |j fd}|jj ||j jdd|_ d|_|jtjdqWdS)zBring the number of pool processes up to the specified number, for use after reaping workers which have exited. )rIr rHZ PoolWorkerTz added workerN)rorZrpr[rHrFrjrdrSrTrR_wrap_exceptionappendnamereplacer`rarr6)rr@wr r r r\s  zPool._repopulate_poolcCs|jr|jdS)zEClean up any exited workers and start replacements for them. N)rrr\)rr r r _maintain_poolszPool._maintain_poolcCs4|jj|_|jj|_|jjj|_|jjj|_ dS)N) rGZ SimpleQueuerjrdr,sendrcr3recvrg)rr r r rKs   zPool._setup_queuescCs |jtkst|j|||jS)z6 Equivalent of `func(*args, **kwds)`. )rQrPr. apply_asyncr0)rrAr rBr r r applysz Pool.applycCs|j||t|jS)zx Apply `func` to each element in `iterable`, collecting the results in a list that is returned. ) _map_asyncrr0)rrAiterable chunksizer r r r szPool.mapcCs|j||t|jS)z Like `map()` method but the elements of the `iterable` are expected to be iterables as well and will be unpacked as arguments. Hence `func` and (a, b) becomes func(a, b). )r}rr0)rrAr~rr r r r sz Pool.starmapcCs|j||t|||S)z= Asynchronous version of `starmap()` method. )r}r)rrAr~rcallbackerror_callbackr r r starmap_asyncs zPool.starmap_asyncccsny0d}x&t|D]\}}||||fifVqWWn8tk rh}z||dt|fifVWYdd}~XnXdS)zProvides a generator of tasks for imap and imap_unordered with appropriate handling for iterables which throw exceptions during iteration.rN) enumerater7r8)rZ result_jobrAr~r@xrDr r r _guarded_task_generations zPool._guarded_task_generationrcCs|jtkrtd|dkrFt|j}|jj|j|j|||j f|S|dksRt t j |||}t|j}|jj|j|jt ||j fdd|DSdS)zP Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. zPool not runningrcss|]}|D] }|Vq qdS)Nr ).0chunkitemr r r @szPool.imap..N)rQrPrW IMapIteratorrOrNr/r_job _set_lengthr.r _get_tasksr)rrAr~rrC task_batchesr r r imap's"      z Pool.imapcCs|jtkrtd|dkrFt|j}|jj|j|j|||j f|S|dksRt t j |||}t|j}|jj|j|jt ||j fdd|DSdS)zL Like `imap()` method but ordering of results is arbitrary. zPool not runningrcss|]}|D] }|Vq qdS)Nr )rrrr r r r[sz&Pool.imap_unordered..N)rQrPrWIMapUnorderedIteratorrOrNr/rrrr.rrr)rrAr~rrCrr r r imap_unorderedBs"      zPool.imap_unorderedcCsB|jtkrtdt|j||}|jj|jd|||fgdf|S)z; Asynchronous version of `apply()` method. zPool not runningrN)rQrPrW ApplyResultrOrNr/r)rrAr rBrrrCr r r r{]s  zPool.apply_asynccCs|j||t|||S)z9 Asynchronous version of `map()` method. )r}r)rrAr~rrrr r r map_asynchszPool.map_asyncc Cs|jtkrtdt|ds$t|}|dkrTtt|t|jd\}}|rT|d7}t|dkrdd}tj |||}t |j |t|||d} |j j |j| j||df| S)zY Helper function to implement map, starmap and their async counterparts. zPool not running__len__Nrr)r)rQrPrWr1r divmodrpr[rr MapResultrOrNr/rr) rrAr~ZmapperrrrZextrarrCr r r r}ps&   zPool._map_asynccCsTtj}x0|jtks$|jr8|jtkr8|jtjdq W|j j dt j ddS)Ng?zworker handler exiting) r]current_threadrQrPrO TERMINATErxtimesleeprNr/rr6)poolthreadr r r r^s  zPool._handle_workersc Csjtj}xt|jdD]\}}d}zx|D]}|jr@tjdPy ||Wq*tk r} zD|dd\} } y|| j| d| fWnt k rYnXWYdd} ~ Xq*Xq*W|rtjd|r|dnd } || dwPWdd}}} XqWtjdy:tjd|j dtjdx|D]} |dq&WWn t k rZtjd YnXtjd dS) Nz'task handler found thread._state != RUNrFzdoing set_length()rztask handler got sentinelz/task handler sending sentinel to result handlerz(task handler sending sentinel to workersz/task handler got OSError when sending sentinelsztask handler exitingr) r]riterr0rQrr6r7_setKeyErrorr/r5) taskqueuer/r;rcacherZtaskseqZ set_lengthr>rDr?idxpr r r rbsB          zPool._handle_taskscCstj}xy |}Wn"ttfk r6tjddSX|jrX|jtksLttjdP|dkrltjdP|\}}}y||j ||Wnt k rYnXd}}}q Wx|o|jtkrJy |}Wn"ttfk rtjddSX|dkrtjdq|\}}}y||j ||Wnt k r:YnXd}}}qWt |drtjdy,x&t dD]}|j jsP|qnWWnttfk rYnXtjdt||jdS) Nz.result handler got EOFError/OSError -- exitingz,result handler found thread._state=TERMINATEzresult handler got sentinelz&result handler ignoring extra sentinelr3z"ensuring that outqueue is not full z7result handler exiting: len(cache)=%s, thread._state=%s)r]rr5r4rr6rQrr.rrr1ror3pollrp)r;r0rrr>r?r@objr r r rfs\             zPool._handle_resultsccs4t|}x&ttj||}|s"dS||fVq WdS)N)rtuplerislice)rAitsizerr r r rs zPool._get_taskscCs tddS)Nz:pool objects cannot be passed between processes or pickled)NotImplementedError)rr r r r"szPool.__reduce__cCs&tjd|jtkr"t|_t|j_dS)Nz closing pool)rr6rQrPCLOSEr_)rr r r r2s  z Pool.closecCs$tjdt|_t|j_|jdS)Nzterminating pool)rr6rrQr_rk)rr r r terminates zPool.terminatecCsVtjd|jttfkst|jj|jj|j jx|j D] }|jqBWdS)Nz joining pool) rr6rQrrr.r_rrerhr[)rrr r r rs     z Pool.joincCsDtjd|jjx*|jr>|jjr>|jjtj dqWdS)Nz7removing tasks from inqueue until task handler finishedr) rr6Z_rlockacquireis_aliver3rrzrr)r: task_handlerrr r r _help_stuff_finish(s    zPool._help_stuff_finishc CsFtjdt|_t|_tjd|j||t||jsJt|dksJtt|_|jdtjdt j |k rx|j |rt |ddrtjdx|D]} | j dkr| jqWtjdt j |k r|j tjdt j |k r|j |rBt |ddrBtjd x0|D](} | jrtjd | j| j qWdS) Nzfinalizing poolz&helping task handler/workers to finishrzjoining worker handlerrzterminating workerszjoining task handlerzjoining result handlerzjoining pool workerszcleaning up worker %d)rr6rrQrrprr.r/r]rrr1rqrpid) clsrr:r;rZworker_handlerrZresult_handlerrrr r r ri1s8                zPool._terminate_poolcCs|S)Nr )rr r r __enter___szPool.__enter__cCs |jdS)N)r)rexc_typeZexc_valZexc_tbr r r __exit__bsz Pool.__exit__)N)N)NNN)r)r)NNN)NNN)#rrrr*rsrHrrrr\rxrKr|r rrrrrr{rr} staticmethodr^rbrfrr"r2rrr classmethodrirrr r r r rsF8         . <  .c@s@eZdZddZddZddZddd Zdd d Zd d ZdS)rcCs4tj|_tt|_||_||_||_|||j<dS)N) r]ZEvent_eventnext job_counterrrO _callback_error_callback)rrrrr r r rks   zApplyResult.__init__cCs |jjS)N)rZis_set)rr r r readysszApplyResult.readycCs|js t|jS)N)rr._success)rr r r successfulvs zApplyResult.successfulNcCs|jj|dS)N)rwait)rtimeoutr r r rzszApplyResult.waitcCs,|j||jst|jr"|jS|jdS)N)rrrr_value)rrr r r r0}s  zApplyResult.getcCsV|\|_|_|jr$|jr$|j|j|jr>|j r>|j|j|jj|j|j=dS)N)rrrrrsetrOr)rr@rr r r rs     zApplyResult._set)N)N) rrrrrrrr0rr r r r ris   rc@seZdZddZddZdS)rcCsftj||||dd|_dg||_||_|dkrLd|_|jj||j=n||t |||_dS)N)rTr) rrrr _chunksize _number_leftrrrbool)rrrlengthrrr r r rs    zMapResult.__init__cCs|jd8_|\}}|rp|jrp||j||j|d|j<|jdkr|jrZ|j|j|j|j=|jjnH| r|jrd|_||_|jdkr|j r|j |j|j|j=|jjdS)NrrF) rrrrrrOrrrr)rr@Zsuccess_resultsuccessrCr r r rs"         zMapResult._setN)rrrrrr r r r rs rc@s:eZdZddZddZd ddZeZdd Zd d ZdS) rcCsJtjtj|_tt|_||_tj |_ d|_ d|_ i|_ |||j<dS)Nr)r]Z ConditionZLock_condrrrrO collectionsdeque_items_index_length _unsorted)rrr r r rs  zIMapIterator.__init__cCs|S)Nr )rr r r __iter__szIMapIterator.__iter__NcCs|j~y|jj}Wnhtk r~|j|jkr6t|jj|y|jj}Wn(tk rx|j|jkrpttYnXYnXWdQRX|\}}|r|S|dS)N) rrpopleft IndexErrorrr StopIterationrr)rrrrr&r r r rs"   zIMapIterator.nextc Cs|j|j|krr|jj||jd7_x8|j|jkrd|jj|j}|jj||jd7_q.W|jjn ||j|<|j|jkr|j|j =WdQRXdS)Nr) rrrrtrpopnotifyrrOr)rr@rr r r rs      zIMapIterator._setc Cs<|j,||_|j|jkr.|jj|j|j=WdQRXdS)N)rrrrrOr)rrr r r rs   zIMapIterator._set_length)N) rrrrrr__next__rrr r r r rs   rc@seZdZddZdS)rc CsP|j@|jj||jd7_|jj|j|jkrB|j|j=WdQRXdS)Nr)rrrtrrrrOr)rr@rr r r rs    zIMapUnorderedIterator._setN)rrrrr r r r rsrc@s@eZdZdZeddZddffddZddZed d ZdS) rFcOsddlm}|||S)Nr)rH)ZdummyrH)r rBrHr r r rHs zThreadPool.ProcessNcCstj||||dS)N)rr)rrlr<r=r r r rszThreadPool.__init__cCs,tj|_tj|_|jj|_|jj|_dS)N)rLrMrjrdr/rcr0rg)rr r r rKs   zThreadPool._setup_queuesc Cs<|j,|jj|jjdg||jjWdQRXdS)N)Z not_emptyrLclearextendZ notify_all)r:rrr r r rs zThreadPool._help_stuff_finish) rrrrsrrHrrKrr r r r r s  )"__all__r]rLrrrUrrrrrrrPrrcountrrrr7rrr!r$rFr8objectrrZ AsyncResultrrrrr r r r  s@   *\&)@