community[patch]: AzureSearch async functions (#22075)

pull/22577/head
Bagatur 4 months ago committed by GitHub
parent 1a911018bc
commit 584a1e30ac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -19,10 +19,14 @@ from typing import (
Tuple, Tuple,
Type, Type,
Union, Union,
cast,
) )
import numpy as np import numpy as np
from langchain_core.callbacks import CallbackManagerForRetrieverRun from langchain_core.callbacks import (
AsyncCallbackManagerForRetrieverRun,
CallbackManagerForRetrieverRun,
)
from langchain_core.documents import Document from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import root_validator from langchain_core.pydantic_v1 import root_validator
@ -36,6 +40,7 @@ logger = logging.getLogger()
if TYPE_CHECKING: if TYPE_CHECKING:
from azure.search.documents import SearchClient, SearchItemPaged from azure.search.documents import SearchClient, SearchItemPaged
from azure.search.documents.aio import SearchClient as AsyncSearchClient
from azure.search.documents.indexes.models import ( from azure.search.documents.indexes.models import (
CorsOptions, CorsOptions,
ScoringProfile, ScoringProfile,
@ -80,11 +85,13 @@ def _get_search_client(
default_fields: Optional[List[SearchField]] = None, default_fields: Optional[List[SearchField]] = None,
user_agent: Optional[str] = "langchain", user_agent: Optional[str] = "langchain",
cors_options: Optional[CorsOptions] = None, cors_options: Optional[CorsOptions] = None,
) -> SearchClient: async_: bool = False,
) -> Union[SearchClient, AsyncSearchClient]:
from azure.core.credentials import AzureKeyCredential from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import ResourceNotFoundError from azure.core.exceptions import ResourceNotFoundError
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.search.documents import SearchClient from azure.search.documents import SearchClient
from azure.search.documents.aio import SearchClient as AsyncSearchClient
from azure.search.documents.indexes import SearchIndexClient from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import ( from azure.search.documents.indexes.models import (
ExhaustiveKnnAlgorithmConfiguration, ExhaustiveKnnAlgorithmConfiguration,
@ -212,12 +219,20 @@ def _get_search_client(
) )
index_client.create_index(index) index_client.create_index(index)
# Create the search client # Create the search client
return SearchClient( if not async_:
endpoint=endpoint, return SearchClient(
index_name=index_name, endpoint=endpoint,
credential=credential, index_name=index_name,
user_agent=user_agent, credential=credential,
) user_agent=user_agent,
)
else:
return AsyncSearchClient(
endpoint=endpoint,
index_name=index_name,
credential=credential,
user_agent=user_agent,
)
class AzureSearch(VectorStore): class AzureSearch(VectorStore):
@ -243,12 +258,18 @@ class AzureSearch(VectorStore):
vector_search_dimensions: Optional[int] = None, vector_search_dimensions: Optional[int] = None,
**kwargs: Any, **kwargs: Any,
): ):
from azure.search.documents.indexes.models import ( try:
SearchableField, from azure.search.documents.indexes.models import (
SearchField, SearchableField,
SearchFieldDataType, SearchField,
SimpleField, SearchFieldDataType,
) SimpleField,
)
except ImportError as e:
raise ImportError(
"Unable to import azure.search.documents. Please install with "
"`pip install -U azure-search-documents`."
) from e
"""Initialize with necessary components.""" """Initialize with necessary components."""
# Initialize base class # Initialize base class
@ -304,24 +325,64 @@ class AzureSearch(VectorStore):
self.semantic_configuration_name = semantic_configuration_name self.semantic_configuration_name = semantic_configuration_name
self.fields = fields if fields else default_fields self.fields = fields if fields else default_fields
self._azure_search_endpoint = azure_search_endpoint
self._azure_search_key = azure_search_key
self._index_name = index_name
self._semantic_configuration_name = semantic_configuration_name
self._fields = fields
self._vector_search = vector_search
self._semantic_configurations = semantic_configurations
self._scoring_profiles = scoring_profiles
self._default_scoring_profile = default_scoring_profile
self._default_fields = default_fields
self._user_agent = user_agent
self._cors_options = cors_options
def _async_client(self) -> AsyncSearchClient:
    """Create an async search client from the settings stored on the instance.

    The stored endpoint, key, index name and index options are forwarded
    unchanged to ``_get_search_client`` with ``async_=True``.

    Returns:
        The client object returned by ``_get_search_client``.
    """
    index_options = dict(
        semantic_configuration_name=self._semantic_configuration_name,
        fields=self._fields,
        vector_search=self._vector_search,
        semantic_configurations=self._semantic_configurations,
        scoring_profiles=self._scoring_profiles,
        default_scoring_profile=self._default_scoring_profile,
        default_fields=self._default_fields,
        user_agent=self._user_agent,
        cors_options=self._cors_options,
    )
    async_client = _get_search_client(
        self._azure_search_endpoint,
        self._azure_search_key,
        self._index_name,
        async_=True,
        **index_options,
    )
    return async_client
@property
def embeddings(self) -> Optional[Embeddings]:
    """Return the embedding function if it is an ``Embeddings`` object.

    Returns:
        ``self.embedding_function`` when it is an ``Embeddings`` instance,
        otherwise ``None``.
    """
    candidate = self.embedding_function
    if isinstance(candidate, Embeddings):
        return candidate
    return None
async def _aembed_query(self, text: str) -> List[float]:
    """Embed a query string.

    Args:
        text: The query text to embed.

    Returns:
        The awaited result of ``self.embeddings.aembed_query`` when
        ``self.embeddings`` is truthy; otherwise the result of calling
        ``self.embedding_function`` directly (not awaited).
    """
    embedder = self.embeddings
    if not embedder:
        return cast(Callable, self.embedding_function)(text)
    return await embedder.aembed_query(text)
def add_texts( def add_texts(
self, self,
texts: Iterable[str], texts: Iterable[str],
metadatas: Optional[List[dict]] = None, metadatas: Optional[List[dict]] = None,
*,
keys: Optional[List[str]] = None,
**kwargs: Any, **kwargs: Any,
) -> List[str]: ) -> List[str]:
"""Add texts data to an existing index.""" """Add texts data to an existing index."""
keys = kwargs.get("keys")
# batching support if embedding function is an Embeddings object # batching support if embedding function is an Embeddings object
if isinstance(self.embedding_function, Embeddings): if isinstance(self.embedding_function, Embeddings):
try: try:
embeddings = self.embedding_function.embed_documents(texts) # type: ignore[arg-type] embeddings = self.embedding_function.embed_documents(list(texts))
except NotImplementedError: except NotImplementedError:
embeddings = [self.embedding_function.embed_query(x) for x in texts] embeddings = [self.embedding_function.embed_query(x) for x in texts]
else: else:
@ -333,6 +394,30 @@ class AzureSearch(VectorStore):
return self.add_embeddings(zip(texts, embeddings), metadatas, keys=keys) return self.add_embeddings(zip(texts, embeddings), metadatas, keys=keys)
async def aadd_texts(
    self,
    texts: Iterable[str],
    metadatas: Optional[List[dict]] = None,
    *,
    keys: Optional[List[str]] = None,
    **kwargs: Any,
) -> List[str]:
    """Embed texts and add them to an existing index.

    Args:
        texts: Texts to embed and upload. Any iterable is accepted,
            including one-shot iterators.
        metadatas: Optional metadata dicts, one per text.
        keys: Optional document keys, one per text.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        The ids returned by ``aadd_embeddings``, or an empty list when
        there is nothing to insert.
    """
    # Materialize once: the texts are read for embedding and again when
    # they are paired with their vectors, so a generator would otherwise
    # be exhausted before the second read.
    text_list = list(texts)

    if isinstance(self.embedding_function, Embeddings):
        try:
            embeddings = await self.embedding_function.aembed_documents(text_list)
        except NotImplementedError:
            # Fall back to embedding one text at a time.
            embeddings = [
                await self.embedding_function.aembed_query(x) for x in text_list
            ]
    else:
        embeddings = [self.embedding_function(x) for x in text_list]

    if len(embeddings) == 0:
        logger.debug("Nothing to insert, skipping.")
        return []

    return await self.aadd_embeddings(
        zip(text_list, embeddings), metadatas, keys=keys
    )
def add_embeddings( def add_embeddings(
self, self,
text_embeddings: Iterable[Tuple[str, List[float]]], text_embeddings: Iterable[Tuple[str, List[float]]],
@ -390,6 +475,65 @@ class AzureSearch(VectorStore):
else: else:
raise Exception(response) raise Exception(response)
async def aadd_embeddings(
    self,
    text_embeddings: Iterable[Tuple[str, List[float]]],
    metadatas: Optional[List[dict]] = None,
    *,
    keys: Optional[List[str]] = None,
) -> List[str]:
    """Add embeddings to an existing index.

    Documents are uploaded in batches of ``MAX_UPLOAD_BATCH_SIZE``; a new
    async client is opened for each batch.

    Args:
        text_embeddings: Pairs of (text, embedding vector).
        metadatas: Optional metadata dicts, indexed in step with
            ``text_embeddings``.
        keys: Optional document keys, indexed in step with
            ``text_embeddings``. A random UUID is used when omitted.

    Returns:
        The base64 url-safe encoded keys of the uploaded documents.

    Raises:
        Exception: If any document in a batch is not reported as succeeded;
            the exception carries the upload response.
    """

    async def _upload(batch: List[dict]) -> None:
        # Upload one batch and fail loudly if any document was rejected.
        async with self._async_client() as async_client:
            response = await async_client.upload_documents(documents=batch)
        if not all(r.succeeded for r in response):
            raise Exception(response)

    # Computed once; rebuilding this per metadata key is needless work.
    field_names = {field.name for field in self.fields}

    ids = []
    data = []
    for i, (text, embedding) in enumerate(text_embeddings):
        # Use provided key otherwise use default key
        key = keys[i] if keys else str(uuid.uuid4())
        # Encoding key for Azure Search valid characters
        key = base64.urlsafe_b64encode(bytes(key, "utf-8")).decode("ascii")
        metadata = metadatas[i] if metadatas else {}
        doc = {
            "@search.action": "upload",
            FIELDS_ID: key,
            FIELDS_CONTENT: text,
            FIELDS_CONTENT_VECTOR: np.array(embedding, dtype=np.float32).tolist(),
            FIELDS_METADATA: json.dumps(metadata),
        }
        if metadata:
            # Metadata entries whose name matches an index field are also
            # written as top-level fields of the document.
            doc.update({k: v for k, v in metadata.items() if k in field_names})
        data.append(doc)
        ids.append(key)
        # Upload data in batches
        if len(data) == MAX_UPLOAD_BATCH_SIZE:
            await _upload(data)
            data = []

    # Flush the remainder; nothing is left when the input size is an exact
    # multiple of the batch size.
    if data:
        await _upload(data)
    return ids
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> bool: def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> bool:
"""Delete by vector ID. """Delete by vector ID.
@ -406,10 +550,32 @@ class AzureSearch(VectorStore):
else: else:
return False return False
async def adelete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> bool:
    """Delete by vector ID.

    Args:
        ids: List of ids to delete.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        bool: True if the delete call returned at least one result entry,
        False otherwise (including when no ids are given).
    """
    if not ids:
        return False
    async with self._async_client() as async_client:
        # Address documents by the same key field used when uploading
        # (FIELDS_ID) instead of a hard-coded "id".
        res = await async_client.delete_documents([{FIELDS_ID: i} for i in ids])
    return len(res) > 0
def similarity_search( def similarity_search(
self, query: str, k: int = 4, **kwargs: Any self,
query: str,
k: int = 4,
*,
search_type: Optional[str] = None,
**kwargs: Any,
) -> List[Document]: ) -> List[Document]:
search_type = kwargs.get("search_type", self.search_type) search_type = search_type or self.search_type
if search_type == "similarity": if search_type == "similarity":
docs = self.vector_search(query, k=k, **kwargs) docs = self.vector_search(query, k=k, **kwargs)
elif search_type == "hybrid": elif search_type == "hybrid":
@ -420,10 +586,61 @@ class AzureSearch(VectorStore):
raise ValueError(f"search_type of {search_type} not allowed.") raise ValueError(f"search_type of {search_type} not allowed.")
return docs return docs
def similarity_search_with_score(
    self, query: str, *, k: int = 4, **kwargs: Any
) -> List[Tuple[Document, float]]:
    """Run similarity search with distance.

    Args:
        query: Text to look up documents similar to.
        k: Number of documents to return. Defaults to 4.
        **kwargs: May contain ``search_type`` ("similarity", "hybrid" or
            "semantic_hybrid") to override ``self.search_type``; all other
            entries are forwarded to the selected search method.

    Returns:
        List of (document, score) pairs.

    Raises:
        ValueError: If the resolved search type is not supported.
    """
    # Pop rather than get: ``search_type`` only selects the dispatch target
    # and must not be forwarded to the underlying search call.
    search_type = kwargs.pop("search_type", None) or self.search_type
    if search_type == "similarity":
        return self.vector_search_with_score(query, k=k, **kwargs)
    elif search_type == "hybrid":
        return self.hybrid_search_with_score(query, k=k, **kwargs)
    elif search_type == "semantic_hybrid":
        return self.semantic_hybrid_search_with_score(query, k=k, **kwargs)
    else:
        raise ValueError(f"search_type of {search_type} not allowed.")
async def asimilarity_search(
    self,
    query: str,
    k: int = 4,
    *,
    search_type: Optional[str] = None,
    **kwargs: Any,
) -> List[Document]:
    """Search for documents using the requested search type.

    Args:
        query: Text to look up documents similar to.
        k: Number of documents to return. Defaults to 4.
        search_type: "similarity", "hybrid" or "semantic_hybrid". Falls
            back to ``self.search_type`` when not given.
        **kwargs: Forwarded to the selected search method.

    Returns:
        The documents returned by the selected search method.

    Raises:
        ValueError: If the resolved search type is not supported.
    """
    mode = search_type or self.search_type
    if mode == "similarity":
        return await self.avector_search(query, k=k, **kwargs)
    if mode == "hybrid":
        return await self.ahybrid_search(query, k=k, **kwargs)
    if mode == "semantic_hybrid":
        return await self.asemantic_hybrid_search(query, k=k, **kwargs)
    raise ValueError(f"search_type of {mode} not allowed.")
async def asimilarity_search_with_score(
    self, query: str, *, k: int = 4, **kwargs: Any
) -> List[Tuple[Document, float]]:
    """Run similarity search with distance.

    Args:
        query: Text to look up documents similar to.
        k: Number of documents to return. Defaults to 4.
        **kwargs: May contain ``search_type`` ("similarity", "hybrid" or
            "semantic_hybrid") to override ``self.search_type``; all other
            entries are forwarded to the selected search method.

    Returns:
        List of (document, score) pairs.

    Raises:
        ValueError: If the resolved search type is not supported.
    """
    # Pop rather than get: ``search_type`` only selects the dispatch target
    # and must not be forwarded to the underlying search call.
    search_type = kwargs.pop("search_type", None) or self.search_type
    if search_type == "similarity":
        return await self.avector_search_with_score(query, k=k, **kwargs)
    elif search_type == "hybrid":
        return await self.ahybrid_search_with_score(query, k=k, **kwargs)
    elif search_type == "semantic_hybrid":
        return await self.asemantic_hybrid_search_with_score(query, k=k, **kwargs)
    else:
        raise ValueError(f"search_type of {search_type} not allowed.")
def similarity_search_with_relevance_scores( def similarity_search_with_relevance_scores(
self, query: str, k: int = 4, **kwargs: Any self,
query: str,
k: int = 4,
*,
score_threshold: Optional[float] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]: ) -> List[Tuple[Document, float]]:
score_threshold = kwargs.pop("score_threshold", None)
result = self.vector_search_with_score(query, k=k, **kwargs) result = self.vector_search_with_score(query, k=k, **kwargs)
return ( return (
result result
@ -431,7 +648,40 @@ class AzureSearch(VectorStore):
else [r for r in result if r[1] >= score_threshold] else [r for r in result if r[1] >= score_threshold]
) )
async def asimilarity_search_with_relevance_scores(
    self,
    query: str,
    k: int = 4,
    *,
    score_threshold: Optional[float] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Run a vector search and optionally drop low-scoring results.

    Args:
        query: Text to look up documents similar to.
        k: Number of documents to request. Defaults to 4.
        score_threshold: When given, only pairs whose score is greater
            than or equal to this value are kept.
        **kwargs: Forwarded to ``avector_search_with_score``.

    Returns:
        List of (document, score) pairs.
    """
    scored_docs = await self.avector_search_with_score(query, k=k, **kwargs)
    if score_threshold is None:
        return scored_docs
    return [pair for pair in scored_docs if pair[1] >= score_threshold]
def vector_search(
    self, query: str, k: int = 4, *, filters: Optional[str] = None, **kwargs: Any
) -> List[Document]:
    """Return the indexed documents most similar to the query text.

    Args:
        query (str): The query text for which to find similar documents.
        k (int): The number of documents to return. Default is 4.
        filters (str, optional): Filtering expression. Defaults to None.
        **kwargs: Accepted for interface compatibility; not forwarded.

    Returns:
        List[Document]: The documents from ``vector_search_with_score``
        with their scores dropped.
    """
    scored = self.vector_search_with_score(query, k=k, filters=filters)
    documents = []
    for document, _score in scored:
        documents.append(document)
    return documents
async def avector_search(
self, query: str, k: int = 4, *, filters: Optional[str] = None, **kwargs: Any
) -> List[Document]:
""" """
Returns the most similar indexed documents to the query text. Returns the most similar indexed documents to the query text.
@ -442,8 +692,8 @@ class AzureSearch(VectorStore):
Returns: Returns:
List[Document]: A list of documents that are most similar to the query text. List[Document]: A list of documents that are most similar to the query text.
""" """
docs_and_scores = self.vector_search_with_score( docs_and_scores = await self.avector_search_with_score(
query, k=k, filters=kwargs.get("filters", None) query, k=k, filters=filters
) )
return [doc for doc, _ in docs_and_scores] return [doc for doc, _ in docs_and_scores]
@ -470,6 +720,31 @@ class AzureSearch(VectorStore):
return _results_to_documents(results) return _results_to_documents(results)
async def avector_search_with_score(
    self,
    query: str,
    k: int = 4,
    filters: Optional[str] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Return the documents most similar to the query, with scores.

    The query is embedded and searched with an empty text query.

    Args:
        query (str): Text to look up documents similar to.
        k (int, optional): Number of Documents to return. Defaults to 4.
        filters (str, optional): Filtering expression. Defaults to None.
        **kwargs: Forwarded to ``_asimple_search``.

    Returns:
        List[Tuple[Document, float]]: List of Documents most similar
            to the query and score for each
    """
    query_vector = await self._aembed_query(query)
    documents, similarities, _vectors = await self._asimple_search(
        query_vector, "", k, filters=filters, **kwargs
    )
    return list(zip(documents, similarities))
def max_marginal_relevance_search_with_score( def max_marginal_relevance_search_with_score(
self, self,
query: str, query: str,
@ -504,6 +779,47 @@ class AzureSearch(VectorStore):
results, query_embedding=np.array(embedding), lambda_mult=lambda_mult, k=k results, query_embedding=np.array(embedding), lambda_mult=lambda_mult, k=k
) )
async def amax_marginal_relevance_search_with_score(
    self,
    query: str,
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    *,
    filters: Optional[str] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Run a vector search and reorder the hits by MMR.

    ``fetch_k`` candidates are requested with an empty text query, then
    ``k`` of them are selected by maximal marginal relevance.

    Args:
        query (str): Text to look up documents similar to.
        k (int, optional): How many results to give. Defaults to 4.
        fetch_k (int, optional): Total results to select k from.
            Defaults to 20.
        lambda_mult: Number between 0 and 1 that determines the degree
            of diversity among the results with 0 corresponding
            to maximum diversity and 1 to minimum diversity.
            Defaults to 0.5
        filters (str, optional): Filtering expression. Defaults to None.
        **kwargs: Forwarded to ``_asimple_search``.

    Returns:
        List[Tuple[Document, float]]: List of Documents most similar
            to the query and score for each
    """
    query_vector = await self._aembed_query(query)
    candidates = await self._asimple_search(
        query_vector, "", fetch_k, filters=filters, **kwargs
    )
    candidate_docs, candidate_scores, candidate_vectors = candidates
    return await self._areorder_results_with_maximal_marginal_relevance(
        candidate_docs,
        candidate_scores,
        candidate_vectors,
        query_embedding=np.array(query_vector),
        lambda_mult=lambda_mult,
        k=k,
    )
def hybrid_search(self, query: str, k: int = 4, **kwargs: Any) -> List[Document]: def hybrid_search(self, query: str, k: int = 4, **kwargs: Any) -> List[Document]:
""" """
Returns the most similar indexed documents to the query text. Returns the most similar indexed documents to the query text.
@ -518,6 +834,22 @@ class AzureSearch(VectorStore):
docs_and_scores = self.hybrid_search_with_score(query, k=k, **kwargs) docs_and_scores = self.hybrid_search_with_score(query, k=k, **kwargs)
return [doc for doc, _ in docs_and_scores] return [doc for doc, _ in docs_and_scores]
async def ahybrid_search(
    self, query: str, k: int = 4, **kwargs: Any
) -> List[Document]:
    """Return the indexed documents most similar to the query text.

    Args:
        query (str): The query text for which to find similar documents.
        k (int): The number of documents to return. Default is 4.
        **kwargs: Forwarded to ``ahybrid_search_with_score``.

    Returns:
        List[Document]: The documents from ``ahybrid_search_with_score``
        with their scores dropped.
    """
    scored = await self.ahybrid_search_with_score(query, k=k, **kwargs)
    documents = []
    for document, _score in scored:
        documents.append(document)
    return documents
def hybrid_search_with_score( def hybrid_search_with_score(
self, self,
query: str, query: str,
@ -540,10 +872,38 @@ class AzureSearch(VectorStore):
return _results_to_documents(results) return _results_to_documents(results)
async def ahybrid_search_with_score(
    self,
    query: str,
    k: int = 4,
    filters: Optional[str] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Return docs most similar to query with a hybrid query.

    The query is embedded and also passed as the text query.

    Args:
        query: Text to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filters: Filtering expression. Defaults to None.
        **kwargs: Forwarded to ``_asimple_search``.

    Returns:
        List of Documents most similar to the query and score for each
    """
    query_vector = await self._aembed_query(query)
    documents, similarities, _vectors = await self._asimple_search(
        query_vector, query, k, filters=filters, **kwargs
    )
    return list(zip(documents, similarities))
def hybrid_search_with_relevance_scores( def hybrid_search_with_relevance_scores(
self, query: str, k: int = 4, **kwargs: Any self,
query: str,
k: int = 4,
*,
score_threshold: Optional[float] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]: ) -> List[Tuple[Document, float]]:
score_threshold = kwargs.pop("score_threshold", None)
result = self.hybrid_search_with_score(query, k=k, **kwargs) result = self.hybrid_search_with_score(query, k=k, **kwargs)
return ( return (
result result
@ -551,6 +911,21 @@ class AzureSearch(VectorStore):
else [r for r in result if r[1] >= score_threshold] else [r for r in result if r[1] >= score_threshold]
) )
async def ahybrid_search_with_relevance_scores(
    self,
    query: str,
    k: int = 4,
    *,
    score_threshold: Optional[float] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Run a hybrid search and optionally drop low-scoring results.

    Args:
        query: Text to look up documents similar to.
        k: Number of documents to request. Defaults to 4.
        score_threshold: When given, only pairs whose score is greater
            than or equal to this value are kept.
        **kwargs: Forwarded to ``ahybrid_search_with_score``.

    Returns:
        List of (document, score) pairs.
    """
    scored_docs = await self.ahybrid_search_with_score(query, k=k, **kwargs)
    if score_threshold is None:
        return scored_docs
    return [pair for pair in scored_docs if pair[1] >= score_threshold]
def hybrid_max_marginal_relevance_search_with_score( def hybrid_max_marginal_relevance_search_with_score(
self, self,
query: str, query: str,
@ -588,6 +963,48 @@ class AzureSearch(VectorStore):
results, query_embedding=np.array(embedding), lambda_mult=lambda_mult, k=k results, query_embedding=np.array(embedding), lambda_mult=lambda_mult, k=k
) )
async def ahybrid_max_marginal_relevance_search_with_score(
    self,
    query: str,
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    *,
    filters: Optional[str] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Return docs most similar to query with a hybrid query
    and reorder results by MMR.

    ``fetch_k`` candidates are requested using both the embedded query and
    the query text, then ``k`` of them are selected by maximal marginal
    relevance.

    Args:
        query (str): Text to look up documents similar to.
        k (int, optional): Number of Documents to return. Defaults to 4.
        fetch_k (int, optional): Total results to select k from.
            Defaults to 20.
        lambda_mult: Number between 0 and 1 that determines the degree
            of diversity among the results with 0 corresponding
            to maximum diversity and 1 to minimum diversity.
            Defaults to 0.5
        filters (str, optional): Filtering expression. Defaults to None.
        **kwargs: Forwarded to ``_asimple_search``.

    Returns:
        List of Documents most similar to the query and score for each
    """
    query_vector = await self._aembed_query(query)
    candidates = await self._asimple_search(
        query_vector, query, fetch_k, filters=filters, **kwargs
    )
    candidate_docs, candidate_scores, candidate_vectors = candidates
    return await self._areorder_results_with_maximal_marginal_relevance(
        candidate_docs,
        candidate_scores,
        candidate_vectors,
        query_embedding=np.array(query_vector),
        lambda_mult=lambda_mult,
        k=k,
    )
def _simple_search( def _simple_search(
self, self,
embedding: List[float], embedding: List[float],
@ -624,6 +1041,55 @@ class AzureSearch(VectorStore):
**kwargs, **kwargs,
) )
async def _asimple_search(
    self,
    embedding: List[float],
    text_query: str,
    k: int,
    *,
    filters: Optional[str] = None,
    **kwargs: Any,
) -> Tuple[List[Document], List[float], List[List[float]]]:
    """Perform vector or hybrid search in the Azure search index.

    Args:
        embedding: A vector embedding to search in the vector space.
        text_query: A full-text search query expression;
            Use "*" or omit this parameter to perform only vector search.
        k: Number of documents to return.
        filters: Filtering expression.
        **kwargs: Forwarded to the client's ``search`` call.

    Returns:
        Three parallel lists: the documents, their ``@search.score``
        values as floats, and their stored content vectors.

    Raises:
        ValueError: If the search yields no results.
    """
    from azure.search.documents.models import VectorizedQuery

    async with self._async_client() as async_client:
        vector_query = VectorizedQuery(
            vector=np.array(embedding, dtype=np.float32).tolist(),
            k_nearest_neighbors=k,
            fields=FIELDS_CONTENT_VECTOR,
        )
        results = await async_client.search(
            search_text=text_query,
            vector_queries=[vector_query],
            filter=filters,
            top=k,
            **kwargs,
        )
        # Results are consumed while the client is still open.
        docs = []
        async for result in results:
            docs.append(
                (
                    _result_to_document(result),
                    float(result["@search.score"]),
                    result[FIELDS_CONTENT_VECTOR],
                )
            )
        if not docs:
            raise ValueError(f"No {docs=}")
        documents = [entry[0] for entry in docs]
        scores = [entry[1] for entry in docs]
        vectors = [entry[2] for entry in docs]
        return documents, scores, vectors
def semantic_hybrid_search( def semantic_hybrid_search(
self, query: str, k: int = 4, **kwargs: Any self, query: str, k: int = 4, **kwargs: Any
) -> List[Document]: ) -> List[Document]:
@ -643,11 +1109,32 @@ class AzureSearch(VectorStore):
) )
return [doc for doc, _, _ in docs_and_scores] return [doc for doc, _, _ in docs_and_scores]
async def asemantic_hybrid_search(
    self, query: str, k: int = 4, **kwargs: Any
) -> List[Document]:
    """Return the indexed documents most similar to the query text.

    Args:
        query (str): The query text for which to find similar documents.
        k (int): The number of documents to return. Default is 4.
        **kwargs: Forwarded to
            ``asemantic_hybrid_search_with_score_and_rerank`` (for example
            ``filters``, a filtering expression).

    Returns:
        List[Document]: The documents from the underlying search with both
        scores dropped.
    """
    ranked = await self.asemantic_hybrid_search_with_score_and_rerank(
        query, k=k, **kwargs
    )
    documents = []
    for document, _score, _reranker_score in ranked:
        documents.append(document)
    return documents
def semantic_hybrid_search_with_score( def semantic_hybrid_search_with_score(
self, self,
query: str, query: str,
k: int = 4, k: int = 4,
score_type: Literal["score", "reranker_score"] = "score", score_type: Literal["score", "reranker_score"] = "score",
*,
score_threshold: Optional[float] = None,
**kwargs: Any, **kwargs: Any,
) -> List[Tuple[Document, float]]: ) -> List[Tuple[Document, float]]:
""" """
@ -664,7 +1151,6 @@ class AzureSearch(VectorStore):
List[Tuple[Document, float]]: A list of documents and their List[Tuple[Document, float]]: A list of documents and their
corresponding scores. corresponding scores.
""" """
score_threshold = kwargs.pop("score_threshold", None)
docs_and_scores = self.semantic_hybrid_search_with_score_and_rerank( docs_and_scores = self.semantic_hybrid_search_with_score_and_rerank(
query, k=k, **kwargs query, k=k, **kwargs
) )
@ -681,6 +1167,45 @@ class AzureSearch(VectorStore):
if score_threshold is None or reranker_score >= score_threshold if score_threshold is None or reranker_score >= score_threshold
] ]
async def asemantic_hybrid_search_with_score(
    self,
    query: str,
    k: int = 4,
    score_type: Literal["score", "reranker_score"] = "score",
    *,
    score_threshold: Optional[float] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Return the indexed documents most similar to the query, with scores.

    Args:
        query (str): The query text for which to find similar documents.
        k (int): The number of documents to return. Default is 4.
        score_type: Must either be "score" or "reranker_score".
            Defaulted to "score". Any other value falls through and
            nothing is returned.
        score_threshold: When given, only results whose selected score is
            greater than or equal to this value are kept.
        **kwargs: Forwarded to
            ``asemantic_hybrid_search_with_score_and_rerank`` (for example
            ``filters``, a filtering expression).

    Returns:
        List[Tuple[Document, float]]: A list of documents and their
            corresponding scores.
    """
    ranked = await self.asemantic_hybrid_search_with_score_and_rerank(
        query, k=k, **kwargs
    )

    def _passes(value: float) -> bool:
        # No threshold means every result is kept.
        return score_threshold is None or value >= score_threshold

    if score_type == "score":
        return [(doc, score) for doc, score, _ in ranked if _passes(score)]
    if score_type == "reranker_score":
        return [
            (doc, reranker_score)
            for doc, _, reranker_score in ranked
            if _passes(reranker_score)
        ]
    return None
def semantic_hybrid_search_with_score_and_rerank( def semantic_hybrid_search_with_score_and_rerank(
self, query: str, k: int = 4, *, filters: Optional[str] = None, **kwargs: Any self, query: str, k: int = 4, *, filters: Optional[str] = None, **kwargs: Any
) -> List[Tuple[Document, float, float]]: ) -> List[Tuple[Document, float, float]]:
@ -759,6 +1284,88 @@ class AzureSearch(VectorStore):
] ]
return docs return docs
async def asemantic_hybrid_search_with_score_and_rerank(
    self, query: str, k: int = 4, *, filters: Optional[str] = None, **kwargs: Any
) -> List[Tuple[Document, float, float]]:
    """Return docs most similar to query with a semantic hybrid query.

    The query is embedded and sent together with the query text as a
    ``query_type="semantic"`` search that requests extractive captions
    and answers.

    Args:
        query: Text to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filters: Filtering expression.
        **kwargs: Forwarded to the client's ``search`` call.

    Returns:
        List of (document, search score, reranker score) tuples. Each
        document's metadata carries ``captions`` and ``answers`` entries
        in addition to the stored metadata.
    """
    from azure.search.documents.models import VectorizedQuery

    vector = await self._aembed_query(query)
    async with self._async_client() as async_client:
        results = await async_client.search(
            search_text=query,
            vector_queries=[
                VectorizedQuery(
                    vector=np.array(vector, dtype=np.float32).tolist(),
                    k_nearest_neighbors=k,
                    fields=FIELDS_CONTENT_VECTOR,
                )
            ],
            filter=filters,
            query_type="semantic",
            semantic_configuration_name=self.semantic_configuration_name,
            query_caption="extractive",
            query_answer="extractive",
            top=k,
            **kwargs,
        )
        # Get Semantic Answers, indexed by the key of the document they
        # belong to so they can be attached to the matching result below.
        semantic_answers = (await results.get_answers()) or []
        semantic_answers_dict: Dict = {}
        for semantic_answer in semantic_answers:
            semantic_answers_dict[semantic_answer.key] = {
                "text": semantic_answer.text,
                "highlights": semantic_answer.highlights,
            }
        # Convert results to Document objects
        docs = [
            (
                Document(
                    # pop() removes the content from the result, so it is
                    # not repeated in the fallback metadata built below.
                    page_content=result.pop(FIELDS_CONTENT),
                    metadata={
                        # Stored metadata JSON when the field is present;
                        # otherwise every remaining result field except
                        # the content vector. (The comprehension's k/v are
                        # local to it and do not affect the ``k`` argument.)
                        **(
                            json.loads(result[FIELDS_METADATA])
                            if FIELDS_METADATA in result
                            else {
                                k: v
                                for k, v in result.items()
                                if k != FIELDS_CONTENT_VECTOR
                            }
                        ),
                        **{
                            # Only the first caption is used; empty dict
                            # when the result has no captions.
                            "captions": {
                                "text": result.get("@search.captions", [{}])[
                                    0
                                ].text,
                                "highlights": result.get("@search.captions", [{}])[
                                    0
                                ].highlights,
                            }
                            if result.get("@search.captions")
                            else {},
                            # Answer for this document's key, or "" if none.
                            "answers": semantic_answers_dict.get(
                                result.get(FIELDS_ID, ""),
                                "",
                            ),
                        },
                    },
                ),
                float(result["@search.score"]),
                float(result["@search.reranker_score"]),
            )
            async for result in results
        ]
        return docs
@classmethod @classmethod
def from_texts( def from_texts(
cls: Type[AzureSearch], cls: Type[AzureSearch],
@ -783,6 +1390,30 @@ class AzureSearch(VectorStore):
azure_search.add_texts(texts, metadatas, **kwargs) azure_search.add_texts(texts, metadatas, **kwargs)
return azure_search return azure_search
@classmethod
async def afrom_texts(
    cls: Type[AzureSearch],
    texts: List[str],
    embedding: Embeddings,
    metadatas: Optional[List[dict]] = None,
    azure_search_endpoint: str = "",
    azure_search_key: str = "",
    index_name: str = "langchain-index",
    fields: Optional[List[SearchField]] = None,
    **kwargs: Any,
) -> AzureSearch:
    """Create a store instance and add the given texts to it.

    Args:
        texts: Texts to embed and add.
        embedding: Embedding object passed to the constructor.
        metadatas: Optional metadata dicts, one per text.
        azure_search_endpoint: Search service endpoint.
        azure_search_key: Search service key.
        index_name: Name of the index. Defaults to "langchain-index".
        fields: Optional index field definitions.
        **kwargs: Forwarded both to the constructor and to ``aadd_texts``.

    Returns:
        The newly created instance, after the texts have been added.
    """
    store = cls(
        azure_search_endpoint,
        azure_search_key,
        index_name,
        embedding,
        fields=fields,
        **kwargs,
    )
    await store.aadd_texts(texts, metadatas, **kwargs)
    return store
@classmethod @classmethod
async def afrom_embeddings( async def afrom_embeddings(
cls: Type[AzureSearch], cls: Type[AzureSearch],
@ -796,16 +1427,22 @@ class AzureSearch(VectorStore):
fields: Optional[List[SearchField]] = None, fields: Optional[List[SearchField]] = None,
**kwargs: Any, **kwargs: Any,
) -> AzureSearch: ) -> AzureSearch:
return cls.from_embeddings( text_embeddings, first_text_embedding = _peek(text_embeddings)
text_embeddings, if first_text_embedding is None:
embedding, raise ValueError("Cannot create AzureSearch from empty embeddings.")
metadatas=metadatas, vector_search_dimensions = len(first_text_embedding[1])
azure_search = cls(
azure_search_endpoint=azure_search_endpoint, azure_search_endpoint=azure_search_endpoint,
azure_search_key=azure_search_key, azure_search_key=azure_search_key,
index_name=index_name, index_name=index_name,
embedding_function=embedding,
fields=fields, fields=fields,
vector_search_dimensions=vector_search_dimensions,
**kwargs, **kwargs,
) )
await azure_search.aadd_embeddings(text_embeddings, metadatas, **kwargs)
return azure_search
@classmethod @classmethod
def from_embeddings( def from_embeddings(
@ -838,6 +1475,30 @@ class AzureSearch(VectorStore):
azure_search.add_embeddings(text_embeddings, metadatas, **kwargs) azure_search.add_embeddings(text_embeddings, metadatas, **kwargs)
return azure_search return azure_search
async def _areorder_results_with_maximal_marginal_relevance(
    self,
    documents: List[Document],
    scores: List[float],
    vectors: List[List[float]],
    query_embedding: np.ndarray,
    lambda_mult: float = 0.5,
    k: int = 4,
) -> List[Tuple[Document, float]]:
    """Reorder search results by maximal marginal relevance.

    Args:
        documents: Candidate documents.
        scores: Score of each candidate, parallel to ``documents``.
        vectors: Embedding of each candidate, parallel to ``documents``.
        query_embedding: Embedding of the query.
        lambda_mult: Passed to ``maximal_marginal_relevance``.
            Defaults to 0.5.
        k: Number of results to select. Defaults to 4.

    Returns:
        (document, score) pairs in the order chosen by
        ``maximal_marginal_relevance``.
    """
    selected = list(
        maximal_marginal_relevance(
            query_embedding, vectors, k=k, lambda_mult=lambda_mult
        )
    )
    # The ordering function can return -1; nothing from the first -1
    # onwards is used.
    cutoff = selected.index(-1) if -1 in selected else len(selected)
    return [(documents[i], scores[i]) for i in selected[:cutoff]]  # type: ignore
def as_retriever(self, **kwargs: Any) -> AzureSearchVectorStoreRetriever: # type: ignore def as_retriever(self, **kwargs: Any) -> AzureSearchVectorStoreRetriever: # type: ignore
"""Return AzureSearchVectorStoreRetriever initialized from this VectorStore. """Return AzureSearchVectorStoreRetriever initialized from this VectorStore.
@ -949,6 +1610,48 @@ class AzureSearchVectorStoreRetriever(BaseRetriever):
raise ValueError(f"search_type of {self.search_type} not allowed.") raise ValueError(f"search_type of {self.search_type} not allowed.")
return docs return docs
async def _aget_relevant_documents(
    self,
    query: str,
    *,
    run_manager: AsyncCallbackManagerForRetrieverRun,
    **kwargs: Any,
) -> List[Document]:
    """Fetch documents for a query using the configured search type.

    Args:
        query: Text to look up documents for.
        run_manager: Callback manager for the retriever run; not used here.
        **kwargs: Per-call search options; these override entries of
            ``self.search_kwargs`` with the same name.

    Returns:
        The documents returned by the vector store method that matches
        ``self.search_type`` (scores are dropped for the threshold variants).

    Raises:
        ValueError: If ``self.search_type`` is not supported.
    """
    options = {**self.search_kwargs, **kwargs}
    mode = self.search_type
    if mode == "similarity":
        return await self.vectorstore.avector_search(query, k=self.k, **options)
    if mode == "similarity_score_threshold":
        scored = await self.vectorstore.asimilarity_search_with_relevance_scores(
            query, k=self.k, **options
        )
        return [doc for doc, _ in scored]
    if mode == "hybrid":
        return await self.vectorstore.ahybrid_search(query, k=self.k, **options)
    if mode == "hybrid_score_threshold":
        scored = await self.vectorstore.ahybrid_search_with_relevance_scores(
            query, k=self.k, **options
        )
        return [doc for doc, _ in scored]
    if mode == "semantic_hybrid":
        return await self.vectorstore.asemantic_hybrid_search(
            query, k=self.k, **options
        )
    if mode == "semantic_hybrid_score_threshold":
        scored = await self.vectorstore.asemantic_hybrid_search_with_score(
            query, k=self.k, **options
        )
        return [doc for doc, _ in scored]
    raise ValueError(f"search_type of {mode} not allowed.")
def _results_to_documents( def _results_to_documents(
results: SearchItemPaged[Dict], results: SearchItemPaged[Dict],

Loading…
Cancel
Save