mirror of https://github.com/hwchase17/langchain
Bagatur/lakefs loader2 (#12524)
Co-authored-by: Jonathan Rosenberg <96974219+Jonathan-Rosenberg@users.noreply.github.com>pull/12590/head
parent
3243dcc83e
commit
9bedda50f2
@ -0,0 +1,103 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# lakeFS\n",
|
||||
"\n",
|
||||
">[lakeFS](https://docs.lakefs.io/) provides scalable version control over the data lake, and uses Git-like semantics to create and access those versions.\n",
|
||||
"\n",
|
||||
"This notebooks covers how to load document objects from a `lakeFS` path (whether it's an object or a prefix).\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"source": [
|
||||
"## Initializing the lakeFS loader\n",
|
||||
"\n",
|
||||
"Replace `ENDPOINT`, `LAKEFS_ACCESS_KEY`, and `LAKEFS_SECRET_KEY` values with your own."
|
||||
],
|
||||
"metadata": {
|
||||
"collapsed": false
|
||||
}
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"from langchain.document_loaders import LakeFSLoader"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"ENDPOINT = \"\"\n",
|
||||
"LAKEFS_ACCESS_KEY = \"\"\n",
|
||||
"LAKEFS_SECRET_KEY = \"\"\n",
|
||||
"\n",
|
||||
"lakefs_loader = LakeFSLoader(\n",
|
||||
" lakefs_access_key=LAKEFS_ACCESS_KEY,\n",
|
||||
" lakefs_secret_key=LAKEFS_SECRET_KEY,\n",
|
||||
" lakefs_endpoint=ENDPOINT,\n",
|
||||
")"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"source": [
|
||||
"## Specifying a path\n",
|
||||
"You can specify a prefix or a complete object path to control which files to load.\n",
|
||||
"\n",
|
||||
"Specify the repository, reference (branch, commit id, or tag), and path in the corresponding `REPO`, `REF`, and `PATH` to load the documents from:"
|
||||
],
|
||||
"metadata": {
|
||||
"collapsed": false
|
||||
}
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"REPO = \"\"\n",
|
||||
"REF = \"\"\n",
|
||||
"PATH = \"\"\n",
|
||||
"\n",
|
||||
"lakefs_loader.set_repo(REPO)\n",
|
||||
"lakefs_loader.set_ref(REF)\n",
|
||||
"lakefs_loader.set_path(PATH)\n",
|
||||
"\n",
|
||||
"docs = lakefs_loader.load()\n",
|
||||
"docs"
|
||||
]
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3 (ipykernel)",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.9.1"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 2
|
||||
}
|
@ -0,0 +1,179 @@
|
||||
import os
|
||||
import tempfile
|
||||
import urllib.parse
|
||||
from typing import Any, List, Optional
|
||||
from urllib.parse import urljoin
|
||||
|
||||
import requests
|
||||
from requests.auth import HTTPBasicAuth
|
||||
|
||||
from langchain.document_loaders.base import BaseLoader
|
||||
from langchain.document_loaders.unstructured import UnstructuredBaseLoader
|
||||
from langchain.schema import Document
|
||||
|
||||
|
||||
class LakeFSClient:
    """Minimal HTTP client for the lakeFS REST API (v1).

    Verifies connectivity on construction via the ``healthcheck`` endpoint.
    """

    def __init__(
        self,
        lakefs_access_key: str,
        lakefs_secret_key: str,
        lakefs_endpoint: str,
    ):
        """
        :param lakefs_access_key: lakeFS server's access key
        :param lakefs_secret_key: lakeFS server's secret key
        :param lakefs_endpoint: lakeFS server's endpoint address,
            ex: https://example.my-lakefs.com
        :raises ValueError: if the lakeFS server isn't reachable
        """
        # The trailing slash matters: urljoin() resolves relative paths
        # against the last path segment, so ".../api/v1/" + "healthcheck"
        # yields ".../api/v1/healthcheck".
        self.__endpoint = "/".join([lakefs_endpoint, "api", "v1/"])
        self.__auth = HTTPBasicAuth(lakefs_access_key, lakefs_secret_key)
        try:
            health_check = requests.get(
                urljoin(self.__endpoint, "healthcheck"), auth=self.__auth
            )
            health_check.raise_for_status()
        except Exception as e:
            # Chain the underlying error so callers can inspect the root cause.
            raise ValueError(
                "lakeFS server isn't accessible. Make sure lakeFS is running."
            ) from e

    def ls_objects(
        self, repo: str, ref: str, path: str, presign: Optional[bool]
    ) -> List:
        """List objects under ``path`` at ``ref`` in ``repo``.

        :param presign: request pre-signed physical addresses when truthy
        :return: list of ``(logical_path, physical_address)`` tuples
        :raises requests.HTTPError: on a non-2xx response
        """
        # NOTE(review): repo/ref are interpolated into the URL unescaped —
        # assumes they are valid lakeFS identifiers; confirm against callers.
        qp = {"prefix": path, "presign": presign}
        eqp = urllib.parse.urlencode(qp)
        objects_ls_endpoint = urljoin(
            self.__endpoint, f"repositories/{repo}/refs/{ref}/objects/ls?{eqp}"
        )
        olsr = requests.get(objects_ls_endpoint, auth=self.__auth)
        olsr.raise_for_status()
        olsr_json = olsr.json()
        # Comprehension instead of map(lambda ...) for readability.
        return [
            (res["path"], res["physical_address"]) for res in olsr_json["results"]
        ]

    def is_presign_supported(self) -> bool:
        """Return True if the server's blockstore supports pre-signed URLs."""
        # Use urljoin for consistency with the other endpoint builders.
        config_endpoint = urljoin(self.__endpoint, "config")
        response = requests.get(config_endpoint, auth=self.__auth)
        response.raise_for_status()
        config = response.json()
        return config["storage_config"]["pre_sign_support"]
|
||||
|
||||
|
||||
class LakeFSLoader(BaseLoader):
    """Load from `lakeFS`."""

    # Target repository, ref, and path — may also be set after construction
    # via the set_* methods.
    repo: str
    ref: str
    path: str

    def __init__(
        self,
        lakefs_access_key: str,
        lakefs_secret_key: str,
        lakefs_endpoint: str,
        repo: Optional[str] = None,
        ref: Optional[str] = "main",
        path: Optional[str] = "",
    ):
        """
        :param lakefs_access_key: [required] lakeFS server's access key
        :param lakefs_secret_key: [required] lakeFS server's secret key
        :param lakefs_endpoint: [required] lakeFS server's endpoint address,
            ex: https://example.my-lakefs.com
        :param repo: [optional, default = ''] target repository
        :param ref: [optional, default = 'main'] target ref (branch name,
            tag, or commit ID)
        :param path: [optional, default = ''] target path
        """
        self.__lakefs_client = LakeFSClient(
            lakefs_access_key, lakefs_secret_key, lakefs_endpoint
        )
        # Normalize None/empty values to the documented defaults.
        self.repo = str(repo) if repo else ""
        self.ref = str(ref) if ref else "main"
        self.path = str(path) if path is not None else ""

    def set_path(self, path: str) -> None:
        """Set the target path."""
        self.path = path

    def set_ref(self, ref: str) -> None:
        """Set the target ref (branch name, tag, or commit ID)."""
        self.ref = ref

    def set_repo(self, repo: str) -> None:
        """Set the target repository."""
        self.repo = repo

    def load(self) -> List[Document]:
        """Load every object under the configured path as documents.

        :raises ValueError: if repo, ref, or path is not configured
        """
        self.__validate_instance()
        presigned = self.__lakefs_client.is_presign_supported()
        objs = self.__lakefs_client.ls_objects(
            repo=self.repo, ref=self.ref, path=self.path, presign=presigned
        )
        docs: List[Document] = []
        # Each listed object is (logical path, physical address); hand each
        # one to an UnstructuredLakeFSLoader for parsing.
        for obj_path, physical_address in objs:
            sub_loader = UnstructuredLakeFSLoader(
                physical_address, self.repo, self.ref, obj_path, presigned
            )
            docs.extend(sub_loader.load())
        return docs

    def __validate_instance(self) -> None:
        """Raise ValueError when repo/ref/path are not usable."""
        if not self.repo:
            raise ValueError(
                "no repository was provided. use `set_repo` to specify a repository"
            )
        if not self.ref:
            raise ValueError("no ref was provided. use `set_ref` to specify a ref")
        if self.path is None:
            raise ValueError("no path was provided. use `set_path` to specify a path")
|
||||
|
||||
|
||||
class UnstructuredLakeFSLoader(UnstructuredBaseLoader):
    """Load a single lakeFS object's content via `unstructured`."""

    def __init__(
        self,
        url: str,
        repo: str,
        ref: str = "main",
        path: str = "",
        presign: bool = True,
        **unstructured_kwargs: Any,
    ):
        """
        :param url: the object's physical address — a pre-signed HTTP(S) URL,
            or a ``local://`` address when pre-signing is unsupported
        :param repo: the repository the object belongs to
        :param ref: ref (branch name, tag, or commit ID) the object was
            listed from
        :param path: the object's logical path inside the repository
        :param presign: whether ``url`` is a pre-signed URL
        :param unstructured_kwargs: forwarded to ``UnstructuredBaseLoader``
        """
        super().__init__(**unstructured_kwargs)
        self.url = url
        self.repo = repo
        self.ref = ref
        self.path = path
        self.presign = presign

    def _get_metadata(self) -> dict:
        """Metadata attached to every produced document."""
        return {"repo": self.repo, "ref": self.ref, "path": self.path}

    def _get_elements(self) -> List:
        """Fetch the object and partition it with `unstructured`."""
        from unstructured.partition.auto import partition

        local_prefix = "local://"

        if self.presign:
            # Download the pre-signed URL into a temp file so `unstructured`
            # can infer the file type from its name. TemporaryDirectory
            # already creates the directory, so no makedirs is needed.
            with tempfile.TemporaryDirectory() as temp_dir:
                file_path = f"{temp_dir}/{self.path.split('/')[-1]}"
                response = requests.get(self.url)
                response.raise_for_status()
                with open(file_path, mode="wb") as file:
                    file.write(response.content)
                return partition(filename=file_path)
        elif not self.url.startswith(local_prefix):
            raise ValueError(
                "Non pre-signed URLs are supported only with 'local' blockstore"
            )
        else:
            # 'local' blockstore: the physical address is a filesystem path.
            local_path = self.url[len(local_prefix) :]
            return partition(filename=local_path)
|
@ -0,0 +1,71 @@
|
||||
import unittest
|
||||
from typing import Any
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
import requests_mock
|
||||
from requests_mock.mocker import Mocker
|
||||
|
||||
from langchain.document_loaders.lakefs import LakeFSLoader
|
||||
|
||||
|
||||
@pytest.fixture
def mock_lakefs_client() -> Any:
    """Stub LakeFSClient that lists one object behind a pre-signed URL."""
    with patch("langchain.document_loaders.lakefs.LakeFSClient") as patched:
        client = patched.return_value
        client.ls_objects.return_value = [
            ("path_bla.txt", "https://physical_address_bla")
        ]
        client.is_presign_supported.return_value = True
        yield client
|
||||
|
||||
|
||||
@pytest.fixture
def mock_lakefs_client_no_presign_not_local() -> Any:
    """Stub LakeFSClient: pre-signing unsupported, non-local address."""
    with patch("langchain.document_loaders.lakefs.LakeFSClient") as patched:
        client = patched.return_value
        client.ls_objects.return_value = [
            ("path_bla.txt", "https://physical_address_bla")
        ]
        client.is_presign_supported.return_value = False
        yield client
|
||||
|
||||
|
||||
@pytest.fixture
def mock_unstructured_local() -> Any:
    """Stub UnstructuredLakeFSLoader so no real parsing happens."""
    with patch(
        "langchain.document_loaders.lakefs.UnstructuredLakeFSLoader"
    ) as patched:
        loader = patched.return_value
        loader.load.return_value = [
            ("text content", "pdf content")
        ]
        yield loader
|
||||
|
||||
|
||||
@pytest.fixture
def mock_lakefs_client_no_presign_local() -> Any:
    """Stub LakeFSClient: pre-signing unsupported, local:// address."""
    with patch("langchain.document_loaders.lakefs.LakeFSClient") as patched:
        client = patched.return_value
        client.ls_objects.return_value = [
            ("path_bla.txt", "local:///physical_address_bla")
        ]
        client.is_presign_supported.return_value = False
        yield client
|
||||
|
||||
|
||||
class TestLakeFSLoader(unittest.TestCase):
    """Tests for LakeFSLoader with a mocked lakeFS client."""

    lakefs_access_key = "lakefs_access_key"
    lakefs_secret_key = "lakefs_secret_key"
    endpoint = "endpoint"
    repo = "repo"
    ref = "ref"
    path = "path"

    @requests_mock.Mocker()
    @pytest.mark.usefixtures("mock_lakefs_client")
    def test_presigned_loading(self, mocker: Mocker) -> None:
        """Loading via a pre-signed URL completes without error."""
        mocker.register_uri("GET", requests_mock.ANY, text="data")
        loader = LakeFSLoader(
            lakefs_access_key=self.lakefs_access_key,
            lakefs_secret_key=self.lakefs_secret_key,
            lakefs_endpoint=self.endpoint,
        )
        loader.set_repo(self.repo)
        loader.set_ref(self.ref)
        loader.set_path(self.path)
        loader.load()
|
@ -0,0 +1,88 @@
|
||||
import unittest
|
||||
from typing import Any
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
import requests_mock
|
||||
from requests_mock.mocker import Mocker
|
||||
|
||||
from langchain.document_loaders.lakefs import LakeFSLoader
|
||||
|
||||
|
||||
@pytest.fixture
def mock_lakefs_client() -> Any:
    """Stub LakeFSClient that lists one object behind a pre-signed URL."""
    with patch("langchain.document_loaders.lakefs.LakeFSClient") as patched:
        client = patched.return_value
        client.ls_objects.return_value = [
            ("path_bla.txt", "https://physical_address_bla")
        ]
        client.is_presign_supported.return_value = True
        yield client
|
||||
|
||||
|
||||
@pytest.fixture
def mock_lakefs_client_no_presign_not_local() -> Any:
    """Stub LakeFSClient: pre-signing unsupported, non-local address."""
    with patch("langchain.document_loaders.lakefs.LakeFSClient") as patched:
        client = patched.return_value
        client.ls_objects.return_value = [
            ("path_bla.txt", "https://physical_address_bla")
        ]
        client.is_presign_supported.return_value = False
        yield client
|
||||
|
||||
|
||||
@pytest.fixture
def mock_unstructured_local() -> Any:
    """Stub UnstructuredLakeFSLoader so no real parsing happens."""
    with patch(
        "langchain.document_loaders.lakefs.UnstructuredLakeFSLoader"
    ) as patched:
        loader = patched.return_value
        loader.load.return_value = [
            ("text content", "pdf content")
        ]
        yield loader
|
||||
|
||||
|
||||
@pytest.fixture
def mock_lakefs_client_no_presign_local() -> Any:
    """Stub LakeFSClient: pre-signing unsupported, local:// address."""
    with patch("langchain.document_loaders.lakefs.LakeFSClient") as patched:
        client = patched.return_value
        client.ls_objects.return_value = [
            ("path_bla.txt", "local:///physical_address_bla")
        ]
        client.is_presign_supported.return_value = False
        yield client
|
||||
|
||||
|
||||
class TestLakeFSLoader(unittest.TestCase):
    """Tests for LakeFSLoader when pre-signing is not supported."""

    lakefs_access_key = "lakefs_access_key"
    lakefs_secret_key = "lakefs_secret_key"
    endpoint = "endpoint"
    repo = "repo"
    ref = "ref"
    path = "path"

    @requests_mock.Mocker()
    @pytest.mark.usefixtures("mock_lakefs_client_no_presign_not_local")
    def test_non_presigned_loading_fail(self, mocker: Mocker) -> None:
        """A non-local address without pre-signing must raise ValueError."""
        mocker.register_uri(requests_mock.ANY, requests_mock.ANY, status_code=200)
        loader = LakeFSLoader(
            lakefs_access_key=self.lakefs_access_key,
            lakefs_secret_key=self.lakefs_secret_key,
            lakefs_endpoint=self.endpoint,
        )
        loader.set_repo(self.repo)
        loader.set_ref(self.ref)
        loader.set_path(self.path)
        with pytest.raises(ValueError):
            loader.load()

    @requests_mock.Mocker()
    @pytest.mark.usefixtures(
        "mock_lakefs_client_no_presign_local", "mock_unstructured_local"
    )
    def test_non_presigned_loading(self, mocker: Mocker) -> None:
        """A local:// address without pre-signing loads successfully."""
        mocker.register_uri(requests_mock.ANY, requests_mock.ANY, status_code=200)
        loader = LakeFSLoader(
            lakefs_access_key="lakefs_access_key",
            lakefs_secret_key="lakefs_secret_key",
            lakefs_endpoint=self.endpoint,
        )
        loader.set_repo(self.repo)
        loader.set_ref(self.ref)
        loader.set_path(self.path)
        loader.load()
|
Loading…
Reference in New Issue