From 1b233798a02ed5d191d346b86fb47c595e8c89e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9B=90=E7=B2=92=20Yanli?= Date: Fri, 3 Nov 2023 08:16:04 +0800 Subject: [PATCH] feat: Support pgvecto.rs as a VectorStore (#12718) Support [pgvecto.rs](https://github.com/tensorchord/pgvecto.rs) as a new VectorStore type. This introduces a new dependency [pgvecto_rs](https://pypi.org/project/pgvecto_rs/) and upgrades SQLAlchemy to ^2. Related to https://github.com/tensorchord/pgvecto.rs/issues/11 --------- Co-authored-by: Bagatur --- .../vectorstores/pgvecto_rs.ipynb | 214 +++++++++++++++ .../langchain/vectorstores/pgvecto_rs.py | 249 ++++++++++++++++++ 2 files changed, 463 insertions(+) create mode 100644 docs/docs/integrations/vectorstores/pgvecto_rs.ipynb create mode 100644 libs/langchain/langchain/vectorstores/pgvecto_rs.py diff --git a/docs/docs/integrations/vectorstores/pgvecto_rs.ipynb b/docs/docs/integrations/vectorstores/pgvecto_rs.ipynb new file mode 100644 index 0000000000..8e8150f2ed --- /dev/null +++ b/docs/docs/integrations/vectorstores/pgvecto_rs.ipynb @@ -0,0 +1,214 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# PGVecto.rs\n", + "\n", + "This notebook shows how to use functionality related to the Postgres vector database ([pgvecto.rs](https://github.com/tensorchord/pgvecto.rs)). You need to install SQLAlchemy >= 2 manually." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "## Loading Environment Variables\n", + "from dotenv import load_dotenv\n", + "\n", + "load_dotenv()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from typing import List\n", + "from langchain.embeddings.openai import OpenAIEmbeddings\n", + "from langchain.text_splitter import CharacterTextSplitter\n", + "from langchain.vectorstores.pgvecto_rs import PGVecto_rs\n", + "from langchain.document_loaders import TextLoader\n", + "from langchain.docstore.document import Document" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "loader = TextLoader(\"../../../state_of_the_union.txt\")\n", + "documents = loader.load()\n", + "text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)\n", + "docs = text_splitter.split_documents(documents)\n", + "\n", + "embeddings = OpenAIEmbeddings()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Start the database with the [official demo docker image](https://github.com/tensorchord/pgvecto.rs#installation)." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "vscode": { + "languageId": "shellscript" + } + }, + "outputs": [], + "source": [ + "docker run --name pgvecto-rs-demo -e POSTGRES_PASSWORD=mysecretpassword -p 5432:5432 -d tensorchord/pgvecto-rs:latest" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Then construct the db URL" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "## PGVecto.rs needs the connection string to the database.\n", + "## We will load it from the environment variables.\n", + "import os\n", + "\n", + "PORT = os.getenv(\"DB_PORT\", 5432)\n", + "HOST = os.getenv(\"DB_HOST\", \"localhost\")\n", + "USER = os.getenv(\"DB_USER\", \"postgres\")\n", + "PASS = os.getenv(\"DB_PASS\", \"mysecretpassword\")\n", + "DB_NAME = os.getenv(\"DB_NAME\", \"postgres\")\n", + "\n", + "# Run tests with shell:\n", + "URL = \"postgresql+psycopg://{username}:{password}@{host}:{port}/{db_name}\".format(\n", + " port=PORT,\n", + " host=HOST,\n", + " username=USER,\n", + " password=PASS,\n", + " db_name=DB_NAME,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Finally, create the VectorStore from the documents:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "db1 = PGVecto_rs.from_documents(\n", + " documents=docs,\n", + " embedding=embeddings,\n", + " db_url=URL,\n", + " # The table name is f\"collection_{collection_name}\", so that it should be unique.\n", + " collection_name=\"state_of_the_union\",\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "You can connect to the table later with:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create new empty vectorstore with collection_name.\n", + "# Or connect to an existing vectorstore in database 
if exists.\n", + "# Arguments should be the same as when the vectorstore was created.\n", + "db1 = PGVecto_rs.from_collection_name(\n", + " embedding=embeddings,\n", + " db_url=URL,\n", + " collection_name=\"state_of_the_union\",\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Make sure that the user is permitted to create a table." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Similarity search with score" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Similarity Search with Euclidean Distance (Default)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "query = \"What did the president say about Ketanji Brown Jackson\"\n", + "docs: List[Document] = db1.similarity_search(query, k=4)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "for doc in docs:\n", + " print(doc.page_content)\n", + " print(\"======================\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/libs/langchain/langchain/vectorstores/pgvecto_rs.py b/libs/langchain/langchain/vectorstores/pgvecto_rs.py new file mode 100644 index 0000000000..2471395295 --- /dev/null +++ b/libs/langchain/langchain/vectorstores/pgvecto_rs.py @@ -0,0 +1,249 @@ +from __future__ import annotations + +import uuid +from typing import Any, Iterable, List, Literal, Optional, Tuple, Type + +import numpy as np +import sqlalchemy +from sqlalchemy import insert, select +from sqlalchemy.dialects 
import postgresql
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.orm.session import Session

from langchain.schema import Document
from langchain.schema.embeddings import Embeddings
from langchain.schema.vectorstore import VectorStore


class _ORMBase(DeclarativeBase):
    """Declarative base listing the columns each collection table maps."""

    __tablename__: str
    id: Mapped[uuid.UUID]
    text: Mapped[str]
    meta: Mapped[dict]
    embedding: Mapped[np.ndarray]


class PGVecto_rs(VectorStore):
    """VectorStore that keeps texts, metadata and embeddings in a Postgres table.

    Each collection is stored in a table named ``collection_{collection_name}``
    whose ``embedding`` column uses the ``Vector`` type from
    ``pgvecto_rs.sqlalchemy``.
    """

    _engine: sqlalchemy.engine.Engine
    _table: Type[_ORMBase]
    _embedding: Embeddings

    def __init__(
        self,
        embedding: Embeddings,
        dimension: int,
        db_url: str,
        collection_name: str,
        new_table: bool = False,
    ) -> None:
        """Map the collection table and create it in the database.

        Args:
            embedding: Embedding model used for documents and queries.
            dimension: Length of the vectors stored in the ``embedding`` column.
            db_url: SQLAlchemy database URL passed to ``create_engine``.
            collection_name: Suffix of the table name
                (``collection_{collection_name}``).
            new_table: When True the table is created without first checking
                whether it exists; when False an existing table is reused.

        Raises:
            ImportError: If the ``pgvecto_rs`` package is not installed.
        """
        try:
            from pgvecto_rs.sqlalchemy import Vector
        except ImportError as e:
            raise ImportError(
                "Unable to import pgvecto_rs, please install with "
                "`pip install pgvecto_rs`."
            ) from e

        class _Table(_ORMBase):
            __tablename__ = f"collection_{collection_name}"
            # Every store instance re-declares its table on the metadata shared
            # through _ORMBase. Without extend_existing, building a second
            # store for the same collection in one process makes SQLAlchemy
            # reject the duplicate table definition.
            __table_args__ = {"extend_existing": True}
            id: Mapped[uuid.UUID] = mapped_column(
                postgresql.UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
            )
            text: Mapped[str] = mapped_column(sqlalchemy.String)
            meta: Mapped[dict] = mapped_column(postgresql.JSONB)
            embedding: Mapped[np.ndarray] = mapped_column(Vector(dimension))

        self._engine = sqlalchemy.create_engine(db_url)
        self._table = _Table
        self._table.__table__.create(self._engine, checkfirst=not new_table)  # type: ignore
        self._embedding = embedding

    # ================ Create interface =================
    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        db_url: str = "",
        collection_name: Optional[str] = None,
        **kwargs: Any,
    ) -> PGVecto_rs:
        """Return VectorStore initialized from texts and optional metadatas.

        Args:
            texts: Texts to embed and store.
            embedding: Embedding model used for documents and queries.
            metadatas: Optional list of metadatas associated with the texts.
            db_url: SQLAlchemy database URL; must be non-empty.
            collection_name: Name of the collection. A fresh random name is
                generated on every call when omitted.
            kwargs: Forwarded to ``add_texts``.

        Raises:
            ValueError: If ``db_url`` is empty or None.
        """
        # Validate before spending an embedding call on the dimension probe.
        if not db_url:
            raise ValueError("db_url must be provided")
        # Generate the name per call: a default computed in the signature is
        # evaluated once and would be shared by every call in the process.
        if collection_name is None:
            collection_name = uuid.uuid4().hex
        # The vector dimension is discovered by embedding a sample query.
        dimension = len(embedding.embed_query("Hello pgvecto_rs!"))
        store: PGVecto_rs = cls(
            embedding=embedding,
            dimension=dimension,
            db_url=db_url,
            collection_name=collection_name,
            new_table=True,
        )
        store.add_texts(texts, metadatas, **kwargs)
        return store

    @classmethod
    def from_documents(
        cls,
        documents: List[Document],
        embedding: Embeddings,
        db_url: str = "",
        collection_name: Optional[str] = None,
        **kwargs: Any,
    ) -> PGVecto_rs:
        """Return VectorStore initialized from documents.

        Page contents and metadata are extracted from ``documents`` and handed
        to ``from_texts``; the remaining arguments are passed through unchanged.
        """
        texts = [document.page_content for document in documents]
        metadatas = [document.metadata for document in documents]
        return cls.from_texts(
            texts, embedding, metadatas, db_url, collection_name, **kwargs
        )

    @classmethod
    def from_collection_name(
        cls,
        embedding: Embeddings,
        db_url: str,
        collection_name: str,
    ) -> PGVecto_rs:
        """Create new empty vectorstore with collection_name.

        Or connect to an existing vectorstore in database if exists.
        Arguments should be the same as when the vectorstore was created.
        """
        sample_embedding = embedding.embed_query("Hello pgvecto_rs!")
        return cls(
            embedding=embedding,
            dimension=len(sample_embedding),
            db_url=db_url,
            collection_name=collection_name,
        )

    # ================ Insert interface =================

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            kwargs: vectorstore specific parameters

        Returns:
            List of ids of the added texts.

        Raises:
            ValueError: If ``metadatas`` is given with a different length
                than ``texts``.
        """
        # Materialize once: a one-shot iterable (e.g. a generator) would be
        # exhausted by the embedding call and nothing would be inserted.
        text_list = list(texts)
        if not text_list:
            return []
        if not metadatas:
            # One independent dict per row rather than a shared instance.
            metadatas = [{} for _ in text_list]
        elif len(metadatas) != len(text_list):
            # zip() would otherwise silently drop the unmatched texts.
            raise ValueError(
                f"Got {len(text_list)} texts but {len(metadatas)} metadatas"
            )

        embeddings = self._embedding.embed_documents(text_list)
        inserted_ids: List[str] = []
        with Session(self._engine) as _session:
            for text, embedding, metadata in zip(text_list, embeddings, metadatas):
                statement = insert(self._table).values(
                    text=text, meta=metadata, embedding=embedding
                )
                row_id = _session.execute(statement).inserted_primary_key[0]  # type: ignore
                inserted_ids.append(str(row_id))
            # All rows are committed together after the loop.
            _session.commit()
        return inserted_ids

    def add_documents(self, documents: List[Document], **kwargs: Any) -> List[str]:
        """Run more documents through the embeddings and add to the vectorstore.

        Args:
            documents (List[Document]): List of documents to add to the vectorstore.

        Returns:
            List of ids of the added documents.
        """
        return self.add_texts(
            [document.page_content for document in documents],
            [document.metadata for document in documents],
            **kwargs,
        )

    # ================ Query interface =================
    def similarity_search_with_score_by_vector(
        self,
        query_vector: List[float],
        k: int = 4,
        distance_func: Literal[
            "sqrt_euclid", "neg_dot_prod", "ned_cos"
        ] = "sqrt_euclid",
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query vector, with its score.

        Args:
            query_vector: Embedding to compare the stored vectors against.
            k: Maximum number of rows to return.
            distance_func: Which comparator of the ``embedding`` column to use:
                ``"sqrt_euclid"`` -> ``squared_euclidean_distance``,
                ``"neg_dot_prod"`` -> ``negative_dot_product_distance``,
                ``"ned_cos"`` -> ``negative_cosine_distance``.
                The spellings are part of the public interface and are kept
                as-is.

        Returns:
            ``(Document, score)`` pairs ordered by ascending score.

        Raises:
            ValueError: If ``distance_func`` is not one of the values above.
        """
        comparator_names = {
            "sqrt_euclid": "squared_euclidean_distance",
            "neg_dot_prod": "negative_dot_product_distance",
            "ned_cos": "negative_cosine_distance",
        }
        comparator_name = comparator_names.get(distance_func)
        # Reject bad input before opening a database session.
        if comparator_name is None:
            raise ValueError("Invalid distance function")
        real_distance_func = getattr(self._table.embedding, comparator_name)

        statement = (
            select(self._table, real_distance_func(query_vector).label("score"))
            .order_by("score")
            .limit(k)  # type: ignore
        )
        with Session(self._engine) as _session:
            return [
                (Document(page_content=row[0].text, metadata=row[0].meta), row[1])
                for row in _session.execute(statement)
            ]

    def similarity_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        distance_func: Literal[
            "sqrt_euclid", "neg_dot_prod", "ned_cos"
        ] = "sqrt_euclid",
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to the embedding vector, without scores."""
        return [
            doc
            for doc, _score in self.similarity_search_with_score_by_vector(
                embedding, k, distance_func, **kwargs
            )
        ]

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        distance_func: Literal[
            "sqrt_euclid", "neg_dot_prod", "ned_cos"
        ] = "sqrt_euclid",
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Embed ``query`` and return the most similar docs with their scores."""
        query_vector = self._embedding.embed_query(query)
        return self.similarity_search_with_score_by_vector(
            query_vector, k, distance_func, **kwargs
        )

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        distance_func: Literal[
            "sqrt_euclid", "neg_dot_prod", "ned_cos"
        ] = "sqrt_euclid",
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to query."""
        return [
            doc
            for doc, _score in self.similarity_search_with_score(
                query, k, distance_func, **kwargs
            )
        ]