_arunner.py
  1. """V2 Evaluation Interface."""
  2. from __future__ import annotations
  3. import asyncio
  4. import concurrent.futures as cf
  5. import io
  6. import logging
  7. import pathlib
  8. import uuid
  9. from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Iterable, Sequence
  10. from typing import (
  11. TYPE_CHECKING,
  12. Any,
  13. Callable,
  14. Literal,
  15. Optional,
  16. TypeVar,
  17. Union,
  18. cast,
  19. )
  20. import langsmith
  21. from langsmith import run_helpers as rh
  22. from langsmith import run_trees, schemas
  23. from langsmith import run_trees as rt
  24. from langsmith import utils as ls_utils
  25. from langsmith._internal import _aiter as aitertools
  26. from langsmith._internal._beta_decorator import _warn_once
  27. from langsmith.evaluation._runner import (
  28. AEVALUATOR_T,
  29. DATA_T,
  30. EVALUATOR_T,
  31. ExperimentResultRow,
  32. _evaluators_include_attachments,
  33. _ExperimentManagerMixin,
  34. _extract_feedback_keys,
  35. _ForwardResults,
  36. _get_target_args,
  37. _is_langchain_runnable,
  38. _load_examples_map,
  39. _load_experiment,
  40. _load_tqdm,
  41. _load_traces,
  42. _resolve_data,
  43. _resolve_evaluators,
  44. _resolve_experiment,
  45. _target_include_attachments,
  46. _to_pandas,
  47. _wrap_summary_evaluators,
  48. )
  49. from langsmith.evaluation.evaluator import (
  50. SUMMARY_EVALUATOR_T,
  51. EvaluationResult,
  52. EvaluationResults,
  53. RunEvaluator,
  54. )
  55. if TYPE_CHECKING:
  56. import pandas as pd
  57. from langchain_core.runnables import Runnable
  58. DataFrame = pd.DataFrame
  59. else:
  60. DataFrame = Any
  61. logger = logging.getLogger(__name__)
  62. ATARGET_T = Union[
  63. Callable[[dict], Awaitable[dict]], Callable[[dict, dict], Awaitable[dict]]
  64. ]
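# A minimal sketch of a conforming ``ATARGET_T`` target (illustrative only; the
# names below are not part of this module):
#
#     async def my_target(inputs: dict) -> dict:
#         # call your chain / model / API here
#         return {"output": "Yes"}
#
#     results = await aevaluate(my_target, data="my dataset", evaluators=[...])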
  65. async def aevaluate(
  66. target: Union[
  67. ATARGET_T, AsyncIterable[dict], Runnable, str, uuid.UUID, schemas.TracerSession
  68. ],
  69. /,
  70. data: Union[
  71. DATA_T, AsyncIterable[schemas.Example], Iterable[schemas.Example], None
  72. ] = None,
  73. evaluators: Optional[Sequence[Union[EVALUATOR_T, AEVALUATOR_T]]] = None,
  74. summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
  75. metadata: Optional[dict] = None,
  76. experiment_prefix: Optional[str] = None,
  77. description: Optional[str] = None,
  78. max_concurrency: Optional[int] = 0,
  79. num_repetitions: int = 1,
  80. client: Optional[langsmith.Client] = None,
  81. blocking: bool = True,
  82. experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None,
  83. upload_results: bool = True,
  84. error_handling: Literal["log", "ignore"] = "log",
  85. **kwargs: Any,
  86. ) -> AsyncExperimentResults:
  87. r"""Evaluate an async target system on a given dataset.
  88. Args:
89. target (AsyncCallable[[dict], dict] | AsyncIterable[dict] | Runnable | EXPERIMENT_T):
90. The target system or experiment to evaluate.
91. Can be an async function that takes a `dict` and returns a `dict`, a langchain
92. `Runnable`, or an existing experiment ID. (Comparing two existing experiments is not supported asynchronously; use `evaluate()` for that.)
93. data (Union[DATA_T, AsyncIterable[schemas.Example]]): The dataset to evaluate on.
94. Can be a dataset name, a list of examples, or an (async) iterable or generator of examples.
  95. evaluators (Optional[Sequence[EVALUATOR_T]]): A list of evaluators to run
  96. on each example.
  97. summary_evaluators (Optional[Sequence[SUMMARY_EVALUATOR_T]]): A list of summary
  98. evaluators to run on the entire dataset.
  99. metadata (Optional[dict]): Metadata to attach to the experiment.
  100. experiment_prefix (Optional[str]): A prefix to provide for your experiment name.
  101. description (Optional[str]): A description of the experiment.
  102. max_concurrency (int | None): The maximum number of concurrent
  103. evaluations to run.
  104. If `None` then no limit is set. If `0` then no concurrency.
  105. num_repetitions (int): The number of times to run the evaluation.
  106. Each item in the dataset will be run and evaluated this many times.
  107. client (Optional[langsmith.Client]): The LangSmith client to use.
  108. blocking (bool): Whether to block until the evaluation is complete.
  109. experiment (Optional[schemas.TracerSession]): An existing experiment to
  110. extend.
  111. If provided, `experiment_prefix` is ignored. For advanced usage only.
112. error_handling (str, default="log"): How to handle individual run errors.
113. `'log'` will trace errored runs, with the error message, as part of the
114. experiment; `'ignore'` will not count such runs as part of the experiment
115. at all.
  116. Returns:
  117. An async iterator over the experiment results.
  118. Environment:
  119. - `LANGSMITH_TEST_CACHE`: If set, API calls will be cached to disk to save time and
  120. cost during testing.
  121. Recommended to commit the cache files to your repository for faster CI/CD runs.
  122. Requires the `'langsmith[vcr]'` package to be installed.
  123. Examples:
  124. >>> from typing import Sequence
  125. >>> from langsmith import Client, aevaluate
  126. >>> from langsmith.schemas import Example, Run
  127. >>> client = Client()
  128. >>> dataset = client.clone_public_dataset(
  129. ... "https://smith.langchain.com/public/419dcab2-1d66-4b94-8901-0357ead390df/d"
  130. ... )
  131. >>> dataset_name = "Evaluate Examples"
  132. Basic usage:
  133. >>> def accuracy(run: Run, example: Example):
  134. ... # Row-level evaluator for accuracy.
  135. ... pred = run.outputs["output"]
  136. ... expected = example.outputs["answer"]
  137. ... return {"score": expected.lower() == pred.lower()}
  138. >>> def precision(runs: Sequence[Run], examples: Sequence[Example]):
  139. ... # Experiment-level evaluator for precision.
  140. ... # TP / (TP + FP)
  141. ... predictions = [run.outputs["output"].lower() for run in runs]
  142. ... expected = [example.outputs["answer"].lower() for example in examples]
  143. ... # yes and no are the only possible answers
  144. ... tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"])
  145. ... fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)])
  146. ... return {"score": tp / (tp + fp)}
  147. >>> import asyncio
  148. >>> async def apredict(inputs: dict) -> dict:
  149. ... # This can be any async function or just an API call to your app.
  150. ... await asyncio.sleep(0.1)
  151. ... return {"output": "Yes"}
  152. >>> results = asyncio.run(
  153. ... aevaluate(
  154. ... apredict,
  155. ... data=dataset_name,
  156. ... evaluators=[accuracy],
  157. ... summary_evaluators=[precision],
  158. ... experiment_prefix="My Experiment",
  159. ... description="Evaluate the accuracy of the model asynchronously.",
  160. ... metadata={
  161. ... "my-prompt-version": "abcd-1234",
  162. ... },
  163. ... )
  164. ... ) # doctest: +ELLIPSIS
  165. View the evaluation results for experiment:...
  166. Evaluating over only a subset of the examples using an async generator:
  167. >>> async def example_generator():
  168. ... examples = client.list_examples(dataset_name=dataset_name, limit=5)
  169. ... for example in examples:
  170. ... yield example
  171. >>> results = asyncio.run(
  172. ... aevaluate(
  173. ... apredict,
  174. ... data=example_generator(),
  175. ... evaluators=[accuracy],
  176. ... summary_evaluators=[precision],
  177. ... experiment_prefix="My Subset Experiment",
  178. ... description="Evaluate a subset of examples asynchronously.",
  179. ... )
  180. ... ) # doctest: +ELLIPSIS
  181. View the evaluation results for experiment:...
182. Streaming each prediction to debug more easily and eagerly:
  183. >>> results = asyncio.run(
  184. ... aevaluate(
  185. ... apredict,
  186. ... data=dataset_name,
  187. ... evaluators=[accuracy],
  188. ... summary_evaluators=[precision],
  189. ... experiment_prefix="My Streaming Experiment",
  190. ... description="Streaming predictions for debugging.",
  191. ... blocking=False,
  192. ... )
  193. ... ) # doctest: +ELLIPSIS
  194. View the evaluation results for experiment:...
  195. >>> async def aenumerate(iterable):
  196. ... async for elem in iterable:
  197. ... print(elem)
  198. >>> asyncio.run(aenumerate(results))
  199. Running without concurrency:
  200. >>> results = asyncio.run(
  201. ... aevaluate(
  202. ... apredict,
  203. ... data=dataset_name,
  204. ... evaluators=[accuracy],
  205. ... summary_evaluators=[precision],
  206. ... experiment_prefix="My Experiment Without Concurrency",
  207. ... description="This was run without concurrency.",
  208. ... max_concurrency=0,
  209. ... )
  210. ... ) # doctest: +ELLIPSIS
  211. View the evaluation results for experiment:...
  212. Using Async evaluators:
  213. >>> async def helpfulness(run: Run, example: Example):
  214. ... # Row-level evaluator for helpfulness.
  215. ... await asyncio.sleep(5) # Replace with your LLM API call
  216. ... return {"score": run.outputs["output"] == "Yes"}
  217. >>> results = asyncio.run(
  218. ... aevaluate(
  219. ... apredict,
  220. ... data=dataset_name,
  221. ... evaluators=[helpfulness],
  222. ... summary_evaluators=[precision],
  223. ... experiment_prefix="My Helpful Experiment",
  224. ... description="Applying async evaluators example.",
  225. ... )
  226. ... ) # doctest: +ELLIPSIS
  227. View the evaluation results for experiment:...
  228. !!! warning "Behavior changed in `langsmith` 0.2.0"
  229. 'max_concurrency' default updated from None (no limit on concurrency)
  230. to 0 (no concurrency at all).
  231. """ # noqa: E501
  232. if isinstance(target, (str, uuid.UUID, schemas.TracerSession)):
  233. invalid_args = {
  234. "num_repetitions": num_repetitions > 1,
  235. "experiment": bool(experiment),
  236. "upload_results": not upload_results,
  237. "experiment_prefix": bool(experiment_prefix),
  238. "data": bool(data),
  239. }
  240. if any(invalid_args.values()):
  241. msg = (
  242. f"Received invalid arguments. "
  243. f"{tuple(k for k, v in invalid_args.items() if v)} should not be "
  244. f"specified when target is an existing experiment."
  245. )
  246. raise ValueError(msg)
  247. target_id = target if isinstance(target, (str, uuid.UUID)) else target.id
  248. logger.debug(f"Running evaluation over existing experiment {target_id}...")
  249. return await aevaluate_existing(
  250. target,
  251. evaluators=evaluators,
  252. summary_evaluators=summary_evaluators,
  253. metadata=metadata,
  254. max_concurrency=max_concurrency,
  255. client=client,
  256. blocking=blocking,
  257. **kwargs,
  258. )
  259. elif isinstance(target, (list, tuple)):
  260. msg = (
  261. "Running a comparison of two existing experiments asynchronously is not "
  262. "currently supported. Please use the `evaluate()` method instead and make "
  263. "sure that your evaluators are defined as synchronous functions."
  264. )
  265. raise ValueError(msg)
  266. elif kwargs:
  267. msg = (
  268. f"Received unsupported arguments {kwargs}. These arguments are not "
  269. f"supported when creating a new experiment."
  270. )
  271. raise ValueError(msg)
  272. elif not data:
  273. msg = "Must specify 'data' when running evaluations over a target function."
  274. raise ValueError(msg)
  275. elif experiment and experiment_prefix:
  276. msg = (
  277. "Expected at most one of 'experiment' or 'experiment_prefix',"
  278. " but both were provided. "
  279. f"Got: experiment={experiment}, experiment_prefix={experiment_prefix}"
  280. )
  281. raise ValueError(msg)
  282. else:
  283. if not upload_results:
  284. _warn_once("'upload_results' parameter is in beta.")
  285. logger.debug(f"Running evaluation over target system {target}...")
  286. return await _aevaluate(
  287. target,
  288. data=data,
  289. evaluators=evaluators,
  290. summary_evaluators=summary_evaluators,
  291. metadata=metadata,
  292. experiment_prefix=experiment_prefix,
  293. description=description,
  294. max_concurrency=max_concurrency,
  295. num_repetitions=num_repetitions,
  296. client=client,
  297. blocking=blocking,
  298. experiment=experiment,
  299. upload_results=upload_results,
  300. error_handling=error_handling,
  301. )
  302. async def aevaluate_existing(
  303. experiment: Union[str, uuid.UUID, schemas.TracerSession],
  304. /,
  305. evaluators: Optional[Sequence[Union[EVALUATOR_T, AEVALUATOR_T]]] = None,
  306. summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
  307. metadata: Optional[dict] = None,
  308. max_concurrency: Optional[int] = 0,
  309. client: Optional[langsmith.Client] = None,
  310. load_nested: bool = False,
  311. blocking: bool = True,
  312. ) -> AsyncExperimentResults:
  313. r"""Evaluate existing experiment runs asynchronously.
  314. Args:
315. experiment (Union[str, uuid.UUID, schemas.TracerSession]): The experiment to evaluate, or its identifier.
  316. evaluators (Optional[Sequence[EVALUATOR_T]]): Optional sequence of evaluators to use for individual run evaluation.
  317. summary_evaluators (Optional[Sequence[SUMMARY_EVALUATOR_T]]): Optional sequence of evaluators
  318. to apply over the entire dataset.
  319. metadata (Optional[dict]): Optional metadata to include in the evaluation results.
  320. max_concurrency (int | None): The maximum number of concurrent
  321. evaluations to run.
  322. If `None` then no limit is set. If `0` then no concurrency.
  323. client (Optional[langsmith.Client]): Optional Langsmith client to use for evaluation.
  324. load_nested: Whether to load all child runs for the experiment.
  325. Default is to only load the top-level root runs.
  326. blocking (bool): Whether to block until evaluation is complete.
  327. Returns:
  328. An async iterator over the experiment results.
  329. Examples:
  330. Define your evaluators
  331. >>> from typing import Sequence
  332. >>> from langsmith.schemas import Example, Run
  333. >>> def accuracy(run: Run, example: Example):
  334. ... # Row-level evaluator for accuracy.
  335. ... pred = run.outputs["output"]
  336. ... expected = example.outputs["answer"]
  337. ... return {"score": expected.lower() == pred.lower()}
  338. >>> def precision(runs: Sequence[Run], examples: Sequence[Example]):
  339. ... # Experiment-level evaluator for precision.
  340. ... # TP / (TP + FP)
  341. ... predictions = [run.outputs["output"].lower() for run in runs]
  342. ... expected = [example.outputs["answer"].lower() for example in examples]
  343. ... # yes and no are the only possible answers
  344. ... tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"])
  345. ... fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)])
  346. ... return {"score": tp / (tp + fp)}
  347. Load the experiment and run the evaluation.
  348. >>> import asyncio
  349. >>> import uuid
  350. >>> from langsmith import Client, aevaluate, aevaluate_existing
  351. >>> client = Client()
  352. >>> dataset_name = "__doctest_aevaluate_existing_" + uuid.uuid4().hex[:8]
  353. >>> dataset = client.create_dataset(dataset_name)
  354. >>> example = client.create_example(
  355. ... inputs={"question": "What is 2+2?"},
  356. ... outputs={"answer": "4"},
  357. ... dataset_id=dataset.id,
  358. ... )
  359. >>> async def apredict(inputs: dict) -> dict:
  360. ... await asyncio.sleep(0.001)
  361. ... return {"output": "4"}
  362. >>> results = asyncio.run(
  363. ... aevaluate(
  364. ... apredict, data=dataset_name, experiment_prefix="doctest_experiment"
  365. ... )
  366. ... ) # doctest: +ELLIPSIS
  367. View the evaluation results for experiment:...
  368. >>> experiment_id = results.experiment_name
  369. >>> # Consume all results to ensure evaluation is complete
  370. >>> async def consume_results():
  371. ... result_list = [r async for r in results]
  372. ... return len(result_list) > 0
  373. >>> asyncio.run(consume_results())
  374. True
  375. >>> import time
  376. >>> time.sleep(3)
  377. >>> results = asyncio.run(
  378. ... aevaluate_existing(
  379. ... experiment_id,
  380. ... evaluators=[accuracy],
  381. ... summary_evaluators=[precision],
  382. ... )
  383. ... ) # doctest: +ELLIPSIS
  384. View the evaluation results for experiment:...
  385. >>> client.delete_dataset(dataset_id=dataset.id)
  386. """ # noqa: E501
  387. client = client or run_trees.get_cached_client()
  388. project = (
  389. experiment
  390. if isinstance(experiment, schemas.TracerSession)
  391. else (await aitertools.aio_to_thread(_load_experiment, experiment, client))
  392. )
  393. runs = await aitertools.aio_to_thread(
  394. _load_traces, experiment, client, load_nested=load_nested
  395. )
  396. data_map = await aitertools.aio_to_thread(_load_examples_map, client, project)
  397. data = [data_map[run.reference_example_id] for run in runs]
  398. return await _aevaluate(
  399. runs,
  400. data=data,
  401. evaluators=evaluators,
  402. summary_evaluators=summary_evaluators,
  403. metadata=metadata,
  404. max_concurrency=max_concurrency,
  405. client=client,
  406. blocking=blocking,
  407. experiment=project,
  408. )
  409. async def _aevaluate(
  410. target: Union[ATARGET_T, AsyncIterable[dict], Iterable[schemas.Run], Runnable],
  411. /,
  412. data: Union[DATA_T, AsyncIterable[schemas.Example]],
  413. evaluators: Optional[Sequence[Union[EVALUATOR_T, AEVALUATOR_T]]] = None,
  414. summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
  415. metadata: Optional[dict] = None,
  416. experiment_prefix: Optional[str] = None,
  417. description: Optional[str] = None,
  418. max_concurrency: Optional[int] = None,
  419. num_repetitions: int = 1,
  420. client: Optional[langsmith.Client] = None,
  421. blocking: bool = True,
  422. experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None,
  423. upload_results: bool = True,
  424. error_handling: Literal["log", "ignore"] = "log",
  425. ) -> AsyncExperimentResults:
  426. is_async_target = (
  427. asyncio.iscoroutinefunction(target)
  428. or (hasattr(target, "__aiter__") and asyncio.iscoroutine(target.__aiter__()))
  429. or _is_langchain_runnable(target)
  430. )
  431. client = client or rt.get_cached_client()
  432. runs = None if is_async_target else cast(Iterable[schemas.Run], target)
  433. experiment_, runs = await aitertools.aio_to_thread(
  434. _resolve_experiment,
  435. experiment,
  436. runs,
  437. client,
  438. )
  439. num_include_attachments = int(
  440. _target_include_attachments(target)
  441. ) + _evaluators_include_attachments(evaluators)
  442. manager = await _AsyncExperimentManager(
  443. data,
  444. client=client,
  445. metadata=metadata,
  446. experiment=experiment_ or experiment_prefix,
  447. description=description,
  448. num_repetitions=num_repetitions,
  449. runs=runs,
  450. include_attachments=num_include_attachments > 0,
  451. reuse_attachments=num_repetitions * num_include_attachments > 1,
  452. upload_results=upload_results,
  453. error_handling=error_handling,
  454. ).astart()
  455. cache_dir = ls_utils.get_cache_dir(None)
  456. if cache_dir is not None:
  457. dsid = await manager.get_dataset_id()
  458. cache_path = pathlib.Path(cache_dir) / f"{dsid}.yaml"
  459. else:
  460. cache_path = None
  461. with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]):
  462. if is_async_target:
  463. if evaluators:
  464. # Run predictions and evaluations in a single pipeline
  465. manager = await manager.awith_predictions_and_evaluators(
  466. cast(ATARGET_T, target), evaluators, max_concurrency=max_concurrency
  467. )
  468. else:
  469. manager = await manager.awith_predictions(
  470. cast(ATARGET_T, target), max_concurrency=max_concurrency
  471. )
  472. if summary_evaluators:
  473. manager = await manager.awith_summary_evaluators(summary_evaluators)
  474. else:
  475. if evaluators:
  476. manager = await manager.awith_evaluators(
  477. evaluators, max_concurrency=max_concurrency
  478. )
  479. if summary_evaluators:
  480. manager = await manager.awith_summary_evaluators(summary_evaluators)
  481. results = AsyncExperimentResults(manager)
  482. if blocking:
  483. await results.wait()
  484. return results
  485. class _AsyncExperimentManager(_ExperimentManagerMixin):
  486. """Manage the execution of experiments asynchronously.
  487. Supports lazily running predictions and evaluations in parallel to facilitate
  488. result streaming and early debugging.
  489. Args:
490. data (DATA_T): The data used for the experiment. Can be a dataset name or ID,
491. or a generator of examples.
  492. runs (Optional[Iterable[schemas.Run]]): The runs associated with the experiment
  493. predictions.
  494. experiment (Optional[schemas.TracerSession]): The tracer session
  495. associated with the experiment.
  496. experiment_prefix (Optional[str]): The prefix for the experiment name.
  497. description (Optional[str]): The description for the experiment.
  498. metadata (Optional[dict]): Additional metadata for the experiment.
  499. client (Optional[langsmith.Client]): The Langsmith client used for
  500. the experiment.
501. evaluation_results (Optional[AsyncIterable[EvaluationResults]]): The evaluation
502. results for the experiment.
503. summary_results (Optional[AsyncIterable[EvaluationResults]]): The aggregate results
504. for the experiment.
  505. num_repetitions (Optional[int], default=1): The number of repetitions for
  506. the experiment.
  507. include_attachments (Optional[bool], default=False): Whether to include
508. attachments. This is used when pulling the examples for the experiment.
  509. reuse_attachments (Optional[bool], default=False): Whether to reuse attachments
  510. from examples. This is True if we need to reuse attachments across multiple
  511. target/evaluator functions.
  512. upload_results (Optional[bool], default=True): Whether to upload results
  513. to Langsmith.
  514. attachment_raw_data_dict (Optional[dict]): A dictionary to store raw data
  515. for attachments. Only used if we reuse attachments across multiple
  516. target/evaluator functions.
517. error_handling (str, default="log"): How to handle individual run errors.
518. `'log'` will trace errored runs, with the error message, as part of the
519. experiment; `'ignore'` will not count such runs as part of the experiment
520. at all.
  521. """
  522. def __init__(
  523. self,
  524. data: Union[DATA_T, AsyncIterable[schemas.Example]],
  525. /,
  526. experiment: Optional[Union[schemas.TracerSession, str]] = None,
  527. metadata: Optional[dict] = None,
  528. runs: Optional[Union[Iterable[schemas.Run], AsyncIterable[schemas.Run]]] = None,
  529. client: Optional[langsmith.Client] = None,
  530. evaluation_results: Optional[AsyncIterable[EvaluationResults]] = None,
  531. summary_results: Optional[AsyncIterable[EvaluationResults]] = None,
  532. description: Optional[str] = None,
  533. num_repetitions: int = 1,
  534. include_attachments: bool = False,
  535. reuse_attachments: bool = False,
  536. upload_results: bool = True,
  537. attachment_raw_data_dict: Optional[dict] = None,
  538. error_handling: Literal["log", "ignore"] = "log",
  539. ):
  540. super().__init__(
  541. experiment=experiment,
  542. metadata=metadata,
  543. client=client,
  544. description=description,
  545. )
  546. self._data = data
  547. self._examples: Optional[AsyncIterable[schemas.Example]] = None
  548. self._runs = (
  549. aitertools.ensure_async_iterator(runs) if runs is not None else None
  550. )
  551. self._evaluation_results = evaluation_results
  552. self._summary_results = summary_results
  553. self._num_repetitions = num_repetitions
  554. self._include_attachments = include_attachments
  555. self._reuse_attachments = reuse_attachments
  556. self._upload_results = upload_results
  557. self._attachment_raw_data_dict = attachment_raw_data_dict
  558. self._error_handling = error_handling
  559. def _reset_example_attachments(self, example: schemas.Example) -> schemas.Example:
  560. """Reset attachment readers for an example.
561. This is only needed when an attachment will be consumed by more than one
562. callable (the target plus one or more evaluators). In that case we keep a
563. single copy of the raw attachment data in self._attachment_raw_data_dict
564. and create fresh readers from it as needed, so we never have to hold
565. multiple copies of the same data in memory; we just create new readers
566. over the shared bytes.
  567. """
  568. if not hasattr(example, "attachments") or not example.attachments:
  569. return example
  570. new_attachments: dict[str, schemas.AttachmentInfo] = {}
  571. for name, attachment in example.attachments.items():
  572. if (
  573. self._attachment_raw_data_dict is not None
  574. and str(example.id) + name in self._attachment_raw_data_dict
  575. ):
  576. new_attachments[name] = {
  577. "presigned_url": attachment["presigned_url"],
  578. "reader": io.BytesIO(
  579. self._attachment_raw_data_dict[str(example.id) + name]
  580. ),
  581. "mime_type": attachment["mime_type"],
  582. }
  583. else:
  584. new_attachments[name] = attachment
  585. # Create a new Example instance with the updated attachments
  586. return schemas.Example(
  587. id=example.id,
  588. created_at=example.created_at,
  589. dataset_id=example.dataset_id,
  590. inputs=example.inputs,
  591. outputs=example.outputs,
  592. metadata=example.metadata,
  593. modified_at=example.modified_at,
  594. source_run_id=example.source_run_id,
  595. attachments=new_attachments,
  596. _host_url=example._host_url,
  597. _tenant_id=example._tenant_id,
  598. )
  599. async def aget_examples(self) -> AsyncIterator[schemas.Example]:
  600. if self._examples is None:
  601. self._examples = _aresolve_data(
  602. self._data,
  603. client=self.client,
  604. include_attachments=self._include_attachments,
  605. )
  606. if self._reuse_attachments and self._attachment_raw_data_dict is None:
  607. examples_copy, self._examples = aitertools.atee(self._examples)
  608. self._attachment_raw_data_dict = {
  609. str(e.id) + name: value["reader"].read()
  610. async for e in examples_copy
  611. for name, value in (e.attachments or {}).items()
  612. }
  613. if self._num_repetitions > 1:
  614. examples_list = [example async for example in self._examples]
  615. self._examples = async_chain_from_iterable(
  616. [
  617. async_iter_from_list(
  618. [
  619. self._reset_example_attachments(example)
  620. for example in examples_list
  621. ]
  622. )
  623. for _ in range(self._num_repetitions)
  624. ]
  625. )
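# Tee the example stream so this manager keeps a reusable copy of the examples
# while handing an independent iterator back to the caller.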
  626. self._examples, examples_iter = aitertools.atee(
  627. aitertools.ensure_async_iterator(self._examples), 2, lock=asyncio.Lock()
  628. )
  629. return examples_iter
  630. async def get_dataset_id(self) -> str:
  631. if self._experiment is None or not getattr(
  632. self._experiment, "reference_dataset_id", None
  633. ):
  634. example = await aitertools.py_anext(await self.aget_examples())
  635. if example is None:
  636. raise ValueError("No examples found in the dataset.")
  637. return str(example.dataset_id)
  638. return str(self._experiment.reference_dataset_id)
  639. async def aget_runs(self) -> AsyncIterator[schemas.Run]:
  640. if self._runs is None:
  641. raise ValueError("Runs not loaded yet.")
  642. self._runs, runs = aitertools.atee(
  643. aitertools.ensure_async_iterator(self._runs), 2, lock=asyncio.Lock()
  644. )
  645. async for run in runs:
  646. yield run
  647. async def aget_evaluation_results(self) -> AsyncIterator[EvaluationResults]:
  648. if self._evaluation_results is None:
  649. async for _ in await self.aget_examples():
  650. yield {"results": []}
  651. else:
  652. self._evaluation_results, evaluation_results = aitertools.atee(
  653. aitertools.ensure_async_iterator(self._evaluation_results),
  654. 2,
  655. lock=asyncio.Lock(),
  656. )
  657. async for result in evaluation_results:
  658. yield result
  659. async def astart(self) -> _AsyncExperimentManager:
  660. try:
  661. first_example = await aitertools.py_anext(await self.aget_examples())
  662. except StopAsyncIteration:
  663. raise ValueError(
  664. "No examples found in the dataset. "
  665. "Please ensure the data provided to aevaluate is not empty."
  666. )
  667. if not first_example:
  668. raise ValueError(
  669. "No examples found in the dataset."
  670. "Please ensure the data provided to aevaluate is not empty."
  671. )
  672. project = self._get_project(first_example) if self._upload_results else None
  673. self._print_experiment_start(project, first_example)
  674. self._metadata["num_repetitions"] = self._num_repetitions
  675. return self._copy(
  676. await self.aget_examples(),
  677. experiment=project,
  678. )
  679. def _get_example_with_readers(self, example: schemas.Example) -> schemas.Example:
  680. new_attachments: dict[str, schemas.AttachmentInfo] = {}
  681. for name, attachment in (example.attachments or {}).items():
  682. if (
  683. self._attachment_raw_data_dict is not None
  684. and str(example.id) + name in self._attachment_raw_data_dict
  685. ):
  686. reader = io.BytesIO(
  687. self._attachment_raw_data_dict[str(example.id) + name]
  688. )
  689. new_attachments[name] = {
  690. "presigned_url": attachment["presigned_url"],
  691. "reader": reader,
  692. "mime_type": attachment["mime_type"],
  693. }
  694. else:
  695. new_attachments[name] = attachment
  696. return schemas.Example(
  697. id=example.id,
  698. created_at=example.created_at,
  699. dataset_id=example.dataset_id,
  700. inputs=example.inputs,
  701. outputs=example.outputs,
  702. metadata=example.metadata,
  703. modified_at=example.modified_at,
  704. source_run_id=example.source_run_id,
  705. attachments=new_attachments,
  706. _host_url=example._host_url,
  707. _tenant_id=example._tenant_id,
  708. )
  709. async def awith_predictions_and_evaluators(
  710. self,
  711. target: ATARGET_T,
  712. evaluators: Sequence[Union[EVALUATOR_T, AEVALUATOR_T]],
  713. /,
  714. max_concurrency: Optional[int] = None,
  715. ) -> _AsyncExperimentManager:
  716. """Run predictions and evaluations in a single pipeline.
  717. This allows evaluators to process results as soon as they're available from
  718. the target function, rather than waiting for all predictions to complete first.
  719. """
  720. evaluators = _resolve_evaluators(evaluators)
  721. if not hasattr(self, "_evaluation_feedback_executor"):
  722. self._evaluation_feedback_executor = cf.ThreadPoolExecutor(max_workers=4)
  723. traceable_target = _ensure_async_traceable(target)
  724. async def process_example(example: schemas.Example):
725. # Run the target on this example, then evaluate the resulting run.
  726. pred = await _aforward(
  727. traceable_target,
  728. self._get_example_with_readers(example),
  729. self.experiment_name,
  730. self._metadata,
  731. self.client,
  732. _target_include_attachments(target),
  733. self._error_handling,
  734. )
  735. example, run = pred["example"], pred["run"]
  736. result = await self._arun_evaluators(
  737. evaluators,
  738. {
  739. "run": run,
  740. "example": example,
  741. "evaluation_results": {"results": []},
  742. },
  743. feedback_executor=self._evaluation_feedback_executor,
  744. )
  745. return result
  746. async def process_examples():
  747. """Create a single task per example.
  748. That task is to run the target function and all the evaluators
  749. sequentially.
  750. """
  751. async for example in await self.aget_examples():
  752. yield process_example(example)
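# All per-example pipelines have been yielded; finalize the experiment
# metadata (dataset version and splits) on the project.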
  753. await self._aend()
754. # Run the per-example tasks with bounded concurrency:
755. # max_concurrency is the upper limit on how many target + evaluator
756. # pipelines can run in parallel.
  757. experiment_results = aitertools.aiter_with_concurrency(
  758. max_concurrency,
  759. process_examples(),
  760. _eager_consumption_timeout=0.001,
  761. )
  762. r1, r2, r3 = aitertools.atee(experiment_results, 3, lock=asyncio.Lock())
  763. return self._copy(
  764. (result["example"] async for result in r1),
  765. runs=(result["run"] async for result in r2),
  766. evaluation_results=(result["evaluation_results"] async for result in r3),
  767. )
  768. async def awith_predictions(
  769. self,
  770. target: ATARGET_T,
  771. /,
  772. max_concurrency: Optional[int] = None,
  773. ) -> _AsyncExperimentManager:
  774. _experiment_results = self._apredict(
  775. target,
  776. max_concurrency=max_concurrency,
  777. include_attachments=_target_include_attachments(target),
  778. )
  779. r1, r2 = aitertools.atee(_experiment_results, 2, lock=asyncio.Lock())
  780. return self._copy(
  781. (pred["example"] async for pred in r1),
  782. runs=(pred["run"] async for pred in r2),
  783. )
  784. async def awith_evaluators(
  785. self,
  786. evaluators: Sequence[Union[EVALUATOR_T, AEVALUATOR_T]],
  787. *,
  788. max_concurrency: Optional[int] = None,
  789. ) -> _AsyncExperimentManager:
  790. evaluators = _resolve_evaluators(evaluators)
  791. experiment_results = self._ascore(evaluators, max_concurrency=max_concurrency)
  792. r1, r2, r3 = aitertools.atee(experiment_results, 3, lock=asyncio.Lock())
  793. return self._copy(
  794. (result["example"] async for result in r1),
  795. runs=(result["run"] async for result in r2),
  796. evaluation_results=(result["evaluation_results"] async for result in r3),
  797. )
  798. async def awith_summary_evaluators(
  799. self,
  800. summary_evaluators: Sequence[SUMMARY_EVALUATOR_T],
  801. ) -> _AsyncExperimentManager:
  802. wrapped_evaluators = _wrap_summary_evaluators(summary_evaluators)
  803. aggregate_feedback_gen = self._aapply_summary_evaluators(wrapped_evaluators)
  804. return self._copy(
  805. await self.aget_examples(),
  806. runs=self.aget_runs(),
  807. summary_results=aggregate_feedback_gen,
  808. )
  809. async def aget_results(self) -> AsyncIterator[ExperimentResultRow]:
  810. async for run, example, evaluation_results in aitertools.async_zip(
  811. self.aget_runs(), await self.aget_examples(), self.aget_evaluation_results()
  812. ):
  813. yield ExperimentResultRow(
  814. run=run,
  815. example=example,
  816. evaluation_results=evaluation_results,
  817. )
  818. async def aget_summary_scores(self) -> dict[str, list[dict]]:
  819. if self._summary_results is None:
  820. return {"results": []}
  821. return {
  822. "results": [
  823. res # type: ignore[misc]
  824. async for results in self._summary_results
  825. for res in results["results"]
  826. ]
  827. }
  828. ## Private methods
  829. async def _apredict(
  830. self,
  831. target: ATARGET_T,
  832. /,
  833. max_concurrency: Optional[int] = None,
  834. include_attachments: bool = False,
  835. ) -> AsyncIterator[_ForwardResults]:
  836. fn = _ensure_async_traceable(target)
  837. async def predict_all():
  838. async for example in await self.aget_examples():
  839. # Yield the coroutine to be awaited later
  840. yield _aforward(
  841. fn,
  842. self._get_example_with_readers(example),
  843. self.experiment_name,
  844. self._metadata,
  845. self.client,
  846. include_attachments,
  847. self._error_handling,
  848. )
  849. async for result in aitertools.aiter_with_concurrency(
  850. max_concurrency, predict_all(), _eager_consumption_timeout=0.001
  851. ):
  852. yield result
  853. await self._aend()
  854. async def _ascore(
  855. self,
  856. evaluators: Sequence[RunEvaluator],
  857. max_concurrency: Optional[int] = None,
  858. ) -> AsyncIterator[ExperimentResultRow]:
  859. with cf.ThreadPoolExecutor(max_workers=4) as feedback_executor:
  860. async def score_all():
  861. async for current_results in self.aget_results():
  862. # Yield the coroutine to be awaited later in aiter_with_concurrency
  863. yield self._arun_evaluators(
  864. evaluators, current_results, feedback_executor=feedback_executor
  865. )
  866. async for result in aitertools.aiter_with_concurrency(
  867. max_concurrency, score_all(), _eager_consumption_timeout=0.001
  868. ):
  869. yield result
  870. async def _arun_evaluators(
  871. self,
  872. evaluators: Sequence[RunEvaluator],
  873. current_results: ExperimentResultRow,
  874. feedback_executor: cf.ThreadPoolExecutor,
  875. ) -> ExperimentResultRow:
  876. current_context = rh.get_tracing_context()
  877. metadata = {
  878. **(current_context["metadata"] or {}),
  879. **{"experiment": self.experiment_name},
  880. }
  881. with rh.tracing_context(
  882. **{
  883. **current_context,
  884. "project_name": "evaluators",
  885. "metadata": metadata,
  886. "enabled": "local" if not self._upload_results else True,
  887. "client": self.client,
  888. }
  889. ):
  890. run = current_results["run"]
  891. example = current_results["example"]
  892. eval_results = current_results["evaluation_results"]
  893. async def _run_single_evaluator(evaluator: RunEvaluator):
  894. evaluator_run_id = uuid.uuid4()
  895. try:
  896. evaluator_response = await evaluator.aevaluate_run( # type: ignore[call-arg]
  897. run=run,
  898. example=self._get_example_with_readers(example),
  899. evaluator_run_id=evaluator_run_id,
  900. )
  901. selected_results = self.client._select_eval_results(
  902. evaluator_response
  903. )
  904. if self._upload_results:
  905. self.client._log_evaluation_feedback(
  906. evaluator_response, run=run, _executor=feedback_executor
  907. )
  908. return selected_results
  909. except Exception as e:
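# If the evaluator itself fails, try to log error feedback under the
# evaluator's feedback keys so the failure is still visible in the experiment.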
  910. try:
  911. feedback_keys = _extract_feedback_keys(evaluator)
  912. error_response = EvaluationResults(
  913. results=[
  914. EvaluationResult(
  915. key=key,
  916. source_run_id=evaluator_run_id,
  917. comment=repr(e),
  918. extra={"error": True},
  919. )
  920. for key in feedback_keys
  921. ]
  922. )
  923. selected_results = self.client._select_eval_results(
  924. error_response
  925. )
  926. if self._upload_results:
  927. self.client._log_evaluation_feedback(
  928. error_response, run=run, _executor=feedback_executor
  929. )
  930. return selected_results
  931. except Exception as e2:
  932. logger.debug(f"Error parsing feedback keys: {e2}")
  933. pass
  934. logger.error(
  935. f"Error running evaluator {repr(evaluator)} on"
  936. f" run {run.id}: {repr(e)}",
  937. exc_info=True,
  938. )
  939. all_results = []
  940. for evaluator in evaluators:
  941. all_results.append(await _run_single_evaluator(evaluator))
  942. for result in all_results:
  943. if result is not None:
  944. eval_results["results"].extend(result)
  945. return ExperimentResultRow(
  946. run=run,
  947. example=example,
  948. evaluation_results=eval_results,
  949. )
  950. async def _aapply_summary_evaluators(
  951. self, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T]
  952. ) -> AsyncIterator[EvaluationResults]:
  953. runs, examples = [], []
  954. async_examples = aitertools.ensure_async_iterator(await self.aget_examples())
  955. async for run, example in aitertools.async_zip(
  956. self.aget_runs(), async_examples
  957. ):
  958. runs.append(run)
  959. examples.append(example)
  960. aggregate_feedback = []
  961. project_id = self._get_experiment().id if self._upload_results else None
  962. current_context = rh.get_tracing_context()
  963. metadata = {
  964. **(current_context["metadata"] or {}),
  965. **{
  966. "experiment": self.experiment_name,
  967. "experiment_id": project_id,
  968. },
  969. }
  970. with rh.tracing_context(
  971. **{
  972. **current_context,
  973. "project_name": "evaluators",
  974. "metadata": metadata,
  975. "enabled": "local" if not self._upload_results else True,
  976. "client": self.client,
  977. }
  978. ):
  979. for evaluator in summary_evaluators:
  980. try:
  981. summary_eval_result = evaluator(runs, examples)
  982. flattened_results = self.client._select_eval_results(
  983. summary_eval_result,
  984. fn_name=evaluator.__name__,
  985. )
  986. aggregate_feedback.extend(flattened_results)
  987. if self._upload_results:
  988. for result in flattened_results:
  989. feedback = result.dict(exclude={"target_run_id"})
  990. evaluator_info = feedback.pop("evaluator_info", None)
  991. await aitertools.aio_to_thread(
  992. self.client.create_feedback,
  993. **feedback,
  994. run_id=None,
  995. project_id=project_id,
  996. source_info=evaluator_info,
  997. )
  998. except Exception as e:
  999. logger.error(
  1000. f"Error running summary evaluator {repr(evaluator)}: {e}",
  1001. exc_info=True,
  1002. )
  1003. yield {"results": aggregate_feedback}
  1004. async def _get_dataset_version(self) -> Optional[str]:
  1005. modified_at = []
  1006. async for example in await self.aget_examples():
  1007. if example.modified_at:
  1008. # Should always be defined in practice when fetched,
  1009. # but the typing permits None
  1010. modified_at.append(example.modified_at)
  1011. max_modified_at = max(modified_at) if modified_at else None
  1012. return max_modified_at.isoformat() if max_modified_at else None
  1013. async def _get_dataset_splits(self) -> Optional[list[str]]:
  1014. splits = set()
  1015. async for example in await self.aget_examples():
  1016. if (
  1017. example.metadata
  1018. and example.metadata.get("dataset_split")
  1019. and isinstance(example.metadata["dataset_split"], list)
  1020. ):
  1021. for split in example.metadata["dataset_split"]:
  1022. if isinstance(split, str):
  1023. splits.add(split)
  1024. else:
  1025. splits.add("base")
  1026. return list(splits)
  1027. async def _aend(self) -> None:
  1028. if not self._upload_results:
  1029. return
  1030. experiment = self._experiment
  1031. if experiment is None:
  1032. raise ValueError("Experiment not started yet.")
  1033. project_metadata = self._get_experiment_metadata()
  1034. project_metadata["dataset_version"] = await self._get_dataset_version()
  1035. project_metadata["dataset_splits"] = await self._get_dataset_splits()
  1036. self.client.update_project(
  1037. experiment.id,
  1038. metadata={
  1039. **experiment.metadata,
  1040. **project_metadata,
  1041. },
  1042. )
  1043. def _copy(self, *args: Any, **kwargs: Any) -> _AsyncExperimentManager:
  1044. default_args = (self._data,)
  1045. default_kwargs = {
  1046. "experiment": self._experiment,
  1047. "metadata": self._metadata,
  1048. "runs": self._runs,
  1049. "client": self.client,
  1050. "evaluation_results": self._evaluation_results,
  1051. "summary_results": self._summary_results,
  1052. "include_attachments": self._include_attachments,
  1053. "reuse_attachments": self._reuse_attachments,
  1054. "upload_results": self._upload_results,
  1055. "attachment_raw_data_dict": self._attachment_raw_data_dict,
  1056. "error_handling": self._error_handling,
  1057. }
  1058. full_args = list(args) + list(default_args[len(args) :])
  1059. full_kwargs = {**default_kwargs, **kwargs}
  1060. return self.__class__(*full_args, **full_kwargs)
  1061. class AsyncExperimentResults:
  1062. def __init__(
  1063. self,
  1064. experiment_manager: _AsyncExperimentManager,
  1065. ):
  1066. self._manager = experiment_manager
  1067. self._results: list[ExperimentResultRow] = []
  1068. self._lock = asyncio.Lock()
  1069. self._task = asyncio.create_task(self._process_data(self._manager))
  1070. self._processed_count = 0
  1071. @property
  1072. def experiment_name(self) -> str:
  1073. return self._manager.experiment_name
  1074. def __aiter__(self) -> AsyncIterator[ExperimentResultRow]:
  1075. return self
  1076. async def __anext__(self) -> ExperimentResultRow:
  1077. async def _wait_until_index(index: int) -> None:
  1078. while self._processed_count < index:
  1079. await asyncio.sleep(0.05)
  1080. while True:
  1081. async with self._lock:
  1082. if self._processed_count < len(self._results):
  1083. result = self._results[self._processed_count]
  1084. self._processed_count += 1
  1085. return result
  1086. elif self._task.done():
  1087. raise StopAsyncIteration
  1088. await asyncio.shield(
  1089. asyncio.wait_for(_wait_until_index(len(self._results)), timeout=None)
  1090. )
  1091. async def _process_data(self, manager: _AsyncExperimentManager) -> None:
  1092. tqdm = _load_tqdm()
  1093. async for item in tqdm(manager.aget_results()):
  1094. async with self._lock:
  1095. self._results.append(item)
  1096. summary_scores = await manager.aget_summary_scores()
  1097. async with self._lock:
  1098. self._summary_results = summary_scores
  1099. def to_pandas(
  1100. self, start: Optional[int] = 0, end: Optional[int] = None
  1101. ) -> DataFrame:
  1102. return _to_pandas(self._results, start=start, end=end)
  1103. def _repr_html_(self) -> str:
  1104. import importlib.util
  1105. if self._results and importlib.util.find_spec("pandas"):
  1106. df = self.to_pandas(0, 5)
  1107. return df._repr_html_() # type: ignore[operator]
  1108. else:
  1109. return self.__repr__()
  1110. def __len__(self) -> int:
  1111. return len(self._results)
  1112. def __repr__(self) -> str:
  1113. return f"<AsyncExperimentResults {self.experiment_name}>"
  1114. async def wait(self) -> None:
  1115. await self._task
  1116. async def _aforward(
  1117. fn: rh.SupportsLangsmithExtra[[dict], Awaitable],
  1118. example: schemas.Example,
  1119. experiment_name: str,
  1120. metadata: dict,
  1121. client: langsmith.Client,
  1122. include_attachments: bool = False,
  1123. error_handling: Literal["log", "ignore"] = "log",
  1124. ) -> _ForwardResults:
  1125. run: Optional[schemas.RunBase] = None
  1126. def _get_run(r: run_trees.RunTree) -> None:
  1127. nonlocal run
  1128. run = r
  1129. def _set_reference_example_id(r: rt.RunTree) -> None:
  1130. r.reference_example_id = example.id
  1131. langsmith_extra = rh.LangSmithExtra(
  1132. on_end=_get_run,
  1133. project_name=experiment_name,
  1134. metadata={
  1135. **metadata,
  1136. "example_version": (example.modified_at or example.created_at).isoformat(),
  1137. },
  1138. client=client,
  1139. )
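# With "log", the run is linked to the example up front, so errored runs are
# traced as part of the experiment; with "ignore", the reference example is
# attached only on success, so failed runs are left out entirely.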
  1140. if error_handling == "log":
  1141. langsmith_extra["reference_example_id"] = example.id
  1142. elif error_handling == "ignore":
  1143. langsmith_extra["_on_success"] = _set_reference_example_id
  1144. else:
  1145. raise ValueError(f"Unrecognized error_handling value: {error_handling=}")
  1146. with rh.tracing_context(enabled=True):
  1147. try:
  1148. arg_names = _get_target_args(fn)
  1149. args = [getattr(example, argn) for argn in arg_names]
  1150. await fn(*args, langsmith_extra=langsmith_extra)
  1151. except Exception as e:
  1152. logger.error(
  1153. f"Error running target function: {e}", exc_info=True, stacklevel=1
  1154. )
  1155. return _ForwardResults(
  1156. run=cast(schemas.Run, run),
  1157. example=example,
  1158. )
  1159. def _ensure_async_traceable(
  1160. target: ATARGET_T,
  1161. ) -> rh.SupportsLangsmithExtra[[dict], Awaitable]:
  1162. if not asyncio.iscoroutinefunction(target) and not _is_langchain_runnable(target):
  1163. if callable(target):
  1164. raise ValueError(
  1165. "Target must be an async function. For sync functions, use evaluate."
  1166. " Example usage:\n\n"
  1167. "async def predict(inputs: dict) -> dict:\n"
  1168. " # do work, like chain.invoke(inputs)\n"
  1169. " return {...}\n"
  1170. "await aevaluate(predict, ...)"
  1171. )
  1172. else:
  1173. raise ValueError(
  1174. "Target must be a callable async function. "
  1175. "Received a non-callable object. Example usage:\n\n"
  1176. "async def predict(inputs: dict) -> dict:\n"
  1177. " # do work, like chain.invoke(inputs)\n"
  1178. " return {...}\n"
  1179. "await aevaluate(predict, ...)"
  1180. )
  1181. if rh.is_traceable_function(target):
  1182. return target # type: ignore
  1183. else:
  1184. if _is_langchain_runnable(target):
  1185. target = target.ainvoke # type: ignore[union-attr]
  1186. return rh.traceable(name="AsyncTarget")(target) # type: ignore[arg-type]
  1187. def _aresolve_data(
  1188. data: Union[DATA_T, AsyncIterable[schemas.Example]],
  1189. *,
  1190. client: langsmith.Client,
  1191. include_attachments: bool = False,
  1192. ) -> AsyncIterator[schemas.Example]:
  1193. """Return the examples for the given dataset."""
  1194. if isinstance(data, AsyncIterable):
  1195. return aitertools.ensure_async_iterator(data)
  1196. return aitertools.ensure_async_iterator(
  1197. _resolve_data(data, client=client, include_attachments=include_attachments)
  1198. )
  1199. T = TypeVar("T")
  1200. async def async_chain_from_iterable(
  1201. iterable: Iterable[AsyncIterable[T]],
  1202. ) -> AsyncIterator[T]:
  1203. """Chain multiple async iterables."""
  1204. for sub_iterable in iterable:
  1205. async for item in sub_iterable:
  1206. yield item
  1207. async def async_iter_from_list(
  1208. examples: list[schemas.Example],
  1209. ) -> AsyncIterable[schemas.Example]:
  1210. """Convert a list of examples to an async iterable."""
  1211. for example in examples:
  1212. yield example
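# Example of how the two helpers above combine to implement ``num_repetitions``
# (see ``_AsyncExperimentManager.aget_examples``); a sketch assuming
# ``examples_list`` is an already-materialized list of examples:
#
#     repeated = async_chain_from_iterable(
#         [async_iter_from_list(examples_list) for _ in range(num_repetitions)]
#     )
#     # ``repeated`` yields every example ``num_repetitions`` times, in order.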