"""PDF XObject watermark detection and removal.

Migrated from ``ocr_utils.watermark_utils``.
"""

from __future__ import annotations

import json
import re
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, Optional, Tuple, Union

import cv2
import numpy as np
from loguru import logger
from PIL import Image

# ``cm`` operator preceded by its six operands; groups 1-4 capture a, b, c, d.
_CM_PATTERN = re.compile(
    r"([-\d.]+)\s+([-\d.]+)\s+([-\d.]+)\s+([-\d.]+)\s+[-\d.]+\s+[-\d.]+\s+cm"
)
# Integer-valued /Width and /Height entries of an image dictionary.
_WIDTH_PATTERN = re.compile(r'/Width\s+(\d+)')
_HEIGHT_PATTERN = re.compile(r'/Height\s+(\d+)')


def _is_watermark_xobj(doc, xref: int, obj_str: str) -> bool:
    """Decide heuristically whether a Form XObject is a watermark.

    The object is treated as a watermark when any one of these holds:

    1. Its content stream contains a ``cm`` matrix whose ``b`` or ``c``
       component exceeds 0.1 in magnitude (a rotated/skewed transform),
       regardless of whether ``/Group`` is present.
    2. ``/Group`` is present and the stream contains a ``ca``/``CA`` token
       adjacent to a number (transparency).
    3. ``/Group`` is present and the decoded stream is longer than 2048
       characters (many repeated drawing commands, i.e. a tiled watermark).

    Args:
        doc: Open PyMuPDF document providing ``xref_stream``.
        xref: Cross-reference number of the XObject.
        obj_str: Object source as returned by ``doc.xref_object(xref)``.

    Returns:
        True if the object looks like a watermark, otherwise False. Objects
        without ``/Form``, without a stream, or whose stream cannot be read
        yield False.
    """
    if "/Form" not in obj_str:
        return False
    try:
        stream = doc.xref_stream(xref)
        if not stream:
            return False
        stream_text = stream.decode("latin-1", errors="ignore")
    except Exception:
        return False

    for match in _CM_PATTERN.finditer(stream_text):
        # The operand pattern is loose ("-", "1.2.3" also match), so a
        # token may not be a valid float; skip such matrices instead of
        # letting ValueError abort the caller's whole scan.
        try:
            b, c = float(match.group(2)), float(match.group(3))
        except ValueError:
            continue
        if abs(b) > 0.1 or abs(c) > 0.1:
            return True

    # The remaining rules only apply to objects with a transparency group.
    if "/Group" not in obj_str:
        return False

    if re.search(r'\b(ca|CA)\s+[0-9.]+', stream_text) or re.search(
        r'[0-9.]+\s+(ca|CA)\b', stream_text
    ):
        return True

    return len(stream_text) > 2048


def _is_watermark_image_xobj(doc, xref: int, obj_str: str) -> bool:
    """Decide whether an Image XObject is a full-page watermark background.

    All of the following must hold:

    1. The object source contains ``/Image``.
    2. It contains ``/SMask`` (semi-transparent image).
    3. Width >= 600 and height >= 800 (page-sized; excludes small icons).
    4. The mean of the decoded grayscale pixels is >= 240 (almost entirely
       white, i.e. sparse watermark text).

    Args:
        doc: Open PyMuPDF document providing ``extract_image``.
        xref: Cross-reference number of the image.
        obj_str: Object source as returned by ``doc.xref_object(xref)``.

    Returns:
        True if every rule matches; False otherwise, including when the
        image cannot be extracted or decoded.
    """
    if "/Image" not in obj_str or "/SMask" not in obj_str:
        return False

    width_match = _WIDTH_PATTERN.search(obj_str)
    height_match = _HEIGHT_PATTERN.search(obj_str)
    if not width_match or not height_match:
        return False
    if int(width_match.group(1)) < 600 or int(height_match.group(1)) < 800:
        return False

    try:
        img_info = doc.extract_image(xref)
        gray = Image.open(BytesIO(img_info["image"])).convert("L")
        return float(np.array(gray).mean()) >= 240.0
    except Exception:
        # Best effort: an image that cannot be decoded is not classified
        # as a watermark.
        return False


def _blank_watermark_image(doc, img_xref: int) -> None:
    """Overwrite a watermark image and its soft mask with 0xFF bytes.

    The image stream is replaced by ``width * height * channels`` bytes of
    255 and, when an ``/SMask`` reference is present, the mask stream by
    ``mask_width * mask_height`` bytes of 255 (fully opaque).

    ``/DecodeParms`` is set to ``null`` *before* ``update_stream`` is
    called. Per the original author's note, leaving a Predictor entry in
    place makes renderers attempt predictor decoding on the new data, fail,
    and fall back to the original pixels, so the watermark stays visible.

    Args:
        doc: Open PyMuPDF document; modified in place.
        img_xref: Cross-reference number of the image to blank.
    """
    obj_str = doc.xref_object(img_xref)

    width_match = _WIDTH_PATTERN.search(obj_str)
    height_match = _HEIGHT_PATTERN.search(obj_str)
    width = int(width_match.group(1)) if width_match else 1
    height = int(height_match.group(1)) if height_match else 1

    # Anything other than DeviceRGB/DeviceCMYK is treated as one channel.
    # NOTE(review): ICCBased/Indexed colour spaces and /BitsPerComponent
    # other than 8 are not considered here - confirm this is acceptable.
    cs_match = re.search(r'/ColorSpace\s+/Device(RGB|Gray|CMYK)', obj_str)
    channels = {'RGB': 3, 'CMYK': 4}.get(cs_match.group(1) if cs_match else '', 1)

    doc.xref_set_key(img_xref, "DecodeParms", "null")
    doc.update_stream(img_xref, bytes([255]) * (width * height * channels))

    smask_match = re.search(r'/SMask\s+(\d+)\s+0\s+R', obj_str)
    if not smask_match:
        return

    smask_xref = int(smask_match.group(1))
    smask_obj = doc.xref_object(smask_xref)
    # The mask may have its own dimensions; fall back to the image's.
    mask_w_match = _WIDTH_PATTERN.search(smask_obj)
    mask_h_match = _HEIGHT_PATTERN.search(smask_obj)
    mask_width = int(mask_w_match.group(1)) if mask_w_match else width
    mask_height = int(mask_h_match.group(1)) if mask_h_match else height

    doc.xref_set_key(smask_xref, "DecodeParms", "null")
    doc.update_stream(smask_xref, bytes([255]) * (mask_width * mask_height))


def scan_pdf_watermark_xobjs(pdf_bytes: bytes, sample_pages: int = 3) -> bool:
    """Quickly check whether the first pages of a PDF contain watermark XObjects.

    Read-only. Intended as a cheap pre-check before running the removal, so
    that large watermark-free files (e.g. financial reports) skip the full
    scan and re-serialisation.

    Args:
        pdf_bytes: Raw bytes of the PDF file.
        sample_pages: Maximum number of leading pages to inspect
            (default 3).

    Returns:
        True if a watermark Form or Image XObject is found on the sampled
        pages; False if none is found or PyMuPDF is not installed.
    """
    try:
        import fitz
    except ImportError:
        return False

    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        pages_to_check = min(sample_pages, len(doc))
        for page_index in range(pages_to_check):
            page = doc[page_index]

            for xref, *_ in page.get_xobjects():
                try:
                    obj_str = doc.xref_object(xref)
                except Exception:
                    continue
                if _is_watermark_xobj(doc, xref, obj_str):
                    return True

            for img_tuple in page.get_images(full=True):
                img_xref = img_tuple[0]
                try:
                    obj_str = doc.xref_object(img_xref)
                except Exception:
                    continue
                if _is_watermark_image_xobj(doc, img_xref, obj_str):
                    return True
    finally:
        doc.close()
    return False


def remove_txt_pdf_watermark(pdf_bytes: bytes) -> Optional[bytes]:
    """Remove XObject watermarks from a text-type PDF, entirely in memory.

    Two watermark forms are handled:

    - Form XObject watermarks: the content stream is emptied.
    - Image XObject watermarks (full-page background with an SMask): the
      pixels are replaced with white.

    No temporary files are written. For large files, call
    ``scan_pdf_watermark_xobjs()`` first to avoid a needless full pass.

    Args:
        pdf_bytes: Raw bytes of the original PDF.

    Returns:
        The cleaned PDF as bytes (saved with ``garbage=4, deflate=True``),
        or None if no watermark was removed.

    Raises:
        ImportError: If PyMuPDF is not installed.
    """
    try:
        import fitz
    except ImportError as exc:
        raise ImportError("请安装 PyMuPDF: pip install PyMuPDF") from exc

    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        # XObjects are commonly shared between pages; inspect each xref
        # once so shared images are not re-decoded on every page.
        inspected_xrefs: set[int] = set()
        total_removed = 0

        for page in doc:
            # -- Form XObject watermarks --------------------------------
            for xref, name, *_ in page.get_xobjects():
                if xref in inspected_xrefs:
                    continue
                inspected_xrefs.add(xref)
                try:
                    obj_str = doc.xref_object(xref)
                except Exception:
                    continue
                if not _is_watermark_xobj(doc, xref, obj_str):
                    continue
                try:
                    doc.update_stream(xref, b"")
                except Exception as e:
                    logger.warning(f" 清空 Form XObject xref={xref} 失败: {e}")
                    continue
                total_removed += 1
                logger.debug(f" [Form XObject] 清空水印 xref={xref}, name={name}")

            # -- Image XObject watermarks -------------------------------
            for img_tuple in page.get_images(full=True):
                img_xref = img_tuple[0]
                if img_xref in inspected_xrefs:
                    continue
                inspected_xrefs.add(img_xref)
                try:
                    obj_str = doc.xref_object(img_xref)
                except Exception:
                    continue
                if not _is_watermark_image_xobj(doc, img_xref, obj_str):
                    continue
                # Guarded like the Form branch: one image that cannot be
                # rewritten must not abort the whole removal.
                try:
                    _blank_watermark_image(doc, img_xref)
                except Exception as e:
                    logger.warning(f" 替换 Image XObject xref={img_xref} 失败: {e}")
                    continue
                total_removed += 1
                logger.debug(f" [Image XObject] 替换水印图像 xref={img_xref}")

        if total_removed == 0:
            return None
        result = doc.tobytes(garbage=4, deflate=True)
    finally:
        # Always release the document, including on unexpected errors.
        doc.close()

    logger.info(f"✅ PDF 层级水印去除:共清除 {total_removed} 个水印 XObject")
    return result