Merge pull request #19 from BiboyQG/feat/chat-history-formatting

功能改进实用,问题不阻塞合并。
feat/daemon-cli
joshua-deng 2026-03-09 19:48:06 +08:00 committed by GitHub
commit 64b2c9fdef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 260 additions and 33 deletions

View File

@ -8,6 +8,7 @@ Runs on Windows Python (needs access to D:\ WeChat databases).
import os, sys, json, time, sqlite3, tempfile, struct, hashlib, atexit, re import os, sys, json, time, sqlite3, tempfile, struct, hashlib, atexit, re
import hmac as hmac_mod import hmac as hmac_mod
from datetime import datetime from datetime import datetime
import xml.etree.ElementTree as ET
from Crypto.Cipher import AES from Crypto.Cipher import AES
from mcp.server.fastmcp import FastMCP from mcp.server.fastmcp import FastMCP
import zstandard as zstd import zstandard as zstd
@ -216,8 +217,11 @@ atexit.register(_cache.cleanup)
# ============ 联系人缓存 ============ # ============ 联系人缓存 ============
_contact_names = None # {username: display_name} _contact_names = None # {username: display_name}
_contact_full = None # [{username, nick_name, remark}] _contact_full = None # [{username, nick_name, remark}]
_self_username = None
_XML_UNSAFE_RE = re.compile(r'<!DOCTYPE|<!ENTITY', re.IGNORECASE)
_XML_PARSE_MAX_LEN = 20000
def _load_contacts_from(db_path): def _load_contacts_from(db_path):
@ -271,11 +275,23 @@ def get_contact_full():
# ============ 辅助函数 ============ # ============ 辅助函数 ============
def format_msg_type(t): def format_msg_type(t):
base_type, _ = _split_msg_type(t)
return { return {
1: '文本', 3: '图片', 34: '语音', 42: '名片', 1: '文本', 3: '图片', 34: '语音', 42: '名片',
43: '视频', 47: '表情', 48: '位置', 49: '链接/文件', 43: '视频', 47: '表情', 48: '位置', 49: '链接/文件',
50: '通话', 10000: '系统', 10002: '撤回', 50: '通话', 10000: '系统', 10002: '撤回',
}.get(t, f'type={t}') }.get(base_type, f'type={t}')
def _split_msg_type(t):
try:
t = int(t)
except (TypeError, ValueError):
return 0, 0
# WeChat packs the base type into the low 32 bits and app subtype into the high 32 bits.
if t > 0xFFFFFFFF:
return t & 0xFFFFFFFF, t >> 32
return t, 0
def resolve_username(chat_name): def resolve_username(chat_name):
@ -331,24 +347,238 @@ def _parse_message_content(content, local_type, is_group):
return sender, text return sender, text
# 消息 DB 的 rel_keys def _collapse_text(text):
# 用 message_\d+\.db$ 匹配,自然排除 message_resource.db / message_fts_*.db if not text:
MSG_DB_KEYS = sorted([ return ''
return re.sub(r'\s+', ' ', text).strip()
def _get_self_username():
global _self_username
if _self_username:
return _self_username
if not DB_DIR:
return ''
names = get_contact_names()
account_dir = os.path.basename(os.path.dirname(DB_DIR))
candidates = [account_dir]
m = re.fullmatch(r'(.+)_([0-9a-fA-F]{4,})', account_dir)
if m:
candidates.insert(0, m.group(1))
for candidate in candidates:
if candidate and candidate in names:
_self_username = candidate
return _self_username
return ''
def _load_name2id_maps(conn):
id_to_username = {}
try:
rows = conn.execute("SELECT rowid, user_name FROM Name2Id").fetchall()
except sqlite3.Error:
return id_to_username
for rowid, user_name in rows:
if not user_name:
continue
id_to_username[rowid] = user_name
return id_to_username
def _display_name_for_username(username, names):
if not username:
return ''
if username == _get_self_username():
return 'me'
return names.get(username, username)
def _resolve_sender_label(real_sender_id, sender_from_content, is_group, chat_username, chat_display_name, names, id_to_username):
sender_username = id_to_username.get(real_sender_id, '')
if is_group:
if sender_username and sender_username != chat_username:
return _display_name_for_username(sender_username, names)
if sender_from_content:
return _display_name_for_username(sender_from_content, names)
return ''
if sender_username == chat_username:
return chat_display_name
if sender_username:
return _display_name_for_username(sender_username, names)
return ''
def _resolve_quote_sender_label(ref_user, ref_display_name, is_group, chat_username, chat_display_name, names):
if is_group:
if ref_user:
return _display_name_for_username(ref_user, names)
return ref_display_name or ''
self_username = _get_self_username()
if ref_user:
if ref_user == chat_username:
return chat_display_name
if self_username and ref_user == self_username:
return 'me'
return names.get(ref_user, ref_display_name or ref_user)
if ref_display_name:
if ref_display_name == chat_display_name:
return chat_display_name
self_display_name = names.get(self_username, self_username) if self_username else ''
if self_display_name and ref_display_name == self_display_name:
return 'me'
return ref_display_name
return ''
def _parse_xml_root(content):
if not content or len(content) > _XML_PARSE_MAX_LEN or _XML_UNSAFE_RE.search(content):
return None
try:
return ET.fromstring(content)
except ET.ParseError:
return None
def _parse_int(value, fallback=0):
try:
return int(value)
except (TypeError, ValueError):
return fallback
def _format_app_message_text(content, local_type, is_group, chat_username, chat_display_name, names):
if not content or '<appmsg' not in content:
return None
_, sub_type = _split_msg_type(local_type)
root = _parse_xml_root(content)
if root is None:
return None
appmsg = root.find('.//appmsg')
if appmsg is None:
return None
title = _collapse_text(appmsg.findtext('title') or '')
app_type_text = (appmsg.findtext('type') or '').strip()
app_type = _parse_int(app_type_text, _parse_int(sub_type, 0))
if app_type == 57:
ref = appmsg.find('.//refermsg')
ref_user = ''
ref_display_name = ''
ref_content = ''
if ref is not None:
ref_user = (ref.findtext('fromusr') or '').strip()
ref_display_name = (ref.findtext('displayname') or '').strip()
ref_content = _collapse_text(ref.findtext('content') or '')
if len(ref_content) > 160:
ref_content = ref_content[:160] + "..."
quote_text = title or "[引用消息]"
if ref_content:
ref_label = _resolve_quote_sender_label(
ref_user, ref_display_name, is_group, chat_username, chat_display_name, names
)
prefix = f"回复 {ref_label}: " if ref_label else "回复: "
quote_text += f"\n{prefix}{ref_content}"
return quote_text
if app_type == 6:
return f"[文件] {title}" if title else "[文件]"
if app_type == 5:
return f"[链接] {title}" if title else "[链接]"
if app_type in (33, 36, 44):
return f"[小程序] {title}" if title else "[小程序]"
if title:
return f"[链接/文件] {title}"
return "[链接/文件]"
def _format_voip_message_text(content):
if not content or '<voip' not in content:
return None
root = _parse_xml_root(content)
if root is None:
return "[通话]"
raw_text = _collapse_text(root.findtext('.//msg') or '')
if not raw_text:
return "[通话]"
status_map = {
'Canceled': '已取消',
'Line busy': '对方忙线',
'Already answered elsewhere': '已在其他设备接听',
'Declined on other device': '已在其他设备拒接',
'Call canceled by caller': '主叫已取消',
'Call not answered': '未接听',
"Call wasn't answered": '未接听',
}
if raw_text.startswith('Duration:'):
duration = raw_text.split(':', 1)[1].strip()
return f"[通话] 通话时长 {duration}" if duration else "[通话]"
return f"[通话] {status_map.get(raw_text, raw_text)}"
def _format_message_text(local_id, local_type, content, is_group, chat_username, chat_display_name, names):
sender_from_content, text = _parse_message_content(content, local_type, is_group)
base_type, _ = _split_msg_type(local_type)
if base_type == 3:
text = f"[图片] (local_id={local_id})"
elif base_type == 47:
text = "[表情]"
elif base_type == 50:
text = _format_voip_message_text(text) or "[通话]"
elif base_type == 49:
text = _format_app_message_text(
text, local_type, is_group, chat_username, chat_display_name, names
) or "[链接/文件]"
elif base_type != 1:
type_label = format_msg_type(local_type)
text = f"[{type_label}] {text}" if text else f"[{type_label}]"
return sender_from_content, text
def _is_safe_msg_table_name(table_name):
return bool(re.fullmatch(r'Msg_[0-9a-f]{32}', table_name))
# 消息 DB 的 rel_keys
# 用 message_\d+\.db$ 匹配,自然排除 message_resource.db / message_fts_*.db
MSG_DB_KEYS = sorted([
k for k in ALL_KEYS k for k in ALL_KEYS
if any(v.startswith("message/") for v in key_path_variants(k)) if any(v.startswith("message/") for v in key_path_variants(k))
and any(re.search(r"message_\d+\.db$", v) for v in key_path_variants(k)) and any(re.search(r"message_\d+\.db$", v) for v in key_path_variants(k))
]) ])
def _find_msg_table_for_user(username): def _find_msg_table_for_user(username):
"""在所有 message_N.db 中查找用户的消息表,返回 (db_path, table_name)""" """在所有 message_N.db 中查找用户的消息表,返回 (db_path, table_name)"""
table_hash = hashlib.md5(username.encode()).hexdigest() table_hash = hashlib.md5(username.encode()).hexdigest()
table_name = f"Msg_{table_hash}" table_name = f"Msg_{table_hash}"
if not _is_safe_msg_table_name(table_name):
for rel_key in MSG_DB_KEYS: return None, None
path = _cache.get(rel_key)
if not path: for rel_key in MSG_DB_KEYS:
continue path = _cache.get(rel_key)
if not path:
continue
conn = sqlite3.connect(path) conn = sqlite3.connect(path)
try: try:
exists = conn.execute( exists = conn.execute(
@ -453,11 +683,12 @@ def get_chat_history(chat_name: str, limit: int = 50) -> str:
if not db_path: if not db_path:
return f"找不到 {display_name} 的消息记录可能在未解密的DB中或无消息" return f"找不到 {display_name} 的消息记录可能在未解密的DB中或无消息"
conn = sqlite3.connect(db_path) conn = sqlite3.connect(db_path)
try: try:
rows = conn.execute(f""" id_to_username = _load_name2id_maps(conn)
SELECT local_id, local_type, create_time, message_content, rows = conn.execute(f"""
WCDB_CT_message_content SELECT local_id, local_type, create_time, real_sender_id, message_content,
WCDB_CT_message_content
FROM [{table_name}] FROM [{table_name}]
ORDER BY create_time DESC ORDER BY create_time DESC
LIMIT ? LIMIT ?
@ -471,7 +702,7 @@ def get_chat_history(chat_name: str, limit: int = 50) -> str:
return f"{display_name} 无消息记录" return f"{display_name} 无消息记录"
lines = [] lines = []
for local_id, local_type, create_time, content, ct in reversed(rows): for local_id, local_type, create_time, real_sender_id, content, ct in reversed(rows):
time_str = datetime.fromtimestamp(create_time).strftime('%m-%d %H:%M') time_str = datetime.fromtimestamp(create_time).strftime('%m-%d %H:%M')
# zstd 解压 # zstd 解压
@ -479,22 +710,18 @@ def get_chat_history(chat_name: str, limit: int = 50) -> str:
if content is None: if content is None:
content = '(无法解压)' content = '(无法解压)'
sender, text = _parse_message_content(content, local_type, is_group) sender, text = _format_message_text(
local_id, local_type, content, is_group, username, display_name, names
if local_type == 3: )
text = f"[图片] (local_id={local_id})"
elif local_type == 47:
text = "[表情]"
elif local_type != 1:
type_label = format_msg_type(local_type)
text = f"[{type_label}] {text}" if text else f"[{type_label}]"
if text and len(text) > 500: if text and len(text) > 500:
text = text[:500] + "..." text = text[:500] + "..."
if is_group and sender: sender_label = _resolve_sender_label(
sender_name = names.get(sender, sender) real_sender_id, sender, is_group, username, display_name, names, id_to_username
lines.append(f"[{time_str}] {sender_name}: {text}") )
if sender_label:
lines.append(f"[{time_str}] {sender_label}: {text}")
else: else:
lines.append(f"[{time_str}] {text}") lines.append(f"[{time_str}] {text}")