@ -1,4 +1,6 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
# pylint: disable=missing-module-docstring, missing-function-docstring, global-statement
import asyncio
import asyncio
import threading
import threading
@ -31,29 +33,33 @@ except ImportError:
self . _count . release ( )
self . _count . release ( )
def get ( self ) :
def get ( self ) :
if not self . _count . acquire ( True ) :
if not self . _count . acquire ( True ) : #pylint: disable=consider-using-with
raise Empty
raise Empty
return self . _queue . popleft ( )
return self . _queue . popleft ( )
THREADLOCAL = threading . local ( )
THREADLOCAL = threading . local ( )
""" Thread-local data is data for thread specific values. """
def reset_time_for_thread ( ) :
def reset_time_for_thread ( ) :
global THREADLOCAL
THREADLOCAL . total_time = 0
THREADLOCAL . total_time = 0
def get_time_for_thread ( ) :
def get_time_for_thread ( ) :
""" returns thread ' s total time or None """
""" returns thread ' s total time or None """
global THREADLOCAL
return THREADLOCAL . __dict__ . get ( ' total_time ' )
return THREADLOCAL . __dict__ . get ( ' total_time ' )
def set_timeout_for_thread ( timeout , start_time = None ) :
def set_timeout_for_thread ( timeout , start_time = None ) :
global THREADLOCAL
THREADLOCAL . timeout = timeout
THREADLOCAL . timeout = timeout
THREADLOCAL . start_time = start_time
THREADLOCAL . start_time = start_time
def set_context_network_name ( network_name ) :
def set_context_network_name ( network_name ) :
global THREADLOCAL
THREADLOCAL . network = get_network ( network_name )
THREADLOCAL . network = get_network ( network_name )
@ -62,11 +68,13 @@ def get_context_network():
If unset , return value from : py : obj : ` get_network ` .
If unset , return value from : py : obj : ` get_network ` .
"""
"""
global THREADLOCAL
return THREADLOCAL . __dict__ . get ( ' network ' ) or get_network ( )
return THREADLOCAL . __dict__ . get ( ' network ' ) or get_network ( )
def request ( method , url , * * kwargs ) :
def request ( method , url , * * kwargs ) :
""" same as requests/requests/api.py request(...) """
""" same as requests/requests/api.py request(...) """
global THREADLOCAL
time_before_request = default_timer ( )
time_before_request = default_timer ( )
# timeout (httpx)
# timeout (httpx)
@ -153,18 +161,17 @@ def patch(url, data=None, **kwargs):
def delete ( url , * * kwargs ) :
def delete ( url , * * kwargs ) :
return request ( ' delete ' , url , * * kwargs )
return request ( ' delete ' , url , * * kwargs )
async def stream_chunk_to_queue ( network , queue , method , url , * * kwargs ) :
async def stream_chunk_to_queue ( network , q , method , url , * * kwargs ) :
try :
try :
async with network . stream ( method , url , * * kwargs ) as response :
async with network . stream ( method , url , * * kwargs ) as response :
q . put ( response )
q ueue . put ( response )
async for chunk in response . aiter_bytes ( 65536 ) :
async for chunk in response . aiter_bytes ( 65536 ) :
if len ( chunk ) > 0 :
if len ( chunk ) > 0 :
q . put ( chunk )
q ueue . put ( chunk )
except ( httpx . HTTPError , OSError , h2 . exceptions . ProtocolError ) as e :
except ( httpx . HTTPError , OSError , h2 . exceptions . ProtocolError ) as e :
q . put ( e )
q ueue . put ( e )
finally :
finally :
q . put ( None )
q ueue . put ( None )
def stream ( method , url , * * kwargs ) :
def stream ( method , url , * * kwargs ) :
@ -179,13 +186,15 @@ def stream(method, url, **kwargs):
httpx . Client . stream requires to write the httpx . HTTPTransport version of the
httpx . Client . stream requires to write the httpx . HTTPTransport version of the
the httpx . AsyncHTTPTransport declared above .
the httpx . AsyncHTTPTransport declared above .
"""
"""
q = SimpleQueue ( )
queue = SimpleQueue ( )
future = asyncio . run_coroutine_threadsafe ( stream_chunk_to_queue ( get_network ( ) , q , method , url , * * kwargs ) ,
future = asyncio . run_coroutine_threadsafe (
get_loop ( ) )
stream_chunk_to_queue ( get_network ( ) , queue , method , url , * * kwargs ) ,
chunk_or_exception = q . get ( )
get_loop ( )
)
chunk_or_exception = queue . get ( )
while chunk_or_exception is not None :
while chunk_or_exception is not None :
if isinstance ( chunk_or_exception , Exception ) :
if isinstance ( chunk_or_exception , Exception ) :
raise chunk_or_exception
raise chunk_or_exception
yield chunk_or_exception
yield chunk_or_exception
chunk_or_exception = q . get ( )
chunk_or_exception = q ueue . get ( )
return future . result ( )
return future . result ( )