watermark_synthesis.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. """
  2. 水印合成脚本:在clean图片上叠加斜向浅色文字水印,输出带水印图 + 精确mask。
  3. 用法:
  4. python watermark_synthesis.py # 默认参数演示
  5. python watermark_synthesis.py --input ./test_images/clean/ # 指定输入目录
  6. python watermark_synthesis.py --text "SAMPLE" --opacity 0.15 --angle 45
  7. """
  8. from __future__ import annotations
  9. import argparse
  10. import math
  11. from pathlib import Path
  12. from typing import Optional
  13. import cv2
  14. import numpy as np
  15. from loguru import logger
  16. from PIL import Image, ImageDraw, ImageFont
  17. def _find_font() -> str:
  18. """查找可用中文字体,找不到返回默认字体。"""
  19. candidates = [
  20. "/System/Library/Fonts/PingFang.ttc",
  21. "/System/Library/Fonts/STHeiti Light.ttc",
  22. "/System/Library/Fonts/Hiragino Sans GB.ttc",
  23. "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
  24. "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
  25. ]
  26. for fp in candidates:
  27. if Path(fp).exists():
  28. return fp
  29. logger.warning("未找到中文字体,使用PIL默认字体")
  30. return ""
  31. def _text_size_to_font_size(text_height_px: int) -> int:
  32. """根据目标文字像素高度估算 font_size。"""
  33. return int(text_height_px * 1.15)
  34. def _render_watermark_tile(
  35. pil_img: Image.Image,
  36. text: str,
  37. font_path: str,
  38. font_size: int,
  39. opacity: float,
  40. angle_deg: float,
  41. spacing_x: int,
  42. spacing_y: int,
  43. ) -> tuple[np.ndarray, np.ndarray]:
  44. """
  45. 在图上平铺斜向水印文字,返回 (watermarked_np, mask_np)。
  46. mask_np: H×W bool, True=水印像素位置。
  47. """
  48. w, h = pil_img.size
  49. text_height = int(font_size / 1.15)
  50. gray_value = int(255 * (1 - opacity))
  51. # 创建水印文字mask(稍大画布以覆盖旋转后区域)
  52. diag = int(math.sqrt(w * w + h * h)) + text_height * 4
  53. tile_w = diag
  54. tile_h = diag
  55. tile = Image.new("L", (tile_w, tile_h), 0)
  56. draw = ImageDraw.Draw(tile)
  57. font = ImageFont.truetype(font_path, font_size) if font_path else ImageFont.load_default()
  58. # 步长取spacing + 文字大小,确保均匀分布
  59. step_x = text_height + spacing_x
  60. step_y = text_height + spacing_y
  61. for y in range(0, tile_h, step_y):
  62. for x in range(0, tile_w, step_x):
  63. draw.text((x, y), text, fill=255, font=font)
  64. # 旋转
  65. tile_rot = tile.rotate(angle_deg, expand=False, fillcolor=0)
  66. # 裁剪到原图大小(中心对齐)
  67. cx, cy = tile_rot.size[0] // 2, tile_rot.size[1] // 2
  68. left = cx - w // 2
  69. top = cy - h // 2
  70. watermark_tile = tile_rot.crop((left, top, left + w, top + h))
  71. mask_np = np.array(watermark_tile) > 0
  72. # 叠加到原图
  73. base = np.array(pil_img.convert("RGB"))
  74. alpha = opacity
  75. result = base.copy()
  76. result[mask_np] = (
  77. base[mask_np].astype(np.float32) * (1 - alpha)
  78. + np.array([gray_value, gray_value, gray_value], dtype=np.float32) * alpha
  79. ).astype(np.uint8)
  80. return result, mask_np
  81. def synthesize_watermark(
  82. input_path: Path,
  83. output_dir: Path,
  84. *,
  85. text: str = "SAMPLE",
  86. font_path: str = "",
  87. text_height_px: int = 36,
  88. opacity: float = 0.12,
  89. angle_deg: float = 45.0,
  90. spacing_x: int = 180,
  91. spacing_y: int = 180,
  92. save_mask: bool = True,
  93. ) -> Path:
  94. """
  95. 在输入图片上合成水印,输出到 output_dir。
  96. Returns:
  97. 合成后的图片路径
  98. """
  99. output_dir.mkdir(parents=True, exist_ok=True)
  100. pil_img = Image.open(str(input_path)).convert("RGB")
  101. fp = font_path or _find_font()
  102. font_size = _text_size_to_font_size(text_height_px)
  103. logger.info(
  104. f"合成水印: {input_path.name} | "
  105. f"text='{text}' font_size={font_size} opacity={opacity} angle={angle_deg}"
  106. )
  107. result_np, mask_np = _render_watermark_tile(
  108. pil_img, text, fp, font_size, opacity, angle_deg, spacing_x, spacing_y
  109. )
  110. out_name = f"{input_path.stem}_watermarked{input_path.suffix}"
  111. out_path = output_dir / out_name
  112. Image.fromarray(result_np).save(str(out_path))
  113. logger.info(f" 水印图: {out_path}")
  114. if save_mask:
  115. mask_path = output_dir / f"{input_path.stem}_mask.png"
  116. cv2.imwrite(str(mask_path), (mask_np.astype(np.uint8) * 255))
  117. logger.info(f" mask: {mask_path}")
  118. return out_path
  119. def main():
  120. parser = argparse.ArgumentParser(description="水印合成工具")
  121. parser.add_argument("--input", type=Path, default=None,
  122. help="输入图片或目录(默认: test_images/clean/)")
  123. parser.add_argument("--output", type=Path, default=None,
  124. help="输出目录(默认: test_images/synthetic/)")
  125. parser.add_argument("--text", type=str, default="行内内部使用",
  126. help="水印文字内容")
  127. parser.add_argument("--text-height", type=int, default=48,
  128. help="文字像素高度(默认48)")
  129. parser.add_argument("--opacity", type=float, default=0.10,
  130. help="水印透明度 0~1(默认0.10)")
  131. parser.add_argument("--angle", type=float, default=45.0,
  132. help="水印倾斜角度(默认45°)")
  133. parser.add_argument("--spacing-x", type=int, default=250,
  134. help="水印文字水平间距(默认250px)")
  135. parser.add_argument("--spacing-y", type=int, default=250,
  136. help="水印文字垂直间距(默认250px)")
  137. parser.add_argument("--font", type=str, default="",
  138. help="字体文件路径")
  139. parser.add_argument("--no-mask", action="store_true",
  140. help="不保存mask")
  141. parser.add_argument("--demo", action="store_true",
  142. help="使用input目录下第一张测试图生成演示图")
  143. args = parser.parse_args()
  144. root = Path(__file__).parent
  145. input_dir = args.input or (root / "test_images" / "clean")
  146. output_dir = args.output or (root / "test_images" / "synthetic")
  147. if args.demo:
  148. # 无clean图时,直接用input目录的水印图再加一层合成水印做演示
  149. img_files = sorted(root.glob("test_images/input/*"))
  150. if not img_files:
  151. logger.error("test_images/input/ 下没有测试图片,请放入图片后重试")
  152. return
  153. input_dir = root / "test_images" / "input"
  154. output_dir = root / "test_images" / "synthetic"
  155. input_dir = Path(input_dir)
  156. output_dir = Path(output_dir)
  157. output_dir.mkdir(parents=True, exist_ok=True)
  158. if input_dir.is_dir():
  159. img_files = sorted([
  160. f for f in input_dir.iterdir()
  161. if f.suffix.lower() in {".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff", ".webp"}
  162. ])
  163. elif input_dir.is_file():
  164. img_files = [input_dir]
  165. else:
  166. logger.error(f"输入路径不存在: {input_dir}")
  167. return
  168. if not img_files:
  169. logger.warning(f"{input_dir} 下没有图片文件")
  170. return
  171. for f in img_files:
  172. synthesize_watermark(
  173. f, output_dir,
  174. text=args.text,
  175. font_path=args.font,
  176. text_height_px=args.text_height,
  177. opacity=args.opacity,
  178. angle_deg=args.angle,
  179. spacing_x=args.spacing_x,
  180. spacing_y=args.spacing_y,
  181. save_mask=not args.no_mask,
  182. )
  183. if __name__ == "__main__":
  184. main()