1318 lines
46 KiB
Python
1318 lines
46 KiB
Python
# NotifyPulse V2 - Rewritten from the ground up
|
||
# Config: AppData/Roaming/NotifyPulse/
|
||
# Web UI: http://localhost:5000
|
||
# PWA: http://<your-lan-ip>:5000/pwa
|
||
|
||
from __future__ import annotations
|
||
from queue import Empty, Queue
|
||
import sys, os, ctypes, ctypes.wintypes, threading, time, random, re, json
|
||
import webbrowser, shutil, hashlib, base64, mimetypes
|
||
import subprocess
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from collections import deque
|
||
|
||
# ── Dependency check ──────────────────────────────────────────────────────────
|
||
try:
|
||
import keyboard
|
||
from winotify import Notification, audio
|
||
import pystray
|
||
from PIL import Image, ImageDraw, ImageTk
|
||
import schedule
|
||
import tkinter as tk
|
||
from flask import Flask, jsonify, request, send_file, abort, Response
|
||
from flask_cors import CORS
|
||
except ImportError as e:
|
||
import tkinter as tk
|
||
from tkinter import messagebox
|
||
root = tk.Tk(); root.withdraw()
|
||
messagebox.showerror("Missing dependency",
|
||
f"Run: pip install -r requirements.txt\n\nMissing: {e}")
|
||
sys.exit(1)
|
||
|
||
# ── Constants ─────────────────────────────────────────────────────────────────
|
||
HOTKEY = "F13"
|
||
WEB_PORT = 5000
|
||
DEFAULT_APP_NAME = "NotifyPulse"
|
||
WALLPAPER_MAGIC = "change.wallpaper"
|
||
OVERLAY_MAGIC = "show.overlay"
|
||
VERSION = "2.0.0"
|
||
MOBILE_EXTS = (".jpg", ".jpeg", ".png", ".bmp", ".webp", ".heic", ".heif")
|
||
|
||
SPI_SETDESKWALLPAPER = 0x0014
|
||
SPIF_UPDATEINIFILE = 0x01
|
||
SPIF_SENDCHANGE = 0x02
|
||
|
||
# ── Paths (set in main) ───────────────────────────────────────────────────────
|
||
APPDATA_DIR: Path | None = None
|
||
NOTIFICATIONS_FILE: Path | None = None
|
||
ICON_FILE: Path | None = None
|
||
WALLPAPERS_DIR: Path | None = None
|
||
OVERLAY_DIR: Path | None = None
|
||
MOBILE_DIR: Path | None = None # for PWA lockscreen/background images
|
||
SETTINGS_FILE: Path | None = None
|
||
|
||
# ── Global state ──────────────────────────────────────────────────────────────
|
||
paused: bool = False
|
||
notification_queue: list = []
|
||
state_lock = threading.Lock()
|
||
tray_icon = None
|
||
root_instance = None
|
||
|
||
APP_NAME: str = DEFAULT_APP_NAME
|
||
APP_ID: str = "NotifyPulse"
|
||
|
||
MIN_INTERVAL_SEC: int = 10 * 60
|
||
MAX_INTERVAL_SEC: int = 30 * 60
|
||
|
||
_entries: list = []
|
||
_entries_lock = threading.Lock()
|
||
_file_mtime: float = 0.0
|
||
_next_fire_at: float = 0.0
|
||
|
||
_active_overlays: list = []
|
||
_overlays_lock = threading.Lock()
|
||
|
||
_default_wallpaper: str = ""
|
||
_last_wallpaper: str = ""
|
||
_overlay_stretch: bool = False
|
||
|
||
# Connected PWA clients: {client_id: {"last_seen": float, "user_agent": str}}
|
||
_pwa_clients: dict = {}
|
||
_pwa_clients_lock = threading.Lock()
|
||
|
||
# Pending mobile wallpaper commands: {client_id: {"lockscreen": path, "background": path}}
|
||
_pending_wallpapers: dict = {}
|
||
_pending_lock = threading.Lock()
|
||
|
||
# Log ring buffer
|
||
_log: deque = deque(maxlen=200)
|
||
_log_lock = threading.Lock()
|
||
|
||
# Settings cache (persisted to settings.json)
|
||
_settings: dict = {}
|
||
|
||
_notification_wake = threading.Event()
|
||
|
||
|
||
def _ps_quote_literal(path: Path) -> str:
|
||
return str(path).replace("'", "''")
|
||
|
||
|
||
def _ps_read_file_utf8(path: Path) -> str | None:
|
||
if os.name != "nt":
|
||
return None
|
||
quoted = _ps_quote_literal(path)
|
||
cmd = (
|
||
"$ErrorActionPreference='Stop';"
|
||
f"$p='{quoted}';"
|
||
"if (Test-Path -LiteralPath $p) {"
|
||
" $b=[IO.File]::ReadAllBytes($p);"
|
||
" [Console]::Out.Write('1:' + [Convert]::ToBase64String($b));"
|
||
"} else {"
|
||
" [Console]::Out.Write('0:');"
|
||
"}"
|
||
)
|
||
try:
|
||
out = subprocess.check_output(
|
||
["powershell", "-NoProfile", "-Command", cmd],
|
||
encoding="ascii",
|
||
errors="ignore",
|
||
).strip()
|
||
if not out.startswith("1:"):
|
||
return None
|
||
payload = out[2:]
|
||
if payload == "":
|
||
return ""
|
||
return base64.b64decode(payload).decode("utf-8")
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def _ps_write_file_utf8(path: Path, content: str) -> bool:
|
||
if os.name != "nt":
|
||
return False
|
||
quoted = _ps_quote_literal(path)
|
||
payload = base64.b64encode(content.encode("utf-8")).decode("ascii")
|
||
cmd = (
|
||
"$ErrorActionPreference='Stop';"
|
||
f"$p='{quoted}';"
|
||
"$d=[IO.Path]::GetDirectoryName($p);"
|
||
"if ($d) { [IO.Directory]::CreateDirectory($d) | Out-Null };"
|
||
f"[IO.File]::WriteAllBytes($p,[Convert]::FromBase64String('{payload}'));"
|
||
)
|
||
try:
|
||
subprocess.check_call(
|
||
["powershell", "-NoProfile", "-Command", cmd],
|
||
stdout=subprocess.DEVNULL,
|
||
stderr=subprocess.DEVNULL,
|
||
)
|
||
return True
|
||
except Exception:
|
||
return False
|
||
|
||
|
||
def _real_appdata_dir() -> Path:
|
||
return Path(os.environ.get("USERPROFILE", str(Path.home()))) / "AppData" / "Roaming" / "NotifyPulse"
|
||
|
||
|
||
def notifications_file_exists() -> bool:
|
||
p = NOTIFICATIONS_FILE if NOTIFICATIONS_FILE else (_real_appdata_dir() / "notifications.txt")
|
||
if os.name == "nt":
|
||
quoted = _ps_quote_literal(p)
|
||
cmd = f"$p='{quoted}'; if (Test-Path -LiteralPath $p) {{ '1' }} else {{ '0' }}"
|
||
try:
|
||
out = subprocess.check_output(
|
||
["powershell", "-NoProfile", "-Command", cmd],
|
||
encoding="ascii",
|
||
errors="ignore",
|
||
).strip()
|
||
return out == "1"
|
||
except Exception:
|
||
pass
|
||
return p.exists()
|
||
|
||
|
||
def notifications_file_mtime() -> float:
|
||
p = NOTIFICATIONS_FILE if NOTIFICATIONS_FILE else (_real_appdata_dir() / "notifications.txt")
|
||
if os.name == "nt":
|
||
quoted = _ps_quote_literal(p)
|
||
cmd = (
|
||
"$ErrorActionPreference='Stop';"
|
||
f"$p='{quoted}';"
|
||
"if (Test-Path -LiteralPath $p) {"
|
||
" [Console]::Out.Write((Get-Item -LiteralPath $p).LastWriteTimeUtc.ToFileTimeUtc());"
|
||
"}"
|
||
)
|
||
try:
|
||
out = subprocess.check_output(
|
||
["powershell", "-NoProfile", "-Command", cmd],
|
||
encoding="ascii",
|
||
errors="ignore",
|
||
).strip()
|
||
if out:
|
||
return float(out)
|
||
except Exception:
|
||
pass
|
||
try:
|
||
return p.stat().st_mtime
|
||
except Exception:
|
||
return 0.0
|
||
|
||
|
||
def read_notifications_text() -> str:
|
||
p = NOTIFICATIONS_FILE if NOTIFICATIONS_FILE else (_real_appdata_dir() / "notifications.txt")
|
||
text = _ps_read_file_utf8(p)
|
||
if text is not None:
|
||
return text
|
||
return p.read_text("utf-8")
|
||
|
||
|
||
def write_notifications_text(content: str):
|
||
p = NOTIFICATIONS_FILE if NOTIFICATIONS_FILE else (_real_appdata_dir() / "notifications.txt")
|
||
if _ps_write_file_utf8(p, content):
|
||
return
|
||
p.write_text(content, "utf-8")
|
||
|
||
class BackgroundWorker:
|
||
def __init__(self):
|
||
self.queue = Queue()
|
||
self.thread: threading.Thread | None = None
|
||
self.lock = threading.Lock()
|
||
|
||
def enqueue(self, fn, *args, **kwargs):
|
||
self.queue.put((fn, args, kwargs))
|
||
self._ensure_thread()
|
||
|
||
def _ensure_thread(self):
|
||
with self.lock:
|
||
if self.thread and self.thread.is_alive():
|
||
return
|
||
self.thread = threading.Thread(target=self._run, daemon=True)
|
||
self.thread.start()
|
||
|
||
def _run(self):
|
||
while True:
|
||
try:
|
||
fn, args, kwargs = self.queue.get(timeout=5)
|
||
except Empty:
|
||
with self.lock:
|
||
if self.queue.empty():
|
||
self.thread = None
|
||
return
|
||
continue
|
||
try:
|
||
fn(*args, **kwargs)
|
||
except Exception as exc:
|
||
log(f"Background task error: {exc}", "error")
|
||
finally:
|
||
self.queue.task_done()
|
||
|
||
_background_worker = BackgroundWorker()
|
||
|
||
def enqueue_background(fn, *args, **kwargs):
|
||
_background_worker.enqueue(fn, *args, **kwargs)
|
||
|
||
|
||
# ── Logging ───────────────────────────────────────────────────────────────────
|
||
def log(msg: str, level: str = "info"):
|
||
ts = datetime.now().strftime("%H:%M:%S")
|
||
entry = {"time": ts, "msg": msg, "level": level}
|
||
with _log_lock:
|
||
_log.appendleft(entry)
|
||
|
||
|
||
# ── Settings persistence ──────────────────────────────────────────────────────
|
||
DEFAULT_SETTINGS = {
|
||
"overlay_stretch": False,
|
||
"overlay_opacity": 0.4,
|
||
"overlay_duration": 6,
|
||
"startup_toast": True,
|
||
"hotkey": "F13",
|
||
"web_port": 5000,
|
||
"theme": "dark",
|
||
"notify_sound": True,
|
||
"mobile_wallpaper_dir": "", # relative sub-folder inside MOBILE_DIR
|
||
"pwa_token": "", # simple shared secret for PWA auth (empty = no auth)
|
||
"auto_open_browser": True,
|
||
"log_maxlen": 200,
|
||
}
|
||
|
||
def load_settings() -> dict:
|
||
global _settings
|
||
if SETTINGS_FILE and SETTINGS_FILE.exists():
|
||
try:
|
||
raw = json.loads(SETTINGS_FILE.read_text("utf-8"))
|
||
_settings = {**DEFAULT_SETTINGS, **raw}
|
||
return _settings
|
||
except Exception:
|
||
pass
|
||
_settings = dict(DEFAULT_SETTINGS)
|
||
return _settings
|
||
|
||
def save_settings():
|
||
if SETTINGS_FILE:
|
||
try:
|
||
SETTINGS_FILE.write_text(json.dumps(_settings, indent=2), "utf-8")
|
||
except Exception as e:
|
||
log(f"Settings save error: {e}", "error")
|
||
|
||
def get_setting(key: str, default=None):
|
||
return _settings.get(key, DEFAULT_SETTINGS.get(key, default))
|
||
|
||
|
||
# ── AppData setup ─────────────────────────────────────────────────────────────
|
||
def resolve_appdata_dir() -> Path:
|
||
"""
|
||
Always use the real user AppData path so edits in Explorer match the app.
|
||
If a virtualised APPDATA copy exists and is newer, import it once.
|
||
"""
|
||
env_base = Path(os.environ.get("APPDATA", "") or Path.home() / "AppData" / "Roaming")
|
||
real_base = Path(os.environ.get("USERPROFILE", str(Path.home()))) / "AppData" / "Roaming"
|
||
|
||
env_dir = env_base / "NotifyPulse"
|
||
real_dir = real_base / "NotifyPulse"
|
||
|
||
chosen = real_dir
|
||
try:
|
||
chosen.mkdir(parents=True, exist_ok=True)
|
||
probe = chosen / ".probe"
|
||
probe.write_text("ok", "utf-8")
|
||
probe.unlink(missing_ok=True) # type: ignore[arg-type]
|
||
|
||
# One-time import if the virtualised copy is newer
|
||
try:
|
||
src = env_dir / "notifications.txt"
|
||
dst = chosen / "notifications.txt"
|
||
if src.exists():
|
||
if (not dst.exists()) or src.stat().st_mtime > dst.stat().st_mtime:
|
||
dst.write_bytes(src.read_bytes())
|
||
except Exception:
|
||
pass
|
||
return chosen
|
||
except Exception as exc:
|
||
root = tk.Tk(); root.withdraw()
|
||
from tkinter import messagebox
|
||
messagebox.showerror("NotifyPulse V2", f"Cannot create config folder:\n{chosen}\n\n{exc}")
|
||
sys.exit(1)
|
||
|
||
|
||
# ── Config file ───────────────────────────────────────────────────────────────
|
||
class Entry:
|
||
__slots__ = ("text", "weight", "trigger_time")
|
||
def __init__(self, text, weight, trigger_time):
|
||
self.text = text
|
||
self.weight = weight
|
||
self.trigger_time = trigger_time
|
||
|
||
|
||
def create_default_config():
|
||
write_notifications_text(
|
||
"# NotifyPulse V2 - notifications.txt\n"
|
||
"# Reloads automatically on save!\n"
|
||
"#\n"
|
||
"# @name Your App Name\n"
|
||
"# @interval 10 30 <- min/max minutes between random notifications\n"
|
||
"#\n"
|
||
"# Text message | 30% <- random, picked 30% of the time\n"
|
||
"# Text message | 14:30 <- fires daily at 14:30\n"
|
||
"# change.wallpaper | 20% <- changes PC wallpaper randomly\n"
|
||
"# change.wallpaper.mobile <- triggers mobile wallpaper via PWA\n"
|
||
"# show.overlay | 10% <- fullscreen image overlay (6s default)\n"
|
||
"# show.overlay.10 | 10% <- overlay for 10 seconds\n"
|
||
"\n"
|
||
"@name NotifyPulse\n"
|
||
"@interval 10 30\n"
|
||
"\n"
|
||
"Take a short break and stretch! | 35%\n"
|
||
"Drink some water - stay hydrated! | 30%\n"
|
||
"Check your posture! | 20%\n"
|
||
"Deep breath - you got this! | 15%\n"
|
||
"\n"
|
||
"# change.wallpaper | 20%\n"
|
||
"# show.overlay | 15%\n"
|
||
"\n"
|
||
"Morning standup time! | 09:00\n"
|
||
"Afternoon focus block | 14:00\n"
|
||
"End of day - wrap up! | 17:30\n",
|
||
)
|
||
|
||
|
||
def parse_overlay_duration(text: str) -> int:
|
||
parts = text.strip().lower().split(".")
|
||
if len(parts) >= 3:
|
||
try:
|
||
return max(1, int(parts[2])) * 1000
|
||
except ValueError:
|
||
pass
|
||
return int(get_setting("overlay_duration", 6)) * 1000
|
||
|
||
|
||
def is_overlay_entry(text: str) -> bool:
|
||
t = text.strip().lower()
|
||
return t == OVERLAY_MAGIC or t.startswith(OVERLAY_MAGIC + ".")
|
||
|
||
|
||
def is_mobile_wallpaper(text: str) -> bool:
|
||
return text.strip().lower() == "change.wallpaper.mobile"
|
||
|
||
|
||
def load_config():
|
||
log(f"Loading config from {NOTIFICATIONS_FILE}")
|
||
if not notifications_file_exists():
|
||
create_default_config()
|
||
|
||
app_name = DEFAULT_APP_NAME
|
||
min_sec = 10 * 60
|
||
max_sec = 30 * 60
|
||
entries = []
|
||
|
||
for raw in read_notifications_text().splitlines():
|
||
line = raw.strip()
|
||
if not line or line.startswith("#"):
|
||
continue
|
||
if line.startswith("@"):
|
||
parts = line[1:].split()
|
||
key = parts[0].lower() if parts else ""
|
||
if key == "name" and len(parts) >= 2:
|
||
app_name = " ".join(parts[1:])
|
||
elif key == "interval" and len(parts) == 3:
|
||
try:
|
||
mn = max(1, int(parts[1]))
|
||
mx = max(mn, int(parts[2]))
|
||
min_sec = mn * 60
|
||
max_sec = mx * 60
|
||
except ValueError:
|
||
pass
|
||
continue
|
||
|
||
if "|" in line:
|
||
text, tag = [p.strip() for p in line.split("|", 1)]
|
||
else:
|
||
text, tag = line, None
|
||
|
||
if tag and re.match(r"^\d{1,2}:\d{2}$", tag):
|
||
h, m = tag.split(":")
|
||
entries.append(Entry(text, None, f"{int(h):02d}:{m}"))
|
||
elif tag and tag.endswith("%"):
|
||
try:
|
||
w = float(tag[:-1])
|
||
except ValueError:
|
||
w = 1.0
|
||
entries.append(Entry(text, w, None))
|
||
else:
|
||
entries.append(Entry(text, 1.0, None))
|
||
|
||
return app_name, min_sec, max_sec, entries
|
||
|
||
|
||
def apply_config(app_name, min_sec, max_sec, entries, silent=False):
|
||
global APP_NAME, APP_ID, MIN_INTERVAL_SEC, MAX_INTERVAL_SEC, _entries, _file_mtime
|
||
APP_NAME = app_name
|
||
APP_ID = re.sub(r"[^A-Za-z0-9]", "", app_name) or "NotifyPulse"
|
||
MIN_INTERVAL_SEC = min_sec
|
||
MAX_INTERVAL_SEC = max_sec
|
||
with _entries_lock:
|
||
_entries = entries
|
||
schedule.clear()
|
||
for entry in (e for e in entries if e.trigger_time):
|
||
def make_job(e):
|
||
def job(): fire_entry(e)
|
||
return job
|
||
schedule.every().day.at(entry.trigger_time).do(make_job(entry))
|
||
_file_mtime = notifications_file_mtime()
|
||
update_tray_title()
|
||
_notification_wake.set()
|
||
if not silent:
|
||
log(f"Config reloaded — {min_sec//60}–{max_sec//60}min, {len(entries)} entries")
|
||
|
||
|
||
# ── File watcher ──────────────────────────────────────────────────────────────
|
||
class ConfigFileMonitor:
|
||
def __init__(self, path: Path, on_change, min_interval=1.0, max_interval=4.0):
|
||
self.path = path
|
||
self.on_change = on_change
|
||
self.min_interval = min_interval
|
||
self.max_interval = max_interval
|
||
self._interval = min_interval
|
||
self._lock = threading.Lock()
|
||
self._timer: threading.Timer | None = None
|
||
self._active = False
|
||
|
||
def start(self):
|
||
with self._lock:
|
||
if self._active:
|
||
return
|
||
self._active = True
|
||
self._interval = self.min_interval
|
||
self._schedule(self._interval)
|
||
|
||
def stop(self):
|
||
with self._lock:
|
||
self._active = False
|
||
if self._timer:
|
||
self._timer.cancel()
|
||
self._timer = None
|
||
|
||
def _schedule(self, delay):
|
||
timer = threading.Timer(delay, self._run)
|
||
timer.daemon = True
|
||
self._timer = timer
|
||
timer.start()
|
||
|
||
def _run(self):
|
||
current = notifications_file_mtime()
|
||
|
||
changed = current is not None and current != _file_mtime
|
||
if changed:
|
||
self._interval = self.min_interval
|
||
try:
|
||
self.on_change()
|
||
except Exception as exc:
|
||
log("Config watcher error: {exc}", "error")
|
||
else:
|
||
self._interval = min(self.max_interval, self._interval * 2)
|
||
|
||
with self._lock:
|
||
if self._active:
|
||
self._schedule(self._interval)
|
||
|
||
|
||
def _reload_config_if_changed():
|
||
mtime = notifications_file_mtime()
|
||
if mtime == _file_mtime:
|
||
return
|
||
app_name, min_sec, max_sec, entries = load_config()
|
||
apply_config(app_name, min_sec, max_sec, entries)
|
||
send_toast("Config reloaded!")
|
||
|
||
|
||
_config_monitor: ConfigFileMonitor | None = None
|
||
|
||
|
||
# ── Wallpaper ─────────────────────────────────────────────────────────────────
|
||
def get_current_wallpaper() -> str:
|
||
try:
|
||
import winreg
|
||
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Control Panel\Desktop")
|
||
val, _ = winreg.QueryValueEx(key, "WallPaper")
|
||
winreg.CloseKey(key)
|
||
return val
|
||
except Exception:
|
||
return ""
|
||
|
||
|
||
def set_wallpaper(path: str):
|
||
ctypes.windll.user32.SystemParametersInfoW(
|
||
SPI_SETDESKWALLPAPER, 0, path,
|
||
SPIF_UPDATEINIFILE | SPIF_SENDCHANGE)
|
||
|
||
|
||
def do_wallpaper_change():
|
||
global _last_wallpaper
|
||
if not WALLPAPERS_DIR or not WALLPAPERS_DIR.exists():
|
||
log("Wallpaper folder missing", "warn")
|
||
return
|
||
files = [f for ext in ("*.jpg","*.jpeg","*.png","*.bmp","*.webp")
|
||
for f in WALLPAPERS_DIR.glob(ext)]
|
||
if not files:
|
||
log("No wallpaper images found", "warn")
|
||
return
|
||
wp = str(random.choice(files))
|
||
_last_wallpaper = wp
|
||
set_wallpaper(wp)
|
||
log(f"Wallpaper → {Path(wp).name}")
|
||
|
||
|
||
# ── Mobile wallpaper (PWA push) ───────────────────────────────────────────────
|
||
def _mobile_base_dir() -> Path:
|
||
base = MOBILE_DIR
|
||
sub = str(get_setting("mobile_wallpaper_dir", "")).strip()
|
||
if sub:
|
||
candidate = (MOBILE_DIR / sub).resolve()
|
||
if candidate.exists():
|
||
base = candidate
|
||
else:
|
||
log(f"Mobile subfolder missing: {candidate}", "warn")
|
||
return base
|
||
|
||
|
||
def _list_mobile_images(folder: Path) -> list[Path]:
|
||
files = []
|
||
for ext in MOBILE_EXTS:
|
||
files.extend(folder.glob(f"*{ext}"))
|
||
return [p for p in files if p.is_file()]
|
||
|
||
|
||
def _copy_for_send(src: Path, prefix: str, cache_dir: Path) -> Path:
|
||
cache_dir.mkdir(exist_ok=True)
|
||
for old in cache_dir.glob(f"{prefix}_*"):
|
||
try:
|
||
old.unlink()
|
||
except Exception:
|
||
pass
|
||
ext = src.suffix if src.suffix else ".jpg"
|
||
rand = random.randint(1000, 9999)
|
||
dest = cache_dir / f"{prefix}_{rand}{ext.lower()}"
|
||
shutil.copyfile(src, dest)
|
||
return dest
|
||
|
||
|
||
def do_mobile_wallpaper():
|
||
"""Queue a wallpaper push for all connected PWA clients."""
|
||
if not MOBILE_DIR or not MOBILE_DIR.exists():
|
||
log("Mobile wallpaper dir missing", "warn")
|
||
return
|
||
|
||
base_dir = _mobile_base_dir()
|
||
if not base_dir.exists():
|
||
log("Mobile wallpaper dir missing", "warn")
|
||
return
|
||
|
||
images = _list_mobile_images(base_dir)
|
||
|
||
# Keep support for explicitly named pairs, otherwise pick two random files.
|
||
ls_candidates = [p for p in images if p.stem.lower() == "lockscreen"]
|
||
bg_candidates = [p for p in images if p.stem.lower() == "background"]
|
||
|
||
if not ls_candidates or not bg_candidates:
|
||
unique_images = images.copy()
|
||
if len(unique_images) >= 2:
|
||
chosen = random.sample(unique_images, 2)
|
||
elif unique_images:
|
||
chosen = [unique_images[0], unique_images[0]]
|
||
else:
|
||
log("No images in Mobile folder to send", "warn")
|
||
return
|
||
if not ls_candidates:
|
||
ls_candidates = [chosen[0]]
|
||
if not bg_candidates:
|
||
bg_candidates = [chosen[1]]
|
||
|
||
cache_dir = MOBILE_DIR / "_send_cache"
|
||
ls_path = _copy_for_send(random.choice(ls_candidates), "Lockscreen", cache_dir)
|
||
bg_path = _copy_for_send(random.choice(bg_candidates), "Background", cache_dir)
|
||
|
||
with _pwa_clients_lock:
|
||
client_ids = list(_pwa_clients.keys())
|
||
|
||
if not client_ids:
|
||
log("No PWA clients connected — wallpaper queued for next connection", "info")
|
||
|
||
with _pending_lock:
|
||
for cid in client_ids if client_ids else ["__broadcast__"]:
|
||
_pending_wallpapers[cid] = {
|
||
"lockscreen": str(ls_path),
|
||
"background": str(bg_path),
|
||
}
|
||
|
||
log(f"Mobile wallpaper queued from {base_dir} → {len(client_ids)} client(s)")
|
||
|
||
|
||
# ── Screen Overlay ─────────────────────────────────────────────────────────────
|
||
def _get_monitors():
|
||
monitors = []
|
||
try:
|
||
MonitorEnumProc = ctypes.WINFUNCTYPE(
|
||
ctypes.c_bool, ctypes.c_ulong, ctypes.c_ulong,
|
||
ctypes.POINTER(ctypes.wintypes.RECT), ctypes.c_double)
|
||
def _cb(hMon, hdcMon, lprcMon, dwData):
|
||
r = lprcMon.contents
|
||
monitors.append((r.left, r.top, r.right - r.left, r.bottom - r.top))
|
||
return True
|
||
ctypes.windll.user32.EnumDisplayMonitors(None, None, MonitorEnumProc(_cb), 0)
|
||
except Exception:
|
||
pass
|
||
return monitors or [None]
|
||
|
||
|
||
def do_screen_overlay(duration_ms: int = 6000):
|
||
if not OVERLAY_DIR or not OVERLAY_DIR.exists():
|
||
log("Overlay folder missing", "warn")
|
||
return
|
||
files = [f for ext in ("*.jpg","*.jpeg","*.png","*.bmp","*.webp")
|
||
for f in OVERLAY_DIR.glob(ext)]
|
||
if not files:
|
||
log("No overlay images found", "warn")
|
||
return
|
||
|
||
img_path = str(random.choice(files))
|
||
monitors = _get_monitors()
|
||
opacity = float(get_setting("overlay_opacity", 0.4))
|
||
log(f"Overlay: {Path(img_path).name} — {duration_ms//1000}s on {len(monitors)} screen(s)")
|
||
|
||
def _make_tk_image(pil_img, sw, sh):
|
||
if _overlay_stretch:
|
||
return ImageTk.PhotoImage(pil_img.resize((sw, sh), Image.Resampling.LANCZOS))
|
||
img_w, img_h = pil_img.size
|
||
scale = min(sw / img_w, sh / img_h)
|
||
nw, nh = int(img_w * scale), int(img_h * scale)
|
||
resized = pil_img.resize((nw, nh), Image.Resampling.LANCZOS)
|
||
canvas = Image.new("RGB", (sw, sh), "black")
|
||
canvas.paste(resized, ((sw - nw) // 2, (sh - nh) // 2))
|
||
return ImageTk.PhotoImage(canvas)
|
||
|
||
def create_overlay():
|
||
try:
|
||
pil_img = Image.open(img_path)
|
||
except Exception as e:
|
||
log(f"Overlay open error: {e}", "error")
|
||
return
|
||
|
||
with state_lock:
|
||
is_paused = paused
|
||
|
||
for mon in monitors:
|
||
if mon is None:
|
||
tmp = tk.Toplevel(); tmp.withdraw(); tmp.update_idletasks()
|
||
sw, sh, mx, my = tmp.winfo_screenwidth(), tmp.winfo_screenheight(), 0, 0
|
||
tmp.destroy()
|
||
else:
|
||
mx, my, sw, sh = mon
|
||
|
||
ov = tk.Toplevel()
|
||
ov.attributes("-topmost", True)
|
||
ov.attributes("-alpha", opacity)
|
||
ov.overrideredirect(True)
|
||
ov.update_idletasks()
|
||
|
||
# Click-through
|
||
hwnd = ctypes.windll.user32.GetAncestor(ov.winfo_id(), 2) or ov.winfo_id()
|
||
style = ctypes.windll.user32.GetWindowLongW(hwnd, -20)
|
||
ctypes.windll.user32.SetWindowLongW(hwnd, -20, style | 0x80000 | 0x20)
|
||
|
||
ov.geometry(f"{sw}x{sh}+{mx}+{my}")
|
||
|
||
try:
|
||
tk_img = _make_tk_image(pil_img, sw, sh)
|
||
lbl = tk.Label(ov, image=tk_img, bg="black")
|
||
lbl.image = tk_img
|
||
lbl.pack(fill="both", expand=True)
|
||
except Exception as e:
|
||
log(f"Overlay render error: {e}", "error")
|
||
ov.destroy()
|
||
continue
|
||
|
||
entry = {"window": ov, "remaining_ms": duration_ms,
|
||
"after_id": None, "started_at": time.monotonic()}
|
||
|
||
def on_dismiss(e=entry, w=ov):
|
||
with _overlays_lock:
|
||
if e in _active_overlays:
|
||
_active_overlays.remove(e)
|
||
if w.winfo_exists(): w.destroy()
|
||
|
||
def schedule_dismiss(e=entry, w=ov):
|
||
e["started_at"] = time.monotonic()
|
||
e["after_id"] = w.after(e["remaining_ms"], lambda: on_dismiss(e, w))
|
||
|
||
entry["schedule_dismiss"] = schedule_dismiss
|
||
with _overlays_lock:
|
||
_active_overlays.append(entry)
|
||
|
||
if is_paused:
|
||
ov.withdraw()
|
||
else:
|
||
schedule_dismiss()
|
||
|
||
root_instance.after(0, create_overlay)
|
||
|
||
|
||
def pause_overlays():
|
||
def _do():
|
||
with _overlays_lock:
|
||
overlays = list(_active_overlays)
|
||
for e in overlays:
|
||
ov = e["window"]
|
||
if not ov.winfo_exists(): continue
|
||
if e["after_id"] is not None:
|
||
try: ov.after_cancel(e["after_id"])
|
||
except Exception: pass
|
||
e["remaining_ms"] = max(0, e["remaining_ms"] -
|
||
int((time.monotonic() - e["started_at"]) * 1000))
|
||
e["after_id"] = None
|
||
ov.withdraw()
|
||
root_instance.after(0, _do)
|
||
|
||
|
||
def resume_overlays():
|
||
def _do():
|
||
with _overlays_lock:
|
||
overlays = list(_active_overlays)
|
||
for e in overlays:
|
||
ov = e["window"]
|
||
if not ov.winfo_exists():
|
||
with _overlays_lock:
|
||
if e in _active_overlays: _active_overlays.remove(e)
|
||
continue
|
||
if e["remaining_ms"] <= 0:
|
||
ov.destroy()
|
||
with _overlays_lock:
|
||
if e in _active_overlays: _active_overlays.remove(e)
|
||
continue
|
||
ov.deiconify()
|
||
e["schedule_dismiss"]()
|
||
root_instance.after(0, _do)
|
||
|
||
|
||
# ── Icon ──────────────────────────────────────────────────────────────────────
|
||
def make_default_icon(size=256):
|
||
img = Image.new("RGBA", (size, size), (0, 0, 0, 0))
|
||
d = ImageDraw.Draw(img)
|
||
s = size / 64
|
||
d.ellipse([8*s, 20*s, 56*s, 56*s], fill="#4f8ef7")
|
||
d.polygon([(32*s, 4*s), (16*s, 24*s), (48*s, 24*s)], fill="#4f8ef7")
|
||
d.rectangle([24*s, 56*s, 40*s, 62*s], fill="#4f8ef7")
|
||
return img
|
||
|
||
|
||
def load_tray_icon():
|
||
if ICON_FILE and ICON_FILE.exists():
|
||
try:
|
||
return Image.open(ICON_FILE).convert("RGBA")
|
||
except Exception:
|
||
pass
|
||
return make_default_icon()
|
||
|
||
|
||
# ── Toast ─────────────────────────────────────────────────────────────────────
|
||
def send_toast(message: str, title: str | None = None):
|
||
icon_path = ""
|
||
if ICON_FILE and ICON_FILE.exists():
|
||
icon_path = str(ICON_FILE)
|
||
elif APPDATA_DIR and (APPDATA_DIR / "icon.ico").exists():
|
||
icon_path = str(APPDATA_DIR / "icon.ico")
|
||
t = title or APP_NAME
|
||
notif = Notification(app_id=APP_ID, title=t, msg=message,
|
||
duration="short", icon=icon_path)
|
||
if get_setting("notify_sound", True):
|
||
notif.set_audio(audio.Default, loop=False)
|
||
notif.show()
|
||
log(f"Toast: {message}")
|
||
|
||
|
||
# ── Notification logic ────────────────────────────────────────────────────────
|
||
def pick_weighted(entries):
|
||
weighted = [e for e in entries if e.weight is not None]
|
||
if not weighted: return None
|
||
return random.choices(weighted, weights=[e.weight for e in weighted], k=1)[0]
|
||
|
||
|
||
def fire_entry(entry: Entry):
|
||
text = entry.text.strip()
|
||
with state_lock:
|
||
if paused:
|
||
notification_queue.append(text)
|
||
log(f"Queued (paused): {text}")
|
||
return
|
||
_dispatch(text)
|
||
|
||
|
||
def _dispatch(text: str):
|
||
tl = text.lower()
|
||
if tl == WALLPAPER_MAGIC:
|
||
enqueue_background(do_wallpaper_change)
|
||
elif is_mobile_wallpaper(tl):
|
||
enqueue_background(do_mobile_wallpaper)
|
||
elif is_overlay_entry(tl):
|
||
dur = parse_overlay_duration(tl)
|
||
enqueue_background(do_screen_overlay, dur)
|
||
else:
|
||
send_toast(text)
|
||
|
||
|
||
def flush_queued(queued: list):
|
||
for item in queued:
|
||
_dispatch(item)
|
||
time.sleep(1.2)
|
||
|
||
|
||
def notification_loop():
|
||
global _next_fire_at
|
||
while True:
|
||
with _entries_lock:
|
||
entries = list(_entries)
|
||
chosen = pick_weighted(entries)
|
||
if chosen:
|
||
fire_entry(chosen)
|
||
interval = random.randint(MIN_INTERVAL_SEC, MAX_INTERVAL_SEC)
|
||
_next_fire_at = time.time() + interval
|
||
log(f"Next in {interval//60}m {interval%60}s")
|
||
while True:
|
||
remaining = _next_fire_at - time.time()
|
||
if remaining <= 0:
|
||
break
|
||
if _notification_wake.is_set():
|
||
_notification_wake.clear()
|
||
break
|
||
wait_time = min(remaining, 5)
|
||
triggered = _notification_wake.wait(wait_time)
|
||
if triggered:
|
||
_notification_wake.clear()
|
||
break
|
||
|
||
|
||
def schedule_runner():
|
||
while True:
|
||
schedule.run_pending()
|
||
time.sleep(10)
|
||
|
||
|
||
# ── Pause / Resume ────────────────────────────────────────────────────────────
|
||
def toggle_pause():
|
||
global paused
|
||
queued = []
|
||
with state_lock:
|
||
paused = not paused
|
||
if not paused:
|
||
queued = notification_queue.copy()
|
||
notification_queue.clear()
|
||
if paused:
|
||
log("Paused")
|
||
pause_overlays()
|
||
if _default_wallpaper:
|
||
threading.Thread(target=set_wallpaper, args=(_default_wallpaper,), daemon=True).start()
|
||
else:
|
||
log("Resumed")
|
||
resume_overlays()
|
||
if _last_wallpaper:
|
||
threading.Thread(target=set_wallpaper, args=(_last_wallpaper,), daemon=True).start()
|
||
threading.Thread(target=flush_queued, args=(queued,), daemon=True).start()
|
||
update_tray_title()
|
||
|
||
|
||
# ── Tray ──────────────────────────────────────────────────────────────────────
|
||
def update_tray_title():
|
||
if tray_icon:
|
||
tray_icon.title = f"{APP_NAME} — {'PAUSED' if paused else 'Running'}"
|
||
tray_icon.update_menu()
|
||
|
||
|
||
def build_tray_menu():
|
||
def status_text(item):
|
||
hotkey = get_setting("hotkey", HOTKEY)
|
||
state = "Paused" if paused else "Running"
|
||
return f"{state} (Hotkey: {hotkey})"
|
||
|
||
return pystray.Menu(
|
||
pystray.MenuItem(status_text, lambda i, it: toggle_pause()),
|
||
pystray.MenuItem("Open Web UI", lambda i, it: webbrowser.open(f"http://localhost:{WEB_PORT}")),
|
||
pystray.Menu.SEPARATOR,
|
||
pystray.MenuItem("Quit", lambda i, it: (i.stop(), os._exit(0))),
|
||
)
|
||
|
||
|
||
def run_tray():
|
||
global tray_icon
|
||
tray_icon = pystray.Icon(APP_NAME, load_tray_icon(),
|
||
f"{APP_NAME} — Running", menu=build_tray_menu())
|
||
tray_icon.run()
|
||
|
||
|
||
# ── Flask API ─────────────────────────────────────────────────────────────────
|
||
flask_app = Flask(__name__, static_folder=None)
|
||
CORS(flask_app, resources={r"/api/*": {"origins": "*"}, r"/pwa*": {"origins": "*"}})
|
||
|
||
# Silence werkzeug
|
||
import logging as _logging
|
||
_logging.getLogger("werkzeug").setLevel(_logging.ERROR)
|
||
|
||
|
||
# ─ Helper: image to base64 data URI ──────────────────────────────────────────
|
||
def _img_to_datauri(path: str) -> str | None:
|
||
try:
|
||
p = Path(path)
|
||
mime = mimetypes.guess_type(p.name)[0] or "image/jpeg"
|
||
data = base64.b64encode(p.read_bytes()).decode()
|
||
return f"data:{mime};base64,{data}"
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
# ─ PWA heartbeat / client tracking ───────────────────────────────────────────
|
||
@flask_app.route("/api/pwa/ping", methods=["POST"])
|
||
def pwa_ping():
|
||
data = request.get_json(silent=True) or {}
|
||
cid = data.get("client_id", "")
|
||
if not cid:
|
||
return jsonify({"error": "no client_id"}), 400
|
||
ua = request.headers.get("User-Agent", "")
|
||
with _pwa_clients_lock:
|
||
_pwa_clients[cid] = {"last_seen": time.time(), "user_agent": ua}
|
||
# Expire old clients (> 60s)
|
||
cutoff = time.time() - 60
|
||
with _pwa_clients_lock:
|
||
expired = [k for k, v in _pwa_clients.items() if v["last_seen"] < cutoff]
|
||
for k in expired:
|
||
del _pwa_clients[k]
|
||
return jsonify({"ok": True, "clients": len(_pwa_clients)})
|
||
|
||
|
||
# ─ Mobile wallpaper poll ─────────────────────────────────────────────────────
|
||
@flask_app.route("/api/pwa/wallpaper", methods=["GET"])
|
||
def pwa_wallpaper_poll():
|
||
"""PWA polls this; returns base64 images if a wallpaper change was queued."""
|
||
cid = request.args.get("client_id", "")
|
||
if not cid:
|
||
return jsonify({"pending": False})
|
||
|
||
with _pending_lock:
|
||
# Check specific client first, then broadcast
|
||
cmd = _pending_wallpapers.pop(cid, None) or _pending_wallpapers.pop("__broadcast__", None)
|
||
|
||
if not cmd:
|
||
return jsonify({"pending": False})
|
||
|
||
result = {"pending": True}
|
||
if cmd.get("lockscreen"):
|
||
uri = _img_to_datauri(cmd["lockscreen"])
|
||
if uri:
|
||
result["lockscreen"] = uri
|
||
if cmd.get("background"):
|
||
uri = _img_to_datauri(cmd["background"])
|
||
if uri:
|
||
result["background"] = uri
|
||
|
||
log(f"Served mobile wallpaper to PWA client {cid[:8]}…")
|
||
return jsonify(result)
|
||
|
||
|
||
# ─ PWA splash image ────────────────────────────────────────────────────────────────
|
||
@flask_app.route("/api/pwa/splash_image")
|
||
def pwa_splash_image():
|
||
try:
|
||
base_dir = _mobile_base_dir()
|
||
if not base_dir.exists():
|
||
return jsonify({"image": ""})
|
||
images = _list_mobile_images(base_dir)
|
||
if not images:
|
||
return jsonify({"image": ""})
|
||
chosen = random.choice(images)
|
||
datauri = _img_to_datauri(str(chosen))
|
||
return jsonify({"image": datauri or ""})
|
||
except Exception as exc:
|
||
log(f"Splash image error: {exc}", "warn")
|
||
return jsonify({"image": ""})
|
||
|
||
|
||
# ─ Trigger mobile wallpaper from PWA UI ─────────────────────────────────────
|
||
@flask_app.route("/api/pwa/trigger_wallpaper", methods=["POST"])
|
||
def pwa_trigger_wallpaper():
|
||
enqueue_background(do_mobile_wallpaper)
|
||
return jsonify({"ok": True})
|
||
|
||
|
||
# ─ State API ─────────────────────────────────────────────────────────────────
|
||
@flask_app.route("/api/pwa/app_name")
|
||
def pwa_app_name():
|
||
"""Lightweight endpoint so the PWA can render the app name immediately."""
|
||
resp = jsonify({"app_name": APP_NAME})
|
||
resp.headers["Cache-Control"] = "no-store"
|
||
return resp
|
||
|
||
|
||
@flask_app.route("/api/state")
|
||
def api_state():
|
||
with _entries_lock:
|
||
entries_data = [{"text": e.text, "weight": e.weight, "trigger_time": e.trigger_time}
|
||
for e in _entries]
|
||
with _log_lock:
|
||
log_data = list(_log)
|
||
with _pwa_clients_lock:
|
||
pwa_count = len(_pwa_clients)
|
||
|
||
return jsonify({
|
||
"app_name": APP_NAME,
|
||
"version": VERSION,
|
||
"paused": paused,
|
||
"min_interval": MIN_INTERVAL_SEC // 60,
|
||
"max_interval": MAX_INTERVAL_SEC // 60,
|
||
"next_fire_at": _next_fire_at,
|
||
"entries": entries_data,
|
||
"log": log_data,
|
||
"overlay_stretch": _overlay_stretch,
|
||
"pwa_clients": pwa_count,
|
||
"settings": _settings,
|
||
"config_path": str(NOTIFICATIONS_FILE),
|
||
})
|
||
|
||
|
||
@flask_app.route("/api/pause", methods=["POST"])
|
||
def api_pause():
|
||
toggle_pause()
|
||
return jsonify({"paused": paused})
|
||
|
||
|
||
@flask_app.route("/api/fire_now", methods=["POST"])
|
||
def api_fire_now():
|
||
"""Force fire the next random notification immediately."""
|
||
with _entries_lock:
|
||
entries = list(_entries)
|
||
chosen = pick_weighted(entries)
|
||
if chosen:
|
||
threading.Thread(target=fire_entry, args=(chosen,), daemon=True).start()
|
||
return jsonify({"ok": True, "fired": chosen.text})
|
||
return jsonify({"ok": False, "error": "No entries"})
|
||
|
||
|
||
@flask_app.route("/api/test_notification", methods=["POST"])
|
||
def api_test_notif():
|
||
data = request.get_json(silent=True) or {}
|
||
msg = data.get("message", "Test notification from NotifyPulse!")
|
||
send_toast(msg)
|
||
return jsonify({"ok": True})
|
||
|
||
|
||
@flask_app.route("/api/test_wallpaper", methods=["POST"])
|
||
def api_test_wallpaper():
|
||
enqueue_background(do_wallpaper_change)
|
||
return jsonify({"ok": True})
|
||
|
||
|
||
@flask_app.route("/api/test_overlay", methods=["POST"])
|
||
def api_test_overlay():
|
||
enqueue_background(do_screen_overlay)
|
||
return jsonify({"ok": True})
|
||
|
||
|
||
@flask_app.route("/api/test_mobile_wallpaper", methods=["POST"])
|
||
def api_test_mobile_wallpaper():
|
||
enqueue_background(do_mobile_wallpaper)
|
||
return jsonify({"ok": True})
|
||
|
||
|
||
@flask_app.route("/api/settings", methods=["GET", "POST"])
|
||
def api_settings():
|
||
global MIN_INTERVAL_SEC, MAX_INTERVAL_SEC, APP_NAME, _overlay_stretch, _next_fire_at
|
||
if request.method == "GET":
|
||
return jsonify(_settings)
|
||
|
||
try:
|
||
data = request.get_json(silent=True) or {}
|
||
changed = False
|
||
|
||
if "name" in data:
|
||
name = data["name"].strip()
|
||
if name:
|
||
APP_NAME = name
|
||
_settings["name"] = name
|
||
changed = True
|
||
|
||
if "min_interval" in data or "max_interval" in data:
|
||
mn = max(1, int(data.get("min_interval", MIN_INTERVAL_SEC // 60)))
|
||
mx = max(mn, int(data.get("max_interval", MAX_INTERVAL_SEC // 60)))
|
||
MIN_INTERVAL_SEC = mn * 60
|
||
MAX_INTERVAL_SEC = mx * 60
|
||
_settings["min_interval"] = mn
|
||
_settings["max_interval"] = mx
|
||
interval = random.randint(MIN_INTERVAL_SEC, MAX_INTERVAL_SEC)
|
||
_next_fire_at = time.time() + interval
|
||
log(f"Interval → {mn}–{mx}min, timer reset")
|
||
_notification_wake.set()
|
||
changed = True
|
||
|
||
# Boolean/generic settings passthrough
|
||
for key in ("overlay_stretch", "overlay_opacity", "overlay_duration",
|
||
"startup_toast", "hotkey", "theme", "notify_sound",
|
||
"auto_open_browser", "pwa_token"):
|
||
if key in data:
|
||
val = data[key]
|
||
_settings[key] = val
|
||
if key == "overlay_stretch":
|
||
_overlay_stretch = bool(val)
|
||
changed = True
|
||
|
||
# Persist to notifications.txt if name/interval changed
|
||
if "name" in data or "min_interval" in data or "max_interval" in data:
|
||
_patch_notifications_file(
|
||
_settings.get("name", APP_NAME),
|
||
MIN_INTERVAL_SEC // 60,
|
||
MAX_INTERVAL_SEC // 60)
|
||
|
||
if changed:
|
||
save_settings()
|
||
update_tray_title()
|
||
|
||
return jsonify({"ok": True})
|
||
except Exception as ex:
|
||
return jsonify({"ok": False, "error": str(ex)}), 500
|
||
|
||
|
||
def _patch_notifications_file(name: str, min_m: int, max_m: int):
|
||
try:
|
||
lines = read_notifications_text().splitlines()
|
||
new_lines = []
|
||
found_name = found_interval = False
|
||
for line in lines:
|
||
s = line.strip()
|
||
if s.startswith("@name"):
|
||
new_lines.append(f"@name {name}"); found_name = True
|
||
elif s.startswith("@interval"):
|
||
new_lines.append(f"@interval {min_m} {max_m}"); found_interval = True
|
||
else:
|
||
new_lines.append(line)
|
||
if not found_name: new_lines.insert(0, f"@name {name}")
|
||
if not found_interval: new_lines.insert(1, f"@interval {min_m} {max_m}")
|
||
write_notifications_text("\n".join(new_lines) + "\n")
|
||
except Exception as e:
|
||
log(f"Patch config error: {e}", "error")
|
||
|
||
|
||
@flask_app.route("/api/entries", methods=["GET", "POST"])
|
||
def api_entries():
|
||
"""Read or replace the raw notifications.txt content."""
|
||
if request.method == "GET":
|
||
try:
|
||
return jsonify({"content": read_notifications_text()})
|
||
except Exception as e:
|
||
return jsonify({"error": str(e)}), 500
|
||
else:
|
||
data = request.get_json(silent=True) or {}
|
||
content = data.get("content", "")
|
||
try:
|
||
write_notifications_text(content)
|
||
return jsonify({"ok": True})
|
||
except Exception as e:
|
||
return jsonify({"ok": False, "error": str(e)}), 500
|
||
|
||
|
||
@flask_app.route("/api/log")
|
||
def api_log():
|
||
limit = min(int(request.args.get("limit", 100)), 200)
|
||
with _log_lock:
|
||
return jsonify(list(_log)[:limit])
|
||
|
||
|
||
# ─ PWA static files ──────────────────────────────────────────────────────────
|
||
PWA_DIR = Path(__file__).parent / "pwa"
|
||
|
||
@flask_app.route("/pwa")
|
||
@flask_app.route("/pwa/")
|
||
def pwa_index():
|
||
p = PWA_DIR / "index.html"
|
||
if p.exists(): return send_file(p)
|
||
return "PWA files not found. Place them in the pwa/ folder.", 404
|
||
|
||
@flask_app.route("/pwa/<path:filename>")
|
||
def pwa_static(filename):
|
||
p = PWA_DIR / filename
|
||
if p.exists() and p.is_file(): return send_file(p)
|
||
abort(404)
|
||
|
||
# Desktop web UI served at /
|
||
@flask_app.route("/")
|
||
def desktop_index():
|
||
p = Path(__file__).parent / "ui" / "index.html"
|
||
if p.exists(): return send_file(p)
|
||
return INLINE_HTML # fallback inline
|
||
|
||
|
||
# ─ Inline fallback HTML (minified at bottom) ─────────────────────────────────
|
||
INLINE_HTML = "" # filled after
|
||
|
||
|
||
def run_flask():
|
||
flask_app.run(host="0.0.0.0", port=WEB_PORT, debug=False,
|
||
use_reloader=False, threaded=True)
|
||
|
||
|
||
# ── Main ──────────────────────────────────────────────────────────────────────
|
||
def main():
|
||
global APP_NAME, APP_ID, APPDATA_DIR, NOTIFICATIONS_FILE, ICON_FILE
|
||
global WALLPAPERS_DIR, OVERLAY_DIR, MOBILE_DIR, SETTINGS_FILE
|
||
global _default_wallpaper, _last_wallpaper, _file_mtime, _next_fire_at, _config_monitor
|
||
global root_instance, _overlay_stretch
|
||
|
||
APPDATA_DIR = resolve_appdata_dir()
|
||
log(f"Using config dir: {APPDATA_DIR}")
|
||
NOTIFICATIONS_FILE = APPDATA_DIR / "notifications.txt"
|
||
ICON_FILE = APPDATA_DIR / "icon.png"
|
||
WALLPAPERS_DIR = APPDATA_DIR / "wallpapers"
|
||
OVERLAY_DIR = APPDATA_DIR / "Overlay"
|
||
MOBILE_DIR = APPDATA_DIR / "Mobile"
|
||
SETTINGS_FILE = APPDATA_DIR / "settings.json"
|
||
|
||
for d in (WALLPAPERS_DIR, OVERLAY_DIR, MOBILE_DIR):
|
||
d.mkdir(exist_ok=True)
|
||
|
||
load_settings()
|
||
_overlay_stretch = get_setting("overlay_stretch", False)
|
||
|
||
_default_wallpaper = get_current_wallpaper()
|
||
|
||
app_name, min_sec, max_sec, entries = load_config()
|
||
apply_config(app_name, min_sec, max_sec, entries, silent=True)
|
||
|
||
_next_fire_at = time.time() + random.randint(min_sec, max_sec)
|
||
_config_monitor = ConfigFileMonitor(NOTIFICATIONS_FILE, _reload_config_if_changed)
|
||
_config_monitor.start()
|
||
|
||
# Hotkey
|
||
try:
|
||
hotkey = get_setting("hotkey", "F13")
|
||
keyboard.add_hotkey(hotkey, toggle_pause, suppress=True)
|
||
except Exception as e:
|
||
log(f"Hotkey error: {e}", "warn")
|
||
|
||
root_instance = tk.Tk()
|
||
root_instance.withdraw()
|
||
|
||
threading.Thread(target=notification_loop, daemon=True).start()
|
||
threading.Thread(target=schedule_runner, daemon=True).start()
|
||
threading.Thread(target=run_flask, daemon=True).start()
|
||
|
||
log(f"NotifyPulse V{VERSION} started — {min_sec//60}–{max_sec//60}min")
|
||
|
||
if get_setting("startup_toast", True):
|
||
send_toast(f"V{VERSION} running! Web UI → localhost:{WEB_PORT}")
|
||
|
||
if get_setting("auto_open_browser", True):
|
||
threading.Timer(1.5, lambda: webbrowser.open(f"http://localhost:{WEB_PORT}")).start()
|
||
|
||
threading.Thread(target=run_tray, daemon=True).start()
|
||
root_instance.mainloop()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|