@ -64,6 +64,7 @@ from langchain_core.runnables import (
RunnableLambda ,
RunnableParallel ,
RunnablePassthrough ,
RunnablePick ,
RunnableSequence ,
RunnableWithFallbacks ,
add ,
@ -510,7 +511,7 @@ def test_schemas(snapshot: SnapshotAssertion) -> None:
} ,
" items " : { " $ref " : " #/definitions/PromptInput " } ,
" type " : " array " ,
" title " : " RunnableEach Input" ,
" title " : " RunnableEach <PromptTemplate> Input" ,
}
assert prompt_mapper . output_schema . schema ( ) == snapshot
@ -571,7 +572,7 @@ def test_schemas(snapshot: SnapshotAssertion) -> None:
" properties " : { " name " : { " title " : " Name " , " type " : " string " } } ,
}
assert seq_w_map . output_schema . schema ( ) == {
" title " : " RunnableParallel Output" ,
" title " : " RunnableParallel <original,as_list,length> Output" ,
" type " : " object " ,
" properties " : {
" original " : { " title " : " Original " , " type " : " string " } ,
@ -615,7 +616,7 @@ def test_passthrough_assign_schema() -> None:
# expected dict input_schema
assert invalid_seq_w_assign . input_schema . schema ( ) == {
" properties " : { " question " : { " title " : " Question " } } ,
" title " : " RunnableParallel Input" ,
" title " : " RunnableParallel <context> Input" ,
" type " : " object " ,
}
@ -774,7 +775,7 @@ def test_schema_complex_seq() -> None:
)
assert chain2 . input_schema . schema ( ) == {
" title " : " RunnableParallel Input" ,
" title " : " RunnableParallel <city,language> Input" ,
" type " : " object " ,
" properties " : {
" person " : { " title " : " Person " , " type " : " string " } ,
@ -2160,8 +2161,8 @@ async def test_stream_log_retriever() -> None:
" FakeListLLM:2 " ,
" Retriever " ,
" RunnableLambda " ,
" RunnableParallel " ,
" RunnableParallel :2 " ,
" RunnableParallel <documents,question> " ,
" RunnableParallel <one,two> " ,
]
@ -2444,7 +2445,7 @@ What is your name?"""
parent_run = next ( r for r in tracer . runs if r . parent_run_id is None )
assert len ( parent_run . child_runs ) == 4
map_run = parent_run . child_runs [ 0 ]
assert map_run . name == " RunnableParallel "
assert map_run . name == " RunnableParallel <question,documents,just_to_test_lambda> "
assert len ( map_run . child_runs ) == 3
@ -2505,7 +2506,7 @@ def test_seq_prompt_dict(mocker: MockerFixture, snapshot: SnapshotAssertion) ->
parent_run = next ( r for r in tracer . runs if r . parent_run_id is None )
assert len ( parent_run . child_runs ) == 3
map_run = parent_run . child_runs [ 2 ]
assert map_run . name == " RunnableParallel "
assert map_run . name == " RunnableParallel <chat,llm> "
assert len ( map_run . child_runs ) == 2
@ -2721,7 +2722,7 @@ def test_seq_prompt_map(mocker: MockerFixture, snapshot: SnapshotAssertion) -> N
parent_run = next ( r for r in tracer . runs if r . parent_run_id is None )
assert len ( parent_run . child_runs ) == 3
map_run = parent_run . child_runs [ 2 ]
assert map_run . name == " RunnableParallel "
assert map_run . name == " RunnableParallel <chat,llm,passthrough> "
assert len ( map_run . child_runs ) == 3
@ -2770,7 +2771,7 @@ def test_map_stream() -> None:
{ " question " : " What is your name? " }
)
chain_pick_one = chain | RunnablePassthrough . pick ( " llm " )
chain_pick_one = chain . pick ( " llm " )
assert chain_pick_one . output_schema . schema ( ) == {
" title " : " RunnableSequenceOutput " ,
@ -2791,10 +2792,8 @@ def test_map_stream() -> None:
assert streamed_chunks [ 0 ] == " i "
assert len ( streamed_chunks ) == len ( llm_res )
chain_pick_two = (
chain
| RunnablePassthrough . assign ( hello = RunnablePassthrough . pick ( " llm " ) | llm )
| RunnablePassthrough . pick ( [ " llm " , " hello " ] )
chain_pick_two = chain . assign ( hello = RunnablePick ( " llm " ) . pipe ( llm ) ) . pick (
[ " llm " , " hello " ]
)
assert chain_pick_two . output_schema . schema ( ) == {
@ -2940,12 +2939,15 @@ async def test_map_astream() -> None:
assert final_state . state [ " logs " ] [ " ChatPromptTemplate " ] [
" final_output "
] == prompt . invoke ( { " question " : " What is your name? " } )
assert final_state . state [ " logs " ] [ " RunnableParallel " ] [ " name " ] == " RunnableParallel "
assert (
final_state . state [ " logs " ] [ " RunnableParallel<chat,llm,passthrough> " ] [ " name " ]
== " RunnableParallel<chat,llm,passthrough> "
)
assert sorted ( final_state . state [ " logs " ] ) == [
" ChatPromptTemplate " ,
" FakeListChatModel " ,
" FakeStreamingListLLM " ,
" RunnableParallel " ,
" RunnableParallel <chat,llm,passthrough> " ,
" RunnablePassthrough " ,
]
@ -2985,11 +2987,14 @@ async def test_map_astream() -> None:
assert final_state . state [ " logs " ] [ " ChatPromptTemplate " ] [ " final_output " ] == (
prompt . invoke ( { " question " : " What is your name? " } )
)
assert final_state . state [ " logs " ] [ " RunnableParallel " ] [ " name " ] == " RunnableParallel "
assert (
final_state . state [ " logs " ] [ " RunnableParallel<chat,llm,passthrough> " ] [ " name " ]
== " RunnableParallel<chat,llm,passthrough> "
)
assert sorted ( final_state . state [ " logs " ] ) == [
" ChatPromptTemplate " ,
" FakeStreamingListLLM " ,
" RunnableParallel " ,
" RunnableParallel <chat,llm,passthrough> " ,
" RunnablePassthrough " ,
]
@ -3130,9 +3135,7 @@ def test_deep_stream_assign() -> None:
assert len ( chunks ) == len ( " foo-lish " )
assert add ( chunks ) == { " str " : " foo-lish " }
chain_with_assign = chain | RunnablePassthrough . assign (
hello = itemgetter ( " str " ) | llm
)
chain_with_assign = chain . assign ( hello = itemgetter ( " str " ) | llm )
assert chain_with_assign . input_schema . schema ( ) == {
" title " : " PromptInput " ,
@ -3179,7 +3182,7 @@ def test_deep_stream_assign() -> None:
" hello " : " foo-lish " ,
}
chain_with_assign_shadow = chain | RunnablePassthrough . assign (
chain_with_assign_shadow = chain . assign (
str = lambda _ : " shadow " ,
hello = itemgetter ( " str " ) | llm ,
)
@ -3254,7 +3257,7 @@ async def test_deep_astream_assign() -> None:
assert len ( chunks ) == len ( " foo-lish " )
assert add ( chunks ) == { " str " : " foo-lish " }
chain_with_assign = chain | RunnablePassthrough . assign (
chain_with_assign = chain . assign (
hello = itemgetter ( " str " ) | llm ,
)
@ -4473,15 +4476,15 @@ def test_invoke_stream_passthrough_assign_trace() -> None:
tracer = FakeTracer ( )
chain . invoke ( { " example " : [ 1 , 2 , 3 ] } , dict ( callbacks = [ tracer ] ) )
assert tracer . runs [ 0 ] . name == " RunnableAssign "
assert tracer . runs [ 0 ] . child_runs [ 0 ] . name == " RunnableParallel "
assert tracer . runs [ 0 ] . name == " RunnableAssign <urls> "
assert tracer . runs [ 0 ] . child_runs [ 0 ] . name == " RunnableParallel <urls> "
tracer = FakeTracer ( )
for item in chain . stream ( { " example " : [ 1 , 2 , 3 ] } , dict ( callbacks = [ tracer ] ) ) :
pass
assert tracer . runs [ 0 ] . name == " RunnableAssign "
assert tracer . runs [ 0 ] . child_runs [ 0 ] . name == " RunnableParallel "
assert tracer . runs [ 0 ] . name == " RunnableAssign <urls> "
assert tracer . runs [ 0 ] . child_runs [ 0 ] . name == " RunnableParallel <urls> "
async def test_ainvoke_astream_passthrough_assign_trace ( ) - > None :
@ -4493,15 +4496,15 @@ async def test_ainvoke_astream_passthrough_assign_trace() -> None:
tracer = FakeTracer ( )
await chain . ainvoke ( { " example " : [ 1 , 2 , 3 ] } , dict ( callbacks = [ tracer ] ) )
assert tracer . runs [ 0 ] . name == " RunnableAssign "
assert tracer . runs [ 0 ] . child_runs [ 0 ] . name == " RunnableParallel "
assert tracer . runs [ 0 ] . name == " RunnableAssign <urls> "
assert tracer . runs [ 0 ] . child_runs [ 0 ] . name == " RunnableParallel <urls> "
tracer = FakeTracer ( )
async for item in chain . astream ( { " example " : [ 1 , 2 , 3 ] } , dict ( callbacks = [ tracer ] ) ) :
pass
assert tracer . runs [ 0 ] . name == " RunnableAssign "
assert tracer . runs [ 0 ] . child_runs [ 0 ] . name == " RunnableParallel "
assert tracer . runs [ 0 ] . name == " RunnableAssign <urls> "
assert tracer . runs [ 0 ] . child_runs [ 0 ] . name == " RunnableParallel <urls> "
async def test_astream_log_deep_copies ( ) - > None :