|
|
|
@ -35,6 +35,8 @@ if TYPE_CHECKING:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class EmptyDict(TypedDict, total=False):
|
|
|
|
|
"""Empty dict type."""
|
|
|
|
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -85,6 +87,15 @@ class RunnableConfig(TypedDict, total=False):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def ensure_config(config: Optional[RunnableConfig] = None) -> RunnableConfig:
|
|
|
|
|
"""Ensure that a config is a dict with all keys present.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
config (Optional[RunnableConfig], optional): The config to ensure.
|
|
|
|
|
Defaults to None.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
RunnableConfig: The ensured config.
|
|
|
|
|
"""
|
|
|
|
|
empty = RunnableConfig(
|
|
|
|
|
tags=[],
|
|
|
|
|
metadata={},
|
|
|
|
@ -101,9 +112,21 @@ def ensure_config(config: Optional[RunnableConfig] = None) -> RunnableConfig:
|
|
|
|
|
def get_config_list(
|
|
|
|
|
config: Optional[Union[RunnableConfig, List[RunnableConfig]]], length: int
|
|
|
|
|
) -> List[RunnableConfig]:
|
|
|
|
|
"""
|
|
|
|
|
Helper method to get a list of configs from a single config or a list of
|
|
|
|
|
configs, useful for subclasses overriding batch() or abatch().
|
|
|
|
|
"""Get a list of configs from a single config or a list of configs.
|
|
|
|
|
|
|
|
|
|
It is useful for subclasses overriding batch() or abatch().
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
config (Optional[Union[RunnableConfig, List[RunnableConfig]]]):
|
|
|
|
|
The config or list of configs.
|
|
|
|
|
length (int): The length of the list.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
List[RunnableConfig]: The list of configs.
|
|
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
|
ValueError: If the length of the list is not equal to the length of the inputs.
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
if length < 0:
|
|
|
|
|
raise ValueError(f"length must be >= 0, but got {length}")
|
|
|
|
@ -129,9 +152,27 @@ def patch_config(
|
|
|
|
|
run_name: Optional[str] = None,
|
|
|
|
|
configurable: Optional[Dict[str, Any]] = None,
|
|
|
|
|
) -> RunnableConfig:
|
|
|
|
|
"""Patch a config with new values.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
config (Optional[RunnableConfig]): The config to patch.
|
|
|
|
|
copy_locals (bool, optional): Whether to copy locals. Defaults to False.
|
|
|
|
|
callbacks (Optional[BaseCallbackManager], optional): The callbacks to set.
|
|
|
|
|
Defaults to None.
|
|
|
|
|
recursion_limit (Optional[int], optional): The recursion limit to set.
|
|
|
|
|
Defaults to None.
|
|
|
|
|
max_concurrency (Optional[int], optional): The max concurrency to set.
|
|
|
|
|
Defaults to None.
|
|
|
|
|
run_name (Optional[str], optional): The run name to set. Defaults to None.
|
|
|
|
|
configurable (Optional[Dict[str, Any]], optional): The configurable to set.
|
|
|
|
|
Defaults to None.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
RunnableConfig: The patched config.
|
|
|
|
|
"""
|
|
|
|
|
config = ensure_config(config)
|
|
|
|
|
if callbacks is not None:
|
|
|
|
|
# If we're replacing callbacks we need to unset run_name
|
|
|
|
|
# If we're replacing callbacks, we need to unset run_name
|
|
|
|
|
# As that should apply only to the same run as the original callbacks
|
|
|
|
|
config["callbacks"] = callbacks
|
|
|
|
|
if "run_name" in config:
|
|
|
|
@ -148,9 +189,17 @@ def patch_config(
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def merge_configs(*configs: Optional[RunnableConfig]) -> RunnableConfig:
|
|
|
|
|
"""Merge multiple configs into one.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
*configs (Optional[RunnableConfig]): The configs to merge.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
RunnableConfig: The merged config.
|
|
|
|
|
"""
|
|
|
|
|
base: RunnableConfig = {}
|
|
|
|
|
# Even though the keys aren't literals this is correct
|
|
|
|
|
# because both dicts are same type
|
|
|
|
|
# Even though the keys aren't literals, this is correct
|
|
|
|
|
# because both dicts are the same type
|
|
|
|
|
for config in (c for c in configs if c is not None):
|
|
|
|
|
for key in config:
|
|
|
|
|
if key == "metadata":
|
|
|
|
@ -184,7 +233,22 @@ def call_func_with_variable_args(
|
|
|
|
|
run_manager: Optional[CallbackManagerForChainRun] = None,
|
|
|
|
|
**kwargs: Any,
|
|
|
|
|
) -> Output:
|
|
|
|
|
"""Call function that may optionally accept a run_manager and/or config."""
|
|
|
|
|
"""Call function that may optionally accept a run_manager and/or config.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
func (Union[Callable[[Input], Output],
|
|
|
|
|
Callable[[Input, CallbackManagerForChainRun], Output],
|
|
|
|
|
Callable[[Input, CallbackManagerForChainRun, RunnableConfig], Output]]):
|
|
|
|
|
The function to call.
|
|
|
|
|
input (Input): The input to the function.
|
|
|
|
|
run_manager (CallbackManagerForChainRun): The run manager to
|
|
|
|
|
pass to the function.
|
|
|
|
|
config (RunnableConfig): The config to pass to the function.
|
|
|
|
|
**kwargs (Any): The keyword arguments to pass to the function.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Output: The output of the function.
|
|
|
|
|
"""
|
|
|
|
|
if accepts_config(func):
|
|
|
|
|
if run_manager is not None:
|
|
|
|
|
kwargs["config"] = patch_config(config, callbacks=run_manager.get_child())
|
|
|
|
@ -210,7 +274,22 @@ async def acall_func_with_variable_args(
|
|
|
|
|
run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
|
|
|
|
|
**kwargs: Any,
|
|
|
|
|
) -> Output:
|
|
|
|
|
"""Call function that may optionally accept a run_manager and/or config."""
|
|
|
|
|
"""Call function that may optionally accept a run_manager and/or config.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
func (Union[Callable[[Input], Awaitable[Output]], Callable[[Input,
|
|
|
|
|
AsyncCallbackManagerForChainRun], Awaitable[Output]], Callable[[Input,
|
|
|
|
|
AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]]]):
|
|
|
|
|
The function to call.
|
|
|
|
|
input (Input): The input to the function.
|
|
|
|
|
run_manager (AsyncCallbackManagerForChainRun): The run manager
|
|
|
|
|
to pass to the function.
|
|
|
|
|
config (RunnableConfig): The config to pass to the function.
|
|
|
|
|
**kwargs (Any): The keyword arguments to pass to the function.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Output: The output of the function.
|
|
|
|
|
"""
|
|
|
|
|
if accepts_config(func):
|
|
|
|
|
if run_manager is not None:
|
|
|
|
|
kwargs["config"] = patch_config(config, callbacks=run_manager.get_child())
|
|
|
|
@ -222,6 +301,14 @@ async def acall_func_with_variable_args(
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_callback_manager_for_config(config: RunnableConfig) -> CallbackManager:
|
|
|
|
|
"""Get a callback manager for a config.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
config (RunnableConfig): The config.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
CallbackManager: The callback manager.
|
|
|
|
|
"""
|
|
|
|
|
from langchain.callbacks.manager import CallbackManager
|
|
|
|
|
|
|
|
|
|
return CallbackManager.configure(
|
|
|
|
@ -234,6 +321,14 @@ def get_callback_manager_for_config(config: RunnableConfig) -> CallbackManager:
|
|
|
|
|
def get_async_callback_manager_for_config(
|
|
|
|
|
config: RunnableConfig,
|
|
|
|
|
) -> AsyncCallbackManager:
|
|
|
|
|
"""Get an async callback manager for a config.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
config (RunnableConfig): The config.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
AsyncCallbackManager: The async callback manager.
|
|
|
|
|
"""
|
|
|
|
|
from langchain.callbacks.manager import AsyncCallbackManager
|
|
|
|
|
|
|
|
|
|
return AsyncCallbackManager.configure(
|
|
|
|
@ -245,5 +340,13 @@ def get_async_callback_manager_for_config(
|
|
|
|
|
|
|
|
|
|
@contextmanager
|
|
|
|
|
def get_executor_for_config(config: RunnableConfig) -> Generator[Executor, None, None]:
|
|
|
|
|
"""Get an executor for a config.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
config (RunnableConfig): The config.
|
|
|
|
|
|
|
|
|
|
Yields:
|
|
|
|
|
Generator[Executor, None, None]: The executor.
|
|
|
|
|
"""
|
|
|
|
|
with ThreadPoolExecutor(max_workers=config.get("max_concurrency")) as executor:
|
|
|
|
|
yield executor
|
|
|
|
|