Bos loader (#11525)

**Description:**
Add  BaiduCloud BOS document loader.

---------

Co-authored-by: chenweixu01 <chenweixu01@baidu.com>
Co-authored-by: root <root@icoding-cwx.bcc-szzj.baidu.com>
Co-authored-by: Bagatur <baskaryan@gmail.com>
pull/11684/head^2
wemysschen 1 year ago committed by GitHub
parent fbb82608cd
commit 2363c02cf3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,52 @@
from typing import Any, Iterator, List
from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseLoader
class BaiduBOSDirectoryLoader(BaseLoader):
"""Load from `Baidu BOS directory`."""
def __init__(self, conf: Any, bucket: str, prefix: str = ""):
"""Initialize with BOS config, bucket and prefix.
:param conf(BosConfig): BOS config.
:param bucket(str): BOS bucket.
:param prefix(str): prefix.
"""
self.conf = conf
self.bucket = bucket
self.prefix = prefix
def load(self) -> List[Document]:
return list(self.lazy_load())
def lazy_load(self) -> Iterator[Document]:
"""Load documents."""
try:
from baidubce.services.bos.bos_client import BosClient
except ImportError:
raise ImportError(
"Please install bce-python-sdk with `pip install bce-python-sdk`."
)
client = BosClient(self.conf)
contents = []
marker = ""
while True:
response = client.list_objects(
bucket_name=self.bucket,
prefix=self.prefix,
marker=marker,
max_keys=1000,
)
contents_len = len(response.contents)
contents.extend(response.contents)
if response.is_truncated or contents_len < int(str(response.max_keys)):
break
marker = response.next_marker
from baidu_bos_file import BaiduBOSFileLoader
for content in contents:
if str(content.key).endswith("/"):
continue
loader = BaiduBOSFileLoader(self.conf, self.bucket, str(content.key))
yield loader.load()[0]

@ -0,0 +1,53 @@
import logging
import os
import tempfile
from typing import Any, Iterator, List
from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseLoader
from langchain.document_loaders.unstructured import UnstructuredFileLoader
logger = logging.getLogger(__name__)
class BaiduBOSFileLoader(BaseLoader):
"""Load from `Baidu Cloud BOS` file."""
def __init__(self, conf: Any, bucket: str, key: str):
"""Initialize with BOS config, bucket and key name.
:param conf(BceClientConfiguration): BOS config.
:param bucket(str): BOS bucket.
:param key(str): BOS file key.
"""
self.conf = conf
self.bucket = bucket
self.key = key
def load(self) -> List[Document]:
return list(self.lazy_load())
def lazy_load(self) -> Iterator[Document]:
"""Load documents."""
try:
from baidubce.services.bos.bos_client import BosClient
except ImportError:
raise ImportError(
"Please using `pip install bce-python-sdk`"
+ " before import bos related package."
)
# Initialize BOS Client
client = BosClient(self.conf)
with tempfile.TemporaryDirectory() as temp_dir:
file_path = f"{temp_dir}/{self.bucket}/{self.key}"
os.makedirs(os.path.dirname(file_path), exist_ok=True)
# Download the file to a destination
logger.debug(f"get object key {self.key} to file {file_path}")
client.get_object_to_file(self.bucket, self.key, file_path)
try:
loader = UnstructuredFileLoader(file_path)
documents = loader.load()
return iter(documents)
except Exception as ex:
logger.error(f"load document error = {ex}")
return iter([Document(page_content="")])
Loading…
Cancel
Save