From 62c8a67f56f4a64c20fb374bff215d0d96588e6c Mon Sep 17 00:00:00 2001 From: Chang Liu Date: Mon, 17 Jun 2024 20:34:01 -0700 Subject: [PATCH] community: add KafkaChatMessageHistory (#22216) Add chat history store based on Kafka. Files added: `libs/community/langchain_community/chat_message_histories/kafka.py` `docs/docs/integrations/memory/kafka_chat_message_history.ipynb` New issue to be created for future improvement: 1. Async method implementation. 2. Message retrieval based on timestamp. 3. Support for other configs when connecting to cloud hosted Kafka (e.g. add `api_key` field) 4. Improve unit testing & integration testing. --- .../memory/kafka_chat_message_history.ipynb | 245 ++++++++++++ .../chat_message_histories/__init__.py | 5 + .../chat_message_histories/kafka.py | 362 ++++++++++++++++++ .../chat_message_histories/test_imports.py | 1 + 4 files changed, 613 insertions(+) create mode 100644 docs/docs/integrations/memory/kafka_chat_message_history.ipynb create mode 100644 libs/community/langchain_community/chat_message_histories/kafka.py diff --git a/docs/docs/integrations/memory/kafka_chat_message_history.ipynb b/docs/docs/integrations/memory/kafka_chat_message_history.ipynb new file mode 100644 index 0000000000..f6d673d46a --- /dev/null +++ b/docs/docs/integrations/memory/kafka_chat_message_history.ipynb @@ -0,0 +1,245 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "c21deb80-9cf7-4185-8205-a38110152d2c", + "metadata": {}, + "source": [ + "# Kafka\n", + "\n", + "[Kafka](https://github.com/apache/kafka) is a distributed messaging system that is used to publish and subscribe to streams of records. \n", + "This demo shows how to use `KafkaChatMessageHistory` to store and retrieve chat messages from a Kafka cluster." + ] + }, + { + "cell_type": "markdown", + "id": "c7c4fc02-18ac-4285-b8d6-507357e2aa13", + "metadata": {}, + "source": [ + "A running Kafka cluster is required to run the demo. 
You can follow these [instructions](https://developer.confluent.io/get-started/python) to create a Kafka cluster locally." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f09f3b45-c4ff-4e59-bf79-238cc85d6465", + "metadata": {}, + "outputs": [], + "source": [ + "from langchain_community.chat_message_histories import KafkaChatMessageHistory\n", + "\n", + "chat_session_id = \"chat-message-history-kafka\"\n", + "bootstrap_servers = \"localhost:64797\" # host:port. `localhost:Plaintext Ports` if you set up the Kafka cluster locally\n", + "history = KafkaChatMessageHistory(\n", + " chat_session_id,\n", + " bootstrap_servers,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "109812d2-85c5-4a65-a8a0-2d16eb80347b", + "metadata": {}, + "source": [ + "Optional parameters to construct `KafkaChatMessageHistory`:\n", + " - `ttl_ms`: Time to live in milliseconds for the chat messages.\n", + " - `partition`: Number of partitions of the topic to store the chat messages.\n", + " - `replication_factor`: Replication factor of the topic to store the chat messages." + ] + }, + { + "cell_type": "markdown", + "id": "c8fba39f-650b-4192-94ea-1a2a89f5348d", + "metadata": {}, + "source": [ + "`KafkaChatMessageHistory` internally uses a Kafka consumer to read chat messages, and it has the ability to mark the consumed position persistently. It has the following methods to retrieve chat messages:\n", + "- `messages`: continue consuming chat messages from the last consumed one.\n", + "- `messages_from_beginning`: reset the consumer to the beginning of the history and consume messages. Optional parameters:\n", + " 1. `max_message_count`: maximum number of messages to read.\n", + " 2. `max_time_sec`: maximum time in seconds to read messages.\n", + "- `messages_from_latest`: reset the consumer to the end of the chat history and try consuming messages. 
Optional parameters are the same as above.\n", + "- `messages_from_last_consumed`: return messages continuing from the last consumed message, similar to `messages`, but with optional parameters.\n", + "\n", + "`max_message_count` and `max_time_sec` are used to avoid blocking indefinitely when retrieving messages.\n", + "As a result, `messages` and other methods to retrieve messages may not return all messages in the chat history. You will need to specify sufficiently large `max_message_count` and `max_time_sec` values to retrieve the whole chat history in a single batch.\n" + ] + }, + { + "cell_type": "markdown", + "id": "caf2176b-db7a-451a-a292-3d9fde585ded", + "metadata": {}, + "source": [ + "Add messages and retrieve them." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "7e52a70d-3921-4614-b8cd-53b8d3c2deb4", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[HumanMessage(content='hi!'), AIMessage(content='whats up?')]" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "history.add_user_message(\"hi!\")\n", + "history.add_ai_message(\"whats up?\")\n", + "\n", + "history.messages" + ] + }, + { + "cell_type": "markdown", + "id": "874ce388-da8f-4796-b9ca-3ac114195b10", + "metadata": {}, + "source": [ + "Calling `messages` again returns an empty list because the consumer is at the end of the chat history." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "f863618e-7da1-4f46-9182-7a1387b93b16", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[]" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "history.messages" + ] + }, + { + "cell_type": "markdown", + "id": "e108255b-c240-44f7-9ecc-52bf04cd15b6", + "metadata": {}, + "source": [ + "Add new messages and continue consuming." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "31aa7403-5392-4ad4-ba43-226020a274e3", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[HumanMessage(content='hi again!'), AIMessage(content='whats up again?')]" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "history.add_user_message(\"hi again!\")\n", + "history.add_ai_message(\"whats up again?\")\n", + "history.messages" + ] + }, + { + "cell_type": "markdown", + "id": "5062fabc-c605-40dd-933b-c68de2727874", + "metadata": {}, + "source": [ + "To reset the consumer and read from beginning:" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "005816ae-c8ed-4e41-9ecd-b6432578c8f1", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[HumanMessage(content='hi again!'),\n", + " AIMessage(content='whats up again?'),\n", + " HumanMessage(content='hi!'),\n", + " AIMessage(content='whats up?')]" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "history.messages_from_beginning()" + ] + }, + { + "cell_type": "markdown", + "id": "42cc7bed-5cd7-417f-94fd-fe2e511cc9c6", + "metadata": {}, + "source": [ + "Set the consumer to the end of the chat history, add a couple of new messages, and consume:" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "d8b4f1cd-fa47-461b-b1b6-278ad54e9ac5", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[HumanMessage(content='HI!'), AIMessage(content='WHATS UP?')]" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "history.messages_from_latest()\n", + "history.add_user_message(\"HI!\")\n", + "history.add_ai_message(\"WHATS UP?\")\n", + "history.messages" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + 
"language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.18" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/libs/community/langchain_community/chat_message_histories/__init__.py b/libs/community/langchain_community/chat_message_histories/__init__.py index 99b4aae14f..fc20cacacc 100644 --- a/libs/community/langchain_community/chat_message_histories/__init__.py +++ b/libs/community/langchain_community/chat_message_histories/__init__.py @@ -43,6 +43,9 @@ if TYPE_CHECKING: from langchain_community.chat_message_histories.in_memory import ( ChatMessageHistory, ) + from langchain_community.chat_message_histories.kafka import ( + KafkaChatMessageHistory, + ) from langchain_community.chat_message_histories.momento import ( MomentoChatMessageHistory, ) @@ -109,6 +112,7 @@ __all__ = [ "XataChatMessageHistory", "ZepChatMessageHistory", "ZepCloudChatMessageHistory", + "KafkaChatMessageHistory", ] _module_lookup = { @@ -134,6 +138,7 @@ _module_lookup = { "XataChatMessageHistory": "langchain_community.chat_message_histories.xata", "ZepChatMessageHistory": "langchain_community.chat_message_histories.zep", "ZepCloudChatMessageHistory": "langchain_community.chat_message_histories.zep_cloud", # noqa: E501 + "KafkaChatMessageHistory": "langchain_community.chat_message_histories.kafka", } diff --git a/libs/community/langchain_community/chat_message_histories/kafka.py b/libs/community/langchain_community/chat_message_histories/kafka.py new file mode 100644 index 0000000000..9c171971cf --- /dev/null +++ b/libs/community/langchain_community/chat_message_histories/kafka.py @@ -0,0 +1,362 @@ +""" Kafka-based chat message history by using confluent-kafka-python. + confluent-kafka-python is under Apache 2.0 license. 
+ https://github.com/confluentinc/confluent-kafka-python +""" +from __future__ import annotations + +import json +import logging +import time +from enum import Enum +from typing import TYPE_CHECKING, List, Optional, Sequence + +from langchain_core.chat_history import BaseChatMessageHistory +from langchain_core.messages import BaseMessage, message_to_dict, messages_from_dict + +if TYPE_CHECKING: + from confluent_kafka import TopicPartition + from confluent_kafka.admin import AdminClient + +logger = logging.getLogger(__name__) + +BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers" + +DEFAULT_TTL_MS = 604800000 # 7 days +DEFAULT_REPLICATION_FACTOR = 1 +DEFAULT_PARTITION = 3 + + +class ConsumeStartPosition(Enum): + """Consume start position for Kafka consumer to get chat history messages. + LAST_CONSUMED: Continue from the last consumed offset. + EARLIEST: Start consuming from the beginning. + LATEST: Start consuming from the latest offset. + """ + + LAST_CONSUMED = 1 + EARLIEST = 2 + LATEST = 3 + + +def ensure_topic_exists( + admin_client: AdminClient, + topic_name: str, + replication_factor: int, + partition: int, + ttl_ms: int, +) -> int: + """Create topic if it doesn't exist, and return the number of partitions. + If the topic already exists, we don't change the topic configuration. 
+ """ + from confluent_kafka.admin import NewTopic + + try: + topic_metadata = admin_client.list_topics().topics + if topic_name in topic_metadata: + num_partitions = len(topic_metadata[topic_name].partitions) + logger.info( + f"Topic {topic_name} already exists with {num_partitions} partitions" + ) + return num_partitions + except Exception as e: + logger.error(f"Failed to list topics: {e}") + raise e + + topics = [ + NewTopic( + topic_name, + num_partitions=partition, + replication_factor=replication_factor, + config={"retention.ms": str(ttl_ms)}, + ) + ] + try: + futures = admin_client.create_topics(topics) + for _, f in futures.items(): + f.result() # result is None + logger.info(f"Topic {topic_name} created") + except Exception as e: + logger.error(f"Failed to create topic {topic_name}: {e}") + raise e + + return partition + + +class KafkaChatMessageHistory(BaseChatMessageHistory): + """Chat message history stored in Kafka. + + Setup: + Install ``confluent-kafka-python``. + + .. code-block:: bash + + pip install confluent_kafka + + Instantiate: + .. code-block:: python + + from langchain_community.chat_message_histories import KafkaChatMessageHistory + + history = KafkaChatMessageHistory( + session_id="your_session_id", + bootstrap_servers="host:port", + ) + + Add and retrieve messages: + .. code-block:: python + + # Add messages + history.add_messages([message1, message2, message3, ...]) + + # Retrieve messages + message_batch_0 = history.messages + + # retrieve messages after message_batch_0 + message_batch_1 = history.messages + + # Reset to beginning and retrieve messages + messages_from_beginning = history.messages_from_beginning() + + Retrieving messages is stateful. Internally, it uses Kafka consumer to read. + The consumed offset is maintained persistently. + + To retrieve messages, you can use the following methods: + - `messages`: + continue consuming chat messages from last one. 
+ - `messages_from_beginning`: + reset the consumer to the beginning of the chat history and return messages. + Optional parameters: + 1. `max_message_count`: maximum number of messages to return. + 2. `max_time_sec`: maximum time in seconds to wait for messages. + - `messages_from_latest`: + reset to end of the chat history and try consuming messages. + Optional parameters same as above. + - `messages_from_last_consumed`: + continuing from the last consumed message, similar to `messages`. + Optional parameters same as above. + + `max_message_count` and `max_time_sec` are used to avoid blocking indefinitely + when retrieving messages. As a result, the method to retrieve messages may not + return all messages. Change `max_message_count` and `max_time_sec` to retrieve + all history messages. + """ # noqa: E501 + + def __init__( + self, + session_id: str, + bootstrap_servers: str, + ttl_ms: int = DEFAULT_TTL_MS, + replication_factor: int = DEFAULT_REPLICATION_FACTOR, + partition: int = DEFAULT_PARTITION, + ): + """ + Args: + session_id: The ID for single chat session. It is used as Kafka topic name. + bootstrap_servers: + Comma-separated host/port pairs to establish connection to Kafka cluster + https://kafka.apache.org/documentation.html#adminclientconfigs_bootstrap.servers + ttl_ms: + Time-to-live (milliseconds) for automatic expiration of entries. + Default 7 days. -1 for no expiration. + It translates to https://kafka.apache.org/documentation.html#topicconfigs_retention.ms + replication_factor: The replication factor for the topic. Default 1. + partition: The number of partitions for the topic. Default 3. + """ + try: + from confluent_kafka import Producer + from confluent_kafka.admin import AdminClient + except (ImportError, ModuleNotFoundError): + raise ImportError( + "Could not import confluent_kafka package. " + "Please install it with `pip install confluent_kafka`." 
+ ) + + self.session_id = session_id + self.bootstrap_servers = bootstrap_servers + self.admin_client = AdminClient({BOOTSTRAP_SERVERS_CONFIG: bootstrap_servers}) + self.num_partitions = ensure_topic_exists( + self.admin_client, session_id, replication_factor, partition, ttl_ms + ) + self.producer = Producer({BOOTSTRAP_SERVERS_CONFIG: bootstrap_servers}) + + def add_messages( + self, + messages: Sequence[BaseMessage], + flush_timeout_seconds: float = 5.0, + ) -> None: + """Add messages to the chat history by producing to the Kafka topic.""" + try: + for message in messages: + self.producer.produce( + topic=self.session_id, + value=json.dumps(message_to_dict(message)), + ) + message_remaining = self.producer.flush(flush_timeout_seconds) + if message_remaining > 0: + logger.warning(f"{message_remaining} messages are still in-flight.") + except Exception as e: + logger.error(f"Failed to add messages to Kafka: {e}") + raise e + + def __read_messages( + self, + consume_start_pos: ConsumeStartPosition, + max_message_count: Optional[int], + max_time_sec: Optional[float], + ) -> List[BaseMessage]: + """Retrieve messages from Kafka topic for the session. + Please note this method is stateful. Internally, it uses Kafka consumer + to consume messages, and maintains the consumed offset. + + Args: + consume_start_pos: Start position for Kafka consumer. + max_message_count: Maximum number of messages to consume. + max_time_sec: Time limit in seconds to consume messages. + Returns: + List of messages. 
+ """ + from confluent_kafka import OFFSET_BEGINNING, OFFSET_END, Consumer + + consumer_config = { + BOOTSTRAP_SERVERS_CONFIG: self.bootstrap_servers, + "group.id": self.session_id, + "auto.offset.reset": "latest" + if consume_start_pos == ConsumeStartPosition.LATEST + else "earliest", + } + + def assign_beginning( + assigned_consumer: Consumer, assigned_partitions: list[TopicPartition] + ) -> None: + for p in assigned_partitions: + p.offset = OFFSET_BEGINNING + assigned_consumer.assign(assigned_partitions) + + def assign_latest( + assigned_consumer: Consumer, assigned_partitions: list[TopicPartition] + ) -> None: + for p in assigned_partitions: + p.offset = OFFSET_END + assigned_consumer.assign(assigned_partitions) + + messages: List[dict] = [] + consumer = Consumer(consumer_config) + try: + if consume_start_pos == ConsumeStartPosition.EARLIEST: + consumer.subscribe([self.session_id], on_assign=assign_beginning) + elif consume_start_pos == ConsumeStartPosition.LATEST: + consumer.subscribe([self.session_id], on_assign=assign_latest) + else: + consumer.subscribe([self.session_id]) + start_time_sec = time.time() + while True: + if ( + max_time_sec is not None + and time.time() - start_time_sec > max_time_sec + ): + break + if max_message_count is not None and len(messages) >= max_message_count: + break + + message = consumer.poll(timeout=1.0) + if message is None: # poll timeout + continue + if message.error() is not None: # error + logger.error(f"Consumer error: {message.error()}") + continue + if message.value() is None: # empty value + logger.warning("Empty message value") + continue + messages.append(json.loads(message.value())) + except Exception as e: + logger.error(f"Failed to consume messages from Kafka: {e}") + raise e + finally: + consumer.close() + + return messages_from_dict(messages) + + def messages_from_beginning( + self, max_message_count: Optional[int] = 5, max_time_sec: Optional[float] = 5.0 + ) -> List[BaseMessage]: + """Retrieve messages from 
Kafka topic from the beginning. + This method resets the consumer to the beginning and consumes messages. + + Args: + max_message_count: Maximum number of messages to consume. + max_time_sec: Time limit in seconds to consume messages. + Returns: + List of messages. + """ + return self.__read_messages( + consume_start_pos=ConsumeStartPosition.EARLIEST, + max_message_count=max_message_count, + max_time_sec=max_time_sec, + ) + + def messages_from_latest( + self, max_message_count: Optional[int] = 5, max_time_sec: Optional[float] = 5.0 + ) -> List[BaseMessage]: + """Reset to the end offset. Try to consume messages if available. + + Args: + max_message_count: Maximum number of messages to consume. + max_time_sec: Time limit in seconds to consume messages. + Returns: + List of messages. + """ + + return self.__read_messages( + consume_start_pos=ConsumeStartPosition.LATEST, + max_message_count=max_message_count, + max_time_sec=max_time_sec, + ) + + def messages_from_last_consumed( + self, max_message_count: Optional[int] = 5, max_time_sec: Optional[float] = 5.0 + ) -> List[BaseMessage]: + """Retrieve messages from Kafka topic from the last consumed message. + Please note this method is stateful. Internally, it uses Kafka consumer + to consume messages, and maintains the commit offset. + + Args: + max_message_count: Maximum number of messages to consume. + max_time_sec: Time limit in seconds to consume messages. + Returns: + List of messages. + """ + + return self.__read_messages( + consume_start_pos=ConsumeStartPosition.LAST_CONSUMED, + max_message_count=max_message_count, + max_time_sec=max_time_sec, + ) + + @property + def messages(self) -> List[BaseMessage]: # type: ignore + """ + Retrieve the messages for the session, from Kafka topic continuously + from last consumed message. This method is stateful and maintains + consumed(committed) offset based on consumer group. + Alternatively, use messages_from_last_consumed() with specified parameters. 
+ Use messages_from_beginning() to read from the earliest message. + Use messages_from_latest() to read from the latest message. + """ + return self.messages_from_last_consumed() + + def clear(self) -> None: + """Clear the chat history by deleting the Kafka topic.""" + try: + futures = self.admin_client.delete_topics([self.session_id]) + for _, f in futures.items(): + f.result() # result is None + logger.info(f"Topic {self.session_id} deleted") + except Exception as e: + logger.error(f"Failed to delete topic {self.session_id}: {e}") + raise e + + def close(self) -> None: + """Release the resources. + Nothing to be released at this moment. + """ + pass diff --git a/libs/community/tests/unit_tests/chat_message_histories/test_imports.py b/libs/community/tests/unit_tests/chat_message_histories/test_imports.py index a9f26ee7d2..4c14a0efd9 100644 --- a/libs/community/tests/unit_tests/chat_message_histories/test_imports.py +++ b/libs/community/tests/unit_tests/chat_message_histories/test_imports.py @@ -23,6 +23,7 @@ EXPECTED_ALL = [ "XataChatMessageHistory", "ZepChatMessageHistory", "ZepCloudChatMessageHistory", + "KafkaChatMessageHistory", ]