From 3f48eed5bd3633acfd84c5730417996369b6375f Mon Sep 17 00:00:00 2001 From: Harrison Chase Date: Thu, 2 Feb 2023 22:05:47 -0800 Subject: [PATCH] Harrison/milvus (#856) Signed-off-by: Filip Haltmayer Signed-off-by: Frank Liu Co-authored-by: Filip Haltmayer <81822489+filip-halt@users.noreply.github.com> Co-authored-by: Frank Liu --- .../combine_docs_examples/vectorstores.ipynb | 72 ++- langchain/vectorstores/__init__.py | 2 + langchain/vectorstores/milvus.py | 428 ++++++++++++++++++ .../vectorstores/fake_embeddings.py | 18 + .../vectorstores/test_elasticsearch.py | 15 +- .../vectorstores/test_faiss.py | 15 +- .../vectorstores/test_milvus.py | 53 +++ .../vectorstores/test_qdrant.py | 16 +- 8 files changed, 572 insertions(+), 47 deletions(-) create mode 100644 langchain/vectorstores/milvus.py create mode 100644 tests/integration_tests/vectorstores/fake_embeddings.py create mode 100644 tests/integration_tests/vectorstores/test_milvus.py diff --git a/docs/modules/utils/combine_docs_examples/vectorstores.ipynb b/docs/modules/utils/combine_docs_examples/vectorstores.ipynb index 483a68e6d1..7edfb49230 100644 --- a/docs/modules/utils/combine_docs_examples/vectorstores.ipynb +++ b/docs/modules/utils/combine_docs_examples/vectorstores.ipynb @@ -16,7 +16,7 @@ }, { "cell_type": "code", - "execution_count": 1, + "execution_count": 3, "id": "965eecee", "metadata": { "pycharm": { @@ -32,7 +32,7 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 4, "id": "68481687", "metadata": { "pycharm": { @@ -51,7 +51,7 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 5, "id": "015f4ff5", "metadata": { "pycharm": { @@ -566,10 +566,74 @@ "docs[0]" ] }, + { + "cell_type": "markdown", + "id": "6c3ec797", + "metadata": {}, + "source": [ + "## Milvus\n", + "To run, you should have a Milvus instance up and running: https://milvus.io/docs/install_standalone-docker.md" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "be347313", + "metadata": {}, + "outputs": [], + "source": [ + "from langchain.vectorstores import Milvus" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "f2eee23f", + "metadata": {}, + "outputs": [], + "source": [ + "vector_db = Milvus.from_texts(\n", + " texts,\n", + " embeddings,\n", + " connection_args={\"host\": \"127.0.0.1\", \"port\": \"19530\"},\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "06bdb701", + "metadata": {}, + "outputs": [], + "source": [ + "docs = vector_db.similarity_search(query)" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "7b3e94aa", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "Document(page_content='In state after state, new laws have been passed, not only to suppress the vote, but to subvert entire elections. \\n\\nWe cannot let this happen. \\n\\nTonight. I call on the Senate to: Pass the Freedom to Vote Act. Pass the John Lewis Voting Rights Act. And while you’re at it, pass the Disclose Act so Americans can know who is funding our elections. \\n\\nTonight, I’d like to honor someone who has dedicated his life to serve this country: Justice Stephen Breyer—an Army veteran, Constitutional scholar, and retiring Justice of the United States Supreme Court. Justice Breyer, thank you for your service. \\n\\nOne of the most serious constitutional responsibilities a President has is nominating someone to serve on the United States Supreme Court. 
\\n\\nAnd I did that 4 days ago, when I nominated Circuit Court of Appeals Judge Ketanji Brown Jackson. One of our nation’s top legal minds, who will continue Justice Breyer’s legacy of excellence.', lookup_str='', metadata={}, lookup_index=0)" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "docs[0]" + ] + }, { "cell_type": "code", "execution_count": null, - "id": "8ffd66e2", + "id": "4af5a071", "metadata": {}, "outputs": [], "source": [] diff --git a/langchain/vectorstores/__init__.py b/langchain/vectorstores/__init__.py index 8616b574ce..a6e0d4b580 100644 --- a/langchain/vectorstores/__init__.py +++ b/langchain/vectorstores/__init__.py @@ -2,6 +2,7 @@ from langchain.vectorstores.base import VectorStore from langchain.vectorstores.elastic_vector_search import ElasticVectorSearch from langchain.vectorstores.faiss import FAISS +from langchain.vectorstores.milvus import Milvus from langchain.vectorstores.pinecone import Pinecone from langchain.vectorstores.qdrant import Qdrant from langchain.vectorstores.weaviate import Weaviate @@ -13,4 +14,5 @@ __all__ = [ "Pinecone", "Weaviate", "Qdrant", + "Milvus", ] diff --git a/langchain/vectorstores/milvus.py b/langchain/vectorstores/milvus.py new file mode 100644 index 0000000000..b21b551528 --- /dev/null +++ b/langchain/vectorstores/milvus.py @@ -0,0 +1,428 @@ +"""Wrapper around the Milvus vector database.""" +from __future__ import annotations + +import uuid +from typing import Any, Iterable, List, Optional, Tuple + +import numpy as np + +from langchain.docstore.document import Document +from langchain.embeddings.base import Embeddings +from langchain.vectorstores.base import VectorStore +from langchain.vectorstores.utils import maximal_marginal_relevance + + +class Milvus(VectorStore): + """Wrapper around the Milvus vector database.""" + + def __init__( + self, + embedding_function: Embeddings, + connection_args: dict, + collection_name: str, + text_field: str, + ): + """Initialize wrapper around the milvus vector database. + + In order to use this you need to have `pymilvus` installed and a + running Milvus instance. + + See the following documentation for how to run a Milvus instance: + https://milvus.io/docs/install_standalone-docker.md + + Args: + embedding_function (Embeddings): Function used to embed the text + connection_args (dict): Arguments for pymilvus connections.connect() + collection_name (str): The name of the collection to search. + text_field (str): The field in Milvus schema where the + original text is stored. + """ + try: + from pymilvus import Collection, DataType, connections + except ImportError: + raise ValueError( + "Could not import pymilvus python package. " + "Please it install it with `pip install pymilvus`." + ) + # Connecting to Milvus instance + if not connections.has_connection("default"): + connections.connect(**connection_args) + self.embedding_func = embedding_function + self.collection_name = collection_name + + self.text_field = text_field + self.auto_id = False + self.primary_field = None + self.vector_field = None + self.fields = [] + + self.col = Collection(self.collection_name) + schema = self.col.schema + + # Grabbing the fields for the existing collection. 
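+        # Fields are collected from the existing schema; auto_id fields are dropped
+        # because Milvus generates them, and the primary/vector fields are recorded
+        # for later searches.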
+        for x in schema.fields:
+            self.fields.append(x.name)
+            if x.auto_id:
+                self.fields.remove(x.name)
+            if x.is_primary:
+                self.primary_field = x.name
+            if x.dtype == DataType.FLOAT_VECTOR or x.dtype == DataType.BINARY_VECTOR:
+                self.vector_field = x.name
+
+        # Default search params when one is not provided.
+        self.index_params = {
+            "IVF_FLAT": {"params": {"nprobe": 10}},
+            "IVF_SQ8": {"params": {"nprobe": 10}},
+            "IVF_PQ": {"params": {"nprobe": 10}},
+            "HNSW": {"params": {"ef": 10}},
+            "RHNSW_FLAT": {"params": {"ef": 10}},
+            "RHNSW_SQ": {"params": {"ef": 10}},
+            "RHNSW_PQ": {"params": {"ef": 10}},
+            "IVF_HNSW": {"params": {"nprobe": 10, "ef": 10}},
+            "ANNOY": {"params": {"search_k": 10}},
+        }
+
+    def add_texts(
+        self,
+        texts: Iterable[str],
+        metadatas: Optional[List[dict]] = None,
+        partition_name: Optional[str] = None,
+        timeout: Optional[int] = None,
+    ) -> List[str]:
+        """Insert text data into Milvus.
+
+        When using add_texts() it is assumed that a collection has already
+        been made and indexed. If metadata is included, it is assumed that
+        it is ordered correctly to match the schema provided to the Collection
+        and that the embedding vector is the first schema field.
+
+        Args:
+            texts (Iterable[str]): The text being embedded and inserted.
+            metadatas (Optional[List[dict]], optional): The metadata that
+                corresponds to each insert. Defaults to None.
+            partition_name (str, optional): The partition of the collection
+                to insert data into. Defaults to None.
+            timeout (int, optional): Timeout for the insert. Defaults to None.
+
+        Returns:
+            List[str]: The resulting keys for each inserted element.
+        """
+        insert_dict: Any = {self.text_field: list(texts)}
+        try:
+            insert_dict[self.vector_field] = self.embedding_func.embed_documents(
+                list(texts)
+            )
+        except NotImplementedError:
+            insert_dict[self.vector_field] = [
+                self.embedding_func.embed_query(x) for x in texts
+            ]
+        # Collect the metadata into the insert dict.
+        if len(self.fields) > 2 and metadatas is not None:
+            for d in metadatas:
+                for key, value in d.items():
+                    if key in self.fields:
+                        insert_dict.setdefault(key, []).append(value)
+        # Convert dict to list of lists for insertion
+        insert_list = [insert_dict[x] for x in self.fields]
+        # Insert into the collection.
+        res = self.col.insert(
+            insert_list, partition_name=partition_name, timeout=timeout
+        )
+        # Flush to make sure the newly inserted data is immediately searchable.
+        self.col.flush()
+        return res.primary_keys
+
+    def _worker_search(
+        self,
+        query: str,
+        k: int = 4,
+        param: Optional[dict] = None,
+        expr: Optional[str] = None,
+        partition_names: Optional[List[str]] = None,
+        round_decimal: int = -1,
+        timeout: Optional[int] = None,
+        **kwargs: Any,
+    ) -> Tuple[List[float], List[Tuple[Document, Any, Any]]]:
+        # Load the collection into memory for searching.
+        self.col.load()
+        # Decide to use default params if not passed in.
+        if param is None:
+            index_type = self.col.indexes[0].params["index_type"]
+            param = self.index_params[index_type]
+        # Embed the query text.
+        data = [self.embedding_func.embed_query(query)]
+        # Determine result metadata fields.
+        output_fields = self.fields[:]
+        output_fields.remove(self.vector_field)
+        # Perform the search.
+        res = self.col.search(
+            data,
+            self.vector_field,
+            param,
+            k,
+            expr=expr,
+            output_fields=output_fields,
+            partition_names=partition_names,
+            round_decimal=round_decimal,
+            timeout=timeout,
+            **kwargs,
+        )
+        # Organize results.
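+        # Each hit becomes a (Document, distance, primary key) tuple, and the raw
+        # query embedding is returned so max_marginal_relevance_search can reuse it.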
+        ret = []
+        for result in res[0]:
+            meta = {x: result.entity.get(x) for x in output_fields}
+            ret.append(
+                (
+                    Document(page_content=meta.pop(self.text_field), metadata=meta),
+                    result.distance,
+                    result.id,
+                )
+            )
+
+        return data[0], ret
+
+    def similarity_search_with_score(
+        self,
+        query: str,
+        k: int = 4,
+        param: Optional[dict] = None,
+        expr: Optional[str] = None,
+        partition_names: Optional[List[str]] = None,
+        round_decimal: int = -1,
+        timeout: Optional[int] = None,
+        **kwargs: Any,
+    ) -> List[Tuple[Document, float]]:
+        """Perform a search on a query string and return results with scores.
+
+        Args:
+            query (str): The text being searched.
+            k (int, optional): The number of results to return. Defaults to 4.
+            param (dict, optional): The search params for the specified index.
+                Defaults to None.
+            expr (str, optional): Filtering expression. Defaults to None.
+            partition_names (List[str], optional): Partitions to search through.
+                Defaults to None.
+            round_decimal (int, optional): Round the resulting distance. Defaults
+                to -1.
+            timeout (int, optional): Amount to wait before timeout error. Defaults
+                to None.
+            kwargs: Collection.search() keyword arguments.
+
+        Returns:
+            List[Tuple[Document, float]]: The (Document, distance) results for
+                the search.
+        """
+        _, result = self._worker_search(
+            query, k, param, expr, partition_names, round_decimal, timeout, **kwargs
+        )
+        return [(x, y) for x, y, _ in result]
+
+    def max_marginal_relevance_search(
+        self,
+        query: str,
+        k: int = 4,
+        fetch_k: int = 20,
+        param: Optional[dict] = None,
+        expr: Optional[str] = None,
+        partition_names: Optional[List[str]] = None,
+        round_decimal: int = -1,
+        timeout: Optional[int] = None,
+        **kwargs: Any,
+    ) -> List[Document]:
+        """Perform a search and return results that are reordered by MMR.
+
+        Args:
+            query (str): The text being searched.
+            k (int, optional): How many results to return. Defaults to 4.
+            fetch_k (int, optional): Total results to select k from.
+                Defaults to 20.
+            param (dict, optional): The search params for the specified index.
+                Defaults to None.
+            expr (str, optional): Filtering expression. Defaults to None.
+            partition_names (List[str], optional): What partitions to search.
+                Defaults to None.
+            round_decimal (int, optional): Round the resulting distance. Defaults
+                to -1.
+            timeout (int, optional): Amount to wait before timeout error. Defaults
+                to None.
+
+        Returns:
+            List[Document]: Document results for search.
+        """
+        data, res = self._worker_search(
+            query,
+            fetch_k,
+            param,
+            expr,
+            partition_names,
+            round_decimal,
+            timeout,
+            **kwargs,
+        )
+        # Extract result IDs.
+        ids = [x for _, _, x in res]
+        # Get the raw vectors from Milvus.
+        vectors = self.col.query(
+            expr=f"{self.primary_field} in {ids}",
+            output_fields=[self.primary_field, self.vector_field],
+        )
+        # Reorganize the results from query to match result order.
+        vectors = {x[self.primary_field]: x[self.vector_field] for x in vectors}
+        search_embedding = data
+        ordered_result_embeddings = [vectors[x] for x in ids]
+        # Get the new order of results.
+        new_ordering = maximal_marginal_relevance(
+            np.array(search_embedding), ordered_result_embeddings, k=k
+        )
+        # Reorder the values and return.
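+        # new_ordering holds indices into the fetched results; a -1 entry is treated
+        # as a stop marker, so only valid selections are kept.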
+        ret = []
+        for x in new_ordering:
+            if x == -1:
+                break
+            else:
+                ret.append(res[x][0])
+        return ret
+
+    def similarity_search(
+        self,
+        query: str,
+        k: int = 4,
+        param: Optional[dict] = None,
+        expr: Optional[str] = None,
+        partition_names: Optional[List[str]] = None,
+        round_decimal: int = -1,
+        timeout: Optional[int] = None,
+        **kwargs: Any,
+    ) -> List[Document]:
+        """Perform a similarity search against the query string.
+
+        Args:
+            query (str): The text to search.
+            k (int, optional): How many results to return. Defaults to 4.
+            param (dict, optional): The search params for the index type.
+                Defaults to None.
+            expr (str, optional): Filtering expression. Defaults to None.
+            partition_names (List[str], optional): What partitions to search.
+                Defaults to None.
+            round_decimal (int, optional): What decimal point to round to.
+                Defaults to -1.
+            timeout (int, optional): How long to wait before timeout error.
+                Defaults to None.
+
+        Returns:
+            List[Document]: Document results for search.
+        """
+        _, docs_and_scores = self._worker_search(
+            query, k, param, expr, partition_names, round_decimal, timeout, **kwargs
+        )
+        return [doc for doc, _, _ in docs_and_scores]
+
+    @classmethod
+    def from_texts(
+        cls,
+        texts: List[str],
+        embedding: Embeddings,
+        metadatas: Optional[List[dict]] = None,
+        **kwargs: Any,
+    ) -> Milvus:
+        """Create a Milvus collection, index it with HNSW, and insert data.
+
+        Args:
+            texts (List[str]): Text to insert.
+            embedding (Embeddings): Embedding function to use.
+            metadatas (Optional[List[dict]], optional): Dict metadata.
+                Defaults to None.
+
+        Returns:
+            Milvus: The Milvus vector store.
+        """
+        try:
+            from pymilvus import (
+                Collection,
+                CollectionSchema,
+                DataType,
+                FieldSchema,
+                connections,
+            )
+            from pymilvus.orm.types import infer_dtype_bydata
+        except ImportError:
+            raise ValueError(
+                "Could not import pymilvus python package. "
+                "Please install it with `pip install pymilvus`."
+            )
+        # Connect to Milvus instance
+        if not connections.has_connection("default"):
+            connections.connect(**kwargs.get("connection_args", {"port": 19530}))
+        # Determine embedding dim
+        embeddings = embedding.embed_query(texts[0])
+        dim = len(embeddings)
+        # Generate unique names
+        primary_field = "c" + str(uuid.uuid4().hex)
+        vector_field = "c" + str(uuid.uuid4().hex)
+        text_field = "c" + str(uuid.uuid4().hex)
+        collection_name = "c" + str(uuid.uuid4().hex)
+        fields = []
+        # Determine metadata schema
+        if metadatas:
+            # Check if all metadata keys line up
+            key = metadatas[0].keys()
+            for x in metadatas:
+                if key != x.keys():
+                    raise ValueError(
+                        "Mismatched metadata. "
+                        "Make sure all metadata has the same keys and datatype."
+                    )
+            # Create a FieldSchema for each key in the metadata.
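+            # infer_dtype_bydata maps a sample value to a Milvus DataType; VARCHAR
+            # fields additionally need a max_length, computed from the data below.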
+            for key, value in metadatas[0].items():
+                # Infer the corresponding datatype of the metadata
+                dtype = infer_dtype_bydata(value)
+                if dtype == DataType.UNKNOWN:
+                    raise ValueError(f"Unrecognized datatype for {key}.")
+                elif dtype == DataType.VARCHAR:
+                    # Find the max length of the text-based metadata
+                    max_length = 0
+                    for subvalues in metadatas:
+                        max_length = max(max_length, len(subvalues[key]))
+                    fields.append(
+                        FieldSchema(key, DataType.VARCHAR, max_length=max_length + 1)
+                    )
+                else:
+                    fields.append(FieldSchema(key, dtype))
+
+        # Find the max length of the texts
+        max_length = 0
+        for y in texts:
+            max_length = max(max_length, len(y))
+        # Create the text field
+        fields.append(
+            FieldSchema(text_field, DataType.VARCHAR, max_length=max_length + 1)
+        )
+        # Create the primary key field
+        fields.append(
+            FieldSchema(primary_field, DataType.INT64, is_primary=True, auto_id=True)
+        )
+        # Create the vector field
+        fields.append(FieldSchema(vector_field, DataType.FLOAT_VECTOR, dim=dim))
+        # Create the schema for the collection
+        schema = CollectionSchema(fields)
+        # Create the collection
+        collection = Collection(collection_name, schema)
+        # Index parameters for the collection
+        index = {
+            "index_type": "HNSW",
+            "metric_type": "L2",
+            "params": {"M": 8, "efConstruction": 64},
+        }
+        # Create the index
+        collection.create_index(vector_field, index)
+        # Create the VectorStore
+        milvus = cls(
+            embedding,
+            kwargs.get("connection_args", {"port": 19530}),
+            collection_name,
+            text_field,
+        )
+        # Add the texts.
+        milvus.add_texts(texts, metadatas)
+
+        return milvus
diff --git a/tests/integration_tests/vectorstores/fake_embeddings.py b/tests/integration_tests/vectorstores/fake_embeddings.py
new file mode 100644
index 0000000000..c7b571e803
--- /dev/null
+++ b/tests/integration_tests/vectorstores/fake_embeddings.py
@@ -0,0 +1,18 @@
+"""Fake Embedding class for testing purposes."""
+from typing import List
+
+from langchain.embeddings.base import Embeddings
+
+fake_texts = ["foo", "bar", "baz"]
+
+
+class FakeEmbeddings(Embeddings):
+    """Fake embeddings functionality for testing."""
+
+    def embed_documents(self, texts: List[str]) -> List[List[float]]:
+        """Return simple embeddings."""
+        return [[1.0] * 9 + [i] for i in range(len(texts))]
+
+    def embed_query(self, text: str) -> List[float]:
+        """Return simple embeddings."""
+        return [1.0] * 9 + [0.0]
diff --git a/tests/integration_tests/vectorstores/test_elasticsearch.py b/tests/integration_tests/vectorstores/test_elasticsearch.py
index d3fd801f0a..075fab4ada 100644
--- a/tests/integration_tests/vectorstores/test_elasticsearch.py
+++ b/tests/integration_tests/vectorstores/test_elasticsearch.py
@@ -1,21 +1,8 @@
 """Test ElasticSearch functionality."""
-from typing import List
 
 from langchain.docstore.document import Document
-from langchain.embeddings.base import Embeddings
 from langchain.vectorstores.elastic_vector_search import ElasticVectorSearch
-
-
-class FakeEmbeddings(Embeddings):
-    """Fake embeddings functionality for testing."""
-
-    def embed_documents(self, texts: List[str]) -> List[List[float]]:
-        """Return simple embeddings."""
-        return [[1.0] * 9 + [i] for i in range(len(texts))]
-
-    def embed_query(self, text: str) -> List[float]:
-        """Return simple embeddings."""
-        return [1.0] * 9 + [0.0]
+from tests.integration_tests.vectorstores.fake_embeddings import FakeEmbeddings
 
 
 def test_elasticsearch() -> None:
diff --git a/tests/integration_tests/vectorstores/test_faiss.py b/tests/integration_tests/vectorstores/test_faiss.py
index 07d60782b0..690654f76e 100644
--- a/tests/integration_tests/vectorstores/test_faiss.py
+++ b/tests/integration_tests/vectorstores/test_faiss.py
@@ -1,26 +1,13 @@
 """Test FAISS functionality."""
 import tempfile
-from typing import List
 
 import pytest
 
 from langchain.docstore.document import Document
 from langchain.docstore.in_memory import InMemoryDocstore
 from langchain.docstore.wikipedia import Wikipedia
-from langchain.embeddings.base import Embeddings
 from langchain.vectorstores.faiss import FAISS
-
-
-class FakeEmbeddings(Embeddings):
-    """Fake embeddings functionality for testing."""
-
-    def embed_documents(self, texts: List[str]) -> List[List[float]]:
-        """Return simple embeddings."""
-        return [[i] * 10 for i in range(len(texts))]
-
-    def embed_query(self, text: str) -> List[float]:
-        """Return simple embeddings."""
-        return [0] * 10
+from tests.integration_tests.vectorstores.fake_embeddings import FakeEmbeddings
 
 
 def test_faiss() -> None:
diff --git a/tests/integration_tests/vectorstores/test_milvus.py b/tests/integration_tests/vectorstores/test_milvus.py
new file mode 100644
index 0000000000..063427e766
--- /dev/null
+++ b/tests/integration_tests/vectorstores/test_milvus.py
@@ -0,0 +1,53 @@
+"""Test Milvus functionality."""
+from typing import List, Optional
+
+from langchain.docstore.document import Document
+from langchain.vectorstores import Milvus
+from tests.integration_tests.vectorstores.fake_embeddings import (
+    FakeEmbeddings,
+    fake_texts,
+)
+
+
+def _milvus_from_texts(metadatas: Optional[List[dict]] = None) -> Milvus:
+    return Milvus.from_texts(
+        fake_texts,
+        FakeEmbeddings(),
+        metadatas=metadatas,
+        connection_args={"host": "127.0.0.1", "port": "19530"},
+    )
+
+
+def test_milvus() -> None:
+    """Test end to end construction and search."""
+    docsearch = _milvus_from_texts()
+    output = docsearch.similarity_search("foo", k=1)
+    assert output == [Document(page_content="foo")]
+
+
+def test_milvus_with_score() -> None:
+    """Test end to end construction and search with scores and IDs."""
+    texts = ["foo", "bar", "baz"]
+    metadatas = [{"page": i} for i in range(len(texts))]
+    docsearch = _milvus_from_texts(metadatas=metadatas)
+    output = docsearch.similarity_search_with_score("foo", k=3)
+    docs = [o[0] for o in output]
+    scores = [o[1] for o in output]
+    assert docs == [
+        Document(page_content="foo", metadata={"page": 0}),
+        Document(page_content="bar", metadata={"page": 1}),
+        Document(page_content="baz", metadata={"page": 2}),
+    ]
+    assert scores[0] < scores[1] < scores[2]
+
+
+def test_milvus_max_marginal_relevance_search() -> None:
+    """Test end to end construction and MMR search."""
+    texts = ["foo", "bar", "baz"]
+    metadatas = [{"page": i} for i in range(len(texts))]
+    docsearch = _milvus_from_texts(metadatas=metadatas)
+    output = docsearch.max_marginal_relevance_search("foo", k=2, fetch_k=3)
+    assert output == [
+        Document(page_content="foo", metadata={"page": 0}),
+        Document(page_content="baz", metadata={"page": 2}),
+    ]
diff --git a/tests/integration_tests/vectorstores/test_qdrant.py b/tests/integration_tests/vectorstores/test_qdrant.py
index 660e625011..3849cca838 100644
--- a/tests/integration_tests/vectorstores/test_qdrant.py
+++ b/tests/integration_tests/vectorstores/test_qdrant.py
@@ -1,21 +1,7 @@
 """Test Qdrant functionality."""
-from typing import List
-
 from langchain.docstore.document import Document
-from langchain.embeddings.base import Embeddings
 from langchain.vectorstores import Qdrant
-
-
-class FakeEmbeddings(Embeddings):
-    """Fake embeddings functionality for testing."""
-
- def embed_documents(self, texts: List[str]) -> List[List[float]]: - """Return simple embeddings.""" - return [[1.0] * 9 + [float(i)] for i in range(len(texts))] - - def embed_query(self, text: str) -> List[float]: - """Return simple embeddings.""" - return [1.0] * 9 + [0.0] +from tests.integration_tests.vectorstores.fake_embeddings import FakeEmbeddings def test_qdrant() -> None: