feat: 新增联系人标签查询功能

解析 contact.db 的 contact_label 表和 extra_buffer protobuf Field #30,
支持查询标签列表及指定标签下的成员。

- mcp_server.py: 新增 get_contact_tags / get_tag_members MCP 工具
- monitor_web.py: 新增 /api/tags JSON 端点,支持 ?name= 过滤

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feat/daemon-cli
ylytdeng 2026-04-06 09:54:21 +08:00
parent b80e7d1c14
commit 7eb29b03e8
2 changed files with 888 additions and 615 deletions

View File

@ -220,6 +220,7 @@ atexit.register(_cache.cleanup)
_contact_names = None # {username: display_name} _contact_names = None # {username: display_name}
_contact_full = None # [{username, nick_name, remark}] _contact_full = None # [{username, nick_name, remark}]
_contact_tags = None # {label_id: {name, sort_order, members: [{username, display_name}]}}
_self_username = None _self_username = None
_XML_UNSAFE_RE = re.compile(r'<!DOCTYPE|<!ENTITY', re.IGNORECASE) _XML_UNSAFE_RE = re.compile(r'<!DOCTYPE|<!ENTITY', re.IGNORECASE)
_XML_PARSE_MAX_LEN = 20000 _XML_PARSE_MAX_LEN = 20000
@ -275,6 +276,116 @@ def get_contact_full():
return _contact_full or [] return _contact_full or []
def _get_contact_db_path():
"""获取 contact.db 路径(优先已解密,其次实时解密)"""
pre = os.path.join(DECRYPTED_DIR, "contact", "contact.db")
if os.path.exists(pre):
return pre
return _cache.get(os.path.join("contact", "contact.db"))
def _extract_pb_field_30(data):
"""从 extra_buffer (protobuf) 中提取 Field #30 的字符串值联系人标签ID"""
if not data:
return None
pos = 0
n = len(data)
while pos < n:
# 读 varint tag
tag = 0
shift = 0
while pos < n:
b = data[pos]; pos += 1
tag |= (b & 0x7f) << shift
if not (b & 0x80):
break
shift += 7
field_num = tag >> 3
wire_type = tag & 0x07
if wire_type == 0: # varint
while pos < n and data[pos] & 0x80:
pos += 1
pos += 1
elif wire_type == 2: # length-delimited
length = 0; shift = 0
while pos < n:
b = data[pos]; pos += 1
length |= (b & 0x7f) << shift
if not (b & 0x80):
break
shift += 7
if field_num == 30:
try:
return data[pos:pos + length].decode('utf-8')
except Exception:
return None
pos += length
elif wire_type == 1: # 64-bit
pos += 8
elif wire_type == 5: # 32-bit
pos += 4
else:
break
return None
def _load_contact_tags():
"""加载并缓存联系人标签数据"""
global _contact_tags
if _contact_tags is not None:
return _contact_tags
db_path = _get_contact_db_path()
if not db_path:
return {}
try:
conn = sqlite3.connect(db_path)
except Exception:
return {}
try:
# 1. 加载标签定义
try:
label_rows = conn.execute(
"SELECT label_id_, label_name_, sort_order_ FROM contact_label ORDER BY sort_order_"
).fetchall()
except sqlite3.OperationalError:
return {}
if not label_rows:
return {}
labels = {}
for lid, lname, sort_order in label_rows:
labels[lid] = {'name': lname, 'sort_order': sort_order, 'members': []}
# 2. 扫描联系人的标签关联
names = get_contact_names()
rows = conn.execute(
"SELECT username, extra_buffer FROM contact WHERE extra_buffer IS NOT NULL"
).fetchall()
for username, buf in rows:
label_str = _extract_pb_field_30(buf)
if not label_str:
continue
display = names.get(username, username)
for lid_s in label_str.split(','):
try:
lid = int(lid_s.strip())
except (ValueError, AttributeError):
continue
if lid in labels:
labels[lid]['members'].append({'username': username, 'display_name': display})
_contact_tags = labels
return _contact_tags
except Exception:
return {}
finally:
conn.close()
# ============ 辅助函数 ============ # ============ 辅助函数 ============
def format_msg_type(t): def format_msg_type(t):
@ -1396,6 +1507,64 @@ def get_contacts(query: str = "", limit: int = 50) -> str:
return header + ":\n\n" + "\n".join(lines) return header + ":\n\n" + "\n".join(lines)
@mcp.tool()
def get_contact_tags() -> str:
"""列出所有微信联系人标签及成员数量。"""
tags = _load_contact_tags()
if not tags:
return "未找到标签数据contact_label 表可能不存在)"
sorted_tags = sorted(tags.values(), key=lambda t: t['sort_order'])
total_assoc = sum(len(t['members']) for t in sorted_tags)
lines = [f"{len(sorted_tags)} 个标签,{total_assoc} 个关联:\n"]
for t in sorted_tags:
lines.append(f" [{t['name']}] {len(t['members'])}")
return "\n".join(lines)
@mcp.tool()
def get_tag_members(tag_name: str) -> str:
"""获取指定标签下的所有联系人。支持模糊匹配标签名。
Args:
tag_name: 标签名称支持精确和模糊匹配
"""
tags = _load_contact_tags()
if not tags:
return "未找到标签数据contact_label 表可能不存在)"
q = tag_name.strip().lower()
# 精确匹配
exact = [t for t in tags.values() if t['name'].lower() == q]
if exact:
matched = exact[0]
else:
# 模糊匹配 (contains)
fuzzy = [t for t in tags.values() if q in t['name'].lower()]
if not fuzzy:
all_names = [t['name'] for t in sorted(tags.values(), key=lambda t: t['sort_order'])]
return f"未找到匹配 \"{tag_name}\" 的标签。\n\n现有标签: {', '.join(all_names)}"
if len(fuzzy) == 1:
matched = fuzzy[0]
else:
names = [t['name'] for t in fuzzy]
return f"找到 {len(fuzzy)} 个匹配的标签,请指定:\n" + "\n".join(f" [{n}]" for n in names)
members = matched['members']
if not members:
return f"标签 [{matched['name']}] 没有成员"
lines = [f"标签 [{matched['name']}] 共 {len(members)} 人:\n"]
for m in members:
line = m['username']
if m['display_name'] != m['username']:
line += f" {m['display_name']}"
lines.append(f" {line}")
return "\n".join(lines)
@mcp.tool() @mcp.tool()
def get_new_messages() -> str: def get_new_messages() -> str:
"""获取自上次调用以来的新消息。首次调用返回最近的会话状态。""" """获取自上次调用以来的新消息。首次调用返回最近的会话状态。"""

View File

@ -447,6 +447,96 @@ def load_contact_names():
return names return names
def _extract_pb_field_30(data):
"""从 extra_buffer (protobuf) 中提取 Field #30 的字符串值联系人标签ID"""
if not data:
return None
pos = 0
n = len(data)
while pos < n:
tag = 0
shift = 0
while pos < n:
b = data[pos]; pos += 1
tag |= (b & 0x7f) << shift
if not (b & 0x80):
break
shift += 7
field_num = tag >> 3
wire_type = tag & 0x07
if wire_type == 0:
while pos < n and data[pos] & 0x80:
pos += 1
pos += 1
elif wire_type == 2:
length = 0; shift = 0
while pos < n:
b = data[pos]; pos += 1
length |= (b & 0x7f) << shift
if not (b & 0x80):
break
shift += 7
if field_num == 30:
try:
return data[pos:pos + length].decode('utf-8')
except Exception:
return None
pos += length
elif wire_type == 1:
pos += 8
elif wire_type == 5:
pos += 4
else:
break
return None
def load_contact_tags():
"""加载联系人标签及其成员"""
try:
conn = sqlite3.connect(CONTACT_CACHE)
try:
label_rows = conn.execute(
"SELECT label_id_, label_name_, sort_order_ FROM contact_label ORDER BY sort_order_"
).fetchall()
except Exception:
conn.close()
return []
if not label_rows:
conn.close()
return []
labels = {}
for lid, lname, sort_order in label_rows:
labels[lid] = {'id': lid, 'name': lname, 'sort_order': sort_order, 'members': []}
names = load_contact_names()
rows = conn.execute(
"SELECT username, extra_buffer FROM contact WHERE extra_buffer IS NOT NULL"
).fetchall()
conn.close()
for username, buf in rows:
label_str = _extract_pb_field_30(buf)
if not label_str:
continue
display = names.get(username, username)
for lid_s in label_str.split(','):
try:
lid = int(lid_s.strip())
except (ValueError, AttributeError):
continue
if lid in labels:
labels[lid]['members'].append({'username': username, 'display_name': display})
result = sorted(labels.values(), key=lambda t: t['sort_order'])
for t in result:
t['member_count'] = len(t['members'])
return result
except Exception:
return []
def format_msg_type(t): def format_msg_type(t):
return { return {
1: '文本', 3: '图片', 34: '语音', 42: '名片', 1: '文本', 3: '图片', 34: '语音', 42: '名片',
@ -1849,6 +1939,20 @@ class Handler(BaseHTTPRequestHandler):
self.end_headers() self.end_headers()
self.wfile.write(data) self.wfile.write(data)
elif self.path.startswith('/api/tags'):
parsed = urllib.parse.urlparse(self.path)
params = urllib.parse.parse_qs(parsed.query)
name_filter = params.get('name', [''])[0].strip().lower()
tags = load_contact_tags()
if name_filter:
tags = [t for t in tags if name_filter in t['name'].lower()]
self.send_response(200)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.end_headers()
self.wfile.write(json.dumps(tags, ensure_ascii=False).encode('utf-8'))
elif self.path == '/stream': elif self.path == '/stream':
self.send_response(200) self.send_response(200)
self.send_header('Content-Type', 'text/event-stream') self.send_header('Content-Type', 'text/event-stream')