Merge pull request #493 from PR0M3TH3AN/codex/implement-dual-encryption-support

Centralize AES-GCM decryption with version header
This commit is contained in:
thePR0M3TH3AN
2025-07-13 09:50:21 -04:00
committed by GitHub

View File

@@ -132,35 +132,22 @@ class EncryptionManager:
fh.seek(0) fh.seek(0)
encrypted_data = fh.read() encrypted_data = fh.read()
try: is_legacy = not encrypted_data.startswith(b"V2:")
decrypted = self.decrypt_data(encrypted_data) decrypted = self.decrypt_data(encrypted_data)
parent_seed = decrypted.decode("utf-8").strip() parent_seed = decrypted.decode("utf-8").strip()
logger.debug(
f"Parent seed decrypted successfully from '{parent_seed_path}'."
)
return parent_seed
except (InvalidTag, InvalidToken):
logger.info(
"AES-GCM decryption failed for parent seed, attempting Fernet fallback"
)
try:
decrypted = self.fernet.decrypt(encrypted_data)
except InvalidToken as e:
logger.error(
f"Fernet decryption failed for '{parent_seed_path}': {e}",
exc_info=True,
)
raise
parent_seed = decrypted.decode("utf-8").strip()
if is_legacy:
legacy_path = parent_seed_path.with_suffix( legacy_path = parent_seed_path.with_suffix(
parent_seed_path.suffix + ".fernet" parent_seed_path.suffix + ".fernet"
) )
os.rename(parent_seed_path, legacy_path) os.rename(parent_seed_path, legacy_path)
self.encrypt_parent_seed(parent_seed) self.encrypt_parent_seed(parent_seed)
logger.debug( logger.debug(
f"Parent seed decrypted with Fernet and re-encrypted using AES-GCM at '{parent_seed_path}'." f"Parent seed migrated from Fernet and re-encrypted at '{parent_seed_path}'."
)
logger.debug(
f"Parent seed decrypted successfully from '{parent_seed_path}'."
) )
return parent_seed return parent_seed
except Exception as e: except Exception as e:
@@ -169,12 +156,12 @@ class EncryptionManager:
raise raise
def encrypt_data(self, data: bytes) -> bytes: def encrypt_data(self, data: bytes) -> bytes:
"""Encrypt ``data`` with AES-GCM and prepend the nonce.""" """Encrypt ``data`` with AES-GCM and prepend a version header."""
try: try:
nonce = os.urandom(12) nonce = os.urandom(12)
ciphertext = self.cipher.encrypt(nonce, data, None) ciphertext = self.cipher.encrypt(nonce, data, None)
encrypted_data = nonce + ciphertext encrypted_data = b"V2:" + nonce + ciphertext
logger.debug("Data encrypted successfully with AES-GCM.") logger.debug("Data encrypted successfully with AES-GCM.")
return encrypted_data return encrypted_data
except Exception as e: except Exception as e:
@@ -183,22 +170,31 @@ class EncryptionManager:
raise raise
def decrypt_data(self, encrypted_data: bytes) -> bytes: def decrypt_data(self, encrypted_data: bytes) -> bytes:
"""Decrypt AES-GCM data that includes a prepended nonce.""" """Decrypt data using AES-GCM or legacy Fernet."""
try: try:
nonce, ciphertext = encrypted_data[:12], encrypted_data[12:] # Attempt AES-GCM decryption first
if encrypted_data.startswith(b"V2:"):
nonce = encrypted_data[3:15]
ciphertext = encrypted_data[15:]
else:
nonce = encrypted_data[:12]
ciphertext = encrypted_data[12:]
decrypted_data = self.cipher.decrypt(nonce, ciphertext, None) decrypted_data = self.cipher.decrypt(nonce, ciphertext, None)
logger.debug("Data decrypted successfully with AES-GCM.") logger.debug("Data decrypted successfully with AES-GCM.")
return decrypted_data return decrypted_data
except InvalidTag: except InvalidTag:
logger.error( if encrypted_data.startswith(b"V2:"):
"Invalid encryption key or corrupted data while decrypting data." logger.error("AES-GCM decryption failed: invalid tag", exc_info=True)
)
raise
except Exception as e:
logger.error(f"Failed to decrypt data: {e}", exc_info=True)
print(colored(f"Error: Failed to decrypt data: {e}", "red"))
raise raise
try:
decrypted_data = self.fernet.decrypt(encrypted_data)
logger.info("Legacy Fernet data decrypted successfully.")
return decrypted_data
except InvalidToken:
logger.error("Legacy Fernet decryption failed", exc_info=True)
raise InvalidTag("Data could not be decrypted")
def encrypt_and_save_file(self, data: bytes, relative_path: Path) -> None: def encrypt_and_save_file(self, data: bytes, relative_path: Path) -> None:
""" """
@@ -262,9 +258,10 @@ class EncryptionManager:
decrypted_data = self.decrypt_data(encrypted_data) decrypted_data = self.decrypt_data(encrypted_data)
logger.debug(f"Data decrypted successfully from '{file_path}'.") logger.debug(f"Data decrypted successfully from '{file_path}'.")
return decrypted_data return decrypted_data
except InvalidTag: except (InvalidTag, InvalidToken) as e:
logger.error( logger.error(
"Invalid encryption key or corrupted data while decrypting file." "Invalid encryption key or corrupted data while decrypting file.",
exc_info=True,
) )
raise raise
except Exception as e: except Exception as e:
@@ -331,37 +328,33 @@ class EncryptionManager:
) )
return {"entries": {}} return {"entries": {}}
try:
decrypted_data = self.decrypt_file(relative_path)
json_content = decrypted_data.decode("utf-8").strip()
data = json.loads(json_content)
logger.debug(f"JSON data loaded and decrypted from '{file_path}': {data}")
return data
except (InvalidTag, json.JSONDecodeError):
logger.info(
f"AES-GCM decryption failed for '{file_path}', attempting Fernet fallback"
)
with exclusive_lock(file_path) as fh: with exclusive_lock(file_path) as fh:
fh.seek(0) fh.seek(0)
legacy_bytes = fh.read() encrypted_bytes = fh.read()
try:
legacy_plain = decrypt_legacy_fernet(self.key_b64, legacy_bytes)
data = json.loads(legacy_plain.decode("utf-8").strip())
except (InvalidToken, json.JSONDecodeError) as e:
logger.error(
f"Legacy decryption failed for '{file_path}': {e}", exc_info=True
)
raise
is_legacy = not encrypted_bytes.startswith(b"V2:")
try:
decrypted_data = self.decrypt_data(encrypted_bytes)
data = json.loads(decrypted_data.decode("utf-8"))
if is_legacy:
legacy_path = file_path.with_suffix(file_path.suffix + ".fernet") legacy_path = file_path.with_suffix(file_path.suffix + ".fernet")
os.rename(file_path, legacy_path) os.rename(file_path, legacy_path)
chk = file_path.parent / f"{file_path.stem}_checksum.txt" chk = file_path.parent / f"{file_path.stem}_checksum.txt"
if chk.exists(): if chk.exists():
chk.rename(chk.with_suffix(chk.suffix + ".fernet")) chk.rename(chk.with_suffix(chk.suffix + ".fernet"))
self.save_json_data(data, relative_path) self.save_json_data(data, relative_path)
self.update_checksum(relative_path) self.update_checksum(relative_path)
logger.info(f"Migrated legacy vault file: {file_path}")
logger.debug(f"JSON data loaded and decrypted from '{file_path}'")
return data return data
except (InvalidTag, InvalidToken, json.JSONDecodeError) as e:
logger.error(
f"Could not load or migrate data from {file_path}: {e}", exc_info=True
)
raise
except Exception as e: except Exception as e:
logger.error( logger.error(
f"Failed to load JSON data from '{file_path}': {e}", exc_info=True f"Failed to load JSON data from '{file_path}': {e}", exc_info=True
@@ -468,8 +461,18 @@ class EncryptionManager:
data = json.loads(decrypted_data.decode("utf-8")) data = json.loads(decrypted_data.decode("utf-8"))
self.save_json_data(data, relative_path) self.save_json_data(data, relative_path)
self.update_checksum(relative_path) self.update_checksum(relative_path)
logger.info("Index file updated from Nostr successfully.") logger.info("Index file from Nostr was processed and saved successfully.")
print(colored("Index file updated from Nostr successfully.", "green")) print(colored("Index file updated from Nostr successfully.", "green"))
except (InvalidToken, InvalidTag, json.JSONDecodeError) as e:
logger.error(
f"Failed to decrypt and save data from Nostr: {e}", exc_info=True
)
print(
colored(
f"Error: Failed to decrypt and save data from Nostr: {e}", "red"
)
)
raise
except Exception as e: except Exception as e:
logger.error( logger.error(
f"Failed to decrypt and save data from Nostr: {e}", exc_info=True f"Failed to decrypt and save data from Nostr: {e}", exc_info=True
@@ -479,7 +482,6 @@ class EncryptionManager:
f"Error: Failed to decrypt and save data from Nostr: {e}", "red" f"Error: Failed to decrypt and save data from Nostr: {e}", "red"
) )
) )
# Re-raise the exception to inform the calling function of the failure
raise raise
def validate_seed(self, seed_phrase: str) -> bool: def validate_seed(self, seed_phrase: str) -> bool: