diff --git a/libs/partners/pinecone/langchain_pinecone/vectorstores.py b/libs/partners/pinecone/langchain_pinecone/vectorstores.py
index 34b3f85211..1c21179f64 100644
--- a/libs/partners/pinecone/langchain_pinecone/vectorstores.py
+++ b/libs/partners/pinecone/langchain_pinecone/vectorstores.py
@@ -161,19 +161,26 @@ class PineconeVectorStore(VectorStore):
             chunk_ids = ids[i : i + embedding_chunk_size]
             chunk_metadatas = metadatas[i : i + embedding_chunk_size]
             embeddings = self._embedding.embed_documents(chunk_texts)
-            async_res = [
+            vector_tuples = zip(chunk_ids, embeddings, chunk_metadatas)
+            if async_req:
+                # Runs the Pinecone upserts asynchronously and waits for completion.
+                async_res = [
+                    self._index.upsert(
+                        vectors=batch_vector_tuples,
+                        namespace=namespace,
+                        async_req=async_req,
+                        **kwargs,
+                    )
+                    for batch_vector_tuples in batch_iterate(batch_size, vector_tuples)
+                ]
+                [res.get() for res in async_res]
+            else:
                 self._index.upsert(
-                    vectors=batch,
+                    vectors=vector_tuples,
                     namespace=namespace,
                     async_req=async_req,
                     **kwargs,
                 )
-                for batch in batch_iterate(
-                    batch_size, zip(chunk_ids, embeddings, chunk_metadatas)
-                )
-            ]
-            if async_req:
-                [res.get() for res in async_res]
         return ids
@@ -412,6 +419,7 @@ class PineconeVectorStore(VectorStore):
         upsert_kwargs: Optional[dict] = None,
         pool_threads: int = 4,
         embeddings_chunk_size: int = 1000,
+        async_req: bool = True,
         *,
         id_prefix: Optional[str] = None,
         **kwargs: Any,
@@ -453,6 +461,7 @@ class PineconeVectorStore(VectorStore):
             namespace=namespace,
             batch_size=batch_size,
             embedding_chunk_size=embeddings_chunk_size,
+            async_req=async_req,
             id_prefix=id_prefix,
             **(upsert_kwargs or {}),
         )
diff --git a/libs/partners/pinecone/tests/integration_tests/test_vectorstores.py b/libs/partners/pinecone/tests/integration_tests/test_vectorstores.py
index 3d64cec29d..292d29b63c 100644
--- a/libs/partners/pinecone/tests/integration_tests/test_vectorstores.py
+++ b/libs/partners/pinecone/tests/integration_tests/test_vectorstores.py
@@ -9,6 +9,7 @@ import pytest
 from langchain_core.documents import Document
 from langchain_openai import OpenAIEmbeddings
 from pinecone import PodSpec
+from pytest_mock import MockerFixture
 
 from langchain_pinecone import PineconeVectorStore
 
@@ -290,3 +291,41 @@ class TestPinecone:
         query = "What did the president say about Ketanji Brown Jackson"
 
         _ = docsearch.similarity_search(query, k=1, namespace=NAMESPACE_NAME)
+
+    @pytest.fixture
+    def mock_pool_not_supported(self, mocker: MockerFixture) -> None:
+        """
+        Simulate the error thrown when multiprocessing is not supported.
+        See https://github.com/langchain-ai/langchain/issues/11168
+        """
+        mocker.patch(
+            "multiprocessing.synchronize.SemLock.__init__",
+            side_effect=OSError(
+                "FileNotFoundError: [Errno 2] No such file or directory"
+            ),
+        )
+
+    @pytest.mark.usefixtures("mock_pool_not_supported")
+    def test_that_async_req_uses_multiprocessing(
+        self, texts: List[str], embedding_openai: OpenAIEmbeddings
+    ) -> None:
+        with pytest.raises(OSError):
+            PineconeVectorStore.from_texts(
+                texts=texts,
+                embedding=embedding_openai,
+                index_name=INDEX_NAME,
+                namespace=NAMESPACE_NAME,
+                async_req=True,
+            )
+
+    @pytest.mark.usefixtures("mock_pool_not_supported")
+    def test_that_async_req_false_enables_singlethreading(
+        self, texts: List[str], embedding_openai: OpenAIEmbeddings
+    ) -> None:
+        PineconeVectorStore.from_texts(
+            texts=texts,
+            embedding=embedding_openai,
+            index_name=INDEX_NAME,
+            namespace=NAMESPACE_NAME,
+            async_req=False,
+        )
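A minimal sketch of the batching semantics the new upsert branch relies on, assuming `batch_iterate` is the helper from `langchain_core.utils.iter` that yields successive lists of at most `size` items (the ids, vectors, and metadata below are placeholders, not values from the diff):

```python
from langchain_core.utils.iter import batch_iterate

# Each (id, embedding, metadata) triple is grouped into batches of
# `batch_size`; with async_req=True every batch becomes one asynchronous
# upsert request, while async_req=False sends the whole embedding chunk
# in a single synchronous upsert call.
vector_tuples = zip(
    ["id-1", "id-2", "id-3"],  # placeholder ids
    [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]],  # placeholder embeddings
    [{"k": "a"}, {"k": "b"}, {"k": "c"}],  # placeholder metadata
)
for batch in batch_iterate(2, vector_tuples):
    print(batch)  # lists of at most 2 tuples: [(id, vector, metadata), ...]
```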
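And a minimal usage sketch for the new `async_req` flag (not part of the diff; the index name and namespace are placeholders): on platforms where multiprocessing is unavailable, such as the AWS Lambda case tracked in langchain-ai/langchain#11168, passing `async_req=False` keeps the upserts on the calling thread, which is what the second integration test above exercises.

```python
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore

# async_req=True (the default) issues upserts through the Pinecone client's
# asynchronous request path; async_req=False falls back to plain synchronous
# upsert calls, avoiding multiprocessing primitives entirely.
vectorstore = PineconeVectorStore.from_texts(
    texts=["hello pinecone", "goodbye pinecone"],
    embedding=OpenAIEmbeddings(),
    index_name="langchain-test-index",  # placeholder index name
    namespace="langchain-test-namespace",  # placeholder namespace
    async_req=False,
)
```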