mirror of https://github.com/jackwener/wx-cli.git
review: restore cache mode coverage and rationale comments
parent
76024901e9
commit
9f6a2cfba3
|
|
@ -23,14 +23,25 @@ struct CacheEntry {
|
||||||
decrypted_path: PathBuf,
|
decrypted_path: PathBuf,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// `DbCache::get_with_mode()` 本次解析 rel_key 时实际走了哪条路径。
|
||||||
|
///
|
||||||
|
/// latency tier:
|
||||||
|
/// - `CacheHit`:~0ms,只返回已有解密产物
|
||||||
|
/// - `WalIncremental`:典型 <10s,只在 cached DB 上增量 apply WAL
|
||||||
|
/// - `FullDecrypt`:最慢路径,大库上可能到 ~120s
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
pub enum CacheMode {
|
pub enum CacheMode {
|
||||||
|
/// Path 1:主 `.db` 和 WAL 都没变,直接命中缓存。
|
||||||
CacheHit,
|
CacheHit,
|
||||||
|
/// Path 2:主 `.db` 没变、只有 WAL 变了,在 cached DB 上增量 apply。
|
||||||
WalIncremental,
|
WalIncremental,
|
||||||
|
/// Path 3:主 `.db` 变了或缓存 miss,重新 full decrypt。
|
||||||
FullDecrypt,
|
FullDecrypt,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CacheMode {
|
impl CacheMode {
|
||||||
|
/// 手工固定为 snake_case 字符串,避免未来给 enum 直接 derive `Serialize`
|
||||||
|
/// 时静默改变 wire 形态。
|
||||||
pub fn as_str(self) -> &'static str {
|
pub fn as_str(self) -> &'static str {
|
||||||
match self {
|
match self {
|
||||||
CacheMode::CacheHit => "cache_hit",
|
CacheMode::CacheHit => "cache_hit",
|
||||||
|
|
@ -56,7 +67,6 @@ pub struct DbCache {
|
||||||
mtime_file: PathBuf,
|
mtime_file: PathBuf,
|
||||||
all_keys: HashMap<String, String>, // rel_key -> enc_key(hex)
|
all_keys: HashMap<String, String>, // rel_key -> enc_key(hex)
|
||||||
inner: Arc<Mutex<HashMap<String, CacheEntry>>>,
|
inner: Arc<Mutex<HashMap<String, CacheEntry>>>,
|
||||||
last_modes: Arc<Mutex<HashMap<String, CacheMode>>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DbCache {
|
impl DbCache {
|
||||||
|
|
@ -79,7 +89,6 @@ impl DbCache {
|
||||||
mtime_file,
|
mtime_file,
|
||||||
all_keys,
|
all_keys,
|
||||||
inner: Arc::new(Mutex::new(HashMap::new())),
|
inner: Arc::new(Mutex::new(HashMap::new())),
|
||||||
last_modes: Arc::new(Mutex::new(HashMap::new())),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
cache.load_persistent().await;
|
cache.load_persistent().await;
|
||||||
|
|
@ -92,18 +101,6 @@ impl DbCache {
|
||||||
&self.db_dir
|
&self.db_dir
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
|
||||||
pub async fn last_mode(&self, rel_key: &str) -> Option<CacheMode> {
|
|
||||||
self.last_modes.lock().await.get(rel_key).copied()
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn stamp_mode(&self, rel_key: &str, mode: CacheMode) {
|
|
||||||
self.last_modes
|
|
||||||
.lock()
|
|
||||||
.await
|
|
||||||
.insert(rel_key.to_string(), mode);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn cache_file_path(&self, rel_key: &str) -> PathBuf {
|
fn cache_file_path(&self, rel_key: &str) -> PathBuf {
|
||||||
let hash = format!("{:x}", md5::compute(rel_key.as_bytes()));
|
let hash = format!("{:x}", md5::compute(rel_key.as_bytes()));
|
||||||
self.cache_dir.join(format!("{}.db", hash))
|
self.cache_dir.join(format!("{}.db", hash))
|
||||||
|
|
@ -237,7 +234,6 @@ impl DbCache {
|
||||||
if let Some(entry) = cached.as_ref() {
|
if let Some(entry) = cached.as_ref() {
|
||||||
if entry.db_mtime == db_mt && entry.decrypted_path.exists() {
|
if entry.db_mtime == db_mt && entry.decrypted_path.exists() {
|
||||||
if entry.wal_mtime == wal_mt {
|
if entry.wal_mtime == wal_mt {
|
||||||
self.stamp_mode(rel_key, CacheMode::CacheHit).await;
|
|
||||||
return Ok(Some(CacheResolve {
|
return Ok(Some(CacheResolve {
|
||||||
path: entry.decrypted_path.clone(),
|
path: entry.decrypted_path.clone(),
|
||||||
mode: CacheMode::CacheHit,
|
mode: CacheMode::CacheHit,
|
||||||
|
|
@ -274,7 +270,6 @@ impl DbCache {
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
self.stamp_mode(rel_key, CacheMode::WalIncremental).await;
|
|
||||||
self.save_persistent().await;
|
self.save_persistent().await;
|
||||||
return Ok(Some(CacheResolve {
|
return Ok(Some(CacheResolve {
|
||||||
path: out_path,
|
path: out_path,
|
||||||
|
|
@ -317,7 +312,6 @@ impl DbCache {
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
self.stamp_mode(rel_key, CacheMode::FullDecrypt).await;
|
|
||||||
|
|
||||||
self.save_persistent().await;
|
self.save_persistent().await;
|
||||||
Ok(Some(CacheResolve {
|
Ok(Some(CacheResolve {
|
||||||
|
|
@ -537,6 +531,72 @@ mod tests {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn get_with_mode_reports_each_path() {
|
||||||
|
let root = unique_tmpdir("getwithmode");
|
||||||
|
let db_dir = root.join("db_storage");
|
||||||
|
let cache_dir = root.join("cache");
|
||||||
|
std::fs::create_dir_all(&db_dir).unwrap();
|
||||||
|
std::fs::create_dir_all(&cache_dir).unwrap();
|
||||||
|
|
||||||
|
let rel_key = "message_0.db".to_string();
|
||||||
|
let db_path = db_dir.join(&rel_key);
|
||||||
|
std::fs::write(&db_path, b"fake encrypted db").unwrap();
|
||||||
|
let wal_path = wal_path_for(&db_path);
|
||||||
|
std::fs::write(&wal_path, [0u8; 31]).unwrap();
|
||||||
|
|
||||||
|
let cached_hash = format!("{:x}", md5::compute(rel_key.as_bytes()));
|
||||||
|
let decrypted_path = cache_dir.join(format!("{}.db", cached_hash));
|
||||||
|
std::fs::write(&decrypted_path, ORIGINAL_CACHED_BYTES).unwrap();
|
||||||
|
|
||||||
|
let db_mt = mtime_nanos(&db_path);
|
||||||
|
let wal_mt0 = mtime_nanos(&wal_path);
|
||||||
|
let mtime_file = cache_dir.join("_mtimes.json");
|
||||||
|
let payload = serde_json::to_string(&serde_json::json!({
|
||||||
|
&rel_key: {
|
||||||
|
"db_mt": db_mt,
|
||||||
|
"wal_mt": wal_mt0,
|
||||||
|
"path": decrypted_path.display().to_string(),
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
.unwrap();
|
||||||
|
std::fs::write(&mtime_file, payload).unwrap();
|
||||||
|
|
||||||
|
let mut all_keys = HashMap::new();
|
||||||
|
all_keys.insert(rel_key.clone(), FAKE_KEY_HEX.to_string());
|
||||||
|
let cache = DbCache::with_dirs(db_dir, cache_dir, mtime_file, all_keys)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let hit = cache
|
||||||
|
.get_with_mode(&rel_key)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.expect("cache should hit");
|
||||||
|
assert_eq!(hit.path, decrypted_path);
|
||||||
|
assert_eq!(hit.mode, CacheMode::CacheHit);
|
||||||
|
|
||||||
|
std::thread::sleep(std::time::Duration::from_millis(20));
|
||||||
|
std::fs::write(&wal_path, [0xffu8; 31]).unwrap();
|
||||||
|
let wal = cache
|
||||||
|
.get_with_mode(&rel_key)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.expect("WAL-only change should stay incremental");
|
||||||
|
assert_eq!(wal.path, decrypted_path);
|
||||||
|
assert_eq!(wal.mode, CacheMode::WalIncremental);
|
||||||
|
|
||||||
|
std::thread::sleep(std::time::Duration::from_millis(20));
|
||||||
|
std::fs::write(&db_path, b"different bytes").unwrap();
|
||||||
|
let full = cache
|
||||||
|
.get_with_mode(&rel_key)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.expect("db mtime change should trigger full decrypt");
|
||||||
|
assert_eq!(full.path, decrypted_path);
|
||||||
|
assert_eq!(full.mode, CacheMode::FullDecrypt);
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn restart_with_wal_change_still_reuses_cached_db_then_applies_wal() {
|
async fn restart_with_wal_change_still_reuses_cached_db_then_applies_wal() {
|
||||||
let root = unique_tmpdir("restart-wal");
|
let root = unique_tmpdir("restart-wal");
|
||||||
|
|
|
||||||
|
|
@ -65,26 +65,25 @@ pub struct Meta {
|
||||||
pub enum MetaStatus {
|
pub enum MetaStatus {
|
||||||
#[default]
|
#[default]
|
||||||
Ok,
|
Ok,
|
||||||
|
/// `session.db` 的最新时间明显领先于本次消息查询结果,说明数据可能过期或不完整。
|
||||||
PossiblyStale,
|
PossiblyStale,
|
||||||
|
/// 最强信号:磁盘上出现 daemon 不认识的新分片,通常必须重跑 `wx init --force`。
|
||||||
PossiblyStaleUnknownShards,
|
PossiblyStaleUnknownShards,
|
||||||
|
/// 调用方主动传了 `since` / `until` / `offset` 等窗口条件,结果天然是局部视图。
|
||||||
Windowed,
|
Windowed,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MetaStatus {
|
/// session 领先 history 多少秒就报 `PossiblyStale`。
|
||||||
#[allow(dead_code)]
|
///
|
||||||
pub fn as_str(&self) -> &'static str {
|
/// 24h 的取值是故意保守的:活跃群聊/私聊很少会整整一天没有新消息,
|
||||||
match self {
|
/// 超过这个窗口就值得显式提醒 agent 不要把结果当成“当前最新状态”。
|
||||||
MetaStatus::Ok => "ok",
|
|
||||||
MetaStatus::PossiblyStale => "possibly_stale",
|
|
||||||
MetaStatus::PossiblyStaleUnknownShards => "possibly_stale_unknown_shards",
|
|
||||||
MetaStatus::Windowed => "windowed",
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// session 领先 history 多少秒就报 PossiblyStale。
|
|
||||||
pub const STALE_THRESHOLD_SECS: i64 = 24 * 3600;
|
pub const STALE_THRESHOLD_SECS: i64 = 24 * 3600;
|
||||||
|
|
||||||
|
/// 统一 freshness status 的优先级:
|
||||||
|
/// 1. `unknown_shards` 非空:daemon 整体视图已经过期,优先返回 `PossiblyStaleUnknownShards`
|
||||||
|
/// 2. `windowed=true`:调用方本来就在看局部窗口,不参与 stale 推导
|
||||||
|
/// 3. `session_last - chat_latest > STALE_THRESHOLD_SECS`:返回 `PossiblyStale`
|
||||||
|
/// 4. 其他情况:`Ok`
|
||||||
pub fn derive_status(
|
pub fn derive_status(
|
||||||
chat_latest: Option<i64>,
|
chat_latest: Option<i64>,
|
||||||
session_last: Option<i64>,
|
session_last: Option<i64>,
|
||||||
|
|
@ -103,6 +102,13 @@ pub fn derive_status(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// 扫描 `<db_dir>/message/` 下真实存在的 `message_*.db`,diff 出 daemon 当前没有 key
|
||||||
|
/// 的未知分片。
|
||||||
|
///
|
||||||
|
/// 契约:
|
||||||
|
/// - 返回值一律是 `/` 分隔的 rel_key(如 `message/message_3.db`),与 `all_keys.json` 对齐
|
||||||
|
/// - 结果按字典序排序,方便测试和 CLI 稳定显示
|
||||||
|
/// - 排除 `_fts*` / `_resource*`,因为它们是索引/附件库,不属于消息分片真相
|
||||||
pub fn discover_unknown_shards(db_dir: &Path, known: &[String]) -> Vec<String> {
|
pub fn discover_unknown_shards(db_dir: &Path, known: &[String]) -> Vec<String> {
|
||||||
let known_set: std::collections::HashSet<String> =
|
let known_set: std::collections::HashSet<String> =
|
||||||
known.iter().map(|k| k.replace('\\', "/")).collect();
|
known.iter().map(|k| k.replace('\\', "/")).collect();
|
||||||
|
|
|
||||||
|
|
@ -699,6 +699,8 @@ pub async fn q_search(
|
||||||
results.sort_by_key(|r| std::cmp::Reverse(r["timestamp"].as_i64().unwrap_or(0)));
|
results.sort_by_key(|r| std::cmp::Reverse(r["timestamp"].as_i64().unwrap_or(0)));
|
||||||
let paged: Vec<Value> = results.into_iter().take(limit).collect();
|
let paged: Vec<Value> = results.into_iter().take(limit).collect();
|
||||||
let unknown_shards = current_unknown_shards(db, names);
|
let unknown_shards = current_unknown_shards(db, names);
|
||||||
|
// 全局搜索 / keyword 过滤天然是窗口化结果,没有稳定的 chat-level latest baseline,
|
||||||
|
// 不参与 stale 推导;这里只保留 unknown_shards 这类 daemon 全局健康信号。
|
||||||
let meta = meta_for_global_query(
|
let meta = meta_for_global_query(
|
||||||
scanned_rel_keys.len(),
|
scanned_rel_keys.len(),
|
||||||
hit_rel_keys.len(),
|
hit_rel_keys.len(),
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue