mirror of https://github.com/hwchase17/langchain
community[minor]: Implemented Kinetica Document Loader and added notebooks (#20002)
- [ ] **Kinetica Document Loader**: "community: a class to load Documents from Kinetica" - [ ] **Kinetica Document Loader**: - **Description:** implemented KineticaLoader in `kinetica_loader.py` - **Dependencies:** install the Kinetica API using `pip install gpudb==7.2.0.1 `pull/20075/head^2
parent
5e60d65917
commit
b54b19ba1c
@ -0,0 +1,125 @@
|
|||||||
|
{
|
||||||
|
"cells": [
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"# Kinetica\n",
|
||||||
|
"\n",
|
||||||
|
"This notebook goes over how to load documents from Kinetica"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"%pip install gpudb==7.2.0.1"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"from langchain_community.document_loaders.kinetica_loader import KineticaLoader"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"## Loading Environment Variables\n",
|
||||||
|
"import os\n",
|
||||||
|
"\n",
|
||||||
|
"from dotenv import load_dotenv\n",
|
||||||
|
"from langchain_community.vectorstores import (\n",
|
||||||
|
" KineticaSettings,\n",
|
||||||
|
")\n",
|
||||||
|
"\n",
|
||||||
|
"load_dotenv()"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"# Kinetica needs the connection to the database.\n",
|
||||||
|
"# This is how to set it up.\n",
|
||||||
|
"HOST = os.getenv(\"KINETICA_HOST\", \"http://127.0.0.1:9191\")\n",
|
||||||
|
"USERNAME = os.getenv(\"KINETICA_USERNAME\", \"\")\n",
|
||||||
|
"PASSWORD = os.getenv(\"KINETICA_PASSWORD\", \"\")\n",
|
||||||
|
"\n",
|
||||||
|
"\n",
|
||||||
|
"def create_config() -> KineticaSettings:\n",
|
||||||
|
" return KineticaSettings(host=HOST, username=USERNAME, password=PASSWORD)"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"from langchain_community.document_loaders.kinetica_loader import KineticaLoader\n",
|
||||||
|
"\n",
|
||||||
|
"# The following `QUERY` is an example which will not run; this\n",
|
||||||
|
"# needs to be substituted with a valid `QUERY` that will return\n",
|
||||||
|
"# data and the `SCHEMA.TABLE` combination must exist in Kinetica.\n",
|
||||||
|
"\n",
|
||||||
|
"QUERY = \"select text, survey_id from SCHEMA.TABLE limit 10\"\n",
|
||||||
|
"kinetica_loader = KineticaLoader(\n",
|
||||||
|
" QUERY,\n",
|
||||||
|
" HOST,\n",
|
||||||
|
" USERNAME,\n",
|
||||||
|
" PASSWORD,\n",
|
||||||
|
")\n",
|
||||||
|
"kinetica_documents = kinetica_loader.load()\n",
|
||||||
|
"print(kinetica_documents)"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"from langchain_community.document_loaders.kinetica_loader import KineticaLoader\n",
|
||||||
|
"\n",
|
||||||
|
"# The following `QUERY` is an example which will not run; this\n",
|
||||||
|
"# needs to be substituted with a valid `QUERY` that will return\n",
|
||||||
|
"# data and the `SCHEMA.TABLE` combination must exist in Kinetica.\n",
|
||||||
|
"\n",
|
||||||
|
"QUERY = \"select text, survey_id as source from SCHEMA.TABLE limit 10\"\n",
|
||||||
|
"kinetica_loader = KineticaLoader(\n",
|
||||||
|
"    query=QUERY,\n",
|
||||||
|
"    host=HOST,\n",
|
||||||
|
"    username=USERNAME,\n",
|
||||||
|
"    password=PASSWORD,\n",
|
||||||
|
"    metadata_columns=[\"source\"],\n",
|
||||||
|
")\n",
|
||||||
|
"kinetica_documents = kinetica_loader.load()\n",
|
||||||
|
"print(kinetica_documents)"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"metadata": {
|
||||||
|
"kernelspec": {
|
||||||
|
"display_name": ".venv",
|
||||||
|
"language": "python",
|
||||||
|
"name": "python3"
|
||||||
|
},
|
||||||
|
"language_info": {
|
||||||
|
"name": "python",
|
||||||
|
"version": "3.8.10"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"nbformat": 4,
|
||||||
|
"nbformat_minor": 2
|
||||||
|
}
|
@ -0,0 +1,171 @@
|
|||||||
|
{
|
||||||
|
"cells": [
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"# Kinetica Vectorstore based Retriever\n",
|
||||||
|
"\n",
|
||||||
|
">[Kinetica](https://www.kinetica.com/) is a database with integrated support for vector similarity search\n",
|
||||||
|
"\n",
|
||||||
|
"It supports:\n",
|
||||||
|
"- exact and approximate nearest neighbor search\n",
|
||||||
|
"- L2 distance, inner product, and cosine distance\n",
|
||||||
|
"\n",
|
||||||
|
"This notebook shows how to use a retriever based on Kinetica vector store (`Kinetica`)."
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"# Please ensure that this connector is installed in your working environment.\n",
|
||||||
|
"%pip install gpudb==7.2.0.1"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"We want to use `OpenAIEmbeddings` so we have to get the OpenAI API Key."
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": 2,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"import getpass\n",
|
||||||
|
"import os\n",
|
||||||
|
"\n",
|
||||||
|
"os.environ[\"OPENAI_API_KEY\"] = getpass.getpass(\"OpenAI API Key:\")"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"## Loading Environment Variables\n",
|
||||||
|
"from dotenv import load_dotenv\n",
|
||||||
|
"\n",
|
||||||
|
"load_dotenv()"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": 5,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"from langchain.docstore.document import Document\n",
|
||||||
|
"from langchain.text_splitter import CharacterTextSplitter\n",
|
||||||
|
"from langchain_community.document_loaders import TextLoader\n",
|
||||||
|
"from langchain_community.vectorstores import (\n",
|
||||||
|
" Kinetica,\n",
|
||||||
|
" KineticaSettings,\n",
|
||||||
|
")\n",
|
||||||
|
"from langchain_openai import OpenAIEmbeddings"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": 6,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"# Kinetica needs the connection to the database.\n",
|
||||||
|
"# This is how to set it up.\n",
|
||||||
|
"HOST = os.getenv(\"KINETICA_HOST\", \"http://127.0.0.1:9191\")\n",
|
||||||
|
"USERNAME = os.getenv(\"KINETICA_USERNAME\", \"\")\n",
|
||||||
|
"PASSWORD = os.getenv(\"KINETICA_PASSWORD\", \"\")\n",
|
||||||
|
"OPENAI_API_KEY = os.getenv(\"OPENAI_API_KEY\", \"\")\n",
|
||||||
|
"\n",
|
||||||
|
"\n",
|
||||||
|
"def create_config() -> KineticaSettings:\n",
|
||||||
|
" return KineticaSettings(host=HOST, username=USERNAME, password=PASSWORD)"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"## Create Retriever from vector store"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"loader = TextLoader(\"../../modules/state_of_the_union.txt\")\n",
|
||||||
|
"documents = loader.load()\n",
|
||||||
|
"text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)\n",
|
||||||
|
"docs = text_splitter.split_documents(documents)\n",
|
||||||
|
"\n",
|
||||||
|
"embeddings = OpenAIEmbeddings()\n",
|
||||||
|
"\n",
|
||||||
|
"# The Kinetica Module will try to create a table with the name of the collection.\n",
|
||||||
|
"# So, make sure that the collection name is unique and the user has the permission to create a table.\n",
|
||||||
|
"\n",
|
||||||
|
"COLLECTION_NAME = \"state_of_the_union_test\"\n",
|
||||||
|
"connection = create_config()\n",
|
||||||
|
"\n",
|
||||||
|
"db = Kinetica.from_documents(\n",
|
||||||
|
" embedding=embeddings,\n",
|
||||||
|
" documents=docs,\n",
|
||||||
|
" collection_name=COLLECTION_NAME,\n",
|
||||||
|
" config=connection,\n",
|
||||||
|
")\n",
|
||||||
|
"\n",
|
||||||
|
"# create retriever from the vector store\n",
|
||||||
|
"retriever = db.as_retriever(search_kwargs={\"k\": 2})"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"## Search with retriever"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"result = retriever.get_relevant_documents(\n",
|
||||||
|
" \"What did the president say about Ketanji Brown Jackson\"\n",
|
||||||
|
")\n",
|
||||||
|
"print(result[0].page_content)"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"metadata": {
|
||||||
|
"kernelspec": {
|
||||||
|
"display_name": ".venv",
|
||||||
|
"language": "python",
|
||||||
|
"name": "python3"
|
||||||
|
},
|
||||||
|
"language_info": {
|
||||||
|
"codemirror_mode": {
|
||||||
|
"name": "ipython",
|
||||||
|
"version": 3
|
||||||
|
},
|
||||||
|
"file_extension": ".py",
|
||||||
|
"mimetype": "text/x-python",
|
||||||
|
"name": "python",
|
||||||
|
"nbconvert_exporter": "python",
|
||||||
|
"pygments_lexer": "ipython3",
|
||||||
|
"version": "3.8.10"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"nbformat": 4,
|
||||||
|
"nbformat_minor": 2
|
||||||
|
}
|
@ -0,0 +1,103 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Any, Dict, Iterator, List, Optional, Tuple
|
||||||
|
|
||||||
|
from langchain_core.documents import Document
|
||||||
|
|
||||||
|
from langchain_community.document_loaders.base import BaseLoader
|
||||||
|
|
||||||
|
|
||||||
|
class KineticaLoader(BaseLoader):
    """Load from `Kinetica` API.

    Each document represents one row of the result. The `page_content_columns`
    are written into the `page_content` of the document. The `metadata_columns`
    are written into the `metadata` of the document. By default, all columns
    are written into the `page_content` and none into the `metadata`.

    """

    def __init__(
        self,
        query: str,
        host: str,
        username: str,
        password: str,
        parameters: Optional[Dict[str, Any]] = None,
        page_content_columns: Optional[List[str]] = None,
        metadata_columns: Optional[List[str]] = None,
    ):
        """Initialize Kinetica document loader.

        Args:
            query: The query to run in Kinetica.
            host: URL of the Kinetica instance, e.g. "http://127.0.0.1:9191".
            username: Kinetica user name.
            password: Kinetica password.
            parameters: Optional. Parameters to pass to the query.
            page_content_columns: Optional. Columns written to Document `page_content`.
            metadata_columns: Optional. Columns written to Document `metadata`.
        """
        self.query = query
        self.host = host
        self.username = username
        self.password = password
        self.parameters = parameters
        self.page_content_columns = page_content_columns
        self.metadata_columns = metadata_columns if metadata_columns is not None else []

    def _execute_query(self) -> List[Dict[str, Any]]:
        """Run ``self.query`` against Kinetica and return rows as dicts.

        Errors during execution are printed and swallowed (the loader is
        deliberately best-effort); an empty list is returned in that case.
        """
        try:
            from gpudb import GPUdb, GPUdbSqlIterator
        except ImportError:
            raise ImportError(
                "Could not import Kinetica python API. "
                "Please install it with `pip install gpudb==7.2.0.1`."
            )

        try:
            options = GPUdb.Options()
            options.username = self.username
            options.password = self.password

            conn = GPUdb(host=self.host, options=options)

            # GPUdbSqlIterator streams the result set; type_map preserves the
            # column ordering so each record tuple can be zipped into a dict.
            with GPUdbSqlIterator(conn, self.query) as records:
                column_names = records.type_map.keys()
                query_result = [dict(zip(column_names, record)) for record in records]

        except Exception as e:
            print(f"An error occurred: {e}")  # noqa: T201
            query_result = []

        return query_result

    def _get_columns(
        self, query_result: List[Dict[str, Any]]
    ) -> Tuple[List[str], List[str]]:
        """Resolve which columns feed `page_content` and which feed `metadata`.

        When `page_content_columns` was not provided, default to ALL columns
        of the result, matching the class docstring. (Previously the unset
        case collapsed to an empty list, producing documents with empty
        `page_content`; the `is None` check afterwards was unreachable.)
        """
        if self.page_content_columns is not None:
            page_content_columns = self.page_content_columns
        elif query_result:
            # Default: every column of the result goes into page_content.
            page_content_columns = list(query_result[0].keys())
        else:
            page_content_columns = []
        metadata_columns = self.metadata_columns if self.metadata_columns else []
        return page_content_columns, metadata_columns

    def lazy_load(self) -> Iterator[Document]:
        """Lazily yield one `Document` per result row.

        Yields:
            Documents whose `page_content` is "col: value" lines for the
            selected content columns and whose `metadata` holds the
            requested metadata columns.
        """
        query_result = self._execute_query()
        # Defensive: _execute_query currently never returns an Exception,
        # but guard in case a future implementation does.
        if isinstance(query_result, Exception):
            print(f"An error occurred during the query: {query_result}")  # noqa: T201
            return
        page_content_columns, metadata_columns = self._get_columns(query_result)
        # "*" means "all columns"; only expand when there is a row to inspect.
        if "*" in page_content_columns and query_result:
            page_content_columns = list(query_result[0].keys())
        for row in query_result:
            page_content = "\n".join(
                f"{k}: {v}" for k, v in row.items() if k in page_content_columns
            )
            metadata = {k: v for k, v in row.items() if k in metadata_columns}
            doc = Document(page_content=page_content, metadata=metadata)
            yield doc

    def load(self) -> List[Document]:
        """Load data into document objects."""
        return list(self.lazy_load())
|
Loading…
Reference in New Issue