Handle rename events

This commit is contained in:
thePR0M3TH3AN
2025-05-22 16:41:50 -04:00
parent 30922c70c0
commit 6b9d684b15
5 changed files with 367 additions and 24 deletions

View File

@@ -6,10 +6,13 @@
//! (create, modify, delete) using the `notify` crate. It implements event debouncing,
//! batch processing, and a state machine for robust lifecycle management.
use crate::db::Database;
use crate::db::{self, Database};
use anyhow::{Context, Result};
use crossbeam_channel::{bounded, Receiver};
use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcherTrait};
use notify::{
event::{ModifyKind, RemoveKind, RenameMode},
Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcherTrait,
};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@@ -76,6 +79,8 @@ enum EventPriority {
#[derive(Debug, Clone)]
struct ProcessedEvent {
path: PathBuf,
old_path: Option<PathBuf>,
new_path: Option<PathBuf>,
kind: EventKind,
priority: EventPriority,
timestamp: Instant,
@@ -151,6 +156,8 @@ mod event_debouncer_tests {
let path1 = PathBuf::from("file1.txt");
debouncer.add_event(ProcessedEvent {
path: path1.clone(),
old_path: None,
new_path: None,
kind: EventKind::Create(CreateKind::File),
priority: EventPriority::Create,
timestamp: Instant::now(),
@@ -178,6 +185,8 @@ mod event_debouncer_tests {
let t1 = Instant::now();
debouncer.add_event(ProcessedEvent {
path: path1.clone(),
old_path: None,
new_path: None,
kind: EventKind::Create(CreateKind::File),
priority: EventPriority::Create,
timestamp: t1,
@@ -186,6 +195,8 @@ mod event_debouncer_tests {
let t2 = Instant::now();
debouncer.add_event(ProcessedEvent {
path: path1.clone(),
old_path: None,
new_path: None,
kind: EventKind::Modify(ModifyKind::Data(DataChange::Any)),
priority: EventPriority::Modify,
timestamp: t2,
@@ -216,6 +227,8 @@ mod event_debouncer_tests {
debouncer_h.add_event(ProcessedEvent {
path: p_file.clone(),
old_path: None,
new_path: None,
kind: EventKind::Create(CreateKind::File),
priority: EventPriority::Create,
timestamp: Instant::now(),
@@ -224,6 +237,8 @@ mod event_debouncer_tests {
debouncer_h.add_event(ProcessedEvent {
path: p_dir.clone(),
old_path: None,
new_path: None,
kind: EventKind::Remove(RemoveKind::Folder),
priority: EventPriority::Delete,
timestamp: Instant::now(),
@@ -248,12 +263,16 @@ mod event_debouncer_tests {
debouncer.add_event(ProcessedEvent {
path: path1.clone(),
old_path: None,
new_path: None,
kind: EventKind::Create(CreateKind::File),
priority: EventPriority::Create,
timestamp: Instant::now(),
});
debouncer.add_event(ProcessedEvent {
path: path2.clone(),
old_path: None,
new_path: None,
kind: EventKind::Create(CreateKind::File),
priority: EventPriority::Create,
timestamp: Instant::now(),
@@ -273,18 +292,24 @@ mod event_debouncer_tests {
debouncer.add_event(ProcessedEvent {
path: path1,
old_path: None,
new_path: None,
kind: EventKind::Modify(ModifyKind::Name(RenameMode::To)),
priority: EventPriority::Modify,
timestamp: Instant::now(),
});
debouncer.add_event(ProcessedEvent {
path: path2,
old_path: None,
new_path: None,
kind: EventKind::Create(CreateKind::File),
priority: EventPriority::Create,
timestamp: Instant::now(),
});
debouncer.add_event(ProcessedEvent {
path: path3,
old_path: None,
new_path: None,
kind: EventKind::Remove(RemoveKind::File),
priority: EventPriority::Delete,
timestamp: Instant::now(),
@@ -316,12 +341,16 @@ mod event_debouncer_tests {
debouncer.add_event(ProcessedEvent {
path: dir.clone(),
old_path: None,
new_path: None,
kind: EventKind::Create(CreateKind::Folder),
priority: EventPriority::Create,
timestamp: Instant::now(),
});
debouncer.add_event(ProcessedEvent {
path: file,
old_path: None,
new_path: None,
kind: EventKind::Create(CreateKind::File),
priority: EventPriority::Create,
timestamp: Instant::now(),
@@ -386,6 +415,8 @@ impl FileWatcher {
let processor_thread = thread::spawn(move || {
let mut debouncer = EventDebouncer::new(config_clone.debounce_ms);
let mut rename_cache: HashMap<usize, PathBuf> = HashMap::new();
let mut pending_remove: Option<(PathBuf, Instant)> = None;
while !stop_flag_clone.load(Ordering::Relaxed) {
let current_state = match state_clone.lock() {
@@ -396,7 +427,9 @@ impl FileWatcher {
}
};
if current_state == WatcherState::Paused {
if current_state == WatcherState::Paused
|| current_state == WatcherState::Initializing
{
thread::sleep(Duration::from_millis(100));
continue;
}
@@ -411,20 +444,145 @@ impl FileWatcher {
received_in_batch += 1;
match evt_res {
Ok(event) => {
for path in event.paths {
let prio = match event.kind {
EventKind::Create(_) => EventPriority::Create,
EventKind::Remove(_) => EventPriority::Delete,
EventKind::Modify(_) => EventPriority::Modify,
EventKind::Access(_) => EventPriority::Access,
_ => EventPriority::Modify,
};
debouncer.add_event(ProcessedEvent {
path,
kind: event.kind,
priority: prio,
timestamp: Instant::now(),
});
let prio = match event.kind {
EventKind::Create(_) => EventPriority::Create,
EventKind::Remove(_) => EventPriority::Delete,
EventKind::Modify(_) => EventPriority::Modify,
EventKind::Access(_) => EventPriority::Access,
_ => EventPriority::Modify,
};
match event.kind {
EventKind::Remove(_) if event.paths.len() == 1 => {
pending_remove = Some((event.paths[0].clone(), Instant::now()));
}
EventKind::Create(_) if event.paths.len() == 1 => {
if let Some((old_p, ts)) = pending_remove.take() {
if Instant::now().duration_since(ts)
<= Duration::from_millis(500)
{
let new_p = event.paths[0].clone();
debouncer.add_event(ProcessedEvent {
path: old_p.clone(),
old_path: Some(old_p),
new_path: Some(new_p),
kind: EventKind::Modify(ModifyKind::Name(
RenameMode::Both,
)),
priority: prio,
timestamp: Instant::now(),
});
continue;
} else {
debouncer.add_event(ProcessedEvent {
path: old_p.clone(),
old_path: None,
new_path: None,
kind: EventKind::Remove(RemoveKind::Any),
priority: EventPriority::Delete,
timestamp: ts,
});
}
}
for path in event.paths {
debouncer.add_event(ProcessedEvent {
path,
old_path: None,
new_path: None,
kind: event.kind,
priority: prio,
timestamp: Instant::now(),
});
}
}
EventKind::Modify(ModifyKind::Name(mode)) => match mode {
RenameMode::Both => {
if event.paths.len() >= 2 {
let old_path = event.paths[0].clone();
let new_path = event.paths[1].clone();
debouncer.add_event(ProcessedEvent {
path: old_path.clone(),
old_path: Some(old_path),
new_path: Some(new_path),
kind: EventKind::Modify(ModifyKind::Name(
RenameMode::Both,
)),
priority: prio,
timestamp: Instant::now(),
});
}
}
RenameMode::From => {
if let (Some(trk), Some(p)) =
(event.tracker(), event.paths.first())
{
rename_cache.insert(trk, p.clone());
}
for path in event.paths {
debouncer.add_event(ProcessedEvent {
path,
old_path: None,
new_path: None,
kind: event.kind,
priority: prio,
timestamp: Instant::now(),
});
}
}
RenameMode::To => {
if let (Some(trk), Some(new_p)) =
(event.tracker(), event.paths.first())
{
if let Some(old_p) = rename_cache.remove(&trk) {
debouncer.add_event(ProcessedEvent {
path: old_p.clone(),
old_path: Some(old_p),
new_path: Some(new_p.clone()),
kind: EventKind::Modify(ModifyKind::Name(
RenameMode::Both,
)),
priority: prio,
timestamp: Instant::now(),
});
continue;
}
}
for path in event.paths {
debouncer.add_event(ProcessedEvent {
path,
old_path: None,
new_path: None,
kind: event.kind,
priority: prio,
timestamp: Instant::now(),
});
}
}
_ => {
for path in event.paths {
debouncer.add_event(ProcessedEvent {
path,
old_path: None,
new_path: None,
kind: event.kind,
priority: prio,
timestamp: Instant::now(),
});
}
}
},
_ => {
for path in event.paths {
debouncer.add_event(ProcessedEvent {
path,
old_path: None,
new_path: None,
kind: event.kind,
priority: prio,
timestamp: Instant::now(),
});
}
}
}
}
Err(e) => {
@@ -436,6 +594,21 @@ impl FileWatcher {
}
}
if let Some((old_p, ts)) = pending_remove.take() {
if Instant::now().duration_since(ts) > Duration::from_millis(500) {
debouncer.add_event(ProcessedEvent {
path: old_p.clone(),
old_path: None,
new_path: None,
kind: EventKind::Remove(RemoveKind::Any),
priority: EventPriority::Delete,
timestamp: ts,
});
} else {
pending_remove = Some((old_p, ts));
}
}
queue_size_clone.store(debouncer.len(), Ordering::SeqCst);
if debouncer.is_ready_to_flush() && debouncer.len() > 0 {
@@ -451,8 +624,32 @@ impl FileWatcher {
}
};
if let Some(db_mutex) = &*db_guard_option {
if let Ok(mut _db_instance_guard) = db_mutex.lock() {
if let Ok(mut db_inst) = db_mutex.lock() {
for event_item in &evts_to_process {
if let EventKind::Modify(ModifyKind::Name(_)) = event_item.kind {
if let (Some(ref old_p), Some(ref new_p)) =
(&event_item.old_path, &event_item.new_path)
{
let old_str = old_p.to_string_lossy();
let new_str = new_p.to_string_lossy();
let res = if new_p.is_dir() {
db::rename_directory(
db_inst.conn_mut(),
&old_str,
&new_str,
)
} else {
db::update_file_path(
db_inst.conn_mut(),
&old_str,
&new_str,
)
};
if let Err(e) = res {
eprintln!("DB rename error: {:?}", e);
}
}
}
info!(
"Processing event (DB available): {:?} for path {:?}",
event_item.kind, event_item.path
@@ -476,11 +673,38 @@ impl FileWatcher {
if debouncer.len() > 0 {
let final_evts = debouncer.flush();
events_processed_clone.fetch_add(final_evts.len(), Ordering::SeqCst);
for processed_event in final_evts {
info!(
"Processing final event: {:?} for path {:?}",
processed_event.kind, processed_event.path
);
if let Some(db_mutex) = &*db_captured_for_thread.lock().unwrap() {
if let Ok(mut db_inst) = db_mutex.lock() {
for processed_event in &final_evts {
if let EventKind::Modify(ModifyKind::Name(_)) = processed_event.kind {
if let (Some(ref old_p), Some(ref new_p)) =
(&processed_event.old_path, &processed_event.new_path)
{
let old_str = old_p.to_string_lossy();
let new_str = new_p.to_string_lossy();
let res = if new_p.is_dir() {
db::rename_directory(db_inst.conn_mut(), &old_str, &new_str)
} else {
db::update_file_path(db_inst.conn_mut(), &old_str, &new_str)
};
if let Err(e) = res {
eprintln!("DB rename error: {:?}", e);
}
}
}
info!(
"Processing final event: {:?} for path {:?}",
processed_event.kind, processed_event.path
);
}
}
} else {
for processed_event in final_evts {
info!(
"Processing final event: {:?} for path {:?}",
processed_event.kind, processed_event.path
);
}
}
}
if let Ok(mut final_state_guard) = state_clone.lock() {