community[patch]: Fixed bug in merging `generation_info` during chunk concatenation in Tongyi and ChatTongyi (#19014)

- **Description:** 

In #16218, during `GenerationChunk` and `ChatGenerationChunk` concatenation, the merging of `generation_info` changed from a simple replacement of keys and values to the utility method
[`merge_dicts`](https://github.com/langchain-ai/langchain/blob/master/libs/core/langchain_core/utils/_merge.py):


![image](https://github.com/langchain-ai/langchain/assets/2098020/10f315bf-7fe0-43a7-a0ce-6a3834b99a15)

The `merge_dicts` method cannot merge values of `int` (and some other) types: when two chunks carry different values for such a key, it raises a
[`TypeError`](https://github.com/langchain-ai/langchain/blob/master/libs/core/langchain_core/utils/_merge.py#L55).

This PR fixes this issue in the **Tongyi and ChatTongyi models** by
adopting the `generation_info` of the last chunk
and discarding the `generation_info` of the intermediate chunks,
so that `stream` and `astream` work correctly; a minimal reproduction of the failure is sketched right after this list.

- **Issue:**  
    - Related issues or PRs about Tongyi & ChatTongyi: #16605, #17105 
    - Other models or cases: #18441, #17376
- **Dependencies:** No new dependencies
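
For context, here is a minimal sketch of the failure mode, assuming the `merge_dicts` implementation linked above. The keys mirror what the Tongyi integrations put into `generation_info`; the request id and token counts are made-up values, not a real dashscope response:

```python
from langchain_core.utils._merge import merge_dicts

# generation_info of two consecutive streamed chunks (illustrative values).
earlier = {
    "finish_reason": "null",
    "request_id": "req-1",
    "token_usage": {"total_tokens": 10},
}
later = {
    "finish_reason": "stop",
    "request_id": "req-1",
    "token_usage": {"total_tokens": 17},
}

# merge_dicts concatenates the string values, but when it recurses into
# `token_usage` it finds two different ints, which it cannot merge,
# and raises the TypeError referenced above.
merge_dicts(earlier, later)
```

The change below avoids the conflicting merge entirely by keeping `generation_info` only on the final chunk.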
Shuai Liu committed c244e1a50b (parent f79d0cb9fb)

libs/community/langchain_community/chat_models/tongyi.py:

@@ -49,7 +49,11 @@ from tenacity import (
     wait_exponential,
 )
 
-from langchain_community.llms.tongyi import check_response
+from langchain_community.llms.tongyi import (
+    agenerate_with_last_element_mark,
+    check_response,
+    generate_with_last_element_mark,
+)
 
 logger = logging.getLogger(__name__)
@@ -338,9 +342,13 @@ class ChatTongyi(BaseChatModel):
         params: Dict[str, Any] = self._invocation_params(
             messages=messages, stop=stop, stream=True, **kwargs
         )
-        for stream_resp in self.stream_completion_with_retry(**params):
+        for stream_resp, is_last_chunk in generate_with_last_element_mark(
+            self.stream_completion_with_retry(**params)
+        ):
             chunk = ChatGenerationChunk(
-                **self._chat_generation_from_qwen_resp(stream_resp, is_chunk=True)
+                **self._chat_generation_from_qwen_resp(
+                    stream_resp, is_chunk=True, is_last_chunk=is_last_chunk
+                )
             )
             if run_manager:
                 run_manager.on_llm_new_token(chunk.text, chunk=chunk)
@@ -356,9 +364,13 @@ class ChatTongyi(BaseChatModel):
         params: Dict[str, Any] = self._invocation_params(
             messages=messages, stop=stop, stream=True, **kwargs
         )
-        async for stream_resp in self.astream_completion_with_retry(**params):
+        async for stream_resp, is_last_chunk in agenerate_with_last_element_mark(
+            self.astream_completion_with_retry(**params)
+        ):
             chunk = ChatGenerationChunk(
-                **self._chat_generation_from_qwen_resp(stream_resp, is_chunk=True)
+                **self._chat_generation_from_qwen_resp(
+                    stream_resp, is_chunk=True, is_last_chunk=is_last_chunk
+                )
             )
             if run_manager:
                 await run_manager.on_llm_new_token(chunk.text, chunk=chunk)
@@ -398,18 +410,28 @@ class ChatTongyi(BaseChatModel):
 
     @staticmethod
     def _chat_generation_from_qwen_resp(
-        resp: Any, is_chunk: bool = False
+        resp: Any, is_chunk: bool = False, is_last_chunk: bool = True
     ) -> Dict[str, Any]:
+        # According to the response from dashscope,
+        # each chunk's `generation_info` overwrites the previous one.
+        # Besides, the `merge_dicts` method,
+        # which is used to concatenate `generation_info` in `GenerationChunk`,
+        # does not support merging of int type values.
+        # Therefore, we adopt the `generation_info` of the last chunk
+        # and discard the `generation_info` of the intermediate chunks.
         choice = resp["output"]["choices"][0]
         message = convert_dict_to_message(choice["message"], is_chunk=is_chunk)
-        return dict(
-            message=message,
-            generation_info=dict(
-                finish_reason=choice["finish_reason"],
-                request_id=resp["request_id"],
-                token_usage=dict(resp["usage"]),
-            ),
-        )
+        if is_last_chunk:
+            return dict(
+                message=message,
+                generation_info=dict(
+                    finish_reason=choice["finish_reason"],
+                    request_id=resp["request_id"],
+                    token_usage=dict(resp["usage"]),
+                ),
+            )
+        else:
+            return dict(message=message)
 
     @staticmethod
     def _chunk_to_generation(chunk: ChatGenerationChunk) -> ChatGeneration:
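
With this change, only the final streamed chunk carries `generation_info`, so concatenating chunks no longer pushes conflicting `int` values through `merge_dicts`. A minimal sketch of the resulting behaviour (the message contents and `generation_info` values are illustrative, not a real dashscope response):

```python
from langchain_core.messages import AIMessageChunk
from langchain_core.outputs import ChatGenerationChunk

# Intermediate chunk: message content only, no generation_info.
first = ChatGenerationChunk(message=AIMessageChunk(content="Hello"))

# Final chunk: carries the full generation_info of the run.
last = ChatGenerationChunk(
    message=AIMessageChunk(content=" world"),
    generation_info={
        "finish_reason": "stop",
        "request_id": "req-1",
        "token_usage": {"total_tokens": 17},
    },
)

merged = first + last
print(merged.text)             # "Hello world"
print(merged.generation_info)  # only the last chunk's generation_info survives
```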

libs/community/langchain_community/llms/tongyi.py:

@@ -5,13 +5,17 @@ import functools
 import logging
 from typing import (
     Any,
+    AsyncIterable,
     AsyncIterator,
     Callable,
     Dict,
+    Iterable,
     Iterator,
     List,
     Mapping,
     Optional,
+    Tuple,
+    TypeVar,
 )
 
 from langchain_core.callbacks import (
@@ -32,6 +36,7 @@ from tenacity import (
 )
 
 logger = logging.getLogger(__name__)
+T = TypeVar("T")
 
 
 def _create_retry_decorator(llm: Tongyi) -> Callable[[Any], Any]:
@@ -122,6 +127,36 @@ async def astream_generate_with_retry(llm: Tongyi, **kwargs: Any) -> Any:
         yield chunk
 
 
+def generate_with_last_element_mark(iterable: Iterable[T]) -> Iterator[Tuple[T, bool]]:
+    """Generate elements from an iterable,
+    and a boolean indicating if it is the last element."""
+    iterator = iter(iterable)
+    try:
+        item = next(iterator)
+    except StopIteration:
+        return
+    for next_item in iterator:
+        yield item, False
+        item = next_item
+    yield item, True
+
+
+async def agenerate_with_last_element_mark(
+    iterable: AsyncIterable[T],
+) -> AsyncIterator[Tuple[T, bool]]:
+    """Generate elements from an async iterable,
+    and a boolean indicating if it is the last element."""
+    iterator = iterable.__aiter__()
+    try:
+        item = await iterator.__anext__()
+    except StopAsyncIteration:
+        return
+    async for next_item in iterator:
+        yield item, False
+        item = next_item
+    yield item, True
+
+
 class Tongyi(BaseLLM):
     """Tongyi Qwen large language models.
@@ -283,8 +318,12 @@ class Tongyi(BaseLLM):
         params: Dict[str, Any] = self._invocation_params(
             stop=stop, stream=True, **kwargs
         )
-        for stream_resp in stream_generate_with_retry(self, prompt=prompt, **params):
-            chunk = GenerationChunk(**self._generation_from_qwen_resp(stream_resp))
+        for stream_resp, is_last_chunk in generate_with_last_element_mark(
+            stream_generate_with_retry(self, prompt=prompt, **params)
+        ):
+            chunk = GenerationChunk(
+                **self._generation_from_qwen_resp(stream_resp, is_last_chunk)
+            )
             if run_manager:
                 run_manager.on_llm_new_token(
                     chunk.text,
@@ -303,10 +342,12 @@ class Tongyi(BaseLLM):
         params: Dict[str, Any] = self._invocation_params(
             stop=stop, stream=True, **kwargs
         )
-        async for stream_resp in astream_generate_with_retry(
-            self, prompt=prompt, **params
+        async for stream_resp, is_last_chunk in agenerate_with_last_element_mark(
+            astream_generate_with_retry(self, prompt=prompt, **params)
         ):
-            chunk = GenerationChunk(**self._generation_from_qwen_resp(stream_resp))
+            chunk = GenerationChunk(
+                **self._generation_from_qwen_resp(stream_resp, is_last_chunk)
+            )
             if run_manager:
                 await run_manager.on_llm_new_token(
                     chunk.text,
@@ -327,15 +368,27 @@ class Tongyi(BaseLLM):
         return params
 
     @staticmethod
-    def _generation_from_qwen_resp(resp: Any) -> Dict[str, Any]:
-        return dict(
-            text=resp["output"]["text"],
-            generation_info=dict(
-                finish_reason=resp["output"]["finish_reason"],
-                request_id=resp["request_id"],
-                token_usage=dict(resp["usage"]),
-            ),
-        )
+    def _generation_from_qwen_resp(
+        resp: Any, is_last_chunk: bool = True
+    ) -> Dict[str, Any]:
+        # According to the response from dashscope,
+        # each chunk's `generation_info` overwrites the previous one.
+        # Besides, the `merge_dicts` method,
+        # which is used to concatenate `generation_info` in `GenerationChunk`,
+        # does not support merging of int type values.
+        # Therefore, we adopt the `generation_info` of the last chunk
+        # and discard the `generation_info` of the intermediate chunks.
+        if is_last_chunk:
+            return dict(
+                text=resp["output"]["text"],
+                generation_info=dict(
+                    finish_reason=resp["output"]["finish_reason"],
+                    request_id=resp["request_id"],
+                    token_usage=dict(resp["usage"]),
+                ),
+            )
+        else:
+            return dict(text=resp["output"]["text"])
 
     @staticmethod
     def _chunk_to_generation(chunk: GenerationChunk) -> Generation:
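
The plain-LLM side behaves the same way: intermediate `GenerationChunk`s now carry text only, and accumulating them (as `stream` consumers typically do) leaves the final chunk's `generation_info` intact. A small illustrative sketch with made-up values:

```python
from langchain_core.outputs import GenerationChunk

# What Tongyi._stream now produces: text-only chunks, plus a final chunk
# that carries the complete generation_info (values are illustrative).
chunks = [
    GenerationChunk(text="Hello"),
    GenerationChunk(text=" world"),
    GenerationChunk(
        text="!",
        generation_info={
            "finish_reason": "stop",
            "request_id": "req-1",
            "token_usage": {"total_tokens": 3},
        },
    ),
]

full = None
for chunk in chunks:
    full = chunk if full is None else full + chunk

print(full.text)             # "Hello world!"
print(full.generation_info)  # the final chunk's generation_info
```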
