Harrison/multithreading directory loader (#4650)

Co-authored-by: PawelFaron <42373772+PawelFaron@users.noreply.github.com>
Co-authored-by: Pawel Faron <ext-pawel.faron@vaisala.com>
pull/4662/head
Harrison Chase 1 year ago committed by GitHub
parent b0c733e327
commit 44ae673388
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -112,6 +112,34 @@
"docs = loader.load()" "docs = loader.load()"
] ]
}, },
{
"cell_type": "markdown",
"id": "c16ed46a",
"metadata": {},
"source": [
"## Use multithreading"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "5752e23e",
"metadata": {},
"source": [
"By default the loading happens in one thread. In order to utilize several threads set the `use_multithreading` flag to true."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f8d84f52",
"metadata": {},
"outputs": [],
"source": [
"loader = DirectoryLoader('../', glob=\"**/*.md\", use_multithreading=True)\n",
"docs = loader.load()"
]
},
{ {
"cell_type": "markdown", "cell_type": "markdown",
"id": "c5652850", "id": "c5652850",

@ -1,7 +1,8 @@
"""Loading logic for loading documents from a directory.""" """Loading logic for loading documents from a directory."""
import concurrent
import logging import logging
from pathlib import Path from pathlib import Path
from typing import List, Type, Union from typing import Any, List, Optional, Type, Union
from langchain.docstore.document import Document from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseLoader from langchain.document_loaders.base import BaseLoader
@ -36,6 +37,8 @@ class DirectoryLoader(BaseLoader):
loader_kwargs: Union[dict, None] = None, loader_kwargs: Union[dict, None] = None,
recursive: bool = False, recursive: bool = False,
show_progress: bool = False, show_progress: bool = False,
use_multithreading: bool = False,
max_concurrency: int = 4,
): ):
"""Initialize with path to directory and how to glob over it.""" """Initialize with path to directory and how to glob over it."""
if loader_kwargs is None: if loader_kwargs is None:
@ -48,11 +51,30 @@ class DirectoryLoader(BaseLoader):
self.silent_errors = silent_errors self.silent_errors = silent_errors
self.recursive = recursive self.recursive = recursive
self.show_progress = show_progress self.show_progress = show_progress
self.use_multithreading = use_multithreading
self.max_concurrency = max_concurrency
def load_file(
    self, item: Path, path: Path, docs: List[Document], pbar: Optional[Any]
) -> None:
    """Load one directory entry and append its documents to ``docs``.

    Entries that are not regular files are ignored. Entries for which
    ``_is_visible`` returns a false value are ignored as well, unless
    ``self.load_hidden`` is set.

    Args:
        item: Path of the candidate entry.
        path: Root directory; ``item`` is made relative to it before the
            visibility check.
        docs: List that is extended in place with the loaded documents.
        pbar: Optional progress bar. ``pbar.update(1)`` is called once per
            load attempt, whether the attempt succeeds or fails.

    Raises:
        Exception: Whatever the loader raises, unless ``self.silent_errors``
            is set, in which case the error is logged as a warning instead.
    """
    if not item.is_file():
        return
    if not (_is_visible(item.relative_to(path)) or self.load_hidden):
        return

    try:
        sub_docs = self.loader_cls(str(item), **self.loader_kwargs).load()
        # NOTE(review): `docs` may be shared between worker threads when the
        # caller runs this method in a thread pool; this relies on
        # list.extend being safe for that use -- confirm.
        docs.extend(sub_docs)
    except Exception as e:
        if not self.silent_errors:
            # Bare raise re-raises the active exception unchanged.
            raise
        logger.warning(e)
    finally:
        # Compare against None rather than testing truthiness: a progress
        # bar object may define __bool__ (tqdm is falsy when total == 0 and
        # raises TypeError when neither total nor iterable is known).
        if pbar is not None:
            pbar.update(1)
def load(self) -> List[Document]: def load(self) -> List[Document]:
"""Load documents.""" """Load documents."""
p = Path(self.path) p = Path(self.path)
docs = [] docs: List[Document] = []
items = list(p.rglob(self.glob) if self.recursive else p.glob(self.glob)) items = list(p.rglob(self.glob) if self.recursive else p.glob(self.glob))
pbar = None pbar = None
@ -71,22 +93,19 @@ class DirectoryLoader(BaseLoader):
else: else:
raise e raise e
for i in items: if self.use_multithreading:
if i.is_file(): with concurrent.futures.ThreadPoolExecutor(
if _is_visible(i.relative_to(p)) or self.load_hidden: max_workers=self.max_concurrency
try: ) as executor:
sub_docs = self.loader_cls(str(i), **self.loader_kwargs).load() executor.map(lambda i: self.load_file(i, p, docs, pbar), items)
docs.extend(sub_docs) else:
except Exception as e: for i in items:
if self.silent_errors: self.load_file(i, p, docs, pbar)
logger.warning(e)
else:
raise e
finally:
if pbar:
pbar.update(1)
if pbar: if pbar:
pbar.close() pbar.close()
return docs return docs
#

Loading…
Cancel
Save