From 584a1e30acbcccf631d1ff2337902177353d2b67 Mon Sep 17 00:00:00 2001 From: Bagatur <22008038+baskaryan@users.noreply.github.com> Date: Wed, 5 Jun 2024 14:39:54 -0700 Subject: [PATCH] community[patch]: AzureSearch async functions (#22075) --- .../vectorstores/azuresearch.py | 767 +++++++++++++++++- 1 file changed, 735 insertions(+), 32 deletions(-) diff --git a/libs/community/langchain_community/vectorstores/azuresearch.py b/libs/community/langchain_community/vectorstores/azuresearch.py index 29177043cf..109d52f655 100644 --- a/libs/community/langchain_community/vectorstores/azuresearch.py +++ b/libs/community/langchain_community/vectorstores/azuresearch.py @@ -19,10 +19,14 @@ from typing import ( Tuple, Type, Union, + cast, ) import numpy as np -from langchain_core.callbacks import CallbackManagerForRetrieverRun +from langchain_core.callbacks import ( + AsyncCallbackManagerForRetrieverRun, + CallbackManagerForRetrieverRun, +) from langchain_core.documents import Document from langchain_core.embeddings import Embeddings from langchain_core.pydantic_v1 import root_validator @@ -36,6 +40,7 @@ logger = logging.getLogger() if TYPE_CHECKING: from azure.search.documents import SearchClient, SearchItemPaged + from azure.search.documents.aio import SearchClient as AsyncSearchClient from azure.search.documents.indexes.models import ( CorsOptions, ScoringProfile, @@ -80,11 +85,13 @@ def _get_search_client( default_fields: Optional[List[SearchField]] = None, user_agent: Optional[str] = "langchain", cors_options: Optional[CorsOptions] = None, -) -> SearchClient: + async_: bool = False, +) -> Union[SearchClient, AsyncSearchClient]: from azure.core.credentials import AzureKeyCredential from azure.core.exceptions import ResourceNotFoundError from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential from azure.search.documents import SearchClient + from azure.search.documents.aio import SearchClient as AsyncSearchClient from azure.search.documents.indexes import SearchIndexClient from azure.search.documents.indexes.models import ( ExhaustiveKnnAlgorithmConfiguration, @@ -212,12 +219,20 @@ def _get_search_client( ) index_client.create_index(index) # Create the search client - return SearchClient( - endpoint=endpoint, - index_name=index_name, - credential=credential, - user_agent=user_agent, - ) + if not async_: + return SearchClient( + endpoint=endpoint, + index_name=index_name, + credential=credential, + user_agent=user_agent, + ) + else: + return AsyncSearchClient( + endpoint=endpoint, + index_name=index_name, + credential=credential, + user_agent=user_agent, + ) class AzureSearch(VectorStore): @@ -243,12 +258,18 @@ class AzureSearch(VectorStore): vector_search_dimensions: Optional[int] = None, **kwargs: Any, ): - from azure.search.documents.indexes.models import ( - SearchableField, - SearchField, - SearchFieldDataType, - SimpleField, - ) + try: + from azure.search.documents.indexes.models import ( + SearchableField, + SearchField, + SearchFieldDataType, + SimpleField, + ) + except ImportError as e: + raise ImportError( + "Unable to import azure.search.documents. Please install with " + "`pip install -U azure-search-documents`." 
+ ) from e """Initialize with necessary components.""" # Initialize base class @@ -304,24 +325,64 @@ class AzureSearch(VectorStore): self.semantic_configuration_name = semantic_configuration_name self.fields = fields if fields else default_fields + self._azure_search_endpoint = azure_search_endpoint + self._azure_search_key = azure_search_key + self._index_name = index_name + self._semantic_configuration_name = semantic_configuration_name + self._fields = fields + self._vector_search = vector_search + self._semantic_configurations = semantic_configurations + self._scoring_profiles = scoring_profiles + self._default_scoring_profile = default_scoring_profile + self._default_fields = default_fields + self._user_agent = user_agent + self._cors_options = cors_options + + def _async_client(self) -> AsyncSearchClient: + return _get_search_client( + self._azure_search_endpoint, + self._azure_search_key, + self._index_name, + semantic_configuration_name=self._semantic_configuration_name, + fields=self._fields, + vector_search=self._vector_search, + semantic_configurations=self._semantic_configurations, + scoring_profiles=self._scoring_profiles, + default_scoring_profile=self._default_scoring_profile, + default_fields=self._default_fields, + user_agent=self._user_agent, + cors_options=self._cors_options, + async_=True, + ) + @property def embeddings(self) -> Optional[Embeddings]: # TODO: Support embedding object directly - return None + return ( + self.embedding_function + if isinstance(self.embedding_function, Embeddings) + else None + ) + + async def _aembed_query(self, text: str) -> List[float]: + if self.embeddings: + return await self.embeddings.aembed_query(text) + else: + return cast(Callable, self.embedding_function)(text) def add_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, + *, + keys: Optional[List[str]] = None, **kwargs: Any, ) -> List[str]: """Add texts data to an existing index.""" - keys = kwargs.get("keys") - # batching support if embedding function is an Embeddings object if isinstance(self.embedding_function, Embeddings): try: - embeddings = self.embedding_function.embed_documents(texts) # type: ignore[arg-type] + embeddings = self.embedding_function.embed_documents(list(texts)) except NotImplementedError: embeddings = [self.embedding_function.embed_query(x) for x in texts] else: @@ -333,6 +394,30 @@ class AzureSearch(VectorStore): return self.add_embeddings(zip(texts, embeddings), metadatas, keys=keys) + async def aadd_texts( + self, + texts: Iterable[str], + metadatas: Optional[List[dict]] = None, + *, + keys: Optional[List[str]] = None, + **kwargs: Any, + ) -> List[str]: + if isinstance(self.embedding_function, Embeddings): + try: + embeddings = await self.embedding_function.aembed_documents(list(texts)) + except NotImplementedError: + embeddings = [ + await self.embedding_function.aembed_query(x) for x in texts + ] + else: + embeddings = [self.embedding_function(x) for x in texts] + + if len(embeddings) == 0: + logger.debug("Nothing to insert, skipping.") + return [] + + return await self.aadd_embeddings(zip(texts, embeddings), metadatas, keys=keys) + def add_embeddings( self, text_embeddings: Iterable[Tuple[str, List[float]]], @@ -390,6 +475,65 @@ class AzureSearch(VectorStore): else: raise Exception(response) + async def aadd_embeddings( + self, + text_embeddings: Iterable[Tuple[str, List[float]]], + metadatas: Optional[List[dict]] = None, + *, + keys: Optional[List[str]] = None, + ) -> List[str]: + """Add embeddings to an existing index.""" + 
ids = [] + + # Write data to index + data = [] + for i, (text, embedding) in enumerate(text_embeddings): + # Use provided key otherwise use default key + key = keys[i] if keys else str(uuid.uuid4()) + # Encoding key for Azure Search valid characters + key = base64.urlsafe_b64encode(bytes(key, "utf-8")).decode("ascii") + metadata = metadatas[i] if metadatas else {} + # Add data to index + # Additional metadata to fields mapping + doc = { + "@search.action": "upload", + FIELDS_ID: key, + FIELDS_CONTENT: text, + FIELDS_CONTENT_VECTOR: np.array(embedding, dtype=np.float32).tolist(), + FIELDS_METADATA: json.dumps(metadata), + } + if metadata: + additional_fields = { + k: v + for k, v in metadata.items() + if k in [x.name for x in self.fields] + } + doc.update(additional_fields) + data.append(doc) + ids.append(key) + # Upload data in batches + if len(data) == MAX_UPLOAD_BATCH_SIZE: + async with self._async_client() as async_client: + response = await async_client.upload_documents(documents=data) + # Check if all documents were successfully uploaded + if not all(r.succeeded for r in response): + raise Exception(response) + # Reset data + data = [] + + # Considering case where data is an exact multiple of batch-size entries + if len(data) == 0: + return ids + + # Upload data to index + async with self._async_client() as async_client: + response = await async_client.upload_documents(documents=data) + # Check if all documents were successfully uploaded + if all(r.succeeded for r in response): + return ids + else: + raise Exception(response) + def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> bool: """Delete by vector ID. @@ -406,10 +550,32 @@ class AzureSearch(VectorStore): else: return False + async def adelete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> bool: + """Delete by vector ID. + + Args: + ids: List of ids to delete. + + Returns: + bool: True if deletion is successful, + False otherwise. 
+ """ + if ids: + async with self._async_client() as async_client: + res = await async_client.delete_documents([{"id": i} for i in ids]) + return len(res) > 0 + else: + return False + def similarity_search( - self, query: str, k: int = 4, **kwargs: Any + self, + query: str, + k: int = 4, + *, + search_type: Optional[str] = None, + **kwargs: Any, ) -> List[Document]: - search_type = kwargs.get("search_type", self.search_type) + search_type = search_type or self.search_type if search_type == "similarity": docs = self.vector_search(query, k=k, **kwargs) elif search_type == "hybrid": @@ -420,10 +586,61 @@ class AzureSearch(VectorStore): raise ValueError(f"search_type of {search_type} not allowed.") return docs + def similarity_search_with_score( + self, query: str, *, k: int = 4, **kwargs: Any + ) -> List[Tuple[Document, float]]: + """Run similarity search with distance.""" + search_type = kwargs.get("search_type", self.search_type) + if search_type == "similarity": + return self.vector_search_with_score(query, k=k, **kwargs) + elif search_type == "hybrid": + return self.hybrid_search_with_score(query, k=k, **kwargs) + elif search_type == "semantic_hybrid": + return self.semantic_hybrid_search_with_score(query, k=k, **kwargs) + else: + raise ValueError(f"search_type of {search_type} not allowed.") + + async def asimilarity_search( + self, + query: str, + k: int = 4, + *, + search_type: Optional[str] = None, + **kwargs: Any, + ) -> List[Document]: + search_type = search_type or self.search_type + if search_type == "similarity": + docs = await self.avector_search(query, k=k, **kwargs) + elif search_type == "hybrid": + docs = await self.ahybrid_search(query, k=k, **kwargs) + elif search_type == "semantic_hybrid": + docs = await self.asemantic_hybrid_search(query, k=k, **kwargs) + else: + raise ValueError(f"search_type of {search_type} not allowed.") + return docs + + async def asimilarity_search_with_score( + self, query: str, *, k: int = 4, **kwargs: Any + ) -> List[Tuple[Document, float]]: + """Run similarity search with distance.""" + search_type = kwargs.get("search_type", self.search_type) + if search_type == "similarity": + return await self.avector_search_with_score(query, k=k, **kwargs) + elif search_type == "hybrid": + return await self.ahybrid_search_with_score(query, k=k, **kwargs) + elif search_type == "semantic_hybrid": + return await self.asemantic_hybrid_search_with_score(query, k=k, **kwargs) + else: + raise ValueError(f"search_type of {search_type} not allowed.") + def similarity_search_with_relevance_scores( - self, query: str, k: int = 4, **kwargs: Any + self, + query: str, + k: int = 4, + *, + score_threshold: Optional[float] = None, + **kwargs: Any, ) -> List[Tuple[Document, float]]: - score_threshold = kwargs.pop("score_threshold", None) result = self.vector_search_with_score(query, k=k, **kwargs) return ( result @@ -431,7 +648,40 @@ class AzureSearch(VectorStore): else [r for r in result if r[1] >= score_threshold] ) - def vector_search(self, query: str, k: int = 4, **kwargs: Any) -> List[Document]: + async def asimilarity_search_with_relevance_scores( + self, + query: str, + k: int = 4, + *, + score_threshold: Optional[float] = None, + **kwargs: Any, + ) -> List[Tuple[Document, float]]: + result = await self.avector_search_with_score(query, k=k, **kwargs) + return ( + result + if score_threshold is None + else [r for r in result if r[1] >= score_threshold] + ) + + def vector_search( + self, query: str, k: int = 4, *, filters: Optional[str] = None, **kwargs: Any + ) -> 
List[Document]: + """ + Returns the most similar indexed documents to the query text. + + Args: + query (str): The query text for which to find similar documents. + k (int): The number of documents to return. Default is 4. + + Returns: + List[Document]: A list of documents that are most similar to the query text. + """ + docs_and_scores = self.vector_search_with_score(query, k=k, filters=filters) + return [doc for doc, _ in docs_and_scores] + + async def avector_search( + self, query: str, k: int = 4, *, filters: Optional[str] = None, **kwargs: Any + ) -> List[Document]: """ Returns the most similar indexed documents to the query text. @@ -442,8 +692,8 @@ class AzureSearch(VectorStore): Returns: List[Document]: A list of documents that are most similar to the query text. """ - docs_and_scores = self.vector_search_with_score( - query, k=k, filters=kwargs.get("filters", None) + docs_and_scores = await self.avector_search_with_score( + query, k=k, filters=filters ) return [doc for doc, _ in docs_and_scores] @@ -470,6 +720,31 @@ class AzureSearch(VectorStore): return _results_to_documents(results) + async def avector_search_with_score( + self, + query: str, + k: int = 4, + filters: Optional[str] = None, + **kwargs: Any, + ) -> List[Tuple[Document, float]]: + """Return docs most similar to query. + + Args: + query (str): Text to look up documents similar to. + k (int, optional): Number of Documents to return. Defaults to 4. + filters (str, optional): Filtering expression. Defaults to None. + + Returns: + List[Tuple[Document, float]]: List of Documents most similar + to the query and score for each + """ + embedding = await self._aembed_query(query) + docs, scores, _ = await self._asimple_search( + embedding, "", k, filters=filters, **kwargs + ) + + return list(zip(docs, scores)) + def max_marginal_relevance_search_with_score( self, query: str, @@ -504,6 +779,47 @@ class AzureSearch(VectorStore): results, query_embedding=np.array(embedding), lambda_mult=lambda_mult, k=k ) + async def amax_marginal_relevance_search_with_score( + self, + query: str, + k: int = 4, + fetch_k: int = 20, + lambda_mult: float = 0.5, + *, + filters: Optional[str] = None, + **kwargs: Any, + ) -> List[Tuple[Document, float]]: + """Perform a search and return results that are reordered by MMR. + + Args: + query (str): Text to look up documents similar to. + k (int, optional): How many results to give. Defaults to 4. + fetch_k (int, optional): Total results to select k from. + Defaults to 20. + lambda_mult: Number between 0 and 1 that determines the degree + of diversity among the results with 0 corresponding + to maximum diversity and 1 to minimum diversity. + Defaults to 0.5 + filters (str, optional): Filtering expression. Defaults to None. + + Returns: + List[Tuple[Document, float]]: List of Documents most similar + to the query and score for each + """ + embedding = await self._aembed_query(query) + docs, scores, vectors = await self._asimple_search( + embedding, "", fetch_k, filters=filters, **kwargs + ) + + return await self._areorder_results_with_maximal_marginal_relevance( + docs, + scores, + vectors, + query_embedding=np.array(embedding), + lambda_mult=lambda_mult, + k=k, + ) + def hybrid_search(self, query: str, k: int = 4, **kwargs: Any) -> List[Document]: """ Returns the most similar indexed documents to the query text. 
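
A minimal usage sketch of the async write/read surface added above (aadd_texts, avector_search, avector_search_with_score). It assumes an Embeddings implementation with async support; the endpoint, key, index name, and sample texts are illustrative placeholders, not part of this patch.

import asyncio

from langchain_core.embeddings import Embeddings
from langchain_community.vectorstores.azuresearch import AzureSearch


async def demo_async_vector_search(embeddings: Embeddings) -> None:
    # Placeholders: point these at a real Azure AI Search service and admin key.
    store = AzureSearch(
        azure_search_endpoint="https://<service>.search.windows.net",
        azure_search_key="<admin-key>",
        index_name="langchain-index",
        embedding_function=embeddings,
    )
    # aadd_texts embeds with the async Embeddings methods when available and
    # uploads through the async SearchClient in aadd_embeddings.
    await store.aadd_texts(
        ["Paris is the capital of France.", "Berlin is the capital of Germany."],
        metadatas=[{"source": "doc-a"}, {"source": "doc-b"}],
    )
    # Pure vector search; an optional `filters` string is passed through to the service.
    docs = await store.avector_search("Which city is the French capital?", k=2)
    # Same query, keeping the relevance score for each hit.
    docs_and_scores = await store.avector_search_with_score(
        "Which city is the French capital?", k=2
    )


# asyncio.run(demo_async_vector_search(my_embeddings))

The same entry points back the new asimilarity_search* wrappers, which dispatch on search_type exactly like their sync counterparts.
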
@@ -518,6 +834,22 @@ class AzureSearch(VectorStore): docs_and_scores = self.hybrid_search_with_score(query, k=k, **kwargs) return [doc for doc, _ in docs_and_scores] + async def ahybrid_search( + self, query: str, k: int = 4, **kwargs: Any + ) -> List[Document]: + """ + Returns the most similar indexed documents to the query text. + + Args: + query (str): The query text for which to find similar documents. + k (int): The number of documents to return. Default is 4. + + Returns: + List[Document]: A list of documents that are most similar to the query text. + """ + docs_and_scores = await self.ahybrid_search_with_score(query, k=k, **kwargs) + return [doc for doc, _ in docs_and_scores] + def hybrid_search_with_score( self, query: str, @@ -540,10 +872,38 @@ class AzureSearch(VectorStore): return _results_to_documents(results) + async def ahybrid_search_with_score( + self, + query: str, + k: int = 4, + filters: Optional[str] = None, + **kwargs: Any, + ) -> List[Tuple[Document, float]]: + """Return docs most similar to query with a hybrid query. + + Args: + query: Text to look up documents similar to. + k: Number of Documents to return. Defaults to 4. + + Returns: + List of Documents most similar to the query and score for each + """ + + embedding = await self._aembed_query(query) + docs, scores, _ = await self._asimple_search( + embedding, query, k, filters=filters, **kwargs + ) + + return list(zip(docs, scores)) + def hybrid_search_with_relevance_scores( - self, query: str, k: int = 4, **kwargs: Any + self, + query: str, + k: int = 4, + *, + score_threshold: Optional[float] = None, + **kwargs: Any, ) -> List[Tuple[Document, float]]: - score_threshold = kwargs.pop("score_threshold", None) result = self.hybrid_search_with_score(query, k=k, **kwargs) return ( result @@ -551,6 +911,21 @@ class AzureSearch(VectorStore): else [r for r in result if r[1] >= score_threshold] ) + async def ahybrid_search_with_relevance_scores( + self, + query: str, + k: int = 4, + *, + score_threshold: Optional[float] = None, + **kwargs: Any, + ) -> List[Tuple[Document, float]]: + result = await self.ahybrid_search_with_score(query, k=k, **kwargs) + return ( + result + if score_threshold is None + else [r for r in result if r[1] >= score_threshold] + ) + def hybrid_max_marginal_relevance_search_with_score( self, query: str, @@ -588,6 +963,48 @@ class AzureSearch(VectorStore): results, query_embedding=np.array(embedding), lambda_mult=lambda_mult, k=k ) + async def ahybrid_max_marginal_relevance_search_with_score( + self, + query: str, + k: int = 4, + fetch_k: int = 20, + lambda_mult: float = 0.5, + *, + filters: Optional[str] = None, + **kwargs: Any, + ) -> List[Tuple[Document, float]]: + """Return docs most similar to query with a hybrid query + and reorder results by MMR. + + Args: + query (str): Text to look up documents similar to. + k (int, optional): Number of Documents to return. Defaults to 4. + fetch_k (int, optional): Total results to select k from. + Defaults to 20. + lambda_mult: Number between 0 and 1 that determines the degree + of diversity among the results with 0 corresponding + to maximum diversity and 1 to minimum diversity. + Defaults to 0.5 + filters (str, optional): Filtering expression. Defaults to None. 
+ + Returns: + List of Documents most similar to the query and score for each + """ + + embedding = await self._aembed_query(query) + docs, scores, vectors = await self._asimple_search( + embedding, query, fetch_k, filters=filters, **kwargs + ) + + return await self._areorder_results_with_maximal_marginal_relevance( + docs, + scores, + vectors, + query_embedding=np.array(embedding), + lambda_mult=lambda_mult, + k=k, + ) + def _simple_search( self, embedding: List[float], @@ -624,6 +1041,55 @@ class AzureSearch(VectorStore): **kwargs, ) + async def _asimple_search( + self, + embedding: List[float], + text_query: str, + k: int, + *, + filters: Optional[str] = None, + **kwargs: Any, + ) -> Tuple[List[Document], List[float], List[List[float]]]: + """Perform vector or hybrid search in the Azure search index. + + Args: + embedding: A vector embedding to search in the vector space. + text_query: A full-text search query expression; + Use "*" or omit this parameter to perform only vector search. + k: Number of documents to return. + filters: Filtering expression. + Returns: + Search items + """ + from azure.search.documents.models import VectorizedQuery + + async with self._async_client() as async_client: + results = await async_client.search( + search_text=text_query, + vector_queries=[ + VectorizedQuery( + vector=np.array(embedding, dtype=np.float32).tolist(), + k_nearest_neighbors=k, + fields=FIELDS_CONTENT_VECTOR, + ) + ], + filter=filters, + top=k, + **kwargs, + ) + docs = [ + ( + _result_to_document(result), + float(result["@search.score"]), + result[FIELDS_CONTENT_VECTOR], + ) + async for result in results + ] + if not docs: + raise ValueError(f"No {docs=}") + documents, scores, vectors = map(list, zip(*docs)) + return documents, scores, vectors + def semantic_hybrid_search( self, query: str, k: int = 4, **kwargs: Any ) -> List[Document]: @@ -643,11 +1109,32 @@ class AzureSearch(VectorStore): ) return [doc for doc, _, _ in docs_and_scores] + async def asemantic_hybrid_search( + self, query: str, k: int = 4, **kwargs: Any + ) -> List[Document]: + """ + Returns the most similar indexed documents to the query text. + + Args: + query (str): The query text for which to find similar documents. + k (int): The number of documents to return. Default is 4. + filters: Filtering expression. + + Returns: + List[Document]: A list of documents that are most similar to the query text. + """ + docs_and_scores = await self.asemantic_hybrid_search_with_score_and_rerank( + query, k=k, **kwargs + ) + return [doc for doc, _, _ in docs_and_scores] + def semantic_hybrid_search_with_score( self, query: str, k: int = 4, score_type: Literal["score", "reranker_score"] = "score", + *, + score_threshold: Optional[float] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: """ @@ -664,7 +1151,6 @@ class AzureSearch(VectorStore): List[Tuple[Document, float]]: A list of documents and their corresponding scores. """ - score_threshold = kwargs.pop("score_threshold", None) docs_and_scores = self.semantic_hybrid_search_with_score_and_rerank( query, k=k, **kwargs ) @@ -681,6 +1167,45 @@ class AzureSearch(VectorStore): if score_threshold is None or reranker_score >= score_threshold ] + async def asemantic_hybrid_search_with_score( + self, + query: str, + k: int = 4, + score_type: Literal["score", "reranker_score"] = "score", + *, + score_threshold: Optional[float] = None, + **kwargs: Any, + ) -> List[Tuple[Document, float]]: + """ + Returns the most similar indexed documents to the query text. 
+ + Args: + query (str): The query text for which to find similar documents. + k (int): The number of documents to return. Default is 4. + score_type: Must either be "score" or "reranker_score". + Defaulted to "score". + filters: Filtering expression. + + Returns: + List[Tuple[Document, float]]: A list of documents and their + corresponding scores. + """ + docs_and_scores = await self.asemantic_hybrid_search_with_score_and_rerank( + query, k=k, **kwargs + ) + if score_type == "score": + return [ + (doc, score) + for doc, score, _ in docs_and_scores + if score_threshold is None or score >= score_threshold + ] + elif score_type == "reranker_score": + return [ + (doc, reranker_score) + for doc, _, reranker_score in docs_and_scores + if score_threshold is None or reranker_score >= score_threshold + ] + def semantic_hybrid_search_with_score_and_rerank( self, query: str, k: int = 4, *, filters: Optional[str] = None, **kwargs: Any ) -> List[Tuple[Document, float, float]]: @@ -759,6 +1284,88 @@ class AzureSearch(VectorStore): ] return docs + async def asemantic_hybrid_search_with_score_and_rerank( + self, query: str, k: int = 4, *, filters: Optional[str] = None, **kwargs: Any + ) -> List[Tuple[Document, float, float]]: + """Return docs most similar to query with a hybrid query. + + Args: + query: Text to look up documents similar to. + k: Number of Documents to return. Defaults to 4. + filters: Filtering expression. + + Returns: + List of Documents most similar to the query and score for each + """ + from azure.search.documents.models import VectorizedQuery + + vector = await self._aembed_query(query) + async with self._async_client() as async_client: + results = await async_client.search( + search_text=query, + vector_queries=[ + VectorizedQuery( + vector=np.array(vector, dtype=np.float32).tolist(), + k_nearest_neighbors=k, + fields=FIELDS_CONTENT_VECTOR, + ) + ], + filter=filters, + query_type="semantic", + semantic_configuration_name=self.semantic_configuration_name, + query_caption="extractive", + query_answer="extractive", + top=k, + **kwargs, + ) + # Get Semantic Answers + semantic_answers = (await results.get_answers()) or [] + semantic_answers_dict: Dict = {} + for semantic_answer in semantic_answers: + semantic_answers_dict[semantic_answer.key] = { + "text": semantic_answer.text, + "highlights": semantic_answer.highlights, + } + # Convert results to Document objects + docs = [ + ( + Document( + page_content=result.pop(FIELDS_CONTENT), + metadata={ + **( + json.loads(result[FIELDS_METADATA]) + if FIELDS_METADATA in result + else { + k: v + for k, v in result.items() + if k != FIELDS_CONTENT_VECTOR + } + ), + **{ + "captions": { + "text": result.get("@search.captions", [{}])[ + 0 + ].text, + "highlights": result.get("@search.captions", [{}])[ + 0 + ].highlights, + } + if result.get("@search.captions") + else {}, + "answers": semantic_answers_dict.get( + result.get(FIELDS_ID, ""), + "", + ), + }, + }, + ), + float(result["@search.score"]), + float(result["@search.reranker_score"]), + ) + async for result in results + ] + return docs + @classmethod def from_texts( cls: Type[AzureSearch], @@ -783,6 +1390,30 @@ class AzureSearch(VectorStore): azure_search.add_texts(texts, metadatas, **kwargs) return azure_search + @classmethod + async def afrom_texts( + cls: Type[AzureSearch], + texts: List[str], + embedding: Embeddings, + metadatas: Optional[List[dict]] = None, + azure_search_endpoint: str = "", + azure_search_key: str = "", + index_name: str = "langchain-index", + fields: 
Optional[List[SearchField]] = None, + **kwargs: Any, + ) -> AzureSearch: + # Creating a new Azure Search instance + azure_search = cls( + azure_search_endpoint, + azure_search_key, + index_name, + embedding, + fields=fields, + **kwargs, + ) + await azure_search.aadd_texts(texts, metadatas, **kwargs) + return azure_search + @classmethod async def afrom_embeddings( cls: Type[AzureSearch], @@ -796,16 +1427,22 @@ class AzureSearch(VectorStore): fields: Optional[List[SearchField]] = None, **kwargs: Any, ) -> AzureSearch: - return cls.from_embeddings( - text_embeddings, - embedding, - metadatas=metadatas, + text_embeddings, first_text_embedding = _peek(text_embeddings) + if first_text_embedding is None: + raise ValueError("Cannot create AzureSearch from empty embeddings.") + vector_search_dimensions = len(first_text_embedding[1]) + + azure_search = cls( azure_search_endpoint=azure_search_endpoint, azure_search_key=azure_search_key, index_name=index_name, + embedding_function=embedding, fields=fields, + vector_search_dimensions=vector_search_dimensions, **kwargs, ) + await azure_search.aadd_embeddings(text_embeddings, metadatas, **kwargs) + return azure_search @classmethod def from_embeddings( @@ -838,6 +1475,30 @@ class AzureSearch(VectorStore): azure_search.add_embeddings(text_embeddings, metadatas, **kwargs) return azure_search + async def _areorder_results_with_maximal_marginal_relevance( + self, + documents: List[Document], + scores: List[float], + vectors: List[List[float]], + query_embedding: np.ndarray, + lambda_mult: float = 0.5, + k: int = 4, + ) -> List[Tuple[Document, float]]: + # Get the new order of results. + new_ordering = maximal_marginal_relevance( + query_embedding, vectors, k=k, lambda_mult=lambda_mult + ) + + # Reorder the values and return. + ret: List[Tuple[Document, float]] = [] + for x in new_ordering: + # Function can return -1 index + if x == -1: + break + ret.append((documents[x], scores[x])) # type: ignore + + return ret + def as_retriever(self, **kwargs: Any) -> AzureSearchVectorStoreRetriever: # type: ignore """Return AzureSearchVectorStoreRetriever initialized from this VectorStore. 
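
A companion sketch for the async constructor and the hybrid/semantic search paths, plus the retriever route added in the following hunk. Every concrete value is again a placeholder, asemantic_hybrid_search assumes the index carries a semantic configuration, and the as_retriever keyword arguments are assumed to be forwarded to AzureSearchVectorStoreRetriever.

import asyncio

from langchain_core.embeddings import Embeddings
from langchain_community.vectorstores.azuresearch import AzureSearch


async def demo_async_hybrid(embeddings: Embeddings) -> None:
    # afrom_texts builds the index and loads documents via aadd_texts.
    store = await AzureSearch.afrom_texts(
        ["doc one", "doc two"],
        embeddings,
        azure_search_endpoint="https://<service>.search.windows.net",  # placeholder
        azure_search_key="<admin-key>",  # placeholder
        index_name="langchain-index",
    )
    # Hybrid (full-text + vector) search, mirroring the sync API.
    hybrid_docs = await store.ahybrid_search("query text", k=4)
    # Semantic hybrid search; requires a semantic configuration on the index.
    semantic_docs = await store.asemantic_hybrid_search("query text", k=4)

    # The retriever's async path (_aget_relevant_documents, added in the next
    # hunk) is reached through the standard ainvoke interface.
    retriever = store.as_retriever(search_type="hybrid")
    docs = await retriever.ainvoke("query text")


# asyncio.run(demo_async_hybrid(my_embeddings))
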
@@ -949,6 +1610,48 @@ class AzureSearchVectorStoreRetriever(BaseRetriever): raise ValueError(f"search_type of {self.search_type} not allowed.") return docs + async def _aget_relevant_documents( + self, + query: str, + *, + run_manager: AsyncCallbackManagerForRetrieverRun, + **kwargs: Any, + ) -> List[Document]: + params = {**self.search_kwargs, **kwargs} + + if self.search_type == "similarity": + docs = await self.vectorstore.avector_search(query, k=self.k, **params) + elif self.search_type == "similarity_score_threshold": + docs_and_scores = ( + await self.vectorstore.asimilarity_search_with_relevance_scores( + query, k=self.k, **params + ) + ) + docs = [doc for doc, _ in docs_and_scores] + elif self.search_type == "hybrid": + docs = await self.vectorstore.ahybrid_search(query, k=self.k, **params) + elif self.search_type == "hybrid_score_threshold": + docs_and_scores = ( + await self.vectorstore.ahybrid_search_with_relevance_scores( + query, k=self.k, **params + ) + ) + docs = [doc for doc, _ in docs_and_scores] + elif self.search_type == "semantic_hybrid": + docs = await self.vectorstore.asemantic_hybrid_search( + query, k=self.k, **params + ) + elif self.search_type == "semantic_hybrid_score_threshold": + docs = [ + doc + for doc, _ in await self.vectorstore.asemantic_hybrid_search_with_score( + query, k=self.k, **params + ) + ] + else: + raise ValueError(f"search_type of {self.search_type} not allowed.") + return docs + def _results_to_documents( results: SearchItemPaged[Dict],