3 \ *@sdddgZddlZddlZddlZddlZddlZddlZddlZddlm Z m Z ddl Z ddl m Z ddl mZejjZdd lmZmZmZmZmZGd ddeZeZGd ddeZGd ddeZdS) Queue SimpleQueue JoinableQueueN)EmptyFull) connection)context)debuginfoFinalizeregister_after_fork is_exitingc@seZdZd(ddZddZddZdd Zd)d d Zd*ddZddZ ddZ ddZ ddZ ddZ ddZddZddZd d!Zed"d#Zed$d%Zed&d'Zd S)+rrcCs|dkrddlm}||_tjdd\|_|_|j|_t j |_ t j dkrTd|_n |j|_|j||_d|_|jt j dkrt|tjdS)Nrr) SEM_VALUE_MAXF)duplexwin32)Z synchronizer_maxsizerPipe_reader_writerLock_rlockosgetpid_opidsysplatform_wlockZBoundedSemaphore_sem _ignore_epipe _after_forkr r)selfmaxsizectxr$/usr/lib64/python3.6/queues.py__init__$s       zQueue.__init__cCs.tj||j|j|j|j|j|j|j|j fS)N) r assert_spawningrrrrrrrr)r!r$r$r% __getstate__9s zQueue.__getstate__c Cs0|\|_|_|_|_|_|_|_|_|jdS)N) rrrrrrrrr )r!stater$r$r% __setstate__>s$zQueue.__setstate__cCsbtdtjtj|_tj|_d|_d|_ d|_ d|_ d|_ |j j|_|jj|_|jj|_dS)NzQueue._after_fork()F)r threading Conditionr _notempty collectionsdeque_buffer_thread _jointhread_joincancelled_closed_closer send_bytes _send_bytesr recv_bytes _recv_bytespoll_poll)r!r$r$r%r Cs   zQueue._after_forkTNc Cs\|j s t|jj||st|j.|jdkr8|j|jj ||jj WdQRXdS)N) r4AssertionErrorracquirerr-r1 _start_threadr0appendnotify)r!objblocktimeoutr$r$r%putPs   z Queue.putc Cs|r2|dkr2|j|j}WdQRX|jjnr|rBtj|}|jj||sTtzB|rv|tj}|j|stn |jst|j}|jjWd|jjXt j |S)N) rr9rreleasetimeZ monotonicr=rr;_ForkingPicklerloads)r!rBrCresZdeadliner$r$r%get[s&      z Queue.getcCs|j|jjjS)N)rr_semlockZ _get_value)r!r$r$r%qsizessz Queue.qsizecCs |j S)N)r;)r!r$r$r%emptywsz Queue.emptycCs |jjjS)N)rrK_is_zero)r!r$r$r%fullzsz Queue.fullcCs |jdS)NF)rJ)r!r$r$r% get_nowait}szQueue.get_nowaitcCs |j|dS)NF)rD)r!rAr$r$r% put_nowaitszQueue.put_nowaitc Cs2d|_z|jjWd|j}|r,d|_|XdS)NT)r4rcloser5)r!rRr$r$r%rRsz Queue.closecCs$td|jst|jr |jdS)NzQueue.join_thread())r r4r<r2)r!r$r$r% join_threads zQueue.join_threadc Cs6tdd|_y|jjWntk r0YnXdS)NzQueue.cancel_join_thread()T)r r3r2ZcancelAttributeError)r!r$r$r%cancel_join_threads zQueue.cancel_join_threadcCstd|jjtjtj|j|j|j|j |j j |j fdd|_ d|j _td|j jtd|jst|j tjtj|j gd d|_t|tj|j|jgd d|_dS) NzQueue._start_thread()ZQueueFeederThread)targetargsnameTzdoing self._thread.start()z... done self._thread.start())Z exitpriority )r r0clearr+ZThreadr_feedr-r7rrrRrr1Zdaemonstartr3r _finalize_joinweakrefrefr2_finalize_closer5)r!r$r$r%r>s(       zQueue._start_threadcCs4td|}|dk r(|jtdntddS)Nzjoining queue threadz... queue thread joinedz... queue thread already dead)r join)ZtwrZthreadr$r$r%r_s  zQueue._finalize_joinc Cs.td||jt|jWdQRXdS)Nztelling queue thread to quit)r r? _sentinelr@)buffernotemptyr$r$r%rbs zQueue._finalize_closecCsPtd|j}|j}|j}|j} t} tjdkr<|j} |j} nd} xy|z|sX|Wd|Xybx\| } | | krtd|dStj | } | dkr|| qh| z || Wd| XqhWWnt k rYnXWqDt k rF}zJ|rt |ddt jkrdStr&td|dSddl}|jWYdd}~XqDXqDWdS)Nz$starting thread to feed data to piperz%feeder thread got sentinel -- exitingerrnorzerror in queue thread: %s)r r=rEwaitpopleftrdrrrGdumps IndexError ExceptiongetattrrgZEPIPErr traceback print_exc)rerfr6Z writelockrRZ ignore_epipeZnacquireZnreleaseZnwaitZbpopleftsentinelZwacquireZwreleaserAernr$r$r%r]sR       z Queue._feed)r)TN)TN)__name__ __module__ __qualname__r&r(r*r rDrJrLrMrOrPrQrRrSrUr> staticmethodr_rbr]r$r$r$r%r"s$    c@s@eZdZdddZddZddZdd d Zd d ZddZd S)rrcCs*tj|||d|jd|_|j|_dS)N)r#r)rr&Z Semaphore_unfinished_tasksr,_cond)r!r"r#r$r$r%r&s zJoinableQueue.__init__cCstj||j|jfS)N)rr(rwrv)r!r$r$r%r(szJoinableQueue.__getstate__cCs,tj||dd|dd\|_|_dS)Nry)rr*rwrv)r!r)r$r$r%r*szJoinableQueue.__setstate__TNcCsx|j s t|jj||st|jJ|j8|jdkr@|j|j j ||j j |jj WdQRXWdQRXdS)N)r4r<rr=rr-rwr1r>r0r?rvrEr@)r!rArBrCr$r$r%rDs    zJoinableQueue.putc Cs@|j0|jjdstd|jjjr2|jjWdQRXdS)NFz!task_done() called too many times)rwrvr= ValueErrorrKrNZ notify_all)r!r$r$r% task_done's   zJoinableQueue.task_donec Cs,|j|jjjs|jjWdQRXdS)N)rwrvrKrNrh)r!r$r$r%rc.s zJoinableQueue.join)r)TN) rrrsrtr&r(r*rDr{rcr$r$r$r%r s   c@s<eZdZddZddZddZddZd d Zd d Zd S)rcCsHtjdd\|_|_|j|_|jj|_tj dkr:d|_ n |j|_ dS)NF)rr) rrrrrrr:r;rrr)r!r#r$r$r%r&9s    zSimpleQueue.__init__cCs |j S)N)r;)r!r$r$r%rMBszSimpleQueue.emptycCstj||j|j|j|jfS)N)r r'rrrr)r!r$r$r%r(Es zSimpleQueue.__getstate__cCs"|\|_|_|_|_|jj|_dS)N)rrrrr:r;)r!r)r$r$r%r*IszSimpleQueue.__setstate__c Cs&|j|jj}WdQRXtj|S)N)rrr8rGrH)r!rIr$r$r%rJMszSimpleQueue.getc CsDtj|}|jdkr"|jj|n|j|jj|WdQRXdS)N)rGrjrrr6)r!rAr$r$r%rDSs   zSimpleQueue.putN) rrrsrtr&rMr(r*rJrDr$r$r$r%r7s  )__all__rrr+r.rFr`rgZqueuerrZ_multiprocessingrr Z reductionZForkingPicklerrGutilr r r r robjectrrdrrr$r$r$r% s$   b *