diff --git a/.gitignore b/.gitignore
index 801fffd..1492465 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,7 +1,6 @@
 # === Rust build artifacts ===
 /target/
-/Cargo.lock
 
 # === IDE & Editor settings ===
diff --git a/Cargo.lock b/Cargo.lock
index fcb1a12..6cf492d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -79,12 +79,12 @@ dependencies = [
 
 [[package]]
 name = "anstyle-wincon"
-version = "3.0.7"
+version = "3.0.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e"
+checksum = "6680de5231bd6ee4c6191b8a1325daa282b415391ec9d3a37bd34f2060dc73fa"
 dependencies = [
  "anstyle",
- "once_cell",
+ "once_cell_polyfill",
  "windows-sys 0.59.0",
 ]
 
@@ -156,9 +156,9 @@ checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf"
 
 [[package]]
 name = "cc"
-version = "1.2.23"
+version = "1.2.24"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f4ac86a9e5bc1e2b3449ab9d7d3a6a405e3d1bb28d7b9be8614f55846ae3766"
+checksum = "16595d3be041c03b09d08d0858631facccee9221e579704070e6e9e4915d3bc7"
 dependencies = [
  "shlex",
 ]
 
@@ -470,12 +470,6 @@ version = "0.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2"
 
-[[package]]
-name = "hashbrown"
-version = "0.12.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
-
 [[package]]
 name = "hashbrown"
 version = "0.14.5"
@@ -530,16 +524,6 @@ dependencies = [
  "cc",
 ]
 
-[[package]]
-name = "indexmap"
-version = "1.9.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
-dependencies = [
- "autocfg",
- "hashbrown 0.12.3",
-]
-
 [[package]]
 name = "indexmap"
 version = "2.9.0"
@@ -635,8 +619,8 @@ dependencies = [
  "glob",
  "lazy_static",
  "notify",
- "priority-queue",
  "rusqlite",
+ "same-file",
  "serde_json",
  "sha2",
  "shellexpand",
@@ -820,6 +804,12 @@ version = "1.21.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
 
+[[package]]
+name = "once_cell_polyfill"
+version = "1.70.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad"
+
 [[package]]
 name = "option-ext"
 version = "0.2.0"
@@ -874,16 +864,6 @@ dependencies = [
  "termtree",
 ]
 
-[[package]]
-name = "priority-queue"
-version = "1.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a0bda9164fe05bc9225752d54aae413343c36f684380005398a6a8fde95fe785"
-dependencies = [
- "autocfg",
- "indexmap 1.9.3",
-]
-
 [[package]]
 name = "proc-macro2"
 version = "1.0.95"
@@ -1012,9 +992,9 @@ dependencies = [
 
 [[package]]
 name = "rustversion"
-version = "1.0.20"
+version = "1.0.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "eded382c5f5f786b989652c49544c4877d9f015cc22e145a5ea8ea66c2921cd2"
+checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d"
 
 [[package]]
 name = "ryu"
@@ -1069,7 +1049,7 @@ version = "0.9.34+deprecated"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47"
 dependencies = [
- "indexmap 2.9.0",
+ "indexmap",
  "itoa",
  "ryu",
  "serde",
diff --git a/cli-bin/docs/cli_cheatsheet.md b/cli-bin/docs/cli_cheatsheet.md
index 402fe7f..c259009 100644
--- a/cli-bin/docs/cli_cheatsheet.md
+++ b/cli-bin/docs/cli_cheatsheet.md
@@ -22,3 +22,6 @@
 | `event add` | — |
 | `event timeline` | — |
 | `backup run` | --dir, --prune, --verify, --file |
+| `watch start` | --debounce-ms |
+| `watch status` | — |
+| `watch stop` | — |
diff --git a/docs/adr/DP-001_schema_v1.1.md b/docs/adr/DP-001_schema_v1.1.md
index e8bf484..f5d865d 100644
--- a/docs/adr/DP-001_schema_v1.1.md
+++ b/docs/adr/DP-001_schema_v1.1.md
@@ -8,14 +8,14 @@
 We’ve landed a basic SQLite-backed `files` table and a contentless FTS5 index.
 Before we build out higher-level features, we need to lock down our **v1.1** metadata schema for:
 
-- **Hierarchical tags** (`tags` + `file_tags`) – optional `canonical_id` for aliases
+- **Hierarchical tags** (`tags` + `file_tags`) – alias resolution handled at query time
 - **Custom attributes** (`attributes`)
 - **File-to-file relationships** (`links`)
 - **Named collections** (`collections` + `collection_files`)
 - **Views** (`views`)
 
-Locking this schema now lets downstream CLI & GUI work against a stable model and ensures our migrations stay easy to reason about.
-Tags optionally reference a canonical tag via the `canonical_id` column.
+Locking this schema now lets downstream CLI & GUI work against a stable model and ensures our migrations stay easy to reason about.
+Alias relationships are resolved outside the table itself; there is no `canonical_id` column.
 
 ## 2. Decision
 
@@ -58,7 +58,6 @@ entity tags {
   --
   name : TEXT
   parent_id : INTEGER <<FK>>
-  canonical_id : INTEGER <<FK>>
 }
 
 entity file_tags {
@@ -151,6 +150,7 @@ Or in plain-ASCII:
 | **0003\_create\_links\_collections\_views.sql** | Add `links`, `collections`, `collection_files`, `views` |
 | **0004\_fix\_hierarchical\_tags\_fts.sql** | Recursive CTE for full tag-path indexing in FTS triggers |
 | **0005_add_dirty_table.sql** | Track modified files needing reindexing |
+| **0006_drop_tags_canonical_id.sql** | Remove legacy `canonical_id` column from `tags` |
 
 ### Performance-Critical Indexes
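DP-001 drops `canonical_id` but leaves the alias mechanism deliberately abstract. A minimal sketch of what "resolved at query time" could look like, assuming an admin-defined alias map; the map's storage (config file, separate table, etc.) is not specified by the ADR, so everything below is illustrative:

```rust
// Hypothetical illustration only — expand an aliased tag to its canonical
// name before the query touches the `tags` table. Nothing here is mandated
// by DP-001; the HashMap stands in for whatever the admin-defined mapping
// source ends up being.
use std::collections::HashMap;

fn resolve_tag<'a>(aliases: &'a HashMap<String, String>, tag: &'a str) -> &'a str {
    aliases.get(tag).map(String::as_str).unwrap_or(tag)
}

fn main() {
    let mut aliases = HashMap::new();
    aliases.insert("js".to_string(), "javascript".to_string());

    // Queries are rewritten against canonical names; the `tags` table itself
    // stays alias-free, consistent with dropping `canonical_id`.
    assert_eq!(resolve_tag(&aliases, "js"), "javascript");
    assert_eq!(resolve_tag(&aliases, "rust"), "rust");
}
```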
diff --git a/docs/roadmap.md b/docs/roadmap.md
index 9556e73..a76b52b 100644
--- a/docs/roadmap.md
+++ b/docs/roadmap.md
@@ -46,7 +46,7 @@
 | Tarpaulin coverage gate | S0 | — | – |
 | Watch mode (FS events) | Epic 1 | `marlin watch .` | DP‑002 |
 | Backup auto‑prune | Epic 1 | `backup --prune N` | – |
-| Rename/move tracking | Epic 2 | automatic path update | Spec‑RMH |
+| ~~Rename/move tracking~~ | Epic 2 | automatic path update | Spec‑RMH |
 | Dirty‑scan | Epic 2 | `scan --dirty` | DP‑002 |
 | Grep snippets | Phase 3 | `search -C3 …` | DP‑004 |
 | Hash / dedupe | Phase 4 | `scan --rehash` | DP‑005 |
@@ -75,7 +75,7 @@ Before a milestone is declared “shipped”:
 | - | ------------------------------ | ------ | ------------- |
 | ~~1~~ | ~~Crate split + CI baseline~~ | @alice | ~~26 May 25~~ |
 | ~~2~~ | ~~Tarpaulin + Hyperfine jobs~~ | @bob | ~~26 May 25~~ |
-| 3 | **DP‑001 Schema v1.1** draft | @carol | **30 May 25** |
+| ~~3~~ | ~~DP‑001 Schema v1.1 draft~~ | @carol | ~~30 May 25~~ |
 | ~~4~~ | ~~backup prune CLI + nightly job~~ | @dave | ~~05 Jun 25~~ |
 
 > *This roadmap now contains both product-level “what” and engineering-level “how/when/prove it”.
 > It should allow a new contributor to jump in, pick the matching DP, and know exactly the bar they must clear for their code to merge.*
diff --git a/docs/vision.md b/docs/vision.md
index 15de40f..bd5dbf6 100644
--- a/docs/vision.md
+++ b/docs/vision.md
@@ -8,7 +8,7 @@
 | Feature Area | Capabilities |
 | ----------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| **Tagging System** | • Unlimited, hierarchical or flat tags.<br>• Alias/synonym support with precedence rules (admin‑defined canonical name).<br>• **Bulk tag editing** via multi‑select context menu.<br>• Folder‑to‑Tag import with optional *watch & sync* mode so new sub‑folders inherit tags automatically. |
+| **Tagging System** | • Unlimited, hierarchical or flat tags.<br>• Alias/synonym support via admin‑defined mappings (canonical names resolved at query time).<br>• **Bulk tag editing** via multi‑select context menu.<br>• Folder‑to‑Tag import with optional *watch & sync* mode so new sub‑folders inherit tags automatically. |
 | **Custom Metadata Attributes** | • User‑defined fields (text, number, date, enum, boolean).<br>• Per‑template **Custom Metadata Schemas** (e.g. *Photo* → *Date, Location*). |
 | **File Relationships** | • Typed, directional or bidirectional links (*related to*, *duplicate of*, *cites*…).<br>• Plugin API can register new relationship sets. |
 | **Version Control for Metadata** | • Every change logged; unlimited roll‑back.<br>• Side‑by‑side diff viewer and *blame* panel showing *who/when/what*.<br>• Offline edits stored locally and merged (Git‑style optimistic merge with conflict prompts). |
@@ -36,7 +36,7 @@
 
 ```text
 files(id PK, path, inode, size, mtime, ctime, hash)
-tags(id PK, name, parent_id, canonical_id)
+tags(id PK, name, parent_id)
 file_tags(file_id FK, tag_id FK)
 attributes(id PK, file_id FK, key, value, value_type)
 relationships(id PK, src_file_id FK, dst_file_id FK, rel_type, direction)
diff --git a/libmarlin/Cargo.toml b/libmarlin/Cargo.toml
index af61a9b..a4ec06a 100644
--- a/libmarlin/Cargo.toml
+++ b/libmarlin/Cargo.toml
@@ -11,13 +11,13 @@ crossbeam-channel = "0.5"
 directories = "5"
 glob = "0.3"
 notify = "6.0"
-priority-queue = "1.3"
 rusqlite = { version = "0.31", features = ["bundled", "backup"] }
 sha2 = "0.10"
 tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
 walkdir = "2.5"
 shlex = "1.3"
+same-file = "1"
 shellexpand = "3.1"
 serde_json = { version = "1", optional = true }
diff --git a/libmarlin/src/db/migrations/0007_fix_rename_trigger.sql b/libmarlin/src/db/migrations/0007_fix_rename_trigger.sql
new file mode 100644
index 0000000..b1a772b
--- /dev/null
+++ b/libmarlin/src/db/migrations/0007_fix_rename_trigger.sql
@@ -0,0 +1,20 @@
+-- src/db/migrations/0007_fix_rename_trigger.sql
+PRAGMA foreign_keys = ON;
+PRAGMA journal_mode = WAL;
+
+-- Recreate files_fts_au_file trigger using INSERT OR REPLACE
+DROP TRIGGER IF EXISTS files_fts_au_file;
+CREATE TRIGGER files_fts_au_file
+AFTER UPDATE OF path ON files
+BEGIN
+  INSERT OR REPLACE INTO files_fts(rowid, path, tags_text, attrs_text)
+  SELECT NEW.id,
+         NEW.path,
+         (SELECT IFNULL(GROUP_CONCAT(t.name, ' '), '')
+            FROM file_tags ft
+            JOIN tags t ON ft.tag_id = t.id
+           WHERE ft.file_id = NEW.id),
+         (SELECT IFNULL(GROUP_CONCAT(a.key || '=' || a.value, ' '), '')
+            FROM attributes a
+           WHERE a.file_id = NEW.id);
+END;
diff --git a/libmarlin/src/db/mod.rs b/libmarlin/src/db/mod.rs
index 4ee4241..7edaf01 100644
--- a/libmarlin/src/db/mod.rs
+++ b/libmarlin/src/db/mod.rs
@@ -50,6 +50,10 @@ const MIGRATIONS: &[(&str, &str)] = &[
         "0006_drop_tags_canonical_id.sql",
         include_str!("migrations/0006_drop_tags_canonical_id.sql"),
     ),
+    (
+        "0007_fix_rename_trigger.sql",
+        include_str!("migrations/0007_fix_rename_trigger.sql"),
+    ),
 ];
 
 /* ─── schema helpers ─────────────────────────────────────────────── */
@@ -387,6 +391,39 @@ pub fn take_dirty(conn: &Connection) -> Result<Vec<i64>> {
     Ok(ids)
 }
 
+/* ─── rename helpers ────────────────────────────────────────────── */
+
+pub fn update_file_path(conn: &Connection, old_path: &str, new_path: &str) -> Result<()> {
+    let file_id: i64 = conn.query_row("SELECT id FROM files WHERE path = ?1", [old_path], |r| {
+        r.get(0)
+    })?;
+    conn.execute(
+        "UPDATE files SET path = ?1 WHERE id = ?2",
+        params![new_path, file_id],
+    )?;
+    mark_dirty(conn, file_id)?;
+    Ok(())
+}
+
+pub fn rename_directory(conn: &mut Connection, old_dir: &str, new_dir: &str) -> Result<()> {
+    let like_pattern = format!("{}/%", old_dir.trim_end_matches('/'));
+    let ids = {
+        let mut stmt = conn.prepare("SELECT id FROM files WHERE path LIKE ?1")?;
+        let rows = stmt.query_map([&like_pattern], |r| r.get::<_, i64>(0))?;
+        rows.collect::<Result<Vec<_>, _>>()?
+    };
+    let tx = conn.transaction()?;
+    tx.execute(
+        "UPDATE files SET path = REPLACE(path, ?1, ?2) WHERE path LIKE ?3",
+        params![old_dir, new_dir, like_pattern],
+    )?;
+    for fid in ids {
+        mark_dirty(&tx, fid)?;
+    }
+    tx.commit()?;
+    Ok(())
+}
+
 /* ─── backup / restore helpers ────────────────────────────────────── */
 
 pub fn backup<P: AsRef<Path>>(db_path: P) -> Result<PathBuf> {
diff --git a/libmarlin/src/watcher.rs b/libmarlin/src/watcher.rs
index ed9089b..cfcb2ca 100644
--- a/libmarlin/src/watcher.rs
+++ b/libmarlin/src/watcher.rs
@@ -1,15 +1,18 @@
-// libmarlin/src/watcher.rs
-
 //! File system watcher implementation for Marlin
 //!
-//! This module provides real-time index updates by monitoring file system events
-//! (create, modify, delete) using the `notify` crate. It implements event debouncing,
-//! batch processing, and a state machine for robust lifecycle management.
+//! This module provides real-time index updates by monitoring file-system
+//! events (create/modify/delete/rename) using the `notify` crate. It adds
+//! event debouncing, batch processing, and a small state machine so that the
+//! watcher can be paused, resumed, and shut down cleanly.
 
-use crate::db::Database;
-use anyhow::{Context, Result};
+use crate::db::{self, Database};
+use anyhow::{anyhow, Context, Result};
 use crossbeam_channel::{bounded, Receiver};
-use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcherTrait};
+use notify::{
+    event::{ModifyKind, RemoveKind, RenameMode},
+    Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcherTrait,
+};
+use same_file::Handle;
 use std::collections::HashMap;
 use std::path::PathBuf;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@@ -18,19 +21,12 @@ use std::thread::{self, JoinHandle};
 use std::time::{Duration, Instant};
 use tracing::info;
 
-/// Configuration for the file watcher
+// ────── configuration ───────────────────────────────────────────────────────
 #[derive(Debug, Clone)]
 pub struct WatcherConfig {
-    /// Time in milliseconds to debounce file events
     pub debounce_ms: u64,
-
-    /// Maximum number of events to process in a single batch
     pub batch_size: usize,
-
-    /// Maximum size of the event queue before applying backpressure
     pub max_queue_size: usize,
-
-    /// Time in milliseconds to wait for events to drain during shutdown
     pub drain_timeout_ms: u64,
 }
 
@@ -38,14 +34,14 @@ impl Default for WatcherConfig {
     fn default() -> Self {
         Self {
             debounce_ms: 100,
-            batch_size: 1000,
+            batch_size: 1_000,
             max_queue_size: 100_000,
-            drain_timeout_ms: 5000,
+            drain_timeout_ms: 5_000,
         }
     }
 }
 
-/// State of the file watcher
+// ────── public state & telemetry ────────────────────────────────────────────
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum WatcherState {
     Initializing,
@@ -55,7 +51,6 @@ pub enum WatcherState {
     Stopped,
 }
 
-/// Status information about the file watcher
 #[derive(Debug, Clone)]
 pub struct WatcherStatus {
     pub state: WatcherState,
@@ -65,6 +60,7 @@ pub struct WatcherStatus {
     pub watched_paths: Vec<PathBuf>,
 }
 
+// ────── internal bookkeeping ────────────────────────────────────────────────
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
 enum EventPriority {
     Create = 0,
@@ -76,6 +72,8 @@ enum EventPriority {
 #[derive(Debug, Clone)]
 struct ProcessedEvent {
     path: PathBuf,
+    old_path: Option<PathBuf>,
+    new_path: Option<PathBuf>,
     kind: EventKind,
     priority: EventPriority,
     timestamp: Instant,
@@ -87,6 +85,61 @@ struct EventDebouncer {
     last_flush: Instant,
 }
 
+#[derive(Default)]
+struct RemoveTracker {
+    map: HashMap<u64, (PathBuf, Instant)>,
+}
+
+impl RemoveTracker {
+    fn record(&mut self, path: &PathBuf) {
+        if let Ok(h) = Handle::from_path(path) {
+            self.map.insert(h.ino(), (path.clone(), Instant::now()));
+        } else {
+            // fall back to hashing path if inode not available
+            use std::collections::hash_map::DefaultHasher;
+            use std::hash::{Hash, Hasher};
+            let mut hasher = DefaultHasher::new();
+            path.hash(&mut hasher);
+            self.map
+                .insert(hasher.finish(), (path.clone(), Instant::now()));
+        }
+    }
+
+    fn match_create(&mut self, path: &PathBuf, window: Duration) -> Option<PathBuf> {
+        if let Ok(h) = Handle::from_path(path) {
+            if let Some((old, ts)) = self.map.remove(&h.ino()) {
+                if Instant::now().duration_since(ts) <= window {
+                    return Some(old);
+                } else {
+                    return None;
+                }
+            }
+        }
+        None
+    }
+
+    fn flush_expired(&mut self, window: Duration, debouncer: &mut EventDebouncer) {
+        let now = Instant::now();
+        let mut expired = Vec::new();
+        for (ino, (path, ts)) in &self.map {
+            if now.duration_since(*ts) > window {
+                debouncer.add_event(ProcessedEvent {
+                    path: path.clone(),
+                    old_path: None,
+                    new_path: None,
+                    kind: EventKind::Remove(RemoveKind::Any),
+                    priority: EventPriority::Delete,
+                    timestamp: *ts,
+                });
+                expired.push(*ino);
+            }
+        }
+        for ino in expired {
+            self.map.remove(&ino);
+        }
+    }
+}
+
 impl EventDebouncer {
     fn new(debounce_window_ms: u64) -> Self {
         Self {
@@ -98,22 +151,32 @@ impl EventDebouncer {
 
     fn add_event(&mut self, event: ProcessedEvent) {
         let path = event.path.clone();
+
+        // If we receive an event for a directory, purge any queued events under it
         if path.is_dir() {
-            // This relies on the PathBuf itself knowing if it's a directory
-            // or on the underlying FS. For unit tests, ensure paths are created.
             self.events
-                .retain(|file_path, _| !file_path.starts_with(&path) || file_path == &path);
+                .retain(|p, _| !p.starts_with(&path) || p == &path);
         }
-        match self.events.get_mut(&path) {
-            Some(existing) => {
+
+        use std::collections::hash_map::Entry;
+
+        match self.events.entry(path) {
+            Entry::Occupied(mut o) => {
+                let existing = o.get_mut();
                 if event.priority < existing.priority {
                     existing.priority = event.priority;
                 }
-                existing.timestamp = event.timestamp;
                 existing.kind = event.kind;
+                existing.timestamp = event.timestamp;
+                if let Some(old_p) = event.old_path {
+                    existing.old_path = Some(old_p);
+                }
+                if let Some(new_p) = event.new_path {
+                    existing.new_path = Some(new_p);
+                }
             }
-            None => {
-                self.events.insert(path, event);
+            Entry::Vacant(v) => {
+                v.insert(event);
             }
         }
     }
@@ -123,10 +186,10 @@ impl EventDebouncer {
     }
 
     fn flush(&mut self) -> Vec<ProcessedEvent> {
-        let mut events: Vec<ProcessedEvent> = self.events.drain().map(|(_, e)| e).collect();
-        events.sort_by_key(|e| e.priority);
+        let mut v: Vec<_> = self.events.drain().map(|(_, e)| e).collect();
+        v.sort_by_key(|e| e.priority);
         self.last_flush = Instant::now();
-        events
+        v
     }
 
     fn len(&self) -> usize {
@@ -134,11 +197,414 @@ impl EventDebouncer {
     }
 }
 
+// ────── main watcher struct ─────────────────────────────────────────────────
+pub struct FileWatcher {
+    state: Arc<Mutex<WatcherState>>,
+    _config: WatcherConfig,
+    watched_paths: Vec<PathBuf>,
+    _event_receiver: Receiver<notify::Result<Event>>,
+    _watcher: RecommendedWatcher,
+    processor_thread: Option<JoinHandle<()>>,
+    stop_flag: Arc<AtomicBool>,
+    events_processed: Arc<AtomicUsize>,
+    queue_size: Arc<AtomicUsize>,
+    start_time: Instant,
+    db_shared: Arc<Mutex<Option<Arc<Mutex<Database>>>>>,
+}
+
+impl FileWatcher {
+    pub fn new(paths: Vec<PathBuf>, config: WatcherConfig) -> Result<Self> {
+        // ── basic shared state/channels ──────────────────────────────────────
+        let stop_flag = Arc::new(AtomicBool::new(false));
+        let events_processed = Arc::new(AtomicUsize::new(0));
+        let queue_size = Arc::new(AtomicUsize::new(0));
+        let state = Arc::new(Mutex::new(WatcherState::Initializing));
+
+        let (tx, rx) = bounded(config.max_queue_size);
+
+        // ── start actual OS watcher ──────────────────────────────────────────
+        let event_tx = tx.clone();
+        let mut actual_watcher = RecommendedWatcher::new(
+            move |ev| {
+                let _ = event_tx.send(ev);
+            },
+            notify::Config::default(),
+        )?;
+
+        for p in &paths {
+            actual_watcher
+                .watch(p, RecursiveMode::Recursive)
+                .with_context(|| format!("Failed to watch path {}", p.display()))?;
+        }
+
+        // ── spawn processor thread ───────────────────────────────────────────
+        let config_clone = config.clone();
+        let stop_flag_clone = stop_flag.clone();
+        let events_processed_clone = events_processed.clone();
+        let queue_size_clone = queue_size.clone();
+        let state_clone = state.clone();
+        let receiver_clone = rx.clone();
+
+        let db_shared_for_thread: Arc<Mutex<Option<Arc<Mutex<Database>>>>> =
+            Arc::new(Mutex::new(None));
+        let db_for_thread = db_shared_for_thread.clone();
+
+        fn handle_db_update(
+            db_mutex: &Mutex<Database>,
+            old_s: &str,
+            new_s: &str,
+            is_dir: bool,
+        ) -> Result<()> {
+            let mut guard = db_mutex.lock().map_err(|_| anyhow!("db mutex poisoned"))?;
+            if is_dir {
+                db::rename_directory(guard.conn_mut(), old_s, new_s)?;
+            } else {
+                db::update_file_path(guard.conn_mut(), old_s, new_s)?;
+            }
+            Ok(())
+        }
+
+        let processor_thread = thread::spawn(move || {
+            let mut debouncer = EventDebouncer::new(config_clone.debounce_ms);
+            let mut rename_cache: HashMap<usize, PathBuf> = HashMap::new();
+            let mut remove_tracker = RemoveTracker::default();
+
+            while !stop_flag_clone.load(Ordering::Relaxed) {
+                // honour current state
+                let cur_state = {
+                    match state_clone.lock() {
+                        Ok(g) => g.clone(),
+                        Err(_) => break,
+                    }
+                };
+
+                match cur_state {
+                    WatcherState::Paused | WatcherState::Initializing => {
+                        thread::sleep(Duration::from_millis(100));
+                        continue;
+                    }
+                    WatcherState::ShuttingDown | WatcherState::Stopped => break,
+                    WatcherState::Watching => {} // normal path
+                }
+
+                // ── drain events (bounded by batch_size) ─────────────────────
+                let mut processed_in_batch = 0;
+                while let Ok(evt_res) = receiver_clone.try_recv() {
+                    processed_in_batch += 1;
+                    match evt_res {
+                        Ok(event) => {
+                            let prio = match event.kind {
+                                EventKind::Create(_) => EventPriority::Create,
+                                EventKind::Remove(_) => EventPriority::Delete,
+                                EventKind::Modify(_) => EventPriority::Modify,
+                                EventKind::Access(_) => EventPriority::Access,
+                                _ => EventPriority::Modify,
+                            };
+
+                            // ── per-event logic ──────────────────────────────
+                            match event.kind {
+                                // 1. remove-then-create → rename heuristic using inode
+                                EventKind::Remove(_) if event.paths.len() == 1 => {
+                                    remove_tracker.record(&event.paths[0]);
+                                }
+
+                                EventKind::Create(_) if event.paths.len() == 1 => {
+                                    if let Some(old_p) = remove_tracker
+                                        .match_create(&event.paths[0], Duration::from_millis(500))
+                                    {
+                                        let new_p = event.paths[0].clone();
+                                        debouncer.add_event(ProcessedEvent {
+                                            path: old_p.clone(),
+                                            old_path: Some(old_p),
+                                            new_path: Some(new_p),
+                                            kind: EventKind::Modify(ModifyKind::Name(
+                                                RenameMode::Both,
+                                            )),
+                                            priority: prio,
+                                            timestamp: Instant::now(),
+                                        });
+                                        continue;
+                                    }
+
+                                    for p in event.paths {
+                                        debouncer.add_event(ProcessedEvent {
+                                            path: p,
+                                            old_path: None,
+                                            new_path: None,
+                                            kind: event.kind,
+                                            priority: prio,
+                                            timestamp: Instant::now(),
+                                        });
+                                    }
+                                }
+
+                                // 2. native rename events from notify
+                                EventKind::Modify(ModifyKind::Name(name_kind)) => match name_kind {
+                                    // Notify >= 6 emits `Both` when both paths are
+                                    // supplied and `Any` as a catch-all for renames.
+                                    // Treat both cases as a complete rename.
+                                    RenameMode::Both | RenameMode::Any => {
+                                        if event.paths.len() >= 2 {
+                                            let old_p = event.paths[0].clone();
+                                            let new_p = event.paths[1].clone();
+                                            debouncer.add_event(ProcessedEvent {
+                                                path: old_p.clone(),
+                                                old_path: Some(old_p),
+                                                new_path: Some(new_p),
+                                                kind: EventKind::Modify(ModifyKind::Name(
+                                                    RenameMode::Both,
+                                                )),
+                                                priority: prio,
+                                                timestamp: Instant::now(),
+                                            });
+                                        }
+                                    }
+                                    RenameMode::From => {
+                                        if let (Some(trk), Some(p)) =
+                                            (event.tracker(), event.paths.first())
+                                        {
+                                            rename_cache.insert(trk, p.clone());
+                                        }
+                                        for p in event.paths {
+                                            debouncer.add_event(ProcessedEvent {
+                                                path: p,
+                                                old_path: None,
+                                                new_path: None,
+                                                kind: event.kind,
+                                                priority: prio,
+                                                timestamp: Instant::now(),
+                                            });
+                                        }
+                                    }
+                                    RenameMode::To => {
+                                        if let (Some(trk), Some(new_p)) =
+                                            (event.tracker(), event.paths.first())
+                                        {
+                                            if let Some(old_p) = rename_cache.remove(&trk) {
+                                                debouncer.add_event(ProcessedEvent {
+                                                    path: old_p.clone(),
+                                                    old_path: Some(old_p),
+                                                    new_path: Some(new_p.clone()),
+                                                    kind: EventKind::Modify(ModifyKind::Name(
+                                                        RenameMode::Both,
+                                                    )),
+                                                    priority: prio,
+                                                    timestamp: Instant::now(),
+                                                });
+                                                continue;
+                                            }
+                                        }
+                                        for p in event.paths {
+                                            debouncer.add_event(ProcessedEvent {
+                                                path: p,
+                                                old_path: None,
+                                                new_path: None,
+                                                kind: event.kind,
+                                                priority: prio,
+                                                timestamp: Instant::now(),
+                                            });
+                                        }
+                                    }
+                                    // `Both`/`Any`/`From`/`To` are handled above.
+                                    // The only remaining value (`Other`) is
+                                    // treated as a normal modify event.
+                                    _ => {
+                                        for p in event.paths {
+                                            debouncer.add_event(ProcessedEvent {
+                                                path: p,
+                                                old_path: None,
+                                                new_path: None,
+                                                kind: event.kind,
+                                                priority: prio,
+                                                timestamp: Instant::now(),
+                                            });
+                                        }
+                                    }
+                                },
+
+                                // 3. everything else
+                                _ => {
+                                    for p in event.paths {
+                                        debouncer.add_event(ProcessedEvent {
+                                            path: p,
+                                            old_path: None,
+                                            new_path: None,
+                                            kind: event.kind,
+                                            priority: prio,
+                                            timestamp: Instant::now(),
+                                        });
+                                    }
+                                }
+                            } // end match event.kind
+                        } // <--- closes Ok(event)
+                        Err(e) => eprintln!("watcher channel error: {:?}", e),
+                    }
+
+                    if processed_in_batch >= config_clone.batch_size {
+                        break;
+                    }
+                }
+
+                // deal with orphaned removes
+                remove_tracker.flush_expired(Duration::from_millis(500), &mut debouncer);
+
+                queue_size_clone.store(debouncer.len(), Ordering::SeqCst);
+
+                // flush if ready
+                if debouncer.is_ready_to_flush() && debouncer.len() > 0 {
+                    let to_process = debouncer.flush();
+                    events_processed_clone.fetch_add(to_process.len(), Ordering::SeqCst);
+
+                    let maybe_db = db_for_thread.lock().ok().and_then(|g| g.clone());
+
+                    for ev in &to_process {
+                        if let Some(db_mutex) = &maybe_db {
+                            // update DB for renames
+                            if let EventKind::Modify(ModifyKind::Name(_)) = ev.kind {
+                                if let (Some(old_p), Some(new_p)) = (&ev.old_path, &ev.new_path) {
+                                    let old_s = old_p.to_string_lossy();
+                                    let new_s = new_p.to_string_lossy();
+                                    let res =
+                                        handle_db_update(db_mutex, &old_s, &new_s, new_p.is_dir());
+                                    if let Err(e) = res {
+                                        eprintln!("DB rename error: {:?}", e);
+                                    }
+                                }
+                            }
+                            info!("processed (DB) {:?} {:?}", ev.kind, ev.path);
+                        } else {
+                            info!("processed {:?} {:?}", ev.kind, ev.path);
+                        }
+                    }
+                }
+
+                thread::sleep(Duration::from_millis(50));
+            } // main loop
+
+            // final flush on shutdown
+            remove_tracker.flush_expired(Duration::from_millis(500), &mut debouncer);
+            if debouncer.len() > 0 {
+                let final_evts = debouncer.flush();
+                events_processed_clone.fetch_add(final_evts.len(), Ordering::SeqCst);
+                for ev in &final_evts {
+                    info!("processing final event {:?} {:?}", ev.kind, ev.path);
+                }
+            }
+
+            if let Ok(mut g) = state_clone.lock() {
+                *g = WatcherState::Stopped;
+            }
+        });
+
+        // ── return constructed watcher ───────────────────────────────────────
+        Ok(Self {
+            state,
+            _config: config,
+            watched_paths: paths,
+            _event_receiver: rx,
+            _watcher: actual_watcher,
+            processor_thread: Some(processor_thread),
+            stop_flag,
+            events_processed,
+            queue_size,
+            start_time: Instant::now(),
+            db_shared: db_shared_for_thread,
+        })
+    }
+
+    // ── public API ───────────────────────────────────────────────────────────
+    pub fn with_database(&mut self, db: Arc<Mutex<Database>>) -> Result<&mut Self> {
+        *self
+            .db_shared
+            .lock()
+            .map_err(|_| anyhow::anyhow!("db mutex poisoned"))? = Some(db);
+        Ok(self)
+    }
+
+    pub fn start(&mut self) -> Result<()> {
+        let mut g = self.state.lock().map_err(|_| anyhow::anyhow!("state"))?;
+        match *g {
+            WatcherState::Initializing | WatcherState::Paused => {
+                *g = WatcherState::Watching;
+                Ok(())
+            }
+            WatcherState::Watching => Ok(()), // idempotent
+            _ => Err(anyhow::anyhow!("cannot start from {:?}", *g)),
+        }
+    }
+
+    pub fn pause(&mut self) -> Result<()> {
+        let mut g = self.state.lock().map_err(|_| anyhow::anyhow!("state"))?;
+        match *g {
+            WatcherState::Watching => {
+                *g = WatcherState::Paused;
+                Ok(())
+            }
+            WatcherState::Paused => Ok(()),
+            _ => Err(anyhow::anyhow!("cannot pause from {:?}", *g)),
+        }
+    }
+
+    pub fn resume(&mut self) -> Result<()> {
+        let mut g = self.state.lock().map_err(|_| anyhow::anyhow!("state"))?;
+        match *g {
+            WatcherState::Paused => {
+                *g = WatcherState::Watching;
+                Ok(())
+            }
+            WatcherState::Watching => Ok(()),
+            _ => Err(anyhow::anyhow!("cannot resume from {:?}", *g)),
+        }
+    }
+
+    pub fn stop(&mut self) -> Result<()> {
+        {
+            let mut g = self.state.lock().map_err(|_| anyhow::anyhow!("state"))?;
+            if matches!(*g, WatcherState::Stopped | WatcherState::ShuttingDown) {
+                return Ok(());
+            }
+            *g = WatcherState::ShuttingDown;
+        }
+
+        self.stop_flag.store(true, Ordering::SeqCst);
+
+        if let Some(h) = self.processor_thread.take() {
+            let _ = h.join();
+        }
+
+        *self.state.lock().map_err(|_| anyhow::anyhow!("state"))? = WatcherState::Stopped;
+        Ok(())
+    }
+
+    pub fn status(&self) -> Result<WatcherStatus> {
+        let st = self
+            .state
+            .lock()
+            .map_err(|_| anyhow::anyhow!("state"))?
+            .clone();
+        Ok(WatcherStatus {
+            state: st,
+            events_processed: self.events_processed.load(Ordering::SeqCst),
+            queue_size: self.queue_size.load(Ordering::SeqCst),
+            start_time: Some(self.start_time),
+            watched_paths: self.watched_paths.clone(),
+        })
+    }
+}
+
+impl Drop for FileWatcher {
+    fn drop(&mut self) {
+        let _ = self.stop(); // ignore errors during drop
+    }
+}
+
+// ────── tests ────────────────────────────────────────────────────────────────
 #[cfg(test)]
 mod event_debouncer_tests {
     use super::*;
     use notify::event::{CreateKind, DataChange, ModifyKind, RemoveKind, RenameMode};
-    use std::fs; // fs is needed for these tests to create dirs/files
+    use std::fs;
     use tempfile;
 
@@ -151,6 +617,8 @@ mod event_debouncer_tests {
         let path1 = PathBuf::from("file1.txt");
         debouncer.add_event(ProcessedEvent {
             path: path1.clone(),
+            old_path: None,
+            new_path: None,
             kind: EventKind::Create(CreateKind::File),
             priority: EventPriority::Create,
             timestamp: Instant::now(),
@@ -178,6 +646,8 @@ mod event_debouncer_tests {
         let t1 = Instant::now();
         debouncer.add_event(ProcessedEvent {
             path: path1.clone(),
+            old_path: None,
+            new_path: None,
             kind: EventKind::Create(CreateKind::File),
             priority: EventPriority::Create,
             timestamp: t1,
@@ -186,6 +656,8 @@ mod event_debouncer_tests {
         let t2 = Instant::now();
         debouncer.add_event(ProcessedEvent {
             path: path1.clone(),
+            old_path: None,
+            new_path: None,
             kind: EventKind::Modify(ModifyKind::Data(DataChange::Any)),
             priority: EventPriority::Modify,
             timestamp: t2,
@@ -216,6 +688,8 @@ mod event_debouncer_tests {
 
         debouncer_h.add_event(ProcessedEvent {
             path: p_file.clone(),
+            old_path: None,
+            new_path: None,
             kind: EventKind::Create(CreateKind::File),
             priority: EventPriority::Create,
             timestamp: Instant::now(),
@@ -224,6 +698,8 @@ mod event_debouncer_tests {
 
         debouncer_h.add_event(ProcessedEvent {
             path: p_dir.clone(),
+            old_path: None,
+            new_path: None,
             kind: EventKind::Remove(RemoveKind::Folder),
             priority: EventPriority::Delete,
             timestamp: Instant::now(),
@@ -248,12 +724,16 @@ mod event_debouncer_tests {
 
         debouncer.add_event(ProcessedEvent {
             path: path1.clone(),
+            old_path: None,
+            new_path: None,
             kind: EventKind::Create(CreateKind::File),
             priority: EventPriority::Create,
             timestamp: Instant::now(),
         });
         debouncer.add_event(ProcessedEvent {
             path: path2.clone(),
+            old_path: None,
+            new_path: None,
             kind: EventKind::Create(CreateKind::File),
             priority: EventPriority::Create,
             timestamp: Instant::now(),
@@ -273,18 +753,24 @@ mod event_debouncer_tests {
 
         debouncer.add_event(ProcessedEvent {
             path: path1,
+            old_path: None,
+            new_path: None,
             kind: EventKind::Modify(ModifyKind::Name(RenameMode::To)),
             priority: EventPriority::Modify,
             timestamp: Instant::now(),
         });
         debouncer.add_event(ProcessedEvent {
             path: path2,
+            old_path: None,
+            new_path: None,
             kind: EventKind::Create(CreateKind::File),
             priority: EventPriority::Create,
             timestamp: Instant::now(),
         });
         debouncer.add_event(ProcessedEvent {
             path: path3,
+            old_path: None,
+            new_path: None,
             kind: EventKind::Remove(RemoveKind::File),
             priority: EventPriority::Delete,
             timestamp: Instant::now(),
@@ -316,12 +802,16 @@ mod event_debouncer_tests {
 
         debouncer.add_event(ProcessedEvent {
             path: dir.clone(),
+            old_path: None,
+            new_path: None,
             kind: EventKind::Create(CreateKind::Folder),
             priority: EventPriority::Create,
             timestamp: Instant::now(),
         });
         debouncer.add_event(ProcessedEvent {
             path: file,
+            old_path: None,
+            new_path: None,
             kind: EventKind::Create(CreateKind::File),
             priority: EventPriority::Create,
             timestamp: Instant::now(),
@@ -333,306 +823,44 @@ mod event_debouncer_tests {
 
         assert_eq!(flushed.len(), 2);
         assert!(flushed.iter().any(|e| e.path == dir));
     }
-}
 
-pub struct FileWatcher {
-    state: Arc<Mutex<WatcherState>>,
-    _config: WatcherConfig,
-    watched_paths: Vec<PathBuf>,
-    _event_receiver: Receiver<notify::Result<Event>>,
-    _watcher: RecommendedWatcher,
-    processor_thread: Option<JoinHandle<()>>,
-    stop_flag: Arc<AtomicBool>,
-    events_processed: Arc<AtomicUsize>,
-    queue_size: Arc<AtomicUsize>,
-    start_time: Instant,
-    db_shared: Arc<Mutex<Option<Arc<Mutex<Database>>>>>,
-}
+    #[test]
+    fn remove_create_same_inode_produces_rename() {
+        let tmp = tempfile::tempdir().unwrap();
+        let old_p = tmp.path().join("old.txt");
+        std::fs::write(&old_p, b"hi").unwrap();
 
-impl FileWatcher {
-    pub fn new(paths: Vec<PathBuf>, config: WatcherConfig) -> Result<Self> {
-        let stop_flag = Arc::new(AtomicBool::new(false));
-        let events_processed = Arc::new(AtomicUsize::new(0));
-        let queue_size = Arc::new(AtomicUsize::new(0));
-        let state = Arc::new(Mutex::new(WatcherState::Initializing));
+        let mut debouncer = EventDebouncer::new(100);
+        let mut tracker = RemoveTracker::default();
 
-        let (tx, rx) = bounded(config.max_queue_size);
+        tracker.record(&old_p);
 
-        let event_tx = tx.clone();
-        let mut actual_watcher = RecommendedWatcher::new(
-            move |event_res: std::result::Result<Event, notify::Error>| {
-                if event_tx.send(event_res).is_err() {
-                    // Receiver dropped
-                }
-            },
-            notify::Config::default(),
-        )?;
+        let new_p = tmp.path().join("new.txt");
+        std::fs::rename(&old_p, &new_p).unwrap();
 
-        for path_to_watch in &paths {
-            actual_watcher
-                .watch(path_to_watch, RecursiveMode::Recursive)
-                .with_context(|| format!("Failed to watch path: {}", path_to_watch.display()))?;
+        if let Some(orig) = tracker.match_create(&new_p, Duration::from_millis(500)) {
+            debouncer.add_event(ProcessedEvent {
+                path: orig.clone(),
+                old_path: Some(orig),
+                new_path: Some(new_p.clone()),
+                kind: EventKind::Modify(ModifyKind::Name(RenameMode::Both)),
+                priority: EventPriority::Modify,
+                timestamp: Instant::now(),
+            });
         }
 
-        let config_clone = config.clone();
-        let stop_flag_clone = stop_flag.clone();
-        let events_processed_clone = events_processed.clone();
-        let queue_size_clone = queue_size.clone();
-        let state_clone = state.clone();
-        let receiver_clone = rx.clone();
-
-        let db_shared_for_thread = Arc::new(Mutex::new(None::<Arc<Mutex<Database>>>));
-        let db_captured_for_thread = db_shared_for_thread.clone();
-
-        let processor_thread = thread::spawn(move || {
-            let mut debouncer = EventDebouncer::new(config_clone.debounce_ms);
-
-            while !stop_flag_clone.load(Ordering::Relaxed) {
-                let current_state = match state_clone.lock() {
-                    Ok(g) => g.clone(),
-                    Err(_) => {
-                        eprintln!("state mutex poisoned");
-                        break;
-                    }
-                };
-
-                if current_state == WatcherState::Paused {
-                    thread::sleep(Duration::from_millis(100));
-                    continue;
-                }
-                if current_state == WatcherState::ShuttingDown
-                    || current_state == WatcherState::Stopped
-                {
-                    break;
-                }
-
-                let mut received_in_batch = 0;
-                while let Ok(evt_res) = receiver_clone.try_recv() {
-                    received_in_batch += 1;
-                    match evt_res {
-                        Ok(event) => {
-                            for path in event.paths {
-                                let prio = match event.kind {
-                                    EventKind::Create(_) => EventPriority::Create,
-                                    EventKind::Remove(_) => EventPriority::Delete,
-                                    EventKind::Modify(_) => EventPriority::Modify,
-                                    EventKind::Access(_) => EventPriority::Access,
-                                    _ => EventPriority::Modify,
-                                };
-                                debouncer.add_event(ProcessedEvent {
-                                    path,
-                                    kind: event.kind,
-                                    priority: prio,
-                                    timestamp: Instant::now(),
-                                });
-                            }
-                        }
-                        Err(e) => {
-                            eprintln!("Watcher channel error: {:?}", e);
-                        }
-                    }
-                    if received_in_batch >= config_clone.batch_size {
-                        break;
-                    }
-                }
-
-                queue_size_clone.store(debouncer.len(), Ordering::SeqCst);
-
-                if debouncer.is_ready_to_flush() && debouncer.len() > 0 {
-                    let evts_to_process = debouncer.flush();
-                    let num_evts = evts_to_process.len();
-                    events_processed_clone.fetch_add(num_evts, Ordering::SeqCst);
-
-                    let db_guard_option = match db_captured_for_thread.lock() {
-                        Ok(g) => g,
-                        Err(_) => {
-                            eprintln!("db_shared mutex poisoned");
-                            break;
-                        }
-                    };
-                    if let Some(db_mutex) = &*db_guard_option {
-                        if let Ok(mut _db_instance_guard) = db_mutex.lock() {
-                            for event_item in &evts_to_process {
-                                info!(
-                                    "Processing event (DB available): {:?} for path {:?}",
-                                    event_item.kind, event_item.path
-                                );
-                            }
-                        } else {
-                            eprintln!("db mutex poisoned");
-                        }
-                    } else {
-                        for event_item in &evts_to_process {
-                            info!(
-                                "Processing event (no DB): {:?} for path {:?}",
-                                event_item.kind, event_item.path
-                            );
-                        }
-                    }
-                }
-                thread::sleep(Duration::from_millis(50));
-            }
-
-            if debouncer.len() > 0 {
-                let final_evts = debouncer.flush();
-                events_processed_clone.fetch_add(final_evts.len(), Ordering::SeqCst);
-                for processed_event in final_evts {
-                    info!(
-                        "Processing final event: {:?} for path {:?}",
-                        processed_event.kind, processed_event.path
-                    );
-                }
-            }
-            if let Ok(mut final_state_guard) = state_clone.lock() {
-                *final_state_guard = WatcherState::Stopped;
-            } else {
-                eprintln!("state mutex poisoned on shutdown");
-            }
-        });
-
-        Ok(Self {
-            state,
-            _config: config,
-            watched_paths: paths,
-            _event_receiver: rx,
-            _watcher: actual_watcher,
-            processor_thread: Some(processor_thread),
-            stop_flag,
-            events_processed,
-            queue_size,
-            start_time: Instant::now(),
-            db_shared: db_shared_for_thread,
-        })
-    }
-
-    pub fn with_database(&mut self, db_arc: Arc<Mutex<Database>>) -> Result<&mut Self> {
-        {
-            let mut shared_db_guard = self
-                .db_shared
-                .lock()
-                .map_err(|_| anyhow::anyhow!("db_shared mutex poisoned"))?;
-            *shared_db_guard = Some(db_arc);
-        }
-        Ok(self)
-    }
-
-    pub fn start(&mut self) -> Result<()> {
-        let mut state_guard = self
-            .state
-            .lock()
-            .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;
-        if *state_guard == WatcherState::Watching || self.processor_thread.is_none() {
-            if self.processor_thread.is_none() {
-                return Err(anyhow::anyhow!("Watcher thread not available to start."));
-            }
-            if *state_guard == WatcherState::Initializing {
-                *state_guard = WatcherState::Watching;
-            }
-            return Ok(());
-        }
-        if *state_guard != WatcherState::Initializing
-            && *state_guard != WatcherState::Stopped
-            && *state_guard != WatcherState::Paused
-        {
-            return Err(anyhow::anyhow!(format!(
-                "Cannot start watcher from state {:?}",
-                *state_guard
-            )));
-        }
-
-        *state_guard = WatcherState::Watching;
-        Ok(())
-    }
-
-    pub fn pause(&mut self) -> Result<()> {
-        let mut state_guard = self
-            .state
-            .lock()
-            .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;
-        match *state_guard {
-            WatcherState::Watching => {
-                *state_guard = WatcherState::Paused;
-                Ok(())
-            }
-            WatcherState::Paused => Ok(()),
-            _ => Err(anyhow::anyhow!(format!(
-                "Watcher not in watching state to pause (current: {:?})",
-                *state_guard
-            ))),
-        }
-    }
-
-    pub fn resume(&mut self) -> Result<()> {
-        let mut state_guard = self
-            .state
-            .lock()
-            .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;
-        match *state_guard {
-            WatcherState::Paused => {
-                *state_guard = WatcherState::Watching;
-                Ok(())
-            }
-            WatcherState::Watching => Ok(()),
-            _ => Err(anyhow::anyhow!(format!(
-                "Watcher not in paused state to resume (current: {:?})",
-                *state_guard
-            ))),
-        }
-    }
-
-    pub fn stop(&mut self) -> Result<()> {
-        let mut current_state_guard = self
-            .state
-            .lock()
-            .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;
-        if *current_state_guard == WatcherState::Stopped
-            || *current_state_guard == WatcherState::ShuttingDown
-        {
-            return Ok(());
-        }
-        *current_state_guard = WatcherState::ShuttingDown;
-        drop(current_state_guard);
-
-        self.stop_flag.store(true, Ordering::SeqCst);
-
-        if let Some(handle) = self.processor_thread.take() {
-            match handle.join() {
-                Ok(_) => { /* Thread joined cleanly */ }
-                Err(join_err) => {
-                    eprintln!("Watcher processor thread panicked: {:?}", join_err);
-                }
-            }
-        }
-
-        let mut final_state_guard = self
-            .state
-            .lock()
-            .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;
-        *final_state_guard = WatcherState::Stopped;
-        Ok(())
-    }
-
-    pub fn status(&self) -> Result<WatcherStatus> {
-        let state_guard = self
-            .state
-            .lock()
-            .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?
-            .clone();
-        Ok(WatcherStatus {
-            state: state_guard,
-            events_processed: self.events_processed.load(Ordering::SeqCst),
-            queue_size: self.queue_size.load(Ordering::SeqCst),
-            start_time: Some(self.start_time),
-            watched_paths: self.watched_paths.clone(),
-        })
-    }
-}
-
-impl Drop for FileWatcher {
-    fn drop(&mut self) {
-        if let Err(e) = self.stop() {
-            eprintln!("Error stopping watcher in Drop: {:?}", e);
-        }
+        tracker.flush_expired(Duration::from_millis(500), &mut debouncer);
+        let flushed = debouncer.flush();
+        assert_eq!(flushed.len(), 1);
+        assert_eq!(
+            flushed[0].kind,
+            EventKind::Modify(ModifyKind::Name(RenameMode::Both))
+        );
+        assert_eq!(
+            flushed[0].old_path.as_ref().unwrap(),
+            &tmp.path().join("old.txt")
+        );
+        assert_eq!(flushed[0].new_path.as_ref().unwrap(), &new_p);
     }
 }
 
@@ -640,7 +868,7 @@ impl Drop for FileWatcher {
 mod file_watcher_state_tests {
     use super::*;
     use std::fs as FsMod;
-    use tempfile::tempdir; // Alias to avoid conflict with local `fs` module name if any
+    use tempfile::tempdir;
 
     #[test]
     fn test_watcher_pause_resume_stop() {
@@ -649,7 +877,6 @@ mod file_watcher_state_tests {
         FsMod::create_dir_all(&watch_path).expect("Failed to create temp dir for watching");
 
         let config = WatcherConfig::default();
-
         let mut watcher =
             FileWatcher::new(vec![watch_path], config).expect("Failed to create watcher");
 
@@ -684,66 +911,58 @@ mod file_watcher_state_tests {
         let mut watcher =
             FileWatcher::new(vec![tmp_dir.path().to_path_buf()], WatcherConfig::default()).unwrap();
 
+        // already watching
         {
-            let mut state_guard = watcher.state.lock().expect("state mutex poisoned");
-            *state_guard = WatcherState::Watching;
+            let mut g = watcher.state.lock().unwrap();
+            *g = WatcherState::Watching;
         }
-        assert!(
-            watcher.start().is_ok(),
-            "Should be able to call start when already Watching (idempotent state change)"
-        );
+        assert!(watcher.start().is_ok());
         assert_eq!(watcher.status().unwrap().state, WatcherState::Watching);
 
+        // invalid transition
        {
-            let mut state_guard = watcher.state.lock().expect("state mutex poisoned");
-            *state_guard = WatcherState::ShuttingDown;
+            let mut g = watcher.state.lock().unwrap();
+            *g = WatcherState::ShuttingDown;
         }
-        assert!(
-            watcher.start().is_err(),
-            "Should not be able to start from ShuttingDown"
-        );
+        assert!(watcher.start().is_err());
     }
 
     #[test]
     fn test_new_watcher_with_nonexistent_path() {
-        let non_existent_path =
+        let bogus =
             PathBuf::from("/path/that/REALLY/does/not/exist/for/sure/and/cannot/be/created");
-        let config = WatcherConfig::default();
-        let watcher_result = FileWatcher::new(vec![non_existent_path], config);
-        assert!(watcher_result.is_err());
-        if let Err(e) = watcher_result {
-            let err_string = e.to_string();
+        let res = FileWatcher::new(vec![bogus], WatcherConfig::default());
+        assert!(res.is_err());
+        if let Err(e) = res {
+            let msg = e.to_string();
             assert!(
-                err_string.contains("Failed to watch path") || err_string.contains("os error 2"),
-                "Error was: {}",
-                err_string
+                msg.contains("Failed to watch path") || msg.contains("os error 2"),
+                "got: {msg}"
             );
         }
     }
 
     #[test]
     fn test_watcher_default_config() {
-        let config = WatcherConfig::default();
-        assert_eq!(config.debounce_ms, 100);
-        assert_eq!(config.batch_size, 1000);
-        assert_eq!(config.max_queue_size, 100_000);
-        assert_eq!(config.drain_timeout_ms, 5000);
+        let cfg = WatcherConfig::default();
+        assert_eq!(cfg.debounce_ms, 100);
+        assert_eq!(cfg.batch_size, 1_000);
+        assert_eq!(cfg.max_queue_size, 100_000);
+        assert_eq!(cfg.drain_timeout_ms, 5_000);
     }
 
     #[test]
     fn test_poisoned_state_mutex_errors() {
         let tmp_dir = tempdir().unwrap();
         let watch_path = tmp_dir.path().to_path_buf();
-        FsMod::create_dir_all(&watch_path).expect("Failed to create temp dir for watching");
-
-        let config = WatcherConfig::default();
+        FsMod::create_dir_all(&watch_path).unwrap();
 
         let mut watcher =
-            FileWatcher::new(vec![watch_path], config).expect("Failed to create watcher");
+            FileWatcher::new(vec![watch_path], WatcherConfig::default()).expect("create");
 
         let state_arc = watcher.state.clone();
         let _ = std::thread::spawn(move || {
-            let _guard = state_arc.lock().unwrap();
+            let _g = state_arc.lock().unwrap();
             panic!("poison");
         })
         .join();
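Taken together, the new watcher API is meant to be driven roughly as follows. A minimal sketch, not part of the diff; it assumes `libmarlin::db::open` returns a `Database` (the tests below alias `crate::db::open`) and that the `watcher` module is publicly exported:

```rust
// Illustrative only — wiring a FileWatcher to the index DB.
use std::path::PathBuf;
use std::sync::{Arc, Mutex};

use libmarlin::db;
use libmarlin::watcher::{FileWatcher, WatcherConfig};

fn run() -> anyhow::Result<()> {
    let db = db::open("marlin.db")?; // hypothetical DB path/signature
    let mut watcher = FileWatcher::new(vec![PathBuf::from(".")], WatcherConfig::default())?;
    watcher.with_database(Arc::new(Mutex::new(db)))?; // renames now reach the DB
    watcher.start()?;

    // ... the processor thread debounces events and applies renames ...

    let status = watcher.status()?;
    println!("events processed: {}", status.events_processed);
    watcher.stop()?; // also joins the processor thread
    Ok(())
}
```

Without `with_database`, the watcher still debounces and logs events; the DB update path is only taken for rename events once a database has been attached.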
diff --git a/libmarlin/src/watcher_tests.rs b/libmarlin/src/watcher_tests.rs
index 53ceb82..c15c7eb 100644
--- a/libmarlin/src/watcher_tests.rs
+++ b/libmarlin/src/watcher_tests.rs
@@ -7,14 +7,46 @@ mod tests {
     // These are still from the watcher module
     use crate::db::open as open_marlin_db;
     use crate::watcher::{FileWatcher, WatcherConfig, WatcherState}; // Use your project's DB open function
+    use crate::Marlin;
     use std::fs::{self, File};
     use std::io::Write;
     // No longer need: use std::path::PathBuf;
     use std::thread;
-    use std::time::Duration;
+    use std::time::{Duration, Instant};
     use tempfile::tempdir;
 
+    /// Polls the DB until the row count for `path` reaches `expected` or the timeout elapses.
+    fn wait_for_row_count(
+        marlin: &Marlin,
+        path: &std::path::Path,
+        expected: i64,
+        timeout: Duration,
+    ) {
+        let start = Instant::now();
+        loop {
+            let count: i64 = marlin
+                .conn()
+                .query_row(
+                    "SELECT COUNT(*) FROM files WHERE path = ?1",
+                    [path.to_string_lossy()],
+                    |r| r.get(0),
+                )
+                .unwrap();
+            if count == expected {
+                break;
+            }
+            if start.elapsed() > timeout {
+                panic!(
+                    "Timed out waiting for {} rows for {}",
+                    expected,
+                    path.display()
+                );
+            }
+            thread::sleep(Duration::from_millis(50));
+        }
+    }
+
     #[test]
     fn test_watcher_lifecycle() {
         // Create a temp directory for testing
@@ -60,7 +92,7 @@ mod tests {
         thread::sleep(Duration::from_millis(200));
 
         fs::remove_file(&new_file_path).expect("Failed to remove file");
-        thread::sleep(Duration::from_millis(500));
+        thread::sleep(Duration::from_millis(1500));
 
         watcher.stop().expect("Failed to stop watcher");
         assert_eq!(watcher.status().unwrap().state, WatcherState::Stopped);
@@ -144,4 +176,97 @@ mod tests {
             );
         }
     }
+
+    #[test]
+    fn rename_file_updates_db() {
+        let tmp = tempdir().unwrap();
+        let dir = tmp.path();
+        let file = dir.join("a.txt");
+        fs::write(&file, b"hi").unwrap();
+        let db_path = dir.join("test.db");
+        let mut marlin = Marlin::open_at(&db_path).unwrap();
+        marlin.scan(&[dir]).unwrap();
+
+        let mut watcher = marlin
+            .watch(
+                dir,
+                Some(WatcherConfig {
+                    debounce_ms: 50,
+                    ..Default::default()
+                }),
+            )
+            .unwrap();
+
+        thread::sleep(Duration::from_millis(100));
+        let new_file = dir.join("b.txt");
+        fs::rename(&file, &new_file).unwrap();
+        wait_for_row_count(&marlin, &new_file, 1, Duration::from_secs(10));
+        watcher.stop().unwrap();
+        assert!(
+            watcher.status().unwrap().events_processed > 0,
+            "rename event should be processed"
+        );
+
+        let count: i64 = marlin
+            .conn()
+            .query_row(
+                "SELECT COUNT(*) FROM files WHERE path = ?1",
+                [new_file.to_string_lossy()],
+                |r| r.get(0),
+            )
+            .unwrap();
+        assert_eq!(count, 1);
+    }
+
+    #[test]
+    fn rename_directory_updates_children() {
+        let tmp = tempdir().unwrap();
+        let dir = tmp.path();
+        let sub = dir.join("old");
+        fs::create_dir(&sub).unwrap();
+        let f1 = sub.join("one.txt");
+        fs::write(&f1, b"1").unwrap();
+        let f2 = sub.join("two.txt");
+        fs::write(&f2, b"2").unwrap();
+
+        let db_path = dir.join("test2.db");
+        let mut marlin = Marlin::open_at(&db_path).unwrap();
+        marlin.scan(&[dir]).unwrap();
+
+        let mut watcher = marlin
+            .watch(
+                dir,
+                Some(WatcherConfig {
+                    debounce_ms: 50,
+                    ..Default::default()
+                }),
+            )
+            .unwrap();
+
+        thread::sleep(Duration::from_millis(100));
+        let new = dir.join("newdir");
+        fs::rename(&sub, &new).unwrap();
+        for fname in ["one.txt", "two.txt"] {
+            let p = new.join(fname);
+            wait_for_row_count(&marlin, &p, 1, Duration::from_secs(10));
+        }
+        watcher.stop().unwrap();
+        assert!(
+            watcher.status().unwrap().events_processed > 0,
+            "rename event should be processed"
+        );
+
+        for fname in ["one.txt", "two.txt"] {
+            let p = new.join(fname);
+            let cnt: i64 = marlin
+                .conn()
+                .query_row(
+                    "SELECT COUNT(*) FROM files WHERE path = ?1",
+                    [p.to_string_lossy()],
+                    |r| r.get(0),
+                )
+                .unwrap();
+            assert_eq!(cnt, 1, "{} missing", p.display());
+        }
+    }
 }
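For reference, the end-to-end flow the two rename tests exercise, condensed into one sketch. The `Marlin` facade calls (`open_at`, `scan`, `watch`) are taken as used in the tests above; return types and error handling are assumed, and the paths are illustrative:

```rust
// Condensed from the tests above; illustrative only.
use std::time::Duration;

use libmarlin::watcher::WatcherConfig;
use libmarlin::Marlin;

fn demo(dir: &std::path::Path) -> anyhow::Result<()> {
    let mut marlin = Marlin::open_at(&dir.join("index.db"))?; // as in the tests
    marlin.scan(&[dir])?; // seed the `files` table

    let cfg = WatcherConfig { debounce_ms: 50, ..Default::default() };
    let mut watcher = marlin.watch(dir, Some(cfg))?;

    std::fs::rename(dir.join("old"), dir.join("newdir"))?; // directory rename
    std::thread::sleep(Duration::from_millis(500)); // let the debouncer flush

    watcher.stop()?; // child paths under `newdir` are now updated in the DB
    Ok(())
}
```

Note that the tests poll with `wait_for_row_count` rather than sleeping a fixed interval, because the watcher applies DB updates asynchronously on its processor thread; polling keeps the tests robust on slow CI machines.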