From 12b4ee1fc7ff2a5708347a611f23a8dfaca1959b Mon Sep 17 00:00:00 2001 From: Harrison Chase Date: Sun, 14 May 2023 22:04:27 -0700 Subject: [PATCH] Harrison/telegram chat loader (#4698) Co-authored-by: Akinwande Komolafe <47945512+Sensei-akin@users.noreply.github.com> Co-authored-by: Akinwande Komolafe --- .../document_loaders/examples/telegram.ipynb | 48 +++- langchain/document_loaders/__init__.py | 12 +- langchain/document_loaders/telegram.py | 209 +++++++++++++++++- .../document_loaders/test_telegram.py | 6 +- 4 files changed, 263 insertions(+), 12 deletions(-) diff --git a/docs/modules/indexes/document_loaders/examples/telegram.ipynb b/docs/modules/indexes/document_loaders/examples/telegram.ipynb index 20f7d46b..bf54fc97 100644 --- a/docs/modules/indexes/document_loaders/examples/telegram.ipynb +++ b/docs/modules/indexes/document_loaders/examples/telegram.ipynb @@ -19,7 +19,7 @@ "metadata": {}, "outputs": [], "source": [ - "from langchain.document_loaders import TelegramChatLoader" + "from langchain.document_loaders import TelegramChatFileLoader, TelegramChatApiLoader" ] }, { @@ -29,7 +29,7 @@ "metadata": {}, "outputs": [], "source": [ - "loader = TelegramChatLoader(\"example_data/telegram.json\")" + "loader = TelegramChatFileLoader(\"example_data/telegram.json\")" ] }, { @@ -41,7 +41,7 @@ { "data": { "text/plain": [ - "[Document(page_content=\"Henry on 2020-01-01T00:00:02: It's 2020...\\n\\nHenry on 2020-01-01T00:00:04: Fireworks!\\n\\nGrace 🧤 ðŸ\\x8d’ on 2020-01-01T00:00:05: You're a minute late!\\n\\n\", lookup_str='', metadata={'source': 'example_data/telegram.json'}, lookup_index=0)]" + "[Document(page_content=\"Henry on 2020-01-01T00:00:02: It's 2020...\\n\\nHenry on 2020-01-01T00:00:04: Fireworks!\\n\\nGrace 🧤 ðŸ\\x8d’ on 2020-01-01T00:00:05: You're a minute late!\\n\\n\", metadata={'source': 'example_data/telegram.json'})]" ] }, "execution_count": 3, @@ -53,10 +53,45 @@ "loader.load()" ] }, + { + "attachments": {}, + "cell_type": "markdown", + "id": 
"3e64cac2", + "metadata": {}, + "source": [ + "`TelegramChatApiLoader` loads data directly from any specified channel from Telegram. In order to export the data, you will need to authenticate your Telegram account. \n", + "\n", + "You can get the API_HASH and API_ID from https://my.telegram.org/auth?to=apps\n", + "\n" + ] + }, { "cell_type": "code", "execution_count": null, - "id": "3e64cac2", + "id": "f05f75f3", + "metadata": {}, + "outputs": [], + "source": [ + "loader = TelegramChatApiLoader(username=\"\",\\\n", + " chat_url=\"\",\\\n", + " api_hash=\"\",\\\n", + " api_id=\"\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "40039f7b", + "metadata": {}, + "outputs": [], + "source": [ + "loader.load()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "18e5af2b", "metadata": {}, "outputs": [], "source": [] @@ -78,7 +113,10 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.6" + + "version": "3.9.13" + + } }, "nbformat": 4, diff --git a/langchain/document_loaders/__init__.py b/langchain/document_loaders/__init__.py index d408add3..1b8aa3cb 100644 --- a/langchain/document_loaders/__init__.py +++ b/langchain/document_loaders/__init__.py @@ -79,7 +79,10 @@ from langchain.document_loaders.slack_directory import SlackDirectoryLoader from langchain.document_loaders.spreedly import SpreedlyLoader from langchain.document_loaders.srt import SRTLoader from langchain.document_loaders.stripe import StripeLoader -from langchain.document_loaders.telegram import TelegramChatLoader +from langchain.document_loaders.telegram import ( + TelegramChatApiLoader, + TelegramChatFileLoader, +) from langchain.document_loaders.text import TextLoader from langchain.document_loaders.toml import TomlLoader from langchain.document_loaders.twitter import TwitterTweetLoader @@ -108,6 +111,9 @@ from langchain.document_loaders.youtube import ( # Legacy: only for backwards compat. 
Use PyPDFLoader instead PagedPDFSplitter = PyPDFLoader +# For backwards compatibility +TelegramChatLoader = TelegramChatFileLoader + __all__ = [ "AZLyricsLoader", "AirbyteJSONLoader", @@ -176,9 +182,10 @@ __all__ = [ "SeleniumURLLoader", "SitemapLoader", "SlackDirectoryLoader", + "TelegramChatFileLoader", + "TelegramChatApiLoader", "SpreedlyLoader", "StripeLoader", - "TelegramChatLoader", "TextLoader", "TomlLoader", "TwitterTweetLoader", @@ -201,4 +208,5 @@ __all__ = [ "WhatsAppChatLoader", "WikipediaLoader", "YoutubeLoader", + "TelegramChatLoader", ] diff --git a/langchain/document_loaders/telegram.py b/langchain/document_loaders/telegram.py index db304095..6b9b8921 100644 --- a/langchain/document_loaders/telegram.py +++ b/langchain/document_loaders/telegram.py @@ -1,10 +1,17 @@ """Loader that loads Telegram chat json dump.""" +from __future__ import annotations + +import asyncio import json from pathlib import Path -from typing import List +from typing import TYPE_CHECKING, Dict, List, Optional, Union from langchain.docstore.document import Document from langchain.document_loaders.base import BaseLoader +from langchain.text_splitter import RecursiveCharacterTextSplitter + +if TYPE_CHECKING: + import pandas as pd def concatenate_rows(row: dict) -> str: @@ -15,7 +22,7 @@ def concatenate_rows(row: dict) -> str: return f"{sender} on {date}: {text}\n\n" -class TelegramChatLoader(BaseLoader): +class TelegramChatFileLoader(BaseLoader): """Loader that loads Telegram chat json directory dump.""" def __init__(self, path: str): @@ -37,3 +44,201 @@ class TelegramChatLoader(BaseLoader): metadata = {"source": str(p)} return [Document(page_content=text, metadata=metadata)] + + +def text_to_docs(text: Union[str, List[str]]) -> List[Document]: + """Converts a string or list of strings to a list of Documents with metadata.""" + if isinstance(text, str): + # Take a single string as one page + text = [text] + page_docs = [Document(page_content=page) for page in text] + + # Add page 
numbers as metadata + for i, doc in enumerate(page_docs): + doc.metadata["page"] = i + 1 + + # Split pages into chunks + doc_chunks = [] + + for doc in page_docs: + text_splitter = RecursiveCharacterTextSplitter( + chunk_size=800, + separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""], + chunk_overlap=20, + ) + chunks = text_splitter.split_text(doc.page_content) + for i, chunk in enumerate(chunks): + doc = Document( + page_content=chunk, metadata={"page": doc.metadata["page"], "chunk": i} + ) + # Add sources as metadata + doc.metadata["source"] = f"{doc.metadata['page']}-{doc.metadata['chunk']}" + doc_chunks.append(doc) + return doc_chunks + + +class TelegramChatApiLoader(BaseLoader): + """Loader that loads Telegram chat messages directly via the Telegram API.""" + + def __init__( + self, + chat_url: Optional[str] = None, + api_id: Optional[int] = None, + api_hash: Optional[str] = None, + username: Optional[str] = None, + ): + """Initialize with API parameters.""" + self.chat_url = chat_url + self.api_id = api_id + self.api_hash = api_hash + self.username = username + + async def fetch_data_from_telegram(self) -> None: + """Fetch data from Telegram API and save it as a JSON file.""" + from telethon.sync import TelegramClient + + data = [] + async with TelegramClient(self.username, self.api_id, self.api_hash) as client: + async for message in client.iter_messages(self.chat_url): + is_reply = message.reply_to is not None + reply_to_id = message.reply_to.reply_to_msg_id if is_reply else None + data.append( + { + "sender_id": message.sender_id, + "text": message.text, + "date": message.date.isoformat(), + "message.id": message.id, + "is_reply": is_reply, + "reply_to_id": reply_to_id, + } + ) + + with open("telegram_data.json", "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=4) + + self.file_path = "telegram_data.json" + + def _get_message_threads(self, data: pd.DataFrame) -> dict: + """Create a dictionary of message threads from the given data. 
+ + Args: + data (pd.DataFrame): A DataFrame containing the conversation \ + data with columns: + - sender_id + - text + - date + - message.id + - is_reply + - reply_to_id + + Returns: + dict: A dictionary where the key is the parent message ID and \ + the value is a list of message IDs in ascending order. + """ + + def find_replies(parent_id: int, reply_data: pd.DataFrame) -> List[int]: + """ + Recursively find all replies to a given parent message ID. + + Args: + parent_id (int): The parent message ID. + reply_data (pd.DataFrame): A DataFrame containing reply messages. + + Returns: + list: A list of message IDs that are replies to the parent message ID. + """ + # Find direct replies to the parent message ID + direct_replies = reply_data[reply_data["reply_to_id"] == parent_id][ + "message.id" + ].tolist() + + # Recursively find replies to the direct replies + all_replies = [] + for reply_id in direct_replies: + all_replies += [reply_id] + find_replies(reply_id, reply_data) + + return all_replies + + # Filter out parent messages + parent_messages = data[~data["is_reply"]] + + # Filter out reply messages and drop rows with NaN in 'reply_to_id' + reply_messages = data[data["is_reply"]].dropna(subset=["reply_to_id"]) + + # Convert 'reply_to_id' to integer + reply_messages["reply_to_id"] = reply_messages["reply_to_id"].astype(int) + + # Create a dictionary of message threads with parent message IDs as keys and \ + # lists of reply message IDs as values + message_threads = { + parent_id: [parent_id] + find_replies(parent_id, reply_messages) + for parent_id in parent_messages["message.id"] + } + + return message_threads + + def _combine_message_texts( + self, message_threads: Dict[int, List[int]], data: pd.DataFrame + ) -> str: + """ + Combine the message texts for each parent message ID based \ + on the list of message threads. 
+ + Args: + message_threads (dict): A dictionary where the key is the parent message \ + ID and the value is a list of message IDs in ascending order. + data (pd.DataFrame): A DataFrame containing the conversation data: + - sender_id + - text + - date + - message.id + - is_reply + - reply_to_id + + Returns: + str: A combined string of message texts sorted by date. + """ + combined_text = "" + + # Iterate through sorted parent message IDs + for parent_id, message_ids in message_threads.items(): + # Get the message texts for the message IDs and sort them by date + message_texts = ( + data[data["message.id"].isin(message_ids)] + .sort_values(by="date")["text"] + .tolist() + ) + message_texts = [str(elem) for elem in message_texts] + + # Combine the message texts + combined_text += " ".join(message_texts) + ".\n" + + return combined_text.strip() + + def load(self) -> List[Document]: + """Load documents.""" + if self.chat_url is not None: + try: + import nest_asyncio + import pandas as pd + + nest_asyncio.apply() + asyncio.run(self.fetch_data_from_telegram()) + except ImportError: + raise ValueError( + "please install with `pip install nest_asyncio`,\ + `pip install pandas` " + ) + + p = Path(self.file_path) + + with open(p, encoding="utf8") as f: + d = json.load(f) + + normalized_messages = pd.json_normalize(d) + df = pd.DataFrame(normalized_messages) + + message_threads = self._get_message_threads(df) + combined_texts = self._combine_message_texts(message_threads, df) + + return text_to_docs(combined_texts) diff --git a/tests/integration_tests/document_loaders/test_telegram.py b/tests/integration_tests/document_loaders/test_telegram.py index 05e2f051..5b07abbe 100644 --- a/tests/integration_tests/document_loaders/test_telegram.py +++ b/tests/integration_tests/document_loaders/test_telegram.py @@ -1,12 +1,12 @@ from pathlib import Path -from langchain.document_loaders import TelegramChatLoader +from langchain.document_loaders import TelegramChatFileLoader 
-def test_telegram_chat_loader() -> None: +def test_telegram_chat_file_loader() -> None: """Test TelegramChatLoader.""" file_path = Path(__file__).parent.parent / "examples/telegram.json" - loader = TelegramChatLoader(str(file_path)) + loader = TelegramChatFileLoader(str(file_path)) docs = loader.load() assert len(docs) == 1