From e533da8bf2a313650e67dacc8282c96898ea6eee Mon Sep 17 00:00:00 2001
From: Tom
Date: Thu, 6 Jul 2023 07:44:12 +1000
Subject: [PATCH] Adding Marqo to vectorstore ecosystem (#7068)

This PR brings in a vectorstore interface for [Marqo](https://www.marqo.ai/).

The Marqo vectorstore exposes some of Marqo's functionality in addition to the VectorStore base class. The Marqo vectorstore also makes the embedding parameter optional because inference for embeddings is an inherent part of Marqo.

Docs, notebook examples and integration tests are included.

Related PR: https://github.com/hwchase17/langchain/pull/2807

---------

Co-authored-by: Tom Hamer
Co-authored-by: Harrison Chase
---
 docs/extras/ecosystem/integrations/marqo.md |  31 ++
 .../vectorstores/integrations/marqo.ipynb   | 442 ++++++++++++++++
 langchain/vectorstores/__init__.py          |   2 +
 langchain/vectorstores/marqo.py             | 471 ++++++++++++++++++
 poetry.lock                                 |  37 +-
 pyproject.toml                              |   2 +
 .../vectorstores/test_marqo.py              | 178 +++++++
 7 files changed, 1150 insertions(+), 13 deletions(-)
 create mode 100644 docs/extras/ecosystem/integrations/marqo.md
 create mode 100644 docs/extras/modules/data_connection/vectorstores/integrations/marqo.ipynb
 create mode 100644 langchain/vectorstores/marqo.py
 create mode 100644 tests/integration_tests/vectorstores/test_marqo.py

diff --git a/docs/extras/ecosystem/integrations/marqo.md b/docs/extras/ecosystem/integrations/marqo.md
new file mode 100644
index 0000000000..4000a3bfd7
--- /dev/null
+++ b/docs/extras/ecosystem/integrations/marqo.md
@@ -0,0 +1,31 @@
+# Marqo
+
+This page covers how to use the Marqo ecosystem within LangChain.
+
+### **What is Marqo?**
+
+Marqo is a tensor search engine that uses embeddings stored in in-memory HNSW indexes to achieve cutting-edge search speeds. Marqo can scale to hundred-million-document indexes with horizontal index sharding and allows for async, non-blocking data upload and search. Marqo uses the latest machine learning models from PyTorch, Hugging Face, OpenAI and more. You can start with a pre-configured model or bring your own. The built-in ONNX support and conversion allows for faster inference and higher throughput on both CPU and GPU.
+
+Because Marqo includes its own inference, your documents can have a mix of text and images, and you can bring Marqo indexes with data from your other systems into the LangChain ecosystem without having to worry about your embeddings being compatible.
+
+Deployment of Marqo is flexible: you can get started yourself with our Docker image, or [contact us about our managed cloud offering!](https://www.marqo.ai/pricing)
+
+To run Marqo locally with our Docker image, [see our getting started guide.](https://docs.marqo.ai/latest/)
+
+## Installation and Setup
+- Install the Python SDK with `pip install marqo`
+
+## Wrappers
+
+### VectorStore
+
+There exists a wrapper around Marqo indexes, allowing you to use them within the vectorstore framework. Marqo lets you select from a range of models for generating embeddings and exposes some preprocessing configurations.
+
+The Marqo vectorstore can also work with existing multimodal indexes where your documents have a mix of images and text; for more information refer to [our documentation](https://docs.marqo.ai/latest/#multi-modal-and-cross-modal-search). Note that instantiating the Marqo vectorstore with an existing multimodal index will disable the ability to add any new documents to it via the LangChain vectorstore `add_texts` method.
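+
+As a rough sketch, hooking an existing multimodal index up to LangChain might look like the following (the index name and the `caption`/`image` fields here are placeholders for whatever your own index contains):
+
+```python
+import marqo
+from langchain.vectorstores import Marqo
+
+client = marqo.Client(url="http://localhost:8882", api_key="")
+
+def get_content(res):
+    # Decide how each Marqo hit is rendered as a Document's page_content.
+    return f"{res['caption']}: {res['image']}"
+
+docsearch = Marqo(client, "my-multimodal-index", page_content_builder=get_content)
+docs = docsearch.similarity_search("vehicles that fly")
+```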
+ +To import this vectorstore: +```python +from langchain.vectorstores import Marqo +``` + +For a more detailed walkthrough of the Marqo wrapper and some of its unique features, see [this notebook](../modules/indexes/vectorstores/examples/marqo.ipynb) diff --git a/docs/extras/modules/data_connection/vectorstores/integrations/marqo.ipynb b/docs/extras/modules/data_connection/vectorstores/integrations/marqo.ipynb new file mode 100644 index 0000000000..caadabf06e --- /dev/null +++ b/docs/extras/modules/data_connection/vectorstores/integrations/marqo.ipynb @@ -0,0 +1,442 @@ +{ + "cells": [ + { + "attachments": {}, + "cell_type": "markdown", + "id": "683953b3", + "metadata": {}, + "source": [ + "# Marqo\n", + "\n", + "This notebook shows how to use functionality related to the Marqo database." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "aac9563e", + "metadata": {}, + "outputs": [], + "source": [ + "from langchain.text_splitter import CharacterTextSplitter\n", + "from langchain.vectorstores import Marqo\n", + "from langchain.document_loaders import TextLoader" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "a3c3999a", + "metadata": {}, + "outputs": [], + "source": [ + "from langchain.document_loaders import TextLoader\n", + "loader = TextLoader('../../../state_of_the_union.txt')\n", + "documents = loader.load()\n", + "text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=0)\n", + "docs = text_splitter.split_documents(documents)" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "6e104aee", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Index langchain-demo exists.\n" + ] + } + ], + "source": [ + "import marqo \n", + "\n", + "# initialize marqo\n", + "marqo_url = \"http://localhost:8882\" # if using marqo cloud replace with your endpoint (console.marqo.ai)\n", + "marqo_api_key = \"\" # if using marqo cloud replace with your api key (console.marqo.ai)\n", + "\n", + "client = marqo.Client(url=marqo_url, api_key=marqo_api_key)\n", + "\n", + "index_name = \"langchain-demo\"\n", + "\n", + "docsearch = Marqo.from_documents(docs, index_name=index_name)\n", + "\n", + "query = \"What did the president say about Ketanji Brown Jackson\"\n", + "result_docs = docsearch.similarity_search(query)" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "9c608226", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "One of the most serious constitutional responsibilities a President has is nominating someone to serve on the United States Supreme Court. \n", + "\n", + "And I did that 4 days ago, when I nominated Circuit Court of Appeals Judge Ketanji Brown Jackson. One of our nation’s top legal minds, who will continue Justice Breyer’s legacy of excellence.\n" + ] + } + ], + "source": [ + "print(result_docs[0].page_content)" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "98704b27", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "One of the most serious constitutional responsibilities a President has is nominating someone to serve on the United States Supreme Court. \n", + "\n", + "And I did that 4 days ago, when I nominated Circuit Court of Appeals Judge Ketanji Brown Jackson. 
One of our nation’s top legal minds, who will continue Justice Breyer’s legacy of excellence.\n", + "0.68647254\n" + ] + } + ], + "source": [ + "result_docs = docsearch.similarity_search_with_score(query)\n", + "print(result_docs[0][0].page_content, result_docs[0][1], sep=\"\\n\")" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "eb3395b6", + "metadata": {}, + "source": [ + "## Additional features\n", + "\n", + "One of the powerful features of Marqo as a vectorstore is that you can use indexes created externally. For example:\n", + "\n", + "+ If you had a database of image and text pairs from another application, you can simply just use it in langchain with the Marqo vectorstore. Note that bringing your own multimodal indexes will disable the `add_texts` method.\n", + "\n", + "+ If you had a database of text documents, you can bring it into the langchain framework and add more texts through `add_texts`.\n", + "\n", + "The documents that are returned are customised by passing your own function to the `page_content_builder` callback in the search methods." + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "35b99fef", + "metadata": {}, + "source": [ + "#### Multimodal Example" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "a359ed74", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'errors': False,\n", + " 'processingTimeMs': 4675.6921890009835,\n", + " 'index_name': 'langchain-multimodal-demo',\n", + " 'items': [{'_id': '7af25f35-5d41-4ff5-95fa-ab6bd6755176',\n", + " 'result': 'created',\n", + " 'status': 201},\n", + " {'_id': '70434d17-2680-4e33-b060-a37b9b8b6959',\n", + " 'result': 'created',\n", + " 'status': 201}]}" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "\n", + "# use a new index\n", + "index_name = \"langchain-multimodal-demo\"\n", + "\n", + "# incase the demo is re-run\n", + "try:\n", + " client.delete_index(index_name)\n", + "except Exception:\n", + " print(f\"Creating {index_name}\")\n", + " \n", + "# This index could have been created by another system\n", + "settings = {\"treat_urls_and_pointers_as_images\": True, \"model\": \"ViT-L/14\"}\n", + "client.create_index(index_name, **settings)\n", + "client.index(index_name).add_documents(\n", + " [ \n", + " # image of a bus\n", + " {\n", + " \"caption\": \"Bus\",\n", + " \"image\": \"https://raw.githubusercontent.com/marqo-ai/marqo/mainline/examples/ImageSearchGuide/data/image4.jpg\"\n", + " },\n", + " # image of a plane\n", + " { \n", + " \"caption\": \"Plane\", \n", + " \"image\": \"https://raw.githubusercontent.com/marqo-ai/marqo/mainline/examples/ImageSearchGuide/data/image2.jpg\"\n", + " }\n", + " ],\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "368d1fab", + "metadata": {}, + "outputs": [], + "source": [ + "def get_content(res):\n", + " \"\"\"Helper to format Marqo's documents into text to be used as page_content\"\"\"\n", + " return f\"{res['caption']}: {res['image']}\"\n", + "\n", + "docsearch = Marqo(client, index_name, page_content_builder=get_content)\n", + "\n", + "\n", + "query = \"vehicles that fly\"\n", + "doc_results = docsearch.similarity_search(query)" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "eef4edf9", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Plane: 
https://raw.githubusercontent.com/marqo-ai/marqo/mainline/examples/ImageSearchGuide/data/image2.jpg\n", + "Bus: https://raw.githubusercontent.com/marqo-ai/marqo/mainline/examples/ImageSearchGuide/data/image4.jpg\n" + ] + } + ], + "source": [ + "for doc in doc_results:\n", + " print(doc.page_content)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "c255f603", + "metadata": {}, + "source": [ + "#### Text only example" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "9e9a2b20", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'errors': False,\n", + " 'processingTimeMs': 500.1302719992964,\n", + " 'index_name': 'langchain-byo-index-demo',\n", + " 'items': [{'_id': 'cbad6f9e-a4ea-45c6-9a85-1b9c0a59827c',\n", + " 'result': 'created',\n", + " 'status': 201},\n", + " {'_id': 'c0be68cb-8847-4e95-a4c9-4791b54f772c',\n", + " 'result': 'created',\n", + " 'status': 201}]}" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "\n", + "# use a new index\n", + "index_name = \"langchain-byo-index-demo\"\n", + "\n", + "# incase the demo is re-run\n", + "try:\n", + " client.delete_index(index_name)\n", + "except Exception:\n", + " print(f\"Creating {index_name}\")\n", + "\n", + "# This index could have been created by another system\n", + "client.index(index_name).add_documents(\n", + " [ \n", + " {\n", + " \"Title\": \"Smartphone\",\n", + " \"Description\": \"A smartphone is a portable computer device that combines mobile telephone \"\n", + " \"functions and computing functions into one unit.\",\n", + " },\n", + " { \n", + " \"Title\": \"Telephone\",\n", + " \"Description\": \"A telephone is a telecommunications device that permits two or more users to\"\n", + " \"conduct a conversation when they are too far apart to be easily heard directly.\",\n", + " }\n", + " ],\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "b2943ea9", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['484d8436-cb09-49f2-8f9d-39671c7ebfaa']" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Note text indexes retain the ability to use add_texts despite different field names in documents\n", + "# this is because the page_content_builder callback lets you handle these document fields as required\n", + "\n", + "def get_content(res):\n", + " \"\"\"Helper to format Marqo's documents into text to be used as page_content\"\"\"\n", + " if 'text' in res:\n", + " return res['text']\n", + " return res['Description']\n", + "\n", + "\n", + "docsearch = Marqo(client, index_name, page_content_builder=get_content)\n", + "\n", + "docsearch.add_texts([\"This is a document that is about elephants\"])\n" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "851450e9", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "A smartphone is a portable computer device that combines mobile telephone functions and computing functions into one unit.\n" + ] + } + ], + "source": [ + "query = \"modern communications devices\"\n", + "doc_results = docsearch.similarity_search(query)\n", + "\n", + "print(doc_results[0].page_content)" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "9a438fec", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "This is a document that is about elephants\n" + ] + } + ], + 
"source": [ + "query = \"elephants\"\n", + "doc_results = docsearch.similarity_search(query, page_content_builder=get_content)\n", + "\n", + "print(doc_results[0].page_content)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "0d04c9d4", + "metadata": {}, + "source": [ + "## Weighted Queries\n", + "\n", + "We also expose marqos weighted queries which are a powerful way to compose complex semantic searches." + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "d42ba0d6", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "A smartphone is a portable computer device that combines mobile telephone functions and computing functions into one unit.\n" + ] + } + ], + "source": [ + "query = {\"communications devices\" : 1.0}\n", + "doc_results = docsearch.similarity_search(query)\n", + "print(doc_results[0].page_content)" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "b5918a16", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "A telephone is a telecommunications device that permits two or more users toconduct a conversation when they are too far apart to be easily heard directly.\n" + ] + } + ], + "source": [ + "query = {\"communications devices\" : 1.0, \"technology post 2000\": -1.0}\n", + "doc_results = docsearch.similarity_search(query)\n", + "print(doc_results[0].page_content)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/langchain/vectorstores/__init__.py b/langchain/vectorstores/__init__.py index cb3db2fa56..57b0835066 100644 --- a/langchain/vectorstores/__init__.py +++ b/langchain/vectorstores/__init__.py @@ -19,6 +19,7 @@ from langchain.vectorstores.elastic_vector_search import ElasticVectorSearch from langchain.vectorstores.faiss import FAISS from langchain.vectorstores.hologres import Hologres from langchain.vectorstores.lancedb import LanceDB +from langchain.vectorstores.marqo import Marqo from langchain.vectorstores.matching_engine import MatchingEngine from langchain.vectorstores.milvus import Milvus from langchain.vectorstores.mongodb_atlas import MongoDBAtlasVectorSearch @@ -61,6 +62,7 @@ __all__ = [ "Hologres", "LanceDB", "MatchingEngine", + "Marqo", "Milvus", "Zilliz", "SingleStoreDB", diff --git a/langchain/vectorstores/marqo.py b/langchain/vectorstores/marqo.py new file mode 100644 index 0000000000..119a1b6231 --- /dev/null +++ b/langchain/vectorstores/marqo.py @@ -0,0 +1,471 @@ +"""Wrapper around weaviate vector database.""" +from __future__ import annotations + +import json +import uuid +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + Iterable, + List, + Optional, + Tuple, + Type, + Union, +) + +from langchain.docstore.document import Document +from langchain.embeddings.base import Embeddings +from langchain.vectorstores.base import VectorStore + +if TYPE_CHECKING: + import marqo + + +class Marqo(VectorStore): + """Wrapper around Marqo database. + + Marqo indexes have their own models associated with them to generate your + embeddings. 
This means that you can select from a range of different models
+    and also use CLIP models to create multimodal indexes
+    with images and text together.
+
+    Marqo also supports more advanced queries with multiple weighted terms; see
+    https://docs.marqo.ai/latest/#searching-using-weights-in-queries.
+    This class can flexibly take strings or dictionaries for weighted queries
+    in its similarity search methods.
+
+    To use, you should have the `marqo` python package installed; you can install
+    it with `pip install marqo`.
+
+    Example:
+        .. code-block:: python
+
+            import marqo
+            from langchain.vectorstores import Marqo
+            client = marqo.Client(url=os.environ["MARQO_URL"], ...)
+            vectorstore = Marqo(client, index_name)
+
+    """
+
+    def __init__(
+        self,
+        client: marqo.Client,
+        index_name: str,
+        add_documents_settings: Optional[Dict[str, Any]] = None,
+        searchable_attributes: Optional[List[str]] = None,
+        page_content_builder: Optional[Callable[[Dict[str, Any]], str]] = None,
+    ):
+        """Initialize with Marqo client."""
+        try:
+            import marqo
+        except ImportError:
+            raise ValueError(
+                "Could not import marqo python package. "
+                "Please install it with `pip install marqo`."
+            )
+        if not isinstance(client, marqo.Client):
+            raise ValueError(
+                f"client should be an instance of marqo.Client, got {type(client)}"
+            )
+        self._client = client
+        self._index_name = index_name
+        self._add_documents_settings = (
+            {} if add_documents_settings is None else add_documents_settings
+        )
+        self._searchable_attributes = searchable_attributes
+        self.page_content_builder = page_content_builder
+
+        self._non_tensor_fields = ["metadata"]
+
+        self._document_batch_size = 1024
+
+    def add_texts(
+        self,
+        texts: Iterable[str],
+        metadatas: Optional[List[dict]] = None,
+        **kwargs: Any,
+    ) -> List[str]:
+        """Upload texts with metadata (properties) to Marqo.
+
+        You can either have Marqo generate ids for each document or you can provide
+        your own by including a "_id" field in the metadata objects.
+
+        Args:
+            texts (Iterable[str]): an iterator of texts - assumed to preserve an
+                order that matches the metadatas.
+            metadatas (Optional[List[dict]], optional): a list of metadatas.
+
+        Raises:
+            ValueError: if metadatas is provided and the number of metadatas differs
+                from the number of texts.
+
+        Returns:
+            List[str]: The list of ids that were added.
+        """
+
+        if self._client.index(self._index_name).get_settings()["index_defaults"][
+            "treat_urls_and_pointers_as_images"
+        ]:
+            raise ValueError(
+                "Marqo.add_texts is disabled for multimodal indexes. To add documents "
+                "with a multimodal index use the Python client for Marqo directly."
+            )
+        documents: List[Dict[str, str]] = []
+
+        num_docs = 0
+        for i, text in enumerate(texts):
+            doc = {
+                "text": text,
+                "metadata": json.dumps(metadatas[i]) if metadatas else json.dumps({}),
+            }
+            documents.append(doc)
+            num_docs += 1
+
+        ids = []
+        for i in range(0, num_docs, self._document_batch_size):
+            response = self._client.index(self._index_name).add_documents(
+                documents[i : i + self._document_batch_size],
+                non_tensor_fields=self._non_tensor_fields,
+                **self._add_documents_settings,
+            )
+            if response["errors"]:
+                err_msg = (
+                    f"Error in upload for documents in index range [{i},"
+                    f"{i + self._document_batch_size}], "
+                    f"check Marqo logs."
+                )
+                raise RuntimeError(err_msg)
+
+            ids += [item["_id"] for item in response["items"]]
+
+        return ids
+
+    def similarity_search(
+        self,
+        query: Union[str, Dict[str, float]],
+        k: int = 4,
+        **kwargs: Any,
+    ) -> List[Document]:
+        """Search the marqo index for the most similar documents.
+
+        Args:
+            query (Union[str, Dict[str, float]]): The query for the search, either
+                as a string or a weighted query.
+            k (int, optional): The number of documents to return. Defaults to 4.
+
+        Returns:
+            List[Document]: k documents ordered from best to worst match.
+        """
+        results = self.marqo_similarity_search(query=query, k=k)
+
+        documents = self._construct_documents_from_results_without_score(results)
+        return documents
+
+    def similarity_search_with_score(
+        self,
+        query: Union[str, Dict[str, float]],
+        k: int = 4,
+    ) -> List[Tuple[Document, float]]:
+        """Return documents from Marqo that are similar to the query as well
+        as their scores.
+
+        Args:
+            query (Union[str, Dict[str, float]]): The query to search with, either
+                as a string or a weighted query.
+            k (int, optional): The number of documents to return. Defaults to 4.
+
+        Returns:
+            List[Tuple[Document, float]]: The matching documents and their scores,
+                ordered by descending score.
+        """
+        results = self.marqo_similarity_search(query=query, k=k)
+
+        scored_documents = self._construct_documents_from_results_with_score(results)
+        return scored_documents
+
+    def bulk_similarity_search(
+        self,
+        queries: Iterable[Union[str, Dict[str, float]]],
+        k: int = 4,
+        **kwargs: Any,
+    ) -> List[List[Document]]:
+        """Search the marqo index for the most similar documents in bulk with
+        multiple queries.
+
+        Args:
+            queries (Iterable[Union[str, Dict[str, float]]]): An iterable of queries
+                to execute in bulk; queries in the list can be strings or
+                dictionaries of weighted queries.
+            k (int, optional): The number of documents to return for each query.
+                Defaults to 4.
+
+        Returns:
+            List[List[Document]]: A list of results for each query.
+        """
+        bulk_results = self.marqo_bulk_similarity_search(queries=queries, k=k)
+        bulk_documents: List[List[Document]] = []
+        for results in bulk_results["result"]:
+            documents = self._construct_documents_from_results_without_score(results)
+            bulk_documents.append(documents)
+
+        return bulk_documents
+
+    def bulk_similarity_search_with_score(
+        self,
+        queries: Iterable[Union[str, Dict[str, float]]],
+        k: int = 4,
+        **kwargs: Any,
+    ) -> List[List[Tuple[Document, float]]]:
+        """Return documents from Marqo that are similar to the query as well as
+        their scores using a batch of queries.
+
+        Args:
+            queries (Iterable[Union[str, Dict[str, float]]]): An iterable of queries
+                to execute in bulk; queries in the list can be strings or
+                dictionaries of weighted queries.
+            k (int, optional): The number of documents to return. Defaults to 4.
+
+        Returns:
+            List[List[Tuple[Document, float]]]: A list of lists of the matching
+                documents and their scores for each query.
+        """
+        bulk_results = self.marqo_bulk_similarity_search(queries=queries, k=k)
+        bulk_documents: List[List[Tuple[Document, float]]] = []
+        for results in bulk_results["result"]:
+            documents = self._construct_documents_from_results_with_score(results)
+            bulk_documents.append(documents)
+
+        return bulk_documents
+
+    def _construct_documents_from_results_with_score(
+        self, results: Dict[str, List[Dict[str, str]]]
+    ) -> List[Tuple[Document, Any]]:
+        """Helper to convert Marqo results into document and score pairs.
+
+        Args:
+            results (Dict[str, List[Dict[str, str]]]): A Marqo results object
+                containing the 'hits'.
+
+        Returns:
+            List[Tuple[Document, Any]]: The matching documents and their scores.
+        """
+        documents: List[Tuple[Document, Any]] = []
+        for res in results["hits"]:
+            if self.page_content_builder is None:
+                text = res["text"]
+            else:
+                text = self.page_content_builder(res)
+
+            metadata = json.loads(res.get("metadata", "{}"))
+            documents.append(
+                (Document(page_content=text, metadata=metadata), res["_score"])
+            )
+        return documents
+
+    def _construct_documents_from_results_without_score(
+        self, results: Dict[str, List[Dict[str, str]]]
+    ) -> List[Document]:
+        """Helper to convert Marqo results into documents, dropping the scores.
+
+        Args:
+            results (Dict[str, List[Dict[str, str]]]): A Marqo results object
+                containing the 'hits'.
+
+        Returns:
+            List[Document]: The matching documents.
+        """
+        documents: List[Document] = []
+        for res in results["hits"]:
+            if self.page_content_builder is None:
+                text = res["text"]
+            else:
+                text = self.page_content_builder(res)
+
+            metadata = json.loads(res.get("metadata", "{}"))
+            documents.append(Document(page_content=text, metadata=metadata))
+        return documents
+
+    def marqo_similarity_search(
+        self,
+        query: Union[str, Dict[str, float]],
+        k: int = 4,
+    ) -> Dict[str, List[Dict[str, str]]]:
+        """Return documents from Marqo, exposing Marqo's output directly.
+
+        Args:
+            query (Union[str, Dict[str, float]]): The query to search with.
+            k (int, optional): The number of documents to return. Defaults to 4.
+
+        Returns:
+            Dict[str, List[Dict[str, str]]]: The hits from Marqo.
+        """
+        results = self._client.index(self._index_name).search(
+            q=query, searchable_attributes=self._searchable_attributes, limit=k
+        )
+        return results
+
+    def marqo_bulk_similarity_search(
+        self, queries: Iterable[Union[str, Dict[str, float]]], k: int = 4
+    ) -> Dict[str, List[Dict[str, List[Dict[str, str]]]]]:
+        """Return documents from Marqo using a bulk search, exposing Marqo's
+        output directly.
+
+        Args:
+            queries (Iterable[Union[str, Dict[str, float]]]): A list of queries.
+            k (int, optional): The number of documents to return for each query.
+                Defaults to 4.
+
+        Returns:
+            Dict[str, List[Dict[str, List[Dict[str, str]]]]]: A bulk search results
+                object.
+        """
+        bulk_results = self._client.bulk_search(
+            [
+                {
+                    "index": self._index_name,
+                    "q": query,
+                    "searchableAttributes": self._searchable_attributes,
+                    "limit": k,
+                }
+                for query in queries
+            ]
+        )
+        return bulk_results
+
+    @classmethod
+    def from_documents(
+        cls: Type[Marqo],
+        documents: List[Document],
+        embedding: Embeddings,
+        **kwargs: Any,
+    ) -> Marqo:
+        """Return VectorStore initialized from documents. Note that Marqo does not
+        need embeddings; we retain the parameter to adhere to the Liskov
+        substitution principle.
+
+        Args:
+            documents (List[Document]): Input documents.
+            embedding (Any, optional): Embeddings (not required). Defaults to None.
+
+        Returns:
+            VectorStore: A Marqo vectorstore.
+        """
+        texts = [d.page_content for d in documents]
+        metadatas = [d.metadata for d in documents]
+        return cls.from_texts(texts, metadatas=metadatas, **kwargs)
+
+    @classmethod
+    def from_texts(
+        cls,
+        texts: List[str],
+        embedding: Any = None,
+        metadatas: Optional[List[dict]] = None,
+        index_name: str = "",
+        url: str = "http://localhost:8882",
+        api_key: str = "",
+        marqo_device: str = "cpu",
+        add_documents_settings: Optional[Dict[str, Any]] = {},
+        searchable_attributes: Optional[List[str]] = None,
+        page_content_builder: Optional[Callable[[Dict[str, str]], str]] = None,
+        index_settings: Optional[Dict[str, Any]] = {},
+        verbose: bool = True,
+        **kwargs: Any,
+    ) -> Marqo:
+        """Return Marqo initialized from texts. Note that Marqo does not need
+        embeddings; we retain the parameter to adhere to the Liskov
+        substitution principle.
+
+        This is a quick way to get started with Marqo - simply provide your texts
+        and metadatas and this will create an instance of the data store and index
+        the provided data.
+
+        To know the ids of your documents with this approach you will need to
+        include them under the key "_id" in your metadatas for each text.
+
+        Example:
+            .. code-block:: python
+
+                from langchain.vectorstores import Marqo
+
+                datastore = Marqo.from_texts(['text'], index_name='my-first-index',
+                    url='http://localhost:8882')
+
+        Args:
+            texts (List[str]): A list of texts to index into marqo upon creation.
+            embedding (Any, optional): Embeddings (not required). Defaults to None.
+            index_name (str, optional): The name of the index to use, if none is
+                provided then one will be created with a UUID. Defaults to "".
+            url (str, optional): The URL for Marqo. Defaults to "http://localhost:8882".
+            api_key (str, optional): The API key for Marqo. Defaults to "".
+            metadatas (Optional[List[dict]], optional): A list of metadatas, to
+                accompany the texts. Defaults to None.
+            marqo_device (str, optional): The device for marqo to use on the server;
+                this is only used when a new index is being created. Can be "cpu" or
+                "cuda". Defaults to "cpu".
+            add_documents_settings (Optional[Dict[str, Any]], optional): Settings
+                for adding documents, see
+                https://docs.marqo.ai/0.0.16/API-Reference/documents/#query-parameters.
+                Defaults to {}.
+            index_settings (Optional[Dict[str, Any]], optional): Index settings if
+                the index doesn't exist, see
+                https://docs.marqo.ai/0.0.16/API-Reference/indexes/#index-defaults-object.
+                Defaults to {}.
+
+        Returns:
+            Marqo: An instance of the Marqo vector store.
+        """
+        try:
+            import marqo
+        except ImportError:
+            raise ValueError(
+                "Could not import marqo python package. "
+                "Please install it with `pip install marqo`."
+ ) + + if not index_name: + index_name = str(uuid.uuid4()) + + client = marqo.Client(url=url, api_key=api_key, indexing_device=marqo_device) + + try: + client.create_index(index_name, settings_dict=index_settings) + if verbose: + print(f"Created {index_name} successfully.") + except Exception: + if verbose: + print(f"Index {index_name} exists.") + + instance: Marqo = cls( + client, + index_name, + searchable_attributes=searchable_attributes, + add_documents_settings=add_documents_settings, + page_content_builder=page_content_builder, + ) + instance.add_texts(texts, metadatas) + return instance + + def get_indexes(self) -> List[Dict[str, str]]: + """Helper to see your available indexes in marqo, useful if the + from_texts method was used without an index name specified + + Returns: + List[Dict[str, str]]: The list of indexes + """ + return self._client.get_indexes()["results"] + + def get_number_of_documents(self) -> int: + """Helper to see the number of documents in the index + + Returns: + int: The number of documents + """ + return self._client.index(self._index_name).get_stats()["numberOfDocuments"] diff --git a/poetry.lock b/poetry.lock index 22e95cfb7b..683062cd59 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.4.2 and should not be changed by hand. +# This file is automatically @generated by Poetry and should not be changed by hand. [[package]] name = "absl-py" @@ -642,17 +642,12 @@ category = "main" optional = true python-versions = ">=3.7" files = [ - {file = "awadb-0.3.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:871e2b10c79348d44522b8430af1ed1ad2632322c74abc20d8a3154de242da96"}, {file = "awadb-0.3.3-cp310-cp310-manylinux1_x86_64.whl", hash = "sha256:daebc108103c8cace41dfb3235fcfdda28ea48e6cd6548b6072f7ad49b64274b"}, {file = "awadb-0.3.3-cp311-cp311-macosx_10_13_universal2.whl", hash = "sha256:2bb3ca2f943448060b1bba4395dd99e2218d7f2149507a8fdfa7a3fd4cfe97ec"}, - {file = "awadb-0.3.3-cp311-cp311-macosx_10_13_x86_64.whl", hash = "sha256:83e92963cde54a4382b0c299939865ce12e145853637642bc8e6eb22bf689386"}, - {file = "awadb-0.3.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:6f249b04f38840146a5c17ffcd0f4da1bb00a39b8882c96e042acf58045faca2"}, {file = "awadb-0.3.3-cp311-cp311-manylinux1_x86_64.whl", hash = "sha256:7b99662af9f7b58e217661a70c295e40605900552bec6d8e9553d90dbf19c5c1"}, {file = "awadb-0.3.3-cp36-cp36m-manylinux1_x86_64.whl", hash = "sha256:94be44e587f28fa26b2cade0b6f4c04689f50cb0c07183db5ee50e48fe2e9ae3"}, {file = "awadb-0.3.3-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:314929dc3a8d25c0f234a2b86c920543050f4eb298a6f68bd2c97c9fe3fb6224"}, - {file = "awadb-0.3.3-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:35d8f580973e137864d2e00edbc7369cd01cf72b673d60fe902c7b3f983c76e9"}, {file = "awadb-0.3.3-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:8bfccff1c7373899153427d93d96a97ae5371e8a6f09ff4dcbd28fb9f3f63ff4"}, - {file = "awadb-0.3.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:c791bdd0646ec620c8b0fa026915780ebf78c16169cd9da81f54409553ec0114"}, {file = "awadb-0.3.3-cp39-cp39-manylinux1_x86_64.whl", hash = "sha256:810021a90b873f668d8ab63e2c2747b2b2835bf0ae25f4223b6c94f06faffea4"}, ] @@ -4294,7 +4289,6 @@ optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*, !=3.6.*" files = [ {file = "jsonpointer-2.4-py2.py3-none-any.whl", hash = "sha256:15d51bba20eea3165644553647711d150376234112651b4f1811022aecad7d7a"}, - {file = "jsonpointer-2.4.tar.gz", hash = 
"sha256:585cee82b70211fa9e6043b7bb89db6e1aa49524340dde8ad6b63206ea689d88"}, ] [[package]] @@ -4992,6 +4986,23 @@ files = [ {file = "MarkupSafe-2.1.3.tar.gz", hash = "sha256:af598ed32d6ae86f1b747b82783958b1a4ab8f617b06fe68795c7f026abbdcad"}, ] +[[package]] +name = "marqo" +version = "0.9.6" +description = "Tensor search for humans" +category = "main" +optional = true +python-versions = ">=3" +files = [ + {file = "marqo-0.9.6-py3-none-any.whl", hash = "sha256:edccded5dd1e1f519a5419ec9165b65eb3240d94fd0e2875ce2f599248187ecf"}, + {file = "marqo-0.9.6.tar.gz", hash = "sha256:6b8dd18c6216313a62fae40c62c14e7ea5e76c4f06ca30edbb3157075490a42e"}, +] + +[package.dependencies] +pydantic = "*" +requests = "*" +urllib3 = "*" + [[package]] name = "marshmallow" version = "3.19.0" @@ -11125,7 +11136,7 @@ files = [ ] [package.dependencies] -accelerate = {version = ">=0.20.2", optional = true, markers = "extra == \"accelerate\" or extra == \"torch\""} +accelerate = {version = ">=0.20.2", optional = true, markers = "extra == \"accelerate\""} filelock = "*" huggingface-hub = ">=0.14.1,<1.0" numpy = ">=1.17" @@ -12382,15 +12393,15 @@ cffi = {version = ">=1.11", markers = "platform_python_implementation == \"PyPy\ cffi = ["cffi (>=1.11)"] [extras] -all = ["O365", "aleph-alpha-client", "anthropic", "arxiv", "atlassian-python-api", "awadb", "azure-ai-formrecognizer", "azure-ai-vision", "azure-cognitiveservices-speech", "azure-cosmos", "azure-identity", "beautifulsoup4", "clarifai", "clickhouse-connect", "cohere", "deeplake", "docarray", "duckduckgo-search", "elasticsearch", "esprima", "faiss-cpu", "google-api-python-client", "google-auth", "google-search-results", "gptcache", "html2text", "huggingface_hub", "jina", "jinja2", "jq", "lancedb", "langkit", "lark", "lxml", "manifest-ml", "momento", "nebula3-python", "neo4j", "networkx", "nlpcloud", "nltk", "nomic", "octoai-sdk", "openai", "openlm", "opensearch-py", "pdfminer-six", "pexpect", "pgvector", "pinecone-client", "pinecone-text", "psycopg2-binary", "pymongo", "pyowm", "pypdf", "pytesseract", "pyvespa", "qdrant-client", "rdflib", "redis", "requests-toolbelt", "sentence-transformers", "singlestoredb", "spacy", "steamship", "tensorflow-text", "tigrisdb", "tiktoken", "torch", "transformers", "weaviate-client", "wikipedia", "wolframalpha"] -azure = ["azure-ai-formrecognizer", "azure-ai-vision", "azure-cognitiveservices-speech", "azure-core", "azure-cosmos", "azure-identity", "azure-search-documents", "openai"] +all = ["anthropic", "clarifai", "cohere", "openai", "nlpcloud", "huggingface_hub", "jina", "manifest-ml", "elasticsearch", "opensearch-py", "google-search-results", "faiss-cpu", "sentence-transformers", "transformers", "spacy", "nltk", "wikipedia", "beautifulsoup4", "tiktoken", "torch", "jinja2", "pinecone-client", "pinecone-text", "marqo", "pymongo", "weaviate-client", "redis", "google-api-python-client", "google-auth", "wolframalpha", "qdrant-client", "tensorflow-text", "pypdf", "networkx", "nomic", "aleph-alpha-client", "deeplake", "pgvector", "psycopg2-binary", "pyowm", "pytesseract", "html2text", "atlassian-python-api", "gptcache", "duckduckgo-search", "arxiv", "azure-identity", "clickhouse-connect", "azure-cosmos", "lancedb", "langkit", "lark", "pexpect", "pyvespa", "O365", "jq", "docarray", "steamship", "pdfminer-six", "lxml", "requests-toolbelt", "neo4j", "openlm", "azure-ai-formrecognizer", "azure-ai-vision", "azure-cognitiveservices-speech", "momento", "singlestoredb", "tigrisdb", "nebula3-python", "awadb", "esprima", "octoai-sdk", "rdflib"] +azure 
= ["azure-identity", "azure-cosmos", "openai", "azure-core", "azure-ai-formrecognizer", "azure-ai-vision", "azure-cognitiveservices-speech", "azure-search-documents"] clarifai = ["clarifai"] cohere = ["cohere"] docarray = ["docarray"] embeddings = ["sentence-transformers"] -extended-testing = ["atlassian-python-api", "beautifulsoup4", "beautifulsoup4", "bibtexparser", "cassio", "chardet", "esprima", "gql", "html2text", "jq", "lxml", "openai", "pandas", "pdfminer-six", "pgvector", "psychicapi", "py-trello", "pymupdf", "pypdf", "pypdfium2", "pyspark", "requests-toolbelt", "scikit-learn", "streamlit", "telethon", "tqdm", "zep-python"] +extended-testing = ["beautifulsoup4", "bibtexparser", "cassio", "chardet", "esprima", "jq", "pdfminer-six", "pgvector", "pypdf", "pymupdf", "pypdfium2", "tqdm", "lxml", "atlassian-python-api", "beautifulsoup4", "pandas", "telethon", "psychicapi", "zep-python", "gql", "requests-toolbelt", "html2text", "py-trello", "scikit-learn", "streamlit", "pyspark", "openai"] javascript = ["esprima"] -llms = ["anthropic", "clarifai", "cohere", "huggingface_hub", "manifest-ml", "nlpcloud", "openai", "openllm", "openlm", "torch", "transformers"] +llms = ["anthropic", "clarifai", "cohere", "openai", "openllm", "openlm", "nlpcloud", "huggingface_hub", "manifest-ml", "torch", "transformers"] openai = ["openai", "tiktoken"] qdrant = ["qdrant-client"] text-helpers = ["chardet"] @@ -12398,4 +12409,4 @@ text-helpers = ["chardet"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<4.0" -content-hash = "e2450d84b1a0747c45b015e1071ed37265269c325362779cd8bd3b9caa94a9c9" +content-hash = "ce9bfa2954a3b468d925410fe836c6db92040e95cb9720227e12abef1f4c11ca" diff --git a/pyproject.toml b/pyproject.toml index 3560d6b8d1..1cb5b70b69 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ pinecone-text = {version = "^0.4.2", optional = true} pymongo = {version = "^4.3.3", optional = true} clickhouse-connect = {version="^0.5.14", optional=true} weaviate-client = {version = "^3", optional = true} +marqo = {version = "^0.9.1", optional=true} google-api-python-client = {version = "2.70.0", optional = true} google-auth = {version = "^2.18.1", optional = true} wolframalpha = {version = "5.0.0", optional = true} @@ -262,6 +263,7 @@ all = [ "jinja2", "pinecone-client", "pinecone-text", + "marqo", "pymongo", "weaviate-client", "redis", diff --git a/tests/integration_tests/vectorstores/test_marqo.py b/tests/integration_tests/vectorstores/test_marqo.py new file mode 100644 index 0000000000..f2685ecc1b --- /dev/null +++ b/tests/integration_tests/vectorstores/test_marqo.py @@ -0,0 +1,178 @@ +"""Test Marqo functionality.""" +from typing import Dict + +import marqo +import pytest + +from langchain.docstore.document import Document +from langchain.vectorstores.marqo import Marqo + +DEFAULT_MARQO_URL = "http://localhost:8882" +DEFAULT_MARQO_API_KEY = "" +INDEX_NAME = "langchain-integration-tests" + + +@pytest.fixture +def client() -> Marqo: + # fixture for marqo client to be used throughout testing, resets the index + client = marqo.Client(url=DEFAULT_MARQO_URL, api_key=DEFAULT_MARQO_API_KEY) + try: + client.index(INDEX_NAME).delete() + except Exception: + pass + + client.create_index(INDEX_NAME) + return client + + +def test_marqo(client: Marqo) -> None: + """Test end to end construction and search.""" + texts = ["foo", "bar", "baz"] + marqo_search = Marqo.from_texts( + texts=texts, + index_name=INDEX_NAME, + url=DEFAULT_MARQO_URL, + api_key=DEFAULT_MARQO_API_KEY, + verbose=False, + ) + 
results = marqo_search.similarity_search("foo", k=1) + assert results == [Document(page_content="foo")] + + +def test_marqo_with_metadatas(client: Marqo) -> None: + """Test end to end construction and search.""" + texts = ["foo", "bar", "baz"] + metadatas = [{"page": i} for i in range(len(texts))] + marqo_search = Marqo.from_texts( + texts=texts, + metadatas=metadatas, + index_name=INDEX_NAME, + url=DEFAULT_MARQO_URL, + api_key=DEFAULT_MARQO_API_KEY, + verbose=False, + ) + results = marqo_search.similarity_search("foo", k=1) + assert results == [Document(page_content="foo", metadata={"page": 0})] + + +def test_marqo_with_scores(client: Marqo) -> None: + """Test end to end construction and search with scores and IDs.""" + texts = ["foo", "bar", "baz"] + metadatas = [{"page": i} for i in range(len(texts))] + marqo_search = Marqo.from_texts( + texts=texts, + metadatas=metadatas, + index_name=INDEX_NAME, + url=DEFAULT_MARQO_URL, + api_key=DEFAULT_MARQO_API_KEY, + verbose=False, + ) + results = marqo_search.similarity_search_with_score("foo", k=3) + docs = [r[0] for r in results] + scores = [r[1] for r in results] + + assert docs == [ + Document(page_content="foo", metadata={"page": 0}), + Document(page_content="bar", metadata={"page": 1}), + Document(page_content="baz", metadata={"page": 2}), + ] + assert scores[0] > scores[1] > scores[2] + + +def test_marqo_add_texts(client: Marqo) -> None: + marqo_search = Marqo(client=client, index_name=INDEX_NAME) + ids1 = marqo_search.add_texts(["1", "2", "3"]) + assert len(ids1) == 3 + ids2 = marqo_search.add_texts(["1", "2", "3"]) + assert len(ids2) == 3 + assert len(set(ids1).union(set(ids2))) == 6 + + +def test_marqo_search(client: Marqo) -> None: + marqo_search = Marqo(client=client, index_name=INDEX_NAME) + input_documents = ["This is document 1", "2", "3"] + ids = marqo_search.add_texts(input_documents) + results = marqo_search.marqo_similarity_search("What is the first document?", k=3) + assert len(ids) == len(input_documents) + assert ids[0] == results["hits"][0]["_id"] + + +def test_marqo_bulk(client: Marqo) -> None: + marqo_search = Marqo(client=client, index_name=INDEX_NAME) + input_documents = ["This is document 1", "2", "3"] + ids = marqo_search.add_texts(input_documents) + bulk_results = marqo_search.bulk_similarity_search( + ["What is the first document?", "2", "3"], k=3 + ) + + assert len(ids) == len(input_documents) + assert bulk_results[0][0].page_content == input_documents[0] + assert bulk_results[1][0].page_content == input_documents[1] + assert bulk_results[2][0].page_content == input_documents[2] + + +def test_marqo_weighted_query(client: Marqo) -> None: + """Test end to end construction and search.""" + texts = ["Smartphone", "Telephone"] + marqo_search = Marqo.from_texts( + texts=texts, + index_name=INDEX_NAME, + url=DEFAULT_MARQO_URL, + api_key=DEFAULT_MARQO_API_KEY, + verbose=False, + ) + results = marqo_search.similarity_search( + {"communications device": 1.0, "Old technology": -5.0}, k=1 + ) + assert results == [Document(page_content="Smartphone")] + + +def test_marqo_multimodal() -> None: + client = marqo.Client(url=DEFAULT_MARQO_URL, api_key=DEFAULT_MARQO_API_KEY) + try: + client.index(INDEX_NAME).delete() + except Exception: + pass + + # reset the index for this example + client.delete_index(INDEX_NAME) + + # This index could have been created by another system + settings = {"treat_urls_and_pointers_as_images": True, "model": "ViT-L/14"} + client.create_index(INDEX_NAME, **settings) + 
client.index(INDEX_NAME).add_documents( + [ + # image of a bus + { + "caption": "Bus", + "image": "https://raw.githubusercontent.com/marqo-ai/marqo/mainline/" + "examples/ImageSearchGuide/data/image4.jpg", + }, + # image of a plane + { + "caption": "Plane", + "image": "https://raw.githubusercontent.com/marqo-ai/marqo/" + "mainline/examples/ImageSearchGuide/data/image2.jpg", + }, + ], + ) + + def get_content(res: Dict[str, str]) -> str: + if "text" in res: + return res["text"] + return f"{res['caption']}: {res['image']}" + + marqo_search = Marqo(client, INDEX_NAME, page_content_builder=get_content) + + query = "vehicles that fly" + docs = marqo_search.similarity_search(query) + + assert docs[0].page_content.split(":")[0] == "Plane" + + raised_value_error = False + try: + marqo_search.add_texts(["text"]) + except ValueError: + raised_value_error = True + + assert raised_value_error