mirror of
https://github.com/hwchase17/langchain
synced 2024-11-10 01:10:59 +00:00
59d4b80a92
- Added ensure_ascii property to ElasticsearchChatMessageHistory <!-- Thank you for contributing to LangChain! Please title your PR "<package>: <description>", where <package> is whichever of langchain, community, core, experimental, etc. is being modified. Replace this entire comment with: - **Description:** a description of the change, - **Issue:** the issue # it fixes if applicable, - **Dependencies:** any dependencies required for this change, - **Twitter handle:** we announce bigger features on Twitter. If your PR gets announced, and you'd like a mention, we'll gladly shout you out! Please make sure your PR is passing linting and testing before submitting. Run `make format`, `make lint` and `make test` from the root of the package you've modified to check this locally. See contribution guidelines for more information on how to write/run tests, lint, etc: https://python.langchain.com/docs/contributing/ If you're adding a new integration, please include: 1. a test for the integration, preferably unit tests that do not rely on network access, 2. an example notebook showing its use. It lives in `docs/docs/integrations` directory. If no one reviews your PR within a few days, please @-mention one of @baskaryan, @eyurtsev, @hwchase17. --> --------- Co-authored-by: Ivan Chetverikov <ivan.chetverikov@raftds.com> Co-authored-by: Harrison Chase <hw.chase.17@gmail.com>
202 lines
6.7 KiB
Python
202 lines
6.7 KiB
Python
import json
|
|
import logging
|
|
from time import time
|
|
from typing import TYPE_CHECKING, Any, Dict, List, Optional
|
|
|
|
from langchain_core.chat_history import BaseChatMessageHistory
|
|
from langchain_core.messages import (
|
|
BaseMessage,
|
|
message_to_dict,
|
|
messages_from_dict,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from elasticsearch import Elasticsearch
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ElasticsearchChatMessageHistory(BaseChatMessageHistory):
|
|
"""Chat message history that stores history in Elasticsearch.
|
|
|
|
Args:
|
|
es_url: URL of the Elasticsearch instance to connect to.
|
|
es_cloud_id: Cloud ID of the Elasticsearch instance to connect to.
|
|
es_user: Username to use when connecting to Elasticsearch.
|
|
es_password: Password to use when connecting to Elasticsearch.
|
|
es_api_key: API key to use when connecting to Elasticsearch.
|
|
es_connection: Optional pre-existing Elasticsearch connection.
|
|
esnsure_ascii: Used to escape ASCII symbols in json.dumps. Defaults to True.
|
|
index: Name of the index to use.
|
|
session_id: Arbitrary key that is used to store the messages
|
|
of a single chat session.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
index: str,
|
|
session_id: str,
|
|
*,
|
|
es_connection: Optional["Elasticsearch"] = None,
|
|
es_url: Optional[str] = None,
|
|
es_cloud_id: Optional[str] = None,
|
|
es_user: Optional[str] = None,
|
|
es_api_key: Optional[str] = None,
|
|
es_password: Optional[str] = None,
|
|
esnsure_ascii: Optional[bool] = True,
|
|
):
|
|
self.index: str = index
|
|
self.session_id: str = session_id
|
|
self.ensure_ascii: bool = esnsure_ascii
|
|
|
|
# Initialize Elasticsearch client from passed client arg or connection info
|
|
if es_connection is not None:
|
|
self.client = es_connection.options(
|
|
headers={"user-agent": self.get_user_agent()}
|
|
)
|
|
elif es_url is not None or es_cloud_id is not None:
|
|
self.client = ElasticsearchChatMessageHistory.connect_to_elasticsearch(
|
|
es_url=es_url,
|
|
username=es_user,
|
|
password=es_password,
|
|
cloud_id=es_cloud_id,
|
|
api_key=es_api_key,
|
|
)
|
|
else:
|
|
raise ValueError(
|
|
"""Either provide a pre-existing Elasticsearch connection, \
|
|
or valid credentials for creating a new connection."""
|
|
)
|
|
|
|
if self.client.indices.exists(index=index):
|
|
logger.debug(
|
|
f"Chat history index {index} already exists, skipping creation."
|
|
)
|
|
else:
|
|
logger.debug(f"Creating index {index} for storing chat history.")
|
|
|
|
self.client.indices.create(
|
|
index=index,
|
|
mappings={
|
|
"properties": {
|
|
"session_id": {"type": "keyword"},
|
|
"created_at": {"type": "date"},
|
|
"history": {"type": "text"},
|
|
}
|
|
},
|
|
)
|
|
|
|
@staticmethod
|
|
def get_user_agent() -> str:
|
|
from langchain_community import __version__
|
|
|
|
return f"langchain-py-ms/{__version__}"
|
|
|
|
@staticmethod
|
|
def connect_to_elasticsearch(
|
|
*,
|
|
es_url: Optional[str] = None,
|
|
cloud_id: Optional[str] = None,
|
|
api_key: Optional[str] = None,
|
|
username: Optional[str] = None,
|
|
password: Optional[str] = None,
|
|
) -> "Elasticsearch":
|
|
try:
|
|
import elasticsearch
|
|
except ImportError:
|
|
raise ImportError(
|
|
"Could not import elasticsearch python package. "
|
|
"Please install it with `pip install elasticsearch`."
|
|
)
|
|
|
|
if es_url and cloud_id:
|
|
raise ValueError(
|
|
"Both es_url and cloud_id are defined. Please provide only one."
|
|
)
|
|
|
|
connection_params: Dict[str, Any] = {}
|
|
|
|
if es_url:
|
|
connection_params["hosts"] = [es_url]
|
|
elif cloud_id:
|
|
connection_params["cloud_id"] = cloud_id
|
|
else:
|
|
raise ValueError("Please provide either elasticsearch_url or cloud_id.")
|
|
|
|
if api_key:
|
|
connection_params["api_key"] = api_key
|
|
elif username and password:
|
|
connection_params["basic_auth"] = (username, password)
|
|
|
|
es_client = elasticsearch.Elasticsearch(
|
|
**connection_params,
|
|
headers={"user-agent": ElasticsearchChatMessageHistory.get_user_agent()},
|
|
)
|
|
try:
|
|
es_client.info()
|
|
except Exception as err:
|
|
logger.error(f"Error connecting to Elasticsearch: {err}")
|
|
raise err
|
|
|
|
return es_client
|
|
|
|
@property
|
|
def messages(self) -> List[BaseMessage]: # type: ignore[override]
|
|
"""Retrieve the messages from Elasticsearch"""
|
|
try:
|
|
from elasticsearch import ApiError
|
|
|
|
result = self.client.search(
|
|
index=self.index,
|
|
query={"term": {"session_id": self.session_id}},
|
|
sort="created_at:asc",
|
|
)
|
|
except ApiError as err:
|
|
logger.error(f"Could not retrieve messages from Elasticsearch: {err}")
|
|
raise err
|
|
|
|
if result and len(result["hits"]["hits"]) > 0:
|
|
items = [
|
|
json.loads(document["_source"]["history"])
|
|
for document in result["hits"]["hits"]
|
|
]
|
|
else:
|
|
items = []
|
|
|
|
return messages_from_dict(items)
|
|
|
|
def add_message(self, message: BaseMessage) -> None:
|
|
"""Add a message to the chat session in Elasticsearch"""
|
|
try:
|
|
from elasticsearch import ApiError
|
|
|
|
self.client.index(
|
|
index=self.index,
|
|
document={
|
|
"session_id": self.session_id,
|
|
"created_at": round(time() * 1000),
|
|
"history": json.dumps(
|
|
message_to_dict(message),
|
|
ensure_ascii=self.ensure_ascii,
|
|
),
|
|
},
|
|
refresh=True,
|
|
)
|
|
except ApiError as err:
|
|
logger.error(f"Could not add message to Elasticsearch: {err}")
|
|
raise err
|
|
|
|
def clear(self) -> None:
|
|
"""Clear session memory in Elasticsearch"""
|
|
try:
|
|
from elasticsearch import ApiError
|
|
|
|
self.client.delete_by_query(
|
|
index=self.index,
|
|
query={"term": {"session_id": self.session_id}},
|
|
refresh=True,
|
|
)
|
|
except ApiError as err:
|
|
logger.error(f"Could not clear session memory in Elasticsearch: {err}")
|
|
raise err
|