langchain/docs/extras/modules/callbacks/async_callbacks.ipynb

135 lines
5.1 KiB
Plaintext
Raw Normal View History

{
"cells": [
{
"cell_type": "markdown",
"id": "9418c7ff",
"metadata": {},
"source": [
"# Async callbacks\n",
"\n",
"If you are planning to use the async API, it is recommended to use `AsyncCallbackHandler` to avoid blocking the runloop. \n",
"\n",
"**Advanced** if you use a sync `CallbackHandler` while using an async method to run your LLM / Chain / Tool / Agent, it will still work. However, under the hood, it will be called with [`run_in_executor`](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor) which can cause issues if your `CallbackHandler` is not thread-safe."
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "f771eea0",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"zzzz....\n",
"Hi! I just woke up. Your llm is starting\n",
"Sync handler being called in a `thread_pool_executor`: token: \n",
"Sync handler being called in a `thread_pool_executor`: token: Why\n",
"Sync handler being called in a `thread_pool_executor`: token: don\n",
"Sync handler being called in a `thread_pool_executor`: token: 't\n",
"Sync handler being called in a `thread_pool_executor`: token: scientists\n",
"Sync handler being called in a `thread_pool_executor`: token: trust\n",
"Sync handler being called in a `thread_pool_executor`: token: atoms\n",
"Sync handler being called in a `thread_pool_executor`: token: ?\n",
"Sync handler being called in a `thread_pool_executor`: token: \n",
"\n",
"\n",
"Sync handler being called in a `thread_pool_executor`: token: Because\n",
"Sync handler being called in a `thread_pool_executor`: token: they\n",
"Sync handler being called in a `thread_pool_executor`: token: make\n",
"Sync handler being called in a `thread_pool_executor`: token: up\n",
"Sync handler being called in a `thread_pool_executor`: token: everything\n",
"Sync handler being called in a `thread_pool_executor`: token: .\n",
"Sync handler being called in a `thread_pool_executor`: token: \n",
"zzzz....\n",
"Hi! I just woke up. Your llm is ending\n"
]
},
{
"data": {
"text/plain": [
"LLMResult(generations=[[ChatGeneration(text=\"Why don't scientists trust atoms? \\n\\nBecause they make up everything.\", generation_info=None, message=AIMessage(content=\"Why don't scientists trust atoms? \\n\\nBecause they make up everything.\", additional_kwargs={}, example=False))]], llm_output={'token_usage': {}, 'model_name': 'gpt-3.5-turbo'})"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import asyncio\n",
"from typing import Any, Dict, List\n",
"\n",
"from langchain.chat_models import ChatOpenAI\n",
"from langchain.schema import LLMResult, HumanMessage\n",
"from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler\n",
"\n",
"\n",
"class MyCustomSyncHandler(BaseCallbackHandler):\n",
" def on_llm_new_token(self, token: str, **kwargs) -> None:\n",
" print(f\"Sync handler being called in a `thread_pool_executor`: token: {token}\")\n",
"\n",
"\n",
"class MyCustomAsyncHandler(AsyncCallbackHandler):\n",
" \"\"\"Async callback handler that can be used to handle callbacks from langchain.\"\"\"\n",
"\n",
" async def on_llm_start(\n",
" self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any\n",
" ) -> None:\n",
" \"\"\"Run when chain starts running.\"\"\"\n",
" print(\"zzzz....\")\n",
" await asyncio.sleep(0.3)\n",
" class_name = serialized[\"name\"]\n",
" print(\"Hi! I just woke up. Your llm is starting\")\n",
"\n",
" async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:\n",
" \"\"\"Run when chain ends running.\"\"\"\n",
" print(\"zzzz....\")\n",
" await asyncio.sleep(0.3)\n",
" print(\"Hi! I just woke up. Your llm is ending\")\n",
"\n",
"\n",
"# To enable streaming, we pass in `streaming=True` to the ChatModel constructor\n",
"# Additionally, we pass in a list with our custom handler\n",
"chat = ChatOpenAI(\n",
" max_tokens=25,\n",
" streaming=True,\n",
" callbacks=[MyCustomSyncHandler(), MyCustomAsyncHandler()],\n",
")\n",
"\n",
"await chat.agenerate([[HumanMessage(content=\"Tell me a joke\")]])"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "01778cac",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "venv",
"language": "python",
"name": "venv"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.3"
}
},
"nbformat": 4,
"nbformat_minor": 5
}