Send evaluator logs to new session (#7206)

Also stop specifying "eval" mode since explicit project modes are
deprecated
pull/7209/head^2
William FH 1 year ago committed by GitHub
parent 0dc700eebf
commit 75aa408f10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -6,6 +6,7 @@ from uuid import UUID
from langchainplus_sdk import LangChainPlusClient, RunEvaluator
from langchain.callbacks.manager import tracing_v2_enabled
from langchain.callbacks.tracers.base import BaseTracer
from langchain.callbacks.tracers.schemas import Run
@ -27,6 +28,8 @@ class EvaluatorCallbackHandler(BaseTracer):
If not specified, a new instance will be created.
example_id : Union[UUID, str], optional
The example ID to be associated with the runs.
project_name : str, optional
The LangSmith project name to organize eval chain runs under.
Attributes
----------
@ -40,6 +43,8 @@ class EvaluatorCallbackHandler(BaseTracer):
The thread pool executor used for running the evaluators.
futures : Set[Future]
The set of futures representing the running evaluators.
project_name : Optional[str]
The LangSmith project name to organize eval chain runs under.
"""
name = "evaluator_callback_handler"
@ -50,6 +55,7 @@ class EvaluatorCallbackHandler(BaseTracer):
max_workers: Optional[int] = None,
client: Optional[LangChainPlusClient] = None,
example_id: Optional[Union[UUID, str]] = None,
project_name: Optional[str] = None,
**kwargs: Any,
) -> None:
super().__init__(**kwargs)
@ -62,10 +68,24 @@ class EvaluatorCallbackHandler(BaseTracer):
max_workers=max(max_workers or len(evaluators), 1)
)
self.futures: Set[Future] = set()
self.project_name = project_name
def _evaluate_run(self, run: Run, evaluator: RunEvaluator) -> None:
def _evaluate_in_project(self, run: Run, evaluator: RunEvaluator) -> None:
"""Evaluate the run in the project.
Parameters
----------
run : Run
The run to be evaluated.
evaluator : RunEvaluator
The evaluator to use for evaluating the run.
"""
try:
self.client.evaluate_run(run, evaluator)
if self.project_name is None:
self.client.evaluate_run(run, evaluator)
with tracing_v2_enabled(project_name=self.project_name):
self.client.evaluate_run(run, evaluator)
except Exception as e:
logger.error(
f"Error evaluating run {run.id} with "
@ -86,7 +106,9 @@ class EvaluatorCallbackHandler(BaseTracer):
run_ = run.copy()
run_.reference_example_id = self.example_id
for evaluator in self.evaluators:
self.futures.add(self.executor.submit(self._evaluate_run, run_, evaluator))
self.futures.add(
self.executor.submit(self._evaluate_in_project, run_, evaluator)
)
def wait_for_futures(self) -> None:
"""Wait for all futures to complete."""

@ -313,28 +313,35 @@ async def _callbacks_initializer(
project_name: Optional[str],
client: LangChainPlusClient,
run_evaluators: Sequence[RunEvaluator],
evaluation_handler_collector: List[EvaluatorCallbackHandler],
) -> List[BaseTracer]:
"""
Initialize a tracer to share across tasks.
Args:
project_name: The project name for the tracer.
client: The client to use for the tracer.
run_evaluators: The evaluators to run.
evaluation_handler_collector: A list to collect the evaluators.
Used to wait for the evaluators to finish.
Returns:
A LangChainTracer instance with an active project.
The callbacks for this thread.
"""
callbacks: List[BaseTracer] = []
if project_name:
callbacks.append(LangChainTracer(project_name=project_name))
evaluator_project_name = f"{project_name}-evaluators" if project_name else None
if run_evaluators:
callbacks.append(
EvaluatorCallbackHandler(
client=client,
evaluators=run_evaluators,
# We already have concurrency, don't want to overload the machine
max_workers=1,
)
callback = EvaluatorCallbackHandler(
client=client,
evaluators=run_evaluators,
# We already have concurrency, don't want to overload the machine
max_workers=1,
project_name=evaluator_project_name,
)
callbacks.append(callback)
evaluation_handler_collector.append(callback)
return callbacks
@ -382,12 +389,9 @@ async def arun_on_examples(
"""
project_name = _get_project_name(project_name, llm_or_chain_factory, None)
client_ = client or LangChainPlusClient()
client_.create_project(project_name, mode="eval")
client_.create_project(project_name)
results: Dict[str, List[Any]] = {}
evaluation_handler = EvaluatorCallbackHandler(
evaluators=run_evaluators or [], client=client_
)
async def process_example(
example: Example, callbacks: List[BaseCallbackHandler], job_state: dict
@ -410,17 +414,20 @@ async def arun_on_examples(
flush=True,
)
evaluation_handlers: List[EvaluatorCallbackHandler] = []
await _gather_with_concurrency(
concurrency_level,
functools.partial(
_callbacks_initializer,
project_name=project_name,
client=client_,
evaluation_handler_collector=evaluation_handlers,
run_evaluators=run_evaluators or [],
),
*(functools.partial(process_example, e) for e in examples),
)
evaluation_handler.wait_for_futures()
for handler in evaluation_handlers:
handler.wait_for_futures()
return results
@ -581,10 +588,13 @@ def run_on_examples(
results: Dict[str, Any] = {}
project_name = _get_project_name(project_name, llm_or_chain_factory, None)
client_ = client or LangChainPlusClient()
client_.create_project(project_name, mode="eval")
client_.create_project(project_name)
tracer = LangChainTracer(project_name=project_name)
evaluator_project_name = f"{project_name}-evaluators"
evalution_handler = EvaluatorCallbackHandler(
evaluators=run_evaluators or [], client=client_
evaluators=run_evaluators or [],
client=client_,
project_name=evaluator_project_name,
)
callbacks: List[BaseCallbackHandler] = [tracer, evalution_handler]
for i, example in enumerate(examples):

Loading…
Cancel
Save