""" 水印处理工具模块 统一管理所有水印检测与去除能力,供整个平台复用: - 图像级(扫描 PDF / 图片): detect_watermark() 检测图像中的斜向文字水印 build_watermark_mask() 构建斜向浅灰水印掩膜(方案 D) remove_watermark_masked_adaptive() 掩膜 + 动态阈值去水印 remove_watermark_from_image() 去除水印,返回灰度图 remove_watermark_from_image_rgb() 去除水印,返回 RGB 图(适合模型输入) enhance_document_contrast() 去水印后对比度/笔画深度恢复 save_watermark_removal_debug() 保存去水印前后对比调试图 - PDF 层级(文字型 PDF,保留可搜索性): scan_pdf_watermark_xobjs() 快速扫描 PDF 是否含水印 XObject(无副作用) remove_txt_pdf_watermark() 从内存 PDF bytes 去除水印,返回新 bytes 或 None """ from __future__ import annotations import json import re from pathlib import Path from typing import Any, Dict, Optional, Tuple, Union import cv2 import numpy as np from loguru import logger from PIL import Image # ───────────────────────────────────────────────────────────────────────────── # 图像级水印检测与去除 # ───────────────────────────────────────────────────────────────────────────── def detect_watermark( image: Union[np.ndarray, Image.Image], midtone_low: int = 100, midtone_high: int = 220, ratio_threshold: float = 0.03, check_diagonal: bool = True, diagonal_angle_range: tuple = (30, 60), ) -> bool: """ 检测图像中是否存在浅色斜向文字水印(银行流水类文档水印检测)。 原理: 1. 将图像转为灰度,提取「中间调」像素(midtone_low ~ midtone_high), 这些像素既不是纯白背景,也不是深黑正文,是浅灰水印的典型范围。 2. 若中间调像素占比超过 ratio_threshold,初步判定存在水印。 3. 若 check_diagonal=True,进一步用 Hough 直线变换验证中间调区域 是否呈现斜向(diagonal_angle_range 度)纹理,以排除灰色背景误报。 Args: image: 输入图像,支持 PIL.Image 或 np.ndarray(BGR/RGB/灰度)。 midtone_low: 中间调下限(默认 100),低于此视为深色正文。 midtone_high: 中间调上限(默认 220),高于此视为纯白背景。 ratio_threshold: 中间调像素占全图比例阈值(默认 0.03 即 3%)。 check_diagonal: 是否进行斜向纹理验证(默认 True)。 diagonal_angle_range: 斜向角度范围(度),默认 (30, 60),含 45° 斜水印。 Returns: True 表示检测到水印,False 表示未检测到。 """ if isinstance(image, Image.Image): pil_img = image.convert('RGB') if image.mode == 'RGBA' else image np_img = np.array(pil_img) gray = cv2.cvtColor(np_img, cv2.COLOR_RGB2GRAY) if np_img.ndim == 3 else np_img else: np_img = image gray = cv2.cvtColor(np_img, cv2.COLOR_BGR2GRAY) if np_img.ndim == 3 else np_img midtone_mask = (gray > midtone_low) & (gray < midtone_high) ratio = midtone_mask.sum() / gray.size if ratio < ratio_threshold: return False if not check_diagonal: return True midtone_uint8 = (midtone_mask.astype(np.uint8)) * 255 edges = cv2.Canny(midtone_uint8, 50, 150, apertureSize=3) lines = cv2.HoughLines(edges, rho=1, theta=np.pi / 180, threshold=80) if lines is None: return False low_rad = np.deg2rad(diagonal_angle_range[0]) high_rad = np.deg2rad(diagonal_angle_range[1]) diagonal_count = 0 for line in lines: theta = line[0][1] if low_rad <= theta <= high_rad or (np.pi - high_rad) <= theta <= (np.pi - low_rad): diagonal_count += 1 return True | False def _local_std_map(gray: np.ndarray, window: int = 5) -> np.ndarray: """局部标准差图(返回值与输入同形状)。""" gray = np.asarray(gray, dtype=np.float32) size = max(3, int(window)) kernel = np.ones((size, size), dtype=np.float32) / (size * size) mean = cv2.filter2D(gray, -1, kernel) sq_mean = cv2.filter2D(gray * gray, -1, kernel) var = sq_mean - mean * mean var = np.maximum(var, 0) return np.sqrt(var) def _line_structuring_kernel(length: int, angle_deg: float) -> np.ndarray: """生成指定角度、长度的线形结构元(用于斜向水印形态学)。""" length = max(3, int(length)) k = np.zeros((length, length), np.uint8) c = length // 2 rad = np.deg2rad(angle_deg) dx = int(round(np.cos(rad) * (c - 1))) dy = int(round(np.sin(rad) * (c - 1))) cv2.line(k, (c - dx, c - dy), (c + dx, c + dy), 1, thickness=1) return k def _line_angle_deg(x1: int, y1: int, x2: int, y2: int) -> float: """线段方向角 [0, 180)(无向)。""" ang = float(np.degrees(np.arctan2(y2 - y1, x2 - x1))) if ang < 0: ang += 180.0 return ang def _angle_in_diagonal_ranges( angle_deg: float, ranges: Tuple[Tuple[float, float], Tuple[float, float]] = ((35.0, 55.0), (125.0, 145.0)), ) -> bool: for lo, hi in ranges: if lo <= angle_deg <= hi: return True return False def _angle_distance_deg(a: float, b: float) -> float: """无向角距离 [0, 90]。""" d = abs(float(a) - float(b)) % 180.0 return min(d, 180.0 - d) def _line_length(x1: int, y1: int, x2: int, y2: int) -> float: return float(np.hypot(x2 - x1, y2 - y1)) def _find_dominant_diagonal_angles( segments: list, *, angle_ranges: Tuple[Tuple[float, float], Tuple[float, float]] = ((25.0, 65.0), (115.0, 155.0)), smooth_sigma: float = 2.0, secondary_peak_ratio: float = 0.35, ) -> Tuple[list, np.ndarray]: """ 按线段长度加权统计角度直方图,取主峰(及次峰)作为本页水印固定方向。 Returns: dominant_angles: 1~2 个主导角度(度) hist_smooth: 长度 180 的平滑直方图 """ hist = np.zeros(180, dtype=np.float64) for x1, y1, x2, y2, ang, length in segments: if not _angle_in_diagonal_ranges(ang, angle_ranges): continue hist[int(ang) % 180] += length if hist.sum() <= 0: return [], hist ksize = max(3, int(smooth_sigma * 4) | 1) hist_smooth = cv2.GaussianBlur( hist.reshape(1, 180).astype(np.float32), (ksize, 1), smooth_sigma ).flatten().astype(np.float64) peaks: list = [] for lo, hi in angle_ranges: lo_i, hi_i = int(lo), int(hi) sub = hist_smooth[lo_i : hi_i + 1] if sub.size == 0 or sub.max() <= 0: continue peak_ang = lo_i + int(sub.argmax()) peaks.append((peak_ang, float(sub.max()))) if not peaks: return [], hist_smooth peaks.sort(key=lambda x: -x[1]) dominant: list = [peaks[0][0]] for ang, val in peaks[1:]: if val >= peaks[0][1] * secondary_peak_ratio: if all(_angle_distance_deg(ang, d) > 15 for d in dominant): dominant.append(ang) return dominant, hist_smooth def _render_angle_histogram(hist: np.ndarray, dominant_angles: list) -> np.ndarray: """角度直方图 debug 图(BGR)。""" h_img, w_img = 120, 360 canvas = np.ones((h_img, w_img, 3), dtype=np.uint8) * 255 if hist.max() <= 0: return canvas norm = (hist / hist.max() * (h_img - 20)).astype(np.int32) for i, h in enumerate(norm): x = int(i * (w_img - 1) / 179) cv2.line(canvas, (x, h_img - 10), (x, h_img - 10 - int(h)), (180, 180, 180), 1) for ang in dominant_angles: x = int(ang * (w_img - 1) / 179) cv2.line(canvas, (x, 0), (x, h_img - 1), (0, 0, 255), 2) cv2.putText(canvas, "angle (deg)", (w_img // 2 - 40, h_img - 2), cv2.FONT_HERSHEY_SIMPLEX, 0.35, (0, 0, 0), 1) return canvas def _build_diag_hough_region_mask( gray: np.ndarray, *, midtone_low: int = 200, midtone_high: int = 254, canny_low: int = 30, canny_high: int = 100, hough_threshold: int = 30, min_line_length: int = 40, max_line_gap: int = 15, angle_ranges: Tuple[Tuple[float, float], Tuple[float, float]] = ((25.0, 65.0), (115.0, 155.0)), angle_tolerance: float = 5.0, use_angle_statistics: bool = True, secondary_peak_ratio: float = 0.35, min_length_percentile: float = 25.0, line_thickness: int = 10, band_dilate_radius: int = 12, ) -> Tuple[np.ndarray, Dict[str, Any]]: """ 方案 C:Canny + HoughLinesP + 角度直方图统计主峰,仅保留与本页水印方向一致的线段。 """ gray_u8 = np.asarray(gray, dtype=np.uint8) band = ((gray_u8 >= midtone_low) & (gray_u8 < midtone_high)).astype(np.uint8) * 255 edges = cv2.Canny(band, int(canny_low), int(canny_high), apertureSize=3) lines_p = cv2.HoughLinesP( edges, rho=1, theta=np.pi / 180, threshold=int(hough_threshold), minLineLength=int(min_line_length), maxLineGap=int(max_line_gap), ) line_mask = np.zeros_like(gray_u8, dtype=np.uint8) lines_all_bgr = cv2.cvtColor(gray_u8, cv2.COLOR_GRAY2BGR) lines_filt_bgr = cv2.cvtColor(gray_u8, cv2.COLOR_GRAY2BGR) diag_candidates: list = [] total_lines = 0 if lines_p is not None: for seg in lines_p: x1, y1, x2, y2 = [int(v) for v in seg[0]] total_lines += 1 ang = _line_angle_deg(x1, y1, x2, y2) length = _line_length(x1, y1, x2, y2) if not _angle_in_diagonal_ranges(ang, angle_ranges): continue diag_candidates.append((x1, y1, x2, y2, ang, length)) cv2.line(lines_all_bgr, (x1, y1), (x2, y2), (128, 128, 128), 1) dominant_angles: list = [] hist_smooth = np.zeros(180, dtype=np.float64) if use_angle_statistics and diag_candidates: dominant_angles, hist_smooth = _find_dominant_diagonal_angles( diag_candidates, angle_ranges=angle_ranges, secondary_peak_ratio=secondary_peak_ratio, ) def _angle_matches(ang: float) -> bool: if not use_angle_statistics or not dominant_angles: return True return any(_angle_distance_deg(ang, d) <= angle_tolerance for d in dominant_angles) angle_matched = [ s for s in diag_candidates if _angle_matches(s[4]) ] if angle_matched and min_length_percentile > 0: lengths = np.array([s[5] for s in angle_matched], dtype=np.float32) len_th = float(np.percentile(lengths, min_length_percentile)) angle_matched = [s for s in angle_matched if s[5] >= len_th] matched_keys = {(s[0], s[1], s[2], s[3]) for s in angle_matched} kept_lines: list = [] for x1, y1, x2, y2, ang, _length in angle_matched: kept_lines.append((x1, y1, x2, y2, ang)) cv2.line(line_mask, (x1, y1), (x2, y2), 255, thickness=int(line_thickness)) cv2.line(lines_filt_bgr, (x1, y1), (x2, y2), (0, 0, 255), 2) for x1, y1, x2, y2, _ang, _length in diag_candidates: if (x1, y1, x2, y2) not in matched_keys: cv2.line(lines_filt_bgr, (x1, y1), (x2, y2), (0, 180, 255), 1) geom = line_mask > 0 if band_dilate_radius > 0 and np.any(geom): k = cv2.getStructuringElement( cv2.MORPH_ELLIPSE, (band_dilate_radius * 2 + 1, band_dilate_radius * 2 + 1) ) geom = cv2.dilate(line_mask, k) > 0 info: Dict[str, Any] = { "hough_total_lines": total_lines, "hough_diag_candidates": len(diag_candidates), "hough_kept_lines": len(kept_lines), "dominant_angles": dominant_angles, "angle_tolerance": angle_tolerance, "geom_mask_ratio": float(geom.sum() / gray_u8.size), "hough_lines_bgr": lines_filt_bgr, "hough_lines_all_bgr": lines_all_bgr, "angle_histogram_bgr": _render_angle_histogram(hist_smooth, dominant_angles), } return geom, info def _compute_block_orientation_debug_maps( gray: np.ndarray, *, block_size: int = 48, ) -> Tuple[np.ndarray, np.ndarray]: """分块 diag/hv 弱边缘占比图(仅 debug 热力图,0~1 float)。""" gray_f = np.asarray(gray, dtype=np.float32) bs = max(4, int(block_size)) h_blocks = gray_f.shape[0] // bs w_blocks = gray_f.shape[1] // bs if h_blocks == 0 or w_blocks == 0: z = np.zeros_like(gray_f, dtype=np.float32) return z, z ph, pw = h_blocks * bs, w_blocks * bs gx = cv2.Sobel(gray_f, cv2.CV_32F, 1, 0, ksize=3) gy = cv2.Sobel(gray_f, cv2.CV_32F, 0, 1, ksize=3) mag = np.sqrt(gx * gx + gy * gy) ori = np.arctan2(gy, gx) * 180.0 / np.pi diag = ( ((ori > 25) & (ori < 65)) | ((ori > 115) & (ori < 155)) | ((ori > -155) & (ori < -115)) | ((ori > -65) & (ori < -25)) ) hv = ( ((ori > -20) & (ori < 20)) | ((ori > 160) | (ori < -160)) | ((ori > 70) & (ori < 110)) | ((ori > -110) & (ori < -70)) ) weak = (mag > 1) & (mag < 15) def _to_blocks(arr: np.ndarray) -> np.ndarray: return ( arr[:ph, :pw] .reshape(h_blocks, bs, w_blocks, bs) .transpose(0, 2, 1, 3) .reshape(h_blocks, w_blocks, -1) ) b_diag = _to_blocks(diag) b_hv = _to_blocks(hv) b_weak = _to_blocks(weak) diag_weak = np.sum(b_diag & b_weak, axis=2) hv_weak = np.sum(b_hv & b_weak, axis=2) total_weak = np.sum(b_weak, axis=2) with np.errstate(divide="ignore", invalid="ignore"): diag_ratio = np.where(total_weak > 0, diag_weak / total_weak, 0.0).astype(np.float32) hv_ratio = np.where(total_weak > 0, hv_weak / total_weak, 0.0).astype(np.float32) diag_up = np.repeat(np.repeat(diag_ratio, bs, axis=0), bs, axis=1) hv_up = np.repeat(np.repeat(hv_ratio, bs, axis=0), bs, axis=1) diag_full = np.zeros_like(gray_f, dtype=np.float32) hv_full = np.zeros_like(gray_f, dtype=np.float32) diag_full[:ph, :pw] = diag_up hv_full[:ph, :pw] = hv_up return diag_full, hv_full def render_ratio_heatmap(ratio_map: np.ndarray) -> np.ndarray: """将 0~1 浮点占比图转为 BGR 热力图。""" r = np.clip(np.asarray(ratio_map, dtype=np.float32), 0.0, 1.0) u8 = (r * 255).astype(np.uint8) return cv2.applyColorMap(u8, cv2.COLORMAP_JET) def save_watermark_mask_debug_layers( image: np.ndarray, output_dir: Union[str, Path], stem: str, debug: Dict[str, Any], *, image_format: str = "png", ) -> Dict[str, str]: """保存分层 debug 图(方案 D)。""" out_dir = Path(output_dir) out_dir.mkdir(parents=True, exist_ok=True) fmt = (image_format or "png").lstrip(".") paths: Dict[str, str] = {} def _save_overlay(name: str, mask: Optional[np.ndarray], color=(0, 0, 255)) -> None: if mask is None or not np.any(mask): return ov = render_watermark_mask_overlay(image, mask, color=color) p = out_dir / f"{stem}_{name}.{fmt}" cv2.imwrite(str(p), cv2.cvtColor(ov, cv2.COLOR_RGB2BGR) if ov.shape[2] == 3 else ov) paths[name] = str(p) _save_overlay("wm_candidate_overlay", debug.get("wm_candidate")) _save_overlay("geom_region_overlay", debug.get("geom_region"), color=(0, 180, 255)) _save_overlay("geom_candidate_overlay", debug.get("geom_candidate"), color=(0, 255, 0)) _save_overlay("wm_mask_overlay", debug.get("wm_mask"), color=(255, 0, 0)) hough_bgr = debug.get("hough_lines_bgr") if hough_bgr is not None: p = out_dir / f"{stem}_hough_lines.{fmt}" cv2.imwrite(str(p), hough_bgr) paths["hough_lines"] = str(p) hough_all = debug.get("hough_lines_all_bgr") if hough_all is not None: p = out_dir / f"{stem}_hough_lines_all.{fmt}" cv2.imwrite(str(p), hough_all) paths["hough_lines_all"] = str(p) angle_hist = debug.get("angle_histogram_bgr") if angle_hist is not None: p = out_dir / f"{stem}_angle_histogram.{fmt}" cv2.imwrite(str(p), angle_hist) paths["angle_histogram"] = str(p) diag_hm = debug.get("diag_ratio_heatmap") if diag_hm is not None: p = out_dir / f"{stem}_diag_ratio_heatmap.{fmt}" cv2.imwrite(str(p), diag_hm) paths["diag_ratio_heatmap"] = str(p) hv_hm = debug.get("hv_ratio_heatmap") if hv_hm is not None: p = out_dir / f"{stem}_hv_ratio_heatmap.{fmt}" cv2.imwrite(str(p), hv_hm) paths["hv_ratio_heatmap"] = str(p) return paths def _build_diag_region_mask( gray: np.ndarray, *, block_size: int = 48, diag_ratio_thresh: float = 0.20, light_gray_thresh: int = 238, light_ratio_thresh: float = 0.10, min_edge_count: int = 10, dilate_radius: int = 3, ) -> np.ndarray: """ 分块梯度方向检测:返回对角线方向纹理占优的区域掩膜。 原理:水印是45°斜向字符,其梯度主方向在30-60°和120-150°。 分块统计该方向弱边缘占比,高频块标记为水印候选区域。 Returns: bool ndarray, 与 gray 同形状,True=疑似斜向水印区域。 """ gray_f = np.asarray(gray, dtype=np.float32) img_h, img_w = gray_f.shape bs = max(4, int(block_size)) # Sobel 梯度 gx = cv2.Sobel(gray_f, cv2.CV_32F, 1, 0, ksize=3) gy = cv2.Sobel(gray_f, cv2.CV_32F, 0, 1, ksize=3) mag = np.sqrt(gx * gx + gy * gy) ori = np.arctan2(gy, gx) * 180.0 / np.pi # 对角线方向 (±45° 附近,即梯度 30-65° / 115-155°) diag = ( ((ori > 25) & (ori < 65)) | ((ori > 115) & (ori < 155)) | ((ori > -155) & (ori < -115)) | ((ori > -65) & (ori < -25)) ) h_blocks = img_h // bs w_blocks = img_w // bs if h_blocks == 0 or w_blocks == 0: return np.zeros_like(gray, dtype=bool) ph, pw = h_blocks * bs, w_blocks * bs # 分块统计 def _to_blocks(arr: np.ndarray) -> np.ndarray: return arr[:ph, :pw].reshape(h_blocks, bs, w_blocks, bs).transpose(0, 2, 1, 3).reshape(h_blocks, w_blocks, -1) block_mag = _to_blocks(mag) block_diag = _to_blocks(diag) block_gray = _to_blocks(gray_f) weak = (block_mag > 1) & (block_mag < 15) diag_weak = np.sum(block_diag & weak, axis=2) total_weak = np.sum(weak, axis=2) with np.errstate(divide="ignore", invalid="ignore"): diag_ratio = np.where(total_weak > 0, diag_weak / total_weak, 0.0) light_ratio = np.mean(block_gray >= light_gray_thresh, axis=2) wm_blocks = ( (diag_ratio > diag_ratio_thresh) & (light_ratio > light_ratio_thresh) & (total_weak > min_edge_count) ) # 展开为像素掩膜 wm_block_mask = np.repeat(np.repeat(wm_blocks, bs, axis=0), bs, axis=1) full_mask = np.zeros(gray_f.shape, dtype=bool) full_mask[:ph, :pw] = wm_block_mask if dilate_radius > 0: k = cv2.getStructuringElement( cv2.MORPH_ELLIPSE, (dilate_radius * 2 + 1, dilate_radius * 2 + 1) ) full_mask = cv2.dilate(full_mask.astype(np.uint8), k) > 0 return full_mask def _build_seal_protect_mask( bgr: np.ndarray, *, hue_high: int = 15, sat_min: int = 40, value_min: int = 30, ) -> np.ndarray: """红色/公章区域保护掩膜(True=保护,不置白)。""" hsv = cv2.cvtColor(bgr, cv2.COLOR_BGR2HSV) lower1 = np.array([0, sat_min, value_min], dtype=np.uint8) upper1 = np.array([hue_high, 255, 255], dtype=np.uint8) lower2 = np.array([170, sat_min, value_min], dtype=np.uint8) upper2 = np.array([180, 255, 255], dtype=np.uint8) m1 = cv2.inRange(hsv, lower1, upper1) m2 = cv2.inRange(hsv, lower2, upper2) m2 = cv2.inRange(hsv, lower2, upper2) return (m1 > 0) | (m2 > 0) def _build_text_edge_protect( gray: np.ndarray, *, edge_window: int = 5, edge_std_thresh: float = 6.0, dilate_radius: int = 1, ) -> np.ndarray: """基于局部方差的笔画边缘保护掩膜(True=保护,不置白)。""" local_std = _local_std_map(gray, window=edge_window) edge_mask = local_std >= edge_std_thresh if dilate_radius > 0: k = cv2.getStructuringElement( cv2.MORPH_ELLIPSE, (dilate_radius * 2 + 1, dilate_radius * 2 + 1) ) edge_mask = cv2.dilate(edge_mask.astype(np.uint8), k) > 0 return edge_mask.astype(bool) def _build_watermark_mask_light_on_white( gray: np.ndarray, *, bgr: Optional[np.ndarray] = None, light_gray_low: int = 236, light_gray_high: int = 253, whiten_gray_low: int = 200, text_protect_gray_max: int = 130, text_protect_percentile: Optional[float] = None, background_threshold: int = 248, morph_close_kernel: int = 0, morph_close_iter: int = 1, morph_dilate_kernel: int = 0, morph_dilate_iter: int = 1, min_component_area: int = 200, low_variance_thresh: float = 0.0, edge_window: int = 5, direction_filter: str = "hough", debug_block_maps: bool = True, debug_block_size: int = 48, hough_midtone_low: int = 200, hough_midtone_high: int = 254, hough_canny_low: int = 30, hough_canny_high: int = 100, hough_threshold: int = 25, hough_min_line_length: int = 35, hough_max_line_gap: int = 18, hough_line_thickness: int = 12, hough_band_dilate_radius: int = 14, hough_angle_tolerance: float = 5.0, hough_use_angle_statistics: bool = True, hough_secondary_peak_ratio: float = 0.35, hough_min_length_percentile: float = 25.0, diag_block_size: int = 0, diag_ratio_thresh: float = 0.20, diag_light_ratio_thresh: float = 0.10, diag_min_edge_count: int = 10, diag_dilate_radius: int = 3, seal_protect: bool = True, seal_hue_high: int = 15, seal_sat_min: int = 40, ) -> Tuple[np.ndarray, Dict[str, Any]]: """ 白底流水水印掩膜(方案 C + E)。 1. Hough 斜向线段 → geom_region(几何限定区域) 2. wm_candidate = 浅色带且非正文保护 3. wm_mask = geom_region(置白区域由几何约束;实际白化时再 g>=light_gray_low) 4. debug 输出 candidate / geom / 交集 / 热力图 """ gray_arr = np.asarray(gray) bg_th = int(background_threshold) low = int(light_gray_low) high = int(light_gray_high) if text_protect_gray_max > 0: t_protect = float(text_protect_gray_max) else: dark = gray_arr[gray_arr < min(130, bg_th)] if dark.size > 0 and text_protect_percentile is not None: t_protect = float(np.percentile(dark, text_protect_percentile)) else: t_protect = 120.0 text_protect = gray_arr <= t_protect low = max(low, int(t_protect) + 25) wm_candidate = (gray_arr >= low) & (gray_arr < high) & (~text_protect) direction = (direction_filter or "hough").lower().strip() hough_info: Dict[str, Any] = {} geom_region = np.zeros_like(gray_arr, dtype=bool) if direction == "hough": geom_region, hough_info = _build_diag_hough_region_mask( gray_arr, midtone_low=hough_midtone_low, midtone_high=hough_midtone_high, canny_low=hough_canny_low, canny_high=hough_canny_high, hough_threshold=hough_threshold, min_line_length=hough_min_line_length, max_line_gap=hough_max_line_gap, angle_tolerance=hough_angle_tolerance, use_angle_statistics=hough_use_angle_statistics, secondary_peak_ratio=hough_secondary_peak_ratio, min_length_percentile=hough_min_length_percentile, line_thickness=hough_line_thickness, band_dilate_radius=hough_band_dilate_radius, ) elif diag_block_size > 0: geom_region = _build_diag_region_mask( gray_arr, block_size=diag_block_size, diag_ratio_thresh=diag_ratio_thresh, light_gray_thresh=low, light_ratio_thresh=diag_light_ratio_thresh, min_edge_count=diag_min_edge_count, dilate_radius=diag_dilate_radius, ) geom_candidate = geom_region & wm_candidate wm_mask = geom_region.copy() if min_component_area > 0 and np.any(wm_mask): n_labels, labels, stats, _ = cv2.connectedComponentsWithStats( wm_mask.astype(np.uint8), connectivity=8 ) filtered = np.zeros_like(wm_mask) for i in range(1, n_labels): if stats[i, cv2.CC_STAT_AREA] >= min_component_area: filtered[labels == i] = True if np.any(filtered): wm_mask = filtered elif np.any(geom_region): wm_mask = geom_region seal_mask = np.zeros_like(wm_mask, dtype=bool) if seal_protect and bgr is not None and bgr.ndim == 3: seal_mask = _build_seal_protect_mask( bgr, hue_high=seal_hue_high, sat_min=seal_sat_min ) wm_mask &= ~seal_mask midtone = (gray_arr >= low) & (gray_arr < high) debug: Dict[str, Any] = { "mask_mode": "light_on_white", "direction_filter": direction, "light_gray_low": low, "light_gray_high": high, "midtone_ratio": float(midtone.sum() / gray_arr.size), "wm_candidate_ratio": float(wm_candidate.sum() / gray_arr.size), "geom_mask_ratio": float(geom_region.sum() / gray_arr.size), "geom_candidate_ratio": float(geom_candidate.sum() / gray_arr.size), "wm_mask_ratio": float(wm_mask.sum() / gray_arr.size), "T_protect": t_protect, "text_protect_gray_max": text_protect_gray_max, "text_protect": text_protect, "seal_protect": seal_mask, "wm_candidate": wm_candidate, "geom_region": geom_region, "geom_candidate": geom_candidate, "diag_region": geom_region, "wm_mask": wm_mask, "whiten_gray_low": int(whiten_gray_low), "hough_lines_bgr": hough_info.get("hough_lines_bgr"), "hough_lines_all_bgr": hough_info.get("hough_lines_all_bgr"), "angle_histogram_bgr": hough_info.get("angle_histogram_bgr"), "dominant_angles": hough_info.get("dominant_angles", []), "hough_kept_lines": hough_info.get("hough_kept_lines", 0), "hough_diag_candidates": hough_info.get("hough_diag_candidates", 0), "hough_total_lines": hough_info.get("hough_total_lines", 0), } if debug_block_maps: bs = debug_block_size if debug_block_size > 0 else 48 diag_map, hv_map = _compute_block_orientation_debug_maps(gray_arr, block_size=bs) debug["diag_ratio_heatmap"] = render_ratio_heatmap(diag_map) debug["hv_ratio_heatmap"] = render_ratio_heatmap(hv_map) return wm_mask, debug def build_watermark_mask( gray: np.ndarray, *, bgr: Optional[np.ndarray] = None, mask_mode: str = "diagonal_midtone", light_gray_low: int = 236, light_gray_high: int = 253, whiten_gray_low: int = 200, text_protect_gray_max: int = 130, morph_close_kernel: int = 0, morph_close_iter: int = 1, morph_dilate_kernel: int = 0, morph_dilate_iter: int = 1, low_variance_thresh: float = 0.0, edge_window: int = 5, direction_filter: str = "hough", debug_block_maps: bool = True, debug_block_size: int = 48, hough_midtone_low: int = 200, hough_midtone_high: int = 254, hough_canny_low: int = 30, hough_canny_high: int = 100, hough_threshold: int = 25, hough_min_line_length: int = 35, hough_max_line_gap: int = 18, hough_line_thickness: int = 12, hough_band_dilate_radius: int = 14, hough_angle_tolerance: float = 5.0, hough_use_angle_statistics: bool = True, hough_secondary_peak_ratio: float = 0.35, hough_min_length_percentile: float = 25.0, diag_block_size: int = 0, diag_ratio_thresh: float = 0.20, diag_light_ratio_thresh: float = 0.10, diag_min_edge_count: int = 10, diag_dilate_radius: int = 3, # diagonal_midtone 参数 midtone_low: int = 100, midtone_high: int = 220, remove_horizontal_vertical: bool = True, diagonal_enhance: bool = True, diagonal_kernel_length: int = 25, horizontal_kernel_length: int = 35, vertical_kernel_length: int = 35, morph_open_kernel: int = 2, dmorph_close_kernel: int = 3, min_component_area: int = 200, text_protect_percentile: float = 10.0, background_threshold: int = 248, seal_protect: bool = True, seal_hue_high: int = 15, seal_sat_min: int = 40, ) -> Tuple[np.ndarray, Dict[str, Any]]: """ 构建水印掩膜 wm_mask(True=疑似水印像素)。 mask_mode: light_on_white — Hough 斜向几何带 + 浅色白化(方案 C/E) diagonal_midtone — 中间调 + 斜向形态学(旧逻辑) """ gray = np.asarray(gray) if gray.ndim != 2: raise ValueError("build_watermark_mask expects single-channel grayscale") mode = (mask_mode or "light_on_white").lower().strip() if mode == "light_on_white": return _build_watermark_mask_light_on_white( gray, bgr=bgr, light_gray_low=light_gray_low, light_gray_high=light_gray_high, whiten_gray_low=whiten_gray_low, text_protect_gray_max=text_protect_gray_max, text_protect_percentile=text_protect_percentile, background_threshold=background_threshold, morph_close_kernel=morph_close_kernel, morph_close_iter=morph_close_iter, morph_dilate_kernel=morph_dilate_kernel, morph_dilate_iter=morph_dilate_iter, low_variance_thresh=low_variance_thresh, edge_window=edge_window, min_component_area=min_component_area, direction_filter=direction_filter, debug_block_maps=debug_block_maps, debug_block_size=debug_block_size, hough_midtone_low=hough_midtone_low, hough_midtone_high=hough_midtone_high, hough_canny_low=hough_canny_low, hough_canny_high=hough_canny_high, hough_threshold=hough_threshold, hough_min_line_length=hough_min_line_length, hough_max_line_gap=hough_max_line_gap, hough_line_thickness=hough_line_thickness, hough_band_dilate_radius=hough_band_dilate_radius, hough_angle_tolerance=hough_angle_tolerance, hough_use_angle_statistics=hough_use_angle_statistics, hough_secondary_peak_ratio=hough_secondary_peak_ratio, hough_min_length_percentile=hough_min_length_percentile, diag_block_size=diag_block_size, diag_ratio_thresh=diag_ratio_thresh, diag_light_ratio_thresh=diag_light_ratio_thresh, diag_min_edge_count=diag_min_edge_count, diag_dilate_radius=diag_dilate_radius, seal_protect=seal_protect, seal_hue_high=seal_hue_high, seal_sat_min=seal_sat_min, ) midtone = (gray > midtone_low) & (gray < midtone_high) mid_u8 = (midtone.astype(np.uint8)) * 255 horiz = np.zeros_like(midtone, dtype=bool) vert = np.zeros_like(midtone, dtype=bool) if remove_horizontal_vertical: kh = cv2.getStructuringElement( cv2.MORPH_RECT, (max(3, horizontal_kernel_length), 1) ) kv = cv2.getStructuringElement( cv2.MORPH_RECT, (1, max(3, vertical_kernel_length)) ) horiz = cv2.morphologyEx(mid_u8, cv2.MORPH_OPEN, kh) > 0 vert = cv2.morphologyEx(mid_u8, cv2.MORPH_OPEN, kv) > 0 # 中间调去掉明显横竖线(保留斜向水印) candidate = midtone & ~(horiz | vert) if diagonal_enhance: k45 = _line_structuring_kernel(diagonal_kernel_length, 45) k135 = _line_structuring_kernel(diagonal_kernel_length, 135) d45 = cv2.morphologyEx(mid_u8, cv2.MORPH_OPEN, k45) > 0 d135 = cv2.morphologyEx(mid_u8, cv2.MORPH_OPEN, k135) > 0 direction = d45 | d135 dilate_k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (7, 7)) near_diag = cv2.dilate(direction.astype(np.uint8), dilate_k) > 0 # 斜向结构足够时收窄到斜向附近;否则保留「中间调减横竖」结果 if near_diag.sum() > gray.size * 0.001: candidate = candidate & near_diag cand_u8 = (candidate.astype(np.uint8)) * 255 if morph_open_kernel > 0: k_open = cv2.getStructuringElement( cv2.MORPH_ELLIPSE, (morph_open_kernel, morph_open_kernel) ) cand_u8 = cv2.morphologyEx(cand_u8, cv2.MORPH_OPEN, k_open) if dmorph_close_kernel > 0: k_close = cv2.getStructuringElement( cv2.MORPH_ELLIPSE, (dmorph_close_kernel, dmorph_close_kernel) ) cand_u8 = cv2.morphologyEx(cand_u8, cv2.MORPH_CLOSE, k_close) wm_mask = cand_u8 > 0 if min_component_area > 0: n_labels, labels, stats, _ = cv2.connectedComponentsWithStats( wm_mask.astype(np.uint8), connectivity=8 ) filtered = np.zeros_like(wm_mask) for i in range(1, n_labels): if stats[i, cv2.CC_STAT_AREA] >= min_component_area: filtered[labels == i] = True wm_mask = filtered non_bg = gray[gray < background_threshold] if non_bg.size > 0: t_protect = float(np.percentile(non_bg, text_protect_percentile)) else: t_protect = 85.0 t_protect = max(t_protect, float(midtone_low)) text_protect = gray <= t_protect midtone_ratio = float(midtone.sum() / gray.size) wm_ratio = float(wm_mask.sum() / gray.size) # 掩膜过小:回退为「中间调减横竖」或整块中间调(满版斜纹水印常见) min_wm_ratio = max(0.005, midtone_ratio * 0.12) if wm_ratio < min_wm_ratio: relaxed = midtone & ~(horiz | vert) & (~text_protect) if relaxed.sum() / gray.size < min_wm_ratio: relaxed = midtone & (~text_protect) wm_mask = relaxed wm_ratio = float(wm_mask.sum() / gray.size) seal_mask = np.zeros_like(wm_mask, dtype=bool) if seal_protect and bgr is not None and bgr.ndim == 3: seal_mask = _build_seal_protect_mask( bgr, hue_high=seal_hue_high, sat_min=seal_sat_min ) debug: Dict[str, Any] = { "mask_mode": "diagonal_midtone", "midtone_ratio": midtone_ratio, "wm_mask_ratio": wm_ratio, "T_protect": t_protect, "text_protect": text_protect, "seal_protect": seal_mask, "midtone_mask": midtone, "wm_mask": wm_mask, } return wm_mask, debug def remove_watermark_masked_adaptive( gray: np.ndarray, *, bgr: Optional[np.ndarray] = None, mask_cfg: Optional[Dict[str, Any]] = None, adaptive_cfg: Optional[Dict[str, Any]] = None, threshold_fallback: int = 175, morph_close_kernel: int = 0, ) -> Tuple[np.ndarray, Dict[str, Any]]: """ 掩膜内置白(whiten_mode=mask_fill)或掩膜内动态阈值(threshold_in_mask)。 掩膜为空时回退全局 threshold_fallback。 """ gray = np.asarray(gray).copy() mcfg: Dict[str, Any] = { "mask_mode": "light_on_white", "light_gray_low": 236, "light_gray_high": 253, "whiten_gray_low": 200, "text_protect_gray_max": 130, "morph_close_kernel": 0, "morph_close_iter": 1, "morph_dilate_kernel": 0, "morph_dilate_iter": 1, "low_variance_thresh": 0.0, "edge_window": 5, "min_component_area": 200, "direction_filter": "hough", "debug_block_maps": True, "debug_block_size": 48, "hough_midtone_low": 200, "hough_midtone_high": 254, "hough_canny_low": 30, "hough_canny_high": 100, "hough_threshold": 25, "hough_min_line_length": 35, "hough_max_line_gap": 18, "hough_line_thickness": 12, "hough_band_dilate_radius": 14, "hough_angle_tolerance": 5.0, "hough_use_angle_statistics": True, "hough_secondary_peak_ratio": 0.35, "hough_min_length_percentile": 25.0, "diag_block_size": 0, "diag_ratio_thresh": 0.20, "diag_light_ratio_thresh": 0.10, "diag_min_edge_count": 10, "diag_dilate_radius": 3, "midtone_low": 100, "midtone_high": 220, "remove_horizontal_vertical": True, "diagonal_enhance": True, "diagonal_kernel_length": 25, "horizontal_kernel_length": 35, "vertical_kernel_length": 35, "morph_open_kernel": 2, "dmorph_close_kernel": 3, "text_protect_percentile": 10.0, "background_threshold": 248, "seal_protect": True, "seal_hue_high": 15, "seal_sat_min": 40, } mcfg.update(mask_cfg or {}) mask_mode = str(mcfg.get("mask_mode", "light_on_white")).lower().strip() # light_on_white 默认 mask_fill acfg: Dict[str, Any] = { "whiten_mode": None, "text_percentile": 10.0, "watermark_percentile": 88.0, "background_percentile": 95.0, "background_threshold": 248, "wm_margin": 12, "text_protect_max": 120, } acfg.update(adaptive_cfg or {}) whiten_mode = acfg.get("whiten_mode") if not whiten_mode: whiten_mode = ( "mask_fill" if mask_mode == "light_on_white" else "threshold_in_mask" ) whiten_mode = str(whiten_mode).lower().strip() wm_mask, debug = build_watermark_mask(gray, bgr=bgr, **mcfg) if not np.any(wm_mask): cleaned = gray.copy() cleaned[gray > threshold_fallback] = 255 debug["mode"] = "fallback_threshold" debug["threshold_fallback"] = threshold_fallback if morph_close_kernel > 0: kernel = np.ones((morph_close_kernel, morph_close_kernel), np.uint8) cleaned = cv2.morphologyEx(cleaned, cv2.MORPH_CLOSE, kernel) return cleaned, debug bg_th = int(acfg["background_threshold"]) bg_pixels = gray[gray >= bg_th] if bg_pixels.size > 0: b_level = float(np.percentile(bg_pixels, acfg["background_percentile"])) else: b_level = 250.0 if mask_mode == "light_on_white": t_protect = float(debug.get("T_protect", 150.0)) else: non_bg = gray[gray < bg_th] if non_bg.size > 0: t_protect = float(np.percentile(non_bg, acfg["text_percentile"])) else: t_protect = float(debug.get("T_protect", 85.0)) t_protect = min(t_protect, float(acfg["text_protect_max"])) t_protect = max(t_protect, float(mcfg.get("midtone_low", 100))) text_protect = debug["text_protect"] seal_protect = debug["seal_protect"] t_wm: Optional[float] = None if whiten_mode == "mask_fill": # 几何带内:g>=whiten_gray_low 置白;g<=130 正文硬保护(方案 E) wm_gray_low = float( mcfg.get("whiten_gray_low", debug.get("whiten_gray_low", 200)) ) to_white = ( wm_mask & (gray >= wm_gray_low) & (gray < int(mcfg.get("light_gray_high", 254))) & (~text_protect) & (~seal_protect) ) else: mask_vals = gray[wm_mask] if mask_vals.size > 0: t_wm = float(np.percentile(mask_vals, acfg["watermark_percentile"])) else: t_wm = t_protect + 0.45 * (b_level - t_protect) margin = float(acfg["wm_margin"]) t_wm = max(t_wm, t_protect + margin) t_wm = min(t_wm, b_level - 3.0) t_wm = min(t_wm, float(mcfg.get("midtone_high", 220)) - 5.0) to_white = wm_mask & (gray >= t_wm) & (~text_protect) & (~seal_protect) cleaned = gray.copy() cleaned[to_white] = 255 if morph_close_kernel > 0: kernel = np.ones((morph_close_kernel, morph_close_kernel), np.uint8) cleaned = cv2.morphologyEx(cleaned, cv2.MORPH_CLOSE, kernel) debug.update( { "mode": "masked_adaptive", "mask_mode": mask_mode, "whiten_mode": whiten_mode, "T_wm": t_wm, "T_protect": t_protect, "B_level": b_level, "white_pixel_ratio": float(to_white.sum() / gray.size), "threshold_fallback": threshold_fallback, } ) return cleaned, debug def _image_to_gray_and_bgr( image: Union[np.ndarray, Image.Image], ) -> Tuple[np.ndarray, Optional[np.ndarray]]: """统一为灰度 + 可选 BGR(用于掩膜公章保护)。""" if isinstance(image, Image.Image): pil_img = image.convert("RGB") if image.mode == "RGBA" else image np_img = np.array(pil_img) np_img = cv2.cvtColor(np_img, cv2.COLOR_RGB2BGR) else: np_img = image.copy() if np_img.ndim == 3: bgr = np_img gray = cv2.cvtColor(np_img, cv2.COLOR_BGR2GRAY) else: bgr = None gray = np_img return gray, bgr def _enhance_text_restore( gray: np.ndarray, *, background_threshold: int = 248, text_lo_percentile: float = 1.0, text_hi_percentile: float = 99.0, text_black_target: int = 85, ) -> np.ndarray: """ 仅对非背景像素做动态范围压缩,将最深笔画拉向 text_black_target(默认 ~85,接近扫描件原图)。 背景(>= background_threshold)保持白色,避免整图 gamma 导致背景发灰。 """ result = gray.copy() bg_th = int(np.clip(background_threshold, 200, 255)) text_mask = gray < bg_th if not np.any(text_mask): return result vals = gray[text_mask].astype(np.float32) lo = float(np.percentile(vals, text_lo_percentile)) hi = float(np.percentile(vals, text_hi_percentile)) target = int(np.clip(text_black_target, 10, 200)) if hi <= lo + 1.0: return result stretched = (vals - lo) * target / (hi - lo) result[text_mask] = np.clip(stretched, 0, 255).astype(np.uint8) return result def enhance_document_contrast( gray: np.ndarray, method: str = "text_restore", *, clip_limit: float = 2.0, tile_grid_size: int = 8, gamma: float = 0.85, black_percentile: float = 2.0, white_percentile: float = 98.0, background_threshold: int = 248, text_lo_percentile: float = 1.0, text_hi_percentile: float = 99.0, text_black_target: int = 85, ) -> np.ndarray: """ 文档灰度图对比度增强(常用于去水印后恢复笔画深度)。 Args: gray: 单通道 uint8 灰度图 method: text_restore | clahe | gamma | linear clip_limit: CLAHE 对比度限制 tile_grid_size: CLAHE 分块大小 gamma: gamma 校正指数,<1 加深文字(去水印后发浅时适用) black_percentile: linear 拉伸下分位(映射到 0) white_percentile: linear 拉伸上分位(映射到 255) background_threshold: text_restore 背景阈值(>= 视为白底不处理) text_lo_percentile: text_restore 笔画下分位 text_hi_percentile: text_restore 笔画上分位(映射到 text_black_target) text_black_target: text_restore 最深笔画目标灰度(越小越深,建议 75~95) Returns: 增强后的灰度图 """ if gray is None or gray.size == 0: return gray if gray.ndim != 2: raise ValueError("enhance_document_contrast expects single-channel grayscale image") method = (method or "text_restore").lower().strip() if method == "text_restore": return _enhance_text_restore( gray, background_threshold=background_threshold, text_lo_percentile=text_lo_percentile, text_hi_percentile=text_hi_percentile, text_black_target=text_black_target, ) if method == "gamma": gamma = max(0.1, min(float(gamma), 3.0)) inv_gamma = 1.0 / gamma table = np.array( [((i / 255.0) ** inv_gamma) * 255 for i in range(256)], dtype=np.uint8, ) return cv2.LUT(gray, table) if method == "linear": p_low = float(np.percentile(gray, black_percentile)) p_high = float(np.percentile(gray, white_percentile)) if p_high <= p_low + 1.0: return gray stretched = (gray.astype(np.float32) - p_low) * 255.0 / (p_high - p_low) return np.clip(stretched, 0, 255).astype(np.uint8) # 默认 CLAHE:局部对比度,适合扫描件 tile = max(2, int(tile_grid_size)) clahe = cv2.createCLAHE( clipLimit=max(0.1, float(clip_limit)), tileGridSize=(tile, tile), ) return clahe.apply(gray) def apply_contrast_enhancement_config( gray: np.ndarray, contrast_cfg: Optional[Dict[str, Any]], ) -> np.ndarray: """按配置字典应用对比度增强;未启用时原样返回。""" if not contrast_cfg or not contrast_cfg.get("enabled", False): return gray return enhance_document_contrast( gray, method=contrast_cfg.get("method", "text_restore"), clip_limit=contrast_cfg.get("clip_limit", 2.0), tile_grid_size=contrast_cfg.get("tile_grid_size", 8), gamma=contrast_cfg.get("gamma", 0.85), black_percentile=contrast_cfg.get("black_percentile", 2.0), white_percentile=contrast_cfg.get("white_percentile", 98.0), background_threshold=contrast_cfg.get("background_threshold", 248), text_lo_percentile=contrast_cfg.get("text_lo_percentile", 1.0), text_hi_percentile=contrast_cfg.get("text_hi_percentile", 99.0), text_black_target=contrast_cfg.get("text_black_target", 75), ) def remove_watermark_from_image( image: Union[np.ndarray, Image.Image], threshold: int = 160, morph_close_kernel: int = 2, return_pil: Optional[bool] = None, watermark_removal_cfg: Optional[Dict[str, Any]] = None, removal_debug: Optional[Dict[str, Any]] = None, ) -> Union[np.ndarray, Image.Image]: """ 去除图像中的浅色斜向文字水印,返回灰度图。 method(watermark_removal_cfg): threshold(默认): gray > threshold → 255 masked / masked_adaptive: 掩膜 + 掩膜内动态阈值 Args: image: 输入图像(PIL.Image 或 np.ndarray BGR/RGB/灰度)。 threshold: 全局阈值或掩膜失败时的回退阈值。 morph_close_kernel: 形态学闭运算核大小,0 跳过。 watermark_removal_cfg: 完整配置(含 method / mask / adaptive)。 removal_debug: 若传入 dict,写入掩膜与 T_wm 等调试字段。 Returns: 去除水印后的灰度图:PIL.Image(mode='L') 或 np.ndarray(HxW, uint8)。 """ input_is_pil = isinstance(image, Image.Image) cfg = watermark_removal_cfg or {} method = str(cfg.get("method") or "threshold").lower().strip() gray, bgr = _image_to_gray_and_bgr(image) if method in ("masked", "masked_adaptive"): cleaned, dbg = remove_watermark_masked_adaptive( gray, bgr=bgr, mask_cfg=cfg.get("mask") if isinstance(cfg.get("mask"), dict) else None, adaptive_cfg=cfg.get("adaptive") if isinstance(cfg.get("adaptive"), dict) else None, threshold_fallback=threshold, morph_close_kernel=morph_close_kernel, ) if removal_debug is not None: removal_debug.clear() removal_debug.update(dbg) else: cleaned = gray.copy() cleaned[gray > threshold] = 255 if morph_close_kernel > 0: kernel = np.ones((morph_close_kernel, morph_close_kernel), np.uint8) cleaned = cv2.morphologyEx(cleaned, cv2.MORPH_CLOSE, kernel) if removal_debug is not None: removal_debug.clear() removal_debug.update({"mode": "threshold", "threshold": threshold}) should_return_pil = input_is_pil if return_pil is None else return_pil return Image.fromarray(cleaned, mode='L') if should_return_pil else cleaned def remove_watermark_from_image_rgb( image: Union[np.ndarray, Image.Image], threshold: int = 160, morph_close_kernel: int = 2, return_pil: Optional[bool] = None, contrast_enhancement: Optional[Dict[str, Any]] = None, apply_watermark_removal: bool = True, watermark_removal_cfg: Optional[Dict[str, Any]] = None, removal_debug: Optional[Dict[str, Any]] = None, ) -> Union[np.ndarray, Image.Image]: """ 去除水印并返回 RGB 三通道图像。 与 remove_watermark_from_image 逻辑相同,但输出为 RGB(三通道), 方便直接传入布局检测、OCR 等需要彩色输入的下游模型。 Args: contrast_enhancement: 对比度增强配置(含 enabled / method 等),见 apply_contrast_enhancement_config apply_watermark_removal: False 时跳过阈值抹白,仅做对比度增强(若启用) Args/Returns: 同 remove_watermark_from_image,但输出为 RGB/BGR 三通道。 """ input_is_pil = isinstance(image, Image.Image) if apply_watermark_removal: gray_result = remove_watermark_from_image( image, threshold, morph_close_kernel, return_pil=False, watermark_removal_cfg=watermark_removal_cfg, removal_debug=removal_debug, ) else: if isinstance(image, Image.Image): np_img = np.array(image.convert("RGB")) np_img = cv2.cvtColor(np_img, cv2.COLOR_RGB2BGR) else: np_img = image.copy() gray_result = ( cv2.cvtColor(np_img, cv2.COLOR_BGR2GRAY) if np_img.ndim == 3 else np_img ) gray_result = apply_contrast_enhancement_config(gray_result, contrast_enhancement) rgb_np = cv2.cvtColor(gray_result, cv2.COLOR_GRAY2BGR) should_return_pil = input_is_pil if return_pil is None else return_pil if should_return_pil: return Image.fromarray(cv2.cvtColor(rgb_np, cv2.COLOR_BGR2RGB)) return rgb_np def render_watermark_mask_overlay( image: np.ndarray, wm_mask: np.ndarray, *, color: Tuple[int, int, int] = (0, 0, 255), alpha: float = 0.45, ) -> np.ndarray: """在原图上叠加红色半透明水印掩膜,供调试图保存。""" if image.ndim == 2: base = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR) elif image.shape[2] == 3: base = image.copy() if image.max() <= 1: base = (image * 255).astype(np.uint8) else: base = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR) overlay = base.copy() overlay[wm_mask] = color return cv2.addWeighted(base, 1.0 - alpha, overlay, alpha, 0) def _image_to_bgr_for_debug(img: np.ndarray) -> np.ndarray: """将 ndarray 转为 BGR,供 cv2.imwrite 使用。""" if img.ndim == 2: return cv2.cvtColor(img, cv2.COLOR_GRAY2BGR) out = img.copy() if out.shape[2] == 3: return cv2.cvtColor(out, cv2.COLOR_RGB2BGR) return out def save_watermark_removal_debug( before: Union[np.ndarray, Image.Image], after: Union[np.ndarray, Image.Image], output_dir: Union[str, Path], page_name: str, *, processing_params: Optional[Dict[str, Any]] = None, image_format: str = "png", save_compare: bool = True, subdir: str = "watermark_removal", mask_overlay: Optional[np.ndarray] = None, ) -> Dict[str, str]: """ 保存去水印调试图(before / after / compare / meta.json)。 与 universal_doc_parser 的 debug_comparison 目录结构一致: ``{output_dir}/debug_comparison/{subdir}/`` Args: before: 处理前图像(RGB/BGR/灰度) after: 处理后图像 output_dir: 输出根目录(通常为 pipeline 或工具的输出目录) page_name: 文件名前缀(如 ``doc_page_002``) processing_params: 写入 meta.json 的参数(threshold、contrast_enhancement 等) image_format: 图片格式,png/jpg save_compare: 是否保存左右拼接对比图 subdir: debug_comparison 下的子目录名 Returns: 已保存文件路径字典(before/after/compare/meta,未保存的键省略) """ if isinstance(before, Image.Image): before = np.array(before) if isinstance(after, Image.Image): after = np.array(after) root = Path(output_dir) debug_dir = root / "debug_comparison" / subdir debug_dir.mkdir(parents=True, exist_ok=True) fmt = (image_format or "png").lstrip(".") before_bgr = _image_to_bgr_for_debug(before) after_bgr = _image_to_bgr_for_debug(after) paths: Dict[str, str] = {} before_path = debug_dir / f"{page_name}_watermark_before.{fmt}" after_path = debug_dir / f"{page_name}_watermark_after.{fmt}" cv2.imwrite(str(before_path), before_bgr) cv2.imwrite(str(after_path), after_bgr) paths["before"] = str(before_path) paths["after"] = str(after_path) if save_compare: h = max(before_bgr.shape[0], after_bgr.shape[0]) if before_bgr.shape[0] != h: before_bgr = cv2.resize(before_bgr, (before_bgr.shape[1], h)) if after_bgr.shape[0] != h: after_bgr = cv2.resize(after_bgr, (after_bgr.shape[1], h)) compare = np.hstack([before_bgr, after_bgr]) compare_path = debug_dir / f"{page_name}_watermark_compare.{fmt}" cv2.imwrite(str(compare_path), compare) paths["compare"] = str(compare_path) logger.info(f"Saved watermark compare: {compare_path}") if mask_overlay is not None: mask_bgr = _image_to_bgr_for_debug(mask_overlay) mask_path = debug_dir / f"{page_name}_watermark_mask.{fmt}" cv2.imwrite(str(mask_path), mask_bgr) paths["mask"] = str(mask_path) meta: Dict[str, Any] = {"page_name": page_name} if processing_params: _skip_meta = ( "midtone_mask", "wm_mask", "wm_candidate", "geom_region", "geom_candidate", "diag_region", "text_protect", "seal_protect", "hough_lines_bgr", "diag_ratio_heatmap", "hv_ratio_heatmap", ) meta_params = { k: v for k, v in processing_params.items() if k not in _skip_meta } meta.update(meta_params) else: meta.update({}) meta["before"] = paths["before"] meta["after"] = paths["after"] if "compare" in paths: meta["compare"] = paths["compare"] meta_path = debug_dir / f"{page_name}_watermark_meta.json" meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8") paths["meta"] = str(meta_path) logger.info(f"Saved watermark debug: {before_path}, {after_path}") return paths # ───────────────────────────────────────────────────────────────────────────── # PDF 层级水印去除(文字型 PDF,保留可搜索性) # ───────────────────────────────────────────────────────────────────────────── def _is_watermark_xobj(doc, xref: int, obj_str: str) -> bool: """ 判断一个 Form XObject 是否为水印。 启发式规则(满足其一即视为水印): 1. 含旋转变换矩阵(cm 指令 sin/cos 分量非零),无论是否有 /Group 2. 有透明度组(/Group)且内容流包含透明度操作符(ca/CA) 3. 有透明度组且内容流体积 > 2KB(大量重复绘图 = 平铺水印) """ if "/Form" not in obj_str: return False try: stream = doc.xref_stream(xref) if not stream: return False stream_text = stream.decode("latin-1", errors="ignore") except Exception: return False has_group = "/Group" in obj_str cm_pattern = re.compile( r"([-\d.]+)\s+([-\d.]+)\s+([-\d.]+)\s+([-\d.]+)\s+[-\d.]+\s+[-\d.]+\s+cm" ) for m in cm_pattern.finditer(stream_text): a, b, c, d = float(m.group(1)), float(m.group(2)), float(m.group(3)), float(m.group(4)) if abs(b) > 0.1 or abs(c) > 0.1: return True if not has_group: return False if re.search(r'\b(ca|CA)\s+[0-9.]+', stream_text) or re.search(r'[0-9.]+\s+(ca|CA)\b', stream_text): return True if len(stream_text) > 2048: return True return False def _is_watermark_image_xobj(doc, xref: int, obj_str: str) -> bool: """ 判断一个 Image XObject 是否为水印背景图。 判断规则(全部满足): 1. /Subtype /Image 2. 有 /SMask(半透明) 3. 宽 >= 600 且 高 >= 800(全页尺寸,排除小图标) 4. 解码后像素均值 >= 240(近乎全白,水印文字稀疏) """ if "/Image" not in obj_str or "/SMask" not in obj_str: return False w_m = re.search(r'/Width\s+(\d+)', obj_str) h_m = re.search(r'/Height\s+(\d+)', obj_str) if not w_m or not h_m: return False if int(w_m.group(1)) < 600 or int(h_m.group(1)) < 800: return False try: from io import BytesIO img_info = doc.extract_image(xref) pil_img = Image.open(BytesIO(img_info["image"])).convert("L") return float(np.array(pil_img).mean()) >= 240.0 except Exception: return False def _blank_watermark_image(doc, img_xref: int) -> None: """ 将水印 Image XObject 的 RGB 流和 SMask 替换为全白/全不透明。 关键点:必须先移除 /DecodeParms(Predictor 11),再调用 update_stream。 否则渲染器在 FlateDecode 之后还会尝试 Predictor 解码,失败后回退原始数据, 水印依然可见。 """ obj_str = doc.xref_object(img_xref) w_m = re.search(r'/Width\s+(\d+)', obj_str) h_m = re.search(r'/Height\s+(\d+)', obj_str) w = int(w_m.group(1)) if w_m else 1 h = int(h_m.group(1)) if h_m else 1 cs_m = re.search(r'/ColorSpace\s+/Device(RGB|Gray|CMYK)', obj_str) channels = {'RGB': 3, 'CMYK': 4}.get(cs_m.group(1) if cs_m else '', 1) doc.xref_set_key(img_xref, "DecodeParms", "null") doc.update_stream(img_xref, bytes([255]) * (w * h * channels)) smask_m = re.search(r'/SMask\s+(\d+)\s+0\s+R', obj_str) if smask_m: smask_xref = int(smask_m.group(1)) smask_obj = doc.xref_object(smask_xref) sw = int(m.group(1)) if (m := re.search(r'/Width\s+(\d+)', smask_obj)) else w sh = int(m.group(1)) if (m := re.search(r'/Height\s+(\d+)', smask_obj)) else h doc.xref_set_key(smask_xref, "DecodeParms", "null") doc.update_stream(smask_xref, bytes([255]) * (sw * sh)) def scan_pdf_watermark_xobjs(pdf_bytes: bytes, sample_pages: int = 3) -> bool: """ 快速扫描 PDF 前 N 页,判断是否含水印 XObject。 无副作用(只读),用于在执行去水印前快速判断,避免对无水印的大文件 执行全量扫描和序列化,显著降低财报等大文件的处理开销。 Args: pdf_bytes: PDF 文件的原始字节。 sample_pages: 扫描页数上限,默认 3(银行流水通常前几页有水印)。 Returns: True 表示发现水印 XObject,False 表示未发现。 """ try: import fitz except ImportError: return False doc = fitz.open(stream=pdf_bytes, filetype="pdf") pages_to_check = min(sample_pages, len(doc)) try: for i in range(pages_to_check): page = doc[i] for xref, *_ in page.get_xobjects(): try: obj_str = doc.xref_object(xref) except Exception: continue if _is_watermark_xobj(doc, xref, obj_str): return True for img_tuple in page.get_images(full=True): try: obj_str = doc.xref_object(img_tuple[0]) except Exception: continue if _is_watermark_image_xobj(doc, img_tuple[0], obj_str): return True finally: doc.close() return False def remove_txt_pdf_watermark(pdf_bytes: bytes) -> Optional[bytes]: """ 对文字型 PDF 执行原生水印去除,完全在内存中完成,不写临时文件。 支持两种水印形式: - Form XObject 水印:清空内容流 - Image XObject 水印(全页背景图 + SMask 透明通道):替换为全白像素 适用场景:pdf_type='txt' 的 PDF,去除后可直接传给渲染层(tobytes() → bytes)。 对于大文件(如财报),建议先用 scan_pdf_watermark_xobjs() 快速判断再调用本函数。 Args: pdf_bytes: 原始 PDF 的字节内容。 Returns: 去除水印后的 PDF bytes(garbage=4 压缩);若未发现水印返回 None。 """ try: import fitz except ImportError: raise ImportError("请安装 PyMuPDF: pip install PyMuPDF") from loguru import logger doc = fitz.open(stream=pdf_bytes, filetype="pdf") processed_xrefs: set[int] = set() total_removed = 0 for page in doc: # ── Form XObject 水印 ───────────────────────────────────────── for xref, name, _invoker, _unused in page.get_xobjects(): if xref in processed_xrefs: continue try: obj_str = doc.xref_object(xref) except Exception: continue if _is_watermark_xobj(doc, xref, obj_str): try: doc.update_stream(xref, b"") processed_xrefs.add(xref) total_removed += 1 logger.debug(f" [Form XObject] 清空水印 xref={xref}, name={name}") except Exception as e: logger.warning(f" 清空 Form XObject xref={xref} 失败: {e}") # ── Image XObject 水印 ──────────────────────────────────────── for img_tuple in page.get_images(full=True): img_xref = img_tuple[0] if img_xref in processed_xrefs: continue try: obj_str = doc.xref_object(img_xref) except Exception: continue if _is_watermark_image_xobj(doc, img_xref, obj_str): _blank_watermark_image(doc, img_xref) processed_xrefs.add(img_xref) total_removed += 1 logger.debug(f" [Image XObject] 替换水印图像 xref={img_xref}") if total_removed == 0: doc.close() return None result = doc.tobytes(garbage=4, deflate=True) doc.close() logger.info(f"✅ PDF 层级水印去除:共清除 {total_removed} 个水印 XObject") return result