"""水印 掩膜与去水印算法(由 ocr_utils.watermark_utils 迁入)。""" from __future__ import annotations import json import re from pathlib import Path from typing import Any, Dict, Optional, Tuple, Union import cv2 import numpy as np from loguru import logger from PIL import Image def detect_watermark( image: Union[np.ndarray, Image.Image], midtone_low: int = 100, midtone_high: int = 220, ratio_threshold: float = 0.03, check_diagonal: bool = True, diagonal_angle_range: tuple = (30, 60), ) -> bool: """ 检测图像中是否存在浅色斜向文字水印(银行流水类文档水印检测)。 原理: 1. 将图像转为灰度,提取「中间调」像素(midtone_low ~ midtone_high), 这些像素既不是纯白背景,也不是深黑正文,是浅灰水印的典型范围。 2. 若中间调像素占比超过 ratio_threshold,初步判定存在水印。 3. 若 check_diagonal=True,进一步用 Hough 直线变换验证中间调区域 是否呈现斜向(diagonal_angle_range 度)纹理,以排除灰色背景误报。 Args: image: 输入图像,支持 PIL.Image 或 np.ndarray(BGR/RGB/灰度)。 midtone_low: 中间调下限(默认 100),低于此视为深色正文。 midtone_high: 中间调上限(默认 220),高于此视为纯白背景。 ratio_threshold: 中间调像素占全图比例阈值(默认 0.03 即 3%)。 check_diagonal: 是否进行斜向纹理验证(默认 True)。 diagonal_angle_range: 斜向角度范围(度),默认 (30, 60),含 45° 斜水印。 Returns: True 表示检测到水印,False 表示未检测到。 """ if isinstance(image, Image.Image): pil_img = image.convert('RGB') if image.mode == 'RGBA' else image np_img = np.array(pil_img) gray = cv2.cvtColor(np_img, cv2.COLOR_RGB2GRAY) if np_img.ndim == 3 else np_img else: np_img = image gray = cv2.cvtColor(np_img, cv2.COLOR_BGR2GRAY) if np_img.ndim == 3 else np_img midtone_mask = (gray > midtone_low) & (gray < midtone_high) ratio = midtone_mask.sum() / gray.size if ratio < ratio_threshold: return False if not check_diagonal: return True midtone_uint8 = (midtone_mask.astype(np.uint8)) * 255 edges = cv2.Canny(midtone_uint8, 50, 150, apertureSize=3) lines = cv2.HoughLines(edges, rho=1, theta=np.pi / 180, threshold=80) if lines is None: return False low_rad = np.deg2rad(diagonal_angle_range[0]) high_rad = np.deg2rad(diagonal_angle_range[1]) diagonal_count = 0 for line in lines: theta = line[0][1] if low_rad <= theta <= high_rad or (np.pi - high_rad) <= theta <= (np.pi - low_rad): diagonal_count += 1 return diagonal_count > 0 def _local_std_map(gray: np.ndarray, window: int = 5) -> np.ndarray: """局部标准差图(返回值与输入同形状)。""" gray = np.asarray(gray, dtype=np.float32) size = max(3, int(window)) kernel = np.ones((size, size), dtype=np.float32) / (size * size) mean = cv2.filter2D(gray, -1, kernel) sq_mean = cv2.filter2D(gray * gray, -1, kernel) var = sq_mean - mean * mean var = np.maximum(var, 0) return np.sqrt(var) def _line_structuring_kernel(length: int, angle_deg: float) -> np.ndarray: """生成指定角度、长度的线形结构元(用于斜向水印形态学)。""" length = max(3, int(length)) k = np.zeros((length, length), np.uint8) c = length // 2 rad = np.deg2rad(angle_deg) dx = int(round(np.cos(rad) * (c - 1))) dy = int(round(np.sin(rad) * (c - 1))) cv2.line(k, (c - dx, c - dy), (c + dx, c + dy), 1, thickness=1) return k def _line_angle_deg(x1: int, y1: int, x2: int, y2: int) -> float: """线段方向角 [0, 180)(无向)。""" ang = float(np.degrees(np.arctan2(y2 - y1, x2 - x1))) if ang < 0: ang += 180.0 return ang def _angle_in_diagonal_ranges( angle_deg: float, ranges: Tuple[Tuple[float, float], Tuple[float, float]] = ((35.0, 55.0), (125.0, 145.0)), ) -> bool: for lo, hi in ranges: if lo <= angle_deg <= hi: return True return False def _angle_distance_deg(a: float, b: float) -> float: """无向角距离 [0, 90]。""" d = abs(float(a) - float(b)) % 180.0 return min(d, 180.0 - d) def _line_length(x1: int, y1: int, x2: int, y2: int) -> float: return float(np.hypot(x2 - x1, y2 - y1)) def _find_dominant_diagonal_angles( segments: list, *, angle_ranges: Tuple[Tuple[float, float], Tuple[float, float]] = ((25.0, 65.0), (115.0, 155.0)), smooth_sigma: float = 2.0, secondary_peak_ratio: float = 0.35, ) -> Tuple[list, np.ndarray]: """ 按线段长度加权统计角度直方图,取主峰(及次峰)作为本页水印固定方向。 Returns: dominant_angles: 1~2 个主导角度(度) hist_smooth: 长度 180 的平滑直方图 """ hist = np.zeros(180, dtype=np.float64) for x1, y1, x2, y2, ang, length in segments: if not _angle_in_diagonal_ranges(ang, angle_ranges): continue hist[int(ang) % 180] += length if hist.sum() <= 0: return [], hist ksize = max(3, int(smooth_sigma * 4) | 1) hist_smooth = cv2.GaussianBlur( hist.reshape(1, 180).astype(np.float32), (ksize, 1), smooth_sigma ).flatten().astype(np.float64) peaks: list = [] for lo, hi in angle_ranges: lo_i, hi_i = int(lo), int(hi) sub = hist_smooth[lo_i : hi_i + 1] if sub.size == 0 or sub.max() <= 0: continue peak_ang = lo_i + int(sub.argmax()) peaks.append((peak_ang, float(sub.max()))) if not peaks: return [], hist_smooth peaks.sort(key=lambda x: -x[1]) dominant: list = [peaks[0][0]] for ang, val in peaks[1:]: if val >= peaks[0][1] * secondary_peak_ratio: if all(_angle_distance_deg(ang, d) > 15 for d in dominant): dominant.append(ang) return dominant, hist_smooth def _render_angle_histogram(hist: np.ndarray, dominant_angles: list) -> np.ndarray: """角度直方图 debug 图(BGR)。""" h_img, w_img = 120, 360 canvas = np.ones((h_img, w_img, 3), dtype=np.uint8) * 255 if hist.max() <= 0: return canvas norm = (hist / hist.max() * (h_img - 20)).astype(np.int32) for i, h in enumerate(norm): x = int(i * (w_img - 1) / 179) cv2.line(canvas, (x, h_img - 10), (x, h_img - 10 - int(h)), (180, 180, 180), 1) for ang in dominant_angles: x = int(ang * (w_img - 1) / 179) cv2.line(canvas, (x, 0), (x, h_img - 1), (0, 0, 255), 2) cv2.putText(canvas, "angle (deg)", (w_img // 2 - 40, h_img - 2), cv2.FONT_HERSHEY_SIMPLEX, 0.35, (0, 0, 0), 1) return canvas def _build_diag_hough_region_mask( gray: np.ndarray, *, midtone_low: int = 200, midtone_high: int = 254, canny_low: int = 30, canny_high: int = 100, hough_threshold: int = 30, min_line_length: int = 40, max_line_gap: int = 15, angle_ranges: Tuple[Tuple[float, float], Tuple[float, float]] = ((25.0, 65.0), (115.0, 155.0)), angle_tolerance: float = 5.0, use_angle_statistics: bool = True, secondary_peak_ratio: float = 0.35, min_length_percentile: float = 25.0, line_thickness: int = 10, band_dilate_radius: int = 12, ) -> Tuple[np.ndarray, Dict[str, Any]]: """ 方案 C:Canny + HoughLinesP + 角度直方图统计主峰,仅保留与本页水印方向一致的线段。 """ gray_u8 = np.asarray(gray, dtype=np.uint8) band = ((gray_u8 >= midtone_low) & (gray_u8 < midtone_high)).astype(np.uint8) * 255 edges = cv2.Canny(band, int(canny_low), int(canny_high), apertureSize=3) lines_p = cv2.HoughLinesP( edges, rho=1, theta=np.pi / 180, threshold=int(hough_threshold), minLineLength=int(min_line_length), maxLineGap=int(max_line_gap), ) line_mask = np.zeros_like(gray_u8, dtype=np.uint8) lines_all_bgr = cv2.cvtColor(gray_u8, cv2.COLOR_GRAY2BGR) lines_filt_bgr = cv2.cvtColor(gray_u8, cv2.COLOR_GRAY2BGR) diag_candidates: list = [] total_lines = 0 if lines_p is not None: for seg in lines_p: x1, y1, x2, y2 = [int(v) for v in seg[0]] total_lines += 1 ang = _line_angle_deg(x1, y1, x2, y2) length = _line_length(x1, y1, x2, y2) if not _angle_in_diagonal_ranges(ang, angle_ranges): continue diag_candidates.append((x1, y1, x2, y2, ang, length)) cv2.line(lines_all_bgr, (x1, y1), (x2, y2), (128, 128, 128), 1) dominant_angles: list = [] hist_smooth = np.zeros(180, dtype=np.float64) if use_angle_statistics and diag_candidates: dominant_angles, hist_smooth = _find_dominant_diagonal_angles( diag_candidates, angle_ranges=angle_ranges, secondary_peak_ratio=secondary_peak_ratio, ) def _angle_matches(ang: float) -> bool: if not use_angle_statistics or not dominant_angles: return True return any(_angle_distance_deg(ang, d) <= angle_tolerance for d in dominant_angles) angle_matched = [ s for s in diag_candidates if _angle_matches(s[4]) ] if angle_matched and min_length_percentile > 0: lengths = np.array([s[5] for s in angle_matched], dtype=np.float32) len_th = float(np.percentile(lengths, min_length_percentile)) angle_matched = [s for s in angle_matched if s[5] >= len_th] matched_keys = {(s[0], s[1], s[2], s[3]) for s in angle_matched} kept_lines: list = [] for x1, y1, x2, y2, ang, _length in angle_matched: kept_lines.append((x1, y1, x2, y2, ang)) cv2.line(line_mask, (x1, y1), (x2, y2), 255, thickness=int(line_thickness)) cv2.line(lines_filt_bgr, (x1, y1), (x2, y2), (0, 0, 255), 2) for x1, y1, x2, y2, _ang, _length in diag_candidates: if (x1, y1, x2, y2) not in matched_keys: cv2.line(lines_filt_bgr, (x1, y1), (x2, y2), (0, 180, 255), 1) geom = line_mask > 0 if band_dilate_radius > 0 and np.any(geom): k = cv2.getStructuringElement( cv2.MORPH_ELLIPSE, (band_dilate_radius * 2 + 1, band_dilate_radius * 2 + 1) ) geom = cv2.dilate(line_mask, k) > 0 info: Dict[str, Any] = { "hough_total_lines": total_lines, "hough_diag_candidates": len(diag_candidates), "hough_kept_lines": len(kept_lines), "dominant_angles": dominant_angles, "angle_tolerance": angle_tolerance, "geom_mask_ratio": float(geom.sum() / gray_u8.size), "hough_lines_bgr": lines_filt_bgr, "hough_lines_all_bgr": lines_all_bgr, "angle_histogram_bgr": _render_angle_histogram(hist_smooth, dominant_angles), } return geom, info def _compute_block_orientation_debug_maps( gray: np.ndarray, *, block_size: int = 48, ) -> Tuple[np.ndarray, np.ndarray]: """分块 diag/hv 弱边缘占比图(仅 debug 热力图,0~1 float)。""" gray_f = np.asarray(gray, dtype=np.float32) bs = max(4, int(block_size)) h_blocks = gray_f.shape[0] // bs w_blocks = gray_f.shape[1] // bs if h_blocks == 0 or w_blocks == 0: z = np.zeros_like(gray_f, dtype=np.float32) return z, z ph, pw = h_blocks * bs, w_blocks * bs gx = cv2.Sobel(gray_f, cv2.CV_32F, 1, 0, ksize=3) gy = cv2.Sobel(gray_f, cv2.CV_32F, 0, 1, ksize=3) mag = np.sqrt(gx * gx + gy * gy) ori = np.arctan2(gy, gx) * 180.0 / np.pi diag = ( ((ori > 25) & (ori < 65)) | ((ori > 115) & (ori < 155)) | ((ori > -155) & (ori < -115)) | ((ori > -65) & (ori < -25)) ) hv = ( ((ori > -20) & (ori < 20)) | ((ori > 160) | (ori < -160)) | ((ori > 70) & (ori < 110)) | ((ori > -110) & (ori < -70)) ) weak = (mag > 1) & (mag < 15) def _to_blocks(arr: np.ndarray) -> np.ndarray: return ( arr[:ph, :pw] .reshape(h_blocks, bs, w_blocks, bs) .transpose(0, 2, 1, 3) .reshape(h_blocks, w_blocks, -1) ) b_diag = _to_blocks(diag) b_hv = _to_blocks(hv) b_weak = _to_blocks(weak) diag_weak = np.sum(b_diag & b_weak, axis=2) hv_weak = np.sum(b_hv & b_weak, axis=2) total_weak = np.sum(b_weak, axis=2) with np.errstate(divide="ignore", invalid="ignore"): diag_ratio = np.where(total_weak > 0, diag_weak / total_weak, 0.0).astype(np.float32) hv_ratio = np.where(total_weak > 0, hv_weak / total_weak, 0.0).astype(np.float32) diag_up = np.repeat(np.repeat(diag_ratio, bs, axis=0), bs, axis=1) hv_up = np.repeat(np.repeat(hv_ratio, bs, axis=0), bs, axis=1) diag_full = np.zeros_like(gray_f, dtype=np.float32) hv_full = np.zeros_like(gray_f, dtype=np.float32) diag_full[:ph, :pw] = diag_up hv_full[:ph, :pw] = hv_up return diag_full, hv_full def render_ratio_heatmap(ratio_map: np.ndarray) -> np.ndarray: """将 0~1 浮点占比图转为 BGR 热力图。""" r = np.clip(np.asarray(ratio_map, dtype=np.float32), 0.0, 1.0) u8 = (r * 255).astype(np.uint8) return cv2.applyColorMap(u8, cv2.COLORMAP_JET) def save_watermark_mask_debug_layers( image: np.ndarray, output_dir: Union[str, Path], stem: str, debug: Dict[str, Any], *, image_format: str = "png", ) -> Dict[str, str]: """保存分层 debug 图(方案 D)。""" out_dir = Path(output_dir) out_dir.mkdir(parents=True, exist_ok=True) fmt = (image_format or "png").lstrip(".") paths: Dict[str, str] = {} def _save_overlay(name: str, mask: Optional[np.ndarray], color=(0, 0, 255)) -> None: if mask is None or not np.any(mask): return from ocr_utils.watermark.removal import render_watermark_mask_overlay ov = render_watermark_mask_overlay(image, mask, color=color) p = out_dir / f"{stem}_{name}.{fmt}" cv2.imwrite(str(p), cv2.cvtColor(ov, cv2.COLOR_RGB2BGR) if ov.shape[2] == 3 else ov) paths[name] = str(p) _save_overlay("wm_candidate_overlay", debug.get("wm_candidate")) _save_overlay("geom_region_overlay", debug.get("geom_region"), color=(0, 180, 255)) _save_overlay("geom_candidate_overlay", debug.get("geom_candidate"), color=(0, 255, 0)) _save_overlay("wm_mask_overlay", debug.get("wm_mask"), color=(255, 0, 0)) hough_bgr = debug.get("hough_lines_bgr") if hough_bgr is not None: p = out_dir / f"{stem}_hough_lines.{fmt}" cv2.imwrite(str(p), hough_bgr) paths["hough_lines"] = str(p) hough_all = debug.get("hough_lines_all_bgr") if hough_all is not None: p = out_dir / f"{stem}_hough_lines_all.{fmt}" cv2.imwrite(str(p), hough_all) paths["hough_lines_all"] = str(p) angle_hist = debug.get("angle_histogram_bgr") if angle_hist is not None: p = out_dir / f"{stem}_angle_histogram.{fmt}" cv2.imwrite(str(p), angle_hist) paths["angle_histogram"] = str(p) diag_hm = debug.get("diag_ratio_heatmap") if diag_hm is not None: p = out_dir / f"{stem}_diag_ratio_heatmap.{fmt}" cv2.imwrite(str(p), diag_hm) paths["diag_ratio_heatmap"] = str(p) hv_hm = debug.get("hv_ratio_heatmap") if hv_hm is not None: p = out_dir / f"{stem}_hv_ratio_heatmap.{fmt}" cv2.imwrite(str(p), hv_hm) paths["hv_ratio_heatmap"] = str(p) return paths def _build_diag_region_mask( gray: np.ndarray, *, block_size: int = 48, diag_ratio_thresh: float = 0.20, light_gray_thresh: int = 238, light_ratio_thresh: float = 0.10, min_edge_count: int = 10, dilate_radius: int = 3, ) -> np.ndarray: """ 分块梯度方向检测:返回对角线方向纹理占优的区域掩膜。 原理:水印是45°斜向字符,其梯度主方向在30-60°和120-150°。 分块统计该方向弱边缘占比,高频块标记为水印候选区域。 Returns: bool ndarray, 与 gray 同形状,True=疑似斜向水印区域。 """ gray_f = np.asarray(gray, dtype=np.float32) img_h, img_w = gray_f.shape bs = max(4, int(block_size)) # Sobel 梯度 gx = cv2.Sobel(gray_f, cv2.CV_32F, 1, 0, ksize=3) gy = cv2.Sobel(gray_f, cv2.CV_32F, 0, 1, ksize=3) mag = np.sqrt(gx * gx + gy * gy) ori = np.arctan2(gy, gx) * 180.0 / np.pi # 对角线方向 (±45° 附近,即梯度 30-65° / 115-155°) diag = ( ((ori > 25) & (ori < 65)) | ((ori > 115) & (ori < 155)) | ((ori > -155) & (ori < -115)) | ((ori > -65) & (ori < -25)) ) h_blocks = img_h // bs w_blocks = img_w // bs if h_blocks == 0 or w_blocks == 0: return np.zeros_like(gray, dtype=bool) ph, pw = h_blocks * bs, w_blocks * bs # 分块统计 def _to_blocks(arr: np.ndarray) -> np.ndarray: return arr[:ph, :pw].reshape(h_blocks, bs, w_blocks, bs).transpose(0, 2, 1, 3).reshape(h_blocks, w_blocks, -1) block_mag = _to_blocks(mag) block_diag = _to_blocks(diag) block_gray = _to_blocks(gray_f) weak = (block_mag > 1) & (block_mag < 15) diag_weak = np.sum(block_diag & weak, axis=2) total_weak = np.sum(weak, axis=2) with np.errstate(divide="ignore", invalid="ignore"): diag_ratio = np.where(total_weak > 0, diag_weak / total_weak, 0.0) light_ratio = np.mean(block_gray >= light_gray_thresh, axis=2) wm_blocks = ( (diag_ratio > diag_ratio_thresh) & (light_ratio > light_ratio_thresh) & (total_weak > min_edge_count) ) # 展开为像素掩膜 wm_block_mask = np.repeat(np.repeat(wm_blocks, bs, axis=0), bs, axis=1) full_mask = np.zeros(gray_f.shape, dtype=bool) full_mask[:ph, :pw] = wm_block_mask if dilate_radius > 0: k = cv2.getStructuringElement( cv2.MORPH_ELLIPSE, (dilate_radius * 2 + 1, dilate_radius * 2 + 1) ) full_mask = cv2.dilate(full_mask.astype(np.uint8), k) > 0 return full_mask def _build_seal_protect_mask( bgr: np.ndarray, *, hue_high: int = 15, sat_min: int = 40, value_min: int = 30, ) -> np.ndarray: """红色/公章区域保护掩膜(True=保护,不置白)。""" hsv = cv2.cvtColor(bgr, cv2.COLOR_BGR2HSV) lower1 = np.array([0, sat_min, value_min], dtype=np.uint8) upper1 = np.array([hue_high, 255, 255], dtype=np.uint8) lower2 = np.array([170, sat_min, value_min], dtype=np.uint8) upper2 = np.array([180, 255, 255], dtype=np.uint8) m1 = cv2.inRange(hsv, lower1, upper1) m2 = cv2.inRange(hsv, lower2, upper2) m2 = cv2.inRange(hsv, lower2, upper2) return (m1 > 0) | (m2 > 0) def _build_text_edge_protect( gray: np.ndarray, *, edge_window: int = 5, edge_std_thresh: float = 6.0, dilate_radius: int = 1, ) -> np.ndarray: """基于局部方差的笔画边缘保护掩膜(True=保护,不置白)。""" local_std = _local_std_map(gray, window=edge_window) edge_mask = local_std >= edge_std_thresh if dilate_radius > 0: k = cv2.getStructuringElement( cv2.MORPH_ELLIPSE, (dilate_radius * 2 + 1, dilate_radius * 2 + 1) ) edge_mask = cv2.dilate(edge_mask.astype(np.uint8), k) > 0 return edge_mask.astype(bool) def _build_watermark_mask_light_on_white( gray: np.ndarray, *, bgr: Optional[np.ndarray] = None, light_gray_low: int = 236, light_gray_high: int = 253, whiten_gray_low: int = 200, text_protect_gray_max: int = 130, text_protect_percentile: Optional[float] = None, background_threshold: int = 248, morph_close_kernel: int = 0, morph_close_iter: int = 1, morph_dilate_kernel: int = 0, morph_dilate_iter: int = 1, min_component_area: int = 200, low_variance_thresh: float = 0.0, edge_window: int = 5, direction_filter: str = "hough", debug_block_maps: bool = True, debug_block_size: int = 48, hough_midtone_low: int = 200, hough_midtone_high: int = 254, hough_canny_low: int = 30, hough_canny_high: int = 100, hough_threshold: int = 25, hough_min_line_length: int = 35, hough_max_line_gap: int = 18, hough_line_thickness: int = 12, hough_band_dilate_radius: int = 14, hough_angle_tolerance: float = 5.0, hough_use_angle_statistics: bool = True, hough_secondary_peak_ratio: float = 0.35, hough_min_length_percentile: float = 25.0, diag_block_size: int = 0, diag_ratio_thresh: float = 0.20, diag_light_ratio_thresh: float = 0.10, diag_min_edge_count: int = 10, diag_dilate_radius: int = 3, seal_protect: bool = True, seal_hue_high: int = 15, seal_sat_min: int = 40, ) -> Tuple[np.ndarray, Dict[str, Any]]: """ 白底流水水印掩膜(方案 C + E)。 1. Hough 斜向线段 → geom_region(几何限定区域) 2. wm_candidate = 浅色带且非正文保护 3. wm_mask = geom_region(置白区域由几何约束;实际白化时再 g>=light_gray_low) 4. debug 输出 candidate / geom / 交集 / 热力图 """ gray_arr = np.asarray(gray) bg_th = int(background_threshold) low = int(light_gray_low) high = int(light_gray_high) if text_protect_gray_max > 0: t_protect = float(text_protect_gray_max) else: dark = gray_arr[gray_arr < min(130, bg_th)] if dark.size > 0 and text_protect_percentile is not None: t_protect = float(np.percentile(dark, text_protect_percentile)) else: t_protect = 120.0 text_protect = gray_arr <= t_protect low = max(low, int(t_protect) + 25) wm_candidate = (gray_arr >= low) & (gray_arr < high) & (~text_protect) direction = (direction_filter or "hough").lower().strip() hough_info: Dict[str, Any] = {} geom_region = np.zeros_like(gray_arr, dtype=bool) if direction == "hough": geom_region, hough_info = _build_diag_hough_region_mask( gray_arr, midtone_low=hough_midtone_low, midtone_high=hough_midtone_high, canny_low=hough_canny_low, canny_high=hough_canny_high, hough_threshold=hough_threshold, min_line_length=hough_min_line_length, max_line_gap=hough_max_line_gap, angle_tolerance=hough_angle_tolerance, use_angle_statistics=hough_use_angle_statistics, secondary_peak_ratio=hough_secondary_peak_ratio, min_length_percentile=hough_min_length_percentile, line_thickness=hough_line_thickness, band_dilate_radius=hough_band_dilate_radius, ) elif diag_block_size > 0: geom_region = _build_diag_region_mask( gray_arr, block_size=diag_block_size, diag_ratio_thresh=diag_ratio_thresh, light_gray_thresh=low, light_ratio_thresh=diag_light_ratio_thresh, min_edge_count=diag_min_edge_count, dilate_radius=diag_dilate_radius, ) geom_candidate = geom_region & wm_candidate wm_mask = geom_region.copy() if min_component_area > 0 and np.any(wm_mask): n_labels, labels, stats, _ = cv2.connectedComponentsWithStats( wm_mask.astype(np.uint8), connectivity=8 ) filtered = np.zeros_like(wm_mask) for i in range(1, n_labels): if stats[i, cv2.CC_STAT_AREA] >= min_component_area: filtered[labels == i] = True if np.any(filtered): wm_mask = filtered elif np.any(geom_region): wm_mask = geom_region seal_mask = np.zeros_like(wm_mask, dtype=bool) if seal_protect and bgr is not None and bgr.ndim == 3: seal_mask = _build_seal_protect_mask( bgr, hue_high=seal_hue_high, sat_min=seal_sat_min ) wm_mask &= ~seal_mask midtone = (gray_arr >= low) & (gray_arr < high) debug: Dict[str, Any] = { "mask_mode": "light_on_white", "direction_filter": direction, "light_gray_low": low, "light_gray_high": high, "midtone_ratio": float(midtone.sum() / gray_arr.size), "wm_candidate_ratio": float(wm_candidate.sum() / gray_arr.size), "geom_mask_ratio": float(geom_region.sum() / gray_arr.size), "geom_candidate_ratio": float(geom_candidate.sum() / gray_arr.size), "wm_mask_ratio": float(wm_mask.sum() / gray_arr.size), "T_protect": t_protect, "text_protect_gray_max": text_protect_gray_max, "text_protect": text_protect, "seal_protect": seal_mask, "wm_candidate": wm_candidate, "geom_region": geom_region, "geom_candidate": geom_candidate, "diag_region": geom_region, "wm_mask": wm_mask, "whiten_gray_low": int(whiten_gray_low), "hough_lines_bgr": hough_info.get("hough_lines_bgr"), "hough_lines_all_bgr": hough_info.get("hough_lines_all_bgr"), "angle_histogram_bgr": hough_info.get("angle_histogram_bgr"), "dominant_angles": hough_info.get("dominant_angles", []), "hough_kept_lines": hough_info.get("hough_kept_lines", 0), "hough_diag_candidates": hough_info.get("hough_diag_candidates", 0), "hough_total_lines": hough_info.get("hough_total_lines", 0), } if debug_block_maps: bs = debug_block_size if debug_block_size > 0 else 48 diag_map, hv_map = _compute_block_orientation_debug_maps(gray_arr, block_size=bs) debug["diag_ratio_heatmap"] = render_ratio_heatmap(diag_map) debug["hv_ratio_heatmap"] = render_ratio_heatmap(hv_map) return wm_mask, debug def build_watermark_mask( gray: np.ndarray, *, bgr: Optional[np.ndarray] = None, mask_mode: str = "diagonal_midtone", light_gray_low: int = 236, light_gray_high: int = 253, whiten_gray_low: int = 200, text_protect_gray_max: int = 130, morph_close_kernel: int = 0, morph_close_iter: int = 1, morph_dilate_kernel: int = 0, morph_dilate_iter: int = 1, low_variance_thresh: float = 0.0, edge_window: int = 5, direction_filter: str = "hough", debug_block_maps: bool = True, debug_block_size: int = 48, hough_midtone_low: int = 200, hough_midtone_high: int = 254, hough_canny_low: int = 30, hough_canny_high: int = 100, hough_threshold: int = 25, hough_min_line_length: int = 35, hough_max_line_gap: int = 18, hough_line_thickness: int = 12, hough_band_dilate_radius: int = 14, hough_angle_tolerance: float = 5.0, hough_use_angle_statistics: bool = True, hough_secondary_peak_ratio: float = 0.35, hough_min_length_percentile: float = 25.0, diag_block_size: int = 0, diag_ratio_thresh: float = 0.20, diag_light_ratio_thresh: float = 0.10, diag_min_edge_count: int = 10, diag_dilate_radius: int = 3, # diagonal_midtone 参数 midtone_low: int = 100, midtone_high: int = 220, remove_horizontal_vertical: bool = True, diagonal_enhance: bool = True, diagonal_kernel_length: int = 25, horizontal_kernel_length: int = 35, vertical_kernel_length: int = 35, morph_open_kernel: int = 2, dmorph_close_kernel: int = 3, min_component_area: int = 200, text_protect_percentile: float = 10.0, background_threshold: int = 248, seal_protect: bool = True, seal_hue_high: int = 15, seal_sat_min: int = 40, ) -> Tuple[np.ndarray, Dict[str, Any]]: """ 构建水印掩膜 wm_mask(True=疑似水印像素)。 mask_mode: light_on_white — Hough 斜向几何带 + 浅色白化(方案 C/E) diagonal_midtone — 中间调 + 斜向形态学(旧逻辑) """ gray = np.asarray(gray) if gray.ndim != 2: raise ValueError("build_watermark_mask expects single-channel grayscale") mode = (mask_mode or "light_on_white").lower().strip() if mode == "light_on_white": return _build_watermark_mask_light_on_white( gray, bgr=bgr, light_gray_low=light_gray_low, light_gray_high=light_gray_high, whiten_gray_low=whiten_gray_low, text_protect_gray_max=text_protect_gray_max, text_protect_percentile=text_protect_percentile, background_threshold=background_threshold, morph_close_kernel=morph_close_kernel, morph_close_iter=morph_close_iter, morph_dilate_kernel=morph_dilate_kernel, morph_dilate_iter=morph_dilate_iter, low_variance_thresh=low_variance_thresh, edge_window=edge_window, min_component_area=min_component_area, direction_filter=direction_filter, debug_block_maps=debug_block_maps, debug_block_size=debug_block_size, hough_midtone_low=hough_midtone_low, hough_midtone_high=hough_midtone_high, hough_canny_low=hough_canny_low, hough_canny_high=hough_canny_high, hough_threshold=hough_threshold, hough_min_line_length=hough_min_line_length, hough_max_line_gap=hough_max_line_gap, hough_line_thickness=hough_line_thickness, hough_band_dilate_radius=hough_band_dilate_radius, hough_angle_tolerance=hough_angle_tolerance, hough_use_angle_statistics=hough_use_angle_statistics, hough_secondary_peak_ratio=hough_secondary_peak_ratio, hough_min_length_percentile=hough_min_length_percentile, diag_block_size=diag_block_size, diag_ratio_thresh=diag_ratio_thresh, diag_light_ratio_thresh=diag_light_ratio_thresh, diag_min_edge_count=diag_min_edge_count, diag_dilate_radius=diag_dilate_radius, seal_protect=seal_protect, seal_hue_high=seal_hue_high, seal_sat_min=seal_sat_min, ) midtone = (gray > midtone_low) & (gray < midtone_high) mid_u8 = (midtone.astype(np.uint8)) * 255 horiz = np.zeros_like(midtone, dtype=bool) vert = np.zeros_like(midtone, dtype=bool) if remove_horizontal_vertical: kh = cv2.getStructuringElement( cv2.MORPH_RECT, (max(3, horizontal_kernel_length), 1) ) kv = cv2.getStructuringElement( cv2.MORPH_RECT, (1, max(3, vertical_kernel_length)) ) horiz = cv2.morphologyEx(mid_u8, cv2.MORPH_OPEN, kh) > 0 vert = cv2.morphologyEx(mid_u8, cv2.MORPH_OPEN, kv) > 0 # 中间调去掉明显横竖线(保留斜向水印) candidate = midtone & ~(horiz | vert) if diagonal_enhance: k45 = _line_structuring_kernel(diagonal_kernel_length, 45) k135 = _line_structuring_kernel(diagonal_kernel_length, 135) d45 = cv2.morphologyEx(mid_u8, cv2.MORPH_OPEN, k45) > 0 d135 = cv2.morphologyEx(mid_u8, cv2.MORPH_OPEN, k135) > 0 direction = d45 | d135 dilate_k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (7, 7)) near_diag = cv2.dilate(direction.astype(np.uint8), dilate_k) > 0 # 斜向结构足够时收窄到斜向附近;否则保留「中间调减横竖」结果 if near_diag.sum() > gray.size * 0.001: candidate = candidate & near_diag cand_u8 = (candidate.astype(np.uint8)) * 255 if morph_open_kernel > 0: k_open = cv2.getStructuringElement( cv2.MORPH_ELLIPSE, (morph_open_kernel, morph_open_kernel) ) cand_u8 = cv2.morphologyEx(cand_u8, cv2.MORPH_OPEN, k_open) if dmorph_close_kernel > 0: k_close = cv2.getStructuringElement( cv2.MORPH_ELLIPSE, (dmorph_close_kernel, dmorph_close_kernel) ) cand_u8 = cv2.morphologyEx(cand_u8, cv2.MORPH_CLOSE, k_close) wm_mask = cand_u8 > 0 if min_component_area > 0: n_labels, labels, stats, _ = cv2.connectedComponentsWithStats( wm_mask.astype(np.uint8), connectivity=8 ) filtered = np.zeros_like(wm_mask) for i in range(1, n_labels): if stats[i, cv2.CC_STAT_AREA] >= min_component_area: filtered[labels == i] = True wm_mask = filtered non_bg = gray[gray < background_threshold] if non_bg.size > 0: t_protect = float(np.percentile(non_bg, text_protect_percentile)) else: t_protect = 85.0 t_protect = max(t_protect, float(midtone_low)) text_protect = gray <= t_protect midtone_ratio = float(midtone.sum() / gray.size) wm_ratio = float(wm_mask.sum() / gray.size) # 掩膜过小:回退为「中间调减横竖」或整块中间调(满版斜纹水印常见) min_wm_ratio = max(0.005, midtone_ratio * 0.12) if wm_ratio < min_wm_ratio: relaxed = midtone & ~(horiz | vert) & (~text_protect) if relaxed.sum() / gray.size < min_wm_ratio: relaxed = midtone & (~text_protect) wm_mask = relaxed wm_ratio = float(wm_mask.sum() / gray.size) seal_mask = np.zeros_like(wm_mask, dtype=bool) if seal_protect and bgr is not None and bgr.ndim == 3: seal_mask = _build_seal_protect_mask( bgr, hue_high=seal_hue_high, sat_min=seal_sat_min ) debug: Dict[str, Any] = { "mask_mode": "diagonal_midtone", "midtone_ratio": midtone_ratio, "wm_mask_ratio": wm_ratio, "T_protect": t_protect, "text_protect": text_protect, "seal_protect": seal_mask, "midtone_mask": midtone, "wm_mask": wm_mask, } return wm_mask, debug def remove_watermark_masked_adaptive( gray: np.ndarray, *, bgr: Optional[np.ndarray] = None, mask_cfg: Optional[Dict[str, Any]] = None, adaptive_cfg: Optional[Dict[str, Any]] = None, threshold_fallback: int = 175, morph_close_kernel: int = 0, ) -> Tuple[np.ndarray, Dict[str, Any]]: """ 掩膜内置白(whiten_mode=mask_fill)或掩膜内动态阈值(threshold_in_mask)。 掩膜为空时回退全局 threshold_fallback。 """ gray = np.asarray(gray).copy() mcfg: Dict[str, Any] = { "mask_mode": "light_on_white", "light_gray_low": 236, "light_gray_high": 253, "whiten_gray_low": 200, "text_protect_gray_max": 130, "morph_close_kernel": 0, "morph_close_iter": 1, "morph_dilate_kernel": 0, "morph_dilate_iter": 1, "low_variance_thresh": 0.0, "edge_window": 5, "min_component_area": 200, "direction_filter": "hough", "debug_block_maps": True, "debug_block_size": 48, "hough_midtone_low": 200, "hough_midtone_high": 254, "hough_canny_low": 30, "hough_canny_high": 100, "hough_threshold": 25, "hough_min_line_length": 35, "hough_max_line_gap": 18, "hough_line_thickness": 12, "hough_band_dilate_radius": 14, "hough_angle_tolerance": 5.0, "hough_use_angle_statistics": True, "hough_secondary_peak_ratio": 0.35, "hough_min_length_percentile": 25.0, "diag_block_size": 0, "diag_ratio_thresh": 0.20, "diag_light_ratio_thresh": 0.10, "diag_min_edge_count": 10, "diag_dilate_radius": 3, "midtone_low": 100, "midtone_high": 220, "remove_horizontal_vertical": True, "diagonal_enhance": True, "diagonal_kernel_length": 25, "horizontal_kernel_length": 35, "vertical_kernel_length": 35, "morph_open_kernel": 2, "dmorph_close_kernel": 3, "text_protect_percentile": 10.0, "background_threshold": 248, "seal_protect": True, "seal_hue_high": 15, "seal_sat_min": 40, } mcfg.update(mask_cfg or {}) mask_mode = str(mcfg.get("mask_mode", "light_on_white")).lower().strip() # light_on_white 默认 mask_fill acfg: Dict[str, Any] = { "whiten_mode": None, "text_percentile": 10.0, "watermark_percentile": 88.0, "background_percentile": 95.0, "background_threshold": 248, "wm_margin": 12, "text_protect_max": 120, } acfg.update(adaptive_cfg or {}) whiten_mode = acfg.get("whiten_mode") if not whiten_mode: whiten_mode = ( "mask_fill" if mask_mode == "light_on_white" else "threshold_in_mask" ) whiten_mode = str(whiten_mode).lower().strip() wm_mask, debug = build_watermark_mask(gray, bgr=bgr, **mcfg) if not np.any(wm_mask): cleaned = gray.copy() cleaned[gray > threshold_fallback] = 255 debug["mode"] = "fallback_threshold" debug["threshold_fallback"] = threshold_fallback if morph_close_kernel > 0: kernel = np.ones((morph_close_kernel, morph_close_kernel), np.uint8) cleaned = cv2.morphologyEx(cleaned, cv2.MORPH_CLOSE, kernel) return cleaned, debug bg_th = int(acfg["background_threshold"]) bg_pixels = gray[gray >= bg_th] if bg_pixels.size > 0: b_level = float(np.percentile(bg_pixels, acfg["background_percentile"])) else: b_level = 250.0 if mask_mode == "light_on_white": t_protect = float(debug.get("T_protect", 150.0)) else: non_bg = gray[gray < bg_th] if non_bg.size > 0: t_protect = float(np.percentile(non_bg, acfg["text_percentile"])) else: t_protect = float(debug.get("T_protect", 85.0)) t_protect = min(t_protect, float(acfg["text_protect_max"])) t_protect = max(t_protect, float(mcfg.get("midtone_low", 100))) text_protect = debug["text_protect"] seal_protect = debug["seal_protect"] t_wm: Optional[float] = None if whiten_mode == "mask_fill": # 几何带内:g>=whiten_gray_low 置白;g<=130 正文硬保护(方案 E) wm_gray_low = float( mcfg.get("whiten_gray_low", debug.get("whiten_gray_low", 200)) ) to_white = ( wm_mask & (gray >= wm_gray_low) & (gray < int(mcfg.get("light_gray_high", 254))) & (~text_protect) & (~seal_protect) ) else: mask_vals = gray[wm_mask] if mask_vals.size > 0: t_wm = float(np.percentile(mask_vals, acfg["watermark_percentile"])) else: t_wm = t_protect + 0.45 * (b_level - t_protect) margin = float(acfg["wm_margin"]) t_wm = max(t_wm, t_protect + margin) t_wm = min(t_wm, b_level - 3.0) t_wm = min(t_wm, float(mcfg.get("midtone_high", 220)) - 5.0) to_white = wm_mask & (gray >= t_wm) & (~text_protect) & (~seal_protect) cleaned = gray.copy() cleaned[to_white] = 255 if morph_close_kernel > 0: kernel = np.ones((morph_close_kernel, morph_close_kernel), np.uint8) cleaned = cv2.morphologyEx(cleaned, cv2.MORPH_CLOSE, kernel) debug.update( { "mode": "masked_adaptive", "mask_mode": mask_mode, "whiten_mode": whiten_mode, "T_wm": t_wm, "T_protect": t_protect, "B_level": b_level, "white_pixel_ratio": float(to_white.sum() / gray.size), "threshold_fallback": threshold_fallback, } ) return cleaned, debug def _image_to_gray_and_bgr( image: Union[np.ndarray, Image.Image], ) -> Tuple[np.ndarray, Optional[np.ndarray]]: """统一为灰度 + 可选 BGR(用于掩膜公章保护)。""" if isinstance(image, Image.Image): pil_img = image.convert("RGB") if image.mode == "RGBA" else image np_img = np.array(pil_img) np_img = cv2.cvtColor(np_img, cv2.COLOR_RGB2BGR) else: np_img = image.copy() if np_img.ndim == 3: bgr = np_img gray = cv2.cvtColor(np_img, cv2.COLOR_BGR2GRAY) else: bgr = None gray = np_img return gray, bgr