| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- # File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details.
- from typing import Any, List, Generic, TypeVar, Optional, cast
- from typing_extensions import Protocol, override, runtime_checkable
- from ._base_client import BasePage, PageInfo, BaseSyncPage, BaseAsyncPage
- __all__ = [
- "SyncPage",
- "AsyncPage",
- "SyncCursorPage",
- "AsyncCursorPage",
- "SyncConversationCursorPage",
- "AsyncConversationCursorPage",
- ]
- _T = TypeVar("_T")
- @runtime_checkable
- class CursorPageItem(Protocol):
- id: Optional[str]
- class SyncPage(BaseSyncPage[_T], BasePage[_T], Generic[_T]):
- """Note: no pagination actually occurs yet, this is for forwards-compatibility."""
- data: List[_T]
- object: str
- @override
- def _get_page_items(self) -> List[_T]:
- data = self.data
- if not data:
- return []
- return data
- @override
- def next_page_info(self) -> None:
- """
- This page represents a response that isn't actually paginated at the API level
- so there will never be a next page.
- """
- return None
- class AsyncPage(BaseAsyncPage[_T], BasePage[_T], Generic[_T]):
- """Note: no pagination actually occurs yet, this is for forwards-compatibility."""
- data: List[_T]
- object: str
- @override
- def _get_page_items(self) -> List[_T]:
- data = self.data
- if not data:
- return []
- return data
- @override
- def next_page_info(self) -> None:
- """
- This page represents a response that isn't actually paginated at the API level
- so there will never be a next page.
- """
- return None
- class SyncCursorPage(BaseSyncPage[_T], BasePage[_T], Generic[_T]):
- data: List[_T]
- has_more: Optional[bool] = None
- @override
- def _get_page_items(self) -> List[_T]:
- data = self.data
- if not data:
- return []
- return data
- @override
- def has_next_page(self) -> bool:
- has_more = self.has_more
- if has_more is not None and has_more is False:
- return False
- return super().has_next_page()
- @override
- def next_page_info(self) -> Optional[PageInfo]:
- data = self.data
- if not data:
- return None
- item = cast(Any, data[-1])
- if not isinstance(item, CursorPageItem) or item.id is None:
- # TODO emit warning log
- return None
- return PageInfo(params={"after": item.id})
- class AsyncCursorPage(BaseAsyncPage[_T], BasePage[_T], Generic[_T]):
- data: List[_T]
- has_more: Optional[bool] = None
- @override
- def _get_page_items(self) -> List[_T]:
- data = self.data
- if not data:
- return []
- return data
- @override
- def has_next_page(self) -> bool:
- has_more = self.has_more
- if has_more is not None and has_more is False:
- return False
- return super().has_next_page()
- @override
- def next_page_info(self) -> Optional[PageInfo]:
- data = self.data
- if not data:
- return None
- item = cast(Any, data[-1])
- if not isinstance(item, CursorPageItem) or item.id is None:
- # TODO emit warning log
- return None
- return PageInfo(params={"after": item.id})
- class SyncConversationCursorPage(BaseSyncPage[_T], BasePage[_T], Generic[_T]):
- data: List[_T]
- has_more: Optional[bool] = None
- last_id: Optional[str] = None
- @override
- def _get_page_items(self) -> List[_T]:
- data = self.data
- if not data:
- return []
- return data
- @override
- def has_next_page(self) -> bool:
- has_more = self.has_more
- if has_more is not None and has_more is False:
- return False
- return super().has_next_page()
- @override
- def next_page_info(self) -> Optional[PageInfo]:
- last_id = self.last_id
- if not last_id:
- return None
- return PageInfo(params={"after": last_id})
- class AsyncConversationCursorPage(BaseAsyncPage[_T], BasePage[_T], Generic[_T]):
- data: List[_T]
- has_more: Optional[bool] = None
- last_id: Optional[str] = None
- @override
- def _get_page_items(self) -> List[_T]:
- data = self.data
- if not data:
- return []
- return data
- @override
- def has_next_page(self) -> bool:
- has_more = self.has_more
- if has_more is not None and has_more is False:
- return False
- return super().has_next_page()
- @override
- def next_page_info(self) -> Optional[PageInfo]:
- last_id = self.last_id
- if not last_id:
- return None
- return PageInfo(params={"after": last_id})
|