mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-08 15:28:44 +00:00
Add legacy Fernet migration
This commit is contained in:
@@ -25,6 +25,8 @@ from typing import Optional
|
||||
import base64
|
||||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||
from cryptography.exceptions import InvalidTag
|
||||
from cryptography.fernet import Fernet, InvalidToken
|
||||
from cryptography.fernet import Fernet, InvalidToken
|
||||
from termcolor import colored
|
||||
from utils.file_lock import (
|
||||
exclusive_lock,
|
||||
@@ -34,6 +36,16 @@ from utils.file_lock import (
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def decrypt_legacy_fernet(encryption_key: bytes | str, payload: bytes) -> bytes:
|
||||
"""Decrypt *payload* using legacy Fernet."""
|
||||
if isinstance(encryption_key, str):
|
||||
key = encryption_key.encode()
|
||||
else:
|
||||
key = encryption_key
|
||||
f = Fernet(key)
|
||||
return f.decrypt(payload)
|
||||
|
||||
|
||||
class EncryptionManager:
|
||||
"""
|
||||
EncryptionManager Class
|
||||
@@ -55,6 +67,7 @@ class EncryptionManager:
|
||||
try:
|
||||
if isinstance(encryption_key, str):
|
||||
encryption_key = encryption_key.encode()
|
||||
self.key_b64 = encryption_key
|
||||
self.key = base64.urlsafe_b64decode(encryption_key)
|
||||
self.cipher = AESGCM(self.key)
|
||||
logger.debug(
|
||||
@@ -304,16 +317,31 @@ class EncryptionManager:
|
||||
data = json.loads(json_content)
|
||||
logger.debug(f"JSON data loaded and decrypted from '{file_path}': {data}")
|
||||
return data
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(
|
||||
f"Failed to decode JSON data from '{file_path}': {e}", exc_info=True
|
||||
except (InvalidTag, json.JSONDecodeError):
|
||||
logger.info(
|
||||
f"AES-GCM decryption failed for '{file_path}', attempting legacy format"
|
||||
)
|
||||
raise
|
||||
except InvalidTag:
|
||||
logger.error(
|
||||
"Invalid encryption key or corrupted data while decrypting JSON data."
|
||||
)
|
||||
raise
|
||||
with exclusive_lock(file_path) as fh:
|
||||
fh.seek(0)
|
||||
legacy_bytes = fh.read()
|
||||
try:
|
||||
legacy_plain = decrypt_legacy_fernet(self.key_b64, legacy_bytes)
|
||||
data = json.loads(legacy_plain.decode("utf-8").strip())
|
||||
except (InvalidToken, json.JSONDecodeError) as e:
|
||||
logger.error(
|
||||
f"Legacy decryption failed for '{file_path}': {e}", exc_info=True
|
||||
)
|
||||
raise
|
||||
|
||||
legacy_path = file_path.with_suffix(file_path.suffix + ".fernet")
|
||||
os.rename(file_path, legacy_path)
|
||||
chk = file_path.parent / f"{file_path.stem}_checksum.txt"
|
||||
if chk.exists():
|
||||
chk.rename(chk.with_suffix(chk.suffix + ".fernet"))
|
||||
|
||||
self.save_json_data(data, relative_path)
|
||||
self.update_checksum(relative_path)
|
||||
return data
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to load JSON data from '{file_path}': {e}", exc_info=True
|
||||
|
@@ -30,6 +30,17 @@ class Vault:
|
||||
# ----- Password index helpers -----
|
||||
def load_index(self) -> dict:
|
||||
"""Return decrypted password index data as a dict, applying migrations."""
|
||||
legacy_file = self.fingerprint_dir / "seedpass_passwords_db.json.enc"
|
||||
if legacy_file.exists() and not self.index_file.exists():
|
||||
legacy_checksum = (
|
||||
self.fingerprint_dir / "seedpass_passwords_db_checksum.txt"
|
||||
)
|
||||
legacy_file.rename(self.index_file)
|
||||
if legacy_checksum.exists():
|
||||
legacy_checksum.rename(
|
||||
self.fingerprint_dir / "seedpass_entries_db_checksum.txt"
|
||||
)
|
||||
|
||||
data = self.encryption_manager.load_json_data(self.index_file)
|
||||
from .migrations import apply_migrations, LATEST_VERSION
|
||||
|
||||
|
43
src/tests/test_legacy_migration.py
Normal file
43
src/tests/test_legacy_migration.py
Normal file
@@ -0,0 +1,43 @@
|
||||
import json
|
||||
import hashlib
|
||||
from pathlib import Path
|
||||
|
||||
from helpers import create_vault, TEST_SEED, TEST_PASSWORD
|
||||
from utils.key_derivation import derive_index_key
|
||||
from cryptography.fernet import Fernet
|
||||
|
||||
|
||||
def test_legacy_index_migrates(tmp_path: Path):
|
||||
vault, _ = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
|
||||
|
||||
key = derive_index_key(TEST_SEED)
|
||||
data = {
|
||||
"schema_version": 4,
|
||||
"entries": {
|
||||
"0": {
|
||||
"label": "a",
|
||||
"length": 8,
|
||||
"type": "password",
|
||||
"kind": "password",
|
||||
"notes": "",
|
||||
"custom_fields": [],
|
||||
"origin": "",
|
||||
"tags": [],
|
||||
}
|
||||
},
|
||||
}
|
||||
enc = Fernet(key).encrypt(json.dumps(data).encode())
|
||||
legacy_file = tmp_path / "seedpass_passwords_db.json.enc"
|
||||
legacy_file.write_bytes(enc)
|
||||
(tmp_path / "seedpass_passwords_db_checksum.txt").write_text(
|
||||
hashlib.sha256(enc).hexdigest()
|
||||
)
|
||||
|
||||
loaded = vault.load_index()
|
||||
assert loaded == data
|
||||
|
||||
new_file = tmp_path / "seedpass_entries_db.json.enc"
|
||||
assert new_file.exists()
|
||||
assert not legacy_file.exists()
|
||||
assert not (tmp_path / "seedpass_passwords_db_checksum.txt").exists()
|
||||
assert (tmp_path / ("seedpass_entries_db.json.enc.fernet")).exists()
|
Reference in New Issue
Block a user