fix(config): handle corrupted config file and improve encoding detection

feat/daemon-cli
PeanutSplash 2026-03-03 22:49:03 +08:00
parent eb6471d42c
commit fd4a2fce31
No known key found for this signature in database
GPG Key ID: A7CBACC1085C9A05
4 changed files with 128 additions and 84 deletions

View File

@ -35,13 +35,20 @@ def auto_detect_db_dir():
data_roots = []
for ini_file in glob.glob(os.path.join(config_dir, "*.ini")):
try:
with open(ini_file, "r", encoding="utf-8") as f:
content = f.read(1024).strip()
if not content or "\n" in content or "\x00" in content:
# 微信 ini 可能是 utf-8 或 gbk 编码(中文路径)
content = None
for enc in ("utf-8", "gbk"):
try:
with open(ini_file, "r", encoding=enc) as f:
content = f.read(1024).strip()
break
except UnicodeDecodeError:
continue
if not content or any(c in content for c in "\n\r\x00"):
continue
if os.path.isdir(content):
data_roots.append(content)
except (OSError, UnicodeDecodeError):
except OSError:
continue
# 在每个根目录下搜索 xwechat_files\*\db_storage
@ -82,8 +89,12 @@ def auto_detect_db_dir():
def load_config():
cfg = {}
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE) as f:
cfg = json.load(f)
try:
with open(CONFIG_FILE) as f:
cfg = json.load(f)
except json.JSONDecodeError:
print(f"[!] {CONFIG_FILE} 格式损坏,将使用默认配置")
cfg = {}
# db_dir 缺失或仍为模板值时,尝试自动检测
db_dir = cfg.get("db_dir", "")
@ -91,7 +102,6 @@ def load_config():
detected = auto_detect_db_dir()
if detected:
print(f"[+] 自动检测到微信数据目录: {detected}")
cfg["db_dir"] = detected
# 合并默认值并保存
cfg = {**_DEFAULT, **cfg, "db_dir": detected}
with open(CONFIG_FILE, "w") as f:

View File

@ -118,6 +118,7 @@ def main():
with open(KEYS_FILE) as f:
keys = json.load(f)
keys.pop("_db_dir", None)
print(f"\n加载 {len(keys)} 个数据库密钥")
print(f"输出目录: {OUT_DIR}")
os.makedirs(OUT_DIR, exist_ok=True)

View File

@ -121,6 +121,7 @@ def main():
hex_re = re.compile(b"x'([0-9a-fA-F]{64,192})'")
key_map = {} # salt_hex -> enc_key_hex
remaining_salts = set(salt_to_dbs.keys())
all_hex_matches = 0
t0 = time.time()
@ -130,78 +131,87 @@ def main():
print(f"[WARN] 无法打开进程 PID={pid},跳过")
continue
regions = enum_regions(h)
total_mb = sum(s for _,s in regions)/1024/1024
print(f"\n[*] 扫描 PID={pid} ({total_mb:.0f}MB, {len(regions)} 区域)")
try:
regions = enum_regions(h)
total_bytes = sum(s for _,s in regions)
total_mb = total_bytes/1024/1024
print(f"\n[*] 扫描 PID={pid} ({total_mb:.0f}MB, {len(regions)} 区域)")
for reg_idx, (base, size) in enumerate(regions):
data = read_mem(h, base, size)
if not data: continue
scanned_bytes = 0
for reg_idx, (base, size) in enumerate(regions):
data = read_mem(h, base, size)
scanned_bytes += size
if not data: continue
for m in hex_re.finditer(data):
hex_str = m.group(1).decode()
addr = base + m.start()
all_hex_matches += 1
hex_len = len(hex_str)
for m in hex_re.finditer(data):
hex_str = m.group(1).decode()
addr = base + m.start()
all_hex_matches += 1
hex_len = len(hex_str)
if hex_len == 96:
enc_key_hex = hex_str[:64]
salt_hex = hex_str[64:]
if hex_len == 96:
enc_key_hex = hex_str[:64]
salt_hex = hex_str[64:]
if salt_hex in salt_to_dbs and salt_hex not in key_map:
if salt_hex in remaining_salts:
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, s, page1 in db_files:
if s == salt_hex:
if verify_key_for_db(enc_key, page1):
key_map[salt_hex] = enc_key_hex
remaining_salts.discard(salt_hex)
dbs = salt_to_dbs[salt_hex]
print(f"\n [FOUND] salt={salt_hex}")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
elif hex_len == 64:
if not remaining_salts:
continue
enc_key_hex = hex_str
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, s, page1 in db_files:
if s == salt_hex:
for rel, path, sz, salt_hex_db, page1 in db_files:
if salt_hex_db in remaining_salts:
if verify_key_for_db(enc_key, page1):
key_map[salt_hex] = enc_key_hex
dbs = salt_to_dbs[salt_hex]
print(f"\n [FOUND] salt={salt_hex}")
key_map[salt_hex_db] = enc_key_hex
remaining_salts.discard(salt_hex_db)
dbs = salt_to_dbs[salt_hex_db]
print(f"\n [FOUND] salt={salt_hex_db}")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
break
elif hex_len == 64:
enc_key_hex = hex_str
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, salt_hex_db, page1 in db_files:
if salt_hex_db not in key_map:
if verify_key_for_db(enc_key, page1):
key_map[salt_hex_db] = enc_key_hex
dbs = salt_to_dbs[salt_hex_db]
print(f"\n [FOUND] salt={salt_hex_db}")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
elif hex_len > 96 and hex_len % 2 == 0:
enc_key_hex = hex_str[:64]
salt_hex = hex_str[-32:]
elif hex_len > 96 and hex_len % 2 == 0:
enc_key_hex = hex_str[:64]
salt_hex = hex_str[-32:]
if salt_hex in remaining_salts:
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, s, page1 in db_files:
if s == salt_hex:
if verify_key_for_db(enc_key, page1):
key_map[salt_hex] = enc_key_hex
remaining_salts.discard(salt_hex)
dbs = salt_to_dbs[salt_hex]
print(f"\n [FOUND] salt={salt_hex} (long hex {hex_len})")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
if salt_hex in salt_to_dbs and salt_hex not in key_map:
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, s, page1 in db_files:
if s == salt_hex:
if verify_key_for_db(enc_key, page1):
key_map[salt_hex] = enc_key_hex
dbs = salt_to_dbs[salt_hex]
print(f"\n [FOUND] salt={salt_hex} (long hex {hex_len})")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
if (reg_idx + 1) % 200 == 0:
elapsed = time.time() - t0
progress = sum(s for b,s in regions[:reg_idx+1]) / sum(s for _,s in regions) * 100
print(f" [{progress:.1f}%] {len(key_map)}/{len(salt_to_dbs)} salts matched, "
f"{all_hex_matches} hex patterns, {elapsed:.1f}s")
kernel32.CloseHandle(h)
if (reg_idx + 1) % 200 == 0:
elapsed = time.time() - t0
progress = scanned_bytes / total_bytes * 100 if total_bytes else 100
print(f" [{progress:.1f}%] {len(key_map)}/{len(salt_to_dbs)} salts matched, "
f"{all_hex_matches} hex patterns, {elapsed:.1f}s")
finally:
kernel32.CloseHandle(h)
# 所有 salt 都找到了就提前退出
if len(key_map) == len(salt_to_dbs):
if not remaining_salts:
print(f"\n[+] 所有密钥已找到,跳过剩余进程")
break
@ -240,6 +250,12 @@ def main():
else:
print(f" MISSING: {rel} (salt={salt_hex})")
if not result:
print(f"\n[!] 未提取到任何密钥,保留已有的 {OUT_FILE}(如存在)")
raise RuntimeError("未能从任何微信进程中提取到密钥")
# 写入密钥并记录对应的 db_dir防止切换账号后误复用
result["_db_dir"] = DB_DIR
with open(OUT_FILE, 'w') as f:
json.dump(result, f, indent=2)
print(f"\n密钥保存到: {OUT_FILE}")
@ -251,6 +267,9 @@ def main():
print(f" {rel}")
if __name__ == '__main__':
main()
try:
main()
except RuntimeError as e:
print(f"\n[ERROR] {e}")
sys.exit(1)

48
main.py
View File

@ -6,7 +6,6 @@ python main.py decrypt # 提取密钥 + 解密全部数据库
"""
import json
import os
import subprocess
import sys
import functools
@ -15,21 +14,29 @@ print = functools.partial(print, flush=True)
def check_wechat_running():
"""检查微信是否在运行,返回 True/False"""
r = subprocess.run(
["tasklist", "/FI", "IMAGENAME eq Weixin.exe", "/FO", "CSV", "/NH"],
capture_output=True, text=True,
)
for line in r.stdout.strip().split("\n"):
if "Weixin.exe" in line:
return True
return False
from find_all_keys import get_pids
try:
get_pids()
return True
except RuntimeError:
return False
def ensure_keys(keys_file):
"""确保密钥文件存在,不存在则自动提取"""
def ensure_keys(keys_file, db_dir):
"""确保密钥文件存在且匹配当前 db_dir否则重新提取"""
if os.path.exists(keys_file):
with open(keys_file) as f:
keys = json.load(f)
try:
with open(keys_file) as f:
keys = json.load(f)
except (json.JSONDecodeError, ValueError):
keys = {}
# 检查密钥是否匹配当前 db_dir防止切换账号后误复用旧密钥
saved_dir = keys.pop("_db_dir", None)
if saved_dir and os.path.normcase(os.path.normpath(saved_dir)) != os.path.normcase(os.path.normpath(db_dir)):
print(f"[!] 密钥文件对应的目录已变更,需要重新提取")
print(f" 旧: {saved_dir}")
print(f" 新: {db_dir}")
keys = {}
if keys:
print(f"[+] 已有 {len(keys)} 个数据库密钥")
return
@ -37,15 +44,22 @@ def ensure_keys(keys_file):
print("[*] 密钥文件不存在,正在从微信进程提取...")
print()
from find_all_keys import main as extract_keys
extract_keys()
try:
extract_keys()
except RuntimeError as e:
print(f"\n[!] 密钥提取失败: {e}")
sys.exit(1)
print()
# 提取后再次检查
if not os.path.exists(keys_file):
print("[!] 密钥提取失败")
sys.exit(1)
with open(keys_file) as f:
keys = json.load(f)
try:
with open(keys_file) as f:
keys = json.load(f)
except (json.JSONDecodeError, ValueError):
keys = {}
if not keys:
print("[!] 未能提取到任何密钥")
print(" 可能原因:选择了错误的微信数据目录,或微信需要重启")
@ -71,7 +85,7 @@ def main():
print("[+] 微信进程运行中")
# 3. 提取密钥
ensure_keys(cfg["keys_file"])
ensure_keys(cfg["keys_file"], cfg["db_dir"])
# 4. 根据子命令执行
cmd = sys.argv[1] if len(sys.argv) > 1 else "web"