From 175ef0a55d2fc04f6fd02089d40a35a2216237fa Mon Sep 17 00:00:00 2001 From: Joseph McElroy Date: Tue, 26 Sep 2023 20:53:50 +0100 Subject: [PATCH] [ElasticsearchStore] Enable custom Bulk Args (#11065) This enables bulk args like `chunk_size` to be passed down from the ingest methods (from_texts, from_documents) to the Elasticsearch bulk API. This helps alleviate issues where bulk importing a large amount of documents into Elasticsearch was resulting in a timeout. Contribution Shoutout - @elastic - [x] Updated Integration tests --------- Co-authored-by: Bagatur --- .../integrations/providers/elasticsearch.mdx | 2 +- .../vectorstores/elasticsearch.ipynb | 33 +++++- .../langchain/vectorstores/elasticsearch.py | 39 +++++-- .../docker-compose/elasticsearch.yml | 1 + .../vectorstores/test_elasticsearch.py | 102 ++++++++++++++++-- 5 files changed, 161 insertions(+), 16 deletions(-) diff --git a/docs/extras/integrations/providers/elasticsearch.mdx b/docs/extras/integrations/providers/elasticsearch.mdx index 93e93055db..e2e96779b5 100644 --- a/docs/extras/integrations/providers/elasticsearch.mdx +++ b/docs/extras/integrations/providers/elasticsearch.mdx @@ -18,7 +18,7 @@ Example: Run a single-node Elasticsearch instance with security disabled. This i #### Deploy Elasticsearch on Elastic Cloud -Elastic Cloud is a managed Elasticsearch service. Signup for a [free trial](https://cloud.elastic.co/registration?storm=langchain-notebook). +Elastic Cloud is a managed Elasticsearch service. Signup for a [free trial](https://cloud.elastic.co/registration?utm_source=langchain&utm_content=documentation). 
### Install Client diff --git a/docs/extras/integrations/vectorstores/elasticsearch.ipynb b/docs/extras/integrations/vectorstores/elasticsearch.ipynb index 173c2436fa..acc338c320 100644 --- a/docs/extras/integrations/vectorstores/elasticsearch.ipynb +++ b/docs/extras/integrations/vectorstores/elasticsearch.ipynb @@ -44,7 +44,7 @@ "source": [ "There are two main ways to setup an Elasticsearch instance for use with:\n", "\n", - "1. Elastic Cloud: Elastic Cloud is a managed Elasticsearch service. Signup for a [free trial](https://cloud.elastic.co/registration?storm=langchain-notebook).\n", + "1. Elastic Cloud: Elastic Cloud is a managed Elasticsearch service. Signup for a [free trial](https://cloud.elastic.co/registration?utm_source=langchain&utm_content=documentation).\n", "\n", "To connect to an Elasticsearch instance that does not require\n", "login credentials (starting the docker instance with security enabled), pass the Elasticsearch URL and index name along with the\n", @@ -662,7 +662,7 @@ "id": "0960fa0a", "metadata": {}, "source": [ - "# Customise the Query\n", + "## Customise the Query\n", "With `custom_query` parameter at search, you are able to adjust the query that is used to retrieve documents from Elasticsearch. This is useful if you want to want to use a more complex query, to support linear boosting of fields." ] }, @@ -720,6 +720,35 @@ "print(results[0])" ] }, + { + "cell_type": "markdown", + "id": "3242fd42", + "metadata": {}, + "source": [ + "# FAQ\n", + "\n", + "## Question: I'm getting timeout errors when indexing documents into Elasticsearch. How do I fix this?\n", + "One possible issue is your documents might take longer to index into Elasticsearch. 
ElasticsearchStore uses the Elasticsearch bulk API which has a few defaults that you can adjust to reduce the chance of timeout errors.\n", + "\n", + "This is also a good idea when you're using SparseVectorRetrievalStrategy.\n", + "\n", + "The defaults are:\n", + "- `chunk_size`: 500\n", + "- `max_chunk_bytes`: 100MB\n", + "\n", + "To adjust these, you can pass in the `chunk_size` and `max_chunk_bytes` parameters to the ElasticsearchStore `add_texts` method.\n", + "\n", + "```python\n", + " vector_store.add_texts(\n", + " texts,\n", + " bulk_kwargs={\n", + " \"chunk_size\": 50,\n", + " \"max_chunk_bytes\": 200000000\n", + " }\n", + " )\n", + "```" + ] + }, { "cell_type": "markdown", "id": "604c66ea", diff --git a/libs/langchain/langchain/vectorstores/elasticsearch.py b/libs/langchain/langchain/vectorstores/elasticsearch.py index 0066d5071d..f76db9ccab 100644 --- a/libs/langchain/langchain/vectorstores/elasticsearch.py +++ b/libs/langchain/langchain/vectorstores/elasticsearch.py @@ -506,7 +506,9 @@ class ElasticsearchStore(VectorStore): self.strategy = strategy if es_connection is not None: - self.client = es_connection + self.client = es_connection.options( + headers={"user-agent": self.get_user_agent()} + ) elif es_url is not None or es_cloud_id is not None: self.client = ElasticsearchStore.connect_to_elasticsearch( es_url=es_url, @@ -521,6 +523,12 @@ class ElasticsearchStore(VectorStore): or valid credentials for creating a new connection.""" ) + @staticmethod + def get_user_agent() -> str: + from langchain import __version__ + + return f"langchain-py-vs/{__version__}" + @staticmethod def connect_to_elasticsearch( *, @@ -557,7 +565,10 @@ class ElasticsearchStore(VectorStore): elif username and password: connection_params["basic_auth"] = (username, password) - es_client = elasticsearch.Elasticsearch(**connection_params) + es_client = elasticsearch.Elasticsearch( + **connection_params, + headers={"user-agent": ElasticsearchStore.get_user_agent()}, + ) try: 
es_client.info() except Exception as e: @@ -791,6 +802,7 @@ class ElasticsearchStore(VectorStore): ids: Optional[List[str]] = None, refresh_indices: bool = True, create_index_if_not_exists: bool = True, + bulk_kwargs: Optional[Dict] = None, **kwargs: Any, ) -> List[str]: """Run more texts through the embeddings and add to the vectorstore. @@ -803,6 +815,9 @@ class ElasticsearchStore(VectorStore): after adding the texts. create_index_if_not_exists: Whether to create the Elasticsearch index if it doesn't already exist. + *bulk_kwargs: Additional arguments to pass to Elasticsearch bulk. + - chunk_size: Optional. Number of texts to add to the + index at a time. Defaults to 500. Returns: List of ids from adding the texts into the vectorstore. @@ -814,7 +829,7 @@ class ElasticsearchStore(VectorStore): "Could not import elasticsearch python package. " "Please install it with `pip install elasticsearch`." ) - + bulk_kwargs = bulk_kwargs or {} embeddings = [] ids = ids or [str(uuid.uuid4()) for _ in texts] requests = [] @@ -866,7 +881,11 @@ class ElasticsearchStore(VectorStore): if len(requests) > 0: try: success, failed = bulk( - self.client, requests, stats_only=True, refresh=refresh_indices + self.client, + requests, + stats_only=True, + refresh=refresh_indices, + **bulk_kwargs, ) logger.debug( f"Added {success} and failed to add {failed} texts to index" @@ -890,6 +909,7 @@ class ElasticsearchStore(VectorStore): texts: List[str], embedding: Optional[Embeddings] = None, metadatas: Optional[List[Dict[str, Any]]] = None, + bulk_kwargs: Optional[Dict] = None, **kwargs: Any, ) -> "ElasticsearchStore": """Construct ElasticsearchStore wrapper from raw documents. @@ -927,6 +947,8 @@ class ElasticsearchStore(VectorStore): strategy to use. Defaults to "COSINE". can be one of "COSINE", "EUCLIDEAN_DISTANCE", "DOT_PRODUCT". + bulk_kwargs: Optional. Additional arguments to pass to + Elasticsearch bulk. 
""" elasticsearchStore = ElasticsearchStore._create_cls_from_kwargs( @@ -934,7 +956,9 @@ class ElasticsearchStore(VectorStore): ) # Encode the provided texts and add them to the newly created index. - elasticsearchStore.add_texts(texts, metadatas=metadatas) + elasticsearchStore.add_texts( + texts, metadatas=metadatas, bulk_kwargs=bulk_kwargs + ) return elasticsearchStore @@ -985,6 +1009,7 @@ class ElasticsearchStore(VectorStore): cls, documents: List[Document], embedding: Optional[Embeddings] = None, + bulk_kwargs: Optional[Dict] = None, **kwargs: Any, ) -> "ElasticsearchStore": """Construct ElasticsearchStore wrapper from documents. @@ -1018,13 +1043,15 @@ class ElasticsearchStore(VectorStore): vector_query_field: Optional. Name of the field to store the embedding vectors in. query_field: Optional. Name of the field to store the texts in. + bulk_kwargs: Optional. Additional arguments to pass to + Elasticsearch bulk. """ elasticsearchStore = ElasticsearchStore._create_cls_from_kwargs( embedding=embedding, **kwargs ) # Encode the provided texts and add them to the newly created index. - elasticsearchStore.add_documents(documents) + elasticsearchStore.add_documents(documents, bulk_kwargs=bulk_kwargs) return elasticsearchStore diff --git a/libs/langchain/tests/integration_tests/vectorstores/docker-compose/elasticsearch.yml b/libs/langchain/tests/integration_tests/vectorstores/docker-compose/elasticsearch.yml index 6d27d63a5d..989c900da0 100644 --- a/libs/langchain/tests/integration_tests/vectorstores/docker-compose/elasticsearch.yml +++ b/libs/langchain/tests/integration_tests/vectorstores/docker-compose/elasticsearch.yml @@ -7,6 +7,7 @@ services: - discovery.type=single-node - xpack.security.enabled=false # security has been disabled, so no login or password is required. 
- xpack.security.http.ssl.enabled=false + - xpack.license.self_generated.type=trial ports: - "9200:9200" healthcheck: diff --git a/libs/langchain/tests/integration_tests/vectorstores/test_elasticsearch.py b/libs/langchain/tests/integration_tests/vectorstores/test_elasticsearch.py index dafba4b1f7..64ce2dd77b 100644 --- a/libs/langchain/tests/integration_tests/vectorstores/test_elasticsearch.py +++ b/libs/langchain/tests/integration_tests/vectorstores/test_elasticsearch.py @@ -1,8 +1,9 @@ """Test ElasticSearch functionality.""" import logging import os +import re import uuid -from typing import Generator, List, Union +from typing import Any, Dict, Generator, List, Union import pytest @@ -58,19 +59,20 @@ class TestElasticsearch: es_password = os.environ.get("ES_PASSWORD", "changeme") if cloud_id: + es = Elasticsearch( + cloud_id=cloud_id, + basic_auth=(es_username, es_password), + ) yield { "es_cloud_id": cloud_id, "es_user": es_username, "es_password": es_password, } - es = Elasticsearch(cloud_id=cloud_id, basic_auth=(es_username, es_password)) else: # Running this integration test with local docker instance - yield { - "es_url": es_url, - } es = Elasticsearch(hosts=es_url) + yield {"es_url": es_url} # Clear all indexes index_names = es.indices.get(index="_all").keys() @@ -92,6 +94,37 @@ class TestElasticsearch: except Exception: pass + @pytest.fixture(scope="function") + def es_client(self) -> Any: + # Running this integration test with Elastic Cloud + # Required for in-stack inference testing (ELSER + model_id) + from elastic_transport import Transport + from elasticsearch import Elasticsearch + + class CustomTransport(Transport): + requests = [] + + def perform_request(self, *args, **kwargs): # type: ignore + self.requests.append(kwargs) + return super().perform_request(*args, **kwargs) + + es_url = os.environ.get("ES_URL", "http://localhost:9200") + cloud_id = os.environ.get("ES_CLOUD_ID") + es_username = os.environ.get("ES_USERNAME", "elastic") + es_password = 
os.environ.get("ES_PASSWORD", "changeme") + + if cloud_id: + es = Elasticsearch( + cloud_id=cloud_id, + basic_auth=(es_username, es_password), + transport_class=CustomTransport, + ) + return es + else: + # Running this integration test with local docker instance + es = Elasticsearch(hosts=es_url, transport_class=CustomTransport) + return es + @pytest.fixture(scope="function") def index_name(self) -> str: """Return the index name.""" @@ -115,7 +148,6 @@ class TestElasticsearch: return query_body texts = ["foo", "bar", "baz"] - print(elasticsearch_connection) docsearch = ElasticsearchStore.from_texts( texts, FakeEmbeddings(), @@ -131,7 +163,6 @@ class TestElasticsearch: ) -> None: """Test end to end construction and search without metadata.""" texts = ["foo", "bar", "baz"] - print(elasticsearch_connection) docsearch = ElasticsearchStore.from_texts( texts, FakeEmbeddings(), @@ -607,3 +638,60 @@ class TestElasticsearch: log_message = f"First error reason: {error_reason}" assert log_message in caplog.text + + def test_elasticsearch_with_user_agent( + self, es_client: Any, index_name: str + ) -> None: + """Test to make sure the user-agent is set correctly.""" + + texts = ["foo", "bob", "baz"] + ElasticsearchStore.from_texts( + texts, + FakeEmbeddings(), + es_connection=es_client, + index_name=index_name, + ) + + user_agent = es_client.transport.requests[0]["headers"]["User-Agent"] + pattern = r"^langchain-py-vs/\d+\.\d+\.\d+$" + match = re.match(pattern, user_agent) + + assert ( + match is not None + ), f"The string '{user_agent}' does not match the expected pattern." 
+ + def test_elasticsearch_with_internal_user_agent( + self, elasticsearch_connection: Dict, index_name: str + ) -> None: + """Test to make sure the user-agent is set correctly.""" + + texts = ["foo"] + store = ElasticsearchStore.from_texts( + texts, + FakeEmbeddings(), + **elasticsearch_connection, + index_name=index_name, + ) + + user_agent = store.client._headers["User-Agent"] + pattern = r"^langchain-py-vs/\d+\.\d+\.\d+$" + match = re.match(pattern, user_agent) + + assert ( + match is not None + ), f"The string '{user_agent}' does not match the expected pattern." + + def test_bulk_args(self, es_client: Any, index_name: str) -> None: + """Test to make sure bulk_kwargs are passed down to the bulk API.""" + + texts = ["foo", "bob", "baz"] + ElasticsearchStore.from_texts( + texts, + FakeEmbeddings(), + es_connection=es_client, + index_name=index_name, + bulk_kwargs={"chunk_size": 1}, + ) + + # 1 for index exist, 1 for index create, 3 for index docs + assert len(es_client.transport.requests) == 5 # type: ignore