3 \ P@sbdZdZddlZddlZddlmZddlZddlmZddlZddlm Z ddl m Z ddl Z ddl Z ddlmZddlZddlZe jZd ad d Zd ZGd ddeZGdddZddZGdddeZGdddeZGdddeZddZddZ ddZ!dd Z"d!d"Z#d a$da%d#d$Z&d%d&Z'Gd'd(d(e(Z)Gd)d*d*ej*Z+ej,edS)+a* Implements ProcessPoolExecutor. The follow diagram and text describe the data-flow through the system: |======================= In-process =====================|== Out-of-process ==| +----------+ +----------+ +--------+ +-----------+ +---------+ | | => | Work Ids | => | | => | Call Q | => | | | | +----------+ | | +-----------+ | | | | | ... | | | | ... | | | | | | 6 | | | | 5, call() | | | | | | 7 | | | | ... | | | | Process | | ... | | Local | +-----------+ | Process | | Pool | +----------+ | Worker | | #1..n | | Executor | | Thread | | | | | +----------- + | | +-----------+ | | | | <=> | Work Items | <=> | | <= | Result Q | <= | | | | +------------+ | | +-----------+ | | | | | 6: call() | | | | ... | | | | | | future | | | | 4, result | | | | | | ... | | | | 3, except | | | +----------+ +------------+ +--------+ +-----------+ +---------+ Executor.submit() called: - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict - adds the id of the _WorkItem to the "Work Ids" queue Local worker thread: - reads work ids from the "Work Ids" queue and looks up the corresponding WorkItem from the "Work Items" dict: if the work item has been cancelled then it is simply removed from the dict, otherwise it is repackaged as a _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). - reads _ResultItems from "Result Q", updates the future stored in the "Work Items" dict and deletes the dict entry Process #1..n: - reads _CallItems from "Call Q", executes the calls, and puts the resulting _ResultItems in "Result Q" z"Brian Quinlan (brian@sweetapp.com)N)_base)Full) SimpleQueue)wait)partialFcCsJdattj}x|D]\}}|jdqWx|D]\}}|jq2WdS)NT) _shutdownlist_threads_queuesitemsputjoin)r tqr/usr/lib64/python3.6/process.py _python_exitOs  rc@seZdZddZddZdS)_RemoteTracebackcCs ||_dS)N)tb)selfrrrr__init__asz_RemoteTraceback.__init__cCs|jS)N)r)rrrr__str__csz_RemoteTraceback.__str__N)__name__ __module__ __qualname__rrrrrrr`src@seZdZddZddZdS)_ExceptionWithTracebackcCs0tjt|||}dj|}||_d||_dS)Nz """ %s""") tracebackformat_exceptiontyper excr)rr rrrrrgs z _ExceptionWithTraceback.__init__cCst|j|jffS)N) _rebuild_excr r)rrrr __reduce__lsz"_ExceptionWithTraceback.__reduce__N)rrrrr"rrrrrfsrcCst||_|S)N)r __cause__)r rrrrr!os r!c@seZdZddZdS) _WorkItemcCs||_||_||_||_dS)N)futurefnargskwargs)rr%r&r'r(rrrrtsz_WorkItem.__init__N)rrrrrrrrr$ssr$c@seZdZdddZdS) _ResultItemNcCs||_||_||_dS)N)work_id exceptionresult)rr*r+r,rrrr{sz_ResultItem.__init__)NN)rrrrrrrrr)zsr)c@seZdZddZdS) _CallItemcCs||_||_||_||_dS)N)r*r&r'r()rr*r&r'r(rrrrsz_CallItem.__init__N)rrrrrrrrr-sr-cgs0t|}x"ttj||}|s"dS|Vq WdS)z, Iterates over zip()ed iterables in chunks. N)ziptuple itertoolsislice) chunksize iterablesitchunkrrr _get_chunkss r6csfdd|DS)z Processes a chunk of an iterable passed to map. Runs the function passed to map() on a chunk of the iterable passed to map. This function is run in a separate process. csg|] }|qSrr).0r')r&rr sz"_process_chunk..r)r&r5r)r&r_process_chunks r9cCsx|jdd}|dkr(|jtjdSy|j|j|j}WnBtk r~}z&t||j }|jt |j |dWYdd}~XqX|jt |j |dqWdS)aEvaluates calls from call_queue and places the results in result_queue. This worker is run in a separate process. Args: call_queue: A multiprocessing.Queue of _CallItems that will be read and evaluated by the worker. result_queue: A multiprocessing.Queue of _ResultItems that will written to by the worker. shutdown: A multiprocessing.Event that will be set as a signal to the worker that it should exit when call_queue is empty. T)blockN)r+)r,) getr osgetpidr&r'r( BaseExceptionr __traceback__r)r*) call_queue result_queueZ call_itemrer rrr_process_workers   & rDcCsxxr|jrdSy|jdd}Wntjk r4dSX||}|jjrh|jt||j|j |j ddq||=qqWdS)aMFills call_queue with _WorkItems from pending_work_items. This function never blocks. Args: pending_work_items: A dict mapping work ids to _WorkItems e.g. {5: <_WorkItem...>, 6: <_WorkItem...>, ...} work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids are consumed and the corresponding _WorkItems from pending_work_items are transformed into _CallItems and put in call_queue. call_queue: A multiprocessing.Queue that will be filled with _CallItems derived from _WorkItems. NF)r:T) Zfullr;queueZEmptyr%Zset_running_or_notify_cancelr r-r&r'r()pending_work_itemsZwork_idsr@r* work_itemrrr_add_call_item_to_queues   rHc sdfdd}fdd}|j}xlt||ddjD} t|g| } || krf|j} nr|dk rd_d_dx&|jD]\} } | jj t d ~ qW|j xjD] }|j qW|dSt | trj| }|jsR|dSnJ| dk rR|j| jd} | dk rR| jrB| jj | jn| jj| j~ ||ry|sr|dSWntk rYnXdq(WdS) aManages the communication between this process and the worker processes. This function is run in a local thread. Args: executor_reference: A weakref.ref to the ProcessPoolExecutor that owns this thread. Used to determine if the ProcessPoolExecutor has been garbage collected and that this function can exit. process: A list of the multiprocessing.Process instances used as workers. pending_work_items: A dict mapping work ids to _WorkItems e.g. {5: <_WorkItem...>, 6: <_WorkItem...>, ...} work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). call_queue: A multiprocessing.Queue that will be filled with _CallItems derived from _WorkItems for processing by the process workers. result_queue: A multiprocessing.Queue of _ResultItems generated by the process workers. NcstpdkpjS)N)r_shutdown_threadr)executorrr shutting_downsz/_queue_management_worker..shutting_downcsZtddjD}xtd|D]}jdq"WjxjD] }|jqFWdS)Ncss|]}|jVqdS)N)Zis_alive)r7prrr szD_queue_management_worker..shutdown_worker..r)sumvaluesrangeZ put_nowaitcloser )Znb_children_aliveirL)r@ processesrrshutdown_workers z1_queue_management_worker..shutdown_workercSsg|] }|jqSr)sentinel)r7rLrrrr8 sz,_queue_management_worker..Tz^A process in the process pool was terminated abruptly while the future was running or pending.)Z_readerrHrOrZrecv_brokenrIr r%Z set_exceptionBrokenProcessPoolclearZ terminate isinstanceintpopr r*r+Z set_resultr,r)Zexecutor_referencerSrFZwork_ids_queuer@rArKrTreaderZ sentinelsZreadyZ result_itemr*rGrLr)r@rJrSr_queue_management_workersb        r]c Cshtrtrttdaytjd}Wnttfk r:dSX|dkrHdS|dkrTdSd|attdS)NTSC_SEM_NSEMS_MAXrz@system provides too few semaphores (%d available, 256 necessary))_system_limits_checked_system_limitedNotImplementedErrorr<sysconfAttributeError ValueError)Z nsems_maxrrr_check_system_limitsQsrgccs.x(|D] }|jx|r$|jVqWqWdS)z Specialized implementation of itertools.chain.from_iterable. Each item in *iterable* should be a list. This function is careful not to keep references to yielded objects. N)reverser[)iterableelementrrr_chain_from_iterable_of_listshs rkc@seZdZdZdS)rWzy Raised when a process in a ProcessPoolExecutor terminated abruptly while a future was in the running state. N)rrr__doc__rrrrrWtsrWcsheZdZdddZddZddZdd Zejjj e_ dd d fd d Z dddZ ejj j e _ Z S)ProcessPoolExecutorNcCst|dkrtjpd|_n|dkr.td||_tj|jt|_d|j_ t |_ t j|_ d|_i|_d|_tj|_d|_d|_i|_dS)a/Initializes a new ProcessPoolExecutor instance. Args: max_workers: The maximum number of processes that can be used to execute the given calls. If None or not given then as many worker processes will be created as the machine has processors. Nrrz"max_workers must be greater than 0TF)rgr< cpu_count _max_workersrfmultiprocessingZQueueEXTRA_QUEUED_CALLS _call_queueZ _ignore_epiper _result_queuerE _work_ids_queue_management_thread _processesrI threadingZLock_shutdown_lockrV _queue_count_pending_work_items)r max_workersrrrr|s$   zProcessPoolExecutor.__init__cCsp|jfdd}|jdkrl|jtjttj|||j|j |j |j |jfd|_d|j_ |jj |jt|j<dS)NcSs|jddS)N)r )_rrrr weakref_cbszFProcessPoolExecutor._start_queue_management_thread..weakref_cb)targetr'T)rsru_adjust_process_countrwZThreadr]weakrefrefrvrzrtrrZdaemonstartr )rr}rrr_start_queue_management_threads   z2ProcessPoolExecutor._start_queue_management_threadcCsJxDtt|j|jD].}tjt|j|jfd}|j ||j|j <qWdS)N)r~r') rPlenrvrorpZProcessrDrrrsrpid)rr|rLrrrrs z)ProcessPoolExecutor._adjust_process_countc Os|jt|jrtd|jr$tdtj}t||||}||j|j <|j j |j |j d7_ |j j d|j |SQRXdS)NzKA child process terminated abruptly, the process pool is not usable anymorez*cannot schedule new futures after shutdownr)rxrVrWrI RuntimeErrorrZFuturer$rzryrtr rsr)rr&r'r(fwrrrsubmits  zProcessPoolExecutor.submitr)timeoutr2cs:|dkrtdtjtt|t|d|i|d}t|S)ajReturns an iterator equivalent to map(fn, iter). Args: fn: A callable that will take as many arguments as there are passed iterables. timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. chunksize: If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the process pool. If set to one, the items in the list will be sent one at a time. Returns: An iterator equivalent to: map(func, *iterables) but the calls may be evaluated out-of-order. Raises: TimeoutError: If the entire result iterator could not be generated before the given timeout. Exception: If fn(*args) raises for any values. rzchunksize must be >= 1.r2)r)rfsupermaprr9r6rk)rr&rr2r3results) __class__rrrs  zProcessPoolExecutor.mapTc CsT|j d|_WdQRX|jr8|jjd|r8|jjd|_d|_d|_d|_dS)NT)rxrIrursr r rrrv)rrrrrshutdowns  zProcessPoolExecutor.shutdown)N)T) rrrrrrrrExecutorrlrr __classcell__rr)rrrm{s (   rm)-rl __author__atexitr<concurrent.futuresrrErrprZmultiprocessing.connectionrrwr functoolsrr0rWeakKeyDictionaryr rrrq Exceptionrrr!objectr$r)r-r6r9rDrHr]rarbrgrkrrWrrmregisterrrrr,sJ         %s