diff --git a/libmarlin/Cargo.toml b/libmarlin/Cargo.toml index af61a9b..9cc4d1b 100644 --- a/libmarlin/Cargo.toml +++ b/libmarlin/Cargo.toml @@ -18,6 +18,7 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] } walkdir = "2.5" shlex = "1.3" +same-file = "1" shellexpand = "3.1" serde_json = { version = "1", optional = true } diff --git a/libmarlin/src/watcher.rs b/libmarlin/src/watcher.rs index 6681cae..0d2a94c 100644 --- a/libmarlin/src/watcher.rs +++ b/libmarlin/src/watcher.rs @@ -12,6 +12,7 @@ use notify::{ event::{ModifyKind, RemoveKind, RenameMode}, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcherTrait, }; +use same_file::Handle; use std::collections::HashMap; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -84,6 +85,61 @@ struct EventDebouncer { last_flush: Instant, } +#[derive(Default)] +struct RemoveTracker { + map: HashMap, +} + +impl RemoveTracker { + fn record(&mut self, path: &PathBuf) { + if let Ok(h) = Handle::from_path(path) { + self.map.insert(h.ino(), (path.clone(), Instant::now())); + } else { + // fall back to hashing path if inode not available + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + let mut hasher = DefaultHasher::new(); + path.hash(&mut hasher); + self.map + .insert(hasher.finish(), (path.clone(), Instant::now())); + } + } + + fn match_create(&mut self, path: &PathBuf, window: Duration) -> Option { + if let Ok(h) = Handle::from_path(path) { + if let Some((old, ts)) = self.map.remove(&h.ino()) { + if Instant::now().duration_since(ts) <= window { + return Some(old); + } else { + return None; + } + } + } + None + } + + fn flush_expired(&mut self, window: Duration, debouncer: &mut EventDebouncer) { + let now = Instant::now(); + let mut expired = Vec::new(); + for (ino, (path, ts)) in &self.map { + if now.duration_since(*ts) > window { + debouncer.add_event(ProcessedEvent { + path: path.clone(), + old_path: None, + new_path: None, + kind: EventKind::Remove(RemoveKind::Any), + priority: EventPriority::Delete, + timestamp: *ts, + }); + expired.push(*ino); + } + } + for ino in expired { + self.map.remove(&ino); + } + } +} + impl EventDebouncer { fn new(debounce_window_ms: u64) -> Self { Self { @@ -187,7 +243,7 @@ impl FileWatcher { let processor_thread = thread::spawn(move || { let mut debouncer = EventDebouncer::new(config_clone.debounce_ms); let mut rename_cache: HashMap = HashMap::new(); - let mut pending_remove: Option<(PathBuf, Instant)> = None; + let mut remove_tracker = RemoveTracker::default(); while !stop_flag_clone.load(Ordering::Relaxed) { // honour current state @@ -223,38 +279,27 @@ impl FileWatcher { // ── per-event logic ─────────────────────────────── match event.kind { - // 1. remove-then-create → rename heuristic + // 1. remove-then-create → rename heuristic using inode EventKind::Remove(_) if event.paths.len() == 1 => { - pending_remove = Some((event.paths[0].clone(), Instant::now())); + remove_tracker.record(&event.paths[0]); } EventKind::Create(_) if event.paths.len() == 1 => { - if let Some((old_p, ts)) = pending_remove.take() { - if Instant::now().duration_since(ts) - <= Duration::from_millis(500) - { - let new_p = event.paths[0].clone(); - debouncer.add_event(ProcessedEvent { - path: old_p.clone(), - old_path: Some(old_p), - new_path: Some(new_p), - kind: EventKind::Modify(ModifyKind::Name( - RenameMode::Both, - )), - priority: prio, - timestamp: Instant::now(), - }); - continue; // handled as rename - } else { - debouncer.add_event(ProcessedEvent { - path: old_p.clone(), - old_path: None, - new_path: None, - kind: EventKind::Remove(RemoveKind::Any), - priority: EventPriority::Delete, - timestamp: ts, - }); - } + if let Some(old_p) = remove_tracker + .match_create(&event.paths[0], Duration::from_millis(500)) + { + let new_p = event.paths[0].clone(); + debouncer.add_event(ProcessedEvent { + path: old_p.clone(), + old_path: Some(old_p), + new_path: Some(new_p), + kind: EventKind::Modify(ModifyKind::Name( + RenameMode::Both, + )), + priority: prio, + timestamp: Instant::now(), + }); + continue; } for p in event.paths { @@ -377,21 +422,8 @@ impl FileWatcher { } } - // deal with orphaned remove - if let Some((old_p, ts)) = pending_remove.take() { - if Instant::now().duration_since(ts) > Duration::from_millis(500) { - debouncer.add_event(ProcessedEvent { - path: old_p.clone(), - old_path: None, - new_path: None, - kind: EventKind::Remove(RemoveKind::Any), - priority: EventPriority::Delete, - timestamp: ts, - }); - } else { - pending_remove = Some((old_p, ts)); - } - } + // deal with orphaned removes + remove_tracker.flush_expired(Duration::from_millis(500), &mut debouncer); queue_size_clone.store(debouncer.len(), Ordering::SeqCst); @@ -438,6 +470,7 @@ impl FileWatcher { } // main loop // final flush on shutdown + remove_tracker.flush_expired(Duration::from_millis(500), &mut debouncer); if debouncer.len() > 0 { let final_evts = debouncer.flush(); events_processed_clone.fetch_add(final_evts.len(), Ordering::SeqCst); @@ -777,6 +810,45 @@ mod event_debouncer_tests { assert_eq!(flushed.len(), 2); assert!(flushed.iter().any(|e| e.path == dir)); } + + #[test] + fn remove_create_same_inode_produces_rename() { + let tmp = tempfile::tempdir().unwrap(); + let old_p = tmp.path().join("old.txt"); + std::fs::write(&old_p, b"hi").unwrap(); + + let mut debouncer = EventDebouncer::new(100); + let mut tracker = RemoveTracker::default(); + + tracker.record(&old_p); + + let new_p = tmp.path().join("new.txt"); + std::fs::rename(&old_p, &new_p).unwrap(); + + if let Some(orig) = tracker.match_create(&new_p, Duration::from_millis(500)) { + debouncer.add_event(ProcessedEvent { + path: orig.clone(), + old_path: Some(orig), + new_path: Some(new_p.clone()), + kind: EventKind::Modify(ModifyKind::Name(RenameMode::Both)), + priority: EventPriority::Modify, + timestamp: Instant::now(), + }); + } + + tracker.flush_expired(Duration::from_millis(500), &mut debouncer); + let flushed = debouncer.flush(); + assert_eq!(flushed.len(), 1); + assert_eq!( + flushed[0].kind, + EventKind::Modify(ModifyKind::Name(RenameMode::Both)) + ); + assert_eq!( + flushed[0].old_path.as_ref().unwrap(), + &tmp.path().join("old.txt") + ); + assert_eq!(flushed[0].new_path.as_ref().unwrap(), &new_p); + } } #[cfg(test)]