Source code for langchain_core.tracers.evaluation

"""A tracer that runs evaluators over completed runs."""

from __future__ import annotations

import logging
import threading
import weakref
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union, cast
from uuid import UUID

import langsmith
from langsmith.evaluation.evaluator import EvaluationResult, EvaluationResults

from langchain_core.tracers import langchain as langchain_tracer
from langchain_core.tracers.base import BaseTracer
from langchain_core.tracers.context import tracing_v2_enabled
from langchain_core.tracers.langchain import _get_executor
from langchain_core.tracers.schemas import Run

# Module-level logger; used below to report skipped runs and evaluator failures.
logger = logging.getLogger(__name__)

# Registry of EvaluatorCallbackHandler instances. Each handler adds itself in
# __init__, and wait_for_all_evaluators() iterates over it. A WeakSet is used so
# that membership in this registry does not, by itself, keep a handler alive.
_TRACERS: weakref.WeakSet[EvaluatorCallbackHandler] = weakref.WeakSet()


def wait_for_all_evaluators() -> None:
    """Block until every registered evaluator tracer has finished its work.

    Each handler currently present in the module-level ``_TRACERS`` registry
    is asked, in turn, to wait for its outstanding futures.
    """
    # Take a snapshot first: the registry is a WeakSet, so its contents can
    # change while we are blocked waiting on an individual handler.
    live_handlers = tuple(_TRACERS)
    for handler in live_handlers:
        handler.wait_for_futures()
class EvaluatorCallbackHandler(BaseTracer):
    """Tracer that runs a run evaluator whenever a run is persisted.

    Parameters
    ----------
    evaluators : Sequence[RunEvaluator]
        The run evaluators to apply to all top level runs.
    client : LangSmith Client, optional
        The LangSmith client instance to use for evaluating the runs.
        If not specified, a new instance will be created.
    example_id : Union[UUID, str], optional
        The example ID to be associated with the runs.
    skip_unfinished : bool, optional
        Whether to skip runs that have no outputs (i.e. runs that are not
        finished or raised an error). Defaults to True.
    project_name : str, optional
        The LangSmith project name to organize eval chain runs under.
        Defaults to "evaluators".
    max_concurrency : int, optional
        Controls how evaluators are scheduled. ``None`` (the default) uses
        the executor returned by ``_get_executor()``; a positive value creates
        a dedicated ``ThreadPoolExecutor`` with that many workers; any other
        value disables the executor so evaluators run synchronously in the
        calling thread.

    Attributes
    ----------
    example_id : Union[UUID, None]
        The example ID associated with the runs.
    client : Client
        The LangSmith client instance used for evaluating the runs.
    evaluators : Sequence[RunEvaluator]
        The sequence of run evaluators to be executed.
    executor : Optional[ThreadPoolExecutor]
        The thread pool executor used for running the evaluators, or None
        when evaluators are run synchronously.
    futures : weakref.WeakSet[Future]
        The futures representing the submitted evaluator jobs.
    skip_unfinished : bool
        Whether to skip runs that are not finished or raised an error.
    project_name : Optional[str]
        The LangSmith project name to organize eval chain runs under.
    logged_eval_results : Dict[Tuple[str, str], List[EvaluationResult]]
        Evaluation results collected so far, keyed by
        ``(run_id, example_id)`` (both stringified).
    lock : threading.Lock
        Guards mutation of ``logged_eval_results``.
    """

    name: str = "evaluator_callback_handler"

    def __init__(
        self,
        evaluators: Sequence[langsmith.RunEvaluator],
        client: Optional[langsmith.Client] = None,
        example_id: Optional[Union[UUID, str]] = None,
        skip_unfinished: bool = True,
        project_name: Optional[str] = "evaluators",
        max_concurrency: Optional[int] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self.example_id = (
            UUID(example_id) if isinstance(example_id, str) else example_id
        )
        self.client = client or langchain_tracer.get_client()
        self.evaluators = evaluators
        if max_concurrency is None:
            self.executor: Optional[ThreadPoolExecutor] = _get_executor()
        elif max_concurrency > 0:
            self.executor = ThreadPoolExecutor(max_workers=max_concurrency)
            # The finalizer callback must not reference ``self`` (directly or
            # through a closure): the finalizer registry holds the callback
            # strongly, so a closure over ``self`` would keep this handler
            # alive forever and the pool would only be shut down at exit.
            # The executor's bound method references only the executor.
            weakref.finalize(self, self.executor.shutdown, wait=True)
        else:
            self.executor = None
        self.futures: weakref.WeakSet[Future] = weakref.WeakSet()
        self.skip_unfinished = skip_unfinished
        self.project_name = project_name
        self.logged_eval_results: Dict[Tuple[str, str], List[EvaluationResult]] = {}
        self.lock = threading.Lock()
        _TRACERS.add(self)

    @staticmethod
    def _resolve_target_run_id(res: EvaluationResult, run: Run) -> Any:
        """Return the run ID a result applies to, defaulting to ``run.id``.

        A result may name a specific ``target_run_id``; when that attribute
        is missing or None the evaluated run's own ID is used instead.
        """
        target_run_id = getattr(res, "target_run_id", None)
        return run.id if target_run_id is None else target_run_id

    def _evaluate_in_project(self, run: Run, evaluator: langsmith.RunEvaluator) -> None:
        """Evaluate the run and record the results.

        Parameters
        ----------
        run : Run
            The run to be evaluated.
        evaluator : RunEvaluator
            The evaluator to use for evaluating the run.

        Raises
        ------
        Exception
            Any exception raised while evaluating is logged and re-raised.
        """
        try:
            if self.project_name is None:
                eval_result = self.client.evaluate_run(run, evaluator)
                eval_results = [eval_result]
            else:
                # Only take the traced path when a project is configured;
                # running both paths would evaluate the run twice and log
                # duplicate feedback.
                with tracing_v2_enabled(
                    project_name=self.project_name, tags=["eval"], client=self.client
                ) as cb:
                    reference_example = (
                        self.client.read_example(run.reference_example_id)
                        if run.reference_example_id
                        else None
                    )
                    evaluation_result = evaluator.evaluate_run(
                        # This is subclass, but getting errors for some reason
                        run,  # type: ignore
                        example=reference_example,
                    )
                    eval_results = self._log_evaluation_feedback(
                        evaluation_result,
                        run,
                        source_run_id=cb.latest_run.id if cb.latest_run else None,
                    )
        except Exception as e:
            logger.error(
                f"Error evaluating run {run.id} with "
                f"{evaluator.__class__.__name__}: {repr(e)}",
                exc_info=True,
            )
            raise
        example_id = str(run.reference_example_id)
        with self.lock:
            for res in eval_results:
                # Use the same "explicit target, else this run" rule as the
                # feedback logging so both agree on which run a result is for.
                run_id = str(self._resolve_target_run_id(res, run))
                self.logged_eval_results.setdefault((run_id, example_id), []).append(
                    res
                )

    def _select_eval_results(
        self,
        results: Union[EvaluationResult, EvaluationResults],
    ) -> List[EvaluationResult]:
        """Normalize an evaluator response to a list of results.

        Parameters
        ----------
        results : Union[EvaluationResult, EvaluationResults]
            Either a single result or a dict carrying a ``"results"`` list.

        Returns
        -------
        List[EvaluationResult]
            The individual evaluation results.

        Raises
        ------
        TypeError
            If ``results`` is neither of the accepted shapes.
        """
        if isinstance(results, EvaluationResult):
            results_ = [results]
        elif isinstance(results, dict) and "results" in results:
            results_ = cast(List[EvaluationResult], results["results"])
        else:
            raise TypeError(
                f"Invalid evaluation result type {type(results)}."
                " Expected EvaluationResult or EvaluationResults."
            )
        return results_

    def _log_evaluation_feedback(
        self,
        evaluator_response: Union[EvaluationResult, EvaluationResults],
        run: Run,
        source_run_id: Optional[UUID] = None,
    ) -> List[EvaluationResult]:
        """Create one feedback entry per evaluation result.

        Parameters
        ----------
        evaluator_response : Union[EvaluationResult, EvaluationResults]
            The raw response returned by the evaluator.
        run : Run
            The evaluated run; its ID is used when a result has no
            ``target_run_id`` of its own.
        source_run_id : Optional[UUID]
            Fallback source run ID for results that do not set one.

        Returns
        -------
        List[EvaluationResult]
            The normalized results that were logged.
        """
        results = self._select_eval_results(evaluator_response)
        for res in results:
            source_info_: Dict[str, Any] = (
                dict(res.evaluator_info) if res.evaluator_info else {}
            )
            run_id_ = self._resolve_target_run_id(res, run)
            self.client.create_feedback(
                run_id_,
                res.key,
                score=res.score,
                value=res.value,
                comment=res.comment,
                correction=res.correction,
                source_info=source_info_,
                source_run_id=res.source_run_id or source_run_id,
                feedback_source_type=langsmith.schemas.FeedbackSourceType.MODEL,
            )
        return results

    def _persist_run(self, run: Run) -> None:
        """Run the evaluators on the run.

        Parameters
        ----------
        run : Run
            The run to be evaluated.
        """
        if self.skip_unfinished and not run.outputs:
            logger.debug(f"Skipping unfinished run {run.id}")
            return
        # Evaluate a copy so that attaching the example ID does not mutate
        # the run object passed in by the caller.
        run_ = run.copy()
        run_.reference_example_id = self.example_id
        for evaluator in self.evaluators:
            if self.executor is None:
                self._evaluate_in_project(run_, evaluator)
            else:
                self.futures.add(
                    self.executor.submit(self._evaluate_in_project, run_, evaluator)
                )

    def wait_for_futures(self) -> None:
        """Wait for all futures to complete."""
        wait(self.futures)