From 3e3478c80a0faed37597f9289e371d9fd5921862 Mon Sep 17 00:00:00 2001 From: jackwener Date: Thu, 14 May 2026 16:40:25 +0800 Subject: [PATCH] fix(query): three correctness/latency fixes from deep review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - q_contacts: replaced ad-hoc `gh_*`/`biz_*` prefix filter with `chat_type_of == "private"`. The old filter leaked groups (`@chatroom`), folded entries (`brandsessionholder` / `@placeholder_foldgroup`), verified service accounts (`verify_flag != 0`), and internal `@xxx` system accounts into `wx contacts` output. - q_search: parallelized the per-message-DB blocking phase via `JoinSet::spawn_blocking`. Previously the `for (db_path, ...) in by_path { ... .await }` loop ran one DB at a time; users with N message_*.db shards paid N× latency. Each DB now runs concurrently on the blocking pool; total latency collapses to a single slow DB. - q_new_messages: fixed `new_state` reset path so first-run + truncated sessions don't lock `since_ts` at `fallback_ts` forever. Old code always wrote `state[uname] = old_since_ts || fallback_ts` for changed sessions, then advanced only those that appeared in `all_msgs`. On first run (state=None) truncated sessions ended up with `state[uname] = now-86400` and stayed there across calls — every subsequent call re-scanned a window that grew with elapsed time. New logic separates three cases: * in_results → advance to returned_max (incremental fetch) * truncated + state → keep prev since_ts (retry next call) * truncated + none → advance to session_ts (avoid lock-in; old messages remain reachable via `wx history`). --- src/daemon/query.rs | 87 ++++++++++++++++++++++++++++++--------------- 1 file changed, 58 insertions(+), 29 deletions(-) diff --git a/src/daemon/query.rs b/src/daemon/query.rs index 03b764b..167d88a 100644 --- a/src/daemon/query.rs +++ b/src/daemon/query.rs @@ -335,17 +335,22 @@ pub async fn q_search( .unwrap_or_default(); let group_nicknames_by_chat = Arc::new(group_nicknames_by_chat); - let mut results: Vec = Vec::new(); + // 多个 message_*.db 之间没有数据依赖,并发解密 + 查询。每个 DB 内部仍按 + // table 串行(共享同一 sqlite Connection 不能跨线程移动)。原版本是 N 个 DB + // 串行 await,活跃账号上 N 个分片要轮 N 次磁盘 IO;现在 JoinSet 把它们一次 + // 全部 dispatch 到 blocking pool,整体 latency 退化为单 DB 慢路径。 let kw = keyword.to_string(); + let mut join_set: tokio::task::JoinSet>> = tokio::task::JoinSet::new(); for (db_path, table_list) in by_path { let kw2 = kw.clone(); let since2 = since; let until2 = until; let limit2 = limit * 3; - let names_map2 = names.map.clone(); let group_nicknames_by_chat2 = Arc::clone(&group_nicknames_by_chat); - let found: Vec = match tokio::task::spawn_blocking(move || { + let db_path_for_log = db_path.clone(); + + join_set.spawn_blocking(move || { let conn = Connection::open(&db_path)?; let mut all = Vec::new(); let empty_group_nicknames = HashMap::new(); @@ -369,17 +374,20 @@ pub async fn q_search( all.push(row); } } - Err(e) => eprintln!("[search] skip table {}: {}", tname, e), + Err(e) => eprintln!("[search] skip table {} (db={}): {}", tname, db_path_for_log, e), } } - Ok::<_, anyhow::Error>(all) - }).await { - Ok(Ok(v)) => v, - Ok(Err(e)) => { eprintln!("[search] skip DB: {}", e); continue; } - Err(e) => { eprintln!("[search] task error: {}", e); continue; } - }; + Ok(all) + }); + } - results.extend(found); + let mut results: Vec = Vec::new(); + while let Some(joined) = join_set.join_next().await { + match joined { + Ok(Ok(rows)) => results.extend(rows), + Ok(Err(e)) => eprintln!("[search] skip DB: {}", e), + Err(e) => eprintln!("[search] task error: {}", e), + } } results.sort_by_key(|r| std::cmp::Reverse(r["timestamp"].as_i64().unwrap_or(0))); @@ -388,9 +396,14 @@ pub async fn q_search( } /// 查询联系人 +/// +/// 只返回真实联系人(`chat_type_of == "private"`)。`names.map` 是从 `contact` 表 +/// 全量加载的,里面同时包含群(`@chatroom`)、公众号(`gh_*` / `biz_*` / verify_flag != 0)、 +/// 折叠入口(`brandsessionholder` / `@placeholder_foldgroup`)以及微信内部 `@xxx` 系统账号。 +/// 这些都不应该出现在 `wx contacts` 输出里,统一走 `chat_type_of` 这条同样的真相判定。 pub async fn q_contacts(names: &Names, query: Option<&str>, limit: usize) -> Result { let mut contacts: Vec = names.map.iter() - .filter(|(u, _)| !u.starts_with("gh_") && !u.starts_with("biz_")) + .filter(|(u, _)| chat_type_of(u, names) == "private") .map(|(u, d)| json!({ "username": u, "display": d })) .collect(); @@ -2184,24 +2197,40 @@ pub async fn q_new_messages( all_msgs.truncate(limit); // 5. 重建 new_state,防止全局 limit 截断导致消息永久丢失: - // - 未变化的会话:沿用 session.db 的 last_timestamp - // - 变化但全被截断(无消息在最终结果中):保留旧 since_ts,下次重试 - // - 变化且有消息返回:推进到该会话在结果中的最大 timestamp - let mut new_state = session_ts_map; - // 先把 changed 会话重置回旧 since_ts - for (uname, _) in &changed { - let old_ts = state.as_ref() - .and_then(|m| m.get(uname)) - .copied() - .unwrap_or(fallback_ts); - new_state.insert(uname.clone(), old_ts); - } - // 再根据实际返回的消息向前推进 - for m in &all_msgs { - if let (Some(uname), Some(ts)) = (m["username"].as_str(), m["timestamp"].as_i64()) { - let e = new_state.entry(uname.to_string()).or_insert(0); - if ts > *e { *e = ts; } + // - 未变化的会话:沿用 session.db 的 last_timestamp(即 session_ts_map) + // - 变化但全被截断(无消息在最终结果中): + // * 后续调用 (state=Some):保留旧 since_ts,下次重试拿这部分消息 + // * 首次调用 (state=None):advance 到 session_ts,避免 since_ts 锁死在 + // fallback_ts 导致后续每次都回扫 24h。窗口会随调用次数 + 时间累积扩大, + // 性能持续衰退。代价:首次 + 被截断会话的老消息看不到,需走 `wx history`。 + // - 变化且有消息返回:advance 到该会话在结果中的最大 timestamp(增量 fetch 标准语义) + let returned_max_ts: HashMap = { + let mut m: HashMap = HashMap::new(); + for msg in &all_msgs { + if let (Some(u), Some(ts)) = (msg["username"].as_str(), msg["timestamp"].as_i64()) { + let e = m.entry(u.to_string()).or_insert(0); + if ts > *e { *e = ts; } + } } + m + }; + let mut new_state = session_ts_map; + for (uname, _) in &changed { + let in_results = returned_max_ts.contains_key(uname); + let prev = state.as_ref().and_then(|m| m.get(uname)).copied(); + let next_ts = match (in_results, prev) { + (true, _) => { + // 有消息返回:advance 到 returned_max;返回的最大 ts 通常 ≤ session_ts, + // 这样下次查 `since > returned_max` 仍能拿到 returned_max..session_ts 的截断尾巴。 + returned_max_ts[uname] + } + (false, Some(prev)) => prev, // 后续 + 截断:保持旧 since + (false, None) => { + // 首次 + 截断:advance 到 session_ts 兜底,避免 since_ts 锁死。 + new_state.get(uname).copied().unwrap_or(fallback_ts) + } + }; + new_state.insert(uname.clone(), next_ts); } Ok(json!({