From 7eb29b03e8174cce496fc9062a0f4bcf1c0930a5 Mon Sep 17 00:00:00 2001 From: ylytdeng Date: Mon, 6 Apr 2026 09:54:21 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=E8=81=94=E7=B3=BB?= =?UTF-8?q?=E4=BA=BA=E6=A0=87=E7=AD=BE=E6=9F=A5=E8=AF=A2=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 解析 contact.db 的 contact_label 表和 extra_buffer protobuf Field #30, 支持查询标签列表及指定标签下的成员。 - mcp_server.py: 新增 get_contact_tags / get_tag_members MCP 工具 - monitor_web.py: 新增 /api/tags JSON 端点,支持 ?name= 过滤 Co-Authored-By: Claude Opus 4.6 (1M context) --- mcp_server.py | 1399 +++++++++++++++++++++++++++--------------------- monitor_web.py | 104 ++++ 2 files changed, 888 insertions(+), 615 deletions(-) diff --git a/mcp_server.py b/mcp_server.py index 455cf8f..5c5101d 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -5,10 +5,10 @@ Based on FastMCP (stdio transport), reuses existing decryption. Runs on Windows Python (needs access to D:\ WeChat databases). """ -import os, sys, json, time, sqlite3, tempfile, struct, hashlib, atexit, re -import hmac as hmac_mod -from contextlib import closing -from datetime import datetime +import os, sys, json, time, sqlite3, tempfile, struct, hashlib, atexit, re +import hmac as hmac_mod +from contextlib import closing +from datetime import datetime import xml.etree.ElementTree as ET from Crypto.Cipher import AES from mcp.server.fastmcp import FastMCP @@ -220,11 +220,12 @@ atexit.register(_cache.cleanup) _contact_names = None # {username: display_name} _contact_full = None # [{username, nick_name, remark}] -_self_username = None -_XML_UNSAFE_RE = re.compile(r'> 3 + wire_type = tag & 0x07 + if wire_type == 0: # varint + while pos < n and data[pos] & 0x80: + pos += 1 + pos += 1 + elif wire_type == 2: # length-delimited + length = 0; shift = 0 + while pos < n: + b = data[pos]; pos += 1 + length |= (b & 0x7f) << shift + if not (b & 0x80): + break + shift += 7 + if field_num == 30: + try: + return data[pos:pos + length].decode('utf-8') + except Exception: + return None + pos += length + elif wire_type == 1: # 64-bit + pos += 8 + elif wire_type == 5: # 32-bit + pos += 4 + else: + break + return None + + +def _load_contact_tags(): + """加载并缓存联系人标签数据""" + global _contact_tags + if _contact_tags is not None: + return _contact_tags + + db_path = _get_contact_db_path() + if not db_path: + return {} + + try: + conn = sqlite3.connect(db_path) + except Exception: + return {} + + try: + # 1. 加载标签定义 + try: + label_rows = conn.execute( + "SELECT label_id_, label_name_, sort_order_ FROM contact_label ORDER BY sort_order_" + ).fetchall() + except sqlite3.OperationalError: + return {} + if not label_rows: + return {} + + labels = {} + for lid, lname, sort_order in label_rows: + labels[lid] = {'name': lname, 'sort_order': sort_order, 'members': []} + + # 2. 扫描联系人的标签关联 + names = get_contact_names() + rows = conn.execute( + "SELECT username, extra_buffer FROM contact WHERE extra_buffer IS NOT NULL" + ).fetchall() + + for username, buf in rows: + label_str = _extract_pb_field_30(buf) + if not label_str: + continue + display = names.get(username, username) + for lid_s in label_str.split(','): + try: + lid = int(lid_s.strip()) + except (ValueError, AttributeError): + continue + if lid in labels: + labels[lid]['members'].append({'username': username, 'display_name': display}) + + _contact_tags = labels + return _contact_tags + except Exception: + return {} + finally: + conn.close() + + # ============ 辅助函数 ============ def format_msg_type(t): @@ -571,12 +682,12 @@ MSG_DB_KEYS = sorted([ ]) -def _find_msg_table_for_user(username): - """在所有 message_N.db 中查找用户的消息表,返回 (db_path, table_name)""" - table_hash = hashlib.md5(username.encode()).hexdigest() - table_name = f"Msg_{table_hash}" - if not _is_safe_msg_table_name(table_name): - return None, None +def _find_msg_table_for_user(username): + """在所有 message_N.db 中查找用户的消息表,返回 (db_path, table_name)""" + table_hash = hashlib.md5(username.encode()).hexdigest() + table_name = f"Msg_{table_hash}" + if not _is_safe_msg_table_name(table_name): + return None, None for rel_key in MSG_DB_KEYS: path = _cache.get(rel_key) @@ -595,54 +706,54 @@ def _find_msg_table_for_user(username): pass finally: conn.close() - - return None, None - - -def _find_msg_tables_for_user(username): - """返回用户在所有 message_N.db 中对应的消息表,按最新消息时间倒序排列。""" - table_hash = hashlib.md5(username.encode()).hexdigest() - table_name = f"Msg_{table_hash}" - if not _is_safe_msg_table_name(table_name): - return [] - - matches = [] - for rel_key in MSG_DB_KEYS: - path = _cache.get(rel_key) - if not path: - continue - conn = sqlite3.connect(path) - try: - exists = conn.execute( - "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?", - (table_name,) - ).fetchone() - if not exists: - continue - max_create_time = conn.execute( - f"SELECT MAX(create_time) FROM [{table_name}]" - ).fetchone()[0] or 0 - matches.append({ - 'db_path': path, - 'table_name': table_name, - 'max_create_time': max_create_time, - }) - except Exception: - pass - finally: - conn.close() - - matches.sort(key=lambda item: item['max_create_time'], reverse=True) - return matches + + return None, None -def _validate_pagination(limit, offset=0, limit_max=_QUERY_LIMIT_MAX): - if limit <= 0: - raise ValueError("limit 必须大于 0") - if limit_max is not None and limit > limit_max: - raise ValueError(f"limit 不能大于 {limit_max}") - if offset < 0: - raise ValueError("offset 不能小于 0") +def _find_msg_tables_for_user(username): + """返回用户在所有 message_N.db 中对应的消息表,按最新消息时间倒序排列。""" + table_hash = hashlib.md5(username.encode()).hexdigest() + table_name = f"Msg_{table_hash}" + if not _is_safe_msg_table_name(table_name): + return [] + + matches = [] + for rel_key in MSG_DB_KEYS: + path = _cache.get(rel_key) + if not path: + continue + conn = sqlite3.connect(path) + try: + exists = conn.execute( + "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?", + (table_name,) + ).fetchone() + if not exists: + continue + max_create_time = conn.execute( + f"SELECT MAX(create_time) FROM [{table_name}]" + ).fetchone()[0] or 0 + matches.append({ + 'db_path': path, + 'table_name': table_name, + 'max_create_time': max_create_time, + }) + except Exception: + pass + finally: + conn.close() + + matches.sort(key=lambda item: item['max_create_time'], reverse=True) + return matches + + +def _validate_pagination(limit, offset=0, limit_max=_QUERY_LIMIT_MAX): + if limit <= 0: + raise ValueError("limit 必须大于 0") + if limit_max is not None and limit > limit_max: + raise ValueError(f"limit 不能大于 {limit_max}") + if offset < 0: + raise ValueError("offset 不能小于 0") def _parse_time_value(value, field_name, is_end=False): @@ -692,59 +803,59 @@ def _build_message_filters(start_ts=None, end_ts=None, keyword=''): return clauses, params -def _query_messages(conn, table_name, start_ts=None, end_ts=None, keyword='', limit=20, offset=0): - if not _is_safe_msg_table_name(table_name): - raise ValueError(f'非法消息表名: {table_name}') - - clauses, params = _build_message_filters(start_ts, end_ts, keyword) - where_sql = f"WHERE {' AND '.join(clauses)}" if clauses else '' - sql = f""" - SELECT local_id, local_type, create_time, real_sender_id, message_content, - WCDB_CT_message_content - FROM [{table_name}] - {where_sql} - ORDER BY create_time DESC - """ - if limit is None: - return conn.execute(sql, params).fetchall() - sql += "\n LIMIT ? OFFSET ?" - return conn.execute(sql, (*params, limit, offset)).fetchall() +def _query_messages(conn, table_name, start_ts=None, end_ts=None, keyword='', limit=20, offset=0): + if not _is_safe_msg_table_name(table_name): + raise ValueError(f'非法消息表名: {table_name}') + + clauses, params = _build_message_filters(start_ts, end_ts, keyword) + where_sql = f"WHERE {' AND '.join(clauses)}" if clauses else '' + sql = f""" + SELECT local_id, local_type, create_time, real_sender_id, message_content, + WCDB_CT_message_content + FROM [{table_name}] + {where_sql} + ORDER BY create_time DESC + """ + if limit is None: + return conn.execute(sql, params).fetchall() + sql += "\n LIMIT ? OFFSET ?" + return conn.execute(sql, (*params, limit, offset)).fetchall() -def _resolve_chat_context(chat_name): - username = resolve_username(chat_name) - if not username: - return None - - names = get_contact_names() - display_name = names.get(username, username) - message_tables = _find_msg_tables_for_user(username) - if not message_tables: - return { - 'query': chat_name, - 'username': username, - 'display_name': display_name, - 'db_path': None, - 'table_name': None, - 'message_tables': [], - 'is_group': '@chatroom' in username, - } - - primary = message_tables[0] - return { - 'query': chat_name, - 'username': username, - 'display_name': display_name, - 'db_path': primary['db_path'], - 'table_name': primary['table_name'], - 'message_tables': message_tables, - 'is_group': '@chatroom' in username, - } +def _resolve_chat_context(chat_name): + username = resolve_username(chat_name) + if not username: + return None + + names = get_contact_names() + display_name = names.get(username, username) + message_tables = _find_msg_tables_for_user(username) + if not message_tables: + return { + 'query': chat_name, + 'username': username, + 'display_name': display_name, + 'db_path': None, + 'table_name': None, + 'message_tables': [], + 'is_group': '@chatroom' in username, + } + + primary = message_tables[0] + return { + 'query': chat_name, + 'username': username, + 'display_name': display_name, + 'db_path': primary['db_path'], + 'table_name': primary['table_name'], + 'message_tables': message_tables, + 'is_group': '@chatroom' in username, + } -def _resolve_chat_contexts(chat_names): - if not chat_names: - raise ValueError('chat_names 不能为空') +def _resolve_chat_contexts(chat_names): + if not chat_names: + raise ValueError('chat_names 不能为空') resolved = [] unresolved = [] @@ -760,50 +871,50 @@ def _resolve_chat_contexts(chat_names): if not ctx: unresolved.append(name) continue - if not ctx['message_tables']: - missing_tables.append(ctx['display_name']) - continue - if ctx['username'] in seen: - continue + if not ctx['message_tables']: + missing_tables.append(ctx['display_name']) + continue + if ctx['username'] in seen: + continue seen.add(ctx['username']) resolved.append(ctx) - - return resolved, unresolved, missing_tables - - -def _normalize_chat_names(chat_name): - if chat_name is None: - return [] - if isinstance(chat_name, str): - value = chat_name.strip() - return [value] if value else [] - if isinstance(chat_name, (list, tuple, set)): - normalized = [] - for item in chat_name: - if item is None: - continue - value = str(item).strip() - if value: - normalized.append(value) - return normalized - value = str(chat_name).strip() - return [value] if value else [] + + return resolved, unresolved, missing_tables -def _format_history_lines(rows, username, display_name, is_group, names, id_to_username): - lines = [] - ctx = { - 'username': username, - 'display_name': display_name, - 'is_group': is_group, - } - for row in reversed(rows): - _, line = _build_history_line(row, ctx, names, id_to_username) - lines.append(line) - return lines +def _normalize_chat_names(chat_name): + if chat_name is None: + return [] + if isinstance(chat_name, str): + value = chat_name.strip() + return [value] if value else [] + if isinstance(chat_name, (list, tuple, set)): + normalized = [] + for item in chat_name: + if item is None: + continue + value = str(item).strip() + if value: + normalized.append(value) + return normalized + value = str(chat_name).strip() + return [value] if value else [] -def _build_search_entry(row, ctx, names, id_to_username): +def _format_history_lines(rows, username, display_name, is_group, names, id_to_username): + lines = [] + ctx = { + 'username': username, + 'display_name': display_name, + 'is_group': is_group, + } + for row in reversed(rows): + _, line = _build_history_line(row, ctx, names, id_to_username) + lines.append(line) + return lines + + +def _build_search_entry(row, ctx, names, id_to_username): local_id, local_type, create_time, real_sender_id, content, ct = row content = _decompress_content(content, ct) if content is None: @@ -828,346 +939,346 @@ def _build_search_entry(row, ctx, names, id_to_username): entry = f"[{time_str}] [{ctx['display_name']}]" if sender_label: entry += f" {sender_label}:" - entry += f" {text}" - return create_time, entry - - -def _build_history_line(row, ctx, names, id_to_username): - local_id, local_type, create_time, real_sender_id, content, ct = row - time_str = datetime.fromtimestamp(create_time).strftime('%Y-%m-%d %H:%M') - content = _decompress_content(content, ct) - if content is None: - content = '(无法解压)' - - sender, text = _format_message_text( - local_id, local_type, content, ctx['is_group'], ctx['username'], ctx['display_name'], names - ) - - sender_label = _resolve_sender_label( - real_sender_id, sender, ctx['is_group'], ctx['username'], ctx['display_name'], names, id_to_username - ) - if sender_label: - return create_time, f'[{time_str}] {sender_label}: {text}' - return create_time, f'[{time_str}] {text}' - - -def _get_chat_message_tables(ctx): - if ctx.get('message_tables'): - return ctx['message_tables'] - if ctx.get('db_path') and ctx.get('table_name'): - return [{'db_path': ctx['db_path'], 'table_name': ctx['table_name']}] - return [] - - -def _iter_table_contexts(ctx): - for table in _get_chat_message_tables(ctx): - yield { - 'query': ctx['query'], - 'username': ctx['username'], - 'display_name': ctx['display_name'], - 'db_path': table['db_path'], - 'table_name': table['table_name'], - 'is_group': ctx['is_group'], - } - - -def _candidate_page_size(limit, offset): - return limit + offset - - -def _message_query_batch_size(candidate_limit): - return candidate_limit - - -def _history_query_batch_size(candidate_limit): - return min(candidate_limit, _HISTORY_QUERY_BATCH_SIZE) - - -def _page_ranked_entries(entries, limit, offset): - ordered = sorted(entries, key=lambda item: item[0], reverse=True) - paged = ordered[offset:offset + limit] - paged.sort(key=lambda item: item[0]) - return paged - - -def _collect_chat_history_lines(ctx, names, start_ts=None, end_ts=None, limit=20, offset=0): - collected = [] - failures = [] - candidate_limit = _candidate_page_size(limit, offset) - batch_size = _history_query_batch_size(candidate_limit) - - for table_ctx in _iter_table_contexts(ctx): - try: - with closing(sqlite3.connect(table_ctx['db_path'])) as conn: - id_to_username = _load_name2id_maps(conn) - fetch_offset = 0 - collected_before_table = len(collected) - # 当前页上的消息一定落在各分表最近的 offset+limit 条记录内。 - while len(collected) - collected_before_table < candidate_limit: - rows = _query_messages( - conn, - table_ctx['table_name'], - start_ts=start_ts, - end_ts=end_ts, - limit=batch_size, - offset=fetch_offset, - ) - if not rows: - break - fetch_offset += len(rows) - - for row in rows: - try: - collected.append(_build_history_line(row, table_ctx, names, id_to_username)) - except Exception as e: - failures.append( - f"{table_ctx['display_name']} local_id={row[0]} create_time={row[2]}: {e}" - ) - if len(collected) - collected_before_table >= candidate_limit: - break - - if len(rows) < batch_size: - break - except Exception as e: - failures.append(f"{table_ctx['db_path']}: {e}") - - paged = _page_ranked_entries(collected, limit, offset) - return [line for _, line in paged], failures - - -def _collect_chat_search_entries(ctx, names, keyword, start_ts=None, end_ts=None, candidate_limit=20): - collected = [] - failures = [] - contexts_by_db = {} - for table_ctx in _iter_table_contexts(ctx): - contexts_by_db.setdefault(table_ctx['db_path'], []).append(table_ctx) - - for db_path, db_contexts in contexts_by_db.items(): - try: - with closing(sqlite3.connect(db_path)) as conn: - db_entries, db_failures = _collect_search_entries( - conn, - db_contexts, - names, - keyword, - start_ts=start_ts, - end_ts=end_ts, - candidate_limit=candidate_limit, - ) - collected.extend(db_entries) - failures.extend(db_failures) - except Exception as e: - failures.extend(f"{table_ctx['display_name']}: {e}" for table_ctx in db_contexts) - - return collected, failures - - -def _load_search_contexts_from_db(conn, db_path, names): - tables = conn.execute( - "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'Msg_%'" - ).fetchall() - - table_to_username = {} - try: - for (user_name,) in conn.execute("SELECT user_name FROM Name2Id").fetchall(): - if not user_name: - continue - table_hash = hashlib.md5(user_name.encode()).hexdigest() - table_to_username[f"Msg_{table_hash}"] = user_name - except sqlite3.Error: - pass - - contexts = [] - for (table_name,) in tables: - username = table_to_username.get(table_name, '') - display_name = names.get(username, username) if username else table_name - contexts.append({ - 'query': display_name, - 'username': username, - 'display_name': display_name, - 'db_path': db_path, - 'table_name': table_name, - 'is_group': '@chatroom' in username, - }) - return contexts - - -def _collect_search_entries(conn, contexts, names, keyword, start_ts=None, end_ts=None, candidate_limit=20): - collected = [] - failures = [] - id_to_username = _load_name2id_maps(conn) - batch_size = _message_query_batch_size(candidate_limit) - - for ctx in contexts: - try: - fetch_offset = 0 - collected_before_table = len(collected) - # 全局分页只需要每个分表最新的 offset+limit 条有效命中,无需把整表命中读进内存。 - while len(collected) - collected_before_table < candidate_limit: - rows = _query_messages( - conn, - ctx['table_name'], - start_ts=start_ts, - end_ts=end_ts, - keyword=keyword, - limit=batch_size, - offset=fetch_offset, - ) - if not rows: - break - fetch_offset += len(rows) - - for row in rows: - formatted = _build_search_entry(row, ctx, names, id_to_username) - if formatted: - collected.append(formatted) - if len(collected) - collected_before_table >= candidate_limit: - break - - if len(rows) < batch_size: - break - except Exception as e: - failures.append(f"{ctx['display_name']}: {e}") - - return collected, failures - - -def _page_search_entries(entries, limit, offset): - return _page_ranked_entries(entries, limit, offset) - - -def _search_single_chat(ctx, keyword, start_ts, end_ts, start_time, end_time, limit, offset): - names = get_contact_names() - candidate_limit = _candidate_page_size(limit, offset) - - entries, failures = _collect_chat_search_entries( - ctx, - names, - keyword, - start_ts=start_ts, - end_ts=end_ts, - candidate_limit=candidate_limit, - ) - - paged = _page_search_entries(entries, limit, offset) - - if not paged: - if failures: - return "查询失败: " + ";".join(failures) - return f"未在 {ctx['display_name']} 中找到包含 \"{keyword}\" 的消息" - - header = f"在 {ctx['display_name']} 中搜索 \"{keyword}\" 找到 {len(paged)} 条结果(offset={offset}, limit={limit})" - if start_time or end_time: - header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" - if failures: - header += "\n查询失败: " + ";".join(failures) - return header + ":\n\n" + "\n\n".join(item[1] for item in paged) - - -def _search_multiple_chats(chat_names, keyword, start_ts, end_ts, start_time, end_time, limit, offset): - try: - resolved_contexts, unresolved, missing_tables = _resolve_chat_contexts(chat_names) - except ValueError as e: - return f"错误: {e}" - - if not resolved_contexts: - details = [] - if unresolved: - details.append("未找到联系人: " + "、".join(unresolved)) - if missing_tables: - details.append("无消息表: " + "、".join(missing_tables)) - suffix = f"\n{chr(10).join(details)}" if details else "" - return f"错误: 没有可查询的聊天对象{suffix}" - - names = get_contact_names() - candidate_limit = _candidate_page_size(limit, offset) - collected = [] - failures = [] - for ctx in resolved_contexts: - chat_entries, chat_failures = _collect_chat_search_entries( - ctx, - names, - keyword, - start_ts=start_ts, - end_ts=end_ts, - candidate_limit=candidate_limit, - ) - collected.extend(chat_entries) - failures.extend(chat_failures) - - paged = _page_search_entries(collected, limit, offset) - - notes = [] - if unresolved: - notes.append("未找到联系人: " + "、".join(unresolved)) - if missing_tables: - notes.append("无消息表: " + "、".join(missing_tables)) - if failures: - notes.append("查询失败: " + ";".join(failures)) - - if not paged: - header = f"在 {len(resolved_contexts)} 个聊天对象中未找到包含 \"{keyword}\" 的消息" - if start_time or end_time: - header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" - if notes: - header += "\n" + "\n".join(notes) - return header - - header = ( - f"在 {len(resolved_contexts)} 个聊天对象中搜索 \"{keyword}\" 找到 {len(paged)} 条结果" - f"(offset={offset}, limit={limit})" - ) - if start_time or end_time: - header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" - if notes: - header += "\n" + "\n".join(notes) - return header + ":\n\n" + "\n\n".join(item[1] for item in paged) - - -def _search_all_messages(keyword, start_ts, end_ts, start_time, end_time, limit, offset): - names = get_contact_names() - collected = [] - failures = [] - candidate_limit = _candidate_page_size(limit, offset) - - for rel_key in MSG_DB_KEYS: - path = _cache.get(rel_key) - if not path: - continue - - try: - with closing(sqlite3.connect(path)) as conn: - contexts = _load_search_contexts_from_db(conn, path, names) - db_entries, db_failures = _collect_search_entries( - conn, - contexts, - names, - keyword, - start_ts=start_ts, - end_ts=end_ts, - candidate_limit=candidate_limit, - ) - collected.extend(db_entries) - failures.extend(db_failures) - except Exception as e: - failures.append(f"{rel_key}: {e}") - - paged = _page_search_entries(collected, limit, offset) - - if not paged: - header = f"未找到包含 \"{keyword}\" 的消息" - if start_time or end_time: - header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" - if failures: - header += "\n查询失败: " + ";".join(failures) - return header - - header = f"搜索 \"{keyword}\" 找到 {len(paged)} 条结果(offset={offset}, limit={limit})" - if start_time or end_time: - header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" - if failures: - header += "\n查询失败: " + ";".join(failures) - return header + ":\n\n" + "\n\n".join(item[1] for item in paged) + entry += f" {text}" + return create_time, entry + + +def _build_history_line(row, ctx, names, id_to_username): + local_id, local_type, create_time, real_sender_id, content, ct = row + time_str = datetime.fromtimestamp(create_time).strftime('%Y-%m-%d %H:%M') + content = _decompress_content(content, ct) + if content is None: + content = '(无法解压)' + + sender, text = _format_message_text( + local_id, local_type, content, ctx['is_group'], ctx['username'], ctx['display_name'], names + ) + + sender_label = _resolve_sender_label( + real_sender_id, sender, ctx['is_group'], ctx['username'], ctx['display_name'], names, id_to_username + ) + if sender_label: + return create_time, f'[{time_str}] {sender_label}: {text}' + return create_time, f'[{time_str}] {text}' + + +def _get_chat_message_tables(ctx): + if ctx.get('message_tables'): + return ctx['message_tables'] + if ctx.get('db_path') and ctx.get('table_name'): + return [{'db_path': ctx['db_path'], 'table_name': ctx['table_name']}] + return [] + + +def _iter_table_contexts(ctx): + for table in _get_chat_message_tables(ctx): + yield { + 'query': ctx['query'], + 'username': ctx['username'], + 'display_name': ctx['display_name'], + 'db_path': table['db_path'], + 'table_name': table['table_name'], + 'is_group': ctx['is_group'], + } + + +def _candidate_page_size(limit, offset): + return limit + offset + + +def _message_query_batch_size(candidate_limit): + return candidate_limit + + +def _history_query_batch_size(candidate_limit): + return min(candidate_limit, _HISTORY_QUERY_BATCH_SIZE) + + +def _page_ranked_entries(entries, limit, offset): + ordered = sorted(entries, key=lambda item: item[0], reverse=True) + paged = ordered[offset:offset + limit] + paged.sort(key=lambda item: item[0]) + return paged + + +def _collect_chat_history_lines(ctx, names, start_ts=None, end_ts=None, limit=20, offset=0): + collected = [] + failures = [] + candidate_limit = _candidate_page_size(limit, offset) + batch_size = _history_query_batch_size(candidate_limit) + + for table_ctx in _iter_table_contexts(ctx): + try: + with closing(sqlite3.connect(table_ctx['db_path'])) as conn: + id_to_username = _load_name2id_maps(conn) + fetch_offset = 0 + collected_before_table = len(collected) + # 当前页上的消息一定落在各分表最近的 offset+limit 条记录内。 + while len(collected) - collected_before_table < candidate_limit: + rows = _query_messages( + conn, + table_ctx['table_name'], + start_ts=start_ts, + end_ts=end_ts, + limit=batch_size, + offset=fetch_offset, + ) + if not rows: + break + fetch_offset += len(rows) + + for row in rows: + try: + collected.append(_build_history_line(row, table_ctx, names, id_to_username)) + except Exception as e: + failures.append( + f"{table_ctx['display_name']} local_id={row[0]} create_time={row[2]}: {e}" + ) + if len(collected) - collected_before_table >= candidate_limit: + break + + if len(rows) < batch_size: + break + except Exception as e: + failures.append(f"{table_ctx['db_path']}: {e}") + + paged = _page_ranked_entries(collected, limit, offset) + return [line for _, line in paged], failures + + +def _collect_chat_search_entries(ctx, names, keyword, start_ts=None, end_ts=None, candidate_limit=20): + collected = [] + failures = [] + contexts_by_db = {} + for table_ctx in _iter_table_contexts(ctx): + contexts_by_db.setdefault(table_ctx['db_path'], []).append(table_ctx) + + for db_path, db_contexts in contexts_by_db.items(): + try: + with closing(sqlite3.connect(db_path)) as conn: + db_entries, db_failures = _collect_search_entries( + conn, + db_contexts, + names, + keyword, + start_ts=start_ts, + end_ts=end_ts, + candidate_limit=candidate_limit, + ) + collected.extend(db_entries) + failures.extend(db_failures) + except Exception as e: + failures.extend(f"{table_ctx['display_name']}: {e}" for table_ctx in db_contexts) + + return collected, failures + + +def _load_search_contexts_from_db(conn, db_path, names): + tables = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'Msg_%'" + ).fetchall() + + table_to_username = {} + try: + for (user_name,) in conn.execute("SELECT user_name FROM Name2Id").fetchall(): + if not user_name: + continue + table_hash = hashlib.md5(user_name.encode()).hexdigest() + table_to_username[f"Msg_{table_hash}"] = user_name + except sqlite3.Error: + pass + + contexts = [] + for (table_name,) in tables: + username = table_to_username.get(table_name, '') + display_name = names.get(username, username) if username else table_name + contexts.append({ + 'query': display_name, + 'username': username, + 'display_name': display_name, + 'db_path': db_path, + 'table_name': table_name, + 'is_group': '@chatroom' in username, + }) + return contexts + + +def _collect_search_entries(conn, contexts, names, keyword, start_ts=None, end_ts=None, candidate_limit=20): + collected = [] + failures = [] + id_to_username = _load_name2id_maps(conn) + batch_size = _message_query_batch_size(candidate_limit) + + for ctx in contexts: + try: + fetch_offset = 0 + collected_before_table = len(collected) + # 全局分页只需要每个分表最新的 offset+limit 条有效命中,无需把整表命中读进内存。 + while len(collected) - collected_before_table < candidate_limit: + rows = _query_messages( + conn, + ctx['table_name'], + start_ts=start_ts, + end_ts=end_ts, + keyword=keyword, + limit=batch_size, + offset=fetch_offset, + ) + if not rows: + break + fetch_offset += len(rows) + + for row in rows: + formatted = _build_search_entry(row, ctx, names, id_to_username) + if formatted: + collected.append(formatted) + if len(collected) - collected_before_table >= candidate_limit: + break + + if len(rows) < batch_size: + break + except Exception as e: + failures.append(f"{ctx['display_name']}: {e}") + + return collected, failures + + +def _page_search_entries(entries, limit, offset): + return _page_ranked_entries(entries, limit, offset) + + +def _search_single_chat(ctx, keyword, start_ts, end_ts, start_time, end_time, limit, offset): + names = get_contact_names() + candidate_limit = _candidate_page_size(limit, offset) + + entries, failures = _collect_chat_search_entries( + ctx, + names, + keyword, + start_ts=start_ts, + end_ts=end_ts, + candidate_limit=candidate_limit, + ) + + paged = _page_search_entries(entries, limit, offset) + + if not paged: + if failures: + return "查询失败: " + ";".join(failures) + return f"未在 {ctx['display_name']} 中找到包含 \"{keyword}\" 的消息" + + header = f"在 {ctx['display_name']} 中搜索 \"{keyword}\" 找到 {len(paged)} 条结果(offset={offset}, limit={limit})" + if start_time or end_time: + header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" + if failures: + header += "\n查询失败: " + ";".join(failures) + return header + ":\n\n" + "\n\n".join(item[1] for item in paged) + + +def _search_multiple_chats(chat_names, keyword, start_ts, end_ts, start_time, end_time, limit, offset): + try: + resolved_contexts, unresolved, missing_tables = _resolve_chat_contexts(chat_names) + except ValueError as e: + return f"错误: {e}" + + if not resolved_contexts: + details = [] + if unresolved: + details.append("未找到联系人: " + "、".join(unresolved)) + if missing_tables: + details.append("无消息表: " + "、".join(missing_tables)) + suffix = f"\n{chr(10).join(details)}" if details else "" + return f"错误: 没有可查询的聊天对象{suffix}" + + names = get_contact_names() + candidate_limit = _candidate_page_size(limit, offset) + collected = [] + failures = [] + for ctx in resolved_contexts: + chat_entries, chat_failures = _collect_chat_search_entries( + ctx, + names, + keyword, + start_ts=start_ts, + end_ts=end_ts, + candidate_limit=candidate_limit, + ) + collected.extend(chat_entries) + failures.extend(chat_failures) + + paged = _page_search_entries(collected, limit, offset) + + notes = [] + if unresolved: + notes.append("未找到联系人: " + "、".join(unresolved)) + if missing_tables: + notes.append("无消息表: " + "、".join(missing_tables)) + if failures: + notes.append("查询失败: " + ";".join(failures)) + + if not paged: + header = f"在 {len(resolved_contexts)} 个聊天对象中未找到包含 \"{keyword}\" 的消息" + if start_time or end_time: + header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" + if notes: + header += "\n" + "\n".join(notes) + return header + + header = ( + f"在 {len(resolved_contexts)} 个聊天对象中搜索 \"{keyword}\" 找到 {len(paged)} 条结果" + f"(offset={offset}, limit={limit})" + ) + if start_time or end_time: + header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" + if notes: + header += "\n" + "\n".join(notes) + return header + ":\n\n" + "\n\n".join(item[1] for item in paged) + + +def _search_all_messages(keyword, start_ts, end_ts, start_time, end_time, limit, offset): + names = get_contact_names() + collected = [] + failures = [] + candidate_limit = _candidate_page_size(limit, offset) + + for rel_key in MSG_DB_KEYS: + path = _cache.get(rel_key) + if not path: + continue + + try: + with closing(sqlite3.connect(path)) as conn: + contexts = _load_search_contexts_from_db(conn, path, names) + db_entries, db_failures = _collect_search_entries( + conn, + contexts, + names, + keyword, + start_ts=start_ts, + end_ts=end_ts, + candidate_limit=candidate_limit, + ) + collected.extend(db_entries) + failures.extend(db_failures) + except Exception as e: + failures.append(f"{rel_key}: {e}") + + paged = _page_search_entries(collected, limit, offset) + + if not paged: + header = f"未找到包含 \"{keyword}\" 的消息" + if start_time or end_time: + header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" + if failures: + header += "\n查询失败: " + ";".join(failures) + return header + + header = f"搜索 \"{keyword}\" 找到 {len(paged)} 条结果(offset={offset}, limit={limit})" + if start_time or end_time: + header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" + if failures: + header += "\n查询失败: " + ";".join(failures) + return header + ":\n\n" + "\n\n".join(item[1] for item in paged) # ============ MCP Server ============ @@ -1179,7 +1290,7 @@ _last_check_state = {} # {username: last_timestamp} @mcp.tool() -def get_recent_sessions(limit: int = 20) -> str: +def get_recent_sessions(limit: int = 20) -> str: """获取微信最近会话列表,包含最新消息摘要、未读数、时间等。 用于了解最近有哪些人/群在聊天。 @@ -1191,15 +1302,15 @@ def get_recent_sessions(limit: int = 20) -> str: return "错误: 无法解密 session.db" names = get_contact_names() - with closing(sqlite3.connect(path)) as conn: - rows = conn.execute(""" - SELECT username, unread_count, summary, last_timestamp, - last_msg_type, last_msg_sender, last_sender_display_name - FROM SessionTable - WHERE last_timestamp > 0 - ORDER BY last_timestamp DESC - LIMIT ? - """, (limit,)).fetchall() + with closing(sqlite3.connect(path)) as conn: + rows = conn.execute(""" + SELECT username, unread_count, summary, last_timestamp, + last_msg_type, last_msg_sender, last_sender_display_name + FROM SessionTable + WHERE last_timestamp > 0 + ORDER BY last_timestamp DESC + LIMIT ? + """, (limit,)).fetchall() results = [] for r in rows: @@ -1237,21 +1348,21 @@ def get_recent_sessions(limit: int = 20) -> str: @mcp.tool() -def get_chat_history(chat_name: str, limit: int = 50, offset: int = 0, start_time: str = "", end_time: str = "") -> str: - """获取指定聊天的消息记录。 - - Args: - chat_name: 聊天对象的名字、备注名或wxid,自动模糊匹配 - limit: 返回的消息数量,默认50;支持较大的值,建议配合 offset 分页使用 - offset: 分页偏移量,默认0 - start_time: 起始时间,支持 YYYY-MM-DD / YYYY-MM-DD HH:MM / YYYY-MM-DD HH:MM:SS - end_time: 结束时间,支持 YYYY-MM-DD / YYYY-MM-DD HH:MM / YYYY-MM-DD HH:MM:SS - """ - try: - _validate_pagination(limit, offset, limit_max=None) - start_ts, end_ts = _parse_time_range(start_time, end_time) - except ValueError as e: - return f"错误: {e}" +def get_chat_history(chat_name: str, limit: int = 50, offset: int = 0, start_time: str = "", end_time: str = "") -> str: + """获取指定聊天的消息记录。 + + Args: + chat_name: 聊天对象的名字、备注名或wxid,自动模糊匹配 + limit: 返回的消息数量,默认50;支持较大的值,建议配合 offset 分页使用 + offset: 分页偏移量,默认0 + start_time: 起始时间,支持 YYYY-MM-DD / YYYY-MM-DD HH:MM / YYYY-MM-DD HH:MM:SS + end_time: 结束时间,支持 YYYY-MM-DD / YYYY-MM-DD HH:MM / YYYY-MM-DD HH:MM:SS + """ + try: + _validate_pagination(limit, offset, limit_max=None) + start_ts, end_ts = _parse_time_range(start_time, end_time) + except ValueError as e: + return f"错误: {e}" ctx = _resolve_chat_context(chat_name) if not ctx: @@ -1259,102 +1370,102 @@ def get_chat_history(chat_name: str, limit: int = 50, offset: int = 0, start_tim if not ctx['db_path']: return f"找不到 {ctx['display_name']} 的消息记录(可能在未解密的DB中或无消息)" - names = get_contact_names() - lines, failures = _collect_chat_history_lines( - ctx, - names, - start_ts=start_ts, - end_ts=end_ts, - limit=limit, - offset=offset, - ) - - if not lines: - if failures: - return "查询失败: " + ";".join(failures) - return f"{ctx['display_name']} 无消息记录" - - header = f"{ctx['display_name']} 的消息记录(返回 {len(lines)} 条,offset={offset}, limit={limit})" - if ctx['is_group']: - header += " [群聊]" - if start_time or end_time: - header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" - if failures: - header += "\n查询失败: " + ";".join(failures) - return header + ":\n\n" + "\n".join(lines) + names = get_contact_names() + lines, failures = _collect_chat_history_lines( + ctx, + names, + start_ts=start_ts, + end_ts=end_ts, + limit=limit, + offset=offset, + ) + + if not lines: + if failures: + return "查询失败: " + ";".join(failures) + return f"{ctx['display_name']} 无消息记录" + + header = f"{ctx['display_name']} 的消息记录(返回 {len(lines)} 条,offset={offset}, limit={limit})" + if ctx['is_group']: + header += " [群聊]" + if start_time or end_time: + header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}" + if failures: + header += "\n查询失败: " + ";".join(failures) + return header + ":\n\n" + "\n".join(lines) -@mcp.tool() -def search_messages( - keyword: str, - chat_name: str | list[str] | None = None, - start_time: str = "", - end_time: str = "", - limit: int = 20, - offset: int = 0, -) -> str: - """搜索消息内容,支持全库、单个聊天对象、多个聊天对象,以及时间范围和分页。 - - Args: - keyword: 搜索关键词 - chat_name: 聊天对象名称,可为空、单个字符串或字符串列表 - start_time: 起始时间,可为空 - end_time: 结束时间,可为空 - limit: 返回的结果数量,默认20,最大500 - offset: 分页偏移量,默认0 - """ - if not keyword or len(keyword) < 1: - return "请提供搜索关键词" - - chat_names = _normalize_chat_names(chat_name) - - try: - _validate_pagination(limit, offset) - start_ts, end_ts = _parse_time_range(start_time, end_time) - except ValueError as e: - return f"错误: {e}" - - if len(chat_names) == 1: - ctx = _resolve_chat_context(chat_names[0]) - if not ctx: - return f"找不到聊天对象: {chat_names[0]}\n提示: 可以用 get_contacts(query='{chat_names[0]}') 搜索联系人" - if not ctx['db_path']: - return f"找不到 {ctx['display_name']} 的消息记录(可能在未解密的DB中或无消息)" - return _search_single_chat( - ctx, - keyword, - start_ts, - end_ts, - start_time, - end_time, - limit, - offset, - ) - - if len(chat_names) > 1: - return _search_multiple_chats( - chat_names, - keyword, - start_ts, - end_ts, - start_time, - end_time, - limit, - offset, - ) - - return _search_all_messages( - keyword, - start_ts, - end_ts, - start_time, - end_time, - limit, - offset, - ) +@mcp.tool() +def search_messages( + keyword: str, + chat_name: str | list[str] | None = None, + start_time: str = "", + end_time: str = "", + limit: int = 20, + offset: int = 0, +) -> str: + """搜索消息内容,支持全库、单个聊天对象、多个聊天对象,以及时间范围和分页。 -@mcp.tool() -def get_contacts(query: str = "", limit: int = 50) -> str: + Args: + keyword: 搜索关键词 + chat_name: 聊天对象名称,可为空、单个字符串或字符串列表 + start_time: 起始时间,可为空 + end_time: 结束时间,可为空 + limit: 返回的结果数量,默认20,最大500 + offset: 分页偏移量,默认0 + """ + if not keyword or len(keyword) < 1: + return "请提供搜索关键词" + + chat_names = _normalize_chat_names(chat_name) + + try: + _validate_pagination(limit, offset) + start_ts, end_ts = _parse_time_range(start_time, end_time) + except ValueError as e: + return f"错误: {e}" + + if len(chat_names) == 1: + ctx = _resolve_chat_context(chat_names[0]) + if not ctx: + return f"找不到聊天对象: {chat_names[0]}\n提示: 可以用 get_contacts(query='{chat_names[0]}') 搜索联系人" + if not ctx['db_path']: + return f"找不到 {ctx['display_name']} 的消息记录(可能在未解密的DB中或无消息)" + return _search_single_chat( + ctx, + keyword, + start_ts, + end_ts, + start_time, + end_time, + limit, + offset, + ) + + if len(chat_names) > 1: + return _search_multiple_chats( + chat_names, + keyword, + start_ts, + end_ts, + start_time, + end_time, + limit, + offset, + ) + + return _search_all_messages( + keyword, + start_ts, + end_ts, + start_time, + end_time, + limit, + offset, + ) + +@mcp.tool() +def get_contacts(query: str = "", limit: int = 50) -> str: """搜索或列出微信联系人。 Args: @@ -1397,7 +1508,65 @@ def get_contacts(query: str = "", limit: int = 50) -> str: @mcp.tool() -def get_new_messages() -> str: +def get_contact_tags() -> str: + """列出所有微信联系人标签及成员数量。""" + tags = _load_contact_tags() + if not tags: + return "未找到标签数据(contact_label 表可能不存在)" + + sorted_tags = sorted(tags.values(), key=lambda t: t['sort_order']) + total_assoc = sum(len(t['members']) for t in sorted_tags) + + lines = [f"共 {len(sorted_tags)} 个标签,{total_assoc} 个关联:\n"] + for t in sorted_tags: + lines.append(f" [{t['name']}] {len(t['members'])}人") + return "\n".join(lines) + + +@mcp.tool() +def get_tag_members(tag_name: str) -> str: + """获取指定标签下的所有联系人。支持模糊匹配标签名。 + + Args: + tag_name: 标签名称,支持精确和模糊匹配 + """ + tags = _load_contact_tags() + if not tags: + return "未找到标签数据(contact_label 表可能不存在)" + + q = tag_name.strip().lower() + + # 精确匹配 + exact = [t for t in tags.values() if t['name'].lower() == q] + if exact: + matched = exact[0] + else: + # 模糊匹配 (contains) + fuzzy = [t for t in tags.values() if q in t['name'].lower()] + if not fuzzy: + all_names = [t['name'] for t in sorted(tags.values(), key=lambda t: t['sort_order'])] + return f"未找到匹配 \"{tag_name}\" 的标签。\n\n现有标签: {', '.join(all_names)}" + if len(fuzzy) == 1: + matched = fuzzy[0] + else: + names = [t['name'] for t in fuzzy] + return f"找到 {len(fuzzy)} 个匹配的标签,请指定:\n" + "\n".join(f" [{n}]" for n in names) + + members = matched['members'] + if not members: + return f"标签 [{matched['name']}] 没有成员" + + lines = [f"标签 [{matched['name']}] 共 {len(members)} 人:\n"] + for m in members: + line = m['username'] + if m['display_name'] != m['username']: + line += f" {m['display_name']}" + lines.append(f" {line}") + return "\n".join(lines) + + +@mcp.tool() +def get_new_messages() -> str: """获取自上次调用以来的新消息。首次调用返回最近的会话状态。""" global _last_check_state @@ -1406,14 +1575,14 @@ def get_new_messages() -> str: return "错误: 无法解密 session.db" names = get_contact_names() - with closing(sqlite3.connect(path)) as conn: - rows = conn.execute(""" - SELECT username, unread_count, summary, last_timestamp, - last_msg_type, last_msg_sender, last_sender_display_name - FROM SessionTable - WHERE last_timestamp > 0 - ORDER BY last_timestamp DESC - """).fetchall() + with closing(sqlite3.connect(path)) as conn: + rows = conn.execute(""" + SELECT username, unread_count, summary, last_timestamp, + last_msg_type, last_msg_sender, last_sender_display_name + FROM SessionTable + WHERE last_timestamp > 0 + ORDER BY last_timestamp DESC + """).fetchall() curr_state = {} for r in rows: diff --git a/monitor_web.py b/monitor_web.py index 41859e6..e4d6397 100644 --- a/monitor_web.py +++ b/monitor_web.py @@ -447,6 +447,96 @@ def load_contact_names(): return names +def _extract_pb_field_30(data): + """从 extra_buffer (protobuf) 中提取 Field #30 的字符串值(联系人标签ID)""" + if not data: + return None + pos = 0 + n = len(data) + while pos < n: + tag = 0 + shift = 0 + while pos < n: + b = data[pos]; pos += 1 + tag |= (b & 0x7f) << shift + if not (b & 0x80): + break + shift += 7 + field_num = tag >> 3 + wire_type = tag & 0x07 + if wire_type == 0: + while pos < n and data[pos] & 0x80: + pos += 1 + pos += 1 + elif wire_type == 2: + length = 0; shift = 0 + while pos < n: + b = data[pos]; pos += 1 + length |= (b & 0x7f) << shift + if not (b & 0x80): + break + shift += 7 + if field_num == 30: + try: + return data[pos:pos + length].decode('utf-8') + except Exception: + return None + pos += length + elif wire_type == 1: + pos += 8 + elif wire_type == 5: + pos += 4 + else: + break + return None + + +def load_contact_tags(): + """加载联系人标签及其成员""" + try: + conn = sqlite3.connect(CONTACT_CACHE) + try: + label_rows = conn.execute( + "SELECT label_id_, label_name_, sort_order_ FROM contact_label ORDER BY sort_order_" + ).fetchall() + except Exception: + conn.close() + return [] + if not label_rows: + conn.close() + return [] + + labels = {} + for lid, lname, sort_order in label_rows: + labels[lid] = {'id': lid, 'name': lname, 'sort_order': sort_order, 'members': []} + + names = load_contact_names() + rows = conn.execute( + "SELECT username, extra_buffer FROM contact WHERE extra_buffer IS NOT NULL" + ).fetchall() + conn.close() + + for username, buf in rows: + label_str = _extract_pb_field_30(buf) + if not label_str: + continue + display = names.get(username, username) + for lid_s in label_str.split(','): + try: + lid = int(lid_s.strip()) + except (ValueError, AttributeError): + continue + if lid in labels: + labels[lid]['members'].append({'username': username, 'display_name': display}) + + result = sorted(labels.values(), key=lambda t: t['sort_order']) + for t in result: + t['member_count'] = len(t['members']) + return result + except Exception: + return [] + + def format_msg_type(t): return { 1: '文本', 3: '图片', 34: '语音', 42: '名片', @@ -1849,6 +1939,20 @@ class Handler(BaseHTTPRequestHandler): self.end_headers() self.wfile.write(data) + elif self.path.startswith('/api/tags'): + parsed = urllib.parse.urlparse(self.path) + params = urllib.parse.parse_qs(parsed.query) + name_filter = params.get('name', [''])[0].strip().lower() + + tags = load_contact_tags() + if name_filter: + tags = [t for t in tags if name_filter in t['name'].lower()] + + self.send_response(200) + self.send_header('Content-Type', 'application/json; charset=utf-8') + self.end_headers() + self.wfile.write(json.dumps(tags, ensure_ascii=False).encode('utf-8')) + elif self.path == '/stream': self.send_response(200) self.send_header('Content-Type', 'text/event-stream')