Use deepcopy in RunLogPatch (#14244)

This PR adds deepcopy usage in RunLogPatch.

I included a unit-test that shows an issue that was caused in LangServe
in the RemoteClient.

```python
import jsonpatch

s1 = {}
s2 = {'value': []}
s3 = {'value': ['a']}

ops0 = list(jsonpatch.JsonPatch.from_diff(None, s1))
ops1 = list(jsonpatch.JsonPatch.from_diff(s1, s2))
ops2 = list(jsonpatch.JsonPatch.from_diff(s2, s3))
ops = ops0 + ops1 + ops2

jsonpatch.apply_patch(None, ops)
{'value': ['a']}

jsonpatch.apply_patch(None, ops)
{'value': ['a', 'a']}

jsonpatch.apply_patch(None, ops)
{'value': ['a', 'a', 'a']}
```
pull/14451/head^2
Eugene Yurtsev 10 months ago committed by GitHub
parent 1d7e5c51aa
commit 37bee92b8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,5 +1,6 @@
from __future__ import annotations
import copy
import math
import threading
from collections import defaultdict
@ -82,7 +83,7 @@ class RunLogPatch:
def __add__(self, other: Union[RunLogPatch, Any]) -> RunLog:
if type(other) == RunLogPatch:
ops = self.ops + other.ops
state = jsonpatch.apply_patch(None, ops)
state = jsonpatch.apply_patch(None, copy.deepcopy(ops))
return RunLog(*ops, state=state)
raise TypeError(

@ -4187,3 +4187,52 @@ async def test_ainvoke_astream_passthrough_assign_trace() -> None:
assert tracer.runs[0].name == "RunnableAssign"
assert tracer.runs[0].child_runs[0].name == "RunnableParallel"
async def test_astream_log_deep_copies() -> None:
"""Verify that deep copies are used when using jsonpatch in astream log.
jsonpatch re-uses objects in its API; e.g.,
import jsonpatch
obj1 = { "a": 1 }
value = { "b": 2 }
obj2 = { "a": 1, "value": value }
ops = list(jsonpatch.JsonPatch.from_diff(obj1, obj2))
assert id(ops[0]['value']) == id(value)
This can create unexpected consequences for downstream code.
"""
def _get_run_log(run_log_patches: Sequence[RunLogPatch]) -> RunLog:
"""Get run log"""
run_log = RunLog(state=None) # type: ignore
for log_patch in run_log_patches:
run_log = run_log + log_patch
return run_log
def add_one(x: int) -> int:
"""Add one."""
return x + 1
chain = RunnableLambda(add_one)
chunks = []
final_output = None
async for chunk in chain.astream_log(1):
chunks.append(chunk)
if final_output is None:
final_output = chunk
else:
final_output = final_output + chunk
run_log = _get_run_log(chunks)
state = run_log.state.copy()
# Ignoring type here since we know that the state is a dict
# so we can delete `id` for testing purposes
state.pop("id") # type: ignore
assert state == {
"final_output": 2,
"logs": {},
"streamed_output": [2],
}

Loading…
Cancel
Save