From c7ff5f19a811c8eaa43a4dbe03ca9473ff30821f Mon Sep 17 00:00:00 2001 From: Jeff Vestal <53237856+jeffvestal@users.noreply.github.com> Date: Fri, 28 Jul 2023 00:00:18 -0500 Subject: [PATCH] ElasticKnnSearch rewrite - bug fix - return Document (#8180) Fixes: https://github.com/hwchase17/langchain/issues/7117 https://github.com/hwchase17/langchain/issues/5760 Adding back `create_knn_index`, `add_texts`, `from_texts` to ElasticKnnSearch. `from_texts` matches the standard `from_texts` methods as a quick-start method. `knn_search` and `knn_hybrid_search` return a list of (`Document`, `score`) tuples. # Test `from_texts` for quick start ``` # create new index using from_texts from langchain.vectorstores.elastic_vector_search import ElasticKnnSearch from langchain.embeddings import ElasticsearchEmbeddings model_id = "sentence-transformers__all-distilroberta-v1" dims = 768 es_cloud_id = "" es_user = "" es_password = "" test_index = "knn_test_index_305" embeddings = ElasticsearchEmbeddings.from_credentials( model_id, #input_field=input_field, es_cloud_id=es_cloud_id, es_user=es_user, es_password=es_password, ) # add texts and create class instance texts = ["This is a test document", "This is another test document"] knnvectorsearch = ElasticKnnSearch.from_texts( texts=texts, embedding=embeddings, index_name= test_index, vector_query_field='vector', query_field='text', model_id=model_id, dims=dims, es_cloud_id=es_cloud_id, es_user=es_user, es_password=es_password ) # Test `add_texts` method texts2 = ["Hello, world!", "Machine learning is fun.", "I love Python."] knnvectorsearch.add_texts(texts2) query = "Hello" knn_result = knnvectorsearch.knn_search(query = query, model_id= model_id, k=2) hybrid_result = knnvectorsearch.knn_hybrid_search(query = query, model_id= model_id, k=2) ``` The mapping is as follows: ``` { "knn_test_index_012": { "mappings": { "properties": { "text": { "type": "text" }, "vector": { "type": "dense_vector", "dims": 768, "index": true, "similarity": "dot_product" } } } } 
} ``` # Check response type ``` >>> hybrid_result [(Document(page_content='Hello, world!', metadata={}), 0.94232327), (Document(page_content='I love Python.', metadata={}), 0.5321523)] >>> hybrid_result[0] (Document(page_content='Hello, world!', metadata={}), 0.94232327) >>> hybrid_result[0][0] Document(page_content='Hello, world!', metadata={}) >>> type(hybrid_result[0][0]) <class 'langchain.schema.document.Document'> ``` # Test with existing Index ``` from langchain.vectorstores.elastic_vector_search import ElasticKnnSearch from langchain.embeddings import ElasticsearchEmbeddings ## Initialize ElasticsearchEmbeddings model_id = "sentence-transformers__all-distilroberta-v1" dims = 768 es_cloud_id = "" es_user = "" es_password = "" test_index = "knn_test_index_012" embeddings = ElasticsearchEmbeddings.from_credentials( model_id, es_cloud_id=es_cloud_id, es_user=es_user, es_password=es_password, ) ## Initialize ElasticKnnSearch knn_search = ElasticKnnSearch( es_cloud_id=es_cloud_id, es_user=es_user, es_password=es_password, index_name= test_index, embedding= embeddings ) ## Test adding vectors ### Test `add_texts` method when index created texts = ["Hello, world!", "Machine learning is fun.", "I love Python."] knn_search.add_texts(texts) ``` --------- Co-authored-by: Bagatur --- .../vectorstores/elastic_vector_search.py | 391 ++++++++++++------ 1 file changed, 269 insertions(+), 122 deletions(-) diff --git a/libs/langchain/langchain/vectorstores/elastic_vector_search.py b/libs/langchain/langchain/vectorstores/elastic_vector_search.py index 9e67cb1164..7be146c161 100644 --- a/libs/langchain/langchain/vectorstores/elastic_vector_search.py +++ b/libs/langchain/langchain/vectorstores/elastic_vector_search.py @@ -339,11 +339,38 @@ class ElasticVectorSearch(VectorStore, ABC): self.client.delete(index=self.index_name, id=id) -class ElasticKnnSearch(ElasticVectorSearch): +class ElasticKnnSearch(VectorStore, ABC): """ - A class for performing k-Nearest Neighbors (k-NN) search on an Elasticsearch index. 
- The class is designed for a text search scenario where documents are text strings - and their embeddings are vector representations of those strings. + ElasticKnnSearch is a class for performing k-nearest neighbor + (k-NN) searches on text data using Elasticsearch. + + This class is used to create an Elasticsearch index of text data that + can be searched using k-NN search. The text data is transformed into + vector embeddings using a provided embedding model, and these embeddings + are stored in the Elasticsearch index. + + Attributes: + index_name (str): The name of the Elasticsearch index. + embedding (Embeddings): The embedding model to use for transforming text data + into vector embeddings. + es_connection (Elasticsearch, optional): An existing Elasticsearch connection. + es_cloud_id (str, optional): The Cloud ID of your Elasticsearch Service + deployment. + es_user (str, optional): The username for your Elasticsearch Service deployment. + es_password (str, optional): The password for your Elasticsearch Service + deployment. + vector_query_field (str, optional): The name of the field in the Elasticsearch + index that contains the vector embeddings. + query_field (str, optional): The name of the field in the Elasticsearch index + that contains the original text data. + + Usage: + >>> from embeddings import Embeddings + >>> embedding = Embeddings.load('glove') + >>> es_search = ElasticKnnSearch('my_index', embedding) + >>> es_search.add_texts(['Hello world!', 'Another text']) + >>> results = es_search.knn_search('Hello') + [(Document(page_content='Hello world!', metadata={}), 0.9)] """ def __init__( @@ -357,22 +384,6 @@ class ElasticKnnSearch(ElasticVectorSearch): vector_query_field: Optional[str] = "vector", query_field: Optional[str] = "text", ): - """ - Initializes an instance of the ElasticKnnSearch class and sets up the - Elasticsearch client. - - Args: - index_name: The name of the Elasticsearch index. 
- embedding: An instance of the Embeddings class, used to generate vector - representations of text strings. - es_connection: An existing Elasticsearch connection. - es_cloud_id: The Cloud ID of the Elasticsearch instance. Required if - creating a new connection. - es_user: The username for the Elasticsearch instance. Required if - creating a new connection. - es_password: The password for the Elasticsearch instance. Required if - creating a new connection. - """ try: import elasticsearch except ImportError: @@ -402,48 +413,10 @@ class ElasticKnnSearch(ElasticVectorSearch): or valid credentials for creating a new connection.""" ) - @classmethod - def from_texts( - cls, - texts: List[str], - embedding: Embeddings, - metadatas: Optional[List[dict]] = None, - ids: Optional[List[str]] = None, - index_name: Optional[str] = None, - refresh_indices: bool = True, - es_connection: Optional["Elasticsearch"] = None, - es_cloud_id: Optional[str] = None, - es_user: Optional[str] = None, - es_password: Optional[str] = None, - **kwargs: Any, - ) -> ElasticKnnSearch: - """Construct ElasticKnnSearch wrapper from raw documents. - - This is a user-friendly interface that: - 1. Embeds documents. - 2. Creates a new index for the embeddings in the Elasticsearch instance. - 3. Adds the documents to the newly created Elasticsearch index. - - This is intended to be a quick way to get started. 
- """ - index_name = index_name or uuid.uuid4().hex - vectorsearch = cls( - index_name, - embedding, - es_connection=es_connection, - es_cloud_id=es_cloud_id, - es_user=es_user, - es_password=es_password, - **kwargs, - ) - vectorsearch.add_texts( - texts, metadatas=metadatas, refresh_indices=refresh_indices, ids=ids - ) - return vectorsearch - @staticmethod - def _default_knn_mapping(dims: int) -> Dict: - """Generates a default index mapping for kNN search.""" + def _default_knn_mapping( + dims: int, similarity: Optional[str] = "dot_product" + ) -> Dict: return { "properties": { "text": {"type": "text"}, @@ -451,7 +424,7 @@ class ElasticKnnSearch(ElasticVectorSearch): "type": "dense_vector", "dims": dims, "index": True, - "similarity": "dot_product", + "similarity": similarity, }, } } @@ -490,6 +463,21 @@ class ElasticKnnSearch(ElasticVectorSearch): return knn + def similarity_search( + self, query: str, k: int = 4, filter: Optional[dict] = None, **kwargs: Any + ) -> List[Document]: + """ + Pass through to `knn_search` + """ + results = self.knn_search(query=query, k=k, **kwargs) + return [doc for doc, score in results] + + def similarity_search_with_score( + self, query: str, k: int = 10, **kwargs: Any + ) -> List[Tuple[Document, float]]: + """Pass through to `knn_search including score`""" + return self.knn_search(query=query, k=k, **kwargs) + def knn_search( self, query: Optional[str] = None, @@ -501,51 +489,62 @@ class ElasticKnnSearch(ElasticVectorSearch): fields: Optional[ Union[List[Mapping[str, Any]], Tuple[Mapping[str, Any], ...], None] ] = None, - ) -> Dict: + page_content: Optional[str] = "text", + ) -> List[Tuple[Document, float]]: """ - Performs a k-nearest neighbor (k-NN) search on the Elasticsearch index. - - The search can be conducted using either a raw query vector or a model ID. - The method first generates - the body of the search query, which can be interpreted by Elasticsearch. 
- It then performs the k-NN - search on the Elasticsearch index and returns the results. + Perform a k-NN search on the Elasticsearch index. Args: - query: The query or queries to be used for the search. Required if - `query_vector` is not provided. - k: The number of nearest neighbors to return. Defaults to 10. - query_vector: The query vector to be used for the search. Required if - `query` is not provided. - model_id: The ID of the model to use for generating the query vector, if - `query` is provided. - size: The number of search hits to return. Defaults to 10. - source: Whether to include the source of each hit in the results. - fields: The fields to include in the source of each hit. If None, all - fields are included. - vector_query_field: Field name to use in knn search if not default 'vector' + query (str, optional): The query text to search for. + k (int, optional): The number of nearest neighbors to return. + query_vector (List[float], optional): The query vector to search for. + model_id (str, optional): The ID of the model to use for transforming the + query text into a vector. + size (int, optional): The number of search results to return. + source (bool, optional): Whether to return the source of the search results. + fields (List[Mapping[str, Any]], optional): The fields to return in the + search results. + page_content (str, optional): The name of the field that contains the page + content. Returns: - The search results. - - Raises: - ValueError: If neither `query_vector` nor `model_id` is provided, or if - both are provided. + A list of tuples, where each tuple contains a Document object and a score. 
""" + # if not source and (fields == None or page_content not in fields): + if not source and ( + fields is None or not any(page_content in field for field in fields) + ): + raise ValueError("If source=False `page_content` field must be in `fields`") + knn_query_body = self._default_knn_query( query_vector=query_vector, query=query, model_id=model_id, k=k ) # Perform the kNN search on the Elasticsearch index and return the results. - res = self.client.search( + response = self.client.search( index=self.index_name, knn=knn_query_body, size=size, source=source, fields=fields, ) - return dict(res) + + hits = [hit for hit in response["hits"]["hits"]] + docs_and_scores = [ + ( + Document( + page_content=hit["_source"][page_content] + if source + else hit["fields"][page_content][0], + metadata=hit["fields"] if fields else {}, + ), + hit["_score"], + ) + for hit in hits + ] + + return docs_and_scores def knn_hybrid_search( self, @@ -560,43 +559,38 @@ class ElasticKnnSearch(ElasticVectorSearch): fields: Optional[ Union[List[Mapping[str, Any]], Tuple[Mapping[str, Any], ...], None] ] = None, - ) -> Dict[Any, Any]: - """Performs a hybrid k-nearest neighbor (k-NN) and text-based search on the - Elasticsearch index. - - The search can be conducted using either a raw query vector or a model ID. - The method first generates - the body of the k-NN search query and the text-based query, which can be - interpreted by Elasticsearch. - It then performs the hybrid search on the Elasticsearch index and returns the - results. + page_content: Optional[str] = "text", + ) -> List[Tuple[Document, float]]: + """ + Perform a hybrid k-NN and text search on the Elasticsearch index. Args: - query: The query or queries to be used for the search. Required if - `query_vector` is not provided. - k: The number of nearest neighbors to return. Defaults to 10. - query_vector: The query vector to be used for the search. Required if - `query` is not provided. 
- model_id: The ID of the model to use for generating the query vector, if - `query` is provided. - size: The number of search hits to return. Defaults to 10. - source: Whether to include the source of each hit in the results. - knn_boost: The boost factor for the k-NN part of the search. - query_boost: The boost factor for the text-based part of the search. - fields - The fields to include in the source of each hit. If None, all fields are - included. Defaults to None. - vector_query_field: Field name to use in knn search if not default 'vector' - query_field: Field name to use in search if not default 'text' + query (str, optional): The query text to search for. + k (int, optional): The number of nearest neighbors to return. + query_vector (List[float], optional): The query vector to search for. + model_id (str, optional): The ID of the model to use for transforming the + query text into a vector. + size (int, optional): The number of search results to return. + source (bool, optional): Whether to return the source of the search results. + knn_boost (float, optional): The boost value to apply to the k-NN search + results. + query_boost (float, optional): The boost value to apply to the text search + results. + fields (List[Mapping[str, Any]], optional): The fields to return in the + search results. + page_content (str, optional): The name of the field that contains the page + content. Returns: - The search results. - - Raises: - ValueError: If neither `query_vector` nor `model_id` is provided, or if - both are provided. + A list of tuples, where each tuple contains a Document object and a score. 
""" + # if not source and (fields == None or page_content not in fields): + if not source and ( + fields is None or not any(page_content in field for field in fields) + ): + raise ValueError("If source=False `page_content` field must be in `fields`") + knn_query_body = self._default_knn_query( query_vector=query_vector, query=query, model_id=model_id, k=k ) @@ -610,7 +604,7 @@ class ElasticKnnSearch(ElasticVectorSearch): } # Perform the hybrid search on the Elasticsearch index and return the results. - res = self.client.search( + response = self.client.search( index=self.index_name, query=match_query_body, knn=knn_query_body, @@ -618,4 +612,157 @@ class ElasticKnnSearch(ElasticVectorSearch): size=size, source=source, ) - return dict(res) + + hits = [hit for hit in response["hits"]["hits"]] + docs_and_scores = [ + ( + Document( + page_content=hit["_source"][page_content] + if source + else hit["fields"][page_content][0], + metadata=hit["fields"] if fields else {}, + ), + hit["_score"], + ) + for hit in hits + ] + + return docs_and_scores + + def create_knn_index(self, mapping: Dict) -> None: + """ + Create a new k-NN index in Elasticsearch. + + Args: + mapping (Dict): The mapping to use for the new index. + + Returns: + None + """ + + self.client.indices.create(index=self.index_name, mappings=mapping) + + def add_texts( + self, + texts: Iterable[str], + metadatas: Optional[List[Dict[Any, Any]]] = None, + model_id: Optional[str] = None, + refresh_indices: bool = False, + **kwargs: Any, + ) -> List[str]: + """ + Add a list of texts to the Elasticsearch index. + + Args: + texts (Iterable[str]): The texts to add to the index. + metadatas (List[Dict[Any, Any]], optional): A list of metadata dictionaries + to associate with the texts. + model_id (str, optional): The ID of the model to use for transforming the + texts into vectors. + refresh_indices (bool, optional): Whether to refresh the Elasticsearch + indices after adding the texts. 
+ **kwargs: Arbitrary keyword arguments. + + Returns: + A list of IDs for the added texts. + """ + + # Check if the index exists. + if not self.client.indices.exists(index=self.index_name): + dims = kwargs.get("dims") + + if dims is None: + raise ValueError("ElasticKnnSearch requires 'dims' parameter") + + similarity = kwargs.get("similarity") + optional_args = {} + + if similarity is not None: + optional_args["similarity"] = similarity + + mapping = self._default_knn_mapping(dims=dims, **optional_args) + self.create_knn_index(mapping) + + embeddings = self.embedding.embed_documents(list(texts)) + + # body = [] + body: List[Mapping[str, Any]] = [] + for text, vector in zip(texts, embeddings): + body.extend( + [ + {"index": {"_index": self.index_name}}, + {"text": text, "vector": vector}, + ] + ) + + responses = self.client.bulk(operations=body) + + ids = [ + item["index"]["_id"] + for item in responses["items"] + if item["index"]["result"] == "created" + ] + + if refresh_indices: + self.client.indices.refresh(index=self.index_name) + + return ids + + @classmethod + def from_texts( + cls, + texts: List[str], + embedding: Embeddings, + metadatas: Optional[List[Dict[Any, Any]]] = None, + **kwargs: Any, + ) -> ElasticKnnSearch: + """ + Create a new ElasticKnnSearch instance and add a list of texts to the + Elasticsearch index. + + Args: + texts (List[str]): The texts to add to the index. + embedding (Embeddings): The embedding model to use for transforming the + texts into vectors. + metadatas (List[Dict[Any, Any]], optional): A list of metadata dictionaries + to associate with the texts. + **kwargs: Arbitrary keyword arguments. + + Returns: + A new ElasticKnnSearch instance. 
+ """ + + index_name = kwargs.get("index_name", str(uuid.uuid4())) + es_connection = kwargs.get("es_connection") + es_cloud_id = kwargs.get("es_cloud_id") + es_user = kwargs.get("es_user") + es_password = kwargs.get("es_password") + vector_query_field = kwargs.get("vector_query_field", "vector") + query_field = kwargs.get("query_field", "text") + model_id = kwargs.get("model_id") + dims = kwargs.get("dims") + + if dims is None: + raise ValueError("ElasticKnnSearch requires 'dims' parameter") + + optional_args = {} + + if vector_query_field is not None: + optional_args["vector_query_field"] = vector_query_field + + if query_field is not None: + optional_args["query_field"] = query_field + + knnvectorsearch = cls( + index_name=index_name, + embedding=embedding, + es_connection=es_connection, + es_cloud_id=es_cloud_id, + es_user=es_user, + es_password=es_password, + **optional_args, + ) + # Encode the provided texts and add them to the newly created index. + knnvectorsearch.add_texts(texts, model_id=model_id, dims=dims, **optional_args) + + return knnvectorsearch