From e17edc4d0b1dccc98df2a7d1db86bfcc0eb66ca5 Mon Sep 17 00:00:00 2001 From: Nicolas Bondoux Date: Tue, 28 Nov 2023 12:18:26 +0100 Subject: [PATCH] RunnableLambda: create afunc instance from func when not provided (#13408) Fixes #13407. This workaround consists in letting the RunnableLambda create its self.afunc from its self.func when self.afunc is not provided; the change has no dependency. --------- Co-authored-by: Erick Friis Co-authored-by: Nuno Campos --- libs/core/langchain_core/runnables/base.py | 31 ++++++++++++------- .../unit_tests/runnables/test_runnable.py | 19 ++++++++++++ 2 files changed, 38 insertions(+), 12 deletions(-) diff --git a/libs/core/langchain_core/runnables/base.py b/libs/core/langchain_core/runnables/base.py index 68778af3c1..3796d4cc7b 100644 --- a/libs/core/langchain_core/runnables/base.py +++ b/libs/core/langchain_core/runnables/base.py @@ -2477,8 +2477,19 @@ class RunnableLambda(Runnable[Input, Output]): config: RunnableConfig, **kwargs: Any, ) -> Output: + if hasattr(self, "afunc"): + afunc = self.afunc + else: + + async def f(*args, **kwargs): # type: ignore[no-untyped-def] + return await asyncio.get_running_loop().run_in_executor( + None, partial(self.func, **kwargs), *args + ) + + afunc = f + output = await acall_func_with_variable_args( - self.afunc, input, config, run_manager, **kwargs + afunc, input, config, run_manager, **kwargs ) # If the output is a runnable, invoke it if isinstance(output, Runnable): @@ -2539,17 +2550,13 @@ class RunnableLambda(Runnable[Input, Output]): **kwargs: Optional[Any], ) -> Output: """Invoke this runnable asynchronously.""" - if hasattr(self, "afunc"): - return await self._acall_with_config( - self._ainvoke, - input, - self._config(config, self.afunc), - **kwargs, - ) - else: - # Delegating to super implementation of ainvoke. - # Uses asyncio executor to run the sync version (invoke) - return await super().ainvoke(input, config) + the_func = self.afunc if hasattr(self, "afunc") else self.func + return await self._acall_with_config( + self._ainvoke, + input, + self._config(config, the_func), + **kwargs, + ) class RunnableEachBase(RunnableSerializable[List[Input], List[Output]]): diff --git a/libs/core/tests/unit_tests/runnables/test_runnable.py b/libs/core/tests/unit_tests/runnables/test_runnable.py index 89522ef3f4..0ee3915966 100644 --- a/libs/core/tests/unit_tests/runnables/test_runnable.py +++ b/libs/core/tests/unit_tests/runnables/test_runnable.py @@ -4061,3 +4061,22 @@ def test_with_config_callbacks() -> None: # ConfigError: field "callbacks" not yet prepared so type is still a ForwardRef, # you might need to call RunnableConfig.update_forward_refs(). assert isinstance(result, RunnableBinding) + + +@pytest.mark.asyncio +async def test_ainvoke_on_returned_runnable() -> None: + """Verify that a runnable returned by a sync runnable in the async path will + be runthroughaasync path (issue #13407)""" + + def idchain_sync(__input: dict) -> bool: + return False + + async def idchain_async(__input: dict) -> bool: + return True + + idchain = RunnableLambda(func=idchain_sync, afunc=idchain_async) + + def func(__input: dict) -> Runnable: + return idchain + + assert await RunnableLambda(func).ainvoke({})