langchain/libs/community/langchain_community/chat_message_histories/redis.py
Wu Enze 32c61b3ece
community[patch]: chat message history mypy fixes #17048 (#20114)
Relates [#17048]
Description : Applied fix to redis and neo4j file.

Error was : `Cannot override writeable attribute with read-only
property`

fix with the same solution of
[[langchain/libs/community/langchain_community/chat_message_histories/elasticsearch.py](d5c412b0a9/libs/community/langchain_community/chat_message_histories/elasticsearch.py (L170-L175))]

---------

Co-authored-by: Bagatur <22008038+baskaryan@users.noreply.github.com>
Co-authored-by: Chester Curme <chester.curme@gmail.com>
2024-05-06 22:17:45 +00:00

73 lines
2.1 KiB
Python

import json
import logging
from typing import List, Optional
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import (
BaseMessage,
message_to_dict,
messages_from_dict,
)
from langchain_community.utilities.redis import get_client
logger = logging.getLogger(__name__)
class RedisChatMessageHistory(BaseChatMessageHistory):
"""Chat message history stored in a Redis database."""
def __init__(
self,
session_id: str,
url: str = "redis://localhost:6379/0",
key_prefix: str = "message_store:",
ttl: Optional[int] = None,
):
try:
import redis
except ImportError:
raise ImportError(
"Could not import redis python package. "
"Please install it with `pip install redis`."
)
try:
self.redis_client = get_client(redis_url=url)
except redis.exceptions.ConnectionError as error:
logger.error(error)
self.session_id = session_id
self.key_prefix = key_prefix
self.ttl = ttl
@property
def key(self) -> str:
"""Construct the record key to use"""
return self.key_prefix + self.session_id
@property
def messages(self) -> List[BaseMessage]:
"""Retrieve the messages from Redis"""
_items = self.redis_client.lrange(self.key, 0, -1)
items = [json.loads(m.decode("utf-8")) for m in _items[::-1]]
messages = messages_from_dict(items)
return messages
@messages.setter
def messages(self, messages: List[BaseMessage]) -> None:
raise NotImplementedError(
"Direct assignment to 'messages' is not allowed."
" Use the 'add_messages' instead."
)
def add_message(self, message: BaseMessage) -> None:
"""Append the message to the record in Redis"""
self.redis_client.lpush(self.key, json.dumps(message_to_dict(message)))
if self.ttl:
self.redis_client.expire(self.key, self.ttl)
def clear(self) -> None:
"""Clear session memory from Redis"""
self.redis_client.delete(self.key)