fused_mask.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. """
  2. 方案3:多策略融合水印 mask。
  3. 在现有 light_on_white / diagonal_midtone 基础上新增背景差异策略,
  4. 三种策略结果做 OR 融合,覆盖更多水印类型。
  5. 用法:
  6. from watermark_lab.fused_mask import build_fused_watermark_mask
  7. mask, debug = build_fused_watermark_mask(gray, bgr=bgr)
  8. """
  9. from __future__ import annotations
  10. import sys
  11. from pathlib import Path
  12. from typing import Any, Dict, Optional
  13. import cv2
  14. import numpy as np
  15. _repo_root = Path(__file__).resolve().parents[3]
  16. if str(_repo_root) not in sys.path:
  17. sys.path.insert(0, str(_repo_root))
  18. import ocr_utils.watermark.algorithms as _algo
  19. # ── 策略C:背景差异残差 ──────────────────────────────────────────
  20. def _build_background_diff_mask(
  21. gray: np.ndarray,
  22. *,
  23. median_kernel: int = 31,
  24. diff_low: float = 6.0,
  25. diff_high: float = 50.0,
  26. morph_open_kernel: int = 3,
  27. dilate_radius: int = 0,
  28. ) -> np.ndarray:
  29. """
  30. 大核中值滤波估计背景 → 残差 → 提取半透明水印纹理。
  31. 原理:
  32. - 大核 medianBlur 抹掉文字(高频),保留背景(低频) + 水印(中低频)
  33. - 残差 |original - bg| 里,正文差异大(>50),水印差异中等(6~50),纯背景差异小(<6)
  34. - 取 6~50 范围的残差作为水印候选
  35. Args:
  36. gray: 灰度图 ndarray (H, W)
  37. median_kernel: 中值滤波核大小,越大抹文字越干净但水印也可能被模糊
  38. diff_low: 残差下限
  39. diff_high: 残差上限
  40. morph_open_kernel: 形态学开运算核大小(去噪点)
  41. dilate_radius: 膨胀半径(连接碎片水印)
  42. Returns:
  43. bool ndarray (H, W),True=疑似水印
  44. """
  45. gray_f = np.asarray(gray, dtype=np.float32)
  46. ksize = max(3, int(median_kernel)) | 1 # 确保奇数
  47. bg = cv2.medianBlur(gray, ksize).astype(np.float32)
  48. diff = cv2.absdiff(gray_f, bg)
  49. mask = (diff > diff_low) & (diff < diff_high)
  50. if morph_open_kernel > 0 and np.any(mask):
  51. k = cv2.getStructuringElement(
  52. cv2.MORPH_ELLIPSE, (morph_open_kernel, morph_open_kernel)
  53. )
  54. mask = cv2.morphologyEx(mask.astype(np.uint8), cv2.MORPH_OPEN, k) > 0
  55. if dilate_radius > 0 and np.any(mask):
  56. k = cv2.getStructuringElement(
  57. cv2.MORPH_ELLIPSE, (dilate_radius * 2 + 1, dilate_radius * 2 + 1)
  58. )
  59. mask = cv2.dilate(mask.astype(np.uint8), k) > 0
  60. return mask
  61. # ── 融合主函数 ──────────────────────────────────────────────────
  62. def build_fused_watermark_mask(
  63. gray: np.ndarray,
  64. *,
  65. bgr: Optional[np.ndarray] = None,
  66. # ── 策略A: light_on_white ──
  67. a_enabled: bool = True,
  68. a_light_gray_low: int = 220,
  69. a_light_gray_high: int = 254,
  70. a_direction_filter: str = "hough",
  71. a_text_protect_gray_max: int = 130,
  72. a_min_component_area: int = 200,
  73. # ── 策略B: diagonal_midtone ──
  74. b_enabled: bool = True,
  75. b_midtone_low: int = 100,
  76. b_midtone_high: int = 230,
  77. b_background_threshold: int = 248,
  78. # ── 策略C: 背景差异 ──
  79. c_enabled: bool = True,
  80. c_median_kernel: int = 31,
  81. c_diff_low: float = 6.0,
  82. c_diff_high: float = 50.0,
  83. c_morph_open_kernel: int = 3,
  84. c_dilate_radius: int = 0,
  85. # ── 通用后处理 ──
  86. min_component_area: int = 200,
  87. seal_protect: bool = True,
  88. ) -> tuple[np.ndarray, Dict[str, Any]]:
  89. """
  90. 多策略融合水印 mask。
  91. 三种策略 OR 融合 → 连通域过滤 → 公章保护 → 最终 mask。
  92. Returns:
  93. mask: bool ndarray (H, W)
  94. debug: 包含每项策略的中间结果和统计信息
  95. """
  96. masks: list[np.ndarray] = []
  97. debug: Dict[str, Any] = {"strategies": {}, "fused_ratio": None}
  98. # ── 策略A: light_on_white ──
  99. if a_enabled:
  100. try:
  101. ma, da = _algo.build_watermark_mask(
  102. gray,
  103. bgr=bgr,
  104. mask_mode="light_on_white",
  105. light_gray_low=a_light_gray_low,
  106. light_gray_high=a_light_gray_high,
  107. direction_filter=a_direction_filter,
  108. text_protect_gray_max=a_text_protect_gray_max,
  109. min_component_area=a_min_component_area,
  110. seal_protect=seal_protect,
  111. )
  112. masks.append(ma)
  113. debug["strategies"]["light_on_white"] = {
  114. "ratio": float(ma.sum() / gray.size),
  115. **{k: v for k, v in da.items() if not isinstance(v, np.ndarray)},
  116. }
  117. except Exception as e:
  118. debug["strategies"]["light_on_white"] = {"error": str(e), "ratio": 0.0}
  119. # ── 策略B: diagonal_midtone ──
  120. if b_enabled:
  121. try:
  122. mb, db = _algo.build_watermark_mask(
  123. gray,
  124. bgr=bgr,
  125. mask_mode="diagonal_midtone",
  126. midtone_low=b_midtone_low,
  127. midtone_high=b_midtone_high,
  128. background_threshold=b_background_threshold,
  129. min_component_area=min_component_area,
  130. )
  131. masks.append(mb)
  132. debug["strategies"]["diagonal_midtone"] = {
  133. "ratio": float(mb.sum() / gray.size),
  134. **{k: v for k, v in db.items() if not isinstance(v, np.ndarray)},
  135. }
  136. except Exception as e:
  137. debug["strategies"]["diagonal_midtone"] = {"error": str(e), "ratio": 0.0}
  138. # ── 策略C: 背景差异 ──
  139. if c_enabled:
  140. try:
  141. mc = _build_background_diff_mask(
  142. gray,
  143. median_kernel=c_median_kernel,
  144. diff_low=c_diff_low,
  145. diff_high=c_diff_high,
  146. morph_open_kernel=c_morph_open_kernel,
  147. dilate_radius=c_dilate_radius,
  148. )
  149. masks.append(mc)
  150. debug["strategies"]["background_diff"] = {
  151. "ratio": float(mc.sum() / gray.size),
  152. "median_kernel": c_median_kernel,
  153. "diff_low": c_diff_low,
  154. "diff_high": c_diff_high,
  155. }
  156. except Exception as e:
  157. debug["strategies"]["background_diff"] = {"error": str(e), "ratio": 0.0}
  158. # ── OR 融合 ──
  159. fused = np.zeros_like(gray, dtype=bool)
  160. for m in masks:
  161. fused |= m
  162. # ── 连通域过滤 ──
  163. if min_component_area > 0 and np.any(fused):
  164. n_labels, labels, stats, _ = cv2.connectedComponentsWithStats(
  165. fused.astype(np.uint8), connectivity=8
  166. )
  167. filtered = np.zeros_like(fused)
  168. for i in range(1, n_labels):
  169. if stats[i, cv2.CC_STAT_AREA] >= min_component_area:
  170. filtered[labels == i] = True
  171. if np.any(filtered):
  172. fused = filtered
  173. # ── 公章保护 ──
  174. if seal_protect and bgr is not None and bgr.ndim == 3:
  175. seal = _algo._build_seal_protect_mask(bgr)
  176. fused &= ~seal
  177. debug["fused_ratio"] = float(fused.sum() / gray.size)
  178. debug["n_strategies"] = len(masks)
  179. return fused, debug