refactor(find_all_keys): extract shared key scan logic

feat/daemon-cli
PeanutSplash 2026-03-06 16:43:50 +08:00 committed by ylytdeng
parent 872e3f58dc
commit 6d9b2c0fe4
9 changed files with 365 additions and 369 deletions

View File

@ -45,6 +45,10 @@ Linux
### 安装依赖
```bash
pip install pycryptodome
```
### 快速开始
Windows

View File

@ -11,18 +11,24 @@ import sys
CONFIG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json")
_SYSTEM = platform.system().lower()
_DEFAULT_TEMPLATE_DIR = (
os.path.expanduser("~/Documents/xwechat_files/your_wxid/db_storage")
if _SYSTEM == "linux"
else r"D:\xwechat_files\your_wxid\db_storage"
)
if _SYSTEM == "linux":
_DEFAULT_TEMPLATE_DIR = os.path.expanduser("~/Documents/xwechat_files/your_wxid/db_storage")
_DEFAULT_PROCESS = "wechat"
elif _SYSTEM == "darwin":
# macOS 使用独立的 C 扫描器 (find_all_keys_macos.c),此处仅提供 config 默认值
_DEFAULT_TEMPLATE_DIR = os.path.expanduser("~/Documents/xwechat_files/your_wxid/db_storage")
_DEFAULT_PROCESS = "WeChat"
else:
_DEFAULT_TEMPLATE_DIR = r"D:\xwechat_files\your_wxid\db_storage"
_DEFAULT_PROCESS = "Weixin.exe"
_DEFAULT = {
"db_dir": _DEFAULT_TEMPLATE_DIR,
"keys_file": "all_keys.json",
"decrypted_dir": "decrypted",
"decoded_image_dir": "decoded_images",
"wechat_process": "wechat" if _SYSTEM == "linux" else "Weixin.exe",
"wechat_process": _DEFAULT_PROCESS,
}
@ -97,16 +103,16 @@ def _auto_detect_db_dir_windows():
def _auto_detect_db_dir_linux():
"""自动检测 Linux 微信 db_storage 路径。"""
"""自动检测 Linux 微信 db_storage 路径。
优先搜索当前用户的 home 目录避免以 root 运行时误检测其他用户的数据
"""
seen = set()
candidates = []
search_roots = {
# 只搜索当前用户的 home 目录
search_roots = [
os.path.expanduser("~/Documents/xwechat_files"),
}
if os.path.isdir("/home"):
for entry in os.listdir("/home"):
search_roots.add(os.path.join("/home", entry, "Documents", "xwechat_files"))
]
for root in search_roots:
if not os.path.isdir(root):
@ -118,13 +124,14 @@ def _auto_detect_db_dir_linux():
seen.add(normalized)
candidates.append(match)
# 早期 Linux 微信版本wine/容器方案)使用的数据路径
old_path = os.path.expanduser("~/.local/share/weixin/data/db_storage")
if os.path.isdir(old_path):
normalized = os.path.normcase(os.path.normpath(old_path))
if normalized not in seen:
candidates.append(old_path)
# Linux 优先使用最近活跃账号:按 message 目录 mtime 降序
# 优先使用最近活跃账号:按 message 目录 mtime 降序近似排序best-effort
def _mtime(path):
msg_dir = os.path.join(path, "message")
target = msg_dir if os.path.isdir(msg_dir) else path

View File

@ -1,7 +1,9 @@
import functools
import platform
import sys
@functools.lru_cache(maxsize=1)
def _load_impl():
system = platform.system().lower()
if system == "windows":
@ -10,7 +12,10 @@ def _load_impl():
if system == "linux":
import find_all_keys_linux as impl
return impl
raise RuntimeError(f"当前平台暂不支持通过 find_all_keys.py 提取密钥: {platform.system()}")
raise RuntimeError(
f"当前平台暂不支持通过 find_all_keys.py 提取密钥: {platform.system()}\n"
f"macOS 请使用 find_all_keys_macos.c (C 版扫描器)"
)
def get_pids():

View File

@ -9,26 +9,17 @@ WCDB 缓存的 x'<64hex_enc_key><32hex_salt>' 模式,
权限要求: root CAP_SYS_PTRACE
"""
import functools
import hashlib
import hmac as hmac_mod
import json
import os
import re
import struct
import sys
import time
from key_scan_common import (
collect_db_files, scan_memory_for_keys, cross_verify_keys, save_results,
)
print = functools.partial(print, flush=True)
PAGE_SZ = 4096
KEY_SZ = 32
SALT_SZ = 16
from config import load_config
_cfg = load_config()
DB_DIR = _cfg["db_dir"]
OUT_FILE = _cfg["keys_file"]
def _safe_readlink(path):
try:
@ -37,6 +28,18 @@ def _safe_readlink(path):
return ""
def _is_wechat_process(pid):
"""检查 pid 是否为微信进程。"""
try:
with open(f"/proc/{pid}/comm") as f:
comm = f.read().strip()
exe_name = os.path.basename(_safe_readlink(f"/proc/{pid}/exe")) or comm
haystack = " ".join((comm, exe_name)).lower()
return "wechat" in haystack or "weixin" in haystack
except (PermissionError, FileNotFoundError, ProcessLookupError):
return False
def get_pids():
"""返回所有疑似微信主进程的 (pid, rss_kb) 列表,按内存降序。"""
pids = []
@ -45,15 +48,11 @@ def get_pids():
continue
pid = int(pid_str)
try:
with open(f"/proc/{pid}/comm") as f:
comm = f.read().strip()
if not _is_wechat_process(pid):
continue
with open(f"/proc/{pid}/statm") as f:
rss_pages = int(f.read().split()[1])
rss_kb = rss_pages * 4
exe_name = os.path.basename(_safe_readlink(f"/proc/{pid}/exe")) or comm
haystack = " ".join((comm, exe_name)).lower()
if "wechat" not in haystack and "weixin" not in haystack:
continue
pids.append((pid, rss_kb))
except (PermissionError, FileNotFoundError, ProcessLookupError):
continue
@ -68,8 +67,16 @@ def get_pids():
return pids
_SKIP_MAPPINGS = {"[vdso]", "[vsyscall]", "[vvar]"}
_SKIP_PATH_PREFIXES = ("/usr/lib/", "/lib/", "/usr/share/")
def _get_readable_regions(pid):
"""解析 /proc/<pid>/maps返回可读内存区域列表。"""
"""解析 /proc/<pid>/maps返回可读内存区域列表。
跳过 [vdso][vsyscall] 等特殊映射和系统库映射
聚焦匿名映射和堆区WCDB 密钥缓存所在位置
"""
regions = []
with open(f"/proc/{pid}/maps") as f:
for line in f:
@ -78,6 +85,13 @@ def _get_readable_regions(pid):
continue
if "r" not in parts[1]:
continue
# 跳过特殊映射
if len(parts) >= 6:
mapping_name = parts[5]
if mapping_name in _SKIP_MAPPINGS:
continue
if any(mapping_name.startswith(p) for p in _SKIP_PATH_PREFIXES):
continue
start_s, end_s = parts[0].split("-")
start = int(start_s, 16)
size = int(end_s, 16) - start
@ -86,46 +100,44 @@ def _get_readable_regions(pid):
return regions
def _collect_db_files():
db_files = []
salt_to_dbs = {}
for root, dirs, files in os.walk(DB_DIR):
for name in files:
if not name.endswith(".db") or name.endswith("-wal") or name.endswith("-shm"):
continue
path = os.path.join(root, name)
size = os.path.getsize(path)
if size < PAGE_SZ:
continue
with open(path, "rb") as f:
page1 = f.read(PAGE_SZ)
rel = os.path.relpath(path, DB_DIR)
salt = page1[:SALT_SZ].hex()
db_files.append((rel, path, size, salt, page1))
salt_to_dbs.setdefault(salt, []).append(rel)
return db_files, salt_to_dbs
def _verify_enc_key(enc_key, db_page1):
salt = db_page1[:SALT_SZ]
mac_salt = bytes(b ^ 0x3A for b in salt)
mac_key = hashlib.pbkdf2_hmac("sha512", enc_key, mac_salt, 2, dklen=KEY_SZ)
hmac_data = db_page1[SALT_SZ: PAGE_SZ - 80 + 16]
stored_hmac = db_page1[PAGE_SZ - 64: PAGE_SZ]
hm = hmac_mod.new(mac_key, hmac_data, hashlib.sha512)
hm.update(struct.pack("<I", 1))
return hm.digest() == stored_hmac
def _check_permissions():
"""检查是否有读取进程内存的权限root 或 CAP_SYS_PTRACE"""
if os.geteuid() == 0:
return
# 检查 CAP_SYS_PTRACE: 读取 /proc/self/status 中的 CapEff
try:
with open("/proc/self/status") as f:
for line in f:
if line.startswith("CapEff:"):
cap_eff = int(line.split(":")[1].strip(), 16)
CAP_SYS_PTRACE = 1 << 19
if cap_eff & CAP_SYS_PTRACE:
return
break
except (OSError, ValueError):
pass
print("[!] 需要 root 权限或 CAP_SYS_PTRACE 才能读取进程内存")
print(" 请使用: sudo python3 find_all_keys.py")
print(" 或授予 capability: sudo setcap cap_sys_ptrace=ep $(which python3)")
sys.exit(1)
def main():
from config import load_config
_cfg = load_config()
db_dir = _cfg["db_dir"]
out_file = _cfg["keys_file"]
_check_permissions()
print("=" * 60)
print(" 提取 Linux 微信数据库密钥(内存扫描)")
print("=" * 60)
# 1. 收集 DB 文件和 salt
db_files, salt_to_dbs = _collect_db_files()
db_files, salt_to_dbs = collect_db_files(db_dir)
if not db_files:
raise RuntimeError(f"{DB_DIR} 未找到可解密的 .db 文件")
raise RuntimeError(f"{db_dir} 未找到可解密的 .db 文件")
print(f"\n找到 {len(db_files)} 个数据库, {len(salt_to_dbs)} 个不同的 salt")
for salt_hex, dbs in sorted(salt_to_dbs.items(), key=lambda x: len(x[1]), reverse=True):
@ -164,6 +176,12 @@ def main():
print(f"[WARN] PID {pid} 已退出,跳过")
continue
# 防御 TOCTOU: 打开 mem 后再次确认仍为微信进程
if not _is_wechat_process(pid):
print(f"[WARN] PID {pid} 已不是微信进程,跳过")
mem.close()
continue
try:
for reg_idx, (base, size) in enumerate(regions):
try:
@ -173,61 +191,10 @@ def main():
continue
scanned_bytes += len(data)
for m in hex_re.finditer(data):
hex_str = m.group(1).decode()
addr = base + m.start()
all_hex_matches += 1
hex_len = len(hex_str)
if hex_len == 96:
enc_key_hex = hex_str[:64]
salt_hex = hex_str[64:]
if salt_hex in remaining_salts:
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, s, page1 in db_files:
if s == salt_hex and _verify_enc_key(enc_key, page1):
key_map[salt_hex] = enc_key_hex
remaining_salts.discard(salt_hex)
dbs = salt_to_dbs[salt_hex]
print(f"\n [FOUND] salt={salt_hex}")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
elif hex_len == 64:
if not remaining_salts:
continue
enc_key_hex = hex_str
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, salt_hex_db, page1 in db_files:
if salt_hex_db in remaining_salts and _verify_enc_key(enc_key, page1):
key_map[salt_hex_db] = enc_key_hex
remaining_salts.discard(salt_hex_db)
dbs = salt_to_dbs[salt_hex_db]
print(f"\n [FOUND] salt={salt_hex_db}")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
elif hex_len > 96 and hex_len % 2 == 0:
enc_key_hex = hex_str[:64]
salt_hex = hex_str[-32:]
if salt_hex in remaining_salts:
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, s, page1 in db_files:
if s == salt_hex and _verify_enc_key(enc_key, page1):
key_map[salt_hex] = enc_key_hex
remaining_salts.discard(salt_hex)
dbs = salt_to_dbs[salt_hex]
print(f"\n [FOUND] salt={salt_hex} (long hex {hex_len})")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
all_hex_matches += scan_memory_for_keys(
data, hex_re, db_files, salt_to_dbs,
key_map, remaining_salts, base, pid, print,
)
if (reg_idx + 1) % 200 == 0:
elapsed = time.time() - t0
@ -246,53 +213,8 @@ def main():
elapsed = time.time() - t0
print(f"\n扫描完成: {elapsed:.1f}s, {len(pids)} 个进程, {all_hex_matches} hex 模式")
# 交叉验证:用已找到的 key 尝试未匹配的 salt
missing_salts = set(salt_to_dbs.keys()) - set(key_map.keys())
if missing_salts and key_map:
print(f"\n还有 {len(missing_salts)} 个 salt 未匹配,尝试交叉验证...")
for salt_hex in list(missing_salts):
for rel, path, sz, s, page1 in db_files:
if s == salt_hex:
for known_salt, known_key_hex in key_map.items():
enc_key = bytes.fromhex(known_key_hex)
if _verify_enc_key(enc_key, page1):
key_map[salt_hex] = known_key_hex
print(f" [CROSS] salt={salt_hex} 可用 key from salt={known_salt}")
missing_salts.discard(salt_hex)
break
# 输出结果
print(f"\n{'=' * 60}")
print(f"结果: {len(key_map)}/{len(salt_to_dbs)} salts 找到密钥")
result = {}
for rel, path, sz, salt_hex, page1 in db_files:
if salt_hex in key_map:
result[rel] = {
"enc_key": key_map[salt_hex],
"salt": salt_hex,
"size_mb": round(sz / 1024 / 1024, 1)
}
print(f" OK: {rel} ({sz / 1024 / 1024:.1f}MB)")
else:
print(f" MISSING: {rel} (salt={salt_hex})")
if not result:
print(f"\n[!] 未提取到任何密钥,保留已有的 {OUT_FILE}(如存在)")
raise RuntimeError("未能从任何微信进程中提取到密钥")
result["_db_dir"] = DB_DIR
result["_platform"] = "linux"
result["_key_source"] = "memory_scan"
with open(OUT_FILE, 'w', encoding='utf-8') as f:
json.dump(result, f, indent=2, ensure_ascii=False)
print(f"\n密钥保存到: {OUT_FILE}")
missing = [rel for rel, path, sz, salt_hex, page1 in db_files if salt_hex not in key_map]
if missing:
print(f"\n未找到密钥的数据库:")
for rel in missing:
print(f" {rel}")
cross_verify_keys(db_files, salt_to_dbs, key_map, print)
save_results(db_files, salt_to_dbs, key_map, db_dir, out_file, print)
if __name__ == "__main__":

View File

@ -6,24 +6,19 @@ salt嵌在hex字符串中可以直接匹配DB文件的salt
"""
import ctypes
import ctypes.wintypes as wt
import struct, os, sys, hashlib, time, re, json
import hmac as hmac_mod
from Crypto.Cipher import AES
import os, sys, time, re
import functools
print = functools.partial(print, flush=True)
from key_scan_common import (
collect_db_files, scan_memory_for_keys, cross_verify_keys, save_results,
PAGE_SZ, KEY_SZ, SALT_SZ,
)
kernel32 = ctypes.windll.kernel32
MEM_COMMIT = 0x1000
READABLE = {0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80}
PAGE_SZ = 4096
KEY_SZ = 32
SALT_SZ = 16
from config import load_config
_cfg = load_config()
DB_DIR = _cfg["db_dir"]
OUT_FILE = _cfg["keys_file"]
class MBI(ctypes.Structure):
@ -81,42 +76,18 @@ def enum_regions(h):
return regs
def verify_key_for_db(enc_key, db_page1):
"""验证enc_key是否能解密这个DB的page 1"""
salt = db_page1[:SALT_SZ]
# HMAC验证 (最可靠)
mac_salt = bytes(b ^ 0x3a for b in salt)
mac_key = hashlib.pbkdf2_hmac("sha512", enc_key, mac_salt, 2, dklen=KEY_SZ)
hmac_data = db_page1[SALT_SZ: PAGE_SZ - 80 + 16]
stored_hmac = db_page1[PAGE_SZ - 64: PAGE_SZ]
h = hmac_mod.new(mac_key, hmac_data, hashlib.sha512)
h.update(struct.pack('<I', 1))
return h.digest() == stored_hmac
def main():
from config import load_config
_cfg = load_config()
db_dir = _cfg["db_dir"]
out_file = _cfg["keys_file"]
print("=" * 60)
print(" 提取所有微信数据库密钥")
print("=" * 60)
# 1. 收集所有DB文件及其salt
db_files = []
salt_to_dbs = {}
for root, dirs, files in os.walk(DB_DIR):
for f in files:
if f.endswith('.db') and not f.endswith('-wal') and not f.endswith('-shm'):
path = os.path.join(root, f)
rel = os.path.relpath(path, DB_DIR)
sz = os.path.getsize(path)
if sz < PAGE_SZ:
continue
with open(path, 'rb') as fh:
page1 = fh.read(PAGE_SZ)
salt = page1[:SALT_SZ].hex()
db_files.append((rel, path, sz, salt, page1))
salt_to_dbs.setdefault(salt, []).append(rel)
db_files, salt_to_dbs = collect_db_files(db_dir)
print(f"\n找到 {len(db_files)} 个数据库, {len(salt_to_dbs)} 个不同的salt")
for salt_hex, dbs in sorted(salt_to_dbs.items(), key=lambda x: len(x[1]), reverse=True):
@ -131,7 +102,7 @@ def main():
all_hex_matches = 0
t0 = time.time()
for pid, mem in pids:
for pid, mem_kb in pids:
h = kernel32.OpenProcess(0x0010 | 0x0400, False, pid)
if not h:
print(f"[WARN] 无法打开进程 PID={pid},跳过")
@ -150,61 +121,10 @@ def main():
if not data:
continue
for m in hex_re.finditer(data):
hex_str = m.group(1).decode()
addr = base + m.start()
all_hex_matches += 1
hex_len = len(hex_str)
if hex_len == 96:
enc_key_hex = hex_str[:64]
salt_hex = hex_str[64:]
if salt_hex in remaining_salts:
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, s, page1 in db_files:
if s == salt_hex and verify_key_for_db(enc_key, page1):
key_map[salt_hex] = enc_key_hex
remaining_salts.discard(salt_hex)
dbs = salt_to_dbs[salt_hex]
print(f"\n [FOUND] salt={salt_hex}")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
elif hex_len == 64:
if not remaining_salts:
continue
enc_key_hex = hex_str
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, salt_hex_db, page1 in db_files:
if salt_hex_db in remaining_salts and verify_key_for_db(enc_key, page1):
key_map[salt_hex_db] = enc_key_hex
remaining_salts.discard(salt_hex_db)
dbs = salt_to_dbs[salt_hex_db]
print(f"\n [FOUND] salt={salt_hex_db}")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
elif hex_len > 96 and hex_len % 2 == 0:
enc_key_hex = hex_str[:64]
salt_hex = hex_str[-32:]
if salt_hex in remaining_salts:
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, s, page1 in db_files:
if s == salt_hex and verify_key_for_db(enc_key, page1):
key_map[salt_hex] = enc_key_hex
remaining_salts.discard(salt_hex)
dbs = salt_to_dbs[salt_hex]
print(f"\n [FOUND] salt={salt_hex} (long hex {hex_len})")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
all_hex_matches += scan_memory_for_keys(
data, hex_re, db_files, salt_to_dbs,
key_map, remaining_salts, base, pid, print,
)
if (reg_idx + 1) % 200 == 0:
elapsed = time.time() - t0
@ -223,49 +143,8 @@ def main():
elapsed = time.time() - t0
print(f"\n扫描完成: {elapsed:.1f}s, {len(pids)} 个进程, {all_hex_matches} hex模式")
missing_salts = set(salt_to_dbs.keys()) - set(key_map.keys())
if missing_salts and key_map:
print(f"\n还有 {len(missing_salts)} 个salt未匹配尝试交叉验证...")
for salt_hex in list(missing_salts):
for rel, path, sz, s, page1 in db_files:
if s == salt_hex:
for known_salt, known_key_hex in key_map.items():
enc_key = bytes.fromhex(known_key_hex)
if verify_key_for_db(enc_key, page1):
key_map[salt_hex] = known_key_hex
print(f" [CROSS] salt={salt_hex} 可用 key from salt={known_salt}")
missing_salts.discard(salt_hex)
break
print(f"\n{'=' * 60}")
print(f"结果: {len(key_map)}/{len(salt_to_dbs)} salts 找到密钥")
result = {}
for rel, path, sz, salt_hex, page1 in db_files:
if salt_hex in key_map:
result[rel] = {
"enc_key": key_map[salt_hex],
"salt": salt_hex,
"size_mb": round(sz / 1024 / 1024, 1)
}
print(f" OK: {rel} ({sz / 1024 / 1024:.1f}MB)")
else:
print(f" MISSING: {rel} (salt={salt_hex})")
if not result:
print(f"\n[!] 未提取到任何密钥,保留已有的 {OUT_FILE}(如存在)")
raise RuntimeError("未能从任何微信进程中提取到密钥")
result["_db_dir"] = DB_DIR
with open(OUT_FILE, 'w') as f:
json.dump(result, f, indent=2)
print(f"\n密钥保存到: {OUT_FILE}")
missing = [rel for rel, path, sz, salt_hex, page1 in db_files if salt_hex not in key_map]
if missing:
print(f"\n未找到密钥的数据库:")
for rel in missing:
print(f" {rel}")
cross_verify_keys(db_files, salt_to_dbs, key_map, print)
save_results(db_files, salt_to_dbs, key_map, db_dir, out_file, print)
if __name__ == '__main__':

169
key_scan_common.py 100644
View File

@ -0,0 +1,169 @@
"""
跨平台共享的内存扫描逻辑HMAC 验证DB 收集hex 模式匹配与结果输出
Windows / Linux 版分别实现进程发现和内存读取共用此模块的核心算法
"""
import hashlib
import hmac as hmac_mod
import json
import os
import re
import struct
import time
PAGE_SZ = 4096
KEY_SZ = 32
SALT_SZ = 16
def verify_enc_key(enc_key, db_page1):
"""通过 HMAC-SHA512 校验 page 1 验证 enc_key 是否正确。"""
salt = db_page1[:SALT_SZ]
mac_salt = bytes(b ^ 0x3A for b in salt)
mac_key = hashlib.pbkdf2_hmac("sha512", enc_key, mac_salt, 2, dklen=KEY_SZ)
hmac_data = db_page1[SALT_SZ: PAGE_SZ - 80 + 16]
stored_hmac = db_page1[PAGE_SZ - 64: PAGE_SZ]
hm = hmac_mod.new(mac_key, hmac_data, hashlib.sha512)
hm.update(struct.pack("<I", 1))
return hm.digest() == stored_hmac
def collect_db_files(db_dir):
"""遍历 db_dir 收集所有 .db 文件及其 salt。
返回 (db_files, salt_to_dbs):
db_files: [(rel_path, abs_path, size, salt_hex, page1_bytes), ...]
salt_to_dbs: {salt_hex: [rel_path, ...]}
"""
db_files = []
salt_to_dbs = {}
for root, dirs, files in os.walk(db_dir):
for name in files:
if not name.endswith(".db") or name.endswith("-wal") or name.endswith("-shm"):
continue
path = os.path.join(root, name)
size = os.path.getsize(path)
if size < PAGE_SZ:
continue
with open(path, "rb") as f:
page1 = f.read(PAGE_SZ)
rel = os.path.relpath(path, db_dir)
salt = page1[:SALT_SZ].hex()
db_files.append((rel, path, size, salt, page1))
salt_to_dbs.setdefault(salt, []).append(rel)
return db_files, salt_to_dbs
def scan_memory_for_keys(data, hex_re, db_files, salt_to_dbs, key_map,
remaining_salts, base_addr, pid, print_fn):
"""扫描一段内存数据,匹配 hex 模式并验证密钥。
返回本次扫描匹配到的 hex 模式数量
"""
matches = 0
for m in hex_re.finditer(data):
hex_str = m.group(1).decode()
addr = base_addr + m.start()
matches += 1
hex_len = len(hex_str)
if hex_len == 96:
enc_key_hex = hex_str[:64]
salt_hex = hex_str[64:]
if salt_hex in remaining_salts:
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, s, page1 in db_files:
if s == salt_hex and verify_enc_key(enc_key, page1):
key_map[salt_hex] = enc_key_hex
remaining_salts.discard(salt_hex)
dbs = salt_to_dbs[salt_hex]
print_fn(f"\n [FOUND] salt={salt_hex}")
print_fn(f" enc_key={enc_key_hex}")
print_fn(f" PID={pid} 地址: 0x{addr:016X}")
print_fn(f" 数据库: {', '.join(dbs)}")
break
elif hex_len == 64:
if not remaining_salts:
continue
enc_key_hex = hex_str
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, salt_hex_db, page1 in db_files:
if salt_hex_db in remaining_salts and verify_enc_key(enc_key, page1):
key_map[salt_hex_db] = enc_key_hex
remaining_salts.discard(salt_hex_db)
dbs = salt_to_dbs[salt_hex_db]
print_fn(f"\n [FOUND] salt={salt_hex_db}")
print_fn(f" enc_key={enc_key_hex}")
print_fn(f" PID={pid} 地址: 0x{addr:016X}")
print_fn(f" 数据库: {', '.join(dbs)}")
break
elif hex_len > 96 and hex_len % 2 == 0:
enc_key_hex = hex_str[:64]
salt_hex = hex_str[-32:]
if salt_hex in remaining_salts:
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, s, page1 in db_files:
if s == salt_hex and verify_enc_key(enc_key, page1):
key_map[salt_hex] = enc_key_hex
remaining_salts.discard(salt_hex)
dbs = salt_to_dbs[salt_hex]
print_fn(f"\n [FOUND] salt={salt_hex} (long hex {hex_len})")
print_fn(f" enc_key={enc_key_hex}")
print_fn(f" PID={pid} 地址: 0x{addr:016X}")
print_fn(f" 数据库: {', '.join(dbs)}")
break
return matches
def cross_verify_keys(db_files, salt_to_dbs, key_map, print_fn):
"""用已找到的 key 交叉验证未匹配的 salt。"""
missing_salts = set(salt_to_dbs.keys()) - set(key_map.keys())
if not missing_salts or not key_map:
return
print_fn(f"\n还有 {len(missing_salts)} 个 salt 未匹配,尝试交叉验证...")
for salt_hex in list(missing_salts):
for rel, path, sz, s, page1 in db_files:
if s == salt_hex:
for known_salt, known_key_hex in key_map.items():
enc_key = bytes.fromhex(known_key_hex)
if verify_enc_key(enc_key, page1):
key_map[salt_hex] = known_key_hex
print_fn(f" [CROSS] salt={salt_hex} 可用 key from salt={known_salt}")
missing_salts.discard(salt_hex)
break
def save_results(db_files, salt_to_dbs, key_map, db_dir, out_file, print_fn):
"""输出扫描结果并保存 JSON。"""
print_fn(f"\n{'=' * 60}")
print_fn(f"结果: {len(key_map)}/{len(salt_to_dbs)} salts 找到密钥")
result = {}
for rel, path, sz, salt_hex, page1 in db_files:
if salt_hex in key_map:
result[rel] = {
"enc_key": key_map[salt_hex],
"salt": salt_hex,
"size_mb": round(sz / 1024 / 1024, 1)
}
print_fn(f" OK: {rel} ({sz / 1024 / 1024:.1f}MB)")
else:
print_fn(f" MISSING: {rel} (salt={salt_hex})")
if not result:
print_fn(f"\n[!] 未提取到任何密钥,保留已有的 {out_file}(如存在)")
raise RuntimeError("未能从任何微信进程中提取到密钥")
result["_db_dir"] = db_dir
with open(out_file, 'w', encoding='utf-8') as f:
json.dump(result, f, indent=2, ensure_ascii=False)
print_fn(f"\n密钥保存到: {out_file}")
missing = [rel for rel, path, sz, salt_hex, page1 in db_files if salt_hex not in key_map]
if missing:
print_fn(f"\n未找到密钥的数据库:")
for rel in missing:
print_fn(f" {rel}")

View File

@ -1,11 +1,18 @@
import os
import posixpath
def strip_key_metadata(keys):
"""移除 all_keys.json 中以下划线开头的元数据字段"""
"""移除 all_keys.json 中以下划线开头的元数据字段,返回新 dict"""
return {k: v for k, v in keys.items() if not k.startswith("_")}
def _is_safe_rel_path(path):
"""检查路径不包含 .. 等遍历组件。"""
normalized = path.replace("\\", "/")
return ".." not in posixpath.normpath(normalized).split("/")
def key_path_variants(rel_path):
"""生成同一路径的多种分隔符表示,兼容 Windows/Linux JSON key。"""
normalized = rel_path.replace("\\", "/")
@ -23,6 +30,8 @@ def key_path_variants(rel_path):
def get_key_info(keys, rel_path):
"""按相对路径查找数据库密钥,自动兼容不同平台分隔符。"""
if not _is_safe_rel_path(rel_path):
return None
for candidate in key_path_variants(rel_path):
if candidate in keys and not candidate.startswith("_"):
return keys[candidate]

View File

@ -250,7 +250,7 @@ def get_contact_names():
pass
# 实时解密
path = _cache.get("contact\\contact.db")
path = _cache.get(os.path.join("contact", "contact.db"))
if path:
try:
_contact_names, _contact_full = _load_contacts_from(path)
@ -331,7 +331,8 @@ def _parse_message_content(content, local_type, is_group):
return sender, text
# 消息 DB 的 rel_keys排除 fts/resource/media/biz
# 消息 DB 的 rel_keys
# 用 message_\d+\.db$ 匹配,自然排除 message_resource.db / message_fts_*.db
MSG_DB_KEYS = sorted([
k for k in ALL_KEYS
if any(v.startswith("message/") for v in key_path_variants(k))
@ -381,7 +382,7 @@ def get_recent_sessions(limit: int = 20) -> str:
Args:
limit: 返回的会话数量默认20
"""
path = _cache.get("session\\session.db")
path = _cache.get(os.path.join("session", "session.db"))
if not path:
return "错误: 无法解密 session.db"
@ -637,7 +638,7 @@ def get_new_messages() -> str:
"""获取自上次调用以来的新消息。首次调用返回最近的会话状态。"""
global _last_check_state
path = _cache.get("session\\session.db")
path = _cache.get(os.path.join("session", "session.db"))
if not path:
return "错误: 无法解密 session.db"

View File

@ -315,7 +315,7 @@ def build_username_db_map():
# 先获取每个 DB 的 mtime 用于排序
db_mtimes = {}
for i in range(5):
rel_key = f"message\\message_{i}.db"
rel_key = os.path.join("message", f"message_{i}.db")
db_path = os.path.join(DB_DIR, "message", f"message_{i}.db")
try:
db_mtimes[rel_key] = os.path.getmtime(db_path)
@ -328,7 +328,7 @@ def build_username_db_map():
db_path = os.path.join(decrypted_msg_dir, f"message_{i}.db")
if not os.path.exists(db_path):
continue
rel_key = f"message\\message_{i}.db"
rel_key = os.path.join("message", f"message_{i}.db")
try:
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
for row in conn.execute("SELECT user_name FROM Name2Id").fetchall():
@ -597,7 +597,7 @@ class SessionMonitor:
# local_id 不全局唯一,需要同时匹配 create_time
file_md5 = None
for _try in range(2):
res_path = self.db_cache.get("message\\message_resource.db")
res_path = self.db_cache.get(os.path.join("message", "message_resource.db"))
if not res_path:
return None
try:
@ -622,7 +622,7 @@ class SessionMonitor:
except Exception as e:
if 'malformed' in str(e) and _try == 0:
print(f" [img] resource DB malformed, 强制刷新...", flush=True)
self.db_cache.invalidate("message\\message_resource.db")
self.db_cache.invalidate(os.path.join("message", "message_resource.db"))
continue
print(f" [img] 查询 message_resource 失败: {e}", flush=True)
return None
@ -1917,9 +1917,9 @@ def main():
def _warmup():
try:
t0 = time.perf_counter()
warmup_keys = ["message\\message_resource.db"]
warmup_keys = [os.path.join("message", "message_resource.db")]
for i in range(5):
k = f"message\\message_{i}.db"
k = os.path.join("message", f"message_{i}.db")
if get_key_info(keys, k):
warmup_keys.append(k)
for k in warmup_keys: