"""水印 去水印入口(由 ocr_utils.watermark_utils 迁入)。""" from __future__ import annotations import json import re from pathlib import Path from typing import Any, Dict, Optional, Tuple, Union import cv2 import numpy as np from loguru import logger from PIL import Image from ocr_utils.watermark.algorithms import ( _image_to_gray_and_bgr, remove_watermark_masked_adaptive, ) from ocr_utils.watermark.contrast import apply_contrast_enhancement_config def remove_watermark_from_image( image: Union[np.ndarray, Image.Image], threshold: int = 160, morph_close_kernel: int = 2, return_pil: Optional[bool] = None, watermark_removal_cfg: Optional[Dict[str, Any]] = None, removal_debug: Optional[Dict[str, Any]] = None, ) -> Union[np.ndarray, Image.Image]: """ 去除图像中的浅色斜向文字水印,返回灰度图。 method(watermark_removal_cfg): threshold(默认): gray > threshold → 255 masked / masked_adaptive: 掩膜 + 掩膜内动态阈值 Args: image: 输入图像(PIL.Image 或 np.ndarray BGR/RGB/灰度)。 threshold: 全局阈值或掩膜失败时的回退阈值。 morph_close_kernel: 形态学闭运算核大小,0 跳过。 watermark_removal_cfg: 完整配置(含 method / mask / adaptive)。 removal_debug: 若传入 dict,写入掩膜与 T_wm 等调试字段。 Returns: 去除水印后的灰度图:PIL.Image(mode='L') 或 np.ndarray(HxW, uint8)。 """ input_is_pil = isinstance(image, Image.Image) cfg = watermark_removal_cfg or {} method = str(cfg.get("method") or "threshold").lower().strip() gray, bgr = _image_to_gray_and_bgr(image) if method in ("masked", "masked_adaptive"): cleaned, dbg = remove_watermark_masked_adaptive( gray, bgr=bgr, mask_cfg=cfg.get("mask") if isinstance(cfg.get("mask"), dict) else None, adaptive_cfg=cfg.get("adaptive") if isinstance(cfg.get("adaptive"), dict) else None, threshold_fallback=threshold, morph_close_kernel=morph_close_kernel, ) if removal_debug is not None: removal_debug.clear() removal_debug.update(dbg) else: cleaned = gray.copy() cleaned[gray > threshold] = 255 if morph_close_kernel > 0: kernel = np.ones((morph_close_kernel, morph_close_kernel), np.uint8) cleaned = cv2.morphologyEx(cleaned, cv2.MORPH_CLOSE, kernel) if removal_debug is not None: removal_debug.clear() removal_debug.update({"mode": "threshold", "threshold": threshold}) should_return_pil = input_is_pil if return_pil is None else return_pil return Image.fromarray(cleaned, mode='L') if should_return_pil else cleaned def remove_watermark_from_image_rgb( image: Union[np.ndarray, Image.Image], threshold: int = 160, morph_close_kernel: int = 2, return_pil: Optional[bool] = None, contrast_enhancement: Optional[Dict[str, Any]] = None, apply_watermark_removal: bool = True, watermark_removal_cfg: Optional[Dict[str, Any]] = None, removal_debug: Optional[Dict[str, Any]] = None, ) -> Union[np.ndarray, Image.Image]: """ 去除水印并返回 RGB 三通道图像。 与 remove_watermark_from_image 逻辑相同,但输出为 RGB(三通道), 方便直接传入布局检测、OCR 等需要彩色输入的下游模型。 Args: contrast_enhancement: 对比度增强配置(含 enabled / method 等),见 apply_contrast_enhancement_config apply_watermark_removal: False 时跳过阈值抹白,仅做对比度增强(若启用) Args/Returns: 同 remove_watermark_from_image,但输出为 RGB/BGR 三通道。 """ input_is_pil = isinstance(image, Image.Image) if apply_watermark_removal: gray_result = remove_watermark_from_image( image, threshold, morph_close_kernel, return_pil=False, watermark_removal_cfg=watermark_removal_cfg, removal_debug=removal_debug, ) else: if isinstance(image, Image.Image): np_img = np.array(image.convert("RGB")) np_img = cv2.cvtColor(np_img, cv2.COLOR_RGB2BGR) else: np_img = image.copy() gray_result = ( cv2.cvtColor(np_img, cv2.COLOR_BGR2GRAY) if np_img.ndim == 3 else np_img ) gray_result = apply_contrast_enhancement_config(gray_result, contrast_enhancement) rgb_np = cv2.cvtColor(gray_result, cv2.COLOR_GRAY2BGR) should_return_pil = input_is_pil if return_pil is None else return_pil if should_return_pil: return Image.fromarray(cv2.cvtColor(rgb_np, cv2.COLOR_BGR2RGB)) return rgb_np def render_watermark_mask_overlay( image: np.ndarray, wm_mask: np.ndarray, *, color: Tuple[int, int, int] = (0, 0, 255), alpha: float = 0.45, ) -> np.ndarray: """在原图上叠加红色半透明水印掩膜,供调试图保存。""" if image.ndim == 2: base = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR) elif image.shape[2] == 3: base = image.copy() if image.max() <= 1: base = (image * 255).astype(np.uint8) else: base = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR) overlay = base.copy() overlay[wm_mask] = color return cv2.addWeighted(base, 1.0 - alpha, overlay, alpha, 0)