diff --git a/src/seedpass/core/encryption.py b/src/seedpass/core/encryption.py index 8218c88..e38e9a2 100644 --- a/src/seedpass/core/encryption.py +++ b/src/seedpass/core/encryption.py @@ -16,6 +16,7 @@ except Exception: # pragma: no cover - fallback for environments without orjson import hashlib import os import base64 +import zlib from dataclasses import asdict from pathlib import Path from typing import Optional, Tuple @@ -91,16 +92,23 @@ class EncryptionManager: # Track user preference for handling legacy indexes self._legacy_migrate_flag = True self.last_migration_performed = False + # Track nonces to detect accidental reuse + self.nonce_crc_table: set[int] = set() def encrypt_data(self, data: bytes) -> bytes: """ - (2) Encrypts data using the NEW AES-GCM format, prepending a version - header and the nonce. All new data will be in this format. + Encrypt data using AES-GCM, emitting ``b"V3|" + nonce + ciphertext + tag``. + A fresh 96-bit nonce is generated for each call and tracked via a CRC + table to detect accidental reuse during batch operations. """ try: nonce = os.urandom(12) # 96-bit nonce is recommended for AES-GCM + crc = zlib.crc32(nonce) + if crc in self.nonce_crc_table: + raise ValueError("Nonce reuse detected") + self.nonce_crc_table.add(crc) ciphertext = self.cipher.encrypt(nonce, data, None) - return b"V2:" + nonce + ciphertext + return b"V3|" + nonce + ciphertext except Exception as e: logger.error(f"Failed to encrypt data: {e}", exc_info=True) raise @@ -122,7 +130,21 @@ class EncryptionManager: ctx = f" {context}" if context else "" try: - # Try the new V2 format first + # Try the new V3 format first + if encrypted_data.startswith(b"V3|"): + try: + nonce = encrypted_data[3:15] + ciphertext = encrypted_data[15:] + if len(ciphertext) < 16: + logger.error("AES-GCM payload too short") + raise InvalidToken("AES-GCM payload too short") + return self.cipher.decrypt(nonce, ciphertext, None) + except InvalidTag as e: + msg = f"Failed to decrypt{ctx}: invalid key or corrupt file" + logger.error(msg) + raise InvalidToken(msg) from e + + # Next try the older V2 format if encrypted_data.startswith(b"V2:"): try: nonce = encrypted_data[3:15] @@ -146,19 +168,18 @@ class EncryptionManager: logger.error(msg) raise InvalidToken(msg) from e - # If it's not V2, it must be the legacy Fernet format - else: - logger.warning("Data is in legacy Fernet format. Attempting migration.") - try: - return self.fernet.decrypt(encrypted_data) - except InvalidToken as e: - logger.error( - "Legacy Fernet decryption failed. Vault may be corrupt or key is incorrect." - ) - raise e + # If it's neither V3 nor V2, assume legacy Fernet format + logger.warning("Data is in legacy Fernet format. Attempting migration.") + try: + return self.fernet.decrypt(encrypted_data) + except InvalidToken as e: + logger.error( + "Legacy Fernet decryption failed. Vault may be corrupt or key is incorrect." + ) + raise e except (InvalidToken, InvalidTag) as e: - if encrypted_data.startswith(b"V2:"): + if encrypted_data.startswith(b"V3|") or encrypted_data.startswith(b"V2:"): # Already determined not to be legacy; re-raise raise if isinstance(e, InvalidToken) and str(e) == "AES-GCM payload too short": @@ -248,11 +269,13 @@ class EncryptionManager: blob = fh.read() kdf, encrypted_data = self._deserialize(blob) - is_legacy = not encrypted_data.startswith(b"V2:") + is_legacy = not ( + encrypted_data.startswith(b"V3|") or encrypted_data.startswith(b"V2:") + ) decrypted_data = self.decrypt_data(encrypted_data, context="seed") if is_legacy: - logger.info("Parent seed was in legacy format. Re-encrypting to V2 format.") + logger.info("Parent seed was in legacy format. Re-encrypting to V3 format.") self.encrypt_parent_seed(decrypted_data.decode("utf-8").strip(), kdf=kdf) return decrypted_data.decode("utf-8").strip() @@ -362,7 +385,9 @@ class EncryptionManager: blob = fh.read() kdf, encrypted_data = self._deserialize(blob) - is_legacy = not encrypted_data.startswith(b"V2:") + is_legacy = not ( + encrypted_data.startswith(b"V3|") or encrypted_data.startswith(b"V2:") + ) self.last_migration_performed = False try: @@ -424,7 +449,7 @@ class EncryptionManager: relative_path = Path("seedpass_entries_db.json.enc") kdf, ciphertext = self._deserialize(encrypted_data) - is_legacy = not ciphertext.startswith(b"V2:") + is_legacy = not (ciphertext.startswith(b"V3|") or ciphertext.startswith(b"V2:")) self.last_migration_performed = False def _process(decrypted: bytes) -> dict: diff --git a/src/tests/test_legacy_migration.py b/src/tests/test_legacy_migration.py index fa2b25f..1ee0b54 100644 --- a/src/tests/test_legacy_migration.py +++ b/src/tests/test_legacy_migration.py @@ -83,7 +83,7 @@ def test_failed_migration_restores_legacy(monkeypatch, tmp_path: Path): assert not vault.migrated_from_legacy -def test_migrated_index_has_v2_prefix(monkeypatch, tmp_path: Path): +def test_migrated_index_has_v3_prefix(monkeypatch, tmp_path: Path): vault, _ = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD) key = derive_index_key(TEST_SEED) @@ -101,7 +101,7 @@ def test_migrated_index_has_v2_prefix(monkeypatch, tmp_path: Path): new_file = tmp_path / "seedpass_entries_db.json.enc" payload = json.loads(new_file.read_text()) - assert base64.b64decode(payload["ct"]).startswith(b"V2:") + assert base64.b64decode(payload["ct"]).startswith(b"V3|") assert vault.migrated_from_legacy diff --git a/src/tests/test_legacy_migration_iterations.py b/src/tests/test_legacy_migration_iterations.py index e5f791a..e2007e6 100644 --- a/src/tests/test_legacy_migration_iterations.py +++ b/src/tests/test_legacy_migration_iterations.py @@ -67,4 +67,4 @@ def test_migrate_iterations(tmp_path, monkeypatch, iterations): assert cfg.get_kdf_iterations() == iterations payload = json.loads((tmp_path / "seedpass_entries_db.json.enc").read_text()) - assert base64.b64decode(payload["ct"]).startswith(b"V2:") + assert base64.b64decode(payload["ct"]).startswith(b"V3|") diff --git a/src/tests/test_legacy_migration_prompt.py b/src/tests/test_legacy_migration_prompt.py index bbc2b43..13b5d43 100644 --- a/src/tests/test_legacy_migration_prompt.py +++ b/src/tests/test_legacy_migration_prompt.py @@ -51,5 +51,5 @@ def test_migrate_legacy_sets_flag(tmp_path, monkeypatch): monkeypatch.setattr("builtins.input", lambda _: "2") vault.load_index() payload = json.loads((tmp_path / "seedpass_entries_db.json.enc").read_text()) - assert base64.b64decode(payload["ct"]).startswith(b"V2:") + assert base64.b64decode(payload["ct"]).startswith(b"V3|") assert vault.encryption_manager.last_migration_performed is True diff --git a/src/tests/test_legacy_migration_second_session.py b/src/tests/test_legacy_migration_second_session.py index 99654cd..e20eefe 100644 --- a/src/tests/test_legacy_migration_second_session.py +++ b/src/tests/test_legacy_migration_second_session.py @@ -36,7 +36,7 @@ def test_legacy_migration_second_session(monkeypatch, tmp_path: Path) -> None: vault.load_index() new_file = fp_dir / "seedpass_entries_db.json.enc" payload = json.loads(new_file.read_text()) - assert base64.b64decode(payload["ct"]).startswith(b"V2:") + assert base64.b64decode(payload["ct"]).startswith(b"V3|") new_enc_mgr = EncryptionManager(key, fp_dir) new_vault = Vault(new_enc_mgr, fp_dir) @@ -62,4 +62,4 @@ def test_legacy_migration_second_session(monkeypatch, tmp_path: Path) -> None: pm.initialize_managers() payload = json.loads(new_file.read_text()) - assert base64.b64decode(payload["ct"]).startswith(b"V2:") + assert base64.b64decode(payload["ct"]).startswith(b"V3|") diff --git a/src/tests/test_nonce_uniqueness.py b/src/tests/test_nonce_uniqueness.py new file mode 100644 index 0000000..a1f8461 --- /dev/null +++ b/src/tests/test_nonce_uniqueness.py @@ -0,0 +1,19 @@ +from pathlib import Path + +from helpers import TEST_SEED +from utils.key_derivation import derive_index_key +from seedpass.core.encryption import EncryptionManager + + +def test_nonce_uniqueness(tmp_path: Path) -> None: + key = derive_index_key(TEST_SEED) + manager = EncryptionManager(key, tmp_path) + plaintext = b"repeat" + nonces = set() + for _ in range(10): + payload = manager.encrypt_data(plaintext) + assert payload.startswith(b"V3|") + nonce = payload[3:15] + assert nonce not in nonces + nonces.add(nonce) + assert len(nonces) == 10 diff --git a/src/tests/test_seed_migration.py b/src/tests/test_seed_migration.py index 5f5b28b..5951e91 100644 --- a/src/tests/test_seed_migration.py +++ b/src/tests/test_seed_migration.py @@ -31,4 +31,4 @@ def test_parent_seed_migrates_from_fernet(tmp_path: Path) -> None: assert new_file.exists() assert new_file.read_bytes() != encrypted payload = json.loads(new_file.read_text()) - assert base64.b64decode(payload["ct"]).startswith(b"V2:") + assert base64.b64decode(payload["ct"]).startswith(b"V3|")