mirror of https://github.com/hwchase17/langchain
mongodb doc loader init (#10645)
- **Description:** A Document Loader for MongoDB - **Issue:** n/a - **Dependencies:** Motor, the async driver for MongoDB - **Tag maintainer:** n/a - **Twitter handle:** pigpenblue Note that an initial MongoDB document loader was created 4 months ago, but the [PR](https://github.com/langchain-ai/langchain/pull/4285) was never pulled in. @leo-gan had commented on that PR, but given it is extremely far behind the master branch and a ton has changed in LangChain since then (including repo name and structure), I rewrote the branch and issued a new PR with the expectation that the old one can be closed. Please reference that old PR for comments/context, but it can be closed in favor of this one. Thanks! --------- Co-authored-by: Bagatur <baskaryan@gmail.com> Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com> pull/11224/head^2
parent
523898ab9c
commit
715ffda28b
@ -0,0 +1,163 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"attachments": {},
|
||||
"cell_type": "markdown",
|
||||
"metadata": {
|
||||
"id": "vm8vn9t8DvC_"
|
||||
},
|
||||
"source": [
|
||||
"# MongoDB"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"[MongoDB](https://www.mongodb.com/) is a NoSQL , document-oriented database that supports JSON-like documents with a dynamic schema."
|
||||
]
|
||||
},
|
||||
{
|
||||
"attachments": {},
|
||||
"cell_type": "markdown",
|
||||
"metadata": {
|
||||
"id": "5WjXERXzFEhg"
|
||||
},
|
||||
"source": [
|
||||
"## Overview"
|
||||
]
|
||||
},
|
||||
{
|
||||
"attachments": {},
|
||||
"cell_type": "markdown",
|
||||
"metadata": {
|
||||
"id": "juAmbgoWD17u"
|
||||
},
|
||||
"source": [
|
||||
"The MongoDB Document Loader returns a list of Langchain Documents from a MongoDB database.\n",
|
||||
"\n",
|
||||
"The Loader requires the following parameters:\n",
|
||||
"\n",
|
||||
"* MongoDB connection string\n",
|
||||
"* MongoDB database name\n",
|
||||
"* MongoDB collection name\n",
|
||||
"* (Optional) Content Filter dictionary\n",
|
||||
"\n",
|
||||
"The output takes the following format:\n",
|
||||
"\n",
|
||||
"- pageContent= Mongo Document\n",
|
||||
"- metadata={'database': '[database_name]', 'collection': '[collection_name]'}"
|
||||
]
|
||||
},
|
||||
{
|
||||
"attachments": {},
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Load the Document Loader"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 24,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# add this import for running in jupyter notebook\n",
|
||||
"import nest_asyncio\n",
|
||||
"nest_asyncio.apply()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"from langchain.document_loaders.mongodb import MongodbLoader"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 25,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"loader = MongodbLoader(connection_string=\"mongodb://localhost:27017/\",\n",
|
||||
" db_name=\"sample_restaurants\", \n",
|
||||
" collection_name=\"restaurants\",\n",
|
||||
" filter_criteria={\"borough\": \"Bronx\", \"cuisine\": \"Bakery\" },\n",
|
||||
" ) "
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 26,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"data": {
|
||||
"text/plain": [
|
||||
"25359"
|
||||
]
|
||||
},
|
||||
"execution_count": 26,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"docs = loader.load()\n",
|
||||
"\n",
|
||||
"len(docs)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 27,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"data": {
|
||||
"text/plain": [
|
||||
"Document(page_content=\"{'_id': ObjectId('5eb3d668b31de5d588f4292a'), 'address': {'building': '2780', 'coord': [-73.98241999999999, 40.579505], 'street': 'Stillwell Avenue', 'zipcode': '11224'}, 'borough': 'Brooklyn', 'cuisine': 'American', 'grades': [{'date': datetime.datetime(2014, 6, 10, 0, 0), 'grade': 'A', 'score': 5}, {'date': datetime.datetime(2013, 6, 5, 0, 0), 'grade': 'A', 'score': 7}, {'date': datetime.datetime(2012, 4, 13, 0, 0), 'grade': 'A', 'score': 12}, {'date': datetime.datetime(2011, 10, 12, 0, 0), 'grade': 'A', 'score': 12}], 'name': 'Riviera Caterer', 'restaurant_id': '40356018'}\", metadata={'database': 'sample_restaurants', 'collection': 'restaurants'})"
|
||||
]
|
||||
},
|
||||
"execution_count": 27,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"docs[0]"
|
||||
]
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"colab": {
|
||||
"collapsed_sections": [
|
||||
"5WjXERXzFEhg"
|
||||
],
|
||||
"provenance": []
|
||||
},
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3 (ipykernel)",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.9.18"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 4
|
||||
}
|
@ -0,0 +1,76 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
from langchain.docstore.document import Document
|
||||
from langchain.document_loaders.base import BaseLoader
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MongodbLoader(BaseLoader):
    """Load MongoDB documents.

    Every document in the configured collection that matches
    ``filter_criteria`` is turned into one ``Document`` whose
    ``page_content`` is ``str(doc)`` and whose metadata holds the database
    and collection names.
    """

    def __init__(
        self,
        connection_string: str,
        db_name: str,
        collection_name: str,
        *,
        filter_criteria: Optional[Dict] = None,
    ) -> None:
        """Create the Motor client and look up the target collection.

        Args:
            connection_string: Connection string handed to
                ``AsyncIOMotorClient``.
            db_name: Name of the database to read from.
            collection_name: Name of the collection to read from.
            filter_criteria: Optional query filter passed to both
                ``count_documents`` and ``find``. ``None`` (or any falsy
                value) is replaced by an empty dict.

        Raises:
            ImportError: If ``motor`` cannot be imported.
            ValueError: If ``connection_string``, ``db_name`` or
                ``collection_name`` is empty.
        """
        # Imported lazily so the module can be imported without motor.
        try:
            from motor.motor_asyncio import AsyncIOMotorClient
        except ImportError as e:
            raise ImportError(
                "Cannot import from motor, please install with `pip install motor`."
            ) from e
        if not connection_string:
            raise ValueError("connection_string must be provided.")

        if not db_name:
            raise ValueError("db_name must be provided.")

        if not collection_name:
            raise ValueError("collection_name must be provided.")

        self.client = AsyncIOMotorClient(connection_string)
        self.db_name = db_name
        self.collection_name = collection_name
        self.filter_criteria = filter_criteria or {}

        self.db = self.client.get_database(db_name)
        self.collection = self.db.get_collection(collection_name)

    def load(self) -> List[Document]:
        """Load data into Document objects.

        Attention:

        This implementation starts an asyncio event loop which
        will only work if running in a sync env. In an async env, it should
        fail since there is already an event loop running.

        This code should be updated to kick off the event loop from a separate
        thread if running within an async context.
        """
        return asyncio.run(self.aload())

    async def aload(self) -> List[Document]:
        """Load data into Document objects.

        Returns:
            One ``Document`` per document yielded by ``find``. A warning is
            logged when that number differs from the count reported by
            ``count_documents`` just before iteration.
        """
        total_docs = await self.collection.count_documents(self.filter_criteria)

        # A fresh metadata dict is built per Document so that mutating one
        # document's metadata never affects the others.
        result = [
            Document(
                page_content=str(doc),
                metadata={
                    "database": self.db_name,
                    "collection": self.collection_name,
                },
            )
            async for doc in self.collection.find(self.filter_criteria)
        ]

        if len(result) != total_docs:
            # Lazy %-style arguments: the message is only formatted when the
            # warning is actually emitted.
            logger.warning(
                "Only partial collection of documents returned. Loaded %s "
                "docs, expected %s.",
                len(result),
                total_docs,
            )

        return result
|
@ -0,0 +1,60 @@
|
||||
from typing import Dict, List
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from langchain.docstore.document import Document
|
||||
from langchain.document_loaders.mongodb import MongodbLoader
|
||||
|
||||
|
||||
@pytest.fixture
def raw_docs() -> List[Dict]:
    """Return two raw dict documents with string ids "1" and "2".

    Each document carries an ``address`` sub-document whose ``building`` and
    ``room`` values equal the document id.
    """
    return [
        {"_id": key, "address": {"building": key, "room": key}}
        for key in ("1", "2")
    ]
|
||||
|
||||
|
||||
@pytest.fixture
def expected_documents() -> List[Document]:
    """Return the two Documents the tests compare loader output against.

    Each page content is the ``str()`` form of a two-field dict, and every
    Document gets its own metadata dict naming the database and collection.
    """
    page_contents = (
        "{'_id': '1', 'address': {'building': '1', 'room': '1'}}",
        "{'_id': '2', 'address': {'building': '2', 'room': '2'}}",
    )
    return [
        Document(
            page_content=text,
            metadata={"database": "sample_restaurants", "collection": "restaurants"},
        )
        for text in page_contents
    ]
|
||||
|
||||
|
||||
@pytest.mark.requires("motor")
@pytest.mark.asyncio
async def test_load_mocked(
    raw_docs: List[Dict], expected_documents: List[Document]
) -> None:
    """Run the real ``aload`` against a mocked collection.

    ``aload`` itself is not patched: patching it would make the test assert
    only on the mock's own return value. Only the Motor client and the
    collection are replaced, so the conversion from raw documents to
    ``Document`` objects is actually executed.
    """
    # ``find`` is called synchronously and its result is consumed with
    # ``async for``, so the cursor must support async iteration.
    mock_cursor = MagicMock()
    mock_cursor.__aiter__.return_value = raw_docs

    mock_collection = MagicMock()
    mock_collection.find = MagicMock(return_value=mock_cursor)
    # ``count_documents`` is awaited by ``aload``, so it must be awaitable.
    mock_collection.count_documents = AsyncMock(return_value=len(raw_docs))

    with patch("motor.motor_asyncio.AsyncIOMotorClient", return_value=MagicMock()):
        # Database/collection names match the metadata in
        # ``expected_documents`` so the comparison below is meaningful.
        loader = MongodbLoader(
            "mongodb://localhost:27017", "sample_restaurants", "restaurants"
        )
        loader.collection = mock_collection
        documents = await loader.aload()

    # No filter was supplied, so both calls receive the empty default filter.
    mock_collection.count_documents.assert_called_once_with({})
    mock_collection.find.assert_called_once_with({})
    assert documents == expected_documents
|
Loading…
Reference in New Issue