|
|
|
@ -77,11 +77,64 @@ Other = TypeVar("Other")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Runnable(Generic[Input, Output], ABC):
|
|
|
|
|
"""A Runnable is a unit of work that can be invoked, batched, streamed, or
|
|
|
|
|
transformed."""
|
|
|
|
|
"""A unit of work that can be invoked, batched, streamed, transformed and composed.
|
|
|
|
|
|
|
|
|
|
Key methods:
|
|
|
|
|
|
|
|
|
|
* invoke/ainvoke: Transforms a single input into an output.
|
|
|
|
|
* batch/abatch: Efficiently transforms multiple inputs into outputs.
|
|
|
|
|
* stream/astream: Streams output from a single input as it's produced.
|
|
|
|
|
* astream_log: Streams output and selected intermediate results from an input.
|
|
|
|
|
|
|
|
|
|
Batch: By default, batch runs invoke() in parallel using a thread pool executor.
|
|
|
|
|
Override to optimize batching.
|
|
|
|
|
|
|
|
|
|
Async: Methods with "a" suffix are asynchronous. By default, they execute
|
|
|
|
|
the sync counterpart using asyncio's thread pool. Override for native async.
|
|
|
|
|
|
|
|
|
|
All methods accept an optional config argument, which can be used to configure
|
|
|
|
|
execution, add tags and metadata for tracing and debugging etc.
|
|
|
|
|
|
|
|
|
|
Runnables expose schematic information about their input, output and config via
|
|
|
|
|
the input_schema property, the output_schema property and config_schema method.
|
|
|
|
|
|
|
|
|
|
The LangChain Expression Language (LCEL) is a declarative way to compose Runnables
|
|
|
|
|
into chains. Any chain constructed this way will automatically have sync, async,
|
|
|
|
|
batch, and streaming support.
|
|
|
|
|
|
|
|
|
|
The main composition primitives are RunnableSequence and RunnableParallel.
|
|
|
|
|
|
|
|
|
|
RunnableSequence invokes a series of runnables sequentially, with one runnable's
|
|
|
|
|
output serving as the next's input. Construct using the `|` operator or by
|
|
|
|
|
passing a list of runnables to RunnableSequence.
|
|
|
|
|
|
|
|
|
|
RunnableParallel invokes runnables concurrently, providing the same input
|
|
|
|
|
to each. Construct it using a dict literal within a sequence or by passing a
|
|
|
|
|
dict to RunnableParallel.
|
|
|
|
|
|
|
|
|
|
For example,
|
|
|
|
|
|
|
|
|
|
..code-block:: python
|
|
|
|
|
|
|
|
|
|
from langchain.schema.runnable import RunnableLambda
|
|
|
|
|
|
|
|
|
|
# A RunnableSequence constructed using the `|` operator
|
|
|
|
|
sequence = RunnableLambda(lambda x: x + 1) | RunnableLambda(lambda x: x * 2)
|
|
|
|
|
sequence.invoke(1) # 4
|
|
|
|
|
sequence.batch([1, 2, 3]) # [4, 6, 8]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# A sequence that contains a RunnableParallel constructed using a dict literal
|
|
|
|
|
sequence = RunnableLambda(lambda x: x + 1) | {
|
|
|
|
|
'mul_2': RunnableLambda(lambda x: x * 2),
|
|
|
|
|
'mul_5': RunnableLambda(lambda x: x * 5)
|
|
|
|
|
}
|
|
|
|
|
sequence.invoke(1) # {'mul_2': 4, 'mul_5': 10}
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def InputType(self) -> Type[Input]:
|
|
|
|
|
"""The type of input this runnable accepts specified as a type annotation."""
|
|
|
|
|
for cls in self.__class__.__orig_bases__: # type: ignore[attr-defined]
|
|
|
|
|
type_args = get_args(cls)
|
|
|
|
|
if type_args and len(type_args) == 2:
|
|
|
|
@ -94,6 +147,7 @@ class Runnable(Generic[Input, Output], ABC):
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def OutputType(self) -> Type[Output]:
|
|
|
|
|
"""The type of output this runnable produces specified as a type annotation."""
|
|
|
|
|
for cls in self.__class__.__orig_bases__: # type: ignore[attr-defined]
|
|
|
|
|
type_args = get_args(cls)
|
|
|
|
|
if type_args and len(type_args) == 2:
|
|
|
|
@ -106,6 +160,7 @@ class Runnable(Generic[Input, Output], ABC):
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def input_schema(self) -> Type[BaseModel]:
|
|
|
|
|
"""The type of input this runnable accepts specified as a pydantic model."""
|
|
|
|
|
root_type = self.InputType
|
|
|
|
|
|
|
|
|
|
if inspect.isclass(root_type) and issubclass(root_type, BaseModel):
|
|
|
|
@ -117,6 +172,7 @@ class Runnable(Generic[Input, Output], ABC):
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def output_schema(self) -> Type[BaseModel]:
|
|
|
|
|
"""The type of output this runnable produces specified as a pydantic model."""
|
|
|
|
|
root_type = self.OutputType
|
|
|
|
|
|
|
|
|
|
if inspect.isclass(root_type) and issubclass(root_type, BaseModel):
|
|
|
|
@ -128,9 +184,22 @@ class Runnable(Generic[Input, Output], ABC):
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def config_specs(self) -> Sequence[ConfigurableFieldSpec]:
|
|
|
|
|
"""List configurable fields for this runnable."""
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
def config_schema(self, *, include: Sequence[str]) -> Type[BaseModel]:
|
|
|
|
|
"""The type of config this runnable accepts specified as a pydantic model.
|
|
|
|
|
|
|
|
|
|
To mark a field as configurable, see the `configurable_fields`
|
|
|
|
|
and `configurable_alternatives` methods.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
include: A list of fields to include in the config schema.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
A pydantic model that can be used to validate config.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
class _Config:
|
|
|
|
|
arbitrary_types_allowed = True
|
|
|
|
|
|
|
|
|
@ -173,6 +242,7 @@ class Runnable(Generic[Input, Output], ABC):
|
|
|
|
|
Mapping[str, Union[Runnable[Any, Other], Callable[[Any], Other], Any]],
|
|
|
|
|
],
|
|
|
|
|
) -> RunnableSequence[Input, Other]:
|
|
|
|
|
"""Compose this runnable with another object to create a RunnableSequence."""
|
|
|
|
|
return RunnableSequence(first=self, last=coerce_to_runnable(other))
|
|
|
|
|
|
|
|
|
|
def __ror__(
|
|
|
|
@ -184,12 +254,14 @@ class Runnable(Generic[Input, Output], ABC):
|
|
|
|
|
Mapping[str, Union[Runnable[Other, Any], Callable[[Other], Any], Any]],
|
|
|
|
|
],
|
|
|
|
|
) -> RunnableSequence[Other, Output]:
|
|
|
|
|
"""Compose this runnable with another object to create a RunnableSequence."""
|
|
|
|
|
return RunnableSequence(first=coerce_to_runnable(other), last=self)
|
|
|
|
|
|
|
|
|
|
""" --- Public API --- """
|
|
|
|
|
|
|
|
|
|
@abstractmethod
|
|
|
|
|
def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
|
|
|
|
|
"""Transform a single input into an output. Override to implement."""
|
|
|
|
|
...
|
|
|
|
|
|
|
|
|
|
async def ainvoke(
|
|
|
|
|