from __future__ import annotations

import logging
import uuid
from typing import (
    Any,
    Iterable,
    List,
    Optional,
    Tuple,
)

import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.utils import get_from_env
from langchain_core.vectorstores import VectorStore

from langchain_community.vectorstores.utils import maximal_marginal_relevance

logger = logging.getLogger(__name__)


class DashVector(VectorStore):
    """`DashVector` vector store.

    To use, you should have the ``dashvector`` python package installed.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import DashVector
            from langchain_community.embeddings.openai import OpenAIEmbeddings
            import dashvector

            client = dashvector.Client(api_key="***")
            client.create("langchain", dimension=1024)
            collection = client.get("langchain")
            embeddings = OpenAIEmbeddings()
            vectorstore = DashVector(collection, embeddings, "text")
    """

    def __init__(
        self,
        collection: Any,
        embedding: Embeddings,
        text_field: str,
    ):
        """Initialize with DashVector collection.

        Args:
            collection: a ``dashvector.Collection`` instance.
            embedding: embeddings object; its ``embed_documents`` and
                ``embed_query`` methods are used by this store.
            text_field: name of the doc field that holds the raw text.

        Raises:
            ValueError: if ``dashvector`` is not installed, or if
                ``collection`` is not a ``dashvector.Collection``.
        """
        try:
            import dashvector
        except ImportError as exc:
            # ValueError (not ImportError) is kept for backward compatibility.
            raise ValueError(
                "Could not import dashvector python package. "
                "Please install it with `pip install dashvector`."
            ) from exc

        if not isinstance(collection, dashvector.Collection):
            raise ValueError(
                f"collection should be an instance of dashvector.Collection, "
                f"but got {type(collection)}"
            )

        self._collection = collection
        self._embedding = embedding
        self._text_field = text_field

    def _create_partition_if_not_exists(self, partition: str) -> None:
        """Create a Partition in current Collection."""
        # The response is deliberately ignored here.
        # NOTE(review): presumably an "already exists" response is harmless.
        self._collection.create_partition(partition)

    def _similarity_search_with_score_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[str] = None,
        partition: str = "default",
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query vector, along with scores.

        Args:
            embedding: query vector.
            k: Number of Documents to return. Defaults to 4.
            filter: Doc fields filter conditions that meet the SQL where clause
                specification.
            partition: a partition name in collection. [optional].

        Returns:
            List of Tuples of (doc, score).

        Raises:
            ValueError: if the query response is falsy.
        """
        # query by vector
        ret = self._collection.query(
            embedding, topk=k, filter=filter, partition=partition
        )
        if not ret:
            # Report the message of the failed response itself, consistent
            # with how ``add_texts`` reports upsert failures.
            raise ValueError(f"Fail to query docs by vector, error {ret.message}")

        docs = []
        for doc in ret:
            metadata = doc.fields
            # The text lives in the doc fields; everything else is metadata.
            text = metadata.pop(self._text_field)
            score = doc.score
            docs.append((Document(page_content=text, metadata=metadata), score))
        return docs

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        batch_size: int = 25,
        partition: str = "default",
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
                The given dicts are not modified.
            ids: Optional list of ids associated with the texts. Random hex
                uuids are generated when omitted or empty.
            batch_size: Optional batch size to upsert docs.
            partition: a partition name in collection. [optional].
            kwargs: vectorstore specific parameters

        Returns:
            List of ids from adding the texts into the vectorstore.

        Raises:
            ValueError: if an upsert response is falsy.
        """
        self._create_partition_if_not_exists(partition)

        # Materialize first: ``texts`` may be a one-shot iterable, and
        # generating ids from it directly would exhaust it before embedding.
        text_list = list(texts)
        ids = ids or [uuid.uuid4().hex for _ in text_list]

        for i in range(0, len(text_list), batch_size):
            # batch end
            end = min(i + batch_size, len(text_list))

            batch_texts = text_list[i:end]
            batch_ids = ids[i:end]
            batch_embeddings = self._embedding.embed_documents(list(batch_texts))

            # batch metadatas (shallow copies so the caller's dicts are not
            # mutated when the text field is injected below)
            if metadatas:
                batch_metadatas = [dict(m) for m in metadatas[i:end]]
            else:
                batch_metadatas = [{} for _ in range(i, end)]
            for metadata, text in zip(batch_metadatas, batch_texts):
                metadata[self._text_field] = text

            # batch upsert to collection
            docs = list(zip(batch_ids, batch_embeddings, batch_metadatas))
            ret = self._collection.upsert(docs, partition=partition)
            if not ret:
                raise ValueError(
                    f"Fail to upsert docs to dashvector vector database,"
                    f"Error: {ret.message}"
                )
        return ids

    def delete(
        self, ids: Optional[List[str]] = None, partition: str = "default", **kwargs: Any
    ) -> bool:
        """Delete by vector ID.

        Args:
            ids: List of ids to delete.
            partition: a partition name in collection. [optional].

        Returns:
            True if deletion is successful,
            False otherwise.
        """
        return bool(self._collection.delete(ids, partition=partition))

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        filter: Optional[str] = None,
        partition: str = "default",
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to query.

        Args:
            query: Text to search documents similar to.
            k: Number of documents to return. Default to 4.
            filter: Doc fields filter conditions that meet the SQL where clause
                specification.
            partition: a partition name in collection. [optional].

        Returns:
            List of Documents most similar to the query text.
        """
        docs_and_scores = self.similarity_search_with_relevance_scores(
            query, k, filter, partition
        )
        return [doc for doc, _ in docs_and_scores]

    def similarity_search_with_relevance_scores(
        self,
        query: str,
        k: int = 4,
        filter: Optional[str] = None,
        partition: str = "default",
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query text, along with relevance scores.

        Less is more similar, more is more dissimilar.

        Args:
            query: input text
            k: Number of Documents to return. Defaults to 4.
            filter: Doc fields filter conditions that meet the SQL where clause
                specification.
            partition: a partition name in collection. [optional].

        Returns:
            List of Tuples of (doc, similarity_score)
        """
        embedding = self._embedding.embed_query(query)
        return self._similarity_search_with_score_by_vector(
            embedding, k=k, filter=filter, partition=partition
        )

    def similarity_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[str] = None,
        partition: str = "default",
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to embedding vector.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Doc fields filter conditions that meet the SQL where clause
                specification.
            partition: a partition name in collection. [optional].

        Returns:
            List of Documents most similar to the query vector.
        """
        docs_and_scores = self._similarity_search_with_score_by_vector(
            embedding, k, filter, partition
        )
        return [doc for doc, _ in docs_and_scores]

    def max_marginal_relevance_search(
        self,
        query: str,
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[str] = None,
        partition: str = "default",
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch to pass to MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                        of diversity among the results with 0 corresponding
                        to maximum diversity and 1 to minimum diversity.
                        Defaults to 0.5.
            filter: Doc fields filter conditions that meet the SQL where clause
                    specification.
            partition: a partition name in collection. [optional].

        Returns:
            List of Documents selected by maximal marginal relevance.
        """
        embedding = self._embedding.embed_query(query)
        return self.max_marginal_relevance_search_by_vector(
            embedding, k, fetch_k, lambda_mult, filter, partition
        )

    def max_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[str] = None,
        partition: str = "default",
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch to pass to MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                        of diversity among the results with 0 corresponding
                        to maximum diversity and 1 to minimum diversity.
                        Defaults to 0.5.
            filter: Doc fields filter conditions that meet the SQL where clause
                    specification.
            partition: a partition name in collection. [optional].

        Returns:
            List of Documents selected by maximal marginal relevance.

        Raises:
            ValueError: if the query response is falsy.
        """
        # query by vector; the stored vectors are requested as well because
        # the MMR re-ranking below needs the candidate embeddings
        ret = self._collection.query(
            embedding,
            topk=fetch_k,
            filter=filter,
            partition=partition,
            include_vector=True,
        )
        if not ret:
            # Report the message of the failed response itself, consistent
            # with how ``add_texts`` reports upsert failures.
            raise ValueError(f"Fail to query docs by vector, error {ret.message}")

        candidate_embeddings = [doc.vector for doc in ret]
        mmr_selected = maximal_marginal_relevance(
            np.array(embedding), candidate_embeddings, lambda_mult, k
        )

        # mmr_selected holds indices into the fetched candidates
        metadatas = [ret.output[i].fields for i in mmr_selected]
        return [
            Document(page_content=metadata.pop(self._text_field), metadata=metadata)
            for metadata in metadatas
        ]

    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        dashvector_api_key: Optional[str] = None,
        dashvector_endpoint: Optional[str] = None,
        collection_name: str = "langchain",
        text_field: str = "text",
        batch_size: int = 25,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> DashVector:
        """Return DashVector VectorStore initialized from texts and embeddings.

        This is the quick way to get started with dashvector vector store.

        Args:
            texts: texts to embed and add to the store.
            embedding: embeddings object used for documents and queries.
            metadatas: Optional list of metadatas associated with the texts.
            dashvector_api_key: API key; read from the ``DASHVECTOR_API_KEY``
                environment variable when not given.
            dashvector_endpoint: service endpoint; read from the
                ``DASHVECTOR_ENDPOINT`` environment variable when not given,
                falling back to ``dashvector.cn-hangzhou.aliyuncs.com``.
            collection_name: name of the collection to use.
            text_field: name of the doc field that holds the raw text.
            batch_size: batch size used when upserting docs.
            ids: Optional list of ids associated with the texts.
            kwargs: currently unused.

        Returns:
            A ``DashVector`` store backed by ``collection_name``.

        Raises:
            ValueError: if ``dashvector`` is not installed or the collection
                cannot be created.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import DashVector
                from langchain_community.embeddings import OpenAIEmbeddings

                embeddings = OpenAIEmbeddings()
                vectorstore = DashVector.from_texts(
                    texts,
                    embeddings,
                    dashvector_api_key="{DASHVECTOR_API_KEY}"
                )
        """
        try:
            import dashvector
        except ImportError as exc:
            # ValueError (not ImportError) is kept for backward compatibility.
            raise ValueError(
                "Could not import dashvector python package. "
                "Please install it with `pip install dashvector`."
            ) from exc

        dashvector_api_key = dashvector_api_key or get_from_env(
            "dashvector_api_key", "DASHVECTOR_API_KEY"
        )

        dashvector_endpoint = dashvector_endpoint or get_from_env(
            "dashvector_endpoint",
            "DASHVECTOR_ENDPOINT",
            default="dashvector.cn-hangzhou.aliyuncs.com",
        )

        dashvector_client = dashvector.Client(
            api_key=dashvector_api_key, endpoint=dashvector_endpoint
        )
        # NOTE(review): this delete runs unconditionally, so any existing
        # collection named `collection_name` is presumably dropped and
        # `from_texts` always starts from a fresh collection. Confirm this is
        # intended before relying on it against a collection holding data.
        dashvector_client.delete(collection_name)
        collection = dashvector_client.get(collection_name)
        if not collection:
            # The vector dimension is inferred by embedding the first text.
            dim = len(embedding.embed_query(texts[0]))
            # create collection if not existed
            resp = dashvector_client.create(collection_name, dimension=dim)
            if resp:
                collection = dashvector_client.get(collection_name)
            else:
                raise ValueError(
                    "Fail to create collection. " f"Error: {resp.message}."
                )

        dashvector_vector_db = cls(collection, embedding, text_field)
        dashvector_vector_db.add_texts(texts, metadatas, ids, batch_size)
        return dashvector_vector_db