diff --git a/libs/langchain/langchain/schema/runnable/base.py b/libs/langchain/langchain/schema/runnable/base.py index fb000ae1e7..9f6d0bab74 100644 --- a/libs/langchain/langchain/schema/runnable/base.py +++ b/libs/langchain/langchain/schema/runnable/base.py @@ -1795,15 +1795,35 @@ class RunnableEach(Serializable, Runnable[List[Input], List[Output]]): def bind(self, **kwargs: Any) -> RunnableEach[Input, Output]: return RunnableEach(bound=self.bound.bind(**kwargs)) + def _invoke( + self, + inputs: List[Input], + run_manager: CallbackManagerForChainRun, + config: RunnableConfig, + ) -> List[Output]: + return self.bound.batch( + inputs, patch_config(config, callbacks=run_manager.get_child()) + ) + def invoke( self, input: List[Input], config: Optional[RunnableConfig] = None ) -> List[Output]: - return self.bound.batch(input, config) + return self._call_with_config(self._invoke, input, config) + + async def _ainvoke( + self, + inputs: List[Input], + run_manager: AsyncCallbackManagerForChainRun, + config: RunnableConfig, + ) -> List[Output]: + return await self.bound.abatch( + inputs, patch_config(config, callbacks=run_manager.get_child()) + ) async def ainvoke( self, input: List[Input], config: Optional[RunnableConfig] = None, **kwargs: Any ) -> List[Output]: - return await self.bound.abatch(input, config, **kwargs) + return await self._call_with_config(self._ainvoke, input, config) class RunnableBinding(Serializable, Runnable[Input, Output]):