diff --git a/src/cli/daemon_cmd.rs b/src/cli/daemon_cmd.rs index 31b0792..4b539d6 100644 --- a/src/cli/daemon_cmd.rs +++ b/src/cli/daemon_cmd.rs @@ -8,6 +8,7 @@ pub fn cmd_daemon(cmd: DaemonCommands) -> Result<()> { DaemonCommands::Status => cmd_status(), DaemonCommands::Stop => cmd_stop(), DaemonCommands::Logs { follow, lines } => cmd_logs(follow, lines), + DaemonCommands::Start { tcp } => crate::daemon::run_start(tcp), } } diff --git a/src/cli/mod.rs b/src/cli/mod.rs index 3a28060..786b62b 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -262,6 +262,12 @@ pub enum DaemonCommands { #[arg(short = 'n', long, default_value = "50")] lines: usize, }, + /// 启动 daemon + Start { + /// 同时监听 TCP 地址(如 127.0.0.1:9876) + #[arg(long)] + tcp: Option, + }, } pub fn run() { diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 02dc99f..b25a5e1 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -13,13 +13,48 @@ use crate::config; /// 当 WX_DAEMON_MODE 环境变量设置时,main() 调用此函数 pub fn run() { let rt = tokio::runtime::Runtime::new().expect("无法创建 tokio runtime"); - if let Err(e) = rt.block_on(async_run()) { + if let Err(e) = rt.block_on(start_daemon(None)) { eprintln!("[daemon] 启动失败: {}", e); std::process::exit(1); } } -async fn async_run() -> Result<()> { +/// 从 CLI `wx daemon start [--tcp ADDR]` 调用 +/// +/// 查找当前可执行文件路径,设置 WX_DAEMON_MODE=1,后台启动新进程。 +pub fn run_start(tcp_addr: Option) -> Result<()> { + let exe = std::env::current_exe()?; + let log = config::log_path(); + + let mut cmd = std::process::Command::new(&exe); + cmd.env("WX_DAEMON_MODE", "1"); + if let Some(addr) = &tcp_addr { + cmd.env("WX_DAEMON_TCP_ADDR", addr); + } + // 日志重定向 + let log_file = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(&log)?; + cmd.stdout(log_file.try_clone()?).stderr(log_file); + + #[cfg(unix)] + { + use std::os::unix::process::CommandExt; + unsafe { cmd.pre_exec(|| { + libc::setsid(); + Ok(()) + }) }; + } + + let child = cmd.spawn()?; + let pid = child.id(); + eprintln!("[daemon] 已启动 daemon 进程 (PID {})", pid); + Ok(()) +} + +/// daemon 核心启动逻辑(被 run() 和 WX_DAEMON_MODE 路径共享) +pub async fn start_daemon(tcp_addr: Option) -> Result<()> { // 确保工作目录存在 let cli_dir = config::cli_dir(); tokio::fs::create_dir_all(&cli_dir).await?; @@ -76,13 +111,21 @@ async fn async_run() -> Result<()> { let _ = db.get("sns/sns.db").await; eprintln!("[daemon] 预热完成,联系人 {} 个", names.map.len()); - // 包一层内部 Arc:IPC 请求取 guard 后只做 Arc::clone(O(1)), - // 避免每次请求都全量 clone 几千个联系人的 HashMap。 - // 用 tokio::sync::RwLock 允许 guard 跨 await(当前不跨,为未来 reload 留余地)。 + // 包一层内部 Arc let names_arc = Arc::new(tokio::sync::RwLock::new(Arc::new(names))); + // 检查环境变量中的 TCP 地址(WX_DAEMON_MODE 路径下通过 env 传入) + let effective_tcp_addr = tcp_addr.or_else(|| std::env::var("WX_DAEMON_TCP_ADDR").ok()); + // 启动 IPC server(阻塞) - server::serve(Arc::clone(&db), Arc::clone(&names_arc)).await?; + server::serve(Arc::clone(&db), Arc::clone(&names_arc), effective_tcp_addr.as_deref()).await?; + + // 正常退出时清理(signal 路径下由 cleanup_and_exit 处理,不会走到这里) + #[allow(unreachable_code)] + { + let _ = std::fs::remove_file(config::sock_path()); + let _ = std::fs::remove_file(config::pid_path()); + } Ok(()) } @@ -132,7 +175,9 @@ async fn setup_signal_handler() { }); } +#[cfg(unix)] fn cleanup_and_exit() { + // 仅清理 local socket 文件,TCP 端口由 OS 自动回收 let _ = std::fs::remove_file(config::sock_path()); let _ = std::fs::remove_file(config::pid_path()); std::process::exit(0); diff --git a/src/daemon/server.rs b/src/daemon/server.rs index 896a08e..ce4ad3c 100644 --- a/src/daemon/server.rs +++ b/src/daemon/server.rs @@ -1,16 +1,32 @@ use anyhow::Result; use std::sync::Arc; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use crate::ipc::{Request, Response}; +use crate::transport::{self, Listener}; use super::cache::DbCache; use super::query::Names; -/// 启动 IPC server(Unix socket / Windows named pipe) +/// 启动 IPC server(Unix socket / Windows named pipe + 可选 TCP) +/// +/// 当 `tcp_addr` 为 `Some` 时,同时监听 TCP 端口;daemon 在 local listener 退出时退出。 pub async fn serve( db: Arc, names: Arc>>, + tcp_addr: Option<&str>, ) -> Result<()> { + // TCP 先启动为后台任务 + if let Some(addr) = tcp_addr { + let socket_addr: std::net::SocketAddr = addr.parse().map_err(|e| { + anyhow::anyhow!("TCP 地址解析失败 '{}': {}", addr, e) + })?; + let db_tcp = Arc::clone(&db); + let names_tcp = Arc::clone(&names); + tokio::spawn(async move { + if let Err(e) = serve_tcp(socket_addr, db_tcp, names_tcp).await { + eprintln!("[server] TCP 监听错误: {}", e); + } + }); + } + #[cfg(unix)] serve_unix(db, names).await?; #[cfg(windows)] @@ -18,6 +34,28 @@ pub async fn serve( Ok(()) } +async fn serve_tcp( + addr: std::net::SocketAddr, + db: Arc, + names: Arc>>, +) -> Result<()> { + let listener = transport::TcpListener::bind(addr).await?; + eprintln!("[server] 监听 TCP {}", addr); + + // TcpListener::accept 返回 Pin>,需要 Box::pin 包装循环 + let mut listener = listener; + loop { + let stream = listener.accept().await?; + let db2 = Arc::clone(&db); + let names2 = Arc::clone(&names); + tokio::spawn(async move { + if let Err(e) = transport::handle_connection(stream, &db2, &names2).await { + eprintln!("[server] 连接处理错误: {}", e); + } + }); + } +} + #[cfg(unix)] async fn serve_unix( db: Arc, @@ -47,42 +85,13 @@ async fn serve_unix( let names2 = Arc::clone(&names); tokio::spawn(async move { - if let Err(e) = handle_connection_unix(stream, db2, names2).await { + if let Err(e) = transport::handle_connection(stream, &db2, &names2).await { eprintln!("[server] 连接处理错误: {}", e); } }); } } -#[cfg(unix)] -async fn handle_connection_unix( - stream: tokio::net::UnixStream, - db: Arc, - names: Arc>>, -) -> Result<()> { - let (reader, mut writer) = stream.into_split(); - let mut lines = BufReader::new(reader).lines(); - - let line = match lines.next_line().await? { - Some(l) => l, - None => return Ok(()), - }; - - // 解析请求 - let req: Request = match serde_json::from_str(&line) { - Ok(r) => r, - Err(e) => { - let resp = Response::err(format!("JSON 解析错误: {}", e)); - writer.write_all(resp.to_json_line()?.as_bytes()).await?; - return Ok(()); - } - }; - - let resp = dispatch(req, &db, &names).await; - writer.write_all(resp.to_json_line()?.as_bytes()).await?; - Ok(()) -} - #[cfg(windows)] async fn serve_windows( db: Arc, @@ -106,130 +115,9 @@ async fn serve_windows( let names2 = Arc::clone(&names); tokio::spawn(async move { - if let Err(e) = handle_connection_windows(conn, db2, names2).await { + if let Err(e) = transport::handle_connection(conn, &db2, &names2).await { eprintln!("[server] 连接处理错误: {}", e); } }); } } - -#[cfg(windows)] -async fn handle_connection_windows( - conn: interprocess::local_socket::tokio::Stream, - db: Arc, - names: Arc>>, -) -> Result<()> { - let (reader, mut writer) = tokio::io::split(conn); - let mut lines = BufReader::new(reader).lines(); - - let line = match lines.next_line().await? { - Some(l) => l, - None => return Ok(()), - }; - - let req: Request = match serde_json::from_str(&line) { - Ok(r) => r, - Err(e) => { - let resp = Response::err(format!("JSON 解析错误: {}", e)); - writer.write_all(resp.to_json_line()?.as_bytes()).await?; - return Ok(()); - } - }; - - let resp = dispatch(req, &db, &names).await; - writer.write_all(resp.to_json_line()?.as_bytes()).await?; - Ok(()) -} - -async fn dispatch( - req: Request, - db: &DbCache, - names: &tokio::sync::RwLock>, -) -> Response { - use crate::ipc::Request::*; - use super::query; - - // 取 guard → O(1) clone Arc → 立即 drop 锁。后续 await 期间不持有锁, - // 多个并发 IPC 请求可以真正并行。Names 本身不可变(由 daemon 启动时 - // 一次性构建),共享 Arc 即可。 - let names_arc: Arc = { - let guard = names.read().await; - Arc::clone(&*guard) - }; - - match req { - Ping => Response::ok(serde_json::json!({ "pong": true })), - Sessions { limit } => { - match query::q_sessions(db, &names_arc, limit).await { - Ok(v) => Response::ok(v), - Err(e) => Response::err(e.to_string()), - } - } - History { chat, limit, offset, since, until, msg_type } => { - match query::q_history(db, &names_arc, &chat, limit, offset, since, until, msg_type).await { - Ok(v) => Response::ok(v), - Err(e) => Response::err(e.to_string()), - } - } - Search { keyword, chats, limit, since, until, msg_type } => { - match query::q_search(db, &names_arc, &keyword, chats, limit, since, until, msg_type).await { - Ok(v) => Response::ok(v), - Err(e) => Response::err(e.to_string()), - } - } - Contacts { query, limit } => { - match query::q_contacts(&names_arc, query.as_deref(), limit).await { - Ok(v) => Response::ok(v), - Err(e) => Response::err(e.to_string()), - } - } - Unread { limit, filter } => { - match query::q_unread(db, &names_arc, limit, filter).await { - Ok(v) => Response::ok(v), - Err(e) => Response::err(e.to_string()), - } - } - Members { chat } => { - match query::q_members(db, &names_arc, &chat).await { - Ok(v) => Response::ok(v), - Err(e) => Response::err(e.to_string()), - } - } - NewMessages { state, limit } => { - match query::q_new_messages(db, &names_arc, state, limit).await { - Ok(v) => Response::ok(v), - Err(e) => Response::err(e.to_string()), - } - } - Favorites { limit, fav_type, query } => { - match query::q_favorites(db, limit, fav_type, query).await { - Ok(v) => Response::ok(v), - Err(e) => Response::err(e.to_string()), - } - } - Stats { chat, since, until } => { - match query::q_stats(db, &names_arc, &chat, since, until).await { - Ok(v) => Response::ok(v), - Err(e) => Response::err(e.to_string()), - } - } - SnsNotifications { limit, since, until, include_read } => { - match query::q_sns_notifications(db, &names_arc, limit, since, until, include_read).await { - Ok(v) => Response::ok(v), - Err(e) => Response::err(e.to_string()), - } - } - SnsFeed { limit, since, until, user } => { - match query::q_sns_feed(db, &names_arc, limit, since, until, user.as_deref()).await { - Ok(v) => Response::ok(v), - Err(e) => Response::err(e.to_string()), - } - } - SnsSearch { keyword, limit, since, until, user } => { - match query::q_sns_search(db, &names_arc, &keyword, limit, since, until, user.as_deref()).await { - Ok(v) => Response::ok(v), - Err(e) => Response::err(e.to_string()), - } - } - } -}