mirror of https://github.com/hwchase17/langchain
feat: add momento vector index as a vector store provider (#11567)
**Description**: - Added Momento Vector Index (MVI) as a vector store provider. This includes an implementation with docstrings, integration tests, a notebook, and documentation on the docs pages. - Updated the Momento dependency in pyproject.toml and the lock file to enable access to MVI. - Refactored the Momento cache and chat history session store to prefer using "MOMENTO_API_KEY" over "MOMENTO_AUTH_TOKEN" for consistency with MVI. This change is backwards compatible with the previous "auth_token" variable usage. Updated the code and tests accordingly. **Dependencies**: - Updated Momento dependency in pyproject.toml. **Testing**: - Run the integration tests with a Momento API key. Get one at the [Momento Console](https://console.gomomento.com) for free. MVI is available in AWS us-west-2 with a superuser key. - `MOMENTO_API_KEY=<your key> poetry run pytest tests/integration_tests/vectorstores/test_momento_vector_index.py` **Tag maintainer:** @eyurtsev **Twitter handle**: Please mention @momentohq for this addition to langchain. With the integration of Momento Vector Index, Momento caching, and session store, Momento provides serverless support for the core langchain data needs. Also mention @mlonml for the integration.pull/11553/head
parent
ca2eed36b7
commit
8e45f720a8
@ -0,0 +1,389 @@
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Iterable,
|
||||
List,
|
||||
Optional,
|
||||
Tuple,
|
||||
Type,
|
||||
TypeVar,
|
||||
cast,
|
||||
)
|
||||
from uuid import uuid4
|
||||
|
||||
from langchain.docstore.document import Document
|
||||
from langchain.schema.embeddings import Embeddings
|
||||
from langchain.schema.vectorstore import VectorStore
|
||||
from langchain.utils import get_from_env
|
||||
from langchain.vectorstores.utils import DistanceStrategy
|
||||
|
||||
VST = TypeVar("VST", bound="VectorStore")
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from momento import PreviewVectorIndexClient
|
||||
|
||||
|
||||
class MomentoVectorIndex(VectorStore):
    """`Momento Vector Index` (MVI) vector store.

    Momento Vector Index is a serverless vector index that can be used to store and
    search vectors. To use you should have the ``momento`` python package installed.

    Example:
        .. code-block:: python

            from langchain.embeddings import OpenAIEmbeddings
            from langchain.vectorstores import MomentoVectorIndex
            from momento import (
                CredentialProvider,
                PreviewVectorIndexClient,
                VectorIndexConfigurations,
            )

            vectorstore = MomentoVectorIndex(
                embedding=OpenAIEmbeddings(),
                client=PreviewVectorIndexClient(
                    VectorIndexConfigurations.Default.latest(),
                    credential_provider=CredentialProvider.from_environment_variable(
                        "MOMENTO_API_KEY"
                    ),
                ),
                index_name="my-index",
            )
    """

    def __init__(
        self,
        embedding: Embeddings,
        client: "PreviewVectorIndexClient",
        index_name: str = "default",
        distance_strategy: DistanceStrategy = DistanceStrategy.COSINE,
        text_field: str = "text",
        ensure_index_exists: bool = True,
        **kwargs: Any,
    ):
        """Initialize a Vector Store backed by Momento Vector Index.

        Args:
            embedding (Embeddings): The embedding function to use.
            client (PreviewVectorIndexClient): The Momento Vector Index client to use.
            index_name (str, optional): The name of the index to store the documents in.
                Defaults to "default".
            distance_strategy (DistanceStrategy, optional): The distance strategy to
                use. Defaults to DistanceStrategy.COSINE. If you select
                DistanceStrategy.EUCLIDEAN_DISTANCE, Momento uses the squared
                Euclidean distance.
            text_field (str, optional): The name of the metadata field to store the
                original text in. Defaults to "text".
            ensure_index_exists (bool, optional): Whether to ensure that the index
                exists before adding documents to it. Defaults to True.

        Raises:
            ImportError: If the ``momento`` package is not installed.
            ValueError: If the distance strategy is not supported.
        """
        try:
            from momento import PreviewVectorIndexClient
        except ImportError:
            raise ImportError(
                "Could not import momento python package. "
                "Please install it with `pip install momento`."
            )

        self._client: PreviewVectorIndexClient = client
        self._embedding = embedding
        self.index_name = index_name
        self.__validate_distance_strategy(distance_strategy)
        self.distance_strategy = distance_strategy
        self.text_field = text_field
        self._ensure_index_exists = ensure_index_exists

    @staticmethod
    def __validate_distance_strategy(distance_strategy: DistanceStrategy) -> None:
        """Raise ValueError if ``distance_strategy`` is not supported by MVI."""
        # Fix: the original listed MAX_INNER_PRODUCT twice and omitted
        # EUCLIDEAN_DISTANCE, even though _create_index_if_not_exists maps it
        # to a Momento similarity metric.
        if distance_strategy not in (
            DistanceStrategy.COSINE,
            DistanceStrategy.MAX_INNER_PRODUCT,
            DistanceStrategy.EUCLIDEAN_DISTANCE,
        ):
            raise ValueError(f"Distance strategy {distance_strategy} not implemented.")

    @property
    def embeddings(self) -> Embeddings:
        """Return the embedding function used by this vector store."""
        return self._embedding

    def _create_index_if_not_exists(self, num_dimensions: int) -> bool:
        """Create the index if it does not already exist.

        Args:
            num_dimensions (int): Dimensionality of the vectors the index stores.

        Returns:
            bool: True if the index was created, False if it already existed.

        Raises:
            ValueError: If the configured distance strategy has no Momento
                similarity metric equivalent.
        """
        from momento.requests.vector_index import SimilarityMetric
        from momento.responses.vector_index import CreateIndex

        # Map the LangChain distance strategy onto Momento's similarity metric.
        if self.distance_strategy == DistanceStrategy.COSINE:
            similarity_metric = SimilarityMetric.COSINE_SIMILARITY
        elif self.distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
            similarity_metric = SimilarityMetric.INNER_PRODUCT
        elif self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
            similarity_metric = SimilarityMetric.EUCLIDEAN_SIMILARITY
        else:
            raise ValueError(
                f"Distance strategy {self.distance_strategy} not implemented."
            )

        response = self._client.create_index(
            self.index_name, num_dimensions, similarity_metric
        )
        if isinstance(response, CreateIndex.Success):
            return True
        elif isinstance(response, CreateIndex.IndexAlreadyExists):
            return False
        elif isinstance(response, CreateIndex.Error):
            raise response.inner_exception
        else:
            raise Exception(f"Unexpected response: {response}")

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts (Iterable[str]): Iterable of strings to add to the vectorstore.
            metadatas (Optional[List[dict]]): Optional list of metadatas associated with
                the texts.
            kwargs (Any): Other optional parameters. Specifically:
            - ids (List[str], optional): List of ids to use for the texts.
                Defaults to None, in which case uuids are generated.

        Returns:
            List[str]: List of ids from adding the texts into the vectorstore.

        Raises:
            ValueError: If ``ids`` is supplied but its length does not match
                the number of texts.
        """
        from momento.requests.vector_index import Item
        from momento.responses.vector_index import UpsertItemBatch

        texts = list(texts)

        if not texts:
            return []

        # Store the original text alongside the user metadata. Build fresh dicts
        # rather than mutating the caller's metadata objects (the original wrote
        # into them in place).
        if metadatas is not None:
            metadatas = [
                {**metadata, self.text_field: text}
                for metadata, text in zip(metadatas, texts)
            ]
        else:
            metadatas = [{self.text_field: text} for text in texts]

        try:
            embeddings = self._embedding.embed_documents(texts)
        except NotImplementedError:
            # Some Embeddings implementations only provide embed_query.
            embeddings = [self._embedding.embed_query(x) for x in texts]

        # Create index if it does not exist.
        # We assume that if it does exist, then it was created with the desired
        # number of dimensions and similarity metric.
        if self._ensure_index_exists:
            self._create_index_if_not_exists(len(embeddings[0]))

        if "ids" in kwargs:
            ids = kwargs["ids"]
            if len(ids) != len(embeddings):
                raise ValueError("Number of ids must match number of texts")
        else:
            ids = [str(uuid4()) for _ in range(len(embeddings))]

        # Upsert in fixed-size batches to keep individual requests small.
        batch_size = 128
        for start in range(0, len(embeddings), batch_size):
            end = min(start + batch_size, len(embeddings))
            items = [
                Item(id=id_, vector=vector, metadata=metadata)
                for id_, vector, metadata in zip(
                    ids[start:end],
                    embeddings[start:end],
                    metadatas[start:end],
                )
            ]

            response = self._client.upsert_item_batch(self.index_name, items)
            if isinstance(response, UpsertItemBatch.Success):
                pass
            elif isinstance(response, UpsertItemBatch.Error):
                raise response.inner_exception
            else:
                raise Exception(f"Unexpected response: {response}")

        return ids

    def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
        """Delete by vector ID.

        Args:
            ids (List[str]): List of ids to delete.
            kwargs (Any): Other optional parameters (unused)

        Returns:
            Optional[bool]: True if deletion is successful,
            False otherwise, None if not implemented.
        """
        from momento.responses.vector_index import DeleteItemBatch

        # Nothing to delete; treat as trivially successful.
        if ids is None:
            return True
        response = self._client.delete_item_batch(self.index_name, ids)
        return isinstance(response, DeleteItemBatch.Success)

    def similarity_search(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> List[Document]:
        """Search for similar documents to the query string.

        Args:
            query (str): The query string to search for.
            k (int, optional): The number of results to return. Defaults to 4.

        Returns:
            List[Document]: A list of documents that are similar to the query.
        """
        res = self.similarity_search_with_score(query=query, k=k, **kwargs)
        return [doc for doc, _ in res]

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Search for similar documents to the query string.

        Args:
            query (str): The query string to search for.
            k (int, optional): The number of results to return. Defaults to 4.
            kwargs (Any): Vector Store specific search parameters. The following are
                forwarded to the Momento Vector Index:
            - top_k (int, optional): The number of results to return.

        Returns:
            List[Tuple[Document, float]]: A list of tuples of the form
                (Document, score).
        """
        embedding = self._embedding.embed_query(query)

        results = self.similarity_search_with_score_by_vector(
            embedding=embedding, k=k, **kwargs
        )
        return results

    def similarity_search_with_score_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Search for similar documents to the query vector.

        Args:
            embedding (List[float]): The query vector to search for.
            k (int, optional): The number of results to return. Defaults to 4.
            kwargs (Any): Vector Store specific search parameters. The following are
                forwarded to the Momento Vector Index:
            - top_k (int, optional): The number of results to return.

        Returns:
            List[Tuple[Document, float]]: A list of tuples of the form
                (Document, score).
        """
        from momento.requests.vector_index import ALL_METADATA
        from momento.responses.vector_index import Search

        # Fix: the original read kwargs["k"] here, which raised KeyError whenever
        # a caller actually supplied the documented "top_k" override.
        if "top_k" in kwargs:
            k = kwargs["top_k"]
        response = self._client.search(
            self.index_name, embedding, top_k=k, metadata_fields=ALL_METADATA
        )

        if not isinstance(response, Search.Success):
            return []

        results = []
        for hit in response.hits:
            # The original text is stored under self.text_field; pop it so the
            # returned Document metadata contains only user metadata.
            text = cast(str, hit.metadata.pop(self.text_field))
            doc = Document(page_content=text, metadata=hit.metadata)
            pair = (doc, hit.distance)
            results.append(pair)

        return results

    def similarity_search_by_vector(
        self, embedding: List[float], k: int = 4, **kwargs: Any
    ) -> List[Document]:
        """Search for similar documents to the query vector.

        Args:
            embedding (List[float]): The query vector to search for.
            k (int, optional): The number of results to return. Defaults to 4.

        Returns:
            List[Document]: A list of documents that are similar to the query.
        """
        results = self.similarity_search_with_score_by_vector(
            embedding=embedding, k=k, **kwargs
        )
        return [doc for doc, _ in results]

    @classmethod
    def from_texts(
        cls: Type[VST],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> VST:
        """Return the Vector Store initialized from texts and embeddings.

        Args:
            cls (Type[VST]): The Vector Store class to use to initialize
                the Vector Store.
            texts (List[str]): The texts to initialize the Vector Store with.
            embedding (Embeddings): The embedding function to use.
            metadatas (Optional[List[dict]], optional): The metadata associated with
                the texts. Defaults to None.
            kwargs (Any): Vector Store specific parameters. The following are forwarded
                to the Vector Store constructor and required:
            - index_name (str, optional): The name of the index to store the documents
                in. Defaults to "default".
            - text_field (str, optional): The name of the metadata field to store the
                original text in. Defaults to "text".
            - distance_strategy (DistanceStrategy, optional): The distance strategy to
                use. Defaults to DistanceStrategy.COSINE. If you select
                DistanceStrategy.EUCLIDEAN_DISTANCE, Momento uses the squared
                Euclidean distance.
            - ensure_index_exists (bool, optional): Whether to ensure that the index
                exists before adding documents to it. Defaults to True.
            Additionally you can either pass in a client or an API key
            - client (PreviewVectorIndexClient): The Momento Vector Index client to use.
            - api_key (Optional[str]): The configuration to use to initialize
                the Vector Index with. Defaults to None. If None, the configuration
                is initialized from the environment variable `MOMENTO_API_KEY`.

        Returns:
            VST: Momento Vector Index vector store initialized from texts and
                embeddings.
        """
        from momento import (
            CredentialProvider,
            PreviewVectorIndexClient,
            VectorIndexConfigurations,
        )

        # Prefer an explicitly provided client; otherwise build one from the
        # supplied api_key or the MOMENTO_API_KEY environment variable.
        if "client" in kwargs:
            client = kwargs.pop("client")
        else:
            supplied_api_key = kwargs.pop("api_key", None)
            api_key = supplied_api_key or get_from_env("api_key", "MOMENTO_API_KEY")
            client = PreviewVectorIndexClient(
                configuration=VectorIndexConfigurations.Default.latest(),
                credential_provider=CredentialProvider.from_string(api_key),
            )
        vector_db = cls(embedding=embedding, client=client, **kwargs)  # type: ignore
        vector_db.add_texts(texts=texts, metadatas=metadatas, **kwargs)
        return vector_db
|
@ -0,0 +1,164 @@
|
||||
import time
|
||||
import uuid
|
||||
from typing import Iterator, List
|
||||
|
||||
import pytest
|
||||
|
||||
from langchain.docstore.document import Document
|
||||
from langchain.embeddings import OpenAIEmbeddings
|
||||
from langchain.vectorstores import MomentoVectorIndex
|
||||
|
||||
API_KEY_ENV_VAR = "MOMENTO_API_KEY"
|
||||
|
||||
|
||||
def random_string() -> str:
    """Generate a fresh random identifier (a UUID4 rendered as a string)."""
    return f"{uuid.uuid4()}"
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
def random_index_name() -> str:
    """Provide a unique index name so concurrent test runs cannot collide."""
    suffix = random_string()
    return f"langchain-test-index-{suffix}"
|
||||
|
||||
|
||||
def wait(seconds: float = 1.0) -> None:
    """Sleep to allow the vector index to become consistent after writes.

    Args:
        seconds (float, optional): How long to sleep. Defaults to 1.0, the
            value the original hard-coded; parameterized so callers can tune
            or disable the wait.
    """
    time.sleep(seconds)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
def vector_store(
    embedding_openai: OpenAIEmbeddings, random_index_name: str
) -> Iterator[MomentoVectorIndex]:
    """Yield a MomentoVectorIndex on a fresh index, deleting the index on teardown."""
    from momento import (
        CredentialProvider,
        PreviewVectorIndexClient,
        VectorIndexConfigurations,
    )

    store = None
    try:
        credential_provider = CredentialProvider.from_environment_variable(
            API_KEY_ENV_VAR
        )
        client = PreviewVectorIndexClient(
            VectorIndexConfigurations.Default.latest(),
            credential_provider=credential_provider,
        )
        store = MomentoVectorIndex(
            embedding=embedding_openai,
            client=client,
            index_name=random_index_name,
        )
        yield store
    finally:
        # Clean up the index even if construction of the store partially failed.
        if store is not None:
            store._client.delete_index(random_index_name)
|
||||
|
||||
|
||||
def test_from_texts(
    random_index_name: str, embedding_openai: OpenAIEmbeddings, texts: List[str]
) -> None:
    """Test end to end construction and search via ``from_texts``.

    ``from_texts`` reads the API key from the ``MOMENTO_API_KEY`` environment
    variable and builds its own client; the ``configuration`` and
    ``credential_provider`` kwargs the original test passed were silently
    ignored by ``from_texts``, so they (and the momento imports that built
    them) are dropped here.
    """
    random_text = random_string()
    random_document = f"Hello world {random_text} goodbye world!"
    texts.insert(0, random_document)

    vector_store = None
    try:
        vector_store = MomentoVectorIndex.from_texts(
            texts=texts,
            embedding=embedding_openai,
            index_name=random_index_name,
        )
        wait()

        documents = vector_store.similarity_search(query=random_text, k=1)
        assert documents == [Document(page_content=random_document)]
    finally:
        if vector_store is not None:
            vector_store._client.delete_index(random_index_name)
|
||||
|
||||
|
||||
def test_from_texts_with_metadatas(
    random_index_name: str, embedding_openai: OpenAIEmbeddings, texts: List[str]
) -> None:
    """Test end to end construction and search with metadata.

    ``from_texts`` reads the API key from the ``MOMENTO_API_KEY`` environment
    variable and builds its own client; the ``configuration`` and
    ``credential_provider`` kwargs the original test passed were silently
    ignored by ``from_texts``, so they (and the momento imports that built
    them) are dropped here.
    """
    random_text = random_string()
    random_document = f"Hello world {random_text} goodbye world!"
    texts.insert(0, random_document)
    metadatas = [{"page": f"{i}", "source": "user"} for i in range(len(texts))]

    vector_store = None
    try:
        vector_store = MomentoVectorIndex.from_texts(
            texts=texts,
            embedding=embedding_openai,
            index_name=random_index_name,
            metadatas=metadatas,
        )

        wait()
        documents = vector_store.similarity_search(query=random_text, k=1)
        assert documents == [
            Document(
                page_content=random_document, metadata={"page": "0", "source": "user"}
            )
        ]
    finally:
        if vector_store is not None:
            vector_store._client.delete_index(random_index_name)
|
||||
|
||||
|
||||
def test_from_texts_with_scores(vector_store: MomentoVectorIndex) -> None:
    """Test end to end construction and search with scores.

    The original had this docstring commented out; restored as a real
    docstring so test runners and readers see the intent.
    """
    texts = ["apple", "orange", "hammer"]
    metadatas = [{"page": f"{i}"} for i in range(len(texts))]

    vector_store.add_texts(texts=texts, metadatas=metadatas)
    wait()
    search_results = vector_store.similarity_search_with_score("apple", k=3)
    docs = [doc for doc, _ in search_results]
    scores = [score for _, score in search_results]

    # Results come back ordered by similarity to "apple": exact match first,
    # then the other fruit, then the unrelated word.
    assert docs == [
        Document(page_content="apple", metadata={"page": "0"}),
        Document(page_content="orange", metadata={"page": "1"}),
        Document(page_content="hammer", metadata={"page": "2"}),
    ]
    assert scores[0] > scores[1] > scores[2]
|
||||
|
||||
|
||||
def test_add_documents_with_ids(vector_store: MomentoVectorIndex) -> None:
    """Test end to end construction and search with scores and IDs."""
    from momento.responses.vector_index import Search

    texts = ["apple", "orange", "hammer"]
    item_ids = [random_string() for _ in texts]
    metadatas = [{"page": f"{index}"} for index in range(len(texts))]

    # Add texts with metadata and ids
    returned_ids = vector_store.add_texts(texts, metadatas, ids=item_ids)
    assert returned_ids == item_ids
    wait()

    # Verify that the ids are in the index
    query_vector = vector_store.embeddings.embed_query("apple")
    response = vector_store._client.search(vector_store.index_name, query_vector)
    assert isinstance(response, Search.Success)
    assert [hit.id for hit in response.hits] == item_ids
|
Loading…
Reference in New Issue