# _runner.py
  1. """V2 Evaluation Interface."""
  2. from __future__ import annotations
  3. import ast
  4. import collections
  5. import concurrent.futures as cf
  6. import functools
  7. import inspect
  8. import io
  9. import itertools
  10. import logging
  11. import pathlib
  12. import queue
  13. import random
  14. import textwrap
  15. import threading
  16. import uuid
  17. from collections.abc import Awaitable, Generator, Iterable, Iterator, Sequence
  18. from contextvars import copy_context
  19. from typing import (
  20. TYPE_CHECKING,
  21. Any,
  22. Callable,
  23. Literal,
  24. Optional,
  25. TypeVar,
  26. Union,
  27. cast,
  28. )
  29. from typing_extensions import TypedDict, overload
  30. import langsmith
  31. from langsmith import env as ls_env
  32. from langsmith import run_helpers as rh
  33. from langsmith import run_trees as rt
  34. from langsmith import schemas
  35. from langsmith import utils as ls_utils
  36. from langsmith._internal._beta_decorator import _warn_once
  37. from langsmith.evaluation.evaluator import (
  38. SUMMARY_EVALUATOR_T,
  39. ComparisonEvaluationResult,
  40. DynamicComparisonRunEvaluator,
  41. DynamicRunEvaluator,
  42. EvaluationResult,
  43. EvaluationResults,
  44. RunEvaluator,
  45. _normalize_summary_evaluator,
  46. comparison_evaluator,
  47. run_evaluator,
  48. )
  49. from langsmith.evaluation.integrations import LangChainStringEvaluator
  50. if TYPE_CHECKING:
  51. import pandas as pd
  52. from langchain_core.runnables import Runnable
  53. DataFrame = pd.DataFrame
  54. else:
  55. DataFrame = Any
  56. logger = logging.getLogger(__name__)
TARGET_T = Union[Callable[[dict], dict], Callable[[dict, dict], dict]]
# Data format: dataset-name, dataset_id, or examples
DATA_T = Union[str, uuid.UUID, Iterable[schemas.Example], schemas.Dataset]
# Summary evaluator runs over the whole dataset
# and reports aggregate metric(s)

# Row-level evaluator
EVALUATOR_T = Union[
    RunEvaluator,
    Callable[
        [schemas.Run, Optional[schemas.Example]],
        Union[EvaluationResult, EvaluationResults],
    ],
    Callable[..., Union[dict, EvaluationResults, EvaluationResult]],
]
AEVALUATOR_T = Union[
    Callable[
        [schemas.Run, Optional[schemas.Example]],
        Awaitable[Union[EvaluationResult, EvaluationResults]],
    ],
]
EXPERIMENT_T = Union[str, uuid.UUID, schemas.TracerSession]
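
# Illustrative sketch only (hypothetical names, not part of this module's API):
# the simplest callables that satisfy TARGET_T and EVALUATOR_T. A target maps an
# example's inputs dict to an outputs dict; a row-level evaluator maps a
# (run, example) pair to a feedback dict.
#
#     def my_target(inputs: dict) -> dict:
#         return {"output": inputs["question"].strip()}
#
#     def exact_match(run: schemas.Run, example: Optional[schemas.Example]) -> dict:
#         predicted = (run.outputs or {}).get("output")
#         expected = (example.outputs or {}).get("answer") if example else None
#         return {"key": "exact_match", "score": predicted == expected}
#
#     # e.g. evaluate(my_target, data="my-dataset", evaluators=[exact_match])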


@overload
def evaluate(
    target: Union[TARGET_T, Runnable, EXPERIMENT_T],
    /,
    data: Optional[DATA_T] = None,
    evaluators: Optional[Sequence[EVALUATOR_T]] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: Optional[int] = 0,
    num_repetitions: int = 1,
    client: Optional[langsmith.Client] = None,
    blocking: bool = True,
    experiment: Optional[EXPERIMENT_T] = None,
    upload_results: bool = True,
    **kwargs: Any,
) -> ExperimentResults: ...


@overload
def evaluate(
    target: Union[tuple[EXPERIMENT_T, EXPERIMENT_T]],
    /,
    data: Optional[DATA_T] = None,
    evaluators: Optional[Sequence[COMPARATIVE_EVALUATOR_T]] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: Optional[int] = 0,
    num_repetitions: int = 1,
    client: Optional[langsmith.Client] = None,
    blocking: bool = True,
    experiment: Optional[EXPERIMENT_T] = None,
    upload_results: bool = True,
    **kwargs: Any,
) -> ComparativeExperimentResults: ...


def evaluate(
    target: Union[TARGET_T, Runnable, EXPERIMENT_T, tuple[EXPERIMENT_T, EXPERIMENT_T]],
    /,
    data: Optional[DATA_T] = None,
    evaluators: Optional[
        Union[Sequence[EVALUATOR_T], Sequence[COMPARATIVE_EVALUATOR_T]]
    ] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: Optional[int] = 0,
    num_repetitions: int = 1,
    client: Optional[langsmith.Client] = None,
    blocking: bool = True,
    experiment: Optional[EXPERIMENT_T] = None,
    upload_results: bool = True,
    error_handling: Literal["log", "ignore"] = "log",
    **kwargs: Any,
) -> Union[ExperimentResults, ComparativeExperimentResults]:
  134. r"""Evaluate a target system on a given dataset.
  135. Args:
  136. target (TARGET_T | Runnable | EXPERIMENT_T | Tuple[EXPERIMENT_T, EXPERIMENT_T]):
  137. The target system or experiment(s) to evaluate.
  138. Can be a function that takes a dict and returns a `dict`, a langchain `Runnable`, an
  139. existing experiment ID, or a two-tuple of experiment IDs.
  140. data (DATA_T): The dataset to evaluate on.
  141. Can be a dataset name, a list of examples, or a generator of examples.
  142. evaluators (Sequence[EVALUATOR_T] | Sequence[COMPARATIVE_EVALUATOR_T] | None):
  143. A list of evaluators to run on each example. The evaluator signature
  144. depends on the target type.
  145. summary_evaluators (Sequence[SUMMARY_EVALUATOR_T] | None): A list of summary
  146. evaluators to run on the entire dataset.
  147. Should not be specified if comparing two existing experiments.
  148. metadata (dict | None): Metadata to attach to the experiment.
  149. experiment_prefix (str | None): A prefix to provide for your experiment name.
  150. description (str | None): A free-form text description for the experiment.
  151. max_concurrency (int | None): The maximum number of concurrent
  152. evaluations to run.
  153. If `None` then no limit is set. If `0` then no concurrency.
  154. client (langsmith.Client | None): The LangSmith client to use.
  155. blocking (bool): Whether to block until the evaluation is complete.
  156. num_repetitions (int): The number of times to run the evaluation.
  157. Each item in the dataset will be run and evaluated this many times.
  158. experiment (schemas.TracerSession | None): An existing experiment to
  159. extend.
  160. If provided, `experiment_prefix` is ignored.
  161. For advanced usage only. Should not be specified if target is an existing
  162. experiment or two-tuple fo experiments.
  163. error_handling (str, default="log"): How to handle individual run errors.
  164. `'log'` will trace the runs with the error message as part of the
  165. experiment, `'ignore'` will not count the run as part of the experiment at
  166. all.
  167. Returns:
  168. ExperimentResults: If target is a function, `Runnable`, or existing experiment.
  169. ComparativeExperimentResults: If target is a two-tuple of existing experiments.
  170. Examples:
  171. Prepare the dataset:
  172. >>> from typing import Sequence
  173. >>> from langsmith import Client
  174. >>> from langsmith.evaluation import evaluate
  175. >>> from langsmith.schemas import Example, Run
  176. >>> client = Client()
  177. >>> dataset = client.clone_public_dataset(
  178. ... "https://smith.langchain.com/public/419dcab2-1d66-4b94-8901-0357ead390df/d"
  179. ... )
  180. >>> dataset_name = "Evaluate Examples"
  181. Basic usage:
  182. >>> def accuracy(run: Run, example: Example):
  183. ... # Row-level evaluator for accuracy.
  184. ... pred = run.outputs["output"]
  185. ... expected = example.outputs["answer"]
  186. ... return {"score": expected.lower() == pred.lower()}
  187. >>> def precision(runs: Sequence[Run], examples: Sequence[Example]):
  188. ... # Experiment-level evaluator for precision.
  189. ... # TP / (TP + FP)
  190. ... predictions = [run.outputs["output"].lower() for run in runs]
  191. ... expected = [example.outputs["answer"].lower() for example in examples]
  192. ... # yes and no are the only possible answers
  193. ... tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"])
  194. ... fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)])
  195. ... return {"score": tp / (tp + fp)}
  196. >>> def predict(inputs: dict) -> dict:
  197. ... # This can be any function or just an API call to your app.
  198. ... return {"output": "Yes"}
  199. >>> results = evaluate(
  200. ... predict,
  201. ... data=dataset_name,
  202. ... evaluators=[accuracy],
  203. ... summary_evaluators=[precision],
  204. ... experiment_prefix="My Experiment",
  205. ... description="Evaluating the accuracy of a simple prediction model.",
  206. ... metadata={
  207. ... "my-prompt-version": "abcd-1234",
  208. ... },
  209. ... ) # doctest: +ELLIPSIS
  210. View the evaluation results for experiment:...
  211. Evaluating over only a subset of the examples
  212. >>> experiment_name = results.experiment_name
  213. >>> examples = client.list_examples(dataset_name=dataset_name, limit=5)
  214. >>> results = evaluate(
  215. ... predict,
  216. ... data=examples,
  217. ... evaluators=[accuracy],
  218. ... summary_evaluators=[precision],
  219. ... experiment_prefix="My Experiment",
  220. ... description="Just testing a subset synchronously.",
  221. ... ) # doctest: +ELLIPSIS
  222. View the evaluation results for experiment:...
  223. Streaming each prediction to more easily + eagerly debug.
  224. >>> results = evaluate(
  225. ... predict,
  226. ... data=dataset_name,
  227. ... evaluators=[accuracy],
  228. ... summary_evaluators=[precision],
  229. ... description="I don't even have to block!",
  230. ... blocking=False,
  231. ... ) # doctest: +ELLIPSIS
  232. View the evaluation results for experiment:...
  233. >>> for i, result in enumerate(results): # doctest: +ELLIPSIS
  234. ... pass
  235. Using the `evaluate` API with an off-the-shelf LangChain evaluator:
  236. >>> from langsmith.evaluation import LangChainStringEvaluator # doctest: +SKIP
  237. >>> from langchain_openai import ChatOpenAI # doctest: +SKIP
  238. >>> def prepare_criteria_data(run: Run, example: Example): # doctest: +SKIP
  239. ... return {
  240. ... "prediction": run.outputs["output"],
  241. ... "reference": example.outputs["answer"],
  242. ... "input": str(example.inputs),
  243. ... }
  244. >>> results = evaluate( # doctest: +SKIP
  245. ... predict,
  246. ... data=dataset_name,
  247. ... evaluators=[
  248. ... accuracy,
  249. ... LangChainStringEvaluator("embedding_distance"),
  250. ... LangChainStringEvaluator(
  251. ... "labeled_criteria",
  252. ... config={
  253. ... "criteria": {
  254. ... "usefulness": "The prediction is useful if it is correct"
  255. ... " and/or asks a useful followup question."
  256. ... },
  257. ... "llm": ChatOpenAI(model="gpt-4o"),
  258. ... },
  259. ... prepare_data=prepare_criteria_data,
  260. ... ),
  261. ... ],
  262. ... description="Evaluating with off-the-shelf LangChain evaluators.",
  263. ... summary_evaluators=[precision],
  264. ... )
  265. View the evaluation results for experiment:... # doctest: +SKIP
  266. Evaluating a LangChain object:
  267. >>> from langchain_core.runnables import chain as as_runnable
  268. >>> @as_runnable
  269. ... def nested_predict(inputs):
  270. ... return {"output": "Yes"}
  271. >>> @as_runnable
  272. ... def lc_predict(inputs):
  273. ... return nested_predict.invoke(inputs)
  274. >>> results = evaluate(
  275. ... lc_predict.invoke,
  276. ... data=dataset_name,
  277. ... evaluators=[accuracy],
  278. ... description="This time we're evaluating a LangChain object.",
  279. ... summary_evaluators=[precision],
  280. ... ) # doctest: +ELLIPSIS
  281. View the evaluation results for experiment:...
  282. !!! warning "Behavior changed in `langsmith` 0.2.0"
  283. 'max_concurrency' default updated from None (no limit on concurrency)
  284. to 0 (no concurrency at all).
  285. """ # noqa: E501
    if isinstance(target, (str, uuid.UUID, schemas.TracerSession)):
        invalid_args = {
            "num_repetitions": num_repetitions > 1,
            "experiment": bool(experiment),
            "upload_results": not upload_results,
            "experiment_prefix": bool(experiment_prefix),
            "data": bool(data),
        }
        if any(invalid_args.values()):
            msg = (
                f"Received invalid arguments. "
                f"{tuple(k for k, v in invalid_args.items() if v)} should not be "
                f"specified when target is an existing experiment."
            )
            raise ValueError(msg)
        target_id = target if isinstance(target, (str, uuid.UUID)) else target.id
        logger.debug(f"Running evaluation over existing experiment {target_id}...")
        return evaluate_existing(
            target,
            evaluators=cast(Optional[Sequence[EVALUATOR_T]], evaluators),
            summary_evaluators=summary_evaluators,
            metadata=metadata,
            max_concurrency=max_concurrency,
            client=client,
            blocking=blocking,
            **kwargs,
        )
    elif isinstance(target, (list, tuple)):
        invalid_args = {
            "num_repetitions": num_repetitions > 1,
            "experiment": bool(experiment),
            "upload_results": not upload_results,
            "summary_evaluators": bool(summary_evaluators),
            "data": bool(data),
        }
        if len(target) != 2 or not all(
            isinstance(t, (str, uuid.UUID, schemas.TracerSession)) for t in target
        ):
            msg = (
                "Received invalid target. If a tuple is specified it must have length "
                "2 and each element should be the ID or schemas.TracerSession of an "
                f"existing experiment. Received {target=}"
            )
            raise ValueError(msg)
        elif any(invalid_args.values()):
            msg = (
                f"Received invalid arguments. "
                f"{tuple(k for k, v in invalid_args.items() if v)} should not be "
                f"specified when target is two existing experiments."
            )
            raise ValueError(msg)
        if max_concurrency is not None:
            kwargs["max_concurrency"] = max_concurrency
        target_ids = [t if isinstance(t, (str, uuid.UUID)) else t.id for t in target]
        logger.debug(
            f"Running pairwise evaluation over existing experiments {target_ids}..."
        )
        return evaluate_comparative(
            target,
            evaluators=cast(Sequence[COMPARATIVE_EVALUATOR_T], evaluators or ()),
            experiment_prefix=experiment_prefix,
            description=description,
            client=client,
            metadata=metadata,
            **kwargs,
        )
    elif kwargs:
        msg = (
            f"Received unsupported arguments {kwargs}. These arguments are not "
            f"supported when creating a new experiment."
        )
        raise ValueError(msg)
    elif not data:
        msg = "Must specify 'data' when running evaluations over a target function."
        raise ValueError(msg)
    elif callable(target) and rh.is_async(target):
        msg = (
            "Async functions are not supported by `evaluate`. "
            "Please use `aevaluate` instead:\n\n"
            "from langsmith import aevaluate\n\n"
            "await aevaluate(\n"
            " async_target_function,\n"
            " data=data,\n"
            " evaluators=evaluators,\n"
            " # ... other parameters\n"
            ")"
        )
        raise ValueError(msg)
    elif experiment and experiment_prefix:
        msg = (
            "Expected at most one of 'experiment' or 'experiment_prefix',"
            " but both were provided. "
            f"Got: experiment={experiment}, experiment_prefix={experiment_prefix}"
        )
        raise ValueError(msg)
    else:
        if not upload_results:
            _warn_once("'upload_results' parameter is in beta.")
        logger.debug(f"Running evaluation over target system {target}...")
    return _evaluate(
        target,
        data=data,
        evaluators=cast(Optional[Sequence[EVALUATOR_T]], evaluators),
        summary_evaluators=summary_evaluators,
        metadata=metadata,
        experiment_prefix=experiment_prefix,
        description=description,
        max_concurrency=max_concurrency,
        num_repetitions=num_repetitions,
        client=client,
        blocking=blocking,
        experiment=experiment,
        upload_results=upload_results,
        error_handling=error_handling,
    )


def evaluate_existing(
    experiment: Union[str, uuid.UUID, schemas.TracerSession],
    /,
    evaluators: Optional[Sequence[EVALUATOR_T]] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    max_concurrency: Optional[int] = 0,
    client: Optional[langsmith.Client] = None,
    load_nested: bool = False,
    blocking: bool = True,
) -> ExperimentResults:
  412. r"""Evaluate existing experiment runs.
  413. Args:
  414. experiment (Union[str, uuid.UUID]): The identifier of the experiment to evaluate.
  415. evaluators (Optional[Sequence[EVALUATOR_T]]): Optional sequence of evaluators to use for individual run evaluation.
  416. summary_evaluators (Optional[Sequence[SUMMARY_EVALUATOR_T]]): Optional sequence of evaluators
  417. to apply over the entire dataset.
  418. metadata (Optional[dict]): Optional metadata to include in the evaluation results.
  419. max_concurrency (int | None): The maximum number of concurrent
  420. evaluations to run.
  421. If `None` then no limit is set. If `0` then no concurrency.
  422. client (Optional[langsmith.Client]): Optional Langsmith client to use for evaluation.
  423. load_nested: Whether to load all child runs for the experiment.
  424. Default is to only load the top-level root runs.
  425. blocking (bool): Whether to block until evaluation is complete.
  426. Returns:
  427. The evaluation results.
  428. Environment:
  429. - `LANGSMITH_TEST_CACHE`: If set, API calls will be cached to disk to save time and
  430. cost during testing.
  431. Recommended to commit the cache files to your repository for faster CI/CD runs.
  432. Requires the `'langsmith[vcr]'` package to be installed.
  433. Examples:
  434. Define your evaluators
  435. >>> from typing import Sequence
  436. >>> from langsmith.schemas import Example, Run
  437. >>> def accuracy(run: Run, example: Example):
  438. ... # Row-level evaluator for accuracy.
  439. ... pred = run.outputs["output"]
  440. ... expected = example.outputs["answer"]
  441. ... return {"score": expected.lower() == pred.lower()}
  442. >>> def precision(runs: Sequence[Run], examples: Sequence[Example]):
  443. ... # Experiment-level evaluator for precision.
  444. ... # TP / (TP + FP)
  445. ... predictions = [run.outputs["output"].lower() for run in runs]
  446. ... expected = [example.outputs["answer"].lower() for example in examples]
  447. ... # yes and no are the only possible answers
  448. ... tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"])
  449. ... fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)])
  450. ... return {"score": tp / (tp + fp)}
  451. Load the experiment and run the evaluation.
  452. >>> import uuid
  453. >>> from langsmith import Client
  454. >>> from langsmith.evaluation import evaluate, evaluate_existing
  455. >>> client = Client()
  456. >>> dataset_name = "__doctest_evaluate_existing_" + uuid.uuid4().hex[:8]
  457. >>> dataset = client.create_dataset(dataset_name)
  458. >>> example = client.create_example(
  459. ... inputs={"question": "What is 2+2?"},
  460. ... outputs={"answer": "4"},
  461. ... dataset_id=dataset.id,
  462. ... )
  463. >>> def predict(inputs: dict) -> dict:
  464. ... return {"output": "4"}
  465. >>> # First run inference on the dataset
  466. ... results = evaluate(
  467. ... predict, data=dataset_name, experiment_prefix="doctest_experiment"
  468. ... ) # doctest: +ELLIPSIS
  469. View the evaluation results for experiment:...
  470. >>> experiment_id = results.experiment_name
  471. >>> # Wait for the experiment to be fully processed and check if we have results
  472. >>> len(results) > 0
  473. True
  474. >>> import time
  475. >>> time.sleep(2)
  476. >>> results = evaluate_existing(
  477. ... experiment_id,
  478. ... evaluators=[accuracy],
  479. ... summary_evaluators=[precision],
  480. ... ) # doctest: +ELLIPSIS
  481. View the evaluation results for experiment:...
  482. >>> client.delete_dataset(dataset_id=dataset.id)
  483. """ # noqa: E501
    client = client or rt.get_cached_client(timeout_ms=(20_000, 90_001))
    project = _load_experiment(experiment, client)
    runs = _load_traces(experiment, client, load_nested=load_nested)
    data_map = _load_examples_map(client, project)
    data = [data_map[cast(uuid.UUID, run.reference_example_id)] for run in runs]
    return _evaluate(
        runs,
        data=data,
        evaluators=evaluators,
        summary_evaluators=summary_evaluators,
        metadata=metadata,
        max_concurrency=max_concurrency,
        client=client,
        blocking=blocking,
        experiment=project,
    )


class ExperimentResultRow(TypedDict):
    run: schemas.Run
    example: schemas.Example
    evaluation_results: EvaluationResults
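
# Illustrative sketch only (hypothetical names): each item yielded by
# ExperimentResults is an ExperimentResultRow, so streaming consumption looks
# roughly like:
#
#     results = evaluate(my_target, data="my-dataset", evaluators=[exact_match])
#     for row in results:
#         print(row["run"].outputs)
#         for res in row["evaluation_results"]["results"]:
#             print(res.key, res.score)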


class ExperimentResults:
    """Represents the results of an evaluate() call.

    This class provides an iterator interface to iterate over the experiment results
    as they become available. It also provides methods to access the experiment name,
    the number of results, and to wait for the results to be processed.

    Methods:
        experiment_name() -> str: Returns the name of the experiment.
        wait() -> None: Waits for the experiment data to be processed.
    """

    def __init__(self, experiment_manager: _ExperimentManager, blocking: bool = True):
        self._manager = experiment_manager
        self._results: list[ExperimentResultRow] = []
        self._queue: queue.Queue[ExperimentResultRow] = queue.Queue()
        self._processing_complete = threading.Event()
        if not blocking:
            self._thread: Optional[threading.Thread] = threading.Thread(
                target=self._process_data
            )
            self._thread.start()
        else:
            self._thread = None
            self._process_data()

    @property
    def experiment_name(self) -> str:
        return self._manager.experiment_name

    def __iter__(self) -> Iterator[ExperimentResultRow]:
        ix = 0
        while (
            not self._processing_complete.is_set()
            or not self._queue.empty()
            or ix < len(self._results)
        ):
            try:
                if ix < len(self._results):
                    yield self._results[ix]
                    ix += 1
                else:
                    # Block briefly for the next item. _process_data also appends
                    # it to self._results, so it is yielded on the next pass.
                    self._queue.get(block=True, timeout=0.1)
            except queue.Empty:
                continue

    def _process_data(self) -> None:
        tqdm = _load_tqdm()
        results = self._manager.get_results()
        for item in tqdm(results):
            self._queue.put(item)
            self._results.append(item)
        summary_scores = self._manager.get_summary_scores()
        self._summary_results = summary_scores
        self._processing_complete.set()

    def __len__(self) -> int:
        return len(self._results)

    def to_pandas(
        self, start: Optional[int] = 0, end: Optional[int] = None
    ) -> DataFrame:
        return _to_pandas(self._results, start=start, end=end)

    def _repr_html_(self) -> str:
        import importlib.util

        if self._results and importlib.util.find_spec("pandas"):
            df = self.to_pandas()
            return df._repr_html_()  # type: ignore[operator]
        else:
            return self.__repr__()

    def __repr__(self) -> str:
        return f"<ExperimentResults {self.experiment_name}>"

    def wait(self) -> None:
        """Wait for the evaluation runner to complete.

        This method blocks the current thread until the evaluation runner has
        finished its execution.
        """
        if self._thread:
            self._thread.join()


## Public API for Comparison Experiments

# Row-level evaluator
COMPARATIVE_EVALUATOR_T = Callable[
    [Sequence[schemas.Run], Optional[schemas.Example]],
    Union[
        Union[ComparisonEvaluationResult, dict],
        Awaitable[Union[ComparisonEvaluationResult, dict]],
    ],
]
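
# Illustrative sketch only (hypothetical names): a pairwise evaluator that
# satisfies COMPARATIVE_EVALUATOR_T. It receives one run per experiment for the
# same example and returns scores keyed by run id.
#
#     def prefer_shorter(
#         runs: Sequence[schemas.Run], example: Optional[schemas.Example]
#     ) -> dict:
#         a, b = runs
#         a_wins = len(str(a.outputs)) <= len(str(b.outputs))
#         return {
#             "key": "prefer_shorter",
#             "scores": {a.id: int(a_wins), b.id: int(not a_wins)},
#         }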


def evaluate_comparative(
    experiments: tuple[EXPERIMENT_T, EXPERIMENT_T],
    /,
    evaluators: Sequence[COMPARATIVE_EVALUATOR_T],
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: int = 5,
    client: Optional[langsmith.Client] = None,
    metadata: Optional[dict] = None,
    load_nested: bool = False,
    randomize_order: bool = False,
) -> ComparativeExperimentResults:
  596. r"""Evaluate existing experiment runs against each other.
  597. This lets you use pairwise preference scoring to generate more
  598. reliable feedback in your experiments.
  599. Args:
  600. experiments (Tuple[Union[str, uuid.UUID], Union[str, uuid.UUID]]):
  601. The identifiers of the experiments to compare.
  602. evaluators (Sequence[COMPARATIVE_EVALUATOR_T]):
  603. A list of evaluators to run on each example.
  604. experiment_prefix (Optional[str]): A prefix to provide for your experiment name.
  605. description (Optional[str]): A free-form text description for the experiment.
  606. max_concurrency (int): The maximum number of concurrent evaluations to run.
  607. client (Optional[langsmith.Client]): The LangSmith client to use.
  608. metadata (Optional[dict]): Metadata to attach to the experiment.
  609. load_nested (bool): Whether to load all child runs for the experiment.
  610. Default is to only load the top-level root runs.
  611. randomize_order (bool): Whether to randomize the order of the outputs for each evaluation.
  612. Returns:
  613. The results of the comparative evaluation.
  614. Examples:
  615. Suppose you want to compare two prompts to see which one is more effective.
  616. You would first prepare your dataset:
  617. >>> from typing import Sequence
  618. >>> from langsmith import Client
  619. >>> from langsmith.evaluation import evaluate
  620. >>> from langsmith.schemas import Example, Run
  621. >>> client = Client()
  622. >>> dataset = client.clone_public_dataset(
  623. ... "https://smith.langchain.com/public/419dcab2-1d66-4b94-8901-0357ead390df/d"
  624. ... )
  625. >>> dataset_name = "Evaluate Examples"
  626. Then you would run your different prompts:
  627. >>> import functools
  628. >>> import openai
  629. >>> from langsmith.evaluation import evaluate
  630. >>> from langsmith.wrappers import wrap_openai
  631. >>> oai_client = openai.Client()
  632. >>> wrapped_client = wrap_openai(oai_client)
  633. >>> prompt_1 = "You are a helpful assistant."
  634. >>> prompt_2 = "You are an exceedingly helpful assistant."
  635. >>> def predict(inputs: dict, prompt: str) -> dict:
  636. ... completion = wrapped_client.chat.completions.create(
  637. ... model="gpt-4o-mini",
  638. ... messages=[
  639. ... {"role": "system", "content": prompt},
  640. ... {
  641. ... "role": "user",
  642. ... "content": f"Context: {inputs['context']}"
  643. ... f"\n\ninputs['question']",
  644. ... },
  645. ... ],
  646. ... )
  647. ... return {"output": completion.choices[0].message.content}
  648. >>> results_1 = evaluate(
  649. ... functools.partial(predict, prompt=prompt_1),
  650. ... data=dataset_name,
  651. ... description="Evaluating our basic system prompt.",
  652. ... blocking=False, # Run these experiments in parallel
  653. ... ) # doctest: +ELLIPSIS
  654. View the evaluation results for experiment:...
  655. >>> results_2 = evaluate(
  656. ... functools.partial(predict, prompt=prompt_2),
  657. ... data=dataset_name,
  658. ... description="Evaluating our advanced system prompt.",
  659. ... blocking=False,
  660. ... ) # doctest: +ELLIPSIS
  661. View the evaluation results for experiment:...
  662. >>> results_1.wait()
  663. >>> results_2.wait()
  664. Finally, you would compare the two prompts directly:
  665. >>> import json
  666. >>> from langsmith.evaluation import evaluate_comparative
  667. >>> from langsmith import schemas
  668. >>> def score_preferences(runs: list, example: schemas.Example):
  669. ... assert len(runs) == 2 # Comparing 2 systems
  670. ... assert isinstance(example, schemas.Example)
  671. ... assert all(run.reference_example_id == example.id for run in runs)
  672. ... pred_a = runs[0].outputs["output"] if runs[0].outputs else ""
  673. ... pred_b = runs[1].outputs["output"] if runs[1].outputs else ""
  674. ... ground_truth = example.outputs["answer"] if example.outputs else ""
  675. ... tools = [
  676. ... {
  677. ... "type": "function",
  678. ... "function": {
  679. ... "name": "rank_preferences",
  680. ... "description": "Saves the prefered response ('A' or 'B')",
  681. ... "parameters": {
  682. ... "type": "object",
  683. ... "properties": {
  684. ... "reasoning": {
  685. ... "type": "string",
  686. ... "description": "The reasoning behind the choice.",
  687. ... },
  688. ... "preferred_option": {
  689. ... "type": "string",
  690. ... "enum": ["A", "B"],
  691. ... "description": "The preferred option, either 'A' or 'B'",
  692. ... },
  693. ... },
  694. ... "required": ["preferred_option"],
  695. ... },
  696. ... },
  697. ... }
  698. ... ]
  699. ... completion = openai.Client().chat.completions.create(
  700. ... model="gpt-4o-mini",
  701. ... messages=[
  702. ... {"role": "system", "content": "Select the better response."},
  703. ... {
  704. ... "role": "user",
  705. ... "content": f"Option A: {pred_a}"
  706. ... f"\n\nOption B: {pred_b}"
  707. ... f"\n\nGround Truth: {ground_truth}",
  708. ... },
  709. ... ],
  710. ... tools=tools,
  711. ... tool_choice={
  712. ... "type": "function",
  713. ... "function": {"name": "rank_preferences"},
  714. ... },
  715. ... )
  716. ... tool_args = completion.choices[0].message.tool_calls[0].function.arguments
  717. ... loaded_args = json.loads(tool_args)
  718. ... preference = loaded_args["preferred_option"]
  719. ... comment = loaded_args["reasoning"]
  720. ... if preference == "A":
  721. ... return {
  722. ... "key": "ranked_preference",
  723. ... "scores": {runs[0].id: 1, runs[1].id: 0},
  724. ... "comment": comment,
  725. ... }
  726. ... else:
  727. ... return {
  728. ... "key": "ranked_preference",
  729. ... "scores": {runs[0].id: 0, runs[1].id: 1},
  730. ... "comment": comment,
  731. ... }
  732. >>> def score_length_difference(runs: list, example: schemas.Example):
  733. ... # Just return whichever response is longer.
  734. ... # Just an example, not actually useful in real life.
  735. ... assert len(runs) == 2 # Comparing 2 systems
  736. ... assert isinstance(example, schemas.Example)
  737. ... assert all(run.reference_example_id == example.id for run in runs)
  738. ... pred_a = runs[0].outputs["output"] if runs[0].outputs else ""
  739. ... pred_b = runs[1].outputs["output"] if runs[1].outputs else ""
  740. ... if len(pred_a) > len(pred_b):
  741. ... return {
  742. ... "key": "length_difference",
  743. ... "scores": {runs[0].id: 1, runs[1].id: 0},
  744. ... }
  745. ... else:
  746. ... return {
  747. ... "key": "length_difference",
  748. ... "scores": {runs[0].id: 0, runs[1].id: 1},
  749. ... }
  750. >>> results = evaluate_comparative(
  751. ... [results_1.experiment_name, results_2.experiment_name],
  752. ... evaluators=[score_preferences, score_length_difference],
  753. ... client=client,
  754. ... ) # doctest: +ELLIPSIS
  755. View the pairwise evaluation results at:...
  756. >>> eval_results = list(results)
  757. >>> assert len(eval_results) >= 10 # doctest: +SKIP
  758. >>> assert all(
  759. ... "feedback.ranked_preference" in r["evaluation_results"]
  760. ... for r in eval_results
  761. ... ) # doctest: +SKIP
  762. >>> assert all(
  763. ... "feedback.length_difference" in r["evaluation_results"]
  764. ... for r in eval_results
  765. ... ) # doctest: +SKIP
  766. """ # noqa: E501
    if len(experiments) < 2:
        raise ValueError("Comparative evaluation requires at least 2 experiments.")
    if not evaluators:
        raise ValueError(
            "At least one evaluator is required for comparative evaluation."
        )
    if max_concurrency < 0:
        raise ValueError("max_concurrency must be a non-negative integer.")
    client = client or rt.get_cached_client()
    # TODO: Add information about comparison experiments
    projects = [_load_experiment(experiment, client) for experiment in experiments]
    ref_datasets_ = [str(p.reference_dataset_id) for p in projects]
    if not len(set(ref_datasets_)) == 1:
        raise ValueError("All experiments must have the same reference dataset.")
    experiment_ids = [p.id for p in projects]
    if experiment_prefix is None:
        experiment_names = [p.name for p in projects if p.name is not None]
        experiment_name = (
            " vs. ".join(experiment_names) + "-" + str(uuid.uuid4().hex[:4])
        )
    else:
        experiment_name = experiment_prefix + "-" + str(uuid.uuid4().hex[:8])
    comparative_experiment_id = uuid.uuid4()
    comparative_experiment = client.create_comparative_experiment(
        experiment_name,
        experiments=experiment_ids,
        description=description,
        metadata=metadata,
        id=comparative_experiment_id,
    )
    _print_comparative_experiment_start(
        cast(
            tuple[schemas.TracerSessionResult, schemas.TracerSessionResult],
            tuple(projects),
        ),
        comparative_experiment,
    )
    runs = [
        _load_traces(experiment, client, load_nested=load_nested)
        for experiment in experiments
    ]
    # Only check intersections for the experiments
    examples_intersection = None
    for runs_list in runs:
        example_ids_set = {run.reference_example_id for run in runs_list}
        if examples_intersection is None:
            examples_intersection = example_ids_set
        else:
            examples_intersection &= example_ids_set
    example_ids_nullable = (
        list(examples_intersection) if examples_intersection is not None else []
    )
    example_ids = [eid for eid in example_ids_nullable if eid is not None]
    # TODO: Warn if different dataset versions, etc. are used in the different
    # experiments. We aren't providing any training wheels here.
    batch_size = 99
    data = {}
    for i in range(0, len(example_ids), batch_size):
        example_ids_batch = example_ids[i : i + batch_size]
        for e in client.list_examples(
            dataset_id=projects[0].reference_dataset_id,
            as_of=projects[0].metadata.get("dataset_version"),
            example_ids=example_ids_batch,
        ):
            data[e.id] = e
    runs_dict: dict[uuid.UUID, list[schemas.Run]] = collections.defaultdict(list)
    for runs_list in runs:
        for run in runs_list:
            if run.reference_example_id in data:
                runs_dict[cast(uuid.UUID, run.reference_example_id)].append(run)
    comparators = [comparison_evaluator(evaluator) for evaluator in evaluators or []]
    results: dict = {}

    def evaluate_and_submit_feedback(
        runs_list: list[schemas.Run],
        example: schemas.Example,
        comparator: DynamicComparisonRunEvaluator,
        executor: cf.Executor,
    ) -> tuple[uuid.UUID, ComparisonEvaluationResult]:
        feedback_group_id = uuid.uuid4()
        if randomize_order:
            random.shuffle(runs_list)
        with rh.tracing_context(project_name="evaluators", client=client):
            result = comparator.compare_runs(runs_list, example)
            if client is None:
                raise ValueError("Client is required to submit feedback.")
        comments = (
            {str(rid): result.comment for rid in result.scores}
            if isinstance(result.comment, str)
            else (result.comment or {})
        )
        for run_id, score in result.scores.items():
            executor.submit(
                client.create_feedback,
                run_id=run_id,
                key=result.key,
                score=score,
                comment=comments.get(str(run_id)),
                comparative_experiment_id=comparative_experiment.id,
                source_run_id=result.source_run_id,
                feedback_group_id=feedback_group_id,
            )
        return example.id, result

    tqdm = _load_tqdm()
    with ls_utils.ContextThreadPoolExecutor(
        max_workers=max_concurrency or 1
    ) as executor:
        futures = []
        for example_id, runs_list in tqdm(runs_dict.items()):
            results[example_id] = {"runs": runs_list}
            for comparator in comparators:
                if max_concurrency > 1:
                    future = executor.submit(
                        evaluate_and_submit_feedback,
                        runs_list,
                        data[example_id],
                        comparator,
                        executor,
                    )
                    futures.append(future)
                else:
                    _, result = evaluate_and_submit_feedback(
                        runs_list, data[example_id], comparator, executor
                    )
                    results[example_id][f"feedback.{result.key}"] = result
        if futures:
            cf.wait(futures)
            for future in futures:
                example_id, result = future.result()
                results[example_id][f"feedback.{result.key}"] = result
    return ComparativeExperimentResults(results, data)


class ComparativeExperimentResults:
    """Represents the results of an evaluate_comparative() call.

    This class provides an iterator interface to iterate over the pairwise
    evaluation results as they become available, and dict-style access to the
    results for a given example ID.
    """

    def __init__(
        self,
        results: dict,
        examples: Optional[dict[uuid.UUID, schemas.Example]] = None,
    ):
        self._results = results
        self._examples = examples

    def __getitem__(self, key):
        """Return the result associated with the given key."""
        return self._results[key]

    def __iter__(self):
        for key, value in self._results.items():
            yield {
                "example": self._examples[key] if self._examples else None,
                "evaluation_results": value,
            }


## Private API


def _print_comparative_experiment_start(
    experiments: tuple[schemas.TracerSession, schemas.TracerSession],
    comparative_experiment: schemas.ComparativeExperiment,
) -> None:
    url = experiments[0].url or experiments[1].url
    if url:
        project_url = url.split("?")[0]
        dataset_id = comparative_experiment.reference_dataset_id
        base_url = project_url.split("/projects/p/")[0]
        comparison_url = (
            f"{base_url}/datasets/{dataset_id}/compare?"
            f"selectedSessions={'%2C'.join([str(e.id) for e in experiments])}"
            f"&comparativeExperiment={comparative_experiment.id}"
        )
        print(  # noqa: T201
            f"View the pairwise evaluation results at:\n{comparison_url}\n\n"
        )


def _is_callable(target: Union[TARGET_T, Iterable[schemas.Run], Runnable]) -> bool:
    return callable(target) or _is_langchain_runnable(target)


def _evaluate(
    target: Union[TARGET_T, Iterable[schemas.Run], Runnable],
    /,
    data: DATA_T,
    evaluators: Optional[Sequence[EVALUATOR_T]] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: Optional[int] = None,
    num_repetitions: int = 1,
    client: Optional[langsmith.Client] = None,
    blocking: bool = True,
    experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None,
    upload_results: bool = True,
    error_handling: Literal["log", "ignore"] = "log",
) -> ExperimentResults:
    # Initialize the experiment manager.
    client = client or rt.get_cached_client()
    runs = None if _is_callable(target) else cast(Iterable[schemas.Run], target)
    experiment_, runs = _resolve_experiment(experiment, runs, client)
    manager = _ExperimentManager(
        data,
        client=client,
        metadata=metadata,
        experiment=experiment_ or experiment_prefix,
        description=description,
        num_repetitions=num_repetitions,
        # If provided, we don't need to create a new experiment.
        runs=runs,
        # Create or resolve the experiment.
        include_attachments=_include_attachments(target, evaluators),
        upload_results=upload_results,
        error_handling=error_handling,
    ).start()
    if cache_dir := ls_utils.get_cache_dir(None):
        cache_path = pathlib.Path(cache_dir) / f"{manager.dataset_id}.yaml"
    else:
        cache_path = None
    with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]):
        if _is_callable(target):
            # Add predictions to the experiment.
            manager = manager.with_predictions(
                cast(TARGET_T, target), max_concurrency=max_concurrency
            )
        if evaluators:
            # Apply evaluators to the predictions.
            manager = manager.with_evaluators(
                evaluators, max_concurrency=max_concurrency
            )
        if summary_evaluators:
            # Apply the experiment-level summary evaluators.
            manager = manager.with_summary_evaluators(summary_evaluators)
        # Start consuming the results.
        results = ExperimentResults(manager, blocking=blocking)
        return results


def _is_uuid(value: str) -> bool:
    try:
        uuid.UUID(value)
        return True
    except ValueError:
        return False


def _load_experiment(
    project: EXPERIMENT_T, client: langsmith.Client
) -> schemas.TracerSession:
    if isinstance(project, schemas.TracerSession):
        return project
    elif isinstance(project, uuid.UUID) or _is_uuid(project):
        return client.read_project(project_id=project)
    else:
        return client.read_project(project_name=project)


def _load_traces(
    project: Union[str, uuid.UUID, schemas.TracerSession],
    client: langsmith.Client,
    load_nested: bool = False,
) -> list[schemas.Run]:
    """Load nested traces for a given project."""
    is_root = None if load_nested else True
    if isinstance(project, schemas.TracerSession):
        runs = client.list_runs(project_id=project.id, is_root=is_root)
    elif isinstance(project, uuid.UUID) or _is_uuid(project):
        runs = client.list_runs(project_id=project, is_root=is_root)
    else:
        runs = client.list_runs(project_name=project, is_root=is_root)
    if not load_nested:
        return list(runs)

    treemap: collections.defaultdict[uuid.UUID, list[schemas.Run]] = (
        collections.defaultdict(list)
    )
    results = []
    all_runs = {}
    for run in runs:
        if run.parent_run_id is not None:
            treemap[run.parent_run_id].append(run)
        else:
            results.append(run)
        all_runs[run.id] = run
    for run_id, child_runs in treemap.items():
        all_runs[run_id].child_runs = sorted(child_runs, key=lambda r: r.dotted_order)
    return results


def _load_examples_map(
    client: langsmith.Client, project: schemas.TracerSession
) -> dict[uuid.UUID, schemas.Example]:
    return {
        e.id: e
        for e in client.list_examples(
            dataset_id=project.reference_dataset_id,
            as_of=project.metadata.get("dataset_version"),
        )
    }


IT = TypeVar("IT")


def _load_tqdm() -> Callable[[IT], IT]:
    try:
        from tqdm.auto import tqdm
    except ImportError:
        return lambda x: x
    return tqdm  # type: ignore[return-value]


ET = TypeVar("ET", bound="_ExperimentManagerMixin")


class _ExperimentManagerMixin:
    def __init__(
        self,
        /,
        experiment: Optional[Union[schemas.TracerSession, str]],
        metadata: Optional[dict] = None,
        client: Optional[langsmith.Client] = None,
        description: Optional[str] = None,
    ):
        self.client = client or rt.get_cached_client()
        self._experiment: Optional[schemas.TracerSession] = None
        if experiment is None:
            self._experiment_name = _get_random_name()
        elif isinstance(experiment, str):
            self._experiment_name = experiment + "-" + str(uuid.uuid4().hex[:8])
        else:
            self._experiment_name = cast(str, experiment.name)
            self._experiment = experiment

        metadata = metadata or {}
        if not metadata.get("revision_id"):
            metadata = {
                "revision_id": ls_env.get_langchain_env_var_metadata().get(
                    "revision_id"
                ),
                **metadata,
            }
        self._metadata = metadata or {}
        self._description = description

    @property
    def experiment_name(self) -> str:
        if self._experiment_name is not None:
            return self._experiment_name
        raise ValueError(
            "Experiment name not provided, and experiment not yet started."
        )

    def _get_experiment(self) -> schemas.TracerSession:
        if self._experiment is None:
            raise ValueError("Experiment not started yet.")
        return self._experiment

    def _get_experiment_metadata(self):
        project_metadata = self._metadata or {}
        project_metadata["__ls_runner"] = "py_sdk_evaluate"
        git_info = ls_env.get_git_info()
        if git_info:
            project_metadata = {
                **project_metadata,
                "git": git_info,
            }
        if self._experiment:
            project_metadata = {
                **self._experiment.metadata,
                **project_metadata,
            }
        return project_metadata

    def _create_experiment(
        self, dataset_id: uuid.UUID, metadata: dict
    ) -> schemas.TracerSession:
        # There is a chance of name collision, so we'll retry
        starting_name = self._experiment_name
        num_attempts = 10
        for _ in range(num_attempts):
            try:
                return self.client.create_project(
                    self._experiment_name,
                    description=self._description,
                    reference_dataset_id=dataset_id,
                    metadata=metadata,
                )
            except ls_utils.LangSmithConflictError:
                self._experiment_name = f"{starting_name}-{str(uuid.uuid4().hex[:6])}"
        raise ValueError(
            f"Could not find a unique experiment name in {num_attempts} attempts."
            " Please try again with a different experiment name."
        )

    def _get_project(self, first_example: schemas.Example) -> schemas.TracerSession:
        if self._experiment is None:
            project_metadata = self._get_experiment_metadata()
            project = self._create_experiment(
                first_example.dataset_id, project_metadata
            )
        else:
            project = self._experiment
        return project

    def _print_experiment_start(
        self, project: Optional[schemas.TracerSession], first_example: schemas.Example
    ) -> None:
        if project and project.url:
            # TODO: Make this a public API
            project_url = project.url.split("?")[0]
            dataset_id = first_example.dataset_id
            base_url = project_url.split("/projects/p/")[0]
            comparison_url = (
                f"{base_url}/datasets/{dataset_id}/compare?"
                f"selectedSessions={project.id}"
            )
            print(  # noqa: T201
                f"View the evaluation results for experiment: '{self.experiment_name}'"
                f" at:\n{comparison_url}\n\n"
            )
        else:
            # HACKHACK
            print(  # noqa: T201
                f"Starting evaluation of experiment: {self.experiment_name}"
            )


class _ExperimentManager(_ExperimentManagerMixin):
    """Manage the execution of experiments.

    Supports lazily running predictions and evaluations in parallel to facilitate
    result streaming and early debugging.

    Args:
        data (DATA_T): The data used for the experiment. Can be a dataset name or ID
            or a generator of examples.
        num_repetitions (int): The number of times to run over the data.
        runs (Optional[Iterable[schemas.Run]]): The runs associated with the experiment
            predictions.
        experiment (Optional[schemas.TracerSession]): The tracer session
            associated with the experiment.
        experiment_prefix (Optional[str]): The prefix for the experiment name.
        metadata (Optional[dict]): Additional metadata for the experiment.
        client (Optional[langsmith.Client]): The LangSmith client used for
            the experiment.
        evaluation_results (Optional[Iterable[EvaluationResults]]): The evaluation
            results for the experiment.
        summary_results (Optional[Iterable[EvaluationResults]]): The aggregate results
            for the experiment.
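
    Example:
        An illustrative sketch of the lazy pipeline (assumes a configured
        LangSmith client and an existing dataset; ``my_target`` and
        ``my_evaluator`` are placeholder names, not part of this module):

            manager = _ExperimentManager("my-dataset", experiment=None).start()
            manager = manager.with_predictions(my_target)
            manager = manager.with_evaluators([my_evaluator])
            rows = list(manager.get_results())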
  1184. """
  1185. def __init__(
  1186. self,
  1187. data: DATA_T,
  1188. /,
  1189. experiment: Optional[Union[schemas.TracerSession, str]],
  1190. metadata: Optional[dict] = None,
  1191. client: Optional[langsmith.Client] = None,
  1192. runs: Optional[Iterable[schemas.Run]] = None,
  1193. evaluation_results: Optional[Iterable[EvaluationResults]] = None,
  1194. summary_results: Optional[Iterable[EvaluationResults]] = None,
  1195. description: Optional[str] = None,
  1196. num_repetitions: int = 1,
  1197. include_attachments: bool = False,
  1198. reuse_attachments: bool = False,
  1199. upload_results: bool = True,
  1200. attachment_raw_data_dict: Optional[dict] = None,
  1201. error_handling: Literal["log", "ignore"] = "log",
  1202. ):
  1203. super().__init__(
  1204. experiment=experiment,
  1205. metadata=metadata,
  1206. client=client,
  1207. description=description,
  1208. )
  1209. self._data = data
  1210. self._examples: Optional[Iterable[schemas.Example]] = None
  1211. self._runs = runs
  1212. self._evaluation_results = evaluation_results
  1213. self._summary_results = summary_results
  1214. self._num_repetitions = num_repetitions
  1215. self._include_attachments = include_attachments
  1216. self._reuse_attachments = reuse_attachments
  1217. self._upload_results = upload_results
  1218. self._attachment_raw_data_dict = attachment_raw_data_dict
  1219. self._error_handling = error_handling

    def _reset_example_attachment_readers(
        self, example: schemas.Example
    ) -> schemas.Example:
        """Reset attachment readers for an example.

        This is only needed when an attachment is going to be used by more than
        one callable (target + evaluators). In that case we keep a single copy
        of the attachment data in `self._attachment_raw_data_dict` and create
        readers from that data, so we don't have to keep multiple copies of the
        same data in memory.
        """
        if not hasattr(example, "attachments") or not example.attachments:
            return example
        new_attachments: dict[str, schemas.AttachmentInfo] = {}
        for name, attachment in example.attachments.items():
            if (
                self._attachment_raw_data_dict is not None
                and str(example.id) + name in self._attachment_raw_data_dict
            ):
                new_attachments[name] = {
                    "presigned_url": attachment["presigned_url"],
                    "reader": io.BytesIO(
                        self._attachment_raw_data_dict[str(example.id) + name]
                    ),
                    "mime_type": attachment["mime_type"],
                }
            else:
                new_attachments[name] = attachment
        # Create a new Example instance with the updated attachments
        return schemas.Example(
            id=example.id,
            created_at=example.created_at,
            dataset_id=example.dataset_id,
            inputs=example.inputs,
            outputs=example.outputs,
            metadata=example.metadata,
            modified_at=example.modified_at,
            source_run_id=example.source_run_id,
            attachments=new_attachments,
            _host_url=example._host_url,
            _tenant_id=example._tenant_id,
        )
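
    # Illustrative sketch (an assumption about usage, not part of the original
    # module): with ``reuse_attachments=True`` the raw bytes are stored once per
    # ``str(example.id) + name`` key, and each consumer gets its own reader:
    #
    #     raw = self._attachment_raw_data_dict[str(example.id) + "image"]
    #     reader_for_target = io.BytesIO(raw)
    #     reader_for_evaluator = io.BytesIO(raw)
    #
    # so the target and the evaluators never compete over a single file cursor.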

    @property
    def examples(self) -> Iterable[schemas.Example]:
        if self._examples is None:
            self._examples = _resolve_data(
                self._data,
                client=self.client,
                include_attachments=self._include_attachments,
            )
            if self._reuse_attachments and self._attachment_raw_data_dict is None:
                examples_copy, self._examples = itertools.tee(self._examples)
                self._attachment_raw_data_dict = {
                    str(e.id) + name: value["reader"].read()
                    for e in examples_copy
                    for name, value in (e.attachments or {}).items()
                }
            if self._num_repetitions > 1:
                examples_list = list(self._examples)
                self._examples = itertools.chain.from_iterable(
                    [
                        self._reset_example_attachment_readers(example)
                        for example in examples_list
                    ]
                    for _ in range(self._num_repetitions)
                )
        self._examples, examples_iter = itertools.tee(self._examples)
        return examples_iter

    @property
    def dataset_id(self) -> str:
        if self._experiment is None or not getattr(
            self._experiment, "reference_dataset_id", None
        ):
            example = next(iter(self.examples))
            return str(example.dataset_id)
        return str(
            cast(schemas.TracerSessionResult, self._experiment).reference_dataset_id
        )

    @property
    def evaluation_results(self) -> Iterable[EvaluationResults]:
        if self._evaluation_results is None:
            return ({"results": []} for _ in self.examples)
        return self._evaluation_results

    @property
    def runs(self) -> Iterable[schemas.Run]:
        if self._runs is None:
            raise ValueError(
                "Runs not provided in this experiment. Please predict first."
            )
        self._runs, runs_iter = itertools.tee(self._runs)
        return runs_iter

    def start(self) -> _ExperimentManager:
        first_example = next(itertools.islice(self.examples, 1))
        project = self._get_project(first_example) if self._upload_results else None
        self._print_experiment_start(project, first_example)
        self._metadata["num_repetitions"] = self._num_repetitions
        return self._copy(self.examples, experiment=project)

    def with_predictions(
        self,
        target: TARGET_T,
        /,
        max_concurrency: Optional[int] = None,
    ) -> _ExperimentManager:
        """Lazily apply the target function to the experiment."""
        context = copy_context()
        _experiment_results = context.run(
            self._predict,
            target,
            max_concurrency=max_concurrency,
            include_attachments=_target_include_attachments(target),
        )
        r1, r2 = itertools.tee(_experiment_results, 2)
        return self._copy(
            (pred["example"] for pred in r1), runs=(pred["run"] for pred in r2)
        )

    def with_evaluators(
        self,
        evaluators: Sequence[
            Union[
                EVALUATOR_T,
                RunEvaluator,
            ]
        ],
        *,
        max_concurrency: Optional[int] = None,
    ) -> _ExperimentManager:
        """Lazily apply the provided evaluators to the experiment."""
        evaluators = _resolve_evaluators(evaluators)
        context = copy_context()
        experiment_results = context.run(
            self._score, evaluators, max_concurrency=max_concurrency
        )
        # Split the generator into three so the manager
        # can consume each value individually.
        r1, r2, r3 = itertools.tee(experiment_results, 3)
        return self._copy(
            (result["example"] for result in r1),
            runs=(result["run"] for result in r2),
            evaluation_results=(result["evaluation_results"] for result in r3),
        )

    def with_summary_evaluators(
        self,
        summary_evaluators: Sequence[SUMMARY_EVALUATOR_T],
    ) -> _ExperimentManager:
        """Lazily apply the provided summary evaluators to the experiment."""
        wrapped_evaluators = _wrap_summary_evaluators(summary_evaluators)
        context = copy_context()
        aggregate_feedback_gen = context.run(
            self._apply_summary_evaluators, wrapped_evaluators
        )
        return self._copy(
            self.examples, runs=self.runs, summary_results=aggregate_feedback_gen
        )

    def get_results(self) -> Iterable[ExperimentResultRow]:
        """Return the traces, evaluation results, and associated examples."""
        for run, example, evaluation_results in zip(
            self.runs, self.examples, self.evaluation_results
        ):
            yield ExperimentResultRow(
                run=run,
                example=example,
                evaluation_results=evaluation_results,
            )

    def get_summary_scores(self) -> dict[str, list[dict]]:
        """If `summary_evaluators` were applied, consume and return the results."""
        if self._summary_results is None:
            return {"results": []}
        # Consume the generator
        return {
            "results": [
                res  # type: ignore[misc]
                for results in self._summary_results
                for res in results["results"]
            ]
        }

    # Private methods

    def _predict(
        self,
        target: TARGET_T,
        /,
        max_concurrency: Optional[int] = None,
        include_attachments: bool = False,
    ) -> Generator[_ForwardResults, None, None]:
        """Run the target function on the examples."""
        fn = _ensure_traceable(target)
        if max_concurrency == 0:
            for example in self.examples:
                yield _forward(
                    fn,
                    example,
                    self.experiment_name,
                    self._metadata,
                    self.client,
                    self._upload_results,
                    include_attachments,
                    self._error_handling,
                )
        else:
            with ls_utils.ContextThreadPoolExecutor(max_concurrency) as executor:
                futures = [
                    executor.submit(
                        _forward,
                        fn,
                        example,
                        self.experiment_name,
                        self._metadata,
                        self.client,
                        self._upload_results,
                        include_attachments,
                        self._error_handling,
                    )
                    for example in self.examples
                ]
                for future in cf.as_completed(futures):
                    yield future.result()
        # Close out the project.
        self._end()

    def _run_evaluators(
        self,
        evaluators: Sequence[RunEvaluator],
        current_results: ExperimentResultRow,
        executor: cf.ThreadPoolExecutor,
    ) -> ExperimentResultRow:
        current_context = rh.get_tracing_context()
        metadata = {
            **(current_context["metadata"] or {}),
            **{
                "experiment": self.experiment_name,
                "reference_example_id": current_results["example"].id,
                "reference_run_id": current_results["run"].id,
            },
        }
        with rh.tracing_context(
            **{
                **current_context,
                "project_name": "evaluators",
                "metadata": metadata,
                "enabled": "local" if not self._upload_results else True,
                "client": self.client,
            }
        ):
            run = current_results["run"]
            example = current_results["example"]
            eval_results = current_results["evaluation_results"]
            for evaluator in evaluators:
                evaluator_run_id = uuid.uuid4()
                try:
                    evaluator_response = evaluator.evaluate_run(  # type: ignore[call-arg]
                        run=run,
                        example=example,
                        evaluator_run_id=evaluator_run_id,
                    )
                    eval_results["results"].extend(
                        self.client._select_eval_results(evaluator_response)
                    )
                    if self._upload_results:
                        # TODO: This is a hack
                        self.client._log_evaluation_feedback(
                            evaluator_response, run=run, _executor=executor
                        )
                except Exception as e:
                    try:
                        feedback_keys = _extract_feedback_keys(evaluator)
                        error_response = EvaluationResults(
                            results=[
                                EvaluationResult(
                                    key=key,
                                    source_run_id=evaluator_run_id,
                                    comment=repr(e),
                                    extra={"error": True},
                                )
                                for key in feedback_keys
                            ]
                        )
                        eval_results["results"].extend(
                            self.client._select_eval_results(error_response)
                        )
                        if self._upload_results:
                            # TODO: This is a hack
                            self.client._log_evaluation_feedback(
                                error_response, run=run, _executor=executor
                            )
                    except Exception as e2:
                        logger.debug(f"Error parsing feedback keys: {e2}")
                        pass
                    logger.error(
                        f"Error running evaluator {repr(evaluator)} on"
                        f" run {run.id if run else ''}: {repr(e)}",
                        exc_info=True,
                    )
            if example.attachments is not None:
                for attachment in example.attachments:
                    reader = example.attachments[attachment]["reader"]
                    reader.seek(0)
            return ExperimentResultRow(
                run=run,
                example=example,
                evaluation_results=eval_results,
            )

    def _score(
        self,
        evaluators: Sequence[RunEvaluator],
        max_concurrency: Optional[int] = None,
    ) -> Iterable[ExperimentResultRow]:
        """Run the evaluators on the prediction stream.

        Expects runs to be available in the manager.
        (e.g. from a previous prediction step)
        """
        with ls_utils.ContextThreadPoolExecutor(
            max_workers=max_concurrency or 1
        ) as executor:
            if max_concurrency == 0:
                context = copy_context()
                for current_results in self.get_results():
                    yield context.run(
                        self._run_evaluators,
                        evaluators,
                        current_results,
                        executor,
                    )
            else:
                futures = set()
                for current_results in self.get_results():
                    futures.add(
                        executor.submit(
                            self._run_evaluators,
                            evaluators,
                            current_results,
                            executor,
                        )
                    )
                    try:
                        # Since prediction may be slow, yield (with a timeout) to
                        # allow for early results to be emitted.
                        for future in cf.as_completed(futures, timeout=0.001):
                            yield future.result()
                            futures.remove(future)
                    except (cf.TimeoutError, TimeoutError):
                        pass
                for future in cf.as_completed(futures):
                    result = future.result()
                    yield result

    def _apply_summary_evaluators(
        self, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T]
    ) -> Generator[EvaluationResults, None, None]:
        runs, examples = [], []
        for run, example in zip(self.runs, self.examples):
            runs.append(run)
            examples.append(example)
        aggregate_feedback = []
        with ls_utils.ContextThreadPoolExecutor() as executor:
            project_id = self._get_experiment().id if self._upload_results else None
            current_context = rh.get_tracing_context()
            metadata = {
                **(current_context["metadata"] or {}),
                **{
                    "experiment": self.experiment_name,
                    "experiment_id": project_id,
                },
            }
            with rh.tracing_context(
                **{
                    **current_context,
                    "project_name": "evaluators",
                    "metadata": metadata,
                    "client": self.client,
                    "enabled": "local" if not self._upload_results else True,
                }
            ):
                for evaluator in summary_evaluators:
                    try:
                        summary_eval_result = evaluator(runs, examples)
                        # TODO: Expose public API for this.
                        flattened_results = self.client._select_eval_results(
                            summary_eval_result,
                            fn_name=evaluator.__name__,
                        )
                        aggregate_feedback.extend(flattened_results)
                        if self._upload_results:
                            for result in flattened_results:
                                feedback = result.dict(exclude={"target_run_id"})
                                evaluator_info = feedback.pop("evaluator_info", None)
                                executor.submit(
                                    self.client.create_feedback,
                                    **feedback,
                                    run_id=None,
                                    project_id=project_id,
                                    source_info=evaluator_info,
                                )
                    except Exception as e:
                        logger.error(
                            f"Error running summary evaluator {repr(evaluator)}: {e}",
                            exc_info=True,
                        )
        yield {"results": aggregate_feedback}

    def _get_dataset_version(self) -> Optional[str]:
        examples = list(self.examples)
        modified_at = [ex.modified_at for ex in examples if ex.modified_at]
        # Should always be defined in practice when fetched,
        # but the typing permits None
        max_modified_at = max(modified_at) if modified_at else None
        return max_modified_at.isoformat() if max_modified_at else None

    def _get_dataset_splits(self) -> Optional[list[str]]:
        examples = list(self.examples)
        splits = set()
        for example in examples:
            if (
                example.metadata
                and example.metadata.get("dataset_split")
                and isinstance(example.metadata["dataset_split"], list)
            ):
                for split in example.metadata["dataset_split"]:
                    if isinstance(split, str):
                        splits.add(split)
            else:
                splits.add("base")
        return list(splits)

    def _end(self) -> None:
        if not self._upload_results:
            return
        experiment = self._experiment
        if experiment is None:
            raise ValueError("Experiment not started yet.")
        project_metadata = self._get_experiment_metadata()
        project_metadata["dataset_version"] = self._get_dataset_version()
        project_metadata["dataset_splits"] = self._get_dataset_splits()
        self.client.update_project(
            experiment.id,
            metadata={
                **experiment.metadata,
                **project_metadata,
            },
        )

    def _copy(self, *args: Any, **kwargs: Any) -> _ExperimentManager:
        default_args = (self._data,)
        default_kwargs = {
            "experiment": self._experiment,
            "metadata": self._metadata,
            "runs": self._runs,
            "client": self.client,
            "evaluation_results": self._evaluation_results,
            "summary_results": self._summary_results,
            "include_attachments": self._include_attachments,
            "reuse_attachments": self._reuse_attachments,
            "upload_results": self._upload_results,
            "attachment_raw_data_dict": self._attachment_raw_data_dict,
            "error_handling": self._error_handling,
        }
        full_args = list(args) + list(default_args[len(args) :])
        full_kwargs = {**default_kwargs, **kwargs}
        return self.__class__(*full_args, **full_kwargs)


def _resolve_evaluators(
    evaluators: Sequence[Union[EVALUATOR_T, RunEvaluator, AEVALUATOR_T]],
) -> Sequence[RunEvaluator]:
    results = []
    for evaluator in evaluators:
        if isinstance(evaluator, RunEvaluator):
            results.append(evaluator)
        elif isinstance(evaluator, LangChainStringEvaluator):
            results.append(evaluator.as_run_evaluator())
        else:
            results.append(run_evaluator(evaluator))
    return results


def _wrap_summary_evaluators(
    evaluators: Sequence[SUMMARY_EVALUATOR_T],
) -> list[SUMMARY_EVALUATOR_T]:
    def _wrap(evaluator: SUMMARY_EVALUATOR_T) -> SUMMARY_EVALUATOR_T:
        eval_name = getattr(evaluator, "__name__", "BatchEvaluator")
        evaluator = _normalize_summary_evaluator(evaluator)

        @functools.wraps(evaluator)
        def _wrapper_inner(
            runs: Sequence[schemas.Run], examples: Sequence[schemas.Example]
        ) -> Union[EvaluationResult, EvaluationResults]:
            @rh.traceable(name=eval_name)
            def _wrapper_super_inner(
                runs_: str, examples_: str
            ) -> Union[EvaluationResult, EvaluationResults]:
                return evaluator(list(runs), list(examples))

            return _wrapper_super_inner(
                f"Runs[] (Length={len(runs)})", f"Examples[] (Length={len(examples)})"
            )

        return _wrapper_inner

    results = []
    for evaluator in evaluators:
        results.append(_wrap(evaluator))
    return results
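

# Illustrative sketch of the summary-evaluator shape that the wrapper above expects
# after normalization (the function name and feedback key below are placeholders,
# not part of this module):
#
#     def exact_match_rate(
#         runs: Sequence[schemas.Run], examples: Sequence[schemas.Example]
#     ) -> EvaluationResult:
#         matches = sum(
#             (run.outputs or {}) == (example.outputs or {})
#             for run, example in zip(runs, examples)
#         )
#         return EvaluationResult(key="exact_match_rate", score=matches / len(runs))
#
# The wrapper only adds tracing; it still calls the evaluator with the full lists.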


class _ForwardResults(TypedDict):
    run: schemas.Run
    example: schemas.Example


def _forward(
    fn: rh.SupportsLangsmithExtra,
    example: schemas.Example,
    experiment_name: str,
    metadata: dict,
    client: langsmith.Client,
    upload_results: bool,
    include_attachments: bool = False,
    error_handling: Literal["log", "ignore"] = "log",
) -> _ForwardResults:
    run: Optional[schemas.RunBase] = None

    def _get_run(r: rt.RunTree) -> None:
        nonlocal run
        run = r

    def _set_reference_example_id(r: rt.RunTree) -> None:
        r.reference_example_id = example.id

    example_version = (example.modified_at or example.created_at).isoformat()
    langsmith_extra = rh.LangSmithExtra(
        on_end=_get_run,
        project_name=experiment_name,
        metadata={**metadata, "example_version": example_version},
        client=client,
    )
    if error_handling == "log":
        langsmith_extra["reference_example_id"] = example.id
    elif error_handling == "ignore":
        # Only set the reference_example_id if the run succeeds.
        langsmith_extra["_on_success"] = _set_reference_example_id
    else:
        raise ValueError(f"Unrecognized error_handling value: {error_handling=}")
    with rh.tracing_context(enabled="local" if not upload_results else True):
        try:
            arg_names = _get_target_args(fn)
            args = [getattr(example, argn) for argn in arg_names]
            fn(*args, langsmith_extra=langsmith_extra)
            # Reset attachment readers if attachments were used.
            if include_attachments and example.attachments is not None:
                for attachment in example.attachments:
                    reader = example.attachments[attachment]["reader"]
                    reader.seek(0)
        except Exception as e:
            logger.error(
                f"Error running target function: {e}", exc_info=True, stacklevel=1
            )
        return _ForwardResults(run=cast(schemas.Run, run), example=example)


def _is_valid_uuid(value: str) -> bool:
    try:
        uuid.UUID(value)
        return True
    except ValueError:
        return False


def _resolve_data(
    data: DATA_T,
    *,
    client: langsmith.Client,
    include_attachments: bool = False,
) -> Iterable[schemas.Example]:
    """Return the examples for the given dataset."""
    if isinstance(data, uuid.UUID):
        return client.list_examples(
            dataset_id=data, include_attachments=include_attachments
        )
    elif isinstance(data, str) and _is_valid_uuid(data):
        return client.list_examples(
            dataset_id=uuid.UUID(data), include_attachments=include_attachments
        )
    elif isinstance(data, str):
        return client.list_examples(
            dataset_name=data, include_attachments=include_attachments
        )
    elif isinstance(data, schemas.Dataset):
        return client.list_examples(
            dataset_id=data.id, include_attachments=include_attachments
        )
    return data
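

# For illustration, each of the following would be resolved to an example iterable
# (the dataset name, UUID string, and variable names are placeholders):
#
#     _resolve_data("my-dataset", client=client)            # by dataset name
#     _resolve_data(dataset_id_string, client=client)        # by dataset ID string
#     _resolve_data(uuid.UUID(dataset_id_string), client=client)  # by dataset ID
#     _resolve_data(existing_examples, client=client)        # passthrough iterable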


def _ensure_traceable(
    target: TARGET_T | rh.SupportsLangsmithExtra[[dict], dict] | Runnable,
) -> rh.SupportsLangsmithExtra[[dict], dict]:
    """Ensure the target function is traceable."""
    if not _is_callable(target):
        raise ValueError(
            "Target must be a callable function or a langchain/langgraph object. For "
            "example:\n\n"
            "def predict(inputs: dict) -> dict:\n"
            "    # do work, like chain.invoke(inputs)\n"
            "    return {...}\n\n"
            "evaluate(\n"
            "    predict,\n"
            "    ...\n"
            ")"
        )
    if rh.is_traceable_function(target):
        fn: rh.SupportsLangsmithExtra[[dict], dict] = target
    else:
        if _is_langchain_runnable(target):
            target = target.invoke  # type: ignore[union-attr]
        fn = rh.traceable(name="Target")(cast(Callable, target))
    return fn


def _include_attachments(target: Any, evaluators: Optional[Sequence]) -> bool:
    return _target_include_attachments(target) or bool(
        _evaluators_include_attachments(evaluators)
    )


def _evaluators_include_attachments(evaluators: Optional[Sequence]) -> int:
    if evaluators is None:
        return 0
    return sum(_evaluator_uses_attachments(e) for e in evaluators)


def _evaluator_uses_attachments(evaluator: Any) -> bool:
    if not callable(evaluator):
        return False
    sig = inspect.signature(evaluator)
    params = list(sig.parameters.values())
    positional_params = [
        p for p in params if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
    ]
    return any(p.name == "attachments" for p in positional_params)


def _target_include_attachments(target: Any) -> bool:
    """Whether the target function accepts attachments."""
    return "attachments" in _get_target_args(target)


def _get_target_args(target: Any) -> list[str]:
    """Get the positional argument names that the target function accepts."""
    if not callable(target):
        return []
    if _is_langchain_runnable(target):
        return ["inputs"]
    # Check function signature
    sig = inspect.signature(target)
    params = list(sig.parameters.values())
    positional_params = [
        p for p in params if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
    ]
    positional_no_default = [p for p in positional_params if p.default is p.empty]
    if len(positional_params) == 0:
        raise ValueError(
            "Target function must accept at least one positional argument (inputs)."
        )
    elif len(positional_no_default) > 3:
        raise ValueError(
            "Target function must accept at most three "
            "arguments without default values: (inputs, attachments, metadata)."
        )
    elif len(positional_no_default) > 1 and {
        p.name for p in positional_no_default
    }.difference(["inputs", "attachments", "metadata"]):
        raise ValueError(
            "When passing multiple positional arguments without default values, they "
            "must be named 'inputs', 'attachments', or 'metadata'. Received: "
            f"{[p.name for p in positional_no_default]}"
        )
    else:
        args = []
        for p in positional_params[:3]:
            if p.name in {"inputs", "attachments", "metadata"}:
                args.append(p.name)
            else:
                break
        return args or ["inputs"]
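

# Illustrative sketches of target signatures this resolves (the function bodies
# are placeholders):
#
#     def target(inputs: dict) -> dict: ...                     # -> ["inputs"]
#     def target(inputs: dict, attachments: dict) -> dict: ...  # -> ["inputs", "attachments"]
#     def target(inputs: dict, attachments: dict, metadata: dict) -> dict: ...
#                                          # -> ["inputs", "attachments", "metadata"]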


def _resolve_experiment(
    experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]],
    runs: Optional[Iterable[schemas.Run]],
    client: langsmith.Client,
) -> tuple[
    Optional[Union[schemas.TracerSession, str]], Optional[Iterable[schemas.Run]]
]:
    # TODO: Remove this, handle outside the manager
    if experiment is not None:
        if isinstance(experiment, schemas.TracerSession):
            experiment_ = experiment
        else:
            experiment_ = _load_experiment(experiment, client)
        if not experiment_.name:
            raise ValueError("Experiment name must be defined if provided.")
        if not experiment_.reference_dataset_id:
            raise ValueError(
                "Experiment must have an associated reference_dataset_id, "
                "but none was provided."
            )
        return experiment_, runs
    # If we have runs, that means the experiment was already started.
    if runs is not None:
        runs_, runs = itertools.tee(runs)
        first_run = next(runs_)
        experiment_ = client.read_project(project_id=first_run.session_id)
        if not experiment_.name:
            raise ValueError("Experiment name not found for provided runs.")
        return experiment_, runs
    return None, None


def _get_random_name() -> str:
    from langsmith.evaluation._name_generation import random_name  # noqa: F401

    return random_name()


def _extract_feedback_keys(evaluator: RunEvaluator):
    if isinstance(evaluator, DynamicRunEvaluator):
        if getattr(evaluator, "func", None):
            return _extract_code_evaluator_feedback_keys(evaluator.func)
        elif getattr(evaluator, "afunc", None):
            return _extract_code_evaluator_feedback_keys(evaluator.afunc)
    # TODO: Support for DynamicComparisonRunEvaluator
    if hasattr(evaluator, "evaluator"):
        # LangChainStringEvaluator
        if getattr(getattr(evaluator, "evaluator"), "evaluation_name", None):
            return [evaluator.evaluator.evaluation_name]
    return []


def _extract_code_evaluator_feedback_keys(func: Callable) -> list[str]:
    python_code = inspect.getsource(func)

    def extract_dict_keys(node):
        if isinstance(node, ast.Dict):
            keys = []
            key_value = None
            for key, value in zip(node.keys, node.values):
                if isinstance(key, (ast.Str, ast.Constant)):
                    key_str = key.s if isinstance(key, ast.Str) else key.value
                    if key_str == "key" and isinstance(value, (ast.Str, ast.Constant)):
                        key_value = (
                            value.s if isinstance(value, ast.Str) else value.value
                        )
            return [key_value] if key_value else keys
        elif (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Name)
            and node.func.id == "dict"
        ):
            for keyword in node.keywords:
                if keyword.arg == "key" and isinstance(
                    keyword.value, (ast.Str, ast.Constant)
                ):
                    return [
                        (
                            keyword.value.s
                            if isinstance(keyword.value, ast.Str)
                            else keyword.value.value
                        )
                    ]
        return []

    def extract_evaluation_result_key(node):
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Name)
            and node.func.id == "EvaluationResult"
        ):
            for keyword in node.keywords:
                if keyword.arg == "key" and isinstance(
                    keyword.value, (ast.Str, ast.Constant)
                ):
                    return [
                        (
                            keyword.value.s
                            if isinstance(keyword.value, ast.Str)
                            else keyword.value.value
                        )
                    ]
        return []

    def extract_evaluation_results_keys(node, variables):
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Name)
            and node.func.id == "EvaluationResults"
        ):
            for keyword in node.keywords:
                if keyword.arg == "results":
                    if isinstance(keyword.value, ast.Name):
                        return variables.get(keyword.value.id, [])
                    elif isinstance(keyword.value, ast.List):
                        keys = []
                        for elt in keyword.value.elts:
                            keys.extend(extract_evaluation_result_key(elt))
                        return keys
        elif isinstance(node, ast.Dict):
            for key, value in zip(node.keys, node.values):
                if isinstance(key, (ast.Str, ast.Constant)) and key.s == "results":
                    if isinstance(value, ast.List):
                        keys = []
                        for elt in value.elts:
                            if isinstance(elt, ast.Dict):
                                for elt_key, elt_value in zip(elt.keys, elt.values):
                                    if (
                                        isinstance(elt_key, (ast.Str, ast.Constant))
                                        and elt_key.s == "key"
                                    ):
                                        if isinstance(
                                            elt_value, (ast.Str, ast.Constant)
                                        ):
                                            keys.append(elt_value.s)
                            elif (
                                isinstance(elt, ast.Call)
                                and isinstance(elt.func, ast.Name)
                                and elt.func.id in ("EvaluationResult", "dict")
                            ):
                                for keyword in elt.keywords:
                                    if keyword.arg == "key" and isinstance(
                                        keyword.value, (ast.Str, ast.Constant)
                                    ):
                                        keys.append(
                                            keyword.value.s
                                            if isinstance(keyword.value, ast.Str)
                                            else keyword.value.value
                                        )
                        return keys
        return []

    python_code = textwrap.dedent(python_code)
    try:
        tree = ast.parse(python_code)
        function_def = tree.body[0]
        if not isinstance(function_def, (ast.FunctionDef, ast.AsyncFunctionDef)):
            return []
        variables = {}
        keys = []
        for node in ast.walk(function_def):
            if isinstance(node, ast.Assign):
                if isinstance(node.value, ast.List):
                    list_keys = []
                    for elt in node.value.elts:
                        list_keys.extend(extract_evaluation_result_key(elt))
                    if isinstance(node.targets[0], ast.Name):
                        variables[node.targets[0].id] = list_keys
            elif isinstance(node, ast.Return) and node.value is not None:
                dict_keys = extract_dict_keys(node.value)
                eval_result_key = extract_evaluation_result_key(node.value)
                eval_results_keys = extract_evaluation_results_keys(
                    node.value, variables
                )
                keys.extend(dict_keys)
                keys.extend(eval_result_key)
                keys.extend(eval_results_keys)
        # If no keys found, return the function name
        return keys if keys else [function_def.name]
    except SyntaxError:
        return []
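

# For illustration, given an evaluator like the following (a placeholder, not part
# of this module), the extractor above returns ["correctness"]:
#
#     def correctness_evaluator(run, example):
#         return {"key": "correctness", "score": 1}
#
# It falls back to the evaluator's function name when no explicit "key" is found.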


def _to_pandas(
    results: list[ExperimentResultRow],
    start: Optional[int] = 0,
    end: Optional[int] = None,
):
    try:
        import pandas as pd
    except ImportError as e:
        raise ImportError(
            "The 'pandas' library is required to use the 'to_pandas' function. "
            "Please install it using 'pip install pandas' or "
            "'conda install pandas' before calling this method."
        ) from e

    return pd.DataFrame(_flatten_experiment_results(results, start=start, end=end))


def _flatten_experiment_results(
    results: list[ExperimentResultRow],
    start: Optional[int] = 0,
    end: Optional[int] = None,
):
    return [
        {
            **{f"inputs.{k}": v for k, v in (x["example"].inputs or {}).items()},
            **{f"outputs.{k}": v for k, v in (x["run"].outputs or {}).items()},
            "error": x["run"].error,
            **(
                {f"reference.{k}": v for k, v in x["example"].outputs.items()}
                if x["example"].outputs is not None
                else {}
            ),
            **{
                f"feedback.{r.key}": r.score if r.score is not None else r.value
                for r in x["evaluation_results"]["results"]
            },
            "execution_time": (
                (x["run"].end_time - x["run"].start_time).total_seconds()
                if x["run"].end_time
                else None
            ),
            "example_id": x["run"].reference_example_id,
            "id": x["run"].id,
        }
        for x in results[start:end]
    ]
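

# Illustrative shape of one flattened row (the column names depend on the example's
# inputs/outputs and the feedback keys; the values below are placeholders):
#
#     {
#         "inputs.question": "...",
#         "outputs.answer": "...",
#         "error": None,
#         "reference.answer": "...",
#         "feedback.correctness": 1,
#         "execution_time": 0.42,
#         "example_id": example_uuid,
#         "id": run_uuid,
#     }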


@functools.lru_cache(maxsize=1)
def _import_langchain_runnable() -> Optional[type]:
    try:
        from langchain_core.runnables import Runnable

        return Runnable
    except ImportError:
        return None


def _is_langchain_runnable(o: Any) -> bool:
    return bool((Runnable := _import_langchain_runnable()) and isinstance(o, Runnable))