@ -2,6 +2,7 @@ from __future__ import annotations
import asyncio
import functools
import json
import logging
from typing import (
Any ,
@ -12,6 +13,8 @@ from typing import (
List ,
Mapping ,
Optional ,
Sequence ,
Type ,
Union ,
cast ,
)
@ -20,6 +23,7 @@ from langchain_core.callbacks import (
AsyncCallbackManagerForLLMRun ,
CallbackManagerForLLMRun ,
)
from langchain_core . language_models import LanguageModelInput
from langchain_core . language_models . chat_models import BaseChatModel
from langchain_core . messages import (
AIMessage ,
@ -32,6 +36,8 @@ from langchain_core.messages import (
HumanMessageChunk ,
SystemMessage ,
SystemMessageChunk ,
ToolMessage ,
ToolMessageChunk ,
)
from langchain_core . output_parsers . openai_tools import (
make_invalid_tool_call ,
@ -42,8 +48,11 @@ from langchain_core.outputs import (
ChatGenerationChunk ,
ChatResult ,
)
from langchain_core . pydantic_v1 import Field , SecretStr , root_validator
from langchain_core . pydantic_v1 import BaseModel , Field , SecretStr , root_validator
from langchain_core . runnables import Runnable
from langchain_core . tools import BaseTool
from langchain_core . utils import convert_to_secret_str , get_from_dict_or_env
from langchain_core . utils . function_calling import convert_to_openai_tool
from requests . exceptions import HTTPError
from tenacity import (
before_sleep_log ,
@ -68,6 +77,7 @@ def convert_dict_to_message(
""" Convert a dict to a message. """
role = _dict [ " role " ]
content = _dict [ " content " ]
if role == " user " :
return (
HumanMessageChunk ( content = content )
@ -79,17 +89,39 @@ def convert_dict_to_message(
invalid_tool_calls = [ ]
if " tool_calls " in _dict :
additional_kwargs = { " tool_calls " : _dict [ " tool_calls " ] }
for raw_tool_call in _dict [ " tool_calls " ] :
try :
tool_calls . append ( parse_tool_call ( raw_tool_call , return_id = True ) )
except Exception as e :
invalid_tool_calls . append (
make_invalid_tool_call ( raw_tool_call , str ( e ) )
)
for index , value in enumerate ( _dict [ " tool_calls " ] ) :
if is_chunk :
try :
tool_calls . append (
{
" name " : value [ " function " ] . get ( " name " ) ,
" args " : value [ " function " ] . get ( " arguments " ) ,
" id " : value . get ( " id " ) ,
# Tongyi does not respond with index,
# use index in the list instead
" index " : index ,
}
)
except KeyError :
pass
else :
try :
parsed_tool = parse_tool_call ( value , return_id = True )
if parsed_tool :
tool_calls . append ( parsed_tool )
except Exception as e :
invalid_tool_calls . append ( make_invalid_tool_call ( value , str ( e ) ) )
else :
additional_kwargs = { }
return (
AIMessageChunk ( content = content )
AIMessageChunk (
content = content ,
additional_kwargs = additional_kwargs ,
tool_call_chunks = tool_calls ,
id = _dict . get ( " id " ) ,
)
if is_chunk
else AIMessage (
content = content ,
@ -104,6 +136,23 @@ def convert_dict_to_message(
if is_chunk
else SystemMessage ( content = content )
)
elif role == " tool " :
additional_kwargs = { }
if " name " in _dict :
additional_kwargs [ " name " ] = _dict [ " name " ]
return (
ToolMessageChunk (
content = _dict . get ( " content " , " " ) ,
tool_call_id = _dict . get ( " tool_call_id " ) ,
additional_kwargs = additional_kwargs ,
)
if is_chunk
else ToolMessage (
content = _dict . get ( " content " , " " ) ,
tool_call_id = _dict . get ( " tool_call_id " ) ,
additional_kwargs = additional_kwargs ,
)
)
else :
return (
ChatMessageChunk ( role = role , content = content )
@ -113,17 +162,23 @@ def convert_dict_to_message(
def convert_message_chunk_to_message(message_chunk: BaseMessageChunk) -> BaseMessage:
    """Convert a message chunk to a message.

    Args:
        message_chunk: Message chunk to convert.

    Returns:
        The equivalent non-chunk message (input is returned unchanged if it
        is not a chunk).
    """
    if not isinstance(message_chunk, BaseMessageChunk):
        return message_chunk
    # Chunk classes always have the equivalent non-chunk class as their
    # first parent, so __mro__[1] is e.g. AIMessage for AIMessageChunk.
    ignore_keys = ["type"]
    if isinstance(message_chunk, AIMessageChunk):
        # tool_call_chunks is chunk-specific and not accepted by AIMessage.
        ignore_keys.append("tool_call_chunks")
    return message_chunk.__class__.__mro__[1](
        **{k: v for k, v in message_chunk.__dict__.items() if k not in ignore_keys}
    )
def convert_message_to_dict ( message : BaseMessage ) - > dict :
@ -136,8 +191,17 @@ def convert_message_to_dict(message: BaseMessage) -> dict:
message_dict = { " role " : " user " , " content " : message . content }
elif isinstance ( message , AIMessage ) :
message_dict = { " role " : " assistant " , " content " : message . content }
if " tool_calls " in message . additional_kwargs :
message_dict [ " tool_calls " ] = message . additional_kwargs [ " tool_calls " ]
elif isinstance ( message , SystemMessage ) :
message_dict = { " role " : " system " , " content " : message . content }
elif isinstance ( message , ToolMessage ) :
message_dict = {
" role " : " tool " ,
" tool_call_id " : message . tool_call_id ,
" content " : message . content ,
" name " : message . name ,
}
else :
raise TypeError ( f " Got unknown type { message } " )
return message_dict
@ -256,11 +320,57 @@ class ChatTongyi(BaseChatModel):
@retry_decorator
def _stream_completion_with_retry ( * * _kwargs : Any ) - > Any :
responses = self . client . call ( * * _kwargs )
prev_resp = None
for resp in responses :
yield check_response ( resp )
# If we are streaming without `incremental_output = True`,
# we need to calculate the delta response manually
if _kwargs . get ( " stream " ) and not _kwargs . get (
" incremental_output " , False
) :
if prev_resp is None :
delta_resp = resp
else :
delta_resp = self . subtract_client_response ( resp , prev_resp )
prev_resp = resp
yield check_response ( delta_resp )
else :
yield check_response ( resp )
return _stream_completion_with_retry ( * * kwargs )
def subtract_client_response ( self , resp : Any , prev_resp : Any ) - > Any :
""" Subtract prev response from curr response.
Useful when streaming without ` incremental_output = True `
"""
resp_copy = json . loads ( json . dumps ( resp ) )
choice = resp_copy [ " output " ] [ " choices " ] [ 0 ]
message = choice [ " message " ]
prev_resp_copy = json . loads ( json . dumps ( prev_resp ) )
prev_choice = prev_resp_copy [ " output " ] [ " choices " ] [ 0 ]
prev_message = prev_choice [ " message " ]
message [ " content " ] = message [ " content " ] . replace ( prev_message [ " content " ] , " " )
if message . get ( " tool_calls " ) :
for index , tool_call in enumerate ( message [ " tool_calls " ] ) :
function = tool_call [ " function " ]
if prev_message . get ( " tool_calls " ) :
prev_function = prev_message [ " tool_calls " ] [ index ] [ " function " ]
function [ " name " ] = function [ " name " ] . replace (
prev_function [ " name " ] , " "
)
function [ " arguments " ] = function [ " arguments " ] . replace (
prev_function [ " arguments " ] , " "
)
return resp_copy
async def astream_completion_with_retry ( self , * * kwargs : Any ) - > Any :
""" Because the dashscope SDK doesn ' t provide an async API,
we wrap ` stream_generate_with_retry ` with an async generator . """
@ -301,16 +411,16 @@ class ChatTongyi(BaseChatModel):
) - > ChatResult :
generations = [ ]
if self . streaming :
generation : Optional [ ChatGenerationChunk ] = None
generation _chunk : Optional [ ChatGenerationChunk ] = None
for chunk in self . _stream (
messages , stop = stop , run_manager = run_manager , * * kwargs
) :
if generation is None :
generation = chunk
if generation _chunk is None :
generation _chunk = chunk
else :
generation + = chunk
assert generation is not None
generations . append ( self . _chunk_to_generation ( generation ) )
generation _chunk + = chunk
assert generation _chunk is not None
generations . append ( self . _chunk_to_generation ( generation _chunk ) )
else :
params : Dict [ str , Any ] = self . _invocation_params (
messages = messages , stop = stop , * * kwargs
@ -373,9 +483,19 @@ class ChatTongyi(BaseChatModel):
params : Dict [ str , Any ] = self . _invocation_params (
messages = messages , stop = stop , stream = True , * * kwargs
)
for stream_resp , is_last_chunk in generate_with_last_element_mark (
self . stream_completion_with_retry ( * * params )
) :
choice = stream_resp [ " output " ] [ " choices " ] [ 0 ]
message = choice [ " message " ]
if (
choice [ " finish_reason " ] == " null "
and message [ " content " ] == " "
and " tool_calls " not in message
) :
continue
chunk = ChatGenerationChunk (
* * self . _chat_generation_from_qwen_resp (
stream_resp , is_chunk = True , is_last_chunk = is_last_chunk
@ -413,14 +533,13 @@ class ChatTongyi(BaseChatModel):
params = { * * self . _default_params , * * kwargs }
if stop is not None :
params [ " stop " ] = stop
if params . get ( " stream " ) :
# According to the Tongyi official docs,
# `incremental_output` with `tools` is not supported yet
if params . get ( " stream " ) and not params . get ( " tools " ) :
params [ " incremental_output " ] = True
message_dicts = [ convert_message_to_dict ( m ) for m in messages ]
# According to the docs, the last message should be a `user` message
if message_dicts [ - 1 ] [ " role " ] != " user " :
raise ValueError ( " Last message should be user message. " )
# And the `system` message should be the first message if present
system_message_indices = [
i for i , m in enumerate ( message_dicts ) if m [ " role " ] == " system "
@ -470,3 +589,22 @@ class ChatTongyi(BaseChatModel):
message = convert_message_chunk_to_message ( chunk . message ) ,
generation_info = chunk . generation_info ,
)
def bind_tools (
self ,
tools : Sequence [ Union [ Dict [ str , Any ] , Type [ BaseModel ] , Callable , BaseTool ] ] ,
* * kwargs : Any ,
) - > Runnable [ LanguageModelInput , BaseMessage ] :
""" Bind tool-like objects to this chat model.
Args :
tools : A list of tool definitions to bind to this chat model .
Can be a dictionary , pydantic model , callable , or BaseTool . Pydantic
models , callables , and BaseTools will be automatically converted to
their schema dictionary representation .
* * kwargs : Any additional parameters to pass to the
: class : ` ~ langchain . runnable . Runnable ` constructor .
"""
formatted_tools = [ convert_to_openai_tool ( tool ) for tool in tools ]
return super ( ) . bind ( tools = formatted_tools , * * kwargs )