fix: 修复全部 medium/low 优先级问题

- cache/daemon: mtime 比较从 f64(secs) 改为 u64(nanos),避免浮点误差丢失变更
- transport: Unix 启动 daemon 前调用 setsid(),使其脱离控制终端防止 SIGHUP
- daemon/mod: 删除对已废弃 watcher 模块的引用
- watcher.rs: 删除全量死代码文件(功能已内联至 daemon/mod.rs)
- query: find_msg_tables 实际按 max_ts 降序排序(原注释有排序但无实现)
- scanner: 移除三平台 scan_memory 中的 dedup_by(search_pattern 已全局去重)
- watch: Windows 平台返回明确错误而非静默失败
- CI: cargo build 增加 --locked 确保使用 Cargo.lock 版本
pull/1/head
jackwener 2026-04-16 15:12:33 +08:00
parent 113e1d2907
commit 8bfea8869e
10 changed files with 46 additions and 195 deletions

View File

@ -44,7 +44,7 @@ jobs:
restore-keys: ${{ runner.os }}-cargo- restore-keys: ${{ runner.os }}-cargo-
- name: Build release - name: Build release
run: cargo build --release --target ${{ matrix.target }} run: cargo build --release --locked --target ${{ matrix.target }}
- name: Rename binary (Unix) - name: Rename binary (Unix)
if: matrix.os != 'windows-latest' if: matrix.os != 'windows-latest'

View File

@ -66,13 +66,15 @@ fn start_daemon() -> Result<()> {
#[cfg(unix)] #[cfg(unix)]
{ {
let _ = std::process::Command::new(&exe) use std::os::unix::process::CommandExt;
.env("WX_DAEMON_MODE", "1") let mut cmd = std::process::Command::new(&exe);
cmd.env("WX_DAEMON_MODE", "1")
.stdin(std::process::Stdio::null()) .stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::null()) .stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null()) .stderr(std::process::Stdio::null());
.spawn() // SAFETY: setsid() 在 fork 后的子进程中调用,使 daemon 脱离控制终端
.context("无法启动 daemon 进程")?; unsafe { cmd.pre_exec(|| { libc::setsid(); Ok(()) }); }
let _ = cmd.spawn().context("无法启动 daemon 进程")?;
} }
#[cfg(windows)] #[cfg(windows)]

View File

@ -28,6 +28,11 @@ pub fn cmd_watch(chat: Option<String>, json: bool) -> Result<()> {
eprintln!("监听中Ctrl+C 退出)...\n"); eprintln!("监听中Ctrl+C 退出)...\n");
} }
#[cfg(windows)]
{
anyhow::bail!("watch 命令在 Windows 上暂不支持,请使用 Unix 系统");
}
#[cfg(unix)] #[cfg(unix)]
{ {
let reader = std::io::BufReader::new(stream.try_clone()?); let reader = std::io::BufReader::new(stream.try_clone()?);

View File

@ -11,15 +11,15 @@ use crate::crypto::wal;
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
struct MtimeEntry { struct MtimeEntry {
db_mt: f64, db_mt: u64,
wal_mt: f64, wal_mt: u64,
path: String, path: String,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct CacheEntry { struct CacheEntry {
db_mtime: f64, db_mtime: u64,
wal_mtime: f64, wal_mtime: u64,
decrypted_path: PathBuf, decrypted_path: PathBuf,
} }
@ -83,10 +83,10 @@ impl DbCache {
let wal_path_str = format!("{}-wal", db_path.display()); let wal_path_str = format!("{}-wal", db_path.display());
let wal_path = Path::new(&wal_path_str); let wal_path = Path::new(&wal_path_str);
let db_mt = mtime_f64(&db_path); let db_mt = mtime_nanos(&db_path);
let wal_mt = if wal_path.exists() { mtime_f64(wal_path) } else { 0.0 }; let wal_mt = if wal_path.exists() { mtime_nanos(wal_path) } else { 0 };
if (db_mt - entry.db_mt).abs() < 0.001 && (wal_mt - entry.wal_mt).abs() < 0.001 { if db_mt == entry.db_mt && wal_mt == entry.wal_mt {
inner.insert(rel_key.clone(), CacheEntry { inner.insert(rel_key.clone(), CacheEntry {
db_mtime: db_mt, db_mtime: db_mt,
wal_mtime: wal_mt, wal_mtime: wal_mt,
@ -138,15 +138,15 @@ impl DbCache {
let wal_path_str = format!("{}-wal", db_path.display()); let wal_path_str = format!("{}-wal", db_path.display());
let wal_path = Path::new(&wal_path_str).to_path_buf(); let wal_path = Path::new(&wal_path_str).to_path_buf();
let db_mt = mtime_f64(&db_path); let db_mt = mtime_nanos(&db_path);
let wal_mt = if wal_path.exists() { mtime_f64(&wal_path) } else { 0.0 }; let wal_mt = if wal_path.exists() { mtime_nanos(&wal_path) } else { 0 };
// 检查缓存 // 检查缓存
{ {
let inner = self.inner.lock().await; let inner = self.inner.lock().await;
if let Some(entry) = inner.get(rel_key) { if let Some(entry) = inner.get(rel_key) {
if (entry.db_mtime - db_mt).abs() < 0.001 if entry.db_mtime == db_mt
&& (entry.wal_mtime - wal_mt).abs() < 0.001 && entry.wal_mtime == wal_mt
&& entry.decrypted_path.exists() && entry.decrypted_path.exists()
{ {
return Ok(Some(entry.decrypted_path.clone())); return Ok(Some(entry.decrypted_path.clone()));
@ -195,11 +195,11 @@ impl DbCache {
} }
} }
fn mtime_f64(path: &Path) -> f64 { fn mtime_nanos(path: &Path) -> u64 {
std::fs::metadata(path) std::fs::metadata(path)
.and_then(|m| m.modified()) .and_then(|m| m.modified())
.map(|t| t.duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs_f64()) .map(|t| t.duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_nanos() as u64)
.unwrap_or(0.0) .unwrap_or(0)
} }
fn hex_to_32bytes(s: &str) -> Result<[u8; 32]> { fn hex_to_32bytes(s: &str) -> Result<[u8; 32]> {

View File

@ -1,6 +1,5 @@
pub mod cache; pub mod cache;
pub mod query; pub mod query;
pub mod watcher;
pub mod server; pub mod server;
use anyhow::Result; use anyhow::Result;
@ -108,7 +107,7 @@ async fn run_watcher(
use std::time::Duration; use std::time::Duration;
use crate::ipc::WatchEvent; use crate::ipc::WatchEvent;
let mut last_mtime = 0.0f64; let mut last_mtime = 0u64;
let mut last_ts: HashMap<String, i64> = HashMap::new(); let mut last_ts: HashMap<String, i64> = HashMap::new();
let mut initialized = false; let mut initialized = false;
@ -119,11 +118,11 @@ async fn run_watcher(
continue; continue;
} }
let wal_mtime = match mtime_f64(&session_wal) { let wal_mtime = match mtime_nanos(&session_wal) {
Some(m) => m, 0 => continue,
None => continue, m => m,
}; };
if (wal_mtime - last_mtime).abs() < 0.001 { if wal_mtime == last_mtime {
continue; continue;
} }
last_mtime = wal_mtime; last_mtime = wal_mtime;
@ -206,10 +205,11 @@ async fn run_watcher(
} }
} }
fn mtime_f64(path: &std::path::Path) -> Option<f64> { fn mtime_nanos(path: &std::path::Path) -> u64 {
std::fs::metadata(path).ok()? std::fs::metadata(path)
.modified().ok() .and_then(|m| m.modified())
.map(|t| t.duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs_f64()) .map(|t| t.duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_nanos() as u64)
.unwrap_or(0)
} }
fn decompress_or_str(data: &[u8]) -> String { fn decompress_or_str(data: &[u8]) -> String {

View File

@ -349,7 +349,7 @@ async fn find_msg_tables(
return Ok(Vec::new()); return Ok(Vec::new());
} }
let mut results = Vec::new(); let mut results: Vec<(i64, std::path::PathBuf, String)> = Vec::new();
for rel_key in &names.msg_db_keys { for rel_key in &names.msg_db_keys {
let path = match db.get(rel_key).await? { let path = match db.get(rel_key).await? {
Some(p) => p, Some(p) => p,
@ -357,9 +357,8 @@ async fn find_msg_tables(
}; };
let tname = table_name.clone(); let tname = table_name.clone();
let path2 = path.clone(); let path2 = path.clone();
let exists: Option<i64> = tokio::task::spawn_blocking(move || { let max_ts: Option<i64> = tokio::task::spawn_blocking(move || {
let conn = Connection::open(&path2)?; let conn = Connection::open(&path2)?;
// 检查表是否存在
let table_exists: Option<i64> = conn.query_row( let table_exists: Option<i64> = conn.query_row(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name=?", "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?",
[&tname], [&tname],
@ -368,21 +367,22 @@ async fn find_msg_tables(
if table_exists.is_none() { if table_exists.is_none() {
return Ok::<_, anyhow::Error>(None); return Ok::<_, anyhow::Error>(None);
} }
let max_ts: Option<i64> = conn.query_row( let ts: Option<i64> = conn.query_row(
&format!("SELECT MAX(create_time) FROM [{}]", tname), &format!("SELECT MAX(create_time) FROM [{}]", tname),
[], [],
|row| row.get(0), |row| row.get(0),
).ok().flatten(); ).ok().flatten();
Ok(max_ts) Ok(ts)
}).await??; }).await??;
if exists.is_some() { if let Some(ts) = max_ts {
results.push((path.clone(), table_name.clone())); results.push((ts, path.clone(), table_name.clone()));
} }
} }
// 按最大时间戳排序(最新的优先) // 按最大时间戳降序排列(最新的优先)
Ok(results) results.sort_by_key(|(ts, _, _)| std::cmp::Reverse(*ts));
Ok(results.into_iter().map(|(_, p, t)| (p, t)).collect())
} }
fn query_messages( fn query_messages(

View File

@ -1,151 +0,0 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Duration;
use tokio::sync::broadcast;
use super::cache::DbCache;
use super::query::{fmt_type, Names};
use crate::ipc::WatchEvent;
/// 启动 WAL 变化监听 task
///
/// 每 500ms 检测 session.db-wal 的 mtime有变化时重新读 session.db
/// 找到 timestamp 更新的行broadcast 到所有 watch 客户端
#[allow(dead_code)]
pub async fn start_watcher(
db: &'static DbCache,
names_ref: &'static std::sync::RwLock<Names>,
tx: broadcast::Sender<WatchEvent>,
session_wal_path: PathBuf,
) {
tokio::spawn(async move {
let mut last_mtime = 0.0f64;
let mut last_ts: HashMap<String, i64> = HashMap::new();
let mut initialized = false;
loop {
tokio::time::sleep(Duration::from_millis(500)).await;
// 如果没有订阅者,跳过
if tx.receiver_count() == 0 {
continue;
}
let wal_mtime = match mtime_f64(&session_wal_path) {
Some(m) => m,
None => continue,
};
if (wal_mtime - last_mtime).abs() < 0.001 {
continue;
}
last_mtime = wal_mtime;
// 重新解密 session.db
let path = match db.get("session/session.db").await {
Ok(Some(p)) => p,
_ => continue,
};
let path2 = path.clone();
let rows: Vec<(String, Vec<u8>, i64, i64, String)> = match tokio::task::spawn_blocking(move || {
let conn = rusqlite::Connection::open(&path2)?;
let mut stmt = conn.prepare(
"SELECT username, summary, last_timestamp, last_msg_type, last_msg_sender
FROM SessionTable WHERE last_timestamp > 0
ORDER BY last_timestamp DESC LIMIT 50"
)?;
let rows = stmt.query_map([], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, Vec<u8>>(1).unwrap_or_default(),
row.get::<_, i64>(2)?,
row.get::<_, i64>(3).unwrap_or(0),
row.get::<_, String>(4).unwrap_or_default(),
))
})?
.collect::<rusqlite::Result<Vec<_>>>()?;
Ok::<_, anyhow::Error>(rows)
}).await {
Ok(Ok(r)) => r,
_ => continue,
};
let names_guard = names_ref.read().expect("names lock poisoned");
for (username, summary_bytes, ts, msg_type, sender) in &rows {
if !initialized {
last_ts.insert(username.clone(), *ts);
continue;
}
let prev_ts = last_ts.get(username).copied().unwrap_or(0);
if *ts <= prev_ts {
continue;
}
last_ts.insert(username.clone(), *ts);
let display = names_guard.display(username);
let is_group = username.contains("@chatroom");
let summary = decompress_or_str(summary_bytes);
let summary = if summary.contains(":\n") {
summary.splitn(2, ":\n").nth(1).unwrap_or(&summary).to_string()
} else {
summary
};
let sender_display = if !sender.is_empty() {
names_guard.map.get(sender).cloned().unwrap_or_else(|| sender.clone())
} else {
String::new()
};
let event = WatchEvent {
event: "message".into(),
time: Some(fmt_time_hhmm(*ts)),
chat: Some(display),
username: Some(username.clone()),
is_group: Some(is_group),
sender: Some(sender_display),
content: Some(summary),
msg_type: Some(fmt_type(*msg_type)),
timestamp: Some(*ts),
};
let _ = tx.send(event);
}
if !initialized {
initialized = true;
}
}
});
}
fn mtime_f64(path: &std::path::Path) -> Option<f64> {
std::fs::metadata(path)
.and_then(|m| m.modified())
.ok()
.map(|t| t.duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs_f64())
}
fn decompress_or_str(data: &[u8]) -> String {
if data.is_empty() {
return String::new();
}
if let Ok(dec) = zstd::decode_all(data) {
if let Ok(s) = String::from_utf8(dec) {
return s;
}
}
String::from_utf8_lossy(data).into_owned()
}
fn fmt_time_hhmm(ts: i64) -> String {
use chrono::{Local, TimeZone};
Local.timestamp_opt(ts, 0)
.single()
.map(|dt| dt.format("%H:%M").to_string())
.unwrap_or_else(|| ts.to_string())
}

View File

@ -87,8 +87,6 @@ pub fn scan_keys(db_dir: &Path) -> Result<Vec<KeyEntry>> {
for (start, end) in &regions { for (start, end) in &regions {
scan_region(&mut mem_file, *start, *end, &mut raw_keys); scan_region(&mut mem_file, *start, *end, &mut raw_keys);
} }
// 去重
raw_keys.dedup_by(|a, b| a.0 == b.0 && a.1 == b.1);
eprintln!("找到 {} 个候选密钥", raw_keys.len()); eprintln!("找到 {} 个候选密钥", raw_keys.len());
let mut entries = Vec::new(); let mut entries = Vec::new();

View File

@ -193,8 +193,6 @@ fn scan_memory(task: mach_port_t) -> Result<Vec<(String, String)>> {
addr = addr.saturating_add(size); addr = addr.saturating_add(size);
} }
// 去重
results.dedup_by(|a, b| a.0 == b.0 && a.1 == b.1);
Ok(results) Ok(results)
} }

View File

@ -130,7 +130,6 @@ fn scan_memory(process: HANDLE) -> Result<Vec<(String, String)>> {
} }
} }
results.dedup_by(|a, b| a.0 == b.0 && a.1 == b.1);
Ok(results) Ok(results)
} }