From 8b07ced20dd7d08e7c4c4cbc8363a82e974e135e Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Sat, 24 May 2025 22:21:14 -0400 Subject: [PATCH] Add daemonized watcher with control socket --- Cargo.lock | 1 + cli-bin/Cargo.toml | 6 +- cli-bin/src/cli.rs | 2 +- cli-bin/src/cli/watch.rs | 262 +++++++++++++++++++++++++++--------- cli-bin/tests/watch_unit.rs | 19 +-- 5 files changed, 208 insertions(+), 82 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6cf492d..d813196 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -678,6 +678,7 @@ dependencies = [ "glob", "libc", "libmarlin", + "nix", "once_cell", "predicates", "rusqlite", diff --git a/cli-bin/Cargo.toml b/cli-bin/Cargo.toml index 916e650..625a37e 100644 --- a/cli-bin/Cargo.toml +++ b/cli-bin/Cargo.toml @@ -21,8 +21,10 @@ shlex = "1.3" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] } walkdir = "2.5" -serde_json = { version = "1", optional = true } +serde_json = "1" +serde = { version = "1", features = ["derive"] } once_cell = "1" +nix = "0.30" [dev-dependencies] assert_cmd = "2" @@ -34,7 +36,7 @@ libc = "0.2" [features] # Enable JSON output with `--features json` -json = ["serde_json"] +json = [] [build-dependencies] serde = { version = "1", features = ["derive"] } diff --git a/cli-bin/src/cli.rs b/cli-bin/src/cli.rs index a688035..14ff8ff 100644 --- a/cli-bin/src/cli.rs +++ b/cli-bin/src/cli.rs @@ -16,7 +16,7 @@ use clap::{Parser, Subcommand, ValueEnum}; use clap_complete::Shell; /// Output format for commands. -#[derive(ValueEnum, Clone, Copy, Debug)] +#[derive(ValueEnum, Clone, Copy, Debug, PartialEq)] pub enum Format { Text, Json, diff --git a/cli-bin/src/cli/watch.rs b/cli-bin/src/cli/watch.rs index 24b9a60..6f8630f 100644 --- a/cli-bin/src/cli/watch.rs +++ b/cli-bin/src/cli/watch.rs @@ -1,118 +1,246 @@ -// src/cli/watch.rs - use anyhow::Result; use clap::Subcommand; -use libmarlin::watcher::{WatcherConfig, WatcherState}; +use libmarlin::config::Config; +use libmarlin::watcher::{WatcherConfig, WatcherState, WatcherStatus}; use rusqlite::Connection; -use std::path::PathBuf; +use serde::{Deserialize, Serialize}; +use std::collections::hash_map::DefaultHasher; +use std::hash::{Hash, Hasher}; +use std::io::{Read, Write}; +use std::net::{TcpListener, TcpStream}; +use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::thread; use std::time::{Duration, Instant}; use tracing::info; - -use once_cell::sync::Lazy; -use std::sync::Mutex; +use serde_json; #[allow(dead_code)] -static LAST_WATCHER_STATE: Lazy>> = Lazy::new(|| Mutex::new(None)); +static LAST_WATCHER_STATE: once_cell::sync::Lazy>> = + once_cell::sync::Lazy::new(|| Mutex::new(None)); #[allow(dead_code)] pub fn last_watcher_state() -> Option { LAST_WATCHER_STATE.lock().unwrap().clone() } -/// Commands related to file watching functionality #[derive(Subcommand, Debug)] pub enum WatchCmd { - /// Start watching a directory for changes Start { - /// Directory to watch (defaults to current directory) #[arg(default_value = ".")] path: PathBuf, - - /// Debounce window in milliseconds (default: 100ms) #[arg(long, default_value = "100")] debounce_ms: u64, }, - - /// Show status of currently active watcher Status, - - /// Stop the currently running watcher Stop, + #[command(hide = true)] + Daemon { + path: PathBuf, + debounce_ms: u64, + port: u16, + control: PathBuf, + }, } -/// Run a watch command -pub fn run(cmd: &WatchCmd, _conn: &mut Connection, _format: super::Format) -> Result<()> { +#[derive(Serialize, Deserialize)] +struct ControlInfo { + pid: u32, + port: u16, +} + +#[derive(Serialize, Deserialize)] +struct StatusDto { + state: String, + events_processed: usize, + queue_size: usize, + uptime_secs: u64, +} + +fn control_path(db_path: &Path) -> PathBuf { + db_path.with_extension("watch.json") +} + +fn choose_port(db_path: &Path) -> u16 { + let mut h = DefaultHasher::new(); + db_path.hash(&mut h); + 31000 + ((h.finish() % 1000) as u16) +} + +fn read_control(path: &Path) -> Result { + let txt = std::fs::read_to_string(path)?; + Ok(serde_json::from_str(&txt)?) +} + +fn process_alive(pid: u32) -> bool { + #[cfg(unix)] + { + use nix::sys::signal::kill; + use nix::unistd::Pid; + kill(Pid::from_raw(pid as i32), None).is_ok() + } + #[cfg(not(unix))] + { + true // fallback, assume alive + } +} + +fn send_request(port: u16, msg: &str) -> Result { + let mut stream = TcpStream::connect(("127.0.0.1", port))?; + stream.write_all(msg.as_bytes())?; + let mut buf = String::new(); + stream.read_to_string(&mut buf)?; + Ok(buf) +} + +fn status_to_dto(st: WatcherStatus) -> StatusDto { + StatusDto { + state: format!("{:?}", st.state), + events_processed: st.events_processed, + queue_size: st.queue_size, + uptime_secs: st + .start_time + .map(|t| t.elapsed().as_secs()) + .unwrap_or_default(), + } +} + +pub fn run(cmd: &WatchCmd, _conn: &mut Connection, fmt: super::Format) -> Result<()> { match cmd { WatchCmd::Start { path, debounce_ms } => { + let cfg = Config::load()?; + let control = control_path(&cfg.db_path); + if control.exists() { + let info = read_control(&control)?; + if process_alive(info.pid) { + info!("Watcher already running with PID {}", info.pid); + return Ok(()); + } else { + std::fs::remove_file(&control).ok(); + } + } + let port = choose_port(&cfg.db_path); + let exe = std::env::current_exe()?; + let child = std::process::Command::new(exe) + .arg("watch") + .arg("daemon") + .arg("--path") + .arg(path) + .arg("--debounce-ms") + .arg(debounce_ms.to_string()) + .arg("--port") + .arg(port.to_string()) + .arg("--control") + .arg(&control) + .spawn()?; + info!("Started watcher daemon with PID {}", child.id()); + Ok(()) + } + WatchCmd::Daemon { + path, + debounce_ms, + port, + control, + } => { let mut marlin = libmarlin::Marlin::open_default()?; let config = WatcherConfig { debounce_ms: *debounce_ms, ..Default::default() }; let canon_path = path.canonicalize().unwrap_or_else(|_| path.clone()); - info!("Starting watcher for directory: {}", canon_path.display()); - - let mut watcher = marlin.watch(&canon_path, Some(config))?; - - let status = watcher.status()?; - info!("Watcher started. Press Ctrl+C to stop watching."); - info!("Watching {} paths", status.watched_paths.len()); - - let start_time = Instant::now(); - let mut last_status_time = Instant::now(); + let watcher = Arc::new(Mutex::new(marlin.watch(&canon_path, Some(config))?)); let running = Arc::new(AtomicBool::new(true)); - let r_clone = running.clone(); - - ctrlc::set_handler(move || { - info!("Ctrl+C received. Signaling watcher to stop..."); - r_clone.store(false, Ordering::SeqCst); - })?; - - info!("Watcher run loop started. Waiting for Ctrl+C or stop signal..."); + let srv_running = running.clone(); + let w_clone = watcher.clone(); + let port_val = *port; + let server = thread::spawn(move || { + let listener = TcpListener::bind(("127.0.0.1", port_val)).unwrap(); + for mut s in listener.incoming().flatten() { + let mut buf = String::new(); + if s.read_to_string(&mut buf).is_ok() { + if buf.contains("status") { + if let Ok(st) = w_clone.lock().unwrap().status() { + let dto = status_to_dto(st); + let _ = s.write_all(serde_json::to_string(&dto).unwrap().as_bytes()); + } + } else if buf.contains("stop") { + let _ = s.write_all(b"ok"); + srv_running.store(false, Ordering::SeqCst); + break; + } + } + } + }); + let info = ControlInfo { + pid: std::process::id(), + port: *port, + }; + std::fs::write(control, serde_json::to_string(&info)?)?; while running.load(Ordering::SeqCst) { - let current_status = watcher.status()?; - if current_status.state == WatcherState::Stopped { - info!("Watcher has stopped (detected by state). Exiting loop."); - break; - } - - // Corrected line: removed the extra closing parenthesis - if last_status_time.elapsed() > Duration::from_secs(10) { - let uptime = start_time.elapsed(); - info!( - "Watcher running for {}s, processed {} events, queue: {}, state: {:?}", - uptime.as_secs(), - current_status.events_processed, - current_status.queue_size, - current_status.state - ); - last_status_time = Instant::now(); - } thread::sleep(Duration::from_millis(200)); } - - info!("Watcher run loop ended. Explicitly stopping watcher instance..."); - watcher.stop()?; + watcher.lock().unwrap().stop()?; + server.join().ok(); + std::fs::remove_file(control).ok(); { let mut guard = LAST_WATCHER_STATE.lock().unwrap(); - *guard = Some(watcher.status()?.state); + *guard = Some(WatcherState::Stopped); } - info!("Watcher instance fully stopped."); Ok(()) } WatchCmd::Status => { - info!( - "Status command: No active watcher process to query in this CLI invocation model." - ); - info!("To see live status, run 'marlin watch start' which prints periodic updates."); + let cfg = Config::load()?; + let control = control_path(&cfg.db_path); + if !control.exists() { + info!("Status command: No active watcher process to query …"); + return Ok(()); + } + let info = read_control(&control)?; + let resp = send_request(info.port, "status"); + match resp { + Ok(txt) => { + if fmt == super::Format::Json { + println!("{txt}"); + } else { + let dto: StatusDto = serde_json::from_str(&txt)?; + println!( + "state: {} processed:{} queue:{} uptime:{}s", + dto.state, dto.events_processed, dto.queue_size, dto.uptime_secs + ); + } + } + Err(_) => { + info!("Failed to query watcher status"); + } + } Ok(()) } WatchCmd::Stop => { - info!("Stop command: No active watcher process to stop in this CLI invocation model."); - info!("Please use Ctrl+C in the terminal where 'marlin watch start' is running."); + let cfg = Config::load()?; + let control = control_path(&cfg.db_path); + if !control.exists() { + info!("Stop command: No active watcher process to stop …"); + return Ok(()); + } + let info = read_control(&control)?; + let _ = send_request(info.port, "stop"); + let start = Instant::now(); + while start.elapsed() < Duration::from_secs(5) { + if !process_alive(info.pid) { + break; + } + thread::sleep(Duration::from_millis(200)); + } + if process_alive(info.pid) { + #[cfg(unix)] + { + use nix::sys::signal::{kill, Signal}; + use nix::unistd::Pid; + let _ = kill(Pid::from_raw(info.pid as i32), Signal::SIGTERM); + } + } + std::fs::remove_file(control).ok(); Ok(()) } } diff --git a/cli-bin/tests/watch_unit.rs b/cli-bin/tests/watch_unit.rs index 8ab0151..dd19444 100644 --- a/cli-bin/tests/watch_unit.rs +++ b/cli-bin/tests/watch_unit.rs @@ -2,8 +2,6 @@ use std::thread; use std::time::Duration; use tempfile::tempdir; -use libc; -use libmarlin::watcher::WatcherState; use libmarlin::{self as marlin, db}; use marlin_cli::cli::watch::WatchCmd; use marlin_cli::cli::{watch, Format}; @@ -11,12 +9,10 @@ use marlin_cli::cli::{watch, Format}; #[cfg(unix)] #[test] fn watch_start_and_stop_quickly() { - // TODO: Use a Windows console control handler and enable this test on Windows. let tmp = tempdir().unwrap(); let db_path = tmp.path().join("index.db"); std::env::set_var("MARLIN_DB_PATH", &db_path); - // create database let _m = marlin::Marlin::open_default().unwrap(); let mut conn = db::open(&db_path).unwrap(); @@ -27,14 +23,13 @@ fn watch_start_and_stop_quickly() { debounce_ms: 50, }; - // send SIGINT shortly after watcher starts - let t = thread::spawn(|| { - thread::sleep(Duration::from_millis(200)); - unsafe { libc::raise(libc::SIGINT) }; - }); - watch::run(&cmd, &mut conn, Format::Text).unwrap(); - t.join().unwrap(); + thread::sleep(Duration::from_millis(500)); - assert_eq!(watch::last_watcher_state(), Some(WatcherState::Stopped)); + watch::run(&WatchCmd::Status, &mut conn, Format::Text).unwrap(); + watch::run(&WatchCmd::Stop, &mut conn, Format::Text).unwrap(); + + let cfg = libmarlin::config::Config::load().unwrap(); + let control = cfg.db_path.with_extension("watch.json"); + assert!(!control.exists()); }