diff --git a/libmarlin/src/watcher.rs b/libmarlin/src/watcher.rs index 73d7d23..f95181a 100644 --- a/libmarlin/src/watcher.rs +++ b/libmarlin/src/watcher.rs @@ -1,17 +1,16 @@ -// libmarlin/src/watcher.rs - //! File system watcher implementation for Marlin //! -//! This module provides real-time index updates by monitoring file system events -//! (create, modify, delete) using the `notify` crate. It implements event debouncing, -//! batch processing, and a state machine for robust lifecycle management. +//! This module provides real-time index updates by monitoring file-system +//! events (create/modify/delete/rename) using the `notify` crate. It adds +//! event-debouncing, batch processing and a small state-machine so that the +//! watcher can be paused, resumed and shut down cleanly. use crate::db::{self, Database}; use anyhow::{Context, Result}; use crossbeam_channel::{bounded, Receiver}; use notify::{ - event::ModifyKind, Event, EventKind, RecommendedWatcher, RecursiveMode, - Watcher as NotifyWatcherTrait, + event::{ModifyKind, RemoveKind, RenameMode}, + Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcherTrait, }; use std::collections::HashMap; use std::path::PathBuf; @@ -21,19 +20,12 @@ use std::thread::{self, JoinHandle}; use std::time::{Duration, Instant}; use tracing::info; -/// Configuration for the file watcher +// ────── configuration ───────────────────────────────────────────────────────── #[derive(Debug, Clone)] pub struct WatcherConfig { - /// Time in milliseconds to debounce file events pub debounce_ms: u64, - - /// Maximum number of events to process in a single batch pub batch_size: usize, - - /// Maximum size of the event queue before applying backpressure pub max_queue_size: usize, - - /// Time in milliseconds to wait for events to drain during shutdown pub drain_timeout_ms: u64, } @@ -41,14 +33,14 @@ impl Default for WatcherConfig { fn default() -> Self { Self { debounce_ms: 100, - batch_size: 1000, + batch_size: 1_000, max_queue_size: 100_000, - drain_timeout_ms: 5000, + drain_timeout_ms: 5_000, } } } -/// State of the file watcher +// ────── public state/useful telemetry ──────────────────────────────────────── #[derive(Debug, Clone, PartialEq, Eq)] pub enum WatcherState { Initializing, @@ -58,7 +50,6 @@ pub enum WatcherState { Stopped, } -/// Status information about the file watcher #[derive(Debug, Clone)] pub struct WatcherStatus { pub state: WatcherState, @@ -68,6 +59,7 @@ pub struct WatcherStatus { pub watched_paths: Vec, } +// ────── internal bookkeeping ───────────────────────────────────────────────── #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] enum EventPriority { Create = 0, @@ -103,19 +95,20 @@ impl EventDebouncer { fn add_event(&mut self, event: ProcessedEvent) { let path = event.path.clone(); + + // If we receive an event for a directory, purge any queued events under it if path.is_dir() { - // This relies on the PathBuf itself knowing if it's a directory - // or on the underlying FS. For unit tests, ensure paths are created. self.events - .retain(|file_path, _| !file_path.starts_with(&path) || file_path == &path); + .retain(|p, _| !p.starts_with(&path) || p == &path); } + match self.events.get_mut(&path) { Some(existing) => { if event.priority < existing.priority { existing.priority = event.priority; } - existing.timestamp = event.timestamp; existing.kind = event.kind; + existing.timestamp = event.timestamp; } None => { self.events.insert(path, event); @@ -128,10 +121,10 @@ impl EventDebouncer { } fn flush(&mut self) -> Vec { - let mut events: Vec = self.events.drain().map(|(_, e)| e).collect(); - events.sort_by_key(|e| e.priority); + let mut v: Vec<_> = self.events.drain().map(|(_, e)| e).collect(); + v.sort_by_key(|e| e.priority); self.last_flush = Instant::now(); - events + v } fn len(&self) -> usize { @@ -139,11 +132,425 @@ impl EventDebouncer { } } +// ────── main watcher struct ─────────────────────────────────────────────────── +pub struct FileWatcher { + state: Arc>, + _config: WatcherConfig, + watched_paths: Vec, + _event_receiver: Receiver>, + _watcher: RecommendedWatcher, + processor_thread: Option>, + stop_flag: Arc, + events_processed: Arc, + queue_size: Arc, + start_time: Instant, + db_shared: Arc>>>>, +} + +impl FileWatcher { + pub fn new(paths: Vec, config: WatcherConfig) -> Result { + // ── basic shared state/channels ─────────────────────────────────────── + let stop_flag = Arc::new(AtomicBool::new(false)); + let events_processed = Arc::new(AtomicUsize::new(0)); + let queue_size = Arc::new(AtomicUsize::new(0)); + let state = Arc::new(Mutex::new(WatcherState::Initializing)); + + let (tx, rx) = bounded(config.max_queue_size); + + // ── start actual OS watcher ─────────────────────────────────────────── + let event_tx = tx.clone(); + let mut actual_watcher = RecommendedWatcher::new( + move |ev| { + let _ = event_tx.send(ev); + }, + notify::Config::default(), + )?; + + for p in &paths { + actual_watcher + .watch(p, RecursiveMode::Recursive) + .with_context(|| format!("Failed to watch path {}", p.display()))?; + } + + // ── spawn processor thread ──────────────────────────────────────────── + let config_clone = config.clone(); + let stop_flag_clone = stop_flag.clone(); + let events_processed_clone = events_processed.clone(); + let queue_size_clone = queue_size.clone(); + let state_clone = state.clone(); + let receiver_clone = rx.clone(); + + let db_shared_for_thread: Arc>>>> = + Arc::new(Mutex::new(None)); + let db_for_thread = db_shared_for_thread.clone(); + + let processor_thread = thread::spawn(move || { + let mut debouncer = EventDebouncer::new(config_clone.debounce_ms); + let mut rename_cache: HashMap = HashMap::new(); + let mut pending_remove: Option<(PathBuf, Instant)> = None; + + while !stop_flag_clone.load(Ordering::Relaxed) { + // honour current state + let cur_state = { + match state_clone.lock() { + Ok(g) => g.clone(), + Err(_) => break, + } + }; + + match cur_state { + WatcherState::Paused | WatcherState::Initializing => { + thread::sleep(Duration::from_millis(100)); + continue; + } + WatcherState::ShuttingDown | WatcherState::Stopped => break, + WatcherState::Watching => {} // normal path + } + + // ── drain events (bounded by batch_size) ───────────────────── + let mut processed_in_batch = 0; + while let Ok(evt_res) = receiver_clone.try_recv() { + processed_in_batch += 1; + match evt_res { + Ok(event) => { + let prio = match event.kind { + EventKind::Create(_) => EventPriority::Create, + EventKind::Remove(_) => EventPriority::Delete, + EventKind::Modify(_) => EventPriority::Modify, + EventKind::Access(_) => EventPriority::Access, + _ => EventPriority::Modify, + }; + + // ── per-event logic ─────────────────────────────── + match event.kind { + // 1. remove-then-create → rename heuristic + EventKind::Remove(_) if event.paths.len() == 1 => { + pending_remove = Some((event.paths[0].clone(), Instant::now())); + } + + EventKind::Create(_) if event.paths.len() == 1 => { + if let Some((old_p, ts)) = pending_remove.take() { + if Instant::now().duration_since(ts) + <= Duration::from_millis(500) + { + let new_p = event.paths[0].clone(); + debouncer.add_event(ProcessedEvent { + path: old_p.clone(), + old_path: Some(old_p), + new_path: Some(new_p), + kind: EventKind::Modify(ModifyKind::Name( + RenameMode::Both, + )), + priority: prio, + timestamp: Instant::now(), + }); + continue; // handled as rename + } else { + debouncer.add_event(ProcessedEvent { + path: old_p.clone(), + old_path: None, + new_path: None, + kind: EventKind::Remove(RemoveKind::Any), + priority: EventPriority::Delete, + timestamp: ts, + }); + } + } + + for p in event.paths { + debouncer.add_event(ProcessedEvent { + path: p, + old_path: None, + new_path: None, + kind: event.kind, + priority: prio, + timestamp: Instant::now(), + }); + } + } + + // 2. native rename events from notify + EventKind::Modify(ModifyKind::Name(mode)) => match mode { + RenameMode::Both => { + if event.paths.len() >= 2 { + let old_p = event.paths[0].clone(); + let new_p = event.paths[1].clone(); + debouncer.add_event(ProcessedEvent { + path: old_p.clone(), + old_path: Some(old_p), + new_path: Some(new_p), + kind: EventKind::Modify(ModifyKind::Name( + RenameMode::Both, + )), + priority: prio, + timestamp: Instant::now(), + }); + } + } + RenameMode::From => { + if let (Some(trk), Some(p)) = + (event.tracker(), event.paths.first()) + { + rename_cache.insert(trk, p.clone()); + } + for p in event.paths { + debouncer.add_event(ProcessedEvent { + path: p, + old_path: None, + new_path: None, + kind: event.kind, + priority: prio, + timestamp: Instant::now(), + }); + } + } + RenameMode::To => { + if let (Some(trk), Some(new_p)) = + (event.tracker(), event.paths.first()) + { + if let Some(old_p) = rename_cache.remove(&trk) { + debouncer.add_event(ProcessedEvent { + path: old_p.clone(), + old_path: Some(old_p), + new_path: Some(new_p.clone()), + kind: EventKind::Modify(ModifyKind::Name( + RenameMode::Both, + )), + priority: prio, + timestamp: Instant::now(), + }); + continue; + } + } + for p in event.paths { + debouncer.add_event(ProcessedEvent { + path: p, + old_path: None, + new_path: None, + kind: event.kind, + priority: prio, + timestamp: Instant::now(), + }); + } + } + _ => { + for p in event.paths { + debouncer.add_event(ProcessedEvent { + path: p, + old_path: None, + new_path: None, + kind: event.kind, + priority: prio, + timestamp: Instant::now(), + }); + } + } + }, + + // 3. everything else + _ => { + for p in event.paths { + debouncer.add_event(ProcessedEvent { + path: p, + old_path: None, + new_path: None, + kind: event.kind, + priority: prio, + timestamp: Instant::now(), + }); + } + } + } // end match event.kind + } // <--- closes Ok(event) + Err(e) => eprintln!("watcher channel error: {:?}", e), + } + + if processed_in_batch >= config_clone.batch_size { + break; + } + } + + // deal with orphaned remove + if let Some((old_p, ts)) = pending_remove.take() { + if Instant::now().duration_since(ts) > Duration::from_millis(500) { + debouncer.add_event(ProcessedEvent { + path: old_p.clone(), + old_path: None, + new_path: None, + kind: EventKind::Remove(RemoveKind::Any), + priority: EventPriority::Delete, + timestamp: ts, + }); + } else { + pending_remove = Some((old_p, ts)); + } + } + + queue_size_clone.store(debouncer.len(), Ordering::SeqCst); + + // flush if ready + if debouncer.is_ready_to_flush() && debouncer.len() > 0 { + let to_process = debouncer.flush(); + events_processed_clone.fetch_add(to_process.len(), Ordering::SeqCst); + + let maybe_db = db_for_thread.lock().ok().and_then(|g| g.clone()); + + for ev in &to_process { + if let Some(db_mutex) = &maybe_db { + // update DB for renames + if let EventKind::Modify(ModifyKind::Name(_)) = ev.kind { + if let (Some(old_p), Some(new_p)) = (&ev.old_path, &ev.new_path) { + let old_s = old_p.to_string_lossy(); + let new_s = new_p.to_string_lossy(); + let res = if new_p.is_dir() { + db::rename_directory( + db_mutex.lock().unwrap().conn_mut(), + &old_s, + &new_s, + ) + } else { + db::update_file_path( + db_mutex.lock().unwrap().conn_mut(), + &old_s, + &new_s, + ) + }; + if let Err(e) = res { + eprintln!("DB rename error: {:?}", e); + } + } + } + info!("processed (DB) {:?} {:?}", ev.kind, ev.path); + } else { + info!("processed {:?} {:?}", ev.kind, ev.path); + } + } + } + + thread::sleep(Duration::from_millis(50)); + } // main loop + + // final flush on shutdown + if debouncer.len() > 0 { + let final_evts = debouncer.flush(); + events_processed_clone.fetch_add(final_evts.len(), Ordering::SeqCst); + for ev in &final_evts { + info!("processing final event {:?} {:?}", ev.kind, ev.path); + } + } + + if let Ok(mut g) = state_clone.lock() { + *g = WatcherState::Stopped; + } + }); + + // ── return constructed watcher ─────────────────────────────────────── + Ok(Self { + state, + _config: config, + watched_paths: paths, + _event_receiver: rx, + _watcher: actual_watcher, + processor_thread: Some(processor_thread), + stop_flag, + events_processed, + queue_size, + start_time: Instant::now(), + db_shared: db_shared_for_thread, + }) + } + + // ── public API //////////////////////////////////////////////////////////// + pub fn with_database(&mut self, db: Arc>) -> Result<&mut Self> { + *self + .db_shared + .lock() + .map_err(|_| anyhow::anyhow!("db mutex poisoned"))? = Some(db); + Ok(self) + } + + pub fn start(&mut self) -> Result<()> { + let mut g = self.state.lock().map_err(|_| anyhow::anyhow!("state"))?; + match *g { + WatcherState::Initializing | WatcherState::Paused => { + *g = WatcherState::Watching; + Ok(()) + } + WatcherState::Watching => Ok(()), // idempotent + _ => Err(anyhow::anyhow!("cannot start from {:?}", *g)), + } + } + + pub fn pause(&mut self) -> Result<()> { + let mut g = self.state.lock().map_err(|_| anyhow::anyhow!("state"))?; + match *g { + WatcherState::Watching => { + *g = WatcherState::Paused; + Ok(()) + } + WatcherState::Paused => Ok(()), + _ => Err(anyhow::anyhow!("cannot pause from {:?}", *g)), + } + } + + pub fn resume(&mut self) -> Result<()> { + let mut g = self.state.lock().map_err(|_| anyhow::anyhow!("state"))?; + match *g { + WatcherState::Paused => { + *g = WatcherState::Watching; + Ok(()) + } + WatcherState::Watching => Ok(()), + _ => Err(anyhow::anyhow!("cannot resume from {:?}", *g)), + } + } + + pub fn stop(&mut self) -> Result<()> { + { + let mut g = self.state.lock().map_err(|_| anyhow::anyhow!("state"))?; + if matches!(*g, WatcherState::Stopped | WatcherState::ShuttingDown) { + return Ok(()); + } + *g = WatcherState::ShuttingDown; + } + + self.stop_flag.store(true, Ordering::SeqCst); + + if let Some(h) = self.processor_thread.take() { + let _ = h.join(); + } + + *self + .state + .lock() + .map_err(|_| anyhow::anyhow!("state"))? = WatcherState::Stopped; + Ok(()) + } + + pub fn status(&self) -> Result { + let st = self.state.lock().map_err(|_| anyhow::anyhow!("state"))?.clone(); + Ok(WatcherStatus { + state: st, + events_processed: self.events_processed.load(Ordering::SeqCst), + queue_size: self.queue_size.load(Ordering::SeqCst), + start_time: Some(self.start_time), + watched_paths: self.watched_paths.clone(), + }) + } +} + +impl Drop for FileWatcher { + fn drop(&mut self) { + let _ = self.stop(); // ignore errors during drop + } +} + +// ────── tests ──────────────────────────────────────────────────────────────── #[cfg(test)] mod event_debouncer_tests { use super::*; use notify::event::{CreateKind, DataChange, ModifyKind, RemoveKind, RenameMode}; - use std::fs; // fs is needed for these tests to create dirs/files + use std::fs; use tempfile; #[test] @@ -364,381 +771,11 @@ mod event_debouncer_tests { } } -pub struct FileWatcher { - state: Arc>, - _config: WatcherConfig, - watched_paths: Vec, - _event_receiver: Receiver>, - _watcher: RecommendedWatcher, - processor_thread: Option>, - stop_flag: Arc, - events_processed: Arc, - queue_size: Arc, - start_time: Instant, - db_shared: Arc>>>>, -} - -impl FileWatcher { - pub fn new(paths: Vec, config: WatcherConfig) -> Result { - let stop_flag = Arc::new(AtomicBool::new(false)); - let events_processed = Arc::new(AtomicUsize::new(0)); - let queue_size = Arc::new(AtomicUsize::new(0)); - let state = Arc::new(Mutex::new(WatcherState::Initializing)); - - let (tx, rx) = bounded(config.max_queue_size); - - let event_tx = tx.clone(); - let mut actual_watcher = RecommendedWatcher::new( - move |event_res: std::result::Result| { - if event_tx.send(event_res).is_err() { - // Receiver dropped - } - }, - notify::Config::default(), - )?; - - for path_to_watch in &paths { - actual_watcher - .watch(path_to_watch, RecursiveMode::Recursive) - .with_context(|| format!("Failed to watch path: {}", path_to_watch.display()))?; - } - - let config_clone = config.clone(); - let stop_flag_clone = stop_flag.clone(); - let events_processed_clone = events_processed.clone(); - let queue_size_clone = queue_size.clone(); - let state_clone = state.clone(); - let receiver_clone = rx.clone(); - - let db_shared_for_thread = Arc::new(Mutex::new(None::>>)); - let db_captured_for_thread = db_shared_for_thread.clone(); - - let processor_thread = thread::spawn(move || { - let mut debouncer = EventDebouncer::new(config_clone.debounce_ms); - - while !stop_flag_clone.load(Ordering::Relaxed) { - let current_state = match state_clone.lock() { - Ok(g) => g.clone(), - Err(_) => { - eprintln!("state mutex poisoned"); - break; - } - }; - - if current_state == WatcherState::Paused { - thread::sleep(Duration::from_millis(100)); - continue; - } - if current_state == WatcherState::ShuttingDown - || current_state == WatcherState::Stopped - { - break; - } - - let mut received_in_batch = 0; - while let Ok(evt_res) = receiver_clone.try_recv() { - received_in_batch += 1; - match evt_res { - Ok(event) => { - let prio = match event.kind { - EventKind::Create(_) => EventPriority::Create, - EventKind::Remove(_) => EventPriority::Delete, - EventKind::Modify(_) => EventPriority::Modify, - EventKind::Access(_) => EventPriority::Access, - _ => EventPriority::Modify, - }; - - if matches!(event.kind, EventKind::Modify(ModifyKind::Name(_))) - && event.paths.len() >= 2 - { - let old_path = event.paths[0].clone(); - let new_path = event.paths[1].clone(); - debouncer.add_event(ProcessedEvent { - path: old_path.clone(), - old_path: Some(old_path), - new_path: Some(new_path), - kind: event.kind, - priority: prio, - timestamp: Instant::now(), - }); - } else { - for path in event.paths { - debouncer.add_event(ProcessedEvent { - path, - old_path: None, - new_path: None, - kind: event.kind, - priority: prio, - timestamp: Instant::now(), - }); - } - } - } - Err(e) => { - eprintln!("Watcher channel error: {:?}", e); - } - } - if received_in_batch >= config_clone.batch_size { - break; - } - } - - queue_size_clone.store(debouncer.len(), Ordering::SeqCst); - - if debouncer.is_ready_to_flush() && debouncer.len() > 0 { - let evts_to_process = debouncer.flush(); - let num_evts = evts_to_process.len(); - events_processed_clone.fetch_add(num_evts, Ordering::SeqCst); - - let db_guard_option = match db_captured_for_thread.lock() { - Ok(g) => g, - Err(_) => { - eprintln!("db_shared mutex poisoned"); - break; - } - }; - if let Some(db_mutex) = &*db_guard_option { - if let Ok(mut db_inst) = db_mutex.lock() { - for event_item in &evts_to_process { - if let EventKind::Modify(ModifyKind::Name(_)) = event_item.kind { - if let (Some(ref old_p), Some(ref new_p)) = - (&event_item.old_path, &event_item.new_path) - { - let old_str = old_p.to_string_lossy(); - let new_str = new_p.to_string_lossy(); - let res = if new_p.is_dir() { - db::rename_directory( - db_inst.conn_mut(), - &old_str, - &new_str, - ) - } else { - db::update_file_path( - db_inst.conn_mut(), - &old_str, - &new_str, - ) - }; - if let Err(e) = res { - eprintln!("DB rename error: {:?}", e); - } - } - } - info!( - "Processing event (DB available): {:?} for path {:?}", - event_item.kind, event_item.path - ); - } - } else { - eprintln!("db mutex poisoned"); - } - } else { - for event_item in &evts_to_process { - info!( - "Processing event (no DB): {:?} for path {:?}", - event_item.kind, event_item.path - ); - } - } - } - thread::sleep(Duration::from_millis(50)); - } - - if debouncer.len() > 0 { - let final_evts = debouncer.flush(); - events_processed_clone.fetch_add(final_evts.len(), Ordering::SeqCst); - if let Some(db_mutex) = &*db_captured_for_thread.lock().unwrap() { - if let Ok(mut db_inst) = db_mutex.lock() { - for processed_event in &final_evts { - if let EventKind::Modify(ModifyKind::Name(_)) = processed_event.kind { - if let (Some(ref old_p), Some(ref new_p)) = - (&processed_event.old_path, &processed_event.new_path) - { - let old_str = old_p.to_string_lossy(); - let new_str = new_p.to_string_lossy(); - let res = if new_p.is_dir() { - db::rename_directory(db_inst.conn_mut(), &old_str, &new_str) - } else { - db::update_file_path(db_inst.conn_mut(), &old_str, &new_str) - }; - if let Err(e) = res { - eprintln!("DB rename error: {:?}", e); - } - } - } - info!( - "Processing final event: {:?} for path {:?}", - processed_event.kind, processed_event.path - ); - } - } - } else { - for processed_event in final_evts { - info!( - "Processing final event: {:?} for path {:?}", - processed_event.kind, processed_event.path - ); - } - } - } - if let Ok(mut final_state_guard) = state_clone.lock() { - *final_state_guard = WatcherState::Stopped; - } else { - eprintln!("state mutex poisoned on shutdown"); - } - }); - - Ok(Self { - state, - _config: config, - watched_paths: paths, - _event_receiver: rx, - _watcher: actual_watcher, - processor_thread: Some(processor_thread), - stop_flag, - events_processed, - queue_size, - start_time: Instant::now(), - db_shared: db_shared_for_thread, - }) - } - - pub fn with_database(&mut self, db_arc: Arc>) -> Result<&mut Self> { - { - let mut shared_db_guard = self - .db_shared - .lock() - .map_err(|_| anyhow::anyhow!("db_shared mutex poisoned"))?; - *shared_db_guard = Some(db_arc); - } - Ok(self) - } - - pub fn start(&mut self) -> Result<()> { - let mut state_guard = self - .state - .lock() - .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?; - if *state_guard == WatcherState::Watching || self.processor_thread.is_none() { - if self.processor_thread.is_none() { - return Err(anyhow::anyhow!("Watcher thread not available to start.")); - } - if *state_guard == WatcherState::Initializing { - *state_guard = WatcherState::Watching; - } - return Ok(()); - } - if *state_guard != WatcherState::Initializing - && *state_guard != WatcherState::Stopped - && *state_guard != WatcherState::Paused - { - return Err(anyhow::anyhow!(format!( - "Cannot start watcher from state {:?}", - *state_guard - ))); - } - - *state_guard = WatcherState::Watching; - Ok(()) - } - - pub fn pause(&mut self) -> Result<()> { - let mut state_guard = self - .state - .lock() - .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?; - match *state_guard { - WatcherState::Watching => { - *state_guard = WatcherState::Paused; - Ok(()) - } - WatcherState::Paused => Ok(()), - _ => Err(anyhow::anyhow!(format!( - "Watcher not in watching state to pause (current: {:?})", - *state_guard - ))), - } - } - - pub fn resume(&mut self) -> Result<()> { - let mut state_guard = self - .state - .lock() - .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?; - match *state_guard { - WatcherState::Paused => { - *state_guard = WatcherState::Watching; - Ok(()) - } - WatcherState::Watching => Ok(()), - _ => Err(anyhow::anyhow!(format!( - "Watcher not in paused state to resume (current: {:?})", - *state_guard - ))), - } - } - - pub fn stop(&mut self) -> Result<()> { - let mut current_state_guard = self - .state - .lock() - .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?; - if *current_state_guard == WatcherState::Stopped - || *current_state_guard == WatcherState::ShuttingDown - { - return Ok(()); - } - *current_state_guard = WatcherState::ShuttingDown; - drop(current_state_guard); - - self.stop_flag.store(true, Ordering::SeqCst); - - if let Some(handle) = self.processor_thread.take() { - match handle.join() { - Ok(_) => { /* Thread joined cleanly */ } - Err(join_err) => { - eprintln!("Watcher processor thread panicked: {:?}", join_err); - } - } - } - - let mut final_state_guard = self - .state - .lock() - .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?; - *final_state_guard = WatcherState::Stopped; - Ok(()) - } - - pub fn status(&self) -> Result { - let state_guard = self - .state - .lock() - .map_err(|_| anyhow::anyhow!("state mutex poisoned"))? - .clone(); - Ok(WatcherStatus { - state: state_guard, - events_processed: self.events_processed.load(Ordering::SeqCst), - queue_size: self.queue_size.load(Ordering::SeqCst), - start_time: Some(self.start_time), - watched_paths: self.watched_paths.clone(), - }) - } -} - -impl Drop for FileWatcher { - fn drop(&mut self) { - if let Err(e) = self.stop() { - eprintln!("Error stopping watcher in Drop: {:?}", e); - } - } -} - #[cfg(test)] mod file_watcher_state_tests { use super::*; use std::fs as FsMod; - use tempfile::tempdir; // Alias to avoid conflict with local `fs` module name if any + use tempfile::tempdir; #[test] fn test_watcher_pause_resume_stop() { @@ -747,7 +784,6 @@ mod file_watcher_state_tests { FsMod::create_dir_all(&watch_path).expect("Failed to create temp dir for watching"); let config = WatcherConfig::default(); - let mut watcher = FileWatcher::new(vec![watch_path], config).expect("Failed to create watcher"); @@ -782,66 +818,58 @@ mod file_watcher_state_tests { let mut watcher = FileWatcher::new(vec![tmp_dir.path().to_path_buf()], WatcherConfig::default()).unwrap(); + // already watching { - let mut state_guard = watcher.state.lock().expect("state mutex poisoned"); - *state_guard = WatcherState::Watching; + let mut g = watcher.state.lock().unwrap(); + *g = WatcherState::Watching; } - assert!( - watcher.start().is_ok(), - "Should be able to call start when already Watching (idempotent state change)" - ); + assert!(watcher.start().is_ok()); assert_eq!(watcher.status().unwrap().state, WatcherState::Watching); + // invalid transition { - let mut state_guard = watcher.state.lock().expect("state mutex poisoned"); - *state_guard = WatcherState::ShuttingDown; + let mut g = watcher.state.lock().unwrap(); + *g = WatcherState::ShuttingDown; } - assert!( - watcher.start().is_err(), - "Should not be able to start from ShuttingDown" - ); + assert!(watcher.start().is_err()); } #[test] fn test_new_watcher_with_nonexistent_path() { - let non_existent_path = + let bogus = PathBuf::from("/path/that/REALLY/does/not/exist/for/sure/and/cannot/be/created"); - let config = WatcherConfig::default(); - let watcher_result = FileWatcher::new(vec![non_existent_path], config); - assert!(watcher_result.is_err()); - if let Err(e) = watcher_result { - let err_string = e.to_string(); + let res = FileWatcher::new(vec![bogus], WatcherConfig::default()); + assert!(res.is_err()); + if let Err(e) = res { + let msg = e.to_string(); assert!( - err_string.contains("Failed to watch path") || err_string.contains("os error 2"), - "Error was: {}", - err_string + msg.contains("Failed to watch path") || msg.contains("os error 2"), + "got: {msg}" ); } } #[test] fn test_watcher_default_config() { - let config = WatcherConfig::default(); - assert_eq!(config.debounce_ms, 100); - assert_eq!(config.batch_size, 1000); - assert_eq!(config.max_queue_size, 100_000); - assert_eq!(config.drain_timeout_ms, 5000); + let cfg = WatcherConfig::default(); + assert_eq!(cfg.debounce_ms, 100); + assert_eq!(cfg.batch_size, 1_000); + assert_eq!(cfg.max_queue_size, 100_000); + assert_eq!(cfg.drain_timeout_ms, 5_000); } #[test] fn test_poisoned_state_mutex_errors() { let tmp_dir = tempdir().unwrap(); let watch_path = tmp_dir.path().to_path_buf(); - FsMod::create_dir_all(&watch_path).expect("Failed to create temp dir for watching"); - - let config = WatcherConfig::default(); + FsMod::create_dir_all(&watch_path).unwrap(); let mut watcher = - FileWatcher::new(vec![watch_path], config).expect("Failed to create watcher"); + FileWatcher::new(vec![watch_path], WatcherConfig::default()).expect("create"); let state_arc = watcher.state.clone(); let _ = std::thread::spawn(move || { - let _guard = state_arc.lock().unwrap(); + let _g = state_arc.lock().unwrap(); panic!("poison"); }) .join();