mirror of
https://github.com/PR0M3TH3AN/Marlin.git
synced 2025-09-08 07:08:44 +00:00
Handle remove-create renames
This commit is contained in:
@@ -18,6 +18,7 @@ tracing = "0.1"
|
|||||||
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
|
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
|
||||||
walkdir = "2.5"
|
walkdir = "2.5"
|
||||||
shlex = "1.3"
|
shlex = "1.3"
|
||||||
|
same-file = "1"
|
||||||
shellexpand = "3.1"
|
shellexpand = "3.1"
|
||||||
serde_json = { version = "1", optional = true }
|
serde_json = { version = "1", optional = true }
|
||||||
|
|
||||||
|
@@ -12,6 +12,7 @@ use notify::{
|
|||||||
event::{ModifyKind, RemoveKind, RenameMode},
|
event::{ModifyKind, RemoveKind, RenameMode},
|
||||||
Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcherTrait,
|
Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcherTrait,
|
||||||
};
|
};
|
||||||
|
use same_file::Handle;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||||
@@ -84,6 +85,61 @@ struct EventDebouncer {
|
|||||||
last_flush: Instant,
|
last_flush: Instant,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct RemoveTracker {
|
||||||
|
map: HashMap<u64, (PathBuf, Instant)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RemoveTracker {
|
||||||
|
fn record(&mut self, path: &PathBuf) {
|
||||||
|
if let Ok(h) = Handle::from_path(path) {
|
||||||
|
self.map.insert(h.ino(), (path.clone(), Instant::now()));
|
||||||
|
} else {
|
||||||
|
// fall back to hashing path if inode not available
|
||||||
|
use std::collections::hash_map::DefaultHasher;
|
||||||
|
use std::hash::{Hash, Hasher};
|
||||||
|
let mut hasher = DefaultHasher::new();
|
||||||
|
path.hash(&mut hasher);
|
||||||
|
self.map
|
||||||
|
.insert(hasher.finish(), (path.clone(), Instant::now()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn match_create(&mut self, path: &PathBuf, window: Duration) -> Option<PathBuf> {
|
||||||
|
if let Ok(h) = Handle::from_path(path) {
|
||||||
|
if let Some((old, ts)) = self.map.remove(&h.ino()) {
|
||||||
|
if Instant::now().duration_since(ts) <= window {
|
||||||
|
return Some(old);
|
||||||
|
} else {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
fn flush_expired(&mut self, window: Duration, debouncer: &mut EventDebouncer) {
|
||||||
|
let now = Instant::now();
|
||||||
|
let mut expired = Vec::new();
|
||||||
|
for (ino, (path, ts)) in &self.map {
|
||||||
|
if now.duration_since(*ts) > window {
|
||||||
|
debouncer.add_event(ProcessedEvent {
|
||||||
|
path: path.clone(),
|
||||||
|
old_path: None,
|
||||||
|
new_path: None,
|
||||||
|
kind: EventKind::Remove(RemoveKind::Any),
|
||||||
|
priority: EventPriority::Delete,
|
||||||
|
timestamp: *ts,
|
||||||
|
});
|
||||||
|
expired.push(*ino);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for ino in expired {
|
||||||
|
self.map.remove(&ino);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl EventDebouncer {
|
impl EventDebouncer {
|
||||||
fn new(debounce_window_ms: u64) -> Self {
|
fn new(debounce_window_ms: u64) -> Self {
|
||||||
Self {
|
Self {
|
||||||
@@ -187,7 +243,7 @@ impl FileWatcher {
|
|||||||
let processor_thread = thread::spawn(move || {
|
let processor_thread = thread::spawn(move || {
|
||||||
let mut debouncer = EventDebouncer::new(config_clone.debounce_ms);
|
let mut debouncer = EventDebouncer::new(config_clone.debounce_ms);
|
||||||
let mut rename_cache: HashMap<usize, PathBuf> = HashMap::new();
|
let mut rename_cache: HashMap<usize, PathBuf> = HashMap::new();
|
||||||
let mut pending_remove: Option<(PathBuf, Instant)> = None;
|
let mut remove_tracker = RemoveTracker::default();
|
||||||
|
|
||||||
while !stop_flag_clone.load(Ordering::Relaxed) {
|
while !stop_flag_clone.load(Ordering::Relaxed) {
|
||||||
// honour current state
|
// honour current state
|
||||||
@@ -223,38 +279,27 @@ impl FileWatcher {
|
|||||||
|
|
||||||
// ── per-event logic ───────────────────────────────
|
// ── per-event logic ───────────────────────────────
|
||||||
match event.kind {
|
match event.kind {
|
||||||
// 1. remove-then-create → rename heuristic
|
// 1. remove-then-create → rename heuristic using inode
|
||||||
EventKind::Remove(_) if event.paths.len() == 1 => {
|
EventKind::Remove(_) if event.paths.len() == 1 => {
|
||||||
pending_remove = Some((event.paths[0].clone(), Instant::now()));
|
remove_tracker.record(&event.paths[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
EventKind::Create(_) if event.paths.len() == 1 => {
|
EventKind::Create(_) if event.paths.len() == 1 => {
|
||||||
if let Some((old_p, ts)) = pending_remove.take() {
|
if let Some(old_p) = remove_tracker
|
||||||
if Instant::now().duration_since(ts)
|
.match_create(&event.paths[0], Duration::from_millis(500))
|
||||||
<= Duration::from_millis(500)
|
{
|
||||||
{
|
let new_p = event.paths[0].clone();
|
||||||
let new_p = event.paths[0].clone();
|
debouncer.add_event(ProcessedEvent {
|
||||||
debouncer.add_event(ProcessedEvent {
|
path: old_p.clone(),
|
||||||
path: old_p.clone(),
|
old_path: Some(old_p),
|
||||||
old_path: Some(old_p),
|
new_path: Some(new_p),
|
||||||
new_path: Some(new_p),
|
kind: EventKind::Modify(ModifyKind::Name(
|
||||||
kind: EventKind::Modify(ModifyKind::Name(
|
RenameMode::Both,
|
||||||
RenameMode::Both,
|
)),
|
||||||
)),
|
priority: prio,
|
||||||
priority: prio,
|
timestamp: Instant::now(),
|
||||||
timestamp: Instant::now(),
|
});
|
||||||
});
|
continue;
|
||||||
continue; // handled as rename
|
|
||||||
} else {
|
|
||||||
debouncer.add_event(ProcessedEvent {
|
|
||||||
path: old_p.clone(),
|
|
||||||
old_path: None,
|
|
||||||
new_path: None,
|
|
||||||
kind: EventKind::Remove(RemoveKind::Any),
|
|
||||||
priority: EventPriority::Delete,
|
|
||||||
timestamp: ts,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for p in event.paths {
|
for p in event.paths {
|
||||||
@@ -377,21 +422,8 @@ impl FileWatcher {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// deal with orphaned remove
|
// deal with orphaned removes
|
||||||
if let Some((old_p, ts)) = pending_remove.take() {
|
remove_tracker.flush_expired(Duration::from_millis(500), &mut debouncer);
|
||||||
if Instant::now().duration_since(ts) > Duration::from_millis(500) {
|
|
||||||
debouncer.add_event(ProcessedEvent {
|
|
||||||
path: old_p.clone(),
|
|
||||||
old_path: None,
|
|
||||||
new_path: None,
|
|
||||||
kind: EventKind::Remove(RemoveKind::Any),
|
|
||||||
priority: EventPriority::Delete,
|
|
||||||
timestamp: ts,
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
pending_remove = Some((old_p, ts));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
queue_size_clone.store(debouncer.len(), Ordering::SeqCst);
|
queue_size_clone.store(debouncer.len(), Ordering::SeqCst);
|
||||||
|
|
||||||
@@ -438,6 +470,7 @@ impl FileWatcher {
|
|||||||
} // main loop
|
} // main loop
|
||||||
|
|
||||||
// final flush on shutdown
|
// final flush on shutdown
|
||||||
|
remove_tracker.flush_expired(Duration::from_millis(500), &mut debouncer);
|
||||||
if debouncer.len() > 0 {
|
if debouncer.len() > 0 {
|
||||||
let final_evts = debouncer.flush();
|
let final_evts = debouncer.flush();
|
||||||
events_processed_clone.fetch_add(final_evts.len(), Ordering::SeqCst);
|
events_processed_clone.fetch_add(final_evts.len(), Ordering::SeqCst);
|
||||||
@@ -777,6 +810,45 @@ mod event_debouncer_tests {
|
|||||||
assert_eq!(flushed.len(), 2);
|
assert_eq!(flushed.len(), 2);
|
||||||
assert!(flushed.iter().any(|e| e.path == dir));
|
assert!(flushed.iter().any(|e| e.path == dir));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn remove_create_same_inode_produces_rename() {
|
||||||
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
let old_p = tmp.path().join("old.txt");
|
||||||
|
std::fs::write(&old_p, b"hi").unwrap();
|
||||||
|
|
||||||
|
let mut debouncer = EventDebouncer::new(100);
|
||||||
|
let mut tracker = RemoveTracker::default();
|
||||||
|
|
||||||
|
tracker.record(&old_p);
|
||||||
|
|
||||||
|
let new_p = tmp.path().join("new.txt");
|
||||||
|
std::fs::rename(&old_p, &new_p).unwrap();
|
||||||
|
|
||||||
|
if let Some(orig) = tracker.match_create(&new_p, Duration::from_millis(500)) {
|
||||||
|
debouncer.add_event(ProcessedEvent {
|
||||||
|
path: orig.clone(),
|
||||||
|
old_path: Some(orig),
|
||||||
|
new_path: Some(new_p.clone()),
|
||||||
|
kind: EventKind::Modify(ModifyKind::Name(RenameMode::Both)),
|
||||||
|
priority: EventPriority::Modify,
|
||||||
|
timestamp: Instant::now(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
tracker.flush_expired(Duration::from_millis(500), &mut debouncer);
|
||||||
|
let flushed = debouncer.flush();
|
||||||
|
assert_eq!(flushed.len(), 1);
|
||||||
|
assert_eq!(
|
||||||
|
flushed[0].kind,
|
||||||
|
EventKind::Modify(ModifyKind::Name(RenameMode::Both))
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
flushed[0].old_path.as_ref().unwrap(),
|
||||||
|
&tmp.path().join("old.txt")
|
||||||
|
);
|
||||||
|
assert_eq!(flushed[0].new_path.as_ref().unwrap(), &new_p);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
Reference in New Issue
Block a user