ephemeral_value.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. from __future__ import annotations
  2. from collections.abc import Sequence
  3. from typing import Any, Generic
  4. from typing_extensions import Self
  5. from langgraph._internal._typing import MISSING
  6. from langgraph.channels.base import BaseChannel, Value
  7. from langgraph.errors import EmptyChannelError, InvalidUpdateError
  8. __all__ = ("EphemeralValue",)
  9. class EphemeralValue(Generic[Value], BaseChannel[Value, Value, Value]):
  10. """Stores the value received in the step immediately preceding, clears after."""
  11. __slots__ = ("value", "guard")
  12. value: Value | Any
  13. guard: bool
  14. def __init__(self, typ: Any, guard: bool = True) -> None:
  15. super().__init__(typ)
  16. self.guard = guard
  17. self.value = MISSING
  18. def __eq__(self, value: object) -> bool:
  19. return isinstance(value, EphemeralValue) and value.guard == self.guard
  20. @property
  21. def ValueType(self) -> type[Value]:
  22. """The type of the value stored in the channel."""
  23. return self.typ
  24. @property
  25. def UpdateType(self) -> type[Value]:
  26. """The type of the update received by the channel."""
  27. return self.typ
  28. def copy(self) -> Self:
  29. """Return a copy of the channel."""
  30. empty = self.__class__(self.typ, self.guard)
  31. empty.key = self.key
  32. empty.value = self.value
  33. return empty
  34. def from_checkpoint(self, checkpoint: Value) -> Self:
  35. empty = self.__class__(self.typ, self.guard)
  36. empty.key = self.key
  37. if checkpoint is not MISSING:
  38. empty.value = checkpoint
  39. return empty
  40. def update(self, values: Sequence[Value]) -> bool:
  41. if len(values) == 0:
  42. if self.value is not MISSING:
  43. self.value = MISSING
  44. return True
  45. else:
  46. return False
  47. if len(values) != 1 and self.guard:
  48. raise InvalidUpdateError(
  49. f"At key '{self.key}': EphemeralValue(guard=True) can receive only one value per step. Use guard=False if you want to store any one of multiple values."
  50. )
  51. self.value = values[-1]
  52. return True
  53. def get(self) -> Value:
  54. if self.value is MISSING:
  55. raise EmptyChannelError()
  56. return self.value
  57. def is_available(self) -> bool:
  58. return self.value is not MISSING
  59. def checkpoint(self) -> Value:
  60. return self.value