# to_process.py
  1. from __future__ import annotations
  2. import os
  3. import pickle
  4. import subprocess
  5. import sys
  6. from collections import deque
  7. from importlib.util import module_from_spec, spec_from_file_location
  8. from typing import Callable, TypeVar, cast
  9. from ._core._eventloop import current_time, get_asynclib, get_cancelled_exc_class
  10. from ._core._exceptions import BrokenWorkerProcess
  11. from ._core._subprocesses import open_process
  12. from ._core._synchronization import CapacityLimiter
  13. from ._core._tasks import CancelScope, fail_after
  14. from .abc import ByteReceiveStream, ByteSendStream, Process
  15. from .lowlevel import RunVar, checkpoint_if_cancelled
  16. from .streams.buffered import BufferedByteReceiveStream
# Seconds a pooled worker process may sit idle before it is pruned
WORKER_MAX_IDLE_TIME = 300  # 5 minutes

T_Retval = TypeVar("T_Retval")

# Per-event-loop set of all live worker processes (idle and busy)
_process_pool_workers: RunVar[set[Process]] = RunVar("_process_pool_workers")
# Per-event-loop deque of (process, idle_since_timestamp); most recently
# used workers are at the right end
_process_pool_idle_workers: RunVar[deque[tuple[Process, float]]] = RunVar(
    "_process_pool_idle_workers"
)
# Lazily created default CapacityLimiter shared by all run_sync() calls
_default_process_limiter: RunVar[CapacityLimiter] = RunVar("_default_process_limiter")
async def run_sync(
    func: Callable[..., T_Retval],
    *args: object,
    cancellable: bool = False,
    limiter: CapacityLimiter | None = None,
) -> T_Retval:
    """
    Call the given function with the given arguments in a worker process.

    If the ``cancellable`` option is enabled and the task waiting for its completion is
    cancelled, the worker process running it will be abruptly terminated using SIGKILL
    (or ``terminateProcess()`` on Windows).

    :param func: a callable
    :param args: positional arguments for the callable
    :param cancellable: ``True`` to allow cancellation of the operation while it's
        running
    :param limiter: capacity limiter to use to limit the total amount of processes
        running (if omitted, the default limiter is used)
    :return: an awaitable that yields the return value of the function.

    """

    async def send_raw_command(pickled_cmd: bytes) -> object:
        # Send one pickled command to the worker and read its
        # "STATUS LENGTH\n" header followed by LENGTH bytes of pickled payload.
        # Any failure during the exchange (including cancellation) leaves the
        # worker in an unknown protocol state, so it is killed and dropped from
        # the pool before the exception propagates.
        try:
            await stdin.send(pickled_cmd)
            response = await buffered.receive_until(b"\n", 50)
            status, length = response.split(b" ")
            if status not in (b"RETURN", b"EXCEPTION"):
                raise RuntimeError(
                    f"Worker process returned unexpected response: {response!r}"
                )

            pickled_response = await buffered.receive_exactly(int(length))
        except BaseException as exc:
            workers.discard(process)
            try:
                process.kill()
                # Shield the close so cleanup finishes even if we are being cancelled
                with CancelScope(shield=True):
                    await process.aclose()
            except ProcessLookupError:
                pass

            if isinstance(exc, get_cancelled_exc_class()):
                raise
            else:
                raise BrokenWorkerProcess from exc

        retval = pickle.loads(pickled_response)
        if status == b"EXCEPTION":
            # The worker pickled the exception it caught; re-raise it here
            assert isinstance(retval, BaseException)
            raise retval
        else:
            return retval

    # First pickle the request before trying to reserve a worker process
    await checkpoint_if_cancelled()
    request = pickle.dumps(("run", func, args), protocol=pickle.HIGHEST_PROTOCOL)

    # If this is the first run in this event loop thread, set up the necessary variables
    try:
        workers = _process_pool_workers.get()
        idle_workers = _process_pool_idle_workers.get()
    except LookupError:
        workers = set()
        idle_workers = deque()
        _process_pool_workers.set(workers)
        _process_pool_idle_workers.set(idle_workers)
        # Ensure pooled processes get terminated when the event loop shuts down
        get_asynclib().setup_process_pool_exit_at_shutdown(workers)

    async with (limiter or current_default_process_limiter()):
        # Pop processes from the pool (starting from the most recently used) until we
        # find one that hasn't exited yet
        process: Process
        while idle_workers:
            process, idle_since = idle_workers.pop()
            if process.returncode is None:
                stdin = cast(ByteSendStream, process.stdin)
                buffered = BufferedByteReceiveStream(
                    cast(ByteReceiveStream, process.stdout)
                )

                # Prune any other workers that have been idle for WORKER_MAX_IDLE_TIME
                # seconds or longer
                now = current_time()
                killed_processes: list[Process] = []
                while idle_workers:
                    # Oldest entries are at the left end; stop at the first one
                    # that is still fresh enough to keep
                    if now - idle_workers[0][1] < WORKER_MAX_IDLE_TIME:
                        break

                    process, idle_since = idle_workers.popleft()
                    process.kill()
                    workers.remove(process)
                    killed_processes.append(process)

                with CancelScope(shield=True):
                    for process in killed_processes:
                        await process.aclose()

                break

            # The popped process had already exited; forget it and keep looking
            workers.remove(process)
        else:
            # No usable idle worker found: spawn a fresh one running this module
            command = [sys.executable, "-u", "-m", __name__]
            process = await open_process(
                command, stdin=subprocess.PIPE, stdout=subprocess.PIPE
            )
            try:
                stdin = cast(ByteSendStream, process.stdin)
                buffered = BufferedByteReceiveStream(
                    cast(ByteReceiveStream, process.stdout)
                )
                with fail_after(20):
                    message = await buffered.receive(6)

                if message != b"READY\n":
                    raise BrokenWorkerProcess(
                        f"Worker process returned unexpected response: {message!r}"
                    )

                # Seed the worker with our sys.path and main module so that
                # pickled callables defined in __main__ can be resolved there
                main_module_path = getattr(sys.modules["__main__"], "__file__", None)
                pickled = pickle.dumps(
                    ("init", sys.path, main_module_path),
                    protocol=pickle.HIGHEST_PROTOCOL,
                )
                await send_raw_command(pickled)
            except (BrokenWorkerProcess, get_cancelled_exc_class()):
                raise
            except BaseException as exc:
                process.kill()
                raise BrokenWorkerProcess(
                    "Error during worker process initialization"
                ) from exc

            workers.add(process)

        # Unless cancellable, shield the call so the worker is never abandoned
        # mid-request (which would corrupt its protocol state)
        with CancelScope(shield=not cancellable):
            try:
                return cast(T_Retval, await send_raw_command(request))
            finally:
                # Return the worker to the idle pool unless send_raw_command()
                # already discarded it after a failure
                if process in workers:
                    idle_workers.append((process, current_time()))
  146. def current_default_process_limiter() -> CapacityLimiter:
  147. """
  148. Return the capacity limiter that is used by default to limit the number of worker processes.
  149. :return: a capacity limiter object
  150. """
  151. try:
  152. return _default_process_limiter.get()
  153. except LookupError:
  154. limiter = CapacityLimiter(os.cpu_count() or 2)
  155. _default_process_limiter.set(limiter)
  156. return limiter
def process_worker() -> None:
    """
    Run the worker side of the parent-worker protocol.

    Reads pickled ``("run", func, args)`` / ``("init", sys.path, main_module_path)``
    commands from stdin and writes back ``b"RETURN"`` or ``b"EXCEPTION"`` responses
    (a ``STATUS LENGTH\\n`` header followed by a pickled payload) on stdout, until
    stdin reaches EOF.
    """
    # Redirect standard streams to os.devnull so that user code won't interfere with the
    # parent-worker communication
    stdin = sys.stdin
    stdout = sys.stdout
    sys.stdin = open(os.devnull)
    sys.stdout = open(os.devnull, "w")
    # Handshake: the parent waits for this marker before sending any command
    # (the process is spawned with -u, so stdout is unbuffered)
    stdout.buffer.write(b"READY\n")
    while True:
        retval = exception = None
        try:
            command, *args = pickle.load(stdin.buffer)
        except EOFError:
            # Parent closed our stdin: orderly shutdown
            return
        except BaseException as exc:
            exception = exc
        else:
            if command == "run":
                func, args = args
                try:
                    retval = func(*args)
                except BaseException as exc:
                    exception = exc
            elif command == "init":
                main_module_path: str | None
                sys.path, main_module_path = args
                del sys.modules["__main__"]
                if main_module_path:
                    # Load the parent's main module but as __mp_main__ instead of __main__
                    # (like multiprocessing does) to avoid infinite recursion
                    try:
                        spec = spec_from_file_location("__mp_main__", main_module_path)
                        if spec and spec.loader:
                            main = module_from_spec(spec)
                            spec.loader.exec_module(main)
                            sys.modules["__main__"] = main
                    except BaseException as exc:
                        exception = exc

        try:
            if exception is not None:
                status = b"EXCEPTION"
                pickled = pickle.dumps(exception, pickle.HIGHEST_PROTOCOL)
            else:
                status = b"RETURN"
                pickled = pickle.dumps(retval, pickle.HIGHEST_PROTOCOL)
        except BaseException as exc:
            # The return value (or the exception itself) was unpicklable;
            # report the pickling error instead
            exception = exc
            status = b"EXCEPTION"
            pickled = pickle.dumps(exc, pickle.HIGHEST_PROTOCOL)

        stdout.buffer.write(b"%s %d\n" % (status, len(pickled)))
        stdout.buffer.write(pickled)

        # Respect SIGTERM
        if isinstance(exception, SystemExit):
            raise exception
if __name__ == "__main__":
    # Entry point used when run_sync() spawns this module via
    # ``python -u -m <module>`` to start a worker process
    process_worker()