diff --git a/docs/extras/expression_language/interface.ipynb b/docs/extras/expression_language/interface.ipynb index 47a3efaf82..418bc86f60 100644 --- a/docs/extras/expression_language/interface.ipynb +++ b/docs/extras/expression_language/interface.ipynb @@ -8,7 +8,7 @@ "---\n", "sidebar_position: 0\n", "title: Interface\n", - "---" + "---\n" ] }, { @@ -18,15 +18,16 @@ "source": [ "In an effort to make it as easy as possible to create custom chains, we've implemented a [\"Runnable\"](https://api.python.langchain.com/en/latest/schema/langchain.schema.runnable.Runnable.html#langchain.schema.runnable.Runnable) protocol that most components implement. This is a standard interface with a few different methods, which makes it easy both to define custom chains and to invoke them in a standard way. The standard interface exposed includes:\n", "\n", - "- `stream`: stream back chunks of the response\n", - "- `invoke`: call the chain on an input\n", - "- `batch`: call the chain on a list of inputs\n", + "- [`stream`](#stream): stream back chunks of the response\n", + "- [`invoke`](#invoke): call the chain on an input\n", + "- [`batch`](#batch): call the chain on a list of inputs\n", "\n", "These also have corresponding async methods:\n", "\n", - "- `astream`: stream back chunks of the response async\n", - "- `ainvoke`: call the chain on an input async\n", - "- `abatch`: call the chain on a list of inputs async\n", + "- [`astream`](#async-stream): stream back chunks of the response asynchronously\n", + "- [`ainvoke`](#async-invoke): call the chain on an input asynchronously\n", + "- [`abatch`](#async-batch): call the chain on a list of inputs asynchronously\n", + "- [`astream_log`](#async-stream-intermediate-steps): stream back intermediate steps as they happen, in addition to the final response\n", "\n", "The type of the input varies by component:\n", "\n", @@ -49,6 +50,10 @@ "| Tool | Depends on the tool |\n", "| OutputParser | Depends on the parser |\n", "\n", + "All runnables expose properties to inspect the input and output types:\n", + "- [`input_schema`](#input-schema): an input Pydantic model auto-generated from the structure of the Runnable\n", + "- [`output_schema`](#output-schema): an output Pydantic model auto-generated from the structure of the Runnable\n", + "\n", "Let's take a look at these methods! To do so, we'll create a super simple PromptTemplate + ChatModel chain." ] }, @@ -60,7 +65,7 @@ "outputs": [], "source": [ "from langchain.prompts import ChatPromptTemplate\n", - "from langchain.chat_models import ChatOpenAI" + "from langchain.chat_models import ChatOpenAI\n" ] }, { @@ -70,7 +75,7 @@ "metadata": {}, "outputs": [], "source": [ - "model = ChatOpenAI()" + "model = ChatOpenAI()\n" ] }, { @@ -80,7 +85,7 @@ "metadata": {}, "outputs": [], "source": [ - "prompt = ChatPromptTemplate.from_template(\"tell me a joke about {topic}\")" + "prompt = ChatPromptTemplate.from_template(\"tell me a joke about {topic}\")\n" ] }, { @@ -90,7 +95,156 @@ "metadata": {}, "outputs": [], "source": [ - "chain = prompt | model" + "chain = prompt | model\n" ] }, + { + "cell_type": "markdown", + "id": "5cccdf0b-2d89-4f74-9530-bf499610e9a5", + "metadata": {}, + "source": [ + "## Input Schema\n", + "\n", + "A description of the inputs accepted by a Runnable.\n", + "This is a Pydantic model dynamically generated from the structure of any Runnable.\n", + "You can call `.schema()` on it to obtain a JSONSchema representation.\n",
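+ "\n", + "Since this is a regular Pydantic model class, you can also instantiate it to validate an input before invoking the chain. A minimal, hypothetical sketch that relies only on standard Pydantic behavior, using the `chain` defined above:\n", + "\n", + "```python\n", + "# Construct the input model to validate an input dict; Pydantic will\n", + "# validate (and may coerce) the fields, and raises a ValidationError\n", + "# if the input doesn't conform to the schema.\n", + "chain.input_schema(topic=\"bears\")\n", + "```"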
+ ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "25e146d4-60da-40a2-9026-b5dfee106a3f", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'title': 'PromptInput',\n", + " 'type': 'object',\n", + " 'properties': {'topic': {'title': 'Topic', 'type': 'string'}}}" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# The input schema of the chain is the input schema of its first part, the prompt.\n", + "chain.input_schema.schema()" + ] + }, + { + "cell_type": "markdown", + "id": "5059a5dc-d544-4add-85bd-78a3f2b78b9a", + "metadata": {}, + "source": [ + "## Output Schema\n", + "\n", + "A description of the outputs produced by a Runnable.\n", + "This is a Pydantic model dynamically generated from the structure of any Runnable.\n", + "You can call `.schema()` on it to obtain a JSONSchema representation." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "a0e41fd3-77d8-4911-af6a-d4d3aad5f77b", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'title': 'ChatOpenAIOutput',\n", + " 'anyOf': [{'$ref': '#/definitions/HumanMessageChunk'},\n", + " {'$ref': '#/definitions/AIMessageChunk'},\n", + " {'$ref': '#/definitions/ChatMessageChunk'},\n", + " {'$ref': '#/definitions/FunctionMessageChunk'},\n", + " {'$ref': '#/definitions/SystemMessageChunk'}],\n", + " 'definitions': {'HumanMessageChunk': {'title': 'HumanMessageChunk',\n", + " 'description': 'A Human Message chunk.',\n", + " 'type': 'object',\n", + " 'properties': {'content': {'title': 'Content', 'type': 'string'},\n", + " 'additional_kwargs': {'title': 'Additional Kwargs', 'type': 'object'},\n", + " 'type': {'title': 'Type',\n", + " 'default': 'human',\n", + " 'enum': ['human'],\n", + " 'type': 'string'},\n", + " 'example': {'title': 'Example', 'default': False, 'type': 'boolean'},\n", + " 'is_chunk': {'title': 'Is Chunk',\n", + " 'default': True,\n", + " 'enum': [True],\n", + " 'type': 'boolean'}},\n", + " 'required': ['content']},\n", + " 'AIMessageChunk': {'title': 'AIMessageChunk',\n", + " 'description': 'A Message chunk from an AI.',\n", + " 'type': 'object',\n", + " 'properties': {'content': {'title': 'Content', 'type': 'string'},\n", + " 'additional_kwargs': {'title': 'Additional Kwargs', 'type': 'object'},\n", + " 'type': {'title': 'Type',\n", + " 'default': 'ai',\n", + " 'enum': ['ai'],\n", + " 'type': 'string'},\n", + " 'example': {'title': 'Example', 'default': False, 'type': 'boolean'},\n", + " 'is_chunk': {'title': 'Is Chunk',\n", + " 'default': True,\n", + " 'enum': [True],\n", + " 'type': 'boolean'}},\n", + " 'required': ['content']},\n", + " 'ChatMessageChunk': {'title': 'ChatMessageChunk',\n", + " 'description': 'A Chat Message chunk.',\n", + " 'type': 'object',\n", + " 'properties': {'content': {'title': 'Content', 'type': 'string'},\n", + " 'additional_kwargs': {'title': 'Additional Kwargs', 'type': 'object'},\n", + " 'type': {'title': 'Type',\n", + " 'default': 'chat',\n", + " 'enum': ['chat'],\n", + " 'type': 'string'},\n", + " 'role': {'title': 'Role', 'type': 'string'},\n", + " 'is_chunk': {'title': 'Is Chunk',\n", + " 'default': True,\n", + " 'enum': [True],\n", + " 'type': 'boolean'}},\n", + " 'required': ['content', 'role']},\n", + " 'FunctionMessageChunk': {'title': 'FunctionMessageChunk',\n", + " 'description': 'A Function Message chunk.',\n", + " 'type': 'object',\n", + " 'properties': {'content': {'title': 'Content', 'type': 'string'},\n", + " 'additional_kwargs': {'title': 'Additional 
Kwargs', 'type': 'object'},\n", + " 'type': {'title': 'Type',\n", + " 'default': 'function',\n", + " 'enum': ['function'],\n", + " 'type': 'string'},\n", + " 'name': {'title': 'Name', 'type': 'string'},\n", + " 'is_chunk': {'title': 'Is Chunk',\n", + " 'default': True,\n", + " 'enum': [True],\n", + " 'type': 'boolean'}},\n", + " 'required': ['content', 'name']},\n", + " 'SystemMessageChunk': {'title': 'SystemMessageChunk',\n", + " 'description': 'A System Message chunk.',\n", + " 'type': 'object',\n", + " 'properties': {'content': {'title': 'Content', 'type': 'string'},\n", + " 'additional_kwargs': {'title': 'Additional Kwargs', 'type': 'object'},\n", + " 'type': {'title': 'Type',\n", + " 'default': 'system',\n", + " 'enum': ['system'],\n", + " 'type': 'string'},\n", + " 'is_chunk': {'title': 'Is Chunk',\n", + " 'default': True,\n", + " 'enum': [True],\n", + " 'type': 'boolean'}},\n", + " 'required': ['content']}}}" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# The output schema of the chain is the output schema of its last part, in this case a ChatModel, which outputs a ChatMessage\n", + "chain.output_schema.schema()" ] }, { @@ -103,7 +257,7 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 7, "id": "bea9639d", "metadata": {}, "outputs": [ @@ -111,9 +265,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "Sure, here's a bear-themed joke for you:\n", - "\n", - "Why don't bears wear shoes?\n", + "Why don't bears wear shoes? \n", "\n", "Because they have bear feet!" ] @@ -121,7 +273,7 @@ ], "source": [ "for s in chain.stream({\"topic\": \"bears\"}):\n", - " print(s.content, end=\"\", flush=True)" + " print(s.content, end=\"\", flush=True)\n" ] }, { @@ -134,23 +286,23 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": 8, "id": "470e483f", "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "AIMessage(content=\"Why don't bears wear shoes?\\n\\nBecause they already have bear feet!\", additional_kwargs={}, example=False)" + "AIMessage(content=\"Why don't bears wear shoes?\\n\\nBecause they have bear feet!\")" ] }, - "execution_count": 9, + "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "chain.invoke({\"topic\": \"bears\"})" + "chain.invoke({\"topic\": \"bears\"})\n" ] }, { @@ -163,24 +315,24 @@ }, { "cell_type": "code", - "execution_count": 19, + "execution_count": 9, "id": "9685de67", "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "[AIMessage(content=\"Why don't bears ever wear shoes?\\n\\nBecause they have bear feet!\", additional_kwargs={}, example=False),\n", - " AIMessage(content=\"Why don't cats play poker in the wild?\\n\\nToo many cheetahs!\", additional_kwargs={}, example=False)]" + "[AIMessage(content=\"Why don't bears wear shoes?\\n\\nBecause they have bear feet!\"),\n", + " AIMessage(content=\"Why don't cats play poker in the wild?\\n\\nToo many cheetahs!\")]" ] }, - "execution_count": 19, + "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "chain.batch([{\"topic\": \"bears\"}, {\"topic\": \"cats\"}])" + "chain.batch([{\"topic\": \"bears\"}, {\"topic\": \"cats\"}])\n" ] }, { @@ -193,24 +345,24 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 10, "id": "a08522f6", "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "[AIMessage(content=\"Why don't bears wear shoes?\\n\\nBecause they have bear feet!\", additional_kwargs={}, 
example=False),\n", - " AIMessage(content=\"Why don't cats play poker in the wild?\\n\\nToo many cheetahs!\", additional_kwargs={}, example=False)]" + "[AIMessage(content=\"Why don't bears wear shoes?\\n\\nBecause they have bear feet!\"),\n", + " AIMessage(content=\"Sure, here's a cat joke for you:\\n\\nWhy don't cats play poker in the wild?\\n\\nToo many cheetahs!\")]" ] }, - "execution_count": 5, + "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "chain.batch([{\"topic\": \"bears\"}, {\"topic\": \"cats\"}], config={\"max_concurrency\": 5})" + "chain.batch([{\"topic\": \"bears\"}, {\"topic\": \"cats\"}], config={\"max_concurrency\": 5})\n" ] }, { @@ -223,7 +375,7 @@ }, { "cell_type": "code", - "execution_count": 13, + "execution_count": 11, "id": "ea35eee4", "metadata": {}, "outputs": [ @@ -231,6 +383,8 @@ "name": "stdout", "output_type": "stream", "text": [ + "Sure, here's a bear joke for you:\n", + "\n", "Why don't bears wear shoes?\n", "\n", "Because they have bear feet!" ] } ], "source": [ "async for s in chain.astream({\"topic\": \"bears\"}):\n", - " print(s.content, end=\"\", flush=True)" + " print(s.content, end=\"\", flush=True)\n" ] }, { @@ -252,23 +406,23 @@ }, { "cell_type": "code", - "execution_count": 16, + "execution_count": 12, "id": "ef8c9b20", "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "AIMessage(content=\"Sure, here you go:\\n\\nWhy don't bears wear shoes?\\n\\nBecause they have bear feet!\", additional_kwargs={}, example=False)" + "AIMessage(content=\"Why don't bears wear shoes? \\n\\nBecause they have bear feet!\")" ] }, - "execution_count": 16, + "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "await chain.ainvoke({\"topic\": \"bears\"})" + "await chain.ainvoke({\"topic\": \"bears\"})\n" ] }, { @@ -281,28 +435,360 @@ }, { "cell_type": "code", - "execution_count": 18, + "execution_count": 13, "id": "eba2a103", "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "[AIMessage(content=\"Why don't bears wear shoes?\\n\\nBecause they have bear feet!\", additional_kwargs={}, example=False)]" + "[AIMessage(content=\"Why don't bears wear shoes?\\n\\nBecause they have bear feet!\")]" ] }, - "execution_count": 18, + "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "await chain.abatch([{\"topic\": \"bears\"}])" + "await chain.abatch([{\"topic\": \"bears\"}])\n" ] }, { "cell_type": "markdown", - "id": "0a1c409d", + "id": "f9cef104", "metadata": {}, "source": [ + "## Async Stream Intermediate Steps\n", + "\n", + "All runnables also have a method `.astream_log()`, which streams all or part of the intermediate steps of your chain/sequence as they happen.\n", + "\n", + "This is useful, e.g., to show progress to the user, to use intermediate results, or even just to debug your chain.\n", + "\n", + "You can choose to stream all steps (the default), or to include/exclude steps by name, tags, or metadata.\n", + "\n", + "This method yields [JSONPatch](https://jsonpatch.com) ops that, when applied in the same order as received, build up the RunState.\n", + "\n", + "```python\n", + "class LogEntry(TypedDict):\n", + " id: str\n", + " \"\"\"ID of the sub-run.\"\"\"\n", + " name: str\n", + " \"\"\"Name of the object being run.\"\"\"\n", + " type: str\n", + " \"\"\"Type of the object being run, eg. 
prompt, chain, llm, etc.\"\"\"\n", + " tags: List[str]\n", + " \"\"\"List of tags for the run.\"\"\"\n", + " metadata: Dict[str, Any]\n", + " \"\"\"Key-value pairs of metadata for the run.\"\"\"\n", + " start_time: str\n", + " \"\"\"ISO-8601 timestamp of when the run started.\"\"\"\n", + "\n", + " streamed_output_str: List[str]\n", + " \"\"\"List of LLM tokens streamed by this run, if applicable.\"\"\"\n", + " final_output: Optional[Any]\n", + " \"\"\"Final output of this run.\n", + " Only available after the run has finished successfully.\"\"\"\n", + " end_time: Optional[str]\n", + " \"\"\"ISO-8601 timestamp of when the run ended.\n", + " Only available after the run has finished.\"\"\"\n", + "\n", + "\n", + "class RunState(TypedDict):\n", + " id: str\n", + " \"\"\"ID of the run.\"\"\"\n", + " streamed_output: List[Any]\n", + " \"\"\"List of output chunks streamed by Runnable.stream()\"\"\"\n", + " final_output: Optional[Any]\n", + " \"\"\"Final output of the run, usually the result of aggregating (`+`) streamed_output.\n", + " Only available after the run has finished successfully.\"\"\"\n", + "\n", + " logs: Dict[str, LogEntry]\n", + " \"\"\"Map of run names to sub-runs. If filters were supplied, this map will\n", + " contain only the runs that matched the filters.\"\"\"\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "a146a5df-25be-4fa2-a7e4-df8ebe55a35e", + "metadata": {}, + "source": [ + "### Streaming JSONPatch chunks\n", + "\n", + "This is useful, e.g., for streaming the JSONPatch ops from an HTTP server and then applying them on the client to rebuild the run state there. See [LangServe](https://github.com/langchain-ai/langserve) for tooling to make it easier to build a webserver from any Runnable." + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "21c9019e", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "RunLogPatch({'op': 'replace',\n", + " 'path': '',\n", + " 'value': {'final_output': None,\n", + " 'id': 'fd6fcf62-c92c-4edf-8713-0fc5df000f62',\n", + " 'logs': {},\n", + " 'streamed_output': []}})\n", + "RunLogPatch({'op': 'add',\n", + " 'path': '/logs/Docs',\n", + " 'value': {'end_time': None,\n", + " 'final_output': None,\n", + " 'id': '8c998257-1ec8-4546-b744-c3fdb9728c41',\n", + " 'metadata': {},\n", + " 'name': 'Docs',\n", + " 'start_time': '2023-10-05T12:52:35.668',\n", + " 'streamed_output_str': [],\n", + " 'tags': ['map:key:context', 'FAISS'],\n", + " 'type': 'retriever'}})\n", + "RunLogPatch({'op': 'add',\n", + " 'path': '/logs/Docs/final_output',\n", + " 'value': {'documents': [Document(page_content='harrison worked at kensho')]}},\n", + " {'op': 'add',\n", + " 'path': '/logs/Docs/end_time',\n", + " 'value': '2023-10-05T12:52:36.033'})\n", + "RunLogPatch({'op': 'add', 'path': '/streamed_output/-', 'value': ''})\n", + "RunLogPatch({'op': 'add', 'path': '/streamed_output/-', 'value': 'H'})\n", + "RunLogPatch({'op': 'add', 'path': '/streamed_output/-', 'value': 'arrison'})\n", + "RunLogPatch({'op': 'add', 'path': '/streamed_output/-', 'value': ' worked'})\n", + "RunLogPatch({'op': 'add', 'path': '/streamed_output/-', 'value': ' at'})\n", + "RunLogPatch({'op': 'add', 'path': '/streamed_output/-', 'value': ' Kens'})\n", + "RunLogPatch({'op': 'add', 'path': '/streamed_output/-', 'value': 'ho'})\n", + "RunLogPatch({'op': 'add', 'path': '/streamed_output/-', 'value': '.'})\n", + "RunLogPatch({'op': 'add', 'path': '/streamed_output/-', 'value': ''})\n", + "RunLogPatch({'op': 'replace',\n", + " 'path': 
'/final_output',\n", + " 'value': {'output': 'Harrison worked at Kensho.'}})\n" + ] + } + ], + "source": [ + "from langchain.embeddings import OpenAIEmbeddings\n", + "from langchain.schema.output_parser import StrOutputParser\n", + "from langchain.schema.runnable import RunnablePassthrough\n", + "from langchain.vectorstores import FAISS\n", + "\n", + "template = \"\"\"Answer the question based only on the following context:\n", + "{context}\n", + "\n", + "Question: {question}\n", + "\"\"\"\n", + "prompt = ChatPromptTemplate.from_template(template)\n", + "\n", + "vectorstore = FAISS.from_texts([\"harrison worked at kensho\"], embedding=OpenAIEmbeddings())\n", + "retriever = vectorstore.as_retriever()\n", + "\n", + "retrieval_chain = (\n", + " {\"context\": retriever.with_config(run_name='Docs'), \"question\": RunnablePassthrough()}\n", + " | prompt \n", + " | model \n", + " | StrOutputParser()\n", + ")\n", + "\n", + "async for chunk in retrieval_chain.astream_log(\"where did harrison work?\", include_names=['Docs']):\n", + " print(chunk)\n" + ] + }, + { + "cell_type": "markdown", + "id": "19570f36-7126-4fe2-b209-0cc6178b4582", + "metadata": {}, + "source": [ + "### Streaming the incremental RunState\n", + "\n", + "You can simply pass `diff=False` to get incremental values of `RunState`." + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "5c26b731-b4eb-4967-a42a-dec813249ecb", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "RunLog({'final_output': None,\n", + " 'id': 'f95ccb87-31f1-48ea-a51c-d2dadde44185',\n", + " 'logs': {},\n", + " 'streamed_output': []})\n", + "RunLog({'final_output': None,\n", + " 'id': 'f95ccb87-31f1-48ea-a51c-d2dadde44185',\n", + " 'logs': {'Docs': {'end_time': None,\n", + " 'final_output': None,\n", + " 'id': '621597dd-d716-4532-938d-debc21a453d1',\n", + " 'metadata': {},\n", + " 'name': 'Docs',\n", + " 'start_time': '2023-10-05T12:52:36.935',\n", + " 'streamed_output_str': [],\n", + " 'tags': ['map:key:context', 'FAISS'],\n", + " 'type': 'retriever'}},\n", + " 'streamed_output': []})\n", + "RunLog({'final_output': None,\n", + " 'id': 'f95ccb87-31f1-48ea-a51c-d2dadde44185',\n", + " 'logs': {'Docs': {'end_time': '2023-10-05T12:52:37.217',\n", + " 'final_output': {'documents': [Document(page_content='harrison worked at kensho')]},\n", + " 'id': '621597dd-d716-4532-938d-debc21a453d1',\n", + " 'metadata': {},\n", + " 'name': 'Docs',\n", + " 'start_time': '2023-10-05T12:52:36.935',\n", + " 'streamed_output_str': [],\n", + " 'tags': ['map:key:context', 'FAISS'],\n", + " 'type': 'retriever'}},\n", + " 'streamed_output': []})\n", + "RunLog({'final_output': None,\n", + " 'id': 'f95ccb87-31f1-48ea-a51c-d2dadde44185',\n", + " 'logs': {'Docs': {'end_time': '2023-10-05T12:52:37.217',\n", + " 'final_output': {'documents': [Document(page_content='harrison worked at kensho')]},\n", + " 'id': '621597dd-d716-4532-938d-debc21a453d1',\n", + " 'metadata': {},\n", + " 'name': 'Docs',\n", + " 'start_time': '2023-10-05T12:52:36.935',\n", + " 'streamed_output_str': [],\n", + " 'tags': ['map:key:context', 'FAISS'],\n", + " 'type': 'retriever'}},\n", + " 'streamed_output': ['']})\n", + "RunLog({'final_output': None,\n", + " 'id': 'f95ccb87-31f1-48ea-a51c-d2dadde44185',\n", + " 'logs': {'Docs': {'end_time': '2023-10-05T12:52:37.217',\n", + " 'final_output': {'documents': [Document(page_content='harrison worked at kensho')]},\n", + " 'id': '621597dd-d716-4532-938d-debc21a453d1',\n", + " 'metadata': {},\n", + " 'name': 'Docs',\n", + 
" 'start_time': '2023-10-05T12:52:36.935',\n", + " 'streamed_output_str': [],\n", + " 'tags': ['map:key:context', 'FAISS'],\n", + " 'type': 'retriever'}},\n", + " 'streamed_output': ['', 'H']})\n", + "RunLog({'final_output': None,\n", + " 'id': 'f95ccb87-31f1-48ea-a51c-d2dadde44185',\n", + " 'logs': {'Docs': {'end_time': '2023-10-05T12:52:37.217',\n", + " 'final_output': {'documents': [Document(page_content='harrison worked at kensho')]},\n", + " 'id': '621597dd-d716-4532-938d-debc21a453d1',\n", + " 'metadata': {},\n", + " 'name': 'Docs',\n", + " 'start_time': '2023-10-05T12:52:36.935',\n", + " 'streamed_output_str': [],\n", + " 'tags': ['map:key:context', 'FAISS'],\n", + " 'type': 'retriever'}},\n", + " 'streamed_output': ['', 'H', 'arrison']})\n", + "RunLog({'final_output': None,\n", + " 'id': 'f95ccb87-31f1-48ea-a51c-d2dadde44185',\n", + " 'logs': {'Docs': {'end_time': '2023-10-05T12:52:37.217',\n", + " 'final_output': {'documents': [Document(page_content='harrison worked at kensho')]},\n", + " 'id': '621597dd-d716-4532-938d-debc21a453d1',\n", + " 'metadata': {},\n", + " 'name': 'Docs',\n", + " 'start_time': '2023-10-05T12:52:36.935',\n", + " 'streamed_output_str': [],\n", + " 'tags': ['map:key:context', 'FAISS'],\n", + " 'type': 'retriever'}},\n", + " 'streamed_output': ['', 'H', 'arrison', ' worked']})\n", + "RunLog({'final_output': None,\n", + " 'id': 'f95ccb87-31f1-48ea-a51c-d2dadde44185',\n", + " 'logs': {'Docs': {'end_time': '2023-10-05T12:52:37.217',\n", + " 'final_output': {'documents': [Document(page_content='harrison worked at kensho')]},\n", + " 'id': '621597dd-d716-4532-938d-debc21a453d1',\n", + " 'metadata': {},\n", + " 'name': 'Docs',\n", + " 'start_time': '2023-10-05T12:52:36.935',\n", + " 'streamed_output_str': [],\n", + " 'tags': ['map:key:context', 'FAISS'],\n", + " 'type': 'retriever'}},\n", + " 'streamed_output': ['', 'H', 'arrison', ' worked', ' at']})\n", + "RunLog({'final_output': None,\n", + " 'id': 'f95ccb87-31f1-48ea-a51c-d2dadde44185',\n", + " 'logs': {'Docs': {'end_time': '2023-10-05T12:52:37.217',\n", + " 'final_output': {'documents': [Document(page_content='harrison worked at kensho')]},\n", + " 'id': '621597dd-d716-4532-938d-debc21a453d1',\n", + " 'metadata': {},\n", + " 'name': 'Docs',\n", + " 'start_time': '2023-10-05T12:52:36.935',\n", + " 'streamed_output_str': [],\n", + " 'tags': ['map:key:context', 'FAISS'],\n", + " 'type': 'retriever'}},\n", + " 'streamed_output': ['', 'H', 'arrison', ' worked', ' at', ' Kens']})\n", + "RunLog({'final_output': None,\n", + " 'id': 'f95ccb87-31f1-48ea-a51c-d2dadde44185',\n", + " 'logs': {'Docs': {'end_time': '2023-10-05T12:52:37.217',\n", + " 'final_output': {'documents': [Document(page_content='harrison worked at kensho')]},\n", + " 'id': '621597dd-d716-4532-938d-debc21a453d1',\n", + " 'metadata': {},\n", + " 'name': 'Docs',\n", + " 'start_time': '2023-10-05T12:52:36.935',\n", + " 'streamed_output_str': [],\n", + " 'tags': ['map:key:context', 'FAISS'],\n", + " 'type': 'retriever'}},\n", + " 'streamed_output': ['', 'H', 'arrison', ' worked', ' at', ' Kens', 'ho']})\n", + "RunLog({'final_output': None,\n", + " 'id': 'f95ccb87-31f1-48ea-a51c-d2dadde44185',\n", + " 'logs': {'Docs': {'end_time': '2023-10-05T12:52:37.217',\n", + " 'final_output': {'documents': [Document(page_content='harrison worked at kensho')]},\n", + " 'id': '621597dd-d716-4532-938d-debc21a453d1',\n", + " 'metadata': {},\n", + " 'name': 'Docs',\n", + " 'start_time': '2023-10-05T12:52:36.935',\n", + " 'streamed_output_str': [],\n", + " 'tags': 
['map:key:context', 'FAISS'],\n", + " 'type': 'retriever'}},\n", + " 'streamed_output': ['', 'H', 'arrison', ' worked', ' at', ' Kens', 'ho', '.']})\n", + "RunLog({'final_output': None,\n", + " 'id': 'f95ccb87-31f1-48ea-a51c-d2dadde44185',\n", + " 'logs': {'Docs': {'end_time': '2023-10-05T12:52:37.217',\n", + " 'final_output': {'documents': [Document(page_content='harrison worked at kensho')]},\n", + " 'id': '621597dd-d716-4532-938d-debc21a453d1',\n", + " 'metadata': {},\n", + " 'name': 'Docs',\n", + " 'start_time': '2023-10-05T12:52:36.935',\n", + " 'streamed_output_str': [],\n", + " 'tags': ['map:key:context', 'FAISS'],\n", + " 'type': 'retriever'}},\n", + " 'streamed_output': ['',\n", + " 'H',\n", + " 'arrison',\n", + " ' worked',\n", + " ' at',\n", + " ' Kens',\n", + " 'ho',\n", + " '.',\n", + " '']})\n", + "RunLog({'final_output': {'output': 'Harrison worked at Kensho.'},\n", + " 'id': 'f95ccb87-31f1-48ea-a51c-d2dadde44185',\n", + " 'logs': {'Docs': {'end_time': '2023-10-05T12:52:37.217',\n", + " 'final_output': {'documents': [Document(page_content='harrison worked at kensho')]},\n", + " 'id': '621597dd-d716-4532-938d-debc21a453d1',\n", + " 'metadata': {},\n", + " 'name': 'Docs',\n", + " 'start_time': '2023-10-05T12:52:36.935',\n", + " 'streamed_output_str': [],\n", + " 'tags': ['map:key:context', 'FAISS'],\n", + " 'type': 'retriever'}},\n", + " 'streamed_output': ['',\n", + " 'H',\n", + " 'arrison',\n", + " ' worked',\n", + " ' at',\n", + " ' Kens',\n", + " 'ho',\n", + " '.',\n", + " '']})\n" + ] + } + ], + "source": [ + "async for chunk in retrieval_chain.astream_log(\"where did harrison work?\", include_names=['Docs'], diff=False):\n", + " print(chunk)" + ] + }, + { + "cell_type": "markdown", + "id": "7006f1aa", "metadata": {}, "source": [ "## Parallelism\n", @@ -313,7 +799,7 @@ { "cell_type": "code", "execution_count": 7, - "id": "e3014c7a", + "id": "0a1c409d", "metadata": {}, "outputs": [], "source": [ @@ -323,7 +809,7 @@ "combined = RunnableMap({\n", " \"joke\": chain1,\n", " \"poem\": chain2,\n", - "})" + "})\n" ] }, { @@ -353,7 +839,7 @@ ], "source": [ "%%time\n", - "chain1.invoke({\"topic\": \"bears\"})" + "chain1.invoke({\"topic\": \"bears\"})\n" ] }, { @@ -383,7 +869,7 @@ ], "source": [ "%%time\n", - "chain2.invoke({\"topic\": \"bears\"})" + "chain2.invoke({\"topic\": \"bears\"})\n" ] }, { @@ -414,7 +900,7 @@ ], "source": [ "%%time\n", - "combined.invoke({\"topic\": \"bears\"})" + "combined.invoke({\"topic\": \"bears\"})\n" ] }, { @@ -442,7 +928,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.1" + "version": "3.11.5" } }, "nbformat": 4, diff --git a/libs/langchain/langchain/callbacks/tracers/base.py b/libs/langchain/langchain/callbacks/tracers/base.py index f270a5ec55..10fd1d701f 100644 --- a/libs/langchain/langchain/callbacks/tracers/base.py +++ b/libs/langchain/langchain/callbacks/tracers/base.py @@ -49,7 +49,7 @@ class BaseTracer(BaseCallbackHandler, ABC): def _start_trace(self, run: Run) -> None: """Start a trace for a run.""" if run.parent_run_id: - parent_run = self.run_map[str(run.parent_run_id)] + parent_run = self.run_map.get(str(run.parent_run_id)) if parent_run: self._add_child_run(parent_run, run) parent_run.child_execution_order = max( diff --git a/libs/langchain/langchain/callbacks/tracers/log_stream.py b/libs/langchain/langchain/callbacks/tracers/log_stream.py index 2440d5feca..33827b5dbb 100644 --- a/libs/langchain/langchain/callbacks/tracers/log_stream.py +++ 
b/libs/langchain/langchain/callbacks/tracers/log_stream.py @@ -54,13 +54,12 @@ class RunState(TypedDict): streamed_output: List[Any] """List of output chunks streamed by Runnable.stream()""" final_output: Optional[Any] - """Final output of the run, usually the result of aggregating streamed_output. + """Final output of the run, usually the result of aggregating (`+`) streamed_output. Only available after the run has finished successfully.""" logs: Dict[str, LogEntry] - """List of sub-runs contained in this run, if any, in the order they were started. - If filters were supplied, this list will contain only the runs that matched the - filters.""" + """Map of run names to sub-runs. If filters were supplied, this map will + contain only the runs that matched the filters.""" class RunLogPatch: @@ -74,7 +73,7 @@ def __init__(self, *ops: Dict[str, Any]) -> None: self.ops = list(ops) - def __add__(self, other: Union[RunLogPatch, Any]) -> RunLogPatch: + def __add__(self, other: Union[RunLogPatch, Any]) -> RunLog: if type(other) == RunLogPatch: ops = self.ops + other.ops state = jsonpatch.apply_patch(None, ops) @@ -102,7 +101,7 @@ super().__init__(*ops) self.state = state - def __add__(self, other: Union[RunLogPatch, Any]) -> RunLogPatch: + def __add__(self, other: Union[RunLogPatch, Any]) -> RunLog: if type(other) == RunLogPatch: ops = self.ops + other.ops state = jsonpatch.apply_patch(self.state, other.ops) diff --git a/libs/langchain/langchain/schema/runnable/base.py b/libs/langchain/langchain/schema/runnable/base.py index 57a93d8f7c..6bfef3ed44 100644 --- a/libs/langchain/langchain/schema/runnable/base.py +++ b/libs/langchain/langchain/schema/runnable/base.py @@ -26,16 +26,17 @@ TypeVar, Union, cast, + overload, ) -from typing_extensions import get_args +from typing_extensions import Literal, get_args if TYPE_CHECKING: from langchain.callbacks.manager import ( AsyncCallbackManagerForChainRun, CallbackManagerForChainRun, ) - from langchain.callbacks.tracers.log_stream import RunLogPatch + from langchain.callbacks.tracers.log_stream import RunLog, RunLogPatch from langchain.schema.runnable.fallbacks import ( RunnableWithFallbacks as RunnableWithFallbacksT, ) @@ -290,11 +291,13 @@ """ yield await self.ainvoke(input, config, **kwargs) - async def astream_log( + @overload + def astream_log( self, input: Any, config: Optional[RunnableConfig] = None, *, + diff: Literal[True] = True, include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, @@ -303,6 +306,39 @@ exclude_tags: Optional[Sequence[str]] = None, **kwargs: Optional[Any], ) -> AsyncIterator[RunLogPatch]: + ... + + @overload + def astream_log( + self, + input: Any, + config: Optional[RunnableConfig] = None, + *, + diff: Literal[False], + include_names: Optional[Sequence[str]] = None, + include_types: Optional[Sequence[str]] = None, + include_tags: Optional[Sequence[str]] = None, + exclude_names: Optional[Sequence[str]] = None, + exclude_types: Optional[Sequence[str]] = None, + exclude_tags: Optional[Sequence[str]] = None, + **kwargs: Optional[Any], + ) -> AsyncIterator[RunLog]: + ... 
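+ + # The two overloads above exist only for static type checking: with + # diff=True (the default) astream_log() yields RunLogPatch ops, while with + # diff=False it yields cumulative RunLog states. Only the implementation + # below runs at runtime.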
+ + async def astream_log( + self, + input: Any, + config: Optional[RunnableConfig] = None, + *, + diff: bool = True, + include_names: Optional[Sequence[str]] = None, + include_types: Optional[Sequence[str]] = None, + include_tags: Optional[Sequence[str]] = None, + exclude_names: Optional[Sequence[str]] = None, + exclude_types: Optional[Sequence[str]] = None, + exclude_tags: Optional[Sequence[str]] = None, + **kwargs: Optional[Any], + ) -> Union[AsyncIterator[RunLogPatch], AsyncIterator[RunLog]]: """ Stream all output from a runnable, as reported to the callback system. This includes all inner runs of LLMs, Retrievers, Tools, etc. @@ -317,6 +353,7 @@ class Runnable(Generic[Input, Output], ABC): from langchain.callbacks.base import BaseCallbackManager from langchain.callbacks.tracers.log_stream import ( LogStreamCallbackHandler, + RunLog, RunLogPatch, ) @@ -370,8 +407,14 @@ class Runnable(Generic[Input, Output], ABC): try: # Yield each chunk from the output stream - async for log in stream: - yield log + if diff: + async for log in stream: + yield log + else: + state = RunLog(state=None) # type: ignore[arg-type] + async for log in stream: + state = state + log + yield state finally: # Wait for the runnable to finish, if not cancelled (eg. by break) try: diff --git a/libs/langchain/langchain/schema/runnable/passthrough.py b/libs/langchain/langchain/schema/runnable/passthrough.py index 874123c394..ce893b6a52 100644 --- a/libs/langchain/langchain/schema/runnable/passthrough.py +++ b/libs/langchain/langchain/schema/runnable/passthrough.py @@ -171,7 +171,9 @@ class RunnableAssign(RunnableSerializable[Dict[str, Any], Dict[str, Any]]): config: Optional[RunnableConfig] = None, **kwargs: Any, ) -> Dict[str, Any]: - assert isinstance(input, dict) + assert isinstance( + input, dict + ), "The input to RunnablePassthrough.assign() must be a dict." return { **input, **self.mapper.invoke(input, config, **kwargs), @@ -183,7 +185,9 @@ class RunnableAssign(RunnableSerializable[Dict[str, Any], Dict[str, Any]]): config: Optional[RunnableConfig] = None, **kwargs: Any, ) -> Dict[str, Any]: - assert isinstance(input, dict) + assert isinstance( + input, dict + ), "The input to RunnablePassthrough.assign() must be a dict." return { **input, **await self.mapper.ainvoke(input, config, **kwargs), @@ -204,10 +208,16 @@ class RunnableAssign(RunnableSerializable[Dict[str, Any], Dict[str, Any]]): # get executor to start map output stream in background with get_executor_for_config(config or {}) as executor: # start map output stream - first_map_chunk_future = executor.submit(next, map_output) # type: ignore + first_map_chunk_future = executor.submit( + next, + map_output, # type: ignore + None, + ) # consume passthrough stream for chunk in for_passthrough: - assert isinstance(chunk, dict) + assert isinstance( + chunk, dict + ), "The input to RunnablePassthrough.assign() must be a dict." 
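+ # (each streamed chunk must be a dict, since RunnableAssign works by + # merging the mapper's output keys into the dicts flowing through it)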
# remove mapper keys from passthrough chunk, to be overwritten by map filtered = AddableDict( {k: v for k, v in chunk.items() if k not in mapper_keys} @@ -233,11 +243,13 @@ class RunnableAssign(RunnableSerializable[Dict[str, Any], Dict[str, Any]]): map_output = self.mapper.atransform(for_map, config, **kwargs) # start map output stream first_map_chunk_task: asyncio.Task = asyncio.create_task( - py_anext(map_output), # type: ignore[arg-type] + py_anext(map_output, None), # type: ignore[arg-type] ) # consume passthrough stream async for chunk in for_passthrough: - assert isinstance(chunk, dict) + assert isinstance( + chunk, dict + ), "The input to RunnablePassthrough.assign() must be a dict." # remove mapper keys from passthrough chunk, to be overwritten by map output filtered = AddableDict( {k: v for k, v in chunk.items() if k not in mapper_keys} diff --git a/libs/langchain/tests/unit_tests/schema/runnable/test_runnable.py b/libs/langchain/tests/unit_tests/schema/runnable/test_runnable.py index 680df3c4d3..81da3b0c97 100644 --- a/libs/langchain/tests/unit_tests/schema/runnable/test_runnable.py +++ b/libs/langchain/tests/unit_tests/schema/runnable/test_runnable.py @@ -1260,6 +1260,42 @@ async def test_prompt() -> None: RunLogPatch({"op": "add", "path": "/streamed_output/-", "value": expected}), ] + stream_log_state = [ + part + async for part in prompt.astream_log( + {"question": "What is your name?"}, diff=False + ) + ] + + # remove random id + stream_log[0].ops[0]["value"]["id"] = "00000000-0000-0000-0000-000000000000" + stream_log_state[-1].ops[0]["value"]["id"] = "00000000-0000-0000-0000-000000000000" + stream_log_state[-1].state["id"] = "00000000-0000-0000-0000-000000000000" + + # assert output with diff=False matches output with diff=True + assert stream_log_state[-1].ops == [op for chunk in stream_log for op in chunk.ops] + assert stream_log_state[-1] == RunLog( + *[op for chunk in stream_log for op in chunk.ops], + state={ + "final_output": ChatPromptValue( + messages=[ + SystemMessage(content="You are a nice assistant."), + HumanMessage(content="What is your name?"), + ] + ), + "id": "00000000-0000-0000-0000-000000000000", + "logs": {}, + "streamed_output": [ + ChatPromptValue( + messages=[ + SystemMessage(content="You are a nice assistant."), + HumanMessage(content="What is your name?"), + ] + ) + ], + }, + ) + def test_prompt_template_params() -> None: prompt = ChatPromptTemplate.from_template(