add async to zapier nla tools (#6791)

Replace this comment with:
  - Description: Add Async functionality to Zapier NLA Tools
  - Issue:  n/a 
  - Dependencies: n/a
  - Tag maintainer: 

Maintainer responsibilities:
  - Agents / Tools / Toolkits: @vowelparrot
  - Async: @agola11

If no one reviews your PR within a few days, feel free to @-mention the
same people again.

See contribution guidelines for more information on how to write/run
tests, lint, etc:
https://github.com/hwchase17/langchain/blob/master/.github/CONTRIBUTING.md
This commit is contained in:
Matthew Plachter 2023-06-27 19:53:35 -04:00 committed by GitHub
parent efe0d39c6a
commit d6664af0ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 294 additions and 20 deletions

View File

@ -29,6 +29,23 @@ class ZapierToolkit(BaseToolkit):
]
return cls(tools=tools)
@classmethod
async def async_from_zapier_nla_wrapper(
cls, zapier_nla_wrapper: ZapierNLAWrapper
) -> "ZapierToolkit":
"""Create a toolkit from a ZapierNLAWrapper."""
actions = await zapier_nla_wrapper.alist()
tools = [
ZapierNLARunAction(
action_id=action["id"],
zapier_description=action["description"],
params_schema=action["params"],
api_wrapper=zapier_nla_wrapper,
)
for action in actions
]
return cls(tools=tools)
def get_tools(self) -> List[BaseTool]:
"""Get the tools in the toolkit."""
return self.tools

View File

@ -142,11 +142,15 @@ class ZapierNLARunAction(BaseTool):
async def _arun(
self,
_: str,
instructions: str,
run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
) -> str:
"""Use the Zapier NLA tool to return a list of all exposed user actions."""
raise NotImplementedError("ZapierNLAListActions does not support async")
return await self.api_wrapper.arun_as_str(
self.action_id,
instructions,
self.params,
)
ZapierNLARunAction.__doc__ = (
@ -184,7 +188,7 @@ class ZapierNLAListActions(BaseTool):
run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
) -> str:
"""Use the Zapier NLA tool to return a list of all exposed user actions."""
raise NotImplementedError("ZapierNLAListActions does not support async")
return await self.api_wrapper.alist_as_str()
ZapierNLAListActions.__doc__ = (

View File

@ -12,8 +12,9 @@ to use oauth. Review the full docs above and reach out to nla@zapier.com for
developer support.
"""
import json
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional
import aiohttp
import requests
from pydantic import BaseModel, Extra, root_validator
from requests import Request, Session
@ -49,36 +50,63 @@ class ZapierNLAWrapper(BaseModel):
extra = Extra.forbid
def _get_session(self) -> Session:
session = requests.Session()
session.headers.update(
{
def _format_headers(self) -> Dict[str, str]:
"""Format headers for requests."""
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
}
)
if self.zapier_nla_oauth_access_token:
session.headers.update(
headers.update(
{"Authorization": f"Bearer {self.zapier_nla_oauth_access_token}"}
)
else:
session.params = {"api_key": self.zapier_nla_api_key}
headers.update({"X-API-Key": self.zapier_nla_api_key})
return headers
def _get_session(self) -> Session:
session = requests.Session()
session.headers.update(self._format_headers())
return session
def _get_action_request(
self, action_id: str, instructions: str, params: Optional[Dict] = None
) -> Request:
async def _arequest(self, method: str, url: str, **kwargs: Any) -> Dict[str, Any]:
"""Make an async request."""
async with aiohttp.ClientSession(headers=self._format_headers()) as session:
async with session.request(method, url, **kwargs) as response:
response.raise_for_status()
return await response.json()
def _create_action_payload( # type: ignore[no-untyped-def]
self, instructions: str, params: Optional[Dict] = None, preview_only=False
) -> Dict:
"""Create a payload for an action."""
data = params if params else {}
data.update(
{
"instructions": instructions,
}
)
if preview_only:
data.update({"preview_only": True})
return data
def _create_action_url(self, action_id: str) -> str:
"""Create a url for an action."""
return self.zapier_nla_api_base + f"exposed/{action_id}/execute/"
def _create_action_request( # type: ignore[no-untyped-def]
self,
action_id: str,
instructions: str,
params: Optional[Dict] = None,
preview_only=False,
) -> Request:
data = self._create_action_payload(instructions, params, preview_only)
return Request(
"POST",
self.zapier_nla_api_base + f"exposed/{action_id}/execute/",
self._create_action_url(action_id),
json=data,
)
@ -107,6 +135,28 @@ class ZapierNLAWrapper(BaseModel):
return values
async def alist(self) -> List[Dict]:
"""Returns a list of all exposed (enabled) actions associated with
current user (associated with the set api_key). Change your exposed
actions here: https://nla.zapier.com/demo/start/
The return list can be empty if no actions exposed. Else will contain
a list of action objects:
[{
"id": str,
"description": str,
"params": Dict[str, str]
}]
`params` will always contain an `instructions` key, the only required
param. All others optional and if provided will override any AI guesses
(see "understanding the AI guessing flow" here:
https://nla.zapier.com/api/v1/docs)
"""
response = await self._arequest("GET", self.zapier_nla_api_base + "exposed/")
return response["results"]
def list(self) -> List[Dict]:
"""Returns a list of all exposed (enabled) actions associated with
current user (associated with the set api_key). Change your exposed
@ -157,11 +207,29 @@ class ZapierNLAWrapper(BaseModel):
call.
"""
session = self._get_session()
request = self._get_action_request(action_id, instructions, params)
request = self._create_action_request(action_id, instructions, params)
response = session.send(session.prepare_request(request))
response.raise_for_status()
return response.json()["result"]
async def arun(
self, action_id: str, instructions: str, params: Optional[Dict] = None
) -> Dict:
"""Executes an action that is identified by action_id, must be exposed
(enabled) by the current user (associated with the set api_key). Change
your exposed actions here: https://nla.zapier.com/demo/start/
The return JSON is guaranteed to be less than ~500 words (350
tokens) making it safe to inject into the prompt of another LLM
call.
"""
response = await self._arequest(
"POST",
self._create_action_url(action_id),
json=self._create_action_payload(instructions, params),
)
return response["result"]
def preview(
self, action_id: str, instructions: str, params: Optional[Dict] = None
) -> Dict:
@ -171,25 +239,58 @@ class ZapierNLAWrapper(BaseModel):
session = self._get_session()
params = params if params else {}
params.update({"preview_only": True})
request = self._get_action_request(action_id, instructions, params)
request = self._create_action_request(action_id, instructions, params, True)
response = session.send(session.prepare_request(request))
response.raise_for_status()
return response.json()["input_params"]
async def apreview(
self, action_id: str, instructions: str, params: Optional[Dict] = None
) -> Dict:
"""Same as run, but instead of actually executing the action, will
instead return a preview of params that have been guessed by the AI in
case you need to explicitly review before executing."""
response = await self._arequest(
"POST",
self._create_action_url(action_id),
json=self._create_action_payload(instructions, params, preview_only=True),
)
return response["result"]
def run_as_str(self, *args, **kwargs) -> str: # type: ignore[no-untyped-def]
"""Same as run, but returns a stringified version of the JSON for
insertting back into an LLM."""
data = self.run(*args, **kwargs)
return json.dumps(data)
async def arun_as_str(self, *args, **kwargs) -> str: # type: ignore[no-untyped-def]
"""Same as run, but returns a stringified version of the JSON for
insertting back into an LLM."""
data = await self.arun(*args, **kwargs)
return json.dumps(data)
def preview_as_str(self, *args, **kwargs) -> str: # type: ignore[no-untyped-def]
"""Same as preview, but returns a stringified version of the JSON for
insertting back into an LLM."""
data = self.preview(*args, **kwargs)
return json.dumps(data)
async def apreview_as_str( # type: ignore[no-untyped-def]
self, *args, **kwargs
) -> str:
"""Same as preview, but returns a stringified version of the JSON for
insertting back into an LLM."""
data = await self.apreview(*args, **kwargs)
return json.dumps(data)
def list_as_str(self) -> str: # type: ignore[no-untyped-def]
"""Same as list, but returns a stringified version of the JSON for
insertting back into an LLM."""
actions = self.list()
return json.dumps(actions)
async def alist_as_str(self) -> str: # type: ignore[no-untyped-def]
"""Same as list, but returns a stringified version of the JSON for
insertting back into an LLM."""
actions = await self.alist()
return json.dumps(actions)

View File

@ -55,6 +55,158 @@ def test_custom_base_prompt_fail() -> None:
)
def test_format_headers_api_key() -> None:
"""Test that the action headers is being created correctly."""
tool = ZapierNLARunAction(
action_id="test",
zapier_description="test",
params_schema={"test": "test"},
api_wrapper=ZapierNLAWrapper(zapier_nla_api_key="test"),
)
headers = tool.api_wrapper._format_headers()
assert headers["Content-Type"] == "application/json"
assert headers["Accept"] == "application/json"
assert headers["X-API-Key"] == "test"
def test_format_headers_access_token() -> None:
"""Test that the action headers is being created correctly."""
tool = ZapierNLARunAction(
action_id="test",
zapier_description="test",
params_schema={"test": "test"},
api_wrapper=ZapierNLAWrapper(zapier_nla_oauth_access_token="test"),
)
headers = tool.api_wrapper._format_headers()
assert headers["Content-Type"] == "application/json"
assert headers["Accept"] == "application/json"
assert headers["Authorization"] == "Bearer test"
def test_create_action_payload() -> None:
"""Test that the action payload is being created correctly."""
tool = ZapierNLARunAction(
action_id="test",
zapier_description="test",
params_schema={"test": "test"},
api_wrapper=ZapierNLAWrapper(zapier_nla_api_key="test"),
)
payload = tool.api_wrapper._create_action_payload("some instructions")
assert payload["instructions"] == "some instructions"
assert payload.get("preview_only") is None
def test_create_action_payload_preview() -> None:
"""Test that the action payload with preview is being created correctly."""
tool = ZapierNLARunAction(
action_id="test",
zapier_description="test",
params_schema={"test": "test"},
api_wrapper=ZapierNLAWrapper(zapier_nla_api_key="test"),
)
payload = tool.api_wrapper._create_action_payload(
"some instructions",
preview_only=True,
)
assert payload["instructions"] == "some instructions"
assert payload["preview_only"] is True
def test_create_action_payload_with_params() -> None:
"""Test that the action payload with params is being created correctly."""
tool = ZapierNLARunAction(
action_id="test",
zapier_description="test",
params_schema={"test": "test"},
api_wrapper=ZapierNLAWrapper(zapier_nla_api_key="test"),
)
payload = tool.api_wrapper._create_action_payload(
"some instructions",
{"test": "test"},
preview_only=True,
)
assert payload["instructions"] == "some instructions"
assert payload["preview_only"] is True
assert payload["test"] == "test"
@pytest.mark.asyncio
async def test_apreview(mocker) -> None: # type: ignore[no-untyped-def]
"""Test that the action payload with params is being created correctly."""
tool = ZapierNLARunAction(
action_id="test",
zapier_description="test",
params_schema={"test": "test"},
api_wrapper=ZapierNLAWrapper(
zapier_nla_api_key="test",
zapier_nla_api_base="http://localhost:8080/v1/",
),
)
mockObj = mocker.patch.object(ZapierNLAWrapper, "_arequest")
await tool.api_wrapper.apreview(
"random_action_id",
"some instructions",
{"test": "test"},
)
mockObj.assert_called_once_with(
"POST",
"http://localhost:8080/v1/exposed/random_action_id/execute/",
json={
"instructions": "some instructions",
"preview_only": True,
"test": "test",
},
)
@pytest.mark.asyncio
async def test_arun(mocker) -> None: # type: ignore[no-untyped-def]
"""Test that the action payload with params is being created correctly."""
tool = ZapierNLARunAction(
action_id="test",
zapier_description="test",
params_schema={"test": "test"},
api_wrapper=ZapierNLAWrapper(
zapier_nla_api_key="test",
zapier_nla_api_base="http://localhost:8080/v1/",
),
)
mockObj = mocker.patch.object(ZapierNLAWrapper, "_arequest")
await tool.api_wrapper.arun(
"random_action_id",
"some instructions",
{"test": "test"},
)
mockObj.assert_called_once_with(
"POST",
"http://localhost:8080/v1/exposed/random_action_id/execute/",
json={"instructions": "some instructions", "test": "test"},
)
@pytest.mark.asyncio
async def test_alist(mocker) -> None: # type: ignore[no-untyped-def]
"""Test that the action payload with params is being created correctly."""
tool = ZapierNLARunAction(
action_id="test",
zapier_description="test",
params_schema={"test": "test"},
api_wrapper=ZapierNLAWrapper(
zapier_nla_api_key="test",
zapier_nla_api_base="http://localhost:8080/v1/",
),
)
mockObj = mocker.patch.object(ZapierNLAWrapper, "_arequest")
await tool.api_wrapper.alist()
mockObj.assert_called_once_with(
"GET",
"http://localhost:8080/v1/exposed/",
)
def test_wrapper_fails_no_api_key_or_access_token_initialization() -> None:
"""Test Wrapper requires either an API Key or OAuth Access Token."""
with pytest.raises(ValueError):