Elasticsearch Store Improvements (#8636)

Todo:
- [x] Connection options (cloud, localhost url, es_connection) support
- [x] Logging support
- [x] Customisable field support
- [x] Distance Similarity support 
- [x] Metadata support
  - [x] Metadata Filter support 
- [x] Retrieval Strategies
  - [x] Approx
  - [x] Approx with Hybrid
  - [x] Exact
  - [x] Custom 
  - [x] ELSER (excluding hybrid as we are working on RRF support)
- [x] integration tests 
- [x] Documentation

👋 This is a contribution to improve the Elasticsearch integration with
LangChain. It's based loosely on the changes that are in master, but with
some notable changes:

## Package name & design improvements
The import name is now `ElasticsearchStore`, to aid discoverability of
the VectorStore.

```py
# Before
from langchain.vectorstores.elastic_vector_search import ElasticVectorSearch, ElasticKnnSearch

# Now
from langchain.vectorstores.elasticsearch import ElasticsearchStore
```
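
The store also consolidates the connection options from the checklist above: connect by URL, by Elastic Cloud credentials, or pass an existing `Elasticsearch` client via `es_connection`. A minimal sketch (the cloud id and password are placeholders):

```py
from elasticsearch import Elasticsearch
from langchain.vectorstores.elasticsearch import ElasticsearchStore

# Option 1: a local or self-managed cluster, by URL
store = ElasticsearchStore(
    index_name="sample-index",
    embedding=FakeEmbeddings(),
    es_url="http://localhost:9200",
)

# Option 2: Elastic Cloud, with basic auth
store = ElasticsearchStore(
    index_name="sample-index",
    embedding=FakeEmbeddings(),
    es_cloud_id="<cloud-id>",
    es_user="elastic",
    es_password="<password>",
)

# Option 3: bring your own pre-configured client
es = Elasticsearch(hosts="http://localhost:9200")
store = ElasticsearchStore(
    index_name="sample-index",
    embedding=FakeEmbeddings(),
    es_connection=es,
)
```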

## Retrieval Strategy support
Before, we had a number of classes depending on the strategy you wanted:
`ElasticKnnSearch` for approximate kNN, `ElasticVectorSearch` for exact /
brute-force search.

With `ElasticsearchStore` we have retrieval strategies:

### Approx Example
This is the default strategy. The vast majority of developers who use
Elasticsearch will infer the embeddings outside of Elasticsearch. It uses
the kNN functionality of `_search`.

```py
texts = ["foo", "bar", "baz"]
docsearch = ElasticsearchStore.from_texts(
    texts,
    FakeEmbeddings(),
    es_url="http://localhost:9200",
    index_name="sample-index"
)
output = docsearch.similarity_search("foo", k=1)
```
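
Metadata and metadata filters (also on the checklist above) work across strategies. A sketch based on the integration tests — filters are passed as standard Elasticsearch filter clauses against the `metadata` field:

```py
texts = ["foo", "bar", "baz"]
metadatas = [{"page": i} for i in range(len(texts))]
docsearch = ElasticsearchStore.from_texts(
    texts,
    FakeEmbeddings(),
    metadatas=metadatas,
    es_url="http://localhost:9200",
    index_name="sample-index"
)
# filter takes a list of Elasticsearch filter clauses
output = docsearch.similarity_search(
    "foo", k=1, filter=[{"term": {"metadata.page": 1}}]
)
```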

### Approx, with hybrid
For developers who want to search using both the embedding and a BM25
match on the text. It's simple to enable.

```py
texts = ["foo", "bar", "baz"]
docsearch = ElasticsearchStore.from_texts(
    texts,
    FakeEmbeddings(),
    es_url="http://localhost:9200",
    index_name="sample-index",
    strategy=ElasticsearchStore.ApproxRetrievalStrategy(hybrid=True)
)
output = docsearch.similarity_search("foo", k=1)
```
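
Under the hood, this issues a single `_search` request that combines the kNN clause with a BM25 `match` query and ranks the two result sets with reciprocal rank fusion (RRF). Roughly the request body that gets generated, as asserted in the integration tests:

```py
# Illustrative request body for a hybrid search; the query_vector is the
# embedding of the query text (FakeEmbeddings here, hence the dummy values).
query_body = {
    "knn": {
        "field": "vector",
        "filter": [],
        "k": 1,
        "num_candidates": 50,
        "query_vector": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0],
    },
    "query": {
        "bool": {
            "filter": [],
            "must": [{"match": {"text": {"query": "foo"}}}],
        }
    },
    "rank": {"rrf": {}},
}
```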

### Approx, with `query_model_id`
For developers who want to run inference within Elasticsearch, using a
model loaded on the ML node.

This relies on the developer setting up the pipeline and the index
themselves if they wish to embed the text in Elasticsearch. There is an
example of this in the tests, and a sketch of the required setup after
the code below.

```py
texts = ["foo", "bar", "baz"]
docsearch = ElasticsearchStore.from_texts(
    texts,
    FakeEmbeddings(),
    es_url="http://localhost:9200",
    index_name="sample-index",
    strategy=ElasticsearchStore.ApproxRetrievalStrategy(
        query_model_id="sentence-transformers__all-minilm-l6-v2"
    ),
)
output = docsearch.similarity_search("foo", k=1)
```
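
A sketch of that setup, adapted from the integration test. In the test, the pipeline and index are created by hand before any documents are added (the pipeline id here is illustrative):

```py
# Ingest pipeline that embeds the text field with the deployed model at
# index time.
docsearch.client.ingest.put_pipeline(
    id="embedding_pipeline",  # illustrative id
    processors=[
        {
            "inference": {
                "model_id": "sentence-transformers__all-minilm-l6-v2",
                "field_map": {"query_field": "text_field"},
                "target_field": "vector_query_field",
            }
        }
    ],
)

# Index that stores the model output as a dense_vector and runs the
# pipeline on every document by default.
docsearch.client.indices.create(
    index="sample-index",
    mappings={
        "properties": {
            "text_field": {"type": "text"},
            "vector_query_field": {
                "properties": {
                    "predicted_value": {
                        "type": "dense_vector",
                        "dims": 384,
                        "index": True,
                        "similarity": "l2_norm",
                    }
                }
            },
        }
    },
    settings={"index": {"default_pipeline": "embedding_pipeline"}},
)
```

Note that the field names have to be passed to `ElasticsearchStore` as well (`query_field="text_field"`, `vector_query_field="vector_query_field.predicted_value"` in the test).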

### I want to provide my own custom Elasticsearch Query
You might want more control over the query, to perform multi-phase
retrieval such as LTR, or to linearly boost on document parameters like
how recently a document was updated or its geo-distance. You can do this
with the `custom_query` argument, which takes a function that receives
the generated query body and the query string and returns the body to
send to Elasticsearch:

```py
def my_custom_query(query_body: dict, query: str) -> dict:
    return {"query": {"match": {"text": {"query": "bar"}}}}

texts = ["foo", "bar", "baz"]
docsearch = ElasticsearchStore.from_texts(
    texts,
    FakeEmbeddings(),
    es_url="http://localhost:9200",
    index_name="sample-index",
)
docsearch.similarity_search("foo", k=1, custom_query=my_custom_query)
```

### Exact Example
For developers who have a small dataset in Elasticsearch and don't want
the cost of indexing the vectors, trading that off for higher cost at
query time. It uses `script_score`.

```py
texts = ["foo", "bar", "baz"]
docsearch = ElasticsearchStore.from_texts(
    texts,
    FakeEmbeddings(),
    es_url="http://localhost:9200",
    index_name="sample-index",
    strategy=ElasticsearchStore.ExactRetrievalStrategy(),
)
output = docsearch.similarity_search("foo", k=1)
```
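
Distance similarity (also on the checklist above) is customisable. From the integration tests, switching the default cosine similarity to dot product:

```py
texts = ["foo", "bar", "baz"]
docsearch = ElasticsearchStore.from_texts(
    texts,
    FakeEmbeddings(),
    es_url="http://localhost:9200",
    index_name="sample-index",
    strategy=ElasticsearchStore.ExactRetrievalStrategy(),
    distance_strategy="DOT_PRODUCT",
)
output = docsearch.similarity_search("foo", k=1)
```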

### ELSER Example
Elastic provides its own sparse vector model called ELSER. With these
changes, it's really easy to use. The vector store creates a pipeline and
an index that are set up for ELSER. All the developer needs to do is
configure, ingest and query via LangChain tooling.

```py
texts = ["foo", "bar", "baz"]
docsearch = ElasticsearchStore.from_texts(
    texts,
    FakeEmbeddings(),
    es_url="http://localhost:9200",
    index_name="sample-index",
    strategy=ElasticsearchStore.SparseVectorRetrievalStrategy(),
)
output = docsearch.similarity_search("foo", k=1)
```
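
The pipeline the store creates can be inspected with the regular client. A hedged sketch — judging by the integration tests' cleanup pattern (`*_sparse_embedding`), the pipeline id is assumed to be derived from the index name:

```py
# Assumed pipeline id, inferred from the test suite's cleanup pattern;
# inspect what the store set up for ELSER.
pipeline = docsearch.client.ingest.get_pipeline(id="sample-index_sparse_embedding")
print(pipeline)
```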

## Architecture
In future, we can introduce new strategies without breaking backwards
compatibility as we evolve the index / query strategy.
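
To illustrate the shape this takes, a minimal sketch of the strategy pattern, assuming each strategy owns query construction and index setup. The class and method names below are hypothetical, for illustration only:

```py
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional


class RetrievalStrategy(ABC):  # hypothetical name, for illustration
    """Encapsulates how the index is created and how queries are built."""

    @abstractmethod
    def query(
        self,
        query: Optional[str],
        query_vector: Optional[List[float]],
        k: int,
        filter: List[dict],
    ) -> Dict[str, Any]:
        """Return the Elasticsearch request body for a search."""

    @abstractmethod
    def index_mappings(self, dims: Optional[int]) -> Dict[str, Any]:
        """Return the mappings used when the store creates the index."""
```

New strategies then become new subclasses, without touching the store's public API.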

## Credit
On release, could you credit @elastic and @phoey1 please? Thank you!

---------

Co-authored-by: Bagatur <baskaryan@gmail.com>