ElasticsearchStore: improve error logging for adding documents (#9648)

It is not obvious what the error is when you cannot index. This PR adds the
ability to log the first error's reason, to help the user diagnose the
issue.

Also added some more documentation for when you want to use the
vectorstore with an embedding model deployed in Elasticsearch.

Credit: @elastic and @phoey1
This commit is contained in:
Joseph McElroy 2023-08-23 15:04:09 +01:00 committed by GitHub
parent b379c5f9c8
commit 2a06e7b216
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 124 additions and 39 deletions

View File

@ -14,6 +14,16 @@
"This notebook shows how to use functionality related to the `Elasticsearch` database."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e5bbffe2",
"metadata": {},
"outputs": [],
"source": [
"!pip install elasticsearch openai tiktoken langchain"
]
},
{
"cell_type": "markdown",
"id": "b66c12b2-2a07-4136-ac77-ce1c9fa7a409",
@ -119,33 +129,6 @@
"```"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "d6197931-cbe5-460c-a5e6-b5eedb83887c",
"metadata": {
"id": "d6197931-cbe5-460c-a5e6-b5eedb83887c",
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Requirement already satisfied: elasticsearch in /Users/joe/Library/Caches/pypoetry/virtualenvs/langchain-monorepo-ln7dNLl5-py3.10/lib/python3.10/site-packages (8.9.0)\n",
"Requirement already satisfied: elastic-transport<9,>=8 in /Users/joe/Library/Caches/pypoetry/virtualenvs/langchain-monorepo-ln7dNLl5-py3.10/lib/python3.10/site-packages (from elasticsearch) (8.4.0)\n",
"Requirement already satisfied: urllib3<2,>=1.26.2 in /Users/joe/Library/Caches/pypoetry/virtualenvs/langchain-monorepo-ln7dNLl5-py3.10/lib/python3.10/site-packages (from elastic-transport<9,>=8->elasticsearch) (1.26.16)\n",
"Requirement already satisfied: certifi in /Users/joe/Library/Caches/pypoetry/virtualenvs/langchain-monorepo-ln7dNLl5-py3.10/lib/python3.10/site-packages (from elastic-transport<9,>=8->elasticsearch) (2023.7.22)\n",
"\n",
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m A new release of pip is available: \u001b[0m\u001b[31;49m23.2\u001b[0m\u001b[39;49m -> \u001b[0m\u001b[32;49m23.2.1\u001b[0m\n",
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m To update, run: \u001b[0m\u001b[32;49mpip install --upgrade pip\u001b[0m\n"
]
}
],
"source": [
"!pip install elasticsearch"
]
},
{
"cell_type": "markdown",
"id": "ea167a29",
@ -528,24 +511,89 @@
"\n",
"To use this, specify the model_id in `ElasticsearchStore` `ApproxRetrievalStrategy` constructor via the `query_model_id` argument.\n",
"\n",
"**NOTE** This requires the model to be deployed and running in Elasticsearch ml node. \n",
"\n",
"```python\n",
"**NOTE** This requires the model to be deployed and running in Elasticsearch ml node. See [notebook example](https://github.com/elastic/elasticsearch-labs/blob/main/notebooks/integrations/hugging-face/loading-model-from-hugging-face.ipynb) on how to deploy the model with eland.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0a0c85e7",
"metadata": {},
"outputs": [],
"source": [
"APPROX_SELF_DEPLOYED_INDEX_NAME = \"test-approx-self-deployed\"\n",
"\n",
"# Note: This does not have an embedding function specified\n",
"# Instead, we will use the embedding model deployed in Elasticsearch\n",
"db = ElasticsearchStore( \n",
" es_url=\"http://localhost:9200\", \n",
" index_name=\"test\",\n",
" es_cloud_id=\"<your cloud id>\",\n",
" es_user=\"elastic\",\n",
" es_password=\"<your password>\", \n",
" index_name=APPROX_SELF_DEPLOYED_INDEX_NAME,\n",
" query_field=\"text_field\",\n",
" vector_query_field=\"vector_query_field.predicted_value\",\n",
" strategy=ElasticsearchStore.ApproxRetrievalStrategy(\n",
" query_model_id=\"sentence-transformers__all-minilm-l6-v2\"\n",
" )\n",
")\n",
"\n",
"# Perform search\n",
"db.similarity_search(\"hello world\", k=10)\n",
"```\n",
"# Setup a Ingest Pipeline to perform the embedding\n",
"# of the text field\n",
"db.client.ingest.put_pipeline(\n",
" id=\"test_pipeline\",\n",
" processors=[\n",
" {\n",
" \"inference\": {\n",
" \"model_id\": \"sentence-transformers__all-minilm-l6-v2\",\n",
" \"field_map\": {\"query_field\": \"text_field\"},\n",
" \"target_field\": \"vector_query_field\",\n",
" }\n",
" }\n",
" ],\n",
")\n",
"\n",
"# creating a new index with the pipeline,\n",
"# not relying on langchain to create the index\n",
"db.client.indices.create(\n",
" index=APPROX_SELF_DEPLOYED_INDEX_NAME,\n",
" mappings={\n",
" \"properties\": {\n",
" \"text_field\": {\"type\": \"text\"},\n",
" \"vector_query_field\": {\n",
" \"properties\": {\n",
" \"predicted_value\": {\n",
" \"type\": \"dense_vector\",\n",
" \"dims\": 384,\n",
" \"index\": True,\n",
" \"similarity\": \"l2_norm\",\n",
" }\n",
" }\n",
" },\n",
" }\n",
" },\n",
" settings={\"index\": {\"default_pipeline\": \"test_pipeline\"}},\n",
")\n",
"\n",
"db.from_texts([\"hello world\"], \n",
" es_cloud_id=\"<cloud id>\",\n",
" es_user=\"elastic\",\n",
" es_password=\"<cloud password>\", \n",
" index_name=APPROX_SELF_DEPLOYED_INDEX_NAME,\n",
" query_field=\"text_field\",\n",
" vector_query_field=\"vector_query_field.predicted_value\",\n",
" strategy=ElasticsearchStore.ApproxRetrievalStrategy(\n",
" query_model_id=\"sentence-transformers__all-minilm-l6-v2\"\n",
" ))\n",
"\n",
"# Perform search\n",
"db.similarity_search(\"hello world\", k=10)"
]
},
{
"cell_type": "markdown",
"id": "53959de6",
"metadata": {},
"source": [
"## SparseVectorRetrievalStrategy (ELSER)\n",
"This strategy uses Elasticsearch's sparse vector retrieval to retrieve the top-k results. We only support our own \"ELSER\" embedding model for now.\n",
"\n",

View File

@ -710,7 +710,7 @@ class ElasticsearchStore(VectorStore):
after deleting documents. Defaults to True.
"""
try:
from elasticsearch.helpers import bulk
from elasticsearch.helpers import BulkIndexError, bulk
except ImportError:
raise ImportError(
"Could not import elasticsearch python package. "
@ -731,8 +731,10 @@ class ElasticsearchStore(VectorStore):
logger.debug(f"Deleted {len(body)} texts from index")
return True
except Exception as e:
except BulkIndexError as e:
logger.error(f"Error deleting texts: {e}")
firstError = e.errors[0].get("index", {}).get("error", {})
logger.error(f"First error reason: {firstError.get('reason')}")
raise e
else:
@ -801,7 +803,7 @@ class ElasticsearchStore(VectorStore):
List of ids from adding the texts into the vectorstore.
"""
try:
from elasticsearch.helpers import bulk
from elasticsearch.helpers import BulkIndexError, bulk
except ImportError:
raise ImportError(
"Could not import elasticsearch python package. "
@ -867,8 +869,10 @@ class ElasticsearchStore(VectorStore):
logger.debug(f"added texts {ids} to index")
return ids
except Exception as e:
except BulkIndexError as e:
logger.error(f"Error adding texts: {e}")
firstError = e.errors[0].get("index", {}).get("error", {})
logger.error(f"First error reason: {firstError.get('reason')}")
raise e
else:

View File

@ -5,6 +5,7 @@ import uuid
from typing import Generator, List, Union
import pytest
from elasticsearch.helpers import BulkIndexError
from langchain.docstore.document import Document
from langchain.vectorstores.elasticsearch import ElasticsearchStore
@ -480,6 +481,8 @@ class TestElasticsearch:
document={"text_field": text, "metadata": {}},
)
docsearch.client.indices.refresh(index=index_name)
def assert_query(query_body: dict, query: str) -> dict:
assert query_body == {
"knn": {
@ -574,3 +577,33 @@ class TestElasticsearch:
docsearch.delete([ids[3]])
output = docsearch.similarity_search("gni", k=10)
assert len(output) == 0
def test_elasticsearch_indexing_exception_error(
self,
elasticsearch_connection: dict,
index_name: str,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test bulk exception logging is giving better hints."""
docsearch = ElasticsearchStore(
embedding=ConsistentFakeEmbeddings(),
**elasticsearch_connection,
index_name=index_name,
)
docsearch.client.indices.create(
index=index_name,
mappings={"properties": {}},
settings={"index": {"default_pipeline": "not-existing-pipeline"}},
)
texts = ["foo"]
with pytest.raises(BulkIndexError):
docsearch.add_texts(texts)
error_reason = "pipeline with id [not-existing-pipeline] does not exist"
log_message = f"First error reason: {error_reason}"
assert log_message in caplog.text