feat: auto-detect config, unified entry point & multi-process key extraction (#8)

feat: auto-detect config, unified entry point & multi-process key extraction
feat/daemon-cli
joshua-deng 2026-03-03 22:56:52 +08:00 committed by GitHub
commit e3efaac510
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 334 additions and 140 deletions

View File

@ -38,15 +38,18 @@ WCDB (微信的 SQLCipher 封装) 会在进程内存中缓存派生后的 raw ke
pip install pycryptodome
```
### 1. 配置
### 快速开始
复制配置模板并修改
确保微信正在运行,以**管理员权限**执行
```bash
copy config.example.json config.json
python main.py # 实时消息监听 (Web UI)
python main.py decrypt # 解密全部数据库到 decrypted/
```
编辑 `config.json`
程序会自动完成:配置检测 → 密钥提取 → 启动。首次运行会自动检测微信数据目录并生成 `config.json`
如果自动检测失败(例如微信安装在非默认位置),手动创建 `config.json`
```json
{
"db_dir": "D:\\xwechat_files\\你的微信ID\\db_storage",
@ -58,33 +61,9 @@ copy config.example.json config.json
`db_dir` 路径可以在 微信设置 → 文件管理 中找到。
### 2. 提取密钥
### Web UI 说明
确保微信正在运行,以**管理员权限**运行:
```bash
python find_all_keys.py
```
密钥将保存到 `all_keys.json`
### 3. 解密数据库
```bash
python decrypt_db.py
```
解密后的数据库保存在 `decrypted/` 目录,可以直接用 SQLite 工具打开。
### 4. 实时消息监听
#### Web UI (推荐)
```bash
python monitor_web.py
```
打开 http://localhost:5678 查看实时消息流。
`python main.py` 启动后打开 http://localhost:5678 查看实时消息流。
- 30ms 轮询 WAL 文件变化 (mtime)
- 检测到变化后全量解密 + WAL patch (~70ms)
@ -92,15 +71,7 @@ python monitor_web.py
- 总延迟约 100ms
- **图片消息内联预览**(支持旧 XOR / V1 / V2 三种 .dat 加密格式)
#### 命令行
```bash
python monitor.py
```
每 3 秒轮询一次,在终端显示新消息。
### 5. MCP Server (Claude AI 集成)
### MCP Server (Claude AI 集成)
将微信数据查询能力接入 [Claude Code](https://claude.ai/claude-code),让 AI 直接读取你的微信消息。
@ -138,11 +109,11 @@ claude mcp add wechat -- python C:\Users\你的用户名\wechat-decrypt\mcp_serv
| `get_contacts(query, limit)` | 搜索/列出联系人 |
| `get_new_messages()` | 获取自上次调用以来的新消息 |
前置条件:需要先完成步骤 1-2配置 + 提取密钥)
前置条件:需要先运行 `python main.py``python find_all_keys.py` 完成密钥提取
**[查看使用案例 →](USAGE.md)**
### 6. 图片解密 (V2 格式)
### 图片解密 (V2 格式)
微信 4.0 (2025-08+) 的 .dat 图片文件使用 AES-128-ECB + XOR 混合加密 (V2 格式)。AES 密钥需要从运行中的微信进程内存中提取:
@ -163,7 +134,8 @@ python find_image_key.py
| 文件 | 说明 |
|------|------|
| `config.py` | 配置加载器 |
| `main.py` | **一键启动入口** — 自动配置、提取密钥、启动服务 |
| `config.py` | 配置加载器(自动检测微信数据目录) |
| `find_all_keys.py` | 从微信进程内存提取所有数据库密钥 |
| `decrypt_db.py` | 全量解密所有数据库 |
| `mcp_server.py` | MCP Server让 Claude AI 查询微信数据 |

110
config.py
View File

@ -1,15 +1,18 @@
"""
配置加载器 - config.json 读取路径配置
首次运行时自动生成 config.json 模板
首次运行时自动检测微信数据目录检测失败则提示手动配置
"""
import glob
import json
import os
import sys
CONFIG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json")
_DEFAULT_TEMPLATE_DIR = r"D:\xwechat_files\your_wxid\db_storage"
_DEFAULT = {
"db_dir": r"D:\xwechat_files\your_wxid\db_storage",
"db_dir": _DEFAULT_TEMPLATE_DIR,
"keys_file": "all_keys.json",
"decrypted_dir": "decrypted",
"decoded_image_dir": "decoded_images",
@ -17,16 +20,101 @@ _DEFAULT = {
}
def load_config():
if not os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, "w") as f:
json.dump(_DEFAULT, f, indent=4)
print(f"[!] 已生成配置文件: {CONFIG_FILE}")
print(" 请修改 config.json 中的路径后重新运行")
sys.exit(1)
def auto_detect_db_dir():
"""从微信本地配置自动检测 db_storage 路径。
with open(CONFIG_FILE) as f:
cfg = json.load(f)
读取 %APPDATA%\\Tencent\\xwechat\\config\\*.ini
找到数据存储根目录然后匹配 xwechat_files\\*\\db_storage
"""
appdata = os.environ.get("APPDATA", "")
config_dir = os.path.join(appdata, "Tencent", "xwechat", "config")
if not os.path.isdir(config_dir):
return None
# 从 ini 文件中找到有效的目录路径
data_roots = []
for ini_file in glob.glob(os.path.join(config_dir, "*.ini")):
try:
# 微信 ini 可能是 utf-8 或 gbk 编码(中文路径)
content = None
for enc in ("utf-8", "gbk"):
try:
with open(ini_file, "r", encoding=enc) as f:
content = f.read(1024).strip()
break
except UnicodeDecodeError:
continue
if not content or any(c in content for c in "\n\r\x00"):
continue
if os.path.isdir(content):
data_roots.append(content)
except OSError:
continue
# 在每个根目录下搜索 xwechat_files\*\db_storage
seen = set()
candidates = []
for root in data_roots:
pattern = os.path.join(root, "xwechat_files", "*", "db_storage")
for match in glob.glob(pattern):
normalized = os.path.normcase(os.path.normpath(match))
if os.path.isdir(match) and normalized not in seen:
seen.add(normalized)
candidates.append(match)
if len(candidates) == 1:
return candidates[0]
if len(candidates) > 1:
# 非交互环境MCP、无 stdin 管道等)直接取第一个
if not sys.stdin.isatty():
return candidates[0]
print("[!] 检测到多个微信数据目录(请选择当前正在运行的微信账号):")
for i, c in enumerate(candidates, 1):
print(f" {i}. {c}")
print(" 0. 跳过,稍后手动配置")
try:
while True:
choice = input("请选择 [0-{}]: ".format(len(candidates))).strip()
if choice == "0":
return None
if choice.isdigit() and 1 <= int(choice) <= len(candidates):
return candidates[int(choice) - 1]
print(" 无效输入,请重新选择")
except (EOFError, KeyboardInterrupt):
print()
return None
return None
def load_config():
cfg = {}
if os.path.exists(CONFIG_FILE):
try:
with open(CONFIG_FILE) as f:
cfg = json.load(f)
except json.JSONDecodeError:
print(f"[!] {CONFIG_FILE} 格式损坏,将使用默认配置")
cfg = {}
# db_dir 缺失或仍为模板值时,尝试自动检测
db_dir = cfg.get("db_dir", "")
if not db_dir or db_dir == _DEFAULT_TEMPLATE_DIR or "your_wxid" in db_dir:
detected = auto_detect_db_dir()
if detected:
print(f"[+] 自动检测到微信数据目录: {detected}")
# 合并默认值并保存
cfg = {**_DEFAULT, **cfg, "db_dir": detected}
with open(CONFIG_FILE, "w") as f:
json.dump(cfg, f, indent=4, ensure_ascii=False)
print(f"[+] 已保存到: {CONFIG_FILE}")
else:
if not os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, "w") as f:
json.dump(_DEFAULT, f, indent=4)
print(f"[!] 未能自动检测微信数据目录")
print(f" 请手动编辑 {CONFIG_FILE} 中的 db_dir 字段")
print(f" 路径可在 微信设置 → 文件管理 中找到")
sys.exit(1)
# 将相对路径转为绝对路径
base = os.path.dirname(os.path.abspath(__file__))

View File

@ -118,6 +118,7 @@ def main():
with open(KEYS_FILE) as f:
keys = json.load(f)
keys.pop("_db_dir", None)
print(f"\n加载 {len(keys)} 个数据库密钥")
print(f"输出目录: {OUT_DIR}")
os.makedirs(OUT_DIR, exist_ok=True)

View File

@ -33,20 +33,23 @@ class MBI(ctypes.Structure):
("Protect", wt.DWORD), ("Type", wt.DWORD), ("_pad2", wt.DWORD),
]
def get_pid():
def get_pids():
"""返回所有 Weixin.exe 进程的 (pid, mem_kb) 列表,按内存降序"""
import subprocess
r = subprocess.run(["tasklist","/FI","IMAGENAME eq Weixin.exe","/FO","CSV","/NH"],
capture_output=True, text=True)
best = (0,0)
pids = []
for line in r.stdout.strip().split('\n'):
if not line.strip(): continue
p = line.strip('"').split('","')
if len(p)>=5:
pid=int(p[1]); mem=int(p[4].replace(',','').replace(' K','').strip() or '0')
if mem>best[1]: best=(pid,mem)
if not best[0]: print("[ERROR] Weixin.exe 未运行"); sys.exit(1)
print(f"[+] Weixin.exe PID={best[0]} ({best[1]//1024}MB)")
return best[0]
pids.append((pid, mem))
if not pids: raise RuntimeError("Weixin.exe 未运行")
pids.sort(key=lambda x: x[1], reverse=True)
for pid, mem in pids:
print(f"[+] Weixin.exe PID={pid} ({mem//1024}MB)")
return pids
def read_mem(h, addr, sz):
buf = ctypes.create_string_buffer(sz)
@ -113,98 +116,107 @@ def main():
for salt_hex, dbs in sorted(salt_to_dbs.items(), key=lambda x: len(x[1]), reverse=True):
print(f" salt {salt_hex}: {', '.join(dbs)}")
# 2. 打开进程
pid = get_pid()
h = kernel32.OpenProcess(0x0010 | 0x0400, False, pid)
if not h:
print("[ERROR] 无法打开进程"); sys.exit(1)
# 2. 打开所有微信进程
pids = get_pids()
regions = enum_regions(h)
total_mb = sum(s for _,s in regions)/1024/1024
print(f"[+] 可读内存: {len(regions)} 区域, {total_mb:.0f}MB")
# 3. 搜索所有 x'<hex>' 模式
print(f"\n搜索 x'<hex>' 缓存密钥...")
hex_re = re.compile(b"x'([0-9a-fA-F]{64,192})'")
# 结果: salt_hex -> enc_key_hex
key_map = {}
key_map = {} # salt_hex -> enc_key_hex
remaining_salts = set(salt_to_dbs.keys())
all_hex_matches = 0
t0 = time.time()
for reg_idx, (base, size) in enumerate(regions):
data = read_mem(h, base, size)
if not data: continue
for proc_idx, (pid, mem) in enumerate(pids):
h = kernel32.OpenProcess(0x0010 | 0x0400, False, pid)
if not h:
print(f"[WARN] 无法打开进程 PID={pid},跳过")
continue
for m in hex_re.finditer(data):
hex_str = m.group(1).decode()
addr = base + m.start()
all_hex_matches += 1
hex_len = len(hex_str)
try:
regions = enum_regions(h)
total_bytes = sum(s for _,s in regions)
total_mb = total_bytes/1024/1024
print(f"\n[*] 扫描 PID={pid} ({total_mb:.0f}MB, {len(regions)} 区域)")
if hex_len == 96:
# enc_key(32bytes=64hex) + salt(16bytes=32hex)
enc_key_hex = hex_str[:64]
salt_hex = hex_str[64:]
scanned_bytes = 0
for reg_idx, (base, size) in enumerate(regions):
data = read_mem(h, base, size)
scanned_bytes += size
if not data: continue
if salt_hex in salt_to_dbs and salt_hex not in key_map:
# 验证!
enc_key = bytes.fromhex(enc_key_hex)
# 找到对应的page1
for rel, path, sz, s, page1 in db_files:
if s == salt_hex:
if verify_key_for_db(enc_key, page1):
key_map[salt_hex] = enc_key_hex
dbs = salt_to_dbs[salt_hex]
print(f"\n [FOUND] salt={salt_hex}")
print(f" enc_key={enc_key_hex}")
print(f" 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
for m in hex_re.finditer(data):
hex_str = m.group(1).decode()
addr = base + m.start()
all_hex_matches += 1
hex_len = len(hex_str)
elif hex_len == 64:
# 只有enc_key, 没有salt - 需要逐个DB试
enc_key_hex = hex_str
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, salt_hex_db, page1 in db_files:
if salt_hex_db not in key_map:
if verify_key_for_db(enc_key, page1):
key_map[salt_hex_db] = enc_key_hex
dbs = salt_to_dbs[salt_hex_db]
print(f"\n [FOUND] salt={salt_hex_db}")
print(f" enc_key={enc_key_hex}")
print(f" 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
if hex_len == 96:
enc_key_hex = hex_str[:64]
salt_hex = hex_str[64:]
elif hex_len > 96 and hex_len % 2 == 0:
# 可能是 enc_key + hmac_key + salt 或其他格式
# 取前64作为enc_key, 后32作为salt
enc_key_hex = hex_str[:64]
salt_hex = hex_str[-32:]
if salt_hex in remaining_salts:
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, s, page1 in db_files:
if s == salt_hex:
if verify_key_for_db(enc_key, page1):
key_map[salt_hex] = enc_key_hex
remaining_salts.discard(salt_hex)
dbs = salt_to_dbs[salt_hex]
print(f"\n [FOUND] salt={salt_hex}")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
if salt_hex in salt_to_dbs and salt_hex not in key_map:
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, s, page1 in db_files:
if s == salt_hex:
if verify_key_for_db(enc_key, page1):
key_map[salt_hex] = enc_key_hex
dbs = salt_to_dbs[salt_hex]
print(f"\n [FOUND] salt={salt_hex} (long hex {hex_len})")
print(f" enc_key={enc_key_hex}")
print(f" 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
elif hex_len == 64:
if not remaining_salts:
continue
enc_key_hex = hex_str
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, salt_hex_db, page1 in db_files:
if salt_hex_db in remaining_salts:
if verify_key_for_db(enc_key, page1):
key_map[salt_hex_db] = enc_key_hex
remaining_salts.discard(salt_hex_db)
dbs = salt_to_dbs[salt_hex_db]
print(f"\n [FOUND] salt={salt_hex_db}")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
# 进度
if (reg_idx + 1) % 200 == 0:
elapsed = time.time() - t0
progress = sum(s for b,s in regions[:reg_idx+1]) / sum(s for _,s in regions) * 100
print(f" [{progress:.1f}%] {len(key_map)}/{len(salt_to_dbs)} salts matched, "
f"{all_hex_matches} hex patterns, {elapsed:.1f}s")
elif hex_len > 96 and hex_len % 2 == 0:
enc_key_hex = hex_str[:64]
salt_hex = hex_str[-32:]
if salt_hex in remaining_salts:
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, s, page1 in db_files:
if s == salt_hex:
if verify_key_for_db(enc_key, page1):
key_map[salt_hex] = enc_key_hex
remaining_salts.discard(salt_hex)
dbs = salt_to_dbs[salt_hex]
print(f"\n [FOUND] salt={salt_hex} (long hex {hex_len})")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
if (reg_idx + 1) % 200 == 0:
elapsed = time.time() - t0
progress = scanned_bytes / total_bytes * 100 if total_bytes else 100
print(f" [{progress:.1f}%] {len(key_map)}/{len(salt_to_dbs)} salts matched, "
f"{all_hex_matches} hex patterns, {elapsed:.1f}s")
finally:
kernel32.CloseHandle(h)
# 所有 salt 都找到了就提前退出
if not remaining_salts:
print(f"\n[+] 所有密钥已找到,跳过剩余进程")
break
elapsed = time.time() - t0
print(f"\n扫描完成: {elapsed:.1f}s, {all_hex_matches} hex模式")
print(f"\n扫描完成: {elapsed:.1f}s, {len(pids)} 个进程, {all_hex_matches} hex模式")
# 4. 如果有未找到的salt用已找到的key做交叉验证
# (WCDB有时对同一passphrase的不同DB用同一enc_key如果salt相同)
@ -238,6 +250,12 @@ def main():
else:
print(f" MISSING: {rel} (salt={salt_hex})")
if not result:
print(f"\n[!] 未提取到任何密钥,保留已有的 {OUT_FILE}(如存在)")
raise RuntimeError("未能从任何微信进程中提取到密钥")
# 写入密钥并记录对应的 db_dir防止切换账号后误复用
result["_db_dir"] = DB_DIR
with open(OUT_FILE, 'w') as f:
json.dump(result, f, indent=2)
print(f"\n密钥保存到: {OUT_FILE}")
@ -248,8 +266,10 @@ def main():
for rel in missing:
print(f" {rel}")
kernel32.CloseHandle(h)
if __name__ == '__main__':
main()
try:
main()
except RuntimeError as e:
print(f"\n[ERROR] {e}")
sys.exit(1)

113
main.py 100644
View File

@ -0,0 +1,113 @@
"""
WeChat Decrypt 一键启动
python main.py # 提取密钥 + 启动 Web UI
python main.py decrypt # 提取密钥 + 解密全部数据库
"""
import json
import os
import sys
import functools
print = functools.partial(print, flush=True)
def check_wechat_running():
"""检查微信是否在运行,返回 True/False"""
from find_all_keys import get_pids
try:
get_pids()
return True
except RuntimeError:
return False
def ensure_keys(keys_file, db_dir):
"""确保密钥文件存在且匹配当前 db_dir否则重新提取"""
if os.path.exists(keys_file):
try:
with open(keys_file) as f:
keys = json.load(f)
except (json.JSONDecodeError, ValueError):
keys = {}
# 检查密钥是否匹配当前 db_dir防止切换账号后误复用旧密钥
saved_dir = keys.pop("_db_dir", None)
if saved_dir and os.path.normcase(os.path.normpath(saved_dir)) != os.path.normcase(os.path.normpath(db_dir)):
print(f"[!] 密钥文件对应的目录已变更,需要重新提取")
print(f" 旧: {saved_dir}")
print(f" 新: {db_dir}")
keys = {}
if keys:
print(f"[+] 已有 {len(keys)} 个数据库密钥")
return
print("[*] 密钥文件不存在,正在从微信进程提取...")
print()
from find_all_keys import main as extract_keys
try:
extract_keys()
except RuntimeError as e:
print(f"\n[!] 密钥提取失败: {e}")
sys.exit(1)
print()
# 提取后再次检查
if not os.path.exists(keys_file):
print("[!] 密钥提取失败")
sys.exit(1)
try:
with open(keys_file) as f:
keys = json.load(f)
except (json.JSONDecodeError, ValueError):
keys = {}
if not keys:
print("[!] 未能提取到任何密钥")
print(" 可能原因:选择了错误的微信数据目录,或微信需要重启")
print(" 请检查 config.json 中的 db_dir 是否与当前登录的微信账号匹配")
sys.exit(1)
def main():
print("=" * 60)
print(" WeChat Decrypt")
print("=" * 60)
print()
# 1. 加载配置(自动检测 db_dir
from config import load_config
cfg = load_config()
# 2. 检查微信进程
if not check_wechat_running():
print("[!] 未检测到微信进程 (Weixin.exe)")
print(" 请先启动微信并登录,然后重新运行")
sys.exit(1)
print("[+] 微信进程运行中")
# 3. 提取密钥
ensure_keys(cfg["keys_file"], cfg["db_dir"])
# 4. 根据子命令执行
cmd = sys.argv[1] if len(sys.argv) > 1 else "web"
if cmd == "decrypt":
print("[*] 开始解密全部数据库...")
print()
from decrypt_db import main as decrypt_all
decrypt_all()
elif cmd == "web":
print("[*] 启动 Web UI...")
print()
from monitor_web import main as start_web
start_web()
else:
print(f"[!] 未知命令: {cmd}")
print()
print("用法:")
print(" python main.py 启动实时消息监听 (Web UI)")
print(" python main.py decrypt 解密全部数据库到 decrypted/")
sys.exit(1)
if __name__ == "__main__":
main()