mirror of https://github.com/jackwener/wx-cli.git
feat: Wired transport module into daemon server, added TCP listening al…
- src/daemon/server.rs - src/daemon/mod.rs - src/cli/daemon_cmd.rs - src/cli/mod.rs GSD context: - Milestone: M001 - TCP Transport - Slice: S01 - Task: T02 - Wired transport module into daemon server, added TCP listening alongside local transport, and implemented `wx daemon start [--tcp ADDR]` subcommand GSD-Task: S01/T02pull/43/head
parent
189110f36d
commit
1f7b843a1a
|
|
@ -8,6 +8,7 @@ pub fn cmd_daemon(cmd: DaemonCommands) -> Result<()> {
|
||||||
DaemonCommands::Status => cmd_status(),
|
DaemonCommands::Status => cmd_status(),
|
||||||
DaemonCommands::Stop => cmd_stop(),
|
DaemonCommands::Stop => cmd_stop(),
|
||||||
DaemonCommands::Logs { follow, lines } => cmd_logs(follow, lines),
|
DaemonCommands::Logs { follow, lines } => cmd_logs(follow, lines),
|
||||||
|
DaemonCommands::Start { tcp } => crate::daemon::run_start(tcp),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -262,6 +262,12 @@ pub enum DaemonCommands {
|
||||||
#[arg(short = 'n', long, default_value = "50")]
|
#[arg(short = 'n', long, default_value = "50")]
|
||||||
lines: usize,
|
lines: usize,
|
||||||
},
|
},
|
||||||
|
/// 启动 daemon
|
||||||
|
Start {
|
||||||
|
/// 同时监听 TCP 地址(如 127.0.0.1:9876)
|
||||||
|
#[arg(long)]
|
||||||
|
tcp: Option<String>,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run() {
|
pub fn run() {
|
||||||
|
|
|
||||||
|
|
@ -13,13 +13,48 @@ use crate::config;
|
||||||
/// 当 WX_DAEMON_MODE 环境变量设置时,main() 调用此函数
|
/// 当 WX_DAEMON_MODE 环境变量设置时,main() 调用此函数
|
||||||
pub fn run() {
|
pub fn run() {
|
||||||
let rt = tokio::runtime::Runtime::new().expect("无法创建 tokio runtime");
|
let rt = tokio::runtime::Runtime::new().expect("无法创建 tokio runtime");
|
||||||
if let Err(e) = rt.block_on(async_run()) {
|
if let Err(e) = rt.block_on(start_daemon(None)) {
|
||||||
eprintln!("[daemon] 启动失败: {}", e);
|
eprintln!("[daemon] 启动失败: {}", e);
|
||||||
std::process::exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn async_run() -> Result<()> {
|
/// 从 CLI `wx daemon start [--tcp ADDR]` 调用
|
||||||
|
///
|
||||||
|
/// 查找当前可执行文件路径,设置 WX_DAEMON_MODE=1,后台启动新进程。
|
||||||
|
pub fn run_start(tcp_addr: Option<String>) -> Result<()> {
|
||||||
|
let exe = std::env::current_exe()?;
|
||||||
|
let log = config::log_path();
|
||||||
|
|
||||||
|
let mut cmd = std::process::Command::new(&exe);
|
||||||
|
cmd.env("WX_DAEMON_MODE", "1");
|
||||||
|
if let Some(addr) = &tcp_addr {
|
||||||
|
cmd.env("WX_DAEMON_TCP_ADDR", addr);
|
||||||
|
}
|
||||||
|
// 日志重定向
|
||||||
|
let log_file = std::fs::OpenOptions::new()
|
||||||
|
.create(true)
|
||||||
|
.append(true)
|
||||||
|
.open(&log)?;
|
||||||
|
cmd.stdout(log_file.try_clone()?).stderr(log_file);
|
||||||
|
|
||||||
|
#[cfg(unix)]
|
||||||
|
{
|
||||||
|
use std::os::unix::process::CommandExt;
|
||||||
|
unsafe { cmd.pre_exec(|| {
|
||||||
|
libc::setsid();
|
||||||
|
Ok(())
|
||||||
|
}) };
|
||||||
|
}
|
||||||
|
|
||||||
|
let child = cmd.spawn()?;
|
||||||
|
let pid = child.id();
|
||||||
|
eprintln!("[daemon] 已启动 daemon 进程 (PID {})", pid);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// daemon 核心启动逻辑(被 run() 和 WX_DAEMON_MODE 路径共享)
|
||||||
|
pub async fn start_daemon(tcp_addr: Option<String>) -> Result<()> {
|
||||||
// 确保工作目录存在
|
// 确保工作目录存在
|
||||||
let cli_dir = config::cli_dir();
|
let cli_dir = config::cli_dir();
|
||||||
tokio::fs::create_dir_all(&cli_dir).await?;
|
tokio::fs::create_dir_all(&cli_dir).await?;
|
||||||
|
|
@ -76,13 +111,21 @@ async fn async_run() -> Result<()> {
|
||||||
let _ = db.get("sns/sns.db").await;
|
let _ = db.get("sns/sns.db").await;
|
||||||
eprintln!("[daemon] 预热完成,联系人 {} 个", names.map.len());
|
eprintln!("[daemon] 预热完成,联系人 {} 个", names.map.len());
|
||||||
|
|
||||||
// 包一层内部 Arc:IPC 请求取 guard 后只做 Arc::clone(O(1)),
|
// 包一层内部 Arc
|
||||||
// 避免每次请求都全量 clone 几千个联系人的 HashMap。
|
|
||||||
// 用 tokio::sync::RwLock 允许 guard 跨 await(当前不跨,为未来 reload 留余地)。
|
|
||||||
let names_arc = Arc::new(tokio::sync::RwLock::new(Arc::new(names)));
|
let names_arc = Arc::new(tokio::sync::RwLock::new(Arc::new(names)));
|
||||||
|
|
||||||
|
// 检查环境变量中的 TCP 地址(WX_DAEMON_MODE 路径下通过 env 传入)
|
||||||
|
let effective_tcp_addr = tcp_addr.or_else(|| std::env::var("WX_DAEMON_TCP_ADDR").ok());
|
||||||
|
|
||||||
// 启动 IPC server(阻塞)
|
// 启动 IPC server(阻塞)
|
||||||
server::serve(Arc::clone(&db), Arc::clone(&names_arc)).await?;
|
server::serve(Arc::clone(&db), Arc::clone(&names_arc), effective_tcp_addr.as_deref()).await?;
|
||||||
|
|
||||||
|
// 正常退出时清理(signal 路径下由 cleanup_and_exit 处理,不会走到这里)
|
||||||
|
#[allow(unreachable_code)]
|
||||||
|
{
|
||||||
|
let _ = std::fs::remove_file(config::sock_path());
|
||||||
|
let _ = std::fs::remove_file(config::pid_path());
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
@ -132,7 +175,9 @@ async fn setup_signal_handler() {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(unix)]
|
||||||
fn cleanup_and_exit() {
|
fn cleanup_and_exit() {
|
||||||
|
// 仅清理 local socket 文件,TCP 端口由 OS 自动回收
|
||||||
let _ = std::fs::remove_file(config::sock_path());
|
let _ = std::fs::remove_file(config::sock_path());
|
||||||
let _ = std::fs::remove_file(config::pid_path());
|
let _ = std::fs::remove_file(config::pid_path());
|
||||||
std::process::exit(0);
|
std::process::exit(0);
|
||||||
|
|
|
||||||
|
|
@ -1,16 +1,32 @@
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
|
||||||
|
|
||||||
use crate::ipc::{Request, Response};
|
use crate::transport::{self, Listener};
|
||||||
use super::cache::DbCache;
|
use super::cache::DbCache;
|
||||||
use super::query::Names;
|
use super::query::Names;
|
||||||
|
|
||||||
/// 启动 IPC server(Unix socket / Windows named pipe)
|
/// 启动 IPC server(Unix socket / Windows named pipe + 可选 TCP)
|
||||||
|
///
|
||||||
|
/// 当 `tcp_addr` 为 `Some` 时,同时监听 TCP 端口;daemon 在 local listener 退出时退出。
|
||||||
pub async fn serve(
|
pub async fn serve(
|
||||||
db: Arc<DbCache>,
|
db: Arc<DbCache>,
|
||||||
names: Arc<tokio::sync::RwLock<Arc<Names>>>,
|
names: Arc<tokio::sync::RwLock<Arc<Names>>>,
|
||||||
|
tcp_addr: Option<&str>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
|
// TCP 先启动为后台任务
|
||||||
|
if let Some(addr) = tcp_addr {
|
||||||
|
let socket_addr: std::net::SocketAddr = addr.parse().map_err(|e| {
|
||||||
|
anyhow::anyhow!("TCP 地址解析失败 '{}': {}", addr, e)
|
||||||
|
})?;
|
||||||
|
let db_tcp = Arc::clone(&db);
|
||||||
|
let names_tcp = Arc::clone(&names);
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = serve_tcp(socket_addr, db_tcp, names_tcp).await {
|
||||||
|
eprintln!("[server] TCP 监听错误: {}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
serve_unix(db, names).await?;
|
serve_unix(db, names).await?;
|
||||||
#[cfg(windows)]
|
#[cfg(windows)]
|
||||||
|
|
@ -18,6 +34,28 @@ pub async fn serve(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn serve_tcp(
|
||||||
|
addr: std::net::SocketAddr,
|
||||||
|
db: Arc<DbCache>,
|
||||||
|
names: Arc<tokio::sync::RwLock<Arc<Names>>>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let listener = transport::TcpListener::bind(addr).await?;
|
||||||
|
eprintln!("[server] 监听 TCP {}", addr);
|
||||||
|
|
||||||
|
// TcpListener::accept 返回 Pin<Box<dyn Future>>,需要 Box::pin 包装循环
|
||||||
|
let mut listener = listener;
|
||||||
|
loop {
|
||||||
|
let stream = listener.accept().await?;
|
||||||
|
let db2 = Arc::clone(&db);
|
||||||
|
let names2 = Arc::clone(&names);
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = transport::handle_connection(stream, &db2, &names2).await {
|
||||||
|
eprintln!("[server] 连接处理错误: {}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
async fn serve_unix(
|
async fn serve_unix(
|
||||||
db: Arc<DbCache>,
|
db: Arc<DbCache>,
|
||||||
|
|
@ -47,42 +85,13 @@ async fn serve_unix(
|
||||||
let names2 = Arc::clone(&names);
|
let names2 = Arc::clone(&names);
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = handle_connection_unix(stream, db2, names2).await {
|
if let Err(e) = transport::handle_connection(stream, &db2, &names2).await {
|
||||||
eprintln!("[server] 连接处理错误: {}", e);
|
eprintln!("[server] 连接处理错误: {}", e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(unix)]
|
|
||||||
async fn handle_connection_unix(
|
|
||||||
stream: tokio::net::UnixStream,
|
|
||||||
db: Arc<DbCache>,
|
|
||||||
names: Arc<tokio::sync::RwLock<Arc<Names>>>,
|
|
||||||
) -> Result<()> {
|
|
||||||
let (reader, mut writer) = stream.into_split();
|
|
||||||
let mut lines = BufReader::new(reader).lines();
|
|
||||||
|
|
||||||
let line = match lines.next_line().await? {
|
|
||||||
Some(l) => l,
|
|
||||||
None => return Ok(()),
|
|
||||||
};
|
|
||||||
|
|
||||||
// 解析请求
|
|
||||||
let req: Request = match serde_json::from_str(&line) {
|
|
||||||
Ok(r) => r,
|
|
||||||
Err(e) => {
|
|
||||||
let resp = Response::err(format!("JSON 解析错误: {}", e));
|
|
||||||
writer.write_all(resp.to_json_line()?.as_bytes()).await?;
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let resp = dispatch(req, &db, &names).await;
|
|
||||||
writer.write_all(resp.to_json_line()?.as_bytes()).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(windows)]
|
#[cfg(windows)]
|
||||||
async fn serve_windows(
|
async fn serve_windows(
|
||||||
db: Arc<DbCache>,
|
db: Arc<DbCache>,
|
||||||
|
|
@ -106,130 +115,9 @@ async fn serve_windows(
|
||||||
let names2 = Arc::clone(&names);
|
let names2 = Arc::clone(&names);
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = handle_connection_windows(conn, db2, names2).await {
|
if let Err(e) = transport::handle_connection(conn, &db2, &names2).await {
|
||||||
eprintln!("[server] 连接处理错误: {}", e);
|
eprintln!("[server] 连接处理错误: {}", e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(windows)]
|
|
||||||
async fn handle_connection_windows(
|
|
||||||
conn: interprocess::local_socket::tokio::Stream,
|
|
||||||
db: Arc<DbCache>,
|
|
||||||
names: Arc<tokio::sync::RwLock<Arc<Names>>>,
|
|
||||||
) -> Result<()> {
|
|
||||||
let (reader, mut writer) = tokio::io::split(conn);
|
|
||||||
let mut lines = BufReader::new(reader).lines();
|
|
||||||
|
|
||||||
let line = match lines.next_line().await? {
|
|
||||||
Some(l) => l,
|
|
||||||
None => return Ok(()),
|
|
||||||
};
|
|
||||||
|
|
||||||
let req: Request = match serde_json::from_str(&line) {
|
|
||||||
Ok(r) => r,
|
|
||||||
Err(e) => {
|
|
||||||
let resp = Response::err(format!("JSON 解析错误: {}", e));
|
|
||||||
writer.write_all(resp.to_json_line()?.as_bytes()).await?;
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let resp = dispatch(req, &db, &names).await;
|
|
||||||
writer.write_all(resp.to_json_line()?.as_bytes()).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn dispatch(
|
|
||||||
req: Request,
|
|
||||||
db: &DbCache,
|
|
||||||
names: &tokio::sync::RwLock<Arc<Names>>,
|
|
||||||
) -> Response {
|
|
||||||
use crate::ipc::Request::*;
|
|
||||||
use super::query;
|
|
||||||
|
|
||||||
// 取 guard → O(1) clone Arc → 立即 drop 锁。后续 await 期间不持有锁,
|
|
||||||
// 多个并发 IPC 请求可以真正并行。Names 本身不可变(由 daemon 启动时
|
|
||||||
// 一次性构建),共享 Arc 即可。
|
|
||||||
let names_arc: Arc<Names> = {
|
|
||||||
let guard = names.read().await;
|
|
||||||
Arc::clone(&*guard)
|
|
||||||
};
|
|
||||||
|
|
||||||
match req {
|
|
||||||
Ping => Response::ok(serde_json::json!({ "pong": true })),
|
|
||||||
Sessions { limit } => {
|
|
||||||
match query::q_sessions(db, &names_arc, limit).await {
|
|
||||||
Ok(v) => Response::ok(v),
|
|
||||||
Err(e) => Response::err(e.to_string()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
History { chat, limit, offset, since, until, msg_type } => {
|
|
||||||
match query::q_history(db, &names_arc, &chat, limit, offset, since, until, msg_type).await {
|
|
||||||
Ok(v) => Response::ok(v),
|
|
||||||
Err(e) => Response::err(e.to_string()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Search { keyword, chats, limit, since, until, msg_type } => {
|
|
||||||
match query::q_search(db, &names_arc, &keyword, chats, limit, since, until, msg_type).await {
|
|
||||||
Ok(v) => Response::ok(v),
|
|
||||||
Err(e) => Response::err(e.to_string()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Contacts { query, limit } => {
|
|
||||||
match query::q_contacts(&names_arc, query.as_deref(), limit).await {
|
|
||||||
Ok(v) => Response::ok(v),
|
|
||||||
Err(e) => Response::err(e.to_string()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Unread { limit, filter } => {
|
|
||||||
match query::q_unread(db, &names_arc, limit, filter).await {
|
|
||||||
Ok(v) => Response::ok(v),
|
|
||||||
Err(e) => Response::err(e.to_string()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Members { chat } => {
|
|
||||||
match query::q_members(db, &names_arc, &chat).await {
|
|
||||||
Ok(v) => Response::ok(v),
|
|
||||||
Err(e) => Response::err(e.to_string()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
NewMessages { state, limit } => {
|
|
||||||
match query::q_new_messages(db, &names_arc, state, limit).await {
|
|
||||||
Ok(v) => Response::ok(v),
|
|
||||||
Err(e) => Response::err(e.to_string()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Favorites { limit, fav_type, query } => {
|
|
||||||
match query::q_favorites(db, limit, fav_type, query).await {
|
|
||||||
Ok(v) => Response::ok(v),
|
|
||||||
Err(e) => Response::err(e.to_string()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Stats { chat, since, until } => {
|
|
||||||
match query::q_stats(db, &names_arc, &chat, since, until).await {
|
|
||||||
Ok(v) => Response::ok(v),
|
|
||||||
Err(e) => Response::err(e.to_string()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
SnsNotifications { limit, since, until, include_read } => {
|
|
||||||
match query::q_sns_notifications(db, &names_arc, limit, since, until, include_read).await {
|
|
||||||
Ok(v) => Response::ok(v),
|
|
||||||
Err(e) => Response::err(e.to_string()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
SnsFeed { limit, since, until, user } => {
|
|
||||||
match query::q_sns_feed(db, &names_arc, limit, since, until, user.as_deref()).await {
|
|
||||||
Ok(v) => Response::ok(v),
|
|
||||||
Err(e) => Response::err(e.to_string()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
SnsSearch { keyword, limit, since, until, user } => {
|
|
||||||
match query::q_sns_search(db, &names_arc, &keyword, limit, since, until, user.as_deref()).await {
|
|
||||||
Ok(v) => Response::ok(v),
|
|
||||||
Err(e) => Response::err(e.to_string()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue