@ -82,18 +82,22 @@ Other = TypeVar("Other")
class Runnable ( Generic [ Input , Output ] , ABC ) :
""" A unit of work that can be invoked, batched, streamed, transformed and composed.
Key methods :
Key Methods
== == == == == =
* invoke / ainvoke : Transforms a single input into an output .
* batch / abatch : Efficiently transforms multiple inputs into outputs .
* stream / astream : Streams output from a single input as it ' s produced.
* astream_log : Streams output and selected intermediate results from an input .
Batch : By default , batch runs invoke ( ) in parallel using a thread pool executor .
Override to optimize batching .
Built - in optimizations :
Async : Methods with " a " suffix are asynchronous . By default , they execute
the sync counterpart using asyncio ' s thread pool. Override for native async.
* Batch : By default , batch runs invoke ( ) in parallel using a thread pool executor .
Override to optimize batching .
* Async : Methods with " a " suffix are asynchronous . By default , they execute
the sync counterpart using asyncio ' s thread pool.
Override for native async .
All methods accept an optional config argument , which can be used to configure
execution , add tags and metadata for tracing and debugging etc .
@ -101,6 +105,9 @@ class Runnable(Generic[Input, Output], ABC):
Runnables expose schematic information about their input , output and config via
the input_schema property , the output_schema property and config_schema method .
LCEL and Composition
== == == == == == == == == ==
The LangChain Expression Language ( LCEL ) is a declarative way to compose Runnables
into chains . Any chain constructed this way will automatically have sync , async ,
batch , and streaming support .
@ -115,6 +122,7 @@ class Runnable(Generic[Input, Output], ABC):
to each . Construct it using a dict literal within a sequence or by passing a
dict to RunnableParallel .
For example ,
. . code - block : : python
@ -133,6 +141,72 @@ class Runnable(Generic[Input, Output], ABC):
' mul_5 ' : RunnableLambda ( lambda x : x * 5 )
}
sequence . invoke ( 1 ) # {'mul_2': 4, 'mul_5': 10}
Standard Methods
== == == == == == == ==
All Runnables expose additional methods that can be used to modify their behavior
( e . g . , add a retry policy , add lifecycle listeners , make them configurable , etc . ) .
These methods will work on any Runnable , including Runnable chains constructed
by composing other Runnables . See the individual methods for details .
For example ,
. . code - block : : python
from langchain . schema . runnable import RunnableLambda
import random
def add_one ( x : int ) - > int :
return x + 1
def buggy_double ( y : int ) - > int :
''' Buggy code that will fail 70 % o f the time '''
if random . random ( ) > 0.3 :
print ( ' This code failed, and will probably be retried! ' )
raise ValueError ( ' Triggered buggy code ' )
return y * 2
sequence = (
RunnableLambda ( add_one ) |
RunnableLambda ( buggy_double ) . with_retry ( # Retry on failure
stop_after_attempt = 10 ,
wait_exponential_jitter = False
)
)
print ( sequence . input_schema . schema ( ) ) # Show inferred input schema
print ( sequence . output_schema . schema ( ) ) # Show inferred output schema
print ( sequence . invoke ( 2 ) ) # invoke the sequence (note the retry above!!)
Debugging and tracing
== == == == == == == == == == =
As the chains get longer , it can be useful to be able to see intermediate results
to debug and trace the chain .
You can set the global debug flag to True to enable debug output for all chains :
. . code - block : : python
from langchain . globals import set_debug
set_debug ( True )
Alternatively , you can pass existing or custom callbacks to any given chain :
. . . code - block : : python
from langchain . callbacks . tracers import ConsoleCallbackHandler
chain . invoke (
. . . ,
config = { ' callbacks ' : [ ConsoleCallbackHandler ( ) ] }
)
For a UI ( and much more ) checkout LangSmith : https : / / docs . smith . langchain . com /
"""
@property
@ -169,7 +243,20 @@ class Runnable(Generic[Input, Output], ABC):
def get_input_schema (
self , config : Optional [ RunnableConfig ] = None
) - > Type [ BaseModel ] :
""" The type of input this runnable accepts specified as a pydantic model. """
""" Get a pydantic model that can be used to validate input to the runnable.
Runnables that leverage the configurable_fields and configurable_alternatives
methods will have a dynamic input schema that depends on which
configuration the runnable is invoked with .
This method allows to get an input schema for a specific configuration .
Args :
config : A config to use when generating the schema .
Returns :
A pydantic model that can be used to validate input .
"""
root_type = self . InputType
if inspect . isclass ( root_type ) and issubclass ( root_type , BaseModel ) :
@ -187,7 +274,20 @@ class Runnable(Generic[Input, Output], ABC):
def get_output_schema (
self , config : Optional [ RunnableConfig ] = None
) - > Type [ BaseModel ] :
""" The type of output this runnable produces specified as a pydantic model. """
""" Get a pydantic model that can be used to validate output to the runnable.
Runnables that leverage the configurable_fields and configurable_alternatives
methods will have a dynamic output schema that depends on which
configuration the runnable is invoked with .
This method allows to get an output schema for a specific configuration .
Args :
config : A config to use when generating the schema .
Returns :
A pydantic model that can be used to validate output .
"""
root_type = self . OutputType
if inspect . isclass ( root_type ) and issubclass ( root_type , BaseModel ) :
@ -278,14 +378,28 @@ class Runnable(Generic[Input, Output], ABC):
@abstractmethod
def invoke ( self , input : Input , config : Optional [ RunnableConfig ] = None ) - > Output :
""" Transform a single input into an output. Override to implement. """
. . .
""" Transform a single input into an output. Override to implement.
Args :
input : The input to the runnable .
config : A config to use when invoking the runnable .
The config supports standard keys like ' tags ' , ' metadata ' for tracing
purposes , ' max_concurrency ' for controlling how much work to do
in parallel , and other keys . Please refer to the RunnableConfig
for more details .
Returns :
The output of the runnable .
"""
async def ainvoke (
self , input : Input , config : Optional [ RunnableConfig ] = None , * * kwargs : Any
) - > Output :
"""
Default implementation of ainvoke , which calls invoke in a thread pool .
""" Default implementation of ainvoke, calls invoke from a thread.
The default implementation allows usage of async code even if
the runnable did not implement a native async version of invoke .
Subclasses should override this method if they can run asynchronously .
"""
return await asyncio . get_running_loop ( ) . run_in_executor (
@ -300,9 +414,12 @@ class Runnable(Generic[Input, Output], ABC):
return_exceptions : bool = False ,
* * kwargs : Optional [ Any ] ,
) - > List [ Output ] :
"""
Default implementation of batch , which calls invoke N times .
Subclasses should override this method if they can batch more efficiently .
""" Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables .
Subclasses should override this method if they can batch more efficiently ;
e . g . , if the underlying runnable uses an API which supports a batch mode .
"""
if not inputs :
return [ ]
@ -333,9 +450,12 @@ class Runnable(Generic[Input, Output], ABC):
return_exceptions : bool = False ,
* * kwargs : Optional [ Any ] ,
) - > List [ Output ] :
"""
Default implementation of abatch , which calls ainvoke N times .
Subclasses should override this method if they can batch more efficiently .
""" Default implementation runs ainvoke in parallel using asyncio.gather.
The default implementation of batch works well for IO bound runnables .
Subclasses should override this method if they can batch more efficiently ;
e . g . , if the underlying runnable uses an API which supports a batch mode .
"""
if not inputs :
return [ ]
@ -682,6 +802,16 @@ class Runnable(Generic[Input, Output], ABC):
* ,
exceptions_to_handle : Tuple [ Type [ BaseException ] , . . . ] = ( Exception , ) ,
) - > RunnableWithFallbacksT [ Input , Output ] :
""" Add fallbacks to a runnable, returning a new Runnable.
Args :
fallbacks : A sequence of runnables to try if the original runnable fails .
exceptions_to_handle : A tuple of exception types to handle .
Returns :
A new Runnable that will try the original runnable , and then each
fallback in order , upon failures .
"""
from langchain . schema . runnable . fallbacks import RunnableWithFallbacks
return RunnableWithFallbacks (