langchain/libs/community/tests/integration_tests/memory/test_elasticsearch.py

import json
import os
import uuid
from typing import Generator, Union

import pytest
from langchain.memory import ConversationBufferMemory
from langchain_core.messages import message_to_dict

from langchain_community.chat_message_histories import ElasticsearchChatMessageHistory

"""
cd tests/integration_tests/memory/docker-compose
docker-compose -f elasticsearch.yml up
By default runs against local docker instance of Elasticsearch.
To run against Elastic Cloud, set the following environment variables:
- ES_CLOUD_ID
- ES_USERNAME
- ES_PASSWORD
"""

class TestElasticsearch:
    @pytest.fixture(scope="class", autouse=True)
    def elasticsearch_connection(self) -> Union[dict, Generator[dict, None, None]]:  # type: ignore[return]
        # Run this integration test against Elasticsearch on localhost,
        # or an Elastic Cloud instance
        from elasticsearch import Elasticsearch

        es_url = os.environ.get("ES_URL", "http://localhost:9200")
        es_cloud_id = os.environ.get("ES_CLOUD_ID")
        es_username = os.environ.get("ES_USERNAME", "elastic")
        es_password = os.environ.get("ES_PASSWORD", "changeme")

        if es_cloud_id:
            es = Elasticsearch(
                cloud_id=es_cloud_id,
                basic_auth=(es_username, es_password),
            )
            yield {
                "es_cloud_id": es_cloud_id,
                "es_user": es_username,
                "es_password": es_password,
            }
        else:
            # Running this integration test against a local docker instance
            es = Elasticsearch(hosts=es_url)
            yield {"es_url": es_url}

        # Teardown: delete the test indexes created during the run
        index_names = es.indices.get(index="_all").keys()
        for index_name in index_names:
            if index_name.startswith("test_"):
                es.indices.delete(index=index_name)
        es.indices.refresh(index="_all")

    @pytest.fixture(scope="function")
    def index_name(self) -> str:
        """Return the index name."""
        return f"test_{uuid.uuid4().hex}"

    def test_memory_with_message_store(
        self, elasticsearch_connection: dict, index_name: str
    ) -> None:
        """Test the memory with a message store."""
        # Set up Elasticsearch as a message store
        message_history = ElasticsearchChatMessageHistory(
            **elasticsearch_connection, index=index_name, session_id="test-session"
        )
        memory = ConversationBufferMemory(
            memory_key="baz", chat_memory=message_history, return_messages=True
        )

        # Add some messages
        memory.chat_memory.add_ai_message("This is me, the AI")
        memory.chat_memory.add_user_message("This is me, the human")

        # Get the message history from the memory store and turn it into JSON
        messages = memory.chat_memory.messages
        messages_json = json.dumps([message_to_dict(msg) for msg in messages])

        assert "This is me, the AI" in messages_json
        assert "This is me, the human" in messages_json

        # Remove the record from Elasticsearch so the next test run won't pick it up
        memory.chat_memory.clear()

        assert memory.chat_memory.messages == []
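

if __name__ == "__main__":
    # Illustrative sketch (not part of the original test suite): the chat
    # message history can also be used directly, without ConversationBufferMemory.
    # Assumes a reachable Elasticsearch at ES_URL (default http://localhost:9200);
    # the index and session names below are arbitrary examples.
    history = ElasticsearchChatMessageHistory(
        es_url=os.environ.get("ES_URL", "http://localhost:9200"),
        index=f"test_{uuid.uuid4().hex}",
        session_id="demo-session",
    )
    history.add_user_message("This is me, the human")
    history.add_ai_message("This is me, the AI")
    print(json.dumps([message_to_dict(msg) for msg in history.messages]))
    history.clear()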