From 008c7df80de8b8c2c141768e30474c51ae31d979 Mon Sep 17 00:00:00 2001 From: Hugues Chocart Date: Thu, 19 Oct 2023 08:54:10 +0200 Subject: [PATCH] [LLMonitorCallbackHandler] Refactor + add llmonitor-py dependency (#11948) We now require uses to have the pip package `llmonitor` installed. It allows us to have cleaner code and avoid duplicates between our library and our code in Langchain. --- .../langchain/callbacks/llmonitor_callback.py | 362 ++++++++++-------- 1 file changed, 202 insertions(+), 160 deletions(-) diff --git a/libs/langchain/langchain/callbacks/llmonitor_callback.py b/libs/langchain/langchain/callbacks/llmonitor_callback.py index c160685f04..ee7bc4dd38 100644 --- a/libs/langchain/langchain/callbacks/llmonitor_callback.py +++ b/libs/langchain/langchain/callbacks/llmonitor_callback.py @@ -1,12 +1,14 @@ +import importlib.metadata import logging import os import traceback +import warnings from contextvars import ContextVar -from datetime import datetime from typing import Any, Dict, List, Literal, Union from uuid import UUID import requests +from packaging.version import parse from langchain.callbacks.base import BaseCallbackHandler from langchain.schema.agent import AgentAction, AgentFinish @@ -142,7 +144,7 @@ def _get_user_props(metadata: Any) -> Any: return user_props_ctx.get() metadata = metadata or {} - return metadata.get("user_props") + return metadata.get("user_props", None) def _parse_lc_message(message: BaseMessage) -> Dict[str, Any]: @@ -191,6 +193,8 @@ class LLMonitorCallbackHandler(BaseCallbackHandler): __api_url: str __app_id: str __verbose: bool + __llmonitor_version: str + __has_valid_config: bool def __init__( self, @@ -200,37 +204,58 @@ class LLMonitorCallbackHandler(BaseCallbackHandler): ) -> None: super().__init__() - self.__api_url = api_url or os.getenv("LLMONITOR_API_URL") or DEFAULT_API_URL + self.__has_valid_config = True + + try: + import llmonitor + + self.__llmonitor_version = importlib.metadata.version("llmonitor") + self.__track_event = llmonitor.track_event + + except ImportError: + warnings.warn( + """[LLMonitor] To use the LLMonitor callback handler you need to + have the `llmonitor` Python package installed. Please install it + with `pip install llmonitor`""" + ) + self.__has_valid_config = False + + if parse(self.__llmonitor_version) < parse("0.0.20"): + warnings.warn( + f"""[LLMonitor] The installed `llmonitor` version is + {self.__llmonitor_version} but `LLMonitorCallbackHandler` requires + at least version 0.0.20 upgrade `llmonitor` with `pip install + --upgrade llmonitor`""" + ) + self.__has_valid_config = False + + self.__has_valid_config = True + self.__api_url = api_url or os.getenv("LLMONITOR_API_URL") or DEFAULT_API_URL self.__verbose = verbose or bool(os.getenv("LLMONITOR_VERBOSE")) _app_id = app_id or os.getenv("LLMONITOR_APP_ID") if _app_id is None: - raise ValueError( - """app_id must be provided either as an argument or as + warnings.warn( + """[LLMonitor] app_id must be provided either as an argument or as an environment variable""" ) - self.__app_id = _app_id + self.__has_valid_config = False + else: + self.__app_id = _app_id + + if self.__has_valid_config is False: + return None try: res = requests.get(f"{self.__api_url}/api/app/{self.__app_id}") if not res.ok: raise ConnectionError() - except Exception as e: - raise ConnectionError( - f"Could not connect to the LLMonitor API at {self.__api_url}" - ) from e - - def __send_event(self, event: Dict[str, Any]) -> None: - headers = {"Content-Type": "application/json"} - - event = {**event, "app": self.__app_id, "timestamp": str(datetime.utcnow())} - - if self.__verbose: - print("llmonitor_callback", event) - - data = {"events": event} - requests.post(headers=headers, url=f"{self.__api_url}/api/report", json=data) + except Exception: + warnings.warn( + f"""[LLMonitor] Could not connect to the LLMonitor API at + {self.__api_url}""" + ) def on_llm_start( self, @@ -243,27 +268,28 @@ class LLMonitorCallbackHandler(BaseCallbackHandler): metadata: Union[Dict[str, Any], None] = None, **kwargs: Any, ) -> None: + if self.__has_valid_config is False: + return try: user_id = _get_user_id(metadata) user_props = _get_user_props(metadata) - - event = { - "event": "start", - "type": "llm", - "userId": user_id, - "runId": str(run_id), - "parentRunId": str(parent_run_id) if parent_run_id else None, - "input": _parse_input(prompts), - "name": kwargs.get("invocation_params", {}).get("model_name"), - "tags": tags, - "metadata": metadata, - } - if user_props: - event["userProps"] = user_props - - self.__send_event(event) + name = kwargs.get("invocation_params", {}).get("model_name") + input = _parse_input(prompts) + + self.__track_event( + "llm", + "start", + user_id=user_id, + run_id=str(run_id), + parent_run_id=str(parent_run_id) if parent_run_id else None, + name=name, + input=input, + tags=tags, + metadata=metadata, + user_props=user_props, + ) except Exception as e: - logging.warning(f"[LLMonitor] An error occurred in on_llm_start: {e}") + warnings.warn(f"[LLMonitor] An error occurred in on_llm_start: {e}") def on_chat_model_start( self, @@ -276,28 +302,29 @@ class LLMonitorCallbackHandler(BaseCallbackHandler): metadata: Union[Dict[str, Any], None] = None, **kwargs: Any, ) -> Any: + if self.__has_valid_config is False: + return try: user_id = _get_user_id(metadata) user_props = _get_user_props(metadata) - - event = { - "event": "start", - "type": "llm", - "userId": user_id, - "runId": str(run_id), - "parentRunId": str(parent_run_id) if parent_run_id else None, - "input": _parse_lc_messages(messages[0]), - "name": kwargs.get("invocation_params", {}).get("model_name"), - "tags": tags, - "metadata": metadata, - } - if user_props: - event["userProps"] = user_props - - self.__send_event(event) + name = kwargs.get("invocation_params", {}).get("model_name") + input = _parse_lc_messages(messages[0]) + + self.__track_event( + "llm", + "start", + user_id=user_id, + run_id=str(run_id), + parent_run_id=str(parent_run_id) if parent_run_id else None, + name=name, + input=input, + tags=tags, + metadata=metadata, + user_props=user_props, + ) except Exception as e: logging.warning( - f"[LLMonitor] An error occurred in on_chat_model_start: " f"{e}" + f"[LLMonitor] An error occurred in on_chat_model_start: {e}" ) def on_llm_end( @@ -308,9 +335,11 @@ class LLMonitorCallbackHandler(BaseCallbackHandler): parent_run_id: Union[UUID, None] = None, **kwargs: Any, ) -> None: + if self.__has_valid_config is False: + return + try: token_usage = (response.llm_output or {}).get("token_usage", {}) - parsed_output = [ { "text": generation.text, @@ -330,20 +359,19 @@ class LLMonitorCallbackHandler(BaseCallbackHandler): for generation in response.generations[0] ] - event = { - "event": "end", - "type": "llm", - "runId": str(run_id), - "parent_run_id": str(parent_run_id) if parent_run_id else None, - "output": parsed_output, - "tokensUsage": { + self.__track_event( + "llm", + "end", + run_id=str(run_id), + parent_run_id=str(parent_run_id) if parent_run_id else None, + output=parsed_output, + token_usage={ "prompt": token_usage.get("prompt_tokens"), "completion": token_usage.get("completion_tokens"), }, - } - self.__send_event(event) + ) except Exception as e: - logging.warning(f"[LLMonitor] An error occurred in on_llm_end: {e}") + warnings.warn(f"[LLMonitor] An error occurred in on_llm_end: {e}") def on_tool_start( self, @@ -356,27 +384,27 @@ class LLMonitorCallbackHandler(BaseCallbackHandler): metadata: Union[Dict[str, Any], None] = None, **kwargs: Any, ) -> None: + if self.__has_valid_config is False: + return try: user_id = _get_user_id(metadata) user_props = _get_user_props(metadata) - - event = { - "event": "start", - "type": "tool", - "userId": user_id, - "runId": str(run_id), - "parentRunId": str(parent_run_id) if parent_run_id else None, - "name": serialized.get("name"), - "input": input_str, - "tags": tags, - "metadata": metadata, - } - if user_props: - event["userProps"] = user_props - - self.__send_event(event) + name = serialized.get("name") + + self.__track_event( + "tool", + "start", + user_id=user_id, + run_id=str(run_id), + parent_run_id=str(parent_run_id) if parent_run_id else None, + name=name, + input=input_str, + tags=tags, + metadata=metadata, + user_props=user_props, + ) except Exception as e: - logging.warning(f"[LLMonitor] An error occurred in on_tool_start: {e}") + warnings.warn(f"[LLMonitor] An error occurred in on_tool_start: {e}") def on_tool_end( self, @@ -387,17 +415,18 @@ class LLMonitorCallbackHandler(BaseCallbackHandler): tags: Union[List[str], None] = None, **kwargs: Any, ) -> None: + if self.__has_valid_config is False: + return try: - event = { - "event": "end", - "type": "tool", - "runId": str(run_id), - "parent_run_id": str(parent_run_id) if parent_run_id else None, - "output": output, - } - self.__send_event(event) + self.__track_event( + "tool", + "end", + run_id=str(run_id), + parent_run_id=str(parent_run_id) if parent_run_id else None, + output=output, + ) except Exception as e: - logging.warning(f"[LLMonitor] An error occurred in on_tool_end: {e}") + warnings.warn(f"[LLMonitor] An error occurred in on_tool_end: {e}") def on_chain_start( self, @@ -410,6 +439,8 @@ class LLMonitorCallbackHandler(BaseCallbackHandler): metadata: Union[Dict[str, Any], None] = None, **kwargs: Any, ) -> Any: + if self.__has_valid_config is False: + return try: name = serialized.get("id", [None, None, None, None])[3] type = "chain" @@ -419,35 +450,32 @@ class LLMonitorCallbackHandler(BaseCallbackHandler): if agentName is None: agentName = metadata.get("agentName") + if name == "AgentExecutor" or name == "PlanAndExecute": + type = "agent" if agentName is not None: type = "agent" name = agentName - if name == "AgentExecutor" or name == "PlanAndExecute": - type = "agent" - if parent_run_id is not None: type = "chain" user_id = _get_user_id(metadata) user_props = _get_user_props(metadata) - - event = { - "event": "start", - "type": type, - "userId": user_id, - "runId": str(run_id), - "parentRunId": str(parent_run_id) if parent_run_id else None, - "input": _parse_input(inputs), - "tags": tags, - "metadata": metadata, - "name": name, - } - if user_props: - event["userProps"] = user_props - - self.__send_event(event) + input = _parse_input(inputs) + + self.__track_event( + type, + "start", + user_id=user_id, + run_id=str(run_id), + parent_run_id=str(parent_run_id) if parent_run_id else None, + name=name, + input=input, + tags=tags, + metadata=metadata, + user_props=user_props, + ) except Exception as e: - logging.warning(f"[LLMonitor] An error occurred in on_chain_start: {e}") + warnings.warn(f"[LLMonitor] An error occurred in on_chain_start: {e}") def on_chain_end( self, @@ -457,14 +485,18 @@ class LLMonitorCallbackHandler(BaseCallbackHandler): parent_run_id: Union[UUID, None] = None, **kwargs: Any, ) -> Any: + if self.__has_valid_config is False: + return try: - event = { - "event": "end", - "type": "chain", - "runId": str(run_id), - "output": _parse_output(outputs), - } - self.__send_event(event) + output = _parse_output(outputs) + + self.__track_event( + "chain", + "end", + run_id=str(run_id), + parent_run_id=str(parent_run_id) if parent_run_id else None, + output=output, + ) except Exception as e: logging.warning(f"[LLMonitor] An error occurred in on_chain_end: {e}") @@ -476,16 +508,20 @@ class LLMonitorCallbackHandler(BaseCallbackHandler): parent_run_id: Union[UUID, None] = None, **kwargs: Any, ) -> Any: + if self.__has_valid_config is False: + return try: - event = { - "event": "start", - "type": "tool", - "runId": str(run_id), - "parentRunId": str(parent_run_id) if parent_run_id else None, - "name": action.tool, - "input": _parse_input(action.tool_input), - } - self.__send_event(event) + name = action.tool + input = _parse_input(action.tool_input) + + self.__track_event( + "tool", + "start", + run_id=str(run_id), + parent_run_id=str(parent_run_id) if parent_run_id else None, + name=name, + input=input, + ) except Exception as e: logging.warning(f"[LLMonitor] An error occurred in on_agent_action: {e}") @@ -497,15 +533,18 @@ class LLMonitorCallbackHandler(BaseCallbackHandler): parent_run_id: Union[UUID, None] = None, **kwargs: Any, ) -> Any: + if self.__has_valid_config is False: + return try: - event = { - "event": "end", - "type": "agent", - "runId": str(run_id), - "parentRunId": str(parent_run_id) if parent_run_id else None, - "output": _parse_output(finish.return_values), - } - self.__send_event(event) + output = _parse_output(finish.return_values) + + self.__track_event( + "agent", + "end", + run_id=str(run_id), + parent_run_id=str(parent_run_id) if parent_run_id else None, + output=output, + ) except Exception as e: logging.warning(f"[LLMonitor] An error occurred in on_agent_finish: {e}") @@ -517,15 +556,16 @@ class LLMonitorCallbackHandler(BaseCallbackHandler): parent_run_id: Union[UUID, None] = None, **kwargs: Any, ) -> Any: + if self.__has_valid_config is False: + return try: - event = { - "event": "error", - "type": "chain", - "runId": str(run_id), - "parent_run_id": str(parent_run_id) if parent_run_id else None, - "error": {"message": str(error), "stack": traceback.format_exc()}, - } - self.__send_event(event) + self.__track_event( + "chain", + "error", + run_id=str(run_id), + parent_run_id=str(parent_run_id) if parent_run_id else None, + error={"message": str(error), "stack": traceback.format_exc()}, + ) except Exception as e: logging.warning(f"[LLMonitor] An error occurred in on_chain_error: {e}") @@ -537,15 +577,16 @@ class LLMonitorCallbackHandler(BaseCallbackHandler): parent_run_id: Union[UUID, None] = None, **kwargs: Any, ) -> Any: + if self.__has_valid_config is False: + return try: - event = { - "event": "error", - "type": "tool", - "runId": str(run_id), - "parent_run_id": str(parent_run_id) if parent_run_id else None, - "error": {"message": str(error), "stack": traceback.format_exc()}, - } - self.__send_event(event) + self.__track_event( + "tool", + "error", + run_id=str(run_id), + parent_run_id=str(parent_run_id) if parent_run_id else None, + error={"message": str(error), "stack": traceback.format_exc()}, + ) except Exception as e: logging.warning(f"[LLMonitor] An error occurred in on_tool_error: {e}") @@ -557,15 +598,16 @@ class LLMonitorCallbackHandler(BaseCallbackHandler): parent_run_id: Union[UUID, None] = None, **kwargs: Any, ) -> Any: + if self.__has_valid_config is False: + return try: - event = { - "event": "error", - "type": "llm", - "runId": str(run_id), - "parent_run_id": str(parent_run_id) if parent_run_id else None, - "error": {"message": str(error), "stack": traceback.format_exc()}, - } - self.__send_event(event) + self.__track_event( + "llm", + "error", + run_id=str(run_id), + parent_run_id=str(parent_run_id) if parent_run_id else None, + error={"message": str(error), "stack": traceback.format_exc()}, + ) except Exception as e: logging.warning(f"[LLMonitor] An error occurred in on_llm_error: {e}")