Files
Goddess/notifier.py
2026-04-12 21:55:26 +02:00

1252 lines
44 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# NotifyPulse - Multi-Profile Block Architecture
# Config: AppData/Roaming/NotifyPulse/
# Web UI: http://localhost:5000
# PWA: http://<your-lan-ip>:5000/pwa
from __future__ import annotations
from queue import Empty, Queue
import sys, os, ctypes, ctypes.wintypes, threading, time, random, re, json
import webbrowser, shutil, hashlib, base64, mimetypes
import subprocess
from datetime import datetime
from pathlib import Path
from collections import deque
# ── Dependency check ──────────────────────────────────────────────────────────
try:
import keyboard
from winotify import Notification, audio
import pystray
from PIL import Image, ImageDraw, ImageTk
import schedule
import tkinter as tk
from flask import Flask, jsonify, request, send_file, abort, Response
from flask_cors import CORS
except ImportError as e:
import tkinter as tk
from tkinter import messagebox
root = tk.Tk(); root.withdraw()
messagebox.showerror("Missing dependency",
f"Run: pip install -r requirements.txt\n\nMissing: {e}")
sys.exit(1)
# ── Constants ─────────────────────────────────────────────────────────────────
HOTKEY = "F13"
WEB_PORT = 5000
DEFAULT_APP_NAME = "NotifyPulse"
VERSION = "3.0.0"
MOBILE_EXTS = (".jpg", ".jpeg", ".png", ".bmp", ".webp", ".heic", ".heif")
SPI_SETDESKWALLPAPER = 0x0014
SPIF_UPDATEINIFILE = 0x01
SPIF_SENDCHANGE = 0x02
# ── Available Blocks ──────────────────────────────────────────────────────────
# Each block has an id, display name, description, and optional required dirs
ALL_BLOCKS = [
{
"id": "notifications",
"name": "Notifications",
"description": "Random & scheduled toast notifications",
"icon": "🔔",
"dirs": [],
},
{
"id": "wallpaper",
"name": "Wallpaper",
"description": "Random desktop wallpaper changes",
"icon": "🖼️",
"dirs": ["wallpapers"],
},
{
"id": "overlay",
"name": "Overlay",
"description": "Fullscreen image overlay on monitor",
"icon": "",
"dirs": ["overlay"],
},
{
"id": "timer",
"name": "Timer",
"description": "Scheduled time-based triggers",
"icon": "⏱️",
"dirs": [],
},
{
"id": "mobile_wallpaper",
"name": "Mobile Wallpaper",
"description": "Push wallpapers to phone via PWA",
"icon": "📱",
"dirs": ["mobile"],
},
]
# ── Paths ─────────────────────────────────────────────────────────────────────
APPDATA_DIR: Path | None = None
USECASES_FILE: Path | None = None # usecases.json
SETTINGS_FILE: Path | None = None # global settings.json
ICON_FILE: Path | None = None
# ── Global state ──────────────────────────────────────────────────────────────
paused: bool = False
state_lock = threading.Lock()
tray_icon = None
root_instance = None
APP_NAME: str = DEFAULT_APP_NAME
APP_ID: str = "NotifyPulse"
_active_usecase_id: str = "" # which usecase is currently running
_usecases: list = [] # list of usecase dicts
_usecases_lock = threading.Lock()
_notification_queue: list = []
_entries: list = []
_entries_lock = threading.Lock()
_file_mtime: float = 0.0
_next_fire_at: float = 0.0
_active_overlays: list = []
_overlays_lock = threading.Lock()
_default_wallpaper: str = ""
_last_wallpaper: str = ""
_pwa_clients: dict = {}
_pwa_clients_lock = threading.Lock()
_pending_wallpapers: dict = {}
_pending_lock = threading.Lock()
_log: deque = deque(maxlen=200)
_log_lock = threading.Lock()
_settings: dict = {}
_notification_wake = threading.Event()
_reload_flag = threading.Event() # set by apply_usecase to reload without firing
_config_monitor = None
def _ps_quote_literal(path: Path) -> str:
return str(path).replace("'", "''")
def _ps_read_file_utf8(path: Path) -> str | None:
if os.name != "nt":
return None
quoted = _ps_quote_literal(path)
cmd = (
"$ErrorActionPreference='Stop';"
f"$p='{quoted}';"
"if (Test-Path -LiteralPath $p) {"
" $b=[IO.File]::ReadAllBytes($p);"
" [Console]::Out.Write('1:' + [Convert]::ToBase64String($b));"
"} else {"
" [Console]::Out.Write('0:');"
"}"
)
try:
out = subprocess.check_output(
["powershell", "-NoProfile", "-Command", cmd],
encoding="ascii", errors="ignore",
).strip()
if not out.startswith("1:"):
return None
payload = out[2:]
return "" if payload == "" else base64.b64decode(payload).decode("utf-8")
except Exception:
return None
def _ps_write_file_utf8(path: Path, content: str) -> bool:
if os.name != "nt":
return False
quoted = _ps_quote_literal(path)
payload = base64.b64encode(content.encode("utf-8")).decode("ascii")
cmd = (
"$ErrorActionPreference='Stop';"
f"$p='{quoted}';"
"$d=[IO.Path]::GetDirectoryName($p);"
"if ($d) { [IO.Directory]::CreateDirectory($d) | Out-Null };"
f"[IO.File]::WriteAllBytes($p,[Convert]::FromBase64String('{payload}'));"
)
try:
subprocess.check_call(
["powershell", "-NoProfile", "-Command", cmd],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
return True
except Exception:
return False
# ── Logging ───────────────────────────────────────────────────────────────────
def log(msg: str, level: str = "info"):
ts = datetime.now().strftime("%H:%M:%S")
entry = {"time": ts, "msg": msg, "level": level}
with _log_lock:
_log.appendleft(entry)
# ── Settings ──────────────────────────────────────────────────────────────────
DEFAULT_SETTINGS = {
"overlay_stretch": False,
"overlay_opacity": 0.4,
"overlay_duration": 6,
"overlay_monitor": 0, # 0 = primary, -1 = all monitors
"startup_toast": True,
"hotkey": "F13",
"web_port": 5000,
"theme": "dark",
"notify_sound": True,
"auto_open_browser": True,
"active_usecase": "", # empty = no usecase selected yet
"app_name": "NotifyPulse", # shown before any usecase is selected
"minimize_to_tray": True, # hide window to tray instead of closing
"log_max_entries": 100, # how many log entries to keep in memory
"notification_duration": 5, # toast display duration in seconds
"wallpaper_fit": "fill", # fill | fit | stretch | center | tile
"pwa_port": 5000, # port exposed to the LAN for the PWA
"run_on_startup": False, # register in Windows startup
"confirm_delete": True, # show confirmation before deleting usecases
"entry_display_mode": "percent", # percent | chance — default entry list display
}
def load_settings() -> dict:
global _settings
if SETTINGS_FILE and SETTINGS_FILE.exists():
try:
raw = json.loads(SETTINGS_FILE.read_text("utf-8"))
_settings = {**DEFAULT_SETTINGS, **raw}
return _settings
except Exception:
pass
_settings = dict(DEFAULT_SETTINGS)
return _settings
def save_settings():
if SETTINGS_FILE:
try:
SETTINGS_FILE.write_text(json.dumps(_settings, indent=2), "utf-8")
except Exception as e:
log(f"Settings save error: {e}", "error")
def get_setting(key: str, default=None):
return _settings.get(key, DEFAULT_SETTINGS.get(key, default))
# ── Usecase management ────────────────────────────────────────────────────────
DEFAULT_USECASE = {
"id": "",
"name": "Default",
"color": "#4f8ef7",
"blocks": ["notifications", "wallpaper", "overlay", "timer", "mobile_wallpaper"],
"min_interval": 10,
"max_interval": 30,
"notifications": [],
}
def usecase_dir(uc: dict) -> Path:
"""Return the folder for this usecase inside APPDATA_DIR."""
safe = re.sub(r"[^A-Za-z0-9_\-]", "_", uc["id"])
return APPDATA_DIR / "usecases" / safe
def ensure_usecase_dirs(uc: dict):
"""Create all required dirs for enabled blocks."""
base = usecase_dir(uc)
base.mkdir(parents=True, exist_ok=True)
enabled = set(uc.get("blocks", []))
for block in ALL_BLOCKS:
if block["id"] in enabled:
for d in block["dirs"]:
(base / d).mkdir(exist_ok=True)
def load_usecases() -> list:
global _usecases
if USECASES_FILE and USECASES_FILE.exists():
try:
raw = json.loads(USECASES_FILE.read_text("utf-8"))
if isinstance(raw, list):
_usecases = raw
elif isinstance(raw, dict):
# Wrapped format — shouldn't happen but handle it
_usecases = list(raw.values()) if raw else []
else:
_usecases = []
return _usecases
except Exception:
pass
# First run — create a sample usecase
sample = {
"id": "wellness",
"name": "Wellness",
"color": "#22c55e",
"blocks": ["notifications", "wallpaper", "overlay"],
"min_interval": 10,
"max_interval": 30,
"notifications": [
"Take a short break and stretch! | 35%",
"Drink some water — stay hydrated! | 30%",
"Check your posture! | 20%",
"Deep breath — you got this! | 15%",
"change.wallpaper | 20%",
"show.overlay | 15%",
"Morning standup time! | 09:00",
"End of day — wrap up! | 17:30",
],
}
_usecases = [sample]
save_usecases()
return _usecases
def save_usecases():
if USECASES_FILE:
USECASES_FILE.write_text(json.dumps(_usecases, indent=2), "utf-8")
def get_usecase(uid: str) -> dict | None:
if not uid or not isinstance(uid, str):
return None
with _usecases_lock:
for uc in _usecases:
if isinstance(uc, dict) and uc.get("id") == uid:
return uc
return None
def active_usecase() -> dict | None:
return get_usecase(_active_usecase_id)
# ── Config parsing ─────────────────────────────────────────────────────────────
class Entry:
__slots__ = ("text", "weight", "trigger_time")
def __init__(self, text, weight, trigger_time):
self.text = text
self.weight = weight
self.trigger_time = trigger_time
def parse_entries(lines: list[str]) -> list[Entry]:
entries = []
for raw in lines:
line = raw.strip()
if not line or line.startswith("#"):
continue
if line.startswith("@"):
continue
if "|" in line:
text, tag = [p.strip() for p in line.split("|", 1)]
else:
text, tag = line, None
if tag and re.match(r"^\d{1,2}:\d{2}$", tag):
h, m = tag.split(":")
h, m = int(h), int(m)
if 0 <= h <= 23 and 0 <= m <= 59:
entries.append(Entry(text, None, f"{h:02d}:{m:02d}"))
else:
entries.append(Entry(text, 1.0, None)) # invalid time → treat as weighted
elif tag and tag.endswith("%"):
try:
w = float(tag[:-1])
except ValueError:
w = 1.0
entries.append(Entry(text, w, None))
else:
entries.append(Entry(text, 1.0, None))
return entries
def apply_usecase(uc: dict, silent=False):
global _active_usecase_id, APP_NAME, APP_ID
global _entries, _next_fire_at, _file_mtime
_active_usecase_id = uc["id"]
APP_NAME = uc.get("name", DEFAULT_APP_NAME)
APP_ID = re.sub(r"[^A-Za-z0-9]", "", APP_NAME) or "NotifyPulse"
_settings["active_usecase"] = uc["id"]
save_settings()
lines = uc.get("notifications", [])
entries = parse_entries(lines)
mn = uc.get("min_interval", 10) * 60
mx = max(mn, uc.get("max_interval", 30) * 60) # guard: mx must be >= mn
schedule.clear()
for entry in (e for e in entries if e.trigger_time):
def make_job(e):
def job(): fire_entry(e)
return job
schedule.every().day.at(entry.trigger_time).do(make_job(entry))
with _entries_lock:
_entries = entries
interval = random.randint(mn, mx)
_next_fire_at = time.time() + interval
_reload_flag.set()
_notification_wake.set()
ensure_usecase_dirs(uc)
update_tray_title()
if not silent:
log(f"Switched to usecase '{APP_NAME}'{mn//60}{mx//60}min, {len(entries)} entries")
# ── Background worker ─────────────────────────────────────────────────────────
class BackgroundWorker:
def __init__(self):
self.queue = Queue()
self.thread: threading.Thread | None = None
self.lock = threading.Lock()
def enqueue(self, fn, *args, **kwargs):
self.queue.put((fn, args, kwargs))
self._ensure_thread()
def _ensure_thread(self):
with self.lock:
if self.thread and self.thread.is_alive():
return
self.thread = threading.Thread(target=self._run, daemon=True)
self.thread.start()
def _run(self):
while True:
try:
fn, args, kwargs = self.queue.get(timeout=5)
except Empty:
with self.lock:
if self.queue.empty():
self.thread = None
return
continue
try:
fn(*args, **kwargs)
except Exception as exc:
log(f"Background task error: {exc}", "error")
finally:
self.queue.task_done()
_bg = BackgroundWorker()
def enqueue_bg(fn, *args, **kwargs):
_bg.enqueue(fn, *args, **kwargs)
# ── Block helpers ─────────────────────────────────────────────────────────────
def block_enabled(block_id: str) -> bool:
uc = active_usecase()
if not uc:
return False
return block_id in uc.get("blocks", [])
def uc_dir(sub: str = "") -> Path | None:
uc = active_usecase()
if not uc:
return None
base = usecase_dir(uc)
return base / sub if sub else base
# ── Wallpaper block ───────────────────────────────────────────────────────────
def get_current_wallpaper() -> str:
try:
import winreg
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Control Panel\Desktop")
val, _ = winreg.QueryValueEx(key, "WallPaper")
winreg.CloseKey(key)
return val
except Exception:
return ""
def set_wallpaper(path: str):
ctypes.windll.user32.SystemParametersInfoW(
SPI_SETDESKWALLPAPER, 0, path,
SPIF_UPDATEINIFILE | SPIF_SENDCHANGE)
def do_wallpaper_change():
global _last_wallpaper
d = uc_dir("wallpapers")
if not d or not d.exists():
log("Wallpaper folder missing", "warn")
return
files = [f for ext in ("*.jpg","*.jpeg","*.png","*.bmp","*.webp")
for f in d.glob(ext)]
if not files:
log("No wallpaper images found", "warn")
return
wp = str(random.choice(files))
_last_wallpaper = wp
set_wallpaper(wp)
log(f"Wallpaper → {Path(wp).name}")
# ── Mobile wallpaper block ────────────────────────────────────────────────────
def _list_mobile_images(folder: Path) -> list[Path]:
files = []
for ext in MOBILE_EXTS:
files.extend(folder.glob(f"*{ext}"))
return [p for p in files if p.is_file()]
def _copy_for_send(src: Path, prefix: str, cache_dir: Path) -> Path:
cache_dir.mkdir(exist_ok=True)
for old in cache_dir.glob(f"{prefix}_*"):
try: old.unlink()
except Exception: pass
ext = src.suffix if src.suffix else ".jpg"
rand = random.randint(1000, 9999)
dest = cache_dir / f"{prefix}_{rand}{ext.lower()}"
shutil.copyfile(src, dest)
return dest
def do_mobile_wallpaper():
d = uc_dir("mobile")
if not d or not d.exists():
log("Mobile wallpaper dir missing", "warn")
return
images = _list_mobile_images(d)
if not images:
log("No images in Mobile folder", "warn")
return
if len(images) >= 2:
chosen = random.sample(images, 2)
else:
chosen = [images[0], images[0]]
cache_dir = d / "_send_cache"
ls_path = _copy_for_send(chosen[0], "Lockscreen", cache_dir)
bg_path = _copy_for_send(chosen[1], "Background", cache_dir)
with _pwa_clients_lock:
client_ids = list(_pwa_clients.keys())
with _pending_lock:
for cid in client_ids if client_ids else ["__broadcast__"]:
_pending_wallpapers[cid] = {
"lockscreen": str(ls_path),
"background": str(bg_path),
}
log(f"Mobile wallpaper queued → {len(client_ids)} client(s)")
# ── Overlay block ─────────────────────────────────────────────────────────────
def _get_monitors():
monitors = []
try:
MonitorEnumProc = ctypes.WINFUNCTYPE(
ctypes.c_bool, ctypes.c_ulong, ctypes.c_ulong,
ctypes.POINTER(ctypes.wintypes.RECT), ctypes.c_double)
def _cb(hMon, hdcMon, lprcMon, dwData):
r = lprcMon.contents
monitors.append((r.left, r.top, r.right - r.left, r.bottom - r.top))
return True
ctypes.windll.user32.EnumDisplayMonitors(None, None, MonitorEnumProc(_cb), 0)
except Exception:
pass
return monitors or [None]
def do_screen_overlay(duration_ms: int = 6000):
d = uc_dir("overlay")
if not d or not d.exists():
log("Overlay folder missing", "warn")
return
files = [f for ext in ("*.jpg","*.jpeg","*.png","*.bmp","*.webp")
for f in d.glob(ext)]
if not files:
log("No overlay images found", "warn")
return
img_path = str(random.choice(files))
monitors = _get_monitors()
opacity = float(get_setting("overlay_opacity", 0.4))
stretch = bool(get_setting("overlay_stretch", False))
log(f"Overlay: {Path(img_path).name}{duration_ms//1000}s on {len(monitors)} screen(s)")
def _make_tk_image(pil_img, sw, sh):
if stretch:
return ImageTk.PhotoImage(pil_img.resize((sw, sh), Image.Resampling.LANCZOS))
img_w, img_h = pil_img.size
scale = min(sw / img_w, sh / img_h)
nw, nh = int(img_w * scale), int(img_h * scale)
resized = pil_img.resize((nw, nh), Image.Resampling.LANCZOS)
canvas = Image.new("RGB", (sw, sh), "black")
canvas.paste(resized, ((sw - nw) // 2, (sh - nh) // 2))
return ImageTk.PhotoImage(canvas)
def create_overlay():
try:
pil_img = Image.open(img_path)
except Exception as e:
log(f"Overlay open error: {e}", "error")
return
with state_lock:
is_paused = paused
for mon in monitors:
if mon is None:
tmp = tk.Toplevel(); tmp.withdraw(); tmp.update_idletasks()
sw, sh, mx, my = tmp.winfo_screenwidth(), tmp.winfo_screenheight(), 0, 0
tmp.destroy()
else:
mx, my, sw, sh = mon
ov = tk.Toplevel()
ov.attributes("-topmost", True)
ov.attributes("-alpha", opacity)
ov.overrideredirect(True)
ov.update_idletasks()
hwnd = ctypes.windll.user32.GetAncestor(ov.winfo_id(), 2) or ov.winfo_id()
style = ctypes.windll.user32.GetWindowLongW(hwnd, -20)
ctypes.windll.user32.SetWindowLongW(hwnd, -20, style | 0x80000 | 0x20)
ov.geometry(f"{sw}x{sh}+{mx}+{my}")
try:
tk_img = _make_tk_image(pil_img, sw, sh)
lbl = tk.Label(ov, image=tk_img, bg="black")
lbl.image = tk_img
lbl.pack(fill="both", expand=True)
except Exception as e:
log(f"Overlay render error: {e}", "error")
ov.destroy()
continue
entry = {"window": ov, "remaining_ms": duration_ms,
"after_id": None, "started_at": time.monotonic()}
def on_dismiss(e=entry, w=ov):
with _overlays_lock:
if e in _active_overlays: _active_overlays.remove(e)
if w.winfo_exists(): w.destroy()
def schedule_dismiss(e=entry, w=ov):
e["started_at"] = time.monotonic()
e["after_id"] = w.after(e["remaining_ms"], lambda: on_dismiss(e, w))
entry["schedule_dismiss"] = schedule_dismiss
with _overlays_lock:
_active_overlays.append(entry)
if not is_paused:
schedule_dismiss()
else:
ov.withdraw()
if root_instance:
root_instance.after(0, create_overlay)
# ── Icon ──────────────────────────────────────────────────────────────────────
def make_default_icon(size=256):
img = Image.new("RGBA", (size, size), (0, 0, 0, 0))
d = ImageDraw.Draw(img)
s = size / 64
d.ellipse([8*s, 20*s, 56*s, 56*s], fill="#4f8ef7")
d.polygon([(32*s, 4*s), (16*s, 24*s), (48*s, 24*s)], fill="#4f8ef7")
d.rectangle([24*s, 56*s, 40*s, 62*s], fill="#4f8ef7")
return img
def load_tray_icon():
if ICON_FILE and ICON_FILE.exists():
try:
return Image.open(ICON_FILE).convert("RGBA")
except Exception:
pass
return make_default_icon()
# ── Toast ─────────────────────────────────────────────────────────────────────
def send_toast(message: str, title: str | None = None):
icon_path = str(ICON_FILE) if ICON_FILE and ICON_FILE.exists() else ""
t = title or APP_NAME
notif = Notification(app_id=APP_ID, title=t, msg=message,
duration="short", icon=icon_path)
if get_setting("notify_sound", True):
notif.set_audio(audio.Default, loop=False)
notif.show()
log(f"Toast: {message}")
# ── Notification logic ────────────────────────────────────────────────────────
OVERLAY_MAGIC = "show.overlay"
WALLPAPER_MAGIC = "change.wallpaper"
def parse_overlay_duration(text: str) -> int:
parts = text.strip().lower().split(".")
if len(parts) >= 3:
try:
return max(1, int(parts[2])) * 1000
except ValueError:
pass
return int(get_setting("overlay_duration", 6)) * 1000
def pick_weighted(entries):
weighted = [e for e in entries if e.weight is not None]
if not weighted: return None
return random.choices(weighted, weights=[e.weight for e in weighted], k=1)[0]
def _dispatch(text: str):
tl = text.lower().strip()
uc = active_usecase()
blocks = set(uc.get("blocks", [])) if uc else set()
if tl == WALLPAPER_MAGIC:
if "wallpaper" in blocks:
enqueue_bg(do_wallpaper_change)
elif tl == "change.wallpaper.mobile":
if "mobile_wallpaper" in blocks:
enqueue_bg(do_mobile_wallpaper)
elif tl == OVERLAY_MAGIC or tl.startswith(OVERLAY_MAGIC + "."):
if "overlay" in blocks:
dur = parse_overlay_duration(tl)
enqueue_bg(do_screen_overlay, dur)
else:
if "notifications" in blocks:
send_toast(text)
def fire_entry(entry: Entry):
text = entry.text.strip()
with state_lock:
if paused:
_notification_queue.append(text)
log(f"Queued (paused): {text}")
return
_dispatch(text)
def flush_queued(queued: list):
for item in queued:
_dispatch(item)
time.sleep(1.2)
def notification_loop():
global _next_fire_at
while True:
uc = active_usecase()
if uc and _active_usecase_id:
mn = uc.get("min_interval", 10) * 60
mx = max(mn, uc.get("max_interval", 30) * 60) # guard: mx must be >= mn
with _entries_lock:
entries = list(_entries)
# Only fire if this iteration was triggered by the timer expiring,
# not by a reload signal from apply_usecase / PWA activation.
if not _reload_flag.is_set():
chosen = pick_weighted(entries)
if chosen:
fire_entry(chosen)
_reload_flag.clear()
interval = random.randint(mn, mx)
else:
interval = 60
_next_fire_at = time.time() + interval
log(f"Next in {interval//60}m {interval%60}s")
while True:
remaining = _next_fire_at - time.time()
if remaining <= 0:
break
triggered = _notification_wake.wait(min(remaining, 5))
if triggered:
_notification_wake.clear()
break
def schedule_runner():
while True:
schedule.run_pending()
time.sleep(1)
# ── Pause / Resume ────────────────────────────────────────────────────────────
def pause_overlays():
def _do():
with _overlays_lock:
overlays = list(_active_overlays)
for e in overlays:
ov = e["window"]
if not ov.winfo_exists(): continue
if e["after_id"] is not None:
try: ov.after_cancel(e["after_id"])
except Exception: pass
e["remaining_ms"] = max(0, e["remaining_ms"] -
int((time.monotonic() - e["started_at"]) * 1000))
e["after_id"] = None
ov.withdraw()
if root_instance:
root_instance.after(0, _do)
def _do():
with _overlays_lock:
overlays = list(_active_overlays)
for e in overlays:
ov = e["window"]
if not ov.winfo_exists():
with _overlays_lock:
if e in _active_overlays: _active_overlays.remove(e)
continue
if e["remaining_ms"] <= 0:
ov.destroy()
with _overlays_lock:
if e in _active_overlays: _active_overlays.remove(e)
continue
ov.deiconify()
e["schedule_dismiss"]()
if root_instance:
root_instance.after(0, _do)
def toggle_pause():
global paused
queued = []
with state_lock:
paused = not paused
if not paused:
queued = _notification_queue.copy()
_notification_queue.clear()
if paused:
log("Paused")
pause_overlays()
if _default_wallpaper:
threading.Thread(target=set_wallpaper, args=(_default_wallpaper,), daemon=True).start()
else:
log("Resumed")
resume_overlays()
if _last_wallpaper:
threading.Thread(target=set_wallpaper, args=(_last_wallpaper,), daemon=True).start()
threading.Thread(target=flush_queued, args=(queued,), daemon=True).start()
update_tray_title()
# ── Tray ──────────────────────────────────────────────────────────────────────
def update_tray_title():
if tray_icon:
name = APP_NAME or DEFAULT_APP_NAME
tray_icon.title = f"{name} V3 — {'PAUSED' if paused else 'Running'}"
tray_icon.update_menu()
def build_tray_menu():
def status_text(item):
state = "Paused" if paused else "Running"
uc = active_usecase()
name = uc["name"] if uc else "None"
return f"{state} | {name}"
return pystray.Menu(
pystray.MenuItem(status_text, lambda i, it: toggle_pause()),
pystray.MenuItem("Open Web UI", lambda i, it: webbrowser.open(f"http://localhost:{WEB_PORT}")),
pystray.Menu.SEPARATOR,
pystray.MenuItem("Quit", lambda i, it: (i.stop(), os._exit(0))),
)
def run_tray():
global tray_icon
tray_icon = pystray.Icon("NotifyPulse", load_tray_icon(),
"NotifyPulse V3", menu=build_tray_menu())
tray_icon.run()
# ── Flask App ─────────────────────────────────────────────────────────────────
flask_app = Flask(__name__, static_folder=None)
CORS(flask_app, resources={r"/api/*": {"origins": "*"}, r"/pwa*": {"origins": "*"}})
import logging as _logging
_logging.getLogger("werkzeug").setLevel(_logging.ERROR)
PWA_DIR = Path(__file__).parent / "pwa"
UI_DIR = Path(__file__).parent / "ui"
def _img_to_datauri(path: str) -> str | None:
try:
p = Path(path)
mime = mimetypes.guess_type(p.name)[0] or "image/jpeg"
data = base64.b64encode(p.read_bytes()).decode()
return f"data:{mime};base64,{data}"
except Exception:
return None
# ── API: Usecases ─────────────────────────────────────────────────────────────
@flask_app.route("/api/usecases", methods=["GET"])
def api_get_usecases():
with _usecases_lock:
return jsonify({
"usecases": list(_usecases),
"active": _active_usecase_id,
"blocks": ALL_BLOCKS,
})
@flask_app.route("/api/usecases", methods=["POST"])
def api_create_usecase():
data = request.get_json(silent=True) or {}
name = data.get("name", "").strip()
if not name:
return jsonify({"ok": False, "error": "name required"}), 400
uid = re.sub(r"[^a-z0-9_]", "_", name.lower()) + f"_{random.randint(100,999)}"
uc = {
"id": uid,
"name": name,
"color": data.get("color", "#4f8ef7"),
"blocks": data.get("blocks", ["notifications"]),
"min_interval": int(data.get("min_interval", 10)),
"max_interval": int(data.get("max_interval", 30)),
"notifications": data.get("notifications", []),
"dashboard_layout": data.get("dashboard_layout", {}),
}
with _usecases_lock:
_usecases.append(uc)
save_usecases()
ensure_usecase_dirs(uc)
log(f"Usecase created: {name}")
return jsonify({"ok": True, "usecase": uc})
@flask_app.route("/api/usecases/<uid>", methods=["PUT"])
def api_update_usecase(uid):
data = request.get_json(silent=True) or {}
updated_uc = None
is_active = False
with _usecases_lock:
for uc in _usecases:
if uc["id"] == uid:
uc.update({k: v for k, v in data.items() if k != "id"})
save_usecases()
ensure_usecase_dirs(uc)
is_active = (uid == _active_usecase_id)
updated_uc = dict(uc)
log(f"Usecase updated: {uc['name']}")
break
if updated_uc is None:
return jsonify({"ok": False, "error": "not found"}), 404
# Call apply_usecase OUTSIDE the lock to avoid deadlock
if is_active:
apply_usecase(updated_uc)
return jsonify({"ok": True, "usecase": updated_uc})
@flask_app.route("/api/usecases/<uid>", methods=["DELETE"])
def api_delete_usecase(uid):
global _active_usecase_id
new_active_uc = None
with _usecases_lock:
before = len(_usecases)
_usecases[:] = [u for u in _usecases if u["id"] != uid]
if len(_usecases) < before:
if _active_usecase_id == uid:
_active_usecase_id = _usecases[0]["id"] if _usecases else ""
new_active_uc = _usecases[0] if _usecases else None
save_usecases()
log(f"Usecase deleted: {uid}")
else:
return jsonify({"ok": False, "error": "not found"}), 404
# Apply the new active usecase OUTSIDE the lock to avoid deadlock
if new_active_uc:
apply_usecase(new_active_uc, silent=True)
return jsonify({"ok": True})
@flask_app.route("/api/usecases/<uid>/activate", methods=["POST"])
def api_activate_usecase(uid):
uc = get_usecase(uid)
if not uc:
return jsonify({"ok": False, "error": "not found"}), 404
apply_usecase(uc)
if get_setting("startup_toast", True):
send_toast(f"Switched to '{uc['name']}'")
return jsonify({"ok": True, "active": uid})
@flask_app.route("/api/usecases/<uid>/open_folder", methods=["POST"])
def api_open_usecase_folder(uid):
uc = get_usecase(uid)
if not uc:
return jsonify({"ok": False, "error": "not found"}), 404
d = usecase_dir(uc)
d.mkdir(parents=True, exist_ok=True)
if os.name == "nt":
os.startfile(str(d))
return jsonify({"ok": True, "path": str(d)})
# ── API: State ────────────────────────────────────────────────────────────────
@flask_app.route("/api/state")
def api_state():
with _entries_lock:
entries_data = [{"text": e.text, "weight": e.weight, "trigger_time": e.trigger_time}
for e in _entries]
with _log_lock:
log_data = list(_log)
with _pwa_clients_lock:
pwa_count = len(_pwa_clients)
uc = active_usecase()
return jsonify({
"app_name": APP_NAME, # active usecase name (or settings name if none)
"settings_app_name": get_setting("app_name", "NotifyPulse"), # always the settings @name
"version": VERSION,
"paused": paused,
"next_fire_at": _next_fire_at,
"entries": entries_data,
"log": log_data,
"pwa_clients": pwa_count,
"settings": _settings,
"active_usecase": _active_usecase_id,
"usecase": uc,
})
@flask_app.route("/api/pause", methods=["POST"])
def api_pause():
toggle_pause()
return jsonify({"paused": paused})
@flask_app.route("/api/fire_now", methods=["POST"])
def api_fire_now():
with _entries_lock:
entries = list(_entries)
chosen = pick_weighted(entries)
if chosen:
threading.Thread(target=fire_entry, args=(chosen,), daemon=True).start()
return jsonify({"ok": True, "fired": chosen.text})
return jsonify({"ok": False, "error": "No entries"})
@flask_app.route("/api/settings", methods=["GET", "POST"])
def api_settings():
if request.method == "GET":
return jsonify(_settings)
data = request.get_json(silent=True) or {}
_settings.update({k: v for k, v in data.items() if k in DEFAULT_SETTINGS})
save_settings()
return jsonify({"ok": True})
@flask_app.route("/api/log")
def api_log():
limit = min(int(request.args.get("limit", 100)), 200)
with _log_lock:
return jsonify(list(_log)[:limit])
@flask_app.route("/api/test_notification", methods=["POST"])
def api_test_notif():
data = request.get_json(silent=True) or {}
send_toast(data.get("message", "Test from NotifyPulse V3!"))
return jsonify({"ok": True})
@flask_app.route("/api/test_wallpaper", methods=["POST"])
def api_test_wallpaper():
enqueue_bg(do_wallpaper_change)
return jsonify({"ok": True})
@flask_app.route("/api/test_overlay", methods=["POST"])
def api_test_overlay():
enqueue_bg(do_screen_overlay)
return jsonify({"ok": True})
@flask_app.route("/api/test_mobile_wallpaper", methods=["POST"])
def api_test_mobile_wallpaper():
enqueue_bg(do_mobile_wallpaper)
return jsonify({"ok": True})
# ── API: PWA ──────────────────────────────────────────────────────────────────
@flask_app.route("/api/pwa/ping", methods=["POST"])
def pwa_ping():
data = request.get_json(silent=True) or {}
cid = data.get("client_id", "")
if not cid:
return jsonify({"error": "no client_id"}), 400
ua = request.headers.get("User-Agent", "")
with _pwa_clients_lock:
_pwa_clients[cid] = {"last_seen": time.time(), "user_agent": ua}
cutoff = time.time() - 60
expired = [k for k, v in _pwa_clients.items() if v["last_seen"] < cutoff]
for k in expired:
del _pwa_clients[k]
return jsonify({"ok": True, "clients": len(_pwa_clients)})
@flask_app.route("/api/pwa/wallpaper", methods=["GET"])
def pwa_wallpaper_poll():
cid = request.args.get("client_id", "")
if not cid:
return jsonify({"pending": False})
with _pending_lock:
cmd = _pending_wallpapers.pop(cid, None) or _pending_wallpapers.pop("__broadcast__", None)
if not cmd:
return jsonify({"pending": False})
result = {"pending": True}
for key in ("lockscreen", "background"):
if cmd.get(key):
uri = _img_to_datauri(cmd[key])
if uri:
result[key] = uri
return jsonify(result)
@flask_app.route("/api/pwa/app_name")
def pwa_app_name():
with _usecases_lock:
usecases_for_pwa = [
{"id": u["id"], "name": u["name"], "color": u.get("color","#4f8ef7"),
"blocks": u.get("blocks", []), "min_interval": u.get("min_interval",10),
"max_interval": u.get("max_interval",30)}
for u in _usecases
]
resp = jsonify({
"app_name": APP_NAME,
"settings_app_name": get_setting("app_name", "NotifyPulse"),
"active": _active_usecase_id,
"usecases": usecases_for_pwa,
"version": VERSION,
"blocks": ALL_BLOCKS,
})
resp.headers["Cache-Control"] = "no-store"
return resp
@flask_app.route("/api/pwa/splash_image")
def pwa_splash_image():
# Serve main.png from APPDATA_DIR if it exists
for name in ("main.png", "main.jpg", "main.jpeg", "main.webp"):
p = APPDATA_DIR / name
if p and p.exists():
uri = _img_to_datauri(str(p))
return jsonify({"image": uri or "", "has_custom": True})
return jsonify({"image": "", "has_custom": False})
@flask_app.route("/api/pwa/trigger_wallpaper", methods=["POST"])
def pwa_trigger_wallpaper():
enqueue_bg(do_mobile_wallpaper)
return jsonify({"ok": True})
@flask_app.route("/api/pwa/activate_usecase", methods=["POST"])
def pwa_activate_usecase():
data = request.get_json(silent=True) or {}
uid = data.get("usecase_id", "")
uc = get_usecase(uid)
if not uc:
return jsonify({"ok": False, "error": "not found"}), 404
apply_usecase(uc)
return jsonify({"ok": True, "name": uc["name"]})
# ── Static routes ─────────────────────────────────────────────────────────────
@flask_app.route("/pwa")
@flask_app.route("/pwa/")
def pwa_index():
p = PWA_DIR / "index.html"
if p.exists(): return send_file(p)
return "PWA files not found.", 404
@flask_app.route("/pwa/<path:filename>")
def pwa_static(filename):
p = PWA_DIR / filename
if p.exists() and p.is_file(): return send_file(p)
abort(404)
@flask_app.route("/")
def desktop_index():
p = UI_DIR / "index.html"
if p.exists(): return send_file(p)
return "UI files not found.", 404
def run_flask():
flask_app.run(host="0.0.0.0", port=WEB_PORT, debug=False,
use_reloader=False, threaded=True)
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
global APP_NAME, APP_ID, APPDATA_DIR, USECASES_FILE, SETTINGS_FILE, ICON_FILE
global _default_wallpaper, root_instance, _active_usecase_id
from pathlib import Path
base = Path(os.environ.get("APPDATA", Path.home() / "AppData" / "Roaming"))
APPDATA_DIR = base / "NotifyPulse"
APPDATA_DIR.mkdir(parents=True, exist_ok=True)
(APPDATA_DIR / "usecases").mkdir(exist_ok=True)
USECASES_FILE = APPDATA_DIR / "usecases.json"
SETTINGS_FILE = APPDATA_DIR / "settings.json"
ICON_FILE = APPDATA_DIR / "icon.png"
load_settings()
load_usecases()
_default_wallpaper = get_current_wallpaper()
# Use the settings app_name as the heading until a usecase is chosen
APP_NAME = get_setting("app_name", "NotifyPulse")
APP_ID = re.sub(r"[^A-Za-z0-9]", "", APP_NAME) or "NotifyPulse"
# Do NOT auto-activate any usecase — user must pick one via UI/PWA
# (active_usecase in settings is intentionally not restored on startup)
try:
hotkey = get_setting("hotkey", "F13")
keyboard.add_hotkey(hotkey, toggle_pause, suppress=True)
except Exception as e:
log(f"Hotkey error: {e}", "warn")
root_instance = tk.Tk()
root_instance.withdraw()
threading.Thread(target=notification_loop, daemon=True).start()
threading.Thread(target=schedule_runner, daemon=True).start()
threading.Thread(target=run_flask, daemon=True).start()
log(f"NotifyPulse V{VERSION} started")
if get_setting("startup_toast", True):
send_toast(f"V{VERSION} running! Web UI → localhost:{WEB_PORT}")
if get_setting("auto_open_browser", True):
threading.Timer(1.5, lambda: webbrowser.open(f"http://localhost:{WEB_PORT}")).start()
threading.Thread(target=run_tray, daemon=True).start()
root_instance.mainloop()
if __name__ == "__main__":
main()