RunnableLambda: create afunc instance from func when not provided (#13408)

Fixes #13407.

This workaround consists in letting the RunnableLambda create its
self.afunc from its self.func when self.afunc is not provided; the
change has no dependency.

<!-- Thank you for contributing to LangChain!

Replace this entire comment with:
  - **Description:** a description of the change, 
  - **Issue:** the issue # it fixes (if applicable),
  - **Dependencies:** any dependencies required for this change,
- **Tag maintainer:** for a quicker response, tag the relevant
maintainer (see below),
- **Twitter handle:** we announce bigger features on Twitter. If your PR
gets announced, and you'd like a mention, we'll gladly shout you out!

Please make sure your PR is passing linting and testing before
submitting. Run `make format`, `make lint` and `make test` to check this
locally.

See contribution guidelines for more information on how to write/run
tests, lint, etc:

https://github.com/langchain-ai/langchain/blob/master/.github/CONTRIBUTING.md

If you're adding a new integration, please include:
1. a test for the integration, preferably unit tests that do not rely on
network access,
2. an example notebook showing its use. It lives in `docs/extras`
directory.

If no one reviews your PR within a few days, please @-mention one of
@baskaryan, @eyurtsev, @hwchase17.
 -->

---------

Co-authored-by: Erick Friis <erick@langchain.dev>
Co-authored-by: Nuno Campos <nuno@langchain.dev>
pull/13973/head
Nicolas Bondoux 7 months ago committed by GitHub
parent 391f200eaa
commit e17edc4d0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -2477,8 +2477,19 @@ class RunnableLambda(Runnable[Input, Output]):
config: RunnableConfig,
**kwargs: Any,
) -> Output:
if hasattr(self, "afunc"):
afunc = self.afunc
else:
async def f(*args, **kwargs): # type: ignore[no-untyped-def]
return await asyncio.get_running_loop().run_in_executor(
None, partial(self.func, **kwargs), *args
)
afunc = f
output = await acall_func_with_variable_args(
self.afunc, input, config, run_manager, **kwargs
afunc, input, config, run_manager, **kwargs
)
# If the output is a runnable, invoke it
if isinstance(output, Runnable):
@ -2539,17 +2550,13 @@ class RunnableLambda(Runnable[Input, Output]):
**kwargs: Optional[Any],
) -> Output:
"""Invoke this runnable asynchronously."""
if hasattr(self, "afunc"):
return await self._acall_with_config(
self._ainvoke,
input,
self._config(config, self.afunc),
**kwargs,
)
else:
# Delegating to super implementation of ainvoke.
# Uses asyncio executor to run the sync version (invoke)
return await super().ainvoke(input, config)
the_func = self.afunc if hasattr(self, "afunc") else self.func
return await self._acall_with_config(
self._ainvoke,
input,
self._config(config, the_func),
**kwargs,
)
class RunnableEachBase(RunnableSerializable[List[Input], List[Output]]):

@ -4061,3 +4061,22 @@ def test_with_config_callbacks() -> None:
# ConfigError: field "callbacks" not yet prepared so type is still a ForwardRef,
# you might need to call RunnableConfig.update_forward_refs().
assert isinstance(result, RunnableBinding)
@pytest.mark.asyncio
async def test_ainvoke_on_returned_runnable() -> None:
"""Verify that a runnable returned by a sync runnable in the async path will
be runthroughaasync path (issue #13407)"""
def idchain_sync(__input: dict) -> bool:
return False
async def idchain_async(__input: dict) -> bool:
return True
idchain = RunnableLambda(func=idchain_sync, afunc=idchain_async)
def func(__input: dict) -> Runnable:
return idchain
assert await RunnableLambda(func).ainvoke({})

Loading…
Cancel
Save