diff --git a/find_all_keys_linux.py b/find_all_keys_linux.py index aac4784..51184e8 100644 --- a/find_all_keys_linux.py +++ b/find_all_keys_linux.py @@ -146,6 +146,9 @@ def main(): except PermissionError: print(f"[WARN] 无法读取 /proc/{pid}/maps,权限不足,跳过") continue + except (FileNotFoundError, ProcessLookupError): + print(f"[WARN] PID {pid} 已退出,跳过") + continue total_bytes = sum(s for _, s in regions) total_mb = total_bytes / 1024 / 1024 @@ -157,6 +160,9 @@ def main(): except PermissionError: print(f"[WARN] 无法打开 /proc/{pid}/mem,权限不足,跳过") continue + except (FileNotFoundError, ProcessLookupError): + print(f"[WARN] PID {pid} 已退出,跳过") + continue try: for reg_idx, (base, size) in enumerate(regions): diff --git a/mcp_server.py b/mcp_server.py index bea71ad..71db380 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -5,14 +5,14 @@ Based on FastMCP (stdio transport), reuses existing decryption. Runs on Windows Python (needs access to D:\ WeChat databases). """ -import os, sys, json, time, sqlite3, tempfile, struct, hashlib, atexit +import os, sys, json, time, sqlite3, tempfile, struct, hashlib, atexit, re import hmac as hmac_mod from datetime import datetime from Crypto.Cipher import AES from mcp.server.fastmcp import FastMCP -import zstandard as zstd -from decode_image import ImageResolver -from key_utils import get_key_info, key_path_variants, strip_key_metadata +import zstandard as zstd +from decode_image import ImageResolver +from key_utils import get_key_info, key_path_variants, strip_key_metadata # ============ 加密常量 ============ PAGE_SZ = 4096 @@ -50,8 +50,8 @@ if not DECODED_IMAGE_DIR: elif not os.path.isabs(DECODED_IMAGE_DIR): DECODED_IMAGE_DIR = os.path.join(SCRIPT_DIR, DECODED_IMAGE_DIR) -with open(KEYS_FILE) as f: - ALL_KEYS = strip_key_metadata(json.load(f)) +with open(KEYS_FILE) as f: + ALL_KEYS = strip_key_metadata(json.load(f)) # ============ 解密函数 ============ @@ -175,13 +175,13 @@ class DBCache: except OSError: pass - def get(self, rel_key): - key_info = get_key_info(ALL_KEYS, rel_key) - if not key_info: - return None - rel_path = rel_key.replace('\\', '/').replace('/', os.sep) - db_path = os.path.join(DB_DIR, rel_path) - wal_path = db_path + "-wal" + def get(self, rel_key): + key_info = get_key_info(ALL_KEYS, rel_key) + if not key_info: + return None + rel_path = rel_key.replace('\\', '/').replace('/', os.sep) + db_path = os.path.join(DB_DIR, rel_path) + wal_path = db_path + "-wal" if not os.path.exists(db_path): return None @@ -197,8 +197,8 @@ class DBCache: return c_path tmp_path = self._cache_path(rel_key) - enc_key = bytes.fromhex(key_info["enc_key"]) - full_decrypt(db_path, tmp_path, enc_key) + enc_key = bytes.fromhex(key_info["enc_key"]) + full_decrypt(db_path, tmp_path, enc_key) if os.path.exists(wal_path): decrypt_wal(wal_path, tmp_path, enc_key) self._cache[rel_key] = (db_mtime, wal_mtime, tmp_path) @@ -332,12 +332,11 @@ def _parse_message_content(content, local_type, is_group): # 消息 DB 的 rel_keys(排除 fts/resource/media/biz) -MSG_DB_KEYS = sorted([ - k for k in ALL_KEYS - if any(v.startswith("message/") for v in key_path_variants(k)) - and any(v.endswith(".db") for v in key_path_variants(k)) - and "fts" not in k and "resource" not in k -]) +MSG_DB_KEYS = sorted([ + k for k in ALL_KEYS + if any(v.startswith("message/") for v in key_path_variants(k)) + and any(re.search(r"message_\d+\.db$", v) for v in key_path_variants(k)) +]) def _find_msg_table_for_user(username):