_evals.py

  1. """Beta utility functions to assist in common eval workflows.
  2. These functions may change in the future.
  3. """
  4. import collections
  5. import datetime
  6. import itertools
  7. import uuid
  8. from collections.abc import Sequence
  9. from typing import Optional, TypeVar
  10. import langsmith.run_trees as rt
  11. import langsmith.schemas as ls_schemas
  12. from langsmith import evaluation as ls_eval
  13. from langsmith._internal._beta_decorator import warn_beta
  14. from langsmith.client import Client
def _convert_ids(run_dict: dict, id_map: dict) -> dict:
    """Convert the IDs in the run dictionary using the provided ID map.

    Parameters:
    - run_dict: The dictionary representing a run.
    - id_map: The dictionary mapping old IDs to new IDs.

    Returns:
    - dict: The updated run dictionary.
    """
    do = run_dict["dotted_order"]
    for k, v in id_map.items():
        do = do.replace(str(k), str(v))
    run_dict["dotted_order"] = do
    if run_dict.get("parent_run_id"):
        run_dict["parent_run_id"] = id_map[run_dict["parent_run_id"]]
    if not run_dict.get("extra"):
        run_dict["extra"] = {}
    return run_dict
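
# For illustration (hypothetical IDs): a dotted_order such as
# "20240101T000000000000Z<old-trace-uuid>.20240101T000001000000Z<old-run-uuid>"
# is rewritten by replacing each old UUID with its newly minted counterpart
# from id_map, leaving the timestamp prefixes untouched.
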
def _convert_root_run(root: ls_schemas.Run, run_to_example_map: dict) -> list[dict]:
    """Convert the root run and its child runs to a list of dictionaries.

    Parameters:
    - root: The root run to convert.
    - run_to_example_map: The dictionary mapping run IDs to example IDs.

    Returns:
    - The list of converted run dictionaries.
    """
    runs_ = [root]
    trace_id = uuid.uuid4()
    id_map = {root.trace_id: trace_id}
    results = []
    while runs_:
        src = runs_.pop()
        src_dict = src.dict(exclude={"parent_run_ids", "child_run_ids", "session_id"})
        id_map[src_dict["id"]] = id_map.get(src_dict["id"], uuid.uuid4())
        src_dict["id"] = id_map[src_dict["id"]]
        src_dict["trace_id"] = id_map[src_dict["trace_id"]]
        if src.child_runs:
            runs_.extend(src.child_runs)
        results.append(src_dict)
    result = [_convert_ids(r, id_map) for r in results]
    result[0]["reference_example_id"] = run_to_example_map[root.id]
    return result
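
# A minimal sketch of the expected shapes (names here are hypothetical):
#   root = client.read_run(run_id, load_child_runs=True)
#   dicts = _convert_root_run(root, {root.id: example_id})
#   assert dicts[0]["reference_example_id"] == example_id  # root comes first
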
@warn_beta
def convert_runs_to_test(
    runs: Sequence[ls_schemas.Run],
    *,
    dataset_name: str,
    test_project_name: Optional[str] = None,
    client: Optional[Client] = None,
    load_child_runs: bool = False,
    include_outputs: bool = False,
) -> ls_schemas.TracerSession:
    """Convert the provided runs into a dataset + test.

    This makes it easy to sample prod runs into a new regression testing
    workflow and compare against a candidate system.

    Internally, this function does the following:

    1. Create a dataset from the provided production run inputs.
    2. Create a new test project.
    3. Clone the production runs and re-upload them against the dataset.

    Parameters:
    - runs: A sequence of production runs to clone as a test.
    - dataset_name: The name of the dataset to associate with the test runs.
    - test_project_name: An optional name for the test project. A unique name
        is generated if not provided.
    - client: An optional LangSmith client instance. If not provided, a new
        client will be created.
    - load_child_runs: Whether to load child runs when copying runs.
    - include_outputs: Whether to copy the run outputs onto the dataset
        examples.

    Returns:
    - The project containing the cloned runs.

    Example:
    --------
    ```python
    import langsmith
    import random

    client = langsmith.Client()

    # Randomly sample 100 runs from a prod project
    runs = list(client.list_runs(project_name="My Project", execution_order=1))
    sampled_runs = random.sample(runs, min(len(runs), 100))
    convert_runs_to_test(sampled_runs, dataset_name="Random Runs")

    # Select runs named "extractor" whose root traces received good feedback
    runs = list(
        client.list_runs(
            project_name="<your_project>",
            filter='eq(name, "extractor")',
            trace_filter='and(eq(feedback_key, "user_score"), eq(feedback_score, 1))',
        )
    )
    convert_runs_to_test(runs, dataset_name="Extraction Good")
    ```
    """
    if not runs:
        raise ValueError(f"Expected a non-empty sequence of runs. Received: {runs}")
    client = client or rt.get_cached_client()
    ds = client.create_dataset(dataset_name=dataset_name)
    outputs = [r.outputs for r in runs] if include_outputs else None
    client.create_examples(
        inputs=[r.inputs for r in runs],
        outputs=outputs,
        source_run_ids=[r.id for r in runs],
        dataset_id=ds.id,
    )
    if not load_child_runs:
        runs_to_copy = runs
    else:
        runs_to_copy = [
            client.read_run(r.id, load_child_runs=load_child_runs) for r in runs
        ]
    test_project_name = test_project_name or f"prod-baseline-{uuid.uuid4().hex[:6]}"
    examples = list(client.list_examples(dataset_name=dataset_name))
    run_to_example_map = {e.source_run_id: e.id for e in examples}
    dataset_version = (
        examples[0].modified_at if examples[0].modified_at else examples[0].created_at
    )
    to_create = [
        run_dict
        for root_run in runs_to_copy
        for run_dict in _convert_root_run(root_run, run_to_example_map)
    ]
    project = client.create_project(
        project_name=test_project_name,
        reference_dataset_id=ds.id,
        metadata={
            "which": "prod-baseline",
            "dataset_version": dataset_version.isoformat(),
        },
    )
    for new_run in to_create:
        # Shift each cloned run to "now" while preserving its original latency.
        latency = new_run["end_time"] - new_run["start_time"]
        new_run["start_time"] = datetime.datetime.now(tz=datetime.timezone.utc)
        new_run["end_time"] = new_run["start_time"] + latency
        client.create_run(**new_run, project_name=test_project_name)
    # Mark the cloned test project as complete.
    _ = client.update_project(
        project.id,
        end_time=datetime.datetime.now(tz=datetime.timezone.utc),
    )
    return project

def _load_nested_traces(project_name: str, client: Client) -> list[ls_schemas.Run]:
    """Load all runs for a project and reassemble them into nested traces."""
    runs = client.list_runs(project_name=project_name)
    treemap: collections.defaultdict[uuid.UUID, list[ls_schemas.Run]] = (
        collections.defaultdict(list)
    )
    results = []
    all_runs = {}
    for run in runs:
        if run.parent_run_id is not None:
            treemap[run.parent_run_id].append(run)
        else:
            results.append(run)
        all_runs[run.id] = run
    for run_id, child_runs in treemap.items():
        all_runs[run_id].child_runs = sorted(child_runs, key=lambda r: r.dotted_order)
    return results
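
# Sketch of the reconstruction (hypothetical trace): for runs A (root) plus
# B and C with parent_run_id == A.id, treemap maps A.id -> [B, C]; the loop
# then attaches them as A.child_runs (sorted by dotted_order), and only the
# root run A is returned.
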
  161. T = TypeVar("T")
  162. U = TypeVar("U")
  163. def _outer_product(list1: list[T], list2: list[U]) -> list[tuple[T, U]]:
  164. return list(itertools.product(list1, list2))
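
# e.g. _outer_product([r1, r2], [ev]) == [(r1, ev), (r2, ev)]; every
# (trace, evaluator) pairing below is evaluated exactly once.
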
@warn_beta
def compute_test_metrics(
    project_name: str,
    *,
    evaluators: list,
    max_concurrency: Optional[int] = 10,
    client: Optional[Client] = None,
) -> None:
    """Compute test metrics for a test project using a list of evaluators.

    Args:
        project_name (str): The name of the test project to evaluate.
        evaluators (list): A list of evaluators to compute metrics with.
        max_concurrency (Optional[int], optional): The maximum number of
            concurrent evaluations. Defaults to 10.
        client (Optional[Client], optional): The client to use for evaluations.
            Defaults to None.

    Returns:
        None: This function does not return any value.
    """
    from langsmith import ContextThreadPoolExecutor

    # Normalize evaluators: accept RunEvaluator instances as-is and wrap bare
    # callables with run_evaluator.
    evaluators_: list[ls_eval.RunEvaluator] = []
    for func in evaluators:
        if isinstance(func, ls_eval.RunEvaluator):
            evaluators_.append(func)
        elif callable(func):
            evaluators_.append(ls_eval.run_evaluator(func))
        else:
            raise NotImplementedError(
                f"Evaluation not yet implemented for evaluator of type {type(func)}"
            )
    client = client or rt.get_cached_client()
    traces = _load_nested_traces(project_name, client)
    with ContextThreadPoolExecutor(max_workers=max_concurrency) as executor:
        results = executor.map(
            client.evaluate_run, *zip(*_outer_product(traces, evaluators_))
        )
        # Drain the lazy map so any evaluator exceptions are raised here.
        for _ in results:
            pass
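
# A minimal usage sketch (the project name and evaluator are hypothetical,
# not part of this module). A plain function works because
# compute_test_metrics wraps callables with ls_eval.run_evaluator:
#
#   def exact_match(run, example):
#       # Score 1 when the cloned run's outputs match the dataset example.
#       return {"key": "exact_match", "score": int(run.outputs == example.outputs)}
#
#   compute_test_metrics("prod-baseline-abc123", evaluators=[exact_match])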