- """V2 Evaluation Interface."""
- from __future__ import annotations
- import ast
- import collections
- import concurrent.futures as cf
- import functools
- import inspect
- import io
- import itertools
- import logging
- import pathlib
- import queue
- import random
- import textwrap
- import threading
- import uuid
- from collections.abc import Awaitable, Generator, Iterable, Iterator, Sequence
- from contextvars import copy_context
- from typing import (
- TYPE_CHECKING,
- Any,
- Callable,
- Literal,
- Optional,
- TypeVar,
- Union,
- cast,
- )
- from typing_extensions import TypedDict, overload
- import langsmith
- from langsmith import env as ls_env
- from langsmith import run_helpers as rh
- from langsmith import run_trees as rt
- from langsmith import schemas
- from langsmith import utils as ls_utils
- from langsmith._internal._beta_decorator import _warn_once
- from langsmith.evaluation.evaluator import (
- SUMMARY_EVALUATOR_T,
- ComparisonEvaluationResult,
- DynamicComparisonRunEvaluator,
- DynamicRunEvaluator,
- EvaluationResult,
- EvaluationResults,
- RunEvaluator,
- _normalize_summary_evaluator,
- comparison_evaluator,
- run_evaluator,
- )
- from langsmith.evaluation.integrations import LangChainStringEvaluator
- if TYPE_CHECKING:
- import pandas as pd
- from langchain_core.runnables import Runnable
- DataFrame = pd.DataFrame
- else:
- DataFrame = Any
- logger = logging.getLogger(__name__)
- TARGET_T = Union[Callable[[dict], dict], Callable[[dict, dict], dict]]
- # Data format: dataset-name, dataset_id, or examples
- DATA_T = Union[str, uuid.UUID, Iterable[schemas.Example], schemas.Dataset]
- # Summary evaluator runs over the whole dataset
- # and reports aggregate metric(s)
- # Row-level evaluator
- EVALUATOR_T = Union[
- RunEvaluator,
- Callable[
- [schemas.Run, Optional[schemas.Example]],
- Union[EvaluationResult, EvaluationResults],
- ],
- Callable[..., Union[dict, EvaluationResults, EvaluationResult]],
- ]
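- # A minimal illustrative sketch (not part of this module) of a row-level
- # evaluator matching EVALUATOR_T: it takes a Run and its reference Example
- # and returns a dict-like result, mirroring the docstring examples below.
- #
- #     def exact_match(run: schemas.Run, example: Optional[schemas.Example]):
- #         expected = example.outputs if example else None
- #         return {"key": "exact_match", "score": run.outputs == expected}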
- AEVALUATOR_T = Union[
- Callable[
- [schemas.Run, Optional[schemas.Example]],
- Awaitable[Union[EvaluationResult, EvaluationResults]],
- ],
- ]
- EXPERIMENT_T = Union[str, uuid.UUID, schemas.TracerSession]
- @overload
- def evaluate(
- target: Union[TARGET_T, Runnable, EXPERIMENT_T],
- /,
- data: Optional[DATA_T] = None,
- evaluators: Optional[Sequence[EVALUATOR_T]] = None,
- summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
- metadata: Optional[dict] = None,
- experiment_prefix: Optional[str] = None,
- description: Optional[str] = None,
- max_concurrency: Optional[int] = 0,
- num_repetitions: int = 1,
- client: Optional[langsmith.Client] = None,
- blocking: bool = True,
- experiment: Optional[EXPERIMENT_T] = None,
- upload_results: bool = True,
- **kwargs: Any,
- ) -> ExperimentResults: ...
- @overload
- def evaluate(
- target: Union[tuple[EXPERIMENT_T, EXPERIMENT_T]],
- /,
- data: Optional[DATA_T] = None,
- evaluators: Optional[Sequence[COMPARATIVE_EVALUATOR_T]] = None,
- summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
- metadata: Optional[dict] = None,
- experiment_prefix: Optional[str] = None,
- description: Optional[str] = None,
- max_concurrency: Optional[int] = 0,
- num_repetitions: int = 1,
- client: Optional[langsmith.Client] = None,
- blocking: bool = True,
- experiment: Optional[EXPERIMENT_T] = None,
- upload_results: bool = True,
- **kwargs: Any,
- ) -> ComparativeExperimentResults: ...
- def evaluate(
- target: Union[TARGET_T, Runnable, EXPERIMENT_T, tuple[EXPERIMENT_T, EXPERIMENT_T]],
- /,
- data: Optional[DATA_T] = None,
- evaluators: Optional[
- Union[Sequence[EVALUATOR_T], Sequence[COMPARATIVE_EVALUATOR_T]]
- ] = None,
- summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
- metadata: Optional[dict] = None,
- experiment_prefix: Optional[str] = None,
- description: Optional[str] = None,
- max_concurrency: Optional[int] = 0,
- num_repetitions: int = 1,
- client: Optional[langsmith.Client] = None,
- blocking: bool = True,
- experiment: Optional[EXPERIMENT_T] = None,
- upload_results: bool = True,
- error_handling: Literal["log", "ignore"] = "log",
- **kwargs: Any,
- ) -> Union[ExperimentResults, ComparativeExperimentResults]:
- r"""Evaluate a target system on a given dataset.
- Args:
- target (TARGET_T | Runnable | EXPERIMENT_T | Tuple[EXPERIMENT_T, EXPERIMENT_T]):
- The target system or experiment(s) to evaluate.
- Can be a function that takes a dict and returns a `dict`, a langchain `Runnable`, an
- existing experiment ID, or a two-tuple of experiment IDs.
- data (DATA_T): The dataset to evaluate on.
- Can be a dataset name, a list of examples, or a generator of examples.
- evaluators (Sequence[EVALUATOR_T] | Sequence[COMPARATIVE_EVALUATOR_T] | None):
- A list of evaluators to run on each example. The evaluator signature
- depends on the target type.
- summary_evaluators (Sequence[SUMMARY_EVALUATOR_T] | None): A list of summary
- evaluators to run on the entire dataset.
- Should not be specified if comparing two existing experiments.
- metadata (dict | None): Metadata to attach to the experiment.
- experiment_prefix (str | None): A prefix to provide for your experiment name.
- description (str | None): A free-form text description for the experiment.
- max_concurrency (int | None): The maximum number of concurrent
- evaluations to run.
- If `None` then no limit is set. If `0` then no concurrency.
- client (langsmith.Client | None): The LangSmith client to use.
- blocking (bool): Whether to block until the evaluation is complete.
- num_repetitions (int): The number of times to run the evaluation.
- Each item in the dataset will be run and evaluated this many times.
- experiment (schemas.TracerSession | None): An existing experiment to
- extend.
- If provided, `experiment_prefix` is ignored.
- For advanced usage only. Should not be specified if target is an existing
- experiment or two-tuple of experiments.
- error_handling (str, default="log"): How to handle individual run errors.
- `'log'` will trace the runs with the error message as part of the
- experiment, `'ignore'` will not count the run as part of the experiment at
- all.
- Returns:
- ExperimentResults: If target is a function, `Runnable`, or existing experiment.
- ComparativeExperimentResults: If target is a two-tuple of existing experiments.
- Examples:
- Prepare the dataset:
- >>> from typing import Sequence
- >>> from langsmith import Client
- >>> from langsmith.evaluation import evaluate
- >>> from langsmith.schemas import Example, Run
- >>> client = Client()
- >>> dataset = client.clone_public_dataset(
- ... "https://smith.langchain.com/public/419dcab2-1d66-4b94-8901-0357ead390df/d"
- ... )
- >>> dataset_name = "Evaluate Examples"
- Basic usage:
- >>> def accuracy(run: Run, example: Example):
- ... # Row-level evaluator for accuracy.
- ... pred = run.outputs["output"]
- ... expected = example.outputs["answer"]
- ... return {"score": expected.lower() == pred.lower()}
- >>> def precision(runs: Sequence[Run], examples: Sequence[Example]):
- ... # Experiment-level evaluator for precision.
- ... # TP / (TP + FP)
- ... predictions = [run.outputs["output"].lower() for run in runs]
- ... expected = [example.outputs["answer"].lower() for example in examples]
- ... # yes and no are the only possible answers
- ... tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"])
- ... fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)])
- ... return {"score": tp / (tp + fp)}
- >>> def predict(inputs: dict) -> dict:
- ... # This can be any function or just an API call to your app.
- ... return {"output": "Yes"}
- >>> results = evaluate(
- ... predict,
- ... data=dataset_name,
- ... evaluators=[accuracy],
- ... summary_evaluators=[precision],
- ... experiment_prefix="My Experiment",
- ... description="Evaluating the accuracy of a simple prediction model.",
- ... metadata={
- ... "my-prompt-version": "abcd-1234",
- ... },
- ... ) # doctest: +ELLIPSIS
- View the evaluation results for experiment:...
- Evaluating over only a subset of the examples:
- >>> experiment_name = results.experiment_name
- >>> examples = client.list_examples(dataset_name=dataset_name, limit=5)
- >>> results = evaluate(
- ... predict,
- ... data=examples,
- ... evaluators=[accuracy],
- ... summary_evaluators=[precision],
- ... experiment_prefix="My Experiment",
- ... description="Just testing a subset synchronously.",
- ... ) # doctest: +ELLIPSIS
- View the evaluation results for experiment:...
- Streaming each prediction to debug more easily and eagerly:
- >>> results = evaluate(
- ... predict,
- ... data=dataset_name,
- ... evaluators=[accuracy],
- ... summary_evaluators=[precision],
- ... description="I don't even have to block!",
- ... blocking=False,
- ... ) # doctest: +ELLIPSIS
- View the evaluation results for experiment:...
- >>> for i, result in enumerate(results): # doctest: +ELLIPSIS
- ... pass
- Using the `evaluate` API with an off-the-shelf LangChain evaluator:
- >>> from langsmith.evaluation import LangChainStringEvaluator # doctest: +SKIP
- >>> from langchain_openai import ChatOpenAI # doctest: +SKIP
- >>> def prepare_criteria_data(run: Run, example: Example): # doctest: +SKIP
- ... return {
- ... "prediction": run.outputs["output"],
- ... "reference": example.outputs["answer"],
- ... "input": str(example.inputs),
- ... }
- >>> results = evaluate( # doctest: +SKIP
- ... predict,
- ... data=dataset_name,
- ... evaluators=[
- ... accuracy,
- ... LangChainStringEvaluator("embedding_distance"),
- ... LangChainStringEvaluator(
- ... "labeled_criteria",
- ... config={
- ... "criteria": {
- ... "usefulness": "The prediction is useful if it is correct"
- ... " and/or asks a useful followup question."
- ... },
- ... "llm": ChatOpenAI(model="gpt-4o"),
- ... },
- ... prepare_data=prepare_criteria_data,
- ... ),
- ... ],
- ... description="Evaluating with off-the-shelf LangChain evaluators.",
- ... summary_evaluators=[precision],
- ... )
- View the evaluation results for experiment:... # doctest: +SKIP
- Evaluating a LangChain object:
- >>> from langchain_core.runnables import chain as as_runnable
- >>> @as_runnable
- ... def nested_predict(inputs):
- ... return {"output": "Yes"}
- >>> @as_runnable
- ... def lc_predict(inputs):
- ... return nested_predict.invoke(inputs)
- >>> results = evaluate(
- ... lc_predict.invoke,
- ... data=dataset_name,
- ... evaluators=[accuracy],
- ... description="This time we're evaluating a LangChain object.",
- ... summary_evaluators=[precision],
- ... ) # doctest: +ELLIPSIS
- View the evaluation results for experiment:...
- !!! warning "Behavior changed in `langsmith` 0.2.0"
- 'max_concurrency' default updated from None (no limit on concurrency)
- to 0 (no concurrency at all).
- """ # noqa: E501
- if isinstance(target, (str, uuid.UUID, schemas.TracerSession)):
- invalid_args = {
- "num_repetitions": num_repetitions > 1,
- "experiment": bool(experiment),
- "upload_results": not upload_results,
- "experiment_prefix": bool(experiment_prefix),
- "data": bool(data),
- }
- if any(invalid_args.values()):
- msg = (
- f"Received invalid arguments. "
- f"{tuple(k for k, v in invalid_args.items() if v)} should not be "
- f"specified when target is an existing experiment."
- )
- raise ValueError(msg)
- target_id = target if isinstance(target, (str, uuid.UUID)) else target.id
- logger.debug(f"Running evaluation over existing experiment {target_id}...")
- return evaluate_existing(
- target,
- evaluators=cast(Optional[Sequence[EVALUATOR_T]], evaluators),
- summary_evaluators=summary_evaluators,
- metadata=metadata,
- max_concurrency=max_concurrency,
- client=client,
- blocking=blocking,
- **kwargs,
- )
- elif isinstance(target, (list, tuple)):
- invalid_args = {
- "num_repetitions": num_repetitions > 1,
- "experiment": bool(experiment),
- "upload_results": not upload_results,
- "summary_evaluators": bool(summary_evaluators),
- "data": bool(data),
- }
- if len(target) != 2 or not all(
- isinstance(t, (str, uuid.UUID, schemas.TracerSession)) for t in target
- ):
- msg = (
- "Received invalid target. If a tuple is specified it must have length "
- "2 and each element should by the ID or schemas.TracerSession of an "
- f"existing experiment. Received {target=}"
- )
- raise ValueError(msg)
- elif any(invalid_args.values()):
- msg = (
- f"Received invalid arguments. "
- f"{tuple(k for k, v in invalid_args.items() if v)} should not be "
- f"specified when target is two existing experiments."
- )
- raise ValueError(msg)
- if max_concurrency is not None:
- kwargs["max_concurrency"] = max_concurrency
- target_ids = [t if isinstance(t, (str, uuid.UUID)) else t.id for t in target]
- logger.debug(
- f"Running pairwise evaluation over existing experiments {target_ids}..."
- )
- return evaluate_comparative(
- target,
- evaluators=cast(Sequence[COMPARATIVE_EVALUATOR_T], evaluators or ()),
- experiment_prefix=experiment_prefix,
- description=description,
- client=client,
- metadata=metadata,
- **kwargs,
- )
- elif kwargs:
- msg = (
- f"Received unsupported arguments {kwargs}. These arguments are not "
- f"supported when creating a new experiment."
- )
- raise ValueError(msg)
- elif not data:
- msg = "Must specify 'data' when running evaluations over a target function."
- raise ValueError(msg)
- elif callable(target) and rh.is_async(target):
- msg = (
- "Async functions are not supported by `evaluate`. "
- "Please use `aevaluate` instead:\n\n"
- "from langsmith import aevaluate\n\n"
- "await aevaluate(\n"
- " async_target_function,\n"
- " data=data,\n"
- " evaluators=evaluators,\n"
- " # ... other parameters\n"
- ")"
- )
- raise ValueError(msg)
- elif experiment and experiment_prefix:
- msg = (
- "Expected at most one of 'experiment' or 'experiment_prefix',"
- " but both were provided. "
- f"Got: experiment={experiment}, experiment_prefix={experiment_prefix}"
- )
- raise ValueError(msg)
- else:
- if not upload_results:
- _warn_once("'upload_results' parameter is in beta.")
- logger.debug(f"Running evaluation over target system {target}...")
- return _evaluate(
- target,
- data=data,
- evaluators=cast(Optional[Sequence[EVALUATOR_T]], evaluators),
- summary_evaluators=summary_evaluators,
- metadata=metadata,
- experiment_prefix=experiment_prefix,
- description=description,
- max_concurrency=max_concurrency,
- num_repetitions=num_repetitions,
- client=client,
- blocking=blocking,
- experiment=experiment,
- upload_results=upload_results,
- error_handling=error_handling,
- )
- def evaluate_existing(
- experiment: Union[str, uuid.UUID, schemas.TracerSession],
- /,
- evaluators: Optional[Sequence[EVALUATOR_T]] = None,
- summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
- metadata: Optional[dict] = None,
- max_concurrency: Optional[int] = 0,
- client: Optional[langsmith.Client] = None,
- load_nested: bool = False,
- blocking: bool = True,
- ) -> ExperimentResults:
- r"""Evaluate existing experiment runs.
- Args:
- experiment (Union[str, uuid.UUID, schemas.TracerSession]): The identifier of the experiment to evaluate.
- evaluators (Optional[Sequence[EVALUATOR_T]]): Optional sequence of evaluators to use for individual run evaluation.
- summary_evaluators (Optional[Sequence[SUMMARY_EVALUATOR_T]]): Optional sequence of evaluators
- to apply over the entire dataset.
- metadata (Optional[dict]): Optional metadata to include in the evaluation results.
- max_concurrency (int | None): The maximum number of concurrent
- evaluations to run.
- If `None` then no limit is set. If `0` then no concurrency.
- client (Optional[langsmith.Client]): Optional Langsmith client to use for evaluation.
- load_nested (bool): Whether to load all child runs for the experiment.
- Default is to only load the top-level root runs.
- blocking (bool): Whether to block until evaluation is complete.
- Returns:
- The evaluation results.
- Environment:
- - `LANGSMITH_TEST_CACHE`: If set, API calls will be cached to disk to save time and
- cost during testing.
- Recommended to commit the cache files to your repository for faster CI/CD runs.
- Requires the `'langsmith[vcr]'` package to be installed.
- Examples:
- Define your evaluators
- >>> from typing import Sequence
- >>> from langsmith.schemas import Example, Run
- >>> def accuracy(run: Run, example: Example):
- ... # Row-level evaluator for accuracy.
- ... pred = run.outputs["output"]
- ... expected = example.outputs["answer"]
- ... return {"score": expected.lower() == pred.lower()}
- >>> def precision(runs: Sequence[Run], examples: Sequence[Example]):
- ... # Experiment-level evaluator for precision.
- ... # TP / (TP + FP)
- ... predictions = [run.outputs["output"].lower() for run in runs]
- ... expected = [example.outputs["answer"].lower() for example in examples]
- ... # yes and no are the only possible answers
- ... tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"])
- ... fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)])
- ... return {"score": tp / (tp + fp)}
- Load the experiment and run the evaluation.
- >>> import uuid
- >>> from langsmith import Client
- >>> from langsmith.evaluation import evaluate, evaluate_existing
- >>> client = Client()
- >>> dataset_name = "__doctest_evaluate_existing_" + uuid.uuid4().hex[:8]
- >>> dataset = client.create_dataset(dataset_name)
- >>> example = client.create_example(
- ... inputs={"question": "What is 2+2?"},
- ... outputs={"answer": "4"},
- ... dataset_id=dataset.id,
- ... )
- >>> def predict(inputs: dict) -> dict:
- ... return {"output": "4"}
- >>> # First run inference on the dataset
- ... results = evaluate(
- ... predict, data=dataset_name, experiment_prefix="doctest_experiment"
- ... ) # doctest: +ELLIPSIS
- View the evaluation results for experiment:...
- >>> experiment_id = results.experiment_name
- >>> # Wait for the experiment to be fully processed and check if we have results
- >>> len(results) > 0
- True
- >>> import time
- >>> time.sleep(2)
- >>> results = evaluate_existing(
- ... experiment_id,
- ... evaluators=[accuracy],
- ... summary_evaluators=[precision],
- ... ) # doctest: +ELLIPSIS
- View the evaluation results for experiment:...
- >>> client.delete_dataset(dataset_id=dataset.id)
- """ # noqa: E501
- client = client or rt.get_cached_client(timeout_ms=(20_000, 90_001))
- project = _load_experiment(experiment, client)
- runs = _load_traces(experiment, client, load_nested=load_nested)
- data_map = _load_examples_map(client, project)
- data = [data_map[cast(uuid.UUID, run.reference_example_id)] for run in runs]
- return _evaluate(
- runs,
- data=data,
- evaluators=evaluators,
- summary_evaluators=summary_evaluators,
- metadata=metadata,
- max_concurrency=max_concurrency,
- client=client,
- blocking=blocking,
- experiment=project,
- )
- class ExperimentResultRow(TypedDict):
- run: schemas.Run
- example: schemas.Example
- evaluation_results: EvaluationResults
- class ExperimentResults:
- """Represents the results of an evaluate() call.
- This class provides an iterator interface to iterate over the experiment results
- as they become available. It also provides methods to access the experiment name,
- the number of results, and to wait for the results to be processed.
- Methods:
- experiment_name() -> str: Returns the name of the experiment.
- wait() -> None: Waits for the experiment data to be processed.
- """
- def __init__(self, experiment_manager: _ExperimentManager, blocking: bool = True):
- self._manager = experiment_manager
- self._results: list[ExperimentResultRow] = []
- self._queue: queue.Queue[ExperimentResultRow] = queue.Queue()
- self._processing_complete = threading.Event()
- if not blocking:
- self._thread: Optional[threading.Thread] = threading.Thread(
- target=self._process_data
- )
- self._thread.start()
- else:
- self._thread = None
- self._process_data()
- @property
- def experiment_name(self) -> str:
- return self._manager.experiment_name
- def __iter__(self) -> Iterator[ExperimentResultRow]:
- ix = 0
- while (
- not self._processing_complete.is_set()
- or not self._queue.empty()
- or ix < len(self._results)
- ):
- try:
- if ix < len(self._results):
- yield self._results[ix]
- ix += 1
- else:
- self._queue.get(block=True, timeout=0.1)
- except queue.Empty:
- continue
- def _process_data(self) -> None:
- tqdm = _load_tqdm()
- results = self._manager.get_results()
- for item in tqdm(results):
- self._queue.put(item)
- self._results.append(item)
- summary_scores = self._manager.get_summary_scores()
- self._summary_results = summary_scores
- self._processing_complete.set()
- def __len__(self) -> int:
- return len(self._results)
- def to_pandas(
- self, start: Optional[int] = 0, end: Optional[int] = None
- ) -> DataFrame:
- return _to_pandas(self._results, start=start, end=end)
- def _repr_html_(self) -> str:
- import importlib.util
- if self._results and importlib.util.find_spec("pandas"):
- df = self.to_pandas()
- return df._repr_html_() # type: ignore[operator]
- else:
- return self.__repr__()
- def __repr__(self) -> str:
- return f"<ExperimentResults {self.experiment_name}>"
- def wait(self) -> None:
- """Wait for the evaluation runner to complete.
- This method blocks the current thread until the evaluation runner has
- finished its execution.
- """
- if self._thread:
- self._thread.join()
- ## Public API for Comparison Experiments
- # Row-level evaluator
- COMPARATIVE_EVALUATOR_T = Callable[
- [Sequence[schemas.Run], Optional[schemas.Example]],
- Union[
- Union[ComparisonEvaluationResult, dict],
- Awaitable[Union[ComparisonEvaluationResult, dict]],
- ],
- ]
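- # A minimal illustrative sketch of a comparative evaluator matching
- # COMPARATIVE_EVALUATOR_T: it receives one run per experiment for a single
- # example and returns per-run scores keyed by run ID, mirroring the
- # docstring examples in evaluate_comparative below.
- #
- #     def prefer_shorter(runs: Sequence[schemas.Run], example: Optional[schemas.Example]):
- #         a, b = runs
- #         shorter = a if len(str(a.outputs)) <= len(str(b.outputs)) else b
- #         return {"key": "prefer_shorter", "scores": {a.id: int(shorter is a), b.id: int(shorter is b)}}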
- def evaluate_comparative(
- experiments: tuple[EXPERIMENT_T, EXPERIMENT_T],
- /,
- evaluators: Sequence[COMPARATIVE_EVALUATOR_T],
- experiment_prefix: Optional[str] = None,
- description: Optional[str] = None,
- max_concurrency: int = 5,
- client: Optional[langsmith.Client] = None,
- metadata: Optional[dict] = None,
- load_nested: bool = False,
- randomize_order: bool = False,
- ) -> ComparativeExperimentResults:
- r"""Evaluate existing experiment runs against each other.
- This lets you use pairwise preference scoring to generate more
- reliable feedback in your experiments.
- Args:
- experiments (Tuple[Union[str, uuid.UUID], Union[str, uuid.UUID]]):
- The identifiers of the experiments to compare.
- evaluators (Sequence[COMPARATIVE_EVALUATOR_T]):
- A list of evaluators to run on each example.
- experiment_prefix (Optional[str]): A prefix to provide for your experiment name.
- description (Optional[str]): A free-form text description for the experiment.
- max_concurrency (int): The maximum number of concurrent evaluations to run.
- client (Optional[langsmith.Client]): The LangSmith client to use.
- metadata (Optional[dict]): Metadata to attach to the experiment.
- load_nested (bool): Whether to load all child runs for the experiment.
- Default is to only load the top-level root runs.
- randomize_order (bool): Whether to randomize the order of the outputs for each evaluation.
- Returns:
- The results of the comparative evaluation.
- Examples:
- Suppose you want to compare two prompts to see which one is more effective.
- You would first prepare your dataset:
- >>> from typing import Sequence
- >>> from langsmith import Client
- >>> from langsmith.evaluation import evaluate
- >>> from langsmith.schemas import Example, Run
- >>> client = Client()
- >>> dataset = client.clone_public_dataset(
- ... "https://smith.langchain.com/public/419dcab2-1d66-4b94-8901-0357ead390df/d"
- ... )
- >>> dataset_name = "Evaluate Examples"
- Then you would run your different prompts:
- >>> import functools
- >>> import openai
- >>> from langsmith.evaluation import evaluate
- >>> from langsmith.wrappers import wrap_openai
- >>> oai_client = openai.Client()
- >>> wrapped_client = wrap_openai(oai_client)
- >>> prompt_1 = "You are a helpful assistant."
- >>> prompt_2 = "You are an exceedingly helpful assistant."
- >>> def predict(inputs: dict, prompt: str) -> dict:
- ... completion = wrapped_client.chat.completions.create(
- ... model="gpt-4o-mini",
- ... messages=[
- ... {"role": "system", "content": prompt},
- ... {
- ... "role": "user",
- ... "content": f"Context: {inputs['context']}"
- ... f"\n\ninputs['question']",
- ... },
- ... ],
- ... )
- ... return {"output": completion.choices[0].message.content}
- >>> results_1 = evaluate(
- ... functools.partial(predict, prompt=prompt_1),
- ... data=dataset_name,
- ... description="Evaluating our basic system prompt.",
- ... blocking=False, # Run these experiments in parallel
- ... ) # doctest: +ELLIPSIS
- View the evaluation results for experiment:...
- >>> results_2 = evaluate(
- ... functools.partial(predict, prompt=prompt_2),
- ... data=dataset_name,
- ... description="Evaluating our advanced system prompt.",
- ... blocking=False,
- ... ) # doctest: +ELLIPSIS
- View the evaluation results for experiment:...
- >>> results_1.wait()
- >>> results_2.wait()
- Finally, you would compare the two prompts directly:
- >>> import json
- >>> from langsmith.evaluation import evaluate_comparative
- >>> from langsmith import schemas
- >>> def score_preferences(runs: list, example: schemas.Example):
- ... assert len(runs) == 2 # Comparing 2 systems
- ... assert isinstance(example, schemas.Example)
- ... assert all(run.reference_example_id == example.id for run in runs)
- ... pred_a = runs[0].outputs["output"] if runs[0].outputs else ""
- ... pred_b = runs[1].outputs["output"] if runs[1].outputs else ""
- ... ground_truth = example.outputs["answer"] if example.outputs else ""
- ... tools = [
- ... {
- ... "type": "function",
- ... "function": {
- ... "name": "rank_preferences",
- ... "description": "Saves the prefered response ('A' or 'B')",
- ... "parameters": {
- ... "type": "object",
- ... "properties": {
- ... "reasoning": {
- ... "type": "string",
- ... "description": "The reasoning behind the choice.",
- ... },
- ... "preferred_option": {
- ... "type": "string",
- ... "enum": ["A", "B"],
- ... "description": "The preferred option, either 'A' or 'B'",
- ... },
- ... },
- ... "required": ["preferred_option"],
- ... },
- ... },
- ... }
- ... ]
- ... completion = openai.Client().chat.completions.create(
- ... model="gpt-4o-mini",
- ... messages=[
- ... {"role": "system", "content": "Select the better response."},
- ... {
- ... "role": "user",
- ... "content": f"Option A: {pred_a}"
- ... f"\n\nOption B: {pred_b}"
- ... f"\n\nGround Truth: {ground_truth}",
- ... },
- ... ],
- ... tools=tools,
- ... tool_choice={
- ... "type": "function",
- ... "function": {"name": "rank_preferences"},
- ... },
- ... )
- ... tool_args = completion.choices[0].message.tool_calls[0].function.arguments
- ... loaded_args = json.loads(tool_args)
- ... preference = loaded_args["preferred_option"]
- ... comment = loaded_args["reasoning"]
- ... if preference == "A":
- ... return {
- ... "key": "ranked_preference",
- ... "scores": {runs[0].id: 1, runs[1].id: 0},
- ... "comment": comment,
- ... }
- ... else:
- ... return {
- ... "key": "ranked_preference",
- ... "scores": {runs[0].id: 0, runs[1].id: 1},
- ... "comment": comment,
- ... }
- >>> def score_length_difference(runs: list, example: schemas.Example):
- ... # Just return whichever response is longer.
- ... # Just an example, not actually useful in real life.
- ... assert len(runs) == 2 # Comparing 2 systems
- ... assert isinstance(example, schemas.Example)
- ... assert all(run.reference_example_id == example.id for run in runs)
- ... pred_a = runs[0].outputs["output"] if runs[0].outputs else ""
- ... pred_b = runs[1].outputs["output"] if runs[1].outputs else ""
- ... if len(pred_a) > len(pred_b):
- ... return {
- ... "key": "length_difference",
- ... "scores": {runs[0].id: 1, runs[1].id: 0},
- ... }
- ... else:
- ... return {
- ... "key": "length_difference",
- ... "scores": {runs[0].id: 0, runs[1].id: 1},
- ... }
- >>> results = evaluate_comparative(
- ... [results_1.experiment_name, results_2.experiment_name],
- ... evaluators=[score_preferences, score_length_difference],
- ... client=client,
- ... ) # doctest: +ELLIPSIS
- View the pairwise evaluation results at:...
- >>> eval_results = list(results)
- >>> assert len(eval_results) >= 10 # doctest: +SKIP
- >>> assert all(
- ... "feedback.ranked_preference" in r["evaluation_results"]
- ... for r in eval_results
- ... ) # doctest: +SKIP
- >>> assert all(
- ... "feedback.length_difference" in r["evaluation_results"]
- ... for r in eval_results
- ... ) # doctest: +SKIP
- """ # noqa: E501
- if len(experiments) < 2:
- raise ValueError("Comparative evaluation requires at least 2 experiments.")
- if not evaluators:
- raise ValueError(
- "At least one evaluator is required for comparative evaluation."
- )
- if max_concurrency < 0:
- raise ValueError("max_concurrency must be a positive integer.")
- client = client or rt.get_cached_client()
- # TODO: Add information about comparison experiments
- projects = [_load_experiment(experiment, client) for experiment in experiments]
- ref_datasets_ = [str(p.reference_dataset_id) for p in projects]
- if not len(set(ref_datasets_)) == 1:
- raise ValueError("All experiments must have the same reference dataset.")
- experiment_ids = [p.id for p in projects]
- if experiment_prefix is None:
- experiment_names = [p.name for p in projects if p.name is not None]
- experiment_name = (
- " vs. ".join(experiment_names) + "-" + str(uuid.uuid4().hex[:4])
- )
- else:
- experiment_name = experiment_prefix + "-" + str(uuid.uuid4().hex[:8])
- comparative_experiment_id = uuid.uuid4()
- comparative_experiment = client.create_comparative_experiment(
- experiment_name,
- experiments=experiment_ids,
- description=description,
- metadata=metadata,
- id=comparative_experiment_id,
- )
- _print_comparative_experiment_start(
- cast(
- tuple[schemas.TracerSessionResult, schemas.TracerSessionResult],
- tuple(projects),
- ),
- comparative_experiment,
- )
- runs = [
- _load_traces(experiment, client, load_nested=load_nested)
- for experiment in experiments
- ]
- # Only evaluate examples that are present in every experiment (intersection of example IDs)
- examples_intersection = None
- for runs_list in runs:
- example_ids_set = {run.reference_example_id for run in runs_list}
- if examples_intersection is None:
- examples_intersection = example_ids_set
- else:
- examples_intersection &= example_ids_set
- example_ids_nullable = (
- list(examples_intersection) if examples_intersection is not None else []
- )
- example_ids = [eid for eid in example_ids_nullable if eid is not None]
- # TODO: Warn if different dataset versions, etc. are used in the different
- # experiments. We aren't providing any training wheels here.
- batch_size = 99
- data = {}
- for i in range(0, len(example_ids), batch_size):
- example_ids_batch = example_ids[i : i + batch_size]
- for e in client.list_examples(
- dataset_id=projects[0].reference_dataset_id,
- as_of=projects[0].metadata.get("dataset_version"),
- example_ids=example_ids_batch,
- ):
- data[e.id] = e
- runs_dict: dict[uuid.UUID, list[schemas.Run]] = collections.defaultdict(list)
- for runs_list in runs:
- for run in runs_list:
- if run.reference_example_id in data:
- runs_dict[cast(uuid.UUID, run.reference_example_id)].append(run)
- comparators = [comparison_evaluator(evaluator) for evaluator in evaluators or []]
- results: dict = {}
- def evaluate_and_submit_feedback(
- runs_list: list[schemas.Run],
- example: schemas.Example,
- comparator: DynamicComparisonRunEvaluator,
- executor: cf.Executor,
- ) -> tuple[uuid.UUID, ComparisonEvaluationResult]:
- feedback_group_id = uuid.uuid4()
- if randomize_order:
- random.shuffle(runs_list)
- with rh.tracing_context(project_name="evaluators", client=client):
- result = comparator.compare_runs(runs_list, example)
- if client is None:
- raise ValueError("Client is required to submit feedback.")
- comments = (
- {str(rid): result.comment for rid in result.scores}
- if isinstance(result.comment, str)
- else (result.comment or {})
- )
- for run_id, score in result.scores.items():
- executor.submit(
- client.create_feedback,
- run_id=run_id,
- key=result.key,
- score=score,
- comment=comments.get(str(run_id)),
- comparative_experiment_id=comparative_experiment.id,
- source_run_id=result.source_run_id,
- feedback_group_id=feedback_group_id,
- )
- return example.id, result
- tqdm = _load_tqdm()
- with ls_utils.ContextThreadPoolExecutor(
- max_workers=max_concurrency or 1
- ) as executor:
- futures = []
- for example_id, runs_list in tqdm(runs_dict.items()):
- results[example_id] = {"runs": runs_list}
- for comparator in comparators:
- if max_concurrency > 1:
- future = executor.submit(
- evaluate_and_submit_feedback,
- runs_list,
- data[example_id],
- comparator,
- executor,
- )
- futures.append(future)
- else:
- _, result = evaluate_and_submit_feedback(
- runs_list, data[example_id], comparator, executor
- )
- results[example_id][f"feedback.{result.key}"] = result
- if futures:
- cf.wait(futures)
- for future in futures:
- example_id, result = future.result()
- results[example_id][f"feedback.{result.key}"] = result
- return ComparativeExperimentResults(results, data)
- class ComparativeExperimentResults:
- """Represents the results of an evaluate_comparative() call.
- This class provides an iterator interface to iterate over the experiment results
- as they become available. It also provides methods to access the experiment name,
- the number of results, and to wait for the results to be processed.
- Methods:
- experiment_name() -> str: Returns the name of the experiment.
- wait() -> None: Waits for the experiment data to be processed.
- """
- def __init__(
- self,
- results: dict,
- examples: Optional[dict[uuid.UUID, schemas.Example]] = None,
- ):
- self._results = results
- self._examples = examples
- def __getitem__(self, key):
- """Return the result associated with the given key."""
- return self._results[key]
- def __iter__(self):
- for key, value in self._results.items():
- yield {
- "example": self._examples[key] if self._examples else None,
- "evaluation_results": value,
- }
- ## Private API
- def _print_comparative_experiment_start(
- experiments: tuple[schemas.TracerSession, schemas.TracerSession],
- comparative_experiment: schemas.ComparativeExperiment,
- ) -> None:
- url = experiments[0].url or experiments[1].url
- if url:
- project_url = url.split("?")[0]
- dataset_id = comparative_experiment.reference_dataset_id
- base_url = project_url.split("/projects/p/")[0]
- comparison_url = (
- f"{base_url}/datasets/{dataset_id}/compare?"
- f"selectedSessions={'%2C'.join([str(e.id) for e in experiments])}"
- f"&comparativeExperiment={comparative_experiment.id}"
- )
- print( # noqa: T201
- f"View the pairwise evaluation results at:\n{comparison_url}\n\n"
- )
- def _is_callable(target: Union[TARGET_T, Iterable[schemas.Run], Runnable]) -> bool:
- return callable(target) or _is_langchain_runnable(target)
- def _evaluate(
- target: Union[TARGET_T, Iterable[schemas.Run], Runnable],
- /,
- data: DATA_T,
- evaluators: Optional[Sequence[EVALUATOR_T]] = None,
- summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
- metadata: Optional[dict] = None,
- experiment_prefix: Optional[str] = None,
- description: Optional[str] = None,
- max_concurrency: Optional[int] = None,
- num_repetitions: int = 1,
- client: Optional[langsmith.Client] = None,
- blocking: bool = True,
- experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None,
- upload_results: bool = True,
- error_handling: Literal["log", "ignore"] = "log",
- ) -> ExperimentResults:
- # Initialize the experiment manager.
- client = client or rt.get_cached_client()
- runs = None if _is_callable(target) else cast(Iterable[schemas.Run], target)
- experiment_, runs = _resolve_experiment(experiment, runs, client)
- manager = _ExperimentManager(
- data,
- client=client,
- metadata=metadata,
- experiment=experiment_ or experiment_prefix,
- description=description,
- num_repetitions=num_repetitions,
- # If provided, we don't need to create a new experiment.
- runs=runs,
- # Create or resolve the experiment.
- include_attachments=_include_attachments(target, evaluators),
- upload_results=upload_results,
- error_handling=error_handling,
- ).start()
- if cache_dir := ls_utils.get_cache_dir(None):
- cache_path = pathlib.Path(cache_dir) / f"{manager.dataset_id}.yaml"
- else:
- cache_path = None
- with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]):
- if _is_callable(target):
- # Add predictions to the experiment.
- manager = manager.with_predictions(
- cast(TARGET_T, target), max_concurrency=max_concurrency
- )
- if evaluators:
- # Apply evaluators to the predictions.
- manager = manager.with_evaluators(
- evaluators, max_concurrency=max_concurrency
- )
- if summary_evaluators:
- # Apply the experiment-level summary evaluators.
- manager = manager.with_summary_evaluators(summary_evaluators)
- # Start consuming the results.
- results = ExperimentResults(manager, blocking=blocking)
- return results
- def _is_uuid(value: str) -> bool:
- try:
- uuid.UUID(value)
- return True
- except ValueError:
- return False
- def _load_experiment(
- project: EXPERIMENT_T, client: langsmith.Client
- ) -> schemas.TracerSession:
- if isinstance(project, schemas.TracerSession):
- return project
- elif isinstance(project, uuid.UUID) or _is_uuid(project):
- return client.read_project(project_id=project)
- else:
- return client.read_project(project_name=project)
- def _load_traces(
- project: Union[str, uuid.UUID, schemas.TracerSession],
- client: langsmith.Client,
- load_nested: bool = False,
- ) -> list[schemas.Run]:
- """Load nested traces for a given project."""
- is_root = None if load_nested else True
- if isinstance(project, schemas.TracerSession):
- runs = client.list_runs(project_id=project.id, is_root=is_root)
- elif isinstance(project, uuid.UUID) or _is_uuid(project):
- runs = client.list_runs(project_id=project, is_root=is_root)
- else:
- runs = client.list_runs(project_name=project, is_root=is_root)
- if not load_nested:
- return list(runs)
- treemap: collections.defaultdict[uuid.UUID, list[schemas.Run]] = (
- collections.defaultdict(list)
- )
- results = []
- all_runs = {}
- for run in runs:
- if run.parent_run_id is not None:
- treemap[run.parent_run_id].append(run)
- else:
- results.append(run)
- all_runs[run.id] = run
- for run_id, child_runs in treemap.items():
- all_runs[run_id].child_runs = sorted(child_runs, key=lambda r: r.dotted_order)
- return results
- def _load_examples_map(
- client: langsmith.Client, project: schemas.TracerSession
- ) -> dict[uuid.UUID, schemas.Example]:
- return {
- e.id: e
- for e in client.list_examples(
- dataset_id=project.reference_dataset_id,
- as_of=project.metadata.get("dataset_version"),
- )
- }
- IT = TypeVar("IT")
- def _load_tqdm() -> Callable[[IT], IT]:
- try:
- from tqdm.auto import tqdm
- except ImportError:
- return lambda x: x
- return tqdm # type: ignore[return-value]
- ET = TypeVar("ET", bound="_ExperimentManagerMixin")
- class _ExperimentManagerMixin:
- def __init__(
- self,
- /,
- experiment: Optional[Union[schemas.TracerSession, str]],
- metadata: Optional[dict] = None,
- client: Optional[langsmith.Client] = None,
- description: Optional[str] = None,
- ):
- self.client = client or rt.get_cached_client()
- self._experiment: Optional[schemas.TracerSession] = None
- if experiment is None:
- self._experiment_name = _get_random_name()
- elif isinstance(experiment, str):
- self._experiment_name = experiment + "-" + str(uuid.uuid4().hex[:8])
- else:
- self._experiment_name = cast(str, experiment.name)
- self._experiment = experiment
- metadata = metadata or {}
- if not metadata.get("revision_id"):
- metadata = {
- "revision_id": ls_env.get_langchain_env_var_metadata().get(
- "revision_id"
- ),
- **metadata,
- }
- self._metadata = metadata or {}
- self._description = description
- @property
- def experiment_name(self) -> str:
- if self._experiment_name is not None:
- return self._experiment_name
- raise ValueError(
- "Experiment name not provided, and experiment not yet started."
- )
- def _get_experiment(self) -> schemas.TracerSession:
- if self._experiment is None:
- raise ValueError("Experiment not started yet.")
- return self._experiment
- def _get_experiment_metadata(self):
- project_metadata = self._metadata or {}
- project_metadata["__ls_runner"] = "py_sdk_evaluate"
- git_info = ls_env.get_git_info()
- if git_info:
- project_metadata = {
- **project_metadata,
- "git": git_info,
- }
- if self._experiment:
- project_metadata = {
- **self._experiment.metadata,
- **project_metadata,
- }
- return project_metadata
- def _create_experiment(
- self, dataset_id: uuid.UUID, metadata: dict
- ) -> schemas.TracerSession:
- # There is a chance of name collision, so we'll retry
- starting_name = self._experiment_name
- num_attempts = 10
- for _ in range(num_attempts):
- try:
- return self.client.create_project(
- self._experiment_name,
- description=self._description,
- reference_dataset_id=dataset_id,
- metadata=metadata,
- )
- except ls_utils.LangSmithConflictError:
- self._experiment_name = f"{starting_name}-{str(uuid.uuid4().hex[:6])}"
- raise ValueError(
- f"Could not find a unique experiment name in {num_attempts} attempts."
- " Please try again with a different experiment name."
- )
- def _get_project(self, first_example: schemas.Example) -> schemas.TracerSession:
- if self._experiment is None:
- project_metadata = self._get_experiment_metadata()
- project = self._create_experiment(
- first_example.dataset_id, project_metadata
- )
- else:
- project = self._experiment
- return project
- def _print_experiment_start(
- self, project: Optional[schemas.TracerSession], first_example: schemas.Example
- ) -> None:
- if project and project.url:
- # TODO: Make this a public API
- project_url = project.url.split("?")[0]
- dataset_id = first_example.dataset_id
- base_url = project_url.split("/projects/p/")[0]
- comparison_url = (
- f"{base_url}/datasets/{dataset_id}/compare?"
- f"selectedSessions={project.id}"
- )
- print( # noqa: T201
- f"View the evaluation results for experiment: '{self.experiment_name}'"
- f" at:\n{comparison_url}\n\n"
- )
- else:
- # HACKHACK
- print( # noqa: T201
- "Starting evaluation of experiment: %s", self.experiment_name
- )
- class _ExperimentManager(_ExperimentManagerMixin):
- """Manage the execution of experiments.
- Supports lazily running predictions and evaluations in parallel to facilitate
- result streaming and early debugging.
- Args:
- data (DATA_T): The data used for the experiment. Can be a dataset name or ID OR
- a generator of examples.
- num_repetitions (int): The number of times to run over the data.
- runs (Optional[Iterable[schemas.Run]]): The runs associated with the experiment
- predictions.
- experiment (Optional[schemas.TracerSession]): The tracer session
- associated with the experiment.
- experiment_prefix (Optional[str]): The prefix for the experiment name.
- metadata (Optional[dict]): Additional metadata for the experiment.
- client (Optional[langsmith.Client]): The Langsmith client used for
- the experiment.
- evaluation_results (Optional[Iterable[EvaluationResults]]): The evaluation
- results for the experiment.
- summary_results (Optional[Iterable[EvaluationResults]]): The aggregate results
- for the experiment.
- """
- def __init__(
- self,
- data: DATA_T,
- /,
- experiment: Optional[Union[schemas.TracerSession, str]],
- metadata: Optional[dict] = None,
- client: Optional[langsmith.Client] = None,
- runs: Optional[Iterable[schemas.Run]] = None,
- evaluation_results: Optional[Iterable[EvaluationResults]] = None,
- summary_results: Optional[Iterable[EvaluationResults]] = None,
- description: Optional[str] = None,
- num_repetitions: int = 1,
- include_attachments: bool = False,
- reuse_attachments: bool = False,
- upload_results: bool = True,
- attachment_raw_data_dict: Optional[dict] = None,
- error_handling: Literal["log", "ignore"] = "log",
- ):
- super().__init__(
- experiment=experiment,
- metadata=metadata,
- client=client,
- description=description,
- )
- self._data = data
- self._examples: Optional[Iterable[schemas.Example]] = None
- self._runs = runs
- self._evaluation_results = evaluation_results
- self._summary_results = summary_results
- self._num_repetitions = num_repetitions
- self._include_attachments = include_attachments
- self._reuse_attachments = reuse_attachments
- self._upload_results = upload_results
- self._attachment_raw_data_dict = attachment_raw_data_dict
- self._error_handling = error_handling
- def _reset_example_attachment_readers(
- self, example: schemas.Example
- ) -> schemas.Example:
- """Reset attachment readers for an example.
- This is only in the case that an attachment is going to be used by more
- than 1 callable (target + evaluators). In that case we keep a single copy
- of the attachment data in `self._attachment_raw_data_dict`, and create
- readers from that data. This makes it so that we don't have to keep
- copies of the same data in memory, instead we can just create readers
- from the same data.
- """
- if not hasattr(example, "attachments") or not example.attachments:
- return example
- new_attachments: dict[str, schemas.AttachmentInfo] = {}
- for name, attachment in example.attachments.items():
- if (
- self._attachment_raw_data_dict is not None
- and str(example.id) + name in self._attachment_raw_data_dict
- ):
- new_attachments[name] = {
- "presigned_url": attachment["presigned_url"],
- "reader": io.BytesIO(
- self._attachment_raw_data_dict[str(example.id) + name]
- ),
- "mime_type": attachment["mime_type"],
- }
- else:
- new_attachments[name] = attachment
- # Create a new Example instance with the updated attachments
- return schemas.Example(
- id=example.id,
- created_at=example.created_at,
- dataset_id=example.dataset_id,
- inputs=example.inputs,
- outputs=example.outputs,
- metadata=example.metadata,
- modified_at=example.modified_at,
- source_run_id=example.source_run_id,
- attachments=new_attachments,
- _host_url=example._host_url,
- _tenant_id=example._tenant_id,
- )
- @property
- def examples(self) -> Iterable[schemas.Example]:
- if self._examples is None:
- self._examples = _resolve_data(
- self._data,
- client=self.client,
- include_attachments=self._include_attachments,
- )
- if self._reuse_attachments and self._attachment_raw_data_dict is None:
- examples_copy, self._examples = itertools.tee(self._examples)
- self._attachment_raw_data_dict = {
- str(e.id) + name: value["reader"].read()
- for e in examples_copy
- for name, value in (e.attachments or {}).items()
- }
- if self._num_repetitions > 1:
- examples_list = list(self._examples)
- self._examples = itertools.chain.from_iterable(
- [
- self._reset_example_attachment_readers(example)
- for example in examples_list
- ]
- for _ in range(self._num_repetitions)
- )
- self._examples, examples_iter = itertools.tee(self._examples)
- return examples_iter
- @property
- def dataset_id(self) -> str:
- if self._experiment is None or not getattr(
- self._experiment, "reference_dataset_id", None
- ):
- example = next(iter(self.examples))
- return str(example.dataset_id)
- return str(
- cast(schemas.TracerSessionResult, self._experiment).reference_dataset_id
- )
- @property
- def evaluation_results(self) -> Iterable[EvaluationResults]:
- if self._evaluation_results is None:
- return ({"results": []} for _ in self.examples)
- return self._evaluation_results
- @property
- def runs(self) -> Iterable[schemas.Run]:
- if self._runs is None:
- raise ValueError(
- "Runs not provided in this experiment. Please predict first."
- )
- self._runs, runs_iter = itertools.tee(self._runs)
- return runs_iter
- def start(self) -> _ExperimentManager:
- first_example = next(itertools.islice(self.examples, 1))
- project = self._get_project(first_example) if self._upload_results else None
- self._print_experiment_start(project, first_example)
- self._metadata["num_repetitions"] = self._num_repetitions
- return self._copy(self.examples, experiment=project)
- def with_predictions(
- self,
- target: TARGET_T,
- /,
- max_concurrency: Optional[int] = None,
- ) -> _ExperimentManager:
- """Lazily apply the target function to the experiment."""
- context = copy_context()
- _experiment_results = context.run(
- self._predict,
- target,
- max_concurrency=max_concurrency,
- include_attachments=_target_include_attachments(target),
- )
- r1, r2 = itertools.tee(_experiment_results, 2)
- return self._copy(
- (pred["example"] for pred in r1), runs=(pred["run"] for pred in r2)
- )
- def with_evaluators(
- self,
- evaluators: Sequence[
- Union[
- EVALUATOR_T,
- RunEvaluator,
- ]
- ],
- *,
- max_concurrency: Optional[int] = None,
- ) -> _ExperimentManager:
- """Lazily apply the provided evaluators to the experiment."""
- evaluators = _resolve_evaluators(evaluators)
- context = copy_context()
- experiment_results = context.run(
- self._score, evaluators, max_concurrency=max_concurrency
- )
- # Split the generator into three so the manager
- # can consume each value individually.
- r1, r2, r3 = itertools.tee(experiment_results, 3)
- return self._copy(
- (result["example"] for result in r1),
- runs=(result["run"] for result in r2),
- evaluation_results=(result["evaluation_results"] for result in r3),
- )
- def with_summary_evaluators(
- self,
- summary_evaluators: Sequence[SUMMARY_EVALUATOR_T],
- ) -> _ExperimentManager:
- """Lazily apply the provided summary evaluators to the experiment."""
- wrapped_evaluators = _wrap_summary_evaluators(summary_evaluators)
- context = copy_context()
- aggregate_feedback_gen = context.run(
- self._apply_summary_evaluators, wrapped_evaluators
- )
- return self._copy(
- self.examples, runs=self.runs, summary_results=aggregate_feedback_gen
- )
- def get_results(self) -> Iterable[ExperimentResultRow]:
- """Return the traces, evaluation results, and associated examples."""
- for run, example, evaluation_results in zip(
- self.runs, self.examples, self.evaluation_results
- ):
- yield ExperimentResultRow(
- run=run,
- example=example,
- evaluation_results=evaluation_results,
- )
- def get_summary_scores(self) -> dict[str, list[dict]]:
- """If `summary_evaluators` were applied, consume and return the results."""
- if self._summary_results is None:
- return {"results": []}
- # Consume the generator
- return {
- "results": [
- res # type: ignore[misc]
- for results in self._summary_results
- for res in results["results"]
- ]
- }
- # Private methods
- def _predict(
- self,
- target: TARGET_T,
- /,
- max_concurrency: Optional[int] = None,
- include_attachments: bool = False,
- ) -> Generator[_ForwardResults, None, None]:
- """Run the target function on the examples."""
- fn = _ensure_traceable(target)
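- # max_concurrency == 0 runs the target serially in the caller's thread;
- # otherwise predictions are fanned out to a context-propagating thread pool.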
- if max_concurrency == 0:
- for example in self.examples:
- yield _forward(
- fn,
- example,
- self.experiment_name,
- self._metadata,
- self.client,
- self._upload_results,
- include_attachments,
- self._error_handling,
- )
- else:
- with ls_utils.ContextThreadPoolExecutor(max_concurrency) as executor:
- futures = [
- executor.submit(
- _forward,
- fn,
- example,
- self.experiment_name,
- self._metadata,
- self.client,
- self._upload_results,
- include_attachments,
- self._error_handling,
- )
- for example in self.examples
- ]
- for future in cf.as_completed(futures):
- yield future.result()
- # Close out the project.
- self._end()
- def _run_evaluators(
- self,
- evaluators: Sequence[RunEvaluator],
- current_results: ExperimentResultRow,
- executor: cf.ThreadPoolExecutor,
- ) -> ExperimentResultRow:
- current_context = rh.get_tracing_context()
- metadata = {
- **(current_context["metadata"] or {}),
- "experiment": self.experiment_name,
- "reference_example_id": current_results["example"].id,
- "reference_run_id": current_results["run"].id,
- }
- with rh.tracing_context(
- **{
- **current_context,
- "project_name": "evaluators",
- "metadata": metadata,
- "enabled": "local" if not self._upload_results else True,
- "client": self.client,
- }
- ):
- run = current_results["run"]
- example = current_results["example"]
- eval_results = current_results["evaluation_results"]
- for evaluator in evaluators:
- evaluator_run_id = uuid.uuid4()
- try:
- evaluator_response = evaluator.evaluate_run( # type: ignore[call-arg]
- run=run,
- example=example,
- evaluator_run_id=evaluator_run_id,
- )
- eval_results["results"].extend(
- self.client._select_eval_results(evaluator_response)
- )
- if self._upload_results:
- # TODO: This is a hack
- self.client._log_evaluation_feedback(
- evaluator_response, run=run, _executor=executor
- )
- except Exception as e:
- try:
- feedback_keys = _extract_feedback_keys(evaluator)
- error_response = EvaluationResults(
- results=[
- EvaluationResult(
- key=key,
- source_run_id=evaluator_run_id,
- comment=repr(e),
- extra={"error": True},
- )
- for key in feedback_keys
- ]
- )
- eval_results["results"].extend(
- self.client._select_eval_results(error_response)
- )
- if self._upload_results:
- # TODO: This is a hack
- self.client._log_evaluation_feedback(
- error_response, run=run, _executor=executor
- )
- except Exception as e2:
- logger.debug(f"Error parsing feedback keys: {e2}")
- logger.error(
- f"Error running evaluator {repr(evaluator)} on"
- f" run {run.id if run else ''}: {repr(e)}",
- exc_info=True,
- )
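- # Rewind attachment readers so the same bytes can be read again downstream.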
- if example.attachments is not None:
- for attachment in example.attachments:
- reader = example.attachments[attachment]["reader"]
- reader.seek(0)
- return ExperimentResultRow(
- run=run,
- example=example,
- evaluation_results=eval_results,
- )
- def _score(
- self,
- evaluators: Sequence[RunEvaluator],
- max_concurrency: Optional[int] = None,
- ) -> Iterable[ExperimentResultRow]:
- """Run the evaluators on the prediction stream.
- Expects runs to be available in the manager.
- (e.g. from a previous prediction step)
- """
- with ls_utils.ContextThreadPoolExecutor(
- max_workers=max_concurrency or 1
- ) as executor:
- if max_concurrency == 0:
- context = copy_context()
- for current_results in self.get_results():
- yield context.run(
- self._run_evaluators,
- evaluators,
- current_results,
- executor,
- )
- else:
- futures = set()
- for current_results in self.get_results():
- futures.add(
- executor.submit(
- self._run_evaluators,
- evaluators,
- current_results,
- executor,
- )
- )
- try:
- # Since prediction may be slow, yield (with a timeout) to
- # allow for early results to be emitted.
- for future in cf.as_completed(futures, timeout=0.001):
- yield future.result()
- futures.remove(future)
- except (cf.TimeoutError, TimeoutError):
- pass
- for future in cf.as_completed(futures):
- result = future.result()
- yield result
- def _apply_summary_evaluators(
- self, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T]
- ) -> Generator[EvaluationResults, None, None]:
- runs, examples = [], []
- for run, example in zip(self.runs, self.examples):
- runs.append(run)
- examples.append(example)
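- # Summary evaluators receive the full lists of runs and examples at once,
- # rather than one row at a time.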
- aggregate_feedback = []
- with ls_utils.ContextThreadPoolExecutor() as executor:
- project_id = self._get_experiment().id if self._upload_results else None
- current_context = rh.get_tracing_context()
- metadata = {
- **(current_context["metadata"] or {}),
- "experiment": self.experiment_name,
- "experiment_id": project_id,
- }
- with rh.tracing_context(
- **{
- **current_context,
- "project_name": "evaluators",
- "metadata": metadata,
- "client": self.client,
- "enabled": "local" if not self._upload_results else True,
- }
- ):
- for evaluator in summary_evaluators:
- try:
- summary_eval_result = evaluator(runs, examples)
- # TODO: Expose public API for this.
- flattened_results = self.client._select_eval_results(
- summary_eval_result,
- fn_name=evaluator.__name__,
- )
- aggregate_feedback.extend(flattened_results)
- if self._upload_results:
- for result in flattened_results:
- feedback = result.dict(exclude={"target_run_id"})
- evaluator_info = feedback.pop("evaluator_info", None)
- executor.submit(
- self.client.create_feedback,
- **feedback,
- run_id=None,
- project_id=project_id,
- source_info=evaluator_info,
- )
- except Exception as e:
- logger.error(
- f"Error running summary evaluator {repr(evaluator)}: {e}",
- exc_info=True,
- )
- yield {"results": aggregate_feedback}
- def _get_dataset_version(self) -> Optional[str]:
- examples = list(self.examples)
- modified_at = [ex.modified_at for ex in examples if ex.modified_at]
- # Should always be defined in practice when fetched,
- # but the typing permits None
- max_modified_at = max(modified_at) if modified_at else None
- return max_modified_at.isoformat() if max_modified_at else None
- def _get_dataset_splits(self) -> Optional[list[str]]:
- examples = list(self.examples)
- splits = set()
- for example in examples:
- if (
- example.metadata
- and example.metadata.get("dataset_split")
- and isinstance(example.metadata["dataset_split"], list)
- ):
- for split in example.metadata["dataset_split"]:
- if isinstance(split, str):
- splits.add(split)
- else:
- splits.add("base")
- return list(splits)
- def _end(self) -> None:
- if not self._upload_results:
- return
- experiment = self._experiment
- if experiment is None:
- raise ValueError("Experiment not started yet.")
- project_metadata = self._get_experiment_metadata()
- project_metadata["dataset_version"] = self._get_dataset_version()
- project_metadata["dataset_splits"] = self._get_dataset_splits()
- self.client.update_project(
- experiment.id,
- metadata={
- **experiment.metadata,
- **project_metadata,
- },
- )
- def _copy(self, *args: Any, **kwargs: Any) -> _ExperimentManager:
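- """Build a new manager that carries forward the current state; positional args and keyword overrides replace the corresponding defaults."""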
- default_args = (self._data,)
- default_kwargs = {
- "experiment": self._experiment,
- "metadata": self._metadata,
- "runs": self._runs,
- "client": self.client,
- "evaluation_results": self._evaluation_results,
- "summary_results": self._summary_results,
- "include_attachments": self._include_attachments,
- "reuse_attachments": self._reuse_attachments,
- "upload_results": self._upload_results,
- "attachment_raw_data_dict": self._attachment_raw_data_dict,
- "error_handling": self._error_handling,
- }
- full_args = list(args) + list(default_args[len(args) :])
- full_kwargs = {**default_kwargs, **kwargs}
- return self.__class__(*full_args, **full_kwargs)
- def _resolve_evaluators(
- evaluators: Sequence[Union[EVALUATOR_T, RunEvaluator, AEVALUATOR_T]],
- ) -> Sequence[RunEvaluator]:
- results = []
- for evaluator in evaluators:
- if isinstance(evaluator, RunEvaluator):
- results.append(evaluator)
- elif isinstance(evaluator, LangChainStringEvaluator):
- results.append(evaluator.as_run_evaluator())
- else:
- results.append(run_evaluator(evaluator))
- return results
- def _wrap_summary_evaluators(
- evaluators: Sequence[SUMMARY_EVALUATOR_T],
- ) -> list[SUMMARY_EVALUATOR_T]:
- def _wrap(evaluator: SUMMARY_EVALUATOR_T) -> SUMMARY_EVALUATOR_T:
- eval_name = getattr(evaluator, "__name__", "BatchEvaluator")
- evaluator = _normalize_summary_evaluator(evaluator)
- @functools.wraps(evaluator)
- def _wrapper_inner(
- runs: Sequence[schemas.Run], examples: Sequence[schemas.Example]
- ) -> Union[EvaluationResult, EvaluationResults]:
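- # The traceable inner function logs short string summaries as its inputs,
- # while the real runs/examples are captured from the enclosing scope.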
- @rh.traceable(name=eval_name)
- def _wrapper_super_inner(
- runs_: str, examples_: str
- ) -> Union[EvaluationResult, EvaluationResults]:
- return evaluator(list(runs), list(examples))
- return _wrapper_super_inner(
- f"Runs[] (Length={len(runs)})", f"Examples[] (Length={len(examples)})"
- )
- return _wrapper_inner
- results = []
- for evaluator in evaluators:
- results.append(_wrap(evaluator))
- return results
- class _ForwardResults(TypedDict):
- run: schemas.Run
- example: schemas.Example
- def _forward(
- fn: rh.SupportsLangsmithExtra,
- example: schemas.Example,
- experiment_name: str,
- metadata: dict,
- client: langsmith.Client,
- upload_results: bool,
- include_attachments: bool = False,
- error_handling: Literal["log", "ignore"] = "log",
- ) -> _ForwardResults:
- run: Optional[schemas.RunBase] = None
- def _get_run(r: rt.RunTree) -> None:
- nonlocal run
- run = r
- def _set_reference_example_id(r: rt.RunTree) -> None:
- r.reference_example_id = example.id
- example_version = (example.modified_at or example.created_at).isoformat()
- langsmith_extra = rh.LangSmithExtra(
- on_end=_get_run,
- project_name=experiment_name,
- metadata={**metadata, "example_version": example_version},
- client=client,
- )
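- # With error_handling="log", failed runs are still linked to the example;
- # with "ignore", the link is attached only by the on-success callback.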
- if error_handling == "log":
- langsmith_extra["reference_example_id"] = example.id
- elif error_handling == "ignore":
- # Only set the reference_example_id if the run succeeds.
- langsmith_extra["_on_success"] = _set_reference_example_id
- else:
- raise ValueError(f"Unrecognized error_handling value: {error_handling=}")
- with rh.tracing_context(enabled="local" if not upload_results else True):
- try:
- arg_names = _get_target_args(fn)
- args = [getattr(example, argn) for argn in arg_names]
- fn(*args, langsmith_extra=langsmith_extra)
- # Reset attachment readers if attachments were used.
- if include_attachments and example.attachments is not None:
- for attachment in example.attachments:
- reader = example.attachments[attachment]["reader"]
- reader.seek(0)
- except Exception as e:
- logger.error(
- f"Error running target function: {e}", exc_info=True, stacklevel=1
- )
- return _ForwardResults(run=cast(schemas.Run, run), example=example)
- def _is_valid_uuid(value: str) -> bool:
- try:
- uuid.UUID(value)
- return True
- except ValueError:
- return False
- def _resolve_data(
- data: DATA_T,
- *,
- client: langsmith.Client,
- include_attachments: bool = False,
- ) -> Iterable[schemas.Example]:
- """Return the examples for the given dataset."""
- if isinstance(data, uuid.UUID):
- return client.list_examples(
- dataset_id=data, include_attachments=include_attachments
- )
- elif isinstance(data, str) and _is_valid_uuid(data):
- return client.list_examples(
- dataset_id=uuid.UUID(data), include_attachments=include_attachments
- )
- elif isinstance(data, str):
- return client.list_examples(
- dataset_name=data, include_attachments=include_attachments
- )
- elif isinstance(data, schemas.Dataset):
- return client.list_examples(
- dataset_id=data.id, include_attachments=include_attachments
- )
- return data
- def _ensure_traceable(
- target: TARGET_T | rh.SupportsLangsmithExtra[[dict], dict] | Runnable,
- ) -> rh.SupportsLangsmithExtra[[dict], dict]:
- """Ensure the target function is traceable."""
- if not _is_callable(target):
- raise ValueError(
- "Target must be a callable function or a langchain/langgraph object. For "
- "example:\n\n"
- "def predict(inputs: dict) -> dict:\n"
- " # do work, like chain.invoke(inputs)\n"
- " return {...}\n\n"
- "evaluate(\n"
- " predict,\n"
- " ...\n"
- ")"
- )
- if rh.is_traceable_function(target):
- fn: rh.SupportsLangsmithExtra[[dict], dict] = target
- else:
- if _is_langchain_runnable(target):
- target = target.invoke # type: ignore[union-attr]
- fn = rh.traceable(name="Target")(cast(Callable, target))
- return fn
- def _include_attachments(target: Any, evaluators: Optional[Sequence]) -> bool:
- return _target_include_attachments(target) or bool(
- _evaluators_include_attachments(evaluators)
- )
- def _evaluators_include_attachments(evaluators: Optional[Sequence]) -> int:
- if evaluators is None:
- return 0
- return sum(_evaluator_uses_attachments(e) for e in evaluators)
- def _evaluator_uses_attachments(evaluator: Any) -> bool:
- if not callable(evaluator):
- return False
- sig = inspect.signature(evaluator)
- params = list(sig.parameters.values())
- positional_params = [
- p for p in params if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
- ]
- return any(p.name == "attachments" for p in positional_params)
- def _target_include_attachments(target: Any) -> bool:
- """Whether the target function accepts attachments."""
- return "attachments" in _get_target_args(target)
- def _get_target_args(target: Any) -> list[str]:
- """Whether the target function accepts attachments."""
- if not callable(target):
- return []
- if _is_langchain_runnable(target):
- return ["inputs"]
- # Check function signature
- sig = inspect.signature(target)
- params = list(sig.parameters.values())
- positional_params = [
- p for p in params if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
- ]
- positional_no_default = [p for p in positional_params if p.default is p.empty]
- if len(positional_params) == 0:
- raise ValueError(
- "Target function must accept at least one positional argument (inputs)."
- )
- elif len(positional_no_default) > 3:
- raise ValueError(
- "Target function must accept at most three "
- "arguments without default values: (inputs, attachments, metadata)."
- )
- elif len(positional_no_default) > 1 and {
- p.name for p in positional_no_default
- }.difference(["inputs", "attachments", "metadata"]):
- raise ValueError(
- "When passing multiple positional arguments without default values, they "
- "must be named 'inputs', 'attachments', or 'metadata'. Received: "
- f"{[p.name for p in positional_no_default]}"
- )
- else:
- args = []
- for p in positional_params[:3]:
- if p.name in {"inputs", "attachments", "metadata"}:
- args.append(p.name)
- else:
- break
- return args or ["inputs"]
- def _resolve_experiment(
- experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]],
- runs: Optional[Iterable[schemas.Run]],
- client: langsmith.Client,
- ) -> tuple[
- Optional[Union[schemas.TracerSession, str]], Optional[Iterable[schemas.Run]]
- ]:
- # TODO: Remove this, handle outside the manager
- if experiment is not None:
- if isinstance(experiment, schemas.TracerSession):
- experiment_ = experiment
- else:
- experiment_ = _load_experiment(experiment, client)
- if not experiment_.name:
- raise ValueError("Experiment name must be defined if provided.")
- if not experiment_.reference_dataset_id:
- raise ValueError(
- "Experiment must have an associated reference_dataset_id, "
- "but none was provided."
- )
- return experiment_, runs
- # If we have runs, that means the experiment was already started.
- if runs is not None:
- runs_, runs = itertools.tee(runs)
- first_run = next(runs_)
- experiment_ = client.read_project(project_id=first_run.session_id)
- if not experiment_.name:
- raise ValueError("Experiment name not found for provided runs.")
- return experiment_, runs
- return None, None
- def _get_random_name() -> str:
- from langsmith.evaluation._name_generation import random_name # noqa: F401
- return random_name()
- def _extract_feedback_keys(evaluator: RunEvaluator) -> list[str]:
- if isinstance(evaluator, DynamicRunEvaluator):
- if getattr(evaluator, "func", None):
- return _extract_code_evaluator_feedback_keys(evaluator.func)
- elif getattr(evaluator, "afunc", None):
- return _extract_code_evaluator_feedback_keys(evaluator.afunc)
- # TODO: Support for DynamicComparisonRunEvaluator
- if hasattr(evaluator, "evaluator"):
- # LangChainStringEvaluator
- if getattr(getattr(evaluator, "evaluator"), "evaluation_name", None):
- return [evaluator.evaluator.evaluation_name]
- return []
- def _extract_code_evaluator_feedback_keys(func: Callable) -> list[str]:
- python_code = inspect.getsource(func)
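- # Walk the evaluator's AST for literal "key" values in returned dicts,
- # EvaluationResult(...) calls, and EvaluationResults(...); fall back to the
- # function name if none are found.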
- def extract_dict_keys(node):
- if isinstance(node, ast.Dict):
- keys = []
- key_value = None
- for key, value in zip(node.keys, node.values):
- if isinstance(key, (ast.Str, ast.Constant)):
- key_str = key.s if isinstance(key, ast.Str) else key.value
- if key_str == "key" and isinstance(value, (ast.Str, ast.Constant)):
- key_value = (
- value.s if isinstance(value, ast.Str) else value.value
- )
- return [key_value] if key_value else keys
- elif (
- isinstance(node, ast.Call)
- and isinstance(node.func, ast.Name)
- and node.func.id == "dict"
- ):
- for keyword in node.keywords:
- if keyword.arg == "key" and isinstance(
- keyword.value, (ast.Str, ast.Constant)
- ):
- return [
- (
- keyword.value.s
- if isinstance(keyword.value, ast.Str)
- else keyword.value.value
- )
- ]
- return []
- def extract_evaluation_result_key(node):
- if (
- isinstance(node, ast.Call)
- and isinstance(node.func, ast.Name)
- and node.func.id == "EvaluationResult"
- ):
- for keyword in node.keywords:
- if keyword.arg == "key" and isinstance(
- keyword.value, (ast.Str, ast.Constant)
- ):
- return [
- (
- keyword.value.s
- if isinstance(keyword.value, ast.Str)
- else keyword.value.value
- )
- ]
- return []
- def extract_evaluation_results_keys(node, variables):
- if (
- isinstance(node, ast.Call)
- and isinstance(node.func, ast.Name)
- and node.func.id == "EvaluationResults"
- ):
- for keyword in node.keywords:
- if keyword.arg == "results":
- if isinstance(keyword.value, ast.Name):
- return variables.get(keyword.value.id, [])
- elif isinstance(keyword.value, ast.List):
- keys = []
- for elt in keyword.value.elts:
- keys.extend(extract_evaluation_result_key(elt))
- return keys
- elif isinstance(node, ast.Dict):
- for key, value in zip(node.keys, node.values):
- if isinstance(key, (ast.Str, ast.Constant)) and key.s == "results":
- if isinstance(value, ast.List):
- keys = []
- for elt in value.elts:
- if isinstance(elt, ast.Dict):
- for elt_key, elt_value in zip(elt.keys, elt.values):
- if (
- isinstance(elt_key, (ast.Str, ast.Constant))
- and elt_key.s == "key"
- ):
- if isinstance(
- elt_value, (ast.Str, ast.Constant)
- ):
- keys.append(elt_value.s)
- elif (
- isinstance(elt, ast.Call)
- and isinstance(elt.func, ast.Name)
- and elt.func.id in ("EvaluationResult", "dict")
- ):
- for keyword in elt.keywords:
- if keyword.arg == "key" and isinstance(
- keyword.value, (ast.Str, ast.Constant)
- ):
- keys.append(
- keyword.value.s
- if isinstance(keyword.value, ast.Str)
- else keyword.value.value
- )
- return keys
- return []
- python_code = textwrap.dedent(python_code)
- try:
- tree = ast.parse(python_code)
- function_def = tree.body[0]
- if not isinstance(function_def, (ast.FunctionDef, ast.AsyncFunctionDef)):
- return []
- variables = {}
- keys = []
- for node in ast.walk(function_def):
- if isinstance(node, ast.Assign):
- if isinstance(node.value, ast.List):
- list_keys = []
- for elt in node.value.elts:
- list_keys.extend(extract_evaluation_result_key(elt))
- if isinstance(node.targets[0], ast.Name):
- variables[node.targets[0].id] = list_keys
- elif isinstance(node, ast.Return) and node.value is not None:
- dict_keys = extract_dict_keys(node.value)
- eval_result_key = extract_evaluation_result_key(node.value)
- eval_results_keys = extract_evaluation_results_keys(
- node.value, variables
- )
- keys.extend(dict_keys)
- keys.extend(eval_result_key)
- keys.extend(eval_results_keys)
- # If no keys found, return the function name
- return keys if keys else [function_def.name]
- except SyntaxError:
- return []
- def _to_pandas(
- results: list[ExperimentResultRow],
- start: Optional[int] = 0,
- end: Optional[int] = None,
- ):
- try:
- import pandas as pd
- except ImportError as e:
- raise ImportError(
- "The 'pandas' library is required to use the 'to_pandas' function. "
- "Please install it using 'pip install pandas' or "
- "'conda install pandas' before calling this method."
- ) from e
- return pd.DataFrame(_flatten_experiment_results(results, start=start, end=end))
- def _flatten_experiment_results(
- results: list[ExperimentResultRow],
- start: Optional[int] = 0,
- end: Optional[int] = None,
- ):
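- # Flatten each row into prefixed columns (inputs.*, outputs.*, reference.*,
- # feedback.*) plus error, execution_time, example_id, and id.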
- return [
- {
- **{f"inputs.{k}": v for k, v in (x["example"].inputs or {}).items()},
- **{f"outputs.{k}": v for k, v in (x["run"].outputs or {}).items()},
- "error": x["run"].error,
- **(
- {f"reference.{k}": v for k, v in x["example"].outputs.items()}
- if x["example"].outputs is not None
- else {}
- ),
- **{
- f"feedback.{r.key}": r.score if r.score is not None else r.value
- for r in x["evaluation_results"]["results"]
- },
- "execution_time": (
- (x["run"].end_time - x["run"].start_time).total_seconds()
- if x["run"].end_time
- else None
- ),
- "example_id": x["run"].reference_example_id,
- "id": x["run"].id,
- }
- for x in results[start:end]
- ]
- @functools.lru_cache(maxsize=1)
- def _import_langchain_runnable() -> Optional[type]:
- try:
- from langchain_core.runnables import Runnable
- return Runnable
- except ImportError:
- return None
- def _is_langchain_runnable(o: Any) -> bool:
- return bool((Runnable := _import_langchain_runnable()) and isinstance(o, Runnable))