|
|
|
@ -1946,8 +1946,41 @@ class RunnableGenerator(Runnable[Input, Output]):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class RunnableLambda(Runnable[Input, Output]):
|
|
|
|
|
"""
|
|
|
|
|
A runnable that runs a callable.
|
|
|
|
|
"""RunnableLambda converts a python callable into a Runnable.
|
|
|
|
|
|
|
|
|
|
Wrapping a callable in a RunnableLambda makes the callable usable
|
|
|
|
|
within either a sync or async context.
|
|
|
|
|
|
|
|
|
|
RunnableLambda can be composed as any other Runnable and provides
|
|
|
|
|
seamless integration with LangChain tracing.
|
|
|
|
|
|
|
|
|
|
Examples:
|
|
|
|
|
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
|
|
|
|
|
# This is a RunnableLambda
|
|
|
|
|
from langchain.schema.runnable import RunnableLambda
|
|
|
|
|
|
|
|
|
|
def add_one(x: int) -> int:
|
|
|
|
|
return x + 1
|
|
|
|
|
|
|
|
|
|
runnable = RunnableLambda(add_one)
|
|
|
|
|
|
|
|
|
|
runnable.invoke(1) # returns 2
|
|
|
|
|
runnable.batch([1, 2, 3]) # returns [2, 3, 4]
|
|
|
|
|
|
|
|
|
|
# Async is supported by default by delegating to the sync implementation
|
|
|
|
|
await runnable.ainvoke(1) # returns 2
|
|
|
|
|
await runnable.abatch([1, 2, 3]) # returns [2, 3, 4]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Alternatively, can provide both synd and sync implementations
|
|
|
|
|
async def add_one_async(x: int) -> int:
|
|
|
|
|
return x + 1
|
|
|
|
|
|
|
|
|
|
runnable = RunnableLambda(add_one, afunc=add_one_async)
|
|
|
|
|
runnable.invoke(1) # Uses add_one
|
|
|
|
|
await runnable.ainvoke(1) # Uses add_one_async
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def __init__(
|
|
|
|
@ -1955,10 +1988,25 @@ class RunnableLambda(Runnable[Input, Output]):
|
|
|
|
|
func: Union[Callable[[Input], Output], Callable[[Input], Awaitable[Output]]],
|
|
|
|
|
afunc: Optional[Callable[[Input], Awaitable[Output]]] = None,
|
|
|
|
|
) -> None:
|
|
|
|
|
"""Create a RunnableLambda from a callable, and async callable or both.
|
|
|
|
|
|
|
|
|
|
Accepts both sync and async variants to allow providing efficient
|
|
|
|
|
implementations for sync and async execution.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
func: Either sync or async callable
|
|
|
|
|
afunc: An async callable that takes an input and returns an output.
|
|
|
|
|
"""
|
|
|
|
|
if afunc is not None:
|
|
|
|
|
self.afunc = afunc
|
|
|
|
|
|
|
|
|
|
if inspect.iscoroutinefunction(func):
|
|
|
|
|
if afunc is not None:
|
|
|
|
|
raise TypeError(
|
|
|
|
|
"Func was provided as a coroutine function, but afunc was "
|
|
|
|
|
"also provided. If providing both, func should be a regular "
|
|
|
|
|
"function to avoid ambiguity."
|
|
|
|
|
)
|
|
|
|
|
self.afunc = func
|
|
|
|
|
elif callable(func):
|
|
|
|
|
self.func = cast(Callable[[Input], Output], func)
|
|
|
|
@ -1970,6 +2018,7 @@ class RunnableLambda(Runnable[Input, Output]):
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def InputType(self) -> Any:
|
|
|
|
|
"""The type of the input to this runnable."""
|
|
|
|
|
func = getattr(self, "func", None) or getattr(self, "afunc")
|
|
|
|
|
try:
|
|
|
|
|
params = inspect.signature(func).parameters
|
|
|
|
@ -1983,6 +2032,7 @@ class RunnableLambda(Runnable[Input, Output]):
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def input_schema(self) -> Type[BaseModel]:
|
|
|
|
|
"""The pydantic schema for the input to this runnable."""
|
|
|
|
|
func = getattr(self, "func", None) or getattr(self, "afunc")
|
|
|
|
|
|
|
|
|
|
if isinstance(func, itemgetter):
|
|
|
|
@ -2010,6 +2060,7 @@ class RunnableLambda(Runnable[Input, Output]):
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def OutputType(self) -> Any:
|
|
|
|
|
"""The type of the output of this runnable as a type annotation."""
|
|
|
|
|
func = getattr(self, "func", None) or getattr(self, "afunc")
|
|
|
|
|
try:
|
|
|
|
|
sig = inspect.signature(func)
|
|
|
|
@ -2033,6 +2084,7 @@ class RunnableLambda(Runnable[Input, Output]):
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
def __repr__(self) -> str:
|
|
|
|
|
"""A string representation of this runnable."""
|
|
|
|
|
return f"RunnableLambda({get_lambda_source(self.func) or '...'})"
|
|
|
|
|
|
|
|
|
|
def _invoke(
|
|
|
|
@ -2106,6 +2158,7 @@ class RunnableLambda(Runnable[Input, Output]):
|
|
|
|
|
config: Optional[RunnableConfig] = None,
|
|
|
|
|
**kwargs: Optional[Any],
|
|
|
|
|
) -> Output:
|
|
|
|
|
"""Invoke this runnable synchronously."""
|
|
|
|
|
if hasattr(self, "func"):
|
|
|
|
|
return self._call_with_config(
|
|
|
|
|
self._invoke,
|
|
|
|
@ -2124,6 +2177,7 @@ class RunnableLambda(Runnable[Input, Output]):
|
|
|
|
|
config: Optional[RunnableConfig] = None,
|
|
|
|
|
**kwargs: Optional[Any],
|
|
|
|
|
) -> Output:
|
|
|
|
|
"""Invoke this runnable asynchronously."""
|
|
|
|
|
if hasattr(self, "afunc"):
|
|
|
|
|
return await self._acall_with_config(
|
|
|
|
|
self._ainvoke,
|
|
|
|
|