@ -4,10 +4,13 @@ from __future__ import annotations
import uuid
from typing import Any , Dict , Iterable , List , Optional , Tuple
from langchain . docstore . document import Document
import numpy as np
from langchain . embeddings . base import Embeddings
from langchain . schema import Document
from langchain . utils import get_from_dict_or_env
from langchain . vectorstores . base import VectorStore
from langchain . vectorstores . utils import maximal_marginal_relevance
IMPORT_OPENSEARCH_PY_ERROR = (
" Could not import OpenSearch. Please install it with `pip install opensearch-py`. "
@ -76,9 +79,12 @@ def _bulk_ingest_embeddings(
metadatas : Optional [ List [ dict ] ] = None ,
vector_field : str = " vector_field " ,
text_field : str = " text " ,
mapping : Dict = { } ,
mapping : Optional[ Dict ] = None ,
) - > List [ str ] :
""" Bulk Ingest Embeddings into given index. """
if not mapping :
mapping = dict ( )
bulk = _import_bulk ( )
not_found_error = _import_not_found_error ( )
requests = [ ]
@ -201,10 +207,14 @@ def _approximate_search_query_with_lucene_filter(
def _default_script_query (
query_vector : List [ float ] ,
space_type : str = " l2 " ,
pre_filter : Dict = MATCH_ALL_QUERY ,
pre_filter : Optional[ Dict ] = None ,
vector_field : str = " vector_field " ,
) - > Dict :
""" For Script Scoring Search, this is the default query. """
if not pre_filter :
pre_filter = MATCH_ALL_QUERY
return {
" query " : {
" script_score " : {
@ -245,10 +255,14 @@ def __get_painless_scripting_source(
def _default_painless_scripting_query (
query_vector : List [ float ] ,
space_type : str = " l2Squared " ,
pre_filter : Dict = MATCH_ALL_QUERY ,
pre_filter : Optional[ Dict ] = None ,
vector_field : str = " vector_field " ,
) - > Dict :
""" For Painless Scripting Search, this is the default query. """
if not pre_filter :
pre_filter = MATCH_ALL_QUERY
source = __get_painless_scripting_source ( space_type , query_vector )
return {
" query " : {
@ -355,7 +369,7 @@ class OpenSearchVectorSearch(VectorStore):
) - > List [ Document ] :
""" Return docs most similar to query.
By default supports Approximate Search .
By default , supports Approximate Search .
Also supports Script Scoring and Painless Scripting .
Args :
@ -413,7 +427,7 @@ class OpenSearchVectorSearch(VectorStore):
) - > List [ Tuple [ Document , float ] ] :
""" Return docs and it ' s scores most similar to query.
By default supports Approximate Search .
By default , supports Approximate Search .
Also supports Script Scoring and Painless Scripting .
Args :
@ -426,10 +440,47 @@ class OpenSearchVectorSearch(VectorStore):
Optional Args :
same as ` similarity_search `
"""
embedding = self . embedding_function . embed_query ( query )
search_type = _get_kwargs_value ( kwargs , " search_type " , " approximate_search " )
text_field = _get_kwargs_value ( kwargs , " text_field " , " text " )
metadata_field = _get_kwargs_value ( kwargs , " metadata_field " , " metadata " )
hits = self . _raw_similarity_search_with_score ( query = query , k = k , * * kwargs )
documents_with_scores = [
(
Document (
page_content = hit [ " _source " ] [ text_field ] ,
metadata = hit [ " _source " ]
if metadata_field == " * " or metadata_field not in hit [ " _source " ]
else hit [ " _source " ] [ metadata_field ] ,
) ,
hit [ " _score " ] ,
)
for hit in hits
]
return documents_with_scores
def _raw_similarity_search_with_score (
self , query : str , k : int = 4 , * * kwargs : Any
) - > List [ dict ] :
""" Return raw opensearch documents (dict) including vectors,
scores most similar to query .
By default , supports Approximate Search .
Also supports Script Scoring and Painless Scripting .
Args :
query : Text to look up documents similar to .
k : Number of Documents to return . Defaults to 4.
Returns :
List of dict with its scores most similar to the query .
Optional Args :
same as ` similarity_search `
"""
embedding = self . embedding_function . embed_query ( query )
search_type = _get_kwargs_value ( kwargs , " search_type " , " approximate_search " )
vector_field = _get_kwargs_value ( kwargs , " vector_field " , " vector_field " )
if search_type == " approximate_search " :
@ -473,20 +524,59 @@ class OpenSearchVectorSearch(VectorStore):
raise ValueError ( " Invalid `search_type` provided as an argument " )
response = self . client . search ( index = self . index_name , body = search_query )
hits = [ hit for hit in response [ " hits " ] [ " hits " ] [ : k ] ]
documents_with_scores = [
(
Document (
page_content = hit [ " _source " ] [ text_field ] ,
metadata = hit [ " _source " ]
if metadata_field == " * " or metadata_field not in hit [ " _source " ]
else hit [ " _source " ] [ metadata_field ] ,
) ,
hit [ " _score " ] ,
return [ hit for hit in response [ " hits " ] [ " hits " ] [ : k ] ]
def max_marginal_relevance_search (
self ,
query : str ,
k : int = 4 ,
fetch_k : int = 20 ,
lambda_mult : float = 0.5 ,
* * kwargs : Any ,
) - > list [ Document ] :
""" Return docs selected using the maximal marginal relevance.
Maximal marginal relevance optimizes for similarity to query AND diversity
among selected documents .
Args :
query : Text to look up documents similar to .
k : Number of Documents to return . Defaults to 4.
fetch_k : Number of Documents to fetch to pass to MMR algorithm .
Defaults to 20.
lambda_mult : Number between 0 and 1 that determines the degree
of diversity among the results with 0 corresponding
to maximum diversity and 1 to minimum diversity .
Defaults to 0.5 .
Returns :
List of Documents selected by maximal marginal relevance .
"""
vector_field = _get_kwargs_value ( kwargs , " vector_field " , " vector_field " )
text_field = _get_kwargs_value ( kwargs , " text_field " , " text " )
metadata_field = _get_kwargs_value ( kwargs , " metadata_field " , " metadata " )
# Get embedding of the user query
embedding = self . embedding_function . embed_query ( query )
# Do ANN/KNN search to get top fetch_k results where fetch_k >= k
results = self . _raw_similarity_search_with_score ( query , fetch_k , * * kwargs )
embeddings = [ result [ " _source " ] [ vector_field ] for result in results ]
# Rerank top k results using MMR, (mmr_selected is a list of indices)
mmr_selected = maximal_marginal_relevance (
np . array ( embedding ) , embeddings , k = k , lambda_mult = lambda_mult
)
return [
Document (
page_content = results [ i ] [ " _source " ] [ text_field ] ,
metadata = results [ i ] [ " _source " ] [ metadata_field ] ,
)
for hit in hits
for i in mmr_selected
]
return documents_with_scores
@classmethod
def from_texts (