python-tinypng/tinypng_balancer.py

435 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""TinyPNG/Tinify compression tool that spreads work over multiple API keys.

Usage examples:
    python tinypng_balancer.py --init-config tinify_keys.json
    python tinypng_balancer.py input.png -o output.png --config tinify_keys.json
    python tinypng_balancer.py images -o optimized --recursive --convert webp
"""
from __future__ import absolute_import, print_function
import argparse
import datetime as _dt
import hashlib
import json
import os
import sys
import tempfile
# Monthly quota used for a key whose configuration entry does not set
# "monthly_limit"; also written into the generated configuration template.
DEFAULT_LIMIT = 500
# Lower-case file extensions that are picked up when scanning a directory.
SUPPORTED_EXTENSIONS = (".png", ".jpg", ".jpeg", ".webp", ".avif")
# Conversion target name -> (MIME type passed to convert(), output extension).
CONVERT_TYPES = {
    "png": ("image/png", ".png"),
    "jpeg": ("image/jpeg", ".jpg"),
    "jpg": ("image/jpeg", ".jpg"),
    "webp": ("image/webp", ".webp"),
    "avif": ("image/avif", ".avif"),
}
# Pseudo target: keep the source format (no conversion step is added).
CONVERT_SOURCE = "source"
# Pseudo target: expands to the source format plus every CONVERT_ALL_TARGETS entry.
CONVERT_ALL = "all"
CONVERT_ALL_TARGETS = ("png", "jpeg", "webp", "avif")
class BatchArgs(object):
    """Simple options object shared by the command line and the GUI.

    Attributes:
        resize_method: Resize method name, or None for no resize.
        width: Target width for resizing, or None.
        height: Target height for resizing, or None.
        convert: Requested conversion target(s), or None.
        recursive: Whether input directories are scanned recursively.
        overwrite: Whether existing output files are replaced.
        dry_run: Whether to only report the plan without compressing.
        sync_usage: Whether per-key usage is synchronised before the batch.
    """

    # Attribute names in constructor order; used to build the repr.
    _FIELDS = (
        "resize_method",
        "width",
        "height",
        "convert",
        "recursive",
        "overwrite",
        "dry_run",
        "sync_usage",
    )

    def __init__(
        self,
        resize_method=None,
        width=None,
        height=None,
        convert=None,
        recursive=False,
        overwrite=False,
        dry_run=False,
        sync_usage=True,
    ):
        self.resize_method = resize_method
        self.width = width
        self.height = height
        self.convert = convert
        self.recursive = recursive
        self.overwrite = overwrite
        self.dry_run = dry_run
        self.sync_usage = sync_usage

    def __repr__(self):
        """Return ``BatchArgs(name=value, ...)`` to ease logging and debugging."""
        rendered = ", ".join(
            "{0}={1!r}".format(name, getattr(self, name)) for name in self._FIELDS
        )
        return "{0}({1})".format(type(self).__name__, rendered)
def current_month():
    """Return today's local date formatted as ``YYYY-MM``."""
    today = _dt.date.today()
    return today.strftime("%Y-%m")
def key_id(api_key):
    """Return the first 16 hex characters of the SHA-256 of *api_key*.

    The key is encoded as UTF-8 before hashing.
    """
    digest = hashlib.sha256()
    digest.update(api_key.encode("utf-8"))
    return digest.hexdigest()[:16]
def read_json(path, default):
    """Return the parsed JSON stored at *path*, or *default* if it does not exist.

    The file is read as UTF-8. Errors from opening or parsing an existing
    file propagate to the caller.
    """
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as handle:
            return json.load(handle)
    return default
def write_json(path, data):
    """Write *data* to *path* as indented, key-sorted UTF-8 JSON.

    The parent directory is created when missing. The content is first
    written to a temporary file in that directory and then moved over
    *path* with ``os.replace``; the temporary file is removed if it is
    still present afterwards (for example when writing failed).
    """
    target_dir = os.path.dirname(os.path.abspath(path))
    if target_dir and not os.path.exists(target_dir):
        os.makedirs(target_dir)
    handle, scratch = tempfile.mkstemp(prefix=".tinify-", suffix=".json", dir=target_dir)
    try:
        with os.fdopen(handle, "w", encoding="utf-8") as stream:
            stream.write(json.dumps(data, indent=2, sort_keys=True, ensure_ascii=False))
            stream.write("\n")
        os.replace(scratch, path)
    finally:
        # After a successful replace the scratch name no longer exists.
        if os.path.exists(scratch):
            os.remove(scratch)
def init_config(path):
    """Create a configuration template at *path* and report it on stdout.

    The template holds two placeholder API key entries and a null proxy.

    Raises:
        SystemExit: If *path* already exists.
    """
    if os.path.exists(path):
        raise SystemExit("配置文件已存在:{0}".format(path))
    placeholders = [
        {
            "name": "账号-{0}".format(number),
            "key": "YOUR_TINIFY_API_KEY_{0}".format(number),
            "monthly_limit": DEFAULT_LIMIT,
        }
        for number in (1, 2)
    ]
    write_json(path, {"api_keys": placeholders, "proxy": None})
    print("已创建配置模板:{0}".format(path))
def load_tinify():
    """Import the ``tinify`` package on demand and return the module.

    Raises:
        SystemExit: If the package cannot be imported.
    """
    try:
        import tinify as module
    except ImportError as exc:
        message = "缺少依赖:{0}。请使用 uv run tinypng_gui.py 或 uv run tinypng_balancer.py 运行。".format(exc)
        raise SystemExit(message)
    return module
def load_keys(config_path):
    """Load the configuration file and normalise its usable API keys.

    Each entry under ``"api_keys"`` may be a plain key string or a dict with
    ``"key"`` and optional ``"name"`` / ``"monthly_limit"``. Entries with an
    empty key, or one still starting with the template placeholder
    ``YOUR_TINIFY_API_KEY``, are ignored.

    Args:
        config_path: Path of the JSON configuration file.

    Returns:
        A ``(config, keys)`` tuple: the raw configuration and a list of dicts
        with ``id``, ``key``, ``name`` and ``monthly_limit``.

    Raises:
        SystemExit: If the configuration is missing/empty or has no usable key.
    """
    config = read_json(config_path, None)
    if not config:
        raise SystemExit("找不到配置文件。请先执行:--init-config {0}".format(config_path))
    # "api_keys": null must behave like a missing list instead of crashing.
    entries = config.get("api_keys") or []
    keys = []
    for index, item in enumerate(entries):
        if isinstance(item, str):
            item = {"key": item}
        api_key = (item.get("key") or "").strip()
        if not api_key or api_key.startswith("YOUR_TINIFY_API_KEY"):
            continue
        keys.append(
            {
                "id": key_id(api_key),
                "key": api_key,
                # Default names are numbered by position in the config list.
                "name": item.get("name") or "key-{0}".format(index + 1),
                "monthly_limit": int(item.get("monthly_limit") or DEFAULT_LIMIT),
            }
        )
    if not keys:
        # Separator added so the path is not glued onto "API Key".
        raise SystemExit("配置文件中没有可用的 API Key:{0}".format(config_path))
    return config, keys
def load_state(path):
    """Load this month's usage state, starting fresh when it is stale or unusable.

    Args:
        path: Path of the JSON usage file.

    Returns:
        A dict with ``"month"`` (``YYYY-MM``) and ``"usage"`` (a dict keyed by
        key id). A new empty state is returned when the stored month differs
        from the current one or the file does not contain a JSON object.
    """
    state = read_json(path, {})
    month = current_month()
    # A state file holding e.g. null or a list must not crash the run.
    if not isinstance(state, dict) or state.get("month") != month:
        state = {"month": month, "usage": {}}
    if not isinstance(state.get("usage"), dict):
        state["usage"] = {}
    return state
def get_count(state, candidate):
    """Return the usage recorded in *state* for *candidate* as an int (0 if absent)."""
    usage = state["usage"]
    recorded = usage.get(candidate["id"], 0)
    return int(recorded)
def set_count(state, candidate, count):
    """Record *count* (coerced to int) in *state* as the usage of *candidate*."""
    value = int(count)
    usage = state["usage"]
    usage[candidate["id"]] = value
def choose_key(keys, state, blocked):
    """Return the first usable key in configuration order, or None.

    A key is usable when it is not flagged ``invalid``, its id is not in
    *blocked*, and its recorded usage is below its ``monthly_limit``.
    """
    # Keys are used in configuration order: the first one is exhausted
    # before moving on to the next.
    usable = []
    for entry in keys:
        if entry.get("invalid"):
            continue
        if entry["id"] in blocked:
            continue
        if not get_count(state, entry) < entry["monthly_limit"]:
            continue
        usable.append(entry)
    return usable[0] if usable else None
def sync_key_usage(tinify, keys, state, state_path, config, emit):
    """Call the official validate endpoint to sync each key's usage this month.

    For every key, ``tinify.validate()`` is called; when the module then
    exposes a compression count it is stored in *state*. Keys that raise
    ``AccountError`` are flagged ``invalid``. Connection and server errors
    are reported through *emit* and re-raised. The state is written to
    *state_path* after all keys have been processed.
    """
    emit("正在同步官方本月用量...")
    for entry in keys:
        tinify.key = entry["key"]
        tinify.proxy = config.get("proxy")
        try:
            tinify.validate()
            reported = tinify.compression_count
            if reported is not None:
                set_count(state, entry, reported)
            quota_line = "额度 {0}: {1}/{2}".format(
                entry["name"],
                get_count(state, entry),
                entry["monthly_limit"],
            )
            emit(quota_line)
        except tinify.AccountError as exc:
            entry["invalid"] = True
            emit("禁用 {0}: {1}".format(entry["name"], exc))
        except (tinify.ConnectionError, tinify.ServerError) as exc:
            emit("同步失败 {0}: {1}".format(entry["name"], exc))
            raise
    write_json(state_path, state)
def collect_inputs(inputs, recursive):
    """Expand the given paths into the list of image files to process.

    Directories are scanned (recursively when *recursive* is true) for files
    whose lower-cased name ends with one of ``SUPPORTED_EXTENSIONS``. Files
    given directly are accepted as they are, without an extension check.
    Directory entries are visited in sorted order so that repeated runs
    process files in the same sequence.

    Args:
        inputs: Iterable of file or directory paths.
        recursive: Whether to descend into sub-directories.

    Returns:
        A ``(files, roots)`` tuple. ``files`` is a list of
        ``(absolute_file_path, root_directory)`` pairs; ``roots`` lists the
        absolute paths of the directories that were given.

    Raises:
        SystemExit: If a path is neither an existing file nor a directory.
    """
    files = []
    roots = []
    for path in inputs:
        path = os.path.abspath(path)
        if os.path.isdir(path):
            roots.append(path)
            if recursive:
                for base, dirs, names in os.walk(path):
                    # Sort in place so os.walk descends in a stable order.
                    dirs.sort()
                    for name in sorted(names):
                        full_path = os.path.join(base, name)
                        if full_path.lower().endswith(SUPPORTED_EXTENSIONS):
                            files.append((full_path, path))
            else:
                # os.listdir returns entries in arbitrary order.
                for name in sorted(os.listdir(path)):
                    full_path = os.path.join(path, name)
                    if os.path.isfile(full_path) and full_path.lower().endswith(SUPPORTED_EXTENSIONS):
                        files.append((full_path, path))
        elif os.path.isfile(path):
            files.append((path, os.path.dirname(path)))
        else:
            raise SystemExit("输入路径不存在:{0}".format(path))
    return files, roots
def output_path_for(input_path, root, output, total_files, convert_to):
    """Compute where the result for *input_path* should be written.

    When exactly one output is expected and *output* has a file extension and
    is not an existing directory, *output* itself (made absolute) is the
    destination. Otherwise the path of *input_path* relative to *root* is
    recreated below *output*, using the extension of the conversion target
    or, without conversion, the original extension.
    """
    destination = os.path.abspath(output)
    source_ext = os.path.splitext(input_path)[1]
    if convert_to:
        extension = CONVERT_TYPES[convert_to][1]
    else:
        extension = source_ext
    if total_files == 1 and not os.path.isdir(destination):
        if os.path.splitext(destination)[1]:
            return destination
    stem = os.path.splitext(os.path.relpath(input_path, root))[0]
    return os.path.join(destination, stem + extension)
def convert_targets(convert_to):
    """Expand a conversion request into an ordered list of target names.

    A falsy request or ``CONVERT_SOURCE`` yields only the source format;
    ``CONVERT_ALL`` yields the source format followed by every entry of
    ``CONVERT_ALL_TARGETS``. A list or tuple is expanded item by item, keeping
    the first occurrence of each target. Any other value is returned as a
    single-element list.
    """
    if not convert_to:
        return [CONVERT_SOURCE]
    if isinstance(convert_to, (list, tuple)):
        ordered = []
        for requested in convert_to:
            for name in convert_targets(requested):
                if name in ordered:
                    continue
                ordered.append(name)
        return ordered
    if convert_to == CONVERT_SOURCE:
        expanded = [CONVERT_SOURCE]
    elif convert_to == CONVERT_ALL:
        expanded = [CONVERT_SOURCE] + list(CONVERT_ALL_TARGETS)
    else:
        expanded = [convert_to]
    return expanded
def args_for_convert(args, convert_to):
    """Return a new ``BatchArgs`` copied from *args* with ``convert`` set to *convert_to*."""
    copied = {
        name: getattr(args, name)
        for name in (
            "resize_method",
            "width",
            "height",
            "recursive",
            "overwrite",
            "dry_run",
            "sync_usage",
        )
    }
    copied["convert"] = convert_to
    return BatchArgs(**copied)
def build_source(tinify, input_path, args):
    """Build the Tinify source for *input_path* with optional resize/convert steps.

    A resize step is added when ``args.resize_method`` is set, passing the
    method plus whichever of width/height are truthy. A convert step is added
    when ``args.convert`` is set to anything other than ``CONVERT_SOURCE``.
    Returns the resulting source object.
    """
    pipeline = tinify.from_file(input_path)
    method = args.resize_method
    if method:
        resize_options = {"method": method}
        for dimension in ("width", "height"):
            size = getattr(args, dimension)
            if size:
                resize_options[dimension] = size
        pipeline = pipeline.resize(**resize_options)
    target = args.convert
    if target and target != CONVERT_SOURCE:
        mime_type = CONVERT_TYPES[target][0]
        pipeline = pipeline.convert(type=[mime_type])
    return pipeline
def optimize_one(tinify, input_path, output_path, keys, state, state_path, config, args):
    """Compress one file, moving on to the next usable key when one fails.

    Keys are tried in the order given by ``choose_key``. After a success the
    key's usage is updated (from ``tinify.compression_count`` when available,
    otherwise incremented by one) and the state is saved.

    Failure handling per key:
      * ``AccountError`` with status 429: the key is recorded as having used
        at least its ``monthly_limit`` so later files skip it.
      * Any other ``AccountError``: the key is flagged ``invalid``, the same
        way ``sync_key_usage`` treats account errors.
      * ``ConnectionError`` / ``ServerError``: the key is skipped for this
        file only.

    Returns:
        The key dict that produced the output.

    Raises:
        RuntimeError: If no usable key is left.
    """
    blocked = set()
    last_error = None
    while True:
        candidate = choose_key(keys, state, blocked)
        if not candidate:
            raise RuntimeError("所有 API Key 都已达到本月上限或暂不可用。最后错误:{0}".format(last_error))
        tinify.key = candidate["key"]
        tinify.proxy = config.get("proxy")
        try:
            directory = os.path.dirname(os.path.abspath(output_path))
            if directory and not os.path.exists(directory):
                os.makedirs(directory)
            build_source(tinify, input_path, args).to_file(output_path)
            if tinify.compression_count is not None:
                set_count(state, candidate, tinify.compression_count)
            else:
                set_count(state, candidate, get_count(state, candidate) + 1)
            write_json(state_path, state)
            return candidate
        except tinify.AccountError as exc:
            last_error = exc
            if getattr(exc, "status", None) == 429:
                reported = tinify.compression_count
                limit = candidate["monthly_limit"]
                # Never record less than the configured limit: a smaller
                # reported count would leave the exhausted key selectable and
                # cost one failing request for every remaining file.
                exhausted = limit if reported is None else max(reported, limit)
                set_count(state, candidate, exhausted)
                write_json(state_path, state)
            else:
                # Not a quota problem, so stop offering this key for the rest
                # of the run instead of retrying it for every file.
                candidate["invalid"] = True
            blocked.add(candidate["id"])
        except (tinify.ConnectionError, tinify.ServerError) as exc:
            last_error = exc
            blocked.add(candidate["id"])
def run_batch(inputs, output, config_path, state_path, args, progress=None):
    """Run a batch compression; *progress* lets the GUI show log lines live.

    Args:
        inputs: File or directory paths to process.
        output: Output file or directory.
        config_path: Path of the API key configuration JSON.
        state_path: Path of the monthly usage JSON.
        args: Options object (see ``BatchArgs``).
        progress: Optional callable receiving each log message; messages are
            printed when it is not given.

    Returns:
        A dict with the counters ``"ok"``, ``"skipped"`` and ``"failed"``.

    Raises:
        SystemExit: If no supported image file is found (configuration and
            input errors raised by the helpers propagate as well).
    """

    def emit(message):
        if progress:
            progress(message)
        else:
            print(message)

    config, keys = load_keys(config_path)
    state = load_state(state_path)
    files, _ = collect_inputs(inputs, args.recursive)
    if not files:
        raise SystemExit("没有找到支持的图片文件。")
    # A dry run never talks to Tinify, so the dependency is not loaded.
    tinify = None if args.dry_run else load_tinify()
    if tinify and args.sync_usage:
        sync_key_usage(tinify, keys, state, state_path, config, emit)
    emit("月份:{0}".format(state["month"]))
    emit("API Key 数量:{0}".format(len(keys)))
    emit("图片数量:{0}".format(len(files)))
    targets = convert_targets(args.convert)
    if args.convert == CONVERT_ALL:
        emit("转换格式:源格式/png/jpeg/webp/avif 全部输出")
    else:
        emit("转换格式:{0}".format(", ".join("源格式" if item == CONVERT_SOURCE else item for item in targets)))
    ok = 0
    skipped = 0
    failed = 0
    total_outputs = len(files) * len(targets)
    for input_path, root in files:
        # Output paths already handled for this input. Different targets can
        # resolve to the same file (source + jpeg for a .jpg, or jpg + jpeg);
        # compressing it again would only spend quota.
        handled = set()
        for target in targets:
            item_args = args_for_convert(args, target)
            output_convert = None if target == CONVERT_SOURCE else target
            output_path = output_path_for(input_path, root, output, total_outputs, output_convert)
            output_key = os.path.normcase(output_path)
            if output_key in handled:
                skipped += 1
                emit("跳过 {0} -> {1}".format(input_path, output_path))
                continue
            handled.add(output_key)
            if os.path.exists(output_path) and not args.overwrite:
                skipped += 1
                emit("跳过 {0} -> {1}".format(input_path, output_path))
                continue
            if args.dry_run:
                emit("计划 {0} -> {1}".format(input_path, output_path))
                continue
            try:
                used = optimize_one(tinify, input_path, output_path, keys, state, state_path, config, item_args)
                ok += 1
                emit(
                    "完成 {0} -> {1} [{2}: {3}/{4}]".format(
                        input_path,
                        output_path,
                        used["name"],
                        get_count(state, used),
                        used["monthly_limit"],
                    )
                )
            except Exception as exc:
                # Batch boundary: one failing file must not abort the rest.
                failed += 1
                emit("失败 {0}: {1}".format(input_path, exc))
    if args.dry_run:
        emit("试运行完成。")
    else:
        write_json(state_path, state)
        emit("处理完成。成功={0},跳过={1},失败={2}".format(ok, skipped, failed))
    return {"ok": ok, "skipped": skipped, "failed": failed}
def parse_args(argv):
    """Build the command-line parser and parse *argv* into a namespace."""
    cli = argparse.ArgumentParser(
        description="使用无限个 Tinify API Key 进行图片压缩/转换,并按每月额度自动负载均衡。"
    )
    add = cli.add_argument
    add("inputs", nargs="*", help="图片文件或目录。")
    add("-o", "--output", default="optimized", help="输出文件或目录。默认optimized")
    for flag, fallback, text in (
        ("--config", "tinify_keys.json", "API Key 配置 JSON。"),
        ("--state", "tinify_usage.json", "每月用量记录 JSON。"),
    ):
        add(flag, default=fallback, help=text)
    add("--init-config", metavar="PATH", help="创建配置模板后退出。")
    for flag, text in (
        ("--recursive", "递归扫描目录。"),
        ("--overwrite", "覆盖已存在的输出文件。"),
    ):
        add(flag, action="store_true", help=text)
    convert_choices = [CONVERT_SOURCE]
    convert_choices.extend(sorted(CONVERT_TYPES))
    convert_choices.append(CONVERT_ALL)
    add(
        "--convert",
        action="append",
        choices=convert_choices,
        help="转换输出格式可重复传入source 表示源格式all 表示全部格式。",
    )
    add("--resize-method", choices=("scale", "fit", "cover", "thumb"), help="Tinify 缩放方式。")
    for flag, text in (
        ("--width", "缩放宽度。"),
        ("--height", "缩放高度。"),
    ):
        add(flag, type=int, help=text)
    for flag, text in (
        ("--dry-run", "只显示计划,不调用 Tinify。"),
        ("--no-sync-usage", "跳过启动前的官方额度同步。"),
    ):
        add(flag, action="store_true", help=text)
    return cli.parse_args(argv)
def main(argv=None):
    """Command-line entry point.

    Args:
        argv: Argument list to parse. ``None`` means ``sys.argv[1:]``; an
            explicit list, including an empty one, is used as given.

    Returns:
        0 on success (or after creating a configuration template), 1 when at
        least one file failed.

    Raises:
        SystemExit: If no input is given, or ``--resize-method`` is used
            without ``--width`` / ``--height``.
    """
    # "argv or sys.argv[1:]" would silently replace an explicit empty list
    # with the process arguments, so test for None instead.
    args = parse_args(sys.argv[1:] if argv is None else argv)
    if args.init_config:
        init_config(args.init_config)
        return 0
    if not args.inputs:
        raise SystemExit("请至少提供一个输入文件或目录。")
    if args.resize_method and not (args.width or args.height):
        raise SystemExit("--resize-method 需要同时提供 --width 和/或 --height。")
    batch_args = BatchArgs(
        resize_method=args.resize_method,
        width=args.width,
        height=args.height,
        convert=args.convert,
        recursive=args.recursive,
        overwrite=args.overwrite,
        dry_run=args.dry_run,
        sync_usage=not args.no_sync_usage,
    )
    result = run_batch(args.inputs, args.output, args.config, args.state, batch_args)
    return 1 if result["failed"] else 0
# When run as a script, use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())