base.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. from __future__ import annotations
  2. from abc import ABC, abstractmethod
  3. from collections.abc import Sequence
  4. from typing import Any, Generic, TypeVar
  5. from typing_extensions import Self
  6. from langgraph._internal._typing import MISSING
  7. from langgraph.errors import EmptyChannelError
  8. Value = TypeVar("Value")
  9. Update = TypeVar("Update")
  10. Checkpoint = TypeVar("Checkpoint")
  11. __all__ = ("BaseChannel",)
  12. class BaseChannel(Generic[Value, Update, Checkpoint], ABC):
  13. """Base class for all channels."""
  14. __slots__ = ("key", "typ")
  15. def __init__(self, typ: Any, key: str = "") -> None:
  16. self.typ = typ
  17. self.key = key
  18. @property
  19. @abstractmethod
  20. def ValueType(self) -> Any:
  21. """The type of the value stored in the channel."""
  22. @property
  23. @abstractmethod
  24. def UpdateType(self) -> Any:
  25. """The type of the update received by the channel."""
  26. # serialize/deserialize methods
  27. def copy(self) -> Self:
  28. """Return a copy of the channel.
  29. By default, delegates to `checkpoint()` and `from_checkpoint()`.
  30. Subclasses can override this method with a more efficient implementation.
  31. """
  32. return self.from_checkpoint(self.checkpoint())
  33. def checkpoint(self) -> Checkpoint | Any:
  34. """Return a serializable representation of the channel's current state.
  35. Raises `EmptyChannelError` if the channel is empty (never updated yet),
  36. or doesn't support checkpoints.
  37. """
  38. try:
  39. return self.get()
  40. except EmptyChannelError:
  41. return MISSING
  42. @abstractmethod
  43. def from_checkpoint(self, checkpoint: Checkpoint | Any) -> Self:
  44. """Return a new identical channel, optionally initialized from a checkpoint.
  45. If the checkpoint contains complex data structures, they should be copied.
  46. """
  47. # read methods
  48. @abstractmethod
  49. def get(self) -> Value:
  50. """Return the current value of the channel.
  51. Raises `EmptyChannelError` if the channel is empty (never updated yet)."""
  52. def is_available(self) -> bool:
  53. """Return `True` if the channel is available (not empty), `False` otherwise.
  54. Subclasses should override this method to provide a more efficient
  55. implementation than calling `get()` and catching `EmptyChannelError`.
  56. """
  57. try:
  58. self.get()
  59. return True
  60. except EmptyChannelError:
  61. return False
  62. # write methods
  63. @abstractmethod
  64. def update(self, values: Sequence[Update]) -> bool:
  65. """Update the channel's value with the given sequence of updates.
  66. The order of the updates in the sequence is arbitrary.
  67. This method is called by Pregel for all channels at the end of each step.
  68. If there are no updates, it is called with an empty sequence.
  69. Raises `InvalidUpdateError` if the sequence of updates is invalid.
  70. Returns `True` if the channel was updated, `False` otherwise."""
  71. def consume(self) -> bool:
  72. """Notify the channel that a subscribed task ran.
  73. By default, no-op.
  74. A channel can use this method to modify its state, preventing the value from being consumed again.
  75. Returns `True` if the channel was updated, `False` otherwise.
  76. """
  77. return False
  78. def finish(self) -> bool:
  79. """Notify the channel that the Pregel run is finishing.
  80. By default, no-op.
  81. A channel can use this method to modify its state, preventing finish.
  82. Returns `True` if the channel was updated, `False` otherwise.
  83. """
  84. return False