Merge pull request #417 from PR0M3TH3AN/codex/add-fastapi-and-implement-api-endpoints

Add FastAPI server and API tests
This commit is contained in:
thePR0M3TH3AN
2025-07-09 09:40:08 -04:00
committed by GitHub
5 changed files with 187 additions and 0 deletions

View File

@@ -19,6 +19,7 @@ cryptography==45.0.4
ecdsa==0.19.1
ed25519-blake2b==1.4.1
execnet==2.1.1
fastapi==0.116.0
frozenlist==1.7.0
glob2==0.7
hypothesis==6.135.20
@@ -57,6 +58,8 @@ termcolor==3.1.0
toml==0.10.2
tomli==2.2.1
urllib3==2.5.0
uvicorn==0.35.0
httpx==0.28.1
varint==1.0.2
websocket-client==1.7.0
websockets==15.0.1

View File

@@ -25,3 +25,6 @@ freezegun
pyperclip
qrcode>=8.2
typer>=0.12.3
fastapi>=0.116.0
uvicorn>=0.35.0
httpx>=0.28.1

104
src/seedpass/api.py Normal file
View File

@@ -0,0 +1,104 @@
"""SeedPass FastAPI server."""
from __future__ import annotations
import os
import secrets
from typing import Any, List, Optional
from fastapi import FastAPI, Header, HTTPException
import asyncio
import sys
from fastapi.middleware.cors import CORSMiddleware
from password_manager.manager import PasswordManager
app = FastAPI()
_pm: Optional[PasswordManager] = None
_token: str = ""
def _check_token(auth: str | None) -> None:
if auth != f"Bearer {_token}":
raise HTTPException(status_code=401, detail="Unauthorized")
def start_server() -> str:
"""Initialize global state and return the API token."""
global _pm, _token
_pm = PasswordManager()
_token = secrets.token_urlsafe(16)
print(f"API token: {_token}")
origins = [
o.strip()
for o in os.getenv("SEEDPASS_CORS_ORIGINS", "").split(",")
if o.strip()
]
if origins and app.middleware_stack is None:
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_methods=["*"],
allow_headers=["*"],
)
return _token
@app.get("/api/v1/entry")
def search_entry(query: str, authorization: str | None = Header(None)) -> List[Any]:
_check_token(authorization)
assert _pm is not None
results = _pm.entry_manager.search_entries(query)
return [
{
"id": idx,
"label": label,
"username": username,
"url": url,
"archived": archived,
}
for idx, label, username, url, archived in results
]
@app.get("/api/v1/entry/{entry_id}")
def get_entry(entry_id: int, authorization: str | None = Header(None)) -> Any:
_check_token(authorization)
assert _pm is not None
entry = _pm.entry_manager.retrieve_entry(entry_id)
if entry is None:
raise HTTPException(status_code=404, detail="Not found")
return entry
@app.get("/api/v1/config/{key}")
def get_config(key: str, authorization: str | None = Header(None)) -> Any:
_check_token(authorization)
assert _pm is not None
value = _pm.config_manager.load_config(require_pin=False).get(key)
if value is None:
raise HTTPException(status_code=404, detail="Not found")
return {"key": key, "value": value}
@app.get("/api/v1/fingerprint")
def list_fingerprints(authorization: str | None = Header(None)) -> List[str]:
_check_token(authorization)
assert _pm is not None
return _pm.fingerprint_manager.list_fingerprints()
@app.get("/api/v1/nostr/pubkey")
def get_nostr_pubkey(authorization: str | None = Header(None)) -> Any:
_check_token(authorization)
assert _pm is not None
return {"npub": _pm.nostr_client.key_manager.get_npub()}
@app.post("/api/v1/shutdown")
async def shutdown_server(authorization: str | None = Header(None)) -> dict[str, str]:
_check_token(authorization)
asyncio.get_event_loop().call_soon(sys.exit, 0)
return {"status": "shutting down"}

View File

@@ -5,6 +5,8 @@ import typer
from password_manager.manager import PasswordManager
from password_manager.entry_types import EntryType
import uvicorn
from . import api as api_module
app = typer.Typer(help="SeedPass command line interface")
@@ -23,6 +25,7 @@ nostr_app = typer.Typer(help="Interact with Nostr relays")
config_app = typer.Typer(help="Manage configuration values")
fingerprint_app = typer.Typer(help="Manage seed profiles")
util_app = typer.Typer(help="Utility commands")
api_app = typer.Typer(help="Run the API server")
app.add_typer(entry_app, name="entry")
app.add_typer(vault_app, name="vault")
@@ -30,6 +33,7 @@ app.add_typer(nostr_app, name="nostr")
app.add_typer(config_app, name="config")
app.add_typer(fingerprint_app, name="fingerprint")
app.add_typer(util_app, name="util")
app.add_typer(api_app, name="api")
def _get_pm(ctx: typer.Context) -> PasswordManager:
@@ -169,3 +173,26 @@ def fingerprint_list(ctx: typer.Context) -> None:
def generate_password(ctx: typer.Context, length: int = 24) -> None:
"""Generate a strong password."""
typer.echo(f"Generate password of length {length} for {ctx.obj.get('fingerprint')}")
@api_app.command("start")
def api_start(host: str = "127.0.0.1", port: int = 8000) -> None:
"""Start the SeedPass API server."""
token = api_module.start_server()
typer.echo(f"API token: {token}")
uvicorn.run(api_module.app, host=host, port=port)
@api_app.command("stop")
def api_stop(host: str = "127.0.0.1", port: int = 8000) -> None:
"""Stop the SeedPass API server."""
import requests
try:
requests.post(
f"http://{host}:{port}/api/v1/shutdown",
headers={"Authorization": f"Bearer {api_module._token}"},
timeout=2,
)
except Exception as exc: # pragma: no cover - best effort
typer.echo(f"Failed to stop server: {exc}")

50
src/tests/test_api.py Normal file
View File

@@ -0,0 +1,50 @@
from types import SimpleNamespace
from pathlib import Path
import sys
import pytest
from fastapi.testclient import TestClient
sys.path.append(str(Path(__file__).resolve().parents[1]))
from seedpass import api
@pytest.fixture
def client(monkeypatch):
dummy = SimpleNamespace(
entry_manager=SimpleNamespace(
search_entries=lambda q: [(1, "Site", "user", "url", False)],
retrieve_entry=lambda i: {"label": "Site"},
),
config_manager=SimpleNamespace(
load_config=lambda require_pin=False: {"k": "v"}
),
fingerprint_manager=SimpleNamespace(list_fingerprints=lambda: ["fp"]),
nostr_client=SimpleNamespace(
key_manager=SimpleNamespace(get_npub=lambda: "np")
),
)
monkeypatch.setattr(api, "PasswordManager", lambda: dummy)
monkeypatch.setenv("SEEDPASS_CORS_ORIGINS", "http://example.com")
token = api.start_server()
client = TestClient(api.app)
return client, token
def test_cors_and_auth(client):
cl, token = client
headers = {"Authorization": f"Bearer {token}", "Origin": "http://example.com"}
res = cl.get("/api/v1/entry", params={"query": "s"}, headers=headers)
assert res.status_code == 200
assert res.headers.get("access-control-allow-origin") == "http://example.com"
def test_invalid_token(client):
cl, _token = client
res = cl.get(
"/api/v1/entry",
params={"query": "s"},
headers={"Authorization": "Bearer bad"},
)
assert res.status_code == 401