core[minor],langchain[patch]: Move base indexing interface and logic to core (#20667)

This PR moves the interface and the logic to core.

The following changes to namespaces:

`indexes` -> `indexing`
`indexes._api` -> `indexing.api`

Testing code is intentionally duplicated for now since it's testing
implementations of the record manager (in-memory vs. SQL).

Common logic will need to be pulled out into the test client.

A follow up PR will move the SQL based implementation outside of
Eugene Yurtsev 1 month ago committed by GitHub
parent 3bcfbcc871
commit d8aa72f51d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,15 @@
"""Code to help indexing data into a vectorstore.
This package contains helper logic to help deal with indexing data into
a vectorstore while avoiding duplicated content and over-writing content
if it's unchanged.
from langchain_core.indexing.api import IndexingResult, aindex, index
from langchain_core.indexing.base import RecordManager
__all__ = [

@ -0,0 +1,606 @@
"""Module contains logic for indexing documents into vector stores."""
from __future__ import annotations
import hashlib
import json
import uuid
from itertools import islice
from typing import (
from langchain_core.document_loaders.base import BaseLoader
from langchain_core.documents import Document
from langchain_core.indexing.base import RecordManager
from langchain_core.pydantic_v1 import root_validator
from langchain_core.vectorstores import VectorStore
# Magic UUID to use as a namespace for hashing.
# Used to try and generate a unique UUID for each document
# from hashing the document content and metadata.
NAMESPACE_UUID = uuid.UUID(int=1984)
T = TypeVar("T")
def _hash_string_to_uuid(input_string: str) -> uuid.UUID:
"""Hashes a string and returns the corresponding UUID."""
hash_value = hashlib.sha1(input_string.encode("utf-8")).hexdigest()
return uuid.uuid5(NAMESPACE_UUID, hash_value)
def _hash_nested_dict_to_uuid(data: dict[Any, Any]) -> uuid.UUID:
"""Hashes a nested dictionary and returns the corresponding UUID."""
serialized_data = json.dumps(data, sort_keys=True)
hash_value = hashlib.sha1(serialized_data.encode("utf-8")).hexdigest()
return uuid.uuid5(NAMESPACE_UUID, hash_value)
class _HashedDocument(Document):
"""A hashed document with a unique ID."""
uid: str
hash_: str
"""The hash of the document including content and metadata."""
content_hash: str
"""The hash of the document content."""
metadata_hash: str
"""The hash of the document metadata."""
def is_lc_serializable(cls) -> bool:
return False
def calculate_hashes(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Root validator to calculate content and metadata hash."""
content = values.get("page_content", "")
metadata = values.get("metadata", {})
forbidden_keys = ("hash_", "content_hash", "metadata_hash")
for key in forbidden_keys:
if key in metadata:
raise ValueError(
f"Metadata cannot contain key {key} as it "
f"is reserved for internal use."
content_hash = str(_hash_string_to_uuid(content))
metadata_hash = str(_hash_nested_dict_to_uuid(metadata))
except Exception as e:
raise ValueError(
f"Failed to hash metadata: {e}. "
f"Please use a dict that can be serialized using json."
values["content_hash"] = content_hash
values["metadata_hash"] = metadata_hash
values["hash_"] = str(_hash_string_to_uuid(content_hash + metadata_hash))
_uid = values.get("uid", None)
if _uid is None:
values["uid"] = values["hash_"]
return values
def to_document(self) -> Document:
"""Return a Document object."""
return Document(
def from_document(
cls, document: Document, *, uid: Optional[str] = None
) -> _HashedDocument:
"""Create a HashedDocument from a Document."""
return cls( # type: ignore[call-arg]
uid=uid, # type: ignore[arg-type]
def _batch(size: int, iterable: Iterable[T]) -> Iterator[List[T]]:
"""Utility batching function."""
it = iter(iterable)
while True:
chunk = list(islice(it, size))
if not chunk:
yield chunk
async def _abatch(size: int, iterable: AsyncIterable[T]) -> AsyncIterator[List[T]]:
"""Utility batching function."""
batch: List[T] = []
async for element in iterable:
if len(batch) < size:
if len(batch) >= size:
yield batch
batch = []
if batch:
yield batch
def _get_source_id_assigner(
source_id_key: Union[str, Callable[[Document], str], None],
) -> Callable[[Document], Union[str, None]]:
"""Get the source id from the document."""
if source_id_key is None:
return lambda doc: None
elif isinstance(source_id_key, str):
return lambda doc: doc.metadata[source_id_key]
elif callable(source_id_key):
return source_id_key
raise ValueError(
f"source_id_key should be either None, a string or a callable. "
f"Got {source_id_key} of type {type(source_id_key)}."
def _deduplicate_in_order(
hashed_documents: Iterable[_HashedDocument],
) -> Iterator[_HashedDocument]:
"""Deduplicate a list of hashed documents while preserving order."""
seen: Set[str] = set()
for hashed_doc in hashed_documents:
if hashed_doc.hash_ not in seen:
yield hashed_doc
class IndexingResult(TypedDict):
"""Return a detailed a breakdown of the result of the indexing operation."""
num_added: int
"""Number of added documents."""
num_updated: int
"""Number of updated documents because they were not up to date."""
num_deleted: int
"""Number of deleted documents."""
num_skipped: int
"""Number of skipped documents because they were already up to date."""
def index(
docs_source: Union[BaseLoader, Iterable[Document]],
record_manager: RecordManager,
vector_store: VectorStore,
batch_size: int = 100,
cleanup: Literal["incremental", "full", None] = None,
source_id_key: Union[str, Callable[[Document], str], None] = None,
cleanup_batch_size: int = 1_000,
force_update: bool = False,
) -> IndexingResult:
"""Index data from the loader into the vector store.
Indexing functionality uses a manager to keep track of which documents
are in the vector store.
This allows us to keep track of which documents were updated, and which
documents were deleted, which documents should be skipped.
For the time being, documents are indexed using their hashes, and users
are not able to specify the uid of the document.
if auto_cleanup is set to True, the loader should be returning
the entire dataset, and not just a subset of the dataset.
Otherwise, the auto_cleanup will remove documents that it is not
supposed to.
docs_source: Data loader or iterable of documents to index.
record_manager: Timestamped set to keep track of which documents were
vector_store: Vector store to index the documents into.
batch_size: Batch size to use when indexing.
cleanup: How to handle clean up of documents.
- Incremental: Cleans up all documents that haven't been updated AND
that are associated with source ids that were seen
during indexing.
Clean up is done continuously during indexing helping
to minimize the probability of users seeing duplicated
- Full: Delete all documents that have not been returned by the loader
during this run of indexing.
Clean up runs after all documents have been indexed.
This means that users may see duplicated content during indexing.
- None: Do not delete any documents.
source_id_key: Optional key that helps identify the original source
of the document.
cleanup_batch_size: Batch size to use when cleaning up documents.
force_update: Force update documents even if they are present in the
record manager. Useful if you are re-indexing with updated embeddings.
Indexing result which contains information about how many documents
were added, updated, deleted, or skipped.
if cleanup not in {"incremental", "full", None}:
raise ValueError(
f"cleanup should be one of 'incremental', 'full' or None. "
f"Got {cleanup}."
if cleanup == "incremental" and source_id_key is None:
raise ValueError("Source id key is required when cleanup mode is incremental.")
# Check that the Vectorstore has required methods implemented
methods = ["delete", "add_documents"]
for method in methods:
if not hasattr(vector_store, method):
raise ValueError(
f"Vectorstore {vector_store} does not have required method {method}"
if type(vector_store).delete == VectorStore.delete:
# Checking if the vectorstore has overridden the default delete method
# implementation which just raises a NotImplementedError
raise ValueError("Vectorstore has not implemented the delete method")
if isinstance(docs_source, BaseLoader):
doc_iterator = docs_source.lazy_load()
except NotImplementedError:
doc_iterator = iter(docs_source.load())
doc_iterator = iter(docs_source)
source_id_assigner = _get_source_id_assigner(source_id_key)
# Mark when the update started.
index_start_dt = record_manager.get_time()
num_added = 0
num_skipped = 0
num_updated = 0
num_deleted = 0
for doc_batch in _batch(batch_size, doc_iterator):
hashed_docs = list(
[_HashedDocument.from_document(doc) for doc in doc_batch]
source_ids: Sequence[Optional[str]] = [
source_id_assigner(doc) for doc in hashed_docs
if cleanup == "incremental":
# If the cleanup mode is incremental, source ids are required.
for source_id, hashed_doc in zip(source_ids, hashed_docs):
if source_id is None:
raise ValueError(
"Source ids are required when cleanup mode is incremental. "
f"Document that starts with "
f"content: {hashed_doc.page_content[:100]} was not assigned "
f"as source id."
# source ids cannot be None after for loop above.
source_ids = cast(Sequence[str], source_ids) # type: ignore[assignment]
exists_batch = record_manager.exists([doc.uid for doc in hashed_docs])
# Filter out documents that already exist in the record store.
uids = []
docs_to_index = []
uids_to_refresh = []
seen_docs: Set[str] = set()
for hashed_doc, doc_exists in zip(hashed_docs, exists_batch):
if doc_exists:
if force_update:
# Update refresh timestamp
if uids_to_refresh:
record_manager.update(uids_to_refresh, time_at_least=index_start_dt)
num_skipped += len(uids_to_refresh)
# Be pessimistic and assume that all vector store write will fail.
# First write to vector store
if docs_to_index:
vector_store.add_documents(docs_to_index, ids=uids, batch_size=batch_size)
num_added += len(docs_to_index) - len(seen_docs)
num_updated += len(seen_docs)
# And only then update the record store.
# Update ALL records, even if they already exist since we want to refresh
# their timestamp.
[doc.uid for doc in hashed_docs],
# If source IDs are provided, we can do the deletion incrementally!
if cleanup == "incremental":
# Get the uids of the documents that were not returned by the loader.
# mypy isn't good enough to determine that source ids cannot be None
# here due to a check that's happening above, so we check again.
for source_id in source_ids:
if source_id is None:
raise AssertionError("Source ids cannot be None here.")
_source_ids = cast(Sequence[str], source_ids)
uids_to_delete = record_manager.list_keys(
group_ids=_source_ids, before=index_start_dt
if uids_to_delete:
# Then delete from vector store.
# First delete from record store.
num_deleted += len(uids_to_delete)
if cleanup == "full":
while uids_to_delete := record_manager.list_keys(
before=index_start_dt, limit=cleanup_batch_size
# First delete from record store.
# Then delete from record manager.
num_deleted += len(uids_to_delete)
return {
"num_added": num_added,
"num_updated": num_updated,
"num_skipped": num_skipped,
"num_deleted": num_deleted,
# Define an asynchronous generator function
async def _to_async_iterator(iterator: Iterable[T]) -> AsyncIterator[T]:
"""Convert an iterable to an async iterator."""
for item in iterator:
yield item
async def aindex(
docs_source: Union[BaseLoader, Iterable[Document], AsyncIterator[Document]],
record_manager: RecordManager,
vector_store: VectorStore,
batch_size: int = 100,
cleanup: Literal["incremental", "full", None] = None,
source_id_key: Union[str, Callable[[Document], str], None] = None,
cleanup_batch_size: int = 1_000,
force_update: bool = False,
) -> IndexingResult:
"""Index data from the loader into the vector store.
Indexing functionality uses a manager to keep track of which documents
are in the vector store.
This allows us to keep track of which documents were updated, and which
documents were deleted, which documents should be skipped.
For the time being, documents are indexed using their hashes, and users
are not able to specify the uid of the document.
if auto_cleanup is set to True, the loader should be returning
the entire dataset, and not just a subset of the dataset.
Otherwise, the auto_cleanup will remove documents that it is not
supposed to.
docs_source: Data loader or iterable of documents to index.
record_manager: Timestamped set to keep track of which documents were
vector_store: Vector store to index the documents into.
batch_size: Batch size to use when indexing.
cleanup: How to handle clean up of documents.
- Incremental: Cleans up all documents that haven't been updated AND
that are associated with source ids that were seen
during indexing.
Clean up is done continuously during indexing helping
to minimize the probability of users seeing duplicated
- Full: Delete all documents that haven to been returned by the loader.
Clean up runs after all documents have been indexed.
This means that users may see duplicated content during indexing.
- None: Do not delete any documents.
source_id_key: Optional key that helps identify the original source
of the document.
cleanup_batch_size: Batch size to use when cleaning up documents.
force_update: Force update documents even if they are present in the
record manager. Useful if you are re-indexing with updated embeddings.
Indexing result which contains information about how many documents
were added, updated, deleted, or skipped.
if cleanup not in {"incremental", "full", None}:
raise ValueError(
f"cleanup should be one of 'incremental', 'full' or None. "
f"Got {cleanup}."
if cleanup == "incremental" and source_id_key is None:
raise ValueError("Source id key is required when cleanup mode is incremental.")
# Check that the Vectorstore has required methods implemented
methods = ["adelete", "aadd_documents"]
for method in methods:
if not hasattr(vector_store, method):
raise ValueError(
f"Vectorstore {vector_store} does not have required method {method}"
if type(vector_store).adelete == VectorStore.adelete:
# Checking if the vectorstore has overridden the default delete method
# implementation which just raises a NotImplementedError
raise ValueError("Vectorstore has not implemented the delete method")
async_doc_iterator: AsyncIterator[Document]
if isinstance(docs_source, BaseLoader):
async_doc_iterator = docs_source.alazy_load()
except NotImplementedError:
# Exception triggered when neither lazy_load nor alazy_load are implemented.
# * The default implementation of alazy_load uses lazy_load.
# * The default implementation of lazy_load raises NotImplementedError.
# In such a case, we use the load method and convert it to an async
# iterator.
async_doc_iterator = _to_async_iterator(docs_source.load())
if hasattr(docs_source, "__aiter__"):
async_doc_iterator = docs_source # type: ignore[assignment]
async_doc_iterator = _to_async_iterator(docs_source)
source_id_assigner = _get_source_id_assigner(source_id_key)
# Mark when the update started.
index_start_dt = await record_manager.aget_time()
num_added = 0
num_skipped = 0
num_updated = 0
num_deleted = 0
async for doc_batch in _abatch(batch_size, async_doc_iterator):
hashed_docs = list(
[_HashedDocument.from_document(doc) for doc in doc_batch]
source_ids: Sequence[Optional[str]] = [
source_id_assigner(doc) for doc in hashed_docs
if cleanup == "incremental":
# If the cleanup mode is incremental, source ids are required.
for source_id, hashed_doc in zip(source_ids, hashed_docs):
if source_id is None:
raise ValueError(
"Source ids are required when cleanup mode is incremental. "
f"Document that starts with "
f"content: {hashed_doc.page_content[:100]} was not assigned "
f"as source id."
# source ids cannot be None after for loop above.
source_ids = cast(Sequence[str], source_ids)
exists_batch = await record_manager.aexists([doc.uid for doc in hashed_docs])
# Filter out documents that already exist in the record store.
uids: list[str] = []
docs_to_index: list[Document] = []
uids_to_refresh = []
seen_docs: Set[str] = set()
for hashed_doc, doc_exists in zip(hashed_docs, exists_batch):
if doc_exists:
if force_update:
if uids_to_refresh:
# Must be updated to refresh timestamp.
await record_manager.aupdate(uids_to_refresh, time_at_least=index_start_dt)
num_skipped += len(uids_to_refresh)
# Be pessimistic and assume that all vector store write will fail.
# First write to vector store
if docs_to_index:
await vector_store.aadd_documents(
docs_to_index, ids=uids, batch_size=batch_size
num_added += len(docs_to_index) - len(seen_docs)
num_updated += len(seen_docs)
# And only then update the record store.
# Update ALL records, even if they already exist since we want to refresh
# their timestamp.
await record_manager.aupdate(
[doc.uid for doc in hashed_docs],
# If source IDs are provided, we can do the deletion incrementally!
if cleanup == "incremental":
# Get the uids of the documents that were not returned by the loader.
# mypy isn't good enough to determine that source ids cannot be None
# here due to a check that's happening above, so we check again.
for source_id in source_ids:
if source_id is None:
raise AssertionError("Source ids cannot be None here.")
_source_ids = cast(Sequence[str], source_ids)
uids_to_delete = await record_manager.alist_keys(
group_ids=_source_ids, before=index_start_dt
if uids_to_delete:
# Then delete from vector store.
await vector_store.adelete(uids_to_delete)
# First delete from record store.
await record_manager.adelete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)
if cleanup == "full":
while uids_to_delete := await record_manager.alist_keys(
before=index_start_dt, limit=cleanup_batch_size
# First delete from record store.
await vector_store.adelete(uids_to_delete)
# Then delete from record manager.
await record_manager.adelete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)
return {
"num_added": num_added,
"num_updated": num_updated,
"num_skipped": num_skipped,
"num_deleted": num_deleted,

@ -1,11 +1,8 @@
from __future__ import annotations
import uuid
from abc import ABC, abstractmethod
from typing import List, Optional, Sequence
NAMESPACE_UUID = uuid.UUID(int=1984)
class RecordManager(ABC):
"""An abstract base class representing the interface for a record manager."""
@ -64,8 +61,16 @@ class RecordManager(ABC):
keys: A list of record keys to upsert.
group_ids: A list of group IDs corresponding to the keys.
time_at_least: if provided, updates should only happen if the
updated_at field is at least this time.
time_at_least: Optional timestamp. Implementation can use this
to optionally verify that the timestamp IS at least this time
in the system that stores the data.
e.g., use to validate that the time in the postgres database
is equal to or larger than the given timestamp, if not
raise an error.
This is meant to help prevent time-drift issues since
time may not be monotonically increasing!
ValueError: If the length of keys doesn't match the length of group_ids.
@ -84,8 +89,16 @@ class RecordManager(ABC):
keys: A list of record keys to upsert.
group_ids: A list of group IDs corresponding to the keys.
time_at_least: if provided, updates should only happen if the
updated_at field is at least this time.
time_at_least: Optional timestamp. Implementation can use this
to optionally verify that the timestamp IS at least this time
in the system that stores the data.
e.g., use to validate that the time in the postgres database
is equal to or larger than the given timestamp, if not
raise an error.
This is meant to help prevent time-drift issues since
time may not be monotonically increasing!
ValueError: If the length of keys doesn't match the length of group_ids.

@ -0,0 +1,105 @@
import time
from typing import Dict, List, Optional, Sequence, TypedDict
from langchain_core.indexing.base import RecordManager
class _Record(TypedDict):
group_id: Optional[str]
updated_at: float
class InMemoryRecordManager(RecordManager):
"""An in-memory record manager for testing purposes."""
def __init__(self, namespace: str) -> None:
# Each key points to a dictionary
# of {'group_id': group_id, 'updated_at': timestamp}
self.records: Dict[str, _Record] = {}
self.namespace = namespace
def create_schema(self) -> None:
"""In-memory schema creation is simply ensuring the structure is initialized."""
async def acreate_schema(self) -> None:
"""In-memory schema creation is simply ensuring the structure is initialized."""
def get_time(self) -> float:
"""Get the current server time as a high resolution timestamp!"""
return time.time()
async def aget_time(self) -> float:
"""Get the current server time as a high resolution timestamp!"""
return self.get_time()
def update(
keys: Sequence[str],
group_ids: Optional[Sequence[Optional[str]]] = None,
time_at_least: Optional[float] = None,
) -> None:
if group_ids and len(keys) != len(group_ids):
raise ValueError("Length of keys must match length of group_ids")
for index, key in enumerate(keys):
group_id = group_ids[index] if group_ids else None
if time_at_least and time_at_least > self.get_time():
raise ValueError("time_at_least must be in the past")
self.records[key] = {"group_id": group_id, "updated_at": self.get_time()}
async def aupdate(
keys: Sequence[str],
group_ids: Optional[Sequence[Optional[str]]] = None,
time_at_least: Optional[float] = None,
) -> None:
self.update(keys, group_ids=group_ids, time_at_least=time_at_least)
def exists(self, keys: Sequence[str]) -> List[bool]:
return [key in self.records for key in keys]
async def aexists(self, keys: Sequence[str]) -> List[bool]:
return self.exists(keys)
def list_keys(
before: Optional[float] = None,
after: Optional[float] = None,
group_ids: Optional[Sequence[str]] = None,
limit: Optional[int] = None,
) -> List[str]:
result = []
for key, data in self.records.items():
if before and data["updated_at"] >= before:
if after and data["updated_at"] <= after:
if group_ids and data["group_id"] not in group_ids:
if limit:
return result[:limit]
return result
async def alist_keys(
before: Optional[float] = None,
after: Optional[float] = None,
group_ids: Optional[Sequence[str]] = None,
limit: Optional[int] = None,
) -> List[str]:
return self.list_keys(
before=before, after=after, group_ids=group_ids, limit=limit
def delete_keys(self, keys: Sequence[str]) -> None:
for key in keys:
if key in self.records:
del self.records[key]
async def adelete_keys(self, keys: Sequence[str]) -> None:

@ -0,0 +1,50 @@
import pytest
from langchain_core.documents import Document
from langchain_core.indexing.api import _HashedDocument
def test_hashed_document_hashing() -> None:
hashed_document = _HashedDocument( # type: ignore[call-arg]
uid="123", page_content="Lorem ipsum dolor sit amet", metadata={"key": "value"}
assert isinstance(hashed_document.hash_, str)
def test_hashing_with_missing_content() -> None:
"""Check that ValueError is raised if page_content is missing."""
with pytest.raises(TypeError):
metadata={"key": "value"},
) # type: ignore
def test_uid_auto_assigned_to_hash() -> None:
"""Test uid is auto-assigned to the hashed_document hash."""
hashed_document = _HashedDocument( # type: ignore[call-arg]
page_content="Lorem ipsum dolor sit amet", metadata={"key": "value"}
assert hashed_document.uid == hashed_document.hash_
def test_to_document() -> None:
"""Test to_document method."""
hashed_document = _HashedDocument( # type: ignore[call-arg]
page_content="Lorem ipsum dolor sit amet", metadata={"key": "value"}
doc = hashed_document.to_document()
assert isinstance(doc, Document)
assert doc.page_content == "Lorem ipsum dolor sit amet"
assert doc.metadata == {"key": "value"}
def test_from_document() -> None:
"""Test from document class method."""
document = Document(
page_content="Lorem ipsum dolor sit amet", metadata={"key": "value"}
hashed_document = _HashedDocument.from_document(document)
# hash should be deterministic
assert hashed_document.hash_ == "fd1dc827-051b-537d-a1fe-1fa043e8b276"
assert hashed_document.uid == hashed_document.hash_

@ -0,0 +1,223 @@
from datetime import datetime
from unittest.mock import patch
import pytest
import pytest_asyncio
from tests.unit_tests.indexing.in_memory import InMemoryRecordManager
def manager() -> InMemoryRecordManager:
"""Initialize the test database and yield the TimestampedSet instance."""
# Initialize and yield the TimestampedSet instance
record_manager = InMemoryRecordManager(namespace="kittens")
return record_manager
async def amanager() -> InMemoryRecordManager:
"""Initialize the test database and yield the TimestampedSet instance."""
# Initialize and yield the TimestampedSet instance
record_manager = InMemoryRecordManager(namespace="kittens")
await record_manager.acreate_schema()
return record_manager
def test_update(manager: InMemoryRecordManager) -> None:
"""Test updating records in the database."""
# no keys should be present in the set
read_keys = manager.list_keys()
assert read_keys == []
# Insert records
keys = ["key1", "key2", "key3"]
# Retrieve the records
read_keys = manager.list_keys()
assert read_keys == ["key1", "key2", "key3"]
async def test_aupdate(amanager: InMemoryRecordManager) -> None:
"""Test updating records in the database."""
# no keys should be present in the set
read_keys = await amanager.alist_keys()
assert read_keys == []
# Insert records
keys = ["key1", "key2", "key3"]
await amanager.aupdate(keys)
# Retrieve the records
read_keys = await amanager.alist_keys()
assert read_keys == ["key1", "key2", "key3"]
def test_update_timestamp(manager: InMemoryRecordManager) -> None:
"""Test updating records in the database."""
# no keys should be present in the set
with patch.object(
manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
assert manager.list_keys() == ["key1"]
assert manager.list_keys(before=datetime(2021, 1, 1).timestamp()) == []
assert manager.list_keys(after=datetime(2021, 1, 1).timestamp()) == ["key1"]
assert manager.list_keys(after=datetime(2021, 1, 3).timestamp()) == []
# Update the timestamp
with patch.object(
manager, "get_time", return_value=datetime(2023, 1, 5).timestamp()
assert manager.list_keys() == ["key1"]
assert manager.list_keys(before=datetime(2023, 1, 1).timestamp()) == []
assert manager.list_keys(after=datetime(2023, 1, 1).timestamp()) == ["key1"]
assert manager.list_keys(after=datetime(2023, 1, 3).timestamp()) == ["key1"]
async def test_aupdate_timestamp(manager: InMemoryRecordManager) -> None:
"""Test updating records in the database."""
# no keys should be present in the set
with patch.object(
manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
await manager.aupdate(["key1"])
assert await manager.alist_keys() == ["key1"]
assert await manager.alist_keys(before=datetime(2021, 1, 1).timestamp()) == []
assert await manager.alist_keys(after=datetime(2021, 1, 1).timestamp()) == ["key1"]
assert await manager.alist_keys(after=datetime(2021, 1, 3).timestamp()) == []
# Update the timestamp
with patch.object(
manager, "get_time", return_value=datetime(2023, 1, 5).timestamp()
await manager.aupdate(["key1"])
assert await manager.alist_keys() == ["key1"]
assert await manager.alist_keys(before=datetime(2023, 1, 1).timestamp()) == []
assert await manager.alist_keys(after=datetime(2023, 1, 1).timestamp()) == ["key1"]
assert await manager.alist_keys(after=datetime(2023, 1, 3).timestamp()) == ["key1"]
def test_exists(manager: InMemoryRecordManager) -> None:
"""Test checking if keys exist in the database."""
# Insert records
keys = ["key1", "key2", "key3"]
# Check if the keys exist in the database
exists = manager.exists(keys)
assert len(exists) == len(keys)
assert exists == [True, True, True]
exists = manager.exists(["key1", "key4"])
assert len(exists) == 2
assert exists == [True, False]
async def test_aexists(amanager: InMemoryRecordManager) -> None:
"""Test checking if keys exist in the database."""
# Insert records
keys = ["key1", "key2", "key3"]
await amanager.aupdate(keys)
# Check if the keys exist in the database
exists = await amanager.aexists(keys)
assert len(exists) == len(keys)
assert exists == [True, True, True]
exists = await amanager.aexists(["key1", "key4"])
assert len(exists) == 2
assert exists == [True, False]
async def test_list_keys(manager: InMemoryRecordManager) -> None:
"""Test listing keys based on the provided date range."""
# Insert records
assert manager.list_keys() == []
assert await manager.alist_keys() == []
with patch.object(
manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
manager.update(["key1", "key2"])
manager.update(["key3"], group_ids=["group1"])
manager.update(["key4"], group_ids=["group2"])
with patch.object(
manager, "get_time", return_value=datetime(2021, 1, 10).timestamp()
assert sorted(manager.list_keys()) == ["key1", "key2", "key3", "key4", "key5"]
assert sorted(await manager.alist_keys()) == [
# By group
assert manager.list_keys(group_ids=["group1"]) == ["key3"]
assert await manager.alist_keys(group_ids=["group1"]) == ["key3"]
# Before
assert sorted(manager.list_keys(before=datetime(2021, 1, 3).timestamp())) == [
assert sorted(
await manager.alist_keys(before=datetime(2021, 1, 3).timestamp())
) == [
# After
assert sorted(manager.list_keys(after=datetime(2021, 1, 3).timestamp())) == ["key5"]
assert sorted(await manager.alist_keys(after=datetime(2021, 1, 3).timestamp())) == [
results = manager.list_keys(limit=1)
assert len(results) == 1
assert results[0] in ["key1", "key2", "key3", "key4", "key5"]
results = await manager.alist_keys(limit=1)
assert len(results) == 1
assert results[0] in ["key1", "key2", "key3", "key4", "key5"]
def test_delete_keys(manager: InMemoryRecordManager) -> None:
"""Test deleting keys from the database."""
# Insert records
keys = ["key1", "key2", "key3"]
# Delete some keys
keys_to_delete = ["key1", "key2"]
# Check if the deleted keys are no longer in the database
remaining_keys = manager.list_keys()
assert remaining_keys == ["key3"]
async def test_adelete_keys(amanager: InMemoryRecordManager) -> None:
"""Test deleting keys from the database."""
# Insert records
keys = ["key1", "key2", "key3"]
await amanager.aupdate(keys)
# Delete some keys
keys_to_delete = ["key1", "key2"]
await amanager.adelete_keys(keys_to_delete)
# Check if the deleted keys are no longer in the database
remaining_keys = await amanager.alist_keys()
assert remaining_keys == ["key3"]

File diff suppressed because it is too large Load Diff

@ -0,0 +1,12 @@
from langchain_core.indexing import __all__
def test_all() -> None:
"""Use to catch obvious breaking changes."""
assert __all__ == sorted(__all__, key=str.lower)
assert __all__ == [

@ -11,7 +11,8 @@ Importantly, Index keeps on working even if the content being written is derived
via a set of transformations from some source content (e.g., indexing children
documents that were derived from parent documents by chunking.)
from langchain.indexes._api import IndexingResult, aindex, index
from langchain_core.indexing.api import IndexingResult, aindex, index
from langchain.indexes._sql_record_manager import SQLRecordManager
from langchain.indexes.graph import GraphIndexCreator
from langchain.indexes.vectorstore import VectorstoreIndexCreator

@ -1,600 +1,5 @@
"""Module contains logic for indexing documents into vector stores."""
from __future__ import annotations
from langchain_core.indexing.api import _abatch, _batch, _HashedDocument
import hashlib
import json
import uuid
from itertools import islice
from typing import (
from langchain_community.document_loaders.base import BaseLoader
from langchain_core.documents import Document
from langchain_core.pydantic_v1 import root_validator
from langchain_core.vectorstores import VectorStore
from langchain.indexes.base import NAMESPACE_UUID, RecordManager
T = TypeVar("T")
def _hash_string_to_uuid(input_string: str) -> uuid.UUID:
"""Hashes a string and returns the corresponding UUID."""
hash_value = hashlib.sha1(input_string.encode("utf-8")).hexdigest()
return uuid.uuid5(NAMESPACE_UUID, hash_value)
def _hash_nested_dict_to_uuid(data: dict[Any, Any]) -> uuid.UUID:
"""Hashes a nested dictionary and returns the corresponding UUID."""
serialized_data = json.dumps(data, sort_keys=True)
hash_value = hashlib.sha1(serialized_data.encode("utf-8")).hexdigest()
return uuid.uuid5(NAMESPACE_UUID, hash_value)
class _HashedDocument(Document):
"""A hashed document with a unique ID."""
uid: str
hash_: str
"""The hash of the document including content and metadata."""
content_hash: str
"""The hash of the document content."""
metadata_hash: str
"""The hash of the document metadata."""
def is_lc_serializable(cls) -> bool:
return False
def calculate_hashes(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Root validator to calculate content and metadata hash."""
content = values.get("page_content", "")
metadata = values.get("metadata", {})
forbidden_keys = ("hash_", "content_hash", "metadata_hash")
for key in forbidden_keys:
if key in metadata:
raise ValueError(
f"Metadata cannot contain key {key} as it "
f"is reserved for internal use."
content_hash = str(_hash_string_to_uuid(content))
metadata_hash = str(_hash_nested_dict_to_uuid(metadata))
except Exception as e:
raise ValueError(
f"Failed to hash metadata: {e}. "
f"Please use a dict that can be serialized using json."
values["content_hash"] = content_hash
values["metadata_hash"] = metadata_hash
values["hash_"] = str(_hash_string_to_uuid(content_hash + metadata_hash))
_uid = values.get("uid", None)
if _uid is None:
values["uid"] = values["hash_"]
return values
def to_document(self) -> Document:
"""Return a Document object."""
return Document(
def from_document(
cls, document: Document, *, uid: Optional[str] = None
) -> _HashedDocument:
"""Create a HashedDocument from a Document."""
return cls( # type: ignore[call-arg]
uid=uid, # type: ignore[arg-type]
def _batch(size: int, iterable: Iterable[T]) -> Iterator[List[T]]:
"""Utility batching function."""
it = iter(iterable)
while True:
chunk = list(islice(it, size))
if not chunk:
yield chunk
async def _abatch(size: int, iterable: AsyncIterable[T]) -> AsyncIterator[List[T]]:
"""Utility batching function."""
batch: List[T] = []
async for element in iterable:
if len(batch) < size:
if len(batch) >= size:
yield batch
batch = []
if batch:
yield batch
def _get_source_id_assigner(
source_id_key: Union[str, Callable[[Document], str], None],
) -> Callable[[Document], Union[str, None]]:
"""Get the source id from the document."""
if source_id_key is None:
return lambda doc: None
elif isinstance(source_id_key, str):
return lambda doc: doc.metadata[source_id_key]
elif callable(source_id_key):
return source_id_key
raise ValueError(
f"source_id_key should be either None, a string or a callable. "
f"Got {source_id_key} of type {type(source_id_key)}."
def _deduplicate_in_order(
hashed_documents: Iterable[_HashedDocument],
) -> Iterator[_HashedDocument]:
"""Deduplicate a list of hashed documents while preserving order."""
seen: Set[str] = set()
for hashed_doc in hashed_documents:
if hashed_doc.hash_ not in seen:
yield hashed_doc
class IndexingResult(TypedDict):
"""Return a detailed a breakdown of the result of the indexing operation."""
num_added: int
"""Number of added documents."""
num_updated: int
"""Number of updated documents because they were not up to date."""
num_deleted: int
"""Number of deleted documents."""
num_skipped: int
"""Number of skipped documents because they were already up to date."""
def index(
docs_source: Union[BaseLoader, Iterable[Document]],
record_manager: RecordManager,
vector_store: VectorStore,
batch_size: int = 100,
cleanup: Literal["incremental", "full", None] = None,
source_id_key: Union[str, Callable[[Document], str], None] = None,
cleanup_batch_size: int = 1_000,
force_update: bool = False,
) -> IndexingResult:
"""Index data from the loader into the vector store.
Indexing functionality uses a manager to keep track of which documents
are in the vector store.
This allows us to keep track of which documents were updated, and which
documents were deleted, which documents should be skipped.
For the time being, documents are indexed using their hashes, and users
are not able to specify the uid of the document.
if auto_cleanup is set to True, the loader should be returning
the entire dataset, and not just a subset of the dataset.
Otherwise, the auto_cleanup will remove documents that it is not
supposed to.
docs_source: Data loader or iterable of documents to index.
record_manager: Timestamped set to keep track of which documents were
vector_store: Vector store to index the documents into.
batch_size: Batch size to use when indexing.
cleanup: How to handle clean up of documents.
- Incremental: Cleans up all documents that haven't been updated AND
that are associated with source ids that were seen
during indexing.
Clean up is done continuously during indexing helping
to minimize the probability of users seeing duplicated
- Full: Delete all documents that haven to been returned by the loader.
Clean up runs after all documents have been indexed.
This means that users may see duplicated content during indexing.
- None: Do not delete any documents.
source_id_key: Optional key that helps identify the original source
of the document.
cleanup_batch_size: Batch size to use when cleaning up documents.
force_update: Force update documents even if they are present in the
record manager. Useful if you are re-indexing with updated embeddings.
Indexing result which contains information about how many documents
were added, updated, deleted, or skipped.
if cleanup not in {"incremental", "full", None}:
raise ValueError(
f"cleanup should be one of 'incremental', 'full' or None. "
f"Got {cleanup}."
if cleanup == "incremental" and source_id_key is None:
raise ValueError("Source id key is required when cleanup mode is incremental.")
# Check that the Vectorstore has required methods implemented
methods = ["delete", "add_documents"]
for method in methods:
if not hasattr(vector_store, method):
raise ValueError(
f"Vectorstore {vector_store} does not have required method {method}"
if type(vector_store).delete == VectorStore.delete:
# Checking if the vectorstore has overridden the default delete method
# implementation which just raises a NotImplementedError
raise ValueError("Vectorstore has not implemented the delete method")
if isinstance(docs_source, BaseLoader):
doc_iterator = docs_source.lazy_load()
except NotImplementedError:
doc_iterator = iter(docs_source.load())
doc_iterator = iter(docs_source)
source_id_assigner = _get_source_id_assigner(source_id_key)
# Mark when the update started.
index_start_dt = record_manager.get_time()
num_added = 0
num_skipped = 0
num_updated = 0
num_deleted = 0
for doc_batch in _batch(batch_size, doc_iterator):
hashed_docs = list(
[_HashedDocument.from_document(doc) for doc in doc_batch]
source_ids: Sequence[Optional[str]] = [
source_id_assigner(doc) for doc in hashed_docs
if cleanup == "incremental":
# If the cleanup mode is incremental, source ids are required.
for source_id, hashed_doc in zip(source_ids, hashed_docs):
if source_id is None:
raise ValueError(
"Source ids are required when cleanup mode is incremental. "
f"Document that starts with "
f"content: {hashed_doc.page_content[:100]} was not assigned "
f"as source id."
# source ids cannot be None after for loop above.
source_ids = cast(Sequence[str], source_ids) # type: ignore[assignment]
exists_batch = record_manager.exists([doc.uid for doc in hashed_docs])
# Filter out documents that already exist in the record store.
uids = []
docs_to_index = []
uids_to_refresh = []
seen_docs: Set[str] = set()
for hashed_doc, doc_exists in zip(hashed_docs, exists_batch):
if doc_exists:
if force_update:
# Update refresh timestamp
if uids_to_refresh:
record_manager.update(uids_to_refresh, time_at_least=index_start_dt)
num_skipped += len(uids_to_refresh)
# Be pessimistic and assume that all vector store write will fail.
# First write to vector store
if docs_to_index:
vector_store.add_documents(docs_to_index, ids=uids, batch_size=batch_size)
num_added += len(docs_to_index) - len(seen_docs)
num_updated += len(seen_docs)
# And only then update the record store.
# Update ALL records, even if they already exist since we want to refresh
# their timestamp.
[doc.uid for doc in hashed_docs],
# If source IDs are provided, we can do the deletion incrementally!
if cleanup == "incremental":
# Get the uids of the documents that were not returned by the loader.
# mypy isn't good enough to determine that source ids cannot be None
# here due to a check that's happening above, so we check again.
for source_id in source_ids:
if source_id is None:
raise AssertionError("Source ids cannot be None here.")
_source_ids = cast(Sequence[str], source_ids)
uids_to_delete = record_manager.list_keys(
group_ids=_source_ids, before=index_start_dt
if uids_to_delete:
# Then delete from vector store.
# First delete from record store.
num_deleted += len(uids_to_delete)
if cleanup == "full":
while uids_to_delete := record_manager.list_keys(
before=index_start_dt, limit=cleanup_batch_size
# First delete from record store.
# Then delete from record manager.
num_deleted += len(uids_to_delete)
return {
"num_added": num_added,
"num_updated": num_updated,
"num_skipped": num_skipped,
"num_deleted": num_deleted,
# Define an asynchronous generator function
async def _to_async_iterator(iterator: Iterable[T]) -> AsyncIterator[T]:
"""Convert an iterable to an async iterator."""
for item in iterator:
yield item
async def aindex(
docs_source: Union[BaseLoader, Iterable[Document], AsyncIterator[Document]],
record_manager: RecordManager,
vector_store: VectorStore,
batch_size: int = 100,
cleanup: Literal["incremental", "full", None] = None,
source_id_key: Union[str, Callable[[Document], str], None] = None,
cleanup_batch_size: int = 1_000,
force_update: bool = False,
) -> IndexingResult:
"""Index data from the loader into the vector store.
Indexing functionality uses a manager to keep track of which documents
are in the vector store.
This allows us to keep track of which documents were updated, and which
documents were deleted, which documents should be skipped.
For the time being, documents are indexed using their hashes, and users
are not able to specify the uid of the document.
if auto_cleanup is set to True, the loader should be returning
the entire dataset, and not just a subset of the dataset.
Otherwise, the auto_cleanup will remove documents that it is not
supposed to.
docs_source: Data loader or iterable of documents to index.
record_manager: Timestamped set to keep track of which documents were
vector_store: Vector store to index the documents into.
batch_size: Batch size to use when indexing.
cleanup: How to handle clean up of documents.
- Incremental: Cleans up all documents that haven't been updated AND
that are associated with source ids that were seen
during indexing.
Clean up is done continuously during indexing helping
to minimize the probability of users seeing duplicated
- Full: Delete all documents that haven to been returned by the loader.
Clean up runs after all documents have been indexed.
This means that users may see duplicated content during indexing.
- None: Do not delete any documents.
source_id_key: Optional key that helps identify the original source
of the document.
cleanup_batch_size: Batch size to use when cleaning up documents.
force_update: Force update documents even if they are present in the
record manager. Useful if you are re-indexing with updated embeddings.
Indexing result which contains information about how many documents
were added, updated, deleted, or skipped.
if cleanup not in {"incremental", "full", None}:
raise ValueError(
f"cleanup should be one of 'incremental', 'full' or None. "
f"Got {cleanup}."
if cleanup == "incremental" and source_id_key is None:
raise ValueError("Source id key is required when cleanup mode is incremental.")
# Check that the Vectorstore has required methods implemented
methods = ["adelete", "aadd_documents"]
for method in methods:
if not hasattr(vector_store, method):
raise ValueError(
f"Vectorstore {vector_store} does not have required method {method}"
if type(vector_store).adelete == VectorStore.adelete:
# Checking if the vectorstore has overridden the default delete method
# implementation which just raises a NotImplementedError
raise ValueError("Vectorstore has not implemented the delete method")
async_doc_iterator: AsyncIterator[Document]
if isinstance(docs_source, BaseLoader):
async_doc_iterator = docs_source.alazy_load()
except NotImplementedError:
# Exception triggered when neither lazy_load nor alazy_load are implemented.
# * The default implementation of alazy_load uses lazy_load.
# * The default implementation of lazy_load raises NotImplementedError.
# In such a case, we use the load method and convert it to an async
# iterator.
async_doc_iterator = _to_async_iterator(docs_source.load())
if hasattr(docs_source, "__aiter__"):
async_doc_iterator = docs_source # type: ignore[assignment]
async_doc_iterator = _to_async_iterator(docs_source)
source_id_assigner = _get_source_id_assigner(source_id_key)
# Mark when the update started.
index_start_dt = await record_manager.aget_time()
num_added = 0
num_skipped = 0
num_updated = 0
num_deleted = 0
async for doc_batch in _abatch(batch_size, async_doc_iterator):
hashed_docs = list(
[_HashedDocument.from_document(doc) for doc in doc_batch]
source_ids: Sequence[Optional[str]] = [
source_id_assigner(doc) for doc in hashed_docs
if cleanup == "incremental":
# If the cleanup mode is incremental, source ids are required.
for source_id, hashed_doc in zip(source_ids, hashed_docs):
if source_id is None:
raise ValueError(
"Source ids are required when cleanup mode is incremental. "
f"Document that starts with "
f"content: {hashed_doc.page_content[:100]} was not assigned "
f"as source id."
# source ids cannot be None after for loop above.
source_ids = cast(Sequence[str], source_ids)
exists_batch = await record_manager.aexists([doc.uid for doc in hashed_docs])
# Filter out documents that already exist in the record store.
uids: list[str] = []
docs_to_index: list[Document] = []
uids_to_refresh = []
seen_docs: Set[str] = set()
for hashed_doc, doc_exists in zip(hashed_docs, exists_batch):
if doc_exists:
if force_update:
if uids_to_refresh:
# Must be updated to refresh timestamp.
await record_manager.aupdate(uids_to_refresh, time_at_least=index_start_dt)
num_skipped += len(uids_to_refresh)
# Be pessimistic and assume that all vector store write will fail.
# First write to vector store
if docs_to_index:
await vector_store.aadd_documents(
docs_to_index, ids=uids, batch_size=batch_size
num_added += len(docs_to_index) - len(seen_docs)
num_updated += len(seen_docs)
# And only then update the record store.
# Update ALL records, even if they already exist since we want to refresh
# their timestamp.
await record_manager.aupdate(
[doc.uid for doc in hashed_docs],
# If source IDs are provided, we can do the deletion incrementally!
if cleanup == "incremental":
# Get the uids of the documents that were not returned by the loader.
# mypy isn't good enough to determine that source ids cannot be None
# here due to a check that's happening above, so we check again.
for source_id in source_ids:
if source_id is None:
raise AssertionError("Source ids cannot be None here.")
_source_ids = cast(Sequence[str], source_ids)
uids_to_delete = await record_manager.alist_keys(
group_ids=_source_ids, before=index_start_dt
if uids_to_delete:
# Then delete from vector store.
await vector_store.adelete(uids_to_delete)
# First delete from record store.
await record_manager.adelete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)
if cleanup == "full":
while uids_to_delete := await record_manager.alist_keys(
before=index_start_dt, limit=cleanup_batch_size
# First delete from record store.
await vector_store.adelete(uids_to_delete)
# Then delete from record manager.
await record_manager.adelete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)
return {
"num_added": num_added,
"num_updated": num_updated,
"num_skipped": num_skipped,
"num_deleted": num_deleted,
# Please do not use these in your application. These are private APIs.
# Here to avoid changing unit tests during a migration.
__all__ = ["_HashedDocument", "_abatch", "_batch"]

@ -18,6 +18,7 @@ import decimal
import uuid
from typing import Any, AsyncGenerator, Dict, Generator, List, Optional, Sequence, Union
from langchain_core.indexing import RecordManager
from sqlalchemy import (
@ -41,8 +42,6 @@ from sqlalchemy.ext.asyncio import (
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Query, Session, sessionmaker
from langchain.indexes.base import RecordManager
Base = declarative_base()
