@ -14,6 +14,70 @@ from langchain.schema.output import LLMResult
DEFAULT_API_URL = " https://app.llmonitor.com "
DEFAULT_API_URL = " https://app.llmonitor.com "
def _serialize ( obj : Any ) - > Union [ Dict [ str , Any ] , List [ Any ] , Any ] :
if hasattr ( obj , " to_json " ) :
return obj . to_json ( )
if isinstance ( obj , dict ) :
return { key : _serialize ( value ) for key , value in obj . items ( ) }
if isinstance ( obj , list ) :
return [ _serialize ( element ) for element in obj ]
return obj
def _parse_input ( raw_input : Any ) - > Any :
if not raw_input :
return None
if not isinstance ( raw_input , dict ) :
return _serialize ( raw_input )
input_value = raw_input . get ( " input " )
inputs_value = raw_input . get ( " inputs " )
question_value = raw_input . get ( " question " )
query_value = raw_input . get ( " query " )
if input_value :
return input_value
if inputs_value :
return inputs_value
if question_value :
return question_value
if query_value :
return query_value
return _serialize ( raw_input )
def _parse_output ( raw_output : dict ) - > Any :
if not raw_output :
return None
if not isinstance ( raw_output , dict ) :
return _serialize ( raw_output )
text_value = raw_output . get ( " text " )
output_value = raw_output . get ( " output " )
output_text_value = raw_output . get ( " output_text " )
answer_value = raw_output . get ( " answer " )
result_value = raw_output . get ( " result " )
if text_value :
return text_value
if answer_value :
return answer_value
if output_value :
return output_value
if output_text_value :
return output_text_value
if result_value :
return result_value
return _serialize ( raw_output )
def _parse_lc_role (
def _parse_lc_role (
role : str ,
role : str ,
) - > Union [ Literal [ " user " , " ai " , " system " , " function " ] , None ] :
) - > Union [ Literal [ " user " , " ai " , " system " , " function " ] , None ] :
@ -29,8 +93,27 @@ def _parse_lc_role(
return None
return None
def _serialize_lc_message ( message : BaseMessage ) - > Dict [ str , Any ] :
def _get_user_id ( metadata : Any ) - > Any :
return { " text " : message . content , " role " : _parse_lc_role ( message . type ) }
metadata = metadata or { }
user_id = metadata . get ( " user_id " )
if user_id is None :
user_id = metadata . get ( " userId " )
return user_id
def _parse_lc_message ( message : BaseMessage ) - > Dict [ str , Any ] :
parsed = { " text " : message . content , " role " : _parse_lc_role ( message . type ) }
function_call = ( message . additional_kwargs or { } ) . get ( " function_call " )
if function_call is not None :
parsed [ " functionCall " ] = function_call
return parsed
def _parse_lc_messages ( messages : Union [ List [ BaseMessage ] , Any ] ) - > List [ Dict [ str , Any ] ] :
return [ _parse_lc_message ( message ) for message in messages ]
class LLMonitorCallbackHandler ( BaseCallbackHandler ) :
class LLMonitorCallbackHandler ( BaseCallbackHandler ) :
@ -62,14 +145,20 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
__api_url : str
__api_url : str
__app_id : str
__app_id : str
__verbose : bool
def __init__ (
def __init__ (
self , app_id : Union [ str , None ] = None , api_url : Union [ str , None ] = None
self ,
app_id : Union [ str , None ] = None ,
api_url : Union [ str , None ] = None ,
verbose : bool = False ,
) - > None :
) - > None :
super ( ) . __init__ ( )
super ( ) . __init__ ( )
self . __api_url = api_url or os . getenv ( " LLMONITOR_API_URL " ) or DEFAULT_API_URL
self . __api_url = api_url or os . getenv ( " LLMONITOR_API_URL " ) or DEFAULT_API_URL
self . __verbose = verbose or bool ( os . getenv ( " LLMONITOR_VERBOSE " ) )
_app_id = app_id or os . getenv ( " LLMONITOR_APP_ID " )
_app_id = app_id or os . getenv ( " LLMONITOR_APP_ID " )
if _app_id is None :
if _app_id is None :
raise ValueError (
raise ValueError (
@ -89,7 +178,12 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
def __send_event ( self , event : Dict [ str , Any ] ) - > None :
def __send_event ( self , event : Dict [ str , Any ] ) - > None :
headers = { " Content-Type " : " application/json " }
headers = { " Content-Type " : " application/json " }
event = { * * event , " app " : self . __app_id , " timestamp " : str ( datetime . utcnow ( ) ) }
event = { * * event , " app " : self . __app_id , " timestamp " : str ( datetime . utcnow ( ) ) }
if self . __verbose :
print ( " llmonitor_callback " , event )
data = { " events " : event }
data = { " events " : event }
requests . post ( headers = headers , url = f " { self . __api_url } /api/report " , json = data )
requests . post ( headers = headers , url = f " { self . __api_url } /api/report " , json = data )
@ -110,7 +204,7 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
" userId " : ( metadata or { } ) . get ( " userId " ) ,
" userId " : ( metadata or { } ) . get ( " userId " ) ,
" runId " : str ( run_id ) ,
" runId " : str ( run_id ) ,
" parentRunId " : str ( parent_run_id ) if parent_run_id else None ,
" parentRunId " : str ( parent_run_id ) if parent_run_id else None ,
" input " : prompts[ 0 ] ,
" input " : _parse_input( prompts ) ,
" name " : kwargs . get ( " invocation_params " , { } ) . get ( " model_name " ) ,
" name " : kwargs . get ( " invocation_params " , { } ) . get ( " model_name " ) ,
" tags " : tags ,
" tags " : tags ,
" metadata " : metadata ,
" metadata " : metadata ,
@ -128,13 +222,15 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
metadata : Union [ Dict [ str , Any ] , None ] = None ,
metadata : Union [ Dict [ str , Any ] , None ] = None ,
* * kwargs : Any ,
* * kwargs : Any ,
) - > Any :
) - > Any :
user_id = _get_user_id ( metadata )
event = {
event = {
" event " : " start " ,
" event " : " start " ,
" type " : " llm " ,
" type " : " llm " ,
" userId " : ( metadata or { } ) . get ( " userId " ) ,
" userId " : user_id ,
" runId " : str ( run_id ) ,
" runId " : str ( run_id ) ,
" parentRunId " : str ( parent_run_id ) if parent_run_id else None ,
" parentRunId " : str ( parent_run_id ) if parent_run_id else None ,
" input " : [ _ serializ e_lc_message( message [ 0 ] ) for message in messages ] ,
" input " : _ par se_lc_messages ( message s [ 0 ] ) ,
" name " : kwargs . get ( " invocation_params " , { } ) . get ( " model_name " ) ,
" name " : kwargs . get ( " invocation_params " , { } ) . get ( " model_name " ) ,
" tags " : tags ,
" tags " : tags ,
" metadata " : metadata ,
" metadata " : metadata ,
@ -151,36 +247,26 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
) - > None :
) - > None :
token_usage = ( response . llm_output or { } ) . get ( " token_usage " , { } )
token_usage = ( response . llm_output or { } ) . get ( " token_usage " , { } )
parsed_output = _parse_lc_messages (
map (
lambda o : o . message if hasattr ( o , " message " ) else None ,
response . generations [ 0 ] ,
)
)
event = {
event = {
" event " : " end " ,
" event " : " end " ,
" type " : " llm " ,
" type " : " llm " ,
" runId " : str ( run_id ) ,
" runId " : str ( run_id ) ,
" parent_run_id " : str ( parent_run_id ) if parent_run_id else None ,
" parent_run_id " : str ( parent_run_id ) if parent_run_id else None ,
" output " : { " text " : response . generations [ 0 ] [ 0 ] . text , " role " : " ai " } ,
" output " : parsed_output ,
" tokensUsage " : {
" tokensUsage " : {
" prompt " : token_usage . get ( " prompt_tokens " , 0 ),
" prompt " : token_usage . get ( " prompt_tokens " ),
" completion " : token_usage . get ( " completion_tokens " , 0 ),
" completion " : token_usage . get ( " completion_tokens " ),
} ,
} ,
}
}
self . __send_event ( event )
self . __send_event ( event )
def on_llm_error (
self ,
error : Union [ Exception , KeyboardInterrupt ] ,
* ,
run_id : UUID ,
parent_run_id : Union [ UUID , None ] = None ,
* * kwargs : Any ,
) - > Any :
event = {
" event " : " error " ,
" type " : " llm " ,
" runId " : str ( run_id ) ,
" parent_run_id " : str ( parent_run_id ) if parent_run_id else None ,
" error " : { " message " : str ( error ) , " stack " : traceback . format_exc ( ) } ,
}
self . __send_event ( event )
def on_tool_start (
def on_tool_start (
self ,
self ,
serialized : Dict [ str , Any ] ,
serialized : Dict [ str , Any ] ,
@ -192,10 +278,11 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
metadata : Union [ Dict [ str , Any ] , None ] = None ,
metadata : Union [ Dict [ str , Any ] , None ] = None ,
* * kwargs : Any ,
* * kwargs : Any ,
) - > None :
) - > None :
user_id = _get_user_id ( metadata )
event = {
event = {
" event " : " start " ,
" event " : " start " ,
" type " : " tool " ,
" type " : " tool " ,
" userId " : ( metadata or { } ) . get ( " userId " ) ,
" userId " : user_id ,
" runId " : str ( run_id ) ,
" runId " : str ( run_id ) ,
" parentRunId " : str ( parent_run_id ) if parent_run_id else None ,
" parentRunId " : str ( parent_run_id ) if parent_run_id else None ,
" name " : serialized . get ( " name " ) ,
" name " : serialized . get ( " name " ) ,
@ -236,25 +323,34 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
) - > Any :
) - > Any :
name = serialized . get ( " id " , [ None , None , None , None ] ) [ 3 ]
name = serialized . get ( " id " , [ None , None , None , None ] ) [ 3 ]
type = " chain "
type = " chain "
metadata = metadata or { }
agentName = metadata . get ( " agent_name " )
if agentName is None :
agentName = metadata . get ( " agentName " )
agentName = ( metadata or { } ) . get ( " agentName " )
if agentName is not None :
if agentName is not None :
type = " agent "
type = " agent "
name = agentName
name = agentName
if name == " AgentExecutor " or name == " PlanAndExecute " :
if name == " AgentExecutor " or name == " PlanAndExecute " :
type = " agent "
type = " agent "
if parent_run_id is not None :
type = " chain "
user_id = _get_user_id ( metadata )
event = {
event = {
" event " : " start " ,
" event " : " start " ,
" type " : type ,
" type " : type ,
" userId " : ( metadata or { } ) . get ( " userId " ) ,
" userId " : user_id ,
" runId " : str ( run_id ) ,
" runId " : str ( run_id ) ,
" parentRunId " : str ( parent_run_id ) if parent_run_id else None ,
" parentRunId " : str ( parent_run_id ) if parent_run_id else None ,
" input " : inputs. get ( " input " , inputs ) ,
" input " : _parse_input( inputs ) ,
" tags " : tags ,
" tags " : tags ,
" metadata " : metadata ,
" metadata " : metadata ,
" name " : serialized. get ( " id " , [ No ne, None , None , None ] ) [ 3 ] ,
" name " : nam e,
}
}
self . __send_event ( event )
self . __send_event ( event )
def on_chain_end (
def on_chain_end (
@ -269,7 +365,42 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
" event " : " end " ,
" event " : " end " ,
" type " : " chain " ,
" type " : " chain " ,
" runId " : str ( run_id ) ,
" runId " : str ( run_id ) ,
" output " : outputs . get ( " output " , outputs ) ,
" output " : _parse_output ( outputs ) ,
}
self . __send_event ( event )
def on_agent_action (
self ,
action : AgentAction ,
* ,
run_id : UUID ,
parent_run_id : Union [ UUID , None ] = None ,
* * kwargs : Any ,
) - > Any :
event = {
" event " : " start " ,
" type " : " tool " ,
" runId " : str ( run_id ) ,
" parentRunId " : str ( parent_run_id ) if parent_run_id else None ,
" name " : action . tool ,
" input " : _parse_input ( action . tool_input ) ,
}
self . __send_event ( event )
def on_agent_finish (
self ,
finish : AgentFinish ,
* ,
run_id : UUID ,
parent_run_id : Union [ UUID , None ] = None ,
* * kwargs : Any ,
) - > Any :
event = {
" event " : " end " ,
" type " : " agent " ,
" runId " : str ( run_id ) ,
" parentRunId " : str ( parent_run_id ) if parent_run_id else None ,
" output " : _parse_output ( finish . return_values ) ,
}
}
self . __send_event ( event )
self . __send_event ( event )
@ -290,38 +421,37 @@ class LLMonitorCallbackHandler(BaseCallbackHandler):
}
}
self . __send_event ( event )
self . __send_event ( event )
def on_ agent_action (
def on_ tool_error (
self ,
self ,
action: AgentAction ,
error: Union [ Exception , KeyboardInterrupt ] ,
* ,
* ,
run_id : UUID ,
run_id : UUID ,
parent_run_id : Union [ UUID , None ] = None ,
parent_run_id : Union [ UUID , None ] = None ,
* * kwargs : Any ,
* * kwargs : Any ,
) - > Any :
) - > Any :
event = {
event = {
" event " : " start " ,
" event " : " error " ,
" type " : " tool " ,
" type " : " tool " ,
" runId " : str ( run_id ) ,
" runId " : str ( run_id ) ,
" parentRunId " : str ( parent_run_id ) if parent_run_id else None ,
" parent_run_id " : str ( parent_run_id ) if parent_run_id else None ,
" name " : action . tool ,
" error " : { " message " : str ( error ) , " stack " : traceback . format_exc ( ) } ,
" input " : action . tool_input ,
}
}
self . __send_event ( event )
self . __send_event ( event )
def on_ agent_finish (
def on_ llm_error (
self ,
self ,
finish: AgentFinish ,
error: Union [ Exception , KeyboardInterrupt ] ,
* ,
* ,
run_id : UUID ,
run_id : UUID ,
parent_run_id : Union [ UUID , None ] = None ,
parent_run_id : Union [ UUID , None ] = None ,
* * kwargs : Any ,
* * kwargs : Any ,
) - > Any :
) - > Any :
event = {
event = {
" event " : " e nd " ,
" event " : " e rror " ,
" type " : " agent " ,
" type " : " llm " ,
" runId " : str ( run_id ) ,
" runId " : str ( run_id ) ,
" parent RunI d" : str ( parent_run_id ) if parent_run_id else None ,
" parent _run_i d" : str ( parent_run_id ) if parent_run_id else None ,
" output" : finish . return_values ,
" error" : { " message " : str ( error ) , " stack " : traceback . format_exc ( ) } ,
}
}
self . __send_event ( event )
self . __send_event ( event )