diff --git a/libs/core/langchain_core/runnables/base.py b/libs/core/langchain_core/runnables/base.py index 8308768cc5..a5450de381 100644 --- a/libs/core/langchain_core/runnables/base.py +++ b/libs/core/langchain_core/runnables/base.py @@ -2882,8 +2882,88 @@ RunnableMap = RunnableParallel class RunnableGenerator(Runnable[Input, Output]): - """ - A runnable that runs a generator function. + """A runnable that runs a generator function. + + RunnableGenerators can be instantiated directly or by using a generator within + a sequence. + + RunnableGenerators can be used to implement custom behavior, such as custom output + parsers, while preserving streaming capabilities. Given a generator function with + a signature Iterator[A] -> Iterator[B], wrapping it in a RunnableGenerator allows + it to emit output chunks as soon as they are streamed in from the previous step. + + Note that if a generator function has a signature A -> Iterator[B], such that it + requires its input from the previous step to be completed before emitting chunks + (e.g., most LLMs need the entire prompt available to start generating), it can + instead be wrapped in a RunnableLambda. + + Here is an example to show the basic mechanics of a RunnableGenerator: + + .. code-block:: python + + from typing import Any, AsyncIterator, Iterator + + from langchain_core.runnables import RunnableGenerator + + + def gen(input: Iterator[Any]) -> Iterator[str]: + for token in ["Have", " a", " nice", " day"]: + yield token + + + runnable = RunnableGenerator(gen) + runnable.invoke(None) # "Have a nice day" + list(runnable.stream(None)) # ["Have", " a", " nice", " day"] + runnable.batch([None, None]) # ["Have a nice day", "Have a nice day"] + + + # Async version: + async def agen(input: AsyncIterator[Any]) -> AsyncIterator[str]: + for token in ["Have", " a", " nice", " day"]: + yield token + + runnable = RunnableGenerator(agen) + await runnable.ainvoke(None) # "Have a nice day" + [p async for p in runnable.astream(None)] # ["Have", " a", " nice", " day"] + + RunnableGenerator makes it easy to implement custom behavior within a streaming + context. Below we show an example: + .. code-block:: python + + from langchain_core.prompts import ChatPromptTemplate + from langchain_core.runnables import RunnableGenerator, RunnableLambda + from langchain_openai import ChatOpenAI + from langchain.schema import StrOutputParser + + + model = ChatOpenAI() + chant_chain = ( + ChatPromptTemplate.from_template("Give me a 3 word chant about {topic}") + | model + | StrOutputParser() + ) + + def character_generator(input: Iterator[str]) -> Iterator[str]: + for token in input: + if "," in token or "." in token: + yield "👏" + token + else: + yield token + + + runnable = chant_chain | character_generator + assert type(runnable.last) is RunnableGenerator + "".join(runnable.stream({"topic": "waste"})) # Reduce👏, Reuse👏, Recycle👏. + + # Note that RunnableLambda can be used to delay streaming of one step in a + # sequence until the previous step is finished: + def reverse_generator(input: str) -> Iterator[str]: + # Yield characters of input in reverse order. + for character in input[::-1]: + yield character + + runnable = chant_chain | RunnableLambda(reverse_generator) + "".join(runnable.stream({"topic": "waste"})) # ".elcycer ,esuer ,ecudeR" """ def __init__(