# NotifyPulse V2 - Rewritten from the ground up
# Config:  AppData/Roaming/NotifyPulse/
# Web UI:  http://localhost:5000
# PWA:     http://<host>:5000/pwa   (<host> = address of this PC on the network)

from __future__ import annotations

from queue import Empty, Queue
import sys
import os
import ctypes
import ctypes.wintypes
import threading
import time
import random
import re
import json
import webbrowser
import shutil
import hashlib
import base64
import mimetypes
import subprocess
from datetime import datetime
from pathlib import Path
from collections import deque

# ── Dependency check ──────────────────────────────────────────────────────────
# Third-party packages are imported inside a try block so that a missing
# package produces a message box instead of a bare traceback.
try:
    import keyboard
    from winotify import Notification, audio
    import pystray
    from PIL import Image, ImageDraw, ImageTk
    import schedule
    import tkinter as tk
    from flask import Flask, jsonify, request, send_file, abort, Response
    from flask_cors import CORS
except ImportError as e:
    import tkinter as tk
    from tkinter import messagebox
    root = tk.Tk()
    root.withdraw()
    messagebox.showerror("Missing dependency",
                         f"Run: pip install -r requirements.txt\n\nMissing: {e}")
    sys.exit(1)

# ── Constants ─────────────────────────────────────────────────────────────────
HOTKEY = "F13"
WEB_PORT = 5000
DEFAULT_APP_NAME = "NotifyPulse"
WALLPAPER_MAGIC = "change.wallpaper"   # entry text that triggers a PC wallpaper change
OVERLAY_MAGIC = "show.overlay"         # entry text (prefix) that triggers an overlay
VERSION = "2.0.0"
MOBILE_EXTS = (".jpg", ".jpeg", ".png", ".bmp", ".webp", ".heic", ".heif")

# Arguments for user32.SystemParametersInfoW (see set_wallpaper)
SPI_SETDESKWALLPAPER = 0x0014
SPIF_UPDATEINIFILE = 0x01
SPIF_SENDCHANGE = 0x02

# ── Paths (set in main) ───────────────────────────────────────────────────────
APPDATA_DIR: Path | None = None
NOTIFICATIONS_FILE: Path | None = None
ICON_FILE: Path | None = None
WALLPAPERS_DIR: Path | None = None
OVERLAY_DIR: Path | None = None
MOBILE_DIR: Path | None = None      # for PWA lockscreen/background images
SETTINGS_FILE: Path | None = None

# ── Global state ──────────────────────────────────────────────────────────────
paused: bool = False
notification_queue: list = []       # texts queued while paused
state_lock = threading.Lock()       # guards `paused` and `notification_queue`
tray_icon = None
root_instance = None                # hidden Tk root, created in main()
APP_NAME: str = DEFAULT_APP_NAME
APP_ID: str = "NotifyPulse"
MIN_INTERVAL_SEC: int = 10 * 60 MAX_INTERVAL_SEC: int = 30 * 60 _entries: list = [] _entries_lock = threading.Lock() _file_mtime: float = 0.0 _next_fire_at: float = 0.0 _active_overlays: list = [] _overlays_lock = threading.Lock() _default_wallpaper: str = "" _last_wallpaper: str = "" _overlay_stretch: bool = False # Connected PWA clients: {client_id: {"last_seen": float, "user_agent": str}} _pwa_clients: dict = {} _pwa_clients_lock = threading.Lock() # Pending mobile wallpaper commands: {client_id: {"lockscreen": path, "background": path}} _pending_wallpapers: dict = {} _pending_lock = threading.Lock() # Log ring buffer _log: deque = deque(maxlen=200) _log_lock = threading.Lock() # Settings cache (persisted to settings.json) _settings: dict = {} _notification_wake = threading.Event() def _ps_quote_literal(path: Path) -> str: return str(path).replace("'", "''") def _ps_read_file_utf8(path: Path) -> str | None: if os.name != "nt": return None quoted = _ps_quote_literal(path) cmd = ( "$ErrorActionPreference='Stop';" f"$p='{quoted}';" "if (Test-Path -LiteralPath $p) {" " $b=[IO.File]::ReadAllBytes($p);" " [Console]::Out.Write('1:' + [Convert]::ToBase64String($b));" "} else {" " [Console]::Out.Write('0:');" "}" ) try: out = subprocess.check_output( ["powershell", "-NoProfile", "-Command", cmd], encoding="ascii", errors="ignore", ).strip() if not out.startswith("1:"): return None payload = out[2:] if payload == "": return "" return base64.b64decode(payload).decode("utf-8") except Exception: return None def _ps_write_file_utf8(path: Path, content: str) -> bool: if os.name != "nt": return False quoted = _ps_quote_literal(path) payload = base64.b64encode(content.encode("utf-8")).decode("ascii") cmd = ( "$ErrorActionPreference='Stop';" f"$p='{quoted}';" "$d=[IO.Path]::GetDirectoryName($p);" "if ($d) { [IO.Directory]::CreateDirectory($d) | Out-Null };" f"[IO.File]::WriteAllBytes($p,[Convert]::FromBase64String('{payload}'));" ) try: subprocess.check_call( ["powershell", 
"-NoProfile", "-Command", cmd], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) return True except Exception: return False def _real_appdata_dir() -> Path: return Path(os.environ.get("USERPROFILE", str(Path.home()))) / "AppData" / "Roaming" / "NotifyPulse" def notifications_file_exists() -> bool: p = NOTIFICATIONS_FILE if NOTIFICATIONS_FILE else (_real_appdata_dir() / "notifications.txt") if os.name == "nt": quoted = _ps_quote_literal(p) cmd = f"$p='{quoted}'; if (Test-Path -LiteralPath $p) {{ '1' }} else {{ '0' }}" try: out = subprocess.check_output( ["powershell", "-NoProfile", "-Command", cmd], encoding="ascii", errors="ignore", ).strip() return out == "1" except Exception: pass return p.exists() def notifications_file_mtime() -> float: p = NOTIFICATIONS_FILE if NOTIFICATIONS_FILE else (_real_appdata_dir() / "notifications.txt") if os.name == "nt": quoted = _ps_quote_literal(p) cmd = ( "$ErrorActionPreference='Stop';" f"$p='{quoted}';" "if (Test-Path -LiteralPath $p) {" " [Console]::Out.Write((Get-Item -LiteralPath $p).LastWriteTimeUtc.ToFileTimeUtc());" "}" ) try: out = subprocess.check_output( ["powershell", "-NoProfile", "-Command", cmd], encoding="ascii", errors="ignore", ).strip() if out: return float(out) except Exception: pass try: return p.stat().st_mtime except Exception: return 0.0 def read_notifications_text() -> str: p = NOTIFICATIONS_FILE if NOTIFICATIONS_FILE else (_real_appdata_dir() / "notifications.txt") text = _ps_read_file_utf8(p) if text is not None: return text return p.read_text("utf-8") def write_notifications_text(content: str): p = NOTIFICATIONS_FILE if NOTIFICATIONS_FILE else (_real_appdata_dir() / "notifications.txt") if _ps_write_file_utf8(p, content): return p.write_text(content, "utf-8") class BackgroundWorker: def __init__(self): self.queue = Queue() self.thread: threading.Thread | None = None self.lock = threading.Lock() def enqueue(self, fn, *args, **kwargs): self.queue.put((fn, args, kwargs)) self._ensure_thread() def 
_ensure_thread(self): with self.lock: if self.thread and self.thread.is_alive(): return self.thread = threading.Thread(target=self._run, daemon=True) self.thread.start() def _run(self): while True: try: fn, args, kwargs = self.queue.get(timeout=5) except Empty: with self.lock: if self.queue.empty(): self.thread = None return continue try: fn(*args, **kwargs) except Exception as exc: log(f"Background task error: {exc}", "error") finally: self.queue.task_done() _background_worker = BackgroundWorker() def enqueue_background(fn, *args, **kwargs): _background_worker.enqueue(fn, *args, **kwargs) # ── Logging ─────────────────────────────────────────────────────────────────── def log(msg: str, level: str = "info"): ts = datetime.now().strftime("%H:%M:%S") entry = {"time": ts, "msg": msg, "level": level} with _log_lock: _log.appendleft(entry) # ── Settings persistence ────────────────────────────────────────────────────── DEFAULT_SETTINGS = { "overlay_stretch": False, "overlay_opacity": 0.4, "overlay_duration": 6, "startup_toast": True, "hotkey": "F13", "web_port": 5000, "theme": "dark", "notify_sound": True, "mobile_wallpaper_dir": "", # relative sub-folder inside MOBILE_DIR "pwa_token": "", # simple shared secret for PWA auth (empty = no auth) "auto_open_browser": True, "log_maxlen": 200, } def load_settings() -> dict: global _settings if SETTINGS_FILE and SETTINGS_FILE.exists(): try: raw = json.loads(SETTINGS_FILE.read_text("utf-8")) _settings = {**DEFAULT_SETTINGS, **raw} return _settings except Exception: pass _settings = dict(DEFAULT_SETTINGS) return _settings def save_settings(): if SETTINGS_FILE: try: SETTINGS_FILE.write_text(json.dumps(_settings, indent=2), "utf-8") except Exception as e: log(f"Settings save error: {e}", "error") def get_setting(key: str, default=None): return _settings.get(key, DEFAULT_SETTINGS.get(key, default)) # ── AppData setup ───────────────────────────────────────────────────────────── def resolve_appdata_dir() -> Path: """ Always use the 
real user AppData path so edits in Explorer match the app. If a virtualised APPDATA copy exists and is newer, import it once. """ env_base = Path(os.environ.get("APPDATA", "") or Path.home() / "AppData" / "Roaming") real_base = Path(os.environ.get("USERPROFILE", str(Path.home()))) / "AppData" / "Roaming" env_dir = env_base / "NotifyPulse" real_dir = real_base / "NotifyPulse" chosen = real_dir try: chosen.mkdir(parents=True, exist_ok=True) probe = chosen / ".probe" probe.write_text("ok", "utf-8") probe.unlink(missing_ok=True) # type: ignore[arg-type] # One-time import if the virtualised copy is newer try: src = env_dir / "notifications.txt" dst = chosen / "notifications.txt" if src.exists(): if (not dst.exists()) or src.stat().st_mtime > dst.stat().st_mtime: dst.write_bytes(src.read_bytes()) except Exception: pass return chosen except Exception as exc: root = tk.Tk(); root.withdraw() from tkinter import messagebox messagebox.showerror("NotifyPulse V2", f"Cannot create config folder:\n{chosen}\n\n{exc}") sys.exit(1) # ── Config file ─────────────────────────────────────────────────────────────── class Entry: __slots__ = ("text", "weight", "trigger_time") def __init__(self, text, weight, trigger_time): self.text = text self.weight = weight self.trigger_time = trigger_time def create_default_config(): write_notifications_text( "# NotifyPulse V2 - notifications.txt\n" "# Reloads automatically on save!\n" "#\n" "# @name Your App Name\n" "# @interval 10 30 <- min/max minutes between random notifications\n" "#\n" "# Text message | 30% <- random, picked 30% of the time\n" "# Text message | 14:30 <- fires daily at 14:30\n" "# change.wallpaper | 20% <- changes PC wallpaper randomly\n" "# change.wallpaper.mobile <- triggers mobile wallpaper via PWA\n" "# show.overlay | 10% <- fullscreen image overlay (6s default)\n" "# show.overlay.10 | 10% <- overlay for 10 seconds\n" "\n" "@name NotifyPulse\n" "@interval 10 30\n" "\n" "Take a short break and stretch! 
| 35%\n" "Drink some water - stay hydrated! | 30%\n" "Check your posture! | 20%\n" "Deep breath - you got this! | 15%\n" "\n" "# change.wallpaper | 20%\n" "# show.overlay | 15%\n" "\n" "Morning standup time! | 09:00\n" "Afternoon focus block | 14:00\n" "End of day - wrap up! | 17:30\n", ) def parse_overlay_duration(text: str) -> int: parts = text.strip().lower().split(".") if len(parts) >= 3: try: return max(1, int(parts[2])) * 1000 except ValueError: pass return int(get_setting("overlay_duration", 6)) * 1000 def is_overlay_entry(text: str) -> bool: t = text.strip().lower() return t == OVERLAY_MAGIC or t.startswith(OVERLAY_MAGIC + ".") def is_mobile_wallpaper(text: str) -> bool: return text.strip().lower() == "change.wallpaper.mobile" def load_config(): log(f"Loading config from {NOTIFICATIONS_FILE}") if not notifications_file_exists(): create_default_config() app_name = DEFAULT_APP_NAME min_sec = 10 * 60 max_sec = 30 * 60 entries = [] for raw in read_notifications_text().splitlines(): line = raw.strip() if not line or line.startswith("#"): continue if line.startswith("@"): parts = line[1:].split() key = parts[0].lower() if parts else "" if key == "name" and len(parts) >= 2: app_name = " ".join(parts[1:]) elif key == "interval" and len(parts) == 3: try: mn = max(1, int(parts[1])) mx = max(mn, int(parts[2])) min_sec = mn * 60 max_sec = mx * 60 except ValueError: pass continue if "|" in line: text, tag = [p.strip() for p in line.split("|", 1)] else: text, tag = line, None if tag and re.match(r"^\d{1,2}:\d{2}$", tag): h, m = tag.split(":") entries.append(Entry(text, None, f"{int(h):02d}:{m}")) elif tag and tag.endswith("%"): try: w = float(tag[:-1]) except ValueError: w = 1.0 entries.append(Entry(text, w, None)) else: entries.append(Entry(text, 1.0, None)) return app_name, min_sec, max_sec, entries def apply_config(app_name, min_sec, max_sec, entries, silent=False): global APP_NAME, APP_ID, MIN_INTERVAL_SEC, MAX_INTERVAL_SEC, _entries, _file_mtime APP_NAME = app_name 
APP_ID = re.sub(r"[^A-Za-z0-9]", "", app_name) or "NotifyPulse" MIN_INTERVAL_SEC = min_sec MAX_INTERVAL_SEC = max_sec with _entries_lock: _entries = entries schedule.clear() for entry in (e for e in entries if e.trigger_time): def make_job(e): def job(): fire_entry(e) return job schedule.every().day.at(entry.trigger_time).do(make_job(entry)) _file_mtime = notifications_file_mtime() update_tray_title() _notification_wake.set() if not silent: log(f"Config reloaded — {min_sec//60}–{max_sec//60}min, {len(entries)} entries") # ── File watcher ────────────────────────────────────────────────────────────── class ConfigFileMonitor: def __init__(self, path: Path, on_change, min_interval=1.0, max_interval=4.0): self.path = path self.on_change = on_change self.min_interval = min_interval self.max_interval = max_interval self._interval = min_interval self._lock = threading.Lock() self._timer: threading.Timer | None = None self._active = False def start(self): with self._lock: if self._active: return self._active = True self._interval = self.min_interval self._schedule(self._interval) def stop(self): with self._lock: self._active = False if self._timer: self._timer.cancel() self._timer = None def _schedule(self, delay): timer = threading.Timer(delay, self._run) timer.daemon = True self._timer = timer timer.start() def _run(self): current = notifications_file_mtime() changed = current is not None and current != _file_mtime if changed: self._interval = self.min_interval try: self.on_change() except Exception as exc: log("Config watcher error: {exc}", "error") else: self._interval = min(self.max_interval, self._interval * 2) with self._lock: if self._active: self._schedule(self._interval) def _reload_config_if_changed(): mtime = notifications_file_mtime() if mtime == _file_mtime: return app_name, min_sec, max_sec, entries = load_config() apply_config(app_name, min_sec, max_sec, entries) send_toast("Config reloaded!") _config_monitor: ConfigFileMonitor | None = None # ── Wallpaper 
# ── Wallpaper ─────────────────────────────────────────────────────────────────
_DESKTOP_IMAGE_GLOBS = ("*.jpg", "*.jpeg", "*.png", "*.bmp", "*.webp")


def get_current_wallpaper() -> str:
    """Return the current desktop wallpaper path from the registry, or "" on any failure."""
    try:
        import winreg
        # The context manager closes the key even when the query raises.
        with winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Control Panel\Desktop") as key:
            val, _ = winreg.QueryValueEx(key, "WallPaper")
        return val
    except Exception:
        return ""


def set_wallpaper(path: str):
    """Set the desktop wallpaper to *path* via ``SystemParametersInfoW``."""
    ctypes.windll.user32.SystemParametersInfoW(
        SPI_SETDESKWALLPAPER, 0, path, SPIF_UPDATEINIFILE | SPIF_SENDCHANGE)


def do_wallpaper_change():
    """Apply a random image from the wallpapers folder and remember it as the last wallpaper."""
    global _last_wallpaper
    if not WALLPAPERS_DIR or not WALLPAPERS_DIR.exists():
        log("Wallpaper folder missing", "warn")
        return
    files = [f for ext in _DESKTOP_IMAGE_GLOBS for f in WALLPAPERS_DIR.glob(ext)]
    if not files:
        log("No wallpaper images found", "warn")
        return
    wp = str(random.choice(files))
    _last_wallpaper = wp
    set_wallpaper(wp)
    log(f"Wallpaper → {Path(wp).name}")


# ── Mobile wallpaper (PWA push) ───────────────────────────────────────────────
def _mobile_base_dir() -> Path:
    """Return the folder mobile images are taken from.

    This is MOBILE_DIR, or its ``mobile_wallpaper_dir`` sub-folder when that
    setting is non-empty and the sub-folder exists.
    """
    base = MOBILE_DIR
    sub = str(get_setting("mobile_wallpaper_dir", "")).strip()
    if sub:
        candidate = (MOBILE_DIR / sub).resolve()
        if candidate.exists():
            base = candidate
        else:
            log(f"Mobile subfolder missing: {candidate}", "warn")
    return base


def _list_mobile_images(folder: Path) -> list[Path]:
    """Return the files directly inside *folder* whose extension is in MOBILE_EXTS."""
    files = []
    for ext in MOBILE_EXTS:
        files.extend(folder.glob(f"*{ext}"))
    return [p for p in files if p.is_file()]


def _copy_for_send(src: Path, prefix: str, cache_dir: Path) -> Path:
    """Copy *src* into *cache_dir* as ``<prefix>_<4 random digits><ext>`` and return the copy.

    Earlier ``<prefix>_*`` copies in the cache are removed first (best-effort).
    """
    cache_dir.mkdir(exist_ok=True)
    for old in cache_dir.glob(f"{prefix}_*"):
        try:
            old.unlink()
        except Exception:
            pass
    ext = src.suffix if src.suffix else ".jpg"
    rand = random.randint(1000, 9999)
    dest = cache_dir / f"{prefix}_{rand}{ext.lower()}"
    shutil.copyfile(src, dest)
    return dest


def do_mobile_wallpaper():
    """Queue a wallpaper push for all connected PWA clients."""
    if not MOBILE_DIR or not MOBILE_DIR.exists():
        log("Mobile wallpaper dir missing", "warn")
        return
    base_dir = _mobile_base_dir()
    if not base_dir.exists():
        log("Mobile wallpaper dir missing", "warn")
        return
    images = _list_mobile_images(base_dir)
    # Keep support for explicitly named pairs, otherwise pick two random files.
    ls_candidates = [p for p in images if p.stem.lower() == "lockscreen"]
    bg_candidates = [p for p in images if p.stem.lower() == "background"]
    if not ls_candidates or not bg_candidates:
        unique_images = images.copy()
        if len(unique_images) >= 2:
            chosen = random.sample(unique_images, 2)
        elif unique_images:
            chosen = [unique_images[0], unique_images[0]]
        else:
            log("No images in Mobile folder to send", "warn")
            return
        if not ls_candidates:
            ls_candidates = [chosen[0]]
        if not bg_candidates:
            bg_candidates = [chosen[1]]
    cache_dir = MOBILE_DIR / "_send_cache"
    ls_path = _copy_for_send(random.choice(ls_candidates), "Lockscreen", cache_dir)
    bg_path = _copy_for_send(random.choice(bg_candidates), "Background", cache_dir)
    with _pwa_clients_lock:
        client_ids = list(_pwa_clients.keys())
    if not client_ids:
        log("No PWA clients connected — wallpaper queued for next connection", "info")
    with _pending_lock:
        # With no known client the command is parked under a broadcast key.
        for cid in client_ids if client_ids else ["__broadcast__"]:
            _pending_wallpapers[cid] = {
                "lockscreen": str(ls_path),
                "background": str(bg_path),
            }
    log(f"Mobile wallpaper queued from {base_dir} → {len(client_ids)} client(s)")


# ── Screen Overlay ────────────────────────────────────────────────────────────
def _get_monitors():
    """Return ``(left, top, width, height)`` per monitor, or ``[None]`` when enumeration fails."""
    monitors = []
    try:
        # BOOL CALLBACK MonitorEnumProc(HMONITOR, HDC, LPRECT, LPARAM): the
        # handles and LPARAM are pointer-sized, so declare them with the
        # wintypes aliases rather than c_ulong / c_double.
        MonitorEnumProc = ctypes.WINFUNCTYPE(
            ctypes.wintypes.BOOL,
            ctypes.wintypes.HMONITOR,
            ctypes.wintypes.HDC,
            ctypes.POINTER(ctypes.wintypes.RECT),
            ctypes.wintypes.LPARAM)

        def _cb(hMon, hdcMon, lprcMon, dwData):
            r = lprcMon.contents
            monitors.append((r.left, r.top, r.right - r.left, r.bottom - r.top))
            return True

        callback = MonitorEnumProc(_cb)  # keep a reference for the duration of the call
        ctypes.windll.user32.EnumDisplayMonitors(None, None, callback, 0)
    except Exception:
        pass
    return monitors or [None]


def do_screen_overlay(duration_ms: int = 6000):
    """Show a random overlay image on every monitor for *duration_ms* milliseconds.

    The windows are created on the Tk thread via ``root_instance.after``. While
    the app is paused new overlays are created hidden and their countdown
    starts on resume.
    """
    if not OVERLAY_DIR or not OVERLAY_DIR.exists():
        log("Overlay folder missing", "warn")
        return
    files = [f for ext in _DESKTOP_IMAGE_GLOBS for f in OVERLAY_DIR.glob(ext)]
    if not files:
        log("No overlay images found", "warn")
        return
    if root_instance is None:
        log("Overlay skipped: UI not initialised yet", "warn")
        return
    img_path = str(random.choice(files))
    monitors = _get_monitors()
    opacity = float(get_setting("overlay_opacity", 0.4))
    log(f"Overlay: {Path(img_path).name} — {duration_ms//1000}s on {len(monitors)} screen(s)")

    def _make_tk_image(pil_img, sw, sh):
        """Scale the image to the screen: stretched, or letterboxed on black."""
        if _overlay_stretch:
            return ImageTk.PhotoImage(pil_img.resize((sw, sh), Image.Resampling.LANCZOS))
        img_w, img_h = pil_img.size
        scale = min(sw / img_w, sh / img_h)
        nw, nh = int(img_w * scale), int(img_h * scale)
        resized = pil_img.resize((nw, nh), Image.Resampling.LANCZOS)
        canvas = Image.new("RGB", (sw, sh), "black")
        canvas.paste(resized, ((sw - nw) // 2, (sh - nh) // 2))
        return ImageTk.PhotoImage(canvas)

    def create_overlay():
        try:
            pil_img = Image.open(img_path)
        except Exception as e:
            log(f"Overlay open error: {e}", "error")
            return
        with state_lock:
            is_paused = paused
        for mon in monitors:
            if mon is None:
                # Monitor enumeration failed: fall back to Tk's screen size.
                tmp = tk.Toplevel()
                tmp.withdraw()
                tmp.update_idletasks()
                sw, sh, mx, my = tmp.winfo_screenwidth(), tmp.winfo_screenheight(), 0, 0
                tmp.destroy()
            else:
                mx, my, sw, sh = mon
            ov = tk.Toplevel()
            ov.attributes("-topmost", True)
            ov.attributes("-alpha", opacity)
            ov.overrideredirect(True)
            ov.update_idletasks()
            # Click-through: add 0x80000 | 0x20 to the window's extended style
            # (index -20) on the root ancestor (GetAncestor flag 2).
            hwnd = ctypes.windll.user32.GetAncestor(ov.winfo_id(), 2) or ov.winfo_id()
            style = ctypes.windll.user32.GetWindowLongW(hwnd, -20)
            ctypes.windll.user32.SetWindowLongW(hwnd, -20, style | 0x80000 | 0x20)
            ov.geometry(f"{sw}x{sh}+{mx}+{my}")
            try:
                tk_img = _make_tk_image(pil_img, sw, sh)
                lbl = tk.Label(ov, image=tk_img, bg="black")
                lbl.image = tk_img  # keep a reference so Tk does not drop the image
                lbl.pack(fill="both", expand=True)
            except Exception as e:
                log(f"Overlay render error: {e}", "error")
                ov.destroy()
                continue
            entry = {"window": ov, "remaining_ms": duration_ms, "after_id": None,
                     "started_at": time.monotonic()}

            # Defaults bind this iteration's entry/window to each closure.
            def on_dismiss(e=entry, w=ov):
                with _overlays_lock:
                    if e in _active_overlays:
                        _active_overlays.remove(e)
                if w.winfo_exists():
                    w.destroy()

            def schedule_dismiss(e=entry, w=ov):
                e["started_at"] = time.monotonic()
                e["after_id"] = w.after(e["remaining_ms"], lambda: on_dismiss(e, w))

            entry["schedule_dismiss"] = schedule_dismiss
            with _overlays_lock:
                _active_overlays.append(entry)
            if is_paused:
                ov.withdraw()
            else:
                schedule_dismiss()

    root_instance.after(0, create_overlay)


def pause_overlays():
    """Hide all active overlays and freeze their remaining display time (runs on the Tk thread)."""
    def _do():
        with _overlays_lock:
            overlays = list(_active_overlays)
        for e in overlays:
            ov = e["window"]
            if not ov.winfo_exists():
                continue
            if e["after_id"] is not None:
                try:
                    ov.after_cancel(e["after_id"])
                except Exception:
                    pass
                e["remaining_ms"] = max(
                    0, e["remaining_ms"] - int((time.monotonic() - e["started_at"]) * 1000))
                e["after_id"] = None
            ov.withdraw()
    root_instance.after(0, _do)


def resume_overlays():
    """Re-show paused overlays and restart their countdown; drop dead or expired ones."""
    def _do():
        with _overlays_lock:
            overlays = list(_active_overlays)
        for e in overlays:
            ov = e["window"]
            if not ov.winfo_exists():
                with _overlays_lock:
                    if e in _active_overlays:
                        _active_overlays.remove(e)
                continue
            if e["remaining_ms"] <= 0:
                ov.destroy()
                with _overlays_lock:
                    if e in _active_overlays:
                        _active_overlays.remove(e)
                continue
            ov.deiconify()
            e["schedule_dismiss"]()
    root_instance.after(0, _do)


# ── Icon ──────────────────────────────────────────────────────────────────────
def make_default_icon(size=256):
    """Draw the built-in blue icon on a transparent *size*×*size* RGBA image."""
    img = Image.new("RGBA", (size, size), (0, 0, 0, 0))
    d = ImageDraw.Draw(img)
    s = size / 64  # shape coordinates below are laid out on a 64-unit grid
    d.ellipse([8*s, 20*s, 56*s, 56*s], fill="#4f8ef7")
    d.polygon([(32*s, 4*s), (16*s, 24*s), (48*s, 24*s)], fill="#4f8ef7")
    d.rectangle([24*s, 56*s, 40*s, 62*s], fill="#4f8ef7")
    return img


def load_tray_icon():
    """Return the user's icon file as RGBA, or the built-in icon if it is missing or unreadable."""
    if ICON_FILE and ICON_FILE.exists():
        try:
            return Image.open(ICON_FILE).convert("RGBA")
        except Exception:
            pass
    return make_default_icon()


# ── Toast ─────────────────────────────────────────────────────────────────────
def send_toast(message: str, title: str | None = None):
    """Show a short toast with *message*; *title* defaults to the app name."""
    icon_path = ""
    if ICON_FILE and ICON_FILE.exists():
        icon_path = str(ICON_FILE)
    elif APPDATA_DIR and (APPDATA_DIR / "icon.ico").exists():
        icon_path = str(APPDATA_DIR / "icon.ico")
    t = title or APP_NAME
    notif = Notification(app_id=APP_ID, title=t, msg=message, duration="short", icon=icon_path)
    if get_setting("notify_sound", True):
        notif.set_audio(audio.Default, loop=False)
    notif.show()
    log(f"Toast: {message}")


# ── Notification logic ────────────────────────────────────────────────────────
def pick_weighted(entries):
    """Pick one entry at random, proportionally to its weight.

    Only entries with a positive weight take part; returns ``None`` when there
    are none (``random.choices`` raises if the weights do not sum to a
    positive number).
    """
    weighted = [e for e in entries if e.weight is not None and e.weight > 0]
    if not weighted:
        return None
    return random.choices(weighted, weights=[e.weight for e in weighted], k=1)[0]


def fire_entry(entry: Entry):
    """Dispatch *entry* now, or queue its text while the app is paused."""
    text = entry.text.strip()
    with state_lock:
        if paused:
            notification_queue.append(text)
            log(f"Queued (paused): {text}")
            return
    _dispatch(text)


def _dispatch(text: str):
    """Route an entry text to its action: wallpaper, mobile wallpaper, overlay or toast."""
    tl = text.lower()
    if tl == WALLPAPER_MAGIC:
        enqueue_background(do_wallpaper_change)
    elif is_mobile_wallpaper(tl):
        enqueue_background(do_mobile_wallpaper)
    elif is_overlay_entry(tl):
        dur = parse_overlay_duration(tl)
        enqueue_background(do_screen_overlay, dur)
    else:
        send_toast(text)


def flush_queued(queued: list):
    """Dispatch the entries queued during a pause, 1.2 s apart."""
    for item in queued:
        _dispatch(item)
        time.sleep(1.2)


def notification_loop():
    """Fire a random weighted entry, then wait a random interval, forever.

    A wake-up (config reload, interval change) does not fire a notification;
    the loop re-reads the deadline, which the waker may have replaced, and
    caps it at the current maximum interval so a shortened interval takes
    effect immediately.
    """
    global _next_fire_at
    while True:
        with _entries_lock:
            entries = list(_entries)
        chosen = pick_weighted(entries)
        if chosen:
            fire_entry(chosen)
        interval = random.randint(MIN_INTERVAL_SEC, MAX_INTERVAL_SEC)
        _next_fire_at = time.time() + interval
        log(f"Next in {interval//60}m {interval%60}s")
        while True:
            remaining = _next_fire_at - time.time()
            if remaining <= 0:
                break
            # Wait in slices of at most 5 s so deadline changes are noticed.
            if _notification_wake.wait(min(remaining, 5)):
                _notification_wake.clear()
                latest = time.time() + MAX_INTERVAL_SEC
                if _next_fire_at > latest:
                    _next_fire_at = latest


def schedule_runner():
    """Run due daily jobs, checking every 10 seconds."""
    while True:
        schedule.run_pending()
        time.sleep(10)


# ── Pause / Resume ────────────────────────────────────────────────────────────
def toggle_pause():
    """Flip the paused state.

    Pausing hides overlays and restores the default wallpaper; resuming
    re-shows overlays, restores the last wallpaper and flushes the entries
    queued in the meantime.
    """
    global paused
    queued = []
    with state_lock:
        paused = not paused
        if not paused:
            queued = notification_queue.copy()
            notification_queue.clear()
    if paused:
        log("Paused")
        pause_overlays()
        if _default_wallpaper:
            threading.Thread(target=set_wallpaper, args=(_default_wallpaper,), daemon=True).start()
    else:
        log("Resumed")
        resume_overlays()
        if _last_wallpaper:
            threading.Thread(target=set_wallpaper, args=(_last_wallpaper,), daemon=True).start()
        threading.Thread(target=flush_queued, args=(queued,), daemon=True).start()
    update_tray_title()


# ── Tray ──────────────────────────────────────────────────────────────────────
def update_tray_title():
    """Refresh the tray tooltip and menu; a no-op until the tray icon exists."""
    if tray_icon:
        tray_icon.title = f"{APP_NAME} — {'PAUSED' if paused else 'Running'}"
        tray_icon.update_menu()


def build_tray_menu():
    """Build the tray menu: status/toggle item, "Open Web UI", separator, "Quit"."""
    def status_text(item):
        hotkey = get_setting("hotkey", HOTKEY)
        state = "Paused" if paused else "Running"
        return f"{state} (Hotkey: {hotkey})"

    return pystray.Menu(
        pystray.MenuItem(status_text, lambda i, it: toggle_pause()),
        pystray.MenuItem("Open Web UI",
                         lambda i, it: webbrowser.open(f"http://localhost:{WEB_PORT}")),
        pystray.Menu.SEPARATOR,
        pystray.MenuItem("Quit", lambda i, it: (i.stop(), os._exit(0))),
    )


def run_tray():
    """Create the tray icon and run its event loop (blocks the calling thread)."""
    global tray_icon
    tray_icon = pystray.Icon(APP_NAME, load_tray_icon(), f"{APP_NAME} — Running",
                             menu=build_tray_menu())
    tray_icon.run()


# ── Flask API ─────────────────────────────────────────────────────────────────
flask_app = Flask(__name__, static_folder=None)
CORS(flask_app, resources={r"/api/*": {"origins": "*"}, r"/pwa*": {"origins": "*"}})

# Silence werkzeug
import logging as _logging
_logging.getLogger("werkzeug").setLevel(_logging.ERROR)


# ─ Helper: image to base64 data URI ──────────────────────────────────────────
def _img_to_datauri(path: str) -> str | None:
    """Return the file at *path* as a ``data:`` URI, or ``None`` if it cannot be read."""
    try:
        p = Path(path)
        mime = mimetypes.guess_type(p.name)[0] or "image/jpeg"
        data = base64.b64encode(p.read_bytes()).decode()
        return f"data:{mime};base64,{data}"
    except Exception:
        return None


# ─ PWA heartbeat / client tracking ───────────────────────────────────────────
@flask_app.route("/api/pwa/ping", methods=["POST"])
def pwa_ping():
    """Record a heartbeat for ``client_id`` and expire clients silent for more than 60 s."""
    data = request.get_json(silent=True) or {}
    cid = data.get("client_id", "")
    if not cid:
        return jsonify({"error": "no client_id"}), 400
    ua = request.headers.get("User-Agent", "")
    now = time.time()
    cutoff = now - 60
    # Register, expire and count under one lock so the reported count is consistent.
    with _pwa_clients_lock:
        _pwa_clients[cid] = {"last_seen": now, "user_agent": ua}
        expired = [k for k, v in _pwa_clients.items() if v["last_seen"] < cutoff]
        for k in expired:
            del _pwa_clients[k]
        count = len(_pwa_clients)
    return jsonify({"ok": True, "clients": count})


# ─ Mobile wallpaper poll ─────────────────────────────────────────────────────
@flask_app.route("/api/pwa/wallpaper", methods=["GET"])
def pwa_wallpaper_poll():
    """PWA polls this; returns base64 images if a wallpaper change was queued."""
    cid = request.args.get("client_id", "")
    if not cid:
        return jsonify({"pending": False})
    with _pending_lock:
        # Check specific client first, then broadcast
        cmd = _pending_wallpapers.pop(cid, None) or _pending_wallpapers.pop("__broadcast__", None)
    if not cmd:
        return jsonify({"pending": False})
    result = {"pending": True}
    if cmd.get("lockscreen"):
        uri = _img_to_datauri(cmd["lockscreen"])
        if uri:
            result["lockscreen"] = uri
    if cmd.get("background"):
        uri = _img_to_datauri(cmd["background"])
        if uri:
            result["background"] = uri
    log(f"Served mobile wallpaper to PWA client {cid[:8]}…")
    return jsonify(result)


# ─ PWA splash image ──────────────────────────────────────────────────────────
@flask_app.route("/api/pwa/splash_image")
def pwa_splash_image():
    """Return a random mobile image as a data URI (``""`` when none is available)."""
    try:
        base_dir = _mobile_base_dir()
        if not base_dir.exists():
            return jsonify({"image": ""})
        images = _list_mobile_images(base_dir)
        if not images:
            return jsonify({"image": ""})
        chosen = random.choice(images)
        datauri = _img_to_datauri(str(chosen))
        return jsonify({"image": datauri or ""})
    except Exception as exc:
        log(f"Splash image error: {exc}", "warn")
        return jsonify({"image": ""})


# ─ Trigger mobile wallpaper from PWA UI ──────────────────────────────────────
@flask_app.route("/api/pwa/trigger_wallpaper", methods=["POST"])
def pwa_trigger_wallpaper():
    """Queue a mobile wallpaper push on the background worker."""
    enqueue_background(do_mobile_wallpaper)
    return jsonify({"ok": True})


# ─ State API ─────────────────────────────────────────────────────────────────
@flask_app.route("/api/pwa/app_name")
def pwa_app_name():
    """Lightweight endpoint so the PWA can render the app name immediately."""
    resp = jsonify({"app_name": APP_NAME})
    resp.headers["Cache-Control"] = "no-store"
    return resp


@flask_app.route("/api/state")
def api_state():
    """Return a snapshot of entries, log, settings and runtime state for the web UI."""
    with _entries_lock:
        entries_data = [{"text": e.text, "weight": e.weight, "trigger_time": e.trigger_time}
                        for e in _entries]
    with _log_lock:
        log_data = list(_log)
    with _pwa_clients_lock:
        pwa_count = len(_pwa_clients)
    # NOTE(review): "settings" includes pwa_token and this endpoint is served
    # with CORS "*" on all interfaces — confirm that exposure is intended.
    return jsonify({
        "app_name": APP_NAME,
        "version": VERSION,
        "paused": paused,
        "min_interval": MIN_INTERVAL_SEC // 60,
        "max_interval": MAX_INTERVAL_SEC // 60,
        "next_fire_at": _next_fire_at,
        "entries": entries_data,
        "log": log_data,
        "overlay_stretch": _overlay_stretch,
        "pwa_clients": pwa_count,
        "settings": _settings,
        "config_path": str(NOTIFICATIONS_FILE),
    })


@flask_app.route("/api/pause", methods=["POST"])
def api_pause():
    """Toggle pause and return the new state."""
    toggle_pause()
    return jsonify({"paused": paused})


@flask_app.route("/api/fire_now", methods=["POST"])
def api_fire_now():
    """Force fire the next random notification immediately."""
    with _entries_lock:
        entries = list(_entries)
    chosen = pick_weighted(entries)
    if chosen:
        threading.Thread(target=fire_entry, args=(chosen,), daemon=True).start()
        return jsonify({"ok": True, "fired": chosen.text})
    return jsonify({"ok": False, "error": "No entries"})


@flask_app.route("/api/test_notification", methods=["POST"])
def api_test_notif():
    """Show a toast with the posted ``message`` (or a default test text)."""
    data = request.get_json(silent=True) or {}
    msg = data.get("message", "Test notification from NotifyPulse!")
    send_toast(msg)
    return jsonify({"ok": True})


@flask_app.route("/api/test_wallpaper", methods=["POST"])
def api_test_wallpaper():
    """Queue a PC wallpaper change."""
    enqueue_background(do_wallpaper_change)
    return jsonify({"ok": True})


@flask_app.route("/api/test_overlay", methods=["POST"])
def api_test_overlay():
    """Queue a screen overlay with the default duration."""
    enqueue_background(do_screen_overlay)
    return jsonify({"ok": True})


@flask_app.route("/api/test_mobile_wallpaper", methods=["POST"])
def api_test_mobile_wallpaper():
    """Queue a mobile wallpaper push."""
    enqueue_background(do_mobile_wallpaper)
    return jsonify({"ok": True})


@flask_app.route("/api/settings", methods=["GET", "POST"])
def api_settings():
    """GET returns the settings cache; POST applies and persists the posted keys.

    ``name`` and ``min_interval``/``max_interval`` are also written back to
    notifications.txt. Any error during a POST is reported as a 500 with the
    error text.
    """
    global MIN_INTERVAL_SEC, MAX_INTERVAL_SEC, APP_NAME, _overlay_stretch, _next_fire_at
    if request.method == "GET":
        return jsonify(_settings)
    try:
        data = request.get_json(silent=True) or {}
        changed = False
        if "name" in data:
            name = data["name"].strip()
            if name:
                APP_NAME = name
                _settings["name"] = name
                changed = True
        if "min_interval" in data or "max_interval" in data:
            mn = max(1, int(data.get("min_interval", MIN_INTERVAL_SEC // 60)))
            mx = max(mn, int(data.get("max_interval", MAX_INTERVAL_SEC // 60)))
            MIN_INTERVAL_SEC = mn * 60
            MAX_INTERVAL_SEC = mx * 60
            _settings["min_interval"] = mn
            _settings["max_interval"] = mx
            interval = random.randint(MIN_INTERVAL_SEC, MAX_INTERVAL_SEC)
            _next_fire_at = time.time() + interval
            log(f"Interval → {mn}–{mx}min, timer reset")
            _notification_wake.set()
            changed = True
        # Boolean/generic settings passthrough
        for key in ("overlay_stretch", "overlay_opacity", "overlay_duration",
                    "startup_toast", "hotkey", "theme", "notify_sound",
                    "auto_open_browser", "pwa_token"):
            if key in data:
                val = data[key]
                _settings[key] = val
                if key == "overlay_stretch":
                    _overlay_stretch = bool(val)
                changed = True
        # Persist to notifications.txt if name/interval changed
        if "name" in data or "min_interval" in data or "max_interval" in data:
            _patch_notifications_file(
                _settings.get("name", APP_NAME),
                MIN_INTERVAL_SEC // 60,
                MAX_INTERVAL_SEC // 60)
        if changed:
            save_settings()
        update_tray_title()
        return jsonify({"ok": True})
    except Exception as ex:
        return jsonify({"ok": False, "error": str(ex)}), 500


def _patch_notifications_file(name: str, min_m: int, max_m: int):
    """Rewrite the ``@name`` / ``@interval`` lines of notifications.txt.

    Missing directives are inserted at the top of the file. Errors are logged,
    not raised.
    """
    try:
        lines = read_notifications_text().splitlines()
        new_lines = []
        found_name = found_interval = False
        for line in lines:
            s = line.strip()
            if s.startswith("@name"):
                new_lines.append(f"@name {name}")
                found_name = True
            elif s.startswith("@interval"):
                new_lines.append(f"@interval {min_m} {max_m}")
                found_interval = True
            else:
                new_lines.append(line)
        if not found_name:
            new_lines.insert(0, f"@name {name}")
        if not found_interval:
            new_lines.insert(1, f"@interval {min_m} {max_m}")
        write_notifications_text("\n".join(new_lines) + "\n")
    except Exception as e:
        log(f"Patch config error: {e}", "error")


@flask_app.route("/api/entries", methods=["GET", "POST"])
def api_entries():
    """Read or replace the raw notifications.txt content."""
    if request.method == "GET":
        try:
            return jsonify({"content": read_notifications_text()})
        except Exception as e:
            return jsonify({"error": str(e)}), 500
    else:
        data = request.get_json(silent=True) or {}
        content = data.get("content", "")
        try:
            write_notifications_text(content)
            return jsonify({"ok": True})
        except Exception as e:
            return jsonify({"ok": False, "error": str(e)}), 500


@flask_app.route("/api/log")
def api_log():
    """Return the newest log records, at most ``limit`` (default 100, capped to 0–200)."""
    try:
        limit = int(request.args.get("limit", 100))
    except (TypeError, ValueError):
        limit = 100  # a malformed value falls back to the default instead of a 500
    limit = max(0, min(limit, 200))
    with _log_lock:
        return jsonify(list(_log)[:limit])


# ─ PWA static files ──────────────────────────────────────────────────────────
PWA_DIR = Path(__file__).parent / "pwa"


@flask_app.route("/pwa")
@flask_app.route("/pwa/")
def pwa_index():
    """Serve the PWA's index.html, or a 404 hint when the pwa/ folder is missing."""
    p = PWA_DIR / "index.html"
    if p.exists():
        return send_file(p)
    return "PWA files not found. Place them in the pwa/ folder.", 404


# The rule needs the <path:filename> variable: without it the route duplicates
# "/pwa/" and the view can never receive its `filename` argument.
@flask_app.route("/pwa/<path:filename>")
def pwa_static(filename):
    """Serve a file from the pwa/ folder; 404 for anything missing or outside it."""
    pwa_root = PWA_DIR.resolve()
    target = (pwa_root / filename).resolve()
    # The server listens on all interfaces, so refuse paths that resolve
    # outside pwa/ (e.g. via "..").
    if pwa_root not in target.parents or not target.is_file():
        abort(404)
    return send_file(target)


# Desktop web UI served at /
@flask_app.route("/")
def desktop_index():
    """Serve ui/index.html, falling back to the inline HTML when it is missing."""
    p = Path(__file__).parent / "ui" / "index.html"
    if p.exists():
        return send_file(p)
    return INLINE_HTML  # fallback inline


# ─ Inline fallback HTML (minified at bottom) ─────────────────────────────────
INLINE_HTML = ""  # filled after


def run_flask():
    """Run the Flask server on all interfaces at WEB_PORT (blocks the calling thread)."""
    flask_app.run(host="0.0.0.0", port=WEB_PORT, debug=False, use_reloader=False, threaded=True)


# ── Main ──────────────────────────────────────────────────────────────────────
def main():
    """Resolve paths, load settings and config, start all worker threads and run the Tk loop."""
    global APP_NAME, APP_ID, APPDATA_DIR, NOTIFICATIONS_FILE, ICON_FILE
    global WALLPAPERS_DIR, OVERLAY_DIR, MOBILE_DIR, SETTINGS_FILE
    global _default_wallpaper, _last_wallpaper, _file_mtime, _next_fire_at, _config_monitor
    global root_instance, _overlay_stretch

    APPDATA_DIR = resolve_appdata_dir()
    log(f"Using config dir: {APPDATA_DIR}")
    NOTIFICATIONS_FILE = APPDATA_DIR / "notifications.txt"
    ICON_FILE = APPDATA_DIR / "icon.png"
    WALLPAPERS_DIR = APPDATA_DIR / "wallpapers"
    OVERLAY_DIR = APPDATA_DIR / "Overlay"
    MOBILE_DIR = APPDATA_DIR / "Mobile"
    SETTINGS_FILE = APPDATA_DIR / "settings.json"
    for d in (WALLPAPERS_DIR, OVERLAY_DIR, MOBILE_DIR):
        d.mkdir(exist_ok=True)

    load_settings()
    _overlay_stretch = get_setting("overlay_stretch", False)
    # Remember the wallpaper that was active at launch; pausing restores it.
    _default_wallpaper = get_current_wallpaper()

    app_name, min_sec, max_sec, entries = load_config()
    apply_config(app_name, min_sec, max_sec, entries, silent=True)
    _next_fire_at = time.time() + random.randint(min_sec, max_sec)

    _config_monitor = ConfigFileMonitor(NOTIFICATIONS_FILE, _reload_config_if_changed)
    _config_monitor.start()

    # Hotkey
    try:
        hotkey = get_setting("hotkey", "F13")
        keyboard.add_hotkey(hotkey, toggle_pause, suppress=True)
    except Exception as e:
        log(f"Hotkey error: {e}", "warn")

    # Hidden Tk root: overlays are created on its event loop.
    root_instance = tk.Tk()
    root_instance.withdraw()

    threading.Thread(target=notification_loop, daemon=True).start()
    threading.Thread(target=schedule_runner, daemon=True).start()
    threading.Thread(target=run_flask, daemon=True).start()

    log(f"NotifyPulse V{VERSION} started — {min_sec//60}–{max_sec//60}min")
    if get_setting("startup_toast", True):
        send_toast(f"V{VERSION} running! Web UI → localhost:{WEB_PORT}")
    if get_setting("auto_open_browser", True):
        threading.Timer(1.5, lambda: webbrowser.open(f"http://localhost:{WEB_PORT}")).start()

    threading.Thread(target=run_tray, daemon=True).start()
    root_instance.mainloop()


if __name__ == "__main__":
    main()