You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

437 lines
16 KiB

"""Interface for vector stores."""
from __future__ import annotations
import asyncio
import warnings
from abc import ABC, abstractmethod
from functools import partial
from typing import (
from pydantic import BaseModel, Field, root_validator
from langchain.docstore.document import Document
from langchain.embeddings.base import Embeddings
from langchain.schema import BaseRetriever
VST = TypeVar("VST", bound="VectorStore")
class VectorStore(ABC):
"""Interface for vector stores."""
def add_texts(
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore.
texts: Iterable of strings to add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
kwargs: vectorstore specific parameters
List of ids from adding the texts into the vectorstore.
async def aadd_texts(
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore."""
raise NotImplementedError
def add_documents(self, documents: List[Document], **kwargs: Any) -> List[str]:
"""Run more documents through the embeddings and add to the vectorstore.
documents (List[Document]: Documents to add to the vectorstore.
List[str]: List of IDs of the added texts.
# TODO: Handle the case where the user doesn't provide ids on the Collection
texts = [doc.page_content for doc in documents]
metadatas = [doc.metadata for doc in documents]
return self.add_texts(texts, metadatas, **kwargs)
async def aadd_documents(
self, documents: List[Document], **kwargs: Any
) -> List[str]:
"""Run more documents through the embeddings and add to the vectorstore.
documents (List[Document]: Documents to add to the vectorstore.
List[str]: List of IDs of the added texts.
texts = [doc.page_content for doc in documents]
metadatas = [doc.metadata for doc in documents]
return await self.aadd_texts(texts, metadatas, **kwargs)
def search(self, query: str, search_type: str, **kwargs: Any) -> List[Document]:
"""Return docs most similar to query using specified search type."""
if search_type == "similarity":
return self.similarity_search(query, **kwargs)
elif search_type == "mmr":
return self.max_marginal_relevance_search(query, **kwargs)
raise ValueError(
f"search_type of {search_type} not allowed. Expected "
"search_type to be 'similarity' or 'mmr'."
async def asearch(
self, query: str, search_type: str, **kwargs: Any
) -> List[Document]:
"""Return docs most similar to query using specified search type."""
if search_type == "similarity":
return await self.asimilarity_search(query, **kwargs)
elif search_type == "mmr":
return await self.amax_marginal_relevance_search(query, **kwargs)
raise ValueError(
f"search_type of {search_type} not allowed. Expected "
"search_type to be 'similarity' or 'mmr'."
def similarity_search(
self, query: str, k: int = 4, **kwargs: Any
) -> List[Document]:
"""Return docs most similar to query."""
def similarity_search_with_relevance_scores(
query: str,
k: int = 4,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Return docs and relevance scores in the range [0, 1].
0 is dissimilar, 1 is most similar.
query: input text
k: Number of Documents to return. Defaults to 4.
**kwargs: kwargs to be passed to similarity search. Should include:
score_threshold: Optional, a floating point value between 0 to 1 to
filter the resulting set of retrieved docs
List of Tuples of (doc, similarity_score)
docs_and_similarities = self._similarity_search_with_relevance_scores(
query, k=k, **kwargs
if any(
similarity < 0.0 or similarity > 1.0
for _, similarity in docs_and_similarities
"Relevance scores must be between"
f" 0 and 1, got {docs_and_similarities}"
score_threshold = kwargs.get("score_threshold")
if score_threshold is not None:
docs_and_similarities = [
(doc, similarity)
for doc, similarity in docs_and_similarities
if similarity >= score_threshold
if len(docs_and_similarities) == 0:
"No relevant docs were retrieved using the relevance score"
f" threshold {score_threshold}"
return docs_and_similarities
def _similarity_search_with_relevance_scores(
query: str,
k: int = 4,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Return docs and relevance scores, normalized on a scale from 0 to 1.
0 is dissimilar, 1 is most similar.
raise NotImplementedError
async def asimilarity_search_with_relevance_scores(
self, query: str, k: int = 4, **kwargs: Any
) -> List[Tuple[Document, float]]:
"""Return docs most similar to query."""
# This is a temporary workaround to make the similarity search
# asynchronous. The proper solution is to make the similarity search
# asynchronous in the vector store implementations.
func = partial(self.similarity_search_with_relevance_scores, query, k, **kwargs)
return await asyncio.get_event_loop().run_in_executor(None, func)
async def asimilarity_search(
self, query: str, k: int = 4, **kwargs: Any
) -> List[Document]:
"""Return docs most similar to query."""
# This is a temporary workaround to make the similarity search
# asynchronous. The proper solution is to make the similarity search
# asynchronous in the vector store implementations.
func = partial(self.similarity_search, query, k, **kwargs)
return await asyncio.get_event_loop().run_in_executor(None, func)
def similarity_search_by_vector(
self, embedding: List[float], k: int = 4, **kwargs: Any
) -> List[Document]:
"""Return docs most similar to embedding vector.
embedding: Embedding to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
List of Documents most similar to the query vector.
raise NotImplementedError
async def asimilarity_search_by_vector(
self, embedding: List[float], k: int = 4, **kwargs: Any
) -> List[Document]:
"""Return docs most similar to embedding vector."""
# This is a temporary workaround to make the similarity search
# asynchronous. The proper solution is to make the similarity search
# asynchronous in the vector store implementations.
func = partial(self.similarity_search_by_vector, embedding, k, **kwargs)
return await asyncio.get_event_loop().run_in_executor(None, func)
def max_marginal_relevance_search(
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance.
Maximal marginal relevance optimizes for similarity to query AND diversity
among selected documents.
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
fetch_k: Number of Documents to fetch to pass to MMR algorithm.
lambda_mult: Number between 0 and 1 that determines the degree
of diversity among the results with 0 corresponding
to maximum diversity and 1 to minimum diversity.
Defaults to 0.5.
List of Documents selected by maximal marginal relevance.
raise NotImplementedError
async def amax_marginal_relevance_search(
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance."""
# This is a temporary workaround to make the similarity search
# asynchronous. The proper solution is to make the similarity search
# asynchronous in the vector store implementations.
func = partial(
self.max_marginal_relevance_search, query, k, fetch_k, lambda_mult, **kwargs
return await asyncio.get_event_loop().run_in_executor(None, func)
def max_marginal_relevance_search_by_vector(
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance.
Maximal marginal relevance optimizes for similarity to query AND diversity
among selected documents.
embedding: Embedding to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
fetch_k: Number of Documents to fetch to pass to MMR algorithm.
lambda_mult: Number between 0 and 1 that determines the degree
of diversity among the results with 0 corresponding
to maximum diversity and 1 to minimum diversity.
Defaults to 0.5.
List of Documents selected by maximal marginal relevance.
raise NotImplementedError
async def amax_marginal_relevance_search_by_vector(
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance."""
raise NotImplementedError
def from_documents(
cls: Type[VST],
documents: List[Document],
embedding: Embeddings,
**kwargs: Any,
) -> VST:
"""Return VectorStore initialized from documents and embeddings."""
texts = [d.page_content for d in documents]
metadatas = [d.metadata for d in documents]
return cls.from_texts(texts, embedding, metadatas=metadatas, **kwargs)
async def afrom_documents(
cls: Type[VST],
documents: List[Document],
embedding: Embeddings,
**kwargs: Any,
) -> VST:
"""Return VectorStore initialized from documents and embeddings."""
texts = [d.page_content for d in documents]
metadatas = [d.metadata for d in documents]
return await cls.afrom_texts(texts, embedding, metadatas=metadatas, **kwargs)
def from_texts(
cls: Type[VST],
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> VST:
"""Return VectorStore initialized from texts and embeddings."""
async def afrom_texts(
cls: Type[VST],
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> VST:
"""Return VectorStore initialized from texts and embeddings."""
raise NotImplementedError
def as_retriever(self, **kwargs: Any) -> VectorStoreRetriever:
return VectorStoreRetriever(vectorstore=self, **kwargs)
class VectorStoreRetriever(BaseRetriever, BaseModel):
vectorstore: VectorStore
search_type: str = "similarity"
search_kwargs: dict = Field(default_factory=dict)
allowed_search_types: ClassVar[Collection[str]] = (
class Config:
"""Configuration for this pydantic object."""
arbitrary_types_allowed = True
def validate_search_type(cls, values: Dict) -> Dict:
"""Validate search type."""
search_type = values["search_type"]
if search_type not in cls.allowed_search_types:
raise ValueError(
f"search_type of {search_type} not allowed. Valid values are: "
if search_type == "similarity_score_threshold":
score_threshold = values["search_kwargs"].get("score_threshold")
if (score_threshold is None) or (not isinstance(score_threshold, float)):
raise ValueError(
"`score_threshold` is not specified with a float value(0~1) "
"in `search_kwargs`."
return values
def get_relevant_documents(self, query: str) -> List[Document]:
if self.search_type == "similarity":
docs = self.vectorstore.similarity_search(query, **self.search_kwargs)
elif self.search_type == "similarity_score_threshold":
docs_and_similarities = (
query, **self.search_kwargs
docs = [doc for doc, _ in docs_and_similarities]
elif self.search_type == "mmr":
docs = self.vectorstore.max_marginal_relevance_search(
query, **self.search_kwargs
raise ValueError(f"search_type of {self.search_type} not allowed.")
return docs
async def aget_relevant_documents(self, query: str) -> List[Document]:
if self.search_type == "similarity":
docs = await self.vectorstore.asimilarity_search(
query, **self.search_kwargs
elif self.search_type == "similarity_score_threshold":
docs_and_similarities = (
await self.vectorstore.asimilarity_search_with_relevance_scores(
query, **self.search_kwargs
docs = [doc for doc, _ in docs_and_similarities]
elif self.search_type == "mmr":
docs = await self.vectorstore.amax_marginal_relevance_search(
query, **self.search_kwargs
raise ValueError(f"search_type of {self.search_type} not allowed.")
return docs
def add_documents(self, documents: List[Document], **kwargs: Any) -> List[str]:
"""Add documents to vectorstore."""
return self.vectorstore.add_documents(documents, **kwargs)
async def aadd_documents(
self, documents: List[Document], **kwargs: Any
) -> List[str]:
"""Add documents to vectorstore."""
return await self.vectorstore.aadd_documents(documents, **kwargs)