| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
- # File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details.
- from __future__ import annotations
- import hmac
- import json
- import time
- import base64
- import hashlib
- from typing import cast
- from .._types import HeadersLike
- from .._utils import get_required_header
- from .._models import construct_type
- from .._resource import SyncAPIResource, AsyncAPIResource
- from .._exceptions import InvalidWebhookSignatureError
- from ..types.webhooks.unwrap_webhook_event import UnwrapWebhookEvent
- __all__ = ["Webhooks", "AsyncWebhooks"]
- class Webhooks(SyncAPIResource):
- def unwrap(
- self,
- payload: str | bytes,
- headers: HeadersLike,
- *,
- secret: str | None = None,
- ) -> UnwrapWebhookEvent:
- """Validates that the given payload was sent by OpenAI and parses the payload."""
- if secret is None:
- secret = self._client.webhook_secret
- self.verify_signature(payload=payload, headers=headers, secret=secret)
- return cast(
- UnwrapWebhookEvent,
- construct_type(
- type_=UnwrapWebhookEvent,
- value=json.loads(payload),
- ),
- )
- def verify_signature(
- self,
- payload: str | bytes,
- headers: HeadersLike,
- *,
- secret: str | None = None,
- tolerance: int = 300,
- ) -> None:
- """Validates whether or not the webhook payload was sent by OpenAI.
- Args:
- payload: The webhook payload
- headers: The webhook headers
- secret: The webhook secret (optional, will use client secret if not provided)
- tolerance: Maximum age of the webhook in seconds (default: 300 = 5 minutes)
- """
- if secret is None:
- secret = self._client.webhook_secret
- if secret is None:
- raise ValueError(
- "The webhook secret must either be set using the env var, OPENAI_WEBHOOK_SECRET, "
- "on the client class, OpenAI(webhook_secret='123'), or passed to this function"
- )
- signature_header = get_required_header(headers, "webhook-signature")
- timestamp = get_required_header(headers, "webhook-timestamp")
- webhook_id = get_required_header(headers, "webhook-id")
- # Validate timestamp to prevent replay attacks
- try:
- timestamp_seconds = int(timestamp)
- except ValueError:
- raise InvalidWebhookSignatureError("Invalid webhook timestamp format") from None
- now = int(time.time())
- if now - timestamp_seconds > tolerance:
- raise InvalidWebhookSignatureError("Webhook timestamp is too old") from None
- if timestamp_seconds > now + tolerance:
- raise InvalidWebhookSignatureError("Webhook timestamp is too new") from None
- # Extract signatures from v1,<base64> format
- # The signature header can have multiple values, separated by spaces.
- # Each value is in the format v1,<base64>. We should accept if any match.
- signatures: list[str] = []
- for part in signature_header.split():
- if part.startswith("v1,"):
- signatures.append(part[3:])
- else:
- signatures.append(part)
- # Decode the secret if it starts with whsec_
- if secret.startswith("whsec_"):
- decoded_secret = base64.b64decode(secret[6:])
- else:
- decoded_secret = secret.encode()
- body = payload.decode("utf-8") if isinstance(payload, bytes) else payload
- # Prepare the signed payload (OpenAI uses webhookId.timestamp.payload format)
- signed_payload = f"{webhook_id}.{timestamp}.{body}"
- expected_signature = base64.b64encode(
- hmac.new(decoded_secret, signed_payload.encode(), hashlib.sha256).digest()
- ).decode()
- # Accept if any signature matches
- if not any(hmac.compare_digest(expected_signature, sig) for sig in signatures):
- raise InvalidWebhookSignatureError(
- "The given webhook signature does not match the expected signature"
- ) from None
- class AsyncWebhooks(AsyncAPIResource):
- def unwrap(
- self,
- payload: str | bytes,
- headers: HeadersLike,
- *,
- secret: str | None = None,
- ) -> UnwrapWebhookEvent:
- """Validates that the given payload was sent by OpenAI and parses the payload."""
- if secret is None:
- secret = self._client.webhook_secret
- self.verify_signature(payload=payload, headers=headers, secret=secret)
- body = payload.decode("utf-8") if isinstance(payload, bytes) else payload
- return cast(
- UnwrapWebhookEvent,
- construct_type(
- type_=UnwrapWebhookEvent,
- value=json.loads(body),
- ),
- )
- def verify_signature(
- self,
- payload: str | bytes,
- headers: HeadersLike,
- *,
- secret: str | None = None,
- tolerance: int = 300,
- ) -> None:
- """Validates whether or not the webhook payload was sent by OpenAI.
- Args:
- payload: The webhook payload
- headers: The webhook headers
- secret: The webhook secret (optional, will use client secret if not provided)
- tolerance: Maximum age of the webhook in seconds (default: 300 = 5 minutes)
- """
- if secret is None:
- secret = self._client.webhook_secret
- if secret is None:
- raise ValueError(
- "The webhook secret must either be set using the env var, OPENAI_WEBHOOK_SECRET, "
- "on the client class, OpenAI(webhook_secret='123'), or passed to this function"
- ) from None
- signature_header = get_required_header(headers, "webhook-signature")
- timestamp = get_required_header(headers, "webhook-timestamp")
- webhook_id = get_required_header(headers, "webhook-id")
- # Validate timestamp to prevent replay attacks
- try:
- timestamp_seconds = int(timestamp)
- except ValueError:
- raise InvalidWebhookSignatureError("Invalid webhook timestamp format") from None
- now = int(time.time())
- if now - timestamp_seconds > tolerance:
- raise InvalidWebhookSignatureError("Webhook timestamp is too old") from None
- if timestamp_seconds > now + tolerance:
- raise InvalidWebhookSignatureError("Webhook timestamp is too new") from None
- # Extract signatures from v1,<base64> format
- # The signature header can have multiple values, separated by spaces.
- # Each value is in the format v1,<base64>. We should accept if any match.
- signatures: list[str] = []
- for part in signature_header.split():
- if part.startswith("v1,"):
- signatures.append(part[3:])
- else:
- signatures.append(part)
- # Decode the secret if it starts with whsec_
- if secret.startswith("whsec_"):
- decoded_secret = base64.b64decode(secret[6:])
- else:
- decoded_secret = secret.encode()
- body = payload.decode("utf-8") if isinstance(payload, bytes) else payload
- # Prepare the signed payload (OpenAI uses webhookId.timestamp.payload format)
- signed_payload = f"{webhook_id}.{timestamp}.{body}"
- expected_signature = base64.b64encode(
- hmac.new(decoded_secret, signed_payload.encode(), hashlib.sha256).digest()
- ).decode()
- # Accept if any signature matches
- if not any(hmac.compare_digest(expected_signature, sig) for sig in signatures):
- raise InvalidWebhookSignatureError("The given webhook signature does not match the expected signature")
|