diff --git a/docs/docs/modules/data_connection/indexing.ipynb b/docs/docs/modules/data_connection/indexing.ipynb
index 1260c1b8d0..54735350b8 100644
--- a/docs/docs/modules/data_connection/indexing.ipynb
+++ b/docs/docs/modules/data_connection/indexing.ipynb
@@ -60,7 +60,7 @@
 " * document addition by id (`add_documents` method with `ids` argument)\n",
 " * delete by id (`delete` method with `ids` argument)\n",
 "\n",
-"Compatible Vectorstores: `AnalyticDB`, `AstraDB`, `AwaDB`, `Bagel`, `Cassandra`, `Chroma`, `CouchbaseVectorStore`, `DashVector`, `DatabricksVectorSearch`, `DeepLake`, `Dingo`, `ElasticVectorSearch`, `ElasticsearchStore`, `FAISS`, `HanaDB`, `Milvus`, `MyScale`, `PGVector`, `Pinecone`, `Qdrant`, `Redis`, `Rockset`, `ScaNN`, `SupabaseVectorStore`, `SurrealDBStore`, `TimescaleVector`, `Vald`, `VDMS`, `Vearch`, `VespaStore`, `Weaviate`, `ZepVectorStore`.\n",
+"Compatible Vectorstores: `AnalyticDB`, `AstraDB`, `AwaDB`, `Bagel`, `Cassandra`, `Chroma`, `CouchbaseVectorStore`, `DashVector`, `DatabricksVectorSearch`, `DeepLake`, `Dingo`, `ElasticVectorSearch`, `ElasticsearchStore`, `FAISS`, `HanaDB`, `Milvus`, `MyScale`, `OpenSearchVectorSearch`, `PGVector`, `Pinecone`, `Qdrant`, `Redis`, `Rockset`, `ScaNN`, `SupabaseVectorStore`, `SurrealDBStore`, `TimescaleVector`, `Vald`, `VDMS`, `Vearch`, `VespaStore`, `Weaviate`, `ZepVectorStore`.\n",
 " \n",
 "## Caution\n",
 "\n",
diff --git a/libs/community/langchain_community/vectorstores/opensearch_vector_search.py b/libs/community/langchain_community/vectorstores/opensearch_vector_search.py
index 7ac53600d0..23d51c2a7a 100644
--- a/libs/community/langchain_community/vectorstores/opensearch_vector_search.py
+++ b/libs/community/langchain_community/vectorstores/opensearch_vector_search.py
@@ -15,6 +15,10 @@ from langchain_community.vectorstores.utils import maximal_marginal_relevance
 IMPORT_OPENSEARCH_PY_ERROR = (
     "Could not import OpenSearch. Please install it with `pip install opensearch-py`."
 )
+IMPORT_ASYNC_OPENSEARCH_PY_ERROR = """
+Could not import AsyncOpenSearch.
+Please install it with `pip install opensearch-py`."""
+
 SCRIPT_SCORING_SEARCH = "script_scoring"
 PAINLESS_SCRIPTING_SEARCH = "painless_scripting"
 MATCH_ALL_QUERY = {"match_all": {}}  # type: Dict
@@ -29,6 +33,15 @@ def _import_opensearch() -> Any:
     return OpenSearch
 
 
+def _import_async_opensearch() -> Any:
+    """Import AsyncOpenSearch if available, otherwise raise error."""
+    try:
+        from opensearchpy import AsyncOpenSearch
+    except ImportError:
+        raise ImportError(IMPORT_ASYNC_OPENSEARCH_PY_ERROR)
+    return AsyncOpenSearch
+
+
 def _import_bulk() -> Any:
     """Import bulk if available, otherwise raise error."""
     try:
@@ -38,6 +51,15 @@ def _import_bulk() -> Any:
     return bulk
 
 
+def _import_async_bulk() -> Any:
+    """Import async_bulk if available, otherwise raise error."""
+    try:
+        from opensearchpy.helpers import async_bulk
+    except ImportError:
+        raise ImportError(IMPORT_ASYNC_OPENSEARCH_PY_ERROR)
+    return async_bulk
+
+
 def _import_not_found_error() -> Any:
     """Import not found error if available, otherwise raise error."""
     try:
@@ -60,6 +82,19 @@ def _get_opensearch_client(opensearch_url: str, **kwargs: Any) -> Any:
     return client
 
 
+def _get_async_opensearch_client(opensearch_url: str, **kwargs: Any) -> Any:
+    """Get AsyncOpenSearch client from the opensearch_url, otherwise raise error."""
+    try:
+        async_opensearch = _import_async_opensearch()
+        client = async_opensearch(opensearch_url, **kwargs)
+    except ValueError as e:
+        raise ImportError(
+            f"AsyncOpenSearch client string provided is not in proper format. "
+            f"Got error: {e} "
+        )
+    return client
+
+
 def _validate_embeddings_and_bulk_size(embeddings_length: int, bulk_size: int) -> None:
     """Validate Embeddings Length and Bulk Size."""
     if embeddings_length == 0:
@@ -141,6 +176,57 @@ def _bulk_ingest_embeddings(
     return return_ids
 
 
+async def _abulk_ingest_embeddings(
+    client: Any,
+    index_name: str,
+    embeddings: List[List[float]],
+    texts: Iterable[str],
+    metadatas: Optional[List[dict]] = None,
+    ids: Optional[List[str]] = None,
+    vector_field: str = "vector_field",
+    text_field: str = "text",
+    mapping: Optional[Dict] = None,
+    max_chunk_bytes: Optional[int] = 1 * 1024 * 1024,
+    is_aoss: bool = False,
+) -> List[str]:
+    """Bulk Ingest Embeddings into given index asynchronously using AsyncOpenSearch."""
+    if not mapping:
+        mapping = dict()
+
+    async_bulk = _import_async_bulk()
+    not_found_error = _import_not_found_error()
+    requests = []
+    return_ids = []
+
+    try:
+        await client.indices.get(index=index_name)
+    except not_found_error:
+        await client.indices.create(index=index_name, body=mapping)
+
+    for i, text in enumerate(texts):
+        metadata = metadatas[i] if metadatas else {}
+        _id = ids[i] if ids else str(uuid.uuid4())
+        request = {
+            "_op_type": "index",
+            "_index": index_name,
+            vector_field: embeddings[i],
+            text_field: text,
+            "metadata": metadata,
+        }
+        if is_aoss:
+            request["id"] = _id
+        else:
+            request["_id"] = _id
+        requests.append(request)
+        return_ids.append(_id)
+
+    await async_bulk(client, requests, max_chunk_bytes=max_chunk_bytes)
+    if not is_aoss:
+        await client.indices.refresh(index=index_name)
+
+    return return_ids
+
+
 def _default_scripting_text_mapping(
     dim: int,
     vector_field: str = "vector_field",
@@ -334,6 +420,7 @@ class OpenSearchVectorSearch(VectorStore):
         http_auth = kwargs.get("http_auth")
         self.is_aoss = _is_aoss_enabled(http_auth=http_auth)
         self.client = _get_opensearch_client(opensearch_url, **kwargs)
+        self.async_client = _get_async_opensearch_client(opensearch_url, **kwargs)
         self.engine = kwargs.get("engine")
 
     @property
@@ -381,6 +468,47 @@
             is_aoss=self.is_aoss,
         )
 
+    async def __aadd(
+        self,
+        texts: Iterable[str],
+        embeddings: List[List[float]],
+        metadatas: Optional[List[dict]] = None,
+        ids: Optional[List[str]] = None,
+        bulk_size: int = 500,
+        **kwargs: Any,
+    ) -> List[str]:
+        _validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
+        index_name = kwargs.get("index_name", self.index_name)
+        text_field = kwargs.get("text_field", "text")
+        dim = len(embeddings[0])
+        engine = kwargs.get("engine", "nmslib")
+        space_type = kwargs.get("space_type", "l2")
+        ef_search = kwargs.get("ef_search", 512)
+        ef_construction = kwargs.get("ef_construction", 512)
+        m = kwargs.get("m", 16)
+        vector_field = kwargs.get("vector_field", "vector_field")
+        max_chunk_bytes = kwargs.get("max_chunk_bytes", 1 * 1024 * 1024)
+
+        _validate_aoss_with_engines(self.is_aoss, engine)
+
+        mapping = _default_text_mapping(
+            dim, engine, space_type, ef_search, ef_construction, m, vector_field
+        )
+
+        return await _abulk_ingest_embeddings(
+            self.async_client,
+            index_name,
+            embeddings,
+            texts,
+            metadatas=metadatas,
+            ids=ids,
+            vector_field=vector_field,
+            text_field=text_field,
+            mapping=mapping,
+            max_chunk_bytes=max_chunk_bytes,
+            is_aoss=self.is_aoss,
+        )
+
     def add_texts(
         self,
         texts: Iterable[str],
@@ -417,6 +545,28 @@
             **kwargs,
         )
 
+    async def aadd_texts(
+        self,
+        texts: Iterable[str],
+        metadatas: Optional[List[dict]] = None,
+        ids: Optional[List[str]] = None,
+        bulk_size: int = 500,
+        **kwargs: Any,
+    ) -> List[str]:
+        """
+        Asynchronously run more texts through the embeddings
+        and add to the vectorstore.
+        """
+        embeddings = await self.embedding_function.aembed_documents(list(texts))
+        return await self.__aadd(
+            texts,
+            embeddings,
+            metadatas=metadatas,
+            ids=ids,
+            bulk_size=bulk_size,
+            **kwargs,
+        )
+
     def add_embeddings(
         self,
         text_embeddings: Iterable[Tuple[str, List[float]]],
@@ -486,6 +636,28 @@
         else:
             return False
 
+    async def adelete(
+        self, ids: Optional[List[str]] = None, **kwargs: Any
+    ) -> Optional[bool]:
+        """Asynchronously delete by vector ID or other criteria.
+
+        Args:
+            ids: List of ids to delete.
+            **kwargs: Other keyword arguments that subclasses might use.
+
+        Returns:
+            Optional[bool]: True if deletion is successful,
+            False otherwise, None if not implemented.
+        """
+        if ids is None:
+            raise ValueError("No ids provided to delete.")
+
+        actions = [{"delete": {"_index": self.index_name, "_id": id_}} for id_ in ids]
+        response = await self.async_client.bulk(body=actions, **kwargs)
+        return not any(
+            item.get("delete", {}).get("error") for item in response["items"]
+        )
+
     def similarity_search(
         self, query: str, k: int = 4, **kwargs: Any
     ) -> List[Document]:
diff --git a/libs/community/tests/unit_tests/vectorstores/test_indexing_docs.py b/libs/community/tests/unit_tests/vectorstores/test_indexing_docs.py
index 36b67ae074..33fbf42bc1 100644
--- a/libs/community/tests/unit_tests/vectorstores/test_indexing_docs.py
+++ b/libs/community/tests/unit_tests/vectorstores/test_indexing_docs.py
@@ -91,5 +91,6 @@ def test_compatible_vectorstore_documentation() -> None:
         "ZepVectorStore",
         "Zilliz",
         "Lantern",
+        "OpenSearchVectorSearch",
     }
     assert compatible == documented
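A minimal usage sketch of the async surface this diff adds, for reviewers. The endpoint, index name, texts, and `FakeEmbeddings` model below are placeholders, not part of the change; `aadd_texts` and `adelete` are the new methods from the diff, and `FakeEmbeddings` stands in for any real embedding model.

```python
import asyncio

from langchain_community.embeddings import FakeEmbeddings
from langchain_community.vectorstores import OpenSearchVectorSearch


async def main() -> None:
    # Placeholder URL and index name. The patched constructor now also
    # builds self.async_client, so one instance serves sync and async calls.
    store = OpenSearchVectorSearch(
        opensearch_url="http://localhost:9200",
        index_name="demo-index",
        embedding_function=FakeEmbeddings(size=8),
    )

    # aadd_texts embeds via aembed_documents, then bulk-ingests through
    # opensearchpy's async_bulk helper (see _abulk_ingest_embeddings above).
    ids = await store.aadd_texts(
        texts=["foo", "bar"],
        metadatas=[{"source": "a"}, {"source": "b"}],
    )

    # adelete issues a single bulk request of delete actions and returns
    # True only when no item in the response reports an error.
    deleted = await store.adelete(ids=ids)
    print(ids, deleted)


asyncio.run(main())
```

One design point worth noting in review: `__init__` constructs the async client unconditionally, so purely synchronous callers also instantiate an `AsyncOpenSearch` client they may never use.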