wx-cli/src/daemon/query.rs

4218 lines
152 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

use anyhow::{Context, Result};
use chrono::{Local, TimeZone, Timelike};
use regex::Regex;
use roxmltree::{Document, Node};
use rusqlite::Connection;
use serde_json::{json, Value};
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, OnceLock};
use super::cache::DbCache;
/// 静态编译的 Msg 表名正则,避免在热路径中重复编译
fn msg_table_re() -> &'static Regex {
static RE: OnceLock<Regex> = OnceLock::new();
RE.get_or_init(|| Regex::new(r"^Msg_[0-9a-f]{32}$").unwrap())
}
/// 判定会话类型。返回值固定为 `group` / `official_account` / `folded` / `private` 之一。
///
/// 判据次序:
/// 1. `@chatroom` / 折叠入口特殊 username
/// 2. `contact.verify_flag` 非 0 —— 覆盖所有被微信官方打了认证标的账号,
/// 包括 username 为 `wxid_*` 但实为公众号的情况(如"人物"
/// 以及品牌服务号 `cmb4008205555`、系统号 `qqsafe` / `mphelper` 等
/// 3. username 前缀兜底(`gh_*` / `biz_*` / `@*` 等)—— 在 contact 表未加载或没记录时
/// 仍能给出正确结果
pub fn chat_type_of(username: &str, names: &Names) -> &'static str {
if username.contains("@chatroom") {
return "group";
}
if username == "brandsessionholder" || username == "@placeholder_foldgroup" {
return "folded";
}
if names.is_verified(username) {
return "official_account";
}
if username.starts_with("gh_") || username.starts_with("biz_") {
return "official_account";
}
// `@` 开头的剩余 username如 `@opencustomerservicemsg`)是微信内部系统账号,
// 通常不落在 contact 表里verify_flag 兜不住,按前缀兜底。
if username.starts_with('@') {
return "official_account";
}
"private"
}
/// 联系人名称缓存
#[derive(Clone)]
pub struct Names {
/// username -> display_name
pub map: HashMap<String, String>,
/// md5(username) -> username用于从 Msg_<md5> 表名反推联系人)
pub md5_to_uname: HashMap<String, String>,
/// 消息 DB 的相对路径列表message/message_N.db
pub msg_db_keys: Vec<String>,
/// username -> contact.verify_flag0=真人,非 0 通常为公众号/服务号/认证账号)
pub verify_flags: HashMap<String, i64>,
}
impl Names {
pub fn display(&self, username: &str) -> String {
self.map.get(username).cloned().unwrap_or_else(|| username.to_string())
}
/// 是否被微信官方标了认证/服务号 flag。未在 contact 表中的 username 返回 false。
pub fn is_verified(&self, username: &str) -> bool {
self.verify_flags.get(username).copied().unwrap_or(0) != 0
}
}
/// 加载联系人缓存(从 contact/contact.db
pub async fn load_names(db: &DbCache) -> Result<Names> {
let path = db.get("contact/contact.db").await?;
let mut map = HashMap::new();
let mut verify_flags: HashMap<String, i64> = HashMap::new();
if let Some(p) = path {
let p2 = p.clone();
let rows: Vec<(String, String, String, i64)> = tokio::task::spawn_blocking(move || {
let conn = Connection::open(&p2).context("打开 contact.db 失败")?;
let mut stmt = conn.prepare(
"SELECT username, nick_name, remark, verify_flag FROM contact"
)?;
let rows = stmt.query_map([], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1).unwrap_or_default(),
row.get::<_, String>(2).unwrap_or_default(),
row.get::<_, i64>(3).unwrap_or(0),
))
})?
.collect::<rusqlite::Result<Vec<_>>>()?;
Ok::<_, anyhow::Error>(rows)
}).await??;
for (uname, nick, remark, vf) in rows {
let display = if !remark.is_empty() { remark }
else if !nick.is_empty() { nick }
else { uname.clone() };
verify_flags.insert(uname.clone(), vf);
map.insert(uname, display);
}
}
let md5_to_uname: HashMap<String, String> = map.keys()
.map(|u| (format!("{:x}", md5::compute(u.as_bytes())), u.clone()))
.collect();
Ok(Names { map, md5_to_uname, msg_db_keys: Vec::new(), verify_flags })
}
/// 查询最近会话列表
pub async fn q_sessions(db: &DbCache, names: &Names, limit: usize) -> Result<Value> {
let path = db.get("session/session.db").await?
.context("无法解密 session.db")?;
let path2 = path.clone();
let limit_val = limit;
let rows: Vec<(String, i64, Vec<u8>, i64, i64, String, String)> = tokio::task::spawn_blocking(move || {
let conn = Connection::open(&path2)?;
let mut stmt = conn.prepare(
"SELECT username, unread_count, summary, last_timestamp,
last_msg_type, last_msg_sender, last_sender_display_name
FROM SessionTable
WHERE last_timestamp > 0
ORDER BY last_timestamp DESC LIMIT ?"
)?;
let rows = stmt.query_map([limit_val as i64], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, i64>(1).unwrap_or(0),
get_content_bytes(row, 2),
row.get::<_, i64>(3).unwrap_or(0),
row.get::<_, i64>(4).unwrap_or(0),
row.get::<_, String>(5).unwrap_or_default(),
row.get::<_, String>(6).unwrap_or_default(),
))
})?
.collect::<rusqlite::Result<Vec<_>>>()?;
Ok::<_, anyhow::Error>(rows)
}).await??;
let mut results = Vec::new();
let mut group_nickname_cache: HashMap<String, HashMap<String, String>> = HashMap::new();
for (username, unread, summary_bytes, ts, msg_type, sender, sender_name) in rows {
let display = names.display(&username);
let chat_type = chat_type_of(&username, names);
let is_group = chat_type == "group";
// 尝试 zstd 解压 summary
let summary = decompress_or_str(&summary_bytes);
let summary = strip_group_prefix(&summary);
let sender_display = if is_group && !sender.is_empty() {
if !group_nickname_cache.contains_key(&username) {
let nicknames = load_group_nicknames(db, &username).await.unwrap_or_default();
group_nickname_cache.insert(username.clone(), nicknames);
}
let empty = HashMap::new();
let group_nicknames = group_nickname_cache.get(&username).unwrap_or(&empty);
sender_display(&sender, &sender_name, &names.map, group_nicknames)
} else {
String::new()
};
results.push(json!({
"chat": display,
"username": username,
"is_group": is_group,
"chat_type": chat_type,
"unread": unread,
"last_msg_type": fmt_type(msg_type),
"last_sender": sender_display,
"summary": summary,
"timestamp": ts,
"time": fmt_time(ts, "%m-%d %H:%M"),
}));
}
Ok(json!({ "sessions": results }))
}
/// 查询聊天记录
pub async fn q_history(
db: &DbCache,
names: &Names,
chat: &str,
limit: usize,
offset: usize,
since: Option<i64>,
until: Option<i64>,
msg_type: Option<i64>,
) -> Result<Value> {
let username = resolve_username(chat, names)
.with_context(|| format!("找不到联系人: {}", chat))?;
let display = names.display(&username);
let chat_type = chat_type_of(&username, names);
let is_group = chat_type == "group";
let tables = find_msg_tables(db, names, &username).await?;
if tables.is_empty() {
anyhow::bail!("找不到 {} 的消息记录", display);
}
let mut all_msgs: Vec<Value> = Vec::new();
let group_nicknames = if is_group {
load_group_nicknames(db, &username).await.unwrap_or_default()
} else {
HashMap::new()
};
for (db_path, table_name) in &tables {
let path = db_path.clone();
let tname = table_name.clone();
let uname = username.clone();
let is_group2 = is_group;
let names_map = names.map.clone();
let group_nicknames2 = group_nicknames.clone();
let since2 = since;
let until2 = until;
let limit2 = limit;
let offset2 = offset;
let msgs: Vec<Value> = tokio::task::spawn_blocking(move || {
// per-DB 软上限offset + limit 已足够全局分页,避免大群全量加载
let per_db_cap = offset2 + limit2;
query_messages(&path, &tname, &uname, is_group2, &names_map, &group_nicknames2, since2, until2, msg_type, per_db_cap, 0)
}).await??;
all_msgs.extend(msgs);
}
all_msgs.sort_by_key(|m| std::cmp::Reverse(m["timestamp"].as_i64().unwrap_or(0)));
let paged: Vec<Value> = all_msgs.into_iter().skip(offset).take(limit).collect();
let mut paged = paged;
paged.sort_by_key(|m| m["timestamp"].as_i64().unwrap_or(0));
Ok(json!({
"chat": display,
"username": username,
"is_group": is_group,
"chat_type": chat_type,
"count": paged.len(),
"messages": paged,
}))
}
/// 搜索消息
pub async fn q_search(
db: &DbCache,
names: &Names,
keyword: &str,
chats: Option<Vec<String>>,
limit: usize,
since: Option<i64>,
until: Option<i64>,
msg_type: Option<i64>,
) -> Result<Value> {
let mut targets: Vec<(String, String, String, String)> = Vec::new(); // (path, table, display, uname)
if let Some(chat_names) = chats {
for chat_name in &chat_names {
if let Some(uname) = resolve_username(chat_name, names) {
let tables = find_msg_tables(db, names, &uname).await?;
for (p, t) in tables {
targets.push((p.to_string_lossy().into_owned(), t, names.display(&uname), uname.clone()));
}
}
}
} else {
// 全局搜索:遍历所有消息 DB
for rel_key in &names.msg_db_keys {
let path = match db.get(rel_key).await? {
Some(p) => p,
None => continue,
};
let path2 = path.clone();
let md5_lookup = names.md5_to_uname.clone();
let names_map = names.map.clone();
let table_targets: Vec<(String, String, String, String)> = match tokio::task::spawn_blocking(move || {
let conn = Connection::open(&path2)?;
let mut stmt = conn.prepare(
"SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'Msg_%'"
)?;
let table_names: Vec<String> = stmt.query_map([], |row| row.get(0))?
.filter_map(|r| r.ok())
.collect();
let re = msg_table_re();
let mut result = Vec::new();
for tname in table_names {
if !re.is_match(&tname) {
continue;
}
let hash = &tname[4..];
let uname = md5_lookup.get(hash).cloned().unwrap_or_default();
let display = if uname.is_empty() {
String::new()
} else {
names_map.get(&uname).cloned().unwrap_or_else(|| uname.clone())
};
result.push((
path2.to_string_lossy().into_owned(),
tname,
display,
uname,
));
}
Ok::<_, anyhow::Error>(result)
}).await {
Ok(Ok(v)) => v,
Ok(Err(e)) => { eprintln!("[search] skip DB {}: {}", rel_key, e); continue; }
Err(e) => { eprintln!("[search] task error {}: {}", rel_key, e); continue; }
};
targets.extend(table_targets);
}
}
// 按 db_path 分组
let mut by_path: HashMap<String, Vec<(String, String, String)>> = HashMap::new();
for (p, t, d, u) in targets {
by_path.entry(p).or_default().push((t, d, u));
}
let mut group_usernames = HashSet::new();
for table_list in by_path.values() {
for (_, _, uname) in table_list {
if uname.contains("@chatroom") {
group_usernames.insert(uname.clone());
}
}
}
let group_nicknames_by_chat = load_group_nickname_maps(db, group_usernames)
.await
.unwrap_or_default();
let group_nicknames_by_chat = Arc::new(group_nicknames_by_chat);
// 多个 message_*.db 之间没有数据依赖,并发解密 + 查询。每个 DB 内部仍按
// table 串行(共享同一 sqlite Connection 不能跨线程移动)。原版本是 N 个 DB
// 串行 await活跃账号上 N 个分片要轮 N 次磁盘 IO现在 JoinSet 把它们一次
// 全部 dispatch 到 blocking pool整体 latency 退化为单 DB 慢路径。
let kw = keyword.to_string();
let mut join_set: tokio::task::JoinSet<Result<Vec<Value>>> = tokio::task::JoinSet::new();
for (db_path, table_list) in by_path {
let kw2 = kw.clone();
let since2 = since;
let until2 = until;
let limit2 = limit * 3;
let names_map2 = names.map.clone();
let group_nicknames_by_chat2 = Arc::clone(&group_nicknames_by_chat);
let db_path_for_log = db_path.clone();
join_set.spawn_blocking(move || {
let conn = Connection::open(&db_path)?;
let mut all = Vec::new();
let empty_group_nicknames = HashMap::new();
for (tname, display, uname) in &table_list {
let is_group = uname.contains("@chatroom");
let group_nicknames = group_nicknames_by_chat2
.get(uname)
.unwrap_or(&empty_group_nicknames);
match search_in_table(&conn, tname, &uname, is_group,
&names_map2, group_nicknames, &kw2, since2, until2, msg_type, limit2)
{
Ok(rows) => {
for mut row in rows {
if row.get("chat").map(|v| v.as_str().unwrap_or("")).unwrap_or("").is_empty() {
if let Some(obj) = row.as_object_mut() {
obj.insert("chat".into(), serde_json::Value::String(
if display.is_empty() { tname.clone() } else { display.clone() }
));
}
}
all.push(row);
}
}
Err(e) => eprintln!("[search] skip table {} (db={}): {}", tname, db_path_for_log, e),
}
}
Ok(all)
});
}
let mut results: Vec<Value> = Vec::new();
while let Some(joined) = join_set.join_next().await {
match joined {
Ok(Ok(rows)) => results.extend(rows),
Ok(Err(e)) => eprintln!("[search] skip DB: {}", e),
Err(e) => eprintln!("[search] task error: {}", e),
}
}
results.sort_by_key(|r| std::cmp::Reverse(r["timestamp"].as_i64().unwrap_or(0)));
let paged: Vec<Value> = results.into_iter().take(limit).collect();
Ok(json!({ "keyword": keyword, "count": paged.len(), "results": paged }))
}
/// 查询联系人
///
/// 只返回真实联系人(`chat_type_of == "private"`)。`names.map` 是从 `contact` 表
/// 全量加载的,里面同时包含群(`@chatroom`)、公众号(`gh_*` / `biz_*` / verify_flag != 0
/// 折叠入口(`brandsessionholder` / `@placeholder_foldgroup`)以及微信内部 `@xxx` 系统账号。
/// 这些都不应该出现在 `wx contacts` 输出里,统一走 `chat_type_of` 这条同样的真相判定。
pub async fn q_contacts(names: &Names, query: Option<&str>, limit: usize) -> Result<Value> {
let mut contacts: Vec<Value> = names.map.iter()
.filter(|(u, _)| chat_type_of(u, names) == "private")
.map(|(u, d)| json!({ "username": u, "display": d }))
.collect();
if let Some(q) = query {
let low = q.to_lowercase();
contacts.retain(|c| {
c["display"].as_str().map(|s| s.to_lowercase().contains(&low)).unwrap_or(false)
|| c["username"].as_str().map(|s| s.to_lowercase().contains(&low)).unwrap_or(false)
});
}
contacts.sort_by(|a, b| {
a["display"].as_str().unwrap_or("").cmp(b["display"].as_str().unwrap_or(""))
});
let total = contacts.len();
contacts.truncate(limit);
Ok(json!({ "contacts": contacts, "total": total }))
}
// ─── 内部辅助函数 ────────────────────────────────────────────────────────────
fn resolve_username(chat_name: &str, names: &Names) -> Option<String> {
if names.map.contains_key(chat_name)
|| chat_name.contains("@chatroom")
|| chat_name.starts_with("wxid_")
{
return Some(chat_name.to_string());
}
let low = chat_name.to_lowercase();
// 精确匹配显示名:排序后取第一个,保证确定性
let mut exact: Vec<&String> = names.map.iter()
.filter(|(_, display)| display.to_lowercase() == low)
.map(|(uname, _)| uname)
.collect();
exact.sort();
if let Some(u) = exact.into_iter().next() {
return Some(u.clone());
}
// 模糊匹配:取 display name 最短的(最精确),相同长度取字典序最小
let mut candidates: Vec<(&String, &String)> = names.map.iter()
.filter(|(_, display)| display.to_lowercase().contains(&low))
.collect();
candidates.sort_by_key(|(uname, display)| (display.len(), uname.as_str()));
candidates.into_iter().next().map(|(uname, _)| uname.clone())
}
async fn find_msg_tables(
db: &DbCache,
names: &Names,
username: &str,
) -> Result<Vec<(std::path::PathBuf, String)>> {
let table_name = format!("Msg_{:x}", md5::compute(username.as_bytes()));
if !msg_table_re().is_match(&table_name) {
return Ok(Vec::new());
}
let mut results: Vec<(i64, std::path::PathBuf, String)> = Vec::new();
for rel_key in &names.msg_db_keys {
let path = match db.get(rel_key).await? {
Some(p) => p,
None => continue,
};
let tname = table_name.clone();
let path2 = path.clone();
let max_ts: Option<i64> = tokio::task::spawn_blocking(move || {
let conn = Connection::open(&path2)?;
let table_exists: Option<i64> = conn.query_row(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name=?",
[&tname],
|row| row.get(0),
).ok().flatten();
if table_exists.is_none() {
return Ok::<_, anyhow::Error>(None);
}
let ts: Option<i64> = conn.query_row(
&format!("SELECT MAX(create_time) FROM [{}]", tname),
[],
|row| row.get(0),
).ok().flatten();
Ok(ts)
}).await??;
if let Some(ts) = max_ts {
results.push((ts, path.clone(), table_name.clone()));
}
}
// 按最大时间戳降序排列(最新的优先)
results.sort_by_key(|(ts, _, _)| std::cmp::Reverse(*ts));
Ok(results.into_iter().map(|(_, p, t)| (p, t)).collect())
}
fn query_messages(
db_path: &std::path::Path,
table: &str,
chat_username: &str,
is_group: bool,
names_map: &HashMap<String, String>,
group_nicknames: &HashMap<String, String>,
since: Option<i64>,
until: Option<i64>,
msg_type: Option<i64>,
limit: usize,
offset: usize,
) -> Result<Vec<Value>> {
let conn = Connection::open(db_path)?;
let id2u = load_id2u(&conn);
let mut clauses: Vec<String> = Vec::new();
let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
if let Some(s) = since {
clauses.push("create_time >= ?".into());
params.push(Box::new(s));
}
if let Some(u) = until {
clauses.push("create_time <= ?".into());
params.push(Box::new(u));
}
if let Some(t) = msg_type {
push_msg_type_filter(&mut clauses, &mut params, t);
}
let where_clause = if clauses.is_empty() {
String::new()
} else {
format!("WHERE {}", clauses.join(" AND "))
};
let sql = format!(
"SELECT local_id, local_type, create_time, real_sender_id,
message_content, WCDB_CT_message_content
FROM [{}] {} ORDER BY create_time DESC LIMIT ? OFFSET ?",
table, where_clause
);
params.push(Box::new(limit as i64));
params.push(Box::new(offset as i64));
let params_ref: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|p| p.as_ref()).collect();
let mut stmt = conn.prepare(&sql)?;
let rows = stmt.query_map(params_ref.as_slice(), |row| {
Ok((
row.get::<_, i64>(0)?,
row.get::<_, i64>(1)?,
row.get::<_, i64>(2)?,
row.get::<_, i64>(3)?,
get_content_bytes(row, 4),
row.get::<_, i64>(5).unwrap_or(0),
))
})?
.filter_map(|r| r.ok())
.collect::<Vec<_>>();
let mut result = Vec::new();
for (local_id, local_type, ts, real_sender_id, content_bytes, ct) in rows {
let content = decompress_message(&content_bytes, ct);
let sender = sender_label(real_sender_id, &content, is_group, chat_username, &id2u, names_map, group_nicknames);
let text = fmt_content(local_id, local_type, &content, is_group);
let url = appmsg_url_for_message(local_type, &content);
let mut msg = json!({
"timestamp": ts,
"time": fmt_time(ts, "%Y-%m-%d %H:%M"),
"sender": sender,
"content": text,
"type": fmt_type(local_type),
"local_id": local_id,
});
if let Some(u) = url {
msg["url"] = serde_json::Value::String(u);
}
result.push(msg);
}
Ok(result)
}
fn search_in_table(
conn: &Connection,
table: &str,
chat_username: &str,
is_group: bool,
names_map: &HashMap<String, String>,
group_nicknames: &HashMap<String, String>,
keyword: &str,
since: Option<i64>,
until: Option<i64>,
msg_type: Option<i64>,
limit: usize,
) -> Result<Vec<Value>> {
let id2u = load_id2u(conn);
// 转义 LIKE 通配符,使用 '\' 作为 ESCAPE 字符
let escaped_kw = keyword.replace('\\', "\\\\").replace('%', "\\%").replace('_', "\\_");
let search_decoded_content = msg_type == Some(49);
let keyword_lower = keyword.to_lowercase();
let mut clauses: Vec<String> = Vec::new();
let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
if !search_decoded_content {
clauses.push("message_content LIKE ? ESCAPE '\\'".to_string());
params.push(Box::new(format!("%{}%", escaped_kw)));
}
if let Some(s) = since {
clauses.push("create_time >= ?".into());
params.push(Box::new(s));
}
if let Some(u) = until {
clauses.push("create_time <= ?".into());
params.push(Box::new(u));
}
if let Some(t) = msg_type {
push_msg_type_filter(&mut clauses, &mut params, t);
}
let where_clause = if clauses.is_empty() {
String::new()
} else {
format!("WHERE {}", clauses.join(" AND "))
};
let limit_clause = if search_decoded_content { "" } else { " LIMIT ?" };
let sql = format!(
"SELECT local_id, local_type, create_time, real_sender_id,
message_content, WCDB_CT_message_content
FROM [{}] {} ORDER BY create_time DESC{}",
table, where_clause, limit_clause
);
if !search_decoded_content {
params.push(Box::new(limit as i64));
}
let params_ref: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|p| p.as_ref()).collect();
let mut stmt = conn.prepare(&sql)?;
let rows = stmt.query_map(params_ref.as_slice(), |row| {
Ok((
row.get::<_, i64>(0)?,
row.get::<_, i64>(1)?,
row.get::<_, i64>(2)?,
row.get::<_, i64>(3)?,
get_content_bytes(row, 4),
row.get::<_, i64>(5).unwrap_or(0),
))
})?
.filter_map(|r| r.ok())
.collect::<Vec<_>>();
let mut result = Vec::new();
for (local_id, local_type, ts, real_sender_id, content_bytes, ct) in rows {
let content = decompress_message(&content_bytes, ct);
let sender = sender_label(real_sender_id, &content, is_group, chat_username, &id2u, names_map, group_nicknames);
let text = fmt_content(local_id, local_type, &content, is_group);
if search_decoded_content && !matches_search_text(&content, &text, keyword, &keyword_lower) {
continue;
}
let url = appmsg_url_for_message(local_type, &content);
let mut msg = json!({
"timestamp": ts,
"time": fmt_time(ts, "%Y-%m-%d %H:%M"),
"chat": "",
"sender": sender,
"content": text,
"type": fmt_type(local_type),
});
if let Some(u) = url {
msg["url"] = serde_json::Value::String(u);
}
result.push(msg);
if search_decoded_content && result.len() >= limit {
break;
}
}
Ok(result)
}
fn push_msg_type_filter(
clauses: &mut Vec<String>,
params: &mut Vec<Box<dyn rusqlite::types::ToSql>>,
msg_type: i64,
) {
clauses.push("(local_type & 4294967295) = ?".into());
params.push(Box::new(msg_type));
}
fn matches_search_text(raw: &str, formatted: &str, keyword: &str, keyword_lower: &str) -> bool {
contains_search_text(raw, keyword, keyword_lower)
|| contains_search_text(formatted, keyword, keyword_lower)
}
fn contains_search_text(haystack: &str, keyword: &str, keyword_lower: &str) -> bool {
haystack.contains(keyword)
|| (!keyword_lower.is_empty() && haystack.to_lowercase().contains(keyword_lower))
}
fn load_id2u(conn: &Connection) -> HashMap<i64, String> {
let mut map = HashMap::new();
if let Ok(mut stmt) = conn.prepare("SELECT rowid, user_name FROM Name2Id") {
let _ = stmt.query_map([], |row| {
Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
}).map(|rows| {
for r in rows.flatten() {
map.insert(r.0, r.1);
}
});
}
map
}
async fn load_group_nicknames(
db: &DbCache,
chat_username: &str,
) -> Result<HashMap<String, String>> {
if !chat_username.contains("@chatroom") {
return Ok(HashMap::new());
}
let Some(contact_p) = db.get("contact/contact.db").await? else {
return Ok(HashMap::new());
};
let chat = chat_username.to_string();
tokio::task::spawn_blocking(move || {
let conn = Connection::open(&contact_p)?;
Ok::<_, anyhow::Error>(load_group_nickname_map_from_conn(&conn, &chat, None))
}).await?
}
async fn load_group_nickname_maps(
db: &DbCache,
chat_usernames: HashSet<String>,
) -> Result<HashMap<String, HashMap<String, String>>> {
if chat_usernames.is_empty() {
return Ok(HashMap::new());
}
let Some(contact_p) = db.get("contact/contact.db").await? else {
return Ok(HashMap::new());
};
tokio::task::spawn_blocking(move || {
let conn = Connection::open(&contact_p)?;
let mut out = HashMap::new();
for chat in chat_usernames {
let nicknames = load_group_nickname_map_from_conn(&conn, &chat, None);
if !nicknames.is_empty() {
out.insert(chat, nicknames);
}
}
Ok::<_, anyhow::Error>(out)
}).await?
}
fn load_group_nickname_map_from_conn(
conn: &Connection,
chat_username: &str,
targets: Option<&HashSet<String>>,
) -> HashMap<String, String> {
if !chat_username.contains("@chatroom") {
return HashMap::new();
}
let ext = load_group_ext_buffer(conn, chat_username);
let owned_targets = if targets.is_none() {
load_group_member_username_set(conn, chat_username)
} else {
None
};
let targets = targets.or(owned_targets.as_ref());
ext.as_deref()
.map(|buf| parse_group_nickname_map(buf, targets))
.unwrap_or_default()
}
fn load_group_ext_buffer(
conn: &Connection,
chat_username: &str,
) -> Option<Vec<u8>> {
[
"SELECT ext_buffer FROM chat_room WHERE username = ? LIMIT 1",
"SELECT ext_buffer FROM chat_room WHERE chat_room_name = ? LIMIT 1",
"SELECT ext_buffer FROM chat_room WHERE name = ? LIMIT 1",
].iter().find_map(|sql| {
conn.query_row(sql, [chat_username], |row| row.get::<_, Option<Vec<u8>>>(0))
.ok()
.flatten()
})
}
fn load_group_member_username_set(
conn: &Connection,
chat_username: &str,
) -> Option<HashSet<String>> {
let room_id: i64 = [
"SELECT id FROM chat_room WHERE username = ?",
"SELECT id FROM chat_room WHERE chat_room_name = ?",
"SELECT id FROM chat_room WHERE name = ?",
].iter().find_map(|sql| {
conn.query_row(sql, [chat_username], |row| row.get::<_, i64>(0)).ok()
}).unwrap_or(0);
if room_id == 0 {
return None;
}
let mut stmt = conn.prepare(
"SELECT c.username
FROM chatroom_member cm
LEFT JOIN contact c ON c.id = cm.member_id
WHERE cm.room_id = ?"
).ok()?;
let usernames: HashSet<String> = stmt.query_map([room_id], |row| {
row.get::<_, String>(0)
}).ok()?
.filter_map(|r| r.ok())
.filter(|uid| !uid.is_empty())
.collect();
if usernames.is_empty() { None } else { Some(usernames) }
}
fn decode_proto_varint(raw: &[u8], offset: usize) -> Option<(u64, usize)> {
let mut value = 0u64;
let mut shift = 0u32;
let mut pos = offset;
while pos < raw.len() {
let byte = raw[pos];
pos += 1;
value |= u64::from(byte & 0x7f) << shift;
if byte & 0x80 == 0 {
return Some((value, pos));
}
shift += 7;
if shift > 63 {
return None;
}
}
None
}
fn proto_len_fields<'a>(raw: &'a [u8]) -> Vec<(u64, &'a [u8])> {
let mut fields = Vec::new();
let mut idx = 0usize;
while idx < raw.len() {
let Some((tag, next)) = decode_proto_varint(raw, idx) else { break; };
if next <= idx { break; }
idx = next;
let field_no = tag >> 3;
let wire_type = tag & 0x07;
match wire_type {
0 => {
let Some((_, next)) = decode_proto_varint(raw, idx) else { break; };
if next <= idx { break; }
idx = next;
}
1 => {
let Some(next) = idx.checked_add(8) else { break; };
if next > raw.len() { break; }
idx = next;
}
2 => {
let Some((size, next)) = decode_proto_varint(raw, idx) else { break; };
if next <= idx { break; }
idx = next;
let Ok(size) = usize::try_from(size) else { break; };
let Some(end) = idx.checked_add(size) else { break; };
if end > raw.len() { break; }
fields.push((field_no, &raw[idx..end]));
idx = end;
}
5 => {
let Some(next) = idx.checked_add(4) else { break; };
if next > raw.len() { break; }
idx = next;
}
_ => break,
}
}
fields
}
fn proto_string_fields(raw: &[u8]) -> Vec<(u64, String)> {
proto_len_fields(raw)
.into_iter()
.filter_map(|(field_no, value)| {
if value.is_empty() || value.len() > 256 {
return None;
}
let text = std::str::from_utf8(value).ok()?.trim().to_string();
if text.is_empty() || text.chars().any(char::is_control) {
return None;
}
Some((field_no, text))
})
.collect()
}
fn is_strong_username_hint(value: &str) -> bool {
value.starts_with("wxid_")
|| value.ends_with("@chatroom")
|| value.starts_with("gh_")
|| value.contains('@')
}
fn looks_like_username(value: &str) -> bool {
let value = value.trim();
if value.is_empty() {
return false;
}
if is_strong_username_hint(value) {
return true;
}
if value.len() < 6 || value.len() > 32 || value.chars().any(char::is_whitespace) {
return false;
}
let mut chars = value.chars();
let Some(first) = chars.next() else { return false; };
first.is_ascii_alphabetic()
&& chars.all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-')
}
fn pick_member_username(
strings: &[(u64, String)],
targets: Option<&HashSet<String>>,
) -> Option<String> {
if let Some(targets) = targets {
return strings
.iter()
.find(|(_, value)| targets.contains(value))
.map(|(_, value)| value.clone());
}
for field_no in [1u64, 4u64] {
if let Some((_, value)) = strings
.iter()
.find(|(f, value)| *f == field_no && looks_like_username(value))
{
return Some(value.clone());
}
}
strings
.iter()
.find(|(_, value)| is_strong_username_hint(value))
.or_else(|| strings.iter().find(|(_, value)| looks_like_username(value)))
.map(|(_, value)| value.clone())
}
fn pick_group_nickname(strings: &[(u64, String)], username: &str) -> Option<String> {
let mut best_score = i64::MIN;
let mut best = String::new();
for (idx, (field_no, value)) in strings.iter().enumerate() {
let value = value.trim();
if value.is_empty()
|| value == username
|| is_strong_username_hint(value)
|| value.contains('\n')
|| value.contains('\r')
|| value.len() > 64
{
continue;
}
let mut score = 0i64;
if *field_no == 2 {
score += 100;
}
if !looks_like_username(value) {
score += 20;
}
score += (32usize.saturating_sub(value.len())) as i64;
score = score * 1000 - idx as i64;
if score > best_score {
best_score = score;
best = value.to_string();
}
}
if best.is_empty() { None } else { Some(best) }
}
fn parse_group_nickname_map(
ext_buffer: &[u8],
targets: Option<&HashSet<String>>,
) -> HashMap<String, String> {
let mut out = HashMap::new();
if ext_buffer.is_empty() {
return out;
}
for (_, chunk) in proto_len_fields(ext_buffer) {
let strings = proto_string_fields(chunk);
if strings.is_empty() {
continue;
}
let Some(username) = pick_member_username(&strings, targets) else {
continue;
};
if out.contains_key(&username) {
continue;
}
if let Some(nickname) = pick_group_nickname(&strings, &username) {
out.insert(username, nickname);
}
}
out
}
fn contact_display(
uid: &str,
nick: &str,
remark: &str,
names_map: &HashMap<String, String>,
) -> String {
if !remark.is_empty() {
remark.to_string()
} else if !nick.is_empty() {
nick.to_string()
} else {
names_map.get(uid).cloned().unwrap_or_else(|| uid.to_string())
}
}
fn sender_display(
username: &str,
fallback_sender_name: &str,
names: &HashMap<String, String>,
group_nicknames: &HashMap<String, String>,
) -> String {
if username.is_empty() {
return String::new();
}
group_nicknames
.get(username)
.filter(|s| !s.is_empty())
.cloned()
.or_else(|| names.get(username).cloned())
.or_else(|| {
if fallback_sender_name.is_empty() {
None
} else {
Some(fallback_sender_name.to_string())
}
})
.unwrap_or_else(|| username.to_string())
}
fn group_top_senders(
sender_counts: &HashMap<String, i64>,
names: &HashMap<String, String>,
group_nicknames: &HashMap<String, String>,
limit: usize,
) -> Vec<Value> {
let mut top_senders: Vec<Value> = sender_counts.iter()
.map(|(username, count)| json!({
"sender": sender_display(username, "", names, group_nicknames),
"count": count,
}))
.collect();
top_senders.sort_by(|a, b| {
b["count"].as_i64().unwrap_or(0)
.cmp(&a["count"].as_i64().unwrap_or(0))
.then_with(|| {
a["sender"].as_str().unwrap_or("")
.cmp(b["sender"].as_str().unwrap_or(""))
})
});
top_senders.truncate(limit);
top_senders
}
fn sender_label(
real_sender_id: i64,
content: &str,
is_group: bool,
chat_username: &str,
id2u: &HashMap<i64, String>,
names: &HashMap<String, String>,
group_nicknames: &HashMap<String, String>,
) -> String {
let sender_uname = id2u.get(&real_sender_id).cloned().unwrap_or_default();
if is_group {
if !sender_uname.is_empty() && sender_uname != chat_username {
return sender_display(&sender_uname, "", names, group_nicknames);
}
if content.contains(":\n") {
let raw = content.splitn(2, ":\n").next().unwrap_or("");
return sender_display(raw, "", names, group_nicknames);
}
return String::new();
}
if !sender_uname.is_empty() && sender_uname != chat_username {
return names.get(&sender_uname).cloned().unwrap_or(sender_uname);
}
String::new()
}
/// 读取消息内容列(兼容 TEXT 和 BLOB 两种存储类型)
///
/// SQLite 中 message_content 在未压缩时为 TEXTzstd 压缩后为 BLOB。
/// rusqlite 的 Vec<u8> FromSql 只接受 BLOB读 TEXT 会静默返回空。
fn get_content_bytes(row: &rusqlite::Row<'_>, idx: usize) -> Vec<u8> {
// 先尝试 BLOB再 fallback 到 TEXT→bytes
row.get::<_, Vec<u8>>(idx)
.or_else(|_| row.get::<_, String>(idx).map(|s| s.into_bytes()))
.unwrap_or_default()
}
fn decompress_message(data: &[u8], ct: i64) -> String {
if ct == 4 && !data.is_empty() {
// zstd 压缩
if let Ok(dec) = zstd::decode_all(data) {
return String::from_utf8_lossy(&dec).into_owned();
}
}
String::from_utf8_lossy(data).into_owned()
}
fn decompress_or_str(data: &[u8]) -> String {
if data.is_empty() {
return String::new();
}
// 尝试 zstd 解压
if let Ok(dec) = zstd::decode_all(data) {
if let Ok(s) = String::from_utf8(dec) {
return s;
}
}
String::from_utf8_lossy(data).into_owned()
}
fn strip_group_prefix(s: &str) -> String {
if s.contains(":\n") {
s.splitn(2, ":\n").nth(1).unwrap_or(s).to_string()
} else {
s.to_string()
}
}
pub fn fmt_type(t: i64) -> String {
let base = (t as u64 & 0xFFFFFFFF) as i64;
match base {
1 => "文本".into(),
3 => "图片".into(),
34 => "语音".into(),
42 => "名片".into(),
43 => "视频".into(),
47 => "表情".into(),
48 => "位置".into(),
49 => "链接/文件".into(),
50 => "通话".into(),
10000 => "系统".into(),
10002 => "撤回".into(),
_ => format!("type={}", base),
}
}
fn fmt_content(local_id: i64, local_type: i64, content: &str, is_group: bool) -> String {
let base = (local_type as u64 & 0xFFFFFFFF) as i64;
match base {
3 => return format!("[图片] local_id={}", local_id),
34 => return "[语音]".into(),
43 => return "[视频]".into(),
47 => return "[表情]".into(),
50 => return "[通话]".into(),
10000 => return parse_sysmsg(content).unwrap_or_else(|| "[系统消息]".into()),
10002 => return parse_revoke(content).unwrap_or_else(|| "[撤回了一条消息]".into()),
_ => {}
}
let text = if is_group && content.contains(":\n") {
content.splitn(2, ":\n").nth(1).unwrap_or(content)
} else {
content
};
if base == 49 && text.contains("<appmsg") {
if let Some(parsed) = parse_appmsg(text) {
return parsed;
}
}
text.to_string()
}
/// 解析撤回消息 XML提取被撤回的内容摘要
/// `<sysmsg type="revokemsg"><revokemsg><content>...</content></revokemsg></sysmsg>`
fn parse_revoke(xml: &str) -> Option<String> {
let inner = extract_xml_text(xml, "content")?;
// 有时 content 是 "xxx recalled a message" 英文,有时是中文
if inner.is_empty() {
return Some("[撤回了一条消息]".into());
}
// 尝试简化:如果是 XML 格式的撤回内容,直接显示摘要
Some(format!("[撤回] {}", inner
.chars()
.take(30)
.collect::<String>()))
}
/// 解析系统消息 XML群通知等
fn parse_sysmsg(xml: &str) -> Option<String> {
// 常见格式:<sysmsg type="...">...</sysmsg>
// 尝试提取 content 标签
if let Some(s) = extract_xml_text(xml, "content") {
if !s.is_empty() {
return Some(format!("[系统] {}", s.chars().take(50).collect::<String>()));
}
}
// 纯文本系统消息(无 XML
if !xml.starts_with('<') {
return Some(format!("[系统] {}", xml.chars().take(50).collect::<String>()));
}
Some("[系统消息]".into())
}
fn parse_appmsg(text: &str) -> Option<String> {
if let Some(parsed) = parse_appmsg_dom(text) {
return Some(parsed);
}
parse_appmsg_legacy(text)
}
fn parse_appmsg_dom(text: &str) -> Option<String> {
let doc = Document::parse(text).ok()?;
let appmsg = doc.descendants().find(|node| node.has_tag_name("appmsg"))?;
let title = xml_text(xml_child(appmsg, "title")).unwrap_or_default();
let atype = xml_text(xml_child(appmsg, "type")).unwrap_or_default();
match atype.as_str() {
"6" => Some(format_file_appmsg(appmsg, &title)),
"19" => Some(format_record_appmsg(appmsg, &title)),
_ => None,
}
}
fn parse_appmsg_legacy(text: &str) -> Option<String> {
let title = extract_xml_text(text, "title")?;
let atype = extract_xml_text(text, "type").unwrap_or_default();
match atype.as_str() {
"6" => Some(if !title.is_empty() { format!("[文件] {}", title) } else { "[文件]".into() }),
"57" => {
let ref_content = quote_refermsg_content(text)
.or_else(|| extract_xml_text(text, "content").and_then(|s| quote_content_text(&s, 40)))
.unwrap_or_default();
let quote = if !title.is_empty() { format!("[引用] {}", title) } else { "[引用]".into() };
if !ref_content.is_empty() {
Some(format!("{}\n \u{21b3} {}", quote, ref_content))
} else {
Some(quote)
}
}
"33" | "36" | "44" => Some(if !title.is_empty() { format!("[小程序] {}", title) } else { "[小程序]".into() }),
_ => Some(if !title.is_empty() { format!("[链接] {}", title) } else { "[链接/文件]".into() }),
}
}
fn format_file_appmsg<'a, 'input>(appmsg: Node<'a, 'input>, title: &str) -> String {
let mut meta = Vec::new();
if let Some(size) = xml_child(appmsg, "appattach")
.and_then(|attach| xml_text(xml_child(attach, "totallen")))
.and_then(|value| value.parse::<u64>().ok())
.filter(|size| *size > 0)
{
meta.push(format_byte_size(size));
}
if let Some(ext) = xml_child(appmsg, "appattach")
.and_then(|attach| xml_text(xml_child(attach, "fileext")))
.filter(|ext| !ext.is_empty())
{
meta.push(ext);
}
let base = if !title.is_empty() {
format!("[文件] {}", title)
} else {
"[文件]".into()
};
if meta.is_empty() {
base
} else {
format!("{} ({})", base, meta.join(", "))
}
}
fn format_record_appmsg<'a, 'input>(appmsg: Node<'a, 'input>, title: &str) -> String {
let items = record_item_lines(appmsg);
let mut header = if !title.is_empty() {
format!("[合并聊天记录] {}", title)
} else {
"[合并聊天记录]".into()
};
if !items.is_empty() {
header.push_str(&format!(" ({}条)", items.len()));
}
let mut lines = vec![header];
if items.is_empty() {
if let Some(desc) = xml_text(xml_child(appmsg, "des")).filter(|desc| !desc.is_empty()) {
lines.push(format!(" {}", collapse_text(&desc, 120)));
}
} else {
for item in items.iter().take(10) {
lines.push(format!(" - {}", item));
}
if items.len() > 10 {
lines.push(format!(" - ... 还有{}", items.len() - 10));
}
}
lines.join("\n")
}
fn record_item_lines<'a, 'input>(appmsg: Node<'a, 'input>) -> Vec<String> {
let mut lines = record_item_lines_from_node(appmsg);
if !lines.is_empty() {
return lines;
}
let Some(record_xml) = xml_text(xml_child(appmsg, "recorditem")).filter(|value| !value.is_empty()) else {
return Vec::new();
};
let unescaped = unescape_html(&record_xml);
for candidate in [&record_xml, &unescaped] {
if let Ok(doc) = Document::parse(candidate) {
lines = record_item_lines_from_node(doc.root_element());
if !lines.is_empty() {
break;
}
}
}
lines
}
fn record_item_lines_from_node<'a, 'input>(node: Node<'a, 'input>) -> Vec<String> {
node.descendants()
.filter(|child| child.has_tag_name("dataitem"))
.filter_map(format_record_item)
.collect()
}
fn format_record_item<'a, 'input>(item: Node<'a, 'input>) -> Option<String> {
let name = first_child_text(item, &["sourcename", "datasrcname", "sourceusername"]);
let desc = first_child_text(item, &["datadesc", "datatitle", "datafmt"])
.or_else(|| item.attribute("datatype").and_then(record_datatype_label).map(str::to_string))?;
let desc = collapse_text(&desc, 100);
if let Some(name) = name.filter(|value| !value.is_empty()) {
Some(format!("{}: {}", name, desc))
} else {
Some(desc)
}
}
fn first_child_text<'a, 'input>(node: Node<'a, 'input>, tags: &[&str]) -> Option<String> {
tags.iter()
.find_map(|tag| xml_text(xml_child(node, tag)))
.filter(|value| !value.is_empty())
}
fn record_datatype_label(datatype: &str) -> Option<&'static str> {
match datatype {
"1" => Some("[文本]"),
"2" => Some("[图片]"),
"3" => Some("[语音]"),
"4" => Some("[视频]"),
"6" => Some("[文件]"),
"17" => Some("[链接]"),
_ => None,
}
}
fn quote_refermsg_content(text: &str) -> Option<String> {
let refer = extract_xml_text(text, "refermsg")?;
let content = extract_xml_text(&refer, "content")
.and_then(|s| quote_content_text(&s, 80))
.or_else(|| {
extract_xml_text(&refer, "type")
.and_then(|t| quote_refermsg_type_label(&t).map(str::to_string))
})?;
match extract_xml_text(&refer, "displayname") {
Some(name) if !name.is_empty() => Some(format!("{}: {}", name, content)),
_ => Some(content),
}
}
fn quote_content_text(raw: &str, max_chars: usize) -> Option<String> {
let unescaped = unescape_html(raw);
if unescaped.contains("<appmsg") {
if let Some(parsed) = parse_appmsg(&unescaped) {
return Some(parsed);
}
}
let collapsed = collapse_text(&unescaped, max_chars);
if collapsed.is_empty() {
None
} else {
Some(collapsed)
}
}
fn quote_refermsg_type_label(t: &str) -> Option<&'static str> {
match t {
"1" => None,
"3" => Some("[图片]"),
"34" => Some("[语音]"),
"43" => Some("[视频]"),
"47" => Some("[表情]"),
"49" => Some("[链接/文件]"),
_ => None,
}
}
fn collapse_text(text: &str, max_chars: usize) -> String {
let collapsed = text.split_whitespace().collect::<Vec<_>>().join(" ");
if collapsed.chars().count() > max_chars {
format!("{}...", collapsed.chars().take(max_chars).collect::<String>())
} else {
collapsed
}
}
fn format_byte_size(bytes: u64) -> String {
const KB: f64 = 1024.0;
const MB: f64 = KB * 1024.0;
const GB: f64 = MB * 1024.0;
let bytes_f = bytes as f64;
if bytes_f >= GB {
format_decimal_unit(bytes_f / GB, "GB")
} else if bytes_f >= MB {
format_decimal_unit(bytes_f / MB, "MB")
} else if bytes_f >= KB {
format_decimal_unit(bytes_f / KB, "KB")
} else {
format!("{} B", bytes)
}
}
fn format_decimal_unit(value: f64, unit: &str) -> String {
let mut s = format!("{:.1}", value);
if s.ends_with(".0") {
s.truncate(s.len() - 2);
}
format!("{} {}", s, unit)
}
fn extract_xml_text(xml: &str, tag: &str) -> Option<String> {
let open = format!("<{}>", tag);
let close = format!("</{}>", tag);
let start = xml.find(&open)?;
let content_start = start + open.len();
let end = xml[content_start..].find(&close)?;
Some(xml[content_start..content_start + end].trim().to_string())
}
fn appmsg_url_for_message(local_type: i64, content: &str) -> Option<String> {
if (local_type as u64 & 0xFFFFFFFF) != 49 {
return None;
}
extract_appmsg_url(content)
}
fn extract_favorite_url(content: &str) -> Option<String> {
let url = extract_xml_text(content, "link")
.map(|s| unescape_html(strip_xml_cdata(&s)))?;
if url.is_empty() || !(url.starts_with("http://") || url.starts_with("https://")) {
return None;
}
Some(url)
}
fn strip_xml_cdata(s: &str) -> &str {
s.strip_prefix("<![CDATA[")
.and_then(|inner| inner.strip_suffix("]]>"))
.unwrap_or(s)
}
/// 从 appmsg XML 中提取链接 URL优先取 <url>fallback 到 <url1>
fn extract_appmsg_url(text: &str) -> Option<String> {
let xml = strip_group_prefix(text);
if !xml.contains("<appmsg") {
return None;
}
if extract_xml_text(&xml, "type").as_deref() == Some("57") {
return None;
}
let url = extract_xml_text(&xml, "url")
.or_else(|| extract_xml_text(&xml, "url1"))
.map(|s| unescape_html(strip_xml_cdata(&s)))?;
if url.is_empty() || !(url.starts_with("http://") || url.starts_with("https://")) {
return None;
}
Some(url)
}
fn extract_xml_attr(xml: &str, tag: &str, attr: &str) -> Option<String> {
let open = format!("<{}", tag);
let start = xml.find(&open)?;
let tag_end = start + xml[start..].find('>')?;
let attr_pat = format!(r#"{}=""#, attr);
let attr_start = start + xml[start..tag_end].find(&attr_pat)? + attr_pat.len();
let attr_end = attr_start + xml[attr_start..tag_end].find('"')?;
let value = xml[attr_start..attr_end].trim();
if value.is_empty() {
None
} else {
Some(value.to_string())
}
}
fn unescape_html(s: &str) -> String {
s.replace("&lt;", "<")
.replace("&gt;", ">")
.replace("&amp;", "&")
.replace("&quot;", "\"")
.replace("&apos;", "'")
}
#[cfg(test)]
mod appmsg_tests {
use super::*;
#[test]
fn parse_forwarded_chat_record_expands_record_items() {
let xml = r#"
<msg>
<appmsg appid="" sdkver="0">
<title>群聊的聊天记录</title>
<des>张三: 早上好
李四: 收到</des>
<type>19</type>
<recorditem>&lt;recordinfo&gt;&lt;datalist count="2"&gt;&lt;dataitem datatype="1"&gt;&lt;sourcename&gt;张三&lt;/sourcename&gt;&lt;sourcetime&gt;1710000000&lt;/sourcetime&gt;&lt;datadesc&gt;早上好 &amp;amp; coffee&lt;/datadesc&gt;&lt;/dataitem&gt;&lt;dataitem datatype="2"&gt;&lt;sourcename&gt;李四&lt;/sourcename&gt;&lt;sourcetime&gt;1710000060&lt;/sourcetime&gt;&lt;datafmt&gt;图片&lt;/datafmt&gt;&lt;datadesc&gt;[图片]&lt;/datadesc&gt;&lt;/dataitem&gt;&lt;/datalist&gt;&lt;/recordinfo&gt;</recorditem>
</appmsg>
</msg>
"#;
assert_eq!(
parse_appmsg(xml).as_deref(),
Some("[合并聊天记录] 群聊的聊天记录 (2条)\n - 张三: 早上好 & coffee\n - 李四: [图片]")
);
}
#[test]
fn parse_file_appmsg_includes_attachment_metadata() {
let xml = r#"
<msg>
<appmsg appid="" sdkver="0">
<title>report.pdf</title>
<type>6</type>
<appattach>
<totallen>1536</totallen>
<fileext>pdf</fileext>
</appattach>
<md5>abcdef123456</md5>
</appmsg>
</msg>
"#;
assert_eq!(
parse_appmsg(xml).as_deref(),
Some("[文件] report.pdf (1.5 KB, pdf)")
);
}
#[test]
fn parse_quote_appmsg_reads_refermsg_content() {
let xml = r#"
<msg>
<appmsg appid="" sdkver="0">
<title>我也没有用ai啊</title>
<type>57</type>
<content />
<refermsg>
<type>1</type>
<displayname>不再熬夜</displayname>
<content>昨天用 claude 爬小红书数据来着</content>
</refermsg>
</appmsg>
</msg>
"#;
assert_eq!(
parse_appmsg(xml).as_deref(),
Some("[引用] 我也没有用ai啊\n \u{21b3} 不再熬夜: 昨天用 claude 爬小红书数据来着")
);
}
#[test]
fn query_messages_filters_appmsg_by_base_type() {
let path = temp_db_path("query_messages_filters_appmsg_by_base_type");
{
let conn = Connection::open(&path).expect("open temp db");
conn.execute(
"CREATE TABLE Msg_test (
local_id INTEGER,
local_type INTEGER,
create_time INTEGER,
real_sender_id INTEGER,
message_content TEXT,
WCDB_CT_message_content INTEGER
)",
[],
)
.expect("create message table");
conn.execute(
"INSERT INTO Msg_test VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
rusqlite::params![
1_i64,
((57_i64) << 32) | 49_i64,
1775146911_i64,
0_i64,
r#"<msg><appmsg><title>我也没有用ai啊</title><type>57</type><content /><refermsg><displayname>不再熬夜</displayname><content>昨天用 claude 爬小红书数据来着</content></refermsg></appmsg></msg>"#,
0_i64
],
)
.expect("insert quote message");
}
let rows = query_messages(
&path,
"Msg_test",
"wxid_r605h38n08mv22",
false,
&HashMap::new(),
&HashMap::new(),
None,
None,
Some(49),
10,
0,
)
.expect("query messages");
let _ = std::fs::remove_file(&path);
assert_eq!(rows.len(), 1);
assert_eq!(
rows[0]["content"].as_str(),
Some("[引用] 我也没有用ai啊\n \u{21b3} 不再熬夜: 昨天用 claude 爬小红书数据来着")
);
}
#[test]
fn search_in_table_filters_appmsg_by_base_type() {
let conn = Connection::open_in_memory().expect("open in-memory db");
conn.execute(
"CREATE TABLE Msg_test (
local_id INTEGER,
local_type INTEGER,
create_time INTEGER,
real_sender_id INTEGER,
message_content TEXT,
WCDB_CT_message_content INTEGER
)",
[],
)
.expect("create message table");
conn.execute(
"INSERT INTO Msg_test VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
rusqlite::params![
1_i64,
((57_i64) << 32) | 49_i64,
1775146911_i64,
0_i64,
r#"<msg><appmsg><title>我也没有用ai啊</title><type>57</type><content /><refermsg><displayname>不再熬夜</displayname><content>昨天用 claude 爬小红书数据来着</content></refermsg></appmsg></msg>"#,
0_i64
],
)
.expect("insert quote message");
let rows = search_in_table(
&conn,
"Msg_test",
"wxid_r605h38n08mv22",
false,
&HashMap::new(),
&HashMap::new(),
"claude",
None,
None,
Some(49),
10,
)
.expect("search messages");
assert_eq!(rows.len(), 1);
assert_eq!(
rows[0]["content"].as_str(),
Some("[引用] 我也没有用ai啊\n \u{21b3} 不再熬夜: 昨天用 claude 爬小红书数据来着")
);
}
#[test]
fn search_in_table_matches_decompressed_formatted_appmsg_content() {
let conn = Connection::open_in_memory().expect("open in-memory db");
conn.execute(
"CREATE TABLE Msg_test (
local_id INTEGER,
local_type INTEGER,
create_time INTEGER,
real_sender_id INTEGER,
message_content BLOB,
WCDB_CT_message_content INTEGER
)",
[],
)
.expect("create message table");
let xml = r#"<msg><appmsg><title>我也没有用ai啊</title><type>57</type><content /><refermsg><displayname>不再熬夜</displayname><content>昨天用 claude 爬小红书数据来着</content></refermsg></appmsg></msg>"#;
let compressed = zstd::encode_all(xml.as_bytes(), 0).expect("compress appmsg xml");
conn.execute(
"INSERT INTO Msg_test VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
rusqlite::params![
1_i64,
((57_i64) << 32) | 49_i64,
1775146911_i64,
0_i64,
compressed,
4_i64
],
)
.expect("insert compressed quote message");
let rows = search_in_table(
&conn,
"Msg_test",
"wxid_r605h38n08mv22",
false,
&HashMap::new(),
&HashMap::new(),
"claude",
None,
None,
Some(49),
10,
)
.expect("search messages");
assert_eq!(rows.len(), 1);
assert_eq!(
rows[0]["content"].as_str(),
Some("[引用] 我也没有用ai啊\n \u{21b3} 不再熬夜: 昨天用 claude 爬小红书数据来着")
);
}
fn temp_db_path(name: &str) -> std::path::PathBuf {
let unique = format!(
"wx-cli-{}-{}-{}.db",
name,
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("clock before unix epoch")
.as_nanos()
);
std::env::temp_dir().join(unique)
}
}
fn fmt_time(ts: i64, fmt: &str) -> String {
Local.timestamp_opt(ts, 0)
.single()
.map(|dt| dt.format(fmt).to_string())
.unwrap_or_else(|| ts.to_string())
}
// ─── 新增命令查询函数 ──────────────────────────────────────────────────────────
/// 查询有未读消息的会话
///
/// `filter`:按 chat_type 过滤None 或空 Vec 等价于 "all"。
/// 可选值:`private` / `group` / `official` / `folded` / `all`。
/// 多选支持在 CLI 层用逗号分隔后传入多个元素。
pub async fn q_unread(
db: &DbCache,
names: &Names,
limit: usize,
filter: Option<Vec<String>>,
) -> Result<Value> {
let path = db.get("session/session.db").await?
.context("无法解密 session.db")?;
// 归一化 filter小写 + 去除别名。返回 None 代表"不过滤"。
let filter_set: Option<std::collections::HashSet<&'static str>> = filter.and_then(|v| {
let mut set = std::collections::HashSet::new();
for raw in v {
match raw.trim().to_lowercase().as_str() {
"" | "all" => return None,
"private" => { set.insert("private"); }
"group" => { set.insert("group"); }
"official" | "official_account" => { set.insert("official_account"); }
"folded" | "fold" => { set.insert("folded"); }
_ => {} // 未知值忽略,避免拼错导致什么都不返回
}
}
if set.is_empty() { None } else { Some(set) }
});
// 有 filter 时必须全表扫SQL LIMIT 会把想要的公众号先筛掉。
// 无 filter 时保留 LIMIT避免重度用户的大量未读会话拖慢默认路径。
let has_filter = filter_set.is_some();
let limit_val = limit;
let rows: Vec<(String, i64, Vec<u8>, i64, i64, String, String)> = tokio::task::spawn_blocking(move || {
let conn = Connection::open(&path)?;
let sql = if has_filter {
"SELECT username, unread_count, summary, last_timestamp,
last_msg_type, last_msg_sender, last_sender_display_name
FROM SessionTable WHERE unread_count > 0
ORDER BY last_timestamp DESC"
} else {
"SELECT username, unread_count, summary, last_timestamp,
last_msg_type, last_msg_sender, last_sender_display_name
FROM SessionTable WHERE unread_count > 0
ORDER BY last_timestamp DESC LIMIT ?"
};
let mut stmt = conn.prepare(sql)?;
let map_row = |row: &rusqlite::Row<'_>| Ok((
row.get::<_, String>(0)?,
row.get::<_, i64>(1).unwrap_or(0),
get_content_bytes(row, 2),
row.get::<_, i64>(3).unwrap_or(0),
row.get::<_, i64>(4).unwrap_or(0),
row.get::<_, String>(5).unwrap_or_default(),
row.get::<_, String>(6).unwrap_or_default(),
));
let rows = if has_filter {
stmt.query_map([], map_row)?.collect::<rusqlite::Result<Vec<_>>>()?
} else {
stmt.query_map([limit_val as i64], map_row)?.collect::<rusqlite::Result<Vec<_>>>()?
};
Ok::<_, anyhow::Error>(rows)
}).await??;
let mut results = Vec::new();
let mut group_nickname_cache: HashMap<String, HashMap<String, String>> = HashMap::new();
for (username, unread, summary_bytes, ts, msg_type, sender, sender_name) in rows {
let chat_type = chat_type_of(&username, names);
if let Some(ref set) = filter_set {
if !set.contains(chat_type) { continue; }
}
if results.len() >= limit { break; }
let display = names.display(&username);
let is_group = chat_type == "group";
let summary = decompress_or_str(&summary_bytes);
let summary = strip_group_prefix(&summary);
let sender_display = if is_group && !sender.is_empty() {
if !group_nickname_cache.contains_key(&username) {
let nicknames = load_group_nicknames(db, &username).await.unwrap_or_default();
group_nickname_cache.insert(username.clone(), nicknames);
}
let empty = HashMap::new();
let group_nicknames = group_nickname_cache.get(&username).unwrap_or(&empty);
sender_display(&sender, &sender_name, &names.map, group_nicknames)
} else {
String::new()
};
results.push(json!({
"chat": display,
"username": username,
"is_group": is_group,
"chat_type": chat_type,
"unread": unread,
"last_msg_type": fmt_type(msg_type),
"last_sender": sender_display,
"summary": summary,
"timestamp": ts,
"time": fmt_time(ts, "%m-%d %H:%M"),
}));
}
let total = results.len();
Ok(json!({ "sessions": results, "total": total }))
}
/// 查询群成员:优先从 contact.db 的 chatroom_member/chat_room 表获取完整列表,
/// 若表不存在则退化为从消息记录聚合有发言记录的成员
pub async fn q_members(db: &DbCache, names: &Names, chat: &str) -> Result<Value> {
let username = resolve_username(chat, names)
.with_context(|| format!("找不到联系人: {}", chat))?;
if !username.contains("@chatroom") {
anyhow::bail!("'{}' 不是群聊,无法查看群成员", names.display(&username));
}
let display = names.display(&username);
let names_map = names.map.clone();
// 优先路径contact.db → chatroom_member + chat_room完整成员列表
if let Some(contact_p) = db.get("contact/contact.db").await? {
let uname2 = username.clone();
let names_map2 = names_map.clone();
let members_opt: Option<Vec<Value>> = tokio::task::spawn_blocking(move || {
let conn = Connection::open(&contact_p)?;
let has_table: bool = conn.query_row(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name='chatroom_member'",
[],
|_| Ok(true),
).unwrap_or(false);
if !has_table {
return Ok::<_, anyhow::Error>(None);
}
// 从 chat_room 表获取整数 room_id 和群主
// WeChat 不同版本列名可能不同username / chat_room_name / name
let (room_id, owner): (i64, String) = [
"SELECT id, owner FROM chat_room WHERE username = ?",
"SELECT id, owner FROM chat_room WHERE chat_room_name = ?",
"SELECT id, owner FROM chat_room WHERE name = ?",
].iter().find_map(|sql| {
conn.query_row(sql, [&uname2], |row| {
Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1).unwrap_or_default()))
}).ok()
}).unwrap_or((0, String::new()));
if room_id == 0 {
return Ok::<_, anyhow::Error>(None);
}
let mut stmt = conn.prepare(
"SELECT c.username, c.nick_name, c.remark
FROM chatroom_member cm
LEFT JOIN contact c ON c.id = cm.member_id
WHERE cm.room_id = ?"
)?;
let raw: Vec<(String, String, String)> = stmt.query_map([room_id], |row| {
Ok((
row.get::<_, String>(0).unwrap_or_default(),
row.get::<_, String>(1).unwrap_or_default(),
row.get::<_, String>(2).unwrap_or_default(),
))
})?
.filter_map(|r| r.ok())
.filter(|(uid, _, _)| !uid.is_empty())
.collect();
if raw.is_empty() {
return Ok(None);
}
let target_usernames: HashSet<String> = raw.iter()
.map(|(uid, _, _)| uid.clone())
.collect();
let group_nicknames = load_group_nickname_map_from_conn(
&conn,
&uname2,
Some(&target_usernames),
);
let mut members: Vec<Value> = raw.iter().map(|(uid, nick, remark)| {
let contact_display = contact_display(uid, nick, remark, &names_map2);
let group_nickname = group_nicknames.get(uid).cloned().unwrap_or_default();
let disp = if group_nickname.is_empty() {
contact_display.clone()
} else {
group_nickname.clone()
};
let is_owner = uid == &owner && !owner.is_empty();
json!({
"username": uid,
"display": disp,
"contact_display": contact_display,
"group_nickname": group_nickname,
"is_owner": is_owner,
})
}).collect();
// 群主排首位,其余按 display 字典序
members.sort_by(|a, b| {
let ao = a["is_owner"].as_bool().unwrap_or(false);
let bo = b["is_owner"].as_bool().unwrap_or(false);
if ao != bo { return bo.cmp(&ao); }
a["display"].as_str().unwrap_or("").cmp(b["display"].as_str().unwrap_or(""))
});
Ok(Some(members))
}).await??;
if let Some(members) = members_opt {
return Ok(json!({
"chat": display,
"username": username,
"count": members.len(),
"members": members,
}));
}
}
// 降级路径:从消息记录中聚合发言过的成员
let tables = find_msg_tables(db, names, &username).await?;
if tables.is_empty() {
return Ok(json!({
"chat": display,
"username": username,
"count": 0,
"members": [],
}));
}
let mut sender_set: std::collections::HashSet<String> = std::collections::HashSet::new();
for (db_path, table_name) in &tables {
let path = db_path.clone();
let tname = table_name.clone();
let uname = username.clone();
let senders: Vec<String> = tokio::task::spawn_blocking(move || {
let conn = Connection::open(&path)?;
let id2u = load_id2u(&conn);
let mut stmt = conn.prepare(&format!(
"SELECT DISTINCT real_sender_id FROM [{}] WHERE real_sender_id > 0", tname
))?;
let ids: Vec<i64> = stmt.query_map([], |row| row.get(0))?
.filter_map(|r| r.ok())
.collect();
let senders: Vec<String> = ids.iter()
.filter_map(|id| id2u.get(id))
.filter(|u| *u != &uname)
.cloned()
.collect();
Ok::<_, anyhow::Error>(senders)
}).await??;
sender_set.extend(senders);
}
let group_nicknames = load_group_nicknames(db, &username).await.unwrap_or_default();
let mut members: Vec<Value> = sender_set.iter().map(|u| {
let contact_display = names_map.get(u).cloned().unwrap_or_else(|| u.clone());
let group_nickname = group_nicknames.get(u).cloned().unwrap_or_default();
let display = if group_nickname.is_empty() {
contact_display.clone()
} else {
group_nickname.clone()
};
json!({
"username": u,
"display": display,
"contact_display": contact_display,
"group_nickname": group_nickname,
"is_owner": false,
})
}).collect();
members.sort_by(|a, b| {
a["display"].as_str().unwrap_or("").cmp(b["display"].as_str().unwrap_or(""))
});
Ok(json!({
"chat": display,
"username": username,
"count": members.len(),
"members": members,
}))
}
/// 查询新消息:以 session.db 的 last_timestamp 作为 inbox 索引,
/// 只查询 last_timestamp > state[username] 的会话,精确且高效
pub async fn q_new_messages(
db: &DbCache,
names: &Names,
state: Option<HashMap<String, i64>>,
limit: usize,
) -> Result<Value> {
// 首次运行state=None或未见过的会话用 24h 前作为起点,
// 避免第一次运行时把全量历史消息涌入
let fallback_ts = chrono::Utc::now().timestamp() - 86400;
// 1. 从 session.db 读取所有会话的当前 last_timestamp
let session_path = db.get("session/session.db").await?
.context("无法解密 session.db")?;
let all_sessions: Vec<(String, i64)> = tokio::task::spawn_blocking(move || {
let conn = Connection::open(&session_path)?;
let mut stmt = conn.prepare(
"SELECT username, last_timestamp FROM SessionTable WHERE last_timestamp > 0"
)?;
let rows = stmt.query_map([], |row| {
Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1).unwrap_or(0)))
})?
.collect::<rusqlite::Result<Vec<_>>>()?;
Ok::<_, anyhow::Error>(rows)
}).await??;
// 2. 记录 session.db 的当前快照(用于构建 new_state 基础)
let session_ts_map: HashMap<String, i64> = all_sessions.iter()
.map(|(u, ts)| (u.clone(), *ts))
.collect();
// 3. 找出有新消息的会话
// 不在 state 中的会话(首次运行或新会话)以 fallback_ts 为基准
let changed: Vec<(String, i64)> = all_sessions.into_iter()
.filter(|(uname, ts)| {
let last_known = state.as_ref()
.and_then(|m| m.get(uname))
.copied()
.unwrap_or(fallback_ts);
*ts > last_known
})
.collect();
if changed.is_empty() {
return Ok(json!({
"count": 0,
"messages": [],
"new_state": session_ts_map,
}));
}
// 4. 只查询有新消息的会话的消息表
// per_table_limit 取 limit*5 防止单表截断,最终由全局 truncate 收尾
let per_table_limit = limit.saturating_mul(5).max(200);
let mut all_msgs: Vec<Value> = Vec::new();
for (uname, _) in &changed {
let since_ts = state.as_ref()
.and_then(|m| m.get(uname))
.copied()
.unwrap_or(fallback_ts);
let tables = find_msg_tables(db, names, uname).await?;
if tables.is_empty() { continue; }
let display = names.display(uname);
let chat_type = chat_type_of(uname, names);
let is_group = chat_type == "group";
let group_nicknames = if is_group {
load_group_nicknames(db, uname).await.unwrap_or_default()
} else {
HashMap::new()
};
for (db_path, table_name) in &tables {
let path = db_path.clone();
let tname = table_name.clone();
let uname2 = uname.clone();
let display2 = display.clone();
let names_map = names.map.clone();
let group_nicknames2 = group_nicknames.clone();
let tname_for_log = tname.clone();
let msgs: Vec<Value> = match tokio::task::spawn_blocking(move || {
let conn = Connection::open(&path)?;
let id2u = load_id2u(&conn);
let sql = format!(
"SELECT local_id, local_type, create_time, real_sender_id,
message_content, WCDB_CT_message_content
FROM [{}] WHERE create_time > ? ORDER BY create_time ASC LIMIT ?",
tname
);
let rows: Vec<_> = conn.prepare(&sql)
.and_then(|mut stmt| {
stmt.query_map(
rusqlite::params![since_ts, per_table_limit as i64],
|row| Ok((
row.get::<_, i64>(0)?,
row.get::<_, i64>(1)?,
row.get::<_, i64>(2)?,
row.get::<_, i64>(3)?,
get_content_bytes(row, 4),
row.get::<_, i64>(5).unwrap_or(0),
)),
).map(|it| it.filter_map(|r| r.ok()).collect())
})
.unwrap_or_default();
let mut result = Vec::new();
for (local_id, local_type, ts, real_sender_id, content_bytes, ct) in rows {
let content = decompress_message(&content_bytes, ct);
let sender = sender_label(real_sender_id, &content, is_group, &uname2, &id2u, &names_map, &group_nicknames2);
let text = fmt_content(local_id, local_type, &content, is_group);
let url = appmsg_url_for_message(local_type, &content);
let mut msg = json!({
"chat": display2,
"username": uname2,
"is_group": is_group,
"chat_type": chat_type,
"timestamp": ts,
"time": fmt_time(ts, "%Y-%m-%d %H:%M"),
"sender": sender,
"content": text,
"type": fmt_type(local_type),
});
if let Some(u) = url {
msg["url"] = serde_json::Value::String(u);
}
result.push(msg);
}
Ok::<_, anyhow::Error>(result)
}).await {
Ok(Ok(v)) => v,
Ok(Err(e)) => { eprintln!("[new-messages] skip {}: {}", tname_for_log, e); continue; }
Err(e) => { eprintln!("[new-messages] task error: {}", e); continue; }
};
all_msgs.extend(msgs);
}
}
all_msgs.sort_by_key(|m| m["timestamp"].as_i64().unwrap_or(0));
all_msgs.truncate(limit);
// 5. 重建 new_state防止全局 limit 截断导致消息永久丢失:
// - 未变化的会话:沿用 session.db 的 last_timestamp即 session_ts_map
// - 变化但全被截断(无消息在最终结果中):
// * 后续调用 (state=Some):保留旧 since_ts下次重试拿这部分消息
// * 首次调用 (state=None)advance 到 session_ts避免 since_ts 锁死在
// fallback_ts 导致后续每次都回扫 24h。窗口会随调用次数 + 时间累积扩大,
// 性能持续衰退。代价:首次 + 被截断会话的老消息看不到,需走 `wx history`。
// - 变化且有消息返回advance 到该会话在结果中的最大 timestamp增量 fetch 标准语义)
let returned_max_ts: HashMap<String, i64> = {
let mut m: HashMap<String, i64> = HashMap::new();
for msg in &all_msgs {
if let (Some(u), Some(ts)) = (msg["username"].as_str(), msg["timestamp"].as_i64()) {
let e = m.entry(u.to_string()).or_insert(0);
if ts > *e { *e = ts; }
}
}
m
};
let mut new_state = session_ts_map;
for (uname, _) in &changed {
let in_results = returned_max_ts.contains_key(uname);
let prev = state.as_ref().and_then(|m| m.get(uname)).copied();
let next_ts = match (in_results, prev) {
(true, _) => {
// 有消息返回advance 到 returned_max返回的最大 ts 通常 ≤ session_ts
// 这样下次查 `since > returned_max` 仍能拿到 returned_max..session_ts 的截断尾巴。
returned_max_ts[uname]
}
(false, Some(prev)) => prev, // 后续 + 截断:保持旧 since
(false, None) => {
// 首次 + 截断advance 到 session_ts 兜底,避免 since_ts 锁死。
new_state.get(uname).copied().unwrap_or(fallback_ts)
}
};
new_state.insert(uname.clone(), next_ts);
}
Ok(json!({
"count": all_msgs.len(),
"messages": all_msgs,
"new_state": new_state,
}))
}
/// 查询收藏内容favorite/favorite.db 的 fav_db_item 表)
pub async fn q_favorites(
db: &DbCache,
limit: usize,
fav_type: Option<i64>,
query: Option<String>,
) -> Result<Value> {
let path = db.get("favorite/favorite.db").await?
.context("找不到 favorite.db请确认微信数据目录")?;
let rows: Vec<Value> = tokio::task::spawn_blocking(move || {
let conn = Connection::open(&path)?;
let mut clauses: Vec<&'static str> = Vec::new();
let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
if let Some(t) = fav_type {
clauses.push("type = ?");
params.push(Box::new(t));
}
let like_str: Option<String> = query.map(|q| {
let esc = q.replace('\\', "\\\\").replace('%', "\\%").replace('_', "\\_");
format!("%{}%", esc)
});
if let Some(ref s) = like_str {
clauses.push("content LIKE ? ESCAPE '\\'");
params.push(Box::new(s.clone()));
}
let where_clause = if clauses.is_empty() {
String::new()
} else {
format!("WHERE {}", clauses.join(" AND "))
};
params.push(Box::new(limit as i64));
let sql = format!(
"SELECT local_id, type, update_time, content, fromusr, realchatname
FROM fav_db_item {} ORDER BY update_time DESC LIMIT ?",
where_clause
);
let params_ref: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|p| p.as_ref()).collect();
let mut stmt = conn.prepare(&sql)?;
let rows: Vec<Value> = stmt.query_map(params_ref.as_slice(), |row| {
Ok((
row.get::<_, i64>(0).unwrap_or(0),
row.get::<_, i64>(1).unwrap_or(0),
row.get::<_, i64>(2).unwrap_or(0),
row.get::<_, String>(3).unwrap_or_default(),
row.get::<_, String>(4).unwrap_or_default(),
row.get::<_, String>(5).unwrap_or_default(),
))
})?
.filter_map(|r| r.ok())
.map(|(local_id, ftype, ts, content, fromusr, chatname)| {
let type_str = match ftype {
1 => "文本",
2 => "图片",
5 => "文章",
19 => "名片",
20 => "视频",
_ => "其他",
};
// 安全截断(按 Unicode 字符而非字节)
let preview: String = content.chars().take(100).collect();
let preview = if content.chars().count() > 100 {
format!("{}...", preview)
} else {
preview
};
// WeChat 部分版本的 update_time 为毫秒10位以上判定为毫秒后转秒
let ts_secs = if ts > 9_999_999_999 { ts / 1000 } else { ts };
let mut item = json!({
"id": local_id,
"type": type_str,
"type_num": ftype,
"time": fmt_time(ts_secs, "%Y-%m-%d %H:%M"),
"timestamp": ts_secs,
"preview": preview,
"from": fromusr,
"chat": chatname,
});
if ftype == 5 {
if let Some(url) = extract_favorite_url(&content) {
item["url"] = Value::String(url);
}
}
item
})
.collect();
Ok::<_, anyhow::Error>(rows)
}).await??;
Ok(json!({
"count": rows.len(),
"items": rows,
}))
}
/// 聊天统计消息总数、类型分布、发言排行、24小时分布
pub async fn q_stats(
db: &DbCache,
names: &Names,
chat: &str,
since: Option<i64>,
until: Option<i64>,
) -> Result<Value> {
let username = resolve_username(chat, names)
.with_context(|| format!("找不到联系人: {}", chat))?;
let display = names.display(&username);
let chat_type = chat_type_of(&username, names);
let is_group = chat_type == "group";
let tables = find_msg_tables(db, names, &username).await?;
if tables.is_empty() {
anyhow::bail!("找不到 {} 的消息记录", display);
}
// 跨所有分片 DB 累计统计
let mut total: i64 = 0;
let mut type_counts: HashMap<String, i64> = HashMap::new();
let mut sender_counts: HashMap<String, i64> = HashMap::new();
let mut hour_counts = [0i64; 24];
let group_nicknames = if is_group {
load_group_nicknames(db, &username).await.unwrap_or_default()
} else {
HashMap::new()
};
for (db_path, table_name) in &tables {
let path = db_path.clone();
let tname = table_name.clone();
let uname = username.clone();
let is_group2 = is_group;
// 用 SQL GROUP BY 在数据库侧聚合,避免把全量消息内容加载进内存
let result: (i64, HashMap<String, i64>, HashMap<String, i64>, [i64; 24]) =
tokio::task::spawn_blocking(move || {
let conn = Connection::open(&path)?;
let id2u = load_id2u(&conn);
let mut clauses = Vec::new();
let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
if let Some(s) = since {
clauses.push("create_time >= ?");
params.push(Box::new(s));
}
if let Some(u) = until {
clauses.push("create_time <= ?");
params.push(Box::new(u));
}
let where_clause = if clauses.is_empty() {
String::new()
} else {
format!("WHERE {}", clauses.join(" AND "))
};
let params_ref: Vec<&dyn rusqlite::types::ToSql> =
params.iter().map(|p| p.as_ref()).collect();
// 1. 总数
let count: i64 = conn.query_row(
&format!("SELECT COUNT(*) FROM [{}] {}", tname, where_clause),
params_ref.as_slice(),
|row| row.get(0),
).unwrap_or(0);
// 2. 类型分布SQL GROUP BY不加载消息内容
let type_sql = format!(
"SELECT (local_type & 0xFFFFFFFF), COUNT(*) FROM [{}] {} GROUP BY (local_type & 0xFFFFFFFF)",
tname, where_clause
);
let mut type_c: HashMap<String, i64> = HashMap::new();
if let Ok(mut stmt) = conn.prepare(&type_sql) {
let _ = stmt.query_map(params_ref.as_slice(), |row| {
Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
}).map(|rows| {
for r in rows.flatten() {
*type_c.entry(fmt_type(r.0)).or_insert(0) += r.1;
}
});
}
// 3. 小时分布:只取时间戳,不加载消息内容
let hour_sql = format!(
"SELECT create_time FROM [{}] {}",
tname, where_clause
);
let mut hour_c = [0i64; 24];
if let Ok(mut stmt) = conn.prepare(&hour_sql) {
let _ = stmt.query_map(params_ref.as_slice(), |row| row.get::<_, i64>(0))
.map(|rows| {
for ts in rows.flatten() {
if let Some(dt) = Local.timestamp_opt(ts, 0).single() {
let h = dt.hour() as usize;
if h < 24 { hour_c[h] += 1; }
}
}
});
}
// 4. 发言排行:只取 real_sender_id不加载消息内容
// where_clause 可能已含 WHERE用 AND 追加而非重复写 WHERE
let sender_filter = if where_clause.is_empty() {
"WHERE real_sender_id > 0".to_string()
} else {
format!("{} AND real_sender_id > 0", where_clause)
};
let sender_sql = format!(
"SELECT real_sender_id, COUNT(*) FROM [{}] {} GROUP BY real_sender_id",
tname, sender_filter
);
let mut sender_c: HashMap<String, i64> = HashMap::new();
if is_group2 {
if let Ok(mut stmt) = conn.prepare(&sender_sql) {
let _ = stmt.query_map(params_ref.as_slice(), |row| {
Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
}).map(|rows| {
for (id, cnt) in rows.flatten() {
if let Some(u) = id2u.get(&id) {
if u != &uname {
*sender_c.entry(u.clone()).or_insert(0) += cnt;
}
}
}
});
}
}
Ok::<_, anyhow::Error>((count, type_c, sender_c, hour_c))
}).await??;
let (count, type_c, sender_c, hour_c) = result;
total += count;
for (k, v) in type_c { *type_counts.entry(k).or_insert(0) += v; }
for (k, v) in sender_c { *sender_counts.entry(k).or_insert(0) += v; }
for i in 0..24 { hour_counts[i] += hour_c[i]; }
}
// 类型分布,按数量降序
let mut by_type: Vec<Value> = type_counts.iter()
.map(|(t, c)| json!({ "type": t, "count": c }))
.collect();
by_type.sort_by_key(|v| std::cmp::Reverse(v["count"].as_i64().unwrap_or(0)));
// 发言排行Top 10
let top_senders = group_top_senders(&sender_counts, &names.map, &group_nicknames, 10);
// 24小时分布
let by_hour: Vec<Value> = hour_counts.iter().enumerate()
.map(|(h, c)| json!({ "hour": h, "count": c }))
.collect();
Ok(json!({
"chat": display,
"username": username,
"is_group": is_group,
"chat_type": chat_type,
"total": total,
"by_type": by_type,
"top_senders": top_senders,
"by_hour": by_hour,
}))
}
/// 查询朋友圈互动通知(点赞 + 评论),对应微信 app 右上角的红点入口。
/// 空 `content` 是点赞,非空是评论正文。
pub async fn q_sns_notifications(
db: &DbCache,
names: &Names,
limit: usize,
since: Option<i64>,
until: Option<i64>,
include_read: bool,
) -> Result<Value> {
let path = db.get("sns/sns.db").await?
.context("无法解密 sns.db")?;
let path2 = path.clone();
type Row = (i64, i64, i64, i64, String, String, String);
let rows: Vec<Row> = tokio::task::spawn_blocking(move || {
let conn = Connection::open(&path2)?;
let mut clauses: Vec<&str> = Vec::new();
let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
if !include_read {
clauses.push("is_unread = 1");
}
if let Some(s) = since {
clauses.push("create_time >= ?");
params.push(Box::new(s));
}
if let Some(u) = until {
clauses.push("create_time <= ?");
params.push(Box::new(u));
}
let where_clause = if clauses.is_empty() {
String::new()
} else {
format!("WHERE {}", clauses.join(" AND "))
};
let sql = format!(
"SELECT local_id, create_time, type, feed_id, from_username, from_nickname, content
FROM SnsMessage_tmp3 {} ORDER BY create_time DESC LIMIT ?",
where_clause
);
params.push(Box::new(limit as i64));
let params_ref: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|p| p.as_ref()).collect();
let mut stmt = conn.prepare(&sql)?;
let rows = stmt.query_map(params_ref.as_slice(), |row| Ok((
row.get::<_, i64>(0)?,
row.get::<_, i64>(1)?,
row.get::<_, i64>(2).unwrap_or(0),
row.get::<_, i64>(3).unwrap_or(0),
row.get::<_, String>(4).unwrap_or_default(),
row.get::<_, String>(5).unwrap_or_default(),
row.get::<_, String>(6).unwrap_or_default(),
)))?
.collect::<rusqlite::Result<Vec<_>>>()?;
Ok::<_, anyhow::Error>(rows)
}).await??;
// 一次性取出涉及的 feed 原帖,避免 N+1 查询
let feed_ids: Vec<i64> = {
let mut v: Vec<i64> = rows.iter().map(|r| r.3).collect();
v.sort_unstable();
v.dedup();
v
};
let path3 = path.clone();
let feed_ids_clone = feed_ids.clone();
let feeds: HashMap<i64, (String, String)> = tokio::task::spawn_blocking(move || {
if feed_ids_clone.is_empty() {
return Ok::<_, anyhow::Error>(HashMap::new());
}
let conn = Connection::open(&path3)?;
let placeholders = std::iter::repeat("?").take(feed_ids_clone.len()).collect::<Vec<_>>().join(",");
let sql = format!(
"SELECT tid, user_name, content FROM SnsTimeLine WHERE tid IN ({})",
placeholders
);
let params: Vec<&dyn rusqlite::types::ToSql> =
feed_ids_clone.iter().map(|id| id as &dyn rusqlite::types::ToSql).collect();
let mut stmt = conn.prepare(&sql)?;
let mut map = HashMap::new();
let mut rows2 = stmt.query(params.as_slice())?;
while let Some(row) = rows2.next()? {
let tid: i64 = row.get(0)?;
let author: String = row.get::<_, String>(1).unwrap_or_default();
let content: String = row.get::<_, String>(2).unwrap_or_default();
let preview = extract_xml_text(&content, "contentDesc")
.map(|s| s.chars().take(60).collect::<String>())
.unwrap_or_default();
// 原帖 user_name 偶尔为空(转发帖),再从 XML 兜一下
let author = if author.is_empty() {
extract_xml_text(&content, "username").unwrap_or_default()
} else {
author
};
map.insert(tid, (author, preview));
}
Ok(map)
}).await??;
let mut out = Vec::with_capacity(rows.len());
for (_local_id, ct, _typ, fid, from_u, from_nick, content) in rows {
let kind = if content.trim().is_empty() { "like" } else { "comment" };
let display = if !from_nick.is_empty() {
from_nick.clone()
} else {
names.display(&from_u)
};
let (feed_author_u, feed_preview) = feeds.get(&fid)
.cloned()
.unwrap_or_default();
let feed_author_display = if feed_author_u.is_empty() {
String::new()
} else {
names.display(&feed_author_u)
};
out.push(json!({
"type": kind,
"time": fmt_time(ct, "%m-%d %H:%M"),
"timestamp": ct,
"from_username": from_u,
"from_nickname": display,
"content": content,
"feed_id": fid,
"feed_author_username": feed_author_u,
"feed_author": feed_author_display,
"feed_preview": feed_preview,
}));
}
let total = out.len();
Ok(json!({ "notifications": out, "total": total }))
}
// 朋友圈扫描的硬上限:单次查询最多解析这么多行 SnsTimeLine
// 防止用户传超大 limit 或者底层数据异常时把 daemon 卡住。
// 当前账号 ~10k+ 帖子5w 上限留足缓冲。
const SNS_MAX_LIMIT: usize = 10_000;
const SNS_MAX_SCAN: usize = 50_000;
/// 转义 SQL LIKE 模式中的元字符。配合 `ESCAPE '\\'` 使用。
/// 反斜杠必须最先转义,否则后续替换出的 `\%` / `\_` 会被再次吞掉。
fn escape_like_pattern(s: &str) -> String {
s.replace('\\', r"\\")
.replace('%', r"\%")
.replace('_', r"\_")
}
fn xml_child<'a, 'input>(node: Node<'a, 'input>, tag: &str) -> Option<Node<'a, 'input>> {
node.children()
.find(|child| child.is_element() && child.has_tag_name(tag))
}
fn xml_text<'a, 'input>(node: Option<Node<'a, 'input>>) -> Option<String> {
node.and_then(|n| n.text())
.map(str::trim)
.filter(|s| !s.is_empty())
.map(str::to_string)
}
fn xml_attr<'a, 'input>(node: Option<Node<'a, 'input>>, attr: &str) -> Option<String> {
node.and_then(|n| n.attribute(attr))
.map(str::trim)
.filter(|s| !s.is_empty())
.map(str::to_string)
}
fn insert_media_string(out: &mut serde_json::Map<String, Value>, key: &str, value: Option<String>) {
if let Some(value) = value {
out.insert(key.to_string(), Value::String(value));
}
}
fn insert_media_i64(out: &mut serde_json::Map<String, Value>, key: &str, value: Option<i64>) {
if let Some(value) = value {
out.insert(key.to_string(), Value::from(value));
}
}
/// 从已经定位到的 `<TimelineObject>` 节点里抽 `<mediaList>/<media>` 数组。
/// 字段名与 artifacts 仓库 `wechat_sns_dump.py::_parse_media` 对齐,
/// 便于跨实现 diff。缺失字段直接省略不输出 null供下游代理图片 / 离线渲染。
fn parse_media_from_timeline(timeline: Node) -> Vec<Value> {
let Some(media_list) = xml_child(timeline, "ContentObject")
.and_then(|node| xml_child(node, "mediaList"))
else {
return Vec::new();
};
media_list
.children()
.filter(|node| node.is_element() && node.has_tag_name("media"))
.map(|media| {
let url_el = xml_child(media, "url");
let thumb_el = xml_child(media, "thumb");
let size_el = xml_child(media, "size");
let mut out = serde_json::Map::new();
insert_media_string(&mut out, "type", xml_text(xml_child(media, "type")));
insert_media_string(&mut out, "sub_type", xml_text(xml_child(media, "sub_type")));
insert_media_string(&mut out, "url", xml_text(url_el));
insert_media_string(&mut out, "thumb", xml_text(thumb_el));
insert_media_string(&mut out, "md5", xml_attr(url_el, "md5"));
insert_media_string(&mut out, "url_key", xml_attr(url_el, "key"));
insert_media_string(&mut out, "url_token", xml_attr(url_el, "token"));
insert_media_string(&mut out, "url_enc_idx", xml_attr(url_el, "enc_idx"));
insert_media_string(&mut out, "thumb_key", xml_attr(thumb_el, "key"));
insert_media_string(&mut out, "thumb_token", xml_attr(thumb_el, "token"));
insert_media_string(&mut out, "thumb_enc_idx", xml_attr(thumb_el, "enc_idx"));
insert_media_i64(
&mut out,
"width",
xml_attr(size_el, "width").and_then(|v| v.parse::<i64>().ok()),
);
insert_media_i64(
&mut out,
"height",
xml_attr(size_el, "height").and_then(|v| v.parse::<i64>().ok()),
);
insert_media_i64(
&mut out,
"total_size",
xml_attr(size_el, "totalSize").and_then(|v| v.parse::<i64>().ok()),
);
insert_media_string(
&mut out,
"video_md5",
xml_text(xml_child(media, "videomd5")),
);
insert_media_i64(
&mut out,
"video_duration",
xml_text(xml_child(media, "videoDuration")).and_then(|v| v.parse::<i64>().ok()),
);
Value::Object(out)
})
.collect()
}
/// 从 `SnsTimeLine.content` 整段 XML 抽 media[]。仅供单测使用 —— 生产路径走
/// `parse_post_xml`,那边已经把整份 doc parse 一次直接复用 timeline 节点。
#[cfg(test)]
fn parse_post_media(xml: &str) -> Vec<Value> {
let Ok(doc) = Document::parse(xml) else { return Vec::new(); };
let Some(timeline) = doc.descendants().find(|n| n.has_tag_name("TimelineObject")) else {
return Vec::new();
};
parse_media_from_timeline(timeline)
}
/// SnsTimeLine 行解析产物。不含 display name依赖 Names需要出 spawn_blocking 再补)。
struct ParsedPost {
tid: i64,
create_time: i64,
author_username: String,
content: String,
media: Vec<Value>,
location: String,
}
fn parse_post_xml_fallback(tid: i64, user_name_column: &str, content: &str) -> ParsedPost {
let create_time = extract_xml_text(content, "createTime")
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or(0);
let text = extract_xml_text(content, "contentDesc")
.map(|s| unescape_html(&s))
.unwrap_or_default();
let author_username = if user_name_column.is_empty() {
extract_xml_text(content, "username")
.map(|s| unescape_html(&s))
.unwrap_or_default()
} else {
user_name_column.to_string()
};
let location = extract_xml_attr(content, "location", "poiName")
.map(|s| unescape_html(&s))
.unwrap_or_default();
ParsedPost {
tid,
create_time,
author_username,
content: text,
media: Vec::new(),
location,
}
}
/// 纯 XML 解析,无 Names 依赖,可以在 spawn_blocking 里跑。
/// user_name_column 为空时从 TimelineObject/<username> 兜底(转发帖)。
///
/// 单 roxmltree DOM 解析一次出全部字段createTime / contentDesc / username / media / location
/// 取代旧版 regex + DOM 双解析。XML entity 解码(`&lt;` / `&amp;` 等)由 roxmltree 自动处理,
/// 旧版 `extract_xml_text` 是字符串扫描不解码 —— 因此 `content` / `location` / `username` 字段
/// 现在会输出解码后的文本,对下游是更正确的语义。
/// 如果 XML 已损坏到无法 DOM parse或缺少 `TimelineObject`,则退回轻量 string
/// fallback尽量保住 createTime / contentDesc / username / location避免一条帖子
/// 因为局部坏 XML 被整体打成零值,影响排序 / 搜索 / 作者过滤语义。
fn parse_post_xml(tid: i64, user_name_column: &str, content: &str) -> ParsedPost {
let Ok(doc) = Document::parse(content) else {
return parse_post_xml_fallback(tid, user_name_column, content);
};
let Some(timeline) = doc.descendants().find(|n| n.has_tag_name("TimelineObject")) else {
return parse_post_xml_fallback(tid, user_name_column, content);
};
let create_time = xml_text(xml_child(timeline, "createTime"))
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or(0);
let text = xml_text(xml_child(timeline, "contentDesc")).unwrap_or_default();
let author_username = if user_name_column.is_empty() {
xml_text(xml_child(timeline, "username")).unwrap_or_default()
} else {
user_name_column.to_string()
};
let media = parse_media_from_timeline(timeline);
let location = xml_child(timeline, "location")
.and_then(|n| n.attribute("poiName"))
.map(str::to_string)
.unwrap_or_default();
ParsedPost { tid, create_time, author_username, content: text, media, location }
}
fn post_to_value(p: ParsedPost, names: &Names) -> Value {
let author = if p.author_username.is_empty() {
String::new()
} else {
names.display(&p.author_username)
};
json!({
"tid": p.tid,
"timestamp": p.create_time,
"time": fmt_time(p.create_time, "%Y-%m-%d %H:%M"),
"author_username": p.author_username,
"author": author,
"content": p.content,
"media_count": p.media.len() as i64,
"media": p.media,
"location": p.location,
})
}
/// 查询朋友圈时间线:按时间/作者筛选。用于浏览自己或好友的朋友圈。
pub async fn q_sns_feed(
db: &DbCache,
names: &Names,
limit: usize,
since: Option<i64>,
until: Option<i64>,
user: Option<&str>,
) -> Result<Value> {
let path = db.get("sns/sns.db").await?
.context("无法解密 sns.db")?;
let limit = limit.min(SNS_MAX_LIMIT);
let user_uname = match user {
Some(q) => Some(
resolve_username(q, names)
.with_context(|| format!("找不到联系人: {}", q))?,
),
None => None,
};
// user 过滤不在 SQL 层做SnsTimeLine.user_name 列对部分(转发)帖子是空,
// 真正作者只在 XML <username> 里。SQL 层 `user_name = ?` 会把这部分提前漏掉,
// 让 parse_post_xml 的 fallback 失效。所以扫全表 → parse → 用 ParsedPost.author_username 过滤。
// (createTime 也不是列,本来就要扫全表 parse XML 才能正确按时间排序。)
let path2 = path.clone();
let parsed: Vec<ParsedPost> = tokio::task::spawn_blocking(move || {
let conn = Connection::open(&path2)?;
let sql = "SELECT tid, user_name, content FROM SnsTimeLine ORDER BY tid DESC";
let mut stmt = conn.prepare(sql)?;
let rows = stmt.query_map([], |row| Ok((
row.get::<_, i64>(0)?,
row.get::<_, String>(1).unwrap_or_default(),
row.get::<_, String>(2).unwrap_or_default(),
)))?;
let mut scanned = 0usize;
let mut out: Vec<ParsedPost> = Vec::new();
for row in rows {
scanned += 1;
if scanned > SNS_MAX_SCAN {
eprintln!(
"[sns_feed] scan 超过硬上限 {},结果可能不完整。建议加 --user / --since 缩小范围。",
SNS_MAX_SCAN
);
break;
}
let (tid, uname, content) = row?;
let p = parse_post_xml(tid, &uname, &content);
if let Some(u) = user_uname.as_ref() { if &p.author_username != u { continue; } }
if let Some(s) = since { if p.create_time < s { continue; } }
if let Some(u) = until { if p.create_time > u { continue; } }
out.push(p);
}
// tid DESC 不严格等于 createTime DESC不同账号 tid 生成算法不同),
// 所以要先收齐全部匹配的、按 create_time 排序,再 truncate —— 否则会丢帖。
out.sort_by_key(|p| std::cmp::Reverse(p.create_time));
out.truncate(limit);
Ok::<_, anyhow::Error>(out)
}).await??;
let posts: Vec<Value> = parsed.into_iter().map(|p| post_to_value(p, names)).collect();
let total = posts.len();
Ok(json!({ "posts": posts, "total": total }))
}
/// 搜索朋友圈全文:在 contentDesc正文里匹配 keyword可叠加时间 / 作者过滤。
pub async fn q_sns_search(
db: &DbCache,
names: &Names,
keyword: &str,
limit: usize,
since: Option<i64>,
until: Option<i64>,
user: Option<&str>,
) -> Result<Value> {
if keyword.trim().is_empty() {
anyhow::bail!("搜索关键词不能为空");
}
let path = db.get("sns/sns.db").await?
.context("无法解密 sns.db")?;
let limit = limit.min(SNS_MAX_LIMIT);
let user_uname = match user {
Some(q) => Some(
resolve_username(q, names)
.with_context(|| format!("找不到联系人: {}", q))?,
),
None => None,
};
// SQL LIKE 在 content 上粗筛 keyword这步省掉绝大多数行的 XML parse 开销)。
// user 不在 SQL 层过滤,原因同 q_sns_feedSnsTimeLine.user_name 列对部分(转发)
// 帖子为空,真实作者只在 XML <username> 里。
let like_pattern = format!("%{}%", escape_like_pattern(keyword));
let keyword_owned = keyword.to_string();
let path2 = path.clone();
let parsed: Vec<ParsedPost> = tokio::task::spawn_blocking(move || {
let conn = Connection::open(&path2)?;
let sql = "SELECT tid, user_name, content FROM SnsTimeLine \
WHERE content LIKE ? ESCAPE '\\' ORDER BY tid DESC";
let mut stmt = conn.prepare(sql)?;
let rows = stmt.query_map([&like_pattern], |row| Ok((
row.get::<_, i64>(0)?,
row.get::<_, String>(1).unwrap_or_default(),
row.get::<_, String>(2).unwrap_or_default(),
)))?;
let needle = keyword_owned.to_lowercase();
let mut scanned = 0usize;
let mut out: Vec<ParsedPost> = Vec::new();
for row in rows {
scanned += 1;
if scanned > SNS_MAX_SCAN {
eprintln!(
"[sns_search] scan 超过硬上限 {},结果可能不完整。建议缩小 keyword 或加 --user / --since。",
SNS_MAX_SCAN
);
break;
}
let (tid, uname, content) = row?;
let desc = extract_xml_text(&content, "contentDesc").unwrap_or_default();
if !desc.to_lowercase().contains(&needle) { continue; }
let p = parse_post_xml(tid, &uname, &content);
if let Some(u) = user_uname.as_ref() { if &p.author_username != u { continue; } }
if let Some(s) = since { if p.create_time < s { continue; } }
if let Some(u) = until { if p.create_time > u { continue; } }
out.push(p);
}
out.sort_by_key(|p| std::cmp::Reverse(p.create_time));
out.truncate(limit);
Ok::<_, anyhow::Error>(out)
}).await??;
let posts: Vec<Value> = parsed.into_iter().map(|p| post_to_value(p, names)).collect();
let total = posts.len();
Ok(json!({ "keyword": keyword, "posts": posts, "total": total }))
}
// ─── 公众号文章查询 ───────────────────────────────────────────────────────────
/// 一条公众号文章的解析产物
#[derive(Debug)]
struct BizArticle {
/// 接收该推送的时间戳(即消息的 create_time
recv_time: i64,
/// 公众号 username
account_username: String,
/// 文章标题
title: String,
/// 文章链接
url: String,
/// 摘要
digest: String,
/// 封面图
cover: String,
/// 文章发布时间pub_time单位秒
pub_time: i64,
}
/// 从 biz_message 表的单条 XML 解析出全部 article items
fn parse_biz_xml_items(recv_time: i64, account_username: &str, xml: &str) -> Vec<BizArticle> {
let mut items = Vec::new();
let mut search_from = 0;
loop {
let Some(item_start) = xml[search_from..].find("<item>") else { break; };
let abs_start = search_from + item_start;
let Some(item_end) = xml[abs_start..].find("</item>") else { break; };
let abs_end = abs_start + item_end + 7;
let item_xml = &xml[abs_start..abs_end];
let title = extract_cdata(item_xml, "title").unwrap_or_default();
let url = extract_cdata(item_xml, "url").unwrap_or_default();
// Skip items with no URL or empty title (e.g. payment entries)
if url.is_empty() || title.is_empty() {
search_from = abs_end;
continue;
}
let digest = extract_cdata(item_xml, "digest").unwrap_or_default();
let cover = extract_cdata(item_xml, "cover").unwrap_or_default();
let pub_time = extract_xml_text(item_xml, "pub_time")
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or(recv_time);
items.push(BizArticle {
recv_time,
account_username: account_username.to_string(),
title,
url,
digest,
cover,
pub_time,
});
search_from = abs_end;
}
items
}
/// 提取 CDATA 或普通文本内容: `<tag><![CDATA[...]]></tag>` 或 `<tag>...</tag>`
///
/// 注意: 内容匹配到 `</tag>` 之前的内容。CDATA 块中的 "]]"已在 "]]\x3e" 之前,
/// 所以 inner 为 `<![CDATA[content]]>` 或 `<![CDATA[content]]` (如果 ">" 被 close tag 吸掉)
fn extract_cdata(xml: &str, tag: &str) -> Option<String> {
let open = format!("<{}>", tag);
let close = format!("</{}>", tag);
let start = xml.find(&open)? + open.len();
let end = xml[start..].find(&close)?;
let inner = xml[start..start + end].trim();
if inner.starts_with("<![CDATA[") {
// inner = `<![CDATA[content]]>` → strip 9-char `<![CDATA[` prefix + 3-char `]]>` suffix
let body = &inner[9..];
// Strip `]]>` (normal) or `]]` (edge case)
let cdata_end = b"]]>";
let cdata_end2 = b"]]";
let content: &str = if body.as_bytes().ends_with(cdata_end) {
&body[..body.len() - 3]
} else if body.as_bytes().ends_with(cdata_end2) {
&body[..body.len() - 2]
} else {
body
};
let content = content.trim();
if content.is_empty() { None } else { Some(content.to_string()) }
} else if inner.is_empty() {
None
} else {
Some(unescape_html(inner))
}
}
/// 查询公众号文章推送biz_message_0.db
///
/// 每条消息可能包含多篇文章(多图文推送)。返回所有文章展开就的平底列表。
pub async fn q_biz_articles(
db: &DbCache,
names: &Names,
limit: usize,
account: Option<String>,
since: Option<i64>,
until: Option<i64>,
unread: bool,
) -> Result<Value> {
let biz_path = db.get("message/biz_message_0.db").await?
.context("无法解密 biz_message_0.db请确认 all_keys.json 包含对应密钥")?
;
// 开启 --unread从 session.db 拿“公众号 + unread_count>0”的 username 子集,
// 作为合集过滤(与 --account 取交集),后续结果按 account_username 去重取顶 1 篇。
let unread_usernames: Option<std::collections::HashSet<String>> = if unread {
let session_path = db.get("session/session.db").await?
.context("无法解密 session.db")?;
let session_path2 = session_path.clone();
let unread_rows: Vec<String> = tokio::task::spawn_blocking(move || {
let conn = Connection::open(&session_path2)?;
let mut stmt = conn.prepare(
"SELECT username FROM SessionTable WHERE unread_count > 0"
)?;
let rows: Vec<String> = stmt.query_map([], |row| row.get::<_, String>(0))?
.filter_map(|r| r.ok())
.collect();
Ok::<_, anyhow::Error>(rows)
}).await??;
// 仅保留公众号类型的未读会话
let set: std::collections::HashSet<String> = unread_rows.into_iter()
.filter(|u| chat_type_of(u, names) == "official_account")
.collect();
if set.is_empty() {
// 没有未读公众号 → 直接空返回,避免打 biz 表扫描
return Ok(json!({ "count": 0, "articles": [] }));
}
Some(set)
} else {
None
};
// 1. 从 Name2Id 表获取 rowid -> username 映射,再推导 md5 -> username
let biz_path2 = biz_path.clone();
let id2username: HashMap<i64, String> = tokio::task::spawn_blocking(move || {
let conn = Connection::open(&biz_path2)?;
let mut stmt = conn.prepare("SELECT rowid, user_name FROM Name2Id WHERE user_name LIKE 'gh_%'")?
;
let rows = stmt.query_map([], |row| {
Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
})?
.collect::<rusqlite::Result<Vec<_>>>()?;
Ok::<_, anyhow::Error>(rows.into_iter().collect())
}).await??;
// 构建 md5(username) -> username 映射
let md5_to_uname: HashMap<String, String> = id2username.values()
.map(|u| (format!("{:x}", md5::compute(u.as_bytes())), u.clone()))
.collect();
// 2. 如果 指定了 --account找到匹配的 username 列表
let account_low = account.as_deref().map(|s| s.to_lowercase());
let mut target_usernames: Option<Vec<String>> = account_low.as_ref().map(|low| {
id2username.values()
.filter(|u| {
let display = names.display(u);
display.to_lowercase().contains(low.as_str())
|| u.to_lowercase().contains(low.as_str())
})
.cloned()
.collect()
});
// --unread 与 --account 取交集(进一步缩小范围)
if let Some(ref unread_set) = unread_usernames {
target_usernames = Some(match target_usernames.take() {
Some(acc_list) => acc_list.into_iter()
.filter(|u| unread_set.contains(u))
.collect(),
None => unread_set.iter().cloned().collect(),
});
// 交集为空 → 提前返回
if target_usernames.as_ref().map(|v| v.is_empty()).unwrap_or(false) {
return Ok(json!({ "count": 0, "articles": [] }));
}
}
// 3. 进行数据库查询
let biz_path3 = biz_path.clone();
let since2 = since;
let until2 = until;
let target_hashes: Option<Vec<String>> = target_usernames.as_ref().map(|unames| {
unames.iter()
.map(|u| format!("{:x}", md5::compute(u.as_bytes())))
.collect()
});
let rows: Vec<(String, i64, i64, Vec<u8>, i64)> = tokio::task::spawn_blocking(move || {
let conn = Connection::open(&biz_path3)?;
// 列出所有 Msg_<hash> 表
let mut stmt = conn.prepare(
"SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'Msg_%'"
)?;
let table_names: Vec<String> = stmt.query_map([], |row| row.get(0))?
.filter_map(|r| r.ok())
.collect();
let re = regex::Regex::new(r"^Msg_[0-9a-f]{32}$").unwrap();
let mut all_rows: Vec<(String, i64, i64, Vec<u8>, i64)> = Vec::new();
for tname in &table_names {
if !re.is_match(tname) { continue; }
let hash = &tname[4..];
// account 过滤
if let Some(ref hashes) = target_hashes {
if !hashes.iter().any(|h| h == hash) { continue; }
}
let username = md5_to_uname.get(hash).cloned().unwrap_or_default();
// 构建过滤条件
let mut clauses: Vec<String> = Vec::new();
let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
// local_type & 0xFFFFFFFF = 49 是 appmsg公众号文章
clauses.push("(local_type & 4294967295) = 49".to_string());
if let Some(s) = since2 {
clauses.push("create_time >= ?".to_string());
params.push(Box::new(s));
}
if let Some(u) = until2 {
clauses.push("create_time <= ?".to_string());
params.push(Box::new(u));
}
let where_clause = format!("WHERE {}", clauses.join(" AND "));
let sql = format!(
"SELECT create_time, WCDB_CT_message_content, message_content \
FROM [{}] {} ORDER BY create_time DESC",
tname, where_clause
);
let params_ref: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|p| p.as_ref()).collect();
if let Ok(mut inner_stmt) = conn.prepare(&sql) {
let msg_rows: Vec<_> = inner_stmt
.query_map(params_ref.as_slice(), |row| {
Ok((
username.clone(),
row.get::<_, i64>(0)?,
row.get::<_, i64>(1).unwrap_or(0),
get_content_bytes(row, 2),
0i64,
))
})
.map(|it| it.filter_map(|r| r.ok()).collect())
.unwrap_or_default();
all_rows.extend(msg_rows);
}
}
Ok::<_, anyhow::Error>(all_rows)
}).await??;
// 4. 解压并解析 XML
let mut articles: Vec<BizArticle> = Vec::new();
for (username, recv_time, ct, content_bytes, _) in rows {
let content = decompress_message(&content_bytes, ct);
if content.is_empty() { continue; }
let items = parse_biz_xml_items(recv_time, &username, &content);
articles.extend(items);
}
// 5. 按 pub_time DESC 排序
articles.sort_by_key(|a| std::cmp::Reverse(a.pub_time));
// --unread 语义 A每个公众号只保留最新 1 篇(已按 pub_time 排序,取首条即可)
if unread {
let mut seen = std::collections::HashSet::<String>::new();
articles.retain(|a| seen.insert(a.account_username.clone()));
}
articles.truncate(limit);
let results: Vec<Value> = articles.into_iter().map(|a| {
let account_display = names.display(&a.account_username);
json!({
"time": fmt_time(a.pub_time, "%Y-%m-%d %H:%M"),
"timestamp": a.pub_time,
"recv_time": a.recv_time,
"recv_time_str": fmt_time(a.recv_time, "%Y-%m-%d %H:%M"),
"account": account_display,
"account_username": a.account_username,
"title": a.title,
"url": a.url,
"digest": a.digest,
"cover_url": a.cover,
})
}).collect();
Ok(json!({ "count": results.len(), "articles": results }))
}
// ─── 附件(图片 / 视频 / 文件 / 语音)查询与提取 ─────────────────────────────────
//
// 设计要点:
// - `q_attachments` 只走 `Msg_<chat_md5>` 表,按 `local_type & 0xFFFFFFFF IN (...)` 过滤
// 出附件消息行,再编出 `attachment_id`。**不**去翻 `message_resource.db`,因为列出动作
// 要可枚举几千条resource lookup 留到 `q_extract` 才做。
// - `q_extract` 走完整链:`AttachmentId` → `message_resource.db` 查 md5 →
// `<wxchat_base>/msg/attach/...` 找 .dat → 按 magic 分发到 v1/v2 decoder → 写盘。
// - V2 image AES key 通过 `image_key::default_provider()` 拿codex 后续填实现)。
// 缺 key 时 V2 解码会返回明确错误CLI 直接抛给用户。
/// 列出某会话内的附件消息(默认 image可多选。返回每条的 `attachment_id`
/// 后续传给 `Extract` 才真正读 message_resource.db + 解密 .dat。
pub async fn q_attachments(
db: &DbCache,
names: &Names,
chat: &str,
kinds: Option<Vec<String>>,
limit: usize,
offset: usize,
since: Option<i64>,
until: Option<i64>,
) -> Result<Value> {
use crate::attachment::{AttachmentId, AttachmentKind};
let username = resolve_username(chat, names)
.with_context(|| format!("找不到联系人: {}", chat))?;
let display = names.display(&username);
let chat_type = chat_type_of(&username, names);
let is_group = chat_type == "group";
// 解析 kinds → 低 32 bit local_type 集合
let kind_filters: Vec<(AttachmentKind, i64)> = parse_attachment_kinds(kinds.as_deref())?;
if kind_filters.is_empty() {
anyhow::bail!("kinds 为空 — 至少传一种 image/video/file/voice");
}
let lo32_types: Vec<i64> = kind_filters.iter().map(|(_, t)| *t).collect();
// local_type → AttachmentKind 反查mask 完后定 kind
let type_to_kind: HashMap<i64, AttachmentKind> = kind_filters.iter()
.map(|(k, t)| (*t, *k))
.collect();
let tables = find_msg_tables(db, names, &username).await?;
if tables.is_empty() {
anyhow::bail!("找不到 {} 的消息记录", display);
}
// 群聊需要 sender 显示名
let group_nicknames = if is_group {
load_group_nicknames(db, &username).await.unwrap_or_default()
} else {
HashMap::new()
};
let mut all_rows: Vec<(i64, i64, i64, i64, String, i64, i64)> = Vec::new();
// 元组:(local_id, local_type_lo32, create_time, real_sender_id, sender_label, ts_for_sort, db_idx)
for (db_idx, (db_path, table_name)) in tables.iter().enumerate() {
let path = db_path.clone();
let tname = table_name.clone();
let uname = username.clone();
let is_group2 = is_group;
let names_map = names.map.clone();
let group_nicknames2 = group_nicknames.clone();
let lo32_types2 = lo32_types.clone();
let since2 = since;
let until2 = until;
// per-DB 软上限避免巨群全量加载
let per_db_cap = (offset + limit).max(limit) * 2;
let db_idx2 = db_idx as i64;
let rows: Vec<(i64, i64, i64, i64, String, i64, i64)> =
tokio::task::spawn_blocking(move || {
let conn = Connection::open(&path)?;
let id2u = load_id2u(&conn);
// local_type 在 DB 里可能带高位 flag过滤要 mask 低 32 bit
let placeholders = lo32_types2.iter().map(|_| "?").collect::<Vec<_>>().join(",");
let mut clauses: Vec<String> = vec![
format!("(local_type & 4294967295) IN ({})", placeholders),
];
let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = lo32_types2.iter()
.map(|t| Box::new(*t) as Box<dyn rusqlite::types::ToSql>)
.collect();
if let Some(s) = since2 {
clauses.push("create_time >= ?".into());
params.push(Box::new(s));
}
if let Some(u) = until2 {
clauses.push("create_time <= ?".into());
params.push(Box::new(u));
}
let where_clause = format!("WHERE {}", clauses.join(" AND "));
let sql = format!(
"SELECT local_id, local_type, create_time, real_sender_id,
message_content, WCDB_CT_message_content
FROM [{}] {} ORDER BY create_time DESC LIMIT ?",
tname, where_clause
);
params.push(Box::new(per_db_cap as i64));
let params_ref: Vec<&dyn rusqlite::types::ToSql> =
params.iter().map(|p| p.as_ref()).collect();
let mut stmt = conn.prepare(&sql)?;
let rows: Vec<(i64, i64, i64, i64, String, i64, i64)> = stmt
.query_map(params_ref.as_slice(), |row| {
let local_id: i64 = row.get(0)?;
let raw_type: i64 = row.get(1)?;
let lo32 = (raw_type as u64 & 0xFFFFFFFF) as i64;
let ts: i64 = row.get(2)?;
let real_sender_id: i64 = row.get(3)?;
let content_bytes = get_content_bytes(row, 4);
let ct: i64 = row.get::<_, i64>(5).unwrap_or(0);
let content = decompress_message(&content_bytes, ct);
let sender = if is_group2 {
sender_label(real_sender_id, &content, true, &uname,
&id2u, &names_map, &group_nicknames2)
} else {
String::new()
};
Ok((local_id, lo32, ts, real_sender_id, sender, ts, db_idx2))
})?
.filter_map(|r| r.ok())
.collect();
Ok::<_, anyhow::Error>(rows)
})
.await??;
all_rows.extend(rows);
}
// 全局按 ts DESC 排序后分页
all_rows.sort_by_key(|r| std::cmp::Reverse(r.5));
let paged: Vec<_> = all_rows.into_iter().skip(offset).take(limit).collect();
// 翻成 JSON
let mut results: Vec<Value> = Vec::with_capacity(paged.len());
for (local_id, lo32, ts, _real_sender_id, sender, _ts2, _db_idx) in paged {
let kind = type_to_kind.get(&lo32).copied()
.unwrap_or(AttachmentKind::Image); // 理论不会 fallthrough
let id = AttachmentId {
v: 1,
chat: username.clone(),
local_id,
create_time: ts,
kind,
db: None,
};
let id_str = id.encode()?;
let mut row = json!({
"attachment_id": id_str,
"kind": kind.as_str(),
"type": fmt_type(lo32),
"local_id": local_id,
"timestamp": ts,
"time": fmt_time(ts, "%Y-%m-%d %H:%M"),
});
if is_group && !sender.is_empty() {
row["sender"] = Value::String(sender);
}
results.push(row);
}
Ok(json!({
"chat": display,
"username": username,
"is_group": is_group,
"chat_type": chat_type,
"count": results.len(),
"attachments": results,
}))
}
/// 解码 attachment_id → 查 message_resource.db → 找本地 .dat → 解密 → 写盘。
pub async fn q_extract(
db: &DbCache,
_names: &Names,
attachment_id: &str,
output: &str,
overwrite: bool,
) -> Result<Value> {
use crate::attachment::{
attachment_id::AttachmentId,
decoder::{self, V2KeyMaterial},
image_key,
resolver,
};
let id = AttachmentId::decode(attachment_id)
.context("解析 attachment_id 失败(不是合法 base64url(json)")?;
let output_path = std::path::PathBuf::from(output);
if output_path.exists() && !overwrite {
anyhow::bail!(
"目标已存在:{}(加 --overwrite 覆盖)",
output_path.display()
);
}
if let Some(parent) = output_path.parent() {
if !parent.as_os_str().is_empty() {
tokio::fs::create_dir_all(parent).await
.with_context(|| format!("创建输出目录失败:{}", parent.display()))?;
}
}
// 1) 拿 message_resource.db
let resource_path = db.get("message/message_resource.db").await?
.context("无法解密 message_resource.db请确认 all_keys.json 包含该 DB 的密钥)")?;
// 2) 推 wxchat_base = db_dir.parent(),再拼 attach_root
let wxchat_base = db.db_dir().parent()
.ok_or_else(|| anyhow::anyhow!("db_dir 没有 parent无法推断 xwechat_files 根目录"))?
.to_path_buf();
let attach_root = resolver::attach_root_for(&wxchat_base);
// 3) blocking pool 跑 resolver + 读盘 + 解码
let id_for_task = id.clone();
let resource_path2 = resource_path.clone();
let attach_root2 = attach_root.clone();
let wxchat_base2 = wxchat_base.clone();
let output_path2 = output_path.clone();
let report: Value = tokio::task::spawn_blocking(move || -> Result<Value> {
let resolved = resolver::resolve_blocking(&id_for_task, &resource_path2, &attach_root2)?;
let dat_bytes = std::fs::read(&resolved.dat_path)
.with_context(|| format!("读取 .dat 失败:{}", resolved.dat_path.display()))?;
// V2 image key — 平台相关。`ImageKeyMaterial` 同时给 aes_key + xor_key。
// xor_key 不能硬编码 0x88实测 macOS 真实账号上是 `uin & 0xff` 派生的0xa2 等),
// 所以这里桥接时必须把 provider 的 xor_key 透传给 V2KeyMaterial。
// 缺 key 时让 decoder 自己抛带诊断的错。
let provider = image_key::default_provider();
let key_material = if let Some(p) = provider.as_ref() {
// 从 wxchat_base 末段拿 wxid
let wxid = wxchat_base2.file_name()
.and_then(|s| s.to_str())
.unwrap_or_default()
.to_string();
if wxid.is_empty() {
None
} else {
match p.get_key(&wxid) {
Ok(km) => Some(km),
Err(e) => {
eprintln!("[extract] image key 提取失败 (wxid={}): {} — V2 文件将无法解码", wxid, e);
None
}
}
}
} else {
None
};
let v2_key = match key_material.as_ref() {
Some(km) => V2KeyMaterial { aes_key: Some(&km.aes_key), xor_key: km.xor_key },
None => V2KeyMaterial::default(),
};
let decoded = decoder::dispatch(&dat_bytes, v2_key)?;
// 写盘
std::fs::write(&output_path2, &decoded.data)
.with_context(|| format!("写出文件失败:{}", output_path2.display()))?;
Ok(json!({
"ok": true,
"attachment_id": attachment_id_str(&id_for_task)?,
"kind": id_for_task.kind.as_str(),
"md5": resolved.md5,
"dat_path": resolved.dat_path.display().to_string(),
"dat_size": resolved.size,
"output": output_path2.display().to_string(),
"output_size": decoded.data.len(),
"format": decoded.format,
"decoder": decoded.decoder,
}))
}).await??;
Ok(report)
}
/// 解析 `kinds` 参数到 `(AttachmentKind, lo32_local_type)` 列表。
/// 缺省None / 空)按 image 处理。
fn parse_attachment_kinds(
kinds: Option<&[String]>,
) -> Result<Vec<(crate::attachment::AttachmentKind, i64)>> {
use crate::attachment::AttachmentKind;
let raw = kinds.unwrap_or(&[]);
if raw.is_empty() {
return Ok(vec![(AttachmentKind::Image, 3)]);
}
let mut out: Vec<(AttachmentKind, i64)> = Vec::with_capacity(raw.len());
let mut seen = HashSet::<&'static str>::new();
for k in raw {
let (kind, t): (AttachmentKind, i64) = match k.to_ascii_lowercase().as_str() {
"image" | "img" => (AttachmentKind::Image, 3),
"voice" | "audio" => (AttachmentKind::Voice, 34),
"video" => (AttachmentKind::Video, 43),
"file" => (AttachmentKind::File, 49),
other => anyhow::bail!("未知附件类型:{}(支持 image/voice/video/file", other),
};
if seen.insert(kind.as_str()) {
out.push((kind, t));
}
}
Ok(out)
}
fn attachment_id_str(id: &crate::attachment::AttachmentId) -> Result<String> {
id.encode()
}
#[cfg(test)]
mod biz_tests {
use super::*;
#[test]
fn extract_cdata_normal() {
let xml = "<title><![CDATA[TencentResearch]]></title>";
assert_eq!(extract_cdata(xml, "title"), Some("TencentResearch".into()));
}
#[test]
fn extract_cdata_empty() {
let xml = "<cover><![CDATA[]]></cover>";
assert_eq!(extract_cdata(xml, "cover"), None);
}
#[test]
fn extract_cdata_url() {
let xml = "<url><![CDATA[http://mp.weixin.qq.com/s?__biz=abc&mid=123]]></url>";
let result = extract_cdata(xml, "url");
assert!(result.is_some());
let url = result.unwrap();
assert!(url.starts_with("http://mp.weixin.qq.com"));
assert!(!url.contains("CDATA"));
}
#[test]
fn extract_cdata_no_cdata_wrapper() {
let xml = "<pub_time>1700000000</pub_time>";
assert_eq!(extract_cdata(xml, "pub_time"), Some("1700000000".into()));
}
#[test]
fn parse_biz_xml_items_single_article() {
let xml = r#"<msg><appmsg><mmreader><category><item>
<title><![CDATA[Test Article Title]]></title>
<url><![CDATA[http://mp.weixin.qq.com/s?test=1]]></url>
<digest><![CDATA[Test Digest]]></digest>
<cover><![CDATA[https://example.com/cover.jpg]]></cover>
<pub_time>1700000000</pub_time>
</item></category></mmreader></appmsg></msg>"#;
let items = parse_biz_xml_items(1699999999, "gh_test123", xml);
assert_eq!(items.len(), 1);
assert_eq!(items[0].title, "Test Article Title");
assert_eq!(items[0].url, "http://mp.weixin.qq.com/s?test=1");
assert_eq!(items[0].digest, "Test Digest");
assert_eq!(items[0].pub_time, 1700000000);
assert_eq!(items[0].account_username, "gh_test123");
}
#[test]
fn parse_biz_xml_items_skips_no_url() {
let xml = r#"<msg><mmreader><category><item>
<title><![CDATA[Has Title No URL]]></title>
<url><![CDATA[]]></url>
<pub_time>1700000001</pub_time>
</item></category></mmreader></msg>"#;
let items = parse_biz_xml_items(1700000001, "gh_test", xml);
assert_eq!(items.len(), 0);
}
#[test]
fn parse_biz_xml_items_multi_article() {
let xml = r#"<msg><mmreader><category>
<item>
<title><![CDATA[Article 1]]></title>
<url><![CDATA[http://mp.weixin.qq.com/s?a=1]]></url>
<pub_time>1700000010</pub_time>
</item>
<item>
<title><![CDATA[Article 2]]></title>
<url><![CDATA[http://mp.weixin.qq.com/s?a=2]]></url>
<pub_time>1700000020</pub_time>
</item>
</category></mmreader></msg>"#;
let items = parse_biz_xml_items(1700000000, "gh_multi", xml);
assert_eq!(items.len(), 2);
assert_eq!(items[0].title, "Article 1");
assert_eq!(items[1].title, "Article 2");
}
#[test]
fn parse_biz_xml_items_pub_time_fallback() {
// When pub_time is missing, should fall back to recv_time
let xml = r#"<item>
<title><![CDATA[No PubTime]]></title>
<url><![CDATA[http://mp.weixin.qq.com/s?x=1]]></url>
</item>"#;
let items = parse_biz_xml_items(1700000099, "gh_fallback", xml);
assert_eq!(items.len(), 1);
assert_eq!(items[0].pub_time, 1700000099); // falls back to recv_time
}
}
#[cfg(test)]
mod group_nickname_tests {
use super::*;
fn varint(mut value: u64) -> Vec<u8> {
let mut out = Vec::new();
loop {
let mut byte = (value & 0x7f) as u8;
value >>= 7;
if value != 0 {
byte |= 0x80;
}
out.push(byte);
if value == 0 {
return out;
}
}
}
fn len_field(field_no: u64, bytes: &[u8]) -> Vec<u8> {
let mut out = varint((field_no << 3) | 2);
out.extend(varint(bytes.len() as u64));
out.extend(bytes);
out
}
fn string_field(field_no: u64, value: &str) -> Vec<u8> {
len_field(field_no, value.as_bytes())
}
fn member_chunk(username: &str, group_nickname: &str) -> Vec<u8> {
let mut member = Vec::new();
member.extend(string_field(1, username));
member.extend(string_field(2, group_nickname));
len_field(1, &member)
}
#[test]
fn parses_group_nickname_member_chunks() {
let mut ext_buffer = Vec::new();
ext_buffer.extend(member_chunk("wxid_alice", "Alice In Group"));
ext_buffer.extend(member_chunk("bob_123456", "Bob Card"));
let nicknames = parse_group_nickname_map(&ext_buffer, None);
assert_eq!(
nicknames.get("wxid_alice").map(String::as_str),
Some("Alice In Group")
);
assert_eq!(
nicknames.get("bob_123456").map(String::as_str),
Some("Bob Card")
);
}
#[test]
fn target_filter_anchors_member_username_choice() {
let mut member = Vec::new();
member.extend(string_field(3, "candidate_name"));
member.extend(string_field(4, "wxid_target"));
member.extend(string_field(2, "Target Card"));
let ext_buffer = len_field(1, &member);
let targets = HashSet::from(["wxid_target".to_string()]);
let nicknames = parse_group_nickname_map(&ext_buffer, Some(&targets));
assert_eq!(
nicknames.get("wxid_target").map(String::as_str),
Some("Target Card")
);
assert!(!nicknames.contains_key("candidate_name"));
}
#[test]
fn group_top_senders_keeps_duplicate_display_names_separate() {
let sender_counts = HashMap::from([
("wxid_alice".to_string(), 7),
("wxid_bob".to_string(), 3),
]);
let names = HashMap::from([
("wxid_alice".to_string(), "Alice Contact".to_string()),
("wxid_bob".to_string(), "Bob Contact".to_string()),
]);
let group_nicknames = HashMap::from([
("wxid_alice".to_string(), "同名".to_string()),
("wxid_bob".to_string(), "同名".to_string()),
]);
let top = group_top_senders(&sender_counts, &names, &group_nicknames, 10);
assert_eq!(top.len(), 2);
assert_eq!(top[0]["sender"].as_str(), Some("同名"));
assert_eq!(top[0]["count"].as_i64(), Some(7));
assert_eq!(top[1]["sender"].as_str(), Some("同名"));
assert_eq!(top[1]["count"].as_i64(), Some(3));
}
}
#[cfg(test)]
mod sns_tests {
use super::*;
fn make_post_xml(create_time: &str, desc: &str, username_tag: Option<&str>, media: usize, location: Option<&str>) -> String {
let username = username_tag.map(|u| format!("<username>{}</username>", u)).unwrap_or_default();
let media_tags = "<media><type>2</type></media>".repeat(media);
let content_object = if media > 0 {
format!("<ContentObject><mediaList>{}</mediaList></ContentObject>", media_tags)
} else {
String::new()
};
let loc = location
.map(|p| format!(r#"<location poiName="{}" longitude="0" latitude="0" />"#, p))
.unwrap_or_default();
format!(
"<TimelineObject>{}<createTime>{}</createTime><contentDesc>{}</contentDesc>{}{}</TimelineObject>",
username, create_time, desc, content_object, loc
)
}
#[test]
fn parse_uses_user_name_column_when_present() {
let xml = make_post_xml("1700000000", "hello", Some("wxid_xml"), 0, None);
let p = parse_post_xml(1, "wxid_column", &xml);
assert_eq!(p.author_username, "wxid_column");
assert_eq!(p.create_time, 1700000000);
assert_eq!(p.content, "hello");
assert_eq!(p.media.len(), 0);
assert_eq!(p.location, "");
}
#[test]
fn parse_falls_back_to_xml_username_when_column_empty() {
let xml = make_post_xml("1700000001", "world", Some("wxid_xml_only"), 0, None);
let p = parse_post_xml(2, "", &xml);
assert_eq!(p.author_username, "wxid_xml_only");
}
#[test]
fn parse_handles_missing_create_time() {
let xml = "<TimelineObject><contentDesc>x</contentDesc></TimelineObject>";
let p = parse_post_xml(3, "wxid", xml);
assert_eq!(p.create_time, 0);
assert_eq!(p.content, "x");
}
#[test]
fn parse_counts_media_and_extracts_location() {
let xml = make_post_xml("1700000002", "post", None, 3, Some("Wuxi"));
let p = parse_post_xml(4, "wxid", &xml);
assert_eq!(p.media.len(), 3);
assert_eq!(p.location, "Wuxi");
}
#[test]
fn parse_when_both_column_and_xml_username_empty_returns_empty_author() {
let xml = "<TimelineObject><createTime>1700000003</createTime><contentDesc>orphan</contentDesc></TimelineObject>";
let p = parse_post_xml(5, "", xml);
assert_eq!(p.author_username, "");
}
#[test]
fn parse_decodes_xml_entities_in_content() {
// 单 DOM 解析的副作用roxmltree 自动把 &lt; / &amp; / &quot; 等还原成原字符;
// 旧版 extract_xml_text 字符串扫描不解码,会把 "&lt;world&gt;" 原样输出。
// 新版语义对下游更正确(拿到的就是用户真实内容),把这个行为锁进测试。
let xml = "<TimelineObject><contentDesc>Hello &lt;world&gt; &amp; friends</contentDesc></TimelineObject>";
let p = parse_post_xml(6, "wxid", xml);
assert_eq!(p.content, "Hello <world> & friends");
}
#[test]
fn parse_malformed_xml_falls_back_to_string_fields_when_column_present() {
let xml = "<TimelineObject><createTime>1700000007</createTime><contentDesc>A &amp; B</contentDesc><location poiName=\"Wuxi &amp; Lake\" /><not valid xml";
let p = parse_post_xml(7, "wxid_fallback", xml);
assert_eq!(p.create_time, 1700000007);
assert_eq!(p.content, "A & B");
assert_eq!(p.author_username, "wxid_fallback");
assert!(p.media.is_empty());
assert_eq!(p.location, "Wuxi & Lake");
}
#[test]
fn parse_malformed_xml_can_still_use_xml_username_when_column_empty() {
let xml = "<TimelineObject><createTime>1700000008</createTime><contentDesc>broken</contentDesc><username>wxid_xml_only</username><not valid xml";
let p = parse_post_xml(8, "", xml);
assert_eq!(p.create_time, 1700000008);
assert_eq!(p.content, "broken");
assert_eq!(p.author_username, "wxid_xml_only");
assert!(p.media.is_empty());
}
#[test]
fn parse_without_timeline_object_falls_back_to_string_fields() {
let xml = "<SnsDataItem><createTime>1700000009</createTime><contentDesc>still here</contentDesc><username>wxid_outer</username></SnsDataItem>";
let p = parse_post_xml(9, "", xml);
assert_eq!(p.create_time, 1700000009);
assert_eq!(p.content, "still here");
assert_eq!(p.author_username, "wxid_outer");
assert!(p.media.is_empty());
}
#[test]
fn escape_like_pattern_escapes_backslash_first() {
// 反斜杠必须在 % / _ 之前转义;否则后面塞进去的 \% / \_ 会被再次双转义吃掉
assert_eq!(escape_like_pattern("a\\b"), "a\\\\b");
assert_eq!(escape_like_pattern("100%"), "100\\%");
assert_eq!(escape_like_pattern("foo_bar"), "foo\\_bar");
}
#[test]
fn escape_like_pattern_combined() {
// \%_ 三个元字符同时出现
let escaped = escape_like_pattern("a\\b%c_d");
assert_eq!(escaped, "a\\\\b\\%c\\_d");
}
#[test]
fn escape_like_pattern_no_special_chars_unchanged() {
assert_eq!(escape_like_pattern("hello world"), "hello world");
assert_eq!(escape_like_pattern("中文关键词"), "中文关键词");
assert_eq!(escape_like_pattern(""), "");
}
#[test]
fn extract_appmsg_url_unescapes_html_entities() {
let xml = concat!(
"<appmsg>",
"<type>5</type>",
"<url>https://mp.weixin.qq.com/s?__biz=MzI4&amp;mid=2247&amp;idx=1</url>",
"</appmsg>"
);
assert_eq!(
extract_appmsg_url(xml).as_deref(),
Some("https://mp.weixin.qq.com/s?__biz=MzI4&mid=2247&idx=1")
);
}
#[test]
fn extract_appmsg_url_strips_group_prefix_and_cdata() {
let xml = concat!(
"wxid_sender:\n",
"<appmsg>",
"<type>5</type>",
"<url><![CDATA[https://example.com/x?a=1&b=2]]></url>",
"</appmsg>"
);
assert_eq!(
extract_appmsg_url(xml).as_deref(),
Some("https://example.com/x?a=1&b=2")
);
}
#[test]
fn extract_appmsg_url_falls_back_to_url1() {
let xml = concat!(
"<appmsg>",
"<type>5</type>",
"<url1>https://example.com/fallback</url1>",
"</appmsg>"
);
assert_eq!(
extract_appmsg_url(xml).as_deref(),
Some("https://example.com/fallback")
);
}
#[test]
fn extract_appmsg_url_ignores_non_http_values() {
let xml = concat!(
"<appmsg>",
"<type>5</type>",
"<url>weixin://bizmsgmenu?msgmenucontent=foo</url>",
"</appmsg>"
);
assert_eq!(extract_appmsg_url(xml), None);
}
#[test]
fn extract_appmsg_url_ignores_refermsg() {
let xml = concat!(
"<appmsg>",
"<type>57</type>",
"<url>https://example.com/nested</url>",
"</appmsg>"
);
assert_eq!(extract_appmsg_url(xml), None);
}
#[test]
fn extract_favorite_url_reads_link_tag() {
let xml = concat!(
"<favitem>",
"<type>5</type>",
"<link><![CDATA[https://mp.weixin.qq.com/s?__biz=foo&mid=1]]></link>",
"</favitem>"
);
assert_eq!(
extract_favorite_url(xml).as_deref(),
Some("https://mp.weixin.qq.com/s?__biz=foo&mid=1")
);
}
#[test]
fn extract_favorite_url_ignores_non_http_values() {
let xml = concat!(
"<favitem>",
"<type>5</type>",
"<link>weixin://favorites/item/1</link>",
"</favitem>"
);
assert_eq!(extract_favorite_url(xml), None);
}
fn media_object(value: &Value) -> &serde_json::Map<String, Value> {
value.as_object().expect("media entry should be an object")
}
#[test]
fn single_image_media() {
let xml = r#"
<SnsDataItem>
<TimelineObject>
<ContentObject>
<mediaList>
<media>
<type>2</type>
<url enc_idx="1" key="placeholder-key" token="placeholder-token" md5="placeholder-md5">https://szmmsns.qpic.cn/&lt;redacted&gt;/image.jpg</url>
<thumb enc_idx="0" key="placeholder-thumb-key" token="placeholder-thumb-token">https://szmmsns.qpic.cn/&lt;redacted&gt;/thumb.jpg</thumb>
<size width="1440" height="1080" totalSize="123456" />
</media>
</mediaList>
</ContentObject>
</TimelineObject>
</SnsDataItem>
"#;
let media = parse_post_media(xml);
assert_eq!(media.len(), 1);
let item = media_object(&media[0]);
assert_eq!(item.get("type").and_then(Value::as_str), Some("2"));
assert_eq!(
item.get("url").and_then(Value::as_str),
Some("https://szmmsns.qpic.cn/<redacted>/image.jpg")
);
assert_eq!(
item.get("thumb").and_then(Value::as_str),
Some("https://szmmsns.qpic.cn/<redacted>/thumb.jpg")
);
assert_eq!(item.get("url_enc_idx").and_then(Value::as_str), Some("1"));
assert_eq!(
item.get("url_key").and_then(Value::as_str),
Some("placeholder-key")
);
assert_eq!(
item.get("url_token").and_then(Value::as_str),
Some("placeholder-token")
);
assert_eq!(
item.get("md5").and_then(Value::as_str),
Some("placeholder-md5")
);
assert_eq!(item.get("width").and_then(Value::as_i64), Some(1440));
assert_eq!(item.get("height").and_then(Value::as_i64), Some(1080));
assert_eq!(item.get("total_size").and_then(Value::as_i64), Some(123456));
}
#[test]
fn three_images_media() {
let xml = r#"
<SnsDataItem>
<TimelineObject>
<ContentObject>
<mediaList>
<media>
<type>2</type>
<sub_type>10</sub_type>
<url enc_idx="1" key="placeholder-key-1" token="placeholder-token-1">https://szmmsns.qpic.cn/&lt;redacted&gt;/image-1.jpg</url>
<thumb>https://szmmsns.qpic.cn/&lt;redacted&gt;/thumb-1.jpg</thumb>
<size width="100" height="200" totalSize="111" />
</media>
<media>
<type>2</type>
<sub_type>11</sub_type>
<url enc_idx="0" key="placeholder-key-2" token="placeholder-token-2">https://szmmsns.qpic.cn/&lt;redacted&gt;/image-2.jpg</url>
<thumb>https://szmmsns.qpic.cn/&lt;redacted&gt;/thumb-2.jpg</thumb>
<size width="300" height="400" totalSize="222" />
</media>
<media>
<type>6</type>
<url>https://szmmsns.qpic.cn/&lt;redacted&gt;/image-3.jpg</url>
<thumb enc_idx="1" key="placeholder-thumb-key-3" token="placeholder-thumb-token-3">https://szmmsns.qpic.cn/&lt;redacted&gt;/thumb-3.jpg</thumb>
<size width="500" height="600" totalSize="333" />
</media>
</mediaList>
</ContentObject>
</TimelineObject>
</SnsDataItem>
"#;
let media = parse_post_media(xml);
assert_eq!(media.len(), 3);
let first = media_object(&media[0]);
assert_eq!(first.get("sub_type").and_then(Value::as_str), Some("10"));
assert_eq!(
first.get("url_key").and_then(Value::as_str),
Some("placeholder-key-1")
);
let second = media_object(&media[1]);
assert_eq!(second.get("sub_type").and_then(Value::as_str), Some("11"));
assert_eq!(second.get("width").and_then(Value::as_i64), Some(300));
let third = media_object(&media[2]);
assert_eq!(third.get("type").and_then(Value::as_str), Some("6"));
assert_eq!(
third.get("thumb_key").and_then(Value::as_str),
Some("placeholder-thumb-key-3")
);
}
#[test]
fn video_media() {
let xml = r#"
<SnsDataItem>
<TimelineObject>
<ContentObject>
<mediaList>
<media>
<type>15</type>
<url enc_idx="1" key="placeholder-video-key" token="placeholder-video-token">https://szmmsns.qpic.cn/&lt;redacted&gt;/video.mp4</url>
<thumb>https://szmmsns.qpic.cn/&lt;redacted&gt;/video-thumb.jpg</thumb>
<size width="720" height="1280" />
<videomd5>&lt;placeholder-video-md5&gt;</videomd5>
<videoDuration>37</videoDuration>
</media>
</mediaList>
</ContentObject>
</TimelineObject>
</SnsDataItem>
"#;
let media = parse_post_media(xml);
assert_eq!(media.len(), 1);
let item = media_object(&media[0]);
assert_eq!(
item.get("video_md5").and_then(Value::as_str),
Some("<placeholder-video-md5>")
);
assert_eq!(item.get("video_duration").and_then(Value::as_i64), Some(37));
assert!(!item.contains_key("total_size"));
}
#[test]
fn text_only_post() {
let without_media_list = r#"
<SnsDataItem>
<TimelineObject>
<ContentObject>
<type>1</type>
</ContentObject>
</TimelineObject>
</SnsDataItem>
"#;
let empty_media_list = r#"
<SnsDataItem>
<TimelineObject>
<ContentObject>
<mediaList />
</ContentObject>
</TimelineObject>
</SnsDataItem>
"#;
assert!(parse_post_media(without_media_list).is_empty());
assert!(parse_post_media(empty_media_list).is_empty());
}
#[test]
fn malformed_xml() {
let xml = r#"
<SnsDataItem>
<TimelineObject>
<ContentObject>
<mediaList>
<media>
<type>2</type>
</mediaList>
</ContentObject>
</TimelineObject>
</SnsDataItem>
"#;
assert!(parse_post_media(xml).is_empty());
}
#[test]
fn size_without_total_size_omits_total_size_key() {
let xml = r#"
<SnsDataItem>
<TimelineObject>
<ContentObject>
<mediaList>
<media>
<type>2</type>
<size width="640" height="480" />
</media>
</mediaList>
</ContentObject>
</TimelineObject>
</SnsDataItem>
"#;
let media = parse_post_media(xml);
assert_eq!(media.len(), 1);
let item = media_object(&media[0]);
assert_eq!(item.get("width").and_then(Value::as_i64), Some(640));
assert_eq!(item.get("height").and_then(Value::as_i64), Some(480));
assert!(!item.contains_key("total_size"));
}
}