diff --git a/.gitignore b/.gitignore index e4d09eb..db821c3 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ all_keys.json wechat_key.txt config.json decrypted/ +decoded_images/ *.db *.db-shm *.db-wal diff --git a/config.py b/config.py index ef1828a..a7055b4 100644 --- a/config.py +++ b/config.py @@ -12,6 +12,7 @@ _DEFAULT = { "db_dir": r"D:\xwechat_files\your_wxid\db_storage", "keys_file": "all_keys.json", "decrypted_dir": "decrypted", + "decoded_image_dir": "decoded_images", "wechat_process": "Weixin.exe", } @@ -29,8 +30,21 @@ def load_config(): # 将相对路径转为绝对路径 base = os.path.dirname(os.path.abspath(__file__)) - for key in ("keys_file", "decrypted_dir"): + for key in ("keys_file", "decrypted_dir", "decoded_image_dir"): if key in cfg and not os.path.isabs(cfg[key]): cfg[key] = os.path.join(base, cfg[key]) + # 自动推导微信数据根目录(db_dir 的上级目录) + # db_dir 格式: D:\xwechat_files\\db_storage + # base_dir 格式: D:\xwechat_files\ + db_dir = cfg.get("db_dir", "") + if db_dir and os.path.basename(db_dir) == "db_storage": + cfg["wechat_base_dir"] = os.path.dirname(db_dir) + else: + cfg["wechat_base_dir"] = db_dir + + # decoded_image_dir 默认值 + if "decoded_image_dir" not in cfg: + cfg["decoded_image_dir"] = os.path.join(base, "decoded_images") + return cfg diff --git a/decode_image.py b/decode_image.py new file mode 100644 index 0000000..0eb510c --- /dev/null +++ b/decode_image.py @@ -0,0 +1,470 @@ +r""" +微信图片 .dat 文件解密模块 + +支持两种加密格式: + - 旧格式: 单字节 XOR 加密,key 通过对比文件头与已知图片 magic bytes 自动检测 + - V2 格式 (2025-08+): AES-128-ECB + XOR 混合加密,需要从微信进程内存提取 AES key + +V2 文件结构: + [6B signature: 07 08 V2 08 07] [4B aes_size LE] [4B xor_size LE] [1B padding] + [aligned_aes_size bytes AES-ECB] [raw_data] [xor_size bytes XOR] + +文件路径格式: + D:\xwechat_files\\msg\attach\\\Img\[_t|_h].dat + +映射链: + message_*.db (local_id) → message_resource.db (packed_info 含 MD5) → .dat 文件 → 解密 +""" + +import os +import sys +import glob +import hashlib +import sqlite3 +import struct + +# V2 格式完整 magic (6 bytes) +V2_MAGIC = b'\x07\x08\x56\x32' # 前 4 字节用于快速检测 +V2_MAGIC_FULL = b'\x07\x08V2\x08\x07' # 完整 6 字节签名 +V1_MAGIC_FULL = b'\x07\x08V1\x08\x07' # V1 签名 (固定 key) + +# 常见图片格式的 magic bytes (按长度降序排列,避免短 magic 假阳性) +IMAGE_MAGIC = { + 'png': [0x89, 0x50, 0x4E, 0x47], + 'gif': [0x47, 0x49, 0x46, 0x38], + 'tif': [0x49, 0x49, 0x2A, 0x00], # little-endian TIFF + 'webp': [0x52, 0x49, 0x46, 0x46], # RIFF header + 'jpg': [0xFF, 0xD8, 0xFF], + # BMP 只有 2 字节 magic,容易假阳性,需要额外验证 +} + + +def is_v2_format(dat_path): + """检测是否是微信 V2 加密格式 (2025-08+)""" + try: + with open(dat_path, 'rb') as f: + magic = f.read(4) + return magic == V2_MAGIC + except (OSError, IOError): + return False + + +def detect_xor_key(dat_path): + """通过对比文件头和已知图片 magic bytes 自动检测 XOR key + + 返回 key (int) 或 None。V2 格式文件返回 None。 + """ + with open(dat_path, 'rb') as f: + header = f.read(16) + + if len(header) < 4: + return None + + # V2 新格式无法用 XOR 解密 + if header[:4] == V2_MAGIC: + return None + + # 先尝试 3+ 字节 magic 的格式(可靠匹配) + for fmt, magic in IMAGE_MAGIC.items(): + key = header[0] ^ magic[0] + match = True + for i in range(1, len(magic)): + if i >= len(header): + break + if (header[i] ^ key) != magic[i]: + match = False + break + if match: + return key + + # 最后尝试 BMP (2 字节 magic,需要额外验证) + bmp_magic = [0x42, 0x4D] + key = header[0] ^ bmp_magic[0] + if len(header) >= 2 and (header[1] ^ key) == bmp_magic[1]: + # 额外验证: XOR 解密后检查 BMP file size 和 offset 字段 + if len(header) >= 14: + dec = bytes(b ^ key for b in header[:14]) + bmp_size = struct.unpack_from('= 12 and header_bytes[8:12] == b'WEBP': + return 'webp' + if header_bytes[:4] == bytes([0x49, 0x49, 0x2A, 0x00]): + return 'tif' + return 'bin' + + +def v2_decrypt_file(dat_path, out_path=None, aes_key=None, xor_key=0x88): + """解密 V2 格式 .dat 文件 (AES-ECB + XOR) + + Args: + dat_path: V2 .dat 文件路径 + out_path: 输出路径 (None 则自动命名) + aes_key: 16 字节 AES key (bytes 或 str) + xor_key: XOR key (int, 默认 0x88) + + Returns: + (output_path, format) 或 (None, None) + """ + if aes_key is None: + return None, None + + from Crypto.Cipher import AES + from Crypto.Util import Padding + + # 确保 key 是 16 字节 bytes + if isinstance(aes_key, str): + aes_key = aes_key.encode('ascii')[:16] + if len(aes_key) < 16: + return None, None + + with open(dat_path, 'rb') as f: + data = f.read() + + if len(data) < 15: + return None, None + + # 解析 header + sig = data[:6] + if sig not in (V2_MAGIC_FULL, V1_MAGIC_FULL): + return None, None + + aes_size, xor_size = struct.unpack_from('= aes_size,向上对齐到 16 + # 当 aes_size 是 16 的倍数时,还需要加 16 (完整填充块) + aligned_aes_size = aes_size + aligned_aes_size -= ~(~aligned_aes_size % 16) # 同 wx-dat 的公式 + + offset = 15 + if offset + aligned_aes_size > len(data): + return None, None + + # AES-ECB 解密 + aes_data = data[offset:offset + aligned_aes_size] + try: + cipher = AES.new(aes_key[:16], AES.MODE_ECB) + dec_aes = Padding.unpad(cipher.decrypt(aes_data), AES.block_size) + except (ValueError, KeyError): + return None, None + offset += aligned_aes_size + + # Raw 部分 (不加密) + raw_end = len(data) - xor_size + raw_data = data[offset:raw_end] if offset < raw_end else b'' + offset = raw_end + + # XOR 部分 + xor_data = data[offset:] + dec_xor = bytes(b ^ xor_key for b in xor_data) + + decrypted = dec_aes + raw_data + dec_xor + fmt = detect_image_format(decrypted[:16]) + + # wxgf (HEVC 裸流) 格式 + if decrypted[:4] == b'wxgf': + fmt = 'hevc' + + if out_path is None: + base = os.path.splitext(dat_path)[0] + for suffix in ('_t', '_h'): + if base.endswith(suffix): + base = base[:-len(suffix)] + break + out_path = f"{base}.{fmt}" + + os.makedirs(os.path.dirname(out_path), exist_ok=True) + with open(out_path, 'wb') as f: + f.write(decrypted) + + return out_path, fmt + + +def xor_decrypt_file(dat_path, out_path=None, key=None): + """解密单个 .dat 文件,返回 (output_path, format)""" + if key is None: + key = detect_xor_key(dat_path) + if key is None: + return None, None + + with open(dat_path, 'rb') as f: + data = f.read() + + decrypted = bytes(b ^ key for b in data) + fmt = detect_image_format(decrypted[:16]) + + if out_path is None: + base = os.path.splitext(dat_path)[0] + # 去掉 _t, _h 后缀 + for suffix in ('_t', '_h'): + if base.endswith(suffix): + base = base[:-len(suffix)] + break + out_path = f"{base}.{fmt}" + + os.makedirs(os.path.dirname(out_path), exist_ok=True) + with open(out_path, 'wb') as f: + f.write(decrypted) + + return out_path, fmt + + +def decrypt_dat_file(dat_path, out_path=None, aes_key=None, xor_key=0x88): + """智能解密 .dat 文件 (自动检测格式) + + Args: + dat_path: .dat 文件路径 + out_path: 输出路径 + aes_key: V2 格式的 AES key (str 或 bytes, 16 字节) + xor_key: XOR key (int) + + Returns: + (output_path, format) 或 (None, None) + """ + with open(dat_path, 'rb') as f: + head = f.read(6) + + # V2 新格式 + if head == V2_MAGIC_FULL: + return v2_decrypt_file(dat_path, out_path, aes_key, xor_key) + + # V1 格式 (固定 AES key) + if head == V1_MAGIC_FULL: + return v2_decrypt_file(dat_path, out_path, b'cfcd208495d565ef', xor_key) + + # 旧 XOR 格式 + return xor_decrypt_file(dat_path, out_path) + + +def extract_md5_from_packed_info(blob): + """从 message_resource.db 的 packed_info (protobuf) 中提取文件 MD5 + + 格式: ... \\x12\\x22\\x0a\\x20 + 32 字节 ASCII hex MD5 ... + """ + if not blob or not isinstance(blob, bytes): + return None + + # 查找 protobuf 标记 + marker = b'\x12\x22\x0a\x20' + idx = blob.find(marker) + if idx >= 0 and idx + len(marker) + 32 <= len(blob): + md5_bytes = blob[idx + len(marker): idx + len(marker) + 32] + try: + md5_str = md5_bytes.decode('ascii') + # 验证是合法的 hex 字符串 + int(md5_str, 16) + return md5_str + except (UnicodeDecodeError, ValueError): + pass + + # 备用方案:扫描 32 字节连续 hex 字符 + hex_chars = set(b'0123456789abcdef') + i = 0 + while i <= len(blob) - 32: + if blob[i] in hex_chars: + candidate = blob[i:i+32] + if all(b in hex_chars for b in candidate): + try: + return candidate.decode('ascii') + except UnicodeDecodeError: + pass + i += 32 + else: + i += 1 + + return None + + +class ImageResolver: + """封装从 local_id 到图片文件的完整解析链""" + + def __init__(self, wechat_base_dir, decoded_image_dir, cache): + """ + Args: + wechat_base_dir: 微信数据根目录 (如 D:\\xwechat_files\\) + decoded_image_dir: 解密图片输出目录 + cache: DBCache 实例,用于解密 message_resource.db + """ + self.base_dir = wechat_base_dir + self.attach_dir = os.path.join(wechat_base_dir, "msg", "attach") + self.out_dir = decoded_image_dir + self.cache = cache + + def get_image_md5(self, local_id): + """通过 local_id 查 message_resource.db 获取图片文件 MD5""" + path = self.cache.get("message\\message_resource.db") + if not path: + return None + + conn = sqlite3.connect(path) + try: + row = conn.execute( + "SELECT packed_info FROM MessageResourceInfo WHERE local_id = ?", + (local_id,) + ).fetchone() + if row and row[0]: + return extract_md5_from_packed_info(row[0]) + except Exception: + pass + finally: + conn.close() + + return None + + def find_dat_files(self, username, file_md5): + """在 attach 目录下查找对应的 .dat 文件 + + 路径: attach///Img/[_t|_h].dat + """ + username_hash = hashlib.md5(username.encode()).hexdigest() + search_base = os.path.join(self.attach_dir, username_hash) + + if not os.path.isdir(search_base): + return [] + + # 在所有月份目录下搜索 + results = [] + pattern = os.path.join(search_base, "*", "Img", f"{file_md5}*.dat") + for p in glob.glob(pattern): + results.append(p) + + return sorted(results) + + def decode_image(self, username, local_id): + """完整流程:local_id → MD5 → .dat → 解密 + + Returns: + dict with keys: success, path, format, md5, error + """ + # 1. 获取 MD5 + file_md5 = self.get_image_md5(local_id) + if not file_md5: + return {'success': False, 'error': f'无法从 message_resource.db 找到 local_id={local_id} 的图片信息'} + + # 2. 找 .dat 文件 + dat_files = self.find_dat_files(username, file_md5) + if not dat_files: + return {'success': False, 'error': f'找不到 .dat 文件 (MD5={file_md5})', 'md5': file_md5} + + # 优先选标准版(非 _t/_h),然后高清 _h,最后缩略图 _t + selected = dat_files[0] + for f in dat_files: + fname = os.path.basename(f) + if not fname.startswith(file_md5 + '_'): + selected = f + break + for f in dat_files: + if f.endswith('_h.dat'): + selected = f + break + + # 3. 解密 + out_name = f"{file_md5}" + out_path_base = os.path.join(self.out_dir, out_name) + + result_path, fmt = xor_decrypt_file(selected, f"{out_path_base}.tmp") + if not result_path: + return {'success': False, 'error': f'无法检测 XOR key (文件: {selected})', 'md5': file_md5} + + # 重命名为正确扩展名 + final_path = f"{out_path_base}.{fmt}" + if os.path.exists(final_path): + os.unlink(final_path) + os.rename(result_path, final_path) + + return { + 'success': True, + 'path': final_path, + 'format': fmt, + 'md5': file_md5, + 'source': selected, + 'size': os.path.getsize(final_path), + } + + def list_chat_images(self, db_path, table_name, username, limit=20): + """列出某个聊天中的所有图片消息""" + conn = sqlite3.connect(db_path) + try: + rows = conn.execute(f""" + SELECT local_id, create_time + FROM [{table_name}] + WHERE local_type = 3 + ORDER BY create_time DESC + LIMIT ? + """, (limit,)).fetchall() + except Exception as e: + conn.close() + return [] + conn.close() + + results = [] + for local_id, create_time in rows: + file_md5 = self.get_image_md5(local_id) + info = { + 'local_id': local_id, + 'create_time': create_time, + 'md5': file_md5, + } + if file_md5: + dat_files = self.find_dat_files(username, file_md5) + if dat_files: + info['dat_file'] = dat_files[0] + try: + info['size'] = os.path.getsize(dat_files[0]) + except OSError: + pass + results.append(info) + + return results + + +# ============ CLI 测试 ============ + +if __name__ == "__main__": + if len(sys.argv) < 2: + print("用法: python decode_image.py [output_file]") + print(" 解密单个 .dat 文件") + sys.exit(1) + + dat_file = sys.argv[1] + out_file = sys.argv[2] if len(sys.argv) > 2 else None + + if not os.path.exists(dat_file): + print(f"文件不存在: {dat_file}") + sys.exit(1) + + key = detect_xor_key(dat_file) + if key is None: + print("无法检测 XOR key,文件可能不是微信加密图片") + sys.exit(1) + + print(f"检测到 XOR key: 0x{key:02X}") + + result_path, fmt = xor_decrypt_file(dat_file, out_file, key) + if result_path: + size = os.path.getsize(result_path) + print(f"解密成功: {result_path}") + print(f"格式: {fmt}, 大小: {size:,} bytes") + else: + print("解密失败") diff --git a/find_image_key.py b/find_image_key.py new file mode 100644 index 0000000..6800696 --- /dev/null +++ b/find_image_key.py @@ -0,0 +1,410 @@ +"""从微信进程内存中提取图片 AES 密钥 (V2 .dat 格式) + +V2 .dat 文件结构: + [6B signature: 07 08 V2 08 07] [4B aes_size LE] [4B xor_size LE] [1B padding] + [aes_size bytes AES-ECB encrypted] [raw_data unencrypted] [xor_size bytes XOR encrypted] + +AES key: 16-byte ASCII string found in Weixin.exe process memory +XOR key: single byte, same as old format (derived from JPEG FF D9 ending) + +Usage: + 1. 打开微信, 进入聊天/朋友圈, 点击查看 2-3 张图片 + 2. 立即运行: python find_image_key.py +""" +import os +import sys +import re +import struct +import glob +import json +import time +import ctypes +from ctypes import wintypes +from Crypto.Cipher import AES +from Crypto.Util import Padding + +# Windows API constants +PROCESS_ALL_ACCESS = 0x1F0FFF +PROCESS_VM_READ = 0x0010 +PROCESS_QUERY_INFORMATION = 0x0400 +MEM_COMMIT = 0x1000 +PAGE_NOACCESS = 0x01 +PAGE_GUARD = 0x100 +PAGE_READWRITE = 0x04 +PAGE_WRITECOPY = 0x08 +PAGE_EXECUTE_READWRITE = 0x40 +PAGE_EXECUTE_WRITECOPY = 0x80 + +class MEMORY_BASIC_INFORMATION(ctypes.Structure): + _fields_ = [ + ("BaseAddress", ctypes.c_void_p), + ("AllocationBase", ctypes.c_void_p), + ("AllocationProtect", wintypes.DWORD), + ("RegionSize", ctypes.c_size_t), + ("State", wintypes.DWORD), + ("Protect", wintypes.DWORD), + ("Type", wintypes.DWORD), + ] + +kernel32 = ctypes.windll.kernel32 + +# 正则: 精确 32 字符 alphanum (前后是非 alphanum 或边界) +RE_KEY32 = re.compile(rb'(?= 2: + pids.append(int(parts[1])) + return pids + + +def find_v2_ciphertext(attach_dir): + """从多个 V2 .dat 文件中提取第一个 AES 密文块 (16 bytes)""" + v2_magic = b'\x07\x08V2\x08\x07' + + # Search _t.dat (thumbnails, likely JPEG) + pattern = os.path.join(attach_dir, "*", "*", "Img", "*_t.dat") + dat_files = sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True) + + for f in dat_files[:100]: + try: + with open(f, 'rb') as fp: + header = fp.read(31) + if header[:6] == v2_magic and len(header) >= 31: + return header[15:31], os.path.basename(f) + except: + continue + return None, None + + +def find_xor_key(attach_dir): + """从缩略图文件末尾推导 XOR key (JPEG 结尾 FF D9)""" + v2_magic = b'\x07\x08V2\x08\x07' + pattern = os.path.join(attach_dir, "*", "*", "Img", "*_t.dat") + dat_files = sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True) + + tail_counts = {} + for f in dat_files[:32]: + try: + sz = os.path.getsize(f) + with open(f, 'rb') as fp: + head = fp.read(6) + fp.seek(sz - 2) + tail = fp.read(2) + if head == v2_magic and len(tail) == 2: + key = (tail[0], tail[1]) + tail_counts[key] = tail_counts.get(key, 0) + 1 + except: + continue + + if not tail_counts: + return None + + most_common = max(tail_counts, key=tail_counts.get) + x, y = most_common + xor_key = x ^ 0xFF + check = y ^ 0xD9 + + if xor_key == check: + return xor_key + return xor_key # return best guess anyway + + +def try_key(key_bytes, ciphertext): + """Try decrypting ciphertext with key, return format name if successful""" + try: + cipher = AES.new(key_bytes, AES.MODE_ECB) + dec = cipher.decrypt(ciphertext) + if dec[:3] == b'\xFF\xD8\xFF': + return 'JPEG' + if dec[:4] == bytes([0x89, 0x50, 0x4E, 0x47]): + return 'PNG' + if dec[:4] == b'RIFF': + return 'WEBP' + if dec[:4] == b'wxgf': + return 'WXGF' + if dec[:3] == b'GIF': + return 'GIF' + except: + pass + return None + + +def is_rw_protect(protect): + """Check if memory region is readable/writable (where string keys live)""" + rw_flags = (PAGE_READWRITE | PAGE_WRITECOPY | + PAGE_EXECUTE_READWRITE | PAGE_EXECUTE_WRITECOPY) + return (protect & rw_flags) != 0 + + +def scan_memory_for_aes_key(pid, ciphertext): + """扫描微信进程内存寻找 AES key (regex 加速版)""" + access = PROCESS_VM_READ | PROCESS_QUERY_INFORMATION + h_process = kernel32.OpenProcess(access, False, pid) + if not h_process: + print(f" 无法打开进程 {pid} (尝试以管理员运行)", flush=True) + return None + + try: + # Enumerate memory regions + address = 0 + mbi = MEMORY_BASIC_INFORMATION() + rw_regions = [] + all_regions = [] + + while address < 0x7FFFFFFFFFFF: + result = kernel32.VirtualQueryEx( + h_process, ctypes.c_void_p(address), + ctypes.byref(mbi), ctypes.sizeof(mbi) + ) + if result == 0: + break + if (mbi.State == MEM_COMMIT and + mbi.Protect != PAGE_NOACCESS and + (mbi.Protect & PAGE_GUARD) == 0 and + mbi.RegionSize <= 50 * 1024 * 1024): + region = (mbi.BaseAddress, mbi.RegionSize, mbi.Protect) + all_regions.append(region) + if is_rw_protect(mbi.Protect): + rw_regions.append(region) + next_addr = address + mbi.RegionSize + if next_addr <= address: + break + address = next_addr + + rw_mb = sum(r[1] for r in rw_regions) / 1024 / 1024 + all_mb = sum(r[1] for r in all_regions) / 1024 / 1024 + print(f" RW 区域: {len(rw_regions)} ({rw_mb:.0f} MB), 总计: {len(all_regions)} ({all_mb:.0f} MB)", flush=True) + + # Phase 1: 只扫描 RW 区域 (key 字符串最可能在这里) + print(" === Phase 1: 扫描 RW 内存 ===", flush=True) + result = _scan_regions(h_process, rw_regions, ciphertext) + if result: + return result + + # Phase 2: 扫描所有可读区域 + print(" === Phase 2: 扫描所有内存 ===", flush=True) + # 排除已扫描的 RW 区域 + rw_set = set((r[0], r[1]) for r in rw_regions) + other_regions = [r for r in all_regions if (r[0], r[1]) not in rw_set] + result = _scan_regions(h_process, other_regions, ciphertext) + if result: + return result + + return None + + finally: + kernel32.CloseHandle(h_process) + + +def _scan_regions(h_process, regions, ciphertext): + """扫描指定内存区域列表,返回找到的 key 或 None""" + candidates_32 = 0 + candidates_16 = 0 + t0 = time.time() + + for idx, (base_addr, region_size, _protect) in enumerate(regions): + if idx % 100 == 0: + elapsed = time.time() - t0 + print(f" 扫描 {idx}/{len(regions)} ({elapsed:.1f}s)", end='\r', flush=True) + + buffer = ctypes.create_string_buffer(region_size) + bytes_read = ctypes.c_size_t(0) + ok = kernel32.ReadProcessMemory( + h_process, ctypes.c_void_p(base_addr), + buffer, region_size, ctypes.byref(bytes_read) + ) + if not ok or bytes_read.value < 32: + continue + + data = buffer.raw[:bytes_read.value] + + # 用正则找 32 字符 alphanum (C 级速度) + for m in RE_KEY32.finditer(data): + key_bytes = m.group() + candidates_32 += 1 + + # 前 16 字符作为 AES-128 key + fmt = try_key(key_bytes[:16], ciphertext) + if fmt: + key_str = key_bytes.decode('ascii') + print(f"\n*** 找到 AES key (32-char)! → {fmt} ***", flush=True) + print(f" 完整: {key_str}", flush=True) + print(f" AES key: {key_str[:16]}", flush=True) + return key_str[:16] + + # 也试完整 32 字节作 AES-256 + fmt = try_key(key_bytes, ciphertext) + if fmt: + key_str = key_bytes.decode('ascii') + print(f"\n*** 找到 AES key (32-byte)! → {fmt} ***", flush=True) + print(f" 完整: {key_str}", flush=True) + return key_str + + # 也找独立的 16 字符 alphanum + for m in RE_KEY16.finditer(data): + key_bytes = m.group() + candidates_16 += 1 + + fmt = try_key(key_bytes, ciphertext) + if fmt: + key_str = key_bytes.decode('ascii') + print(f"\n*** 找到 AES key (16-char)! → {fmt} ***", flush=True) + print(f" AES key: {key_str}", flush=True) + return key_str + + elapsed = time.time() - t0 + print(f"\n 测试: {candidates_32} x 32-char + {candidates_16} x 16-char ({elapsed:.1f}s)", flush=True) + return None + + +def verify_and_decrypt(attach_dir, aes_key_str, xor_key): + """完整解密一个 V2 文件作为验证""" + v2_magic = b'\x07\x08V2\x08\x07' + key = aes_key_str.encode('ascii')[:16] + + pattern = os.path.join(attach_dir, "*", "*", "Img", "*_t.dat") + dat_files = sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True) + + for f in dat_files[:10]: + try: + with open(f, 'rb') as fp: + data = fp.read() + if data[:6] != v2_magic: + continue + + sig, aes_size, xor_size = struct.unpack_from('<6sLL', data) + + # AES 对齐: 向上取整到 16 的倍数 (PKCS7 填充) + aligned_aes_size = aes_size + aligned_aes_size -= ~(~aligned_aes_size % 16) + + offset = 15 + aes_data = data[offset:offset + aligned_aes_size] + cipher = AES.new(key, AES.MODE_ECB) + dec_aes = Padding.unpad(cipher.decrypt(aes_data), AES.block_size) + offset += aligned_aes_size + + # Raw portion + raw_data = data[offset:len(data) - xor_size] + offset += len(raw_data) + + # XOR portion + xor_data = data[offset:] + dec_xor = bytes(b ^ xor_key for b in xor_data) if xor_key is not None else xor_data + + result = dec_aes + raw_data + dec_xor + + fmt = "unknown" + ext = ".bin" + if result[:3] == b'\xFF\xD8\xFF': + fmt, ext = "JPEG", ".jpg" + elif result[:4] == bytes([0x89, 0x50, 0x4E, 0x47]): + fmt, ext = "PNG", ".png" + elif result[:4] == b'RIFF': + fmt, ext = "WEBP", ".webp" + elif result[:4] == b'wxgf': + fmt, ext = "WXGF", ".hevc" + + print(f" {os.path.basename(f)} -> {fmt} ({len(result):,}B)", flush=True) + + if fmt != "unknown": + out_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "decoded_images") + os.makedirs(out_dir, exist_ok=True) + out_path = os.path.join(out_dir, os.path.splitext(os.path.basename(f))[0] + ext) + with open(out_path, 'wb') as fp: + fp.write(result) + print(f" saved: {out_path}", flush=True) + return True + except Exception as e: + continue + return False + + +def main(): + config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.json') + with open(config_path) as f: + config = json.load(f) + + db_dir = config['db_dir'] + base_dir = os.path.dirname(db_dir) + attach_dir = os.path.join(base_dir, 'msg', 'attach') + + # 1. XOR key + print("=== XOR Key ===", flush=True) + xor_key = find_xor_key(attach_dir) + if xor_key is not None: + print(f"XOR key: 0x{xor_key:02x}", flush=True) + + # 2. V2 ciphertext + print("\n=== V2 ciphertext ===", flush=True) + ciphertext, ct_file = find_v2_ciphertext(attach_dir) + if ciphertext is None: + print("No V2 .dat files found") + return + print(f"File: {ct_file}", flush=True) + print(f"Cipher: {ciphertext.hex()}", flush=True) + + # 3. Check if already have key in config + if config.get('image_aes_key'): + print(f"\nExisting image_aes_key: {config['image_aes_key']}", flush=True) + fmt = try_key(config['image_aes_key'].encode('ascii')[:16], ciphertext) + if fmt: + print(f"Key valid! -> {fmt}", flush=True) + print("\n=== Verify decrypt ===", flush=True) + verify_and_decrypt(attach_dir, config['image_aes_key'], xor_key) + return + else: + print("Saved key invalid, re-scanning...", flush=True) + + # 4. Scan memory + print("\n=== Scanning WeChat process memory ===", flush=True) + pids = get_wechat_pids() + if not pids: + print("WeChat not running!") + return + print(f"PIDs: {pids}", flush=True) + print("Tip: View 2-3 images in WeChat first, then run this script immediately\n", flush=True) + + aes_key = None + for pid in pids: + print(f"Scanning PID {pid}...", flush=True) + aes_key = scan_memory_for_aes_key(pid, ciphertext) + if aes_key: + break + + if aes_key: + print(f"\n=== Result ===", flush=True) + print(f"AES key: {aes_key}", flush=True) + print(f"XOR key: 0x{xor_key:02x}" if xor_key is not None else "XOR key: unknown", flush=True) + + config['image_aes_key'] = aes_key + if xor_key is not None: + config['image_xor_key'] = xor_key + with open(config_path, 'w') as f: + json.dump(config, f, indent=2, ensure_ascii=False) + print(f"Saved to {config_path}", flush=True) + + print("\n=== Verify decrypt ===", flush=True) + verify_and_decrypt(attach_dir, aes_key, xor_key) + else: + print("\nAES key not found!", flush=True) + print("Steps:", flush=True) + print(" 1. Login WeChat and keep it running", flush=True) + print(" 2. Open Moments or a chat, view 2-3 images (tap to open full size)", flush=True) + print(" 3. Immediately re-run this script", flush=True) + + +if __name__ == '__main__': + main() diff --git a/find_image_key_monitor.py b/find_image_key_monitor.py new file mode 100644 index 0000000..f81442b --- /dev/null +++ b/find_image_key_monitor.py @@ -0,0 +1,318 @@ +"""持续监控微信进程内存,捕获图片 AES 密钥 + +运行此脚本后,在微信中打开查看几张图片。 +脚本会自动检测到 key 并保存到 config.json。 + +按 Ctrl+C 退出。 +""" +import os +import sys +import re +import struct +import glob +import json +import time +import ctypes +from ctypes import wintypes +from Crypto.Cipher import AES +from Crypto.Util import Padding + +# Windows API constants +PROCESS_VM_READ = 0x0010 +PROCESS_QUERY_INFORMATION = 0x0400 +MEM_COMMIT = 0x1000 +PAGE_NOACCESS = 0x01 +PAGE_GUARD = 0x100 +PAGE_READWRITE = 0x04 +PAGE_WRITECOPY = 0x08 +PAGE_EXECUTE_READWRITE = 0x40 +PAGE_EXECUTE_WRITECOPY = 0x80 + +class MEMORY_BASIC_INFORMATION(ctypes.Structure): + _fields_ = [ + ("BaseAddress", ctypes.c_void_p), + ("AllocationBase", ctypes.c_void_p), + ("AllocationProtect", wintypes.DWORD), + ("RegionSize", ctypes.c_size_t), + ("State", wintypes.DWORD), + ("Protect", wintypes.DWORD), + ("Type", wintypes.DWORD), + ] + +kernel32 = ctypes.windll.kernel32 + +# Regex for key patterns +RE_KEY32 = re.compile(rb'(?= 2: + pids.append(int(parts[1])) + return pids + + +def find_v2_ciphertext(attach_dir): + v2_magic = b'\x07\x08V2\x08\x07' + pattern = os.path.join(attach_dir, "*", "*", "Img", "*_t.dat") + dat_files = sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True) + for f in dat_files[:100]: + try: + with open(f, 'rb') as fp: + header = fp.read(31) + if header[:6] == v2_magic and len(header) >= 31: + return header[15:31], os.path.basename(f) + except: + continue + return None, None + + +def find_xor_key(attach_dir): + v2_magic = b'\x07\x08V2\x08\x07' + pattern = os.path.join(attach_dir, "*", "*", "Img", "*_t.dat") + dat_files = sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True) + tail_counts = {} + for f in dat_files[:32]: + try: + sz = os.path.getsize(f) + with open(f, 'rb') as fp: + head = fp.read(6) + fp.seek(sz - 2) + tail = fp.read(2) + if head == v2_magic and len(tail) == 2: + key = (tail[0], tail[1]) + tail_counts[key] = tail_counts.get(key, 0) + 1 + except: + continue + if not tail_counts: + return None + most_common = max(tail_counts, key=tail_counts.get) + return most_common[0] ^ 0xFF + + +def try_key(key_bytes, ciphertext): + try: + cipher = AES.new(key_bytes, AES.MODE_ECB) + dec = cipher.decrypt(ciphertext) + if dec[:3] == b'\xFF\xD8\xFF': return 'JPEG' + if dec[:4] == bytes([0x89, 0x50, 0x4E, 0x47]): return 'PNG' + if dec[:4] == b'RIFF': return 'WEBP' + if dec[:4] == b'wxgf': return 'WXGF' + if dec[:3] == b'GIF': return 'GIF' + except: + pass + return None + + +def is_rw_protect(protect): + rw_flags = (PAGE_READWRITE | PAGE_WRITECOPY | + PAGE_EXECUTE_READWRITE | PAGE_EXECUTE_WRITECOPY) + return (protect & rw_flags) != 0 + + +def get_rw_regions(h_process): + """Get RW committed memory regions""" + address = 0 + mbi = MEMORY_BASIC_INFORMATION() + regions = [] + while address < 0x7FFFFFFFFFFF: + result = kernel32.VirtualQueryEx( + h_process, ctypes.c_void_p(address), + ctypes.byref(mbi), ctypes.sizeof(mbi) + ) + if result == 0: + break + if (mbi.State == MEM_COMMIT and + mbi.Protect != PAGE_NOACCESS and + (mbi.Protect & PAGE_GUARD) == 0 and + mbi.RegionSize <= 50 * 1024 * 1024 and + is_rw_protect(mbi.Protect)): + regions.append((mbi.BaseAddress, mbi.RegionSize)) + next_addr = address + mbi.RegionSize + if next_addr <= address: + break + address = next_addr + return regions + + +def quick_scan(h_process, regions, ciphertext): + """Fast scan of RW regions, return key or None""" + for base_addr, region_size in regions: + buffer = ctypes.create_string_buffer(region_size) + bytes_read = ctypes.c_size_t(0) + ok = kernel32.ReadProcessMemory( + h_process, ctypes.c_void_p(base_addr), + buffer, region_size, ctypes.byref(bytes_read) + ) + if not ok or bytes_read.value < 32: + continue + + data = buffer.raw[:bytes_read.value] + + # 32-char keys (first 16 as AES-128) + for m in RE_KEY32.finditer(data): + key_bytes = m.group() + fmt = try_key(key_bytes[:16], ciphertext) + if fmt: + return key_bytes.decode('ascii')[:16], fmt + fmt = try_key(key_bytes, ciphertext) + if fmt: + return key_bytes.decode('ascii'), fmt + + # Standalone 16-char keys + for m in RE_KEY16.finditer(data): + key_bytes = m.group() + fmt = try_key(key_bytes, ciphertext) + if fmt: + return key_bytes.decode('ascii'), fmt + + return None, None + + +def verify_and_decrypt(attach_dir, aes_key_str, xor_key): + """Decrypt one V2 file as verification""" + v2_magic = b'\x07\x08V2\x08\x07' + key = aes_key_str.encode('ascii')[:16] + pattern = os.path.join(attach_dir, "*", "*", "Img", "*_t.dat") + dat_files = sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True) + + for f in dat_files[:10]: + try: + with open(f, 'rb') as fp: + data = fp.read() + if data[:6] != v2_magic: + continue + sig, aes_size, xor_size = struct.unpack_from('<6sLL', data) + aligned_aes_size = aes_size + aligned_aes_size -= ~(~aligned_aes_size % 16) + offset = 15 + aes_data = data[offset:offset + aligned_aes_size] + cipher = AES.new(key, AES.MODE_ECB) + dec_aes = Padding.unpad(cipher.decrypt(aes_data), AES.block_size) + offset += aligned_aes_size + raw_data = data[offset:len(data) - xor_size] + offset += len(raw_data) + xor_data = data[offset:] + dec_xor = bytes(b ^ xor_key for b in xor_data) if xor_key is not None else xor_data + result = dec_aes + raw_data + dec_xor + + fmt, ext = "unknown", ".bin" + if result[:3] == b'\xFF\xD8\xFF': fmt, ext = "JPEG", ".jpg" + elif result[:4] == bytes([0x89, 0x50, 0x4E, 0x47]): fmt, ext = "PNG", ".png" + elif result[:4] == b'RIFF': fmt, ext = "WEBP", ".webp" + elif result[:4] == b'wxgf': fmt, ext = "WXGF", ".hevc" + + if fmt != "unknown": + out_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "decoded_images") + os.makedirs(out_dir, exist_ok=True) + out_path = os.path.join(out_dir, os.path.splitext(os.path.basename(f))[0] + ext) + with open(out_path, 'wb') as fp: + fp.write(result) + print(f" Verified: {os.path.basename(f)} -> {fmt} ({len(result):,}B)", flush=True) + print(f" Saved: {out_path}", flush=True) + return True + except: + continue + return False + + +def main(): + config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.json') + with open(config_path) as f: + config = json.load(f) + + db_dir = config['db_dir'] + base_dir = os.path.dirname(db_dir) + attach_dir = os.path.join(base_dir, 'msg', 'attach') + + xor_key = find_xor_key(attach_dir) + print(f"XOR key: 0x{xor_key:02x}" if xor_key else "XOR key: unknown", flush=True) + + ciphertext, ct_file = find_v2_ciphertext(attach_dir) + if ciphertext is None: + print("No V2 .dat files found") + return + print(f"V2 cipher: {ciphertext.hex()} ({ct_file})", flush=True) + + # Check existing key + if config.get('image_aes_key'): + fmt = try_key(config['image_aes_key'].encode('ascii')[:16], ciphertext) + if fmt: + print(f"Existing key valid: {config['image_aes_key']} -> {fmt}", flush=True) + return + + pids = get_wechat_pids() + if not pids: + print("WeChat not running!") + return + + # Find the main PID (largest memory footprint) + main_pid = pids[0] + print(f"\nMonitoring PID {main_pid} (main WeChat process)", flush=True) + print("=" * 60, flush=True) + print("NOW: Open WeChat and tap to view 2-3 images (full size)", flush=True) + print("The script will automatically detect the key...", flush=True) + print("=" * 60, flush=True) + + access = PROCESS_VM_READ | PROCESS_QUERY_INFORMATION + h_process = kernel32.OpenProcess(access, False, main_pid) + if not h_process: + print(f"Cannot open process {main_pid} (run as admin?)", flush=True) + return + + try: + # Get regions once (they don't change much) + regions = get_rw_regions(h_process) + total_mb = sum(r[1] for r in regions) / 1024 / 1024 + print(f"RW regions: {len(regions)} ({total_mb:.0f} MB)", flush=True) + + scan_count = 0 + while True: + scan_count += 1 + t0 = time.time() + aes_key, fmt = quick_scan(h_process, regions, ciphertext) + elapsed = time.time() - t0 + + if aes_key: + print(f"\n{'='*60}", flush=True) + print(f"*** FOUND AES key! -> {fmt} ***", flush=True) + print(f"AES key: {aes_key}", flush=True) + print(f"XOR key: 0x{xor_key:02x}" if xor_key else "XOR key: unknown", flush=True) + print(f"{'='*60}", flush=True) + + config['image_aes_key'] = aes_key + if xor_key is not None: + config['image_xor_key'] = xor_key + with open(config_path, 'w') as f: + json.dump(config, f, indent=2, ensure_ascii=False) + print(f"Saved to {config_path}", flush=True) + + verify_and_decrypt(attach_dir, aes_key, xor_key) + return + + print(f" Scan #{scan_count}: no key found ({elapsed:.1f}s)", end='\r', flush=True) + + # Wait 5 seconds before next scan + time.sleep(5) + + # Refresh regions periodically (every 5 scans) + if scan_count % 5 == 0: + regions = get_rw_regions(h_process) + + except KeyboardInterrupt: + print("\nStopped by user", flush=True) + finally: + kernel32.CloseHandle(h_process) + + +if __name__ == '__main__': + main() diff --git a/mcp_server.py b/mcp_server.py index a27f5a3..e2a98c4 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -10,6 +10,8 @@ import hmac as hmac_mod from datetime import datetime from Crypto.Cipher import AES from mcp.server.fastmcp import FastMCP +import zstandard as zstd +from decode_image import ImageResolver # ============ 加密常量 ============ PAGE_SZ = 4096 @@ -34,6 +36,19 @@ DB_DIR = _cfg["db_dir"] KEYS_FILE = _cfg["keys_file"] DECRYPTED_DIR = _cfg["decrypted_dir"] +# 图片相关路径 +_db_dir = _cfg["db_dir"] +if os.path.basename(_db_dir) == "db_storage": + WECHAT_BASE_DIR = os.path.dirname(_db_dir) +else: + WECHAT_BASE_DIR = _db_dir + +DECODED_IMAGE_DIR = _cfg.get("decoded_image_dir") +if not DECODED_IMAGE_DIR: + DECODED_IMAGE_DIR = os.path.join(SCRIPT_DIR, "decoded_images") +elif not os.path.isabs(DECODED_IMAGE_DIR): + DECODED_IMAGE_DIR = os.path.join(SCRIPT_DIR, DECODED_IMAGE_DIR) + with open(KEYS_FILE) as f: ALL_KEYS = json.load(f) @@ -240,12 +255,30 @@ def resolve_username(chat_name): return None +_zstd_dctx = zstd.ZstdDecompressor() + + +def _decompress_content(content, ct): + """解压 zstd 压缩的消息内容""" + if ct and ct == 4 and isinstance(content, bytes): + try: + return _zstd_dctx.decompress(content).decode('utf-8', errors='replace') + except Exception: + return None + if isinstance(content, bytes): + try: + return content.decode('utf-8', errors='replace') + except Exception: + return None + return content + + def _parse_message_content(content, local_type, is_group): """解析消息内容,返回 (sender_id, text)""" if content is None: return '', '' if isinstance(content, bytes): - return '', '(压缩内容)' + return '', '(二进制内容)' sender = '' text = content @@ -327,10 +360,13 @@ def get_recent_sessions(limit: int = 20) -> str: display = names.get(username, username) is_group = '@chatroom' in username + if isinstance(summary, bytes): + try: + summary = _zstd_dctx.decompress(summary).decode('utf-8', errors='replace') + except Exception: + summary = '(压缩内容)' if isinstance(summary, str) and ':\n' in summary: summary = summary.split(':\n', 1)[1] - elif isinstance(summary, bytes): - summary = '(压缩内容)' sender_display = '' if is_group and sender: @@ -376,9 +412,9 @@ def get_chat_history(chat_name: str, limit: int = 50) -> str: conn = sqlite3.connect(db_path) try: rows = conn.execute(f""" - SELECT local_type, create_time, message_content, WCDB_CT_message_content + SELECT local_id, local_type, create_time, message_content, + WCDB_CT_message_content FROM [{table_name}] - WHERE WCDB_CT_message_content = 0 OR WCDB_CT_message_content IS NULL ORDER BY create_time DESC LIMIT ? """, (limit,)).fetchall() @@ -391,11 +427,21 @@ def get_chat_history(chat_name: str, limit: int = 50) -> str: return f"{display_name} 无消息记录" lines = [] - for local_type, create_time, content, ct in reversed(rows): + for local_id, local_type, create_time, content, ct in reversed(rows): time_str = datetime.fromtimestamp(create_time).strftime('%m-%d %H:%M') + + # zstd 解压 + content = _decompress_content(content, ct) + if content is None: + content = '(无法解压)' + sender, text = _parse_message_content(content, local_type, is_group) - if local_type != 1: + if local_type == 3: + text = f"[图片] (local_id={local_id})" + elif local_type == 47: + text = "[表情]" + elif local_type != 1: type_label = format_msg_type(local_type) text = f"[{type_label}] {text}" if text else f"[{type_label}]" @@ -461,17 +507,20 @@ def search_messages(keyword: str, limit: int = 20) -> str: try: rows = conn.execute(f""" - SELECT local_type, create_time, message_content + SELECT local_type, create_time, message_content, + WCDB_CT_message_content FROM [{tname}] - WHERE message_content LIKE ? AND - (WCDB_CT_message_content = 0 OR WCDB_CT_message_content IS NULL) + WHERE message_content LIKE ? ORDER BY create_time DESC LIMIT ? """, (f'%{keyword}%', limit - len(results))).fetchall() except Exception: continue - for local_type, ts, content in rows: + for local_type, ts, content, ct in rows: + content = _decompress_content(content, ct) + if content is None: + continue sender, text = _parse_message_content(content, local_type, is_group) time_str = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M') sender_name = '' @@ -577,10 +626,13 @@ def get_new_messages() -> str: display = names.get(username, username) is_group = '@chatroom' in username summary = s['summary'] + if isinstance(summary, bytes): + try: + summary = _zstd_dctx.decompress(summary).decode('utf-8', errors='replace') + except Exception: + summary = '(压缩内容)' if isinstance(summary, str) and ':\n' in summary: summary = summary.split(':\n', 1)[1] - elif isinstance(summary, bytes): - summary = '(压缩内容)' time_str = datetime.fromtimestamp(s['timestamp']).strftime('%H:%M') tag = "[群]" if is_group else "" unread_msgs.append(f"[{time_str}] {display}{tag} ({s['unread']}条未读): {summary}") @@ -597,10 +649,13 @@ def get_new_messages() -> str: display = names.get(username, username) is_group = '@chatroom' in username summary = s['summary'] + if isinstance(summary, bytes): + try: + summary = _zstd_dctx.decompress(summary).decode('utf-8', errors='replace') + except Exception: + summary = '(压缩内容)' if isinstance(summary, str) and ':\n' in summary: summary = summary.split(':\n', 1)[1] - elif isinstance(summary, bytes): - summary = '(压缩内容)' sender_display = '' if is_group and s['sender']: @@ -626,5 +681,83 @@ def get_new_messages() -> str: return f"{len(entries)} 条新消息:\n\n" + "\n".join(entries) +# ============ 图片解密 ============ + +_image_resolver = ImageResolver(WECHAT_BASE_DIR, DECODED_IMAGE_DIR, _cache) + + +@mcp.tool() +def decode_image(chat_name: str, local_id: int) -> str: + """解密微信聊天中的一张图片。 + + 先用 get_chat_history 查看消息,图片消息会显示 local_id, + 然后用此工具解密对应图片。 + + Args: + chat_name: 聊天对象的名字、备注名或wxid + local_id: 图片消息的 local_id(从 get_chat_history 获取) + """ + username = resolve_username(chat_name) + if not username: + return f"找不到聊天对象: {chat_name}" + + result = _image_resolver.decode_image(username, local_id) + if result['success']: + return ( + f"解密成功!\n" + f" 文件: {result['path']}\n" + f" 格式: {result['format']}\n" + f" 大小: {result['size']:,} bytes\n" + f" MD5: {result['md5']}" + ) + else: + error = result['error'] + if 'md5' in result: + error += f"\n MD5: {result['md5']}" + return f"解密失败: {error}" + + +@mcp.tool() +def get_chat_images(chat_name: str, limit: int = 20) -> str: + """列出某个聊天中的图片消息。 + + 返回图片的时间、local_id、MD5、文件大小等信息。 + 可以配合 decode_image 工具解密指定图片。 + + Args: + chat_name: 聊天对象的名字、备注名或wxid + limit: 返回数量,默认20 + """ + username = resolve_username(chat_name) + if not username: + return f"找不到聊天对象: {chat_name}" + + names = get_contact_names() + display_name = names.get(username, username) + + db_path, table_name = _find_msg_table_for_user(username) + if not db_path: + return f"找不到 {display_name} 的消息记录" + + images = _image_resolver.list_chat_images(db_path, table_name, username, limit) + if not images: + return f"{display_name} 无图片消息" + + lines = [] + for img in images: + time_str = datetime.fromtimestamp(img['create_time']).strftime('%Y-%m-%d %H:%M') + line = f"[{time_str}] local_id={img['local_id']}" + if img.get('md5'): + line += f" MD5={img['md5']}" + if img.get('size'): + size_kb = img['size'] / 1024 + line += f" {size_kb:.0f}KB" + if not img.get('md5'): + line += " (无资源信息)" + lines.append(line) + + return f"{display_name} 的 {len(lines)} 张图片:\n\n" + "\n".join(lines) + + if __name__ == "__main__": mcp.run() diff --git a/monitor.py b/monitor.py index efea0f8..83b6265 100644 --- a/monitor.py +++ b/monitor.py @@ -8,6 +8,9 @@ import hashlib, struct, os, sys, json, time, sqlite3, io import hmac as hmac_mod from datetime import datetime from Crypto.Cipher import AES +import zstandard as zstd + +_zstd_dctx = zstd.ZstdDecompressor() sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') @@ -218,6 +221,11 @@ def main(): # 消息内容 summary = curr['summary'] + if isinstance(summary, bytes): + try: + summary = _zstd_dctx.decompress(summary).decode('utf-8', errors='replace') + except Exception: + summary = '(压缩内容)' if summary: # 群消息格式: "wxid_xxx:\n内容" - 提取内容部分 if ':\n' in summary: diff --git a/monitor_web.py b/monitor_web.py index d174db2..04504b0 100644 --- a/monitor_web.py +++ b/monitor_web.py @@ -6,13 +6,19 @@ http://localhost:5678 - 检测到变化后:全量解密DB + 全量WAL patch - SSE 服务器推送 """ -import hashlib, struct, os, sys, json, time, sqlite3, io, threading, queue +import hashlib, struct, os, sys, json, time, sqlite3, io, threading, queue, traceback import hmac as hmac_mod +from concurrent.futures import ThreadPoolExecutor from datetime import datetime from http.server import HTTPServer, BaseHTTPRequestHandler from socketserver import ThreadingMixIn from Crypto.Cipher import AES import urllib.parse +import glob as glob_mod +import zstandard as zstd +from decode_image import extract_md5_from_packed_info, decrypt_dat_file, is_v2_format + +_zstd_dctx = zstd.ZstdDecompressor() PAGE_SZ = 4096 KEY_SZ = 32 @@ -28,6 +34,11 @@ DB_DIR = _cfg["db_dir"] KEYS_FILE = _cfg["keys_file"] CONTACT_CACHE = os.path.join(_cfg["decrypted_dir"], "contact", "contact.db") DECRYPTED_SESSION = os.path.join(_cfg["decrypted_dir"], "session", "session.db") +DECODED_IMAGE_DIR = _cfg.get("decoded_image_dir", os.path.join(os.path.dirname(os.path.abspath(__file__)), "decoded_images")) +MONITOR_CACHE_DIR = os.path.join(_cfg["decrypted_dir"], "_monitor_cache") +WECHAT_BASE_DIR = _cfg.get("wechat_base_dir", "") +IMAGE_AES_KEY = _cfg.get("image_aes_key") # V2 格式 AES key (从微信内存提取) +IMAGE_XOR_KEY = _cfg.get("image_xor_key", 0x88) # XOR key POLL_MS = 30 # 高频轮询WAL/DB的mtime,30ms一次 PORT = 5678 @@ -37,6 +48,98 @@ sse_lock = threading.Lock() messages_log = [] messages_lock = threading.Lock() MAX_LOG = 500 +_img_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix='img') + + +class MonitorDBCache: + """轻量 DB 缓存,mtime 检测变化时重新解密""" + + def __init__(self, keys, tmp_dir): + self.keys = keys + self.tmp_dir = tmp_dir + os.makedirs(tmp_dir, exist_ok=True) + self._state = {} # rel_key → (db_mtime, wal_mtime) + + def get(self, rel_key): + """返回解密后的临时文件路径,mtime 变化时自动重新解密""" + if rel_key not in self.keys: + return None + + enc_key = bytes.fromhex(self.keys[rel_key]["enc_key"]) + rel_path = rel_key.replace('\\', os.sep) + db_path = os.path.join(DB_DIR, rel_path) + wal_path = db_path + "-wal" + + if not os.path.exists(db_path): + return None + + try: + db_mtime = os.path.getmtime(db_path) + wal_mtime = os.path.getmtime(wal_path) if os.path.exists(wal_path) else 0 + except OSError: + return None + + out_name = rel_key.replace('\\', '_') + out_path = os.path.join(self.tmp_dir, out_name) + + prev = self._state.get(rel_key) + + if prev is None or db_mtime != prev[0]: + t0 = time.perf_counter() + full_decrypt(db_path, out_path, enc_key) + if os.path.exists(wal_path): + decrypt_wal_full(wal_path, out_path, enc_key) + ms = (time.perf_counter() - t0) * 1000 + print(f" [cache] {rel_key} 全量解密 {ms:.0f}ms", flush=True) + self._state[rel_key] = (db_mtime, wal_mtime) + elif wal_mtime != prev[1]: + t0 = time.perf_counter() + decrypt_wal_full(wal_path, out_path, enc_key) + ms = (time.perf_counter() - t0) * 1000 + print(f" [cache] {rel_key} WAL patch {ms:.0f}ms", flush=True) + self._state[rel_key] = (db_mtime, wal_mtime) + + return out_path + + +def build_username_db_map(): + """从已解密的 Name2Id 表构建 username → [db_keys] 映射 + + 同一个 username 可能存在于多个 message_N.db 中, + 按 DB 文件修改时间倒序排列(最新的排前面)。 + """ + # 先获取每个 DB 的 mtime 用于排序 + db_mtimes = {} + for i in range(5): + rel_key = f"message\\message_{i}.db" + db_path = os.path.join(DB_DIR, "message", f"message_{i}.db") + try: + db_mtimes[rel_key] = os.path.getmtime(db_path) + except OSError: + db_mtimes[rel_key] = 0 + + mapping = {} # username → [db_keys], 最新的在前 + decrypted_msg_dir = os.path.join(_cfg["decrypted_dir"], "message") + for i in range(5): + db_path = os.path.join(decrypted_msg_dir, f"message_{i}.db") + if not os.path.exists(db_path): + continue + rel_key = f"message\\message_{i}.db" + try: + conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) + for row in conn.execute("SELECT user_name FROM Name2Id").fetchall(): + if row[0] not in mapping: + mapping[row[0]] = [] + mapping[row[0]].append(rel_key) + conn.close() + except Exception as e: + print(f" [WARN] Name2Id message_{i}.db: {e}", flush=True) + + # 对每个 username 的 db_keys 按 mtime 倒序(最新的优先) + for username in mapping: + mapping[username].sort(key=lambda k: db_mtimes.get(k, 0), reverse=True) + + return mapping def decrypt_page(enc_key, page_data, pgno): @@ -156,7 +259,12 @@ def msg_type_icon(t): def broadcast_sse(msg_data): - payload = f"data: {json.dumps(msg_data, ensure_ascii=False)}\n\n" + event_type = msg_data.get('event', '') + data_line = f"data: {json.dumps(msg_data, ensure_ascii=False)}\n" + if event_type: + payload = f"event: {event_type}\n{data_line}\n" + else: + payload = f"{data_line}\n" with sse_lock: dead = [] for q in sse_clients: @@ -171,15 +279,180 @@ def broadcast_sse(msg_data): # ============ 监听器 ============ class SessionMonitor: - def __init__(self, enc_key, session_db, contact_names): + def __init__(self, enc_key, session_db, contact_names, db_cache=None, username_db_map=None): self.enc_key = enc_key self.session_db = session_db self.wal_path = session_db + "-wal" self.contact_names = contact_names + self.db_cache = db_cache + self.username_db_map = username_db_map or {} self.prev_state = {} self.decrypt_ms = 0 self.patched_pages = 0 + def resolve_image(self, username, timestamp): + """解密图片: username+timestamp → 解密后的图片文件名,失败返回 None""" + if not self.db_cache or not self.username_db_map: + return None + + # 1. 找到 username 对应的所有 message_N.db(按 mtime 倒序) + db_keys = self.username_db_map.get(username) + if not db_keys: + return None + + # 2. 遍历候选 DB,找到包含该 timestamp 消息的那个 + table_name = f"Msg_{hashlib.md5(username.encode()).hexdigest()}" + local_id = None + for db_key in db_keys: + msg_db_path = self.db_cache.get(db_key) + if not msg_db_path: + continue + try: + conn = sqlite3.connect(f"file:{msg_db_path}?mode=ro", uri=True) + # 精确匹配 timestamp + row = conn.execute(f""" + SELECT local_id FROM [{table_name}] + WHERE local_type = 3 AND create_time = ? + """, (timestamp,)).fetchone() + if not row: + # 模糊匹配(±3秒内最近的图片消息) + row = conn.execute(f""" + SELECT local_id FROM [{table_name}] + WHERE local_type = 3 AND ABS(create_time - ?) <= 3 + ORDER BY ABS(create_time - ?) LIMIT 1 + """, (timestamp, timestamp)).fetchone() + conn.close() + if row: + local_id = row[0] + break + except Exception as e: + print(f" [img] 查询 {db_key}/{table_name} 失败: {e}", flush=True) + + if not local_id: + print(f" [img] 未找到 local_id: {username} t={timestamp}", flush=True) + return None + + # 4. 查 message_resource.db 获取 MD5 + # local_id 不全局唯一,需要同时匹配 create_time + res_path = self.db_cache.get("message\\message_resource.db") + if not res_path: + return None + + file_md5 = None + try: + conn = sqlite3.connect(f"file:{res_path}?mode=ro", uri=True) + row = conn.execute( + "SELECT packed_info FROM MessageResourceInfo " + "WHERE message_local_id = ? AND message_create_time = ? AND message_local_type = 3", + (local_id, timestamp) + ).fetchone() + if not row: + # 降级: 只用 create_time + type + row = conn.execute( + "SELECT packed_info FROM MessageResourceInfo " + "WHERE message_create_time = ? AND message_local_type = 3", + (timestamp,) + ).fetchone() + conn.close() + if row and row[0]: + file_md5 = extract_md5_from_packed_info(row[0]) + except Exception as e: + print(f" [img] 查询 message_resource 失败: {e}", flush=True) + return None + + if not file_md5: + print(f" [img] 未找到 MD5: local_id={local_id} t={timestamp}", flush=True) + return None + + # 5. 查找 .dat 文件 + attach_dir = os.path.join(WECHAT_BASE_DIR, "msg", "attach") + username_hash = hashlib.md5(username.encode()).hexdigest() + search_base = os.path.join(attach_dir, username_hash) + + if not os.path.isdir(search_base): + print(f" [img] attach 目录不存在: {search_base}", flush=True) + return None + + pattern = os.path.join(search_base, "*", "Img", f"{file_md5}*.dat") + dat_files = sorted(glob_mod.glob(pattern)) + if not dat_files: + print(f" [img] 未找到 .dat: MD5={file_md5}", flush=True) + return None + + # 优先原图,然后高清 _h,最后缩略图 _t + selected = dat_files[0] + for f in dat_files: + fname = os.path.basename(f) + if not fname.startswith(file_md5 + '_'): + selected = f + break + for f in dat_files: + if f.endswith('_h.dat'): + selected = f + break + + # 6. 解密图片 + os.makedirs(DECODED_IMAGE_DIR, exist_ok=True) + out_base = os.path.join(DECODED_IMAGE_DIR, file_md5) + + # 已解密则跳过 + for ext in ('jpg', 'png', 'gif', 'webp', 'bmp', 'tif'): + candidate = f"{out_base}.{ext}" + if os.path.exists(candidate): + return os.path.basename(candidate) + + # V2 新格式需要 AES key + if is_v2_format(selected) and not IMAGE_AES_KEY: + print(f" [img] V2 格式缺少 AES key: {os.path.basename(selected)}", flush=True) + print(f" [img] 请运行 find_image_key.py 提取密钥", flush=True) + return '__v2_unsupported__' + + result_path, fmt = decrypt_dat_file(selected, f"{out_base}.tmp", IMAGE_AES_KEY, IMAGE_XOR_KEY) + if not result_path: + print(f" [img] 解密失败: {selected}", flush=True) + return None + + final = f"{out_base}.{fmt}" + if os.path.exists(final): + os.unlink(final) + os.rename(result_path, final) + size_kb = os.path.getsize(final) / 1024 + print(f" [img] 解密成功: {os.path.basename(final)} ({size_kb:.0f}KB)", flush=True) + return os.path.basename(final) + + def _async_resolve_image(self, username, timestamp, msg_data): + """后台线程: 解密图片并通过 SSE 推送更新""" + for attempt in range(3): + try: + img_name = self.resolve_image(username, timestamp) + if img_name == '__v2_unsupported__': + # V2 新加密格式,显示占位提示 + msg_data['content'] = '[图片 - 新加密格式暂不支持预览]' + broadcast_sse({ + 'event': 'image_update', + 'timestamp': timestamp, + 'username': username, + 'v2_unsupported': True, + }) + return + elif img_name: + image_url = f'/img/{img_name}' + msg_data['image_url'] = image_url + broadcast_sse({ + 'event': 'image_update', + 'timestamp': timestamp, + 'username': username, + 'image_url': image_url, + }) + print(f" [img] 异步解密成功: {img_name}", flush=True) + return + elif attempt < 2: + time.sleep(1.5) + except Exception as e: + print(f" [img] 异步解密失败(attempt={attempt}): {e}", flush=True) + if attempt < 2: + time.sleep(1.5) + def query_state(self): """查询已解密副本的session状态""" conn = sqlite3.connect(f"file:{DECRYPTED_SESSION}?mode=ro", uri=True) @@ -237,10 +510,15 @@ class SessionMonitor: sender = self.contact_names.get(curr['sender'], curr['sender_name'] or curr['sender']) summary = curr['summary'] + if isinstance(summary, bytes): + try: + summary = _zstd_dctx.decompress(summary).decode('utf-8', errors='replace') + except Exception: + summary = '(压缩内容)' if summary and ':\n' in summary: summary = summary.split(':\n', 1)[1] - new_msgs.append({ + msg_data = { 'time': datetime.fromtimestamp(curr['timestamp']).strftime('%H:%M:%S'), 'timestamp': curr['timestamp'], 'chat': display, @@ -253,7 +531,16 @@ class SessionMonitor: 'unread': curr['unread'], 'decrypt_ms': round(self.decrypt_ms, 1), 'pages': self.patched_pages, - }) + } + + new_msgs.append(msg_data) + + # 图片消息: 后台异步解密(不阻塞轮询) + if curr['msg_type'] == 3: + _img_executor.submit( + self._async_resolve_image, + username, curr['timestamp'], msg_data + ) # 按时间排序 new_msgs.sort(key=lambda m: m['timestamp']) @@ -281,8 +568,8 @@ class SessionMonitor: self.prev_state = curr_state -def monitor_thread(enc_key, session_db, contact_names): - mon = SessionMonitor(enc_key, session_db, contact_names) +def monitor_thread(enc_key, session_db, contact_names, db_cache=None, username_db_map=None): + mon = SessionMonitor(enc_key, session_db, contact_names, db_cache, username_db_map) wal_path = mon.wal_path # 初始全量解密 @@ -372,6 +659,8 @@ body{font-family:-apple-system,BlinkMacSystemFont,"Segoe UI",Roboto,sans-serif;b .msg-unread{font-size:10px;padding:1px 6px;border-radius:8px;background:rgba(244,67,54,.2);color:#ef9a9a;font-weight:600} .msg-perf{font-size:9px;color:#333} .msg-content{font-size:13px;line-height:1.4;color:#bbb;word-break:break-all;padding-left:63px} +.msg-img{max-width:300px;max-height:200px;border-radius:8px;cursor:pointer;margin-top:4px;transition:transform .2s} +.msg-img:hover{transform:scale(1.02)} .empty{text-align:center;padding:80px 20px;color:#444} .empty .icon{font-size:48px;margin-bottom:12px} ::-webkit-scrollbar{width:4px} @@ -415,7 +704,13 @@ function addMsg(m, animate){ const ur=m.unread>0?`${m.unread}`:''; const cc=m.is_group?'msg-chat grp':'msg-chat'; - d.innerHTML=`
${m.time}${esc(m.chat)}${sn}
${m.type_icon} ${m.type}${ur}
${esc(m.content||'')}
`; + let contentHtml = esc(m.content||''); + if(m.image_url){ + contentHtml = `${esc(m.content||'')}`; + } + + const dk=m.timestamp+'|'+(m.username||m.chat); + d.innerHTML=`
${m.time}${esc(m.chat)}${sn}
${m.type_icon} ${m.type}${ur}
${contentHtml}
`; M.insertBefore(d, M.firstChild); @@ -438,6 +733,22 @@ function connectSSE(){ es.onmessage=ev=>{ addMsg(JSON.parse(ev.data), true); // 新消息有动画 }; + es.addEventListener('image_update', ev=>{ + const d=JSON.parse(ev.data); + const key=d.timestamp+'|'+(d.username||''); + const msgs=M.querySelectorAll('.msg'); + for(const el of msgs){ + const ct=el.querySelector('.msg-content'); + if(ct && ct.dataset.key===key){ + if(d.v2_unsupported){ + ct.innerHTML='[图片 - 新加密格式暂不支持预览]'; + } else if(d.image_url){ + ct.innerHTML=``; + } + break; + } + } + }); es.onerror=()=>{ S.textContent='重连...'; S.className='status err'; @@ -481,6 +792,32 @@ class Handler(BaseHTTPRequestHandler): self.end_headers() self.wfile.write(json.dumps(data, ensure_ascii=False).encode('utf-8')) + elif self.path.startswith('/img/'): + filename = urllib.parse.unquote(self.path[5:]) + # 安全: 防目录穿越 + if '/' in filename or '\\' in filename or '..' in filename: + self.send_error(403) + return + filepath = os.path.join(DECODED_IMAGE_DIR, filename) + if not os.path.isfile(filepath): + self.send_error(404) + return + ext = os.path.splitext(filename)[1].lower() + ct = { + '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', + '.png': 'image/png', '.gif': 'image/gif', + '.webp': 'image/webp', '.bmp': 'image/bmp', + '.tif': 'image/tiff', + }.get(ext, 'application/octet-stream') + with open(filepath, 'rb') as f: + data = f.read() + self.send_response(200) + self.send_header('Content-Type', ct) + self.send_header('Content-Length', str(len(data))) + self.send_header('Cache-Control', 'public, max-age=86400') + self.end_headers() + self.wfile.write(data) + elif self.path == '/stream': self.send_response(200) self.send_header('Content-Type', 'text/event-stream') @@ -529,7 +866,20 @@ def main(): contact_names = load_contact_names() print(f"已加载 {len(contact_names)} 个联系人", flush=True) - t = threading.Thread(target=monitor_thread, args=(enc_key, session_db, contact_names), daemon=True) + print("构建 username→DB 映射...", flush=True) + username_db_map = build_username_db_map() + print(f"已映射 {len(username_db_map)} 个用户名", flush=True) + + db_cache = MonitorDBCache(keys, MONITOR_CACHE_DIR) + + # 后台预热 message_resource.db(图片解密必需) + def _warmup(): + t0 = time.perf_counter() + db_cache.get("message\\message_resource.db") + print(f"[warmup] message_resource.db 预热完成 {(time.perf_counter()-t0)*1000:.0f}ms", flush=True) + threading.Thread(target=_warmup, daemon=True).start() + + t = threading.Thread(target=monitor_thread, args=(enc_key, session_db, contact_names, db_cache, username_db_map), daemon=True) t.start() server = ThreadedServer(('0.0.0.0', PORT), Handler)