Bagatur/nuclia (#8404)

Co-authored-by: Eric BREHAULT <ebrehault@gmail.com>
bagatur/rm_nuclia_ext
parent ef5bc1fef1
commit 9fc9018951

@ -0,0 +1,144 @@
{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Nuclia Understanding API document loader\n",
"\n",
"[Nuclia](https://nuclia.com) automatically indexes your unstructured data from any internal and external source, providing optimized search results and generative answers. It can handle video and audio transcription, image content extraction, and document parsing.\n",
"\n",
"The Nuclia Understanding API supports the processing of unstructured data, including text, web pages, documents, and audio/video contents. It extracts all texts wherever they are (using speech-to-text or OCR when needed), it also extracts metadata, embedded files (like images in a PDF), and web links. If machine learning is enabled, it identifies entities, provides a summary of the content and generates embeddings for all the sentences.\n",
"\n",
"To use the Nuclia Understanding API, you need to have a Nuclia account. You can create one for free at [https://nuclia.cloud](https://nuclia.cloud), and then [create a NUA key](https://docs.nuclia.dev/docs/docs/using/understanding/intro)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#!pip install --upgrade protobuf\n",
"#!pip install nucliadb-protos"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"os.environ[\"NUCLIA_ZONE\"] = \"<YOUR_ZONE>\" # e.g. europe-1\n",
"os.environ[\"NUCLIA_NUA_KEY\"] = \"<YOUR_API_KEY>\""
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"To use the Nuclia document loader, you need to instantiate a `NucliaUnderstandingAPI` tool:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from langchain.tools.nuclia import NucliaUnderstandingAPI\n",
"\n",
"nua = NucliaUnderstandingAPI(enable_ml=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from langchain.document_loaders.nuclia import NucliaLoader\n",
"\n",
"loader = NucliaLoader(\"./interview.mp4\", nua)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"You can now call the `load` the document in a loop until you get the document."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"\n",
"pending = True\n",
"while pending:\n",
" time.sleep(15)\n",
" docs = loader.load()\n",
" if len(docs) > 0:\n",
" print(docs[0].page_content)\n",
" print(docs[0].metadata)\n",
" pending = False\n",
" else:\n",
" print(\"waiting...\")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Retrieved information\n",
"\n",
"Nuclia returns the following information:\n",
"\n",
"- file metadata\n",
"- extracted text\n",
"- nested text (like text in an embedded image)\n",
"- paragraphs and sentences splitting (defined by the position of their first and last characters, plus start time and end time for a video or audio file)\n",
"- links\n",
"- a thumbnail\n",
"- embedded files\n",
"\n",
"Note:\n",
"\n",
" Generated files (thumbnail, extracted embedded files, etc.) are provided as a token. You can download them with the [`/processing/download` endpoint](https://docs.nuclia.dev/docs/api#operation/Download_binary_file_processing_download_get).\n",
"\n",
" Also at any level, if an attribute exceeds a certain size, it will be put in a downloadable file and will be replaced in the document by a file pointer. This will consist of `{\"file\": {\"uri\": \"JWT_TOKEN\"}}`. The rule is that if the size of the message is greater than 1000000 characters, the biggest parts will be moved to downloadable files. First, the compression process will target vectors. If that is not enough, it will target large field metadata, and finally it will target extracted text.\n"
]
}
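,
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"As a minimal sketch (not part of the loader itself), such a generated file could be fetched from the download endpoint. The `token` query parameter and the `<FILE_TOKEN>` value are assumptions here, so check the endpoint documentation linked above:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hedged sketch: download a generated file from its token.\n",
"# Assumptions: the token comes from the loaded document's metadata,\n",
"# and the endpoint accepts it as a `token` query parameter.\n",
"import os\n",
"\n",
"import requests\n",
"\n",
"zone = os.environ[\"NUCLIA_ZONE\"]\n",
"res = requests.get(\n",
"    f\"https://{zone}.nuclia.cloud/api/v1/processing/download\",\n",
"    params={\"token\": \"<FILE_TOKEN>\"},\n",
"    headers={\"x-stf-nuakey\": \"Bearer \" + os.environ[\"NUCLIA_NUA_KEY\"]},\n",
")\n",
"with open(\"downloaded_file\", \"wb\") as f:\n",
"    f.write(res.content)"
]
}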
],
"metadata": {
"kernelspec": {
"display_name": "langchain",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.5"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}

@ -0,0 +1,103 @@
{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Nuclia Understanding API document transformer\n",
"\n",
"[Nuclia](https://nuclia.com) automatically indexes your unstructured data from any internal and external source, providing optimized search results and generative answers. It can handle video and audio transcription, image content extraction, and document parsing.\n",
"\n",
"The Nuclia Understanding API document transformer splits text into paragraphs and sentences, identifies entities, provides a summary of the text and generates embeddings for all the sentences.\n",
"\n",
"To use the Nuclia Understanding API, you need to have a Nuclia account. You can create one for free at [https://nuclia.cloud](https://nuclia.cloud), and then [create a NUA key](https://docs.nuclia.dev/docs/docs/using/understanding/intro).\n",
"\n",
"from langchain.document_transformers.nuclia_text_transform import NucliaTextTransformer"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#!pip install --upgrade protobuf\n",
"#!pip install nucliadb-protos"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"os.environ[\"NUCLIA_ZONE\"] = \"<YOUR_ZONE>\" # e.g. europe-1\n",
"os.environ[\"NUCLIA_NUA_KEY\"] = \"<YOUR_API_KEY>\""
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"To use the Nuclia document transformer, you need to instantiate a `NucliaUnderstandingAPI` tool with `enable_ml` set to `True`:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from langchain.tools.nuclia import NucliaUnderstandingAPI\n",
"\n",
"nua = NucliaUnderstandingAPI(enable_ml=True)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"The Nuclia document transformer must be called in async mode, so you need to use the `atransform_documents` method:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"\n",
"from langchain.document_transformers.nuclia_text_transform import NucliaTextTransformer\n",
"from langchain.schema.document import Document\n",
"\n",
"\n",
"async def process():\n",
" documents = [\n",
" Document(page_content=\"<TEXT 1>\", metadata={}),\n",
" Document(page_content=\"<TEXT 2>\", metadata={}),\n",
" Document(page_content=\"<TEXT 3>\", metadata={}),\n",
" ]\n",
" nuclia_transformer = NucliaTextTransformer(nua)\n",
" transformed_documents = await nuclia_transformer.atransform_documents(documents)\n",
" print(transformed_documents)\n",
"\n",
"\n",
"asyncio.run(process())"
]
}
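,
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"Each transformed document carries the Nuclia results under the `nuclia` key of its metadata (a `file` entry and a `metadata` entry, as set by `NucliaTextTransformer`). A quick sketch of reading them back, assuming `process()` is adapted to return its `transformed_documents`:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: inspect the metadata set by NucliaTextTransformer.\n",
"# Assumes `process()` above was changed to return `transformed_documents`.\n",
"transformed_documents = asyncio.run(process())\n",
"for doc in transformed_documents:\n",
"    nuclia_data = doc.metadata[\"nuclia\"]\n",
"    print(nuclia_data[\"file\"])  # file-level extracted data\n",
"    print(nuclia_data[\"metadata\"])  # field metadata (paragraphs, sentences, ...)"
]
}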
],
"metadata": {
"language_info": {
"name": "python"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}

@ -0,0 +1,173 @@
{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Nuclia Understanding API tool\n",
"\n",
"[Nuclia](https://nuclia.com) automatically indexes your unstructured data from any internal and external source, providing optimized search results and generative answers. It can handle video and audio transcription, image content extraction, and document parsing.\n",
"\n",
"The Nuclia Understanding API supports the processing of unstructured data, including text, web pages, documents, and audio/video contents. It extracts all texts wherever it is (using speech-to-text or OCR when needed), it identifies entities, it aslo extracts metadata, embedded files (like images in a PDF), and web links. It also provides a summary of the content.\n",
"\n",
"To use the Nuclia Understanding API, you need to have a Nuclia account. You can create one for free at [https://nuclia.cloud](https://nuclia.cloud), and then [create a NUA key](https://docs.nuclia.dev/docs/docs/using/understanding/intro)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#!pip install --upgrade protobuf\n",
"#!pip install nucliadb-protos"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"os.environ[\"NUCLIA_ZONE\"] = \"<YOUR_ZONE>\" # e.g. europe-1\n",
"os.environ[\"NUCLIA_NUA_KEY\"] = \"<YOUR_API_KEY>\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from langchain.tools.nuclia import NucliaUnderstandingAPI\n",
"\n",
"nua = NucliaUnderstandingAPI(enable_ml=False)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"You can push files to the Nuclia Understanding API using the `push` action. As the processing is done asynchronously, the results might be returned in a different order than the files were pushed. That is why you need to provide an `id` to match the results with the corresponding file."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"nua.run({\"action\": \"push\", \"id\": \"1\", \"path\": \"./report.docx\"})\n",
"nua.run({\"action\": \"push\", \"id\": \"2\", \"path\": \"./interview.mp4\"})"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"You can now call the `pull` action in a loop until you get the JSON-formatted result."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"\n",
"pending = True\n",
"data = None\n",
"while pending:\n",
" time.sleep(15)\n",
" data = nua.run({\"action\": \"pull\", \"id\": \"1\", \"path\": None})\n",
" if data:\n",
" print(data)\n",
" pending = False\n",
" else:\n",
" print(\"waiting...\")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"You can also do it in one step in `async` mode, you only need to do a push, and it will wait until the results are pulled:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"\n",
"\n",
"async def process():\n",
" data = await nua.arun(\n",
" {\"action\": \"push\", \"id\": \"1\", \"path\": \"./talk.mp4\", \"text\": None}\n",
" )\n",
" print(data)\n",
"\n",
"\n",
"asyncio.run(process())"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Retrieved information\n",
"\n",
"Nuclia returns the following information:\n",
"\n",
"- file metadata\n",
"- extracted text\n",
"- nested text (like text in an embedded image)\n",
"- a summary (only when `enable_ml` is set to `True`)\n",
"- paragraphs and sentences splitting (defined by the position of their first and last characters, plus start time and end time for a video or audio file)\n",
"- named entities: people, dates, places, organizations, etc. (only when `enable_ml` is set to `True`)\n",
"- links\n",
"- a thumbnail\n",
"- embedded files\n",
"- the vector representations of the text (only when `enable_ml` is set to `True`)\n",
"\n",
"Note:\n",
"\n",
" Generated files (thumbnail, extracted embedded files, etc.) are provided as a token. You can download them with the [`/processing/download` endpoint](https://docs.nuclia.dev/docs/api#operation/Download_binary_file_processing_download_get).\n",
"\n",
" Also at any level, if an attribute exceeds a certain size, it will be put in a downloadable file and will be replaced in the document by a file pointer. This will consist of `{\"file\": {\"uri\": \"JWT_TOKEN\"}}`. The rule is that if the size of the message is greater than 1000000 characters, the biggest parts will be moved to downloadable files. First, the compression process will target vectors. If that is not enough, it will target large field metadata, and finally it will target extracted text.\n"
]
}
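,
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"As a minimal sketch (assuming the pulled result has been parsed with `json.loads`), attributes that were moved to downloadable files can be spotted by walking the JSON and looking for the `{\"file\": {\"uri\": ...}}` shape:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"\n",
"\n",
"# Hypothetical helper: collect the tokens of attributes that were replaced\n",
"# by {\"file\": {\"uri\": \"JWT_TOKEN\"}} file pointers in the pulled result.\n",
"def find_file_pointers(node, path=\"\"):\n",
"    pointers = []\n",
"    if isinstance(node, dict):\n",
"        file = node.get(\"file\")\n",
"        if set(node) == {\"file\"} and isinstance(file, dict) and \"uri\" in file:\n",
"            pointers.append((path, file[\"uri\"]))\n",
"        else:\n",
"            for key, value in node.items():\n",
"                pointers.extend(find_file_pointers(value, f\"{path}.{key}\"))\n",
"    elif isinstance(node, list):\n",
"        for i, value in enumerate(node):\n",
"            pointers.extend(find_file_pointers(value, f\"{path}[{i}]\"))\n",
"    return pointers\n",
"\n",
"\n",
"find_file_pointers(json.loads(data))"
]
}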
],
"metadata": {
"kernelspec": {
"display_name": "langchain",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.5"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}

@ -0,0 +1,33 @@
"""Extract text from any file type."""
import json
import uuid
from typing import List
from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseLoader
from langchain.tools.nuclia.tool import NucliaUnderstandingAPI
class NucliaLoader(BaseLoader):
"""Extract text from any file type."""
def __init__(self, path: str, nuclia_tool: NucliaUnderstandingAPI):
self.nua = nuclia_tool
self.id = str(uuid.uuid4())
self.nua.run({"action": "push", "id": self.id, "path": path, "text": None})
def load(self) -> List[Document]:
"""Load documents."""
data = self.nua.run(
{"action": "pull", "id": self.id, "path": None, "text": None}
)
if not data:
return []
obj = json.loads(data)
text = obj["extracted_text"][0]["body"]["text"]
print(text)
metadata = {
"file": obj["file_extracted_data"][0],
"metadata": obj["field_metadata"][0],
}
return [Document(page_content=text, metadata=metadata)]

@ -27,6 +27,7 @@ from langchain.document_transformers.embeddings_redundant_filter import (
)
from langchain.document_transformers.html2text import Html2TextTransformer
from langchain.document_transformers.long_context_reorder import LongContextReorder
from langchain.document_transformers.nuclia_text_transform import NucliaTextTransformer
from langchain.document_transformers.openai_functions import OpenAIMetadataTagger
__all__ = [
@ -37,6 +38,7 @@ __all__ = [
"EmbeddingsRedundantFilter",
"get_stateful_documents",
"LongContextReorder",
"NucliaTextTransformer",
"OpenAIMetadataTagger",
"Html2TextTransformer",
]

@ -0,0 +1,47 @@
import asyncio
import json
import uuid
from typing import Any, Sequence

from langchain.schema.document import BaseDocumentTransformer, Document
from langchain.tools.nuclia.tool import NucliaUnderstandingAPI


class NucliaTextTransformer(BaseDocumentTransformer):
    """
    The Nuclia Understanding API splits text into paragraphs and sentences,
    identifies entities, provides a summary of the text and generates
    embeddings for all the sentences.
    """

    def __init__(self, nua: NucliaUnderstandingAPI):
        self.nua = nua

    def transform_documents(
        self, documents: Sequence[Document], **kwargs: Any
    ) -> Sequence[Document]:
        raise NotImplementedError

    async def atransform_documents(
        self, documents: Sequence[Document], **kwargs: Any
    ) -> Sequence[Document]:
        # Push all documents in parallel, then wait for all results.
        tasks = [
            self.nua.arun(
                {
                    "action": "push",
                    "id": str(uuid.uuid4()),
                    "text": doc.page_content,
                    "path": None,
                }
            )
            for doc in documents
        ]
        results = await asyncio.gather(*tasks)
        for doc, result in zip(documents, results):
            obj = json.loads(result)
            metadata = {
                "file": obj["file_extracted_data"][0],
                "metadata": obj["field_metadata"][0],
            }
            doc.metadata["nuclia"] = metadata
        return documents

@ -0,0 +1,3 @@
from langchain.tools.nuclia.tool import NucliaUnderstandingAPI
__all__ = ["NucliaUnderstandingAPI"]

@ -0,0 +1,229 @@
"""Tool for the Nuclia Understanding API.
Installation:
```bash
pip install --upgrade protobuf
pip install nucliadb-protos
```
"""
import asyncio
import base64
import logging
import mimetypes
import os
from typing import Any, Dict, Optional, Type, Union
import requests
from pydantic import BaseModel, Field
from langchain.callbacks.manager import (
AsyncCallbackManagerForToolRun,
CallbackManagerForToolRun,
)
from langchain.tools.base import BaseTool
logger = logging.getLogger(__name__)
class NUASchema(BaseModel):
action: str = Field(
...,
description="Action to perform. Either `push` or `pull`.",
)
id: str = Field(
...,
description="ID of the file to push or pull.",
)
path: Optional[str] = Field(
...,
description="Path to the file to push (needed only for `push` action).",
)
text: Optional[str] = Field(
...,
description="Text content to process (needed only for `push` action).",
)
class NucliaUnderstandingAPI(BaseTool):
"""Tool to process files with the Nuclia Understanding API."""
name = "nuclia_understanding_api"
description = (
"A wrapper around Nuclia Understanding API endpoints. "
"Useful for when you need to extract text from any kind of files. "
)
args_schema: Type[BaseModel] = NUASchema
_results: Dict[str, Any] = {}
_config: Dict[str, Any] = {}
def __init__(self, enable_ml: bool = False) -> None:
zone = os.environ.get("NUCLIA_ZONE", "europe-1")
self._config["BACKEND"] = f"https://{zone}.nuclia.cloud/api/v1"
key = os.environ.get("NUCLIA_NUA_KEY")
if not key:
raise ValueError("NUCLIA_NUA_KEY environment variable not set")
else:
self._config["NUA_KEY"] = key
self._config["enable_ml"] = enable_ml
super().__init__()
def _run(
self,
action: str,
id: str,
path: Optional[str],
text: Optional[str],
run_manager: Optional[CallbackManagerForToolRun] = None,
) -> str:
"""Use the tool."""
if action == "push":
self._check_params(path, text)
if path:
return self._pushFile(id, path)
if text:
return self._pushText(id, text)
elif action == "pull":
return self._pull(id)
return ""
async def _arun(
self,
action: str,
id: str,
path: Optional[str] = None,
text: Optional[str] = None,
run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
) -> str:
"""Use the tool asynchronously."""
self._check_params(path, text)
if path:
self._pushFile(id, path)
if text:
self._pushText(id, text)
data = None
while True:
data = self._pull(id)
if data:
break
await asyncio.sleep(15)
return data
def _pushText(self, id: str, text: str) -> str:
field = {
"textfield": {"text": {"body": text, "format": 0}},
"processing_options": {"ml_text": self._config["enable_ml"]},
}
return self._pushField(id, field)
def _pushFile(self, id: str, content_path: str) -> str:
with open(content_path, "rb") as source_file:
response = requests.post(
self._config["BACKEND"] + "/processing/upload",
headers={
"content-type": mimetypes.guess_type(content_path)[0]
or "application/octet-stream",
"x-stf-nuakey": "Bearer " + self._config["NUA_KEY"],
},
data=source_file.read(),
)
if response.status_code != 200:
logger.info(
f"Error uploading {content_path}: "
f"{response.status_code} {response.text}"
)
return ""
else:
field = {
"filefield": {"file": f"{response.text}"},
"processing_options": {"ml_text": self._config["enable_ml"]},
}
return self._pushField(id, field)
def _pushField(self, id: str, field: Any) -> str:
logger.info(f"Pushing {id} in queue")
response = requests.post(
self._config["BACKEND"] + "/processing/push",
headers={
"content-type": "application/json",
"x-stf-nuakey": "Bearer " + self._config["NUA_KEY"],
},
json=field,
)
if response.status_code != 200:
logger.info(
f"Error pushing field {id}:" f"{response.status_code} {response.text}"
)
raise ValueError("Error pushing field")
else:
uuid = response.json()["uuid"]
logger.info(f"Field {id} pushed in queue, uuid: {uuid}")
self._results[id] = {"uuid": uuid, "status": "pending"}
return uuid
def _pull(self, id: str) -> str:
self._pull_queue()
result = self._results.get(id, None)
if not result:
logger.info(f"{id} not in queue")
return ""
elif result["status"] == "pending":
logger.info(f'Waiting for {result["uuid"]} to be processed')
return ""
else:
return result["data"]
def _pull_queue(self) -> None:
try:
from nucliadb_protos.writer_pb2 import BrokerMessage
except ImportError as e:
raise ImportError(
"nucliadb-protos is not installed. "
"Run `pip install nucliadb-protos` to install."
) from e
try:
from google.protobuf.json_format import MessageToJson
except ImportError as e:
raise ImportError(
"Unable to import google.protobuf, please install with "
"`pip install protobuf`."
) from e
res = requests.get(
self._config["BACKEND"] + "/processing/pull",
headers={
"x-stf-nuakey": "Bearer " + self._config["NUA_KEY"],
},
).json()
if res["status"] == "empty":
logger.info("Queue empty")
elif res["status"] == "ok":
payload = res["payload"]
pb = BrokerMessage()
pb.ParseFromString(base64.b64decode(payload))
uuid = pb.uuid
logger.info(f"Pulled {uuid} from queue")
matching_id = self._find_matching_id(uuid)
if not matching_id:
logger.info(f"No matching id for {uuid}")
else:
self._results[matching_id]["status"] = "done"
data = MessageToJson(
pb,
preserving_proto_field_name=True,
including_default_value_fields=True,
)
self._results[matching_id]["data"] = data
def _find_matching_id(self, uuid: str) -> Union[str, None]:
for id, result in self._results.items():
if result["uuid"] == uuid:
return id
return None
def _check_params(self, path: Optional[str], text: Optional[str]) -> None:
if not path and not text:
raise ValueError("File path or text is required")
if path and text:
raise ValueError("Cannot process both file and text on a single run")

@ -3812,7 +3812,7 @@ name = "grpcio-tools"
version = "1.47.5"
description = "Protobuf code generator for gRPC"
category = "main"
optional = true
optional = false
python-versions = ">=3.6"
files = [
{file = "grpcio-tools-1.47.5.tar.gz", hash = "sha256:62ced60566a4cbcf35c57e887e2e68b4f108b3474ef3ec0022d38cd579345f92"},
@ -4738,7 +4738,6 @@ optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*, !=3.6.*"
files = [
{file = "jsonpointer-2.4-py2.py3-none-any.whl", hash = "sha256:15d51bba20eea3165644553647711d150376234112651b4f1811022aecad7d7a"},
{file = "jsonpointer-2.4.tar.gz", hash = "sha256:585cee82b70211fa9e6043b7bb89db6e1aa49524340dde8ad6b63206ea689d88"},
]
[[package]]
@ -6349,7 +6348,7 @@ files = [
name = "mypy-protobuf"
version = "3.3.0"
description = "Generate mypy stub files from protobuf specs"
category = "dev"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@ -6697,6 +6696,23 @@ jupyter-server = ">=1.8,<3"
[package.extras]
test = ["pytest", "pytest-console-scripts", "pytest-jupyter", "pytest-tornasync"]
[[package]]
name = "nucliadb-protos"
version = "2.20.2.post523"
description = "protos for nucliadb"
category = "main"
optional = false
python-versions = "*"
files = [
{file = "nucliadb_protos-2.20.2.post523-py3-none-any.whl", hash = "sha256:23afc2fd600818b4f27fbd2ae17863cb390e5cfb4f54da47fcfffe17bf7cf979"},
]
[package.dependencies]
grpcio = ">=1.44.0"
grpcio-tools = ">=1.44.0"
mypy-protobuf = ">=3.2.0"
protobuf = "*"
[[package]]
name = "numba"
version = "0.57.1"
@ -12196,7 +12212,7 @@ files = [
name = "types-protobuf"
version = "4.23.0.1"
description = "Typing stubs for protobuf"
category = "dev"
category = "main"
optional = false
python-versions = "*"
files = [
@ -13576,7 +13592,7 @@ clarifai = ["clarifai"]
cohere = ["cohere"]
docarray = ["docarray"]
embeddings = ["sentence-transformers"]
extended-testing = ["amazon-textract-caller", "atlassian-python-api", "beautifulsoup4", "bibtexparser", "cassio", "chardet", "esprima", "feedparser", "geopandas", "gitpython", "gql", "html2text", "jinja2", "jq", "lxml", "mwparserfromhell", "mwxml", "newspaper3k", "openai", "openai", "pandas", "pdfminer-six", "pgvector", "psychicapi", "py-trello", "pymupdf", "pypdf", "pypdfium2", "pyspark", "rank-bm25", "rapidfuzz", "requests-toolbelt", "scikit-learn", "streamlit", "sympy", "telethon", "tqdm", "xinference", "zep-python"]
extended-testing = ["amazon-textract-caller", "atlassian-python-api", "beautifulsoup4", "bibtexparser", "cassio", "chardet", "esprima", "feedparser", "geopandas", "gitpython", "gql", "html2text", "jinja2", "jq", "lxml", "mwparserfromhell", "mwxml", "newspaper3k", "nucliadb-protos", "openai", "openai", "pandas", "pdfminer-six", "pgvector", "psychicapi", "py-trello", "pymupdf", "pypdf", "pypdfium2", "pyspark", "rank-bm25", "rapidfuzz", "requests-toolbelt", "scikit-learn", "streamlit", "sympy", "telethon", "tqdm", "xinference", "zep-python"]
javascript = ["esprima"]
llms = ["anthropic", "clarifai", "cohere", "huggingface_hub", "manifest-ml", "nlpcloud", "openai", "openllm", "openlm", "torch", "transformers", "xinference"]
openai = ["openai", "tiktoken"]
@ -13586,4 +13602,4 @@ text-helpers = ["chardet"]
[metadata]
lock-version = "2.0"
python-versions = ">=3.8.1,<4.0"
content-hash = "39305f23d3d69179d247d643631133ac50f5e944d98518c8a56c5f839b8e7a04"
content-hash = "4b4da20485a13da3b7ef22aad38e5a38086d22d8ca0163db79c2276f10b94e4b"

@ -124,6 +124,7 @@ langsmith = "~0.0.11"
rank-bm25 = {version = "^0.2.2", optional = true}
amadeus = {version = ">=8.1.0", optional = true}
geopandas = {version = "^0.13.1", optional = true}
nucliadb-protos = {version = "^2.17.0.post469", optional = true}
xinference = {version = "^0.0.6", optional = true}
python-arango = {version = "^7.5.9", optional = true}
gitpython = {version = "^3.1.32", optional = true}
@ -201,6 +202,7 @@ momento = "^1.5.0"
# Please do not add any dependencies in the test_integration group
# See instructions above ^^
pygithub = "^1.59.0"
nucliadb-protos = "^2.17.0.post469"
[tool.poetry.group.lint.dependencies]
ruff = "^0.0.249"
@ -365,6 +367,7 @@ extended_testing = [
"rank_bm25",
"geopandas",
"jinja2",
"nucliadb-protos",
"xinference",
"gitpython",
"newspaper3k",

@ -0,0 +1,45 @@
import json
import os
from typing import Any
from unittest import mock

from langchain.document_loaders.nuclia import NucliaLoader
from langchain.tools.nuclia.tool import NucliaUnderstandingAPI


def fakerun(**args: Any) -> Any:
    def run(self: Any, **args: Any) -> str:
        data = {
            "extracted_text": [{"body": {"text": "Hello World"}}],
            "file_extracted_data": [{"language": "en"}],
            "field_metadata": [
                {
                    "metadata": {
                        "metadata": {
                            "paragraphs": [
                                {"end": 66, "sentences": [{"start": 1, "end": 67}]}
                            ]
                        }
                    }
                }
            ],
        }
        return json.dumps(data)

    return run


@mock.patch.dict(os.environ, {"NUCLIA_NUA_KEY": "_a_key_"})
def test_nuclia_loader() -> None:
    with mock.patch(
        "langchain.tools.nuclia.tool.NucliaUnderstandingAPI._run", new_callable=fakerun
    ):
        nua = NucliaUnderstandingAPI(enable_ml=False)
        loader = NucliaLoader("/whatever/file.mp3", nua)
        docs = loader.load()
        assert len(docs) == 1
        assert docs[0].page_content == "Hello World"
        assert docs[0].metadata["file"]["language"] == "en"
        assert (
            len(docs[0].metadata["metadata"]["metadata"]["metadata"]["paragraphs"]) == 1
        )

@ -0,0 +1,62 @@
import asyncio
import json
from typing import Any
from unittest import mock

import pytest

from langchain.document_transformers.nuclia_text_transform import NucliaTextTransformer
from langchain.schema.document import Document
from langchain.tools.nuclia.tool import NucliaUnderstandingAPI


def fakerun(**args: Any) -> Any:
    async def run(self: Any, **args: Any) -> str:
        await asyncio.sleep(0.1)
        data = {
            "extracted_text": [{"body": {"text": "Hello World"}}],
            "file_extracted_data": [{"language": "en"}],
            "field_metadata": [
                {
                    "metadata": {
                        "metadata": {
                            "paragraphs": [
                                {"end": 66, "sentences": [{"start": 1, "end": 67}]}
                            ]
                        }
                    }
                }
            ],
        }
        return json.dumps(data)

    return run


@pytest.mark.asyncio
async def test_nuclia_transformer() -> None:
    with mock.patch(
        "langchain.tools.nuclia.tool.NucliaUnderstandingAPI._arun", new_callable=fakerun
    ):
        with mock.patch("os.environ.get", return_value="_a_key_"):
            nua = NucliaUnderstandingAPI(enable_ml=False)
            documents = [
                Document(page_content="Hello, my name is Alice", metadata={}),
                Document(page_content="Hello, my name is Bob", metadata={}),
            ]
            nuclia_transformer = NucliaTextTransformer(nua)
            transformed_documents = await nuclia_transformer.atransform_documents(
                documents
            )
            assert len(transformed_documents) == 2
            assert (
                transformed_documents[0].metadata["nuclia"]["file"]["language"] == "en"
            )
            assert (
                len(
                    transformed_documents[1].metadata["nuclia"]["metadata"]["metadata"][
                        "metadata"
                    ]["paragraphs"]
                )
                == 1
            )

@ -0,0 +1,110 @@
import base64
import json
import os
from pathlib import Path
from typing import Any
from unittest import mock

import pytest

from langchain.tools.nuclia.tool import NucliaUnderstandingAPI

README_PATH = Path(__file__).parents[4] / "README.md"


class FakeUploadResponse:
    status_code = 200
    text = "fake_uuid"


class FakePushResponse:
    status_code = 200

    def json(self) -> Any:
        return {"uuid": "fake_uuid"}


class FakePullResponse:
    status_code = 200

    def json(self) -> Any:
        return {
            "status": "ok",
            "payload": base64.b64encode(bytes('{"some": "data"}', "utf-8")),
        }


def FakeParseFromString(**args: Any) -> Any:
    def ParseFromString(self: Any, data: str) -> None:
        self.uuid = "fake_uuid"

    return ParseFromString


def fakepost(**kwargs: Any) -> Any:
    def fn(url: str, **kwargs: Any) -> Any:
        if url.endswith("/processing/upload"):
            return FakeUploadResponse()
        elif url.endswith("/processing/push"):
            return FakePushResponse()
        else:
            raise Exception("Invalid POST URL")

    return fn


def fakeget(**kwargs: Any) -> Any:
    def fn(url: str, **kwargs: Any) -> Any:
        if url.endswith("/processing/pull"):
            return FakePullResponse()
        else:
            raise Exception("Invalid GET URL")

    return fn


@mock.patch.dict(os.environ, {"NUCLIA_NUA_KEY": "_a_key_"})
@pytest.mark.requires("nucliadb_protos")
def test_nuclia_tool() -> None:
    with mock.patch(
        "nucliadb_protos.writer_pb2.BrokerMessage.ParseFromString",
        new_callable=FakeParseFromString,
    ):
        with mock.patch("requests.post", new_callable=fakepost):
            with mock.patch("requests.get", new_callable=fakeget):
                nua = NucliaUnderstandingAPI(enable_ml=False)
                uuid = nua.run(
                    {
                        "action": "push",
                        "id": "1",
                        "path": str(README_PATH),
                        "text": None,
                    }
                )
                assert uuid == "fake_uuid"
                data = nua.run(
                    {"action": "pull", "id": "1", "path": None, "text": None}
                )
                assert json.loads(data)["uuid"] == "fake_uuid"


@pytest.mark.asyncio
@pytest.mark.requires("nucliadb_protos")
async def test_async_call() -> None:
    with mock.patch(
        "nucliadb_protos.writer_pb2.BrokerMessage.ParseFromString",
        new_callable=FakeParseFromString,
    ):
        with mock.patch("requests.post", new_callable=fakepost):
            with mock.patch("requests.get", new_callable=fakeget):
                with mock.patch("os.environ.get", return_value="_a_key_"):
                    nua = NucliaUnderstandingAPI(enable_ml=False)
                    data = await nua.arun(
                        {
                            "action": "push",
                            "id": "1",
                            "path": str(README_PATH),
                            "text": None,
                        }
                    )
                    assert json.loads(data)["uuid"] == "fake_uuid"