Merge pull request #31 from PR0M3TH3AN/codex/replace-unwrap-with-safer-error-handling-in-watcher.rs

Handle mutex poisoning in watcher
This commit is contained in:
thePR0M3TH3AN
2025-05-21 09:23:26 -04:00
committed by GitHub
5 changed files with 119 additions and 49 deletions

View File

@@ -56,8 +56,8 @@ pub fn run(cmd: &WatchCmd, _conn: &mut Connection, _format: super::Format) -> Re
info!("Starting watcher for directory: {}", canon_path.display());
let mut watcher = marlin.watch(&canon_path, Some(config))?;
let status = watcher.status();
let status = watcher.status()?;
info!("Watcher started. Press Ctrl+C to stop watching.");
info!("Watching {} paths", status.watched_paths.len());
@@ -73,7 +73,7 @@ pub fn run(cmd: &WatchCmd, _conn: &mut Connection, _format: super::Format) -> Re
info!("Watcher run loop started. Waiting for Ctrl+C or stop signal...");
while running.load(Ordering::SeqCst) {
let current_status = watcher.status();
let current_status = watcher.status()?;
if current_status.state == WatcherState::Stopped {
info!("Watcher has stopped (detected by state). Exiting loop.");
break;
@@ -98,7 +98,7 @@ pub fn run(cmd: &WatchCmd, _conn: &mut Connection, _format: super::Format) -> Re
watcher.stop()?;
{
let mut guard = LAST_WATCHER_STATE.lock().unwrap();
*guard = Some(watcher.status().state);
*guard = Some(watcher.status()?.state);
}
info!("Watcher instance fully stopped.");
Ok(())

View File

@@ -129,7 +129,7 @@ fn test_basic_watch_functionality() {
let finished_watcher = watcher_thread.join().expect("Watcher thread panicked");
// Check status after processing events
let status = finished_watcher.status();
let status = finished_watcher.status().unwrap();
// Assertions
assert_eq!(status.state, WatcherState::Stopped);
@@ -190,7 +190,7 @@ fn test_debouncing() {
// Complete the test
let finished_watcher = watcher_thread.join().expect("Watcher thread panicked");
let status = finished_watcher.status();
let status = finished_watcher.status().unwrap();
// We should have processed fewer events than modifications made
// due to debouncing (exact count depends on implementation details)
@@ -248,7 +248,7 @@ fn test_event_flood() {
// Complete the test
let finished_watcher = watcher_thread.join().expect("Watcher thread panicked");
let status = finished_watcher.status();
let status = finished_watcher.status().unwrap();
// Verify processing occurred
assert!(status.events_processed > 0, "Expected events to be processed");
@@ -355,7 +355,7 @@ fn test_graceful_shutdown() {
"Shutdown took too long");
// Verify final state
let status = watcher.status();
let status = watcher.status().unwrap();
assert_eq!(status.state, WatcherState::Stopped);
assert_eq!(status.queue_size, 0, "Queue should be empty after shutdown");

View File

@@ -199,7 +199,7 @@ impl Marlin {
let watcher_db = Arc::new(Mutex::new(db::Database::new(new_conn)));
let mut owned_w = watcher::FileWatcher::new(vec![p], cfg)?;
owned_w.with_database(watcher_db); // Modifies owned_w in place
owned_w.with_database(watcher_db)?; // Modifies owned_w in place
owned_w.start()?; // Start the watcher after it has been fully configured
Ok(owned_w) // Return the owned FileWatcher

View File

@@ -367,7 +367,13 @@ impl FileWatcher {
let mut debouncer = EventDebouncer::new(config_clone.debounce_ms);
while !stop_flag_clone.load(Ordering::Relaxed) {
let current_state = { state_clone.lock().unwrap().clone() };
let current_state = match state_clone.lock() {
Ok(g) => g.clone(),
Err(_) => {
eprintln!("state mutex poisoned");
break;
}
};
if current_state == WatcherState::Paused {
thread::sleep(Duration::from_millis(100));
@@ -414,14 +420,23 @@ impl FileWatcher {
let num_evts = evts_to_process.len();
events_processed_clone.fetch_add(num_evts, Ordering::SeqCst);
let db_guard_option = db_captured_for_thread.lock().unwrap();
let db_guard_option = match db_captured_for_thread.lock() {
Ok(g) => g,
Err(_) => {
eprintln!("db_shared mutex poisoned");
break;
}
};
if let Some(db_mutex) = &*db_guard_option {
let mut _db_instance_guard = db_mutex.lock().unwrap();
for event_item in &evts_to_process {
println!(
"Processing event (DB available): {:?} for path {:?}",
event_item.kind, event_item.path
);
if let Ok(mut _db_instance_guard) = db_mutex.lock() {
for event_item in &evts_to_process {
println!(
"Processing event (DB available): {:?} for path {:?}",
event_item.kind, event_item.path
);
}
} else {
eprintln!("db mutex poisoned");
}
} else {
for event_item in &evts_to_process {
@@ -445,8 +460,11 @@ impl FileWatcher {
);
}
}
let mut final_state_guard = state_clone.lock().unwrap();
*final_state_guard = WatcherState::Stopped;
if let Ok(mut final_state_guard) = state_clone.lock() {
*final_state_guard = WatcherState::Stopped;
} else {
eprintln!("state mutex poisoned on shutdown");
}
});
Ok(Self {
@@ -464,16 +482,22 @@ impl FileWatcher {
})
}
pub fn with_database(&mut self, db_arc: Arc<Mutex<Database>>) -> &mut Self {
{
let mut shared_db_guard = self.db_shared.lock().unwrap();
pub fn with_database(&mut self, db_arc: Arc<Mutex<Database>>) -> Result<&mut Self> {
{
let mut shared_db_guard = self
.db_shared
.lock()
.map_err(|_| anyhow::anyhow!("db_shared mutex poisoned"))?;
*shared_db_guard = Some(db_arc);
}
self
}
Ok(self)
}
pub fn start(&mut self) -> Result<()> {
let mut state_guard = self.state.lock().unwrap();
let mut state_guard = self
.state
.lock()
.map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;
if *state_guard == WatcherState::Watching || self.processor_thread.is_none() {
if self.processor_thread.is_none() {
return Err(anyhow::anyhow!("Watcher thread not available to start."));
@@ -492,7 +516,10 @@ impl FileWatcher {
}
pub fn pause(&mut self) -> Result<()> {
let mut state_guard = self.state.lock().unwrap();
let mut state_guard = self
.state
.lock()
.map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;
match *state_guard {
WatcherState::Watching => {
*state_guard = WatcherState::Paused;
@@ -504,7 +531,10 @@ impl FileWatcher {
}
pub fn resume(&mut self) -> Result<()> {
let mut state_guard = self.state.lock().unwrap();
let mut state_guard = self
.state
.lock()
.map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;
match *state_guard {
WatcherState::Paused => {
*state_guard = WatcherState::Watching;
@@ -516,7 +546,10 @@ impl FileWatcher {
}
pub fn stop(&mut self) -> Result<()> {
let mut current_state_guard = self.state.lock().unwrap();
let mut current_state_guard = self
.state
.lock()
.map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;
if *current_state_guard == WatcherState::Stopped || *current_state_guard == WatcherState::ShuttingDown {
return Ok(());
}
@@ -534,20 +567,27 @@ impl FileWatcher {
}
}
let mut final_state_guard = self.state.lock().unwrap();
let mut final_state_guard = self
.state
.lock()
.map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;
*final_state_guard = WatcherState::Stopped;
Ok(())
}
pub fn status(&self) -> WatcherStatus {
let state_guard = self.state.lock().unwrap().clone();
WatcherStatus {
pub fn status(&self) -> Result<WatcherStatus> {
let state_guard = self
.state
.lock()
.map_err(|_| anyhow::anyhow!("state mutex poisoned"))?
.clone();
Ok(WatcherStatus {
state: state_guard,
events_processed: self.events_processed.load(Ordering::SeqCst),
queue_size: self.queue_size.load(Ordering::SeqCst),
start_time: Some(self.start_time),
watched_paths: self.watched_paths.clone(),
}
})
}
}
@@ -576,28 +616,28 @@ mod file_watcher_state_tests {
let mut watcher = FileWatcher::new(vec![watch_path], config).expect("Failed to create watcher");
assert_eq!(watcher.status().state, WatcherState::Initializing);
assert_eq!(watcher.status().unwrap().state, WatcherState::Initializing);
watcher.start().expect("Start failed");
assert_eq!(watcher.status().state, WatcherState::Watching);
assert_eq!(watcher.status().unwrap().state, WatcherState::Watching);
watcher.pause().expect("Pause failed");
assert_eq!(watcher.status().state, WatcherState::Paused);
assert_eq!(watcher.status().unwrap().state, WatcherState::Paused);
watcher.pause().expect("Second pause failed");
assert_eq!(watcher.status().state, WatcherState::Paused);
assert_eq!(watcher.status().unwrap().state, WatcherState::Paused);
watcher.resume().expect("Resume failed");
assert_eq!(watcher.status().state, WatcherState::Watching);
assert_eq!(watcher.status().unwrap().state, WatcherState::Watching);
watcher.resume().expect("Second resume failed");
assert_eq!(watcher.status().state, WatcherState::Watching);
assert_eq!(watcher.status().unwrap().state, WatcherState::Watching);
watcher.stop().expect("Stop failed");
assert_eq!(watcher.status().state, WatcherState::Stopped);
assert_eq!(watcher.status().unwrap().state, WatcherState::Stopped);
watcher.stop().expect("Second stop failed");
assert_eq!(watcher.status().state, WatcherState::Stopped);
assert_eq!(watcher.status().unwrap().state, WatcherState::Stopped);
}
#[test]
@@ -607,14 +647,20 @@ mod file_watcher_state_tests {
let mut watcher = FileWatcher::new(vec![tmp_dir.path().to_path_buf()], WatcherConfig::default()).unwrap();
{
let mut state_guard = watcher.state.lock().unwrap();
*state_guard = WatcherState::Watching;
let mut state_guard = watcher
.state
.lock()
.expect("state mutex poisoned");
*state_guard = WatcherState::Watching;
}
assert!(watcher.start().is_ok(), "Should be able to call start when already Watching (idempotent state change)");
assert_eq!(watcher.status().state, WatcherState::Watching);
assert_eq!(watcher.status().unwrap().state, WatcherState::Watching);
{
let mut state_guard = watcher.state.lock().unwrap();
let mut state_guard = watcher
.state
.lock()
.expect("state mutex poisoned");
*state_guard = WatcherState::ShuttingDown;
}
assert!(watcher.start().is_err(), "Should not be able to start from ShuttingDown");
@@ -634,10 +680,34 @@ mod file_watcher_state_tests {
#[test]
fn test_watcher_default_config() {
let config = WatcherConfig::default();
let config = WatcherConfig::default();
assert_eq!(config.debounce_ms, 100);
assert_eq!(config.batch_size, 1000);
assert_eq!(config.max_queue_size, 100_000);
assert_eq!(config.drain_timeout_ms, 5000);
}
#[test]
fn test_poisoned_state_mutex_errors() {
let tmp_dir = tempdir().unwrap();
let watch_path = tmp_dir.path().to_path_buf();
FsMod::create_dir_all(&watch_path).expect("Failed to create temp dir for watching");
let config = WatcherConfig::default();
let mut watcher = FileWatcher::new(vec![watch_path], config).expect("Failed to create watcher");
let state_arc = watcher.state.clone();
let _ = std::thread::spawn(move || {
let _guard = state_arc.lock().unwrap();
panic!("poison");
})
.join();
assert!(watcher.start().is_err());
assert!(watcher.pause().is_err());
assert!(watcher.resume().is_err());
assert!(watcher.stop().is_err());
assert!(watcher.status().is_err());
}
}

View File

@@ -40,7 +40,7 @@ mod tests {
.expect("Failed to create watcher");
watcher.start().expect("Failed to start watcher");
assert_eq!(watcher.status().state, WatcherState::Watching);
assert_eq!(watcher.status().unwrap().state, WatcherState::Watching);
thread::sleep(Duration::from_millis(200));
let new_file_path = temp_path.join("new_file.txt");
@@ -63,8 +63,8 @@ mod tests {
thread::sleep(Duration::from_millis(500));
watcher.stop().expect("Failed to stop watcher");
assert_eq!(watcher.status().state, WatcherState::Stopped);
assert!(watcher.status().events_processed > 0, "Expected some file events to be processed");
assert_eq!(watcher.status().unwrap().state, WatcherState::Stopped);
assert!(watcher.status().unwrap().events_processed > 0, "Expected some file events to be processed");
}
#[test]