""" 从微信进程内存中提取所有数据库的缓存raw key WCDB为每个DB缓存: x'<64hex_enc_key><32hex_salt>' salt嵌在hex字符串中,可以直接匹配DB文件的salt """ import ctypes import ctypes.wintypes as wt import os, sys, time, re import functools print = functools.partial(print, flush=True) from key_scan_common import ( collect_db_files, scan_memory_for_keys, cross_verify_keys, save_results, ) kernel32 = ctypes.windll.kernel32 MEM_COMMIT = 0x1000 READABLE = {0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80} class MBI(ctypes.Structure): _fields_ = [ ("BaseAddress", ctypes.c_uint64), ("AllocationBase", ctypes.c_uint64), ("AllocationProtect", wt.DWORD), ("_pad1", wt.DWORD), ("RegionSize", ctypes.c_uint64), ("State", wt.DWORD), ("Protect", wt.DWORD), ("Type", wt.DWORD), ("_pad2", wt.DWORD), ] def get_pids(): """返回所有 Weixin.exe 进程的 (pid, mem_kb) 列表,按内存降序""" import subprocess r = subprocess.run(["tasklist", "/FI", "IMAGENAME eq Weixin.exe", "/FO", "CSV", "/NH"], capture_output=True, text=True) pids = [] for line in r.stdout.strip().split('\n'): if not line.strip(): continue p = line.strip('"').split('","') if len(p) >= 5: pid = int(p[1]) mem = int(p[4].replace(',', '').replace(' K', '').strip() or '0') pids.append((pid, mem)) if not pids: raise RuntimeError("Weixin.exe 未运行") pids.sort(key=lambda x: x[1], reverse=True) for pid, mem in pids: print(f"[+] Weixin.exe PID={pid} ({mem // 1024}MB)") return pids def read_mem(h, addr, sz): buf = ctypes.create_string_buffer(sz) n = ctypes.c_size_t(0) if kernel32.ReadProcessMemory(h, ctypes.c_uint64(addr), buf, sz, ctypes.byref(n)): return buf.raw[:n.value] return None def enum_regions(h): regs = [] addr = 0 mbi = MBI() while addr < 0x7FFFFFFFFFFF: if kernel32.VirtualQueryEx(h, ctypes.c_uint64(addr), ctypes.byref(mbi), ctypes.sizeof(mbi)) == 0: break if mbi.State == MEM_COMMIT and mbi.Protect in READABLE and 0 < mbi.RegionSize < 500 * 1024 * 1024: regs.append((mbi.BaseAddress, mbi.RegionSize)) nxt = mbi.BaseAddress + mbi.RegionSize if nxt <= addr: break addr = nxt return regs def main(): from config import load_config _cfg = load_config() db_dir = _cfg["db_dir"] out_file = _cfg["keys_file"] print("=" * 60) print(" 提取所有微信数据库密钥") print("=" * 60) # 1. 收集所有DB文件及其salt db_files, salt_to_dbs = collect_db_files(db_dir) print(f"\n找到 {len(db_files)} 个数据库, {len(salt_to_dbs)} 个不同的salt") for salt_hex, dbs in sorted(salt_to_dbs.items(), key=lambda x: len(x[1]), reverse=True): print(f" salt {salt_hex}: {', '.join(dbs)}") # 2. 打开所有微信进程 pids = get_pids() hex_re = re.compile(b"x'([0-9a-fA-F]{64,192})'") key_map = {} remaining_salts = set(salt_to_dbs.keys()) all_hex_matches = 0 t0 = time.time() for pid, mem_kb in pids: h = kernel32.OpenProcess(0x0010 | 0x0400, False, pid) if not h: print(f"[WARN] 无法打开进程 PID={pid},跳过") continue try: regions = enum_regions(h) total_bytes = sum(s for _, s in regions) total_mb = total_bytes / 1024 / 1024 print(f"\n[*] 扫描 PID={pid} ({total_mb:.0f}MB, {len(regions)} 区域)") scanned_bytes = 0 for reg_idx, (base, size) in enumerate(regions): data = read_mem(h, base, size) scanned_bytes += size if not data: continue all_hex_matches += scan_memory_for_keys( data, hex_re, db_files, salt_to_dbs, key_map, remaining_salts, base, pid, print, ) if (reg_idx + 1) % 200 == 0: elapsed = time.time() - t0 progress = scanned_bytes / total_bytes * 100 if total_bytes else 100 print( f" [{progress:.1f}%] {len(key_map)}/{len(salt_to_dbs)} salts matched, " f"{all_hex_matches} hex patterns, {elapsed:.1f}s" ) finally: kernel32.CloseHandle(h) if not remaining_salts: print(f"\n[+] 所有密钥已找到,跳过剩余进程") break elapsed = time.time() - t0 print(f"\n扫描完成: {elapsed:.1f}s, {len(pids)} 个进程, {all_hex_matches} hex模式") cross_verify_keys(db_files, salt_to_dbs, key_map, print) save_results(db_files, salt_to_dbs, key_map, db_dir, out_file, print) if __name__ == '__main__': try: main() except RuntimeError as e: print(f"\n[ERROR] {e}") sys.exit(1)