mirror of
https://github.com/PR0M3TH3AN/Marlin.git
synced 2025-09-08 07:08:44 +00:00
Refactor watcher mutex handling and add tests
This commit is contained in:
@@ -56,8 +56,8 @@ pub fn run(cmd: &WatchCmd, _conn: &mut Connection, _format: super::Format) -> Re
|
||||
info!("Starting watcher for directory: {}", canon_path.display());
|
||||
|
||||
let mut watcher = marlin.watch(&canon_path, Some(config))?;
|
||||
|
||||
let status = watcher.status();
|
||||
|
||||
let status = watcher.status()?;
|
||||
info!("Watcher started. Press Ctrl+C to stop watching.");
|
||||
info!("Watching {} paths", status.watched_paths.len());
|
||||
|
||||
@@ -73,7 +73,7 @@ pub fn run(cmd: &WatchCmd, _conn: &mut Connection, _format: super::Format) -> Re
|
||||
|
||||
info!("Watcher run loop started. Waiting for Ctrl+C or stop signal...");
|
||||
while running.load(Ordering::SeqCst) {
|
||||
let current_status = watcher.status();
|
||||
let current_status = watcher.status()?;
|
||||
if current_status.state == WatcherState::Stopped {
|
||||
info!("Watcher has stopped (detected by state). Exiting loop.");
|
||||
break;
|
||||
@@ -98,7 +98,7 @@ pub fn run(cmd: &WatchCmd, _conn: &mut Connection, _format: super::Format) -> Re
|
||||
watcher.stop()?;
|
||||
{
|
||||
let mut guard = LAST_WATCHER_STATE.lock().unwrap();
|
||||
*guard = Some(watcher.status().state);
|
||||
*guard = Some(watcher.status()?.state);
|
||||
}
|
||||
info!("Watcher instance fully stopped.");
|
||||
Ok(())
|
||||
|
@@ -129,7 +129,7 @@ fn test_basic_watch_functionality() {
|
||||
let finished_watcher = watcher_thread.join().expect("Watcher thread panicked");
|
||||
|
||||
// Check status after processing events
|
||||
let status = finished_watcher.status();
|
||||
let status = finished_watcher.status().unwrap();
|
||||
|
||||
// Assertions
|
||||
assert_eq!(status.state, WatcherState::Stopped);
|
||||
@@ -190,7 +190,7 @@ fn test_debouncing() {
|
||||
|
||||
// Complete the test
|
||||
let finished_watcher = watcher_thread.join().expect("Watcher thread panicked");
|
||||
let status = finished_watcher.status();
|
||||
let status = finished_watcher.status().unwrap();
|
||||
|
||||
// We should have processed fewer events than modifications made
|
||||
// due to debouncing (exact count depends on implementation details)
|
||||
@@ -248,7 +248,7 @@ fn test_event_flood() {
|
||||
|
||||
// Complete the test
|
||||
let finished_watcher = watcher_thread.join().expect("Watcher thread panicked");
|
||||
let status = finished_watcher.status();
|
||||
let status = finished_watcher.status().unwrap();
|
||||
|
||||
// Verify processing occurred
|
||||
assert!(status.events_processed > 0, "Expected events to be processed");
|
||||
@@ -355,7 +355,7 @@ fn test_graceful_shutdown() {
|
||||
"Shutdown took too long");
|
||||
|
||||
// Verify final state
|
||||
let status = watcher.status();
|
||||
let status = watcher.status().unwrap();
|
||||
assert_eq!(status.state, WatcherState::Stopped);
|
||||
assert_eq!(status.queue_size, 0, "Queue should be empty after shutdown");
|
||||
|
||||
|
@@ -199,7 +199,7 @@ impl Marlin {
|
||||
let watcher_db = Arc::new(Mutex::new(db::Database::new(new_conn)));
|
||||
|
||||
let mut owned_w = watcher::FileWatcher::new(vec![p], cfg)?;
|
||||
owned_w.with_database(watcher_db); // Modifies owned_w in place
|
||||
owned_w.with_database(watcher_db)?; // Modifies owned_w in place
|
||||
owned_w.start()?; // Start the watcher after it has been fully configured
|
||||
|
||||
Ok(owned_w) // Return the owned FileWatcher
|
||||
|
@@ -367,7 +367,13 @@ impl FileWatcher {
|
||||
let mut debouncer = EventDebouncer::new(config_clone.debounce_ms);
|
||||
|
||||
while !stop_flag_clone.load(Ordering::Relaxed) {
|
||||
let current_state = { state_clone.lock().unwrap().clone() };
|
||||
let current_state = match state_clone.lock() {
|
||||
Ok(g) => g.clone(),
|
||||
Err(_) => {
|
||||
eprintln!("state mutex poisoned");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if current_state == WatcherState::Paused {
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
@@ -414,14 +420,23 @@ impl FileWatcher {
|
||||
let num_evts = evts_to_process.len();
|
||||
events_processed_clone.fetch_add(num_evts, Ordering::SeqCst);
|
||||
|
||||
let db_guard_option = db_captured_for_thread.lock().unwrap();
|
||||
let db_guard_option = match db_captured_for_thread.lock() {
|
||||
Ok(g) => g,
|
||||
Err(_) => {
|
||||
eprintln!("db_shared mutex poisoned");
|
||||
break;
|
||||
}
|
||||
};
|
||||
if let Some(db_mutex) = &*db_guard_option {
|
||||
let mut _db_instance_guard = db_mutex.lock().unwrap();
|
||||
for event_item in &evts_to_process {
|
||||
println!(
|
||||
"Processing event (DB available): {:?} for path {:?}",
|
||||
event_item.kind, event_item.path
|
||||
);
|
||||
if let Ok(mut _db_instance_guard) = db_mutex.lock() {
|
||||
for event_item in &evts_to_process {
|
||||
println!(
|
||||
"Processing event (DB available): {:?} for path {:?}",
|
||||
event_item.kind, event_item.path
|
||||
);
|
||||
}
|
||||
} else {
|
||||
eprintln!("db mutex poisoned");
|
||||
}
|
||||
} else {
|
||||
for event_item in &evts_to_process {
|
||||
@@ -445,8 +460,11 @@ impl FileWatcher {
|
||||
);
|
||||
}
|
||||
}
|
||||
let mut final_state_guard = state_clone.lock().unwrap();
|
||||
*final_state_guard = WatcherState::Stopped;
|
||||
if let Ok(mut final_state_guard) = state_clone.lock() {
|
||||
*final_state_guard = WatcherState::Stopped;
|
||||
} else {
|
||||
eprintln!("state mutex poisoned on shutdown");
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
@@ -464,16 +482,22 @@ impl FileWatcher {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn with_database(&mut self, db_arc: Arc<Mutex<Database>>) -> &mut Self {
|
||||
{
|
||||
let mut shared_db_guard = self.db_shared.lock().unwrap();
|
||||
pub fn with_database(&mut self, db_arc: Arc<Mutex<Database>>) -> Result<&mut Self> {
|
||||
{
|
||||
let mut shared_db_guard = self
|
||||
.db_shared
|
||||
.lock()
|
||||
.map_err(|_| anyhow::anyhow!("db_shared mutex poisoned"))?;
|
||||
*shared_db_guard = Some(db_arc);
|
||||
}
|
||||
self
|
||||
}
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
pub fn start(&mut self) -> Result<()> {
|
||||
let mut state_guard = self.state.lock().unwrap();
|
||||
let mut state_guard = self
|
||||
.state
|
||||
.lock()
|
||||
.map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;
|
||||
if *state_guard == WatcherState::Watching || self.processor_thread.is_none() {
|
||||
if self.processor_thread.is_none() {
|
||||
return Err(anyhow::anyhow!("Watcher thread not available to start."));
|
||||
@@ -492,7 +516,10 @@ impl FileWatcher {
|
||||
}
|
||||
|
||||
pub fn pause(&mut self) -> Result<()> {
|
||||
let mut state_guard = self.state.lock().unwrap();
|
||||
let mut state_guard = self
|
||||
.state
|
||||
.lock()
|
||||
.map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;
|
||||
match *state_guard {
|
||||
WatcherState::Watching => {
|
||||
*state_guard = WatcherState::Paused;
|
||||
@@ -504,7 +531,10 @@ impl FileWatcher {
|
||||
}
|
||||
|
||||
pub fn resume(&mut self) -> Result<()> {
|
||||
let mut state_guard = self.state.lock().unwrap();
|
||||
let mut state_guard = self
|
||||
.state
|
||||
.lock()
|
||||
.map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;
|
||||
match *state_guard {
|
||||
WatcherState::Paused => {
|
||||
*state_guard = WatcherState::Watching;
|
||||
@@ -516,7 +546,10 @@ impl FileWatcher {
|
||||
}
|
||||
|
||||
pub fn stop(&mut self) -> Result<()> {
|
||||
let mut current_state_guard = self.state.lock().unwrap();
|
||||
let mut current_state_guard = self
|
||||
.state
|
||||
.lock()
|
||||
.map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;
|
||||
if *current_state_guard == WatcherState::Stopped || *current_state_guard == WatcherState::ShuttingDown {
|
||||
return Ok(());
|
||||
}
|
||||
@@ -534,20 +567,27 @@ impl FileWatcher {
|
||||
}
|
||||
}
|
||||
|
||||
let mut final_state_guard = self.state.lock().unwrap();
|
||||
let mut final_state_guard = self
|
||||
.state
|
||||
.lock()
|
||||
.map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;
|
||||
*final_state_guard = WatcherState::Stopped;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn status(&self) -> WatcherStatus {
|
||||
let state_guard = self.state.lock().unwrap().clone();
|
||||
WatcherStatus {
|
||||
pub fn status(&self) -> Result<WatcherStatus> {
|
||||
let state_guard = self
|
||||
.state
|
||||
.lock()
|
||||
.map_err(|_| anyhow::anyhow!("state mutex poisoned"))?
|
||||
.clone();
|
||||
Ok(WatcherStatus {
|
||||
state: state_guard,
|
||||
events_processed: self.events_processed.load(Ordering::SeqCst),
|
||||
queue_size: self.queue_size.load(Ordering::SeqCst),
|
||||
start_time: Some(self.start_time),
|
||||
watched_paths: self.watched_paths.clone(),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -576,28 +616,28 @@ mod file_watcher_state_tests {
|
||||
|
||||
let mut watcher = FileWatcher::new(vec![watch_path], config).expect("Failed to create watcher");
|
||||
|
||||
assert_eq!(watcher.status().state, WatcherState::Initializing);
|
||||
assert_eq!(watcher.status().unwrap().state, WatcherState::Initializing);
|
||||
|
||||
watcher.start().expect("Start failed");
|
||||
assert_eq!(watcher.status().state, WatcherState::Watching);
|
||||
assert_eq!(watcher.status().unwrap().state, WatcherState::Watching);
|
||||
|
||||
watcher.pause().expect("Pause failed");
|
||||
assert_eq!(watcher.status().state, WatcherState::Paused);
|
||||
assert_eq!(watcher.status().unwrap().state, WatcherState::Paused);
|
||||
|
||||
watcher.pause().expect("Second pause failed");
|
||||
assert_eq!(watcher.status().state, WatcherState::Paused);
|
||||
assert_eq!(watcher.status().unwrap().state, WatcherState::Paused);
|
||||
|
||||
watcher.resume().expect("Resume failed");
|
||||
assert_eq!(watcher.status().state, WatcherState::Watching);
|
||||
assert_eq!(watcher.status().unwrap().state, WatcherState::Watching);
|
||||
|
||||
watcher.resume().expect("Second resume failed");
|
||||
assert_eq!(watcher.status().state, WatcherState::Watching);
|
||||
assert_eq!(watcher.status().unwrap().state, WatcherState::Watching);
|
||||
|
||||
watcher.stop().expect("Stop failed");
|
||||
assert_eq!(watcher.status().state, WatcherState::Stopped);
|
||||
assert_eq!(watcher.status().unwrap().state, WatcherState::Stopped);
|
||||
|
||||
watcher.stop().expect("Second stop failed");
|
||||
assert_eq!(watcher.status().state, WatcherState::Stopped);
|
||||
assert_eq!(watcher.status().unwrap().state, WatcherState::Stopped);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -607,14 +647,20 @@ mod file_watcher_state_tests {
|
||||
let mut watcher = FileWatcher::new(vec![tmp_dir.path().to_path_buf()], WatcherConfig::default()).unwrap();
|
||||
|
||||
{
|
||||
let mut state_guard = watcher.state.lock().unwrap();
|
||||
*state_guard = WatcherState::Watching;
|
||||
let mut state_guard = watcher
|
||||
.state
|
||||
.lock()
|
||||
.expect("state mutex poisoned");
|
||||
*state_guard = WatcherState::Watching;
|
||||
}
|
||||
assert!(watcher.start().is_ok(), "Should be able to call start when already Watching (idempotent state change)");
|
||||
assert_eq!(watcher.status().state, WatcherState::Watching);
|
||||
assert_eq!(watcher.status().unwrap().state, WatcherState::Watching);
|
||||
|
||||
{
|
||||
let mut state_guard = watcher.state.lock().unwrap();
|
||||
let mut state_guard = watcher
|
||||
.state
|
||||
.lock()
|
||||
.expect("state mutex poisoned");
|
||||
*state_guard = WatcherState::ShuttingDown;
|
||||
}
|
||||
assert!(watcher.start().is_err(), "Should not be able to start from ShuttingDown");
|
||||
@@ -634,10 +680,34 @@ mod file_watcher_state_tests {
|
||||
|
||||
#[test]
|
||||
fn test_watcher_default_config() {
|
||||
let config = WatcherConfig::default();
|
||||
let config = WatcherConfig::default();
|
||||
assert_eq!(config.debounce_ms, 100);
|
||||
assert_eq!(config.batch_size, 1000);
|
||||
assert_eq!(config.max_queue_size, 100_000);
|
||||
assert_eq!(config.drain_timeout_ms, 5000);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_poisoned_state_mutex_errors() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let watch_path = tmp_dir.path().to_path_buf();
|
||||
FsMod::create_dir_all(&watch_path).expect("Failed to create temp dir for watching");
|
||||
|
||||
let config = WatcherConfig::default();
|
||||
|
||||
let mut watcher = FileWatcher::new(vec![watch_path], config).expect("Failed to create watcher");
|
||||
|
||||
let state_arc = watcher.state.clone();
|
||||
let _ = std::thread::spawn(move || {
|
||||
let _guard = state_arc.lock().unwrap();
|
||||
panic!("poison");
|
||||
})
|
||||
.join();
|
||||
|
||||
assert!(watcher.start().is_err());
|
||||
assert!(watcher.pause().is_err());
|
||||
assert!(watcher.resume().is_err());
|
||||
assert!(watcher.stop().is_err());
|
||||
assert!(watcher.status().is_err());
|
||||
}
|
||||
}
|
@@ -40,7 +40,7 @@ mod tests {
|
||||
.expect("Failed to create watcher");
|
||||
|
||||
watcher.start().expect("Failed to start watcher");
|
||||
assert_eq!(watcher.status().state, WatcherState::Watching);
|
||||
assert_eq!(watcher.status().unwrap().state, WatcherState::Watching);
|
||||
|
||||
thread::sleep(Duration::from_millis(200));
|
||||
let new_file_path = temp_path.join("new_file.txt");
|
||||
@@ -63,8 +63,8 @@ mod tests {
|
||||
thread::sleep(Duration::from_millis(500));
|
||||
watcher.stop().expect("Failed to stop watcher");
|
||||
|
||||
assert_eq!(watcher.status().state, WatcherState::Stopped);
|
||||
assert!(watcher.status().events_processed > 0, "Expected some file events to be processed");
|
||||
assert_eq!(watcher.status().unwrap().state, WatcherState::Stopped);
|
||||
assert!(watcher.status().unwrap().events_processed > 0, "Expected some file events to be processed");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
Reference in New Issue
Block a user