diff --git a/mcp_server.py b/mcp_server.py index c6b4df0..d01b06c 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -8,6 +8,7 @@ Runs on Windows Python (needs access to D:\ WeChat databases). import os, sys, json, time, sqlite3, tempfile, struct, hashlib, atexit, re import hmac as hmac_mod from datetime import datetime +import xml.etree.ElementTree as ET from Crypto.Cipher import AES from mcp.server.fastmcp import FastMCP import zstandard as zstd @@ -216,8 +217,11 @@ atexit.register(_cache.cleanup) # ============ 联系人缓存 ============ -_contact_names = None # {username: display_name} -_contact_full = None # [{username, nick_name, remark}] +_contact_names = None # {username: display_name} +_contact_full = None # [{username, nick_name, remark}] +_self_username = None +_XML_UNSAFE_RE = re.compile(r' 0xFFFFFFFF: + return t & 0xFFFFFFFF, t >> 32 + return t, 0 def resolve_username(chat_name): @@ -331,24 +347,238 @@ def _parse_message_content(content, local_type, is_group): return sender, text -# 消息 DB 的 rel_keys -# 用 message_\d+\.db$ 匹配,自然排除 message_resource.db / message_fts_*.db -MSG_DB_KEYS = sorted([ +def _collapse_text(text): + if not text: + return '' + return re.sub(r'\s+', ' ', text).strip() + + +def _get_self_username(): + global _self_username + if _self_username: + return _self_username + + if not DB_DIR: + return '' + + names = get_contact_names() + account_dir = os.path.basename(os.path.dirname(DB_DIR)) + candidates = [account_dir] + + m = re.fullmatch(r'(.+)_([0-9a-fA-F]{4,})', account_dir) + if m: + candidates.insert(0, m.group(1)) + + for candidate in candidates: + if candidate and candidate in names: + _self_username = candidate + return _self_username + + return '' + + +def _load_name2id_maps(conn): + id_to_username = {} + try: + rows = conn.execute("SELECT rowid, user_name FROM Name2Id").fetchall() + except sqlite3.Error: + return id_to_username + + for rowid, user_name in rows: + if not user_name: + continue + id_to_username[rowid] = user_name + return id_to_username + + +def _display_name_for_username(username, names): + if not username: + return '' + if username == _get_self_username(): + return 'me' + return names.get(username, username) + + +def _resolve_sender_label(real_sender_id, sender_from_content, is_group, chat_username, chat_display_name, names, id_to_username): + sender_username = id_to_username.get(real_sender_id, '') + + if is_group: + if sender_username and sender_username != chat_username: + return _display_name_for_username(sender_username, names) + if sender_from_content: + return _display_name_for_username(sender_from_content, names) + return '' + + if sender_username == chat_username: + return chat_display_name + if sender_username: + return _display_name_for_username(sender_username, names) + return '' + + +def _resolve_quote_sender_label(ref_user, ref_display_name, is_group, chat_username, chat_display_name, names): + if is_group: + if ref_user: + return _display_name_for_username(ref_user, names) + return ref_display_name or '' + + self_username = _get_self_username() + if ref_user: + if ref_user == chat_username: + return chat_display_name + if self_username and ref_user == self_username: + return 'me' + return names.get(ref_user, ref_display_name or ref_user) + if ref_display_name: + if ref_display_name == chat_display_name: + return chat_display_name + self_display_name = names.get(self_username, self_username) if self_username else '' + if self_display_name and ref_display_name == self_display_name: + return 'me' + return ref_display_name + return '' + + +def _parse_xml_root(content): + if not content or len(content) > _XML_PARSE_MAX_LEN or _XML_UNSAFE_RE.search(content): + return None + + try: + return ET.fromstring(content) + except ET.ParseError: + return None + + +def _parse_int(value, fallback=0): + try: + return int(value) + except (TypeError, ValueError): + return fallback + + +def _format_app_message_text(content, local_type, is_group, chat_username, chat_display_name, names): + if not content or ' 160: + ref_content = ref_content[:160] + "..." + + quote_text = title or "[引用消息]" + if ref_content: + ref_label = _resolve_quote_sender_label( + ref_user, ref_display_name, is_group, chat_username, chat_display_name, names + ) + prefix = f"回复 {ref_label}: " if ref_label else "回复: " + quote_text += f"\n ↳ {prefix}{ref_content}" + return quote_text + + if app_type == 6: + return f"[文件] {title}" if title else "[文件]" + if app_type == 5: + return f"[链接] {title}" if title else "[链接]" + if app_type in (33, 36, 44): + return f"[小程序] {title}" if title else "[小程序]" + if title: + return f"[链接/文件] {title}" + return "[链接/文件]" + + +def _format_voip_message_text(content): + if not content or ' str: if not db_path: return f"找不到 {display_name} 的消息记录(可能在未解密的DB中或无消息)" - conn = sqlite3.connect(db_path) - try: - rows = conn.execute(f""" - SELECT local_id, local_type, create_time, message_content, - WCDB_CT_message_content + conn = sqlite3.connect(db_path) + try: + id_to_username = _load_name2id_maps(conn) + rows = conn.execute(f""" + SELECT local_id, local_type, create_time, real_sender_id, message_content, + WCDB_CT_message_content FROM [{table_name}] ORDER BY create_time DESC LIMIT ? @@ -471,7 +702,7 @@ def get_chat_history(chat_name: str, limit: int = 50) -> str: return f"{display_name} 无消息记录" lines = [] - for local_id, local_type, create_time, content, ct in reversed(rows): + for local_id, local_type, create_time, real_sender_id, content, ct in reversed(rows): time_str = datetime.fromtimestamp(create_time).strftime('%m-%d %H:%M') # zstd 解压 @@ -479,22 +710,18 @@ def get_chat_history(chat_name: str, limit: int = 50) -> str: if content is None: content = '(无法解压)' - sender, text = _parse_message_content(content, local_type, is_group) - - if local_type == 3: - text = f"[图片] (local_id={local_id})" - elif local_type == 47: - text = "[表情]" - elif local_type != 1: - type_label = format_msg_type(local_type) - text = f"[{type_label}] {text}" if text else f"[{type_label}]" + sender, text = _format_message_text( + local_id, local_type, content, is_group, username, display_name, names + ) if text and len(text) > 500: text = text[:500] + "..." - if is_group and sender: - sender_name = names.get(sender, sender) - lines.append(f"[{time_str}] {sender_name}: {text}") + sender_label = _resolve_sender_label( + real_sender_id, sender, is_group, username, display_name, names, id_to_username + ) + if sender_label: + lines.append(f"[{time_str}] {sender_label}: {text}") else: lines.append(f"[{time_str}] {text}")