diff --git a/langchain/callbacks/tracers/evaluation.py b/langchain/callbacks/tracers/evaluation.py index 52f83b85f9..f6ec4fe68f 100644 --- a/langchain/callbacks/tracers/evaluation.py +++ b/langchain/callbacks/tracers/evaluation.py @@ -6,6 +6,7 @@ from uuid import UUID from langchainplus_sdk import LangChainPlusClient, RunEvaluator +from langchain.callbacks.manager import tracing_v2_enabled from langchain.callbacks.tracers.base import BaseTracer from langchain.callbacks.tracers.schemas import Run @@ -27,6 +28,8 @@ class EvaluatorCallbackHandler(BaseTracer): If not specified, a new instance will be created. example_id : Union[UUID, str], optional The example ID to be associated with the runs. + project_name : str, optional + The LangSmith project name to be organize eval chain runs under. Attributes ---------- @@ -40,6 +43,8 @@ class EvaluatorCallbackHandler(BaseTracer): The thread pool executor used for running the evaluators. futures : Set[Future] The set of futures representing the running evaluators. + project_name : Optional[str] + The LangSmith project name to be organize eval chain runs under. """ name = "evaluator_callback_handler" @@ -50,6 +55,7 @@ class EvaluatorCallbackHandler(BaseTracer): max_workers: Optional[int] = None, client: Optional[LangChainPlusClient] = None, example_id: Optional[Union[UUID, str]] = None, + project_name: Optional[str] = None, **kwargs: Any, ) -> None: super().__init__(**kwargs) @@ -62,10 +68,24 @@ class EvaluatorCallbackHandler(BaseTracer): max_workers=max(max_workers or len(evaluators), 1) ) self.futures: Set[Future] = set() + self.project_name = project_name - def _evaluate_run(self, run: Run, evaluator: RunEvaluator) -> None: + def _evaluate_in_project(self, run: Run, evaluator: RunEvaluator) -> None: + """Evaluate the run in the project. + + Parameters + ---------- + run : Run + The run to be evaluated. + evaluator : RunEvaluator + The evaluator to use for evaluating the run. + + """ try: - self.client.evaluate_run(run, evaluator) + if self.project_name is None: + self.client.evaluate_run(run, evaluator) + with tracing_v2_enabled(project_name=self.project_name): + self.client.evaluate_run(run, evaluator) except Exception as e: logger.error( f"Error evaluating run {run.id} with " @@ -86,7 +106,9 @@ class EvaluatorCallbackHandler(BaseTracer): run_ = run.copy() run_.reference_example_id = self.example_id for evaluator in self.evaluators: - self.futures.add(self.executor.submit(self._evaluate_run, run_, evaluator)) + self.futures.add( + self.executor.submit(self._evaluate_in_project, run_, evaluator) + ) def wait_for_futures(self) -> None: """Wait for all futures to complete.""" diff --git a/langchain/client/runner_utils.py b/langchain/client/runner_utils.py index 73aa3c2b09..7c07a93745 100644 --- a/langchain/client/runner_utils.py +++ b/langchain/client/runner_utils.py @@ -313,28 +313,35 @@ async def _callbacks_initializer( project_name: Optional[str], client: LangChainPlusClient, run_evaluators: Sequence[RunEvaluator], + evaluation_handler_collector: List[EvaluatorCallbackHandler], ) -> List[BaseTracer]: """ Initialize a tracer to share across tasks. Args: project_name: The project name for the tracer. + client: The client to use for the tracer. + run_evaluators: The evaluators to run. + evaluation_handler_collector: A list to collect the evaluators. + Used to wait for the evaluators to finish. Returns: - A LangChainTracer instance with an active project. + The callbacks for this thread. """ callbacks: List[BaseTracer] = [] if project_name: callbacks.append(LangChainTracer(project_name=project_name)) + evaluator_project_name = f"{project_name}-evaluators" if project_name else None if run_evaluators: - callbacks.append( - EvaluatorCallbackHandler( - client=client, - evaluators=run_evaluators, - # We already have concurrency, don't want to overload the machine - max_workers=1, - ) + callback = EvaluatorCallbackHandler( + client=client, + evaluators=run_evaluators, + # We already have concurrency, don't want to overload the machine + max_workers=1, + project_name=evaluator_project_name, ) + callbacks.append(callback) + evaluation_handler_collector.append(callback) return callbacks @@ -382,12 +389,9 @@ async def arun_on_examples( """ project_name = _get_project_name(project_name, llm_or_chain_factory, None) client_ = client or LangChainPlusClient() - client_.create_project(project_name, mode="eval") + client_.create_project(project_name) results: Dict[str, List[Any]] = {} - evaluation_handler = EvaluatorCallbackHandler( - evaluators=run_evaluators or [], client=client_ - ) async def process_example( example: Example, callbacks: List[BaseCallbackHandler], job_state: dict @@ -410,17 +414,20 @@ async def arun_on_examples( flush=True, ) + evaluation_handlers: List[EvaluatorCallbackHandler] = [] await _gather_with_concurrency( concurrency_level, functools.partial( _callbacks_initializer, project_name=project_name, client=client_, + evaluation_handler_collector=evaluation_handlers, run_evaluators=run_evaluators or [], ), *(functools.partial(process_example, e) for e in examples), ) - evaluation_handler.wait_for_futures() + for handler in evaluation_handlers: + handler.wait_for_futures() return results @@ -581,10 +588,13 @@ def run_on_examples( results: Dict[str, Any] = {} project_name = _get_project_name(project_name, llm_or_chain_factory, None) client_ = client or LangChainPlusClient() - client_.create_project(project_name, mode="eval") + client_.create_project(project_name) tracer = LangChainTracer(project_name=project_name) + evaluator_project_name = f"{project_name}-evaluators" evalution_handler = EvaluatorCallbackHandler( - evaluators=run_evaluators or [], client=client_ + evaluators=run_evaluators or [], + client=client_, + project_name=evaluator_project_name, ) callbacks: List[BaseCallbackHandler] = [tracer, evalution_handler] for i, example in enumerate(examples):