From 803351889930c8792fb97a614dbfb2019b436f8c Mon Sep 17 00:00:00 2001 From: Markus Heiser Date: Mon, 24 May 2021 16:59:35 +0200 Subject: [PATCH] [pylint] searx/network/__init__.py & add global (THREADLOCAL) No functional change! - fix messages from pylint - add ``global THREADLOCAL`` - normalized various indentation Signed-off-by: Markus Heiser --- searx/network/__init__.py | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/searx/network/__init__.py b/searx/network/__init__.py index 587198144..c921bdecb 100644 --- a/searx/network/__init__.py +++ b/searx/network/__init__.py @@ -1,4 +1,6 @@ # SPDX-License-Identifier: AGPL-3.0-or-later +# lint: pylint +# pylint: disable=missing-module-docstring, missing-function-docstring, global-statement import asyncio import threading @@ -31,29 +33,33 @@ except ImportError: self._count.release() def get(self): - if not self._count.acquire(True): + if not self._count.acquire(True): #pylint: disable=consider-using-with raise Empty return self._queue.popleft() THREADLOCAL = threading.local() - +"""Thread-local data is data for thread specific values.""" def reset_time_for_thread(): + global THREADLOCAL THREADLOCAL.total_time = 0 def get_time_for_thread(): """returns thread's total time or None""" + global THREADLOCAL return THREADLOCAL.__dict__.get('total_time') def set_timeout_for_thread(timeout, start_time=None): + global THREADLOCAL THREADLOCAL.timeout = timeout THREADLOCAL.start_time = start_time def set_context_network_name(network_name): + global THREADLOCAL THREADLOCAL.network = get_network(network_name) @@ -62,11 +68,13 @@ def get_context_network(): If unset, return value from :py:obj:`get_network`. """ + global THREADLOCAL return THREADLOCAL.__dict__.get('network') or get_network() def request(method, url, **kwargs): """same as requests/requests/api.py request(...)""" + global THREADLOCAL time_before_request = default_timer() # timeout (httpx) @@ -153,18 +161,17 @@ def patch(url, data=None, **kwargs): def delete(url, **kwargs): return request('delete', url, **kwargs) - -async def stream_chunk_to_queue(network, q, method, url, **kwargs): +async def stream_chunk_to_queue(network, queue, method, url, **kwargs): try: async with network.stream(method, url, **kwargs) as response: - q.put(response) + queue.put(response) async for chunk in response.aiter_bytes(65536): if len(chunk) > 0: - q.put(chunk) + queue.put(chunk) except (httpx.HTTPError, OSError, h2.exceptions.ProtocolError) as e: - q.put(e) + queue.put(e) finally: - q.put(None) + queue.put(None) def stream(method, url, **kwargs): @@ -179,13 +186,15 @@ def stream(method, url, **kwargs): httpx.Client.stream requires to write the httpx.HTTPTransport version of the the httpx.AsyncHTTPTransport declared above. """ - q = SimpleQueue() - future = asyncio.run_coroutine_threadsafe(stream_chunk_to_queue(get_network(), q, method, url, **kwargs), - get_loop()) - chunk_or_exception = q.get() + queue = SimpleQueue() + future = asyncio.run_coroutine_threadsafe( + stream_chunk_to_queue(get_network(), queue, method, url, **kwargs), + get_loop() + ) + chunk_or_exception = queue.get() while chunk_or_exception is not None: if isinstance(chunk_or_exception, Exception): raise chunk_or_exception yield chunk_or_exception - chunk_or_exception = q.get() + chunk_or_exception = queue.get() return future.result()