feat: centralize decryption with version prefix

This commit is contained in:
thePR0M3TH3AN
2025-07-13 09:48:36 -04:00
parent 73196e20ec
commit dacc591c25

View File

@@ -132,35 +132,22 @@ class EncryptionManager:
fh.seek(0) fh.seek(0)
encrypted_data = fh.read() encrypted_data = fh.read()
try: is_legacy = not encrypted_data.startswith(b"V2:")
decrypted = self.decrypt_data(encrypted_data) decrypted = self.decrypt_data(encrypted_data)
parent_seed = decrypted.decode("utf-8").strip() parent_seed = decrypted.decode("utf-8").strip()
if is_legacy:
legacy_path = parent_seed_path.with_suffix(
parent_seed_path.suffix + ".fernet"
)
os.rename(parent_seed_path, legacy_path)
self.encrypt_parent_seed(parent_seed)
logger.debug( logger.debug(
f"Parent seed decrypted successfully from '{parent_seed_path}'." f"Parent seed migrated from Fernet and re-encrypted at '{parent_seed_path}'."
) )
return parent_seed
except (InvalidTag, InvalidToken):
logger.info(
"AES-GCM decryption failed for parent seed, attempting Fernet fallback"
)
try:
decrypted = self.fernet.decrypt(encrypted_data)
except InvalidToken as e:
logger.error(
f"Fernet decryption failed for '{parent_seed_path}': {e}",
exc_info=True,
)
raise
parent_seed = decrypted.decode("utf-8").strip()
legacy_path = parent_seed_path.with_suffix(
parent_seed_path.suffix + ".fernet"
)
os.rename(parent_seed_path, legacy_path)
self.encrypt_parent_seed(parent_seed)
logger.debug( logger.debug(
f"Parent seed decrypted with Fernet and re-encrypted using AES-GCM at '{parent_seed_path}'." f"Parent seed decrypted successfully from '{parent_seed_path}'."
) )
return parent_seed return parent_seed
except Exception as e: except Exception as e:
@@ -169,12 +156,12 @@ class EncryptionManager:
raise raise
def encrypt_data(self, data: bytes) -> bytes: def encrypt_data(self, data: bytes) -> bytes:
"""Encrypt ``data`` with AES-GCM and prepend the nonce.""" """Encrypt ``data`` with AES-GCM and prepend a version header."""
try: try:
nonce = os.urandom(12) nonce = os.urandom(12)
ciphertext = self.cipher.encrypt(nonce, data, None) ciphertext = self.cipher.encrypt(nonce, data, None)
encrypted_data = nonce + ciphertext encrypted_data = b"V2:" + nonce + ciphertext
logger.debug("Data encrypted successfully with AES-GCM.") logger.debug("Data encrypted successfully with AES-GCM.")
return encrypted_data return encrypted_data
except Exception as e: except Exception as e:
@@ -183,22 +170,31 @@ class EncryptionManager:
raise raise
def decrypt_data(self, encrypted_data: bytes) -> bytes: def decrypt_data(self, encrypted_data: bytes) -> bytes:
"""Decrypt AES-GCM data that includes a prepended nonce.""" """Decrypt data using AES-GCM or legacy Fernet."""
try: try:
nonce, ciphertext = encrypted_data[:12], encrypted_data[12:] # Attempt AES-GCM decryption first
if encrypted_data.startswith(b"V2:"):
nonce = encrypted_data[3:15]
ciphertext = encrypted_data[15:]
else:
nonce = encrypted_data[:12]
ciphertext = encrypted_data[12:]
decrypted_data = self.cipher.decrypt(nonce, ciphertext, None) decrypted_data = self.cipher.decrypt(nonce, ciphertext, None)
logger.debug("Data decrypted successfully with AES-GCM.") logger.debug("Data decrypted successfully with AES-GCM.")
return decrypted_data return decrypted_data
except InvalidTag: except InvalidTag:
logger.error( if encrypted_data.startswith(b"V2:"):
"Invalid encryption key or corrupted data while decrypting data." logger.error("AES-GCM decryption failed: invalid tag", exc_info=True)
) raise
raise try:
except Exception as e: decrypted_data = self.fernet.decrypt(encrypted_data)
logger.error(f"Failed to decrypt data: {e}", exc_info=True) logger.info("Legacy Fernet data decrypted successfully.")
print(colored(f"Error: Failed to decrypt data: {e}", "red")) return decrypted_data
raise except InvalidToken:
logger.error("Legacy Fernet decryption failed", exc_info=True)
raise InvalidTag("Data could not be decrypted")
def encrypt_and_save_file(self, data: bytes, relative_path: Path) -> None: def encrypt_and_save_file(self, data: bytes, relative_path: Path) -> None:
""" """
@@ -262,9 +258,10 @@ class EncryptionManager:
decrypted_data = self.decrypt_data(encrypted_data) decrypted_data = self.decrypt_data(encrypted_data)
logger.debug(f"Data decrypted successfully from '{file_path}'.") logger.debug(f"Data decrypted successfully from '{file_path}'.")
return decrypted_data return decrypted_data
except InvalidTag: except (InvalidTag, InvalidToken) as e:
logger.error( logger.error(
"Invalid encryption key or corrupted data while decrypting file." "Invalid encryption key or corrupted data while decrypting file.",
exc_info=True,
) )
raise raise
except Exception as e: except Exception as e:
@@ -331,37 +328,33 @@ class EncryptionManager:
) )
return {"entries": {}} return {"entries": {}}
with exclusive_lock(file_path) as fh:
fh.seek(0)
encrypted_bytes = fh.read()
is_legacy = not encrypted_bytes.startswith(b"V2:")
try: try:
decrypted_data = self.decrypt_file(relative_path) decrypted_data = self.decrypt_data(encrypted_bytes)
json_content = decrypted_data.decode("utf-8").strip() data = json.loads(decrypted_data.decode("utf-8"))
data = json.loads(json_content)
logger.debug(f"JSON data loaded and decrypted from '{file_path}': {data}") if is_legacy:
legacy_path = file_path.with_suffix(file_path.suffix + ".fernet")
os.rename(file_path, legacy_path)
chk = file_path.parent / f"{file_path.stem}_checksum.txt"
if chk.exists():
chk.rename(chk.with_suffix(chk.suffix + ".fernet"))
self.save_json_data(data, relative_path)
self.update_checksum(relative_path)
logger.info(f"Migrated legacy vault file: {file_path}")
logger.debug(f"JSON data loaded and decrypted from '{file_path}'")
return data return data
except (InvalidTag, json.JSONDecodeError): except (InvalidTag, InvalidToken, json.JSONDecodeError) as e:
logger.info( logger.error(
f"AES-GCM decryption failed for '{file_path}', attempting Fernet fallback" f"Could not load or migrate data from {file_path}: {e}", exc_info=True
) )
with exclusive_lock(file_path) as fh: raise
fh.seek(0)
legacy_bytes = fh.read()
try:
legacy_plain = decrypt_legacy_fernet(self.key_b64, legacy_bytes)
data = json.loads(legacy_plain.decode("utf-8").strip())
except (InvalidToken, json.JSONDecodeError) as e:
logger.error(
f"Legacy decryption failed for '{file_path}': {e}", exc_info=True
)
raise
legacy_path = file_path.with_suffix(file_path.suffix + ".fernet")
os.rename(file_path, legacy_path)
chk = file_path.parent / f"{file_path.stem}_checksum.txt"
if chk.exists():
chk.rename(chk.with_suffix(chk.suffix + ".fernet"))
self.save_json_data(data, relative_path)
self.update_checksum(relative_path)
return data
except Exception as e: except Exception as e:
logger.error( logger.error(
f"Failed to load JSON data from '{file_path}': {e}", exc_info=True f"Failed to load JSON data from '{file_path}': {e}", exc_info=True
@@ -468,8 +461,18 @@ class EncryptionManager:
data = json.loads(decrypted_data.decode("utf-8")) data = json.loads(decrypted_data.decode("utf-8"))
self.save_json_data(data, relative_path) self.save_json_data(data, relative_path)
self.update_checksum(relative_path) self.update_checksum(relative_path)
logger.info("Index file updated from Nostr successfully.") logger.info("Index file from Nostr was processed and saved successfully.")
print(colored("Index file updated from Nostr successfully.", "green")) print(colored("Index file updated from Nostr successfully.", "green"))
except (InvalidToken, InvalidTag, json.JSONDecodeError) as e:
logger.error(
f"Failed to decrypt and save data from Nostr: {e}", exc_info=True
)
print(
colored(
f"Error: Failed to decrypt and save data from Nostr: {e}", "red"
)
)
raise
except Exception as e: except Exception as e:
logger.error( logger.error(
f"Failed to decrypt and save data from Nostr: {e}", exc_info=True f"Failed to decrypt and save data from Nostr: {e}", exc_info=True
@@ -479,7 +482,6 @@ class EncryptionManager:
f"Error: Failed to decrypt and save data from Nostr: {e}", "red" f"Error: Failed to decrypt and save data from Nostr: {e}", "red"
) )
) )
# Re-raise the exception to inform the calling function of the failure
raise raise
def validate_seed(self, seed_phrase: str) -> bool: def validate_seed(self, seed_phrase: str) -> bool: