DirectoryLoader parallel file loading

This commit is contained in:
blob42 2023-05-12 23:03:26 +02:00
parent 40bd9ebec7
commit c0cca393a7

View File

@ -3,15 +3,16 @@ import logging
from pathlib import Path from pathlib import Path
from typing import List, Type, Union from typing import List, Type, Union
from tqdm.contrib.concurrent import process_map
from langchain.docstore.document import Document from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseLoader from langchain.document_loaders.base import BaseLoader
from langchain.document_loaders.html_bs import BSHTMLLoader from langchain.document_loaders.html_bs import BSHTMLLoader
from langchain.document_loaders.text import TextLoader from langchain.document_loaders.text import TextLoader
from langchain.document_loaders.unstructured import UnstructuredFileLoader from langchain.document_loaders.unstructured import UnstructuredFileLoader
FILE_LOADER_TYPE = Union[ FILE_LOADER_TYPE = Union[Type[UnstructuredFileLoader], Type[TextLoader],
Type[UnstructuredFileLoader], Type[TextLoader], Type[BSHTMLLoader] Type[BSHTMLLoader]]
]
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -36,6 +37,7 @@ class DirectoryLoader(BaseLoader):
loader_kwargs: Union[dict, None] = None, loader_kwargs: Union[dict, None] = None,
recursive: bool = False, recursive: bool = False,
show_progress: bool = False, show_progress: bool = False,
parallel: bool = False,
): ):
"""Initialize with path to directory and how to glob over it.""" """Initialize with path to directory and how to glob over it."""
if loader_kwargs is None: if loader_kwargs is None:
@ -48,12 +50,28 @@ class DirectoryLoader(BaseLoader):
self.silent_errors = silent_errors self.silent_errors = silent_errors
self.recursive = recursive self.recursive = recursive
self.show_progress = show_progress self.show_progress = show_progress
self.parallel = parallel
def _load_file(self, p: Path) -> List[Document]:
if p.is_file():
if _is_visible(p.relative_to(p)) or self.load_hidden:
try:
sub_docs = self.loader_cls(str(p),
**self.loader_kwargs).load()
return sub_docs
except Exception as e:
if self.silent_errors:
logger.warning(e)
else:
raise e
return []
def load(self) -> List[Document]: def load(self) -> List[Document]:
"""Load documents.""" """Load documents."""
p = Path(self.path) p = Path(self.path)
docs = [] docs = []
items = list(p.rglob(self.glob) if self.recursive else p.glob(self.glob)) items = list(
p.rglob(self.glob) if self.recursive else p.glob(self.glob))
pbar = None pbar = None
if self.show_progress: if self.show_progress:
@ -64,25 +82,20 @@ class DirectoryLoader(BaseLoader):
except ImportError as e: except ImportError as e:
logger.warning( logger.warning(
"To log the progress of DirectoryLoader you need to install tqdm, " "To log the progress of DirectoryLoader you need to install tqdm, "
"`pip install tqdm`" "`pip install tqdm`")
)
if self.silent_errors: if self.silent_errors:
logger.warning(e) logger.warning(e)
else: else:
raise e raise e
for i in items: if self.parallel:
if i.is_file(): results = process_map(self._load_file, items) # use default max_workers
if _is_visible(i.relative_to(p)) or self.load_hidden:
try:
sub_docs = self.loader_cls(str(i), **self.loader_kwargs).load()
docs.extend(sub_docs)
except Exception as e:
if self.silent_errors:
logger.warning(e)
else: else:
raise e results = process_map(self._load_file, items, max_workers=1)
finally:
# Combine the results
for sub_docs in results:
docs.extend(sub_docs)
if pbar: if pbar:
pbar.update(1) pbar.update(1)