"""
Scheme 3: multi-strategy fused watermark mask.

Adds a background-difference strategy on top of the existing
``light_on_white`` / ``diagonal_midtone`` strategies and OR-fuses the three
results so that more watermark types are covered.

Usage:
    from watermark_lab.fused_mask import build_fused_watermark_mask
    mask, debug = build_fused_watermark_mask(gray, bgr=bgr)
"""

from __future__ import annotations

import sys
from pathlib import Path
from typing import Any, Dict, Optional

import cv2
import numpy as np

# Put the directory three levels above this file's folder on sys.path so the
# ``ocr_utils`` import below can resolve (presumably the repository root).
_repo_root = Path(__file__).resolve().parents[3]
if str(_repo_root) not in sys.path:
    sys.path.insert(0, str(_repo_root))

import ocr_utils.watermark.algorithms as _algo


# ── Strategy C: background-difference residual ──────────────────────


def _build_background_diff_mask(
    gray: np.ndarray,
    *,
    median_kernel: int = 31,
    diff_low: float = 6.0,
    diff_high: float = 50.0,
    morph_open_kernel: int = 3,
    dilate_radius: int = 0,
) -> np.ndarray:
    """
    Estimate the background with a large median filter and keep mid-range residuals.

    Rationale:
        - A large-kernel ``medianBlur`` wipes out text (high frequency) while
          keeping the background (low frequency) and the watermark
          (mid/low frequency).
        - In the residual ``|original - bg|`` body text differs a lot (> 50),
          watermark pixels differ moderately (6~50) and plain background
          differs very little (< 6).
        - Pixels whose residual lies strictly between ``diff_low`` and
          ``diff_high`` are taken as watermark candidates.

    Args:
        gray: Grayscale image, ndarray of shape (H, W). Non-uint8 input is
            rounded and clipped to 0..255 for the median filter when the
            aperture is larger than 5.
        median_kernel: Median filter aperture; it is clamped to at least 3 and
            forced to be odd. Larger values remove text more thoroughly but
            may also blur the watermark.
        diff_low: Exclusive lower bound of the residual.
        diff_high: Exclusive upper bound of the residual.
        morph_open_kernel: Kernel size of the morphological opening used to
            remove speckles; ``<= 0`` disables it.
        dilate_radius: Dilation radius used to connect fragmented watermark
            pieces; ``<= 0`` disables it.

    Returns:
        bool ndarray (H, W), True = suspected watermark pixel.
    """
    gray_f = np.asarray(gray, dtype=np.float32)
    ksize = max(3, int(median_kernel)) | 1  # force an odd aperture

    blur_src = np.asarray(gray)
    if ksize > 5 and blur_src.dtype != np.uint8:
        # OpenCV's medianBlur only accepts 8-bit input once the aperture
        # exceeds 5, so other dtypes would otherwise raise here.
        blur_src = np.clip(np.rint(blur_src), 0, 255).astype(np.uint8)
    bg = cv2.medianBlur(blur_src, ksize).astype(np.float32)

    diff = cv2.absdiff(gray_f, bg)
    mask = (diff > diff_low) & (diff < diff_high)

    if morph_open_kernel > 0 and np.any(mask):
        k = cv2.getStructuringElement(
            cv2.MORPH_ELLIPSE, (morph_open_kernel, morph_open_kernel)
        )
        mask = cv2.morphologyEx(mask.astype(np.uint8), cv2.MORPH_OPEN, k) > 0

    if dilate_radius > 0 and np.any(mask):
        k = cv2.getStructuringElement(
            cv2.MORPH_ELLIPSE, (dilate_radius * 2 + 1, dilate_radius * 2 + 1)
        )
        mask = cv2.dilate(mask.astype(np.uint8), k) > 0

    return mask


# ── Shared helpers ──────────────────────────────────────────────────


def _coverage_ratio(mask: np.ndarray, pixel_count: int) -> float:
    """Return the fraction of ``pixel_count`` covered by ``mask`` (0.0 for an empty image)."""
    if pixel_count <= 0:
        return 0.0
    return float(mask.sum() / pixel_count)


def _filter_small_components(mask: np.ndarray, min_area: int) -> np.ndarray:
    """
    Drop 8-connected components whose area is below ``min_area``.

    If no component reaches ``min_area`` the input mask is returned unchanged
    instead of an empty mask.
    NOTE(review): that fallback mirrors the previous behavior; confirm it is
    intended, since it lets small noise survive when nothing large exists.
    """
    _n_labels, labels, stats, _centroids = cv2.connectedComponentsWithStats(
        mask.astype(np.uint8), connectivity=8
    )
    # One boolean per label, then a single lookup through the label image;
    # this avoids rescanning the whole image once per component.
    keep = stats[:, cv2.CC_STAT_AREA] >= min_area
    keep[0] = False  # label 0 is the background
    if not keep.any():
        return mask
    return keep[labels]


# ── Fusion entry point ──────────────────────────────────────────────


def build_fused_watermark_mask(
    gray: np.ndarray,
    *,
    bgr: Optional[np.ndarray] = None,
    # ── Strategy A: light_on_white ──
    a_enabled: bool = True,
    a_light_gray_low: int = 220,
    a_light_gray_high: int = 254,
    a_direction_filter: str = "hough",
    a_text_protect_gray_max: int = 130,
    a_min_component_area: int = 200,
    # ── Strategy B: diagonal_midtone ──
    b_enabled: bool = True,
    b_midtone_low: int = 100,
    b_midtone_high: int = 230,
    b_background_threshold: int = 248,
    # ── Strategy C: background difference ──
    c_enabled: bool = True,
    c_median_kernel: int = 31,
    c_diff_low: float = 6.0,
    c_diff_high: float = 50.0,
    c_morph_open_kernel: int = 3,
    c_dilate_radius: int = 0,
    # ── Shared post-processing ──
    min_component_area: int = 200,
    seal_protect: bool = True,
) -> tuple[np.ndarray, Dict[str, Any]]:
    """
    Build a watermark mask by fusing up to three detection strategies.

    Pipeline: OR-fuse the enabled strategies → connected-component area
    filter → seal protection → final mask.

    A strategy that raises is skipped; its error message is stored under
    ``debug["strategies"][<name>]["error"]`` with a ratio of 0.0, so one
    failing strategy does not abort the others.

    Args:
        gray: Grayscale image, ndarray of shape (H, W).
        bgr: Optional color image forwarded to strategies A and B and used
            for seal protection when it is 3-dimensional.
        a_enabled, a_light_gray_low, a_light_gray_high, a_direction_filter,
        a_text_protect_gray_max, a_min_component_area: Switch and parameters
            forwarded to ``_algo.build_watermark_mask`` with
            ``mask_mode="light_on_white"``.
        b_enabled, b_midtone_low, b_midtone_high, b_background_threshold:
            Switch and parameters forwarded to ``_algo.build_watermark_mask``
            with ``mask_mode="diagonal_midtone"``.
        c_enabled, c_median_kernel, c_diff_low, c_diff_high,
        c_morph_open_kernel, c_dilate_radius: Switch and parameters of the
            background-difference strategy.
        min_component_area: Minimum component area kept after fusion
            (``<= 0`` disables the filter); also forwarded to strategy B.
        seal_protect: Forwarded to strategy A; when True and ``bgr`` is a
            3-dimensional array, pixels flagged by
            ``_algo._build_seal_protect_mask`` are removed from the result.

    Returns:
        mask: bool ndarray (H, W).
        debug: Per-strategy statistics under ``"strategies"``, plus
            ``"fused_ratio"`` and ``"n_strategies"``.
    """
    masks: list[np.ndarray] = []
    debug: Dict[str, Any] = {"strategies": {}, "fused_ratio": None}
    strategies = debug["strategies"]
    pixel_count = gray.size

    # ── Strategy A: light_on_white ──
    if a_enabled:
        try:
            ma, da = _algo.build_watermark_mask(
                gray,
                bgr=bgr,
                mask_mode="light_on_white",
                light_gray_low=a_light_gray_low,
                light_gray_high=a_light_gray_high,
                direction_filter=a_direction_filter,
                text_protect_gray_max=a_text_protect_gray_max,
                min_component_area=a_min_component_area,
                seal_protect=seal_protect,
            )
            masks.append(ma)
            # Only non-array debug values are copied into the report.
            strategies["light_on_white"] = {
                "ratio": _coverage_ratio(ma, pixel_count),
                **{k: v for k, v in da.items() if not isinstance(v, np.ndarray)},
            }
        except Exception as e:
            # Best effort: record the failure and continue with the rest.
            strategies["light_on_white"] = {"error": str(e), "ratio": 0.0}

    # ── Strategy B: diagonal_midtone ──
    if b_enabled:
        try:
            mb, db = _algo.build_watermark_mask(
                gray,
                bgr=bgr,
                mask_mode="diagonal_midtone",
                midtone_low=b_midtone_low,
                midtone_high=b_midtone_high,
                background_threshold=b_background_threshold,
                min_component_area=min_component_area,
            )
            masks.append(mb)
            strategies["diagonal_midtone"] = {
                "ratio": _coverage_ratio(mb, pixel_count),
                **{k: v for k, v in db.items() if not isinstance(v, np.ndarray)},
            }
        except Exception as e:
            strategies["diagonal_midtone"] = {"error": str(e), "ratio": 0.0}

    # ── Strategy C: background difference ──
    if c_enabled:
        try:
            mc = _build_background_diff_mask(
                gray,
                median_kernel=c_median_kernel,
                diff_low=c_diff_low,
                diff_high=c_diff_high,
                morph_open_kernel=c_morph_open_kernel,
                dilate_radius=c_dilate_radius,
            )
            masks.append(mc)
            strategies["background_diff"] = {
                "ratio": _coverage_ratio(mc, pixel_count),
                "median_kernel": c_median_kernel,
                "diff_low": c_diff_low,
                "diff_high": c_diff_high,
            }
        except Exception as e:
            strategies["background_diff"] = {"error": str(e), "ratio": 0.0}

    # ── OR fusion ──
    fused = np.zeros_like(gray, dtype=bool)
    for m in masks:
        fused |= m

    # ── Connected-component filter ──
    if min_component_area > 0 and np.any(fused):
        fused = _filter_small_components(fused, min_component_area)

    # ── Seal protection ──
    if seal_protect and bgr is not None and bgr.ndim == 3:
        # Presumably True where a seal/stamp was detected; those pixels are
        # cleared from the watermark mask.
        seal = _algo._build_seal_protect_mask(bgr)
        fused &= ~seal

    debug["fused_ratio"] = _coverage_ratio(fused, pixel_count)
    debug["n_strategies"] = len(masks)
    return fused, debug