community[patch]: Add semantic info to metadata, classified by pebblo-server. (#20468)

Description: Add support for Semantic topics and entities.
Classification done by pebblo-server is now used to enhance the metadata of
Documents loaded by document loaders.
Dependencies: None
Documentation: Updated.

Signed-off-by: Rahul Tripathi <rauhl.psit.ec@gmail.com>
Co-authored-by: Rahul Tripathi <rauhl.psit.ec@gmail.com>
pull/20434/head^2
Rahul Tripathi 3 weeks ago committed by GitHub
parent a5028b6356
commit dc921f0823
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -69,7 +69,7 @@
"source": [
"### Send semantic topics and identities to Pebblo cloud server\n",
"\n",
"To send semantic data to pebblo-cloud, pass api-key to PebbloSafeLoader as an argument or alternatively, put the api-ket in `PEBBLO_API_KEY` environment variable."
"To send semantic data to pebblo-cloud, pass api-key to PebbloSafeLoader as an argument or alternatively, put the api-key in `PEBBLO_API_KEY` environment variable."
]
},
{
@ -91,6 +91,41 @@
"documents = loader.load()\n",
"print(documents)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Add semantic topics and identities to loaded metadata\n",
"\n",
"To add semantic topics and semantic entities to the metadata of loaded documents, set load_semantic to True as an argument or alternatively, define a new environment variable `PEBBLO_LOAD_SEMANTIC` and set it to True."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from langchain.document_loaders.csv_loader import CSVLoader\n",
"from langchain_community.document_loaders import PebbloSafeLoader\n",
"\n",
"loader = PebbloSafeLoader(\n",
" CSVLoader(\"data/corp_sens_data.csv\"),\n",
" name=\"acme-corp-rag-1\", # App name (Mandatory)\n",
" owner=\"Joe Smith\", # Owner (Optional)\n",
" description=\"Support productivity RAG application\", # Description (Optional)\n",
" api_key=\"my-api-key\", # API key (Optional, can be set in the environment variable PEBBLO_API_KEY)\n",
" load_semantic=True, # Load semantic data (Optional, default is False, can be set in the environment variable PEBBLO_LOAD_SEMANTIC)\n",
")\n",
"documents = loader.load()\n",
"print(documents[0].metadata)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": []
}
],
"metadata": {

@ -5,9 +5,9 @@ import logging
import os
import uuid
from http import HTTPStatus
from typing import Any, Dict, Iterator, List, Optional
from typing import Any, Dict, Iterator, List, Optional, Union
import requests
import requests # type: ignore
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
@ -19,6 +19,7 @@ from langchain_community.utilities.pebblo import (
PLUGIN_VERSION,
App,
Doc,
IndexedDocument,
get_full_path,
get_loader_full_path,
get_loader_type,
@ -43,6 +44,7 @@ class PebbloSafeLoader(BaseLoader):
owner: str = "",
description: str = "",
api_key: Optional[str] = None,
load_semantic: bool = False,
):
if not name or not isinstance(name, str):
raise NameError("Must specify a valid name.")
@ -50,15 +52,17 @@ class PebbloSafeLoader(BaseLoader):
self.api_key = os.environ.get("PEBBLO_API_KEY") or api_key
self.load_id = str(uuid.uuid4())
self.loader = langchain_loader
self.load_semantic = os.environ.get("PEBBLO_LOAD_SEMANTIC") or load_semantic
self.owner = owner
self.description = description
self.source_path = get_loader_full_path(self.loader)
self.source_owner = PebbloSafeLoader.get_file_owner_from_path(self.source_path)
self.docs: List[Document] = []
self.docs_with_id: Union[List[IndexedDocument], List[Document], List] = []
loader_name = str(type(self.loader)).split(".")[-1].split("'")[0]
self.source_type = get_loader_type(loader_name)
self.source_path_size = self.get_source_size(self.source_path)
self.source_aggr_size = 0
self.source_aggregate_size = 0
self.loader_details = {
"loader": loader_name,
"source_path": self.source_path,
@ -80,7 +84,15 @@ class PebbloSafeLoader(BaseLoader):
list: Documents fetched from load method of the wrapped `loader`.
"""
self.docs = self.loader.load()
self._send_loader_doc(loading_end=True)
if not self.load_semantic:
self._classify_doc(self.docs, loading_end=True)
return self.docs
self.docs_with_id = self._index_docs()
classified_docs = self._classify_doc(self.docs_with_id, loading_end=True)
self.docs_with_id = self._add_semantic_to_docs(
self.docs_with_id, classified_docs
)
self.docs = self._unindex_docs(self.docs_with_id) # type: ignore
return self.docs
def lazy_load(self) -> Iterator[Document]:
@ -104,13 +116,19 @@ class PebbloSafeLoader(BaseLoader):
doc = next(doc_iterator)
except StopIteration:
self.docs = []
self._send_loader_doc(loading_end=True)
break
self.docs = [
doc,
]
self._send_loader_doc()
yield doc
self.docs = list((doc,))
if not self.load_semantic:
self._classify_doc(self.docs, loading_end=True)
yield self.docs[0]
else:
self.docs_with_id = self._index_docs()
classified_doc = self._classify_doc(self.docs)
self.docs_with_id = self._add_semantic_to_docs(
self.docs_with_id, classified_doc
)
self.docs = self._unindex_docs(self.docs_with_id) # type: ignore
yield self.docs[0]
@classmethod
def set_discover_sent(cls) -> None:
@ -120,16 +138,23 @@ class PebbloSafeLoader(BaseLoader):
def set_loader_sent(cls) -> None:
cls._loader_sent = True
def _send_loader_doc(self, loading_end: bool = False) -> list:
def _classify_doc(self, loaded_docs: list, loading_end: bool = False) -> list:
"""Send documents fetched from loader to pebblo-server. Then send
classified documents to Daxa cloud(If api_key is present). Internal method.
Args:
loaded_docs (list): List of documents fetched from loader's load operation.
loading_end (bool, optional): Flag indicating the halt of data
loading by loader. Defaults to False.
loading by loader. Defaults to False.
"""
headers = {"Accept": "application/json", "Content-Type": "application/json"}
doc_content = [doc.dict() for doc in self.docs]
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
}
if loading_end is True:
PebbloSafeLoader.set_loader_sent()
doc_content = [doc.dict() for doc in loaded_docs]
docs = []
for doc in doc_content:
doc_authorized_identities = doc.get("metadata", {}).get(
@ -144,11 +169,13 @@ class PebbloSafeLoader(BaseLoader):
doc_source_size = self.get_source_size(doc_source_path)
page_content = str(doc.get("page_content"))
page_content_size = self.calculate_content_size(page_content)
self.source_aggr_size += page_content_size
self.source_aggregate_size += page_content_size
doc_id = doc.get("id", None) or 0
docs.append(
{
"doc": page_content,
"source_path": doc_source_path,
"id": doc_id,
"last_modified": doc.get("metadata", {}).get("last_modified"),
"file_owner": doc_source_owner,
**(
@ -176,7 +203,9 @@ class PebbloSafeLoader(BaseLoader):
if loading_end is True:
payload["loading_end"] = "true"
if "loader_details" in payload:
payload["loader_details"]["source_aggr_size"] = self.source_aggr_size
payload["loader_details"]["source_aggregate_size"] = ( # noqa
self.source_aggregate_size
)
payload = Doc(**payload).dict(exclude_unset=True)
load_doc_url = f"{CLASSIFIER_URL}{LOADER_DOC_URL}"
classified_docs = []
@ -202,11 +231,9 @@ class PebbloSafeLoader(BaseLoader):
except requests.exceptions.RequestException:
logger.warning("Unable to reach pebblo server.")
except Exception as e:
logger.warning("An Exception caught in _send_loader_doc: %s", e)
logger.warning("An Exception caught in _send_loader_doc: local %s", e)
if self.api_key:
if not classified_docs:
logger.warning("No classified docs to send to pebblo-cloud.")
return classified_docs
try:
payload["docs"] = classified_docs
@ -234,7 +261,7 @@ class PebbloSafeLoader(BaseLoader):
except requests.exceptions.RequestException:
logger.warning("Unable to reach Pebblo cloud server.")
except Exception as e:
logger.warning("An Exception caught in _send_loader_doc: %s", e)
logger.warning("An Exception caught in _send_loader_doc: cloud %s", e)
if loading_end is True:
PebbloSafeLoader.set_loader_sent()
@ -270,6 +297,12 @@ class PebbloSafeLoader(BaseLoader):
pebblo_resp = requests.post(
app_discover_url, headers=headers, json=payload, timeout=20
)
if self.api_key:
pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}/v1/discover"
headers.update({"x-api-key": self.api_key})
_ = requests.post(
pebblo_cloud_url, headers=headers, json=payload, timeout=20
)
logger.debug(
"send_discover[local]: request url %s, body %s len %s\
response status %s body %s",
@ -287,8 +320,8 @@ class PebbloSafeLoader(BaseLoader):
)
except requests.exceptions.RequestException:
logger.warning("Unable to reach pebblo server.")
except Exception:
logger.warning("An Exception caught in _send_discover.")
except Exception as e:
logger.warning("An Exception caught in _send_discover: local %s", e)
if self.api_key:
try:
@ -316,7 +349,7 @@ class PebbloSafeLoader(BaseLoader):
except requests.exceptions.RequestException:
logger.warning("Unable to reach Pebblo cloud server.")
except Exception as e:
logger.warning("An Exception caught in _send_discover: %s", e)
logger.warning("An Exception caught in _send_discover: cloud %s", e)
def _get_app_details(self) -> App:
"""Fetch app details. Internal method.
@ -378,3 +411,80 @@ class PebbloSafeLoader(BaseLoader):
total_size += os.path.getsize(fp)
size = total_size
return size
def _index_docs(self) -> List[IndexedDocument]:
"""
Indexes the documents and returns a list of IndexedDocument objects.
Returns:
List[IndexedDocument]: A list of IndexedDocument objects with unique IDs.
"""
docs_with_id = [
IndexedDocument(id=hex(i)[2:], **doc.dict())
for i, doc in enumerate(self.docs)
]
return docs_with_id
def _add_semantic_to_docs(
self, docs_with_id: List[IndexedDocument], classified_docs: List[dict]
) -> List[Document]:
"""
Adds semantic metadata to the given list of documents.
Args:
docs_with_id (List[IndexedDocument]): A list of IndexedDocument objects
containing the documents with their IDs.
classified_docs (List[dict]): A list of dictionaries containing the
classified documents.
Returns:
List[Document]: A list of Document objects with added semantic metadata.
"""
indexed_docs = {
doc.id: Document(page_content=doc.page_content, metadata=doc.metadata)
for doc in docs_with_id
}
for classified_doc in classified_docs:
doc_id = classified_doc.get("id")
if doc_id in indexed_docs:
self._add_semantic_to_doc(indexed_docs[doc_id], classified_doc)
semantic_metadata_docs = [doc for doc in indexed_docs.values()]
return semantic_metadata_docs
def _unindex_docs(self, docs_with_id: List[IndexedDocument]) -> List[Document]:
"""
Converts a list of IndexedDocument objects to a list of Document objects.
Args:
docs_with_id (List[IndexedDocument]): A list of IndexedDocument objects.
Returns:
List[Document]: A list of Document objects.
"""
docs = [
Document(page_content=doc.page_content, metadata=doc.metadata)
for i, doc in enumerate(docs_with_id)
]
return docs
def _add_semantic_to_doc(self, doc: Document, classified_doc: dict) -> Document:
"""
Adds semantic metadata to the given document in-place.
Args:
doc (Document): A Document object.
classified_doc (dict): A dictionary containing the classified document.
Returns:
Document: The Document object with added semantic metadata.
"""
doc.metadata["pebblo_semantic_entities"] = list(
classified_doc.get("entities", {}).keys()
)
doc.metadata["pebblo_semantic_topics"] = list(
classified_doc.get("topics", {}).keys()
)
return doc

@ -6,6 +6,7 @@ import pathlib
import platform
from typing import Optional, Tuple
from langchain_core.documents import Document
from langchain_core.env import get_runtime_environment
from langchain_core.pydantic_v1 import BaseModel
@ -61,6 +62,10 @@ SUPPORTED_LOADERS = (*file_loader, *dir_loader, *in_memory)
logger = logging.getLogger(__name__)
class IndexedDocument(Document):
    """A ``Document`` carrying a unique string id, used by PebbloSafeLoader
    to match classification results returned by the pebblo server back to
    the originating document."""

    id: str  # Unique identifier (hex string assigned during indexing).
class Runtime(BaseModel):
"""Pebblo Runtime.

Loading…
Cancel
Save