""" 水印处理工具模块 统一管理所有水印检测与去除能力,供整个平台复用: - 图像级(扫描 PDF / 图片): detect_watermark() 检测图像中的斜向文字水印 remove_watermark_from_image() 去除水印,返回灰度图 remove_watermark_from_image_rgb() 去除水印,返回 RGB 图(适合模型输入) - PDF 层级(文字型 PDF,保留可搜索性): scan_pdf_watermark_xobjs() 快速扫描 PDF 是否含水印 XObject(无副作用) remove_txt_pdf_watermark() 从内存 PDF bytes 去除水印,返回新 bytes 或 None """ from __future__ import annotations import re from typing import Optional, Union import cv2 import numpy as np from PIL import Image # ───────────────────────────────────────────────────────────────────────────── # 图像级水印检测与去除 # ───────────────────────────────────────────────────────────────────────────── def detect_watermark( image: Union[np.ndarray, Image.Image], midtone_low: int = 100, midtone_high: int = 220, ratio_threshold: float = 0.03, check_diagonal: bool = True, diagonal_angle_range: tuple = (30, 60), ) -> bool: """ 检测图像中是否存在浅色斜向文字水印(银行流水类文档水印检测)。 原理: 1. 将图像转为灰度,提取「中间调」像素(midtone_low ~ midtone_high), 这些像素既不是纯白背景,也不是深黑正文,是浅灰水印的典型范围。 2. 若中间调像素占比超过 ratio_threshold,初步判定存在水印。 3. 若 check_diagonal=True,进一步用 Hough 直线变换验证中间调区域 是否呈现斜向(diagonal_angle_range 度)纹理,以排除灰色背景误报。 Args: image: 输入图像,支持 PIL.Image 或 np.ndarray(BGR/RGB/灰度)。 midtone_low: 中间调下限(默认 100),低于此视为深色正文。 midtone_high: 中间调上限(默认 220),高于此视为纯白背景。 ratio_threshold: 中间调像素占全图比例阈值(默认 0.03 即 3%)。 check_diagonal: 是否进行斜向纹理验证(默认 True)。 diagonal_angle_range: 斜向角度范围(度),默认 (30, 60),含 45° 斜水印。 Returns: True 表示检测到水印,False 表示未检测到。 """ if isinstance(image, Image.Image): pil_img = image.convert('RGB') if image.mode == 'RGBA' else image np_img = np.array(pil_img) gray = cv2.cvtColor(np_img, cv2.COLOR_RGB2GRAY) if np_img.ndim == 3 else np_img else: np_img = image gray = cv2.cvtColor(np_img, cv2.COLOR_BGR2GRAY) if np_img.ndim == 3 else np_img midtone_mask = (gray > midtone_low) & (gray < midtone_high) ratio = midtone_mask.sum() / gray.size if ratio < ratio_threshold: return False if not check_diagonal: return True midtone_uint8 = (midtone_mask.astype(np.uint8)) * 255 edges = cv2.Canny(midtone_uint8, 50, 150, apertureSize=3) lines = cv2.HoughLines(edges, rho=1, theta=np.pi / 180, threshold=80) if lines is None: return False low_rad = np.deg2rad(diagonal_angle_range[0]) high_rad = np.deg2rad(diagonal_angle_range[1]) diagonal_count = 0 for line in lines: theta = line[0][1] if low_rad <= theta <= high_rad or (np.pi - high_rad) <= theta <= (np.pi - low_rad): diagonal_count += 1 return diagonal_count >= 2 def remove_watermark_from_image( image: Union[np.ndarray, Image.Image], threshold: int = 160, morph_close_kernel: int = 2, return_pil: Optional[bool] = None, ) -> Union[np.ndarray, Image.Image]: """ 去除图像中的浅色斜向文字水印,返回灰度图。 原理:正文为深黑色(灰度 < threshold),水印为浅灰(灰度 > threshold)。 将高于阈值的像素置为白色(255),保留低于阈值的深色正文。 Args: image: 输入图像(PIL.Image 或 np.ndarray BGR/RGB/灰度)。 threshold: 灰度阈值(0-255)。建议范围 140-180,默认 160。 越大越保守(可能残留水印),越小越激进(可能损失浅色正文)。 morph_close_kernel: 形态学闭运算核大小,用于填补字符断裂。0 跳过。 return_pil: None(与输入同类型)| True(PIL.Image)| False(np.ndarray)。 Returns: 去除水印后的灰度图:PIL.Image(mode='L') 或 np.ndarray(HxW, uint8)。 """ input_is_pil = isinstance(image, Image.Image) if input_is_pil: pil_img = image.convert('RGB') if image.mode == 'RGBA' else image np_img = np.array(pil_img) if np_img.ndim == 3: np_img = cv2.cvtColor(np_img, cv2.COLOR_RGB2BGR) else: np_img = image.copy() gray = cv2.cvtColor(np_img, cv2.COLOR_BGR2GRAY) if np_img.ndim == 3 else np_img cleaned = gray.copy() cleaned[gray > threshold] = 255 if morph_close_kernel > 0: kernel = np.ones((morph_close_kernel, morph_close_kernel), np.uint8) cleaned = cv2.morphologyEx(cleaned, cv2.MORPH_CLOSE, kernel) should_return_pil = input_is_pil if return_pil is None else return_pil return Image.fromarray(cleaned, mode='L') if should_return_pil else cleaned def remove_watermark_from_image_rgb( image: Union[np.ndarray, Image.Image], threshold: int = 160, morph_close_kernel: int = 2, return_pil: Optional[bool] = None, ) -> Union[np.ndarray, Image.Image]: """ 去除水印并返回 RGB 三通道图像。 与 remove_watermark_from_image 逻辑相同,但输出为 RGB(三通道), 方便直接传入布局检测、OCR 等需要彩色输入的下游模型。 Args/Returns: 同 remove_watermark_from_image,但输出为 RGB/BGR 三通道。 """ input_is_pil = isinstance(image, Image.Image) gray_result = remove_watermark_from_image(image, threshold, morph_close_kernel, return_pil=False) rgb_np = cv2.cvtColor(gray_result, cv2.COLOR_GRAY2BGR) should_return_pil = input_is_pil if return_pil is None else return_pil if should_return_pil: return Image.fromarray(cv2.cvtColor(rgb_np, cv2.COLOR_BGR2RGB)) return rgb_np # ───────────────────────────────────────────────────────────────────────────── # PDF 层级水印去除(文字型 PDF,保留可搜索性) # ───────────────────────────────────────────────────────────────────────────── def _is_watermark_xobj(doc, xref: int, obj_str: str) -> bool: """ 判断一个 Form XObject 是否为水印。 启发式规则(满足其一即视为水印): 1. 含旋转变换矩阵(cm 指令 sin/cos 分量非零),无论是否有 /Group 2. 有透明度组(/Group)且内容流包含透明度操作符(ca/CA) 3. 有透明度组且内容流体积 > 2KB(大量重复绘图 = 平铺水印) """ if "/Form" not in obj_str: return False try: stream = doc.xref_stream(xref) if not stream: return False stream_text = stream.decode("latin-1", errors="ignore") except Exception: return False has_group = "/Group" in obj_str cm_pattern = re.compile( r"([-\d.]+)\s+([-\d.]+)\s+([-\d.]+)\s+([-\d.]+)\s+[-\d.]+\s+[-\d.]+\s+cm" ) for m in cm_pattern.finditer(stream_text): a, b, c, d = float(m.group(1)), float(m.group(2)), float(m.group(3)), float(m.group(4)) if abs(b) > 0.1 or abs(c) > 0.1: return True if not has_group: return False if re.search(r'\b(ca|CA)\s+[0-9.]+', stream_text) or re.search(r'[0-9.]+\s+(ca|CA)\b', stream_text): return True if len(stream_text) > 2048: return True return False def _is_watermark_image_xobj(doc, xref: int, obj_str: str) -> bool: """ 判断一个 Image XObject 是否为水印背景图。 判断规则(全部满足): 1. /Subtype /Image 2. 有 /SMask(半透明) 3. 宽 >= 600 且 高 >= 800(全页尺寸,排除小图标) 4. 解码后像素均值 >= 240(近乎全白,水印文字稀疏) """ if "/Image" not in obj_str or "/SMask" not in obj_str: return False w_m = re.search(r'/Width\s+(\d+)', obj_str) h_m = re.search(r'/Height\s+(\d+)', obj_str) if not w_m or not h_m: return False if int(w_m.group(1)) < 600 or int(h_m.group(1)) < 800: return False try: from io import BytesIO img_info = doc.extract_image(xref) pil_img = Image.open(BytesIO(img_info["image"])).convert("L") return float(np.array(pil_img).mean()) >= 240.0 except Exception: return False def _blank_watermark_image(doc, img_xref: int) -> None: """ 将水印 Image XObject 的 RGB 流和 SMask 替换为全白/全不透明。 关键点:必须先移除 /DecodeParms(Predictor 11),再调用 update_stream。 否则渲染器在 FlateDecode 之后还会尝试 Predictor 解码,失败后回退原始数据, 水印依然可见。 """ obj_str = doc.xref_object(img_xref) w_m = re.search(r'/Width\s+(\d+)', obj_str) h_m = re.search(r'/Height\s+(\d+)', obj_str) w = int(w_m.group(1)) if w_m else 1 h = int(h_m.group(1)) if h_m else 1 cs_m = re.search(r'/ColorSpace\s+/Device(RGB|Gray|CMYK)', obj_str) channels = {'RGB': 3, 'CMYK': 4}.get(cs_m.group(1) if cs_m else '', 1) doc.xref_set_key(img_xref, "DecodeParms", "null") doc.update_stream(img_xref, bytes([255]) * (w * h * channels)) smask_m = re.search(r'/SMask\s+(\d+)\s+0\s+R', obj_str) if smask_m: smask_xref = int(smask_m.group(1)) smask_obj = doc.xref_object(smask_xref) sw = int(m.group(1)) if (m := re.search(r'/Width\s+(\d+)', smask_obj)) else w sh = int(m.group(1)) if (m := re.search(r'/Height\s+(\d+)', smask_obj)) else h doc.xref_set_key(smask_xref, "DecodeParms", "null") doc.update_stream(smask_xref, bytes([255]) * (sw * sh)) def scan_pdf_watermark_xobjs(pdf_bytes: bytes, sample_pages: int = 3) -> bool: """ 快速扫描 PDF 前 N 页,判断是否含水印 XObject。 无副作用(只读),用于在执行去水印前快速判断,避免对无水印的大文件 执行全量扫描和序列化,显著降低财报等大文件的处理开销。 Args: pdf_bytes: PDF 文件的原始字节。 sample_pages: 扫描页数上限,默认 3(银行流水通常前几页有水印)。 Returns: True 表示发现水印 XObject,False 表示未发现。 """ try: import fitz except ImportError: return False doc = fitz.open(stream=pdf_bytes, filetype="pdf") pages_to_check = min(sample_pages, len(doc)) try: for i in range(pages_to_check): page = doc[i] for xref, *_ in page.get_xobjects(): try: obj_str = doc.xref_object(xref) except Exception: continue if _is_watermark_xobj(doc, xref, obj_str): return True for img_tuple in page.get_images(full=True): try: obj_str = doc.xref_object(img_tuple[0]) except Exception: continue if _is_watermark_image_xobj(doc, img_tuple[0], obj_str): return True finally: doc.close() return False def remove_txt_pdf_watermark(pdf_bytes: bytes) -> Optional[bytes]: """ 对文字型 PDF 执行原生水印去除,完全在内存中完成,不写临时文件。 支持两种水印形式: - Form XObject 水印:清空内容流 - Image XObject 水印(全页背景图 + SMask 透明通道):替换为全白像素 适用场景:pdf_type='txt' 的 PDF,去除后可直接传给渲染层(tobytes() → bytes)。 对于大文件(如财报),建议先用 scan_pdf_watermark_xobjs() 快速判断再调用本函数。 Args: pdf_bytes: 原始 PDF 的字节内容。 Returns: 去除水印后的 PDF bytes(garbage=4 压缩);若未发现水印返回 None。 """ try: import fitz except ImportError: raise ImportError("请安装 PyMuPDF: pip install PyMuPDF") from loguru import logger doc = fitz.open(stream=pdf_bytes, filetype="pdf") processed_xrefs: set[int] = set() total_removed = 0 for page in doc: # ── Form XObject 水印 ───────────────────────────────────────── for xref, name, _invoker, _unused in page.get_xobjects(): if xref in processed_xrefs: continue try: obj_str = doc.xref_object(xref) except Exception: continue if _is_watermark_xobj(doc, xref, obj_str): try: doc.update_stream(xref, b"") processed_xrefs.add(xref) total_removed += 1 logger.debug(f" [Form XObject] 清空水印 xref={xref}, name={name}") except Exception as e: logger.warning(f" 清空 Form XObject xref={xref} 失败: {e}") # ── Image XObject 水印 ──────────────────────────────────────── for img_tuple in page.get_images(full=True): img_xref = img_tuple[0] if img_xref in processed_xrefs: continue try: obj_str = doc.xref_object(img_xref) except Exception: continue if _is_watermark_image_xobj(doc, img_xref, obj_str): _blank_watermark_image(doc, img_xref) processed_xrefs.add(img_xref) total_removed += 1 logger.debug(f" [Image XObject] 替换水印图像 xref={img_xref}") if total_removed == 0: doc.close() return None result = doc.tobytes(garbage=4, deflate=True) doc.close() logger.info(f"✅ PDF 层级水印去除:共清除 {total_removed} 个水印 XObject") return result