VectorStore Infinispan: adding TLS and authentication

pull/21910/head
rigazilla 1 month ago
parent 242eeb537f
commit 601ab12ef3

@ -5,9 +5,10 @@ from __future__ import annotations
import json
import logging
import uuid
from typing import Any, Iterable, List, Optional, Tuple, Type, cast
from typing import Any, Iterable, List, Optional, Tuple, Type, Union, cast
import requests
import httpx
from httpx import DigestAuth
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
@ -121,7 +122,7 @@ repeated float %s = 1;
metadata_proto += "}\n"
return metadata_proto
def schema_create(self, proto: str) -> requests.Response:
def schema_create(self, proto: str) -> httpx.Response:
"""Deploy the schema for the vector db
Args:
proto(str): protobuf schema
@ -130,14 +131,14 @@ repeated float %s = 1;
"""
return self.ispn.schema_post(self._entity_name + ".proto", proto)
def schema_delete(self) -> requests.Response:
def schema_delete(self) -> httpx.Response:
"""Delete the schema for the vector db
Returns:
An http Response containing the result of the operation
"""
return self.ispn.schema_delete(self._entity_name + ".proto")
def cache_create(self, config: str = "") -> requests.Response:
def cache_create(self, config: str = "") -> httpx.Response:
"""Create the cache for the vector db
Args:
config(str): configuration of the cache.
@ -172,14 +173,14 @@ repeated float %s = 1;
)
return self.ispn.cache_post(self._cache_name, config)
def cache_delete(self) -> requests.Response:
def cache_delete(self) -> httpx.Response:
"""Delete the cache for the vector db
Returns:
An http Response containing the result of the operation
"""
return self.ispn.cache_delete(self._cache_name)
def cache_clear(self) -> requests.Response:
def cache_clear(self) -> httpx.Response:
"""Clear the cache for the vector db
Returns:
An http Response containing the result of the operation
@ -193,14 +194,14 @@ repeated float %s = 1;
"""
return self.ispn.cache_exists(self._cache_name)
def cache_index_clear(self) -> requests.Response:
def cache_index_clear(self) -> httpx.Response:
"""Clear the index for the vector db
Returns:
An http Response containing the result of the operation
"""
return self.ispn.index_clear(self._cache_name)
def cache_index_reindex(self) -> requests.Response:
def cache_index_reindex(self) -> httpx.Response:
"""Rebuild the for the vector db
Returns:
An http Response containing the result of the operation
@ -325,12 +326,16 @@ repeated float %s = 1;
def configure(self, metadata: dict, dimension: int) -> None:
schema = self.schema_builder(metadata, dimension)
output = self.schema_create(schema)
assert output.ok, "Unable to create schema. Already exists? "
assert (
output.status_code == httpx.codes.OK
), "Unable to create schema. Already exists? "
"Consider using clear_old=True"
assert json.loads(output.text)["error"] is None
if not self.cache_exists():
output = self.cache_create()
assert output.ok, "Unable to create cache. Already exists? "
assert (
output.status_code == httpx.codes.OK
), "Unable to create cache. Already exists? "
"Consider using clear_old=True"
# Ensure index is clean
self.cache_index_clear()
@ -384,6 +389,8 @@ class Infinispan:
def __init__(self, **kwargs: Any):
self._configuration = kwargs
self._schema = str(self._configuration.get("schema", "http"))
self._user = str(self._configuration.get("user"))
self._password = str(self._configuration.get("password"))
self._host = str(self._configuration.get("hosts", ["127.0.0.1:11222"])[0])
self._default_node = self._schema + "://" + self._host
self._cache_url = str(self._configuration.get("cache_url", "/rest/v2/caches"))
@ -391,10 +398,20 @@ class Infinispan:
self._use_post_for_query = str(
self._configuration.get("use_post_for_query", True)
)
if self._user and self._password:
if self._schema == "http":
auth: Union[Tuple[str, str], DigestAuth] = httpx.DigestAuth(
username=self._user, password=self._password
)
else:
auth = (self._user, self._password)
self._h2c = httpx.Client(
http2=True, http1=False, auth=auth, verify=self._configuration.get("verify")
)
def req_query(
self, query: str, cache_name: str, local: bool = False
) -> requests.Response:
) -> httpx.Response:
"""Request a query
Args:
query(str): query requested
@ -409,7 +426,7 @@ class Infinispan:
def _query_post(
self, query_str: str, cache_name: str, local: bool = False
) -> requests.Response:
) -> httpx.Response:
api_url = (
self._default_node
+ self._cache_url
@ -420,9 +437,9 @@ class Infinispan:
)
data = {"query": query_str}
data_json = json.dumps(data)
response = requests.post(
response = self._h2c.post(
api_url,
data_json,
content=data_json,
headers={"Content-Type": "application/json"},
timeout=REST_TIMEOUT,
)
@ -430,7 +447,7 @@ class Infinispan:
def _query_get(
self, query_str: str, cache_name: str, local: bool = False
) -> requests.Response:
) -> httpx.Response:
api_url = (
self._default_node
+ self._cache_url
@ -441,10 +458,10 @@ class Infinispan:
+ "&local="
+ str(local)
)
response = requests.get(api_url, timeout=REST_TIMEOUT)
response = self._h2c.get(api_url, timeout=REST_TIMEOUT)
return response
def post(self, key: str, data: str, cache_name: str) -> requests.Response:
def post(self, key: str, data: str, cache_name: str) -> httpx.Response:
"""Post an entry
Args:
key(str): key of the entry
@ -454,15 +471,15 @@ class Infinispan:
An http Response containing the result of the operation
"""
api_url = self._default_node + self._cache_url + "/" + cache_name + "/" + key
response = requests.post(
response = self._h2c.post(
api_url,
data,
content=data,
headers={"Content-Type": "application/json"},
timeout=REST_TIMEOUT,
)
return response
def put(self, key: str, data: str, cache_name: str) -> requests.Response:
def put(self, key: str, data: str, cache_name: str) -> httpx.Response:
"""Put an entry
Args:
key(str): key of the entry
@ -472,15 +489,15 @@ class Infinispan:
An http Response containing the result of the operation
"""
api_url = self._default_node + self._cache_url + "/" + cache_name + "/" + key
response = requests.put(
response = self._h2c.put(
api_url,
data,
content=data,
headers={"Content-Type": "application/json"},
timeout=REST_TIMEOUT,
)
return response
def get(self, key: str, cache_name: str) -> requests.Response:
def get(self, key: str, cache_name: str) -> httpx.Response:
"""Get an entry
Args:
key(str): key of the entry
@ -489,12 +506,12 @@ class Infinispan:
An http Response containing the entry or errors
"""
api_url = self._default_node + self._cache_url + "/" + cache_name + "/" + key
response = requests.get(
response = self._h2c.get(
api_url, headers={"Content-Type": "application/json"}, timeout=REST_TIMEOUT
)
return response
def schema_post(self, name: str, proto: str) -> requests.Response:
def schema_post(self, name: str, proto: str) -> httpx.Response:
"""Deploy a schema
Args:
name(str): name of the schema. Will be used as a key
@ -503,10 +520,10 @@ class Infinispan:
An http Response containing the result of the operation
"""
api_url = self._default_node + self._schema_url + "/" + name
response = requests.post(api_url, proto, timeout=REST_TIMEOUT)
response = self._h2c.post(api_url, content=proto, timeout=REST_TIMEOUT)
return response
def cache_post(self, name: str, config: str) -> requests.Response:
def cache_post(self, name: str, config: str) -> httpx.Response:
"""Create a cache
Args:
name(str): name of the cache.
@ -515,15 +532,15 @@ class Infinispan:
An http Response containing the result of the operation
"""
api_url = self._default_node + self._cache_url + "/" + name
response = requests.post(
response = self._h2c.post(
api_url,
config,
content=config,
headers={"Content-Type": "application/json"},
timeout=REST_TIMEOUT,
)
return response
def schema_delete(self, name: str) -> requests.Response:
def schema_delete(self, name: str) -> httpx.Response:
"""Delete a schema
Args:
name(str): name of the schema.
@ -531,10 +548,10 @@ class Infinispan:
An http Response containing the result of the operation
"""
api_url = self._default_node + self._schema_url + "/" + name
response = requests.delete(api_url, timeout=REST_TIMEOUT)
response = self._h2c.delete(api_url, timeout=REST_TIMEOUT)
return response
def cache_delete(self, name: str) -> requests.Response:
def cache_delete(self, name: str) -> httpx.Response:
"""Delete a cache
Args:
name(str): name of the cache.
@ -542,10 +559,10 @@ class Infinispan:
An http Response containing the result of the operation
"""
api_url = self._default_node + self._cache_url + "/" + name
response = requests.delete(api_url, timeout=REST_TIMEOUT)
response = self._h2c.delete(api_url, timeout=REST_TIMEOUT)
return response
def cache_clear(self, cache_name: str) -> requests.Response:
def cache_clear(self, cache_name: str) -> httpx.Response:
"""Clear a cache
Args:
cache_name(str): name of the cache.
@ -555,7 +572,7 @@ class Infinispan:
api_url = (
self._default_node + self._cache_url + "/" + cache_name + "?action=clear"
)
response = requests.post(api_url, timeout=REST_TIMEOUT)
response = self._h2c.post(api_url, timeout=REST_TIMEOUT)
return response
def cache_exists(self, cache_name: str) -> bool:
@ -570,18 +587,17 @@ class Infinispan:
)
return self.resource_exists(api_url)
@staticmethod
def resource_exists(api_url: str) -> bool:
def resource_exists(self, api_url: str) -> bool:
"""Check if a resource exists
Args:
api_url(str): url of the resource.
Returns:
true if resource exists
"""
response = requests.head(api_url, timeout=REST_TIMEOUT)
return response.ok
response = self._h2c.head(api_url, timeout=REST_TIMEOUT)
return response.status_code == httpx.codes.OK
def index_clear(self, cache_name: str) -> requests.Response:
def index_clear(self, cache_name: str) -> httpx.Response:
"""Clear an index on a cache
Args:
cache_name(str): name of the cache.
@ -595,9 +611,9 @@ class Infinispan:
+ cache_name
+ "/search/indexes?action=clear"
)
return requests.post(api_url, timeout=REST_TIMEOUT)
return self._h2c.post(api_url, timeout=REST_TIMEOUT)
def index_reindex(self, cache_name: str) -> requests.Response:
def index_reindex(self, cache_name: str) -> httpx.Response:
"""Rebuild index on a cache
Args:
cache_name(str): name of the cache.
@ -611,4 +627,4 @@ class Infinispan:
+ cache_name
+ "/search/indexes?action=reindex"
)
return requests.post(api_url, timeout=REST_TIMEOUT)
return self._h2c.post(api_url, timeout=REST_TIMEOUT)

@ -0,0 +1,62 @@
<infinispan
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:infinispan:config:15.0 https://infinispan.org/schemas/infinispan-config-15.0.xsd
urn:infinispan:server:15.0 https://infinispan.org/schemas/infinispan-server-15.0.xsd"
xmlns="urn:infinispan:config:15.0"
xmlns:server="urn:infinispan:server:15.0">
<cache-container name="default" statistics="true">
<transport cluster="${infinispan.cluster.name:cluster}" stack="${infinispan.cluster.stack:tcp}" node-name="${infinispan.node.name:}"/>
</cache-container>
<server xmlns="urn:infinispan:server:15.0">
<interfaces>
<interface name="public">
<inet-address value="${infinispan.bind.address:127.0.0.1}"/>
</interface>
</interfaces>
<socket-bindings default-interface="public" port-offset="${infinispan.socket.binding.port-offset:0}">
<socket-binding name="default" port="${infinispan.bind.port:11222}"/>
<socket-binding name="authenticated" port="11232"/>
<socket-binding name="auth-tls" port="11242"/>
</socket-bindings>
<security>
<credential-stores>
<credential-store name="credentials" path="credentials.pfx">
<clear-text-credential clear-text="secret"/>
</credential-store>
</credential-stores>
<security-realms>
<security-realm name="default">
<properties-realm groups-attribute="Roles">
<user-properties path="/user-config/users.properties"/>
<group-properties path="/user-config/groups.properties"/>
</properties-realm>
</security-realm>
<security-realm name="tls">
<!-- TLS is enabled on this realm through the server identity (self-signed certificate for localhost) below -->
<server-identities>
<ssl>
<keystore path="application.keystore"
password="password" alias="server"
generate-self-signed-certificate-host="localhost"/>
</ssl>
</server-identities>
<properties-realm groups-attribute="Roles">
<user-properties path="/user-config/users.properties"/>
<group-properties path="/user-config/groups.properties"/>
</properties-realm>
</security-realm>
</security-realms>
</security>
<endpoints>
<endpoint socket-binding="default"/>
<endpoint socket-binding="authenticated" security-realm="default"/>
<endpoint socket-binding="auth-tls" security-realm="tls"/>
</endpoints>
</server>
</infinispan>

@ -0,0 +1,4 @@
#$REALM_NAME=default$
#$ALGORITHM=encrypted$
#Fri May 03 10:19:58 CEST 2024
user=scram-sha-1\:BYGcIAws2gznU/kpezoSb1VQNVd+YMX9r+9SAINFoZtPHaHTAQ\=\=;scram-sha-256\:BYGcIAwRiWiD+8f7dyQEs1Wsum/64MOcjGJ2UcmZFQB6DZJqwRDJ4NrvII4NttmxlA\=\=;scram-sha-384\:BYGcIAz+Eud65N8GWK4TMwhSCZpeE5EFSdynywdryQj3ZwBEgv+KF8hRUuGxiq3EyRxsby6w7DHK3CICGZLsPrM\=;scram-sha-512\:BYGcIAwWxVY9DHn42kHydivyU3s9LSPmyfPPJkIFYyt/XsMASFHGoy5rzk4ahX4HjpJgb+NjdCwhGfi33CY0azUIrn439s62Yg5mq9i+ISto;digest-md5\:AgR1c2VyB2RlZmF1bHSYYyzPjRDR7MhrsdFSK03P;digest-sha\:AgR1c2VyB2RlZmF1bHTga5gDNnNYh7/2HqhBVOdUHjBzhw\=\=;digest-sha-256\:AgR1c2VyB2RlZmF1bHTig5qZQIxqtJBTUp3EMh5UIFoS4qOhz9Uk5aOW9ZKCfw\=\=;digest-sha-384\:AgR1c2VyB2RlZmF1bHT01pAN/pRMLS5afm4Q9S0kuLlA0NokuP8F0AISTwXCb1E8RMsFHlBVPOa5rC6Nyso\=;digest-sha-512\:AgR1c2VyB2RlZmF1bHTi+cHn1Ez2Ze41CvPXb9eP/7JmRys7m1f5qPMQWhAmDOuuUXNWEG4yKSI9k2EZgQvMKTd5hDbR24ul1BsYP8X5;

@ -0,0 +1,16 @@
version: "3.7"
services:
infinispan:
image: quay.io/infinispan/server:15.0
ports:
- '11222:11222'
- '11232:11232'
- '11242:11242'
deploy:
resources:
limits:
memory: 25Gb
volumes:
- ./conf:/user-config
command: -c /user-config/infinispan.xml

@ -11,9 +11,14 @@ from tests.integration_tests.vectorstores.fake_embeddings import (
fake_texts,
)
"""
cd tests/integration_tests/vectorstores/docker-compose
./infinispan.sh
"""
def _infinispan_setup_noautoconf() -> None:
ispnvs = InfinispanVS(auto_config=False)
def _infinispan_setup_noautoconf(**kwargs: Any) -> None:
ispnvs = InfinispanVS(auto_config=False, **kwargs)
ispnvs.cache_delete()
ispnvs.schema_delete()
proto = """
@ -59,59 +64,90 @@ def _infinispanvs_from_texts(
@pytest.mark.parametrize("autoconfig", [False, True])
@pytest.mark.parametrize(
"conn_opts",
[
{},
{
"user": "user",
"password": "password",
"hosts": ["localhost:11232"],
"schema": "http",
},
{
"user": "user",
"password": "password",
"hosts": ["localhost:11242"],
"schema": "https",
"verify": False,
},
],
)
class TestBasic:
def test_infinispan(self, autoconfig: bool) -> None:
def test_infinispan(self, autoconfig: bool, conn_opts: dict) -> None:
"""Test end to end construction and search."""
if not autoconfig:
_infinispan_setup_noautoconf()
docsearch = _infinispanvs_from_texts(auto_config=autoconfig)
_infinispan_setup_noautoconf(**conn_opts)
docsearch = _infinispanvs_from_texts(auto_config=autoconfig, **conn_opts)
output = docsearch.similarity_search("foo", k=1)
assert output == [Document(page_content="foo")]
def test_infinispan_with_metadata(self, autoconfig: bool) -> None:
def test_infinispan_with_auth(self, autoconfig: bool, conn_opts: dict) -> None:
"""Test end to end construction and search."""
if not autoconfig:
_infinispan_setup_noautoconf(**conn_opts)
docsearch = _infinispanvs_from_texts(auto_config=autoconfig, **conn_opts)
output = docsearch.similarity_search("foo", k=1)
assert output == [Document(page_content="foo")]
def test_infinispan_with_metadata(self, autoconfig: bool, conn_opts: dict) -> None:
"""Test with metadata"""
if not autoconfig:
_infinispan_setup_noautoconf()
_infinispan_setup_noautoconf(**conn_opts)
meta = []
for _ in range(len(fake_texts)):
meta.append({"label": "test"})
docsearch = _infinispanvs_from_texts(metadatas=meta, auto_config=autoconfig)
docsearch = _infinispanvs_from_texts(
metadatas=meta, auto_config=autoconfig, **conn_opts
)
output = docsearch.similarity_search("foo", k=1)
assert output == [Document(page_content="foo", metadata={"label": "test"})]
def test_infinispan_with_metadata_with_output_fields(
self, autoconfig: bool
self, autoconfig: bool, conn_opts: dict
) -> None:
"""Test with metadata"""
if not autoconfig:
_infinispan_setup_noautoconf()
_infinispan_setup_noautoconf(**conn_opts)
metadatas = [
{"page": i, "label": "label" + str(i)} for i in range(len(fake_texts))
]
c = {"output_fields": ["label", "page", "text"]}
docsearch = _infinispanvs_from_texts(
metadatas=metadatas, configuration=c, auto_config=autoconfig
metadatas=metadatas, configuration=c, auto_config=autoconfig, **conn_opts
)
output = docsearch.similarity_search("foo", k=1)
assert output == [
Document(page_content="foo", metadata={"label": "label0", "page": 0})
]
def test_infinispanvs_with_id(self, autoconfig: bool) -> None:
def test_infinispanvs_with_id(self, autoconfig: bool, conn_opts: dict) -> None:
"""Test with ids"""
ids = ["id_" + str(i) for i in range(len(fake_texts))]
docsearch = _infinispanvs_from_texts(ids=ids, auto_config=autoconfig)
docsearch = _infinispanvs_from_texts(
ids=ids, auto_config=autoconfig, **conn_opts
)
output = docsearch.similarity_search("foo", k=1)
assert output == [Document(page_content="foo")]
def test_infinispan_with_score(self, autoconfig: bool) -> None:
def test_infinispan_with_score(self, autoconfig: bool, conn_opts: dict) -> None:
"""Test end to end construction and search with scores and IDs."""
if not autoconfig:
_infinispan_setup_noautoconf()
_infinispan_setup_noautoconf(**conn_opts)
texts = ["foo", "bar", "baz"]
metadatas = [{"page": i} for i in range(len(texts))]
docsearch = _infinispanvs_from_texts(
metadatas=metadatas, auto_config=autoconfig
metadatas=metadatas, auto_config=autoconfig, **conn_opts
)
output = docsearch.similarity_search_with_score("foo", k=3)
docs = [o[0] for o in output]
@ -123,14 +159,14 @@ class TestBasic:
]
assert scores[0] >= scores[1] >= scores[2]
def test_infinispan_add_texts(self, autoconfig: bool) -> None:
def test_infinispan_add_texts(self, autoconfig: bool, conn_opts: dict) -> None:
"""Test end to end construction and MRR search."""
if not autoconfig:
_infinispan_setup_noautoconf()
_infinispan_setup_noautoconf(**conn_opts)
texts = ["foo", "bar", "baz"]
metadatas = [{"page": i} for i in range(len(texts))]
docsearch = _infinispanvs_from_texts(
metadatas=metadatas, auto_config=autoconfig
metadatas=metadatas, auto_config=autoconfig, **conn_opts
)
docsearch.add_texts(texts, metadatas)
@ -138,19 +174,22 @@ class TestBasic:
output = docsearch.similarity_search("foo", k=10)
assert len(output) == 6
def test_infinispan_no_clear_old(self, autoconfig: bool) -> None:
def test_infinispan_no_clear_old(self, autoconfig: bool, conn_opts: dict) -> None:
"""Test end to end construction and MRR search."""
if not autoconfig:
_infinispan_setup_noautoconf()
_infinispan_setup_noautoconf(**conn_opts)
texts = ["foo", "bar", "baz"]
metadatas = [{"page": i} for i in range(len(texts))]
docsearch = _infinispanvs_from_texts(
metadatas=metadatas, auto_config=autoconfig
metadatas=metadatas, auto_config=autoconfig, **conn_opts
)
del docsearch
try:
docsearch = _infinispanvs_from_texts(
metadatas=metadatas, clear_old=False, auto_config=autoconfig
metadatas=metadatas,
clear_old=False,
auto_config=autoconfig,
**conn_opts,
)
except AssertionError:
if autoconfig:

Loading…
Cancel
Save