pinecone: Fix multiprocessing issue in PineconeVectorStore (#22571)

**Description:**

Currently, the `langchain_pinecone` library forces the `async_req`
(asynchronous request) argument to Pinecone to `True`. This design
choice causes problems when deploying to environments that do not
support multiprocessing, such as AWS Lambda, where it can prevent users
from using `langchain_pinecone` at all.
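
For context, a rough sketch of the failure mode (not code from this PR): my understanding is that the Pinecone client services `async_req=True` upserts through a `multiprocessing.pool.ThreadPool`, and constructing that pool requires POSIX shared-memory semaphores, which environments like AWS Lambda do not provide (see https://github.com/langchain-ai/langchain/issues/11168).

```python
# Minimal sketch of the failure mode, assuming only the standard library and a
# platform without working POSIX shared-memory semaphores (e.g. AWS Lambda).
from multiprocessing.pool import ThreadPool

# Pool.__init__ builds multiprocessing primitives backed by
# multiprocessing.synchronize.SemLock; on such platforms this constructor
# raises OSError (e.g. "[Errno 38] Function not implemented").
pool = ThreadPool(processes=4)
```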

This PR introduces a change that lets users decide whether to use
asynchronous requests by exposing an `async_req` parameter on
`from_texts`, which is forwarded to `add_texts` and on to the underlying
Pinecone upsert. By setting `async_req=False`, users get synchronous
processing, making the library compatible with AWS Lambda and other
environments that do not support multiprocessing.
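
As a rough usage sketch (the texts, index name, and namespace below are placeholders, not values from this PR; it assumes `PINECONE_API_KEY` and `OPENAI_API_KEY` are set in the environment), a Lambda deployment could opt out of asynchronous upserts like this:

```python
# Hypothetical usage: upsert synchronously so no multiprocessing pool is
# needed, keeping the code compatible with AWS Lambda.
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore

vectorstore = PineconeVectorStore.from_texts(
    texts=["first document", "second document"],
    embedding=OpenAIEmbeddings(),
    index_name="my-index",        # placeholder: an existing Pinecone index
    namespace="my-namespace",     # placeholder namespace
    async_req=False,              # new knob: synchronous upserts
)
```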

**Issue:**
This PR does not address a specific issue number but aims to resolve
compatibility issues with AWS Lambda by allowing synchronous processing.

**Dependencies:**
None that I'm aware of.

---------

Co-authored-by: Erick Friis <erick@langchain.dev>
Commit 0deb98ac0c (parent 75c7c3a1a7) on pull/23249/head, authored by Vwake04 and committed by GitHub.

```diff
@@ -161,19 +161,26 @@ class PineconeVectorStore(VectorStore):
             chunk_ids = ids[i : i + embedding_chunk_size]
             chunk_metadatas = metadatas[i : i + embedding_chunk_size]
             embeddings = self._embedding.embed_documents(chunk_texts)
-            async_res = [
-                self._index.upsert(
-                    vectors=batch,
-                    namespace=namespace,
-                    async_req=async_req,
-                    **kwargs,
-                )
-                for batch in batch_iterate(
-                    batch_size, zip(chunk_ids, embeddings, chunk_metadatas)
-                )
-            ]
-            if async_req:
-                [res.get() for res in async_res]
+            vector_tuples = zip(chunk_ids, embeddings, chunk_metadatas)
+            if async_req:
+                # Runs the pinecone upsert asynchronously.
+                async_res = [
+                    self._index.upsert(
+                        vectors=batch_vector_tuples,
+                        namespace=namespace,
+                        async_req=async_req,
+                        **kwargs,
+                    )
+                    for batch_vector_tuples in batch_iterate(batch_size, vector_tuples)
+                ]
+                [res.get() for res in async_res]
+            else:
+                self._index.upsert(
+                    vectors=vector_tuples,
+                    namespace=namespace,
+                    async_req=async_req,
+                    **kwargs,
+                )

         return ids
```
```diff
@@ -412,6 +419,7 @@ class PineconeVectorStore(VectorStore):
         upsert_kwargs: Optional[dict] = None,
         pool_threads: int = 4,
         embeddings_chunk_size: int = 1000,
+        async_req: bool = True,
         *,
         id_prefix: Optional[str] = None,
         **kwargs: Any,
```
```diff
@@ -453,6 +461,7 @@ class PineconeVectorStore(VectorStore):
             namespace=namespace,
             batch_size=batch_size,
             embedding_chunk_size=embeddings_chunk_size,
+            async_req=async_req,
             id_prefix=id_prefix,
             **(upsert_kwargs or {}),
         )
```

```diff
@@ -9,6 +9,7 @@ import pytest
 from langchain_core.documents import Document
 from langchain_openai import OpenAIEmbeddings
 from pinecone import PodSpec
+from pytest_mock import MockerFixture

 from langchain_pinecone import PineconeVectorStore
```
```diff
@@ -290,3 +291,41 @@ class TestPinecone:

         query = "What did the president say about Ketanji Brown Jackson"
         _ = docsearch.similarity_search(query, k=1, namespace=NAMESPACE_NAME)
+
+    @pytest.fixture
+    def mock_pool_not_supported(self, mocker: MockerFixture) -> None:
+        """
+        This is the error thrown when multiprocessing is not supported.
+        See https://github.com/langchain-ai/langchain/issues/11168
+        """
+        mocker.patch(
+            "multiprocessing.synchronize.SemLock.__init__",
+            side_effect=OSError(
+                "FileNotFoundError: [Errno 2] No such file or directory"
+            ),
+        )
+
+    @pytest.mark.usefixtures("mock_pool_not_supported")
+    def test_that_async_freq_uses_multiprocessing(
+        self, texts: List[str], embedding_openai: OpenAIEmbeddings
+    ) -> None:
+        with pytest.raises(OSError):
+            PineconeVectorStore.from_texts(
+                texts=texts,
+                embedding=embedding_openai,
+                index_name=INDEX_NAME,
+                namespace=NAMESPACE_NAME,
+                async_req=True,
+            )
+
+    @pytest.mark.usefixtures("mock_pool_not_supported")
+    def test_that_async_freq_false_enabled_singlethreading(
+        self, texts: List[str], embedding_openai: OpenAIEmbeddings
+    ) -> None:
+        PineconeVectorStore.from_texts(
+            texts=texts,
+            embedding=embedding_openai,
+            index_name=INDEX_NAME,
+            namespace=NAMESPACE_NAME,
+            async_req=False,
+        )
```
