watermark_utils.py 62 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712
  1. """
  2. 水印处理工具模块
  3. 统一管理所有水印检测与去除能力,供整个平台复用:
  4. - 图像级(扫描 PDF / 图片):
  5. detect_watermark() 检测图像中的斜向文字水印
  6. build_watermark_mask() 构建斜向浅灰水印掩膜(方案 D)
  7. remove_watermark_masked_adaptive() 掩膜 + 动态阈值去水印
  8. remove_watermark_from_image() 去除水印,返回灰度图
  9. remove_watermark_from_image_rgb() 去除水印,返回 RGB 图(适合模型输入)
  10. enhance_document_contrast() 去水印后对比度/笔画深度恢复
  11. save_watermark_removal_debug() 保存去水印前后对比调试图
  12. - PDF 层级(文字型 PDF,保留可搜索性):
  13. scan_pdf_watermark_xobjs() 快速扫描 PDF 是否含水印 XObject(无副作用)
  14. remove_txt_pdf_watermark() 从内存 PDF bytes 去除水印,返回新 bytes 或 None
  15. """
  16. from __future__ import annotations
  17. import json
  18. import re
  19. from pathlib import Path
  20. from typing import Any, Dict, Optional, Tuple, Union
  21. import cv2
  22. import numpy as np
  23. from loguru import logger
  24. from PIL import Image
  25. # ─────────────────────────────────────────────────────────────────────────────
  26. # 图像级水印检测与去除
  27. # ─────────────────────────────────────────────────────────────────────────────
  28. def detect_watermark(
  29. image: Union[np.ndarray, Image.Image],
  30. midtone_low: int = 100,
  31. midtone_high: int = 220,
  32. ratio_threshold: float = 0.03,
  33. check_diagonal: bool = True,
  34. diagonal_angle_range: tuple = (30, 60),
  35. ) -> bool:
  36. """
  37. 检测图像中是否存在浅色斜向文字水印(银行流水类文档水印检测)。
  38. 原理:
  39. 1. 将图像转为灰度,提取「中间调」像素(midtone_low ~ midtone_high),
  40. 这些像素既不是纯白背景,也不是深黑正文,是浅灰水印的典型范围。
  41. 2. 若中间调像素占比超过 ratio_threshold,初步判定存在水印。
  42. 3. 若 check_diagonal=True,进一步用 Hough 直线变换验证中间调区域
  43. 是否呈现斜向(diagonal_angle_range 度)纹理,以排除灰色背景误报。
  44. Args:
  45. image: 输入图像,支持 PIL.Image 或 np.ndarray(BGR/RGB/灰度)。
  46. midtone_low: 中间调下限(默认 100),低于此视为深色正文。
  47. midtone_high: 中间调上限(默认 220),高于此视为纯白背景。
  48. ratio_threshold: 中间调像素占全图比例阈值(默认 0.03 即 3%)。
  49. check_diagonal: 是否进行斜向纹理验证(默认 True)。
  50. diagonal_angle_range: 斜向角度范围(度),默认 (30, 60),含 45° 斜水印。
  51. Returns:
  52. True 表示检测到水印,False 表示未检测到。
  53. """
  54. if isinstance(image, Image.Image):
  55. pil_img = image.convert('RGB') if image.mode == 'RGBA' else image
  56. np_img = np.array(pil_img)
  57. gray = cv2.cvtColor(np_img, cv2.COLOR_RGB2GRAY) if np_img.ndim == 3 else np_img
  58. else:
  59. np_img = image
  60. gray = cv2.cvtColor(np_img, cv2.COLOR_BGR2GRAY) if np_img.ndim == 3 else np_img
  61. midtone_mask = (gray > midtone_low) & (gray < midtone_high)
  62. ratio = midtone_mask.sum() / gray.size
  63. if ratio < ratio_threshold:
  64. return False
  65. if not check_diagonal:
  66. return True
  67. midtone_uint8 = (midtone_mask.astype(np.uint8)) * 255
  68. edges = cv2.Canny(midtone_uint8, 50, 150, apertureSize=3)
  69. lines = cv2.HoughLines(edges, rho=1, theta=np.pi / 180, threshold=80)
  70. if lines is None:
  71. return False
  72. low_rad = np.deg2rad(diagonal_angle_range[0])
  73. high_rad = np.deg2rad(diagonal_angle_range[1])
  74. diagonal_count = 0
  75. for line in lines:
  76. theta = line[0][1]
  77. if low_rad <= theta <= high_rad or (np.pi - high_rad) <= theta <= (np.pi - low_rad):
  78. diagonal_count += 1
  79. return True | False
  80. def _local_std_map(gray: np.ndarray, window: int = 5) -> np.ndarray:
  81. """局部标准差图(返回值与输入同形状)。"""
  82. gray = np.asarray(gray, dtype=np.float32)
  83. size = max(3, int(window))
  84. kernel = np.ones((size, size), dtype=np.float32) / (size * size)
  85. mean = cv2.filter2D(gray, -1, kernel)
  86. sq_mean = cv2.filter2D(gray * gray, -1, kernel)
  87. var = sq_mean - mean * mean
  88. var = np.maximum(var, 0)
  89. return np.sqrt(var)
  90. def _line_structuring_kernel(length: int, angle_deg: float) -> np.ndarray:
  91. """生成指定角度、长度的线形结构元(用于斜向水印形态学)。"""
  92. length = max(3, int(length))
  93. k = np.zeros((length, length), np.uint8)
  94. c = length // 2
  95. rad = np.deg2rad(angle_deg)
  96. dx = int(round(np.cos(rad) * (c - 1)))
  97. dy = int(round(np.sin(rad) * (c - 1)))
  98. cv2.line(k, (c - dx, c - dy), (c + dx, c + dy), 1, thickness=1)
  99. return k
  100. def _line_angle_deg(x1: int, y1: int, x2: int, y2: int) -> float:
  101. """线段方向角 [0, 180)(无向)。"""
  102. ang = float(np.degrees(np.arctan2(y2 - y1, x2 - x1)))
  103. if ang < 0:
  104. ang += 180.0
  105. return ang
  106. def _angle_in_diagonal_ranges(
  107. angle_deg: float,
  108. ranges: Tuple[Tuple[float, float], Tuple[float, float]] = ((35.0, 55.0), (125.0, 145.0)),
  109. ) -> bool:
  110. for lo, hi in ranges:
  111. if lo <= angle_deg <= hi:
  112. return True
  113. return False
  114. def _angle_distance_deg(a: float, b: float) -> float:
  115. """无向角距离 [0, 90]。"""
  116. d = abs(float(a) - float(b)) % 180.0
  117. return min(d, 180.0 - d)
  118. def _line_length(x1: int, y1: int, x2: int, y2: int) -> float:
  119. return float(np.hypot(x2 - x1, y2 - y1))
  120. def _find_dominant_diagonal_angles(
  121. segments: list,
  122. *,
  123. angle_ranges: Tuple[Tuple[float, float], Tuple[float, float]] = ((25.0, 65.0), (115.0, 155.0)),
  124. smooth_sigma: float = 2.0,
  125. secondary_peak_ratio: float = 0.35,
  126. ) -> Tuple[list, np.ndarray]:
  127. """
  128. 按线段长度加权统计角度直方图,取主峰(及次峰)作为本页水印固定方向。
  129. Returns:
  130. dominant_angles: 1~2 个主导角度(度)
  131. hist_smooth: 长度 180 的平滑直方图
  132. """
  133. hist = np.zeros(180, dtype=np.float64)
  134. for x1, y1, x2, y2, ang, length in segments:
  135. if not _angle_in_diagonal_ranges(ang, angle_ranges):
  136. continue
  137. hist[int(ang) % 180] += length
  138. if hist.sum() <= 0:
  139. return [], hist
  140. ksize = max(3, int(smooth_sigma * 4) | 1)
  141. hist_smooth = cv2.GaussianBlur(
  142. hist.reshape(1, 180).astype(np.float32), (ksize, 1), smooth_sigma
  143. ).flatten().astype(np.float64)
  144. peaks: list = []
  145. for lo, hi in angle_ranges:
  146. lo_i, hi_i = int(lo), int(hi)
  147. sub = hist_smooth[lo_i : hi_i + 1]
  148. if sub.size == 0 or sub.max() <= 0:
  149. continue
  150. peak_ang = lo_i + int(sub.argmax())
  151. peaks.append((peak_ang, float(sub.max())))
  152. if not peaks:
  153. return [], hist_smooth
  154. peaks.sort(key=lambda x: -x[1])
  155. dominant: list = [peaks[0][0]]
  156. for ang, val in peaks[1:]:
  157. if val >= peaks[0][1] * secondary_peak_ratio:
  158. if all(_angle_distance_deg(ang, d) > 15 for d in dominant):
  159. dominant.append(ang)
  160. return dominant, hist_smooth
  161. def _render_angle_histogram(hist: np.ndarray, dominant_angles: list) -> np.ndarray:
  162. """角度直方图 debug 图(BGR)。"""
  163. h_img, w_img = 120, 360
  164. canvas = np.ones((h_img, w_img, 3), dtype=np.uint8) * 255
  165. if hist.max() <= 0:
  166. return canvas
  167. norm = (hist / hist.max() * (h_img - 20)).astype(np.int32)
  168. for i, h in enumerate(norm):
  169. x = int(i * (w_img - 1) / 179)
  170. cv2.line(canvas, (x, h_img - 10), (x, h_img - 10 - int(h)), (180, 180, 180), 1)
  171. for ang in dominant_angles:
  172. x = int(ang * (w_img - 1) / 179)
  173. cv2.line(canvas, (x, 0), (x, h_img - 1), (0, 0, 255), 2)
  174. cv2.putText(canvas, "angle (deg)", (w_img // 2 - 40, h_img - 2), cv2.FONT_HERSHEY_SIMPLEX, 0.35, (0, 0, 0), 1)
  175. return canvas
def _build_diag_hough_region_mask(
    gray: np.ndarray,
    *,
    midtone_low: int = 200,
    midtone_high: int = 254,
    canny_low: int = 30,
    canny_high: int = 100,
    hough_threshold: int = 30,
    min_line_length: int = 40,
    max_line_gap: int = 15,
    angle_ranges: Tuple[Tuple[float, float], Tuple[float, float]] = ((25.0, 65.0), (115.0, 155.0)),
    angle_tolerance: float = 5.0,
    use_angle_statistics: bool = True,
    secondary_peak_ratio: float = 0.35,
    min_length_percentile: float = 25.0,
    line_thickness: int = 10,
    band_dilate_radius: int = 12,
) -> Tuple[np.ndarray, Dict[str, Any]]:
    """
    Scheme C: Canny + HoughLinesP + angle-histogram peak statistics; keep only
    the segments whose direction agrees with the page's dominant diagonal angle(s).

    Args:
        gray: Single-channel image; it is cast to uint8 before processing.
        midtone_low, midtone_high: Gray band ``[low, high)`` that is binarised
            and fed to Canny.
        canny_low, canny_high: Canny hysteresis thresholds.
        hough_threshold, min_line_length, max_line_gap: ``cv2.HoughLinesP``
            parameters.
        angle_ranges: Inclusive angle windows (degrees) a segment must fall in
            to become a diagonal candidate.
        angle_tolerance: Maximum undirected distance (degrees) from a dominant
            angle for a candidate to be kept.
        use_angle_statistics: When False, or when no dominant angle is found,
            every diagonal candidate passes the angle test.
        secondary_peak_ratio: Forwarded to ``_find_dominant_diagonal_angles``.
        min_length_percentile: Candidates shorter than this percentile of the
            angle-matched lengths are dropped (disabled when <= 0).
        line_thickness: Thickness used to paint kept segments into the mask.
        band_dilate_radius: Radius of the elliptical dilation applied to the
            painted mask (disabled when <= 0).

    Returns:
        geom: Boolean mask, same shape as ``gray``, covering the kept segments.
        info: Dict with line counts, dominant angles, mask ratio and BGR debug
            images (filtered lines, candidate lines, angle histogram).
    """
    gray_u8 = np.asarray(gray, dtype=np.uint8)
    # Binarise the light-gray band so that Canny sees only watermark-like pixels.
    band = ((gray_u8 >= midtone_low) & (gray_u8 < midtone_high)).astype(np.uint8) * 255
    edges = cv2.Canny(band, int(canny_low), int(canny_high), apertureSize=3)
    lines_p = cv2.HoughLinesP(
        edges,
        rho=1,
        theta=np.pi / 180,
        threshold=int(hough_threshold),
        minLineLength=int(min_line_length),
        maxLineGap=int(max_line_gap),
    )
    line_mask = np.zeros_like(gray_u8, dtype=np.uint8)
    # Two debug canvases drawn on top of the grayscale page.
    lines_all_bgr = cv2.cvtColor(gray_u8, cv2.COLOR_GRAY2BGR)
    lines_filt_bgr = cv2.cvtColor(gray_u8, cv2.COLOR_GRAY2BGR)
    diag_candidates: list = []
    total_lines = 0
    if lines_p is not None:
        for seg in lines_p:
            x1, y1, x2, y2 = [int(v) for v in seg[0]]
            total_lines += 1
            ang = _line_angle_deg(x1, y1, x2, y2)
            length = _line_length(x1, y1, x2, y2)
            if not _angle_in_diagonal_ranges(ang, angle_ranges):
                continue
            diag_candidates.append((x1, y1, x2, y2, ang, length))
            # NOTE(review): despite the "all" name, only diagonal candidates
            # reach this draw call - confirm that this is intended.
            cv2.line(lines_all_bgr, (x1, y1), (x2, y2), (128, 128, 128), 1)
    dominant_angles: list = []
    hist_smooth = np.zeros(180, dtype=np.float64)
    if use_angle_statistics and diag_candidates:
        dominant_angles, hist_smooth = _find_dominant_diagonal_angles(
            diag_candidates,
            angle_ranges=angle_ranges,
            secondary_peak_ratio=secondary_peak_ratio,
        )

    def _angle_matches(ang: float) -> bool:
        # Without statistics (or without any peak) every candidate is accepted.
        if not use_angle_statistics or not dominant_angles:
            return True
        return any(_angle_distance_deg(ang, d) <= angle_tolerance for d in dominant_angles)

    angle_matched = [
        s for s in diag_candidates if _angle_matches(s[4])
    ]
    # Drop the shortest angle-matched segments (below the given percentile).
    if angle_matched and min_length_percentile > 0:
        lengths = np.array([s[5] for s in angle_matched], dtype=np.float32)
        len_th = float(np.percentile(lengths, min_length_percentile))
        angle_matched = [s for s in angle_matched if s[5] >= len_th]
    matched_keys = {(s[0], s[1], s[2], s[3]) for s in angle_matched}
    kept_lines: list = []
    for x1, y1, x2, y2, ang, _length in angle_matched:
        kept_lines.append((x1, y1, x2, y2, ang))
        cv2.line(line_mask, (x1, y1), (x2, y2), 255, thickness=int(line_thickness))
        cv2.line(lines_filt_bgr, (x1, y1), (x2, y2), (0, 0, 255), 2)
    # Rejected candidates are drawn in a different colour on the filtered canvas.
    for x1, y1, x2, y2, _ang, _length in diag_candidates:
        if (x1, y1, x2, y2) not in matched_keys:
            cv2.line(lines_filt_bgr, (x1, y1), (x2, y2), (0, 180, 255), 1)
    geom = line_mask > 0
    # Widen the painted lines into bands with an elliptical dilation.
    if band_dilate_radius > 0 and np.any(geom):
        k = cv2.getStructuringElement(
            cv2.MORPH_ELLIPSE, (band_dilate_radius * 2 + 1, band_dilate_radius * 2 + 1)
        )
        geom = cv2.dilate(line_mask, k) > 0
    info: Dict[str, Any] = {
        "hough_total_lines": total_lines,
        "hough_diag_candidates": len(diag_candidates),
        "hough_kept_lines": len(kept_lines),
        "dominant_angles": dominant_angles,
        "angle_tolerance": angle_tolerance,
        "geom_mask_ratio": float(geom.sum() / gray_u8.size),
        "hough_lines_bgr": lines_filt_bgr,
        "hough_lines_all_bgr": lines_all_bgr,
        "angle_histogram_bgr": _render_angle_histogram(hist_smooth, dominant_angles),
    }
    return geom, info
  269. def _compute_block_orientation_debug_maps(
  270. gray: np.ndarray,
  271. *,
  272. block_size: int = 48,
  273. ) -> Tuple[np.ndarray, np.ndarray]:
  274. """分块 diag/hv 弱边缘占比图(仅 debug 热力图,0~1 float)。"""
  275. gray_f = np.asarray(gray, dtype=np.float32)
  276. bs = max(4, int(block_size))
  277. h_blocks = gray_f.shape[0] // bs
  278. w_blocks = gray_f.shape[1] // bs
  279. if h_blocks == 0 or w_blocks == 0:
  280. z = np.zeros_like(gray_f, dtype=np.float32)
  281. return z, z
  282. ph, pw = h_blocks * bs, w_blocks * bs
  283. gx = cv2.Sobel(gray_f, cv2.CV_32F, 1, 0, ksize=3)
  284. gy = cv2.Sobel(gray_f, cv2.CV_32F, 0, 1, ksize=3)
  285. mag = np.sqrt(gx * gx + gy * gy)
  286. ori = np.arctan2(gy, gx) * 180.0 / np.pi
  287. diag = (
  288. ((ori > 25) & (ori < 65))
  289. | ((ori > 115) & (ori < 155))
  290. | ((ori > -155) & (ori < -115))
  291. | ((ori > -65) & (ori < -25))
  292. )
  293. hv = (
  294. ((ori > -20) & (ori < 20))
  295. | ((ori > 160) | (ori < -160))
  296. | ((ori > 70) & (ori < 110))
  297. | ((ori > -110) & (ori < -70))
  298. )
  299. weak = (mag > 1) & (mag < 15)
  300. def _to_blocks(arr: np.ndarray) -> np.ndarray:
  301. return (
  302. arr[:ph, :pw]
  303. .reshape(h_blocks, bs, w_blocks, bs)
  304. .transpose(0, 2, 1, 3)
  305. .reshape(h_blocks, w_blocks, -1)
  306. )
  307. b_diag = _to_blocks(diag)
  308. b_hv = _to_blocks(hv)
  309. b_weak = _to_blocks(weak)
  310. diag_weak = np.sum(b_diag & b_weak, axis=2)
  311. hv_weak = np.sum(b_hv & b_weak, axis=2)
  312. total_weak = np.sum(b_weak, axis=2)
  313. with np.errstate(divide="ignore", invalid="ignore"):
  314. diag_ratio = np.where(total_weak > 0, diag_weak / total_weak, 0.0).astype(np.float32)
  315. hv_ratio = np.where(total_weak > 0, hv_weak / total_weak, 0.0).astype(np.float32)
  316. diag_up = np.repeat(np.repeat(diag_ratio, bs, axis=0), bs, axis=1)
  317. hv_up = np.repeat(np.repeat(hv_ratio, bs, axis=0), bs, axis=1)
  318. diag_full = np.zeros_like(gray_f, dtype=np.float32)
  319. hv_full = np.zeros_like(gray_f, dtype=np.float32)
  320. diag_full[:ph, :pw] = diag_up
  321. hv_full[:ph, :pw] = hv_up
  322. return diag_full, hv_full
  323. def render_ratio_heatmap(ratio_map: np.ndarray) -> np.ndarray:
  324. """将 0~1 浮点占比图转为 BGR 热力图。"""
  325. r = np.clip(np.asarray(ratio_map, dtype=np.float32), 0.0, 1.0)
  326. u8 = (r * 255).astype(np.uint8)
  327. return cv2.applyColorMap(u8, cv2.COLORMAP_JET)
  328. def save_watermark_mask_debug_layers(
  329. image: np.ndarray,
  330. output_dir: Union[str, Path],
  331. stem: str,
  332. debug: Dict[str, Any],
  333. *,
  334. image_format: str = "png",
  335. ) -> Dict[str, str]:
  336. """保存分层 debug 图(方案 D)。"""
  337. out_dir = Path(output_dir)
  338. out_dir.mkdir(parents=True, exist_ok=True)
  339. fmt = (image_format or "png").lstrip(".")
  340. paths: Dict[str, str] = {}
  341. def _save_overlay(name: str, mask: Optional[np.ndarray], color=(0, 0, 255)) -> None:
  342. if mask is None or not np.any(mask):
  343. return
  344. ov = render_watermark_mask_overlay(image, mask, color=color)
  345. p = out_dir / f"{stem}_{name}.{fmt}"
  346. cv2.imwrite(str(p), cv2.cvtColor(ov, cv2.COLOR_RGB2BGR) if ov.shape[2] == 3 else ov)
  347. paths[name] = str(p)
  348. _save_overlay("wm_candidate_overlay", debug.get("wm_candidate"))
  349. _save_overlay("geom_region_overlay", debug.get("geom_region"), color=(0, 180, 255))
  350. _save_overlay("geom_candidate_overlay", debug.get("geom_candidate"), color=(0, 255, 0))
  351. _save_overlay("wm_mask_overlay", debug.get("wm_mask"), color=(255, 0, 0))
  352. hough_bgr = debug.get("hough_lines_bgr")
  353. if hough_bgr is not None:
  354. p = out_dir / f"{stem}_hough_lines.{fmt}"
  355. cv2.imwrite(str(p), hough_bgr)
  356. paths["hough_lines"] = str(p)
  357. hough_all = debug.get("hough_lines_all_bgr")
  358. if hough_all is not None:
  359. p = out_dir / f"{stem}_hough_lines_all.{fmt}"
  360. cv2.imwrite(str(p), hough_all)
  361. paths["hough_lines_all"] = str(p)
  362. angle_hist = debug.get("angle_histogram_bgr")
  363. if angle_hist is not None:
  364. p = out_dir / f"{stem}_angle_histogram.{fmt}"
  365. cv2.imwrite(str(p), angle_hist)
  366. paths["angle_histogram"] = str(p)
  367. diag_hm = debug.get("diag_ratio_heatmap")
  368. if diag_hm is not None:
  369. p = out_dir / f"{stem}_diag_ratio_heatmap.{fmt}"
  370. cv2.imwrite(str(p), diag_hm)
  371. paths["diag_ratio_heatmap"] = str(p)
  372. hv_hm = debug.get("hv_ratio_heatmap")
  373. if hv_hm is not None:
  374. p = out_dir / f"{stem}_hv_ratio_heatmap.{fmt}"
  375. cv2.imwrite(str(p), hv_hm)
  376. paths["hv_ratio_heatmap"] = str(p)
  377. return paths
  378. def _build_diag_region_mask(
  379. gray: np.ndarray,
  380. *,
  381. block_size: int = 48,
  382. diag_ratio_thresh: float = 0.20,
  383. light_gray_thresh: int = 238,
  384. light_ratio_thresh: float = 0.10,
  385. min_edge_count: int = 10,
  386. dilate_radius: int = 3,
  387. ) -> np.ndarray:
  388. """
  389. 分块梯度方向检测:返回对角线方向纹理占优的区域掩膜。
  390. 原理:水印是45°斜向字符,其梯度主方向在30-60°和120-150°。
  391. 分块统计该方向弱边缘占比,高频块标记为水印候选区域。
  392. Returns:
  393. bool ndarray, 与 gray 同形状,True=疑似斜向水印区域。
  394. """
  395. gray_f = np.asarray(gray, dtype=np.float32)
  396. img_h, img_w = gray_f.shape
  397. bs = max(4, int(block_size))
  398. # Sobel 梯度
  399. gx = cv2.Sobel(gray_f, cv2.CV_32F, 1, 0, ksize=3)
  400. gy = cv2.Sobel(gray_f, cv2.CV_32F, 0, 1, ksize=3)
  401. mag = np.sqrt(gx * gx + gy * gy)
  402. ori = np.arctan2(gy, gx) * 180.0 / np.pi
  403. # 对角线方向 (±45° 附近,即梯度 30-65° / 115-155°)
  404. diag = (
  405. ((ori > 25) & (ori < 65))
  406. | ((ori > 115) & (ori < 155))
  407. | ((ori > -155) & (ori < -115))
  408. | ((ori > -65) & (ori < -25))
  409. )
  410. h_blocks = img_h // bs
  411. w_blocks = img_w // bs
  412. if h_blocks == 0 or w_blocks == 0:
  413. return np.zeros_like(gray, dtype=bool)
  414. ph, pw = h_blocks * bs, w_blocks * bs
  415. # 分块统计
  416. def _to_blocks(arr: np.ndarray) -> np.ndarray:
  417. return arr[:ph, :pw].reshape(h_blocks, bs, w_blocks, bs).transpose(0, 2, 1, 3).reshape(h_blocks, w_blocks, -1)
  418. block_mag = _to_blocks(mag)
  419. block_diag = _to_blocks(diag)
  420. block_gray = _to_blocks(gray_f)
  421. weak = (block_mag > 1) & (block_mag < 15)
  422. diag_weak = np.sum(block_diag & weak, axis=2)
  423. total_weak = np.sum(weak, axis=2)
  424. with np.errstate(divide="ignore", invalid="ignore"):
  425. diag_ratio = np.where(total_weak > 0, diag_weak / total_weak, 0.0)
  426. light_ratio = np.mean(block_gray >= light_gray_thresh, axis=2)
  427. wm_blocks = (
  428. (diag_ratio > diag_ratio_thresh)
  429. & (light_ratio > light_ratio_thresh)
  430. & (total_weak > min_edge_count)
  431. )
  432. # 展开为像素掩膜
  433. wm_block_mask = np.repeat(np.repeat(wm_blocks, bs, axis=0), bs, axis=1)
  434. full_mask = np.zeros(gray_f.shape, dtype=bool)
  435. full_mask[:ph, :pw] = wm_block_mask
  436. if dilate_radius > 0:
  437. k = cv2.getStructuringElement(
  438. cv2.MORPH_ELLIPSE, (dilate_radius * 2 + 1, dilate_radius * 2 + 1)
  439. )
  440. full_mask = cv2.dilate(full_mask.astype(np.uint8), k) > 0
  441. return full_mask
  442. def _build_seal_protect_mask(
  443. bgr: np.ndarray,
  444. *,
  445. hue_high: int = 15,
  446. sat_min: int = 40,
  447. value_min: int = 30,
  448. ) -> np.ndarray:
  449. """红色/公章区域保护掩膜(True=保护,不置白)。"""
  450. hsv = cv2.cvtColor(bgr, cv2.COLOR_BGR2HSV)
  451. lower1 = np.array([0, sat_min, value_min], dtype=np.uint8)
  452. upper1 = np.array([hue_high, 255, 255], dtype=np.uint8)
  453. lower2 = np.array([170, sat_min, value_min], dtype=np.uint8)
  454. upper2 = np.array([180, 255, 255], dtype=np.uint8)
  455. m1 = cv2.inRange(hsv, lower1, upper1)
  456. m2 = cv2.inRange(hsv, lower2, upper2)
  457. m2 = cv2.inRange(hsv, lower2, upper2)
  458. return (m1 > 0) | (m2 > 0)
  459. def _build_text_edge_protect(
  460. gray: np.ndarray,
  461. *,
  462. edge_window: int = 5,
  463. edge_std_thresh: float = 6.0,
  464. dilate_radius: int = 1,
  465. ) -> np.ndarray:
  466. """基于局部方差的笔画边缘保护掩膜(True=保护,不置白)。"""
  467. local_std = _local_std_map(gray, window=edge_window)
  468. edge_mask = local_std >= edge_std_thresh
  469. if dilate_radius > 0:
  470. k = cv2.getStructuringElement(
  471. cv2.MORPH_ELLIPSE, (dilate_radius * 2 + 1, dilate_radius * 2 + 1)
  472. )
  473. edge_mask = cv2.dilate(edge_mask.astype(np.uint8), k) > 0
  474. return edge_mask.astype(bool)
  475. def _build_watermark_mask_light_on_white(
  476. gray: np.ndarray,
  477. *,
  478. bgr: Optional[np.ndarray] = None,
  479. light_gray_low: int = 236,
  480. light_gray_high: int = 253,
  481. whiten_gray_low: int = 200,
  482. text_protect_gray_max: int = 130,
  483. text_protect_percentile: Optional[float] = None,
  484. background_threshold: int = 248,
  485. morph_close_kernel: int = 0,
  486. morph_close_iter: int = 1,
  487. morph_dilate_kernel: int = 0,
  488. morph_dilate_iter: int = 1,
  489. min_component_area: int = 200,
  490. low_variance_thresh: float = 0.0,
  491. edge_window: int = 5,
  492. direction_filter: str = "hough",
  493. debug_block_maps: bool = True,
  494. debug_block_size: int = 48,
  495. hough_midtone_low: int = 200,
  496. hough_midtone_high: int = 254,
  497. hough_canny_low: int = 30,
  498. hough_canny_high: int = 100,
  499. hough_threshold: int = 25,
  500. hough_min_line_length: int = 35,
  501. hough_max_line_gap: int = 18,
  502. hough_line_thickness: int = 12,
  503. hough_band_dilate_radius: int = 14,
  504. hough_angle_tolerance: float = 5.0,
  505. hough_use_angle_statistics: bool = True,
  506. hough_secondary_peak_ratio: float = 0.35,
  507. hough_min_length_percentile: float = 25.0,
  508. diag_block_size: int = 0,
  509. diag_ratio_thresh: float = 0.20,
  510. diag_light_ratio_thresh: float = 0.10,
  511. diag_min_edge_count: int = 10,
  512. diag_dilate_radius: int = 3,
  513. seal_protect: bool = True,
  514. seal_hue_high: int = 15,
  515. seal_sat_min: int = 40,
  516. ) -> Tuple[np.ndarray, Dict[str, Any]]:
  517. """
  518. 白底流水水印掩膜(方案 C + E)。
  519. 1. Hough 斜向线段 → geom_region(几何限定区域)
  520. 2. wm_candidate = 浅色带且非正文保护
  521. 3. wm_mask = geom_region(置白区域由几何约束;实际白化时再 g>=light_gray_low)
  522. 4. debug 输出 candidate / geom / 交集 / 热力图
  523. """
  524. gray_arr = np.asarray(gray)
  525. bg_th = int(background_threshold)
  526. low = int(light_gray_low)
  527. high = int(light_gray_high)
  528. if text_protect_gray_max > 0:
  529. t_protect = float(text_protect_gray_max)
  530. else:
  531. dark = gray_arr[gray_arr < min(130, bg_th)]
  532. if dark.size > 0 and text_protect_percentile is not None:
  533. t_protect = float(np.percentile(dark, text_protect_percentile))
  534. else:
  535. t_protect = 120.0
  536. text_protect = gray_arr <= t_protect
  537. low = max(low, int(t_protect) + 25)
  538. wm_candidate = (gray_arr >= low) & (gray_arr < high) & (~text_protect)
  539. direction = (direction_filter or "hough").lower().strip()
  540. hough_info: Dict[str, Any] = {}
  541. geom_region = np.zeros_like(gray_arr, dtype=bool)
  542. if direction == "hough":
  543. geom_region, hough_info = _build_diag_hough_region_mask(
  544. gray_arr,
  545. midtone_low=hough_midtone_low,
  546. midtone_high=hough_midtone_high,
  547. canny_low=hough_canny_low,
  548. canny_high=hough_canny_high,
  549. hough_threshold=hough_threshold,
  550. min_line_length=hough_min_line_length,
  551. max_line_gap=hough_max_line_gap,
  552. angle_tolerance=hough_angle_tolerance,
  553. use_angle_statistics=hough_use_angle_statistics,
  554. secondary_peak_ratio=hough_secondary_peak_ratio,
  555. min_length_percentile=hough_min_length_percentile,
  556. line_thickness=hough_line_thickness,
  557. band_dilate_radius=hough_band_dilate_radius,
  558. )
  559. elif diag_block_size > 0:
  560. geom_region = _build_diag_region_mask(
  561. gray_arr,
  562. block_size=diag_block_size,
  563. diag_ratio_thresh=diag_ratio_thresh,
  564. light_gray_thresh=low,
  565. light_ratio_thresh=diag_light_ratio_thresh,
  566. min_edge_count=diag_min_edge_count,
  567. dilate_radius=diag_dilate_radius,
  568. )
  569. geom_candidate = geom_region & wm_candidate
  570. wm_mask = geom_region.copy()
  571. if min_component_area > 0 and np.any(wm_mask):
  572. n_labels, labels, stats, _ = cv2.connectedComponentsWithStats(
  573. wm_mask.astype(np.uint8), connectivity=8
  574. )
  575. filtered = np.zeros_like(wm_mask)
  576. for i in range(1, n_labels):
  577. if stats[i, cv2.CC_STAT_AREA] >= min_component_area:
  578. filtered[labels == i] = True
  579. if np.any(filtered):
  580. wm_mask = filtered
  581. elif np.any(geom_region):
  582. wm_mask = geom_region
  583. seal_mask = np.zeros_like(wm_mask, dtype=bool)
  584. if seal_protect and bgr is not None and bgr.ndim == 3:
  585. seal_mask = _build_seal_protect_mask(
  586. bgr, hue_high=seal_hue_high, sat_min=seal_sat_min
  587. )
  588. wm_mask &= ~seal_mask
  589. midtone = (gray_arr >= low) & (gray_arr < high)
  590. debug: Dict[str, Any] = {
  591. "mask_mode": "light_on_white",
  592. "direction_filter": direction,
  593. "light_gray_low": low,
  594. "light_gray_high": high,
  595. "midtone_ratio": float(midtone.sum() / gray_arr.size),
  596. "wm_candidate_ratio": float(wm_candidate.sum() / gray_arr.size),
  597. "geom_mask_ratio": float(geom_region.sum() / gray_arr.size),
  598. "geom_candidate_ratio": float(geom_candidate.sum() / gray_arr.size),
  599. "wm_mask_ratio": float(wm_mask.sum() / gray_arr.size),
  600. "T_protect": t_protect,
  601. "text_protect_gray_max": text_protect_gray_max,
  602. "text_protect": text_protect,
  603. "seal_protect": seal_mask,
  604. "wm_candidate": wm_candidate,
  605. "geom_region": geom_region,
  606. "geom_candidate": geom_candidate,
  607. "diag_region": geom_region,
  608. "wm_mask": wm_mask,
  609. "whiten_gray_low": int(whiten_gray_low),
  610. "hough_lines_bgr": hough_info.get("hough_lines_bgr"),
  611. "hough_lines_all_bgr": hough_info.get("hough_lines_all_bgr"),
  612. "angle_histogram_bgr": hough_info.get("angle_histogram_bgr"),
  613. "dominant_angles": hough_info.get("dominant_angles", []),
  614. "hough_kept_lines": hough_info.get("hough_kept_lines", 0),
  615. "hough_diag_candidates": hough_info.get("hough_diag_candidates", 0),
  616. "hough_total_lines": hough_info.get("hough_total_lines", 0),
  617. }
  618. if debug_block_maps:
  619. bs = debug_block_size if debug_block_size > 0 else 48
  620. diag_map, hv_map = _compute_block_orientation_debug_maps(gray_arr, block_size=bs)
  621. debug["diag_ratio_heatmap"] = render_ratio_heatmap(diag_map)
  622. debug["hv_ratio_heatmap"] = render_ratio_heatmap(hv_map)
  623. return wm_mask, debug
  624. def build_watermark_mask(
  625. gray: np.ndarray,
  626. *,
  627. bgr: Optional[np.ndarray] = None,
  628. mask_mode: str = "diagonal_midtone",
  629. light_gray_low: int = 236,
  630. light_gray_high: int = 253,
  631. whiten_gray_low: int = 200,
  632. text_protect_gray_max: int = 130,
  633. morph_close_kernel: int = 0,
  634. morph_close_iter: int = 1,
  635. morph_dilate_kernel: int = 0,
  636. morph_dilate_iter: int = 1,
  637. low_variance_thresh: float = 0.0,
  638. edge_window: int = 5,
  639. direction_filter: str = "hough",
  640. debug_block_maps: bool = True,
  641. debug_block_size: int = 48,
  642. hough_midtone_low: int = 200,
  643. hough_midtone_high: int = 254,
  644. hough_canny_low: int = 30,
  645. hough_canny_high: int = 100,
  646. hough_threshold: int = 25,
  647. hough_min_line_length: int = 35,
  648. hough_max_line_gap: int = 18,
  649. hough_line_thickness: int = 12,
  650. hough_band_dilate_radius: int = 14,
  651. hough_angle_tolerance: float = 5.0,
  652. hough_use_angle_statistics: bool = True,
  653. hough_secondary_peak_ratio: float = 0.35,
  654. hough_min_length_percentile: float = 25.0,
  655. diag_block_size: int = 0,
  656. diag_ratio_thresh: float = 0.20,
  657. diag_light_ratio_thresh: float = 0.10,
  658. diag_min_edge_count: int = 10,
  659. diag_dilate_radius: int = 3,
  660. # diagonal_midtone 参数
  661. midtone_low: int = 100,
  662. midtone_high: int = 220,
  663. remove_horizontal_vertical: bool = True,
  664. diagonal_enhance: bool = True,
  665. diagonal_kernel_length: int = 25,
  666. horizontal_kernel_length: int = 35,
  667. vertical_kernel_length: int = 35,
  668. morph_open_kernel: int = 2,
  669. dmorph_close_kernel: int = 3,
  670. min_component_area: int = 200,
  671. text_protect_percentile: float = 10.0,
  672. background_threshold: int = 248,
  673. seal_protect: bool = True,
  674. seal_hue_high: int = 15,
  675. seal_sat_min: int = 40,
  676. ) -> Tuple[np.ndarray, Dict[str, Any]]:
  677. """
  678. 构建水印掩膜 wm_mask(True=疑似水印像素)。
  679. mask_mode:
  680. light_on_white — Hough 斜向几何带 + 浅色白化(方案 C/E)
  681. diagonal_midtone — 中间调 + 斜向形态学(旧逻辑)
  682. """
  683. gray = np.asarray(gray)
  684. if gray.ndim != 2:
  685. raise ValueError("build_watermark_mask expects single-channel grayscale")
  686. mode = (mask_mode or "light_on_white").lower().strip()
  687. if mode == "light_on_white":
  688. return _build_watermark_mask_light_on_white(
  689. gray,
  690. bgr=bgr,
  691. light_gray_low=light_gray_low,
  692. light_gray_high=light_gray_high,
  693. whiten_gray_low=whiten_gray_low,
  694. text_protect_gray_max=text_protect_gray_max,
  695. text_protect_percentile=text_protect_percentile,
  696. background_threshold=background_threshold,
  697. morph_close_kernel=morph_close_kernel,
  698. morph_close_iter=morph_close_iter,
  699. morph_dilate_kernel=morph_dilate_kernel,
  700. morph_dilate_iter=morph_dilate_iter,
  701. low_variance_thresh=low_variance_thresh,
  702. edge_window=edge_window,
  703. min_component_area=min_component_area,
  704. direction_filter=direction_filter,
  705. debug_block_maps=debug_block_maps,
  706. debug_block_size=debug_block_size,
  707. hough_midtone_low=hough_midtone_low,
  708. hough_midtone_high=hough_midtone_high,
  709. hough_canny_low=hough_canny_low,
  710. hough_canny_high=hough_canny_high,
  711. hough_threshold=hough_threshold,
  712. hough_min_line_length=hough_min_line_length,
  713. hough_max_line_gap=hough_max_line_gap,
  714. hough_line_thickness=hough_line_thickness,
  715. hough_band_dilate_radius=hough_band_dilate_radius,
  716. hough_angle_tolerance=hough_angle_tolerance,
  717. hough_use_angle_statistics=hough_use_angle_statistics,
  718. hough_secondary_peak_ratio=hough_secondary_peak_ratio,
  719. hough_min_length_percentile=hough_min_length_percentile,
  720. diag_block_size=diag_block_size,
  721. diag_ratio_thresh=diag_ratio_thresh,
  722. diag_light_ratio_thresh=diag_light_ratio_thresh,
  723. diag_min_edge_count=diag_min_edge_count,
  724. diag_dilate_radius=diag_dilate_radius,
  725. seal_protect=seal_protect,
  726. seal_hue_high=seal_hue_high,
  727. seal_sat_min=seal_sat_min,
  728. )
  729. midtone = (gray > midtone_low) & (gray < midtone_high)
  730. mid_u8 = (midtone.astype(np.uint8)) * 255
  731. horiz = np.zeros_like(midtone, dtype=bool)
  732. vert = np.zeros_like(midtone, dtype=bool)
  733. if remove_horizontal_vertical:
  734. kh = cv2.getStructuringElement(
  735. cv2.MORPH_RECT, (max(3, horizontal_kernel_length), 1)
  736. )
  737. kv = cv2.getStructuringElement(
  738. cv2.MORPH_RECT, (1, max(3, vertical_kernel_length))
  739. )
  740. horiz = cv2.morphologyEx(mid_u8, cv2.MORPH_OPEN, kh) > 0
  741. vert = cv2.morphologyEx(mid_u8, cv2.MORPH_OPEN, kv) > 0
  742. # 中间调去掉明显横竖线(保留斜向水印)
  743. candidate = midtone & ~(horiz | vert)
  744. if diagonal_enhance:
  745. k45 = _line_structuring_kernel(diagonal_kernel_length, 45)
  746. k135 = _line_structuring_kernel(diagonal_kernel_length, 135)
  747. d45 = cv2.morphologyEx(mid_u8, cv2.MORPH_OPEN, k45) > 0
  748. d135 = cv2.morphologyEx(mid_u8, cv2.MORPH_OPEN, k135) > 0
  749. direction = d45 | d135
  750. dilate_k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (7, 7))
  751. near_diag = cv2.dilate(direction.astype(np.uint8), dilate_k) > 0
  752. # 斜向结构足够时收窄到斜向附近;否则保留「中间调减横竖」结果
  753. if near_diag.sum() > gray.size * 0.001:
  754. candidate = candidate & near_diag
  755. cand_u8 = (candidate.astype(np.uint8)) * 255
  756. if morph_open_kernel > 0:
  757. k_open = cv2.getStructuringElement(
  758. cv2.MORPH_ELLIPSE, (morph_open_kernel, morph_open_kernel)
  759. )
  760. cand_u8 = cv2.morphologyEx(cand_u8, cv2.MORPH_OPEN, k_open)
  761. if dmorph_close_kernel > 0:
  762. k_close = cv2.getStructuringElement(
  763. cv2.MORPH_ELLIPSE, (dmorph_close_kernel, dmorph_close_kernel)
  764. )
  765. cand_u8 = cv2.morphologyEx(cand_u8, cv2.MORPH_CLOSE, k_close)
  766. wm_mask = cand_u8 > 0
  767. if min_component_area > 0:
  768. n_labels, labels, stats, _ = cv2.connectedComponentsWithStats(
  769. wm_mask.astype(np.uint8), connectivity=8
  770. )
  771. filtered = np.zeros_like(wm_mask)
  772. for i in range(1, n_labels):
  773. if stats[i, cv2.CC_STAT_AREA] >= min_component_area:
  774. filtered[labels == i] = True
  775. wm_mask = filtered
  776. non_bg = gray[gray < background_threshold]
  777. if non_bg.size > 0:
  778. t_protect = float(np.percentile(non_bg, text_protect_percentile))
  779. else:
  780. t_protect = 85.0
  781. t_protect = max(t_protect, float(midtone_low))
  782. text_protect = gray <= t_protect
  783. midtone_ratio = float(midtone.sum() / gray.size)
  784. wm_ratio = float(wm_mask.sum() / gray.size)
  785. # 掩膜过小:回退为「中间调减横竖」或整块中间调(满版斜纹水印常见)
  786. min_wm_ratio = max(0.005, midtone_ratio * 0.12)
  787. if wm_ratio < min_wm_ratio:
  788. relaxed = midtone & ~(horiz | vert) & (~text_protect)
  789. if relaxed.sum() / gray.size < min_wm_ratio:
  790. relaxed = midtone & (~text_protect)
  791. wm_mask = relaxed
  792. wm_ratio = float(wm_mask.sum() / gray.size)
  793. seal_mask = np.zeros_like(wm_mask, dtype=bool)
  794. if seal_protect and bgr is not None and bgr.ndim == 3:
  795. seal_mask = _build_seal_protect_mask(
  796. bgr, hue_high=seal_hue_high, sat_min=seal_sat_min
  797. )
  798. debug: Dict[str, Any] = {
  799. "mask_mode": "diagonal_midtone",
  800. "midtone_ratio": midtone_ratio,
  801. "wm_mask_ratio": wm_ratio,
  802. "T_protect": t_protect,
  803. "text_protect": text_protect,
  804. "seal_protect": seal_mask,
  805. "midtone_mask": midtone,
  806. "wm_mask": wm_mask,
  807. }
  808. return wm_mask, debug
  809. def remove_watermark_masked_adaptive(
  810. gray: np.ndarray,
  811. *,
  812. bgr: Optional[np.ndarray] = None,
  813. mask_cfg: Optional[Dict[str, Any]] = None,
  814. adaptive_cfg: Optional[Dict[str, Any]] = None,
  815. threshold_fallback: int = 175,
  816. morph_close_kernel: int = 0,
  817. ) -> Tuple[np.ndarray, Dict[str, Any]]:
  818. """
  819. 掩膜内置白(whiten_mode=mask_fill)或掩膜内动态阈值(threshold_in_mask)。
  820. 掩膜为空时回退全局 threshold_fallback。
  821. """
  822. gray = np.asarray(gray).copy()
  823. mcfg: Dict[str, Any] = {
  824. "mask_mode": "light_on_white",
  825. "light_gray_low": 236,
  826. "light_gray_high": 253,
  827. "whiten_gray_low": 200,
  828. "text_protect_gray_max": 130,
  829. "morph_close_kernel": 0,
  830. "morph_close_iter": 1,
  831. "morph_dilate_kernel": 0,
  832. "morph_dilate_iter": 1,
  833. "low_variance_thresh": 0.0,
  834. "edge_window": 5,
  835. "min_component_area": 200,
  836. "direction_filter": "hough",
  837. "debug_block_maps": True,
  838. "debug_block_size": 48,
  839. "hough_midtone_low": 200,
  840. "hough_midtone_high": 254,
  841. "hough_canny_low": 30,
  842. "hough_canny_high": 100,
  843. "hough_threshold": 25,
  844. "hough_min_line_length": 35,
  845. "hough_max_line_gap": 18,
  846. "hough_line_thickness": 12,
  847. "hough_band_dilate_radius": 14,
  848. "hough_angle_tolerance": 5.0,
  849. "hough_use_angle_statistics": True,
  850. "hough_secondary_peak_ratio": 0.35,
  851. "hough_min_length_percentile": 25.0,
  852. "diag_block_size": 0,
  853. "diag_ratio_thresh": 0.20,
  854. "diag_light_ratio_thresh": 0.10,
  855. "diag_min_edge_count": 10,
  856. "diag_dilate_radius": 3,
  857. "midtone_low": 100,
  858. "midtone_high": 220,
  859. "remove_horizontal_vertical": True,
  860. "diagonal_enhance": True,
  861. "diagonal_kernel_length": 25,
  862. "horizontal_kernel_length": 35,
  863. "vertical_kernel_length": 35,
  864. "morph_open_kernel": 2,
  865. "dmorph_close_kernel": 3,
  866. "text_protect_percentile": 10.0,
  867. "background_threshold": 248,
  868. "seal_protect": True,
  869. "seal_hue_high": 15,
  870. "seal_sat_min": 40,
  871. }
  872. mcfg.update(mask_cfg or {})
  873. mask_mode = str(mcfg.get("mask_mode", "light_on_white")).lower().strip()
  874. # light_on_white 默认 mask_fill
  875. acfg: Dict[str, Any] = {
  876. "whiten_mode": None,
  877. "text_percentile": 10.0,
  878. "watermark_percentile": 88.0,
  879. "background_percentile": 95.0,
  880. "background_threshold": 248,
  881. "wm_margin": 12,
  882. "text_protect_max": 120,
  883. }
  884. acfg.update(adaptive_cfg or {})
  885. whiten_mode = acfg.get("whiten_mode")
  886. if not whiten_mode:
  887. whiten_mode = (
  888. "mask_fill"
  889. if mask_mode == "light_on_white"
  890. else "threshold_in_mask"
  891. )
  892. whiten_mode = str(whiten_mode).lower().strip()
  893. wm_mask, debug = build_watermark_mask(gray, bgr=bgr, **mcfg)
  894. if not np.any(wm_mask):
  895. cleaned = gray.copy()
  896. cleaned[gray > threshold_fallback] = 255
  897. debug["mode"] = "fallback_threshold"
  898. debug["threshold_fallback"] = threshold_fallback
  899. if morph_close_kernel > 0:
  900. kernel = np.ones((morph_close_kernel, morph_close_kernel), np.uint8)
  901. cleaned = cv2.morphologyEx(cleaned, cv2.MORPH_CLOSE, kernel)
  902. return cleaned, debug
  903. bg_th = int(acfg["background_threshold"])
  904. bg_pixels = gray[gray >= bg_th]
  905. if bg_pixels.size > 0:
  906. b_level = float(np.percentile(bg_pixels, acfg["background_percentile"]))
  907. else:
  908. b_level = 250.0
  909. if mask_mode == "light_on_white":
  910. t_protect = float(debug.get("T_protect", 150.0))
  911. else:
  912. non_bg = gray[gray < bg_th]
  913. if non_bg.size > 0:
  914. t_protect = float(np.percentile(non_bg, acfg["text_percentile"]))
  915. else:
  916. t_protect = float(debug.get("T_protect", 85.0))
  917. t_protect = min(t_protect, float(acfg["text_protect_max"]))
  918. t_protect = max(t_protect, float(mcfg.get("midtone_low", 100)))
  919. text_protect = debug["text_protect"]
  920. seal_protect = debug["seal_protect"]
  921. t_wm: Optional[float] = None
  922. if whiten_mode == "mask_fill":
  923. # 几何带内:g>=whiten_gray_low 置白;g<=130 正文硬保护(方案 E)
  924. wm_gray_low = float(
  925. mcfg.get("whiten_gray_low", debug.get("whiten_gray_low", 200))
  926. )
  927. to_white = (
  928. wm_mask
  929. & (gray >= wm_gray_low)
  930. & (gray < int(mcfg.get("light_gray_high", 254)))
  931. & (~text_protect)
  932. & (~seal_protect)
  933. )
  934. else:
  935. mask_vals = gray[wm_mask]
  936. if mask_vals.size > 0:
  937. t_wm = float(np.percentile(mask_vals, acfg["watermark_percentile"]))
  938. else:
  939. t_wm = t_protect + 0.45 * (b_level - t_protect)
  940. margin = float(acfg["wm_margin"])
  941. t_wm = max(t_wm, t_protect + margin)
  942. t_wm = min(t_wm, b_level - 3.0)
  943. t_wm = min(t_wm, float(mcfg.get("midtone_high", 220)) - 5.0)
  944. to_white = wm_mask & (gray >= t_wm) & (~text_protect) & (~seal_protect)
  945. cleaned = gray.copy()
  946. cleaned[to_white] = 255
  947. if morph_close_kernel > 0:
  948. kernel = np.ones((morph_close_kernel, morph_close_kernel), np.uint8)
  949. cleaned = cv2.morphologyEx(cleaned, cv2.MORPH_CLOSE, kernel)
  950. debug.update(
  951. {
  952. "mode": "masked_adaptive",
  953. "mask_mode": mask_mode,
  954. "whiten_mode": whiten_mode,
  955. "T_wm": t_wm,
  956. "T_protect": t_protect,
  957. "B_level": b_level,
  958. "white_pixel_ratio": float(to_white.sum() / gray.size),
  959. "threshold_fallback": threshold_fallback,
  960. }
  961. )
  962. return cleaned, debug
  963. def _image_to_gray_and_bgr(
  964. image: Union[np.ndarray, Image.Image],
  965. ) -> Tuple[np.ndarray, Optional[np.ndarray]]:
  966. """统一为灰度 + 可选 BGR(用于掩膜公章保护)。"""
  967. if isinstance(image, Image.Image):
  968. pil_img = image.convert("RGB") if image.mode == "RGBA" else image
  969. np_img = np.array(pil_img)
  970. np_img = cv2.cvtColor(np_img, cv2.COLOR_RGB2BGR)
  971. else:
  972. np_img = image.copy()
  973. if np_img.ndim == 3:
  974. bgr = np_img
  975. gray = cv2.cvtColor(np_img, cv2.COLOR_BGR2GRAY)
  976. else:
  977. bgr = None
  978. gray = np_img
  979. return gray, bgr
  980. def _enhance_text_restore(
  981. gray: np.ndarray,
  982. *,
  983. background_threshold: int = 248,
  984. text_lo_percentile: float = 1.0,
  985. text_hi_percentile: float = 99.0,
  986. text_black_target: int = 85,
  987. ) -> np.ndarray:
  988. """
  989. 仅对非背景像素做动态范围压缩,将最深笔画拉向 text_black_target(默认 ~85,接近扫描件原图)。
  990. 背景(>= background_threshold)保持白色,避免整图 gamma 导致背景发灰。
  991. """
  992. result = gray.copy()
  993. bg_th = int(np.clip(background_threshold, 200, 255))
  994. text_mask = gray < bg_th
  995. if not np.any(text_mask):
  996. return result
  997. vals = gray[text_mask].astype(np.float32)
  998. lo = float(np.percentile(vals, text_lo_percentile))
  999. hi = float(np.percentile(vals, text_hi_percentile))
  1000. target = int(np.clip(text_black_target, 10, 200))
  1001. if hi <= lo + 1.0:
  1002. return result
  1003. stretched = (vals - lo) * target / (hi - lo)
  1004. result[text_mask] = np.clip(stretched, 0, 255).astype(np.uint8)
  1005. return result
  1006. def enhance_document_contrast(
  1007. gray: np.ndarray,
  1008. method: str = "text_restore",
  1009. *,
  1010. clip_limit: float = 2.0,
  1011. tile_grid_size: int = 8,
  1012. gamma: float = 0.85,
  1013. black_percentile: float = 2.0,
  1014. white_percentile: float = 98.0,
  1015. background_threshold: int = 248,
  1016. text_lo_percentile: float = 1.0,
  1017. text_hi_percentile: float = 99.0,
  1018. text_black_target: int = 85,
  1019. ) -> np.ndarray:
  1020. """
  1021. 文档灰度图对比度增强(常用于去水印后恢复笔画深度)。
  1022. Args:
  1023. gray: 单通道 uint8 灰度图
  1024. method: text_restore | clahe | gamma | linear
  1025. clip_limit: CLAHE 对比度限制
  1026. tile_grid_size: CLAHE 分块大小
  1027. gamma: gamma 校正指数,<1 加深文字(去水印后发浅时适用)
  1028. black_percentile: linear 拉伸下分位(映射到 0)
  1029. white_percentile: linear 拉伸上分位(映射到 255)
  1030. background_threshold: text_restore 背景阈值(>= 视为白底不处理)
  1031. text_lo_percentile: text_restore 笔画下分位
  1032. text_hi_percentile: text_restore 笔画上分位(映射到 text_black_target)
  1033. text_black_target: text_restore 最深笔画目标灰度(越小越深,建议 75~95)
  1034. Returns:
  1035. 增强后的灰度图
  1036. """
  1037. if gray is None or gray.size == 0:
  1038. return gray
  1039. if gray.ndim != 2:
  1040. raise ValueError("enhance_document_contrast expects single-channel grayscale image")
  1041. method = (method or "text_restore").lower().strip()
  1042. if method == "text_restore":
  1043. return _enhance_text_restore(
  1044. gray,
  1045. background_threshold=background_threshold,
  1046. text_lo_percentile=text_lo_percentile,
  1047. text_hi_percentile=text_hi_percentile,
  1048. text_black_target=text_black_target,
  1049. )
  1050. if method == "gamma":
  1051. gamma = max(0.1, min(float(gamma), 3.0))
  1052. inv_gamma = 1.0 / gamma
  1053. table = np.array(
  1054. [((i / 255.0) ** inv_gamma) * 255 for i in range(256)],
  1055. dtype=np.uint8,
  1056. )
  1057. return cv2.LUT(gray, table)
  1058. if method == "linear":
  1059. p_low = float(np.percentile(gray, black_percentile))
  1060. p_high = float(np.percentile(gray, white_percentile))
  1061. if p_high <= p_low + 1.0:
  1062. return gray
  1063. stretched = (gray.astype(np.float32) - p_low) * 255.0 / (p_high - p_low)
  1064. return np.clip(stretched, 0, 255).astype(np.uint8)
  1065. # 默认 CLAHE:局部对比度,适合扫描件
  1066. tile = max(2, int(tile_grid_size))
  1067. clahe = cv2.createCLAHE(
  1068. clipLimit=max(0.1, float(clip_limit)),
  1069. tileGridSize=(tile, tile),
  1070. )
  1071. return clahe.apply(gray)
  1072. def apply_contrast_enhancement_config(
  1073. gray: np.ndarray,
  1074. contrast_cfg: Optional[Dict[str, Any]],
  1075. ) -> np.ndarray:
  1076. """按配置字典应用对比度增强;未启用时原样返回。"""
  1077. if not contrast_cfg or not contrast_cfg.get("enabled", False):
  1078. return gray
  1079. return enhance_document_contrast(
  1080. gray,
  1081. method=contrast_cfg.get("method", "text_restore"),
  1082. clip_limit=contrast_cfg.get("clip_limit", 2.0),
  1083. tile_grid_size=contrast_cfg.get("tile_grid_size", 8),
  1084. gamma=contrast_cfg.get("gamma", 0.85),
  1085. black_percentile=contrast_cfg.get("black_percentile", 2.0),
  1086. white_percentile=contrast_cfg.get("white_percentile", 98.0),
  1087. background_threshold=contrast_cfg.get("background_threshold", 248),
  1088. text_lo_percentile=contrast_cfg.get("text_lo_percentile", 1.0),
  1089. text_hi_percentile=contrast_cfg.get("text_hi_percentile", 99.0),
  1090. text_black_target=contrast_cfg.get("text_black_target", 75),
  1091. )
  1092. def remove_watermark_from_image(
  1093. image: Union[np.ndarray, Image.Image],
  1094. threshold: int = 160,
  1095. morph_close_kernel: int = 2,
  1096. return_pil: Optional[bool] = None,
  1097. watermark_removal_cfg: Optional[Dict[str, Any]] = None,
  1098. removal_debug: Optional[Dict[str, Any]] = None,
  1099. ) -> Union[np.ndarray, Image.Image]:
  1100. """
  1101. 去除图像中的浅色斜向文字水印,返回灰度图。
  1102. method(watermark_removal_cfg):
  1103. threshold(默认): gray > threshold → 255
  1104. masked / masked_adaptive: 掩膜 + 掩膜内动态阈值
  1105. Args:
  1106. image: 输入图像(PIL.Image 或 np.ndarray BGR/RGB/灰度)。
  1107. threshold: 全局阈值或掩膜失败时的回退阈值。
  1108. morph_close_kernel: 形态学闭运算核大小,0 跳过。
  1109. watermark_removal_cfg: 完整配置(含 method / mask / adaptive)。
  1110. removal_debug: 若传入 dict,写入掩膜与 T_wm 等调试字段。
  1111. Returns:
  1112. 去除水印后的灰度图:PIL.Image(mode='L') 或 np.ndarray(HxW, uint8)。
  1113. """
  1114. input_is_pil = isinstance(image, Image.Image)
  1115. cfg = watermark_removal_cfg or {}
  1116. method = str(cfg.get("method") or "threshold").lower().strip()
  1117. gray, bgr = _image_to_gray_and_bgr(image)
  1118. if method in ("masked", "masked_adaptive"):
  1119. cleaned, dbg = remove_watermark_masked_adaptive(
  1120. gray,
  1121. bgr=bgr,
  1122. mask_cfg=cfg.get("mask") if isinstance(cfg.get("mask"), dict) else None,
  1123. adaptive_cfg=cfg.get("adaptive")
  1124. if isinstance(cfg.get("adaptive"), dict)
  1125. else None,
  1126. threshold_fallback=threshold,
  1127. morph_close_kernel=morph_close_kernel,
  1128. )
  1129. if removal_debug is not None:
  1130. removal_debug.clear()
  1131. removal_debug.update(dbg)
  1132. else:
  1133. cleaned = gray.copy()
  1134. cleaned[gray > threshold] = 255
  1135. if morph_close_kernel > 0:
  1136. kernel = np.ones((morph_close_kernel, morph_close_kernel), np.uint8)
  1137. cleaned = cv2.morphologyEx(cleaned, cv2.MORPH_CLOSE, kernel)
  1138. if removal_debug is not None:
  1139. removal_debug.clear()
  1140. removal_debug.update({"mode": "threshold", "threshold": threshold})
  1141. should_return_pil = input_is_pil if return_pil is None else return_pil
  1142. return Image.fromarray(cleaned, mode='L') if should_return_pil else cleaned
  1143. def remove_watermark_from_image_rgb(
  1144. image: Union[np.ndarray, Image.Image],
  1145. threshold: int = 160,
  1146. morph_close_kernel: int = 2,
  1147. return_pil: Optional[bool] = None,
  1148. contrast_enhancement: Optional[Dict[str, Any]] = None,
  1149. apply_watermark_removal: bool = True,
  1150. watermark_removal_cfg: Optional[Dict[str, Any]] = None,
  1151. removal_debug: Optional[Dict[str, Any]] = None,
  1152. ) -> Union[np.ndarray, Image.Image]:
  1153. """
  1154. 去除水印并返回 RGB 三通道图像。
  1155. 与 remove_watermark_from_image 逻辑相同,但输出为 RGB(三通道),
  1156. 方便直接传入布局检测、OCR 等需要彩色输入的下游模型。
  1157. Args:
  1158. contrast_enhancement: 对比度增强配置(含 enabled / method 等),见 apply_contrast_enhancement_config
  1159. apply_watermark_removal: False 时跳过阈值抹白,仅做对比度增强(若启用)
  1160. Args/Returns: 同 remove_watermark_from_image,但输出为 RGB/BGR 三通道。
  1161. """
  1162. input_is_pil = isinstance(image, Image.Image)
  1163. if apply_watermark_removal:
  1164. gray_result = remove_watermark_from_image(
  1165. image,
  1166. threshold,
  1167. morph_close_kernel,
  1168. return_pil=False,
  1169. watermark_removal_cfg=watermark_removal_cfg,
  1170. removal_debug=removal_debug,
  1171. )
  1172. else:
  1173. if isinstance(image, Image.Image):
  1174. np_img = np.array(image.convert("RGB"))
  1175. np_img = cv2.cvtColor(np_img, cv2.COLOR_RGB2BGR)
  1176. else:
  1177. np_img = image.copy()
  1178. gray_result = (
  1179. cv2.cvtColor(np_img, cv2.COLOR_BGR2GRAY)
  1180. if np_img.ndim == 3
  1181. else np_img
  1182. )
  1183. gray_result = apply_contrast_enhancement_config(gray_result, contrast_enhancement)
  1184. rgb_np = cv2.cvtColor(gray_result, cv2.COLOR_GRAY2BGR)
  1185. should_return_pil = input_is_pil if return_pil is None else return_pil
  1186. if should_return_pil:
  1187. return Image.fromarray(cv2.cvtColor(rgb_np, cv2.COLOR_BGR2RGB))
  1188. return rgb_np
  1189. def render_watermark_mask_overlay(
  1190. image: np.ndarray,
  1191. wm_mask: np.ndarray,
  1192. *,
  1193. color: Tuple[int, int, int] = (0, 0, 255),
  1194. alpha: float = 0.45,
  1195. ) -> np.ndarray:
  1196. """在原图上叠加红色半透明水印掩膜,供调试图保存。"""
  1197. if image.ndim == 2:
  1198. base = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
  1199. elif image.shape[2] == 3:
  1200. base = image.copy()
  1201. if image.max() <= 1:
  1202. base = (image * 255).astype(np.uint8)
  1203. else:
  1204. base = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)
  1205. overlay = base.copy()
  1206. overlay[wm_mask] = color
  1207. return cv2.addWeighted(base, 1.0 - alpha, overlay, alpha, 0)
  1208. def _image_to_bgr_for_debug(img: np.ndarray) -> np.ndarray:
  1209. """将 ndarray 转为 BGR,供 cv2.imwrite 使用。"""
  1210. if img.ndim == 2:
  1211. return cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
  1212. out = img.copy()
  1213. if out.shape[2] == 3:
  1214. return cv2.cvtColor(out, cv2.COLOR_RGB2BGR)
  1215. return out
  1216. def save_watermark_removal_debug(
  1217. before: Union[np.ndarray, Image.Image],
  1218. after: Union[np.ndarray, Image.Image],
  1219. output_dir: Union[str, Path],
  1220. page_name: str,
  1221. *,
  1222. processing_params: Optional[Dict[str, Any]] = None,
  1223. image_format: str = "png",
  1224. save_compare: bool = True,
  1225. subdir: str = "watermark_removal",
  1226. mask_overlay: Optional[np.ndarray] = None,
  1227. ) -> Dict[str, str]:
  1228. """
  1229. 保存去水印调试图(before / after / compare / meta.json)。
  1230. 与 universal_doc_parser 的 debug_comparison 目录结构一致:
  1231. ``{output_dir}/debug_comparison/{subdir}/``
  1232. Args:
  1233. before: 处理前图像(RGB/BGR/灰度)
  1234. after: 处理后图像
  1235. output_dir: 输出根目录(通常为 pipeline 或工具的输出目录)
  1236. page_name: 文件名前缀(如 ``doc_page_002``)
  1237. processing_params: 写入 meta.json 的参数(threshold、contrast_enhancement 等)
  1238. image_format: 图片格式,png/jpg
  1239. save_compare: 是否保存左右拼接对比图
  1240. subdir: debug_comparison 下的子目录名
  1241. Returns:
  1242. 已保存文件路径字典(before/after/compare/meta,未保存的键省略)
  1243. """
  1244. if isinstance(before, Image.Image):
  1245. before = np.array(before)
  1246. if isinstance(after, Image.Image):
  1247. after = np.array(after)
  1248. root = Path(output_dir)
  1249. debug_dir = root / "debug_comparison" / subdir
  1250. debug_dir.mkdir(parents=True, exist_ok=True)
  1251. fmt = (image_format or "png").lstrip(".")
  1252. before_bgr = _image_to_bgr_for_debug(before)
  1253. after_bgr = _image_to_bgr_for_debug(after)
  1254. paths: Dict[str, str] = {}
  1255. before_path = debug_dir / f"{page_name}_watermark_before.{fmt}"
  1256. after_path = debug_dir / f"{page_name}_watermark_after.{fmt}"
  1257. cv2.imwrite(str(before_path), before_bgr)
  1258. cv2.imwrite(str(after_path), after_bgr)
  1259. paths["before"] = str(before_path)
  1260. paths["after"] = str(after_path)
  1261. if save_compare:
  1262. h = max(before_bgr.shape[0], after_bgr.shape[0])
  1263. if before_bgr.shape[0] != h:
  1264. before_bgr = cv2.resize(before_bgr, (before_bgr.shape[1], h))
  1265. if after_bgr.shape[0] != h:
  1266. after_bgr = cv2.resize(after_bgr, (after_bgr.shape[1], h))
  1267. compare = np.hstack([before_bgr, after_bgr])
  1268. compare_path = debug_dir / f"{page_name}_watermark_compare.{fmt}"
  1269. cv2.imwrite(str(compare_path), compare)
  1270. paths["compare"] = str(compare_path)
  1271. logger.info(f"Saved watermark compare: {compare_path}")
  1272. if mask_overlay is not None:
  1273. mask_bgr = _image_to_bgr_for_debug(mask_overlay)
  1274. mask_path = debug_dir / f"{page_name}_watermark_mask.{fmt}"
  1275. cv2.imwrite(str(mask_path), mask_bgr)
  1276. paths["mask"] = str(mask_path)
  1277. meta: Dict[str, Any] = {"page_name": page_name}
  1278. if processing_params:
  1279. _skip_meta = (
  1280. "midtone_mask",
  1281. "wm_mask",
  1282. "wm_candidate",
  1283. "geom_region",
  1284. "geom_candidate",
  1285. "diag_region",
  1286. "text_protect",
  1287. "seal_protect",
  1288. "hough_lines_bgr",
  1289. "diag_ratio_heatmap",
  1290. "hv_ratio_heatmap",
  1291. )
  1292. meta_params = {
  1293. k: v
  1294. for k, v in processing_params.items()
  1295. if k not in _skip_meta
  1296. }
  1297. meta.update(meta_params)
  1298. else:
  1299. meta.update({})
  1300. meta["before"] = paths["before"]
  1301. meta["after"] = paths["after"]
  1302. if "compare" in paths:
  1303. meta["compare"] = paths["compare"]
  1304. meta_path = debug_dir / f"{page_name}_watermark_meta.json"
  1305. meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")
  1306. paths["meta"] = str(meta_path)
  1307. logger.info(f"Saved watermark debug: {before_path}, {after_path}")
  1308. return paths
  1309. # ─────────────────────────────────────────────────────────────────────────────
  1310. # PDF 层级水印去除(文字型 PDF,保留可搜索性)
  1311. # ─────────────────────────────────────────────────────────────────────────────
  1312. def _is_watermark_xobj(doc, xref: int, obj_str: str) -> bool:
  1313. """
  1314. 判断一个 Form XObject 是否为水印。
  1315. 启发式规则(满足其一即视为水印):
  1316. 1. 含旋转变换矩阵(cm 指令 sin/cos 分量非零),无论是否有 /Group
  1317. 2. 有透明度组(/Group)且内容流包含透明度操作符(ca/CA)
  1318. 3. 有透明度组且内容流体积 > 2KB(大量重复绘图 = 平铺水印)
  1319. """
  1320. if "/Form" not in obj_str:
  1321. return False
  1322. try:
  1323. stream = doc.xref_stream(xref)
  1324. if not stream:
  1325. return False
  1326. stream_text = stream.decode("latin-1", errors="ignore")
  1327. except Exception:
  1328. return False
  1329. has_group = "/Group" in obj_str
  1330. cm_pattern = re.compile(
  1331. r"([-\d.]+)\s+([-\d.]+)\s+([-\d.]+)\s+([-\d.]+)\s+[-\d.]+\s+[-\d.]+\s+cm"
  1332. )
  1333. for m in cm_pattern.finditer(stream_text):
  1334. a, b, c, d = float(m.group(1)), float(m.group(2)), float(m.group(3)), float(m.group(4))
  1335. if abs(b) > 0.1 or abs(c) > 0.1:
  1336. return True
  1337. if not has_group:
  1338. return False
  1339. if re.search(r'\b(ca|CA)\s+[0-9.]+', stream_text) or re.search(r'[0-9.]+\s+(ca|CA)\b', stream_text):
  1340. return True
  1341. if len(stream_text) > 2048:
  1342. return True
  1343. return False
  1344. def _is_watermark_image_xobj(doc, xref: int, obj_str: str) -> bool:
  1345. """
  1346. 判断一个 Image XObject 是否为水印背景图。
  1347. 判断规则(全部满足):
  1348. 1. /Subtype /Image
  1349. 2. 有 /SMask(半透明)
  1350. 3. 宽 >= 600 且 高 >= 800(全页尺寸,排除小图标)
  1351. 4. 解码后像素均值 >= 240(近乎全白,水印文字稀疏)
  1352. """
  1353. if "/Image" not in obj_str or "/SMask" not in obj_str:
  1354. return False
  1355. w_m = re.search(r'/Width\s+(\d+)', obj_str)
  1356. h_m = re.search(r'/Height\s+(\d+)', obj_str)
  1357. if not w_m or not h_m:
  1358. return False
  1359. if int(w_m.group(1)) < 600 or int(h_m.group(1)) < 800:
  1360. return False
  1361. try:
  1362. from io import BytesIO
  1363. img_info = doc.extract_image(xref)
  1364. pil_img = Image.open(BytesIO(img_info["image"])).convert("L")
  1365. return float(np.array(pil_img).mean()) >= 240.0
  1366. except Exception:
  1367. return False
  1368. def _blank_watermark_image(doc, img_xref: int) -> None:
  1369. """
  1370. 将水印 Image XObject 的 RGB 流和 SMask 替换为全白/全不透明。
  1371. 关键点:必须先移除 /DecodeParms(Predictor 11),再调用 update_stream。
  1372. 否则渲染器在 FlateDecode 之后还会尝试 Predictor 解码,失败后回退原始数据,
  1373. 水印依然可见。
  1374. """
  1375. obj_str = doc.xref_object(img_xref)
  1376. w_m = re.search(r'/Width\s+(\d+)', obj_str)
  1377. h_m = re.search(r'/Height\s+(\d+)', obj_str)
  1378. w = int(w_m.group(1)) if w_m else 1
  1379. h = int(h_m.group(1)) if h_m else 1
  1380. cs_m = re.search(r'/ColorSpace\s+/Device(RGB|Gray|CMYK)', obj_str)
  1381. channels = {'RGB': 3, 'CMYK': 4}.get(cs_m.group(1) if cs_m else '', 1)
  1382. doc.xref_set_key(img_xref, "DecodeParms", "null")
  1383. doc.update_stream(img_xref, bytes([255]) * (w * h * channels))
  1384. smask_m = re.search(r'/SMask\s+(\d+)\s+0\s+R', obj_str)
  1385. if smask_m:
  1386. smask_xref = int(smask_m.group(1))
  1387. smask_obj = doc.xref_object(smask_xref)
  1388. sw = int(m.group(1)) if (m := re.search(r'/Width\s+(\d+)', smask_obj)) else w
  1389. sh = int(m.group(1)) if (m := re.search(r'/Height\s+(\d+)', smask_obj)) else h
  1390. doc.xref_set_key(smask_xref, "DecodeParms", "null")
  1391. doc.update_stream(smask_xref, bytes([255]) * (sw * sh))
  1392. def scan_pdf_watermark_xobjs(pdf_bytes: bytes, sample_pages: int = 3) -> bool:
  1393. """
  1394. 快速扫描 PDF 前 N 页,判断是否含水印 XObject。
  1395. 无副作用(只读),用于在执行去水印前快速判断,避免对无水印的大文件
  1396. 执行全量扫描和序列化,显著降低财报等大文件的处理开销。
  1397. Args:
  1398. pdf_bytes: PDF 文件的原始字节。
  1399. sample_pages: 扫描页数上限,默认 3(银行流水通常前几页有水印)。
  1400. Returns:
  1401. True 表示发现水印 XObject,False 表示未发现。
  1402. """
  1403. try:
  1404. import fitz
  1405. except ImportError:
  1406. return False
  1407. doc = fitz.open(stream=pdf_bytes, filetype="pdf")
  1408. pages_to_check = min(sample_pages, len(doc))
  1409. try:
  1410. for i in range(pages_to_check):
  1411. page = doc[i]
  1412. for xref, *_ in page.get_xobjects():
  1413. try:
  1414. obj_str = doc.xref_object(xref)
  1415. except Exception:
  1416. continue
  1417. if _is_watermark_xobj(doc, xref, obj_str):
  1418. return True
  1419. for img_tuple in page.get_images(full=True):
  1420. try:
  1421. obj_str = doc.xref_object(img_tuple[0])
  1422. except Exception:
  1423. continue
  1424. if _is_watermark_image_xobj(doc, img_tuple[0], obj_str):
  1425. return True
  1426. finally:
  1427. doc.close()
  1428. return False
  1429. def remove_txt_pdf_watermark(pdf_bytes: bytes) -> Optional[bytes]:
  1430. """
  1431. 对文字型 PDF 执行原生水印去除,完全在内存中完成,不写临时文件。
  1432. 支持两种水印形式:
  1433. - Form XObject 水印:清空内容流
  1434. - Image XObject 水印(全页背景图 + SMask 透明通道):替换为全白像素
  1435. 适用场景:pdf_type='txt' 的 PDF,去除后可直接传给渲染层(tobytes() → bytes)。
  1436. 对于大文件(如财报),建议先用 scan_pdf_watermark_xobjs() 快速判断再调用本函数。
  1437. Args:
  1438. pdf_bytes: 原始 PDF 的字节内容。
  1439. Returns:
  1440. 去除水印后的 PDF bytes(garbage=4 压缩);若未发现水印返回 None。
  1441. """
  1442. try:
  1443. import fitz
  1444. except ImportError:
  1445. raise ImportError("请安装 PyMuPDF: pip install PyMuPDF")
  1446. from loguru import logger
  1447. doc = fitz.open(stream=pdf_bytes, filetype="pdf")
  1448. processed_xrefs: set[int] = set()
  1449. total_removed = 0
  1450. for page in doc:
  1451. # ── Form XObject 水印 ─────────────────────────────────────────
  1452. for xref, name, _invoker, _unused in page.get_xobjects():
  1453. if xref in processed_xrefs:
  1454. continue
  1455. try:
  1456. obj_str = doc.xref_object(xref)
  1457. except Exception:
  1458. continue
  1459. if _is_watermark_xobj(doc, xref, obj_str):
  1460. try:
  1461. doc.update_stream(xref, b"")
  1462. processed_xrefs.add(xref)
  1463. total_removed += 1
  1464. logger.debug(f" [Form XObject] 清空水印 xref={xref}, name={name}")
  1465. except Exception as e:
  1466. logger.warning(f" 清空 Form XObject xref={xref} 失败: {e}")
  1467. # ── Image XObject 水印 ────────────────────────────────────────
  1468. for img_tuple in page.get_images(full=True):
  1469. img_xref = img_tuple[0]
  1470. if img_xref in processed_xrefs:
  1471. continue
  1472. try:
  1473. obj_str = doc.xref_object(img_xref)
  1474. except Exception:
  1475. continue
  1476. if _is_watermark_image_xobj(doc, img_xref, obj_str):
  1477. _blank_watermark_image(doc, img_xref)
  1478. processed_xrefs.add(img_xref)
  1479. total_removed += 1
  1480. logger.debug(f" [Image XObject] 替换水印图像 xref={img_xref}")
  1481. if total_removed == 0:
  1482. doc.close()
  1483. return None
  1484. result = doc.tobytes(garbage=4, deflate=True)
  1485. doc.close()
  1486. logger.info(f"✅ PDF 层级水印去除:共清除 {total_removed} 个水印 XObject")
  1487. return result