DirectoryLoader parallel file loading

parallel_dir_loader
blob42 1 year ago
parent ed0d557ede
commit c78075d0bb

@ -3,15 +3,15 @@ import logging
from pathlib import Path from pathlib import Path
from typing import List, Type, Union from typing import List, Type, Union
from langchain.docstore.document import Document from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseLoader from langchain.document_loaders.base import BaseLoader
from langchain.document_loaders.html_bs import BSHTMLLoader from langchain.document_loaders.html_bs import BSHTMLLoader
from langchain.document_loaders.text import TextLoader from langchain.document_loaders.text import TextLoader
from langchain.document_loaders.unstructured import UnstructuredFileLoader from langchain.document_loaders.unstructured import UnstructuredFileLoader
FILE_LOADER_TYPE = Union[ FILE_LOADER_TYPE = Union[Type[UnstructuredFileLoader], Type[TextLoader],
Type[UnstructuredFileLoader], Type[TextLoader], Type[BSHTMLLoader] Type[BSHTMLLoader]]
]
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -36,6 +36,7 @@ class DirectoryLoader(BaseLoader):
loader_kwargs: Union[dict, None] = None, loader_kwargs: Union[dict, None] = None,
recursive: bool = False, recursive: bool = False,
show_progress: bool = False, show_progress: bool = False,
parallel: bool = False,
): ):
"""Initialize with path to directory and how to glob over it.""" """Initialize with path to directory and how to glob over it."""
if loader_kwargs is None: if loader_kwargs is None:
@ -48,12 +49,28 @@ class DirectoryLoader(BaseLoader):
self.silent_errors = silent_errors self.silent_errors = silent_errors
self.recursive = recursive self.recursive = recursive
self.show_progress = show_progress self.show_progress = show_progress
self.parallel = parallel
def _load_file(self, p: Path) -> List[Document]:
if p.is_file():
if _is_visible(p.relative_to(p)) or self.load_hidden:
try:
sub_docs = self.loader_cls(str(p),
**self.loader_kwargs).load()
return sub_docs
except Exception as e:
if self.silent_errors:
logger.warning(e)
else:
raise e
return []
def load(self) -> List[Document]: def load(self) -> List[Document]:
"""Load documents.""" """Load documents."""
p = Path(self.path) p = Path(self.path)
docs = [] docs = []
items = list(p.rglob(self.glob) if self.recursive else p.glob(self.glob)) items = list(
p.rglob(self.glob) if self.recursive else p.glob(self.glob))
pbar = None pbar = None
if self.show_progress: if self.show_progress:
@ -64,27 +81,22 @@ class DirectoryLoader(BaseLoader):
except ImportError as e: except ImportError as e:
logger.warning( logger.warning(
"To log the progress of DirectoryLoader you need to install tqdm, " "To log the progress of DirectoryLoader you need to install tqdm, "
"`pip install tqdm`" "`pip install tqdm`")
)
if self.silent_errors: if self.silent_errors:
logger.warning(e) logger.warning(e)
else: else:
raise e raise e
for i in items: if self.parallel:
if i.is_file(): results = process_map(self._load_file, items) # use default max_workers
if _is_visible(i.relative_to(p)) or self.load_hidden: else:
try: results = process_map(self._load_file, items, max_workers=1)
sub_docs = self.loader_cls(str(i), **self.loader_kwargs).load()
docs.extend(sub_docs) # Combine the results
except Exception as e: for sub_docs in results:
if self.silent_errors: docs.extend(sub_docs)
logger.warning(e) if pbar:
else: pbar.update(1)
raise e
finally:
if pbar:
pbar.update(1)
if pbar: if pbar:
pbar.close() pbar.close()

Loading…
Cancel
Save