diff --git a/libs/core/langchain_core/runnables/base.py b/libs/core/langchain_core/runnables/base.py
index 280ecd25eb..8308768cc5 100644
--- a/libs/core/langchain_core/runnables/base.py
+++ b/libs/core/langchain_core/runnables/base.py
@@ -1804,7 +1804,7 @@ class RunnableSequence(RunnableSerializable[Input, Output]):
         # Or equivalently:
         # sequence = RunnableSequence(first=runnable_1, last=runnable_2)
         sequence.invoke(1)
-        await runnable.ainvoke(1)
+        await sequence.ainvoke(1)
 
         sequence.batch([1, 2, 3])
         await sequence.abatch([1, 2, 3])
@@ -2451,9 +2451,83 @@ class RunnableSequence(RunnableSerializable[Input, Output]):
 
 
 class RunnableParallel(RunnableSerializable[Input, Dict[str, Any]]):
-    """
-    A runnable that runs a mapping of runnables in parallel,
-    and returns a mapping of their outputs.
+    """A runnable that runs a mapping of runnables in parallel, and returns a mapping
+    of their outputs.
+
+    RunnableParallel is one of the two main composition primitives for the LCEL,
+    alongside RunnableSequence. It invokes runnables concurrently, providing the same
+    input to each.
+
+    A RunnableParallel can be instantiated directly or by using a dict literal within a
+    sequence.
+
+    Here is a simple example that uses functions to illustrate the use of
+    RunnableParallel:
+
+    .. code-block:: python
+
+        from langchain_core.runnables import RunnableLambda
+
+        def add_one(x: int) -> int:
+            return x + 1
+
+        def mul_two(x: int) -> int:
+            return x * 2
+
+        def mul_three(x: int) -> int:
+            return x * 3
+
+        runnable_1 = RunnableLambda(add_one)
+        runnable_2 = RunnableLambda(mul_two)
+        runnable_3 = RunnableLambda(mul_three)
+
+        sequence = runnable_1 | {  # this dict is coerced to a RunnableParallel
+            "mul_two": runnable_2,
+            "mul_three": runnable_3,
+        }
+        # Or equivalently:
+        # sequence = runnable_1 | RunnableParallel(
+        #     {"mul_two": runnable_2, "mul_three": runnable_3}
+        # )
+        # Also equivalently:
+        # sequence = runnable_1 | RunnableParallel(
+        #     mul_two=runnable_2,
+        #     mul_three=runnable_3,
+        # )
+
+        sequence.invoke(1)
+        await sequence.ainvoke(1)
+
+        sequence.batch([1, 2, 3])
+        await sequence.abatch([1, 2, 3])
+
+    RunnableParallel makes it easy to run Runnables in parallel. In the below example,
+    we simultaneously stream output from two different Runnables:
+
+    .. code-block:: python
+
+        from langchain_core.prompts import ChatPromptTemplate
+        from langchain_core.runnables import RunnableParallel
+        from langchain_openai import ChatOpenAI
+
+        model = ChatOpenAI()
+        joke_chain = (
+            ChatPromptTemplate.from_template("tell me a joke about {topic}")
+            | model
+        )
+        poem_chain = (
+            ChatPromptTemplate.from_template("write a 2-line poem about {topic}")
+            | model
+        )
+
+        runnable = RunnableParallel(joke=joke_chain, poem=poem_chain)
+
+        # Display stream
+        output = {key: "" for key, _ in runnable.output_schema()}
+        for chunk in runnable.stream({"topic": "bear"}):
+            for key in chunk:
+                output[key] = output[key] + chunk[key].content
+            print(output)
     """
 
     steps: Mapping[str, Runnable[Input, Any]]