Merge pull request #835 from PR0M3TH3AN/codex/update-encryptionmanager-for-nonce-handling

Add nonce tracking and V3 encryption format
This commit is contained in:
thePR0M3TH3AN
2025-08-20 20:02:07 -04:00
committed by GitHub
7 changed files with 70 additions and 26 deletions

View File

@@ -16,6 +16,7 @@ except Exception: # pragma: no cover - fallback for environments without orjson
import hashlib
import os
import base64
import zlib
from dataclasses import asdict
from pathlib import Path
from typing import Optional, Tuple
@@ -91,16 +92,23 @@ class EncryptionManager:
# Track user preference for handling legacy indexes
self._legacy_migrate_flag = True
self.last_migration_performed = False
# Track nonces to detect accidental reuse
self.nonce_crc_table: set[int] = set()
def encrypt_data(self, data: bytes) -> bytes:
"""
(2) Encrypts data using the NEW AES-GCM format, prepending a version
header and the nonce. All new data will be in this format.
Encrypt data using AES-GCM, emitting ``b"V3|" + nonce + ciphertext + tag``.
A fresh 96-bit nonce is generated for each call and tracked via a CRC
table to detect accidental reuse during batch operations.
"""
try:
nonce = os.urandom(12) # 96-bit nonce is recommended for AES-GCM
crc = zlib.crc32(nonce)
if crc in self.nonce_crc_table:
raise ValueError("Nonce reuse detected")
self.nonce_crc_table.add(crc)
ciphertext = self.cipher.encrypt(nonce, data, None)
return b"V2:" + nonce + ciphertext
return b"V3|" + nonce + ciphertext
except Exception as e:
logger.error(f"Failed to encrypt data: {e}", exc_info=True)
raise
@@ -122,7 +130,21 @@ class EncryptionManager:
ctx = f" {context}" if context else ""
try:
# Try the new V2 format first
# Try the new V3 format first
if encrypted_data.startswith(b"V3|"):
try:
nonce = encrypted_data[3:15]
ciphertext = encrypted_data[15:]
if len(ciphertext) < 16:
logger.error("AES-GCM payload too short")
raise InvalidToken("AES-GCM payload too short")
return self.cipher.decrypt(nonce, ciphertext, None)
except InvalidTag as e:
msg = f"Failed to decrypt{ctx}: invalid key or corrupt file"
logger.error(msg)
raise InvalidToken(msg) from e
# Next try the older V2 format
if encrypted_data.startswith(b"V2:"):
try:
nonce = encrypted_data[3:15]
@@ -146,19 +168,18 @@ class EncryptionManager:
logger.error(msg)
raise InvalidToken(msg) from e
# If it's not V2, it must be the legacy Fernet format
else:
logger.warning("Data is in legacy Fernet format. Attempting migration.")
try:
return self.fernet.decrypt(encrypted_data)
except InvalidToken as e:
logger.error(
"Legacy Fernet decryption failed. Vault may be corrupt or key is incorrect."
)
raise e
# If it's neither V3 nor V2, assume legacy Fernet format
logger.warning("Data is in legacy Fernet format. Attempting migration.")
try:
return self.fernet.decrypt(encrypted_data)
except InvalidToken as e:
logger.error(
"Legacy Fernet decryption failed. Vault may be corrupt or key is incorrect."
)
raise e
except (InvalidToken, InvalidTag) as e:
if encrypted_data.startswith(b"V2:"):
if encrypted_data.startswith(b"V3|") or encrypted_data.startswith(b"V2:"):
# Already determined not to be legacy; re-raise
raise
if isinstance(e, InvalidToken) and str(e) == "AES-GCM payload too short":
@@ -248,11 +269,13 @@ class EncryptionManager:
blob = fh.read()
kdf, encrypted_data = self._deserialize(blob)
is_legacy = not encrypted_data.startswith(b"V2:")
is_legacy = not (
encrypted_data.startswith(b"V3|") or encrypted_data.startswith(b"V2:")
)
decrypted_data = self.decrypt_data(encrypted_data, context="seed")
if is_legacy:
logger.info("Parent seed was in legacy format. Re-encrypting to V2 format.")
logger.info("Parent seed was in legacy format. Re-encrypting to V3 format.")
self.encrypt_parent_seed(decrypted_data.decode("utf-8").strip(), kdf=kdf)
return decrypted_data.decode("utf-8").strip()
@@ -362,7 +385,9 @@ class EncryptionManager:
blob = fh.read()
kdf, encrypted_data = self._deserialize(blob)
is_legacy = not encrypted_data.startswith(b"V2:")
is_legacy = not (
encrypted_data.startswith(b"V3|") or encrypted_data.startswith(b"V2:")
)
self.last_migration_performed = False
try:
@@ -424,7 +449,7 @@ class EncryptionManager:
relative_path = Path("seedpass_entries_db.json.enc")
kdf, ciphertext = self._deserialize(encrypted_data)
is_legacy = not ciphertext.startswith(b"V2:")
is_legacy = not (ciphertext.startswith(b"V3|") or ciphertext.startswith(b"V2:"))
self.last_migration_performed = False
def _process(decrypted: bytes) -> dict:

View File

@@ -83,7 +83,7 @@ def test_failed_migration_restores_legacy(monkeypatch, tmp_path: Path):
assert not vault.migrated_from_legacy
def test_migrated_index_has_v2_prefix(monkeypatch, tmp_path: Path):
def test_migrated_index_has_v3_prefix(monkeypatch, tmp_path: Path):
vault, _ = create_vault(tmp_path, TEST_SEED, TEST_PASSWORD)
key = derive_index_key(TEST_SEED)
@@ -101,7 +101,7 @@ def test_migrated_index_has_v2_prefix(monkeypatch, tmp_path: Path):
new_file = tmp_path / "seedpass_entries_db.json.enc"
payload = json.loads(new_file.read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
assert base64.b64decode(payload["ct"]).startswith(b"V3|")
assert vault.migrated_from_legacy

View File

@@ -67,4 +67,4 @@ def test_migrate_iterations(tmp_path, monkeypatch, iterations):
assert cfg.get_kdf_iterations() == iterations
payload = json.loads((tmp_path / "seedpass_entries_db.json.enc").read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
assert base64.b64decode(payload["ct"]).startswith(b"V3|")

View File

@@ -51,5 +51,5 @@ def test_migrate_legacy_sets_flag(tmp_path, monkeypatch):
monkeypatch.setattr("builtins.input", lambda _: "2")
vault.load_index()
payload = json.loads((tmp_path / "seedpass_entries_db.json.enc").read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
assert base64.b64decode(payload["ct"]).startswith(b"V3|")
assert vault.encryption_manager.last_migration_performed is True

View File

@@ -36,7 +36,7 @@ def test_legacy_migration_second_session(monkeypatch, tmp_path: Path) -> None:
vault.load_index()
new_file = fp_dir / "seedpass_entries_db.json.enc"
payload = json.loads(new_file.read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
assert base64.b64decode(payload["ct"]).startswith(b"V3|")
new_enc_mgr = EncryptionManager(key, fp_dir)
new_vault = Vault(new_enc_mgr, fp_dir)
@@ -62,4 +62,4 @@ def test_legacy_migration_second_session(monkeypatch, tmp_path: Path) -> None:
pm.initialize_managers()
payload = json.loads(new_file.read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
assert base64.b64decode(payload["ct"]).startswith(b"V3|")

View File

@@ -0,0 +1,19 @@
from pathlib import Path
from helpers import TEST_SEED
from utils.key_derivation import derive_index_key
from seedpass.core.encryption import EncryptionManager
def test_nonce_uniqueness(tmp_path: Path) -> None:
key = derive_index_key(TEST_SEED)
manager = EncryptionManager(key, tmp_path)
plaintext = b"repeat"
nonces = set()
for _ in range(10):
payload = manager.encrypt_data(plaintext)
assert payload.startswith(b"V3|")
nonce = payload[3:15]
assert nonce not in nonces
nonces.add(nonce)
assert len(nonces) == 10

View File

@@ -31,4 +31,4 @@ def test_parent_seed_migrates_from_fernet(tmp_path: Path) -> None:
assert new_file.exists()
assert new_file.read_bytes() != encrypted
payload = json.loads(new_file.read_text())
assert base64.b64decode(payload["ct"]).startswith(b"V2:")
assert base64.b64decode(payload["ct"]).startswith(b"V3|")