forked from Archives/langchain
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
327 lines
11 KiB
Python
327 lines
11 KiB
Python
"""Base implementation for tools or skills."""
|
|
from __future__ import annotations
|
|
|
|
from abc import ABC, abstractmethod
|
|
from inspect import signature
|
|
from typing import Any, Awaitable, Callable, Dict, Optional, Tuple, Type, Union
|
|
|
|
from pydantic import (
|
|
BaseModel,
|
|
Extra,
|
|
Field,
|
|
create_model,
|
|
validate_arguments,
|
|
validator,
|
|
)
|
|
from pydantic.main import ModelMetaclass
|
|
|
|
from langchain.callbacks import get_callback_manager
|
|
from langchain.callbacks.base import BaseCallbackManager
|
|
|
|
|
|
class SchemaAnnotationError(TypeError):
    """Signal that a tool's 'args_schema' lacks a valid type annotation.

    Raised when 'args_schema' is assigned without an annotation or with an
    incorrect one.
    """
|
|
|
|
|
|
class ToolMetaclass(ModelMetaclass):
    """Metaclass for BaseTool that ensures a provided args_schema

    isn't silently ignored.
    """

    def __new__(
        cls: Type[ToolMetaclass], name: str, bases: Tuple[Type, ...], dct: dict
    ) -> ToolMetaclass:
        """Create the definition of the new tool class.

        Args:
            name: Name of the class being defined.
            bases: Base classes of the class being defined.
            dct: Namespace of the class body.

        Returns:
            The class object produced by pydantic's metaclass.

        Raises:
            SchemaAnnotationError: If the class body assigns 'args_schema' but
                leaves it un-annotated or annotates it as a bare 'BaseModel'.
        """
        schema_type: Optional[Type[BaseModel]] = dct.get("args_schema")
        if schema_type is not None:
            schema_annotations = dct.get("__annotations__", {})
            args_schema_type = schema_annotations.get("args_schema", None)
            # When the defining module uses `from __future__ import annotations`
            # the annotation is stored as the string "BaseModel" rather than the
            # class object, so both spellings have to be checked.
            is_bare_base_model = (
                args_schema_type == BaseModel or args_schema_type == "BaseModel"
            )
            if args_schema_type is None or is_bare_base_model:
                # Throw errors for common mis-annotations.
                # TODO: Use get_args / get_origin and fully
                # specify valid annotations.
                typehint_mandate = """
class ChildTool(BaseTool):
    ...
    args_schema: Type[BaseModel] = SchemaClass
    ..."""
                raise SchemaAnnotationError(
                    f"Tool definition for {name} must include valid type annotations"
                    f" for argument 'args_schema' to behave as expected.\n"
                    f"Expected annotation of 'Type[BaseModel]'"
                    f" but got '{args_schema_type}'.\n"
                    f"Expected class looks like:\n"
                    f"{typehint_mandate}"
                )
        # Pass through to Pydantic's metaclass
        return super().__new__(cls, name, bases, dct)
|
|
|
|
|
|
def _create_subset_model(
    name: str, model: Type[BaseModel], field_names: list
) -> Type[BaseModel]:
    """Create a pydantic model with only a subset of model's fields.

    Args:
        name: Name given to the newly created model class.
        model: Pydantic model class whose fields are copied.
        field_names: Names of the fields to keep. Names that are not fields
            of ``model`` are skipped.

    Returns:
        A new pydantic model class containing only the selected fields.
    """
    fields = {}
    for field_name in field_names:
        field = model.__fields__.get(field_name)
        if field is None:
            continue
        # `outer_type_` keeps container annotations such as List[str] intact
        # (`type_` would reduce that to str), and `field_info` carries the
        # original default, so required fields stay required instead of
        # silently becoming optional with a default of None.
        fields[field_name] = (field.outer_type_, field.field_info)
    return create_model(name, **fields)  # type: ignore
|
|
|
|
|
|
def get_filtered_args(
    inferred_model: Type[BaseModel],
    func: Callable,
) -> dict:
    """Select the schema properties that correspond to func's parameters.

    Looks up every parameter name in ``func``'s signature inside the
    "properties" section of ``inferred_model.schema()`` and returns those
    entries, keyed by parameter name, in signature order.
    """
    properties = inferred_model.schema()["properties"]
    selected = {}
    for param_name in signature(func).parameters:
        selected[param_name] = properties[param_name]
    return selected
|
|
|
|
|
|
def create_schema_from_function(
    model_name: str,
    func: Callable,
) -> Type[BaseModel]:
    """Build a pydantic schema class describing func's arguments.

    The resulting model is named ``f"{model_name}Schema"`` and contains only
    the fields that match parameters in ``func``'s signature.
    """
    validated = validate_arguments(func)
    full_model = validated.model  # type: ignore
    # The model pydantic infers also carries placeholder virtual fields;
    # keep only the names that are real parameters of func.
    arg_names = list(get_filtered_args(full_model, func))
    schema_name = f"{model_name}Schema"
    return _create_subset_model(schema_name, full_model, arg_names)
|
|
|
|
|
|
class BaseTool(ABC, BaseModel, metaclass=ToolMetaclass):
    """Interface LangChain tools must implement."""

    name: str
    """The unique name of the tool that clearly communicates its purpose."""
    description: str
    """Used to tell the model how/when/why to use the tool.

    You can provide few-shot examples as a part of the description.
    """
    args_schema: Optional[Type[BaseModel]] = None
    """Pydantic model class to validate and parse the tool's input arguments."""
    return_direct: bool = False
    """Whether to return the tool's output directly. Setting this to True means

    that after the tool is called, the AgentExecutor will stop looping.
    """
    verbose: bool = False
    """Whether to log the tool's progress."""
    callback_manager: BaseCallbackManager = Field(default_factory=get_callback_manager)
    """Callback manager for this tool."""

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid
        arbitrary_types_allowed = True

    @property
    def is_single_input(self) -> bool:
        """Whether the tool only accepts a single input."""
        return len(self.args) == 1

    @property
    def args(self) -> dict:
        """The tool's input arguments as schema properties.

        Uses ``args_schema`` when one is set; otherwise the properties are
        inferred from the signature of ``_run``.
        """
        if self.args_schema is not None:
            return self.args_schema.schema()["properties"]
        else:
            inferred_model = validate_arguments(self._run).model  # type: ignore
            return get_filtered_args(inferred_model, self._run)

    def _parse_input(
        self,
        tool_input: Union[str, Dict],
    ) -> None:
        """Validate tool input against ``args_schema`` when one is set.

        A string input is validated as the value of the schema's first field.
        The validated model is discarded; any error raised by
        ``args_schema.validate`` propagates to the caller.
        """
        input_args = self.args_schema
        if isinstance(tool_input, str):
            if input_args is not None:
                key_ = next(iter(input_args.__fields__.keys()))
                input_args.validate({key_: tool_input})
        else:
            if input_args is not None:
                input_args.validate(tool_input)

    @validator("callback_manager", pre=True, always=True)
    def set_callback_manager(
        cls, callback_manager: Optional[BaseCallbackManager]
    ) -> BaseCallbackManager:
        """If callback manager is None, set it.

        This allows users to pass in None as callback manager, which is a nice UX.
        """
        return callback_manager or get_callback_manager()

    @abstractmethod
    def _run(self, *args: Any, **kwargs: Any) -> Any:
        """Use the tool."""

    @abstractmethod
    async def _arun(self, *args: Any, **kwargs: Any) -> Any:
        """Use the tool asynchronously."""

    def _to_args_and_kwargs(self, tool_input: Union[str, Dict]) -> Tuple[Tuple, Dict]:
        """Split tool input into positional and keyword arguments."""
        # For backwards compatibility, if run_input is a string,
        # pass as a positional argument.
        if isinstance(tool_input, str):
            return (tool_input,), {}
        else:
            return (), tool_input

    def run(
        self,
        tool_input: Union[str, Dict],
        verbose: Optional[bool] = None,
        start_color: Optional[str] = "green",
        color: Optional[str] = "green",
        **kwargs: Any,
    ) -> Any:
        """Run the tool.

        Args:
            tool_input: A single string, or a dict of keyword arguments.
            verbose: Per-call verbosity; only used when ``self.verbose`` is
                False and this value is not None.
            start_color: Color forwarded to the ``on_tool_start`` callback.
            color: Color forwarded to the ``on_tool_end`` callback.
            **kwargs: Extra keyword arguments forwarded to the start and end
                callbacks.

        Returns:
            The observation returned by ``_run``, unmodified.

        Raises:
            Whatever ``_run`` raises; ``on_tool_error`` is notified first.
        """
        self._parse_input(tool_input)
        if not self.verbose and verbose is not None:
            verbose_ = verbose
        else:
            verbose_ = self.verbose
        self.callback_manager.on_tool_start(
            {"name": self.name, "description": self.description},
            tool_input if isinstance(tool_input, str) else str(tool_input),
            verbose=verbose_,
            color=start_color,
            **kwargs,
        )
        try:
            tool_args, tool_kwargs = self._to_args_and_kwargs(tool_input)
            observation = self._run(*tool_args, **tool_kwargs)
        except (Exception, KeyboardInterrupt) as e:
            self.callback_manager.on_tool_error(e, verbose=verbose_)
            # Bare raise re-raises the active exception with its traceback.
            raise
        self.callback_manager.on_tool_end(
            str(observation), verbose=verbose_, color=color, name=self.name, **kwargs
        )
        return observation

    async def arun(
        self,
        tool_input: Union[str, Dict],
        verbose: Optional[bool] = None,
        start_color: Optional[str] = "green",
        color: Optional[str] = "green",
        **kwargs: Any,
    ) -> Any:
        """Run the tool asynchronously.

        Mirrors ``run`` but awaits ``_arun``. Callbacks are awaited when
        ``callback_manager.is_async`` is true and called directly otherwise.

        Args:
            tool_input: A single string, or a dict of keyword arguments.
            verbose: Per-call verbosity; only used when ``self.verbose`` is
                False and this value is not None.
            start_color: Color forwarded to the ``on_tool_start`` callback.
            color: Color forwarded to the ``on_tool_end`` callback.
            **kwargs: Extra keyword arguments forwarded to the start and end
                callbacks.

        Returns:
            The observation returned by ``_arun``, unmodified.

        Raises:
            Whatever ``_arun`` raises; ``on_tool_error`` is notified first.
        """
        self._parse_input(tool_input)
        if not self.verbose and verbose is not None:
            verbose_ = verbose
        else:
            verbose_ = self.verbose
        if self.callback_manager.is_async:
            await self.callback_manager.on_tool_start(
                {"name": self.name, "description": self.description},
                tool_input if isinstance(tool_input, str) else str(tool_input),
                verbose=verbose_,
                color=start_color,
                **kwargs,
            )
        else:
            self.callback_manager.on_tool_start(
                {"name": self.name, "description": self.description},
                tool_input if isinstance(tool_input, str) else str(tool_input),
                verbose=verbose_,
                color=start_color,
                **kwargs,
            )
        try:
            # We then call the tool on the tool input to get an observation
            tool_args, tool_kwargs = self._to_args_and_kwargs(tool_input)
            observation = await self._arun(*tool_args, **tool_kwargs)
        except (Exception, KeyboardInterrupt) as e:
            if self.callback_manager.is_async:
                await self.callback_manager.on_tool_error(e, verbose=verbose_)
            else:
                self.callback_manager.on_tool_error(e, verbose=verbose_)
            # Bare raise re-raises the active exception with its traceback.
            raise
        if self.callback_manager.is_async:
            await self.callback_manager.on_tool_end(
                str(observation),
                verbose=verbose_,
                color=color,
                name=self.name,
                **kwargs,
            )
        else:
            # Stringify here too, so sync callback managers receive the same
            # payload as the async branch above and as `run`.
            self.callback_manager.on_tool_end(
                str(observation),
                verbose=verbose_,
                color=color,
                name=self.name,
                **kwargs,
            )
        return observation

    def __call__(self, tool_input: Union[str, dict]) -> Any:
        """Make tool callable."""
        return self.run(tool_input)
|
|
|
|
|
|
class StructuredTool(BaseTool):
    """Tool that wraps a callable and accepts any number of inputs."""

    description: str = ""
    args_schema: Type[BaseModel] = Field(..., description="The tool schema.")
    """Schema describing the tool's input arguments."""
    func: Callable[..., str]
    """Callable invoked when the tool is run synchronously."""
    coroutine: Optional[Callable[..., Awaitable[str]]] = None
    """Optional async counterpart of ``func``."""

    @property
    def args(self) -> dict:
        """Schema properties of the tool's input arguments."""
        schema = self.args_schema.schema()
        return schema["properties"]

    def _run(self, *args: Any, **kwargs: Any) -> Any:
        """Call ``func`` with the given arguments and return its result."""
        return self.func(*args, **kwargs)

    async def _arun(self, *args: Any, **kwargs: Any) -> Any:
        """Await ``coroutine`` with the given arguments.

        Raises NotImplementedError when no coroutine was provided.
        """
        if not self.coroutine:
            raise NotImplementedError("Tool does not support async")
        return await self.coroutine(*args, **kwargs)

    @classmethod
    def from_function(
        cls,
        func: Callable,
        name: Optional[str] = None,
        description: Optional[str] = None,
        return_direct: bool = False,
        args_schema: Optional[Type[BaseModel]] = None,
        infer_schema: bool = True,
        **kwargs: Any,
    ) -> StructuredTool:
        """Build a StructuredTool around ``func``.

        The name defaults to ``func.__name__`` and the description to the
        function's docstring; the final description is prefixed with the
        tool name and the function signature. When no ``args_schema`` is
        given and ``infer_schema`` is true, a schema is derived from the
        function's signature. Extra keyword arguments are forwarded to the
        class constructor.
        """
        tool_name = name or func.__name__
        doc = description or func.__doc__
        assert (
            doc is not None
        ), "Function must have a docstring if description not provided."

        # Resulting description looks like:
        # search_api(query: str) - Searches the API for the query.
        full_description = f"{tool_name}{signature(func)} - {doc.strip()}"

        schema = args_schema
        if schema is None and infer_schema:
            schema = create_schema_from_function(f"{tool_name}Schema", func)

        return cls(
            name=tool_name,
            func=func,
            args_schema=schema,
            description=full_description,
            return_direct=return_direct,
            **kwargs,
        )
|