community[minor]: Implement Async OpenSearch `afrom_texts` & `afrom_embeddings` (#20009)

- **Description:** Adds async variants of afrom_texts and
afrom_embeddings into `OpenSearchVectorSearch`, which allows for
`afrom_documents` to be called.
- **Issue:** I implemented this because my use case involves an async
scraper generating documents as and when they're ready to be ingested by
Embedding/OpenSearch
- **Dependencies:** None that I'm aware

Co-authored-by: Ben Mitchell <b.mitchell@reply.com>
pull/20011/head
Ben Mitchell 3 months ago committed by GitHub
parent 02152d3909
commit b52b78478f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1022,6 +1022,71 @@ class OpenSearchVectorSearch(VectorStore):
**kwargs,
)
@classmethod
async def afrom_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
bulk_size: int = 500,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> OpenSearchVectorSearch:
"""Asynchronously construct OpenSearchVectorSearch wrapper from raw texts.
Example:
.. code-block:: python
from langchain_community.vectorstores import OpenSearchVectorSearch
from langchain_community.embeddings import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
opensearch_vector_search = await OpenSearchVectorSearch.afrom_texts(
texts,
embeddings,
opensearch_url="http://localhost:9200"
)
OpenSearch by default supports Approximate Search powered by nmslib, faiss
and lucene engines recommended for large datasets. Also supports brute force
search through Script Scoring and Painless Scripting.
Optional Args:
vector_field: Document field embeddings are stored in. Defaults to
"vector_field".
text_field: Document field the text of the document is stored in. Defaults
to "text".
Optional Keyword Args for Approximate Search:
engine: "nmslib", "faiss", "lucene"; default: "nmslib"
space_type: "l2", "l1", "cosinesimil", "linf", "innerproduct"; default: "l2"
ef_search: Size of the dynamic list used during k-NN searches. Higher values
lead to more accurate but slower searches; default: 512
ef_construction: Size of the dynamic list used during k-NN graph creation.
Higher values lead to more accurate graph but slower indexing speed;
default: 512
m: Number of bidirectional links created for each new element. Large impact
on memory consumption. Between 2 and 100; default: 16
Keyword Args for Script Scoring or Painless Scripting:
is_appx_search: False
"""
embeddings = await embedding.aembed_documents(texts)
return await cls.afrom_embeddings(
embeddings,
texts,
embedding,
metadatas=metadatas,
bulk_size=bulk_size,
ids=ids,
**kwargs,
)
@classmethod
def from_embeddings(
cls,
@ -1151,3 +1216,135 @@ class OpenSearchVectorSearch(VectorStore):
)
kwargs["engine"] = engine
return cls(opensearch_url, index_name, embedding, **kwargs)
@classmethod
async def afrom_embeddings(
cls,
embeddings: List[List[float]],
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
bulk_size: int = 500,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> OpenSearchVectorSearch:
"""Asynchronously construct OpenSearchVectorSearch wrapper from pre-vectorized
embeddings.
Example:
.. code-block:: python
from langchain_community.vectorstores import OpenSearchVectorSearch
from langchain_community.embeddings import OpenAIEmbeddings
embedder = OpenAIEmbeddings()
embeddings = await embedder.aembed_documents(["foo", "bar"])
opensearch_vector_search =
await OpenSearchVectorSearch.afrom_embeddings(
embeddings,
texts,
embedder,
opensearch_url="http://localhost:9200"
)
OpenSearch by default supports Approximate Search powered by nmslib, faiss
and lucene engines recommended for large datasets. Also supports brute force
search through Script Scoring and Painless Scripting.
Optional Args:
vector_field: Document field embeddings are stored in. Defaults to
"vector_field".
text_field: Document field the text of the document is stored in. Defaults
to "text".
Optional Keyword Args for Approximate Search:
engine: "nmslib", "faiss", "lucene"; default: "nmslib"
space_type: "l2", "l1", "cosinesimil", "linf", "innerproduct"; default: "l2"
ef_search: Size of the dynamic list used during k-NN searches. Higher values
lead to more accurate but slower searches; default: 512
ef_construction: Size of the dynamic list used during k-NN graph creation.
Higher values lead to more accurate graph but slower indexing speed;
default: 512
m: Number of bidirectional links created for each new element. Large impact
on memory consumption. Between 2 and 100; default: 16
Keyword Args for Script Scoring or Painless Scripting:
is_appx_search: False
"""
opensearch_url = get_from_dict_or_env(
kwargs, "opensearch_url", "OPENSEARCH_URL"
)
# List of arguments that needs to be removed from kwargs
# before passing kwargs to get opensearch client
keys_list = [
"opensearch_url",
"index_name",
"is_appx_search",
"vector_field",
"text_field",
"engine",
"space_type",
"ef_search",
"ef_construction",
"m",
"max_chunk_bytes",
"is_aoss",
]
_validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
dim = len(embeddings[0])
# Get the index name from either from kwargs or ENV Variable
# before falling back to random generation
index_name = get_from_dict_or_env(
kwargs, "index_name", "OPENSEARCH_INDEX_NAME", default=uuid.uuid4().hex
)
is_appx_search = kwargs.get("is_appx_search", True)
vector_field = kwargs.get("vector_field", "vector_field")
text_field = kwargs.get("text_field", "text")
max_chunk_bytes = kwargs.get("max_chunk_bytes", 1 * 1024 * 1024)
http_auth = kwargs.get("http_auth")
is_aoss = _is_aoss_enabled(http_auth=http_auth)
engine = None
if is_aoss and not is_appx_search:
raise ValueError(
"Amazon OpenSearch Service Serverless only "
"supports `approximate_search`"
)
if is_appx_search:
engine = kwargs.get("engine", "nmslib")
space_type = kwargs.get("space_type", "l2")
ef_search = kwargs.get("ef_search", 512)
ef_construction = kwargs.get("ef_construction", 512)
m = kwargs.get("m", 16)
_validate_aoss_with_engines(is_aoss, engine)
mapping = _default_text_mapping(
dim, engine, space_type, ef_search, ef_construction, m, vector_field
)
else:
mapping = _default_scripting_text_mapping(dim)
[kwargs.pop(key, None) for key in keys_list]
client = _get_async_opensearch_client(opensearch_url, **kwargs)
await _abulk_ingest_embeddings(
client,
index_name,
embeddings,
texts,
ids=ids,
metadatas=metadatas,
vector_field=vector_field,
text_field=text_field,
mapping=mapping,
max_chunk_bytes=max_chunk_bytes,
is_aoss=is_aoss,
)
kwargs["engine"] = engine
return cls(opensearch_url, index_name, embedding, **kwargs)

Loading…
Cancel
Save