| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- """
- 方案3:多策略融合水印 mask。
- 在现有 light_on_white / diagonal_midtone 基础上新增背景差异策略,
- 三种策略结果做 OR 融合,覆盖更多水印类型。
- 用法:
- from watermark_lab.fused_mask import build_fused_watermark_mask
- mask, debug = build_fused_watermark_mask(gray, bgr=bgr)
- """
- from __future__ import annotations
- import sys
- from pathlib import Path
- from typing import Any, Dict, Optional
- import cv2
- import numpy as np
- _repo_root = Path(__file__).resolve().parents[3]
- if str(_repo_root) not in sys.path:
- sys.path.insert(0, str(_repo_root))
- import ocr_utils.watermark.algorithms as _algo
- # ── 策略C:背景差异残差 ──────────────────────────────────────────
- def _build_background_diff_mask(
- gray: np.ndarray,
- *,
- median_kernel: int = 31,
- diff_low: float = 6.0,
- diff_high: float = 50.0,
- morph_open_kernel: int = 3,
- dilate_radius: int = 0,
- ) -> np.ndarray:
- """
- 大核中值滤波估计背景 → 残差 → 提取半透明水印纹理。
- 原理:
- - 大核 medianBlur 抹掉文字(高频),保留背景(低频) + 水印(中低频)
- - 残差 |original - bg| 里,正文差异大(>50),水印差异中等(6~50),纯背景差异小(<6)
- - 取 6~50 范围的残差作为水印候选
- Args:
- gray: 灰度图 ndarray (H, W)
- median_kernel: 中值滤波核大小,越大抹文字越干净但水印也可能被模糊
- diff_low: 残差下限
- diff_high: 残差上限
- morph_open_kernel: 形态学开运算核大小(去噪点)
- dilate_radius: 膨胀半径(连接碎片水印)
- Returns:
- bool ndarray (H, W),True=疑似水印
- """
- gray_f = np.asarray(gray, dtype=np.float32)
- ksize = max(3, int(median_kernel)) | 1 # 确保奇数
- bg = cv2.medianBlur(gray, ksize).astype(np.float32)
- diff = cv2.absdiff(gray_f, bg)
- mask = (diff > diff_low) & (diff < diff_high)
- if morph_open_kernel > 0 and np.any(mask):
- k = cv2.getStructuringElement(
- cv2.MORPH_ELLIPSE, (morph_open_kernel, morph_open_kernel)
- )
- mask = cv2.morphologyEx(mask.astype(np.uint8), cv2.MORPH_OPEN, k) > 0
- if dilate_radius > 0 and np.any(mask):
- k = cv2.getStructuringElement(
- cv2.MORPH_ELLIPSE, (dilate_radius * 2 + 1, dilate_radius * 2 + 1)
- )
- mask = cv2.dilate(mask.astype(np.uint8), k) > 0
- return mask
- # ── 融合主函数 ──────────────────────────────────────────────────
- def build_fused_watermark_mask(
- gray: np.ndarray,
- *,
- bgr: Optional[np.ndarray] = None,
- # ── 策略A: light_on_white ──
- a_enabled: bool = True,
- a_light_gray_low: int = 220,
- a_light_gray_high: int = 254,
- a_direction_filter: str = "hough",
- a_text_protect_gray_max: int = 130,
- a_min_component_area: int = 200,
- # ── 策略B: diagonal_midtone ──
- b_enabled: bool = True,
- b_midtone_low: int = 100,
- b_midtone_high: int = 230,
- b_background_threshold: int = 248,
- # ── 策略C: 背景差异 ──
- c_enabled: bool = True,
- c_median_kernel: int = 31,
- c_diff_low: float = 6.0,
- c_diff_high: float = 50.0,
- c_morph_open_kernel: int = 3,
- c_dilate_radius: int = 0,
- # ── 通用后处理 ──
- min_component_area: int = 200,
- seal_protect: bool = True,
- ) -> tuple[np.ndarray, Dict[str, Any]]:
- """
- 多策略融合水印 mask。
- 三种策略 OR 融合 → 连通域过滤 → 公章保护 → 最终 mask。
- Returns:
- mask: bool ndarray (H, W)
- debug: 包含每项策略的中间结果和统计信息
- """
- masks: list[np.ndarray] = []
- debug: Dict[str, Any] = {"strategies": {}, "fused_ratio": None}
- # ── 策略A: light_on_white ──
- if a_enabled:
- try:
- ma, da = _algo.build_watermark_mask(
- gray,
- bgr=bgr,
- mask_mode="light_on_white",
- light_gray_low=a_light_gray_low,
- light_gray_high=a_light_gray_high,
- direction_filter=a_direction_filter,
- text_protect_gray_max=a_text_protect_gray_max,
- min_component_area=a_min_component_area,
- seal_protect=seal_protect,
- )
- masks.append(ma)
- debug["strategies"]["light_on_white"] = {
- "ratio": float(ma.sum() / gray.size),
- **{k: v for k, v in da.items() if not isinstance(v, np.ndarray)},
- }
- except Exception as e:
- debug["strategies"]["light_on_white"] = {"error": str(e), "ratio": 0.0}
- # ── 策略B: diagonal_midtone ──
- if b_enabled:
- try:
- mb, db = _algo.build_watermark_mask(
- gray,
- bgr=bgr,
- mask_mode="diagonal_midtone",
- midtone_low=b_midtone_low,
- midtone_high=b_midtone_high,
- background_threshold=b_background_threshold,
- min_component_area=min_component_area,
- )
- masks.append(mb)
- debug["strategies"]["diagonal_midtone"] = {
- "ratio": float(mb.sum() / gray.size),
- **{k: v for k, v in db.items() if not isinstance(v, np.ndarray)},
- }
- except Exception as e:
- debug["strategies"]["diagonal_midtone"] = {"error": str(e), "ratio": 0.0}
- # ── 策略C: 背景差异 ──
- if c_enabled:
- try:
- mc = _build_background_diff_mask(
- gray,
- median_kernel=c_median_kernel,
- diff_low=c_diff_low,
- diff_high=c_diff_high,
- morph_open_kernel=c_morph_open_kernel,
- dilate_radius=c_dilate_radius,
- )
- masks.append(mc)
- debug["strategies"]["background_diff"] = {
- "ratio": float(mc.sum() / gray.size),
- "median_kernel": c_median_kernel,
- "diff_low": c_diff_low,
- "diff_high": c_diff_high,
- }
- except Exception as e:
- debug["strategies"]["background_diff"] = {"error": str(e), "ratio": 0.0}
- # ── OR 融合 ──
- fused = np.zeros_like(gray, dtype=bool)
- for m in masks:
- fused |= m
- # ── 连通域过滤 ──
- if min_component_area > 0 and np.any(fused):
- n_labels, labels, stats, _ = cv2.connectedComponentsWithStats(
- fused.astype(np.uint8), connectivity=8
- )
- filtered = np.zeros_like(fused)
- for i in range(1, n_labels):
- if stats[i, cv2.CC_STAT_AREA] >= min_component_area:
- filtered[labels == i] = True
- if np.any(filtered):
- fused = filtered
- # ── 公章保护 ──
- if seal_protect and bgr is not None and bgr.ndim == 3:
- seal = _algo._build_seal_protect_mask(bgr)
- fused &= ~seal
- debug["fused_ratio"] = float(fused.sum() / gray.size)
- debug["n_strategies"] = len(masks)
- return fused, debug
|