diff --git a/libs/langchain/langchain/utils/aiter.py b/libs/langchain/langchain/utils/aiter.py index a71650cdb4..ca44dee395 100644 --- a/libs/langchain/langchain/utils/aiter.py +++ b/libs/langchain/langchain/utils/aiter.py @@ -107,14 +107,15 @@ async def tee_peer( peer_buffer.append(item) yield buffer.popleft() finally: - # this peer is done – remove its buffer - for idx, peer_buffer in enumerate(peers): # pragma: no branch - if peer_buffer is buffer: - peers.pop(idx) - break - # if we are the last peer, try and close the iterator - if not peers and hasattr(iterator, "aclose"): - await iterator.aclose() + async with lock: + # this peer is done – remove its buffer + for idx, peer_buffer in enumerate(peers): # pragma: no branch + if peer_buffer is buffer: + peers.pop(idx) + break + # if we are the last peer, try and close the iterator + if not peers and hasattr(iterator, "aclose"): + await iterator.aclose() class Tee(Generic[T]): diff --git a/libs/langchain/langchain/utils/iter.py b/libs/langchain/langchain/utils/iter.py index 498ccfbf73..1b95f180ea 100644 --- a/libs/langchain/langchain/utils/iter.py +++ b/libs/langchain/langchain/utils/iter.py @@ -60,14 +60,15 @@ def tee_peer( peer_buffer.append(item) yield buffer.popleft() finally: - # this peer is done – remove its buffer - for idx, peer_buffer in enumerate(peers): # pragma: no branch - if peer_buffer is buffer: - peers.pop(idx) - break - # if we are the last peer, try and close the iterator - if not peers and hasattr(iterator, "close"): - iterator.close() + with lock: + # this peer is done – remove its buffer + for idx, peer_buffer in enumerate(peers): # pragma: no branch + if peer_buffer is buffer: + peers.pop(idx) + break + # if we are the last peer, try and close the iterator + if not peers and hasattr(iterator, "close"): + iterator.close() class Tee(Generic[T]):