core: expand docstring for RunnableGenerator (#16672)

- **Description:** expand docstring for RunnableGenerator
  - **Issue:** https://github.com/langchain-ai/langchain/issues/16631
This commit is contained in:
ccurme 2024-01-28 19:47:08 -05:00 committed by GitHub
parent 0866a984fe
commit ec0ae23645
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -2882,8 +2882,88 @@ RunnableMap = RunnableParallel
class RunnableGenerator(Runnable[Input, Output]):
"""
A runnable that runs a generator function.
"""A runnable that runs a generator function.
RunnableGenerators can be instantiated directly or by using a generator within
a sequence.
RunnableGenerators can be used to implement custom behavior, such as custom output
parsers, while preserving streaming capabilities. Given a generator function with
a signature Iterator[A] -> Iterator[B], wrapping it in a RunnableGenerator allows
it to emit output chunks as soon as they are streamed in from the previous step.
Note that if a generator function has a signature A -> Iterator[B], such that it
requires its input from the previous step to be completed before emitting chunks
(e.g., most LLMs need the entire prompt available to start generating), it can
instead be wrapped in a RunnableLambda.
Here is an example to show the basic mechanics of a RunnableGenerator:
.. code-block:: python
from typing import Any, AsyncIterator, Iterator
from langchain_core.runnables import RunnableGenerator
def gen(input: Iterator[Any]) -> Iterator[str]:
for token in ["Have", " a", " nice", " day"]:
yield token
runnable = RunnableGenerator(gen)
runnable.invoke(None) # "Have a nice day"
list(runnable.stream(None)) # ["Have", " a", " nice", " day"]
runnable.batch([None, None]) # ["Have a nice day", "Have a nice day"]
# Async version:
async def agen(input: AsyncIterator[Any]) -> AsyncIterator[str]:
for token in ["Have", " a", " nice", " day"]:
yield token
runnable = RunnableGenerator(agen)
await runnable.ainvoke(None) # "Have a nice day"
[p async for p in runnable.astream(None)] # ["Have", " a", " nice", " day"]
RunnableGenerator makes it easy to implement custom behavior within a streaming
context. Below we show an example:
.. code-block:: python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableGenerator, RunnableLambda
from langchain_openai import ChatOpenAI
from langchain.schema import StrOutputParser
model = ChatOpenAI()
chant_chain = (
ChatPromptTemplate.from_template("Give me a 3 word chant about {topic}")
| model
| StrOutputParser()
)
def character_generator(input: Iterator[str]) -> Iterator[str]:
for token in input:
if "," in token or "." in token:
yield "👏" + token
else:
yield token
runnable = chant_chain | character_generator
assert type(runnable.last) is RunnableGenerator
"".join(runnable.stream({"topic": "waste"})) # Reduce👏, Reuse👏, Recycle👏.
# Note that RunnableLambda can be used to delay streaming of one step in a
# sequence until the previous step is finished:
def reverse_generator(input: str) -> Iterator[str]:
# Yield characters of input in reverse order.
for character in input[::-1]:
yield character
runnable = chant_chain | RunnableLambda(reverse_generator)
"".join(runnable.stream({"topic": "waste"})) # ".elcycer ,esuer ,ecudeR"
"""
def __init__(