Pinecone upsert parallelization (#9859)

Issue: closes #9855

* consolidates `from_texts` and `add_texts` functions for pinecone
upsert
* adds two types of batching (one for embeddings and one for index
upsert)
* adds thread pool size when instantiating pinecone index
pull/9791/head
Stanko Kuveljic 9 months ago committed by GitHub
parent 16a27ab244
commit 4765c09703
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -3,15 +3,19 @@ from __future__ import annotations
import logging
import uuid
import warnings
from typing import Any, Callable, Iterable, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Callable, Iterable, List, Optional, Tuple, Union
import numpy as np
from langchain.docstore.document import Document
from langchain.embeddings.base import Embeddings
from langchain.utils.iter import batch_iterate
from langchain.vectorstores.base import VectorStore
from langchain.vectorstores.utils import DistanceStrategy, maximal_marginal_relevance
if TYPE_CHECKING:
from pinecone import Index
logger = logging.getLogger(__name__)
@ -51,16 +55,16 @@ class Pinecone(VectorStore):
"Could not import pinecone python package. "
"Please install it with `pip install pinecone-client`."
)
if not isinstance(index, pinecone.index.Index):
raise ValueError(
f"client should be an instance of pinecone.index.Index, "
f"got {type(index)}"
)
if not isinstance(embedding, Embeddings):
warnings.warn(
"Passing in `embedding` as a Callable is deprecated. Please pass in an"
" Embeddings object instead."
)
if not isinstance(index, pinecone.index.Index):
raise ValueError(
f"client should be an instance of pinecone.index.Index, "
f"got {type(index)}"
)
self._index = index
self._embedding = embedding
self._text_key = text_key
@ -93,15 +97,22 @@ class Pinecone(VectorStore):
ids: Optional[List[str]] = None,
namespace: Optional[str] = None,
batch_size: int = 32,
embedding_chunk_size: int = 1000,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore.
Upsert optimization is done by chunking the embeddings and upserting them.
This is done to avoid memory issues and optimize using HTTP based embeddings.
For OpenAI embeddings, use pool_threads>4 when constructing the pinecone.Index,
embedding_chunk_size>1000 and batch_size~64 for best performance.
Args:
texts: Iterable of strings to add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
ids: Optional list of ids to associate with the texts.
namespace: Optional pinecone namespace to add the texts to.
batch_size: Batch size to use when adding the texts to the vectorstore.
embedding_chunk_size: Chunk size to use when embedding the texts.
Returns:
List of ids from adding the texts into the vectorstore.
@ -109,18 +120,34 @@ class Pinecone(VectorStore):
"""
if namespace is None:
namespace = self._namespace
# Embed and create the documents
docs = []
texts = list(texts)
ids = ids or [str(uuid.uuid4()) for _ in texts]
embeddings = self._embed_documents(texts)
for i, (text, embedding) in enumerate(zip(texts, embeddings)):
metadata = metadatas[i] if metadatas else {}
metadatas = metadatas or [{} for _ in texts]
for metadata, text in zip(metadatas, texts):
metadata[self._text_key] = text
docs.append((ids[i], embedding, metadata))
# upsert to Pinecone
self._index.upsert(
vectors=docs, namespace=namespace, batch_size=batch_size, **kwargs
)
# For loops to avoid memory issues and optimize when using HTTP based embeddings
# The first loop runs the embeddings, it benefits when using OpenAI embeddings
# The second loops runs the pinecone upsert asynchoronously.
for i in range(0, len(texts), embedding_chunk_size):
chunk_texts = texts[i : i + embedding_chunk_size]
chunk_ids = ids[i : i + embedding_chunk_size]
chunk_metadatas = metadatas[i : i + embedding_chunk_size]
embeddings = self._embed_documents(chunk_texts)
async_res = [
self._index.upsert(
vectors=batch,
namespace=namespace,
async_req=True,
**kwargs,
)
for batch in batch_iterate(
batch_size, zip(chunk_ids, embeddings, chunk_metadatas)
)
]
[res.get() for res in async_res]
return ids
def similarity_search_with_score(
@ -302,6 +329,45 @@ class Pinecone(VectorStore):
embedding, k, fetch_k, lambda_mult, filter, namespace
)
@classmethod
def get_pinecone_index(
cls,
index_name: Optional[str],
pool_threads: int = 4,
) -> Index:
"""Return a Pinecone Index instance.
Args:
index_name: Name of the index to use.
pool_threads: Number of threads to use for index upsert.
Returns:
Pinecone Index instance."""
try:
import pinecone
except ImportError:
raise ValueError(
"Could not import pinecone python package. "
"Please install it with `pip install pinecone-client`."
)
indexes = pinecone.list_indexes() # checks if provided index exists
if index_name in indexes:
index = pinecone.Index(index_name, pool_threads=pool_threads)
elif len(indexes) == 0:
raise ValueError(
"No active indexes found in your Pinecone project, "
"are you sure you're using the right Pinecone API key and Environment? "
"Please double check your Pinecone dashboard."
)
else:
raise ValueError(
f"Index '{index_name}' not found in your Pinecone project. "
f"Did you mean one of the following indexes: {', '.join(indexes)}"
)
return index
@classmethod
def from_texts(
cls,
@ -311,9 +377,11 @@ class Pinecone(VectorStore):
ids: Optional[List[str]] = None,
batch_size: int = 32,
text_key: str = "text",
index_name: Optional[str] = None,
namespace: Optional[str] = None,
index_name: Optional[str] = None,
upsert_kwargs: Optional[dict] = None,
pool_threads: int = 4,
embeddings_chunk_size: int = 1000,
**kwargs: Any,
) -> Pinecone:
"""Construct Pinecone wrapper from raw documents.
@ -324,6 +392,7 @@ class Pinecone(VectorStore):
This is intended to be a quick way to get started.
The `pool_threads` affects the speed of the upsert operations.
Example:
.. code-block:: python
@ -341,54 +410,19 @@ class Pinecone(VectorStore):
index_name="langchain-demo"
)
"""
try:
import pinecone
except ImportError:
raise ValueError(
"Could not import pinecone python package. "
"Please install it with `pip install pinecone-client`."
)
indexes = pinecone.list_indexes() # checks if provided index exists
if index_name in indexes:
index = pinecone.Index(index_name)
elif len(indexes) == 0:
raise ValueError(
"No active indexes found in your Pinecone project, "
"are you sure you're using the right API key and environment?"
)
else:
raise ValueError(
f"Index '{index_name}' not found in your Pinecone project. "
f"Did you mean one of the following indexes: {', '.join(indexes)}"
)
for i in range(0, len(texts), batch_size):
# set end position of batch
i_end = min(i + batch_size, len(texts))
# get batch of texts and ids
lines_batch = texts[i:i_end]
# create ids if not provided
if ids:
ids_batch = ids[i:i_end]
else:
ids_batch = [str(uuid.uuid4()) for n in range(i, i_end)]
# create embeddings
embeds = embedding.embed_documents(lines_batch)
# prep metadata and upsert batch
if metadatas:
metadata = metadatas[i:i_end]
else:
metadata = [{} for _ in range(i, i_end)]
for j, line in enumerate(lines_batch):
metadata[j][text_key] = line
to_upsert = zip(ids_batch, embeds, metadata)
pinecone_index = cls.get_pinecone_index(index_name, pool_threads)
pinecone = cls(pinecone_index, embedding, text_key, namespace, **kwargs)
# upsert to Pinecone
_upsert_kwargs = upsert_kwargs or {}
index.upsert(vectors=list(to_upsert), namespace=namespace, **_upsert_kwargs)
return cls(index, embedding, text_key, namespace, **kwargs)
pinecone.add_texts(
texts,
metadatas=metadatas,
ids=ids,
namespace=namespace,
batch_size=batch_size,
embedding_chunk_size=embeddings_chunk_size,
**(upsert_kwargs or {}),
)
return pinecone
@classmethod
def from_existing_index(
@ -397,17 +431,11 @@ class Pinecone(VectorStore):
embedding: Embeddings,
text_key: str = "text",
namespace: Optional[str] = None,
pool_threads: int = 4,
) -> Pinecone:
"""Load pinecone vectorstore from index name."""
try:
import pinecone
except ImportError:
raise ValueError(
"Could not import pinecone python package. "
"Please install it with `pip install pinecone-client`."
)
return cls(pinecone.Index(index_name), embedding, text_key, namespace)
pinecone_index = cls.get_pinecone_index(index_name, pool_threads)
return cls(pinecone_index, embedding, text_key, namespace)
def delete(
self,

@ -231,3 +231,57 @@ class TestPinecone:
assert all(
(1 >= score or np.isclose(score, 1)) and score >= 0 for _, score in output
)
@pytest.mark.skipif(reason="slow to run for benchmark")
@pytest.mark.parametrize(
"pool_threads,batch_size,embeddings_chunk_size,data_multiplier",
[
(
1,
32,
32,
1000,
), # simulate single threaded with embeddings_chunk_size = batch_size = 32
(
1,
32,
1000,
1000,
), # simulate single threaded with embeddings_chunk_size = 1000
(
4,
32,
1000,
1000,
), # simulate 4 threaded with embeddings_chunk_size = 1000
(20, 64, 5000, 1000),
], # simulate 20 threaded with embeddings_chunk_size = 5000
)
def test_from_texts_with_metadatas_benchmark(
self,
pool_threads: int,
batch_size: int,
embeddings_chunk_size: int,
data_multiplier: int,
documents: List[Document],
embedding_openai: OpenAIEmbeddings,
) -> None:
"""Test end to end construction and search."""
texts = [document.page_content for document in documents] * data_multiplier
uuids = [uuid.uuid4().hex for _ in range(len(texts))]
metadatas = [{"page": i} for i in range(len(texts))]
docsearch = Pinecone.from_texts(
texts,
embedding_openai,
ids=uuids,
metadatas=metadatas,
index_name=index_name,
namespace=namespace_name,
pool_threads=pool_threads,
batch_size=batch_size,
embeddings_chunk_size=embeddings_chunk_size,
)
query = "What did the president say about Ketanji Brown Jackson"
_ = docsearch.similarity_search(query, k=1, namespace=namespace_name)

@ -40,4 +40,4 @@ ignore-regex = '.*(Stati Uniti|Tense=Pres).*'
# whats is a typo but used frequently in queries so kept as is
# aapply - async apply
# unsecure - typo but part of API, decided to not bother for now
ignore-words-list = 'momento,collison,ned,foor,reworkd,parth,whats,aapply,mysogyny,unsecure,damon,crate'
ignore-words-list = 'momento,collison,ned,foor,reworkd,parth,whats,aapply,mysogyny,unsecure,damon,crate'

Loading…
Cancel
Save