feat: Supprt pgvecto.rs as a VectorStore (#12718)

Supprt [pgvecto.rs](https://github.com/tensorchord/pgvecto.rs) as a new
VectorStore type.

This introduces a new dependency
[pgvecto_rs](https://pypi.org/project/pgvecto_rs/) and upgrade
SQLAlchemy to ^2.

Relate to https://github.com/tensorchord/pgvecto.rs/issues/11

---------

Co-authored-by: Bagatur <baskaryan@gmail.com>
This commit is contained in:
盐粒 Yanli 2023-11-03 08:16:04 +08:00 committed by GitHub
parent 0cbdba6a9b
commit 1b233798a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 463 additions and 0 deletions

View File

@ -0,0 +1,214 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# PGVecto.rs\n",
"\n",
"This notebook shows how to use functionality related to the Postgres vector database ([pgvecto.rs](https://github.com/tensorchord/pgvecto.rs)). You need to install SQLAlchemy >= 2 manually."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"## Loading Environment Variables\n",
"from dotenv import load_dotenv\n",
"\n",
"load_dotenv()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from typing import List\n",
"from langchain.embeddings.openai import OpenAIEmbeddings\n",
"from langchain.text_splitter import CharacterTextSplitter\n",
"from langchain.vectorstores.pgvecto_rs import PGVecto_rs\n",
"from langchain.document_loaders import TextLoader\n",
"from langchain.docstore.document import Document"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"loader = TextLoader(\"../../../state_of_the_union.txt\")\n",
"documents = loader.load()\n",
"text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)\n",
"docs = text_splitter.split_documents(documents)\n",
"\n",
"embeddings = OpenAIEmbeddings()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Start the database with the [official demo docker image](https://github.com/tensorchord/pgvecto.rs#installation)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"vscode": {
"languageId": "shellscript"
}
},
"outputs": [],
"source": [
"docker run --name pgvecto-rs-demo -e POSTGRES_PASSWORD=mysecretpassword -p 5432:5432 -d tensorchord/pgvecto-rs:latest"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Then contruct the db URL"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"## PGVecto.rs needs the connection string to the database.\n",
"## We will load it from the environment variables.\n",
"import os\n",
"\n",
"PORT = os.getenv(\"DB_PORT\", 5432)\n",
"HOST = os.getenv(\"DB_HOST\", \"localhost\")\n",
"USER = os.getenv(\"DB_USER\", \"postgres\")\n",
"PASS = os.getenv(\"DB_PASS\", \"mysecretpassword\")\n",
"DB_NAME = os.getenv(\"DB_NAME\", \"postgres\")\n",
"\n",
"# Run tests with shell:\n",
"URL = \"postgresql+psycopg://{username}:{password}@{host}:{port}/{db_name}\".format(\n",
" port=PORT,\n",
" host=HOST,\n",
" username=USER,\n",
" password=PASS,\n",
" db_name=DB_NAME,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, create the VectorStore from the documents:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"db1 = PGVecto_rs.from_documents(\n",
" documents=docs,\n",
" embedding=embeddings,\n",
" db_url=URL,\n",
" # The table name is f\"collection_{collection_name}\", so that it should be unique.\n",
" collection_name=\"state_of_the_union\",\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can connect to the table laterly with:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Create new empty vectorstore with collection_name.\n",
"# Or connect to an existing vectorstore in database if exists.\n",
"# Arguments should be the same as when the vectorstore was created.\n",
"db1 = PGVecto_rs.from_collection_name(\n",
" embedding=embeddings,\n",
" db_url=URL,\n",
" collection_name=\"state_of_the_union\",\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Make sure that the user is permitted to create a table."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Similarity search with score"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Similarity Search with Euclidean Distance (Default)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"query = \"What did the president say about Ketanji Brown Jackson\"\n",
"docs: List[Document] = db1.similarity_search(query, k=4)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"for doc in docs:\n",
" print(doc.page_content)\n",
" print(\"======================\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@ -0,0 +1,249 @@
from __future__ import annotations
import uuid
from typing import Any, Iterable, List, Literal, Optional, Tuple, Type
import numpy as np
import sqlalchemy
from sqlalchemy import insert, select
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.orm.session import Session
from langchain.schema import Document
from langchain.schema.embeddings import Embeddings
from langchain.schema.vectorstore import VectorStore
class _ORMBase(DeclarativeBase):
__tablename__: str
id: Mapped[uuid.UUID]
text: Mapped[str]
meta: Mapped[dict]
embedding: Mapped[np.ndarray]
class PGVecto_rs(VectorStore):
_engine: sqlalchemy.engine.Engine
_table: Type[_ORMBase]
_embedding: Embeddings
def __init__(
self,
embedding: Embeddings,
dimension: int,
db_url: str,
collection_name: str,
new_table: bool = False,
) -> None:
try:
from pgvecto_rs.sqlalchemy import Vector
except ImportError as e:
raise ImportError(
"Unable to import pgvector_rs, please install with "
"`pip install pgvector_rs`."
) from e
class _Table(_ORMBase):
__tablename__ = f"collection_{collection_name}"
id: Mapped[uuid.UUID] = mapped_column(
postgresql.UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
)
text: Mapped[str] = mapped_column(sqlalchemy.String)
meta: Mapped[dict] = mapped_column(postgresql.JSONB)
embedding: Mapped[np.ndarray] = mapped_column(Vector(dimension))
self._engine = sqlalchemy.create_engine(db_url)
self._table = _Table
self._table.__table__.create(self._engine, checkfirst=not new_table) # type: ignore
self._embedding = embedding
# ================ Create interface =================
@classmethod
def from_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
db_url: str = "",
collection_name: str = str(uuid.uuid4().hex),
**kwargs: Any,
) -> PGVecto_rs:
"""Return VectorStore initialized from texts and optional metadatas."""
sample_embedding = embedding.embed_query("Hello pgvecto_rs!")
dimension = len(sample_embedding)
if db_url is None:
raise ValueError("db_url must be provided")
_self: PGVecto_rs = cls(
embedding=embedding,
dimension=dimension,
db_url=db_url,
collection_name=collection_name,
new_table=True,
)
_self.add_texts(texts, metadatas, **kwargs)
return _self
@classmethod
def from_documents(
cls,
documents: List[Document],
embedding: Embeddings,
db_url: str = "",
collection_name: str = str(uuid.uuid4().hex),
**kwargs: Any,
) -> PGVecto_rs:
"""Return VectorStore initialized from documents."""
texts = [document.page_content for document in documents]
metadatas = [document.metadata for document in documents]
return cls.from_texts(
texts, embedding, metadatas, db_url, collection_name, **kwargs
)
@classmethod
def from_collection_name(
cls,
embedding: Embeddings,
db_url: str,
collection_name: str,
) -> PGVecto_rs:
"""Create new empty vectorstore with collection_name.
Or connect to an existing vectorstore in database if exists.
Arguments should be the same as when the vectorstore was created."""
sample_embedding = embedding.embed_query("Hello pgvecto_rs!")
return cls(
embedding=embedding,
dimension=len(sample_embedding),
db_url=db_url,
collection_name=collection_name,
)
# ================ Insert interface =================
def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore.
Args:
texts: Iterable of strings to add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
kwargs: vectorstore specific parameters
Returns:
List of ids of the added texts.
"""
embeddings = self._embedding.embed_documents(list(texts))
with Session(self._engine) as _session:
results: List[str] = []
for text, embedding, metadata in zip(
texts, embeddings, metadatas or [dict()] * len(list(texts))
):
t = insert(self._table).values(
text=text, meta=metadata, embedding=embedding
)
id = _session.execute(t).inserted_primary_key[0] # type: ignore
results.append(str(id))
_session.commit()
return results
def add_documents(self, documents: List[Document], **kwargs: Any) -> List[str]:
"""Run more documents through the embeddings and add to the vectorstore.
Args:
documents (List[Document]): List of documents to add to the vectorstore.
Returns:
List of ids of the added documents.
"""
return self.add_texts(
[document.page_content for document in documents],
[document.metadata for document in documents],
**kwargs,
)
# ================ Query interface =================
def similarity_search_with_score_by_vector(
self,
query_vector: List[float],
k: int = 4,
distance_func: Literal[
"sqrt_euclid", "neg_dot_prod", "ned_cos"
] = "sqrt_euclid",
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Return docs most similar to query vector, with its score."""
with Session(self._engine) as _session:
real_distance_func = (
self._table.embedding.squared_euclidean_distance
if distance_func == "sqrt_euclid"
else self._table.embedding.negative_dot_product_distance
if distance_func == "neg_dot_prod"
else self._table.embedding.negative_cosine_distance
if distance_func == "ned_cos"
else None
)
if real_distance_func is None:
raise ValueError("Invalid distance function")
t = (
select(self._table, real_distance_func(query_vector).label("score"))
.order_by("score")
.limit(k) # type: ignore
)
return [
(Document(page_content=row[0].text, metadata=row[0].meta), row[1])
for row in _session.execute(t)
]
def similarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
distance_func: Literal[
"sqrt_euclid", "neg_dot_prod", "ned_cos"
] = "sqrt_euclid",
**kwargs: Any,
) -> List[Document]:
return [
doc
for doc, score in self.similarity_search_with_score_by_vector(
embedding, k, distance_func, **kwargs
)
]
def similarity_search_with_score(
self,
query: str,
k: int = 4,
distance_func: Literal[
"sqrt_euclid", "neg_dot_prod", "ned_cos"
] = "sqrt_euclid",
**kwargs: Any,
) -> List[Tuple[Document, float]]:
query_vector = self._embedding.embed_query(query)
return self.similarity_search_with_score_by_vector(
query_vector, k, distance_func, **kwargs
)
def similarity_search(
self,
query: str,
k: int = 4,
distance_func: Literal[
"sqrt_euclid", "neg_dot_prod", "ned_cos"
] = "sqrt_euclid",
**kwargs: Any,
) -> List[Document]:
"""Return docs most similar to query."""
query_vector = self._embedding.embed_query(query)
return [
doc
for doc, score in self.similarity_search_with_score_by_vector(
query_vector, k, distance_func, **kwargs
)
]