#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Compress/convert images through TinyPNG/Tinify using several API keys.

Keys are used in configuration order: the first key is used until its
monthly quota is exhausted, then the next one takes over.

Usage examples:
    python tinypng_balancer.py --init-config tinify_keys.json
    python tinypng_balancer.py input.png -o output.png --config tinify_keys.json
    python tinypng_balancer.py images -o optimized --recursive --convert webp
"""

from __future__ import absolute_import, print_function

import argparse
import datetime as _dt
import hashlib
import json
import os
import sys
import tempfile


DEFAULT_LIMIT = 500
SUPPORTED_EXTENSIONS = (".png", ".jpg", ".jpeg", ".webp", ".avif")
# Conversion target name -> (MIME type sent to Tinify, output file extension).
CONVERT_TYPES = {
    "png": ("image/png", ".png"),
    "jpeg": ("image/jpeg", ".jpg"),
    "jpg": ("image/jpeg", ".jpg"),
    "webp": ("image/webp", ".webp"),
    "avif": ("image/avif", ".avif"),
}
CONVERT_SOURCE = "source"
CONVERT_ALL = "all"
CONVERT_ALL_TARGETS = ("png", "jpeg", "webp", "avif")


class BatchArgs(object):
    """Simple options object shared by the command line and the GUI."""

    def __init__(
        self,
        resize_method=None,
        width=None,
        height=None,
        convert=None,
        recursive=False,
        overwrite=False,
        dry_run=False,
        sync_usage=True,
    ):
        self.resize_method = resize_method
        self.width = width
        self.height = height
        self.convert = convert
        self.recursive = recursive
        self.overwrite = overwrite
        self.dry_run = dry_run
        self.sync_usage = sync_usage


def current_month():
    """Return today's month as a ``YYYY-MM`` string."""
    return _dt.date.today().strftime("%Y-%m")


def key_id(api_key):
    """Return a short stable identifier for *api_key*.

    The identifier is the first 16 hex digits of the key's SHA-256 digest, so
    the usage file never has to store the key itself.
    """
    return hashlib.sha256(api_key.encode("utf-8")).hexdigest()[:16]


def read_json(path, default):
    """Load JSON from *path*, or return *default* when the file is missing."""
    if not os.path.exists(path):
        return default
    with open(path, "r", encoding="utf-8") as fh:
        return json.load(fh)


def write_json(path, data):
    """Write *data* as JSON to *path* via a temporary file and ``os.replace``.

    The temporary file is created in the destination directory so the final
    rename stays on one filesystem; it is removed if anything fails.
    """
    directory = os.path.dirname(os.path.abspath(path))
    os.makedirs(directory, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(prefix=".tinify-", suffix=".json", dir=directory)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(data, fh, indent=2, sort_keys=True, ensure_ascii=False)
            fh.write("\n")
        os.replace(tmp_path, path)
    finally:
        # Still present only when the replace above did not happen.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)


def init_config(path):
    """Create a configuration template at *path*.

    Raises:
        SystemExit: if *path* already exists.
    """
    if os.path.exists(path):
        raise SystemExit("配置文件已存在:{0}".format(path))
    sample = {
        "api_keys": [
            {"name": "账号-1", "key": "YOUR_TINIFY_API_KEY_1", "monthly_limit": DEFAULT_LIMIT},
            {"name": "账号-2", "key": "YOUR_TINIFY_API_KEY_2", "monthly_limit": DEFAULT_LIMIT},
        ],
        "proxy": None,
    }
    write_json(path, sample)
    print("已创建配置模板:{0}".format(path))


def load_tinify():
    """Import and return the ``tinify`` module.

    Raises:
        SystemExit: if the package cannot be imported.
    """
    try:
        import tinify
    except ImportError as exc:
        raise SystemExit("缺少依赖:{0}。请使用 uv run tinypng_gui.py 或 uv run tinypng_balancer.py 运行。".format(exc))
    return tinify


def load_keys(config_path):
    """Read the configuration and return ``(config, keys)``.

    Each entry of ``api_keys`` may be a plain key string or a dict with
    ``key``, optional ``name`` and optional ``monthly_limit``.  Blank keys,
    template placeholders and entries of any other type are ignored.

    Raises:
        SystemExit: if the configuration is missing/empty or holds no usable key.
    """
    config = read_json(config_path, None)
    if not config:
        raise SystemExit("找不到配置文件。请先执行:--init-config {0}".format(config_path))
    entries = config.get("api_keys") or []
    keys = []
    for index, item in enumerate(entries):
        if isinstance(item, str):
            item = {"key": item}
        if not isinstance(item, dict):
            # Malformed entry (number, list, null, ...): nothing to read from it.
            continue
        api_key = (item.get("key") or "").strip()
        if not api_key or api_key.startswith("YOUR_TINIFY_API_KEY"):
            continue
        keys.append(
            {
                "id": key_id(api_key),
                "key": api_key,
                "name": item.get("name") or "key-{0}".format(index + 1),
                "monthly_limit": int(item.get("monthly_limit") or DEFAULT_LIMIT),
            }
        )
    if not keys:
        raise SystemExit("配置文件中没有可用的 API Key:{0}".format(config_path))
    return config, keys


def load_state(path):
    """Load the usage record, starting a fresh one when the month changed.

    A missing file, a file that is not a JSON object, or a record for another
    month all yield ``{"month": <current month>, "usage": {}}``.
    """
    state = read_json(path, {})
    month = current_month()
    if not isinstance(state, dict) or state.get("month") != month:
        state = {"month": month, "usage": {}}
    if not isinstance(state.get("usage"), dict):
        state["usage"] = {}
    return state


def get_count(state, candidate):
    """Return the recorded usage count for *candidate* (0 when unknown)."""
    return int(state["usage"].get(candidate["id"], 0))


def set_count(state, candidate, count):
    """Record *count* as the usage of *candidate* in *state*."""
    state["usage"][candidate["id"]] = int(count)


def choose_key(keys, state, blocked):
    """Return the first usable key, or ``None`` when none is left.

    Keys are consumed in configuration order: the first one is used up before
    switching to the next.  A key is usable when it is not flagged invalid,
    its id is not in *blocked*, and its recorded count is below its limit.
    """
    for key in keys:
        if key.get("invalid") or key["id"] in blocked:
            continue
        if get_count(state, key) < key["monthly_limit"]:
            return key
    return None


def sync_key_usage(tinify, keys, state, state_path, config, emit):
    """Call ``tinify.validate()`` per key and store the reported monthly usage.

    Keys rejected with ``AccountError`` are flagged ``invalid``.  Connection
    and server errors are reported through *emit* and re-raised, in which
    case the state file is not written.
    """
    emit("正在同步官方本月用量...")
    for candidate in keys:
        tinify.key = candidate["key"]
        tinify.proxy = config.get("proxy")
        try:
            tinify.validate()
            if tinify.compression_count is not None:
                set_count(state, candidate, tinify.compression_count)
            emit(
                "额度 {0}: {1}/{2}".format(
                    candidate["name"],
                    get_count(state, candidate),
                    candidate["monthly_limit"],
                )
            )
        except tinify.AccountError as exc:
            candidate["invalid"] = True
            emit("禁用 {0}: {1}".format(candidate["name"], exc))
        except (tinify.ConnectionError, tinify.ServerError) as exc:
            emit("同步失败 {0}: {1}".format(candidate["name"], exc))
            raise
    write_json(state_path, state)


def collect_inputs(inputs, recursive):
    """Expand *inputs* into ``(files, roots)``.

    *files* is a list of ``(absolute_file_path, root)`` pairs, where *root* is
    the directory the file was found under (or the file's own directory for a
    direct file input).  *roots* lists the directory inputs.  Directory
    entries are visited in sorted order so runs are reproducible.

    Raises:
        SystemExit: if an input path does not exist.
    """
    files = []
    roots = []
    for path in inputs:
        path = os.path.abspath(path)
        if os.path.isdir(path):
            roots.append(path)
            if recursive:
                for base, dirs, names in os.walk(path):
                    dirs.sort()  # in-place sort steers os.walk's traversal order
                    for name in sorted(names):
                        full_path = os.path.join(base, name)
                        if full_path.lower().endswith(SUPPORTED_EXTENSIONS):
                            files.append((full_path, path))
            else:
                for name in sorted(os.listdir(path)):
                    full_path = os.path.join(path, name)
                    if os.path.isfile(full_path) and full_path.lower().endswith(SUPPORTED_EXTENSIONS):
                        files.append((full_path, path))
        elif os.path.isfile(path):
            files.append((path, os.path.dirname(path)))
        else:
            raise SystemExit("输入路径不存在:{0}".format(path))
    return files, roots


def output_path_for(input_path, root, output, total_files, convert_to):
    """Return the destination path for *input_path*.

    When exactly one output is expected and *output* is not an existing
    directory but has a file extension, *output* is used as the file path.
    Otherwise *output* is treated as a directory and the input's path relative
    to *root* is mirrored below it, with the extension of *convert_to* (or the
    original extension when *convert_to* is falsy).
    """
    output = os.path.abspath(output)
    _, original_ext = os.path.splitext(input_path)
    final_ext = CONVERT_TYPES[convert_to][1] if convert_to else original_ext
    if total_files == 1 and not os.path.isdir(output) and os.path.splitext(output)[1]:
        return output
    relative = os.path.relpath(input_path, root)
    relative_base, _ = os.path.splitext(relative)
    return os.path.join(output, relative_base + final_ext)


def convert_targets(convert_to):
    """Normalise a convert option into an ordered list of unique targets.

    Accepts ``None``, a single target name, or a list/tuple of names.
    ``"all"`` expands to the source format plus every ``CONVERT_ALL_TARGETS``
    entry; a falsy value means "source format only".
    """
    if not convert_to:
        return [CONVERT_SOURCE]
    if isinstance(convert_to, (list, tuple)):
        targets = []
        for item in convert_to:
            for target in convert_targets(item):
                if target not in targets:
                    targets.append(target)
        return targets
    if convert_to == CONVERT_SOURCE:
        return [CONVERT_SOURCE]
    if convert_to == CONVERT_ALL:
        return [CONVERT_SOURCE] + list(CONVERT_ALL_TARGETS)
    return [convert_to]


def args_for_convert(args, convert_to):
    """Return a copy of *args* whose ``convert`` is the single *convert_to*."""
    return BatchArgs(
        resize_method=args.resize_method,
        width=args.width,
        height=args.height,
        convert=convert_to,
        recursive=args.recursive,
        overwrite=args.overwrite,
        dry_run=args.dry_run,
        sync_usage=args.sync_usage,
    )


def build_source(tinify, input_path, args):
    """Build the Tinify source for *input_path* with resize/convert applied."""
    source = tinify.from_file(input_path)
    if args.resize_method:
        options = {"method": args.resize_method}
        if args.width:
            options["width"] = args.width
        if args.height:
            options["height"] = args.height
        source = source.resize(**options)
    if args.convert and args.convert != CONVERT_SOURCE:
        source = source.convert(type=[CONVERT_TYPES[args.convert][0]])
    return source


def optimize_one(tinify, input_path, output_path, keys, state, state_path, config, args):
    """Compress one file, moving to the next key whenever one fails.

    Returns the key entry that succeeded.  The usage count is updated and
    persisted after a success and after a 429 response.

    Raises:
        RuntimeError: when no usable key is left for this file.
    """
    blocked = set()
    last_error = None
    while True:
        candidate = choose_key(keys, state, blocked)
        if not candidate:
            raise RuntimeError("所有 API Key 都已达到本月上限或暂不可用。最后错误:{0}".format(last_error))
        tinify.key = candidate["key"]
        tinify.proxy = config.get("proxy")
        try:
            directory = os.path.dirname(os.path.abspath(output_path))
            os.makedirs(directory, exist_ok=True)
            build_source(tinify, input_path, args).to_file(output_path)
            if tinify.compression_count is not None:
                set_count(state, candidate, tinify.compression_count)
            else:
                set_count(state, candidate, get_count(state, candidate) + 1)
            write_json(state_path, state)
            return candidate
        except tinify.AccountError as exc:
            last_error = exc
            if getattr(exc, "status", None) == 429:
                # Quota exhausted.  Record at least the configured limit so
                # choose_key() stops offering this key even when the limit in
                # the config is higher than what the account really allows.
                reported = tinify.compression_count
                if reported is not None:
                    set_count(state, candidate, max(int(reported), candidate["monthly_limit"]))
                else:
                    set_count(state, candidate, candidate["monthly_limit"])
                write_json(state_path, state)
            else:
                # The key itself was rejected: disable it for the rest of the
                # run instead of spending a failed request on every file.
                candidate["invalid"] = True
            blocked.add(candidate["id"])
        except (tinify.ConnectionError, tinify.ServerError) as exc:
            # Possibly transient: skip this key for the current file only.
            last_error = exc
            blocked.add(candidate["id"])


def run_batch(inputs, output, config_path, state_path, args, progress=None):
    """Run the batch; *progress* receives log lines (used by the GUI).

    When *progress* is not given, messages are printed.  Returns a dict with
    the ``ok``, ``skipped`` and ``failed`` counts.

    Raises:
        SystemExit: on configuration problems or when no image is found.
    """

    def emit(message):
        if progress:
            progress(message)
        else:
            print(message)

    config, keys = load_keys(config_path)
    state = load_state(state_path)
    files, _ = collect_inputs(inputs, args.recursive)
    if not files:
        raise SystemExit("没有找到支持的图片文件。")

    tinify = None if args.dry_run else load_tinify()
    if tinify and args.sync_usage:
        sync_key_usage(tinify, keys, state, state_path, config, emit)

    emit("月份:{0}".format(state["month"]))
    emit("API Key 数量:{0}".format(len(keys)))
    emit("图片数量:{0}".format(len(files)))
    targets = convert_targets(args.convert)
    if args.convert == CONVERT_ALL:
        emit("转换格式:源格式/png/jpeg/webp/avif 全部输出")
    else:
        emit("转换格式:{0}".format(", ".join("源格式" if item == CONVERT_SOURCE else item for item in targets)))

    ok = 0
    skipped = 0
    failed = 0
    total_outputs = len(files) * len(targets)
    for input_path, root in files:
        # Two targets of one input can resolve to the same file ("source" and
        # the input's own format, or "jpg" and "jpeg").  Producing it once is
        # enough; redoing it would only spend extra credits.
        produced = set()
        for target in targets:
            item_args = args_for_convert(args, target)
            output_convert = None if target == CONVERT_SOURCE else target
            output_path = output_path_for(input_path, root, output, total_outputs, output_convert)
            if output_path in produced or (os.path.exists(output_path) and not args.overwrite):
                skipped += 1
                emit("跳过 {0} -> {1}".format(input_path, output_path))
                continue
            if args.dry_run:
                produced.add(output_path)
                emit("计划 {0} -> {1}".format(input_path, output_path))
                continue
            try:
                used = optimize_one(tinify, input_path, output_path, keys, state, state_path, config, item_args)
                produced.add(output_path)
                ok += 1
                emit(
                    "完成 {0} -> {1} [{2}: {3}/{4}]".format(
                        input_path,
                        output_path,
                        used["name"],
                        get_count(state, used),
                        used["monthly_limit"],
                    )
                )
            except Exception as exc:
                # Per-file boundary: report the failure and keep going.
                failed += 1
                emit("失败 {0}: {1}".format(input_path, exc))

    if args.dry_run:
        emit("试运行完成。")
    else:
        write_json(state_path, state)
        emit("处理完成。成功={0},跳过={1},失败={2}".format(ok, skipped, failed))
    return {"ok": ok, "skipped": skipped, "failed": failed}


def parse_args(argv):
    """Parse the command-line arguments in *argv* into a namespace."""
    parser = argparse.ArgumentParser(
        description="使用无限个 Tinify API Key 进行图片压缩/转换,并按每月额度自动负载均衡。"
    )
    parser.add_argument("inputs", nargs="*", help="图片文件或目录。")
    parser.add_argument("-o", "--output", default="optimized", help="输出文件或目录。默认:optimized")
    parser.add_argument("--config", default="tinify_keys.json", help="API Key 配置 JSON。")
    parser.add_argument("--state", default="tinify_usage.json", help="每月用量记录 JSON。")
    parser.add_argument("--init-config", metavar="PATH", help="创建配置模板后退出。")
    parser.add_argument("--recursive", action="store_true", help="递归扫描目录。")
    parser.add_argument("--overwrite", action="store_true", help="覆盖已存在的输出文件。")
    parser.add_argument(
        "--convert",
        action="append",
        choices=[CONVERT_SOURCE] + sorted(CONVERT_TYPES) + [CONVERT_ALL],
        help="转换输出格式,可重复传入;source 表示源格式,all 表示全部格式。",
    )
    parser.add_argument("--resize-method", choices=("scale", "fit", "cover", "thumb"), help="Tinify 缩放方式。")
    parser.add_argument("--width", type=int, help="缩放宽度。")
    parser.add_argument("--height", type=int, help="缩放高度。")
    parser.add_argument("--dry-run", action="store_true", help="只显示计划,不调用 Tinify。")
    parser.add_argument("--no-sync-usage", action="store_true", help="跳过启动前的官方额度同步。")
    return parser.parse_args(argv)


def main(argv=None):
    """Command-line entry point; returns the process exit status.

    *argv* defaults to ``sys.argv[1:]`` only when it is ``None``, so an
    explicitly passed empty list is honoured.  Returns 1 when at least one
    file failed, 0 otherwise.
    """
    args = parse_args(sys.argv[1:] if argv is None else argv)
    if args.init_config:
        init_config(args.init_config)
        return 0
    if not args.inputs:
        raise SystemExit("请至少提供一个输入文件或目录。")
    if args.resize_method and not (args.width or args.height):
        raise SystemExit("--resize-method 需要同时提供 --width 和/或 --height。")

    batch_args = BatchArgs(
        resize_method=args.resize_method,
        width=args.width,
        height=args.height,
        convert=args.convert,
        recursive=args.recursive,
        overwrite=args.overwrite,
        dry_run=args.dry_run,
        sync_usage=not args.no_sync_usage,
    )
    result = run_batch(args.inputs, args.output, args.config, args.state, batch_args)
    return 1 if result["failed"] else 0


if __name__ == "__main__":
    sys.exit(main())