_parallel.py

# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
from concurrent.futures import ThreadPoolExecutor

from ...utils import device as device_utils
from ..common.batch_sampler import ImageBatchSampler
from .base import BasePipeline


class MultiDeviceSimpleInferenceExecutor(object):
    """Fans input batches out to one pipeline per device and yields their results."""

    def __init__(self, pipelines, batch_sampler, *, postprocess_result=None):
        super().__init__()
        self._pipelines = pipelines
        self._batch_sampler = batch_sampler
        self._postprocess_result = postprocess_result

    @property
    def pipelines(self):
        return self._pipelines

    def execute(
        self,
        input,
        *args,
        **kwargs,
    ):
        with ThreadPoolExecutor(max_workers=len(self._pipelines)) as pool:
            input_batches = self._batch_sampler(input)
            out_of_data = False
            while not out_of_data:
                input_future_pairs = []
                for pipeline in self._pipelines:
                    try:
                        input_batch = next(input_batches)
                    except StopIteration:
                        out_of_data = True
                        break
                    input_instances = input_batch.instances
                    # Pass the loop variables to `submit` as explicit arguments
                    # so each task sees its own pipeline and batch instead of a
                    # late-bound closure.
                    future = pool.submit(
                        lambda pipeline, input_instances, args, kwargs: list(
                            pipeline.predict(input_instances, *args, **kwargs)
                        ),
                        pipeline,
                        input_instances,
                        args,
                        kwargs,
                    )
                    input_future_pairs.append((input_batch, future))
                # We synchronize here to keep things simple (no data
                # prefetching, no queues, no dedicated workers), although
                # it's less efficient.
                for input_batch, future in input_future_pairs:
                    result = future.result()
                    for input_path, result_item in zip(input_batch.input_paths, result):
                        result_item["input_path"] = input_path
                    if self._postprocess_result:
                        result = self._postprocess_result(result, input_batch)
                    yield from result
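
As an aside, the executor's contract can be exercised in isolation: the batch sampler must yield objects exposing `instances` and `input_paths`, and each pipeline must provide a `predict` method that yields dict-like items. The stub classes below are a minimal illustrative sketch, not part of this module:

from dataclasses import dataclass
from typing import List


@dataclass
class _StubBatch:  # hypothetical stand-in for the sampler's batch objects
    instances: List[str]
    input_paths: List[str]


class _StubPipeline:  # hypothetical stand-in for a per-device pipeline
    def __init__(self, tag):
        self._tag = tag

    def predict(self, instances, *args, **kwargs):
        for inst in instances:
            yield {"pipeline": self._tag, "instance": inst}


def _stub_batch_sampler(inputs, batch_size=2):
    # Yield fixed-size batches over the input list.
    for i in range(0, len(inputs), batch_size):
        chunk = inputs[i : i + batch_size]
        yield _StubBatch(instances=chunk, input_paths=chunk)


if __name__ == "__main__":
    executor = MultiDeviceSimpleInferenceExecutor(
        [_StubPipeline("dev0"), _StubPipeline("dev1")],
        _stub_batch_sampler,
    )
    for item in executor.execute(["a.png", "b.png", "c.png", "d.png", "e.png"]):
        print(item)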


class AutoParallelSimpleInferencePipeline(BasePipeline):
    """Pipeline that can transparently run inference on multiple devices.

    If the configured device string names more than one device, one internal
    pipeline is created per device and predictions are dispatched through a
    `MultiDeviceSimpleInferenceExecutor`; otherwise a single internal pipeline
    is used directly.
    """

    def __init__(
        self,
        config,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)

        self._multi_device_inference = False
        if self.device is not None:
            device_type, device_ids = device_utils.parse_device(self.device)
            if device_ids is not None and len(device_ids) > 1:
                self._multi_device_inference = True
                self._pipelines = []
                for device_id in device_ids:
                    pipeline = self._create_internal_pipeline(
                        config, device_utils.constr_device(device_type, [device_id])
                    )
                    self._pipelines.append(pipeline)
                batch_size = self._get_batch_size(config)
                batch_sampler = self._create_batch_sampler(batch_size)
                self._executor = MultiDeviceSimpleInferenceExecutor(
                    self._pipelines,
                    batch_sampler,
                    postprocess_result=self._postprocess_result,
                )
        if not self._multi_device_inference:
            self._pipeline = self._create_internal_pipeline(config, self.device)

    @property
    def multi_device_inference(self):
        return self._multi_device_inference

    def __getattr__(self, name):
        # Delegate unknown attributes to an internal pipeline.
        if self._multi_device_inference:
            first_pipeline = self._executor.pipelines[0]
            return getattr(first_pipeline, name)
        else:
            return getattr(self._pipeline, name)

    def predict(
        self,
        input,
        *args,
        **kwargs,
    ):
        if self._multi_device_inference:
            yield from self._executor.execute(
                input,
                *args,
                **kwargs,
            )
        else:
            yield from self._pipeline.predict(
                input,
                *args,
                **kwargs,
            )

    @abc.abstractmethod
    def _create_internal_pipeline(self, config, device):
        raise NotImplementedError

    @abc.abstractmethod
    def _get_batch_size(self, config):
        raise NotImplementedError

    @abc.abstractmethod
    def _create_batch_sampler(self, batch_size):
        raise NotImplementedError

    def _postprocess_result(self, result, input_batch):
        return result


class AutoParallelImageSimpleInferencePipeline(AutoParallelSimpleInferencePipeline):
    @property
    @abc.abstractmethod
    def _pipeline_cls(self):
        raise NotImplementedError

    def _create_internal_pipeline(self, config, device):
        return self._pipeline_cls(
            config,
            device=device,
            pp_option=self.pp_option,
            use_hpip=self.use_hpip,
            hpi_config=self.hpi_config,
        )

    def _create_batch_sampler(self, batch_size):
        return ImageBatchSampler(batch_size)

    def _postprocess_result(self, result, input_batch):
        # Propagate the page index (e.g., for multi-page PDF inputs) to each
        # result item.
        for page_index, item in zip(input_batch.page_indexes, result):
            item["page_index"] = page_index
        return result
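
For context, a concrete pipeline is expected to subclass `AutoParallelImageSimpleInferencePipeline` and fill in the remaining hooks. The sketch below is hypothetical: `_MySingleDevicePipeline` and the `"batch_size"` config key are assumptions, not names taken from PaddleX.

class _MyParallelPipeline(AutoParallelImageSimpleInferencePipeline):
    @property
    def _pipeline_cls(self):
        # Single-device pipeline class to replicate once per device
        # (hypothetical name).
        return _MySingleDevicePipeline

    def _get_batch_size(self, config):
        # Per-device batch size read from the pipeline config
        # (assumed config key).
        return config.get("batch_size", 1)

Constructed with a multi-device string (for example `device="gpu:0,1"`, assuming `device_utils.parse_device` splits it into a device type and a list of ids), such a subclass builds one internal pipeline per device and routes `predict()` through `MultiDeviceSimpleInferenceExecutor`; with a single device it falls back to an ordinary internal pipeline.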