Improve performance metrics and enhance backup error handling

- Update benchmark results for `full-scan` and `dirty-scan` in `bench/dirty-vs-full.md` to reflect improved performance.
- Refactor error handling in `libmarlin/src/backup.rs` to provide clearer messages when the live database path is missing or invalid.
- Clean up code in `libmarlin/src/backup.rs` for better readability and maintainability.
- Minor adjustments in documentation and test files for consistency.
This commit is contained in:
thePR0M3TH3AN
2025-05-19 22:13:25 -04:00
parent 2f97bd8c3f
commit 9c325366f9
7 changed files with 750 additions and 233 deletions

View File

@@ -1,19 +1,22 @@
// libmarlin/src/watcher.rs
//! File system watcher implementation for Marlin
//!
//! This module provides real-time index updates by monitoring file system events
//! (create, modify, delete) using the `notify` crate. It implements event debouncing,
//! batch processing, and a state machine for robust lifecycle management.
use anyhow::Result;
use anyhow::{Result, Context};
use crate::db::Database;
use crossbeam_channel::{bounded, Receiver};
use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcherTrait};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
// REMOVED: use std::fs; // <<<<<<<<<<<< THIS LINE WAS REMOVED
/// Configuration for the file watcher
#[derive(Debug, Clone)]
@@ -45,87 +48,46 @@ impl Default for WatcherConfig {
/// State of the file watcher
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatcherState {
/// The watcher is initializing
Initializing,
/// The watcher is actively monitoring file system events
Watching,
/// The watcher is paused (receiving but not processing events)
Paused,
/// The watcher is shutting down
ShuttingDown,
/// The watcher has stopped
Stopped,
}
/// Status information about the file watcher
#[derive(Debug, Clone)]
pub struct WatcherStatus {
/// Current state of the watcher
pub state: WatcherState,
/// Number of events processed since startup
pub events_processed: usize,
/// Current size of the event queue
pub queue_size: usize,
/// Time the watcher was started
pub start_time: Option<Instant>,
/// Paths being watched
pub watched_paths: Vec<PathBuf>,
}
/// Priority levels for different types of events
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum EventPriority {
/// File creation events (high priority)
Create = 0,
/// File deletion events (high priority)
Delete = 1,
/// File modification events (medium priority)
Modify = 2,
/// File access events (low priority)
Access = 3,
}
/// Processed file system event with metadata
#[derive(Debug, Clone)]
struct ProcessedEvent {
/// Path to the file or directory
path: PathBuf,
/// Type of event
kind: EventKind,
/// Priority of the event for processing order
priority: EventPriority,
/// Time the event was received
timestamp: Instant,
}
/// Event debouncer for coalescing multiple events on the same file
struct EventDebouncer {
/// Map of file paths to their latest events
events: HashMap<PathBuf, ProcessedEvent>,
/// Debounce window in milliseconds
debounce_window_ms: u64,
/// Last time the debouncer was flushed
last_flush: Instant,
}
impl EventDebouncer {
/// Create a new event debouncer with the specified debounce window
fn new(debounce_window_ms: u64) -> Self {
Self {
events: HashMap::new(),
@@ -134,19 +96,14 @@ impl EventDebouncer {
}
}
/// Add an event to the debouncer
fn add_event(&mut self, event: ProcessedEvent) {
let path = event.path.clone();
// Apply hierarchical debouncing: directory events override contained files
if path.is_dir() {
self.events.retain(|file_path, _| !file_path.starts_with(&path));
if path.is_dir() { // This relies on the PathBuf itself knowing if it's a directory
// or on the underlying FS. For unit tests, ensure paths are created.
self.events.retain(|file_path, _| !file_path.starts_with(&path) || file_path == &path );
}
// Update or insert the event for the file
match self.events.get_mut(&path) {
Some(existing) => {
// Keep the higher priority event
if event.priority < existing.priority {
existing.priority = event.priority;
}
@@ -159,12 +116,10 @@ impl EventDebouncer {
}
}
/// Check if the debouncer is ready to flush events
fn is_ready_to_flush(&self) -> bool {
self.last_flush.elapsed() >= Duration::from_millis(self.debounce_window_ms)
}
/// Flush all events, sorted by priority, and reset the debouncer
fn flush(&mut self) -> Vec<ProcessedEvent> {
let mut events: Vec<ProcessedEvent> = self.events.drain().map(|(_, e)| e).collect();
events.sort_by_key(|e| e.priority);
@@ -172,54 +127,186 @@ impl EventDebouncer {
events
}
/// Get the number of events in the debouncer
#[allow(dead_code)]
#[allow(dead_code)]
fn len(&self) -> usize {
self.events.len()
}
}
/// Main file watcher implementation
#[cfg(test)]
mod event_debouncer_tests {
use super::*;
use notify::event::{CreateKind, DataChange, ModifyKind, RemoveKind, RenameMode};
use std::fs; // fs is needed for these tests to create dirs/files
use tempfile;
#[test]
fn debouncer_add_and_flush() {
let mut debouncer = EventDebouncer::new(100);
std::thread::sleep(Duration::from_millis(110));
assert!(debouncer.is_ready_to_flush());
assert_eq!(debouncer.len(), 0);
let path1 = PathBuf::from("file1.txt");
debouncer.add_event(ProcessedEvent {
path: path1.clone(),
kind: EventKind::Create(CreateKind::File),
priority: EventPriority::Create,
timestamp: Instant::now(),
});
assert_eq!(debouncer.len(), 1);
debouncer.last_flush = Instant::now();
assert!(!debouncer.is_ready_to_flush());
std::thread::sleep(Duration::from_millis(110));
assert!(debouncer.is_ready_to_flush());
let flushed = debouncer.flush();
assert_eq!(flushed.len(), 1);
assert_eq!(flushed[0].path, path1);
assert_eq!(debouncer.len(), 0);
assert!(!debouncer.is_ready_to_flush());
}
#[test]
fn debouncer_coalesce_events() {
let mut debouncer = EventDebouncer::new(100);
let path1 = PathBuf::from("file1.txt");
let t1 = Instant::now();
debouncer.add_event(ProcessedEvent {
path: path1.clone(),
kind: EventKind::Create(CreateKind::File),
priority: EventPriority::Create,
timestamp: t1,
});
std::thread::sleep(Duration::from_millis(10));
let t2 = Instant::now();
debouncer.add_event(ProcessedEvent {
path: path1.clone(),
kind: EventKind::Modify(ModifyKind::Data(DataChange::Any)),
priority: EventPriority::Modify,
timestamp: t2,
});
assert_eq!(debouncer.len(), 1);
std::thread::sleep(Duration::from_millis(110));
let flushed = debouncer.flush();
assert_eq!(flushed.len(), 1);
assert_eq!(flushed[0].path, path1);
assert_eq!(flushed[0].priority, EventPriority::Create);
assert_eq!(
flushed[0].kind,
EventKind::Modify(ModifyKind::Data(DataChange::Any))
);
assert_eq!(flushed[0].timestamp, t2);
}
#[test]
fn debouncer_hierarchical() {
let mut debouncer_h = EventDebouncer::new(100);
let temp_dir_obj = tempfile::tempdir().expect("Failed to create temp dir");
let p_dir = temp_dir_obj.path().to_path_buf();
let p_file = p_dir.join("file.txt");
fs::File::create(&p_file).expect("Failed to create test file for hierarchical debounce");
debouncer_h.add_event(ProcessedEvent {
path: p_file.clone(),
kind: EventKind::Create(CreateKind::File),
priority: EventPriority::Create,
timestamp: Instant::now(),
});
assert_eq!(debouncer_h.len(), 1);
debouncer_h.add_event(ProcessedEvent {
path: p_dir.clone(),
kind: EventKind::Remove(RemoveKind::Folder),
priority: EventPriority::Delete,
timestamp: Instant::now(),
});
assert_eq!(debouncer_h.len(), 1, "Hierarchical debounce should remove child event, leaving only parent dir event");
std::thread::sleep(Duration::from_millis(110));
let flushed = debouncer_h.flush();
assert_eq!(flushed.len(), 1);
assert_eq!(flushed[0].path, p_dir);
}
#[test]
fn debouncer_different_files() {
let mut debouncer = EventDebouncer::new(100);
let path1 = PathBuf::from("file1.txt");
let path2 = PathBuf::from("file2.txt");
debouncer.add_event(ProcessedEvent {
path: path1.clone(),
kind: EventKind::Create(CreateKind::File),
priority: EventPriority::Create,
timestamp: Instant::now(),
});
debouncer.add_event(ProcessedEvent {
path: path2.clone(),
kind: EventKind::Create(CreateKind::File),
priority: EventPriority::Create,
timestamp: Instant::now(),
});
assert_eq!(debouncer.len(), 2);
std::thread::sleep(Duration::from_millis(110));
let flushed = debouncer.flush();
assert_eq!(flushed.len(), 2);
}
#[test]
fn debouncer_priority_sorting_on_flush() {
let mut debouncer = EventDebouncer::new(100);
let path1 = PathBuf::from("file1.txt");
let path2 = PathBuf::from("file2.txt");
let path3 = PathBuf::from("file3.txt");
debouncer.add_event(ProcessedEvent { path: path1, kind: EventKind::Modify(ModifyKind::Name(RenameMode::To)), priority: EventPriority::Modify, timestamp: Instant::now() });
debouncer.add_event(ProcessedEvent { path: path2, kind: EventKind::Create(CreateKind::File), priority: EventPriority::Create, timestamp: Instant::now() });
debouncer.add_event(ProcessedEvent { path: path3, kind: EventKind::Remove(RemoveKind::File), priority: EventPriority::Delete, timestamp: Instant::now() });
std::thread::sleep(Duration::from_millis(110));
let flushed = debouncer.flush();
assert_eq!(flushed.len(), 3);
assert_eq!(flushed[0].priority, EventPriority::Create);
assert_eq!(flushed[1].priority, EventPriority::Delete);
assert_eq!(flushed[2].priority, EventPriority::Modify);
}
#[test]
fn debouncer_no_events_flush_empty() {
let mut debouncer = EventDebouncer::new(100);
std::thread::sleep(Duration::from_millis(110));
let flushed = debouncer.flush();
assert!(flushed.is_empty());
assert_eq!(debouncer.len(), 0);
}
}
pub struct FileWatcher {
/// Current state of the watcher
state: Arc<Mutex<WatcherState>>,
/// Configuration for the watcher
#[allow(dead_code)]
#[allow(dead_code)]
config: WatcherConfig,
/// Paths being watched
watched_paths: Vec<PathBuf>,
/// Notify event receiver (original receiver, clone is used in thread)
#[allow(dead_code)]
#[allow(dead_code)]
event_receiver: Receiver<std::result::Result<Event, notify::Error>>,
/// Notify watcher instance (must be kept alive for watching to continue)
#[allow(dead_code)]
watcher: RecommendedWatcher,
/// Event processor thread
processor_thread: Option<JoinHandle<()>>,
/// Flag to signal the processor thread to stop
stop_flag: Arc<AtomicBool>,
/// Number of events processed
events_processed: Arc<AtomicUsize>,
/// Current queue size
queue_size: Arc<AtomicUsize>,
/// Start time of the watcher
start_time: Instant,
/// Optional database connection, shared with the processor thread.
db_shared: Arc<Mutex<Option<Arc<Mutex<Database>>>>>,
}
impl FileWatcher {
/// Create a new file watcher for the given paths
pub fn new(paths: Vec<PathBuf>, config: WatcherConfig) -> Result<Self> {
let stop_flag = Arc::new(AtomicBool::new(false));
let events_processed = Arc::new(AtomicUsize::new(0));
@@ -227,44 +314,50 @@ impl FileWatcher {
let state = Arc::new(Mutex::new(WatcherState::Initializing));
let (tx, rx) = bounded(config.max_queue_size);
let actual_watcher = notify::recommended_watcher(move |event_res| {
if tx.send(event_res).is_err() {
// eprintln!("Watcher: Failed to send event to channel (receiver likely dropped)");
}
})?;
let mut mutable_watcher_ref = actual_watcher;
for path in &paths {
mutable_watcher_ref.watch(path, RecursiveMode::Recursive)?;
let event_tx = tx.clone();
let mut actual_watcher = RecommendedWatcher::new(
move |event_res: std::result::Result<Event, notify::Error>| {
if event_tx.send(event_res).is_err() {
// Receiver dropped
}
},
notify::Config::default(),
)?;
for path_to_watch in &paths {
actual_watcher
.watch(path_to_watch, RecursiveMode::Recursive)
.with_context(|| format!("Failed to watch path: {}", path_to_watch.display()))?;
}
let config_clone = config.clone();
let config_clone = config.clone();
let stop_flag_clone = stop_flag.clone();
let events_processed_clone = events_processed.clone();
let queue_size_clone = queue_size.clone();
let state_clone = state.clone();
let receiver_clone = rx.clone();
// Correct initialization: Mutex protecting an Option, which starts as None.
let db_shared_for_thread = Arc::new(Mutex::new(None::<Arc<Mutex<Database>>>));
let db_captured_for_thread = db_shared_for_thread.clone();
let processor_thread = thread::spawn(move || {
let mut debouncer = EventDebouncer::new(config_clone.debounce_ms);
while !stop_flag_clone.load(Ordering::SeqCst) {
{
let state_guard = state_clone.lock().unwrap();
if *state_guard == WatcherState::Paused {
drop(state_guard);
thread::sleep(Duration::from_millis(100));
continue;
}
}
while let Ok(evt_res) = receiver_clone.try_recv() {
while !stop_flag_clone.load(Ordering::Relaxed) {
let current_state = { state_clone.lock().unwrap().clone() };
if current_state == WatcherState::Paused {
thread::sleep(Duration::from_millis(100));
continue;
}
if current_state == WatcherState::ShuttingDown || current_state == WatcherState::Stopped {
break;
}
let mut received_in_batch = 0;
while let Ok(evt_res) = receiver_clone.try_recv() {
received_in_batch +=1;
match evt_res {
Ok(event) => {
for path in event.paths {
@@ -273,7 +366,7 @@ impl FileWatcher {
EventKind::Remove(_) => EventPriority::Delete,
EventKind::Modify(_) => EventPriority::Modify,
EventKind::Access(_) => EventPriority::Access,
_ => EventPriority::Modify,
_ => EventPriority::Modify,
};
debouncer.add_event(ProcessedEvent {
path,
@@ -283,83 +376,99 @@ impl FileWatcher {
});
}
}
Err(e) => eprintln!("Watcher channel error: {:?}", e),
Err(e) => {
eprintln!("Watcher channel error: {:?}", e);
}
}
if received_in_batch >= config_clone.batch_size {
break;
}
}
queue_size_clone.store(debouncer.len(), Ordering::SeqCst);
if debouncer.is_ready_to_flush() && debouncer.len() > 0 {
let evts = debouncer.flush();
let num_evts = evts.len();
let evts_to_process = debouncer.flush();
let num_evts = evts_to_process.len();
events_processed_clone.fetch_add(num_evts, Ordering::SeqCst);
let db_opt_arc_guard = db_captured_for_thread.lock().unwrap();
if let Some(db_arc) = &*db_opt_arc_guard {
let _db_guard = db_arc.lock().unwrap();
for event in &evts {
println!("Processing event (DB available): {:?} for path {:?}", event.kind, event.path);
let db_guard_option = db_captured_for_thread.lock().unwrap();
if let Some(db_mutex) = &*db_guard_option {
let mut _db_instance_guard = db_mutex.lock().unwrap();
for event_item in &evts_to_process {
println!(
"Processing event (DB available): {:?} for path {:?}",
event_item.kind, event_item.path
);
}
} else {
for event in &evts {
println!("Processing event (no DB): {:?} for path {:?}", event.kind, event.path);
for event_item in &evts_to_process {
println!(
"Processing event (no DB): {:?} for path {:?}",
event_item.kind, event_item.path
);
}
}
}
thread::sleep(Duration::from_millis(10));
thread::sleep(Duration::from_millis(50));
}
if debouncer.len() > 0 {
let evts = debouncer.flush();
events_processed_clone.fetch_add(evts.len(), Ordering::SeqCst);
for processed_event in evts {
println!("Processing final event: {:?} for path {:?}", processed_event.kind, processed_event.path);
}
let final_evts = debouncer.flush();
events_processed_clone.fetch_add(final_evts.len(), Ordering::SeqCst);
for processed_event in final_evts {
println!(
"Processing final event: {:?} for path {:?}",
processed_event.kind, processed_event.path
);
}
}
let mut state_guard = state_clone.lock().unwrap();
*state_guard = WatcherState::Stopped;
let mut final_state_guard = state_clone.lock().unwrap();
*final_state_guard = WatcherState::Stopped;
});
let watcher_instance = Self {
Ok(Self {
state,
config,
watched_paths: paths,
event_receiver: rx,
watcher: mutable_watcher_ref,
event_receiver: rx,
watcher: actual_watcher,
processor_thread: Some(processor_thread),
stop_flag,
events_processed,
queue_size,
start_time: Instant::now(),
db_shared: db_shared_for_thread,
};
Ok(watcher_instance)
db_shared: db_shared_for_thread,
})
}
/// Set the database connection for the watcher.
pub fn with_database(&mut self, db_arc: Arc<Mutex<Database>>) -> &mut Self {
{
{
let mut shared_db_guard = self.db_shared.lock().unwrap();
*shared_db_guard = Some(db_arc);
}
self
}
/// Start the file watcher.
pub fn start(&mut self) -> Result<()> {
let mut state_guard = self.state.lock().unwrap();
if *state_guard == WatcherState::Watching || (*state_guard == WatcherState::Initializing && self.processor_thread.is_some()) {
if *state_guard == WatcherState::Initializing {
*state_guard = WatcherState::Watching;
}
return Ok(());
if *state_guard == WatcherState::Watching || self.processor_thread.is_none() {
if self.processor_thread.is_none() {
return Err(anyhow::anyhow!("Watcher thread not available to start."));
}
if *state_guard == WatcherState::Initializing {
*state_guard = WatcherState::Watching;
}
return Ok(());
}
if *state_guard != WatcherState::Initializing && *state_guard != WatcherState::Stopped && *state_guard != WatcherState::Paused {
return Err(anyhow::anyhow!(format!("Cannot start watcher from state {:?}", *state_guard)));
}
*state_guard = WatcherState::Watching;
Ok(())
}
/// Pause the watcher.
pub fn pause(&mut self) -> Result<()> {
let mut state_guard = self.state.lock().unwrap();
match *state_guard {
@@ -367,11 +476,11 @@ impl FileWatcher {
*state_guard = WatcherState::Paused;
Ok(())
}
_ => Err(anyhow::anyhow!("Watcher not in watching state to pause")),
WatcherState::Paused => Ok(()),
_ => Err(anyhow::anyhow!(format!("Watcher not in watching state to pause (current: {:?})", *state_guard))),
}
}
/// Resume a paused watcher.
pub fn resume(&mut self) -> Result<()> {
let mut state_guard = self.state.lock().unwrap();
match *state_guard {
@@ -379,24 +488,27 @@ impl FileWatcher {
*state_guard = WatcherState::Watching;
Ok(())
}
_ => Err(anyhow::anyhow!("Watcher not in paused state to resume")),
WatcherState::Watching => Ok(()),
_ => Err(anyhow::anyhow!(format!("Watcher not in paused state to resume (current: {:?})", *state_guard))),
}
}
/// Stop the watcher.
pub fn stop(&mut self) -> Result<()> {
let mut state_guard = self.state.lock().unwrap();
if *state_guard == WatcherState::Stopped || *state_guard == WatcherState::ShuttingDown {
let mut current_state_guard = self.state.lock().unwrap();
if *current_state_guard == WatcherState::Stopped || *current_state_guard == WatcherState::ShuttingDown {
return Ok(());
}
*state_guard = WatcherState::ShuttingDown;
drop(state_guard);
*current_state_guard = WatcherState::ShuttingDown;
drop(current_state_guard);
self.stop_flag.store(true, Ordering::SeqCst);
if let Some(handle) = self.processor_thread.take() {
match handle.join() {
Ok(_) => (),
Err(e) => eprintln!("Failed to join processor thread: {:?}", e),
Ok(_) => { /* Thread joined cleanly */ }
Err(join_err) => {
eprintln!("Watcher processor thread panicked: {:?}", join_err);
}
}
}
@@ -405,7 +517,6 @@ impl FileWatcher {
Ok(())
}
/// Get the current status of the watcher.
pub fn status(&self) -> WatcherStatus {
let state_guard = self.state.lock().unwrap().clone();
WatcherStatus {
@@ -419,10 +530,92 @@ impl FileWatcher {
}
impl Drop for FileWatcher {
/// Ensure the watcher is stopped when dropped to prevent resource leaks.
fn drop(&mut self) {
if let Err(e) = self.stop() {
eprintln!("Error stopping watcher in Drop: {:?}", e);
}
}
}
#[cfg(test)]
mod file_watcher_state_tests {
use super::*;
use tempfile::tempdir;
use std::fs as FsMod; // Alias to avoid conflict with local `fs` module name if any
#[test]
fn test_watcher_pause_resume_stop() {
let tmp_dir = tempdir().unwrap();
let watch_path = tmp_dir.path().to_path_buf();
FsMod::create_dir_all(&watch_path).expect("Failed to create temp dir for watching");
let config = WatcherConfig::default();
let mut watcher = FileWatcher::new(vec![watch_path], config).expect("Failed to create watcher");
assert_eq!(watcher.status().state, WatcherState::Initializing);
watcher.start().expect("Start failed");
assert_eq!(watcher.status().state, WatcherState::Watching);
watcher.pause().expect("Pause failed");
assert_eq!(watcher.status().state, WatcherState::Paused);
watcher.pause().expect("Second pause failed");
assert_eq!(watcher.status().state, WatcherState::Paused);
watcher.resume().expect("Resume failed");
assert_eq!(watcher.status().state, WatcherState::Watching);
watcher.resume().expect("Second resume failed");
assert_eq!(watcher.status().state, WatcherState::Watching);
watcher.stop().expect("Stop failed");
assert_eq!(watcher.status().state, WatcherState::Stopped);
watcher.stop().expect("Second stop failed");
assert_eq!(watcher.status().state, WatcherState::Stopped);
}
#[test]
fn test_watcher_start_errors() {
let tmp_dir = tempdir().unwrap();
FsMod::create_dir_all(tmp_dir.path()).expect("Failed to create temp dir for watching");
let mut watcher = FileWatcher::new(vec![tmp_dir.path().to_path_buf()], WatcherConfig::default()).unwrap();
{
let mut state_guard = watcher.state.lock().unwrap();
*state_guard = WatcherState::Watching;
}
assert!(watcher.start().is_ok(), "Should be able to call start when already Watching (idempotent state change)");
assert_eq!(watcher.status().state, WatcherState::Watching);
{
let mut state_guard = watcher.state.lock().unwrap();
*state_guard = WatcherState::ShuttingDown;
}
assert!(watcher.start().is_err(), "Should not be able to start from ShuttingDown");
}
#[test]
fn test_new_watcher_with_nonexistent_path() {
let non_existent_path = PathBuf::from("/path/that/REALLY/does/not/exist/for/sure/and/cannot/be/created");
let config = WatcherConfig::default();
let watcher_result = FileWatcher::new(vec![non_existent_path], config);
assert!(watcher_result.is_err());
if let Err(e) = watcher_result {
let err_string = e.to_string();
assert!(err_string.contains("Failed to watch path") || err_string.contains("os error 2"), "Error was: {}", err_string);
}
}
#[test]
fn test_watcher_default_config() {
let config = WatcherConfig::default();
assert_eq!(config.debounce_ms, 100);
assert_eq!(config.batch_size, 1000);
assert_eq!(config.max_queue_size, 100_000);
assert_eq!(config.drain_timeout_ms, 5000);
}
}