run_trees.py 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139
  1. """Schemas for the LangSmith API."""
  2. from __future__ import annotations
  3. import functools
  4. import json
  5. import logging
  6. import sys
  7. from collections.abc import Mapping, Sequence
  8. from datetime import datetime, timezone
  9. from typing import Any, Optional, Union, cast
  10. from uuid import NAMESPACE_DNS, UUID, uuid5
  11. from typing_extensions import TypedDict
  12. from langsmith._internal._uuid import uuid7
  13. from langsmith.uuid import uuid7_from_datetime
  14. try:
  15. from pydantic.v1 import Field, root_validator # type: ignore[import]
  16. except ImportError:
  17. from pydantic import ( # type: ignore[assignment, no-redef]
  18. Field,
  19. root_validator,
  20. )
  21. import contextvars
  22. import threading
  23. import urllib.parse
  24. import langsmith._internal._context as _context
  25. from langsmith import schemas as ls_schemas
  26. from langsmith import utils
  27. from langsmith.client import ID_TYPE, RUN_TYPE_T, Client, _dumps_json, _ensure_uuid
  28. logger = logging.getLogger(__name__)
class WriteReplica(TypedDict, total=False):
    """Destination describing where to mirror writes of a run.

    Used by `RunTree.post`/`RunTree.patch` to write a remapped copy of the
    run to an additional project and/or endpoint.
    """

    # Target LangSmith API URL; when None the client's default endpoint is used.
    api_url: Optional[str]
    # API key for the replica endpoint; when None the client's default is used.
    api_key: Optional[str]
    # Project to write the replicated run into; falls back to the run's
    # session_name when missing (see post()/patch()).
    project_name: Optional[str]
    # Extra fields merged into the remapped run dict before writing
    # (see _remap_for_project; may include a "reroot" flag).
    updates: Optional[dict]
  34. LANGSMITH_PREFIX = "langsmith-"
  35. LANGSMITH_DOTTED_ORDER = sys.intern(f"{LANGSMITH_PREFIX}trace")
  36. LANGSMITH_DOTTED_ORDER_BYTES = LANGSMITH_DOTTED_ORDER.encode("utf-8")
  37. LANGSMITH_METADATA = sys.intern(f"{LANGSMITH_PREFIX}metadata")
  38. LANGSMITH_TAGS = sys.intern(f"{LANGSMITH_PREFIX}tags")
  39. LANGSMITH_PROJECT = sys.intern(f"{LANGSMITH_PREFIX}project")
  40. LANGSMITH_REPLICAS = sys.intern(f"{LANGSMITH_PREFIX}replicas")
  41. OVERRIDE_OUTPUTS = sys.intern("__omit_auto_outputs")
  42. NOT_PROVIDED = cast(None, object())
  43. _LOCK = threading.Lock()
  44. # Context variables
  45. _REPLICAS = contextvars.ContextVar[Optional[Sequence[WriteReplica]]](
  46. "_REPLICAS", default=None
  47. )
  48. _DISTRIBUTED_PARENT_ID = contextvars.ContextVar[Optional[str]](
  49. "_DISTRIBUTED_PARENT_ID", default=None
  50. )
  51. _SENTINEL = cast(None, object())
  52. TIMESTAMP_LENGTH = 36
# Note, this is called directly by langchain. Do not remove.
def get_cached_client(**init_kwargs: Any) -> Client:
    """Return the shared module-level Client, creating it on first call.

    Args:
        init_kwargs: Forwarded to the Client constructor, but only on the
            call that actually creates the client; ignored afterwards.

    Returns:
        The cached Client instance.
    """
    global _CLIENT
    # Double-checked locking: the unlocked fast path avoids contention once
    # the client exists; the re-check under _LOCK avoids duplicate creation.
    if _CLIENT is None:
        with _LOCK:
            if _CLIENT is None:
                _CLIENT = Client(**init_kwargs)
    return _CLIENT
def configure(
    client: Optional[Client] = _SENTINEL,
    enabled: Optional[bool] = _SENTINEL,
    project_name: Optional[str] = _SENTINEL,
    tags: Optional[list[str]] = _SENTINEL,
    metadata: Optional[dict[str, Any]] = _SENTINEL,
):
    """Configure global LangSmith tracing context.

    This function allows you to set global configuration options for LangSmith
    tracing that will be applied to all subsequent traced operations. It modifies
    context variables that control tracing behavior across your application.

    Do this once at startup to configure the global settings in code.
    If, instead, you wish to only configure tracing for a single invocation,
    use the `tracing_context` context manager instead.

    Args:
        client: A LangSmith Client instance to use for all tracing operations.
            If provided, this client will be used instead of creating new clients.
            Pass `None` to explicitly clear the global client.
        enabled: Whether tracing is enabled.
            Can be:

            - `True`: Enable tracing and send data to LangSmith
            - `False`: Disable tracing completely
            - `'local'`: Enable tracing but only store data locally
            - `None`: Clear the setting (falls back to environment variables)
        project_name: The LangSmith project name where traces will be sent.
            This determines which project dashboard will display your traces.
            Pass `None` to explicitly clear the project name.
        tags: A list of tags to be applied to all traced runs.
            Tags are useful for filtering and organizing runs in the LangSmith UI.
            Pass `None` to explicitly clear all global tags.
        metadata: A dictionary of metadata to attach to all traced runs.
            Metadata can store any additional context about your runs.
            Pass `None` to explicitly clear all global metadata.

    Examples:
        Basic configuration:

        >>> import langsmith as ls
        >>> # Enable tracing with a specific project
        >>> ls.configure(enabled=True, project_name="my-project")

        Set global trace masking:

        >>> def hide_keys(data):
        ...     if not data:
        ...         return {}
        ...     return {k: v for k, v in data.items() if k not in ["key1", "key2"]}
        >>> ls.configure(
        ...     client=ls.Client(
        ...         hide_inputs=hide_keys,
        ...         hide_outputs=hide_keys,
        ...     )
        ... )

        Adding global tags and metadata:

        >>> ls.configure(
        ...     tags=["production", "v1.0"],
        ...     metadata={"environment": "prod", "version": "1.0.0"},
        ... )

        Disabling tracing:

        >>> ls.configure(enabled=False)
    """
    global _CLIENT
    # _SENTINEL means "argument omitted"; an explicit None clears the setting.
    with _LOCK:
        if client is not _SENTINEL:
            _CLIENT = client
        # Each setting is written both to the context variable and to the
        # module-level _GLOBAL_* fallback on the context module.
        if enabled is not _SENTINEL:
            _context._TRACING_ENABLED.set(enabled)
            _context._GLOBAL_TRACING_ENABLED = enabled
        if project_name is not _SENTINEL:
            _context._PROJECT_NAME.set(project_name)
            _context._GLOBAL_PROJECT_NAME = project_name
        if tags is not _SENTINEL:
            _context._TAGS.set(tags)
            _context._GLOBAL_TAGS = tags
        if metadata is not _SENTINEL:
            _context._METADATA.set(metadata)
            _context._GLOBAL_METADATA = metadata
  134. def validate_extracted_usage_metadata(
  135. data: ls_schemas.ExtractedUsageMetadata,
  136. ) -> ls_schemas.ExtractedUsageMetadata:
  137. """Validate that the dict only contains allowed keys."""
  138. allowed_keys = {
  139. "input_tokens",
  140. "output_tokens",
  141. "total_tokens",
  142. "input_token_details",
  143. "output_token_details",
  144. "input_cost",
  145. "output_cost",
  146. "total_cost",
  147. "input_cost_details",
  148. "output_cost_details",
  149. }
  150. extra_keys = set(data.keys()) - allowed_keys
  151. if extra_keys:
  152. raise ValueError(f"Unexpected keys in usage metadata: {extra_keys}")
  153. return data # type: ignore
class RunTree(ls_schemas.RunBase):
    """Run Schema with back-references for posting runs."""

    name: str
    id: UUID = Field(default_factory=uuid7)
    run_type: str = Field(default="chain")
    start_time: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    # Note: no longer set.
    parent_run: Optional[RunTree] = Field(default=None, exclude=True)
    # Parent's dotted_order, captured by infer_defaults; used to build this
    # run's own dotted_order in ensure_dotted_order.
    parent_dotted_order: Optional[str] = Field(default=None, exclude=True)
    child_runs: list[RunTree] = Field(
        default_factory=list,
        exclude={"__all__": {"parent_run_id"}},
    )
    # "session" is the API-side term for a project, hence the aliases below.
    session_name: str = Field(
        default_factory=lambda: utils.get_tracer_project() or "default",
        alias="project_name",
    )
    session_id: Optional[UUID] = Field(default=None, alias="project_id")
    extra: dict = Field(default_factory=dict)
    tags: Optional[list[str]] = Field(default_factory=list)
    events: list[dict] = Field(default_factory=list)
    """List of events associated with the run, like
    start and end events."""
    # Underlying Client; excluded from serialization. Access via the `client`
    # property, which lazily creates the shared client on first use.
    ls_client: Optional[Any] = Field(default=None, exclude=True)
    dotted_order: str = Field(
        default="", description="The order of the run in the tree."
    )
    trace_id: UUID = Field(default="", description="The trace id of the run.")  # type: ignore
    dangerously_allow_filesystem: Optional[bool] = Field(
        default=False, description="Whether to allow filesystem access for attachments."
    )
    replicas: Optional[Sequence[WriteReplica]] = Field(
        default=None,
        description="Projects to replicate this run to with optional updates.",
    )

    class Config:
        """Pydantic model configuration."""

        # ls_client and similar fields hold non-pydantic objects.
        arbitrary_types_allowed = True
        # Allow construction with either field names (session_name) or
        # aliases (project_name).
        allow_population_by_field_name = True
        extra = "ignore"
    @root_validator(pre=True)
    def infer_defaults(cls, values: dict) -> dict:
        """Assign name to the run.

        Also normalizes client aliases, derives id/trace_id, captures parent
        linkage, and fills empty containers before field validation runs.
        """
        # Derive a name from the serialized payload when none was given.
        if values.get("name") is None and values.get("serialized") is not None:
            if "name" in values["serialized"]:
                values["name"] = values["serialized"]["name"]
            elif "id" in values["serialized"]:
                values["name"] = values["serialized"]["id"][-1]
        if values.get("name") is None:
            values["name"] = "Unnamed"
        if "client" in values:  # Handle user-constructed clients
            values["ls_client"] = values.pop("client")
        elif "_client" in values:
            values["ls_client"] = values.pop("_client")
        # Normalize falsy clients to None so the lazy `client` property kicks in.
        if not values.get("ls_client"):
            values["ls_client"] = None
        # The parent RunTree itself is not stored; only its id and
        # dotted_order are carried over.
        parent_run = values.pop("parent_run", None)
        if parent_run is not None:
            values["parent_run_id"] = parent_run.id
            values["parent_dotted_order"] = parent_run.dotted_order
        if "id" not in values:
            # Generate UUID from start_time if available
            if "start_time" in values and values["start_time"] is not None:
                values["id"] = uuid7_from_datetime(values["start_time"])
            else:
                values["id"] = uuid7()
        if "trace_id" not in values:
            # Children inherit the parent's trace; roots are their own trace.
            if parent_run is not None:
                values["trace_id"] = parent_run.trace_id
            else:
                values["trace_id"] = values["id"]
        cast(dict, values.setdefault("extra", {}))
        # Normalize explicit None values to empty containers.
        if values.get("events") is None:
            values["events"] = []
        if values.get("tags") is None:
            values["tags"] = []
        if values.get("outputs") is None:
            values["outputs"] = {}
        if values.get("attachments") is None:
            values["attachments"] = {}
        # Fall back to context-scoped replicas, then normalize their shape.
        if values.get("replicas") is None:
            values["replicas"] = _REPLICAS.get()
        values["replicas"] = _ensure_write_replicas(values["replicas"])
        return values
  238. @root_validator(pre=False)
  239. def ensure_dotted_order(cls, values: dict) -> dict:
  240. """Ensure the dotted order of the run."""
  241. current_dotted_order = values.get("dotted_order")
  242. if current_dotted_order and current_dotted_order.strip():
  243. return values
  244. current_dotted_order = _create_current_dotted_order(
  245. values["start_time"], values["id"]
  246. )
  247. parent_dotted_order = values.get("parent_dotted_order")
  248. if parent_dotted_order is not None:
  249. values["dotted_order"] = parent_dotted_order + "." + current_dotted_order
  250. else:
  251. values["dotted_order"] = current_dotted_order
  252. return values
  253. @property
  254. def client(self) -> Client:
  255. """Return the client."""
  256. # Lazily load the client
  257. # If you never use this for API calls, it will never be loaded
  258. if self.ls_client is None:
  259. self.ls_client = get_cached_client()
  260. return self.ls_client
  261. @property
  262. def _client(self) -> Optional[Client]:
  263. # For backwards compat
  264. return self.ls_client
  265. def __setattr__(self, name, value):
  266. """Set the `_client` specially."""
  267. # For backwards compat
  268. if name == "_client":
  269. self.ls_client = value
  270. else:
  271. return super().__setattr__(name, value)
  272. def set(
  273. self,
  274. *,
  275. inputs: Optional[Mapping[str, Any]] = NOT_PROVIDED,
  276. outputs: Optional[Mapping[str, Any]] = NOT_PROVIDED,
  277. tags: Optional[Sequence[str]] = NOT_PROVIDED,
  278. metadata: Optional[Mapping[str, Any]] = NOT_PROVIDED,
  279. usage_metadata: Optional[ls_schemas.ExtractedUsageMetadata] = NOT_PROVIDED,
  280. ) -> None:
  281. """Set the inputs, outputs, tags, and metadata of the run.
  282. If performed, this will override the default behavior of the
  283. end() method to ignore new outputs (that would otherwise be added)
  284. by the @traceable decorator.
  285. If your LangChain or LangGraph versions are sufficiently up-to-date,
  286. this will also override the default behavior of `LangChainTracer`.
  287. Args:
  288. inputs: The inputs to set.
  289. outputs: The outputs to set.
  290. tags: The tags to set.
  291. metadata: The metadata to set.
  292. usage_metadata: Usage information to set.
  293. Returns:
  294. None
  295. """
  296. if tags is not NOT_PROVIDED:
  297. self.tags = list(tags)
  298. if metadata is not NOT_PROVIDED:
  299. self.extra.setdefault("metadata", {}).update(metadata or {})
  300. if inputs is not NOT_PROVIDED:
  301. # Used by LangChain core to determine whether to
  302. # re-upload the inputs upon run completion
  303. self.extra["inputs_is_truthy"] = False
  304. if inputs is None:
  305. self.inputs = {}
  306. else:
  307. self.inputs = dict(inputs)
  308. if outputs is not NOT_PROVIDED:
  309. self.extra[OVERRIDE_OUTPUTS] = True
  310. if outputs is None:
  311. self.outputs = {}
  312. else:
  313. self.outputs = dict(outputs)
  314. if usage_metadata is not NOT_PROVIDED:
  315. self.extra.setdefault("metadata", {})["usage_metadata"] = (
  316. validate_extracted_usage_metadata(usage_metadata)
  317. )
  318. def add_tags(self, tags: Union[Sequence[str], str]) -> None:
  319. """Add tags to the run."""
  320. if isinstance(tags, str):
  321. tags = [tags]
  322. if self.tags is None:
  323. self.tags = []
  324. self.tags.extend(tags)
  325. def add_metadata(self, metadata: dict[str, Any]) -> None:
  326. """Add metadata to the run."""
  327. if self.extra is None:
  328. self.extra = {}
  329. metadata_: dict = cast(dict, self.extra).setdefault("metadata", {})
  330. metadata_.update(metadata)
  331. def add_outputs(self, outputs: dict[str, Any]) -> None:
  332. """Upsert the given outputs into the run.
  333. Args:
  334. outputs: A dictionary containing the outputs to be added.
  335. """
  336. if self.outputs is None:
  337. self.outputs = {}
  338. self.outputs.update(outputs)
  339. def add_inputs(self, inputs: dict[str, Any]) -> None:
  340. """Upsert the given inputs into the run.
  341. Args:
  342. inputs: A dictionary containing the inputs to be added.
  343. """
  344. if self.inputs is None:
  345. self.inputs = {}
  346. self.inputs.update(inputs)
  347. # Set to False so LangChain things it needs to
  348. # re-upload inputs
  349. self.extra["inputs_is_truthy"] = False
  350. def add_event(
  351. self,
  352. events: Union[
  353. ls_schemas.RunEvent,
  354. Sequence[ls_schemas.RunEvent],
  355. Sequence[dict],
  356. dict,
  357. str,
  358. ],
  359. ) -> None:
  360. """Add an event to the list of events.
  361. Args:
  362. events: The event(s) to be added. It can be a single event, a sequence
  363. of events, a sequence of dictionaries, a dictionary, or a string.
  364. Returns:
  365. None
  366. """
  367. if self.events is None:
  368. self.events = []
  369. if isinstance(events, dict):
  370. self.events.append(events) # type: ignore[arg-type]
  371. elif isinstance(events, str):
  372. self.events.append(
  373. {
  374. "name": "event",
  375. "time": datetime.now(timezone.utc).isoformat(),
  376. "message": events,
  377. }
  378. )
  379. else:
  380. self.events.extend(events) # type: ignore[arg-type]
  381. def end(
  382. self,
  383. *,
  384. outputs: Optional[dict] = None,
  385. error: Optional[str] = None,
  386. end_time: Optional[datetime] = None,
  387. events: Optional[Sequence[ls_schemas.RunEvent]] = None,
  388. metadata: Optional[dict[str, Any]] = None,
  389. ) -> None:
  390. """Set the end time of the run and all child runs."""
  391. self.end_time = end_time or datetime.now(timezone.utc)
  392. # We've already 'set' the outputs, so ignore
  393. # the ones that are automatically included
  394. if not self.extra.get(OVERRIDE_OUTPUTS):
  395. if outputs is not None:
  396. if not self.outputs:
  397. self.outputs = outputs
  398. else:
  399. self.outputs.update(outputs)
  400. if error is not None:
  401. self.error = error
  402. if events is not None:
  403. self.add_event(events)
  404. if metadata is not None:
  405. self.add_metadata(metadata)
    def create_child(
        self,
        name: str,
        run_type: RUN_TYPE_T = "chain",
        *,
        run_id: Optional[ID_TYPE] = None,
        serialized: Optional[dict] = None,
        inputs: Optional[dict] = None,
        outputs: Optional[dict] = None,
        error: Optional[str] = None,
        reference_example_id: Optional[UUID] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        tags: Optional[list[str]] = None,
        extra: Optional[dict] = None,
        attachments: Optional[ls_schemas.Attachments] = None,
    ) -> RunTree:
        """Add a child run to the run tree.

        The child inherits this run's project, replicas, client, and
        filesystem-attachment policy. Passing `parent_run=self` lets the
        validators set parent_run_id, trace_id, and dotted_order.
        """
        serialized_ = serialized or {"name": name}
        run = RunTree(
            name=name,
            # NOTE(review): presumably _ensure_uuid generates a fresh id when
            # run_id is None -- verify against its definition.
            id=_ensure_uuid(run_id),
            serialized=serialized_,
            inputs=inputs or {},
            outputs=outputs or {},
            error=error,
            run_type=run_type,
            reference_example_id=reference_example_id,
            start_time=start_time or datetime.now(timezone.utc),
            end_time=end_time,
            extra=extra or {},
            parent_run=self,
            project_name=self.session_name,
            replicas=self.replicas,
            ls_client=self.ls_client,
            tags=tags,
            attachments=attachments or {},  # type: ignore
            dangerously_allow_filesystem=self.dangerously_allow_filesystem,
        )
        return run
  446. def _get_dicts_safe(self):
  447. # Things like generators cannot be copied
  448. self_dict = self.dict(
  449. exclude={"child_runs", "inputs", "outputs"}, exclude_none=True
  450. )
  451. if self.inputs is not None:
  452. # shallow copy. deep copying will occur in the client
  453. self_dict["inputs"] = self.inputs.copy()
  454. if self.outputs is not None:
  455. # shallow copy; deep copying will occur in the client
  456. self_dict["outputs"] = self.outputs.copy()
  457. return self_dict
    def _slice_parent_id(self, parent_id: str, run_dict: dict) -> None:
        """Slice the parent id from dotted order.

        Additionally check if the current run is a child of the parent. If so, update
        the parent_run_id to None, and set the trace id to the new root id after
        parent_id.

        Mutates `run_dict` in place; returns None.
        """
        if dotted_order := run_dict.get("dotted_order"):
            segs = dotted_order.split(".")
            start_idx = None
            parent_id = str(parent_id)
            # Each segment ends with a UUID string (last TIMESTAMP_LENGTH chars).
            # TODO(angus): potentially use binary search to find the index
            for idx, part in enumerate(segs):
                seg_id = part[-TIMESTAMP_LENGTH:]
                if str(seg_id) == parent_id:
                    start_idx = idx
                    break
            if start_idx is not None:
                # Trim segments to start after parent_id (exclusive)
                trimmed_segs = segs[start_idx + 1 :]
                # Rebuild dotted_order
                run_dict["dotted_order"] = ".".join(trimmed_segs)
                # The first remaining segment's UUID becomes the new trace root;
                # if nothing remains, this run is the root of its own trace.
                if trimmed_segs:
                    run_dict["trace_id"] = UUID(trimmed_segs[0][-TIMESTAMP_LENGTH:])
                else:
                    run_dict["trace_id"] = run_dict["id"]
        if str(run_dict.get("parent_run_id")) == parent_id:
            # We've found the new root node.
            run_dict.pop("parent_run_id", None)
    def _remap_for_project(
        self, project_name: str, updates: Optional[dict] = None
    ) -> dict:
        """Rewrites ids/dotted_order for a given project with optional updates.

        Replicated runs need distinct but deterministic ids per project, so
        every id is re-derived with uuid5 from "<old_id>:<project_name>".
        """
        run_dict = self._get_dicts_safe()
        # Writing to the run's own project: no remapping needed.
        if project_name == self.session_name:
            return run_dict
        # Optionally re-root the trace at the distributed-parent boundary
        # recorded by from_headers().
        if updates and updates.get("reroot", False):
            distributed_parent_id = _DISTRIBUTED_PARENT_ID.get()
            if distributed_parent_id:
                self._slice_parent_id(distributed_parent_id, run_dict)
        old_id = run_dict["id"]
        new_id = uuid5(NAMESPACE_DNS, f"{old_id}:{project_name}")
        # trace id
        old_trace = run_dict.get("trace_id")
        if old_trace:
            new_trace = uuid5(NAMESPACE_DNS, f"{old_trace}:{project_name}")
        else:
            new_trace = None
        # parent id
        parent = run_dict.get("parent_run_id")
        if parent:
            new_parent = uuid5(NAMESPACE_DNS, f"{parent}:{project_name}")
        else:
            new_parent = None
        # dotted order: rewrite the trailing UUID of every segment, keeping
        # each segment's prefix intact.
        if run_dict.get("dotted_order"):
            segs = run_dict["dotted_order"].split(".")
            rebuilt = []
            for part in segs[:-1]:
                repl = uuid5(
                    NAMESPACE_DNS, f"{part[-TIMESTAMP_LENGTH:]}:{project_name}"
                )
                rebuilt.append(part[:-TIMESTAMP_LENGTH] + str(repl))
            # The final segment is this run itself; reuse new_id for it.
            rebuilt.append(segs[-1][:-TIMESTAMP_LENGTH] + str(new_id))
            dotted = ".".join(rebuilt)
        else:
            dotted = None
        dup = utils.deepish_copy(run_dict)
        dup.update(
            {
                "id": new_id,
                "trace_id": new_trace,
                "parent_run_id": new_parent,
                "dotted_order": dotted,
                "session_name": project_name,
            }
        )
        # Caller-supplied overrides win over the remapped fields.
        if updates:
            dup.update(updates)
        return dup
    def post(self, exclude_child_runs: bool = True) -> None:
        """Post the run tree to the API asynchronously.

        Args:
            exclude_child_runs: When False, recursively posts child runs too.
        """
        if self.replicas:
            # Write one remapped copy of the run per replica destination.
            for replica in self.replicas:
                project_name = replica.get("project_name") or self.session_name
                updates = replica.get("updates")
                run_dict = self._remap_for_project(project_name, updates)
                self.client.create_run(
                    **run_dict,
                    api_key=replica.get("api_key"),
                    api_url=replica.get("api_url"),
                )
        else:
            kwargs = self._get_dicts_safe()
            self.client.create_run(**kwargs)
        if self.attachments:
            # Record which attachment names were sent with this POST so that
            # patch() can skip re-uploading them.
            keys = [str(name) for name in self.attachments]
            self.events.append(
                {
                    "name": "uploaded_attachment",
                    "time": datetime.now(timezone.utc).isoformat(),
                    "message": set(keys),
                }
            )
        if not exclude_child_runs:
            for child_run in self.child_runs:
                child_run.post(exclude_child_runs=False)
    def patch(self, *, exclude_inputs: bool = False) -> None:
        """Patch the run tree to the API in a background thread.

        Args:
            exclude_inputs: Whether to exclude inputs from the patch request.
        """
        # Make sure the run is marked finished before updating.
        if not self.end_time:
            self.end()
        # Only tuple-valued attachments are treated as uploadable payloads here.
        attachments = {
            a: v for a, v in self.attachments.items() if isinstance(v, tuple)
        }
        try:
            # Avoid loading the same attachment twice: post() records the names
            # it already uploaded in an "uploaded_attachment" event.
            if attachments:
                uploaded = next(
                    (
                        ev
                        for ev in self.events
                        if ev.get("name") == "uploaded_attachment"
                    ),
                    None,
                )
                if uploaded:
                    attachments = {
                        a: v
                        for a, v in attachments.items()
                        if a not in uploaded["message"]
                    }
        except Exception as e:
            # Best-effort filtering; on failure, fall through with all attachments.
            logger.warning(f"Error filtering attachments to upload: {e}")
        if self.replicas:
            # One update per replica destination, with ids remapped per project.
            for replica in self.replicas:
                project_name = replica.get("project_name") or self.session_name
                updates = replica.get("updates")
                run_dict = self._remap_for_project(project_name, updates)
                self.client.update_run(
                    name=run_dict["name"],
                    run_id=run_dict["id"],
                    run_type=run_dict.get("run_type"),
                    start_time=run_dict.get("start_time"),
                    inputs=None if exclude_inputs else run_dict["inputs"],
                    outputs=run_dict["outputs"],
                    error=run_dict.get("error"),
                    parent_run_id=run_dict.get("parent_run_id"),
                    session_name=run_dict.get("session_name"),
                    reference_example_id=run_dict.get("reference_example_id"),
                    end_time=run_dict.get("end_time"),
                    dotted_order=run_dict.get("dotted_order"),
                    trace_id=run_dict.get("trace_id"),
                    events=run_dict.get("events"),
                    tags=run_dict.get("tags"),
                    extra=run_dict.get("extra"),
                    attachments=attachments,
                    api_key=replica.get("api_key"),
                    api_url=replica.get("api_url"),
                )
        else:
            self.client.update_run(
                name=self.name,
                run_id=self.id,
                run_type=cast(RUN_TYPE_T, self.run_type),
                start_time=self.start_time,
                inputs=(
                    None
                    if exclude_inputs
                    else (self.inputs.copy() if self.inputs else None)
                ),
                outputs=self.outputs.copy() if self.outputs else None,
                error=self.error,
                parent_run_id=self.parent_run_id,
                session_name=self.session_name,
                reference_example_id=self.reference_example_id,
                end_time=self.end_time,
                dotted_order=self.dotted_order,
                trace_id=self.trace_id,
                events=self.events,
                tags=self.tags,
                extra=self.extra,
                attachments=attachments,
            )
  643. def wait(self) -> None:
  644. """Wait for all `_futures` to complete."""
  645. pass
  646. def get_url(self) -> str:
  647. """Return the URL of the run."""
  648. return self.client.get_run_url(run=self)
  649. @classmethod
  650. def from_dotted_order(
  651. cls,
  652. dotted_order: str,
  653. **kwargs: Any,
  654. ) -> RunTree:
  655. """Create a new 'child' span from the provided dotted order.
  656. Returns:
  657. RunTree: The new span.
  658. """
  659. headers = {
  660. LANGSMITH_DOTTED_ORDER: dotted_order,
  661. }
  662. return cast(RunTree, cls.from_headers(headers, **kwargs)) # type: ignore[arg-type]
  663. @classmethod
  664. def from_runnable_config(
  665. cls,
  666. config: Optional[dict],
  667. **kwargs: Any,
  668. ) -> Optional[RunTree]:
  669. """Create a new 'child' span from the provided runnable config.
  670. Requires `langchain` to be installed.
  671. Returns:
  672. The new span or `None` if no parent span information is found.
  673. """
  674. try:
  675. from langchain_core.callbacks.manager import (
  676. AsyncCallbackManager,
  677. CallbackManager,
  678. )
  679. from langchain_core.runnables import RunnableConfig, ensure_config
  680. from langchain_core.tracers.langchain import LangChainTracer
  681. except ImportError as e:
  682. raise ImportError(
  683. "RunTree.from_runnable_config requires langchain-core to be installed. "
  684. "You can install it with `pip install langchain-core`."
  685. ) from e
  686. if config is None:
  687. config_ = ensure_config(
  688. cast(RunnableConfig, config) if isinstance(config, dict) else None
  689. )
  690. else:
  691. config_ = cast(RunnableConfig, config)
  692. if (
  693. (cb := config_.get("callbacks"))
  694. and isinstance(cb, (CallbackManager, AsyncCallbackManager))
  695. and cb.parent_run_id
  696. and (
  697. tracer := next(
  698. (t for t in cb.handlers if isinstance(t, LangChainTracer)),
  699. None,
  700. )
  701. )
  702. ):
  703. if (run := tracer.run_map.get(str(cb.parent_run_id))) and run.dotted_order:
  704. dotted_order = run.dotted_order
  705. kwargs["run_type"] = run.run_type
  706. kwargs["inputs"] = run.inputs
  707. kwargs["outputs"] = run.outputs
  708. kwargs["start_time"] = run.start_time
  709. kwargs["end_time"] = run.end_time
  710. kwargs["tags"] = sorted(set(run.tags or [] + kwargs.get("tags", [])))
  711. kwargs["name"] = run.name
  712. extra_ = kwargs.setdefault("extra", {})
  713. metadata_ = extra_.setdefault("metadata", {})
  714. metadata_.update(run.metadata)
  715. elif hasattr(tracer, "order_map") and cb.parent_run_id in tracer.order_map:
  716. dotted_order = tracer.order_map[cb.parent_run_id][1]
  717. else:
  718. return None
  719. kwargs["client"] = tracer.client
  720. kwargs["project_name"] = tracer.project_name
  721. return RunTree.from_dotted_order(dotted_order, **kwargs)
  722. return None
    @classmethod
    def from_headers(
        cls, headers: Mapping[Union[str, bytes], Union[str, bytes]], **kwargs: Any
    ) -> Optional[RunTree]:
        """Create a new 'parent' span from the provided headers.

        Extracts parent span information from the headers and creates a new span.
        Metadata and tags are extracted from the baggage header.
        The dotted order and trace id are extracted from the trace header.

        Returns:
            The new span or `None` if no parent span information is found.
        """
        init_args = kwargs.copy()
        langsmith_trace = cast(Optional[str], headers.get(LANGSMITH_DOTTED_ORDER))
        if not langsmith_trace:
            # Some header mappings use byte keys/values; fall back to those.
            langsmith_trace_bytes = cast(
                Optional[bytes], headers.get(LANGSMITH_DOTTED_ORDER_BYTES)
            )
            if not langsmith_trace_bytes:
                return  # type: ignore[return-value]
            langsmith_trace = langsmith_trace_bytes.decode("utf-8")
        parent_dotted_order = langsmith_trace.strip()
        parsed_dotted_order = _parse_dotted_order(parent_dotted_order)
        # First segment is the trace root; last segment is this span itself.
        trace_id = parsed_dotted_order[0][1]
        init_args["trace_id"] = trace_id
        init_args["id"] = parsed_dotted_order[-1][1]
        init_args["dotted_order"] = parent_dotted_order
        if len(parsed_dotted_order) >= 2:
            # Has a parent
            init_args["parent_run_id"] = parsed_dotted_order[-2][1]
        # All placeholders. We assume the source process
        # handles the life-cycle of the run.
        init_args["start_time"] = init_args.get("start_time") or datetime.now(
            timezone.utc
        )
        init_args["run_type"] = init_args.get("run_type") or "chain"
        init_args["name"] = init_args.get("name") or "parent"
        baggage = _Baggage.from_headers(headers)
        if baggage.metadata or baggage.tags:
            init_args["extra"] = init_args.setdefault("extra", {})
            init_args["extra"]["metadata"] = init_args["extra"].setdefault(
                "metadata", {}
            )
            # Explicit caller metadata wins over baggage-propagated metadata.
            metadata = {**baggage.metadata, **init_args["extra"]["metadata"]}
            init_args["extra"]["metadata"] = metadata
            tags = sorted(set(baggage.tags + init_args.get("tags", [])))
            init_args["tags"] = tags
        if baggage.project_name:
            init_args["project_name"] = baggage.project_name
        if baggage.replicas:
            init_args["replicas"] = baggage.replicas
        run_tree = RunTree(**init_args)
        # Set the distributed parent ID to this run's ID for rerooting
        _DISTRIBUTED_PARENT_ID.set(str(run_tree.id))
        return run_tree
  777. def to_headers(self) -> dict[str, str]:
  778. """Return the `RunTree` as a dictionary of headers."""
  779. headers = {}
  780. if self.trace_id:
  781. headers[f"{LANGSMITH_DOTTED_ORDER}"] = self.dotted_order
  782. baggage = _Baggage(
  783. metadata=self.extra.get("metadata", {}),
  784. tags=self.tags,
  785. project_name=self.session_name,
  786. replicas=self.replicas,
  787. )
  788. headers["baggage"] = baggage.to_header()
  789. return headers
  790. def __repr__(self):
  791. """Return a string representation of the `RunTree` object."""
  792. return (
  793. f"RunTree(id={self.id}, name='{self.name}', "
  794. f"run_type='{self.run_type}', dotted_order='{self.dotted_order}')"
  795. )
class _Baggage:
    """Baggage header information.

    Holds the LangSmith values propagated through the ``baggage`` header of
    distributed-trace requests: run metadata, tags, the target project name,
    and write replicas.
    """

    def __init__(
        self,
        metadata: Optional[dict[str, str]] = None,
        tags: Optional[list[str]] = None,
        project_name: Optional[str] = None,
        replicas: Optional[Sequence[WriteReplica]] = None,
    ):
        """Initialize the Baggage object."""
        # Coerce None/falsy inputs to empty containers so attribute access
        # downstream never needs a None check.
        self.metadata = metadata or {}
        self.tags = tags or []
        self.project_name = project_name
        self.replicas = replicas or []

    @classmethod
    def from_header(cls, header_value: Optional[str]) -> _Baggage:
        """Create a Baggage object from the given header value.

        The header is parsed as comma-separated ``key=value`` items with
        URL-encoded values; unrecognized keys are ignored. Parsing is
        best-effort: any error is logged and whatever was parsed so far is
        returned instead of raising.
        """
        if not header_value:
            return cls()
        metadata = {}
        tags = []
        project_name = None
        # Stays None unless a replicas item is present and parsed.
        replicas: Optional[list[WriteReplica]] = None
        try:
            for item in header_value.split(","):
                key, value = item.split("=", 1)
                if key == LANGSMITH_METADATA:
                    metadata = json.loads(urllib.parse.unquote(value))
                elif key == LANGSMITH_TAGS:
                    tags = urllib.parse.unquote(value).split(",")
                elif key == LANGSMITH_PROJECT:
                    project_name = urllib.parse.unquote(value)
                elif key == LANGSMITH_REPLICAS:
                    replicas_data = json.loads(urllib.parse.unquote(value))
                    parsed_replicas: list[WriteReplica] = []
                    for replica_item in replicas_data:
                        if (
                            isinstance(replica_item, (tuple, list))
                            and len(replica_item) == 2
                        ):
                            # Convert legacy format to WriteReplica
                            parsed_replicas.append(
                                WriteReplica(
                                    api_url=None,
                                    api_key=None,
                                    project_name=str(replica_item[0]),
                                    updates=replica_item[1],
                                )
                            )
                        elif isinstance(replica_item, dict):
                            # New WriteReplica format: preserve as dict
                            parsed_replicas.append(cast(WriteReplica, replica_item))
                        else:
                            # Unknown shapes are skipped, not fatal.
                            logger.warning(
                                f"Unknown replica format in baggage: {replica_item}"
                            )
                            continue
                    replicas = parsed_replicas
        except Exception as e:
            # A malformed header must never break request handling; log and
            # fall through with the partially-parsed values.
            logger.warning(f"Error parsing baggage header: {e}")
        return cls(
            metadata=metadata, tags=tags, project_name=project_name, replicas=replicas
        )

    @classmethod
    def from_headers(cls, headers: Mapping[Union[str, bytes], Any]) -> _Baggage:
        # Accept both str- and bytes-keyed mappings (e.g. raw ASGI headers
        # use bytes); a missing "baggage" key yields an empty Baggage.
        if "baggage" in headers:
            return cls.from_header(headers["baggage"])
        elif b"baggage" in headers:
            return cls.from_header(cast(bytes, headers[b"baggage"]).decode("utf-8"))
        else:
            return cls.from_header(None)

    def to_header(self) -> str:
        """Return the Baggage object as a header value.

        Each non-empty field is serialized as a URL-encoded ``key=value``
        item; empty fields are omitted, so an empty Baggage yields "".
        """
        items = []
        if self.metadata:
            serialized_metadata = _dumps_json(self.metadata)
            items.append(
                f"{LANGSMITH_PREFIX}metadata={urllib.parse.quote(serialized_metadata)}"
            )
        if self.tags:
            serialized_tags = ",".join(self.tags)
            items.append(
                f"{LANGSMITH_PREFIX}tags={urllib.parse.quote(serialized_tags)}"
            )
        if self.project_name:
            items.append(
                f"{LANGSMITH_PREFIX}project={urllib.parse.quote(self.project_name)}"
            )
        if self.replicas:
            serialized_replicas = _dumps_json(self.replicas)
            items.append(
                f"{LANGSMITH_PREFIX}replicas={urllib.parse.quote(serialized_replicas)}"
            )
        return ",".join(items)
  890. @functools.lru_cache(maxsize=1)
  891. def _parse_write_replicas_from_env_var(env_var: Optional[str]) -> list[WriteReplica]:
  892. """Parse write replicas from LANGSMITH_RUNS_ENDPOINTS environment variable value.
  893. Supports array format [{"api_url": "x", "api_key": "y"}] and object format
  894. {"url": "key"}.
  895. """
  896. if not env_var:
  897. return []
  898. try:
  899. parsed = json.loads(env_var)
  900. if isinstance(parsed, list):
  901. replicas = []
  902. for item in parsed:
  903. if not isinstance(item, dict):
  904. logger.warning(
  905. f"Invalid item type in LANGSMITH_RUNS_ENDPOINTS: "
  906. f"expected dict, got {type(item).__name__}"
  907. )
  908. continue
  909. api_url = item.get("api_url")
  910. api_key = item.get("api_key")
  911. if not isinstance(api_url, str):
  912. logger.warning(
  913. f"Invalid api_url type in LANGSMITH_RUNS_ENDPOINTS: "
  914. f"expected string, got {type(api_url).__name__}"
  915. )
  916. continue
  917. if not isinstance(api_key, str):
  918. logger.warning(
  919. f"Invalid api_key type in LANGSMITH_RUNS_ENDPOINTS: "
  920. f"expected string, got {type(api_key).__name__}"
  921. )
  922. continue
  923. replicas.append(
  924. WriteReplica(
  925. api_url=api_url.rstrip("/"),
  926. api_key=api_key,
  927. project_name=None,
  928. updates=None,
  929. )
  930. )
  931. return replicas
  932. elif isinstance(parsed, dict):
  933. _check_endpoint_env_unset(parsed)
  934. replicas = []
  935. for url, key in parsed.items():
  936. url = url.rstrip("/")
  937. if isinstance(key, str):
  938. replicas.append(
  939. WriteReplica(
  940. api_url=url,
  941. api_key=key,
  942. project_name=None,
  943. updates=None,
  944. )
  945. )
  946. else:
  947. logger.warning(
  948. f"Invalid value type in LANGSMITH_RUNS_ENDPOINTS for URL "
  949. f"{url}: "
  950. f"expected string, got {type(key).__name__}"
  951. )
  952. continue
  953. return replicas
  954. else:
  955. logger.warning(
  956. f"Invalid LANGSMITH_RUNS_ENDPOINTS – must be valid JSON list of "
  957. "objects with api_url and api_key properties, or object mapping "
  958. f"url->apiKey, got {type(parsed).__name__}"
  959. )
  960. return []
  961. except utils.LangSmithUserError:
  962. raise
  963. except Exception as e:
  964. logger.warning(
  965. "Invalid LANGSMITH_RUNS_ENDPOINTS – must be valid JSON list of "
  966. f"objects with api_url and api_key properties, or object mapping"
  967. f" url->apiKey: {e}"
  968. )
  969. return []
  970. def _get_write_replicas_from_env() -> list[WriteReplica]:
  971. """Get write replicas from LANGSMITH_RUNS_ENDPOINTS environment variable."""
  972. env_var = utils.get_env_var("RUNS_ENDPOINTS")
  973. return _parse_write_replicas_from_env_var(env_var)
  974. def _check_endpoint_env_unset(parsed: dict[str, str]) -> None:
  975. """Check if endpoint environment variables conflict with runs endpoints."""
  976. import os
  977. if parsed and (os.getenv("LANGSMITH_ENDPOINT") or os.getenv("LANGCHAIN_ENDPOINT")):
  978. raise utils.LangSmithUserError(
  979. "You cannot provide both LANGSMITH_ENDPOINT / LANGCHAIN_ENDPOINT "
  980. "and LANGSMITH_RUNS_ENDPOINTS."
  981. )
  982. def _ensure_write_replicas(
  983. replicas: Optional[Sequence[WriteReplica]],
  984. ) -> list[WriteReplica]:
  985. """Convert replicas to WriteReplica format."""
  986. if replicas is None:
  987. return _get_write_replicas_from_env()
  988. # All replicas should now be WriteReplica dicts
  989. return list(replicas)
  990. def _parse_dotted_order(dotted_order: str) -> list[tuple[datetime, UUID]]:
  991. """Parse the dotted order string."""
  992. parts = dotted_order.split(".")
  993. return [
  994. (
  995. datetime.strptime(part[:-TIMESTAMP_LENGTH], "%Y%m%dT%H%M%S%fZ"),
  996. UUID(part[-TIMESTAMP_LENGTH:]),
  997. )
  998. for part in parts
  999. ]
  1000. _CLIENT: Optional[Client] = _context._GLOBAL_CLIENT
  1001. __all__ = ["RunTree", "RunTree"]
  1002. def _create_current_dotted_order(
  1003. start_time: Optional[datetime], run_id: Optional[UUID]
  1004. ) -> str:
  1005. """Create the current dotted order."""
  1006. st = start_time or datetime.now(timezone.utc)
  1007. id_ = run_id or uuid7_from_datetime(st)
  1008. return st.strftime("%Y%m%dT%H%M%S%fZ") + str(id_)