from __future__ import annotations import base64 import mimetypes import random import re import time import uuid from datetime import datetime, timezone from pathlib import Path from typing import Any from fastapi import Depends, FastAPI, Header, HTTPException, Request, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, JSONResponse from pydantic import BaseModel from .config import Config from .connection_manager import ConnectionManager from .models import DeviceHello, DispatchRequest, DispatchResult, UpdateManifest from .store import store app = FastAPI(title="NotifyPulse V4 Server", version="4.0.0-alpha") manager = ConnectionManager() _pwa_clients: dict[str, dict[str, Any]] = {} _pending_wallpapers: dict[str, dict[str, str]] = {} _pwa_overlay: dict[str, Any] = {"active": False, "image": None, "until_ms": 0} _IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".webp", ".bmp"} ALL_BLOCKS = [ {"id": "notifications", "name": "Notifications", "description": "Server-dispatched notifications", "icon": "🔔"}, {"id": "wallpaper", "name": "Wallpaper", "description": "Wallpaper command dispatch", "icon": "🖼️"}, {"id": "overlay", "name": "Overlay", "description": "Overlay command dispatch", "icon": "✨"}, {"id": "timer", "name": "Timer", "description": "Scheduled triggers (server-side in V4)", "icon": "⏱️"}, {"id": "mobile_wallpaper", "name": "Mobile Wallpaper", "description": "PWA wallpaper receiver", "icon": "📱"}, ] app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) def utc_now_iso() -> str: return datetime.now(timezone.utc).isoformat() def require_token(x_api_token: str = Header(default="")) -> None: # Optional auth: if NP4_API_TOKEN is empty, routes are open. if Config.api_token and x_api_token != Config.api_token: raise HTTPException(status_code=401, detail="invalid API token") def _active_usecase_id() -> str: return str(store.get_settings().get("active_usecase", "")) def _active_usecase() -> dict[str, Any] | None: uid = _active_usecase_id() for uc in store.get_usecases(): if uc.get("id") == uid: return uc return None def _set_active_usecase(uid: str) -> None: store.update_settings({"active_usecase": uid}) def _parse_entry(line: str) -> dict[str, Any]: parts = [p.strip() for p in line.split("|", 1)] text = parts[0] weight = None trigger_time = None if len(parts) > 1: rhs = parts[1] m_pct = re.match(r"^(\d+(?:\.\d+)?)\s*%$", rhs) m_time = re.match(r"^([01]?\d|2[0-3]):([0-5]\d)$", rhs) if m_pct: weight = float(m_pct.group(1)) elif m_time: trigger_time = f"{int(m_time.group(1)):02d}:{m_time.group(2)}" return {"text": text, "weight": weight, "trigger_time": trigger_time} def _img_to_datauri(path: Path) -> str | None: try: mime = mimetypes.guess_type(path.name)[0] or "image/jpeg" data = base64.b64encode(path.read_bytes()).decode("ascii") return f"data:{mime};base64,{data}" except Exception: return None def _legacy_log_item(msg: str, level: str = "info") -> dict[str, Any]: return {"time": utc_now_iso(), "msg": msg, "level": level} def _event_to_legacy_log(event: dict[str, Any]) -> dict[str, Any]: if "msg" in event: return { "time": event.get("time", utc_now_iso()), "msg": str(event.get("msg", "")), "level": str(event.get("level", "info")), } et = str(event.get("event", "event")) level = "info" msg = et if et == "device_connected": msg = f"Client connected: {event.get('device_id', '?')} ({event.get('platform', 'unknown')})" elif et == "device_disconnected": msg = f"Client disconnected: {event.get('device_id', '?')}" elif et == "command_dispatched": msg = f"Dispatch {event.get('action', '?')} -> {event.get('delivered_count', 0)} client(s)" elif et == "command_ack": st = str(event.get("status", "ok")) detail = str(event.get("detail", "")).strip() msg = f"Ack {event.get('command_id', '?')} from {event.get('device_id', '?')}: {st}" if detail: msg += f" ({detail})" if st in {"error", "unsupported"}: level = "warn" if st == "unsupported" else "error" elif et == "client_event": msg = f"Client event {event.get('name', '?')} from {event.get('device_id', '?')}" elif et == "socket_error": msg = f"Socket error ({event.get('device_id', '?')}): {event.get('error', '')}" level = "error" elif et == "unknown_message": msg = f"Unknown message from {event.get('device_id', '?')}" level = "warn" return {"time": event.get("time", utc_now_iso()), "msg": msg, "level": level} async def _append_log(msg: str, level: str = "info") -> None: await manager.add_event( { "event": "legacy_log", "msg": msg, "level": level, "time": utc_now_iso(), } ) def _ensure_usecase_dirs(uid: str) -> None: base = Config.data_dir / "usecases" / uid base.mkdir(parents=True, exist_ok=True) for sub in ("wallpapers", "overlay", "mobile"): (base / sub).mkdir(parents=True, exist_ok=True) def _list_images(folder: Path) -> list[Path]: if not folder.exists(): return [] return [ p for p in folder.iterdir() if p.is_file() and p.suffix.lower() in _IMAGE_EXTS ] def _pick_usecase_image(bucket: str) -> Path | None: uc = _active_usecase() if not uc: return None uid = str(uc.get("id", "")).strip() if not uid: return None files = _list_images(Config.data_dir / "usecases" / uid / bucket) if not files: return None return random.choice(files) def _pick_mobile_wallpapers() -> tuple[Path, Path] | None: uc = _active_usecase() if not uc: return None uid = str(uc.get("id", "")).strip() if not uid: return None files = _list_images(Config.data_dir / "usecases" / uid / "mobile") if not files: return None lock_pref = [p for p in files if re.search(r"(lock|locks|screenlock)", p.stem, re.IGNORECASE)] bg_pref = [p for p in files if re.search(r"(home|bg|wall|back)", p.stem, re.IGNORECASE)] if lock_pref and bg_pref: lock = random.choice(lock_pref) bg_choices = [p for p in bg_pref if p != lock] or bg_pref bg = random.choice(bg_choices) return lock, bg if len(files) >= 2: a, b = random.sample(files, 2) return a, b return files[0], files[0] def _build_asset_url(request: Request, uid: str, bucket: str, filename: str) -> str: base = str(request.base_url).rstrip("/") return f"{base}/api/v4/assets/usecases/{uid}/{bucket}/{filename}" def _queue_mobile_payload(payload: dict[str, str]) -> int: active_clients = [cid for cid, meta in _pwa_clients.items() if (time.time() - meta.get("last_seen", 0)) <= 90] if active_clients: for cid in active_clients: _pending_wallpapers[cid] = dict(payload) return len(active_clients) _pending_wallpapers["__broadcast__"] = dict(payload) return 0 def _pick_mobile_wallpaper_payload() -> dict[str, str] | None: uc = _active_usecase() if not uc: return None uid = str(uc.get("id", "")).strip() if not uid: return None chosen = _pick_mobile_wallpapers() if not chosen: return None lock_uri = _img_to_datauri(chosen[0]) bg_uri = _img_to_datauri(chosen[1]) if not lock_uri or not bg_uri: return None return {"lockscreen": lock_uri, "background": bg_uri} @app.get("/api/v4/health") async def health() -> dict[str, Any]: return { "ok": True, "service": "notifypulse-v4-server", "time": utc_now_iso(), } @app.get("/api/v4/state") async def api_state() -> dict[str, Any]: devices = [d.model_dump() for d in await manager.list_devices()] logs = await manager.get_logs() return { "version": app.version, "settings": store.get_settings(), "usecases": store.get_usecases(), "connected_devices": devices, "events": logs, } @app.get("/api/v4/settings") async def get_settings() -> dict[str, Any]: return store.get_settings() @app.put("/api/v4/settings", dependencies=[Depends(require_token)]) async def put_settings(payload: dict[str, Any]) -> dict[str, Any]: return store.update_settings(payload) @app.get("/api/v4/usecases") async def get_usecases() -> dict[str, Any]: return {"usecases": store.get_usecases()} class UsecasesPayload(BaseModel): usecases: list[dict[str, Any]] @app.put("/api/v4/usecases", dependencies=[Depends(require_token)]) async def put_usecases(payload: UsecasesPayload) -> dict[str, Any]: updated = store.set_usecases(payload.usecases) await manager.dispatch_command( command_id=str(uuid.uuid4()), action="sync_settings", data={"settings": store.get_settings(), "usecases": updated}, target_scope="all", device_ids=[], ) return {"ok": True, "usecases": updated} @app.post("/api/v4/dispatch", dependencies=[Depends(require_token)]) async def dispatch(payload: DispatchRequest) -> DispatchResult: command_id = str(uuid.uuid4()) delivered, skipped = await manager.dispatch_command( command_id=command_id, action=payload.action, data=payload.data, target_scope=payload.target_scope, device_ids=payload.device_ids, ) return DispatchResult(command_id=command_id, delivered_to=delivered, skipped=skipped) @app.get("/api/v4/update/manifest/{platform}") async def get_update_manifest(platform: str) -> UpdateManifest: return store.get_update_manifest(platform) @app.put("/api/v4/update/manifest/{platform}", dependencies=[Depends(require_token)]) async def put_update_manifest(platform: str, manifest: UpdateManifest) -> dict[str, Any]: store.set_update_manifest(platform, manifest) return {"ok": True} @app.websocket("/ws/device") async def device_socket(websocket: WebSocket) -> None: await websocket.accept() device_id = "" try: first = await websocket.receive_json() if first.get("type") != "hello": await websocket.send_json({"type": "error", "error": "first frame must be hello"}) await websocket.close(code=1008) return token = first.get("token", "") if Config.api_token and token != Config.api_token: await websocket.send_json({"type": "error", "error": "invalid token"}) await websocket.close(code=1008) return hello = DeviceHello.model_validate(first.get("payload", {})) device_id = hello.device_id await manager.connect(hello, websocket) await websocket.send_json( { "type": "welcome", "server_time": utc_now_iso(), "settings": store.get_settings(), } ) while True: msg = await websocket.receive_json() mtype = msg.get("type") if mtype == "heartbeat": await manager.touch(device_id) await websocket.send_json({"type": "heartbeat_ack", "time": utc_now_iso()}) elif mtype == "ack": await manager.add_event( { "event": "command_ack", "device_id": device_id, "command_id": msg.get("command_id", ""), "status": msg.get("status", "ok"), "detail": msg.get("detail", ""), } ) elif mtype == "event": await manager.add_event( { "event": "client_event", "device_id": device_id, "name": msg.get("name", ""), "data": msg.get("data", {}), } ) else: await manager.add_event( { "event": "unknown_message", "device_id": device_id, "payload": msg, } ) except WebSocketDisconnect: pass except Exception as exc: await manager.add_event( { "event": "socket_error", "device_id": device_id, "error": str(exc), } ) finally: if device_id: await manager.disconnect(device_id) # ---------------------------- # Legacy compatibility routes # ---------------------------- @app.get("/api/usecases") async def legacy_get_usecases() -> dict[str, Any]: return { "usecases": store.get_usecases(), "active": _active_usecase_id(), "blocks": ALL_BLOCKS, } @app.post("/api/usecases") async def legacy_create_usecase(payload: dict[str, Any]) -> dict[str, Any]: name = str(payload.get("name", "")).strip() if not name: return {"ok": False, "error": "name required"} uid = re.sub(r"[^a-z0-9_]", "_", name.lower()) + f"_{random.randint(100,999)}" uc = { "id": uid, "name": name, "color": payload.get("color", "#4f8ef7"), "group": str(payload.get("group", "")).strip(), "blocks": payload.get("blocks", ["notifications"]), "min_interval": int(payload.get("min_interval", 10)), "max_interval": int(payload.get("max_interval", 30)), "notifications": payload.get("notifications", []), "dashboard_layout": payload.get("dashboard_layout", {}), } usecases = store.get_usecases() usecases.append(uc) store.set_usecases(usecases) _ensure_usecase_dirs(uid) await _append_log(f"Usecase created: {name}") return {"ok": True, "usecase": uc} @app.put("/api/usecases/{uid}") async def legacy_update_usecase(uid: str, payload: dict[str, Any]) -> dict[str, Any]: usecases = store.get_usecases() found = None for uc in usecases: if uc.get("id") == uid: uc.update({k: v for k, v in payload.items() if k != "id"}) found = uc break if not found: return {"ok": False, "error": "not found"} store.set_usecases(usecases) _ensure_usecase_dirs(uid) await _append_log(f"Usecase updated: {found.get('name', uid)}") return {"ok": True, "usecase": found} @app.delete("/api/usecases/{uid}") async def legacy_delete_usecase(uid: str) -> dict[str, Any]: usecases = store.get_usecases() after = [u for u in usecases if u.get("id") != uid] if len(after) == len(usecases): return {"ok": False, "error": "not found"} store.set_usecases(after) if _active_usecase_id() == uid: _set_active_usecase(after[0]["id"] if after else "") await _append_log(f"Usecase deleted: {uid}", "warn") return {"ok": True} @app.post("/api/usecases/{uid}/activate") async def legacy_activate_usecase(uid: str) -> dict[str, Any]: usecases = store.get_usecases() exists = any(u.get("id") == uid for u in usecases) if not exists: return {"ok": False, "error": "not found"} _set_active_usecase(uid) uc = next((u for u in usecases if u.get("id") == uid), {}) await _append_log(f"Switched to usecase '{uc.get('name', uid)}'") return {"ok": True, "active": uid} @app.post("/api/usecases/{uid}/open_folder") async def legacy_open_usecase_folder(uid: str) -> dict[str, Any]: _ensure_usecase_dirs(uid) d = Config.data_dir / "usecases" / uid return {"ok": True, "path": str(d)} @app.get("/api/state") async def legacy_state() -> dict[str, Any]: settings = store.get_settings() uc = _active_usecase() logs_raw = await manager.get_logs() logs = [_event_to_legacy_log(e) for e in logs_raw[:100]] connected_devices = [d.model_dump() for d in await manager.list_devices()] entries: list[dict[str, Any]] = [] if uc: entries = [_parse_entry(x) for x in uc.get("notifications", []) if str(x).strip()] min_interval = int((uc or {}).get("min_interval", 10)) next_fire_at = int(time.time()) + (min_interval * 60) cutoff = time.time() - 60 stale = [k for k, v in _pwa_clients.items() if v.get("last_seen", 0) < cutoff] for k in stale: _pwa_clients.pop(k, None) return { "app_name": (uc or {}).get("name") or settings.get("app_name", "NotifyPulse"), "settings_app_name": settings.get("app_name", "NotifyPulse"), "version": app.version, "paused": bool(settings.get("paused", False)), "next_fire_at": next_fire_at, "entries": entries, "log": logs, "pwa_clients": len(_pwa_clients), "connected_devices": connected_devices, "settings": settings, "active_usecase": _active_usecase_id(), "usecase": uc, } @app.post("/api/pause") async def legacy_pause() -> dict[str, Any]: settings = store.get_settings() paused = not bool(settings.get("paused", False)) store.update_settings({"paused": paused}) return {"paused": paused} @app.post("/api/fire_now") async def legacy_fire_now(request: Request) -> dict[str, Any]: uc = _active_usecase() if not uc: return {"ok": False, "error": "no active usecase"} entries = [str(x) for x in uc.get("notifications", []) if str(x).strip()] if not entries: return {"ok": False, "error": "No entries"} entry = random.choice(entries).split("|", 1)[0].strip() tl = entry.lower() if tl == "change.wallpaper": img = _pick_usecase_image("wallpapers") if not img: await _append_log("Fire now: wallpaper missing", "warn") return {"ok": False, "error": "No wallpaper images found"} uid = str(uc.get("id", "")) url = _build_asset_url(request, uid, "wallpapers", img.name) res = await dispatch( DispatchRequest( action="wallpaper_set", data={"url": url}, target_scope="windows", ) ) await _append_log(f"Fire now: wallpaper -> {img.name}") return { "ok": True, "fired": entry, "action": "wallpaper_set", "asset_url": url, "command_id": res.command_id, } if tl.startswith("show.overlay"): sec = 6 m = re.match(r"^show\.overlay\.(\d+)$", tl) if m: sec = max(1, min(60, int(m.group(1)))) img = _pick_usecase_image("overlay") if not img: await _append_log("Fire now: overlay missing", "warn") return {"ok": False, "error": "No overlay images found"} _pwa_overlay.update( { "active": True, "image": _img_to_datauri(img), "until_ms": int(time.time() * 1000) + (sec * 1000), } ) overlay_url = _build_asset_url(request, str(uc.get("id", "")), "overlay", img.name) settings = store.get_settings() opacity = float(settings.get("overlay_opacity", 0.4)) stretch = bool(settings.get("overlay_stretch", False)) res = await dispatch( DispatchRequest( action="overlay_show", data={ "url": overlay_url, "duration_ms": sec * 1000, "opacity": opacity, "stretch": stretch, }, target_scope="windows", ) ) await _append_log(f"Fire now: overlay -> {img.name} ({sec}s)") return { "ok": True, "fired": entry, "action": "overlay_show", "duration_sec": sec, "command_id": res.command_id, } if tl == "change.wallpaper.mobile": payload = _pick_mobile_wallpaper_payload() if not payload: await _append_log("Fire now: mobile wallpaper missing", "warn") return {"ok": False, "error": "No mobile images found"} queued = _queue_mobile_payload(payload) await _append_log(f"Fire now: mobile wallpaper queued -> {queued} client(s)") return {"ok": True, "fired": entry} cmd = DispatchRequest( action="notify", data={"title": uc.get("name", "NotifyPulse"), "message": entry}, target_scope="windows", ) res = await dispatch(cmd) await _append_log(f"Fire now notification -> {entry}") return {"ok": True, "fired": entry, "command_id": res.command_id} @app.get("/api/settings") async def legacy_get_settings() -> dict[str, Any]: return store.get_settings() @app.post("/api/settings") async def legacy_post_settings(payload: dict[str, Any]) -> dict[str, Any]: store.update_settings(payload) return {"ok": True} @app.get("/api/log") async def legacy_log(limit: int = 100) -> list[dict[str, Any]]: logs = await manager.get_logs() adapted = [_event_to_legacy_log(e) for e in logs] return adapted[: min(limit, 200)] @app.post("/api/test_notification") async def legacy_test_notification(payload: dict[str, Any] | None = None) -> dict[str, Any]: p = payload or {} cmd = DispatchRequest( action="notify", data={"title": "NotifyPulse", "message": p.get("message", "Test from Web UI")}, target_scope="windows", ) res = await dispatch(cmd) if not res.delivered_to: await _append_log("Desktop test notification: no Windows clients connected", "warn") return {"ok": False, "error": "No Windows clients connected"} await _append_log("Desktop test notification sent") return {"ok": True, "command_id": res.command_id} @app.post("/api/test_wallpaper") async def legacy_test_wallpaper(request: Request) -> dict[str, Any]: uc = _active_usecase() if not uc: return {"ok": False, "error": "No active usecase"} img = _pick_usecase_image("wallpapers") if not img: await _append_log("Desktop test wallpaper: no wallpaper images found", "warn") return {"ok": False, "error": "No images in wallpapers folder"} uid = str(uc.get("id", "")) url = _build_asset_url(request, uid, "wallpapers", img.name) res = await dispatch( DispatchRequest( action="wallpaper_set", data={"url": url}, target_scope="windows", ) ) if not res.delivered_to: await _append_log("Desktop test wallpaper: no Windows clients connected", "warn") return {"ok": False, "error": "No Windows clients connected"} await _append_log(f"Desktop test wallpaper -> {img.name}") return {"ok": True, "command_id": res.command_id} @app.post("/api/test_overlay") async def legacy_test_overlay(request: Request) -> dict[str, Any]: uc = _active_usecase() if not uc: return {"ok": False, "error": "No active usecase"} img = _pick_usecase_image("overlay") if not img: await _append_log("Desktop test overlay: no overlay images found", "warn") return {"ok": False, "error": "No images in overlay folder"} uid = str(uc.get("id", "")) url = _build_asset_url(request, uid, "overlay", img.name) settings = store.get_settings() dur_ms = int(float(settings.get("overlay_duration", 6)) * 1000) opacity = float(settings.get("overlay_opacity", 0.4)) stretch = bool(settings.get("overlay_stretch", False)) _pwa_overlay.update( { "active": True, "image": _img_to_datauri(img), "until_ms": int(time.time() * 1000) + dur_ms, } ) res = await dispatch( DispatchRequest( action="overlay_show", data={ "url": url, "duration_ms": dur_ms, "opacity": opacity, "stretch": stretch, }, target_scope="windows", ) ) if not res.delivered_to: await _append_log("Desktop test overlay: no Windows clients connected", "warn") return {"ok": False, "error": "No Windows clients connected"} await _append_log(f"Desktop test overlay -> {img.name}") return {"ok": True, "command_id": res.command_id} @app.post("/api/test_mobile_wallpaper") async def legacy_test_mobile() -> dict[str, Any]: payload = _pick_mobile_wallpaper_payload() if not payload: await _append_log("Mobile test wallpaper: no mobile images found", "warn") return {"ok": False, "error": "no mobile wallpaper found for active usecase"} queued = _queue_mobile_payload(payload) await _append_log(f"Mobile test wallpaper queued -> {queued} client(s)") return {"ok": True} @app.post("/api/pwa/ping") async def legacy_pwa_ping(payload: dict[str, Any]) -> dict[str, Any]: cid = str(payload.get("client_id", "")).strip() if not cid: return {"error": "no client_id"} _pwa_clients[cid] = {"last_seen": time.time()} return {"ok": True, "clients": len(_pwa_clients)} @app.get("/api/pwa/wallpaper") async def legacy_pwa_wallpaper(client_id: str = "") -> dict[str, Any]: cid = client_id.strip() if not cid: return {"pending": False} cmd = _pending_wallpapers.pop(cid, None) or _pending_wallpapers.pop("__broadcast__", None) if not cmd: return {"pending": False} return {"pending": True, **cmd} @app.get("/api/pwa/app_name") async def legacy_pwa_app_name() -> dict[str, Any]: settings = store.get_settings() return { "app_name": (_active_usecase() or {}).get("name") or settings.get("app_name", "NotifyPulse"), "settings_app_name": settings.get("app_name", "NotifyPulse"), "active": _active_usecase_id(), "usecases": [ { "id": u.get("id", ""), "name": u.get("name", ""), "color": u.get("color", "#4f8ef7"), "group": u.get("group", ""), "blocks": u.get("blocks", []), "min_interval": u.get("min_interval", 10), "max_interval": u.get("max_interval", 30), } for u in store.get_usecases() ], "version": app.version, "blocks": ALL_BLOCKS, "pwa_bg_blur": settings.get("pwa_bg_blur", 18), "pwa_bg_opacity": settings.get("pwa_bg_opacity", 0.72), } @app.get("/api/pwa/splash_image") async def legacy_pwa_splash() -> dict[str, Any]: splash_dir = Config.data_dir / "splash" candidates: list[Path] = [] if splash_dir.exists(): candidates = [ p for p in splash_dir.iterdir() if p.is_file() and p.suffix.lower() in {".png", ".jpg", ".jpeg", ".webp"} ] if not candidates: for name in ("main.png", "main.jpg", "main.jpeg", "main.webp"): p = Config.data_dir / name if p.exists(): candidates = [p] break if not candidates: return {"image": "", "has_custom": False} chosen = random.choice(candidates) return {"image": _img_to_datauri(chosen) or "", "has_custom": True} @app.get("/api/pwa/overlay") async def legacy_pwa_overlay() -> dict[str, Any]: if not _pwa_overlay.get("active"): return {"active": False, "image": None} remaining = int(_pwa_overlay.get("until_ms", 0) - (time.time() * 1000)) if remaining <= 0: _pwa_overlay.update({"active": False, "image": None, "until_ms": 0}) return {"active": False, "image": None} return {"active": True, "image": _pwa_overlay.get("image"), "remaining_ms": remaining} @app.post("/api/pwa/trigger_wallpaper") async def legacy_pwa_trigger_wallpaper() -> dict[str, Any]: payload = _pick_mobile_wallpaper_payload() if not payload: await _append_log("PWA wallpaper request: no mobile images found", "warn") return {"ok": False, "error": "no mobile wallpaper found for active usecase"} queued = _queue_mobile_payload(payload) await _append_log(f"PWA wallpaper queued -> {queued} client(s)") return {"ok": True} @app.post("/api/pwa/activate_usecase") async def legacy_pwa_activate_usecase(payload: dict[str, Any]) -> dict[str, Any]: uid = str(payload.get("usecase_id", "")).strip() usecases = store.get_usecases() uc = next((u for u in usecases if u.get("id") == uid), None) if not uc: return {"ok": False, "error": "not found"} _set_active_usecase(uid) return {"ok": True, "name": uc.get("name", "")} @app.get("/api/v4/assets/usecases/{uid}/{bucket}/{filename}") async def usecase_asset(uid: str, bucket: str, filename: str): if bucket not in {"wallpapers", "overlay", "mobile"}: raise HTTPException(status_code=404, detail="invalid asset bucket") if "/" in filename or "\\" in filename: raise HTTPException(status_code=400, detail="invalid filename") path = (Config.data_dir / "usecases" / uid / bucket / filename).resolve() root = (Config.data_dir / "usecases" / uid / bucket).resolve() if root not in path.parents and path != root: raise HTTPException(status_code=400, detail="invalid asset path") if not path.exists() or not path.is_file(): raise HTTPException(status_code=404, detail="asset not found") return FileResponse(path) def _safe_file(path: Path) -> FileResponse | JSONResponse: if path.exists() and path.is_file(): return FileResponse(path) return JSONResponse( {"error": "file not found", "path": str(path)}, status_code=404, ) @app.get("/") async def web_ui_index(): return _safe_file(Config.ui_dir / "index.html") @app.get("/pwa") @app.get("/pwa/") async def pwa_index(): return _safe_file(Config.pwa_dir / "index.html") @app.get("/pwa/{filename:path}") async def pwa_assets(filename: str): return _safe_file(Config.pwa_dir / filename)