| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712 |
- """
- 水印处理工具模块
- 统一管理所有水印检测与去除能力,供整个平台复用:
- - 图像级(扫描 PDF / 图片):
- detect_watermark() 检测图像中的斜向文字水印
- build_watermark_mask() 构建斜向浅灰水印掩膜(方案 D)
- remove_watermark_masked_adaptive() 掩膜 + 动态阈值去水印
- remove_watermark_from_image() 去除水印,返回灰度图
- remove_watermark_from_image_rgb() 去除水印,返回 RGB 图(适合模型输入)
- enhance_document_contrast() 去水印后对比度/笔画深度恢复
- save_watermark_removal_debug() 保存去水印前后对比调试图
- - PDF 层级(文字型 PDF,保留可搜索性):
- scan_pdf_watermark_xobjs() 快速扫描 PDF 是否含水印 XObject(无副作用)
- remove_txt_pdf_watermark() 从内存 PDF bytes 去除水印,返回新 bytes 或 None
- """
- from __future__ import annotations
- import json
- import re
- from pathlib import Path
- from typing import Any, Dict, Optional, Tuple, Union
- import cv2
- import numpy as np
- from loguru import logger
- from PIL import Image
- # ─────────────────────────────────────────────────────────────────────────────
- # 图像级水印检测与去除
- # ─────────────────────────────────────────────────────────────────────────────
def detect_watermark(
    image: Union[np.ndarray, Image.Image],
    midtone_low: int = 100,
    midtone_high: int = 220,
    ratio_threshold: float = 0.03,
    check_diagonal: bool = True,
    diagonal_angle_range: tuple = (30, 60),
) -> bool:
    """
    Detect whether an image contains a light, diagonal text watermark.

    How it works:
        1. Convert the image to grayscale and select "midtone" pixels, i.e.
           those strictly between ``midtone_low`` and ``midtone_high``.
        2. If the fraction of midtone pixels is below ``ratio_threshold``,
           report no watermark.
        3. If ``check_diagonal`` is True, run Canny + ``cv2.HoughLines`` on the
           midtone mask and require at least one detected line whose Hough
           ``theta`` lies in ``diagonal_angle_range`` (or in the mirrored
           range ``180° - range``); otherwise report no watermark.

    Args:
        image: Input image, either a ``PIL.Image`` or an ``np.ndarray``.
            A 3-D ndarray is converted with ``COLOR_BGR2GRAY``; a 3-D PIL
            image is converted with ``COLOR_RGB2GRAY``; 2-D input is used as
            grayscale directly.
        midtone_low: Exclusive lower bound of the midtone band.
        midtone_high: Exclusive upper bound of the midtone band.
        ratio_threshold: Minimum fraction of midtone pixels (0.03 == 3%).
        check_diagonal: Whether to additionally verify diagonal structure.
        diagonal_angle_range: ``(low, high)`` angle range in degrees.

    Returns:
        True if a watermark is detected, False otherwise.
    """
    if isinstance(image, Image.Image):
        pil_img = image.convert('RGB') if image.mode == 'RGBA' else image
        np_img = np.array(pil_img)
        gray = cv2.cvtColor(np_img, cv2.COLOR_RGB2GRAY) if np_img.ndim == 3 else np_img
    else:
        np_img = image
        gray = cv2.cvtColor(np_img, cv2.COLOR_BGR2GRAY) if np_img.ndim == 3 else np_img

    # An empty image cannot contain a watermark; also avoids dividing by zero.
    if gray.size == 0:
        return False

    midtone_mask = (gray > midtone_low) & (gray < midtone_high)
    ratio = midtone_mask.sum() / gray.size
    if ratio < ratio_threshold:
        return False
    if not check_diagonal:
        return True

    midtone_uint8 = midtone_mask.astype(np.uint8) * 255
    edges = cv2.Canny(midtone_uint8, 50, 150, apertureSize=3)
    lines = cv2.HoughLines(edges, rho=1, theta=np.pi / 180, threshold=80)
    if lines is None:
        return False

    low_rad = np.deg2rad(diagonal_angle_range[0])
    high_rad = np.deg2rad(diagonal_angle_range[1])

    # Each entry of `lines` is [[rho, theta]]; pull out every theta at once.
    thetas = np.asarray(lines)[:, 0, 1]
    in_primary = (thetas >= low_rad) & (thetas <= high_rad)
    in_mirrored = (thetas >= np.pi - high_rad) & (thetas <= np.pi - low_rad)

    # The previous implementation returned `True | False` (always True) and
    # ignored the diagonal count; the verdict must depend on the lines found.
    return bool(np.any(in_primary | in_mirrored))
def _local_std_map(gray: np.ndarray, window: int = 5) -> np.ndarray:
    """Return the per-pixel local standard deviation (same shape as the input).

    The image is converted to float32 and the standard deviation is derived
    from box-filtered mean and mean-of-squares over a ``window`` x ``window``
    neighbourhood (the window side is at least 3).
    """
    img = np.asarray(gray, dtype=np.float32)
    side = max(3, int(window))
    box = np.ones((side, side), dtype=np.float32) / (side * side)

    local_mean = cv2.filter2D(img, -1, box)
    local_sq_mean = cv2.filter2D(img * img, -1, box)

    # E[x^2] - E[x]^2 can dip slightly below zero from rounding; clamp it.
    variance = np.maximum(local_sq_mean - local_mean * local_mean, 0)
    return np.sqrt(variance)
def _line_structuring_kernel(length: int, angle_deg: float) -> np.ndarray:
    """Build a square uint8 structuring element containing one line.

    The kernel is ``length`` x ``length`` (at least 3 x 3) and holds a
    1-pixel-thick line of ones through its centre at ``angle_deg`` degrees,
    for use in directional morphology.
    """
    side = max(3, int(length))
    center = side // 2
    reach = center - 1
    theta = np.deg2rad(angle_deg)

    off_x = int(round(np.cos(theta) * reach))
    off_y = int(round(np.sin(theta) * reach))

    kernel = np.zeros((side, side), np.uint8)
    start = (center - off_x, center - off_y)
    end = (center + off_x, center + off_y)
    cv2.line(kernel, start, end, 1, thickness=1)
    return kernel
- def _line_angle_deg(x1: int, y1: int, x2: int, y2: int) -> float:
- """线段方向角 [0, 180)(无向)。"""
- ang = float(np.degrees(np.arctan2(y2 - y1, x2 - x1)))
- if ang < 0:
- ang += 180.0
- return ang
- def _angle_in_diagonal_ranges(
- angle_deg: float,
- ranges: Tuple[Tuple[float, float], Tuple[float, float]] = ((35.0, 55.0), (125.0, 145.0)),
- ) -> bool:
- for lo, hi in ranges:
- if lo <= angle_deg <= hi:
- return True
- return False
- def _angle_distance_deg(a: float, b: float) -> float:
- """无向角距离 [0, 90]。"""
- d = abs(float(a) - float(b)) % 180.0
- return min(d, 180.0 - d)
- def _line_length(x1: int, y1: int, x2: int, y2: int) -> float:
- return float(np.hypot(x2 - x1, y2 - y1))
def _find_dominant_diagonal_angles(
    segments: list,
    *,
    angle_ranges: Tuple[Tuple[float, float], Tuple[float, float]] = ((25.0, 65.0), (115.0, 155.0)),
    smooth_sigma: float = 2.0,
    secondary_peak_ratio: float = 0.35,
) -> Tuple[list, np.ndarray]:
    """
    Find the dominant diagonal direction(s) from a length-weighted angle histogram.

    Each segment is a 6-tuple ``(x1, y1, x2, y2, angle_deg, length)``. Segments
    whose angle lies in ``angle_ranges`` add their length to a 180-bin
    histogram, which is then Gaussian-smoothed. The strongest bin inside each
    range is a peak; the strongest peak is always returned, and further peaks
    are kept when they reach ``secondary_peak_ratio`` of the strongest one and
    are more than 15 degrees away from every angle already chosen.

    Returns:
        dominant_angles: List of dominant angles in degrees (empty if none).
        hist_smooth: The 180-bin histogram (smoothed, or the raw all-zero
            histogram when no segment contributed).
    """
    weights = np.zeros(180, dtype=np.float64)
    for _x1, _y1, _x2, _y2, seg_angle, seg_length in segments:
        if _angle_in_diagonal_ranges(seg_angle, angle_ranges):
            weights[int(seg_angle) % 180] += seg_length

    if weights.sum() <= 0:
        return [], weights

    # Odd kernel width of roughly 4 sigma, never smaller than 3.
    kernel_width = max(3, int(smooth_sigma * 4) | 1)
    row = weights.reshape(1, 180).astype(np.float32)
    blurred = cv2.GaussianBlur(row, (kernel_width, 1), smooth_sigma)
    smoothed = blurred.flatten().astype(np.float64)

    range_peaks: list = []
    for lo, hi in angle_ranges:
        first_bin, last_bin = int(lo), int(hi)
        window = smoothed[first_bin : last_bin + 1]
        if window.size == 0 or window.max() <= 0:
            continue
        range_peaks.append((first_bin + int(window.argmax()), float(window.max())))

    if not range_peaks:
        return [], smoothed

    # Strongest peak first; the sort is stable, so ties keep range order.
    range_peaks.sort(key=lambda item: item[1], reverse=True)
    best_angle, best_weight = range_peaks[0]

    chosen: list = [best_angle]
    for cand_angle, cand_weight in range_peaks[1:]:
        strong_enough = cand_weight >= best_weight * secondary_peak_ratio
        if strong_enough and all(_angle_distance_deg(cand_angle, d) > 15 for d in chosen):
            chosen.append(cand_angle)
    return chosen, smoothed
def _render_angle_histogram(hist: np.ndarray, dominant_angles: list) -> np.ndarray:
    """Render a 120x360 BGR debug image of the angle histogram.

    Histogram bins are drawn as grey vertical bars on a white canvas, each
    dominant angle as a red vertical line, plus an axis caption. If the
    histogram has no positive value, the blank white canvas is returned.
    """
    height, width = 120, 360
    canvas = np.full((height, width, 3), 255, dtype=np.uint8)

    peak = hist.max()
    if peak <= 0:
        return canvas

    baseline = height - 10

    def _x_of(angle) -> int:
        # Map an angle / bin index in [0, 179] onto the canvas width.
        return int(angle * (width - 1) / 179)

    bar_heights = (hist / peak * (height - 20)).astype(np.int32)
    for bin_idx, bar in enumerate(bar_heights):
        col = _x_of(bin_idx)
        cv2.line(canvas, (col, baseline), (col, baseline - int(bar)), (180, 180, 180), 1)

    for angle in dominant_angles:
        col = _x_of(angle)
        cv2.line(canvas, (col, 0), (col, height - 1), (0, 0, 255), 2)

    cv2.putText(
        canvas,
        "angle (deg)",
        (width // 2 - 40, height - 2),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.35,
        (0, 0, 0),
        1,
    )
    return canvas
def _build_diag_hough_region_mask(
    gray: np.ndarray,
    *,
    midtone_low: int = 200,
    midtone_high: int = 254,
    canny_low: int = 30,
    canny_high: int = 100,
    hough_threshold: int = 30,
    min_line_length: int = 40,
    max_line_gap: int = 15,
    angle_ranges: Tuple[Tuple[float, float], Tuple[float, float]] = ((25.0, 65.0), (115.0, 155.0)),
    angle_tolerance: float = 5.0,
    use_angle_statistics: bool = True,
    secondary_peak_ratio: float = 0.35,
    min_length_percentile: float = 25.0,
    line_thickness: int = 10,
    band_dilate_radius: int = 12,
) -> Tuple[np.ndarray, Dict[str, Any]]:
    """
    Build a geometric region mask from diagonal Hough segments (scheme C).

    Pipeline: threshold the image into the band ``[midtone_low, midtone_high)``,
    run Canny and ``cv2.HoughLinesP`` on it, keep segments whose angle lies in
    ``angle_ranges``, optionally narrow them to the page's dominant angle(s)
    within ``angle_tolerance``, drop segments shorter than the
    ``min_length_percentile`` percentile, draw the survivors as thick lines
    and dilate the result.

    Returns:
        geom: Boolean mask, True inside the band around the kept segments.
        info: Dict with line counts, dominant angles, mask ratio and BGR
            debug renderings.
    """
    img_u8 = np.asarray(gray, dtype=np.uint8)
    in_band = (img_u8 >= midtone_low) & (img_u8 < midtone_high)
    band_u8 = in_band.astype(np.uint8) * 255
    edge_map = cv2.Canny(band_u8, int(canny_low), int(canny_high), apertureSize=3)
    raw_segments = cv2.HoughLinesP(
        edge_map,
        rho=1,
        theta=np.pi / 180,
        threshold=int(hough_threshold),
        minLineLength=int(min_line_length),
        maxLineGap=int(max_line_gap),
    )

    line_mask = np.zeros_like(img_u8, dtype=np.uint8)
    vis_all = cv2.cvtColor(img_u8, cv2.COLOR_GRAY2BGR)
    vis_kept = cv2.cvtColor(img_u8, cv2.COLOR_GRAY2BGR)

    # Stage 1: collect every segment whose direction is roughly diagonal.
    candidates: list = []
    n_total = 0
    for seg in ([] if raw_segments is None else raw_segments):
        x1, y1, x2, y2 = (int(v) for v in seg[0])
        n_total += 1
        seg_angle = _line_angle_deg(x1, y1, x2, y2)
        seg_length = _line_length(x1, y1, x2, y2)
        if _angle_in_diagonal_ranges(seg_angle, angle_ranges):
            candidates.append((x1, y1, x2, y2, seg_angle, seg_length))
            cv2.line(vis_all, (x1, y1), (x2, y2), (128, 128, 128), 1)

    # Stage 2: estimate the dominant direction(s) for this page.
    dominant_angles: list = []
    hist_smooth = np.zeros(180, dtype=np.float64)
    if use_angle_statistics and candidates:
        dominant_angles, hist_smooth = _find_dominant_diagonal_angles(
            candidates,
            angle_ranges=angle_ranges,
            secondary_peak_ratio=secondary_peak_ratio,
        )

    # Stage 3: keep segments close to a dominant angle (all of them when the
    # statistics are disabled or produced no dominant angle).
    if use_angle_statistics and dominant_angles:
        survivors = [
            s
            for s in candidates
            if any(_angle_distance_deg(s[4], d) <= angle_tolerance for d in dominant_angles)
        ]
    else:
        survivors = list(candidates)

    # Stage 4: drop the shortest segments by percentile.
    if survivors and min_length_percentile > 0:
        seg_lengths = np.array([s[5] for s in survivors], dtype=np.float32)
        cutoff = float(np.percentile(seg_lengths, min_length_percentile))
        survivors = [s for s in survivors if s[5] >= cutoff]

    # Stage 5: rasterise kept segments, then mark rejected candidates in the
    # debug view (drawn afterwards so they overlay the kept ones, as before).
    kept_coords = {s[:4] for s in survivors}
    for x1, y1, x2, y2, _ang, _len in survivors:
        cv2.line(line_mask, (x1, y1), (x2, y2), 255, thickness=int(line_thickness))
        cv2.line(vis_kept, (x1, y1), (x2, y2), (0, 0, 255), 2)
    for x1, y1, x2, y2, _ang, _len in candidates:
        if (x1, y1, x2, y2) not in kept_coords:
            cv2.line(vis_kept, (x1, y1), (x2, y2), (0, 180, 255), 1)

    geom = line_mask > 0
    if band_dilate_radius > 0 and np.any(geom):
        diameter = band_dilate_radius * 2 + 1
        elem = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (diameter, diameter))
        geom = cv2.dilate(line_mask, elem) > 0

    info: Dict[str, Any] = {
        "hough_total_lines": n_total,
        "hough_diag_candidates": len(candidates),
        "hough_kept_lines": len(survivors),
        "dominant_angles": dominant_angles,
        "angle_tolerance": angle_tolerance,
        "geom_mask_ratio": float(geom.sum() / img_u8.size),
        "hough_lines_bgr": vis_kept,
        "hough_lines_all_bgr": vis_all,
        "angle_histogram_bgr": _render_angle_histogram(hist_smooth, dominant_angles),
    }
    return geom, info
def _compute_block_orientation_debug_maps(
    gray: np.ndarray,
    *,
    block_size: int = 48,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Compute per-block diagonal and horizontal/vertical weak-edge ratio maps.

    For each ``block_size`` x ``block_size`` block (block size at least 4),
    the ratio is the share of weak-gradient pixels (Sobel magnitude strictly
    between 1 and 15) whose gradient orientation is diagonal, respectively
    horizontal/vertical. Ratios are expanded back to pixel resolution;
    pixels outside the whole-block area stay 0. Intended for debug heatmaps.

    Returns:
        ``(diag_map, hv_map)``: float32 arrays shaped like the input, with
        values in 0~1. If the image is smaller than one block, the same
        all-zero array is returned for both.
    """
    img = np.asarray(gray, dtype=np.float32)
    bs = max(4, int(block_size))
    n_rows = img.shape[0] // bs
    n_cols = img.shape[1] // bs
    if n_rows == 0 or n_cols == 0:
        zeros = np.zeros_like(img, dtype=np.float32)
        return zeros, zeros
    used_h, used_w = n_rows * bs, n_cols * bs

    grad_x = cv2.Sobel(img, cv2.CV_32F, 1, 0, ksize=3)
    grad_y = cv2.Sobel(img, cv2.CV_32F, 0, 1, ksize=3)
    magnitude = np.sqrt(grad_x * grad_x + grad_y * grad_y)
    orientation = np.arctan2(grad_y, grad_x) * 180.0 / np.pi

    # The orientation tests are symmetric in sign, so work on |orientation|.
    abs_ori = np.abs(orientation)
    is_diag = ((abs_ori > 25) & (abs_ori < 65)) | ((abs_ori > 115) & (abs_ori < 155))
    is_hv = (abs_ori < 20) | (abs_ori > 160) | ((abs_ori > 70) & (abs_ori < 110))
    is_weak = (magnitude > 1) & (magnitude < 15)

    def _blockify(arr: np.ndarray) -> np.ndarray:
        # (H, W) -> (n_rows, n_cols, bs*bs): one flat pixel vector per block.
        tiles = arr[:used_h, :used_w].reshape(n_rows, bs, n_cols, bs)
        return tiles.transpose(0, 2, 1, 3).reshape(n_rows, n_cols, -1)

    def _expand(block_values: np.ndarray) -> np.ndarray:
        # Repeat each block value over its bs x bs pixels, zero elsewhere.
        full = np.zeros_like(img, dtype=np.float32)
        full[:used_h, :used_w] = np.repeat(np.repeat(block_values, bs, axis=0), bs, axis=1)
        return full

    weak_blocks = _blockify(is_weak)
    n_diag_weak = np.sum(_blockify(is_diag) & weak_blocks, axis=2)
    n_hv_weak = np.sum(_blockify(is_hv) & weak_blocks, axis=2)
    n_weak = np.sum(weak_blocks, axis=2)

    has_weak = n_weak > 0
    with np.errstate(divide="ignore", invalid="ignore"):
        diag_ratio = np.where(has_weak, n_diag_weak / n_weak, 0.0).astype(np.float32)
        hv_ratio = np.where(has_weak, n_hv_weak / n_weak, 0.0).astype(np.float32)

    return _expand(diag_ratio), _expand(hv_ratio)
def render_ratio_heatmap(ratio_map: np.ndarray) -> np.ndarray:
    """Convert a 0~1 float ratio map into a BGR heatmap (JET colormap).

    Values are clipped to [0, 1] and scaled to uint8 before colouring.
    """
    clipped = np.clip(np.asarray(ratio_map, dtype=np.float32), 0.0, 1.0)
    scaled = (clipped * 255).astype(np.uint8)
    return cv2.applyColorMap(scaled, cv2.COLORMAP_JET)
def save_watermark_mask_debug_layers(
    image: np.ndarray,
    output_dir: Union[str, Path],
    stem: str,
    debug: Dict[str, Any],
    *,
    image_format: str = "png",
) -> Dict[str, str]:
    """
    Write the layered debug images of the mask pipeline to ``output_dir``.

    Mask layers found in ``debug`` are rendered as coloured overlays on
    ``image`` via ``render_watermark_mask_overlay``; ready-made debug images
    (Hough renderings, angle histogram, ratio heatmaps) are written as-is.
    Missing entries, and masks that are None or entirely False, are skipped.

    Args:
        image: Base image handed to the overlay renderer.
        output_dir: Target directory; created if it does not exist.
        stem: File-name prefix; files are named ``{stem}_{layer}.{format}``.
        debug: Debug dict holding the masks and images to save.
        image_format: File extension, with or without a leading dot.

    Returns:
        Mapping from layer name to the written file path.
    """
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    fmt = (image_format or "png").lstrip(".")
    paths: Dict[str, str] = {}

    # (output layer name, key in `debug`, overlay colour)
    overlay_layers = (
        ("wm_candidate_overlay", "wm_candidate", (0, 0, 255)),
        ("geom_region_overlay", "geom_region", (0, 180, 255)),
        ("geom_candidate_overlay", "geom_candidate", (0, 255, 0)),
        ("wm_mask_overlay", "wm_mask", (255, 0, 0)),
    )
    for layer_name, debug_key, color in overlay_layers:
        mask = debug.get(debug_key)
        if mask is None or not np.any(mask):
            continue
        overlay = render_watermark_mask_overlay(image, mask, color=color)
        target = out_dir / f"{stem}_{layer_name}.{fmt}"
        # A 3-channel overlay is converted RGB -> BGR before cv2.imwrite.
        if overlay.shape[2] == 3:
            to_write = cv2.cvtColor(overlay, cv2.COLOR_RGB2BGR)
        else:
            to_write = overlay
        cv2.imwrite(str(target), to_write)
        paths[layer_name] = str(target)

    # (key in `debug`, output layer name) for images that are written unchanged.
    raw_layers = (
        ("hough_lines_bgr", "hough_lines"),
        ("hough_lines_all_bgr", "hough_lines_all"),
        ("angle_histogram_bgr", "angle_histogram"),
        ("diag_ratio_heatmap", "diag_ratio_heatmap"),
        ("hv_ratio_heatmap", "hv_ratio_heatmap"),
    )
    for debug_key, layer_name in raw_layers:
        picture = debug.get(debug_key)
        if picture is None:
            continue
        target = out_dir / f"{stem}_{layer_name}.{fmt}"
        cv2.imwrite(str(target), picture)
        paths[layer_name] = str(target)

    return paths
def _build_diag_region_mask(
    gray: np.ndarray,
    *,
    block_size: int = 48,
    diag_ratio_thresh: float = 0.20,
    light_gray_thresh: int = 238,
    light_ratio_thresh: float = 0.10,
    min_edge_count: int = 10,
    dilate_radius: int = 3,
) -> np.ndarray:
    """
    Block-wise gradient-orientation test for diagonal texture.

    The image is split into ``block_size`` x ``block_size`` blocks (block size
    at least 4). A block is flagged when, among its weak-gradient pixels
    (Sobel magnitude strictly between 1 and 15), the share with a diagonal
    orientation (|orientation| in 25-65 or 115-155 degrees) exceeds
    ``diag_ratio_thresh``, the share of pixels at or above
    ``light_gray_thresh`` exceeds ``light_ratio_thresh``, and the block has
    more than ``min_edge_count`` weak-gradient pixels. Flagged blocks are
    expanded to pixels and optionally dilated by ``dilate_radius``.

    Returns:
        Boolean ndarray shaped like ``gray``; True marks suspected
        diagonal-watermark regions. All False if the image is smaller than
        one block.
    """
    img = np.asarray(gray, dtype=np.float32)
    img_h, img_w = img.shape
    bs = max(4, int(block_size))

    # Sobel gradients, magnitude and orientation in degrees.
    grad_x = cv2.Sobel(img, cv2.CV_32F, 1, 0, ksize=3)
    grad_y = cv2.Sobel(img, cv2.CV_32F, 0, 1, ksize=3)
    magnitude = np.sqrt(grad_x * grad_x + grad_y * grad_y)
    orientation = np.arctan2(grad_y, grad_x) * 180.0 / np.pi

    # The diagonal test is symmetric in sign, so work on |orientation|.
    abs_ori = np.abs(orientation)
    is_diag = ((abs_ori > 25) & (abs_ori < 65)) | ((abs_ori > 115) & (abs_ori < 155))

    n_rows = img_h // bs
    n_cols = img_w // bs
    if n_rows == 0 or n_cols == 0:
        return np.zeros_like(gray, dtype=bool)
    used_h, used_w = n_rows * bs, n_cols * bs

    def _blockify(arr: np.ndarray) -> np.ndarray:
        # (H, W) -> (n_rows, n_cols, bs*bs): one flat pixel vector per block.
        tiles = arr[:used_h, :used_w].reshape(n_rows, bs, n_cols, bs)
        return tiles.transpose(0, 2, 1, 3).reshape(n_rows, n_cols, -1)

    mag_blocks = _blockify(magnitude)
    diag_blocks = _blockify(is_diag)
    gray_blocks = _blockify(img)

    weak_blocks = (mag_blocks > 1) & (mag_blocks < 15)
    n_diag_weak = np.sum(diag_blocks & weak_blocks, axis=2)
    n_weak = np.sum(weak_blocks, axis=2)
    with np.errstate(divide="ignore", invalid="ignore"):
        diag_ratio = np.where(n_weak > 0, n_diag_weak / n_weak, 0.0)
    light_ratio = np.mean(gray_blocks >= light_gray_thresh, axis=2)

    flagged = (
        (diag_ratio > diag_ratio_thresh)
        & (light_ratio > light_ratio_thresh)
        & (n_weak > min_edge_count)
    )

    # Expand the block decisions back to a pixel mask.
    pixel_mask = np.zeros(img.shape, dtype=bool)
    pixel_mask[:used_h, :used_w] = np.repeat(np.repeat(flagged, bs, axis=0), bs, axis=1)

    if dilate_radius > 0:
        diameter = dilate_radius * 2 + 1
        elem = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (diameter, diameter))
        pixel_mask = cv2.dilate(pixel_mask.astype(np.uint8), elem) > 0
    return pixel_mask
def _build_seal_protect_mask(
    bgr: np.ndarray,
    *,
    hue_high: int = 15,
    sat_min: int = 40,
    value_min: int = 30,
) -> np.ndarray:
    """
    Build a protection mask for red (seal / stamp) regions.

    The image is converted from BGR to HSV and a pixel is protected when its
    saturation is at least ``sat_min``, its value at least ``value_min`` and
    its hue lies in either ``[0, hue_high]`` or ``[170, 180]`` (red wraps
    around both ends of the hue axis).

    Args:
        bgr: Colour image, converted with ``cv2.COLOR_BGR2HSV``.
        hue_high: Upper hue bound of the low red band.
        sat_min: Minimum saturation.
        value_min: Minimum value (brightness).

    Returns:
        Boolean mask; True marks pixels to protect (must not be whitened).
    """
    hsv = cv2.cvtColor(bgr, cv2.COLOR_BGR2HSV)

    low_band_lo = np.array([0, sat_min, value_min], dtype=np.uint8)
    low_band_hi = np.array([hue_high, 255, 255], dtype=np.uint8)
    high_band_lo = np.array([170, sat_min, value_min], dtype=np.uint8)
    high_band_hi = np.array([180, 255, 255], dtype=np.uint8)

    # Each band is evaluated once (the second one used to be computed twice).
    low_red = cv2.inRange(hsv, low_band_lo, low_band_hi)
    high_red = cv2.inRange(hsv, high_band_lo, high_band_hi)
    return (low_red > 0) | (high_red > 0)
def _build_text_edge_protect(
    gray: np.ndarray,
    *,
    edge_window: int = 5,
    edge_std_thresh: float = 6.0,
    dilate_radius: int = 1,
) -> np.ndarray:
    """
    Build a stroke-edge protection mask from the local standard deviation.

    Pixels whose local standard deviation (window ``edge_window``) is at
    least ``edge_std_thresh`` are protected; the mask is then optionally
    dilated with an elliptical element of radius ``dilate_radius``.

    Returns:
        Boolean mask; True marks pixels to protect (must not be whitened).
    """
    std_map = _local_std_map(gray, window=edge_window)
    protect = std_map >= edge_std_thresh

    if dilate_radius > 0:
        diameter = dilate_radius * 2 + 1
        elem = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (diameter, diameter))
        protect = cv2.dilate(protect.astype(np.uint8), elem) > 0

    return protect.astype(bool)
def _build_watermark_mask_light_on_white(
    gray: np.ndarray,
    *,
    bgr: Optional[np.ndarray] = None,
    light_gray_low: int = 236,
    light_gray_high: int = 253,
    whiten_gray_low: int = 200,
    text_protect_gray_max: int = 130,
    text_protect_percentile: Optional[float] = None,
    background_threshold: int = 248,
    morph_close_kernel: int = 0,
    morph_close_iter: int = 1,
    morph_dilate_kernel: int = 0,
    morph_dilate_iter: int = 1,
    min_component_area: int = 200,
    low_variance_thresh: float = 0.0,
    edge_window: int = 5,
    direction_filter: str = "hough",
    debug_block_maps: bool = True,
    debug_block_size: int = 48,
    hough_midtone_low: int = 200,
    hough_midtone_high: int = 254,
    hough_canny_low: int = 30,
    hough_canny_high: int = 100,
    hough_threshold: int = 25,
    hough_min_line_length: int = 35,
    hough_max_line_gap: int = 18,
    hough_line_thickness: int = 12,
    hough_band_dilate_radius: int = 14,
    hough_angle_tolerance: float = 5.0,
    hough_use_angle_statistics: bool = True,
    hough_secondary_peak_ratio: float = 0.35,
    hough_min_length_percentile: float = 25.0,
    diag_block_size: int = 0,
    diag_ratio_thresh: float = 0.20,
    diag_light_ratio_thresh: float = 0.10,
    diag_min_edge_count: int = 10,
    diag_dilate_radius: int = 3,
    seal_protect: bool = True,
    seal_hue_high: int = 15,
    seal_sat_min: int = 40,
) -> Tuple[np.ndarray, Dict[str, Any]]:
    """
    Build the watermark mask for light watermarks on a white page (scheme C + E).

    Steps:
        1. Derive the text-protection threshold ``T_protect`` and the
           candidate set ``wm_candidate`` = light band and not protected text.
        2. Build the geometric region ``geom_region``: Hough diagonal segments
           when ``direction_filter`` is "hough"; otherwise the block-wise
           diagonal test when ``diag_block_size > 0``; otherwise empty.
        3. ``wm_mask`` starts as a copy of ``geom_region``; connected
           components smaller than ``min_component_area`` are dropped, unless
           that would remove everything, in which case the unfiltered region
           is kept.
        4. Red seal pixels are removed from ``wm_mask`` when ``seal_protect``
           is set and a 3-D ``bgr`` image is supplied.
        5. Masks, ratios and debug renderings are collected in ``debug``.

    Args:
        gray: Single-channel grayscale image.
        bgr: Optional colour image, used only for seal protection.
        light_gray_low / light_gray_high: Bounds of the light band
            ``[low, high)``; the lower bound is raised to at least
            ``T_protect + 25``.
        whiten_gray_low: Only echoed into ``debug`` here.
        text_protect_gray_max: If > 0, used directly as ``T_protect``.
        text_protect_percentile: If ``text_protect_gray_max <= 0``, the
            percentile of dark pixels (below ``min(130, background_threshold)``)
            used as ``T_protect``; 120.0 is used when unavailable.
        background_threshold: Caps the "dark pixel" cut-off above.
        min_component_area: Minimum connected-component area to keep
            (0 disables the filter).
        direction_filter: "hough" selects the Hough-based region.
        debug_block_maps / debug_block_size: Whether to add block
            orientation heatmaps to ``debug``, and their block size.
        hough_*: Forwarded to ``_build_diag_hough_region_mask``.
        diag_*: Forwarded to ``_build_diag_region_mask``.
        seal_protect / seal_hue_high / seal_sat_min: Seal protection switch
            and parameters for ``_build_seal_protect_mask``.
        morph_close_kernel, morph_close_iter, morph_dilate_kernel,
        morph_dilate_iter, low_variance_thresh, edge_window: Not referenced
            in this function's body; kept so the signature stays compatible.

    Returns:
        ``(wm_mask, debug)``: the boolean watermark mask and a dict of
        intermediate masks, ratios and debug images.
    """
    gray_arr = np.asarray(gray)
    bg_th = int(background_threshold)
    low = int(light_gray_low)
    high = int(light_gray_high)

    # Text protection threshold: explicit value, or derived from dark pixels.
    if text_protect_gray_max > 0:
        t_protect = float(text_protect_gray_max)
    else:
        dark = gray_arr[gray_arr < min(130, bg_th)]
        if dark.size > 0 and text_protect_percentile is not None:
            t_protect = float(np.percentile(dark, text_protect_percentile))
        else:
            t_protect = 120.0
    text_protect = gray_arr <= t_protect

    # Keep the light band clearly above the protected text range.
    low = max(low, int(t_protect) + 25)
    wm_candidate = (gray_arr >= low) & (gray_arr < high) & (~text_protect)

    direction = (direction_filter or "hough").lower().strip()
    hough_info: Dict[str, Any] = {}
    geom_region = np.zeros_like(gray_arr, dtype=bool)
    if direction == "hough":
        geom_region, hough_info = _build_diag_hough_region_mask(
            gray_arr,
            midtone_low=hough_midtone_low,
            midtone_high=hough_midtone_high,
            canny_low=hough_canny_low,
            canny_high=hough_canny_high,
            hough_threshold=hough_threshold,
            min_line_length=hough_min_line_length,
            max_line_gap=hough_max_line_gap,
            angle_tolerance=hough_angle_tolerance,
            use_angle_statistics=hough_use_angle_statistics,
            secondary_peak_ratio=hough_secondary_peak_ratio,
            min_length_percentile=hough_min_length_percentile,
            line_thickness=hough_line_thickness,
            band_dilate_radius=hough_band_dilate_radius,
        )
    elif diag_block_size > 0:
        geom_region = _build_diag_region_mask(
            gray_arr,
            block_size=diag_block_size,
            diag_ratio_thresh=diag_ratio_thresh,
            light_gray_thresh=low,
            light_ratio_thresh=diag_light_ratio_thresh,
            min_edge_count=diag_min_edge_count,
            dilate_radius=diag_dilate_radius,
        )

    geom_candidate = geom_region & wm_candidate

    # wm_mask is always an independent copy, so later edits (seal removal)
    # never leak back into geom_region or the debug entries derived from it.
    wm_mask = geom_region.copy()
    if min_component_area > 0 and np.any(wm_mask):
        _n_labels, labels, stats, _ = cv2.connectedComponentsWithStats(
            wm_mask.astype(np.uint8), connectivity=8
        )
        # Per-label keep table; label 0 is the background and is never kept.
        keep_label = stats[:, cv2.CC_STAT_AREA] >= min_component_area
        keep_label[0] = False
        filtered = keep_label[labels]
        if np.any(filtered):
            wm_mask = filtered
        # Otherwise every component was too small: keep the unfiltered copy.

    seal_mask = np.zeros_like(wm_mask, dtype=bool)
    if seal_protect and bgr is not None and bgr.ndim == 3:
        seal_mask = _build_seal_protect_mask(
            bgr, hue_high=seal_hue_high, sat_min=seal_sat_min
        )
        wm_mask = wm_mask & ~seal_mask

    midtone = (gray_arr >= low) & (gray_arr < high)
    debug: Dict[str, Any] = {
        "mask_mode": "light_on_white",
        "direction_filter": direction,
        "light_gray_low": low,
        "light_gray_high": high,
        "midtone_ratio": float(midtone.sum() / gray_arr.size),
        "wm_candidate_ratio": float(wm_candidate.sum() / gray_arr.size),
        "geom_mask_ratio": float(geom_region.sum() / gray_arr.size),
        "geom_candidate_ratio": float(geom_candidate.sum() / gray_arr.size),
        "wm_mask_ratio": float(wm_mask.sum() / gray_arr.size),
        "T_protect": t_protect,
        "text_protect_gray_max": text_protect_gray_max,
        "text_protect": text_protect,
        "seal_protect": seal_mask,
        "wm_candidate": wm_candidate,
        "geom_region": geom_region,
        "geom_candidate": geom_candidate,
        "diag_region": geom_region,
        "wm_mask": wm_mask,
        "whiten_gray_low": int(whiten_gray_low),
        "hough_lines_bgr": hough_info.get("hough_lines_bgr"),
        "hough_lines_all_bgr": hough_info.get("hough_lines_all_bgr"),
        "angle_histogram_bgr": hough_info.get("angle_histogram_bgr"),
        "dominant_angles": hough_info.get("dominant_angles", []),
        "hough_kept_lines": hough_info.get("hough_kept_lines", 0),
        "hough_diag_candidates": hough_info.get("hough_diag_candidates", 0),
        "hough_total_lines": hough_info.get("hough_total_lines", 0),
    }
    if debug_block_maps:
        bs = debug_block_size if debug_block_size > 0 else 48
        diag_map, hv_map = _compute_block_orientation_debug_maps(gray_arr, block_size=bs)
        debug["diag_ratio_heatmap"] = render_ratio_heatmap(diag_map)
        debug["hv_ratio_heatmap"] = render_ratio_heatmap(hv_map)
    return wm_mask, debug
- def build_watermark_mask(
- gray: np.ndarray,
- *,
- bgr: Optional[np.ndarray] = None,
- mask_mode: str = "diagonal_midtone",
- light_gray_low: int = 236,
- light_gray_high: int = 253,
- whiten_gray_low: int = 200,
- text_protect_gray_max: int = 130,
- morph_close_kernel: int = 0,
- morph_close_iter: int = 1,
- morph_dilate_kernel: int = 0,
- morph_dilate_iter: int = 1,
- low_variance_thresh: float = 0.0,
- edge_window: int = 5,
- direction_filter: str = "hough",
- debug_block_maps: bool = True,
- debug_block_size: int = 48,
- hough_midtone_low: int = 200,
- hough_midtone_high: int = 254,
- hough_canny_low: int = 30,
- hough_canny_high: int = 100,
- hough_threshold: int = 25,
- hough_min_line_length: int = 35,
- hough_max_line_gap: int = 18,
- hough_line_thickness: int = 12,
- hough_band_dilate_radius: int = 14,
- hough_angle_tolerance: float = 5.0,
- hough_use_angle_statistics: bool = True,
- hough_secondary_peak_ratio: float = 0.35,
- hough_min_length_percentile: float = 25.0,
- diag_block_size: int = 0,
- diag_ratio_thresh: float = 0.20,
- diag_light_ratio_thresh: float = 0.10,
- diag_min_edge_count: int = 10,
- diag_dilate_radius: int = 3,
- # diagonal_midtone 参数
- midtone_low: int = 100,
- midtone_high: int = 220,
- remove_horizontal_vertical: bool = True,
- diagonal_enhance: bool = True,
- diagonal_kernel_length: int = 25,
- horizontal_kernel_length: int = 35,
- vertical_kernel_length: int = 35,
- morph_open_kernel: int = 2,
- dmorph_close_kernel: int = 3,
- min_component_area: int = 200,
- text_protect_percentile: float = 10.0,
- background_threshold: int = 248,
- seal_protect: bool = True,
- seal_hue_high: int = 15,
- seal_sat_min: int = 40,
- ) -> Tuple[np.ndarray, Dict[str, Any]]:
- """
- 构建水印掩膜 wm_mask(True=疑似水印像素)。
- mask_mode:
- light_on_white — Hough 斜向几何带 + 浅色白化(方案 C/E)
- diagonal_midtone — 中间调 + 斜向形态学(旧逻辑)
- """
- gray = np.asarray(gray)
- if gray.ndim != 2:
- raise ValueError("build_watermark_mask expects single-channel grayscale")
- mode = (mask_mode or "light_on_white").lower().strip()
- if mode == "light_on_white":
- return _build_watermark_mask_light_on_white(
- gray,
- bgr=bgr,
- light_gray_low=light_gray_low,
- light_gray_high=light_gray_high,
- whiten_gray_low=whiten_gray_low,
- text_protect_gray_max=text_protect_gray_max,
- text_protect_percentile=text_protect_percentile,
- background_threshold=background_threshold,
- morph_close_kernel=morph_close_kernel,
- morph_close_iter=morph_close_iter,
- morph_dilate_kernel=morph_dilate_kernel,
- morph_dilate_iter=morph_dilate_iter,
- low_variance_thresh=low_variance_thresh,
- edge_window=edge_window,
- min_component_area=min_component_area,
- direction_filter=direction_filter,
- debug_block_maps=debug_block_maps,
- debug_block_size=debug_block_size,
- hough_midtone_low=hough_midtone_low,
- hough_midtone_high=hough_midtone_high,
- hough_canny_low=hough_canny_low,
- hough_canny_high=hough_canny_high,
- hough_threshold=hough_threshold,
- hough_min_line_length=hough_min_line_length,
- hough_max_line_gap=hough_max_line_gap,
- hough_line_thickness=hough_line_thickness,
- hough_band_dilate_radius=hough_band_dilate_radius,
- hough_angle_tolerance=hough_angle_tolerance,
- hough_use_angle_statistics=hough_use_angle_statistics,
- hough_secondary_peak_ratio=hough_secondary_peak_ratio,
- hough_min_length_percentile=hough_min_length_percentile,
- diag_block_size=diag_block_size,
- diag_ratio_thresh=diag_ratio_thresh,
- diag_light_ratio_thresh=diag_light_ratio_thresh,
- diag_min_edge_count=diag_min_edge_count,
- diag_dilate_radius=diag_dilate_radius,
- seal_protect=seal_protect,
- seal_hue_high=seal_hue_high,
- seal_sat_min=seal_sat_min,
- )
- midtone = (gray > midtone_low) & (gray < midtone_high)
- mid_u8 = (midtone.astype(np.uint8)) * 255
- horiz = np.zeros_like(midtone, dtype=bool)
- vert = np.zeros_like(midtone, dtype=bool)
- if remove_horizontal_vertical:
- kh = cv2.getStructuringElement(
- cv2.MORPH_RECT, (max(3, horizontal_kernel_length), 1)
- )
- kv = cv2.getStructuringElement(
- cv2.MORPH_RECT, (1, max(3, vertical_kernel_length))
- )
- horiz = cv2.morphologyEx(mid_u8, cv2.MORPH_OPEN, kh) > 0
- vert = cv2.morphologyEx(mid_u8, cv2.MORPH_OPEN, kv) > 0
- # 中间调去掉明显横竖线(保留斜向水印)
- candidate = midtone & ~(horiz | vert)
- if diagonal_enhance:
- k45 = _line_structuring_kernel(diagonal_kernel_length, 45)
- k135 = _line_structuring_kernel(diagonal_kernel_length, 135)
- d45 = cv2.morphologyEx(mid_u8, cv2.MORPH_OPEN, k45) > 0
- d135 = cv2.morphologyEx(mid_u8, cv2.MORPH_OPEN, k135) > 0
- direction = d45 | d135
- dilate_k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (7, 7))
- near_diag = cv2.dilate(direction.astype(np.uint8), dilate_k) > 0
- # 斜向结构足够时收窄到斜向附近;否则保留「中间调减横竖」结果
- if near_diag.sum() > gray.size * 0.001:
- candidate = candidate & near_diag
- cand_u8 = (candidate.astype(np.uint8)) * 255
- if morph_open_kernel > 0:
- k_open = cv2.getStructuringElement(
- cv2.MORPH_ELLIPSE, (morph_open_kernel, morph_open_kernel)
- )
- cand_u8 = cv2.morphologyEx(cand_u8, cv2.MORPH_OPEN, k_open)
- if dmorph_close_kernel > 0:
- k_close = cv2.getStructuringElement(
- cv2.MORPH_ELLIPSE, (dmorph_close_kernel, dmorph_close_kernel)
- )
- cand_u8 = cv2.morphologyEx(cand_u8, cv2.MORPH_CLOSE, k_close)
- wm_mask = cand_u8 > 0
- if min_component_area > 0:
- n_labels, labels, stats, _ = cv2.connectedComponentsWithStats(
- wm_mask.astype(np.uint8), connectivity=8
- )
- filtered = np.zeros_like(wm_mask)
- for i in range(1, n_labels):
- if stats[i, cv2.CC_STAT_AREA] >= min_component_area:
- filtered[labels == i] = True
- wm_mask = filtered
- non_bg = gray[gray < background_threshold]
- if non_bg.size > 0:
- t_protect = float(np.percentile(non_bg, text_protect_percentile))
- else:
- t_protect = 85.0
- t_protect = max(t_protect, float(midtone_low))
- text_protect = gray <= t_protect
- midtone_ratio = float(midtone.sum() / gray.size)
- wm_ratio = float(wm_mask.sum() / gray.size)
- # 掩膜过小:回退为「中间调减横竖」或整块中间调(满版斜纹水印常见)
- min_wm_ratio = max(0.005, midtone_ratio * 0.12)
- if wm_ratio < min_wm_ratio:
- relaxed = midtone & ~(horiz | vert) & (~text_protect)
- if relaxed.sum() / gray.size < min_wm_ratio:
- relaxed = midtone & (~text_protect)
- wm_mask = relaxed
- wm_ratio = float(wm_mask.sum() / gray.size)
- seal_mask = np.zeros_like(wm_mask, dtype=bool)
- if seal_protect and bgr is not None and bgr.ndim == 3:
- seal_mask = _build_seal_protect_mask(
- bgr, hue_high=seal_hue_high, sat_min=seal_sat_min
- )
- debug: Dict[str, Any] = {
- "mask_mode": "diagonal_midtone",
- "midtone_ratio": midtone_ratio,
- "wm_mask_ratio": wm_ratio,
- "T_protect": t_protect,
- "text_protect": text_protect,
- "seal_protect": seal_mask,
- "midtone_mask": midtone,
- "wm_mask": wm_mask,
- }
- return wm_mask, debug
def remove_watermark_masked_adaptive(
    gray: np.ndarray,
    *,
    bgr: Optional[np.ndarray] = None,
    mask_cfg: Optional[Dict[str, Any]] = None,
    adaptive_cfg: Optional[Dict[str, Any]] = None,
    threshold_fallback: int = 175,
    morph_close_kernel: int = 0,
) -> Tuple[np.ndarray, Dict[str, Any]]:
    """
    Whiten watermark pixels inside the mask produced by ``build_watermark_mask``.

    Two whitening strategies exist (``adaptive_cfg["whiten_mode"]``):
        mask_fill         - inside the mask, pixels with
                            whiten_gray_low <= g < light_gray_high become 255.
        threshold_in_mask - inside the mask, pixels with g >= T_wm become 255,
                            where T_wm is derived from percentiles.
    When no whiten mode is configured, ``mask_fill`` is chosen for the
    ``light_on_white`` mask mode and ``threshold_in_mask`` otherwise.
    Pixels flagged by the ``text_protect`` / ``seal_protect`` masks returned in
    the mask debug dict are never whitened.

    If the mask is empty the function falls back to a global threshold
    (``gray > threshold_fallback`` becomes 255).

    Args:
        gray: Grayscale image; it is copied, the caller's array is not modified.
        bgr: Optional colour image forwarded to ``build_watermark_mask``.
        mask_cfg: Overrides for the mask-building defaults listed below.
        adaptive_cfg: Overrides for the whitening defaults listed below.
        threshold_fallback: Global threshold used when the mask is empty.
        morph_close_kernel: Side of the square closing kernel; 0 skips closing.

    Returns:
        ``(cleaned, debug)`` - the processed grayscale image and the debug dict
        from ``build_watermark_mask`` extended with the values used here.
    """
    gray = np.asarray(gray).copy()

    def _close_gaps(img):
        # Optional morphological closing shared by both exit paths.
        if morph_close_kernel > 0:
            kernel = np.ones((morph_close_kernel, morph_close_kernel), np.uint8)
            return cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)
        return img

    mask_defaults: Dict[str, Any] = {
        "mask_mode": "light_on_white",
        "light_gray_low": 236,
        "light_gray_high": 253,
        "whiten_gray_low": 200,
        "text_protect_gray_max": 130,
        "morph_close_kernel": 0,
        "morph_close_iter": 1,
        "morph_dilate_kernel": 0,
        "morph_dilate_iter": 1,
        "low_variance_thresh": 0.0,
        "edge_window": 5,
        "min_component_area": 200,
        "direction_filter": "hough",
        "debug_block_maps": True,
        "debug_block_size": 48,
        "hough_midtone_low": 200,
        "hough_midtone_high": 254,
        "hough_canny_low": 30,
        "hough_canny_high": 100,
        "hough_threshold": 25,
        "hough_min_line_length": 35,
        "hough_max_line_gap": 18,
        "hough_line_thickness": 12,
        "hough_band_dilate_radius": 14,
        "hough_angle_tolerance": 5.0,
        "hough_use_angle_statistics": True,
        "hough_secondary_peak_ratio": 0.35,
        "hough_min_length_percentile": 25.0,
        "diag_block_size": 0,
        "diag_ratio_thresh": 0.20,
        "diag_light_ratio_thresh": 0.10,
        "diag_min_edge_count": 10,
        "diag_dilate_radius": 3,
        "midtone_low": 100,
        "midtone_high": 220,
        "remove_horizontal_vertical": True,
        "diagonal_enhance": True,
        "diagonal_kernel_length": 25,
        "horizontal_kernel_length": 35,
        "vertical_kernel_length": 35,
        "morph_open_kernel": 2,
        "dmorph_close_kernel": 3,
        "text_protect_percentile": 10.0,
        "background_threshold": 248,
        "seal_protect": True,
        "seal_hue_high": 15,
        "seal_sat_min": 40,
    }
    mcfg: Dict[str, Any] = {**mask_defaults, **(mask_cfg or {})}
    mask_mode = str(mcfg.get("mask_mode", "light_on_white")).lower().strip()

    adaptive_defaults: Dict[str, Any] = {
        "whiten_mode": None,
        "text_percentile": 10.0,
        "watermark_percentile": 88.0,
        "background_percentile": 95.0,
        "background_threshold": 248,
        "wm_margin": 12,
        "text_protect_max": 120,
    }
    acfg: Dict[str, Any] = {**adaptive_defaults, **(adaptive_cfg or {})}

    # light_on_white defaults to mask_fill; every other mask mode to
    # threshold_in_mask.
    whiten_mode = acfg.get("whiten_mode") or (
        "mask_fill" if mask_mode == "light_on_white" else "threshold_in_mask"
    )
    whiten_mode = str(whiten_mode).lower().strip()

    wm_mask, debug = build_watermark_mask(gray, bgr=bgr, **mcfg)

    if not np.any(wm_mask):
        # Nothing detected: plain global threshold.
        cleaned = gray.copy()
        cleaned[gray > threshold_fallback] = 255
        debug["mode"] = "fallback_threshold"
        debug["threshold_fallback"] = threshold_fallback
        return _close_gaps(cleaned), debug

    # Background level: percentile of the pixels at/above the background cut.
    bg_th = int(acfg["background_threshold"])
    bright = gray[gray >= bg_th]
    b_level = (
        float(np.percentile(bright, acfg["background_percentile"]))
        if bright.size > 0
        else 250.0
    )

    if mask_mode == "light_on_white":
        t_protect = float(debug.get("T_protect", 150.0))
    else:
        dark = gray[gray < bg_th]
        if dark.size > 0:
            t_protect = float(np.percentile(dark, acfg["text_percentile"]))
        else:
            t_protect = float(debug.get("T_protect", 85.0))
        # NOTE(review): the source lost its indentation; these two clamps are
        # assumed to belong to the non-light_on_white branch - confirm.
        t_protect = min(t_protect, float(acfg["text_protect_max"]))
        t_protect = max(t_protect, float(mcfg.get("midtone_low", 100)))

    text_protect = debug["text_protect"]
    seal_protect = debug["seal_protect"]
    unprotected = (~text_protect) & (~seal_protect)

    t_wm: Optional[float] = None
    if whiten_mode == "mask_fill":
        # Inside the geometric band: whiten g >= whiten_gray_low while keeping
        # protected text / seal pixels untouched.
        lower = float(
            mcfg.get("whiten_gray_low", debug.get("whiten_gray_low", 200))
        )
        upper = int(mcfg.get("light_gray_high", 254))
        to_white = wm_mask & (gray >= lower) & (gray < upper) & unprotected
    else:
        in_mask = gray[wm_mask]
        if in_mask.size > 0:
            t_wm = float(np.percentile(in_mask, acfg["watermark_percentile"]))
        else:
            t_wm = t_protect + 0.45 * (b_level - t_protect)
        # Keep the threshold above protected text (plus margin) but below both
        # the background level and the top of the midtone band.
        t_wm = max(t_wm, t_protect + float(acfg["wm_margin"]))
        t_wm = min(t_wm, b_level - 3.0)
        t_wm = min(t_wm, float(mcfg.get("midtone_high", 220)) - 5.0)
        to_white = wm_mask & (gray >= t_wm) & unprotected

    cleaned = gray.copy()
    cleaned[to_white] = 255
    cleaned = _close_gaps(cleaned)

    summary: Dict[str, Any] = {
        "mode": "masked_adaptive",
        "mask_mode": mask_mode,
        "whiten_mode": whiten_mode,
        "T_wm": t_wm,
        "T_protect": t_protect,
        "B_level": b_level,
        "white_pixel_ratio": float(to_white.sum() / gray.size),
        "threshold_fallback": threshold_fallback,
    }
    debug.update(summary)
    return cleaned, debug
def _image_to_gray_and_bgr(
    image: Union[np.ndarray, Image.Image],
) -> Tuple[np.ndarray, Optional[np.ndarray]]:
    """
    Normalise the input to a grayscale array plus an optional BGR array.

    The BGR array is what the mask builder uses for seal (stamp) protection.

    Args:
        image: PIL image of any mode, or an ndarray. A 3-D ndarray is treated
            as BGR (that is how it is converted to gray); a 2-D ndarray is
            treated as grayscale.

    Returns:
        ``(gray, bgr)``; ``bgr`` is ``None`` when the input carries no colour
        (2-D ndarray or mode ``L`` PIL image).
    """
    if isinstance(image, Image.Image):
        if image.mode == "L":
            # Already grayscale: np.array() yields a 2-D array, which
            # cv2.cvtColor(..., COLOR_RGB2BGR) would reject.
            return np.array(image), None
        # Any other non-RGB mode (RGBA, P, CMYK, 1, LA, ...) is converted so
        # the array is guaranteed to be H x W x 3 before the channel swap.
        rgb_img = image if image.mode == "RGB" else image.convert("RGB")
        np_img = cv2.cvtColor(np.array(rgb_img), cv2.COLOR_RGB2BGR)
    else:
        np_img = image.copy()

    if np_img.ndim == 3:
        return cv2.cvtColor(np_img, cv2.COLOR_BGR2GRAY), np_img
    return np_img, None
- def _enhance_text_restore(
- gray: np.ndarray,
- *,
- background_threshold: int = 248,
- text_lo_percentile: float = 1.0,
- text_hi_percentile: float = 99.0,
- text_black_target: int = 85,
- ) -> np.ndarray:
- """
- 仅对非背景像素做动态范围压缩,将最深笔画拉向 text_black_target(默认 ~85,接近扫描件原图)。
- 背景(>= background_threshold)保持白色,避免整图 gamma 导致背景发灰。
- """
- result = gray.copy()
- bg_th = int(np.clip(background_threshold, 200, 255))
- text_mask = gray < bg_th
- if not np.any(text_mask):
- return result
- vals = gray[text_mask].astype(np.float32)
- lo = float(np.percentile(vals, text_lo_percentile))
- hi = float(np.percentile(vals, text_hi_percentile))
- target = int(np.clip(text_black_target, 10, 200))
- if hi <= lo + 1.0:
- return result
- stretched = (vals - lo) * target / (hi - lo)
- result[text_mask] = np.clip(stretched, 0, 255).astype(np.uint8)
- return result
def enhance_document_contrast(
    gray: np.ndarray,
    method: str = "text_restore",
    *,
    clip_limit: float = 2.0,
    tile_grid_size: int = 8,
    gamma: float = 0.85,
    black_percentile: float = 2.0,
    white_percentile: float = 98.0,
    background_threshold: int = 248,
    text_lo_percentile: float = 1.0,
    text_hi_percentile: float = 99.0,
    text_black_target: int = 85,
) -> np.ndarray:
    """
    Enhance the contrast of a grayscale document image.

    Typically used after watermark removal to bring stroke depth back.

    Args:
        gray: Single-channel grayscale image.
        method: ``text_restore`` | ``clahe`` | ``gamma`` | ``linear``. Any
            value other than ``text_restore``/``gamma``/``linear`` runs CLAHE.
        clip_limit: CLAHE contrast limit (floored at 0.1).
        tile_grid_size: CLAHE tile grid size (floored at 2).
        gamma: Gamma exponent, clamped to [0.1, 3.0]; values < 1 darken text.
        black_percentile: ``linear`` - lower percentile, mapped to 0.
        white_percentile: ``linear`` - upper percentile, mapped to 255.
        background_threshold: ``text_restore`` - pixels >= this are background.
        text_lo_percentile: ``text_restore`` - lower stroke percentile.
        text_hi_percentile: ``text_restore`` - upper stroke percentile (mapped
            to ``text_black_target``).
        text_black_target: ``text_restore`` - target gray level.

    Returns:
        The enhanced grayscale image. ``None`` / empty input is returned as-is,
        and ``linear`` returns the input object itself when the percentile
        spread is at most 1.

    Raises:
        ValueError: If ``gray`` is not 2-D.
    """
    if gray is None or gray.size == 0:
        return gray
    if gray.ndim != 2:
        raise ValueError("enhance_document_contrast expects single-channel grayscale image")

    mode = (method or "text_restore").lower().strip()

    if mode == "text_restore":
        return _enhance_text_restore(
            gray,
            background_threshold=background_threshold,
            text_lo_percentile=text_lo_percentile,
            text_hi_percentile=text_hi_percentile,
            text_black_target=text_black_target,
        )

    if mode == "gamma":
        exponent = 1.0 / max(0.1, min(float(gamma), 3.0))
        # 256-entry lookup table: out = 255 * (in / 255) ** (1 / gamma).
        lut = np.array(
            [((level / 255.0) ** exponent) * 255 for level in range(256)],
            dtype=np.uint8,
        )
        return cv2.LUT(gray, lut)

    if mode == "linear":
        low = float(np.percentile(gray, black_percentile))
        high = float(np.percentile(gray, white_percentile))
        if high <= low + 1.0:
            return gray
        scaled = (gray.astype(np.float32) - low) * 255.0 / (high - low)
        return np.clip(scaled, 0, 255).astype(np.uint8)

    # Everything else: CLAHE (local contrast, suits scanned pages).
    grid = max(2, int(tile_grid_size))
    equalizer = cv2.createCLAHE(
        clipLimit=max(0.1, float(clip_limit)),
        tileGridSize=(grid, grid),
    )
    return equalizer.apply(gray)
def apply_contrast_enhancement_config(
    gray: np.ndarray,
    contrast_cfg: Optional[Dict[str, Any]],
) -> np.ndarray:
    """
    Apply contrast enhancement as described by a config dict.

    Args:
        gray: Grayscale image.
        contrast_cfg: Options for ``enhance_document_contrast`` plus an
            ``enabled`` flag. Missing options use the fallbacks below.

    Returns:
        ``gray`` itself when the config is empty/``None`` or ``enabled`` is
        falsy; otherwise the result of ``enhance_document_contrast``.
    """
    if not contrast_cfg or not contrast_cfg.get("enabled", False):
        return gray

    # (option name, fallback). Note: the text_black_target fallback here is 75,
    # whereas enhance_document_contrast itself defaults to 85.
    fallbacks = (
        ("method", "text_restore"),
        ("clip_limit", 2.0),
        ("tile_grid_size", 8),
        ("gamma", 0.85),
        ("black_percentile", 2.0),
        ("white_percentile", 98.0),
        ("background_threshold", 248),
        ("text_lo_percentile", 1.0),
        ("text_hi_percentile", 99.0),
        ("text_black_target", 75),
    )
    options = {name: contrast_cfg.get(name, default) for name, default in fallbacks}
    return enhance_document_contrast(gray, **options)
def remove_watermark_from_image(
    image: Union[np.ndarray, Image.Image],
    threshold: int = 160,
    morph_close_kernel: int = 2,
    return_pil: Optional[bool] = None,
    watermark_removal_cfg: Optional[Dict[str, Any]] = None,
    removal_debug: Optional[Dict[str, Any]] = None,
) -> Union[np.ndarray, Image.Image]:
    """
    Remove light watermark text from an image and return a grayscale result.

    The method is read from ``watermark_removal_cfg["method"]``:
        threshold (default)      - ``gray > threshold`` becomes 255.
        masked / masked_adaptive - delegate to
                                   ``remove_watermark_masked_adaptive``.

    Args:
        image: PIL image or ndarray (colour or grayscale).
        threshold: Global threshold; also the fallback threshold passed to the
            masked method.
        morph_close_kernel: Side of the square closing kernel; 0 skips closing.
        return_pil: Force PIL (True) / ndarray (False) output; ``None`` mirrors
            the input type.
        watermark_removal_cfg: Full configuration; the ``mask`` and
            ``adaptive`` entries are forwarded only when they are dicts.
        removal_debug: If a dict is supplied it is cleared and refilled with
            debug information about the run.

    Returns:
        The cleaned grayscale image as ``PIL.Image`` (mode ``L``) or ndarray.
    """
    as_pil = isinstance(image, Image.Image) if return_pil is None else return_pil
    cfg = watermark_removal_cfg or {}
    method = str(cfg.get("method") or "threshold").lower().strip()
    gray, bgr = _image_to_gray_and_bgr(image)

    if method in ("masked", "masked_adaptive"):
        mask_section = cfg.get("mask")
        adaptive_section = cfg.get("adaptive")
        cleaned, debug_info = remove_watermark_masked_adaptive(
            gray,
            bgr=bgr,
            mask_cfg=mask_section if isinstance(mask_section, dict) else None,
            adaptive_cfg=adaptive_section if isinstance(adaptive_section, dict) else None,
            threshold_fallback=threshold,
            morph_close_kernel=morph_close_kernel,
        )
    else:
        cleaned = gray.copy()
        cleaned[gray > threshold] = 255
        if morph_close_kernel > 0:
            kernel = np.ones((morph_close_kernel, morph_close_kernel), np.uint8)
            cleaned = cv2.morphologyEx(cleaned, cv2.MORPH_CLOSE, kernel)
        debug_info = {"mode": "threshold", "threshold": threshold}

    if removal_debug is not None:
        # Replace, rather than merge into, whatever the caller passed in.
        removal_debug.clear()
        removal_debug.update(debug_info)

    if as_pil:
        return Image.fromarray(cleaned, mode='L')
    return cleaned
def remove_watermark_from_image_rgb(
    image: Union[np.ndarray, Image.Image],
    threshold: int = 160,
    morph_close_kernel: int = 2,
    return_pil: Optional[bool] = None,
    contrast_enhancement: Optional[Dict[str, Any]] = None,
    apply_watermark_removal: bool = True,
    watermark_removal_cfg: Optional[Dict[str, Any]] = None,
    removal_debug: Optional[Dict[str, Any]] = None,
) -> Union[np.ndarray, Image.Image]:
    """
    Remove the watermark and return a three-channel image.

    Same processing as ``remove_watermark_from_image``, but the grayscale
    result is expanded to three channels so it can be fed to downstream models
    (layout detection, OCR) that expect colour input.

    Args:
        image: PIL image or ndarray.
        threshold: Forwarded to ``remove_watermark_from_image``.
        morph_close_kernel: Forwarded to ``remove_watermark_from_image``.
        return_pil: Force PIL (True) / ndarray (False) output; ``None`` mirrors
            the input type.
        contrast_enhancement: Config for ``apply_contrast_enhancement_config``
            (``enabled``, ``method``, ...).
        apply_watermark_removal: When False, whitening is skipped and the image
            is only converted to gray before the optional contrast step.
        watermark_removal_cfg: Forwarded to ``remove_watermark_from_image``.
        removal_debug: Forwarded to ``remove_watermark_from_image``.

    Returns:
        A PIL image, or a three-channel ndarray with identical channels.
    """
    source_is_pil = isinstance(image, Image.Image)

    if apply_watermark_removal:
        gray_result = remove_watermark_from_image(
            image,
            threshold,
            morph_close_kernel,
            return_pil=False,
            watermark_removal_cfg=watermark_removal_cfg,
            removal_debug=removal_debug,
        )
    else:
        # Only bring the input down to a single gray channel.
        if source_is_pil:
            np_img = cv2.cvtColor(np.array(image.convert("RGB")), cv2.COLOR_RGB2BGR)
        else:
            np_img = image.copy()
        gray_result = np_img
        if np_img.ndim == 3:
            gray_result = cv2.cvtColor(np_img, cv2.COLOR_BGR2GRAY)

    gray_result = apply_contrast_enhancement_config(gray_result, contrast_enhancement)
    three_channel = cv2.cvtColor(gray_result, cv2.COLOR_GRAY2BGR)

    want_pil = source_is_pil if return_pil is None else return_pil
    if not want_pil:
        return three_channel
    return Image.fromarray(cv2.cvtColor(three_channel, cv2.COLOR_BGR2RGB))
def render_watermark_mask_overlay(
    image: np.ndarray,
    wm_mask: np.ndarray,
    *,
    color: Tuple[int, int, int] = (0, 0, 255),
    alpha: float = 0.45,
) -> np.ndarray:
    """
    Blend a semi-transparent colour over the masked pixels of an image.

    Intended for saving debug pictures of the watermark mask.

    Args:
        image: 2-D grayscale, 3-channel, or 4-channel (converted with
            ``COLOR_BGRA2BGR``) ndarray. A 3-channel image whose maximum is
            <= 1 is scaled by 255 and cast to uint8.
        wm_mask: Mask with the same height/width as ``image``. Any non-zero
            value marks a watermark pixel, so both boolean and 0/255 uint8
            masks are accepted.
        color: Overlay colour written into the 3-channel base image (the
            default is red when the base is BGR).
        alpha: Overlay opacity in the weighted blend.

    Returns:
        The blended 3-channel image.
    """
    if image.ndim == 2:
        base = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
    elif image.shape[2] == 3:
        if image.max() <= 1:
            base = (image * 255).astype(np.uint8)
        else:
            base = image.copy()
    else:
        base = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)

    # Coerce to bool: an integer mask would otherwise be interpreted as fancy
    # row indices instead of a per-pixel selection.
    selected = np.asarray(wm_mask).astype(bool)

    overlay = base.copy()
    overlay[selected] = color
    return cv2.addWeighted(base, 1.0 - alpha, overlay, alpha, 0)
def _image_to_bgr_for_debug(img: np.ndarray) -> np.ndarray:
    """
    Convert an ndarray to 3-channel BGR for ``cv2.imwrite``.

    Colour input is treated as RGB-ordered, matching the existing 3-channel
    handling. 4-channel input is treated as RGBA and reduced to BGR: returning
    it unchanged would be written with red and blue swapped, and could not be
    stacked next to a 3-channel image in the compare picture.

    Args:
        img: 2-D grayscale array, or H x W x C array.

    Returns:
        A BGR array for 2-D, 3-channel and 4-channel input; a copy of the input
        for any other channel count.
    """
    if img.ndim == 2:
        return cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
    channels = img.shape[2]
    if channels == 3:
        return cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
    if channels == 4:
        return cv2.cvtColor(img, cv2.COLOR_RGBA2BGR)
    return img.copy()
def save_watermark_removal_debug(
    before: Union[np.ndarray, Image.Image],
    after: Union[np.ndarray, Image.Image],
    output_dir: Union[str, Path],
    page_name: str,
    *,
    processing_params: Optional[Dict[str, Any]] = None,
    image_format: str = "png",
    save_compare: bool = True,
    subdir: str = "watermark_removal",
    mask_overlay: Optional[np.ndarray] = None,
) -> Dict[str, str]:
    """
    Save watermark-removal debug artefacts (before / after / compare / meta.json).

    Files are written into the directory returned by
    ``resolve_module_debug_dir(output_dir, subdir)``.

    Args:
        before: Image before processing (PIL image or ndarray).
        after: Image after processing.
        output_dir: Output root directory.
        page_name: File-name prefix (e.g. ``doc_page_002``).
        processing_params: Parameters to record in meta.json. Known image-like
            debug keys, any other ndarray values, and nothing else are dropped;
            NumPy scalars are converted to plain Python numbers so the JSON
            dump cannot fail on them.
        image_format: Image file extension, e.g. ``png`` / ``jpg``.
        save_compare: Whether to save a side-by-side comparison image.
        subdir: Sub-directory name passed to ``resolve_module_debug_dir``.
        mask_overlay: Optional mask visualisation, saved as ``*_watermark_mask``.

    Returns:
        Dict of saved file paths (``before`` / ``after`` / ``compare`` /
        ``mask`` / ``meta``); keys for files that were not written are absent.
    """
    if isinstance(before, Image.Image):
        before = np.array(before)
    if isinstance(after, Image.Image):
        after = np.array(after)

    from ocr_utils.module_debug_viz import resolve_module_debug_dir

    debug_dir = resolve_module_debug_dir(output_dir, subdir)
    fmt = (image_format or "png").lstrip(".")
    before_bgr = _image_to_bgr_for_debug(before)
    after_bgr = _image_to_bgr_for_debug(after)

    paths: Dict[str, str] = {}
    before_path = debug_dir / f"{page_name}_watermark_before.{fmt}"
    after_path = debug_dir / f"{page_name}_watermark_after.{fmt}"
    cv2.imwrite(str(before_path), before_bgr)
    cv2.imwrite(str(after_path), after_bgr)
    paths["before"] = str(before_path)
    paths["after"] = str(after_path)

    if save_compare:
        # hstack needs equal heights; the shorter image is stretched vertically.
        h = max(before_bgr.shape[0], after_bgr.shape[0])
        if before_bgr.shape[0] != h:
            before_bgr = cv2.resize(before_bgr, (before_bgr.shape[1], h))
        if after_bgr.shape[0] != h:
            after_bgr = cv2.resize(after_bgr, (after_bgr.shape[1], h))
        compare = np.hstack([before_bgr, after_bgr])
        compare_path = debug_dir / f"{page_name}_watermark_compare.{fmt}"
        cv2.imwrite(str(compare_path), compare)
        paths["compare"] = str(compare_path)
        logger.info(f"Saved watermark compare: {compare_path}")

    if mask_overlay is not None:
        mask_bgr = _image_to_bgr_for_debug(mask_overlay)
        mask_path = debug_dir / f"{page_name}_watermark_mask.{fmt}"
        cv2.imwrite(str(mask_path), mask_bgr)
        paths["mask"] = str(mask_path)

    meta: Dict[str, Any] = {"page_name": page_name}
    if processing_params:
        _skip_meta = (
            "midtone_mask",
            "wm_mask",
            "wm_candidate",
            "geom_region",
            "geom_candidate",
            "diag_region",
            "text_protect",
            "seal_protect",
            "hough_lines_bgr",
            "diag_ratio_heatmap",
            "hv_ratio_heatmap",
        )
        for key, value in processing_params.items():
            # Arrays are not JSON-serialisable; drop them even when a new debug
            # key has not been added to the explicit skip list yet.
            if key in _skip_meta or isinstance(value, np.ndarray):
                continue
            meta[key] = value.item() if isinstance(value, np.generic) else value

    meta["before"] = paths["before"]
    meta["after"] = paths["after"]
    if "compare" in paths:
        meta["compare"] = paths["compare"]

    meta_path = debug_dir / f"{page_name}_watermark_meta.json"
    meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")
    paths["meta"] = str(meta_path)
    logger.info(f"Saved watermark debug: {before_path}, {after_path}")
    return paths
- # ─────────────────────────────────────────────────────────────────────────────
- # PDF 层级水印去除(文字型 PDF,保留可搜索性)
- # ─────────────────────────────────────────────────────────────────────────────
- def _is_watermark_xobj(doc, xref: int, obj_str: str) -> bool:
- """
- 判断一个 Form XObject 是否为水印。
- 启发式规则(满足其一即视为水印):
- 1. 含旋转变换矩阵(cm 指令 sin/cos 分量非零),无论是否有 /Group
- 2. 有透明度组(/Group)且内容流包含透明度操作符(ca/CA)
- 3. 有透明度组且内容流体积 > 2KB(大量重复绘图 = 平铺水印)
- """
- if "/Form" not in obj_str:
- return False
- try:
- stream = doc.xref_stream(xref)
- if not stream:
- return False
- stream_text = stream.decode("latin-1", errors="ignore")
- except Exception:
- return False
- has_group = "/Group" in obj_str
- cm_pattern = re.compile(
- r"([-\d.]+)\s+([-\d.]+)\s+([-\d.]+)\s+([-\d.]+)\s+[-\d.]+\s+[-\d.]+\s+cm"
- )
- for m in cm_pattern.finditer(stream_text):
- a, b, c, d = float(m.group(1)), float(m.group(2)), float(m.group(3)), float(m.group(4))
- if abs(b) > 0.1 or abs(c) > 0.1:
- return True
- if not has_group:
- return False
- if re.search(r'\b(ca|CA)\s+[0-9.]+', stream_text) or re.search(r'[0-9.]+\s+(ca|CA)\b', stream_text):
- return True
- if len(stream_text) > 2048:
- return True
- return False
- def _is_watermark_image_xobj(doc, xref: int, obj_str: str) -> bool:
- """
- 判断一个 Image XObject 是否为水印背景图。
- 判断规则(全部满足):
- 1. /Subtype /Image
- 2. 有 /SMask(半透明)
- 3. 宽 >= 600 且 高 >= 800(全页尺寸,排除小图标)
- 4. 解码后像素均值 >= 240(近乎全白,水印文字稀疏)
- """
- if "/Image" not in obj_str or "/SMask" not in obj_str:
- return False
- w_m = re.search(r'/Width\s+(\d+)', obj_str)
- h_m = re.search(r'/Height\s+(\d+)', obj_str)
- if not w_m or not h_m:
- return False
- if int(w_m.group(1)) < 600 or int(h_m.group(1)) < 800:
- return False
- try:
- from io import BytesIO
- img_info = doc.extract_image(xref)
- pil_img = Image.open(BytesIO(img_info["image"])).convert("L")
- return float(np.array(pil_img).mean()) >= 240.0
- except Exception:
- return False
- def _blank_watermark_image(doc, img_xref: int) -> None:
- """
- 将水印 Image XObject 的 RGB 流和 SMask 替换为全白/全不透明。
- 关键点:必须先移除 /DecodeParms(Predictor 11),再调用 update_stream。
- 否则渲染器在 FlateDecode 之后还会尝试 Predictor 解码,失败后回退原始数据,
- 水印依然可见。
- """
- obj_str = doc.xref_object(img_xref)
- w_m = re.search(r'/Width\s+(\d+)', obj_str)
- h_m = re.search(r'/Height\s+(\d+)', obj_str)
- w = int(w_m.group(1)) if w_m else 1
- h = int(h_m.group(1)) if h_m else 1
- cs_m = re.search(r'/ColorSpace\s+/Device(RGB|Gray|CMYK)', obj_str)
- channels = {'RGB': 3, 'CMYK': 4}.get(cs_m.group(1) if cs_m else '', 1)
- doc.xref_set_key(img_xref, "DecodeParms", "null")
- doc.update_stream(img_xref, bytes([255]) * (w * h * channels))
- smask_m = re.search(r'/SMask\s+(\d+)\s+0\s+R', obj_str)
- if smask_m:
- smask_xref = int(smask_m.group(1))
- smask_obj = doc.xref_object(smask_xref)
- sw = int(m.group(1)) if (m := re.search(r'/Width\s+(\d+)', smask_obj)) else w
- sh = int(m.group(1)) if (m := re.search(r'/Height\s+(\d+)', smask_obj)) else h
- doc.xref_set_key(smask_xref, "DecodeParms", "null")
- doc.update_stream(smask_xref, bytes([255]) * (sw * sh))
def scan_pdf_watermark_xobjs(pdf_bytes: bytes, sample_pages: int = 3) -> bool:
    """
    Quickly check the first pages of a PDF for watermark XObjects.

    The document is only inspected, never modified, so this can be used as a
    cheap pre-check before the full removal pass - useful for large files
    without watermarks, where a full scan and re-serialisation is wasteful.

    Args:
        pdf_bytes: Raw bytes of the PDF file.
        sample_pages: Maximum number of leading pages to inspect (default 3).

    Returns:
        True as soon as a Form or Image XObject is classified as a watermark;
        False if none is found or PyMuPDF (``fitz``) is not installed.
    """
    try:
        import fitz
    except ImportError:
        return False

    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    page_limit = min(sample_pages, len(doc))
    try:
        for page_index in range(page_limit):
            page = doc[page_index]

            # Form XObjects first ...
            for xobj_entry in page.get_xobjects():
                xref, *_ = xobj_entry
                try:
                    obj_str = doc.xref_object(xref)
                except Exception:
                    continue
                if _is_watermark_xobj(doc, xref, obj_str):
                    return True

            # ... then the images referenced by the page.
            for image_entry in page.get_images(full=True):
                image_xref = image_entry[0]
                try:
                    obj_str = doc.xref_object(image_xref)
                except Exception:
                    continue
                if _is_watermark_image_xobj(doc, image_xref, obj_str):
                    return True
        return False
    finally:
        doc.close()
def remove_txt_pdf_watermark(pdf_bytes: bytes) -> Optional[bytes]:
    """
    Remove watermarks from a text-based PDF natively, entirely in memory.

    Two watermark forms are handled:
      - Form XObject watermarks: the content stream is emptied.
      - Image XObject watermarks (full-page background image with an /SMask):
        the pixel data is replaced via ``_blank_watermark_image``.

    Because only the watermark objects are touched, the page text stays
    searchable. For large files, consider calling
    ``scan_pdf_watermark_xobjs()`` first as a cheap pre-check.

    Args:
        pdf_bytes: Raw bytes of the original PDF.

    Returns:
        The cleaned PDF as bytes (serialised with ``garbage=4, deflate=True``),
        or ``None`` when no watermark object was removed.

    Raises:
        ImportError: If PyMuPDF (``fitz``) is not installed.
    """
    try:
        import fitz
    except ImportError as exc:
        raise ImportError("请安装 PyMuPDF: pip install PyMuPDF") from exc
    from loguru import logger

    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        # An XObject may be shared by several pages; handle each xref once.
        processed_xrefs: set[int] = set()
        total_removed = 0

        for page in doc:
            # -- Form XObject watermarks --------------------------------
            for xref, name, _invoker, _unused in page.get_xobjects():
                if xref in processed_xrefs:
                    continue
                try:
                    obj_str = doc.xref_object(xref)
                except Exception:
                    continue
                if _is_watermark_xobj(doc, xref, obj_str):
                    try:
                        doc.update_stream(xref, b"")
                        processed_xrefs.add(xref)
                        total_removed += 1
                        logger.debug(f"  [Form XObject] 清空水印 xref={xref}, name={name}")
                    except Exception as e:
                        logger.warning(f"  清空 Form XObject xref={xref} 失败: {e}")

            # -- Image XObject watermarks -------------------------------
            for img_tuple in page.get_images(full=True):
                img_xref = img_tuple[0]
                if img_xref in processed_xrefs:
                    continue
                try:
                    obj_str = doc.xref_object(img_xref)
                except Exception:
                    continue
                if _is_watermark_image_xobj(doc, img_xref, obj_str):
                    # Same policy as the Form branch: one object that cannot
                    # be rewritten must not abort the whole document.
                    try:
                        _blank_watermark_image(doc, img_xref)
                        processed_xrefs.add(img_xref)
                        total_removed += 1
                        logger.debug(f"  [Image XObject] 替换水印图像 xref={img_xref}")
                    except Exception as e:
                        logger.warning(f"  替换 Image XObject xref={img_xref} 失败: {e}")

        if total_removed == 0:
            return None
        result = doc.tobytes(garbage=4, deflate=True)
    finally:
        # Always release the document, including on unexpected errors.
        doc.close()

    logger.info(f"✅ PDF 层级水印去除:共清除 {total_removed} 个水印 XObject")
    return result
|