to_process.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. from __future__ import annotations
  2. __all__ = (
  3. "current_default_process_limiter",
  4. "process_worker",
  5. "run_sync",
  6. )
  7. import os
  8. import pickle
  9. import subprocess
  10. import sys
  11. from collections import deque
  12. from collections.abc import Callable
  13. from importlib.util import module_from_spec, spec_from_file_location
  14. from typing import TypeVar, cast
  15. from ._core._eventloop import current_time, get_async_backend, get_cancelled_exc_class
  16. from ._core._exceptions import BrokenWorkerProcess
  17. from ._core._subprocesses import open_process
  18. from ._core._synchronization import CapacityLimiter
  19. from ._core._tasks import CancelScope, fail_after
  20. from .abc import ByteReceiveStream, ByteSendStream, Process
  21. from .lowlevel import RunVar, checkpoint_if_cancelled
  22. from .streams.buffered import BufferedByteReceiveStream
  23. if sys.version_info >= (3, 11):
  24. from typing import TypeVarTuple, Unpack
  25. else:
  26. from typing_extensions import TypeVarTuple, Unpack
  27. WORKER_MAX_IDLE_TIME = 300 # 5 minutes
  28. T_Retval = TypeVar("T_Retval")
  29. PosArgsT = TypeVarTuple("PosArgsT")
  30. _process_pool_workers: RunVar[set[Process]] = RunVar("_process_pool_workers")
  31. _process_pool_idle_workers: RunVar[deque[tuple[Process, float]]] = RunVar(
  32. "_process_pool_idle_workers"
  33. )
  34. _default_process_limiter: RunVar[CapacityLimiter] = RunVar("_default_process_limiter")
  35. async def run_sync( # type: ignore[return]
  36. func: Callable[[Unpack[PosArgsT]], T_Retval],
  37. *args: Unpack[PosArgsT],
  38. cancellable: bool = False,
  39. limiter: CapacityLimiter | None = None,
  40. ) -> T_Retval:
  41. """
  42. Call the given function with the given arguments in a worker process.
  43. If the ``cancellable`` option is enabled and the task waiting for its completion is
  44. cancelled, the worker process running it will be abruptly terminated using SIGKILL
  45. (or ``terminateProcess()`` on Windows).
  46. :param func: a callable
  47. :param args: positional arguments for the callable
  48. :param cancellable: ``True`` to allow cancellation of the operation while it's
  49. running
  50. :param limiter: capacity limiter to use to limit the total amount of processes
  51. running (if omitted, the default limiter is used)
  52. :return: an awaitable that yields the return value of the function.
  53. """
  54. async def send_raw_command(pickled_cmd: bytes) -> object:
  55. try:
  56. await stdin.send(pickled_cmd)
  57. response = await buffered.receive_until(b"\n", 50)
  58. status, length = response.split(b" ")
  59. if status not in (b"RETURN", b"EXCEPTION"):
  60. raise RuntimeError(
  61. f"Worker process returned unexpected response: {response!r}"
  62. )
  63. pickled_response = await buffered.receive_exactly(int(length))
  64. except BaseException as exc:
  65. workers.discard(process)
  66. try:
  67. process.kill()
  68. with CancelScope(shield=True):
  69. await process.aclose()
  70. except ProcessLookupError:
  71. pass
  72. if isinstance(exc, get_cancelled_exc_class()):
  73. raise
  74. else:
  75. raise BrokenWorkerProcess from exc
  76. retval = pickle.loads(pickled_response)
  77. if status == b"EXCEPTION":
  78. assert isinstance(retval, BaseException)
  79. raise retval
  80. else:
  81. return retval
  82. # First pickle the request before trying to reserve a worker process
  83. await checkpoint_if_cancelled()
  84. request = pickle.dumps(("run", func, args), protocol=pickle.HIGHEST_PROTOCOL)
  85. # If this is the first run in this event loop thread, set up the necessary variables
  86. try:
  87. workers = _process_pool_workers.get()
  88. idle_workers = _process_pool_idle_workers.get()
  89. except LookupError:
  90. workers = set()
  91. idle_workers = deque()
  92. _process_pool_workers.set(workers)
  93. _process_pool_idle_workers.set(idle_workers)
  94. get_async_backend().setup_process_pool_exit_at_shutdown(workers)
  95. async with limiter or current_default_process_limiter():
  96. # Pop processes from the pool (starting from the most recently used) until we
  97. # find one that hasn't exited yet
  98. process: Process
  99. while idle_workers:
  100. process, idle_since = idle_workers.pop()
  101. if process.returncode is None:
  102. stdin = cast(ByteSendStream, process.stdin)
  103. buffered = BufferedByteReceiveStream(
  104. cast(ByteReceiveStream, process.stdout)
  105. )
  106. # Prune any other workers that have been idle for WORKER_MAX_IDLE_TIME
  107. # seconds or longer
  108. now = current_time()
  109. killed_processes: list[Process] = []
  110. while idle_workers:
  111. if now - idle_workers[0][1] < WORKER_MAX_IDLE_TIME:
  112. break
  113. process_to_kill, idle_since = idle_workers.popleft()
  114. process_to_kill.kill()
  115. workers.remove(process_to_kill)
  116. killed_processes.append(process_to_kill)
  117. with CancelScope(shield=True):
  118. for killed_process in killed_processes:
  119. await killed_process.aclose()
  120. break
  121. workers.remove(process)
  122. else:
  123. command = [sys.executable, "-u", "-m", __name__]
  124. process = await open_process(
  125. command, stdin=subprocess.PIPE, stdout=subprocess.PIPE
  126. )
  127. try:
  128. stdin = cast(ByteSendStream, process.stdin)
  129. buffered = BufferedByteReceiveStream(
  130. cast(ByteReceiveStream, process.stdout)
  131. )
  132. with fail_after(20):
  133. message = await buffered.receive(6)
  134. if message != b"READY\n":
  135. raise BrokenWorkerProcess(
  136. f"Worker process returned unexpected response: {message!r}"
  137. )
  138. main_module_path = getattr(sys.modules["__main__"], "__file__", None)
  139. pickled = pickle.dumps(
  140. ("init", sys.path, main_module_path),
  141. protocol=pickle.HIGHEST_PROTOCOL,
  142. )
  143. await send_raw_command(pickled)
  144. except (BrokenWorkerProcess, get_cancelled_exc_class()):
  145. raise
  146. except BaseException as exc:
  147. process.kill()
  148. raise BrokenWorkerProcess(
  149. "Error during worker process initialization"
  150. ) from exc
  151. workers.add(process)
  152. with CancelScope(shield=not cancellable):
  153. try:
  154. return cast(T_Retval, await send_raw_command(request))
  155. finally:
  156. if process in workers:
  157. idle_workers.append((process, current_time()))
  158. def current_default_process_limiter() -> CapacityLimiter:
  159. """
  160. Return the capacity limiter that is used by default to limit the number of worker
  161. processes.
  162. :return: a capacity limiter object
  163. """
  164. try:
  165. return _default_process_limiter.get()
  166. except LookupError:
  167. limiter = CapacityLimiter(os.cpu_count() or 2)
  168. _default_process_limiter.set(limiter)
  169. return limiter
def process_worker() -> None:
    """
    Serve commands from the parent process until stdin reaches EOF.

    This runs when the module is executed directly (see the ``__main__`` guard below).
    :func:`run_sync` starts workers with ``[sys.executable, "-u", "-m", __name__]``.

    Protocol, as implemented here:

    * On startup, ``READY`` plus a newline is written to the real stdout.
    * Each request is a pickled tuple ``(command, *args)`` read from the real stdin:

      * ``"run"``: ``args`` is ``(func, func_args)``, and ``func(*func_args)`` is called.
      * ``"init"``: ``args`` is ``(sys_path, main_module_path)``. ``sys.path`` is
        replaced. If ``main_module_path`` names an existing file, that file is
        executed as ``__mp_main__`` and installed as ``sys.modules["__main__"]``.

    * Each request is answered with a header line ``<status> <length>``, where
      ``status`` is ``RETURN`` or ``EXCEPTION``. The header is followed by
      ``length`` bytes holding the pickled return value or exception.
    * A :exc:`SystemExit` raised while handling a request is re-raised only after
      its response has been written.
    """
    # Redirect standard streams to os.devnull so that user code won't interfere with the
    # parent-worker communication
    stdin = sys.stdin
    stdout = sys.stdout
    sys.stdin = open(os.devnull)
    sys.stdout = open(os.devnull, "w")
    # Handshake: the parent checks for this line before sending its first command.
    # No explicit flush is done anywhere; this relies on the -u flag run_sync passes.
    stdout.buffer.write(b"READY\n")
    while True:
        retval = exception = None
        try:
            # NOTE(review): unpickling can execute arbitrary code; this assumes the
            # only writer to this stdin is the parent process that spawned us.
            command, *args = pickle.load(stdin.buffer)
        except EOFError:
            # The parent closed our stdin: shut down cleanly
            return
        except BaseException as exc:
            # An undecodable request is reported back to the parent as an exception
            exception = exc
        else:
            if command == "run":
                func, args = args
                try:
                    retval = func(*args)
                except BaseException as exc:
                    exception = exc
            elif command == "init":
                main_module_path: str | None
                sys.path, main_module_path = args
                # Drop the worker's own __main__ (this module, started via -m)
                del sys.modules["__main__"]
                if main_module_path and os.path.isfile(main_module_path):
                    # Load the parent's main module but as __mp_main__ instead of
                    # __main__ (like multiprocessing does) to avoid infinite recursion
                    try:
                        spec = spec_from_file_location("__mp_main__", main_module_path)
                        if spec and spec.loader:
                            main = module_from_spec(spec)
                            spec.loader.exec_module(main)
                            sys.modules["__main__"] = main
                    except BaseException as exc:
                        exception = exc
            # NOTE(review): any other command falls through and is answered with
            # RETURN None; confirm this is intended.
        try:
            if exception is not None:
                status = b"EXCEPTION"
                pickled = pickle.dumps(exception, pickle.HIGHEST_PROTOCOL)
            else:
                status = b"RETURN"
                pickled = pickle.dumps(retval, pickle.HIGHEST_PROTOCOL)
        except BaseException as exc:
            # The result or exception could not be pickled: report the pickling
            # error itself instead
            exception = exc
            status = b"EXCEPTION"
            pickled = pickle.dumps(exc, pickle.HIGHEST_PROTOCOL)
        # Length-prefixed reply: the parent reads the header up to the newline, then
        # exactly len(pickled) bytes of payload
        stdout.buffer.write(b"%s %d\n" % (status, len(pickled)))
        stdout.buffer.write(pickled)
        # Respect SIGTERM: a SystemExit raised while handling the request ends the
        # worker, but only after its response has been sent
        if isinstance(exception, SystemExit):
            raise exception


if __name__ == "__main__":
    # Entry point for worker processes spawned by run_sync() via ``python -m``
    process_worker()