mirror of https://github.com/jackwener/wx-cli.git
perf(daemon): Arc<Names> + tokio RwLock, O(1) clone per IPC request
Was: Arc<std::sync::RwLock<Names>>; each dispatch clone_names() copied 4 HashMaps (~100KB for a user with 2700 contacts) and used std RwLock which blocks the tokio worker thread during the clone. Now: Arc<tokio::sync::RwLock<Arc<Names>>>; dispatch takes the read guard, does Arc::clone (pointer bump), drops the guard, then spawns the query work. Names is immutable after daemon startup; Arc is ideal. Smoke tested: `wx sessions --json` returns correct data including chat_type; 8 concurrent clients finish in 12ms.pull/13/head
parent
e977007306
commit
1e52014a6b
|
|
@ -75,7 +75,10 @@ async fn async_run() -> Result<()> {
|
|||
let _ = db.get("session/session.db").await;
|
||||
eprintln!("[daemon] 预热完成,联系人 {} 个", names.map.len());
|
||||
|
||||
let names_arc = Arc::new(std::sync::RwLock::new(names));
|
||||
// 包一层内部 Arc:IPC 请求取 guard 后只做 Arc::clone(O(1)),
|
||||
// 避免每次请求都全量 clone 几千个联系人的 HashMap。
|
||||
// 用 tokio::sync::RwLock 允许 guard 跨 await(当前不跨,为未来 reload 留余地)。
|
||||
let names_arc = Arc::new(tokio::sync::RwLock::new(Arc::new(names)));
|
||||
|
||||
// 启动 IPC server(阻塞)
|
||||
server::serve(Arc::clone(&db), Arc::clone(&names_arc)).await?;
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ use super::query::Names;
|
|||
/// 启动 IPC server(Unix socket / Windows named pipe)
|
||||
pub async fn serve(
|
||||
db: Arc<DbCache>,
|
||||
names: Arc<std::sync::RwLock<Names>>,
|
||||
names: Arc<tokio::sync::RwLock<Arc<Names>>>,
|
||||
) -> Result<()> {
|
||||
#[cfg(unix)]
|
||||
serve_unix(db, names).await?;
|
||||
|
|
@ -21,7 +21,7 @@ pub async fn serve(
|
|||
#[cfg(unix)]
|
||||
async fn serve_unix(
|
||||
db: Arc<DbCache>,
|
||||
names: Arc<std::sync::RwLock<Names>>,
|
||||
names: Arc<tokio::sync::RwLock<Arc<Names>>>,
|
||||
) -> Result<()> {
|
||||
use tokio::net::UnixListener;
|
||||
let sock_path = crate::config::sock_path();
|
||||
|
|
@ -58,7 +58,7 @@ async fn serve_unix(
|
|||
async fn handle_connection_unix(
|
||||
stream: tokio::net::UnixStream,
|
||||
db: Arc<DbCache>,
|
||||
names: Arc<std::sync::RwLock<Names>>,
|
||||
names: Arc<tokio::sync::RwLock<Arc<Names>>>,
|
||||
) -> Result<()> {
|
||||
let (reader, mut writer) = stream.into_split();
|
||||
let mut lines = BufReader::new(reader).lines();
|
||||
|
|
@ -86,7 +86,7 @@ async fn handle_connection_unix(
|
|||
#[cfg(windows)]
|
||||
async fn serve_windows(
|
||||
db: Arc<DbCache>,
|
||||
names: Arc<std::sync::RwLock<Names>>,
|
||||
names: Arc<tokio::sync::RwLock<Arc<Names>>>,
|
||||
) -> Result<()> {
|
||||
use interprocess::local_socket::{
|
||||
tokio::prelude::*, GenericNamespaced, ListenerOptions,
|
||||
|
|
@ -117,7 +117,7 @@ async fn serve_windows(
|
|||
async fn handle_connection_windows(
|
||||
conn: interprocess::local_socket::tokio::Stream,
|
||||
db: Arc<DbCache>,
|
||||
names: Arc<std::sync::RwLock<Names>>,
|
||||
names: Arc<tokio::sync::RwLock<Arc<Names>>>,
|
||||
) -> Result<()> {
|
||||
let (reader, mut writer) = tokio::io::split(conn);
|
||||
let mut lines = BufReader::new(reader).lines();
|
||||
|
|
@ -144,79 +144,59 @@ async fn handle_connection_windows(
|
|||
async fn dispatch(
|
||||
req: Request,
|
||||
db: &DbCache,
|
||||
names: &std::sync::RwLock<Names>,
|
||||
names: &tokio::sync::RwLock<Arc<Names>>,
|
||||
) -> Response {
|
||||
use crate::ipc::Request::*;
|
||||
use super::query;
|
||||
|
||||
// 取 guard → O(1) clone Arc → 立即 drop 锁。后续 await 期间不持有锁,
|
||||
// 多个并发 IPC 请求可以真正并行。Names 本身不可变(由 daemon 启动时
|
||||
// 一次性构建),共享 Arc 即可。
|
||||
let names_arc: Arc<Names> = {
|
||||
let guard = names.read().await;
|
||||
Arc::clone(&*guard)
|
||||
};
|
||||
|
||||
match req {
|
||||
Ping => Response::ok(serde_json::json!({ "pong": true })),
|
||||
Sessions { limit } => {
|
||||
let names_snapshot = match clone_names(names) {
|
||||
Ok(n) => n,
|
||||
Err(e) => return Response::err(e),
|
||||
};
|
||||
match query::q_sessions(db, &names_snapshot, limit).await {
|
||||
match query::q_sessions(db, &names_arc, limit).await {
|
||||
Ok(v) => Response::ok(v),
|
||||
Err(e) => Response::err(e.to_string()),
|
||||
}
|
||||
}
|
||||
History { chat, limit, offset, since, until, msg_type } => {
|
||||
let names_snapshot = match clone_names(names) {
|
||||
Ok(n) => n,
|
||||
Err(e) => return Response::err(e),
|
||||
};
|
||||
match query::q_history(db, &names_snapshot, &chat, limit, offset, since, until, msg_type).await {
|
||||
match query::q_history(db, &names_arc, &chat, limit, offset, since, until, msg_type).await {
|
||||
Ok(v) => Response::ok(v),
|
||||
Err(e) => Response::err(e.to_string()),
|
||||
}
|
||||
}
|
||||
Search { keyword, chats, limit, since, until, msg_type } => {
|
||||
let names_snapshot = match clone_names(names) {
|
||||
Ok(n) => n,
|
||||
Err(e) => return Response::err(e),
|
||||
};
|
||||
match query::q_search(db, &names_snapshot, &keyword, chats, limit, since, until, msg_type).await {
|
||||
match query::q_search(db, &names_arc, &keyword, chats, limit, since, until, msg_type).await {
|
||||
Ok(v) => Response::ok(v),
|
||||
Err(e) => Response::err(e.to_string()),
|
||||
}
|
||||
}
|
||||
Contacts { query, limit } => {
|
||||
let names_snapshot = match clone_names(names) {
|
||||
Ok(n) => n,
|
||||
Err(e) => return Response::err(e),
|
||||
};
|
||||
match query::q_contacts(&names_snapshot, query.as_deref(), limit).await {
|
||||
match query::q_contacts(&names_arc, query.as_deref(), limit).await {
|
||||
Ok(v) => Response::ok(v),
|
||||
Err(e) => Response::err(e.to_string()),
|
||||
}
|
||||
}
|
||||
Unread { limit, filter } => {
|
||||
let names_snapshot = match clone_names(names) {
|
||||
Ok(n) => n,
|
||||
Err(e) => return Response::err(e),
|
||||
};
|
||||
match query::q_unread(db, &names_snapshot, limit, filter).await {
|
||||
match query::q_unread(db, &names_arc, limit, filter).await {
|
||||
Ok(v) => Response::ok(v),
|
||||
Err(e) => Response::err(e.to_string()),
|
||||
}
|
||||
}
|
||||
Members { chat } => {
|
||||
let names_snapshot = match clone_names(names) {
|
||||
Ok(n) => n,
|
||||
Err(e) => return Response::err(e),
|
||||
};
|
||||
match query::q_members(db, &names_snapshot, &chat).await {
|
||||
match query::q_members(db, &names_arc, &chat).await {
|
||||
Ok(v) => Response::ok(v),
|
||||
Err(e) => Response::err(e.to_string()),
|
||||
}
|
||||
}
|
||||
NewMessages { state, limit } => {
|
||||
let names_snapshot = match clone_names(names) {
|
||||
Ok(n) => n,
|
||||
Err(e) => return Response::err(e),
|
||||
};
|
||||
match query::q_new_messages(db, &names_snapshot, state, limit).await {
|
||||
match query::q_new_messages(db, &names_arc, state, limit).await {
|
||||
Ok(v) => Response::ok(v),
|
||||
Err(e) => Response::err(e.to_string()),
|
||||
}
|
||||
|
|
@ -228,25 +208,10 @@ async fn dispatch(
|
|||
}
|
||||
}
|
||||
Stats { chat, since, until } => {
|
||||
let names_snapshot = match clone_names(names) {
|
||||
Ok(n) => n,
|
||||
Err(e) => return Response::err(e),
|
||||
};
|
||||
match query::q_stats(db, &names_snapshot, &chat, since, until).await {
|
||||
match query::q_stats(db, &names_arc, &chat, since, until).await {
|
||||
Ok(v) => Response::ok(v),
|
||||
Err(e) => Response::err(e.to_string()),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// 克隆 Names 以避免 RwLockGuard 跨 await
|
||||
fn clone_names(names: &std::sync::RwLock<Names>) -> Result<Names, String> {
|
||||
let guard = names.read().map_err(|_| "内部错误: names lock poisoned".to_string())?;
|
||||
Ok(Names {
|
||||
map: guard.map.clone(),
|
||||
md5_to_uname: guard.md5_to_uname.clone(),
|
||||
msg_db_keys: guard.msg_db_keys.clone(),
|
||||
verify_flags: guard.verify_flags.clone(),
|
||||
})
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue