| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 |
- """水印 PDF XObject 水印(由 ocr_utils.watermark_utils 迁入)。"""
- from __future__ import annotations
- import json
- import re
- from pathlib import Path
- from typing import Any, Dict, Optional, Tuple, Union
- import cv2
- import numpy as np
- from loguru import logger
- from PIL import Image
def _is_watermark_xobj(doc, xref: int, obj_str: str) -> bool:
    """Heuristically decide whether a Form XObject is a watermark.

    The object is treated as a watermark when any one of these holds:

    1. Its content stream contains a ``cm`` operator whose matrix has a
       noticeable rotation/skew component (``|b| > 0.1`` or ``|c| > 0.1``),
       regardless of whether the object has a ``/Group`` entry.
    2. It has a ``/Group`` entry and the stream mentions ``ca``/``CA``
       next to a number.
    3. It has a ``/Group`` entry and the decoded stream is longer than
       2048 characters (lots of repeated drawing, i.e. a tiled watermark).

    Args:
        doc: Document object exposing ``xref_stream(xref)``.
        xref: Cross-reference number of the XObject.
        obj_str: Textual object definition belonging to ``xref``.

    Returns:
        True if the object looks like a watermark; False otherwise,
        including when ``obj_str`` does not mention ``/Form`` or the
        stream is empty or cannot be read.
    """
    if "/Form" not in obj_str:
        return False

    try:
        stream = doc.xref_stream(xref)
        if not stream:
            return False
        stream_text = stream.decode("latin-1", errors="ignore")
    except Exception:
        # Best effort: an unreadable stream is simply not reported.
        return False

    cm_pattern = re.compile(
        r"([-\d.]+)\s+([-\d.]+)\s+([-\d.]+)\s+([-\d.]+)\s+[-\d.]+\s+[-\d.]+\s+cm"
    )
    for match in cm_pattern.finditer(stream_text):
        # The token class ``[-\d.]+`` also matches non-numbers such as "-"
        # or "1.2.3"; skip those matrices instead of letting float() raise
        # and abort the whole scan.
        try:
            b = float(match.group(2))
            c = float(match.group(3))
        except ValueError:
            continue
        if abs(b) > 0.1 or abs(c) > 0.1:
            return True

    # The remaining rules only apply to objects with a transparency group.
    if "/Group" not in obj_str:
        return False

    if re.search(r'\b(ca|CA)\s+[0-9.]+', stream_text) or re.search(r'[0-9.]+\s+(ca|CA)\b', stream_text):
        return True

    return len(stream_text) > 2048
def _is_watermark_image_xobj(doc, xref: int, obj_str: str) -> bool:
    """Decide whether an Image XObject is a full-page watermark background.

    All of the following must hold:

    1. ``obj_str`` mentions ``/Image``.
    2. ``obj_str`` mentions ``/SMask`` (the image is semi-transparent).
    3. Width >= 600 and height >= 800 (page-sized; rules out small icons).
    4. The decoded image, converted to grayscale, has a mean pixel value
       of at least 240 (almost entirely white, sparse watermark text).

    Args:
        doc: Document object exposing ``extract_image(xref)``.
        xref: Cross-reference number of the image.
        obj_str: Textual object definition belonging to ``xref``.

    Returns:
        True when every rule matches; False otherwise, including when the
        image cannot be extracted or decoded.
    """
    if not ("/Image" in obj_str and "/SMask" in obj_str):
        return False

    width_match = re.search(r'/Width\s+(\d+)', obj_str)
    height_match = re.search(r'/Height\s+(\d+)', obj_str)
    if width_match is None or height_match is None:
        return False

    width = int(width_match.group(1))
    height = int(height_match.group(1))
    if width < 600 or height < 800:
        return False

    try:
        from io import BytesIO

        extracted = doc.extract_image(xref)
        raw = BytesIO(extracted["image"])
        grayscale = Image.open(raw).convert("L")
        mean_level = float(np.array(grayscale).mean())
    except Exception:
        # Any extraction/decoding problem means "not a watermark".
        return False
    return mean_level >= 240.0
def _blank_watermark_image(doc, img_xref: int) -> None:
    """Overwrite a watermark Image XObject with white samples and make its
    SMask fully opaque.

    ``/DecodeParms`` is cleared *before* ``update_stream`` is called. Per
    the original author's note, leaving a predictor entry (Predictor 11)
    in place makes renderers apply predictor decoding to the new data,
    fail, fall back to the original data and keep showing the watermark.

    The replacement stream length is ``width * height * channels``, with
    the channel count taken from a direct ``/DeviceRGB`` (3),
    ``/DeviceCMYK`` (4) or ``/DeviceGray`` (1) colour space; any other
    colour space is treated as one channel. Missing ``/Width`` or
    ``/Height`` entries default to 1.

    Args:
        doc: Document object exposing ``xref_object``, ``xref_set_key``
            and ``update_stream``.
        img_xref: Cross-reference number of the image to blank.
    """
    obj_str = doc.xref_object(img_xref)
    width_match = re.search(r'/Width\s+(\d+)', obj_str)
    height_match = re.search(r'/Height\s+(\d+)', obj_str)
    width = int(width_match.group(1)) if width_match else 1
    height = int(height_match.group(1)) if height_match else 1

    cs_match = re.search(r'/ColorSpace\s+/Device(RGB|Gray|CMYK)', obj_str)
    cs_name = cs_match.group(1) if cs_match else ''
    # (channels, byte value that renders as white). DeviceCMYK is
    # subtractive: 0xFF on every channel is full ink (black), so white
    # has to be written as 0x00 there.
    channels, white = {'RGB': (3, 0xFF), 'CMYK': (4, 0x00)}.get(cs_name, (1, 0xFF))

    doc.xref_set_key(img_xref, "DecodeParms", "null")
    doc.update_stream(img_xref, bytes([white]) * (width * height * channels))

    smask_match = re.search(r'/SMask\s+(\d+)\s+0\s+R', obj_str)
    if smask_match:
        smask_xref = int(smask_match.group(1))
        smask_obj = doc.xref_object(smask_xref)
        # The mask may have its own dimensions; fall back to the image's.
        sw = int(m.group(1)) if (m := re.search(r'/Width\s+(\d+)', smask_obj)) else width
        sh = int(m.group(1)) if (m := re.search(r'/Height\s+(\d+)', smask_obj)) else height
        doc.xref_set_key(smask_xref, "DecodeParms", "null")
        # 0xFF in a soft mask means fully opaque.
        doc.update_stream(smask_xref, bytes([255]) * (sw * sh))
def scan_pdf_watermark_xobjs(pdf_bytes: bytes, sample_pages: int = 3) -> bool:
    """Quickly scan the first pages of a PDF for watermark XObjects.

    The document is only read, never modified. The check is meant to run
    before the actual removal so that large watermark-free files (e.g.
    financial reports) skip the full scan and re-serialisation.

    Args:
        pdf_bytes: Raw bytes of the PDF file.
        sample_pages: Upper bound on the number of leading pages to
            inspect. Defaults to 3 (per the original author, bank
            statements usually carry the watermark on the first pages).

    Returns:
        True if a watermark Form or Image XObject was found on the
        sampled pages; False if none was found or PyMuPDF (``fitz``) is
        not installed.
    """
    try:
        import fitz
    except ImportError:
        return False

    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        # XObjects are frequently shared between pages; remember the ones
        # already inspected so each is analysed at most once per kind.
        checked_forms: set[int] = set()
        checked_images: set[int] = set()

        # len(doc) is evaluated inside the try so the document is closed
        # even if it raises.
        for page_index in range(min(sample_pages, len(doc))):
            page = doc[page_index]

            for xref, *_ in page.get_xobjects():
                if xref in checked_forms:
                    continue
                checked_forms.add(xref)
                try:
                    obj_str = doc.xref_object(xref)
                except Exception:
                    continue
                if _is_watermark_xobj(doc, xref, obj_str):
                    return True

            for img_tuple in page.get_images(full=True):
                img_xref = img_tuple[0]
                if img_xref in checked_images:
                    continue
                checked_images.add(img_xref)
                try:
                    obj_str = doc.xref_object(img_xref)
                except Exception:
                    continue
                if _is_watermark_image_xobj(doc, img_xref, obj_str):
                    return True
    finally:
        doc.close()
    return False
def remove_txt_pdf_watermark(pdf_bytes: bytes) -> Optional[bytes]:
    """Remove native watermarks from a text-based PDF, entirely in memory.

    Two watermark forms are handled:

    - Form XObject watermarks: the content stream is emptied.
    - Image XObject watermarks (full-page background image with an SMask
      transparency channel): the samples are replaced with white pixels.

    Intended for PDFs with ``pdf_type='txt'``; the returned bytes can be
    handed straight to the rendering layer. For large files, call
    ``scan_pdf_watermark_xobjs()`` first to decide whether this function
    is worth running.

    Args:
        pdf_bytes: Raw bytes of the source PDF.

    Returns:
        The cleaned PDF serialised with ``garbage=4, deflate=True``, or
        None when no watermark XObject was removed.

    Raises:
        ImportError: If PyMuPDF (``fitz``) is not installed.
    """
    try:
        import fitz
    except ImportError as err:
        raise ImportError("请安装 PyMuPDF: pip install PyMuPDF") from err

    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        # Every xref that has been inspected, watermark or not. XObjects
        # are often shared across pages, so this avoids re-analysing
        # (and re-decoding) the same object on every page.
        seen_xrefs: set[int] = set()
        total_removed = 0

        for page in doc:
            # -- Form XObject watermarks ---------------------------------
            for xref, name, *_ in page.get_xobjects():
                if xref in seen_xrefs:
                    continue
                seen_xrefs.add(xref)
                try:
                    obj_str = doc.xref_object(xref)
                except Exception:
                    continue
                if not _is_watermark_xobj(doc, xref, obj_str):
                    continue
                try:
                    doc.update_stream(xref, b"")
                except Exception as e:
                    logger.warning(f" 清空 Form XObject xref={xref} 失败: {e}")
                else:
                    total_removed += 1
                    logger.debug(f" [Form XObject] 清空水印 xref={xref}, name={name}")

            # -- Image XObject watermarks --------------------------------
            for img_tuple in page.get_images(full=True):
                img_xref = img_tuple[0]
                if img_xref in seen_xrefs:
                    continue
                seen_xrefs.add(img_xref)
                try:
                    obj_str = doc.xref_object(img_xref)
                except Exception:
                    continue
                if not _is_watermark_image_xobj(doc, img_xref, obj_str):
                    continue
                # Same best-effort policy as the Form branch: one broken
                # image must not abort the whole document.
                try:
                    _blank_watermark_image(doc, img_xref)
                except Exception as e:
                    logger.warning(f" 替换 Image XObject xref={img_xref} 失败: {e}")
                else:
                    total_removed += 1
                    logger.debug(f" [Image XObject] 替换水印图像 xref={img_xref}")

        if total_removed == 0:
            return None
        result = doc.tobytes(garbage=4, deflate=True)
    finally:
        # Release the document on every path, including errors.
        doc.close()

    logger.info(f"✅ PDF 层级水印去除:共清除 {total_removed} 个水印 XObject")
    return result
|