mirror of
https://github.com/PR0M3TH3AN/SeedPass.git
synced 2025-09-08 07:18:47 +00:00
Add FastAPI API server
This commit is contained in:
@@ -19,6 +19,7 @@ cryptography==45.0.4
|
||||
ecdsa==0.19.1
|
||||
ed25519-blake2b==1.4.1
|
||||
execnet==2.1.1
|
||||
fastapi==0.116.0
|
||||
frozenlist==1.7.0
|
||||
glob2==0.7
|
||||
hypothesis==6.135.20
|
||||
@@ -57,6 +58,7 @@ termcolor==3.1.0
|
||||
toml==0.10.2
|
||||
tomli==2.2.1
|
||||
urllib3==2.5.0
|
||||
uvicorn==0.35.0
|
||||
varint==1.0.2
|
||||
websocket-client==1.7.0
|
||||
websockets==15.0.1
|
||||
|
@@ -25,3 +25,5 @@ freezegun
|
||||
pyperclip
|
||||
qrcode>=8.2
|
||||
typer>=0.12.3
|
||||
fastapi>=0.116.0
|
||||
uvicorn>=0.35.0
|
||||
|
104
src/seedpass/api.py
Normal file
104
src/seedpass/api.py
Normal file
@@ -0,0 +1,104 @@
|
||||
"""SeedPass FastAPI server."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import secrets
|
||||
from typing import Any, List, Optional
|
||||
|
||||
from fastapi import FastAPI, Header, HTTPException
|
||||
import asyncio
|
||||
import sys
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
|
||||
from password_manager.manager import PasswordManager
|
||||
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
_pm: Optional[PasswordManager] = None
|
||||
_token: str = ""
|
||||
|
||||
|
||||
def _check_token(auth: str | None) -> None:
|
||||
if auth != f"Bearer {_token}":
|
||||
raise HTTPException(status_code=401, detail="Unauthorized")
|
||||
|
||||
|
||||
def start_server() -> str:
|
||||
"""Initialize global state and return the API token."""
|
||||
global _pm, _token
|
||||
_pm = PasswordManager()
|
||||
_token = secrets.token_urlsafe(16)
|
||||
print(f"API token: {_token}")
|
||||
origins = [
|
||||
o.strip()
|
||||
for o in os.getenv("SEEDPASS_CORS_ORIGINS", "").split(",")
|
||||
if o.strip()
|
||||
]
|
||||
if origins and app.middleware_stack is None:
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=origins,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
return _token
|
||||
|
||||
|
||||
@app.get("/api/v1/entry")
|
||||
def search_entry(query: str, authorization: str | None = Header(None)) -> List[Any]:
|
||||
_check_token(authorization)
|
||||
assert _pm is not None
|
||||
results = _pm.entry_manager.search_entries(query)
|
||||
return [
|
||||
{
|
||||
"id": idx,
|
||||
"label": label,
|
||||
"username": username,
|
||||
"url": url,
|
||||
"archived": archived,
|
||||
}
|
||||
for idx, label, username, url, archived in results
|
||||
]
|
||||
|
||||
|
||||
@app.get("/api/v1/entry/{entry_id}")
|
||||
def get_entry(entry_id: int, authorization: str | None = Header(None)) -> Any:
|
||||
_check_token(authorization)
|
||||
assert _pm is not None
|
||||
entry = _pm.entry_manager.retrieve_entry(entry_id)
|
||||
if entry is None:
|
||||
raise HTTPException(status_code=404, detail="Not found")
|
||||
return entry
|
||||
|
||||
|
||||
@app.get("/api/v1/config/{key}")
|
||||
def get_config(key: str, authorization: str | None = Header(None)) -> Any:
|
||||
_check_token(authorization)
|
||||
assert _pm is not None
|
||||
value = _pm.config_manager.load_config(require_pin=False).get(key)
|
||||
if value is None:
|
||||
raise HTTPException(status_code=404, detail="Not found")
|
||||
return {"key": key, "value": value}
|
||||
|
||||
|
||||
@app.get("/api/v1/fingerprint")
|
||||
def list_fingerprints(authorization: str | None = Header(None)) -> List[str]:
|
||||
_check_token(authorization)
|
||||
assert _pm is not None
|
||||
return _pm.fingerprint_manager.list_fingerprints()
|
||||
|
||||
|
||||
@app.get("/api/v1/nostr/pubkey")
|
||||
def get_nostr_pubkey(authorization: str | None = Header(None)) -> Any:
|
||||
_check_token(authorization)
|
||||
assert _pm is not None
|
||||
return {"npub": _pm.nostr_client.key_manager.get_npub()}
|
||||
|
||||
|
||||
@app.post("/api/v1/shutdown")
|
||||
async def shutdown_server(authorization: str | None = Header(None)) -> dict[str, str]:
|
||||
_check_token(authorization)
|
||||
asyncio.get_event_loop().call_soon(sys.exit, 0)
|
||||
return {"status": "shutting down"}
|
@@ -5,6 +5,8 @@ import typer
|
||||
|
||||
from password_manager.manager import PasswordManager
|
||||
from password_manager.entry_types import EntryType
|
||||
import uvicorn
|
||||
from . import api as api_module
|
||||
|
||||
app = typer.Typer(help="SeedPass command line interface")
|
||||
|
||||
@@ -23,6 +25,7 @@ nostr_app = typer.Typer(help="Interact with Nostr relays")
|
||||
config_app = typer.Typer(help="Manage configuration values")
|
||||
fingerprint_app = typer.Typer(help="Manage seed profiles")
|
||||
util_app = typer.Typer(help="Utility commands")
|
||||
api_app = typer.Typer(help="Run the API server")
|
||||
|
||||
app.add_typer(entry_app, name="entry")
|
||||
app.add_typer(vault_app, name="vault")
|
||||
@@ -30,6 +33,7 @@ app.add_typer(nostr_app, name="nostr")
|
||||
app.add_typer(config_app, name="config")
|
||||
app.add_typer(fingerprint_app, name="fingerprint")
|
||||
app.add_typer(util_app, name="util")
|
||||
app.add_typer(api_app, name="api")
|
||||
|
||||
|
||||
def _get_pm(ctx: typer.Context) -> PasswordManager:
|
||||
@@ -169,3 +173,26 @@ def fingerprint_list(ctx: typer.Context) -> None:
|
||||
def generate_password(ctx: typer.Context, length: int = 24) -> None:
|
||||
"""Generate a strong password."""
|
||||
typer.echo(f"Generate password of length {length} for {ctx.obj.get('fingerprint')}")
|
||||
|
||||
|
||||
@api_app.command("start")
|
||||
def api_start(host: str = "127.0.0.1", port: int = 8000) -> None:
|
||||
"""Start the SeedPass API server."""
|
||||
token = api_module.start_server()
|
||||
typer.echo(f"API token: {token}")
|
||||
uvicorn.run(api_module.app, host=host, port=port)
|
||||
|
||||
|
||||
@api_app.command("stop")
|
||||
def api_stop(host: str = "127.0.0.1", port: int = 8000) -> None:
|
||||
"""Stop the SeedPass API server."""
|
||||
import requests
|
||||
|
||||
try:
|
||||
requests.post(
|
||||
f"http://{host}:{port}/api/v1/shutdown",
|
||||
headers={"Authorization": f"Bearer {api_module._token}"},
|
||||
timeout=2,
|
||||
)
|
||||
except Exception as exc: # pragma: no cover - best effort
|
||||
typer.echo(f"Failed to stop server: {exc}")
|
||||
|
50
src/tests/test_api.py
Normal file
50
src/tests/test_api.py
Normal file
@@ -0,0 +1,50 @@
|
||||
from types import SimpleNamespace
|
||||
from pathlib import Path
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
sys.path.append(str(Path(__file__).resolve().parents[1]))
|
||||
|
||||
from seedpass import api
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client(monkeypatch):
|
||||
dummy = SimpleNamespace(
|
||||
entry_manager=SimpleNamespace(
|
||||
search_entries=lambda q: [(1, "Site", "user", "url", False)],
|
||||
retrieve_entry=lambda i: {"label": "Site"},
|
||||
),
|
||||
config_manager=SimpleNamespace(
|
||||
load_config=lambda require_pin=False: {"k": "v"}
|
||||
),
|
||||
fingerprint_manager=SimpleNamespace(list_fingerprints=lambda: ["fp"]),
|
||||
nostr_client=SimpleNamespace(
|
||||
key_manager=SimpleNamespace(get_npub=lambda: "np")
|
||||
),
|
||||
)
|
||||
monkeypatch.setattr(api, "PasswordManager", lambda: dummy)
|
||||
monkeypatch.setenv("SEEDPASS_CORS_ORIGINS", "http://example.com")
|
||||
token = api.start_server()
|
||||
client = TestClient(api.app)
|
||||
return client, token
|
||||
|
||||
|
||||
def test_cors_and_auth(client):
|
||||
cl, token = client
|
||||
headers = {"Authorization": f"Bearer {token}", "Origin": "http://example.com"}
|
||||
res = cl.get("/api/v1/entry", params={"query": "s"}, headers=headers)
|
||||
assert res.status_code == 200
|
||||
assert res.headers.get("access-control-allow-origin") == "http://example.com"
|
||||
|
||||
|
||||
def test_invalid_token(client):
|
||||
cl, _token = client
|
||||
res = cl.get(
|
||||
"/api/v1/entry",
|
||||
params={"query": "s"},
|
||||
headers={"Authorization": "Bearer bad"},
|
||||
)
|
||||
assert res.status_code == 401
|
Reference in New Issue
Block a user