forked from Archives/langchain
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
422 lines
15 KiB
Python
422 lines
15 KiB
Python
"""Interface for vector stores."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import warnings
|
|
from abc import ABC, abstractmethod
|
|
from functools import partial
|
|
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, TypeVar
|
|
|
|
from pydantic import BaseModel, Field, root_validator
|
|
|
|
from langchain.docstore.document import Document
|
|
from langchain.embeddings.base import Embeddings
|
|
from langchain.schema import BaseRetriever
|
|
|
|
# Type variable bound to VectorStore. The classmethod constructors below
# annotate ``cls`` as ``Type[VST]`` and return ``VST``, so calling them on a
# subclass is typed as returning that subclass.
VST = TypeVar("VST", bound="VectorStore")
|
|
|
|
|
|
class VectorStore(ABC):
    """Interface for vector stores.

    Concrete stores must implement ``add_texts``, ``similarity_search`` and
    ``from_texts``. Every other method either delegates to those, raises
    ``NotImplementedError`` until overridden, or (for the async search
    variants) runs the synchronous implementation in an executor.
    """

    @abstractmethod
    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            kwargs: vectorstore specific parameters

        Returns:
            List of ids from adding the texts into the vectorstore.
        """

    async def aadd_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError

    def add_documents(self, documents: List[Document], **kwargs: Any) -> List[str]:
        """Run more documents through the embeddings and add to the vectorstore.

        Args:
            documents (List[Document]): Documents to add to the vectorstore.
            kwargs: Forwarded unchanged to ``add_texts``.

        Returns:
            List[str]: List of IDs of the added texts.
        """
        # TODO: Handle the case where the user doesn't provide ids on the Collection
        texts = [doc.page_content for doc in documents]
        metadatas = [doc.metadata for doc in documents]
        return self.add_texts(texts, metadatas, **kwargs)

    async def aadd_documents(
        self, documents: List[Document], **kwargs: Any
    ) -> List[str]:
        """Run more documents through the embeddings and add to the vectorstore.

        Args:
            documents (List[Document]): Documents to add to the vectorstore.
            kwargs: Forwarded unchanged to ``aadd_texts``.

        Returns:
            List[str]: List of IDs of the added texts.
        """
        texts = [doc.page_content for doc in documents]
        metadatas = [doc.metadata for doc in documents]
        return await self.aadd_texts(texts, metadatas, **kwargs)

    def search(self, query: str, search_type: str, **kwargs: Any) -> List[Document]:
        """Return docs most similar to query using specified search type.

        Args:
            query: Text to search for.
            search_type: Either ``"similarity"`` or ``"mmr"``.
            kwargs: Forwarded to the selected search method.

        Raises:
            ValueError: If ``search_type`` is neither of the supported values.
        """
        if search_type == "similarity":
            return self.similarity_search(query, **kwargs)
        elif search_type == "mmr":
            return self.max_marginal_relevance_search(query, **kwargs)
        else:
            raise ValueError(
                f"search_type of {search_type} not allowed. Expected "
                "search_type to be 'similarity' or 'mmr'."
            )

    async def asearch(
        self, query: str, search_type: str, **kwargs: Any
    ) -> List[Document]:
        """Return docs most similar to query using specified search type.

        Async counterpart of ``search``; accepts the same ``search_type``
        values and raises ``ValueError`` for any other value.
        """
        if search_type == "similarity":
            return await self.asimilarity_search(query, **kwargs)
        elif search_type == "mmr":
            return await self.amax_marginal_relevance_search(query, **kwargs)
        else:
            raise ValueError(
                f"search_type of {search_type} not allowed. Expected "
                "search_type to be 'similarity' or 'mmr'."
            )

    @abstractmethod
    def similarity_search(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> List[Document]:
        """Return docs most similar to query."""

    def similarity_search_with_relevance_scores(
        self,
        query: str,
        k: int = 4,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs and relevance scores in the range [0, 1].

        0 is dissimilar, 1 is most similar.

        Args:
            query: input text
            k: Number of Documents to return. Defaults to 4.
            **kwargs: kwargs to be passed to similarity search. Should include:
                score_threshold: Optional, a floating point value between 0 to 1 to
                    filter the resulting set of retrieved docs

        Returns:
            List of Tuples of (doc, similarity_score)
        """
        docs_and_similarities = self._similarity_search_with_relevance_scores(
            query, k=k, **kwargs
        )

        # Out-of-range scores are reported but not rejected: the results are
        # still returned (and filtered below) as-is.
        if any(
            similarity < 0.0 or similarity > 1.0
            for _, similarity in docs_and_similarities
        ):
            warnings.warn(
                "Relevance scores must be between"
                f" 0 and 1, got {docs_and_similarities}"
            )

        score_threshold = kwargs.get("score_threshold")
        if score_threshold is not None:
            docs_and_similarities = [
                (doc, similarity)
                for doc, similarity in docs_and_similarities
                if similarity >= score_threshold
            ]
            if not docs_and_similarities:
                # Implicit concatenation keeps the message on one line; a
                # backslash continuation inside the literal would splice the
                # next source line's leading text straight into the message.
                warnings.warn(
                    "No relevant docs were retrieved using the relevance score"
                    f" threshold {score_threshold}"
                )
        return docs_and_similarities

    def _similarity_search_with_relevance_scores(
        self,
        query: str,
        k: int = 4,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs and relevance scores, normalized on a scale from 0 to 1.

        0 is dissimilar, 1 is most similar.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError

    async def asimilarity_search_with_relevance_scores(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query, paired with relevance scores.

        Runs ``similarity_search_with_relevance_scores`` in the running
        loop's default executor.
        """

        # This is a temporary workaround to make the similarity search
        # asynchronous. The proper solution is to make the similarity search
        # asynchronous in the vector store implementations.
        func = partial(self.similarity_search_with_relevance_scores, query, k, **kwargs)
        return await asyncio.get_running_loop().run_in_executor(None, func)

    async def asimilarity_search(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> List[Document]:
        """Return docs most similar to query.

        Runs ``similarity_search`` in the running loop's default executor.
        """

        # This is a temporary workaround to make the similarity search
        # asynchronous. The proper solution is to make the similarity search
        # asynchronous in the vector store implementations.
        func = partial(self.similarity_search, query, k, **kwargs)
        return await asyncio.get_running_loop().run_in_executor(None, func)

    def similarity_search_by_vector(
        self, embedding: List[float], k: int = 4, **kwargs: Any
    ) -> List[Document]:
        """Return docs most similar to embedding vector.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.

        Returns:
            List of Documents most similar to the query vector.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError

    async def asimilarity_search_by_vector(
        self, embedding: List[float], k: int = 4, **kwargs: Any
    ) -> List[Document]:
        """Return docs most similar to embedding vector.

        Runs ``similarity_search_by_vector`` in the running loop's default
        executor.
        """

        # This is a temporary workaround to make the similarity search
        # asynchronous. The proper solution is to make the similarity search
        # asynchronous in the vector store implementations.
        func = partial(self.similarity_search_by_vector, embedding, k, **kwargs)
        return await asyncio.get_running_loop().run_in_executor(None, func)

    def max_marginal_relevance_search(
        self,
        query: str,
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch to pass to MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                        of diversity among the results with 0 corresponding
                        to maximum diversity and 1 to minimum diversity.
                        Defaults to 0.5.

        Returns:
            List of Documents selected by maximal marginal relevance.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError

    async def amax_marginal_relevance_search(
        self,
        query: str,
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Runs ``max_marginal_relevance_search`` in the running loop's default
        executor.
        """

        # This is a temporary workaround to make the similarity search
        # asynchronous. The proper solution is to make the similarity search
        # asynchronous in the vector store implementations.
        func = partial(
            self.max_marginal_relevance_search, query, k, fetch_k, lambda_mult, **kwargs
        )
        return await asyncio.get_running_loop().run_in_executor(None, func)

    def max_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch to pass to MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                        of diversity among the results with 0 corresponding
                        to maximum diversity and 1 to minimum diversity.
                        Defaults to 0.5.

        Returns:
            List of Documents selected by maximal marginal relevance.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError

    async def amax_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError

    @classmethod
    def from_documents(
        cls: Type[VST],
        documents: List[Document],
        embedding: Embeddings,
        **kwargs: Any,
    ) -> VST:
        """Return VectorStore initialized from documents and embeddings.

        Splits each document into its ``page_content`` and ``metadata`` and
        delegates to ``from_texts``.
        """
        texts = [d.page_content for d in documents]
        metadatas = [d.metadata for d in documents]
        return cls.from_texts(texts, embedding, metadatas=metadatas, **kwargs)

    @classmethod
    async def afrom_documents(
        cls: Type[VST],
        documents: List[Document],
        embedding: Embeddings,
        **kwargs: Any,
    ) -> VST:
        """Return VectorStore initialized from documents and embeddings.

        Splits each document into its ``page_content`` and ``metadata`` and
        delegates to ``afrom_texts``.
        """
        texts = [d.page_content for d in documents]
        metadatas = [d.metadata for d in documents]
        return await cls.afrom_texts(texts, embedding, metadatas=metadatas, **kwargs)

    @classmethod
    @abstractmethod
    def from_texts(
        cls: Type[VST],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> VST:
        """Return VectorStore initialized from texts and embeddings."""

    @classmethod
    async def afrom_texts(
        cls: Type[VST],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> VST:
        """Return VectorStore initialized from texts and embeddings.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError

    def as_retriever(self, **kwargs: Any) -> VectorStoreRetriever:
        """Return a ``VectorStoreRetriever`` wrapping this vector store.

        Args:
            kwargs: Forwarded to the ``VectorStoreRetriever`` constructor.
        """
        return VectorStoreRetriever(vectorstore=self, **kwargs)
|
|
|
|
|
|
class VectorStoreRetriever(BaseRetriever, BaseModel):
    """Retriever that answers queries by delegating to a ``VectorStore``."""

    # Vector store that searches and document additions are delegated to.
    vectorstore: VectorStore
    # One of "similarity", "similarity_score_threshold" or "mmr".
    search_type: str = "similarity"
    # Keyword arguments forwarded to the vector store's search method.
    search_kwargs: dict = Field(default_factory=dict)

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True

    @root_validator()
    def validate_search_type(cls, values: Dict) -> Dict:
        """Validate search type.

        Checks that ``search_type`` is one of the supported values and, for
        ``"similarity_score_threshold"``, that ``search_kwargs`` carries a
        numeric ``score_threshold``.

        Raises:
            ValueError: If ``search_type`` is not allowed, or if the
                threshold search type is selected without a numeric
                ``score_threshold`` in ``search_kwargs``.
        """
        if "search_type" in values:
            search_type = values["search_type"]
            if search_type not in ("similarity", "similarity_score_threshold", "mmr"):
                raise ValueError(f"search_type of {search_type} not allowed.")
            if search_type == "similarity_score_threshold":
                # `search_kwargs` may be missing from `values` (e.g. when the
                # field itself failed validation); read it defensively so the
                # problem surfaces as a ValueError rather than a KeyError.
                search_kwargs = values.get("search_kwargs") or {}
                score_threshold = search_kwargs.get("score_threshold")
                # Accept ints such as 0 or 1 as well as floats; bool is an
                # int subclass and is rejected explicitly, as is None.
                if isinstance(score_threshold, bool) or not isinstance(
                    score_threshold, (int, float)
                ):
                    raise ValueError(
                        "`score_threshold` is not specified with a float value(0~1) "
                        "in `search_kwargs`."
                    )
        return values

    def get_relevant_documents(self, query: str) -> List[Document]:
        """Return documents for ``query`` using the configured search type.

        Raises:
            ValueError: If ``search_type`` is not one of the supported values.
        """
        if self.search_type == "similarity":
            docs = self.vectorstore.similarity_search(query, **self.search_kwargs)
        elif self.search_type == "similarity_score_threshold":
            docs_and_similarities = (
                self.vectorstore.similarity_search_with_relevance_scores(
                    query, **self.search_kwargs
                )
            )
            # Only the documents are returned; the scores are dropped.
            docs = [doc for doc, _ in docs_and_similarities]
        elif self.search_type == "mmr":
            docs = self.vectorstore.max_marginal_relevance_search(
                query, **self.search_kwargs
            )
        else:
            raise ValueError(f"search_type of {self.search_type} not allowed.")
        return docs

    async def aget_relevant_documents(self, query: str) -> List[Document]:
        """Async counterpart of ``get_relevant_documents``.

        Raises:
            ValueError: If ``search_type`` is not one of the supported values.
        """
        if self.search_type == "similarity":
            docs = await self.vectorstore.asimilarity_search(
                query, **self.search_kwargs
            )
        elif self.search_type == "similarity_score_threshold":
            docs_and_similarities = (
                await self.vectorstore.asimilarity_search_with_relevance_scores(
                    query, **self.search_kwargs
                )
            )
            # Only the documents are returned; the scores are dropped.
            docs = [doc for doc, _ in docs_and_similarities]
        elif self.search_type == "mmr":
            docs = await self.vectorstore.amax_marginal_relevance_search(
                query, **self.search_kwargs
            )
        else:
            raise ValueError(f"search_type of {self.search_type} not allowed.")
        return docs

    def add_documents(self, documents: List[Document], **kwargs: Any) -> List[str]:
        """Add documents to vectorstore.

        Returns:
            Whatever the wrapped vector store's ``add_documents`` returns.
        """
        return self.vectorstore.add_documents(documents, **kwargs)

    async def aadd_documents(
        self, documents: List[Document], **kwargs: Any
    ) -> List[str]:
        """Add documents to vectorstore.

        Returns:
            Whatever the wrapped vector store's ``aadd_documents`` returns.
        """
        return await self.vectorstore.aadd_documents(documents, **kwargs)
|