diff --git a/docs/docs/integrations/llms/llm_caching.ipynb b/docs/docs/integrations/llms/llm_caching.ipynb index 55d557b8b0..3ba3107358 100644 --- a/docs/docs/integrations/llms/llm_caching.ipynb +++ b/docs/docs/integrations/llms/llm_caching.ipynb @@ -12,12 +12,12 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 9, "id": "10ad9224", "metadata": { "ExecuteTime": { - "end_time": "2024-03-18T01:01:08.425930Z", - "start_time": "2024-03-18T01:01:08.327196Z" + "end_time": "2024-04-12T02:05:57.319706Z", + "start_time": "2024-04-12T02:05:57.303868Z" } }, "outputs": [], @@ -1358,7 +1358,10 @@ "cell_type": "markdown", "id": "40624c26e86b57a4", "metadata": { - "collapsed": false + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } }, "source": [ "## Azure Cosmos DB Semantic Cache\n", @@ -1435,7 +1438,10 @@ "end_time": "2024-03-12T00:12:57.462226Z", "start_time": "2024-03-12T00:12:55.166201Z" }, - "collapsed": false + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } }, "outputs": [ { @@ -1865,6 +1871,116 @@ "source": [ "!rm .langchain.db sqlite.db" ] + }, + { + "cell_type": "markdown", + "id": "544a90cbdd9894ba", + "metadata": {}, + "source": [] + }, + { + "cell_type": "markdown", + "id": "9ecfa565038eff71", + "metadata": {}, + "source": [ + "## OpenSearch Semantic Cache\n", + "Use [OpenSearch](https://python.langchain.com/docs/integrations/vectorstores/opensearch/) as a semantic cache to cache prompts and responses and evaluate hits based on semantic similarity." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "7379fd5aa83ee500", + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T02:06:03.766873Z", + "start_time": "2024-04-12T02:06:03.754481Z" + } + }, + "outputs": [], + "source": [ + "from langchain_community.cache import OpenSearchSemanticCache\n", + "from langchain_openai import OpenAIEmbeddings\n", + "\n", + "set_llm_cache(\n", + " OpenSearchSemanticCache(\n", + " opensearch_url=\"http://localhost:9200\", embedding=OpenAIEmbeddings()\n", + " )\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "fecb26634bf27e93", + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T02:06:08.734403Z", + "start_time": "2024-04-12T02:06:07.178381Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "CPU times: user 39.4 ms, sys: 11.8 ms, total: 51.2 ms\n", + "Wall time: 1.55 s\n" + ] + }, + { + "data": { + "text/plain": [ + "\"\\n\\nWhy don't scientists trust atoms?\\n\\nBecause they make up everything.\"" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "%%time\n", + "# The first time, it is not yet in cache, so it should take longer\n", + "llm(\"Tell me a joke\")" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "43b24b725ea4ba98", + "metadata": { + "ExecuteTime": { + "end_time": "2024-04-12T02:06:12.073448Z", + "start_time": "2024-04-12T02:06:11.957571Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "CPU times: user 4.66 ms, sys: 1.1 ms, total: 5.76 ms\n", + "Wall time: 113 ms\n" + ] + }, + { + "data": { + "text/plain": [ + "\"\\n\\nWhy don't scientists trust atoms?\\n\\nBecause they make up everything.\"" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "%%time\n", + "# The second time, while not a direct hit, the question is semantically similar to the original 
question,\n", + "# so it uses the cached result!\n", + "llm(\"Tell me one joke\")" + ] } ], "metadata": { @@ -1883,7 +1999,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.4" + "version": "3.9.1" } }, "nbformat": 4, diff --git a/libs/community/langchain_community/cache.py b/libs/community/langchain_community/cache.py index 9343b8cc57..356f48800c 100644 --- a/libs/community/langchain_community/cache.py +++ b/libs/community/langchain_community/cache.py @@ -19,6 +19,7 @@ Cache directly competes with Memory. See documentation for Pros and Cons. BaseCache --> Cache # Examples: InMemoryCache, RedisCache, GPTCache """ + from __future__ import annotations import hashlib @@ -76,6 +77,9 @@ from langchain_community.utilities.astradb import ( _AstraDBCollectionEnvironment, ) from langchain_community.vectorstores import AzureCosmosDBVectorSearch +from langchain_community.vectorstores import ( + OpenSearchVectorSearch as OpenSearchVectorStore, +) from langchain_community.vectorstores.redis import Redis as RedisVectorstore logger = logging.getLogger(__file__) @@ -2049,3 +2053,105 @@ class AzureCosmosDBSemanticCache(BaseCache): def _validate_enum_value(value: Any, enum_type: Type[Enum]) -> None: if not isinstance(value, enum_type): raise ValueError(f"Invalid enum value: {value}. Expected {enum_type}.") + + +class OpenSearchSemanticCache(BaseCache): + """Cache that uses OpenSearch vector store backend""" + + def __init__( + self, opensearch_url: str, embedding: Embeddings, score_threshold: float = 0.2 + ): + """ + Args: + opensearch_url (str): URL to connect to OpenSearch. + embedding (Embedding): Embedding provider for semantic encoding and search. + score_threshold (float, 0.2): + Example: + .. code-block:: python + import langchain + from langchain.cache import OpenSearchSemanticCache + from langchain.embeddings import OpenAIEmbeddings + langchain.llm_cache = OpenSearchSemanticCache( + opensearch_url="http//localhost:9200", + embedding=OpenAIEmbeddings() + ) + """ + self._cache_dict: Dict[str, OpenSearchVectorStore] = {} + self.opensearch_url = opensearch_url + self.embedding = embedding + self.score_threshold = score_threshold + + def _index_name(self, llm_string: str) -> str: + hashed_index = _hash(llm_string) + return f"cache_{hashed_index}" + + def _get_llm_cache(self, llm_string: str) -> OpenSearchVectorStore: + index_name = self._index_name(llm_string) + + # return vectorstore client for the specific llm string + if index_name in self._cache_dict: + return self._cache_dict[index_name] + + # create new vectorstore client for the specific llm string + self._cache_dict[index_name] = OpenSearchVectorStore( + opensearch_url=self.opensearch_url, + index_name=index_name, + embedding_function=self.embedding, + ) + + # create index for the vectorstore + vectorstore = self._cache_dict[index_name] + if not vectorstore.index_exists(): + _embedding = self.embedding.embed_query(text="test") + vectorstore.create_index(len(_embedding), index_name) + return vectorstore + + def lookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]: + """Look up based on prompt and llm_string.""" + llm_cache = self._get_llm_cache(llm_string) + generations: List = [] + # Read from a Hash + results = llm_cache.similarity_search( + query=prompt, + k=1, + score_threshold=self.score_threshold, + ) + if results: + for document in results: + try: + generations.extend(loads(document.metadata["return_val"])) + except Exception: + logger.warning( + "Retrieving a cache 
value that could not be deserialized " + "properly. This is likely due to the cache being in an " + "older format. Please recreate your cache to avoid this " + "error." + ) + + generations.extend( + _load_generations_from_json(document.metadata["return_val"]) + ) + return generations if generations else None + + def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None: + """Update cache based on prompt and llm_string.""" + for gen in return_val: + if not isinstance(gen, Generation): + raise ValueError( + "OpenSearchSemanticCache only supports caching of " + f"normal LLM generations, got {type(gen)}" + ) + llm_cache = self._get_llm_cache(llm_string) + metadata = { + "llm_string": llm_string, + "prompt": prompt, + "return_val": dumps([g for g in return_val]), + } + llm_cache.add_texts(texts=[prompt], metadatas=[metadata]) + + def clear(self, **kwargs: Any) -> None: + """Clear semantic cache for a given llm_string.""" + index_name = self._index_name(kwargs["llm_string"]) + if index_name in self._cache_dict: + self._cache_dict[index_name].delete_index(index_name=index_name) + del self._cache_dict[index_name] diff --git a/libs/community/langchain_community/vectorstores/opensearch_vector_search.py b/libs/community/langchain_community/vectorstores/opensearch_vector_search.py index 5f28de34ef..c7372f65fc 100644 --- a/libs/community/langchain_community/vectorstores/opensearch_vector_search.py +++ b/libs/community/langchain_community/vectorstores/opensearch_vector_search.py @@ -274,10 +274,12 @@ def _default_approximate_search_query( query_vector: List[float], k: int = 4, vector_field: str = "vector_field", + score_threshold: Optional[float] = 0.0, ) -> Dict: """For Approximate k-NN Search, this is the default query.""" return { "size": k, + "min_score": score_threshold, "query": {"knn": {vector_field: {"vector": query_vector, "k": k}}}, } @@ -288,10 +290,12 @@ def _approximate_search_query_with_boolean_filter( k: int = 4, vector_field: str = "vector_field", subquery_clause: str = "must", + score_threshold: Optional[float] = 0.0, ) -> Dict: """For Approximate k-NN Search, with Boolean Filter.""" return { "size": k, + "min_score": score_threshold, "query": { "bool": { "filter": boolean_filter, @@ -308,11 +312,12 @@ def _approximate_search_query_with_efficient_filter( efficient_filter: Dict, k: int = 4, vector_field: str = "vector_field", + score_threshold: Optional[float] = 0.0, ) -> Dict: """For Approximate k-NN Search, with Efficient Filter for Lucene and Faiss Engines.""" search_query = _default_approximate_search_query( - query_vector, k=k, vector_field=vector_field + query_vector, k=k, vector_field=vector_field, score_threshold=score_threshold ) search_query["query"]["knn"][vector_field]["filter"] = efficient_filter return search_query @@ -324,6 +329,7 @@ def _default_script_query( space_type: str = "l2", pre_filter: Optional[Dict] = None, vector_field: str = "vector_field", + score_threshold: Optional[float] = 0.0, ) -> Dict: """For Script Scoring Search, this is the default query.""" @@ -332,6 +338,7 @@ def _default_script_query( return { "size": k, + "min_score": score_threshold, "query": { "script_score": { "query": pre_filter, @@ -368,6 +375,7 @@ def _default_painless_scripting_query( space_type: str = "l2Squared", pre_filter: Optional[Dict] = None, vector_field: str = "vector_field", + score_threshold: Optional[float] = 0.0, ) -> Dict: """For Painless Scripting Search, this is the default query.""" @@ -377,6 +385,7 @@ def _default_painless_scripting_query( source 
= __get_painless_scripting_source(space_type, vector_field=vector_field) return { "size": k, + "min_score": score_threshold, "query": { "script_score": { "query": pre_filter, @@ -509,6 +518,72 @@ class OpenSearchVectorSearch(VectorStore): is_aoss=self.is_aoss, ) + def delete_index(self, index_name: Optional[str] = None) -> Optional[bool]: + """Deletes a given index from vectorstore.""" + if index_name is None: + if self.index_name is None: + raise ValueError("index_name must be provided.") + index_name = self.index_name + try: + self.client.indices.delete(index=index_name) + return True + except Exception as e: + raise e + + def index_exists(self, index_name: Optional[str] = None) -> Optional[bool]: + """If given index present in vectorstore, returns True else False.""" + if index_name is None: + if self.index_name is None: + raise ValueError("index_name must be provided.") + index_name = self.index_name + + return self.client.indices.exists(index=index_name) + + def create_index( + self, + dimension: int, + index_name: Optional[str] = uuid.uuid4().hex, + **kwargs: Any, + ) -> Optional[str]: + """Create a new Index with given arguments""" + is_appx_search = kwargs.get("is_appx_search", True) + vector_field = kwargs.get("vector_field", "vector_field") + kwargs.get("text_field", "text") + http_auth = kwargs.get("http_auth") + is_aoss = _is_aoss_enabled(http_auth=http_auth) + + if is_aoss and not is_appx_search: + raise ValueError( + "Amazon OpenSearch Service Serverless only " + "supports `approximate_search`" + ) + + if is_appx_search: + engine = kwargs.get("engine", "nmslib") + space_type = kwargs.get("space_type", "l2") + ef_search = kwargs.get("ef_search", 512) + ef_construction = kwargs.get("ef_construction", 512) + m = kwargs.get("m", 16) + + _validate_aoss_with_engines(is_aoss, engine) + + mapping = _default_text_mapping( + dimension, + engine, + space_type, + ef_search, + ef_construction, + m, + vector_field, + ) + else: + mapping = _default_scripting_text_mapping(dimension) + + if self.index_exists(index_name): + raise RuntimeError(f"The index, {index_name} already exists.") + self.client.indices.create(index=index_name, body=mapping) + return index_name + def add_texts( self, texts: Iterable[str], @@ -659,7 +734,11 @@ class OpenSearchVectorSearch(VectorStore): ) def similarity_search( - self, query: str, k: int = 4, **kwargs: Any + self, + query: str, + k: int = 4, + score_threshold: Optional[float] = 0.0, + **kwargs: Any, ) -> List[Document]: """Return docs most similar to query. @@ -669,6 +748,8 @@ class OpenSearchVectorSearch(VectorStore): Args: query: Text to look up documents similar to. k: Number of Documents to return. Defaults to 4. + score_threshold: Specify a score threshold to return only documents + above the threshold. Defaults to 0.0. Returns: List of Documents most similar to the query. 
@@ -717,20 +798,30 @@ class OpenSearchVectorSearch(VectorStore): pre_filter: script_score query to pre-filter documents before identifying nearest neighbors; default: {"match_all": {}} """ - docs_with_scores = self.similarity_search_with_score(query, k, **kwargs) + docs_with_scores = self.similarity_search_with_score( + query, k, score_threshold, **kwargs + ) return [doc[0] for doc in docs_with_scores] def similarity_search_by_vector( - self, embedding: List[float], k: int = 4, **kwargs: Any + self, + embedding: List[float], + k: int = 4, + score_threshold: Optional[float] = 0.0, + **kwargs: Any, ) -> List[Document]: """Return docs most similar to the embedding vector.""" docs_with_scores = self.similarity_search_with_score_by_vector( - embedding, k, **kwargs + embedding, k, score_threshold, **kwargs ) return [doc[0] for doc in docs_with_scores] def similarity_search_with_score( - self, query: str, k: int = 4, **kwargs: Any + self, + query: str, + k: int = 4, + score_threshold: Optional[float] = 0.0, + **kwargs: Any, ) -> List[Tuple[Document, float]]: """Return docs and it's scores most similar to query. @@ -740,6 +831,8 @@ class OpenSearchVectorSearch(VectorStore): Args: query: Text to look up documents similar to. k: Number of Documents to return. Defaults to 4. + score_threshold: Specify a score threshold to return only documents + above the threshold. Defaults to 0.0. Returns: List of Documents along with its scores most similar to the query. @@ -748,10 +841,16 @@ class OpenSearchVectorSearch(VectorStore): same as `similarity_search` """ embedding = self.embedding_function.embed_query(query) - return self.similarity_search_with_score_by_vector(embedding, k, **kwargs) + return self.similarity_search_with_score_by_vector( + embedding, k, score_threshold, **kwargs + ) def similarity_search_with_score_by_vector( - self, embedding: List[float], k: int = 4, **kwargs: Any + self, + embedding: List[float], + k: int = 4, + score_threshold: Optional[float] = 0.0, + **kwargs: Any, ) -> List[Tuple[Document, float]]: """Return docs and it's scores most similar to the embedding vector. @@ -761,6 +860,8 @@ class OpenSearchVectorSearch(VectorStore): Args: embedding: Embedding vector to look up documents similar to. k: Number of Documents to return. Defaults to 4. + score_threshold: Specify a score threshold to return only documents + above the threshold. Defaults to 0.0. Returns: List of Documents along with its scores most similar to the query. @@ -772,7 +873,7 @@ class OpenSearchVectorSearch(VectorStore): metadata_field = kwargs.get("metadata_field", "metadata") hits = self._raw_similarity_search_with_score_by_vector( - embedding=embedding, k=k, **kwargs + embedding=embedding, k=k, score_threshold=score_threshold, **kwargs ) documents_with_scores = [ @@ -792,7 +893,11 @@ class OpenSearchVectorSearch(VectorStore): return documents_with_scores def _raw_similarity_search_with_score_by_vector( - self, embedding: List[float], k: int = 4, **kwargs: Any + self, + embedding: List[float], + k: int = 4, + score_threshold: Optional[float] = 0.0, + **kwargs: Any, ) -> List[dict]: """Return raw opensearch documents (dict) including vectors, scores most similar to the embedding vector. @@ -803,6 +908,8 @@ class OpenSearchVectorSearch(VectorStore): Args: embedding: Embedding vector to look up documents similar to. k: Number of Documents to return. Defaults to 4. + score_threshold: Specify a score threshold to return only documents + above the threshold. Defaults to 0.0. 
Returns: List of dict with its scores most similar to the embedding. @@ -868,10 +975,15 @@ class OpenSearchVectorSearch(VectorStore): k=k, vector_field=vector_field, subquery_clause=subquery_clause, + score_threshold=score_threshold, ) elif efficient_filter != {}: search_query = _approximate_search_query_with_efficient_filter( - embedding, efficient_filter, k=k, vector_field=vector_field + embedding, + efficient_filter, + k=k, + vector_field=vector_field, + score_threshold=score_threshold, ) elif lucene_filter != {}: warnings.warn( @@ -879,23 +991,40 @@ class OpenSearchVectorSearch(VectorStore): " `efficient_filter`" ) search_query = _approximate_search_query_with_efficient_filter( - embedding, lucene_filter, k=k, vector_field=vector_field + embedding, + lucene_filter, + k=k, + vector_field=vector_field, + score_threshold=score_threshold, ) else: search_query = _default_approximate_search_query( - embedding, k=k, vector_field=vector_field + embedding, + k=k, + vector_field=vector_field, + score_threshold=score_threshold, ) elif search_type == SCRIPT_SCORING_SEARCH: space_type = kwargs.get("space_type", "l2") pre_filter = kwargs.get("pre_filter", MATCH_ALL_QUERY) search_query = _default_script_query( - embedding, k, space_type, pre_filter, vector_field + embedding, + k, + space_type, + pre_filter, + vector_field, + score_threshold=score_threshold, ) elif search_type == PAINLESS_SCRIPTING_SEARCH: space_type = kwargs.get("space_type", "l2Squared") pre_filter = kwargs.get("pre_filter", MATCH_ALL_QUERY) search_query = _default_painless_scripting_query( - embedding, k, space_type, pre_filter, vector_field + embedding, + k, + space_type, + pre_filter, + vector_field, + score_threshold=score_threshold, ) else: raise ValueError("Invalid `search_type` provided as an argument") diff --git a/libs/langchain/tests/integration_tests/cache/test_opensearch_cache.py b/libs/langchain/tests/integration_tests/cache/test_opensearch_cache.py new file mode 100644 index 0000000000..eda3297e86 --- /dev/null +++ b/libs/langchain/tests/integration_tests/cache/test_opensearch_cache.py @@ -0,0 +1,59 @@ +from langchain_community.cache import OpenSearchSemanticCache +from langchain_core.outputs import Generation + +from langchain.globals import get_llm_cache, set_llm_cache +from tests.integration_tests.cache.fake_embeddings import ( + FakeEmbeddings, +) +from tests.unit_tests.llms.fake_llm import FakeLLM + +DEFAULT_OPENSEARCH_URL = "http://localhost:9200" + + +def test_opensearch_semantic_cache() -> None: + """Test opensearch semantic cache functionality.""" + set_llm_cache( + OpenSearchSemanticCache( + embedding=FakeEmbeddings(), + opensearch_url=DEFAULT_OPENSEARCH_URL, + score_threshold=0.0, + ) + ) + llm = FakeLLM() + params = llm.dict() + params["stop"] = None + llm_string = str(sorted([(k, v) for k, v in params.items()])) + get_llm_cache().update("foo", llm_string, [Generation(text="fizz")]) + cache_output = get_llm_cache().lookup("bar", llm_string) + assert cache_output == [Generation(text="fizz")] + + get_llm_cache().clear(llm_string=llm_string) + output = get_llm_cache().lookup("bar", llm_string) + assert output != [Generation(text="fizz")] + + +def test_opensearch_semantic_cache_multi() -> None: + set_llm_cache( + OpenSearchSemanticCache( + embedding=FakeEmbeddings(), + opensearch_url=DEFAULT_OPENSEARCH_URL, + score_threshold=0.0, + ) + ) + + llm = FakeLLM() + params = llm.dict() + params["stop"] = None + llm_string = str(sorted([(k, v) for k, v in params.items()])) + get_llm_cache().update( + "foo", llm_string, 
[Generation(text="fizz"), Generation(text="Buzz")] + ) + + # foo and bar will have the same embedding produced by FakeEmbeddings + cache_output = get_llm_cache().lookup("bar", llm_string) + assert cache_output == [Generation(text="fizz"), Generation(text="Buzz")] + + # clear the cache + get_llm_cache().clear(llm_string=llm_string) + output = get_llm_cache().lookup("bar", llm_string) + assert output != [Generation(text="fizz"), Generation(text="Buzz")]
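
Reviewer note: a minimal end-to-end sketch of how the new `OpenSearchSemanticCache` and the `score_threshold` plumbing above are meant to be used together. This is not part of the diff; it assumes a local OpenSearch node at http://localhost:9200, OpenAI credentials in the environment, and an illustrative model name.

```python
# Sketch only -- assumes a local OpenSearch node and OpenAI credentials.
from langchain.globals import set_llm_cache
from langchain_community.cache import OpenSearchSemanticCache
from langchain_openai import OpenAI, OpenAIEmbeddings

# score_threshold is forwarded to OpenSearch as `min_score`, so only documents
# scoring at or above it are treated as semantic cache hits.
set_llm_cache(
    OpenSearchSemanticCache(
        opensearch_url="http://localhost:9200",
        embedding=OpenAIEmbeddings(),
        score_threshold=0.2,
    )
)

llm = OpenAI(model_name="gpt-3.5-turbo-instruct")  # illustrative model name
llm.invoke("Tell me a joke")    # cache miss: calls the model and stores the result
llm.invoke("Tell me one joke")  # semantically similar prompt: served from the cache
```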
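The cache is built on the new index-management helpers added to `OpenSearchVectorSearch`. Used directly, they look roughly like the sketch below; the index name, metadata, and threshold value are illustrative, not taken from the diff.

```python
from langchain_community.vectorstores import OpenSearchVectorSearch
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
store = OpenSearchVectorSearch(
    opensearch_url="http://localhost:9200",
    index_name="cache_demo",  # illustrative index name
    embedding_function=embeddings,
)

# Mirror what OpenSearchSemanticCache._get_llm_cache() does on first use:
# probe the embedding dimension, then create the index if it is missing.
if not store.index_exists():
    dimension = len(embeddings.embed_query("test"))
    store.create_index(dimension, "cache_demo")

store.add_texts(["Tell me a joke"], metadatas=[{"source": "demo"}])

# Only hits scoring at or above score_threshold (sent as `min_score`) are returned.
docs = store.similarity_search("Tell me one joke", k=1, score_threshold=0.2)

store.delete_index(index_name="cache_demo")  # clean up the demo index
```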