Add Support for Azure Cosmos DB MongoDB vCore Vector Store #11627 (#11632)

This PR adds support for the Azure Cosmos DB MongoDB vCore Vector Store

https://learn.microsoft.com/en-us/azure/cosmos-db/mongodb/vcore/

https://learn.microsoft.com/en-us/azure/cosmos-db/mongodb/vcore/vector-search

Summary:
- **Description:** adds a vector store integration for Azure Cosmos DB for MongoDB vCore
  - **Issue:** fixes #11627
  - **Dependencies:** `pymongo`
  - **Tag maintainer:** @hwchase17
  - **Twitter handle:** @izzyacademy

---------

Co-authored-by: Israel Ekpo <israel.ekpo@gmail.com>
Co-authored-by: Israel Ekpo <44282278+izzyacademy@users.noreply.github.com>
Co-authored-by: Bagatur <baskaryan@gmail.com>
pull/11685/head
Israel Ekpo 1 year ago committed by GitHub
parent 28ee6a7c12
commit d0603c86b6

@@ -1,6 +1,6 @@
# Microsoft
All functionality related to Microsoft
All functionality related to Microsoft Azure
## LLM
### Azure OpenAI
@@ -109,7 +109,39 @@ from langchain.document_loaders import UnstructuredWordDocumentLoader
```
## Retriever
## Vector stores
### Azure Cosmos DB
>[Azure Cosmos DB for MongoDB vCore](https://learn.microsoft.com/en-us/azure/cosmos-db/mongodb/vcore/) makes it easy to create a database with full native MongoDB support.
> You can apply your MongoDB experience and continue to use your favorite MongoDB drivers, SDKs, and tools by pointing your application to the API for MongoDB vCore account's connection string.
> Use vector search in Azure Cosmos DB for MongoDB vCore to seamlessly integrate your AI-based applications with your data that's stored in Azure Cosmos DB.
#### Installation and Setup
See [detail configuration instructions](/docs/integrations/vectorstores/azure_cosmos_db).
We need to install the `pymongo` Python package.
```bash
pip install pymongo
```
#### Deploy Azure Cosmos DB on Microsoft Azure
Azure Cosmos DB for MongoDB vCore provides developers with a fully managed MongoDB-compatible database service for building modern applications with a familiar architecture.
With Cosmos DB for MongoDB vCore, developers can enjoy the benefits of native Azure integrations, low total cost of ownership (TCO), and the familiar vCore architecture when migrating existing applications or building new ones.
[Sign Up](https://azure.microsoft.com/en-us/free/) for free to get started today.
See a [usage example](/docs/integrations/vectorstores/azure_cosmos_db).
```python
from langchain.vectorstores import AzureCosmosDBVectorSearch
```
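
For orientation, a minimal end-to-end sketch (mirroring the notebook added in this PR) looks like the following; the connection string, namespace, index name, and sample text are placeholders you must supply:

```python
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import AzureCosmosDBVectorSearch
from langchain.vectorstores.azure_cosmos_db import CosmosDBSimilarityType

# Placeholders - replace with your own values
CONNECTION_STRING = "<your-cosmos-db-mongodb-vcore-connection-string>"
NAMESPACE = "my_db.my_collection"  # "<database>.<collection>"

embeddings = OpenAIEmbeddings()

# Build the vector store and load a few texts
vectorstore = AzureCosmosDBVectorSearch.from_connection_string(
    CONNECTION_STRING, NAMESPACE, embeddings, index_name="my-vector-index"
)
vectorstore.add_texts(["Azure Cosmos DB for MongoDB vCore supports vector search."])

# Create the IVF index once documents are loaded, then query it
vectorstore.create_index(
    num_lists=100, dimensions=1536, similarity=CosmosDBSimilarityType.COS
)
docs = vectorstore.similarity_search("vector search", k=1)
```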
## Retrievers
### Azure Cognitive Search
>[Azure Cognitive Search](https://learn.microsoft.com/en-us/azure/search/search-what-is-azure-search) (formerly known as `Azure Search`) is a cloud search service that gives developers infrastructure, APIs, and tools for building a rich search experience over private, heterogeneous content in web, mobile, and enterprise applications.

@@ -0,0 +1,362 @@
{
"cells": [
{
"cell_type": "markdown",
"source": [
"# Azure Cosmos DB\n",
"\n",
">[Azure Cosmos DB for MongoDB vCore](https://learn.microsoft.com/en-us/azure/cosmos-db/mongodb/vcore/) makes it easy to create a database with full native MongoDB support.\n",
"> You can apply your MongoDB experience and continue to use your favorite MongoDB drivers, SDKs, and tools by pointing your application to the API for MongoDB vCore account's connection string.\n",
"> Use vector search in Azure Cosmos DB for MongoDB vCore to seamlessly integrate your AI-based applications with your data that's stored in Azure Cosmos DB.\n",
"\n",
"This notebook shows you how to leverage the [Vector Search](https://learn.microsoft.com/en-us/azure/cosmos-db/mongodb/vcore/vector-search) capabilities within Azure Cosmos DB for Mongo vCore to store documents in collections, create indicies and perform vector search queries using approximate nearest neighbor algorithms such as COS (cosine distance), L2 (Euclidean distance), and IP (inner product) to locate documents close to the query vectors. \n",
" \n",
"Azure Cosmos DB for MongoDB vCore provides developers with a fully managed MongoDB-compatible database service for building modern applications with a familiar architecture.\n",
"\n",
"With Cosmos DB for MongoDB vCore, developers can enjoy the benefits of native Azure integrations, low total cost of ownership (TCO), and the familiar vCore architecture when migrating existing applications or building new ones.\n",
"\n",
"[Sign Up](https://azure.microsoft.com/en-us/free/) for free to get started today.\n",
" "
],
"metadata": {
"collapsed": false
},
"id": "245c0aa70db77606"
},
{
"cell_type": "code",
"execution_count": 2,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Requirement already satisfied: pymongo in /Users/iekpo/Langchain/langchain-python/.venv/lib/python3.10/site-packages (4.5.0)\r\n",
"Requirement already satisfied: dnspython<3.0.0,>=1.16.0 in /Users/iekpo/Langchain/langchain-python/.venv/lib/python3.10/site-packages (from pymongo) (2.4.2)\r\n"
]
}
],
"source": [
"!pip install pymongo"
],
"metadata": {
"collapsed": false,
"ExecuteTime": {
"end_time": "2023-10-10T17:20:00.721985Z",
"start_time": "2023-10-10T17:19:57.996265Z"
}
},
"id": "ab8e45f5bd435ade"
},
{
"cell_type": "code",
"execution_count": 24,
"outputs": [],
"source": [
"import os\n",
"import getpass\n",
"\n",
"CONNECTION_STRING = \"AZURE COSMOS DB MONGO vCORE connection string\"\n",
"INDEX_NAME = \"izzy-test-index\"\n",
"NAMESPACE = \"izzy_test_db.izzy_test_collection\"\n",
"DB_NAME, COLLECTION_NAME = NAMESPACE.split(\".\")"
],
"metadata": {
"collapsed": false,
"ExecuteTime": {
"end_time": "2023-10-10T17:50:03.615234Z",
"start_time": "2023-10-10T17:50:03.604289Z"
}
},
"id": "9c7ce9e7b26efbb0"
},
{
"cell_type": "markdown",
"source": [
"We want to use `OpenAIEmbeddings` so we need to set up our Azure OpenAI API Key alongside other environment variables. "
],
"metadata": {
"collapsed": false
},
"id": "f2e66b097c6ce2e3"
},
{
"cell_type": "code",
"execution_count": 25,
"outputs": [],
"source": [
"# Set up the OpenAI Environment Variables\n",
"os.environ[\"OPENAI_API_TYPE\"] = \"azure\"\n",
"os.environ[\"OPENAI_API_VERSION\"] = \"2023-05-15\"\n",
"os.environ[\"OPENAI_API_BASE\"] = \"YOUR_OPEN_AI_ENDPOINT\" # https://example.openai.azure.com/\n",
"os.environ[\"OPENAI_API_KEY\"] = \"YOUR_OPEN_AI_KEY\"\n",
"os.environ[\"OPENAI_EMBEDDINGS_DEPLOYMENT\"] = \"smart-agent-embedding-ada\" # the deployment name for the embedding model\n",
"os.environ[\"OPENAI_EMBEDDINGS_MODEL_NAME\"] = \"text-embedding-ada-002\" # the model name\n"
],
"metadata": {
"collapsed": false,
"ExecuteTime": {
"end_time": "2023-10-10T17:50:11.712929Z",
"start_time": "2023-10-10T17:50:11.703871Z"
}
},
"id": "4a052d99c6b8a2a7"
},
{
"cell_type": "markdown",
"source": [
"Now, we need to load the documents into the collection, create the index and then run our queries against the index to retrieve matches.\n",
"\n",
"Please refer to the [documentation](https://learn.microsoft.com/en-us/azure/cosmos-db/mongodb/vcore/vector-search) if you have questions about certain parameters"
],
"metadata": {
"collapsed": false
},
"id": "ebaa28c6e2b35063"
},
{
"cell_type": "code",
"execution_count": 26,
"outputs": [],
"source": [
"from langchain.docstore.document import Document\n",
"from langchain.embeddings import OpenAIEmbeddings\n",
"from langchain.schema.embeddings import Embeddings\n",
"from langchain.vectorstores.azure_cosmos_db_vector_search import AzureCosmosDBVectorSearch, CosmosDBSimilarityType\n",
"from langchain.text_splitter import CharacterTextSplitter\n",
"from langchain.document_loaders import TextLoader\n",
"\n",
"SOURCE_FILE_NAME = \"../../modules/state_of_the_union.txt\"\n",
"\n",
"loader = TextLoader(SOURCE_FILE_NAME)\n",
"documents = loader.load()\n",
"text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)\n",
"docs = text_splitter.split_documents(documents)\n",
"\n",
"# OpenAI Settings\n",
"model_deployment = os.getenv(\"OPENAI_EMBEDDINGS_DEPLOYMENT\", \"smart-agent-embedding-ada\")\n",
"model_name = os.getenv(\"OPENAI_EMBEDDINGS_MODEL_NAME\", \"text-embedding-ada-002\")\n",
"\n",
"\n",
"openai_embeddings: OpenAIEmbeddings = OpenAIEmbeddings(deployment=model_deployment, model=model_name, chunk_size=1)"
],
"metadata": {
"collapsed": false,
"ExecuteTime": {
"end_time": "2023-10-10T17:50:16.732718Z",
"start_time": "2023-10-10T17:50:16.716642Z"
}
},
"id": "183741cf8f4c7c53"
},
{
"cell_type": "code",
"execution_count": 28,
"outputs": [
{
"data": {
"text/plain": "{'raw': {'defaultShard': {'numIndexesBefore': 2,\n 'numIndexesAfter': 3,\n 'createdCollectionAutomatically': False,\n 'ok': 1}},\n 'ok': 1}"
},
"execution_count": 28,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from pymongo import MongoClient\n",
"\n",
"INDEX_NAME = \"izzy-test-index-2\"\n",
"NAMESPACE = \"izzy_test_db.izzy_test_collection\"\n",
"DB_NAME, COLLECTION_NAME = NAMESPACE.split(\".\")\n",
"\n",
"client: MongoClient = MongoClient(CONNECTION_STRING)\n",
"collection = client[DB_NAME][COLLECTION_NAME]\n",
"\n",
"model_deployment = os.getenv(\"OPENAI_EMBEDDINGS_DEPLOYMENT\", \"smart-agent-embedding-ada\")\n",
"model_name = os.getenv(\"OPENAI_EMBEDDINGS_MODEL_NAME\", \"text-embedding-ada-002\")\n",
"\n",
"vectorstore = AzureCosmosDBVectorSearch.from_documents(\n",
" docs,\n",
" openai_embeddings,\n",
" collection=collection,\n",
" index_name=INDEX_NAME,\n",
")\n",
"\n",
"num_lists = 100\n",
"dimensions = 1536\n",
"similarity_algorithm = CosmosDBSimilarityType.COS\n",
"\n",
"vectorstore.create_index(num_lists, dimensions, similarity_algorithm)\n"
],
"metadata": {
"collapsed": false,
"ExecuteTime": {
"end_time": "2023-10-10T17:51:17.980698Z",
"start_time": "2023-10-10T17:51:11.786336Z"
}
},
"id": "39ae6058c2f7fdf1"
},
{
"cell_type": "code",
"execution_count": 29,
"outputs": [],
"source": [
"# perform a similarity search between the embedding of the query and the embeddings of the documents\n",
"query = \"What did the president say about Ketanji Brown Jackson\"\n",
"docs = vectorstore.similarity_search(query)"
],
"metadata": {
"collapsed": false,
"ExecuteTime": {
"end_time": "2023-10-10T17:51:44.840121Z",
"start_time": "2023-10-10T17:51:44.498639Z"
}
},
"id": "32c68d3246adc21f"
},
{
"cell_type": "code",
"execution_count": 31,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Tonight. I call on the Senate to: Pass the Freedom to Vote Act. Pass the John Lewis Voting Rights Act. And while you're at it, pass the Disclose Act so Americans can know who is funding our elections. \n",
"\n",
"Tonight, I'd like to honor someone who has dedicated his life to serve this country: Justice Stephen Breyer—an Army veteran, Constitutional scholar, and retiring Justice of the United States Supreme Court. Justice Breyer, thank you for your service. \n",
"\n",
"One of the most serious constitutional responsibilities a President has is nominating someone to serve on the United States Supreme Court. \n",
"\n",
"And I did that 4 days ago, when I nominated Circuit Court of Appeals Judge Ketanji Brown Jackson. One of our nation's top legal minds, who will continue Justice Breyer's legacy of excellence.\n"
]
}
],
"source": [
"print(docs[0].page_content)"
],
"metadata": {
"collapsed": false,
"ExecuteTime": {
"end_time": "2023-10-10T17:52:08.049294Z",
"start_time": "2023-10-10T17:52:08.038511Z"
}
},
"id": "8feeeb4364efb204"
},
{
"cell_type": "markdown",
"source": [
"Once the documents have been loaded and the index has been created, you can now instantiate the vector store directly and run queries against the index"
],
"metadata": {
"collapsed": false
},
"id": "37e4df8c7d7db851"
},
{
"cell_type": "code",
"execution_count": 32,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Tonight. I call on the Senate to: Pass the Freedom to Vote Act. Pass the John Lewis Voting Rights Act. And while you're at it, pass the Disclose Act so Americans can know who is funding our elections. \n",
"\n",
"Tonight, I'd like to honor someone who has dedicated his life to serve this country: Justice Stephen Breyer—an Army veteran, Constitutional scholar, and retiring Justice of the United States Supreme Court. Justice Breyer, thank you for your service. \n",
"\n",
"One of the most serious constitutional responsibilities a President has is nominating someone to serve on the United States Supreme Court. \n",
"\n",
"And I did that 4 days ago, when I nominated Circuit Court of Appeals Judge Ketanji Brown Jackson. One of our nation's top legal minds, who will continue Justice Breyer's legacy of excellence.\n"
]
}
],
"source": [
"vectorstore = AzureCosmosDBVectorSearch.from_connection_string(CONNECTION_STRING, NAMESPACE, openai_embeddings, index_name=INDEX_NAME)\n",
"\n",
"# perform a similarity search between a query and the ingested documents\n",
"query = \"What did the president say about Ketanji Brown Jackson\"\n",
"docs = vectorstore.similarity_search(query)\n",
"\n",
"print(docs[0].page_content)"
],
"metadata": {
"collapsed": false,
"ExecuteTime": {
"end_time": "2023-10-10T17:52:14.994861Z",
"start_time": "2023-10-10T17:52:13.986379Z"
}
},
"id": "3c218ab6f59301f7"
},
{
"cell_type": "code",
"execution_count": 33,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Tonight. I call on the Senate to: Pass the Freedom to Vote Act. Pass the John Lewis Voting Rights Act. And while you're at it, pass the Disclose Act so Americans can know who is funding our elections. \n",
"\n",
"Tonight, I'd like to honor someone who has dedicated his life to serve this country: Justice Stephen Breyer—an Army veteran, Constitutional scholar, and retiring Justice of the United States Supreme Court. Justice Breyer, thank you for your service. \n",
"\n",
"One of the most serious constitutional responsibilities a President has is nominating someone to serve on the United States Supreme Court. \n",
"\n",
"And I did that 4 days ago, when I nominated Circuit Court of Appeals Judge Ketanji Brown Jackson. One of our nation's top legal minds, who will continue Justice Breyer's legacy of excellence.\n"
]
}
],
"source": [
"vectorstore = AzureCosmosDBVectorSearch(collection, openai_embeddings, index_name=INDEX_NAME)\n",
"\n",
"# perform a similarity search between a query and the ingested documents\n",
"query = \"What did the president say about Ketanji Brown Jackson\"\n",
"docs = vectorstore.similarity_search(query)\n",
"\n",
"print(docs[0].page_content)"
],
"metadata": {
"collapsed": false,
"ExecuteTime": {
"end_time": "2023-10-10T17:53:21.145431Z",
"start_time": "2023-10-10T17:53:20.884531Z"
}
},
"id": "fd67e4d92c9ab32f"
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [],
"metadata": {
"collapsed": false
},
"id": "b63c73c7e905001c"
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

@@ -2,14 +2,16 @@
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Azure Cognitive Search\n",
"\n",
"[Azure Cognitive Search](https://learn.microsoft.com/azure/search/search-what-is-azure-search) (formerly known as `Azure Search`) is a cloud search service that gives developers infrastructure, APIs, and tools for building a rich search experience over private, heterogeneous content in web, mobile, and enterprise applications.\n",
"\n",
"Vector search is currently in public preview. It's available through the Azure portal, preview REST API and beta client libraries. [More info](https://learn.microsoft.com/en-us/azure/search/vector-search-overview) Beta client libraries are subject to potential breaking changes, please be sure to use the SDK package version identified below. azure-search-documents==11.4.0b8"
]
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "markdown",

@@ -37,6 +37,12 @@ def _import_alibaba_cloud_open_search_settings() -> Any:
return AlibabaCloudOpenSearchSettings
def _import_azure_cosmos_db() -> Any:
from langchain.vectorstores.azure_cosmos_db import AzureCosmosDBVectorSearch
return AzureCosmosDBVectorSearch
def _import_elastic_knn_search() -> Any:
from langchain.vectorstores.elastic_vector_search import ElasticKnnSearch
@@ -398,6 +404,8 @@ def __getattr__(name: str) -> Any:
return _import_alibaba_cloud_open_search()
elif name == "AlibabaCloudOpenSearchSettings":
return _import_alibaba_cloud_open_search_settings()
elif name == "AzureCosmosDBVectorSearch":
return _import_azure_cosmos_db()
elif name == "ElasticKnnSearch":
return _import_elastic_knn_search()
elif name == "ElasticVectorSearch":
@@ -588,4 +596,5 @@ __all__ = [
"Zilliz",
"Zilliz",
"TencentVectorDB",
"AzureCosmosDBVectorSearch",
]

@@ -0,0 +1,421 @@
from __future__ import annotations
import logging
from enum import Enum
from typing import (
TYPE_CHECKING,
Any,
Dict,
Generator,
Iterable,
List,
Optional,
Tuple,
TypeVar,
Union,
)
import numpy as np
from langchain.docstore.document import Document
from langchain.vectorstores.base import VectorStore
from langchain.vectorstores.utils import maximal_marginal_relevance
if TYPE_CHECKING:
from pymongo.collection import Collection
from langchain.schema.embeddings import Embeddings
# Before Python 3.11 native StrEnum is not available
class CosmosDBSimilarityType(str, Enum):
COS = "COS" # CosineSimilarity
IP = "IP" # inner - product
L2 = "L2" # Euclidean distance
CosmosDBDocumentType = TypeVar("CosmosDBDocumentType", bound=Dict[str, Any])
logger = logging.getLogger(__name__)
DEFAULT_INSERT_BATCH_SIZE = 128
class AzureCosmosDBVectorSearch(VectorStore):
"""`Azure Cosmos DB for MongoDB vCore` vector store.
To use, you should have both:
- the ``pymongo`` python package installed
- a connection string associated with a MongoDB VCore Cluster
Example:
.. code-block:: python
from langchain.vectorstores import AzureCosmosDBVectorSearch
from langchain.embeddings.openai import OpenAIEmbeddings
from pymongo import MongoClient
mongo_client = MongoClient("<YOUR-CONNECTION-STRING>")
collection = mongo_client["<db_name>"]["<collection_name>"]
embeddings = OpenAIEmbeddings()
vectorstore = AzureCosmosDBVectorSearch(collection, embeddings)
"""
def __init__(
self,
collection: Collection[CosmosDBDocumentType],
embedding: Embeddings,
*,
index_name: str = "vectorSearchIndex",
text_key: str = "textContent",
embedding_key: str = "vectorContent",
):
"""Constructor for AzureCosmosDBVectorSearch
Args:
collection: MongoDB collection to add the texts to.
embedding: Text embedding model to use.
index_name: Name of the vector search index.
text_key: MongoDB field that will contain the text
for each document.
embedding_key: MongoDB field that will contain the embedding
for each document.
"""
self._collection = collection
self._embedding = embedding
self._index_name = index_name
self._text_key = text_key
self._embedding_key = embedding_key
@property
def embeddings(self) -> Embeddings:
return self._embedding
def get_index_name(self) -> str:
"""Returns the index name
Returns:
The index name
"""
return self._index_name
@classmethod
def from_connection_string(
cls,
connection_string: str,
namespace: str,
embedding: Embeddings,
**kwargs: Any,
) -> AzureCosmosDBVectorSearch:
"""Creates an Instance of AzureCosmosDBVectorSearch from a Connection String
Args:
connection_string: The MongoDB vCore instance connection string
namespace: The namespace (database.collection)
embedding: The embedding utility
**kwargs: Dynamic keyword arguments
Returns:
an instance of the vector store
"""
try:
from pymongo import MongoClient
except ImportError:
raise ImportError(
"Could not import pymongo, please install it with "
"`pip install pymongo`."
)
client: MongoClient = MongoClient(connection_string)
db_name, collection_name = namespace.split(".")
collection = client[db_name][collection_name]
return cls(collection, embedding, **kwargs)
def index_exists(self) -> bool:
"""Verifies if the specified index name during instance
construction exists on the collection
Returns:
Returns True on success and False if no such index exists
on the collection
"""
cursor = self._collection.list_indexes()
index_name = self._index_name
for res in cursor:
current_index_name = res.pop("name")
if current_index_name == index_name:
return True
return False
def delete_index(self) -> None:
"""Deletes the index specified during instance construction if it exists"""
if self.index_exists():
self._collection.drop_index(self._index_name)
# Raises OperationFailure on an error (e.g. trying to drop
# an index that does not exist)
def create_index(
self,
num_lists: int = 100,
dimensions: int = 1536,
similarity: CosmosDBSimilarityType = CosmosDBSimilarityType.COS,
) -> dict[str, Any]:
"""Creates an index using the index name specified at
instance construction
Setting the numLists parameter correctly is important for achieving
good accuracy and performance.
Since the vector store uses IVF as the indexing strategy,
you should create the index only after you
have loaded a large enough sample of documents to ensure that the
centroids for the respective buckets are
fairly distributed.
We recommend that numLists is set to documentCount/1000 for up
to 1 million documents
and to sqrt(documentCount) for more than 1 million documents.
As the number of items in your database grows, you should
tune numLists to be larger
in order to achieve good latency performance for vector search.
If you're experimenting with a new scenario or creating a
small demo, you can start with numLists
set to 1 to perform a brute-force search across all vectors.
This should provide you with the most
accurate results from the vector search, however be aware that
the search speed and latency will be slow.
After your initial setup, you should go ahead and tune
the numLists parameter using the above guidance.
Args:
num_lists: This integer is the number of clusters that the
inverted file (IVF) index uses to group the vector data.
We recommend that numLists is set to documentCount/1000
for up to 1 million documents and to sqrt(documentCount)
for more than 1 million documents.
Using a numLists value of 1 is akin to performing
brute-force search, which has limited performance
dimensions: Number of dimensions for vector similarity.
The maximum number of supported dimensions is 2000
similarity: Similarity metric to use with the IVF index.
Possible options are:
- CosmosDBSimilarityType.COS (cosine distance),
- CosmosDBSimilarityType.L2 (Euclidean distance), and
- CosmosDBSimilarityType.IP (inner product).
Returns:
An object describing the created index
"""
# prepare the command
create_index_commands = {
"createIndexes": self._collection.name,
"indexes": [
{
"name": self._index_name,
"key": {"vectorContent": "cosmosSearch"},
"cosmosSearchOptions": {
"kind": "vector-ivf",
"numLists": num_lists,
"similarity": similarity,
"dimensions": dimensions,
},
}
],
}
# retrieve the database object
current_database = self._collection.database
# invoke the command from the database object
create_index_responses: dict[str, Any] = current_database.command(
create_index_commands
)
return create_index_responses
def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[Dict[str, Any]]] = None,
**kwargs: Any,
) -> List:
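# Texts and metadatas are buffered and flushed to Cosmos DB in batches of
# batch_size (DEFAULT_INSERT_BATCH_SIZE unless overridden via kwargs) to
# limit round trips; any remaining partial batch is inserted after the loop.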
batch_size = kwargs.get("batch_size", DEFAULT_INSERT_BATCH_SIZE)
_metadatas: Union[List, Generator] = metadatas or ({} for _ in texts)
texts_batch = []
metadatas_batch = []
result_ids = []
for i, (text, metadata) in enumerate(zip(texts, _metadatas)):
texts_batch.append(text)
metadatas_batch.append(metadata)
if (i + 1) % batch_size == 0:
result_ids.extend(self._insert_texts(texts_batch, metadatas_batch))
texts_batch = []
metadatas_batch = []
if texts_batch:
result_ids.extend(self._insert_texts(texts_batch, metadatas_batch))
return result_ids
def _insert_texts(self, texts: List[str], metadatas: List[Dict[str, Any]]) -> List:
"""Used to Load Documents into the collection
Args:
texts: The list of documents strings to load
metadatas: The list of metadata objects associated with each document
Returns:
"""
# If the text is empty, then exit early
if not texts:
return []
# Embed and create the documents
embeddings = self._embedding.embed_documents(texts)
to_insert = [
{self._text_key: t, self._embedding_key: embedding, **m}
for t, m, embedding in zip(texts, metadatas, embeddings)
]
# insert the documents in Cosmos DB
insert_result = self._collection.insert_many(to_insert) # type: ignore
return insert_result.inserted_ids
@classmethod
def from_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
collection: Optional[Collection[CosmosDBDocumentType]] = None,
**kwargs: Any,
) -> AzureCosmosDBVectorSearch:
if collection is None:
raise ValueError("Must provide 'collection' named parameter.")
vectorstore = cls(collection, embedding, **kwargs)
vectorstore.add_texts(texts, metadatas=metadatas)
return vectorstore
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
if ids is None:
raise ValueError("No document ids provided to delete.")
for document_id in ids:
self.delete_document_by_id(document_id)
return True
def delete_document_by_id(self, document_id: Optional[str] = None) -> None:
"""Removes a Specific Document by Id
Args:
document_id: The document identifier
"""
try:
from bson.objectid import ObjectId
except ImportError as e:
raise ImportError(
"Unable to import bson, please install with `pip install bson`."
) from e
if document_id is None:
raise ValueError("No document id provided to delete.")
self._collection.delete_one({"_id": ObjectId(document_id)})
def _similarity_search_with_score(
self, embeddings: List[float], k: int = 4
) -> List[Tuple[Document, float]]:
"""Returns a list of documents with their scores
Args:
embeddings: The query vector
k: the number of documents to return
Returns:
A list of documents closest to the query vector
"""
pipeline: List[dict[str, Any]] = [
{
"$search": {
"cosmosSearch": {
"vector": embeddings,
"path": self._embedding_key,
"k": k,
},
"returnStoredSource": True,
}
},
{
"$project": {
"similarityScore": {"$meta": "searchScore"},
"document": "$$ROOT",
}
},
]
cursor = self._collection.aggregate(pipeline)
docs = []
for res in cursor:
score = res.pop("similarityScore")
document_object_field = res.pop("document")
text = document_object_field.pop(self._text_key)
docs.append(
(Document(page_content=text, metadata=document_object_field), score)
)
return docs
def similarity_search_with_score(
self, query: str, k: int = 4
) -> List[Tuple[Document, float]]:
embeddings = self._embedding.embed_query(query)
docs = self._similarity_search_with_score(embeddings=embeddings, k=k)
return docs
def similarity_search(
self, query: str, k: int = 4, **kwargs: Any
) -> List[Document]:
docs_and_scores = self.similarity_search_with_score(query, k=k)
return [doc for doc, _ in docs_and_scores]
def max_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
) -> List[Document]:
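# fetch_k candidates are pulled back by vector similarity; MMR then selects
# k of them, using lambda_mult to trade off query relevance against diversity.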
# Retrieves the docs with similarity scores
# sorted by similarity scores in DESC order
docs = self._similarity_search_with_score(embedding, k=fetch_k)
# Re-ranks the docs using MMR
mmr_doc_indexes = maximal_marginal_relevance(
np.array(embedding),
[doc.metadata[self._embedding_key] for doc, _ in docs],
k=k,
lambda_mult=lambda_mult,
)
mmr_docs = [docs[i][0] for i in mmr_doc_indexes]
return mmr_docs
def max_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
) -> List[Document]:
# compute the embeddings vector from the query string
embeddings = self._embedding.embed_query(query)
docs = self.max_marginal_relevance_search_by_vector(
embeddings, k=k, fetch_k=fetch_k, lambda_mult=lambda_mult
)
return docs

@@ -0,0 +1,435 @@
"""Test AzureCosmosDBVectorSearch functionality."""
import logging
import os
from time import sleep
from typing import Any, Generator, Optional, Union
import pytest
from langchain.docstore.document import Document
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores.azure_cosmos_db import (
AzureCosmosDBVectorSearch,
CosmosDBSimilarityType,
)
logging.basicConfig(level=logging.DEBUG)
model_deployment = os.getenv(
"OPENAI_EMBEDDINGS_DEPLOYMENT", "smart-agent-embedding-ada"
)
model_name = os.getenv("OPENAI_EMBEDDINGS_MODEL_NAME", "text-embedding-ada-002")
INDEX_NAME = "langchain-test-index"
NAMESPACE = "langchain_test_db.langchain_test_collection"
CONNECTION_STRING: str = os.environ.get("MONGODB_VCORE_URI", "")
DB_NAME, COLLECTION_NAME = NAMESPACE.split(".")
num_lists = 3
dimensions = 1536
similarity_algorithm = CosmosDBSimilarityType.COS
def prepare_collection() -> Any:
from pymongo import MongoClient
test_client: MongoClient = MongoClient(CONNECTION_STRING)
return test_client[DB_NAME][COLLECTION_NAME]
@pytest.fixture()
def collection() -> Any:
return prepare_collection()
@pytest.fixture()
def azure_openai_embeddings() -> Any:
openai_embeddings: OpenAIEmbeddings = OpenAIEmbeddings(
deployment=model_deployment, model=model_name, chunk_size=1
)
return openai_embeddings
"""
This is how to run the integration tests:
cd libs/langchain
pytest tests/integration_tests/vectorstores/test_azure_cosmos_db.py
"""
class TestAzureCosmosDBVectorSearch:
@classmethod
def setup_class(cls) -> None:
if not os.getenv("OPENAI_API_KEY"):
raise ValueError("OPENAI_API_KEY environment variable is not set")
# ensure the test collection is empty
collection = prepare_collection()
assert collection.count_documents({}) == 0 # type: ignore[index] # noqa: E501
@classmethod
def teardown_class(cls) -> None:
collection = prepare_collection()
# delete all the documents in the collection
collection.delete_many({}) # type: ignore[index]
@pytest.fixture(autouse=True)
def setup(self) -> None:
collection = prepare_collection()
# delete all the documents in the collection
collection.delete_many({}) # type: ignore[index]
@pytest.fixture(scope="class", autouse=True)
def cosmos_db_url(self) -> Union[str, Generator[str, None, None]]:
"""Return the elasticsearch url."""
return "805.555.1212"
def test_from_documents_cosine_distance(
self, azure_openai_embeddings: OpenAIEmbeddings, collection: Any
) -> None:
"""Test end to end construction and search."""
documents = [
Document(page_content="Dogs are tough.", metadata={"a": 1}),
Document(page_content="Cats have fluff.", metadata={"b": 1}),
Document(page_content="What is a sandwich?", metadata={"c": 1}),
Document(page_content="That fence is purple.", metadata={"d": 1, "e": 2}),
]
vectorstore = AzureCosmosDBVectorSearch.from_documents(
documents,
azure_openai_embeddings,
collection=collection,
index_name=INDEX_NAME,
)
sleep(1) # waits for Cosmos DB to save contents to the collection
# Create the IVF index that will be leveraged later for vector search
vectorstore.create_index(num_lists, dimensions, similarity_algorithm)
sleep(2) # waits for the index to be set up
output = vectorstore.similarity_search("Sandwich", k=1)
assert output
assert output[0].page_content == "What is a sandwich?"
assert output[0].metadata["c"] == 1
vectorstore.delete_index()
def test_from_documents_inner_product(
self, azure_openai_embeddings: OpenAIEmbeddings, collection: Any
) -> None:
"""Test end to end construction and search."""
documents = [
Document(page_content="Dogs are tough.", metadata={"a": 1}),
Document(page_content="Cats have fluff.", metadata={"b": 1}),
Document(page_content="What is a sandwich?", metadata={"c": 1}),
Document(page_content="That fence is purple.", metadata={"d": 1, "e": 2}),
]
vectorstore = AzureCosmosDBVectorSearch.from_documents(
documents,
azure_openai_embeddings,
collection=collection,
index_name=INDEX_NAME,
)
sleep(1) # waits for Cosmos DB to save contents to the collection
# Create the IVF index that will be leveraged later for vector search
vectorstore.create_index(num_lists, dimensions, CosmosDBSimilarityType.IP)
sleep(2) # waits for the index to be set up
output = vectorstore.similarity_search("Sandwich", k=1)
assert output
assert output[0].page_content == "What is a sandwich?"
assert output[0].metadata["c"] == 1
vectorstore.delete_index()
def test_from_texts_cosine_distance(
self, azure_openai_embeddings: OpenAIEmbeddings, collection: Any
) -> None:
texts = [
"Dogs are tough.",
"Cats have fluff.",
"What is a sandwich?",
"That fence is purple.",
]
vectorstore = AzureCosmosDBVectorSearch.from_texts(
texts,
azure_openai_embeddings,
collection=collection,
index_name=INDEX_NAME,
)
# Create the IVF index that will be leveraged later for vector search
vectorstore.create_index(num_lists, dimensions, similarity_algorithm)
sleep(2) # waits for the index to be set up
output = vectorstore.similarity_search("Sandwich", k=1)
assert output[0].page_content == "What is a sandwich?"
vectorstore.delete_index()
def test_from_texts_with_metadatas_cosine_distance(
self, azure_openai_embeddings: OpenAIEmbeddings, collection: Any
) -> None:
texts = [
"Dogs are tough.",
"Cats have fluff.",
"What is a sandwich?",
"The fence is purple.",
]
metadatas = [{"a": 1}, {"b": 1}, {"c": 1}, {"d": 1, "e": 2}]
vectorstore = AzureCosmosDBVectorSearch.from_texts(
texts,
azure_openai_embeddings,
metadatas=metadatas,
collection=collection,
index_name=INDEX_NAME,
)
# Create the IVF index that will be leveraged later for vector search
vectorstore.create_index(num_lists, dimensions, similarity_algorithm)
sleep(2) # waits for the index to be set up
output = vectorstore.similarity_search("Sandwich", k=1)
assert output
assert output[0].page_content == "What is a sandwich?"
assert output[0].metadata["c"] == 1
vectorstore.delete_index()
def test_from_texts_with_metadatas_delete_one(
self, azure_openai_embeddings: OpenAIEmbeddings, collection: Any
) -> None:
texts = [
"Dogs are tough.",
"Cats have fluff.",
"What is a sandwich?",
"The fence is purple.",
]
metadatas = [{"a": 1}, {"b": 1}, {"c": 1}, {"d": 1, "e": 2}]
vectorstore = AzureCosmosDBVectorSearch.from_texts(
texts,
azure_openai_embeddings,
metadatas=metadatas,
collection=collection,
index_name=INDEX_NAME,
)
# Create the IVF index that will be leveraged later for vector search
vectorstore.create_index(num_lists, dimensions, similarity_algorithm)
sleep(2) # waits for the index to be set up
output = vectorstore.similarity_search("Sandwich", k=1)
assert output
assert output[0].page_content == "What is a sandwich?"
assert output[0].metadata["c"] == 1
first_document_id_object = output[0].metadata["_id"]
first_document_id = str(first_document_id_object)
vectorstore.delete_document_by_id(first_document_id)
sleep(2) # waits for the index to be updated
output2 = vectorstore.similarity_search("Sandwich", k=1)
assert output2
assert output2[0].page_content != "What is a sandwich?"
vectorstore.delete_index()
def test_from_texts_with_metadatas_delete_multiple(
self, azure_openai_embeddings: OpenAIEmbeddings, collection: Any
) -> None:
texts = [
"Dogs are tough.",
"Cats have fluff.",
"What is a sandwich?",
"The fence is purple.",
]
metadatas = [{"a": 1}, {"b": 1}, {"c": 1}, {"d": 1, "e": 2}]
vectorstore = AzureCosmosDBVectorSearch.from_texts(
texts,
azure_openai_embeddings,
metadatas=metadatas,
collection=collection,
index_name=INDEX_NAME,
)
# Create the IVF index that will be leveraged later for vector search
vectorstore.create_index(num_lists, dimensions, similarity_algorithm)
sleep(2) # waits for the index to be set up
output = vectorstore.similarity_search("Sandwich", k=5)
first_document_id_object = output[0].metadata["_id"]
first_document_id = str(first_document_id_object)
output[1].metadata["_id"]
second_document_id = output[1].metadata["_id"]
output[2].metadata["_id"]
third_document_id = output[2].metadata["_id"]
document_ids = [first_document_id, second_document_id, third_document_id]
vectorstore.delete(document_ids)
sleep(2) # waits for the index to be updated
output_2 = vectorstore.similarity_search("Sandwich", k=5)
assert output
assert output_2
assert len(output) == 4 # we should see all the four documents
assert (
len(output_2) == 1
) # we should see only one document left after three have been deleted
vectorstore.delete_index()
def test_from_texts_with_metadatas_inner_product(
self, azure_openai_embeddings: OpenAIEmbeddings, collection: Any
) -> None:
texts = [
"Dogs are tough.",
"Cats have fluff.",
"What is a sandwich?",
"The fence is purple.",
]
metadatas = [{"a": 1}, {"b": 1}, {"c": 1}, {"d": 1, "e": 2}]
vectorstore = AzureCosmosDBVectorSearch.from_texts(
texts,
azure_openai_embeddings,
metadatas=metadatas,
collection=collection,
index_name=INDEX_NAME,
)
# Create the IVF index that will be leveraged later for vector search
vectorstore.create_index(num_lists, dimensions, CosmosDBSimilarityType.IP)
sleep(2) # waits for the index to be set up
output = vectorstore.similarity_search("Sandwich", k=1)
assert output
assert output[0].page_content == "What is a sandwich?"
assert output[0].metadata["c"] == 1
vectorstore.delete_index()
def test_from_texts_with_metadatas_euclidean_distance(
self, azure_openai_embeddings: OpenAIEmbeddings, collection: Any
) -> None:
texts = [
"Dogs are tough.",
"Cats have fluff.",
"What is a sandwich?",
"The fence is purple.",
]
metadatas = [{"a": 1}, {"b": 1}, {"c": 1}, {"d": 1, "e": 2}]
vectorstore = AzureCosmosDBVectorSearch.from_texts(
texts,
azure_openai_embeddings,
metadatas=metadatas,
collection=collection,
index_name=INDEX_NAME,
)
# Create the IVF index that will be leveraged later for vector search
vectorstore.create_index(num_lists, dimensions, CosmosDBSimilarityType.L2)
sleep(2) # waits for the index to be set up
output = vectorstore.similarity_search("Sandwich", k=1)
assert output
assert output[0].page_content == "What is a sandwich?"
assert output[0].metadata["c"] == 1
vectorstore.delete_index()
def test_max_marginal_relevance_cosine_distance(
self, azure_openai_embeddings: OpenAIEmbeddings, collection: Any
) -> None:
texts = ["foo", "foo", "fou", "foy"]
vectorstore = AzureCosmosDBVectorSearch.from_texts(
texts,
azure_openai_embeddings,
collection=collection,
index_name=INDEX_NAME,
)
# Create the IVF index that will be leveraged later for vector search
vectorstore.create_index(num_lists, dimensions, CosmosDBSimilarityType.COS)
sleep(2) # waits for the index to be set up
query = "foo"
output = vectorstore.max_marginal_relevance_search(query, k=10, lambda_mult=0.1)
assert len(output) == len(texts)
assert output[0].page_content == "foo"
assert output[1].page_content != "foo"
vectorstore.delete_index()
def test_max_marginal_relevance_inner_product(
self, azure_openai_embeddings: OpenAIEmbeddings, collection: Any
) -> None:
texts = ["foo", "foo", "fou", "foy"]
vectorstore = AzureCosmosDBVectorSearch.from_texts(
texts,
azure_openai_embeddings,
collection=collection,
index_name=INDEX_NAME,
)
# Create the IVF index that will be leveraged later for vector search
vectorstore.create_index(num_lists, dimensions, CosmosDBSimilarityType.IP)
sleep(2) # waits for the index to be set up
query = "foo"
output = vectorstore.max_marginal_relevance_search(query, k=10, lambda_mult=0.1)
assert len(output) == len(texts)
assert output[0].page_content == "foo"
assert output[1].page_content != "foo"
vectorstore.delete_index()
def invoke_delete_with_no_args(
self, azure_openai_embeddings: OpenAIEmbeddings, collection: Any
) -> Optional[bool]:
vectorstore: AzureCosmosDBVectorSearch = (
AzureCosmosDBVectorSearch.from_connection_string(
CONNECTION_STRING,
NAMESPACE,
azure_openai_embeddings,
index_name=INDEX_NAME,
)
)
return vectorstore.delete()
def invoke_delete_by_id_with_no_args(
self, azure_openai_embeddings: OpenAIEmbeddings, collection: Any
) -> None:
vectorstore: AzureCosmosDBVectorSearch = (
AzureCosmosDBVectorSearch.from_connection_string(
CONNECTION_STRING,
NAMESPACE,
azure_openai_embeddings,
index_name=INDEX_NAME,
)
)
vectorstore.delete_document_by_id()
def test_invalid_arguments_to_delete(
self, azure_openai_embeddings: OpenAIEmbeddings, collection: Any
) -> None:
with pytest.raises(ValueError) as exception_info:
self.invoke_delete_with_no_args(azure_openai_embeddings, collection)
assert str(exception_info.value) == "No document ids provided to delete."
def test_no_arguments_to_delete_by_id(
self, azure_openai_embeddings: OpenAIEmbeddings, collection: Any
) -> None:
with pytest.raises(Exception) as exception_info:
self.invoke_delete_by_id_with_no_args(azure_openai_embeddings, collection)
assert str(exception_info.value) == "No document id provided to delete."