3 \: @sdZddlZddlZddlZddlZddlZddlZddlZddlZddl Z ddl Z ddl Z ddl Z ddl Z ddlZddl mZddlmZddlmZmZy ddlZWnek rdZYnXddlmZddlmZdd lmZdd lmZdd lmZdd lmZdd lmZddl m!Z!ddl"m#Z#e j$dkrHddl%m&Z&n ddlm&Z&ddZ'e'dZ(e'dZ)ddZ*ddZ+dRddZ,ddZ-Gdd d eZ.Gd!d"d"eZ/Gd#d$d$Z0Gd%d&d&e0e/Z1d'd(d)d*Z2e3ed+rZGd,d-d-ej4eZ5Gd.d/d/e5eZ6Gd0d1d1e6Z7Gd2d3d3e0e7Z8d4d5Z9ej:d6d7Z;ej:d'd(d8d9Zd?Z>Gd@dAdAej?Z@GdBdCdCejAZBdDdEZCGdFdGdGeDZEdHdIZFGdJdKdKe jGZGej:dLdMZHejIejJejKfdNdOZLdPdQZMdS)SzUtilities shared by tests.N)mock) HTTPServer)WSGIRequestHandler WSGIServer) base_events)compat)events)futures) selectors)tasks) coroutine)logger)supportZwin32) socketpaircCs`ttdr*tjjtj|}tjj|r*|Stjjtjjtjd|}tjj|rT|St |dS)N TEST_HOME_DIRtest) hasattrrospathjoinrisfiledirname__file__FileNotFoundError)filenamefullnamer"/usr/lib64/python3.6/test_utils.py data_file-s   rz ssl_cert.pemz ssl_key.pemcCstdkr dStjtjSdS)N)ssl SSLContextZPROTOCOL_SSLv23rrrrdummy_ssl_context<sr"c Cs@tdd}|}|j|}d|_z|j|Wd|jXdS)NcSsdS)NrrrrronceDszrun_briefly..onceF)r Z create_taskZ_log_destroy_pendingrun_until_completeclose)loopr#gentrrr run_brieflyCs  r)cCsTtj|}xB|sN|dk r8|tj}|dkr8tj|jtjd|dqWdS)NrgMbP?)r&)timer TimeoutErrorr$r Zsleep)r&ZpredtimeoutZdeadlinerrr run_untilRs  r.cCs|j|j|jdS)zLegacy API to run once through the event loop. This is the recommended pattern for test code. It will poll the selector once and run all callbacks scheduled in response to I/O events. N)Z call_soonstopZ run_forever)r&rrrrun_once\s r0c@seZdZddZddZdS)SilentWSGIRequestHandlercCstjS)N)ioStringIO)selfrrr get_stderrisz#SilentWSGIRequestHandler.get_stderrcGsdS)Nr)r4formatargsrrr log_messagelsz$SilentWSGIRequestHandler.log_messageN)__name__ __module__ __qualname__r5r8rrrrr1gsr1cs(eZdZdZfddZddZZS)SilentWSGIServercs"tj\}}|j|j||fS)N)super get_request settimeoutrequest_timeout)r4request client_addr) __class__rrr?ts zSilentWSGIServer.get_requestcCsdS)Nr)r4rBclient_addressrrr handle_erroryszSilentWSGIServer.handle_error)r9r:r;rAr?rF __classcell__rr)rDrr<ps r<c@seZdZddZdS)SSLWSGIServerMixinc Cs^t}t}tj}|j|||j|dd}y|j||||jWntk rXYnXdS)NT)Z server_side) ONLYKEYONLYCERTr r!Zload_cert_chainZ wrap_socketZRequestHandlerClassr%OSError)r4rBrEZkeyfileZcertfilecontextZssockrrrfinish_requests  z!SSLWSGIServerMixin.finish_requestN)r9r:r;rMrrrrrH}srHc@s eZdZdS) SSLWSGIServerN)r9r:r;rrrrrNsrNF)use_sslc #svdd}|r|n|}||tj|j_tjfddd}|jz VWdjj|j XdS)NcSsd}dg}|||dgS)Nz200 OK Content-type text/plains Test message)rPrQr)environZstart_responseZstatusZheadersrrrapps z_run_test_server..appcs jddS)Ng?)Z poll_interval)Z serve_foreverr)httpdrrsz"_run_test_server..)target) r1Zset_appZserver_addressaddress threadingZThreadstartshutdownZ server_closer)rWrO server_clsserver_ssl_clsrSZ server_classZ server_threadr)rTr_run_test_servers    r]ZAF_UNIXc@seZdZddZdS)UnixHTTPServercCstjj|d|_d|_dS)Nz 127.0.0.1P) socketserverUnixStreamServer server_bindZ server_nameZ server_port)r4rrrrbs zUnixHTTPServer.server_bindN)r9r:r;rbrrrrr^sr^cs(eZdZdZddZfddZZS)UnixWSGIServerr=cCstj||jdS)N)r^rbZ setup_environ)r4rrrrbs zUnixWSGIServer.server_bindcs"tj\}}|j|j|dfS)N 127.0.0.1)rdre)r>r?r@rA)r4rBrC)rDrrr?s zUnixWSGIServer.get_request)r9r:r;rArbr?rGrr)rDrrcsrcc@seZdZddZdS)SilentUnixWSGIServercCsdS)Nr)r4rBrErrrrFsz!SilentUnixWSGIServer.handle_errorN)r9r:r;rFrrrrrfsrfc@s eZdZdS)UnixSSLWSGIServerN)r9r:r;rrrrrgsrgc Cstj}|jSQRXdS)N)tempfileZNamedTemporaryFilename)filerrrgen_unix_socket_paths rkccs<t}z |VWdytj|Wntk r4YnXXdS)N)rkrunlinkrK)rrrrunix_socket_paths rmc cs,t}t||ttdEdHWdQRXdS)N)rWrOr[r\)rmr]rfrg)rOrrrrrun_test_unix_serversrnz 127.0.0.1)hostportrOccst||f|ttdEdHdS)N)rWrOr[r\)r]r<rN)rorprOrrrrun_test_servers rqcCsPi}x4t|D](}|jdr(|jdr(qtdd||<qWtd|f|j|S)N__) return_valueZ TestProtocol)dir startswithendswith MockCallbacktype __bases__)baseZdctrirrrmake_test_protocols r{c@s6eZdZddZd ddZddZdd Zd d ZdS) TestSelectorcCs i|_dS)N)keys)r4rrr__init__szTestSelector.__init__NcCstj|d||}||j|<|S)Nr)r Z SelectorKeyr})r4fileobjr datakeyrrrregisters zTestSelector.registercCs |jj|S)N)r}pop)r4rrrr unregister szTestSelector.unregistercCsgS)Nr)r4r-rrrselectszTestSelector.selectcCs|jS)N)r})r4rrrget_mapszTestSelector.get_map)N)r9r:r;r~rrrrrrrrr|s  r|cseZdZdZd-fdd ZddZddZfd d Zd d Zd dZ ddZ ddZ ddZ ddZ ddZddZddZddZdd Zd!d"Zd#d$Zfd%d&Zfd'd(Zd)d*Zd+d,ZZS).TestLoopaLoop for unittests. It manages self time directly. If something scheduled to be executed later then on next loop iteration after all ready handlers done generator passed to __init__ is calling. Generator should be like this: def gen(): ... when = yield ... ... = yield time_advance Value returned by yield is absolute time of next scheduled handler. Value passed to yield is time advance to move loop's time forward. Ncsvtj|dkr"dd}d|_nd|_||_t|jd|_d|_g|_t|_ i|_ i|_ |j t j|_dS)Ncss dVdS)Nrrrrrr',szTestLoop.__init__..genFTrg& .>)r>r~_check_on_close_gennext_timeZ_clock_resolution_timersr|Z _selectorreaderswritersreset_countersweakrefWeakValueDictionary _transports)r4r')rDrrr~(s  zTestLoop.__init__cCs|jS)N)r)r4rrrr+?sz TestLoop.timecCs|r|j|7_dS)zMove test time forward.N)r)r4advancerrr advance_timeBszTestLoop.advance_timec sBtj|jr>y|jjdWntk r4Yn XtddS)NrzTime generator is not finished)r>r%rrsend StopIterationAssertionError)r4)rDrrr%Gs zTestLoop.closecGstj||||j|<dS)N)r Handler)r4fdcallbackr7rrr _add_readerQszTestLoop._add_readercCs0|j|d7<||jkr(|j|=dSdSdS)NrTF)remove_reader_countr)r4rrrr_remove_readerTs  zTestLoop._remove_readercGsh||jkrtd|d|j|}|j|krDtd|jd||j|krdtd|jd|dS)Nzfd z is not registeredzunexpected callback: z != zunexpected callback args: )rrZ _callbackZ_args)r4rrr7handlerrr assert_reader\s    zTestLoop.assert_readercCs||jkrtd|ddS)Nzfd z is registered)rr)r4rrrrassert_no_readergs zTestLoop.assert_no_readercGstj||||j|<dS)N)r rr)r4rrr7rrr _add_writerkszTestLoop._add_writercCs0|j|d7<||jkr(|j|=dSdSdS)NrTF)remove_writer_countr)r4rrrr_remove_writerns  zTestLoop._remove_writercGs|j|}dS)N)r)r4rrr7rrrr assert_writervs zTestLoop.assert_writerc Cs8y|j|}Wntk r"YnXtdj||dS)Nz.File descriptor {!r} is used by transport {!r})rKeyError RuntimeErrorr6)r4rZ transportrrr_ensure_fd_no_transport~sz TestLoop._ensure_fd_no_transportcGs|j||j||f|S)zAdd a reader callback.)rr)r4rrr7rrr add_readers zTestLoop.add_readercCs|j||j|S)zRemove a reader callback.)rr)r4rrrr remove_readers zTestLoop.remove_readercGs|j||j||f|S)zAdd a writer callback..)rr)r4rrr7rrr add_writers zTestLoop.add_writercCs|j||j|S)zRemove a writer callback.)rr)r4rrrr remove_writers zTestLoop.remove_writercCstjt|_tjt|_dS)N) collections defaultdictintrr)r4rrrrs zTestLoop.reset_counterscs:tjx$|jD]}|jj|}|j|qWg|_dS)N)r> _run_oncerrrr)r4whenr)rDrrrs    zTestLoop._run_oncecs |jj|tj||f|S)N)rappendr>call_at)r4rrr7)rDrrrs zTestLoop.call_atcCsdS)Nr)r4Z event_listrrr_process_eventsszTestLoop._process_eventscCsdS)Nr)r4rrr_write_to_selfszTestLoop._write_to_self)N)r9r:r;__doc__r~r+rr%rrrrrrrrrrrrrrrrrrGrr)rDrrs,     rcKstjfddgi|S)Nspec__call__)rZMock)kwargsrrrrwsrwc@seZdZdZddZdS) MockPatternzA regex based str with a fuzzy __eq__. Use this helper with 'mock.assert_called_with', or anywhere where a regex comparison between strings is needed. For instance: mock_call.assert_called_with(MockPattern('spam.*ham')) cCsttjt||tjS)N)boolresearchstrS)r4otherrrr__eq__szMockPattern.__eq__N)r9r:r;rrrrrrrsrcCs$tj|}|dkr td|f|S)Nzunable to get the source of %r)r Z_get_function_source ValueError)funcsourcerrrget_function_sources rc@sVeZdZeddZddddZddd Zd d Zd d ZddZ e j sRddZ dS)TestCasecCs&|j}|dk r|jdd|jdS)NT)wait)Z_default_executorrZr%)r&executorrrr close_loops zTestCase.close_loopT)cleanupcCs tjd|r|j|j|dS)N)r set_event_loopZ addCleanupr)r4r&rrrrrs zTestCase.set_event_loopNcCst|}|j||S)N)rr)r4r'r&rrr new_test_loops zTestCase.new_test_loopcCs |jt_dS)N)_get_running_loopr )r4rrrunpatch_get_running_loopsz!TestCase.unpatch_get_running_loopcCs tj|_ddt_tj|_dS)NcSsdS)NrrrrrrUsz TestCase.setUp..)r rrZthreading_setup_thread_cleanup)r4rrrsetUps zTestCase.setUpcCsB|jtjd|jtjd|jtj|j tj dS)N)NNN) rr rZ assertEqualsysexc_infoZ doCleanupsrZthreading_cleanuprZ reap_children)r4rrrtearDowns   zTestCase.tearDowncOsGddd}|S)Nc@seZdZddZddZdS)z!TestCase.subTest..EmptyCMcSsdS)Nr)r4rrr __enter__sz+TestCase.subTest..EmptyCM.__enter__cWsdS)Nr)r4excrrr__exit__sz*TestCase.subTest..EmptyCM.__exit__N)r9r:r;rrrrrrEmptyCMsrr)r4r7rrrrrsubTestszTestCase.subTest)N) r9r:r; staticmethodrrrrrrrZPY34rrrrrrs   rc cs2tj}ztjtjddVWdtj|XdS)zrContext manager to disable asyncio logger. For example, it can be used to ignore warnings in debug mode. rN)rlevelZsetLevelloggingZCRITICAL)Z old_levelrrrdisable_loggers  rcCs*tjtj}||_||_||_d|j_|S)z'Create a mock of a non-blocking socket.g)rZ MagicMocksocketprotorxfamilyZ gettimeoutrs)rrxrZsockrrrmock_nonblocking_socket s  rcCstjdddS)Nz'asyncio.sslproto._is_sslproto_availableF)rs)rZpatchrrrrforce_legacy_ssl_supportsr)r*)Nrr contextlibr2rrrrr`rrhrXr+ZunittestrrZ http.serverrZwsgiref.simple_serverrrr ImportErrorrerrr r r r Z coroutinesr logrrrplatformZ windows_utilsrrrJrIr"r)r.r0r1r<rHrNr]rrar^rcrfrgrkcontextmanagerrmrnrqr{Z BaseSelectorr|Z BaseEventLooprrwrrrrrZ IPPROTO_TCPZ SOCK_STREAMZAF_INETrrrrrrs                        4