B `2P@s8ddlZddlZddlZddlmZmZmZmZmZm Z m Z ddl m Z ddl mZmZmZddlmZyddlmZWn ek rddlmZYnXdZe d ZGd d d eZGd d d eeZGdddZGdddZGdddeZGdddeZeZGdddeeZ Gddde eZ!dS)N) AwaitableCallableGenericListOptionalTupleTypeVar) BaseProtocol)BaseTimerContext set_exception set_result)internal_logger)Deque) EMPTY_PAYLOAD EofStream StreamReader DataQueueFlowControlDataQueue_Tc@seZdZdZdS)rzeof stream indication.N)__name__ __module__ __qualname____doc__rrj/private/var/folders/7j/8686xlfs15q3tgljmghtvg0r0000gn/T/pip-target-isidps9b/lib/python/aiohttp/streams.pyrsrc@sDeZdZegeefddddZddddZedd d ZdS) AsyncStreamIteratorN) read_funcreturncCs ||_dS)N)r)selfrrrr__init__szAsyncStreamIterator.__init__zAsyncStreamIterator[_T])rcCs|S)Nr)rrrr __aiter__"szAsyncStreamIterator.__aiter__cs<y|IdH}Wntk r*tYnX|dkr8t|S)N)rrStopAsyncIteration)rrvrrr __anext__%s zAsyncStreamIterator.__anext__) rrrrrrr r!r%rrrrrsrc@s@eZdZdddddZddddZeeefdd d ZdS) ChunkTupleAsyncStreamIteratorrN)streamrcCs ||_dS)N)_stream)rr'rrrr 0sz&ChunkTupleAsyncStreamIterator.__init__)rcCs|S)Nr)rrrrr!3sz'ChunkTupleAsyncStreamIterator.__aiter__cs |jIdH}|dkrt|S)N)r"F)r( readchunkr#)rr$rrrr%6sz'ChunkTupleAsyncStreamIterator.__anext__) rrrr r!rbytesboolr%rrrrr&/sr&c@sReZdZeedddZeeedddZeedddZe dd d Z d S) AsyncStreamReaderMixin)rcCs t|jS)N)rreadline)rrrrr!>sz AsyncStreamReaderMixin.__aiter__)nrcstfddS)zzReturns an asynchronous iterator that yields chunks of size n. Python-3.5 available for Python 3.5+ only cs S)N)readr)r.rrrFr"z5AsyncStreamReaderMixin.iter_chunked..)r)rr.r)r.rr iter_chunkedAsz#AsyncStreamReaderMixin.iter_chunkedcCs t|jS)zReturns an asynchronous iterator that yields all the available data as soon as it is received Python-3.5 available for Python 3.5+ only )rreadany)rrrriter_anyHszAsyncStreamReaderMixin.iter_anycCst|S)a Returns an asynchronous iterator that yields chunks of data as they are received by the server. The yielded objects are tuples of (bytes, bool) as returned by the StreamReader.readchunk method. Python-3.5 available for Python 3.5+ only )r&)rrrr iter_chunksPsz"AsyncStreamReaderMixin.iter_chunksN) rrrrr*r!intr1r3r&r4rrrrr,=sr,c@seZdZdZdZdddeeeeee j ddddZ e dd d Z eeefdd d Zeedd dZeddddZegdfddddZddddZedddZedddZddddZeddddZdeed-d6d7Z%eed-d8d9Z&eed-d:d;Z'dS)?ra*An enhancement of asyncio.StreamReader. Supports asynchronous iteration by line, chunk or as available:: async for line in reader: ... async for chunk in reader.iter_chunked(1024): ... async for slice in reader.iter_any(): ... rN)timerloop)protocollimitr6r7rcCsv||_||_|d|_|dkr&t}||_d|_d|_d|_t |_ d|_ d|_ d|_d|_d|_||_g|_dS)NrF) _protocol _low_water _high_waterasyncioZget_event_loop_loop_size_cursor_http_chunk_splits collectionsdeque_buffer_buffer_offset_eof_waiter _eof_waiter _exception_timer_eof_callbacks)rr8r9r6r7rrrr js"  zStreamReader.__init__)rcCs|jjg}|jr |d|j|jr0|d|jdkrP|d|j|jf|jrf|d|j|jr||d|jdd |S) Nz%d byteseofizlow=%d high=%dzw=%rze=%rz<%s> ) __class__rr@appendrGr<r=rHrJjoin)rinforrr__repr__s   zStreamReader.__repr__cCs |j|jfS)N)r<r=)rrrrget_read_buffer_limitssz#StreamReader.get_read_buffer_limitscCs|jS)N)rJ)rrrr exceptionszStreamReader.exception)excrcCsP||_|j|j}|dk r.d|_t|||j}|dk rLd|_t||dS)N)rJrLclearrHr rI)rrVwaiterrrrr s  zStreamReader.set_exception)callbackrcCsB|jr2y |Wq>tk r.tdYq>Xn |j|dS)NzException in eof callback)rG ExceptionrrUrLrP)rrYrrron_eofs  zStreamReader.on_eofc Csd|_|j}|dk r$d|_t|d|j}|dk rBd|_t|dx8|jD].}y |WqJtk rvtdYqJXqJW|jdS)NTzException in eof callback) rGrHr rIrLrZrrUrW)rrXcbrrrfeed_eofs    zStreamReader.feed_eofcCs|jS)z&Return True if 'feed_eof' was called.)rG)rrrris_eofszStreamReader.is_eofcCs|jo |j S)z=Return True if the buffer is empty and 'feed_eof' was called.)rGrE)rrrrat_eofszStreamReader.at_eofcsB|jr dS|jdkst|j|_z|jIdHWdd|_XdS)N)rGrIAssertionErrorr? create_future)rrrrwait_eofs zStreamReader.wait_eof)datarcCsxtjdtdd|sdS|jr>|jd|jd|jd<d|_|jt|7_|jt|8_|j|d|_ dS)zDrollback reading some data from stream, inserting it to buffer head.zJunread_data() is deprecated and will be removed in future releases (#3260)r:) stacklevelNr) warningswarnDeprecationWarningrFrEr@lenrA appendleft _eof_counter)rrcrrr unread_datas zStreamReader.unread_data)rcsizercCs|jrtd|sdS|jt|7_|j||jt|7_|j}|dk rdd|_t|d|j|j kr|j j s|j dS)Nzfeed_data after feed_eof) rGr`r@rhrErP total_bytesrHr r=r;_reading_paused pause_reading)rrcrlrXrrr feed_datas  zStreamReader.feed_datacCs"|jdkr|jrtdg|_dS)Nz?Called begin_http_chunk_receiving whensome data was already fed)rBrm RuntimeError)rrrrbegin_http_chunk_receivings  z'StreamReader.begin_http_chunk_receivingcCsd|jdkrtd|jr"|jdnd}|j|kr4dS|j|j|j}|dk r`d|_t|ddS)NzFCalled end_chunk_receiving without calling begin_chunk_receiving firstr)rBrqrmrPrHr )rposrXrrrend_http_chunk_receivings   z%StreamReader.end_http_chunk_receiving) func_namerc sf|jdk rtd||j}|_z2|jrL|j|IdHWdQRXn |IdHWdd|_XdS)NzH%s() called while another coroutine is already waiting for incoming data)rHrqr?rarK)rrvrXrrr_wait#s zStreamReader._waitcs|jdk r|jg}d}d}x|rxp|jr|r|j}|jdd|d}||rZ||nd}|||t|7}|r~d}||jkr$tdq$W|j rP|r| dIdHqWd |S) NrT r rsFzLine is too longr-r") rJrErFfind_read_nowait_chunkrPrhr= ValueErrorrGrwrQ)rlineZ line_sizeZ not_enoughoffsetZicharrcrrrr-8s*      zStreamReader.readliners)r.rcs|jdk r|j|jrF|jsFt|ddd|_|jdkrFtjddd|sNdS|dkrg}x"|IdH}|spP||q\Wd |Sx |js|js| d IdHqW| |S) Nrjrr zEMultiple access to StreamReader in eof state, might be infinite loop.T) stack_infor"r/) rJrGrEgetattrrjrwarningr2rPrQrw _read_nowait)rr.blocksblockrrrr/Vs*    zStreamReader.readcs<|jdk r|jx |js0|js0|dIdHqW|dS)Nr2rs)rJrErGrwr)rrrrr2s  zStreamReader.readanycsx|jdk r|jxL|jr^|jd}||jkr4dS||jkrR|||jdfStdqW|jrt|ddfS|j r~dS| d IdHqWdS) zReturns a tuple of (data, end_of_http_chunk). When chunked transfer encoding is used, end_of_http_chunk is a boolean indicating if the end of the data corresponds to the end of a HTTP chunk , otherwise it is always False. Nr)r"TTzESkipping HTTP chunk end due to data consumption beyond chunk boundaryrsF)r"Fr)) rJrBpoprArrrrErzrGrw)rrtrrrr)s      zStreamReader.readchunkcst|jdk r|jg}xT|dkrh||IdH}|sPd|}t|t|||||t|8}qWd|S)Nrr")rJr/rQr>IncompleteReadErrorrhrP)rr.rrpartialrrr readexactlys    zStreamReader.readexactlycCs2|jdk r|j|jr(|js(td||S)Nz9Called while some coroutine is waiting for incoming data.)rJrHdonerqr)rr.rrr read_nowaits  zStreamReader.read_nowaitcCs|jd}|j}|dkrHt|||krH||||}|j|7_n,|rj|j||d}d|_n |j}|jt|8_|jt|7_|j}x |r|d|jkr|dqW|j|jkr|j j r|j |S)Nrrs) rErFrhpopleftr@rArBrr<r;rnresume_reading)rr.Z first_bufferr}rcZ chunk_splitsrrrrzs$     zStreamReader._read_nowait_chunkcCsTg}x<|jr@||}|||dkr|t|8}|dkrPqW|rPd|SdS)z8 Read not more than n bytes, or whole buffer if n == -1 rsrr")rErzrPrhrQ)rr.chunkschunkrrrrs   zStreamReader._read_nowait)r)rs)rs)(rrrrrmr r5rr r>AbstractEventLoopr strrSrrT BaseExceptionrUr rr[r]r+r^r_rbr*rkrprrrurwr-r/r2r)rrrzrrrrrrZs6   *  rc@seZdZeedddZeddddZegdfddd d Zddd d Z e dd dZ e dddZ ddddZ d%eeddddZedddZd&eedddZedddZeee fddd Zeedd!d"Zedd#d$ZdS)'EmptyStreamReader)rcCsdS)Nr)rrrrrUszEmptyStreamReader.exceptionN)rVrcCsdS)Nr)rrVrrrr szEmptyStreamReader.set_exception)rYrcCs.y |Wntk r(tdYnXdS)NzException in eof callback)rZrrU)rrYrrrr[s zEmptyStreamReader.on_eofcCsdS)Nr)rrrrr]szEmptyStreamReader.feed_eofcCsdS)NTr)rrrrr^szEmptyStreamReader.is_eofcCsdS)NTr)rrrrr_szEmptyStreamReader.at_eofcsdS)Nr)rrrrrb szEmptyStreamReader.wait_eofr)rcr.rcCsdS)Nr)rrcr.rrrrp szEmptyStreamReader.feed_datacsdS)Nr"r)rrrrr-szEmptyStreamReader.readliners)r.rcsdS)Nr"r)rr.rrrr/szEmptyStreamReader.readcsdS)Nr"r)rrrrr2szEmptyStreamReader.readanycsdS)N)r"Tr)rrrrr)szEmptyStreamReader.readchunkcstd|dS)Nr")r>r)rr.rrrrszEmptyStreamReader.readexactlycCsdS)Nr"r)rrrrrszEmptyStreamReader.read_nowait)r)rs)rrrrrrUr rr[r]r+r^r_rbr*r5rpr-r/r2rr)rrrrrrrsrc@seZdZdZejddddZedddZe dd d Z e dd d Z e e dd dZe ddddZdeeddddZddddZedddZeedddZdS)rz>DataQueue is a general-purpose blocking queue with one reader.N)r7rcCs,||_d|_d|_d|_d|_t|_dS)NFr)r?rGrHrJr@rCrDrE)rr7rrrr )s zDataQueue.__init__)rcCs t|jS)N)rhrE)rrrr__len__1szDataQueue.__len__cCs|jS)N)rG)rrrrr^4szDataQueue.is_eofcCs|jo |j S)N)rGrE)rrrrr_7szDataQueue.at_eofcCs|jS)N)rJ)rrrrrU:szDataQueue.exception)rVrcCs.d|_||_|j}|dk r*d|_t||dS)NT)rGrJrHr )rrVrXrrrr =s zDataQueue.set_exceptionr)rcrlrcCs@|j|7_|j||f|j}|dk rZCancelledError TimeoutErrorrr@rJr)rrcrlrrrr/Ws    zDataQueue.readcCs t|jS)N)rr/)rrrrr!kszDataQueue.__aiter__)r)rrrrr>rr r5rr+r^r_rrrUr rrpr]r/rr!rrrrr&s  rcsXeZdZdZeeejddfdd Zd e eddfdd Z e d fd d Z Z S)rzgFlowControlDataQueue resumes and pauses an underlying stream. It is a destination for parsed data.N)r8r9r7rcs"tj|d||_|d|_dS)N)r7r:)superr r;_limit)rr8r9r7)rOrrr tszFlowControlDataQueue.__init__r)rcrlrcs0t|||j|jkr,|jjs,|jdS)N)rrpr@rr;rnro)rrcrl)rOrrrp|szFlowControlDataQueue.feed_data)rcs6ztIdHS|j|jkr0|jjr0|jXdS)N)rr/r@rr;rnr)r)rOrrr/szFlowControlDataQueue.read)r) rrrrr r5r>rr rrpr/ __classcell__rr)rOrros r)"r>rCretypingrrrrrrrZ base_protocolr Zhelpersr r r logrr ImportErrorZtyping_extensions__all__rrZrrr&r,rrrrrrrrrs0$  /I