diff --git a/libs/core/langchain_core/tracers/log_stream.py b/libs/core/langchain_core/tracers/log_stream.py index 98189ad19c..6d2b7e43a0 100644 --- a/libs/core/langchain_core/tracers/log_stream.py +++ b/libs/core/langchain_core/tracers/log_stream.py @@ -1,5 +1,6 @@ from __future__ import annotations +import copy import math import threading from collections import defaultdict @@ -82,7 +83,7 @@ class RunLogPatch: def __add__(self, other: Union[RunLogPatch, Any]) -> RunLog: if type(other) == RunLogPatch: ops = self.ops + other.ops - state = jsonpatch.apply_patch(None, ops) + state = jsonpatch.apply_patch(None, copy.deepcopy(ops)) return RunLog(*ops, state=state) raise TypeError( diff --git a/libs/core/tests/unit_tests/runnables/test_runnable.py b/libs/core/tests/unit_tests/runnables/test_runnable.py index 9d0bc740bd..bca88f7572 100644 --- a/libs/core/tests/unit_tests/runnables/test_runnable.py +++ b/libs/core/tests/unit_tests/runnables/test_runnable.py @@ -4187,3 +4187,52 @@ async def test_ainvoke_astream_passthrough_assign_trace() -> None: assert tracer.runs[0].name == "RunnableAssign" assert tracer.runs[0].child_runs[0].name == "RunnableParallel" + + +async def test_astream_log_deep_copies() -> None: + """Verify that deep copies are used when using jsonpatch in astream log. + + jsonpatch re-uses objects in its API; e.g., + + import jsonpatch + obj1 = { "a": 1 } + value = { "b": 2 } + obj2 = { "a": 1, "value": value } + + ops = list(jsonpatch.JsonPatch.from_diff(obj1, obj2)) + assert id(ops[0]['value']) == id(value) + + This can create unexpected consequences for downstream code. + """ + + def _get_run_log(run_log_patches: Sequence[RunLogPatch]) -> RunLog: + """Get run log""" + run_log = RunLog(state=None) # type: ignore + for log_patch in run_log_patches: + run_log = run_log + log_patch + return run_log + + def add_one(x: int) -> int: + """Add one.""" + return x + 1 + + chain = RunnableLambda(add_one) + chunks = [] + final_output = None + async for chunk in chain.astream_log(1): + chunks.append(chunk) + if final_output is None: + final_output = chunk + else: + final_output = final_output + chunk + + run_log = _get_run_log(chunks) + state = run_log.state.copy() + # Ignoring type here since we know that the state is a dict + # so we can delete `id` for testing purposes + state.pop("id") # type: ignore + assert state == { + "final_output": 2, + "logs": {}, + "streamed_output": [2], + }