diff --git a/src/cli/contacts.rs b/src/cli/contacts.rs index e52a30b..bc49dc7 100644 --- a/src/cli/contacts.rs +++ b/src/cli/contacts.rs @@ -3,8 +3,8 @@ use crate::ipc::Request; use super::transport; use super::output::{resolve, print_value}; -pub fn cmd_contacts(query: Option, limit: usize, json: bool) -> Result<()> { - let resp = transport::send(Request::Contacts { query, limit })?; +pub fn cmd_contacts(query: Option, limit: usize, json: bool, tcp_addr: Option<&str>) -> Result<()> { + let resp = transport::send(Request::Contacts { query, limit }, tcp_addr)?; let contacts = resp.data.get("contacts") .cloned() .unwrap_or(serde_json::Value::Array(vec![])); diff --git a/src/cli/daemon_cmd.rs b/src/cli/daemon_cmd.rs index 4b539d6..03f1127 100644 --- a/src/cli/daemon_cmd.rs +++ b/src/cli/daemon_cmd.rs @@ -3,22 +3,26 @@ use crate::config; use crate::cli::DaemonCommands; use crate::cli::transport; -pub fn cmd_daemon(cmd: DaemonCommands) -> Result<()> { +pub fn cmd_daemon(cmd: DaemonCommands, tcp_addr: Option<&str>) -> Result<()> { match cmd { - DaemonCommands::Status => cmd_status(), + DaemonCommands::Status => cmd_status(tcp_addr), DaemonCommands::Stop => cmd_stop(), DaemonCommands::Logs { follow, lines } => cmd_logs(follow, lines), DaemonCommands::Start { tcp } => crate::daemon::run_start(tcp), } } -fn cmd_status() -> Result<()> { - if transport::is_alive() { +fn cmd_status(tcp_addr: Option<&str>) -> Result<()> { + if transport::is_alive(tcp_addr) { let pid_path = config::pid_path(); let pid = std::fs::read_to_string(&pid_path) .map(|s| s.trim().to_string()) .unwrap_or_else(|_| "?".into()); - println!("wx-daemon 运行中 (PID {})", pid); + if let Some(addr) = tcp_addr { + println!("wx-daemon 运行中 (TCP {})", addr); + } else { + println!("wx-daemon 运行中 (PID {})", pid); + } } else { println!("wx-daemon 未运行"); } diff --git a/src/cli/export.rs b/src/cli/export.rs index 85a6989..9d09748 100644 --- a/src/cli/export.rs +++ b/src/cli/export.rs @@ -10,6 +10,7 @@ pub fn cmd_export( limit: usize, format: String, output: Option, + tcp_addr: Option<&str>, ) -> Result<()> { let since_ts = since.as_deref().map(parse_time).transpose()?; let until_ts = until.as_deref().map(parse_time_end).transpose()?; @@ -23,7 +24,7 @@ pub fn cmd_export( msg_type: None, }; - let resp = transport::send(req)?; + let resp = transport::send(req, tcp_addr)?; let messages = resp.data["messages"].as_array().cloned().unwrap_or_default(); let chat_name = resp.data["chat"].as_str().unwrap_or("").to_string(); let is_group = resp.data["is_group"].as_bool().unwrap_or(false); diff --git a/src/cli/favorites.rs b/src/cli/favorites.rs index 84db1d6..3813614 100644 --- a/src/cli/favorites.rs +++ b/src/cli/favorites.rs @@ -19,9 +19,10 @@ pub fn cmd_favorites( fav_type: Option, query: Option, json: bool, + tcp_addr: Option<&str>, ) -> Result<()> { let type_val = fav_type.as_deref().and_then(parse_fav_type); - let resp = transport::send(Request::Favorites { limit, fav_type: type_val, query })?; + let resp = transport::send(Request::Favorites { limit, fav_type: type_val, query }, tcp_addr)?; let items = resp.data.get("items") .cloned() .unwrap_or(serde_json::Value::Array(vec![])); diff --git a/src/cli/history.rs b/src/cli/history.rs index b5fabb7..aeaadb3 100644 --- a/src/cli/history.rs +++ b/src/cli/history.rs @@ -11,13 +11,14 @@ pub fn cmd_history( until: Option, msg_type: Option, json: bool, + tcp_addr: Option<&str>, ) -> Result<()> { let since_ts = since.as_deref().map(parse_time).transpose()?; let until_ts = until.as_deref().map(parse_time_end).transpose()?; let type_val = msg_type.as_deref().and_then(parse_msg_type); let req = Request::History { chat, limit, offset, since: since_ts, until: until_ts, msg_type: type_val }; - let resp = transport::send(req)?; + let resp = transport::send(req, tcp_addr)?; let msgs = resp.data.get("messages") .cloned() diff --git a/src/cli/members.rs b/src/cli/members.rs index 2579fd1..37012b0 100644 --- a/src/cli/members.rs +++ b/src/cli/members.rs @@ -3,8 +3,8 @@ use crate::ipc::Request; use super::transport; use super::output::{resolve, print_value}; -pub fn cmd_members(chat: String, json: bool) -> Result<()> { - let resp = transport::send(Request::Members { chat })?; +pub fn cmd_members(chat: String, json: bool, tcp_addr: Option<&str>) -> Result<()> { + let resp = transport::send(Request::Members { chat }, tcp_addr)?; let members = resp.data.get("members") .cloned() .unwrap_or(serde_json::Value::Array(vec![])); diff --git a/src/cli/mod.rs b/src/cli/mod.rs index 786b62b..1815f22 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -23,6 +23,10 @@ use clap::{Parser, Subcommand}; #[derive(Parser)] #[command(name = "wx", version = env!("CARGO_PKG_VERSION"), about = "wx — 微信本地数据 CLI")] pub struct Cli { + /// 通过 TCP 连接 daemon(如 127.0.0.1:9876) + #[arg(long)] + pub tcp: Option, + #[command(subcommand)] command: Commands, } @@ -272,44 +276,45 @@ pub enum DaemonCommands { pub fn run() { let cli = Cli::parse(); - if let Err(e) = dispatch(cli) { + let tcp_addr = cli.tcp.clone(); + if let Err(e) = dispatch(cli, tcp_addr.as_deref()) { eprintln!("错误: {}", e); std::process::exit(1); } } -fn dispatch(cli: Cli) -> Result<()> { +fn dispatch(cli: Cli, tcp_addr: Option<&str>) -> Result<()> { match cli.command { Commands::Init { force } => init::cmd_init(force), - Commands::Sessions { limit, json } => sessions::cmd_sessions(limit, json), + Commands::Sessions { limit, json } => sessions::cmd_sessions(limit, json, tcp_addr), Commands::History { chat, limit, offset, since, until, msg_type, json } => { - history::cmd_history(chat, limit, offset, since, until, msg_type, json) + history::cmd_history(chat, limit, offset, since, until, msg_type, json, tcp_addr) } Commands::Search { keyword, chats, limit, since, until, msg_type, json } => { - search::cmd_search(keyword, chats, limit, since, until, msg_type, json) + search::cmd_search(keyword, chats, limit, since, until, msg_type, json, tcp_addr) } - Commands::Contacts { query, limit, json } => contacts::cmd_contacts(query, limit, json), + Commands::Contacts { query, limit, json } => contacts::cmd_contacts(query, limit, json, tcp_addr), Commands::Export { chat, since, until, limit, format, output } => { - export::cmd_export(chat, since, until, limit, format, output) + export::cmd_export(chat, since, until, limit, format, output, tcp_addr) } - Commands::Unread { limit, filter, json } => unread::cmd_unread(limit, filter, json), - Commands::Members { chat, json } => members::cmd_members(chat, json), - Commands::NewMessages { limit, json } => new_messages::cmd_new_messages(limit, json), + Commands::Unread { limit, filter, json } => unread::cmd_unread(limit, filter, json, tcp_addr), + Commands::Members { chat, json } => members::cmd_members(chat, json, tcp_addr), + Commands::NewMessages { limit, json } => new_messages::cmd_new_messages(limit, json, tcp_addr), Commands::Stats { chat, since, until, json } => { - stats::cmd_stats(chat, since, until, json) + stats::cmd_stats(chat, since, until, json, tcp_addr) } Commands::Favorites { limit, fav_type, query, json } => { - favorites::cmd_favorites(limit, fav_type, query, json) + favorites::cmd_favorites(limit, fav_type, query, json, tcp_addr) } Commands::SnsNotifications { limit, since, until, include_read, json } => { - sns_notifications::cmd_sns_notifications(limit, since, until, include_read, json) + sns_notifications::cmd_sns_notifications(limit, since, until, include_read, json, tcp_addr) } Commands::SnsFeed { limit, since, until, user, json } => { - sns_feed::cmd_sns_feed(limit, since, until, user, json) + sns_feed::cmd_sns_feed(limit, since, until, user, json, tcp_addr) } Commands::SnsSearch { keyword, limit, since, until, user, json } => { - sns_search::cmd_sns_search(keyword, limit, since, until, user, json) + sns_search::cmd_sns_search(keyword, limit, since, until, user, json, tcp_addr) } - Commands::Daemon { cmd } => daemon_cmd::cmd_daemon(cmd), + Commands::Daemon { cmd } => daemon_cmd::cmd_daemon(cmd, tcp_addr), } } diff --git a/src/cli/new_messages.rs b/src/cli/new_messages.rs index b847210..0c05916 100644 --- a/src/cli/new_messages.rs +++ b/src/cli/new_messages.rs @@ -37,9 +37,9 @@ fn save_state(new_state: &HashMap) -> Result<()> { Ok(()) } -pub fn cmd_new_messages(limit: usize, json: bool) -> Result<()> { +pub fn cmd_new_messages(limit: usize, json: bool, tcp_addr: Option<&str>) -> Result<()> { let state = load_state(); - let resp = transport::send(Request::NewMessages { state, limit })?; + let resp = transport::send(Request::NewMessages { state, limit }, tcp_addr)?; // 保存 daemon 返回的 new_state if let Some(obj) = resp.data.get("new_state").and_then(|v| v.as_object()) { diff --git a/src/cli/search.rs b/src/cli/search.rs index e6f3d00..c2ab254 100644 --- a/src/cli/search.rs +++ b/src/cli/search.rs @@ -12,6 +12,7 @@ pub fn cmd_search( until: Option, msg_type: Option, json: bool, + tcp_addr: Option<&str>, ) -> Result<()> { let since_ts = since.as_deref().map(parse_time).transpose()?; let until_ts = until.as_deref().map(parse_time_end).transpose()?; @@ -27,7 +28,7 @@ pub fn cmd_search( msg_type: type_val, }; - let resp = transport::send(req)?; + let resp = transport::send(req, tcp_addr)?; let results = resp.data.get("results") .cloned() .unwrap_or(serde_json::Value::Array(vec![])); diff --git a/src/cli/sessions.rs b/src/cli/sessions.rs index 9ccadb8..f9e3643 100644 --- a/src/cli/sessions.rs +++ b/src/cli/sessions.rs @@ -3,8 +3,8 @@ use crate::ipc::Request; use super::transport; use super::output::{resolve, print_value}; -pub fn cmd_sessions(limit: usize, json: bool) -> Result<()> { - let resp = transport::send(Request::Sessions { limit })?; +pub fn cmd_sessions(limit: usize, json: bool, tcp_addr: Option<&str>) -> Result<()> { + let resp = transport::send(Request::Sessions { limit }, tcp_addr)?; let data = resp.data.get("sessions") .cloned() .unwrap_or(serde_json::Value::Array(vec![])); diff --git a/src/cli/sns_feed.rs b/src/cli/sns_feed.rs index afb30a5..45ae618 100644 --- a/src/cli/sns_feed.rs +++ b/src/cli/sns_feed.rs @@ -10,6 +10,7 @@ pub fn cmd_sns_feed( until: Option, user: Option, json: bool, + tcp_addr: Option<&str>, ) -> Result<()> { let since_ts = since.as_deref().map(parse_time).transpose()?; let until_ts = until.as_deref().map(parse_time_end).transpose()?; @@ -20,7 +21,7 @@ pub fn cmd_sns_feed( until: until_ts, user, }; - let resp = transport::send(req)?; + let resp = transport::send(req, tcp_addr)?; let data = resp.data.get("posts") .cloned() .unwrap_or(serde_json::Value::Array(vec![])); diff --git a/src/cli/sns_notifications.rs b/src/cli/sns_notifications.rs index 42fa30f..fc631fc 100644 --- a/src/cli/sns_notifications.rs +++ b/src/cli/sns_notifications.rs @@ -10,6 +10,7 @@ pub fn cmd_sns_notifications( until: Option, include_read: bool, json: bool, + tcp_addr: Option<&str>, ) -> Result<()> { let since_ts = since.as_deref().map(parse_time).transpose()?; let until_ts = until.as_deref().map(parse_time_end).transpose()?; @@ -20,7 +21,7 @@ pub fn cmd_sns_notifications( until: until_ts, include_read, }; - let resp = transport::send(req)?; + let resp = transport::send(req, tcp_addr)?; let data = resp.data.get("notifications") .cloned() .unwrap_or(serde_json::Value::Array(vec![])); diff --git a/src/cli/sns_search.rs b/src/cli/sns_search.rs index 1ed4bda..5e8141e 100644 --- a/src/cli/sns_search.rs +++ b/src/cli/sns_search.rs @@ -11,6 +11,7 @@ pub fn cmd_sns_search( until: Option, user: Option, json: bool, + tcp_addr: Option<&str>, ) -> Result<()> { let since_ts = since.as_deref().map(parse_time).transpose()?; let until_ts = until.as_deref().map(parse_time_end).transpose()?; @@ -22,7 +23,7 @@ pub fn cmd_sns_search( until: until_ts, user, }; - let resp = transport::send(req)?; + let resp = transport::send(req, tcp_addr)?; let data = resp.data.get("posts") .cloned() .unwrap_or(serde_json::Value::Array(vec![])); diff --git a/src/cli/stats.rs b/src/cli/stats.rs index 2e9a293..1db5156 100644 --- a/src/cli/stats.rs +++ b/src/cli/stats.rs @@ -9,10 +9,11 @@ pub fn cmd_stats( since: Option, until: Option, json: bool, + tcp_addr: Option<&str>, ) -> Result<()> { let since_ts = since.as_deref().map(parse_time).transpose()?; let until_ts = until.as_deref().map(parse_time_end).transpose()?; - let resp = transport::send(Request::Stats { chat, since: since_ts, until: until_ts })?; + let resp = transport::send(Request::Stats { chat, since: since_ts, until: until_ts }, tcp_addr)?; print_value(&resp.data, &resolve(json)) } diff --git a/src/cli/transport.rs b/src/cli/transport.rs index ab62da5..a2d6681 100644 --- a/src/cli/transport.rs +++ b/src/cli/transport.rs @@ -1,14 +1,21 @@ use anyhow::{bail, Context, Result}; use std::io::{BufRead, BufReader, Write}; +use std::net::TcpStream; use std::time::Duration; use crate::config; use crate::ipc::{Request, Response}; const STARTUP_TIMEOUT_SECS: u64 = 15; +const TCP_CONNECT_TIMEOUT_SECS: u64 = 15; +const TCP_RW_TIMEOUT_SECS: u64 = 120; /// 检查 daemon 是否存活 -pub fn is_alive() -> bool { +pub fn is_alive(tcp_addr: Option<&str>) -> bool { + if let Some(addr) = tcp_addr { + return is_alive_tcp(addr); + } + #[cfg(unix)] { use std::os::unix::net::UnixStream; @@ -52,49 +59,60 @@ pub fn is_alive() -> bool { } } +/// TCP liveness check: send ping via TCP, return true if pong received +pub fn is_alive_tcp(addr: &str) -> bool { + let tcp_addr = match addr.parse() { + Ok(a) => a, + Err(_) => return false, + }; + let mut stream = match TcpStream::connect_timeout( + &tcp_addr, + Duration::from_secs(TCP_CONNECT_TIMEOUT_SECS), + ) { + Ok(s) => s, + Err(_) => return false, + }; + let _ = stream.set_read_timeout(Some(Duration::from_secs(2))); + let _ = stream.set_write_timeout(Some(Duration::from_secs(2))); + + let req = serde_json::json!({"cmd": "ping"}); + if write!(stream, "{}\n", req).is_err() { + return false; + } + let mut reader = BufReader::new(stream); + let mut line = String::new(); + if reader.read_line(&mut line).is_err() { + return false; + } + serde_json::from_str::(&line) + .ok() + .and_then(|v| v.get("pong").and_then(|p| p.as_bool())) + .unwrap_or(false) +} + /// 确保 daemon 运行,必要时自动启动 -pub fn ensure_daemon() -> Result<()> { - if is_alive() { +/// 当指定 tcp_addr 时,不会自动启动 daemon(用户显式选择了 TCP 模式) +pub fn ensure_daemon(tcp_addr: Option<&str>) -> Result<()> { + if is_alive(tcp_addr) { return Ok(()); } + + // TCP 模式下不自动启动 daemon,直接报错 + if tcp_addr.is_some() { + let addr = tcp_addr.unwrap(); + bail!( + "无法连接到 TCP daemon ({}):{}\n请确认 daemon 已通过 `wx daemon start --tcp {}` 启动", + addr, + std::io::Error::last_os_error(), + addr, + ); + } + eprintln!("启动 wx-daemon..."); start_daemon()?; Ok(()) } -/// 启动 daemon 前检查 `~/.wx-cli/` 可写,给出比"超时"更明确的错误。 -/// -/// 典型坑:旧版本 `sudo wx init` 把目录留成 root 属主,非 root 的 daemon -/// 连 socket/log 都建不了,会静默失败 15s 超时。 -fn preflight_cli_dir_writable() -> Result<()> { - let cli_dir = config::cli_dir(); - std::fs::create_dir_all(&cli_dir) - .with_context(|| format!("创建 {} 失败", cli_dir.display()))?; - - let probe = cli_dir.join(".daemon_probe"); - match std::fs::File::create(&probe) { - Ok(_) => { - let _ = std::fs::remove_file(&probe); - Ok(()) - } - Err(e) if e.kind() == std::io::ErrorKind::PermissionDenied => { - let dir = cli_dir.display(); - if cfg!(unix) { - bail!( - "无法写入 {dir}(权限不足)\n\n\ - 这通常是老版本的 `sudo wx init` 把目录属主留成了 root。\n\ - 修复:\n\n \ - sudo chown -R $(whoami) {dir}\n\n\ - (新版已修复此问题,下次 init 不会再发生)", - ) - } else { - bail!("无法写入 {dir}: {e}") - } - } - Err(e) => bail!("无法写入 {}: {}", cli_dir.display(), e), - } -} - /// 启动 daemon 进程(自身二进制,设置 WX_DAEMON_MODE=1) fn start_daemon() -> Result<()> { let exe = std::env::current_exe().context("无法获取当前可执行文件路径")?; @@ -155,7 +173,7 @@ fn start_daemon() -> Result<()> { let deadline = std::time::Instant::now() + Duration::from_secs(STARTUP_TIMEOUT_SECS); while std::time::Instant::now() < deadline { std::thread::sleep(Duration::from_millis(300)); - if is_alive() { + if is_alive(None) { return Ok(()); } } @@ -167,9 +185,46 @@ fn start_daemon() -> Result<()> { ) } +/// 启动 daemon 前检查 `~/.wx-cli/` 可写,给出比"超时"更明确的错误。 +/// +/// 典型坑:旧版本 `sudo wx init` 把目录留成 root 属主,非 root 的 daemon +/// 连 socket/log 都建不了,会静默失败 15s 超时。 +fn preflight_cli_dir_writable() -> Result<()> { + let cli_dir = config::cli_dir(); + std::fs::create_dir_all(&cli_dir) + .with_context(|| format!("创建 {} 失败", cli_dir.display()))?; + + let probe = cli_dir.join(".daemon_probe"); + match std::fs::File::create(&probe) { + Ok(_) => { + let _ = std::fs::remove_file(&probe); + Ok(()) + } + Err(e) if e.kind() == std::io::ErrorKind::PermissionDenied => { + let dir = cli_dir.display(); + if cfg!(unix) { + bail!( + "无法写入 {dir}(权限不足)\n\n\ + 这通常是老版本的 `sudo wx init` 把目录属主留成了 root。\n\ + 修复:\n\n \ + sudo chown -R $(whoami) {dir}\n\n\ + (新版已修复此问题,下次 init 不会再发生)", + ) + } else { + bail!("无法写入 {dir}: {e}") + } + } + Err(e) => bail!("无法写入 {}: {}", cli_dir.display(), e), + } +} + /// 向 daemon 发送请求并返回响应 -pub fn send(req: Request) -> Result { - ensure_daemon()?; +pub fn send(req: Request, tcp_addr: Option<&str>) -> Result { + if let Some(addr) = tcp_addr { + return send_tcp(req, addr); + } + + ensure_daemon(None)?; #[cfg(unix)] { @@ -185,6 +240,38 @@ pub fn send(req: Request) -> Result { } } +/// 通过 TCP 发送请求并返回响应 +pub fn send_tcp(req: Request, addr: &str) -> Result { + let mut stream = TcpStream::connect_timeout( + &addr.parse().context("TCP 地址格式无效")?, + Duration::from_secs(TCP_CONNECT_TIMEOUT_SECS), + ) + .context(format!("连接 TCP daemon ({}) 失败", addr))?; + + stream + .set_read_timeout(Some(Duration::from_secs(TCP_RW_TIMEOUT_SECS))) + .ok(); + stream + .set_write_timeout(Some(Duration::from_secs(TCP_RW_TIMEOUT_SECS))) + .ok(); + + let req_str = serde_json::to_string(&req)? + "\n"; + stream.write_all(req_str.as_bytes())?; + + let mut line = String::new(); + let mut reader = BufReader::new(&stream); + reader.read_line(&mut line)?; + + let resp: Response = serde_json::from_str(&line) + .context("解析 daemon 响应失败")?; + + if !resp.ok { + bail!("{}", resp.error.as_deref().unwrap_or("未知错误")); + } + + Ok(resp) +} + #[cfg(unix)] fn send_unix(req: Request) -> Result { use std::os::unix::net::UnixStream; diff --git a/src/cli/unread.rs b/src/cli/unread.rs index 031700c..8e7ef31 100644 --- a/src/cli/unread.rs +++ b/src/cli/unread.rs @@ -3,14 +3,14 @@ use crate::ipc::Request; use super::transport; use super::output::{resolve, print_value}; -pub fn cmd_unread(limit: usize, filter: Vec, json: bool) -> Result<()> { +pub fn cmd_unread(limit: usize, filter: Vec, json: bool, tcp_addr: Option<&str>) -> Result<()> { // 空或含 "all" 视为不过滤;其他值已被 clap value_parser 验证过,直接透传给 daemon。 let filter_vec = if filter.is_empty() || filter.iter().any(|s| s == "all") { None } else { Some(filter) }; - let resp = transport::send(Request::Unread { limit, filter: filter_vec })?; + let resp = transport::send(Request::Unread { limit, filter: filter_vec }, tcp_addr)?; let data = resp.data.get("sessions") .cloned() .unwrap_or(serde_json::Value::Array(vec![]));