diff --git a/src/daemon/cache.rs b/src/daemon/cache.rs index 128d428..561df51 100644 --- a/src/daemon/cache.rs +++ b/src/daemon/cache.rs @@ -23,14 +23,25 @@ struct CacheEntry { decrypted_path: PathBuf, } +/// `DbCache::get_with_mode()` 本次解析 rel_key 时实际走了哪条路径。 +/// +/// latency tier: +/// - `CacheHit`:~0ms,只返回已有解密产物 +/// - `WalIncremental`:典型 <10s,只在 cached DB 上增量 apply WAL +/// - `FullDecrypt`:最慢路径,大库上可能到 ~120s #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum CacheMode { + /// Path 1:主 `.db` 和 WAL 都没变,直接命中缓存。 CacheHit, + /// Path 2:主 `.db` 没变、只有 WAL 变了,在 cached DB 上增量 apply。 WalIncremental, + /// Path 3:主 `.db` 变了或缓存 miss,重新 full decrypt。 FullDecrypt, } impl CacheMode { + /// 手工固定为 snake_case 字符串,避免未来给 enum 直接 derive `Serialize` + /// 时静默改变 wire 形态。 pub fn as_str(self) -> &'static str { match self { CacheMode::CacheHit => "cache_hit", @@ -56,7 +67,6 @@ pub struct DbCache { mtime_file: PathBuf, all_keys: HashMap, // rel_key -> enc_key(hex) inner: Arc>>, - last_modes: Arc>>, } impl DbCache { @@ -79,7 +89,6 @@ impl DbCache { mtime_file, all_keys, inner: Arc::new(Mutex::new(HashMap::new())), - last_modes: Arc::new(Mutex::new(HashMap::new())), }; cache.load_persistent().await; @@ -92,18 +101,6 @@ impl DbCache { &self.db_dir } - #[allow(dead_code)] - pub async fn last_mode(&self, rel_key: &str) -> Option { - self.last_modes.lock().await.get(rel_key).copied() - } - - async fn stamp_mode(&self, rel_key: &str, mode: CacheMode) { - self.last_modes - .lock() - .await - .insert(rel_key.to_string(), mode); - } - fn cache_file_path(&self, rel_key: &str) -> PathBuf { let hash = format!("{:x}", md5::compute(rel_key.as_bytes())); self.cache_dir.join(format!("{}.db", hash)) @@ -237,7 +234,6 @@ impl DbCache { if let Some(entry) = cached.as_ref() { if entry.db_mtime == db_mt && entry.decrypted_path.exists() { if entry.wal_mtime == wal_mt { - self.stamp_mode(rel_key, CacheMode::CacheHit).await; return Ok(Some(CacheResolve { path: entry.decrypted_path.clone(), mode: CacheMode::CacheHit, @@ -274,7 +270,6 @@ impl DbCache { }, ); } - self.stamp_mode(rel_key, CacheMode::WalIncremental).await; self.save_persistent().await; return Ok(Some(CacheResolve { path: out_path, @@ -317,7 +312,6 @@ impl DbCache { }, ); } - self.stamp_mode(rel_key, CacheMode::FullDecrypt).await; self.save_persistent().await; Ok(Some(CacheResolve { @@ -537,6 +531,72 @@ mod tests { ); } + #[tokio::test] + async fn get_with_mode_reports_each_path() { + let root = unique_tmpdir("getwithmode"); + let db_dir = root.join("db_storage"); + let cache_dir = root.join("cache"); + std::fs::create_dir_all(&db_dir).unwrap(); + std::fs::create_dir_all(&cache_dir).unwrap(); + + let rel_key = "message_0.db".to_string(); + let db_path = db_dir.join(&rel_key); + std::fs::write(&db_path, b"fake encrypted db").unwrap(); + let wal_path = wal_path_for(&db_path); + std::fs::write(&wal_path, [0u8; 31]).unwrap(); + + let cached_hash = format!("{:x}", md5::compute(rel_key.as_bytes())); + let decrypted_path = cache_dir.join(format!("{}.db", cached_hash)); + std::fs::write(&decrypted_path, ORIGINAL_CACHED_BYTES).unwrap(); + + let db_mt = mtime_nanos(&db_path); + let wal_mt0 = mtime_nanos(&wal_path); + let mtime_file = cache_dir.join("_mtimes.json"); + let payload = serde_json::to_string(&serde_json::json!({ + &rel_key: { + "db_mt": db_mt, + "wal_mt": wal_mt0, + "path": decrypted_path.display().to_string(), + } + })) + .unwrap(); + std::fs::write(&mtime_file, payload).unwrap(); + + let mut all_keys = HashMap::new(); + all_keys.insert(rel_key.clone(), FAKE_KEY_HEX.to_string()); + let cache = DbCache::with_dirs(db_dir, cache_dir, mtime_file, all_keys) + .await + .unwrap(); + + let hit = cache + .get_with_mode(&rel_key) + .await + .unwrap() + .expect("cache should hit"); + assert_eq!(hit.path, decrypted_path); + assert_eq!(hit.mode, CacheMode::CacheHit); + + std::thread::sleep(std::time::Duration::from_millis(20)); + std::fs::write(&wal_path, [0xffu8; 31]).unwrap(); + let wal = cache + .get_with_mode(&rel_key) + .await + .unwrap() + .expect("WAL-only change should stay incremental"); + assert_eq!(wal.path, decrypted_path); + assert_eq!(wal.mode, CacheMode::WalIncremental); + + std::thread::sleep(std::time::Duration::from_millis(20)); + std::fs::write(&db_path, b"different bytes").unwrap(); + let full = cache + .get_with_mode(&rel_key) + .await + .unwrap() + .expect("db mtime change should trigger full decrypt"); + assert_eq!(full.path, decrypted_path); + assert_eq!(full.mode, CacheMode::FullDecrypt); + } + #[tokio::test] async fn restart_with_wal_change_still_reuses_cached_db_then_applies_wal() { let root = unique_tmpdir("restart-wal"); diff --git a/src/daemon/meta.rs b/src/daemon/meta.rs index 26ba2a4..1db37f5 100644 --- a/src/daemon/meta.rs +++ b/src/daemon/meta.rs @@ -65,26 +65,25 @@ pub struct Meta { pub enum MetaStatus { #[default] Ok, + /// `session.db` 的最新时间明显领先于本次消息查询结果,说明数据可能过期或不完整。 PossiblyStale, + /// 最强信号:磁盘上出现 daemon 不认识的新分片,通常必须重跑 `wx init --force`。 PossiblyStaleUnknownShards, + /// 调用方主动传了 `since` / `until` / `offset` 等窗口条件,结果天然是局部视图。 Windowed, } -impl MetaStatus { - #[allow(dead_code)] - pub fn as_str(&self) -> &'static str { - match self { - MetaStatus::Ok => "ok", - MetaStatus::PossiblyStale => "possibly_stale", - MetaStatus::PossiblyStaleUnknownShards => "possibly_stale_unknown_shards", - MetaStatus::Windowed => "windowed", - } - } -} - -/// session 领先 history 多少秒就报 PossiblyStale。 +/// session 领先 history 多少秒就报 `PossiblyStale`。 +/// +/// 24h 的取值是故意保守的:活跃群聊/私聊很少会整整一天没有新消息, +/// 超过这个窗口就值得显式提醒 agent 不要把结果当成“当前最新状态”。 pub const STALE_THRESHOLD_SECS: i64 = 24 * 3600; +/// 统一 freshness status 的优先级: +/// 1. `unknown_shards` 非空:daemon 整体视图已经过期,优先返回 `PossiblyStaleUnknownShards` +/// 2. `windowed=true`:调用方本来就在看局部窗口,不参与 stale 推导 +/// 3. `session_last - chat_latest > STALE_THRESHOLD_SECS`:返回 `PossiblyStale` +/// 4. 其他情况:`Ok` pub fn derive_status( chat_latest: Option, session_last: Option, @@ -103,6 +102,13 @@ pub fn derive_status( } } +/// 扫描 `/message/` 下真实存在的 `message_*.db`,diff 出 daemon 当前没有 key +/// 的未知分片。 +/// +/// 契约: +/// - 返回值一律是 `/` 分隔的 rel_key(如 `message/message_3.db`),与 `all_keys.json` 对齐 +/// - 结果按字典序排序,方便测试和 CLI 稳定显示 +/// - 排除 `_fts*` / `_resource*`,因为它们是索引/附件库,不属于消息分片真相 pub fn discover_unknown_shards(db_dir: &Path, known: &[String]) -> Vec { let known_set: std::collections::HashSet = known.iter().map(|k| k.replace('\\', "/")).collect(); diff --git a/src/daemon/query.rs b/src/daemon/query.rs index 13cbf61..30f755f 100644 --- a/src/daemon/query.rs +++ b/src/daemon/query.rs @@ -699,6 +699,8 @@ pub async fn q_search( results.sort_by_key(|r| std::cmp::Reverse(r["timestamp"].as_i64().unwrap_or(0))); let paged: Vec = results.into_iter().take(limit).collect(); let unknown_shards = current_unknown_shards(db, names); + // 全局搜索 / keyword 过滤天然是窗口化结果,没有稳定的 chat-level latest baseline, + // 不参与 stale 推导;这里只保留 unknown_shards 这类 daemon 全局健康信号。 let meta = meta_for_global_query( scanned_rel_keys.len(), hit_rel_keys.len(),