diff --git a/docs/extras/integrations/document_loaders/concurrent.ipynb b/docs/extras/integrations/document_loaders/concurrent.ipynb new file mode 100644 index 0000000000..197f0457fa --- /dev/null +++ b/docs/extras/integrations/document_loaders/concurrent.ipynb @@ -0,0 +1,94 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "23c6e167", + "metadata": {}, + "source": [ + "# Concurrent Loader\n", + "\n", + "Works just like the GenericLoader but concurrently for those who choose to optimize their workflow.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "6ff3fb1f", + "metadata": {}, + "outputs": [], + "source": [ + "from langchain.document_loaders import ConcurrentLoader" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "ce96fa20", + "metadata": {}, + "outputs": [], + "source": [ + "loader = ConcurrentLoader.from_filesystem('example_data/', glob=\"**/*.txt\")" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "06a6cf5d", + "metadata": {}, + "outputs": [], + "source": [ + "files = loader.load()" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "b87d3e58", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "2" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "len(files)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "668f1ee5", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.1" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/libs/langchain/langchain/document_loaders/__init__.py b/libs/langchain/langchain/document_loaders/__init__.py index bb285c1ef4..5d985ae280 100644 --- a/libs/langchain/langchain/document_loaders/__init__.py +++ b/libs/langchain/langchain/document_loaders/__init__.py @@ -28,6 +28,7 @@ from langchain.document_loaders.brave_search import BraveSearchLoader from langchain.document_loaders.browserless import BrowserlessLoader from langchain.document_loaders.chatgpt import ChatGPTLoader from langchain.document_loaders.college_confidential import CollegeConfidentialLoader +from langchain.document_loaders.concurrent import ConcurrentLoader from langchain.document_loaders.confluence import ConfluenceLoader from langchain.document_loaders.conllu import CoNLLULoader from langchain.document_loaders.csv_loader import CSVLoader, UnstructuredCSVLoader @@ -305,4 +306,5 @@ __all__ = [ "XorbitsLoader", "YoutubeAudioLoader", "YoutubeLoader", + "ConcurrentLoader", ] diff --git a/libs/langchain/langchain/document_loaders/concurrent.py b/libs/langchain/langchain/document_loaders/concurrent.py new file mode 100644 index 0000000000..545449527c --- /dev/null +++ b/libs/langchain/langchain/document_loaders/concurrent.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +import concurrent.futures +from pathlib import Path +from typing import Iterator, Literal, Optional, Sequence, Union + +from langchain.document_loaders.base import BaseBlobParser +from langchain.document_loaders.blob_loaders import BlobLoader, FileSystemBlobLoader +from langchain.document_loaders.generic import GenericLoader +from langchain.document_loaders.parsers.registry import get_parser +from langchain.schema import Document + +_PathLike = Union[str, Path] + +DEFAULT = Literal["default"] + + +class ConcurrentLoader(GenericLoader): + """ + A generic document loader that loads and parses documents concurrently. + """ + + def __init__( + self, blob_loader: BlobLoader, blob_parser: BaseBlobParser, num_workers: int = 4 + ) -> None: + super().__init__(blob_loader, blob_parser) + self.num_workers = num_workers + + def lazy_load( + self, + ) -> Iterator[Document]: + """Load documents lazily with concurrent parsing.""" + with concurrent.futures.ThreadPoolExecutor( + max_workers=self.num_workers + ) as executor: + futures = { + executor.submit(self.blob_parser.lazy_parse, blob) + for blob in self.blob_loader.yield_blobs() + } + for future in concurrent.futures.as_completed(futures): + yield from future.result() + + @classmethod + def from_filesystem( + cls, + path: _PathLike, + *, + glob: str = "**/[!.]*", + suffixes: Optional[Sequence[str]] = None, + show_progress: bool = False, + parser: Union[DEFAULT, BaseBlobParser] = "default", + num_workers: int = 4, + ) -> ConcurrentLoader: + """ + Create a concurrent generic document loader using a + filesystem blob loader. + """ + blob_loader = FileSystemBlobLoader( + path, glob=glob, suffixes=suffixes, show_progress=show_progress + ) + if isinstance(parser, str): + blob_parser = get_parser(parser) + else: + blob_parser = parser + return cls(blob_loader, blob_parser, num_workers) diff --git a/libs/langchain/tests/integration_tests/document_loaders/parsers/test_language.py b/libs/langchain/tests/integration_tests/document_loaders/parsers/test_language.py index 270bae886b..c0de58d683 100644 --- a/libs/langchain/tests/integration_tests/document_loaders/parsers/test_language.py +++ b/libs/langchain/tests/integration_tests/document_loaders/parsers/test_language.py @@ -2,6 +2,7 @@ from pathlib import Path import pytest +from langchain.document_loaders.concurrent import ConcurrentLoader from langchain.document_loaders.generic import GenericLoader from langchain.document_loaders.parsers import LanguageParser from langchain.text_splitter import Language @@ -131,3 +132,52 @@ def test_language_loader_for_javascript_with_parser_threshold() -> None: docs = loader.load() assert len(docs) == 1 + + +def test_concurrent_language_loader_for_javascript_with_parser_threshold() -> None: + """Test JavaScript ConcurrentLoader with parser enabled and below threshold.""" + file_path = Path(__file__).parent.parent.parent / "examples" + loader = ConcurrentLoader.from_filesystem( + file_path, + glob="hello_world.js", + parser=LanguageParser(language=Language.JS, parser_threshold=1000), + ) + docs = loader.load() + + assert len(docs) == 1 + + +def test_concurrent_language_loader_for_python_with_parser_threshold() -> None: + """Test Python ConcurrentLoader with parser enabled and below threshold.""" + file_path = Path(__file__).parent.parent.parent / "examples" + loader = ConcurrentLoader.from_filesystem( + file_path, + glob="hello_world.py", + parser=LanguageParser(language=Language.PYTHON, parser_threshold=1000), + ) + docs = loader.load() + + assert len(docs) == 1 + + +@pytest.mark.skipif(not esprima_installed(), reason="requires esprima package") +def test_concurrent_language_loader_for_javascript() -> None: + """Test JavaScript ConcurrentLoader with parser enabled.""" + file_path = Path(__file__).parent.parent.parent / "examples" + loader = ConcurrentLoader.from_filesystem( + file_path, glob="hello_world.js", parser=LanguageParser(parser_threshold=5) + ) + docs = loader.load() + + assert len(docs) == 3 + + +def test_concurrent_language_loader_for_python() -> None: + """Test Python ConcurrentLoader with parser enabled.""" + file_path = Path(__file__).parent.parent.parent / "examples" + loader = ConcurrentLoader.from_filesystem( + file_path, glob="hello_world.py", parser=LanguageParser(parser_threshold=5) + ) + docs = loader.load() + + assert len(docs) == 2