| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022 |
- """This module contains the evaluator classes for evaluating runs."""
- from __future__ import annotations
- import asyncio
- import inspect
- import uuid
- from abc import abstractmethod
- from collections.abc import Awaitable, Sequence
- from typing import (
- Any,
- Callable,
- Literal,
- Optional,
- Union,
- cast,
- )
- from typing_extensions import TypedDict
- from langsmith import run_helpers as rh
- from langsmith import schemas
- try:
- from pydantic.v1 import ( # type: ignore[import]
- BaseModel,
- Field,
- ValidationError,
- validator,
- )
- except ImportError:
- from pydantic import ( # type: ignore[assignment]
- BaseModel,
- Field,
- ValidationError,
- validator,
- )
- import logging
- from functools import wraps
- from langsmith.schemas import SCORE_TYPE, VALUE_TYPE, Example, Run
- logger = logging.getLogger(__name__)
- class Category(TypedDict):
- """A category for categorical feedback."""
- value: Optional[Union[float, int]]
- """The numeric score/ordinal corresponding to this category."""
- label: str
- """The label for this category."""
- class FeedbackConfig(TypedDict, total=False):
- """Configuration to define a type of feedback.
- Applied on on the first creation of a `feedback_key`.
- """
- type: Literal["continuous", "categorical", "freeform"]
- """The type of feedback."""
- min: Optional[Union[float, int]]
- """The minimum permitted value (if continuous type)."""
- max: Optional[Union[float, int]]
- """The maximum value permitted value (if continuous type)."""
- categories: Optional[list[Union[Category, dict]]]
- class EvaluationResult(BaseModel):
- """Evaluation result."""
- key: str
- """The aspect, metric name, or label for this evaluation."""
- score: SCORE_TYPE = None
- """The numeric score for this evaluation."""
- value: VALUE_TYPE = None
- """The value for this evaluation, if not numeric."""
- comment: Optional[str] = None
- """An explanation regarding the evaluation."""
- correction: Optional[dict] = None
- """What the correct value should be, if applicable."""
- evaluator_info: dict = Field(default_factory=dict)
- """Additional information about the evaluator."""
- feedback_config: Optional[Union[FeedbackConfig, dict]] = None
- """The configuration used to generate this feedback."""
- source_run_id: Optional[Union[uuid.UUID, str]] = None
- """The ID of the trace of the evaluator itself."""
- target_run_id: Optional[Union[uuid.UUID, str]] = None
- """The ID of the trace this evaluation is applied to.
-
- If none provided, the evaluation feedback is applied to the
- root trace being."""
- extra: Optional[dict] = None
- """Metadata for the evaluator run."""
- class Config:
- """Pydantic model configuration."""
- allow_extra = False
- @validator("value", pre=True)
- def check_value_non_numeric(cls, v, values):
- """Check that the value is not numeric."""
- # If a score isn't provided and the value is numeric
- # it's more likely the user intended use the score field
- if "score" not in values or values["score"] is None:
- if isinstance(v, (int, float)):
- logger.warning(
- "Numeric values should be provided in"
- " the 'score' field, not 'value'."
- f" Got: {v}"
- )
- return v
- class EvaluationResults(TypedDict, total=False):
- """Batch evaluation results.
- This makes it easy for your evaluator to return multiple
- metrics at once.
- """
- results: list[EvaluationResult]
- """The evaluation results."""
- class RunEvaluator:
- """Evaluator interface class."""
- @abstractmethod
- def evaluate_run(
- self,
- run: Run,
- example: Optional[Example] = None,
- evaluator_run_id: Optional[uuid.UUID] = None,
- ) -> Union[EvaluationResult, EvaluationResults]:
- """Evaluate an example."""
- async def aevaluate_run(
- self,
- run: Run,
- example: Optional[Example] = None,
- evaluator_run_id: Optional[uuid.UUID] = None,
- ) -> Union[EvaluationResult, EvaluationResults]:
- """Evaluate an example asynchronously."""
- current_context = rh.get_tracing_context()
- def _run_with_context():
- with rh.tracing_context(**current_context):
- return self.evaluate_run(run, example, evaluator_run_id)
- return await asyncio.get_running_loop().run_in_executor(None, _run_with_context)
- _RUNNABLE_OUTPUT = Union[EvaluationResult, EvaluationResults, dict]
- class ComparisonEvaluationResult(BaseModel):
- """Feedback scores for the results of comparative evaluations.
- These are generated by functions that compare two or more runs,
- returning a ranking or other feedback.
- """
- key: str
- """The aspect, metric name, or label for this evaluation."""
- scores: dict[Union[uuid.UUID, str], SCORE_TYPE]
- """The scores for each run in the comparison."""
- source_run_id: Optional[Union[uuid.UUID, str]] = None
- """The ID of the trace of the evaluator itself."""
- comment: Optional[Union[str, dict[Union[uuid.UUID, str], str]]] = None
- """Comment for the scores. If a string, it's shared across all target runs.
-
- If a `dict`, it maps run IDs to individual comments.
- """
- _COMPARISON_OUTPUT = Union[ComparisonEvaluationResult, dict]
- class DynamicRunEvaluator(RunEvaluator):
- """A dynamic evaluator that wraps a function and transforms it into a `RunEvaluator`.
- This class is designed to be used with the `@run_evaluator` decorator, allowing
- functions that take a `Run` and an optional `Example` as arguments, and return
- an `EvaluationResult` or `EvaluationResults`, to be used as instances of `RunEvaluator`.
- Attributes:
- func (Callable): The function that is wrapped by this evaluator.
- """ # noqa: E501
- def __init__(
- self,
- func: Callable[
- [Run, Optional[Example]],
- Union[_RUNNABLE_OUTPUT, Awaitable[_RUNNABLE_OUTPUT]],
- ],
- # Async function to be used for async evaluation. Optional
- afunc: Optional[
- Callable[
- [Run, Optional[Example]],
- Awaitable[_RUNNABLE_OUTPUT],
- ]
- ] = None,
- ):
- """Initialize the `DynamicRunEvaluator` with a given function.
- Args:
- func (Callable): A function that takes a `Run` and an optional `Example` as
- arguments, and returns a dict or `ComparisonEvaluationResult`.
- """
- (func, prepare_inputs) = _normalize_evaluator_func(func)
- if afunc:
- (afunc, prepare_inputs) = _normalize_evaluator_func(afunc) # type: ignore[assignment]
- def process_inputs(inputs: dict) -> dict:
- if prepare_inputs is None:
- return inputs
- (_, _, traced_inputs) = prepare_inputs(
- inputs.get("run"), inputs.get("example")
- )
- return traced_inputs
- wraps(func)(self)
- from langsmith import run_helpers # type: ignore
- if afunc is not None:
- self.afunc = run_helpers.ensure_traceable(
- afunc, process_inputs=process_inputs
- )
- self._name = getattr(afunc, "__name__", "DynamicRunEvaluator")
- if inspect.iscoroutinefunction(func):
- if afunc is not None:
- raise TypeError(
- "Func was provided as a coroutine function, but afunc was "
- "also provided. If providing both, func should be a regular "
- "function to avoid ambiguity."
- )
- self.afunc = run_helpers.ensure_traceable(
- func, process_inputs=process_inputs
- )
- self._name = getattr(func, "__name__", "DynamicRunEvaluator")
- else:
- self.func = run_helpers.ensure_traceable(
- cast(Callable[[Run, Optional[Example]], _RUNNABLE_OUTPUT], func),
- process_inputs=process_inputs,
- )
- self._name = getattr(func, "__name__", "DynamicRunEvaluator")
- def _coerce_evaluation_result(
- self,
- result: Union[EvaluationResult, dict],
- source_run_id: uuid.UUID,
- allow_no_key: bool = False,
- ) -> EvaluationResult:
- if isinstance(result, EvaluationResult):
- if not result.source_run_id:
- result.source_run_id = source_run_id
- return result
- try:
- if not result:
- raise ValueError(
- "Expected an EvaluationResult object, or dict with a metric"
- f" 'key' and optional 'score'; got empty result: {result}"
- )
- if "key" not in result and allow_no_key:
- result["key"] = self._name
- if all(k not in result for k in ("score", "value", "comment")):
- raise ValueError(
- "Expected an EvaluationResult object, or dict with a metric"
- f" 'key' and optional 'score' or categorical 'value'; got {result}"
- )
- return EvaluationResult(**{"source_run_id": source_run_id, **result})
- except ValidationError as e:
- raise ValueError(
- "Expected an EvaluationResult object, or dict with a metric"
- f" 'key' and optional 'score'; got {result}"
- ) from e
- def _coerce_evaluation_results(
- self,
- results: Union[dict, EvaluationResults],
- source_run_id: uuid.UUID,
- ) -> Union[EvaluationResult, EvaluationResults]:
- if "results" in results:
- cp = results.copy()
- cp["results"] = [
- self._coerce_evaluation_result(r, source_run_id=source_run_id)
- for r in results["results"]
- ]
- return EvaluationResults(**cp)
- return self._coerce_evaluation_result(
- cast(dict, results), source_run_id=source_run_id, allow_no_key=True
- )
- def _format_result(
- self,
- result: Union[
- EvaluationResult, EvaluationResults, dict, str, int, bool, float, list
- ],
- source_run_id: uuid.UUID,
- ) -> Union[EvaluationResult, EvaluationResults]:
- if isinstance(result, EvaluationResult):
- if not result.source_run_id:
- result.source_run_id = source_run_id
- return result
- result = _format_evaluator_result(result)
- return self._coerce_evaluation_results(result, source_run_id)
- @property
- def is_async(self) -> bool:
- """Check if the evaluator function is asynchronous.
- Returns:
- bool: `True` if the evaluator function is asynchronous, `False` otherwise.
- """
- return hasattr(self, "afunc")
- def evaluate_run(
- self,
- run: Run,
- example: Optional[Example] = None,
- evaluator_run_id: Optional[uuid.UUID] = None,
- ) -> Union[EvaluationResult, EvaluationResults]:
- """Evaluate a run using the wrapped function.
- This method directly invokes the wrapped function with the provided arguments.
- Args:
- run (Run): The run to be evaluated.
- example (Optional[Example]): An optional example to be used in the evaluation.
- Returns:
- Union[EvaluationResult, EvaluationResults]: The result of the evaluation.
- """ # noqa: E501
- if not hasattr(self, "func"):
- running_loop = asyncio.get_event_loop()
- if running_loop.is_running():
- raise RuntimeError(
- "Cannot call `evaluate_run` on an async run evaluator from"
- " within an running event loop. Use `aevaluate_run` instead."
- )
- else:
- return running_loop.run_until_complete(self.aevaluate_run(run, example))
- if evaluator_run_id is None:
- evaluator_run_id = uuid.uuid4()
- metadata: dict[str, Any] = {"target_run_id": run.id}
- if getattr(run, "session_id", None):
- metadata["experiment"] = str(run.session_id)
- result = self.func(
- run,
- example,
- langsmith_extra={"run_id": evaluator_run_id, "metadata": metadata},
- )
- return self._format_result(result, evaluator_run_id)
- async def aevaluate_run(
- self,
- run: Run,
- example: Optional[Example] = None,
- evaluator_run_id: Optional[uuid.UUID] = None,
- ):
- """Evaluate a run asynchronously using the wrapped async function.
- This method directly invokes the wrapped async function with the
- provided arguments.
- Args:
- run (Run): The run to be evaluated.
- example (Optional[Example]): An optional example to be used
- in the evaluation.
- Returns:
- Union[EvaluationResult, EvaluationResults]: The result of the evaluation.
- """
- if not hasattr(self, "afunc"):
- return await super().aevaluate_run(run, example)
- if evaluator_run_id is None:
- evaluator_run_id = uuid.uuid4()
- metadata: dict[str, Any] = {"target_run_id": run.id}
- if getattr(run, "session_id", None):
- metadata["experiment"] = str(run.session_id)
- result = await self.afunc(
- run,
- example,
- langsmith_extra={"run_id": evaluator_run_id, "metadata": metadata},
- )
- return self._format_result(result, evaluator_run_id)
- def __call__(
- self, run: Run, example: Optional[Example] = None
- ) -> Union[EvaluationResult, EvaluationResults]:
- """Make the evaluator callable, allowing it to be used like a function.
- This method enables the evaluator instance to be called directly, forwarding the
- call to `evaluate_run`.
- Args:
- run (Run): The run to be evaluated.
- example (Optional[Example]): An optional example to be used in the evaluation.
- Returns:
- Union[EvaluationResult, EvaluationResults]: The result of the evaluation.
- """ # noqa: E501
- return self.evaluate_run(run, example)
- def __repr__(self) -> str:
- """Represent the DynamicRunEvaluator object."""
- return f"<DynamicRunEvaluator {self._name}>"
- def run_evaluator(
- func: Callable[
- [Run, Optional[Example]], Union[_RUNNABLE_OUTPUT, Awaitable[_RUNNABLE_OUTPUT]]
- ],
- ):
- """Create a run evaluator from a function.
- Decorator that transforms a function into a `RunEvaluator`.
- """
- return DynamicRunEvaluator(func)
- _MAXSIZE = 10_000
- def _maxsize_repr(obj: Any):
- s = repr(obj)
- if len(s) > _MAXSIZE:
- s = s[: _MAXSIZE - 4] + "...)"
- return s
- class DynamicComparisonRunEvaluator:
- """Compare predictions (as traces) from 2 or more runs."""
- def __init__(
- self,
- func: Callable[
- [Sequence[Run], Optional[Example]],
- Union[_COMPARISON_OUTPUT, Awaitable[_COMPARISON_OUTPUT]],
- ],
- # Async function to be used for async evaluation. Optional
- afunc: Optional[
- Callable[
- [Sequence[Run], Optional[Example]],
- Awaitable[_COMPARISON_OUTPUT],
- ]
- ] = None,
- ):
- """Initialize the `DynamicRunEvaluator` with a given function.
- Args:
- func (Callable): A function that takes a `Run` and an optional `Example` as
- arguments, and returns an `EvaluationResult` or `EvaluationResults`.
- """
- (func, prepare_inputs) = _normalize_comparison_evaluator_func(func)
- if afunc:
- (afunc, prepare_inputs) = _normalize_comparison_evaluator_func(afunc) # type: ignore[assignment]
- def process_inputs(inputs: dict) -> dict:
- if prepare_inputs is None:
- return inputs
- (_, _, traced_inputs) = prepare_inputs(
- inputs.get("runs"), inputs.get("example")
- )
- return traced_inputs
- wraps(func)(self)
- from langsmith import run_helpers # type: ignore
- if afunc is not None:
- self.afunc = run_helpers.ensure_traceable(
- afunc, process_inputs=process_inputs
- )
- self._name = getattr(afunc, "__name__", "DynamicRunEvaluator")
- if inspect.iscoroutinefunction(func):
- if afunc is not None:
- raise TypeError(
- "Func was provided as a coroutine function, but afunc was "
- "also provided. If providing both, func should be a regular "
- "function to avoid ambiguity."
- )
- self.afunc = run_helpers.ensure_traceable(
- func, process_inputs=process_inputs
- )
- self._name = getattr(func, "__name__", "DynamicRunEvaluator")
- else:
- self.func = run_helpers.ensure_traceable(
- cast(
- Callable[
- [Sequence[Run], Optional[Example]],
- _COMPARISON_OUTPUT,
- ],
- func,
- ),
- process_inputs=process_inputs,
- )
- self._name = getattr(func, "__name__", "DynamicRunEvaluator")
- @property
- def is_async(self) -> bool:
- """Check if the evaluator function is asynchronous.
- Returns:
- bool: `True` if the evaluator function is asynchronous, `False` otherwise.
- """
- return hasattr(self, "afunc")
- def compare_runs(
- self, runs: Sequence[Run], example: Optional[Example] = None
- ) -> ComparisonEvaluationResult:
- """Compare runs to score preferences.
- Args:
- runs: A list of runs to compare.
- example: An optional example to be used in the evaluation.
- """ # noqa: E501
- if not hasattr(self, "func"):
- running_loop = asyncio.get_event_loop()
- if running_loop.is_running():
- raise RuntimeError(
- "Cannot call `evaluate_run` on an async run evaluator from"
- " within an running event loop. Use `aevaluate_run` instead."
- )
- else:
- return running_loop.run_until_complete(
- self.acompare_runs(runs, example)
- )
- source_run_id = uuid.uuid4()
- tags = self._get_tags(runs)
- # TODO: Add metadata for the "comparison experiment" here
- result = self.func(
- runs,
- example,
- langsmith_extra={"run_id": source_run_id, "tags": tags},
- )
- return self._format_results(result, source_run_id, runs)
- async def acompare_runs(
- self, runs: Sequence[Run], example: Optional[Example] = None
- ) -> ComparisonEvaluationResult:
- """Evaluate a run asynchronously using the wrapped async function.
- This method directly invokes the wrapped async function with the
- provided arguments.
- Args:
- runs (Run): The runs to be evaluated.
- example (Optional[Example]): An optional example to be used
- in the evaluation.
- Returns:
- ComparisonEvaluationResult: The result of the evaluation.
- """
- if not hasattr(self, "afunc"):
- return self.compare_runs(runs, example)
- source_run_id = uuid.uuid4()
- tags = self._get_tags(runs)
- # TODO: Add metadata for the "comparison experiment" here
- result = await self.afunc(
- runs,
- example,
- langsmith_extra={"run_id": source_run_id, "tags": tags},
- )
- return self._format_results(result, source_run_id, runs)
- def __call__(
- self, runs: Sequence[Run], example: Optional[Example] = None
- ) -> ComparisonEvaluationResult:
- """Make the evaluator callable, allowing it to be used like a function.
- This method enables the evaluator instance to be called directly, forwarding the
- call to `evaluate_run`.
- Args:
- run (Run): The run to be evaluated.
- example (Optional[Example]): An optional example to be used in the evaluation.
- Returns:
- ComparisonEvaluationResult: The result of the evaluation.
- """ # noqa: E501
- return self.compare_runs(runs, example)
- def __repr__(self) -> str:
- """Represent the DynamicRunEvaluator object."""
- return f"<DynamicComparisonRunEvaluator {self._name}>"
- @staticmethod
- def _get_tags(runs: Sequence[Run]) -> list[str]:
- """Extract tags from runs."""
- # Add tags to support filtering
- tags = []
- for run in runs:
- tags.append("run:" + str(run.id))
- if getattr(run, "session_id", None):
- tags.append("experiment:" + str(run.session_id))
- return tags
- def _format_results(
- self,
- result: Union[dict, list, ComparisonEvaluationResult],
- source_run_id: uuid.UUID,
- runs: Sequence[Run],
- ) -> ComparisonEvaluationResult:
- if isinstance(result, ComparisonEvaluationResult):
- if not result.source_run_id:
- result.source_run_id = source_run_id
- return result
- elif isinstance(result, list):
- result = {
- "scores": {run.id: score for run, score in zip(runs, result)},
- "key": self._name,
- "source_run_id": source_run_id,
- }
- elif isinstance(result, dict):
- if "key" not in result:
- result["key"] = self._name
- else:
- msg = (
- "Expected 'dict', 'list' or 'ComparisonEvaluationResult' result "
- f"object. Received: {result=}"
- )
- raise ValueError(msg)
- try:
- return ComparisonEvaluationResult(
- **{"source_run_id": source_run_id, **result}
- )
- except ValidationError as e:
- raise ValueError(
- f"Expected a dictionary with a 'key' and dictionary of scores mapping"
- "run IDs to numeric scores, or ComparisonEvaluationResult object,"
- f" got {result}"
- ) from e
- def comparison_evaluator(
- func: Callable[
- [Sequence[Run], Optional[Example]],
- Union[_COMPARISON_OUTPUT, Awaitable[_COMPARISON_OUTPUT]],
- ],
- ) -> DynamicComparisonRunEvaluator:
- """Create a comaprison evaluator from a function."""
- return DynamicComparisonRunEvaluator(func)
- def _normalize_evaluator_func(
- func: Callable,
- ) -> tuple[
- Union[
- Callable[[Run, Optional[Example]], _RUNNABLE_OUTPUT],
- Callable[[Run, Optional[Example]], Awaitable[_RUNNABLE_OUTPUT]],
- ],
- Optional[Callable[..., dict]],
- ]:
- supported_args = (
- "run",
- "example",
- "inputs",
- "outputs",
- "reference_outputs",
- "attachments",
- )
- sig = inspect.signature(func)
- all_args = [pname for pname, p in sig.parameters.items() if p.kind != p.VAR_KEYWORD]
- args_with_defaults = [
- pname
- for pname, p in sig.parameters.items()
- if p.default is not inspect.Parameter.empty
- ]
- if not all_args or (
- not all(
- pname in supported_args or pname in args_with_defaults for pname in all_args
- )
- and len([a for a in all_args if a not in args_with_defaults]) != 2
- ):
- msg = (
- f"Invalid evaluator function. Must have at least one "
- f"argument. Supported arguments are {supported_args}. Please "
- f"see https://docs.smith.langchain.com/evaluation/how_to_guides/evaluation/evaluate_llm_application#use-custom-evaluators"
- # noqa: E501
- )
- raise ValueError(msg)
- # For backwards compatibility we assume custom arg names are Run and Example
- # types, respectively.
- elif not all(
- pname in supported_args or pname in args_with_defaults for pname in all_args
- ) or all_args == [
- "run",
- "example",
- ]:
- return func, None
- else:
- if inspect.iscoroutinefunction(func):
- def _prepare_inputs(
- run: Run, example: Optional[Example]
- ) -> tuple[list, dict, dict]:
- arg_map = {
- "run": run,
- "example": example,
- "inputs": example.inputs if example else {},
- "outputs": run.outputs or {},
- "attachments": example.attachments or {} if example else {},
- "reference_outputs": example.outputs or {} if example else {},
- }
- kwargs = {}
- args = []
- traced_inputs = {}
- for param_name, param in sig.parameters.items():
- # Could have params with defaults that are not in the arg map
- if param_name in arg_map:
- if param.kind in (
- param.POSITIONAL_OR_KEYWORD,
- param.POSITIONAL_ONLY,
- ):
- args.append(arg_map[param_name])
- else:
- kwargs[param_name] = arg_map[param_name]
- traced_inputs[param_name] = (
- _maxsize_repr(arg_map[param_name])
- if param_name in ("run", "example")
- else arg_map[param_name]
- )
- return args, kwargs, traced_inputs
- async def awrapper(
- run: Run, example: Optional[Example]
- ) -> _RUNNABLE_OUTPUT:
- (args, kwargs, _) = _prepare_inputs(run, example)
- return await func(*args, **kwargs)
- awrapper.__name__ = (
- getattr(func, "__name__")
- if hasattr(func, "__name__")
- else awrapper.__name__
- )
- return (awrapper, _prepare_inputs) # type: ignore[return-value]
- else:
- def _prepare_inputs(
- run: Run, example: Optional[Example]
- ) -> tuple[list, dict, dict]:
- arg_map = {
- "run": run,
- "example": example,
- "inputs": example.inputs if example else {},
- "outputs": run.outputs or {},
- "attachments": example.attachments or {} if example else {},
- "reference_outputs": example.outputs or {} if example else {},
- }
- kwargs = {}
- args = []
- traced_inputs = {}
- for param_name, param in sig.parameters.items():
- # Could have params with defaults that are not in the arg map
- if param_name in arg_map:
- if param.kind in (
- param.POSITIONAL_OR_KEYWORD,
- param.POSITIONAL_ONLY,
- ):
- args.append(arg_map[param_name])
- else:
- kwargs[param_name] = arg_map[param_name]
- traced_inputs[param_name] = (
- _maxsize_repr(arg_map[param_name])
- if param_name in ("run", "example")
- else arg_map[param_name]
- )
- return args, kwargs, traced_inputs
- def wrapper(run: Run, example: Optional[Example]) -> _RUNNABLE_OUTPUT:
- (args, kwargs, _) = _prepare_inputs(run, example)
- return func(*args, **kwargs)
- wrapper.__name__ = (
- getattr(func, "__name__")
- if hasattr(func, "__name__")
- else wrapper.__name__
- )
- return (wrapper, _prepare_inputs) # type: ignore[return-value]
- def _normalize_comparison_evaluator_func(
- func: Callable,
- ) -> tuple[
- Union[
- Callable[[Sequence[Run], Optional[Example]], _COMPARISON_OUTPUT],
- Callable[[Sequence[Run], Optional[Example]], Awaitable[_COMPARISON_OUTPUT]],
- ],
- Optional[Callable[..., dict]],
- ]:
- supported_args = ("runs", "example", "inputs", "outputs", "reference_outputs")
- sig = inspect.signature(func)
- all_args = [pname for pname, p in sig.parameters.items() if p.kind != p.VAR_KEYWORD]
- args_with_defaults = [
- pname
- for pname, p in sig.parameters.items()
- if p.default is not inspect.Parameter.empty
- ]
- if not all_args or (
- not all(
- pname in supported_args or pname in args_with_defaults for pname in all_args
- )
- and len([a for a in all_args if a not in args_with_defaults]) != 2
- ):
- msg = (
- f"Invalid evaluator function. Must have at least one "
- f"argument. Supported arguments are {supported_args}. Please "
- f"see https://docs.smith.langchain.com/evaluation/how_to_guides/evaluation/evaluate_llm_application#use-custom-evaluators"
- # noqa: E501
- )
- raise ValueError(msg)
- # For backwards compatibility we assume custom arg names are List[Run] and
- # List[Example] types, respectively.
- elif not all(
- pname in supported_args or pname in args_with_defaults for pname in all_args
- ) or all_args == [
- "runs",
- "example",
- ]:
- return func, None
- else:
- if inspect.iscoroutinefunction(func):
- def _prepare_inputs(
- runs: Sequence[Run], example: Optional[Example]
- ) -> tuple[list, dict, dict]:
- arg_map = {
- "runs": runs,
- "example": example,
- "inputs": example.inputs if example else {},
- "outputs": [run.outputs or {} for run in runs],
- "reference_outputs": example.outputs or {} if example else {},
- }
- kwargs = {}
- args = []
- traced_inputs = {}
- for param_name, param in sig.parameters.items():
- # Could have params with defaults that are not in the arg map
- if param_name in arg_map:
- if param.kind in (
- param.POSITIONAL_OR_KEYWORD,
- param.POSITIONAL_ONLY,
- ):
- args.append(arg_map[param_name])
- else:
- kwargs[param_name] = arg_map[param_name]
- traced_inputs[param_name] = (
- _maxsize_repr(arg_map[param_name])
- if param_name in ("runs", "example")
- else arg_map[param_name]
- )
- return args, kwargs, traced_inputs
- async def awrapper(
- runs: Sequence[Run], example: Optional[Example]
- ) -> _COMPARISON_OUTPUT:
- (args, kwargs, _) = _prepare_inputs(runs, example)
- return await func(*args, **kwargs)
- awrapper.__name__ = (
- getattr(func, "__name__")
- if hasattr(func, "__name__")
- else awrapper.__name__
- )
- return awrapper, _prepare_inputs # type: ignore[return-value]
- else:
- def _prepare_inputs(
- runs: Sequence[Run], example: Optional[Example]
- ) -> tuple[list, dict, dict]:
- arg_map = {
- "runs": runs,
- "example": example,
- "inputs": example.inputs if example else {},
- "outputs": [run.outputs or {} for run in runs],
- "reference_outputs": example.outputs or {} if example else {},
- }
- kwargs = {}
- args = []
- traced_inputs = {}
- for param_name, param in sig.parameters.items():
- # Could have params with defaults that are not in the arg map
- if param_name in arg_map:
- if param.kind in (
- param.POSITIONAL_OR_KEYWORD,
- param.POSITIONAL_ONLY,
- ):
- args.append(arg_map[param_name])
- else:
- kwargs[param_name] = arg_map[param_name]
- traced_inputs[param_name] = (
- _maxsize_repr(arg_map[param_name])
- if param_name in ("runs", "example")
- else arg_map[param_name]
- )
- return args, kwargs, traced_inputs
- def wrapper(
- runs: Sequence[Run], example: Optional[Example]
- ) -> _COMPARISON_OUTPUT:
- (args, kwargs, _) = _prepare_inputs(runs, example)
- return func(*args, **kwargs)
- wrapper.__name__ = (
- getattr(func, "__name__")
- if hasattr(func, "__name__")
- else wrapper.__name__
- )
- return wrapper, _prepare_inputs # type: ignore[return-value]
- def _format_evaluator_result(
- result: Union[EvaluationResults, dict, str, int, bool, float, list],
- ) -> Union[EvaluationResults, dict]:
- if isinstance(result, (bool, float, int)):
- result = {"score": result}
- elif not result:
- raise ValueError(
- f"Expected a non-empty dict, str, bool, int, float, list, "
- f"EvaluationResult, or EvaluationResults. Got {result}"
- )
- elif isinstance(result, list):
- if not all(isinstance(x, dict) for x in result):
- raise ValueError(
- f"Expected a list of dicts or EvaluationResults. Received {result}."
- )
- result = {"results": result} # type: ignore[misc]
- elif isinstance(result, str):
- result = {"value": result}
- elif isinstance(result, dict):
- pass
- else:
- raise ValueError(
- f"Expected a dict, str, bool, int, float, list, EvaluationResult, or "
- f"EvaluationResults. Got {result}"
- )
- return result
- SUMMARY_EVALUATOR_T = Union[
- Callable[
- [Sequence[schemas.Run], Sequence[schemas.Example]],
- Union[EvaluationResult, EvaluationResults],
- ],
- Callable[
- [list[schemas.Run], list[schemas.Example]],
- Union[EvaluationResult, EvaluationResults],
- ],
- ]
- def _normalize_summary_evaluator(func: Callable) -> SUMMARY_EVALUATOR_T:
- supported_args = ("runs", "examples", "inputs", "outputs", "reference_outputs")
- sig = inspect.signature(func)
- all_args = [pname for pname, p in sig.parameters.items()]
- args_with_defaults = [
- pname
- for pname, p in sig.parameters.items()
- if p.default is not inspect.Parameter.empty
- ]
- if not all_args or (
- not all(
- pname in supported_args or pname in args_with_defaults for pname in all_args
- )
- and len([a for a in all_args if a not in args_with_defaults]) != 2
- ):
- msg = (
- f"Invalid evaluator function. Must have at least one "
- f"argument. Supported arguments are {supported_args}."
- )
- if all_args:
- msg += f" Received arguments {all_args}."
- raise ValueError(msg)
- # For backwards compatibility we assume custom arg names are Sequence[Run] and
- # Sequence[Example] types, respectively.
- elif not all(pname in supported_args for pname in all_args) or all_args == [
- "runs",
- "examples",
- ]:
- return func
- else:
- def wrapper(
- runs: Sequence[schemas.Run], examples: Sequence[schemas.Example]
- ) -> Union[EvaluationResult, EvaluationResults]:
- arg_map = {
- "runs": runs,
- "examples": examples,
- "inputs": [example.inputs for example in examples],
- "outputs": [run.outputs or {} for run in runs],
- "reference_outputs": [example.outputs or {} for example in examples],
- }
- kwargs = {}
- args = []
- for param_name, param in sig.parameters.items():
- # Could have params with defaults that are not in the arg map
- if param_name in arg_map:
- if param.kind in (
- param.POSITIONAL_OR_KEYWORD,
- param.POSITIONAL_ONLY,
- ):
- args.append(arg_map[param_name])
- else:
- kwargs[param_name] = arg_map[param_name]
- result = func(*args, **kwargs)
- if isinstance(result, EvaluationResult):
- return result
- return _format_evaluator_result(result) # type: ignore
- wrapper.__name__ = (
- getattr(func, "__name__") if hasattr(func, "__name__") else wrapper.__name__
- )
- return wrapper # type: ignore[return-value]
|