Add image decryption and inline preview for WeChat V2 format

Support all three .dat encryption formats:
- Old XOR format: single-byte XOR, auto-detect key from magic bytes
- V1 format: AES-ECB with fixed key (md5("0")[:16]) + XOR tail
- V2 format (2025-08+): AES-128-ECB + raw middle + XOR tail

New files:
- decode_image.py: unified image decryption module (XOR/V1/V2)
- find_image_key.py: extract AES key from WeChat process memory
- find_image_key_monitor.py: continuous monitoring version for key capture

monitor_web.py changes:
- Inline image preview in Web UI with async decryption
- MonitorDBCache for mtime-based DB decryption caching
- username-to-DB mapping for image resolution chain
- /img/ endpoint for serving decoded images
- SSE image_update events for real-time preview updates

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feat/daemon-cli
ylytdeng 2026-03-02 00:30:01 +08:00
parent 05b8ba4d45
commit da7525db95
8 changed files with 1729 additions and 25 deletions

1
.gitignore vendored
View File

@ -3,6 +3,7 @@ all_keys.json
wechat_key.txt
config.json
decrypted/
decoded_images/
*.db
*.db-shm
*.db-wal

View File

@ -12,6 +12,7 @@ _DEFAULT = {
"db_dir": r"D:\xwechat_files\your_wxid\db_storage",
"keys_file": "all_keys.json",
"decrypted_dir": "decrypted",
"decoded_image_dir": "decoded_images",
"wechat_process": "Weixin.exe",
}
@ -29,8 +30,21 @@ def load_config():
# 将相对路径转为绝对路径
base = os.path.dirname(os.path.abspath(__file__))
for key in ("keys_file", "decrypted_dir"):
for key in ("keys_file", "decrypted_dir", "decoded_image_dir"):
if key in cfg and not os.path.isabs(cfg[key]):
cfg[key] = os.path.join(base, cfg[key])
# 自动推导微信数据根目录db_dir 的上级目录)
# db_dir 格式: D:\xwechat_files\<wxid>\db_storage
# base_dir 格式: D:\xwechat_files\<wxid>
db_dir = cfg.get("db_dir", "")
if db_dir and os.path.basename(db_dir) == "db_storage":
cfg["wechat_base_dir"] = os.path.dirname(db_dir)
else:
cfg["wechat_base_dir"] = db_dir
# decoded_image_dir 默认值
if "decoded_image_dir" not in cfg:
cfg["decoded_image_dir"] = os.path.join(base, "decoded_images")
return cfg

470
decode_image.py 100644
View File

@ -0,0 +1,470 @@
r"""
微信图片 .dat 文件解密模块
支持两种加密格式:
- 旧格式: 单字节 XOR 加密key 通过对比文件头与已知图片 magic bytes 自动检测
- V2 格式 (2025-08+): AES-128-ECB + XOR 混合加密需要从微信进程内存提取 AES key
V2 文件结构:
[6B signature: 07 08 V2 08 07] [4B aes_size LE] [4B xor_size LE] [1B padding]
[aligned_aes_size bytes AES-ECB] [raw_data] [xor_size bytes XOR]
文件路径格式:
D:\xwechat_files\<wxid>\msg\attach\<md5(username)>\<YYYY-MM>\Img\<file_md5>[_t|_h].dat
映射链:
message_*.db (local_id) message_resource.db (packed_info MD5) .dat 文件 解密
"""
import os
import sys
import glob
import hashlib
import sqlite3
import struct
# V2 格式完整 magic (6 bytes)
V2_MAGIC = b'\x07\x08\x56\x32' # 前 4 字节用于快速检测
V2_MAGIC_FULL = b'\x07\x08V2\x08\x07' # 完整 6 字节签名
V1_MAGIC_FULL = b'\x07\x08V1\x08\x07' # V1 签名 (固定 key)
# 常见图片格式的 magic bytes (按长度降序排列,避免短 magic 假阳性)
IMAGE_MAGIC = {
'png': [0x89, 0x50, 0x4E, 0x47],
'gif': [0x47, 0x49, 0x46, 0x38],
'tif': [0x49, 0x49, 0x2A, 0x00], # little-endian TIFF
'webp': [0x52, 0x49, 0x46, 0x46], # RIFF header
'jpg': [0xFF, 0xD8, 0xFF],
# BMP 只有 2 字节 magic容易假阳性需要额外验证
}
def is_v2_format(dat_path):
"""检测是否是微信 V2 加密格式 (2025-08+)"""
try:
with open(dat_path, 'rb') as f:
magic = f.read(4)
return magic == V2_MAGIC
except (OSError, IOError):
return False
def detect_xor_key(dat_path):
"""通过对比文件头和已知图片 magic bytes 自动检测 XOR key
返回 key (int) NoneV2 格式文件返回 None
"""
with open(dat_path, 'rb') as f:
header = f.read(16)
if len(header) < 4:
return None
# V2 新格式无法用 XOR 解密
if header[:4] == V2_MAGIC:
return None
# 先尝试 3+ 字节 magic 的格式(可靠匹配)
for fmt, magic in IMAGE_MAGIC.items():
key = header[0] ^ magic[0]
match = True
for i in range(1, len(magic)):
if i >= len(header):
break
if (header[i] ^ key) != magic[i]:
match = False
break
if match:
return key
# 最后尝试 BMP (2 字节 magic需要额外验证)
bmp_magic = [0x42, 0x4D]
key = header[0] ^ bmp_magic[0]
if len(header) >= 2 and (header[1] ^ key) == bmp_magic[1]:
# 额外验证: XOR 解密后检查 BMP file size 和 offset 字段
if len(header) >= 14:
dec = bytes(b ^ key for b in header[:14])
bmp_size = struct.unpack_from('<I', dec, 2)[0]
bmp_offset = struct.unpack_from('<I', dec, 10)[0]
file_size = os.path.getsize(dat_path)
# BMP file_size 字段应与实际文件大小接近offset 应在合理范围
if (abs(bmp_size - file_size) < 1024 and 14 <= bmp_offset <= 1078):
return key
return None
def detect_image_format(header_bytes):
"""根据解密后的文件头检测图片格式"""
if header_bytes[:3] == bytes([0xFF, 0xD8, 0xFF]):
return 'jpg'
if header_bytes[:4] == bytes([0x89, 0x50, 0x4E, 0x47]):
return 'png'
if header_bytes[:3] == b'GIF':
return 'gif'
if header_bytes[:2] == b'BM':
return 'bmp'
if header_bytes[:4] == b'RIFF' and len(header_bytes) >= 12 and header_bytes[8:12] == b'WEBP':
return 'webp'
if header_bytes[:4] == bytes([0x49, 0x49, 0x2A, 0x00]):
return 'tif'
return 'bin'
def v2_decrypt_file(dat_path, out_path=None, aes_key=None, xor_key=0x88):
"""解密 V2 格式 .dat 文件 (AES-ECB + XOR)
Args:
dat_path: V2 .dat 文件路径
out_path: 输出路径 (None 则自动命名)
aes_key: 16 字节 AES key (bytes str)
xor_key: XOR key (int, 默认 0x88)
Returns:
(output_path, format) (None, None)
"""
if aes_key is None:
return None, None
from Crypto.Cipher import AES
from Crypto.Util import Padding
# 确保 key 是 16 字节 bytes
if isinstance(aes_key, str):
aes_key = aes_key.encode('ascii')[:16]
if len(aes_key) < 16:
return None, None
with open(dat_path, 'rb') as f:
data = f.read()
if len(data) < 15:
return None, None
# 解析 header
sig = data[:6]
if sig not in (V2_MAGIC_FULL, V1_MAGIC_FULL):
return None, None
aes_size, xor_size = struct.unpack_from('<LL', data, 6)
# V1 用固定 key
if sig == V1_MAGIC_FULL:
aes_key = b'cfcd208495d565ef' # md5("0")[:16]
# AES 对齐: PKCS7 填充使实际密文 >= aes_size向上对齐到 16
# 当 aes_size 是 16 的倍数时,还需要加 16 (完整填充块)
aligned_aes_size = aes_size
aligned_aes_size -= ~(~aligned_aes_size % 16) # 同 wx-dat 的公式
offset = 15
if offset + aligned_aes_size > len(data):
return None, None
# AES-ECB 解密
aes_data = data[offset:offset + aligned_aes_size]
try:
cipher = AES.new(aes_key[:16], AES.MODE_ECB)
dec_aes = Padding.unpad(cipher.decrypt(aes_data), AES.block_size)
except (ValueError, KeyError):
return None, None
offset += aligned_aes_size
# Raw 部分 (不加密)
raw_end = len(data) - xor_size
raw_data = data[offset:raw_end] if offset < raw_end else b''
offset = raw_end
# XOR 部分
xor_data = data[offset:]
dec_xor = bytes(b ^ xor_key for b in xor_data)
decrypted = dec_aes + raw_data + dec_xor
fmt = detect_image_format(decrypted[:16])
# wxgf (HEVC 裸流) 格式
if decrypted[:4] == b'wxgf':
fmt = 'hevc'
if out_path is None:
base = os.path.splitext(dat_path)[0]
for suffix in ('_t', '_h'):
if base.endswith(suffix):
base = base[:-len(suffix)]
break
out_path = f"{base}.{fmt}"
os.makedirs(os.path.dirname(out_path), exist_ok=True)
with open(out_path, 'wb') as f:
f.write(decrypted)
return out_path, fmt
def xor_decrypt_file(dat_path, out_path=None, key=None):
"""解密单个 .dat 文件,返回 (output_path, format)"""
if key is None:
key = detect_xor_key(dat_path)
if key is None:
return None, None
with open(dat_path, 'rb') as f:
data = f.read()
decrypted = bytes(b ^ key for b in data)
fmt = detect_image_format(decrypted[:16])
if out_path is None:
base = os.path.splitext(dat_path)[0]
# 去掉 _t, _h 后缀
for suffix in ('_t', '_h'):
if base.endswith(suffix):
base = base[:-len(suffix)]
break
out_path = f"{base}.{fmt}"
os.makedirs(os.path.dirname(out_path), exist_ok=True)
with open(out_path, 'wb') as f:
f.write(decrypted)
return out_path, fmt
def decrypt_dat_file(dat_path, out_path=None, aes_key=None, xor_key=0x88):
"""智能解密 .dat 文件 (自动检测格式)
Args:
dat_path: .dat 文件路径
out_path: 输出路径
aes_key: V2 格式的 AES key (str bytes, 16 字节)
xor_key: XOR key (int)
Returns:
(output_path, format) (None, None)
"""
with open(dat_path, 'rb') as f:
head = f.read(6)
# V2 新格式
if head == V2_MAGIC_FULL:
return v2_decrypt_file(dat_path, out_path, aes_key, xor_key)
# V1 格式 (固定 AES key)
if head == V1_MAGIC_FULL:
return v2_decrypt_file(dat_path, out_path, b'cfcd208495d565ef', xor_key)
# 旧 XOR 格式
return xor_decrypt_file(dat_path, out_path)
def extract_md5_from_packed_info(blob):
"""从 message_resource.db 的 packed_info (protobuf) 中提取文件 MD5
格式: ... \\x12\\x22\\x0a\\x20 + 32 字节 ASCII hex MD5 ...
"""
if not blob or not isinstance(blob, bytes):
return None
# 查找 protobuf 标记
marker = b'\x12\x22\x0a\x20'
idx = blob.find(marker)
if idx >= 0 and idx + len(marker) + 32 <= len(blob):
md5_bytes = blob[idx + len(marker): idx + len(marker) + 32]
try:
md5_str = md5_bytes.decode('ascii')
# 验证是合法的 hex 字符串
int(md5_str, 16)
return md5_str
except (UnicodeDecodeError, ValueError):
pass
# 备用方案:扫描 32 字节连续 hex 字符
hex_chars = set(b'0123456789abcdef')
i = 0
while i <= len(blob) - 32:
if blob[i] in hex_chars:
candidate = blob[i:i+32]
if all(b in hex_chars for b in candidate):
try:
return candidate.decode('ascii')
except UnicodeDecodeError:
pass
i += 32
else:
i += 1
return None
class ImageResolver:
"""封装从 local_id 到图片文件的完整解析链"""
def __init__(self, wechat_base_dir, decoded_image_dir, cache):
"""
Args:
wechat_base_dir: 微信数据根目录 ( D:\\xwechat_files\\<wxid>)
decoded_image_dir: 解密图片输出目录
cache: DBCache 实例用于解密 message_resource.db
"""
self.base_dir = wechat_base_dir
self.attach_dir = os.path.join(wechat_base_dir, "msg", "attach")
self.out_dir = decoded_image_dir
self.cache = cache
def get_image_md5(self, local_id):
"""通过 local_id 查 message_resource.db 获取图片文件 MD5"""
path = self.cache.get("message\\message_resource.db")
if not path:
return None
conn = sqlite3.connect(path)
try:
row = conn.execute(
"SELECT packed_info FROM MessageResourceInfo WHERE local_id = ?",
(local_id,)
).fetchone()
if row and row[0]:
return extract_md5_from_packed_info(row[0])
except Exception:
pass
finally:
conn.close()
return None
def find_dat_files(self, username, file_md5):
"""在 attach 目录下查找对应的 .dat 文件
路径: attach/<md5(username)>/<YYYY-MM>/Img/<file_md5>[_t|_h].dat
"""
username_hash = hashlib.md5(username.encode()).hexdigest()
search_base = os.path.join(self.attach_dir, username_hash)
if not os.path.isdir(search_base):
return []
# 在所有月份目录下搜索
results = []
pattern = os.path.join(search_base, "*", "Img", f"{file_md5}*.dat")
for p in glob.glob(pattern):
results.append(p)
return sorted(results)
def decode_image(self, username, local_id):
"""完整流程local_id → MD5 → .dat → 解密
Returns:
dict with keys: success, path, format, md5, error
"""
# 1. 获取 MD5
file_md5 = self.get_image_md5(local_id)
if not file_md5:
return {'success': False, 'error': f'无法从 message_resource.db 找到 local_id={local_id} 的图片信息'}
# 2. 找 .dat 文件
dat_files = self.find_dat_files(username, file_md5)
if not dat_files:
return {'success': False, 'error': f'找不到 .dat 文件 (MD5={file_md5})', 'md5': file_md5}
# 优先选标准版(非 _t/_h然后高清 _h最后缩略图 _t
selected = dat_files[0]
for f in dat_files:
fname = os.path.basename(f)
if not fname.startswith(file_md5 + '_'):
selected = f
break
for f in dat_files:
if f.endswith('_h.dat'):
selected = f
break
# 3. 解密
out_name = f"{file_md5}"
out_path_base = os.path.join(self.out_dir, out_name)
result_path, fmt = xor_decrypt_file(selected, f"{out_path_base}.tmp")
if not result_path:
return {'success': False, 'error': f'无法检测 XOR key (文件: {selected})', 'md5': file_md5}
# 重命名为正确扩展名
final_path = f"{out_path_base}.{fmt}"
if os.path.exists(final_path):
os.unlink(final_path)
os.rename(result_path, final_path)
return {
'success': True,
'path': final_path,
'format': fmt,
'md5': file_md5,
'source': selected,
'size': os.path.getsize(final_path),
}
def list_chat_images(self, db_path, table_name, username, limit=20):
"""列出某个聊天中的所有图片消息"""
conn = sqlite3.connect(db_path)
try:
rows = conn.execute(f"""
SELECT local_id, create_time
FROM [{table_name}]
WHERE local_type = 3
ORDER BY create_time DESC
LIMIT ?
""", (limit,)).fetchall()
except Exception as e:
conn.close()
return []
conn.close()
results = []
for local_id, create_time in rows:
file_md5 = self.get_image_md5(local_id)
info = {
'local_id': local_id,
'create_time': create_time,
'md5': file_md5,
}
if file_md5:
dat_files = self.find_dat_files(username, file_md5)
if dat_files:
info['dat_file'] = dat_files[0]
try:
info['size'] = os.path.getsize(dat_files[0])
except OSError:
pass
results.append(info)
return results
# ============ CLI 测试 ============
if __name__ == "__main__":
if len(sys.argv) < 2:
print("用法: python decode_image.py <dat_file> [output_file]")
print(" 解密单个 .dat 文件")
sys.exit(1)
dat_file = sys.argv[1]
out_file = sys.argv[2] if len(sys.argv) > 2 else None
if not os.path.exists(dat_file):
print(f"文件不存在: {dat_file}")
sys.exit(1)
key = detect_xor_key(dat_file)
if key is None:
print("无法检测 XOR key文件可能不是微信加密图片")
sys.exit(1)
print(f"检测到 XOR key: 0x{key:02X}")
result_path, fmt = xor_decrypt_file(dat_file, out_file, key)
if result_path:
size = os.path.getsize(result_path)
print(f"解密成功: {result_path}")
print(f"格式: {fmt}, 大小: {size:,} bytes")
else:
print("解密失败")

410
find_image_key.py 100644
View File

@ -0,0 +1,410 @@
"""从微信进程内存中提取图片 AES 密钥 (V2 .dat 格式)
V2 .dat 文件结构:
[6B signature: 07 08 V2 08 07] [4B aes_size LE] [4B xor_size LE] [1B padding]
[aes_size bytes AES-ECB encrypted] [raw_data unencrypted] [xor_size bytes XOR encrypted]
AES key: 16-byte ASCII string found in Weixin.exe process memory
XOR key: single byte, same as old format (derived from JPEG FF D9 ending)
Usage:
1. 打开微信, 进入聊天/朋友圈, 点击查看 2-3 张图片
2. 立即运行: python find_image_key.py
"""
import os
import sys
import re
import struct
import glob
import json
import time
import ctypes
from ctypes import wintypes
from Crypto.Cipher import AES
from Crypto.Util import Padding
# Windows API constants
PROCESS_ALL_ACCESS = 0x1F0FFF
PROCESS_VM_READ = 0x0010
PROCESS_QUERY_INFORMATION = 0x0400
MEM_COMMIT = 0x1000
PAGE_NOACCESS = 0x01
PAGE_GUARD = 0x100
PAGE_READWRITE = 0x04
PAGE_WRITECOPY = 0x08
PAGE_EXECUTE_READWRITE = 0x40
PAGE_EXECUTE_WRITECOPY = 0x80
class MEMORY_BASIC_INFORMATION(ctypes.Structure):
_fields_ = [
("BaseAddress", ctypes.c_void_p),
("AllocationBase", ctypes.c_void_p),
("AllocationProtect", wintypes.DWORD),
("RegionSize", ctypes.c_size_t),
("State", wintypes.DWORD),
("Protect", wintypes.DWORD),
("Type", wintypes.DWORD),
]
kernel32 = ctypes.windll.kernel32
# 正则: 精确 32 字符 alphanum (前后是非 alphanum 或边界)
RE_KEY32 = re.compile(rb'(?<![a-zA-Z0-9])[a-zA-Z0-9]{32}(?![a-zA-Z0-9])')
# 正则: 精确 16 字符 alphanum
RE_KEY16 = re.compile(rb'(?<![a-zA-Z0-9])[a-zA-Z0-9]{16}(?![a-zA-Z0-9])')
def get_wechat_pids():
import subprocess
result = subprocess.run(
['tasklist.exe', '/FI', 'IMAGENAME eq Weixin.exe', '/FO', 'CSV', '/NH'],
capture_output=True, text=True
)
pids = []
for line in result.stdout.strip().split('\n'):
if 'Weixin.exe' in line:
parts = line.strip('"').split('","')
if len(parts) >= 2:
pids.append(int(parts[1]))
return pids
def find_v2_ciphertext(attach_dir):
"""从多个 V2 .dat 文件中提取第一个 AES 密文块 (16 bytes)"""
v2_magic = b'\x07\x08V2\x08\x07'
# Search _t.dat (thumbnails, likely JPEG)
pattern = os.path.join(attach_dir, "*", "*", "Img", "*_t.dat")
dat_files = sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True)
for f in dat_files[:100]:
try:
with open(f, 'rb') as fp:
header = fp.read(31)
if header[:6] == v2_magic and len(header) >= 31:
return header[15:31], os.path.basename(f)
except:
continue
return None, None
def find_xor_key(attach_dir):
"""从缩略图文件末尾推导 XOR key (JPEG 结尾 FF D9)"""
v2_magic = b'\x07\x08V2\x08\x07'
pattern = os.path.join(attach_dir, "*", "*", "Img", "*_t.dat")
dat_files = sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True)
tail_counts = {}
for f in dat_files[:32]:
try:
sz = os.path.getsize(f)
with open(f, 'rb') as fp:
head = fp.read(6)
fp.seek(sz - 2)
tail = fp.read(2)
if head == v2_magic and len(tail) == 2:
key = (tail[0], tail[1])
tail_counts[key] = tail_counts.get(key, 0) + 1
except:
continue
if not tail_counts:
return None
most_common = max(tail_counts, key=tail_counts.get)
x, y = most_common
xor_key = x ^ 0xFF
check = y ^ 0xD9
if xor_key == check:
return xor_key
return xor_key # return best guess anyway
def try_key(key_bytes, ciphertext):
"""Try decrypting ciphertext with key, return format name if successful"""
try:
cipher = AES.new(key_bytes, AES.MODE_ECB)
dec = cipher.decrypt(ciphertext)
if dec[:3] == b'\xFF\xD8\xFF':
return 'JPEG'
if dec[:4] == bytes([0x89, 0x50, 0x4E, 0x47]):
return 'PNG'
if dec[:4] == b'RIFF':
return 'WEBP'
if dec[:4] == b'wxgf':
return 'WXGF'
if dec[:3] == b'GIF':
return 'GIF'
except:
pass
return None
def is_rw_protect(protect):
"""Check if memory region is readable/writable (where string keys live)"""
rw_flags = (PAGE_READWRITE | PAGE_WRITECOPY |
PAGE_EXECUTE_READWRITE | PAGE_EXECUTE_WRITECOPY)
return (protect & rw_flags) != 0
def scan_memory_for_aes_key(pid, ciphertext):
"""扫描微信进程内存寻找 AES key (regex 加速版)"""
access = PROCESS_VM_READ | PROCESS_QUERY_INFORMATION
h_process = kernel32.OpenProcess(access, False, pid)
if not h_process:
print(f" 无法打开进程 {pid} (尝试以管理员运行)", flush=True)
return None
try:
# Enumerate memory regions
address = 0
mbi = MEMORY_BASIC_INFORMATION()
rw_regions = []
all_regions = []
while address < 0x7FFFFFFFFFFF:
result = kernel32.VirtualQueryEx(
h_process, ctypes.c_void_p(address),
ctypes.byref(mbi), ctypes.sizeof(mbi)
)
if result == 0:
break
if (mbi.State == MEM_COMMIT and
mbi.Protect != PAGE_NOACCESS and
(mbi.Protect & PAGE_GUARD) == 0 and
mbi.RegionSize <= 50 * 1024 * 1024):
region = (mbi.BaseAddress, mbi.RegionSize, mbi.Protect)
all_regions.append(region)
if is_rw_protect(mbi.Protect):
rw_regions.append(region)
next_addr = address + mbi.RegionSize
if next_addr <= address:
break
address = next_addr
rw_mb = sum(r[1] for r in rw_regions) / 1024 / 1024
all_mb = sum(r[1] for r in all_regions) / 1024 / 1024
print(f" RW 区域: {len(rw_regions)} ({rw_mb:.0f} MB), 总计: {len(all_regions)} ({all_mb:.0f} MB)", flush=True)
# Phase 1: 只扫描 RW 区域 (key 字符串最可能在这里)
print(" === Phase 1: 扫描 RW 内存 ===", flush=True)
result = _scan_regions(h_process, rw_regions, ciphertext)
if result:
return result
# Phase 2: 扫描所有可读区域
print(" === Phase 2: 扫描所有内存 ===", flush=True)
# 排除已扫描的 RW 区域
rw_set = set((r[0], r[1]) for r in rw_regions)
other_regions = [r for r in all_regions if (r[0], r[1]) not in rw_set]
result = _scan_regions(h_process, other_regions, ciphertext)
if result:
return result
return None
finally:
kernel32.CloseHandle(h_process)
def _scan_regions(h_process, regions, ciphertext):
"""扫描指定内存区域列表,返回找到的 key 或 None"""
candidates_32 = 0
candidates_16 = 0
t0 = time.time()
for idx, (base_addr, region_size, _protect) in enumerate(regions):
if idx % 100 == 0:
elapsed = time.time() - t0
print(f" 扫描 {idx}/{len(regions)} ({elapsed:.1f}s)", end='\r', flush=True)
buffer = ctypes.create_string_buffer(region_size)
bytes_read = ctypes.c_size_t(0)
ok = kernel32.ReadProcessMemory(
h_process, ctypes.c_void_p(base_addr),
buffer, region_size, ctypes.byref(bytes_read)
)
if not ok or bytes_read.value < 32:
continue
data = buffer.raw[:bytes_read.value]
# 用正则找 32 字符 alphanum (C 级速度)
for m in RE_KEY32.finditer(data):
key_bytes = m.group()
candidates_32 += 1
# 前 16 字符作为 AES-128 key
fmt = try_key(key_bytes[:16], ciphertext)
if fmt:
key_str = key_bytes.decode('ascii')
print(f"\n*** 找到 AES key (32-char)! → {fmt} ***", flush=True)
print(f" 完整: {key_str}", flush=True)
print(f" AES key: {key_str[:16]}", flush=True)
return key_str[:16]
# 也试完整 32 字节作 AES-256
fmt = try_key(key_bytes, ciphertext)
if fmt:
key_str = key_bytes.decode('ascii')
print(f"\n*** 找到 AES key (32-byte)! → {fmt} ***", flush=True)
print(f" 完整: {key_str}", flush=True)
return key_str
# 也找独立的 16 字符 alphanum
for m in RE_KEY16.finditer(data):
key_bytes = m.group()
candidates_16 += 1
fmt = try_key(key_bytes, ciphertext)
if fmt:
key_str = key_bytes.decode('ascii')
print(f"\n*** 找到 AES key (16-char)! → {fmt} ***", flush=True)
print(f" AES key: {key_str}", flush=True)
return key_str
elapsed = time.time() - t0
print(f"\n 测试: {candidates_32} x 32-char + {candidates_16} x 16-char ({elapsed:.1f}s)", flush=True)
return None
def verify_and_decrypt(attach_dir, aes_key_str, xor_key):
"""完整解密一个 V2 文件作为验证"""
v2_magic = b'\x07\x08V2\x08\x07'
key = aes_key_str.encode('ascii')[:16]
pattern = os.path.join(attach_dir, "*", "*", "Img", "*_t.dat")
dat_files = sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True)
for f in dat_files[:10]:
try:
with open(f, 'rb') as fp:
data = fp.read()
if data[:6] != v2_magic:
continue
sig, aes_size, xor_size = struct.unpack_from('<6sLL', data)
# AES 对齐: 向上取整到 16 的倍数 (PKCS7 填充)
aligned_aes_size = aes_size
aligned_aes_size -= ~(~aligned_aes_size % 16)
offset = 15
aes_data = data[offset:offset + aligned_aes_size]
cipher = AES.new(key, AES.MODE_ECB)
dec_aes = Padding.unpad(cipher.decrypt(aes_data), AES.block_size)
offset += aligned_aes_size
# Raw portion
raw_data = data[offset:len(data) - xor_size]
offset += len(raw_data)
# XOR portion
xor_data = data[offset:]
dec_xor = bytes(b ^ xor_key for b in xor_data) if xor_key is not None else xor_data
result = dec_aes + raw_data + dec_xor
fmt = "unknown"
ext = ".bin"
if result[:3] == b'\xFF\xD8\xFF':
fmt, ext = "JPEG", ".jpg"
elif result[:4] == bytes([0x89, 0x50, 0x4E, 0x47]):
fmt, ext = "PNG", ".png"
elif result[:4] == b'RIFF':
fmt, ext = "WEBP", ".webp"
elif result[:4] == b'wxgf':
fmt, ext = "WXGF", ".hevc"
print(f" {os.path.basename(f)} -> {fmt} ({len(result):,}B)", flush=True)
if fmt != "unknown":
out_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "decoded_images")
os.makedirs(out_dir, exist_ok=True)
out_path = os.path.join(out_dir, os.path.splitext(os.path.basename(f))[0] + ext)
with open(out_path, 'wb') as fp:
fp.write(result)
print(f" saved: {out_path}", flush=True)
return True
except Exception as e:
continue
return False
def main():
config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.json')
with open(config_path) as f:
config = json.load(f)
db_dir = config['db_dir']
base_dir = os.path.dirname(db_dir)
attach_dir = os.path.join(base_dir, 'msg', 'attach')
# 1. XOR key
print("=== XOR Key ===", flush=True)
xor_key = find_xor_key(attach_dir)
if xor_key is not None:
print(f"XOR key: 0x{xor_key:02x}", flush=True)
# 2. V2 ciphertext
print("\n=== V2 ciphertext ===", flush=True)
ciphertext, ct_file = find_v2_ciphertext(attach_dir)
if ciphertext is None:
print("No V2 .dat files found")
return
print(f"File: {ct_file}", flush=True)
print(f"Cipher: {ciphertext.hex()}", flush=True)
# 3. Check if already have key in config
if config.get('image_aes_key'):
print(f"\nExisting image_aes_key: {config['image_aes_key']}", flush=True)
fmt = try_key(config['image_aes_key'].encode('ascii')[:16], ciphertext)
if fmt:
print(f"Key valid! -> {fmt}", flush=True)
print("\n=== Verify decrypt ===", flush=True)
verify_and_decrypt(attach_dir, config['image_aes_key'], xor_key)
return
else:
print("Saved key invalid, re-scanning...", flush=True)
# 4. Scan memory
print("\n=== Scanning WeChat process memory ===", flush=True)
pids = get_wechat_pids()
if not pids:
print("WeChat not running!")
return
print(f"PIDs: {pids}", flush=True)
print("Tip: View 2-3 images in WeChat first, then run this script immediately\n", flush=True)
aes_key = None
for pid in pids:
print(f"Scanning PID {pid}...", flush=True)
aes_key = scan_memory_for_aes_key(pid, ciphertext)
if aes_key:
break
if aes_key:
print(f"\n=== Result ===", flush=True)
print(f"AES key: {aes_key}", flush=True)
print(f"XOR key: 0x{xor_key:02x}" if xor_key is not None else "XOR key: unknown", flush=True)
config['image_aes_key'] = aes_key
if xor_key is not None:
config['image_xor_key'] = xor_key
with open(config_path, 'w') as f:
json.dump(config, f, indent=2, ensure_ascii=False)
print(f"Saved to {config_path}", flush=True)
print("\n=== Verify decrypt ===", flush=True)
verify_and_decrypt(attach_dir, aes_key, xor_key)
else:
print("\nAES key not found!", flush=True)
print("Steps:", flush=True)
print(" 1. Login WeChat and keep it running", flush=True)
print(" 2. Open Moments or a chat, view 2-3 images (tap to open full size)", flush=True)
print(" 3. Immediately re-run this script", flush=True)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,318 @@
"""持续监控微信进程内存,捕获图片 AES 密钥
运行此脚本后在微信中打开查看几张图片
脚本会自动检测到 key 并保存到 config.json
Ctrl+C 退出
"""
import os
import sys
import re
import struct
import glob
import json
import time
import ctypes
from ctypes import wintypes
from Crypto.Cipher import AES
from Crypto.Util import Padding
# Windows API constants
PROCESS_VM_READ = 0x0010
PROCESS_QUERY_INFORMATION = 0x0400
MEM_COMMIT = 0x1000
PAGE_NOACCESS = 0x01
PAGE_GUARD = 0x100
PAGE_READWRITE = 0x04
PAGE_WRITECOPY = 0x08
PAGE_EXECUTE_READWRITE = 0x40
PAGE_EXECUTE_WRITECOPY = 0x80
class MEMORY_BASIC_INFORMATION(ctypes.Structure):
_fields_ = [
("BaseAddress", ctypes.c_void_p),
("AllocationBase", ctypes.c_void_p),
("AllocationProtect", wintypes.DWORD),
("RegionSize", ctypes.c_size_t),
("State", wintypes.DWORD),
("Protect", wintypes.DWORD),
("Type", wintypes.DWORD),
]
kernel32 = ctypes.windll.kernel32
# Regex for key patterns
RE_KEY32 = re.compile(rb'(?<![a-zA-Z0-9])[a-zA-Z0-9]{32}(?![a-zA-Z0-9])')
RE_KEY16 = re.compile(rb'(?<![a-zA-Z0-9])[a-zA-Z0-9]{16}(?![a-zA-Z0-9])')
def get_wechat_pids():
import subprocess
result = subprocess.run(
['tasklist.exe', '/FI', 'IMAGENAME eq Weixin.exe', '/FO', 'CSV', '/NH'],
capture_output=True, text=True
)
pids = []
for line in result.stdout.strip().split('\n'):
if 'Weixin.exe' in line:
parts = line.strip('"').split('","')
if len(parts) >= 2:
pids.append(int(parts[1]))
return pids
def find_v2_ciphertext(attach_dir):
v2_magic = b'\x07\x08V2\x08\x07'
pattern = os.path.join(attach_dir, "*", "*", "Img", "*_t.dat")
dat_files = sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True)
for f in dat_files[:100]:
try:
with open(f, 'rb') as fp:
header = fp.read(31)
if header[:6] == v2_magic and len(header) >= 31:
return header[15:31], os.path.basename(f)
except:
continue
return None, None
def find_xor_key(attach_dir):
v2_magic = b'\x07\x08V2\x08\x07'
pattern = os.path.join(attach_dir, "*", "*", "Img", "*_t.dat")
dat_files = sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True)
tail_counts = {}
for f in dat_files[:32]:
try:
sz = os.path.getsize(f)
with open(f, 'rb') as fp:
head = fp.read(6)
fp.seek(sz - 2)
tail = fp.read(2)
if head == v2_magic and len(tail) == 2:
key = (tail[0], tail[1])
tail_counts[key] = tail_counts.get(key, 0) + 1
except:
continue
if not tail_counts:
return None
most_common = max(tail_counts, key=tail_counts.get)
return most_common[0] ^ 0xFF
def try_key(key_bytes, ciphertext):
try:
cipher = AES.new(key_bytes, AES.MODE_ECB)
dec = cipher.decrypt(ciphertext)
if dec[:3] == b'\xFF\xD8\xFF': return 'JPEG'
if dec[:4] == bytes([0x89, 0x50, 0x4E, 0x47]): return 'PNG'
if dec[:4] == b'RIFF': return 'WEBP'
if dec[:4] == b'wxgf': return 'WXGF'
if dec[:3] == b'GIF': return 'GIF'
except:
pass
return None
def is_rw_protect(protect):
rw_flags = (PAGE_READWRITE | PAGE_WRITECOPY |
PAGE_EXECUTE_READWRITE | PAGE_EXECUTE_WRITECOPY)
return (protect & rw_flags) != 0
def get_rw_regions(h_process):
"""Get RW committed memory regions"""
address = 0
mbi = MEMORY_BASIC_INFORMATION()
regions = []
while address < 0x7FFFFFFFFFFF:
result = kernel32.VirtualQueryEx(
h_process, ctypes.c_void_p(address),
ctypes.byref(mbi), ctypes.sizeof(mbi)
)
if result == 0:
break
if (mbi.State == MEM_COMMIT and
mbi.Protect != PAGE_NOACCESS and
(mbi.Protect & PAGE_GUARD) == 0 and
mbi.RegionSize <= 50 * 1024 * 1024 and
is_rw_protect(mbi.Protect)):
regions.append((mbi.BaseAddress, mbi.RegionSize))
next_addr = address + mbi.RegionSize
if next_addr <= address:
break
address = next_addr
return regions
def quick_scan(h_process, regions, ciphertext):
"""Fast scan of RW regions, return key or None"""
for base_addr, region_size in regions:
buffer = ctypes.create_string_buffer(region_size)
bytes_read = ctypes.c_size_t(0)
ok = kernel32.ReadProcessMemory(
h_process, ctypes.c_void_p(base_addr),
buffer, region_size, ctypes.byref(bytes_read)
)
if not ok or bytes_read.value < 32:
continue
data = buffer.raw[:bytes_read.value]
# 32-char keys (first 16 as AES-128)
for m in RE_KEY32.finditer(data):
key_bytes = m.group()
fmt = try_key(key_bytes[:16], ciphertext)
if fmt:
return key_bytes.decode('ascii')[:16], fmt
fmt = try_key(key_bytes, ciphertext)
if fmt:
return key_bytes.decode('ascii'), fmt
# Standalone 16-char keys
for m in RE_KEY16.finditer(data):
key_bytes = m.group()
fmt = try_key(key_bytes, ciphertext)
if fmt:
return key_bytes.decode('ascii'), fmt
return None, None
def verify_and_decrypt(attach_dir, aes_key_str, xor_key):
"""Decrypt one V2 file as verification"""
v2_magic = b'\x07\x08V2\x08\x07'
key = aes_key_str.encode('ascii')[:16]
pattern = os.path.join(attach_dir, "*", "*", "Img", "*_t.dat")
dat_files = sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True)
for f in dat_files[:10]:
try:
with open(f, 'rb') as fp:
data = fp.read()
if data[:6] != v2_magic:
continue
sig, aes_size, xor_size = struct.unpack_from('<6sLL', data)
aligned_aes_size = aes_size
aligned_aes_size -= ~(~aligned_aes_size % 16)
offset = 15
aes_data = data[offset:offset + aligned_aes_size]
cipher = AES.new(key, AES.MODE_ECB)
dec_aes = Padding.unpad(cipher.decrypt(aes_data), AES.block_size)
offset += aligned_aes_size
raw_data = data[offset:len(data) - xor_size]
offset += len(raw_data)
xor_data = data[offset:]
dec_xor = bytes(b ^ xor_key for b in xor_data) if xor_key is not None else xor_data
result = dec_aes + raw_data + dec_xor
fmt, ext = "unknown", ".bin"
if result[:3] == b'\xFF\xD8\xFF': fmt, ext = "JPEG", ".jpg"
elif result[:4] == bytes([0x89, 0x50, 0x4E, 0x47]): fmt, ext = "PNG", ".png"
elif result[:4] == b'RIFF': fmt, ext = "WEBP", ".webp"
elif result[:4] == b'wxgf': fmt, ext = "WXGF", ".hevc"
if fmt != "unknown":
out_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "decoded_images")
os.makedirs(out_dir, exist_ok=True)
out_path = os.path.join(out_dir, os.path.splitext(os.path.basename(f))[0] + ext)
with open(out_path, 'wb') as fp:
fp.write(result)
print(f" Verified: {os.path.basename(f)} -> {fmt} ({len(result):,}B)", flush=True)
print(f" Saved: {out_path}", flush=True)
return True
except:
continue
return False
def main():
config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.json')
with open(config_path) as f:
config = json.load(f)
db_dir = config['db_dir']
base_dir = os.path.dirname(db_dir)
attach_dir = os.path.join(base_dir, 'msg', 'attach')
xor_key = find_xor_key(attach_dir)
print(f"XOR key: 0x{xor_key:02x}" if xor_key else "XOR key: unknown", flush=True)
ciphertext, ct_file = find_v2_ciphertext(attach_dir)
if ciphertext is None:
print("No V2 .dat files found")
return
print(f"V2 cipher: {ciphertext.hex()} ({ct_file})", flush=True)
# Check existing key
if config.get('image_aes_key'):
fmt = try_key(config['image_aes_key'].encode('ascii')[:16], ciphertext)
if fmt:
print(f"Existing key valid: {config['image_aes_key']} -> {fmt}", flush=True)
return
pids = get_wechat_pids()
if not pids:
print("WeChat not running!")
return
# Find the main PID (largest memory footprint)
main_pid = pids[0]
print(f"\nMonitoring PID {main_pid} (main WeChat process)", flush=True)
print("=" * 60, flush=True)
print("NOW: Open WeChat and tap to view 2-3 images (full size)", flush=True)
print("The script will automatically detect the key...", flush=True)
print("=" * 60, flush=True)
access = PROCESS_VM_READ | PROCESS_QUERY_INFORMATION
h_process = kernel32.OpenProcess(access, False, main_pid)
if not h_process:
print(f"Cannot open process {main_pid} (run as admin?)", flush=True)
return
try:
# Get regions once (they don't change much)
regions = get_rw_regions(h_process)
total_mb = sum(r[1] for r in regions) / 1024 / 1024
print(f"RW regions: {len(regions)} ({total_mb:.0f} MB)", flush=True)
scan_count = 0
while True:
scan_count += 1
t0 = time.time()
aes_key, fmt = quick_scan(h_process, regions, ciphertext)
elapsed = time.time() - t0
if aes_key:
print(f"\n{'='*60}", flush=True)
print(f"*** FOUND AES key! -> {fmt} ***", flush=True)
print(f"AES key: {aes_key}", flush=True)
print(f"XOR key: 0x{xor_key:02x}" if xor_key else "XOR key: unknown", flush=True)
print(f"{'='*60}", flush=True)
config['image_aes_key'] = aes_key
if xor_key is not None:
config['image_xor_key'] = xor_key
with open(config_path, 'w') as f:
json.dump(config, f, indent=2, ensure_ascii=False)
print(f"Saved to {config_path}", flush=True)
verify_and_decrypt(attach_dir, aes_key, xor_key)
return
print(f" Scan #{scan_count}: no key found ({elapsed:.1f}s)", end='\r', flush=True)
# Wait 5 seconds before next scan
time.sleep(5)
# Refresh regions periodically (every 5 scans)
if scan_count % 5 == 0:
regions = get_rw_regions(h_process)
except KeyboardInterrupt:
print("\nStopped by user", flush=True)
finally:
kernel32.CloseHandle(h_process)
if __name__ == '__main__':
main()

View File

@ -10,6 +10,8 @@ import hmac as hmac_mod
from datetime import datetime
from Crypto.Cipher import AES
from mcp.server.fastmcp import FastMCP
import zstandard as zstd
from decode_image import ImageResolver
# ============ 加密常量 ============
PAGE_SZ = 4096
@ -34,6 +36,19 @@ DB_DIR = _cfg["db_dir"]
KEYS_FILE = _cfg["keys_file"]
DECRYPTED_DIR = _cfg["decrypted_dir"]
# 图片相关路径
_db_dir = _cfg["db_dir"]
if os.path.basename(_db_dir) == "db_storage":
WECHAT_BASE_DIR = os.path.dirname(_db_dir)
else:
WECHAT_BASE_DIR = _db_dir
DECODED_IMAGE_DIR = _cfg.get("decoded_image_dir")
if not DECODED_IMAGE_DIR:
DECODED_IMAGE_DIR = os.path.join(SCRIPT_DIR, "decoded_images")
elif not os.path.isabs(DECODED_IMAGE_DIR):
DECODED_IMAGE_DIR = os.path.join(SCRIPT_DIR, DECODED_IMAGE_DIR)
with open(KEYS_FILE) as f:
ALL_KEYS = json.load(f)
@ -240,12 +255,30 @@ def resolve_username(chat_name):
return None
_zstd_dctx = zstd.ZstdDecompressor()
def _decompress_content(content, ct):
"""解压 zstd 压缩的消息内容"""
if ct and ct == 4 and isinstance(content, bytes):
try:
return _zstd_dctx.decompress(content).decode('utf-8', errors='replace')
except Exception:
return None
if isinstance(content, bytes):
try:
return content.decode('utf-8', errors='replace')
except Exception:
return None
return content
def _parse_message_content(content, local_type, is_group):
"""解析消息内容,返回 (sender_id, text)"""
if content is None:
return '', ''
if isinstance(content, bytes):
return '', '(压缩内容)'
return '', '(二进制内容)'
sender = ''
text = content
@ -327,10 +360,13 @@ def get_recent_sessions(limit: int = 20) -> str:
display = names.get(username, username)
is_group = '@chatroom' in username
if isinstance(summary, bytes):
try:
summary = _zstd_dctx.decompress(summary).decode('utf-8', errors='replace')
except Exception:
summary = '(压缩内容)'
if isinstance(summary, str) and ':\n' in summary:
summary = summary.split(':\n', 1)[1]
elif isinstance(summary, bytes):
summary = '(压缩内容)'
sender_display = ''
if is_group and sender:
@ -376,9 +412,9 @@ def get_chat_history(chat_name: str, limit: int = 50) -> str:
conn = sqlite3.connect(db_path)
try:
rows = conn.execute(f"""
SELECT local_type, create_time, message_content, WCDB_CT_message_content
SELECT local_id, local_type, create_time, message_content,
WCDB_CT_message_content
FROM [{table_name}]
WHERE WCDB_CT_message_content = 0 OR WCDB_CT_message_content IS NULL
ORDER BY create_time DESC
LIMIT ?
""", (limit,)).fetchall()
@ -391,11 +427,21 @@ def get_chat_history(chat_name: str, limit: int = 50) -> str:
return f"{display_name} 无消息记录"
lines = []
for local_type, create_time, content, ct in reversed(rows):
for local_id, local_type, create_time, content, ct in reversed(rows):
time_str = datetime.fromtimestamp(create_time).strftime('%m-%d %H:%M')
# zstd 解压
content = _decompress_content(content, ct)
if content is None:
content = '(无法解压)'
sender, text = _parse_message_content(content, local_type, is_group)
if local_type != 1:
if local_type == 3:
text = f"[图片] (local_id={local_id})"
elif local_type == 47:
text = "[表情]"
elif local_type != 1:
type_label = format_msg_type(local_type)
text = f"[{type_label}] {text}" if text else f"[{type_label}]"
@ -461,17 +507,20 @@ def search_messages(keyword: str, limit: int = 20) -> str:
try:
rows = conn.execute(f"""
SELECT local_type, create_time, message_content
SELECT local_type, create_time, message_content,
WCDB_CT_message_content
FROM [{tname}]
WHERE message_content LIKE ? AND
(WCDB_CT_message_content = 0 OR WCDB_CT_message_content IS NULL)
WHERE message_content LIKE ?
ORDER BY create_time DESC
LIMIT ?
""", (f'%{keyword}%', limit - len(results))).fetchall()
except Exception:
continue
for local_type, ts, content in rows:
for local_type, ts, content, ct in rows:
content = _decompress_content(content, ct)
if content is None:
continue
sender, text = _parse_message_content(content, local_type, is_group)
time_str = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M')
sender_name = ''
@ -577,10 +626,13 @@ def get_new_messages() -> str:
display = names.get(username, username)
is_group = '@chatroom' in username
summary = s['summary']
if isinstance(summary, bytes):
try:
summary = _zstd_dctx.decompress(summary).decode('utf-8', errors='replace')
except Exception:
summary = '(压缩内容)'
if isinstance(summary, str) and ':\n' in summary:
summary = summary.split(':\n', 1)[1]
elif isinstance(summary, bytes):
summary = '(压缩内容)'
time_str = datetime.fromtimestamp(s['timestamp']).strftime('%H:%M')
tag = "[群]" if is_group else ""
unread_msgs.append(f"[{time_str}] {display}{tag} ({s['unread']}条未读): {summary}")
@ -597,10 +649,13 @@ def get_new_messages() -> str:
display = names.get(username, username)
is_group = '@chatroom' in username
summary = s['summary']
if isinstance(summary, bytes):
try:
summary = _zstd_dctx.decompress(summary).decode('utf-8', errors='replace')
except Exception:
summary = '(压缩内容)'
if isinstance(summary, str) and ':\n' in summary:
summary = summary.split(':\n', 1)[1]
elif isinstance(summary, bytes):
summary = '(压缩内容)'
sender_display = ''
if is_group and s['sender']:
@ -626,5 +681,83 @@ def get_new_messages() -> str:
return f"{len(entries)} 条新消息:\n\n" + "\n".join(entries)
# ============ 图片解密 ============
_image_resolver = ImageResolver(WECHAT_BASE_DIR, DECODED_IMAGE_DIR, _cache)
@mcp.tool()
def decode_image(chat_name: str, local_id: int) -> str:
"""解密微信聊天中的一张图片。
先用 get_chat_history 查看消息图片消息会显示 local_id
然后用此工具解密对应图片
Args:
chat_name: 聊天对象的名字备注名或wxid
local_id: 图片消息的 local_id get_chat_history 获取
"""
username = resolve_username(chat_name)
if not username:
return f"找不到聊天对象: {chat_name}"
result = _image_resolver.decode_image(username, local_id)
if result['success']:
return (
f"解密成功!\n"
f" 文件: {result['path']}\n"
f" 格式: {result['format']}\n"
f" 大小: {result['size']:,} bytes\n"
f" MD5: {result['md5']}"
)
else:
error = result['error']
if 'md5' in result:
error += f"\n MD5: {result['md5']}"
return f"解密失败: {error}"
@mcp.tool()
def get_chat_images(chat_name: str, limit: int = 20) -> str:
"""列出某个聊天中的图片消息。
返回图片的时间local_idMD5文件大小等信息
可以配合 decode_image 工具解密指定图片
Args:
chat_name: 聊天对象的名字备注名或wxid
limit: 返回数量默认20
"""
username = resolve_username(chat_name)
if not username:
return f"找不到聊天对象: {chat_name}"
names = get_contact_names()
display_name = names.get(username, username)
db_path, table_name = _find_msg_table_for_user(username)
if not db_path:
return f"找不到 {display_name} 的消息记录"
images = _image_resolver.list_chat_images(db_path, table_name, username, limit)
if not images:
return f"{display_name} 无图片消息"
lines = []
for img in images:
time_str = datetime.fromtimestamp(img['create_time']).strftime('%Y-%m-%d %H:%M')
line = f"[{time_str}] local_id={img['local_id']}"
if img.get('md5'):
line += f" MD5={img['md5']}"
if img.get('size'):
size_kb = img['size'] / 1024
line += f" {size_kb:.0f}KB"
if not img.get('md5'):
line += " (无资源信息)"
lines.append(line)
return f"{display_name}{len(lines)} 张图片:\n\n" + "\n".join(lines)
if __name__ == "__main__":
mcp.run()

View File

@ -8,6 +8,9 @@ import hashlib, struct, os, sys, json, time, sqlite3, io
import hmac as hmac_mod
from datetime import datetime
from Crypto.Cipher import AES
import zstandard as zstd
_zstd_dctx = zstd.ZstdDecompressor()
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
@ -218,6 +221,11 @@ def main():
# 消息内容
summary = curr['summary']
if isinstance(summary, bytes):
try:
summary = _zstd_dctx.decompress(summary).decode('utf-8', errors='replace')
except Exception:
summary = '(压缩内容)'
if summary:
# 群消息格式: "wxid_xxx:\n内容" - 提取内容部分
if ':\n' in summary:

View File

@ -6,13 +6,19 @@ http://localhost:5678
- 检测到变化后全量解密DB + 全量WAL patch
- SSE 服务器推送
"""
import hashlib, struct, os, sys, json, time, sqlite3, io, threading, queue
import hashlib, struct, os, sys, json, time, sqlite3, io, threading, queue, traceback
import hmac as hmac_mod
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
from Crypto.Cipher import AES
import urllib.parse
import glob as glob_mod
import zstandard as zstd
from decode_image import extract_md5_from_packed_info, decrypt_dat_file, is_v2_format
_zstd_dctx = zstd.ZstdDecompressor()
PAGE_SZ = 4096
KEY_SZ = 32
@ -28,6 +34,11 @@ DB_DIR = _cfg["db_dir"]
KEYS_FILE = _cfg["keys_file"]
CONTACT_CACHE = os.path.join(_cfg["decrypted_dir"], "contact", "contact.db")
DECRYPTED_SESSION = os.path.join(_cfg["decrypted_dir"], "session", "session.db")
DECODED_IMAGE_DIR = _cfg.get("decoded_image_dir", os.path.join(os.path.dirname(os.path.abspath(__file__)), "decoded_images"))
MONITOR_CACHE_DIR = os.path.join(_cfg["decrypted_dir"], "_monitor_cache")
WECHAT_BASE_DIR = _cfg.get("wechat_base_dir", "")
IMAGE_AES_KEY = _cfg.get("image_aes_key") # V2 格式 AES key (从微信内存提取)
IMAGE_XOR_KEY = _cfg.get("image_xor_key", 0x88) # XOR key
POLL_MS = 30 # 高频轮询WAL/DB的mtime30ms一次
PORT = 5678
@ -37,6 +48,98 @@ sse_lock = threading.Lock()
messages_log = []
messages_lock = threading.Lock()
MAX_LOG = 500
_img_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix='img')
class MonitorDBCache:
"""轻量 DB 缓存mtime 检测变化时重新解密"""
def __init__(self, keys, tmp_dir):
self.keys = keys
self.tmp_dir = tmp_dir
os.makedirs(tmp_dir, exist_ok=True)
self._state = {} # rel_key → (db_mtime, wal_mtime)
def get(self, rel_key):
"""返回解密后的临时文件路径mtime 变化时自动重新解密"""
if rel_key not in self.keys:
return None
enc_key = bytes.fromhex(self.keys[rel_key]["enc_key"])
rel_path = rel_key.replace('\\', os.sep)
db_path = os.path.join(DB_DIR, rel_path)
wal_path = db_path + "-wal"
if not os.path.exists(db_path):
return None
try:
db_mtime = os.path.getmtime(db_path)
wal_mtime = os.path.getmtime(wal_path) if os.path.exists(wal_path) else 0
except OSError:
return None
out_name = rel_key.replace('\\', '_')
out_path = os.path.join(self.tmp_dir, out_name)
prev = self._state.get(rel_key)
if prev is None or db_mtime != prev[0]:
t0 = time.perf_counter()
full_decrypt(db_path, out_path, enc_key)
if os.path.exists(wal_path):
decrypt_wal_full(wal_path, out_path, enc_key)
ms = (time.perf_counter() - t0) * 1000
print(f" [cache] {rel_key} 全量解密 {ms:.0f}ms", flush=True)
self._state[rel_key] = (db_mtime, wal_mtime)
elif wal_mtime != prev[1]:
t0 = time.perf_counter()
decrypt_wal_full(wal_path, out_path, enc_key)
ms = (time.perf_counter() - t0) * 1000
print(f" [cache] {rel_key} WAL patch {ms:.0f}ms", flush=True)
self._state[rel_key] = (db_mtime, wal_mtime)
return out_path
def build_username_db_map():
"""从已解密的 Name2Id 表构建 username → [db_keys] 映射
同一个 username 可能存在于多个 message_N.db ,
DB 文件修改时间倒序排列最新的排前面
"""
# 先获取每个 DB 的 mtime 用于排序
db_mtimes = {}
for i in range(5):
rel_key = f"message\\message_{i}.db"
db_path = os.path.join(DB_DIR, "message", f"message_{i}.db")
try:
db_mtimes[rel_key] = os.path.getmtime(db_path)
except OSError:
db_mtimes[rel_key] = 0
mapping = {} # username → [db_keys], 最新的在前
decrypted_msg_dir = os.path.join(_cfg["decrypted_dir"], "message")
for i in range(5):
db_path = os.path.join(decrypted_msg_dir, f"message_{i}.db")
if not os.path.exists(db_path):
continue
rel_key = f"message\\message_{i}.db"
try:
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
for row in conn.execute("SELECT user_name FROM Name2Id").fetchall():
if row[0] not in mapping:
mapping[row[0]] = []
mapping[row[0]].append(rel_key)
conn.close()
except Exception as e:
print(f" [WARN] Name2Id message_{i}.db: {e}", flush=True)
# 对每个 username 的 db_keys 按 mtime 倒序(最新的优先)
for username in mapping:
mapping[username].sort(key=lambda k: db_mtimes.get(k, 0), reverse=True)
return mapping
def decrypt_page(enc_key, page_data, pgno):
@ -156,7 +259,12 @@ def msg_type_icon(t):
def broadcast_sse(msg_data):
payload = f"data: {json.dumps(msg_data, ensure_ascii=False)}\n\n"
event_type = msg_data.get('event', '')
data_line = f"data: {json.dumps(msg_data, ensure_ascii=False)}\n"
if event_type:
payload = f"event: {event_type}\n{data_line}\n"
else:
payload = f"{data_line}\n"
with sse_lock:
dead = []
for q in sse_clients:
@ -171,15 +279,180 @@ def broadcast_sse(msg_data):
# ============ 监听器 ============
class SessionMonitor:
def __init__(self, enc_key, session_db, contact_names):
def __init__(self, enc_key, session_db, contact_names, db_cache=None, username_db_map=None):
self.enc_key = enc_key
self.session_db = session_db
self.wal_path = session_db + "-wal"
self.contact_names = contact_names
self.db_cache = db_cache
self.username_db_map = username_db_map or {}
self.prev_state = {}
self.decrypt_ms = 0
self.patched_pages = 0
def resolve_image(self, username, timestamp):
"""解密图片: username+timestamp → 解密后的图片文件名,失败返回 None"""
if not self.db_cache or not self.username_db_map:
return None
# 1. 找到 username 对应的所有 message_N.db按 mtime 倒序)
db_keys = self.username_db_map.get(username)
if not db_keys:
return None
# 2. 遍历候选 DB找到包含该 timestamp 消息的那个
table_name = f"Msg_{hashlib.md5(username.encode()).hexdigest()}"
local_id = None
for db_key in db_keys:
msg_db_path = self.db_cache.get(db_key)
if not msg_db_path:
continue
try:
conn = sqlite3.connect(f"file:{msg_db_path}?mode=ro", uri=True)
# 精确匹配 timestamp
row = conn.execute(f"""
SELECT local_id FROM [{table_name}]
WHERE local_type = 3 AND create_time = ?
""", (timestamp,)).fetchone()
if not row:
# 模糊匹配±3秒内最近的图片消息
row = conn.execute(f"""
SELECT local_id FROM [{table_name}]
WHERE local_type = 3 AND ABS(create_time - ?) <= 3
ORDER BY ABS(create_time - ?) LIMIT 1
""", (timestamp, timestamp)).fetchone()
conn.close()
if row:
local_id = row[0]
break
except Exception as e:
print(f" [img] 查询 {db_key}/{table_name} 失败: {e}", flush=True)
if not local_id:
print(f" [img] 未找到 local_id: {username} t={timestamp}", flush=True)
return None
# 4. 查 message_resource.db 获取 MD5
# local_id 不全局唯一,需要同时匹配 create_time
res_path = self.db_cache.get("message\\message_resource.db")
if not res_path:
return None
file_md5 = None
try:
conn = sqlite3.connect(f"file:{res_path}?mode=ro", uri=True)
row = conn.execute(
"SELECT packed_info FROM MessageResourceInfo "
"WHERE message_local_id = ? AND message_create_time = ? AND message_local_type = 3",
(local_id, timestamp)
).fetchone()
if not row:
# 降级: 只用 create_time + type
row = conn.execute(
"SELECT packed_info FROM MessageResourceInfo "
"WHERE message_create_time = ? AND message_local_type = 3",
(timestamp,)
).fetchone()
conn.close()
if row and row[0]:
file_md5 = extract_md5_from_packed_info(row[0])
except Exception as e:
print(f" [img] 查询 message_resource 失败: {e}", flush=True)
return None
if not file_md5:
print(f" [img] 未找到 MD5: local_id={local_id} t={timestamp}", flush=True)
return None
# 5. 查找 .dat 文件
attach_dir = os.path.join(WECHAT_BASE_DIR, "msg", "attach")
username_hash = hashlib.md5(username.encode()).hexdigest()
search_base = os.path.join(attach_dir, username_hash)
if not os.path.isdir(search_base):
print(f" [img] attach 目录不存在: {search_base}", flush=True)
return None
pattern = os.path.join(search_base, "*", "Img", f"{file_md5}*.dat")
dat_files = sorted(glob_mod.glob(pattern))
if not dat_files:
print(f" [img] 未找到 .dat: MD5={file_md5}", flush=True)
return None
# 优先原图,然后高清 _h最后缩略图 _t
selected = dat_files[0]
for f in dat_files:
fname = os.path.basename(f)
if not fname.startswith(file_md5 + '_'):
selected = f
break
for f in dat_files:
if f.endswith('_h.dat'):
selected = f
break
# 6. 解密图片
os.makedirs(DECODED_IMAGE_DIR, exist_ok=True)
out_base = os.path.join(DECODED_IMAGE_DIR, file_md5)
# 已解密则跳过
for ext in ('jpg', 'png', 'gif', 'webp', 'bmp', 'tif'):
candidate = f"{out_base}.{ext}"
if os.path.exists(candidate):
return os.path.basename(candidate)
# V2 新格式需要 AES key
if is_v2_format(selected) and not IMAGE_AES_KEY:
print(f" [img] V2 格式缺少 AES key: {os.path.basename(selected)}", flush=True)
print(f" [img] 请运行 find_image_key.py 提取密钥", flush=True)
return '__v2_unsupported__'
result_path, fmt = decrypt_dat_file(selected, f"{out_base}.tmp", IMAGE_AES_KEY, IMAGE_XOR_KEY)
if not result_path:
print(f" [img] 解密失败: {selected}", flush=True)
return None
final = f"{out_base}.{fmt}"
if os.path.exists(final):
os.unlink(final)
os.rename(result_path, final)
size_kb = os.path.getsize(final) / 1024
print(f" [img] 解密成功: {os.path.basename(final)} ({size_kb:.0f}KB)", flush=True)
return os.path.basename(final)
def _async_resolve_image(self, username, timestamp, msg_data):
"""后台线程: 解密图片并通过 SSE 推送更新"""
for attempt in range(3):
try:
img_name = self.resolve_image(username, timestamp)
if img_name == '__v2_unsupported__':
# V2 新加密格式,显示占位提示
msg_data['content'] = '[图片 - 新加密格式暂不支持预览]'
broadcast_sse({
'event': 'image_update',
'timestamp': timestamp,
'username': username,
'v2_unsupported': True,
})
return
elif img_name:
image_url = f'/img/{img_name}'
msg_data['image_url'] = image_url
broadcast_sse({
'event': 'image_update',
'timestamp': timestamp,
'username': username,
'image_url': image_url,
})
print(f" [img] 异步解密成功: {img_name}", flush=True)
return
elif attempt < 2:
time.sleep(1.5)
except Exception as e:
print(f" [img] 异步解密失败(attempt={attempt}): {e}", flush=True)
if attempt < 2:
time.sleep(1.5)
def query_state(self):
"""查询已解密副本的session状态"""
conn = sqlite3.connect(f"file:{DECRYPTED_SESSION}?mode=ro", uri=True)
@ -237,10 +510,15 @@ class SessionMonitor:
sender = self.contact_names.get(curr['sender'], curr['sender_name'] or curr['sender'])
summary = curr['summary']
if isinstance(summary, bytes):
try:
summary = _zstd_dctx.decompress(summary).decode('utf-8', errors='replace')
except Exception:
summary = '(压缩内容)'
if summary and ':\n' in summary:
summary = summary.split(':\n', 1)[1]
new_msgs.append({
msg_data = {
'time': datetime.fromtimestamp(curr['timestamp']).strftime('%H:%M:%S'),
'timestamp': curr['timestamp'],
'chat': display,
@ -253,7 +531,16 @@ class SessionMonitor:
'unread': curr['unread'],
'decrypt_ms': round(self.decrypt_ms, 1),
'pages': self.patched_pages,
})
}
new_msgs.append(msg_data)
# 图片消息: 后台异步解密(不阻塞轮询)
if curr['msg_type'] == 3:
_img_executor.submit(
self._async_resolve_image,
username, curr['timestamp'], msg_data
)
# 按时间排序
new_msgs.sort(key=lambda m: m['timestamp'])
@ -281,8 +568,8 @@ class SessionMonitor:
self.prev_state = curr_state
def monitor_thread(enc_key, session_db, contact_names):
mon = SessionMonitor(enc_key, session_db, contact_names)
def monitor_thread(enc_key, session_db, contact_names, db_cache=None, username_db_map=None):
mon = SessionMonitor(enc_key, session_db, contact_names, db_cache, username_db_map)
wal_path = mon.wal_path
# 初始全量解密
@ -372,6 +659,8 @@ body{font-family:-apple-system,BlinkMacSystemFont,"Segoe UI",Roboto,sans-serif;b
.msg-unread{font-size:10px;padding:1px 6px;border-radius:8px;background:rgba(244,67,54,.2);color:#ef9a9a;font-weight:600}
.msg-perf{font-size:9px;color:#333}
.msg-content{font-size:13px;line-height:1.4;color:#bbb;word-break:break-all;padding-left:63px}
.msg-img{max-width:300px;max-height:200px;border-radius:8px;cursor:pointer;margin-top:4px;transition:transform .2s}
.msg-img:hover{transform:scale(1.02)}
.empty{text-align:center;padding:80px 20px;color:#444}
.empty .icon{font-size:48px;margin-bottom:12px}
::-webkit-scrollbar{width:4px}
@ -415,7 +704,13 @@ function addMsg(m, animate){
const ur=m.unread>0?`<span class="msg-unread">${m.unread}</span>`:'';
const cc=m.is_group?'msg-chat grp':'msg-chat';
d.innerHTML=`<div class="msg-header"><span class="msg-time">${m.time}</span><span class="${cc}">${esc(m.chat)}</span>${sn}<div class="msg-r"><span class="msg-type">${m.type_icon} ${m.type}</span>${ur}</div></div><div class="msg-content">${esc(m.content||'')}</div>`;
let contentHtml = esc(m.content||'');
if(m.image_url){
contentHtml = `<img class="msg-img" src="${m.image_url}" onclick="window.open('${m.image_url}','_blank')" onerror="this.style.display='none';this.nextElementSibling.style.display='inline'" /><span style="display:none">${esc(m.content||'')}</span>`;
}
const dk=m.timestamp+'|'+(m.username||m.chat);
d.innerHTML=`<div class="msg-header"><span class="msg-time">${m.time}</span><span class="${cc}">${esc(m.chat)}</span>${sn}<div class="msg-r"><span class="msg-type">${m.type_icon} ${m.type}</span>${ur}</div></div><div class="msg-content" data-key="${dk}">${contentHtml}</div>`;
M.insertBefore(d, M.firstChild);
@ -438,6 +733,22 @@ function connectSSE(){
es.onmessage=ev=>{
addMsg(JSON.parse(ev.data), true); // 新消息有动画
};
es.addEventListener('image_update', ev=>{
const d=JSON.parse(ev.data);
const key=d.timestamp+'|'+(d.username||'');
const msgs=M.querySelectorAll('.msg');
for(const el of msgs){
const ct=el.querySelector('.msg-content');
if(ct && ct.dataset.key===key){
if(d.v2_unsupported){
ct.innerHTML='<span style="color:#999;font-style:italic">[图片 - 新加密格式暂不支持预览]</span>';
} else if(d.image_url){
ct.innerHTML=`<img class="msg-img" src="${d.image_url}" onclick="window.open('${d.image_url}','_blank')" onerror="this.style.display='none'" />`;
}
break;
}
}
});
es.onerror=()=>{
S.textContent='重连...';
S.className='status err';
@ -481,6 +792,32 @@ class Handler(BaseHTTPRequestHandler):
self.end_headers()
self.wfile.write(json.dumps(data, ensure_ascii=False).encode('utf-8'))
elif self.path.startswith('/img/'):
filename = urllib.parse.unquote(self.path[5:])
# 安全: 防目录穿越
if '/' in filename or '\\' in filename or '..' in filename:
self.send_error(403)
return
filepath = os.path.join(DECODED_IMAGE_DIR, filename)
if not os.path.isfile(filepath):
self.send_error(404)
return
ext = os.path.splitext(filename)[1].lower()
ct = {
'.jpg': 'image/jpeg', '.jpeg': 'image/jpeg',
'.png': 'image/png', '.gif': 'image/gif',
'.webp': 'image/webp', '.bmp': 'image/bmp',
'.tif': 'image/tiff',
}.get(ext, 'application/octet-stream')
with open(filepath, 'rb') as f:
data = f.read()
self.send_response(200)
self.send_header('Content-Type', ct)
self.send_header('Content-Length', str(len(data)))
self.send_header('Cache-Control', 'public, max-age=86400')
self.end_headers()
self.wfile.write(data)
elif self.path == '/stream':
self.send_response(200)
self.send_header('Content-Type', 'text/event-stream')
@ -529,7 +866,20 @@ def main():
contact_names = load_contact_names()
print(f"已加载 {len(contact_names)} 个联系人", flush=True)
t = threading.Thread(target=monitor_thread, args=(enc_key, session_db, contact_names), daemon=True)
print("构建 username→DB 映射...", flush=True)
username_db_map = build_username_db_map()
print(f"已映射 {len(username_db_map)} 个用户名", flush=True)
db_cache = MonitorDBCache(keys, MONITOR_CACHE_DIR)
# 后台预热 message_resource.db图片解密必需
def _warmup():
t0 = time.perf_counter()
db_cache.get("message\\message_resource.db")
print(f"[warmup] message_resource.db 预热完成 {(time.perf_counter()-t0)*1000:.0f}ms", flush=True)
threading.Thread(target=_warmup, daemon=True).start()
t = threading.Thread(target=monitor_thread, args=(enc_key, session_db, contact_names, db_cache, username_db_map), daemon=True)
t.start()
server = ThreadedServer(('0.0.0.0', PORT), Handler)