config.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. import asyncio
  2. import sys
  3. from typing import Any
  4. from langchain_core.runnables import RunnableConfig
  5. from langchain_core.runnables.config import var_child_runnable_config
  6. from langgraph.store.base import BaseStore
  7. from langgraph._internal._constants import CONF, CONFIG_KEY_RUNTIME
  8. from langgraph.types import StreamWriter
  9. def _no_op_stream_writer(c: Any) -> None:
  10. pass
  11. def get_config() -> RunnableConfig:
  12. if sys.version_info < (3, 11):
  13. try:
  14. if asyncio.current_task():
  15. raise RuntimeError(
  16. "Python 3.11 or later required to use this in an async context"
  17. )
  18. except RuntimeError:
  19. pass
  20. if var_config := var_child_runnable_config.get():
  21. return var_config
  22. else:
  23. raise RuntimeError("Called get_config outside of a runnable context")
  24. def get_store() -> BaseStore:
  25. """Access LangGraph store from inside a graph node or entrypoint task at runtime.
  26. Can be called from inside any [`StateGraph`][langgraph.graph.StateGraph] node or
  27. functional API [`task`][langgraph.func.task], as long as the `StateGraph` or the [`entrypoint`][langgraph.func.entrypoint]
  28. was initialized with a store, e.g.:
  29. ```python
  30. # with StateGraph
  31. graph = (
  32. StateGraph(...)
  33. ...
  34. .compile(store=store)
  35. )
  36. # or with entrypoint
  37. @entrypoint(store=store)
  38. def workflow(inputs):
  39. ...
  40. ```
  41. !!! warning "Async with Python < 3.11"
  42. If you are using Python < 3.11 and are running LangGraph asynchronously,
  43. `get_store()` won't work since it uses [`contextvar`](https://docs.python.org/3/library/contextvars.html) propagation (only available in [Python >= 3.11](https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task)).
  44. Example: Using with `StateGraph`
  45. ```python
  46. from typing_extensions import TypedDict
  47. from langgraph.graph import StateGraph, START
  48. from langgraph.store.memory import InMemoryStore
  49. from langgraph.config import get_store
  50. store = InMemoryStore()
  51. store.put(("values",), "foo", {"bar": 2})
  52. class State(TypedDict):
  53. foo: int
  54. def my_node(state: State):
  55. my_store = get_store()
  56. stored_value = my_store.get(("values",), "foo").value["bar"]
  57. return {"foo": stored_value + 1}
  58. graph = (
  59. StateGraph(State)
  60. .add_node(my_node)
  61. .add_edge(START, "my_node")
  62. .compile(store=store)
  63. )
  64. graph.invoke({"foo": 1})
  65. ```
  66. ```pycon
  67. {"foo": 3}
  68. ```
  69. Example: Using with functional API
  70. ```python
  71. from langgraph.func import entrypoint, task
  72. from langgraph.store.memory import InMemoryStore
  73. from langgraph.config import get_store
  74. store = InMemoryStore()
  75. store.put(("values",), "foo", {"bar": 2})
  76. @task
  77. def my_task(value: int):
  78. my_store = get_store()
  79. stored_value = my_store.get(("values",), "foo").value["bar"]
  80. return stored_value + 1
  81. @entrypoint(store=store)
  82. def workflow(value: int):
  83. return my_task(value).result()
  84. workflow.invoke(1)
  85. ```
  86. ```pycon
  87. 3
  88. ```
  89. """
  90. return get_config()[CONF][CONFIG_KEY_RUNTIME].store
  91. def get_stream_writer() -> StreamWriter:
  92. """Access LangGraph [`StreamWriter`][langgraph.types.StreamWriter] from inside a graph node or entrypoint task at runtime.
  93. Can be called from inside any [`StateGraph`][langgraph.graph.StateGraph] node or
  94. functional API [`task`][langgraph.func.task].
  95. !!! warning "Async with Python < 3.11"
  96. If you are using Python < 3.11 and are running LangGraph asynchronously,
  97. `get_stream_writer()` won't work since it uses [`contextvar`](https://docs.python.org/3/library/contextvars.html) propagation (only available in [Python >= 3.11](https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task)).
  98. Example: Using with `StateGraph`
  99. ```python
  100. from typing_extensions import TypedDict
  101. from langgraph.graph import StateGraph, START
  102. from langgraph.config import get_stream_writer
  103. class State(TypedDict):
  104. foo: int
  105. def my_node(state: State):
  106. my_stream_writer = get_stream_writer()
  107. my_stream_writer({"custom_data": "Hello!"})
  108. return {"foo": state["foo"] + 1}
  109. graph = (
  110. StateGraph(State)
  111. .add_node(my_node)
  112. .add_edge(START, "my_node")
  113. .compile(store=store)
  114. )
  115. for chunk in graph.stream({"foo": 1}, stream_mode="custom"):
  116. print(chunk)
  117. ```
  118. ```pycon
  119. {"custom_data": "Hello!"}
  120. ```
  121. Example: Using with functional API
  122. ```python
  123. from langgraph.func import entrypoint, task
  124. from langgraph.config import get_stream_writer
  125. @task
  126. def my_task(value: int):
  127. my_stream_writer = get_stream_writer()
  128. my_stream_writer({"custom_data": "Hello!"})
  129. return value + 1
  130. @entrypoint(store=store)
  131. def workflow(value: int):
  132. return my_task(value).result()
  133. for chunk in workflow.stream(1, stream_mode="custom"):
  134. print(chunk)
  135. ```
  136. ```pycon
  137. {"custom_data": "Hello!"}
  138. ```
  139. """
  140. runtime = get_config()[CONF][CONFIG_KEY_RUNTIME]
  141. return runtime.stream_writer