diff --git a/README.md b/README.md index 8cc5558..c7bb84e 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,10 @@ Linux: ### 安装依赖 +```bash +pip install pycryptodome +``` + ### 快速开始 Windows: diff --git a/config.py b/config.py index 4d153a9..86435a5 100644 --- a/config.py +++ b/config.py @@ -11,18 +11,24 @@ import sys CONFIG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json") _SYSTEM = platform.system().lower() -_DEFAULT_TEMPLATE_DIR = ( - os.path.expanduser("~/Documents/xwechat_files/your_wxid/db_storage") - if _SYSTEM == "linux" - else r"D:\xwechat_files\your_wxid\db_storage" -) + +if _SYSTEM == "linux": + _DEFAULT_TEMPLATE_DIR = os.path.expanduser("~/Documents/xwechat_files/your_wxid/db_storage") + _DEFAULT_PROCESS = "wechat" +elif _SYSTEM == "darwin": + # macOS 使用独立的 C 扫描器 (find_all_keys_macos.c),此处仅提供 config 默认值 + _DEFAULT_TEMPLATE_DIR = os.path.expanduser("~/Documents/xwechat_files/your_wxid/db_storage") + _DEFAULT_PROCESS = "WeChat" +else: + _DEFAULT_TEMPLATE_DIR = r"D:\xwechat_files\your_wxid\db_storage" + _DEFAULT_PROCESS = "Weixin.exe" _DEFAULT = { "db_dir": _DEFAULT_TEMPLATE_DIR, "keys_file": "all_keys.json", "decrypted_dir": "decrypted", "decoded_image_dir": "decoded_images", - "wechat_process": "wechat" if _SYSTEM == "linux" else "Weixin.exe", + "wechat_process": _DEFAULT_PROCESS, } @@ -97,16 +103,16 @@ def _auto_detect_db_dir_windows(): def _auto_detect_db_dir_linux(): - """自动检测 Linux 微信 db_storage 路径。""" + """自动检测 Linux 微信 db_storage 路径。 + + 优先搜索当前用户的 home 目录,避免以 root 运行时误检测其他用户的数据。 + """ seen = set() candidates = [] - search_roots = { + # 只搜索当前用户的 home 目录 + search_roots = [ os.path.expanduser("~/Documents/xwechat_files"), - } - - if os.path.isdir("/home"): - for entry in os.listdir("/home"): - search_roots.add(os.path.join("/home", entry, "Documents", "xwechat_files")) + ] for root in search_roots: if not os.path.isdir(root): @@ -118,13 +124,14 @@ def _auto_detect_db_dir_linux(): seen.add(normalized) candidates.append(match) + # 早期 Linux 微信版本(wine/容器方案)使用的数据路径 old_path = os.path.expanduser("~/.local/share/weixin/data/db_storage") if os.path.isdir(old_path): normalized = os.path.normcase(os.path.normpath(old_path)) if normalized not in seen: candidates.append(old_path) - # Linux 优先使用最近活跃账号:按 message 目录 mtime 降序 + # 优先使用最近活跃账号:按 message 目录 mtime 降序(近似排序,best-effort) def _mtime(path): msg_dir = os.path.join(path, "message") target = msg_dir if os.path.isdir(msg_dir) else path diff --git a/find_all_keys.py b/find_all_keys.py index c466740..eba2191 100644 --- a/find_all_keys.py +++ b/find_all_keys.py @@ -1,7 +1,9 @@ +import functools import platform import sys +@functools.lru_cache(maxsize=1) def _load_impl(): system = platform.system().lower() if system == "windows": @@ -10,7 +12,10 @@ def _load_impl(): if system == "linux": import find_all_keys_linux as impl return impl - raise RuntimeError(f"当前平台暂不支持通过 find_all_keys.py 提取密钥: {platform.system()}") + raise RuntimeError( + f"当前平台暂不支持通过 find_all_keys.py 提取密钥: {platform.system()}\n" + f"macOS 请使用 find_all_keys_macos.c (C 版扫描器)" + ) def get_pids(): diff --git a/find_all_keys_linux.py b/find_all_keys_linux.py index 51184e8..a98a970 100644 --- a/find_all_keys_linux.py +++ b/find_all_keys_linux.py @@ -9,26 +9,17 @@ WCDB 缓存的 x'<64hex_enc_key><32hex_salt>' 模式, 权限要求: root 或 CAP_SYS_PTRACE """ import functools -import hashlib -import hmac as hmac_mod -import json import os import re -import struct import sys import time +from key_scan_common import ( + collect_db_files, scan_memory_for_keys, cross_verify_keys, save_results, +) + print = functools.partial(print, flush=True) -PAGE_SZ = 4096 -KEY_SZ = 32 -SALT_SZ = 16 - -from config import load_config -_cfg = load_config() -DB_DIR = _cfg["db_dir"] -OUT_FILE = _cfg["keys_file"] - def _safe_readlink(path): try: @@ -37,6 +28,18 @@ def _safe_readlink(path): return "" +def _is_wechat_process(pid): + """检查 pid 是否为微信进程。""" + try: + with open(f"/proc/{pid}/comm") as f: + comm = f.read().strip() + exe_name = os.path.basename(_safe_readlink(f"/proc/{pid}/exe")) or comm + haystack = " ".join((comm, exe_name)).lower() + return "wechat" in haystack or "weixin" in haystack + except (PermissionError, FileNotFoundError, ProcessLookupError): + return False + + def get_pids(): """返回所有疑似微信主进程的 (pid, rss_kb) 列表,按内存降序。""" pids = [] @@ -45,15 +48,11 @@ def get_pids(): continue pid = int(pid_str) try: - with open(f"/proc/{pid}/comm") as f: - comm = f.read().strip() + if not _is_wechat_process(pid): + continue with open(f"/proc/{pid}/statm") as f: rss_pages = int(f.read().split()[1]) rss_kb = rss_pages * 4 - exe_name = os.path.basename(_safe_readlink(f"/proc/{pid}/exe")) or comm - haystack = " ".join((comm, exe_name)).lower() - if "wechat" not in haystack and "weixin" not in haystack: - continue pids.append((pid, rss_kb)) except (PermissionError, FileNotFoundError, ProcessLookupError): continue @@ -68,8 +67,16 @@ def get_pids(): return pids +_SKIP_MAPPINGS = {"[vdso]", "[vsyscall]", "[vvar]"} +_SKIP_PATH_PREFIXES = ("/usr/lib/", "/lib/", "/usr/share/") + + def _get_readable_regions(pid): - """解析 /proc//maps,返回可读内存区域列表。""" + """解析 /proc//maps,返回可读内存区域列表。 + + 跳过 [vdso]、[vsyscall] 等特殊映射和系统库映射, + 聚焦匿名映射和堆区(WCDB 密钥缓存所在位置)。 + """ regions = [] with open(f"/proc/{pid}/maps") as f: for line in f: @@ -78,6 +85,13 @@ def _get_readable_regions(pid): continue if "r" not in parts[1]: continue + # 跳过特殊映射 + if len(parts) >= 6: + mapping_name = parts[5] + if mapping_name in _SKIP_MAPPINGS: + continue + if any(mapping_name.startswith(p) for p in _SKIP_PATH_PREFIXES): + continue start_s, end_s = parts[0].split("-") start = int(start_s, 16) size = int(end_s, 16) - start @@ -86,46 +100,44 @@ def _get_readable_regions(pid): return regions -def _collect_db_files(): - db_files = [] - salt_to_dbs = {} - for root, dirs, files in os.walk(DB_DIR): - for name in files: - if not name.endswith(".db") or name.endswith("-wal") or name.endswith("-shm"): - continue - path = os.path.join(root, name) - size = os.path.getsize(path) - if size < PAGE_SZ: - continue - with open(path, "rb") as f: - page1 = f.read(PAGE_SZ) - rel = os.path.relpath(path, DB_DIR) - salt = page1[:SALT_SZ].hex() - db_files.append((rel, path, size, salt, page1)) - salt_to_dbs.setdefault(salt, []).append(rel) - return db_files, salt_to_dbs - - -def _verify_enc_key(enc_key, db_page1): - salt = db_page1[:SALT_SZ] - mac_salt = bytes(b ^ 0x3A for b in salt) - mac_key = hashlib.pbkdf2_hmac("sha512", enc_key, mac_salt, 2, dklen=KEY_SZ) - hmac_data = db_page1[SALT_SZ: PAGE_SZ - 80 + 16] - stored_hmac = db_page1[PAGE_SZ - 64: PAGE_SZ] - hm = hmac_mod.new(mac_key, hmac_data, hashlib.sha512) - hm.update(struct.pack(" 96 and hex_len % 2 == 0: - enc_key_hex = hex_str[:64] - salt_hex = hex_str[-32:] - - if salt_hex in remaining_salts: - enc_key = bytes.fromhex(enc_key_hex) - for rel, path, sz, s, page1 in db_files: - if s == salt_hex and _verify_enc_key(enc_key, page1): - key_map[salt_hex] = enc_key_hex - remaining_salts.discard(salt_hex) - dbs = salt_to_dbs[salt_hex] - print(f"\n [FOUND] salt={salt_hex} (long hex {hex_len})") - print(f" enc_key={enc_key_hex}") - print(f" PID={pid} 地址: 0x{addr:016X}") - print(f" 数据库: {', '.join(dbs)}") - break + all_hex_matches += scan_memory_for_keys( + data, hex_re, db_files, salt_to_dbs, + key_map, remaining_salts, base, pid, print, + ) if (reg_idx + 1) % 200 == 0: elapsed = time.time() - t0 @@ -246,53 +213,8 @@ def main(): elapsed = time.time() - t0 print(f"\n扫描完成: {elapsed:.1f}s, {len(pids)} 个进程, {all_hex_matches} hex 模式") - # 交叉验证:用已找到的 key 尝试未匹配的 salt - missing_salts = set(salt_to_dbs.keys()) - set(key_map.keys()) - if missing_salts and key_map: - print(f"\n还有 {len(missing_salts)} 个 salt 未匹配,尝试交叉验证...") - for salt_hex in list(missing_salts): - for rel, path, sz, s, page1 in db_files: - if s == salt_hex: - for known_salt, known_key_hex in key_map.items(): - enc_key = bytes.fromhex(known_key_hex) - if _verify_enc_key(enc_key, page1): - key_map[salt_hex] = known_key_hex - print(f" [CROSS] salt={salt_hex} 可用 key from salt={known_salt}") - missing_salts.discard(salt_hex) - break - - # 输出结果 - print(f"\n{'=' * 60}") - print(f"结果: {len(key_map)}/{len(salt_to_dbs)} salts 找到密钥") - - result = {} - for rel, path, sz, salt_hex, page1 in db_files: - if salt_hex in key_map: - result[rel] = { - "enc_key": key_map[salt_hex], - "salt": salt_hex, - "size_mb": round(sz / 1024 / 1024, 1) - } - print(f" OK: {rel} ({sz / 1024 / 1024:.1f}MB)") - else: - print(f" MISSING: {rel} (salt={salt_hex})") - - if not result: - print(f"\n[!] 未提取到任何密钥,保留已有的 {OUT_FILE}(如存在)") - raise RuntimeError("未能从任何微信进程中提取到密钥") - - result["_db_dir"] = DB_DIR - result["_platform"] = "linux" - result["_key_source"] = "memory_scan" - with open(OUT_FILE, 'w', encoding='utf-8') as f: - json.dump(result, f, indent=2, ensure_ascii=False) - print(f"\n密钥保存到: {OUT_FILE}") - - missing = [rel for rel, path, sz, salt_hex, page1 in db_files if salt_hex not in key_map] - if missing: - print(f"\n未找到密钥的数据库:") - for rel in missing: - print(f" {rel}") + cross_verify_keys(db_files, salt_to_dbs, key_map, print) + save_results(db_files, salt_to_dbs, key_map, db_dir, out_file, print) if __name__ == "__main__": diff --git a/find_all_keys_windows.py b/find_all_keys_windows.py index 85068f6..14ecdfd 100644 --- a/find_all_keys_windows.py +++ b/find_all_keys_windows.py @@ -6,24 +6,19 @@ salt嵌在hex字符串中,可以直接匹配DB文件的salt """ import ctypes import ctypes.wintypes as wt -import struct, os, sys, hashlib, time, re, json -import hmac as hmac_mod -from Crypto.Cipher import AES +import os, sys, time, re import functools print = functools.partial(print, flush=True) +from key_scan_common import ( + collect_db_files, scan_memory_for_keys, cross_verify_keys, save_results, + PAGE_SZ, KEY_SZ, SALT_SZ, +) + kernel32 = ctypes.windll.kernel32 MEM_COMMIT = 0x1000 READABLE = {0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80} -PAGE_SZ = 4096 -KEY_SZ = 32 -SALT_SZ = 16 - -from config import load_config -_cfg = load_config() -DB_DIR = _cfg["db_dir"] -OUT_FILE = _cfg["keys_file"] class MBI(ctypes.Structure): @@ -81,42 +76,18 @@ def enum_regions(h): return regs -def verify_key_for_db(enc_key, db_page1): - """验证enc_key是否能解密这个DB的page 1""" - salt = db_page1[:SALT_SZ] - - # HMAC验证 (最可靠) - mac_salt = bytes(b ^ 0x3a for b in salt) - mac_key = hashlib.pbkdf2_hmac("sha512", enc_key, mac_salt, 2, dklen=KEY_SZ) - hmac_data = db_page1[SALT_SZ: PAGE_SZ - 80 + 16] - stored_hmac = db_page1[PAGE_SZ - 64: PAGE_SZ] - h = hmac_mod.new(mac_key, hmac_data, hashlib.sha512) - h.update(struct.pack(' 96 and hex_len % 2 == 0: - enc_key_hex = hex_str[:64] - salt_hex = hex_str[-32:] - - if salt_hex in remaining_salts: - enc_key = bytes.fromhex(enc_key_hex) - for rel, path, sz, s, page1 in db_files: - if s == salt_hex and verify_key_for_db(enc_key, page1): - key_map[salt_hex] = enc_key_hex - remaining_salts.discard(salt_hex) - dbs = salt_to_dbs[salt_hex] - print(f"\n [FOUND] salt={salt_hex} (long hex {hex_len})") - print(f" enc_key={enc_key_hex}") - print(f" PID={pid} 地址: 0x{addr:016X}") - print(f" 数据库: {', '.join(dbs)}") - break + all_hex_matches += scan_memory_for_keys( + data, hex_re, db_files, salt_to_dbs, + key_map, remaining_salts, base, pid, print, + ) if (reg_idx + 1) % 200 == 0: elapsed = time.time() - t0 @@ -223,49 +143,8 @@ def main(): elapsed = time.time() - t0 print(f"\n扫描完成: {elapsed:.1f}s, {len(pids)} 个进程, {all_hex_matches} hex模式") - missing_salts = set(salt_to_dbs.keys()) - set(key_map.keys()) - if missing_salts and key_map: - print(f"\n还有 {len(missing_salts)} 个salt未匹配,尝试交叉验证...") - for salt_hex in list(missing_salts): - for rel, path, sz, s, page1 in db_files: - if s == salt_hex: - for known_salt, known_key_hex in key_map.items(): - enc_key = bytes.fromhex(known_key_hex) - if verify_key_for_db(enc_key, page1): - key_map[salt_hex] = known_key_hex - print(f" [CROSS] salt={salt_hex} 可用 key from salt={known_salt}") - missing_salts.discard(salt_hex) - break - - print(f"\n{'=' * 60}") - print(f"结果: {len(key_map)}/{len(salt_to_dbs)} salts 找到密钥") - - result = {} - for rel, path, sz, salt_hex, page1 in db_files: - if salt_hex in key_map: - result[rel] = { - "enc_key": key_map[salt_hex], - "salt": salt_hex, - "size_mb": round(sz / 1024 / 1024, 1) - } - print(f" OK: {rel} ({sz / 1024 / 1024:.1f}MB)") - else: - print(f" MISSING: {rel} (salt={salt_hex})") - - if not result: - print(f"\n[!] 未提取到任何密钥,保留已有的 {OUT_FILE}(如存在)") - raise RuntimeError("未能从任何微信进程中提取到密钥") - - result["_db_dir"] = DB_DIR - with open(OUT_FILE, 'w') as f: - json.dump(result, f, indent=2) - print(f"\n密钥保存到: {OUT_FILE}") - - missing = [rel for rel, path, sz, salt_hex, page1 in db_files if salt_hex not in key_map] - if missing: - print(f"\n未找到密钥的数据库:") - for rel in missing: - print(f" {rel}") + cross_verify_keys(db_files, salt_to_dbs, key_map, print) + save_results(db_files, salt_to_dbs, key_map, db_dir, out_file, print) if __name__ == '__main__': diff --git a/key_scan_common.py b/key_scan_common.py new file mode 100644 index 0000000..7975328 --- /dev/null +++ b/key_scan_common.py @@ -0,0 +1,169 @@ +""" +跨平台共享的内存扫描逻辑:HMAC 验证、DB 收集、hex 模式匹配与结果输出。 + +Windows / Linux 版分别实现进程发现和内存读取,共用此模块的核心算法。 +""" +import hashlib +import hmac as hmac_mod +import json +import os +import re +import struct +import time + +PAGE_SZ = 4096 +KEY_SZ = 32 +SALT_SZ = 16 + + +def verify_enc_key(enc_key, db_page1): + """通过 HMAC-SHA512 校验 page 1 验证 enc_key 是否正确。""" + salt = db_page1[:SALT_SZ] + mac_salt = bytes(b ^ 0x3A for b in salt) + mac_key = hashlib.pbkdf2_hmac("sha512", enc_key, mac_salt, 2, dklen=KEY_SZ) + hmac_data = db_page1[SALT_SZ: PAGE_SZ - 80 + 16] + stored_hmac = db_page1[PAGE_SZ - 64: PAGE_SZ] + hm = hmac_mod.new(mac_key, hmac_data, hashlib.sha512) + hm.update(struct.pack(" 96 and hex_len % 2 == 0: + enc_key_hex = hex_str[:64] + salt_hex = hex_str[-32:] + if salt_hex in remaining_salts: + enc_key = bytes.fromhex(enc_key_hex) + for rel, path, sz, s, page1 in db_files: + if s == salt_hex and verify_enc_key(enc_key, page1): + key_map[salt_hex] = enc_key_hex + remaining_salts.discard(salt_hex) + dbs = salt_to_dbs[salt_hex] + print_fn(f"\n [FOUND] salt={salt_hex} (long hex {hex_len})") + print_fn(f" enc_key={enc_key_hex}") + print_fn(f" PID={pid} 地址: 0x{addr:016X}") + print_fn(f" 数据库: {', '.join(dbs)}") + break + + return matches + + +def cross_verify_keys(db_files, salt_to_dbs, key_map, print_fn): + """用已找到的 key 交叉验证未匹配的 salt。""" + missing_salts = set(salt_to_dbs.keys()) - set(key_map.keys()) + if not missing_salts or not key_map: + return + print_fn(f"\n还有 {len(missing_salts)} 个 salt 未匹配,尝试交叉验证...") + for salt_hex in list(missing_salts): + for rel, path, sz, s, page1 in db_files: + if s == salt_hex: + for known_salt, known_key_hex in key_map.items(): + enc_key = bytes.fromhex(known_key_hex) + if verify_enc_key(enc_key, page1): + key_map[salt_hex] = known_key_hex + print_fn(f" [CROSS] salt={salt_hex} 可用 key from salt={known_salt}") + missing_salts.discard(salt_hex) + break + + +def save_results(db_files, salt_to_dbs, key_map, db_dir, out_file, print_fn): + """输出扫描结果并保存 JSON。""" + print_fn(f"\n{'=' * 60}") + print_fn(f"结果: {len(key_map)}/{len(salt_to_dbs)} salts 找到密钥") + + result = {} + for rel, path, sz, salt_hex, page1 in db_files: + if salt_hex in key_map: + result[rel] = { + "enc_key": key_map[salt_hex], + "salt": salt_hex, + "size_mb": round(sz / 1024 / 1024, 1) + } + print_fn(f" OK: {rel} ({sz / 1024 / 1024:.1f}MB)") + else: + print_fn(f" MISSING: {rel} (salt={salt_hex})") + + if not result: + print_fn(f"\n[!] 未提取到任何密钥,保留已有的 {out_file}(如存在)") + raise RuntimeError("未能从任何微信进程中提取到密钥") + + result["_db_dir"] = db_dir + with open(out_file, 'w', encoding='utf-8') as f: + json.dump(result, f, indent=2, ensure_ascii=False) + print_fn(f"\n密钥保存到: {out_file}") + + missing = [rel for rel, path, sz, salt_hex, page1 in db_files if salt_hex not in key_map] + if missing: + print_fn(f"\n未找到密钥的数据库:") + for rel in missing: + print_fn(f" {rel}") diff --git a/key_utils.py b/key_utils.py index 0ba4c20..f189c41 100644 --- a/key_utils.py +++ b/key_utils.py @@ -1,11 +1,18 @@ import os +import posixpath def strip_key_metadata(keys): - """移除 all_keys.json 中以下划线开头的元数据字段。""" + """移除 all_keys.json 中以下划线开头的元数据字段,返回新 dict。""" return {k: v for k, v in keys.items() if not k.startswith("_")} +def _is_safe_rel_path(path): + """检查路径不包含 .. 等遍历组件。""" + normalized = path.replace("\\", "/") + return ".." not in posixpath.normpath(normalized).split("/") + + def key_path_variants(rel_path): """生成同一路径的多种分隔符表示,兼容 Windows/Linux JSON key。""" normalized = rel_path.replace("\\", "/") @@ -23,6 +30,8 @@ def key_path_variants(rel_path): def get_key_info(keys, rel_path): """按相对路径查找数据库密钥,自动兼容不同平台分隔符。""" + if not _is_safe_rel_path(rel_path): + return None for candidate in key_path_variants(rel_path): if candidate in keys and not candidate.startswith("_"): return keys[candidate] diff --git a/mcp_server.py b/mcp_server.py index 71db380..c6b4df0 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -250,7 +250,7 @@ def get_contact_names(): pass # 实时解密 - path = _cache.get("contact\\contact.db") + path = _cache.get(os.path.join("contact", "contact.db")) if path: try: _contact_names, _contact_full = _load_contacts_from(path) @@ -331,7 +331,8 @@ def _parse_message_content(content, local_type, is_group): return sender, text -# 消息 DB 的 rel_keys(排除 fts/resource/media/biz) +# 消息 DB 的 rel_keys +# 用 message_\d+\.db$ 匹配,自然排除 message_resource.db / message_fts_*.db MSG_DB_KEYS = sorted([ k for k in ALL_KEYS if any(v.startswith("message/") for v in key_path_variants(k)) @@ -381,7 +382,7 @@ def get_recent_sessions(limit: int = 20) -> str: Args: limit: 返回的会话数量,默认20 """ - path = _cache.get("session\\session.db") + path = _cache.get(os.path.join("session", "session.db")) if not path: return "错误: 无法解密 session.db" @@ -637,7 +638,7 @@ def get_new_messages() -> str: """获取自上次调用以来的新消息。首次调用返回最近的会话状态。""" global _last_check_state - path = _cache.get("session\\session.db") + path = _cache.get(os.path.join("session", "session.db")) if not path: return "错误: 无法解密 session.db" diff --git a/monitor_web.py b/monitor_web.py index 5c5b2bb..78e2957 100644 --- a/monitor_web.py +++ b/monitor_web.py @@ -13,11 +13,11 @@ from datetime import datetime from http.server import HTTPServer, BaseHTTPRequestHandler from socketserver import ThreadingMixIn from Crypto.Cipher import AES -import urllib.parse -import glob as glob_mod -import zstandard as zstd -from decode_image import extract_md5_from_packed_info, decrypt_dat_file, is_v2_format -from key_utils import get_key_info, strip_key_metadata +import urllib.parse +import glob as glob_mod +import zstandard as zstd +from decode_image import extract_md5_from_packed_info, decrypt_dat_file, is_v2_format +from key_utils import get_key_info, strip_key_metadata _zstd_dctx = zstd.ZstdDecompressor() @@ -59,14 +59,14 @@ _emoji_lookup_lock = threading.Lock() _emoji_keys_dict = None # 保存 keys 引用供刷新用 _emoji_last_refresh = 0 -def _build_emoji_lookup(keys_dict): - """从 emoticon.db 构建 emoji md5 → URL 映射(直接解密,不走 cache)""" - global _emoji_lookup, _emoji_keys_dict, _emoji_last_refresh - _emoji_keys_dict = keys_dict - key_info = get_key_info(keys_dict, os.path.join("emoticon", "emoticon.db")) - if not key_info: - print("[emoji] 无 emoticon.db key,跳过", flush=True) - return +def _build_emoji_lookup(keys_dict): + """从 emoticon.db 构建 emoji md5 → URL 映射(直接解密,不走 cache)""" + global _emoji_lookup, _emoji_keys_dict, _emoji_last_refresh + _emoji_keys_dict = keys_dict + key_info = get_key_info(keys_dict, os.path.join("emoticon", "emoticon.db")) + if not key_info: + print("[emoji] 无 emoticon.db key,跳过", flush=True) + return src = os.path.join(DB_DIR, "emoticon", "emoticon.db") if not os.path.exists(src): @@ -253,18 +253,18 @@ class MonitorDBCache: with lock: self._state.pop(rel_key, None) - def get(self, rel_key): - """返回解密后的临时文件路径,mtime 变化时自动重新解密""" - key_info = get_key_info(self.keys, rel_key) - if not key_info: - return None - - lock = self._get_lock(rel_key) - with lock: - enc_key = bytes.fromhex(key_info["enc_key"]) - rel_path = rel_key.replace('\\', '/').replace('/', os.sep) - db_path = os.path.join(DB_DIR, rel_path) - wal_path = db_path + "-wal" + def get(self, rel_key): + """返回解密后的临时文件路径,mtime 变化时自动重新解密""" + key_info = get_key_info(self.keys, rel_key) + if not key_info: + return None + + lock = self._get_lock(rel_key) + with lock: + enc_key = bytes.fromhex(key_info["enc_key"]) + rel_path = rel_key.replace('\\', '/').replace('/', os.sep) + db_path = os.path.join(DB_DIR, rel_path) + wal_path = db_path + "-wal" if not os.path.exists(db_path): return None @@ -275,8 +275,8 @@ class MonitorDBCache: except OSError: return None - out_name = rel_key.replace('\\', '_').replace('/', '_') - out_path = os.path.join(self.tmp_dir, out_name) + out_name = rel_key.replace('\\', '_').replace('/', '_') + out_path = os.path.join(self.tmp_dir, out_name) prev = self._state.get(rel_key) @@ -315,7 +315,7 @@ def build_username_db_map(): # 先获取每个 DB 的 mtime 用于排序 db_mtimes = {} for i in range(5): - rel_key = f"message\\message_{i}.db" + rel_key = os.path.join("message", f"message_{i}.db") db_path = os.path.join(DB_DIR, "message", f"message_{i}.db") try: db_mtimes[rel_key] = os.path.getmtime(db_path) @@ -328,7 +328,7 @@ def build_username_db_map(): db_path = os.path.join(decrypted_msg_dir, f"message_{i}.db") if not os.path.exists(db_path): continue - rel_key = f"message\\message_{i}.db" + rel_key = os.path.join("message", f"message_{i}.db") try: conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) for row in conn.execute("SELECT user_name FROM Name2Id").fetchall(): @@ -597,7 +597,7 @@ class SessionMonitor: # local_id 不全局唯一,需要同时匹配 create_time file_md5 = None for _try in range(2): - res_path = self.db_cache.get("message\\message_resource.db") + res_path = self.db_cache.get(os.path.join("message", "message_resource.db")) if not res_path: return None try: @@ -622,7 +622,7 @@ class SessionMonitor: except Exception as e: if 'malformed' in str(e) and _try == 0: print(f" [img] resource DB malformed, 强制刷新...", flush=True) - self.db_cache.invalidate("message\\message_resource.db") + self.db_cache.invalidate(os.path.join("message", "message_resource.db")) continue print(f" [img] 查询 message_resource 失败: {e}", flush=True) return None @@ -753,14 +753,14 @@ class SessionMonitor: if attempt < 2: time.sleep(delays[attempt]) - def _fresh_decrypt_query(self, db_key, table_name, prev_ts, curr_ts): - """独立解密 message DB 到临时文件并查询,避免共享缓存竞态""" - key_info = get_key_info(self.db_cache.keys, db_key) - if not key_info: - return [] - enc_key = bytes.fromhex(key_info["enc_key"]) - rel_path = db_key.replace('\\', '/').replace('/', os.sep) - db_path = os.path.join(DB_DIR, rel_path) + def _fresh_decrypt_query(self, db_key, table_name, prev_ts, curr_ts): + """独立解密 message DB 到临时文件并查询,避免共享缓存竞态""" + key_info = get_key_info(self.db_cache.keys, db_key) + if not key_info: + return [] + enc_key = bytes.fromhex(key_info["enc_key"]) + rel_path = db_key.replace('\\', '/').replace('/', os.sep) + db_path = os.path.join(DB_DIR, rel_path) wal_path = db_path + "-wal" if not os.path.exists(db_path): return [] @@ -1875,17 +1875,17 @@ class ThreadedServer(ThreadingMixIn, HTTPServer): def main(): print("=" * 60, flush=True) print(" 微信实时监听 (WAL增量 + SSE推送)", flush=True) - print("=" * 60, flush=True) - - with open(KEYS_FILE) as f: - keys = strip_key_metadata(json.load(f)) - - session_key_info = get_key_info(keys, os.path.join("session", "session.db")) - if not session_key_info: - print("[ERROR] 找不到 session.db 的密钥", flush=True) - sys.exit(1) - enc_key = bytes.fromhex(session_key_info["enc_key"]) - session_db = os.path.join(DB_DIR, "session", "session.db") + print("=" * 60, flush=True) + + with open(KEYS_FILE) as f: + keys = strip_key_metadata(json.load(f)) + + session_key_info = get_key_info(keys, os.path.join("session", "session.db")) + if not session_key_info: + print("[ERROR] 找不到 session.db 的密钥", flush=True) + sys.exit(1) + enc_key = bytes.fromhex(session_key_info["enc_key"]) + session_db = os.path.join(DB_DIR, "session", "session.db") print("加载联系人...", flush=True) contact_names = load_contact_names() @@ -1916,12 +1916,12 @@ def main(): # 后台预热所有 message DB(图片/emoji 解密必需) def _warmup(): try: - t0 = time.perf_counter() - warmup_keys = ["message\\message_resource.db"] - for i in range(5): - k = f"message\\message_{i}.db" - if get_key_info(keys, k): - warmup_keys.append(k) + t0 = time.perf_counter() + warmup_keys = [os.path.join("message", "message_resource.db")] + for i in range(5): + k = os.path.join("message", f"message_{i}.db") + if get_key_info(keys, k): + warmup_keys.append(k) for k in warmup_keys: t1 = time.perf_counter() try: