|
|
|
@ -1,4 +1,5 @@
|
|
|
|
|
"""A tracer that runs evaluators over completed runs."""
|
|
|
|
|
import logging
|
|
|
|
|
from concurrent.futures import Future, ThreadPoolExecutor, wait
|
|
|
|
|
from typing import Any, Optional, Sequence, Set, Union
|
|
|
|
|
from uuid import UUID
|
|
|
|
@ -8,6 +9,8 @@ from langchainplus_sdk import LangChainPlusClient, RunEvaluator
|
|
|
|
|
from langchain.callbacks.tracers.base import BaseTracer
|
|
|
|
|
from langchain.callbacks.tracers.schemas import Run
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class EvaluatorCallbackHandler(BaseTracer):
|
|
|
|
|
"""A tracer that runs a run evaluator whenever a run is persisted.
|
|
|
|
@ -47,7 +50,7 @@ class EvaluatorCallbackHandler(BaseTracer):
|
|
|
|
|
max_workers: Optional[int] = None,
|
|
|
|
|
client: Optional[LangChainPlusClient] = None,
|
|
|
|
|
example_id: Optional[Union[UUID, str]] = None,
|
|
|
|
|
**kwargs: Any
|
|
|
|
|
**kwargs: Any,
|
|
|
|
|
) -> None:
|
|
|
|
|
super().__init__(**kwargs)
|
|
|
|
|
self.example_id = (
|
|
|
|
@ -60,6 +63,17 @@ class EvaluatorCallbackHandler(BaseTracer):
|
|
|
|
|
)
|
|
|
|
|
self.futures: Set[Future] = set()
|
|
|
|
|
|
|
|
|
|
def _evaluate_run(self, run: Run, evaluator: RunEvaluator) -> None:
|
|
|
|
|
try:
|
|
|
|
|
self.client.evaluate_run(run, evaluator)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(
|
|
|
|
|
f"Error evaluating run {run.id} with "
|
|
|
|
|
f"{evaluator.__class__.__name__}: {e}",
|
|
|
|
|
exc_info=True,
|
|
|
|
|
)
|
|
|
|
|
raise e
|
|
|
|
|
|
|
|
|
|
def _persist_run(self, run: Run) -> None:
|
|
|
|
|
"""Run the evaluator on the run.
|
|
|
|
|
|
|
|
|
@ -72,9 +86,7 @@ class EvaluatorCallbackHandler(BaseTracer):
|
|
|
|
|
run_ = run.copy()
|
|
|
|
|
run_.reference_example_id = self.example_id
|
|
|
|
|
for evaluator in self.evaluators:
|
|
|
|
|
self.futures.add(
|
|
|
|
|
self.executor.submit(self.client.evaluate_run, run_, evaluator)
|
|
|
|
|
)
|
|
|
|
|
self.futures.add(self.executor.submit(self._evaluate_run, run_, evaluator))
|
|
|
|
|
|
|
|
|
|
def wait_for_futures(self) -> None:
|
|
|
|
|
"""Wait for all futures to complete."""
|
|
|
|
|