mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-08 23:38:49 +00:00
Validate import path and extension
This commit is contained in:
@@ -111,11 +111,15 @@ def _require_password(password: str | None) -> None:
|
|||||||
raise HTTPException(status_code=401, detail="Invalid password")
|
raise HTTPException(status_code=401, detail="Invalid password")
|
||||||
|
|
||||||
|
|
||||||
def _validate_encryption_path(path: Path) -> None:
|
def _validate_encryption_path(path: Path) -> Path:
|
||||||
"""Validate that ``path`` stays within the active fingerprint directory."""
|
"""Validate and normalize ``path`` within the active fingerprint directory.
|
||||||
|
|
||||||
|
Returns the resolved absolute path if validation succeeds.
|
||||||
|
"""
|
||||||
|
|
||||||
assert _pm is not None
|
assert _pm is not None
|
||||||
try:
|
try:
|
||||||
_pm.encryption_manager.resolve_relative_path(path)
|
return _pm.encryption_manager.resolve_relative_path(path)
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
raise HTTPException(status_code=400, detail=str(e))
|
raise HTTPException(status_code=400, detail=str(e))
|
||||||
|
|
||||||
@@ -595,11 +599,19 @@ async def import_vault(
|
|||||||
os.unlink(tmp_path)
|
os.unlink(tmp_path)
|
||||||
else:
|
else:
|
||||||
body = await request.json()
|
body = await request.json()
|
||||||
path = body.get("path")
|
path_str = body.get("path")
|
||||||
|
|
||||||
if not path:
|
if not path_str:
|
||||||
raise HTTPException(status_code=400, detail="Missing file or path")
|
raise HTTPException(status_code=400, detail="Missing file or path")
|
||||||
_pm.handle_import_database(Path(path))
|
|
||||||
|
path = _validate_encryption_path(Path(path_str))
|
||||||
|
if not str(path).endswith(".json.enc"):
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=400,
|
||||||
|
detail="Selected file must be a '.json.enc' backup",
|
||||||
|
)
|
||||||
|
|
||||||
|
_pm.handle_import_database(path)
|
||||||
_pm.sync_vault()
|
_pm.sync_vault()
|
||||||
return {"status": "ok"}
|
return {"status": "ok"}
|
||||||
|
|
||||||
|
@@ -3,6 +3,7 @@ from pathlib import Path
|
|||||||
import os
|
import os
|
||||||
import base64
|
import base64
|
||||||
import pytest
|
import pytest
|
||||||
|
from types import SimpleNamespace
|
||||||
|
|
||||||
from seedpass import api
|
from seedpass import api
|
||||||
from test_api import client
|
from test_api import client
|
||||||
@@ -225,7 +226,8 @@ def test_vault_import_via_path(client, tmp_path):
|
|||||||
|
|
||||||
api._pm.handle_import_database = import_db
|
api._pm.handle_import_database = import_db
|
||||||
api._pm.sync_vault = lambda: called.setdefault("sync", True)
|
api._pm.sync_vault = lambda: called.setdefault("sync", True)
|
||||||
file_path = tmp_path / "b.json"
|
api._pm.encryption_manager = SimpleNamespace(resolve_relative_path=lambda p: p)
|
||||||
|
file_path = tmp_path / "b.json.enc"
|
||||||
file_path.write_text("{}")
|
file_path.write_text("{}")
|
||||||
|
|
||||||
headers = {"Authorization": f"Bearer {token}"}
|
headers = {"Authorization": f"Bearer {token}"}
|
||||||
@@ -265,6 +267,37 @@ def test_vault_import_via_upload(client, tmp_path):
|
|||||||
assert called.get("sync") is True
|
assert called.get("sync") is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_vault_import_invalid_extension(client):
|
||||||
|
cl, token = client
|
||||||
|
api._pm.handle_import_database = lambda path: None
|
||||||
|
api._pm.sync_vault = lambda: None
|
||||||
|
api._pm.encryption_manager = SimpleNamespace(resolve_relative_path=lambda p: p)
|
||||||
|
|
||||||
|
headers = {"Authorization": f"Bearer {token}"}
|
||||||
|
res = cl.post(
|
||||||
|
"/api/v1/vault/import",
|
||||||
|
json={"path": "bad.txt"},
|
||||||
|
headers=headers,
|
||||||
|
)
|
||||||
|
assert res.status_code == 400
|
||||||
|
|
||||||
|
|
||||||
|
def test_vault_import_path_traversal_blocked(client, tmp_path):
|
||||||
|
cl, token = client
|
||||||
|
key = base64.urlsafe_b64encode(os.urandom(32))
|
||||||
|
api._pm.encryption_manager = EncryptionManager(key, tmp_path)
|
||||||
|
api._pm.handle_import_database = lambda path: None
|
||||||
|
api._pm.sync_vault = lambda: None
|
||||||
|
|
||||||
|
headers = {"Authorization": f"Bearer {token}"}
|
||||||
|
res = cl.post(
|
||||||
|
"/api/v1/vault/import",
|
||||||
|
json={"path": "../evil.json.enc"},
|
||||||
|
headers=headers,
|
||||||
|
)
|
||||||
|
assert res.status_code == 400
|
||||||
|
|
||||||
|
|
||||||
def test_vault_lock_endpoint(client):
|
def test_vault_lock_endpoint(client):
|
||||||
cl, token = client
|
cl, token = client
|
||||||
called = {}
|
called = {}
|
||||||
|
Reference in New Issue
Block a user