Improve chat history formatting

feat/daemon-cli
Banghao Chi 2026-03-08 15:30:10 -05:00
parent a5a347f69e
commit fa273b810d
1 changed files with 229 additions and 63 deletions

View File

@ -5,14 +5,15 @@ Based on FastMCP (stdio transport), reuses existing decryption.
Runs on Windows Python (needs access to D:\ WeChat databases). Runs on Windows Python (needs access to D:\ WeChat databases).
""" """
import os, sys, json, time, sqlite3, tempfile, struct, hashlib, atexit, re import os, sys, json, time, sqlite3, tempfile, struct, hashlib, atexit, re
import hmac as hmac_mod import hmac as hmac_mod
from datetime import datetime from datetime import datetime
from Crypto.Cipher import AES import xml.etree.ElementTree as ET
from mcp.server.fastmcp import FastMCP from Crypto.Cipher import AES
import zstandard as zstd from mcp.server.fastmcp import FastMCP
from decode_image import ImageResolver import zstandard as zstd
from key_utils import get_key_info, key_path_variants, strip_key_metadata from decode_image import ImageResolver
from key_utils import get_key_info, key_path_variants, strip_key_metadata
# ============ 加密常量 ============ # ============ 加密常量 ============
PAGE_SZ = 4096 PAGE_SZ = 4096
@ -216,8 +217,9 @@ atexit.register(_cache.cleanup)
# ============ 联系人缓存 ============ # ============ 联系人缓存 ============
_contact_names = None # {username: display_name} _contact_names = None # {username: display_name}
_contact_full = None # [{username, nick_name, remark}] _contact_full = None # [{username, nick_name, remark}]
_self_username = None
def _load_contacts_from(db_path): def _load_contacts_from(db_path):
@ -261,21 +263,32 @@ def get_contact_names():
return {} return {}
def get_contact_full(): def get_contact_full():
global _contact_full global _contact_full
if _contact_full is None: if _contact_full is None:
get_contact_names() get_contact_names()
return _contact_full or [] return _contact_full or []
# ============ 辅助函数 ============ # ============ 辅助函数 ============
def format_msg_type(t): def format_msg_type(t):
return { base_type, _ = _split_msg_type(t)
1: '文本', 3: '图片', 34: '语音', 42: '名片', return {
43: '视频', 47: '表情', 48: '位置', 49: '链接/文件', 1: '文本', 3: '图片', 34: '语音', 42: '名片',
50: '通话', 10000: '系统', 10002: '撤回', 43: '视频', 47: '表情', 48: '位置', 49: '链接/文件',
}.get(t, f'type={t}') 50: '通话', 10000: '系统', 10002: '撤回',
}.get(base_type, f'type={t}')
def _split_msg_type(t):
try:
t = int(t)
except (TypeError, ValueError):
return 0, 0
if t > 0xFFFFFFFF:
return t & 0xFFFFFFFF, t >> 32
return t, 0
def resolve_username(chat_name): def resolve_username(chat_name):
@ -316,10 +329,10 @@ def _decompress_content(content, ct):
return content return content
def _parse_message_content(content, local_type, is_group): def _parse_message_content(content, local_type, is_group):
"""解析消息内容,返回 (sender_id, text)""" """解析消息内容,返回 (sender_id, text)"""
if content is None: if content is None:
return '', '' return '', ''
if isinstance(content, bytes): if isinstance(content, bytes):
return '', '(二进制内容)' return '', '(二进制内容)'
@ -328,7 +341,163 @@ def _parse_message_content(content, local_type, is_group):
if is_group and ':\n' in content: if is_group and ':\n' in content:
sender, text = content.split(':\n', 1) sender, text = content.split(':\n', 1)
return sender, text return sender, text
def _collapse_text(text):
if not text:
return ''
return re.sub(r'\s+', ' ', text).strip()
def _get_self_username():
global _self_username
if _self_username is not None:
return _self_username
names = get_contact_names()
account_dir = os.path.basename(os.path.dirname(DB_DIR))
candidates = [account_dir]
m = re.fullmatch(r'(.+)_([0-9a-fA-F]{4,})', account_dir)
if m:
candidates.insert(0, m.group(1))
for candidate in candidates:
if candidate and candidate in names:
_self_username = candidate
return _self_username
_self_username = ''
return _self_username
def _load_name2id_maps(conn):
id_to_username = {}
username_to_id = {}
try:
rows = conn.execute("SELECT rowid, user_name FROM Name2Id").fetchall()
except sqlite3.Error:
return id_to_username, username_to_id
for rowid, user_name in rows:
if not user_name:
continue
id_to_username[rowid] = user_name
username_to_id[user_name] = rowid
return id_to_username, username_to_id
def _display_name_for_username(username, names):
if not username:
return ''
if username == _get_self_username():
return 'me'
return names.get(username, username)
def _resolve_sender_label(real_sender_id, sender_from_content, is_group, chat_username, chat_display_name, names, id_to_username):
sender_username = id_to_username.get(real_sender_id, '')
if is_group:
if sender_username and sender_username != chat_username:
return _display_name_for_username(sender_username, names)
if sender_from_content:
return _display_name_for_username(sender_from_content, names)
return ''
if sender_username == chat_username:
return chat_display_name
if sender_username:
return _display_name_for_username(sender_username, names)
return ''
def _resolve_quote_sender_label(ref_user, ref_display_name, is_group, chat_username, chat_display_name, names):
if is_group:
if ref_user:
return _display_name_for_username(ref_user, names)
return ref_display_name or ''
if ref_user:
if ref_user == chat_username:
return chat_display_name
return 'me'
if ref_display_name:
if ref_display_name == chat_display_name:
return chat_display_name
return 'me'
return ''
def _format_app_message_text(content, local_type, is_group, chat_username, chat_display_name, names):
if not content or '<appmsg' not in content:
return None
_, sub_type = _split_msg_type(local_type)
try:
root = ET.fromstring(content)
except ET.ParseError:
return None
appmsg = root.find('.//appmsg')
if appmsg is None:
return None
title = _collapse_text(appmsg.findtext('title') or '')
app_type_text = (appmsg.findtext('type') or '').strip()
app_type = int(app_type_text or sub_type or 0)
if app_type == 57:
ref = appmsg.find('.//refermsg')
ref_user = ''
ref_display_name = ''
ref_content = ''
if ref is not None:
ref_user = (ref.findtext('fromusr') or '').strip()
ref_display_name = (ref.findtext('displayname') or '').strip()
ref_content = _collapse_text(ref.findtext('content') or '')
if len(ref_content) > 160:
ref_content = ref_content[:160] + "..."
quote_text = title or "[引用消息]"
if ref_content:
ref_label = _resolve_quote_sender_label(
ref_user, ref_display_name, is_group, chat_username, chat_display_name, names
)
prefix = f"回复 {ref_label}: " if ref_label else "回复: "
quote_text += f"\n{prefix}{ref_content}"
return quote_text
if app_type == 6:
return f"[文件] {title}" if title else "[文件]"
if app_type == 5:
return f"[链接] {title}" if title else "[链接]"
if app_type in (33, 36, 44):
return f"[小程序] {title}" if title else "[小程序]"
if title:
return f"[链接/文件] {title}"
return "[链接/文件]"
def _format_message_text(local_id, local_type, content, is_group, chat_username, chat_display_name, names):
sender_from_content, text = _parse_message_content(content, local_type, is_group)
base_type, _ = _split_msg_type(local_type)
if base_type == 3:
text = f"[图片] (local_id={local_id})"
elif base_type == 47:
text = "[表情]"
elif base_type == 49:
text = _format_app_message_text(
text, local_type, is_group, chat_username, chat_display_name, names
) or "[链接/文件]"
elif base_type != 1:
type_label = format_msg_type(local_type)
text = f"[{type_label}] {text}" if text else f"[{type_label}]"
return sender_from_content, text
# 消息 DB 的 rel_keys # 消息 DB 的 rel_keys
@ -453,15 +622,16 @@ def get_chat_history(chat_name: str, limit: int = 50) -> str:
if not db_path: if not db_path:
return f"找不到 {display_name} 的消息记录可能在未解密的DB中或无消息" return f"找不到 {display_name} 的消息记录可能在未解密的DB中或无消息"
conn = sqlite3.connect(db_path) conn = sqlite3.connect(db_path)
try: try:
rows = conn.execute(f""" id_to_username, _ = _load_name2id_maps(conn)
SELECT local_id, local_type, create_time, message_content, rows = conn.execute(f"""
WCDB_CT_message_content SELECT local_id, local_type, create_time, real_sender_id, message_content,
FROM [{table_name}] WCDB_CT_message_content
ORDER BY create_time DESC FROM [{table_name}]
LIMIT ? ORDER BY create_time DESC
""", (limit,)).fetchall() LIMIT ?
""", (limit,)).fetchall()
except Exception as e: except Exception as e:
conn.close() conn.close()
return f"查询失败: {e}" return f"查询失败: {e}"
@ -469,34 +639,30 @@ def get_chat_history(chat_name: str, limit: int = 50) -> str:
if not rows: if not rows:
return f"{display_name} 无消息记录" return f"{display_name} 无消息记录"
lines = [] lines = []
for local_id, local_type, create_time, content, ct in reversed(rows): for local_id, local_type, create_time, real_sender_id, content, ct in reversed(rows):
time_str = datetime.fromtimestamp(create_time).strftime('%m-%d %H:%M') time_str = datetime.fromtimestamp(create_time).strftime('%m-%d %H:%M')
# zstd 解压 # zstd 解压
content = _decompress_content(content, ct) content = _decompress_content(content, ct)
if content is None: if content is None:
content = '(无法解压)' content = '(无法解压)'
sender, text = _parse_message_content(content, local_type, is_group) sender, text = _format_message_text(
local_id, local_type, content, is_group, username, display_name, names
if local_type == 3: )
text = f"[图片] (local_id={local_id})"
elif local_type == 47: if text and len(text) > 500:
text = "[表情]" text = text[:500] + "..."
elif local_type != 1:
type_label = format_msg_type(local_type) sender_label = _resolve_sender_label(
text = f"[{type_label}] {text}" if text else f"[{type_label}]" real_sender_id, sender, is_group, username, display_name, names, id_to_username
)
if text and len(text) > 500: if sender_label:
text = text[:500] + "..." lines.append(f"[{time_str}] {sender_label}: {text}")
else:
if is_group and sender: lines.append(f"[{time_str}] {text}")
sender_name = names.get(sender, sender)
lines.append(f"[{time_str}] {sender_name}: {text}")
else:
lines.append(f"[{time_str}] {text}")
header = f"{display_name} 的最近 {len(lines)} 条消息" header = f"{display_name} 的最近 {len(lines)} 条消息"
if is_group: if is_group: