Upstash Redis integration (#10871)

- **Description:** Introduced an Upstash provider with the following wrappers:
UpstashRedisCache, UpstashRedisEntityStore,
UpstashRedisChatMessageHistory, UpstashRedisStore
  - **Issue:** -,
  - **Dependencies:** the `upstash-redis` Python package is required,
  - **Tag maintainer:** @baskaryan 
  - **Twitter handle:** @BurakY744
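
A minimal usage sketch of the new cache wrapper, mirroring the notebook added in this PR (the URL and token are placeholders for the values from the Upstash console):

```python
import langchain
from upstash_redis import Redis

from langchain.cache import UpstashRedisCache

# Placeholder credentials copied from the Upstash console.
URL = "<UPSTASH_REDIS_REST_URL>"
TOKEN = "<UPSTASH_REDIS_REST_TOKEN>"

# Route LLM prompt/response caching through Upstash Redis.
langchain.llm_cache = UpstashRedisCache(redis_=Redis(url=URL, token=TOKEN))
```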

---------

Co-authored-by: Burak Yılmaz <burakyilmaz@Buraks-MacBook-Pro.local>
Co-authored-by: Bagatur <baskaryan@gmail.com>

@ -12,7 +12,7 @@
},
{
"cell_type": "code",
"execution_count": 1,
"execution_count": 3,
"id": "10ad9224",
"metadata": {},
"outputs": [],
@ -37,7 +37,7 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": 4,
"id": "426ff912",
"metadata": {},
"outputs": [],
@ -207,6 +207,101 @@
"llm(\"Tell me a joke\")"
]
},
{
"cell_type": "markdown",
"id": "e71273ab",
"metadata": {},
"source": [
"## `Upstash Redis` Cache"
]
},
{
"cell_type": "markdown",
"id": "f10dabef",
"metadata": {},
"source": [
"### Standard Cache\n",
"Use [Upstash Redis](https://upstash.com) to cache prompts and responses with a serverless HTTP API."
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "f3920f25",
"metadata": {},
"outputs": [],
"source": [
"from upstash_redis import Redis\n",
"from langchain.cache import UpstashRedisCache\n",
"\n",
"URL = \"<UPSTASH_REDIS_REST_URL>\"\n",
"TOKEN = \"<UPSTASH_REDIS_REST_TOKEN>\"\n",
"\n",
"langchain.llm_cache = UpstashRedisCache(redis_=Redis(url=URL, token=TOKEN))"
]
},
{
"cell_type": "code",
"execution_count": 39,
"id": "3bf7d959",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 7.56 ms, sys: 2.98 ms, total: 10.5 ms\n",
"Wall time: 1.14 s\n"
]
},
{
"data": {
"text/plain": [
"'\\n\\nWhy did the chicken cross the road?\\n\\nTo get to the other side!'"
]
},
"execution_count": 39,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"# The first time, it is not yet in cache, so it should take longer\n",
"llm(\"Tell me a joke\")"
]
},
{
"cell_type": "code",
"execution_count": 50,
"id": "00fc3a34",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 2.78 ms, sys: 1.95 ms, total: 4.73 ms\n",
"Wall time: 82.9 ms\n"
]
},
{
"data": {
"text/plain": [
"'\\n\\nTwo guys stole a calendar. They got six months each.'"
]
},
"execution_count": 50,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"# The first time, it is not yet in cache, so it should take longer\n",
"llm(\"Tell me a joke\")"
]
},
{
"cell_type": "markdown",
"id": "278ad7ae",
@ -229,7 +324,7 @@
},
{
"cell_type": "code",
"execution_count": 8,
"execution_count": 9,
"id": "39f6eb0b",
"metadata": {},
"outputs": [],
@ -244,7 +339,7 @@
},
{
"cell_type": "code",
"execution_count": 9,
"execution_count": null,
"id": "28920749",
"metadata": {},
"outputs": [
@ -440,7 +535,7 @@
},
{
"cell_type": "code",
"execution_count": 6,
"execution_count": null,
"id": "9e4ecfd1",
"metadata": {},
"outputs": [

@ -0,0 +1,61 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Upstash Redis Chat Message History\n",
"\n",
"This notebook goes over how to use Upstash Redis to store chat message history."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from langchain.memory.chat_message_histories.upstash_redis import UpstashRedisChatMessageHistory\n",
"\n",
"URL = \"<UPSTASH_REDIS_REST_URL>\"\n",
"TOKEN = \"<UPSTASH_REDIS_REST_TOKEN>\"\n",
"\n",
"history = UpstashRedisChatMessageHistory(url=URL, token=TOKEN, ttl=10, session_id=\"my-test-session\")\n",
"\n",
"history.add_user_message(\"hello llm!\")\n",
"history.add_ai_message(\"hello user!\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"history.messages"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.3"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}

@ -0,0 +1,42 @@
# Upstash Redis
Upstash offers developers serverless databases and messaging platforms to build powerful applications without having to worry about the operational complexity of running databases at scale.
This page covers how to use [Upstash Redis](https://upstash.com/redis) with LangChain.
## Installation and Setup
- The Upstash Redis Python SDK can be installed with `pip install upstash-redis`
- A globally distributed, low-latency and highly available database can be created at the [Upstash Console](https://console.upstash.com) (a quick connectivity check is sketched below)
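Once the database exists, you can create a client with the REST credentials and ping it. A minimal sketch (the URL and token are placeholders for the values shown in the console):
```python
from upstash_redis import Redis

# Placeholders below stand in for the REST URL and token from the Upstash console.
redis_client = Redis(url="<UPSTASH_REDIS_REST_URL>", token="<UPSTASH_REDIS_REST_TOKEN>")
redis_client.ping()  # simple round-trip to confirm the database is reachable
```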
## Integrations
All Upstash-LangChain integrations are built on the `upstash-redis` Python SDK, which the LangChain wrappers use under the hood.
The SDK connects to your Upstash Redis database using the `UPSTASH_REDIS_REST_URL` and `UPSTASH_REDIS_REST_TOKEN` values from the console.
A significant advantage is that the SDK communicates over a REST API, so the integrations also work on serverless platforms, at the edge, or on any platform that does not support TCP connections.
### Cache
[Upstash Redis](https://upstash.com/redis) can be used as a cache for LLM prompts and responses.
To import this cache:
```python
from langchain.cache import UpstashRedisCache
```
To use with your LLMs:
```python
import langchain
from upstash_redis import Redis
URL = "<UPSTASH_REDIS_REST_URL>"
TOKEN = "<UPSTASH_REDIS_REST_TOKEN>"
langchain.llm_cache = UpstashRedisCache(redis_=Redis(url=URL, token=TOKEN))
```
### Memory
Upstash Redis can be used to persist LLM conversations.
#### Chat Message History Memory
An example of using Upstash Redis to store conversation message history can be seen in [this notebook](/docs/integrations/memory/upstash_redis_chat_message_history.html); a minimal sketch follows.
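A minimal sketch mirroring that notebook (the URL and token are placeholders for the console values):
```python
from langchain.memory.chat_message_histories.upstash_redis import (
    UpstashRedisChatMessageHistory,
)

URL = "<UPSTASH_REDIS_REST_URL>"
TOKEN = "<UPSTASH_REDIS_REST_TOKEN>"

# Messages expire after `ttl` seconds; omit it to keep them indefinitely.
history = UpstashRedisChatMessageHistory(
    url=URL, token=TOKEN, ttl=10, session_id="my-test-session"
)
history.add_user_message("hello llm!")
history.add_ai_message("hello user!")
history.messages  # stored messages in chronological order
```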

@ -23,14 +23,14 @@
},
{
"cell_type": "code",
"execution_count": 4,
"execution_count": 7,
"id": "a463c3c2-749b-40d1-a433-84f68a1cd1c7",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from langchain.storage import InMemoryStore, LocalFileStore, RedisStore\n",
"from langchain.storage import InMemoryStore, LocalFileStore, RedisStore, UpstashRedisStore\n",
"from langchain.embeddings import OpenAIEmbeddings, CacheBackedEmbeddings"
]
},
@ -46,7 +46,7 @@
},
{
"cell_type": "code",
"execution_count": 5,
"execution_count": null,
"id": "9e4314d8-88ef-4f52-81ae-0be771168bb6",
"metadata": {},
"outputs": [],
@ -59,7 +59,7 @@
},
{
"cell_type": "code",
"execution_count": 7,
"execution_count": null,
"id": "3e751f26-9b5b-4c10-843a-d784b5ea8538",
"metadata": {},
"outputs": [],
@ -69,7 +69,7 @@
},
{
"cell_type": "code",
"execution_count": 8,
"execution_count": null,
"id": "30743664-38f5-425d-8216-772b64e7f348",
"metadata": {},
"outputs": [],
@ -91,7 +91,7 @@
},
{
"cell_type": "code",
"execution_count": 9,
"execution_count": null,
"id": "f9ad627f-ced2-4277-b336-2434f22f2c8a",
"metadata": {},
"outputs": [
@ -120,7 +120,7 @@
},
{
"cell_type": "code",
"execution_count": 10,
"execution_count": null,
"id": "cf958ac2-e60e-4668-b32c-8bb2d78b3c61",
"metadata": {},
"outputs": [],
@ -140,7 +140,7 @@
},
{
"cell_type": "code",
"execution_count": 11,
"execution_count": null,
"id": "3a1d7bb8-3b72-4bb5-9013-cf7729caca61",
"metadata": {},
"outputs": [
@ -168,7 +168,7 @@
},
{
"cell_type": "code",
"execution_count": 12,
"execution_count": null,
"id": "714cb2e2-77ba-41a8-bb83-84e75342af2d",
"metadata": {},
"outputs": [
@ -196,7 +196,7 @@
},
{
"cell_type": "code",
"execution_count": 13,
"execution_count": null,
"id": "f2ca32dd-3712-4093-942b-4122f3dc8a8e",
"metadata": {},
"outputs": [
@ -232,7 +232,7 @@
},
{
"cell_type": "code",
"execution_count": 14,
"execution_count": null,
"id": "13bd1c5b-b7ba-4394-957c-7d5b5a841972",
"metadata": {
"tags": []
@ -244,7 +244,7 @@
},
{
"cell_type": "code",
"execution_count": 15,
"execution_count": null,
"id": "9d99885f-99e1-498c-904d-6db539ac9466",
"metadata": {
"tags": []
@ -259,7 +259,7 @@
},
{
"cell_type": "code",
"execution_count": 16,
"execution_count": null,
"id": "682eb5d4-0b7a-4dac-b8fb-3de4ca6e421c",
"metadata": {
"tags": []
@ -289,7 +289,7 @@
},
{
"cell_type": "code",
"execution_count": 17,
"execution_count": null,
"id": "f819c3ff-a212-4d06-a5f7-5eb1435c1feb",
"metadata": {
"tags": []
@ -311,7 +311,7 @@
},
{
"cell_type": "code",
"execution_count": 18,
"execution_count": null,
"id": "ec38fb72-90a9-4687-a483-c62c87d1f4dd",
"metadata": {
"tags": []
@ -344,7 +344,7 @@
},
{
"cell_type": "code",
"execution_count": 19,
"execution_count": null,
"id": "a0070271-0809-4528-97e0-2a88216846f3",
"metadata": {
"tags": []
@ -356,7 +356,7 @@
},
{
"cell_type": "code",
"execution_count": 20,
"execution_count": null,
"id": "0b20e9fe-f57f-4d7c-9f81-105c5f8726f4",
"metadata": {
"tags": []
@ -370,7 +370,7 @@
},
{
"cell_type": "code",
"execution_count": 21,
"execution_count": null,
"id": "630515fd-bf5c-4d9c-a404-9705308f3a2c",
"metadata": {
"tags": []
@ -392,7 +392,7 @@
},
{
"cell_type": "code",
"execution_count": 22,
"execution_count": null,
"id": "30e6bb87-42c9-4d08-88ac-0d22c9c449a1",
"metadata": {
"tags": []
@ -424,7 +424,7 @@
},
{
"cell_type": "code",
"execution_count": 23,
"execution_count": null,
"id": "658e2914-05e9-44a3-a8fe-3fe17ca84039",
"metadata": {},
"outputs": [
@ -444,6 +444,86 @@
"list(fs.yield_keys())"
]
},
{
"cell_type": "markdown",
"id": "904c1d47",
"metadata": {},
"source": [
"## Upstash Redis Store"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d0f9f212",
"metadata": {},
"outputs": [],
"source": [
"from langchain.storage.upstash_redis import UpstashRedisStore"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "45bf62e4",
"metadata": {},
"outputs": [],
"source": [
"from upstash_redis import Redis\n",
"URL = \"<UPSTASH_REDIS_REST_URL>\"\n",
"TOKEN = \"<UPSTASH_REDIS_REST_TOKEN>\"\n",
"\n",
"redis_client = Redis(url=URL, token=TOKEN)\n",
"store = UpstashRedisStore(client=redis_client, ttl=None, namespace=\"test-ns\")\n",
"\n",
"underlying_embeddings = OpenAIEmbeddings()\n",
"embedder = CacheBackedEmbeddings.from_bytes_store(\n",
" underlying_embeddings, store, namespace=underlying_embeddings.model\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3eac3504",
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"embeddings = embedder.embed_documents([\"welcome\", \"goodbye\"])"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "085dcd30",
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"embeddings = embedder.embed_documents([\"welcome\", \"goodbye\"])"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3570e83f",
"metadata": {},
"outputs": [],
"source": [
"list(store.yield_keys())"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d7dc8e51",
"metadata": {},
"outputs": [],
"source": [
"list(store.client.scan(0))"
]
},
{
"cell_type": "markdown",
"id": "cd5f5a96-6ffa-429d-aa82-00b3f6532871",
@ -455,7 +535,7 @@
},
{
"cell_type": "code",
"execution_count": 24,
"execution_count": null,
"id": "4879c134-141f-48a0-acfe-7d6f30253af0",
"metadata": {},
"outputs": [],
@ -465,7 +545,7 @@
},
{
"cell_type": "code",
"execution_count": 25,
"execution_count": null,
"id": "8b2bb9a0-6549-4487-8532-29ab4ab7336f",
"metadata": {},
"outputs": [],
@ -482,7 +562,7 @@
},
{
"cell_type": "code",
"execution_count": 26,
"execution_count": null,
"id": "eca3cb99-2bb3-49d5-81f9-1dee03da4b8c",
"metadata": {},
"outputs": [
@ -502,7 +582,7 @@
},
{
"cell_type": "code",
"execution_count": 27,
"execution_count": null,
"id": "317ba5d8-89f9-462c-b807-ad4ef26e518b",
"metadata": {},
"outputs": [
@ -522,7 +602,7 @@
},
{
"cell_type": "code",
"execution_count": 16,
"execution_count": null,
"id": "8a540317-5142-4491-9062-a097932b56e3",
"metadata": {},
"outputs": [
@ -544,7 +624,7 @@
},
{
"cell_type": "code",
"execution_count": 17,
"execution_count": null,
"id": "cd9b0d4a-f816-4dce-9dde-cde1ad9a65fb",
"metadata": {},
"outputs": [
@ -581,7 +661,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.1"
"version": "3.11.3"
}
},
"nbformat": 4,

@ -26,6 +26,7 @@ import inspect
import json
import logging
import uuid
import warnings
from datetime import timedelta
from functools import lru_cache
from typing import (
@ -53,7 +54,7 @@ except ImportError:
from langchain.llms.base import LLM, get_prompts
from langchain.load.dump import dumps
from langchain.load.load import loads
from langchain.schema import Generation
from langchain.schema import ChatGeneration, Generation
from langchain.schema.cache import RETURN_VAL_TYPE, BaseCache
from langchain.schema.embeddings import Embeddings
from langchain.utils import get_from_env
@ -260,6 +261,92 @@ class SQLiteCache(SQLAlchemyCache):
super().__init__(engine)
class UpstashRedisCache(BaseCache):
"""Cache that uses Upstash Redis as a backend."""
def __init__(self, redis_: Any, *, ttl: Optional[int] = None):
"""
Initialize an instance of UpstashRedisCache.
This method initializes an object with Upstash Redis caching capabilities.
It takes a `redis_` parameter, which should be an instance of an Upstash Redis
client class, allowing the object to interact with the Upstash Redis
server for caching purposes.
Parameters:
redis_: An instance of an Upstash Redis client class
(e.g., Redis) used for caching.
This allows the object to communicate with the
Redis server for caching operations.
ttl (int, optional): Time-to-live (TTL) for cached items in seconds.
If provided, it sets the time duration for how long cached
items will remain valid. If not provided, cached items will not
have an automatic expiration.
"""
try:
from upstash_redis import Redis
except ImportError:
raise ValueError(
"Could not import upstash_redis python package. "
"Please install it with `pip install upstash_redis`."
)
if not isinstance(redis_, Redis):
raise ValueError("Please pass in Upstash Redis object.")
self.redis = redis_
self.ttl = ttl
def _key(self, prompt: str, llm_string: str) -> str:
"""Compute key from prompt and llm_string"""
return _hash(prompt + llm_string)
def lookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]:
"""Look up based on prompt and llm_string."""
generations = []
# Read from a HASH
results = self.redis.hgetall(self._key(prompt, llm_string))
if results:
for _, text in results.items():
generations.append(Generation(text=text))
return generations if generations else None
def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None:
"""Update cache based on prompt and llm_string."""
for gen in return_val:
if not isinstance(gen, Generation):
raise ValueError(
"UpstashRedisCache supports caching of normal LLM generations, "
f"got {type(gen)}"
)
if isinstance(gen, ChatGeneration):
warnings.warn(
"NOTE: Generation has not been cached. UpstashRedisCache does not"
" support caching ChatModel outputs."
)
return
# Write to a HASH
key = self._key(prompt, llm_string)
mapping = {
str(idx): generation.text for idx, generation in enumerate(return_val)
}
self.redis.hset(key=key, values=mapping)
if self.ttl is not None:
self.redis.expire(key, self.ttl)
def clear(self, **kwargs: Any) -> None:
"""
Clear cache. If `asynchronous` is True, flush asynchronously.
This flushes the *whole* db.
"""
asynchronous = kwargs.get("asynchronous", False)
if asynchronous:
asynchronous = "ASYNC"
else:
asynchronous = "SYNC"
self.redis.flushdb(flush_type=asynchronous)
class RedisCache(BaseCache):
"""Cache that uses Redis as a backend."""

@ -44,6 +44,7 @@ from langchain.memory.chat_message_histories import (
RedisChatMessageHistory,
SQLChatMessageHistory,
StreamlitChatMessageHistory,
UpstashRedisChatMessageHistory,
XataChatMessageHistory,
ZepChatMessageHistory,
)
@ -53,6 +54,7 @@ from langchain.memory.entity import (
InMemoryEntityStore,
RedisEntityStore,
SQLiteEntityStore,
UpstashRedisEntityStore,
)
from langchain.memory.kg import ConversationKGMemory
from langchain.memory.motorhead_memory import MotorheadMemory
@ -96,4 +98,6 @@ __all__ = [
"XataChatMessageHistory",
"ZepChatMessageHistory",
"ZepMemory",
"UpstashRedisEntityStore",
"UpstashRedisChatMessageHistory",
]

@ -20,6 +20,9 @@ from langchain.memory.chat_message_histories.sql import SQLChatMessageHistory
from langchain.memory.chat_message_histories.streamlit import (
StreamlitChatMessageHistory,
)
from langchain.memory.chat_message_histories.upstash_redis import (
UpstashRedisChatMessageHistory,
)
from langchain.memory.chat_message_histories.xata import XataChatMessageHistory
from langchain.memory.chat_message_histories.zep import ZepChatMessageHistory
@ -40,4 +43,5 @@ __all__ = [
"StreamlitChatMessageHistory",
"XataChatMessageHistory",
"ZepChatMessageHistory",
"UpstashRedisChatMessageHistory",
]

@ -0,0 +1,67 @@
import json
import logging
from typing import List, Optional
from langchain.schema import (
BaseChatMessageHistory,
)
from langchain.schema.messages import BaseMessage, _message_to_dict, messages_from_dict
logger = logging.getLogger(__name__)
class UpstashRedisChatMessageHistory(BaseChatMessageHistory):
"""Chat message history stored in an Upstash Redis database."""
def __init__(
self,
session_id: str,
url: str = "",
token: str = "",
key_prefix: str = "message_store:",
ttl: Optional[int] = None,
):
try:
from upstash_redis import Redis
except ImportError:
raise ImportError(
"Could not import upstash redis python package. "
"Please install it with `pip install upstash_redis`."
)
if url == "" or token == "":
raise ValueError(
"UPSTASH_REDIS_REST_URL and UPSTASH_REDIS_REST_TOKEN are needed."
)
try:
self.redis_client = Redis(url=url, token=token)
except Exception:
logger.error("Upstash Redis instance could not be initiated.")
self.session_id = session_id
self.key_prefix = key_prefix
self.ttl = ttl
@property
def key(self) -> str:
"""Construct the record key to use"""
return self.key_prefix + self.session_id
@property
def messages(self) -> List[BaseMessage]: # type: ignore
"""Retrieve the messages from Upstash Redis"""
_items = self.redis_client.lrange(self.key, 0, -1)
items = [json.loads(m) for m in _items[::-1]]
messages = messages_from_dict(items)
return messages
def add_message(self, message: BaseMessage) -> None:
"""Append the message to the record in Upstash Redis"""
self.redis_client.lpush(self.key, json.dumps(_message_to_dict(message)))
if self.ttl:
self.redis_client.expire(self.key, self.ttl)
def clear(self) -> None:
"""Clear session memory from Upstash Redis"""
self.redis_client.delete(self.key)

@ -69,6 +69,84 @@ class InMemoryEntityStore(BaseEntityStore):
return self.store.clear()
class UpstashRedisEntityStore(BaseEntityStore):
"""Upstash Redis backed Entity store.
Entities get a TTL of 1 day by default, and
that TTL is extended by 3 days every time the entity is read back.
"""
def __init__(
self,
session_id: str = "default",
url: str = "",
token: str = "",
key_prefix: str = "memory_store",
ttl: Optional[int] = 60 * 60 * 24,
recall_ttl: Optional[int] = 60 * 60 * 24 * 3,
*args: Any,
**kwargs: Any,
):
try:
from upstash_redis import Redis
except ImportError:
raise ImportError(
"Could not import upstash_redis python package. "
"Please install it with `pip install upstash_redis`."
)
super().__init__(*args, **kwargs)
try:
self.redis_client = Redis(url=url, token=token)
except Exception:
logger.error("Upstash Redis instance could not be initiated.")
self.session_id = session_id
self.key_prefix = key_prefix
self.ttl = ttl
self.recall_ttl = recall_ttl or ttl
@property
def full_key_prefix(self) -> str:
return f"{self.key_prefix}:{self.session_id}"
def get(self, key: str, default: Optional[str] = None) -> Optional[str]:
res = (
self.redis_client.getex(f"{self.full_key_prefix}:{key}", ex=self.recall_ttl)
or default
or ""
)
logger.debug(f"Upstash Redis MEM get '{self.full_key_prefix}:{key}': '{res}'")
return res
def set(self, key: str, value: Optional[str]) -> None:
if not value:
return self.delete(key)
self.redis_client.set(f"{self.full_key_prefix}:{key}", value, ex=self.ttl)
logger.debug(
f"Redis MEM set '{self.full_key_prefix}:{key}': '{value}' EX {self.ttl}"
)
def delete(self, key: str) -> None:
self.redis_client.delete(f"{self.full_key_prefix}:{key}")
def exists(self, key: str) -> bool:
return self.redis_client.exists(f"{self.full_key_prefix}:{key}") == 1
def clear(self) -> None:
def scan_and_delete(cursor: int) -> int:
cursor, keys_to_delete = self.redis_client.scan(
cursor, f"{self.full_key_prefix}:*"
)
self.redis_client.delete(*keys_to_delete)
return cursor
cursor = scan_and_delete(0)
while cursor != 0:
scan_and_delete(cursor)
class RedisEntityStore(BaseEntityStore):
"""Redis-backed Entity store.

@ -11,6 +11,7 @@ from langchain.storage.encoder_backed import EncoderBackedStore
from langchain.storage.file_system import LocalFileStore
from langchain.storage.in_memory import InMemoryStore
from langchain.storage.redis import RedisStore
from langchain.storage.upstash_redis import UpstashRedisStore
__all__ = [
"EncoderBackedStore",
@ -19,4 +20,5 @@ __all__ = [
"RedisStore",
"create_lc_store",
"create_kv_docstore",
"UpstashRedisStore",
]

@ -0,0 +1,119 @@
from typing import Any, Iterator, List, Optional, Sequence, Tuple, cast
from langchain.schema import BaseStore
class UpstashRedisStore(BaseStore[str, str]):
"""BaseStore implementation using Upstash Redis as the underlying store."""
def __init__(
self,
*,
client: Any = None,
url: Optional[str] = None,
token: Optional[str] = None,
ttl: Optional[int] = None,
namespace: Optional[str] = None,
) -> None:
"""Initialize the UpstashRedisStore with HTTP API.
Must provide either an Upstash Redis client or a url.
Args:
client: An Upstash Redis instance
url: UPSTASH_REDIS_REST_URL
token: UPSTASH_REDIS_REST_TOKEN
ttl: time to expire keys in seconds, if provided;
if None, keys will never expire
namespace: if provided, all keys will be prefixed with this namespace
"""
try:
from upstash_redis import Redis
except ImportError as e:
raise ImportError(
"UpstashRedisStore requires the upstash_redis library to be installed. "
"pip install upstash_redis"
) from e
if client and url:
raise ValueError(
"Either an Upstash Redis client or a url must be provided, not both."
)
if client:
if not isinstance(client, Redis):
raise TypeError(
f"Expected Upstash Redis client, got {type(client).__name__}."
)
_client = client
else:
if not url or not token:
raise ValueError(
"Either an Upstash Redis client or url and token must be provided."
)
_client = Redis(url=url, token=token)
self.client = _client
if not isinstance(ttl, int) and ttl is not None:
raise TypeError(f"Expected int or None, got {type(ttl)} instead.")
self.ttl = ttl
self.namespace = namespace
def _get_prefixed_key(self, key: str) -> str:
"""Get the key with the namespace prefix.
Args:
key (str): The original key.
Returns:
str: The key with the namespace prefix.
"""
delimiter = "/"
if self.namespace:
return f"{self.namespace}{delimiter}{key}"
return key
def mget(self, keys: Sequence[str]) -> List[Optional[str]]:
"""Get the values associated with the given keys."""
keys = [self._get_prefixed_key(key) for key in keys]
return cast(
List[Optional[str]],
self.client.mget(*keys),
)
def mset(self, key_value_pairs: Sequence[Tuple[str, str]]) -> None:
"""Set the given key-value pairs."""
for key, value in key_value_pairs:
self.client.set(self._get_prefixed_key(key), value, ex=self.ttl)
def mdelete(self, keys: Sequence[str]) -> None:
"""Delete the given keys."""
_keys = [self._get_prefixed_key(key) for key in keys]
self.client.delete(*_keys)
def yield_keys(self, *, prefix: Optional[str] = None) -> Iterator[str]:
"""Yield keys in the store."""
if prefix:
pattern = self._get_prefixed_key(prefix)
else:
pattern = self._get_prefixed_key("*")
cursor, keys = self.client.scan(0, match=pattern)
for key in keys:
if self.namespace:
relative_key = key[len(self.namespace) + 1 :]
yield relative_key
else:
yield key
while cursor != 0:
cursor, keys = self.client.scan(cursor, match=pattern)
for key in keys:
if self.namespace:
relative_key = key[len(self.namespace) + 1 :]
yield relative_key
else:
yield key

@ -10131,6 +10131,21 @@ tzdata = {version = "*", markers = "platform_system == \"Windows\""}
[package.extras]
devenv = ["black", "check-manifest", "flake8", "pyroma", "pytest (>=4.3)", "pytest-cov", "pytest-mock (>=3.3)", "zest.releaser"]
[[package]]
name = "upstash-redis"
version = "0.15.0"
description = "Serverless Redis SDK from Upstash"
optional = true
python-versions = ">=3.8,<4.0"
files = [
{file = "upstash_redis-0.15.0-py3-none-any.whl", hash = "sha256:4a89913cb2bb2422610bc2a9c8d6b9a9d75d0674c22c5ea8037d35d343ee5846"},
{file = "upstash_redis-0.15.0.tar.gz", hash = "sha256:910f6a567142167b742c38efecfabf23f47e24fcbddb00a6b5845cb11064c3af"},
]
[package.dependencies]
aiohttp = ">=3.8.4,<4.0.0"
requests = ">=2.31.0,<3.0.0"
[[package]]
name = "uri-template"
version = "1.3.0"
@ -10883,7 +10898,7 @@ cli = ["typer"]
cohere = ["cohere"]
docarray = ["docarray"]
embeddings = ["sentence-transformers"]
extended-testing = ["aiosqlite", "amazon-textract-caller", "anthropic", "arxiv", "assemblyai", "atlassian-python-api", "beautifulsoup4", "bibtexparser", "cassio", "chardet", "dashvector", "esprima", "faiss-cpu", "feedparser", "geopandas", "gitpython", "gql", "html2text", "jinja2", "jq", "lxml", "markdownify", "motor", "mwparserfromhell", "mwxml", "newspaper3k", "numexpr", "openai", "openai", "openapi-schema-pydantic", "pandas", "pdfminer-six", "pgvector", "psychicapi", "py-trello", "pymupdf", "pypdf", "pypdfium2", "pyspark", "rank-bm25", "rapidfuzz", "rapidocr-onnxruntime", "requests-toolbelt", "rspace_client", "scikit-learn", "sqlite-vss", "streamlit", "sympy", "telethon", "timescale-vector", "tqdm", "xata", "xmltodict"]
extended-testing = ["aiosqlite", "amazon-textract-caller", "anthropic", "arxiv", "assemblyai", "atlassian-python-api", "beautifulsoup4", "bibtexparser", "cassio", "chardet", "dashvector", "esprima", "faiss-cpu", "feedparser", "geopandas", "gitpython", "gql", "html2text", "jinja2", "jq", "lxml", "markdownify", "motor", "mwparserfromhell", "mwxml", "newspaper3k", "numexpr", "openai", "openai", "openapi-schema-pydantic", "pandas", "pdfminer-six", "pgvector", "psychicapi", "py-trello", "pymupdf", "pypdf", "pypdfium2", "pyspark", "rank-bm25", "rapidfuzz", "rapidocr-onnxruntime", "requests-toolbelt", "rspace_client", "scikit-learn", "sqlite-vss", "streamlit", "sympy", "telethon", "timescale-vector", "tqdm", "upstash-redis", "xata", "xmltodict"]
javascript = ["esprima"]
llms = ["clarifai", "cohere", "huggingface_hub", "manifest-ml", "nlpcloud", "openai", "openlm", "torch", "transformers"]
openai = ["openai", "tiktoken"]
@ -10893,4 +10908,4 @@ text-helpers = ["chardet"]
[metadata]
lock-version = "2.0"
python-versions = ">=3.8.1,<4.0"
content-hash = "3a5bca34a60eaa9b66a4d1f9ec14de5e6a0e5ca1071a0a874499fe122cc0ee36"
content-hash = "6205031e342d6e4640b47b0b5a37fa7d11ea1915e8a3ee05c00e2e49fdec071e"

@ -139,6 +139,7 @@ typer = {version= "^0.9.0", optional = true}
anthropic = {version = "^0.3.11", optional = true}
aiosqlite = {version = "^0.19.0", optional = true}
rspace_client = {version = "^2.5.0", optional = true}
upstash-redis = {version = "^0.15.0", optional = true}
[tool.poetry.group.test.dependencies]
@ -367,6 +368,7 @@ extended_testing = [
"motor",
"timescale-vector",
"anthropic",
"upstash-redis",
"rspace_client",
]

@ -0,0 +1,91 @@
"""Test Upstash Redis cache functionality."""
import uuid
import pytest
import langchain
from langchain.cache import UpstashRedisCache
from langchain.schema import Generation, LLMResult
from tests.unit_tests.llms.fake_chat_model import FakeChatModel
from tests.unit_tests.llms.fake_llm import FakeLLM
URL = "<UPSTASH_REDIS_REST_URL>"
TOKEN = "<UPSTASH_REDIS_REST_TOKEN>"
def random_string() -> str:
return str(uuid.uuid4())
@pytest.mark.requires("upstash_redis")
def test_redis_cache_ttl() -> None:
from upstash_redis import Redis
langchain.llm_cache = UpstashRedisCache(redis_=Redis(url=URL, token=TOKEN), ttl=1)
langchain.llm_cache.update("foo", "bar", [Generation(text="fizz")])
key = langchain.llm_cache._key("foo", "bar")
assert langchain.llm_cache.redis.pttl(key) > 0
@pytest.mark.requires("upstash_redis")
def test_redis_cache() -> None:
from upstash_redis import Redis
langchain.llm_cache = UpstashRedisCache(redis_=Redis(url=URL, token=TOKEN), ttl=1)
llm = FakeLLM()
params = llm.dict()
params["stop"] = None
llm_string = str(sorted([(k, v) for k, v in params.items()]))
langchain.llm_cache.update("foo", llm_string, [Generation(text="fizz")])
output = llm.generate(["foo"])
expected_output = LLMResult(
generations=[[Generation(text="fizz")]],
llm_output={},
)
assert output == expected_output
lookup_output = langchain.llm_cache.lookup("foo", llm_string)
if lookup_output and len(lookup_output) > 0:
assert lookup_output == expected_output.generations[0]
langchain.llm_cache.clear()
output = llm.generate(["foo"])
assert output != expected_output
langchain.llm_cache.redis.flushall()
@pytest.mark.requires("upstash_redis")
def test_redis_cache_multi() -> None:
from upstash_redis import Redis
langchain.llm_cache = UpstashRedisCache(redis_=Redis(url=URL, token=TOKEN), ttl=1)
llm = FakeLLM()
params = llm.dict()
params["stop"] = None
llm_string = str(sorted([(k, v) for k, v in params.items()]))
langchain.llm_cache.update(
"foo", llm_string, [Generation(text="fizz"), Generation(text="Buzz")]
)
output = llm.generate(["foo"])
expected_output = LLMResult(
generations=[[Generation(text="fizz"), Generation(text="Buzz")]],
llm_output={},
)
assert output == expected_output
# clear the cache
langchain.llm_cache.clear()
@pytest.mark.requires("upstash_redis")
def test_redis_cache_chat() -> None:
from upstash_redis import Redis
langchain.llm_cache = UpstashRedisCache(redis_=Redis(url=URL, token=TOKEN), ttl=1)
llm = FakeChatModel()
params = llm.dict()
params["stop"] = None
with pytest.warns():
llm.predict("foo")
langchain.llm_cache.redis.flushall()

@ -0,0 +1,38 @@
import json
import pytest
from langchain.memory import ConversationBufferMemory
from langchain.memory.chat_message_histories.upstash_redis import (
UpstashRedisChatMessageHistory,
)
from langchain.schema.messages import _message_to_dict
URL = "<UPSTASH_REDIS_REST_URL>"
TOKEN = "<UPSTASH_REDIS_REST_TOKEN>"
@pytest.mark.requires("upstash_redis")
def test_memory_with_message_store() -> None:
"""Test the memory with a message store."""
# setup Upstash Redis as a message store
message_history = UpstashRedisChatMessageHistory(
url=URL, token=TOKEN, ttl=10, session_id="my-test-session"
)
memory = ConversationBufferMemory(
memory_key="baz", chat_memory=message_history, return_messages=True
)
# add some messages
memory.chat_memory.add_ai_message("This is me, the AI")
memory.chat_memory.add_user_message("This is me, the human")
# get the message history from the memory store and turn it into a json
messages = memory.chat_memory.messages
messages_json = json.dumps([_message_to_dict(msg) for msg in messages])
assert "This is me, the AI" in messages_json
assert "This is me, the human" in messages_json
# remove the record from Redis, so the next test run won't pick it up
memory.chat_memory.clear()

@ -0,0 +1,95 @@
"""Implement integration tests for Redis storage."""
import pytest
from upstash_redis import Redis
from langchain.storage.upstash_redis import UpstashRedisStore
pytest.importorskip("upstash_redis")
URL = "<UPSTASH_REDIS_REST_URL>"
TOKEN = "<UPSTASH_REDIS_REST_TOKEN>"
@pytest.fixture
def redis_client() -> Redis:
"""Yield redis client."""
from upstash_redis import Redis
# This fixture flushes the database!
client = Redis(url=URL, token=TOKEN)
try:
client.ping()
except Exception:
pytest.skip("Ping request failed. Verify that credentials are correct.")
client.flushdb()
return client
def test_mget(redis_client: Redis) -> None:
store = UpstashRedisStore(client=redis_client, ttl=None)
keys = ["key1", "key2"]
redis_client.mset({"key1": "value1", "key2": "value2"})
result = store.mget(keys)
assert result == ["value1", "value2"]
def test_mset(redis_client: Redis) -> None:
store = UpstashRedisStore(client=redis_client, ttl=None)
key_value_pairs = [("key1", "value1"), ("key2", "value2")]
store.mset(key_value_pairs)
result = redis_client.mget("key1", "key2")
assert result == ["value1", "value2"]
def test_mdelete(redis_client: Redis) -> None:
"""Test that deletion works as expected."""
store = UpstashRedisStore(client=redis_client, ttl=None)
keys = ["key1", "key2"]
redis_client.mset({"key1": "value1", "key2": "value2"})
store.mdelete(keys)
result = redis_client.mget(*keys)
assert result == [None, None]
def test_yield_keys(redis_client: Redis) -> None:
store = UpstashRedisStore(client=redis_client, ttl=None)
redis_client.mset({"key1": "value2", "key2": "value2"})
assert sorted(store.yield_keys()) == ["key1", "key2"]
assert sorted(store.yield_keys(prefix="key*")) == ["key1", "key2"]
assert sorted(store.yield_keys(prefix="lang*")) == []
def test_namespace(redis_client: Redis) -> None:
store = UpstashRedisStore(client=redis_client, ttl=None, namespace="meow")
key_value_pairs = [("key1", "value1"), ("key2", "value2")]
store.mset(key_value_pairs)
cursor, all_keys = redis_client.scan(0)
while cursor != 0:
cursor, keys = redis_client.scan(cursor)
if len(keys) != 0:
all_keys.extend(keys)
assert sorted(all_keys) == [
"meow/key1",
"meow/key2",
]
store.mdelete(["key1"])
cursor, all_keys = redis_client.scan(0, match="*")
while cursor != 0:
cursor, keys = redis_client.scan(cursor, match="*")
if len(keys) != 0:
all_keys.extend(keys)
assert sorted(all_keys) == [
"meow/key2",
]
assert list(store.yield_keys()) == ["key2"]
assert list(store.yield_keys(prefix="key*")) == ["key2"]
assert list(store.yield_keys(prefix="key1")) == []

@ -0,0 +1,8 @@
"""Light weight unit test that attempts to import UpstashRedisStore.
"""
import pytest
@pytest.mark.requires("upstash_redis")
def test_import_storage() -> None:
from langchain.storage.upstash_redis import UpstashRedisStore # noqa