diff --git a/src/password_manager/encryption.py b/src/password_manager/encryption.py index de0c020..c0f6954 100644 --- a/src/password_manager/encryption.py +++ b/src/password_manager/encryption.py @@ -132,35 +132,22 @@ class EncryptionManager: fh.seek(0) encrypted_data = fh.read() - try: - decrypted = self.decrypt_data(encrypted_data) - parent_seed = decrypted.decode("utf-8").strip() + is_legacy = not encrypted_data.startswith(b"V2:") + decrypted = self.decrypt_data(encrypted_data) + parent_seed = decrypted.decode("utf-8").strip() + + if is_legacy: + legacy_path = parent_seed_path.with_suffix( + parent_seed_path.suffix + ".fernet" + ) + os.rename(parent_seed_path, legacy_path) + self.encrypt_parent_seed(parent_seed) logger.debug( - f"Parent seed decrypted successfully from '{parent_seed_path}'." + f"Parent seed migrated from Fernet and re-encrypted at '{parent_seed_path}'." ) - return parent_seed - except (InvalidTag, InvalidToken): - logger.info( - "AES-GCM decryption failed for parent seed, attempting Fernet fallback" - ) - try: - decrypted = self.fernet.decrypt(encrypted_data) - except InvalidToken as e: - logger.error( - f"Fernet decryption failed for '{parent_seed_path}': {e}", - exc_info=True, - ) - raise - parent_seed = decrypted.decode("utf-8").strip() - - legacy_path = parent_seed_path.with_suffix( - parent_seed_path.suffix + ".fernet" - ) - os.rename(parent_seed_path, legacy_path) - self.encrypt_parent_seed(parent_seed) logger.debug( - f"Parent seed decrypted with Fernet and re-encrypted using AES-GCM at '{parent_seed_path}'." + f"Parent seed decrypted successfully from '{parent_seed_path}'." ) return parent_seed except Exception as e: @@ -169,12 +156,12 @@ class EncryptionManager: raise def encrypt_data(self, data: bytes) -> bytes: - """Encrypt ``data`` with AES-GCM and prepend the nonce.""" + """Encrypt ``data`` with AES-GCM and prepend a version header.""" try: nonce = os.urandom(12) ciphertext = self.cipher.encrypt(nonce, data, None) - encrypted_data = nonce + ciphertext + encrypted_data = b"V2:" + nonce + ciphertext logger.debug("Data encrypted successfully with AES-GCM.") return encrypted_data except Exception as e: @@ -183,22 +170,31 @@ class EncryptionManager: raise def decrypt_data(self, encrypted_data: bytes) -> bytes: - """Decrypt AES-GCM data that includes a prepended nonce.""" + """Decrypt data using AES-GCM or legacy Fernet.""" try: - nonce, ciphertext = encrypted_data[:12], encrypted_data[12:] + # Attempt AES-GCM decryption first + if encrypted_data.startswith(b"V2:"): + nonce = encrypted_data[3:15] + ciphertext = encrypted_data[15:] + else: + nonce = encrypted_data[:12] + ciphertext = encrypted_data[12:] + decrypted_data = self.cipher.decrypt(nonce, ciphertext, None) logger.debug("Data decrypted successfully with AES-GCM.") return decrypted_data except InvalidTag: - logger.error( - "Invalid encryption key or corrupted data while decrypting data." - ) - raise - except Exception as e: - logger.error(f"Failed to decrypt data: {e}", exc_info=True) - print(colored(f"Error: Failed to decrypt data: {e}", "red")) - raise + if encrypted_data.startswith(b"V2:"): + logger.error("AES-GCM decryption failed: invalid tag", exc_info=True) + raise + try: + decrypted_data = self.fernet.decrypt(encrypted_data) + logger.info("Legacy Fernet data decrypted successfully.") + return decrypted_data + except InvalidToken: + logger.error("Legacy Fernet decryption failed", exc_info=True) + raise InvalidTag("Data could not be decrypted") def encrypt_and_save_file(self, data: bytes, relative_path: Path) -> None: """ @@ -262,9 +258,10 @@ class EncryptionManager: decrypted_data = self.decrypt_data(encrypted_data) logger.debug(f"Data decrypted successfully from '{file_path}'.") return decrypted_data - except InvalidTag: + except (InvalidTag, InvalidToken) as e: logger.error( - "Invalid encryption key or corrupted data while decrypting file." + "Invalid encryption key or corrupted data while decrypting file.", + exc_info=True, ) raise except Exception as e: @@ -331,37 +328,33 @@ class EncryptionManager: ) return {"entries": {}} + with exclusive_lock(file_path) as fh: + fh.seek(0) + encrypted_bytes = fh.read() + + is_legacy = not encrypted_bytes.startswith(b"V2:") + try: - decrypted_data = self.decrypt_file(relative_path) - json_content = decrypted_data.decode("utf-8").strip() - data = json.loads(json_content) - logger.debug(f"JSON data loaded and decrypted from '{file_path}': {data}") + decrypted_data = self.decrypt_data(encrypted_bytes) + data = json.loads(decrypted_data.decode("utf-8")) + + if is_legacy: + legacy_path = file_path.with_suffix(file_path.suffix + ".fernet") + os.rename(file_path, legacy_path) + chk = file_path.parent / f"{file_path.stem}_checksum.txt" + if chk.exists(): + chk.rename(chk.with_suffix(chk.suffix + ".fernet")) + self.save_json_data(data, relative_path) + self.update_checksum(relative_path) + logger.info(f"Migrated legacy vault file: {file_path}") + + logger.debug(f"JSON data loaded and decrypted from '{file_path}'") return data - except (InvalidTag, json.JSONDecodeError): - logger.info( - f"AES-GCM decryption failed for '{file_path}', attempting Fernet fallback" + except (InvalidTag, InvalidToken, json.JSONDecodeError) as e: + logger.error( + f"Could not load or migrate data from {file_path}: {e}", exc_info=True ) - with exclusive_lock(file_path) as fh: - fh.seek(0) - legacy_bytes = fh.read() - try: - legacy_plain = decrypt_legacy_fernet(self.key_b64, legacy_bytes) - data = json.loads(legacy_plain.decode("utf-8").strip()) - except (InvalidToken, json.JSONDecodeError) as e: - logger.error( - f"Legacy decryption failed for '{file_path}': {e}", exc_info=True - ) - raise - - legacy_path = file_path.with_suffix(file_path.suffix + ".fernet") - os.rename(file_path, legacy_path) - chk = file_path.parent / f"{file_path.stem}_checksum.txt" - if chk.exists(): - chk.rename(chk.with_suffix(chk.suffix + ".fernet")) - - self.save_json_data(data, relative_path) - self.update_checksum(relative_path) - return data + raise except Exception as e: logger.error( f"Failed to load JSON data from '{file_path}': {e}", exc_info=True @@ -468,8 +461,18 @@ class EncryptionManager: data = json.loads(decrypted_data.decode("utf-8")) self.save_json_data(data, relative_path) self.update_checksum(relative_path) - logger.info("Index file updated from Nostr successfully.") + logger.info("Index file from Nostr was processed and saved successfully.") print(colored("Index file updated from Nostr successfully.", "green")) + except (InvalidToken, InvalidTag, json.JSONDecodeError) as e: + logger.error( + f"Failed to decrypt and save data from Nostr: {e}", exc_info=True + ) + print( + colored( + f"Error: Failed to decrypt and save data from Nostr: {e}", "red" + ) + ) + raise except Exception as e: logger.error( f"Failed to decrypt and save data from Nostr: {e}", exc_info=True @@ -479,7 +482,6 @@ class EncryptionManager: f"Error: Failed to decrypt and save data from Nostr: {e}", "red" ) ) - # Re-raise the exception to inform the calling function of the failure raise def validate_seed(self, seed_phrase: str) -> bool: