Integrate Rockset as a document loader (#7681)

<!-- Thank you for contributing to LangChain!

Replace this comment with:
  - Description: a description of the change, 
  - Issue: the issue # it fixes (if applicable),
  - Dependencies: any dependencies required for this change,
- Tag maintainer: for a quicker response, tag the relevant maintainer
(see below),
- Twitter handle: we announce bigger features on Twitter. If your PR
gets announced and you'd like a mention, we'll gladly shout you out!

If you're adding a new integration, please include:
1. a test for the integration, preferably unit tests that do not rely on
network access,
  2. an example notebook showing its use.

Maintainer responsibilities:
  - General / Misc / if you don't know who to tag: @baskaryan
  - DataLoaders / VectorStores / Retrievers: @rlancemartin, @eyurtsev
  - Models / Prompts: @hwchase17, @baskaryan
  - Memory: @hwchase17
  - Agents / Tools / Toolkits: @hinthornw
  - Tracing / Callbacks: @agola11
  - Async: @agola11

If no one reviews your PR within a few days, feel free to @-mention the
same people again.

See contribution guidelines for more information on how to write/run
tests, lint, etc:
https://github.com/hwchase17/langchain/blob/master/.github/CONTRIBUTING.md
 -->

Integrate [Rockset](https://rockset.com/docs/) as a document loader.

Issue: None
Dependencies: Nothing new (rockset's dependency was already added
[here](https://github.com/hwchase17/langchain/pull/6216))
Tag maintainer: @rlancemartin

I have added a test for the integration and an example notebook showing
its use. I ran `make lint` and everything looks good.

---------

Co-authored-by: Bagatur <baskaryan@gmail.com>
pull/3524/head^2
Aarav Borthakur 1 year ago committed by GitHub
parent ad7d97670b
commit 210296a71f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -17,3 +17,10 @@ See a [usage example](/docs/modules/data_connection/vectorstores/integrations/ro
```python
from langchain.vectorstores import RocksetDB
```
## Document Loader
See a [usage example](docs/modules/data_connection/document_loaders/integrations/rockset).
```python
from langchain.document_loaders import RocksetLoader
```

@ -0,0 +1,251 @@
{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Rockset\n",
"\n",
"> Rockset is a real-time analytics database which enables queries on massive, semi-structured data without operational burden. With Rockset, ingested data is queryable within one second and analytical queries against that data typically execute in milliseconds. Rockset is compute optimized, making it suitable for serving high concurrency applications in the sub-100TB range (or larger than 100s of TBs with rollups).\n",
"\n",
"This notebook demonstrates how to use Rockset as a document loader in langchain. To get started, make sure you have a Rockset account and an API key available.\n",
"\n",
"\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setting up the environment\n",
"\n",
"1. Go to the [Rockset console](https://console.rockset.com/apikeys) and get an API key. Find your API region from the [API reference](https://rockset.com/docs/rest-api/#introduction). For the purpose of this notebook, we will assume you're using Rockset from `Oregon(us-west-2)`.\n",
"2. Set your the environment variable `ROCKSET_API_KEY`.\n",
"3. Install the Rockset python client, which will be used by langchain to interact with the Rockset database."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"vscode": {
"languageId": "shellscript"
}
},
"outputs": [],
"source": [
"$ pip3 install rockset"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Loading Documents\n",
"The Rockset integration with LangChain allows you to load documents from Rockset collections with SQL queries. In order to do this you must construct a `RocksetLoader` object. Here is an example snippet that initializes a `RocksetLoader`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from langchain.document_loaders import RocksetLoader\n",
"from rockset import RocksetClient, Regions, models\n",
"\n",
"loader = RocksetLoader(\n",
" RocksetClient(Regions.usw2a1, \"<api key>\"),\n",
" models.QueryRequestSql(query=\"SELECT * FROM langchain_demo LIMIT 3\"), # SQL query\n",
" [\"text\"], # content columns\n",
" metadata_keys=[\"id\", \"date\"], # metadata columns\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"Here, you can see that the following query is run:\n",
"\n",
"```sql\n",
"SELECT * FROM langchain_demo LIMIT 3\n",
"```\n",
"\n",
"The `text` column in the collection is used as the page content, and the record's `id` and `date` columns are used as metadata (if you do not pass anything into `metadata_keys`, the whole Rockset document will be used as metadata). \n",
"\n",
"To execute the query and access an iterator over the resulting `Document`s, run:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"loader.lazy_load()"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"To execute the query and access all resulting `Document`s at once, run:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"loader.load()"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"Here is an example response of `loader.load()`:\n",
"```python\n",
"[\n",
" Document(\n",
" page_content=\"Lorem ipsum dolor sit amet, consectetur adipiscing elit. Maecenas a libero porta, dictum ipsum eget, hendrerit neque. Morbi blandit, ex ut suscipit viverra, enim velit tincidunt tellus, a tempor velit nunc et ex. Proin hendrerit odio nec convallis lobortis. Aenean in purus dolor. Vestibulum orci orci, laoreet eget magna in, commodo euismod justo.\", \n",
" metadata={\"id\": 83209, \"date\": \"2022-11-13T18:26:45.000000Z\"}\n",
" ),\n",
" Document(\n",
" page_content=\"Integer at finibus odio. Nam sit amet enim cursus lacus gravida feugiat vestibulum sed libero. Aenean eleifend est quis elementum tincidunt. Curabitur sit amet ornare erat. Nulla id dolor ut magna volutpat sodales fringilla vel ipsum. Donec ultricies, lacus sed fermentum dignissim, lorem elit aliquam ligula, sed suscipit sapien purus nec ligula.\", \n",
" metadata={\"id\": 89313, \"date\": \"2022-11-13T18:28:53.000000Z\"}\n",
" ),\n",
" Document(\n",
" page_content=\"Morbi tortor enim, commodo id efficitur vitae, fringilla nec mi. Nullam molestie faucibus aliquet. Praesent a est facilisis, condimentum justo sit amet, viverra erat. Fusce volutpat nisi vel purus blandit, et facilisis felis accumsan. Phasellus luctus ligula ultrices tellus tempor hendrerit. Donec at ultricies leo.\", \n",
" metadata={\"id\": 87732, \"date\": \"2022-11-13T18:49:04.000000Z\"}\n",
" )\n",
"]\n",
"```"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Using multiple columns as content\n",
"\n",
"You can choose to use multiple columns as content:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from langchain.document_loaders import RocksetLoader\n",
"from rockset import RocksetClient, Regions, models\n",
"\n",
"loader = RocksetLoader(\n",
" RocksetClient(Regions.usw2a1, \"<api key>\"),\n",
" models.QueryRequestSql(query=\"SELECT * FROM langchain_demo LIMIT 1 WHERE id=38\"),\n",
" [\"sentence1\", \"sentence2\"], # TWO content columns\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"Assuming the \"sentence1\" field is `\"This is the first sentence.\"` and the \"sentence2\" field is `\"This is the second sentence.\"`, the `page_content` of the resulting `Document` would be:\n",
"\n",
"```\n",
"This is the first sentence.\n",
"This is the second sentence.\n",
"```\n",
"\n",
"You can define you own function to join content columns by setting the `content_columns_joiner` argument in the `RocksetLoader` constructor. `content_columns_joiner` is a method that takes in a `List[Tuple[str, Any]]]` as an argument, representing a list of tuples of (column name, column value). By default, this is a method that joins each column value with a new line.\n",
"\n",
"For example, if you wanted to join sentence1 and sentence2 with a space instead of a new line, you could set `content_columns_joiner` like so:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"RocksetLoader(\n",
" RocksetClient(Regions.usw2a1, \"<api key>\"),\n",
" models.QueryRequestSql(query=\"SELECT * FROM langchain_demo LIMIT 1 WHERE id=38\"),\n",
" [\"sentence1\", \"sentence2\"],\n",
" content_columns_joiner=lambda docs: \" \".join(\n",
" [doc[1] for doc in docs]\n",
" ), # join with space instead of /n\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"The `page_content` of the resulting `Document` would be:\n",
"\n",
"```\n",
"This is the first sentence. This is the second sentence.\n",
"```\n",
"\n",
"Oftentimes you want to include the column name in the `page_content`. You can do that like this:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"RocksetLoader(\n",
" RocksetClient(Regions.usw2a1, \"<api key>\"),\n",
" models.QueryRequestSql(query=\"SELECT * FROM langchain_demo LIMIT 1 WHERE id=38\"),\n",
" [\"sentence1\", \"sentence2\"],\n",
" content_columns_joiner=lambda docs: \"\\n\".join(\n",
" [f\"{doc[0]}: {doc[1]}\" for doc in docs]\n",
" ),\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"This would result in the following `page_content`:\n",
"\n",
"```\n",
"sentence1: This is the first sentence.\n",
"sentence2: This is the second sentence.\n",
"```"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "env",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.11.4"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}

@ -104,6 +104,7 @@ from langchain.document_loaders.readthedocs import ReadTheDocsLoader
from langchain.document_loaders.recursive_url_loader import RecursiveUrlLoader
from langchain.document_loaders.reddit import RedditPostsLoader
from langchain.document_loaders.roam import RoamLoader
from langchain.document_loaders.rocksetdb import RocksetLoader
from langchain.document_loaders.rst import UnstructuredRSTLoader
from langchain.document_loaders.rtf import UnstructuredRTFLoader
from langchain.document_loaders.s3_directory import S3DirectoryLoader
@ -248,6 +249,7 @@ __all__ = [
"RecursiveUrlLoader",
"RedditPostsLoader",
"RoamLoader",
"RocksetLoader",
"S3DirectoryLoader",
"S3FileLoader",
"SRTLoader",

@ -0,0 +1,115 @@
from typing import Any, Callable, Iterator, List, Optional, Tuple
from langchain.document_loaders.base import BaseLoader
from langchain.schema import Document
def default_joiner(docs: List[Tuple[str, Any]]) -> str:
return "\n".join([doc[1] for doc in docs])
class ColumnNotFoundError(Exception):
def __init__(self, missing_key: str, query: str):
super().__init__(f'Column "{missing_key}" not selected in query:\n{query}')
class RocksetLoader(BaseLoader):
"""Wrapper around Rockset db
To use, you should have the `rockset` python package installed.
Example:
.. code-block:: python
# This code will load 3 records from the "langchain_demo"
# collection as Documents, with the `text` column used as
# the content
from langchain.document_loaders import RocksetLoader
from rockset import RocksetClient, Regions, models
loader = RocksetLoader(
RocksetClient(Regions.usw2a1, "<api key>"),
models.QueryRequestSql(
query="select * from langchain_demo limit 3"
),
["text"]
)
)
"""
def __init__(
self,
client: Any,
query: Any,
content_keys: List[str],
metadata_keys: Optional[List[str]] = None,
content_columns_joiner: Callable[[List[Tuple[str, Any]]], str] = default_joiner,
):
"""Initialize with Rockset client.
Args:
client: Rockset client object.
query: Rockset query object.
content_keys: The collection columns to be written into the `page_content`
of the Documents.
metadata_keys: The collection columns to be written into the `metadata` of
the Documents. By default, this is all the keys in the document.
content_columns_joiner: Method that joins content_keys and its values into a
string. It's method that takes in a List[Tuple[str, Any]]],
representing a list of tuples of (column name, column value).
By default, this is a method that joins each column value with a new
line. This method is only relevant if there are multiple content_keys.
"""
try:
from rockset import QueryPaginator, RocksetClient
from rockset.models import QueryRequestSql
except ImportError:
raise ImportError(
"Could not import rockset client python package. "
"Please install it with `pip install rockset`."
)
if not isinstance(client, RocksetClient):
raise ValueError(
f"client should be an instance of rockset.RocksetClient, "
f"got {type(client)}"
)
if not isinstance(query, QueryRequestSql):
raise ValueError(
f"query should be an instance of rockset.model.QueryRequestSql, "
f"got {type(query)}"
)
self.client = client
self.query = query
self.content_keys = content_keys
self.content_columns_joiner = content_columns_joiner
self.metadata_keys = metadata_keys
self.paginator = QueryPaginator
self.request_model = QueryRequestSql
def load(self) -> List[Document]:
return list(self.lazy_load())
def lazy_load(self) -> Iterator[Document]:
query_results = self.client.Queries.query(
sql=self.query
).results # execute the SQL query
for doc in query_results: # for each doc in the response
try:
yield Document(
page_content=self.content_columns_joiner(
[(col, doc[col]) for col in self.content_keys]
),
metadata={col: doc[col] for col in self.metadata_keys}
if self.metadata_keys is not None
else doc,
) # try to yield the Document
except (
KeyError
) as e: # either content_columns or metadata_columns is invalid
raise ColumnNotFoundError(
e.args[0], self.query
) # raise that the column isn't in the db schema

@ -0,0 +1,60 @@
import logging
import os
from langchain.docstore.document import Document
from langchain.document_loaders import RocksetLoader
logger = logging.getLogger(__name__)
def test_sql_query() -> None:
import rockset
assert os.environ.get("ROCKSET_API_KEY") is not None
assert os.environ.get("ROCKSET_REGION") is not None
api_key = os.environ.get("ROCKSET_API_KEY")
region = os.environ.get("ROCKSET_REGION")
if region == "use1a1":
host = rockset.Regions.use1a1
elif region == "usw2a1":
host = rockset.Regions.usw2a1
elif region == "euc1a1":
host = rockset.Regions.euc1a1
elif region == "dev":
host = rockset.DevRegions.usw2a1
else:
logger.warning(
"Using ROCKSET_REGION:%s as it is.. \
You should know what you're doing...",
region,
)
host = region
client = rockset.RocksetClient(host, api_key)
col_1 = "Rockset is a real-time analytics database which enables queries on massive, semi-structured data without operational burden. Rockset is serverless and fully managed. It offloads the work of managing configuration, cluster provisioning, denormalization, and shard / index management. Rockset is also SOC 2 Type II compliant and offers encryption at rest and in flight, securing and protecting any sensitive data. Most teams can ingest data into Rockset and start executing queries in less than 15 minutes." # noqa: E501
col_2 = 2
col_3 = "e903e069-b0b5-4b80-95e2-86471b41f55f"
id = 7320132
"""Run a simple SQL query query"""
loader = RocksetLoader(
client,
rockset.models.QueryRequestSql(
query=(
f"SELECT '{col_1}' AS col_1, {col_2} AS col_2, '{col_3}' AS col_3,"
f" {id} AS id"
)
),
["col_1"],
metadata_keys=["col_2", "col_3", "id"],
)
output = loader.load()
assert len(output) == 1
assert isinstance(output[0], Document)
assert output[0].page_content == col_1
assert output[0].metadata == {"col_2": col_2, "col_3": col_3, "id": id}
Loading…
Cancel
Save