last_value.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. from __future__ import annotations
  2. from collections.abc import Sequence
  3. from typing import Any, Generic
  4. from typing_extensions import Self
  5. from langgraph._internal._typing import MISSING
  6. from langgraph.channels.base import BaseChannel, Value
  7. from langgraph.errors import (
  8. EmptyChannelError,
  9. ErrorCode,
  10. InvalidUpdateError,
  11. create_error_message,
  12. )
  13. __all__ = ("LastValue", "LastValueAfterFinish")
  14. class LastValue(Generic[Value], BaseChannel[Value, Value, Value]):
  15. """Stores the last value received, can receive at most one value per step."""
  16. __slots__ = ("value",)
  17. value: Value | Any
  18. def __init__(self, typ: Any, key: str = "") -> None:
  19. super().__init__(typ, key)
  20. self.value = MISSING
  21. def __eq__(self, value: object) -> bool:
  22. return isinstance(value, LastValue)
  23. @property
  24. def ValueType(self) -> type[Value]:
  25. """The type of the value stored in the channel."""
  26. return self.typ
  27. @property
  28. def UpdateType(self) -> type[Value]:
  29. """The type of the update received by the channel."""
  30. return self.typ
  31. def copy(self) -> Self:
  32. """Return a copy of the channel."""
  33. empty = self.__class__(self.typ, self.key)
  34. empty.value = self.value
  35. return empty
  36. def from_checkpoint(self, checkpoint: Value) -> Self:
  37. empty = self.__class__(self.typ, self.key)
  38. if checkpoint is not MISSING:
  39. empty.value = checkpoint
  40. return empty
  41. def update(self, values: Sequence[Value]) -> bool:
  42. if len(values) == 0:
  43. return False
  44. if len(values) != 1:
  45. msg = create_error_message(
  46. message=f"At key '{self.key}': Can receive only one value per step. Use an Annotated key to handle multiple values.",
  47. error_code=ErrorCode.INVALID_CONCURRENT_GRAPH_UPDATE,
  48. )
  49. raise InvalidUpdateError(msg)
  50. self.value = values[-1]
  51. return True
  52. def get(self) -> Value:
  53. if self.value is MISSING:
  54. raise EmptyChannelError()
  55. return self.value
  56. def is_available(self) -> bool:
  57. return self.value is not MISSING
  58. def checkpoint(self) -> Value:
  59. return self.value
  60. class LastValueAfterFinish(
  61. Generic[Value], BaseChannel[Value, Value, tuple[Value, bool]]
  62. ):
  63. """Stores the last value received, but only made available after finish().
  64. Once made available, clears the value."""
  65. __slots__ = ("value", "finished")
  66. value: Value | Any
  67. finished: bool
  68. def __init__(self, typ: Any, key: str = "") -> None:
  69. super().__init__(typ, key)
  70. self.value = MISSING
  71. self.finished = False
  72. def __eq__(self, value: object) -> bool:
  73. return isinstance(value, LastValueAfterFinish)
  74. @property
  75. def ValueType(self) -> type[Value]:
  76. """The type of the value stored in the channel."""
  77. return self.typ
  78. @property
  79. def UpdateType(self) -> type[Value]:
  80. """The type of the update received by the channel."""
  81. return self.typ
  82. def checkpoint(self) -> tuple[Value | Any, bool] | Any:
  83. if self.value is MISSING:
  84. return MISSING
  85. return (self.value, self.finished)
  86. def from_checkpoint(self, checkpoint: tuple[Value | Any, bool] | Any) -> Self:
  87. empty = self.__class__(self.typ)
  88. empty.key = self.key
  89. if checkpoint is not MISSING:
  90. empty.value, empty.finished = checkpoint
  91. return empty
  92. def update(self, values: Sequence[Value | Any]) -> bool:
  93. if len(values) == 0:
  94. return False
  95. self.finished = False
  96. self.value = values[-1]
  97. return True
  98. def consume(self) -> bool:
  99. if self.finished:
  100. self.finished = False
  101. self.value = MISSING
  102. return True
  103. return False
  104. def finish(self) -> bool:
  105. if not self.finished and self.value is not MISSING:
  106. self.finished = True
  107. return True
  108. else:
  109. return False
  110. def get(self) -> Value:
  111. if self.value is MISSING or not self.finished:
  112. raise EmptyChannelError()
  113. return self.value
  114. def is_available(self) -> bool:
  115. return self.value is not MISSING and self.finished