fixed aiohttp.client_exceptions.ClientConnectionError: Connection closed (#2718)

I fixed an issue where an error would always occur when making a request
using the `TextRequestsWrapper` with async API.

This is caused by escaping the scope of the context, which causes the
connection to be broken when reading the response body.

The correct usage is as described in the [official
tutorial](https://docs.aiohttp.org/en/stable/client_quickstart.html#make-a-request),
where the text method must also be handled in the context scope.

<details>

<summary>Stacktrace</summary>

```
  File "/home/vscode/.cache/pypoetry/virtualenvs/codehex-workspace-xS3fZVNL-py3.11/lib/python3.11/site-packages/langchain/tools/base.py", line 116, in arun
    raise e
  File "/home/vscode/.cache/pypoetry/virtualenvs/codehex-workspace-xS3fZVNL-py3.11/lib/python3.11/site-packages/langchain/tools/base.py", line 110, in arun
    observation = await self._arun(tool_input)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/vscode/.cache/pypoetry/virtualenvs/codehex-workspace-xS3fZVNL-py3.11/lib/python3.11/site-packages/langchain/agents/tools.py", line 22, in _arun
    return await self.coroutine(tool_input)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/vscode/.cache/pypoetry/virtualenvs/codehex-workspace-xS3fZVNL-py3.11/lib/python3.11/site-packages/langchain/chains/base.py", line 234, in arun
    return (await self.acall(args[0]))[self.output_keys[0]]
            ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/vscode/.cache/pypoetry/virtualenvs/codehex-workspace-xS3fZVNL-py3.11/lib/python3.11/site-packages/langchain/chains/base.py", line 154, in acall
    raise e
  File "/home/vscode/.cache/pypoetry/virtualenvs/codehex-workspace-xS3fZVNL-py3.11/lib/python3.11/site-packages/langchain/chains/base.py", line 148, in acall
    outputs = await self._acall(inputs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/workspace/src/tools/example.py", line 153, in _acall
    api_response = await self.requests_wrapper.aget("http://example.com")
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/vscode/.cache/pypoetry/virtualenvs/codehex-workspace-xS3fZVNL-py3.11/lib/python3.11/site-packages/langchain/requests.py", line 130, in aget
    return await response.text()
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/vscode/.cache/pypoetry/virtualenvs/codehex-workspace-xS3fZVNL-py3.11/lib/python3.11/site-packages/aiohttp/client_reqrep.py", line 1081, in text
    await self.read()
  File "/home/vscode/.cache/pypoetry/virtualenvs/codehex-workspace-xS3fZVNL-py3.11/lib/python3.11/site-packages/aiohttp/client_reqrep.py", line 1037, in read
    self._body = await self.content.read()
                 ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/vscode/.cache/pypoetry/virtualenvs/codehex-workspace-xS3fZVNL-py3.11/lib/python3.11/site-packages/aiohttp/streams.py", line 349, in read
  raise self._exception
aiohttp.client_exceptions.ClientConnectionError: Connection closed
```

</details>
fix_agent_callbacks
Kei Kamikawa 1 year ago committed by GitHub
parent 3623bdb31b
commit 186ca9d3e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,5 +1,6 @@
"""Lightweight wrapper around requests library, with async support."""
from typing import Any, Dict, Optional
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Dict, Optional
import aiohttp
import requests
@ -42,47 +43,62 @@ class Requests(BaseModel):
"""DELETE the URL and return the text."""
return requests.delete(url, headers=self.headers, **kwargs)
@asynccontextmanager
async def _arequest(
self, method: str, url: str, **kwargs: Any
) -> aiohttp.ClientResponse:
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""Make an async request."""
if not self.aiosession:
async with aiohttp.ClientSession() as session:
async with session.request(
method, url, headers=self.headers, **kwargs
) as response:
return response
yield response
else:
async with self.aiosession.request(
method, url, headers=self.headers, **kwargs
) as response:
return response
yield response
async def aget(self, url: str, **kwargs: Any) -> aiohttp.ClientResponse:
@asynccontextmanager
async def aget(
self, url: str, **kwargs: Any
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""GET the URL and return the text asynchronously."""
return await self._arequest("GET", url, **kwargs)
async with self._arequest("GET", url, **kwargs) as response:
yield response
@asynccontextmanager
async def apost(
self, url: str, data: Dict[str, Any], **kwargs: Any
) -> aiohttp.ClientResponse:
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""POST to the URL and return the text asynchronously."""
return await self._arequest("POST", url, json=data, **kwargs)
async with self._arequest("POST", url, **kwargs) as response:
yield response
@asynccontextmanager
async def apatch(
self, url: str, data: Dict[str, Any], **kwargs: Any
) -> aiohttp.ClientResponse:
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""PATCH the URL and return the text asynchronously."""
return await self._arequest("PATCH", url, json=data, **kwargs)
async with self._arequest("PATCH", url, **kwargs) as response:
yield response
@asynccontextmanager
async def aput(
self, url: str, data: Dict[str, Any], **kwargs: Any
) -> aiohttp.ClientResponse:
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""PUT the URL and return the text asynchronously."""
return await self._arequest("PUT", url, json=data, **kwargs)
async with self._arequest("PUT", url, **kwargs) as response:
yield response
async def adelete(self, url: str, **kwargs: Any) -> aiohttp.ClientResponse:
@asynccontextmanager
async def adelete(
self, url: str, **kwargs: Any
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""DELETE the URL and return the text asynchronously."""
return await self._arequest("DELETE", url, **kwargs)
async with self._arequest("DELETE", url, **kwargs) as response:
yield response
class TextRequestsWrapper(BaseModel):
@ -126,28 +142,28 @@ class TextRequestsWrapper(BaseModel):
async def aget(self, url: str, **kwargs: Any) -> str:
"""GET the URL and return the text asynchronously."""
response = await self.requests.aget(url, **kwargs)
return await response.text()
async with self.requests.aget(url, **kwargs) as response:
return await response.text()
async def apost(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
"""POST to the URL and return the text asynchronously."""
response = await self.requests.apost(url, data, **kwargs)
return await response.text()
async with self.requests.apost(url, **kwargs) as response:
return await response.text()
async def apatch(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
"""PATCH the URL and return the text asynchronously."""
response = await self.requests.apatch(url, data, **kwargs)
return await response.text()
async with self.requests.apatch(url, **kwargs) as response:
return await response.text()
async def aput(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
"""PUT the URL and return the text asynchronously."""
response = await self.requests.aput(url, data, **kwargs)
return await response.text()
async with self.requests.aput(url, **kwargs) as response:
return await response.text()
async def adelete(self, url: str, **kwargs: Any) -> str:
"""DELETE the URL and return the text asynchronously."""
response = await self.requests.adelete(url, **kwargs)
return await response.text()
async with self.requests.adelete(url, **kwargs) as response:
return await response.text()
# For backwards compatibility

Loading…
Cancel
Save