|
|
@ -3,6 +3,7 @@ from __future__ import annotations
|
|
|
|
import enum
|
|
|
|
import enum
|
|
|
|
import threading
|
|
|
|
import threading
|
|
|
|
from abc import abstractmethod
|
|
|
|
from abc import abstractmethod
|
|
|
|
|
|
|
|
from functools import wraps
|
|
|
|
from typing import (
|
|
|
|
from typing import (
|
|
|
|
Any,
|
|
|
|
Any,
|
|
|
|
AsyncIterator,
|
|
|
|
AsyncIterator,
|
|
|
@ -26,6 +27,7 @@ from langchain_core.runnables.config import (
|
|
|
|
ensure_config,
|
|
|
|
ensure_config,
|
|
|
|
get_config_list,
|
|
|
|
get_config_list,
|
|
|
|
get_executor_for_config,
|
|
|
|
get_executor_for_config,
|
|
|
|
|
|
|
|
merge_configs,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
from langchain_core.runnables.graph import Graph
|
|
|
|
from langchain_core.runnables.graph import Graph
|
|
|
|
from langchain_core.runnables.utils import (
|
|
|
|
from langchain_core.runnables.utils import (
|
|
|
@ -46,6 +48,8 @@ class DynamicRunnable(RunnableSerializable[Input, Output]):
|
|
|
|
|
|
|
|
|
|
|
|
default: RunnableSerializable[Input, Output]
|
|
|
|
default: RunnableSerializable[Input, Output]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
config: Optional[RunnableConfig] = None
|
|
|
|
|
|
|
|
|
|
|
|
class Config:
|
|
|
|
class Config:
|
|
|
|
arbitrary_types_allowed = True
|
|
|
|
arbitrary_types_allowed = True
|
|
|
|
|
|
|
|
|
|
|
@ -69,19 +73,37 @@ class DynamicRunnable(RunnableSerializable[Input, Output]):
|
|
|
|
def get_input_schema(
|
|
|
|
def get_input_schema(
|
|
|
|
self, config: Optional[RunnableConfig] = None
|
|
|
|
self, config: Optional[RunnableConfig] = None
|
|
|
|
) -> Type[BaseModel]:
|
|
|
|
) -> Type[BaseModel]:
|
|
|
|
runnable, config = self._prepare(config)
|
|
|
|
runnable, config = self.prepare(config)
|
|
|
|
return runnable.get_input_schema(config)
|
|
|
|
return runnable.get_input_schema(config)
|
|
|
|
|
|
|
|
|
|
|
|
def get_output_schema(
|
|
|
|
def get_output_schema(
|
|
|
|
self, config: Optional[RunnableConfig] = None
|
|
|
|
self, config: Optional[RunnableConfig] = None
|
|
|
|
) -> Type[BaseModel]:
|
|
|
|
) -> Type[BaseModel]:
|
|
|
|
runnable, config = self._prepare(config)
|
|
|
|
runnable, config = self.prepare(config)
|
|
|
|
return runnable.get_output_schema(config)
|
|
|
|
return runnable.get_output_schema(config)
|
|
|
|
|
|
|
|
|
|
|
|
def get_graph(self, config: Optional[RunnableConfig] = None) -> Graph:
|
|
|
|
def get_graph(self, config: Optional[RunnableConfig] = None) -> Graph:
|
|
|
|
runnable, config = self._prepare(config)
|
|
|
|
runnable, config = self.prepare(config)
|
|
|
|
return runnable.get_graph(config)
|
|
|
|
return runnable.get_graph(config)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def with_config(
|
|
|
|
|
|
|
|
self,
|
|
|
|
|
|
|
|
config: Optional[RunnableConfig] = None,
|
|
|
|
|
|
|
|
# Sadly Unpack is not well supported by mypy so this will have to be untyped
|
|
|
|
|
|
|
|
**kwargs: Any,
|
|
|
|
|
|
|
|
) -> Runnable[Input, Output]:
|
|
|
|
|
|
|
|
return self.__class__(
|
|
|
|
|
|
|
|
**{**self.__dict__, "config": ensure_config(merge_configs(config, kwargs))} # type: ignore[arg-type]
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def prepare(
|
|
|
|
|
|
|
|
self, config: Optional[RunnableConfig] = None
|
|
|
|
|
|
|
|
) -> Tuple[Runnable[Input, Output], RunnableConfig]:
|
|
|
|
|
|
|
|
runnable: Runnable[Input, Output] = self
|
|
|
|
|
|
|
|
while isinstance(runnable, DynamicRunnable):
|
|
|
|
|
|
|
|
runnable, config = runnable._prepare(merge_configs(runnable.config, config))
|
|
|
|
|
|
|
|
return runnable, cast(RunnableConfig, config)
|
|
|
|
|
|
|
|
|
|
|
|
@abstractmethod
|
|
|
|
@abstractmethod
|
|
|
|
def _prepare(
|
|
|
|
def _prepare(
|
|
|
|
self, config: Optional[RunnableConfig] = None
|
|
|
|
self, config: Optional[RunnableConfig] = None
|
|
|
@ -91,13 +113,13 @@ class DynamicRunnable(RunnableSerializable[Input, Output]):
|
|
|
|
def invoke(
|
|
|
|
def invoke(
|
|
|
|
self, input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any
|
|
|
|
self, input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any
|
|
|
|
) -> Output:
|
|
|
|
) -> Output:
|
|
|
|
runnable, config = self._prepare(config)
|
|
|
|
runnable, config = self.prepare(config)
|
|
|
|
return runnable.invoke(input, config, **kwargs)
|
|
|
|
return runnable.invoke(input, config, **kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
async def ainvoke(
|
|
|
|
async def ainvoke(
|
|
|
|
self, input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any
|
|
|
|
self, input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any
|
|
|
|
) -> Output:
|
|
|
|
) -> Output:
|
|
|
|
runnable, config = self._prepare(config)
|
|
|
|
runnable, config = self.prepare(config)
|
|
|
|
return await runnable.ainvoke(input, config, **kwargs)
|
|
|
|
return await runnable.ainvoke(input, config, **kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
def batch(
|
|
|
|
def batch(
|
|
|
@ -109,7 +131,7 @@ class DynamicRunnable(RunnableSerializable[Input, Output]):
|
|
|
|
**kwargs: Optional[Any],
|
|
|
|
**kwargs: Optional[Any],
|
|
|
|
) -> List[Output]:
|
|
|
|
) -> List[Output]:
|
|
|
|
configs = get_config_list(config, len(inputs))
|
|
|
|
configs = get_config_list(config, len(inputs))
|
|
|
|
prepared = [self._prepare(c) for c in configs]
|
|
|
|
prepared = [self.prepare(c) for c in configs]
|
|
|
|
|
|
|
|
|
|
|
|
if all(p is self.default for p, _ in prepared):
|
|
|
|
if all(p is self.default for p, _ in prepared):
|
|
|
|
return self.default.batch(
|
|
|
|
return self.default.batch(
|
|
|
@ -151,7 +173,7 @@ class DynamicRunnable(RunnableSerializable[Input, Output]):
|
|
|
|
**kwargs: Optional[Any],
|
|
|
|
**kwargs: Optional[Any],
|
|
|
|
) -> List[Output]:
|
|
|
|
) -> List[Output]:
|
|
|
|
configs = get_config_list(config, len(inputs))
|
|
|
|
configs = get_config_list(config, len(inputs))
|
|
|
|
prepared = [self._prepare(c) for c in configs]
|
|
|
|
prepared = [self.prepare(c) for c in configs]
|
|
|
|
|
|
|
|
|
|
|
|
if all(p is self.default for p, _ in prepared):
|
|
|
|
if all(p is self.default for p, _ in prepared):
|
|
|
|
return await self.default.abatch(
|
|
|
|
return await self.default.abatch(
|
|
|
@ -186,7 +208,7 @@ class DynamicRunnable(RunnableSerializable[Input, Output]):
|
|
|
|
config: Optional[RunnableConfig] = None,
|
|
|
|
config: Optional[RunnableConfig] = None,
|
|
|
|
**kwargs: Optional[Any],
|
|
|
|
**kwargs: Optional[Any],
|
|
|
|
) -> Iterator[Output]:
|
|
|
|
) -> Iterator[Output]:
|
|
|
|
runnable, config = self._prepare(config)
|
|
|
|
runnable, config = self.prepare(config)
|
|
|
|
return runnable.stream(input, config, **kwargs)
|
|
|
|
return runnable.stream(input, config, **kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
async def astream(
|
|
|
|
async def astream(
|
|
|
@ -195,7 +217,7 @@ class DynamicRunnable(RunnableSerializable[Input, Output]):
|
|
|
|
config: Optional[RunnableConfig] = None,
|
|
|
|
config: Optional[RunnableConfig] = None,
|
|
|
|
**kwargs: Optional[Any],
|
|
|
|
**kwargs: Optional[Any],
|
|
|
|
) -> AsyncIterator[Output]:
|
|
|
|
) -> AsyncIterator[Output]:
|
|
|
|
runnable, config = self._prepare(config)
|
|
|
|
runnable, config = self.prepare(config)
|
|
|
|
async for chunk in runnable.astream(input, config, **kwargs):
|
|
|
|
async for chunk in runnable.astream(input, config, **kwargs):
|
|
|
|
yield chunk
|
|
|
|
yield chunk
|
|
|
|
|
|
|
|
|
|
|
@ -205,7 +227,7 @@ class DynamicRunnable(RunnableSerializable[Input, Output]):
|
|
|
|
config: Optional[RunnableConfig] = None,
|
|
|
|
config: Optional[RunnableConfig] = None,
|
|
|
|
**kwargs: Optional[Any],
|
|
|
|
**kwargs: Optional[Any],
|
|
|
|
) -> Iterator[Output]:
|
|
|
|
) -> Iterator[Output]:
|
|
|
|
runnable, config = self._prepare(config)
|
|
|
|
runnable, config = self.prepare(config)
|
|
|
|
return runnable.transform(input, config, **kwargs)
|
|
|
|
return runnable.transform(input, config, **kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
async def atransform(
|
|
|
|
async def atransform(
|
|
|
@ -214,10 +236,48 @@ class DynamicRunnable(RunnableSerializable[Input, Output]):
|
|
|
|
config: Optional[RunnableConfig] = None,
|
|
|
|
config: Optional[RunnableConfig] = None,
|
|
|
|
**kwargs: Optional[Any],
|
|
|
|
**kwargs: Optional[Any],
|
|
|
|
) -> AsyncIterator[Output]:
|
|
|
|
) -> AsyncIterator[Output]:
|
|
|
|
runnable, config = self._prepare(config)
|
|
|
|
runnable, config = self.prepare(config)
|
|
|
|
async for chunk in runnable.atransform(input, config, **kwargs):
|
|
|
|
async for chunk in runnable.atransform(input, config, **kwargs):
|
|
|
|
yield chunk
|
|
|
|
yield chunk
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def __getattr__(self, name: str) -> Any:
|
|
|
|
|
|
|
|
attr = getattr(self.default, name)
|
|
|
|
|
|
|
|
if callable(attr):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@wraps(attr)
|
|
|
|
|
|
|
|
def wrapper(*args: Any, **kwargs: Any) -> Any:
|
|
|
|
|
|
|
|
for key, arg in kwargs.items():
|
|
|
|
|
|
|
|
if key == "config" and (
|
|
|
|
|
|
|
|
isinstance(arg, dict)
|
|
|
|
|
|
|
|
and "configurable" in arg
|
|
|
|
|
|
|
|
and isinstance(arg["configurable"], dict)
|
|
|
|
|
|
|
|
):
|
|
|
|
|
|
|
|
runnable, config = self.prepare(cast(RunnableConfig, arg))
|
|
|
|
|
|
|
|
kwargs = {**kwargs, "config": config}
|
|
|
|
|
|
|
|
return getattr(runnable, name)(*args, **kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for idx, arg in enumerate(args):
|
|
|
|
|
|
|
|
if (
|
|
|
|
|
|
|
|
isinstance(arg, dict)
|
|
|
|
|
|
|
|
and "configurable" in arg
|
|
|
|
|
|
|
|
and isinstance(arg["configurable"], dict)
|
|
|
|
|
|
|
|
):
|
|
|
|
|
|
|
|
runnable, config = self.prepare(cast(RunnableConfig, arg))
|
|
|
|
|
|
|
|
argsl = list(args)
|
|
|
|
|
|
|
|
argsl[idx] = config
|
|
|
|
|
|
|
|
return getattr(runnable, name)(*argsl, **kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if self.config:
|
|
|
|
|
|
|
|
runnable, config = self.prepare()
|
|
|
|
|
|
|
|
return getattr(runnable, name)(*args, **kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return attr(*args, **kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return wrapper
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
return attr
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class RunnableConfigurableFields(DynamicRunnable[Input, Output]):
|
|
|
|
class RunnableConfigurableFields(DynamicRunnable[Input, Output]):
|
|
|
|
"""Runnable that can be dynamically configured.
|
|
|
|
"""Runnable that can be dynamically configured.
|
|
|
@ -291,6 +351,7 @@ class RunnableConfigurableFields(DynamicRunnable[Input, Output]):
|
|
|
|
def config_specs(self) -> List[ConfigurableFieldSpec]:
|
|
|
|
def config_specs(self) -> List[ConfigurableFieldSpec]:
|
|
|
|
return get_unique_config_specs(
|
|
|
|
return get_unique_config_specs(
|
|
|
|
[
|
|
|
|
[
|
|
|
|
|
|
|
|
(
|
|
|
|
ConfigurableFieldSpec(
|
|
|
|
ConfigurableFieldSpec(
|
|
|
|
id=spec.id,
|
|
|
|
id=spec.id,
|
|
|
|
name=spec.name,
|
|
|
|
name=spec.name,
|
|
|
@ -305,6 +366,7 @@ class RunnableConfigurableFields(DynamicRunnable[Input, Output]):
|
|
|
|
else make_options_spec(
|
|
|
|
else make_options_spec(
|
|
|
|
spec, self.default.__fields__[field_name].field_info.description
|
|
|
|
spec, self.default.__fields__[field_name].field_info.description
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
)
|
|
|
|
for field_name, spec in self.fields.items()
|
|
|
|
for field_name, spec in self.fields.items()
|
|
|
|
]
|
|
|
|
]
|
|
|
|
+ list(self.default.config_specs)
|
|
|
|
+ list(self.default.config_specs)
|
|
|
@ -488,9 +550,11 @@ class RunnableConfigurableAlternatives(DynamicRunnable[Input, Output]):
|
|
|
|
)
|
|
|
|
)
|
|
|
|
# config specs of the alternatives
|
|
|
|
# config specs of the alternatives
|
|
|
|
+ [
|
|
|
|
+ [
|
|
|
|
|
|
|
|
(
|
|
|
|
prefix_config_spec(s, f"{self.which.id}=={alt_key}")
|
|
|
|
prefix_config_spec(s, f"{self.which.id}=={alt_key}")
|
|
|
|
if self.prefix_keys
|
|
|
|
if self.prefix_keys
|
|
|
|
else s
|
|
|
|
else s
|
|
|
|
|
|
|
|
)
|
|
|
|
for alt_key, alt in self.alternatives.items()
|
|
|
|
for alt_key, alt in self.alternatives.items()
|
|
|
|
if isinstance(alt, RunnableSerializable)
|
|
|
|
if isinstance(alt, RunnableSerializable)
|
|
|
|
for s in alt.config_specs
|
|
|
|
for s in alt.config_specs
|
|
|
|