processor.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. """
  2. 水印处理门面:preset 解析、检测、去水印、对比度增强。
  3. """
  4. from __future__ import annotations
  5. from typing import Any, Dict, List, Optional, Tuple, Union
  6. import numpy as np
  7. from loguru import logger
  8. from PIL import Image
  9. from ocr_utils.watermark.algorithms import detect_watermark
  10. from ocr_utils.watermark.presets import Scope, merge_watermark_config
  11. from ocr_utils.watermark.removal import remove_watermark_from_image_rgb
  12. class WatermarkProcessor:
  13. """页级 / 单元格级水印去除编排。"""
  14. def __init__(
  15. self,
  16. config: Dict[str, Any],
  17. *,
  18. scope: Scope = "page",
  19. ):
  20. self.scope = scope
  21. self.config = merge_watermark_config(scope, config)
  22. @classmethod
  23. def from_user_config(
  24. cls,
  25. user_cfg: Optional[Dict[str, Any]],
  26. *,
  27. scope: Scope = "page",
  28. ) -> "WatermarkProcessor":
  29. return cls(user_cfg or {}, scope=scope)
  30. @property
  31. def enabled(self) -> bool:
  32. return bool(self.config.get("enabled", False))
  33. @property
  34. def method(self) -> str:
  35. return str(self.config.get("method") or "masked_adaptive")
  36. @property
  37. def threshold(self) -> int:
  38. return int(self.config.get("threshold", 175))
  39. @property
  40. def morph_close_kernel(self) -> int:
  41. return int(self.config.get("morph_close_kernel", 0))
  42. def contrast_config(self) -> Optional[Dict[str, Any]]:
  43. ce = self.config.get("contrast_enhancement")
  44. if not isinstance(ce, dict):
  45. return None
  46. if not ce.get("enabled", False):
  47. return None
  48. return dict(ce)
  49. def should_apply(self, image: Union[np.ndarray, Image.Image]) -> bool:
  50. if not self.enabled:
  51. return False
  52. if not bool(self.config.get("detect_before_remove", True)):
  53. return True
  54. detect_cfg = self.config.get("detect")
  55. if not isinstance(detect_cfg, dict):
  56. detect_cfg = {}
  57. angle_range = detect_cfg.get("diagonal_angle_range", (30, 60))
  58. if isinstance(angle_range, list):
  59. angle_range = tuple(angle_range)
  60. return detect_watermark(
  61. image,
  62. midtone_low=int(detect_cfg.get("midtone_low", 100)),
  63. midtone_high=int(detect_cfg.get("midtone_high", 220)),
  64. ratio_threshold=float(detect_cfg.get("ratio_threshold", 0.025)),
  65. check_diagonal=bool(detect_cfg.get("check_diagonal", True)),
  66. diagonal_angle_range=angle_range,
  67. )
  68. def process(
  69. self,
  70. image: Union[np.ndarray, Image.Image],
  71. *,
  72. apply_removal: Optional[bool] = None,
  73. apply_contrast: Optional[bool] = None,
  74. contrast_override: Optional[Dict[str, Any]] = None,
  75. removal_debug: Optional[Dict[str, Any]] = None,
  76. force: bool = False,
  77. ) -> Tuple[np.ndarray, List[str]]:
  78. """
  79. 去水印 + 可选对比度增强。
  80. Returns:
  81. (BGR ndarray, preprocess_stages)
  82. """
  83. stages: List[str] = []
  84. if isinstance(image, Image.Image):
  85. img = np.array(image.convert("RGB"))
  86. img = img[:, :, ::-1].copy() # RGB -> BGR
  87. else:
  88. img = np.array(image)
  89. if img.ndim == 2:
  90. img = np.stack([img, img, img], axis=-1)
  91. do_remove = apply_removal if apply_removal is not None else self.enabled
  92. if do_remove and not force and not self.should_apply(img):
  93. do_remove = False
  94. if contrast_override is not None:
  95. contrast_cfg = dict(contrast_override)
  96. if apply_contrast is not False and not contrast_cfg.get("enabled", True):
  97. contrast_cfg["enabled"] = True
  98. else:
  99. contrast_cfg = self.contrast_config()
  100. if apply_contrast is False:
  101. contrast_cfg = None
  102. elif apply_contrast is True and contrast_cfg is None:
  103. ce = self.config.get("contrast_enhancement") or {}
  104. if isinstance(ce, dict) and ce.get("method"):
  105. contrast_cfg = dict(ce)
  106. contrast_cfg["enabled"] = True
  107. if not do_remove and not contrast_cfg:
  108. return img, stages
  109. try:
  110. if do_remove:
  111. stages.append("wm")
  112. if contrast_cfg:
  113. stages.append("contrast")
  114. cleaned = remove_watermark_from_image_rgb(
  115. img,
  116. threshold=self.threshold,
  117. morph_close_kernel=self.morph_close_kernel,
  118. return_pil=False,
  119. contrast_enhancement=contrast_cfg,
  120. apply_watermark_removal=do_remove,
  121. watermark_removal_cfg=self.config,
  122. removal_debug=removal_debug,
  123. )
  124. return np.asarray(cleaned), stages
  125. except Exception as e:
  126. logger.warning(f"WatermarkProcessor.process failed (scope={self.scope}): {e}")
  127. return img, stages
  128. def get_full_config(self) -> Dict[str, Any]:
  129. return dict(self.config)