| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095 |
- """水印 掩膜与去水印算法(由 ocr_utils.watermark_utils 迁入)。"""
- from __future__ import annotations
- import json
- import re
- from pathlib import Path
- from typing import Any, Dict, Optional, Tuple, Union
- import cv2
- import numpy as np
- from loguru import logger
- from PIL import Image
- def detect_watermark(
- image: Union[np.ndarray, Image.Image],
- midtone_low: int = 100,
- midtone_high: int = 220,
- ratio_threshold: float = 0.03,
- check_diagonal: bool = True,
- diagonal_angle_range: tuple = (30, 60),
- ) -> bool:
- """
- 检测图像中是否存在浅色斜向文字水印(银行流水类文档水印检测)。
- 原理:
- 1. 将图像转为灰度,提取「中间调」像素(midtone_low ~ midtone_high),
- 这些像素既不是纯白背景,也不是深黑正文,是浅灰水印的典型范围。
- 2. 若中间调像素占比超过 ratio_threshold,初步判定存在水印。
- 3. 若 check_diagonal=True,进一步用 Hough 直线变换验证中间调区域
- 是否呈现斜向(diagonal_angle_range 度)纹理,以排除灰色背景误报。
- Args:
- image: 输入图像,支持 PIL.Image 或 np.ndarray(BGR/RGB/灰度)。
- midtone_low: 中间调下限(默认 100),低于此视为深色正文。
- midtone_high: 中间调上限(默认 220),高于此视为纯白背景。
- ratio_threshold: 中间调像素占全图比例阈值(默认 0.03 即 3%)。
- check_diagonal: 是否进行斜向纹理验证(默认 True)。
- diagonal_angle_range: 斜向角度范围(度),默认 (30, 60),含 45° 斜水印。
- Returns:
- True 表示检测到水印,False 表示未检测到。
- """
- if isinstance(image, Image.Image):
- pil_img = image.convert('RGB') if image.mode == 'RGBA' else image
- np_img = np.array(pil_img)
- gray = cv2.cvtColor(np_img, cv2.COLOR_RGB2GRAY) if np_img.ndim == 3 else np_img
- else:
- np_img = image
- gray = cv2.cvtColor(np_img, cv2.COLOR_BGR2GRAY) if np_img.ndim == 3 else np_img
- midtone_mask = (gray > midtone_low) & (gray < midtone_high)
- ratio = midtone_mask.sum() / gray.size
- if ratio < ratio_threshold:
- return False
- if not check_diagonal:
- return True
- midtone_uint8 = (midtone_mask.astype(np.uint8)) * 255
- edges = cv2.Canny(midtone_uint8, 50, 150, apertureSize=3)
- lines = cv2.HoughLines(edges, rho=1, theta=np.pi / 180, threshold=80)
- if lines is None:
- return False
- low_rad = np.deg2rad(diagonal_angle_range[0])
- high_rad = np.deg2rad(diagonal_angle_range[1])
- diagonal_count = 0
- for line in lines:
- theta = line[0][1]
- if low_rad <= theta <= high_rad or (np.pi - high_rad) <= theta <= (np.pi - low_rad):
- diagonal_count += 1
- return diagonal_count > 0
- def _local_std_map(gray: np.ndarray, window: int = 5) -> np.ndarray:
- """局部标准差图(返回值与输入同形状)。"""
- gray = np.asarray(gray, dtype=np.float32)
- size = max(3, int(window))
- kernel = np.ones((size, size), dtype=np.float32) / (size * size)
- mean = cv2.filter2D(gray, -1, kernel)
- sq_mean = cv2.filter2D(gray * gray, -1, kernel)
- var = sq_mean - mean * mean
- var = np.maximum(var, 0)
- return np.sqrt(var)
- def _line_structuring_kernel(length: int, angle_deg: float) -> np.ndarray:
- """生成指定角度、长度的线形结构元(用于斜向水印形态学)。"""
- length = max(3, int(length))
- k = np.zeros((length, length), np.uint8)
- c = length // 2
- rad = np.deg2rad(angle_deg)
- dx = int(round(np.cos(rad) * (c - 1)))
- dy = int(round(np.sin(rad) * (c - 1)))
- cv2.line(k, (c - dx, c - dy), (c + dx, c + dy), 1, thickness=1)
- return k
- def _line_angle_deg(x1: int, y1: int, x2: int, y2: int) -> float:
- """线段方向角 [0, 180)(无向)。"""
- ang = float(np.degrees(np.arctan2(y2 - y1, x2 - x1)))
- if ang < 0:
- ang += 180.0
- return ang
- def _angle_in_diagonal_ranges(
- angle_deg: float,
- ranges: Tuple[Tuple[float, float], Tuple[float, float]] = ((35.0, 55.0), (125.0, 145.0)),
- ) -> bool:
- for lo, hi in ranges:
- if lo <= angle_deg <= hi:
- return True
- return False
- def _angle_distance_deg(a: float, b: float) -> float:
- """无向角距离 [0, 90]。"""
- d = abs(float(a) - float(b)) % 180.0
- return min(d, 180.0 - d)
- def _line_length(x1: int, y1: int, x2: int, y2: int) -> float:
- return float(np.hypot(x2 - x1, y2 - y1))
- def _find_dominant_diagonal_angles(
- segments: list,
- *,
- angle_ranges: Tuple[Tuple[float, float], Tuple[float, float]] = ((25.0, 65.0), (115.0, 155.0)),
- smooth_sigma: float = 2.0,
- secondary_peak_ratio: float = 0.35,
- ) -> Tuple[list, np.ndarray]:
- """
- 按线段长度加权统计角度直方图,取主峰(及次峰)作为本页水印固定方向。
- Returns:
- dominant_angles: 1~2 个主导角度(度)
- hist_smooth: 长度 180 的平滑直方图
- """
- hist = np.zeros(180, dtype=np.float64)
- for x1, y1, x2, y2, ang, length in segments:
- if not _angle_in_diagonal_ranges(ang, angle_ranges):
- continue
- hist[int(ang) % 180] += length
- if hist.sum() <= 0:
- return [], hist
- ksize = max(3, int(smooth_sigma * 4) | 1)
- hist_smooth = cv2.GaussianBlur(
- hist.reshape(1, 180).astype(np.float32), (ksize, 1), smooth_sigma
- ).flatten().astype(np.float64)
- peaks: list = []
- for lo, hi in angle_ranges:
- lo_i, hi_i = int(lo), int(hi)
- sub = hist_smooth[lo_i : hi_i + 1]
- if sub.size == 0 or sub.max() <= 0:
- continue
- peak_ang = lo_i + int(sub.argmax())
- peaks.append((peak_ang, float(sub.max())))
- if not peaks:
- return [], hist_smooth
- peaks.sort(key=lambda x: -x[1])
- dominant: list = [peaks[0][0]]
- for ang, val in peaks[1:]:
- if val >= peaks[0][1] * secondary_peak_ratio:
- if all(_angle_distance_deg(ang, d) > 15 for d in dominant):
- dominant.append(ang)
- return dominant, hist_smooth
- def _render_angle_histogram(hist: np.ndarray, dominant_angles: list) -> np.ndarray:
- """角度直方图 debug 图(BGR)。"""
- h_img, w_img = 120, 360
- canvas = np.ones((h_img, w_img, 3), dtype=np.uint8) * 255
- if hist.max() <= 0:
- return canvas
- norm = (hist / hist.max() * (h_img - 20)).astype(np.int32)
- for i, h in enumerate(norm):
- x = int(i * (w_img - 1) / 179)
- cv2.line(canvas, (x, h_img - 10), (x, h_img - 10 - int(h)), (180, 180, 180), 1)
- for ang in dominant_angles:
- x = int(ang * (w_img - 1) / 179)
- cv2.line(canvas, (x, 0), (x, h_img - 1), (0, 0, 255), 2)
- cv2.putText(canvas, "angle (deg)", (w_img // 2 - 40, h_img - 2), cv2.FONT_HERSHEY_SIMPLEX, 0.35, (0, 0, 0), 1)
- return canvas
- def _build_diag_hough_region_mask(
- gray: np.ndarray,
- *,
- midtone_low: int = 200,
- midtone_high: int = 254,
- canny_low: int = 30,
- canny_high: int = 100,
- hough_threshold: int = 30,
- min_line_length: int = 40,
- max_line_gap: int = 15,
- angle_ranges: Tuple[Tuple[float, float], Tuple[float, float]] = ((25.0, 65.0), (115.0, 155.0)),
- angle_tolerance: float = 5.0,
- use_angle_statistics: bool = True,
- secondary_peak_ratio: float = 0.35,
- min_length_percentile: float = 25.0,
- line_thickness: int = 10,
- band_dilate_radius: int = 12,
- ) -> Tuple[np.ndarray, Dict[str, Any]]:
- """
- 方案 C:Canny + HoughLinesP + 角度直方图统计主峰,仅保留与本页水印方向一致的线段。
- """
- gray_u8 = np.asarray(gray, dtype=np.uint8)
- band = ((gray_u8 >= midtone_low) & (gray_u8 < midtone_high)).astype(np.uint8) * 255
- edges = cv2.Canny(band, int(canny_low), int(canny_high), apertureSize=3)
- lines_p = cv2.HoughLinesP(
- edges,
- rho=1,
- theta=np.pi / 180,
- threshold=int(hough_threshold),
- minLineLength=int(min_line_length),
- maxLineGap=int(max_line_gap),
- )
- line_mask = np.zeros_like(gray_u8, dtype=np.uint8)
- lines_all_bgr = cv2.cvtColor(gray_u8, cv2.COLOR_GRAY2BGR)
- lines_filt_bgr = cv2.cvtColor(gray_u8, cv2.COLOR_GRAY2BGR)
- diag_candidates: list = []
- total_lines = 0
- if lines_p is not None:
- for seg in lines_p:
- x1, y1, x2, y2 = [int(v) for v in seg[0]]
- total_lines += 1
- ang = _line_angle_deg(x1, y1, x2, y2)
- length = _line_length(x1, y1, x2, y2)
- if not _angle_in_diagonal_ranges(ang, angle_ranges):
- continue
- diag_candidates.append((x1, y1, x2, y2, ang, length))
- cv2.line(lines_all_bgr, (x1, y1), (x2, y2), (128, 128, 128), 1)
- dominant_angles: list = []
- hist_smooth = np.zeros(180, dtype=np.float64)
- if use_angle_statistics and diag_candidates:
- dominant_angles, hist_smooth = _find_dominant_diagonal_angles(
- diag_candidates,
- angle_ranges=angle_ranges,
- secondary_peak_ratio=secondary_peak_ratio,
- )
- def _angle_matches(ang: float) -> bool:
- if not use_angle_statistics or not dominant_angles:
- return True
- return any(_angle_distance_deg(ang, d) <= angle_tolerance for d in dominant_angles)
- angle_matched = [
- s for s in diag_candidates if _angle_matches(s[4])
- ]
- if angle_matched and min_length_percentile > 0:
- lengths = np.array([s[5] for s in angle_matched], dtype=np.float32)
- len_th = float(np.percentile(lengths, min_length_percentile))
- angle_matched = [s for s in angle_matched if s[5] >= len_th]
- matched_keys = {(s[0], s[1], s[2], s[3]) for s in angle_matched}
- kept_lines: list = []
- for x1, y1, x2, y2, ang, _length in angle_matched:
- kept_lines.append((x1, y1, x2, y2, ang))
- cv2.line(line_mask, (x1, y1), (x2, y2), 255, thickness=int(line_thickness))
- cv2.line(lines_filt_bgr, (x1, y1), (x2, y2), (0, 0, 255), 2)
- for x1, y1, x2, y2, _ang, _length in diag_candidates:
- if (x1, y1, x2, y2) not in matched_keys:
- cv2.line(lines_filt_bgr, (x1, y1), (x2, y2), (0, 180, 255), 1)
- geom = line_mask > 0
- if band_dilate_radius > 0 and np.any(geom):
- k = cv2.getStructuringElement(
- cv2.MORPH_ELLIPSE, (band_dilate_radius * 2 + 1, band_dilate_radius * 2 + 1)
- )
- geom = cv2.dilate(line_mask, k) > 0
- info: Dict[str, Any] = {
- "hough_total_lines": total_lines,
- "hough_diag_candidates": len(diag_candidates),
- "hough_kept_lines": len(kept_lines),
- "dominant_angles": dominant_angles,
- "angle_tolerance": angle_tolerance,
- "geom_mask_ratio": float(geom.sum() / gray_u8.size),
- "hough_lines_bgr": lines_filt_bgr,
- "hough_lines_all_bgr": lines_all_bgr,
- "angle_histogram_bgr": _render_angle_histogram(hist_smooth, dominant_angles),
- }
- return geom, info
- def _compute_block_orientation_debug_maps(
- gray: np.ndarray,
- *,
- block_size: int = 48,
- ) -> Tuple[np.ndarray, np.ndarray]:
- """分块 diag/hv 弱边缘占比图(仅 debug 热力图,0~1 float)。"""
- gray_f = np.asarray(gray, dtype=np.float32)
- bs = max(4, int(block_size))
- h_blocks = gray_f.shape[0] // bs
- w_blocks = gray_f.shape[1] // bs
- if h_blocks == 0 or w_blocks == 0:
- z = np.zeros_like(gray_f, dtype=np.float32)
- return z, z
- ph, pw = h_blocks * bs, w_blocks * bs
- gx = cv2.Sobel(gray_f, cv2.CV_32F, 1, 0, ksize=3)
- gy = cv2.Sobel(gray_f, cv2.CV_32F, 0, 1, ksize=3)
- mag = np.sqrt(gx * gx + gy * gy)
- ori = np.arctan2(gy, gx) * 180.0 / np.pi
- diag = (
- ((ori > 25) & (ori < 65))
- | ((ori > 115) & (ori < 155))
- | ((ori > -155) & (ori < -115))
- | ((ori > -65) & (ori < -25))
- )
- hv = (
- ((ori > -20) & (ori < 20))
- | ((ori > 160) | (ori < -160))
- | ((ori > 70) & (ori < 110))
- | ((ori > -110) & (ori < -70))
- )
- weak = (mag > 1) & (mag < 15)
- def _to_blocks(arr: np.ndarray) -> np.ndarray:
- return (
- arr[:ph, :pw]
- .reshape(h_blocks, bs, w_blocks, bs)
- .transpose(0, 2, 1, 3)
- .reshape(h_blocks, w_blocks, -1)
- )
- b_diag = _to_blocks(diag)
- b_hv = _to_blocks(hv)
- b_weak = _to_blocks(weak)
- diag_weak = np.sum(b_diag & b_weak, axis=2)
- hv_weak = np.sum(b_hv & b_weak, axis=2)
- total_weak = np.sum(b_weak, axis=2)
- with np.errstate(divide="ignore", invalid="ignore"):
- diag_ratio = np.where(total_weak > 0, diag_weak / total_weak, 0.0).astype(np.float32)
- hv_ratio = np.where(total_weak > 0, hv_weak / total_weak, 0.0).astype(np.float32)
- diag_up = np.repeat(np.repeat(diag_ratio, bs, axis=0), bs, axis=1)
- hv_up = np.repeat(np.repeat(hv_ratio, bs, axis=0), bs, axis=1)
- diag_full = np.zeros_like(gray_f, dtype=np.float32)
- hv_full = np.zeros_like(gray_f, dtype=np.float32)
- diag_full[:ph, :pw] = diag_up
- hv_full[:ph, :pw] = hv_up
- return diag_full, hv_full
- def render_ratio_heatmap(ratio_map: np.ndarray) -> np.ndarray:
- """将 0~1 浮点占比图转为 BGR 热力图。"""
- r = np.clip(np.asarray(ratio_map, dtype=np.float32), 0.0, 1.0)
- u8 = (r * 255).astype(np.uint8)
- return cv2.applyColorMap(u8, cv2.COLORMAP_JET)
- def save_watermark_mask_debug_layers(
- image: np.ndarray,
- output_dir: Union[str, Path],
- stem: str,
- debug: Dict[str, Any],
- *,
- image_format: str = "png",
- ) -> Dict[str, str]:
- """保存分层 debug 图(方案 D)。"""
- out_dir = Path(output_dir)
- out_dir.mkdir(parents=True, exist_ok=True)
- fmt = (image_format or "png").lstrip(".")
- paths: Dict[str, str] = {}
- def _save_overlay(name: str, mask: Optional[np.ndarray], color=(0, 0, 255)) -> None:
- if mask is None or not np.any(mask):
- return
- from ocr_utils.watermark.removal import render_watermark_mask_overlay
- ov = render_watermark_mask_overlay(image, mask, color=color)
- p = out_dir / f"{stem}_{name}.{fmt}"
- cv2.imwrite(str(p), cv2.cvtColor(ov, cv2.COLOR_RGB2BGR) if ov.shape[2] == 3 else ov)
- paths[name] = str(p)
- _save_overlay("wm_candidate_overlay", debug.get("wm_candidate"))
- _save_overlay("geom_region_overlay", debug.get("geom_region"), color=(0, 180, 255))
- _save_overlay("geom_candidate_overlay", debug.get("geom_candidate"), color=(0, 255, 0))
- _save_overlay("wm_mask_overlay", debug.get("wm_mask"), color=(255, 0, 0))
- hough_bgr = debug.get("hough_lines_bgr")
- if hough_bgr is not None:
- p = out_dir / f"{stem}_hough_lines.{fmt}"
- cv2.imwrite(str(p), hough_bgr)
- paths["hough_lines"] = str(p)
- hough_all = debug.get("hough_lines_all_bgr")
- if hough_all is not None:
- p = out_dir / f"{stem}_hough_lines_all.{fmt}"
- cv2.imwrite(str(p), hough_all)
- paths["hough_lines_all"] = str(p)
- angle_hist = debug.get("angle_histogram_bgr")
- if angle_hist is not None:
- p = out_dir / f"{stem}_angle_histogram.{fmt}"
- cv2.imwrite(str(p), angle_hist)
- paths["angle_histogram"] = str(p)
- diag_hm = debug.get("diag_ratio_heatmap")
- if diag_hm is not None:
- p = out_dir / f"{stem}_diag_ratio_heatmap.{fmt}"
- cv2.imwrite(str(p), diag_hm)
- paths["diag_ratio_heatmap"] = str(p)
- hv_hm = debug.get("hv_ratio_heatmap")
- if hv_hm is not None:
- p = out_dir / f"{stem}_hv_ratio_heatmap.{fmt}"
- cv2.imwrite(str(p), hv_hm)
- paths["hv_ratio_heatmap"] = str(p)
- return paths
- def _build_diag_region_mask(
- gray: np.ndarray,
- *,
- block_size: int = 48,
- diag_ratio_thresh: float = 0.20,
- light_gray_thresh: int = 238,
- light_ratio_thresh: float = 0.10,
- min_edge_count: int = 10,
- dilate_radius: int = 3,
- ) -> np.ndarray:
- """
- 分块梯度方向检测:返回对角线方向纹理占优的区域掩膜。
- 原理:水印是45°斜向字符,其梯度主方向在30-60°和120-150°。
- 分块统计该方向弱边缘占比,高频块标记为水印候选区域。
- Returns:
- bool ndarray, 与 gray 同形状,True=疑似斜向水印区域。
- """
- gray_f = np.asarray(gray, dtype=np.float32)
- img_h, img_w = gray_f.shape
- bs = max(4, int(block_size))
- # Sobel 梯度
- gx = cv2.Sobel(gray_f, cv2.CV_32F, 1, 0, ksize=3)
- gy = cv2.Sobel(gray_f, cv2.CV_32F, 0, 1, ksize=3)
- mag = np.sqrt(gx * gx + gy * gy)
- ori = np.arctan2(gy, gx) * 180.0 / np.pi
- # 对角线方向 (±45° 附近,即梯度 30-65° / 115-155°)
- diag = (
- ((ori > 25) & (ori < 65))
- | ((ori > 115) & (ori < 155))
- | ((ori > -155) & (ori < -115))
- | ((ori > -65) & (ori < -25))
- )
- h_blocks = img_h // bs
- w_blocks = img_w // bs
- if h_blocks == 0 or w_blocks == 0:
- return np.zeros_like(gray, dtype=bool)
- ph, pw = h_blocks * bs, w_blocks * bs
- # 分块统计
- def _to_blocks(arr: np.ndarray) -> np.ndarray:
- return arr[:ph, :pw].reshape(h_blocks, bs, w_blocks, bs).transpose(0, 2, 1, 3).reshape(h_blocks, w_blocks, -1)
- block_mag = _to_blocks(mag)
- block_diag = _to_blocks(diag)
- block_gray = _to_blocks(gray_f)
- weak = (block_mag > 1) & (block_mag < 15)
- diag_weak = np.sum(block_diag & weak, axis=2)
- total_weak = np.sum(weak, axis=2)
- with np.errstate(divide="ignore", invalid="ignore"):
- diag_ratio = np.where(total_weak > 0, diag_weak / total_weak, 0.0)
- light_ratio = np.mean(block_gray >= light_gray_thresh, axis=2)
- wm_blocks = (
- (diag_ratio > diag_ratio_thresh)
- & (light_ratio > light_ratio_thresh)
- & (total_weak > min_edge_count)
- )
- # 展开为像素掩膜
- wm_block_mask = np.repeat(np.repeat(wm_blocks, bs, axis=0), bs, axis=1)
- full_mask = np.zeros(gray_f.shape, dtype=bool)
- full_mask[:ph, :pw] = wm_block_mask
- if dilate_radius > 0:
- k = cv2.getStructuringElement(
- cv2.MORPH_ELLIPSE, (dilate_radius * 2 + 1, dilate_radius * 2 + 1)
- )
- full_mask = cv2.dilate(full_mask.astype(np.uint8), k) > 0
- return full_mask
- def _build_seal_protect_mask(
- bgr: np.ndarray,
- *,
- hue_high: int = 15,
- sat_min: int = 40,
- value_min: int = 30,
- ) -> np.ndarray:
- """红色/公章区域保护掩膜(True=保护,不置白)。"""
- hsv = cv2.cvtColor(bgr, cv2.COLOR_BGR2HSV)
- lower1 = np.array([0, sat_min, value_min], dtype=np.uint8)
- upper1 = np.array([hue_high, 255, 255], dtype=np.uint8)
- lower2 = np.array([170, sat_min, value_min], dtype=np.uint8)
- upper2 = np.array([180, 255, 255], dtype=np.uint8)
- m1 = cv2.inRange(hsv, lower1, upper1)
- m2 = cv2.inRange(hsv, lower2, upper2)
- m2 = cv2.inRange(hsv, lower2, upper2)
- return (m1 > 0) | (m2 > 0)
- def _build_text_edge_protect(
- gray: np.ndarray,
- *,
- edge_window: int = 5,
- edge_std_thresh: float = 6.0,
- dilate_radius: int = 1,
- ) -> np.ndarray:
- """基于局部方差的笔画边缘保护掩膜(True=保护,不置白)。"""
- local_std = _local_std_map(gray, window=edge_window)
- edge_mask = local_std >= edge_std_thresh
- if dilate_radius > 0:
- k = cv2.getStructuringElement(
- cv2.MORPH_ELLIPSE, (dilate_radius * 2 + 1, dilate_radius * 2 + 1)
- )
- edge_mask = cv2.dilate(edge_mask.astype(np.uint8), k) > 0
- return edge_mask.astype(bool)
- def _build_watermark_mask_light_on_white(
- gray: np.ndarray,
- *,
- bgr: Optional[np.ndarray] = None,
- light_gray_low: int = 236,
- light_gray_high: int = 253,
- whiten_gray_low: int = 200,
- text_protect_gray_max: int = 130,
- text_protect_percentile: Optional[float] = None,
- background_threshold: int = 248,
- morph_close_kernel: int = 0,
- morph_close_iter: int = 1,
- morph_dilate_kernel: int = 0,
- morph_dilate_iter: int = 1,
- min_component_area: int = 200,
- low_variance_thresh: float = 0.0,
- edge_window: int = 5,
- direction_filter: str = "hough",
- debug_block_maps: bool = True,
- debug_block_size: int = 48,
- hough_midtone_low: int = 200,
- hough_midtone_high: int = 254,
- hough_canny_low: int = 30,
- hough_canny_high: int = 100,
- hough_threshold: int = 25,
- hough_min_line_length: int = 35,
- hough_max_line_gap: int = 18,
- hough_line_thickness: int = 12,
- hough_band_dilate_radius: int = 14,
- hough_angle_tolerance: float = 5.0,
- hough_use_angle_statistics: bool = True,
- hough_secondary_peak_ratio: float = 0.35,
- hough_min_length_percentile: float = 25.0,
- diag_block_size: int = 0,
- diag_ratio_thresh: float = 0.20,
- diag_light_ratio_thresh: float = 0.10,
- diag_min_edge_count: int = 10,
- diag_dilate_radius: int = 3,
- seal_protect: bool = True,
- seal_hue_high: int = 15,
- seal_sat_min: int = 40,
- ) -> Tuple[np.ndarray, Dict[str, Any]]:
- """
- 白底流水水印掩膜(方案 C + E)。
- 1. Hough 斜向线段 → geom_region(几何限定区域)
- 2. wm_candidate = 浅色带且非正文保护
- 3. wm_mask = geom_region(置白区域由几何约束;实际白化时再 g>=light_gray_low)
- 4. debug 输出 candidate / geom / 交集 / 热力图
- """
- gray_arr = np.asarray(gray)
- bg_th = int(background_threshold)
- low = int(light_gray_low)
- high = int(light_gray_high)
- if text_protect_gray_max > 0:
- t_protect = float(text_protect_gray_max)
- else:
- dark = gray_arr[gray_arr < min(130, bg_th)]
- if dark.size > 0 and text_protect_percentile is not None:
- t_protect = float(np.percentile(dark, text_protect_percentile))
- else:
- t_protect = 120.0
- text_protect = gray_arr <= t_protect
- low = max(low, int(t_protect) + 25)
- wm_candidate = (gray_arr >= low) & (gray_arr < high) & (~text_protect)
- direction = (direction_filter or "hough").lower().strip()
- hough_info: Dict[str, Any] = {}
- geom_region = np.zeros_like(gray_arr, dtype=bool)
- if direction == "hough":
- geom_region, hough_info = _build_diag_hough_region_mask(
- gray_arr,
- midtone_low=hough_midtone_low,
- midtone_high=hough_midtone_high,
- canny_low=hough_canny_low,
- canny_high=hough_canny_high,
- hough_threshold=hough_threshold,
- min_line_length=hough_min_line_length,
- max_line_gap=hough_max_line_gap,
- angle_tolerance=hough_angle_tolerance,
- use_angle_statistics=hough_use_angle_statistics,
- secondary_peak_ratio=hough_secondary_peak_ratio,
- min_length_percentile=hough_min_length_percentile,
- line_thickness=hough_line_thickness,
- band_dilate_radius=hough_band_dilate_radius,
- )
- elif diag_block_size > 0:
- geom_region = _build_diag_region_mask(
- gray_arr,
- block_size=diag_block_size,
- diag_ratio_thresh=diag_ratio_thresh,
- light_gray_thresh=low,
- light_ratio_thresh=diag_light_ratio_thresh,
- min_edge_count=diag_min_edge_count,
- dilate_radius=diag_dilate_radius,
- )
- geom_candidate = geom_region & wm_candidate
- wm_mask = geom_region.copy()
- if min_component_area > 0 and np.any(wm_mask):
- n_labels, labels, stats, _ = cv2.connectedComponentsWithStats(
- wm_mask.astype(np.uint8), connectivity=8
- )
- filtered = np.zeros_like(wm_mask)
- for i in range(1, n_labels):
- if stats[i, cv2.CC_STAT_AREA] >= min_component_area:
- filtered[labels == i] = True
- if np.any(filtered):
- wm_mask = filtered
- elif np.any(geom_region):
- wm_mask = geom_region
- seal_mask = np.zeros_like(wm_mask, dtype=bool)
- if seal_protect and bgr is not None and bgr.ndim == 3:
- seal_mask = _build_seal_protect_mask(
- bgr, hue_high=seal_hue_high, sat_min=seal_sat_min
- )
- wm_mask &= ~seal_mask
- midtone = (gray_arr >= low) & (gray_arr < high)
- debug: Dict[str, Any] = {
- "mask_mode": "light_on_white",
- "direction_filter": direction,
- "light_gray_low": low,
- "light_gray_high": high,
- "midtone_ratio": float(midtone.sum() / gray_arr.size),
- "wm_candidate_ratio": float(wm_candidate.sum() / gray_arr.size),
- "geom_mask_ratio": float(geom_region.sum() / gray_arr.size),
- "geom_candidate_ratio": float(geom_candidate.sum() / gray_arr.size),
- "wm_mask_ratio": float(wm_mask.sum() / gray_arr.size),
- "T_protect": t_protect,
- "text_protect_gray_max": text_protect_gray_max,
- "text_protect": text_protect,
- "seal_protect": seal_mask,
- "wm_candidate": wm_candidate,
- "geom_region": geom_region,
- "geom_candidate": geom_candidate,
- "diag_region": geom_region,
- "wm_mask": wm_mask,
- "whiten_gray_low": int(whiten_gray_low),
- "hough_lines_bgr": hough_info.get("hough_lines_bgr"),
- "hough_lines_all_bgr": hough_info.get("hough_lines_all_bgr"),
- "angle_histogram_bgr": hough_info.get("angle_histogram_bgr"),
- "dominant_angles": hough_info.get("dominant_angles", []),
- "hough_kept_lines": hough_info.get("hough_kept_lines", 0),
- "hough_diag_candidates": hough_info.get("hough_diag_candidates", 0),
- "hough_total_lines": hough_info.get("hough_total_lines", 0),
- }
- if debug_block_maps:
- bs = debug_block_size if debug_block_size > 0 else 48
- diag_map, hv_map = _compute_block_orientation_debug_maps(gray_arr, block_size=bs)
- debug["diag_ratio_heatmap"] = render_ratio_heatmap(diag_map)
- debug["hv_ratio_heatmap"] = render_ratio_heatmap(hv_map)
- return wm_mask, debug
- def build_watermark_mask(
- gray: np.ndarray,
- *,
- bgr: Optional[np.ndarray] = None,
- mask_mode: str = "diagonal_midtone",
- light_gray_low: int = 236,
- light_gray_high: int = 253,
- whiten_gray_low: int = 200,
- text_protect_gray_max: int = 130,
- morph_close_kernel: int = 0,
- morph_close_iter: int = 1,
- morph_dilate_kernel: int = 0,
- morph_dilate_iter: int = 1,
- low_variance_thresh: float = 0.0,
- edge_window: int = 5,
- direction_filter: str = "hough",
- debug_block_maps: bool = True,
- debug_block_size: int = 48,
- hough_midtone_low: int = 200,
- hough_midtone_high: int = 254,
- hough_canny_low: int = 30,
- hough_canny_high: int = 100,
- hough_threshold: int = 25,
- hough_min_line_length: int = 35,
- hough_max_line_gap: int = 18,
- hough_line_thickness: int = 12,
- hough_band_dilate_radius: int = 14,
- hough_angle_tolerance: float = 5.0,
- hough_use_angle_statistics: bool = True,
- hough_secondary_peak_ratio: float = 0.35,
- hough_min_length_percentile: float = 25.0,
- diag_block_size: int = 0,
- diag_ratio_thresh: float = 0.20,
- diag_light_ratio_thresh: float = 0.10,
- diag_min_edge_count: int = 10,
- diag_dilate_radius: int = 3,
- # diagonal_midtone 参数
- midtone_low: int = 100,
- midtone_high: int = 220,
- remove_horizontal_vertical: bool = True,
- diagonal_enhance: bool = True,
- diagonal_kernel_length: int = 25,
- horizontal_kernel_length: int = 35,
- vertical_kernel_length: int = 35,
- morph_open_kernel: int = 2,
- dmorph_close_kernel: int = 3,
- min_component_area: int = 200,
- text_protect_percentile: float = 10.0,
- background_threshold: int = 248,
- seal_protect: bool = True,
- seal_hue_high: int = 15,
- seal_sat_min: int = 40,
- ) -> Tuple[np.ndarray, Dict[str, Any]]:
- """
- 构建水印掩膜 wm_mask(True=疑似水印像素)。
- mask_mode:
- light_on_white — Hough 斜向几何带 + 浅色白化(方案 C/E)
- diagonal_midtone — 中间调 + 斜向形态学(旧逻辑)
- """
- gray = np.asarray(gray)
- if gray.ndim != 2:
- raise ValueError("build_watermark_mask expects single-channel grayscale")
- mode = (mask_mode or "light_on_white").lower().strip()
- if mode == "light_on_white":
- return _build_watermark_mask_light_on_white(
- gray,
- bgr=bgr,
- light_gray_low=light_gray_low,
- light_gray_high=light_gray_high,
- whiten_gray_low=whiten_gray_low,
- text_protect_gray_max=text_protect_gray_max,
- text_protect_percentile=text_protect_percentile,
- background_threshold=background_threshold,
- morph_close_kernel=morph_close_kernel,
- morph_close_iter=morph_close_iter,
- morph_dilate_kernel=morph_dilate_kernel,
- morph_dilate_iter=morph_dilate_iter,
- low_variance_thresh=low_variance_thresh,
- edge_window=edge_window,
- min_component_area=min_component_area,
- direction_filter=direction_filter,
- debug_block_maps=debug_block_maps,
- debug_block_size=debug_block_size,
- hough_midtone_low=hough_midtone_low,
- hough_midtone_high=hough_midtone_high,
- hough_canny_low=hough_canny_low,
- hough_canny_high=hough_canny_high,
- hough_threshold=hough_threshold,
- hough_min_line_length=hough_min_line_length,
- hough_max_line_gap=hough_max_line_gap,
- hough_line_thickness=hough_line_thickness,
- hough_band_dilate_radius=hough_band_dilate_radius,
- hough_angle_tolerance=hough_angle_tolerance,
- hough_use_angle_statistics=hough_use_angle_statistics,
- hough_secondary_peak_ratio=hough_secondary_peak_ratio,
- hough_min_length_percentile=hough_min_length_percentile,
- diag_block_size=diag_block_size,
- diag_ratio_thresh=diag_ratio_thresh,
- diag_light_ratio_thresh=diag_light_ratio_thresh,
- diag_min_edge_count=diag_min_edge_count,
- diag_dilate_radius=diag_dilate_radius,
- seal_protect=seal_protect,
- seal_hue_high=seal_hue_high,
- seal_sat_min=seal_sat_min,
- )
- midtone = (gray > midtone_low) & (gray < midtone_high)
- mid_u8 = (midtone.astype(np.uint8)) * 255
- horiz = np.zeros_like(midtone, dtype=bool)
- vert = np.zeros_like(midtone, dtype=bool)
- if remove_horizontal_vertical:
- kh = cv2.getStructuringElement(
- cv2.MORPH_RECT, (max(3, horizontal_kernel_length), 1)
- )
- kv = cv2.getStructuringElement(
- cv2.MORPH_RECT, (1, max(3, vertical_kernel_length))
- )
- horiz = cv2.morphologyEx(mid_u8, cv2.MORPH_OPEN, kh) > 0
- vert = cv2.morphologyEx(mid_u8, cv2.MORPH_OPEN, kv) > 0
- # 中间调去掉明显横竖线(保留斜向水印)
- candidate = midtone & ~(horiz | vert)
- if diagonal_enhance:
- k45 = _line_structuring_kernel(diagonal_kernel_length, 45)
- k135 = _line_structuring_kernel(diagonal_kernel_length, 135)
- d45 = cv2.morphologyEx(mid_u8, cv2.MORPH_OPEN, k45) > 0
- d135 = cv2.morphologyEx(mid_u8, cv2.MORPH_OPEN, k135) > 0
- direction = d45 | d135
- dilate_k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (7, 7))
- near_diag = cv2.dilate(direction.astype(np.uint8), dilate_k) > 0
- # 斜向结构足够时收窄到斜向附近;否则保留「中间调减横竖」结果
- if near_diag.sum() > gray.size * 0.001:
- candidate = candidate & near_diag
- cand_u8 = (candidate.astype(np.uint8)) * 255
- if morph_open_kernel > 0:
- k_open = cv2.getStructuringElement(
- cv2.MORPH_ELLIPSE, (morph_open_kernel, morph_open_kernel)
- )
- cand_u8 = cv2.morphologyEx(cand_u8, cv2.MORPH_OPEN, k_open)
- if dmorph_close_kernel > 0:
- k_close = cv2.getStructuringElement(
- cv2.MORPH_ELLIPSE, (dmorph_close_kernel, dmorph_close_kernel)
- )
- cand_u8 = cv2.morphologyEx(cand_u8, cv2.MORPH_CLOSE, k_close)
- wm_mask = cand_u8 > 0
- if min_component_area > 0:
- n_labels, labels, stats, _ = cv2.connectedComponentsWithStats(
- wm_mask.astype(np.uint8), connectivity=8
- )
- filtered = np.zeros_like(wm_mask)
- for i in range(1, n_labels):
- if stats[i, cv2.CC_STAT_AREA] >= min_component_area:
- filtered[labels == i] = True
- wm_mask = filtered
- non_bg = gray[gray < background_threshold]
- if non_bg.size > 0:
- t_protect = float(np.percentile(non_bg, text_protect_percentile))
- else:
- t_protect = 85.0
- t_protect = max(t_protect, float(midtone_low))
- text_protect = gray <= t_protect
- midtone_ratio = float(midtone.sum() / gray.size)
- wm_ratio = float(wm_mask.sum() / gray.size)
- # 掩膜过小:回退为「中间调减横竖」或整块中间调(满版斜纹水印常见)
- min_wm_ratio = max(0.005, midtone_ratio * 0.12)
- if wm_ratio < min_wm_ratio:
- relaxed = midtone & ~(horiz | vert) & (~text_protect)
- if relaxed.sum() / gray.size < min_wm_ratio:
- relaxed = midtone & (~text_protect)
- wm_mask = relaxed
- wm_ratio = float(wm_mask.sum() / gray.size)
- seal_mask = np.zeros_like(wm_mask, dtype=bool)
- if seal_protect and bgr is not None and bgr.ndim == 3:
- seal_mask = _build_seal_protect_mask(
- bgr, hue_high=seal_hue_high, sat_min=seal_sat_min
- )
- debug: Dict[str, Any] = {
- "mask_mode": "diagonal_midtone",
- "midtone_ratio": midtone_ratio,
- "wm_mask_ratio": wm_ratio,
- "T_protect": t_protect,
- "text_protect": text_protect,
- "seal_protect": seal_mask,
- "midtone_mask": midtone,
- "wm_mask": wm_mask,
- }
- return wm_mask, debug
- def remove_watermark_masked_adaptive(
- gray: np.ndarray,
- *,
- bgr: Optional[np.ndarray] = None,
- mask_cfg: Optional[Dict[str, Any]] = None,
- adaptive_cfg: Optional[Dict[str, Any]] = None,
- threshold_fallback: int = 175,
- morph_close_kernel: int = 0,
- ) -> Tuple[np.ndarray, Dict[str, Any]]:
- """
- 掩膜内置白(whiten_mode=mask_fill)或掩膜内动态阈值(threshold_in_mask)。
- 掩膜为空时回退全局 threshold_fallback。
- """
- gray = np.asarray(gray).copy()
- mcfg: Dict[str, Any] = {
- "mask_mode": "light_on_white",
- "light_gray_low": 236,
- "light_gray_high": 253,
- "whiten_gray_low": 200,
- "text_protect_gray_max": 130,
- "morph_close_kernel": 0,
- "morph_close_iter": 1,
- "morph_dilate_kernel": 0,
- "morph_dilate_iter": 1,
- "low_variance_thresh": 0.0,
- "edge_window": 5,
- "min_component_area": 200,
- "direction_filter": "hough",
- "debug_block_maps": True,
- "debug_block_size": 48,
- "hough_midtone_low": 200,
- "hough_midtone_high": 254,
- "hough_canny_low": 30,
- "hough_canny_high": 100,
- "hough_threshold": 25,
- "hough_min_line_length": 35,
- "hough_max_line_gap": 18,
- "hough_line_thickness": 12,
- "hough_band_dilate_radius": 14,
- "hough_angle_tolerance": 5.0,
- "hough_use_angle_statistics": True,
- "hough_secondary_peak_ratio": 0.35,
- "hough_min_length_percentile": 25.0,
- "diag_block_size": 0,
- "diag_ratio_thresh": 0.20,
- "diag_light_ratio_thresh": 0.10,
- "diag_min_edge_count": 10,
- "diag_dilate_radius": 3,
- "midtone_low": 100,
- "midtone_high": 220,
- "remove_horizontal_vertical": True,
- "diagonal_enhance": True,
- "diagonal_kernel_length": 25,
- "horizontal_kernel_length": 35,
- "vertical_kernel_length": 35,
- "morph_open_kernel": 2,
- "dmorph_close_kernel": 3,
- "text_protect_percentile": 10.0,
- "background_threshold": 248,
- "seal_protect": True,
- "seal_hue_high": 15,
- "seal_sat_min": 40,
- }
- mcfg.update(mask_cfg or {})
- mask_mode = str(mcfg.get("mask_mode", "light_on_white")).lower().strip()
- # light_on_white 默认 mask_fill
- acfg: Dict[str, Any] = {
- "whiten_mode": None,
- "text_percentile": 10.0,
- "watermark_percentile": 88.0,
- "background_percentile": 95.0,
- "background_threshold": 248,
- "wm_margin": 12,
- "text_protect_max": 120,
- }
- acfg.update(adaptive_cfg or {})
- whiten_mode = acfg.get("whiten_mode")
- if not whiten_mode:
- whiten_mode = (
- "mask_fill"
- if mask_mode == "light_on_white"
- else "threshold_in_mask"
- )
- whiten_mode = str(whiten_mode).lower().strip()
- wm_mask, debug = build_watermark_mask(gray, bgr=bgr, **mcfg)
- if not np.any(wm_mask):
- cleaned = gray.copy()
- cleaned[gray > threshold_fallback] = 255
- debug["mode"] = "fallback_threshold"
- debug["threshold_fallback"] = threshold_fallback
- if morph_close_kernel > 0:
- kernel = np.ones((morph_close_kernel, morph_close_kernel), np.uint8)
- cleaned = cv2.morphologyEx(cleaned, cv2.MORPH_CLOSE, kernel)
- return cleaned, debug
- bg_th = int(acfg["background_threshold"])
- bg_pixels = gray[gray >= bg_th]
- if bg_pixels.size > 0:
- b_level = float(np.percentile(bg_pixels, acfg["background_percentile"]))
- else:
- b_level = 250.0
- if mask_mode == "light_on_white":
- t_protect = float(debug.get("T_protect", 150.0))
- else:
- non_bg = gray[gray < bg_th]
- if non_bg.size > 0:
- t_protect = float(np.percentile(non_bg, acfg["text_percentile"]))
- else:
- t_protect = float(debug.get("T_protect", 85.0))
- t_protect = min(t_protect, float(acfg["text_protect_max"]))
- t_protect = max(t_protect, float(mcfg.get("midtone_low", 100)))
- text_protect = debug["text_protect"]
- seal_protect = debug["seal_protect"]
- t_wm: Optional[float] = None
- if whiten_mode == "mask_fill":
- # 几何带内:g>=whiten_gray_low 置白;g<=130 正文硬保护(方案 E)
- wm_gray_low = float(
- mcfg.get("whiten_gray_low", debug.get("whiten_gray_low", 200))
- )
- to_white = (
- wm_mask
- & (gray >= wm_gray_low)
- & (gray < int(mcfg.get("light_gray_high", 254)))
- & (~text_protect)
- & (~seal_protect)
- )
- else:
- mask_vals = gray[wm_mask]
- if mask_vals.size > 0:
- t_wm = float(np.percentile(mask_vals, acfg["watermark_percentile"]))
- else:
- t_wm = t_protect + 0.45 * (b_level - t_protect)
- margin = float(acfg["wm_margin"])
- t_wm = max(t_wm, t_protect + margin)
- t_wm = min(t_wm, b_level - 3.0)
- t_wm = min(t_wm, float(mcfg.get("midtone_high", 220)) - 5.0)
- to_white = wm_mask & (gray >= t_wm) & (~text_protect) & (~seal_protect)
- cleaned = gray.copy()
- cleaned[to_white] = 255
- if morph_close_kernel > 0:
- kernel = np.ones((morph_close_kernel, morph_close_kernel), np.uint8)
- cleaned = cv2.morphologyEx(cleaned, cv2.MORPH_CLOSE, kernel)
- debug.update(
- {
- "mode": "masked_adaptive",
- "mask_mode": mask_mode,
- "whiten_mode": whiten_mode,
- "T_wm": t_wm,
- "T_protect": t_protect,
- "B_level": b_level,
- "white_pixel_ratio": float(to_white.sum() / gray.size),
- "threshold_fallback": threshold_fallback,
- }
- )
- return cleaned, debug
- def _image_to_gray_and_bgr(
- image: Union[np.ndarray, Image.Image],
- ) -> Tuple[np.ndarray, Optional[np.ndarray]]:
- """统一为灰度 + 可选 BGR(用于掩膜公章保护)。"""
- if isinstance(image, Image.Image):
- pil_img = image.convert("RGB") if image.mode == "RGBA" else image
- np_img = np.array(pil_img)
- np_img = cv2.cvtColor(np_img, cv2.COLOR_RGB2BGR)
- else:
- np_img = image.copy()
- if np_img.ndim == 3:
- bgr = np_img
- gray = cv2.cvtColor(np_img, cv2.COLOR_BGR2GRAY)
- else:
- bgr = None
- gray = np_img
- return gray, bgr
|