From 44ae673388d572a7f64f90091ec6f0ae7424f153 Mon Sep 17 00:00:00 2001 From: Harrison Chase Date: Sat, 13 May 2023 21:46:02 -0700 Subject: [PATCH] Harrison/multithreading directory loader (#4650) Co-authored-by: PawelFaron <42373772+PawelFaron@users.noreply.github.com> Co-authored-by: Pawel Faron --- .../examples/file_directory.ipynb | 28 ++++++++++ langchain/document_loaders/directory.py | 51 +++++++++++++------ 2 files changed, 63 insertions(+), 16 deletions(-) diff --git a/docs/modules/indexes/document_loaders/examples/file_directory.ipynb b/docs/modules/indexes/document_loaders/examples/file_directory.ipynb index 117284ca..996f8f9d 100644 --- a/docs/modules/indexes/document_loaders/examples/file_directory.ipynb +++ b/docs/modules/indexes/document_loaders/examples/file_directory.ipynb @@ -112,6 +112,34 @@ "docs = loader.load()" ] }, + { + "cell_type": "markdown", + "id": "c16ed46a", + "metadata": {}, + "source": [ + "## Use multithreading" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "5752e23e", + "metadata": {}, + "source": [ + "By default, loading happens in a single thread. To use several threads, set the `use_multithreading` flag to `True`." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f8d84f52", + "metadata": {}, + "outputs": [], + "source": [ + "loader = DirectoryLoader('../', glob=\"**/*.md\", use_multithreading=True)\n", + "docs = loader.load()" + ] + }, { "cell_type": "markdown", "id": "c5652850", diff --git a/langchain/document_loaders/directory.py b/langchain/document_loaders/directory.py index c180a3cd..cf1065f2 100644 --- a/langchain/document_loaders/directory.py +++ b/langchain/document_loaders/directory.py @@ -1,7 +1,8 @@ """Loading logic for loading documents from a directory.""" +import concurrent import logging from pathlib import Path -from typing import List, Type, Union +from typing import Any, List, Optional, Type, Union from langchain.docstore.document import Document from langchain.document_loaders.base import BaseLoader @@ -36,6 +37,8 @@ class DirectoryLoader(BaseLoader): loader_kwargs: Union[dict, None] = None, recursive: bool = False, show_progress: bool = False, + use_multithreading: bool = False, + max_concurrency: int = 4, ): """Initialize with path to directory and how to glob over it.""" if loader_kwargs is None: @@ -48,11 +51,30 @@ class DirectoryLoader(BaseLoader): self.silent_errors = silent_errors self.recursive = recursive self.show_progress = show_progress + self.use_multithreading = use_multithreading + self.max_concurrency = max_concurrency + + def load_file( + self, item: Path, path: Path, docs: List[Document], pbar: Optional[Any] + ) -> None: + if item.is_file(): + if _is_visible(item.relative_to(path)) or self.load_hidden: + try: + sub_docs = self.loader_cls(str(item), **self.loader_kwargs).load() + docs.extend(sub_docs) + except Exception as e: + if self.silent_errors: + logger.warning(e) + else: + raise e + finally: + if pbar: + pbar.update(1) def load(self) -> List[Document]: """Load documents.""" p = Path(self.path) - docs = [] + docs: List[Document] = [] items = list(p.rglob(self.glob) if self.recursive else p.glob(self.glob)) pbar 
= None @@ -71,22 +93,19 @@ class DirectoryLoader(BaseLoader): else: raise e - for i in items: - if i.is_file(): - if _is_visible(i.relative_to(p)) or self.load_hidden: - try: - sub_docs = self.loader_cls(str(i), **self.loader_kwargs).load() - docs.extend(sub_docs) - except Exception as e: - if self.silent_errors: - logger.warning(e) - else: - raise e - finally: - if pbar: - pbar.update(1) + if self.use_multithreading: + with concurrent.futures.ThreadPoolExecutor( + max_workers=self.max_concurrency + ) as executor: + executor.map(lambda i: self.load_file(i, p, docs, pbar), items) + else: + for i in items: + self.load_file(i, p, docs, pbar) if pbar: pbar.close() return docs + + +#