diff --git a/docs/modules/indexes/document_loaders/examples/alibaba_cloud_maxcompute.ipynb b/docs/modules/indexes/document_loaders/examples/alibaba_cloud_maxcompute.ipynb new file mode 100644 index 00000000..847a035e --- /dev/null +++ b/docs/modules/indexes/document_loaders/examples/alibaba_cloud_maxcompute.ipynb @@ -0,0 +1,256 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "f08772b0", + "metadata": {}, + "source": [ + "# Alibaba Cloud MaxCompute\n", + "\n", + ">[Alibaba Cloud MaxCompute](https://www.alibabacloud.com/product/maxcompute) (previously known as ODPS) is a general purpose, fully managed, multi-tenancy data processing platform for large-scale data warehousing. MaxCompute supports various data importing solutions and distributed computing models, enabling users to effectively query massive datasets, reduce production costs, and ensure data security.\n", + "\n", + "The `MaxComputeLoader` lets you execute a MaxCompute SQL query and loads the results as one document per row." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "067b7213", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Collecting pyodps\n", + " Downloading pyodps-0.11.4.post0-cp39-cp39-macosx_10_9_universal2.whl (2.0 MB)\n", + "\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m2.0/2.0 MB\u001b[0m \u001b[31m1.7 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m00:01\u001b[0m00:01\u001b[0m0m\n", + "\u001b[?25hRequirement already satisfied: charset-normalizer>=2 in /Users/newboy/anaconda3/envs/langchain/lib/python3.9/site-packages (from pyodps) (3.1.0)\n", + "Requirement already satisfied: urllib3<2.0,>=1.26.0 in /Users/newboy/anaconda3/envs/langchain/lib/python3.9/site-packages (from pyodps) (1.26.15)\n", + "Requirement already satisfied: idna>=2.5 in /Users/newboy/anaconda3/envs/langchain/lib/python3.9/site-packages (from pyodps) (3.4)\n", + "Requirement already satisfied: certifi>=2017.4.17 in /Users/newboy/anaconda3/envs/langchain/lib/python3.9/site-packages (from pyodps) (2023.5.7)\n", + "Installing collected packages: pyodps\n", + "Successfully installed pyodps-0.11.4.post0\n" + ] + } + ], + "source": [ + "!pip install pyodps" + ] + }, + { + "cell_type": "markdown", + "id": "19641457", + "metadata": {}, + "source": [ + "## Basic Usage\n", + "To instantiate the loader you'll need a SQL query to execute, your MaxCompute endpoint and project name, and you access ID and secret access key. The access ID and secret access key can either be passed in direct via the `access_id` and `secret_access_key` parameters or they can be set as environment variables `MAX_COMPUTE_ACCESS_ID` and `MAX_COMPUTE_SECRET_ACCESS_KEY`." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "71a0da4b", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from langchain.document_loaders import MaxComputeLoader" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "d4770c4a", + "metadata": {}, + "outputs": [], + "source": [ + "base_query = \"\"\"\n", + "SELECT *\n", + "FROM (\n", + " SELECT 1 AS id, 'content1' AS content, 'meta_info1' AS meta_info\n", + " UNION ALL\n", + " SELECT 2 AS id, 'content2' AS content, 'meta_info2' AS meta_info\n", + " UNION ALL\n", + " SELECT 3 AS id, 'content3' AS content, 'meta_info3' AS meta_info\n", + ") mydata;\n", + "\"\"\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1616c174", + "metadata": {}, + "outputs": [], + "source": [ + "endpoint=\"\"\n", + "project=\"\"\n", + "ACCESS_ID = \"\"\n", + "SECRET_ACCESS_KEY = \"\"" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "e5c25041", + "metadata": {}, + "outputs": [], + "source": [ + "loader = MaxComputeLoader.from_params(\n", + " base_query,\n", + " endpoint,\n", + " project,\n", + " access_id=ACCESS_ID,\n", + " secret_access_key=SECRET_ACCESS_KEY,\n", + "\n", + ")\n", + "data = loader.load()" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "311e74ea", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[Document(page_content='id: 1\\ncontent: content1\\nmeta_info: meta_info1', metadata={}), Document(page_content='id: 2\\ncontent: content2\\nmeta_info: meta_info2', metadata={}), Document(page_content='id: 3\\ncontent: content3\\nmeta_info: meta_info3', metadata={})]\n" + ] + } + ], + "source": [ + "print(data)" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "a4d8c388", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "id: 1\n", + "content: content1\n", + "meta_info: meta_info1\n" + ] + } + ], + 
"source": [ + "print(data[0].page_content)" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "id": "f2422e6c", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{}\n" + ] + } + ], + "source": [ + "print(data[0].metadata)" + ] + }, + { + "cell_type": "markdown", + "id": "85e07e28", + "metadata": {}, + "source": [ + "## Specifying Which Columns are Content vs Metadata\n", + "You can configure which subset of columns should be loaded as the contents of the Document and which as the metadata using the `page_content_columns` and `metadata_columns` parameters." + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "id": "a7b9d726", + "metadata": {}, + "outputs": [], + "source": [ + "loader = MaxComputeLoader.from_params(\n", + " base_query,\n", + " endpoint,\n", + " project,\n", + " page_content_columns=[\"content\"], # Specify Document page content\n", + " metadata_columns=[\"id\", \"meta_info\"], # Specify Document metadata\n", + " access_id=ACCESS_ID,\n", + " secret_access_key=SECRET_ACCESS_KEY,\n", + ")\n", + "data = loader.load()" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "id": "532c19e9", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "content: content1\n" + ] + } + ], + "source": [ + "print(data[0].page_content)" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "id": "5fe4990a", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'id': 1, 'meta_info': 'meta_info1'}\n" + ] + } + ], + "source": [ + "print(data[0].metadata)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + 
class MaxComputeLoader(BaseLoader):
    """Load rows returned by an Alibaba Cloud MaxCompute SQL query as documents.

    Each result row becomes one ``Document``; which columns land in the page
    content versus the metadata is configurable.
    """

    def __init__(
        self,
        query: str,
        api_wrapper: MaxComputeAPIWrapper,
        *,
        page_content_columns: Optional[Sequence[str]] = None,
        metadata_columns: Optional[Sequence[str]] = None,
    ):
        """Initialize the Alibaba Cloud MaxCompute document loader.

        Args:
            query: SQL query to execute.
            api_wrapper: MaxCompute API wrapper used to run the query.
            page_content_columns: Columns written into the ``page_content`` of
                each Document. If unspecified, every column goes to
                ``page_content``.
            metadata_columns: Columns written into the ``metadata`` of each
                Document. If unspecified, every column not already used for
                ``page_content`` goes to ``metadata``.
        """
        self.query = query
        self.api_wrapper = api_wrapper
        self.page_content_columns = page_content_columns
        self.metadata_columns = metadata_columns

    @classmethod
    def from_params(
        cls,
        query: str,
        endpoint: str,
        project: str,
        *,
        access_id: Optional[str] = None,
        secret_access_key: Optional[str] = None,
        **kwargs: Any,
    ) -> MaxComputeLoader:
        """Convenience constructor that builds the MaxCompute API wrapper from
        the given connection parameters.

        Args:
            query: SQL query to execute.
            endpoint: MaxCompute endpoint.
            project: A project is a basic organizational unit of MaxCompute,
                which is similar to a database.
            access_id: MaxCompute access ID. Should be passed in directly or
                set as the environment variable ``MAX_COMPUTE_ACCESS_ID``.
            secret_access_key: MaxCompute secret access key. Should be passed
                in directly or set as the environment variable
                ``MAX_COMPUTE_SECRET_ACCESS_KEY``.
            **kwargs: Forwarded to ``__init__`` (e.g. ``page_content_columns``).
        """
        wrapper = MaxComputeAPIWrapper.from_params(
            endpoint, project, access_id=access_id, secret_access_key=secret_access_key
        )
        return cls(query, wrapper, **kwargs)

    def lazy_load(self) -> Iterator[Document]:
        """Execute the query and lazily yield one Document per result row."""
        for row in self.api_wrapper.query(self.query):
            # Select the columns that feed the page content; default is all.
            wanted = self.page_content_columns
            if wanted:
                content_source = {col: row[col] for col in row if col in wanted}
            else:
                content_source = row
            text = "\n".join(
                "{}: {}".format(col, val) for col, val in content_source.items()
            )
            # Metadata: explicit columns if given, otherwise the leftovers.
            if self.metadata_columns:
                meta = {
                    col: row[col] for col in row if col in self.metadata_columns
                }
            else:
                meta = {col: row[col] for col in row if col not in content_source}
            yield Document(page_content=text, metadata=meta)

    def load(self) -> List[Document]:
        """Eagerly execute the query and return all Documents as a list."""
        return [*self.lazy_load()]
class MaxComputeAPIWrapper:
    """Interface for querying Alibaba Cloud MaxCompute tables."""

    def __init__(self, client: ODPS):
        """Initialize the MaxCompute API wrapper.

        Args:
            client: An ``odps.ODPS`` MaxCompute client object.
        """
        self.client = client

    @classmethod
    def from_params(
        cls,
        endpoint: str,
        project: str,
        *,
        access_id: Optional[str] = None,
        secret_access_key: Optional[str] = None,
    ) -> MaxComputeAPIWrapper:
        """Convenience constructor that builds the ``odps.ODPS`` MaxCompute
        client from the given parameters.

        Args:
            endpoint: MaxCompute endpoint.
            project: A project is a basic organizational unit of MaxCompute,
                which is similar to a database.
            access_id: MaxCompute access ID. Should be passed in directly or
                set as the environment variable ``MAX_COMPUTE_ACCESS_ID``.
            secret_access_key: MaxCompute secret access key. Should be passed
                in directly or set as the environment variable
                ``MAX_COMPUTE_SECRET_ACCESS_KEY``.
        """
        try:
            from odps import ODPS
        except ImportError as ex:
            raise ImportError(
                "Could not import pyodps python package. "
                "Please install it with `pip install pyodps` or refer to "
                "https://pyodps.readthedocs.io/."
            ) from ex
        # Fall back to environment variables when credentials aren't passed in.
        if not access_id:
            access_id = get_from_env("access_id", "MAX_COMPUTE_ACCESS_ID")
        if not secret_access_key:
            secret_access_key = get_from_env(
                "secret_access_key", "MAX_COMPUTE_SECRET_ACCESS_KEY"
            )
        odps_client = ODPS(
            access_id=access_id,
            secret_access_key=secret_access_key,
            project=project,
            endpoint=endpoint,
        )
        # Fail fast with a clear message rather than on the first query.
        if not odps_client.exist_project(project):
            raise ValueError(f'The project "{project}" does not exist.')

        return cls(odps_client)

    def lazy_query(self, query: str) -> Iterator[dict]:
        """Execute ``query`` and lazily yield each result record as a dict."""
        with self.client.execute_sql(query).open_reader() as reader:
            if reader.count == 0:
                raise ValueError("Table contains no data.")
            for record in reader:
                # Records iterate as (column, value) pairs.
                yield dict(record)

    def query(self, query: str) -> List[dict]:
        """Execute ``query`` and return all result rows as a list of dicts."""
        return [*self.lazy_query(query)]