to_thread.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. from __future__ import annotations
  2. __all__ = (
  3. "run_sync",
  4. "current_default_thread_limiter",
  5. )
  6. import sys
  7. from collections.abc import Callable
  8. from typing import TypeVar
  9. from warnings import warn
  10. from ._core._eventloop import get_async_backend
  11. from .abc import CapacityLimiter
  12. if sys.version_info >= (3, 11):
  13. from typing import TypeVarTuple, Unpack
  14. else:
  15. from typing_extensions import TypeVarTuple, Unpack
  16. T_Retval = TypeVar("T_Retval")
  17. PosArgsT = TypeVarTuple("PosArgsT")
  async def run_sync(
      func: Callable[[Unpack[PosArgsT]], T_Retval],
      *args: Unpack[PosArgsT],
      abandon_on_cancel: bool = False,
      cancellable: bool | None = None,
      limiter: CapacityLimiter | None = None,
  ) -> T_Retval:
      """
      Run a callable with the given arguments in a worker thread and await its result.

      When ``abandon_on_cancel`` is enabled and the task awaiting the result gets
      cancelled, the worker thread is left to finish on its own; whatever it returns
      or raises is then discarded.

      :param func: the callable to execute
      :param args: positional arguments passed on to ``func``
      :param abandon_on_cancel: if ``True``, cancelling the host task abandons the
          thread (it keeps running unsupervised on its own); if ``False``,
          cancellation of the host task is ignored until the operation has completed
          in the worker thread
      :param cancellable: deprecated alias of ``abandon_on_cancel``; when given, it
          overrides ``abandon_on_cancel`` and a ``DeprecationWarning`` is emitted
      :param limiter: capacity limiter used to limit the total number of running
          threads (the default limiter is used when omitted)
      :return: the return value of ``func``

      """
      if cancellable is not None:
          # stacklevel=2 attributes the warning to the caller of run_sync().
          warn(
              "The `cancellable=` keyword argument to `anyio.to_thread.run_sync` is "
              "deprecated since AnyIO 4.1.0; use `abandon_on_cancel=` instead",
              DeprecationWarning,
              stacklevel=2,
          )
          # The deprecated alias wins over abandon_on_cancel when both are passed.
          abandon_on_cancel = cancellable

      backend = get_async_backend()
      return await backend.run_sync_in_worker_thread(
          func, args, abandon_on_cancel=abandon_on_cancel, limiter=limiter
      )
  def current_default_thread_limiter() -> CapacityLimiter:
      """
      Look up the default capacity limiter for worker threads.

      The limiter is obtained from the async backend returned by
      ``get_async_backend()``; it is the one used to limit the number of concurrent
      threads by default.

      :return: a capacity limiter object

      """
      backend = get_async_backend()
      return backend.current_default_thread_limiter()