diff --git a/README.md b/README.md index f509a73..8cc5558 100644 --- a/README.md +++ b/README.md @@ -1,238 +1,240 @@ -# WeChat 4.0 Database Decryptor - -微信 4.0 (Windows) 本地数据库解密工具。从运行中的微信进程内存提取加密密钥,解密所有 SQLCipher 4 加密数据库,并提供实时消息监听。 - -## 更新日志 - -### 2025-03-03 — 富媒体内容 & 组合消息修复 - -- **表情包内联显示**: 自动从 emoticon.db 构建 MD5→CDN 映射,支持自定义表情(NonStore)和商店表情(Store),CDN 下载后本地缓存 -- **富媒体内容解析**: 链接卡片(type 49)、文件、视频号、小程序、引用回复、位置分享等在 Web UI 中完整渲染 -- **文字+图片组合消息不再丢失**: 修复同时发送文字和图片时只显示最后一条的问题(前端去重 key 增加消息类型) -- **隐藏消息检测**: 新增 `_check_hidden_messages` 机制,session.db 只保存最后一条消息摘要,现在会异步查 message DB 找回同一秒内的其他消息 -- **MonitorDBCache 线程安全**: 引入 per-key 锁,防止多线程并发解密同一数据库导致文件损坏 -- **Web UI 改进**: 消息气泡样式优化、群聊发送者显示、图片缩略图点击放大 - -## 原理 - -微信 4.0 使用 SQLCipher 4 加密本地数据库: -- **加密算法**: AES-256-CBC + HMAC-SHA512 -- **KDF**: PBKDF2-HMAC-SHA512, 256,000 iterations -- **页面大小**: 4096 bytes, reserve = 80 (IV 16 + HMAC 64) -- **每个数据库有独立的 salt 和 enc_key** - -WCDB (微信的 SQLCipher 封装) 会在进程内存中缓存派生后的 raw key,格式为 `x'<64hex_enc_key><32hex_salt>'`。本工具通过扫描进程内存中的这种模式,匹配数据库文件的 salt,并通过 HMAC 验证来提取正确的密钥。 - -## 使用方法 - -### 环境要求 - -- Windows 10/11 -- Python 3.10+ -- 微信 4.0 (正在运行) -- 需要管理员权限 (读取进程内存) - -### 安装依赖 - -```bash -pip install pycryptodome -``` - -### 快速开始 - -确保微信正在运行,以**管理员权限**执行: - -```bash -python main.py # 实时消息监听 (Web UI) -python main.py decrypt # 解密全部数据库到 decrypted/ -``` - -程序会自动完成:配置检测 → 密钥提取 → 启动。首次运行会自动检测微信数据目录并生成 `config.json`。 - -如果自动检测失败(例如微信安装在非默认位置),手动创建 `config.json`: -```json -{ - "db_dir": "D:\\xwechat_files\\你的微信ID\\db_storage", - "keys_file": "all_keys.json", - "decrypted_dir": "decrypted", - "wechat_process": "Weixin.exe" -} -``` - -`db_dir` 路径可以在 微信设置 → 文件管理 中找到。 - -### Web UI 说明 - -`python main.py` 启动后打开 http://localhost:5678 查看实时消息流。 - -- 30ms 轮询 WAL 文件变化 (mtime) -- 检测到变化后全量解密 + WAL patch (~70ms) -- SSE 实时推送到浏览器 -- 总延迟约 100ms -- **图片消息内联预览**(支持旧 XOR / V1 / V2 三种 .dat 加密格式) - -### MCP Server (Claude AI 集成) - -将微信数据查询能力接入 [Claude Code](https://claude.ai/claude-code),让 AI 直接读取你的微信消息。 - -```bash -pip install mcp -``` - -注册到 Claude Code: - -```bash -claude mcp add wechat -- python C:\Users\你的用户名\wechat-decrypt\mcp_server.py -``` - -或手动编辑 `~/.claude.json`: - -```json -{ - "mcpServers": { - "wechat": { - "type": "stdio", - "command": "python", - "args": ["C:\\Users\\你的用户名\\wechat-decrypt\\mcp_server.py"] - } - } -} -``` - -注册后在 Claude Code 中即可使用以下工具: - -| Tool | 功能 | -|------|------| -| `get_recent_sessions(limit)` | 最近会话列表(含消息摘要、未读数) | -| `get_chat_history(chat_name, limit)` | 指定聊天的消息记录(支持模糊匹配名字) | -| `search_messages(keyword, limit)` | 全库搜索消息内容 | -| `get_contacts(query, limit)` | 搜索/列出联系人 | -| `get_new_messages()` | 获取自上次调用以来的新消息 | - -前置条件:需要先运行 `python main.py` 或 `python find_all_keys.py` 完成密钥提取。 - -**[查看使用案例 →](USAGE.md)** - -### 图片解密 (V2 格式) - -微信 4.0 (2025-08+) 的 .dat 图片文件使用 AES-128-ECB + XOR 混合加密 (V2 格式)。AES 密钥需要从运行中的微信进程内存中提取: - -```bash -# 1. 在微信中打开查看 2-3 张图片(点击看大图) -# 2. 立即运行密钥提取(持续监控版): -python find_image_key_monitor.py - -# 或单次扫描版: -python find_image_key.py -``` - -密钥会自动保存到 `config.json` 的 `image_aes_key` 字段。之后 `monitor_web.py` 启动时会自动加载密钥,图片消息将显示内联预览。 - -> **注意**: AES 密钥仅在微信查看图片时临时加载到内存中。如果扫描未找到密钥,请先在微信中查看几张图片,然后立即重新运行脚本。 - -#### macOS 图片解密 - -macOS 上使用 C 版工具(通过 Mach VM API + CommonCrypto,性能比 Python 高 100 倍): - -**前置条件:** -- Xcode Command Line Tools: `xcode-select --install` -- 微信需要 ad-hoc 签名:`sudo codesign --force --deep --sign - /Applications/WeChat.app` -- 开发者模式:系统设置 → 隐私与安全 → 开发者模式 → 开启 - -```bash -# 编译 -cc -O3 -o find_image_key find_image_key.c -framework Security -cc -O3 -o decrypt_images decrypt_images.c -framework Security - -# 1. 持续扫描图片密钥(在微信中浏览图片,扫描器自动捕获密钥) -sudo ./find_image_key - -# 2. 批量解密所有 V2 图片 -./decrypt_images -``` - -`find_image_key` 会自动发现所有未解密的 V2 图片 pattern,持续扫描微信进程内存。当用户在微信中浏览图片时捕获密钥,保存到 `image_keys.json`。支持 `--deep` 模式进行逐字节深度扫描。 - -## 文件说明 - -| 文件 | 说明 | -|------|------| -| `main.py` | **一键启动入口** — 自动配置、提取密钥、启动服务 | -| `config.py` | 配置加载器(自动检测微信数据目录) | -| `find_all_keys.py` | 从微信进程内存提取所有数据库密钥 | -| `decrypt_db.py` | 全量解密所有数据库 | -| `mcp_server.py` | MCP Server,让 Claude AI 查询微信数据 | -| `monitor_web.py` | 实时消息监听 (Web UI + SSE + 图片预览) | -| `monitor.py` | 实时消息监听 (命令行) | -| `decode_image.py` | 图片 .dat 文件解密模块 (XOR / V1 / V2) | -| `find_image_key.py` | 从微信进程内存提取图片 AES 密钥 | -| `find_image_key_monitor.py` | 持续监控版密钥提取(推荐) | -| `latency_test.py` | 延迟测量诊断工具 | -| `find_all_keys_macos.c` | macOS 版内存密钥扫描器 (C, Mach VM API) | -| `find_image_key.c` | macOS 版图片密钥扫描器 (C, 持续监控模式) | -| `decrypt_images.c` | macOS 版批量图片解密器 (C, 多密钥支持) | - -## 技术细节 - -### WAL 处理 - -微信使用 SQLite WAL 模式,WAL 文件是**预分配固定大小** (4MB)。检测变化时: -- 不能用文件大小 (永远不变) -- 使用 mtime 检测写入 -- 解密 WAL frame 时需校验 salt 值,跳过旧周期遗留的 frame - -### 图片 .dat 加密格式 - -微信本地图片 (.dat) 有三种加密格式: - -| 格式 | 时期 | Magic | 加密方式 | 密钥来源 | -|------|------|-------|---------|---------| -| 旧 XOR | ~2025-07 | 无 | 单字节 XOR | 自动检测 (对比 magic bytes) | -| V1 | 过渡期 | `07 08 V1 08 07` | AES-ECB + XOR | 固定 key: `cfcd208495d565ef` | -| V2 | 2025-08+ | `07 08 V2 08 07` | AES-128-ECB + XOR | 从进程内存提取 | - -V2 文件结构: `[6B signature] [4B aes_size LE] [4B xor_size LE] [1B padding]` + `[AES-ECB encrypted] [raw unencrypted] [XOR encrypted]` - -### 数据库结构 - -解密后包含约 26 个数据库: -- `session/session.db` - 会话列表 (最新消息摘要) -- `message/message_*.db` - 聊天记录 -- `contact/contact.db` - 联系人 -- `media_*/media_*.db` - 媒体文件索引 -- 其他: head_image, favorite, sns, emoticon 等 - -## macOS 数据库密钥扫描 (WeChat 4.x) - -macOS 版微信 4.x 使用 SQLCipher 4 加密本地数据库,密钥格式为 `x'<64hex_key><32hex_salt>'`。C 版扫描器通过 Mach VM API 扫描微信进程内存提取密钥。 - -### 前置条件 - -- macOS (Apple Silicon / Intel) -- WeChat 4.x (macOS 版) -- Xcode Command Line Tools: `xcode-select --install` -- 微信需要 ad-hoc 签名(或安装了防撤回补丁): - `sudo codesign --force --deep --sign - /Applications/WeChat.app` - -### 编译和使用 - -```bash -# 编译 -cc -O2 -o find_all_keys_macos find_all_keys_macos.c -framework Foundation - -# 运行(自动查找微信进程、扫描内存、匹配 DB salt) -sudo ./find_all_keys_macos - -# 或指定 PID -sudo ./find_all_keys_macos -``` - -输出 `all_keys.json`,格式兼容 `decrypt_db.py`,可直接用于解密: - -```bash -python3 decrypt_db.py -``` - -## 免责声明 - -本工具仅用于学习和研究目的,用于解密**自己的**微信数据。请遵守相关法律法规,不要用于未经授权的数据访问。 +# WeChat 4.x Database Decryptor + +微信 4.0 (Windows、MacOS、Linux) 本地数据库解密工具。从运行中的微信进程内存提取加密密钥,解密所有 SQLCipher 4 加密数据库,并提供实时消息监听。 + +## 更新日志 + +### 2025-03-03 — 富媒体内容 & 组合消息修复 + +- **表情包内联显示**: 自动从 emoticon.db 构建 MD5→CDN 映射,支持自定义表情(NonStore)和商店表情(Store),CDN 下载后本地缓存 +- **富媒体内容解析**: 链接卡片(type 49)、文件、视频号、小程序、引用回复、位置分享等在 Web UI 中完整渲染 +- **文字+图片组合消息不再丢失**: 修复同时发送文字和图片时只显示最后一条的问题(前端去重 key 增加消息类型) +- **隐藏消息检测**: 新增 `_check_hidden_messages` 机制,session.db 只保存最后一条消息摘要,现在会异步查 message DB 找回同一秒内的其他消息 +- **MonitorDBCache 线程安全**: 引入 per-key 锁,防止多线程并发解密同一数据库导致文件损坏 +- **Web UI 改进**: 消息气泡样式优化、群聊发送者显示、图片缩略图点击放大 + +## 原理 + +微信 4.0 使用 SQLCipher 4 加密本地数据库: +- **加密算法**: AES-256-CBC + HMAC-SHA512 +- **KDF**: PBKDF2-HMAC-SHA512, 256,000 iterations +- **页面大小**: 4096 bytes, reserve = 80 (IV 16 + HMAC 64) +- **每个数据库有独立的 salt 和 enc_key** + +WCDB (微信的 SQLCipher 封装) 会在进程内存中缓存派生后的 raw key,格式为 `x'<64hex_enc_key><32hex_salt>'`。三个平台(Windows / Linux / macOS)均可通过扫描进程内存匹配此模式,再通过 HMAC 校验 page 1 确认密钥正确性。 + +## 使用方法 + +### 环境要求 + +- Python 3.10+ +- 微信 4.x +- `pip install pycryptodome` + +Windows: + +- Windows 10/11 +- 微信正在运行 +- 需要管理员权限(读取进程内存) + +Linux: + +- 64-bit Linux +- 需要 root 权限或 `CAP_SYS_PTRACE`(读取 `/proc//mem`) +- `db_dir` 默认类似 `~/Documents/xwechat_files//db_storage` + +### 安装依赖 + +### 快速开始 + +Windows: + +```bash +python main.py +python main.py decrypt +``` + +Linux: + +```bash +python3 main.py decrypt +``` + +程序会自动完成:配置检测 → 内存扫描提取密钥 → 解密。首次运行会自动检测微信数据目录并生成 `config.json`。微信只要在运行中即可,无需重启或重新登录。 + +如果自动检测失败(例如微信安装在非默认位置),手动创建 `config.json`: +```json +{ + "db_dir": "D:\\xwechat_files\\你的微信ID\\db_storage", + "keys_file": "all_keys.json", + "decrypted_dir": "decrypted", + "wechat_process": "Weixin.exe" +} +``` + +Linux 版 `config.json` 示例: + +```json +{ + "db_dir": "/home/yourname/Documents/xwechat_files/your_wxid/db_storage", + "keys_file": "all_keys.json", + "decrypted_dir": "decrypted", + "wechat_process": "wechat" +} +``` + +`db_dir` 路径:Windows 可在微信设置 → 文件管理中找到;Linux 默认在 `~/Documents/xwechat_files//db_storage`。 + +### Web UI 说明 + +`python main.py` 启动后打开 http://localhost:5678 查看实时消息流。 + +- 30ms 轮询 WAL 文件变化 (mtime) +- 检测到变化后全量解密 + WAL patch (~70ms) +- SSE 实时推送到浏览器 +- 总延迟约 100ms +- **图片消息内联预览**(支持旧 XOR / V1 / V2 三种 .dat 加密格式) + +### MCP Server (Claude AI 集成) + +将微信数据查询能力接入 [Claude Code](https://claude.ai/claude-code),让 AI 直接读取你的微信消息。 + +```bash +pip install mcp +``` + +注册到 Claude Code: + +```bash +claude mcp add wechat -- python C:\Users\你的用户名\wechat-decrypt\mcp_server.py +``` + +或手动编辑 `~/.claude.json`: + +```json +{ + "mcpServers": { + "wechat": { + "type": "stdio", + "command": "python", + "args": ["C:\\Users\\你的用户名\\wechat-decrypt\\mcp_server.py"] + } + } +} +``` + +注册后在 Claude Code 中即可使用以下工具: + +| Tool | 功能 | +|------|------| +| `get_recent_sessions(limit)` | 最近会话列表(含消息摘要、未读数) | +| `get_chat_history(chat_name, limit)` | 指定聊天的消息记录(支持模糊匹配名字) | +| `search_messages(keyword, limit)` | 全库搜索消息内容 | +| `get_contacts(query, limit)` | 搜索/列出联系人 | +| `get_new_messages()` | 获取自上次调用以来的新消息 | + +前置条件:需要先运行 `python main.py` 或 `python find_all_keys.py` 完成密钥提取。 + +**[查看使用案例 →](USAGE.md)** + +### 图片解密 (V2 格式) + +微信 4.0 (2025-08+) 的 .dat 图片文件使用 AES-128-ECB + XOR 混合加密 (V2 格式)。AES 密钥需要从运行中的微信进程内存中提取: + +```bash +# 1. 在微信中打开查看 2-3 张图片(点击看大图) +# 2. 立即运行密钥提取(持续监控版): +python find_image_key_monitor.py + +# 或单次扫描版: +python find_image_key.py +``` + +密钥会自动保存到 `config.json` 的 `image_aes_key` 字段。之后 `monitor_web.py` 启动时会自动加载密钥,图片消息将显示内联预览。 + +> **注意**: AES 密钥仅在微信查看图片时临时加载到内存中。如果扫描未找到密钥,请先在微信中查看几张图片,然后立即重新运行脚本。 + +## 文件说明 + +| 文件 | 说明 | +|------|------| +| `main.py` | **一键启动入口** — 自动配置、提取密钥、启动服务 | +| `config.py` | 配置加载器(自动检测微信数据目录) | +| `find_all_keys.py` | 平台分发入口(Windows / Linux) | +| `find_all_keys_windows.py` | Windows 版内存扫描提 key | +| `find_all_keys_linux.py` | Linux 版内存扫描提 key | +| `decrypt_db.py` | 全量解密所有数据库 | +| `mcp_server.py` | MCP Server,让 Claude AI 查询微信数据 | +| `monitor_web.py` | 实时消息监听 (Web UI + SSE + 图片预览) | +| `monitor.py` | 实时消息监听 (命令行) | +| `decode_image.py` | 图片 .dat 文件解密模块 (XOR / V1 / V2) | +| `find_image_key.py` | 从微信进程内存提取图片 AES 密钥 | +| `find_image_key_monitor.py` | 持续监控版密钥提取(推荐) | +| `latency_test.py` | 延迟测量诊断工具 | +| `find_all_keys_macos.c` | macOS 版内存密钥扫描器 (C, Mach VM API) | + +## 技术细节 + +### WAL 处理 + +微信使用 SQLite WAL 模式,WAL 文件是**预分配固定大小** (4MB)。检测变化时: +- 不能用文件大小 (永远不变) +- 使用 mtime 检测写入 +- 解密 WAL frame 时需校验 salt 值,跳过旧周期遗留的 frame + +### 图片 .dat 加密格式 + +微信本地图片 (.dat) 有三种加密格式: + +| 格式 | 时期 | Magic | 加密方式 | 密钥来源 | +|------|------|-------|---------|---------| +| 旧 XOR | ~2025-07 | 无 | 单字节 XOR | 自动检测 (对比 magic bytes) | +| V1 | 过渡期 | `07 08 V1 08 07` | AES-ECB + XOR | 固定 key: `cfcd208495d565ef` | +| V2 | 2025-08+ | `07 08 V2 08 07` | AES-128-ECB + XOR | 从进程内存提取 | + +V2 文件结构: `[6B signature] [4B aes_size LE] [4B xor_size LE] [1B padding]` + `[AES-ECB encrypted] [raw unencrypted] [XOR encrypted]` + +### 数据库结构 + +解密后包含约 26 个数据库: +- `session/session.db` - 会话列表 (最新消息摘要) +- `message/message_*.db` - 聊天记录 +- `contact/contact.db` - 联系人 +- `media_*/media_*.db` - 媒体文件索引 +- 其他: head_image, favorite, sns, emoticon 等 + +## macOS 数据库密钥扫描 (WeChat 4.x) + + +macOS 版微信 4.x 使用 SQLCipher 4 加密本地数据库,密钥格式为 `x'<64hex_key><32hex_salt>'`。C 版扫描器通过 Mach VM API 扫描微信进程内存提取密钥。 + +### 前置条件 + +- macOS (Apple Silicon / Intel) +- WeChat 4.x (macOS 版) +- Xcode Command Line Tools: `xcode-select --install` +- 微信需要 ad-hoc 签名(或安装了防撤回补丁): + `sudo codesign --force --deep --sign - /Applications/WeChat.app` + +### 编译和使用 + +```bash +# 编译 +cc -O2 -o find_all_keys_macos find_all_keys_macos.c -framework Foundation + +# 运行(自动查找微信进程、扫描内存、匹配 DB salt) +sudo ./find_all_keys_macos + +# 或指定 PID +sudo ./find_all_keys_macos +``` + +输出 `all_keys.json`,格式兼容 `decrypt_db.py`,可直接用于解密: + +```bash +python3 decrypt_db.py +``` + +## 免责声明 + +本工具仅用于学习和研究目的,用于解密**自己的**微信数据。请遵守相关法律法规,不要用于未经授权的数据访问。 diff --git a/config.example.json b/config.example.json index 3670993..abbd8fa 100644 --- a/config.example.json +++ b/config.example.json @@ -1,6 +1,6 @@ -{ - "db_dir": "D:\\xwechat_files\\your_wxid\\db_storage", - "keys_file": "all_keys.json", - "decrypted_dir": "decrypted", - "wechat_process": "Weixin.exe" -} +{ + "db_dir": "D:\\xwechat_files\\your_wxid\\db_storage", + "keys_file": "all_keys.json", + "decrypted_dir": "decrypted", + "wechat_process": "Weixin.exe" +} diff --git a/config.py b/config.py index 432273c..4d153a9 100644 --- a/config.py +++ b/config.py @@ -1,138 +1,201 @@ -""" -配置加载器 - 从 config.json 读取路径配置 -首次运行时自动检测微信数据目录,检测失败则提示手动配置 -""" -import glob -import json -import os -import sys - -CONFIG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json") - -_DEFAULT_TEMPLATE_DIR = r"D:\xwechat_files\your_wxid\db_storage" - -_DEFAULT = { - "db_dir": _DEFAULT_TEMPLATE_DIR, - "keys_file": "all_keys.json", - "decrypted_dir": "decrypted", - "decoded_image_dir": "decoded_images", - "wechat_process": "Weixin.exe", -} - - -def auto_detect_db_dir(): - """从微信本地配置自动检测 db_storage 路径。 - - 读取 %APPDATA%\\Tencent\\xwechat\\config\\*.ini, - 找到数据存储根目录,然后匹配 xwechat_files\\*\\db_storage。 - """ - appdata = os.environ.get("APPDATA", "") - config_dir = os.path.join(appdata, "Tencent", "xwechat", "config") - if not os.path.isdir(config_dir): - return None - - # 从 ini 文件中找到有效的目录路径 - data_roots = [] - for ini_file in glob.glob(os.path.join(config_dir, "*.ini")): - try: - # 微信 ini 可能是 utf-8 或 gbk 编码(中文路径) - content = None - for enc in ("utf-8", "gbk"): - try: - with open(ini_file, "r", encoding=enc) as f: - content = f.read(1024).strip() - break - except UnicodeDecodeError: - continue - if not content or any(c in content for c in "\n\r\x00"): - continue - if os.path.isdir(content): - data_roots.append(content) - except OSError: - continue - - # 在每个根目录下搜索 xwechat_files\*\db_storage - seen = set() - candidates = [] - for root in data_roots: - pattern = os.path.join(root, "xwechat_files", "*", "db_storage") - for match in glob.glob(pattern): - normalized = os.path.normcase(os.path.normpath(match)) - if os.path.isdir(match) and normalized not in seen: - seen.add(normalized) - candidates.append(match) - - if len(candidates) == 1: - return candidates[0] - if len(candidates) > 1: - # 非交互环境(MCP、无 stdin 管道等)直接取第一个 - if not sys.stdin.isatty(): - return candidates[0] - print("[!] 检测到多个微信数据目录(请选择当前正在运行的微信账号):") - for i, c in enumerate(candidates, 1): - print(f" {i}. {c}") - print(" 0. 跳过,稍后手动配置") - try: - while True: - choice = input("请选择 [0-{}]: ".format(len(candidates))).strip() - if choice == "0": - return None - if choice.isdigit() and 1 <= int(choice) <= len(candidates): - return candidates[int(choice) - 1] - print(" 无效输入,请重新选择") - except (EOFError, KeyboardInterrupt): - print() - return None - return None - - -def load_config(): - cfg = {} - if os.path.exists(CONFIG_FILE): - try: - with open(CONFIG_FILE) as f: - cfg = json.load(f) - except json.JSONDecodeError: - print(f"[!] {CONFIG_FILE} 格式损坏,将使用默认配置") - cfg = {} - - # db_dir 缺失或仍为模板值时,尝试自动检测 - db_dir = cfg.get("db_dir", "") - if not db_dir or db_dir == _DEFAULT_TEMPLATE_DIR or "your_wxid" in db_dir: - detected = auto_detect_db_dir() - if detected: - print(f"[+] 自动检测到微信数据目录: {detected}") - # 合并默认值并保存 - cfg = {**_DEFAULT, **cfg, "db_dir": detected} - with open(CONFIG_FILE, "w") as f: - json.dump(cfg, f, indent=4, ensure_ascii=False) - print(f"[+] 已保存到: {CONFIG_FILE}") - else: - if not os.path.exists(CONFIG_FILE): - with open(CONFIG_FILE, "w") as f: - json.dump(_DEFAULT, f, indent=4) - print(f"[!] 未能自动检测微信数据目录") - print(f" 请手动编辑 {CONFIG_FILE} 中的 db_dir 字段") - print(f" 路径可在 微信设置 → 文件管理 中找到") - sys.exit(1) - - # 将相对路径转为绝对路径 - base = os.path.dirname(os.path.abspath(__file__)) - for key in ("keys_file", "decrypted_dir", "decoded_image_dir"): - if key in cfg and not os.path.isabs(cfg[key]): - cfg[key] = os.path.join(base, cfg[key]) - - # 自动推导微信数据根目录(db_dir 的上级目录) - # db_dir 格式: D:\xwechat_files\\db_storage - # base_dir 格式: D:\xwechat_files\ - db_dir = cfg.get("db_dir", "") - if db_dir and os.path.basename(db_dir) == "db_storage": - cfg["wechat_base_dir"] = os.path.dirname(db_dir) - else: - cfg["wechat_base_dir"] = db_dir - - # decoded_image_dir 默认值 - if "decoded_image_dir" not in cfg: - cfg["decoded_image_dir"] = os.path.join(base, "decoded_images") - - return cfg +""" +配置加载器 - 从 config.json 读取路径配置 +首次运行时自动检测微信数据目录,检测失败则提示手动配置 +""" +import glob +import json +import os +import platform +import sys + +CONFIG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json") + +_SYSTEM = platform.system().lower() +_DEFAULT_TEMPLATE_DIR = ( + os.path.expanduser("~/Documents/xwechat_files/your_wxid/db_storage") + if _SYSTEM == "linux" + else r"D:\xwechat_files\your_wxid\db_storage" +) + +_DEFAULT = { + "db_dir": _DEFAULT_TEMPLATE_DIR, + "keys_file": "all_keys.json", + "decrypted_dir": "decrypted", + "decoded_image_dir": "decoded_images", + "wechat_process": "wechat" if _SYSTEM == "linux" else "Weixin.exe", +} + + +def _choose_candidate(candidates): + """在多个候选目录中选择一个。""" + if len(candidates) == 1: + return candidates[0] + if len(candidates) > 1: + if not sys.stdin.isatty(): + return candidates[0] + print("[!] 检测到多个微信数据目录(请选择当前正在运行的微信账号):") + for i, c in enumerate(candidates, 1): + print(f" {i}. {c}") + print(" 0. 跳过,稍后手动配置") + try: + while True: + choice = input("请选择 [0-{}]: ".format(len(candidates))).strip() + if choice == "0": + return None + if choice.isdigit() and 1 <= int(choice) <= len(candidates): + return candidates[int(choice) - 1] + print(" 无效输入,请重新选择") + except (EOFError, KeyboardInterrupt): + print() + return None + return None + + +def _auto_detect_db_dir_windows(): + """从微信本地配置自动检测 Windows db_storage 路径。 + + 读取 %APPDATA%\\Tencent\\xwechat\\config\\*.ini, + 找到数据存储根目录,然后匹配 xwechat_files\\*\\db_storage。 + """ + appdata = os.environ.get("APPDATA", "") + config_dir = os.path.join(appdata, "Tencent", "xwechat", "config") + if not os.path.isdir(config_dir): + return None + + # 从 ini 文件中找到有效的目录路径 + data_roots = [] + for ini_file in glob.glob(os.path.join(config_dir, "*.ini")): + try: + # 微信 ini 可能是 utf-8 或 gbk 编码(中文路径) + content = None + for enc in ("utf-8", "gbk"): + try: + with open(ini_file, "r", encoding=enc) as f: + content = f.read(1024).strip() + break + except UnicodeDecodeError: + continue + if not content or any(c in content for c in "\n\r\x00"): + continue + if os.path.isdir(content): + data_roots.append(content) + except OSError: + continue + + # 在每个根目录下搜索 xwechat_files\*\db_storage + seen = set() + candidates = [] + for root in data_roots: + pattern = os.path.join(root, "xwechat_files", "*", "db_storage") + for match in glob.glob(pattern): + normalized = os.path.normcase(os.path.normpath(match)) + if os.path.isdir(match) and normalized not in seen: + seen.add(normalized) + candidates.append(match) + + return _choose_candidate(candidates) + + +def _auto_detect_db_dir_linux(): + """自动检测 Linux 微信 db_storage 路径。""" + seen = set() + candidates = [] + search_roots = { + os.path.expanduser("~/Documents/xwechat_files"), + } + + if os.path.isdir("/home"): + for entry in os.listdir("/home"): + search_roots.add(os.path.join("/home", entry, "Documents", "xwechat_files")) + + for root in search_roots: + if not os.path.isdir(root): + continue + pattern = os.path.join(root, "*", "db_storage") + for match in glob.glob(pattern): + normalized = os.path.normcase(os.path.normpath(match)) + if os.path.isdir(match) and normalized not in seen: + seen.add(normalized) + candidates.append(match) + + old_path = os.path.expanduser("~/.local/share/weixin/data/db_storage") + if os.path.isdir(old_path): + normalized = os.path.normcase(os.path.normpath(old_path)) + if normalized not in seen: + candidates.append(old_path) + + # Linux 优先使用最近活跃账号:按 message 目录 mtime 降序 + def _mtime(path): + msg_dir = os.path.join(path, "message") + target = msg_dir if os.path.isdir(msg_dir) else path + try: + return os.path.getmtime(target) + except OSError: + return 0 + + candidates.sort(key=_mtime, reverse=True) + return _choose_candidate(candidates) + + +def auto_detect_db_dir(): + if _SYSTEM == "windows": + return _auto_detect_db_dir_windows() + if _SYSTEM == "linux": + return _auto_detect_db_dir_linux() + return None + + +def load_config(): + cfg = {} + if os.path.exists(CONFIG_FILE): + try: + with open(CONFIG_FILE) as f: + cfg = json.load(f) + except json.JSONDecodeError: + print(f"[!] {CONFIG_FILE} 格式损坏,将使用默认配置") + cfg = {} + cfg = {**_DEFAULT, **cfg} + + # db_dir 缺失或仍为模板值时,尝试自动检测 + db_dir = cfg.get("db_dir", "") + if not db_dir or db_dir == _DEFAULT_TEMPLATE_DIR or "your_wxid" in db_dir: + detected = auto_detect_db_dir() + if detected: + print(f"[+] 自动检测到微信数据目录: {detected}") + # 合并默认值并保存 + cfg = {**_DEFAULT, **cfg, "db_dir": detected} + with open(CONFIG_FILE, "w") as f: + json.dump(cfg, f, indent=4, ensure_ascii=False) + print(f"[+] 已保存到: {CONFIG_FILE}") + else: + if not os.path.exists(CONFIG_FILE): + with open(CONFIG_FILE, "w") as f: + json.dump(_DEFAULT, f, indent=4, ensure_ascii=False) + print(f"[!] 未能自动检测微信数据目录") + print(f" 请手动编辑 {CONFIG_FILE} 中的 db_dir 字段") + if _SYSTEM == "linux": + print(" Linux 默认路径类似: ~/Documents/xwechat_files//db_storage") + else: + print(f" 路径可在 微信设置 → 文件管理 中找到") + sys.exit(1) + + # 将相对路径转为绝对路径 + base = os.path.dirname(os.path.abspath(__file__)) + for key in ("keys_file", "decrypted_dir", "decoded_image_dir"): + if key in cfg and not os.path.isabs(cfg[key]): + cfg[key] = os.path.join(base, cfg[key]) + + # 自动推导微信数据根目录(db_dir 的上级目录) + # db_dir 格式: D:\xwechat_files\\db_storage + # base_dir 格式: D:\xwechat_files\ + db_dir = cfg.get("db_dir", "") + if db_dir and os.path.basename(db_dir) == "db_storage": + cfg["wechat_base_dir"] = os.path.dirname(db_dir) + else: + cfg["wechat_base_dir"] = db_dir + + # decoded_image_dir 默认值 + if "decoded_image_dir" not in cfg: + cfg["decoded_image_dir"] = os.path.join(base, "decoded_images") + + return cfg diff --git a/decrypt_db.py b/decrypt_db.py index 430fffd..bd8fa9f 100644 --- a/decrypt_db.py +++ b/decrypt_db.py @@ -7,7 +7,7 @@ WeChat 4.0 数据库解密器 """ import hashlib, struct, os, sys, json import hmac as hmac_mod -from Crypto.Cipher import AES +from Crypto.Cipher import AES import functools print = functools.partial(print, flush=True) @@ -20,11 +20,12 @@ HMAC_SZ = 64 RESERVE_SZ = 80 # IV(16) + HMAC(64) SQLITE_HDR = b'SQLite format 3\x00' -from config import load_config -_cfg = load_config() -DB_DIR = _cfg["db_dir"] -OUT_DIR = _cfg["decrypted_dir"] -KEYS_FILE = _cfg["keys_file"] +from config import load_config +from key_utils import get_key_info, strip_key_metadata +_cfg = load_config() +DB_DIR = _cfg["db_dir"] +OUT_DIR = _cfg["decrypted_dir"] +KEYS_FILE = _cfg["keys_file"] def derive_mac_key(enc_key, salt): @@ -115,13 +116,13 @@ def main(): print("请先运行 find_all_keys.py") sys.exit(1) - with open(KEYS_FILE) as f: - keys = json.load(f) - - keys.pop("_db_dir", None) - print(f"\n加载 {len(keys)} 个数据库密钥") - print(f"输出目录: {OUT_DIR}") - os.makedirs(OUT_DIR, exist_ok=True) + with open(KEYS_FILE) as f: + keys = json.load(f) + + keys = strip_key_metadata(keys) + print(f"\n加载 {len(keys)} 个数据库密钥") + print(f"输出目录: {OUT_DIR}") + os.makedirs(OUT_DIR, exist_ok=True) # 收集所有DB文件 db_files = [] @@ -129,7 +130,7 @@ def main(): for f in files: if f.endswith('.db') and not f.endswith('-wal') and not f.endswith('-shm'): path = os.path.join(root, f) - rel = os.path.relpath(path, DB_DIR).replace('\\', '/') + rel = os.path.relpath(path, DB_DIR) sz = os.path.getsize(path) db_files.append((rel, path, sz)) @@ -141,16 +142,15 @@ def main(): failed = 0 total_bytes = 0 - for rel, path, sz in db_files: - # 统一用正斜杠查找key - rel_key = rel.replace('\\', '/') - if rel_key not in keys: - print(f"SKIP: {rel} (无密钥)") - failed += 1 - continue - - enc_key = bytes.fromhex(keys[rel_key]["enc_key"]) - out_path = os.path.join(OUT_DIR, rel) + for rel, path, sz in db_files: + key_info = get_key_info(keys, rel) + if not key_info: + print(f"SKIP: {rel} (无密钥)") + failed += 1 + continue + + enc_key = bytes.fromhex(key_info["enc_key"]) + out_path = os.path.join(OUT_DIR, rel) print(f"解密: {rel} ({sz/1024/1024:.1f}MB) ...", end=" ") diff --git a/find_all_keys.py b/find_all_keys.py index ff7f70a..c466740 100644 --- a/find_all_keys.py +++ b/find_all_keys.py @@ -1,275 +1,29 @@ -""" -从微信进程内存中提取所有数据库的缓存raw key - -WCDB为每个DB缓存: x'<64hex_enc_key><32hex_salt>' -salt嵌在hex字符串中,可以直接匹配DB文件的salt -""" -import ctypes -import ctypes.wintypes as wt -import struct, os, sys, hashlib, time, re, json -import hmac as hmac_mod -from Crypto.Cipher import AES - -import functools -print = functools.partial(print, flush=True) - -kernel32 = ctypes.windll.kernel32 -MEM_COMMIT = 0x1000 -READABLE = {0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80} -PAGE_SZ = 4096 -KEY_SZ = 32 -SALT_SZ = 16 - -from config import load_config -_cfg = load_config() -DB_DIR = _cfg["db_dir"] -OUT_FILE = _cfg["keys_file"] - -class MBI(ctypes.Structure): - _fields_ = [ - ("BaseAddress", ctypes.c_uint64), ("AllocationBase", ctypes.c_uint64), - ("AllocationProtect", wt.DWORD), ("_pad1", wt.DWORD), - ("RegionSize", ctypes.c_uint64), ("State", wt.DWORD), - ("Protect", wt.DWORD), ("Type", wt.DWORD), ("_pad2", wt.DWORD), - ] - -def get_pids(): - """返回所有 Weixin.exe 进程的 (pid, mem_kb) 列表,按内存降序""" - import subprocess - r = subprocess.run(["tasklist","/FI","IMAGENAME eq Weixin.exe","/FO","CSV","/NH"], - capture_output=True, text=True) - pids = [] - for line in r.stdout.strip().split('\n'): - if not line.strip(): continue - p = line.strip('"').split('","') - if len(p)>=5: - pid=int(p[1]); mem=int(p[4].replace(',','').replace(' K','').strip() or '0') - pids.append((pid, mem)) - if not pids: raise RuntimeError("Weixin.exe 未运行") - pids.sort(key=lambda x: x[1], reverse=True) - for pid, mem in pids: - print(f"[+] Weixin.exe PID={pid} ({mem//1024}MB)") - return pids - -def read_mem(h, addr, sz): - buf = ctypes.create_string_buffer(sz) - n = ctypes.c_size_t(0) - if kernel32.ReadProcessMemory(h, ctypes.c_uint64(addr), buf, sz, ctypes.byref(n)): - return buf.raw[:n.value] - return None - -def enum_regions(h): - regs = [] - addr = 0 - mbi = MBI() - while addr < 0x7FFFFFFFFFFF: - if kernel32.VirtualQueryEx(h, ctypes.c_uint64(addr), ctypes.byref(mbi), ctypes.sizeof(mbi))==0: break - if mbi.State==MEM_COMMIT and mbi.Protect in READABLE and 0 enc_key_hex - remaining_salts = set(salt_to_dbs.keys()) - all_hex_matches = 0 - t0 = time.time() - - for proc_idx, (pid, mem) in enumerate(pids): - h = kernel32.OpenProcess(0x0010 | 0x0400, False, pid) - if not h: - print(f"[WARN] 无法打开进程 PID={pid},跳过") - continue - - try: - regions = enum_regions(h) - total_bytes = sum(s for _,s in regions) - total_mb = total_bytes/1024/1024 - print(f"\n[*] 扫描 PID={pid} ({total_mb:.0f}MB, {len(regions)} 区域)") - - scanned_bytes = 0 - for reg_idx, (base, size) in enumerate(regions): - data = read_mem(h, base, size) - scanned_bytes += size - if not data: continue - - for m in hex_re.finditer(data): - hex_str = m.group(1).decode() - addr = base + m.start() - all_hex_matches += 1 - hex_len = len(hex_str) - - if hex_len == 96: - enc_key_hex = hex_str[:64] - salt_hex = hex_str[64:] - - if salt_hex in remaining_salts: - enc_key = bytes.fromhex(enc_key_hex) - for rel, path, sz, s, page1 in db_files: - if s == salt_hex: - if verify_key_for_db(enc_key, page1): - key_map[salt_hex] = enc_key_hex - remaining_salts.discard(salt_hex) - dbs = salt_to_dbs[salt_hex] - print(f"\n [FOUND] salt={salt_hex}") - print(f" enc_key={enc_key_hex}") - print(f" PID={pid} 地址: 0x{addr:016X}") - print(f" 数据库: {', '.join(dbs)}") - break - - elif hex_len == 64: - if not remaining_salts: - continue - enc_key_hex = hex_str - enc_key = bytes.fromhex(enc_key_hex) - for rel, path, sz, salt_hex_db, page1 in db_files: - if salt_hex_db in remaining_salts: - if verify_key_for_db(enc_key, page1): - key_map[salt_hex_db] = enc_key_hex - remaining_salts.discard(salt_hex_db) - dbs = salt_to_dbs[salt_hex_db] - print(f"\n [FOUND] salt={salt_hex_db}") - print(f" enc_key={enc_key_hex}") - print(f" PID={pid} 地址: 0x{addr:016X}") - print(f" 数据库: {', '.join(dbs)}") - break - - elif hex_len > 96 and hex_len % 2 == 0: - enc_key_hex = hex_str[:64] - salt_hex = hex_str[-32:] - - if salt_hex in remaining_salts: - enc_key = bytes.fromhex(enc_key_hex) - for rel, path, sz, s, page1 in db_files: - if s == salt_hex: - if verify_key_for_db(enc_key, page1): - key_map[salt_hex] = enc_key_hex - remaining_salts.discard(salt_hex) - dbs = salt_to_dbs[salt_hex] - print(f"\n [FOUND] salt={salt_hex} (long hex {hex_len})") - print(f" enc_key={enc_key_hex}") - print(f" PID={pid} 地址: 0x{addr:016X}") - print(f" 数据库: {', '.join(dbs)}") - break - - if (reg_idx + 1) % 200 == 0: - elapsed = time.time() - t0 - progress = scanned_bytes / total_bytes * 100 if total_bytes else 100 - print(f" [{progress:.1f}%] {len(key_map)}/{len(salt_to_dbs)} salts matched, " - f"{all_hex_matches} hex patterns, {elapsed:.1f}s") - finally: - kernel32.CloseHandle(h) - - # 所有 salt 都找到了就提前退出 - if not remaining_salts: - print(f"\n[+] 所有密钥已找到,跳过剩余进程") - break - - elapsed = time.time() - t0 - print(f"\n扫描完成: {elapsed:.1f}s, {len(pids)} 个进程, {all_hex_matches} hex模式") - - # 4. 如果有未找到的salt,用已找到的key做交叉验证 - # (WCDB有时对同一passphrase的不同DB用同一enc_key,如果salt相同) - missing_salts = set(salt_to_dbs.keys()) - set(key_map.keys()) - if missing_salts and key_map: - print(f"\n还有 {len(missing_salts)} 个salt未匹配,尝试交叉验证...") - for salt_hex in list(missing_salts): - for rel, path, sz, s, page1 in db_files: - if s == salt_hex: - for known_salt, known_key_hex in key_map.items(): - enc_key = bytes.fromhex(known_key_hex) - if verify_key_for_db(enc_key, page1): - key_map[salt_hex] = known_key_hex - print(f" [CROSS] salt={salt_hex} 可用 key from salt={known_salt}") - missing_salts.discard(salt_hex) - break - - # 5. 输出结果 - print(f"\n{'='*60}") - print(f"结果: {len(key_map)}/{len(salt_to_dbs)} salts 找到密钥") - - result = {} - for rel, path, sz, salt_hex, page1 in db_files: - if salt_hex in key_map: - result[rel] = { - "enc_key": key_map[salt_hex], - "salt": salt_hex, - "size_mb": round(sz/1024/1024, 1) - } - print(f" OK: {rel} ({sz/1024/1024:.1f}MB)") - else: - print(f" MISSING: {rel} (salt={salt_hex})") - - if not result: - print(f"\n[!] 未提取到任何密钥,保留已有的 {OUT_FILE}(如存在)") - raise RuntimeError("未能从任何微信进程中提取到密钥") - - # 写入密钥并记录对应的 db_dir,防止切换账号后误复用 - result["_db_dir"] = DB_DIR - with open(OUT_FILE, 'w') as f: - json.dump(result, f, indent=2) - print(f"\n密钥保存到: {OUT_FILE}") - - missing = [rel for rel, path, sz, salt_hex, page1 in db_files if salt_hex not in key_map] - if missing: - print(f"\n未找到密钥的数据库:") - for rel in missing: - print(f" {rel}") - - -if __name__ == '__main__': - try: - main() - except RuntimeError as e: - print(f"\n[ERROR] {e}") - sys.exit(1) +import platform +import sys + + +def _load_impl(): + system = platform.system().lower() + if system == "windows": + import find_all_keys_windows as impl + return impl + if system == "linux": + import find_all_keys_linux as impl + return impl + raise RuntimeError(f"当前平台暂不支持通过 find_all_keys.py 提取密钥: {platform.system()}") + + +def get_pids(): + return _load_impl().get_pids() + + +def main(): + return _load_impl().main() + + +if __name__ == "__main__": + try: + main() + except RuntimeError as exc: + print(f"\n[ERROR] {exc}") + sys.exit(1) diff --git a/find_all_keys_linux.py b/find_all_keys_linux.py new file mode 100644 index 0000000..aac4784 --- /dev/null +++ b/find_all_keys_linux.py @@ -0,0 +1,297 @@ +""" +Linux 版微信数据库密钥提取 + +原理: 与 Windows/macOS 相同 — 扫描微信进程内存,查找 +WCDB 缓存的 x'<64hex_enc_key><32hex_salt>' 模式, +通过匹配数据库 salt + HMAC 校验确认密钥。 + +读取方式: /proc//maps + /proc//mem +权限要求: root 或 CAP_SYS_PTRACE +""" +import functools +import hashlib +import hmac as hmac_mod +import json +import os +import re +import struct +import sys +import time + +print = functools.partial(print, flush=True) + +PAGE_SZ = 4096 +KEY_SZ = 32 +SALT_SZ = 16 + +from config import load_config +_cfg = load_config() +DB_DIR = _cfg["db_dir"] +OUT_FILE = _cfg["keys_file"] + + +def _safe_readlink(path): + try: + return os.path.realpath(os.readlink(path)) + except OSError: + return "" + + +def get_pids(): + """返回所有疑似微信主进程的 (pid, rss_kb) 列表,按内存降序。""" + pids = [] + for pid_str in os.listdir("/proc"): + if not pid_str.isdigit(): + continue + pid = int(pid_str) + try: + with open(f"/proc/{pid}/comm") as f: + comm = f.read().strip() + with open(f"/proc/{pid}/statm") as f: + rss_pages = int(f.read().split()[1]) + rss_kb = rss_pages * 4 + exe_name = os.path.basename(_safe_readlink(f"/proc/{pid}/exe")) or comm + haystack = " ".join((comm, exe_name)).lower() + if "wechat" not in haystack and "weixin" not in haystack: + continue + pids.append((pid, rss_kb)) + except (PermissionError, FileNotFoundError, ProcessLookupError): + continue + + if not pids: + raise RuntimeError("未检测到 Linux 微信进程") + + pids.sort(key=lambda item: item[1], reverse=True) + for pid, rss_kb in pids: + exe_path = _safe_readlink(f"/proc/{pid}/exe") + print(f"[+] WeChat PID={pid} ({rss_kb // 1024}MB) {exe_path}") + return pids + + +def _get_readable_regions(pid): + """解析 /proc//maps,返回可读内存区域列表。""" + regions = [] + with open(f"/proc/{pid}/maps") as f: + for line in f: + parts = line.split() + if len(parts) < 2: + continue + if "r" not in parts[1]: + continue + start_s, end_s = parts[0].split("-") + start = int(start_s, 16) + size = int(end_s, 16) - start + if 0 < size < 500 * 1024 * 1024: + regions.append((start, size)) + return regions + + +def _collect_db_files(): + db_files = [] + salt_to_dbs = {} + for root, dirs, files in os.walk(DB_DIR): + for name in files: + if not name.endswith(".db") or name.endswith("-wal") or name.endswith("-shm"): + continue + path = os.path.join(root, name) + size = os.path.getsize(path) + if size < PAGE_SZ: + continue + with open(path, "rb") as f: + page1 = f.read(PAGE_SZ) + rel = os.path.relpath(path, DB_DIR) + salt = page1[:SALT_SZ].hex() + db_files.append((rel, path, size, salt, page1)) + salt_to_dbs.setdefault(salt, []).append(rel) + return db_files, salt_to_dbs + + +def _verify_enc_key(enc_key, db_page1): + salt = db_page1[:SALT_SZ] + mac_salt = bytes(b ^ 0x3A for b in salt) + mac_key = hashlib.pbkdf2_hmac("sha512", enc_key, mac_salt, 2, dklen=KEY_SZ) + hmac_data = db_page1[SALT_SZ: PAGE_SZ - 80 + 16] + stored_hmac = db_page1[PAGE_SZ - 64: PAGE_SZ] + hm = hmac_mod.new(mac_key, hmac_data, hashlib.sha512) + hm.update(struct.pack(" enc_key_hex + remaining_salts = set(salt_to_dbs.keys()) + all_hex_matches = 0 + t0 = time.time() + + for pid, rss_kb in pids: + try: + regions = _get_readable_regions(pid) + except PermissionError: + print(f"[WARN] 无法读取 /proc/{pid}/maps,权限不足,跳过") + continue + + total_bytes = sum(s for _, s in regions) + total_mb = total_bytes / 1024 / 1024 + print(f"\n[*] 扫描 PID={pid} ({total_mb:.0f}MB, {len(regions)} 区域)") + + scanned_bytes = 0 + try: + mem = open(f"/proc/{pid}/mem", "rb") + except PermissionError: + print(f"[WARN] 无法打开 /proc/{pid}/mem,权限不足,跳过") + continue + + try: + for reg_idx, (base, size) in enumerate(regions): + try: + mem.seek(base) + data = mem.read(size) + except (OSError, ValueError): + continue + scanned_bytes += len(data) + + for m in hex_re.finditer(data): + hex_str = m.group(1).decode() + addr = base + m.start() + all_hex_matches += 1 + hex_len = len(hex_str) + + if hex_len == 96: + enc_key_hex = hex_str[:64] + salt_hex = hex_str[64:] + + if salt_hex in remaining_salts: + enc_key = bytes.fromhex(enc_key_hex) + for rel, path, sz, s, page1 in db_files: + if s == salt_hex and _verify_enc_key(enc_key, page1): + key_map[salt_hex] = enc_key_hex + remaining_salts.discard(salt_hex) + dbs = salt_to_dbs[salt_hex] + print(f"\n [FOUND] salt={salt_hex}") + print(f" enc_key={enc_key_hex}") + print(f" PID={pid} 地址: 0x{addr:016X}") + print(f" 数据库: {', '.join(dbs)}") + break + + elif hex_len == 64: + if not remaining_salts: + continue + enc_key_hex = hex_str + enc_key = bytes.fromhex(enc_key_hex) + for rel, path, sz, salt_hex_db, page1 in db_files: + if salt_hex_db in remaining_salts and _verify_enc_key(enc_key, page1): + key_map[salt_hex_db] = enc_key_hex + remaining_salts.discard(salt_hex_db) + dbs = salt_to_dbs[salt_hex_db] + print(f"\n [FOUND] salt={salt_hex_db}") + print(f" enc_key={enc_key_hex}") + print(f" PID={pid} 地址: 0x{addr:016X}") + print(f" 数据库: {', '.join(dbs)}") + break + + elif hex_len > 96 and hex_len % 2 == 0: + enc_key_hex = hex_str[:64] + salt_hex = hex_str[-32:] + + if salt_hex in remaining_salts: + enc_key = bytes.fromhex(enc_key_hex) + for rel, path, sz, s, page1 in db_files: + if s == salt_hex and _verify_enc_key(enc_key, page1): + key_map[salt_hex] = enc_key_hex + remaining_salts.discard(salt_hex) + dbs = salt_to_dbs[salt_hex] + print(f"\n [FOUND] salt={salt_hex} (long hex {hex_len})") + print(f" enc_key={enc_key_hex}") + print(f" PID={pid} 地址: 0x{addr:016X}") + print(f" 数据库: {', '.join(dbs)}") + break + + if (reg_idx + 1) % 200 == 0: + elapsed = time.time() - t0 + progress = scanned_bytes / total_bytes * 100 if total_bytes else 100 + print( + f" [{progress:.1f}%] {len(key_map)}/{len(salt_to_dbs)} salts matched, " + f"{all_hex_matches} hex patterns, {elapsed:.1f}s" + ) + finally: + mem.close() + + if not remaining_salts: + print(f"\n[+] 所有密钥已找到,跳过剩余进程") + break + + elapsed = time.time() - t0 + print(f"\n扫描完成: {elapsed:.1f}s, {len(pids)} 个进程, {all_hex_matches} hex 模式") + + # 交叉验证:用已找到的 key 尝试未匹配的 salt + missing_salts = set(salt_to_dbs.keys()) - set(key_map.keys()) + if missing_salts and key_map: + print(f"\n还有 {len(missing_salts)} 个 salt 未匹配,尝试交叉验证...") + for salt_hex in list(missing_salts): + for rel, path, sz, s, page1 in db_files: + if s == salt_hex: + for known_salt, known_key_hex in key_map.items(): + enc_key = bytes.fromhex(known_key_hex) + if _verify_enc_key(enc_key, page1): + key_map[salt_hex] = known_key_hex + print(f" [CROSS] salt={salt_hex} 可用 key from salt={known_salt}") + missing_salts.discard(salt_hex) + break + + # 输出结果 + print(f"\n{'=' * 60}") + print(f"结果: {len(key_map)}/{len(salt_to_dbs)} salts 找到密钥") + + result = {} + for rel, path, sz, salt_hex, page1 in db_files: + if salt_hex in key_map: + result[rel] = { + "enc_key": key_map[salt_hex], + "salt": salt_hex, + "size_mb": round(sz / 1024 / 1024, 1) + } + print(f" OK: {rel} ({sz / 1024 / 1024:.1f}MB)") + else: + print(f" MISSING: {rel} (salt={salt_hex})") + + if not result: + print(f"\n[!] 未提取到任何密钥,保留已有的 {OUT_FILE}(如存在)") + raise RuntimeError("未能从任何微信进程中提取到密钥") + + result["_db_dir"] = DB_DIR + result["_platform"] = "linux" + result["_key_source"] = "memory_scan" + with open(OUT_FILE, 'w', encoding='utf-8') as f: + json.dump(result, f, indent=2, ensure_ascii=False) + print(f"\n密钥保存到: {OUT_FILE}") + + missing = [rel for rel, path, sz, salt_hex, page1 in db_files if salt_hex not in key_map] + if missing: + print(f"\n未找到密钥的数据库:") + for rel in missing: + print(f" {rel}") + + +if __name__ == "__main__": + try: + main() + except RuntimeError as exc: + print(f"\n[ERROR] {exc}") + sys.exit(1) diff --git a/find_all_keys_windows.py b/find_all_keys_windows.py new file mode 100644 index 0000000..85068f6 --- /dev/null +++ b/find_all_keys_windows.py @@ -0,0 +1,276 @@ +""" +从微信进程内存中提取所有数据库的缓存raw key + +WCDB为每个DB缓存: x'<64hex_enc_key><32hex_salt>' +salt嵌在hex字符串中,可以直接匹配DB文件的salt +""" +import ctypes +import ctypes.wintypes as wt +import struct, os, sys, hashlib, time, re, json +import hmac as hmac_mod +from Crypto.Cipher import AES + +import functools +print = functools.partial(print, flush=True) + +kernel32 = ctypes.windll.kernel32 +MEM_COMMIT = 0x1000 +READABLE = {0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80} +PAGE_SZ = 4096 +KEY_SZ = 32 +SALT_SZ = 16 + +from config import load_config +_cfg = load_config() +DB_DIR = _cfg["db_dir"] +OUT_FILE = _cfg["keys_file"] + + +class MBI(ctypes.Structure): + _fields_ = [ + ("BaseAddress", ctypes.c_uint64), ("AllocationBase", ctypes.c_uint64), + ("AllocationProtect", wt.DWORD), ("_pad1", wt.DWORD), + ("RegionSize", ctypes.c_uint64), ("State", wt.DWORD), + ("Protect", wt.DWORD), ("Type", wt.DWORD), ("_pad2", wt.DWORD), + ] + + +def get_pids(): + """返回所有 Weixin.exe 进程的 (pid, mem_kb) 列表,按内存降序""" + import subprocess + r = subprocess.run(["tasklist", "/FI", "IMAGENAME eq Weixin.exe", "/FO", "CSV", "/NH"], + capture_output=True, text=True) + pids = [] + for line in r.stdout.strip().split('\n'): + if not line.strip(): + continue + p = line.strip('"').split('","') + if len(p) >= 5: + pid = int(p[1]) + mem = int(p[4].replace(',', '').replace(' K', '').strip() or '0') + pids.append((pid, mem)) + if not pids: + raise RuntimeError("Weixin.exe 未运行") + pids.sort(key=lambda x: x[1], reverse=True) + for pid, mem in pids: + print(f"[+] Weixin.exe PID={pid} ({mem // 1024}MB)") + return pids + + +def read_mem(h, addr, sz): + buf = ctypes.create_string_buffer(sz) + n = ctypes.c_size_t(0) + if kernel32.ReadProcessMemory(h, ctypes.c_uint64(addr), buf, sz, ctypes.byref(n)): + return buf.raw[:n.value] + return None + + +def enum_regions(h): + regs = [] + addr = 0 + mbi = MBI() + while addr < 0x7FFFFFFFFFFF: + if kernel32.VirtualQueryEx(h, ctypes.c_uint64(addr), ctypes.byref(mbi), ctypes.sizeof(mbi)) == 0: + break + if mbi.State == MEM_COMMIT and mbi.Protect in READABLE and 0 < mbi.RegionSize < 500 * 1024 * 1024: + regs.append((mbi.BaseAddress, mbi.RegionSize)) + nxt = mbi.BaseAddress + mbi.RegionSize + if nxt <= addr: + break + addr = nxt + return regs + + +def verify_key_for_db(enc_key, db_page1): + """验证enc_key是否能解密这个DB的page 1""" + salt = db_page1[:SALT_SZ] + + # HMAC验证 (最可靠) + mac_salt = bytes(b ^ 0x3a for b in salt) + mac_key = hashlib.pbkdf2_hmac("sha512", enc_key, mac_salt, 2, dklen=KEY_SZ) + hmac_data = db_page1[SALT_SZ: PAGE_SZ - 80 + 16] + stored_hmac = db_page1[PAGE_SZ - 64: PAGE_SZ] + h = hmac_mod.new(mac_key, hmac_data, hashlib.sha512) + h.update(struct.pack(' 96 and hex_len % 2 == 0: + enc_key_hex = hex_str[:64] + salt_hex = hex_str[-32:] + + if salt_hex in remaining_salts: + enc_key = bytes.fromhex(enc_key_hex) + for rel, path, sz, s, page1 in db_files: + if s == salt_hex and verify_key_for_db(enc_key, page1): + key_map[salt_hex] = enc_key_hex + remaining_salts.discard(salt_hex) + dbs = salt_to_dbs[salt_hex] + print(f"\n [FOUND] salt={salt_hex} (long hex {hex_len})") + print(f" enc_key={enc_key_hex}") + print(f" PID={pid} 地址: 0x{addr:016X}") + print(f" 数据库: {', '.join(dbs)}") + break + + if (reg_idx + 1) % 200 == 0: + elapsed = time.time() - t0 + progress = scanned_bytes / total_bytes * 100 if total_bytes else 100 + print( + f" [{progress:.1f}%] {len(key_map)}/{len(salt_to_dbs)} salts matched, " + f"{all_hex_matches} hex patterns, {elapsed:.1f}s" + ) + finally: + kernel32.CloseHandle(h) + + if not remaining_salts: + print(f"\n[+] 所有密钥已找到,跳过剩余进程") + break + + elapsed = time.time() - t0 + print(f"\n扫描完成: {elapsed:.1f}s, {len(pids)} 个进程, {all_hex_matches} hex模式") + + missing_salts = set(salt_to_dbs.keys()) - set(key_map.keys()) + if missing_salts and key_map: + print(f"\n还有 {len(missing_salts)} 个salt未匹配,尝试交叉验证...") + for salt_hex in list(missing_salts): + for rel, path, sz, s, page1 in db_files: + if s == salt_hex: + for known_salt, known_key_hex in key_map.items(): + enc_key = bytes.fromhex(known_key_hex) + if verify_key_for_db(enc_key, page1): + key_map[salt_hex] = known_key_hex + print(f" [CROSS] salt={salt_hex} 可用 key from salt={known_salt}") + missing_salts.discard(salt_hex) + break + + print(f"\n{'=' * 60}") + print(f"结果: {len(key_map)}/{len(salt_to_dbs)} salts 找到密钥") + + result = {} + for rel, path, sz, salt_hex, page1 in db_files: + if salt_hex in key_map: + result[rel] = { + "enc_key": key_map[salt_hex], + "salt": salt_hex, + "size_mb": round(sz / 1024 / 1024, 1) + } + print(f" OK: {rel} ({sz / 1024 / 1024:.1f}MB)") + else: + print(f" MISSING: {rel} (salt={salt_hex})") + + if not result: + print(f"\n[!] 未提取到任何密钥,保留已有的 {OUT_FILE}(如存在)") + raise RuntimeError("未能从任何微信进程中提取到密钥") + + result["_db_dir"] = DB_DIR + with open(OUT_FILE, 'w') as f: + json.dump(result, f, indent=2) + print(f"\n密钥保存到: {OUT_FILE}") + + missing = [rel for rel, path, sz, salt_hex, page1 in db_files if salt_hex not in key_map] + if missing: + print(f"\n未找到密钥的数据库:") + for rel in missing: + print(f" {rel}") + + +if __name__ == '__main__': + try: + main() + except RuntimeError as e: + print(f"\n[ERROR] {e}") + sys.exit(1) diff --git a/key_utils.py b/key_utils.py new file mode 100644 index 0000000..0ba4c20 --- /dev/null +++ b/key_utils.py @@ -0,0 +1,29 @@ +import os + + +def strip_key_metadata(keys): + """移除 all_keys.json 中以下划线开头的元数据字段。""" + return {k: v for k, v in keys.items() if not k.startswith("_")} + + +def key_path_variants(rel_path): + """生成同一路径的多种分隔符表示,兼容 Windows/Linux JSON key。""" + normalized = rel_path.replace("\\", "/") + variants = [] + for candidate in ( + rel_path, + normalized, + normalized.replace("/", "\\"), + normalized.replace("/", os.sep), + ): + if candidate not in variants: + variants.append(candidate) + return variants + + +def get_key_info(keys, rel_path): + """按相对路径查找数据库密钥,自动兼容不同平台分隔符。""" + for candidate in key_path_variants(rel_path): + if candidate in keys and not candidate.startswith("_"): + return keys[candidate] + return None diff --git a/main.py b/main.py index a7884a5..90b16ee 100644 --- a/main.py +++ b/main.py @@ -11,6 +11,8 @@ import sys import functools print = functools.partial(print, flush=True) +from key_utils import strip_key_metadata + def check_wechat_running(): """检查微信是否在运行,返回 True/False""" @@ -37,6 +39,7 @@ def ensure_keys(keys_file, db_dir): print(f" 旧: {saved_dir}") print(f" 新: {db_dir}") keys = {} + keys = strip_key_metadata(keys) if keys: print(f"[+] 已有 {len(keys)} 个数据库密钥") return @@ -60,7 +63,7 @@ def ensure_keys(keys_file, db_dir): keys = json.load(f) except (json.JSONDecodeError, ValueError): keys = {} - if not keys: + if not strip_key_metadata(keys): print("[!] 未能提取到任何密钥") print(" 可能原因:选择了错误的微信数据目录,或微信需要重启") print(" 请检查 config.json 中的 db_dir 是否与当前登录的微信账号匹配") @@ -79,7 +82,7 @@ def main(): # 2. 检查微信进程 if not check_wechat_running(): - print("[!] 未检测到微信进程 (Weixin.exe)") + print(f"[!] 未检测到微信进程 ({cfg.get('wechat_process', 'WeChat')})") print(" 请先启动微信并登录,然后重新运行") sys.exit(1) print("[+] 微信进程运行中") diff --git a/mcp_server.py b/mcp_server.py index f8f2fa8..bea71ad 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -10,8 +10,9 @@ import hmac as hmac_mod from datetime import datetime from Crypto.Cipher import AES from mcp.server.fastmcp import FastMCP -import zstandard as zstd -from decode_image import ImageResolver +import zstandard as zstd +from decode_image import ImageResolver +from key_utils import get_key_info, key_path_variants, strip_key_metadata # ============ 加密常量 ============ PAGE_SZ = 4096 @@ -49,8 +50,8 @@ if not DECODED_IMAGE_DIR: elif not os.path.isabs(DECODED_IMAGE_DIR): DECODED_IMAGE_DIR = os.path.join(SCRIPT_DIR, DECODED_IMAGE_DIR) -with open(KEYS_FILE) as f: - ALL_KEYS = json.load(f) +with open(KEYS_FILE) as f: + ALL_KEYS = strip_key_metadata(json.load(f)) # ============ 解密函数 ============ @@ -149,7 +150,7 @@ class DBCache: tmp_path = info["path"] if not os.path.exists(tmp_path): continue - rel_path = rel_key.replace('/', os.sep) + rel_path = rel_key.replace('\\', os.sep) db_path = os.path.join(DB_DIR, rel_path) wal_path = db_path + "-wal" try: @@ -174,12 +175,13 @@ class DBCache: except OSError: pass - def get(self, rel_key): - if rel_key not in ALL_KEYS: - return None - rel_path = rel_key.replace('/', os.sep) - db_path = os.path.join(DB_DIR, rel_path) - wal_path = db_path + "-wal" + def get(self, rel_key): + key_info = get_key_info(ALL_KEYS, rel_key) + if not key_info: + return None + rel_path = rel_key.replace('\\', '/').replace('/', os.sep) + db_path = os.path.join(DB_DIR, rel_path) + wal_path = db_path + "-wal" if not os.path.exists(db_path): return None @@ -195,8 +197,8 @@ class DBCache: return c_path tmp_path = self._cache_path(rel_key) - enc_key = bytes.fromhex(ALL_KEYS[rel_key]["enc_key"]) - full_decrypt(db_path, tmp_path, enc_key) + enc_key = bytes.fromhex(key_info["enc_key"]) + full_decrypt(db_path, tmp_path, enc_key) if os.path.exists(wal_path): decrypt_wal(wal_path, tmp_path, enc_key) self._cache[rel_key] = (db_mtime, wal_mtime, tmp_path) @@ -248,7 +250,7 @@ def get_contact_names(): pass # 实时解密 - path = _cache.get("contact/contact.db") + path = _cache.get("contact\\contact.db") if path: try: _contact_names, _contact_full = _load_contacts_from(path) @@ -330,11 +332,12 @@ def _parse_message_content(content, local_type, is_group): # 消息 DB 的 rel_keys(排除 fts/resource/media/biz) -MSG_DB_KEYS = sorted([ - k for k in ALL_KEYS - if k.startswith("message/message_") and k.endswith(".db") - and "fts" not in k and "resource" not in k -]) +MSG_DB_KEYS = sorted([ + k for k in ALL_KEYS + if any(v.startswith("message/") for v in key_path_variants(k)) + and any(v.endswith(".db") for v in key_path_variants(k)) + and "fts" not in k and "resource" not in k +]) def _find_msg_table_for_user(username): @@ -379,7 +382,7 @@ def get_recent_sessions(limit: int = 20) -> str: Args: limit: 返回的会话数量,默认20 """ - path = _cache.get("session/session.db") + path = _cache.get("session\\session.db") if not path: return "错误: 无法解密 session.db" @@ -635,7 +638,7 @@ def get_new_messages() -> str: """获取自上次调用以来的新消息。首次调用返回最近的会话状态。""" global _last_check_state - path = _cache.get("session/session.db") + path = _cache.get("session\\session.db") if not path: return "错误: 无法解密 session.db" diff --git a/monitor.py b/monitor.py index 3f503a4..0169b2d 100644 --- a/monitor.py +++ b/monitor.py @@ -7,8 +7,9 @@ session.db 包含每个聊天的最新消息摘要、发送者、时间戳 import hashlib, struct, os, sys, json, time, sqlite3, io import hmac as hmac_mod from datetime import datetime -from Crypto.Cipher import AES -import zstandard as zstd +from Crypto.Cipher import AES +import zstandard as zstd +from key_utils import get_key_info, strip_key_metadata _zstd_dctx = zstd.ZstdDecompressor() @@ -148,13 +149,13 @@ def main(): print("=" * 60) # 加载密钥 - with open(KEYS_FILE) as f: - keys = json.load(f) - - session_key_info = keys.get("session/session.db") - if not session_key_info: - print("[ERROR] 找不到session.db的密钥") - sys.exit(1) + with open(KEYS_FILE) as f: + keys = strip_key_metadata(json.load(f)) + + session_key_info = get_key_info(keys, os.path.join("session", "session.db")) + if not session_key_info: + print("[ERROR] 找不到session.db的密钥") + sys.exit(1) enc_key = bytes.fromhex(session_key_info["enc_key"]) session_db = os.path.join(DB_DIR, "session", "session.db") diff --git a/monitor_web.py b/monitor_web.py index 2c0c81b..5c5b2bb 100644 --- a/monitor_web.py +++ b/monitor_web.py @@ -13,10 +13,11 @@ from datetime import datetime from http.server import HTTPServer, BaseHTTPRequestHandler from socketserver import ThreadingMixIn from Crypto.Cipher import AES -import urllib.parse -import glob as glob_mod -import zstandard as zstd -from decode_image import extract_md5_from_packed_info, decrypt_dat_file, is_v2_format +import urllib.parse +import glob as glob_mod +import zstandard as zstd +from decode_image import extract_md5_from_packed_info, decrypt_dat_file, is_v2_format +from key_utils import get_key_info, strip_key_metadata _zstd_dctx = zstd.ZstdDecompressor() @@ -58,14 +59,14 @@ _emoji_lookup_lock = threading.Lock() _emoji_keys_dict = None # 保存 keys 引用供刷新用 _emoji_last_refresh = 0 -def _build_emoji_lookup(keys_dict): - """从 emoticon.db 构建 emoji md5 → URL 映射(直接解密,不走 cache)""" - global _emoji_lookup, _emoji_keys_dict, _emoji_last_refresh - _emoji_keys_dict = keys_dict - key_info = keys_dict.get("emoticon/emoticon.db") - if not key_info: - print("[emoji] 无 emoticon.db key,跳过", flush=True) - return +def _build_emoji_lookup(keys_dict): + """从 emoticon.db 构建 emoji md5 → URL 映射(直接解密,不走 cache)""" + global _emoji_lookup, _emoji_keys_dict, _emoji_last_refresh + _emoji_keys_dict = keys_dict + key_info = get_key_info(keys_dict, os.path.join("emoticon", "emoticon.db")) + if not key_info: + print("[emoji] 无 emoticon.db key,跳过", flush=True) + return src = os.path.join(DB_DIR, "emoticon", "emoticon.db") if not os.path.exists(src): @@ -252,17 +253,18 @@ class MonitorDBCache: with lock: self._state.pop(rel_key, None) - def get(self, rel_key): - """返回解密后的临时文件路径,mtime 变化时自动重新解密""" - if rel_key not in self.keys: - return None - - lock = self._get_lock(rel_key) - with lock: - enc_key = bytes.fromhex(self.keys[rel_key]["enc_key"]) - rel_path = rel_key.replace('/', os.sep) - db_path = os.path.join(DB_DIR, rel_path) - wal_path = db_path + "-wal" + def get(self, rel_key): + """返回解密后的临时文件路径,mtime 变化时自动重新解密""" + key_info = get_key_info(self.keys, rel_key) + if not key_info: + return None + + lock = self._get_lock(rel_key) + with lock: + enc_key = bytes.fromhex(key_info["enc_key"]) + rel_path = rel_key.replace('\\', '/').replace('/', os.sep) + db_path = os.path.join(DB_DIR, rel_path) + wal_path = db_path + "-wal" if not os.path.exists(db_path): return None @@ -273,8 +275,8 @@ class MonitorDBCache: except OSError: return None - out_name = rel_key.replace('/', '_') - out_path = os.path.join(self.tmp_dir, out_name) + out_name = rel_key.replace('\\', '_').replace('/', '_') + out_path = os.path.join(self.tmp_dir, out_name) prev = self._state.get(rel_key) @@ -313,7 +315,7 @@ def build_username_db_map(): # 先获取每个 DB 的 mtime 用于排序 db_mtimes = {} for i in range(5): - rel_key = f"message/message_{i}.db" + rel_key = f"message\\message_{i}.db" db_path = os.path.join(DB_DIR, "message", f"message_{i}.db") try: db_mtimes[rel_key] = os.path.getmtime(db_path) @@ -326,7 +328,7 @@ def build_username_db_map(): db_path = os.path.join(decrypted_msg_dir, f"message_{i}.db") if not os.path.exists(db_path): continue - rel_key = f"message/message_{i}.db" + rel_key = f"message\\message_{i}.db" try: conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) for row in conn.execute("SELECT user_name FROM Name2Id").fetchall(): @@ -595,7 +597,7 @@ class SessionMonitor: # local_id 不全局唯一,需要同时匹配 create_time file_md5 = None for _try in range(2): - res_path = self.db_cache.get("message/message_resource.db") + res_path = self.db_cache.get("message\\message_resource.db") if not res_path: return None try: @@ -620,7 +622,7 @@ class SessionMonitor: except Exception as e: if 'malformed' in str(e) and _try == 0: print(f" [img] resource DB malformed, 强制刷新...", flush=True) - self.db_cache.invalidate("message/message_resource.db") + self.db_cache.invalidate("message\\message_resource.db") continue print(f" [img] 查询 message_resource 失败: {e}", flush=True) return None @@ -751,13 +753,14 @@ class SessionMonitor: if attempt < 2: time.sleep(delays[attempt]) - def _fresh_decrypt_query(self, db_key, table_name, prev_ts, curr_ts): - """独立解密 message DB 到临时文件并查询,避免共享缓存竞态""" - if db_key not in self.db_cache.keys: - return [] - enc_key = bytes.fromhex(self.db_cache.keys[db_key]["enc_key"]) - rel_path = db_key.replace('/', os.sep) - db_path = os.path.join(DB_DIR, rel_path) + def _fresh_decrypt_query(self, db_key, table_name, prev_ts, curr_ts): + """独立解密 message DB 到临时文件并查询,避免共享缓存竞态""" + key_info = get_key_info(self.db_cache.keys, db_key) + if not key_info: + return [] + enc_key = bytes.fromhex(key_info["enc_key"]) + rel_path = db_key.replace('\\', '/').replace('/', os.sep) + db_path = os.path.join(DB_DIR, rel_path) wal_path = db_path + "-wal" if not os.path.exists(db_path): return [] @@ -1872,13 +1875,17 @@ class ThreadedServer(ThreadingMixIn, HTTPServer): def main(): print("=" * 60, flush=True) print(" 微信实时监听 (WAL增量 + SSE推送)", flush=True) - print("=" * 60, flush=True) - - with open(KEYS_FILE) as f: - keys = json.load(f) - - enc_key = bytes.fromhex(keys["session/session.db"]["enc_key"]) - session_db = os.path.join(DB_DIR, "session", "session.db") + print("=" * 60, flush=True) + + with open(KEYS_FILE) as f: + keys = strip_key_metadata(json.load(f)) + + session_key_info = get_key_info(keys, os.path.join("session", "session.db")) + if not session_key_info: + print("[ERROR] 找不到 session.db 的密钥", flush=True) + sys.exit(1) + enc_key = bytes.fromhex(session_key_info["enc_key"]) + session_db = os.path.join(DB_DIR, "session", "session.db") print("加载联系人...", flush=True) contact_names = load_contact_names() @@ -1909,12 +1916,12 @@ def main(): # 后台预热所有 message DB(图片/emoji 解密必需) def _warmup(): try: - t0 = time.perf_counter() - warmup_keys = ["message/message_resource.db"] - for i in range(5): - k = f"message/message_{i}.db" - if k in keys: - warmup_keys.append(k) + t0 = time.perf_counter() + warmup_keys = ["message\\message_resource.db"] + for i in range(5): + k = f"message\\message_{i}.db" + if get_key_info(keys, k): + warmup_keys.append(k) for k in warmup_keys: t1 = time.perf_counter() try: