| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- from __future__ import annotations
- from collections.abc import Sequence
- from typing import Any, Generic
- from typing_extensions import Self
- from langgraph._internal._typing import MISSING
- from langgraph.channels.base import BaseChannel, Value
- from langgraph.errors import (
- EmptyChannelError,
- ErrorCode,
- InvalidUpdateError,
- create_error_message,
- )
- __all__ = ("LastValue", "LastValueAfterFinish")
- class LastValue(Generic[Value], BaseChannel[Value, Value, Value]):
- """Stores the last value received, can receive at most one value per step."""
- __slots__ = ("value",)
- value: Value | Any
- def __init__(self, typ: Any, key: str = "") -> None:
- super().__init__(typ, key)
- self.value = MISSING
- def __eq__(self, value: object) -> bool:
- return isinstance(value, LastValue)
- @property
- def ValueType(self) -> type[Value]:
- """The type of the value stored in the channel."""
- return self.typ
- @property
- def UpdateType(self) -> type[Value]:
- """The type of the update received by the channel."""
- return self.typ
- def copy(self) -> Self:
- """Return a copy of the channel."""
- empty = self.__class__(self.typ, self.key)
- empty.value = self.value
- return empty
- def from_checkpoint(self, checkpoint: Value) -> Self:
- empty = self.__class__(self.typ, self.key)
- if checkpoint is not MISSING:
- empty.value = checkpoint
- return empty
- def update(self, values: Sequence[Value]) -> bool:
- if len(values) == 0:
- return False
- if len(values) != 1:
- msg = create_error_message(
- message=f"At key '{self.key}': Can receive only one value per step. Use an Annotated key to handle multiple values.",
- error_code=ErrorCode.INVALID_CONCURRENT_GRAPH_UPDATE,
- )
- raise InvalidUpdateError(msg)
- self.value = values[-1]
- return True
- def get(self) -> Value:
- if self.value is MISSING:
- raise EmptyChannelError()
- return self.value
- def is_available(self) -> bool:
- return self.value is not MISSING
- def checkpoint(self) -> Value:
- return self.value
- class LastValueAfterFinish(
- Generic[Value], BaseChannel[Value, Value, tuple[Value, bool]]
- ):
- """Stores the last value received, but only made available after finish().
- Once made available, clears the value."""
- __slots__ = ("value", "finished")
- value: Value | Any
- finished: bool
- def __init__(self, typ: Any, key: str = "") -> None:
- super().__init__(typ, key)
- self.value = MISSING
- self.finished = False
- def __eq__(self, value: object) -> bool:
- return isinstance(value, LastValueAfterFinish)
- @property
- def ValueType(self) -> type[Value]:
- """The type of the value stored in the channel."""
- return self.typ
- @property
- def UpdateType(self) -> type[Value]:
- """The type of the update received by the channel."""
- return self.typ
- def checkpoint(self) -> tuple[Value | Any, bool] | Any:
- if self.value is MISSING:
- return MISSING
- return (self.value, self.finished)
- def from_checkpoint(self, checkpoint: tuple[Value | Any, bool] | Any) -> Self:
- empty = self.__class__(self.typ)
- empty.key = self.key
- if checkpoint is not MISSING:
- empty.value, empty.finished = checkpoint
- return empty
- def update(self, values: Sequence[Value | Any]) -> bool:
- if len(values) == 0:
- return False
- self.finished = False
- self.value = values[-1]
- return True
- def consume(self) -> bool:
- if self.finished:
- self.finished = False
- self.value = MISSING
- return True
- return False
- def finish(self) -> bool:
- if not self.finished and self.value is not MISSING:
- self.finished = True
- return True
- else:
- return False
- def get(self) -> Value:
- if self.value is MISSING or not self.finished:
- raise EmptyChannelError()
- return self.value
- def is_available(self) -> bool:
- return self.value is not MISSING and self.finished
|