feat: add Linux support with cross-platform memory scanning

- Add Linux memory scanner (`find_all_keys_linux.py`) using `/proc/<pid>/mem`,
  same approach as Windows/macOS — no GDB, no function offsets, no restart needed
- Extract Windows-specific code to `find_all_keys_windows.py`
- Make `find_all_keys.py` a platform dispatcher (Windows / Linux)
- Add `key_utils.py` for cross-platform path matching (`/` vs `\` in all_keys.json)
- Update `config.py` with Linux auto-detection of db_storage paths
- Update all consumers (decrypt_db, monitor, monitor_web, mcp_server) to use
  `get_key_info()` for platform-agnostic key lookup

Tested on remote Linux container: 15/15 DBs scanned, decrypted, and verified.
feat/daemon-cli
PeanutSplash 2026-03-06 15:52:06 +08:00 committed by ylytdeng
parent 5879b58239
commit f9c338b48d
12 changed files with 1197 additions and 762 deletions

478
README.md
View File

@ -1,238 +1,240 @@
# WeChat 4.0 Database Decryptor # WeChat 4.x Database Decryptor
微信 4.0 (Windows) 本地数据库解密工具。从运行中的微信进程内存提取加密密钥,解密所有 SQLCipher 4 加密数据库,并提供实时消息监听。 微信 4.0 (Windows、MacOS、Linux) 本地数据库解密工具。从运行中的微信进程内存提取加密密钥,解密所有 SQLCipher 4 加密数据库,并提供实时消息监听。
## 更新日志 ## 更新日志
### 2025-03-03 — 富媒体内容 & 组合消息修复 ### 2025-03-03 — 富媒体内容 & 组合消息修复
- **表情包内联显示**: 自动从 emoticon.db 构建 MD5→CDN 映射支持自定义表情NonStore和商店表情StoreCDN 下载后本地缓存 - **表情包内联显示**: 自动从 emoticon.db 构建 MD5→CDN 映射支持自定义表情NonStore和商店表情StoreCDN 下载后本地缓存
- **富媒体内容解析**: 链接卡片type 49、文件、视频号、小程序、引用回复、位置分享等在 Web UI 中完整渲染 - **富媒体内容解析**: 链接卡片type 49、文件、视频号、小程序、引用回复、位置分享等在 Web UI 中完整渲染
- **文字+图片组合消息不再丢失**: 修复同时发送文字和图片时只显示最后一条的问题(前端去重 key 增加消息类型) - **文字+图片组合消息不再丢失**: 修复同时发送文字和图片时只显示最后一条的问题(前端去重 key 增加消息类型)
- **隐藏消息检测**: 新增 `_check_hidden_messages` 机制session.db 只保存最后一条消息摘要,现在会异步查 message DB 找回同一秒内的其他消息 - **隐藏消息检测**: 新增 `_check_hidden_messages` 机制session.db 只保存最后一条消息摘要,现在会异步查 message DB 找回同一秒内的其他消息
- **MonitorDBCache 线程安全**: 引入 per-key 锁,防止多线程并发解密同一数据库导致文件损坏 - **MonitorDBCache 线程安全**: 引入 per-key 锁,防止多线程并发解密同一数据库导致文件损坏
- **Web UI 改进**: 消息气泡样式优化、群聊发送者显示、图片缩略图点击放大 - **Web UI 改进**: 消息气泡样式优化、群聊发送者显示、图片缩略图点击放大
## 原理 ## 原理
微信 4.0 使用 SQLCipher 4 加密本地数据库: 微信 4.0 使用 SQLCipher 4 加密本地数据库:
- **加密算法**: AES-256-CBC + HMAC-SHA512 - **加密算法**: AES-256-CBC + HMAC-SHA512
- **KDF**: PBKDF2-HMAC-SHA512, 256,000 iterations - **KDF**: PBKDF2-HMAC-SHA512, 256,000 iterations
- **页面大小**: 4096 bytes, reserve = 80 (IV 16 + HMAC 64) - **页面大小**: 4096 bytes, reserve = 80 (IV 16 + HMAC 64)
- **每个数据库有独立的 salt 和 enc_key** - **每个数据库有独立的 salt 和 enc_key**
WCDB (微信的 SQLCipher 封装) 会在进程内存中缓存派生后的 raw key格式为 `x'<64hex_enc_key><32hex_salt>'`。本工具通过扫描进程内存中的这种模式,匹配数据库文件的 salt并通过 HMAC 验证来提取正确的密钥。 WCDB (微信的 SQLCipher 封装) 会在进程内存中缓存派生后的 raw key格式为 `x'<64hex_enc_key><32hex_salt>'`。三个平台Windows / Linux / macOS均可通过扫描进程内存匹配此模式再通过 HMAC 校验 page 1 确认密钥正确性。
## 使用方法 ## 使用方法
### 环境要求 ### 环境要求
- Windows 10/11 - Python 3.10+
- Python 3.10+ - 微信 4.x
- 微信 4.0 (正在运行) - `pip install pycryptodome`
- 需要管理员权限 (读取进程内存)
Windows
### 安装依赖
- Windows 10/11
```bash - 微信正在运行
pip install pycryptodome - 需要管理员权限(读取进程内存)
```
Linux
### 快速开始
- 64-bit Linux
确保微信正在运行,以**管理员权限**执行: - 需要 root 权限或 `CAP_SYS_PTRACE`(读取 `/proc/<pid>/mem`
- `db_dir` 默认类似 `~/Documents/xwechat_files/<wxid>/db_storage`
```bash
python main.py # 实时消息监听 (Web UI) ### 安装依赖
python main.py decrypt # 解密全部数据库到 decrypted/
``` ### 快速开始
程序会自动完成:配置检测 → 密钥提取 → 启动。首次运行会自动检测微信数据目录并生成 `config.json` Windows
如果自动检测失败(例如微信安装在非默认位置),手动创建 `config.json` ```bash
```json python main.py
{ python main.py decrypt
"db_dir": "D:\\xwechat_files\\你的微信ID\\db_storage", ```
"keys_file": "all_keys.json",
"decrypted_dir": "decrypted", Linux
"wechat_process": "Weixin.exe"
} ```bash
``` python3 main.py decrypt
```
`db_dir` 路径可以在 微信设置 → 文件管理 中找到。
程序会自动完成:配置检测 → 内存扫描提取密钥 → 解密。首次运行会自动检测微信数据目录并生成 `config.json`。微信只要在运行中即可,无需重启或重新登录。
### Web UI 说明
如果自动检测失败(例如微信安装在非默认位置),手动创建 `config.json`
`python main.py` 启动后打开 http://localhost:5678 查看实时消息流。 ```json
{
- 30ms 轮询 WAL 文件变化 (mtime) "db_dir": "D:\\xwechat_files\\你的微信ID\\db_storage",
- 检测到变化后全量解密 + WAL patch (~70ms) "keys_file": "all_keys.json",
- SSE 实时推送到浏览器 "decrypted_dir": "decrypted",
- 总延迟约 100ms "wechat_process": "Weixin.exe"
- **图片消息内联预览**(支持旧 XOR / V1 / V2 三种 .dat 加密格式) }
```
### MCP Server (Claude AI 集成)
Linux 版 `config.json` 示例:
将微信数据查询能力接入 [Claude Code](https://claude.ai/claude-code),让 AI 直接读取你的微信消息。
```json
```bash {
pip install mcp "db_dir": "/home/yourname/Documents/xwechat_files/your_wxid/db_storage",
``` "keys_file": "all_keys.json",
"decrypted_dir": "decrypted",
注册到 Claude Code "wechat_process": "wechat"
}
```bash ```
claude mcp add wechat -- python C:\Users\你的用户名\wechat-decrypt\mcp_server.py
``` `db_dir` 路径Windows 可在微信设置 → 文件管理中找到Linux 默认在 `~/Documents/xwechat_files/<wxid>/db_storage`
或手动编辑 `~/.claude.json` ### Web UI 说明
```json `python main.py` 启动后打开 http://localhost:5678 查看实时消息流。
{
"mcpServers": { - 30ms 轮询 WAL 文件变化 (mtime)
"wechat": { - 检测到变化后全量解密 + WAL patch (~70ms)
"type": "stdio", - SSE 实时推送到浏览器
"command": "python", - 总延迟约 100ms
"args": ["C:\\Users\\你的用户名\\wechat-decrypt\\mcp_server.py"] - **图片消息内联预览**(支持旧 XOR / V1 / V2 三种 .dat 加密格式)
}
} ### MCP Server (Claude AI 集成)
}
``` 将微信数据查询能力接入 [Claude Code](https://claude.ai/claude-code),让 AI 直接读取你的微信消息。
注册后在 Claude Code 中即可使用以下工具: ```bash
pip install mcp
| Tool | 功能 | ```
|------|------|
| `get_recent_sessions(limit)` | 最近会话列表(含消息摘要、未读数) | 注册到 Claude Code
| `get_chat_history(chat_name, limit)` | 指定聊天的消息记录(支持模糊匹配名字) |
| `search_messages(keyword, limit)` | 全库搜索消息内容 | ```bash
| `get_contacts(query, limit)` | 搜索/列出联系人 | claude mcp add wechat -- python C:\Users\你的用户名\wechat-decrypt\mcp_server.py
| `get_new_messages()` | 获取自上次调用以来的新消息 | ```
前置条件:需要先运行 `python main.py``python find_all_keys.py` 完成密钥提取。 或手动编辑 `~/.claude.json`
**[查看使用案例 →](USAGE.md)** ```json
{
### 图片解密 (V2 格式) "mcpServers": {
"wechat": {
微信 4.0 (2025-08+) 的 .dat 图片文件使用 AES-128-ECB + XOR 混合加密 (V2 格式)。AES 密钥需要从运行中的微信进程内存中提取: "type": "stdio",
"command": "python",
```bash "args": ["C:\\Users\\你的用户名\\wechat-decrypt\\mcp_server.py"]
# 1. 在微信中打开查看 2-3 张图片(点击看大图) }
# 2. 立即运行密钥提取(持续监控版): }
python find_image_key_monitor.py }
```
# 或单次扫描版:
python find_image_key.py 注册后在 Claude Code 中即可使用以下工具:
```
| Tool | 功能 |
密钥会自动保存到 `config.json``image_aes_key` 字段。之后 `monitor_web.py` 启动时会自动加载密钥,图片消息将显示内联预览。 |------|------|
| `get_recent_sessions(limit)` | 最近会话列表(含消息摘要、未读数) |
> **注意**: AES 密钥仅在微信查看图片时临时加载到内存中。如果扫描未找到密钥,请先在微信中查看几张图片,然后立即重新运行脚本。 | `get_chat_history(chat_name, limit)` | 指定聊天的消息记录(支持模糊匹配名字) |
| `search_messages(keyword, limit)` | 全库搜索消息内容 |
#### macOS 图片解密 | `get_contacts(query, limit)` | 搜索/列出联系人 |
| `get_new_messages()` | 获取自上次调用以来的新消息 |
macOS 上使用 C 版工具(通过 Mach VM API + CommonCrypto性能比 Python 高 100 倍):
前置条件:需要先运行 `python main.py``python find_all_keys.py` 完成密钥提取。
**前置条件:**
- Xcode Command Line Tools: `xcode-select --install` **[查看使用案例 →](USAGE.md)**
- 微信需要 ad-hoc 签名:`sudo codesign --force --deep --sign - /Applications/WeChat.app`
- 开发者模式:系统设置 → 隐私与安全 → 开发者模式 → 开启 ### 图片解密 (V2 格式)
```bash 微信 4.0 (2025-08+) 的 .dat 图片文件使用 AES-128-ECB + XOR 混合加密 (V2 格式)。AES 密钥需要从运行中的微信进程内存中提取:
# 编译
cc -O3 -o find_image_key find_image_key.c -framework Security ```bash
cc -O3 -o decrypt_images decrypt_images.c -framework Security # 1. 在微信中打开查看 2-3 张图片(点击看大图)
# 2. 立即运行密钥提取(持续监控版):
# 1. 持续扫描图片密钥(在微信中浏览图片,扫描器自动捕获密钥) python find_image_key_monitor.py
sudo ./find_image_key
# 或单次扫描版:
# 2. 批量解密所有 V2 图片 python find_image_key.py
./decrypt_images ```
```
密钥会自动保存到 `config.json``image_aes_key` 字段。之后 `monitor_web.py` 启动时会自动加载密钥,图片消息将显示内联预览。
`find_image_key` 会自动发现所有未解密的 V2 图片 pattern持续扫描微信进程内存。当用户在微信中浏览图片时捕获密钥保存到 `image_keys.json`。支持 `--deep` 模式进行逐字节深度扫描。
> **注意**: AES 密钥仅在微信查看图片时临时加载到内存中。如果扫描未找到密钥,请先在微信中查看几张图片,然后立即重新运行脚本。
## 文件说明
## 文件说明
| 文件 | 说明 |
|------|------| | 文件 | 说明 |
| `main.py` | **一键启动入口** — 自动配置、提取密钥、启动服务 | |------|------|
| `config.py` | 配置加载器(自动检测微信数据目录) | | `main.py` | **一键启动入口** — 自动配置、提取密钥、启动服务 |
| `find_all_keys.py` | 从微信进程内存提取所有数据库密钥 | | `config.py` | 配置加载器(自动检测微信数据目录) |
| `decrypt_db.py` | 全量解密所有数据库 | | `find_all_keys.py` | 平台分发入口Windows / Linux |
| `mcp_server.py` | MCP Server让 Claude AI 查询微信数据 | | `find_all_keys_windows.py` | Windows 版内存扫描提 key |
| `monitor_web.py` | 实时消息监听 (Web UI + SSE + 图片预览) | | `find_all_keys_linux.py` | Linux 版内存扫描提 key |
| `monitor.py` | 实时消息监听 (命令行) | | `decrypt_db.py` | 全量解密所有数据库 |
| `decode_image.py` | 图片 .dat 文件解密模块 (XOR / V1 / V2) | | `mcp_server.py` | MCP Server让 Claude AI 查询微信数据 |
| `find_image_key.py` | 从微信进程内存提取图片 AES 密钥 | | `monitor_web.py` | 实时消息监听 (Web UI + SSE + 图片预览) |
| `find_image_key_monitor.py` | 持续监控版密钥提取(推荐) | | `monitor.py` | 实时消息监听 (命令行) |
| `latency_test.py` | 延迟测量诊断工具 | | `decode_image.py` | 图片 .dat 文件解密模块 (XOR / V1 / V2) |
| `find_all_keys_macos.c` | macOS 版内存密钥扫描器 (C, Mach VM API) | | `find_image_key.py` | 从微信进程内存提取图片 AES 密钥 |
| `find_image_key.c` | macOS 版图片密钥扫描器 (C, 持续监控模式) | | `find_image_key_monitor.py` | 持续监控版密钥提取(推荐) |
| `decrypt_images.c` | macOS 版批量图片解密器 (C, 多密钥支持) | | `latency_test.py` | 延迟测量诊断工具 |
| `find_all_keys_macos.c` | macOS 版内存密钥扫描器 (C, Mach VM API) |
## 技术细节
## 技术细节
### WAL 处理
### WAL 处理
微信使用 SQLite WAL 模式WAL 文件是**预分配固定大小** (4MB)。检测变化时:
- 不能用文件大小 (永远不变) 微信使用 SQLite WAL 模式WAL 文件是**预分配固定大小** (4MB)。检测变化时:
- 使用 mtime 检测写入 - 不能用文件大小 (永远不变)
- 解密 WAL frame 时需校验 salt 值,跳过旧周期遗留的 frame - 使用 mtime 检测写入
- 解密 WAL frame 时需校验 salt 值,跳过旧周期遗留的 frame
### 图片 .dat 加密格式
### 图片 .dat 加密格式
微信本地图片 (.dat) 有三种加密格式:
微信本地图片 (.dat) 有三种加密格式:
| 格式 | 时期 | Magic | 加密方式 | 密钥来源 |
|------|------|-------|---------|---------| | 格式 | 时期 | Magic | 加密方式 | 密钥来源 |
| 旧 XOR | ~2025-07 | 无 | 单字节 XOR | 自动检测 (对比 magic bytes) | |------|------|-------|---------|---------|
| V1 | 过渡期 | `07 08 V1 08 07` | AES-ECB + XOR | 固定 key: `cfcd208495d565ef` | | 旧 XOR | ~2025-07 | 无 | 单字节 XOR | 自动检测 (对比 magic bytes) |
| V2 | 2025-08+ | `07 08 V2 08 07` | AES-128-ECB + XOR | 从进程内存提取 | | V1 | 过渡期 | `07 08 V1 08 07` | AES-ECB + XOR | 固定 key: `cfcd208495d565ef` |
| V2 | 2025-08+ | `07 08 V2 08 07` | AES-128-ECB + XOR | 从进程内存提取 |
V2 文件结构: `[6B signature] [4B aes_size LE] [4B xor_size LE] [1B padding]` + `[AES-ECB encrypted] [raw unencrypted] [XOR encrypted]`
V2 文件结构: `[6B signature] [4B aes_size LE] [4B xor_size LE] [1B padding]` + `[AES-ECB encrypted] [raw unencrypted] [XOR encrypted]`
### 数据库结构
### 数据库结构
解密后包含约 26 个数据库:
- `session/session.db` - 会话列表 (最新消息摘要) 解密后包含约 26 个数据库:
- `message/message_*.db` - 聊天记录 - `session/session.db` - 会话列表 (最新消息摘要)
- `contact/contact.db` - 联系人 - `message/message_*.db` - 聊天记录
- `media_*/media_*.db` - 媒体文件索引 - `contact/contact.db` - 联系人
- 其他: head_image, favorite, sns, emoticon 等 - `media_*/media_*.db` - 媒体文件索引
- 其他: head_image, favorite, sns, emoticon 等
## macOS 数据库密钥扫描 (WeChat 4.x)
## macOS 数据库密钥扫描 (WeChat 4.x)
macOS 版微信 4.x 使用 SQLCipher 4 加密本地数据库,密钥格式为 `x'<64hex_key><32hex_salt>'`。C 版扫描器通过 Mach VM API 扫描微信进程内存提取密钥。
### 前置条件 macOS 版微信 4.x 使用 SQLCipher 4 加密本地数据库,密钥格式为 `x'<64hex_key><32hex_salt>'`。C 版扫描器通过 Mach VM API 扫描微信进程内存提取密钥。
- macOS (Apple Silicon / Intel) ### 前置条件
- WeChat 4.x (macOS 版)
- Xcode Command Line Tools: `xcode-select --install` - macOS (Apple Silicon / Intel)
- 微信需要 ad-hoc 签名(或安装了防撤回补丁): - WeChat 4.x (macOS 版)
`sudo codesign --force --deep --sign - /Applications/WeChat.app` - Xcode Command Line Tools: `xcode-select --install`
- 微信需要 ad-hoc 签名(或安装了防撤回补丁):
### 编译和使用 `sudo codesign --force --deep --sign - /Applications/WeChat.app`
```bash ### 编译和使用
# 编译
cc -O2 -o find_all_keys_macos find_all_keys_macos.c -framework Foundation ```bash
# 编译
# 运行(自动查找微信进程、扫描内存、匹配 DB salt cc -O2 -o find_all_keys_macos find_all_keys_macos.c -framework Foundation
sudo ./find_all_keys_macos
# 运行(自动查找微信进程、扫描内存、匹配 DB salt
# 或指定 PID sudo ./find_all_keys_macos
sudo ./find_all_keys_macos <pid>
``` # 或指定 PID
sudo ./find_all_keys_macos <pid>
输出 `all_keys.json`,格式兼容 `decrypt_db.py`,可直接用于解密: ```
```bash 输出 `all_keys.json`,格式兼容 `decrypt_db.py`,可直接用于解密:
python3 decrypt_db.py
``` ```bash
python3 decrypt_db.py
## 免责声明 ```
本工具仅用于学习和研究目的,用于解密**自己的**微信数据。请遵守相关法律法规,不要用于未经授权的数据访问。 ## 免责声明
本工具仅用于学习和研究目的,用于解密**自己的**微信数据。请遵守相关法律法规,不要用于未经授权的数据访问。

View File

@ -1,6 +1,6 @@
{ {
"db_dir": "D:\\xwechat_files\\your_wxid\\db_storage", "db_dir": "D:\\xwechat_files\\your_wxid\\db_storage",
"keys_file": "all_keys.json", "keys_file": "all_keys.json",
"decrypted_dir": "decrypted", "decrypted_dir": "decrypted",
"wechat_process": "Weixin.exe" "wechat_process": "Weixin.exe"
} }

339
config.py
View File

@ -1,138 +1,201 @@
""" """
配置加载器 - config.json 读取路径配置 配置加载器 - config.json 读取路径配置
首次运行时自动检测微信数据目录检测失败则提示手动配置 首次运行时自动检测微信数据目录检测失败则提示手动配置
""" """
import glob import glob
import json import json
import os import os
import sys import platform
import sys
CONFIG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json")
CONFIG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json")
_DEFAULT_TEMPLATE_DIR = r"D:\xwechat_files\your_wxid\db_storage"
_SYSTEM = platform.system().lower()
_DEFAULT = { _DEFAULT_TEMPLATE_DIR = (
"db_dir": _DEFAULT_TEMPLATE_DIR, os.path.expanduser("~/Documents/xwechat_files/your_wxid/db_storage")
"keys_file": "all_keys.json", if _SYSTEM == "linux"
"decrypted_dir": "decrypted", else r"D:\xwechat_files\your_wxid\db_storage"
"decoded_image_dir": "decoded_images", )
"wechat_process": "Weixin.exe",
} _DEFAULT = {
"db_dir": _DEFAULT_TEMPLATE_DIR,
"keys_file": "all_keys.json",
def auto_detect_db_dir(): "decrypted_dir": "decrypted",
"""从微信本地配置自动检测 db_storage 路径。 "decoded_image_dir": "decoded_images",
"wechat_process": "wechat" if _SYSTEM == "linux" else "Weixin.exe",
读取 %APPDATA%\\Tencent\\xwechat\\config\\*.ini }
找到数据存储根目录然后匹配 xwechat_files\\*\\db_storage
"""
appdata = os.environ.get("APPDATA", "") def _choose_candidate(candidates):
config_dir = os.path.join(appdata, "Tencent", "xwechat", "config") """在多个候选目录中选择一个。"""
if not os.path.isdir(config_dir): if len(candidates) == 1:
return None return candidates[0]
if len(candidates) > 1:
# 从 ini 文件中找到有效的目录路径 if not sys.stdin.isatty():
data_roots = [] return candidates[0]
for ini_file in glob.glob(os.path.join(config_dir, "*.ini")): print("[!] 检测到多个微信数据目录(请选择当前正在运行的微信账号):")
try: for i, c in enumerate(candidates, 1):
# 微信 ini 可能是 utf-8 或 gbk 编码(中文路径) print(f" {i}. {c}")
content = None print(" 0. 跳过,稍后手动配置")
for enc in ("utf-8", "gbk"): try:
try: while True:
with open(ini_file, "r", encoding=enc) as f: choice = input("请选择 [0-{}]: ".format(len(candidates))).strip()
content = f.read(1024).strip() if choice == "0":
break return None
except UnicodeDecodeError: if choice.isdigit() and 1 <= int(choice) <= len(candidates):
continue return candidates[int(choice) - 1]
if not content or any(c in content for c in "\n\r\x00"): print(" 无效输入,请重新选择")
continue except (EOFError, KeyboardInterrupt):
if os.path.isdir(content): print()
data_roots.append(content) return None
except OSError: return None
continue
# 在每个根目录下搜索 xwechat_files\*\db_storage def _auto_detect_db_dir_windows():
seen = set() """从微信本地配置自动检测 Windows db_storage 路径。
candidates = []
for root in data_roots: 读取 %APPDATA%\\Tencent\\xwechat\\config\\*.ini
pattern = os.path.join(root, "xwechat_files", "*", "db_storage") 找到数据存储根目录然后匹配 xwechat_files\\*\\db_storage
for match in glob.glob(pattern): """
normalized = os.path.normcase(os.path.normpath(match)) appdata = os.environ.get("APPDATA", "")
if os.path.isdir(match) and normalized not in seen: config_dir = os.path.join(appdata, "Tencent", "xwechat", "config")
seen.add(normalized) if not os.path.isdir(config_dir):
candidates.append(match) return None
if len(candidates) == 1: # 从 ini 文件中找到有效的目录路径
return candidates[0] data_roots = []
if len(candidates) > 1: for ini_file in glob.glob(os.path.join(config_dir, "*.ini")):
# 非交互环境MCP、无 stdin 管道等)直接取第一个 try:
if not sys.stdin.isatty(): # 微信 ini 可能是 utf-8 或 gbk 编码(中文路径)
return candidates[0] content = None
print("[!] 检测到多个微信数据目录(请选择当前正在运行的微信账号):") for enc in ("utf-8", "gbk"):
for i, c in enumerate(candidates, 1): try:
print(f" {i}. {c}") with open(ini_file, "r", encoding=enc) as f:
print(" 0. 跳过,稍后手动配置") content = f.read(1024).strip()
try: break
while True: except UnicodeDecodeError:
choice = input("请选择 [0-{}]: ".format(len(candidates))).strip() continue
if choice == "0": if not content or any(c in content for c in "\n\r\x00"):
return None continue
if choice.isdigit() and 1 <= int(choice) <= len(candidates): if os.path.isdir(content):
return candidates[int(choice) - 1] data_roots.append(content)
print(" 无效输入,请重新选择") except OSError:
except (EOFError, KeyboardInterrupt): continue
print()
return None # 在每个根目录下搜索 xwechat_files\*\db_storage
return None seen = set()
candidates = []
for root in data_roots:
def load_config(): pattern = os.path.join(root, "xwechat_files", "*", "db_storage")
cfg = {} for match in glob.glob(pattern):
if os.path.exists(CONFIG_FILE): normalized = os.path.normcase(os.path.normpath(match))
try: if os.path.isdir(match) and normalized not in seen:
with open(CONFIG_FILE) as f: seen.add(normalized)
cfg = json.load(f) candidates.append(match)
except json.JSONDecodeError:
print(f"[!] {CONFIG_FILE} 格式损坏,将使用默认配置") return _choose_candidate(candidates)
cfg = {}
# db_dir 缺失或仍为模板值时,尝试自动检测 def _auto_detect_db_dir_linux():
db_dir = cfg.get("db_dir", "") """自动检测 Linux 微信 db_storage 路径。"""
if not db_dir or db_dir == _DEFAULT_TEMPLATE_DIR or "your_wxid" in db_dir: seen = set()
detected = auto_detect_db_dir() candidates = []
if detected: search_roots = {
print(f"[+] 自动检测到微信数据目录: {detected}") os.path.expanduser("~/Documents/xwechat_files"),
# 合并默认值并保存 }
cfg = {**_DEFAULT, **cfg, "db_dir": detected}
with open(CONFIG_FILE, "w") as f: if os.path.isdir("/home"):
json.dump(cfg, f, indent=4, ensure_ascii=False) for entry in os.listdir("/home"):
print(f"[+] 已保存到: {CONFIG_FILE}") search_roots.add(os.path.join("/home", entry, "Documents", "xwechat_files"))
else:
if not os.path.exists(CONFIG_FILE): for root in search_roots:
with open(CONFIG_FILE, "w") as f: if not os.path.isdir(root):
json.dump(_DEFAULT, f, indent=4) continue
print(f"[!] 未能自动检测微信数据目录") pattern = os.path.join(root, "*", "db_storage")
print(f" 请手动编辑 {CONFIG_FILE} 中的 db_dir 字段") for match in glob.glob(pattern):
print(f" 路径可在 微信设置 → 文件管理 中找到") normalized = os.path.normcase(os.path.normpath(match))
sys.exit(1) if os.path.isdir(match) and normalized not in seen:
seen.add(normalized)
# 将相对路径转为绝对路径 candidates.append(match)
base = os.path.dirname(os.path.abspath(__file__))
for key in ("keys_file", "decrypted_dir", "decoded_image_dir"): old_path = os.path.expanduser("~/.local/share/weixin/data/db_storage")
if key in cfg and not os.path.isabs(cfg[key]): if os.path.isdir(old_path):
cfg[key] = os.path.join(base, cfg[key]) normalized = os.path.normcase(os.path.normpath(old_path))
if normalized not in seen:
# 自动推导微信数据根目录db_dir 的上级目录) candidates.append(old_path)
# db_dir 格式: D:\xwechat_files\<wxid>\db_storage
# base_dir 格式: D:\xwechat_files\<wxid> # Linux 优先使用最近活跃账号:按 message 目录 mtime 降序
db_dir = cfg.get("db_dir", "") def _mtime(path):
if db_dir and os.path.basename(db_dir) == "db_storage": msg_dir = os.path.join(path, "message")
cfg["wechat_base_dir"] = os.path.dirname(db_dir) target = msg_dir if os.path.isdir(msg_dir) else path
else: try:
cfg["wechat_base_dir"] = db_dir return os.path.getmtime(target)
except OSError:
# decoded_image_dir 默认值 return 0
if "decoded_image_dir" not in cfg:
cfg["decoded_image_dir"] = os.path.join(base, "decoded_images") candidates.sort(key=_mtime, reverse=True)
return _choose_candidate(candidates)
return cfg
def auto_detect_db_dir():
if _SYSTEM == "windows":
return _auto_detect_db_dir_windows()
if _SYSTEM == "linux":
return _auto_detect_db_dir_linux()
return None
def load_config():
cfg = {}
if os.path.exists(CONFIG_FILE):
try:
with open(CONFIG_FILE) as f:
cfg = json.load(f)
except json.JSONDecodeError:
print(f"[!] {CONFIG_FILE} 格式损坏,将使用默认配置")
cfg = {}
cfg = {**_DEFAULT, **cfg}
# db_dir 缺失或仍为模板值时,尝试自动检测
db_dir = cfg.get("db_dir", "")
if not db_dir or db_dir == _DEFAULT_TEMPLATE_DIR or "your_wxid" in db_dir:
detected = auto_detect_db_dir()
if detected:
print(f"[+] 自动检测到微信数据目录: {detected}")
# 合并默认值并保存
cfg = {**_DEFAULT, **cfg, "db_dir": detected}
with open(CONFIG_FILE, "w") as f:
json.dump(cfg, f, indent=4, ensure_ascii=False)
print(f"[+] 已保存到: {CONFIG_FILE}")
else:
if not os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, "w") as f:
json.dump(_DEFAULT, f, indent=4, ensure_ascii=False)
print(f"[!] 未能自动检测微信数据目录")
print(f" 请手动编辑 {CONFIG_FILE} 中的 db_dir 字段")
if _SYSTEM == "linux":
print(" Linux 默认路径类似: ~/Documents/xwechat_files/<wxid>/db_storage")
else:
print(f" 路径可在 微信设置 → 文件管理 中找到")
sys.exit(1)
# 将相对路径转为绝对路径
base = os.path.dirname(os.path.abspath(__file__))
for key in ("keys_file", "decrypted_dir", "decoded_image_dir"):
if key in cfg and not os.path.isabs(cfg[key]):
cfg[key] = os.path.join(base, cfg[key])
# 自动推导微信数据根目录db_dir 的上级目录)
# db_dir 格式: D:\xwechat_files\<wxid>\db_storage
# base_dir 格式: D:\xwechat_files\<wxid>
db_dir = cfg.get("db_dir", "")
if db_dir and os.path.basename(db_dir) == "db_storage":
cfg["wechat_base_dir"] = os.path.dirname(db_dir)
else:
cfg["wechat_base_dir"] = db_dir
# decoded_image_dir 默认值
if "decoded_image_dir" not in cfg:
cfg["decoded_image_dir"] = os.path.join(base, "decoded_images")
return cfg

View File

@ -7,7 +7,7 @@ WeChat 4.0 数据库解密器
""" """
import hashlib, struct, os, sys, json import hashlib, struct, os, sys, json
import hmac as hmac_mod import hmac as hmac_mod
from Crypto.Cipher import AES from Crypto.Cipher import AES
import functools import functools
print = functools.partial(print, flush=True) print = functools.partial(print, flush=True)
@ -20,11 +20,12 @@ HMAC_SZ = 64
RESERVE_SZ = 80 # IV(16) + HMAC(64) RESERVE_SZ = 80 # IV(16) + HMAC(64)
SQLITE_HDR = b'SQLite format 3\x00' SQLITE_HDR = b'SQLite format 3\x00'
from config import load_config from config import load_config
_cfg = load_config() from key_utils import get_key_info, strip_key_metadata
DB_DIR = _cfg["db_dir"] _cfg = load_config()
OUT_DIR = _cfg["decrypted_dir"] DB_DIR = _cfg["db_dir"]
KEYS_FILE = _cfg["keys_file"] OUT_DIR = _cfg["decrypted_dir"]
KEYS_FILE = _cfg["keys_file"]
def derive_mac_key(enc_key, salt): def derive_mac_key(enc_key, salt):
@ -115,13 +116,13 @@ def main():
print("请先运行 find_all_keys.py") print("请先运行 find_all_keys.py")
sys.exit(1) sys.exit(1)
with open(KEYS_FILE) as f: with open(KEYS_FILE) as f:
keys = json.load(f) keys = json.load(f)
keys.pop("_db_dir", None) keys = strip_key_metadata(keys)
print(f"\n加载 {len(keys)} 个数据库密钥") print(f"\n加载 {len(keys)} 个数据库密钥")
print(f"输出目录: {OUT_DIR}") print(f"输出目录: {OUT_DIR}")
os.makedirs(OUT_DIR, exist_ok=True) os.makedirs(OUT_DIR, exist_ok=True)
# 收集所有DB文件 # 收集所有DB文件
db_files = [] db_files = []
@ -129,7 +130,7 @@ def main():
for f in files: for f in files:
if f.endswith('.db') and not f.endswith('-wal') and not f.endswith('-shm'): if f.endswith('.db') and not f.endswith('-wal') and not f.endswith('-shm'):
path = os.path.join(root, f) path = os.path.join(root, f)
rel = os.path.relpath(path, DB_DIR).replace('\\', '/') rel = os.path.relpath(path, DB_DIR)
sz = os.path.getsize(path) sz = os.path.getsize(path)
db_files.append((rel, path, sz)) db_files.append((rel, path, sz))
@ -141,16 +142,15 @@ def main():
failed = 0 failed = 0
total_bytes = 0 total_bytes = 0
for rel, path, sz in db_files: for rel, path, sz in db_files:
# 统一用正斜杠查找key key_info = get_key_info(keys, rel)
rel_key = rel.replace('\\', '/') if not key_info:
if rel_key not in keys: print(f"SKIP: {rel} (无密钥)")
print(f"SKIP: {rel} (无密钥)") failed += 1
failed += 1 continue
continue
enc_key = bytes.fromhex(key_info["enc_key"])
enc_key = bytes.fromhex(keys[rel_key]["enc_key"]) out_path = os.path.join(OUT_DIR, rel)
out_path = os.path.join(OUT_DIR, rel)
print(f"解密: {rel} ({sz/1024/1024:.1f}MB) ...", end=" ") print(f"解密: {rel} ({sz/1024/1024:.1f}MB) ...", end=" ")

View File

@ -1,275 +1,29 @@
""" import platform
从微信进程内存中提取所有数据库的缓存raw key import sys
WCDB为每个DB缓存: x'<64hex_enc_key><32hex_salt>'
salt嵌在hex字符串中可以直接匹配DB文件的salt def _load_impl():
""" system = platform.system().lower()
import ctypes if system == "windows":
import ctypes.wintypes as wt import find_all_keys_windows as impl
import struct, os, sys, hashlib, time, re, json return impl
import hmac as hmac_mod if system == "linux":
from Crypto.Cipher import AES import find_all_keys_linux as impl
return impl
import functools raise RuntimeError(f"当前平台暂不支持通过 find_all_keys.py 提取密钥: {platform.system()}")
print = functools.partial(print, flush=True)
kernel32 = ctypes.windll.kernel32 def get_pids():
MEM_COMMIT = 0x1000 return _load_impl().get_pids()
READABLE = {0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80}
PAGE_SZ = 4096
KEY_SZ = 32 def main():
SALT_SZ = 16 return _load_impl().main()
from config import load_config
_cfg = load_config() if __name__ == "__main__":
DB_DIR = _cfg["db_dir"] try:
OUT_FILE = _cfg["keys_file"] main()
except RuntimeError as exc:
class MBI(ctypes.Structure): print(f"\n[ERROR] {exc}")
_fields_ = [ sys.exit(1)
("BaseAddress", ctypes.c_uint64), ("AllocationBase", ctypes.c_uint64),
("AllocationProtect", wt.DWORD), ("_pad1", wt.DWORD),
("RegionSize", ctypes.c_uint64), ("State", wt.DWORD),
("Protect", wt.DWORD), ("Type", wt.DWORD), ("_pad2", wt.DWORD),
]
def get_pids():
"""返回所有 Weixin.exe 进程的 (pid, mem_kb) 列表,按内存降序"""
import subprocess
r = subprocess.run(["tasklist","/FI","IMAGENAME eq Weixin.exe","/FO","CSV","/NH"],
capture_output=True, text=True)
pids = []
for line in r.stdout.strip().split('\n'):
if not line.strip(): continue
p = line.strip('"').split('","')
if len(p)>=5:
pid=int(p[1]); mem=int(p[4].replace(',','').replace(' K','').strip() or '0')
pids.append((pid, mem))
if not pids: raise RuntimeError("Weixin.exe 未运行")
pids.sort(key=lambda x: x[1], reverse=True)
for pid, mem in pids:
print(f"[+] Weixin.exe PID={pid} ({mem//1024}MB)")
return pids
def read_mem(h, addr, sz):
buf = ctypes.create_string_buffer(sz)
n = ctypes.c_size_t(0)
if kernel32.ReadProcessMemory(h, ctypes.c_uint64(addr), buf, sz, ctypes.byref(n)):
return buf.raw[:n.value]
return None
def enum_regions(h):
regs = []
addr = 0
mbi = MBI()
while addr < 0x7FFFFFFFFFFF:
if kernel32.VirtualQueryEx(h, ctypes.c_uint64(addr), ctypes.byref(mbi), ctypes.sizeof(mbi))==0: break
if mbi.State==MEM_COMMIT and mbi.Protect in READABLE and 0<mbi.RegionSize<500*1024*1024:
regs.append((mbi.BaseAddress, mbi.RegionSize))
nxt = mbi.BaseAddress + mbi.RegionSize
if nxt<=addr: break
addr = nxt
return regs
def verify_key_for_db(enc_key, db_page1):
"""验证enc_key是否能解密这个DB的page 1"""
salt = db_page1[:SALT_SZ]
iv = db_page1[PAGE_SZ - 80 : PAGE_SZ - 64]
encrypted = db_page1[SALT_SZ : PAGE_SZ - 80]
# HMAC验证 (最可靠)
mac_salt = bytes(b ^ 0x3a for b in salt)
mac_key = hashlib.pbkdf2_hmac("sha512", enc_key, mac_salt, 2, dklen=KEY_SZ)
hmac_data = db_page1[SALT_SZ : PAGE_SZ - 80 + 16]
stored_hmac = db_page1[PAGE_SZ - 64 : PAGE_SZ]
h = hmac_mod.new(mac_key, hmac_data, hashlib.sha512)
h.update(struct.pack('<I', 1))
return h.digest() == stored_hmac
def main():
print("=" * 60)
print(" 提取所有微信数据库密钥")
print("=" * 60)
# 1. 收集所有DB文件及其salt
db_files = []
salt_to_dbs = {} # salt_hex -> [(rel_path, db_page1), ...]
for root, dirs, files in os.walk(DB_DIR):
for f in files:
if f.endswith('.db') and not f.endswith('-wal') and not f.endswith('-shm'):
path = os.path.join(root, f)
rel = os.path.relpath(path, DB_DIR).replace('\\', '/')
sz = os.path.getsize(path)
if sz < PAGE_SZ:
continue
with open(path, 'rb') as fh:
page1 = fh.read(PAGE_SZ)
salt = page1[:SALT_SZ].hex()
db_files.append((rel, path, sz, salt, page1))
if salt not in salt_to_dbs:
salt_to_dbs[salt] = []
salt_to_dbs[salt].append(rel)
print(f"\n找到 {len(db_files)} 个数据库, {len(salt_to_dbs)} 个不同的salt")
for salt_hex, dbs in sorted(salt_to_dbs.items(), key=lambda x: len(x[1]), reverse=True):
print(f" salt {salt_hex}: {', '.join(dbs)}")
# 2. 打开所有微信进程
pids = get_pids()
hex_re = re.compile(b"x'([0-9a-fA-F]{64,192})'")
key_map = {} # salt_hex -> enc_key_hex
remaining_salts = set(salt_to_dbs.keys())
all_hex_matches = 0
t0 = time.time()
for proc_idx, (pid, mem) in enumerate(pids):
h = kernel32.OpenProcess(0x0010 | 0x0400, False, pid)
if not h:
print(f"[WARN] 无法打开进程 PID={pid},跳过")
continue
try:
regions = enum_regions(h)
total_bytes = sum(s for _,s in regions)
total_mb = total_bytes/1024/1024
print(f"\n[*] 扫描 PID={pid} ({total_mb:.0f}MB, {len(regions)} 区域)")
scanned_bytes = 0
for reg_idx, (base, size) in enumerate(regions):
data = read_mem(h, base, size)
scanned_bytes += size
if not data: continue
for m in hex_re.finditer(data):
hex_str = m.group(1).decode()
addr = base + m.start()
all_hex_matches += 1
hex_len = len(hex_str)
if hex_len == 96:
enc_key_hex = hex_str[:64]
salt_hex = hex_str[64:]
if salt_hex in remaining_salts:
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, s, page1 in db_files:
if s == salt_hex:
if verify_key_for_db(enc_key, page1):
key_map[salt_hex] = enc_key_hex
remaining_salts.discard(salt_hex)
dbs = salt_to_dbs[salt_hex]
print(f"\n [FOUND] salt={salt_hex}")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
elif hex_len == 64:
if not remaining_salts:
continue
enc_key_hex = hex_str
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, salt_hex_db, page1 in db_files:
if salt_hex_db in remaining_salts:
if verify_key_for_db(enc_key, page1):
key_map[salt_hex_db] = enc_key_hex
remaining_salts.discard(salt_hex_db)
dbs = salt_to_dbs[salt_hex_db]
print(f"\n [FOUND] salt={salt_hex_db}")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
elif hex_len > 96 and hex_len % 2 == 0:
enc_key_hex = hex_str[:64]
salt_hex = hex_str[-32:]
if salt_hex in remaining_salts:
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, s, page1 in db_files:
if s == salt_hex:
if verify_key_for_db(enc_key, page1):
key_map[salt_hex] = enc_key_hex
remaining_salts.discard(salt_hex)
dbs = salt_to_dbs[salt_hex]
print(f"\n [FOUND] salt={salt_hex} (long hex {hex_len})")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
if (reg_idx + 1) % 200 == 0:
elapsed = time.time() - t0
progress = scanned_bytes / total_bytes * 100 if total_bytes else 100
print(f" [{progress:.1f}%] {len(key_map)}/{len(salt_to_dbs)} salts matched, "
f"{all_hex_matches} hex patterns, {elapsed:.1f}s")
finally:
kernel32.CloseHandle(h)
# 所有 salt 都找到了就提前退出
if not remaining_salts:
print(f"\n[+] 所有密钥已找到,跳过剩余进程")
break
elapsed = time.time() - t0
print(f"\n扫描完成: {elapsed:.1f}s, {len(pids)} 个进程, {all_hex_matches} hex模式")
# 4. 如果有未找到的salt用已找到的key做交叉验证
# (WCDB有时对同一passphrase的不同DB用同一enc_key如果salt相同)
missing_salts = set(salt_to_dbs.keys()) - set(key_map.keys())
if missing_salts and key_map:
print(f"\n还有 {len(missing_salts)} 个salt未匹配尝试交叉验证...")
for salt_hex in list(missing_salts):
for rel, path, sz, s, page1 in db_files:
if s == salt_hex:
for known_salt, known_key_hex in key_map.items():
enc_key = bytes.fromhex(known_key_hex)
if verify_key_for_db(enc_key, page1):
key_map[salt_hex] = known_key_hex
print(f" [CROSS] salt={salt_hex} 可用 key from salt={known_salt}")
missing_salts.discard(salt_hex)
break
# 5. 输出结果
print(f"\n{'='*60}")
print(f"结果: {len(key_map)}/{len(salt_to_dbs)} salts 找到密钥")
result = {}
for rel, path, sz, salt_hex, page1 in db_files:
if salt_hex in key_map:
result[rel] = {
"enc_key": key_map[salt_hex],
"salt": salt_hex,
"size_mb": round(sz/1024/1024, 1)
}
print(f" OK: {rel} ({sz/1024/1024:.1f}MB)")
else:
print(f" MISSING: {rel} (salt={salt_hex})")
if not result:
print(f"\n[!] 未提取到任何密钥,保留已有的 {OUT_FILE}(如存在)")
raise RuntimeError("未能从任何微信进程中提取到密钥")
# 写入密钥并记录对应的 db_dir防止切换账号后误复用
result["_db_dir"] = DB_DIR
with open(OUT_FILE, 'w') as f:
json.dump(result, f, indent=2)
print(f"\n密钥保存到: {OUT_FILE}")
missing = [rel for rel, path, sz, salt_hex, page1 in db_files if salt_hex not in key_map]
if missing:
print(f"\n未找到密钥的数据库:")
for rel in missing:
print(f" {rel}")
if __name__ == '__main__':
try:
main()
except RuntimeError as e:
print(f"\n[ERROR] {e}")
sys.exit(1)

View File

@ -0,0 +1,297 @@
"""
Linux 版微信数据库密钥提取
原理: Windows/macOS 相同 扫描微信进程内存查找
WCDB 缓存的 x'<64hex_enc_key><32hex_salt>' 模式
通过匹配数据库 salt + HMAC 校验确认密钥
读取方式: /proc/<pid>/maps + /proc/<pid>/mem
权限要求: root CAP_SYS_PTRACE
"""
import functools
import hashlib
import hmac as hmac_mod
import json
import os
import re
import struct
import sys
import time
print = functools.partial(print, flush=True)
PAGE_SZ = 4096
KEY_SZ = 32
SALT_SZ = 16
from config import load_config
_cfg = load_config()
DB_DIR = _cfg["db_dir"]
OUT_FILE = _cfg["keys_file"]
def _safe_readlink(path):
try:
return os.path.realpath(os.readlink(path))
except OSError:
return ""
def get_pids():
"""返回所有疑似微信主进程的 (pid, rss_kb) 列表,按内存降序。"""
pids = []
for pid_str in os.listdir("/proc"):
if not pid_str.isdigit():
continue
pid = int(pid_str)
try:
with open(f"/proc/{pid}/comm") as f:
comm = f.read().strip()
with open(f"/proc/{pid}/statm") as f:
rss_pages = int(f.read().split()[1])
rss_kb = rss_pages * 4
exe_name = os.path.basename(_safe_readlink(f"/proc/{pid}/exe")) or comm
haystack = " ".join((comm, exe_name)).lower()
if "wechat" not in haystack and "weixin" not in haystack:
continue
pids.append((pid, rss_kb))
except (PermissionError, FileNotFoundError, ProcessLookupError):
continue
if not pids:
raise RuntimeError("未检测到 Linux 微信进程")
pids.sort(key=lambda item: item[1], reverse=True)
for pid, rss_kb in pids:
exe_path = _safe_readlink(f"/proc/{pid}/exe")
print(f"[+] WeChat PID={pid} ({rss_kb // 1024}MB) {exe_path}")
return pids
def _get_readable_regions(pid):
"""解析 /proc/<pid>/maps返回可读内存区域列表。"""
regions = []
with open(f"/proc/{pid}/maps") as f:
for line in f:
parts = line.split()
if len(parts) < 2:
continue
if "r" not in parts[1]:
continue
start_s, end_s = parts[0].split("-")
start = int(start_s, 16)
size = int(end_s, 16) - start
if 0 < size < 500 * 1024 * 1024:
regions.append((start, size))
return regions
def _collect_db_files():
db_files = []
salt_to_dbs = {}
for root, dirs, files in os.walk(DB_DIR):
for name in files:
if not name.endswith(".db") or name.endswith("-wal") or name.endswith("-shm"):
continue
path = os.path.join(root, name)
size = os.path.getsize(path)
if size < PAGE_SZ:
continue
with open(path, "rb") as f:
page1 = f.read(PAGE_SZ)
rel = os.path.relpath(path, DB_DIR)
salt = page1[:SALT_SZ].hex()
db_files.append((rel, path, size, salt, page1))
salt_to_dbs.setdefault(salt, []).append(rel)
return db_files, salt_to_dbs
def _verify_enc_key(enc_key, db_page1):
salt = db_page1[:SALT_SZ]
mac_salt = bytes(b ^ 0x3A for b in salt)
mac_key = hashlib.pbkdf2_hmac("sha512", enc_key, mac_salt, 2, dklen=KEY_SZ)
hmac_data = db_page1[SALT_SZ: PAGE_SZ - 80 + 16]
stored_hmac = db_page1[PAGE_SZ - 64: PAGE_SZ]
hm = hmac_mod.new(mac_key, hmac_data, hashlib.sha512)
hm.update(struct.pack("<I", 1))
return hm.digest() == stored_hmac
def main():
print("=" * 60)
print(" 提取 Linux 微信数据库密钥(内存扫描)")
print("=" * 60)
# 1. 收集 DB 文件和 salt
db_files, salt_to_dbs = _collect_db_files()
if not db_files:
raise RuntimeError(f"{DB_DIR} 未找到可解密的 .db 文件")
print(f"\n找到 {len(db_files)} 个数据库, {len(salt_to_dbs)} 个不同的 salt")
for salt_hex, dbs in sorted(salt_to_dbs.items(), key=lambda x: len(x[1]), reverse=True):
print(f" salt {salt_hex}: {', '.join(dbs)}")
# 2. 找到微信进程
pids = get_pids()
hex_re = re.compile(rb"x'([0-9a-fA-F]{64,192})'")
key_map = {} # salt_hex -> enc_key_hex
remaining_salts = set(salt_to_dbs.keys())
all_hex_matches = 0
t0 = time.time()
for pid, rss_kb in pids:
try:
regions = _get_readable_regions(pid)
except PermissionError:
print(f"[WARN] 无法读取 /proc/{pid}/maps权限不足跳过")
continue
total_bytes = sum(s for _, s in regions)
total_mb = total_bytes / 1024 / 1024
print(f"\n[*] 扫描 PID={pid} ({total_mb:.0f}MB, {len(regions)} 区域)")
scanned_bytes = 0
try:
mem = open(f"/proc/{pid}/mem", "rb")
except PermissionError:
print(f"[WARN] 无法打开 /proc/{pid}/mem权限不足跳过")
continue
try:
for reg_idx, (base, size) in enumerate(regions):
try:
mem.seek(base)
data = mem.read(size)
except (OSError, ValueError):
continue
scanned_bytes += len(data)
for m in hex_re.finditer(data):
hex_str = m.group(1).decode()
addr = base + m.start()
all_hex_matches += 1
hex_len = len(hex_str)
if hex_len == 96:
enc_key_hex = hex_str[:64]
salt_hex = hex_str[64:]
if salt_hex in remaining_salts:
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, s, page1 in db_files:
if s == salt_hex and _verify_enc_key(enc_key, page1):
key_map[salt_hex] = enc_key_hex
remaining_salts.discard(salt_hex)
dbs = salt_to_dbs[salt_hex]
print(f"\n [FOUND] salt={salt_hex}")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
elif hex_len == 64:
if not remaining_salts:
continue
enc_key_hex = hex_str
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, salt_hex_db, page1 in db_files:
if salt_hex_db in remaining_salts and _verify_enc_key(enc_key, page1):
key_map[salt_hex_db] = enc_key_hex
remaining_salts.discard(salt_hex_db)
dbs = salt_to_dbs[salt_hex_db]
print(f"\n [FOUND] salt={salt_hex_db}")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
elif hex_len > 96 and hex_len % 2 == 0:
enc_key_hex = hex_str[:64]
salt_hex = hex_str[-32:]
if salt_hex in remaining_salts:
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, s, page1 in db_files:
if s == salt_hex and _verify_enc_key(enc_key, page1):
key_map[salt_hex] = enc_key_hex
remaining_salts.discard(salt_hex)
dbs = salt_to_dbs[salt_hex]
print(f"\n [FOUND] salt={salt_hex} (long hex {hex_len})")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
if (reg_idx + 1) % 200 == 0:
elapsed = time.time() - t0
progress = scanned_bytes / total_bytes * 100 if total_bytes else 100
print(
f" [{progress:.1f}%] {len(key_map)}/{len(salt_to_dbs)} salts matched, "
f"{all_hex_matches} hex patterns, {elapsed:.1f}s"
)
finally:
mem.close()
if not remaining_salts:
print(f"\n[+] 所有密钥已找到,跳过剩余进程")
break
elapsed = time.time() - t0
print(f"\n扫描完成: {elapsed:.1f}s, {len(pids)} 个进程, {all_hex_matches} hex 模式")
# 交叉验证:用已找到的 key 尝试未匹配的 salt
missing_salts = set(salt_to_dbs.keys()) - set(key_map.keys())
if missing_salts and key_map:
print(f"\n还有 {len(missing_salts)} 个 salt 未匹配,尝试交叉验证...")
for salt_hex in list(missing_salts):
for rel, path, sz, s, page1 in db_files:
if s == salt_hex:
for known_salt, known_key_hex in key_map.items():
enc_key = bytes.fromhex(known_key_hex)
if _verify_enc_key(enc_key, page1):
key_map[salt_hex] = known_key_hex
print(f" [CROSS] salt={salt_hex} 可用 key from salt={known_salt}")
missing_salts.discard(salt_hex)
break
# 输出结果
print(f"\n{'=' * 60}")
print(f"结果: {len(key_map)}/{len(salt_to_dbs)} salts 找到密钥")
result = {}
for rel, path, sz, salt_hex, page1 in db_files:
if salt_hex in key_map:
result[rel] = {
"enc_key": key_map[salt_hex],
"salt": salt_hex,
"size_mb": round(sz / 1024 / 1024, 1)
}
print(f" OK: {rel} ({sz / 1024 / 1024:.1f}MB)")
else:
print(f" MISSING: {rel} (salt={salt_hex})")
if not result:
print(f"\n[!] 未提取到任何密钥,保留已有的 {OUT_FILE}(如存在)")
raise RuntimeError("未能从任何微信进程中提取到密钥")
result["_db_dir"] = DB_DIR
result["_platform"] = "linux"
result["_key_source"] = "memory_scan"
with open(OUT_FILE, 'w', encoding='utf-8') as f:
json.dump(result, f, indent=2, ensure_ascii=False)
print(f"\n密钥保存到: {OUT_FILE}")
missing = [rel for rel, path, sz, salt_hex, page1 in db_files if salt_hex not in key_map]
if missing:
print(f"\n未找到密钥的数据库:")
for rel in missing:
print(f" {rel}")
if __name__ == "__main__":
try:
main()
except RuntimeError as exc:
print(f"\n[ERROR] {exc}")
sys.exit(1)

View File

@ -0,0 +1,276 @@
"""
从微信进程内存中提取所有数据库的缓存raw key
WCDB为每个DB缓存: x'<64hex_enc_key><32hex_salt>'
salt嵌在hex字符串中可以直接匹配DB文件的salt
"""
import ctypes
import ctypes.wintypes as wt
import struct, os, sys, hashlib, time, re, json
import hmac as hmac_mod
from Crypto.Cipher import AES
import functools
print = functools.partial(print, flush=True)
kernel32 = ctypes.windll.kernel32
MEM_COMMIT = 0x1000
READABLE = {0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80}
PAGE_SZ = 4096
KEY_SZ = 32
SALT_SZ = 16
from config import load_config
_cfg = load_config()
DB_DIR = _cfg["db_dir"]
OUT_FILE = _cfg["keys_file"]
class MBI(ctypes.Structure):
_fields_ = [
("BaseAddress", ctypes.c_uint64), ("AllocationBase", ctypes.c_uint64),
("AllocationProtect", wt.DWORD), ("_pad1", wt.DWORD),
("RegionSize", ctypes.c_uint64), ("State", wt.DWORD),
("Protect", wt.DWORD), ("Type", wt.DWORD), ("_pad2", wt.DWORD),
]
def get_pids():
"""返回所有 Weixin.exe 进程的 (pid, mem_kb) 列表,按内存降序"""
import subprocess
r = subprocess.run(["tasklist", "/FI", "IMAGENAME eq Weixin.exe", "/FO", "CSV", "/NH"],
capture_output=True, text=True)
pids = []
for line in r.stdout.strip().split('\n'):
if not line.strip():
continue
p = line.strip('"').split('","')
if len(p) >= 5:
pid = int(p[1])
mem = int(p[4].replace(',', '').replace(' K', '').strip() or '0')
pids.append((pid, mem))
if not pids:
raise RuntimeError("Weixin.exe 未运行")
pids.sort(key=lambda x: x[1], reverse=True)
for pid, mem in pids:
print(f"[+] Weixin.exe PID={pid} ({mem // 1024}MB)")
return pids
def read_mem(h, addr, sz):
buf = ctypes.create_string_buffer(sz)
n = ctypes.c_size_t(0)
if kernel32.ReadProcessMemory(h, ctypes.c_uint64(addr), buf, sz, ctypes.byref(n)):
return buf.raw[:n.value]
return None
def enum_regions(h):
regs = []
addr = 0
mbi = MBI()
while addr < 0x7FFFFFFFFFFF:
if kernel32.VirtualQueryEx(h, ctypes.c_uint64(addr), ctypes.byref(mbi), ctypes.sizeof(mbi)) == 0:
break
if mbi.State == MEM_COMMIT and mbi.Protect in READABLE and 0 < mbi.RegionSize < 500 * 1024 * 1024:
regs.append((mbi.BaseAddress, mbi.RegionSize))
nxt = mbi.BaseAddress + mbi.RegionSize
if nxt <= addr:
break
addr = nxt
return regs
def verify_key_for_db(enc_key, db_page1):
"""验证enc_key是否能解密这个DB的page 1"""
salt = db_page1[:SALT_SZ]
# HMAC验证 (最可靠)
mac_salt = bytes(b ^ 0x3a for b in salt)
mac_key = hashlib.pbkdf2_hmac("sha512", enc_key, mac_salt, 2, dklen=KEY_SZ)
hmac_data = db_page1[SALT_SZ: PAGE_SZ - 80 + 16]
stored_hmac = db_page1[PAGE_SZ - 64: PAGE_SZ]
h = hmac_mod.new(mac_key, hmac_data, hashlib.sha512)
h.update(struct.pack('<I', 1))
return h.digest() == stored_hmac
def main():
print("=" * 60)
print(" 提取所有微信数据库密钥")
print("=" * 60)
# 1. 收集所有DB文件及其salt
db_files = []
salt_to_dbs = {}
for root, dirs, files in os.walk(DB_DIR):
for f in files:
if f.endswith('.db') and not f.endswith('-wal') and not f.endswith('-shm'):
path = os.path.join(root, f)
rel = os.path.relpath(path, DB_DIR)
sz = os.path.getsize(path)
if sz < PAGE_SZ:
continue
with open(path, 'rb') as fh:
page1 = fh.read(PAGE_SZ)
salt = page1[:SALT_SZ].hex()
db_files.append((rel, path, sz, salt, page1))
salt_to_dbs.setdefault(salt, []).append(rel)
print(f"\n找到 {len(db_files)} 个数据库, {len(salt_to_dbs)} 个不同的salt")
for salt_hex, dbs in sorted(salt_to_dbs.items(), key=lambda x: len(x[1]), reverse=True):
print(f" salt {salt_hex}: {', '.join(dbs)}")
# 2. 打开所有微信进程
pids = get_pids()
hex_re = re.compile(b"x'([0-9a-fA-F]{64,192})'")
key_map = {}
remaining_salts = set(salt_to_dbs.keys())
all_hex_matches = 0
t0 = time.time()
for pid, mem in pids:
h = kernel32.OpenProcess(0x0010 | 0x0400, False, pid)
if not h:
print(f"[WARN] 无法打开进程 PID={pid},跳过")
continue
try:
regions = enum_regions(h)
total_bytes = sum(s for _, s in regions)
total_mb = total_bytes / 1024 / 1024
print(f"\n[*] 扫描 PID={pid} ({total_mb:.0f}MB, {len(regions)} 区域)")
scanned_bytes = 0
for reg_idx, (base, size) in enumerate(regions):
data = read_mem(h, base, size)
scanned_bytes += size
if not data:
continue
for m in hex_re.finditer(data):
hex_str = m.group(1).decode()
addr = base + m.start()
all_hex_matches += 1
hex_len = len(hex_str)
if hex_len == 96:
enc_key_hex = hex_str[:64]
salt_hex = hex_str[64:]
if salt_hex in remaining_salts:
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, s, page1 in db_files:
if s == salt_hex and verify_key_for_db(enc_key, page1):
key_map[salt_hex] = enc_key_hex
remaining_salts.discard(salt_hex)
dbs = salt_to_dbs[salt_hex]
print(f"\n [FOUND] salt={salt_hex}")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
elif hex_len == 64:
if not remaining_salts:
continue
enc_key_hex = hex_str
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, salt_hex_db, page1 in db_files:
if salt_hex_db in remaining_salts and verify_key_for_db(enc_key, page1):
key_map[salt_hex_db] = enc_key_hex
remaining_salts.discard(salt_hex_db)
dbs = salt_to_dbs[salt_hex_db]
print(f"\n [FOUND] salt={salt_hex_db}")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
elif hex_len > 96 and hex_len % 2 == 0:
enc_key_hex = hex_str[:64]
salt_hex = hex_str[-32:]
if salt_hex in remaining_salts:
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, s, page1 in db_files:
if s == salt_hex and verify_key_for_db(enc_key, page1):
key_map[salt_hex] = enc_key_hex
remaining_salts.discard(salt_hex)
dbs = salt_to_dbs[salt_hex]
print(f"\n [FOUND] salt={salt_hex} (long hex {hex_len})")
print(f" enc_key={enc_key_hex}")
print(f" PID={pid} 地址: 0x{addr:016X}")
print(f" 数据库: {', '.join(dbs)}")
break
if (reg_idx + 1) % 200 == 0:
elapsed = time.time() - t0
progress = scanned_bytes / total_bytes * 100 if total_bytes else 100
print(
f" [{progress:.1f}%] {len(key_map)}/{len(salt_to_dbs)} salts matched, "
f"{all_hex_matches} hex patterns, {elapsed:.1f}s"
)
finally:
kernel32.CloseHandle(h)
if not remaining_salts:
print(f"\n[+] 所有密钥已找到,跳过剩余进程")
break
elapsed = time.time() - t0
print(f"\n扫描完成: {elapsed:.1f}s, {len(pids)} 个进程, {all_hex_matches} hex模式")
missing_salts = set(salt_to_dbs.keys()) - set(key_map.keys())
if missing_salts and key_map:
print(f"\n还有 {len(missing_salts)} 个salt未匹配尝试交叉验证...")
for salt_hex in list(missing_salts):
for rel, path, sz, s, page1 in db_files:
if s == salt_hex:
for known_salt, known_key_hex in key_map.items():
enc_key = bytes.fromhex(known_key_hex)
if verify_key_for_db(enc_key, page1):
key_map[salt_hex] = known_key_hex
print(f" [CROSS] salt={salt_hex} 可用 key from salt={known_salt}")
missing_salts.discard(salt_hex)
break
print(f"\n{'=' * 60}")
print(f"结果: {len(key_map)}/{len(salt_to_dbs)} salts 找到密钥")
result = {}
for rel, path, sz, salt_hex, page1 in db_files:
if salt_hex in key_map:
result[rel] = {
"enc_key": key_map[salt_hex],
"salt": salt_hex,
"size_mb": round(sz / 1024 / 1024, 1)
}
print(f" OK: {rel} ({sz / 1024 / 1024:.1f}MB)")
else:
print(f" MISSING: {rel} (salt={salt_hex})")
if not result:
print(f"\n[!] 未提取到任何密钥,保留已有的 {OUT_FILE}(如存在)")
raise RuntimeError("未能从任何微信进程中提取到密钥")
result["_db_dir"] = DB_DIR
with open(OUT_FILE, 'w') as f:
json.dump(result, f, indent=2)
print(f"\n密钥保存到: {OUT_FILE}")
missing = [rel for rel, path, sz, salt_hex, page1 in db_files if salt_hex not in key_map]
if missing:
print(f"\n未找到密钥的数据库:")
for rel in missing:
print(f" {rel}")
if __name__ == '__main__':
try:
main()
except RuntimeError as e:
print(f"\n[ERROR] {e}")
sys.exit(1)

29
key_utils.py 100644
View File

@ -0,0 +1,29 @@
import os
def strip_key_metadata(keys):
"""移除 all_keys.json 中以下划线开头的元数据字段。"""
return {k: v for k, v in keys.items() if not k.startswith("_")}
def key_path_variants(rel_path):
"""生成同一路径的多种分隔符表示,兼容 Windows/Linux JSON key。"""
normalized = rel_path.replace("\\", "/")
variants = []
for candidate in (
rel_path,
normalized,
normalized.replace("/", "\\"),
normalized.replace("/", os.sep),
):
if candidate not in variants:
variants.append(candidate)
return variants
def get_key_info(keys, rel_path):
"""按相对路径查找数据库密钥,自动兼容不同平台分隔符。"""
for candidate in key_path_variants(rel_path):
if candidate in keys and not candidate.startswith("_"):
return keys[candidate]
return None

View File

@ -11,6 +11,8 @@ import sys
import functools import functools
print = functools.partial(print, flush=True) print = functools.partial(print, flush=True)
from key_utils import strip_key_metadata
def check_wechat_running(): def check_wechat_running():
"""检查微信是否在运行,返回 True/False""" """检查微信是否在运行,返回 True/False"""
@ -37,6 +39,7 @@ def ensure_keys(keys_file, db_dir):
print(f" 旧: {saved_dir}") print(f" 旧: {saved_dir}")
print(f" 新: {db_dir}") print(f" 新: {db_dir}")
keys = {} keys = {}
keys = strip_key_metadata(keys)
if keys: if keys:
print(f"[+] 已有 {len(keys)} 个数据库密钥") print(f"[+] 已有 {len(keys)} 个数据库密钥")
return return
@ -60,7 +63,7 @@ def ensure_keys(keys_file, db_dir):
keys = json.load(f) keys = json.load(f)
except (json.JSONDecodeError, ValueError): except (json.JSONDecodeError, ValueError):
keys = {} keys = {}
if not keys: if not strip_key_metadata(keys):
print("[!] 未能提取到任何密钥") print("[!] 未能提取到任何密钥")
print(" 可能原因:选择了错误的微信数据目录,或微信需要重启") print(" 可能原因:选择了错误的微信数据目录,或微信需要重启")
print(" 请检查 config.json 中的 db_dir 是否与当前登录的微信账号匹配") print(" 请检查 config.json 中的 db_dir 是否与当前登录的微信账号匹配")
@ -79,7 +82,7 @@ def main():
# 2. 检查微信进程 # 2. 检查微信进程
if not check_wechat_running(): if not check_wechat_running():
print("[!] 未检测到微信进程 (Weixin.exe)") print(f"[!] 未检测到微信进程 ({cfg.get('wechat_process', 'WeChat')})")
print(" 请先启动微信并登录,然后重新运行") print(" 请先启动微信并登录,然后重新运行")
sys.exit(1) sys.exit(1)
print("[+] 微信进程运行中") print("[+] 微信进程运行中")

View File

@ -10,8 +10,9 @@ import hmac as hmac_mod
from datetime import datetime from datetime import datetime
from Crypto.Cipher import AES from Crypto.Cipher import AES
from mcp.server.fastmcp import FastMCP from mcp.server.fastmcp import FastMCP
import zstandard as zstd import zstandard as zstd
from decode_image import ImageResolver from decode_image import ImageResolver
from key_utils import get_key_info, key_path_variants, strip_key_metadata
# ============ 加密常量 ============ # ============ 加密常量 ============
PAGE_SZ = 4096 PAGE_SZ = 4096
@ -49,8 +50,8 @@ if not DECODED_IMAGE_DIR:
elif not os.path.isabs(DECODED_IMAGE_DIR): elif not os.path.isabs(DECODED_IMAGE_DIR):
DECODED_IMAGE_DIR = os.path.join(SCRIPT_DIR, DECODED_IMAGE_DIR) DECODED_IMAGE_DIR = os.path.join(SCRIPT_DIR, DECODED_IMAGE_DIR)
with open(KEYS_FILE) as f: with open(KEYS_FILE) as f:
ALL_KEYS = json.load(f) ALL_KEYS = strip_key_metadata(json.load(f))
# ============ 解密函数 ============ # ============ 解密函数 ============
@ -149,7 +150,7 @@ class DBCache:
tmp_path = info["path"] tmp_path = info["path"]
if not os.path.exists(tmp_path): if not os.path.exists(tmp_path):
continue continue
rel_path = rel_key.replace('/', os.sep) rel_path = rel_key.replace('\\', os.sep)
db_path = os.path.join(DB_DIR, rel_path) db_path = os.path.join(DB_DIR, rel_path)
wal_path = db_path + "-wal" wal_path = db_path + "-wal"
try: try:
@ -174,12 +175,13 @@ class DBCache:
except OSError: except OSError:
pass pass
def get(self, rel_key): def get(self, rel_key):
if rel_key not in ALL_KEYS: key_info = get_key_info(ALL_KEYS, rel_key)
return None if not key_info:
rel_path = rel_key.replace('/', os.sep) return None
db_path = os.path.join(DB_DIR, rel_path) rel_path = rel_key.replace('\\', '/').replace('/', os.sep)
wal_path = db_path + "-wal" db_path = os.path.join(DB_DIR, rel_path)
wal_path = db_path + "-wal"
if not os.path.exists(db_path): if not os.path.exists(db_path):
return None return None
@ -195,8 +197,8 @@ class DBCache:
return c_path return c_path
tmp_path = self._cache_path(rel_key) tmp_path = self._cache_path(rel_key)
enc_key = bytes.fromhex(ALL_KEYS[rel_key]["enc_key"]) enc_key = bytes.fromhex(key_info["enc_key"])
full_decrypt(db_path, tmp_path, enc_key) full_decrypt(db_path, tmp_path, enc_key)
if os.path.exists(wal_path): if os.path.exists(wal_path):
decrypt_wal(wal_path, tmp_path, enc_key) decrypt_wal(wal_path, tmp_path, enc_key)
self._cache[rel_key] = (db_mtime, wal_mtime, tmp_path) self._cache[rel_key] = (db_mtime, wal_mtime, tmp_path)
@ -248,7 +250,7 @@ def get_contact_names():
pass pass
# 实时解密 # 实时解密
path = _cache.get("contact/contact.db") path = _cache.get("contact\\contact.db")
if path: if path:
try: try:
_contact_names, _contact_full = _load_contacts_from(path) _contact_names, _contact_full = _load_contacts_from(path)
@ -330,11 +332,12 @@ def _parse_message_content(content, local_type, is_group):
# 消息 DB 的 rel_keys排除 fts/resource/media/biz # 消息 DB 的 rel_keys排除 fts/resource/media/biz
MSG_DB_KEYS = sorted([ MSG_DB_KEYS = sorted([
k for k in ALL_KEYS k for k in ALL_KEYS
if k.startswith("message/message_") and k.endswith(".db") if any(v.startswith("message/") for v in key_path_variants(k))
and "fts" not in k and "resource" not in k and any(v.endswith(".db") for v in key_path_variants(k))
]) and "fts" not in k and "resource" not in k
])
def _find_msg_table_for_user(username): def _find_msg_table_for_user(username):
@ -379,7 +382,7 @@ def get_recent_sessions(limit: int = 20) -> str:
Args: Args:
limit: 返回的会话数量默认20 limit: 返回的会话数量默认20
""" """
path = _cache.get("session/session.db") path = _cache.get("session\\session.db")
if not path: if not path:
return "错误: 无法解密 session.db" return "错误: 无法解密 session.db"
@ -635,7 +638,7 @@ def get_new_messages() -> str:
"""获取自上次调用以来的新消息。首次调用返回最近的会话状态。""" """获取自上次调用以来的新消息。首次调用返回最近的会话状态。"""
global _last_check_state global _last_check_state
path = _cache.get("session/session.db") path = _cache.get("session\\session.db")
if not path: if not path:
return "错误: 无法解密 session.db" return "错误: 无法解密 session.db"

View File

@ -7,8 +7,9 @@ session.db 包含每个聊天的最新消息摘要、发送者、时间戳
import hashlib, struct, os, sys, json, time, sqlite3, io import hashlib, struct, os, sys, json, time, sqlite3, io
import hmac as hmac_mod import hmac as hmac_mod
from datetime import datetime from datetime import datetime
from Crypto.Cipher import AES from Crypto.Cipher import AES
import zstandard as zstd import zstandard as zstd
from key_utils import get_key_info, strip_key_metadata
_zstd_dctx = zstd.ZstdDecompressor() _zstd_dctx = zstd.ZstdDecompressor()
@ -148,13 +149,13 @@ def main():
print("=" * 60) print("=" * 60)
# 加载密钥 # 加载密钥
with open(KEYS_FILE) as f: with open(KEYS_FILE) as f:
keys = json.load(f) keys = strip_key_metadata(json.load(f))
session_key_info = keys.get("session/session.db") session_key_info = get_key_info(keys, os.path.join("session", "session.db"))
if not session_key_info: if not session_key_info:
print("[ERROR] 找不到session.db的密钥") print("[ERROR] 找不到session.db的密钥")
sys.exit(1) sys.exit(1)
enc_key = bytes.fromhex(session_key_info["enc_key"]) enc_key = bytes.fromhex(session_key_info["enc_key"])
session_db = os.path.join(DB_DIR, "session", "session.db") session_db = os.path.join(DB_DIR, "session", "session.db")

View File

@ -13,10 +13,11 @@ from datetime import datetime
from http.server import HTTPServer, BaseHTTPRequestHandler from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn from socketserver import ThreadingMixIn
from Crypto.Cipher import AES from Crypto.Cipher import AES
import urllib.parse import urllib.parse
import glob as glob_mod import glob as glob_mod
import zstandard as zstd import zstandard as zstd
from decode_image import extract_md5_from_packed_info, decrypt_dat_file, is_v2_format from decode_image import extract_md5_from_packed_info, decrypt_dat_file, is_v2_format
from key_utils import get_key_info, strip_key_metadata
_zstd_dctx = zstd.ZstdDecompressor() _zstd_dctx = zstd.ZstdDecompressor()
@ -58,14 +59,14 @@ _emoji_lookup_lock = threading.Lock()
_emoji_keys_dict = None # 保存 keys 引用供刷新用 _emoji_keys_dict = None # 保存 keys 引用供刷新用
_emoji_last_refresh = 0 _emoji_last_refresh = 0
def _build_emoji_lookup(keys_dict): def _build_emoji_lookup(keys_dict):
"""从 emoticon.db 构建 emoji md5 → URL 映射(直接解密,不走 cache""" """从 emoticon.db 构建 emoji md5 → URL 映射(直接解密,不走 cache"""
global _emoji_lookup, _emoji_keys_dict, _emoji_last_refresh global _emoji_lookup, _emoji_keys_dict, _emoji_last_refresh
_emoji_keys_dict = keys_dict _emoji_keys_dict = keys_dict
key_info = keys_dict.get("emoticon/emoticon.db") key_info = get_key_info(keys_dict, os.path.join("emoticon", "emoticon.db"))
if not key_info: if not key_info:
print("[emoji] 无 emoticon.db key跳过", flush=True) print("[emoji] 无 emoticon.db key跳过", flush=True)
return return
src = os.path.join(DB_DIR, "emoticon", "emoticon.db") src = os.path.join(DB_DIR, "emoticon", "emoticon.db")
if not os.path.exists(src): if not os.path.exists(src):
@ -252,17 +253,18 @@ class MonitorDBCache:
with lock: with lock:
self._state.pop(rel_key, None) self._state.pop(rel_key, None)
def get(self, rel_key): def get(self, rel_key):
"""返回解密后的临时文件路径mtime 变化时自动重新解密""" """返回解密后的临时文件路径mtime 变化时自动重新解密"""
if rel_key not in self.keys: key_info = get_key_info(self.keys, rel_key)
return None if not key_info:
return None
lock = self._get_lock(rel_key)
with lock: lock = self._get_lock(rel_key)
enc_key = bytes.fromhex(self.keys[rel_key]["enc_key"]) with lock:
rel_path = rel_key.replace('/', os.sep) enc_key = bytes.fromhex(key_info["enc_key"])
db_path = os.path.join(DB_DIR, rel_path) rel_path = rel_key.replace('\\', '/').replace('/', os.sep)
wal_path = db_path + "-wal" db_path = os.path.join(DB_DIR, rel_path)
wal_path = db_path + "-wal"
if not os.path.exists(db_path): if not os.path.exists(db_path):
return None return None
@ -273,8 +275,8 @@ class MonitorDBCache:
except OSError: except OSError:
return None return None
out_name = rel_key.replace('/', '_') out_name = rel_key.replace('\\', '_').replace('/', '_')
out_path = os.path.join(self.tmp_dir, out_name) out_path = os.path.join(self.tmp_dir, out_name)
prev = self._state.get(rel_key) prev = self._state.get(rel_key)
@ -313,7 +315,7 @@ def build_username_db_map():
# 先获取每个 DB 的 mtime 用于排序 # 先获取每个 DB 的 mtime 用于排序
db_mtimes = {} db_mtimes = {}
for i in range(5): for i in range(5):
rel_key = f"message/message_{i}.db" rel_key = f"message\\message_{i}.db"
db_path = os.path.join(DB_DIR, "message", f"message_{i}.db") db_path = os.path.join(DB_DIR, "message", f"message_{i}.db")
try: try:
db_mtimes[rel_key] = os.path.getmtime(db_path) db_mtimes[rel_key] = os.path.getmtime(db_path)
@ -326,7 +328,7 @@ def build_username_db_map():
db_path = os.path.join(decrypted_msg_dir, f"message_{i}.db") db_path = os.path.join(decrypted_msg_dir, f"message_{i}.db")
if not os.path.exists(db_path): if not os.path.exists(db_path):
continue continue
rel_key = f"message/message_{i}.db" rel_key = f"message\\message_{i}.db"
try: try:
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
for row in conn.execute("SELECT user_name FROM Name2Id").fetchall(): for row in conn.execute("SELECT user_name FROM Name2Id").fetchall():
@ -595,7 +597,7 @@ class SessionMonitor:
# local_id 不全局唯一,需要同时匹配 create_time # local_id 不全局唯一,需要同时匹配 create_time
file_md5 = None file_md5 = None
for _try in range(2): for _try in range(2):
res_path = self.db_cache.get("message/message_resource.db") res_path = self.db_cache.get("message\\message_resource.db")
if not res_path: if not res_path:
return None return None
try: try:
@ -620,7 +622,7 @@ class SessionMonitor:
except Exception as e: except Exception as e:
if 'malformed' in str(e) and _try == 0: if 'malformed' in str(e) and _try == 0:
print(f" [img] resource DB malformed, 强制刷新...", flush=True) print(f" [img] resource DB malformed, 强制刷新...", flush=True)
self.db_cache.invalidate("message/message_resource.db") self.db_cache.invalidate("message\\message_resource.db")
continue continue
print(f" [img] 查询 message_resource 失败: {e}", flush=True) print(f" [img] 查询 message_resource 失败: {e}", flush=True)
return None return None
@ -751,13 +753,14 @@ class SessionMonitor:
if attempt < 2: if attempt < 2:
time.sleep(delays[attempt]) time.sleep(delays[attempt])
def _fresh_decrypt_query(self, db_key, table_name, prev_ts, curr_ts): def _fresh_decrypt_query(self, db_key, table_name, prev_ts, curr_ts):
"""独立解密 message DB 到临时文件并查询,避免共享缓存竞态""" """独立解密 message DB 到临时文件并查询,避免共享缓存竞态"""
if db_key not in self.db_cache.keys: key_info = get_key_info(self.db_cache.keys, db_key)
return [] if not key_info:
enc_key = bytes.fromhex(self.db_cache.keys[db_key]["enc_key"]) return []
rel_path = db_key.replace('/', os.sep) enc_key = bytes.fromhex(key_info["enc_key"])
db_path = os.path.join(DB_DIR, rel_path) rel_path = db_key.replace('\\', '/').replace('/', os.sep)
db_path = os.path.join(DB_DIR, rel_path)
wal_path = db_path + "-wal" wal_path = db_path + "-wal"
if not os.path.exists(db_path): if not os.path.exists(db_path):
return [] return []
@ -1872,13 +1875,17 @@ class ThreadedServer(ThreadingMixIn, HTTPServer):
def main(): def main():
print("=" * 60, flush=True) print("=" * 60, flush=True)
print(" 微信实时监听 (WAL增量 + SSE推送)", flush=True) print(" 微信实时监听 (WAL增量 + SSE推送)", flush=True)
print("=" * 60, flush=True) print("=" * 60, flush=True)
with open(KEYS_FILE) as f: with open(KEYS_FILE) as f:
keys = json.load(f) keys = strip_key_metadata(json.load(f))
enc_key = bytes.fromhex(keys["session/session.db"]["enc_key"]) session_key_info = get_key_info(keys, os.path.join("session", "session.db"))
session_db = os.path.join(DB_DIR, "session", "session.db") if not session_key_info:
print("[ERROR] 找不到 session.db 的密钥", flush=True)
sys.exit(1)
enc_key = bytes.fromhex(session_key_info["enc_key"])
session_db = os.path.join(DB_DIR, "session", "session.db")
print("加载联系人...", flush=True) print("加载联系人...", flush=True)
contact_names = load_contact_names() contact_names = load_contact_names()
@ -1909,12 +1916,12 @@ def main():
# 后台预热所有 message DB图片/emoji 解密必需) # 后台预热所有 message DB图片/emoji 解密必需)
def _warmup(): def _warmup():
try: try:
t0 = time.perf_counter() t0 = time.perf_counter()
warmup_keys = ["message/message_resource.db"] warmup_keys = ["message\\message_resource.db"]
for i in range(5): for i in range(5):
k = f"message/message_{i}.db" k = f"message\\message_{i}.db"
if k in keys: if get_key_info(keys, k):
warmup_keys.append(k) warmup_keys.append(k)
for k in warmup_keys: for k in warmup_keys:
t1 = time.perf_counter() t1 = time.perf_counter()
try: try: