# run_helpers.py

  1. """Decorator for creating a run tree from functions."""
  2. from __future__ import annotations
  3. import asyncio
  4. import contextlib
  5. import contextvars
  6. import datetime
  7. import functools
  8. import inspect
  9. import logging
  10. import warnings
  11. from collections.abc import (
  12. AsyncGenerator,
  13. AsyncIterator,
  14. Awaitable,
  15. Generator,
  16. Iterator,
  17. Mapping,
  18. Sequence,
  19. )
  20. from contextvars import copy_context
  21. from typing import (
  22. TYPE_CHECKING,
  23. Annotated,
  24. Any,
  25. Callable,
  26. Generic,
  27. Literal,
  28. Optional,
  29. Protocol,
  30. TypedDict,
  31. TypeVar,
  32. Union,
  33. cast,
  34. get_type_hints,
  35. overload,
  36. runtime_checkable,
  37. )
  38. from typing_extensions import ParamSpec, TypeGuard, get_args, get_origin
  39. import langsmith._internal._context as _context
  40. from langsmith import client as ls_client
  41. from langsmith import run_trees, schemas, utils
  42. from langsmith._internal import _aiter as aitertools
  43. from langsmith.env import _runtime_env
  44. from langsmith.run_trees import WriteReplica
  45. if TYPE_CHECKING:
  46. from types import TracebackType
  47. from langchain_core.runnables import Runnable
  48. LOGGER = logging.getLogger(__name__)
  49. _CONTEXT_KEYS: dict[str, contextvars.ContextVar] = {
  50. "parent": _context._PARENT_RUN_TREE,
  51. "project_name": _context._PROJECT_NAME,
  52. "tags": _context._TAGS,
  53. "metadata": _context._METADATA,
  54. "enabled": _context._TRACING_ENABLED,
  55. "client": _context._CLIENT,
  56. "replicas": run_trees._REPLICAS,
  57. "distributed_parent_id": run_trees._DISTRIBUTED_PARENT_ID,
  58. }
  59. _EXCLUDED_FRAME_FNAME = "langsmith/run_helpers.py"
  60. _OTEL_AVAILABLE: Optional[bool] = None


def get_current_run_tree() -> Optional[run_trees.RunTree]:
    """Get the current run tree."""
    return _context._PARENT_RUN_TREE.get()


def set_run_metadata(**metadata: Any) -> None:
    """Update metadata on the current run tree."""
    run_tree = get_current_run_tree()
    if run_tree is None:
        LOGGER.warning(
            "No active run tree found. Call `set_run_metadata` inside a traced run."
        )
    else:
        run_tree.metadata.update(metadata)
    return
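
# Illustrative usage sketch (commented out; not part of the module's API surface):
# `set_run_metadata` only has an effect inside a traced run, e.g. within a
# `@traceable`-decorated function. The metadata keys shown are hypothetical.
#
#     @traceable
#     def my_step(x: int) -> int:
#         set_run_metadata(model="gpt-4o", attempt=1)
#         return x + 1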


def get_tracing_context(
    context: Optional[contextvars.Context] = None,
) -> dict[str, Any]:
    """Get the current tracing context."""
    if context is None:
        return {
            "parent": _context._PARENT_RUN_TREE.get(),
            "project_name": _context._PROJECT_NAME.get(),
            "tags": _context._TAGS.get(),
            "metadata": _context._METADATA.get(),
            "enabled": _context._TRACING_ENABLED.get(),
            "client": _context._CLIENT.get(),
            "replicas": run_trees._REPLICAS.get(),
            "distributed_parent_id": run_trees._DISTRIBUTED_PARENT_ID.get(),
        }
    return {k: context.get(v) for k, v in _CONTEXT_KEYS.items()}
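
# Illustrative sketch (commented out): `get_tracing_context()` pairs with
# `tracing_context(...)` to carry tracing state into code that runs outside the
# current context, such as a worker thread. This mirrors the pattern the wrapper
# functions below use when re-applying a saved context on Python < 3.11.
#
#     from concurrent.futures import ThreadPoolExecutor
#
#     saved = get_tracing_context()
#
#     def _job():
#         with tracing_context(**saved):
#             ...  # traced work here sees the same parent/project/tags
#
#     ThreadPoolExecutor(max_workers=1).submit(_job)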


@contextlib.contextmanager
def tracing_context(
    *,
    project_name: Optional[str] = None,
    tags: Optional[list[str]] = None,
    metadata: Optional[dict[str, Any]] = None,
    parent: Optional[Union[run_trees.RunTree, Mapping, str, Literal[False]]] = None,
    enabled: Optional[Union[bool, Literal["local"]]] = None,
    client: Optional[ls_client.Client] = None,
    replicas: Optional[Sequence[WriteReplica]] = None,
    distributed_parent_id: Optional[str] = None,
    **kwargs: Any,
) -> Generator[None, None, None]:
    """Set the tracing context for a block of code.

    Args:
        project_name: The name of the project to log the run to.
        tags: The tags to add to the run.
        metadata: The metadata to add to the run.
        parent: The parent run to use for the context.
            Can be a Run/`RunTree` object, request headers (for distributed tracing),
            or the dotted order string.
        client: The client to use for logging the run to LangSmith.
        enabled: Whether tracing is enabled. Defaults to `None`, meaning it will use
            the current context value or environment variables.
        replicas: A sequence of `WriteReplica` dictionaries to send runs to.
            Example: `[{"api_url": "https://api.example.com", "api_key": "key", "project_name": "proj"}]`
            or `[{"project_name": "my_experiment", "updates": {"reference_example_id": None}}]`
        distributed_parent_id: The distributed parent ID for distributed tracing.
            Defaults to None.
    """
    if kwargs:
        # warn
        warnings.warn(
            f"Unrecognized keyword arguments: {kwargs}.",
            DeprecationWarning,
        )
    current_context = get_tracing_context()
    parent_run = (
        _get_parent_run({"parent": parent or kwargs.get("parent_run")})
        if parent is not False
        else None
    )
    distributed_parent_id_to_use = distributed_parent_id
    if distributed_parent_id_to_use is None and parent_run is not None:
        # TODO(angus): decide if we want to merge tags and metadata
        tags = sorted(set(tags or []) | set(parent_run.tags or []))
        metadata = {**parent_run.metadata, **(metadata or {})}
        distributed_parent_id_to_use = parent_run.id  # type: ignore[assignment]
    enabled = enabled if enabled is not None else current_context.get("enabled")
    _set_tracing_context(
        {
            "parent": parent_run,
            "project_name": project_name,
            "tags": tags,
            "metadata": metadata,
            "enabled": enabled,
            "client": client,
            "replicas": replicas,
            "distributed_parent_id": distributed_parent_id_to_use,
        }
    )
    try:
        yield
    finally:
        _set_tracing_context(current_context)
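
# Illustrative sketch (commented out), assuming a traced function `my_chain`
# exists elsewhere: runs created inside the block inherit the project, tags,
# and metadata set here.
#
#     with tracing_context(project_name="my-project", tags=["beta"], enabled=True):
#         my_chain({"question": "hello"})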


# Alias for backwards compatibility
get_run_tree_context = get_current_run_tree


def is_traceable_function(func: Any) -> TypeGuard[SupportsLangsmithExtra[P, R]]:
    """Check if a function is `@traceable` decorated."""
    return (
        _is_traceable_function(func)
        or (isinstance(func, functools.partial) and _is_traceable_function(func.func))
        or (hasattr(func, "__call__") and _is_traceable_function(func.__call__))
    )


def ensure_traceable(
    func: Callable[P, R],
    *,
    name: Optional[str] = None,
    metadata: Optional[Mapping[str, Any]] = None,
    tags: Optional[list[str]] = None,
    client: Optional[ls_client.Client] = None,
    reduce_fn: Optional[Callable[[Sequence], Union[dict, str]]] = None,
    project_name: Optional[str] = None,
    process_inputs: Optional[Callable[[dict], dict]] = None,
    process_outputs: Optional[Callable[..., dict]] = None,
    process_chunk: Optional[Callable] = None,
) -> SupportsLangsmithExtra[P, R]:
    """Ensure that a function is traceable."""
    if is_traceable_function(func):
        return func
    return traceable(
        name=name,
        metadata=metadata,
        tags=tags,
        client=client,
        reduce_fn=reduce_fn,
        project_name=project_name,
        process_inputs=process_inputs,
        process_outputs=process_outputs,
        process_chunk=process_chunk,
    )(func)
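
# Illustrative sketch (commented out): `ensure_traceable` is idempotent. A plain
# callable gets wrapped, while an already-decorated one is returned unchanged.
# The `add` function below is hypothetical.
#
#     def add(a: int, b: int) -> int:
#         return a + b
#
#     traced_add = ensure_traceable(add, name="add")
#     assert ensure_traceable(traced_add) is traced_add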


def is_async(func: Callable) -> bool:
    """Inspect function or wrapped function to see if it is async."""
    return inspect.iscoroutinefunction(func) or (
        hasattr(func, "__wrapped__") and inspect.iscoroutinefunction(func.__wrapped__)
    )


class LangSmithExtra(TypedDict, total=False):
    """Any additional info to be injected into the run dynamically."""

    name: Optional[str]
    """Optional name for the run."""
    reference_example_id: Optional[ls_client.ID_TYPE]
    """Optional ID of a reference example."""
    run_extra: Optional[dict]
    """Optional additional run information."""
    parent: Optional[Union[run_trees.RunTree, str, Mapping]]
    """Optional parent run, can be a RunTree, string, or mapping."""
    run_tree: Optional[run_trees.RunTree]  # TODO: Deprecate
    """Optional run tree (deprecated)."""
    project_name: Optional[str]
    """Optional name of the project."""
    metadata: Optional[dict[str, Any]]
    """Optional metadata for the run."""
    tags: Optional[list[str]]
    """Optional list of tags for the run."""
    run_id: Optional[ls_client.ID_TYPE]
    """Optional ID for the run."""
    client: Optional[ls_client.Client]
    """Optional LangSmith client."""
    # Optional callback function to be called if the run succeeds and before it is sent.
    _on_success: Optional[Callable[[run_trees.RunTree], None]]
    on_end: Optional[Callable[[run_trees.RunTree], Any]]
    """Optional callback function to be called after the run ends and is sent."""


R = TypeVar("R", covariant=True)
P = ParamSpec("P")


@runtime_checkable
class SupportsLangsmithExtra(Protocol, Generic[P, R]):
    """Implementations of this Protocol accept an optional langsmith_extra parameter."""

    def __call__(  # type: ignore[valid-type]
        self,
        *args: P.args,
        langsmith_extra: Optional[LangSmithExtra] = None,
        **kwargs: P.kwargs,
    ) -> R:
        """Call the instance when it is called as a function.

        Args:
            *args: Variable length argument list.
            langsmith_extra: Optional dictionary containing additional
                parameters specific to Langsmith.
            **kwargs: Arbitrary keyword arguments.

        Returns:
            R: The return value of the method.
        """
        ...
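
# Illustrative sketch (commented out): any `@traceable` function satisfies this
# protocol, so per-call overrides can be supplied via `langsmith_extra` without
# changing the function's signature. `summarize` is a hypothetical example.
#
#     @traceable
#     def summarize(text: str) -> str:
#         return text[:100]
#
#     summarize("...", langsmith_extra={"tags": ["adhoc"], "metadata": {"v": 2}})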


def _extract_usage(
    *,
    run_tree: run_trees.RunTree,
    outputs: Optional[dict] = None,
    **kwargs: Any,
) -> Optional[schemas.ExtractedUsageMetadata]:
    from_metadata = (run_tree.metadata or {}).get("usage_metadata")
    return (outputs or {}).get("usage_metadata") or from_metadata


@overload
def traceable(
    func: Callable[P, R],
) -> SupportsLangsmithExtra[P, R]: ...


@overload
def traceable(
    run_type: ls_client.RUN_TYPE_T = "chain",
    *,
    name: Optional[str] = None,
    metadata: Optional[Mapping[str, Any]] = None,
    tags: Optional[list[str]] = None,
    client: Optional[ls_client.Client] = None,
    reduce_fn: Optional[Callable[[Sequence], Union[dict, str]]] = None,
    project_name: Optional[str] = None,
    process_inputs: Optional[Callable[[dict], dict]] = None,
    process_outputs: Optional[Callable[..., dict]] = None,
    process_chunk: Optional[Callable] = None,
    _invocation_params_fn: Optional[Callable[[dict], dict]] = None,
    dangerously_allow_filesystem: bool = False,
) -> Callable[[Callable[P, R]], SupportsLangsmithExtra[P, R]]: ...


def traceable(
    *args: Any,
    **kwargs: Any,
) -> Union[Callable, Callable[[Callable], Callable]]:
    """Trace a function with langsmith.

    Args:
        run_type: The type of run (span) to create.
            Examples: `llm`, `chain`, `tool`, `prompt`, `retriever`, etc.
            Defaults to "chain".
        name: The name of the run. Defaults to the function name.
        metadata: The metadata to add to the run. Defaults to `None`.
        tags: The tags to add to the run. Defaults to `None`.
        client: The client to use for logging the run to LangSmith. Defaults to
            `None`, which will use the default client.
        reduce_fn: A function to reduce the output of the function if the function
            returns a generator. Defaults to `None`, which means the values will be
            logged as a list.

            !!! note
                If the iterator is never exhausted (e.g. the function returns an
                infinite generator), this will never be called, and the run itself
                will be stuck in a pending state.
        project_name: The name of the project to log the run to.
            Defaults to `None`, which will use the default project.
        process_inputs: Custom serialization / processing function for inputs.
            Defaults to `None`.
        process_outputs: Custom serialization / processing function for outputs.
            Defaults to `None`.
        dangerously_allow_filesystem: Whether to allow filesystem access for
            attachments. Defaults to `False`.

            Traces that reference local filepaths will be uploaded to LangSmith.
            In general, network-hosted applications should not be using this because
            referenced files are usually on the user's machine, not the host machine.

    Returns:
        The decorated function.

    !!! note
        Requires that `LANGSMITH_TRACING_V2` be set to 'true' in the environment.

    Examples:
        !!! example "Basic usage"
            ```python
            @traceable
            def my_function(x: float, y: float) -> float:
                return x + y


            my_function(5, 6)


            @traceable
            async def my_async_function(query_params: dict) -> dict:
                async with httpx.AsyncClient() as http_client:
                    response = await http_client.get(
                        "https://api.example.com/data",
                        params=query_params,
                    )
                    return response.json()


            asyncio.run(my_async_function({"param": "value"}))
            ```

        !!! example "Streaming data with a generator"
            ```python
            @traceable
            def my_generator(n: int) -> Iterable:
                for i in range(n):
                    yield i


            for item in my_generator(5):
                print(item)
            ```

        !!! example "Async streaming data"
            ```python
            @traceable
            async def my_async_generator(query_params: dict) -> Iterable:
                async with httpx.AsyncClient() as http_client:
                    response = await http_client.get(
                        "https://api.example.com/data",
                        params=query_params,
                    )
                for item in response.json():
                    yield item


            async def async_code():
                async for item in my_async_generator({"param": "value"}):
                    print(item)


            asyncio.run(async_code())
            ```

        !!! example "Specifying a run type and name"
            ```python
            @traceable(name="CustomName", run_type="tool")
            def another_function(a: float, b: float) -> float:
                return a * b


            another_function(5, 6)
            ```

        !!! example "Logging with custom metadata and tags"
            ```python
            @traceable(
                metadata={"version": "1.0", "author": "John Doe"}, tags=["beta", "test"]
            )
            def tagged_function(x):
                return x**2


            tagged_function(5)
            ```

        !!! example "Specifying a custom client and project name"
            ```python
            custom_client = Client(api_key="your_api_key")


            @traceable(client=custom_client, project_name="My Special Project")
            def project_specific_function(data):
                return data


            project_specific_function({"data": "to process"})
            ```

        !!! example "Manually passing `langsmith_extra`"
            ```python
            @traceable
            def manual_extra_function(x):
                return x**2


            manual_extra_function(5, langsmith_extra={"metadata": {"version": "1.0"}})
            ```
    """
    run_type = cast(
        ls_client.RUN_TYPE_T,
        (
            args[0]
            if args and isinstance(args[0], str)
            else (kwargs.pop("run_type", None) or "chain")
        ),
    )
    if run_type not in _VALID_RUN_TYPES:
        warnings.warn(
            f"Unrecognized run_type: {run_type}. Must be one of: {_VALID_RUN_TYPES}."
            f" Did you mean @traceable(name='{run_type}')?"
        )
    if len(args) > 1:
        warnings.warn(
            "The `traceable()` decorator only accepts one positional argument, "
            "which should be the run_type. All other arguments should be passed "
            "as keyword arguments."
        )
    if "extra" in kwargs:
        warnings.warn(
            "The `extra` keyword argument is deprecated. Please use `metadata` "
            "instead.",
            DeprecationWarning,
        )
    reduce_fn = kwargs.pop("reduce_fn", None)
    container_input = _ContainerInput(
        # TODO: Deprecate raw extra
        extra_outer=kwargs.pop("extra", None),
        name=kwargs.pop("name", None),
        metadata=kwargs.pop("metadata", None),
        tags=kwargs.pop("tags", None),
        client=kwargs.pop("client", None),
        project_name=kwargs.pop("project_name", None),
        run_type=run_type,
        process_inputs=kwargs.pop("process_inputs", None),
        process_chunk=kwargs.pop("process_chunk", None),
        invocation_params_fn=kwargs.pop("_invocation_params_fn", None),
        dangerously_allow_filesystem=kwargs.pop("dangerously_allow_filesystem", False),
    )
    outputs_processor = kwargs.pop("process_outputs", None)
    _on_run_end = functools.partial(
        _handle_container_end,
        outputs_processor=outputs_processor,
    )
    if kwargs:
        warnings.warn(
            f"The following keyword arguments are not recognized and will be ignored: "
            f"{sorted(kwargs.keys())}.",
            DeprecationWarning,
        )

    def decorator(func: Callable):
        func_sig = inspect.signature(func)
        func_accepts_parent_run = func_sig.parameters.get("run_tree", None) is not None
        func_accepts_config = func_sig.parameters.get("config", None) is not None

        @functools.wraps(func)
        async def async_wrapper(
            *args: Any,
            langsmith_extra: Optional[LangSmithExtra] = None,
            **kwargs: Any,
        ) -> Any:
            """Async version of wrapper function."""
            if not func_accepts_config:
                kwargs.pop("config", None)
            run_container = await aitertools.aio_to_thread(
                _setup_run,
                func,
                container_input=container_input,
                langsmith_extra=langsmith_extra,
                args=args,
                kwargs=kwargs,
            )
            try:
                accepts_context = aitertools.asyncio_accepts_context()
                if func_accepts_parent_run:
                    kwargs["run_tree"] = run_container["new_run"]
                otel_context_manager = _maybe_create_otel_context(
                    run_container["new_run"]
                )
                if otel_context_manager:

                    async def run_with_otel_context():
                        with otel_context_manager:
                            return await func(*args, **kwargs)

                    if accepts_context:
                        function_result = await asyncio.create_task(  # type: ignore[call-arg]
                            run_with_otel_context(), context=run_container["context"]
                        )
                    else:
                        # Python < 3.11
                        with tracing_context(
                            **get_tracing_context(run_container["context"])
                        ):
                            function_result = await run_with_otel_context()
                else:
                    fr_coro = func(*args, **kwargs)
                    if accepts_context:
                        function_result = await asyncio.create_task(  # type: ignore[call-arg]
                            fr_coro, context=run_container["context"]
                        )
                    else:
                        # Python < 3.11
                        with tracing_context(
                            **get_tracing_context(run_container["context"])
                        ):
                            function_result = await fr_coro
            except BaseException as e:
                # shield from cancellation, given we're catching all exceptions
                _cleanup_traceback(e)
                await asyncio.shield(
                    aitertools.aio_to_thread(_on_run_end, run_container, error=e)
                )
                raise
            await aitertools.aio_to_thread(
                _on_run_end, run_container, outputs=function_result
            )
            return function_result

        @functools.wraps(func)
        async def async_generator_wrapper(
            *args: Any, langsmith_extra: Optional[LangSmithExtra] = None, **kwargs: Any
        ) -> AsyncGenerator:
            if not func_accepts_config:
                kwargs.pop("config", None)
            run_container = await aitertools.aio_to_thread(
                _setup_run,
                func,
                container_input=container_input,
                langsmith_extra=langsmith_extra,
                args=args,
                kwargs=kwargs,
            )
            results: list[Any] = []
            try:
                if func_accepts_parent_run:
                    kwargs["run_tree"] = run_container["new_run"]
                # TODO: Nesting is ambiguous if a nested traceable function is only
                # called mid-generation. Need to explicitly accept run_tree to get
                # around this.
                otel_context_manager = _maybe_create_otel_context(
                    run_container["new_run"]
                )
                async_gen_result = func(*args, **kwargs)
                # Can't iterate through if it's a coroutine
                accepts_context = aitertools.asyncio_accepts_context()
                if inspect.iscoroutine(async_gen_result):
                    if accepts_context:
                        async_gen_result = await asyncio.create_task(
                            async_gen_result, context=run_container["context"]
                        )  # type: ignore
                    else:
                        # Python < 3.11
                        with tracing_context(
                            **get_tracing_context(run_container["context"])
                        ):
                            async_gen_result = await async_gen_result
                async for item in _process_async_iterator(
                    generator=async_gen_result,
                    run_container=run_container,
                    is_llm_run=(
                        run_container["new_run"].run_type == "llm"
                        if run_container["new_run"]
                        else False
                    ),
                    accepts_context=accepts_context,
                    results=results,
                    process_chunk=container_input.get("process_chunk"),
                    otel_context_manager=otel_context_manager,
                ):
                    yield item
            except BaseException as e:
                _cleanup_traceback(e)
                await asyncio.shield(
                    aitertools.aio_to_thread(
                        _on_run_end,
                        run_container,
                        error=e,
                        outputs=_get_function_result(results, reduce_fn),
                    )
                )
                raise
            await aitertools.aio_to_thread(
                _on_run_end,
                run_container,
                outputs=_get_function_result(results, reduce_fn),
            )

        @functools.wraps(func)
        def wrapper(
            *args: Any,
            langsmith_extra: Optional[LangSmithExtra] = None,
            **kwargs: Any,
        ) -> Any:
            """Create a new run or create_child() if run is passed in kwargs."""
            if not func_accepts_config:
                kwargs.pop("config", None)
            run_container = _setup_run(
                func,
                container_input=container_input,
                langsmith_extra=langsmith_extra,
                args=args,
                kwargs=kwargs,
            )
            func_accepts_parent_run = (
                inspect.signature(func).parameters.get("run_tree", None) is not None
            )
            try:
                if func_accepts_parent_run:
                    kwargs["run_tree"] = run_container["new_run"]
                otel_context_manager = _maybe_create_otel_context(
                    run_container["new_run"]
                )
                if otel_context_manager:

                    def run_with_otel_context():
                        with otel_context_manager:
                            return func(*args, **kwargs)

                    function_result = run_container["context"].run(
                        run_with_otel_context
                    )
                else:
                    function_result = run_container["context"].run(
                        func, *args, **kwargs
                    )
            except BaseException as e:
                _cleanup_traceback(e)
                _on_run_end(run_container, error=e)
                raise
            _on_run_end(run_container, outputs=function_result)
            return function_result

        @functools.wraps(func)
        def generator_wrapper(
            *args: Any, langsmith_extra: Optional[LangSmithExtra] = None, **kwargs: Any
        ) -> Any:
            if not func_accepts_config:
                kwargs.pop("config", None)
            run_container = _setup_run(
                func,
                container_input=container_input,
                langsmith_extra=langsmith_extra,
                args=args,
                kwargs=kwargs,
            )
            func_accepts_parent_run = (
                inspect.signature(func).parameters.get("run_tree", None) is not None
            )
            results: list[Any] = []
            function_return: Any = None
            try:
                if func_accepts_parent_run:
                    kwargs["run_tree"] = run_container["new_run"]
                generator_result = run_container["context"].run(func, *args, **kwargs)
                otel_context_manager = _maybe_create_otel_context(
                    run_container["new_run"]
                )
                function_return = yield from _process_iterator(
                    generator_result,
                    run_container,
                    is_llm_run=run_type == "llm",
                    results=results,
                    process_chunk=container_input.get("process_chunk"),
                    otel_context_manager=otel_context_manager,
                )
                if function_return is not None:
                    results.append(function_return)
            except BaseException as e:
                _cleanup_traceback(e)
                _on_run_end(
                    run_container,
                    error=e,
                    outputs=_get_function_result(results, reduce_fn),
                )
                raise
            _on_run_end(run_container, outputs=_get_function_result(results, reduce_fn))
            return function_return
  651. # "Stream" functions (used in methods like OpenAI/Anthropic's SDKs)
  652. # are functions that return iterable responses and should not be
  653. # considered complete until the streaming is completed
  654. @functools.wraps(func)
  655. def stream_wrapper(
  656. *args: Any, langsmith_extra: Optional[LangSmithExtra] = None, **kwargs: Any
  657. ) -> Any:
  658. if not func_accepts_config:
  659. kwargs.pop("config", None)
  660. trace_container = _setup_run(
  661. func,
  662. container_input=container_input,
  663. langsmith_extra=langsmith_extra,
  664. args=args,
  665. kwargs=kwargs,
  666. )
  667. try:
  668. if func_accepts_parent_run:
  669. kwargs["run_tree"] = trace_container["new_run"]
  670. stream = trace_container["context"].run(func, *args, **kwargs)
  671. except Exception as e:
  672. _cleanup_traceback(e)
  673. _on_run_end(trace_container, error=e)
  674. raise
  675. if hasattr(stream, "__iter__"):
  676. return _TracedStream(stream, trace_container, reduce_fn)
  677. elif hasattr(stream, "__aiter__"):
  678. # sync function -> async iterable (unexpected)
  679. return _TracedAsyncStream(stream, trace_container, reduce_fn)
  680. # If it's not iterable, end the trace immediately
  681. _on_run_end(trace_container, outputs=stream)
  682. return stream

        @functools.wraps(func)
        async def async_stream_wrapper(
            *args: Any, langsmith_extra: Optional[LangSmithExtra] = None, **kwargs: Any
        ) -> Any:
            if not func_accepts_config:
                kwargs.pop("config", None)
            trace_container = await aitertools.aio_to_thread(
                _setup_run,
                func,
                container_input=container_input,
                langsmith_extra=langsmith_extra,
                args=args,
                kwargs=kwargs,
            )
            try:
                if func_accepts_parent_run:
                    kwargs["run_tree"] = trace_container["new_run"]
                stream = await func(*args, **kwargs)
            except Exception as e:
                await aitertools.aio_to_thread(_on_run_end, trace_container, error=e)
                raise
            if hasattr(stream, "__aiter__"):
                return _TracedAsyncStream(stream, trace_container, reduce_fn)
            elif hasattr(stream, "__iter__"):
                # Async function -> sync iterable
                return _TracedStream(stream, trace_container, reduce_fn)
            # If it's not iterable, end the trace immediately
            await aitertools.aio_to_thread(_on_run_end, trace_container, outputs=stream)
            return stream

        if inspect.isasyncgenfunction(func):
            selected_wrapper: Callable = async_generator_wrapper
        elif inspect.isgeneratorfunction(func):
            selected_wrapper = generator_wrapper
        elif is_async(func):
            if reduce_fn:
                selected_wrapper = async_stream_wrapper
            else:
                selected_wrapper = async_wrapper
        else:
            if reduce_fn:
                selected_wrapper = stream_wrapper
            else:
                selected_wrapper = wrapper
        setattr(selected_wrapper, "__langsmith_traceable__", True)
        sig = inspect.signature(selected_wrapper)
        if not sig.parameters.get("config"):
            sig = sig.replace(
                parameters=[
                    *(
                        param
                        for param in sig.parameters.values()
                        if param.kind != inspect.Parameter.VAR_KEYWORD
                    ),
                    inspect.Parameter(
                        "config", inspect.Parameter.KEYWORD_ONLY, default=None
                    ),
                    *(
                        param
                        for param in sig.parameters.values()
                        if param.kind == inspect.Parameter.VAR_KEYWORD
                    ),
                ]
            )
            selected_wrapper.__signature__ = sig  # type: ignore[attr-defined]
        return selected_wrapper

    # If the decorator is called with no arguments, then it's being used as a
    # decorator, so we return the decorator function
    if len(args) == 1 and callable(args[0]) and not kwargs:
        return decorator(args[0])
    # Else it's being used as a decorator factory, so we return the decorator
    return decorator


class trace:
    """Manage a LangSmith run in context.

    This class can be used as both a synchronous and asynchronous context manager.

    Args:
        name: Name of the run.
        run_type: Type of run (e.g., `'chain'`, `'llm'`, `'tool'`).
        inputs: Initial input data for the run.
        project_name: Project name to associate the run with.
        parent: Parent run. Can be a `RunTree`, dotted order string, or tracing headers.
        tags: List of tags for the run.
        metadata: Additional metadata for the run.
        client: LangSmith client for custom settings.
        run_id: Preset identifier for the run.
        reference_example_id: Associates run with a dataset example.
            Only for root runs in evaluation.
        exceptions_to_handle: Exception types to ignore.
        extra: Extra data to send to LangSmith. Use 'metadata' instead.

    Examples:
        Synchronous usage:

        ```python
        with trace("My Operation", run_type="tool", tags=["important"]) as run:
            result = "foo"  # Perform operation
            run.metadata["some-key"] = "some-value"
            run.end(outputs={"result": result})
        ```

        Asynchronous usage:

        ```python
        async def main():
            async with trace("Async Operation", run_type="tool", tags=["async"]) as run:
                result = "foo"  # Await async operation
                run.metadata["some-key"] = "some-value"
                # "end" just adds the outputs and sets error to None
                # The actual patching of the run happens when the context exits
                run.end(outputs={"result": result})


        asyncio.run(main())
        ```

        Handling specific exceptions:

        ```python
        import pytest
        import sys

        with trace("Test", exceptions_to_handle=(pytest.skip.Exception,)):
            if sys.platform == "win32":  # Just an example
                pytest.skip("Skipping test for windows")
            result = "foo"  # Perform test operation
        ```
    """

    def __init__(
        self,
        name: str,
        run_type: ls_client.RUN_TYPE_T = "chain",
        *,
        inputs: Optional[dict] = None,
        extra: Optional[dict] = None,
        project_name: Optional[str] = None,
        parent: Optional[
            Union[run_trees.RunTree, str, Mapping, Literal["ignore"]]
        ] = None,
        tags: Optional[list[str]] = None,
        metadata: Optional[Mapping[str, Any]] = None,
        client: Optional[ls_client.Client] = None,
        run_id: Optional[ls_client.ID_TYPE] = None,
        reference_example_id: Optional[ls_client.ID_TYPE] = None,
        exceptions_to_handle: Optional[tuple[type[BaseException], ...]] = None,
        attachments: Optional[schemas.Attachments] = None,
        **kwargs: Any,
    ):
        """Initialize the trace context manager.

        Warns if unsupported kwargs are passed.
        """
        self._end_on_exit = kwargs.pop("_end_on_exit", True)
        if kwargs:
            warnings.warn(
                "The `trace` context manager no longer supports the following kwargs: "
                f"{sorted(kwargs.keys())}.",
                DeprecationWarning,
            )
        self.name = name
        self.run_type = run_type
        self.inputs = inputs
        self.attachments = attachments
        self.extra = extra
        self.project_name = project_name
        self.parent = parent
        # The run tree is deprecated. Keeping for backwards compat.
        # Will fully merge within parent later.
        self.run_tree = kwargs.get("run_tree")
        self.tags = tags
        self.metadata = metadata
        self.client = client
        self.run_id = run_id
        self.reference_example_id = reference_example_id
        self.exceptions_to_handle = exceptions_to_handle
        self.new_run: Optional[run_trees.RunTree] = None
        self.old_ctx: Optional[dict] = None

    def _setup(self) -> run_trees.RunTree:
        """Set up the tracing context and create a new run.

        This method initializes the tracing context, merges tags and metadata,
        creates a new run (either as a child of an existing run or as a new root run),
        and sets up the necessary context variables.

        Returns:
            run_trees.RunTree: The newly created run.
        """
        self.old_ctx = get_tracing_context()
        enabled = utils.tracing_is_enabled(self.old_ctx)
        outer_tags = _context._TAGS.get() or _context._GLOBAL_TAGS
        outer_metadata = _context._METADATA.get() or _context._GLOBAL_METADATA
        client_ = self.client or self.old_ctx.get("client")
        parent_run_ = _get_parent_run(
            {
                "parent": self.parent,
                "run_tree": self.run_tree,
                "client": client_,
                "project_name": self.project_name,
            }
        )
        tags_ = sorted(set((self.tags or []) + (outer_tags or [])))
        metadata = {
            **(self.metadata or {}),
            **(outer_metadata or {}),
            "ls_method": "trace",
        }
        extra_outer = self.extra or {}
        extra_outer["metadata"] = metadata
        project_name_ = _get_project_name(self.project_name)
        if parent_run_ is not None and enabled:
            self.new_run = parent_run_.create_child(
                name=self.name,
                run_id=self.run_id,
                run_type=self.run_type,
                extra=extra_outer,
                inputs=self.inputs,
                tags=tags_,
                attachments=self.attachments,
            )
        else:
            self.new_run = run_trees.RunTree(
                name=self.name,
                id=ls_client._ensure_uuid(self.run_id),
                reference_example_id=ls_client._ensure_uuid(
                    self.reference_example_id, accept_null=True
                ),
                run_type=self.run_type,
                extra=extra_outer,
                project_name=project_name_ or "default",
                replicas=run_trees._REPLICAS.get(),
                inputs=self.inputs or {},
                tags=tags_,
                client=client_,  # type: ignore
                attachments=self.attachments or {},  # type: ignore
            )
        if enabled is True:
            self.new_run.post()
        if enabled:
            _context._TAGS.set(tags_)
            _context._METADATA.set(metadata)
            _context._PARENT_RUN_TREE.set(self.new_run)
            _context._PROJECT_NAME.set(project_name_)
            _context._CLIENT.set(client_)
        return self.new_run

    def _teardown(
        self,
        exc_type: Optional[type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Clean up the tracing context and finalize the run.

        This method handles exceptions, ends the run if necessary,
        patches the run if it's not disabled, and resets the tracing context.

        Args:
            exc_type: The type of the exception that occurred, if any.
            exc_value: The exception instance that occurred, if any.
            traceback: The traceback object associated with the exception, if any.
        """
        if self.new_run is None:
            return
        if exc_type is not None:
            if self.exceptions_to_handle and issubclass(
                exc_type, self.exceptions_to_handle
            ):
                tb = None
            else:
                tb = utils._format_exc()
                tb = f"{exc_type.__name__}: {exc_value}\n\n{tb}"
            self.new_run.end(error=tb)
        if self.old_ctx is not None:
            enabled = utils.tracing_is_enabled(self.old_ctx)
            if enabled is True and self._end_on_exit:
                self.new_run.patch()
            _set_tracing_context(self.old_ctx)
        else:
            warnings.warn("Tracing context was not set up properly.", RuntimeWarning)

    def __enter__(self) -> run_trees.RunTree:
        """Enter the context manager synchronously.

        Returns:
            run_trees.RunTree: The newly created run.
        """
        return self._setup()

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        """Exit the context manager synchronously.

        Args:
            exc_type: The type of the exception that occurred, if any.
            exc_value: The exception instance that occurred, if any.
            traceback: The traceback object associated with the exception, if any.
        """
        self._teardown(exc_type, exc_value, traceback)

    async def __aenter__(self) -> run_trees.RunTree:
        """Enter the context manager asynchronously.

        Returns:
            run_trees.RunTree: The newly created run.
        """
        ctx = copy_context()
        result = await aitertools.aio_to_thread(self._setup, __ctx=ctx)
        # Set the context for the current thread
        _set_tracing_context(get_tracing_context(ctx))
        return result

    async def __aexit__(
        self,
        exc_type: Optional[type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        """Exit the context manager asynchronously.

        Args:
            exc_type: The type of the exception that occurred, if any.
            exc_value: The exception instance that occurred, if any.
            traceback: The traceback object associated with the exception, if any.
        """
        ctx = copy_context()
        if exc_type is not None:
            await asyncio.shield(
                aitertools.aio_to_thread(
                    self._teardown, exc_type, exc_value, traceback, __ctx=ctx
                )
            )
        else:
            await aitertools.aio_to_thread(
                self._teardown, exc_type, exc_value, traceback, __ctx=ctx
            )
        _set_tracing_context(get_tracing_context(ctx))


def _get_project_name(project_name: Optional[str]) -> Optional[str]:
    if project_name:
        return project_name
    prt = _context._PARENT_RUN_TREE.get()
    return (
        # Maintain tree consistency first
        _context._PROJECT_NAME.get()
        or (prt.session_name if prt else None)
        # Global fallback configured via ls.configure(...)
        or _context._GLOBAL_PROJECT_NAME
        # fallback to the default for the environment
        or utils.get_tracer_project()
    )


def as_runnable(traceable_fn: Callable) -> Runnable:
    """Convert a function wrapped by the LangSmith `@traceable` decorator to a `Runnable`.

    Args:
        traceable_fn: The function wrapped by the `@traceable` decorator.

    Returns:
        Runnable: A `Runnable` object that maintains a consistent LangSmith
            tracing context.

    Raises:
        ImportError: If `langchain-core` is not installed.
        ValueError: If the provided function is not wrapped by the `@traceable` decorator.

    Example:
        >>> @traceable
        ... def my_function(input_data):
        ...     # Function implementation
        ...     pass
        >>> runnable = as_runnable(my_function)
    """
    try:
        from langchain_core.runnables import RunnableConfig, RunnableLambda
        from langchain_core.runnables.utils import Input, Output
    except ImportError as e:
        raise ImportError(
            "as_runnable requires langchain-core to be installed. "
            "You can install it with `pip install langchain-core`."
        ) from e
    if not is_traceable_function(traceable_fn):
        try:
            fn_src = inspect.getsource(traceable_fn)
        except Exception:
            fn_src = "<source unavailable>"
        raise ValueError(
            f"as_runnable expects a function wrapped by the LangSmith"
            f" @traceable decorator. Got {traceable_fn} defined as:\n{fn_src}"
        )

    class RunnableTraceable(RunnableLambda):
        """Converts a `@traceable` decorated function to a `Runnable`.

        This helps maintain a consistent LangSmith tracing context.
        """

        def __init__(
            self,
            func: Callable,
            afunc: Optional[Callable[..., Awaitable[Output]]] = None,
        ) -> None:
            wrapped: Optional[Callable[[Input], Output]] = None
            awrapped = self._wrap_async(afunc)
            if is_async(func):
                if awrapped is not None:
                    raise TypeError(
                        "Func was provided as a coroutine function, but afunc was "
                        "also provided. If providing both, func should be a regular "
                        "function to avoid ambiguity."
                    )
                wrapped = cast(Callable[[Input], Output], self._wrap_async(func))
            elif is_traceable_function(func):
                wrapped = cast(Callable[[Input], Output], self._wrap_sync(func))
            if wrapped is None:
                raise ValueError(
                    f"{self.__class__.__name__} expects a function wrapped by"
                    " the LangSmith"
                    f" @traceable decorator. Got {func}"
                )
            super().__init__(
                wrapped,
                cast(
                    Optional[Callable[[Input], Awaitable[Output]]],
                    awrapped,
                ),
            )

        @staticmethod
        def _wrap_sync(
            func: Callable[..., Output],
        ) -> Callable[[Input, RunnableConfig], Output]:
            """Wrap a sync traceable function to accept ``(inputs, config)``."""

            def wrap_traceable(inputs: dict, config: RunnableConfig) -> Any:
                run_tree = run_trees.RunTree.from_runnable_config(cast(dict, config))
                return func(**inputs, langsmith_extra={"run_tree": run_tree})

            return cast(Callable[[Input, RunnableConfig], Output], wrap_traceable)

        @staticmethod
        def _wrap_async(
            afunc: Optional[Callable[..., Awaitable[Output]]],
        ) -> Optional[Callable[[Input, RunnableConfig], Awaitable[Output]]]:
            """Wrap an async traceable function to accept ``(inputs, config)``."""
            if afunc is None:
                return None
            if not is_traceable_function(afunc):
                raise ValueError(
                    "RunnableTraceable expects a function wrapped by the LangSmith"
                    f" @traceable decorator. Got {afunc}"
                )
            afunc_ = cast(Callable[..., Awaitable[Output]], afunc)

            async def awrap_traceable(inputs: dict, config: RunnableConfig) -> Any:
                run_tree = run_trees.RunTree.from_runnable_config(cast(dict, config))
                return await afunc_(**inputs, langsmith_extra={"run_tree": run_tree})

            return cast(
                Callable[[Input, RunnableConfig], Awaitable[Output]], awrap_traceable
            )

    return RunnableTraceable(traceable_fn)


## Private Methods and Objects

_VALID_RUN_TYPES = {
    "tool",
    "chain",
    "llm",
    "retriever",
    "embedding",
    "prompt",
    "parser",
}


class _TraceableContainer(TypedDict, total=False):
    """Typed container returned when initializing a run for a traceable."""

    new_run: Optional[run_trees.RunTree]
    project_name: Optional[str]
    outer_project: Optional[str]
    outer_metadata: Optional[dict[str, Any]]
    outer_tags: Optional[list[str]]
    _on_success: Optional[Callable[[run_trees.RunTree], Any]]
    on_end: Optional[Callable[[run_trees.RunTree], Any]]
    context: contextvars.Context
    _token_event_logged: Optional[bool]


class _ContainerInput(TypedDict, total=False):
    """Typed configuration captured when decorating a function with `@traceable`."""

    extra_outer: Optional[dict]
    name: Optional[str]
    metadata: Optional[dict[str, Any]]
    tags: Optional[list[str]]
    client: Optional[ls_client.Client]
    reduce_fn: Optional[Callable]
    project_name: Optional[str]
    run_type: ls_client.RUN_TYPE_T
    process_inputs: Optional[Callable[[dict], dict]]
    process_chunk: Optional[Callable]
    invocation_params_fn: Optional[Callable[[dict], dict]]
    dangerously_allow_filesystem: Optional[bool]


def _container_end(
    container: _TraceableContainer,
    outputs: Optional[Any] = None,
    error: Optional[BaseException] = None,
) -> None:
    """End the run."""
    run_tree = container.get("new_run")
    if run_tree is None:
        # Tracing not enabled
        return
    if isinstance(outputs, dict):
        dict_outputs = outputs
    elif (
        outputs is not None
        and hasattr(outputs, "model_dump")
        and callable(outputs.model_dump)
        and not isinstance(outputs, type)
    ):
        try:
            dict_outputs = outputs.model_dump(exclude_none=True, mode="json")
        except Exception as e:
            LOGGER.debug(
                f"Failed to use model_dump to serialize {type(outputs)} to JSON: {e}"
            )
            dict_outputs = {"output": outputs}
    else:
        dict_outputs = {"output": outputs}
    if (usage := _extract_usage(run_tree=run_tree, outputs=dict_outputs)) is not None:
        run_tree.metadata["usage_metadata"] = usage
    if error:
        stacktrace = utils._format_exc()
        error_repr = f"{repr(error)}\n\n{stacktrace}"
    else:
        error_repr = None
    if (_on_success := container.get("_on_success")) and callable(_on_success):
        try:
            _on_success(run_tree)
        except BaseException as e:
            warnings.warn(f"Failed to run _on_success function: {e}")
    run_tree.end(outputs=dict_outputs, error=error_repr)
    if utils.tracing_is_enabled() is True:
        run_tree.patch()
    if (on_end := container.get("on_end")) and callable(on_end):
        try:
            on_end(run_tree)
        except BaseException as e:
            warnings.warn(f"Failed to run on_end function: {e}")


def _collect_extra(extra_outer: dict, langsmith_extra: LangSmithExtra) -> dict:
    run_extra = langsmith_extra.get("run_extra", None)
    if run_extra:
        extra_inner = {**extra_outer, **run_extra}
    else:
        extra_inner = extra_outer
    return extra_inner
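
# Note on the function below: parent-run resolution proceeds roughly in this
# order — an explicit `parent` (a RunTree, a headers mapping, or a dotted-order
# string), then a legacy `run_tree`, then the contextvar parent, with a
# RunnableConfig-derived run used to break ties when langchain-core is installed.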


def _get_parent_run(
    langsmith_extra: LangSmithExtra,
    config: Optional[dict] = None,
) -> Optional[run_trees.RunTree]:
    parent = langsmith_extra.get("parent")
    if parent == "ignore":
        return None
    if isinstance(parent, run_trees.RunTree):
        return parent
    if isinstance(parent, Mapping):
        return run_trees.RunTree.from_headers(
            parent,
            client=langsmith_extra.get("client"),
            # Precedence: headers -> cvar -> explicit -> env var
            project_name=_get_project_name(langsmith_extra.get("project_name")),
        )
    if isinstance(parent, str):
        dort = run_trees.RunTree.from_dotted_order(
            parent,
            client=langsmith_extra.get("client"),
            # Precedence: cvar -> explicit -> env var
            project_name=_get_project_name(langsmith_extra.get("project_name")),
        )
        return dort
    run_tree = langsmith_extra.get("run_tree")
    if run_tree:
        return run_tree
    crt = get_current_run_tree()
    if _runtime_env.get_langchain_core_version() is not None:
        if rt := run_trees.RunTree.from_runnable_config(
            config, client=langsmith_extra.get("client")
        ):
            # Still need to break ties when alternating between traceable and
            # LangChain code.
            # Nesting: LC -> LS -> LS, we want to still use LS as the parent
            # Otherwise would look like LC -> {LS, LS} (siblings)
            if (
                not crt  # Simple LC -> LS
                # Let user override if manually passed in or invoked in a
                # RunnableSequence. This is a naive check.
                or (config is not None and config.get("callbacks"))
                # If the LangChain dotted order is more nested than the LangSmith
                # dotted order, use the LangChain run as the parent.
                # Note that this condition shouldn't be triggered in later
                # versions of core, since we also update the run_tree context
                # vars when updating the RunnableConfig context var.
                or rt.dotted_order > crt.dotted_order
            ):
                return rt
    return crt


def _setup_run(
    func: Callable,
    container_input: _ContainerInput,
    langsmith_extra: Optional[LangSmithExtra] = None,
    args: Any = None,
    kwargs: Any = None,
) -> _TraceableContainer:
    """Create a new run or create_child() if run is passed in kwargs."""
    extra_outer = container_input.get("extra_outer") or {}
    metadata = container_input.get("metadata")
    tags = container_input.get("tags")
    client = container_input.get("client")
    run_type = container_input.get("run_type") or "chain"
    dangerously_allow_filesystem = container_input.get(
        "dangerously_allow_filesystem", False
    )
    outer_project = _context._PROJECT_NAME.get()
    langsmith_extra = langsmith_extra or LangSmithExtra()
    name = langsmith_extra.get("name") or container_input.get("name")
    client_ = langsmith_extra.get("client", client) or _context._CLIENT.get()
    parent_run_ = _get_parent_run(
        {**langsmith_extra, "client": client_}, kwargs.get("config")
    )
    project_cv = _context._PROJECT_NAME.get()
    selected_project = (
        project_cv  # From parent trace
        or (
            parent_run_.session_name if parent_run_ else None
        )  # from parent run attempt 2 (not managed by traceable)
        or langsmith_extra.get("project_name")  # at invocation time
        or container_input["project_name"]  # at decorator time
        or _context._GLOBAL_PROJECT_NAME  # global fallback from ls.configure
        or utils.get_tracer_project()  # default
    )
    reference_example_id = langsmith_extra.get("reference_example_id")
    id_ = langsmith_extra.get("run_id")
    if not parent_run_ and not utils.tracing_is_enabled():
        utils.log_once(
            logging.DEBUG,
            "LangSmith tracing is not enabled, returning original function.",
        )
        return _TraceableContainer(
            new_run=None,
            project_name=selected_project,
            outer_project=outer_project,
            outer_metadata=None,
            outer_tags=None,
            _on_success=langsmith_extra.get("_on_success"),
            on_end=langsmith_extra.get("on_end"),
            context=copy_context(),
            _token_event_logged=False,
        )
    signature = inspect.signature(func)
    name_ = name or utils._get_function_name(func)
    extra_inner = _collect_extra(extra_outer, langsmith_extra)
    outer_metadata = _context._METADATA.get() or _context._GLOBAL_METADATA
    outer_tags = _context._TAGS.get() or _context._GLOBAL_TAGS
    context = copy_context()
    metadata_ = {
        **(langsmith_extra.get("metadata") or {}),
        **(outer_metadata or {}),
    }
    context.run(_context._METADATA.set, metadata_)
    metadata_.update(metadata or {})
    metadata_["ls_method"] = "traceable"
    extra_inner["metadata"] = metadata_
    inputs, attachments = _get_inputs_and_attachments_safe(
        signature, *args, func=func, **kwargs
    )
    invocation_params_fn = container_input.get("invocation_params_fn")
    if invocation_params_fn:
        try:
            invocation_params = {
                k: v for k, v in invocation_params_fn(inputs).items() if v is not None
            }
            if invocation_params and isinstance(invocation_params, dict):
                metadata_.update(invocation_params)
        except BaseException as e:
            LOGGER.error(f"Failed to infer invocation params for {name_}: {e}")
    process_inputs = container_input.get("process_inputs")
    if process_inputs:
        try:
            inputs = process_inputs(inputs)
        except BaseException as e:
            LOGGER.error(f"Failed to filter inputs for {name_}: {e}")
    tags_ = (langsmith_extra.get("tags") or []) + (outer_tags or [])
    context.run(_context._TAGS.set, tags_)
    tags_ += tags or []
    if parent_run_ is not None:
        new_run = parent_run_.create_child(
            name=name_,
            run_type=run_type,
            inputs=inputs,
            tags=tags_,
            extra=extra_inner,
            run_id=id_,
            attachments=attachments,
        )
    else:
        # Create RunTree kwargs conditionally to let RunTree generate id from start_time
        run_tree_kwargs = {
            "name": name_,
            "inputs": inputs,
            "run_type": run_type,
            "reference_example_id": ls_client._ensure_uuid(
                reference_example_id, accept_null=True
            ),
            "project_name": selected_project,
            "replicas": run_trees._REPLICAS.get(),
            "extra": extra_inner,
            "tags": tags_,
            "client": client_,
            "attachments": attachments,
            "dangerously_allow_filesystem": dangerously_allow_filesystem,
        }
        # Only pass id if user explicitly provided one
        if id_ is not None:
            run_tree_kwargs["id"] = ls_client._ensure_uuid(id_)
        new_run = run_trees.RunTree(**cast(Any, run_tree_kwargs))
    if utils.tracing_is_enabled() is True:
        try:
            new_run.post()
        except BaseException as e:
            LOGGER.error(f"Failed to post run {new_run.id}: {e}")
    response_container = _TraceableContainer(
        new_run=new_run,
        project_name=selected_project,
        outer_project=outer_project,
        outer_metadata=outer_metadata,
        outer_tags=outer_tags,
        on_end=langsmith_extra.get("on_end"),
        _on_success=langsmith_extra.get("_on_success"),
        context=context,
        _token_event_logged=False,
    )
    context.run(_context._PROJECT_NAME.set, response_container["project_name"])
    context.run(_PARENT_RUN_TREE.set, response_container["new_run"])
    return response_container
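

# Illustrative sketch (not part of this module): `_setup_run` is invoked by the
# `@traceable` wrapper before the user function runs. When tracing is disabled
# and there is no parent, the container comes back with `new_run=None` and the
# wrapper simply calls the original function; otherwise a run is created (and
# posted) under the selected project. Assuming a hypothetical decorated function:
#
#   >>> @traceable(run_type="chain", project_name="my-project")
#   ... def my_fn(x: int) -> int:
#   ...     return x + 1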


def _handle_container_end(
    container: _TraceableContainer,
    outputs: Optional[Any] = None,
    error: Optional[BaseException] = None,
    outputs_processor: Optional[Callable[..., dict]] = None,
) -> None:
    """Handle the end of a run."""
    try:
        if outputs_processor is not None:
            outputs = outputs_processor(outputs)
        _container_end(container, outputs=outputs, error=error)
    except BaseException as e:
        LOGGER.warning(f"Unable to process trace outputs: {repr(e)}")


def _is_traceable_function(func: Any) -> bool:
    return getattr(func, "__langsmith_traceable__", False)


def _get_inputs(
    signature: inspect.Signature, *args: Any, **kwargs: Any
) -> dict[str, Any]:
    """Return a dictionary of inputs from the function signature."""
    bound = signature.bind_partial(*args, **kwargs)
    bound.apply_defaults()
    arguments = dict(bound.arguments)
    arguments.pop("self", None)
    arguments.pop("cls", None)
    for param_name, param in signature.parameters.items():
        if param.kind == inspect.Parameter.VAR_KEYWORD:
            # Update with the **kwargs, and remove the original entry
            # This is to help flatten out keyword arguments
            if param_name in arguments:
                arguments.update(arguments[param_name])
                arguments.pop(param_name)
    return arguments
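

# Illustrative sketch (not part of this module): `_get_inputs` flattens **kwargs
# into the top-level input dict. Assuming a hypothetical signature:
#
#   >>> def f(a, *, b=2, **extra): ...
#   >>> _get_inputs(inspect.signature(f), 1, b=3, c=4)
#   {'a': 1, 'b': 3, 'c': 4}
#
# i.e. the VAR_KEYWORD entry (`extra`) is merged into the result and removed
# rather than kept as a nested mapping.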


def _get_inputs_safe(
    signature: inspect.Signature, *args: Any, **kwargs: Any
) -> dict[str, Any]:
    try:
        return _get_inputs(signature, *args, **kwargs)
    except BaseException as e:
        LOGGER.debug(f"Failed to get inputs for {signature}: {e}")
        return {"args": args, "kwargs": kwargs}


def _is_attachment(param: inspect.Parameter, func: Optional[Callable] = None) -> bool:
    if param.annotation == schemas.Attachment or (
        get_origin(param.annotation) == Annotated
        and any(arg == schemas.Attachment for arg in get_args(param.annotation))
    ):
        return True
    # try resolving stringified annotations
    if func is not None and isinstance(param.annotation, str):
        try:
            # include_extras=True preserves annotated metadata
            type_hints = get_type_hints(func, include_extras=True)
            resolved_annotation = type_hints.get(param.name)
            if resolved_annotation is not None:
                return resolved_annotation == schemas.Attachment or (
                    get_origin(resolved_annotation) == Annotated
                    and any(
                        arg == schemas.Attachment
                        for arg in get_args(resolved_annotation)
                    )
                )
        except (NameError, TypeError, AttributeError):
            pass
    return False


def _attachment_args_helper(
    signature: inspect.Signature, func: Optional[Callable] = None
) -> set[str]:
    return {
        name
        for name, param in signature.parameters.items()
        if _is_attachment(param, func)
    }


@functools.lru_cache(maxsize=1000)
def _cached_attachment_args(
    signature: inspect.Signature, func: Optional[Callable] = None
) -> set[str]:
    return _attachment_args_helper(signature, func)


def _attachment_args(
    signature: inspect.Signature, func: Optional[Callable] = None
) -> set[str]:
    # Caching signatures fails if there are unhashable default values.
    try:
        return _cached_attachment_args(signature, func)
    except TypeError:
        return _attachment_args_helper(signature, func)


def _get_inputs_and_attachments_safe(
    signature: inspect.Signature,
    *args: Any,
    func: Optional[Callable] = None,
    **kwargs: Any,
) -> tuple[dict, schemas.Attachments]:
    try:
        inferred = _get_inputs(signature, *args, **kwargs)
        attachment_args = _attachment_args(signature, func)
        if attachment_args:
            inputs, attachments = {}, {}
            for k, v in inferred.items():
                if k in attachment_args:
                    attachments[k] = v
                else:
                    inputs[k] = v
            return inputs, attachments
        return inferred, {}
    except BaseException as e:
        LOGGER.warning(f"Failed to get inputs for {signature}: {e}")
        return {"args": args, "kwargs": kwargs}, {}
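

# Illustrative sketch (not part of this module): parameters annotated with
# `schemas.Attachment` (directly or via `Annotated[...]`) are routed into the
# attachments dict instead of the traced inputs. Assuming a hypothetical
# signature, where `img` stands in for an attachment value:
#
#   >>> def describe(image: schemas.Attachment, prompt: str): ...
#   >>> sig = inspect.signature(describe)
#   >>> _get_inputs_and_attachments_safe(sig, img, func=describe, prompt="hi")
#   ({'prompt': 'hi'}, {'image': img})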


def _set_tracing_context(context: Optional[dict[str, Any]] = None):
    """Set the tracing context."""
    if context is None:
        for k, v in _CONTEXT_KEYS.items():
            v.set(None)
        return
    for k, v in context.items():
        var = _CONTEXT_KEYS[k]
        var.set(v)
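

# Illustrative sketch (not part of this module): `_set_tracing_context` resets
# or restores the contextvars registered in `_CONTEXT_KEYS` (defined earlier in
# this file). Passing ``None`` clears every key, while a dict only touches the
# keys it names (the key name below is assumed for illustration):
#
#   >>> _set_tracing_context(None)                  # clear all tracing context
#   >>> _set_tracing_context({"tags": ["retry"]})   # restore a single key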


def _process_iterator(
    generator: Iterator[T],
    run_container: _TraceableContainer,
    is_llm_run: bool,
    # Results is mutated
    results: list[Any],
    process_chunk: Optional[Callable],
    otel_context_manager: Optional[Any] = None,
) -> Generator[T, None, Any]:
    try:
        while True:
            if otel_context_manager:
                # Create a fresh context manager for each iteration
                def next_with_otel_context():
                    # Get the run_tree from run_container to create a fresh context
                    run_tree = run_container.get("new_run")
                    if run_tree:
                        fresh_otel_context = _maybe_create_otel_context(run_tree)
                        if fresh_otel_context:
                            with fresh_otel_context:
                                return next(generator)
                    return next(generator)

                item: T = run_container["context"].run(next_with_otel_context)  # type: ignore[arg-type]
            else:
                item = run_container["context"].run(next, generator)  # type: ignore[arg-type]
            if process_chunk:
                traced_item = process_chunk(item)
            else:
                traced_item = item
            if (
                is_llm_run
                and run_container["new_run"]
                and not run_container.get("_token_event_logged")
            ):
                run_container["new_run"].add_event(
                    {
                        "name": "new_token",
                        "time": datetime.datetime.now(
                            datetime.timezone.utc
                        ).isoformat(),
                        "kwargs": {"token": traced_item},
                    }
                )
                run_container["_token_event_logged"] = True
            results.append(traced_item)
            yield item
    except StopIteration as e:
        return e.value
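

# Illustrative sketch (not part of this module): `_process_iterator` re-yields a
# wrapped generator inside the run's copied Context, appending each (optionally
# processed) chunk to `results` and, for llm runs, logging a single "new_token"
# event on the first chunk. Assuming `container` came from `_setup_run` and
# `my_generator` is an assumed name:
#
#   >>> chunks: list = []
#   >>> gen = _process_iterator(my_generator(), container, True, chunks, None)
#   >>> list(gen)   # drains the stream; `chunks` now mirrors the yielded items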


async def _process_async_iterator(
    generator: AsyncIterator[T],
    run_container: _TraceableContainer,
    *,
    is_llm_run: bool,
    accepts_context: bool,
    results: list[Any],
    process_chunk: Optional[Callable],
    otel_context_manager: Optional[Any] = None,
) -> AsyncGenerator[T, None]:
    try:
        while True:
            if otel_context_manager:
                # Create a fresh context manager for each iteration
                async def anext_with_otel_context():
                    # Get the run_tree from run_container to create a fresh context
                    run_tree = run_container.get("new_run")
                    if run_tree:
                        fresh_otel_context = _maybe_create_otel_context(run_tree)
                        if fresh_otel_context:
                            with fresh_otel_context:
                                return await aitertools.py_anext(generator)
                    return await aitertools.py_anext(generator)

                if accepts_context:
                    item = await asyncio.create_task(  # type: ignore[call-arg, var-annotated]
                        anext_with_otel_context(),
                        context=run_container["context"],
                    )
                else:
                    # Python < 3.11
                    with tracing_context(
                        **get_tracing_context(run_container["context"])
                    ):
                        item = await anext_with_otel_context()
            else:
                if accepts_context:
                    item = await asyncio.create_task(  # type: ignore[call-arg, var-annotated]
                        aitertools.py_anext(generator),  # type: ignore[arg-type]
                        context=run_container["context"],
                    )
                else:
                    # Python < 3.11
                    with tracing_context(
                        **get_tracing_context(run_container["context"])
                    ):
                        item = await aitertools.py_anext(generator)
            if process_chunk:
                traced_item = process_chunk(item)
            else:
                traced_item = item
            if (
                is_llm_run
                and run_container["new_run"]
                and not run_container.get("_token_event_logged")
            ):
                run_container["new_run"].add_event(
                    {
                        "name": "new_token",
                        "time": datetime.datetime.now(
                            datetime.timezone.utc
                        ).isoformat(),
                        "kwargs": {"token": traced_item},
                    }
                )
                run_container["_token_event_logged"] = True
            results.append(traced_item)
            yield item
    except StopAsyncIteration:
        pass


T = TypeVar("T")


class _TracedStreamBase(Generic[T]):
    """Base class for traced stream objects."""

    def __init__(
        self,
        stream: Union[Iterator[T], AsyncIterator[T]],
        trace_container: _TraceableContainer,
        reduce_fn: Optional[Callable] = None,
    ):
        self.__ls_stream__ = stream
        self.__ls_trace_container__ = trace_container
        self.__ls_completed__ = False
        self.__ls_reduce_fn__ = reduce_fn
        self.__ls_accumulated_output__: list[T] = []
        self.__is_llm_run__ = (
            trace_container["new_run"].run_type == "llm"
            if trace_container["new_run"]
            else False
        )

    def __getattr__(self, name: str):
        return getattr(self.__ls_stream__, name)

    def __dir__(self):
        return list(set(dir(self.__class__) + dir(self.__ls_stream__)))

    def __repr__(self):
        return f"Traceable({self.__ls_stream__!r})"

    def __str__(self):
        return str(self.__ls_stream__)

    def __del__(self):
        try:
            if not self.__ls_completed__:
                self._end_trace()
        except BaseException:
            pass
        try:
            self.__ls_stream__.__del__()
        except BaseException:
            pass

    def _end_trace(self, error: Optional[BaseException] = None):
        if self.__ls_completed__:
            return
        try:
            if self.__ls_reduce_fn__:
                reduced_output = self.__ls_reduce_fn__(self.__ls_accumulated_output__)
            else:
                reduced_output = self.__ls_accumulated_output__
            _container_end(
                self.__ls_trace_container__,
                outputs=reduced_output,
                error=error,
            )
        finally:
            self.__ls_completed__ = True


class _TracedStream(_TracedStreamBase, Generic[T]):
    """A wrapper for synchronous stream objects that handles tracing."""

    def __init__(
        self,
        stream: Iterator[T],
        trace_container: _TraceableContainer,
        reduce_fn: Optional[Callable] = None,
        process_chunk: Optional[Callable] = None,
    ):
        super().__init__(
            stream=stream,
            trace_container=trace_container,
            reduce_fn=reduce_fn,
        )
        self.__ls_stream__ = stream
        self.__ls__gen__ = _process_iterator(
            self.__ls_stream__,
            self.__ls_trace_container__,
            is_llm_run=self.__is_llm_run__,
            results=self.__ls_accumulated_output__,
            process_chunk=process_chunk,
        )

    def __next__(self) -> T:
        try:
            return next(self.__ls__gen__)
        except StopIteration:
            self._end_trace()
            raise

    def __iter__(self) -> Iterator[T]:
        try:
            yield from self.__ls__gen__
        except BaseException as e:
            _cleanup_traceback(e)
            self._end_trace(error=e)
            raise
        else:
            self._end_trace()

    def __enter__(self):
        self.__ls_stream__.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            return self.__ls_stream__.__exit__(exc_type, exc_val, exc_tb)
        finally:
            self._end_trace(error=exc_val if exc_type else None)
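

# Illustrative sketch (not part of this module): `_TracedStream` hands the
# wrapped function's iterator back to the caller while still ending the run when
# iteration completes, raises, exits a `with` block, or is garbage collected.
# Assuming `container` came from `_setup_run` and `raw` is a plain iterator of
# strings:
#
#   >>> traced = _TracedStream(raw, container, reduce_fn="".join)
#   >>> for chunk in traced:
#   ...     ...   # consume; the run's output is then "".join of the chunks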


class _TracedAsyncStream(_TracedStreamBase, Generic[T]):
    """A wrapper for asynchronous stream objects that handles tracing."""

    def __init__(
        self,
        stream: AsyncIterator[T],
        trace_container: _TraceableContainer,
        reduce_fn: Optional[Callable] = None,
        process_chunk: Optional[Callable] = None,
    ):
        super().__init__(
            stream=stream,
            trace_container=trace_container,
            reduce_fn=reduce_fn,
        )
        self.__ls_stream__ = stream
        self.__ls_gen = _process_async_iterator(
            generator=self.__ls_stream__,
            run_container=self.__ls_trace_container__,
            is_llm_run=self.__is_llm_run__,
            accepts_context=aitertools.asyncio_accepts_context(),
            results=self.__ls_accumulated_output__,
            process_chunk=process_chunk,
        )

    async def _aend_trace(self, error: Optional[BaseException] = None):
        ctx = copy_context()
        await asyncio.shield(
            aitertools.aio_to_thread(self._end_trace, error, __ctx=ctx)
        )
        _set_tracing_context(get_tracing_context(ctx))

    async def __anext__(self) -> T:
        try:
            return cast(T, await aitertools.py_anext(self.__ls_gen))
        except StopAsyncIteration:
            await self._aend_trace()
            raise

    async def __aiter__(self) -> AsyncIterator[T]:
        try:
            async for item in self.__ls_gen:
                yield item
        except BaseException:
            await self._aend_trace()
            raise
        else:
            await self._aend_trace()

    async def __aenter__(self):
        await self.__ls_stream__.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        try:
            return await self.__ls_stream__.__aexit__(exc_type, exc_val, exc_tb)
        finally:
            await self._aend_trace()


def _get_function_result(results: list, reduce_fn: Callable) -> Any:
    if results:
        if reduce_fn is not None:
            try:
                return reduce_fn(results)
            except BaseException as e:
                LOGGER.error(e)
                return results
        else:
            return results


def _cleanup_traceback(e: BaseException):
    tb_ = e.__traceback__
    if tb_:
        while tb_.tb_next is not None and tb_.tb_frame.f_code.co_filename.endswith(
            _EXCLUDED_FRAME_FNAME
        ):
            tb_ = tb_.tb_next
        e.__traceback__ = tb_


def _is_otel_available() -> bool:
    """Return whether OpenTelemetry is importable, caching the result."""
    global _OTEL_AVAILABLE
    if _OTEL_AVAILABLE is None:
        try:
            import opentelemetry.trace  # noqa: F401

            _OTEL_AVAILABLE = True
        except ImportError:
            _OTEL_AVAILABLE = False
    return _OTEL_AVAILABLE


def _maybe_create_otel_context(run_tree: Optional[run_trees.RunTree]):
    """Create OpenTelemetry context manager if OTEL is enabled and available.

    Args:
        run_tree: The current run tree.

    Returns:
        Context manager for use_span or None if not available.
    """
    if not run_tree or not utils.is_env_var_truish("OTEL_ENABLED"):
        return None
    if not _is_otel_available():
        return None
    from opentelemetry.trace import (  # type: ignore[import]
        NonRecordingSpan,
        SpanContext,
        TraceFlags,
        TraceState,
        use_span,
    )

    from langsmith._internal._otel_utils import (
        get_otel_span_id_from_uuid,
        get_otel_trace_id_from_uuid,
    )

    trace_id = get_otel_trace_id_from_uuid(run_tree.trace_id)
    span_id = get_otel_span_id_from_uuid(run_tree.id)
    span_context = SpanContext(
        trace_id=trace_id,
        span_id=span_id,
        is_remote=False,
        trace_flags=TraceFlags(TraceFlags.SAMPLED),
        trace_state=TraceState(),
    )
    non_recording_span = NonRecordingSpan(span_context)
    return use_span(non_recording_span)
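

# Illustrative sketch (not part of this module): when OTEL export is enabled,
# the run tree's trace_id and id are mapped onto an OpenTelemetry SpanContext so
# that spans emitted by downstream instrumentation attach to the same trace.
# Assuming `run_tree` is a live RunTree:
#
#   >>> ctx = _maybe_create_otel_context(run_tree)
#   >>> if ctx is not None:
#   ...     with ctx:
#   ...         call_downstream_instrumented_code()   # assumed name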


# For backwards compatibility
_PROJECT_NAME = _context._PROJECT_NAME
_TAGS = _context._TAGS
_METADATA = _context._METADATA
_TRACING_ENABLED = _context._TRACING_ENABLED
_CLIENT = _context._CLIENT
_PARENT_RUN_TREE = _context._PARENT_RUN_TREE