[LLMonitorCallbackHandler] Refactor + add llmonitor-py dependency (#11948)

We now require uses to have the pip package `llmonitor` installed. It
allows us to have cleaner code and avoid duplicates between our library
and our code in Langchain.
pull/12016/head
Hugues Chocart 12 months ago committed by GitHub
parent 77fc2f7644
commit 008c7df80d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,12 +1,14 @@
import importlib.metadata
import logging
import os
import traceback
import warnings
from contextvars import ContextVar
from datetime import datetime
from typing import Any, Dict, List, Literal, Union
from uuid import UUID
import requests
from packaging.version import parse
from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema.agent import AgentAction, AgentFinish
@ -142,7 +144,7 @@ def _get_user_props(metadata: Any) -> Any:
return user_props_ctx.get()
metadata = metadata or {}
return metadata.get("user_props")
return metadata.get("user_props", None)
def _parse_lc_message(message: BaseMessage) -> Dict[str, Any]:
@ -191,6 +193,8 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
__api_url: str
__app_id: str
__verbose: bool
__llmonitor_version: str
__has_valid_config: bool
def __init__(
self,
@ -200,37 +204,58 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
) -> None:
super().__init__()
self.__api_url = api_url or os.getenv("LLMONITOR_API_URL") or DEFAULT_API_URL
self.__has_valid_config = True
try:
import llmonitor
self.__llmonitor_version = importlib.metadata.version("llmonitor")
self.__track_event = llmonitor.track_event
except ImportError:
warnings.warn(
"""[LLMonitor] To use the LLMonitor callback handler you need to
have the `llmonitor` Python package installed. Please install it
with `pip install llmonitor`"""
)
self.__has_valid_config = False
if parse(self.__llmonitor_version) < parse("0.0.20"):
warnings.warn(
f"""[LLMonitor] The installed `llmonitor` version is
{self.__llmonitor_version} but `LLMonitorCallbackHandler` requires
at least version 0.0.20 upgrade `llmonitor` with `pip install
--upgrade llmonitor`"""
)
self.__has_valid_config = False
self.__has_valid_config = True
self.__api_url = api_url or os.getenv("LLMONITOR_API_URL") or DEFAULT_API_URL
self.__verbose = verbose or bool(os.getenv("LLMONITOR_VERBOSE"))
_app_id = app_id or os.getenv("LLMONITOR_APP_ID")
if _app_id is None:
raise ValueError(
"""app_id must be provided either as an argument or as
warnings.warn(
"""[LLMonitor] app_id must be provided either as an argument or as
an environment variable"""
)
self.__app_id = _app_id
self.__has_valid_config = False
else:
self.__app_id = _app_id
if self.__has_valid_config is False:
return None
try:
res = requests.get(f"{self.__api_url}/api/app/{self.__app_id}")
if not res.ok:
raise ConnectionError()
except Exception as e:
raise ConnectionError(
f"Could not connect to the LLMonitor API at {self.__api_url}"
) from e
def __send_event(self, event: Dict[str, Any]) -> None:
headers = {"Content-Type": "application/json"}
event = {**event, "app": self.__app_id, "timestamp": str(datetime.utcnow())}
if self.__verbose:
print("llmonitor_callback", event)
data = {"events": event}
requests.post(headers=headers, url=f"{self.__api_url}/api/report", json=data)
except Exception:
warnings.warn(
f"""[LLMonitor] Could not connect to the LLMonitor API at
{self.__api_url}"""
)
def on_llm_start(
self,
@ -243,27 +268,28 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
metadata: Union[Dict[str, Any], None] = None,
**kwargs: Any,
) -> None:
if self.__has_valid_config is False:
return
try:
user_id = _get_user_id(metadata)
user_props = _get_user_props(metadata)
event = {
"event": "start",
"type": "llm",
"userId": user_id,
"runId": str(run_id),
"parentRunId": str(parent_run_id) if parent_run_id else None,
"input": _parse_input(prompts),
"name": kwargs.get("invocation_params", {}).get("model_name"),
"tags": tags,
"metadata": metadata,
}
if user_props:
event["userProps"] = user_props
self.__send_event(event)
name = kwargs.get("invocation_params", {}).get("model_name")
input = _parse_input(prompts)
self.__track_event(
"llm",
"start",
user_id=user_id,
run_id=str(run_id),
parent_run_id=str(parent_run_id) if parent_run_id else None,
name=name,
input=input,
tags=tags,
metadata=metadata,
user_props=user_props,
)
except Exception as e:
logging.warning(f"[LLMonitor] An error occurred in on_llm_start: {e}")
warnings.warn(f"[LLMonitor] An error occurred in on_llm_start: {e}")
def on_chat_model_start(
self,
@ -276,28 +302,29 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
metadata: Union[Dict[str, Any], None] = None,
**kwargs: Any,
) -> Any:
if self.__has_valid_config is False:
return
try:
user_id = _get_user_id(metadata)
user_props = _get_user_props(metadata)
event = {
"event": "start",
"type": "llm",
"userId": user_id,
"runId": str(run_id),
"parentRunId": str(parent_run_id) if parent_run_id else None,
"input": _parse_lc_messages(messages[0]),
"name": kwargs.get("invocation_params", {}).get("model_name"),
"tags": tags,
"metadata": metadata,
}
if user_props:
event["userProps"] = user_props
self.__send_event(event)
name = kwargs.get("invocation_params", {}).get("model_name")
input = _parse_lc_messages(messages[0])
self.__track_event(
"llm",
"start",
user_id=user_id,
run_id=str(run_id),
parent_run_id=str(parent_run_id) if parent_run_id else None,
name=name,
input=input,
tags=tags,
metadata=metadata,
user_props=user_props,
)
except Exception as e:
logging.warning(
f"[LLMonitor] An error occurred in on_chat_model_start: " f"{e}"
f"[LLMonitor] An error occurred in on_chat_model_start: {e}"
)
def on_llm_end(
@ -308,9 +335,11 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
parent_run_id: Union[UUID, None] = None,
**kwargs: Any,
) -> None:
if self.__has_valid_config is False:
return
try:
token_usage = (response.llm_output or {}).get("token_usage", {})
parsed_output = [
{
"text": generation.text,
@ -330,20 +359,19 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
for generation in response.generations[0]
]
event = {
"event": "end",
"type": "llm",
"runId": str(run_id),
"parent_run_id": str(parent_run_id) if parent_run_id else None,
"output": parsed_output,
"tokensUsage": {
self.__track_event(
"llm",
"end",
run_id=str(run_id),
parent_run_id=str(parent_run_id) if parent_run_id else None,
output=parsed_output,
token_usage={
"prompt": token_usage.get("prompt_tokens"),
"completion": token_usage.get("completion_tokens"),
},
}
self.__send_event(event)
)
except Exception as e:
logging.warning(f"[LLMonitor] An error occurred in on_llm_end: {e}")
warnings.warn(f"[LLMonitor] An error occurred in on_llm_end: {e}")
def on_tool_start(
self,
@ -356,27 +384,27 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
metadata: Union[Dict[str, Any], None] = None,
**kwargs: Any,
) -> None:
if self.__has_valid_config is False:
return
try:
user_id = _get_user_id(metadata)
user_props = _get_user_props(metadata)
event = {
"event": "start",
"type": "tool",
"userId": user_id,
"runId": str(run_id),
"parentRunId": str(parent_run_id) if parent_run_id else None,
"name": serialized.get("name"),
"input": input_str,
"tags": tags,
"metadata": metadata,
}
if user_props:
event["userProps"] = user_props
self.__send_event(event)
name = serialized.get("name")
self.__track_event(
"tool",
"start",
user_id=user_id,
run_id=str(run_id),
parent_run_id=str(parent_run_id) if parent_run_id else None,
name=name,
input=input_str,
tags=tags,
metadata=metadata,
user_props=user_props,
)
except Exception as e:
logging.warning(f"[LLMonitor] An error occurred in on_tool_start: {e}")
warnings.warn(f"[LLMonitor] An error occurred in on_tool_start: {e}")
def on_tool_end(
self,
@ -387,17 +415,18 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
tags: Union[List[str], None] = None,
**kwargs: Any,
) -> None:
if self.__has_valid_config is False:
return
try:
event = {
"event": "end",
"type": "tool",
"runId": str(run_id),
"parent_run_id": str(parent_run_id) if parent_run_id else None,
"output": output,
}
self.__send_event(event)
self.__track_event(
"tool",
"end",
run_id=str(run_id),
parent_run_id=str(parent_run_id) if parent_run_id else None,
output=output,
)
except Exception as e:
logging.warning(f"[LLMonitor] An error occurred in on_tool_end: {e}")
warnings.warn(f"[LLMonitor] An error occurred in on_tool_end: {e}")
def on_chain_start(
self,
@ -410,6 +439,8 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
metadata: Union[Dict[str, Any], None] = None,
**kwargs: Any,
) -> Any:
if self.__has_valid_config is False:
return
try:
name = serialized.get("id", [None, None, None, None])[3]
type = "chain"
@ -419,35 +450,32 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
if agentName is None:
agentName = metadata.get("agentName")
if name == "AgentExecutor" or name == "PlanAndExecute":
type = "agent"
if agentName is not None:
type = "agent"
name = agentName
if name == "AgentExecutor" or name == "PlanAndExecute":
type = "agent"
if parent_run_id is not None:
type = "chain"
user_id = _get_user_id(metadata)
user_props = _get_user_props(metadata)
event = {
"event": "start",
"type": type,
"userId": user_id,
"runId": str(run_id),
"parentRunId": str(parent_run_id) if parent_run_id else None,
"input": _parse_input(inputs),
"tags": tags,
"metadata": metadata,
"name": name,
}
if user_props:
event["userProps"] = user_props
self.__send_event(event)
input = _parse_input(inputs)
self.__track_event(
type,
"start",
user_id=user_id,
run_id=str(run_id),
parent_run_id=str(parent_run_id) if parent_run_id else None,
name=name,
input=input,
tags=tags,
metadata=metadata,
user_props=user_props,
)
except Exception as e:
logging.warning(f"[LLMonitor] An error occurred in on_chain_start: {e}")
warnings.warn(f"[LLMonitor] An error occurred in on_chain_start: {e}")
def on_chain_end(
self,
@ -457,14 +485,18 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
parent_run_id: Union[UUID, None] = None,
**kwargs: Any,
) -> Any:
if self.__has_valid_config is False:
return
try:
event = {
"event": "end",
"type": "chain",
"runId": str(run_id),
"output": _parse_output(outputs),
}
self.__send_event(event)
output = _parse_output(outputs)
self.__track_event(
"chain",
"end",
run_id=str(run_id),
parent_run_id=str(parent_run_id) if parent_run_id else None,
output=output,
)
except Exception as e:
logging.warning(f"[LLMonitor] An error occurred in on_chain_end: {e}")
@ -476,16 +508,20 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
parent_run_id: Union[UUID, None] = None,
**kwargs: Any,
) -> Any:
if self.__has_valid_config is False:
return
try:
event = {
"event": "start",
"type": "tool",
"runId": str(run_id),
"parentRunId": str(parent_run_id) if parent_run_id else None,
"name": action.tool,
"input": _parse_input(action.tool_input),
}
self.__send_event(event)
name = action.tool
input = _parse_input(action.tool_input)
self.__track_event(
"tool",
"start",
run_id=str(run_id),
parent_run_id=str(parent_run_id) if parent_run_id else None,
name=name,
input=input,
)
except Exception as e:
logging.warning(f"[LLMonitor] An error occurred in on_agent_action: {e}")
@ -497,15 +533,18 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
parent_run_id: Union[UUID, None] = None,
**kwargs: Any,
) -> Any:
if self.__has_valid_config is False:
return
try:
event = {
"event": "end",
"type": "agent",
"runId": str(run_id),
"parentRunId": str(parent_run_id) if parent_run_id else None,
"output": _parse_output(finish.return_values),
}
self.__send_event(event)
output = _parse_output(finish.return_values)
self.__track_event(
"agent",
"end",
run_id=str(run_id),
parent_run_id=str(parent_run_id) if parent_run_id else None,
output=output,
)
except Exception as e:
logging.warning(f"[LLMonitor] An error occurred in on_agent_finish: {e}")
@ -517,15 +556,16 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
parent_run_id: Union[UUID, None] = None,
**kwargs: Any,
) -> Any:
if self.__has_valid_config is False:
return
try:
event = {
"event": "error",
"type": "chain",
"runId": str(run_id),
"parent_run_id": str(parent_run_id) if parent_run_id else None,
"error": {"message": str(error), "stack": traceback.format_exc()},
}
self.__send_event(event)
self.__track_event(
"chain",
"error",
run_id=str(run_id),
parent_run_id=str(parent_run_id) if parent_run_id else None,
error={"message": str(error), "stack": traceback.format_exc()},
)
except Exception as e:
logging.warning(f"[LLMonitor] An error occurred in on_chain_error: {e}")
@ -537,15 +577,16 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
parent_run_id: Union[UUID, None] = None,
**kwargs: Any,
) -> Any:
if self.__has_valid_config is False:
return
try:
event = {
"event": "error",
"type": "tool",
"runId": str(run_id),
"parent_run_id": str(parent_run_id) if parent_run_id else None,
"error": {"message": str(error), "stack": traceback.format_exc()},
}
self.__send_event(event)
self.__track_event(
"tool",
"error",
run_id=str(run_id),
parent_run_id=str(parent_run_id) if parent_run_id else None,
error={"message": str(error), "stack": traceback.format_exc()},
)
except Exception as e:
logging.warning(f"[LLMonitor] An error occurred in on_tool_error: {e}")
@ -557,15 +598,16 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
parent_run_id: Union[UUID, None] = None,
**kwargs: Any,
) -> Any:
if self.__has_valid_config is False:
return
try:
event = {
"event": "error",
"type": "llm",
"runId": str(run_id),
"parent_run_id": str(parent_run_id) if parent_run_id else None,
"error": {"message": str(error), "stack": traceback.format_exc()},
}
self.__send_event(event)
self.__track_event(
"llm",
"error",
run_id=str(run_id),
parent_run_id=str(parent_run_id) if parent_run_id else None,
error={"message": str(error), "stack": traceback.format_exc()},
)
except Exception as e:
logging.warning(f"[LLMonitor] An error occurred in on_llm_error: {e}")

Loading…
Cancel
Save