diff --git a/mcp_server.py b/mcp_server.py index c801d31..d01b06c 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -5,15 +5,15 @@ Based on FastMCP (stdio transport), reuses existing decryption. Runs on Windows Python (needs access to D:\ WeChat databases). """ -import os, sys, json, time, sqlite3, tempfile, struct, hashlib, atexit, re -import hmac as hmac_mod -from datetime import datetime -import xml.etree.ElementTree as ET -from Crypto.Cipher import AES -from mcp.server.fastmcp import FastMCP -import zstandard as zstd -from decode_image import ImageResolver -from key_utils import get_key_info, key_path_variants, strip_key_metadata +import os, sys, json, time, sqlite3, tempfile, struct, hashlib, atexit, re +import hmac as hmac_mod +from datetime import datetime +import xml.etree.ElementTree as ET +from Crypto.Cipher import AES +from mcp.server.fastmcp import FastMCP +import zstandard as zstd +from decode_image import ImageResolver +from key_utils import get_key_info, key_path_variants, strip_key_metadata # ============ 加密常量 ============ PAGE_SZ = 4096 @@ -220,6 +220,8 @@ atexit.register(_cache.cleanup) _contact_names = None # {username: display_name} _contact_full = None # [{username, nick_name, remark}] _self_username = None +_XML_UNSAFE_RE = re.compile(r' 0xFFFFFFFF: return t & 0xFFFFFFFF, t >> 32 return t, 0 @@ -329,10 +332,10 @@ def _decompress_content(content, ct): return content -def _parse_message_content(content, local_type, is_group): - """解析消息内容,返回 (sender_id, text)""" - if content is None: - return '', '' +def _parse_message_content(content, local_type, is_group): + """解析消息内容,返回 (sender_id, text)""" + if content is None: + return '', '' if isinstance(content, bytes): return '', '(二进制内容)' @@ -341,104 +344,125 @@ def _parse_message_content(content, local_type, is_group): if is_group and ':\n' in content: sender, text = content.split(':\n', 1) - return sender, text - - -def _collapse_text(text): - if not text: - return '' - return re.sub(r'\s+', ' ', text).strip() - - + return sender, text + + +def _collapse_text(text): + if not text: + return '' + return re.sub(r'\s+', ' ', text).strip() + + def _get_self_username(): global _self_username - if _self_username is not None: + if _self_username: return _self_username + if not DB_DIR: + return '' + names = get_contact_names() account_dir = os.path.basename(os.path.dirname(DB_DIR)) candidates = [account_dir] - - m = re.fullmatch(r'(.+)_([0-9a-fA-F]{4,})', account_dir) - if m: - candidates.insert(0, m.group(1)) - + + m = re.fullmatch(r'(.+)_([0-9a-fA-F]{4,})', account_dir) + if m: + candidates.insert(0, m.group(1)) + for candidate in candidates: if candidate and candidate in names: _self_username = candidate return _self_username - _self_username = '' - return _self_username + return '' def _load_name2id_maps(conn): id_to_username = {} - username_to_id = {} try: rows = conn.execute("SELECT rowid, user_name FROM Name2Id").fetchall() except sqlite3.Error: - return id_to_username, username_to_id + return id_to_username for rowid, user_name in rows: if not user_name: continue id_to_username[rowid] = user_name - username_to_id[user_name] = rowid - return id_to_username, username_to_id - - -def _display_name_for_username(username, names): - if not username: - return '' - if username == _get_self_username(): - return 'me' - return names.get(username, username) - - -def _resolve_sender_label(real_sender_id, sender_from_content, is_group, chat_username, chat_display_name, names, id_to_username): - sender_username = id_to_username.get(real_sender_id, '') - - if is_group: - if sender_username and sender_username != chat_username: - return _display_name_for_username(sender_username, names) - if sender_from_content: - return _display_name_for_username(sender_from_content, names) - return '' - - if sender_username == chat_username: - return chat_display_name - if sender_username: - return _display_name_for_username(sender_username, names) - return '' - - + return id_to_username + + +def _display_name_for_username(username, names): + if not username: + return '' + if username == _get_self_username(): + return 'me' + return names.get(username, username) + + +def _resolve_sender_label(real_sender_id, sender_from_content, is_group, chat_username, chat_display_name, names, id_to_username): + sender_username = id_to_username.get(real_sender_id, '') + + if is_group: + if sender_username and sender_username != chat_username: + return _display_name_for_username(sender_username, names) + if sender_from_content: + return _display_name_for_username(sender_from_content, names) + return '' + + if sender_username == chat_username: + return chat_display_name + if sender_username: + return _display_name_for_username(sender_username, names) + return '' + + def _resolve_quote_sender_label(ref_user, ref_display_name, is_group, chat_username, chat_display_name, names): if is_group: if ref_user: return _display_name_for_username(ref_user, names) return ref_display_name or '' + self_username = _get_self_username() if ref_user: if ref_user == chat_username: return chat_display_name - return 'me' + if self_username and ref_user == self_username: + return 'me' + return names.get(ref_user, ref_display_name or ref_user) if ref_display_name: if ref_display_name == chat_display_name: return chat_display_name - return 'me' + self_display_name = names.get(self_username, self_username) if self_username else '' + if self_display_name and ref_display_name == self_display_name: + return 'me' + return ref_display_name return '' +def _parse_xml_root(content): + if not content or len(content) > _XML_PARSE_MAX_LEN or _XML_UNSAFE_RE.search(content): + return None + + try: + return ET.fromstring(content) + except ET.ParseError: + return None + + +def _parse_int(value, fallback=0): + try: + return int(value) + except (TypeError, ValueError): + return fallback + + def _format_app_message_text(content, local_type, is_group, chat_username, chat_display_name, names): if not content or ' 160: - ref_content = ref_content[:160] + "..." - - quote_text = title or "[引用消息]" - if ref_content: - ref_label = _resolve_quote_sender_label( - ref_user, ref_display_name, is_group, chat_username, chat_display_name, names - ) - prefix = f"回复 {ref_label}: " if ref_label else "回复: " - quote_text += f"\n ↳ {prefix}{ref_content}" - return quote_text - - if app_type == 6: - return f"[文件] {title}" if title else "[文件]" - if app_type == 5: - return f"[链接] {title}" if title else "[链接]" - if app_type in (33, 36, 44): - return f"[小程序] {title}" if title else "[小程序]" + app_type = _parse_int(app_type_text, _parse_int(sub_type, 0)) + + if app_type == 57: + ref = appmsg.find('.//refermsg') + ref_user = '' + ref_display_name = '' + ref_content = '' + if ref is not None: + ref_user = (ref.findtext('fromusr') or '').strip() + ref_display_name = (ref.findtext('displayname') or '').strip() + ref_content = _collapse_text(ref.findtext('content') or '') + if len(ref_content) > 160: + ref_content = ref_content[:160] + "..." + + quote_text = title or "[引用消息]" + if ref_content: + ref_label = _resolve_quote_sender_label( + ref_user, ref_display_name, is_group, chat_username, chat_display_name, names + ) + prefix = f"回复 {ref_label}: " if ref_label else "回复: " + quote_text += f"\n ↳ {prefix}{ref_content}" + return quote_text + + if app_type == 6: + return f"[文件] {title}" if title else "[文件]" + if app_type == 5: + return f"[链接] {title}" if title else "[链接]" + if app_type in (33, 36, 44): + return f"[小程序] {title}" if title else "[小程序]" if title: return f"[链接/文件] {title}" return "[链接/文件]" +def _format_voip_message_text(content): + if not content or ' str: conn = sqlite3.connect(db_path) try: - id_to_username, _ = _load_name2id_maps(conn) + id_to_username = _load_name2id_maps(conn) rows = conn.execute(f""" SELECT local_id, local_type, create_time, real_sender_id, message_content, WCDB_CT_message_content - FROM [{table_name}] - ORDER BY create_time DESC - LIMIT ? - """, (limit,)).fetchall() + FROM [{table_name}] + ORDER BY create_time DESC + LIMIT ? + """, (limit,)).fetchall() except Exception as e: conn.close() return f"查询失败: {e}" @@ -639,30 +700,30 @@ def get_chat_history(chat_name: str, limit: int = 50) -> str: if not rows: return f"{display_name} 无消息记录" - - lines = [] - for local_id, local_type, create_time, real_sender_id, content, ct in reversed(rows): - time_str = datetime.fromtimestamp(create_time).strftime('%m-%d %H:%M') - - # zstd 解压 - content = _decompress_content(content, ct) - if content is None: - content = '(无法解压)' - - sender, text = _format_message_text( - local_id, local_type, content, is_group, username, display_name, names - ) - - if text and len(text) > 500: - text = text[:500] + "..." - - sender_label = _resolve_sender_label( - real_sender_id, sender, is_group, username, display_name, names, id_to_username - ) - if sender_label: - lines.append(f"[{time_str}] {sender_label}: {text}") - else: - lines.append(f"[{time_str}] {text}") + + lines = [] + for local_id, local_type, create_time, real_sender_id, content, ct in reversed(rows): + time_str = datetime.fromtimestamp(create_time).strftime('%m-%d %H:%M') + + # zstd 解压 + content = _decompress_content(content, ct) + if content is None: + content = '(无法解压)' + + sender, text = _format_message_text( + local_id, local_type, content, is_group, username, display_name, names + ) + + if text and len(text) > 500: + text = text[:500] + "..." + + sender_label = _resolve_sender_label( + real_sender_id, sender, is_group, username, display_name, names, id_to_username + ) + if sender_label: + lines.append(f"[{time_str}] {sender_label}: {text}") + else: + lines.append(f"[{time_str}] {text}") header = f"{display_name} 的最近 {len(lines)} 条消息" if is_group: