feat(meta): expose freshness coverage in query output

pull/68/head
jackwener 2026-05-16 02:22:03 +08:00
commit b5edaf7177
18 changed files with 2534 additions and 826 deletions

View File

@ -36,7 +36,7 @@ npx skills add jackwener/wx-cli -g
- **零依赖安装** — 单一 Rust 二进制,一行命令装完
- **毫秒级响应** — 后台 daemon 持久缓存解密数据库,mtime 不变则复用
- **AI 友好**默认 YAML 输出,更省 token & 易读;`--json` 可切换为 JSON方便 `jq` 处理等)
- **AI 友好** — `history` / `search` / `sessions` / `unread` / `new-messages` / `stats` / `attachments` 默认返回 `{..., meta}` wrapper,agent 能直接消费 freshness / source 信息
- **完全本地** — 数据不出本机,实时解密,无需全量预解密
---
@ -168,6 +168,15 @@ wx search "会议" --in "工作群" --since 2026-01-01
群聊里的 `last_sender`、`sender` 和 `stats.top_senders` 会优先使用群昵称(群名片)。如果本地数据库里没有对应群昵称,则回退到联系人备注、微信昵称或 username。
`history` / `search` / `sessions` / `unread` / `new-messages` / `stats` / `attachments` 现在都会附带 `meta`:
- `status`: `ok` / `possibly_stale` / `possibly_stale_unknown_shards` / `windowed`
- `unknown_shards`: 磁盘上存在、但 daemon 当前没有 key 的 `message_N.db` 分片;非空时应先跑 `wx init --force`
- `chat_latest_timestamp` / `chat_latest_db`: 当前命中数据里最新一条消息的时间和分片来源
- `session_last_timestamp`: `session.db` 里 WeChat 自己记录的最新时间;如果明显领先于 `chat_latest_timestamp`,说明结果可能漏了消息
默认情况下,人类用户会在 stderr 看到可执行的 warning;agent / 脚本可直接读 stdout 里的 `meta`。传 `--with-meta` 会额外返回 `per_shard_latest` / `cache_mode_per_shard`,传隐藏 flag `--debug-source` 还会带真实 `shard_paths`。
引用消息会在 `history` / `search` / `new-messages` 输出中显示当前回复和被引用原文:
```text
@ -278,12 +287,14 @@ wx export "AI群" --since 2026-01-01 --format json
### 输出格式
默认输出 YAML,更省 token & 易读;`--json` 可切换为 JSON方便 `jq` 处理等)
默认输出 YAML,`--json` 可切换为 JSON。对 agent 而言,`history` / `search` / `sessions` / `unread` / `new-messages` / `stats` / `attachments` 的 stdout 现在是 wrapper,而不是裸数组:
```bash
wx sessions --json
wx search "关键词" --json | jq '.[0].content'
wx search "关键词" --json | jq '.results[0].content'
wx new-messages --json
wx history "张三" --json | jq '.meta'
wx history "张三" --json --with-meta | jq '.meta.cache_mode_per_shard'
```
### Daemon 管理

View File

@ -159,6 +159,29 @@ wx search "会议" --in "工作群" --since 2026-01-01
群聊消息里的 `last_sender`、`sender` 和 `stats.top_senders` 会优先显示群昵称(群名片)。如果本地数据库没有群昵称,再回退到联系人备注、微信昵称或 username。
`sessions` / `unread` / `history` / `search` / `new-messages` / `stats` / `attachments` 的 stdout 现在统一是 wrapper:
```json
{
"messages": [...],
"meta": {
"status": "ok",
"unknown_shards": [],
"chat_latest_timestamp": 1715750400,
"chat_latest_db": "message/message_2.db",
"session_last_timestamp": 1715760000
}
}
```
其中:
- `status = possibly_stale_unknown_shards`:磁盘上出现 daemon 不认识的新 `message_N.db`,先跑 `wx init --force`
- `status = possibly_stale`:`session.db` 记录的最新时间明显领先于本次查到的最新消息,结果可能漏消息
- `status = windowed`:这次查询本来就是窗口化/过滤后的局部视图,不应把它当作"全量最新状态"
- `--with-meta`:额外返回 `per_shard_latest` / `cache_mode_per_shard`
- `--debug-source`:在 `--with-meta` 基础上再暴露真实 `shard_paths`
引用消息(appmsg `type=57`)在 `history` / `search` / `new-messages` 输出里会展开为两行:第一行是当前回复,第二行以 `↳` 开头显示被引用原文,例如:
```text
@ -315,8 +338,10 @@ wx daemon logs --follow
```bash
wx sessions --json
wx new-messages --json
wx search "关键词" --json
wx history "张三" --json -n 50
wx search "关键词" --json | jq '.results[0]'
wx history "张三" --json -n 50 | jq '.messages[0]'
wx history "张三" --json | jq '.meta'
wx history "张三" --json --with-meta | jq '.meta.cache_mode_per_shard'
```
CHAT 参数支持昵称、备注名、微信 ID模糊匹配。不确定准确名称时先用 `wx contacts --query` 搜索。

View File

@ -1,9 +1,9 @@
use anyhow::Result;
use crate::ipc::Request;
use super::history::{parse_time, parse_time_end};
use super::output::{print_value, resolve};
use super::output::{emit_warnings, print_response, OutputOpts};
use super::transport;
use crate::ipc::Request;
/// `wx attachments` — 列出指定会话的附件消息(默认 image可多选
///
@ -16,10 +16,11 @@ pub fn cmd_attachments(
offset: usize,
since: Option<String>,
until: Option<String>,
json: bool,
opts: OutputOpts,
) -> Result<()> {
let since_ts = since.as_deref().map(parse_time).transpose()?;
let until_ts = until.as_deref().map(parse_time_end).transpose()?;
let (with_meta, debug_source) = opts.request_flags();
// CLI 收上来的 Vec<String> 为空时按默认image让 daemon 决定 fallback。
let kinds_param = if kinds.is_empty() { None } else { Some(kinds) };
@ -31,12 +32,10 @@ pub fn cmd_attachments(
offset,
since: since_ts,
until: until_ts,
with_meta,
debug_source,
};
let resp = transport::send(req)?;
let data = resp
.data
.get("attachments")
.cloned()
.unwrap_or(serde_json::Value::Array(vec![]));
print_value(&data, &resolve(json))
emit_warnings(&resp.data);
print_response(&resp.data, &opts)
}

View File

@ -1,7 +1,8 @@
use anyhow::Result;
use crate::ipc::Request;
use super::transport;
use super::history::{parse_time, parse_time_end};
use super::output::{emit_warnings, warning_block_markdown, warning_block_text, OutputOpts};
use super::transport;
use crate::ipc::Request;
use anyhow::Result;
pub fn cmd_export(
chat: String,
@ -10,9 +11,11 @@ pub fn cmd_export(
limit: usize,
format: String,
output: Option<String>,
opts: OutputOpts,
) -> Result<()> {
let since_ts = since.as_deref().map(parse_time).transpose()?;
let until_ts = until.as_deref().map(parse_time_end).transpose()?;
let (with_meta, debug_source) = opts.request_flags();
let req = Request::History {
chat,
@ -21,24 +24,42 @@ pub fn cmd_export(
since: since_ts,
until: until_ts,
msg_type: None,
with_meta,
debug_source,
};
let resp = transport::send(req)?;
let messages = resp.data["messages"].as_array().cloned().unwrap_or_default();
emit_warnings(&resp.data);
let messages = resp.data["messages"]
.as_array()
.cloned()
.unwrap_or_default();
let chat_name = resp.data["chat"].as_str().unwrap_or("").to_string();
let is_group = resp.data["is_group"].as_bool().unwrap_or(false);
let count = messages.len();
let text = match format.as_str() {
"json" => serde_json::to_string_pretty(&resp.data)?,
"yaml" => serde_yaml::to_string(&resp.data)?,
"txt" => {
let group_str = if is_group { "[群]" } else { "" };
let mut lines = vec![format!("=== {}{} ({} 条) ===\n", chat_name, group_str, count)];
let mut lines = vec![format!(
"=== {}{} ({} 条) ===\n",
chat_name, group_str, count
)];
if let Some(warn) = warning_block_text(&resp.data) {
lines.push(warn);
lines.push(String::new());
}
for m in &messages {
let time = m["time"].as_str().unwrap_or("");
let sender = m["sender"].as_str().unwrap_or("");
let content = m["content"].as_str().unwrap_or("");
let sender_str = if !sender.is_empty() { format!("{}: ", sender) } else { String::new() };
let sender_str = if !sender.is_empty() {
format!("{}: ", sender)
} else {
String::new()
};
lines.push(format!("[{}] {}{}", time, sender_str, content));
}
lines.join("\n")
@ -50,11 +71,18 @@ pub fn cmd_export(
format!("# {}{}", chat_name, group_str),
format!("\n> 导出 {} 条消息\n", count),
];
if let Some(warn) = warning_block_markdown(&resp.data) {
lines.push(warn);
}
for m in &messages {
let time = m["time"].as_str().unwrap_or("");
let sender = m["sender"].as_str().unwrap_or("");
let content = m["content"].as_str().unwrap_or("").replace('\n', "\n> ");
let sender_md = if !sender.is_empty() { format!("**{}**: ", sender) } else { String::new() };
let sender_md = if !sender.is_empty() {
format!("**{}**: ", sender)
} else {
String::new()
};
lines.push(format!("### {}\n\n{}{}\n", time, sender_md, content));
}
lines.join("\n")

View File

@ -1,7 +1,7 @@
use anyhow::Result;
use crate::ipc::Request;
use super::output::{emit_warnings, print_response, OutputOpts};
use super::transport;
use super::output::{resolve, print_value};
use crate::ipc::Request;
use anyhow::Result;
pub fn cmd_history(
chat: String,
@ -10,37 +10,51 @@ pub fn cmd_history(
since: Option<String>,
until: Option<String>,
msg_type: Option<String>,
json: bool,
opts: OutputOpts,
) -> Result<()> {
let since_ts = since.as_deref().map(parse_time).transpose()?;
let until_ts = until.as_deref().map(parse_time_end).transpose()?;
let type_val = msg_type.as_deref().and_then(parse_msg_type);
let (with_meta, debug_source) = opts.request_flags();
let req = Request::History { chat, limit, offset, since: since_ts, until: until_ts, msg_type: type_val };
let req = Request::History {
chat,
limit,
offset,
since: since_ts,
until: until_ts,
msg_type: type_val,
with_meta,
debug_source,
};
let resp = transport::send(req)?;
let msgs = resp.data.get("messages")
.cloned()
.unwrap_or(serde_json::Value::Array(vec![]));
print_value(&msgs, &resolve(json))
emit_warnings(&resp.data);
print_response(&resp.data, &opts)
}
pub fn parse_time(s: &str) -> Result<i64> {
use chrono::{Local, TimeZone};
for fmt in &["%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M"] {
if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, fmt) {
return Local.from_local_datetime(&dt).single()
return Local
.from_local_datetime(&dt)
.single()
.map(|d| d.timestamp())
.ok_or_else(|| anyhow::anyhow!("本地时间歧义: {}", s));
}
}
if let Ok(d) = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
let dt = d.and_hms_opt(0, 0, 0).unwrap();
return Local.from_local_datetime(&dt).single()
return Local
.from_local_datetime(&dt)
.single()
.map(|d| d.timestamp())
.ok_or_else(|| anyhow::anyhow!("本地时间歧义: {}", s));
}
anyhow::bail!("无法解析时间 '{}',支持 YYYY-MM-DD / YYYY-MM-DD HH:MM / YYYY-MM-DD HH:MM:SS", s)
anyhow::bail!(
"无法解析时间 '{}',支持 YYYY-MM-DD / YYYY-MM-DD HH:MM / YYYY-MM-DD HH:MM:SS",
s
)
}
pub fn parse_time_end(s: &str) -> Result<i64> {
@ -48,7 +62,9 @@ pub fn parse_time_end(s: &str) -> Result<i64> {
if s.len() == 10 {
if let Ok(d) = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
let dt = d.and_hms_opt(23, 59, 59).unwrap();
return Local.from_local_datetime(&dt).single()
return Local
.from_local_datetime(&dt)
.single()
.map(|d| d.timestamp())
.ok_or_else(|| anyhow::anyhow!("本地时间歧义: {}", s));
}
@ -59,15 +75,15 @@ pub fn parse_time_end(s: &str) -> Result<i64> {
/// 将消息类型字符串转为 local_type 整数,未知类型返回 None
pub fn parse_msg_type(s: &str) -> Option<i64> {
match s {
"text" => Some(1),
"image" => Some(3),
"voice" => Some(34),
"video" => Some(43),
"sticker" => Some(47),
"text" => Some(1),
"image" => Some(3),
"voice" => Some(34),
"video" => Some(43),
"sticker" => Some(47),
"location" => Some(48),
"link" | "file" => Some(49),
"call" => Some(50),
"system" => Some(10000),
_ => None,
"call" => Some(50),
"system" => Some(10000),
_ => None,
}
}

View File

@ -1,24 +1,25 @@
mod init;
pub mod attachments;
pub mod biz_articles;
pub mod extract;
pub mod sessions;
pub mod history;
pub mod search;
pub mod contacts;
pub mod export;
pub mod daemon_cmd;
pub mod transport;
pub mod output;
pub mod unread;
pub mod export;
pub mod extract;
pub mod favorites;
pub mod history;
mod init;
pub mod members;
pub mod new_messages;
pub mod stats;
pub mod favorites;
pub mod sns_notifications;
pub mod output;
pub mod search;
pub mod sessions;
pub mod sns_feed;
pub mod sns_notifications;
pub mod sns_search;
pub mod stats;
pub mod transport;
pub mod unread;
use self::output::OutputOpts;
use anyhow::Result;
use clap::{Parser, Subcommand};
@ -26,6 +27,12 @@ use clap::{Parser, Subcommand};
// Top-level command-line definition parsed by clap.
// NOTE: the `///` comments on the fields below are picked up by clap's derive
// macro as `--help` text, so they are user-facing strings and are kept verbatim.
#[derive(Parser)]
#[command(name = "wx", version = env!("CARGO_PKG_VERSION"), about = "wx — 微信本地数据 CLI")]
pub struct Cli {
/// 返回更重的 freshness/source 元数据(如 per-shard latest、cache modes
// `global = true` lets the flag be given alongside any subcommand; `dispatch`
// copies it into the `OutputOpts` handed to the meta-aware commands.
#[arg(long, global = true)]
with_meta: bool,
/// 在 meta 里暴露真实 shard 路径(调试用)
// `hide = true` keeps this debugging flag out of the generated help output.
#[arg(long, global = true, hide = true)]
debug_source: bool,
// The selected subcommand; matched on in `dispatch`.
#[command(subcommand)]
command: Commands,
}
@ -335,46 +342,184 @@ pub fn run() {
}
fn dispatch(cli: Cli) -> Result<()> {
let base_with_meta = cli.with_meta;
let base_debug_source = cli.debug_source;
match cli.command {
Commands::Init { force } => init::cmd_init(force),
Commands::Sessions { limit, json } => sessions::cmd_sessions(limit, json),
Commands::History { chat, limit, offset, since, until, msg_type, json } => {
history::cmd_history(chat, limit, offset, since, until, msg_type, json)
}
Commands::Search { keyword, chats, limit, since, until, msg_type, json } => {
search::cmd_search(keyword, chats, limit, since, until, msg_type, json)
}
Commands::Sessions { limit, json } => sessions::cmd_sessions(
limit,
OutputOpts {
json,
with_meta: base_with_meta,
debug_source: base_debug_source,
},
),
Commands::History {
chat,
limit,
offset,
since,
until,
msg_type,
json,
} => history::cmd_history(
chat,
limit,
offset,
since,
until,
msg_type,
OutputOpts {
json,
with_meta: base_with_meta,
debug_source: base_debug_source,
},
),
Commands::Search {
keyword,
chats,
limit,
since,
until,
msg_type,
json,
} => search::cmd_search(
keyword,
chats,
limit,
since,
until,
msg_type,
OutputOpts {
json,
with_meta: base_with_meta,
debug_source: base_debug_source,
},
),
Commands::Contacts { query, limit, json } => contacts::cmd_contacts(query, limit, json),
Commands::Export { chat, since, until, limit, format, output } => {
export::cmd_export(chat, since, until, limit, format, output)
Commands::Export {
chat,
since,
until,
limit,
format,
output,
} => {
let export_json = format == "json";
export::cmd_export(
chat,
since,
until,
limit,
format,
output,
OutputOpts {
json: export_json,
with_meta: base_with_meta,
debug_source: base_debug_source,
},
)
}
Commands::Unread { limit, filter, json } => unread::cmd_unread(limit, filter, json),
Commands::Unread {
limit,
filter,
json,
} => unread::cmd_unread(
limit,
filter,
OutputOpts {
json,
with_meta: base_with_meta,
debug_source: base_debug_source,
},
),
Commands::Members { chat, json } => members::cmd_members(chat, json),
Commands::NewMessages { limit, json } => new_messages::cmd_new_messages(limit, json),
Commands::Stats { chat, since, until, json } => {
stats::cmd_stats(chat, since, until, json)
}
Commands::Favorites { limit, fav_type, query, json } => {
favorites::cmd_favorites(limit, fav_type, query, json)
}
Commands::SnsNotifications { limit, since, until, include_read, json } => {
sns_notifications::cmd_sns_notifications(limit, since, until, include_read, json)
}
Commands::SnsFeed { limit, since, until, user, json } => {
sns_feed::cmd_sns_feed(limit, since, until, user, json)
}
Commands::SnsSearch { keyword, limit, since, until, user, json } => {
sns_search::cmd_sns_search(keyword, limit, since, until, user, json)
}
Commands::BizArticles { limit, account, since, until, unread, json } => {
biz_articles::cmd_biz_articles(limit, account, since, until, unread, json)
}
Commands::Attachments { chat, kinds, limit, offset, since, until, json } => {
attachments::cmd_attachments(chat, kinds, limit, offset, since, until, json)
}
Commands::Extract { attachment_id, output, overwrite, json } => {
extract::cmd_extract(attachment_id, output, overwrite, json)
}
Commands::NewMessages { limit, json } => new_messages::cmd_new_messages(
limit,
OutputOpts {
json,
with_meta: base_with_meta,
debug_source: base_debug_source,
},
),
Commands::Stats {
chat,
since,
until,
json,
} => stats::cmd_stats(
chat,
since,
until,
OutputOpts {
json,
with_meta: base_with_meta,
debug_source: base_debug_source,
},
),
Commands::Favorites {
limit,
fav_type,
query,
json,
} => favorites::cmd_favorites(limit, fav_type, query, json),
Commands::SnsNotifications {
limit,
since,
until,
include_read,
json,
} => sns_notifications::cmd_sns_notifications(limit, since, until, include_read, json),
Commands::SnsFeed {
limit,
since,
until,
user,
json,
} => sns_feed::cmd_sns_feed(limit, since, until, user, json),
Commands::SnsSearch {
keyword,
limit,
since,
until,
user,
json,
} => sns_search::cmd_sns_search(keyword, limit, since, until, user, json),
Commands::BizArticles {
limit,
account,
since,
until,
unread,
json,
} => biz_articles::cmd_biz_articles(limit, account, since, until, unread, json),
Commands::Attachments {
chat,
kinds,
limit,
offset,
since,
until,
json,
} => attachments::cmd_attachments(
chat,
kinds,
limit,
offset,
since,
until,
OutputOpts {
json,
with_meta: base_with_meta,
debug_source: base_debug_source,
},
),
Commands::Extract {
attachment_id,
output,
overwrite,
json,
} => extract::cmd_extract(attachment_id, output, overwrite, json),
Commands::Daemon { cmd } => daemon_cmd::cmd_daemon(cmd),
}
}

View File

@ -1,8 +1,8 @@
use super::output::{emit_warnings, print_response, OutputOpts};
use super::transport;
use crate::ipc::Request;
use anyhow::Result;
use std::collections::HashMap;
use crate::ipc::Request;
use super::transport;
use super::output::{resolve, print_value};
fn state_file() -> std::path::PathBuf {
dirs::home_dir()
@ -18,7 +18,8 @@ fn load_state() -> Option<HashMap<String, i64>> {
let data = std::fs::read_to_string(state_file()).ok()?;
let v: serde_json::Value = serde_json::from_str(&data).ok()?;
// 旧格式(只有 timestamp 字段)没有 sessions key → 返回 None 触发首次运行逻辑
let map: HashMap<String, i64> = v.get("sessions")?
let map: HashMap<String, i64> = v
.get("sessions")?
.as_object()?
.iter()
.filter_map(|(k, v)| v.as_i64().map(|ts| (k.clone(), ts)))
@ -33,17 +34,27 @@ fn save_state(new_state: &HashMap<String, i64>) -> Result<()> {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
std::fs::write(&path, serde_json::to_string(&serde_json::json!({ "sessions": new_state }))?)?;
std::fs::write(
&path,
serde_json::to_string(&serde_json::json!({ "sessions": new_state }))?,
)?;
Ok(())
}
pub fn cmd_new_messages(limit: usize, json: bool) -> Result<()> {
pub fn cmd_new_messages(limit: usize, opts: OutputOpts) -> Result<()> {
let state = load_state();
let resp = transport::send(Request::NewMessages { state, limit })?;
let (with_meta, debug_source) = opts.request_flags();
let resp = transport::send(Request::NewMessages {
state,
limit,
with_meta,
debug_source,
})?;
// 保存 daemon 返回的 new_state
if let Some(obj) = resp.data.get("new_state").and_then(|v| v.as_object()) {
let map: HashMap<String, i64> = obj.iter()
let map: HashMap<String, i64> = obj
.iter()
.filter_map(|(k, v)| v.as_i64().map(|ts| (k.clone(), ts)))
.collect();
if !map.is_empty() {
@ -51,8 +62,6 @@ pub fn cmd_new_messages(limit: usize, json: bool) -> Result<()> {
}
}
let messages = resp.data.get("messages")
.cloned()
.unwrap_or(serde_json::Value::Array(vec![]));
print_value(&messages, &resolve(json))
emit_warnings(&resp.data);
print_response(&resp.data, &opts)
}

View File

@ -1,12 +1,31 @@
use chrono::{Local, TimeZone};
/// Output format used when printing a value to stdout.
pub enum Fmt {
/// YAML; what `resolve` picks when `--json` is not given.
Yaml,
/// JSON; what `resolve` picks when `--json` is given.
Json,
}
/// Per-invocation output switches shared by the commands that return a `meta` wrapper.
#[derive(Clone, Copy, Debug)]
pub struct OutputOpts {
    /// Print JSON instead of the default YAML.
    pub json: bool,
    /// Value of the global `--with-meta` flag.
    pub with_meta: bool,
    /// Value of the hidden global `--debug-source` flag.
    pub debug_source: bool,
}

impl OutputOpts {
    /// Returns the `(with_meta, debug_source)` pair to put into a daemon request.
    ///
    /// `debug_source` implies `with_meta`: the first element is true whenever
    /// either flag is set, while the second element is `debug_source` unchanged.
    pub fn request_flags(self) -> (bool, bool) {
        let Self {
            with_meta,
            debug_source,
            ..
        } = self;
        let effective_with_meta = debug_source || with_meta;
        (effective_with_meta, debug_source)
    }
}
/// 默认 YAML--json 时输出 JSON
pub fn resolve(json: bool) -> Fmt {
if json { Fmt::Json } else { Fmt::Yaml }
if json {
Fmt::Json
} else {
Fmt::Yaml
}
}
pub fn print_value(value: &serde_json::Value, fmt: &Fmt) -> anyhow::Result<()> {
@ -16,3 +35,95 @@ pub fn print_value(value: &serde_json::Value, fmt: &Fmt) -> anyhow::Result<()> {
}
Ok(())
}
/// Prints a daemon response payload to stdout as-is, wrapper and `meta` included.
///
/// The format follows `opts.json`: JSON when set, YAML otherwise.
pub fn print_response(data: &serde_json::Value, opts: &OutputOpts) -> anyhow::Result<()> {
    let fmt = resolve(opts.json);
    print_value(data, &fmt)
}
/// Writes every warning derived from the response's `meta` to stderr, one per line.
///
/// stdout is left untouched so scripts reading the payload are not disturbed.
pub fn emit_warnings(data: &serde_json::Value) {
    warning_lines(data)
        .iter()
        .for_each(|msg| eprintln!("[wx] 警告:{}", msg));
}
/// Builds the human-readable warnings implied by a response's `meta` object.
///
/// Returns an empty list when `meta` is missing or is not an object. Otherwise:
/// - a non-empty `meta.unknown_shards` (string entries only) yields one warning
///   that lists the shards and suggests `wx init --force`;
/// - a `meta.status` of `possibly_stale` or `possibly_stale_unknown_shards`
///   yields one warning comparing `session_last_timestamp` with
///   `chat_latest_timestamp`, but only when both are present as integers.
pub fn warning_lines(data: &serde_json::Value) -> Vec<String> {
    let meta = match data.get("meta").filter(|m| m.is_object()) {
        Some(m) => m,
        None => return Vec::new(),
    };
    let mut warnings = Vec::new();

    // Non-string entries in `unknown_shards` are ignored.
    let shard_names: Vec<&str> = meta
        .get("unknown_shards")
        .and_then(|v| v.as_array())
        .map(|items| items.iter().filter_map(|item| item.as_str()).collect())
        .unwrap_or_default();
    if !shard_names.is_empty() {
        warnings.push(format!(
            "磁盘上发现 daemon 不认识的分片 {},结果可能不完整;运行 `wx init --force` 重新提取密钥。",
            shard_names.join(", ")
        ));
    }

    let status = meta.get("status").and_then(|v| v.as_str()).unwrap_or("");
    let looks_stale = matches!(status, "possibly_stale" | "possibly_stale_unknown_shards");
    let timestamp = |key: &str| meta.get(key).and_then(|v| v.as_i64());
    if looks_stale {
        if let (Some(session_ts), Some(chat_ts)) = (
            timestamp("session_last_timestamp"),
            timestamp("chat_latest_timestamp"),
        ) {
            // Name the chat when the payload carries one: `chat` first, then `username`.
            let subject = ["chat", "username"]
                .iter()
                .find_map(|key| data.get(*key).and_then(|v| v.as_str()))
                .unwrap_or("当前查询");
            warnings.push(format!(
                "session.db 显示 '{}' 最新到 {},但本次扫描只到 {},结果可能过期或不完整。",
                subject,
                fmt_meta_ts(session_ts),
                fmt_meta_ts(chat_ts),
            ));
        }
    }
    warnings
}
/// Renders the warnings for `data` as a plain-text block, one prefixed line per warning.
///
/// Returns `None` when there is nothing to warn about.
pub fn warning_block_text(data: &serde_json::Value) -> Option<String> {
    let rendered: Vec<String> = warning_lines(data)
        .iter()
        .map(|msg| format!("[wx] 警告:{}", msg))
        .collect();
    if rendered.is_empty() {
        None
    } else {
        Some(rendered.join("\n"))
    }
}
pub fn warning_block_markdown(data: &serde_json::Value) -> Option<String> {
let lines = warning_lines(data);
if lines.is_empty() {
return None;
}
let mut out = String::from("> [!WARNING]\n");
for line in lines {
out.push_str("> ");
out.push_str(&line);
out.push('\n');
}
Some(out)
}
/// Formats a Unix timestamp (seconds) as local `YYYY-MM-DD HH:MM:SS`.
///
/// Falls back to the raw number when the timestamp does not map to exactly
/// one local time.
fn fmt_meta_ts(ts: i64) -> String {
    match Local.timestamp_opt(ts, 0).single() {
        Some(local_time) => local_time.format("%Y-%m-%d %H:%M:%S").to_string(),
        None => ts.to_string(),
    }
}

View File

@ -1,8 +1,8 @@
use anyhow::Result;
use crate::ipc::Request;
use super::history::{parse_msg_type, parse_time, parse_time_end};
use super::output::{emit_warnings, print_response, OutputOpts};
use super::transport;
use super::history::{parse_time, parse_time_end, parse_msg_type};
use super::output::{resolve, print_value};
use crate::ipc::Request;
use anyhow::Result;
pub fn cmd_search(
keyword: String,
@ -11,12 +11,13 @@ pub fn cmd_search(
since: Option<String>,
until: Option<String>,
msg_type: Option<String>,
json: bool,
opts: OutputOpts,
) -> Result<()> {
let since_ts = since.as_deref().map(parse_time).transpose()?;
let until_ts = until.as_deref().map(parse_time_end).transpose()?;
let type_val = msg_type.as_deref().and_then(parse_msg_type);
let chats_opt = if chats.is_empty() { None } else { Some(chats) };
let (with_meta, debug_source) = opts.request_flags();
let req = Request::Search {
keyword,
@ -25,11 +26,11 @@ pub fn cmd_search(
since: since_ts,
until: until_ts,
msg_type: type_val,
with_meta,
debug_source,
};
let resp = transport::send(req)?;
let results = resp.data.get("results")
.cloned()
.unwrap_or(serde_json::Value::Array(vec![]));
print_value(&results, &resolve(json))
emit_warnings(&resp.data);
print_response(&resp.data, &opts)
}

View File

@ -1,12 +1,15 @@
use anyhow::Result;
use crate::ipc::Request;
use super::output::{emit_warnings, print_response, OutputOpts};
use super::transport;
use super::output::{resolve, print_value};
use crate::ipc::Request;
use anyhow::Result;
pub fn cmd_sessions(limit: usize, json: bool) -> Result<()> {
let resp = transport::send(Request::Sessions { limit })?;
let data = resp.data.get("sessions")
.cloned()
.unwrap_or(serde_json::Value::Array(vec![]));
print_value(&data, &resolve(json))
pub fn cmd_sessions(limit: usize, opts: OutputOpts) -> Result<()> {
let (with_meta, debug_source) = opts.request_flags();
let resp = transport::send(Request::Sessions {
limit,
with_meta,
debug_source,
})?;
emit_warnings(&resp.data);
print_response(&resp.data, &opts)
}

View File

@ -1,18 +1,25 @@
use anyhow::Result;
use crate::ipc::Request;
use super::transport;
use super::history::{parse_time, parse_time_end};
use super::output::{resolve, print_value};
use super::output::{emit_warnings, print_response, OutputOpts};
use super::transport;
use crate::ipc::Request;
use anyhow::Result;
pub fn cmd_stats(
chat: String,
since: Option<String>,
until: Option<String>,
json: bool,
opts: OutputOpts,
) -> Result<()> {
let since_ts = since.as_deref().map(parse_time).transpose()?;
let until_ts = until.as_deref().map(parse_time_end).transpose()?;
let resp = transport::send(Request::Stats { chat, since: since_ts, until: until_ts })?;
print_value(&resp.data, &resolve(json))
let (with_meta, debug_source) = opts.request_flags();
let resp = transport::send(Request::Stats {
chat,
since: since_ts,
until: until_ts,
with_meta,
debug_source,
})?;
emit_warnings(&resp.data);
print_response(&resp.data, &opts)
}

View File

@ -1,18 +1,22 @@
use anyhow::Result;
use crate::ipc::Request;
use super::output::{emit_warnings, print_response, OutputOpts};
use super::transport;
use super::output::{resolve, print_value};
use crate::ipc::Request;
use anyhow::Result;
pub fn cmd_unread(limit: usize, filter: Vec<String>, json: bool) -> Result<()> {
pub fn cmd_unread(limit: usize, filter: Vec<String>, opts: OutputOpts) -> Result<()> {
// 空或含 "all" 视为不过滤;其他值已被 clap value_parser 验证过,直接透传给 daemon。
let filter_vec = if filter.is_empty() || filter.iter().any(|s| s == "all") {
None
} else {
Some(filter)
};
let resp = transport::send(Request::Unread { limit, filter: filter_vec })?;
let data = resp.data.get("sessions")
.cloned()
.unwrap_or(serde_json::Value::Array(vec![]));
print_value(&data, &resolve(json))
let (with_meta, debug_source) = opts.request_flags();
let resp = transport::send(Request::Unread {
limit,
filter: filter_vec,
with_meta,
debug_source,
})?;
emit_warnings(&resp.data);
print_response(&resp.data, &opts)
}

View File

@ -23,6 +23,40 @@ struct CacheEntry {
decrypted_path: PathBuf,
}
/// Which resolution path `DbCache::get_with_mode()` actually took for a `rel_key`.
///
/// Latency tiers:
/// - `CacheHit`: ~0ms, only hands back the existing decrypted artifact
/// - `WalIncremental`: typically <10s, only applies the WAL incrementally on the cached DB
/// - `FullDecrypt`: the slowest path, can reach ~120s on large databases
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CacheMode {
    /// Path 1: neither the main `.db` nor the WAL changed; served straight from the cache.
    CacheHit,
    /// Path 2: the main `.db` is unchanged and only the WAL changed; the WAL is
    /// applied incrementally on top of the cached DB.
    WalIncremental,
    /// Path 3: the main `.db` changed or the cache missed; a full decrypt is redone.
    FullDecrypt,
}

impl CacheMode {
    /// Returns the snake_case name of this mode.
    ///
    /// The strings are pinned by hand so that deriving `Serialize` on the enum
    /// in the future cannot silently change the wire format.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::FullDecrypt => "full_decrypt",
            Self::WalIncremental => "wal_incremental",
            Self::CacheHit => "cache_hit",
        }
    }
}
/// Result of `DbCache::get_with_mode()`: the decrypted database location plus
/// the cache path that produced it.
#[derive(Debug, Clone)]
pub struct CacheResolve {
/// Path of the decrypted database file.
pub path: PathBuf,
/// Which cache path (hit / WAL incremental / full decrypt) served this lookup.
pub mode: CacheMode,
}
/// 解密后数据库的 mtime-aware 缓存
///
/// 当数据库文件(.db或 WAL 文件(.db-wal的 mtime 发生变化时,
@ -36,10 +70,7 @@ pub struct DbCache {
}
impl DbCache {
pub async fn new(
db_dir: PathBuf,
all_keys: HashMap<String, String>,
) -> Result<Self> {
pub async fn new(db_dir: PathBuf, all_keys: HashMap<String, String>) -> Result<Self> {
Self::with_dirs(db_dir, config::cache_dir(), config::mtime_file(), all_keys).await
}
@ -94,23 +125,34 @@ impl DbCache {
if !dec_path.exists() {
continue;
}
let db_path = self.db_dir.join(rel_key.replace('\\', std::path::MAIN_SEPARATOR_STR).replace('/', std::path::MAIN_SEPARATOR_STR));
let db_path = self.db_dir.join(
rel_key
.replace('\\', std::path::MAIN_SEPARATOR_STR)
.replace('/', std::path::MAIN_SEPARATOR_STR),
);
let wal_path = wal_path_for(&db_path);
let db_mt = mtime_nanos(&db_path);
let _wal_mt = if wal_path.exists() { mtime_nanos(&wal_path) } else { 0 };
let _wal_mt = if wal_path.exists() {
mtime_nanos(&wal_path)
} else {
0
};
// 只要主 .db 没变,就把 cached 产物载回来。
// 如果 WAL mtime 变了,后续 `get()` 会自动走 Path 2在已有 cached DB 上增量 apply_wal
// 而不是 daemon 重启后第一条请求又退回全量解密。
if db_mt == entry.db_mt {
inner.insert(rel_key.clone(), CacheEntry {
db_mtime: db_mt,
// 保留"cached 产物构建时看到的 wal_mtime",让 `get()` 去比较当前 WAL
// 是否发生了变化,从而决定 exact-hit 还是 WAL 增量。
wal_mtime: entry.wal_mt,
decrypted_path: dec_path,
});
inner.insert(
rel_key.clone(),
CacheEntry {
db_mtime: db_mt,
// 保留"cached 产物构建时看到的 wal_mtime",让 `get()` 去比较当前 WAL
// 是否发生了变化,从而决定 exact-hit 还是 WAL 增量。
wal_mtime: entry.wal_mt,
decrypted_path: dec_path,
},
);
reused += 1;
}
}
@ -123,13 +165,19 @@ impl DbCache {
async fn save_persistent(&self) {
let mtime_file = &self.mtime_file;
let inner = self.inner.lock().await;
let data: HashMap<String, MtimeEntry> = inner.iter().map(|(k, v)| {
(k.clone(), MtimeEntry {
db_mt: v.db_mtime,
wal_mt: v.wal_mtime,
path: v.decrypted_path.to_string_lossy().into_owned(),
let data: HashMap<String, MtimeEntry> = inner
.iter()
.map(|(k, v)| {
(
k.clone(),
MtimeEntry {
db_mt: v.db_mtime,
wal_mt: v.wal_mtime,
path: v.decrypted_path.to_string_lossy().into_owned(),
},
)
})
}).collect();
.collect();
drop(inner);
if let Ok(json) = serde_json::to_string_pretty(&data) {
@ -148,14 +196,19 @@ impl DbCache {
/// WeChat 在写消息时只 append WAL除非触发 checkpoint因此 path 2 是常态;
/// 这条路径把"每次请求都全量解密 ~1.8GB DB~120s"压到"只解 WAL 帧(典型 < 10s"。
pub async fn get(&self, rel_key: &str) -> Result<Option<PathBuf>> {
Ok(self.get_with_mode(rel_key).await?.map(|r| r.path))
}
pub async fn get_with_mode(&self, rel_key: &str) -> Result<Option<CacheResolve>> {
let enc_key_hex = match self.all_keys.get(rel_key) {
Some(k) => k.clone(),
None => return Ok(None),
};
let db_path = self.db_dir.join(
rel_key.replace('\\', std::path::MAIN_SEPARATOR_STR)
.replace('/', std::path::MAIN_SEPARATOR_STR)
rel_key
.replace('\\', std::path::MAIN_SEPARATOR_STR)
.replace('/', std::path::MAIN_SEPARATOR_STR),
);
if !db_path.exists() {
return Ok(None);
@ -163,21 +216,28 @@ impl DbCache {
let wal_path = wal_path_for(&db_path);
let db_mt = mtime_nanos(&db_path);
let wal_mt = if wal_path.exists() { mtime_nanos(&wal_path) } else { 0 };
let wal_mt = if wal_path.exists() {
mtime_nanos(&wal_path)
} else {
0
};
let cached = {
let inner = self.inner.lock().await;
inner.get(rel_key).cloned()
};
let enc_key_bytes = hex_to_32bytes(&enc_key_hex)
.with_context(|| format!("密钥格式错误: {}", rel_key))?;
let enc_key_bytes =
hex_to_32bytes(&enc_key_hex).with_context(|| format!("密钥格式错误: {}", rel_key))?;
// Path 1 / Path 2主 .db mtime 未变且 cached 产物仍在
if let Some(entry) = cached.as_ref() {
if entry.db_mtime == db_mt && entry.decrypted_path.exists() {
if entry.wal_mtime == wal_mt {
return Ok(Some(entry.decrypted_path.clone()));
return Ok(Some(CacheResolve {
path: entry.decrypted_path.clone(),
mode: CacheMode::CacheHit,
}));
}
// Path 2: WAL-only 变化 → 在 cached 产物上重新 apply_wal
@ -190,20 +250,31 @@ impl DbCache {
let key_copy = enc_key_bytes;
tokio::task::spawn_blocking(move || {
wal::apply_wal(&wal_path2, &out_path2, &key_copy)
}).await??;
})
.await??;
}
eprintln!("[cache] WAL 增量 {} ({}ms)", rel_key, t0.elapsed().as_millis());
eprintln!(
"[cache] WAL 增量 {} ({}ms)",
rel_key,
t0.elapsed().as_millis()
);
{
let mut inner = self.inner.lock().await;
inner.insert(rel_key.to_string(), CacheEntry {
db_mtime: db_mt,
wal_mtime: wal_mt,
decrypted_path: out_path.clone(),
});
inner.insert(
rel_key.to_string(),
CacheEntry {
db_mtime: db_mt,
wal_mtime: wal_mt,
decrypted_path: out_path.clone(),
},
);
}
self.save_persistent().await;
return Ok(Some(out_path));
return Ok(Some(CacheResolve {
path: out_path,
mode: CacheMode::WalIncremental,
}));
}
}
@ -213,39 +284,51 @@ impl DbCache {
let db_path2 = db_path.clone();
let out_path2 = out_path.clone();
let key_copy = enc_key_bytes;
tokio::task::spawn_blocking(move || {
crypto::full_decrypt(&db_path2, &out_path2, &key_copy)
}).await??;
tokio::task::spawn_blocking(move || crypto::full_decrypt(&db_path2, &out_path2, &key_copy))
.await??;
if wal_path.exists() {
let out_path3 = out_path.clone();
let wal_path3 = wal_path.clone();
let key_copy2 = enc_key_bytes;
tokio::task::spawn_blocking(move || {
wal::apply_wal(&wal_path3, &out_path3, &key_copy2)
}).await??;
tokio::task::spawn_blocking(move || wal::apply_wal(&wal_path3, &out_path3, &key_copy2))
.await??;
}
eprintln!("[cache] 全量解密 {} ({}ms)", rel_key, t0.elapsed().as_millis());
eprintln!(
"[cache] 全量解密 {} ({}ms)",
rel_key,
t0.elapsed().as_millis()
);
{
let mut inner = self.inner.lock().await;
inner.insert(rel_key.to_string(), CacheEntry {
db_mtime: db_mt,
wal_mtime: wal_mt,
decrypted_path: out_path.clone(),
});
inner.insert(
rel_key.to_string(),
CacheEntry {
db_mtime: db_mt,
wal_mtime: wal_mt,
decrypted_path: out_path.clone(),
},
);
}
self.save_persistent().await;
Ok(Some(out_path))
Ok(Some(CacheResolve {
path: out_path,
mode: CacheMode::FullDecrypt,
}))
}
}
pub(super) fn mtime_nanos(path: &Path) -> u64 {
std::fs::metadata(path)
.and_then(|m| m.modified())
.map(|t| t.duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_nanos() as u64)
.map(|t| {
t.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64
})
.unwrap_or(0)
}
@ -273,8 +356,7 @@ mod tests {
use super::*;
/// 64 字符 hex不需要是真 SQLCipher key — 仅用来证明"是否触发了 full_decrypt"
const FAKE_KEY_HEX: &str =
"0000000000000000000000000000000000000000000000000000000000000000";
const FAKE_KEY_HEX: &str = "0000000000000000000000000000000000000000000000000000000000000000";
/// 路径区分约定:
/// - 完全 hit / WAL 增量 → `decrypted_path` **内容不变**
@ -337,7 +419,11 @@ mod tests {
let (cache, _db_path, decrypted_path, _mtime_file, rel_key) =
setup_seeded_cache("exact").await;
let p = cache.get(&rel_key).await.unwrap().expect("cache should hit");
let p = cache
.get(&rel_key)
.await
.unwrap()
.expect("cache should hit");
assert_eq!(p, decrypted_path);
// 完全 hit → cached file 内容不应被改
@ -387,7 +473,10 @@ mod tests {
// 第一次:完全 hit
let p1 = cache.get(&rel_key).await.unwrap().expect("first get hits");
assert_eq!(p1, decrypted_path);
assert_eq!(std::fs::read(&decrypted_path).unwrap(), ORIGINAL_CACHED_BYTES);
assert_eq!(
std::fs::read(&decrypted_path).unwrap(),
ORIGINAL_CACHED_BYTES
);
// bump WAL mtime重写仍 31 bytesapply_wal 仍 noop
std::thread::sleep(std::time::Duration::from_millis(20));
@ -442,6 +531,72 @@ mod tests {
);
}
/// Drives one `DbCache` through three consecutive `get_with_mode` calls and checks
/// that each call reports the expected `CacheMode` (`CacheHit`, then `WalIncremental`,
/// then `FullDecrypt`) while always resolving to the same decrypted path.
#[tokio::test]
async fn get_with_mode_reports_each_path() {
let root = unique_tmpdir("getwithmode");
let db_dir = root.join("db_storage");
let cache_dir = root.join("cache");
std::fs::create_dir_all(&db_dir).unwrap();
std::fs::create_dir_all(&cache_dir).unwrap();
// Seed a fake encrypted DB plus a 31-byte WAL file next to it.
let rel_key = "message_0.db".to_string();
let db_path = db_dir.join(&rel_key);
std::fs::write(&db_path, b"fake encrypted db").unwrap();
let wal_path = wal_path_for(&db_path);
std::fs::write(&wal_path, [0u8; 31]).unwrap();
// Pre-create the decrypted cache file under the md5-of-rel_key name.
let cached_hash = format!("{:x}", md5::compute(rel_key.as_bytes()));
let decrypted_path = cache_dir.join(format!("{}.db", cached_hash));
std::fs::write(&decrypted_path, ORIGINAL_CACHED_BYTES).unwrap();
// Persist an mtime record that matches the files currently on disk, so the
// first lookup below is expected to be a plain cache hit.
let db_mt = mtime_nanos(&db_path);
let wal_mt0 = mtime_nanos(&wal_path);
let mtime_file = cache_dir.join("_mtimes.json");
let payload = serde_json::to_string(&serde_json::json!({
&rel_key: {
"db_mt": db_mt,
"wal_mt": wal_mt0,
"path": decrypted_path.display().to_string(),
}
}))
.unwrap();
std::fs::write(&mtime_file, payload).unwrap();
let mut all_keys = HashMap::new();
all_keys.insert(rel_key.clone(), FAKE_KEY_HEX.to_string());
let cache = DbCache::with_dirs(db_dir, cache_dir, mtime_file, all_keys)
.await
.unwrap();
// Step 1: nothing changed since the persisted record -> CacheHit.
let hit = cache
.get_with_mode(&rel_key)
.await
.unwrap()
.expect("cache should hit");
assert_eq!(hit.path, decrypted_path);
assert_eq!(hit.mode, CacheMode::CacheHit);
// Step 2: rewrite only the WAL (same length, different bytes) -> WalIncremental.
// The short sleep is presumably there so the rewrite yields a different mtime
// on filesystems with coarse timestamp granularity.
std::thread::sleep(std::time::Duration::from_millis(20));
std::fs::write(&wal_path, [0xffu8; 31]).unwrap();
let wal = cache
.get_with_mode(&rel_key)
.await
.unwrap()
.expect("WAL-only change should stay incremental");
assert_eq!(wal.path, decrypted_path);
assert_eq!(wal.mode, CacheMode::WalIncremental);
// Step 3: rewrite the main DB file -> FullDecrypt.
std::thread::sleep(std::time::Duration::from_millis(20));
std::fs::write(&db_path, b"different bytes").unwrap();
let full = cache
.get_with_mode(&rel_key)
.await
.unwrap()
.expect("db mtime change should trigger full decrypt");
assert_eq!(full.path, decrypted_path);
assert_eq!(full.mode, CacheMode::FullDecrypt);
}
#[tokio::test]
async fn restart_with_wal_change_still_reuses_cached_db_then_applies_wal() {
let root = unique_tmpdir("restart-wal");
@ -486,7 +641,11 @@ mod tests {
.await
.unwrap();
let p = cache.get(&rel_key).await.unwrap().expect("cache should reuse persisted DB");
let p = cache
.get(&rel_key)
.await
.unwrap()
.expect("cache should reuse persisted DB");
assert_eq!(p, decrypted_path);
let body = std::fs::read(&decrypted_path).unwrap();
assert_eq!(

269
src/daemon/meta.rs 100644
View File

@ -0,0 +1,269 @@
//! Freshness metadata appended to every q_* response.
//!
//! 背景:`all_keys.json` 是 `wx init` 时的快照。WeChat 在 daemon 启动后随时可能创建
//! 新的 `message_N.db` 分片;如果只信任 init 时收到的 `msg_db_keys` 列表,新分片里
//! 的数据对 daemon 完全不可见 → 调用方拿到的是看似正常但缺数据的结果("stale")。
//!
//! 本模块的职责:
//! 1. 提供 `Meta` 结构体,由各 `q_*` 函数填充后塞进 response顶层 `meta` 字段)。
//! 2. 提供 `discover_unknown_shards(db_dir, msg_db_keys)`:扫描磁盘上当前真实存在的
//! `message/message_*.db` 文件diff 出 daemon 未持有 enc_key 的"未知分片"列表。
//! 3. 集中 `MetaStatus` 的判定规则,避免 8 个 q_* 各自判,规则漂移。
use serde::Serialize;
use std::collections::HashMap;
use std::path::Path;
/// Freshness metadata attached to every `q_*` response.
///
/// When serialized, every `Option` field is omitted while it is `None`, which keeps
/// the output of the most common command invocations short. The heavy fields
/// (`per_shard_*`, `shard_paths`) are left unset by default and are only filled in
/// when the CLI layer explicitly asks for them via switches such as `--debug-source`.
#[derive(Debug, Clone, Serialize, Default)]
pub struct Meta {
/// `create_time` (unix seconds) of the newest row among the hits.
/// Queries backed by `Msg_` tables (`q_history` / `q_search` / `q_new_messages`, ...)
/// should fill this in. Queries backed by SessionTable (`q_sessions` / `q_unread`)
/// fill in the newest session-level timestamp instead.
#[serde(skip_serializing_if = "Option::is_none")]
pub chat_latest_timestamp: Option<i64>,
/// rel_key of the shard that holds that newest message (e.g. `message/message_3.db`).
/// Lets an agent see at a glance which shard the newest hit came from.
#[serde(skip_serializing_if = "Option::is_none")]
pub chat_latest_db: Option<String>,
/// Value of `session.db.SessionTable.last_timestamp` for this chat, if readable.
/// This is the "latest message time" written by WeChat itself; comparing it with
/// `chat_latest_timestamp` exposes the case "session says there is newer data but
/// history did not read it", i.e. a missed shard.
#[serde(skip_serializing_if = "Option::is_none")]
pub session_last_timestamp: Option<i64>,
/// Number of shards this query actually walked (a subset of `names.msg_db_keys`;
/// shards that produced 0 rows are included).
pub shards_scanned: usize,
/// Number of shards that returned at least one row for this query.
pub shards_hit: usize,
/// rel_keys of shards that exist on disk but for which the daemon has no enc_key.
/// Non-empty means WeChat split off new shards after `wx init`, so `wx init` must
/// be run again.
pub unknown_shards: Vec<String>,
/// Overall status derived from the fields above; the one field CLI / agents
/// mainly look at.
pub status: MetaStatus,
// Heavy / debug fields: unset by default, enabled explicitly by the CLI layer.
/// Per-shard detail keyed by a string; presumably rel_key -> newest timestamp in
/// that shard (TODO confirm against the code that fills it).
#[serde(skip_serializing_if = "Option::is_none")]
pub per_shard_latest: Option<HashMap<String, i64>>,
/// Per-shard cache mode as a string; presumably rel_key -> how the cache resolved
/// that shard (TODO confirm against the code that fills it).
#[serde(skip_serializing_if = "Option::is_none")]
pub cache_mode_per_shard: Option<HashMap<String, String>>,
/// Per-shard path as a string; presumably rel_key -> real on-disk path
/// (TODO confirm against the code that fills it).
#[serde(skip_serializing_if = "Option::is_none")]
pub shard_paths: Option<HashMap<String, String>>,
}
/// Overall freshness verdict carried in `Meta::status`.
///
/// Serialized in snake_case (`ok`, `possibly_stale`, `possibly_stale_unknown_shards`,
/// `windowed`); `Ok` is the `Default`.
#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum MetaStatus {
/// No staleness signal was detected.
#[default]
Ok,
/// The newest time in `session.db` is clearly ahead of what this message query
/// returned, so the data may be stale or incomplete.
PossiblyStale,
/// Strongest signal: the disk holds new shards the daemon does not know about;
/// this usually requires re-running `wx init --force`.
PossiblyStaleUnknownShards,
/// The caller explicitly passed window conditions such as `since` / `until` /
/// `offset`, so the result is a partial view by construction.
Windowed,
}
/// How many seconds the session timestamp must lead the history timestamp before
/// `PossiblyStale` is reported.
///
/// The 24h value is deliberately conservative: an active group or private chat rarely
/// goes a whole day without a new message, so a gap beyond this window is worth an
/// explicit hint to the agent not to treat the result as the current latest state.
pub const STALE_THRESHOLD_SECS: i64 = 24 * 3600;
/// Derives the overall freshness status from the individual signals.
///
/// Priority, highest first:
/// 1. `unknown_shards` is non-empty: the daemon's whole view is outdated, so
///    `PossiblyStaleUnknownShards` wins over everything else.
/// 2. `windowed` is true: the caller asked for a partial window, so no staleness
///    inference is attempted and `Windowed` is returned.
/// 3. `session_last - chat_latest > STALE_THRESHOLD_SECS`: `PossiblyStale`.
/// 4. Anything else, including either timestamp being `None`: `Ok`.
pub fn derive_status(
    chat_latest: Option<i64>,
    session_last: Option<i64>,
    unknown_shards: &[String],
    windowed: bool,
) -> MetaStatus {
    if !unknown_shards.is_empty() {
        return MetaStatus::PossiblyStaleUnknownShards;
    }
    if windowed {
        return MetaStatus::Windowed;
    }
    match (chat_latest, session_last) {
        // Timestamps come straight from database rows; a corrupt or extreme value must
        // not overflow the subtraction (panic in debug builds, wrap-around to a bogus
        // `Ok` in release builds), so the lead is computed with saturation.
        (Some(chat), Some(session)) if session.saturating_sub(chat) > STALE_THRESHOLD_SECS => {
            MetaStatus::PossiblyStale
        }
        _ => MetaStatus::Ok,
    }
}
/// Lists the `message_*.db` shards that exist under `<db_dir>/message/` but are not
/// present in `known`, i.e. shards the daemon currently holds no key for.
///
/// Contract:
/// - Every returned value is a `/`-separated rel_key (e.g. `message/message_3.db`);
///   backslashes in `known` are normalized to `/` before comparing.
/// - The result is sorted lexicographically so tests and CLI output stay stable.
/// - Only names accepted by `is_message_shard` are considered, which leaves out the
///   `_fts*` / `_resource*` index and attachment databases.
/// - An unreadable or missing `message` directory yields an empty list; entries that
///   cannot be read or whose names are not valid UTF-8 are skipped.
pub fn discover_unknown_shards(db_dir: &Path, known: &[String]) -> Vec<String> {
    let Ok(dir_iter) = std::fs::read_dir(db_dir.join("message")) else {
        return Vec::new();
    };
    let normalized_known: std::collections::HashSet<String> = known
        .iter()
        .map(|key| key.replace('\\', "/"))
        .collect();

    let mut missing: Vec<String> = dir_iter
        .flatten()
        .filter_map(|entry| {
            let file_name = entry.file_name();
            let name = file_name.to_str()?;
            is_message_shard(name).then(|| format!("message/{}", name))
        })
        .filter(|rel| !normalized_known.contains(rel))
        .collect();
    missing.sort();
    missing
}
/// Returns `true` when `file_name` is a message shard of the exact form
/// `message_<digits>.db`.
///
/// Index and attachment databases such as `message_0_fts.db`, `message_fts.db` or
/// `message_resource.db` are rejected: once the `message_` prefix and `.db` suffix
/// are stripped, what remains must be a non-empty run of ASCII digits, which none of
/// those names satisfy.
fn is_message_shard(file_name: &str) -> bool {
    let Some(stem) = file_name
        .strip_prefix("message_")
        .and_then(|rest| rest.strip_suffix(".db"))
    else {
        return false;
    };
    !stem.is_empty() && stem.bytes().all(|b| b.is_ascii_digit())
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Scratch directory that is deleted again when the guard goes out of scope, so
    /// repeated test runs do not pile up `wx-cli-meta-test-*` directories in the
    /// system temp dir.
    struct ScratchDir(std::path::PathBuf);

    impl ScratchDir {
        fn path(&self) -> &std::path::Path {
            &self.0
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best effort: a failed cleanup must not turn a passing test into a panic.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Creates a fresh scratch directory. The name combines pid, wall-clock nanos and
    /// an in-process counter so tests running in parallel threads never share (and
    /// then delete) each other's directory.
    fn tempdir() -> ScratchDir {
        static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
        let pid = std::process::id();
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let seq = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        let p = std::env::temp_dir().join(format!("wx-cli-meta-test-{}-{}-{}", pid, nanos, seq));
        std::fs::create_dir_all(&p).unwrap();
        ScratchDir(p)
    }

    #[test]
    fn is_message_shard_accepts_normal_shards() {
        assert!(is_message_shard("message_0.db"));
        assert!(is_message_shard("message_12.db"));
    }

    #[test]
    fn is_message_shard_rejects_fts_and_resource() {
        assert!(!is_message_shard("message_0_fts.db"));
        assert!(!is_message_shard("message_fts.db"));
        assert!(!is_message_shard("message_0_resource.db"));
        assert!(!is_message_shard("message_resource.db"));
    }

    #[test]
    fn is_message_shard_rejects_non_digits() {
        assert!(!is_message_shard("message_a.db"));
        assert!(!is_message_shard("message_.db"));
        assert!(!is_message_shard("session.db"));
        assert!(!is_message_shard("message_0.db.bak"));
    }

    #[test]
    fn discover_unknown_shards_finds_disk_only_shards() {
        let dir = tempdir();
        let msg_dir = dir.path().join("message");
        std::fs::create_dir_all(&msg_dir).unwrap();
        for f in [
            "message_0.db",
            "message_1.db",
            "message_2.db",
            "message_0_fts.db",
        ] {
            std::fs::write(msg_dir.join(f), b"").unwrap();
        }
        let known = vec![
            "message/message_0.db".to_string(),
            "message/message_1.db".to_string(),
        ];
        let unknown = discover_unknown_shards(dir.path(), &known);
        assert_eq!(unknown, vec!["message/message_2.db".to_string()]);
    }

    #[test]
    fn discover_unknown_shards_normalizes_backslash_in_known_keys() {
        let dir = tempdir();
        let msg_dir = dir.path().join("message");
        std::fs::create_dir_all(&msg_dir).unwrap();
        std::fs::write(msg_dir.join("message_0.db"), b"").unwrap();
        let known = vec!["message\\message_0.db".to_string()];
        assert!(discover_unknown_shards(dir.path(), &known).is_empty());
    }

    #[test]
    fn discover_unknown_shards_returns_empty_when_message_dir_missing() {
        let dir = tempdir();
        assert!(discover_unknown_shards(dir.path(), &[]).is_empty());
    }

    #[test]
    fn derive_status_unknown_shards_overrides_windowed() {
        let unknown = vec!["message/message_3.db".to_string()];
        assert_eq!(
            derive_status(Some(100), Some(100), &unknown, true),
            MetaStatus::PossiblyStaleUnknownShards
        );
    }

    #[test]
    fn derive_status_windowed_when_user_paginates() {
        assert_eq!(
            derive_status(Some(100), Some(999_999), &[], true),
            MetaStatus::Windowed,
        );
    }

    #[test]
    fn derive_status_possibly_stale_when_session_far_ahead() {
        let chat = Some(1_000_000);
        let session = Some(1_000_000 + STALE_THRESHOLD_SECS + 1);
        assert_eq!(
            derive_status(chat, session, &[], false),
            MetaStatus::PossiblyStale
        );
    }

    #[test]
    fn derive_status_ok_when_within_threshold() {
        let chat = Some(1_000_000);
        let session = Some(1_000_000 + STALE_THRESHOLD_SECS - 1);
        assert_eq!(derive_status(chat, session, &[], false), MetaStatus::Ok);
    }

    #[test]
    fn derive_status_ok_when_either_side_unknown() {
        assert_eq!(
            derive_status(None, Some(999_999_999), &[], false),
            MetaStatus::Ok
        );
        assert_eq!(derive_status(Some(1), None, &[], false), MetaStatus::Ok);
        assert_eq!(derive_status(None, None, &[], false), MetaStatus::Ok);
    }
}

View File

@ -1,4 +1,5 @@
pub mod cache;
pub mod meta;
pub mod query;
pub mod server;

File diff suppressed because it is too large Load Diff

View File

@ -2,15 +2,12 @@ use anyhow::Result;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use crate::ipc::{Request, Response};
use super::cache::DbCache;
use super::query::Names;
use crate::ipc::{Request, Response};
/// 启动 IPC serverUnix socket / Windows named pipe
pub async fn serve(
db: Arc<DbCache>,
names: Arc<tokio::sync::RwLock<Arc<Names>>>,
) -> Result<()> {
pub async fn serve(db: Arc<DbCache>, names: Arc<tokio::sync::RwLock<Arc<Names>>>) -> Result<()> {
#[cfg(unix)]
serve_unix(db, names).await?;
#[cfg(windows)]
@ -19,10 +16,7 @@ pub async fn serve(
}
#[cfg(unix)]
async fn serve_unix(
db: Arc<DbCache>,
names: Arc<tokio::sync::RwLock<Arc<Names>>>,
) -> Result<()> {
async fn serve_unix(db: Arc<DbCache>, names: Arc<tokio::sync::RwLock<Arc<Names>>>) -> Result<()> {
use tokio::net::UnixListener;
let sock_path = crate::config::sock_path();
@ -88,9 +82,7 @@ async fn serve_windows(
db: Arc<DbCache>,
names: Arc<tokio::sync::RwLock<Arc<Names>>>,
) -> Result<()> {
use interprocess::local_socket::{
tokio::prelude::*, GenericNamespaced, ListenerOptions,
};
use interprocess::local_socket::{tokio::prelude::*, GenericNamespaced, ListenerOptions};
// interprocess 的 GenericNamespaced 在 Windows 上会自动拼接 `\\.\pipe\` 前缀,
// 这里必须传相对名client 端用 `\\.\pipe\wx-cli-daemon` 直接打开可以对上
@ -141,13 +133,9 @@ async fn handle_connection_windows(
Ok(())
}
async fn dispatch(
req: Request,
db: &DbCache,
names: &tokio::sync::RwLock<Arc<Names>>,
) -> Response {
use crate::ipc::Request::*;
async fn dispatch(req: Request, db: &DbCache, names: &tokio::sync::RwLock<Arc<Names>>) -> Response {
use super::query;
use crate::ipc::Request::*;
// 取 guard → O(1) clone Arc → 立即 drop 锁。后续 await 期间不持有锁,
// 多个并发 IPC 请求可以真正并行。Names 本身不可变(由 daemon 启动时
@ -159,20 +147,66 @@ async fn dispatch(
match req {
Ping => Response::ok(serde_json::json!({ "pong": true })),
Sessions { limit } => {
match query::q_sessions(db, &names_arc, limit).await {
Sessions {
limit,
with_meta,
debug_source,
} => match query::q_sessions(db, &names_arc, limit, with_meta, debug_source).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
},
History {
chat,
limit,
offset,
since,
until,
msg_type,
with_meta,
debug_source,
} => {
match query::q_history(
db,
&names_arc,
&chat,
limit,
offset,
since,
until,
msg_type,
with_meta,
debug_source,
)
.await
{
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
History { chat, limit, offset, since, until, msg_type } => {
match query::q_history(db, &names_arc, &chat, limit, offset, since, until, msg_type).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
Search { keyword, chats, limit, since, until, msg_type } => {
match query::q_search(db, &names_arc, &keyword, chats, limit, since, until, msg_type).await {
Search {
keyword,
chats,
limit,
since,
until,
msg_type,
with_meta,
debug_source,
} => {
match query::q_search(
db,
&names_arc,
&keyword,
chats,
limit,
since,
until,
msg_type,
with_meta,
debug_source,
)
.await
{
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
@ -183,74 +217,145 @@ async fn dispatch(
Err(e) => Response::err(e.to_string()),
}
}
Unread { limit, filter } => {
match query::q_unread(db, &names_arc, limit, filter).await {
Unread {
limit,
filter,
with_meta,
debug_source,
} => match query::q_unread(db, &names_arc, limit, filter, with_meta, debug_source).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
},
Members { chat } => match query::q_members(db, &names_arc, &chat).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
},
NewMessages {
state,
limit,
with_meta,
debug_source,
} => {
match query::q_new_messages(db, &names_arc, state, limit, with_meta, debug_source).await
{
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
Members { chat } => {
match query::q_members(db, &names_arc, &chat).await {
Favorites {
limit,
fav_type,
query,
} => match query::q_favorites(db, limit, fav_type, query).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
},
Stats {
chat,
since,
until,
with_meta,
debug_source,
} => {
match query::q_stats(db, &names_arc, &chat, since, until, with_meta, debug_source).await
{
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
NewMessages { state, limit } => {
match query::q_new_messages(db, &names_arc, state, limit).await {
SnsNotifications {
limit,
since,
until,
include_read,
} => {
match query::q_sns_notifications(db, &names_arc, limit, since, until, include_read)
.await
{
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
Favorites { limit, fav_type, query } => {
match query::q_favorites(db, limit, fav_type, query).await {
SnsFeed {
limit,
since,
until,
user,
} => match query::q_sns_feed(db, &names_arc, limit, since, until, user.as_deref()).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
},
SnsSearch {
keyword,
limit,
since,
until,
user,
} => {
match query::q_sns_search(
db,
&names_arc,
&keyword,
limit,
since,
until,
user.as_deref(),
)
.await
{
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
Stats { chat, since, until } => {
match query::q_stats(db, &names_arc, &chat, since, until).await {
ReloadConfig => Response::ok(serde_json::json!({ "reloading": true })),
BizArticles {
limit,
account,
since,
until,
unread,
} => {
match query::q_biz_articles(db, &names_arc, limit, account, since, until, unread).await
{
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
SnsNotifications { limit, since, until, include_read } => {
match query::q_sns_notifications(db, &names_arc, limit, since, until, include_read).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
SnsFeed { limit, since, until, user } => {
match query::q_sns_feed(db, &names_arc, limit, since, until, user.as_deref()).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
SnsSearch { keyword, limit, since, until, user } => {
match query::q_sns_search(db, &names_arc, &keyword, limit, since, until, user.as_deref()).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
ReloadConfig => {
Response::ok(serde_json::json!({ "reloading": true }))
}
BizArticles { limit, account, since, until, unread } => {
match query::q_biz_articles(db, &names_arc, limit, account, since, until, unread).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
Attachments { chat, kinds, limit, offset, since, until } => {
match query::q_attachments(db, &names_arc, &chat, kinds, limit, offset, since, until).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
Extract { attachment_id, output, overwrite } => {
match query::q_extract(db, &names_arc, &attachment_id, &output, overwrite).await {
Attachments {
chat,
kinds,
limit,
offset,
since,
until,
with_meta,
debug_source,
} => {
match query::q_attachments(
db,
&names_arc,
&chat,
kinds,
limit,
offset,
since,
until,
with_meta,
debug_source,
)
.await
{
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
Extract {
attachment_id,
output,
overwrite,
} => match query::q_extract(db, &names_arc, &attachment_id, &output, overwrite).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
},
}
}

View File

@ -1,6 +1,6 @@
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
/// CLI 向 daemon 发送的请求(换行符分隔 JSON与 Python 版兼容)
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -10,6 +10,10 @@ pub enum Request {
Sessions {
#[serde(default = "default_limit_20")]
limit: usize,
#[serde(default, skip_serializing_if = "is_false")]
with_meta: bool,
#[serde(default, skip_serializing_if = "is_false")]
debug_source: bool,
},
History {
chat: String,
@ -23,6 +27,10 @@ pub enum Request {
until: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
msg_type: Option<i64>,
#[serde(default, skip_serializing_if = "is_false")]
with_meta: bool,
#[serde(default, skip_serializing_if = "is_false")]
debug_source: bool,
},
Search {
keyword: String,
@ -36,6 +44,10 @@ pub enum Request {
until: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
msg_type: Option<i64>,
#[serde(default, skip_serializing_if = "is_false")]
with_meta: bool,
#[serde(default, skip_serializing_if = "is_false")]
debug_source: bool,
},
Contacts {
#[serde(skip_serializing_if = "Option::is_none")]
@ -49,6 +61,10 @@ pub enum Request {
/// 按会话类型过滤private / group / official / folded / all支持多选
#[serde(default, skip_serializing_if = "Option::is_none")]
filter: Option<Vec<String>>,
#[serde(default, skip_serializing_if = "is_false")]
with_meta: bool,
#[serde(default, skip_serializing_if = "is_false")]
debug_source: bool,
},
Members {
chat: String,
@ -60,6 +76,10 @@ pub enum Request {
state: Option<HashMap<String, i64>>,
#[serde(default = "default_limit_200")]
limit: usize,
#[serde(default, skip_serializing_if = "is_false")]
with_meta: bool,
#[serde(default, skip_serializing_if = "is_false")]
debug_source: bool,
},
Stats {
chat: String,
@ -67,6 +87,10 @@ pub enum Request {
since: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
until: Option<i64>,
#[serde(default, skip_serializing_if = "is_false")]
with_meta: bool,
#[serde(default, skip_serializing_if = "is_false")]
debug_source: bool,
},
Favorites {
#[serde(default = "default_limit_50")]
@ -146,6 +170,10 @@ pub enum Request {
since: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
until: Option<i64>,
#[serde(default, skip_serializing_if = "is_false")]
with_meta: bool,
#[serde(default, skip_serializing_if = "is_false")]
debug_source: bool,
},
/// 提取(解密)单个附件的本体到指定路径
Extract {
@ -159,7 +187,6 @@ pub enum Request {
},
}
/// daemon 的响应
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Response {
@ -172,11 +199,19 @@ pub struct Response {
impl Response {
pub fn ok(data: Value) -> Self {
Self { ok: true, error: None, data }
Self {
ok: true,
error: None,
data,
}
}
pub fn err(msg: impl Into<String>) -> Self {
Self { ok: false, error: Some(msg.into()), data: Value::Null }
Self {
ok: false,
error: Some(msg.into()),
data: Value::Null,
}
}
pub fn to_json_line(&self) -> anyhow::Result<String> {
@ -185,6 +220,15 @@ impl Response {
}
}
fn default_limit_20() -> usize { 20 }
fn default_limit_50() -> usize { 50 }
fn default_limit_200() -> usize { 200 }
/// Serde default provider returning 20; referenced by name from
/// `#[serde(default = "default_limit_20")]` (e.g. on `Request::Sessions::limit`).
fn default_limit_20() -> usize {
20
}
/// Serde default provider returning 50; referenced by name from
/// `#[serde(default = "default_limit_50")]` (e.g. on a `Request::Favorites` field).
fn default_limit_50() -> usize {
50
}
/// Serde default provider returning 200; referenced by name from
/// `#[serde(default = "default_limit_200")]` (e.g. on `Request::NewMessages::limit`).
fn default_limit_200() -> usize {
200
}
/// Predicate for `#[serde(skip_serializing_if = "is_false")]`: reports whether the
/// flag is unset, so `false` flags are left out of the serialized request.
fn is_false(v: &bool) -> bool {
    matches!(*v, false)
}