Add daemonized watcher with control socket

This commit is contained in:
thePR0M3TH3AN
2025-05-24 22:21:14 -04:00
parent bad08cbefb
commit 8b07ced20d
5 changed files with 208 additions and 82 deletions

1
Cargo.lock generated
View File

@@ -678,6 +678,7 @@ dependencies = [
"glob", "glob",
"libc", "libc",
"libmarlin", "libmarlin",
"nix",
"once_cell", "once_cell",
"predicates", "predicates",
"rusqlite", "rusqlite",

View File

@@ -21,8 +21,10 @@ shlex = "1.3"
tracing = "0.1" tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] } tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
walkdir = "2.5" walkdir = "2.5"
serde_json = { version = "1", optional = true } serde_json = "1"
serde = { version = "1", features = ["derive"] }
once_cell = "1" once_cell = "1"
nix = "0.30"
[dev-dependencies] [dev-dependencies]
assert_cmd = "2" assert_cmd = "2"
@@ -34,7 +36,7 @@ libc = "0.2"
[features] [features]
# Enable JSON output with `--features json` # Enable JSON output with `--features json`
json = ["serde_json"] json = []
[build-dependencies] [build-dependencies]
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }

View File

@@ -16,7 +16,7 @@ use clap::{Parser, Subcommand, ValueEnum};
use clap_complete::Shell; use clap_complete::Shell;
/// Output format for commands. /// Output format for commands.
#[derive(ValueEnum, Clone, Copy, Debug)] #[derive(ValueEnum, Clone, Copy, Debug, PartialEq)]
pub enum Format { pub enum Format {
Text, Text,
Json, Json,

View File

@@ -1,118 +1,246 @@
// src/cli/watch.rs
use anyhow::Result; use anyhow::Result;
use clap::Subcommand; use clap::Subcommand;
use libmarlin::watcher::{WatcherConfig, WatcherState}; use libmarlin::config::Config;
use libmarlin::watcher::{WatcherConfig, WatcherState, WatcherStatus};
use rusqlite::Connection; use rusqlite::Connection;
use std::path::PathBuf; use serde::{Deserialize, Serialize};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::{Arc, Mutex};
use std::thread; use std::thread;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tracing::info; use tracing::info;
use serde_json;
use once_cell::sync::Lazy;
use std::sync::Mutex;
#[allow(dead_code)] #[allow(dead_code)]
static LAST_WATCHER_STATE: Lazy<Mutex<Option<WatcherState>>> = Lazy::new(|| Mutex::new(None)); static LAST_WATCHER_STATE: once_cell::sync::Lazy<Mutex<Option<WatcherState>>> =
once_cell::sync::Lazy::new(|| Mutex::new(None));
#[allow(dead_code)] #[allow(dead_code)]
pub fn last_watcher_state() -> Option<WatcherState> { pub fn last_watcher_state() -> Option<WatcherState> {
LAST_WATCHER_STATE.lock().unwrap().clone() LAST_WATCHER_STATE.lock().unwrap().clone()
} }
/// Commands related to file watching functionality
#[derive(Subcommand, Debug)] #[derive(Subcommand, Debug)]
pub enum WatchCmd { pub enum WatchCmd {
/// Start watching a directory for changes
Start { Start {
/// Directory to watch (defaults to current directory)
#[arg(default_value = ".")] #[arg(default_value = ".")]
path: PathBuf, path: PathBuf,
/// Debounce window in milliseconds (default: 100ms)
#[arg(long, default_value = "100")] #[arg(long, default_value = "100")]
debounce_ms: u64, debounce_ms: u64,
}, },
/// Show status of currently active watcher
Status, Status,
/// Stop the currently running watcher
Stop, Stop,
#[command(hide = true)]
Daemon {
path: PathBuf,
debounce_ms: u64,
port: u16,
control: PathBuf,
},
} }
/// Run a watch command #[derive(Serialize, Deserialize)]
pub fn run(cmd: &WatchCmd, _conn: &mut Connection, _format: super::Format) -> Result<()> { struct ControlInfo {
pid: u32,
port: u16,
}
#[derive(Serialize, Deserialize)]
struct StatusDto {
state: String,
events_processed: usize,
queue_size: usize,
uptime_secs: u64,
}
fn control_path(db_path: &Path) -> PathBuf {
db_path.with_extension("watch.json")
}
fn choose_port(db_path: &Path) -> u16 {
let mut h = DefaultHasher::new();
db_path.hash(&mut h);
31000 + ((h.finish() % 1000) as u16)
}
fn read_control(path: &Path) -> Result<ControlInfo> {
let txt = std::fs::read_to_string(path)?;
Ok(serde_json::from_str(&txt)?)
}
fn process_alive(pid: u32) -> bool {
#[cfg(unix)]
{
use nix::sys::signal::kill;
use nix::unistd::Pid;
kill(Pid::from_raw(pid as i32), None).is_ok()
}
#[cfg(not(unix))]
{
true // fallback, assume alive
}
}
fn send_request(port: u16, msg: &str) -> Result<String> {
let mut stream = TcpStream::connect(("127.0.0.1", port))?;
stream.write_all(msg.as_bytes())?;
let mut buf = String::new();
stream.read_to_string(&mut buf)?;
Ok(buf)
}
fn status_to_dto(st: WatcherStatus) -> StatusDto {
StatusDto {
state: format!("{:?}", st.state),
events_processed: st.events_processed,
queue_size: st.queue_size,
uptime_secs: st
.start_time
.map(|t| t.elapsed().as_secs())
.unwrap_or_default(),
}
}
pub fn run(cmd: &WatchCmd, _conn: &mut Connection, fmt: super::Format) -> Result<()> {
match cmd { match cmd {
WatchCmd::Start { path, debounce_ms } => { WatchCmd::Start { path, debounce_ms } => {
let cfg = Config::load()?;
let control = control_path(&cfg.db_path);
if control.exists() {
let info = read_control(&control)?;
if process_alive(info.pid) {
info!("Watcher already running with PID {}", info.pid);
return Ok(());
} else {
std::fs::remove_file(&control).ok();
}
}
let port = choose_port(&cfg.db_path);
let exe = std::env::current_exe()?;
let child = std::process::Command::new(exe)
.arg("watch")
.arg("daemon")
.arg("--path")
.arg(path)
.arg("--debounce-ms")
.arg(debounce_ms.to_string())
.arg("--port")
.arg(port.to_string())
.arg("--control")
.arg(&control)
.spawn()?;
info!("Started watcher daemon with PID {}", child.id());
Ok(())
}
WatchCmd::Daemon {
path,
debounce_ms,
port,
control,
} => {
let mut marlin = libmarlin::Marlin::open_default()?; let mut marlin = libmarlin::Marlin::open_default()?;
let config = WatcherConfig { let config = WatcherConfig {
debounce_ms: *debounce_ms, debounce_ms: *debounce_ms,
..Default::default() ..Default::default()
}; };
let canon_path = path.canonicalize().unwrap_or_else(|_| path.clone()); let canon_path = path.canonicalize().unwrap_or_else(|_| path.clone());
info!("Starting watcher for directory: {}", canon_path.display()); let watcher = Arc::new(Mutex::new(marlin.watch(&canon_path, Some(config))?));
let mut watcher = marlin.watch(&canon_path, Some(config))?;
let status = watcher.status()?;
info!("Watcher started. Press Ctrl+C to stop watching.");
info!("Watching {} paths", status.watched_paths.len());
let start_time = Instant::now();
let mut last_status_time = Instant::now();
let running = Arc::new(AtomicBool::new(true)); let running = Arc::new(AtomicBool::new(true));
let r_clone = running.clone(); let srv_running = running.clone();
let w_clone = watcher.clone();
ctrlc::set_handler(move || { let port_val = *port;
info!("Ctrl+C received. Signaling watcher to stop..."); let server = thread::spawn(move || {
r_clone.store(false, Ordering::SeqCst); let listener = TcpListener::bind(("127.0.0.1", port_val)).unwrap();
})?; for mut s in listener.incoming().flatten() {
let mut buf = String::new();
info!("Watcher run loop started. Waiting for Ctrl+C or stop signal..."); if s.read_to_string(&mut buf).is_ok() {
if buf.contains("status") {
if let Ok(st) = w_clone.lock().unwrap().status() {
let dto = status_to_dto(st);
let _ = s.write_all(serde_json::to_string(&dto).unwrap().as_bytes());
}
} else if buf.contains("stop") {
let _ = s.write_all(b"ok");
srv_running.store(false, Ordering::SeqCst);
break;
}
}
}
});
let info = ControlInfo {
pid: std::process::id(),
port: *port,
};
std::fs::write(control, serde_json::to_string(&info)?)?;
while running.load(Ordering::SeqCst) { while running.load(Ordering::SeqCst) {
let current_status = watcher.status()?;
if current_status.state == WatcherState::Stopped {
info!("Watcher has stopped (detected by state). Exiting loop.");
break;
}
// Corrected line: removed the extra closing parenthesis
if last_status_time.elapsed() > Duration::from_secs(10) {
let uptime = start_time.elapsed();
info!(
"Watcher running for {}s, processed {} events, queue: {}, state: {:?}",
uptime.as_secs(),
current_status.events_processed,
current_status.queue_size,
current_status.state
);
last_status_time = Instant::now();
}
thread::sleep(Duration::from_millis(200)); thread::sleep(Duration::from_millis(200));
} }
watcher.lock().unwrap().stop()?;
info!("Watcher run loop ended. Explicitly stopping watcher instance..."); server.join().ok();
watcher.stop()?; std::fs::remove_file(control).ok();
{ {
let mut guard = LAST_WATCHER_STATE.lock().unwrap(); let mut guard = LAST_WATCHER_STATE.lock().unwrap();
*guard = Some(watcher.status()?.state); *guard = Some(WatcherState::Stopped);
} }
info!("Watcher instance fully stopped.");
Ok(()) Ok(())
} }
WatchCmd::Status => { WatchCmd::Status => {
info!( let cfg = Config::load()?;
"Status command: No active watcher process to query in this CLI invocation model." let control = control_path(&cfg.db_path);
); if !control.exists() {
info!("To see live status, run 'marlin watch start' which prints periodic updates."); info!("Status command: No active watcher process to query …");
return Ok(());
}
let info = read_control(&control)?;
let resp = send_request(info.port, "status");
match resp {
Ok(txt) => {
if fmt == super::Format::Json {
println!("{txt}");
} else {
let dto: StatusDto = serde_json::from_str(&txt)?;
println!(
"state: {} processed:{} queue:{} uptime:{}s",
dto.state, dto.events_processed, dto.queue_size, dto.uptime_secs
);
}
}
Err(_) => {
info!("Failed to query watcher status");
}
}
Ok(()) Ok(())
} }
WatchCmd::Stop => { WatchCmd::Stop => {
info!("Stop command: No active watcher process to stop in this CLI invocation model."); let cfg = Config::load()?;
info!("Please use Ctrl+C in the terminal where 'marlin watch start' is running."); let control = control_path(&cfg.db_path);
if !control.exists() {
info!("Stop command: No active watcher process to stop …");
return Ok(());
}
let info = read_control(&control)?;
let _ = send_request(info.port, "stop");
let start = Instant::now();
while start.elapsed() < Duration::from_secs(5) {
if !process_alive(info.pid) {
break;
}
thread::sleep(Duration::from_millis(200));
}
if process_alive(info.pid) {
#[cfg(unix)]
{
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
let _ = kill(Pid::from_raw(info.pid as i32), Signal::SIGTERM);
}
}
std::fs::remove_file(control).ok();
Ok(()) Ok(())
} }
} }

View File

@@ -2,8 +2,6 @@ use std::thread;
use std::time::Duration; use std::time::Duration;
use tempfile::tempdir; use tempfile::tempdir;
use libc;
use libmarlin::watcher::WatcherState;
use libmarlin::{self as marlin, db}; use libmarlin::{self as marlin, db};
use marlin_cli::cli::watch::WatchCmd; use marlin_cli::cli::watch::WatchCmd;
use marlin_cli::cli::{watch, Format}; use marlin_cli::cli::{watch, Format};
@@ -11,12 +9,10 @@ use marlin_cli::cli::{watch, Format};
#[cfg(unix)] #[cfg(unix)]
#[test] #[test]
fn watch_start_and_stop_quickly() { fn watch_start_and_stop_quickly() {
// TODO: Use a Windows console control handler and enable this test on Windows.
let tmp = tempdir().unwrap(); let tmp = tempdir().unwrap();
let db_path = tmp.path().join("index.db"); let db_path = tmp.path().join("index.db");
std::env::set_var("MARLIN_DB_PATH", &db_path); std::env::set_var("MARLIN_DB_PATH", &db_path);
// create database
let _m = marlin::Marlin::open_default().unwrap(); let _m = marlin::Marlin::open_default().unwrap();
let mut conn = db::open(&db_path).unwrap(); let mut conn = db::open(&db_path).unwrap();
@@ -27,14 +23,13 @@ fn watch_start_and_stop_quickly() {
debounce_ms: 50, debounce_ms: 50,
}; };
// send SIGINT shortly after watcher starts
let t = thread::spawn(|| {
thread::sleep(Duration::from_millis(200));
unsafe { libc::raise(libc::SIGINT) };
});
watch::run(&cmd, &mut conn, Format::Text).unwrap(); watch::run(&cmd, &mut conn, Format::Text).unwrap();
t.join().unwrap(); thread::sleep(Duration::from_millis(500));
assert_eq!(watch::last_watcher_state(), Some(WatcherState::Stopped)); watch::run(&WatchCmd::Status, &mut conn, Format::Text).unwrap();
watch::run(&WatchCmd::Stop, &mut conn, Format::Text).unwrap();
let cfg = libmarlin::config::Config::load().unwrap();
let control = cfg.db_path.with_extension("watch.json");
assert!(!control.exists());
} }