community[patch]: Make some functions work with Milvus (#10695)

**Description**
Make some functions work with Milvus:
1. get_pks: Get primary keys by an expression over fields in the metadata
2. delete: Delete one or more entities by ids
3. upsert: Update/Insert one or more entities

**Issue**
None
**Dependencies**
None
**Tag maintainer:**
@hwchase17 
**Twitter handle:**
None

---------

Co-authored-by: HoaNQ9 <hoanq.1811@gmail.com>
Co-authored-by: Erick Friis <erick@langchain.dev>
pull/17351/head
Quang Hoa 5 months ago committed by GitHub
parent c9999557bf
commit 54c1fb3f25
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -204,23 +204,29 @@
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"### Per-User Retrieval\n",
"\n",
"When building a retrieval app, you often have to build it with multiple users in mind. This means that you may be storing data not just for one user, but for many different users, and they should not be able to see eachothers data.\n",
"\n",
"Milvus recommends using [partition_key](https://milvus.io/docs/multi_tenancy.md#Partition-key-based-multi-tenancy) to implement multi-tenancy, here is an example."
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%% md\n"
}
}
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"from langchain_core.documents import Document\n",
@ -236,16 +242,16 @@
" drop_old=True,\n",
" partition_key_field=\"namespace\", # Use the \"namespace\" field as the partition key\n",
")"
],
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
"name": "#%% md\n"
}
}
},
{
"cell_type": "markdown",
},
"source": [
"To conduct a search using the partition key, you should include either of the following in the boolean expression of the search request:\n",
"\n",
@ -256,21 +262,23 @@
"Do replace `<partition_key>` with the name of the field that is designated as the partition key.\n",
"\n",
"Milvus changes to a partition based on the specified partition key, filters entities according to the partition key, and searches among the filtered entities.\n"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%% md\n"
}
}
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [
{
"data": {
"text/plain": "[Document(page_content='i worked at facebook', metadata={'namespace': 'ankush'})]"
"text/plain": [
"[Document(page_content='i worked at facebook', metadata={'namespace': 'ankush'})]"
]
},
"execution_count": 3,
"metadata": {},
@ -282,21 +290,23 @@
"vectorstore.as_retriever(\n",
" search_kwargs={\"expr\": 'namespace == \"ankush\"'}\n",
").get_relevant_documents(\"where did i work?\")"
],
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
},
{
"cell_type": "code",
"execution_count": 4,
},
"outputs": [
{
"data": {
"text/plain": "[Document(page_content='i worked at kensho', metadata={'namespace': 'harrison'})]"
"text/plain": [
"[Document(page_content='i worked at kensho', metadata={'namespace': 'harrison'})]"
]
},
"execution_count": 4,
"metadata": {},
@ -308,13 +318,52 @@
"vectorstore.as_retriever(\n",
" search_kwargs={\"expr\": 'namespace == \"harrison\"'}\n",
").get_relevant_documents(\"where did i work?\")"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
]
},
{
"cell_type": "markdown",
"id": "89756e9e",
"metadata": {},
"source": [
"**To delete or upsert (update/insert) one or more entities:**"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "21c4edcf",
"metadata": {},
"outputs": [],
"source": [
"from langchain.docstore.document import Document\n",
"\n",
"# Insert data sample\n",
"docs = [\n",
" Document(page_content=\"foo\", metadata={\"id\": 1}),\n",
" Document(page_content=\"bar\", metadata={\"id\": 2}),\n",
" Document(page_content=\"baz\", metadata={\"id\": 3}),\n",
"]\n",
"vector_db = Milvus.from_documents(\n",
" docs,\n",
" embeddings,\n",
" connection_args={\"host\": \"127.0.0.1\", \"port\": \"19530\"},\n",
")\n",
"\n",
"# Search pks (primary keys) using expression\n",
"expr = \"id in [1,2]\"\n",
"pks = vector_db.get_pks(expr)\n",
"\n",
"# Delete entities by pks\n",
"result = vector_db.delete(pks)\n",
"\n",
"# Upsert (Update/Insert)\n",
"new_docs = [\n",
" Document(page_content=\"new_foo\", metadata={\"id\": 1}),\n",
" Document(page_content=\"new_bar\", metadata={\"id\": 2}),\n",
" Document(page_content=\"upserted_bak\", metadata={\"id\": 3}),\n",
"]\n",
"upserted_pks = vector_db.upsert(pks, new_docs)"
]
}
],
"metadata": {
@ -338,4 +387,4 @@
},
"nbformat": 4,
"nbformat_minor": 5
}
}

@ -989,3 +989,64 @@ class Milvus(VectorStore):
page_content=data.pop(self._text_field),
metadata=data.pop(self._metadata_field) if self._metadata_field else data,
)
def get_pks(self, expr: str, **kwargs: Any) -> List[int] | None:
    """Look up the primary keys of the entities matching an expression.

    Args:
        expr: Filter expression passed to the collection query,
            e.g. "id in [1, 2]" or "title LIKE 'Abc%'".
        **kwargs: Accepted but not used by this method.

    Returns:
        The value of the primary-key field for every row the query
        returns, or None when ``self.col`` is None.

    Raises:
        MilvusException: Re-raised after being logged when the query fails.
    """
    from pymilvus import MilvusException

    collection = self.col
    if collection is None:
        logger.debug("No existing collection to get pk.")
        return None

    pk_field = self._primary_field
    try:
        rows = collection.query(expr=expr, output_fields=[pk_field])
    except MilvusException as exc:
        logger.error("Failed to get ids: %s error: %s", self.collection_name, exc)
        raise exc

    # Only the primary-key column is requested, so pull it out of each row.
    return [row.get(pk_field) for row in rows]
def upsert(
    self,
    ids: Optional[List[str]] = None,
    documents: List[Document] | None = None,
    **kwargs: Any,
) -> List[str] | None:
    """Update/insert documents in the vectorstore.

    Implemented as delete-then-insert: when ``ids`` is non-empty,
    ``self.delete(ids=ids)`` is attempted first, then ``documents`` are
    added with ``self.add_documents``.

    Args:
        ids: IDs to update. Use ``get_pks`` to obtain them from an
            expression. When None or empty, nothing is deleted.
        documents (List[Document]): Documents to add to the vectorstore.
        **kwargs: Accepted but not used by this method.

    Returns:
        List[str]: IDs of the added texts, as returned by
        ``add_documents``; None when ``documents`` is None or empty.

    Raises:
        MilvusException: Re-raised after being logged when adding the
            documents fails.
    """
    from pymilvus import MilvusException

    if not documents:
        logger.debug("No documents to upsert.")
        return None

    if ids:
        try:
            self.delete(ids=ids)
        except MilvusException as exc:
            # Deletion stays best-effort (the insert below still runs), but
            # the failure is logged instead of being silently discarded so
            # that leftover, un-replaced entities can be diagnosed.
            logger.warning(
                "Failed to delete entities before upsert: %s error: %s",
                self.collection_name,
                exc,
            )

    try:
        return self.add_documents(documents=documents)
    except MilvusException as exc:
        logger.error(
            "Failed to upsert entities: %s error: %s", self.collection_name, exc
        )
        # Bare raise keeps the original traceback intact.
        raise

@ -1,5 +1,5 @@
"""Test Milvus functionality."""
from typing import List, Optional
from typing import Any, List, Optional
from langchain_core.documents import Document
@ -25,6 +25,10 @@ def _milvus_from_texts(
)
def _get_pks(expr: str, docsearch: Milvus) -> List[Any]:
return docsearch.get_pks(expr)
def test_milvus() -> None:
"""Test end to end construction and search."""
docsearch = _milvus_from_texts()
@ -109,6 +113,42 @@ def test_milvus_no_drop() -> None:
assert len(output) == 6
def test_milvus_get_pks() -> None:
    """Build a store with metadata ids 0..2 and fetch pks for ids 1 and 2."""
    texts = ["foo", "bar", "baz"]
    # One metadata dict per text, numbered from zero.
    metadatas = [{"id": position} for position, _ in enumerate(texts)]
    store = _milvus_from_texts(metadatas=metadatas)

    matched_pks = _get_pks("id in [1,2]", store)

    assert len(matched_pks) == 2
def test_milvus_delete_entities() -> None:
    """Build a store with metadata ids 0..2 and delete the entities for ids 1 and 2."""
    texts = ["foo", "bar", "baz"]
    # One metadata dict per text, numbered from zero.
    metadatas = [{"id": position} for position, _ in enumerate(texts)]
    store = _milvus_from_texts(metadatas=metadatas)

    doomed_pks = _get_pks("id in [1,2]", store)
    outcome = store.delete(doomed_pks)

    assert outcome is True
def test_milvus_upsert_entities() -> None:
    """Build a store with metadata ids 0..2 and upsert two documents over ids 1 and 2."""
    texts = ["foo", "bar", "baz"]
    # One metadata dict per text, numbered from zero.
    metadatas = [{"id": position} for position, _ in enumerate(texts)]
    store = _milvus_from_texts(metadatas=metadatas)

    existing_pks = _get_pks("id in [1,2]", store)
    replacements = [
        Document(page_content=content, metadata={"id": doc_id})
        for content, doc_id in (("test_1", 1), ("test_2", 3))
    ]
    new_ids = store.upsert(existing_pks, replacements)

    assert len(new_ids) == 2
# if __name__ == "__main__":
# test_milvus()
# test_milvus_with_metadata()

Loading…
Cancel
Save