webhooks.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. # File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details.
  2. from __future__ import annotations
  3. import hmac
  4. import json
  5. import time
  6. import base64
  7. import hashlib
  8. from typing import cast
  9. from .._types import HeadersLike
  10. from .._utils import get_required_header
  11. from .._models import construct_type
  12. from .._resource import SyncAPIResource, AsyncAPIResource
  13. from .._exceptions import InvalidWebhookSignatureError
  14. from ..types.webhooks.unwrap_webhook_event import UnwrapWebhookEvent
  15. __all__ = ["Webhooks", "AsyncWebhooks"]
  16. class Webhooks(SyncAPIResource):
  17. def unwrap(
  18. self,
  19. payload: str | bytes,
  20. headers: HeadersLike,
  21. *,
  22. secret: str | None = None,
  23. ) -> UnwrapWebhookEvent:
  24. """Validates that the given payload was sent by OpenAI and parses the payload."""
  25. if secret is None:
  26. secret = self._client.webhook_secret
  27. self.verify_signature(payload=payload, headers=headers, secret=secret)
  28. return cast(
  29. UnwrapWebhookEvent,
  30. construct_type(
  31. type_=UnwrapWebhookEvent,
  32. value=json.loads(payload),
  33. ),
  34. )
  35. def verify_signature(
  36. self,
  37. payload: str | bytes,
  38. headers: HeadersLike,
  39. *,
  40. secret: str | None = None,
  41. tolerance: int = 300,
  42. ) -> None:
  43. """Validates whether or not the webhook payload was sent by OpenAI.
  44. Args:
  45. payload: The webhook payload
  46. headers: The webhook headers
  47. secret: The webhook secret (optional, will use client secret if not provided)
  48. tolerance: Maximum age of the webhook in seconds (default: 300 = 5 minutes)
  49. """
  50. if secret is None:
  51. secret = self._client.webhook_secret
  52. if secret is None:
  53. raise ValueError(
  54. "The webhook secret must either be set using the env var, OPENAI_WEBHOOK_SECRET, "
  55. "on the client class, OpenAI(webhook_secret='123'), or passed to this function"
  56. )
  57. signature_header = get_required_header(headers, "webhook-signature")
  58. timestamp = get_required_header(headers, "webhook-timestamp")
  59. webhook_id = get_required_header(headers, "webhook-id")
  60. # Validate timestamp to prevent replay attacks
  61. try:
  62. timestamp_seconds = int(timestamp)
  63. except ValueError:
  64. raise InvalidWebhookSignatureError("Invalid webhook timestamp format") from None
  65. now = int(time.time())
  66. if now - timestamp_seconds > tolerance:
  67. raise InvalidWebhookSignatureError("Webhook timestamp is too old") from None
  68. if timestamp_seconds > now + tolerance:
  69. raise InvalidWebhookSignatureError("Webhook timestamp is too new") from None
  70. # Extract signatures from v1,<base64> format
  71. # The signature header can have multiple values, separated by spaces.
  72. # Each value is in the format v1,<base64>. We should accept if any match.
  73. signatures: list[str] = []
  74. for part in signature_header.split():
  75. if part.startswith("v1,"):
  76. signatures.append(part[3:])
  77. else:
  78. signatures.append(part)
  79. # Decode the secret if it starts with whsec_
  80. if secret.startswith("whsec_"):
  81. decoded_secret = base64.b64decode(secret[6:])
  82. else:
  83. decoded_secret = secret.encode()
  84. body = payload.decode("utf-8") if isinstance(payload, bytes) else payload
  85. # Prepare the signed payload (OpenAI uses webhookId.timestamp.payload format)
  86. signed_payload = f"{webhook_id}.{timestamp}.{body}"
  87. expected_signature = base64.b64encode(
  88. hmac.new(decoded_secret, signed_payload.encode(), hashlib.sha256).digest()
  89. ).decode()
  90. # Accept if any signature matches
  91. if not any(hmac.compare_digest(expected_signature, sig) for sig in signatures):
  92. raise InvalidWebhookSignatureError(
  93. "The given webhook signature does not match the expected signature"
  94. ) from None
  95. class AsyncWebhooks(AsyncAPIResource):
  96. def unwrap(
  97. self,
  98. payload: str | bytes,
  99. headers: HeadersLike,
  100. *,
  101. secret: str | None = None,
  102. ) -> UnwrapWebhookEvent:
  103. """Validates that the given payload was sent by OpenAI and parses the payload."""
  104. if secret is None:
  105. secret = self._client.webhook_secret
  106. self.verify_signature(payload=payload, headers=headers, secret=secret)
  107. body = payload.decode("utf-8") if isinstance(payload, bytes) else payload
  108. return cast(
  109. UnwrapWebhookEvent,
  110. construct_type(
  111. type_=UnwrapWebhookEvent,
  112. value=json.loads(body),
  113. ),
  114. )
  115. def verify_signature(
  116. self,
  117. payload: str | bytes,
  118. headers: HeadersLike,
  119. *,
  120. secret: str | None = None,
  121. tolerance: int = 300,
  122. ) -> None:
  123. """Validates whether or not the webhook payload was sent by OpenAI.
  124. Args:
  125. payload: The webhook payload
  126. headers: The webhook headers
  127. secret: The webhook secret (optional, will use client secret if not provided)
  128. tolerance: Maximum age of the webhook in seconds (default: 300 = 5 minutes)
  129. """
  130. if secret is None:
  131. secret = self._client.webhook_secret
  132. if secret is None:
  133. raise ValueError(
  134. "The webhook secret must either be set using the env var, OPENAI_WEBHOOK_SECRET, "
  135. "on the client class, OpenAI(webhook_secret='123'), or passed to this function"
  136. ) from None
  137. signature_header = get_required_header(headers, "webhook-signature")
  138. timestamp = get_required_header(headers, "webhook-timestamp")
  139. webhook_id = get_required_header(headers, "webhook-id")
  140. # Validate timestamp to prevent replay attacks
  141. try:
  142. timestamp_seconds = int(timestamp)
  143. except ValueError:
  144. raise InvalidWebhookSignatureError("Invalid webhook timestamp format") from None
  145. now = int(time.time())
  146. if now - timestamp_seconds > tolerance:
  147. raise InvalidWebhookSignatureError("Webhook timestamp is too old") from None
  148. if timestamp_seconds > now + tolerance:
  149. raise InvalidWebhookSignatureError("Webhook timestamp is too new") from None
  150. # Extract signatures from v1,<base64> format
  151. # The signature header can have multiple values, separated by spaces.
  152. # Each value is in the format v1,<base64>. We should accept if any match.
  153. signatures: list[str] = []
  154. for part in signature_header.split():
  155. if part.startswith("v1,"):
  156. signatures.append(part[3:])
  157. else:
  158. signatures.append(part)
  159. # Decode the secret if it starts with whsec_
  160. if secret.startswith("whsec_"):
  161. decoded_secret = base64.b64decode(secret[6:])
  162. else:
  163. decoded_secret = secret.encode()
  164. body = payload.decode("utf-8") if isinstance(payload, bytes) else payload
  165. # Prepare the signed payload (OpenAI uses webhookId.timestamp.payload format)
  166. signed_payload = f"{webhook_id}.{timestamp}.{body}"
  167. expected_signature = base64.b64encode(
  168. hmac.new(decoded_secret, signed_payload.encode(), hashlib.sha256).digest()
  169. ).decode()
  170. # Accept if any signature matches
  171. if not any(hmac.compare_digest(expected_signature, sig) for sig in signatures):
  172. raise InvalidWebhookSignatureError("The given webhook signature does not match the expected signature")