pull/21638/head
Eugene Yurtsev 4 weeks ago
parent 807cd5409d
commit 66d0bb248b

@ -1108,11 +1108,6 @@ class Runnable(Generic[Input, Output], ABC):
exclude_tags=exclude_tags,
)
config = ensure_config(config)
first_event_sent = False
first_event_run_id = None
# Assign the stream handler to the config
config = ensure_config(config)
callbacks = config.get("callbacks")
@ -1142,6 +1137,10 @@ class Runnable(Generic[Input, Output], ABC):
# Start the runnable in a task, so we can start consuming output
task = asyncio.create_task(consume_astream())
first_event_sent = False
first_event_run_id = None
try:
async for event in event_streamer:
if not first_event_sent:

@ -436,13 +436,16 @@ class _AstreamEventHandler(AsyncCallbackHandler):
"metadata": metadata or {},
"name": name,
"run_type": "tool",
"inputs": {"query": query},
}
await self._send(
{
"event": "on_retriever_start",
"data": {
"query": query,
"input": {
"query": query,
}
},
"name": name,
"tags": tags or [],
@ -462,7 +465,10 @@ class _AstreamEventHandler(AsyncCallbackHandler):
{
"event": "on_retriever_end",
"data": {
"output": documents,
"output": {
"documents": documents,
},
"input": run_info["inputs"],
},
"run_id": str(run_id),
"name": run_info["name"],

@ -1490,17 +1490,3 @@ async def test_runnable_with_message_history() -> None:
AIMessage(content="world", id="ai4"),
]
}
async def test_event_stream():
"""Test event stream."""
handler = _AstreamEventHandler()
def foo(x: int):
return x
chain = RunnableLambda(foo)
implementation = _event_stream_implementation(chain, 1, stream=handler)
events = [event async for event in implementation]
assert events == []

Loading…
Cancel
Save