watermark_utils.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. """
  2. 水印处理工具模块
  3. 统一管理所有水印检测与去除能力,供整个平台复用:
  4. - 图像级(扫描 PDF / 图片):
  5. detect_watermark() 检测图像中的斜向文字水印
  6. remove_watermark_from_image() 去除水印,返回灰度图
  7. remove_watermark_from_image_rgb() 去除水印,返回 RGB 图(适合模型输入)
  8. - PDF 层级(文字型 PDF,保留可搜索性):
  9. scan_pdf_watermark_xobjs() 快速扫描 PDF 是否含水印 XObject(无副作用)
  10. remove_txt_pdf_watermark() 从内存 PDF bytes 去除水印,返回新 bytes 或 None
  11. """
  12. from __future__ import annotations
  13. import re
  14. from typing import Optional, Union
  15. import cv2
  16. import numpy as np
  17. from PIL import Image
  18. # ─────────────────────────────────────────────────────────────────────────────
  19. # 图像级水印检测与去除
  20. # ─────────────────────────────────────────────────────────────────────────────
  21. def detect_watermark(
  22. image: Union[np.ndarray, Image.Image],
  23. midtone_low: int = 100,
  24. midtone_high: int = 220,
  25. ratio_threshold: float = 0.03,
  26. check_diagonal: bool = True,
  27. diagonal_angle_range: tuple = (30, 60),
  28. ) -> bool:
  29. """
  30. 检测图像中是否存在浅色斜向文字水印(银行流水类文档水印检测)。
  31. 原理:
  32. 1. 将图像转为灰度,提取「中间调」像素(midtone_low ~ midtone_high),
  33. 这些像素既不是纯白背景,也不是深黑正文,是浅灰水印的典型范围。
  34. 2. 若中间调像素占比超过 ratio_threshold,初步判定存在水印。
  35. 3. 若 check_diagonal=True,进一步用 Hough 直线变换验证中间调区域
  36. 是否呈现斜向(diagonal_angle_range 度)纹理,以排除灰色背景误报。
  37. Args:
  38. image: 输入图像,支持 PIL.Image 或 np.ndarray(BGR/RGB/灰度)。
  39. midtone_low: 中间调下限(默认 100),低于此视为深色正文。
  40. midtone_high: 中间调上限(默认 220),高于此视为纯白背景。
  41. ratio_threshold: 中间调像素占全图比例阈值(默认 0.03 即 3%)。
  42. check_diagonal: 是否进行斜向纹理验证(默认 True)。
  43. diagonal_angle_range: 斜向角度范围(度),默认 (30, 60),含 45° 斜水印。
  44. Returns:
  45. True 表示检测到水印,False 表示未检测到。
  46. """
  47. if isinstance(image, Image.Image):
  48. pil_img = image.convert('RGB') if image.mode == 'RGBA' else image
  49. np_img = np.array(pil_img)
  50. gray = cv2.cvtColor(np_img, cv2.COLOR_RGB2GRAY) if np_img.ndim == 3 else np_img
  51. else:
  52. np_img = image
  53. gray = cv2.cvtColor(np_img, cv2.COLOR_BGR2GRAY) if np_img.ndim == 3 else np_img
  54. midtone_mask = (gray > midtone_low) & (gray < midtone_high)
  55. ratio = midtone_mask.sum() / gray.size
  56. if ratio < ratio_threshold:
  57. return False
  58. if not check_diagonal:
  59. return True
  60. midtone_uint8 = (midtone_mask.astype(np.uint8)) * 255
  61. edges = cv2.Canny(midtone_uint8, 50, 150, apertureSize=3)
  62. lines = cv2.HoughLines(edges, rho=1, theta=np.pi / 180, threshold=80)
  63. if lines is None:
  64. return False
  65. low_rad = np.deg2rad(diagonal_angle_range[0])
  66. high_rad = np.deg2rad(diagonal_angle_range[1])
  67. diagonal_count = 0
  68. for line in lines:
  69. theta = line[0][1]
  70. if low_rad <= theta <= high_rad or (np.pi - high_rad) <= theta <= (np.pi - low_rad):
  71. diagonal_count += 1
  72. return diagonal_count >= 2
  73. def remove_watermark_from_image(
  74. image: Union[np.ndarray, Image.Image],
  75. threshold: int = 160,
  76. morph_close_kernel: int = 2,
  77. return_pil: Optional[bool] = None,
  78. ) -> Union[np.ndarray, Image.Image]:
  79. """
  80. 去除图像中的浅色斜向文字水印,返回灰度图。
  81. 原理:正文为深黑色(灰度 < threshold),水印为浅灰(灰度 > threshold)。
  82. 将高于阈值的像素置为白色(255),保留低于阈值的深色正文。
  83. Args:
  84. image: 输入图像(PIL.Image 或 np.ndarray BGR/RGB/灰度)。
  85. threshold: 灰度阈值(0-255)。建议范围 140-180,默认 160。
  86. 越大越保守(可能残留水印),越小越激进(可能损失浅色正文)。
  87. morph_close_kernel: 形态学闭运算核大小,用于填补字符断裂。0 跳过。
  88. return_pil: None(与输入同类型)| True(PIL.Image)| False(np.ndarray)。
  89. Returns:
  90. 去除水印后的灰度图:PIL.Image(mode='L') 或 np.ndarray(HxW, uint8)。
  91. """
  92. input_is_pil = isinstance(image, Image.Image)
  93. if input_is_pil:
  94. pil_img = image.convert('RGB') if image.mode == 'RGBA' else image
  95. np_img = np.array(pil_img)
  96. if np_img.ndim == 3:
  97. np_img = cv2.cvtColor(np_img, cv2.COLOR_RGB2BGR)
  98. else:
  99. np_img = image.copy()
  100. gray = cv2.cvtColor(np_img, cv2.COLOR_BGR2GRAY) if np_img.ndim == 3 else np_img
  101. cleaned = gray.copy()
  102. cleaned[gray > threshold] = 255
  103. if morph_close_kernel > 0:
  104. kernel = np.ones((morph_close_kernel, morph_close_kernel), np.uint8)
  105. cleaned = cv2.morphologyEx(cleaned, cv2.MORPH_CLOSE, kernel)
  106. should_return_pil = input_is_pil if return_pil is None else return_pil
  107. return Image.fromarray(cleaned, mode='L') if should_return_pil else cleaned
  108. def remove_watermark_from_image_rgb(
  109. image: Union[np.ndarray, Image.Image],
  110. threshold: int = 160,
  111. morph_close_kernel: int = 2,
  112. return_pil: Optional[bool] = None,
  113. ) -> Union[np.ndarray, Image.Image]:
  114. """
  115. 去除水印并返回 RGB 三通道图像。
  116. 与 remove_watermark_from_image 逻辑相同,但输出为 RGB(三通道),
  117. 方便直接传入布局检测、OCR 等需要彩色输入的下游模型。
  118. Args/Returns: 同 remove_watermark_from_image,但输出为 RGB/BGR 三通道。
  119. """
  120. input_is_pil = isinstance(image, Image.Image)
  121. gray_result = remove_watermark_from_image(image, threshold, morph_close_kernel, return_pil=False)
  122. rgb_np = cv2.cvtColor(gray_result, cv2.COLOR_GRAY2BGR)
  123. should_return_pil = input_is_pil if return_pil is None else return_pil
  124. if should_return_pil:
  125. return Image.fromarray(cv2.cvtColor(rgb_np, cv2.COLOR_BGR2RGB))
  126. return rgb_np
  127. # ─────────────────────────────────────────────────────────────────────────────
  128. # PDF 层级水印去除(文字型 PDF,保留可搜索性)
  129. # ─────────────────────────────────────────────────────────────────────────────
  130. def _is_watermark_xobj(doc, xref: int, obj_str: str) -> bool:
  131. """
  132. 判断一个 Form XObject 是否为水印。
  133. 启发式规则(满足其一即视为水印):
  134. 1. 含旋转变换矩阵(cm 指令 sin/cos 分量非零),无论是否有 /Group
  135. 2. 有透明度组(/Group)且内容流包含透明度操作符(ca/CA)
  136. 3. 有透明度组且内容流体积 > 2KB(大量重复绘图 = 平铺水印)
  137. """
  138. if "/Form" not in obj_str:
  139. return False
  140. try:
  141. stream = doc.xref_stream(xref)
  142. if not stream:
  143. return False
  144. stream_text = stream.decode("latin-1", errors="ignore")
  145. except Exception:
  146. return False
  147. has_group = "/Group" in obj_str
  148. cm_pattern = re.compile(
  149. r"([-\d.]+)\s+([-\d.]+)\s+([-\d.]+)\s+([-\d.]+)\s+[-\d.]+\s+[-\d.]+\s+cm"
  150. )
  151. for m in cm_pattern.finditer(stream_text):
  152. a, b, c, d = float(m.group(1)), float(m.group(2)), float(m.group(3)), float(m.group(4))
  153. if abs(b) > 0.1 or abs(c) > 0.1:
  154. return True
  155. if not has_group:
  156. return False
  157. if re.search(r'\b(ca|CA)\s+[0-9.]+', stream_text) or re.search(r'[0-9.]+\s+(ca|CA)\b', stream_text):
  158. return True
  159. if len(stream_text) > 2048:
  160. return True
  161. return False
  162. def _is_watermark_image_xobj(doc, xref: int, obj_str: str) -> bool:
  163. """
  164. 判断一个 Image XObject 是否为水印背景图。
  165. 判断规则(全部满足):
  166. 1. /Subtype /Image
  167. 2. 有 /SMask(半透明)
  168. 3. 宽 >= 600 且 高 >= 800(全页尺寸,排除小图标)
  169. 4. 解码后像素均值 >= 240(近乎全白,水印文字稀疏)
  170. """
  171. if "/Image" not in obj_str or "/SMask" not in obj_str:
  172. return False
  173. w_m = re.search(r'/Width\s+(\d+)', obj_str)
  174. h_m = re.search(r'/Height\s+(\d+)', obj_str)
  175. if not w_m or not h_m:
  176. return False
  177. if int(w_m.group(1)) < 600 or int(h_m.group(1)) < 800:
  178. return False
  179. try:
  180. from io import BytesIO
  181. img_info = doc.extract_image(xref)
  182. pil_img = Image.open(BytesIO(img_info["image"])).convert("L")
  183. return float(np.array(pil_img).mean()) >= 240.0
  184. except Exception:
  185. return False
  186. def _blank_watermark_image(doc, img_xref: int) -> None:
  187. """
  188. 将水印 Image XObject 的 RGB 流和 SMask 替换为全白/全不透明。
  189. 关键点:必须先移除 /DecodeParms(Predictor 11),再调用 update_stream。
  190. 否则渲染器在 FlateDecode 之后还会尝试 Predictor 解码,失败后回退原始数据,
  191. 水印依然可见。
  192. """
  193. obj_str = doc.xref_object(img_xref)
  194. w_m = re.search(r'/Width\s+(\d+)', obj_str)
  195. h_m = re.search(r'/Height\s+(\d+)', obj_str)
  196. w = int(w_m.group(1)) if w_m else 1
  197. h = int(h_m.group(1)) if h_m else 1
  198. cs_m = re.search(r'/ColorSpace\s+/Device(RGB|Gray|CMYK)', obj_str)
  199. channels = {'RGB': 3, 'CMYK': 4}.get(cs_m.group(1) if cs_m else '', 1)
  200. doc.xref_set_key(img_xref, "DecodeParms", "null")
  201. doc.update_stream(img_xref, bytes([255]) * (w * h * channels))
  202. smask_m = re.search(r'/SMask\s+(\d+)\s+0\s+R', obj_str)
  203. if smask_m:
  204. smask_xref = int(smask_m.group(1))
  205. smask_obj = doc.xref_object(smask_xref)
  206. sw = int(m.group(1)) if (m := re.search(r'/Width\s+(\d+)', smask_obj)) else w
  207. sh = int(m.group(1)) if (m := re.search(r'/Height\s+(\d+)', smask_obj)) else h
  208. doc.xref_set_key(smask_xref, "DecodeParms", "null")
  209. doc.update_stream(smask_xref, bytes([255]) * (sw * sh))
  210. def scan_pdf_watermark_xobjs(pdf_bytes: bytes, sample_pages: int = 3) -> bool:
  211. """
  212. 快速扫描 PDF 前 N 页,判断是否含水印 XObject。
  213. 无副作用(只读),用于在执行去水印前快速判断,避免对无水印的大文件
  214. 执行全量扫描和序列化,显著降低财报等大文件的处理开销。
  215. Args:
  216. pdf_bytes: PDF 文件的原始字节。
  217. sample_pages: 扫描页数上限,默认 3(银行流水通常前几页有水印)。
  218. Returns:
  219. True 表示发现水印 XObject,False 表示未发现。
  220. """
  221. try:
  222. import fitz
  223. except ImportError:
  224. return False
  225. doc = fitz.open(stream=pdf_bytes, filetype="pdf")
  226. pages_to_check = min(sample_pages, len(doc))
  227. try:
  228. for i in range(pages_to_check):
  229. page = doc[i]
  230. for xref, *_ in page.get_xobjects():
  231. try:
  232. obj_str = doc.xref_object(xref)
  233. except Exception:
  234. continue
  235. if _is_watermark_xobj(doc, xref, obj_str):
  236. return True
  237. for img_tuple in page.get_images(full=True):
  238. try:
  239. obj_str = doc.xref_object(img_tuple[0])
  240. except Exception:
  241. continue
  242. if _is_watermark_image_xobj(doc, img_tuple[0], obj_str):
  243. return True
  244. finally:
  245. doc.close()
  246. return False
  247. def remove_txt_pdf_watermark(pdf_bytes: bytes) -> Optional[bytes]:
  248. """
  249. 对文字型 PDF 执行原生水印去除,完全在内存中完成,不写临时文件。
  250. 支持两种水印形式:
  251. - Form XObject 水印:清空内容流
  252. - Image XObject 水印(全页背景图 + SMask 透明通道):替换为全白像素
  253. 适用场景:pdf_type='txt' 的 PDF,去除后可直接传给渲染层(tobytes() → bytes)。
  254. 对于大文件(如财报),建议先用 scan_pdf_watermark_xobjs() 快速判断再调用本函数。
  255. Args:
  256. pdf_bytes: 原始 PDF 的字节内容。
  257. Returns:
  258. 去除水印后的 PDF bytes(garbage=4 压缩);若未发现水印返回 None。
  259. """
  260. try:
  261. import fitz
  262. except ImportError:
  263. raise ImportError("请安装 PyMuPDF: pip install PyMuPDF")
  264. from loguru import logger
  265. doc = fitz.open(stream=pdf_bytes, filetype="pdf")
  266. processed_xrefs: set[int] = set()
  267. total_removed = 0
  268. for page in doc:
  269. # ── Form XObject 水印 ─────────────────────────────────────────
  270. for xref, name, _invoker, _unused in page.get_xobjects():
  271. if xref in processed_xrefs:
  272. continue
  273. try:
  274. obj_str = doc.xref_object(xref)
  275. except Exception:
  276. continue
  277. if _is_watermark_xobj(doc, xref, obj_str):
  278. try:
  279. doc.update_stream(xref, b"")
  280. processed_xrefs.add(xref)
  281. total_removed += 1
  282. logger.debug(f" [Form XObject] 清空水印 xref={xref}, name={name}")
  283. except Exception as e:
  284. logger.warning(f" 清空 Form XObject xref={xref} 失败: {e}")
  285. # ── Image XObject 水印 ────────────────────────────────────────
  286. for img_tuple in page.get_images(full=True):
  287. img_xref = img_tuple[0]
  288. if img_xref in processed_xrefs:
  289. continue
  290. try:
  291. obj_str = doc.xref_object(img_xref)
  292. except Exception:
  293. continue
  294. if _is_watermark_image_xobj(doc, img_xref, obj_str):
  295. _blank_watermark_image(doc, img_xref)
  296. processed_xrefs.add(img_xref)
  297. total_removed += 1
  298. logger.debug(f" [Image XObject] 替换水印图像 xref={img_xref}")
  299. if total_removed == 0:
  300. doc.close()
  301. return None
  302. result = doc.tobytes(garbage=4, deflate=True)
  303. doc.close()
  304. logger.info(f"✅ PDF 层级水印去除:共清除 {total_removed} 个水印 XObject")
  305. return result