@ -7,7 +7,7 @@ import uuid
from typing import Any , Callable , Dict , Iterable , List , Mapping , Optional , Tuple
from typing import Any , Callable , Dict , Iterable , List , Mapping , Optional , Tuple
import numpy as np
import numpy as np
from pydantic import BaseModel , Field, root_validator
from pydantic import BaseModel , root_validator
from redis . client import Redis as RedisType
from redis . client import Redis as RedisType
from langchain . docstore . document import Document
from langchain . docstore . document import Document
@ -19,8 +19,47 @@ from langchain.vectorstores.base import VectorStore
# Module-scoped logger: records still propagate to the root logger's handlers,
# but applications can now filter or silence this module's output by name.
logger = logging.getLogger(__name__)
def _check_redis_module_exist(client: "RedisType", module: str) -> bool:
    """Return whether the Redis server reports a loaded module named ``module``.

    Args:
        client: Redis client; only its ``info()`` method is used.
        module: Module name to look for in the ``"modules"`` entry of
            ``client.info()``.

    Returns:
        True if an entry whose ``"name"`` equals ``module`` is present,
        otherwise False (including when no ``"modules"`` entry is reported).

    NOTE(review): a later definition with the same name (taking a list of
    required modules) appears further down this file and shadows this one.
    """
    # Default to an empty list: a dict default would be iterated by key, so
    # ``m["name"]`` would index a str and raise TypeError whenever the server
    # reported no modules.
    loaded = client.info().get("modules", [])
    return any(m["name"] == module for m in loaded)


# required modules
# Redis modules this vector store requires: module name plus the minimum
# version, which the module check compares as an integer.
# NOTE(review): 20400 presumably encodes the ">= 2.4" RediSearch requirement
# quoted in the module-check error message -- confirm the encoding.
REDIS_REQUIRED_MODULES = [
    {"name": "search", "ver": 20400},
]
def _check_redis_module_exist(client: "RedisType", modules: List[dict]) -> None:
    """Check if the correct Redis modules are installed.

    Args:
        client: Redis client; only its ``info()`` method is used.
        modules: Required modules, each a dict with a ``"name"`` and a minimum
            ``"ver"``; versions are compared as integers.

    Raises:
        ValueError: If a required module is absent from the ``"modules"``
            entry of ``client.info()``, or its version is below the minimum.
    """
    installed = {
        entry["name"]: entry for entry in client.info().get("modules", [])
    }
    for required in modules:
        found = installed.get(required["name"])
        # An entry without a "ver" field is treated as version 0 so that it
        # fails the check with the ValueError callers already handle, rather
        # than escaping as a KeyError.
        if found is None or int(found.get("ver", 0)) < int(required["ver"]):
            error_message = (
                "You must add the RediSearch (>= 2.4) module from Redis Stack. "
                "Please refer to Redis Stack docs: https://redis.io/docs/stack/"
            )
            logging.error(error_message)
            raise ValueError(error_message)
def _check_index_exists(client: "RedisType", index_name: str) -> bool:
    """Check if Redis index exists.

    Args:
        client: Redis client exposing ``ft(index_name).info()``.
        index_name: Name of the search index to look up.

    Returns:
        False if the ``info()`` call raises an ordinary exception, True
        otherwise.
    """
    try:
        client.ft(index_name).info()
    except Exception:
        # Any ordinary failure of info() is taken to mean the index is absent.
        # Unlike a bare ``except:``, this lets KeyboardInterrupt and
        # SystemExit propagate instead of being reported as "no index".
        logger.info("Index does not exist")
        return False
    logger.info("Index already exists")
    return True
def _redis_key(prefix: str) -> str:
    """Redis key schema for a given prefix.

    Returns ``"<prefix>:<hex>"``, where ``<hex>`` is the hex form of a newly
    generated ``uuid.uuid4()``.
    """
    return f"{prefix}:{uuid.uuid4().hex}"
def _redis_prefix(index_name: str) -> str:
    """Redis key prefix for a given index: ``"doc:<index_name>"``."""
    return f"doc:{index_name}"
class Redis ( VectorStore ) :
class Redis ( VectorStore ) :
@ -43,16 +82,12 @@ class Redis(VectorStore):
self . embedding_function = embedding_function
self . embedding_function = embedding_function
self . index_name = index_name
self . index_name = index_name
try :
try :
# connect to redis from url
redis_client = redis . from_url ( redis_url , * * kwargs )
redis_client = redis . from_url ( redis_url , * * kwargs )
# check if redis has redisearch module installed
_check_redis_module_exist ( redis_client , REDIS_REQUIRED_MODULES )
except ValueError as e :
except ValueError as e :
raise ValueError ( f " Your redis connected error: { e } " )
raise ValueError ( f " Redis failed to connect: { e } " )
# check that the RediSearch module is loaded in Redis
if not _check_redis_module_exist ( redis_client , " search " ) :
raise ValueError (
" Could not use redis directly, you need to add search module "
" Please refer [RediSearch](https://redis.io/docs/stack/search/quick_start/) " # noqa
)
self . client = redis_client
self . client = redis_client
@ -62,17 +97,17 @@ class Redis(VectorStore):
metadatas : Optional [ List [ dict ] ] = None ,
metadatas : Optional [ List [ dict ] ] = None ,
* * kwargs : Any ,
* * kwargs : Any ,
) - > List [ str ] :
) - > List [ str ] :
# `prefix`: Maybe in the future we can let the user choose the index_name.
""" Add texts data to an existing index. """
prefix = " doc " # prefix for the document keys
prefix = _redis_prefix ( self . index_name )
keys = kwargs . get ( " keys " )
keys = kwargs . get ( " keys " )
ids = [ ]
ids = [ ]
# Check if index exists
# Write data to redis
pipeline = self . client . pipeline ( transaction = False )
for i , text in enumerate ( texts ) :
for i , text in enumerate ( texts ) :
_key = keys [ i ] if keys else self . index_name
# Use provided key otherwise use default key
key = f " { prefix } : { _key } "
key = keys [ i ] if keys else _redis_key ( prefix )
metadata = metadatas [ i ] if metadatas else { }
metadata = metadatas [ i ] if metadatas else { }
self . client . hset (
pipeline . hset (
key ,
key ,
mapping = {
mapping = {
" content " : text ,
" content " : text ,
@ -83,11 +118,22 @@ class Redis(VectorStore):
} ,
} ,
)
)
ids . append ( key )
ids . append ( key )
pipeline . execute ( )
return ids
return ids
def similarity_search(
    self, query: str, k: int = 4, **kwargs: Any
) -> List[Document]:
    """
    Returns the most similar indexed documents to the query text.

    Args:
        query (str): The query text for which to find similar documents.
        k (int): The number of documents to return. Default is 4.
        **kwargs: Not used by this method.

    Returns:
        List[Document]: A list of documents that are most similar to the
            query text.
    """
    # Delegate to the scored search and keep only the documents.
    docs_and_scores = self.similarity_search_with_score(query, k=k)
    return [doc for doc, _ in docs_and_scores]
@ -95,7 +141,8 @@ class Redis(VectorStore):
self , query : str , k : int = 4 , score_threshold : float = 0.2 , * * kwargs : Any
self , query : str , k : int = 4 , score_threshold : float = 0.2 , * * kwargs : Any
) - > List [ Document ] :
) - > List [ Document ] :
"""
"""
Returns the most similar indexed documents to the query text .
Returns the most similar indexed documents to the query text within the
score_threshold range .
Args :
Args :
query ( str ) : The query text for which to find similar documents .
query ( str ) : The query text for which to find similar documents .
@ -217,55 +264,49 @@ class Redis(VectorStore):
# otherwise passing it to Redis will result in an error.
# otherwise passing it to Redis will result in an error.
kwargs . pop ( " redis_url " )
kwargs . pop ( " redis_url " )
client = redis . from_url ( url = redis_url , * * kwargs )
client = redis . from_url ( url = redis_url , * * kwargs )
# check if redis has redisearch module installed
_check_redis_module_exist ( client , REDIS_REQUIRED_MODULES )
except ValueError as e :
except ValueError as e :
raise ValueError ( f " Your redis connected error: { e } " )
raise ValueError ( f " Redis failed to connect: { e } " )
# check that the RediSearch module is loaded in Redis
if not _check_redis_module_exist ( client , " search " ) :
raise ValueError (
" Could not use redis directly, you need to add search module "
" Please refer [RediSearch](https://redis.io/docs/stack/search/quick_start/) " # noqa
)
# Create embeddings over documents
embeddings = embedding . embed_documents ( texts )
embeddings = embedding . embed_documents ( texts )
dim = len ( embeddings [ 0 ] )
# Constants
# Name of the search index if not given
vector_number = len ( embeddings ) # initial number of vectors
# name of the search index if not given
if not index_name :
if not index_name :
index_name = uuid . uuid4 ( ) . hex
index_name = uuid . uuid4 ( ) . hex
prefix = f " doc: { index_name } " # prefix for the document keys
prefix = _redis_prefix ( index_name ) # prefix for the document keys
distance_metric = (
" COSINE " # distance metric for the vectors (ex. COSINE, IP, L2)
)
content = TextField ( name = " content " )
metadata = TextField ( name = " metadata " )
content_embedding = VectorField (
" content_vector " ,
" FLAT " ,
{
" TYPE " : " FLOAT32 " ,
" DIM " : dim ,
" DISTANCE_METRIC " : distance_metric ,
" INITIAL_CAP " : vector_number ,
} ,
)
fields = [ content , metadata , content_embedding ]
# Check if index exists
# Check if index exists
try :
if not _check_index_exists ( client , index_name ) :
client . ft ( index_name ) . info ( )
# Constants
logger . info ( " Index already exists " )
dim = len ( embeddings [ 0 ] )
except : # noqa
distance_metric = (
" COSINE " # distance metric for the vectors (ex. COSINE, IP, L2)
)
schema = (
TextField ( name = " content " ) ,
TextField ( name = " metadata " ) ,
VectorField (
" content_vector " ,
" FLAT " ,
{
" TYPE " : " FLOAT32 " ,
" DIM " : dim ,
" DISTANCE_METRIC " : distance_metric ,
} ,
) ,
)
# Create Redis Index
# Create Redis Index
client . ft ( index_name ) . create_index (
client . ft ( index_name ) . create_index (
fields = fields ,
fields = schema ,
definition = IndexDefinition ( prefix = [ prefix ] , index_type = IndexType . HASH ) ,
definition = IndexDefinition ( prefix = [ prefix ] , index_type = IndexType . HASH ) ,
)
)
pipeline = client . pipeline ( )
# Write data to Redis
pipeline = client . pipeline ( transaction = False )
for i , text in enumerate ( texts ) :
for i , text in enumerate ( texts ) :
key = f " { prefix } : { i } "
key = _redis_key ( prefix )
metadata = metadatas [ i ] if metadatas else { }
metadata = metadatas [ i ] if metadatas else { }
pipeline . hset (
pipeline . hset (
key ,
key ,
@ -286,6 +327,16 @@ class Redis(VectorStore):
delete_documents : bool ,
delete_documents : bool ,
* * kwargs : Any ,
* * kwargs : Any ,
) - > bool :
) - > bool :
"""
Drop a Redis search index .
Args :
index_name ( str ) : Name of the index to drop .
delete_documents ( bool ) : Whether to drop the associated documents .
Returns :
bool : Whether or not the drop was successful .
"""
redis_url = get_from_dict_or_env ( kwargs , " redis_url " , " REDIS_URL " )
redis_url = get_from_dict_or_env ( kwargs , " redis_url " , " REDIS_URL " )
try :
try :
import redis
import redis
@ -306,7 +357,7 @@ class Redis(VectorStore):
client . ft ( index_name ) . dropindex ( delete_documents )
client . ft ( index_name ) . dropindex ( delete_documents )
logger . info ( " Drop index " )
logger . info ( " Drop index " )
return True
return True
except : # noqa
except : # noqa : E722
# Index does not exist
# Index does not exist
return False
return False
@ -317,6 +368,7 @@ class Redis(VectorStore):
index_name : str ,
index_name : str ,
* * kwargs : Any ,
* * kwargs : Any ,
) - > Redis :
) - > Redis :
""" Connect to an existing Redis index. """
redis_url = get_from_dict_or_env ( kwargs , " redis_url " , " REDIS_URL " )
redis_url = get_from_dict_or_env ( kwargs , " redis_url " , " REDIS_URL " )
try :
try :
import redis
import redis
@ -330,15 +382,14 @@ class Redis(VectorStore):
# otherwise passing it to Redis will result in an error.
# otherwise passing it to Redis will result in an error.
kwargs . pop ( " redis_url " )
kwargs . pop ( " redis_url " )
client = redis . from_url ( url = redis_url , * * kwargs )
client = redis . from_url ( url = redis_url , * * kwargs )
except ValueError as e :
# check if redis has redisearch module installed
raise ValueError ( f " Your redis connected error: { e } " )
_check_redis_module_exist ( client , REDIS_REQUIRED_MODULES )
# ensure that the index already exists
# check that the RediSearch module is loaded in Redis
assert _check_index_exists (
if not _check_redis_module_exist ( client , " search " ) :
client , index_name
raise ValueError (
) , f " Index { index_name } does not exist "
" Could not use redis directly, you need to add search module "
except Exception as e :
" Please refer [RediSearch](https://redis.io/docs/stack/search/quick_start/) " # noqa
raise ValueError ( f " Redis failed to connect: { e } " )
)
return cls ( redis_url , index_name , embedding . embed_query )
return cls ( redis_url , index_name , embedding . embed_query )
@ -349,7 +400,8 @@ class Redis(VectorStore):
class RedisVectorStoreRetriever ( BaseRetriever , BaseModel ) :
class RedisVectorStoreRetriever ( BaseRetriever , BaseModel ) :
vectorstore : Redis
vectorstore : Redis
search_type : str = " similarity "
search_type : str = " similarity "
search_kwargs : dict = Field ( default_factory = dict )
k : int = 4
score_threshold : float = 0.4
class Config :
class Config :
""" Configuration for this pydantic object. """
""" Configuration for this pydantic object. """
@ -367,10 +419,10 @@ class RedisVectorStoreRetriever(BaseRetriever, BaseModel):
def get_relevant_documents ( self , query : str ) - > List [ Document ] :
def get_relevant_documents ( self , query : str ) - > List [ Document ] :
if self . search_type == " similarity " :
if self . search_type == " similarity " :
docs = self . vectorstore . similarity_search ( query , * * self . search_kwargs )
docs = self . vectorstore . similarity_search ( query , k = self . k )
elif self . search_type == " similarity_limit " :
elif self . search_type == " similarity_limit " :
docs = self . vectorstore . similarity_search_limit_score (
docs = self . vectorstore . similarity_search_limit_score (
query , * * self . search_kwargs
query , k = self . k , score_threshold = self . score_threshold
)
)
else :
else :
raise ValueError ( f " search_type of { self . search_type } not allowed. " )
raise ValueError ( f " search_type of { self . search_type } not allowed. " )