fix(query): three correctness/latency fixes from deep review

- q_contacts: replaced ad-hoc `gh_*`/`biz_*` prefix filter with
  `chat_type_of == "private"`. The old filter leaked groups
  (`@chatroom`), folded entries (`brandsessionholder` /
  `@placeholder_foldgroup`), verified service accounts
  (`verify_flag != 0`), and internal `@xxx` system accounts into
  `wx contacts` output.

- q_search: parallelized the per-message-DB blocking phase via
  `JoinSet::spawn_blocking`. Previously the `for (db_path, ...) in
  by_path { ... .await }` loop ran one DB at a time; users with N
  message_*.db shards paid N× latency. Each DB now runs concurrently
  on the blocking pool; total latency drops from the sum over all
  shards to roughly that of the slowest single DB.

- q_new_messages: fixed `new_state` reset path so first-run + truncated
  sessions don't lock `since_ts` at `fallback_ts` forever. Old code
  always wrote `state[uname] = old_since_ts || fallback_ts` for changed
  sessions, then advanced only those that appeared in `all_msgs`. On
  first run (state=None) truncated sessions ended up with
  `state[uname] = now-86400` and stayed there across calls — every
  subsequent call re-scanned a window that grew with elapsed time.
  New logic separates three cases:
    * in_results        → advance to returned_max (incremental fetch)
    * truncated + state → keep prev since_ts (retry next call)
    * truncated + none  → advance to session_ts (avoid lock-in; old
                          messages remain reachable via `wx history`).
pull/51/head
jackwener 2026-05-14 16:40:25 +08:00
parent f0f3d3cf22
commit 3e3478c80a
1 changed files with 58 additions and 29 deletions

View File

@ -335,17 +335,22 @@ pub async fn q_search(
.unwrap_or_default();
let group_nicknames_by_chat = Arc::new(group_nicknames_by_chat);
let mut results: Vec<Value> = Vec::new();
// 多个 message_*.db 之间没有数据依赖,并发解密 + 查询。每个 DB 内部仍按
// table 串行(共享同一 sqlite Connection 不能跨线程移动)。原版本是 N 个 DB
// 串行 await,活跃账号上 N 个分片要轮 N 次磁盘 IO;现在 JoinSet 把它们一次
// 全部 dispatch 到 blocking pool,整体 latency 退化为单 DB 慢路径。
let kw = keyword.to_string();
let mut join_set: tokio::task::JoinSet<Result<Vec<Value>>> = tokio::task::JoinSet::new();
for (db_path, table_list) in by_path {
let kw2 = kw.clone();
let since2 = since;
let until2 = until;
let limit2 = limit * 3;
let names_map2 = names.map.clone();
let group_nicknames_by_chat2 = Arc::clone(&group_nicknames_by_chat);
let found: Vec<Value> = match tokio::task::spawn_blocking(move || {
let db_path_for_log = db_path.clone();
join_set.spawn_blocking(move || {
let conn = Connection::open(&db_path)?;
let mut all = Vec::new();
let empty_group_nicknames = HashMap::new();
@ -369,17 +374,20 @@ pub async fn q_search(
all.push(row);
}
}
Err(e) => eprintln!("[search] skip table {}: {}", tname, e),
Err(e) => eprintln!("[search] skip table {} (db={}): {}", tname, db_path_for_log, e),
}
}
Ok::<_, anyhow::Error>(all)
}).await {
Ok(Ok(v)) => v,
Ok(Err(e)) => { eprintln!("[search] skip DB: {}", e); continue; }
Err(e) => { eprintln!("[search] task error: {}", e); continue; }
};
Ok(all)
});
}
results.extend(found);
let mut results: Vec<Value> = Vec::new();
while let Some(joined) = join_set.join_next().await {
match joined {
Ok(Ok(rows)) => results.extend(rows),
Ok(Err(e)) => eprintln!("[search] skip DB: {}", e),
Err(e) => eprintln!("[search] task error: {}", e),
}
}
results.sort_by_key(|r| std::cmp::Reverse(r["timestamp"].as_i64().unwrap_or(0)));
@ -388,9 +396,14 @@ pub async fn q_search(
}
/// 查询联系人
///
/// 只返回真实联系人(`chat_type_of == "private"`)。`names.map` 是从 `contact` 表
/// 全量加载的,里面同时包含群(`@chatroom`)、公众号(`gh_*` / `biz_*` / verify_flag != 0)、
/// 折叠入口(`brandsessionholder` / `@placeholder_foldgroup`)以及微信内部 `@xxx` 系统账号。
/// 这些都不应该出现在 `wx contacts` 输出里,统一走 `chat_type_of` 这条同样的真相判定。
pub async fn q_contacts(names: &Names, query: Option<&str>, limit: usize) -> Result<Value> {
let mut contacts: Vec<Value> = names.map.iter()
.filter(|(u, _)| !u.starts_with("gh_") && !u.starts_with("biz_"))
.filter(|(u, _)| chat_type_of(u, names) == "private")
.map(|(u, d)| json!({ "username": u, "display": d }))
.collect();
@ -2184,24 +2197,40 @@ pub async fn q_new_messages(
all_msgs.truncate(limit);
// 5. 重建 new_state,防止全局 limit 截断导致消息永久丢失:
// - 未变化的会话:沿用 session.db 的 last_timestamp
// - 变化但全被截断(无消息在最终结果中):保留旧 since_ts下次重试
// - 变化且有消息返回:推进到该会话在结果中的最大 timestamp
let mut new_state = session_ts_map;
// 先把 changed 会话重置回旧 since_ts
for (uname, _) in &changed {
let old_ts = state.as_ref()
.and_then(|m| m.get(uname))
.copied()
.unwrap_or(fallback_ts);
new_state.insert(uname.clone(), old_ts);
}
// 再根据实际返回的消息向前推进
for m in &all_msgs {
if let (Some(uname), Some(ts)) = (m["username"].as_str(), m["timestamp"].as_i64()) {
let e = new_state.entry(uname.to_string()).or_insert(0);
if ts > *e { *e = ts; }
// - 未变化的会话:沿用 session.db 的 last_timestamp(即 session_ts_map)
// - 变化但全被截断(无消息在最终结果中):
// * 后续调用 (state=Some):保留旧 since_ts,下次重试拿这部分消息
// * 首次调用 (state=None):advance 到 session_ts,避免 since_ts 锁死在
// fallback_ts 导致后续每次都回扫 24h。窗口会随调用次数 + 时间累积扩大,
// 性能持续衰退。代价:首次 + 被截断会话的老消息看不到,需走 `wx history`。
// - 变化且有消息返回:advance 到该会话在结果中的最大 timestamp(增量 fetch 标准语义)
let returned_max_ts: HashMap<String, i64> = {
let mut m: HashMap<String, i64> = HashMap::new();
for msg in &all_msgs {
if let (Some(u), Some(ts)) = (msg["username"].as_str(), msg["timestamp"].as_i64()) {
let e = m.entry(u.to_string()).or_insert(0);
if ts > *e { *e = ts; }
}
}
m
};
let mut new_state = session_ts_map;
for (uname, _) in &changed {
let in_results = returned_max_ts.contains_key(uname);
let prev = state.as_ref().and_then(|m| m.get(uname)).copied();
let next_ts = match (in_results, prev) {
(true, _) => {
// 有消息返回:advance 到 returned_max。返回的最大 ts 通常 ≤ session_ts,
// 这样下次查 `since > returned_max` 仍能拿到 returned_max..session_ts 的截断尾巴。
returned_max_ts[uname]
}
(false, Some(prev)) => prev, // 后续 + 截断:保持旧 since
(false, None) => {
// 首次 + 截断:advance 到 session_ts 兜底,避免 since_ts 锁死。
new_state.get(uname).copied().unwrap_or(fallback_ts)
}
};
new_state.insert(uname.clone(), next_ts);
}
Ok(json!({