Merge pull request #166 from PR0M3TH3AN/codex/add-migration-v1-to-v2-and-update-tests

Add migration to schema v2 and adjust tests
This commit is contained in:
thePR0M3TH3AN
2025-07-02 22:17:41 -04:00
committed by GitHub
10 changed files with 70 additions and 37 deletions

View File

@@ -291,7 +291,7 @@ class EncryptionManager:
"yellow", "yellow",
) )
) )
return {"passwords": {}} return {"entries": {}}
try: try:
decrypted_data = self.decrypt_file(relative_path) decrypted_data = self.decrypt_file(relative_path)

View File

@@ -62,12 +62,12 @@ class EntryManager:
return data return data
except Exception as e: except Exception as e:
logger.error(f"Failed to load index: {e}") logger.error(f"Failed to load index: {e}")
return {"schema_version": LATEST_VERSION, "passwords": {}} return {"schema_version": LATEST_VERSION, "entries": {}}
else: else:
logger.info( logger.info(
f"Index file '{self.index_file}' not found. Initializing new password database." f"Index file '{self.index_file}' not found. Initializing new password database."
) )
return {"schema_version": LATEST_VERSION, "passwords": {}} return {"schema_version": LATEST_VERSION, "entries": {}}
def _save_index(self, data: Dict[str, Any]) -> None: def _save_index(self, data: Dict[str, Any]) -> None:
try: try:
@@ -85,8 +85,8 @@ class EntryManager:
""" """
try: try:
data = self.vault.load_index() data = self.vault.load_index()
if "passwords" in data and isinstance(data["passwords"], dict): if "entries" in data and isinstance(data["entries"], dict):
indices = [int(idx) for idx in data["passwords"].keys()] indices = [int(idx) for idx in data["entries"].keys()]
next_index = max(indices) + 1 if indices else 0 next_index = max(indices) + 1 if indices else 0
else: else:
next_index = 0 next_index = 0
@@ -119,7 +119,8 @@ class EntryManager:
index = self.get_next_index() index = self.get_next_index()
data = self.vault.load_index() data = self.vault.load_index()
data["passwords"][str(index)] = { data.setdefault("entries", {})
data["entries"][str(index)] = {
"website": website_name, "website": website_name,
"length": length, "length": length,
"username": username if username else "", "username": username if username else "",
@@ -127,9 +128,7 @@ class EntryManager:
"blacklisted": blacklisted, "blacklisted": blacklisted,
} }
logger.debug( logger.debug(f"Added entry at index {index}: {data['entries'][str(index)]}")
f"Added entry at index {index}: {data['passwords'][str(index)]}"
)
self._save_index(data) self._save_index(data)
self.update_checksum() self.update_checksum()
@@ -169,7 +168,7 @@ class EntryManager:
""" """
try: try:
data = self.vault.load_index() data = self.vault.load_index()
entry = data.get("passwords", {}).get(str(index)) entry = data.get("entries", {}).get(str(index))
if entry: if entry:
logger.debug(f"Retrieved entry at index {index}: {entry}") logger.debug(f"Retrieved entry at index {index}: {entry}")
@@ -205,7 +204,7 @@ class EntryManager:
""" """
try: try:
data = self.vault.load_index() data = self.vault.load_index()
entry = data.get("passwords", {}).get(str(index)) entry = data.get("entries", {}).get(str(index))
if not entry: if not entry:
logger.warning( logger.warning(
@@ -233,7 +232,7 @@ class EntryManager:
f"Updated blacklist status to '{blacklisted}' for index {index}." f"Updated blacklist status to '{blacklisted}' for index {index}."
) )
data["passwords"][str(index)] = entry data["entries"][str(index)] = entry
logger.debug(f"Modified entry at index {index}: {entry}") logger.debug(f"Modified entry at index {index}: {entry}")
self._save_index(data) self._save_index(data)
@@ -259,15 +258,15 @@ class EntryManager:
""" """
try: try:
data = self.vault.load_index() data = self.vault.load_index()
passwords = data.get("passwords", {}) entries_data = data.get("entries", {})
if not passwords: if not entries_data:
logger.info("No password entries found.") logger.info("No password entries found.")
print(colored("No password entries found.", "yellow")) print(colored("No password entries found.", "yellow"))
return [] return []
entries = [] entries = []
for idx, entry in sorted(passwords.items(), key=lambda x: int(x[0])): for idx, entry in sorted(entries_data.items(), key=lambda x: int(x[0])):
entries.append( entries.append(
( (
int(idx), int(idx),
@@ -302,8 +301,8 @@ class EntryManager:
""" """
try: try:
data = self.vault.load_index() data = self.vault.load_index()
if "passwords" in data and str(index) in data["passwords"]: if "entries" in data and str(index) in data["entries"]:
del data["passwords"][str(index)] del data["entries"][str(index)]
logger.debug(f"Deleted entry at index {index}.") logger.debug(f"Deleted entry at index {index}.")
self.vault.save_index(data) self.vault.save_index(data)
self.update_checksum() self.update_checksum()

View File

@@ -26,7 +26,20 @@ def _v0_to_v1(data: dict) -> dict:
return data return data
LATEST_VERSION = 1 @migration(1)
def _v1_to_v2(data: dict) -> dict:
passwords = data.pop("passwords", {})
entries = {}
for k, v in passwords.items():
v.setdefault("type", "password")
v.setdefault("notes", "")
entries[k] = v
data["entries"] = entries
data["schema_version"] = 2
return data
LATEST_VERSION = 2
def apply_migrations(data: dict) -> dict: def apply_migrations(data: dict) -> dict:

View File

@@ -19,7 +19,12 @@ def test_backup_restore_workflow(monkeypatch):
index_file = fp_dir / "seedpass_passwords_db.json.enc" index_file = fp_dir / "seedpass_passwords_db.json.enc"
data1 = {"passwords": {"0": {"website": "a", "length": 10}}} data1 = {
"schema_version": 2,
"entries": {
"0": {"website": "a", "length": 10, "type": "password", "notes": ""}
},
}
vault.save_index(data1) vault.save_index(data1)
os.utime(index_file, (1, 1)) os.utime(index_file, (1, 1))
@@ -29,7 +34,12 @@ def test_backup_restore_workflow(monkeypatch):
assert backup1.exists() assert backup1.exists()
assert backup1.stat().st_mode & 0o777 == 0o600 assert backup1.stat().st_mode & 0o777 == 0o600
data2 = {"passwords": {"0": {"website": "b", "length": 12}}} data2 = {
"schema_version": 2,
"entries": {
"0": {"website": "b", "length": 12, "type": "password", "notes": ""}
},
}
vault.save_index(data2) vault.save_index(data2)
os.utime(index_file, (2, 2)) os.utime(index_file, (2, 2))
@@ -39,13 +49,13 @@ def test_backup_restore_workflow(monkeypatch):
assert backup2.exists() assert backup2.exists()
assert backup2.stat().st_mode & 0o777 == 0o600 assert backup2.stat().st_mode & 0o777 == 0o600
vault.save_index({"passwords": {"temp": {}}}) vault.save_index({"schema_version": 2, "entries": {"temp": {}}})
backup_mgr.restore_latest_backup() backup_mgr.restore_latest_backup()
assert vault.load_index()["passwords"] == data2["passwords"] assert vault.load_index()["entries"] == data2["entries"]
vault.save_index({"passwords": {}}) vault.save_index({"schema_version": 2, "entries": {}})
backup_mgr.restore_backup_by_timestamp(1111) backup_mgr.restore_backup_by_timestamp(1111)
assert vault.load_index()["passwords"] == data1["passwords"] assert vault.load_index()["entries"] == data1["entries"]
backup1.unlink() backup1.unlink()
current = vault.load_index() current = vault.load_index()

View File

@@ -26,5 +26,5 @@ def test_add_and_retrieve_entry():
} }
data = enc_mgr.load_json_data(entry_mgr.index_file) data = enc_mgr.load_json_data(entry_mgr.index_file)
assert str(index) in data.get("passwords", {}) assert str(index) in data.get("entries", {})
assert data["passwords"][str(index)] == entry assert data["entries"][str(index)] == entry

View File

@@ -16,7 +16,7 @@ def test_update_checksum_writes_to_expected_path():
entry_mgr = EntryManager(vault, tmp_path) entry_mgr = EntryManager(vault, tmp_path)
# create an empty index file # create an empty index file
vault.save_index({"passwords": {}}) vault.save_index({"entries": {}})
entry_mgr.update_checksum() entry_mgr.update_checksum()
expected = tmp_path / "seedpass_passwords_db_checksum.txt" expected = tmp_path / "seedpass_passwords_db_checksum.txt"
@@ -29,7 +29,7 @@ def test_backup_index_file_creates_backup_in_directory():
vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD) vault, enc_mgr = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
entry_mgr = EntryManager(vault, tmp_path) entry_mgr = EntryManager(vault, tmp_path)
vault.save_index({"passwords": {}}) vault.save_index({"entries": {}})
entry_mgr.backup_index_file() entry_mgr.backup_index_file()
backups = list(tmp_path.glob("passwords_db_backup_*.json.enc")) backups = list(tmp_path.glob("passwords_db_backup_*.json.enc"))

View File

@@ -30,17 +30,27 @@ def test_index_export_import_round_trip():
tmp = Path(td) tmp = Path(td)
vault = setup_vault(tmp) vault = setup_vault(tmp)
original = {"passwords": {"0": {"website": "example"}}} original = {
"schema_version": 2,
"entries": {"0": {"website": "example", "type": "password", "notes": ""}},
}
vault.save_index(original) vault.save_index(original)
encrypted = vault.get_encrypted_index() encrypted = vault.get_encrypted_index()
assert isinstance(encrypted, bytes) assert isinstance(encrypted, bytes)
vault.save_index({"passwords": {"0": {"website": "changed"}}}) vault.save_index(
{
"schema_version": 2,
"entries": {
"0": {"website": "changed", "type": "password", "notes": ""}
},
}
)
vault.decrypt_and_save_index_from_nostr(encrypted) vault.decrypt_and_save_index_from_nostr(encrypted)
loaded = vault.load_index() loaded = vault.load_index()
assert loaded["passwords"] == original["passwords"] assert loaded["entries"] == original["entries"]
def test_get_encrypted_index_missing_file(tmp_path): def test_get_encrypted_index_missing_file(tmp_path):

View File

@@ -13,18 +13,19 @@ def setup(tmp_path: Path):
return enc_mgr, vault return enc_mgr, vault
def test_migrate_v0_to_v1(tmp_path: Path): def test_migrate_v0_to_v2(tmp_path: Path):
enc_mgr, vault = setup(tmp_path) enc_mgr, vault = setup(tmp_path)
legacy = {"passwords": {"0": {"website": "a", "length": 8}}} legacy = {"passwords": {"0": {"website": "a", "length": 8}}}
enc_mgr.save_json_data(legacy) enc_mgr.save_json_data(legacy)
data = vault.load_index() data = vault.load_index()
assert data["schema_version"] == LATEST_VERSION assert data["schema_version"] == LATEST_VERSION
assert data["passwords"] == legacy["passwords"] expected_entry = {"website": "a", "length": 8, "type": "password", "notes": ""}
assert data["entries"]["0"] == expected_entry
def test_error_on_future_version(tmp_path: Path): def test_error_on_future_version(tmp_path: Path):
enc_mgr, vault = setup(tmp_path) enc_mgr, vault = setup(tmp_path)
future = {"schema_version": LATEST_VERSION + 1, "passwords": {}} future = {"schema_version": LATEST_VERSION + 1, "entries": {}}
enc_mgr.save_json_data(future) enc_mgr.save_json_data(future)
with pytest.raises(ValueError): with pytest.raises(ValueError):
vault.load_index() vault.load_index()

View File

@@ -32,7 +32,7 @@ def test_password_change_and_unlock(monkeypatch):
entry_mgr = EntryManager(vault, fp) entry_mgr = EntryManager(vault, fp)
cfg_mgr = ConfigManager(vault, fp) cfg_mgr = ConfigManager(vault, fp)
vault.save_index({"passwords": {}}) vault.save_index({"entries": {}})
cfg_mgr.save_config( cfg_mgr.save_config(
{ {
"relays": [], "relays": [],

View File

@@ -58,7 +58,7 @@ def test_add_and_delete_entry(monkeypatch):
pm.entry_manager = entry_mgr pm.entry_manager = entry_mgr
index = entry_mgr.add_entry("example.com", 12) index = entry_mgr.add_entry("example.com", 12)
assert str(index) in vault.load_index()["passwords"] assert str(index) in vault.load_index()["entries"]
published = [] published = []
pm.nostr_client = SimpleNamespace( pm.nostr_client = SimpleNamespace(
@@ -73,5 +73,5 @@ def test_add_and_delete_entry(monkeypatch):
pm.delete_entry() pm.delete_entry()
assert str(index) not in vault.load_index()["passwords"] assert str(index) not in vault.load_index()["entries"]
assert published assert published