435 lines
15 KiB
Python
435 lines
15 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
"""TinyPNG/Tinify 多 API Key 负载均衡压缩工具。
|
||
|
||
使用示例:
|
||
python tinypng_balancer.py --init-config tinify_keys.json
|
||
python tinypng_balancer.py input.png -o output.png --config tinify_keys.json
|
||
python tinypng_balancer.py images -o optimized --recursive --convert webp
|
||
"""
|
||
from __future__ import absolute_import, print_function
|
||
|
||
import argparse
|
||
import datetime as _dt
|
||
import hashlib
|
||
import json
|
||
import os
|
||
import sys
|
||
import tempfile
|
||
|
||
DEFAULT_LIMIT = 500
|
||
SUPPORTED_EXTENSIONS = (".png", ".jpg", ".jpeg", ".webp", ".avif")
|
||
CONVERT_TYPES = {
|
||
"png": ("image/png", ".png"),
|
||
"jpeg": ("image/jpeg", ".jpg"),
|
||
"jpg": ("image/jpeg", ".jpg"),
|
||
"webp": ("image/webp", ".webp"),
|
||
"avif": ("image/avif", ".avif"),
|
||
}
|
||
CONVERT_SOURCE = "source"
|
||
CONVERT_ALL = "all"
|
||
CONVERT_ALL_TARGETS = ("png", "jpeg", "webp", "avif")
|
||
|
||
|
||
class BatchArgs(object):
|
||
"""给命令行和 GUI 共用的简易参数对象。"""
|
||
|
||
def __init__(
|
||
self,
|
||
resize_method=None,
|
||
width=None,
|
||
height=None,
|
||
convert=None,
|
||
recursive=False,
|
||
overwrite=False,
|
||
dry_run=False,
|
||
sync_usage=True,
|
||
):
|
||
self.resize_method = resize_method
|
||
self.width = width
|
||
self.height = height
|
||
self.convert = convert
|
||
self.recursive = recursive
|
||
self.overwrite = overwrite
|
||
self.dry_run = dry_run
|
||
self.sync_usage = sync_usage
|
||
|
||
|
||
def current_month():
|
||
return _dt.date.today().strftime("%Y-%m")
|
||
|
||
|
||
def key_id(api_key):
|
||
return hashlib.sha256(api_key.encode("utf-8")).hexdigest()[:16]
|
||
|
||
|
||
def read_json(path, default):
|
||
if not os.path.exists(path):
|
||
return default
|
||
with open(path, "r", encoding="utf-8") as fh:
|
||
return json.load(fh)
|
||
|
||
|
||
def write_json(path, data):
|
||
directory = os.path.dirname(os.path.abspath(path))
|
||
if directory and not os.path.exists(directory):
|
||
os.makedirs(directory)
|
||
fd, tmp_path = tempfile.mkstemp(prefix=".tinify-", suffix=".json", dir=directory)
|
||
try:
|
||
with os.fdopen(fd, "w", encoding="utf-8") as fh:
|
||
json.dump(data, fh, indent=2, sort_keys=True, ensure_ascii=False)
|
||
fh.write("\n")
|
||
os.replace(tmp_path, path)
|
||
finally:
|
||
if os.path.exists(tmp_path):
|
||
os.remove(tmp_path)
|
||
|
||
|
||
def init_config(path):
|
||
if os.path.exists(path):
|
||
raise SystemExit("配置文件已存在:{0}".format(path))
|
||
sample = {
|
||
"api_keys": [
|
||
{"name": "账号-1", "key": "YOUR_TINIFY_API_KEY_1", "monthly_limit": DEFAULT_LIMIT},
|
||
{"name": "账号-2", "key": "YOUR_TINIFY_API_KEY_2", "monthly_limit": DEFAULT_LIMIT},
|
||
],
|
||
"proxy": None,
|
||
}
|
||
write_json(path, sample)
|
||
print("已创建配置模板:{0}".format(path))
|
||
|
||
|
||
def load_tinify():
|
||
try:
|
||
import tinify
|
||
except ImportError as exc:
|
||
raise SystemExit("缺少依赖:{0}。请使用 uv run tinypng_gui.py 或 uv run tinypng_balancer.py 运行。".format(exc))
|
||
return tinify
|
||
|
||
|
||
def load_keys(config_path):
|
||
config = read_json(config_path, None)
|
||
if not config:
|
||
raise SystemExit("找不到配置文件。请先执行:--init-config {0}".format(config_path))
|
||
|
||
entries = config.get("api_keys", [])
|
||
keys = []
|
||
for index, item in enumerate(entries):
|
||
if isinstance(item, str):
|
||
item = {"key": item}
|
||
api_key = (item.get("key") or "").strip()
|
||
if not api_key or api_key.startswith("YOUR_TINIFY_API_KEY"):
|
||
continue
|
||
keys.append(
|
||
{
|
||
"id": key_id(api_key),
|
||
"key": api_key,
|
||
"name": item.get("name") or "key-{0}".format(index + 1),
|
||
"monthly_limit": int(item.get("monthly_limit") or DEFAULT_LIMIT),
|
||
}
|
||
)
|
||
if not keys:
|
||
raise SystemExit("配置文件中没有可用的 API Key:{0}".format(config_path))
|
||
return config, keys
|
||
|
||
|
||
def load_state(path):
|
||
state = read_json(path, {})
|
||
month = current_month()
|
||
if state.get("month") != month:
|
||
state = {"month": month, "usage": {}}
|
||
state.setdefault("usage", {})
|
||
return state
|
||
|
||
|
||
def get_count(state, candidate):
|
||
return int(state["usage"].get(candidate["id"], 0))
|
||
|
||
|
||
def set_count(state, candidate, count):
|
||
state["usage"][candidate["id"]] = int(count)
|
||
|
||
|
||
def choose_key(keys, state, blocked):
|
||
# 按配置顺序使用 API Key:先用完第一个,再切到下一个。
|
||
available = [
|
||
key
|
||
for key in keys
|
||
if not key.get("invalid")
|
||
and key["id"] not in blocked
|
||
and get_count(state, key) < key["monthly_limit"]
|
||
]
|
||
if not available:
|
||
return None
|
||
return available[0]
|
||
|
||
|
||
def sync_key_usage(tinify, keys, state, state_path, config, emit):
|
||
"""调用官方 validate 接口,同步每个 API Key 的本月真实用量。"""
|
||
emit("正在同步官方本月用量...")
|
||
for candidate in keys:
|
||
tinify.key = candidate["key"]
|
||
tinify.proxy = config.get("proxy")
|
||
try:
|
||
tinify.validate()
|
||
if tinify.compression_count is not None:
|
||
set_count(state, candidate, tinify.compression_count)
|
||
emit(
|
||
"额度 {0}: {1}/{2}".format(
|
||
candidate["name"],
|
||
get_count(state, candidate),
|
||
candidate["monthly_limit"],
|
||
)
|
||
)
|
||
except tinify.AccountError as exc:
|
||
candidate["invalid"] = True
|
||
emit("禁用 {0}: {1}".format(candidate["name"], exc))
|
||
except (tinify.ConnectionError, tinify.ServerError) as exc:
|
||
emit("同步失败 {0}: {1}".format(candidate["name"], exc))
|
||
raise
|
||
write_json(state_path, state)
|
||
|
||
|
||
def collect_inputs(inputs, recursive):
|
||
files = []
|
||
roots = []
|
||
for path in inputs:
|
||
path = os.path.abspath(path)
|
||
if os.path.isdir(path):
|
||
roots.append(path)
|
||
if recursive:
|
||
for base, _, names in os.walk(path):
|
||
for name in names:
|
||
full_path = os.path.join(base, name)
|
||
if full_path.lower().endswith(SUPPORTED_EXTENSIONS):
|
||
files.append((full_path, path))
|
||
else:
|
||
for name in os.listdir(path):
|
||
full_path = os.path.join(path, name)
|
||
if os.path.isfile(full_path) and full_path.lower().endswith(SUPPORTED_EXTENSIONS):
|
||
files.append((full_path, path))
|
||
elif os.path.isfile(path):
|
||
files.append((path, os.path.dirname(path)))
|
||
else:
|
||
raise SystemExit("输入路径不存在:{0}".format(path))
|
||
return files, roots
|
||
|
||
|
||
def output_path_for(input_path, root, output, total_files, convert_to):
|
||
output = os.path.abspath(output)
|
||
_, original_ext = os.path.splitext(input_path)
|
||
final_ext = CONVERT_TYPES[convert_to][1] if convert_to else original_ext
|
||
|
||
if total_files == 1 and not os.path.isdir(output) and os.path.splitext(output)[1]:
|
||
return output
|
||
|
||
relative = os.path.relpath(input_path, root)
|
||
relative_base, _ = os.path.splitext(relative)
|
||
return os.path.join(output, relative_base + final_ext)
|
||
|
||
|
||
def convert_targets(convert_to):
|
||
if not convert_to:
|
||
return [CONVERT_SOURCE]
|
||
if isinstance(convert_to, (list, tuple)):
|
||
targets = []
|
||
for item in convert_to:
|
||
for target in convert_targets(item):
|
||
if target not in targets:
|
||
targets.append(target)
|
||
return targets
|
||
if convert_to == CONVERT_SOURCE:
|
||
return [CONVERT_SOURCE]
|
||
if convert_to == CONVERT_ALL:
|
||
return [CONVERT_SOURCE] + list(CONVERT_ALL_TARGETS)
|
||
return [convert_to]
|
||
|
||
|
||
def args_for_convert(args, convert_to):
|
||
return BatchArgs(
|
||
resize_method=args.resize_method,
|
||
width=args.width,
|
||
height=args.height,
|
||
convert=convert_to,
|
||
recursive=args.recursive,
|
||
overwrite=args.overwrite,
|
||
dry_run=args.dry_run,
|
||
sync_usage=args.sync_usage,
|
||
)
|
||
|
||
|
||
def build_source(tinify, input_path, args):
|
||
source = tinify.from_file(input_path)
|
||
if args.resize_method:
|
||
options = {"method": args.resize_method}
|
||
if args.width:
|
||
options["width"] = args.width
|
||
if args.height:
|
||
options["height"] = args.height
|
||
source = source.resize(**options)
|
||
if args.convert and args.convert != CONVERT_SOURCE:
|
||
source = source.convert(type=[CONVERT_TYPES[args.convert][0]])
|
||
return source
|
||
|
||
|
||
def optimize_one(tinify, input_path, output_path, keys, state, state_path, config, args):
|
||
blocked = set()
|
||
last_error = None
|
||
|
||
while True:
|
||
candidate = choose_key(keys, state, blocked)
|
||
if not candidate:
|
||
raise RuntimeError("所有 API Key 都已达到本月上限或暂不可用。最后错误:{0}".format(last_error))
|
||
|
||
tinify.key = candidate["key"]
|
||
tinify.proxy = config.get("proxy")
|
||
|
||
try:
|
||
directory = os.path.dirname(os.path.abspath(output_path))
|
||
if directory and not os.path.exists(directory):
|
||
os.makedirs(directory)
|
||
build_source(tinify, input_path, args).to_file(output_path)
|
||
if tinify.compression_count is not None:
|
||
set_count(state, candidate, tinify.compression_count)
|
||
else:
|
||
set_count(state, candidate, get_count(state, candidate) + 1)
|
||
write_json(state_path, state)
|
||
return candidate
|
||
except tinify.AccountError as exc:
|
||
last_error = exc
|
||
if getattr(exc, "status", None) == 429:
|
||
if tinify.compression_count is not None:
|
||
set_count(state, candidate, tinify.compression_count)
|
||
else:
|
||
set_count(state, candidate, candidate["monthly_limit"])
|
||
write_json(state_path, state)
|
||
blocked.add(candidate["id"])
|
||
except (tinify.ConnectionError, tinify.ServerError) as exc:
|
||
last_error = exc
|
||
blocked.add(candidate["id"])
|
||
|
||
|
||
def run_batch(inputs, output, config_path, state_path, args, progress=None):
|
||
"""执行批量压缩;progress 回调用于 GUI 实时显示日志。"""
|
||
|
||
def emit(message):
|
||
if progress:
|
||
progress(message)
|
||
else:
|
||
print(message)
|
||
|
||
config, keys = load_keys(config_path)
|
||
state = load_state(state_path)
|
||
files, _ = collect_inputs(inputs, args.recursive)
|
||
if not files:
|
||
raise SystemExit("没有找到支持的图片文件。")
|
||
|
||
tinify = None if args.dry_run else load_tinify()
|
||
if tinify and args.sync_usage:
|
||
sync_key_usage(tinify, keys, state, state_path, config, emit)
|
||
|
||
emit("月份:{0}".format(state["month"]))
|
||
emit("API Key 数量:{0}".format(len(keys)))
|
||
emit("图片数量:{0}".format(len(files)))
|
||
targets = convert_targets(args.convert)
|
||
if args.convert == CONVERT_ALL:
|
||
emit("转换格式:源格式/png/jpeg/webp/avif 全部输出")
|
||
else:
|
||
emit("转换格式:{0}".format(", ".join("源格式" if item == CONVERT_SOURCE else item for item in targets)))
|
||
|
||
ok = 0
|
||
skipped = 0
|
||
failed = 0
|
||
total_outputs = len(files) * len(targets)
|
||
|
||
for input_path, root in files:
|
||
for target in targets:
|
||
item_args = args_for_convert(args, target)
|
||
output_convert = None if target == CONVERT_SOURCE else target
|
||
output_path = output_path_for(input_path, root, output, total_outputs, output_convert)
|
||
if os.path.exists(output_path) and not args.overwrite:
|
||
skipped += 1
|
||
emit("跳过 {0} -> {1}".format(input_path, output_path))
|
||
continue
|
||
if args.dry_run:
|
||
emit("计划 {0} -> {1}".format(input_path, output_path))
|
||
continue
|
||
try:
|
||
used = optimize_one(tinify, input_path, output_path, keys, state, state_path, config, item_args)
|
||
ok += 1
|
||
emit(
|
||
"完成 {0} -> {1} [{2}: {3}/{4}]".format(
|
||
input_path,
|
||
output_path,
|
||
used["name"],
|
||
get_count(state, used),
|
||
used["monthly_limit"],
|
||
)
|
||
)
|
||
except Exception as exc:
|
||
failed += 1
|
||
emit("失败 {0}: {1}".format(input_path, exc))
|
||
|
||
if args.dry_run:
|
||
emit("试运行完成。")
|
||
else:
|
||
write_json(state_path, state)
|
||
emit("处理完成。成功={0},跳过={1},失败={2}".format(ok, skipped, failed))
|
||
|
||
return {"ok": ok, "skipped": skipped, "failed": failed}
|
||
|
||
|
||
def parse_args(argv):
|
||
parser = argparse.ArgumentParser(
|
||
description="使用无限个 Tinify API Key 进行图片压缩/转换,并按每月额度自动负载均衡。"
|
||
)
|
||
parser.add_argument("inputs", nargs="*", help="图片文件或目录。")
|
||
parser.add_argument("-o", "--output", default="optimized", help="输出文件或目录。默认:optimized")
|
||
parser.add_argument("--config", default="tinify_keys.json", help="API Key 配置 JSON。")
|
||
parser.add_argument("--state", default="tinify_usage.json", help="每月用量记录 JSON。")
|
||
parser.add_argument("--init-config", metavar="PATH", help="创建配置模板后退出。")
|
||
parser.add_argument("--recursive", action="store_true", help="递归扫描目录。")
|
||
parser.add_argument("--overwrite", action="store_true", help="覆盖已存在的输出文件。")
|
||
parser.add_argument(
|
||
"--convert",
|
||
action="append",
|
||
choices=[CONVERT_SOURCE] + sorted(CONVERT_TYPES) + [CONVERT_ALL],
|
||
help="转换输出格式,可重复传入;source 表示源格式,all 表示全部格式。",
|
||
)
|
||
parser.add_argument("--resize-method", choices=("scale", "fit", "cover", "thumb"), help="Tinify 缩放方式。")
|
||
parser.add_argument("--width", type=int, help="缩放宽度。")
|
||
parser.add_argument("--height", type=int, help="缩放高度。")
|
||
parser.add_argument("--dry-run", action="store_true", help="只显示计划,不调用 Tinify。")
|
||
parser.add_argument("--no-sync-usage", action="store_true", help="跳过启动前的官方额度同步。")
|
||
return parser.parse_args(argv)
|
||
|
||
|
||
def main(argv=None):
|
||
args = parse_args(argv or sys.argv[1:])
|
||
|
||
if args.init_config:
|
||
init_config(args.init_config)
|
||
return 0
|
||
|
||
if not args.inputs:
|
||
raise SystemExit("请至少提供一个输入文件或目录。")
|
||
if args.resize_method and not (args.width or args.height):
|
||
raise SystemExit("--resize-method 需要同时提供 --width 和/或 --height。")
|
||
|
||
batch_args = BatchArgs(
|
||
resize_method=args.resize_method,
|
||
width=args.width,
|
||
height=args.height,
|
||
convert=args.convert,
|
||
recursive=args.recursive,
|
||
overwrite=args.overwrite,
|
||
dry_run=args.dry_run,
|
||
sync_usage=not args.no_sync_usage,
|
||
)
|
||
result = run_batch(args.inputs, args.output, args.config, args.state, batch_args)
|
||
return 1 if result["failed"] else 0
|
||
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(main())
|