From 4c91eb34ef1b52ef92774f5de920e06227daf80e Mon Sep 17 00:00:00 2001 From: joshua-deng Date: Sat, 28 Feb 2026 12:03:38 +0800 Subject: [PATCH] WeChat 4.0 database decryptor and real-time message monitor Extract encryption keys from Weixin.exe process memory, decrypt all SQLCipher 4 databases, and monitor new messages via Web UI with ~100ms latency. --- .gitignore | 24 ++ README.md | 122 ++++++++++ config.example.json | 6 + config.py | 36 +++ decrypt_db.py | 184 +++++++++++++++ find_all_keys.py | 255 ++++++++++++++++++++ latency_test.py | 175 ++++++++++++++ monitor.py | 251 ++++++++++++++++++++ monitor_web.py | 551 ++++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 1604 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 config.example.json create mode 100644 config.py create mode 100644 decrypt_db.py create mode 100644 find_all_keys.py create mode 100644 latency_test.py create mode 100644 monitor.py create mode 100644 monitor_web.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e4d09eb --- /dev/null +++ b/.gitignore @@ -0,0 +1,24 @@ +# Decrypted databases and keys - NEVER upload +all_keys.json +wechat_key.txt +config.json +decrypted/ +*.db +*.db-shm +*.db-wal +*.db.tmp_monitor + +# Hook outputs +hook_output.txt +hook_start_output.txt +hook_stderr.txt +run_hook.bat + +# Python +__pycache__/ +*.py[cod] +*.egg-info/ + +# OS +.DS_Store +Thumbs.db diff --git a/README.md b/README.md new file mode 100644 index 0000000..0ef3e3c --- /dev/null +++ b/README.md @@ -0,0 +1,122 @@ +# WeChat 4.0 Database Decryptor + +微信 4.0 (Windows) 本地数据库解密工具。从运行中的微信进程内存提取加密密钥,解密所有 SQLCipher 4 加密数据库,并提供实时消息监听。 + +## 原理 + +微信 4.0 使用 SQLCipher 4 加密本地数据库: +- **加密算法**: AES-256-CBC + HMAC-SHA512 +- **KDF**: PBKDF2-HMAC-SHA512, 256,000 iterations +- **页面大小**: 4096 bytes, reserve = 80 (IV 16 + HMAC 64) +- **每个数据库有独立的 salt 和 enc_key** + +WCDB (微信的 SQLCipher 封装) 会在进程内存中缓存派生后的 raw key,格式为 `x'<64hex_enc_key><32hex_salt>'`。本工具通过扫描进程内存中的这种模式,匹配数据库文件的 salt,并通过 HMAC 验证来提取正确的密钥。 + +## 使用方法 + +### 环境要求 + +- Windows 10/11 +- Python 3.10+ +- 微信 4.0 (正在运行) +- 需要管理员权限 (读取进程内存) + +### 安装依赖 + +```bash +pip install pycryptodome +``` + +### 1. 配置 + +复制配置模板并修改: + +```bash +copy config.example.json config.json +``` + +编辑 `config.json`: +```json +{ + "db_dir": "D:\\xwechat_files\\你的微信ID\\db_storage", + "keys_file": "all_keys.json", + "decrypted_dir": "decrypted", + "wechat_process": "Weixin.exe" +} +``` + +`db_dir` 路径可以在 微信设置 → 文件管理 中找到。 + +### 2. 提取密钥 + +确保微信正在运行,以**管理员权限**运行: + +```bash +python find_all_keys.py +``` + +密钥将保存到 `all_keys.json`。 + +### 3. 解密数据库 + +```bash +python decrypt_db.py +``` + +解密后的数据库保存在 `decrypted/` 目录,可以直接用 SQLite 工具打开。 + +### 4. 实时消息监听 + +#### Web UI (推荐) + +```bash +python monitor_web.py +``` + +打开 http://localhost:5678 查看实时消息流。 + +- 30ms 轮询 WAL 文件变化 (mtime) +- 检测到变化后全量解密 + WAL patch (~70ms) +- SSE 实时推送到浏览器 +- 总延迟约 100ms + +#### 命令行 + +```bash +python monitor.py +``` + +每 3 秒轮询一次,在终端显示新消息。 + +## 文件说明 + +| 文件 | 说明 | +|------|------| +| `config.py` | 配置加载器 | +| `find_all_keys.py` | 从微信进程内存提取所有数据库密钥 | +| `decrypt_db.py` | 全量解密所有数据库 | +| `monitor_web.py` | 实时消息监听 (Web UI + SSE) | +| `monitor.py` | 实时消息监听 (命令行) | +| `latency_test.py` | 延迟测量诊断工具 | + +## 技术细节 + +### WAL 处理 + +微信使用 SQLite WAL 模式,WAL 文件是**预分配固定大小** (4MB)。检测变化时: +- 不能用文件大小 (永远不变) +- 使用 mtime 检测写入 +- 解密 WAL frame 时需校验 salt 值,跳过旧周期遗留的 frame + +### 数据库结构 + +解密后包含约 26 个数据库: +- `session/session.db` - 会话列表 (最新消息摘要) +- `message/message_*.db` - 聊天记录 +- `contact/contact.db` - 联系人 +- `media_*/media_*.db` - 媒体文件索引 +- 其他: head_image, favorite, sns, emoticon 等 + +## 免责声明 + +本工具仅用于学习和研究目的,用于解密**自己的**微信数据。请遵守相关法律法规,不要用于未经授权的数据访问。 diff --git a/config.example.json b/config.example.json new file mode 100644 index 0000000..3670993 --- /dev/null +++ b/config.example.json @@ -0,0 +1,6 @@ +{ + "db_dir": "D:\\xwechat_files\\your_wxid\\db_storage", + "keys_file": "all_keys.json", + "decrypted_dir": "decrypted", + "wechat_process": "Weixin.exe" +} diff --git a/config.py b/config.py new file mode 100644 index 0000000..ef1828a --- /dev/null +++ b/config.py @@ -0,0 +1,36 @@ +""" +配置加载器 - 从 config.json 读取路径配置 +首次运行时自动生成 config.json 模板 +""" +import json +import os +import sys + +CONFIG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json") + +_DEFAULT = { + "db_dir": r"D:\xwechat_files\your_wxid\db_storage", + "keys_file": "all_keys.json", + "decrypted_dir": "decrypted", + "wechat_process": "Weixin.exe", +} + + +def load_config(): + if not os.path.exists(CONFIG_FILE): + with open(CONFIG_FILE, "w") as f: + json.dump(_DEFAULT, f, indent=4) + print(f"[!] 已生成配置文件: {CONFIG_FILE}") + print(" 请修改 config.json 中的路径后重新运行") + sys.exit(1) + + with open(CONFIG_FILE) as f: + cfg = json.load(f) + + # 将相对路径转为绝对路径 + base = os.path.dirname(os.path.abspath(__file__)) + for key in ("keys_file", "decrypted_dir"): + if key in cfg and not os.path.isabs(cfg[key]): + cfg[key] = os.path.join(base, cfg[key]) + + return cfg diff --git a/decrypt_db.py b/decrypt_db.py new file mode 100644 index 0000000..9ac39e5 --- /dev/null +++ b/decrypt_db.py @@ -0,0 +1,184 @@ +""" +WeChat 4.0 数据库解密器 + +使用从进程内存提取的per-DB enc_key解密SQLCipher 4加密的数据库 +参数: SQLCipher 4, AES-256-CBC, HMAC-SHA512, reserve=80, page_size=4096 +密钥来源: all_keys.json (由find_all_keys.py从内存提取) +""" +import hashlib, struct, os, sys, json +import hmac as hmac_mod +from Crypto.Cipher import AES + +import functools +print = functools.partial(print, flush=True) + +PAGE_SZ = 4096 +KEY_SZ = 32 +SALT_SZ = 16 +IV_SZ = 16 +HMAC_SZ = 64 +RESERVE_SZ = 80 # IV(16) + HMAC(64) +SQLITE_HDR = b'SQLite format 3\x00' + +from config import load_config +_cfg = load_config() +DB_DIR = _cfg["db_dir"] +OUT_DIR = _cfg["decrypted_dir"] +KEYS_FILE = _cfg["keys_file"] + + +def derive_mac_key(enc_key, salt): + """从enc_key派生HMAC密钥""" + mac_salt = bytes(b ^ 0x3a for b in salt) + return hashlib.pbkdf2_hmac("sha512", enc_key, mac_salt, 2, dklen=KEY_SZ) + + +def decrypt_page(enc_key, page_data, pgno): + """解密单个页面,输出4096字节的标准SQLite页面""" + iv = page_data[PAGE_SZ - RESERVE_SZ : PAGE_SZ - RESERVE_SZ + IV_SZ] + + if pgno == 1: + encrypted = page_data[SALT_SZ : PAGE_SZ - RESERVE_SZ] + cipher = AES.new(enc_key, AES.MODE_CBC, iv) + decrypted = cipher.decrypt(encrypted) + page = bytearray(SQLITE_HDR + decrypted + b'\x00' * RESERVE_SZ) + # 保留 reserve=80, B-tree 基于 usable_size=4016 构建 + return bytes(page) + else: + encrypted = page_data[:PAGE_SZ - RESERVE_SZ] + cipher = AES.new(enc_key, AES.MODE_CBC, iv) + decrypted = cipher.decrypt(encrypted) + return decrypted + b'\x00' * RESERVE_SZ + + +def decrypt_database(db_path, out_path, enc_key): + """解密整个数据库文件""" + file_size = os.path.getsize(db_path) + total_pages = file_size // PAGE_SZ + + if file_size % PAGE_SZ != 0: + print(f" [WARN] 文件大小 {file_size} 不是 {PAGE_SZ} 的倍数") + total_pages += 1 + + with open(db_path, 'rb') as fin: + page1 = fin.read(PAGE_SZ) + + if len(page1) < PAGE_SZ: + print(f" [ERROR] 文件太小") + return False + + # 提取salt并派生mac_key, 验证page 1 + salt = page1[:SALT_SZ] + mac_key = derive_mac_key(enc_key, salt) + p1_hmac_data = page1[SALT_SZ : PAGE_SZ - RESERVE_SZ + IV_SZ] + p1_stored_hmac = page1[PAGE_SZ - HMAC_SZ : PAGE_SZ] + hm = hmac_mod.new(mac_key, p1_hmac_data, hashlib.sha512) + hm.update(struct.pack(' 0: + page = page + b'\x00' * (PAGE_SZ - len(page)) + else: + break + + decrypted = decrypt_page(enc_key, page, pgno) + fout.write(decrypted) + + if pgno == 1: + if decrypted[:16] != SQLITE_HDR: + print(f" [WARN] 解密后header不匹配!") + + if pgno % 10000 == 0: + print(f" 进度: {pgno}/{total_pages} ({100*pgno/total_pages:.1f}%)") + + return True + + +def main(): + print("=" * 60) + print(" WeChat 4.0 数据库解密器") + print("=" * 60) + + # 加载密钥 + if not os.path.exists(KEYS_FILE): + print(f"[ERROR] 密钥文件不存在: {KEYS_FILE}") + print("请先运行 find_all_keys.py") + sys.exit(1) + + with open(KEYS_FILE) as f: + keys = json.load(f) + + print(f"\n加载 {len(keys)} 个数据库密钥") + print(f"输出目录: {OUT_DIR}") + os.makedirs(OUT_DIR, exist_ok=True) + + # 收集所有DB文件 + db_files = [] + for root, dirs, files in os.walk(DB_DIR): + for f in files: + if f.endswith('.db') and not f.endswith('-wal') and not f.endswith('-shm'): + path = os.path.join(root, f) + rel = os.path.relpath(path, DB_DIR) + sz = os.path.getsize(path) + db_files.append((rel, path, sz)) + + db_files.sort(key=lambda x: x[2]) # 从小到大 + + print(f"找到 {len(db_files)} 个数据库文件\n") + + success = 0 + failed = 0 + total_bytes = 0 + + for rel, path, sz in db_files: + # 用反斜杠格式查找key (json中的key是Windows路径) + rel_key = rel.replace('/', '\\') + if rel_key not in keys: + print(f"SKIP: {rel} (无密钥)") + failed += 1 + continue + + enc_key = bytes.fromhex(keys[rel_key]["enc_key"]) + out_path = os.path.join(OUT_DIR, rel) + + print(f"解密: {rel} ({sz/1024/1024:.1f}MB) ...", end=" ") + + ok = decrypt_database(path, out_path, enc_key) + if ok: + # SQLite验证 + try: + import sqlite3 + conn = sqlite3.connect(out_path) + tables = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall() + conn.close() + table_names = [t[0] for t in tables] + print(f" OK! 表: {', '.join(table_names[:5])}", end="") + if len(table_names) > 5: + print(f" ...共{len(table_names)}个", end="") + print() + success += 1 + total_bytes += sz + except Exception as e: + print(f" [WARN] SQLite验证失败: {e}") + failed += 1 + else: + failed += 1 + + print(f"\n{'='*60}") + print(f"结果: {success} 成功, {failed} 失败, 共 {len(db_files)} 个") + print(f"解密数据量: {total_bytes/1024/1024/1024:.1f}GB") + print(f"解密文件在: {OUT_DIR}") + + +if __name__ == '__main__': + main() diff --git a/find_all_keys.py b/find_all_keys.py new file mode 100644 index 0000000..7b1cec6 --- /dev/null +++ b/find_all_keys.py @@ -0,0 +1,255 @@ +""" +从微信进程内存中提取所有数据库的缓存raw key + +WCDB为每个DB缓存: x'<64hex_enc_key><32hex_salt>' +salt嵌在hex字符串中,可以直接匹配DB文件的salt +""" +import ctypes +import ctypes.wintypes as wt +import struct, os, sys, hashlib, time, re, json +import hmac as hmac_mod +from Crypto.Cipher import AES + +import functools +print = functools.partial(print, flush=True) + +kernel32 = ctypes.windll.kernel32 +MEM_COMMIT = 0x1000 +READABLE = {0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80} +PAGE_SZ = 4096 +KEY_SZ = 32 +SALT_SZ = 16 + +from config import load_config +_cfg = load_config() +DB_DIR = _cfg["db_dir"] +OUT_FILE = _cfg["keys_file"] + +class MBI(ctypes.Structure): + _fields_ = [ + ("BaseAddress", ctypes.c_uint64), ("AllocationBase", ctypes.c_uint64), + ("AllocationProtect", wt.DWORD), ("_pad1", wt.DWORD), + ("RegionSize", ctypes.c_uint64), ("State", wt.DWORD), + ("Protect", wt.DWORD), ("Type", wt.DWORD), ("_pad2", wt.DWORD), + ] + +def get_pid(): + import subprocess + r = subprocess.run(["tasklist","/FI","IMAGENAME eq Weixin.exe","/FO","CSV","/NH"], + capture_output=True, text=True) + best = (0,0) + for line in r.stdout.strip().split('\n'): + if not line.strip(): continue + p = line.strip('"').split('","') + if len(p)>=5: + pid=int(p[1]); mem=int(p[4].replace(',','').replace(' K','').strip() or '0') + if mem>best[1]: best=(pid,mem) + if not best[0]: print("[ERROR] Weixin.exe 未运行"); sys.exit(1) + print(f"[+] Weixin.exe PID={best[0]} ({best[1]//1024}MB)") + return best[0] + +def read_mem(h, addr, sz): + buf = ctypes.create_string_buffer(sz) + n = ctypes.c_size_t(0) + if kernel32.ReadProcessMemory(h, ctypes.c_uint64(addr), buf, sz, ctypes.byref(n)): + return buf.raw[:n.value] + return None + +def enum_regions(h): + regs = [] + addr = 0 + mbi = MBI() + while addr < 0x7FFFFFFFFFFF: + if kernel32.VirtualQueryEx(h, ctypes.c_uint64(addr), ctypes.byref(mbi), ctypes.sizeof(mbi))==0: break + if mbi.State==MEM_COMMIT and mbi.Protect in READABLE and 0' 模式 + print(f"\n搜索 x'' 缓存密钥...") + hex_re = re.compile(b"x'([0-9a-fA-F]{64,192})'") + + # 结果: salt_hex -> enc_key_hex + key_map = {} + all_hex_matches = 0 + t0 = time.time() + + for reg_idx, (base, size) in enumerate(regions): + data = read_mem(h, base, size) + if not data: continue + + for m in hex_re.finditer(data): + hex_str = m.group(1).decode() + addr = base + m.start() + all_hex_matches += 1 + hex_len = len(hex_str) + + if hex_len == 96: + # enc_key(32bytes=64hex) + salt(16bytes=32hex) + enc_key_hex = hex_str[:64] + salt_hex = hex_str[64:] + + if salt_hex in salt_to_dbs and salt_hex not in key_map: + # 验证! + enc_key = bytes.fromhex(enc_key_hex) + # 找到对应的page1 + for rel, path, sz, s, page1 in db_files: + if s == salt_hex: + if verify_key_for_db(enc_key, page1): + key_map[salt_hex] = enc_key_hex + dbs = salt_to_dbs[salt_hex] + print(f"\n [FOUND] salt={salt_hex}") + print(f" enc_key={enc_key_hex}") + print(f" 地址: 0x{addr:016X}") + print(f" 数据库: {', '.join(dbs)}") + break + + elif hex_len == 64: + # 只有enc_key, 没有salt - 需要逐个DB试 + enc_key_hex = hex_str + enc_key = bytes.fromhex(enc_key_hex) + for rel, path, sz, salt_hex_db, page1 in db_files: + if salt_hex_db not in key_map: + if verify_key_for_db(enc_key, page1): + key_map[salt_hex_db] = enc_key_hex + dbs = salt_to_dbs[salt_hex_db] + print(f"\n [FOUND] salt={salt_hex_db}") + print(f" enc_key={enc_key_hex}") + print(f" 地址: 0x{addr:016X}") + print(f" 数据库: {', '.join(dbs)}") + break + + elif hex_len > 96 and hex_len % 2 == 0: + # 可能是 enc_key + hmac_key + salt 或其他格式 + # 取前64作为enc_key, 后32作为salt + enc_key_hex = hex_str[:64] + salt_hex = hex_str[-32:] + + if salt_hex in salt_to_dbs and salt_hex not in key_map: + enc_key = bytes.fromhex(enc_key_hex) + for rel, path, sz, s, page1 in db_files: + if s == salt_hex: + if verify_key_for_db(enc_key, page1): + key_map[salt_hex] = enc_key_hex + dbs = salt_to_dbs[salt_hex] + print(f"\n [FOUND] salt={salt_hex} (long hex {hex_len})") + print(f" enc_key={enc_key_hex}") + print(f" 地址: 0x{addr:016X}") + print(f" 数据库: {', '.join(dbs)}") + break + + # 进度 + if (reg_idx + 1) % 200 == 0: + elapsed = time.time() - t0 + progress = sum(s for b,s in regions[:reg_idx+1]) / sum(s for _,s in regions) * 100 + print(f" [{progress:.1f}%] {len(key_map)}/{len(salt_to_dbs)} salts matched, " + f"{all_hex_matches} hex patterns, {elapsed:.1f}s") + + elapsed = time.time() - t0 + print(f"\n扫描完成: {elapsed:.1f}s, {all_hex_matches} hex模式") + + # 4. 如果有未找到的salt,用已找到的key做交叉验证 + # (WCDB有时对同一passphrase的不同DB用同一enc_key,如果salt相同) + missing_salts = set(salt_to_dbs.keys()) - set(key_map.keys()) + if missing_salts and key_map: + print(f"\n还有 {len(missing_salts)} 个salt未匹配,尝试交叉验证...") + for salt_hex in list(missing_salts): + for rel, path, sz, s, page1 in db_files: + if s == salt_hex: + for known_salt, known_key_hex in key_map.items(): + enc_key = bytes.fromhex(known_key_hex) + if verify_key_for_db(enc_key, page1): + key_map[salt_hex] = known_key_hex + print(f" [CROSS] salt={salt_hex} 可用 key from salt={known_salt}") + missing_salts.discard(salt_hex) + break + + # 5. 输出结果 + print(f"\n{'='*60}") + print(f"结果: {len(key_map)}/{len(salt_to_dbs)} salts 找到密钥") + + result = {} + for rel, path, sz, salt_hex, page1 in db_files: + if salt_hex in key_map: + result[rel] = { + "enc_key": key_map[salt_hex], + "salt": salt_hex, + "size_mb": round(sz/1024/1024, 1) + } + print(f" OK: {rel} ({sz/1024/1024:.1f}MB)") + else: + print(f" MISSING: {rel} (salt={salt_hex})") + + with open(OUT_FILE, 'w') as f: + json.dump(result, f, indent=2) + print(f"\n密钥保存到: {OUT_FILE}") + + missing = [rel for rel, path, sz, salt_hex, page1 in db_files if salt_hex not in key_map] + if missing: + print(f"\n未找到密钥的数据库:") + for rel in missing: + print(f" {rel}") + + kernel32.CloseHandle(h) + + +if __name__ == '__main__': + main() diff --git a/latency_test.py b/latency_test.py new file mode 100644 index 0000000..b1d4603 --- /dev/null +++ b/latency_test.py @@ -0,0 +1,175 @@ +"""测量消息延迟 - 用mtime检测WAL变化(WAL文件是预分配固定大小的)""" +import time, os, sys, io, hashlib, struct, sqlite3, json +from datetime import datetime +from Crypto.Cipher import AES + +sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') + +PAGE_SZ = 4096; KEY_SZ = 32; SALT_SZ = 16; RESERVE_SZ = 80 +SQLITE_HDR = b'SQLite format 3\x00' +WAL_HEADER_SZ = 32; WAL_FRAME_HEADER_SZ = 24 + +from config import load_config +_cfg = load_config() +DB_DIR = _cfg["db_dir"] +KEYS_FILE = _cfg["keys_file"] +DECRYPTED = os.path.join(_cfg["decrypted_dir"], "session", "session.db") + +with open(KEYS_FILE) as f: + keys = json.load(f) +enc_key = bytes.fromhex(keys["session\\session.db"]["enc_key"]) + +session_db = os.path.join(DB_DIR, "session", "session.db") +wal_path = session_db + "-wal" + + +def decrypt_page(enc_key, page_data, pgno): + iv = page_data[PAGE_SZ - RESERVE_SZ: PAGE_SZ - RESERVE_SZ + 16] + if pgno == 1: + encrypted = page_data[SALT_SZ: PAGE_SZ - RESERVE_SZ] + cipher = AES.new(enc_key, AES.MODE_CBC, iv) + decrypted = cipher.decrypt(encrypted) + return bytearray(SQLITE_HDR + decrypted + b'\x00' * RESERVE_SZ) + else: + encrypted = page_data[:PAGE_SZ - RESERVE_SZ] + cipher = AES.new(enc_key, AES.MODE_CBC, iv) + decrypted = cipher.decrypt(encrypted) + return decrypted + b'\x00' * RESERVE_SZ + + +def full_decrypt(src, dst): + t0 = time.perf_counter() + total = os.path.getsize(src) // PAGE_SZ + with open(src, 'rb') as fin, open(dst, 'wb') as fout: + for pgno in range(1, total + 1): + page = fin.read(PAGE_SZ) + if len(page) < PAGE_SZ: break + fout.write(decrypt_page(enc_key, page, pgno)) + return total, (time.perf_counter() - t0) * 1000 + + +def decrypt_wal_full(wal_path, dst): + """解密WAL当前有效frame,patch到dst (校验salt跳过旧周期遗留frame)""" + t0 = time.perf_counter() + wal_sz = os.path.getsize(wal_path) + frame_size = WAL_FRAME_HEADER_SZ + PAGE_SZ + patched = 0 + + with open(wal_path, 'rb') as wf, open(dst, 'r+b') as df: + wal_hdr = wf.read(WAL_HEADER_SZ) + wal_salt1 = struct.unpack('>I', wal_hdr[16:20])[0] + wal_salt2 = struct.unpack('>I', wal_hdr[20:24])[0] + + while wf.tell() + frame_size <= wal_sz: + fh = wf.read(WAL_FRAME_HEADER_SZ) + if len(fh) < WAL_FRAME_HEADER_SZ: break + pgno = struct.unpack('>I', fh[0:4])[0] + frame_salt1 = struct.unpack('>I', fh[8:12])[0] + frame_salt2 = struct.unpack('>I', fh[12:16])[0] + ep = wf.read(PAGE_SZ) + if len(ep) < PAGE_SZ: break + if pgno == 0 or pgno > 1000000: continue + if frame_salt1 != wal_salt1 or frame_salt2 != wal_salt2: continue + dec = decrypt_page(enc_key, ep, pgno) + df.seek((pgno - 1) * PAGE_SZ) + df.write(dec) + patched += 1 + + return patched, (time.perf_counter() - t0) * 1000 + + +# 初始化: 全量解密 +print("初始全量解密...", flush=True) +pages, ms = full_decrypt(session_db, DECRYPTED) +print(f" DB: {pages}页 {ms:.0f}ms", flush=True) +if os.path.exists(wal_path): + patched, ms2 = decrypt_wal_full(wal_path, DECRYPTED) + print(f" WAL: {patched}页 {ms2:.0f}ms", flush=True) + +# 获取初始状态 +conn = sqlite3.connect(DECRYPTED) +prev_sessions = {} +for r in conn.execute("SELECT username, last_timestamp FROM SessionTable WHERE last_timestamp>0"): + prev_sessions[r[0]] = r[1] +conn.close() + +# 记录初始mtime +prev_wal_mtime = os.path.getmtime(wal_path) if os.path.exists(wal_path) else 0 +prev_db_mtime = os.path.getmtime(session_db) +wal_sz = os.path.getsize(wal_path) if os.path.exists(wal_path) else 0 + +print(f"\nWAL大小: {wal_sz} bytes (固定预分配)", flush=True) +print(f"跟踪 {len(prev_sessions)} 个会话", flush=True) +print(f"\n等待微信新消息... (60秒超时, 30ms轮询)\n", flush=True) + +start = time.time() + +while time.time() - start < 60: + time.sleep(0.03) + + # 用mtime检测变化 + try: + wal_mtime = os.path.getmtime(wal_path) if os.path.exists(wal_path) else 0 + db_mtime = os.path.getmtime(session_db) + except: + continue + + if wal_mtime == prev_wal_mtime and db_mtime == prev_db_mtime: + continue + + t_detect = time.perf_counter() + detect_str = datetime.now().strftime('%H:%M:%S.%f')[:-3] + + wal_changed = wal_mtime != prev_wal_mtime + db_changed = db_mtime != prev_db_mtime + print(f"[{detect_str}] 变化检测: WAL={'变' if wal_changed else '不变'} DB={'变' if db_changed else '不变'}", flush=True) + + # 如果DB变了(checkpoint), 全量重解密 + if db_changed and not wal_changed: + pages, ms = full_decrypt(session_db, DECRYPTED) + print(f" 全量解密: {pages}页 {ms:.0f}ms", flush=True) + else: + # WAL变了, 重新patch所有WAL frame (因为不知道哪些是新的) + # 先全量解密DB基础 + pages, ms = full_decrypt(session_db, DECRYPTED) + patched, ms2 = decrypt_wal_full(wal_path, DECRYPTED) + print(f" DB {pages}页/{ms:.0f}ms + WAL {patched}页/{ms2:.0f}ms", flush=True) + + t_decrypt = time.perf_counter() + + # 查询变化 + conn = sqlite3.connect(DECRYPTED) + new_msgs = [] + for r in conn.execute(""" + SELECT username, last_timestamp, summary, last_sender_display_name + FROM SessionTable WHERE last_timestamp > 0 + """): + uname, ts, summary, sender = r + if ts > prev_sessions.get(uname, 0): + delay = time.time() - ts + new_msgs.append((uname, ts, summary or '', sender or '', delay)) + prev_sessions[uname] = ts + conn.close() + + t_query = time.perf_counter() + + decrypt_ms = (t_decrypt - t_detect) * 1000 + query_ms = (t_query - t_decrypt) * 1000 + total_ms = (t_query - t_detect) * 1000 + + print(f" 处理总耗时: {total_ms:.1f}ms (解密{decrypt_ms:.1f}ms + 查询{query_ms:.1f}ms)", flush=True) + + for uname, ts, summary, sender, delay in sorted(new_msgs, key=lambda x: x[1]): + if ':\n' in summary: + summary = summary.split(':\n', 1)[1] + msg_time = datetime.fromtimestamp(ts).strftime('%H:%M:%S') + print(f" >>> 消息时间={msg_time} | 微信→DB延迟={delay:.1f}s | {sender}: {summary}", flush=True) + + if not new_msgs: + print(f" (无新消息变化)", flush=True) + + prev_wal_mtime = wal_mtime + prev_db_mtime = db_mtime + print(flush=True) + +print("超时退出", flush=True) diff --git a/monitor.py b/monitor.py new file mode 100644 index 0000000..efea0f8 --- /dev/null +++ b/monitor.py @@ -0,0 +1,251 @@ +""" +微信实时消息监听器 + +原理: 定期解密 session.db (2MB, <1秒), 检测新消息 +session.db 包含每个聊天的最新消息摘要、发送者、时间戳 +""" +import hashlib, struct, os, sys, json, time, sqlite3, io +import hmac as hmac_mod +from datetime import datetime +from Crypto.Cipher import AES + +sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') + +import functools +print = functools.partial(print, flush=True) + +PAGE_SZ = 4096 +KEY_SZ = 32 +SALT_SZ = 16 +IV_SZ = 16 +HMAC_SZ = 64 +RESERVE_SZ = 80 +SQLITE_HDR = b'SQLite format 3\x00' + +from config import load_config +_cfg = load_config() +DB_DIR = _cfg["db_dir"] +KEYS_FILE = _cfg["keys_file"] +CONTACT_CACHE = os.path.join(_cfg["decrypted_dir"], "contact", "contact.db") + +POLL_INTERVAL = 3 # 秒 + + +def derive_mac_key(enc_key, salt): + mac_salt = bytes(b ^ 0x3a for b in salt) + return hashlib.pbkdf2_hmac("sha512", enc_key, mac_salt, 2, dklen=KEY_SZ) + + +def decrypt_page(enc_key, page_data, pgno): + iv = page_data[PAGE_SZ - RESERVE_SZ : PAGE_SZ - RESERVE_SZ + IV_SZ] + if pgno == 1: + encrypted = page_data[SALT_SZ : PAGE_SZ - RESERVE_SZ] + cipher = AES.new(enc_key, AES.MODE_CBC, iv) + decrypted = cipher.decrypt(encrypted) + page = bytearray(SQLITE_HDR + decrypted + b'\x00' * RESERVE_SZ) + return bytes(page) + else: + encrypted = page_data[:PAGE_SZ - RESERVE_SZ] + cipher = AES.new(enc_key, AES.MODE_CBC, iv) + decrypted = cipher.decrypt(encrypted) + return decrypted + b'\x00' * RESERVE_SZ + + +def decrypt_db_to_memory(db_path, enc_key): + """解密DB到内存中的bytes, 返回可用于sqlite3的数据""" + file_size = os.path.getsize(db_path) + total_pages = file_size // PAGE_SZ + if file_size % PAGE_SZ != 0: + total_pages += 1 + + chunks = [] + with open(db_path, 'rb') as fin: + for pgno in range(1, total_pages + 1): + page = fin.read(PAGE_SZ) + if len(page) < PAGE_SZ: + if len(page) > 0: + page = page + b'\x00' * (PAGE_SZ - len(page)) + else: + break + decrypted = decrypt_page(enc_key, page, pgno) + chunks.append(decrypted) + + return b''.join(chunks) + + +def decrypt_db_to_sqlite(db_path, enc_key): + """解密DB并返回sqlite3连接 (内存数据库)""" + data = decrypt_db_to_memory(db_path, enc_key) + + # 写临时文件 (sqlite3不支持直接从bytes打开) + tmp_path = db_path + ".tmp_monitor" + with open(tmp_path, 'wb') as f: + f.write(data) + + conn = sqlite3.connect(tmp_path) + conn.row_factory = sqlite3.Row + return conn, tmp_path + + +def load_contact_names(): + """从已解密的contact.db加载联系人昵称映射""" + names = {} + if not os.path.exists(CONTACT_CACHE): + return names + try: + conn = sqlite3.connect(CONTACT_CACHE) + rows = conn.execute( + "SELECT username, nick_name, remark FROM contact" + ).fetchall() + for r in rows: + username, nick, remark = r + names[username] = remark if remark else nick if nick else username + conn.close() + except Exception as e: + print(f"[WARN] 加载联系人失败: {e}") + return names + + +def get_session_state(conn): + """获取当前session状态""" + state = {} + try: + rows = conn.execute(""" + SELECT username, unread_count, summary, last_timestamp, + last_msg_type, last_msg_sender, last_sender_display_name + FROM SessionTable + WHERE last_timestamp > 0 + """).fetchall() + for r in rows: + state[r[0]] = { + 'unread': r[1], + 'summary': r[2] or '', + 'timestamp': r[3], + 'msg_type': r[4], + 'sender': r[5] or '', + 'sender_name': r[6] or '', + } + except Exception as e: + print(f"[ERROR] 读取session失败: {e}") + return state + + +def format_msg_type(t): + types = { + 1: '文本', 3: '图片', 34: '语音', 42: '名片', + 43: '视频', 47: '表情', 48: '位置', 49: '链接/文件', + 50: '语音/视频通话', 10000: '系统消息', 10002: '撤回', + } + return types.get(t, f'type={t}') + + +def main(): + print("=" * 60) + print(" 微信实时消息监听器") + print("=" * 60) + + # 加载密钥 + with open(KEYS_FILE) as f: + keys = json.load(f) + + session_key_info = keys.get("session\\session.db") + if not session_key_info: + print("[ERROR] 找不到session.db的密钥") + sys.exit(1) + + enc_key = bytes.fromhex(session_key_info["enc_key"]) + session_db = os.path.join(DB_DIR, "session", "session.db") + + # 加载联系人 + print("加载联系人...") + contact_names = load_contact_names() + print(f"已加载 {len(contact_names)} 个联系人") + + # 初始状态 + print("读取初始状态...") + conn, tmp_path = decrypt_db_to_sqlite(session_db, enc_key) + prev_state = get_session_state(conn) + conn.close() + os.remove(tmp_path) + + print(f"跟踪 {len(prev_state)} 个会话") + print(f"轮询间隔: {POLL_INTERVAL}秒") + print(f"\n{'='*60}") + print("开始监听... (Ctrl+C 停止)\n") + + poll_count = 0 + try: + while True: + time.sleep(POLL_INTERVAL) + poll_count += 1 + + try: + conn, tmp_path = decrypt_db_to_sqlite(session_db, enc_key) + curr_state = get_session_state(conn) + conn.close() + os.remove(tmp_path) + except Exception as e: + if poll_count % 10 == 0: + print(f"[{datetime.now().strftime('%H:%M:%S')}] 读取失败: {e}") + continue + + # 比较差异 + for username, curr in curr_state.items(): + prev = prev_state.get(username) + + if prev is None: + # 新会话 + display = contact_names.get(username, username) + ts = datetime.fromtimestamp(curr['timestamp']).strftime('%H:%M:%S') + print(f"[{ts}] 新会话 [{display}]") + print(f" {curr['summary']}") + print() + continue + + # 检查时间戳变化 (有新消息) + if curr['timestamp'] > prev['timestamp']: + display = contact_names.get(username, username) + ts = datetime.fromtimestamp(curr['timestamp']).strftime('%H:%M:%S') + msg_type = format_msg_type(curr['msg_type']) + sender = curr['sender_name'] or curr['sender'] or '' + + # 群聊显示发送者 + if '@chatroom' in username and sender: + sender_display = contact_names.get(curr['sender'], sender) + print(f"[{ts}] [{display}] {sender_display}:") + else: + print(f"[{ts}] [{display}]") + + # 消息内容 + summary = curr['summary'] + if summary: + # 群消息格式: "wxid_xxx:\n内容" - 提取内容部分 + if ':\n' in summary: + summary = summary.split(':\n', 1)[1] + print(f" [{msg_type}] {summary}") + else: + print(f" [{msg_type}]") + + # 未读数变化 + if curr['unread'] > 0: + print(f" (未读: {curr['unread']})") + print() + + prev_state = curr_state + + # 心跳 + if poll_count % 20 == 0: + now = datetime.now().strftime('%H:%M:%S') + print(f"--- {now} 运行中 (第{poll_count}次轮询) ---") + + except KeyboardInterrupt: + print(f"\n监听结束, 共 {poll_count} 次轮询") + + # 清理 + tmp = session_db + ".tmp_monitor" + if os.path.exists(tmp): + os.remove(tmp) + + +if __name__ == '__main__': + main() diff --git a/monitor_web.py b/monitor_web.py new file mode 100644 index 0000000..d174db2 --- /dev/null +++ b/monitor_web.py @@ -0,0 +1,551 @@ +""" +微信实时消息监听器 - Web UI (SSE推送 + mtime检测) + +http://localhost:5678 +- 30ms轮询WAL/DB文件的mtime变化(WAL是预分配固定大小,不能用size检测) +- 检测到变化后:全量解密DB + 全量WAL patch +- SSE 服务器推送 +""" +import hashlib, struct, os, sys, json, time, sqlite3, io, threading, queue +import hmac as hmac_mod +from datetime import datetime +from http.server import HTTPServer, BaseHTTPRequestHandler +from socketserver import ThreadingMixIn +from Crypto.Cipher import AES +import urllib.parse + +PAGE_SZ = 4096 +KEY_SZ = 32 +SALT_SZ = 16 +RESERVE_SZ = 80 +SQLITE_HDR = b'SQLite format 3\x00' +WAL_HEADER_SZ = 32 +WAL_FRAME_HEADER_SZ = 24 + +from config import load_config +_cfg = load_config() +DB_DIR = _cfg["db_dir"] +KEYS_FILE = _cfg["keys_file"] +CONTACT_CACHE = os.path.join(_cfg["decrypted_dir"], "contact", "contact.db") +DECRYPTED_SESSION = os.path.join(_cfg["decrypted_dir"], "session", "session.db") + +POLL_MS = 30 # 高频轮询WAL/DB的mtime,30ms一次 +PORT = 5678 + +sse_clients = [] +sse_lock = threading.Lock() +messages_log = [] +messages_lock = threading.Lock() +MAX_LOG = 500 + + +def decrypt_page(enc_key, page_data, pgno): + """解密单个加密页面""" + iv = page_data[PAGE_SZ - RESERVE_SZ: PAGE_SZ - RESERVE_SZ + 16] + if pgno == 1: + encrypted = page_data[SALT_SZ: PAGE_SZ - RESERVE_SZ] + cipher = AES.new(enc_key, AES.MODE_CBC, iv) + decrypted = cipher.decrypt(encrypted) + return bytearray(SQLITE_HDR + decrypted + b'\x00' * RESERVE_SZ) + else: + encrypted = page_data[:PAGE_SZ - RESERVE_SZ] + cipher = AES.new(enc_key, AES.MODE_CBC, iv) + decrypted = cipher.decrypt(encrypted) + return decrypted + b'\x00' * RESERVE_SZ + + +def full_decrypt(db_path, out_path, enc_key): + """首次全量解密""" + t0 = time.perf_counter() + file_size = os.path.getsize(db_path) + total_pages = file_size // PAGE_SZ + + with open(db_path, 'rb') as fin, open(out_path, 'wb') as fout: + for pgno in range(1, total_pages + 1): + page = fin.read(PAGE_SZ) + if len(page) < PAGE_SZ: + if len(page) > 0: + page = page + b'\x00' * (PAGE_SZ - len(page)) + else: + break + fout.write(decrypt_page(enc_key, page, pgno)) + + ms = (time.perf_counter() - t0) * 1000 + return total_pages, ms + + +def decrypt_wal_full(wal_path, out_path, enc_key): + """解密WAL当前有效frame,patch到已解密的DB副本 + + WAL是预分配固定大小(4MB),包含当前有效frame和上一轮遗留的旧frame。 + 通过WAL header中的salt值区分:只有frame header的salt匹配WAL header的才是有效frame。 + + 返回: (patched_pages, elapsed_ms) + """ + t0 = time.perf_counter() + + if not os.path.exists(wal_path): + return 0, 0 + + wal_size = os.path.getsize(wal_path) + if wal_size <= WAL_HEADER_SZ: + return 0, 0 + + frame_size = WAL_FRAME_HEADER_SZ + PAGE_SZ # 24 + 4096 = 4120 + patched = 0 + + with open(wal_path, 'rb') as wf, open(out_path, 'r+b') as df: + # 读WAL header,获取当前salt值 + wal_hdr = wf.read(WAL_HEADER_SZ) + wal_salt1 = struct.unpack('>I', wal_hdr[16:20])[0] + wal_salt2 = struct.unpack('>I', wal_hdr[20:24])[0] + + while wf.tell() + frame_size <= wal_size: + fh = wf.read(WAL_FRAME_HEADER_SZ) + if len(fh) < WAL_FRAME_HEADER_SZ: + break + pgno = struct.unpack('>I', fh[0:4])[0] + frame_salt1 = struct.unpack('>I', fh[8:12])[0] + frame_salt2 = struct.unpack('>I', fh[12:16])[0] + + ep = wf.read(PAGE_SZ) + if len(ep) < PAGE_SZ: + break + + # 校验: pgno有效 且 salt匹配当前WAL周期 + if pgno == 0 or pgno > 1000000: + continue + if frame_salt1 != wal_salt1 or frame_salt2 != wal_salt2: + continue # 旧周期遗留的frame,跳过 + + dec = decrypt_page(enc_key, ep, pgno) + df.seek((pgno - 1) * PAGE_SZ) + df.write(dec) + patched += 1 + + ms = (time.perf_counter() - t0) * 1000 + return patched, ms + + +def load_contact_names(): + names = {} + try: + conn = sqlite3.connect(CONTACT_CACHE) + for r in conn.execute("SELECT username, nick_name, remark FROM contact").fetchall(): + names[r[0]] = r[2] if r[2] else r[1] if r[1] else r[0] + conn.close() + except: + pass + return names + + +def format_msg_type(t): + return { + 1: '文本', 3: '图片', 34: '语音', 42: '名片', + 43: '视频', 47: '表情', 48: '位置', 49: '链接/文件', + 50: '通话', 10000: '系统', 10002: '撤回', + }.get(t, f'type={t}') + + +def msg_type_icon(t): + return { + 1: '💬', 3: '🖼️', 34: '🎤', 42: '👤', + 43: '🎬', 47: '😀', 48: '📍', 49: '🔗', + 50: '📞', 10000: '⚙️', 10002: '↩️', + }.get(t, '📨') + + +def broadcast_sse(msg_data): + payload = f"data: {json.dumps(msg_data, ensure_ascii=False)}\n\n" + with sse_lock: + dead = [] + for q in sse_clients: + try: + q.put_nowait(payload) + except: + dead.append(q) + for q in dead: + sse_clients.remove(q) + + +# ============ 监听器 ============ + +class SessionMonitor: + def __init__(self, enc_key, session_db, contact_names): + self.enc_key = enc_key + self.session_db = session_db + self.wal_path = session_db + "-wal" + self.contact_names = contact_names + self.prev_state = {} + self.decrypt_ms = 0 + self.patched_pages = 0 + + def query_state(self): + """查询已解密副本的session状态""" + conn = sqlite3.connect(f"file:{DECRYPTED_SESSION}?mode=ro", uri=True) + state = {} + for r in conn.execute(""" + SELECT username, unread_count, summary, last_timestamp, + last_msg_type, last_msg_sender, last_sender_display_name + FROM SessionTable WHERE last_timestamp > 0 + """).fetchall(): + state[r[0]] = { + 'unread': r[1], 'summary': r[2] or '', 'timestamp': r[3], + 'msg_type': r[4], 'sender': r[5] or '', 'sender_name': r[6] or '', + } + conn.close() + return state + + def do_full_refresh(self): + """全量解密DB + 全量WAL patch""" + # 先解密主DB + pages, ms = full_decrypt(self.session_db, DECRYPTED_SESSION, self.enc_key) + total_ms = ms + wal_patched = 0 + + # 再patch所有WAL frames + if os.path.exists(self.wal_path): + wal_patched, ms2 = decrypt_wal_full(self.wal_path, DECRYPTED_SESSION, self.enc_key) + total_ms += ms2 + + self.decrypt_ms = total_ms + self.patched_pages = pages + wal_patched + return self.patched_pages + + def check_updates(self): + global messages_log + try: + t0 = time.perf_counter() + self.do_full_refresh() + t1 = time.perf_counter() + curr_state = self.query_state() + t2 = time.perf_counter() + print(f" [perf] decrypt={self.patched_pages}页/{(t1-t0)*1000:.1f}ms, query={(t2-t1)*1000:.1f}ms", flush=True) + except Exception as e: + print(f" [ERROR] check_updates: {e}", flush=True) + return + + # 收集所有新消息,按时间排序后再推送 + new_msgs = [] + for username, curr in curr_state.items(): + prev = self.prev_state.get(username) + if prev and curr['timestamp'] > prev['timestamp']: + display = self.contact_names.get(username, username) + is_group = '@chatroom' in username + sender = '' + if is_group: + sender = self.contact_names.get(curr['sender'], curr['sender_name'] or curr['sender']) + + summary = curr['summary'] + if summary and ':\n' in summary: + summary = summary.split(':\n', 1)[1] + + new_msgs.append({ + 'time': datetime.fromtimestamp(curr['timestamp']).strftime('%H:%M:%S'), + 'timestamp': curr['timestamp'], + 'chat': display, + 'username': username, + 'is_group': is_group, + 'sender': sender, + 'type': format_msg_type(curr['msg_type']), + 'type_icon': msg_type_icon(curr['msg_type']), + 'content': summary, + 'unread': curr['unread'], + 'decrypt_ms': round(self.decrypt_ms, 1), + 'pages': self.patched_pages, + }) + + # 按时间排序 + new_msgs.sort(key=lambda m: m['timestamp']) + + for msg in new_msgs: + with messages_lock: + messages_log.append(msg) + if len(messages_log) > MAX_LOG: + messages_log = messages_log[-MAX_LOG:] + + broadcast_sse(msg) + + try: + now = time.time() + msg_age = now - msg['timestamp'] + tag = f"{self.patched_pages}pg/{self.decrypt_ms:.0f}ms" + sender = msg['sender'] + now_str = datetime.fromtimestamp(now).strftime('%H:%M:%S') + if sender: + print(f"[{msg['time']} 延迟={msg_age:.1f}s] [{msg['chat']}] {sender}: {msg['content']} ({tag})", flush=True) + else: + print(f"[{msg['time']} 延迟={msg_age:.1f}s] [{msg['chat']}] {msg['content']} ({tag})", flush=True) + except Exception: + pass # Windows CMD编码问题,不影响SSE推送 + + self.prev_state = curr_state + +def monitor_thread(enc_key, session_db, contact_names): + mon = SessionMonitor(enc_key, session_db, contact_names) + wal_path = mon.wal_path + + # 初始全量解密 + pages, ms = full_decrypt(session_db, DECRYPTED_SESSION, enc_key) + wal_patched = 0 + wal_ms = 0 + if os.path.exists(wal_path): + wal_patched, wal_ms = decrypt_wal_full(wal_path, DECRYPTED_SESSION, enc_key) + print(f"[init] DB {pages}页/{ms:.0f}ms + WAL {wal_patched}页/{wal_ms:.0f}ms", flush=True) + else: + print(f"[init] DB {pages}页/{ms:.0f}ms", flush=True) + + mon.prev_state = mon.query_state() + print(f"[monitor] 跟踪 {len(mon.prev_state)} 个会话", flush=True) + print(f"[monitor] mtime轮询模式 (每{POLL_MS}ms)", flush=True) + + # mtime-based 轮询: WAL是预分配固定大小,不能用size检测 + poll_interval = POLL_MS / 1000 + prev_wal_mtime = os.path.getmtime(wal_path) if os.path.exists(wal_path) else 0 + prev_db_mtime = os.path.getmtime(session_db) + + while True: + time.sleep(poll_interval) + try: + # 用mtime检测WAL和DB变化 + try: + wal_mtime = os.path.getmtime(wal_path) if os.path.exists(wal_path) else 0 + db_mtime = os.path.getmtime(session_db) + except OSError: + continue + + if wal_mtime == prev_wal_mtime and db_mtime == prev_db_mtime: + continue # 无变化 + + t_detect = time.perf_counter() + wal_changed = wal_mtime != prev_wal_mtime + db_changed = db_mtime != prev_db_mtime + + mon.check_updates() + + t_done = time.perf_counter() + try: + detect_str = datetime.now().strftime('%H:%M:%S.%f')[:-3] + print(f" [{detect_str}] WAL={'变' if wal_changed else '-'} DB={'变' if db_changed else '-'} 总耗时={(t_done-t_detect)*1000:.1f}ms", flush=True) + except Exception: + pass + + prev_wal_mtime = wal_mtime + prev_db_mtime = db_mtime + + except Exception as e: + print(f"[poll] 错误: {e}", flush=True) + time.sleep(1) + + +# ============ Web ============ + +HTML_PAGE = ''' + + + + +微信消息监听 + + + +
+

WeChat Monitor

+
SSE 实时
+
0 消息
+
+
+
📡

等待新消息...

WAL增量解密 · SSE推送

+
+ + +''' + + +class Handler(BaseHTTPRequestHandler): + def log_message(self, *a): pass + def handle(self): + try: + super().handle() + except (ConnectionAbortedError, ConnectionResetError, BrokenPipeError, OSError): + pass # 浏览器关闭连接,正常 + + def do_GET(self): + if self.path in ('/', '/index.html'): + self.send_response(200) + self.send_header('Content-Type', 'text/html; charset=utf-8') + self.end_headers() + self.wfile.write(HTML_PAGE.encode('utf-8')) + + elif self.path == '/api/history': + with messages_lock: + data = sorted(messages_log, key=lambda m: m.get('timestamp', 0)) + self.send_response(200) + self.send_header('Content-Type', 'application/json; charset=utf-8') + self.end_headers() + self.wfile.write(json.dumps(data, ensure_ascii=False).encode('utf-8')) + + elif self.path == '/stream': + self.send_response(200) + self.send_header('Content-Type', 'text/event-stream') + self.send_header('Cache-Control', 'no-cache') + self.send_header('Connection', 'keep-alive') + self.end_headers() + + q = queue.Queue() + with sse_lock: + sse_clients.append(q) + try: + while True: + try: + payload = q.get(timeout=15) + self.wfile.write(payload.encode('utf-8')) + self.wfile.flush() + except queue.Empty: + self.wfile.write(b': hb\n\n') + self.wfile.flush() + except: + pass + finally: + with sse_lock: + if q in sse_clients: + sse_clients.remove(q) + else: + self.send_error(404) + + +class ThreadedServer(ThreadingMixIn, HTTPServer): + daemon_threads = True + + +def main(): + print("=" * 60, flush=True) + print(" 微信实时监听 (WAL增量 + SSE推送)", flush=True) + print("=" * 60, flush=True) + + with open(KEYS_FILE) as f: + keys = json.load(f) + + enc_key = bytes.fromhex(keys["session\\session.db"]["enc_key"]) + session_db = os.path.join(DB_DIR, "session", "session.db") + + print("加载联系人...", flush=True) + contact_names = load_contact_names() + print(f"已加载 {len(contact_names)} 个联系人", flush=True) + + t = threading.Thread(target=monitor_thread, args=(enc_key, session_db, contact_names), daemon=True) + t.start() + + server = ThreadedServer(('0.0.0.0', PORT), Handler) + print(f"\n=> http://localhost:{PORT}", flush=True) + print("Ctrl+C 停止\n", flush=True) + + try: + os.system(f'cmd.exe /c start http://localhost:{PORT}') + except: + pass + + try: + server.serve_forever() + except KeyboardInterrupt: + print("\n已停止") + + +if __name__ == '__main__': + main()