From b696ec0fc36f7427bc7056182aa902a1e905c547 Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Thu, 22 May 2025 15:16:15 -0400 Subject: [PATCH 01/14] Update docs for tag alias schema change --- docs/adr/DP-001_schema_v1.1.md | 8 ++++---- docs/roadmap.md | 2 +- docs/vision.md | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/adr/DP-001_schema_v1.1.md b/docs/adr/DP-001_schema_v1.1.md index e8bf484..f5d865d 100644 --- a/docs/adr/DP-001_schema_v1.1.md +++ b/docs/adr/DP-001_schema_v1.1.md @@ -8,14 +8,14 @@ We’ve landed a basic SQLite-backed `files` table and a contentless FTS5 index. Before we build out higher-level features, we need to lock down our **v1.1** metadata schema for: -- **Hierarchical tags** (`tags` + `file_tags`) – optional `canonical_id` for aliases +- **Hierarchical tags** (`tags` + `file_tags`) – alias resolution handled at query time - **Custom attributes** (`attributes`) - **File-to-file relationships** (`links`) - **Named collections** (`collections` + `collection_files`) - **Views** (`views`) -Locking this schema now lets downstream CLI & GUI work against a stable model and ensures our migrations stay easy to reason about. -Tags optionally reference a canonical tag via the `canonical_id` column. +Locking this schema now lets downstream CLI & GUI work against a stable model and ensures our migrations stay easy to reason about. +Alias relationships are resolved outside the table itself; there is no `canonical_id` column. ## 2. Decision @@ -58,7 +58,6 @@ entity tags { -- name : TEXT parent_id : INTEGER <> - canonical_id : INTEGER <> } entity file_tags { @@ -151,6 +150,7 @@ Or in plain-ASCII: | **0003\_create\_links\_collections\_views.sql** | Add `links`, `collections`, `collection_files`, `views` | | **0004\_fix\_hierarchical\_tags\_fts.sql** | Recursive CTE for full tag-path indexing in FTS triggers | | **0005_add_dirty_table.sql** | Track modified files needing reindexing | +| **0006_drop_tags_canonical_id.sql** | Remove legacy `canonical_id` column from `tags` | ### Performance-Critical Indexes diff --git a/docs/roadmap.md b/docs/roadmap.md index 9556e73..b493c72 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -75,7 +75,7 @@ Before a milestone is declared “shipped”: | - | ------------------------------ | ------ | ------------- | | ~~1~~ | ~~Crate split + CI baseline~~ | @alice | ~~26 May 25~~ | | ~~2~~ | ~~Tarpaulin + Hyperfine jobs~~ | @bob | ~~26 May 25~~ | -| 3 | **DP‑001 Schema v1.1** draft | @carol | **30 May 25** | +| ~~3~~ | ~~DP‑001 Schema v1.1 draft~~ | @carol | ~~30 May 25~~ | | ~~4~~ | ~~backup prune CLI + nightly job~~ | @dave | ~~05 Jun 25~~ | > *This roadmap now contains both product-level “what” and engineering-level “how/when/prove it”. It should allow a new contributor to jump in, pick the matching DP, and know exactly the bar they must clear for their code to merge.* diff --git a/docs/vision.md b/docs/vision.md index 15de40f..bd5dbf6 100644 --- a/docs/vision.md +++ b/docs/vision.md @@ -8,7 +8,7 @@ | Feature Area | Capabilities | | ----------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| **Tagging System** | • Unlimited, hierarchical or flat tags.
• Alias/synonym support with precedence rules (admin‑defined canonical name).
• **Bulk tag editing** via multi‑select context menu.
• Folder‑to‑Tag import with optional *watch & sync* mode so new sub‑folders inherit tags automatically. | +| **Tagging System** | • Unlimited, hierarchical or flat tags.
• Alias/synonym support via admin‑defined mappings (canonical names resolved at query time).
• **Bulk tag editing** via multi‑select context menu.
• Folder‑to‑Tag import with optional *watch & sync* mode so new sub‑folders inherit tags automatically. | | **Custom Metadata Attributes** | • User‑defined fields (text, number, date, enum, boolean).
• Per‑template **Custom Metadata Schemas** (e.g. *Photo* → *Date, Location*). | | **File Relationships** | • Typed, directional or bidirectional links (*related to*, *duplicate of*, *cites*…).
• Plugin API can register new relationship sets. | | **Version Control for Metadata** | • Every change logged; unlimited roll‑back.
• Side‑by‑side diff viewer and *blame* panel showing *who/when/what*.
• Offline edits stored locally and merged (Git‑style optimistic merge with conflict prompts). | @@ -36,7 +36,7 @@ ```text files(id PK, path, inode, size, mtime, ctime, hash) -tags(id PK, name, parent_id, canonical_id) +tags(id PK, name, parent_id) file_tags(file_id FK, tag_id FK) attributes(id PK, file_id FK, key, value, value_type) relationships(id PK, src_file_id FK, dst_file_id FK, rel_type, direction) From 23beef50455b45c5e187dcfc2e92b5438c15912e Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Thu, 22 May 2025 15:50:38 -0400 Subject: [PATCH 02/14] Handle rename events in watcher --- cli-bin/docs/cli_cheatsheet.md | 3 + docs/roadmap.md | 2 +- libmarlin/src/db/mod.rs | 33 +++++++++ libmarlin/src/watcher.rs | 132 ++++++++++++++++++++++++++++----- libmarlin/src/watcher_tests.rs | 83 +++++++++++++++++++++ 5 files changed, 235 insertions(+), 18 deletions(-) diff --git a/cli-bin/docs/cli_cheatsheet.md b/cli-bin/docs/cli_cheatsheet.md index 402fe7f..c259009 100644 --- a/cli-bin/docs/cli_cheatsheet.md +++ b/cli-bin/docs/cli_cheatsheet.md @@ -22,3 +22,6 @@ | `event add` | — | | `event timeline` | — | | `backup run` | --dir, --prune, --verify, --file | +| `watch start` | --debounce-ms | +| `watch status` | — | +| `watch stop` | — | diff --git a/docs/roadmap.md b/docs/roadmap.md index b493c72..a76b52b 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -46,7 +46,7 @@ | Tarpaulin coverage gate | S0 | — | – | | Watch mode (FS events) | Epic 1 | `marlin watch .` | DP‑002 | | Backup auto‑prune | Epic 1 | `backup --prune N` | – | -| Rename/move tracking | Epic 2 | automatic path update | Spec‑RMH | +| ~~Rename/move tracking~~ | Epic 2 | automatic path update | Spec‑RMH | | Dirty‑scan | Epic 2 | `scan --dirty` | DP‑002 | | Grep snippets | Phase 3 | `search -C3 …` | DP‑004 | | Hash / dedupe | Phase 4 | `scan --rehash` | DP‑005 | diff --git a/libmarlin/src/db/mod.rs b/libmarlin/src/db/mod.rs index 4ee4241..39a9996 100644 --- a/libmarlin/src/db/mod.rs +++ b/libmarlin/src/db/mod.rs @@ -387,6 +387,39 @@ pub fn take_dirty(conn: &Connection) -> Result> { Ok(ids) } +/* ─── rename helpers ────────────────────────────────────────────── */ + +pub fn update_file_path(conn: &Connection, old_path: &str, new_path: &str) -> Result<()> { + let file_id: i64 = conn.query_row("SELECT id FROM files WHERE path = ?1", [old_path], |r| { + r.get(0) + })?; + conn.execute( + "UPDATE files SET path = ?1 WHERE id = ?2", + params![new_path, file_id], + )?; + mark_dirty(conn, file_id)?; + Ok(()) +} + +pub fn rename_directory(conn: &mut Connection, old_dir: &str, new_dir: &str) -> Result<()> { + let like_pattern = format!("{}/%", old_dir.trim_end_matches('/')); + let ids = { + let mut stmt = conn.prepare("SELECT id FROM files WHERE path LIKE ?1")?; + let rows = stmt.query_map([&like_pattern], |r| r.get::<_, i64>(0))?; + rows.collect::, _>>()? + }; + let tx = conn.transaction()?; + tx.execute( + "UPDATE files SET path = REPLACE(path, ?1, ?2) WHERE path LIKE ?3", + params![old_dir, new_dir, like_pattern], + )?; + for fid in ids { + mark_dirty(&tx, fid)?; + } + tx.commit()?; + Ok(()) +} + /* ─── backup / restore helpers ────────────────────────────────────── */ pub fn backup>(db_path: P) -> Result { diff --git a/libmarlin/src/watcher.rs b/libmarlin/src/watcher.rs index ed9089b..73d7d23 100644 --- a/libmarlin/src/watcher.rs +++ b/libmarlin/src/watcher.rs @@ -6,10 +6,13 @@ //! (create, modify, delete) using the `notify` crate. It implements event debouncing, //! 
batch processing, and a state machine for robust lifecycle management. -use crate::db::Database; +use crate::db::{self, Database}; use anyhow::{Context, Result}; use crossbeam_channel::{bounded, Receiver}; -use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcherTrait}; +use notify::{ + event::ModifyKind, Event, EventKind, RecommendedWatcher, RecursiveMode, + Watcher as NotifyWatcherTrait, +}; use std::collections::HashMap; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -76,6 +79,8 @@ enum EventPriority { #[derive(Debug, Clone)] struct ProcessedEvent { path: PathBuf, + old_path: Option, + new_path: Option, kind: EventKind, priority: EventPriority, timestamp: Instant, @@ -151,6 +156,8 @@ mod event_debouncer_tests { let path1 = PathBuf::from("file1.txt"); debouncer.add_event(ProcessedEvent { path: path1.clone(), + old_path: None, + new_path: None, kind: EventKind::Create(CreateKind::File), priority: EventPriority::Create, timestamp: Instant::now(), @@ -178,6 +185,8 @@ mod event_debouncer_tests { let t1 = Instant::now(); debouncer.add_event(ProcessedEvent { path: path1.clone(), + old_path: None, + new_path: None, kind: EventKind::Create(CreateKind::File), priority: EventPriority::Create, timestamp: t1, @@ -186,6 +195,8 @@ mod event_debouncer_tests { let t2 = Instant::now(); debouncer.add_event(ProcessedEvent { path: path1.clone(), + old_path: None, + new_path: None, kind: EventKind::Modify(ModifyKind::Data(DataChange::Any)), priority: EventPriority::Modify, timestamp: t2, @@ -216,6 +227,8 @@ mod event_debouncer_tests { debouncer_h.add_event(ProcessedEvent { path: p_file.clone(), + old_path: None, + new_path: None, kind: EventKind::Create(CreateKind::File), priority: EventPriority::Create, timestamp: Instant::now(), @@ -224,6 +237,8 @@ mod event_debouncer_tests { debouncer_h.add_event(ProcessedEvent { path: p_dir.clone(), + old_path: None, + new_path: None, kind: EventKind::Remove(RemoveKind::Folder), priority: EventPriority::Delete, timestamp: Instant::now(), @@ -248,12 +263,16 @@ mod event_debouncer_tests { debouncer.add_event(ProcessedEvent { path: path1.clone(), + old_path: None, + new_path: None, kind: EventKind::Create(CreateKind::File), priority: EventPriority::Create, timestamp: Instant::now(), }); debouncer.add_event(ProcessedEvent { path: path2.clone(), + old_path: None, + new_path: None, kind: EventKind::Create(CreateKind::File), priority: EventPriority::Create, timestamp: Instant::now(), @@ -273,18 +292,24 @@ mod event_debouncer_tests { debouncer.add_event(ProcessedEvent { path: path1, + old_path: None, + new_path: None, kind: EventKind::Modify(ModifyKind::Name(RenameMode::To)), priority: EventPriority::Modify, timestamp: Instant::now(), }); debouncer.add_event(ProcessedEvent { path: path2, + old_path: None, + new_path: None, kind: EventKind::Create(CreateKind::File), priority: EventPriority::Create, timestamp: Instant::now(), }); debouncer.add_event(ProcessedEvent { path: path3, + old_path: None, + new_path: None, kind: EventKind::Remove(RemoveKind::File), priority: EventPriority::Delete, timestamp: Instant::now(), @@ -316,12 +341,16 @@ mod event_debouncer_tests { debouncer.add_event(ProcessedEvent { path: dir.clone(), + old_path: None, + new_path: None, kind: EventKind::Create(CreateKind::Folder), priority: EventPriority::Create, timestamp: Instant::now(), }); debouncer.add_event(ProcessedEvent { path: file, + old_path: None, + new_path: None, kind: EventKind::Create(CreateKind::File), priority: 
EventPriority::Create, timestamp: Instant::now(), @@ -411,20 +440,38 @@ impl FileWatcher { received_in_batch += 1; match evt_res { Ok(event) => { - for path in event.paths { - let prio = match event.kind { - EventKind::Create(_) => EventPriority::Create, - EventKind::Remove(_) => EventPriority::Delete, - EventKind::Modify(_) => EventPriority::Modify, - EventKind::Access(_) => EventPriority::Access, - _ => EventPriority::Modify, - }; + let prio = match event.kind { + EventKind::Create(_) => EventPriority::Create, + EventKind::Remove(_) => EventPriority::Delete, + EventKind::Modify(_) => EventPriority::Modify, + EventKind::Access(_) => EventPriority::Access, + _ => EventPriority::Modify, + }; + + if matches!(event.kind, EventKind::Modify(ModifyKind::Name(_))) + && event.paths.len() >= 2 + { + let old_path = event.paths[0].clone(); + let new_path = event.paths[1].clone(); debouncer.add_event(ProcessedEvent { - path, + path: old_path.clone(), + old_path: Some(old_path), + new_path: Some(new_path), kind: event.kind, priority: prio, timestamp: Instant::now(), }); + } else { + for path in event.paths { + debouncer.add_event(ProcessedEvent { + path, + old_path: None, + new_path: None, + kind: event.kind, + priority: prio, + timestamp: Instant::now(), + }); + } } } Err(e) => { @@ -451,8 +498,32 @@ impl FileWatcher { } }; if let Some(db_mutex) = &*db_guard_option { - if let Ok(mut _db_instance_guard) = db_mutex.lock() { + if let Ok(mut db_inst) = db_mutex.lock() { for event_item in &evts_to_process { + if let EventKind::Modify(ModifyKind::Name(_)) = event_item.kind { + if let (Some(ref old_p), Some(ref new_p)) = + (&event_item.old_path, &event_item.new_path) + { + let old_str = old_p.to_string_lossy(); + let new_str = new_p.to_string_lossy(); + let res = if new_p.is_dir() { + db::rename_directory( + db_inst.conn_mut(), + &old_str, + &new_str, + ) + } else { + db::update_file_path( + db_inst.conn_mut(), + &old_str, + &new_str, + ) + }; + if let Err(e) = res { + eprintln!("DB rename error: {:?}", e); + } + } + } info!( "Processing event (DB available): {:?} for path {:?}", event_item.kind, event_item.path @@ -476,11 +547,38 @@ impl FileWatcher { if debouncer.len() > 0 { let final_evts = debouncer.flush(); events_processed_clone.fetch_add(final_evts.len(), Ordering::SeqCst); - for processed_event in final_evts { - info!( - "Processing final event: {:?} for path {:?}", - processed_event.kind, processed_event.path - ); + if let Some(db_mutex) = &*db_captured_for_thread.lock().unwrap() { + if let Ok(mut db_inst) = db_mutex.lock() { + for processed_event in &final_evts { + if let EventKind::Modify(ModifyKind::Name(_)) = processed_event.kind { + if let (Some(ref old_p), Some(ref new_p)) = + (&processed_event.old_path, &processed_event.new_path) + { + let old_str = old_p.to_string_lossy(); + let new_str = new_p.to_string_lossy(); + let res = if new_p.is_dir() { + db::rename_directory(db_inst.conn_mut(), &old_str, &new_str) + } else { + db::update_file_path(db_inst.conn_mut(), &old_str, &new_str) + }; + if let Err(e) = res { + eprintln!("DB rename error: {:?}", e); + } + } + } + info!( + "Processing final event: {:?} for path {:?}", + processed_event.kind, processed_event.path + ); + } + } + } else { + for processed_event in final_evts { + info!( + "Processing final event: {:?} for path {:?}", + processed_event.kind, processed_event.path + ); + } } } if let Ok(mut final_state_guard) = state_clone.lock() { diff --git a/libmarlin/src/watcher_tests.rs b/libmarlin/src/watcher_tests.rs index 53ceb82..445b84e 
100644 --- a/libmarlin/src/watcher_tests.rs +++ b/libmarlin/src/watcher_tests.rs @@ -7,6 +7,7 @@ mod tests { // These are still from the watcher module use crate::db::open as open_marlin_db; use crate::watcher::{FileWatcher, WatcherConfig, WatcherState}; // Use your project's DB open function + use crate::Marlin; use std::fs::{self, File}; use std::io::Write; @@ -144,4 +145,86 @@ mod tests { ); } } + + #[test] + fn rename_file_updates_db() { + let tmp = tempdir().unwrap(); + let dir = tmp.path(); + let file = dir.join("a.txt"); + fs::write(&file, b"hi").unwrap(); + let db_path = dir.join("test.db"); + let mut marlin = Marlin::open_at(&db_path).unwrap(); + marlin.scan(&[dir]).unwrap(); + + let mut watcher = marlin + .watch( + dir, + Some(WatcherConfig { + debounce_ms: 50, + ..Default::default() + }), + ) + .unwrap(); + + thread::sleep(Duration::from_millis(100)); + let new_file = dir.join("b.txt"); + fs::rename(&file, &new_file).unwrap(); + thread::sleep(Duration::from_millis(200)); + watcher.stop().unwrap(); + + let count: i64 = marlin + .conn() + .query_row( + "SELECT COUNT(*) FROM files WHERE path = ?1", + [new_file.to_string_lossy()], + |r| r.get(0), + ) + .unwrap(); + assert_eq!(count, 1); + } + + #[test] + fn rename_directory_updates_children() { + let tmp = tempdir().unwrap(); + let dir = tmp.path(); + let sub = dir.join("old"); + fs::create_dir(&sub).unwrap(); + let f1 = sub.join("one.txt"); + fs::write(&f1, b"1").unwrap(); + let f2 = sub.join("two.txt"); + fs::write(&f2, b"2").unwrap(); + + let db_path = dir.join("test2.db"); + let mut marlin = Marlin::open_at(&db_path).unwrap(); + marlin.scan(&[dir]).unwrap(); + + let mut watcher = marlin + .watch( + dir, + Some(WatcherConfig { + debounce_ms: 50, + ..Default::default() + }), + ) + .unwrap(); + + thread::sleep(Duration::from_millis(100)); + let new = dir.join("newdir"); + fs::rename(&sub, &new).unwrap(); + thread::sleep(Duration::from_millis(300)); + watcher.stop().unwrap(); + + for fname in ["one.txt", "two.txt"] { + let p = new.join(fname); + let cnt: i64 = marlin + .conn() + .query_row( + "SELECT COUNT(*) FROM files WHERE path = ?1", + [p.to_string_lossy()], + |r| r.get(0), + ) + .unwrap(); + assert_eq!(cnt, 1, "{} missing", p.display()); + } + } } From 6b9d684b15f7558e879559d37bbcfa759ccc61e5 Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Thu, 22 May 2025 16:41:50 -0400 Subject: [PATCH 03/14] Handle rename events --- cli-bin/docs/cli_cheatsheet.md | 3 + docs/roadmap.md | 2 +- libmarlin/src/db/mod.rs | 33 ++++ libmarlin/src/watcher.rs | 270 ++++++++++++++++++++++++++++++--- libmarlin/src/watcher_tests.rs | 83 ++++++++++ 5 files changed, 367 insertions(+), 24 deletions(-) diff --git a/cli-bin/docs/cli_cheatsheet.md b/cli-bin/docs/cli_cheatsheet.md index 402fe7f..c259009 100644 --- a/cli-bin/docs/cli_cheatsheet.md +++ b/cli-bin/docs/cli_cheatsheet.md @@ -22,3 +22,6 @@ | `event add` | — | | `event timeline` | — | | `backup run` | --dir, --prune, --verify, --file | +| `watch start` | --debounce-ms | +| `watch status` | — | +| `watch stop` | — | diff --git a/docs/roadmap.md b/docs/roadmap.md index b493c72..a76b52b 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -46,7 +46,7 @@ | Tarpaulin coverage gate | S0 | — | – | | Watch mode (FS events) | Epic 1 | `marlin watch .` | DP‑002 | | Backup auto‑prune | Epic 1 | `backup --prune N` | – | -| Rename/move tracking | Epic 2 | automatic path update | Spec‑RMH | +| ~~Rename/move tracking~~ | Epic 2 | automatic path update | Spec‑RMH | | 
Dirty‑scan | Epic 2 | `scan --dirty` | DP‑002 | | Grep snippets | Phase 3 | `search -C3 …` | DP‑004 | | Hash / dedupe | Phase 4 | `scan --rehash` | DP‑005 | diff --git a/libmarlin/src/db/mod.rs b/libmarlin/src/db/mod.rs index 4ee4241..39a9996 100644 --- a/libmarlin/src/db/mod.rs +++ b/libmarlin/src/db/mod.rs @@ -387,6 +387,39 @@ pub fn take_dirty(conn: &Connection) -> Result> { Ok(ids) } +/* ─── rename helpers ────────────────────────────────────────────── */ + +pub fn update_file_path(conn: &Connection, old_path: &str, new_path: &str) -> Result<()> { + let file_id: i64 = conn.query_row("SELECT id FROM files WHERE path = ?1", [old_path], |r| { + r.get(0) + })?; + conn.execute( + "UPDATE files SET path = ?1 WHERE id = ?2", + params![new_path, file_id], + )?; + mark_dirty(conn, file_id)?; + Ok(()) +} + +pub fn rename_directory(conn: &mut Connection, old_dir: &str, new_dir: &str) -> Result<()> { + let like_pattern = format!("{}/%", old_dir.trim_end_matches('/')); + let ids = { + let mut stmt = conn.prepare("SELECT id FROM files WHERE path LIKE ?1")?; + let rows = stmt.query_map([&like_pattern], |r| r.get::<_, i64>(0))?; + rows.collect::, _>>()? + }; + let tx = conn.transaction()?; + tx.execute( + "UPDATE files SET path = REPLACE(path, ?1, ?2) WHERE path LIKE ?3", + params![old_dir, new_dir, like_pattern], + )?; + for fid in ids { + mark_dirty(&tx, fid)?; + } + tx.commit()?; + Ok(()) +} + /* ─── backup / restore helpers ────────────────────────────────────── */ pub fn backup>(db_path: P) -> Result { diff --git a/libmarlin/src/watcher.rs b/libmarlin/src/watcher.rs index ed9089b..b50e1f3 100644 --- a/libmarlin/src/watcher.rs +++ b/libmarlin/src/watcher.rs @@ -6,10 +6,13 @@ //! (create, modify, delete) using the `notify` crate. It implements event debouncing, //! batch processing, and a state machine for robust lifecycle management. 
-use crate::db::Database; +use crate::db::{self, Database}; use anyhow::{Context, Result}; use crossbeam_channel::{bounded, Receiver}; -use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcherTrait}; +use notify::{ + event::{ModifyKind, RemoveKind, RenameMode}, + Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcherTrait, +}; use std::collections::HashMap; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -76,6 +79,8 @@ enum EventPriority { #[derive(Debug, Clone)] struct ProcessedEvent { path: PathBuf, + old_path: Option, + new_path: Option, kind: EventKind, priority: EventPriority, timestamp: Instant, @@ -151,6 +156,8 @@ mod event_debouncer_tests { let path1 = PathBuf::from("file1.txt"); debouncer.add_event(ProcessedEvent { path: path1.clone(), + old_path: None, + new_path: None, kind: EventKind::Create(CreateKind::File), priority: EventPriority::Create, timestamp: Instant::now(), @@ -178,6 +185,8 @@ mod event_debouncer_tests { let t1 = Instant::now(); debouncer.add_event(ProcessedEvent { path: path1.clone(), + old_path: None, + new_path: None, kind: EventKind::Create(CreateKind::File), priority: EventPriority::Create, timestamp: t1, @@ -186,6 +195,8 @@ mod event_debouncer_tests { let t2 = Instant::now(); debouncer.add_event(ProcessedEvent { path: path1.clone(), + old_path: None, + new_path: None, kind: EventKind::Modify(ModifyKind::Data(DataChange::Any)), priority: EventPriority::Modify, timestamp: t2, @@ -216,6 +227,8 @@ mod event_debouncer_tests { debouncer_h.add_event(ProcessedEvent { path: p_file.clone(), + old_path: None, + new_path: None, kind: EventKind::Create(CreateKind::File), priority: EventPriority::Create, timestamp: Instant::now(), @@ -224,6 +237,8 @@ mod event_debouncer_tests { debouncer_h.add_event(ProcessedEvent { path: p_dir.clone(), + old_path: None, + new_path: None, kind: EventKind::Remove(RemoveKind::Folder), priority: EventPriority::Delete, timestamp: Instant::now(), @@ -248,12 +263,16 @@ mod event_debouncer_tests { debouncer.add_event(ProcessedEvent { path: path1.clone(), + old_path: None, + new_path: None, kind: EventKind::Create(CreateKind::File), priority: EventPriority::Create, timestamp: Instant::now(), }); debouncer.add_event(ProcessedEvent { path: path2.clone(), + old_path: None, + new_path: None, kind: EventKind::Create(CreateKind::File), priority: EventPriority::Create, timestamp: Instant::now(), @@ -273,18 +292,24 @@ mod event_debouncer_tests { debouncer.add_event(ProcessedEvent { path: path1, + old_path: None, + new_path: None, kind: EventKind::Modify(ModifyKind::Name(RenameMode::To)), priority: EventPriority::Modify, timestamp: Instant::now(), }); debouncer.add_event(ProcessedEvent { path: path2, + old_path: None, + new_path: None, kind: EventKind::Create(CreateKind::File), priority: EventPriority::Create, timestamp: Instant::now(), }); debouncer.add_event(ProcessedEvent { path: path3, + old_path: None, + new_path: None, kind: EventKind::Remove(RemoveKind::File), priority: EventPriority::Delete, timestamp: Instant::now(), @@ -316,12 +341,16 @@ mod event_debouncer_tests { debouncer.add_event(ProcessedEvent { path: dir.clone(), + old_path: None, + new_path: None, kind: EventKind::Create(CreateKind::Folder), priority: EventPriority::Create, timestamp: Instant::now(), }); debouncer.add_event(ProcessedEvent { path: file, + old_path: None, + new_path: None, kind: EventKind::Create(CreateKind::File), priority: EventPriority::Create, timestamp: Instant::now(), @@ 
-386,6 +415,8 @@ impl FileWatcher { let processor_thread = thread::spawn(move || { let mut debouncer = EventDebouncer::new(config_clone.debounce_ms); + let mut rename_cache: HashMap = HashMap::new(); + let mut pending_remove: Option<(PathBuf, Instant)> = None; while !stop_flag_clone.load(Ordering::Relaxed) { let current_state = match state_clone.lock() { @@ -396,7 +427,9 @@ impl FileWatcher { } }; - if current_state == WatcherState::Paused { + if current_state == WatcherState::Paused + || current_state == WatcherState::Initializing + { thread::sleep(Duration::from_millis(100)); continue; } @@ -411,20 +444,145 @@ impl FileWatcher { received_in_batch += 1; match evt_res { Ok(event) => { - for path in event.paths { - let prio = match event.kind { - EventKind::Create(_) => EventPriority::Create, - EventKind::Remove(_) => EventPriority::Delete, - EventKind::Modify(_) => EventPriority::Modify, - EventKind::Access(_) => EventPriority::Access, - _ => EventPriority::Modify, - }; - debouncer.add_event(ProcessedEvent { - path, - kind: event.kind, - priority: prio, - timestamp: Instant::now(), - }); + let prio = match event.kind { + EventKind::Create(_) => EventPriority::Create, + EventKind::Remove(_) => EventPriority::Delete, + EventKind::Modify(_) => EventPriority::Modify, + EventKind::Access(_) => EventPriority::Access, + _ => EventPriority::Modify, + }; + + match event.kind { + EventKind::Remove(_) if event.paths.len() == 1 => { + pending_remove = Some((event.paths[0].clone(), Instant::now())); + } + EventKind::Create(_) if event.paths.len() == 1 => { + if let Some((old_p, ts)) = pending_remove.take() { + if Instant::now().duration_since(ts) + <= Duration::from_millis(500) + { + let new_p = event.paths[0].clone(); + debouncer.add_event(ProcessedEvent { + path: old_p.clone(), + old_path: Some(old_p), + new_path: Some(new_p), + kind: EventKind::Modify(ModifyKind::Name( + RenameMode::Both, + )), + priority: prio, + timestamp: Instant::now(), + }); + continue; + } else { + debouncer.add_event(ProcessedEvent { + path: old_p.clone(), + old_path: None, + new_path: None, + kind: EventKind::Remove(RemoveKind::Any), + priority: EventPriority::Delete, + timestamp: ts, + }); + } + } + for path in event.paths { + debouncer.add_event(ProcessedEvent { + path, + old_path: None, + new_path: None, + kind: event.kind, + priority: prio, + timestamp: Instant::now(), + }); + } + } + EventKind::Modify(ModifyKind::Name(mode)) => match mode { + RenameMode::Both => { + if event.paths.len() >= 2 { + let old_path = event.paths[0].clone(); + let new_path = event.paths[1].clone(); + debouncer.add_event(ProcessedEvent { + path: old_path.clone(), + old_path: Some(old_path), + new_path: Some(new_path), + kind: EventKind::Modify(ModifyKind::Name( + RenameMode::Both, + )), + priority: prio, + timestamp: Instant::now(), + }); + } + } + RenameMode::From => { + if let (Some(trk), Some(p)) = + (event.tracker(), event.paths.first()) + { + rename_cache.insert(trk, p.clone()); + } + for path in event.paths { + debouncer.add_event(ProcessedEvent { + path, + old_path: None, + new_path: None, + kind: event.kind, + priority: prio, + timestamp: Instant::now(), + }); + } + } + RenameMode::To => { + if let (Some(trk), Some(new_p)) = + (event.tracker(), event.paths.first()) + { + if let Some(old_p) = rename_cache.remove(&trk) { + debouncer.add_event(ProcessedEvent { + path: old_p.clone(), + old_path: Some(old_p), + new_path: Some(new_p.clone()), + kind: EventKind::Modify(ModifyKind::Name( + RenameMode::Both, + )), + priority: prio, + timestamp: 
Instant::now(), + }); + continue; + } + } + for path in event.paths { + debouncer.add_event(ProcessedEvent { + path, + old_path: None, + new_path: None, + kind: event.kind, + priority: prio, + timestamp: Instant::now(), + }); + } + } + _ => { + for path in event.paths { + debouncer.add_event(ProcessedEvent { + path, + old_path: None, + new_path: None, + kind: event.kind, + priority: prio, + timestamp: Instant::now(), + }); + } + } + }, + _ => { + for path in event.paths { + debouncer.add_event(ProcessedEvent { + path, + old_path: None, + new_path: None, + kind: event.kind, + priority: prio, + timestamp: Instant::now(), + }); + } + } } } Err(e) => { @@ -436,6 +594,21 @@ impl FileWatcher { } } + if let Some((old_p, ts)) = pending_remove.take() { + if Instant::now().duration_since(ts) > Duration::from_millis(500) { + debouncer.add_event(ProcessedEvent { + path: old_p.clone(), + old_path: None, + new_path: None, + kind: EventKind::Remove(RemoveKind::Any), + priority: EventPriority::Delete, + timestamp: ts, + }); + } else { + pending_remove = Some((old_p, ts)); + } + } + queue_size_clone.store(debouncer.len(), Ordering::SeqCst); if debouncer.is_ready_to_flush() && debouncer.len() > 0 { @@ -451,8 +624,32 @@ impl FileWatcher { } }; if let Some(db_mutex) = &*db_guard_option { - if let Ok(mut _db_instance_guard) = db_mutex.lock() { + if let Ok(mut db_inst) = db_mutex.lock() { for event_item in &evts_to_process { + if let EventKind::Modify(ModifyKind::Name(_)) = event_item.kind { + if let (Some(ref old_p), Some(ref new_p)) = + (&event_item.old_path, &event_item.new_path) + { + let old_str = old_p.to_string_lossy(); + let new_str = new_p.to_string_lossy(); + let res = if new_p.is_dir() { + db::rename_directory( + db_inst.conn_mut(), + &old_str, + &new_str, + ) + } else { + db::update_file_path( + db_inst.conn_mut(), + &old_str, + &new_str, + ) + }; + if let Err(e) = res { + eprintln!("DB rename error: {:?}", e); + } + } + } info!( "Processing event (DB available): {:?} for path {:?}", event_item.kind, event_item.path @@ -476,11 +673,38 @@ impl FileWatcher { if debouncer.len() > 0 { let final_evts = debouncer.flush(); events_processed_clone.fetch_add(final_evts.len(), Ordering::SeqCst); - for processed_event in final_evts { - info!( - "Processing final event: {:?} for path {:?}", - processed_event.kind, processed_event.path - ); + if let Some(db_mutex) = &*db_captured_for_thread.lock().unwrap() { + if let Ok(mut db_inst) = db_mutex.lock() { + for processed_event in &final_evts { + if let EventKind::Modify(ModifyKind::Name(_)) = processed_event.kind { + if let (Some(ref old_p), Some(ref new_p)) = + (&processed_event.old_path, &processed_event.new_path) + { + let old_str = old_p.to_string_lossy(); + let new_str = new_p.to_string_lossy(); + let res = if new_p.is_dir() { + db::rename_directory(db_inst.conn_mut(), &old_str, &new_str) + } else { + db::update_file_path(db_inst.conn_mut(), &old_str, &new_str) + }; + if let Err(e) = res { + eprintln!("DB rename error: {:?}", e); + } + } + } + info!( + "Processing final event: {:?} for path {:?}", + processed_event.kind, processed_event.path + ); + } + } + } else { + for processed_event in final_evts { + info!( + "Processing final event: {:?} for path {:?}", + processed_event.kind, processed_event.path + ); + } } } if let Ok(mut final_state_guard) = state_clone.lock() { diff --git a/libmarlin/src/watcher_tests.rs b/libmarlin/src/watcher_tests.rs index 53ceb82..445b84e 100644 --- a/libmarlin/src/watcher_tests.rs +++ b/libmarlin/src/watcher_tests.rs @@ -7,6 +7,7 
@@ mod tests { // These are still from the watcher module use crate::db::open as open_marlin_db; use crate::watcher::{FileWatcher, WatcherConfig, WatcherState}; // Use your project's DB open function + use crate::Marlin; use std::fs::{self, File}; use std::io::Write; @@ -144,4 +145,86 @@ mod tests { ); } } + + #[test] + fn rename_file_updates_db() { + let tmp = tempdir().unwrap(); + let dir = tmp.path(); + let file = dir.join("a.txt"); + fs::write(&file, b"hi").unwrap(); + let db_path = dir.join("test.db"); + let mut marlin = Marlin::open_at(&db_path).unwrap(); + marlin.scan(&[dir]).unwrap(); + + let mut watcher = marlin + .watch( + dir, + Some(WatcherConfig { + debounce_ms: 50, + ..Default::default() + }), + ) + .unwrap(); + + thread::sleep(Duration::from_millis(100)); + let new_file = dir.join("b.txt"); + fs::rename(&file, &new_file).unwrap(); + thread::sleep(Duration::from_millis(200)); + watcher.stop().unwrap(); + + let count: i64 = marlin + .conn() + .query_row( + "SELECT COUNT(*) FROM files WHERE path = ?1", + [new_file.to_string_lossy()], + |r| r.get(0), + ) + .unwrap(); + assert_eq!(count, 1); + } + + #[test] + fn rename_directory_updates_children() { + let tmp = tempdir().unwrap(); + let dir = tmp.path(); + let sub = dir.join("old"); + fs::create_dir(&sub).unwrap(); + let f1 = sub.join("one.txt"); + fs::write(&f1, b"1").unwrap(); + let f2 = sub.join("two.txt"); + fs::write(&f2, b"2").unwrap(); + + let db_path = dir.join("test2.db"); + let mut marlin = Marlin::open_at(&db_path).unwrap(); + marlin.scan(&[dir]).unwrap(); + + let mut watcher = marlin + .watch( + dir, + Some(WatcherConfig { + debounce_ms: 50, + ..Default::default() + }), + ) + .unwrap(); + + thread::sleep(Duration::from_millis(100)); + let new = dir.join("newdir"); + fs::rename(&sub, &new).unwrap(); + thread::sleep(Duration::from_millis(300)); + watcher.stop().unwrap(); + + for fname in ["one.txt", "two.txt"] { + let p = new.join(fname); + let cnt: i64 = marlin + .conn() + .query_row( + "SELECT COUNT(*) FROM files WHERE path = ?1", + [p.to_string_lossy()], + |r| r.get(0), + ) + .unwrap(); + assert_eq!(cnt, 1, "{} missing", p.display()); + } + } } From 84d5e04ce771dd0b69fde82891ceb16be3ea37cf Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Thu, 22 May 2025 17:49:56 -0400 Subject: [PATCH 04/14] Format watcher module --- libmarlin/src/watcher.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/libmarlin/src/watcher.rs b/libmarlin/src/watcher.rs index f95181a..276a688 100644 --- a/libmarlin/src/watcher.rs +++ b/libmarlin/src/watcher.rs @@ -520,15 +520,16 @@ impl FileWatcher { let _ = h.join(); } - *self - .state - .lock() - .map_err(|_| anyhow::anyhow!("state"))? = WatcherState::Stopped; + *self.state.lock().map_err(|_| anyhow::anyhow!("state"))? = WatcherState::Stopped; Ok(()) } pub fn status(&self) -> Result { - let st = self.state.lock().map_err(|_| anyhow::anyhow!("state"))?.clone(); + let st = self + .state + .lock() + .map_err(|_| anyhow::anyhow!("state"))? 
+ .clone(); Ok(WatcherStatus { state: st, events_processed: self.events_processed.load(Ordering::SeqCst), From eb3aa169123b675da6f0455eee0a753f2a27b46e Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Thu, 22 May 2025 22:58:24 -0400 Subject: [PATCH 05/14] Handle Any rename events --- libmarlin/src/watcher.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/libmarlin/src/watcher.rs b/libmarlin/src/watcher.rs index 276a688..6681cae 100644 --- a/libmarlin/src/watcher.rs +++ b/libmarlin/src/watcher.rs @@ -270,8 +270,11 @@ impl FileWatcher { } // 2. native rename events from notify - EventKind::Modify(ModifyKind::Name(mode)) => match mode { - RenameMode::Both => { + EventKind::Modify(ModifyKind::Name(name_kind)) => match name_kind { + // Notify >= 6 emits `Both` when both paths are + // supplied and `Any` as a catch-all for renames. + // Treat both cases as a complete rename. + RenameMode::Both | RenameMode::Any => { if event.paths.len() >= 2 { let old_p = event.paths[0].clone(); let new_p = event.paths[1].clone(); @@ -333,6 +336,10 @@ impl FileWatcher { }); } } + // `From`/`To` are handled above. Any other + // value (`Other` or legacy `Rename`/`Move` + // variants) is treated as a normal modify + // event. _ => { for p in event.paths { debouncer.add_event(ProcessedEvent { From 48b5ffd6fa2bdcbc44a7d94b1f4186f76c52ec22 Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Thu, 22 May 2025 23:09:56 -0400 Subject: [PATCH 06/14] Wait for rename events --- libmarlin/src/watcher_tests.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/libmarlin/src/watcher_tests.rs b/libmarlin/src/watcher_tests.rs index 445b84e..221d689 100644 --- a/libmarlin/src/watcher_tests.rs +++ b/libmarlin/src/watcher_tests.rs @@ -61,7 +61,7 @@ mod tests { thread::sleep(Duration::from_millis(200)); fs::remove_file(&new_file_path).expect("Failed to remove file"); - thread::sleep(Duration::from_millis(500)); + thread::sleep(Duration::from_millis(1500)); watcher.stop().expect("Failed to stop watcher"); assert_eq!(watcher.status().unwrap().state, WatcherState::Stopped); @@ -169,8 +169,13 @@ mod tests { thread::sleep(Duration::from_millis(100)); let new_file = dir.join("b.txt"); fs::rename(&file, &new_file).unwrap(); - thread::sleep(Duration::from_millis(200)); + thread::sleep(Duration::from_millis(1500)); + assert!( + watcher.status().unwrap().events_processed > 0, + "rename event should be processed" + ); watcher.stop().unwrap(); + thread::sleep(Duration::from_millis(100)); let count: i64 = marlin .conn() @@ -211,8 +216,13 @@ mod tests { thread::sleep(Duration::from_millis(100)); let new = dir.join("newdir"); fs::rename(&sub, &new).unwrap(); - thread::sleep(Duration::from_millis(300)); + thread::sleep(Duration::from_millis(1500)); + assert!( + watcher.status().unwrap().events_processed > 0, + "rename event should be processed" + ); watcher.stop().unwrap(); + thread::sleep(Duration::from_millis(100)); for fname in ["one.txt", "two.txt"] { let p = new.join(fname); From 0a1eba0fa46657e1f5c5f12de297b8f6b421c322 Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Fri, 23 May 2025 10:12:35 -0400 Subject: [PATCH 07/14] Handle remove-create renames --- libmarlin/Cargo.toml | 1 + libmarlin/src/watcher.rs | 160 ++++++++++++++++++++++++++++----------- 2 files changed, 117 insertions(+), 44 deletions(-) diff --git a/libmarlin/Cargo.toml 
b/libmarlin/Cargo.toml index af61a9b..9cc4d1b 100644 --- a/libmarlin/Cargo.toml +++ b/libmarlin/Cargo.toml @@ -18,6 +18,7 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] } walkdir = "2.5" shlex = "1.3" +same-file = "1" shellexpand = "3.1" serde_json = { version = "1", optional = true } diff --git a/libmarlin/src/watcher.rs b/libmarlin/src/watcher.rs index 6681cae..0d2a94c 100644 --- a/libmarlin/src/watcher.rs +++ b/libmarlin/src/watcher.rs @@ -12,6 +12,7 @@ use notify::{ event::{ModifyKind, RemoveKind, RenameMode}, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcherTrait, }; +use same_file::Handle; use std::collections::HashMap; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -84,6 +85,61 @@ struct EventDebouncer { last_flush: Instant, } +#[derive(Default)] +struct RemoveTracker { + map: HashMap, +} + +impl RemoveTracker { + fn record(&mut self, path: &PathBuf) { + if let Ok(h) = Handle::from_path(path) { + self.map.insert(h.ino(), (path.clone(), Instant::now())); + } else { + // fall back to hashing path if inode not available + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + let mut hasher = DefaultHasher::new(); + path.hash(&mut hasher); + self.map + .insert(hasher.finish(), (path.clone(), Instant::now())); + } + } + + fn match_create(&mut self, path: &PathBuf, window: Duration) -> Option { + if let Ok(h) = Handle::from_path(path) { + if let Some((old, ts)) = self.map.remove(&h.ino()) { + if Instant::now().duration_since(ts) <= window { + return Some(old); + } else { + return None; + } + } + } + None + } + + fn flush_expired(&mut self, window: Duration, debouncer: &mut EventDebouncer) { + let now = Instant::now(); + let mut expired = Vec::new(); + for (ino, (path, ts)) in &self.map { + if now.duration_since(*ts) > window { + debouncer.add_event(ProcessedEvent { + path: path.clone(), + old_path: None, + new_path: None, + kind: EventKind::Remove(RemoveKind::Any), + priority: EventPriority::Delete, + timestamp: *ts, + }); + expired.push(*ino); + } + } + for ino in expired { + self.map.remove(&ino); + } + } +} + impl EventDebouncer { fn new(debounce_window_ms: u64) -> Self { Self { @@ -187,7 +243,7 @@ impl FileWatcher { let processor_thread = thread::spawn(move || { let mut debouncer = EventDebouncer::new(config_clone.debounce_ms); let mut rename_cache: HashMap = HashMap::new(); - let mut pending_remove: Option<(PathBuf, Instant)> = None; + let mut remove_tracker = RemoveTracker::default(); while !stop_flag_clone.load(Ordering::Relaxed) { // honour current state @@ -223,38 +279,27 @@ impl FileWatcher { // ── per-event logic ─────────────────────────────── match event.kind { - // 1. remove-then-create → rename heuristic + // 1. 
remove-then-create → rename heuristic using inode EventKind::Remove(_) if event.paths.len() == 1 => { - pending_remove = Some((event.paths[0].clone(), Instant::now())); + remove_tracker.record(&event.paths[0]); } EventKind::Create(_) if event.paths.len() == 1 => { - if let Some((old_p, ts)) = pending_remove.take() { - if Instant::now().duration_since(ts) - <= Duration::from_millis(500) - { - let new_p = event.paths[0].clone(); - debouncer.add_event(ProcessedEvent { - path: old_p.clone(), - old_path: Some(old_p), - new_path: Some(new_p), - kind: EventKind::Modify(ModifyKind::Name( - RenameMode::Both, - )), - priority: prio, - timestamp: Instant::now(), - }); - continue; // handled as rename - } else { - debouncer.add_event(ProcessedEvent { - path: old_p.clone(), - old_path: None, - new_path: None, - kind: EventKind::Remove(RemoveKind::Any), - priority: EventPriority::Delete, - timestamp: ts, - }); - } + if let Some(old_p) = remove_tracker + .match_create(&event.paths[0], Duration::from_millis(500)) + { + let new_p = event.paths[0].clone(); + debouncer.add_event(ProcessedEvent { + path: old_p.clone(), + old_path: Some(old_p), + new_path: Some(new_p), + kind: EventKind::Modify(ModifyKind::Name( + RenameMode::Both, + )), + priority: prio, + timestamp: Instant::now(), + }); + continue; } for p in event.paths { @@ -377,21 +422,8 @@ impl FileWatcher { } } - // deal with orphaned remove - if let Some((old_p, ts)) = pending_remove.take() { - if Instant::now().duration_since(ts) > Duration::from_millis(500) { - debouncer.add_event(ProcessedEvent { - path: old_p.clone(), - old_path: None, - new_path: None, - kind: EventKind::Remove(RemoveKind::Any), - priority: EventPriority::Delete, - timestamp: ts, - }); - } else { - pending_remove = Some((old_p, ts)); - } - } + // deal with orphaned removes + remove_tracker.flush_expired(Duration::from_millis(500), &mut debouncer); queue_size_clone.store(debouncer.len(), Ordering::SeqCst); @@ -438,6 +470,7 @@ impl FileWatcher { } // main loop // final flush on shutdown + remove_tracker.flush_expired(Duration::from_millis(500), &mut debouncer); if debouncer.len() > 0 { let final_evts = debouncer.flush(); events_processed_clone.fetch_add(final_evts.len(), Ordering::SeqCst); @@ -777,6 +810,45 @@ mod event_debouncer_tests { assert_eq!(flushed.len(), 2); assert!(flushed.iter().any(|e| e.path == dir)); } + + #[test] + fn remove_create_same_inode_produces_rename() { + let tmp = tempfile::tempdir().unwrap(); + let old_p = tmp.path().join("old.txt"); + std::fs::write(&old_p, b"hi").unwrap(); + + let mut debouncer = EventDebouncer::new(100); + let mut tracker = RemoveTracker::default(); + + tracker.record(&old_p); + + let new_p = tmp.path().join("new.txt"); + std::fs::rename(&old_p, &new_p).unwrap(); + + if let Some(orig) = tracker.match_create(&new_p, Duration::from_millis(500)) { + debouncer.add_event(ProcessedEvent { + path: orig.clone(), + old_path: Some(orig), + new_path: Some(new_p.clone()), + kind: EventKind::Modify(ModifyKind::Name(RenameMode::Both)), + priority: EventPriority::Modify, + timestamp: Instant::now(), + }); + } + + tracker.flush_expired(Duration::from_millis(500), &mut debouncer); + let flushed = debouncer.flush(); + assert_eq!(flushed.len(), 1); + assert_eq!( + flushed[0].kind, + EventKind::Modify(ModifyKind::Name(RenameMode::Both)) + ); + assert_eq!( + flushed[0].old_path.as_ref().unwrap(), + &tmp.path().join("old.txt") + ); + assert_eq!(flushed[0].new_path.as_ref().unwrap(), &new_p); + } } #[cfg(test)] From 84afb73bbd57685043a2bcd988e675d783fd89ab 
Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Fri, 23 May 2025 10:13:04 -0400 Subject: [PATCH 08/14] Add DB polling helper and update watcher rename tests --- libmarlin/src/watcher_tests.rs | 46 ++++++++++++++++++++++++++++------ 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/libmarlin/src/watcher_tests.rs b/libmarlin/src/watcher_tests.rs index 221d689..c15c7eb 100644 --- a/libmarlin/src/watcher_tests.rs +++ b/libmarlin/src/watcher_tests.rs @@ -13,9 +13,40 @@ mod tests { use std::io::Write; // No longer need: use std::path::PathBuf; use std::thread; - use std::time::Duration; + use std::time::{Duration, Instant}; use tempfile::tempdir; + /// Polls the DB until `query` returns `expected` or the timeout elapses. + fn wait_for_row_count( + marlin: &Marlin, + path: &std::path::Path, + expected: i64, + timeout: Duration, + ) { + let start = Instant::now(); + loop { + let count: i64 = marlin + .conn() + .query_row( + "SELECT COUNT(*) FROM files WHERE path = ?1", + [path.to_string_lossy()], + |r| r.get(0), + ) + .unwrap(); + if count == expected { + break; + } + if start.elapsed() > timeout { + panic!( + "Timed out waiting for {} rows for {}", + expected, + path.display() + ); + } + thread::sleep(Duration::from_millis(50)); + } + } + #[test] fn test_watcher_lifecycle() { // Create a temp directory for testing @@ -169,13 +200,12 @@ mod tests { thread::sleep(Duration::from_millis(100)); let new_file = dir.join("b.txt"); fs::rename(&file, &new_file).unwrap(); - thread::sleep(Duration::from_millis(1500)); + wait_for_row_count(&marlin, &new_file, 1, Duration::from_secs(10)); + watcher.stop().unwrap(); assert!( watcher.status().unwrap().events_processed > 0, "rename event should be processed" ); - watcher.stop().unwrap(); - thread::sleep(Duration::from_millis(100)); let count: i64 = marlin .conn() @@ -216,13 +246,15 @@ mod tests { thread::sleep(Duration::from_millis(100)); let new = dir.join("newdir"); fs::rename(&sub, &new).unwrap(); - thread::sleep(Duration::from_millis(1500)); + for fname in ["one.txt", "two.txt"] { + let p = new.join(fname); + wait_for_row_count(&marlin, &p, 1, Duration::from_secs(10)); + } + watcher.stop().unwrap(); assert!( watcher.status().unwrap().events_processed > 0, "rename event should be processed" ); - watcher.stop().unwrap(); - thread::sleep(Duration::from_millis(100)); for fname in ["one.txt", "two.txt"] { let p = new.join(fname); From 47d94358654018dc4969b15beedc3686bd3dffc7 Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Fri, 23 May 2025 10:13:53 -0400 Subject: [PATCH 09/14] Handle DB mutex poisoning --- libmarlin/src/watcher.rs | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/libmarlin/src/watcher.rs b/libmarlin/src/watcher.rs index 6681cae..27ba91c 100644 --- a/libmarlin/src/watcher.rs +++ b/libmarlin/src/watcher.rs @@ -6,7 +6,7 @@ //! watcher can be paused, resumed and shut down cleanly. 
use crate::db::{self, Database}; -use anyhow::{Context, Result}; +use anyhow::{anyhow, Context, Result}; use crossbeam_channel::{bounded, Receiver}; use notify::{ event::{ModifyKind, RemoveKind, RenameMode}, @@ -184,6 +184,21 @@ impl FileWatcher { Arc::new(Mutex::new(None)); let db_for_thread = db_shared_for_thread.clone(); + fn handle_db_update( + db_mutex: &Mutex, + old_s: &str, + new_s: &str, + is_dir: bool, + ) -> Result<()> { + let mut guard = db_mutex.lock().map_err(|_| anyhow!("db mutex poisoned"))?; + if is_dir { + db::rename_directory(guard.conn_mut(), old_s, new_s)?; + } else { + db::update_file_path(guard.conn_mut(), old_s, new_s)?; + } + Ok(()) + } + let processor_thread = thread::spawn(move || { let mut debouncer = EventDebouncer::new(config_clone.debounce_ms); let mut rename_cache: HashMap = HashMap::new(); @@ -409,19 +424,8 @@ impl FileWatcher { if let (Some(old_p), Some(new_p)) = (&ev.old_path, &ev.new_path) { let old_s = old_p.to_string_lossy(); let new_s = new_p.to_string_lossy(); - let res = if new_p.is_dir() { - db::rename_directory( - db_mutex.lock().unwrap().conn_mut(), - &old_s, - &new_s, - ) - } else { - db::update_file_path( - db_mutex.lock().unwrap().conn_mut(), - &old_s, - &new_s, - ) - }; + let res = + handle_db_update(db_mutex, &old_s, &new_s, new_p.is_dir()); if let Err(e) = res { eprintln!("DB rename error: {:?}", e); } From 6316d288738ce27691726712c78c8ffeac273d63 Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Fri, 23 May 2025 10:14:19 -0400 Subject: [PATCH 10/14] Remove unused priority-queue --- Cargo.lock | 29 +---------------------------- libmarlin/Cargo.toml | 1 - 2 files changed, 1 insertion(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fcb1a12..d7da07c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -470,12 +470,6 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" -[[package]] -name = "hashbrown" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" - [[package]] name = "hashbrown" version = "0.14.5" @@ -530,16 +524,6 @@ dependencies = [ "cc", ] -[[package]] -name = "indexmap" -version = "1.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" -dependencies = [ - "autocfg", - "hashbrown 0.12.3", -] - [[package]] name = "indexmap" version = "2.9.0" @@ -635,7 +619,6 @@ dependencies = [ "glob", "lazy_static", "notify", - "priority-queue", "rusqlite", "serde_json", "sha2", @@ -874,16 +857,6 @@ dependencies = [ "termtree", ] -[[package]] -name = "priority-queue" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0bda9164fe05bc9225752d54aae413343c36f684380005398a6a8fde95fe785" -dependencies = [ - "autocfg", - "indexmap 1.9.3", -] - [[package]] name = "proc-macro2" version = "1.0.95" @@ -1069,7 +1042,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap 2.9.0", + "indexmap", "itoa", "ryu", "serde", diff --git a/libmarlin/Cargo.toml b/libmarlin/Cargo.toml index af61a9b..d35c54a 100644 --- a/libmarlin/Cargo.toml +++ b/libmarlin/Cargo.toml @@ -11,7 +11,6 @@ crossbeam-channel = "0.5" 
directories = "5" glob = "0.3" notify = "6.0" -priority-queue = "1.3" rusqlite = { version = "0.31", features = ["bundled", "backup"] } sha2 = "0.10" tracing = "0.1" From f7d9758014a66dac974c5b59d99ee9043564c97d Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Sat, 24 May 2025 19:03:08 -0400 Subject: [PATCH 11/14] chore: regenerate Cargo.lock after dep changes --- Cargo.lock | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d7da07c..6cf492d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,12 +79,12 @@ dependencies = [ [[package]] name = "anstyle-wincon" -version = "3.0.7" +version = "3.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e" +checksum = "6680de5231bd6ee4c6191b8a1325daa282b415391ec9d3a37bd34f2060dc73fa" dependencies = [ "anstyle", - "once_cell", + "once_cell_polyfill", "windows-sys 0.59.0", ] @@ -156,9 +156,9 @@ checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" [[package]] name = "cc" -version = "1.2.23" +version = "1.2.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f4ac86a9e5bc1e2b3449ab9d7d3a6a405e3d1bb28d7b9be8614f55846ae3766" +checksum = "16595d3be041c03b09d08d0858631facccee9221e579704070e6e9e4915d3bc7" dependencies = [ "shlex", ] @@ -620,6 +620,7 @@ dependencies = [ "lazy_static", "notify", "rusqlite", + "same-file", "serde_json", "sha2", "shellexpand", @@ -803,6 +804,12 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "once_cell_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" + [[package]] name = "option-ext" version = "0.2.0" @@ -985,9 +992,9 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.20" +version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eded382c5f5f786b989652c49544c4877d9f015cc22e145a5ea8ea66c2921cd2" +checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d" [[package]] name = "ryu" From 354d8a7fbdcac4b4b89a0573026e67e11c725c61 Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Sat, 24 May 2025 19:05:21 -0400 Subject: [PATCH 12/14] Remove Cargo.lock from .gitignore to allow version tracking of dependencies --- .gitignore | 1 - 1 file changed, 1 deletion(-) diff --git a/.gitignore b/.gitignore index 801fffd..1492465 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,6 @@ # === Rust build artifacts === /target/ -/Cargo.lock # === IDE & Editor settings === From 45ab1f4cc6dd6c112b690789bbf6a9dd8abc75d0 Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Sat, 24 May 2025 19:38:49 -0400 Subject: [PATCH 13/14] Add trigger fix for rename handling --- .../db/migrations/0007_fix_rename_trigger.sql | 20 +++++++++++++++++++ libmarlin/src/db/mod.rs | 4 ++++ 2 files changed, 24 insertions(+) create mode 100644 libmarlin/src/db/migrations/0007_fix_rename_trigger.sql diff --git a/libmarlin/src/db/migrations/0007_fix_rename_trigger.sql b/libmarlin/src/db/migrations/0007_fix_rename_trigger.sql new file mode 100644 index 0000000..b1a772b --- /dev/null +++ 
b/libmarlin/src/db/migrations/0007_fix_rename_trigger.sql @@ -0,0 +1,20 @@ +-- src/db/migrations/0007_fix_rename_trigger.sql +PRAGMA foreign_keys = ON; +PRAGMA journal_mode = WAL; + +-- Recreate files_fts_au_file trigger using INSERT OR REPLACE +DROP TRIGGER IF EXISTS files_fts_au_file; +CREATE TRIGGER files_fts_au_file +AFTER UPDATE OF path ON files +BEGIN + INSERT OR REPLACE INTO files_fts(rowid, path, tags_text, attrs_text) + SELECT NEW.id, + NEW.path, + (SELECT IFNULL(GROUP_CONCAT(t.name, ' '), '') + FROM file_tags ft + JOIN tags t ON ft.tag_id = t.id + WHERE ft.file_id = NEW.id), + (SELECT IFNULL(GROUP_CONCAT(a.key || '=' || a.value, ' '), '') + FROM attributes a + WHERE a.file_id = NEW.id); +END; diff --git a/libmarlin/src/db/mod.rs b/libmarlin/src/db/mod.rs index 39a9996..7edaf01 100644 --- a/libmarlin/src/db/mod.rs +++ b/libmarlin/src/db/mod.rs @@ -50,6 +50,10 @@ const MIGRATIONS: &[(&str, &str)] = &[ "0006_drop_tags_canonical_id.sql", include_str!("migrations/0006_drop_tags_canonical_id.sql"), ), + ( + "0007_fix_rename_trigger.sql", + include_str!("migrations/0007_fix_rename_trigger.sql"), + ), ]; /* ─── schema helpers ─────────────────────────────────────────────── */ From 5b7e20e5f470e2ec522ad6f582d1a21f676afd3f Mon Sep 17 00:00:00 2001 From: thePR0M3TH3AN <53631862+PR0M3TH3AN@users.noreply.github.com> Date: Sat, 24 May 2025 20:39:35 -0400 Subject: [PATCH 14/14] Fix rename debouncer to preserve paths --- libmarlin/src/watcher.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/libmarlin/src/watcher.rs b/libmarlin/src/watcher.rs index 8d21aad..cfcb2ca 100644 --- a/libmarlin/src/watcher.rs +++ b/libmarlin/src/watcher.rs @@ -158,16 +158,25 @@ impl EventDebouncer { .retain(|p, _| !p.starts_with(&path) || p == &path); } - match self.events.get_mut(&path) { - Some(existing) => { + use std::collections::hash_map::Entry; + + match self.events.entry(path) { + Entry::Occupied(mut o) => { + let existing = o.get_mut(); if event.priority < existing.priority { existing.priority = event.priority; } existing.kind = event.kind; existing.timestamp = event.timestamp; + if let Some(old_p) = event.old_path { + existing.old_path = Some(old_p); + } + if let Some(new_p) = event.new_path { + existing.new_path = Some(new_p); + } } - None => { - self.events.insert(path, event); + Entry::Vacant(v) => { + v.insert(event); } } }
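
---

Usage note (not part of the patches above): the rename helpers added in PATCH 02/03 are only ever invoked from the watcher thread. As a minimal sketch of calling them directly — assuming the library crate is published as `libmarlin` and that `db::open` (imported in the tests as `open_marlin_db`) returns a migrated `rusqlite::Connection`; the paths and function name below are illustrative — a one-off maintenance task might look like this:

```rust
// Minimal sketch, not taken from the patches above.
// Assumptions: the crate is named `libmarlin`, `db::open` runs the embedded
// migrations (0001..0007) and yields a `rusqlite::Connection`; paths are examples.
use anyhow::Result;
use libmarlin::db;

fn reconcile_manual_moves(index_db: &str) -> Result<()> {
    let mut conn = db::open(index_db)?;

    // One file was moved while no watcher was running: repoint its row.
    // `update_file_path` also marks the row dirty, so the next `scan --dirty`
    // pass re-indexes it, and the 0007 trigger refreshes the matching
    // `files_fts` entry when the path column is updated.
    db::update_file_path(&conn, "/data/reports/2024.md", "/data/archive/2024.md")?;

    // A whole directory was moved: every child row has its prefix rewritten
    // in a single transaction and is marked dirty as well.
    db::rename_directory(&mut conn, "/data/projects/old", "/data/projects/new")?;

    Ok(())
}
```

This mirrors what the watcher's `handle_db_update` helper does once a remove/create pair or a native rename event has been matched, minus the debouncing and inode tracking.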