# NotifyPulse - Multi-Profile Block Architecture # Config: AppData/Roaming/NotifyPulse/ # Web UI: http://localhost:5000 # PWA: http://:5000/pwa from __future__ import annotations from queue import Empty, Queue import sys, os, ctypes, ctypes.wintypes, threading, time, random, re, json import webbrowser, shutil, hashlib, base64, mimetypes import subprocess from datetime import datetime from pathlib import Path from collections import deque # ── Dependency check ────────────────────────────────────────────────────────── try: import keyboard from winotify import Notification, audio import pystray from PIL import Image, ImageDraw, ImageTk import schedule import tkinter as tk from flask import Flask, jsonify, request, send_file, abort, Response from flask_cors import CORS except ImportError as e: import tkinter as tk from tkinter import messagebox root = tk.Tk(); root.withdraw() messagebox.showerror("Missing dependency", f"Run: pip install -r requirements.txt\n\nMissing: {e}") sys.exit(1) # ── Constants ───────────────────────────────────────────────────────────────── HOTKEY = "F13" WEB_PORT = 5000 DEFAULT_APP_NAME = "NotifyPulse" VERSION = "3.0.0" MOBILE_EXTS = (".jpg", ".jpeg", ".png", ".bmp", ".webp", ".heic", ".heif") SPI_SETDESKWALLPAPER = 0x0014 SPIF_UPDATEINIFILE = 0x01 SPIF_SENDCHANGE = 0x02 # ── Available Blocks ────────────────────────────────────────────────────────── # Each block has an id, display name, description, and optional required dirs ALL_BLOCKS = [ { "id": "notifications", "name": "Notifications", "description": "Random & scheduled toast notifications", "icon": "🔔", "dirs": [], }, { "id": "wallpaper", "name": "Wallpaper", "description": "Random desktop wallpaper changes", "icon": "🖼️", "dirs": ["wallpapers"], }, { "id": "overlay", "name": "Overlay", "description": "Fullscreen image overlay on monitor", "icon": "✨", "dirs": ["overlay"], }, { "id": "timer", "name": "Timer", "description": "Scheduled time-based triggers", "icon": "⏱️", "dirs": [], }, { 
"id": "mobile_wallpaper", "name": "Mobile Wallpaper", "description": "Push wallpapers to phone via PWA", "icon": "📱", "dirs": ["mobile"], }, ] # ── Paths ───────────────────────────────────────────────────────────────────── APPDATA_DIR: Path | None = None USECASES_FILE: Path | None = None # usecases.json SETTINGS_FILE: Path | None = None # global settings.json ICON_FILE: Path | None = None # ── Global state ────────────────────────────────────────────────────────────── paused: bool = False state_lock = threading.Lock() tray_icon = None root_instance = None APP_NAME: str = DEFAULT_APP_NAME APP_ID: str = "NotifyPulse" _active_usecase_id: str = "" # which usecase is currently running _usecases: list = [] # list of usecase dicts _usecases_lock = threading.Lock() _notification_queue: list = [] _entries: list = [] _entries_lock = threading.Lock() _file_mtime: float = 0.0 _next_fire_at: float = 0.0 _active_overlays: list = [] _overlays_lock = threading.Lock() _default_wallpaper: str = "" _last_wallpaper: str = "" _pwa_clients: dict = {} _pwa_clients_lock = threading.Lock() _pending_wallpapers: dict = {} _pending_lock = threading.Lock() _log: deque = deque(maxlen=200) _log_lock = threading.Lock() _settings: dict = {} _notification_wake = threading.Event() _reload_flag = threading.Event() # set by apply_usecase to reload without firing _config_monitor = None def _ps_quote_literal(path: Path) -> str: return str(path).replace("'", "''") def _ps_read_file_utf8(path: Path) -> str | None: if os.name != "nt": return None quoted = _ps_quote_literal(path) cmd = ( "$ErrorActionPreference='Stop';" f"$p='{quoted}';" "if (Test-Path -LiteralPath $p) {" " $b=[IO.File]::ReadAllBytes($p);" " [Console]::Out.Write('1:' + [Convert]::ToBase64String($b));" "} else {" " [Console]::Out.Write('0:');" "}" ) try: out = subprocess.check_output( ["powershell", "-NoProfile", "-Command", cmd], encoding="ascii", errors="ignore", ).strip() if not out.startswith("1:"): return None payload = out[2:] return 
"" if payload == "" else base64.b64decode(payload).decode("utf-8") except Exception: return None def _ps_write_file_utf8(path: Path, content: str) -> bool: if os.name != "nt": return False quoted = _ps_quote_literal(path) payload = base64.b64encode(content.encode("utf-8")).decode("ascii") cmd = ( "$ErrorActionPreference='Stop';" f"$p='{quoted}';" "$d=[IO.Path]::GetDirectoryName($p);" "if ($d) { [IO.Directory]::CreateDirectory($d) | Out-Null };" f"[IO.File]::WriteAllBytes($p,[Convert]::FromBase64String('{payload}'));" ) try: subprocess.check_call( ["powershell", "-NoProfile", "-Command", cmd], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) return True except Exception: return False # ── Logging ─────────────────────────────────────────────────────────────────── def log(msg: str, level: str = "info"): ts = datetime.now().strftime("%H:%M:%S") entry = {"time": ts, "msg": msg, "level": level} with _log_lock: _log.appendleft(entry) # ── Settings ────────────────────────────────────────────────────────────────── DEFAULT_SETTINGS = { "overlay_stretch": False, "overlay_opacity": 0.4, "overlay_duration": 6, "overlay_monitor": 0, # 0 = primary, -1 = all monitors "startup_toast": True, "hotkey": "F13", "web_port": 5000, "theme": "dark", "notify_sound": True, "auto_open_browser": True, "active_usecase": "", # empty = no usecase selected yet "app_name": "NotifyPulse", # shown before any usecase is selected "minimize_to_tray": True, # hide window to tray instead of closing "log_max_entries": 100, # how many log entries to keep in memory "notification_duration": 5, # toast display duration in seconds "wallpaper_fit": "fill", # fill | fit | stretch | center | tile "pwa_port": 5000, # port exposed to the LAN for the PWA "run_on_startup": False, # register in Windows startup "confirm_delete": True, # show confirmation before deleting usecases "entry_display_mode": "percent", # percent | chance — default entry list display } def load_settings() -> dict: global _settings if 
SETTINGS_FILE and SETTINGS_FILE.exists(): try: raw = json.loads(SETTINGS_FILE.read_text("utf-8")) _settings = {**DEFAULT_SETTINGS, **raw} return _settings except Exception: pass _settings = dict(DEFAULT_SETTINGS) return _settings def save_settings(): if SETTINGS_FILE: try: SETTINGS_FILE.write_text(json.dumps(_settings, indent=2), "utf-8") except Exception as e: log(f"Settings save error: {e}", "error") def get_setting(key: str, default=None): return _settings.get(key, DEFAULT_SETTINGS.get(key, default)) # ── Usecase management ──────────────────────────────────────────────────────── DEFAULT_USECASE = { "id": "", "name": "Default", "color": "#4f8ef7", "blocks": ["notifications", "wallpaper", "overlay", "timer", "mobile_wallpaper"], "min_interval": 10, "max_interval": 30, "notifications": [], } def usecase_dir(uc: dict) -> Path: """Return the folder for this usecase inside APPDATA_DIR.""" safe = re.sub(r"[^A-Za-z0-9_\-]", "_", uc["id"]) return APPDATA_DIR / "usecases" / safe def ensure_usecase_dirs(uc: dict): """Create all required dirs for enabled blocks.""" base = usecase_dir(uc) base.mkdir(parents=True, exist_ok=True) enabled = set(uc.get("blocks", [])) for block in ALL_BLOCKS: if block["id"] in enabled: for d in block["dirs"]: (base / d).mkdir(exist_ok=True) def load_usecases() -> list: global _usecases if USECASES_FILE and USECASES_FILE.exists(): try: raw = json.loads(USECASES_FILE.read_text("utf-8")) if isinstance(raw, list): _usecases = raw elif isinstance(raw, dict): # Wrapped format — shouldn't happen but handle it _usecases = list(raw.values()) if raw else [] else: _usecases = [] return _usecases except Exception: pass # First run — create a sample usecase sample = { "id": "wellness", "name": "Wellness", "color": "#22c55e", "blocks": ["notifications", "wallpaper", "overlay"], "min_interval": 10, "max_interval": 30, "notifications": [ "Take a short break and stretch! | 35%", "Drink some water — stay hydrated! | 30%", "Check your posture! 
| 20%", "Deep breath — you got this! | 15%", "change.wallpaper | 20%", "show.overlay | 15%", "Morning standup time! | 09:00", "End of day — wrap up! | 17:30", ], } _usecases = [sample] save_usecases() return _usecases def save_usecases(): if USECASES_FILE: USECASES_FILE.write_text(json.dumps(_usecases, indent=2), "utf-8") def get_usecase(uid: str) -> dict | None: if not uid or not isinstance(uid, str): return None with _usecases_lock: for uc in _usecases: if isinstance(uc, dict) and uc.get("id") == uid: return uc return None def active_usecase() -> dict | None: return get_usecase(_active_usecase_id) # ── Config parsing ───────────────────────────────────────────────────────────── class Entry: __slots__ = ("text", "weight", "trigger_time") def __init__(self, text, weight, trigger_time): self.text = text self.weight = weight self.trigger_time = trigger_time def parse_entries(lines: list[str]) -> list[Entry]: entries = [] for raw in lines: line = raw.strip() if not line or line.startswith("#"): continue if line.startswith("@"): continue if "|" in line: text, tag = [p.strip() for p in line.split("|", 1)] else: text, tag = line, None if tag and re.match(r"^\d{1,2}:\d{2}$", tag): h, m = tag.split(":") h, m = int(h), int(m) if 0 <= h <= 23 and 0 <= m <= 59: entries.append(Entry(text, None, f"{h:02d}:{m:02d}")) else: entries.append(Entry(text, 1.0, None)) # invalid time → treat as weighted elif tag and tag.endswith("%"): try: w = float(tag[:-1]) except ValueError: w = 1.0 entries.append(Entry(text, w, None)) else: entries.append(Entry(text, 1.0, None)) return entries def apply_usecase(uc: dict, silent=False): global _active_usecase_id, APP_NAME, APP_ID global _entries, _next_fire_at, _file_mtime _active_usecase_id = uc["id"] APP_NAME = uc.get("name", DEFAULT_APP_NAME) APP_ID = re.sub(r"[^A-Za-z0-9]", "", APP_NAME) or "NotifyPulse" _settings["active_usecase"] = uc["id"] save_settings() lines = uc.get("notifications", []) entries = parse_entries(lines) mn = 
uc.get("min_interval", 10) * 60 mx = max(mn, uc.get("max_interval", 30) * 60) # guard: mx must be >= mn schedule.clear() for entry in (e for e in entries if e.trigger_time): def make_job(e): def job(): fire_entry(e) return job schedule.every().day.at(entry.trigger_time).do(make_job(entry)) with _entries_lock: _entries = entries interval = random.randint(mn, mx) _next_fire_at = time.time() + interval _reload_flag.set() _notification_wake.set() ensure_usecase_dirs(uc) update_tray_title() if not silent: log(f"Switched to usecase '{APP_NAME}' — {mn//60}–{mx//60}min, {len(entries)} entries") # ── Background worker ───────────────────────────────────────────────────────── class BackgroundWorker: def __init__(self): self.queue = Queue() self.thread: threading.Thread | None = None self.lock = threading.Lock() def enqueue(self, fn, *args, **kwargs): self.queue.put((fn, args, kwargs)) self._ensure_thread() def _ensure_thread(self): with self.lock: if self.thread and self.thread.is_alive(): return self.thread = threading.Thread(target=self._run, daemon=True) self.thread.start() def _run(self): while True: try: fn, args, kwargs = self.queue.get(timeout=5) except Empty: with self.lock: if self.queue.empty(): self.thread = None return continue try: fn(*args, **kwargs) except Exception as exc: log(f"Background task error: {exc}", "error") finally: self.queue.task_done() _bg = BackgroundWorker() def enqueue_bg(fn, *args, **kwargs): _bg.enqueue(fn, *args, **kwargs) # ── Block helpers ───────────────────────────────────────────────────────────── def block_enabled(block_id: str) -> bool: uc = active_usecase() if not uc: return False return block_id in uc.get("blocks", []) def uc_dir(sub: str = "") -> Path | None: uc = active_usecase() if not uc: return None base = usecase_dir(uc) return base / sub if sub else base # ── Wallpaper block ─────────────────────────────────────────────────────────── def get_current_wallpaper() -> str: try: import winreg key = 
winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Control Panel\Desktop") val, _ = winreg.QueryValueEx(key, "WallPaper") winreg.CloseKey(key) return val except Exception: return "" def set_wallpaper(path: str): ctypes.windll.user32.SystemParametersInfoW( SPI_SETDESKWALLPAPER, 0, path, SPIF_UPDATEINIFILE | SPIF_SENDCHANGE) def do_wallpaper_change(): global _last_wallpaper d = uc_dir("wallpapers") if not d or not d.exists(): log("Wallpaper folder missing", "warn") return files = [f for ext in ("*.jpg","*.jpeg","*.png","*.bmp","*.webp") for f in d.glob(ext)] if not files: log("No wallpaper images found", "warn") return wp = str(random.choice(files)) _last_wallpaper = wp set_wallpaper(wp) log(f"Wallpaper → {Path(wp).name}") # ── Mobile wallpaper block ──────────────────────────────────────────────────── def _list_mobile_images(folder: Path) -> list[Path]: files = [] for ext in MOBILE_EXTS: files.extend(folder.glob(f"*{ext}")) return [p for p in files if p.is_file()] def _copy_for_send(src: Path, prefix: str, cache_dir: Path) -> Path: cache_dir.mkdir(exist_ok=True) for old in cache_dir.glob(f"{prefix}_*"): try: old.unlink() except Exception: pass ext = src.suffix if src.suffix else ".jpg" rand = random.randint(1000, 9999) dest = cache_dir / f"{prefix}_{rand}{ext.lower()}" shutil.copyfile(src, dest) return dest def do_mobile_wallpaper(): d = uc_dir("mobile") if not d or not d.exists(): log("Mobile wallpaper dir missing", "warn") return images = _list_mobile_images(d) if not images: log("No images in Mobile folder", "warn") return if len(images) >= 2: chosen = random.sample(images, 2) else: chosen = [images[0], images[0]] cache_dir = d / "_send_cache" ls_path = _copy_for_send(chosen[0], "Lockscreen", cache_dir) bg_path = _copy_for_send(chosen[1], "Background", cache_dir) with _pwa_clients_lock: client_ids = list(_pwa_clients.keys()) with _pending_lock: for cid in client_ids if client_ids else ["__broadcast__"]: _pending_wallpapers[cid] = { "lockscreen": str(ls_path), 
"background": str(bg_path), } log(f"Mobile wallpaper queued → {len(client_ids)} client(s)") # ── Overlay block ───────────────────────────────────────────────────────────── def _get_monitors(): monitors = [] try: MonitorEnumProc = ctypes.WINFUNCTYPE( ctypes.c_bool, ctypes.c_ulong, ctypes.c_ulong, ctypes.POINTER(ctypes.wintypes.RECT), ctypes.c_double) def _cb(hMon, hdcMon, lprcMon, dwData): r = lprcMon.contents monitors.append((r.left, r.top, r.right - r.left, r.bottom - r.top)) return True ctypes.windll.user32.EnumDisplayMonitors(None, None, MonitorEnumProc(_cb), 0) except Exception: pass return monitors or [None] def do_screen_overlay(duration_ms: int = 6000): d = uc_dir("overlay") if not d or not d.exists(): log("Overlay folder missing", "warn") return files = [f for ext in ("*.jpg","*.jpeg","*.png","*.bmp","*.webp") for f in d.glob(ext)] if not files: log("No overlay images found", "warn") return img_path = str(random.choice(files)) monitors = _get_monitors() opacity = float(get_setting("overlay_opacity", 0.4)) stretch = bool(get_setting("overlay_stretch", False)) log(f"Overlay: {Path(img_path).name} — {duration_ms//1000}s on {len(monitors)} screen(s)") def _make_tk_image(pil_img, sw, sh): if stretch: return ImageTk.PhotoImage(pil_img.resize((sw, sh), Image.Resampling.LANCZOS)) img_w, img_h = pil_img.size scale = min(sw / img_w, sh / img_h) nw, nh = int(img_w * scale), int(img_h * scale) resized = pil_img.resize((nw, nh), Image.Resampling.LANCZOS) canvas = Image.new("RGB", (sw, sh), "black") canvas.paste(resized, ((sw - nw) // 2, (sh - nh) // 2)) return ImageTk.PhotoImage(canvas) def create_overlay(): try: pil_img = Image.open(img_path) except Exception as e: log(f"Overlay open error: {e}", "error") return with state_lock: is_paused = paused for mon in monitors: if mon is None: tmp = tk.Toplevel(); tmp.withdraw(); tmp.update_idletasks() sw, sh, mx, my = tmp.winfo_screenwidth(), tmp.winfo_screenheight(), 0, 0 tmp.destroy() else: mx, my, sw, sh = mon ov = 
tk.Toplevel() ov.attributes("-topmost", True) ov.attributes("-alpha", opacity) ov.overrideredirect(True) ov.update_idletasks() hwnd = ctypes.windll.user32.GetAncestor(ov.winfo_id(), 2) or ov.winfo_id() style = ctypes.windll.user32.GetWindowLongW(hwnd, -20) ctypes.windll.user32.SetWindowLongW(hwnd, -20, style | 0x80000 | 0x20) ov.geometry(f"{sw}x{sh}+{mx}+{my}") try: tk_img = _make_tk_image(pil_img, sw, sh) lbl = tk.Label(ov, image=tk_img, bg="black") lbl.image = tk_img lbl.pack(fill="both", expand=True) except Exception as e: log(f"Overlay render error: {e}", "error") ov.destroy() continue entry = {"window": ov, "remaining_ms": duration_ms, "after_id": None, "started_at": time.monotonic()} def on_dismiss(e=entry, w=ov): with _overlays_lock: if e in _active_overlays: _active_overlays.remove(e) if w.winfo_exists(): w.destroy() def schedule_dismiss(e=entry, w=ov): e["started_at"] = time.monotonic() e["after_id"] = w.after(e["remaining_ms"], lambda: on_dismiss(e, w)) entry["schedule_dismiss"] = schedule_dismiss with _overlays_lock: _active_overlays.append(entry) if not is_paused: schedule_dismiss() else: ov.withdraw() if root_instance: root_instance.after(0, create_overlay) # ── Icon ────────────────────────────────────────────────────────────────────── def make_default_icon(size=256): img = Image.new("RGBA", (size, size), (0, 0, 0, 0)) d = ImageDraw.Draw(img) s = size / 64 d.ellipse([8*s, 20*s, 56*s, 56*s], fill="#4f8ef7") d.polygon([(32*s, 4*s), (16*s, 24*s), (48*s, 24*s)], fill="#4f8ef7") d.rectangle([24*s, 56*s, 40*s, 62*s], fill="#4f8ef7") return img def load_tray_icon(): if ICON_FILE and ICON_FILE.exists(): try: return Image.open(ICON_FILE).convert("RGBA") except Exception: pass return make_default_icon() # ── Toast ───────────────────────────────────────────────────────────────────── def send_toast(message: str, title: str | None = None): icon_path = str(ICON_FILE) if ICON_FILE and ICON_FILE.exists() else "" t = title or APP_NAME notif = 
Notification(app_id=APP_ID, title=t, msg=message, duration="short", icon=icon_path) if get_setting("notify_sound", True): notif.set_audio(audio.Default, loop=False) notif.show() log(f"Toast: {message}") # ── Notification logic ──────────────────────────────────────────────────────── OVERLAY_MAGIC = "show.overlay" WALLPAPER_MAGIC = "change.wallpaper" def parse_overlay_duration(text: str) -> int: parts = text.strip().lower().split(".") if len(parts) >= 3: try: return max(1, int(parts[2])) * 1000 except ValueError: pass return int(get_setting("overlay_duration", 6)) * 1000 def pick_weighted(entries): weighted = [e for e in entries if e.weight is not None] if not weighted: return None return random.choices(weighted, weights=[e.weight for e in weighted], k=1)[0] def _dispatch(text: str): tl = text.lower().strip() uc = active_usecase() blocks = set(uc.get("blocks", [])) if uc else set() if tl == WALLPAPER_MAGIC: if "wallpaper" in blocks: enqueue_bg(do_wallpaper_change) elif tl == "change.wallpaper.mobile": if "mobile_wallpaper" in blocks: enqueue_bg(do_mobile_wallpaper) elif tl == OVERLAY_MAGIC or tl.startswith(OVERLAY_MAGIC + "."): if "overlay" in blocks: dur = parse_overlay_duration(tl) enqueue_bg(do_screen_overlay, dur) else: if "notifications" in blocks: send_toast(text) def fire_entry(entry: Entry): text = entry.text.strip() with state_lock: if paused: _notification_queue.append(text) log(f"Queued (paused): {text}") return _dispatch(text) def flush_queued(queued: list): for item in queued: _dispatch(item) time.sleep(1.2) def notification_loop(): global _next_fire_at while True: uc = active_usecase() if uc and _active_usecase_id: mn = uc.get("min_interval", 10) * 60 mx = max(mn, uc.get("max_interval", 30) * 60) # guard: mx must be >= mn with _entries_lock: entries = list(_entries) # Only fire if this iteration was triggered by the timer expiring, # not by a reload signal from apply_usecase / PWA activation. 
if not _reload_flag.is_set(): chosen = pick_weighted(entries) if chosen: fire_entry(chosen) _reload_flag.clear() interval = random.randint(mn, mx) else: interval = 60 _next_fire_at = time.time() + interval log(f"Next in {interval//60}m {interval%60}s") while True: remaining = _next_fire_at - time.time() if remaining <= 0: break triggered = _notification_wake.wait(min(remaining, 5)) if triggered: _notification_wake.clear() break def schedule_runner(): while True: schedule.run_pending() time.sleep(1) # ── Pause / Resume ──────────────────────────────────────────────────────────── def pause_overlays(): def _do(): with _overlays_lock: overlays = list(_active_overlays) for e in overlays: ov = e["window"] if not ov.winfo_exists(): continue if e["after_id"] is not None: try: ov.after_cancel(e["after_id"]) except Exception: pass e["remaining_ms"] = max(0, e["remaining_ms"] - int((time.monotonic() - e["started_at"]) * 1000)) e["after_id"] = None ov.withdraw() if root_instance: root_instance.after(0, _do) def _do(): with _overlays_lock: overlays = list(_active_overlays) for e in overlays: ov = e["window"] if not ov.winfo_exists(): with _overlays_lock: if e in _active_overlays: _active_overlays.remove(e) continue if e["remaining_ms"] <= 0: ov.destroy() with _overlays_lock: if e in _active_overlays: _active_overlays.remove(e) continue ov.deiconify() e["schedule_dismiss"]() if root_instance: root_instance.after(0, _do) def toggle_pause(): global paused queued = [] with state_lock: paused = not paused if not paused: queued = _notification_queue.copy() _notification_queue.clear() if paused: log("Paused") pause_overlays() if _default_wallpaper: threading.Thread(target=set_wallpaper, args=(_default_wallpaper,), daemon=True).start() else: log("Resumed") resume_overlays() if _last_wallpaper: threading.Thread(target=set_wallpaper, args=(_last_wallpaper,), daemon=True).start() threading.Thread(target=flush_queued, args=(queued,), daemon=True).start() update_tray_title() # ── Tray 
# ─────────────────────────────────────────────────────────────────────────────
def update_tray_title():
    """Refresh the tray tooltip and menu to show the app name and pause state."""
    if tray_icon:
        name = APP_NAME or DEFAULT_APP_NAME
        tray_icon.title = f"{name} V3 — {'PAUSED' if paused else 'Running'}"
        tray_icon.update_menu()


def build_tray_menu():
    """Build the tray menu: status/toggle item, Open Web UI, Quit."""
    def status_text(item):
        state = "Paused" if paused else "Running"
        uc = active_usecase()
        name = uc["name"] if uc else "None"
        return f"{state} | {name}"

    return pystray.Menu(
        pystray.MenuItem(status_text, lambda i, it: toggle_pause()),
        pystray.MenuItem("Open Web UI", lambda i, it: webbrowser.open(f"http://localhost:{WEB_PORT}")),
        pystray.Menu.SEPARATOR,
        pystray.MenuItem("Quit", lambda i, it: (i.stop(), os._exit(0))),
    )


def run_tray():
    """Create the tray icon and run its event loop (blocks the calling thread)."""
    global tray_icon
    tray_icon = pystray.Icon("NotifyPulse", load_tray_icon(), "NotifyPulse V3", menu=build_tray_menu())
    tray_icon.run()


# ── Flask App ────────────────────────────────────────────────────────────────
flask_app = Flask(__name__, static_folder=None)
CORS(flask_app, resources={r"/api/*": {"origins": "*"}, r"/pwa*": {"origins": "*"}})

import logging as _logging
_logging.getLogger("werkzeug").setLevel(_logging.ERROR)

PWA_DIR = Path(__file__).parent / "pwa"
UI_DIR = Path(__file__).parent / "ui"


def _img_to_datauri(path: str) -> str | None:
    """Return the file at *path* as a base64 data URI, or None on any error."""
    try:
        p = Path(path)
        mime = mimetypes.guess_type(p.name)[0] or "image/jpeg"
        data = base64.b64encode(p.read_bytes()).decode()
        return f"data:{mime};base64,{data}"
    except Exception:
        return None


# ── API: Usecases ────────────────────────────────────────────────────────────
@flask_app.route("/api/usecases", methods=["GET"])
def api_get_usecases():
    """Return all usecases, the active usecase id and the block catalogue."""
    with _usecases_lock:
        return jsonify({
            "usecases": list(_usecases),
            "active": _active_usecase_id,
            "blocks": ALL_BLOCKS,
        })


@flask_app.route("/api/usecases", methods=["POST"])
def api_create_usecase():
    """Create a usecase from the JSON body; "name" is required.

    Responds 400 when the name is missing or the intervals are not integers.
    """
    data = request.get_json(silent=True) or {}
    name = str(data.get("name") or "").strip()
    if not name:
        return jsonify({"ok": False, "error": "name required"}), 400
    try:
        min_interval = int(data.get("min_interval", 10))
        max_interval = int(data.get("max_interval", 30))
    except (TypeError, ValueError):
        return jsonify({"ok": False, "error": "intervals must be integers"}), 400

    slug = re.sub(r"[^a-z0-9_]", "_", name.lower())
    with _usecases_lock:
        existing = {u.get("id") for u in _usecases if isinstance(u, dict)}
        uid = slug + f"_{random.randint(100,999)}"
        # Two usecases with one id would share a folder and shadow each other.
        for _ in range(50):
            if uid not in existing:
                break
            uid = slug + f"_{random.randint(100,999)}"
        else:
            # The random suffix space is nearly exhausted: fall back to a counter.
            n = 1000
            while f"{slug}_{n}" in existing:
                n += 1
            uid = f"{slug}_{n}"
        uc = {
            "id": uid,
            "name": name,
            "color": data.get("color", "#4f8ef7"),
            "blocks": data.get("blocks", ["notifications"]),
            "min_interval": min_interval,
            "max_interval": max_interval,
            "notifications": data.get("notifications", []),
            "dashboard_layout": data.get("dashboard_layout", {}),
        }
        _usecases.append(uc)
        save_usecases()
    ensure_usecase_dirs(uc)
    log(f"Usecase created: {name}")
    return jsonify({"ok": True, "usecase": uc})


@flask_app.route("/api/usecases/<uid>", methods=["PUT"])
def api_update_usecase(uid):
    """Merge the JSON body into usecase *uid*; the "id" key cannot be changed.

    If the usecase is the active one it is re-applied so changes take effect.
    """
    data = request.get_json(silent=True) or {}
    updated_uc = None
    is_active = False
    with _usecases_lock:
        for uc in _usecases:
            if isinstance(uc, dict) and uc.get("id") == uid:
                uc.update({k: v for k, v in data.items() if k != "id"})
                save_usecases()
                ensure_usecase_dirs(uc)
                is_active = (uid == _active_usecase_id)
                updated_uc = dict(uc)
                log(f"Usecase updated: {uc.get('name', uid)}")
                break
    if updated_uc is None:
        return jsonify({"ok": False, "error": "not found"}), 404
    # Call apply_usecase OUTSIDE the lock to avoid deadlock
    if is_active:
        apply_usecase(updated_uc)
    return jsonify({"ok": True, "usecase": updated_uc})


@flask_app.route("/api/usecases/<uid>", methods=["DELETE"])
def api_delete_usecase(uid):
    """Delete usecase *uid*.

    If it was the active one, the first remaining usecase (if any) becomes
    active.
    """
    global _active_usecase_id
    new_active_uc = None
    with _usecases_lock:
        before = len(_usecases)
        _usecases[:] = [
            u for u in _usecases
            if not (isinstance(u, dict) and u.get("id") == uid)
        ]
        if len(_usecases) < before:
            if _active_usecase_id == uid:
                _active_usecase_id = _usecases[0]["id"] if _usecases else ""
                new_active_uc = _usecases[0] if _usecases else None
            save_usecases()
            log(f"Usecase deleted: {uid}")
        else:
            return jsonify({"ok": False, "error": "not found"}), 404
    # Apply the new active usecase OUTSIDE the lock to avoid deadlock
    if new_active_uc:
        apply_usecase(new_active_uc, silent=True)
    return jsonify({"ok": True})


@flask_app.route("/api/usecases/<uid>/activate", methods=["POST"])
def api_activate_usecase(uid):
    """Activate usecase *uid*, showing a toast if "startup_toast" is enabled."""
    uc = get_usecase(uid)
    if not uc:
        return jsonify({"ok": False, "error": "not found"}), 404
    apply_usecase(uc)
    if get_setting("startup_toast", True):
        send_toast(f"Switched to '{uc['name']}'")
    return jsonify({"ok": True, "active": uid})


@flask_app.route("/api/usecases/<uid>/open_folder", methods=["POST"])
def api_open_usecase_folder(uid):
    """Create the folder of usecase *uid* if needed and open it on Windows."""
    uc = get_usecase(uid)
    if not uc:
        return jsonify({"ok": False, "error": "not found"}), 404
    d = usecase_dir(uc)
    d.mkdir(parents=True, exist_ok=True)
    if os.name == "nt":
        os.startfile(str(d))
    return jsonify({"ok": True, "path": str(d)})


# ── API: State ───────────────────────────────────────────────────────────────
@flask_app.route("/api/state")
def api_state():
    """Return a snapshot of the runtime state for the desktop UI."""
    with _entries_lock:
        entries_data = [{"text": e.text, "weight": e.weight, "trigger_time": e.trigger_time}
                        for e in _entries]
    with _log_lock:
        log_data = list(_log)
    with _pwa_clients_lock:
        pwa_count = len(_pwa_clients)
    uc = active_usecase()
    return jsonify({
        "app_name": APP_NAME,  # active usecase name (or settings name if none)
        "settings_app_name": get_setting("app_name", "NotifyPulse"),  # always the settings name
        "version": VERSION,
        "paused": paused,
        "next_fire_at": _next_fire_at,
        "entries": entries_data,
        "log": log_data,
        "pwa_clients": pwa_count,
        "settings": _settings,
        "active_usecase": _active_usecase_id,
        "usecase": uc,
    })


@flask_app.route("/api/pause", methods=["POST"])
def api_pause():
    """Toggle pause and return the new state."""
    toggle_pause()
    return jsonify({"paused": paused})


@flask_app.route("/api/fire_now", methods=["POST"])
def api_fire_now():
    """Fire one random weighted entry immediately on a separate thread."""
    with _entries_lock:
        entries = list(_entries)
    chosen = pick_weighted(entries)
    if chosen:
        threading.Thread(target=fire_entry, args=(chosen,), daemon=True).start()
        return jsonify({"ok": True, "fired": chosen.text})
    return jsonify({"ok": False, "error": "No entries"})


@flask_app.route("/api/settings", methods=["GET", "POST"])
def api_settings():
    """GET returns the settings; POST updates keys known to DEFAULT_SETTINGS."""
    if request.method == "GET":
        return jsonify(_settings)
    data = request.get_json(silent=True) or {}
    _settings.update({k: v for k, v in data.items() if k in DEFAULT_SETTINGS})
    save_settings()
    return jsonify({"ok": True})


@flask_app.route("/api/log")
def api_log():
    """Return the newest log entries; ?limit= is clamped to 0..200."""
    # type=int makes a non-numeric limit fall back to the default rather than 500.
    limit = request.args.get("limit", 100, type=int)
    limit = max(0, min(limit, 200))
    with _log_lock:
        return jsonify(list(_log)[:limit])


@flask_app.route("/api/test_notification", methods=["POST"])
def api_test_notif():
    """Show a test toast with the optional "message" from the JSON body."""
    data = request.get_json(silent=True) or {}
    send_toast(data.get("message", "Test from NotifyPulse V3!"))
    return jsonify({"ok": True})


@flask_app.route("/api/test_wallpaper", methods=["POST"])
def api_test_wallpaper():
    """Queue a wallpaper change on the background worker."""
    enqueue_bg(do_wallpaper_change)
    return jsonify({"ok": True})


@flask_app.route("/api/test_overlay", methods=["POST"])
def api_test_overlay():
    """Queue a screen overlay on the background worker."""
    enqueue_bg(do_screen_overlay)
    return jsonify({"ok": True})


@flask_app.route("/api/test_mobile_wallpaper", methods=["POST"])
def api_test_mobile_wallpaper():
    """Queue a mobile wallpaper push on the background worker."""
    enqueue_bg(do_mobile_wallpaper)
    return jsonify({"ok": True})


# ── API: PWA ─────────────────────────────────────────────────────────────────
@flask_app.route("/api/pwa/ping", methods=["POST"])
def pwa_ping():
    """Register or refresh a PWA client and drop clients silent for over 60 s."""
    data = request.get_json(silent=True) or {}
    cid = data.get("client_id", "")
    if not cid:
        return jsonify({"error": "no client_id"}), 400
    ua = request.headers.get("User-Agent", "")
    with _pwa_clients_lock:
        _pwa_clients[cid] = {"last_seen": time.time(), "user_agent": ua}
        cutoff = time.time() - 60
        expired = [k for k, v in _pwa_clients.items() if v["last_seen"] < cutoff]
        for k in expired:
            del _pwa_clients[k]
        client_count = len(_pwa_clients)
    return jsonify({"ok": True, "clients": client_count})


@flask_app.route("/api/pwa/wallpaper", methods=["GET"])
def pwa_wallpaper_poll():
    """Hand the pending wallpaper pair for this client over as data URIs.

    The client's own entry is consumed first, then the "__broadcast__" entry.
    """
    cid = request.args.get("client_id", "")
    if not cid:
        return jsonify({"pending": False})
    with _pending_lock:
        cmd = _pending_wallpapers.pop(cid, None) or _pending_wallpapers.pop("__broadcast__", None)
    if not cmd:
        return jsonify({"pending": False})
    result = {"pending": True}
    for key in ("lockscreen", "background"):
        if cmd.get(key):
            uri = _img_to_datauri(cmd[key])
            if uri:
                result[key] = uri
    return jsonify(result)


@flask_app.route("/api/pwa/app_name")
def pwa_app_name():
    """Return app name, active usecase and a usecase summary list (uncached)."""
    with _usecases_lock:
        usecases_for_pwa = [
            {"id": u["id"], "name": u["name"], "color": u.get("color", "#4f8ef7"),
             "blocks": u.get("blocks", []),
             "min_interval": u.get("min_interval", 10), "max_interval": u.get("max_interval", 30)}
            for u in _usecases
        ]
    resp = jsonify({
        "app_name": APP_NAME,
        "settings_app_name": get_setting("app_name", "NotifyPulse"),
        "active": _active_usecase_id,
        "usecases": usecases_for_pwa,
        "version": VERSION,
        "blocks": ALL_BLOCKS,
    })
    resp.headers["Cache-Control"] = "no-store"
    return resp


@flask_app.route("/api/pwa/splash_image")
def pwa_splash_image():
    """Return the custom splash image from APPDATA_DIR as a data URI, if any."""
    # Serve main.png (or .jpg/.jpeg/.webp) from APPDATA_DIR if it exists
    if APPDATA_DIR is not None:
        for name in ("main.png", "main.jpg", "main.jpeg", "main.webp"):
            p = APPDATA_DIR / name
            if p.exists():
                uri = _img_to_datauri(str(p))
                return jsonify({"image": uri or "", "has_custom": True})
    return jsonify({"image": "", "has_custom": False})


@flask_app.route("/api/pwa/trigger_wallpaper", methods=["POST"])
def pwa_trigger_wallpaper():
    """Queue a mobile wallpaper push on the background worker."""
    enqueue_bg(do_mobile_wallpaper)
    return jsonify({"ok": True})


@flask_app.route("/api/pwa/activate_usecase", methods=["POST"])
def pwa_activate_usecase():
    """Activate the usecase named by "usecase_id" in the JSON body."""
    data = request.get_json(silent=True) or {}
    uid = data.get("usecase_id", "")
    uc = get_usecase(uid)
    if not uc:
        return jsonify({"ok": False, "error": "not found"}), 404
    apply_usecase(uc)
    return jsonify({"ok": True, "name": uc["name"]})


# ── Static routes ────────────────────────────────────────────────────────────
@flask_app.route("/pwa")
@flask_app.route("/pwa/")
def pwa_index():
    """Serve the PWA entry page."""
    p = PWA_DIR / "index.html"
    if p.exists():
        return send_file(p)
    return "PWA files not found.", 404


@flask_app.route("/pwa/<path:filename>")
def pwa_static(filename):
    """Serve a static file from PWA_DIR; 404 for anything outside it."""
    root_dir = PWA_DIR.resolve()
    p = (root_dir / filename).resolve()
    # The server listens on all interfaces, so never follow "../" out of PWA_DIR.
    if root_dir not in p.parents or not p.is_file():
        abort(404)
    return send_file(p)


@flask_app.route("/")
def desktop_index():
    """Serve the desktop web UI entry page."""
    p = UI_DIR / "index.html"
    if p.exists():
        return send_file(p)
    return "UI files not found.", 404


def run_flask():
    """Run the Flask server on all interfaces at WEB_PORT (blocking)."""
    flask_app.run(host="0.0.0.0", port=WEB_PORT, debug=False,
                  use_reloader=False, threaded=True)


# ── Main
# ─────────────────────────────────────────────────────────────────────────────
def main():
    """Start NotifyPulse and block in the Tk main loop.

    Resolves the config paths under %APPDATA%/NotifyPulse, loads settings and
    usecases, registers the pause hotkey, starts the notification, schedule,
    Flask and tray threads, then runs the hidden Tk root on this thread.
    """
    global APP_NAME, APP_ID, APPDATA_DIR, USECASES_FILE, SETTINGS_FILE, ICON_FILE
    global _default_wallpaper, root_instance, _active_usecase_id

    base = Path(os.environ.get("APPDATA", Path.home() / "AppData" / "Roaming"))
    APPDATA_DIR = base / "NotifyPulse"
    APPDATA_DIR.mkdir(parents=True, exist_ok=True)
    (APPDATA_DIR / "usecases").mkdir(exist_ok=True)
    USECASES_FILE = APPDATA_DIR / "usecases.json"
    SETTINGS_FILE = APPDATA_DIR / "settings.json"
    ICON_FILE = APPDATA_DIR / "icon.png"

    load_settings()
    load_usecases()
    # Captured once so pausing can put the user's own wallpaper back.
    _default_wallpaper = get_current_wallpaper()

    # Use the settings app_name as the heading until a usecase is chosen
    APP_NAME = get_setting("app_name", "NotifyPulse")
    APP_ID = re.sub(r"[^A-Za-z0-9]", "", APP_NAME) or "NotifyPulse"

    # Do NOT auto-activate any usecase — user must pick one via UI/PWA
    # (active_usecase in settings is intentionally not restored on startup)

    try:
        hotkey = get_setting("hotkey", "F13")
        keyboard.add_hotkey(hotkey, toggle_pause, suppress=True)
    except Exception as e:
        log(f"Hotkey error: {e}", "warn")

    root_instance = tk.Tk()
    root_instance.withdraw()

    threading.Thread(target=notification_loop, daemon=True).start()
    threading.Thread(target=schedule_runner, daemon=True).start()
    threading.Thread(target=run_flask, daemon=True).start()

    log(f"NotifyPulse V{VERSION} started")
    if get_setting("startup_toast", True):
        try:
            send_toast(f"V{VERSION} running! Web UI → localhost:{WEB_PORT}")
        except Exception as e:
            # A toast failure must not stop the app from starting.
            log(f"Startup toast error: {e}", "warn")
    if get_setting("auto_open_browser", True):
        opener = threading.Timer(1.5, lambda: webbrowser.open(f"http://localhost:{WEB_PORT}"))
        opener.daemon = True
        opener.start()

    threading.Thread(target=run_tray, daemon=True).start()
    root_instance.mainloop()


if __name__ == "__main__":
    main()