1300 lines
45 KiB
Python
1300 lines
45 KiB
Python
# NotifyPulse - Multi-Profile Block Architecture
|
||
# Config: AppData/Roaming/NotifyPulse/
|
||
# Web UI: http://localhost:5000
|
||
# PWA: http://<your-lan-ip>:5000/pwa
|
||
|
||
from __future__ import annotations
|
||
from queue import Empty, Queue
|
||
import sys, os, ctypes, ctypes.wintypes, threading, time, random, re, json
|
||
import webbrowser, shutil, hashlib, base64, mimetypes
|
||
import subprocess
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from collections import deque
|
||
|
||
# ── Dependency check ──────────────────────────────────────────────────────────
|
||
try:
|
||
import keyboard
|
||
from winotify import Notification, audio
|
||
import pystray
|
||
from PIL import Image, ImageDraw, ImageTk
|
||
import schedule
|
||
import tkinter as tk
|
||
from flask import Flask, jsonify, request, send_file, abort, Response
|
||
from flask_cors import CORS
|
||
except ImportError as e:
|
||
import tkinter as tk
|
||
from tkinter import messagebox
|
||
root = tk.Tk(); root.withdraw()
|
||
messagebox.showerror("Missing dependency",
|
||
f"Run: pip install -r requirements.txt\n\nMissing: {e}")
|
||
sys.exit(1)
|
||
|
||
# ── Constants ─────────────────────────────────────────────────────────────────
|
||
HOTKEY = "F13"
|
||
WEB_PORT = 5000
|
||
DEFAULT_APP_NAME = "NotifyPulse"
|
||
VERSION = "3.0.0"
|
||
MOBILE_EXTS = (".jpg", ".jpeg", ".png", ".bmp", ".webp", ".heic", ".heif")
|
||
|
||
SPI_SETDESKWALLPAPER = 0x0014
|
||
SPIF_UPDATEINIFILE = 0x01
|
||
SPIF_SENDCHANGE = 0x02
|
||
|
||
# ── Available Blocks ──────────────────────────────────────────────────────────
|
||
# Each block has an id, display name, description, and optional required dirs
|
||
ALL_BLOCKS = [
|
||
{
|
||
"id": "notifications",
|
||
"name": "Notifications",
|
||
"description": "Random & scheduled toast notifications",
|
||
"icon": "🔔",
|
||
"dirs": [],
|
||
},
|
||
{
|
||
"id": "wallpaper",
|
||
"name": "Wallpaper",
|
||
"description": "Random desktop wallpaper changes",
|
||
"icon": "🖼️",
|
||
"dirs": ["wallpapers"],
|
||
},
|
||
{
|
||
"id": "overlay",
|
||
"name": "Overlay",
|
||
"description": "Fullscreen image overlay on monitor",
|
||
"icon": "✨",
|
||
"dirs": ["overlay"],
|
||
},
|
||
{
|
||
"id": "timer",
|
||
"name": "Timer",
|
||
"description": "Scheduled time-based triggers",
|
||
"icon": "⏱️",
|
||
"dirs": [],
|
||
},
|
||
{
|
||
"id": "mobile_wallpaper",
|
||
"name": "Mobile Wallpaper",
|
||
"description": "Push wallpapers to phone via PWA",
|
||
"icon": "📱",
|
||
"dirs": ["mobile"],
|
||
},
|
||
]
|
||
|
||
# ── Paths ─────────────────────────────────────────────────────────────────────
|
||
APPDATA_DIR: Path | None = None
|
||
USECASES_FILE: Path | None = None # usecases.json
|
||
SETTINGS_FILE: Path | None = None # global settings.json
|
||
ICON_FILE: Path | None = None
|
||
|
||
# ── Global state ──────────────────────────────────────────────────────────────
|
||
paused: bool = False
|
||
state_lock = threading.Lock()
|
||
tray_icon = None
|
||
root_instance = None
|
||
|
||
APP_NAME: str = DEFAULT_APP_NAME
|
||
APP_ID: str = "NotifyPulse"
|
||
|
||
_active_usecase_id: str = "" # which usecase is currently running
|
||
_usecases: list = [] # list of usecase dicts
|
||
_usecases_lock = threading.Lock()
|
||
|
||
_notification_queue: list = []
|
||
_entries: list = []
|
||
_entries_lock = threading.Lock()
|
||
_file_mtime: float = 0.0
|
||
_next_fire_at: float = 0.0
|
||
|
||
_active_overlays: list = []
|
||
_overlays_lock = threading.Lock()
|
||
|
||
_default_wallpaper: str = ""
|
||
_last_wallpaper: str = ""
|
||
|
||
_pwa_clients: dict = {}
|
||
_pwa_clients_lock = threading.Lock()
|
||
|
||
_pending_wallpapers: dict = {}
|
||
_pending_lock = threading.Lock()
|
||
|
||
_log: deque = deque(maxlen=200)
|
||
_log_lock = threading.Lock()
|
||
|
||
_settings: dict = {}
|
||
_notification_wake = threading.Event()
|
||
_reload_flag = threading.Event() # set by apply_usecase to reload without firing
|
||
_config_monitor = None
|
||
|
||
|
||
def _ps_quote_literal(path: Path) -> str:
    """Escape *path* for use inside a single-quoted PowerShell string.

    Every apostrophe in ``str(path)`` is doubled; nothing else is changed.
    The surrounding quotes are not added here.
    """
    text = str(path)
    return "''".join(text.split("'"))
|
||
|
||
|
||
def _ps_read_file_utf8(path: Path) -> str | None:
    """Read *path* as UTF-8 text by shelling out to PowerShell.

    The PowerShell script prints ``1:`` followed by the base64 of the file's
    bytes when the path exists, and ``0:`` otherwise.

    Returns:
        The decoded text (``""`` for an empty file), or ``None`` when not
        running on Windows, when the file does not exist, or when running
        the command or decoding its output fails.
    """
    if os.name != "nt":
        return None
    target = _ps_quote_literal(path)
    script = "".join((
        "$ErrorActionPreference='Stop';",
        f"$p='{target}';",
        "if (Test-Path -LiteralPath $p) {",
        " $b=[IO.File]::ReadAllBytes($p);",
        " [Console]::Out.Write('1:' + [Convert]::ToBase64String($b));",
        "} else {",
        " [Console]::Out.Write('0:');",
        "}",
    ))
    try:
        raw = subprocess.check_output(
            ["powershell", "-NoProfile", "-Command", script],
            encoding="ascii", errors="ignore",
        )
        out = raw.strip()
        if not out.startswith("1:"):
            return None
        payload = out[2:]
        if payload == "":
            return ""
        return base64.b64decode(payload).decode("utf-8")
    except Exception:
        # Best effort: any launch, exit-code or decode failure means "no content".
        return None
|
||
|
||
|
||
def _ps_write_file_utf8(path: Path, content: str) -> bool:
    """Write *content* to *path* as UTF-8 by shelling out to PowerShell.

    The text is base64-encoded and embedded directly in the PowerShell
    command, which creates the parent directory when there is one and then
    writes the decoded bytes.

    Returns:
        ``True`` when the PowerShell process exits successfully; ``False``
        when not running on Windows or when the command fails.
    """
    if os.name != "nt":
        return False
    target = _ps_quote_literal(path)
    encoded = base64.b64encode(content.encode("utf-8")).decode("ascii")
    script = "".join((
        "$ErrorActionPreference='Stop';",
        f"$p='{target}';",
        "$d=[IO.Path]::GetDirectoryName($p);",
        "if ($d) { [IO.Directory]::CreateDirectory($d) | Out-Null };",
        f"[IO.File]::WriteAllBytes($p,[Convert]::FromBase64String('{encoded}'));",
    ))
    try:
        subprocess.check_call(
            ["powershell", "-NoProfile", "-Command", script],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
        )
    except Exception:
        return False
    return True
|
||
|
||
|
||
# ── Logging ───────────────────────────────────────────────────────────────────
|
||
def log(msg: str, level: str = "info"):
    """Record *msg* at the front of the in-memory log.

    Each record is a dict with the keys ``time`` (local ``HH:MM:SS``),
    ``msg`` and ``level``. Newest records come first in ``_log``.
    """
    record = {
        "time": datetime.now().strftime("%H:%M:%S"),
        "msg": msg,
        "level": level,
    }
    with _log_lock:
        _log.appendleft(record)
|
||
|
||
|
||
# ── Settings ──────────────────────────────────────────────────────────────────
|
||
DEFAULT_SETTINGS = {
|
||
"overlay_stretch": False,
|
||
"overlay_opacity": 0.4,
|
||
"overlay_duration": 6,
|
||
"overlay_monitor": 0, # 0 = primary, -1 = all monitors
|
||
"startup_toast": True,
|
||
"hotkey": "F13",
|
||
"web_port": 5000,
|
||
"theme": "dark",
|
||
"notify_sound": True,
|
||
"auto_open_browser": True,
|
||
"active_usecase": "", # empty = no usecase selected yet
|
||
"app_name": "NotifyPulse", # shown before any usecase is selected
|
||
"minimize_to_tray": True, # hide window to tray instead of closing
|
||
"log_max_entries": 100, # how many log entries to keep in memory
|
||
"notification_duration": 5, # toast display duration in seconds
|
||
"wallpaper_fit": "fill", # fill | fit | stretch | center | tile
|
||
"pwa_port": 5000, # port exposed to the LAN for the PWA
|
||
"run_on_startup": False, # register in Windows startup
|
||
"confirm_delete": True, # show confirmation before deleting usecases
|
||
"entry_display_mode": "percent", # percent | chance — default entry list display
|
||
"pwa_bg_blur": 18, # backdrop blur (px) on selector/app screens
|
||
"pwa_bg_opacity": 0.72, # dark overlay opacity on selector screen (0-1)
|
||
}
|
||
|
||
|
||
def load_settings() -> dict:
    """Load settings from ``SETTINGS_FILE`` into the module-level ``_settings``.

    Stored values are layered over ``DEFAULT_SETTINGS``. When the file is
    not configured, does not exist, or cannot be read/merged, a plain copy
    of the defaults is used instead.

    Returns:
        The dict now bound to ``_settings``.
    """
    global _settings
    merged = None
    if SETTINGS_FILE and SETTINGS_FILE.exists():
        try:
            stored = json.loads(SETTINGS_FILE.read_text("utf-8"))
            merged = {**DEFAULT_SETTINGS, **stored}
        except Exception:
            # Unreadable or malformed file: fall back to the defaults below.
            merged = None
    _settings = dict(DEFAULT_SETTINGS) if merged is None else merged
    return _settings
|
||
|
||
|
||
def save_settings():
    """Write ``_settings`` to ``SETTINGS_FILE`` as indented JSON.

    Does nothing when no settings file is configured. Write failures are
    logged at ``error`` level and not raised.
    """
    if not SETTINGS_FILE:
        return
    try:
        serialized = json.dumps(_settings, indent=2)
        SETTINGS_FILE.write_text(serialized, "utf-8")
    except Exception as e:
        log(f"Settings save error: {e}", "error")
|
||
|
||
|
||
def get_setting(key: str, default=None):
    """Return the setting *key*.

    Lookup order: the loaded ``_settings``, then ``DEFAULT_SETTINGS``, then
    the caller's *default*.
    """
    if key in _settings:
        return _settings[key]
    return DEFAULT_SETTINGS.get(key, default)
|
||
|
||
|
||
# ── Usecase management ────────────────────────────────────────────────────────
|
||
DEFAULT_USECASE = {
|
||
"id": "",
|
||
"name": "Default",
|
||
"color": "#4f8ef7",
|
||
"blocks": ["notifications", "wallpaper", "overlay", "timer", "mobile_wallpaper"],
|
||
"min_interval": 10,
|
||
"max_interval": 30,
|
||
"notifications": [],
|
||
}
|
||
|
||
|
||
def usecase_dir(uc: dict) -> Path:
    """Return the folder for this usecase inside ``APPDATA_DIR``.

    The folder name is the usecase id with every character other than
    ASCII letters, digits, ``_`` and ``-`` replaced by ``_``.
    """
    folder_name = re.sub(r"[^A-Za-z0-9_\-]", "_", uc["id"])
    usecases_root = APPDATA_DIR / "usecases"
    return usecases_root / folder_name
|
||
|
||
|
||
def ensure_usecase_dirs(uc: dict):
    """Create the usecase folder and the sub-folders its enabled blocks need.

    The sub-folder names come from the ``dirs`` list of each ``ALL_BLOCKS``
    entry whose id appears in ``uc["blocks"]``.
    """
    root = usecase_dir(uc)
    root.mkdir(parents=True, exist_ok=True)
    wanted = set(uc.get("blocks", []))
    subdirs = [
        name
        for spec in ALL_BLOCKS
        if spec["id"] in wanted
        for name in spec["dirs"]
    ]
    for name in subdirs:
        (root / name).mkdir(exist_ok=True)
|
||
|
||
|
||
def load_usecases() -> list:
    """Load the usecase list from ``USECASES_FILE`` into ``_usecases``.

    A JSON list is used as-is; a JSON object is unwrapped to its values;
    any other JSON value yields an empty list.

    When the file is missing (first run) a sample usecase is created and
    saved. When the file exists but cannot be read or parsed, the error is
    logged and the file is moved aside to ``<name>.corrupt`` before the
    sample is written, so the user's data is not silently overwritten.

    Returns:
        The list now bound to ``_usecases``.
    """
    global _usecases
    if USECASES_FILE and USECASES_FILE.exists():
        try:
            raw = json.loads(USECASES_FILE.read_text("utf-8"))
        except Exception as exc:
            log(f"Usecases load error: {exc}", "error")
            # Keep the unreadable file for manual recovery instead of letting
            # the sample below replace it.
            backup = USECASES_FILE.with_name(USECASES_FILE.name + ".corrupt")
            try:
                USECASES_FILE.replace(backup)
                log(f"Unreadable usecases file kept as {backup.name}", "warn")
            except OSError as move_err:
                log(f"Could not back up usecases file: {move_err}", "error")
        else:
            if isinstance(raw, list):
                _usecases = raw
            elif isinstance(raw, dict):
                # Wrapped format — shouldn't happen but handle it
                _usecases = list(raw.values())
            else:
                _usecases = []
            return _usecases

    # First run (or unreadable file) — create a sample usecase
    sample = {
        "id": "wellness",
        "name": "Wellness",
        "color": "#22c55e",
        "blocks": ["notifications", "wallpaper", "overlay"],
        "min_interval": 10,
        "max_interval": 30,
        "notifications": [
            "Take a short break and stretch! | 35%",
            "Drink some water — stay hydrated! | 30%",
            "Check your posture! | 20%",
            "Deep breath — you got this! | 15%",
            "change.wallpaper | 20%",
            "show.overlay | 15%",
            "Morning standup time! | 09:00",
            "End of day — wrap up! | 17:30",
        ],
    }
    _usecases = [sample]
    save_usecases()
    return _usecases
|
||
|
||
|
||
def save_usecases():
    """Write ``_usecases`` to ``USECASES_FILE`` as indented JSON.

    Does nothing when no usecases file is configured. Serialization and
    write errors propagate to the caller.
    """
    if not USECASES_FILE:
        return
    serialized = json.dumps(_usecases, indent=2)
    USECASES_FILE.write_text(serialized, "utf-8")
|
||
|
||
|
||
def get_usecase(uid: str) -> dict | None:
    """Return the usecase dict whose ``id`` equals *uid*, or ``None``.

    An empty or non-string *uid* returns ``None`` without searching.
    Non-dict items in ``_usecases`` are ignored. The returned dict is the
    stored object itself, not a copy.
    """
    if not uid or not isinstance(uid, str):
        return None
    with _usecases_lock:
        matches = (
            candidate for candidate in _usecases
            if isinstance(candidate, dict) and candidate.get("id") == uid
        )
        return next(matches, None)
|
||
|
||
|
||
def active_usecase() -> dict | None:
    """Return the usecase selected by ``_active_usecase_id``, or ``None``."""
    current_id = _active_usecase_id
    return get_usecase(current_id)
|
||
|
||
|
||
# ── Config parsing ─────────────────────────────────────────────────────────────
|
||
class Entry:
    """One parsed notification line.

    ``weight`` is a float for randomly picked entries and ``None`` for
    timed ones; ``trigger_time`` is an ``"HH:MM"`` string for timed entries
    and ``None`` otherwise.
    """

    __slots__ = ("text", "weight", "trigger_time")

    def __init__(self, text, weight, trigger_time):
        self.text = text
        self.weight = weight
        self.trigger_time = trigger_time

    def __repr__(self):
        return (f"Entry(text={self.text!r}, weight={self.weight!r}, "
                f"trigger_time={self.trigger_time!r})")


# Tag that marks a timed entry, e.g. "9:05" or "17:30".
_TIME_TAG = re.compile(r"^\d{1,2}:\d{2}$")


def _parse_weight(raw: str) -> float:
    """Convert the number in front of ``%`` to a usable weight.

    Anything that is not a finite, non-negative number falls back to 1.0,
    the same default used for entries with no tag.
    """
    try:
        value = float(raw)
    except ValueError:
        return 1.0
    # The chained comparison is False for NaN, negatives and +inf alike.
    return value if 0 <= value < float("inf") else 1.0


def parse_entries(lines: list[str]) -> list[Entry]:
    """Parse notification lines into ``Entry`` objects.

    Line format is ``text`` or ``text | tag``:

    * blank lines and lines starting with ``#`` or ``@`` are skipped;
    * a ``H:MM``/``HH:MM`` tag with a valid clock time gives a timed entry
      (``weight=None``, ``trigger_time`` zero-padded);
    * a ``N%`` tag gives a weighted entry with weight ``N``;
    * everything else, including out-of-range times and unusable weights,
      gives a weighted entry with weight ``1.0``.

    Items that are not strings are skipped, and ``None`` is treated as an
    empty list, so malformed stored data cannot crash the caller.
    """
    entries = []
    for raw in lines or []:
        if not isinstance(raw, str):
            continue
        line = raw.strip()
        if not line or line.startswith(("#", "@")):
            continue

        text, sep, tag = line.partition("|")
        text = text.strip()
        tag = tag.strip() if sep else None

        weight, trigger_time = 1.0, None
        if tag and _TIME_TAG.match(tag):
            hour, minute = (int(part) for part in tag.split(":"))
            if 0 <= hour <= 23 and 0 <= minute <= 59:
                weight, trigger_time = None, f"{hour:02d}:{minute:02d}"
            # invalid time → stays a weighted entry with weight 1.0
        elif tag and tag.endswith("%"):
            weight = _parse_weight(tag[:-1])
        entries.append(Entry(text, weight, trigger_time))
    return entries
|
||
|
||
|
||
def apply_usecase(uc: dict, silent=False):
    """Make *uc* the active usecase and restart its schedule.

    Updates the active id, ``APP_NAME`` and ``APP_ID``, persists the choice
    in settings, re-parses the notification lines, re-registers the daily
    timed jobs, computes the next random fire time and wakes the
    notification loop with the reload flag set so it does not fire at once.

    Args:
        uc: Usecase dict; must contain ``id``.
        silent: When true, the "Switched to usecase" log line is skipped.
    """
    global _active_usecase_id, APP_NAME, APP_ID
    global _entries, _next_fire_at

    def _seconds(key, default_minutes):
        """Read a minutes value from *uc* as whole seconds, never below 1.

        Stored values may have been edited through the API and can be
        strings, floats or null; unusable values fall back to the default.
        """
        try:
            secs = int(float(uc.get(key, default_minutes)) * 60)
        except (TypeError, ValueError, OverflowError):
            secs = default_minutes * 60
        return max(1, secs)

    _active_usecase_id = uc["id"]
    APP_NAME = uc.get("name", DEFAULT_APP_NAME)
    APP_ID = re.sub(r"[^A-Za-z0-9]", "", APP_NAME) or "NotifyPulse"

    _settings["active_usecase"] = uc["id"]
    save_settings()

    entries = parse_entries(uc.get("notifications", []))

    mn = _seconds("min_interval", 10)
    mx = max(mn, _seconds("max_interval", 30))  # guard: mx must be >= mn

    schedule.clear()
    for entry in entries:
        if entry.trigger_time:
            # Extra arguments to do() are passed to the job function, which
            # binds this particular entry to the job.
            schedule.every().day.at(entry.trigger_time).do(fire_entry, entry)

    with _entries_lock:
        _entries = entries

    interval = random.randint(mn, mx)
    _next_fire_at = time.time() + interval
    _reload_flag.set()
    _notification_wake.set()

    ensure_usecase_dirs(uc)

    update_tray_title()
    if not silent:
        log(f"Switched to usecase '{APP_NAME}' — {mn//60}–{mx//60}min, {len(entries)} entries")
|
||
|
||
|
||
# ── Background worker ─────────────────────────────────────────────────────────
|
||
class BackgroundWorker:
    """Run queued callables one at a time on a lazily started daemon thread.

    The thread is created on demand by ``enqueue`` and exits once a 5 second
    wait on the queue times out with nothing left to do.
    """

    def __init__(self):
        self.queue = Queue()
        self.thread: threading.Thread | None = None
        self.lock = threading.Lock()  # guards self.thread start/stop decisions

    def enqueue(self, fn, *args, **kwargs):
        """Queue ``fn(*args, **kwargs)`` and make sure a worker is running."""
        task = (fn, args, kwargs)
        self.queue.put(task)
        self._ensure_thread()

    def _ensure_thread(self):
        """Start a worker thread unless one is already alive."""
        with self.lock:
            running = self.thread is not None and self.thread.is_alive()
            if not running:
                worker = threading.Thread(target=self._run, daemon=True)
                self.thread = worker
                worker.start()

    def _run(self):
        """Worker loop: execute tasks until the queue stays empty."""
        while True:
            try:
                fn, args, kwargs = self.queue.get(timeout=5)
            except Empty:
                with self.lock:
                    # Re-check under the lock: a task may have been queued
                    # between the timeout and acquiring the lock.
                    if not self.queue.empty():
                        continue
                    self.thread = None
                    return
            try:
                fn(*args, **kwargs)
            except Exception as exc:
                log(f"Background task error: {exc}", "error")
            finally:
                self.queue.task_done()
|
||
|
||
|
||
_bg = BackgroundWorker()
|
||
|
||
|
||
def enqueue_bg(fn, *args, **kwargs):
    """Queue ``fn(*args, **kwargs)`` on the shared background worker ``_bg``."""
    worker = _bg
    worker.enqueue(fn, *args, **kwargs)
|
||
|
||
|
||
# ── Block helpers ─────────────────────────────────────────────────────────────
|
||
def block_enabled(block_id: str) -> bool:
    """Return whether *block_id* is listed in the active usecase's blocks.

    ``False`` when no usecase is active.
    """
    current = active_usecase()
    return bool(current) and block_id in current.get("blocks", [])
|
||
|
||
|
||
def uc_dir(sub: str = "") -> Path | None:
    """Return the active usecase's folder, or its *sub* folder when given.

    ``None`` when no usecase is active.
    """
    current = active_usecase()
    if not current:
        return None
    root = usecase_dir(current)
    if sub:
        return root / sub
    return root
|
||
|
||
|
||
# ── Wallpaper block ───────────────────────────────────────────────────────────
|
||
def get_current_wallpaper() -> str:
    """Return the current desktop wallpaper path from the Windows registry.

    Reads the ``WallPaper`` value under
    ``HKEY_CURRENT_USER\\Control Panel\\Desktop``.

    Returns:
        The stored value, or ``""`` when ``winreg`` is unavailable (not
        Windows) or the key/value cannot be read.
    """
    try:
        import winreg
        # The context manager closes the key even if QueryValueEx raises.
        with winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Control Panel\Desktop") as key:
            value, _ = winreg.QueryValueEx(key, "WallPaper")
        return value
    except (ImportError, OSError):
        return ""
|
||
|
||
|
||
def set_wallpaper(path: str):
    """Set the desktop wallpaper to *path* via ``SystemParametersInfoW``.

    Called with ``SPIF_UPDATEINIFILE | SPIF_SENDCHANGE``.
    """
    flags = SPIF_UPDATEINIFILE | SPIF_SENDCHANGE
    user32 = ctypes.windll.user32
    user32.SystemParametersInfoW(SPI_SETDESKWALLPAPER, 0, path, flags)
|
||
|
||
|
||
def do_wallpaper_change():
    """Apply a random image from the active usecase's ``wallpapers`` folder.

    The chosen path is stored in ``_last_wallpaper``. Logs a warning and
    does nothing when the folder is missing or holds no jpg/jpeg/png/bmp/
    webp files.
    """
    global _last_wallpaper
    folder = uc_dir("wallpapers")
    if not folder or not folder.exists():
        log("Wallpaper folder missing", "warn")
        return

    candidates = []
    for pattern in ("*.jpg", "*.jpeg", "*.png", "*.bmp", "*.webp"):
        candidates.extend(folder.glob(pattern))
    if not candidates:
        log("No wallpaper images found", "warn")
        return

    picked = str(random.choice(candidates))
    _last_wallpaper = picked
    set_wallpaper(picked)
    log(f"Wallpaper → {Path(picked).name}")
|
||
|
||
|
||
# ── Mobile wallpaper block ────────────────────────────────────────────────────
|
||
def _list_mobile_images(folder: Path) -> list[Path]:
    """Return the regular files directly in *folder* matching ``MOBILE_EXTS``.

    Results are grouped in ``MOBILE_EXTS`` order.
    """
    return [
        candidate
        for ext in MOBILE_EXTS
        for candidate in folder.glob(f"*{ext}")
        if candidate.is_file()
    ]
|
||
|
||
|
||
def _copy_for_send(src: Path, prefix: str, cache_dir: Path) -> Path:
    """Copy *src* into *cache_dir* under a fresh ``<prefix>_<4 digits>`` name.

    Earlier ``<prefix>_*`` files in the cache are removed first (failures to
    delete are ignored). The copy keeps the source extension in lower case,
    or ``.jpg`` when the source has none.

    Returns:
        Path of the new copy.
    """
    cache_dir.mkdir(exist_ok=True)
    for stale in cache_dir.glob(f"{prefix}_*"):
        try:
            stale.unlink()
        except Exception:
            pass
    suffix = (src.suffix or ".jpg").lower()
    token = random.randint(1000, 9999)
    target = cache_dir / f"{prefix}_{token}{suffix}"
    shutil.copyfile(src, target)
    return target
|
||
|
||
|
||
def do_mobile_wallpaper():
    """Queue a lockscreen/background image pair for the PWA clients.

    Two distinct images are picked from the active usecase's ``mobile``
    folder (the same image twice when only one exists), copied into its
    ``_send_cache`` sub-folder, and recorded in ``_pending_wallpapers`` for
    every known client id, or under ``"__broadcast__"`` when none is known.
    """
    folder = uc_dir("mobile")
    if not folder or not folder.exists():
        log("Mobile wallpaper dir missing", "warn")
        return
    images = _list_mobile_images(folder)
    if not images:
        log("No images in Mobile folder", "warn")
        return

    if len(images) >= 2:
        lock_src, bg_src = random.sample(images, 2)
    else:
        lock_src = bg_src = images[0]

    cache_dir = folder / "_send_cache"
    ls_path = _copy_for_send(lock_src, "Lockscreen", cache_dir)
    bg_path = _copy_for_send(bg_src, "Background", cache_dir)

    with _pwa_clients_lock:
        client_ids = list(_pwa_clients.keys())

    targets = client_ids or ["__broadcast__"]
    with _pending_lock:
        for cid in targets:
            _pending_wallpapers[cid] = {
                "lockscreen": str(ls_path),
                "background": str(bg_path),
            }

    log(f"Mobile wallpaper queued → {len(client_ids)} client(s)")
|
||
|
||
|
||
# ── Overlay block ─────────────────────────────────────────────────────────────
|
||
def _get_monitors():
    """Enumerate the display monitors through ``EnumDisplayMonitors``.

    Returns:
        A list of ``(left, top, width, height)`` tuples, one per monitor.
        When enumeration fails or is unavailable the list is ``[None]``,
        which callers treat as "one screen, size unknown".
    """
    monitors = []
    try:
        wt = ctypes.wintypes
        # Prototype per the Win32 MONITORENUMPROC signature:
        # BOOL cb(HMONITOR, HDC, LPRECT, LPARAM). Handles and LPARAM are
        # pointer-sized, so the 32-bit c_ulong / c_double must not be used.
        MonitorEnumProc = ctypes.WINFUNCTYPE(
            wt.BOOL, wt.HMONITOR, wt.HDC,
            ctypes.POINTER(wt.RECT), wt.LPARAM)

        def _cb(hMon, hdcMon, lprcMon, dwData):
            r = lprcMon.contents
            monitors.append((r.left, r.top, r.right - r.left, r.bottom - r.top))
            return True  # keep enumerating

        # Hold a reference so the callback outlives the native call.
        callback = MonitorEnumProc(_cb)
        ctypes.windll.user32.EnumDisplayMonitors(None, None, callback, 0)
    except Exception:
        # Best effort: fall back to the single-unknown-screen marker below.
        pass
    return monitors or [None]
|
||
|
||
|
||
def do_screen_overlay(duration_ms: int = 6000):
    """Show a random image from the ``overlay`` folder on every monitor.

    Each overlay is a borderless, topmost, semi-transparent Tk window that
    is made click-through and dismissed after *duration_ms*. Opacity and
    stretch mode come from settings. If the app is paused when the window
    is built, it is created hidden and its timer is not started. The
    windows are created on the Tk thread via ``root_instance.after``;
    nothing is shown when ``root_instance`` is not set.
    """
    folder = uc_dir("overlay")
    if not folder or not folder.exists():
        log("Overlay folder missing", "warn")
        return
    candidates = []
    for pattern in ("*.jpg", "*.jpeg", "*.png", "*.bmp", "*.webp"):
        candidates.extend(folder.glob(pattern))
    if not candidates:
        log("No overlay images found", "warn")
        return

    img_path = str(random.choice(candidates))
    monitors = _get_monitors()
    opacity = float(get_setting("overlay_opacity", 0.4))
    stretch = bool(get_setting("overlay_stretch", False))
    log(f"Overlay: {Path(img_path).name} — {duration_ms//1000}s on {len(monitors)} screen(s)")

    # Win32 values used to make the window ignore mouse input.
    GA_ROOT = 2
    GWL_EXSTYLE = -20
    WS_EX_LAYERED = 0x80000
    WS_EX_TRANSPARENT = 0x20

    def _make_tk_image(pil_img, sw, sh):
        """Return a PhotoImage of *pil_img* sized for an sw x sh screen."""
        resample = Image.Resampling.LANCZOS
        if stretch:
            return ImageTk.PhotoImage(pil_img.resize((sw, sh), resample))
        # Letterbox: scale to fit and centre on a black canvas.
        src_w, src_h = pil_img.size
        factor = min(sw / src_w, sh / src_h)
        fit_w, fit_h = int(src_w * factor), int(src_h * factor)
        fitted = pil_img.resize((fit_w, fit_h), resample)
        backdrop = Image.new("RGB", (sw, sh), "black")
        backdrop.paste(fitted, ((sw - fit_w) // 2, (sh - fit_h) // 2))
        return ImageTk.PhotoImage(backdrop)

    def _screen_rect(mon):
        """Return (x, y, width, height); ask Tk when *mon* is None."""
        if mon is not None:
            return mon
        probe = tk.Toplevel()
        probe.withdraw()
        probe.update_idletasks()
        width, height = probe.winfo_screenwidth(), probe.winfo_screenheight()
        probe.destroy()
        return 0, 0, width, height

    def create_overlay():
        try:
            pil_img = Image.open(img_path)
        except Exception as e:
            log(f"Overlay open error: {e}", "error")
            return

        with state_lock:
            is_paused = paused

        for mon in monitors:
            mx, my, sw, sh = _screen_rect(mon)

            ov = tk.Toplevel()
            ov.attributes("-topmost", True)
            ov.attributes("-alpha", opacity)
            ov.overrideredirect(True)
            ov.update_idletasks()

            user32 = ctypes.windll.user32
            hwnd = user32.GetAncestor(ov.winfo_id(), GA_ROOT) or ov.winfo_id()
            style = user32.GetWindowLongW(hwnd, GWL_EXSTYLE)
            user32.SetWindowLongW(
                hwnd, GWL_EXSTYLE, style | WS_EX_LAYERED | WS_EX_TRANSPARENT)

            ov.geometry(f"{sw}x{sh}+{mx}+{my}")

            try:
                tk_img = _make_tk_image(pil_img, sw, sh)
                lbl = tk.Label(ov, image=tk_img, bg="black")
                lbl.image = tk_img  # keep a reference so Tk does not drop the image
                lbl.pack(fill="both", expand=True)
            except Exception as e:
                log(f"Overlay render error: {e}", "error")
                ov.destroy()
                continue

            entry = {
                "window": ov,
                "remaining_ms": duration_ms,
                "after_id": None,
                "started_at": time.monotonic(),
                "img_path": img_path,
            }

            # Defaults bind this iteration's entry/window to each closure.
            def on_dismiss(e=entry, w=ov):
                with _overlays_lock:
                    if e in _active_overlays:
                        _active_overlays.remove(e)
                if w.winfo_exists():
                    w.destroy()

            def schedule_dismiss(e=entry, w=ov):
                e["started_at"] = time.monotonic()
                e["after_id"] = w.after(e["remaining_ms"], lambda: on_dismiss(e, w))

            entry["schedule_dismiss"] = schedule_dismiss
            with _overlays_lock:
                _active_overlays.append(entry)

            if is_paused:
                ov.withdraw()
            else:
                schedule_dismiss()

    if root_instance:
        root_instance.after(0, create_overlay)
|
||
|
||
|
||
# ── Icon ──────────────────────────────────────────────────────────────────────
|
||
def make_default_icon(size=256):
    """Draw the built-in icon on a transparent ``size`` x ``size`` RGBA image.

    The shapes (ellipse, triangle, small rectangle) are laid out on a
    64-unit grid scaled to *size* and filled with ``#4f8ef7``.
    """
    canvas = Image.new("RGBA", (size, size), (0, 0, 0, 0))
    pen = ImageDraw.Draw(canvas)
    unit = size / 64
    blue = "#4f8ef7"
    pen.ellipse([8 * unit, 20 * unit, 56 * unit, 56 * unit], fill=blue)
    pen.polygon([(32 * unit, 4 * unit), (16 * unit, 24 * unit), (48 * unit, 24 * unit)], fill=blue)
    pen.rectangle([24 * unit, 56 * unit, 40 * unit, 62 * unit], fill=blue)
    return canvas
|
||
|
||
|
||
def load_tray_icon():
    """Return the tray icon image.

    Uses ``ICON_FILE`` converted to RGBA when it exists and can be opened;
    otherwise falls back to ``make_default_icon()``.
    """
    icon = None
    if ICON_FILE and ICON_FILE.exists():
        try:
            icon = Image.open(ICON_FILE).convert("RGBA")
        except Exception:
            icon = None
    if icon is not None:
        return icon
    return make_default_icon()
|
||
|
||
|
||
# ── Toast ─────────────────────────────────────────────────────────────────────
|
||
def send_toast(message: str, title: str | None = None):
    """Show a toast notification and log it.

    Args:
        message: Body text of the toast.
        title: Toast title; ``APP_NAME`` is used when empty or ``None``.

    The icon is ``ICON_FILE`` when that file exists, and the default sound
    is attached when the ``notify_sound`` setting is truthy.
    """
    has_icon = bool(ICON_FILE) and ICON_FILE.exists()
    icon_path = str(ICON_FILE) if has_icon else ""
    toast = Notification(
        app_id=APP_ID,
        title=title or APP_NAME,
        msg=message,
        duration="short",
        icon=icon_path,
    )
    if get_setting("notify_sound", True):
        toast.set_audio(audio.Default, loop=False)
    toast.show()
    log(f"Toast: {message}")
|
||
|
||
|
||
# ── Notification logic ────────────────────────────────────────────────────────
|
||
OVERLAY_MAGIC = "show.overlay"
|
||
WALLPAPER_MAGIC = "change.wallpaper"
|
||
|
||
def parse_overlay_duration(text: str) -> int:
    """Return the overlay duration in milliseconds for a command string.

    The third dot-separated field (``show.overlay.<seconds>``) is used when
    it is an integer, with a minimum of one second. Otherwise the
    ``overlay_duration`` setting is used.
    """
    pieces = text.strip().lower().split(".")
    seconds = None
    if len(pieces) >= 3:
        try:
            seconds = int(pieces[2])
        except ValueError:
            seconds = None
    if seconds is not None:
        return max(1, seconds) * 1000
    return int(get_setting("overlay_duration", 6)) * 1000
|
||
|
||
|
||
def pick_weighted(entries):
    """Pick one entry at random, proportionally to its ``weight``.

    Only entries with a finite weight greater than zero take part. Timed
    entries (``weight is None``) and entries with zero, negative or
    non-finite weights are left out, so ``random.choices`` is never handed
    a weight list it would reject.

    Returns:
        The chosen entry, or ``None`` when no entry is eligible.
    """
    pool = [
        e for e in entries
        # The chained comparison is False for NaN and excludes +inf.
        if e.weight is not None and 0 < e.weight < float("inf")
    ]
    if not pool:
        return None
    return random.choices(pool, weights=[e.weight for e in pool], k=1)[0]
|
||
|
||
|
||
def _dispatch(text: str):
    """Carry out one entry: run a magic command or show *text* as a toast.

    Recognised commands (case-insensitive): ``change.wallpaper``,
    ``change.wallpaper.mobile``, and ``show.overlay`` with an optional
    ``.<seconds>`` suffix. Each action runs only when its block is enabled
    on the active usecase; with no active usecase nothing happens.
    """
    command = text.lower().strip()
    uc = active_usecase()
    enabled = set(uc.get("blocks", [])) if uc else set()

    if command == WALLPAPER_MAGIC:
        if "wallpaper" in enabled:
            enqueue_bg(do_wallpaper_change)
        return

    if command == "change.wallpaper.mobile":
        if "mobile_wallpaper" in enabled:
            enqueue_bg(do_mobile_wallpaper)
        return

    if command == OVERLAY_MAGIC or command.startswith(OVERLAY_MAGIC + "."):
        if "overlay" in enabled:
            enqueue_bg(do_screen_overlay, parse_overlay_duration(command))
        return

    if "notifications" in enabled:
        send_toast(text)
|
||
|
||
|
||
def fire_entry(entry: Entry):
    """Dispatch *entry*, or queue its text while the app is paused.

    Queued texts go to ``_notification_queue`` and are logged.
    """
    message = entry.text.strip()
    with state_lock:
        held = paused
        if held:
            _notification_queue.append(message)
            log(f"Queued (paused): {message}")
    if not held:
        _dispatch(message)
|
||
|
||
|
||
def flush_queued(queued: list):
    """Dispatch each queued text in order, sleeping 1.2 s after each one."""
    gap_seconds = 1.2
    for text in queued:
        _dispatch(text)
        time.sleep(gap_seconds)
|
||
|
||
|
||
def notification_loop():
    """Fire a random weighted entry at random intervals, forever.

    Each pass fires one entry (unless the pass was triggered by a reload),
    picks the next interval between the usecase's min and max, then sleeps
    until that time or until ``_notification_wake`` is set. With no active
    usecase it re-checks every 60 seconds.

    A failure while firing is logged rather than allowed to end the
    thread, and interval values are coerced to at least one second so that
    malformed or zero settings cannot crash the loop or make it spin.
    """
    global _next_fire_at

    def _seconds(uc, key, default_minutes):
        """Read a minutes value from *uc* as whole seconds, never below 1."""
        try:
            secs = int(float(uc.get(key, default_minutes)) * 60)
        except (TypeError, ValueError, OverflowError):
            secs = default_minutes * 60
        return max(1, secs)

    while True:
        uc = active_usecase()
        if uc and _active_usecase_id:
            mn = _seconds(uc, "min_interval", 10)
            mx = max(mn, _seconds(uc, "max_interval", 30))  # guard: mx must be >= mn
            with _entries_lock:
                entries = list(_entries)
            # Only fire if this iteration was triggered by the timer expiring,
            # not by a reload signal from apply_usecase / PWA activation.
            if not _reload_flag.is_set():
                try:
                    chosen = pick_weighted(entries)
                    if chosen:
                        fire_entry(chosen)
                except Exception as exc:
                    log(f"Notification fire error: {exc}", "error")
            _reload_flag.clear()
            interval = random.randint(mn, mx)
        else:
            interval = 60
        _next_fire_at = time.time() + interval
        log(f"Next in {interval//60}m {interval%60}s")

        # Sleep in slices of at most 5 s so a wake-up is noticed promptly.
        while True:
            remaining = _next_fire_at - time.time()
            if remaining <= 0:
                break
            triggered = _notification_wake.wait(min(remaining, 5))
            if triggered:
                _notification_wake.clear()
                break
|
||
|
||
|
||
def schedule_runner():
    """Run due ``schedule`` jobs once per second, forever.

    An exception raised by a job is logged and the loop continues, so one
    failing timed entry cannot stop all later ones.
    """
    while True:
        try:
            schedule.run_pending()
        except Exception as exc:
            log(f"Scheduled job error: {exc}", "error")
        time.sleep(1)
|
||
|
||
|
||
# ── Pause / Resume ────────────────────────────────────────────────────────────
|
||
def pause_overlays():
    """Hide every active overlay and freeze its dismiss timer.

    The work is scheduled on the Tk thread through ``root_instance.after``.
    For an overlay with a pending timer, the timer is cancelled and the
    time already shown is subtracted from ``remaining_ms``.
    """
    def _do():
        with _overlays_lock:
            snapshot = list(_active_overlays)
        for item in snapshot:
            window = item["window"]
            if not window.winfo_exists():
                continue
            pending = item["after_id"]
            if pending is not None:
                try:
                    window.after_cancel(pending)
                except Exception:
                    pass
                shown_ms = int((time.monotonic() - item["started_at"]) * 1000)
                item["remaining_ms"] = max(0, item["remaining_ms"] - shown_ms)
                item["after_id"] = None
            window.withdraw()

    if root_instance:
        root_instance.after(0, _do)
|
||
|
||
|
||
def resume_overlays():
    """Show paused overlays again and restart their dismiss timers.

    The work is scheduled on the Tk thread through ``root_instance.after``.
    Overlays whose window is gone, or that have no time left, are dropped
    from ``_active_overlays`` (the latter are destroyed first).
    """
    def _forget(item):
        with _overlays_lock:
            if item in _active_overlays:
                _active_overlays.remove(item)

    def _do():
        with _overlays_lock:
            snapshot = list(_active_overlays)
        for item in snapshot:
            window = item["window"]
            if not window.winfo_exists():
                _forget(item)
                continue
            if item["remaining_ms"] <= 0:
                window.destroy()
                _forget(item)
                continue
            window.deiconify()
            item["schedule_dismiss"]()

    if root_instance:
        root_instance.after(0, _do)
|
||
|
||
|
||
def toggle_pause():
    """Flip the paused state and apply its side effects.

    Pausing hides overlays and restores ``_default_wallpaper`` when one is
    recorded. Resuming shows overlays again, re-applies ``_last_wallpaper``
    when one is recorded, and dispatches the notifications queued while
    paused on a background thread. The tray title is refreshed either way.
    """
    global paused

    def _spawn(target, *args):
        threading.Thread(target=target, args=args, daemon=True).start()

    queued = []
    with state_lock:
        paused = not paused
        if not paused:
            queued = list(_notification_queue)
            _notification_queue.clear()

    if paused:
        log("Paused")
        pause_overlays()
        if _default_wallpaper:
            _spawn(set_wallpaper, _default_wallpaper)
    else:
        log("Resumed")
        resume_overlays()
        if _last_wallpaper:
            _spawn(set_wallpaper, _last_wallpaper)
        _spawn(flush_queued, queued)
    update_tray_title()
|
||
|
||
|
||
# ── Tray ──────────────────────────────────────────────────────────────────────
|
||
def update_tray_title():
    """Refresh the tray tooltip and menu; no-op when there is no tray icon."""
    if not tray_icon:
        return
    label = APP_NAME or DEFAULT_APP_NAME
    status = "PAUSED" if paused else "Running"
    tray_icon.title = f"{label} V3 — {status}"
    tray_icon.update_menu()
|
||
|
||
|
||
def build_tray_menu():
    """Build the tray menu: status/toggle item, "Open Web UI", and "Quit".

    The first item's label is computed each time it is requested and shows
    the pause state and the active usecase name; clicking it toggles pause.
    """
    def status_text(item):
        state = "Paused" if paused else "Running"
        uc = active_usecase()
        return f"{state} | {uc['name'] if uc else 'None'}"

    def _toggle(icon, item):
        toggle_pause()

    def _open_ui(icon, item):
        webbrowser.open(f"http://localhost:{WEB_PORT}")

    def _quit(icon, item):
        icon.stop()
        os._exit(0)

    return pystray.Menu(
        pystray.MenuItem(status_text, _toggle),
        pystray.MenuItem("Open Web UI", _open_ui),
        pystray.Menu.SEPARATOR,
        pystray.MenuItem("Quit", _quit),
    )
|
||
|
||
|
||
def run_tray():
    """Create the tray icon, store it in ``tray_icon`` and run it."""
    global tray_icon
    icon_image = load_tray_icon()
    menu = build_tray_menu()
    tray_icon = pystray.Icon("NotifyPulse", icon_image, "NotifyPulse V3", menu=menu)
    tray_icon.run()
|
||
|
||
|
||
# ── Flask App ─────────────────────────────────────────────────────────────────
|
||
flask_app = Flask(__name__, static_folder=None)
|
||
CORS(flask_app, resources={r"/api/*": {"origins": "*"}, r"/pwa*": {"origins": "*"}})
|
||
|
||
import logging as _logging
|
||
_logging.getLogger("werkzeug").setLevel(_logging.ERROR)
|
||
|
||
PWA_DIR = Path(__file__).parent / "pwa"
|
||
UI_DIR = Path(__file__).parent / "ui"
|
||
|
||
|
||
def _img_to_datauri(path: str) -> str | None:
|
||
try:
|
||
p = Path(path)
|
||
mime = mimetypes.guess_type(p.name)[0] or "image/jpeg"
|
||
data = base64.b64encode(p.read_bytes()).decode()
|
||
return f"data:{mime};base64,{data}"
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
# ── API: Usecases ─────────────────────────────────────────────────────────────
|
||
@flask_app.route("/api/usecases", methods=["GET"])
def api_get_usecases():
    """Return all usecases, the active usecase id and the block catalogue."""
    with _usecases_lock:
        payload = {
            "usecases": list(_usecases),
            "active": _active_usecase_id,
            "blocks": ALL_BLOCKS,
        }
        return jsonify(payload)
|
||
|
||
|
||
@flask_app.route("/api/usecases", methods=["POST"])
def api_create_usecase():
    """Create a usecase from the JSON body and persist it.

    ``name`` is required. The id is a slug of the name plus a random
    numeric suffix, re-drawn if it collides with an existing id.

    Returns:
        ``{"ok": True, "usecase": ...}`` on success, or a 400 response when
        the name is missing or an interval is not an integer.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A JSON list/number/string body has no fields; treat it as empty.
        data = {}

    name = str(data.get("name") or "").strip()
    if not name:
        return jsonify({"ok": False, "error": "name required"}), 400

    try:
        min_interval = int(data.get("min_interval", 10))
        max_interval = int(data.get("max_interval", 30))
    except (TypeError, ValueError, OverflowError):
        return jsonify({"ok": False,
                        "error": "min_interval and max_interval must be integers"}), 400

    slug = re.sub(r"[^a-z0-9_]", "_", name.lower())
    with _usecases_lock:
        taken = [u.get("id") for u in _usecases if isinstance(u, dict)]
        uid = f"{slug}_{random.randint(100, 999)}"
        while uid in taken:
            # Collision with an existing id: draw from a much wider range.
            uid = f"{slug}_{random.randint(1000, 999999)}"
        uc = {
            "id": uid,
            "name": name,
            "color": data.get("color", "#4f8ef7"),
            "group": str(data.get("group") or "").strip(),
            "blocks": data.get("blocks", ["notifications"]),
            "min_interval": min_interval,
            "max_interval": max_interval,
            "notifications": data.get("notifications", []),
            "dashboard_layout": data.get("dashboard_layout", {}),
        }
        _usecases.append(uc)
        save_usecases()
    ensure_usecase_dirs(uc)
    log(f"Usecase created: {name}")
    return jsonify({"ok": True, "usecase": uc})
|
||
|
||
|
||
@flask_app.route("/api/usecases/<uid>", methods=["PUT"])
def api_update_usecase(uid):
    """Merge the JSON body into usecase *uid* (its ``id`` cannot be changed).

    The change is saved, required folders are created, and when the usecase
    is the active one it is re-applied. Returns 404 for an unknown id.
    """
    data = request.get_json(silent=True) or {}
    snapshot = None
    was_active = False
    with _usecases_lock:
        target = next((u for u in _usecases if u["id"] == uid), None)
        if target is not None:
            changes = {k: v for k, v in data.items() if k != "id"}
            target.update(changes)
            save_usecases()
            ensure_usecase_dirs(target)
            was_active = (uid == _active_usecase_id)
            snapshot = dict(target)
            log(f"Usecase updated: {target['name']}")
    if snapshot is None:
        return jsonify({"ok": False, "error": "not found"}), 404
    # Call apply_usecase OUTSIDE the lock to avoid deadlock
    if was_active:
        apply_usecase(snapshot)
    return jsonify({"ok": True, "usecase": snapshot})
|
||
|
||
|
||
@flask_app.route("/api/usecases/<uid>", methods=["DELETE"])
def api_delete_usecase(uid):
    """Delete usecase *uid*; returns 404 when no usecase has that id.

    If the deleted usecase was active, the first remaining usecase becomes
    active and is applied silently; with none left the active id is ``""``.
    """
    global _active_usecase_id
    successor = None
    with _usecases_lock:
        remaining = [u for u in _usecases if u["id"] != uid]
        if len(remaining) == len(_usecases):
            return jsonify({"ok": False, "error": "not found"}), 404
        _usecases[:] = remaining
        if _active_usecase_id == uid:
            successor = _usecases[0] if _usecases else None
            _active_usecase_id = successor["id"] if successor else ""
        save_usecases()
        log(f"Usecase deleted: {uid}")
    # Apply the new active usecase OUTSIDE the lock to avoid deadlock
    if successor:
        apply_usecase(successor, silent=True)
    return jsonify({"ok": True})
|
||
|
||
|
||
@flask_app.route("/api/usecases/<uid>/activate", methods=["POST"])
def api_activate_usecase(uid):
    """Make usecase *uid* active; returns 404 for an unknown id.

    A "Switched to ..." toast is shown when the ``startup_toast`` setting
    is truthy.
    """
    target = get_usecase(uid)
    if not target:
        return jsonify({"ok": False, "error": "not found"}), 404
    apply_usecase(target)
    announce = get_setting("startup_toast", True)
    if announce:
        send_toast(f"Switched to '{target['name']}'")
    return jsonify({"ok": True, "active": uid})
|
||
|
||
|
||
@flask_app.route("/api/usecases/<uid>/open_folder", methods=["POST"])
def api_open_usecase_folder(uid):
    """Create usecase *uid*'s folder if needed and open it on Windows.

    Returns the folder path, or 404 for an unknown id.
    """
    target = get_usecase(uid)
    if not target:
        return jsonify({"ok": False, "error": "not found"}), 404
    folder = usecase_dir(target)
    folder.mkdir(parents=True, exist_ok=True)
    folder_text = str(folder)
    if os.name == "nt":
        os.startfile(folder_text)
    return jsonify({"ok": True, "path": folder_text})
|
||
|
||
|
||
# ── API: State ────────────────────────────────────────────────────────────────
|
||
@flask_app.route("/api/state")
def api_state():
    """Return a JSON snapshot of the running state.

    Includes app name and version, pause flag, next fire time, parsed
    entries, the log, the PWA client count, settings and the active usecase.
    """
    with _entries_lock:
        entries_data = [
            dict(text=e.text, weight=e.weight, trigger_time=e.trigger_time)
            for e in _entries
        ]
    with _log_lock:
        log_data = list(_log)
    with _pwa_clients_lock:
        pwa_count = len(_pwa_clients)
    uc = active_usecase()

    snapshot = {
        # active usecase name (or settings name if none)
        "app_name": APP_NAME,
        # always the name stored in settings
        "settings_app_name": get_setting("app_name", "NotifyPulse"),
        "version": VERSION,
        "paused": paused,
        "next_fire_at": _next_fire_at,
        "entries": entries_data,
        "log": log_data,
        "pwa_clients": pwa_count,
        "settings": _settings,
        "active_usecase": _active_usecase_id,
        "usecase": uc,
    }
    return jsonify(snapshot)
|
||
|
||
|
||
@flask_app.route("/api/pause", methods=["POST"])
def api_pause():
    """Call ``toggle_pause()`` and report the module-level ``paused`` flag."""
    toggle_pause()
    status = {"paused": paused}
    return jsonify(status)
|
||
|
||
|
||
@flask_app.route("/api/fire_now", methods=["POST"])
def api_fire_now():
    """Pick one entry via ``pick_weighted`` and fire it on a daemon thread.

    Responds with ``ok: False`` when ``pick_weighted`` yields nothing.
    """
    with _entries_lock:
        snapshot = list(_entries)
    selected = pick_weighted(snapshot)
    if not selected:
        return jsonify({"ok": False, "error": "No entries"})
    # Fire on a separate thread so the HTTP response is not delayed.
    worker = threading.Thread(target=fire_entry, args=(selected,), daemon=True)
    worker.start()
    return jsonify({"ok": True, "fired": selected.text})
|
||
|
||
|
||
@flask_app.route("/api/settings", methods=["GET", "POST"])
def api_settings():
    """Return the settings dict on GET; otherwise merge the JSON body and save.

    Only keys that exist in ``DEFAULT_SETTINGS`` are accepted from the body.
    """
    if request.method != "GET":
        incoming = request.get_json(silent=True) or {}
        accepted = {}
        for key, value in incoming.items():
            if key in DEFAULT_SETTINGS:
                accepted[key] = value
        _settings.update(accepted)
        save_settings()
        return jsonify({"ok": True})
    return jsonify(_settings)
|
||
|
||
|
||
@flask_app.route("/api/log")
def api_log():
    """Return the first ``limit`` items of the log buffer as a JSON list.

    Query parameters:
        limit: maximum number of items to return (default 100, capped at 200).
            A missing or non-integer value falls back to the default instead
            of raising, and negative values are clamped to 0 so the slice
            can never be interpreted as "all but the last n".
    """
    # type=int makes the lookup return the default when conversion fails.
    requested = request.args.get("limit", 100, type=int)
    limit = max(0, min(requested, 200))
    with _log_lock:
        recent = list(_log)[:limit]
    return jsonify(recent)
|
||
|
||
|
||
@flask_app.route("/api/test_notification", methods=["POST"])
def api_test_notif():
    """Send a test toast using the body's ``message`` or a default text."""
    payload = request.get_json(silent=True) or {}
    message = payload.get("message", "Test from NotifyPulse V3!")
    send_toast(message)
    return jsonify({"ok": True})
|
||
|
||
|
||
@flask_app.route("/api/test_wallpaper", methods=["POST"])
def api_test_wallpaper():
    """Hand ``do_wallpaper_change`` to ``enqueue_bg`` and acknowledge."""
    enqueue_bg(do_wallpaper_change)
    ack = {"ok": True}
    return jsonify(ack)
|
||
|
||
|
||
@flask_app.route("/api/test_overlay", methods=["POST"])
def api_test_overlay():
    """Hand ``do_screen_overlay`` to ``enqueue_bg`` and acknowledge."""
    enqueue_bg(do_screen_overlay)
    ack = {"ok": True}
    return jsonify(ack)
|
||
|
||
|
||
@flask_app.route("/api/test_mobile_wallpaper", methods=["POST"])
def api_test_mobile_wallpaper():
    """Hand ``do_mobile_wallpaper`` to ``enqueue_bg`` and acknowledge."""
    enqueue_bg(do_mobile_wallpaper)
    ack = {"ok": True}
    return jsonify(ack)
|
||
|
||
|
||
# ── API: PWA ──────────────────────────────────────────────────────────────────
|
||
@flask_app.route("/api/pwa/ping", methods=["POST"])
def pwa_ping():
    """Record a heartbeat for a PWA client and prune stale clients.

    The JSON body must carry a non-empty ``client_id`` (400 otherwise).
    Clients whose ``last_seen`` is more than 60 seconds old are dropped, and
    the number of remaining clients is returned.
    """
    payload = request.get_json(silent=True) or {}
    client_id = payload.get("client_id", "")
    if not client_id:
        return jsonify({"error": "no client_id"}), 400
    agent = request.headers.get("User-Agent", "")
    with _pwa_clients_lock:
        _pwa_clients[client_id] = {"last_seen": time.time(), "user_agent": agent}
        stale_before = time.time() - 60
        # Collect ids first: the dict must not change size while iterating.
        stale_ids = [
            key for key, info in _pwa_clients.items()
            if info["last_seen"] < stale_before
        ]
        for key in stale_ids:
            del _pwa_clients[key]
        remaining = len(_pwa_clients)
    return jsonify({"ok": True, "clients": remaining})
|
||
|
||
|
||
@flask_app.route("/api/pwa/wallpaper", methods=["GET"])
def pwa_wallpaper_poll():
    """Hand a pending wallpaper command to the polling PWA client, if any.

    The command stored under the caller's ``client_id`` is preferred; if
    there is none, the ``"__broadcast__"`` command is taken instead. Either
    one is removed from the pending map when returned. Images are delivered
    as data URIs under the ``lockscreen`` / ``background`` keys.
    """
    client_id = request.args.get("client_id", "")
    if not client_id:
        return jsonify({"pending": False})

    with _pending_lock:
        command = _pending_wallpapers.pop(client_id, None)
        if not command:
            command = _pending_wallpapers.pop("__broadcast__", None)
    if not command:
        return jsonify({"pending": False})

    response = {"pending": True}
    for slot in ("lockscreen", "background"):
        source = command.get(slot)
        if not source:
            continue
        data_uri = _img_to_datauri(source)
        if data_uri:
            response[slot] = data_uri
    return jsonify(response)
|
||
|
||
|
||
@flask_app.route("/api/pwa/app_name")
def pwa_app_name():
    """Return app naming, usecase summaries and PWA display settings.

    The response is marked ``Cache-Control: no-store``.
    """
    summaries = []
    with _usecases_lock:
        for uc in _usecases:
            summaries.append({
                "id": uc["id"],
                "name": uc["name"],
                "color": uc.get("color", "#4f8ef7"),
                "group": uc.get("group", ""),
                "blocks": uc.get("blocks", []),
                "min_interval": uc.get("min_interval", 10),
                "max_interval": uc.get("max_interval", 30),
            })

    payload = {
        "app_name": APP_NAME,
        "settings_app_name": get_setting("app_name", "NotifyPulse"),
        "active": _active_usecase_id,
        "usecases": summaries,
        "version": VERSION,
        "blocks": ALL_BLOCKS,
        "pwa_bg_blur": get_setting("pwa_bg_blur", 18),
        "pwa_bg_opacity": get_setting("pwa_bg_opacity", 0.72),
    }
    response = jsonify(payload)
    response.headers["Cache-Control"] = "no-store"
    return response
|
||
|
||
|
||
@flask_app.route("/api/pwa/splash_image")
def pwa_splash_image():
    """Return a splash image for the PWA as a data URI.

    Lookup order:
      1. a random image file from ``APPDATA_DIR / "splash"``;
      2. the first existing legacy ``main.*`` file in ``APPDATA_DIR``;
      3. an empty image with ``has_custom: False``.
    """
    import random as _random

    # 1) splash/ folder: choose one image at random.
    splash_dir = APPDATA_DIR / "splash"
    if splash_dir.is_dir():
        pool = []
        for item in splash_dir.iterdir():
            if item.is_file() and item.suffix.lower() in (".png", ".jpg", ".jpeg", ".webp"):
                pool.append(item)
        if pool:
            picked = _random.choice(pool)
            data_uri = _img_to_datauri(str(picked))
            if data_uri:
                return jsonify({"image": data_uri, "has_custom": True})

    # 2) Legacy single main.* file directly in APPDATA_DIR.
    for legacy_name in ("main.png", "main.jpg", "main.jpeg", "main.webp"):
        legacy_path = APPDATA_DIR / legacy_name
        if not legacy_path.exists():
            continue
        data_uri = _img_to_datauri(str(legacy_path))
        # The first existing file decides the response, even if conversion failed.
        return jsonify({"image": data_uri or "", "has_custom": True})

    return jsonify({"image": "", "has_custom": False})
|
||
|
||
|
||
@flask_app.route("/api/pwa/overlay")
def pwa_overlay():
    """Return the currently active overlay image (or null when none / paused)."""
    with state_lock:
        currently_paused = paused
    if currently_paused:
        return jsonify({"active": False, "image": None})

    with _overlays_lock:
        with_image = [ov for ov in _active_overlays if ov.get("img_path")]
    if not with_image:
        return jsonify({"active": False, "image": None})

    # The last list element is the most recently added overlay.
    latest = with_image[-1]
    data_uri = _img_to_datauri(latest["img_path"])
    if not data_uri:
        return jsonify({"active": False, "image": None})

    total_ms = latest.get("remaining_ms", 0)
    started = latest.get("started_at", time.monotonic())
    elapsed_ms = int((time.monotonic() - started) * 1000)
    time_left_ms = max(0, total_ms - elapsed_ms)
    return jsonify({"active": True, "image": data_uri, "remaining_ms": time_left_ms})
|
||
|
||
|
||
@flask_app.route("/api/pwa/trigger_wallpaper", methods=["POST"])
def pwa_trigger_wallpaper():
    """Hand ``do_mobile_wallpaper`` to ``enqueue_bg`` on request of the PWA."""
    enqueue_bg(do_mobile_wallpaper)
    ack = {"ok": True}
    return jsonify(ack)
|
||
|
||
|
||
@flask_app.route("/api/pwa/activate_usecase", methods=["POST"])
def pwa_activate_usecase():
    """Apply the usecase named by ``usecase_id`` in the JSON body (404 if unknown)."""
    payload = request.get_json(silent=True) or {}
    target = get_usecase(payload.get("usecase_id", ""))
    if target:
        apply_usecase(target)
        return jsonify({"ok": True, "name": target["name"]})
    return jsonify({"ok": False, "error": "not found"}), 404
|
||
|
||
|
||
# ── Static routes ─────────────────────────────────────────────────────────────
|
||
@flask_app.route("/pwa")
@flask_app.route("/pwa/")
def pwa_index():
    """Serve the PWA's ``index.html`` from ``PWA_DIR`` (404 text if missing)."""
    index_page = PWA_DIR / "index.html"
    if not index_page.exists():
        return "PWA files not found.", 404
    return send_file(index_page)
|
||
|
||
|
||
@flask_app.route("/pwa/<path:filename>")
def pwa_static(filename):
    """Serve a static PWA asset that lives inside ``PWA_DIR``.

    ``filename`` comes straight from the URL, so it is treated as untrusted:
    the joined path is resolved and must remain under ``PWA_DIR``. Requests
    that escape the directory (``..`` segments, or an absolute or
    drive-qualified path, which pathlib lets replace the base directory)
    get a 404, as do missing files and directories.
    """
    root = PWA_DIR.resolve()
    try:
        candidate = (root / filename).resolve()
        # relative_to raises ValueError when candidate is not under root.
        candidate.relative_to(root)
    except (OSError, ValueError):
        abort(404)
    if candidate.is_file():
        return send_file(candidate)
    abort(404)
|
||
|
||
|
||
@flask_app.route("/")
def desktop_index():
    """Serve the desktop UI's ``index.html`` from ``UI_DIR`` (404 text if missing)."""
    index_page = UI_DIR / "index.html"
    if not index_page.exists():
        return "UI files not found.", 404
    return send_file(index_page)
|
||
|
||
|
||
def run_flask():
    """Run the Flask app on all interfaces at ``WEB_PORT`` (threaded, no reloader)."""
    server_options = {
        "host": "0.0.0.0",
        "port": WEB_PORT,
        "debug": False,
        "use_reloader": False,
        "threaded": True,
    }
    flask_app.run(**server_options)
|
||
|
||
|
||
# ── Main ──────────────────────────────────────────────────────────────────────
|
||
def main():
    """Set up paths and state, start the worker threads, and run the Tk loop.

    Steps, in order: create the config directories under ``%APPDATA%``, load
    settings and usecases, remember the current wallpaper, register the pause
    hotkey, start the notification / schedule / Flask threads, optionally
    show a startup toast and open the browser, start the tray thread, and
    finally block in the Tk main loop.
    """
    global APP_NAME, APP_ID, APPDATA_DIR, USECASES_FILE, SETTINGS_FILE, ICON_FILE
    global _default_wallpaper, root_instance, _active_usecase_id

    from pathlib import Path

    roaming = Path(os.environ.get("APPDATA", Path.home() / "AppData" / "Roaming"))
    APPDATA_DIR = roaming / "NotifyPulse"
    APPDATA_DIR.mkdir(parents=True, exist_ok=True)
    for subfolder in ("usecases", "splash"):
        (APPDATA_DIR / subfolder).mkdir(exist_ok=True)

    USECASES_FILE = APPDATA_DIR / "usecases.json"
    SETTINGS_FILE = APPDATA_DIR / "settings.json"
    ICON_FILE = APPDATA_DIR / "icon.png"

    load_settings()
    load_usecases()

    _default_wallpaper = get_current_wallpaper()

    # The settings app_name is the heading until a usecase is chosen.
    APP_NAME = get_setting("app_name", "NotifyPulse")
    APP_ID = re.sub(r"[^A-Za-z0-9]", "", APP_NAME) or "NotifyPulse"

    # No usecase is auto-activated: the user must pick one via UI/PWA
    # (active_usecase in settings is intentionally not restored on startup).

    try:
        pause_key = get_setting("hotkey", "F13")
        keyboard.add_hotkey(pause_key, toggle_pause, suppress=True)
    except Exception as err:
        # A failed hotkey registration is logged and startup continues.
        log(f"Hotkey error: {err}", "warn")

    root_instance = tk.Tk()
    root_instance.withdraw()

    for worker in (notification_loop, schedule_runner, run_flask):
        threading.Thread(target=worker, daemon=True).start()

    log(f"NotifyPulse V{VERSION} started")

    if get_setting("startup_toast", True):
        send_toast(f"V{VERSION} running! Web UI → localhost:{WEB_PORT}")

    if get_setting("auto_open_browser", True):
        def _open_ui():
            webbrowser.open(f"http://localhost:{WEB_PORT}")

        # Open the browser after a 1.5 s delay.
        threading.Timer(1.5, _open_ui).start()

    threading.Thread(target=run_tray, daemon=True).start()
    root_instance.mainloop()
|
||
|
||
|
||
# Script entry point: run the app only when executed directly, not on import.
if __name__ == "__main__":
    main()
|