@ -1,32 +1,17 @@
from __future__ import annotations
from __future__ import annotations
import uuid
import uuid
from typing import Any , Iterable, List , Literal , Optional , Tuple , Type
from typing import Any , Dict, Iterable, List , Literal , Optional , Tuple , Union
import numpy as np
import sqlalchemy
from langchain_core . documents import Document
from langchain_core . documents import Document
from langchain_core . embeddings import Embeddings
from langchain_core . embeddings import Embeddings
from langchain_core . vectorstores import VectorStore
from langchain_core . vectorstores import VectorStore
from sqlalchemy import insert , select
from sqlalchemy . dialects import postgresql
from sqlalchemy . orm import DeclarativeBase , Mapped , mapped_column
from sqlalchemy . orm . session import Session
class _ORMBase ( DeclarativeBase ) :
__tablename__ : str
id : Mapped [ uuid . UUID ]
text : Mapped [ str ]
meta : Mapped [ dict ]
embedding : Mapped [ np . ndarray ]
class PGVecto_rs ( VectorStore ) :
class PGVecto_rs ( VectorStore ) :
""" VectorStore backed by pgvecto_rs. """
""" VectorStore backed by pgvecto_rs. """
_engine : sqlalchemy . engine . Engine
_store = None
_table : Type [ _ORMBase ]
_embedding : Embeddings
_embedding : Embeddings
def __init__ (
def __init__ (
@ -45,28 +30,22 @@ class PGVecto_rs(VectorStore):
db_url : Database URL .
db_url : Database URL .
collection_name : Name of the collection .
collection_name : Name of the collection .
new_table : Whether to create a new table or connect to an existing one .
new_table : Whether to create a new table or connect to an existing one .
If true , the table will be dropped if exists , then recreated .
Defaults to False .
Defaults to False .
"""
"""
try :
try :
from pgvecto_rs . s qlalchemy import Vector
from pgvecto_rs . s dk import PGVectoRs
except ImportError as e :
except ImportError as e :
raise ImportError (
raise ImportError (
" Unable to import pgvector_rs , please install with "
" Unable to import pgvector_rs .sdk , please install with "
" `pip install pgvector_rs`. "
' `pip install " pgvector_rs[sdk] " `. '
) from e
) from e
self . _store = PGVectoRs (
class _Table ( _ORMBase ) :
db_url = db_url ,
__tablename__ = f " collection_ { collection_name } "
collection_name= collection_name ,
id : Mapped [ uuid . UUID ] = mapped_column (
dimension = dimension ,
postgresql . UUID ( as_uuid = True ) , primary_key = True , default = uuid . uuid4
recreate = new_table ,
)
)
text : Mapped [ str ] = mapped_column ( sqlalchemy . String )
meta : Mapped [ dict ] = mapped_column ( postgresql . JSONB )
embedding : Mapped [ np . ndarray ] = mapped_column ( Vector ( dimension ) )
self . _engine = sqlalchemy . create_engine ( db_url )
self . _table = _Table
self . _table . __table__ . create ( self . _engine , checkfirst = not new_table ) # type: ignore
self . _embedding = embedding
self . _embedding = embedding
# ================ Create interface =================
# ================ Create interface =================
@ -90,7 +69,6 @@ class PGVecto_rs(VectorStore):
dimension = dimension ,
dimension = dimension ,
db_url = db_url ,
db_url = db_url ,
collection_name = collection_name ,
collection_name = collection_name ,
new_table = True ,
)
)
_self . add_texts ( texts , metadatas , * * kwargs )
_self . add_texts ( texts , metadatas , * * kwargs )
return _self
return _self
@ -148,19 +126,15 @@ class PGVecto_rs(VectorStore):
List of ids of the added texts .
List of ids of the added texts .
"""
"""
from pgvecto_rs . sdk import Record
embeddings = self . _embedding . embed_documents ( list ( texts ) )
embeddings = self . _embedding . embed_documents ( list ( texts ) )
with Session ( self . _engine ) as _session :
records = [
results : List [ str ] = [ ]
Record . from_text ( text , embedding , meta )
for text , embedding , metadata in zip (
for text , embedding , meta in zip ( texts , embeddings , metadatas or [ ] )
texts , embeddings , metadatas or [ dict ( ) ] * len ( list ( texts ) )
]
) :
self . _store . insert ( records )
t = insert ( self . _table ) . values (
return [ str ( record . id ) for record in records ]
text = text , meta = metadata , embedding = embedding
)
id = _session . execute ( t ) . inserted_primary_key [ 0 ] # type: ignore
results . append ( str ( id ) )
_session . commit ( )
return results
def add_documents ( self , documents : List [ Document ] , * * kwargs : Any ) - > List [ str ] :
def add_documents ( self , documents : List [ Document ] , * * kwargs : Any ) - > List [ str ] :
""" Run more documents through the embeddings and add to the vectorstore.
""" Run more documents through the embeddings and add to the vectorstore.
@ -185,30 +159,40 @@ class PGVecto_rs(VectorStore):
distance_func : Literal [
distance_func : Literal [
" sqrt_euclid " , " neg_dot_prod " , " ned_cos "
" sqrt_euclid " , " neg_dot_prod " , " ned_cos "
] = " sqrt_euclid " ,
] = " sqrt_euclid " ,
filter : Union [ None , Dict [ str , Any ] , Any ] = None ,
* * kwargs : Any ,
* * kwargs : Any ,
) - > List [ Tuple [ Document , float ] ] :
) - > List [ Tuple [ Document , float ] ] :
""" Return docs most similar to query vector, with its score. """
""" Return docs most similar to query vector, with its score. """
with Session ( self . _engine ) as _session :
real_distance_func = (
self . _table . embedding . squared_euclidean_distance
if distance_func == " sqrt_euclid "
else self . _table . embedding . negative_dot_product_distance
if distance_func == " neg_dot_prod "
else self . _table . embedding . negative_cosine_distance
if distance_func == " ned_cos "
else None
)
if real_distance_func is None :
raise ValueError ( " Invalid distance function " )
t = (
from pgvecto_rs . sdk . filters import meta_contains
select ( self . _table , real_distance_func ( query_vector ) . label ( " score " ) )
. order_by ( " score " )
distance_func_map = {
. limit ( k ) # type: ignore
" sqrt_euclid " : " <-> " ,
" neg_dot_prod " : " <#> " ,
" ned_cos " : " <=> " ,
}
if filter is None :
real_filter = None
elif isinstance ( filter , dict ) :
real_filter = meta_contains ( filter )
else :
real_filter = filter
results = self . _store . search (
query_vector ,
distance_func_map [ distance_func ] ,
k ,
filter = real_filter ,
)
)
return [
return [
( Document ( page_content = row [ 0 ] . text , metadata = row [ 0 ] . meta ) , row [ 1 ] )
(
for row in _session . execute ( t )
Document (
page_content = res [ 0 ] . text ,
metadata = res [ 0 ] . meta ,
) ,
res [ 1 ] ,
)
for res in results
]
]
def similarity_search_by_vector (
def similarity_search_by_vector (
@ -218,11 +202,12 @@ class PGVecto_rs(VectorStore):
distance_func : Literal [
distance_func : Literal [
" sqrt_euclid " , " neg_dot_prod " , " ned_cos "
" sqrt_euclid " , " neg_dot_prod " , " ned_cos "
] = " sqrt_euclid " ,
] = " sqrt_euclid " ,
filter : Optional [ Any ] = None ,
* * kwargs : Any ,
* * kwargs : Any ,
) - > List [ Document ] :
) - > List [ Document ] :
return [
return [
doc
doc
for doc , score in self . similarity_search_with_score_by_vector (
for doc , _ score in self . similarity_search_with_score_by_vector (
embedding , k , distance_func , * * kwargs
embedding , k , distance_func , * * kwargs
)
)
]
]
@ -254,7 +239,7 @@ class PGVecto_rs(VectorStore):
query_vector = self . _embedding . embed_query ( query )
query_vector = self . _embedding . embed_query ( query )
return [
return [
doc
doc
for doc , score in self . similarity_search_with_score_by_vector (
for doc , _ score in self . similarity_search_with_score_by_vector (
query_vector , k , distance_func , * * kwargs
query_vector , k , distance_func , * * kwargs
)
)
]
]