From 2363c02cf3838b611164c01c5767aff560e78e0a Mon Sep 17 00:00:00 2001
From: wemysschen <38650638+wemysschen@users.noreply.github.com>
Date: Thu, 12 Oct 2023 05:43:48 +0800
Subject: [PATCH] Bos loader (#11525)

**Description:** Add BaiduCloud BOS document loader.

---------

Co-authored-by: chenweixu01
Co-authored-by: root
Co-authored-by: Bagatur
---
 .../baiducloud_bos_directory.py               | 57 ++++++++++++++++++
 .../document_loaders/baiducloud_bos_file.py   | 60 +++++++++++++++++++
 2 files changed, 117 insertions(+)
 create mode 100644 libs/langchain/langchain/document_loaders/baiducloud_bos_directory.py
 create mode 100644 libs/langchain/langchain/document_loaders/baiducloud_bos_file.py

diff --git a/libs/langchain/langchain/document_loaders/baiducloud_bos_directory.py b/libs/langchain/langchain/document_loaders/baiducloud_bos_directory.py
new file mode 100644
index 0000000000..2dd23a60c9
--- /dev/null
+++ b/libs/langchain/langchain/document_loaders/baiducloud_bos_directory.py
@@ -0,0 +1,57 @@
+from typing import Any, Iterator, List
+
+from langchain.docstore.document import Document
+from langchain.document_loaders.base import BaseLoader
+
+
+class BaiduBOSDirectoryLoader(BaseLoader):
+    """Load from `Baidu BOS directory`."""
+
+    def __init__(self, conf: Any, bucket: str, prefix: str = ""):
+        """Initialize with BOS config, bucket and prefix.
+
+        :param conf(BosConfig): BOS config.
+        :param bucket(str): BOS bucket.
+        :param prefix(str): prefix.
+        """
+        self.conf = conf
+        self.bucket = bucket
+        self.prefix = prefix
+
+    def load(self) -> List[Document]:
+        """Load all documents under the prefix into a list."""
+        return list(self.lazy_load())
+
+    def lazy_load(self) -> Iterator[Document]:
+        """Lazily load documents for each object listed under the prefix.
+
+        :raises ImportError: if `bce-python-sdk` is not installed.
+        """
+        try:
+            from baidubce.services.bos.bos_client import BosClient
+        except ImportError:
+            raise ImportError(
+                "Please install bce-python-sdk with `pip install bce-python-sdk`."
+            )
+        from langchain.document_loaders.baiducloud_bos_file import BaiduBOSFileLoader
+
+        client = BosClient(self.conf)
+        marker = ""
+        while True:
+            response = client.list_objects(
+                bucket_name=self.bucket,
+                prefix=self.prefix,
+                marker=marker,
+                max_keys=1000,
+            )
+            for content in response.contents:
+                key = str(content.key)
+                # Skip keys ending in "/" (presumably directory markers).
+                if key.endswith("/"):
+                    continue
+                loader = BaiduBOSFileLoader(self.conf, self.bucket, key)
+                yield from loader.load()
+            # Keep paging only while the listing reports more results.
+            if not response.is_truncated:
+                break
+            marker = response.next_marker
diff --git a/libs/langchain/langchain/document_loaders/baiducloud_bos_file.py b/libs/langchain/langchain/document_loaders/baiducloud_bos_file.py
new file mode 100644
index 0000000000..a5cd78e4f2
--- /dev/null
+++ b/libs/langchain/langchain/document_loaders/baiducloud_bos_file.py
@@ -0,0 +1,60 @@
+import logging
+import os
+import tempfile
+from typing import Any, Iterator, List
+
+from langchain.docstore.document import Document
+from langchain.document_loaders.base import BaseLoader
+from langchain.document_loaders.unstructured import UnstructuredFileLoader
+
+logger = logging.getLogger(__name__)
+
+
+class BaiduBOSFileLoader(BaseLoader):
+    """Load from `Baidu Cloud BOS` file."""
+
+    def __init__(self, conf: Any, bucket: str, key: str):
+        """Initialize with BOS config, bucket and key name.
+
+        :param conf(BceClientConfiguration): BOS config.
+        :param bucket(str): BOS bucket.
+        :param key(str): BOS file key.
+        """
+        self.conf = conf
+        self.bucket = bucket
+        self.key = key
+
+    def load(self) -> List[Document]:
+        """Load the object and return its documents as a list."""
+        return list(self.lazy_load())
+
+    def lazy_load(self) -> Iterator[Document]:
+        """Download the object to a temporary file and parse it.
+
+        A parsing failure is logged and a single empty document is returned.
+
+        :raises ImportError: if `bce-python-sdk` is not installed.
+        """
+        try:
+            from baidubce.services.bos.bos_client import BosClient
+        except ImportError:
+            raise ImportError(
+                "Please install bce-python-sdk with `pip install bce-python-sdk`."
+            )
+
+        # Initialize BOS Client
+        client = BosClient(self.conf)
+        with tempfile.TemporaryDirectory() as temp_dir:
+            file_path = f"{temp_dir}/{self.bucket}/{self.key}"
+            os.makedirs(os.path.dirname(file_path), exist_ok=True)
+            # Download the file to a destination
+            logger.debug(f"get object key {self.key} to file {file_path}")
+            client.get_object_to_file(self.bucket, self.key, file_path)
+            try:
+                loader = UnstructuredFileLoader(file_path)
+                documents = loader.load()
+                return iter(documents)
+            except Exception as ex:
+                # Best effort: keep the fallback but record the traceback.
+                logger.exception(f"load document error = {ex}")
+                return iter([Document(page_content="")])