You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

261 lines
6.1 KiB

"cells": [
"cell_type": "markdown",
"metadata": {},
"source": [
"# Stream custom generator functions\n",
"You can use generator functions (i.e., functions that use the `yield` keyword and behave like iterators) in an LCEL pipeline.\n",
"The signature of these generators should be `Iterator[Input] -> Iterator[Output]`, or, for async generators, `AsyncIterator[Input] -> AsyncIterator[Output]`.\n",
"These are useful for:\n",
"- implementing a custom output parser\n",
"- modifying the output of a previous step, while preserving streaming capabilities\n",
"Let's implement a custom output parser for comma-separated lists."
"cell_type": "markdown",
"metadata": {},
"source": [
"## Sync version"
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from typing import Iterator, List\n",
"from langchain_core.prompts import ChatPromptTemplate\n",
"from langchain_core.output_parsers import StrOutputParser\n",
"from langchain_openai import ChatOpenAI\n",
"prompt = ChatPromptTemplate.from_template(\n",
"    \"Write a comma-separated list of 5 animals similar to: {animal}\"\n",
")\n",
"model = ChatOpenAI(temperature=0.0)\n",
"str_chain = prompt | model | StrOutputParser()"
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"lion, tiger, wolf, gorilla, panda"
"source": [
"for chunk in str_chain.stream({\"animal\": \"bear\"}):\n",
" print(chunk, end=\"\", flush=True)"
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
"data": {
"text/plain": [
"'lion, tiger, wolf, gorilla, panda'"
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
"source": [
"str_chain.invoke({\"animal\": \"bear\"})"
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
# This is a custom parser that splits an iterator of llm tokens
# into a list of strings separated by commas
def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:
    """Incrementally split a stream of text chunks on commas.

    Args:
        input: Iterator of string chunks. A chunk may contain zero, one, or
            several commas, and a single item may be spread over several
            chunks. (The name shadows the builtin ``input``; it is kept
            because it is part of the function's signature.)

    Yields:
        A one-element list holding a whitespace-stripped item, emitted as soon
        as the comma that terminates the item has been seen. Once the input is
        exhausted, whatever is left in the buffer is always emitted as a final
        item, so an empty input or a trailing comma produces ``[""]``.
    """
    # hold partial input until we get a comma
    buffer = ""
    for chunk in input:
        # add current chunk to buffer
        buffer += chunk
        # every piece before the last comma is a complete item; the piece
        # after it may still grow with later chunks, so it stays buffered
        # (with no comma present, `complete` is empty and buffer is unchanged)
        *complete, buffer = buffer.split(",")
        for item in complete:
            yield [item.strip()]
    # yield the last chunk
    yield [buffer.strip()]
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"list_chain = str_chain | split_into_list"
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"source": [
"for chunk in list_chain.stream({\"animal\": \"bear\"}):\n",
" print(chunk, flush=True)"
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
"data": {
"text/plain": [
"['lion', 'tiger', 'wolf', 'gorilla', 'panda']"
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
"source": [
"list_chain.invoke({\"animal\": \"bear\"})"
"cell_type": "markdown",
"metadata": {},
"source": [
"## Async version"
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"from typing import AsyncIterator\n",
async def asplit_into_list(
    input: AsyncIterator[str],
) -> AsyncIterator[List[str]]:
    """Incrementally split an asynchronous stream of text chunks on commas.

    Args:
        input: Async iterator of string chunks. A chunk may contain zero, one,
            or several commas, and a single item may be spread over several
            chunks. (The name shadows the builtin ``input``; it is kept
            because it is part of the function's signature.)

    Yields:
        A one-element list holding a whitespace-stripped item, emitted as soon
        as the comma that terminates the item has been seen. Once the input is
        exhausted, whatever is left in the buffer is always emitted as a final
        item, so an empty input or a trailing comma produces ``[""]``.
    """
    # hold partial input until we get a comma
    buffer = ""
    # `input` is an async iterator, so it must be consumed with `async for`
    async for chunk in input:
        buffer += chunk
        # every piece before the last comma is a complete item; the piece
        # after it may still grow with later chunks, so it stays buffered
        *complete, buffer = buffer.split(",")
        for item in complete:
            yield [item.strip()]
    # emit whatever remains after the stream ends
    yield [buffer.strip()]
"list_chain = str_chain | asplit_into_list"
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"source": [
"async for chunk in list_chain.astream({\"animal\": \"bear\"}):\n",
" print(chunk, flush=True)"
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
"data": {
"text/plain": [
"['lion', 'tiger', 'wolf', 'gorilla', 'panda']"
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
"source": [
"await list_chain.ainvoke({\"animal\": \"bear\"})"
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.5"
"nbformat": 4,
"nbformat_minor": 4