| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- from __future__ import annotations
- from abc import abstractmethod
- from collections.abc import AsyncIterator, Callable, Iterator, Sequence
- from typing import Any, Generic, cast
- from langchain_core.runnables import Runnable, RunnableConfig
- from langchain_core.runnables.graph import Graph as DrawableGraph
- from typing_extensions import Self
- from langgraph.types import All, Command, StateSnapshot, StateUpdate, StreamMode
- from langgraph.typing import ContextT, InputT, OutputT, StateT
- __all__ = ("PregelProtocol", "StreamProtocol")
- class PregelProtocol(Runnable[InputT, Any], Generic[StateT, ContextT, InputT, OutputT]):
- @abstractmethod
- def with_config(
- self, config: RunnableConfig | None = None, **kwargs: Any
- ) -> Self: ...
- @abstractmethod
- def get_graph(
- self,
- config: RunnableConfig | None = None,
- *,
- xray: int | bool = False,
- ) -> DrawableGraph: ...
- @abstractmethod
- async def aget_graph(
- self,
- config: RunnableConfig | None = None,
- *,
- xray: int | bool = False,
- ) -> DrawableGraph: ...
- @abstractmethod
- def get_state(
- self, config: RunnableConfig, *, subgraphs: bool = False
- ) -> StateSnapshot: ...
- @abstractmethod
- async def aget_state(
- self, config: RunnableConfig, *, subgraphs: bool = False
- ) -> StateSnapshot: ...
- @abstractmethod
- def get_state_history(
- self,
- config: RunnableConfig,
- *,
- filter: dict[str, Any] | None = None,
- before: RunnableConfig | None = None,
- limit: int | None = None,
- ) -> Iterator[StateSnapshot]: ...
- @abstractmethod
- def aget_state_history(
- self,
- config: RunnableConfig,
- *,
- filter: dict[str, Any] | None = None,
- before: RunnableConfig | None = None,
- limit: int | None = None,
- ) -> AsyncIterator[StateSnapshot]: ...
- @abstractmethod
- def bulk_update_state(
- self,
- config: RunnableConfig,
- updates: Sequence[Sequence[StateUpdate]],
- ) -> RunnableConfig: ...
- @abstractmethod
- async def abulk_update_state(
- self,
- config: RunnableConfig,
- updates: Sequence[Sequence[StateUpdate]],
- ) -> RunnableConfig: ...
- @abstractmethod
- def update_state(
- self,
- config: RunnableConfig,
- values: dict[str, Any] | Any | None,
- as_node: str | None = None,
- ) -> RunnableConfig: ...
- @abstractmethod
- async def aupdate_state(
- self,
- config: RunnableConfig,
- values: dict[str, Any] | Any | None,
- as_node: str | None = None,
- ) -> RunnableConfig: ...
- @abstractmethod
- def stream(
- self,
- input: InputT | Command | None,
- config: RunnableConfig | None = None,
- *,
- context: ContextT | None = None,
- stream_mode: StreamMode | list[StreamMode] | None = None,
- interrupt_before: All | Sequence[str] | None = None,
- interrupt_after: All | Sequence[str] | None = None,
- subgraphs: bool = False,
- ) -> Iterator[dict[str, Any] | Any]: ...
- @abstractmethod
- def astream(
- self,
- input: InputT | Command | None,
- config: RunnableConfig | None = None,
- *,
- context: ContextT | None = None,
- stream_mode: StreamMode | list[StreamMode] | None = None,
- interrupt_before: All | Sequence[str] | None = None,
- interrupt_after: All | Sequence[str] | None = None,
- subgraphs: bool = False,
- ) -> AsyncIterator[dict[str, Any] | Any]: ...
- @abstractmethod
- def invoke(
- self,
- input: InputT | Command | None,
- config: RunnableConfig | None = None,
- *,
- context: ContextT | None = None,
- interrupt_before: All | Sequence[str] | None = None,
- interrupt_after: All | Sequence[str] | None = None,
- ) -> dict[str, Any] | Any: ...
- @abstractmethod
- async def ainvoke(
- self,
- input: InputT | Command | None,
- config: RunnableConfig | None = None,
- *,
- context: ContextT | None = None,
- interrupt_before: All | Sequence[str] | None = None,
- interrupt_after: All | Sequence[str] | None = None,
- ) -> dict[str, Any] | Any: ...
- StreamChunk = tuple[tuple[str, ...], str, Any]
- class StreamProtocol:
- __slots__ = ("modes", "__call__")
- modes: set[StreamMode]
- __call__: Callable[[Self, StreamChunk], None]
- def __init__(
- self,
- __call__: Callable[[StreamChunk], None],
- modes: set[StreamMode],
- ) -> None:
- self.__call__ = cast(Callable[[Self, StreamChunk], None], __call__)
- self.modes = modes
|