From 3fb6504cb04064dea3cc50ceedc225d91b48f149 Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Wed, 21 May 2025 09:23:14 -0400 Subject: [PATCH] Refactor watcher mutex handling and add tests --- cli-bin/src/cli/watch.rs | 8 +- .../tests/integration/watcher/watcher_test.rs | 8 +- libmarlin/src/lib.rs | 2 +- libmarlin/src/watcher.rs | 144 +++++++++++++----- libmarlin/src/watcher_tests.rs | 6 +- 5 files changed, 119 insertions(+), 49 deletions(-) diff --git a/cli-bin/src/cli/watch.rs b/cli-bin/src/cli/watch.rs index 4c3cc9f..a1e8314 100644 --- a/cli-bin/src/cli/watch.rs +++ b/cli-bin/src/cli/watch.rs @@ -56,8 +56,8 @@ pub fn run(cmd: &WatchCmd, _conn: &mut Connection, _format: super::Format) -> Re info!("Starting watcher for directory: {}", canon_path.display()); let mut watcher = marlin.watch(&canon_path, Some(config))?; - - let status = watcher.status(); + + let status = watcher.status()?; info!("Watcher started. Press Ctrl+C to stop watching."); info!("Watching {} paths", status.watched_paths.len()); @@ -73,7 +73,7 @@ pub fn run(cmd: &WatchCmd, _conn: &mut Connection, _format: super::Format) -> Re info!("Watcher run loop started. Waiting for Ctrl+C or stop signal..."); while running.load(Ordering::SeqCst) { - let current_status = watcher.status(); + let current_status = watcher.status()?; if current_status.state == WatcherState::Stopped { info!("Watcher has stopped (detected by state). Exiting loop."); break; @@ -98,7 +98,7 @@ pub fn run(cmd: &WatchCmd, _conn: &mut Connection, _format: super::Format) -> Re watcher.stop()?; { let mut guard = LAST_WATCHER_STATE.lock().unwrap(); - *guard = Some(watcher.status().state); + *guard = Some(watcher.status()?.state); } info!("Watcher instance fully stopped."); Ok(()) diff --git a/cli-bin/tests/integration/watcher/watcher_test.rs b/cli-bin/tests/integration/watcher/watcher_test.rs index 35af06a..1092669 100644 --- a/cli-bin/tests/integration/watcher/watcher_test.rs +++ b/cli-bin/tests/integration/watcher/watcher_test.rs @@ -129,7 +129,7 @@ fn test_basic_watch_functionality() { let finished_watcher = watcher_thread.join().expect("Watcher thread panicked"); // Check status after processing events - let status = finished_watcher.status(); + let status = finished_watcher.status().unwrap(); // Assertions assert_eq!(status.state, WatcherState::Stopped); @@ -190,7 +190,7 @@ fn test_debouncing() { // Complete the test let finished_watcher = watcher_thread.join().expect("Watcher thread panicked"); - let status = finished_watcher.status(); + let status = finished_watcher.status().unwrap(); // We should have processed fewer events than modifications made // due to debouncing (exact count depends on implementation details) @@ -248,7 +248,7 @@ fn test_event_flood() { // Complete the test let finished_watcher = watcher_thread.join().expect("Watcher thread panicked"); - let status = finished_watcher.status(); + let status = finished_watcher.status().unwrap(); // Verify processing occurred assert!(status.events_processed > 0, "Expected events to be processed"); @@ -355,7 +355,7 @@ fn test_graceful_shutdown() { "Shutdown took too long"); // Verify final state - let status = watcher.status(); + let status = watcher.status().unwrap(); assert_eq!(status.state, WatcherState::Stopped); assert_eq!(status.queue_size, 0, "Queue should be empty after shutdown"); diff --git a/libmarlin/src/lib.rs b/libmarlin/src/lib.rs index 87ba697..89d0e0a 100644 --- a/libmarlin/src/lib.rs +++ b/libmarlin/src/lib.rs @@ -199,7 +199,7 @@ impl Marlin { let watcher_db = Arc::new(Mutex::new(db::Database::new(new_conn))); let mut owned_w = watcher::FileWatcher::new(vec![p], cfg)?; - owned_w.with_database(watcher_db); // Modifies owned_w in place + owned_w.with_database(watcher_db)?; // Modifies owned_w in place owned_w.start()?; // Start the watcher after it has been fully configured Ok(owned_w) // Return the owned FileWatcher diff --git a/libmarlin/src/watcher.rs b/libmarlin/src/watcher.rs index f5f428d..68e92a7 100644 --- a/libmarlin/src/watcher.rs +++ b/libmarlin/src/watcher.rs @@ -367,7 +367,13 @@ impl FileWatcher { let mut debouncer = EventDebouncer::new(config_clone.debounce_ms); while !stop_flag_clone.load(Ordering::Relaxed) { - let current_state = { state_clone.lock().unwrap().clone() }; + let current_state = match state_clone.lock() { + Ok(g) => g.clone(), + Err(_) => { + eprintln!("state mutex poisoned"); + break; + } + }; if current_state == WatcherState::Paused { thread::sleep(Duration::from_millis(100)); @@ -414,14 +420,23 @@ impl FileWatcher { let num_evts = evts_to_process.len(); events_processed_clone.fetch_add(num_evts, Ordering::SeqCst); - let db_guard_option = db_captured_for_thread.lock().unwrap(); + let db_guard_option = match db_captured_for_thread.lock() { + Ok(g) => g, + Err(_) => { + eprintln!("db_shared mutex poisoned"); + break; + } + }; if let Some(db_mutex) = &*db_guard_option { - let mut _db_instance_guard = db_mutex.lock().unwrap(); - for event_item in &evts_to_process { - println!( - "Processing event (DB available): {:?} for path {:?}", - event_item.kind, event_item.path - ); + if let Ok(mut _db_instance_guard) = db_mutex.lock() { + for event_item in &evts_to_process { + println!( + "Processing event (DB available): {:?} for path {:?}", + event_item.kind, event_item.path + ); + } + } else { + eprintln!("db mutex poisoned"); } } else { for event_item in &evts_to_process { @@ -445,8 +460,11 @@ impl FileWatcher { ); } } - let mut final_state_guard = state_clone.lock().unwrap(); - *final_state_guard = WatcherState::Stopped; + if let Ok(mut final_state_guard) = state_clone.lock() { + *final_state_guard = WatcherState::Stopped; + } else { + eprintln!("state mutex poisoned on shutdown"); + } }); Ok(Self { @@ -464,16 +482,22 @@ impl FileWatcher { }) } - pub fn with_database(&mut self, db_arc: Arc>) -> &mut Self { - { - let mut shared_db_guard = self.db_shared.lock().unwrap(); + pub fn with_database(&mut self, db_arc: Arc>) -> Result<&mut Self> { + { + let mut shared_db_guard = self + .db_shared + .lock() + .map_err(|_| anyhow::anyhow!("db_shared mutex poisoned"))?; *shared_db_guard = Some(db_arc); - } - self + } + Ok(self) } pub fn start(&mut self) -> Result<()> { - let mut state_guard = self.state.lock().unwrap(); + let mut state_guard = self + .state + .lock() + .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?; if *state_guard == WatcherState::Watching || self.processor_thread.is_none() { if self.processor_thread.is_none() { return Err(anyhow::anyhow!("Watcher thread not available to start.")); @@ -492,7 +516,10 @@ impl FileWatcher { } pub fn pause(&mut self) -> Result<()> { - let mut state_guard = self.state.lock().unwrap(); + let mut state_guard = self + .state + .lock() + .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?; match *state_guard { WatcherState::Watching => { *state_guard = WatcherState::Paused; @@ -504,7 +531,10 @@ impl FileWatcher { } pub fn resume(&mut self) -> Result<()> { - let mut state_guard = self.state.lock().unwrap(); + let mut state_guard = self + .state + .lock() + .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?; match *state_guard { WatcherState::Paused => { *state_guard = WatcherState::Watching; @@ -516,7 +546,10 @@ impl FileWatcher { } pub fn stop(&mut self) -> Result<()> { - let mut current_state_guard = self.state.lock().unwrap(); + let mut current_state_guard = self + .state + .lock() + .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?; if *current_state_guard == WatcherState::Stopped || *current_state_guard == WatcherState::ShuttingDown { return Ok(()); } @@ -534,20 +567,27 @@ impl FileWatcher { } } - let mut final_state_guard = self.state.lock().unwrap(); + let mut final_state_guard = self + .state + .lock() + .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?; *final_state_guard = WatcherState::Stopped; Ok(()) } - pub fn status(&self) -> WatcherStatus { - let state_guard = self.state.lock().unwrap().clone(); - WatcherStatus { + pub fn status(&self) -> Result { + let state_guard = self + .state + .lock() + .map_err(|_| anyhow::anyhow!("state mutex poisoned"))? + .clone(); + Ok(WatcherStatus { state: state_guard, events_processed: self.events_processed.load(Ordering::SeqCst), queue_size: self.queue_size.load(Ordering::SeqCst), start_time: Some(self.start_time), watched_paths: self.watched_paths.clone(), - } + }) } } @@ -576,28 +616,28 @@ mod file_watcher_state_tests { let mut watcher = FileWatcher::new(vec![watch_path], config).expect("Failed to create watcher"); - assert_eq!(watcher.status().state, WatcherState::Initializing); + assert_eq!(watcher.status().unwrap().state, WatcherState::Initializing); watcher.start().expect("Start failed"); - assert_eq!(watcher.status().state, WatcherState::Watching); + assert_eq!(watcher.status().unwrap().state, WatcherState::Watching); watcher.pause().expect("Pause failed"); - assert_eq!(watcher.status().state, WatcherState::Paused); + assert_eq!(watcher.status().unwrap().state, WatcherState::Paused); watcher.pause().expect("Second pause failed"); - assert_eq!(watcher.status().state, WatcherState::Paused); + assert_eq!(watcher.status().unwrap().state, WatcherState::Paused); watcher.resume().expect("Resume failed"); - assert_eq!(watcher.status().state, WatcherState::Watching); + assert_eq!(watcher.status().unwrap().state, WatcherState::Watching); watcher.resume().expect("Second resume failed"); - assert_eq!(watcher.status().state, WatcherState::Watching); + assert_eq!(watcher.status().unwrap().state, WatcherState::Watching); watcher.stop().expect("Stop failed"); - assert_eq!(watcher.status().state, WatcherState::Stopped); + assert_eq!(watcher.status().unwrap().state, WatcherState::Stopped); watcher.stop().expect("Second stop failed"); - assert_eq!(watcher.status().state, WatcherState::Stopped); + assert_eq!(watcher.status().unwrap().state, WatcherState::Stopped); } #[test] @@ -607,14 +647,20 @@ mod file_watcher_state_tests { let mut watcher = FileWatcher::new(vec![tmp_dir.path().to_path_buf()], WatcherConfig::default()).unwrap(); { - let mut state_guard = watcher.state.lock().unwrap(); - *state_guard = WatcherState::Watching; + let mut state_guard = watcher + .state + .lock() + .expect("state mutex poisoned"); + *state_guard = WatcherState::Watching; } assert!(watcher.start().is_ok(), "Should be able to call start when already Watching (idempotent state change)"); - assert_eq!(watcher.status().state, WatcherState::Watching); + assert_eq!(watcher.status().unwrap().state, WatcherState::Watching); { - let mut state_guard = watcher.state.lock().unwrap(); + let mut state_guard = watcher + .state + .lock() + .expect("state mutex poisoned"); *state_guard = WatcherState::ShuttingDown; } assert!(watcher.start().is_err(), "Should not be able to start from ShuttingDown"); @@ -634,10 +680,34 @@ mod file_watcher_state_tests { #[test] fn test_watcher_default_config() { - let config = WatcherConfig::default(); + let config = WatcherConfig::default(); assert_eq!(config.debounce_ms, 100); assert_eq!(config.batch_size, 1000); assert_eq!(config.max_queue_size, 100_000); assert_eq!(config.drain_timeout_ms, 5000); } + + #[test] + fn test_poisoned_state_mutex_errors() { + let tmp_dir = tempdir().unwrap(); + let watch_path = tmp_dir.path().to_path_buf(); + FsMod::create_dir_all(&watch_path).expect("Failed to create temp dir for watching"); + + let config = WatcherConfig::default(); + + let mut watcher = FileWatcher::new(vec![watch_path], config).expect("Failed to create watcher"); + + let state_arc = watcher.state.clone(); + let _ = std::thread::spawn(move || { + let _guard = state_arc.lock().unwrap(); + panic!("poison"); + }) + .join(); + + assert!(watcher.start().is_err()); + assert!(watcher.pause().is_err()); + assert!(watcher.resume().is_err()); + assert!(watcher.stop().is_err()); + assert!(watcher.status().is_err()); + } } \ No newline at end of file diff --git a/libmarlin/src/watcher_tests.rs b/libmarlin/src/watcher_tests.rs index 888c408..2d126c2 100644 --- a/libmarlin/src/watcher_tests.rs +++ b/libmarlin/src/watcher_tests.rs @@ -40,7 +40,7 @@ mod tests { .expect("Failed to create watcher"); watcher.start().expect("Failed to start watcher"); - assert_eq!(watcher.status().state, WatcherState::Watching); + assert_eq!(watcher.status().unwrap().state, WatcherState::Watching); thread::sleep(Duration::from_millis(200)); let new_file_path = temp_path.join("new_file.txt"); @@ -63,8 +63,8 @@ mod tests { thread::sleep(Duration::from_millis(500)); watcher.stop().expect("Failed to stop watcher"); - assert_eq!(watcher.status().state, WatcherState::Stopped); - assert!(watcher.status().events_processed > 0, "Expected some file events to be processed"); + assert_eq!(watcher.status().unwrap().state, WatcherState::Stopped); + assert!(watcher.status().unwrap().events_processed > 0, "Expected some file events to be processed"); } #[test]