| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- """
- 水印处理门面:preset 解析、检测、去水印、对比度增强。
- """
- from __future__ import annotations
- from typing import Any, Dict, List, Optional, Tuple, Union
- import numpy as np
- from loguru import logger
- from PIL import Image
- from ocr_utils.watermark.algorithms import detect_watermark
- from ocr_utils.watermark.presets import Scope, merge_watermark_config
- from ocr_utils.watermark.removal import remove_watermark_from_image_rgb
class WatermarkProcessor:
    """Orchestrate page-level / cell-level watermark removal.

    The processor holds the configuration returned by
    ``merge_watermark_config(scope, config)`` and exposes watermark
    detection, watermark removal and optional contrast enhancement through
    :meth:`process`.
    """

    def __init__(
        self,
        config: Dict[str, Any],
        *,
        scope: Scope = "page",
    ):
        """Store ``scope`` and the merged configuration for that scope."""
        self.scope = scope
        self.config = merge_watermark_config(scope, config)

    @classmethod
    def from_user_config(
        cls,
        user_cfg: Optional[Dict[str, Any]],
        *,
        scope: Scope = "page",
    ) -> "WatermarkProcessor":
        """Build a processor from an optional user config (``None`` -> ``{}``)."""
        return cls(user_cfg or {}, scope=scope)

    def _int_option(self, key: str, default: int) -> int:
        """Read an integer option, treating a missing or ``None`` value as ``default``."""
        value = self.config.get(key)
        # Explicit None check (not ``or``) so a configured 0 is still honoured.
        return default if value is None else int(value)

    @property
    def enabled(self) -> bool:
        """Whether watermark removal is switched on in the configuration."""
        return bool(self.config.get("enabled", False))

    @property
    def method(self) -> str:
        """Configured removal method name; defaults to ``"masked_adaptive"``."""
        return str(self.config.get("method") or "masked_adaptive")

    @property
    def threshold(self) -> int:
        """Configured ``threshold`` option; defaults to 155."""
        return self._int_option("threshold", 155)

    @property
    def morph_close_kernel(self) -> int:
        """Configured ``morph_close_kernel`` option; defaults to 0."""
        return self._int_option("morph_close_kernel", 0)

    def contrast_config(self) -> Optional[Dict[str, Any]]:
        """Return a copy of the ``contrast_enhancement`` section if it is enabled.

        Returns ``None`` when the section is missing, is not a dict, or its
        ``enabled`` flag is falsy.
        """
        ce = self.config.get("contrast_enhancement")
        if not isinstance(ce, dict) or not ce.get("enabled", False):
            return None
        return dict(ce)

    def should_apply(self, image: Union[np.ndarray, Image.Image]) -> bool:
        """Decide whether watermark removal should run on ``image``.

        Returns ``False`` when removal is disabled, ``True`` when
        ``detect_before_remove`` is turned off, and otherwise the truthiness
        of ``detect_watermark`` called with the options from the ``detect``
        config section.
        """
        if not self.enabled:
            return False
        if not bool(self.config.get("detect_before_remove", True)):
            return True

        detect_cfg = self.config.get("detect")
        if not isinstance(detect_cfg, dict):
            detect_cfg = {}

        angle_range = detect_cfg.get("diagonal_angle_range", (30, 60))
        if isinstance(angle_range, list):
            # Config files typically deliver lists; pass a tuple like the default.
            angle_range = tuple(angle_range)

        return bool(
            detect_watermark(
                image,
                midtone_low=int(detect_cfg.get("midtone_low", 100)),
                midtone_high=int(detect_cfg.get("midtone_high", 220)),
                ratio_threshold=float(detect_cfg.get("ratio_threshold", 0.025)),
                check_diagonal=bool(detect_cfg.get("check_diagonal", True)),
                diagonal_angle_range=angle_range,
            )
        )

    @staticmethod
    def _to_bgr_array(image: Union[np.ndarray, Image.Image]) -> np.ndarray:
        """Convert the input to a fresh 3-channel array (PIL input becomes BGR)."""
        if isinstance(image, np.ndarray):
            # Copy so the caller's array is never modified downstream.
            img = np.array(image)
        elif isinstance(image, Image.Image):
            rgb = np.array(image.convert("RGB"))
            img = rgb[:, :, ::-1].copy()  # RGB -> BGR
        else:
            img = np.array(image)
        if img.ndim == 2:
            # Grayscale: replicate the single plane into three channels.
            img = np.stack([img, img, img], axis=-1)
        return img

    def _resolve_contrast_cfg(
        self,
        apply_contrast: Optional[bool],
        contrast_override: Optional[Dict[str, Any]],
    ) -> Optional[Dict[str, Any]]:
        """Work out the contrast-enhancement config to use for one call."""
        if apply_contrast is False:
            # An explicit opt-out wins over both the override and the config.
            return None

        if contrast_override is not None:
            cfg = dict(contrast_override)
            if not cfg.get("enabled", True):
                # Supplying an override implies the caller wants it applied.
                cfg["enabled"] = True
            return cfg

        cfg = self.contrast_config()
        if apply_contrast is True and cfg is None:
            # Forced on: reuse the configured section even though it is
            # disabled, provided it names a method.
            ce = self.config.get("contrast_enhancement") or {}
            if isinstance(ce, dict) and ce.get("method"):
                cfg = dict(ce)
                cfg["enabled"] = True
        return cfg

    def process(
        self,
        image: Union[np.ndarray, Image.Image],
        *,
        apply_removal: Optional[bool] = None,
        apply_contrast: Optional[bool] = None,
        contrast_override: Optional[Dict[str, Any]] = None,
        removal_debug: Optional[Dict[str, Any]] = None,
        force: bool = False,
    ) -> Tuple[np.ndarray, List[str]]:
        """Run watermark removal plus optional contrast enhancement.

        Args:
            image: Input image. PIL images are converted to a BGR array;
                arrays are copied and 2-D arrays are expanded to 3 channels.
            apply_removal: ``None`` follows the ``enabled`` config flag;
                ``True`` / ``False`` override it for this call.
            apply_contrast: ``False`` disables contrast enhancement, ``True``
                forces it when the config names a method, ``None`` follows
                the config (or ``contrast_override`` if given).
            contrast_override: Contrast config used instead of the configured
                ``contrast_enhancement`` section.
            removal_debug: Passed through unchanged to
                ``remove_watermark_from_image_rgb``.
            force: Skip the :meth:`should_apply` check.

        Returns:
            ``(BGR ndarray, preprocess_stages)``. The stage list contains
            ``"wm"`` and/or ``"contrast"`` for the steps that were applied.
            If processing fails, the unmodified input array is returned
            together with an empty stage list.
        """
        img = self._to_bgr_array(image)

        do_remove = apply_removal if apply_removal is not None else self.enabled
        if do_remove and not force and not self.should_apply(img):
            do_remove = False

        contrast_cfg = self._resolve_contrast_cfg(apply_contrast, contrast_override)

        if not do_remove and not contrast_cfg:
            return img, []

        planned: List[str] = []
        if do_remove:
            planned.append("wm")
        if contrast_cfg:
            planned.append("contrast")

        # NOTE(review): ``img`` is BGR here while the helper's name says
        # "rgb" - confirm the expected channel order against the helper.
        try:
            cleaned = remove_watermark_from_image_rgb(
                img,
                threshold=self.threshold,
                morph_close_kernel=self.morph_close_kernel,
                return_pil=False,
                contrast_enhancement=contrast_cfg,
                apply_watermark_removal=do_remove,
                watermark_removal_cfg=self.config,
                removal_debug=removal_debug,
            )
            result = np.asarray(cleaned)
        except Exception as e:
            # Best effort: keep the pipeline going with the untouched image,
            # and do not report stages that were never actually applied.
            logger.warning(f"WatermarkProcessor.process failed (scope={self.scope}): {e}")
            return img, []
        return result, planned

    def get_full_config(self) -> Dict[str, Any]:
        """Return a shallow copy of the merged configuration."""
        return dict(self.config)
|