use anyhow::{Context, Result}; use chrono::{Local, TimeZone, Timelike}; use regex::Regex; use roxmltree::{Document, Node}; use rusqlite::Connection; use serde_json::{json, Value}; use std::collections::{HashMap, HashSet}; use std::fs; use std::path::{Path, PathBuf}; use std::sync::{Arc, OnceLock}; use super::cache::{CacheMode, DbCache}; use super::meta::{derive_status, discover_unknown_shards, Meta}; /// 静态编译的 Msg 表名正则,避免在热路径中重复编译 fn msg_table_re() -> &'static Regex { static RE: OnceLock = OnceLock::new(); RE.get_or_init(|| Regex::new(r"^Msg_[0-9a-f]{32}$").unwrap()) } /// 判定会话类型。返回值固定为 `group` / `official_account` / `folded` / `private` 之一。 /// /// 判据次序: /// 1. `@chatroom` / 折叠入口特殊 username /// 2. `contact.verify_flag` 非 0 —— 覆盖所有被微信官方打了认证标的账号, /// 包括 username 为 `wxid_*` 但实为公众号的情况(如"人物"), /// 以及品牌服务号 `cmb4008205555`、系统号 `qqsafe` / `mphelper` 等 /// 3. username 前缀兜底(`gh_*` / `biz_*` / `@*` 等)—— 在 contact 表未加载或没记录时 /// 仍能给出正确结果 pub fn chat_type_of(username: &str, names: &Names) -> &'static str { if username.contains("@chatroom") { return "group"; } if username == "brandsessionholder" || username == "@placeholder_foldgroup" { return "folded"; } if names.is_verified(username) { return "official_account"; } if username.starts_with("gh_") || username.starts_with("biz_") { return "official_account"; } // `@` 开头的剩余 username(如 `@opencustomerservicemsg`)是微信内部系统账号, // 通常不落在 contact 表里,verify_flag 兜不住,按前缀兜底。 if username.starts_with('@') { return "official_account"; } "private" } /// 联系人名称缓存 #[derive(Clone)] pub struct Names { /// username -> display_name pub map: HashMap, /// md5(username) -> username(用于从 Msg_ 表名反推联系人) pub md5_to_uname: HashMap, /// 消息 DB 的相对路径列表(message/message_N.db) pub msg_db_keys: Vec, /// 公众号推送 DB 的相对路径列表(message/biz_message_N.db) pub biz_msg_db_keys: Vec, /// username -> contact.verify_flag(0=真人,非 0 通常为公众号/服务号/认证账号) pub verify_flags: HashMap, } #[derive(Debug, Clone)] struct MessageShard { rel_key: String, path: std::path::PathBuf, table: String, max_ts: i64, cache_mode: CacheMode, } impl Names { pub fn display(&self, username: &str) -> String { self.map .get(username) .cloned() .unwrap_or_else(|| username.to_string()) } /// 是否被微信官方标了认证/服务号 flag。未在 contact 表中的 username 返回 false。 pub fn is_verified(&self, username: &str) -> bool { self.verify_flags.get(username).copied().unwrap_or(0) != 0 } } fn current_unknown_shards(db: &DbCache, names: &Names) -> Vec { discover_unknown_shards(db.db_dir(), &names.msg_db_keys) } fn meta_for_shards( scanned: usize, shards: &[MessageShard], shard_hits: usize, unknown_shards: Vec, session_last_timestamp: Option, windowed: bool, with_meta: bool, debug_source: bool, ) -> Meta { let latest = shards.first(); let chat_latest_timestamp = latest.map(|s| s.max_ts); Meta { chat_latest_timestamp, chat_latest_db: latest.map(|s| s.rel_key.clone()), session_last_timestamp, shards_scanned: scanned, shards_hit: shard_hits, unknown_shards: unknown_shards.clone(), status: derive_status( chat_latest_timestamp, session_last_timestamp, &unknown_shards, windowed, ), per_shard_latest: if with_meta || debug_source { Some( shards .iter() .map(|s| (s.rel_key.clone(), s.max_ts)) .collect(), ) } else { None }, cache_mode_per_shard: if with_meta || debug_source { Some( shards .iter() .map(|s| (s.rel_key.clone(), s.cache_mode.as_str().to_string())) .collect(), ) } else { None }, shard_paths: if debug_source { Some( shards .iter() .map(|s| (s.rel_key.clone(), s.path.to_string_lossy().into_owned())) .collect(), ) } else { None }, } } fn meta_for_global_query( scanned: usize, hit: usize, unknown_shards: Vec, windowed: bool, with_meta: bool, debug_source: bool, cache_modes: Option>, shard_paths: Option>, ) -> Meta { Meta { chat_latest_timestamp: None, chat_latest_db: None, session_last_timestamp: None, shards_scanned: scanned, shards_hit: hit, unknown_shards: unknown_shards.clone(), status: derive_status(None, None, &unknown_shards, windowed), per_shard_latest: if with_meta || debug_source { Some(HashMap::new()) } else { None }, cache_mode_per_shard: if with_meta || debug_source { cache_modes } else { None }, shard_paths: if debug_source { shard_paths } else { None }, } } async fn session_last_timestamp(db: &DbCache, username: &str) -> Option { let path = match db.get("session/session.db").await { Ok(Some(path)) => path, Ok(None) => return None, Err(e) => { eprintln!( "[freshness] skip session_last_timestamp {}: {}", username, e ); return None; } }; let username = username.to_string(); let username_for_query = username.clone(); match tokio::task::spawn_blocking(move || -> Result> { let conn = Connection::open(&path)?; let ts = conn .query_row( "SELECT last_timestamp FROM SessionTable WHERE username = ?", [&username_for_query], |row| row.get::<_, i64>(0), ) .ok(); Ok(ts) }) .await { Ok(Ok(ts)) => ts, Ok(Err(e)) => { eprintln!( "[freshness] skip session_last_timestamp {}: {}", username, e ); None } Err(e) => { eprintln!( "[freshness] task error session_last_timestamp {}: {}", username, e ); None } } } /// 加载联系人缓存(从 contact/contact.db) pub async fn load_names(db: &DbCache) -> Result { let path = db.get("contact/contact.db").await?; let mut map = HashMap::new(); let mut verify_flags: HashMap = HashMap::new(); if let Some(p) = path { let p2 = p.clone(); let rows: Vec<(String, String, String, i64)> = tokio::task::spawn_blocking(move || { let conn = Connection::open(&p2).context("打开 contact.db 失败")?; let mut stmt = conn.prepare("SELECT username, nick_name, remark, verify_flag FROM contact")?; let rows = stmt .query_map([], |row| { Ok(( row.get::<_, String>(0)?, row.get::<_, String>(1).unwrap_or_default(), row.get::<_, String>(2).unwrap_or_default(), row.get::<_, i64>(3).unwrap_or(0), )) })? .collect::>>()?; Ok::<_, anyhow::Error>(rows) }) .await??; for (uname, nick, remark, vf) in rows { let display = if !remark.is_empty() { remark } else if !nick.is_empty() { nick } else { uname.clone() }; verify_flags.insert(uname.clone(), vf); map.insert(uname, display); } } let md5_to_uname: HashMap = map .keys() .map(|u| (format!("{:x}", md5::compute(u.as_bytes())), u.clone())) .collect(); Ok(Names { map, md5_to_uname, msg_db_keys: Vec::new(), biz_msg_db_keys: Vec::new(), verify_flags, }) } /// 查询最近会话列表 pub async fn q_sessions( db: &DbCache, names: &Names, limit: usize, with_meta: bool, debug_source: bool, ) -> Result { let path = db .get("session/session.db") .await? .context("无法解密 session.db")?; let path2 = path.clone(); let limit_val = limit; let rows: Vec<(String, i64, Vec, i64, i64, String, String)> = tokio::task::spawn_blocking(move || { let conn = Connection::open(&path2)?; let mut stmt = conn.prepare( "SELECT username, unread_count, summary, last_timestamp, last_msg_type, last_msg_sender, last_sender_display_name FROM SessionTable WHERE last_timestamp > 0 ORDER BY last_timestamp DESC LIMIT ?", )?; let rows = stmt .query_map([limit_val as i64], |row| { Ok(( row.get::<_, String>(0)?, row.get::<_, i64>(1).unwrap_or(0), get_content_bytes(row, 2), row.get::<_, i64>(3).unwrap_or(0), row.get::<_, i64>(4).unwrap_or(0), row.get::<_, String>(5).unwrap_or_default(), row.get::<_, String>(6).unwrap_or_default(), )) })? .collect::>>()?; Ok::<_, anyhow::Error>(rows) }) .await??; let mut results = Vec::new(); let mut group_nickname_cache: HashMap> = HashMap::new(); for (username, unread, summary_bytes, ts, msg_type, sender, sender_name) in rows { let display = names.display(&username); let chat_type = chat_type_of(&username, names); let is_group = chat_type == "group"; // 尝试 zstd 解压 summary let summary = decompress_or_str(&summary_bytes); let summary = strip_group_prefix(&summary); let sender_display = if is_group && !sender.is_empty() { if !group_nickname_cache.contains_key(&username) { let nicknames = load_group_nicknames(db, &username) .await .unwrap_or_default(); group_nickname_cache.insert(username.clone(), nicknames); } let empty = HashMap::new(); let group_nicknames = group_nickname_cache.get(&username).unwrap_or(&empty); sender_display(&sender, &sender_name, &names.map, group_nicknames) } else { String::new() }; results.push(json!({ "chat": display, "username": username, "is_group": is_group, "chat_type": chat_type, "unread": unread, "last_msg_type": fmt_type(msg_type), "last_sender": sender_display, "summary": summary, "timestamp": ts, "time": fmt_time(ts, "%m-%d %H:%M"), })); } let latest_ts = results .first() .and_then(|v| v.get("timestamp")) .and_then(|v| v.as_i64()); let unknown_shards = current_unknown_shards(db, names); let meta = Meta { chat_latest_timestamp: latest_ts, chat_latest_db: latest_ts.map(|_| "session/session.db".to_string()), session_last_timestamp: None, shards_scanned: 0, shards_hit: 0, unknown_shards: unknown_shards.clone(), status: derive_status(latest_ts, None, &unknown_shards, false), per_shard_latest: if with_meta || debug_source { Some(HashMap::new()) } else { None }, cache_mode_per_shard: None, shard_paths: None, }; Ok(json!({ "sessions": results, "meta": meta })) } /// 查询聊天记录 pub async fn q_history( db: &DbCache, names: &Names, chat: &str, limit: usize, offset: usize, since: Option, until: Option, msg_type: Option, with_meta: bool, debug_source: bool, ) -> Result { let username = resolve_username(chat, names).with_context(|| format!("找不到联系人: {}", chat))?; let display = names.display(&username); let chat_type = chat_type_of(&username, names); let is_group = chat_type == "group"; let (shards, scanned) = find_msg_shards(db, names, &username).await?; if shards.is_empty() { anyhow::bail!("找不到 {} 的消息记录", display); } let mut all_msgs: Vec = Vec::new(); let mut shard_hits = 0usize; let group_nicknames = if is_group { load_group_nicknames(db, &username) .await .unwrap_or_default() } else { HashMap::new() }; for shard in &shards { let path = shard.path.clone(); let tname = shard.table.clone(); let uname = username.clone(); let is_group2 = is_group; let names_map = names.map.clone(); let group_nicknames2 = group_nicknames.clone(); let since2 = since; let until2 = until; let limit2 = limit; let offset2 = offset; let msgs: Vec = tokio::task::spawn_blocking(move || { // per-DB 软上限:offset + limit 已足够全局分页,避免大群全量加载 let per_db_cap = offset2 + limit2; query_messages( &path, &tname, &uname, is_group2, &names_map, &group_nicknames2, since2, until2, msg_type, per_db_cap, 0, ) }) .await??; if !msgs.is_empty() { shard_hits += 1; } all_msgs.extend(msgs); } all_msgs.sort_by_key(|m| std::cmp::Reverse(m["timestamp"].as_i64().unwrap_or(0))); let paged: Vec = all_msgs.into_iter().skip(offset).take(limit).collect(); let mut paged = paged; paged.sort_by_key(|m| m["timestamp"].as_i64().unwrap_or(0)); let windowed = offset > 0 || since.is_some() || until.is_some() || msg_type.is_some(); let unknown_shards = current_unknown_shards(db, names); let session_ts = session_last_timestamp(db, &username).await; let meta = meta_for_shards( scanned, &shards, shard_hits, unknown_shards, session_ts, windowed, with_meta, debug_source, ); Ok(json!({ "chat": display, "username": username, "is_group": is_group, "chat_type": chat_type, "count": paged.len(), "messages": paged, "meta": meta, })) } /// 搜索消息 pub async fn q_search( db: &DbCache, names: &Names, keyword: &str, chats: Option>, limit: usize, since: Option, until: Option, msg_type: Option, with_meta: bool, debug_source: bool, ) -> Result { let mut targets: Vec<(String, String, String, String, String)> = Vec::new(); // (rel_key, path, table, display, uname) let mut scanned_rel_keys: HashSet = HashSet::new(); let mut cache_modes: HashMap = HashMap::new(); let mut shard_paths: HashMap = HashMap::new(); if let Some(chat_names) = chats { for chat_name in &chat_names { if let Some(uname) = resolve_username(chat_name, names) { let (shards, _) = find_msg_shards(db, names, &uname).await?; for shard in shards { scanned_rel_keys.insert(shard.rel_key.clone()); cache_modes .insert(shard.rel_key.clone(), shard.cache_mode.as_str().to_string()); shard_paths.insert( shard.rel_key.clone(), shard.path.to_string_lossy().into_owned(), ); targets.push(( shard.rel_key, shard.path.to_string_lossy().into_owned(), shard.table, names.display(&uname), uname.clone(), )); } } } } else { // 全局搜索:遍历所有消息 DB for rel_key in &names.msg_db_keys { let resolved = match db.get_with_mode(rel_key).await? { Some(r) => r, None => continue, }; scanned_rel_keys.insert(rel_key.clone()); cache_modes.insert(rel_key.clone(), resolved.mode.as_str().to_string()); shard_paths.insert( rel_key.clone(), resolved.path.to_string_lossy().into_owned(), ); let path2 = resolved.path.clone(); let md5_lookup = names.md5_to_uname.clone(); let names_map = names.map.clone(); let rel_key2 = rel_key.clone(); let table_targets: Vec<(String, String, String, String, String)> = match tokio::task::spawn_blocking(move || { let conn = Connection::open(&path2)?; let mut stmt = conn.prepare( "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'Msg_%'", )?; let table_names: Vec = stmt .query_map([], |row| row.get(0))? .filter_map(|r| r.ok()) .collect(); let re = msg_table_re(); let mut result = Vec::new(); for tname in table_names { if !re.is_match(&tname) { continue; } let hash = &tname[4..]; let uname = md5_lookup.get(hash).cloned().unwrap_or_default(); let display = if uname.is_empty() { String::new() } else { names_map .get(&uname) .cloned() .unwrap_or_else(|| uname.clone()) }; result.push(( rel_key2.clone(), path2.to_string_lossy().into_owned(), tname, display, uname, )); } Ok::<_, anyhow::Error>(result) }) .await { Ok(Ok(v)) => v, Ok(Err(e)) => { eprintln!("[search] skip DB {}: {}", rel_key, e); continue; } Err(e) => { eprintln!("[search] task error {}: {}", rel_key, e); continue; } }; targets.extend(table_targets); } } // 按 db_path 分组 let mut by_path: HashMap> = HashMap::new(); let mut path_to_rel_key: HashMap = HashMap::new(); for (rel_key, p, t, d, u) in targets { path_to_rel_key.insert(p.clone(), rel_key); by_path.entry(p).or_default().push((t, d, u)); } let mut group_usernames = HashSet::new(); for table_list in by_path.values() { for (_, _, uname) in table_list { if uname.contains("@chatroom") { group_usernames.insert(uname.clone()); } } } let group_nicknames_by_chat = load_group_nickname_maps(db, group_usernames) .await .unwrap_or_default(); let group_nicknames_by_chat = Arc::new(group_nicknames_by_chat); // 多个 message_*.db 之间没有数据依赖,并发解密 + 查询。每个 DB 内部仍按 // table 串行(共享同一 sqlite Connection 不能跨线程移动)。原版本是 N 个 DB // 串行 await,活跃账号上 N 个分片要轮 N 次磁盘 IO;现在 JoinSet 把它们一次 // 全部 dispatch 到 blocking pool,整体 latency 退化为单 DB 慢路径。 let kw = keyword.to_string(); let mut join_set: tokio::task::JoinSet)>> = tokio::task::JoinSet::new(); for (db_path, table_list) in by_path { let kw2 = kw.clone(); let since2 = since; let until2 = until; let limit2 = limit * 3; let names_map2 = names.map.clone(); let group_nicknames_by_chat2 = Arc::clone(&group_nicknames_by_chat); let db_path_for_log = db_path.clone(); join_set.spawn_blocking(move || { let conn = Connection::open(&db_path)?; let mut all = Vec::new(); let empty_group_nicknames = HashMap::new(); for (tname, display, uname) in &table_list { let is_group = uname.contains("@chatroom"); let group_nicknames = group_nicknames_by_chat2 .get(uname) .unwrap_or(&empty_group_nicknames); match search_in_table( &conn, tname, &uname, is_group, &names_map2, group_nicknames, &kw2, since2, until2, msg_type, limit2, ) { Ok(rows) => { for mut row in rows { if row .get("chat") .map(|v| v.as_str().unwrap_or("")) .unwrap_or("") .is_empty() { if let Some(obj) = row.as_object_mut() { obj.insert( "chat".into(), serde_json::Value::String(if display.is_empty() { tname.clone() } else { display.clone() }), ); } } all.push(row); } } Err(e) => eprintln!( "[search] skip table {} (db={}): {}", tname, db_path_for_log, e ), } } Ok((db_path_for_log, all)) }); } let mut results: Vec = Vec::new(); let mut hit_rel_keys: HashSet = HashSet::new(); while let Some(joined) = join_set.join_next().await { match joined { Ok(Ok((db_path, rows))) => { if !rows.is_empty() { if let Some(rel_key) = path_to_rel_key.get(&db_path) { hit_rel_keys.insert(rel_key.clone()); } } results.extend(rows) } Ok(Err(e)) => eprintln!("[search] skip DB: {}", e), Err(e) => eprintln!("[search] task error: {}", e), } } results.sort_by_key(|r| std::cmp::Reverse(r["timestamp"].as_i64().unwrap_or(0))); let paged: Vec = results.into_iter().take(limit).collect(); let unknown_shards = current_unknown_shards(db, names); // 全局搜索 / keyword 过滤天然是窗口化结果,没有稳定的 chat-level latest baseline, // 不参与 stale 推导;这里只保留 unknown_shards 这类 daemon 全局健康信号。 let meta = meta_for_global_query( scanned_rel_keys.len(), hit_rel_keys.len(), unknown_shards, true, with_meta, debug_source, Some(cache_modes), Some(shard_paths), ); Ok(json!({ "keyword": keyword, "count": paged.len(), "results": paged, "meta": meta })) } /// 查询联系人 /// /// 只返回真实联系人(`chat_type_of == "private"`)。`names.map` 是从 `contact` 表 /// 全量加载的,里面同时包含群(`@chatroom`)、公众号(`gh_*` / `biz_*` / verify_flag != 0)、 /// 折叠入口(`brandsessionholder` / `@placeholder_foldgroup`)以及微信内部 `@xxx` 系统账号。 /// 这些都不应该出现在 `wx contacts` 输出里,统一走 `chat_type_of` 这条同样的真相判定。 pub async fn q_contacts(names: &Names, query: Option<&str>, limit: usize) -> Result { let mut contacts: Vec = names .map .iter() .filter(|(u, _)| chat_type_of(u, names) == "private") .map(|(u, d)| json!({ "username": u, "display": d })) .collect(); if let Some(q) = query { let low = q.to_lowercase(); contacts.retain(|c| { c["display"] .as_str() .map(|s| s.to_lowercase().contains(&low)) .unwrap_or(false) || c["username"] .as_str() .map(|s| s.to_lowercase().contains(&low)) .unwrap_or(false) }); } contacts.sort_by(|a, b| { a["display"] .as_str() .unwrap_or("") .cmp(b["display"].as_str().unwrap_or("")) }); let total = contacts.len(); contacts.truncate(limit); Ok(json!({ "contacts": contacts, "total": total })) } // ─── 内部辅助函数 ──────────────────────────────────────────────────────────── fn resolve_username(chat_name: &str, names: &Names) -> Option { if names.map.contains_key(chat_name) || chat_name.contains("@chatroom") || chat_name.starts_with("wxid_") { return Some(chat_name.to_string()); } let low = chat_name.to_lowercase(); // 精确匹配显示名:排序后取第一个,保证确定性 let mut exact: Vec<&String> = names .map .iter() .filter(|(_, display)| display.to_lowercase() == low) .map(|(uname, _)| uname) .collect(); exact.sort(); if let Some(u) = exact.into_iter().next() { return Some(u.clone()); } // 模糊匹配:取 display name 最短的(最精确),相同长度取字典序最小 let mut candidates: Vec<(&String, &String)> = names .map .iter() .filter(|(_, display)| display.to_lowercase().contains(&low)) .collect(); candidates.sort_by_key(|(uname, display)| (display.len(), uname.as_str())); candidates .into_iter() .next() .map(|(uname, _)| uname.clone()) } async fn find_msg_tables( db: &DbCache, names: &Names, username: &str, ) -> Result> { let (shards, _) = find_msg_shards(db, names, username).await?; Ok(shards.into_iter().map(|s| (s.path, s.table)).collect()) } async fn find_msg_shards( db: &DbCache, names: &Names, username: &str, ) -> Result<(Vec, usize)> { let table_name = format!("Msg_{:x}", md5::compute(username.as_bytes())); if !msg_table_re().is_match(&table_name) { return Ok((Vec::new(), 0)); } let mut scanned = 0usize; let mut results: Vec = Vec::new(); for rel_key in &names.msg_db_keys { let resolved = match db.get_with_mode(rel_key).await? { Some(r) => r, None => continue, }; scanned += 1; let tname = table_name.clone(); let path2 = resolved.path.clone(); let max_ts: Option = tokio::task::spawn_blocking(move || { let conn = Connection::open(&path2)?; let table_exists: Option = conn .query_row( "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?", [&tname], |row| row.get(0), ) .ok() .flatten(); if table_exists.is_none() { return Ok::<_, anyhow::Error>(None); } let ts: Option = conn .query_row( &format!("SELECT MAX(create_time) FROM [{}]", tname), [], |row| row.get(0), ) .ok() .flatten(); Ok(ts) }) .await??; if let Some(ts) = max_ts { results.push(MessageShard { rel_key: rel_key.clone(), path: resolved.path.clone(), table: table_name.clone(), max_ts: ts, cache_mode: resolved.mode, }); } } // 按最大时间戳降序排列(最新的优先) results.sort_by_key(|s| std::cmp::Reverse(s.max_ts)); Ok((results, scanned)) } fn query_messages( db_path: &std::path::Path, table: &str, chat_username: &str, is_group: bool, names_map: &HashMap, group_nicknames: &HashMap, since: Option, until: Option, msg_type: Option, limit: usize, offset: usize, ) -> Result> { let conn = Connection::open(db_path)?; let id2u = load_id2u(&conn); let mut clauses: Vec = Vec::new(); let mut params: Vec> = Vec::new(); if let Some(s) = since { clauses.push("create_time >= ?".into()); params.push(Box::new(s)); } if let Some(u) = until { clauses.push("create_time <= ?".into()); params.push(Box::new(u)); } if let Some(t) = msg_type { push_msg_type_filter(&mut clauses, &mut params, t); } let where_clause = if clauses.is_empty() { String::new() } else { format!("WHERE {}", clauses.join(" AND ")) }; let sql = format!( "SELECT local_id, local_type, create_time, real_sender_id, message_content, WCDB_CT_message_content FROM [{}] {} ORDER BY create_time DESC LIMIT ? OFFSET ?", table, where_clause ); params.push(Box::new(limit as i64)); params.push(Box::new(offset as i64)); let params_ref: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|p| p.as_ref()).collect(); let mut stmt = conn.prepare(&sql)?; let rows = stmt .query_map(params_ref.as_slice(), |row| { Ok(( row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?, row.get::<_, i64>(3)?, get_content_bytes(row, 4), row.get::<_, i64>(5).unwrap_or(0), )) })? .filter_map(|r| r.ok()) .collect::>(); let mut result = Vec::new(); for (local_id, local_type, ts, real_sender_id, content_bytes, ct) in rows { let content = decompress_message(&content_bytes, ct); let sender_username = sender_username(real_sender_id, &content, is_group, chat_username, &id2u); let sender = sender_label( real_sender_id, &content, is_group, chat_username, &id2u, names_map, group_nicknames, ); let text = fmt_content(local_id, local_type, &content, is_group); let url = appmsg_url_for_message(local_type, &content); let mut msg = json!({ "timestamp": ts, "time": fmt_time(ts, "%Y-%m-%d %H:%M"), "sender": sender, "content": text, "type": fmt_type(local_type), "local_id": local_id, }); add_sender_identity(&mut msg, is_group, &sender_username, names_map, group_nicknames); if let Some(u) = url { msg["url"] = serde_json::Value::String(u); } result.push(msg); } Ok(result) } fn search_in_table( conn: &Connection, table: &str, chat_username: &str, is_group: bool, names_map: &HashMap, group_nicknames: &HashMap, keyword: &str, since: Option, until: Option, msg_type: Option, limit: usize, ) -> Result> { let id2u = load_id2u(conn); // 转义 LIKE 通配符,使用 '\' 作为 ESCAPE 字符 let escaped_kw = keyword .replace('\\', "\\\\") .replace('%', "\\%") .replace('_', "\\_"); let search_decoded_content = msg_type == Some(49); let keyword_lower = keyword.to_lowercase(); let mut clauses: Vec = Vec::new(); let mut params: Vec> = Vec::new(); if !search_decoded_content { clauses.push("message_content LIKE ? ESCAPE '\\'".to_string()); params.push(Box::new(format!("%{}%", escaped_kw))); } if let Some(s) = since { clauses.push("create_time >= ?".into()); params.push(Box::new(s)); } if let Some(u) = until { clauses.push("create_time <= ?".into()); params.push(Box::new(u)); } if let Some(t) = msg_type { push_msg_type_filter(&mut clauses, &mut params, t); } let where_clause = if clauses.is_empty() { String::new() } else { format!("WHERE {}", clauses.join(" AND ")) }; let limit_clause = if search_decoded_content { "" } else { " LIMIT ?" }; let sql = format!( "SELECT local_id, local_type, create_time, real_sender_id, message_content, WCDB_CT_message_content FROM [{}] {} ORDER BY create_time DESC{}", table, where_clause, limit_clause ); if !search_decoded_content { params.push(Box::new(limit as i64)); } let params_ref: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|p| p.as_ref()).collect(); let mut stmt = conn.prepare(&sql)?; let rows = stmt .query_map(params_ref.as_slice(), |row| { Ok(( row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?, row.get::<_, i64>(3)?, get_content_bytes(row, 4), row.get::<_, i64>(5).unwrap_or(0), )) })? .filter_map(|r| r.ok()) .collect::>(); let mut result = Vec::new(); for (local_id, local_type, ts, real_sender_id, content_bytes, ct) in rows { let content = decompress_message(&content_bytes, ct); let sender_username = sender_username(real_sender_id, &content, is_group, chat_username, &id2u); let sender = sender_label( real_sender_id, &content, is_group, chat_username, &id2u, names_map, group_nicknames, ); let text = fmt_content(local_id, local_type, &content, is_group); if search_decoded_content && !matches_search_text(&content, &text, keyword, &keyword_lower) { continue; } let url = appmsg_url_for_message(local_type, &content); let mut msg = json!({ "timestamp": ts, "time": fmt_time(ts, "%Y-%m-%d %H:%M"), "chat": "", "sender": sender, "content": text, "type": fmt_type(local_type), }); add_sender_identity(&mut msg, is_group, &sender_username, names_map, group_nicknames); if let Some(u) = url { msg["url"] = serde_json::Value::String(u); } result.push(msg); if search_decoded_content && result.len() >= limit { break; } } Ok(result) } fn push_msg_type_filter( clauses: &mut Vec, params: &mut Vec>, msg_type: i64, ) { clauses.push("(local_type & 4294967295) = ?".into()); params.push(Box::new(msg_type)); } fn matches_search_text(raw: &str, formatted: &str, keyword: &str, keyword_lower: &str) -> bool { contains_search_text(raw, keyword, keyword_lower) || contains_search_text(formatted, keyword, keyword_lower) } fn contains_search_text(haystack: &str, keyword: &str, keyword_lower: &str) -> bool { haystack.contains(keyword) || (!keyword_lower.is_empty() && haystack.to_lowercase().contains(keyword_lower)) } fn load_id2u(conn: &Connection) -> HashMap { let mut map = HashMap::new(); if let Ok(mut stmt) = conn.prepare("SELECT rowid, user_name FROM Name2Id") { let _ = stmt .query_map([], |row| { Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?)) }) .map(|rows| { for r in rows.flatten() { map.insert(r.0, r.1); } }); } map } async fn load_group_nicknames( db: &DbCache, chat_username: &str, ) -> Result> { if !chat_username.contains("@chatroom") { return Ok(HashMap::new()); } let Some(contact_p) = db.get("contact/contact.db").await? else { return Ok(HashMap::new()); }; let chat = chat_username.to_string(); tokio::task::spawn_blocking(move || { let conn = Connection::open(&contact_p)?; Ok::<_, anyhow::Error>(load_group_nickname_map_from_conn(&conn, &chat, None)) }) .await? } async fn load_group_nickname_maps( db: &DbCache, chat_usernames: HashSet, ) -> Result>> { if chat_usernames.is_empty() { return Ok(HashMap::new()); } let Some(contact_p) = db.get("contact/contact.db").await? else { return Ok(HashMap::new()); }; tokio::task::spawn_blocking(move || { let conn = Connection::open(&contact_p)?; let mut out = HashMap::new(); for chat in chat_usernames { let nicknames = load_group_nickname_map_from_conn(&conn, &chat, None); if !nicknames.is_empty() { out.insert(chat, nicknames); } } Ok::<_, anyhow::Error>(out) }) .await? } fn load_group_nickname_map_from_conn( conn: &Connection, chat_username: &str, targets: Option<&HashSet>, ) -> HashMap { if !chat_username.contains("@chatroom") { return HashMap::new(); } let ext = load_group_ext_buffer(conn, chat_username); let owned_targets = if targets.is_none() { load_group_member_username_set(conn, chat_username) } else { None }; let targets = targets.or(owned_targets.as_ref()); ext.as_deref() .map(|buf| parse_group_nickname_map(buf, targets)) .unwrap_or_default() } fn load_group_ext_buffer(conn: &Connection, chat_username: &str) -> Option> { [ "SELECT ext_buffer FROM chat_room WHERE username = ? LIMIT 1", "SELECT ext_buffer FROM chat_room WHERE chat_room_name = ? LIMIT 1", "SELECT ext_buffer FROM chat_room WHERE name = ? LIMIT 1", ] .iter() .find_map(|sql| { conn.query_row(sql, [chat_username], |row| row.get::<_, Option>>(0)) .ok() .flatten() }) } fn load_group_member_username_set( conn: &Connection, chat_username: &str, ) -> Option> { let room_id: i64 = [ "SELECT id FROM chat_room WHERE username = ?", "SELECT id FROM chat_room WHERE chat_room_name = ?", "SELECT id FROM chat_room WHERE name = ?", ] .iter() .find_map(|sql| { conn.query_row(sql, [chat_username], |row| row.get::<_, i64>(0)) .ok() }) .unwrap_or(0); if room_id == 0 { return None; } let mut stmt = conn .prepare( "SELECT c.username FROM chatroom_member cm LEFT JOIN contact c ON c.id = cm.member_id WHERE cm.room_id = ?", ) .ok()?; let usernames: HashSet = stmt .query_map([room_id], |row| row.get::<_, String>(0)) .ok()? .filter_map(|r| r.ok()) .filter(|uid| !uid.is_empty()) .collect(); if usernames.is_empty() { None } else { Some(usernames) } } fn decode_proto_varint(raw: &[u8], offset: usize) -> Option<(u64, usize)> { let mut value = 0u64; let mut shift = 0u32; let mut pos = offset; while pos < raw.len() { let byte = raw[pos]; pos += 1; value |= u64::from(byte & 0x7f) << shift; if byte & 0x80 == 0 { return Some((value, pos)); } shift += 7; if shift > 63 { return None; } } None } fn proto_len_fields<'a>(raw: &'a [u8]) -> Vec<(u64, &'a [u8])> { let mut fields = Vec::new(); let mut idx = 0usize; while idx < raw.len() { let Some((tag, next)) = decode_proto_varint(raw, idx) else { break; }; if next <= idx { break; } idx = next; let field_no = tag >> 3; let wire_type = tag & 0x07; match wire_type { 0 => { let Some((_, next)) = decode_proto_varint(raw, idx) else { break; }; if next <= idx { break; } idx = next; } 1 => { let Some(next) = idx.checked_add(8) else { break; }; if next > raw.len() { break; } idx = next; } 2 => { let Some((size, next)) = decode_proto_varint(raw, idx) else { break; }; if next <= idx { break; } idx = next; let Ok(size) = usize::try_from(size) else { break; }; let Some(end) = idx.checked_add(size) else { break; }; if end > raw.len() { break; } fields.push((field_no, &raw[idx..end])); idx = end; } 5 => { let Some(next) = idx.checked_add(4) else { break; }; if next > raw.len() { break; } idx = next; } _ => break, } } fields } fn proto_string_fields(raw: &[u8]) -> Vec<(u64, String)> { proto_len_fields(raw) .into_iter() .filter_map(|(field_no, value)| { if value.is_empty() || value.len() > 256 { return None; } let text = std::str::from_utf8(value).ok()?.trim().to_string(); if text.is_empty() || text.chars().any(char::is_control) { return None; } Some((field_no, text)) }) .collect() } fn is_strong_username_hint(value: &str) -> bool { value.starts_with("wxid_") || value.ends_with("@chatroom") || value.starts_with("gh_") || value.contains('@') } fn looks_like_username(value: &str) -> bool { let value = value.trim(); if value.is_empty() { return false; } if is_strong_username_hint(value) { return true; } if value.len() < 6 || value.len() > 32 || value.chars().any(char::is_whitespace) { return false; } let mut chars = value.chars(); let Some(first) = chars.next() else { return false; }; first.is_ascii_alphabetic() && chars.all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-') } fn pick_member_username( strings: &[(u64, String)], targets: Option<&HashSet>, ) -> Option { if let Some(targets) = targets { return strings .iter() .find(|(_, value)| targets.contains(value)) .map(|(_, value)| value.clone()); } for field_no in [1u64, 4u64] { if let Some((_, value)) = strings .iter() .find(|(f, value)| *f == field_no && looks_like_username(value)) { return Some(value.clone()); } } strings .iter() .find(|(_, value)| is_strong_username_hint(value)) .or_else(|| strings.iter().find(|(_, value)| looks_like_username(value))) .map(|(_, value)| value.clone()) } fn pick_group_nickname(strings: &[(u64, String)], username: &str) -> Option { let mut best_score = i64::MIN; let mut best = String::new(); for (idx, (field_no, value)) in strings.iter().enumerate() { // In current WeChat 4.x ext_buffer member chunks, field 2 is the group // card/nickname. Field 4 is often another username-like value such as an // inviter/owner and must not be promoted to a nickname. if *field_no != 2 { continue; } let value = value.trim(); if value.is_empty() || value == username || is_strong_username_hint(value) || value.contains('\n') || value.contains('\r') || value.len() > 64 { continue; } let mut score = 0i64; if !looks_like_username(value) { score += 20; } score += (32usize.saturating_sub(value.len())) as i64; score = score * 1000 - idx as i64; if score > best_score { best_score = score; best = value.to_string(); } } if best.is_empty() { None } else { Some(best) } } fn parse_group_nickname_map( ext_buffer: &[u8], targets: Option<&HashSet>, ) -> HashMap { let mut out = HashMap::new(); if ext_buffer.is_empty() { return out; } for (_, chunk) in proto_len_fields(ext_buffer) { let strings = proto_string_fields(chunk); if strings.is_empty() { continue; } let Some(username) = pick_member_username(&strings, targets) else { continue; }; if out.contains_key(&username) { continue; } if let Some(nickname) = pick_group_nickname(&strings, &username) { out.insert(username, nickname); } } out } fn contact_display( uid: &str, nick: &str, remark: &str, names_map: &HashMap, ) -> String { if !remark.is_empty() { remark.to_string() } else if !nick.is_empty() { nick.to_string() } else { names_map .get(uid) .cloned() .unwrap_or_else(|| uid.to_string()) } } fn sender_display( username: &str, fallback_sender_name: &str, names: &HashMap, group_nicknames: &HashMap, ) -> String { if username.is_empty() { return String::new(); } group_nicknames .get(username) .filter(|s| !s.is_empty()) .cloned() .or_else(|| names.get(username).cloned()) .or_else(|| { if fallback_sender_name.is_empty() { None } else { Some(fallback_sender_name.to_string()) } }) .unwrap_or_else(|| username.to_string()) } fn group_top_senders( sender_counts: &HashMap, names: &HashMap, group_nicknames: &HashMap, limit: usize, ) -> Vec { let mut top_senders: Vec = sender_counts .iter() .map(|(username, count)| { let mut row = json!({ "sender": sender_display(username, "", names, group_nicknames), "count": count, }); add_sender_identity(&mut row, true, username, names, group_nicknames); row }) .collect(); top_senders.sort_by(|a, b| { b["count"] .as_i64() .unwrap_or(0) .cmp(&a["count"].as_i64().unwrap_or(0)) .then_with(|| { a["sender"] .as_str() .unwrap_or("") .cmp(b["sender"].as_str().unwrap_or("")) }) }); top_senders.truncate(limit); top_senders } fn sender_username( real_sender_id: i64, content: &str, is_group: bool, chat_username: &str, id2u: &HashMap, ) -> String { let sender_uname = id2u.get(&real_sender_id).cloned().unwrap_or_default(); if !is_group { if !sender_uname.is_empty() && sender_uname != chat_username { return sender_uname; } return String::new(); } if !sender_uname.is_empty() && sender_uname != chat_username { return sender_uname; } if content.contains(":\n") { return content.splitn(2, ":\n").next().unwrap_or("").to_string(); } String::new() } fn add_sender_identity( row: &mut Value, is_group: bool, username: &str, names: &HashMap, group_nicknames: &HashMap, ) { if !is_group || username.is_empty() { return; } row["sender_username"] = Value::String(username.to_string()); row["sender_contact_display"] = Value::String( names.get(username).cloned().unwrap_or_else(|| username.to_string()) ); row["sender_group_nickname"] = Value::String( group_nicknames.get(username).cloned().unwrap_or_default() ); } fn sender_label( real_sender_id: i64, content: &str, is_group: bool, chat_username: &str, id2u: &HashMap, names: &HashMap, group_nicknames: &HashMap, ) -> String { let sender_uname = id2u.get(&real_sender_id).cloned().unwrap_or_default(); if is_group { if !sender_uname.is_empty() && sender_uname != chat_username { return sender_display(&sender_uname, "", names, group_nicknames); } if content.contains(":\n") { let raw = content.splitn(2, ":\n").next().unwrap_or(""); return sender_display(raw, "", names, group_nicknames); } return String::new(); } if !sender_uname.is_empty() && sender_uname != chat_username { return names.get(&sender_uname).cloned().unwrap_or(sender_uname); } String::new() } /// 读取消息内容列(兼容 TEXT 和 BLOB 两种存储类型) /// /// SQLite 中 message_content 在未压缩时为 TEXT,zstd 压缩后为 BLOB。 /// rusqlite 的 Vec FromSql 只接受 BLOB,读 TEXT 会静默返回空。 fn get_content_bytes(row: &rusqlite::Row<'_>, idx: usize) -> Vec { // 先尝试 BLOB,再 fallback 到 TEXT→bytes row.get::<_, Vec>(idx) .or_else(|_| row.get::<_, String>(idx).map(|s| s.into_bytes())) .unwrap_or_default() } fn decompress_message(data: &[u8], ct: i64) -> String { if ct == 4 && !data.is_empty() { // zstd 压缩 if let Ok(dec) = zstd::decode_all(data) { return String::from_utf8_lossy(&dec).into_owned(); } } String::from_utf8_lossy(data).into_owned() } fn decompress_or_str(data: &[u8]) -> String { if data.is_empty() { return String::new(); } // 尝试 zstd 解压 if let Ok(dec) = zstd::decode_all(data) { if let Ok(s) = String::from_utf8(dec) { return s; } } String::from_utf8_lossy(data).into_owned() } fn strip_group_prefix(s: &str) -> String { if s.contains(":\n") { s.splitn(2, ":\n").nth(1).unwrap_or(s).to_string() } else { s.to_string() } } pub fn fmt_type(t: i64) -> String { let base = (t as u64 & 0xFFFFFFFF) as i64; match base { 1 => "文本".into(), 3 => "图片".into(), 34 => "语音".into(), 42 => "名片".into(), 43 => "视频".into(), 47 => "表情".into(), 48 => "位置".into(), 49 => "链接/文件".into(), 50 => "通话".into(), 10000 => "系统".into(), 10002 => "撤回".into(), _ => format!("type={}", base), } } fn fmt_content(local_id: i64, local_type: i64, content: &str, is_group: bool) -> String { let base = (local_type as u64 & 0xFFFFFFFF) as i64; match base { 3 => return format!("[图片] local_id={}", local_id), 34 => return "[语音]".into(), 43 => return "[视频]".into(), 47 => return "[表情]".into(), 50 => return "[通话]".into(), 10000 => return parse_sysmsg(content).unwrap_or_else(|| "[系统消息]".into()), 10002 => return parse_revoke(content).unwrap_or_else(|| "[撤回了一条消息]".into()), _ => {} } let text = if is_group && content.contains(":\n") { content.splitn(2, ":\n").nth(1).unwrap_or(content) } else { content }; if base == 49 && text.contains("...` fn parse_revoke(xml: &str) -> Option { let inner = extract_xml_text(xml, "content")?; // 有时 content 是 "xxx recalled a message" 英文,有时是中文 if inner.is_empty() { return Some("[撤回了一条消息]".into()); } // 尝试简化:如果是 XML 格式的撤回内容,直接显示摘要 Some(format!( "[撤回] {}", inner.chars().take(30).collect::() )) } /// 解析系统消息 XML(群通知等) fn parse_sysmsg(xml: &str) -> Option { // 常见格式:... // 尝试提取 content 标签 if let Some(s) = extract_xml_text(xml, "content") { if !s.is_empty() { return Some(format!("[系统] {}", s.chars().take(50).collect::())); } } // 纯文本系统消息(无 XML) if !xml.starts_with('<') { return Some(format!( "[系统] {}", xml.chars().take(50).collect::() )); } Some("[系统消息]".into()) } fn parse_appmsg(text: &str) -> Option { if let Some(parsed) = parse_appmsg_dom(text) { return Some(parsed); } parse_appmsg_legacy(text) } fn parse_appmsg_dom(text: &str) -> Option { let doc = Document::parse(text).ok()?; let appmsg = doc.descendants().find(|node| node.has_tag_name("appmsg"))?; let title = xml_text(xml_child(appmsg, "title")).unwrap_or_default(); let atype = xml_text(xml_child(appmsg, "type")).unwrap_or_default(); match atype.as_str() { "6" => Some(format_file_appmsg(appmsg, &title)), "19" => Some(format_record_appmsg(appmsg, &title)), _ => None, } } fn parse_appmsg_legacy(text: &str) -> Option { let title = extract_xml_text(text, "title")?; let atype = extract_xml_text(text, "type").unwrap_or_default(); match atype.as_str() { "6" => Some(if !title.is_empty() { format!("[文件] {}", title) } else { "[文件]".into() }), "57" => { let ref_content = quote_refermsg_content(text) .or_else(|| { extract_xml_text(text, "content").and_then(|s| quote_content_text(&s, 40)) }) .unwrap_or_default(); let quote = if !title.is_empty() { format!("[引用] {}", title) } else { "[引用]".into() }; if !ref_content.is_empty() { Some(format!("{}\n \u{21b3} {}", quote, ref_content)) } else { Some(quote) } } "33" | "36" | "44" => Some(if !title.is_empty() { format!("[小程序] {}", title) } else { "[小程序]".into() }), _ => Some(if !title.is_empty() { format!("[链接] {}", title) } else { "[链接/文件]".into() }), } } fn format_file_appmsg<'a, 'input>(appmsg: Node<'a, 'input>, title: &str) -> String { let mut meta = Vec::new(); if let Some(size) = xml_child(appmsg, "appattach") .and_then(|attach| xml_text(xml_child(attach, "totallen"))) .and_then(|value| value.parse::().ok()) .filter(|size| *size > 0) { meta.push(format_byte_size(size)); } if let Some(ext) = xml_child(appmsg, "appattach") .and_then(|attach| xml_text(xml_child(attach, "fileext"))) .filter(|ext| !ext.is_empty()) { meta.push(ext); } let base = if !title.is_empty() { format!("[文件] {}", title) } else { "[文件]".into() }; if meta.is_empty() { base } else { format!("{} ({})", base, meta.join(", ")) } } fn format_record_appmsg<'a, 'input>(appmsg: Node<'a, 'input>, title: &str) -> String { let items = record_item_lines(appmsg); let mut header = if !title.is_empty() { format!("[合并聊天记录] {}", title) } else { "[合并聊天记录]".into() }; if !items.is_empty() { header.push_str(&format!(" ({}条)", items.len())); } let mut lines = vec![header]; if items.is_empty() { if let Some(desc) = xml_text(xml_child(appmsg, "des")).filter(|desc| !desc.is_empty()) { lines.push(format!(" {}", collapse_text(&desc, 120))); } } else { for item in items.iter().take(10) { lines.push(format!(" - {}", item)); } if items.len() > 10 { lines.push(format!(" - ... 还有{}条", items.len() - 10)); } } lines.join("\n") } fn record_item_lines<'a, 'input>(appmsg: Node<'a, 'input>) -> Vec { let mut lines = record_item_lines_from_node(appmsg); if !lines.is_empty() { return lines; } let Some(record_xml) = xml_text(xml_child(appmsg, "recorditem")).filter(|value| !value.is_empty()) else { return Vec::new(); }; let unescaped = unescape_html(&record_xml); for candidate in [&record_xml, &unescaped] { if let Ok(doc) = Document::parse(candidate) { lines = record_item_lines_from_node(doc.root_element()); if !lines.is_empty() { break; } } } lines } fn record_item_lines_from_node<'a, 'input>(node: Node<'a, 'input>) -> Vec { node.descendants() .filter(|child| child.has_tag_name("dataitem")) .filter_map(format_record_item) .collect() } fn format_record_item<'a, 'input>(item: Node<'a, 'input>) -> Option { let name = first_child_text(item, &["sourcename", "datasrcname", "sourceusername"]); let desc = first_child_text(item, &["datadesc", "datatitle", "datafmt"]).or_else(|| { item.attribute("datatype") .and_then(record_datatype_label) .map(str::to_string) })?; let desc = collapse_text(&desc, 100); if let Some(name) = name.filter(|value| !value.is_empty()) { Some(format!("{}: {}", name, desc)) } else { Some(desc) } } fn first_child_text<'a, 'input>(node: Node<'a, 'input>, tags: &[&str]) -> Option { tags.iter() .find_map(|tag| xml_text(xml_child(node, tag))) .filter(|value| !value.is_empty()) } fn record_datatype_label(datatype: &str) -> Option<&'static str> { match datatype { "1" => Some("[文本]"), "2" => Some("[图片]"), "3" => Some("[语音]"), "4" => Some("[视频]"), "6" => Some("[文件]"), "17" => Some("[链接]"), _ => None, } } fn quote_refermsg_content(text: &str) -> Option { let refer = extract_xml_text(text, "refermsg")?; let content = extract_xml_text(&refer, "content") .and_then(|s| quote_content_text(&s, 80)) .or_else(|| { extract_xml_text(&refer, "type") .and_then(|t| quote_refermsg_type_label(&t).map(str::to_string)) })?; match extract_xml_text(&refer, "displayname") { Some(name) if !name.is_empty() => Some(format!("{}: {}", name, content)), _ => Some(content), } } fn quote_content_text(raw: &str, max_chars: usize) -> Option { let unescaped = unescape_html(raw); if unescaped.contains(" Option<&'static str> { match t { "1" => None, "3" => Some("[图片]"), "34" => Some("[语音]"), "43" => Some("[视频]"), "47" => Some("[表情]"), "49" => Some("[链接/文件]"), _ => None, } } fn collapse_text(text: &str, max_chars: usize) -> String { let collapsed = text.split_whitespace().collect::>().join(" "); if collapsed.chars().count() > max_chars { format!( "{}...", collapsed.chars().take(max_chars).collect::() ) } else { collapsed } } fn format_byte_size(bytes: u64) -> String { const KB: f64 = 1024.0; const MB: f64 = KB * 1024.0; const GB: f64 = MB * 1024.0; let bytes_f = bytes as f64; if bytes_f >= GB { format_decimal_unit(bytes_f / GB, "GB") } else if bytes_f >= MB { format_decimal_unit(bytes_f / MB, "MB") } else if bytes_f >= KB { format_decimal_unit(bytes_f / KB, "KB") } else { format!("{} B", bytes) } } fn format_decimal_unit(value: f64, unit: &str) -> String { let mut s = format!("{:.1}", value); if s.ends_with(".0") { s.truncate(s.len() - 2); } format!("{} {}", s, unit) } fn extract_xml_text(xml: &str, tag: &str) -> Option { let open = format!("<{}>", tag); let close = format!("", tag); let start = xml.find(&open)?; let content_start = start + open.len(); let end = xml[content_start..].find(&close)?; Some(xml[content_start..content_start + end].trim().to_string()) } fn appmsg_url_for_message(local_type: i64, content: &str) -> Option { if (local_type as u64 & 0xFFFFFFFF) != 49 { return None; } extract_appmsg_url(content) } fn extract_favorite_url(content: &str) -> Option { let url = extract_xml_text(content, "link").map(|s| unescape_html(strip_xml_cdata(&s)))?; if url.is_empty() || !(url.starts_with("http://") || url.starts_with("https://")) { return None; } Some(url) } fn strip_xml_cdata(s: &str) -> &str { s.strip_prefix("")) .unwrap_or(s) } /// 从 appmsg XML 中提取链接 URL(优先取 ,fallback 到 ) fn extract_appmsg_url(text: &str) -> Option { let xml = strip_group_prefix(text); if !xml.contains(" Option { let open = format!("<{}", tag); let start = xml.find(&open)?; let tag_end = start + xml[start..].find('>')?; let attr_pat = format!(r#"{}=""#, attr); let attr_start = start + xml[start..tag_end].find(&attr_pat)? + attr_pat.len(); let attr_end = attr_start + xml[attr_start..tag_end].find('"')?; let value = xml[attr_start..attr_end].trim(); if value.is_empty() { None } else { Some(value.to_string()) } } fn unescape_html(s: &str) -> String { s.replace("<", "<") .replace(">", ">") .replace("&", "&") .replace(""", "\"") .replace("'", "'") } #[cfg(test)] mod appmsg_tests { use super::*; #[test] fn parse_forwarded_chat_record_expands_record_items() { let xml = r#" 群聊的聊天记录 张三: 早上好 李四: 收到 19 <recordinfo><datalist count="2"><dataitem datatype="1"><sourcename>张三</sourcename><sourcetime>1710000000</sourcetime><datadesc>早上好 &amp; coffee</datadesc></dataitem><dataitem datatype="2"><sourcename>李四</sourcename><sourcetime>1710000060</sourcetime><datafmt>图片</datafmt><datadesc>[图片]</datadesc></dataitem></datalist></recordinfo> "#; assert_eq!( parse_appmsg(xml).as_deref(), Some( "[合并聊天记录] 群聊的聊天记录 (2条)\n - 张三: 早上好 & coffee\n - 李四: [图片]" ) ); } #[test] fn parse_file_appmsg_includes_attachment_metadata() { let xml = r#" report.pdf 6 1536 pdf abcdef123456 "#; assert_eq!( parse_appmsg(xml).as_deref(), Some("[文件] report.pdf (1.5 KB, pdf)") ); } #[test] fn parse_quote_appmsg_reads_refermsg_content() { let xml = r#" 我也没有用ai啊 57 1 不再熬夜 昨天用 claude 爬小红书数据来着 "#; assert_eq!( parse_appmsg(xml).as_deref(), Some("[引用] 我也没有用ai啊\n \u{21b3} 不再熬夜: 昨天用 claude 爬小红书数据来着") ); } #[test] fn query_messages_filters_appmsg_by_base_type() { let path = temp_db_path("query_messages_filters_appmsg_by_base_type"); { let conn = Connection::open(&path).expect("open temp db"); conn.execute( "CREATE TABLE Msg_test ( local_id INTEGER, local_type INTEGER, create_time INTEGER, real_sender_id INTEGER, message_content TEXT, WCDB_CT_message_content INTEGER )", [], ) .expect("create message table"); conn.execute( "INSERT INTO Msg_test VALUES (?1, ?2, ?3, ?4, ?5, ?6)", rusqlite::params![ 1_i64, ((57_i64) << 32) | 49_i64, 1775146911_i64, 0_i64, r#"我也没有用ai啊57不再熬夜昨天用 claude 爬小红书数据来着"#, 0_i64 ], ) .expect("insert quote message"); } let rows = query_messages( &path, "Msg_test", "wxid_r605h38n08mv22", false, &HashMap::new(), &HashMap::new(), None, None, Some(49), 10, 0, ) .expect("query messages"); let _ = std::fs::remove_file(&path); assert_eq!(rows.len(), 1); assert_eq!( rows[0]["content"].as_str(), Some("[引用] 我也没有用ai啊\n \u{21b3} 不再熬夜: 昨天用 claude 爬小红书数据来着") ); } #[test] fn query_messages_includes_stable_group_sender_identity() { let path = temp_db_path("query_messages_includes_stable_group_sender_identity"); { let conn = Connection::open(&path).expect("open temp db"); conn.execute( "CREATE TABLE Name2Id ( user_name TEXT )", [], ) .expect("create Name2Id table"); conn.execute( "INSERT INTO Name2Id(rowid, user_name) VALUES (?1, ?2)", rusqlite::params![42_i64, "wxid_alice"], ) .expect("insert Name2Id row"); conn.execute( "CREATE TABLE Msg_test ( local_id INTEGER, local_type INTEGER, create_time INTEGER, real_sender_id INTEGER, message_content TEXT, WCDB_CT_message_content INTEGER )", [], ) .expect("create message table"); conn.execute( "INSERT INTO Msg_test VALUES (?1, ?2, ?3, ?4, ?5, ?6)", rusqlite::params![ 1_i64, 1_i64, 1775146911_i64, 42_i64, "hello", 0_i64 ], ) .expect("insert text message"); } let names = HashMap::from([("wxid_alice".to_string(), "Alice Contact".to_string())]); let group_nicknames = HashMap::from([("wxid_alice".to_string(), "同名".to_string())]); let rows = query_messages( &path, "Msg_test", "123@chatroom", true, &names, &group_nicknames, None, None, None, 10, 0, ) .expect("query messages"); let _ = std::fs::remove_file(&path); assert_eq!(rows.len(), 1); assert_eq!(rows[0]["sender"].as_str(), Some("同名")); assert_eq!(rows[0]["sender_username"].as_str(), Some("wxid_alice")); assert_eq!(rows[0]["sender_contact_display"].as_str(), Some("Alice Contact")); assert_eq!(rows[0]["sender_group_nickname"].as_str(), Some("同名")); } #[test] fn search_in_table_includes_stable_group_sender_identity() { let conn = Connection::open_in_memory().expect("open in-memory db"); conn.execute( "CREATE TABLE Name2Id ( user_name TEXT )", [], ) .expect("create Name2Id table"); conn.execute( "INSERT INTO Name2Id(rowid, user_name) VALUES (?1, ?2)", rusqlite::params![42_i64, "wxid_alice"], ) .expect("insert Name2Id row"); conn.execute( "CREATE TABLE Msg_test ( local_id INTEGER, local_type INTEGER, create_time INTEGER, real_sender_id INTEGER, message_content TEXT, WCDB_CT_message_content INTEGER )", [], ) .expect("create message table"); conn.execute( "INSERT INTO Msg_test VALUES (?1, ?2, ?3, ?4, ?5, ?6)", rusqlite::params![1_i64, 1_i64, 1775146911_i64, 42_i64, "needle", 0_i64], ) .expect("insert text message"); let names = HashMap::from([("wxid_alice".to_string(), "Alice Contact".to_string())]); let group_nicknames = HashMap::from([("wxid_alice".to_string(), "同名".to_string())]); let rows = search_in_table( &conn, "Msg_test", "123@chatroom", true, &names, &group_nicknames, "needle", None, None, None, 10, ) .expect("search messages"); assert_eq!(rows.len(), 1); assert_eq!(rows[0]["sender"].as_str(), Some("同名")); assert_eq!(rows[0]["sender_username"].as_str(), Some("wxid_alice")); assert_eq!(rows[0]["sender_contact_display"].as_str(), Some("Alice Contact")); assert_eq!(rows[0]["sender_group_nickname"].as_str(), Some("同名")); } /// q_attachments 是异步 + 依赖 DbCache,无法直接 unit-test 整条 pipeline。 /// 这里锁住 attachment row 复用 `add_sender_identity` 后的最终 JSON 形状: /// 两个 group nickname 同为 "同名" 的成员,attachment 行可以通过 sender_username 区分。 #[test] fn attachment_row_gets_stable_group_sender_identity_via_helper() { let names: HashMap = HashMap::from([ ("wxid_alice".to_string(), "Alice Contact".to_string()), ("wxid_bob".to_string(), "Bob Contact".to_string()), ]); let group_nicknames: HashMap = HashMap::from([ ("wxid_alice".to_string(), "同名".to_string()), ("wxid_bob".to_string(), "同名".to_string()), ]); let mut alice_row = json!({ "attachment_id": "abc", "kind": "image", "type": "Image", "local_id": 1, "timestamp": 1775146911, "time": "2026-04-30 12:00", "sender": "同名", }); add_sender_identity(&mut alice_row, true, "wxid_alice", &names, &group_nicknames); assert_eq!(alice_row["sender"].as_str(), Some("同名")); assert_eq!(alice_row["sender_username"].as_str(), Some("wxid_alice")); assert_eq!(alice_row["sender_contact_display"].as_str(), Some("Alice Contact")); assert_eq!(alice_row["sender_group_nickname"].as_str(), Some("同名")); let mut bob_row = json!({ "attachment_id": "def", "kind": "image", "type": "Image", "local_id": 2, "timestamp": 1775146922, "time": "2026-04-30 12:00", "sender": "同名", }); add_sender_identity(&mut bob_row, true, "wxid_bob", &names, &group_nicknames); assert_eq!(bob_row["sender_username"].as_str(), Some("wxid_bob")); // 同样 sender_group_nickname 都是 "同名",但 sender_username 能区分 assert_ne!( alice_row["sender_username"], bob_row["sender_username"], "sender_username 必须区分两位同名成员" ); // 非群 chat 不该追加 identity 字段(行为对齐 history/search/new-messages) let mut private_row = json!({"attachment_id": "ghi", "sender": ""}); add_sender_identity(&mut private_row, false, "wxid_alice", &names, &group_nicknames); assert!(private_row.get("sender_username").is_none()); assert!(private_row.get("sender_contact_display").is_none()); assert!(private_row.get("sender_group_nickname").is_none()); // group 但 sender_username 解析为空(非常老的格式、id2u 没命中、content 也没 wxid_xxx:\n 前缀): // 不要伪造空字段,整段 identity 也不追加 let mut unknown_row = json!({"attachment_id": "jkl", "sender": ""}); add_sender_identity(&mut unknown_row, true, "", &names, &group_nicknames); assert!(unknown_row.get("sender_username").is_none()); assert!(unknown_row.get("sender_contact_display").is_none()); assert!(unknown_row.get("sender_group_nickname").is_none()); } #[test] fn search_in_table_filters_appmsg_by_base_type() { let conn = Connection::open_in_memory().expect("open in-memory db"); conn.execute( "CREATE TABLE Msg_test ( local_id INTEGER, local_type INTEGER, create_time INTEGER, real_sender_id INTEGER, message_content TEXT, WCDB_CT_message_content INTEGER )", [], ) .expect("create message table"); conn.execute( "INSERT INTO Msg_test VALUES (?1, ?2, ?3, ?4, ?5, ?6)", rusqlite::params![ 1_i64, ((57_i64) << 32) | 49_i64, 1775146911_i64, 0_i64, r#"我也没有用ai啊57不再熬夜昨天用 claude 爬小红书数据来着"#, 0_i64 ], ) .expect("insert quote message"); let rows = search_in_table( &conn, "Msg_test", "wxid_r605h38n08mv22", false, &HashMap::new(), &HashMap::new(), "claude", None, None, Some(49), 10, ) .expect("search messages"); assert_eq!(rows.len(), 1); assert_eq!( rows[0]["content"].as_str(), Some("[引用] 我也没有用ai啊\n \u{21b3} 不再熬夜: 昨天用 claude 爬小红书数据来着") ); } #[test] fn search_in_table_matches_decompressed_formatted_appmsg_content() { let conn = Connection::open_in_memory().expect("open in-memory db"); conn.execute( "CREATE TABLE Msg_test ( local_id INTEGER, local_type INTEGER, create_time INTEGER, real_sender_id INTEGER, message_content BLOB, WCDB_CT_message_content INTEGER )", [], ) .expect("create message table"); let xml = r#"我也没有用ai啊57不再熬夜昨天用 claude 爬小红书数据来着"#; let compressed = zstd::encode_all(xml.as_bytes(), 0).expect("compress appmsg xml"); conn.execute( "INSERT INTO Msg_test VALUES (?1, ?2, ?3, ?4, ?5, ?6)", rusqlite::params![ 1_i64, ((57_i64) << 32) | 49_i64, 1775146911_i64, 0_i64, compressed, 4_i64 ], ) .expect("insert compressed quote message"); let rows = search_in_table( &conn, "Msg_test", "wxid_r605h38n08mv22", false, &HashMap::new(), &HashMap::new(), "claude", None, None, Some(49), 10, ) .expect("search messages"); assert_eq!(rows.len(), 1); assert_eq!( rows[0]["content"].as_str(), Some("[引用] 我也没有用ai啊\n \u{21b3} 不再熬夜: 昨天用 claude 爬小红书数据来着") ); } fn temp_db_path(name: &str) -> std::path::PathBuf { let unique = format!( "wx-cli-{}-{}-{}.db", name, std::process::id(), std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .expect("clock before unix epoch") .as_nanos() ); std::env::temp_dir().join(unique) } } fn fmt_time(ts: i64, fmt: &str) -> String { Local .timestamp_opt(ts, 0) .single() .map(|dt| dt.format(fmt).to_string()) .unwrap_or_else(|| ts.to_string()) } // ─── 新增命令查询函数 ────────────────────────────────────────────────────────── /// 查询有未读消息的会话 /// /// `filter`:按 chat_type 过滤,None 或空 Vec 等价于 "all"。 /// 可选值:`private` / `group` / `official` / `folded` / `all`。 /// 多选支持在 CLI 层用逗号分隔后传入多个元素。 pub async fn q_unread( db: &DbCache, names: &Names, limit: usize, filter: Option>, with_meta: bool, debug_source: bool, ) -> Result { let path = db .get("session/session.db") .await? .context("无法解密 session.db")?; // 归一化 filter:小写 + 去除别名。返回 None 代表"不过滤"。 let filter_set: Option> = filter.and_then(|v| { let mut set = std::collections::HashSet::new(); for raw in v { match raw.trim().to_lowercase().as_str() { "" | "all" => return None, "private" => { set.insert("private"); } "group" => { set.insert("group"); } "official" | "official_account" => { set.insert("official_account"); } "folded" | "fold" => { set.insert("folded"); } _ => {} // 未知值忽略,避免拼错导致什么都不返回 } } if set.is_empty() { None } else { Some(set) } }); // 有 filter 时必须全表扫:SQL LIMIT 会把想要的公众号先筛掉。 // 无 filter 时保留 LIMIT,避免重度用户的大量未读会话拖慢默认路径。 let has_filter = filter_set.is_some(); let limit_val = limit; let rows: Vec<(String, i64, Vec, i64, i64, String, String)> = tokio::task::spawn_blocking(move || { let conn = Connection::open(&path)?; let sql = if has_filter { "SELECT username, unread_count, summary, last_timestamp, last_msg_type, last_msg_sender, last_sender_display_name FROM SessionTable WHERE unread_count > 0 ORDER BY last_timestamp DESC" } else { "SELECT username, unread_count, summary, last_timestamp, last_msg_type, last_msg_sender, last_sender_display_name FROM SessionTable WHERE unread_count > 0 ORDER BY last_timestamp DESC LIMIT ?" }; let mut stmt = conn.prepare(sql)?; let map_row = |row: &rusqlite::Row<'_>| { Ok(( row.get::<_, String>(0)?, row.get::<_, i64>(1).unwrap_or(0), get_content_bytes(row, 2), row.get::<_, i64>(3).unwrap_or(0), row.get::<_, i64>(4).unwrap_or(0), row.get::<_, String>(5).unwrap_or_default(), row.get::<_, String>(6).unwrap_or_default(), )) }; let rows = if has_filter { stmt.query_map([], map_row)? .collect::>>()? } else { stmt.query_map([limit_val as i64], map_row)? .collect::>>()? }; Ok::<_, anyhow::Error>(rows) }) .await??; let mut results = Vec::new(); let mut group_nickname_cache: HashMap> = HashMap::new(); for (username, unread, summary_bytes, ts, msg_type, sender, sender_name) in rows { let chat_type = chat_type_of(&username, names); if let Some(ref set) = filter_set { if !set.contains(chat_type) { continue; } } if results.len() >= limit { break; } let display = names.display(&username); let is_group = chat_type == "group"; let summary = decompress_or_str(&summary_bytes); let summary = strip_group_prefix(&summary); let sender_display = if is_group && !sender.is_empty() { if !group_nickname_cache.contains_key(&username) { let nicknames = load_group_nicknames(db, &username) .await .unwrap_or_default(); group_nickname_cache.insert(username.clone(), nicknames); } let empty = HashMap::new(); let group_nicknames = group_nickname_cache.get(&username).unwrap_or(&empty); sender_display(&sender, &sender_name, &names.map, group_nicknames) } else { String::new() }; results.push(json!({ "chat": display, "username": username, "is_group": is_group, "chat_type": chat_type, "unread": unread, "last_msg_type": fmt_type(msg_type), "last_sender": sender_display, "summary": summary, "timestamp": ts, "time": fmt_time(ts, "%m-%d %H:%M"), })); } let total = results.len(); let latest_ts = results .first() .and_then(|v| v.get("timestamp")) .and_then(|v| v.as_i64()); let unknown_shards = current_unknown_shards(db, names); let meta = Meta { chat_latest_timestamp: latest_ts, chat_latest_db: latest_ts.map(|_| "session/session.db".to_string()), session_last_timestamp: None, shards_scanned: 0, shards_hit: 0, unknown_shards: unknown_shards.clone(), status: derive_status(latest_ts, None, &unknown_shards, false), per_shard_latest: if with_meta || debug_source { Some(HashMap::new()) } else { None }, cache_mode_per_shard: None, shard_paths: None, }; Ok(json!({ "sessions": results, "total": total, "meta": meta })) } /// 查询群成员:优先从 contact.db 的 chatroom_member/chat_room 表获取完整列表, /// 若表不存在则退化为从消息记录聚合有发言记录的成员 pub async fn q_members(db: &DbCache, names: &Names, chat: &str) -> Result { let username = resolve_username(chat, names).with_context(|| format!("找不到联系人: {}", chat))?; if !username.contains("@chatroom") { anyhow::bail!("'{}' 不是群聊,无法查看群成员", names.display(&username)); } let display = names.display(&username); let names_map = names.map.clone(); // 优先路径:contact.db → chatroom_member + chat_room(完整成员列表) if let Some(contact_p) = db.get("contact/contact.db").await? { let uname2 = username.clone(); let names_map2 = names_map.clone(); let members_opt: Option> = tokio::task::spawn_blocking(move || { let conn = Connection::open(&contact_p)?; let has_table: bool = conn .query_row( "SELECT 1 FROM sqlite_master WHERE type='table' AND name='chatroom_member'", [], |_| Ok(true), ) .unwrap_or(false); if !has_table { return Ok::<_, anyhow::Error>(None); } // 从 chat_room 表获取整数 room_id 和群主 // WeChat 不同版本列名可能不同:username / chat_room_name / name let (room_id, owner): (i64, String) = [ "SELECT id, owner FROM chat_room WHERE username = ?", "SELECT id, owner FROM chat_room WHERE chat_room_name = ?", "SELECT id, owner FROM chat_room WHERE name = ?", ] .iter() .find_map(|sql| { conn.query_row(sql, [&uname2], |row| { Ok(( row.get::<_, i64>(0)?, row.get::<_, String>(1).unwrap_or_default(), )) }) .ok() }) .unwrap_or((0, String::new())); if room_id == 0 { return Ok::<_, anyhow::Error>(None); } let mut stmt = conn.prepare( "SELECT c.username, c.nick_name, c.remark FROM chatroom_member cm LEFT JOIN contact c ON c.id = cm.member_id WHERE cm.room_id = ?", )?; let raw: Vec<(String, String, String)> = stmt .query_map([room_id], |row| { Ok(( row.get::<_, String>(0).unwrap_or_default(), row.get::<_, String>(1).unwrap_or_default(), row.get::<_, String>(2).unwrap_or_default(), )) })? .filter_map(|r| r.ok()) .filter(|(uid, _, _)| !uid.is_empty()) .collect(); if raw.is_empty() { return Ok(None); } let target_usernames: HashSet = raw.iter().map(|(uid, _, _)| uid.clone()).collect(); let group_nicknames = load_group_nickname_map_from_conn(&conn, &uname2, Some(&target_usernames)); let mut members: Vec = raw .iter() .map(|(uid, nick, remark)| { let contact_display = contact_display(uid, nick, remark, &names_map2); let group_nickname = group_nicknames.get(uid).cloned().unwrap_or_default(); let disp = if group_nickname.is_empty() { contact_display.clone() } else { group_nickname.clone() }; let is_owner = uid == &owner && !owner.is_empty(); json!({ "username": uid, "display": disp, "contact_display": contact_display, "group_nickname": group_nickname, "is_owner": is_owner, }) }) .collect(); // 群主排首位,其余按 display 字典序 members.sort_by(|a, b| { let ao = a["is_owner"].as_bool().unwrap_or(false); let bo = b["is_owner"].as_bool().unwrap_or(false); if ao != bo { return bo.cmp(&ao); } a["display"] .as_str() .unwrap_or("") .cmp(b["display"].as_str().unwrap_or("")) }); Ok(Some(members)) }) .await??; if let Some(members) = members_opt { return Ok(json!({ "chat": display, "username": username, "count": members.len(), "members": members, })); } } // 降级路径:从消息记录中聚合发言过的成员 let tables = find_msg_tables(db, names, &username).await?; if tables.is_empty() { return Ok(json!({ "chat": display, "username": username, "count": 0, "members": [], })); } let mut sender_set: std::collections::HashSet = std::collections::HashSet::new(); for (db_path, table_name) in &tables { let path = db_path.clone(); let tname = table_name.clone(); let uname = username.clone(); let senders: Vec = tokio::task::spawn_blocking(move || { let conn = Connection::open(&path)?; let id2u = load_id2u(&conn); let mut stmt = conn.prepare(&format!( "SELECT DISTINCT real_sender_id FROM [{}] WHERE real_sender_id > 0", tname ))?; let ids: Vec = stmt .query_map([], |row| row.get(0))? .filter_map(|r| r.ok()) .collect(); let senders: Vec = ids .iter() .filter_map(|id| id2u.get(id)) .filter(|u| *u != &uname) .cloned() .collect(); Ok::<_, anyhow::Error>(senders) }) .await??; sender_set.extend(senders); } let group_nicknames = load_group_nicknames(db, &username) .await .unwrap_or_default(); let mut members: Vec = sender_set .iter() .map(|u| { let contact_display = names_map.get(u).cloned().unwrap_or_else(|| u.clone()); let group_nickname = group_nicknames.get(u).cloned().unwrap_or_default(); let display = if group_nickname.is_empty() { contact_display.clone() } else { group_nickname.clone() }; json!({ "username": u, "display": display, "contact_display": contact_display, "group_nickname": group_nickname, "is_owner": false, }) }) .collect(); members.sort_by(|a, b| { a["display"] .as_str() .unwrap_or("") .cmp(b["display"].as_str().unwrap_or("")) }); Ok(json!({ "chat": display, "username": username, "count": members.len(), "members": members, })) } /// 查询新消息:以 session.db 的 last_timestamp 作为 inbox 索引, /// 只查询 last_timestamp > state[username] 的会话,精确且高效 pub async fn q_new_messages( db: &DbCache, names: &Names, state: Option>, limit: usize, with_meta: bool, debug_source: bool, ) -> Result { // 首次运行(state=None)或未见过的会话,用 24h 前作为起点, // 避免第一次运行时把全量历史消息涌入 let fallback_ts = chrono::Utc::now().timestamp() - 86400; // 1. 从 session.db 读取所有会话的当前 last_timestamp let session_path = db .get("session/session.db") .await? .context("无法解密 session.db")?; let all_sessions: Vec<(String, i64)> = tokio::task::spawn_blocking(move || { let conn = Connection::open(&session_path)?; let mut stmt = conn.prepare( "SELECT username, last_timestamp FROM SessionTable WHERE last_timestamp > 0", )?; let rows = stmt .query_map([], |row| { Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1).unwrap_or(0))) })? .collect::>>()?; Ok::<_, anyhow::Error>(rows) }) .await??; // 2. 记录 session.db 的当前快照(用于构建 new_state 基础) let session_ts_map: HashMap = all_sessions .iter() .map(|(u, ts)| (u.clone(), *ts)) .collect(); // 3. 找出有新消息的会话 // 不在 state 中的会话(首次运行或新会话)以 fallback_ts 为基准 let changed: Vec<(String, i64)> = all_sessions .into_iter() .filter(|(uname, ts)| { let last_known = state .as_ref() .and_then(|m| m.get(uname)) .copied() .unwrap_or(fallback_ts); *ts > last_known }) .collect(); let unknown_shards = current_unknown_shards(db, names); if changed.is_empty() { let meta = meta_for_global_query( 0, 0, unknown_shards, true, with_meta, debug_source, Some(HashMap::new()), Some(HashMap::new()), ); return Ok(json!({ "count": 0, "messages": [], "new_state": session_ts_map, "meta": meta, })); } // 4. 只查询有新消息的会话的消息表 // per_table_limit 取 limit*5 防止单表截断,最终由全局 truncate 收尾 let per_table_limit = limit.saturating_mul(5).max(200); let mut all_msgs: Vec = Vec::new(); let mut scanned_rel_keys: HashSet = HashSet::new(); let mut hit_rel_keys: HashSet = HashSet::new(); let mut cache_modes: HashMap = HashMap::new(); let mut shard_paths: HashMap = HashMap::new(); for (uname, _) in &changed { let since_ts = state .as_ref() .and_then(|m| m.get(uname)) .copied() .unwrap_or(fallback_ts); let (shards, _) = find_msg_shards(db, names, uname).await?; if shards.is_empty() { continue; } for shard in &shards { scanned_rel_keys.insert(shard.rel_key.clone()); cache_modes.insert(shard.rel_key.clone(), shard.cache_mode.as_str().to_string()); shard_paths.insert( shard.rel_key.clone(), shard.path.to_string_lossy().into_owned(), ); } let display = names.display(uname); let chat_type = chat_type_of(uname, names); let is_group = chat_type == "group"; let group_nicknames = if is_group { load_group_nicknames(db, uname).await.unwrap_or_default() } else { HashMap::new() }; for shard in &shards { let path = shard.path.clone(); let tname = shard.table.clone(); let uname2 = uname.clone(); let display2 = display.clone(); let names_map = names.map.clone(); let group_nicknames2 = group_nicknames.clone(); let tname_for_log = tname.clone(); let rel_key_for_hit = shard.rel_key.clone(); let msgs: Vec = match tokio::task::spawn_blocking(move || { let conn = Connection::open(&path)?; let id2u = load_id2u(&conn); let sql = format!( "SELECT local_id, local_type, create_time, real_sender_id, message_content, WCDB_CT_message_content FROM [{}] WHERE create_time > ? ORDER BY create_time ASC LIMIT ?", tname ); let rows: Vec<_> = conn .prepare(&sql) .and_then(|mut stmt| { stmt.query_map(rusqlite::params![since_ts, per_table_limit as i64], |row| { Ok(( row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?, row.get::<_, i64>(3)?, get_content_bytes(row, 4), row.get::<_, i64>(5).unwrap_or(0), )) }) .map(|it| it.filter_map(|r| r.ok()).collect()) }) .unwrap_or_default(); let mut result = Vec::new(); for (local_id, local_type, ts, real_sender_id, content_bytes, ct) in rows { let content = decompress_message(&content_bytes, ct); let sender_username = sender_username(real_sender_id, &content, is_group, &uname2, &id2u); let sender = sender_label( real_sender_id, &content, is_group, &uname2, &id2u, &names_map, &group_nicknames2, ); let text = fmt_content(local_id, local_type, &content, is_group); let url = appmsg_url_for_message(local_type, &content); let mut msg = json!({ "chat": display2, "username": uname2, "is_group": is_group, "chat_type": chat_type, "timestamp": ts, "time": fmt_time(ts, "%Y-%m-%d %H:%M"), "sender": sender, "content": text, "type": fmt_type(local_type), }); add_sender_identity(&mut msg, is_group, &sender_username, &names_map, &group_nicknames2); if let Some(u) = url { msg["url"] = serde_json::Value::String(u); } result.push(msg); } Ok::<_, anyhow::Error>(result) }) .await { Ok(Ok(v)) => v, Ok(Err(e)) => { eprintln!("[new-messages] skip {}: {}", tname_for_log, e); continue; } Err(e) => { eprintln!("[new-messages] task error: {}", e); continue; } }; if !msgs.is_empty() { hit_rel_keys.insert(rel_key_for_hit); } all_msgs.extend(msgs); } } all_msgs.sort_by_key(|m| m["timestamp"].as_i64().unwrap_or(0)); all_msgs.truncate(limit); // 5. 重建 new_state,防止全局 limit 截断导致消息永久丢失: // - 未变化的会话:沿用 session.db 的 last_timestamp(即 session_ts_map) // - 变化但全被截断(无消息在最终结果中): // * 后续调用 (state=Some):保留旧 since_ts,下次重试拿这部分消息 // * 首次调用 (state=None):advance 到 session_ts,避免 since_ts 锁死在 // fallback_ts 导致后续每次都回扫 24h。窗口会随调用次数 + 时间累积扩大, // 性能持续衰退。代价:首次 + 被截断会话的老消息看不到,需走 `wx history`。 // - 变化且有消息返回:advance 到该会话在结果中的最大 timestamp(增量 fetch 标准语义) let returned_max_ts: HashMap = { let mut m: HashMap = HashMap::new(); for msg in &all_msgs { if let (Some(u), Some(ts)) = (msg["username"].as_str(), msg["timestamp"].as_i64()) { let e = m.entry(u.to_string()).or_insert(0); if ts > *e { *e = ts; } } } m }; let mut new_state = session_ts_map; for (uname, _) in &changed { let in_results = returned_max_ts.contains_key(uname); let prev = state.as_ref().and_then(|m| m.get(uname)).copied(); let next_ts = match (in_results, prev) { (true, _) => { // 有消息返回:advance 到 returned_max;返回的最大 ts 通常 ≤ session_ts, // 这样下次查 `since > returned_max` 仍能拿到 returned_max..session_ts 的截断尾巴。 returned_max_ts[uname] } (false, Some(prev)) => prev, // 后续 + 截断:保持旧 since (false, None) => { // 首次 + 截断:advance 到 session_ts 兜底,避免 since_ts 锁死。 new_state.get(uname).copied().unwrap_or(fallback_ts) } }; new_state.insert(uname.clone(), next_ts); } let meta = meta_for_global_query( scanned_rel_keys.len(), hit_rel_keys.len(), unknown_shards, true, with_meta, debug_source, Some(cache_modes), Some(shard_paths), ); Ok(json!({ "count": all_msgs.len(), "messages": all_msgs, "new_state": new_state, "meta": meta, })) } /// 查询收藏内容(favorite/favorite.db 的 fav_db_item 表) pub async fn q_favorites( db: &DbCache, limit: usize, fav_type: Option, query: Option, ) -> Result { let path = db .get("favorite/favorite.db") .await? .context("找不到 favorite.db,请确认微信数据目录")?; let rows: Vec = tokio::task::spawn_blocking(move || { let conn = Connection::open(&path)?; let mut clauses: Vec<&'static str> = Vec::new(); let mut params: Vec> = Vec::new(); if let Some(t) = fav_type { clauses.push("type = ?"); params.push(Box::new(t)); } let like_str: Option = query.map(|q| { let esc = q .replace('\\', "\\\\") .replace('%', "\\%") .replace('_', "\\_"); format!("%{}%", esc) }); if let Some(ref s) = like_str { clauses.push("content LIKE ? ESCAPE '\\'"); params.push(Box::new(s.clone())); } let where_clause = if clauses.is_empty() { String::new() } else { format!("WHERE {}", clauses.join(" AND ")) }; params.push(Box::new(limit as i64)); let sql = format!( "SELECT local_id, type, update_time, content, fromusr, realchatname FROM fav_db_item {} ORDER BY update_time DESC LIMIT ?", where_clause ); let params_ref: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|p| p.as_ref()).collect(); let mut stmt = conn.prepare(&sql)?; let rows: Vec = stmt .query_map(params_ref.as_slice(), |row| { Ok(( row.get::<_, i64>(0).unwrap_or(0), row.get::<_, i64>(1).unwrap_or(0), row.get::<_, i64>(2).unwrap_or(0), row.get::<_, String>(3).unwrap_or_default(), row.get::<_, String>(4).unwrap_or_default(), row.get::<_, String>(5).unwrap_or_default(), )) })? .filter_map(|r| r.ok()) .map(|(local_id, ftype, ts, content, fromusr, chatname)| { let type_str = match ftype { 1 => "文本", 2 => "图片", 5 => "文章", 19 => "名片", 20 => "视频", _ => "其他", }; // 安全截断(按 Unicode 字符而非字节) let preview: String = content.chars().take(100).collect(); let preview = if content.chars().count() > 100 { format!("{}...", preview) } else { preview }; // WeChat 部分版本的 update_time 为毫秒,10位以上判定为毫秒后转秒 let ts_secs = if ts > 9_999_999_999 { ts / 1000 } else { ts }; let mut item = json!({ "id": local_id, "type": type_str, "type_num": ftype, "time": fmt_time(ts_secs, "%Y-%m-%d %H:%M"), "timestamp": ts_secs, "preview": preview, "from": fromusr, "chat": chatname, }); if ftype == 5 { if let Some(url) = extract_favorite_url(&content) { item["url"] = Value::String(url); } } item }) .collect(); Ok::<_, anyhow::Error>(rows) }) .await??; Ok(json!({ "count": rows.len(), "items": rows, })) } /// 聊天统计:消息总数、类型分布、发言排行、24小时分布 pub async fn q_stats( db: &DbCache, names: &Names, chat: &str, since: Option, until: Option, with_meta: bool, debug_source: bool, ) -> Result { let username = resolve_username(chat, names).with_context(|| format!("找不到联系人: {}", chat))?; let display = names.display(&username); let chat_type = chat_type_of(&username, names); let is_group = chat_type == "group"; let (shards, scanned) = find_msg_shards(db, names, &username).await?; if shards.is_empty() { anyhow::bail!("找不到 {} 的消息记录", display); } // 跨所有分片 DB 累计统计 let mut total: i64 = 0; let mut type_counts: HashMap = HashMap::new(); let mut sender_counts: HashMap = HashMap::new(); let mut hour_counts = [0i64; 24]; let group_nicknames = if is_group { load_group_nicknames(db, &username) .await .unwrap_or_default() } else { HashMap::new() }; let mut shard_hits = 0usize; for shard in &shards { let path = shard.path.clone(); let tname = shard.table.clone(); let uname = username.clone(); let is_group2 = is_group; // 用 SQL GROUP BY 在数据库侧聚合,避免把全量消息内容加载进内存 let result: (i64, HashMap, HashMap, [i64; 24]) = tokio::task::spawn_blocking(move || { let conn = Connection::open(&path)?; let id2u = load_id2u(&conn); let mut clauses = Vec::new(); let mut params: Vec> = Vec::new(); if let Some(s) = since { clauses.push("create_time >= ?"); params.push(Box::new(s)); } if let Some(u) = until { clauses.push("create_time <= ?"); params.push(Box::new(u)); } let where_clause = if clauses.is_empty() { String::new() } else { format!("WHERE {}", clauses.join(" AND ")) }; let params_ref: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|p| p.as_ref()).collect(); // 1. 总数 let count: i64 = conn.query_row( &format!("SELECT COUNT(*) FROM [{}] {}", tname, where_clause), params_ref.as_slice(), |row| row.get(0), ).unwrap_or(0); // 2. 类型分布:SQL GROUP BY,不加载消息内容 let type_sql = format!( "SELECT (local_type & 0xFFFFFFFF), COUNT(*) FROM [{}] {} GROUP BY (local_type & 0xFFFFFFFF)", tname, where_clause ); let mut type_c: HashMap = HashMap::new(); if let Ok(mut stmt) = conn.prepare(&type_sql) { let _ = stmt.query_map(params_ref.as_slice(), |row| { Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)) }).map(|rows| { for r in rows.flatten() { *type_c.entry(fmt_type(r.0)).or_insert(0) += r.1; } }); } // 3. 小时分布:只取时间戳,不加载消息内容 let hour_sql = format!( "SELECT create_time FROM [{}] {}", tname, where_clause ); let mut hour_c = [0i64; 24]; if let Ok(mut stmt) = conn.prepare(&hour_sql) { let _ = stmt.query_map(params_ref.as_slice(), |row| row.get::<_, i64>(0)) .map(|rows| { for ts in rows.flatten() { if let Some(dt) = Local.timestamp_opt(ts, 0).single() { let h = dt.hour() as usize; if h < 24 { hour_c[h] += 1; } } } }); } // 4. 发言排行:只取 real_sender_id,不加载消息内容 // where_clause 可能已含 WHERE,用 AND 追加而非重复写 WHERE let sender_filter = if where_clause.is_empty() { "WHERE real_sender_id > 0".to_string() } else { format!("{} AND real_sender_id > 0", where_clause) }; let sender_sql = format!( "SELECT real_sender_id, COUNT(*) FROM [{}] {} GROUP BY real_sender_id", tname, sender_filter ); let mut sender_c: HashMap = HashMap::new(); if is_group2 { if let Ok(mut stmt) = conn.prepare(&sender_sql) { let _ = stmt.query_map(params_ref.as_slice(), |row| { Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)) }).map(|rows| { for (id, cnt) in rows.flatten() { if let Some(u) = id2u.get(&id) { if u != &uname { *sender_c.entry(u.clone()).or_insert(0) += cnt; } } } }); } } Ok::<_, anyhow::Error>((count, type_c, sender_c, hour_c)) }).await??; let (count, type_c, sender_c, hour_c) = result; if count > 0 { shard_hits += 1; } total += count; for (k, v) in type_c { *type_counts.entry(k).or_insert(0) += v; } for (k, v) in sender_c { *sender_counts.entry(k).or_insert(0) += v; } for i in 0..24 { hour_counts[i] += hour_c[i]; } } // 类型分布,按数量降序 let mut by_type: Vec = type_counts .iter() .map(|(t, c)| json!({ "type": t, "count": c })) .collect(); by_type.sort_by_key(|v| std::cmp::Reverse(v["count"].as_i64().unwrap_or(0))); // 发言排行,Top 10 let top_senders = group_top_senders(&sender_counts, &names.map, &group_nicknames, 10); // 24小时分布 let by_hour: Vec = hour_counts .iter() .enumerate() .map(|(h, c)| json!({ "hour": h, "count": c })) .collect(); let windowed = since.is_some() || until.is_some(); let unknown_shards = current_unknown_shards(db, names); let session_ts = session_last_timestamp(db, &username).await; let meta = meta_for_shards( scanned, &shards, shard_hits, unknown_shards, session_ts, windowed, with_meta, debug_source, ); Ok(json!({ "chat": display, "username": username, "is_group": is_group, "chat_type": chat_type, "total": total, "by_type": by_type, "top_senders": top_senders, "by_hour": by_hour, "meta": meta, })) } /// 查询朋友圈互动通知(点赞 + 评论),对应微信 app 右上角的红点入口。 /// 空 `content` 是点赞,非空是评论正文。 pub async fn q_sns_notifications( db: &DbCache, names: &Names, limit: usize, since: Option, until: Option, include_read: bool, ) -> Result { let path = db.get("sns/sns.db").await?.context("无法解密 sns.db")?; let path2 = path.clone(); type Row = (i64, i64, i64, i64, String, String, String); let rows: Vec = tokio::task::spawn_blocking(move || { let conn = Connection::open(&path2)?; let mut clauses: Vec<&str> = Vec::new(); let mut params: Vec> = Vec::new(); if !include_read { clauses.push("is_unread = 1"); } if let Some(s) = since { clauses.push("create_time >= ?"); params.push(Box::new(s)); } if let Some(u) = until { clauses.push("create_time <= ?"); params.push(Box::new(u)); } let where_clause = if clauses.is_empty() { String::new() } else { format!("WHERE {}", clauses.join(" AND ")) }; let sql = format!( "SELECT local_id, create_time, type, feed_id, from_username, from_nickname, content FROM SnsMessage_tmp3 {} ORDER BY create_time DESC LIMIT ?", where_clause ); params.push(Box::new(limit as i64)); let params_ref: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|p| p.as_ref()).collect(); let mut stmt = conn.prepare(&sql)?; let rows = stmt .query_map(params_ref.as_slice(), |row| { Ok(( row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2).unwrap_or(0), row.get::<_, i64>(3).unwrap_or(0), row.get::<_, String>(4).unwrap_or_default(), row.get::<_, String>(5).unwrap_or_default(), row.get::<_, String>(6).unwrap_or_default(), )) })? .collect::>>()?; Ok::<_, anyhow::Error>(rows) }) .await??; // 一次性取出涉及的 feed 原帖,避免 N+1 查询 let feed_ids: Vec = { let mut v: Vec = rows.iter().map(|r| r.3).collect(); v.sort_unstable(); v.dedup(); v }; let path3 = path.clone(); let feed_ids_clone = feed_ids.clone(); let feeds: HashMap = tokio::task::spawn_blocking(move || { if feed_ids_clone.is_empty() { return Ok::<_, anyhow::Error>(HashMap::new()); } let conn = Connection::open(&path3)?; let placeholders = std::iter::repeat("?") .take(feed_ids_clone.len()) .collect::>() .join(","); let sql = format!( "SELECT tid, user_name, content FROM SnsTimeLine WHERE tid IN ({})", placeholders ); let params: Vec<&dyn rusqlite::types::ToSql> = feed_ids_clone .iter() .map(|id| id as &dyn rusqlite::types::ToSql) .collect(); let mut stmt = conn.prepare(&sql)?; let mut map = HashMap::new(); let mut rows2 = stmt.query(params.as_slice())?; while let Some(row) = rows2.next()? { let tid: i64 = row.get(0)?; let author: String = row.get::<_, String>(1).unwrap_or_default(); let content: String = row.get::<_, String>(2).unwrap_or_default(); let preview = extract_xml_text(&content, "contentDesc") .map(|s| s.chars().take(60).collect::()) .unwrap_or_default(); // 原帖 user_name 偶尔为空(转发帖),再从 XML 兜一下 let author = if author.is_empty() { extract_xml_text(&content, "username").unwrap_or_default() } else { author }; map.insert(tid, (author, preview)); } Ok(map) }) .await??; let mut out = Vec::with_capacity(rows.len()); for (_local_id, ct, _typ, fid, from_u, from_nick, content) in rows { let kind = if content.trim().is_empty() { "like" } else { "comment" }; let display = if !from_nick.is_empty() { from_nick.clone() } else { names.display(&from_u) }; let (feed_author_u, feed_preview) = feeds.get(&fid).cloned().unwrap_or_default(); let feed_author_display = if feed_author_u.is_empty() { String::new() } else { names.display(&feed_author_u) }; out.push(json!({ "type": kind, "time": fmt_time(ct, "%m-%d %H:%M"), "timestamp": ct, "from_username": from_u, "from_nickname": display, "content": content, "feed_id": fid, "feed_author_username": feed_author_u, "feed_author": feed_author_display, "feed_preview": feed_preview, })); } let total = out.len(); Ok(json!({ "notifications": out, "total": total })) } // 朋友圈扫描的硬上限:单次查询最多解析这么多行 SnsTimeLine, // 防止用户传超大 limit 或者底层数据异常时把 daemon 卡住。 // 当前账号 ~10k+ 帖子,5w 上限留足缓冲。 const SNS_MAX_LIMIT: usize = 10_000; const SNS_MAX_SCAN: usize = 50_000; /// 转义 SQL LIKE 模式中的元字符。配合 `ESCAPE '\\'` 使用。 /// 反斜杠必须最先转义,否则后续替换出的 `\%` / `\_` 会被再次吞掉。 fn escape_like_pattern(s: &str) -> String { s.replace('\\', r"\\") .replace('%', r"\%") .replace('_', r"\_") } fn xml_child<'a, 'input>(node: Node<'a, 'input>, tag: &str) -> Option> { node.children() .find(|child| child.is_element() && child.has_tag_name(tag)) } fn xml_text<'a, 'input>(node: Option>) -> Option { node.and_then(|n| n.text()) .map(str::trim) .filter(|s| !s.is_empty()) .map(str::to_string) } fn xml_attr<'a, 'input>(node: Option>, attr: &str) -> Option { node.and_then(|n| n.attribute(attr)) .map(str::trim) .filter(|s| !s.is_empty()) .map(str::to_string) } fn insert_media_string(out: &mut serde_json::Map, key: &str, value: Option) { if let Some(value) = value { out.insert(key.to_string(), Value::String(value)); } } fn insert_media_i64(out: &mut serde_json::Map, key: &str, value: Option) { if let Some(value) = value { out.insert(key.to_string(), Value::from(value)); } } /// 从已经定位到的 `` 节点里抽 `/` 数组。 /// 字段名与 artifacts 仓库 `wechat_sns_dump.py::_parse_media` 对齐, /// 便于跨实现 diff。缺失字段直接省略(不输出 null),供下游代理图片 / 离线渲染。 fn parse_media_from_timeline(timeline: Node) -> Vec { let Some(media_list) = xml_child(timeline, "ContentObject").and_then(|node| xml_child(node, "mediaList")) else { return Vec::new(); }; media_list .children() .filter(|node| node.is_element() && node.has_tag_name("media")) .map(|media| { let url_el = xml_child(media, "url"); let thumb_el = xml_child(media, "thumb"); let size_el = xml_child(media, "size"); let mut out = serde_json::Map::new(); insert_media_string(&mut out, "type", xml_text(xml_child(media, "type"))); insert_media_string(&mut out, "sub_type", xml_text(xml_child(media, "sub_type"))); insert_media_string(&mut out, "url", xml_text(url_el)); insert_media_string(&mut out, "thumb", xml_text(thumb_el)); insert_media_string(&mut out, "md5", xml_attr(url_el, "md5")); insert_media_string(&mut out, "url_key", xml_attr(url_el, "key")); insert_media_string(&mut out, "url_token", xml_attr(url_el, "token")); insert_media_string(&mut out, "url_enc_idx", xml_attr(url_el, "enc_idx")); insert_media_string(&mut out, "thumb_key", xml_attr(thumb_el, "key")); insert_media_string(&mut out, "thumb_token", xml_attr(thumb_el, "token")); insert_media_string(&mut out, "thumb_enc_idx", xml_attr(thumb_el, "enc_idx")); insert_media_i64( &mut out, "width", xml_attr(size_el, "width").and_then(|v| v.parse::().ok()), ); insert_media_i64( &mut out, "height", xml_attr(size_el, "height").and_then(|v| v.parse::().ok()), ); insert_media_i64( &mut out, "total_size", xml_attr(size_el, "totalSize").and_then(|v| v.parse::().ok()), ); insert_media_string( &mut out, "video_md5", xml_text(xml_child(media, "videomd5")), ); insert_media_i64( &mut out, "video_duration", xml_text(xml_child(media, "videoDuration")).and_then(|v| v.parse::().ok()), ); Value::Object(out) }) .collect() } /// 从 `SnsTimeLine.content` 整段 XML 抽 media[]。仅供单测使用 —— 生产路径走 /// `parse_post_xml`,那边已经把整份 doc parse 一次直接复用 timeline 节点。 #[cfg(test)] fn parse_post_media(xml: &str) -> Vec { let Ok(doc) = Document::parse(xml) else { return Vec::new(); }; let Some(timeline) = doc.descendants().find(|n| n.has_tag_name("TimelineObject")) else { return Vec::new(); }; parse_media_from_timeline(timeline) } /// SnsTimeLine 行解析产物。不含 display name(依赖 Names,需要出 spawn_blocking 再补)。 struct ParsedPost { tid: i64, create_time: i64, author_username: String, content: String, media: Vec, location: String, } fn parse_post_xml_fallback(tid: i64, user_name_column: &str, content: &str) -> ParsedPost { let create_time = extract_xml_text(content, "createTime") .and_then(|s| s.parse::().ok()) .unwrap_or(0); let text = extract_xml_text(content, "contentDesc") .map(|s| unescape_html(&s)) .unwrap_or_default(); let author_username = if user_name_column.is_empty() { extract_xml_text(content, "username") .map(|s| unescape_html(&s)) .unwrap_or_default() } else { user_name_column.to_string() }; let location = extract_xml_attr(content, "location", "poiName") .map(|s| unescape_html(&s)) .unwrap_or_default(); ParsedPost { tid, create_time, author_username, content: text, media: Vec::new(), location, } } /// 纯 XML 解析,无 Names 依赖,可以在 spawn_blocking 里跑。 /// user_name_column 为空时从 TimelineObject/ 兜底(转发帖)。 /// /// 单 roxmltree DOM 解析一次出全部字段(createTime / contentDesc / username / media / location), /// 取代旧版 regex + DOM 双解析。XML entity 解码(`<` / `&` 等)由 roxmltree 自动处理, /// 旧版 `extract_xml_text` 是字符串扫描不解码 —— 因此 `content` / `location` / `username` 字段 /// 现在会输出解码后的文本,对下游是更正确的语义。 /// 如果 XML 已损坏到无法 DOM parse,或缺少 `TimelineObject`,则退回轻量 string /// fallback,尽量保住 createTime / contentDesc / username / location,避免一条帖子 /// 因为局部坏 XML 被整体打成零值,影响排序 / 搜索 / 作者过滤语义。 fn parse_post_xml(tid: i64, user_name_column: &str, content: &str) -> ParsedPost { let Ok(doc) = Document::parse(content) else { return parse_post_xml_fallback(tid, user_name_column, content); }; let Some(timeline) = doc.descendants().find(|n| n.has_tag_name("TimelineObject")) else { return parse_post_xml_fallback(tid, user_name_column, content); }; let create_time = xml_text(xml_child(timeline, "createTime")) .and_then(|s| s.parse::().ok()) .unwrap_or(0); let text = xml_text(xml_child(timeline, "contentDesc")).unwrap_or_default(); let author_username = if user_name_column.is_empty() { xml_text(xml_child(timeline, "username")).unwrap_or_default() } else { user_name_column.to_string() }; let media = parse_media_from_timeline(timeline); let location = xml_child(timeline, "location") .and_then(|n| n.attribute("poiName")) .map(str::to_string) .unwrap_or_default(); ParsedPost { tid, create_time, author_username, content: text, media, location, } } fn post_to_value(p: ParsedPost, names: &Names) -> Value { let author = if p.author_username.is_empty() { String::new() } else { names.display(&p.author_username) }; json!({ "tid": p.tid, "timestamp": p.create_time, "time": fmt_time(p.create_time, "%Y-%m-%d %H:%M"), "author_username": p.author_username, "author": author, "content": p.content, "media_count": p.media.len() as i64, "media": p.media, "location": p.location, }) } /// 查询朋友圈时间线:按时间/作者筛选。用于浏览自己或好友的朋友圈。 pub async fn q_sns_feed( db: &DbCache, names: &Names, limit: usize, since: Option, until: Option, user: Option<&str>, ) -> Result { let path = db.get("sns/sns.db").await?.context("无法解密 sns.db")?; let limit = limit.min(SNS_MAX_LIMIT); let user_uname = match user { Some(q) => { Some(resolve_username(q, names).with_context(|| format!("找不到联系人: {}", q))?) } None => None, }; // user 过滤不在 SQL 层做:SnsTimeLine.user_name 列对部分(转发)帖子是空, // 真正作者只在 XML 里。SQL 层 `user_name = ?` 会把这部分提前漏掉, // 让 parse_post_xml 的 fallback 失效。所以扫全表 → parse → 用 ParsedPost.author_username 过滤。 // (createTime 也不是列,本来就要扫全表 parse XML 才能正确按时间排序。) let path2 = path.clone(); let parsed: Vec = tokio::task::spawn_blocking(move || { let conn = Connection::open(&path2)?; let sql = "SELECT tid, user_name, content FROM SnsTimeLine ORDER BY tid DESC"; let mut stmt = conn.prepare(sql)?; let rows = stmt.query_map([], |row| Ok(( row.get::<_, i64>(0)?, row.get::<_, String>(1).unwrap_or_default(), row.get::<_, String>(2).unwrap_or_default(), )))?; let mut scanned = 0usize; let mut out: Vec = Vec::new(); for row in rows { scanned += 1; if scanned > SNS_MAX_SCAN { eprintln!( "[sns_feed] scan 超过硬上限 {},结果可能不完整。建议加 --user / --since 缩小范围。", SNS_MAX_SCAN ); break; } let (tid, uname, content) = row?; let p = parse_post_xml(tid, &uname, &content); if let Some(u) = user_uname.as_ref() { if &p.author_username != u { continue; } } if let Some(s) = since { if p.create_time < s { continue; } } if let Some(u) = until { if p.create_time > u { continue; } } out.push(p); } // tid DESC 不严格等于 createTime DESC(不同账号 tid 生成算法不同), // 所以要先收齐全部匹配的、按 create_time 排序,再 truncate —— 否则会丢帖。 out.sort_by_key(|p| std::cmp::Reverse(p.create_time)); out.truncate(limit); Ok::<_, anyhow::Error>(out) }).await??; let posts: Vec = parsed .into_iter() .map(|p| post_to_value(p, names)) .collect(); let total = posts.len(); Ok(json!({ "posts": posts, "total": total })) } /// 搜索朋友圈全文:在 contentDesc(正文)里匹配 keyword,可叠加时间 / 作者过滤。 pub async fn q_sns_search( db: &DbCache, names: &Names, keyword: &str, limit: usize, since: Option, until: Option, user: Option<&str>, ) -> Result { if keyword.trim().is_empty() { anyhow::bail!("搜索关键词不能为空"); } let path = db.get("sns/sns.db").await?.context("无法解密 sns.db")?; let limit = limit.min(SNS_MAX_LIMIT); let user_uname = match user { Some(q) => { Some(resolve_username(q, names).with_context(|| format!("找不到联系人: {}", q))?) } None => None, }; // SQL LIKE 在 content 上粗筛 keyword(这步省掉绝大多数行的 XML parse 开销)。 // user 不在 SQL 层过滤,原因同 q_sns_feed:SnsTimeLine.user_name 列对部分(转发) // 帖子为空,真实作者只在 XML 里。 let like_pattern = format!("%{}%", escape_like_pattern(keyword)); let keyword_owned = keyword.to_string(); let path2 = path.clone(); let parsed: Vec = tokio::task::spawn_blocking(move || { let conn = Connection::open(&path2)?; let sql = "SELECT tid, user_name, content FROM SnsTimeLine \ WHERE content LIKE ? ESCAPE '\\' ORDER BY tid DESC"; let mut stmt = conn.prepare(sql)?; let rows = stmt.query_map([&like_pattern], |row| Ok(( row.get::<_, i64>(0)?, row.get::<_, String>(1).unwrap_or_default(), row.get::<_, String>(2).unwrap_or_default(), )))?; let needle = keyword_owned.to_lowercase(); let mut scanned = 0usize; let mut out: Vec = Vec::new(); for row in rows { scanned += 1; if scanned > SNS_MAX_SCAN { eprintln!( "[sns_search] scan 超过硬上限 {},结果可能不完整。建议缩小 keyword 或加 --user / --since。", SNS_MAX_SCAN ); break; } let (tid, uname, content) = row?; let desc = extract_xml_text(&content, "contentDesc").unwrap_or_default(); if !desc.to_lowercase().contains(&needle) { continue; } let p = parse_post_xml(tid, &uname, &content); if let Some(u) = user_uname.as_ref() { if &p.author_username != u { continue; } } if let Some(s) = since { if p.create_time < s { continue; } } if let Some(u) = until { if p.create_time > u { continue; } } out.push(p); } out.sort_by_key(|p| std::cmp::Reverse(p.create_time)); out.truncate(limit); Ok::<_, anyhow::Error>(out) }).await??; let posts: Vec = parsed .into_iter() .map(|p| post_to_value(p, names)) .collect(); let total = posts.len(); Ok(json!({ "keyword": keyword, "posts": posts, "total": total })) } // ─── 公众号文章查询 ─────────────────────────────────────────────────────────── /// 一条公众号文章的解析产物 #[derive(Debug)] struct BizArticle { /// 接收该推送的时间戳(即消息的 create_time) recv_time: i64, /// 公众号 username account_username: String, /// 文章标题 title: String, /// 文章链接 url: String, /// 摘要 digest: String, /// 封面图 cover: String, /// 文章发布时间(pub_time,单位秒) pub_time: i64, } /// 从 biz_message 表的单条 XML 解析出全部 article items fn parse_biz_xml_items(recv_time: i64, account_username: &str, xml: &str) -> Vec { let mut items = Vec::new(); let mut search_from = 0; loop { let Some(item_start) = xml[search_from..].find("") else { break; }; let abs_start = search_from + item_start; let Some(item_end) = xml[abs_start..].find("") else { break; }; let abs_end = abs_start + item_end + 7; let item_xml = &xml[abs_start..abs_end]; let title = extract_cdata(item_xml, "title").unwrap_or_default(); let url = extract_cdata(item_xml, "url").unwrap_or_default(); // Skip items with no URL or empty title (e.g. payment entries) if url.is_empty() || title.is_empty() { search_from = abs_end; continue; } let digest = extract_cdata(item_xml, "digest").unwrap_or_default(); let cover = extract_cdata(item_xml, "cover").unwrap_or_default(); let pub_time = extract_xml_text(item_xml, "pub_time") .and_then(|s| s.parse::().ok()) .unwrap_or(recv_time); items.push(BizArticle { recv_time, account_username: account_username.to_string(), title, url, digest, cover, pub_time, }); search_from = abs_end; } items } /// 提取 CDATA 或普通文本内容: `` 或 `...` /// /// 注意: 内容匹配到 `` 之前的内容。CDATA 块中的 "]]"已在 "]]\x3e" 之前, /// 所以 inner 为 `` 或 `" 被 close tag 吸掉) fn extract_cdata(xml: &str, tag: &str) -> Option { let open = format!("<{}>", tag); let close = format!("", tag); let start = xml.find(&open)? + open.len(); let end = xml[start..].find(&close)?; let inner = xml[start..start + end].trim(); if inner.starts_with("` → strip 9-char `` suffix let body = &inner[9..]; // Strip `]]>` (normal) or `]]` (edge case) let cdata_end = b"]]>"; let cdata_end2 = b"]]"; let content: &str = if body.as_bytes().ends_with(cdata_end) { &body[..body.len() - 3] } else if body.as_bytes().ends_with(cdata_end2) { &body[..body.len() - 2] } else { body }; let content = content.trim(); if content.is_empty() { None } else { Some(content.to_string()) } } else if inner.is_empty() { None } else { Some(unescape_html(inner)) } } /// 查询公众号文章推送(biz_message_*.db 分片) /// /// 每条消息可能包含多篇文章(多图文推送)。返回所有文章展开就的平底列表。 pub async fn q_biz_articles( db: &DbCache, names: &Names, limit: usize, account: Option, since: Option, until: Option, unread: bool, ) -> Result { let mut biz_paths = Vec::new(); for rel_key in &names.biz_msg_db_keys { if let Some(path) = db.get(rel_key).await? { biz_paths.push(path); } } if biz_paths.is_empty() { return Err(anyhow::anyhow!( "无法解密任何 biz_message_*.db,请确认 all_keys.json 包含对应密钥" )); } // 开启 --unread:从 session.db 拿“公众号 + unread_count>0”的 username 子集, // 作为合集过滤(与 --account 取交集),后续结果按 account_username 去重取顶 1 篇。 let unread_usernames: Option> = if unread { let session_path = db .get("session/session.db") .await? .context("无法解密 session.db")?; let session_path2 = session_path.clone(); let unread_rows: Vec = tokio::task::spawn_blocking(move || { let conn = Connection::open(&session_path2)?; let mut stmt = conn.prepare("SELECT username FROM SessionTable WHERE unread_count > 0")?; let rows: Vec = stmt .query_map([], |row| row.get::<_, String>(0))? .filter_map(|r| r.ok()) .collect(); Ok::<_, anyhow::Error>(rows) }) .await??; // 仅保留公众号类型的未读会话 let set: std::collections::HashSet = unread_rows .into_iter() .filter(|u| chat_type_of(u, names) == "official_account") .collect(); if set.is_empty() { // 没有未读公众号 → 直接空返回,避免打 biz 表扫描 return Ok(json!({ "count": 0, "articles": [] })); } Some(set) } else { None }; // 1. 从全部 biz shard 的 Name2Id 表收集 username,再推导 md5 -> username let biz_paths2 = biz_paths.clone(); let biz_usernames: HashSet = tokio::task::spawn_blocking(move || { let mut usernames = HashSet::new(); for biz_path in biz_paths2 { let conn = Connection::open(&biz_path)?; let mut stmt = conn.prepare( "SELECT DISTINCT user_name FROM Name2Id \ WHERE user_name IS NOT NULL AND user_name != ''", )?; let rows: Vec = stmt .query_map([], |row| row.get::<_, String>(0))? .filter_map(|r| r.ok()) .collect(); usernames.extend(rows); } Ok::<_, anyhow::Error>(usernames) }) .await??; // 构建 md5(username) -> username 映射 let md5_to_uname: HashMap = biz_usernames .iter() .map(|u| (format!("{:x}", md5::compute(u.as_bytes())), u.clone())) .collect(); // 2. 如果 指定了 --account,找到匹配的 username 列表 let account_low = account.as_deref().map(|s| s.to_lowercase()); let mut target_usernames: Option> = account_low.as_ref().map(|low| { biz_usernames .iter() .filter(|u| { let display = names.display(u); display.to_lowercase().contains(low.as_str()) || u.to_lowercase().contains(low.as_str()) }) .cloned() .collect() }); // --unread 与 --account 取交集(进一步缩小范围) if let Some(ref unread_set) = unread_usernames { target_usernames = Some(match target_usernames.take() { Some(acc_list) => acc_list .into_iter() .filter(|u| unread_set.contains(u)) .collect(), None => unread_set.iter().cloned().collect(), }); // 交集为空 → 提前返回 if target_usernames .as_ref() .map(|v| v.is_empty()) .unwrap_or(false) { return Ok(json!({ "count": 0, "articles": [] })); } } // 3. 进行数据库查询 let biz_paths3 = biz_paths; let since2 = since; let until2 = until; let target_hashes: Option> = target_usernames.as_ref().map(|unames| { unames .iter() .map(|u| format!("{:x}", md5::compute(u.as_bytes()))) .collect() }); let rows: Vec<(String, i64, i64, Vec, i64)> = tokio::task::spawn_blocking(move || { let re = regex::Regex::new(r"^Msg_[0-9a-f]{32}$").unwrap(); let mut all_rows: Vec<(String, i64, i64, Vec, i64)> = Vec::new(); for biz_path in biz_paths3 { let conn = Connection::open(&biz_path)?; let mut stmt = conn.prepare( "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'Msg_%'", )?; let table_names: Vec = stmt .query_map([], |row| row.get(0))? .filter_map(|r| r.ok()) .collect(); for tname in &table_names { if !re.is_match(tname) { continue; } let hash = &tname[4..]; // account 过滤 if let Some(ref hashes) = target_hashes { if !hashes.iter().any(|h| h == hash) { continue; } } let username = md5_to_uname.get(hash).cloned().unwrap_or_default(); // 构建过滤条件 let mut clauses: Vec = Vec::new(); let mut params: Vec> = Vec::new(); // local_type & 0xFFFFFFFF = 49 是 appmsg(公众号文章) clauses.push("(local_type & 4294967295) = 49".to_string()); if let Some(s) = since2 { clauses.push("create_time >= ?".to_string()); params.push(Box::new(s)); } if let Some(u) = until2 { clauses.push("create_time <= ?".to_string()); params.push(Box::new(u)); } let where_clause = format!("WHERE {}", clauses.join(" AND ")); let sql = format!( "SELECT create_time, WCDB_CT_message_content, message_content \ FROM [{}] {} ORDER BY create_time DESC", tname, where_clause ); let params_ref: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|p| p.as_ref()).collect(); if let Ok(mut inner_stmt) = conn.prepare(&sql) { let msg_rows: Vec<_> = inner_stmt .query_map(params_ref.as_slice(), |row| { Ok(( username.clone(), row.get::<_, i64>(0)?, row.get::<_, i64>(1).unwrap_or(0), get_content_bytes(row, 2), 0i64, )) }) .map(|it| it.filter_map(|r| r.ok()).collect()) .unwrap_or_default(); all_rows.extend(msg_rows); } } } Ok::<_, anyhow::Error>(all_rows) }) .await??; // 4. 解压并解析 XML let mut articles: Vec = Vec::new(); for (username, recv_time, ct, content_bytes, _) in rows { let content = decompress_message(&content_bytes, ct); if content.is_empty() { continue; } let items = parse_biz_xml_items(recv_time, &username, &content); articles.extend(items); } // 5. 按 pub_time DESC 排序 articles.sort_by_key(|a| std::cmp::Reverse(a.pub_time)); // --unread 语义 A:每个公众号只保留最新 1 篇(已按 pub_time 排序,取首条即可) if unread { let mut seen = std::collections::HashSet::::new(); articles.retain(|a| seen.insert(a.account_username.clone())); } articles.truncate(limit); let results: Vec = articles .into_iter() .map(|a| { let account_display = names.display(&a.account_username); json!({ "time": fmt_time(a.pub_time, "%Y-%m-%d %H:%M"), "timestamp": a.pub_time, "recv_time": a.recv_time, "recv_time_str": fmt_time(a.recv_time, "%Y-%m-%d %H:%M"), "account": account_display, "account_username": a.account_username, "title": a.title, "url": a.url, "digest": a.digest, "cover_url": a.cover, }) }) .collect(); Ok(json!({ "count": results.len(), "articles": results })) } // ─── 附件(当前先支持图片)查询与提取 ───────────────────────────────── // // 设计要点: // - `q_attachments` 只走 `Msg_` 表,按 `local_type & 0xFFFFFFFF IN (...)` 过滤 // 出附件消息行,再编出 `attachment_id`。**不**去翻 `message_resource.db`,因为列出动作 // 要可枚举几千条;resource lookup 留到 `q_extract` 才做。 // - `q_extract` 走完整链:`AttachmentId` → `message_resource.db` 查 md5 → // `/msg/attach/...` 找 .dat → 按 magic 分发到 v1/v2 decoder → 写盘。 // - V2 image AES key 通过 `image_key::default_provider()` 拿(codex 后续填实现)。 // 缺 key 时 V2 解码会返回明确错误,CLI 直接抛给用户。 /// 列出某会话内的附件消息(当前仅 image)。返回每条的 `attachment_id`, /// 后续传给 `Extract` 才真正读 message_resource.db + 解密 .dat。 pub async fn q_attachments( db: &DbCache, names: &Names, chat: &str, kinds: Option>, limit: usize, offset: usize, since: Option, until: Option, with_meta: bool, debug_source: bool, ) -> Result { use crate::attachment::{AttachmentId, AttachmentKind}; let username = resolve_username(chat, names).with_context(|| format!("找不到联系人: {}", chat))?; let display = names.display(&username); let chat_type = chat_type_of(&username, names); let is_group = chat_type == "group"; // 解析 kinds → 低 32 bit local_type 集合 let kind_filters: Vec<(AttachmentKind, i64)> = parse_attachment_kinds(kinds.as_deref())?; if kind_filters.is_empty() { anyhow::bail!("kinds 为空 — 当前至少传一种 image"); } let lo32_types: Vec = kind_filters.iter().map(|(_, t)| *t).collect(); // local_type → AttachmentKind 反查(mask 完后定 kind) let type_to_kind: HashMap = kind_filters.iter().map(|(k, t)| (*t, *k)).collect(); let (shards, scanned) = find_msg_shards(db, names, &username).await?; if shards.is_empty() { anyhow::bail!("找不到 {} 的消息记录", display); } // 群聊需要 sender 显示名 let group_nicknames = if is_group { load_group_nicknames(db, &username) .await .unwrap_or_default() } else { HashMap::new() }; let mut all_rows: Vec<(i64, i64, i64, i64, String, String, i64, i64)> = Vec::new(); let mut shard_hits = 0usize; // 元组:(local_id, local_type_lo32, create_time, real_sender_id, sender_label, // sender_username, ts_for_sort, db_idx) // sender_username 是稳定 wxid,用来让 sender_contact_display / sender_group_nickname // 落在 attachment row 上(消除"两个同名成员的图分不清谁发的"歧义)。 for (db_idx, shard) in shards.iter().enumerate() { let path = shard.path.clone(); let tname = shard.table.clone(); let uname = username.clone(); let is_group2 = is_group; let names_map = names.map.clone(); let group_nicknames2 = group_nicknames.clone(); let lo32_types2 = lo32_types.clone(); let since2 = since; let until2 = until; // per-DB 软上限避免巨群全量加载 let per_db_cap = (offset + limit).max(limit) * 2; let db_idx2 = db_idx as i64; let rows: Vec<(i64, i64, i64, i64, String, String, i64, i64)> = tokio::task::spawn_blocking(move || { let conn = Connection::open(&path)?; let id2u = load_id2u(&conn); // local_type 在 DB 里可能带高位 flag,过滤要 mask 低 32 bit let placeholders = lo32_types2 .iter() .map(|_| "?") .collect::>() .join(","); let mut clauses: Vec = vec![format!("(local_type & 4294967295) IN ({})", placeholders)]; let mut params: Vec> = lo32_types2 .iter() .map(|t| Box::new(*t) as Box) .collect(); if let Some(s) = since2 { clauses.push("create_time >= ?".into()); params.push(Box::new(s)); } if let Some(u) = until2 { clauses.push("create_time <= ?".into()); params.push(Box::new(u)); } let where_clause = format!("WHERE {}", clauses.join(" AND ")); let sql = format!( "SELECT local_id, local_type, create_time, real_sender_id, message_content, WCDB_CT_message_content FROM [{}] {} ORDER BY create_time DESC LIMIT ?", tname, where_clause ); params.push(Box::new(per_db_cap as i64)); let params_ref: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|p| p.as_ref()).collect(); let mut stmt = conn.prepare(&sql)?; let rows: Vec<(i64, i64, i64, i64, String, String, i64, i64)> = stmt .query_map(params_ref.as_slice(), |row| { let local_id: i64 = row.get(0)?; let raw_type: i64 = row.get(1)?; let lo32 = (raw_type as u64 & 0xFFFFFFFF) as i64; let ts: i64 = row.get(2)?; let real_sender_id: i64 = row.get(3)?; let content_bytes = get_content_bytes(row, 4); let ct: i64 = row.get::<_, i64>(5).unwrap_or(0); let content = decompress_message(&content_bytes, ct); let (sender, sender_uname) = if is_group2 { ( sender_label( real_sender_id, &content, true, &uname, &id2u, &names_map, &group_nicknames2, ), sender_username( real_sender_id, &content, true, &uname, &id2u, ), ) } else { (String::new(), String::new()) }; Ok((local_id, lo32, ts, real_sender_id, sender, sender_uname, ts, db_idx2)) })? .filter_map(|r| r.ok()) .collect(); Ok::<_, anyhow::Error>(rows) }) .await??; if !rows.is_empty() { shard_hits += 1; } all_rows.extend(rows); } // 全局按 ts DESC 排序后分页(ts_for_sort 在 tuple index 6) all_rows.sort_by_key(|r| std::cmp::Reverse(r.6)); let paged: Vec<_> = all_rows.into_iter().skip(offset).take(limit).collect(); // 翻成 JSON let mut results: Vec = Vec::with_capacity(paged.len()); for (local_id, lo32, ts, _real_sender_id, sender, sender_uname, _ts2, _db_idx) in paged { let kind = type_to_kind .get(&lo32) .copied() .unwrap_or(AttachmentKind::Image); // 理论不会 fallthrough let id = AttachmentId { v: 1, chat: username.clone(), local_id, create_time: ts, kind, db: None, }; let id_str = id.encode()?; let mut row = json!({ "attachment_id": id_str, "kind": kind.as_str(), "type": fmt_type(lo32), "local_id": local_id, "timestamp": ts, "time": fmt_time(ts, "%Y-%m-%d %H:%M"), }); if is_group && !sender.is_empty() { row["sender"] = Value::String(sender); } add_sender_identity(&mut row, is_group, &sender_uname, &names.map, &group_nicknames); results.push(row); } let unknown_shards = current_unknown_shards(db, names); let session_ts = session_last_timestamp(db, &username).await; let meta = meta_for_shards( scanned, &shards, shard_hits, unknown_shards, session_ts, true, with_meta, debug_source, ); Ok(json!({ "chat": display, "username": username, "is_group": is_group, "chat_type": chat_type, "count": results.len(), "attachments": results, "meta": meta, })) } /// 解码 attachment_id → 查 message_resource.db → 找本地 .dat → 解密 → 写盘。 pub async fn q_extract( db: &DbCache, _names: &Names, attachment_id: &str, output: &str, overwrite: bool, ) -> Result { use crate::attachment::{ attachment_id::AttachmentId, decoder::{self, V2KeyMaterial}, image_key, resolver, }; let id = AttachmentId::decode(attachment_id) .context("解析 attachment_id 失败(不是合法 base64url(json)?)")?; let output_path = std::path::PathBuf::from(output); if output_path.exists() && !overwrite { anyhow::bail!( "目标已存在:{}(加 --overwrite 覆盖)", output_path.display() ); } if let Some(parent) = output_path.parent() { if !parent.as_os_str().is_empty() { tokio::fs::create_dir_all(parent) .await .with_context(|| format!("创建输出目录失败:{}", parent.display()))?; } } // 1) 拿 message_resource.db let resource_path = db .get("message/message_resource.db") .await? .context("无法解密 message_resource.db(请确认 all_keys.json 包含该 DB 的密钥)")?; // 2) 推 wxchat_base = db_dir.parent(),再拼 attach_root let wxchat_base = db .db_dir() .parent() .ok_or_else(|| anyhow::anyhow!("db_dir 没有 parent,无法推断 xwechat_files 根目录"))? .to_path_buf(); let attach_root = resolver::attach_root_for(&wxchat_base); // 3) blocking pool 跑 resolver + 读盘 + 解码 let id_for_task = id.clone(); let resource_path2 = resource_path.clone(); let attach_root2 = attach_root.clone(); let wxchat_base2 = wxchat_base.clone(); let output_path2 = output_path.clone(); let report: Value = tokio::task::spawn_blocking(move || -> Result { let resolved = resolver::resolve_blocking(&id_for_task, &resource_path2, &attach_root2)?; let dat_bytes = std::fs::read(&resolved.dat_path) .with_context(|| format!("读取 .dat 失败:{}", resolved.dat_path.display()))?; // V2 image key — 平台相关。`ImageKeyMaterial` 同时给 aes_key + xor_key。 // xor_key 不能硬编码 0x88:实测 macOS 真实账号上是 `uin & 0xff` 派生的(0xa2 等), // 所以这里桥接时必须把 provider 的 xor_key 透传给 V2KeyMaterial。 // 缺 key 时让 decoder 自己抛带诊断的错。 let provider = image_key::default_provider(); let key_material = if let Some(p) = provider.as_ref() { // 从 wxchat_base 末段拿 wxid let wxid = wxchat_base2 .file_name() .and_then(|s| s.to_str()) .unwrap_or_default() .to_string(); if wxid.is_empty() { None } else { match p.get_key(&wxid) { Ok(km) => Some(km), Err(e) => { eprintln!( "[extract] image key 提取失败 (wxid={}): {} — V2 文件将无法解码", wxid, e ); None } } } } else { None }; let v2_key = match key_material.as_ref() { Some(km) => V2KeyMaterial { aes_key: Some(&km.aes_key), xor_key: km.xor_key, }, None => V2KeyMaterial::default(), }; let decoded = decoder::dispatch(&dat_bytes, v2_key)?; // 写盘 std::fs::write(&output_path2, &decoded.data) .with_context(|| format!("写出文件失败:{}", output_path2.display()))?; // 注意:不要在这里塞 `ok: true`。dispatch 会用 Response::ok(v) 包一层, // Response 的 `data: Value` 字段是 #[serde(flatten)] 写出的,本 payload // 的 `ok` 会和 Response 自带的 `ok` 在线上拼成两个同名 key,CLI 反序列化时 // serde_json 直接报 "duplicate field",业务请求看上去像 daemon 解析失败。 Ok(json!({ "kind": id_for_task.kind.as_str(), "md5": resolved.md5, "dat_path": resolved.dat_path.display().to_string(), "dat_size": resolved.size, "output": output_path2.display().to_string(), "output_size": decoded.data.len(), "format": decoded.format, "decoder": decoded.decoder, })) }) .await??; Ok(report) } /// 解码本地朋友圈 cache//Sns/Img 中已加载的图片。 pub async fn q_sns_extract( db: &DbCache, output_dir: &str, month: Option<&str>, limit: usize, overwrite: bool, xor_key: u8, ) -> Result { let db_dir = db.db_dir().to_path_buf(); let output_dir = PathBuf::from(output_dir); let month = month.map(str::to_string); tokio::task::spawn_blocking(move || -> Result { use crate::attachment::{ decoder::{self, V2KeyMaterial}, image_key, }; let wxchat_base = db_dir .parent() .context("db_dir 缺少账号根目录,无法定位 cache/Sns/Img")? .to_path_buf(); let wxid = wxchat_base .file_name() .and_then(|s| s.to_str()) .context("无法从账号根目录推断 wxid")? .to_string(); let provider = image_key::default_provider().context("当前平台没有 V2 image key provider")?; let key_material = provider.get_key(&wxid)?; let v2_key = V2KeyMaterial { aes_key: Some(&key_material.aes_key), xor_key, }; fs::create_dir_all(&output_dir) .with_context(|| format!("创建输出目录失败: {}", output_dir.display()))?; let roots = sns_img_roots(&wxchat_base, month.as_deref())?; let mut files = Vec::new(); for root in roots { collect_sns_img_files(&root, &mut files)?; } files.sort_by_key(|path| { fs::metadata(path) .and_then(|m| m.modified()) .unwrap_or(std::time::SystemTime::UNIX_EPOCH) }); files.reverse(); let mut exported = Vec::new(); let mut skipped_existing = 0usize; let mut skipped_unsupported = 0usize; let mut failed = Vec::new(); let max = limit.max(1); for path in files { if exported.len() >= max { break; } let bytes = match fs::read(&path) { Ok(v) => v, Err(e) => { failed.push(json!({"path": path.display().to_string(), "error": e.to_string()})); continue; } }; if !bytes.starts_with(&decoder::V2_MAGIC) { skipped_unsupported += 1; continue; } let decoded = match decoder::dispatch(&bytes, v2_key) { Ok(v) => v, Err(e) => { failed.push(json!({"path": path.display().to_string(), "error": e.to_string()})); continue; } }; let out_path = output_dir.join(format!("{}.{}", sns_output_stem(&path), decoded.format)); if out_path.exists() && !overwrite { skipped_existing += 1; continue; } fs::write(&out_path, &decoded.data) .with_context(|| format!("写出 SNS 图片失败: {}", out_path.display()))?; exported.push(json!({ "source": path.display().to_string(), "output": out_path.display().to_string(), "format": decoded.format, "decoder": decoded.decoder, "output_size": decoded.data.len(), "xor_key": format!("0x{:02x}", xor_key), })); } let total_exported = exported.len(); let total_failed = failed.len(); Ok(json!({ "output_dir": output_dir.display().to_string(), "month": month, "exported": exported, "total_exported": total_exported, "skipped_existing": skipped_existing, "skipped_unsupported": skipped_unsupported, "failed": failed, "total_failed": total_failed, "xor_key": format!("0x{:02x}", xor_key), })) }) .await? } fn sns_img_roots(wxchat_base: &Path, month: Option<&str>) -> Result> { let cache_root = wxchat_base.join("cache"); if let Some(month) = month { let root = cache_root.join(month).join("Sns").join("Img"); return Ok(root.is_dir().then_some(root).into_iter().collect()); } let mut roots = Vec::new(); if !cache_root.is_dir() { return Ok(roots); } for entry in fs::read_dir(&cache_root)? { let path = entry?.path().join("Sns").join("Img"); if path.is_dir() { roots.push(path); } } Ok(roots) } fn collect_sns_img_files(dir: &Path, out: &mut Vec) -> Result<()> { for entry in fs::read_dir(dir)? { let path = entry?.path(); if path.is_dir() { collect_sns_img_files(&path, out)?; } else if path.is_file() { out.push(path); } } Ok(()) } fn sns_output_stem(path: &Path) -> String { let parent = path .parent() .and_then(|p| p.file_name()) .and_then(|s| s.to_str()) .unwrap_or("sns"); let name = path.file_name().and_then(|s| s.to_str()).unwrap_or("image"); format!("{}_{}", parent, name) } /// 解析 `kinds` 参数到 `(AttachmentKind, lo32_local_type)` 列表。 /// 当前只支持 image;命令名保留成 `attachments` 是为了后续扩到其他附件类型时不 break CLI。 fn parse_attachment_kinds( kinds: Option<&[String]>, ) -> Result> { use crate::attachment::AttachmentKind; let raw = kinds.unwrap_or(&[]); if raw.is_empty() { return Ok(vec![(AttachmentKind::Image, 3)]); } let mut out: Vec<(AttachmentKind, i64)> = Vec::with_capacity(raw.len()); let mut seen = HashSet::<&'static str>::new(); for k in raw { let (kind, t): (AttachmentKind, i64) = match k.to_ascii_lowercase().as_str() { "image" | "img" => (AttachmentKind::Image, 3), "voice" | "audio" | "video" | "file" => { anyhow::bail!( "当前只支持 image 提取;video/file/voice 的资源路径与 decoder 还没接通" ) } other => anyhow::bail!("未知附件类型:{}(当前仅支持 image)", other), }; if seen.insert(kind.as_str()) { out.push((kind, t)); } } Ok(out) } #[cfg(test)] mod biz_tests { use super::*; #[test] fn extract_cdata_normal() { let xml = "<![CDATA[TencentResearch]]>"; assert_eq!(extract_cdata(xml, "title"), Some("TencentResearch".into())); } #[test] fn extract_cdata_empty() { let xml = ""; assert_eq!(extract_cdata(xml, "cover"), None); } #[test] fn extract_cdata_url() { let xml = ""; let result = extract_cdata(xml, "url"); assert!(result.is_some()); let url = result.unwrap(); assert!(url.starts_with("http://mp.weixin.qq.com")); assert!(!url.contains("CDATA")); } #[test] fn extract_cdata_no_cdata_wrapper() { let xml = "1700000000"; assert_eq!(extract_cdata(xml, "pub_time"), Some("1700000000".into())); } #[test] fn parse_biz_xml_items_single_article() { let xml = r#" <![CDATA[Test Article Title]]> 1700000000 "#; let items = parse_biz_xml_items(1699999999, "gh_test123", xml); assert_eq!(items.len(), 1); assert_eq!(items[0].title, "Test Article Title"); assert_eq!(items[0].url, "http://mp.weixin.qq.com/s?test=1"); assert_eq!(items[0].digest, "Test Digest"); assert_eq!(items[0].pub_time, 1700000000); assert_eq!(items[0].account_username, "gh_test123"); } #[test] fn parse_biz_xml_items_skips_no_url() { let xml = r#" <![CDATA[Has Title No URL]]> 1700000001 "#; let items = parse_biz_xml_items(1700000001, "gh_test", xml); assert_eq!(items.len(), 0); } #[test] fn parse_biz_xml_items_multi_article() { let xml = r#" <![CDATA[Article 1]]> 1700000010 <![CDATA[Article 2]]> 1700000020 "#; let items = parse_biz_xml_items(1700000000, "gh_multi", xml); assert_eq!(items.len(), 2); assert_eq!(items[0].title, "Article 1"); assert_eq!(items[1].title, "Article 2"); } #[test] fn parse_biz_xml_items_pub_time_fallback() { // When pub_time is missing, should fall back to recv_time let xml = r#" <![CDATA[No PubTime]]> "#; let items = parse_biz_xml_items(1700000099, "gh_fallback", xml); assert_eq!(items.len(), 1); assert_eq!(items[0].pub_time, 1700000099); // falls back to recv_time } } #[cfg(test)] mod group_nickname_tests { use super::*; fn varint(mut value: u64) -> Vec { let mut out = Vec::new(); loop { let mut byte = (value & 0x7f) as u8; value >>= 7; if value != 0 { byte |= 0x80; } out.push(byte); if value == 0 { return out; } } } fn len_field(field_no: u64, bytes: &[u8]) -> Vec { let mut out = varint((field_no << 3) | 2); out.extend(varint(bytes.len() as u64)); out.extend(bytes); out } fn string_field(field_no: u64, value: &str) -> Vec { len_field(field_no, value.as_bytes()) } fn member_chunk(username: &str, group_nickname: &str) -> Vec { let mut member = Vec::new(); member.extend(string_field(1, username)); member.extend(string_field(2, group_nickname)); len_field(1, &member) } #[test] fn parses_group_nickname_member_chunks() { let mut ext_buffer = Vec::new(); ext_buffer.extend(member_chunk("wxid_alice", "Alice In Group")); ext_buffer.extend(member_chunk("bob_123456", "Bob Card")); let nicknames = parse_group_nickname_map(&ext_buffer, None); assert_eq!( nicknames.get("wxid_alice").map(String::as_str), Some("Alice In Group") ); assert_eq!( nicknames.get("bob_123456").map(String::as_str), Some("Bob Card") ); } #[test] fn target_filter_anchors_member_username_choice() { let mut member = Vec::new(); member.extend(string_field(3, "candidate_name")); member.extend(string_field(4, "wxid_target")); member.extend(string_field(2, "Target Card")); let ext_buffer = len_field(1, &member); let targets = HashSet::from(["wxid_target".to_string()]); let nicknames = parse_group_nickname_map(&ext_buffer, Some(&targets)); assert_eq!( nicknames.get("wxid_target").map(String::as_str), Some("Target Card") ); assert!(!nicknames.contains_key("candidate_name")); } #[test] fn ignores_non_card_string_fields_as_group_nicknames() { let mut ext_buffer = Vec::new(); let mut member_without_card = Vec::new(); member_without_card.extend(string_field(1, "wxid_alice")); member_without_card.extend(string_field(4, "owner_or_inviter")); ext_buffer.extend(len_field(1, &member_without_card)); let mut member_with_card = Vec::new(); member_with_card.extend(string_field(1, "wxid_bob")); member_with_card.extend(string_field(2, "Bob In Group")); member_with_card.extend(string_field(4, "owner_or_inviter")); ext_buffer.extend(len_field(1, &member_with_card)); let nicknames = parse_group_nickname_map(&ext_buffer, None); assert!(!nicknames.contains_key("wxid_alice")); assert_eq!( nicknames.get("wxid_bob").map(String::as_str), Some("Bob In Group") ); } #[test] fn group_top_senders_keeps_duplicate_display_names_separate() { let sender_counts = HashMap::from([("wxid_alice".to_string(), 7), ("wxid_bob".to_string(), 3)]); let names = HashMap::from([ ("wxid_alice".to_string(), "Alice Contact".to_string()), ("wxid_bob".to_string(), "Bob Contact".to_string()), ]); let group_nicknames = HashMap::from([ ("wxid_alice".to_string(), "同名".to_string()), ("wxid_bob".to_string(), "同名".to_string()), ]); let top = group_top_senders(&sender_counts, &names, &group_nicknames, 10); assert_eq!(top.len(), 2); assert_eq!(top[0]["sender"].as_str(), Some("同名")); assert_eq!(top[0]["sender_username"].as_str(), Some("wxid_alice")); assert_eq!(top[0]["sender_contact_display"].as_str(), Some("Alice Contact")); assert_eq!(top[0]["sender_group_nickname"].as_str(), Some("同名")); assert_eq!(top[0]["count"].as_i64(), Some(7)); assert_eq!(top[1]["sender"].as_str(), Some("同名")); assert_eq!(top[1]["sender_username"].as_str(), Some("wxid_bob")); assert_eq!(top[1]["sender_contact_display"].as_str(), Some("Bob Contact")); assert_eq!(top[1]["sender_group_nickname"].as_str(), Some("同名")); assert_eq!(top[1]["count"].as_i64(), Some(3)); } } #[cfg(test)] mod sns_tests { use super::*; fn make_post_xml( create_time: &str, desc: &str, username_tag: Option<&str>, media: usize, location: Option<&str>, ) -> String { let username = username_tag .map(|u| format!("{}", u)) .unwrap_or_default(); let media_tags = "2".repeat(media); let content_object = if media > 0 { format!( "{}", media_tags ) } else { String::new() }; let loc = location .map(|p| format!(r#""#, p)) .unwrap_or_default(); format!( "{}{}{}{}{}", username, create_time, desc, content_object, loc ) } #[test] fn parse_uses_user_name_column_when_present() { let xml = make_post_xml("1700000000", "hello", Some("wxid_xml"), 0, None); let p = parse_post_xml(1, "wxid_column", &xml); assert_eq!(p.author_username, "wxid_column"); assert_eq!(p.create_time, 1700000000); assert_eq!(p.content, "hello"); assert_eq!(p.media.len(), 0); assert_eq!(p.location, ""); } #[test] fn parse_falls_back_to_xml_username_when_column_empty() { let xml = make_post_xml("1700000001", "world", Some("wxid_xml_only"), 0, None); let p = parse_post_xml(2, "", &xml); assert_eq!(p.author_username, "wxid_xml_only"); } #[test] fn parse_handles_missing_create_time() { let xml = "x"; let p = parse_post_xml(3, "wxid", xml); assert_eq!(p.create_time, 0); assert_eq!(p.content, "x"); } #[test] fn parse_counts_media_and_extracts_location() { let xml = make_post_xml("1700000002", "post", None, 3, Some("Wuxi")); let p = parse_post_xml(4, "wxid", &xml); assert_eq!(p.media.len(), 3); assert_eq!(p.location, "Wuxi"); } #[test] fn parse_when_both_column_and_xml_username_empty_returns_empty_author() { let xml = "1700000003orphan"; let p = parse_post_xml(5, "", xml); assert_eq!(p.author_username, ""); } #[test] fn parse_decodes_xml_entities_in_content() { // 单 DOM 解析的副作用:roxmltree 自动把 < / & / " 等还原成原字符; // 旧版 extract_xml_text 字符串扫描不解码,会把 "<world>" 原样输出。 // 新版语义对下游更正确(拿到的就是用户真实内容),把这个行为锁进测试。 let xml = "Hello <world> & friends"; let p = parse_post_xml(6, "wxid", xml); assert_eq!(p.content, "Hello & friends"); } #[test] fn parse_malformed_xml_falls_back_to_string_fields_when_column_present() { let xml = "1700000007A & B", "5", "https://mp.weixin.qq.com/s?__biz=MzI4&mid=2247&idx=1", "" ); assert_eq!( extract_appmsg_url(xml).as_deref(), Some("https://mp.weixin.qq.com/s?__biz=MzI4&mid=2247&idx=1") ); } #[test] fn extract_appmsg_url_strips_group_prefix_and_cdata() { let xml = concat!( "wxid_sender:\n", "", "5", "", "" ); assert_eq!( extract_appmsg_url(xml).as_deref(), Some("https://example.com/x?a=1&b=2") ); } #[test] fn extract_appmsg_url_falls_back_to_url1() { let xml = concat!( "", "5", "https://example.com/fallback", "" ); assert_eq!( extract_appmsg_url(xml).as_deref(), Some("https://example.com/fallback") ); } #[test] fn extract_appmsg_url_ignores_non_http_values() { let xml = concat!( "", "5", "weixin://bizmsgmenu?msgmenucontent=foo", "" ); assert_eq!(extract_appmsg_url(xml), None); } #[test] fn extract_appmsg_url_ignores_refermsg() { let xml = concat!( "", "57", "https://example.com/nested", "" ); assert_eq!(extract_appmsg_url(xml), None); } #[test] fn extract_favorite_url_reads_link_tag() { let xml = concat!( "", "5", "", "" ); assert_eq!( extract_favorite_url(xml).as_deref(), Some("https://mp.weixin.qq.com/s?__biz=foo&mid=1") ); } #[test] fn extract_favorite_url_ignores_non_http_values() { let xml = concat!( "", "5", "weixin://favorites/item/1", "" ); assert_eq!(extract_favorite_url(xml), None); } fn media_object(value: &Value) -> &serde_json::Map { value.as_object().expect("media entry should be an object") } #[test] fn single_image_media() { let xml = r#" 2 https://szmmsns.qpic.cn/<redacted>/image.jpg https://szmmsns.qpic.cn/<redacted>/thumb.jpg "#; let media = parse_post_media(xml); assert_eq!(media.len(), 1); let item = media_object(&media[0]); assert_eq!(item.get("type").and_then(Value::as_str), Some("2")); assert_eq!( item.get("url").and_then(Value::as_str), Some("https://szmmsns.qpic.cn//image.jpg") ); assert_eq!( item.get("thumb").and_then(Value::as_str), Some("https://szmmsns.qpic.cn//thumb.jpg") ); assert_eq!(item.get("url_enc_idx").and_then(Value::as_str), Some("1")); assert_eq!( item.get("url_key").and_then(Value::as_str), Some("placeholder-key") ); assert_eq!( item.get("url_token").and_then(Value::as_str), Some("placeholder-token") ); assert_eq!( item.get("md5").and_then(Value::as_str), Some("placeholder-md5") ); assert_eq!(item.get("width").and_then(Value::as_i64), Some(1440)); assert_eq!(item.get("height").and_then(Value::as_i64), Some(1080)); assert_eq!(item.get("total_size").and_then(Value::as_i64), Some(123456)); } #[test] fn three_images_media() { let xml = r#" 2 10 https://szmmsns.qpic.cn/<redacted>/image-1.jpg https://szmmsns.qpic.cn/<redacted>/thumb-1.jpg 2 11 https://szmmsns.qpic.cn/<redacted>/image-2.jpg https://szmmsns.qpic.cn/<redacted>/thumb-2.jpg 6 https://szmmsns.qpic.cn/<redacted>/image-3.jpg https://szmmsns.qpic.cn/<redacted>/thumb-3.jpg "#; let media = parse_post_media(xml); assert_eq!(media.len(), 3); let first = media_object(&media[0]); assert_eq!(first.get("sub_type").and_then(Value::as_str), Some("10")); assert_eq!( first.get("url_key").and_then(Value::as_str), Some("placeholder-key-1") ); let second = media_object(&media[1]); assert_eq!(second.get("sub_type").and_then(Value::as_str), Some("11")); assert_eq!(second.get("width").and_then(Value::as_i64), Some(300)); let third = media_object(&media[2]); assert_eq!(third.get("type").and_then(Value::as_str), Some("6")); assert_eq!( third.get("thumb_key").and_then(Value::as_str), Some("placeholder-thumb-key-3") ); } #[test] fn video_media() { let xml = r#" 15 https://szmmsns.qpic.cn/<redacted>/video.mp4 https://szmmsns.qpic.cn/<redacted>/video-thumb.jpg <placeholder-video-md5> 37 "#; let media = parse_post_media(xml); assert_eq!(media.len(), 1); let item = media_object(&media[0]); assert_eq!( item.get("video_md5").and_then(Value::as_str), Some("") ); assert_eq!(item.get("video_duration").and_then(Value::as_i64), Some(37)); assert!(!item.contains_key("total_size")); } #[test] fn text_only_post() { let without_media_list = r#" 1 "#; let empty_media_list = r#" "#; assert!(parse_post_media(without_media_list).is_empty()); assert!(parse_post_media(empty_media_list).is_empty()); } #[test] fn malformed_xml() { let xml = r#" 2 "#; assert!(parse_post_media(xml).is_empty()); } #[test] fn size_without_total_size_omits_total_size_key() { let xml = r#" 2 "#; let media = parse_post_media(xml); assert_eq!(media.len(), 1); let item = media_object(&media[0]); assert_eq!(item.get("width").and_then(Value::as_i64), Some(640)); assert_eq!(item.get("height").and_then(Value::as_i64), Some(480)); assert!(!item.contains_key("total_size")); } }