mirror of https://github.com/hwchase17/langchain
feat: add redisearch vectorstore (#1307)
# Description Add `RediSearch` vectorstore for LangChain RediSearch: [RediSearch quick start](https://redis.io/docs/stack/search/quick_start/) # How to use ``` from langchain.vectorstores.redisearch import RediSearch rds = RediSearch.from_documents(docs, embeddings,redisearch_url="redis://localhost:6379") ```pull/1676/head
parent
e5c1659864
commit
4e13cef05a
@ -0,0 +1,227 @@
|
|||||||
|
"""Wrapper around Redis vector database."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import uuid
|
||||||
|
from typing import Any, Callable, Iterable, List, Mapping, Optional
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
from redis.client import Redis as RedisType
|
||||||
|
|
||||||
|
from langchain.docstore.document import Document
|
||||||
|
from langchain.embeddings.base import Embeddings
|
||||||
|
from langchain.utils import get_from_dict_or_env
|
||||||
|
from langchain.vectorstores.base import VectorStore
|
||||||
|
|
||||||
|
|
||||||
|
def _check_redis_module_exist(client: RedisType, module: str) -> bool:
|
||||||
|
return module in [m["name"] for m in client.info().get("modules", {"name": ""})]
|
||||||
|
|
||||||
|
|
||||||
|
class Redis(VectorStore):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
redis_url: str,
|
||||||
|
index_name: str,
|
||||||
|
embedding_function: Callable,
|
||||||
|
**kwargs: Any,
|
||||||
|
):
|
||||||
|
"""Initialize with necessary components."""
|
||||||
|
try:
|
||||||
|
import redis
|
||||||
|
except ImportError:
|
||||||
|
raise ValueError(
|
||||||
|
"Could not import redis python package. "
|
||||||
|
"Please install it with `pip install redis`."
|
||||||
|
)
|
||||||
|
|
||||||
|
self.embedding_function = embedding_function
|
||||||
|
self.index_name = index_name
|
||||||
|
try:
|
||||||
|
redis_client = redis.from_url(redis_url, **kwargs)
|
||||||
|
except ValueError as e:
|
||||||
|
raise ValueError(f"Your redis connected error: {e}")
|
||||||
|
|
||||||
|
# check if redis add redisearch module
|
||||||
|
if not _check_redis_module_exist(redis_client, "search"):
|
||||||
|
raise ValueError(
|
||||||
|
"Could not use redis directly, you need to add search module"
|
||||||
|
"Please refer [RediSearch](https://redis.io/docs/stack/search/quick_start/)" # noqa
|
||||||
|
)
|
||||||
|
|
||||||
|
self.client = redis_client
|
||||||
|
|
||||||
|
def add_texts(
|
||||||
|
self,
|
||||||
|
texts: Iterable[str],
|
||||||
|
metadatas: Optional[List[dict]] = None,
|
||||||
|
**kwargs: Any,
|
||||||
|
) -> List[str]:
|
||||||
|
# `prefix`: Maybe in the future we can let the user choose the index_name.
|
||||||
|
prefix = "doc" # prefix for the document keys
|
||||||
|
|
||||||
|
ids = []
|
||||||
|
# Check if index exists
|
||||||
|
for i, text in enumerate(texts):
|
||||||
|
key = f"{prefix}:{uuid.uuid4().hex}"
|
||||||
|
metadata = metadatas[i] if metadatas else {}
|
||||||
|
self.client.hset(
|
||||||
|
key,
|
||||||
|
mapping={
|
||||||
|
"content": text,
|
||||||
|
"content_vector": np.array(
|
||||||
|
self.embedding_function(text), dtype=np.float32
|
||||||
|
).tobytes(),
|
||||||
|
"metadata": json.dumps(metadata),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
ids.append(key)
|
||||||
|
return ids
|
||||||
|
|
||||||
|
def similarity_search(
|
||||||
|
self, query: str, k: int = 4, **kwargs: Any
|
||||||
|
) -> List[Document]:
|
||||||
|
try:
|
||||||
|
from redis.commands.search.query import Query
|
||||||
|
except ImportError:
|
||||||
|
raise ValueError(
|
||||||
|
"Could not import redis python package. "
|
||||||
|
"Please install it with `pip install redis`."
|
||||||
|
)
|
||||||
|
|
||||||
|
# Creates embedding vector from user query
|
||||||
|
embedding = self.embedding_function(query)
|
||||||
|
|
||||||
|
# Prepare the Query
|
||||||
|
return_fields = ["metadata", "content", "vector_score"]
|
||||||
|
vector_field = "content_vector"
|
||||||
|
hybrid_fields = "*"
|
||||||
|
base_query = (
|
||||||
|
f"{hybrid_fields}=>[KNN {k} @{vector_field} $vector AS vector_score]"
|
||||||
|
)
|
||||||
|
redis_query = (
|
||||||
|
Query(base_query)
|
||||||
|
.return_fields(*return_fields)
|
||||||
|
.sort_by("vector_score")
|
||||||
|
.paging(0, k)
|
||||||
|
.dialect(2)
|
||||||
|
)
|
||||||
|
params_dict: Mapping[str, str] = {
|
||||||
|
"vector": np.array(embedding) # type: ignore
|
||||||
|
.astype(dtype=np.float32)
|
||||||
|
.tobytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
# perform vector search
|
||||||
|
results = self.client.ft(self.index_name).search(redis_query, params_dict)
|
||||||
|
|
||||||
|
documents = [
|
||||||
|
Document(page_content=result.content, metadata=json.loads(result.metadata))
|
||||||
|
for result in results.docs
|
||||||
|
]
|
||||||
|
|
||||||
|
return documents
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_texts(
|
||||||
|
cls,
|
||||||
|
texts: List[str],
|
||||||
|
embedding: Embeddings,
|
||||||
|
metadatas: Optional[List[dict]] = None,
|
||||||
|
index_name: Optional[str] = None,
|
||||||
|
**kwargs: Any,
|
||||||
|
) -> Redis:
|
||||||
|
"""Construct RediSearch wrapper from raw documents.
|
||||||
|
This is a user-friendly interface that:
|
||||||
|
1. Embeds documents.
|
||||||
|
2. Creates a new index for the embeddings in the RediSearch instance.
|
||||||
|
3. Adds the documents to the newly created RediSearch index.
|
||||||
|
This is intended to be a quick way to get started.
|
||||||
|
Example:
|
||||||
|
.. code-block:: python
|
||||||
|
from langchain import RediSearch
|
||||||
|
from langchain.embeddings import OpenAIEmbeddings
|
||||||
|
embeddings = OpenAIEmbeddings()
|
||||||
|
redisearch = RediSearch.from_texts(
|
||||||
|
texts,
|
||||||
|
embeddings,
|
||||||
|
redis_url="redis://username:password@localhost:6379"
|
||||||
|
)
|
||||||
|
"""
|
||||||
|
redis_url = get_from_dict_or_env(kwargs, "redis_url", "REDIS_URL")
|
||||||
|
try:
|
||||||
|
import redis
|
||||||
|
from redis.commands.search.field import TextField, VectorField
|
||||||
|
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
|
||||||
|
except ImportError:
|
||||||
|
raise ValueError(
|
||||||
|
"Could not import redis python package. "
|
||||||
|
"Please install it with `pip install redis`."
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
# We need to first remove redis_url from kwargs,
|
||||||
|
# otherwise passing it to Redis will result in an error.
|
||||||
|
kwargs.pop("redis_url")
|
||||||
|
client = redis.from_url(url=redis_url, **kwargs)
|
||||||
|
except ValueError as e:
|
||||||
|
raise ValueError(f"Your redis connected error: {e}")
|
||||||
|
|
||||||
|
# check if redis add redisearch module
|
||||||
|
if not _check_redis_module_exist(client, "search"):
|
||||||
|
raise ValueError(
|
||||||
|
"Could not use redis directly, you need to add search module"
|
||||||
|
"Please refer [RediSearch](https://redis.io/docs/stack/search/quick_start/)" # noqa
|
||||||
|
)
|
||||||
|
|
||||||
|
embeddings = embedding.embed_documents(texts)
|
||||||
|
dim = len(embeddings[0])
|
||||||
|
# Constants
|
||||||
|
vector_number = len(embeddings) # initial number of vectors
|
||||||
|
# name of the search index if not given
|
||||||
|
if not index_name:
|
||||||
|
index_name = uuid.uuid4().hex
|
||||||
|
prefix = "doc" # prefix for the document keys
|
||||||
|
distance_metric = (
|
||||||
|
"COSINE" # distance metric for the vectors (ex. COSINE, IP, L2)
|
||||||
|
)
|
||||||
|
content = TextField(name="content")
|
||||||
|
metadata = TextField(name="metadata")
|
||||||
|
content_embedding = VectorField(
|
||||||
|
"content_vector",
|
||||||
|
"FLAT",
|
||||||
|
{
|
||||||
|
"TYPE": "FLOAT32",
|
||||||
|
"DIM": dim,
|
||||||
|
"DISTANCE_METRIC": distance_metric,
|
||||||
|
"INITIAL_CAP": vector_number,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
fields = [content, metadata, content_embedding]
|
||||||
|
|
||||||
|
# Check if index exists
|
||||||
|
try:
|
||||||
|
client.ft(index_name).info()
|
||||||
|
print("Index already exists")
|
||||||
|
except: # noqa
|
||||||
|
# Create Redis Index
|
||||||
|
client.ft(index_name).create_index(
|
||||||
|
fields=fields,
|
||||||
|
definition=IndexDefinition(prefix=[prefix], index_type=IndexType.HASH),
|
||||||
|
)
|
||||||
|
|
||||||
|
pipeline = client.pipeline()
|
||||||
|
for i, text in enumerate(texts):
|
||||||
|
key = f"{prefix}:{str(uuid.uuid4().hex)}"
|
||||||
|
metadata = metadatas[i] if metadatas else {}
|
||||||
|
pipeline.hset(
|
||||||
|
key,
|
||||||
|
mapping={
|
||||||
|
"content": text,
|
||||||
|
"content_vector": np.array(
|
||||||
|
embeddings[i], dtype=np.float32
|
||||||
|
).tobytes(),
|
||||||
|
"metadata": json.dumps(metadata),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
pipeline.execute()
|
||||||
|
return cls(redis_url, index_name, embedding.embed_query)
|
@ -0,0 +1,26 @@
|
|||||||
|
"""Test Redis functionality."""
|
||||||
|
|
||||||
|
from langchain.docstore.document import Document
|
||||||
|
from langchain.vectorstores.redis import Redis
|
||||||
|
from tests.integration_tests.vectorstores.fake_embeddings import FakeEmbeddings
|
||||||
|
|
||||||
|
|
||||||
|
def test_redis() -> None:
|
||||||
|
"""Test end to end construction and search."""
|
||||||
|
texts = ["foo", "bar", "baz"]
|
||||||
|
docsearch = Redis.from_texts(
|
||||||
|
texts, FakeEmbeddings(), redis_url="redis://localhost:6379"
|
||||||
|
)
|
||||||
|
output = docsearch.similarity_search("foo", k=1)
|
||||||
|
assert output == [Document(page_content="foo")]
|
||||||
|
|
||||||
|
|
||||||
|
def test_redis_new_vector() -> None:
|
||||||
|
"""Test adding a new document"""
|
||||||
|
texts = ["foo", "bar", "baz"]
|
||||||
|
docsearch = Redis.from_texts(
|
||||||
|
texts, FakeEmbeddings(), redis_url="redis://localhost:6379"
|
||||||
|
)
|
||||||
|
docsearch.add_texts(["foo"])
|
||||||
|
output = docsearch.similarity_search("foo", k=2)
|
||||||
|
assert output == [Document(page_content="foo"), Document(page_content="foo")]
|
Loading…
Reference in New Issue