Format codebase with rustfmt

This commit is contained in:
thePR0M3TH3AN
2025-05-21 11:24:49 -04:00
parent 9360efee2a
commit 567f1cd1a5
33 changed files with 780 additions and 481 deletions

View File

@@ -6,8 +6,8 @@
//! (create, modify, delete) using the `notify` crate. It implements event debouncing,
//! batch processing, and a state machine for robust lifecycle management.
use anyhow::{Result, Context};
use crate::db::Database;
use anyhow::{Context, Result};
use crossbeam_channel::{bounded, Receiver};
use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcherTrait};
use std::collections::HashMap;
@@ -98,9 +98,11 @@ impl EventDebouncer {
fn add_event(&mut self, event: ProcessedEvent) {
let path = event.path.clone();
if path.is_dir() { // This relies on the PathBuf itself knowing if it's a directory
// or on the underlying FS. For unit tests, ensure paths are created.
self.events.retain(|file_path, _| !file_path.starts_with(&path) || file_path == &path );
if path.is_dir() {
// This relies on the PathBuf itself knowing if it's a directory
// or on the underlying FS. For unit tests, ensure paths are created.
self.events
.retain(|file_path, _| !file_path.starts_with(&path) || file_path == &path);
}
match self.events.get_mut(&path) {
Some(existing) => {
@@ -137,12 +139,12 @@ mod event_debouncer_tests {
use super::*;
use notify::event::{CreateKind, DataChange, ModifyKind, RemoveKind, RenameMode};
use std::fs; // fs is needed for these tests to create dirs/files
use tempfile;
use tempfile;
#[test]
fn debouncer_add_and_flush() {
let mut debouncer = EventDebouncer::new(100);
std::thread::sleep(Duration::from_millis(110));
std::thread::sleep(Duration::from_millis(110));
assert!(debouncer.is_ready_to_flush());
assert_eq!(debouncer.len(), 0);
@@ -154,8 +156,8 @@ mod event_debouncer_tests {
timestamp: Instant::now(),
});
assert_eq!(debouncer.len(), 1);
debouncer.last_flush = Instant::now();
debouncer.last_flush = Instant::now();
assert!(!debouncer.is_ready_to_flush());
std::thread::sleep(Duration::from_millis(110));
@@ -165,7 +167,7 @@ mod event_debouncer_tests {
assert_eq!(flushed.len(), 1);
assert_eq!(flushed[0].path, path1);
assert_eq!(debouncer.len(), 0);
assert!(!debouncer.is_ready_to_flush());
assert!(!debouncer.is_ready_to_flush());
}
#[test]
@@ -188,15 +190,15 @@ mod event_debouncer_tests {
priority: EventPriority::Modify,
timestamp: t2,
});
assert_eq!(debouncer.len(), 1);
std::thread::sleep(Duration::from_millis(110));
let flushed = debouncer.flush();
assert_eq!(flushed.len(), 1);
assert_eq!(flushed[0].path, path1);
assert_eq!(flushed[0].priority, EventPriority::Create);
assert_eq!(
assert_eq!(flushed[0].priority, EventPriority::Create);
assert_eq!(
flushed[0].kind,
EventKind::Modify(ModifyKind::Data(DataChange::Any))
);
@@ -207,9 +209,9 @@ mod event_debouncer_tests {
fn debouncer_hierarchical() {
let mut debouncer_h = EventDebouncer::new(100);
let temp_dir_obj = tempfile::tempdir().expect("Failed to create temp dir");
let p_dir = temp_dir_obj.path().to_path_buf();
let p_dir = temp_dir_obj.path().to_path_buf();
let p_file = p_dir.join("file.txt");
fs::File::create(&p_file).expect("Failed to create test file for hierarchical debounce");
debouncer_h.add_event(ProcessedEvent {
@@ -219,15 +221,19 @@ mod event_debouncer_tests {
timestamp: Instant::now(),
});
assert_eq!(debouncer_h.len(), 1);
debouncer_h.add_event(ProcessedEvent {
path: p_dir.clone(),
kind: EventKind::Remove(RemoveKind::Folder),
path: p_dir.clone(),
kind: EventKind::Remove(RemoveKind::Folder),
priority: EventPriority::Delete,
timestamp: Instant::now(),
});
assert_eq!(debouncer_h.len(), 1, "Hierarchical debounce should remove child event, leaving only parent dir event");
assert_eq!(
debouncer_h.len(),
1,
"Hierarchical debounce should remove child event, leaving only parent dir event"
);
std::thread::sleep(Duration::from_millis(110));
let flushed = debouncer_h.flush();
assert_eq!(flushed.len(), 1);
@@ -261,20 +267,35 @@ mod event_debouncer_tests {
#[test]
fn debouncer_priority_sorting_on_flush() {
let mut debouncer = EventDebouncer::new(100);
let path1 = PathBuf::from("file1.txt");
let path2 = PathBuf::from("file2.txt");
let path3 = PathBuf::from("file3.txt");
let path1 = PathBuf::from("file1.txt");
let path2 = PathBuf::from("file2.txt");
let path3 = PathBuf::from("file3.txt");
debouncer.add_event(ProcessedEvent {
path: path1,
kind: EventKind::Modify(ModifyKind::Name(RenameMode::To)),
priority: EventPriority::Modify,
timestamp: Instant::now(),
});
debouncer.add_event(ProcessedEvent {
path: path2,
kind: EventKind::Create(CreateKind::File),
priority: EventPriority::Create,
timestamp: Instant::now(),
});
debouncer.add_event(ProcessedEvent {
path: path3,
kind: EventKind::Remove(RemoveKind::File),
priority: EventPriority::Delete,
timestamp: Instant::now(),
});
debouncer.add_event(ProcessedEvent { path: path1, kind: EventKind::Modify(ModifyKind::Name(RenameMode::To)), priority: EventPriority::Modify, timestamp: Instant::now() });
debouncer.add_event(ProcessedEvent { path: path2, kind: EventKind::Create(CreateKind::File), priority: EventPriority::Create, timestamp: Instant::now() });
debouncer.add_event(ProcessedEvent { path: path3, kind: EventKind::Remove(RemoveKind::File), priority: EventPriority::Delete, timestamp: Instant::now() });
std::thread::sleep(Duration::from_millis(110));
let flushed = debouncer.flush();
assert_eq!(flushed.len(), 3);
assert_eq!(flushed[0].priority, EventPriority::Create);
assert_eq!(flushed[1].priority, EventPriority::Delete);
assert_eq!(flushed[2].priority, EventPriority::Modify);
assert_eq!(flushed[0].priority, EventPriority::Create);
assert_eq!(flushed[1].priority, EventPriority::Delete);
assert_eq!(flushed[2].priority, EventPriority::Modify);
}
#[test]
@@ -314,7 +335,6 @@ mod event_debouncer_tests {
}
}
pub struct FileWatcher {
state: Arc<Mutex<WatcherState>>,
_config: WatcherConfig,
@@ -359,7 +379,7 @@ impl FileWatcher {
let events_processed_clone = events_processed.clone();
let queue_size_clone = queue_size.clone();
let state_clone = state.clone();
let receiver_clone = rx.clone();
let receiver_clone = rx.clone();
let db_shared_for_thread = Arc::new(Mutex::new(None::<Arc<Mutex<Database>>>));
let db_captured_for_thread = db_shared_for_thread.clone();
@@ -367,7 +387,7 @@ impl FileWatcher {
let processor_thread = thread::spawn(move || {
let mut debouncer = EventDebouncer::new(config_clone.debounce_ms);
while !stop_flag_clone.load(Ordering::Relaxed) {
while !stop_flag_clone.load(Ordering::Relaxed) {
let current_state = match state_clone.lock() {
Ok(g) => g.clone(),
Err(_) => {
@@ -380,13 +400,15 @@ impl FileWatcher {
thread::sleep(Duration::from_millis(100));
continue;
}
if current_state == WatcherState::ShuttingDown || current_state == WatcherState::Stopped {
if current_state == WatcherState::ShuttingDown
|| current_state == WatcherState::Stopped
{
break;
}
let mut received_in_batch = 0;
while let Ok(evt_res) = receiver_clone.try_recv() {
received_in_batch +=1;
received_in_batch += 1;
match evt_res {
Ok(event) => {
for path in event.paths {
@@ -431,7 +453,7 @@ impl FileWatcher {
if let Some(db_mutex) = &*db_guard_option {
if let Ok(mut _db_instance_guard) = db_mutex.lock() {
for event_item in &evts_to_process {
info!(
info!(
"Processing event (DB available): {:?} for path {:?}",
event_item.kind, event_item.path
);
@@ -441,7 +463,7 @@ impl FileWatcher {
}
} else {
for event_item in &evts_to_process {
info!(
info!(
"Processing event (no DB): {:?} for path {:?}",
event_item.kind, event_item.path
);
@@ -504,12 +526,18 @@ impl FileWatcher {
return Err(anyhow::anyhow!("Watcher thread not available to start."));
}
if *state_guard == WatcherState::Initializing {
*state_guard = WatcherState::Watching;
*state_guard = WatcherState::Watching;
}
return Ok(());
}
if *state_guard != WatcherState::Initializing && *state_guard != WatcherState::Stopped && *state_guard != WatcherState::Paused {
return Err(anyhow::anyhow!(format!("Cannot start watcher from state {:?}", *state_guard)));
if *state_guard != WatcherState::Initializing
&& *state_guard != WatcherState::Stopped
&& *state_guard != WatcherState::Paused
{
return Err(anyhow::anyhow!(format!(
"Cannot start watcher from state {:?}",
*state_guard
)));
}
*state_guard = WatcherState::Watching;
@@ -526,8 +554,11 @@ impl FileWatcher {
*state_guard = WatcherState::Paused;
Ok(())
}
WatcherState::Paused => Ok(()),
_ => Err(anyhow::anyhow!(format!("Watcher not in watching state to pause (current: {:?})", *state_guard))),
WatcherState::Paused => Ok(()),
_ => Err(anyhow::anyhow!(format!(
"Watcher not in watching state to pause (current: {:?})",
*state_guard
))),
}
}
@@ -541,8 +572,11 @@ impl FileWatcher {
*state_guard = WatcherState::Watching;
Ok(())
}
WatcherState::Watching => Ok(()),
_ => Err(anyhow::anyhow!(format!("Watcher not in paused state to resume (current: {:?})", *state_guard))),
WatcherState::Watching => Ok(()),
_ => Err(anyhow::anyhow!(format!(
"Watcher not in paused state to resume (current: {:?})",
*state_guard
))),
}
}
@@ -551,7 +585,9 @@ impl FileWatcher {
.state
.lock()
.map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;
if *current_state_guard == WatcherState::Stopped || *current_state_guard == WatcherState::ShuttingDown {
if *current_state_guard == WatcherState::Stopped
|| *current_state_guard == WatcherState::ShuttingDown
{
return Ok(());
}
*current_state_guard = WatcherState::ShuttingDown;
@@ -567,7 +603,7 @@ impl FileWatcher {
}
}
}
let mut final_state_guard = self
.state
.lock()
@@ -600,12 +636,11 @@ impl Drop for FileWatcher {
}
}
#[cfg(test)]
mod file_watcher_state_tests {
mod file_watcher_state_tests {
use super::*;
use tempfile::tempdir;
use std::fs as FsMod; // Alias to avoid conflict with local `fs` module name if any
use std::fs as FsMod;
use tempfile::tempdir; // Alias to avoid conflict with local `fs` module name if any
#[test]
fn test_watcher_pause_resume_stop() {
@@ -615,7 +650,8 @@ mod file_watcher_state_tests {
let config = WatcherConfig::default();
let mut watcher = FileWatcher::new(vec![watch_path], config).expect("Failed to create watcher");
let mut watcher =
FileWatcher::new(vec![watch_path], config).expect("Failed to create watcher");
assert_eq!(watcher.status().unwrap().state, WatcherState::Initializing);
@@ -630,7 +666,7 @@ mod file_watcher_state_tests {
watcher.resume().expect("Resume failed");
assert_eq!(watcher.status().unwrap().state, WatcherState::Watching);
watcher.resume().expect("Second resume failed");
assert_eq!(watcher.status().unwrap().state, WatcherState::Watching);
@@ -645,37 +681,43 @@ mod file_watcher_state_tests {
fn test_watcher_start_errors() {
let tmp_dir = tempdir().unwrap();
FsMod::create_dir_all(tmp_dir.path()).expect("Failed to create temp dir for watching");
let mut watcher = FileWatcher::new(vec![tmp_dir.path().to_path_buf()], WatcherConfig::default()).unwrap();
let mut watcher =
FileWatcher::new(vec![tmp_dir.path().to_path_buf()], WatcherConfig::default()).unwrap();
{
let mut state_guard = watcher
.state
.lock()
.expect("state mutex poisoned");
let mut state_guard = watcher.state.lock().expect("state mutex poisoned");
*state_guard = WatcherState::Watching;
}
assert!(watcher.start().is_ok(), "Should be able to call start when already Watching (idempotent state change)");
assert!(
watcher.start().is_ok(),
"Should be able to call start when already Watching (idempotent state change)"
);
assert_eq!(watcher.status().unwrap().state, WatcherState::Watching);
{
let mut state_guard = watcher
.state
.lock()
.expect("state mutex poisoned");
let mut state_guard = watcher.state.lock().expect("state mutex poisoned");
*state_guard = WatcherState::ShuttingDown;
}
assert!(watcher.start().is_err(), "Should not be able to start from ShuttingDown");
assert!(
watcher.start().is_err(),
"Should not be able to start from ShuttingDown"
);
}
#[test]
#[test]
fn test_new_watcher_with_nonexistent_path() {
let non_existent_path = PathBuf::from("/path/that/REALLY/does/not/exist/for/sure/and/cannot/be/created");
let non_existent_path =
PathBuf::from("/path/that/REALLY/does/not/exist/for/sure/and/cannot/be/created");
let config = WatcherConfig::default();
let watcher_result = FileWatcher::new(vec![non_existent_path], config);
assert!(watcher_result.is_err());
if let Err(e) = watcher_result {
let err_string = e.to_string();
assert!(err_string.contains("Failed to watch path") || err_string.contains("os error 2"), "Error was: {}", err_string);
assert!(
err_string.contains("Failed to watch path") || err_string.contains("os error 2"),
"Error was: {}",
err_string
);
}
}
@@ -696,7 +738,8 @@ mod file_watcher_state_tests {
let config = WatcherConfig::default();
let mut watcher = FileWatcher::new(vec![watch_path], config).expect("Failed to create watcher");
let mut watcher =
FileWatcher::new(vec![watch_path], config).expect("Failed to create watcher");
let state_arc = watcher.state.clone();
let _ = std::thread::spawn(move || {