Files
Goddess/notifier.py
TutorialsGHG b011c7ad75 Initial commit
2026-04-12 21:57:21 +02:00

1318 lines
46 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# NotifyPulse V2 - Rewritten from the ground up
# Config: AppData/Roaming/NotifyPulse/
# Web UI: http://localhost:5000
# PWA: http://<your-lan-ip>:5000/pwa
from __future__ import annotations
from queue import Empty, Queue
import sys, os, ctypes, ctypes.wintypes, threading, time, random, re, json
import webbrowser, shutil, hashlib, base64, mimetypes
import subprocess
from datetime import datetime
from pathlib import Path
from collections import deque
# ── Dependency check ──────────────────────────────────────────────────────────
try:
    import keyboard
    from winotify import Notification, audio
    import pystray
    from PIL import Image, ImageDraw, ImageTk
    import schedule
    import tkinter as tk
    from flask import Flask, jsonify, request, send_file, abort, Response
    from flask_cors import CORS
except ImportError as missing:
    # A required package could not be imported: report it in a Tk message
    # box and terminate with exit status 1.
    import tkinter as tk
    from tkinter import messagebox
    details = f"Run: pip install -r requirements.txt\n\nMissing: {missing}"
    root = tk.Tk()
    root.withdraw()  # hide the empty root window; only the dialog is shown
    messagebox.showerror("Missing dependency", details)
    sys.exit(1)
# ── Constants ─────────────────────────────────────────────────────────────────
# Fallback hotkey label shown in the tray menu when no "hotkey" setting exists.
HOTKEY = "F13"
# Port used to build the "Open Web UI" URL in the tray menu.
WEB_PORT = 5000
# Application name used until notifications.txt provides an @name directive.
DEFAULT_APP_NAME = "NotifyPulse"
# Entry texts that trigger an action instead of a toast (see _dispatch).
WALLPAPER_MAGIC = "change.wallpaper"
OVERLAY_MAGIC = "show.overlay"
VERSION = "2.0.0"
# File suffixes collected by _list_mobile_images.
MOBILE_EXTS = (".jpg", ".jpeg", ".png", ".bmp", ".webp", ".heic", ".heif")
# Action code and flags passed to SystemParametersInfoW in set_wallpaper.
SPI_SETDESKWALLPAPER = 0x0014
SPIF_UPDATEINIFILE = 0x01
SPIF_SENDCHANGE = 0x02
# ── Paths (set in main) ───────────────────────────────────────────────────────
# All of these start as None; per the section header they are assigned in main
# (not visible in this part of the file), so readers must handle None.
APPDATA_DIR: Path | None = None
NOTIFICATIONS_FILE: Path | None = None
ICON_FILE: Path | None = None
WALLPAPERS_DIR: Path | None = None
OVERLAY_DIR: Path | None = None
MOBILE_DIR: Path | None = None # for PWA lockscreen/background images
SETTINGS_FILE: Path | None = None
# ── Global state ──────────────────────────────────────────────────────────────
# Pause flag and the texts queued while paused; both are accessed under state_lock.
paused: bool = False
notification_queue: list = []
state_lock = threading.Lock()
tray_icon = None
root_instance = None
# Values below are overwritten by apply_config from notifications.txt.
APP_NAME: str = DEFAULT_APP_NAME
APP_ID: str = "NotifyPulse"
MIN_INTERVAL_SEC: int = 10 * 60
MAX_INTERVAL_SEC: int = 30 * 60
# Parsed Entry objects; replaced as a whole under _entries_lock.
_entries: list = []
_entries_lock = threading.Lock()
# Modification stamp of notifications.txt recorded by the last apply_config.
_file_mtime: float = 0.0
# time.time() deadline of the next random notification (see notification_loop).
_next_fire_at: float = 0.0
# Dicts describing overlay windows currently alive (see do_screen_overlay).
_active_overlays: list = []
_overlays_lock = threading.Lock()
_default_wallpaper: str = ""
_last_wallpaper: str = ""
_overlay_stretch: bool = False
# Connected PWA clients: {client_id: {"last_seen": float, "user_agent": str}}
_pwa_clients: dict = {}
_pwa_clients_lock = threading.Lock()
# Pending mobile wallpaper commands: {client_id: {"lockscreen": path, "background": path}}
_pending_wallpapers: dict = {}
_pending_lock = threading.Lock()
# Log ring buffer
_log: deque = deque(maxlen=200)
_log_lock = threading.Lock()
# Settings cache (persisted to settings.json)
_settings: dict = {}
# Set to interrupt the wait inside notification_loop.
_notification_wake = threading.Event()
def _ps_quote_literal(path: Path) -> str:
return str(path).replace("'", "''")
def _ps_read_file_utf8(path: Path) -> str | None:
if os.name != "nt":
return None
quoted = _ps_quote_literal(path)
cmd = (
"$ErrorActionPreference='Stop';"
f"$p='{quoted}';"
"if (Test-Path -LiteralPath $p) {"
" $b=[IO.File]::ReadAllBytes($p);"
" [Console]::Out.Write('1:' + [Convert]::ToBase64String($b));"
"} else {"
" [Console]::Out.Write('0:');"
"}"
)
try:
out = subprocess.check_output(
["powershell", "-NoProfile", "-Command", cmd],
encoding="ascii",
errors="ignore",
).strip()
if not out.startswith("1:"):
return None
payload = out[2:]
if payload == "":
return ""
return base64.b64decode(payload).decode("utf-8")
except Exception:
return None
def _ps_write_file_utf8(path: Path, content: str) -> bool:
if os.name != "nt":
return False
quoted = _ps_quote_literal(path)
payload = base64.b64encode(content.encode("utf-8")).decode("ascii")
cmd = (
"$ErrorActionPreference='Stop';"
f"$p='{quoted}';"
"$d=[IO.Path]::GetDirectoryName($p);"
"if ($d) { [IO.Directory]::CreateDirectory($d) | Out-Null };"
f"[IO.File]::WriteAllBytes($p,[Convert]::FromBase64String('{payload}'));"
)
try:
subprocess.check_call(
["powershell", "-NoProfile", "-Command", cmd],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return True
except Exception:
return False
def _real_appdata_dir() -> Path:
return Path(os.environ.get("USERPROFILE", str(Path.home()))) / "AppData" / "Roaming" / "NotifyPulse"
def notifications_file_exists() -> bool:
    """Tell whether the notifications file exists.

    On Windows the check is made with PowerShell ``Test-Path``; if that
    command fails, or on other systems, ``Path.exists`` is used instead.
    """
    target = NOTIFICATIONS_FILE or (_real_appdata_dir() / "notifications.txt")
    if os.name == "nt":
        script = (
            "$p='" + _ps_quote_literal(target)
            + "'; if (Test-Path -LiteralPath $p) { '1' } else { '0' }"
        )
        try:
            answer = subprocess.check_output(
                ["powershell", "-NoProfile", "-Command", script],
                encoding="ascii",
                errors="ignore",
            )
            return answer.strip() == "1"
        except Exception:
            pass  # fall through to the plain filesystem check
    return target.exists()
def notifications_file_mtime() -> float:
    """Return a modification stamp for the notifications file.

    On Windows the value printed by PowerShell
    (``LastWriteTimeUtc.ToFileTimeUtc()``) is returned as a float. When that
    yields nothing or fails, ``stat().st_mtime`` is returned, and ``0.0`` if
    even that fails.

    NOTE(review): the two sources use different scales, so the result is
    only meaningful for equality comparison against an earlier call.
    """
    target = NOTIFICATIONS_FILE or (_real_appdata_dir() / "notifications.txt")
    if os.name == "nt":
        script = "".join((
            "$ErrorActionPreference='Stop';",
            "$p='" + _ps_quote_literal(target) + "';",
            "if (Test-Path -LiteralPath $p) {",
            " [Console]::Out.Write((Get-Item -LiteralPath $p).LastWriteTimeUtc.ToFileTimeUtc());",
            "}",
        ))
        try:
            stamp = subprocess.check_output(
                ["powershell", "-NoProfile", "-Command", script],
                encoding="ascii",
                errors="ignore",
            ).strip()
            if stamp:
                return float(stamp)
        except Exception:
            pass  # fall through to stat()
    try:
        return target.stat().st_mtime
    except Exception:
        return 0.0
def read_notifications_text() -> str:
    """Return the text of the notifications file.

    The PowerShell reader is tried first; when it returns ``None`` the file
    is read directly (which may raise ``OSError``/``UnicodeDecodeError``).

    A leading UTF-8 byte-order mark is removed. Plain "utf-8" decoding keeps
    it as U+FEFF, and ``str.strip()`` does not treat that as whitespace, so
    a first line such as ``# comment`` or ``@name`` would otherwise be
    parsed as a notification entry.
    """
    p = NOTIFICATIONS_FILE if NOTIFICATIONS_FILE else (_real_appdata_dir() / "notifications.txt")
    text = _ps_read_file_utf8(p)
    if text is None:
        text = p.read_text("utf-8")
    if text.startswith("\ufeff"):
        text = text[1:]
    return text
def write_notifications_text(content: str):
    """Replace the notifications file with *content*.

    The PowerShell writer is tried first; if it reports failure the file is
    written directly as UTF-8.
    """
    target = NOTIFICATIONS_FILE or (_real_appdata_dir() / "notifications.txt")
    if not _ps_write_file_utf8(target, content):
        target.write_text(content, "utf-8")
class BackgroundWorker:
    """Runs queued callables one at a time on a lazily started daemon thread.

    The thread exits after the queue has stayed empty for one 5-second
    ``get`` timeout and is started again by the next ``enqueue``.
    """

    def __init__(self):
        self.queue = Queue()
        self.thread: threading.Thread | None = None
        self.lock = threading.Lock()

    def enqueue(self, fn, *args, **kwargs):
        """Queue ``fn(*args, **kwargs)`` and make sure a worker is running."""
        self.queue.put((fn, args, kwargs))
        self._ensure_thread()

    def _ensure_thread(self):
        """Start the worker thread unless one is already alive."""
        with self.lock:
            alive = self.thread is not None and self.thread.is_alive()
            if not alive:
                self.thread = threading.Thread(target=self._run, daemon=True)
                self.thread.start()

    def _run(self):
        """Process tasks until the queue stays empty, logging task errors."""
        while True:
            try:
                task = self.queue.get(timeout=5)
            except Empty:
                # Re-check under the lock so a task queued concurrently with
                # the timeout is not left without a worker.
                with self.lock:
                    if self.queue.empty():
                        self.thread = None
                        return
                continue
            fn, args, kwargs = task
            try:
                fn(*args, **kwargs)
            except Exception as exc:
                log(f"Background task error: {exc}", "error")
            finally:
                self.queue.task_done()
# Single shared worker used for all background tasks.
_background_worker = BackgroundWorker()


def enqueue_background(fn, *args, **kwargs):
    """Schedule ``fn(*args, **kwargs)`` on the shared background worker."""
    worker = _background_worker
    worker.enqueue(fn, *args, **kwargs)
# ── Logging ───────────────────────────────────────────────────────────────────
def log(msg: str, level: str = "info"):
    """Prepend a ``{"time", "msg", "level"}`` record to the log ring buffer.

    ``time`` is the current local time formatted as HH:MM:SS.
    """
    stamp = datetime.now().strftime("%H:%M:%S")
    record = dict(time=stamp, msg=msg, level=level)
    with _log_lock:
        _log.appendleft(record)
# ── Settings persistence ──────────────────────────────────────────────────────
# Defaults merged under the values loaded from settings.json (load_settings)
# and used as the fallback by get_setting.
DEFAULT_SETTINGS = {
"overlay_stretch": False,
"overlay_opacity": 0.4,
"overlay_duration": 6,
"startup_toast": True,
"hotkey": "F13",
"web_port": 5000,
"theme": "dark",
"notify_sound": True,
"mobile_wallpaper_dir": "", # relative sub-folder inside MOBILE_DIR
"pwa_token": "", # simple shared secret for PWA auth (empty = no auth)
"auto_open_browser": True,
"log_maxlen": 200,
}
def load_settings() -> dict:
    """Load settings.json over the defaults into the module cache.

    When the file is absent, unreadable or not valid, a copy of
    ``DEFAULT_SETTINGS`` is used. Returns the cache dict.
    """
    global _settings
    merged = None
    if SETTINGS_FILE and SETTINGS_FILE.exists():
        try:
            stored = json.loads(SETTINGS_FILE.read_text("utf-8"))
            merged = {**DEFAULT_SETTINGS, **stored}
        except Exception:
            merged = None
    _settings = dict(DEFAULT_SETTINGS) if merged is None else merged
    return _settings
def save_settings():
    """Write the settings cache to settings.json; failures are only logged."""
    if not SETTINGS_FILE:
        return
    try:
        serialized = json.dumps(_settings, indent=2)
        SETTINGS_FILE.write_text(serialized, "utf-8")
    except Exception as e:
        log(f"Settings save error: {e}", "error")
def get_setting(key: str, default=None):
    """Look up *key* in the settings cache, then the defaults, then *default*."""
    if key in _settings:
        return _settings[key]
    return DEFAULT_SETTINGS.get(key, default)
# ── AppData setup ─────────────────────────────────────────────────────────────
def resolve_appdata_dir() -> Path:
    """
    Return ``%USERPROFILE%/AppData/Roaming/NotifyPulse``, creating it if needed.

    Writability is verified with a throw-away ``.probe`` file. On every call,
    a ``notifications.txt`` found under the ``APPDATA`` location is copied
    over when the target copy is missing or older. If the folder cannot be
    created or written, an error dialog is shown and the process exits.
    """
    roaming_env = Path(os.environ.get("APPDATA", "") or Path.home() / "AppData" / "Roaming")
    roaming_real = Path(os.environ.get("USERPROFILE", str(Path.home()))) / "AppData" / "Roaming"
    virtual_dir = roaming_env / "NotifyPulse"
    target = roaming_real / "NotifyPulse"
    try:
        target.mkdir(parents=True, exist_ok=True)
        marker = target / ".probe"
        marker.write_text("ok", "utf-8")
        marker.unlink(missing_ok=True) # type: ignore[arg-type]
    except Exception as exc:
        root = tk.Tk(); root.withdraw()
        from tkinter import messagebox
        messagebox.showerror("NotifyPulse V2", f"Cannot create config folder:\n{target}\n\n{exc}")
        sys.exit(1)
    # Best-effort import of a newer copy from the APPDATA location.
    try:
        incoming = virtual_dir / "notifications.txt"
        existing = target / "notifications.txt"
        if incoming.exists():
            if (not existing.exists()) or incoming.stat().st_mtime > existing.stat().st_mtime:
                existing.write_bytes(incoming.read_bytes())
    except Exception:
        pass
    return target
# ── Config file ───────────────────────────────────────────────────────────────
class Entry:
    """One parsed notification line.

    ``weight`` is set for randomly picked entries and ``trigger_time``
    ("HH:MM") for daily scheduled ones; ``load_config`` sets the other
    field to ``None``.
    """

    __slots__ = ("text", "weight", "trigger_time")

    def __init__(self, text, weight, trigger_time):
        self.text, self.weight, self.trigger_time = text, weight, trigger_time
def create_default_config():
    """Write the starter notifications.txt (syntax help plus sample entries)."""
    template = (
        "# NotifyPulse V2 - notifications.txt\n"
        "# Reloads automatically on save!\n"
        "#\n"
        "# @name Your App Name\n"
        "# @interval 10 30 <- min/max minutes between random notifications\n"
        "#\n"
        "# Text message | 30% <- random, picked 30% of the time\n"
        "# Text message | 14:30 <- fires daily at 14:30\n"
        "# change.wallpaper | 20% <- changes PC wallpaper randomly\n"
        "# change.wallpaper.mobile <- triggers mobile wallpaper via PWA\n"
        "# show.overlay | 10% <- fullscreen image overlay (6s default)\n"
        "# show.overlay.10 | 10% <- overlay for 10 seconds\n"
        "\n"
        "@name NotifyPulse\n"
        "@interval 10 30\n"
        "\n"
        "Take a short break and stretch! | 35%\n"
        "Drink some water - stay hydrated! | 30%\n"
        "Check your posture! | 20%\n"
        "Deep breath - you got this! | 15%\n"
        "\n"
        "# change.wallpaper | 20%\n"
        "# show.overlay | 15%\n"
        "\n"
        "Morning standup time! | 09:00\n"
        "Afternoon focus block | 14:00\n"
        "End of day - wrap up! | 17:30\n"
    )
    write_notifications_text(template)
def parse_overlay_duration(text: str) -> int:
    """Return the overlay duration in milliseconds for an entry text.

    The third dot-separated segment (``show.overlay.10``) is read as whole
    seconds, with a minimum of one second. Without a usable number the
    ``overlay_duration`` setting (default 6 seconds) applies.
    """
    segments = text.strip().lower().split(".")
    if len(segments) > 2:
        try:
            seconds = int(segments[2])
        except ValueError:
            seconds = None
        if seconds is not None:
            return max(1, seconds) * 1000
    return int(get_setting("overlay_duration", 6)) * 1000
def is_overlay_entry(text: str) -> bool:
    """True for ``show.overlay`` itself or ``show.overlay.<anything>``.

    Matching ignores case and surrounding whitespace.
    """
    normalized = text.strip().lower()
    if not normalized.startswith(OVERLAY_MAGIC):
        return False
    # The character right after the keyword must be absent or a dot.
    follower = normalized[len(OVERLAY_MAGIC):len(OVERLAY_MAGIC) + 1]
    return follower in ("", ".")
def is_mobile_wallpaper(text: str) -> bool:
    """True when *text* is ``change.wallpaper.mobile`` (case/space-insensitive)."""
    normalized = text.strip().lower()
    return normalized == "change.wallpaper.mobile"
def load_config():
    """Parse notifications.txt and return ``(app_name, min_sec, max_sec, entries)``.

    The default file is created first when none exists. Blank lines and
    ``#`` comments are skipped. ``@name`` / ``@interval MIN MAX`` directives
    set the app name and the interval bounds (minutes, converted to
    seconds). Every other line is ``text`` optionally followed by
    ``| HH:MM`` (daily scheduled entry) or ``| N%`` (weighted random
    entry); anything else becomes a random entry with weight 1.0.

    Scheduled times outside 00:00-23:59 are skipped with a warning, because
    the scheduler rejects them when the config is applied. Weights that are
    negative or not finite fall back to 1.0, because ``random.choices``
    cannot work with them.
    """
    log(f"Loading config from {NOTIFICATIONS_FILE}")
    if not notifications_file_exists():
        create_default_config()
    app_name = DEFAULT_APP_NAME
    min_sec = 10 * 60
    max_sec = 30 * 60
    entries = []
    for raw in read_notifications_text().splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        if line.startswith("@"):
            parts = line[1:].split()
            key = parts[0].lower() if parts else ""
            if key == "name" and len(parts) >= 2:
                app_name = " ".join(parts[1:])
            elif key == "interval" and len(parts) == 3:
                try:
                    mn = max(1, int(parts[1]))
                    mx = max(mn, int(parts[2]))
                    min_sec = mn * 60
                    max_sec = mx * 60
                except ValueError:
                    pass  # keep the previous interval on malformed numbers
            continue
        if "|" in line:
            text, tag = [p.strip() for p in line.split("|", 1)]
        else:
            text, tag = line, None
        if tag and re.match(r"^\d{1,2}:\d{2}$", tag):
            h, m = tag.split(":")
            if int(h) > 23 or int(m) > 59:
                log(f"Ignoring entry with invalid time '{tag}': {text}", "warn")
                continue
            entries.append(Entry(text, None, f"{int(h):02d}:{m}"))
        elif tag and tag.endswith("%"):
            try:
                w = float(tag[:-1])
            except ValueError:
                w = 1.0
            # Rejects negatives, inf and NaN (every comparison with NaN is False).
            if not 0 <= w < float("inf"):
                log(f"Invalid weight '{tag}', using 1: {text}", "warn")
                w = 1.0
            entries.append(Entry(text, w, None))
        else:
            entries.append(Entry(text, 1.0, None))
    return app_name, min_sec, max_sec, entries
def apply_config(app_name, min_sec, max_sec, entries, silent=False):
    """Install a parsed configuration as the live state.

    Updates the app name/id and interval bounds, replaces the entry list,
    rebuilds all daily scheduled jobs, records the file's modification
    stamp, refreshes the tray title and wakes the notification loop.

    Args:
        app_name: Display name; the app id is this name reduced to
            ASCII letters and digits ("NotifyPulse" if nothing remains).
        min_sec, max_sec: Bounds for the random interval, in seconds.
        entries: ``Entry`` objects; those with a ``trigger_time`` are
            scheduled daily.
        silent: When true, the summary log line is not written.
    """
    global APP_NAME, APP_ID, MIN_INTERVAL_SEC, MAX_INTERVAL_SEC, _entries, _file_mtime
    APP_NAME = app_name
    APP_ID = re.sub(r"[^A-Za-z0-9]", "", app_name) or "NotifyPulse"
    MIN_INTERVAL_SEC = min_sec
    MAX_INTERVAL_SEC = max_sec
    with _entries_lock:
        _entries = entries
    schedule.clear()
    for entry in (e for e in entries if e.trigger_time):
        try:
            schedule.every().day.at(entry.trigger_time).do(fire_entry, entry)
        except Exception as exc:
            # One unschedulable time must not abort applying the rest of
            # the configuration.
            log(f"Cannot schedule '{entry.text}' at {entry.trigger_time}: {exc}", "error")
    _file_mtime = notifications_file_mtime()
    update_tray_title()
    _notification_wake.set()
    if not silent:
        log(f"Config reloaded — {min_sec//60}-{max_sec//60}min, {len(entries)} entries")
# ── File watcher ──────────────────────────────────────────────────────────────
class ConfigFileMonitor:
    """Polls the notifications file for changes using one-shot timers.

    The polling delay starts at ``min_interval`` seconds, doubles after
    every poll that sees no change (capped at ``max_interval``) and drops
    back to ``min_interval`` when a change is seen.
    """

    def __init__(self, path: Path, on_change, min_interval=1.0, max_interval=4.0):
        self.path = path
        self.on_change = on_change
        self.min_interval = min_interval
        self.max_interval = max_interval
        self._interval = min_interval
        self._lock = threading.Lock()
        self._timer: threading.Timer | None = None
        self._active = False

    def start(self):
        """Begin polling; does nothing when already active."""
        with self._lock:
            if self._active:
                return
            self._active = True
            self._interval = self.min_interval
            self._schedule(self._interval)

    def stop(self):
        """Stop polling and cancel the pending timer, if any."""
        with self._lock:
            self._active = False
            if self._timer:
                self._timer.cancel()
                self._timer = None

    def _schedule(self, delay):
        """Arm a daemon timer that calls ``_run`` after *delay* seconds."""
        timer = threading.Timer(delay, self._run)
        timer.daemon = True
        self._timer = timer
        timer.start()

    def _run(self):
        """Compare the file stamp with the applied one and re-arm the timer."""
        current = notifications_file_mtime()
        changed = current is not None and current != _file_mtime
        if changed:
            self._interval = self.min_interval
            try:
                self.on_change()
            except Exception as exc:
                # f-string: the original logged the literal text "{exc}".
                log(f"Config watcher error: {exc}", "error")
        else:
            self._interval = min(self.max_interval, self._interval * 2)
        with self._lock:
            if self._active:
                self._schedule(self._interval)
def _reload_config_if_changed():
    """Reload and apply the config when the file stamp differs, then toast."""
    if notifications_file_mtime() == _file_mtime:
        return
    apply_config(*load_config())
    send_toast("Config reloaded!")


_config_monitor: ConfigFileMonitor | None = None
# ── Wallpaper ─────────────────────────────────────────────────────────────────
def get_current_wallpaper() -> str:
    """Return the ``WallPaper`` value under HKCU ``Control Panel\\Desktop``.

    Returns an empty string when the registry cannot be read (including on
    systems without ``winreg``). The key is opened in a ``with`` block so
    the handle is closed even if reading the value fails.
    """
    try:
        import winreg
        with winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Control Panel\Desktop") as key:
            val, _ = winreg.QueryValueEx(key, "WallPaper")
        return val
    except Exception:
        return ""
def set_wallpaper(path: str):
    """Set the desktop wallpaper to *path* via ``SystemParametersInfoW``."""
    flags = SPIF_UPDATEINIFILE | SPIF_SENDCHANGE
    user32 = ctypes.windll.user32
    user32.SystemParametersInfoW(SPI_SETDESKWALLPAPER, 0, path, flags)
def do_wallpaper_change():
    """Apply a random image from the wallpapers folder as desktop wallpaper.

    The chosen path is remembered in ``_last_wallpaper``. A missing folder
    or an empty one only produces a warning in the log.
    """
    global _last_wallpaper
    if not WALLPAPERS_DIR or not WALLPAPERS_DIR.exists():
        log("Wallpaper folder missing", "warn")
        return
    candidates = []
    for pattern in ("*.jpg", "*.jpeg", "*.png", "*.bmp", "*.webp"):
        candidates.extend(WALLPAPERS_DIR.glob(pattern))
    if not candidates:
        log("No wallpaper images found", "warn")
        return
    picked = str(random.choice(candidates))
    _last_wallpaper = picked
    set_wallpaper(picked)
    log(f"Wallpaper → {Path(picked).name}")
# ── Mobile wallpaper (PWA push) ───────────────────────────────────────────────
def _mobile_base_dir() -> Path:
    """Return the folder mobile images are taken from.

    This is ``MOBILE_DIR`` unless the ``mobile_wallpaper_dir`` setting names
    an existing sub-folder; a configured but missing sub-folder is logged
    and ``MOBILE_DIR`` is used.
    """
    subfolder = str(get_setting("mobile_wallpaper_dir", "")).strip()
    if not subfolder:
        return MOBILE_DIR
    candidate = (MOBILE_DIR / subfolder).resolve()
    if candidate.exists():
        return candidate
    log(f"Mobile subfolder missing: {candidate}", "warn")
    return MOBILE_DIR
def _list_mobile_images(folder: Path) -> list[Path]:
    """List the files directly inside *folder* matching ``MOBILE_EXTS``."""
    matches = (hit for ext in MOBILE_EXTS for hit in folder.glob(f"*{ext}"))
    return [hit for hit in matches if hit.is_file()]
def _copy_for_send(src: Path, prefix: str, cache_dir: Path) -> Path:
cache_dir.mkdir(exist_ok=True)
for old in cache_dir.glob(f"{prefix}_*"):
try:
old.unlink()
except Exception:
pass
ext = src.suffix if src.suffix else ".jpg"
rand = random.randint(1000, 9999)
dest = cache_dir / f"{prefix}_{rand}{ext.lower()}"
shutil.copyfile(src, dest)
return dest
def do_mobile_wallpaper():
    """Queue a wallpaper push for all connected PWA clients.

    Images named ``lockscreen`` / ``background`` (any supported suffix) are
    preferred; a missing role is filled from two randomly sampled images
    (or the same image twice when only one exists). The chosen files are
    copied into ``MOBILE_DIR/_send_cache`` and their paths stored per
    client, or under ``"__broadcast__"`` when no client is connected.
    """
    if not MOBILE_DIR or not MOBILE_DIR.exists():
        log("Mobile wallpaper dir missing", "warn")
        return
    base_dir = _mobile_base_dir()
    if not base_dir.exists():
        log("Mobile wallpaper dir missing", "warn")
        return
    images = _list_mobile_images(base_dir)
    # Keep support for explicitly named pairs, otherwise pick two random files.
    ls_candidates = [p for p in images if p.stem.lower() == "lockscreen"]
    bg_candidates = [p for p in images if p.stem.lower() == "background"]
    if not ls_candidates or not bg_candidates:
        if len(images) >= 2:
            chosen = random.sample(images, 2)
        elif images:
            chosen = [images[0], images[0]]
        else:
            log("No images in Mobile folder to send", "warn")
            return
        if not ls_candidates:
            ls_candidates = [chosen[0]]
        if not bg_candidates:
            bg_candidates = [chosen[1]]
    cache_dir = MOBILE_DIR / "_send_cache"
    ls_path = _copy_for_send(random.choice(ls_candidates), "Lockscreen", cache_dir)
    bg_path = _copy_for_send(random.choice(bg_candidates), "Background", cache_dir)
    with _pwa_clients_lock:
        client_ids = list(_pwa_clients.keys())
    if not client_ids:
        log("No PWA clients connected — wallpaper queued for next connection", "info")
    with _pending_lock:
        for cid in client_ids if client_ids else ["__broadcast__"]:
            _pending_wallpapers[cid] = {
                "lockscreen": str(ls_path),
                "background": str(bg_path),
            }
    # A separator between folder and count keeps the message readable.
    log(f"Mobile wallpaper queued from {base_dir} → {len(client_ids)} client(s)")
# ── Screen Overlay ─────────────────────────────────────────────────────────────
def _get_monitors():
monitors = []
try:
MonitorEnumProc = ctypes.WINFUNCTYPE(
ctypes.c_bool, ctypes.c_ulong, ctypes.c_ulong,
ctypes.POINTER(ctypes.wintypes.RECT), ctypes.c_double)
def _cb(hMon, hdcMon, lprcMon, dwData):
r = lprcMon.contents
monitors.append((r.left, r.top, r.right - r.left, r.bottom - r.top))
return True
ctypes.windll.user32.EnumDisplayMonitors(None, None, MonitorEnumProc(_cb), 0)
except Exception:
pass
return monitors or [None]
def do_screen_overlay(duration_ms: int = 6000):
    """Show a random overlay image on every monitor for *duration_ms*.

    One borderless, topmost, semi-transparent, click-through window is
    created per monitor on the Tk thread (via ``root_instance.after``).
    Each window is tracked in ``_active_overlays`` so pausing can hide it
    and resuming can continue its countdown. While paused, new overlays are
    created hidden and their countdown is not started.
    """
    if not OVERLAY_DIR or not OVERLAY_DIR.exists():
        log("Overlay folder missing", "warn")
        return
    files = [f for ext in ("*.jpg","*.jpeg","*.png","*.bmp","*.webp")
             for f in OVERLAY_DIR.glob(ext)]
    if not files:
        log("No overlay images found", "warn")
        return
    img_path = str(random.choice(files))
    monitors = _get_monitors()
    opacity = float(get_setting("overlay_opacity", 0.4))
    # A separator between file name and duration keeps the message readable.
    log(f"Overlay: {Path(img_path).name} — {duration_ms//1000}s on {len(monitors)} screen(s)")

    def _make_tk_image(pil_img, sw, sh):
        # Stretch mode fills the screen; otherwise fit inside it, centred
        # on a black canvas.
        if _overlay_stretch:
            return ImageTk.PhotoImage(pil_img.resize((sw, sh), Image.Resampling.LANCZOS))
        img_w, img_h = pil_img.size
        scale = min(sw / img_w, sh / img_h)
        nw, nh = int(img_w * scale), int(img_h * scale)
        resized = pil_img.resize((nw, nh), Image.Resampling.LANCZOS)
        canvas = Image.new("RGB", (sw, sh), "black")
        canvas.paste(resized, ((sw - nw) // 2, (sh - nh) // 2))
        return ImageTk.PhotoImage(canvas)

    def create_overlay():
        try:
            pil_img = Image.open(img_path)
        except Exception as e:
            log(f"Overlay open error: {e}", "error")
            return
        with state_lock:
            is_paused = paused
        for mon in monitors:
            if mon is None:
                # No monitor list: size the overlay from Tk's screen size.
                tmp = tk.Toplevel(); tmp.withdraw(); tmp.update_idletasks()
                sw, sh, mx, my = tmp.winfo_screenwidth(), tmp.winfo_screenheight(), 0, 0
                tmp.destroy()
            else:
                mx, my, sw, sh = mon
            ov = tk.Toplevel()
            ov.attributes("-topmost", True)
            ov.attributes("-alpha", opacity)
            ov.overrideredirect(True)
            ov.update_idletasks()
            # Click-through
            hwnd = ctypes.windll.user32.GetAncestor(ov.winfo_id(), 2) or ov.winfo_id()
            style = ctypes.windll.user32.GetWindowLongW(hwnd, -20)
            ctypes.windll.user32.SetWindowLongW(hwnd, -20, style | 0x80000 | 0x20)
            ov.geometry(f"{sw}x{sh}+{mx}+{my}")
            try:
                tk_img = _make_tk_image(pil_img, sw, sh)
                lbl = tk.Label(ov, image=tk_img, bg="black")
                lbl.image = tk_img  # keep a reference so Tk does not drop the image
                lbl.pack(fill="both", expand=True)
            except Exception as e:
                log(f"Overlay render error: {e}", "error")
                ov.destroy()
                continue
            entry = {"window": ov, "remaining_ms": duration_ms,
                     "after_id": None, "started_at": time.monotonic()}

            # Default arguments bind this iteration's entry/window.
            def on_dismiss(e=entry, w=ov):
                with _overlays_lock:
                    if e in _active_overlays:
                        _active_overlays.remove(e)
                if w.winfo_exists(): w.destroy()

            def schedule_dismiss(e=entry, w=ov):
                e["started_at"] = time.monotonic()
                e["after_id"] = w.after(e["remaining_ms"], lambda: on_dismiss(e, w))

            entry["schedule_dismiss"] = schedule_dismiss
            with _overlays_lock:
                _active_overlays.append(entry)
            if is_paused:
                ov.withdraw()
            else:
                schedule_dismiss()

    root_instance.after(0, create_overlay)
def pause_overlays():
    """Hide every live overlay and freeze its countdown (runs on the Tk thread)."""
    def _freeze_all():
        with _overlays_lock:
            snapshot = list(_active_overlays)
        for item in snapshot:
            window = item["window"]
            if not window.winfo_exists():
                continue
            pending = item["after_id"]
            if pending is not None:
                try:
                    window.after_cancel(pending)
                except Exception:
                    pass
                # Subtract the time already shown from the remaining budget.
                elapsed_ms = int((time.monotonic() - item["started_at"]) * 1000)
                item["remaining_ms"] = max(0, item["remaining_ms"] - elapsed_ms)
                item["after_id"] = None
            window.withdraw()
    root_instance.after(0, _freeze_all)
def resume_overlays():
    """Re-show paused overlays and restart their countdown (on the Tk thread).

    Overlays whose window is gone are dropped from the list; overlays with
    no time left are destroyed and dropped.
    """
    def _forget(item):
        with _overlays_lock:
            if item in _active_overlays:
                _active_overlays.remove(item)

    def _thaw_all():
        with _overlays_lock:
            snapshot = list(_active_overlays)
        for item in snapshot:
            window = item["window"]
            if not window.winfo_exists():
                _forget(item)
                continue
            if item["remaining_ms"] <= 0:
                window.destroy()
                _forget(item)
                continue
            window.deiconify()
            item["schedule_dismiss"]()

    root_instance.after(0, _thaw_all)
# ── Icon ──────────────────────────────────────────────────────────────────────
def make_default_icon(size=256):
    """Draw the built-in icon: blue ellipse, triangle and bar on transparency.

    Coordinates are laid out on a 64-unit grid and scaled to *size* pixels.
    Returns an RGBA ``PIL.Image``.
    """
    colour = "#4f8ef7"
    unit = size / 64
    icon = Image.new("RGBA", (size, size), (0, 0, 0, 0))
    pen = ImageDraw.Draw(icon)
    pen.ellipse([8*unit, 20*unit, 56*unit, 56*unit], fill=colour)
    pen.polygon([(32*unit, 4*unit), (16*unit, 24*unit), (48*unit, 24*unit)], fill=colour)
    pen.rectangle([24*unit, 56*unit, 40*unit, 62*unit], fill=colour)
    return icon
def load_tray_icon():
    """Return the user's icon file as RGBA, or the built-in icon on any failure."""
    custom = None
    if ICON_FILE and ICON_FILE.exists():
        try:
            custom = Image.open(ICON_FILE).convert("RGBA")
        except Exception:
            custom = None
    if custom is not None:
        return custom
    return make_default_icon()
# ── Toast ─────────────────────────────────────────────────────────────────────
def send_toast(message: str, title: str | None = None):
    """Show a short toast with *message* and log it.

    The title defaults to the app name. The icon is ``ICON_FILE`` when it
    exists, else ``icon.ico`` in the app-data folder, else none. The
    default sound is attached when the ``notify_sound`` setting is on.
    """
    if ICON_FILE and ICON_FILE.exists():
        icon_path = str(ICON_FILE)
    elif APPDATA_DIR and (APPDATA_DIR / "icon.ico").exists():
        icon_path = str(APPDATA_DIR / "icon.ico")
    else:
        icon_path = ""
    heading = title or APP_NAME
    toast = Notification(app_id=APP_ID, title=heading, msg=message,
                         duration="short", icon=icon_path)
    if get_setting("notify_sound", True):
        toast.set_audio(audio.Default, loop=False)
    toast.show()
    log(f"Toast: {message}")
# ── Notification logic ────────────────────────────────────────────────────────
def pick_weighted(entries):
    """Pick one entry at random, proportionally to its ``weight``.

    Entries without a weight (scheduled ones) and entries whose weight is
    not a positive finite number are never picked. Returns ``None`` when
    nothing is eligible, instead of letting ``random.choices`` raise on an
    all-zero or negative weight list.
    """
    eligible = [
        e for e in entries
        if e.weight is not None and 0 < e.weight < float("inf")
    ]
    if not eligible:
        return None
    return random.choices(eligible, weights=[e.weight for e in eligible], k=1)[0]
def fire_entry(entry: Entry):
    """Dispatch *entry* now, or queue its text when notifications are paused."""
    message = entry.text.strip()
    with state_lock:
        if paused:
            notification_queue.append(message)
            log(f"Queued (paused): {message}")
            return
    _dispatch(message)
def _dispatch(text: str):
    """Run the action named by *text*, or show it as a toast.

    Wallpaper, mobile-wallpaper and overlay keywords (case-insensitive) are
    handed to the background worker; any other text becomes a toast.
    """
    lowered = text.lower()
    if lowered == WALLPAPER_MAGIC:
        enqueue_background(do_wallpaper_change)
        return
    if is_mobile_wallpaper(lowered):
        enqueue_background(do_mobile_wallpaper)
        return
    if is_overlay_entry(lowered):
        enqueue_background(do_screen_overlay, parse_overlay_duration(lowered))
        return
    send_toast(text)
def flush_queued(queued: list):
    """Dispatch each queued text in order, pausing 1.2 s after each one."""
    pause_between = 1.2
    for message in queued:
        _dispatch(message)
        time.sleep(pause_between)
def notification_loop():
    """Forever: fire a weighted-random entry, then wait a random interval.

    The wait polls in slices of at most 5 seconds and re-reads
    ``_next_fire_at`` each time. It ends early when ``_notification_wake``
    is set.

    NOTE(review): an early wake-up leaves the wait and the next iteration
    fires an entry immediately — confirm that config reloads and interval
    changes are meant to trigger a notification.
    """
    global _next_fire_at
    while True:
        with _entries_lock:
            snapshot = list(_entries)
        picked = pick_weighted(snapshot)
        if picked:
            fire_entry(picked)
        interval = random.randint(MIN_INTERVAL_SEC, MAX_INTERVAL_SEC)
        _next_fire_at = time.time() + interval
        log(f"Next in {interval//60}m {interval%60}s")
        while True:
            remaining = _next_fire_at - time.time()
            if remaining <= 0:
                break
            # Either the event was already set, or it gets set during the wait.
            if _notification_wake.is_set() or _notification_wake.wait(min(remaining, 5)):
                _notification_wake.clear()
                break
def schedule_runner():
    """Run due scheduled jobs, checking every 10 seconds, forever."""
    poll_seconds = 10
    while True:
        schedule.run_pending()
        time.sleep(poll_seconds)
# ── Pause / Resume ────────────────────────────────────────────────────────────
def toggle_pause():
    """Flip the paused state and apply its side effects.

    Pausing hides overlays and restores the default wallpaper (when known).
    Resuming re-shows overlays, re-applies the last wallpaper (when known)
    and dispatches everything queued while paused on a helper thread.
    """
    global paused

    def _spawn(target, *args):
        threading.Thread(target=target, args=args, daemon=True).start()

    backlog = []
    with state_lock:
        paused = not paused
        if not paused:
            backlog = notification_queue.copy()
            notification_queue.clear()
    if paused:
        log("Paused")
        pause_overlays()
        if _default_wallpaper:
            _spawn(set_wallpaper, _default_wallpaper)
    else:
        log("Resumed")
        resume_overlays()
        if _last_wallpaper:
            _spawn(set_wallpaper, _last_wallpaper)
        _spawn(flush_queued, backlog)
    update_tray_title()
# ── Tray ──────────────────────────────────────────────────────────────────────
def update_tray_title():
    """Refresh the tray tooltip and menu to reflect name and pause state.

    Does nothing before the tray icon exists. The title uses the same
    "<name> — <state>" layout as the initial title set in ``run_tray``;
    without the separator the name and state ran together.
    """
    if tray_icon:
        tray_icon.title = f"{APP_NAME} — {'PAUSED' if paused else 'Running'}"
        tray_icon.update_menu()
def build_tray_menu():
    """Build the tray menu: status/toggle item, "Open Web UI", and "Quit"."""
    def status_text(item):
        label = "Paused" if paused else "Running"
        return f"{label} (Hotkey: {get_setting('hotkey', HOTKEY)})"

    def on_toggle(icon, item):
        return toggle_pause()

    def on_open(icon, item):
        return webbrowser.open(f"http://localhost:{WEB_PORT}")

    def on_quit(icon, item):
        # Stop the tray loop, then end the process immediately.
        return (icon.stop(), os._exit(0))

    return pystray.Menu(
        pystray.MenuItem(status_text, on_toggle),
        pystray.MenuItem("Open Web UI", on_open),
        pystray.Menu.SEPARATOR,
        pystray.MenuItem("Quit", on_quit),
    )
def run_tray():
    """Create the tray icon, store it in ``tray_icon`` and run its loop."""
    global tray_icon
    image = load_tray_icon()
    menu = build_tray_menu()
    tray_icon = pystray.Icon(APP_NAME, image, f"{APP_NAME} — Running", menu=menu)
    tray_icon.run()
# ── Flask API ─────────────────────────────────────────────────────────────────
# Flask application; static_folder=None because files are served by the
# explicit routes defined in this module.
flask_app = Flask(__name__, static_folder=None)
# Allow cross-origin requests from any origin to the API and PWA routes.
CORS(flask_app, resources={r"/api/*": {"origins": "*"}, r"/pwa*": {"origins": "*"}})
# Silence werkzeug
import logging as _logging
_logging.getLogger("werkzeug").setLevel(_logging.ERROR)
# ─ Helper: image to base64 data URI ──────────────────────────────────────────
def _img_to_datauri(path: str) -> str | None:
try:
p = Path(path)
mime = mimetypes.guess_type(p.name)[0] or "image/jpeg"
data = base64.b64encode(p.read_bytes()).decode()
return f"data:{mime};base64,{data}"
except Exception:
return None
# ─ PWA heartbeat / client tracking ───────────────────────────────────────────
@flask_app.route("/api/pwa/ping", methods=["POST"])
def pwa_ping():
    """Register a PWA heartbeat and report the number of live clients.

    Expects JSON ``{"client_id": "<non-empty string>"}``; anything else is
    answered with 400 (a non-string id, e.g. a JSON list, previously raised
    and produced a 500). Clients not seen for more than 60 seconds are
    dropped. Registration, expiry and counting happen under one lock so the
    reported count is consistent.
    """
    data = request.get_json(silent=True) or {}
    cid = data.get("client_id", "") if isinstance(data, dict) else ""
    if not isinstance(cid, str) or not cid:
        return jsonify({"error": "no client_id"}), 400
    ua = request.headers.get("User-Agent", "")
    now = time.time()
    # Expire old clients (> 60s)
    cutoff = now - 60
    with _pwa_clients_lock:
        _pwa_clients[cid] = {"last_seen": now, "user_agent": ua}
        expired = [k for k, v in _pwa_clients.items() if v["last_seen"] < cutoff]
        for k in expired:
            del _pwa_clients[k]
        count = len(_pwa_clients)
    return jsonify({"ok": True, "clients": count})
# ─ Mobile wallpaper poll ─────────────────────────────────────────────────────
@flask_app.route("/api/pwa/wallpaper", methods=["GET"])
def pwa_wallpaper_poll():
    """Hand a queued wallpaper command to the polling PWA client.

    The command stored for ``client_id`` is taken first, otherwise the
    broadcast one. The reply carries ``pending`` plus a data URI for each
    of ``lockscreen`` / ``background`` that could be read.
    """
    cid = request.args.get("client_id", "")
    if not cid:
        return jsonify({"pending": False})
    with _pending_lock:
        # Check specific client first, then broadcast
        command = _pending_wallpapers.pop(cid, None)
        if not command:
            command = _pending_wallpapers.pop("__broadcast__", None)
    if not command:
        return jsonify({"pending": False})
    reply = {"pending": True}
    for slot in ("lockscreen", "background"):
        source = command.get(slot)
        if not source:
            continue
        uri = _img_to_datauri(source)
        if uri:
            reply[slot] = uri
    log(f"Served mobile wallpaper to PWA client {cid[:8]}")
    return jsonify(reply)
# ─ PWA splash image ────────────────────────────────────────────────────────────────
@flask_app.route("/api/pwa/splash_image")
def pwa_splash_image():
    """Return a random mobile image as a data URI under ``image``.

    ``image`` is an empty string when the folder is missing or empty, the
    file cannot be encoded, or any error occurs (errors are logged).
    """
    empty = {"image": ""}
    try:
        folder = _mobile_base_dir()
        if not folder.exists():
            return jsonify(empty)
        pool = _list_mobile_images(folder)
        if not pool:
            return jsonify(empty)
        uri = _img_to_datauri(str(random.choice(pool)))
        return jsonify({"image": uri or ""})
    except Exception as exc:
        log(f"Splash image error: {exc}", "warn")
        return jsonify(empty)
# ─ Trigger mobile wallpaper from PWA UI ─────────────────────────────────────
@flask_app.route("/api/pwa/trigger_wallpaper", methods=["POST"])
def pwa_trigger_wallpaper():
    """Queue a mobile wallpaper push on the background worker."""
    enqueue_background(do_mobile_wallpaper)
    reply = {"ok": True}
    return jsonify(reply)
# ─ State API ─────────────────────────────────────────────────────────────────
@flask_app.route("/api/pwa/app_name")
def pwa_app_name():
    """Return the current app name as JSON, marked as not cacheable."""
    response = jsonify({"app_name": APP_NAME})
    response.headers["Cache-Control"] = "no-store"
    return response
@flask_app.route("/api/state")
def api_state():
    """Return a JSON snapshot of the application state for the web UI.

    Entries, the log and the PWA client count are each copied under their
    own lock; intervals are reported in minutes.
    """
    with _entries_lock:
        entries_data = [
            dict(text=e.text, weight=e.weight, trigger_time=e.trigger_time)
            for e in _entries
        ]
    with _log_lock:
        log_data = list(_log)
    with _pwa_clients_lock:
        pwa_count = len(_pwa_clients)
    snapshot = {
        "app_name": APP_NAME,
        "version": VERSION,
        "paused": paused,
        "min_interval": MIN_INTERVAL_SEC // 60,
        "max_interval": MAX_INTERVAL_SEC // 60,
        "next_fire_at": _next_fire_at,
        "entries": entries_data,
        "log": log_data,
        "overlay_stretch": _overlay_stretch,
        "pwa_clients": pwa_count,
        "settings": _settings,
        "config_path": str(NOTIFICATIONS_FILE),
    }
    return jsonify(snapshot)
@flask_app.route("/api/pause", methods=["POST"])
def api_pause():
    """Toggle pause/resume and return the resulting ``paused`` flag."""
    toggle_pause()
    reply = {"paused": paused}
    return jsonify(reply)
@flask_app.route("/api/fire_now", methods=["POST"])
def api_fire_now():
    """Pick a weighted-random entry and fire it on a helper thread.

    Replies with the fired text, or ``ok: False`` when nothing can be picked.
    """
    with _entries_lock:
        snapshot = list(_entries)
    picked = pick_weighted(snapshot)
    if not picked:
        return jsonify({"ok": False, "error": "No entries"})
    threading.Thread(target=fire_entry, args=(picked,), daemon=True).start()
    return jsonify({"ok": True, "fired": picked.text})
@flask_app.route("/api/test_notification", methods=["POST"])
def api_test_notif():
    """Show a toast with the posted ``message`` (or a default test text)."""
    payload = request.get_json(silent=True) or {}
    send_toast(payload.get("message", "Test notification from NotifyPulse!"))
    return jsonify({"ok": True})
@flask_app.route("/api/test_wallpaper", methods=["POST"])
def api_test_wallpaper():
    """Queue a desktop wallpaper change on the background worker."""
    enqueue_background(do_wallpaper_change)
    reply = {"ok": True}
    return jsonify(reply)
@flask_app.route("/api/test_overlay", methods=["POST"])
def api_test_overlay():
    """Queue a screen overlay (with its default duration) on the worker."""
    enqueue_background(do_screen_overlay)
    reply = {"ok": True}
    return jsonify(reply)
@flask_app.route("/api/test_mobile_wallpaper", methods=["POST"])
def api_test_mobile_wallpaper():
    """Queue a mobile wallpaper push on the background worker."""
    enqueue_background(do_mobile_wallpaper)
    reply = {"ok": True}
    return jsonify(reply)
@flask_app.route("/api/settings", methods=["GET", "POST"])
def api_settings():
    """Return the settings (GET) or update them from posted JSON (POST).

    POST handles three groups of keys:
      * ``name`` — new app name; whitespace runs (including newlines) are
        collapsed to single spaces so the value stays one ``@name`` line
        when written to notifications.txt.
      * ``min_interval`` / ``max_interval`` — minutes, at least 1 with
        max >= min; the next-fire deadline is re-rolled and the
        notification loop is woken.
      * a fixed list of pass-through keys copied into the settings cache.
    Name/interval changes are also patched into notifications.txt.
    Any error is reported as ``{"ok": False, "error": ...}`` with 500.
    """
    global MIN_INTERVAL_SEC, MAX_INTERVAL_SEC, APP_NAME, _overlay_stretch, _next_fire_at
    if request.method == "GET":
        return jsonify(_settings)
    try:
        data = request.get_json(silent=True) or {}
        changed = False
        if "name" in data:
            raw_name = data["name"]
            if not isinstance(raw_name, str):
                raise TypeError("name must be a string")
            name = " ".join(raw_name.split())
            if name:
                APP_NAME = name
                _settings["name"] = name
                changed = True
        if "min_interval" in data or "max_interval" in data:
            mn = max(1, int(data.get("min_interval", MIN_INTERVAL_SEC // 60)))
            mx = max(mn, int(data.get("max_interval", MAX_INTERVAL_SEC // 60)))
            MIN_INTERVAL_SEC = mn * 60
            MAX_INTERVAL_SEC = mx * 60
            _settings["min_interval"] = mn
            _settings["max_interval"] = mx
            interval = random.randint(MIN_INTERVAL_SEC, MAX_INTERVAL_SEC)
            _next_fire_at = time.time() + interval
            # A separator between the bounds keeps the message readable.
            log(f"Interval → {mn}-{mx}min, timer reset")
            _notification_wake.set()
            changed = True
        # Boolean/generic settings passthrough
        for key in ("overlay_stretch", "overlay_opacity", "overlay_duration",
                    "startup_toast", "hotkey", "theme", "notify_sound",
                    "auto_open_browser", "pwa_token"):
            if key in data:
                val = data[key]
                _settings[key] = val
                if key == "overlay_stretch":
                    _overlay_stretch = bool(val)
                changed = True
        # Persist to notifications.txt if name/interval changed
        if "name" in data or "min_interval" in data or "max_interval" in data:
            _patch_notifications_file(
                _settings.get("name", APP_NAME),
                MIN_INTERVAL_SEC // 60,
                MAX_INTERVAL_SEC // 60)
        if changed:
            save_settings()
            update_tray_title()
        return jsonify({"ok": True})
    except Exception as ex:
        return jsonify({"ok": False, "error": str(ex)}), 500
def _patch_notifications_file(name: str, min_m: int, max_m: int):
    """Rewrite the ``@name`` and ``@interval`` lines of the notifications text.

    Every line whose stripped text starts with ``@name`` or ``@interval`` is
    replaced by a freshly formatted directive; all other lines are kept
    untouched. A directive that does not occur at all is inserted near the
    top (``@name`` at index 0, ``@interval`` at index 1). Any failure is
    logged and swallowed.
    """
    try:
        name_line = f"@name {name}"
        interval_line = f"@interval {min_m} {max_m}"
        patched = []
        seen = set()
        for raw in read_notifications_text().splitlines():
            stripped = raw.strip()
            if stripped.startswith("@name"):
                patched.append(name_line)
                seen.add("name")
            elif stripped.startswith("@interval"):
                patched.append(interval_line)
                seen.add("interval")
            else:
                patched.append(raw)
        if "name" not in seen:
            patched.insert(0, name_line)
        if "interval" not in seen:
            patched.insert(1, interval_line)
        write_notifications_text("\n".join(patched) + "\n")
    except Exception as err:
        log(f"Patch config error: {err}", "error")
@flask_app.route("/api/entries", methods=["GET", "POST"])
def api_entries():
    """Read (GET) or replace (POST) the raw notifications.txt content.

    GET responds ``{"content": <text>}``.

    POST expects a JSON object with a string ``content`` field and writes it
    through ``write_notifications_text``. A missing, unparseable or
    wrongly-typed body is rejected with HTTP 400 instead of being treated as
    empty content, which would silently wipe the file. Send
    ``{"content": ""}`` to clear it deliberately.

    Read/write failures are reported with HTTP 500.
    """
    if request.method == "GET":
        try:
            return jsonify({"content": read_notifications_text()})
        except Exception as e:
            return jsonify({"error": str(e)}), 500

    data = request.get_json(silent=True)
    content = data.get("content") if isinstance(data, dict) else None
    if not isinstance(content, str):
        return jsonify({
            "ok": False,
            "error": 'Expected a JSON object with a string "content" field',
        }), 400
    try:
        write_notifications_text(content)
        return jsonify({"ok": True})
    except Exception as e:
        return jsonify({"ok": False, "error": str(e)}), 500
@flask_app.route("/api/log")
def api_log():
    """Return up to ``limit`` entries from the front of the in-memory log.

    ``limit`` is read from the query string, defaults to 100 and is clamped
    to the range 0..200. A non-numeric value falls back to the default
    instead of failing the request.
    """
    limit = request.args.get("limit", 100, type=int)
    limit = max(0, min(limit, 200))
    # Take the snapshot under the lock; serialise after releasing it.
    with _log_lock:
        entries = list(_log)[:limit]
    return jsonify(entries)
# ─ PWA static files ──────────────────────────────────────────────────────────
# Directory the PWA assets are served from: the "pwa" folder next to this file.
PWA_DIR = Path(__file__).parent / "pwa"
@flask_app.route("/pwa")
@flask_app.route("/pwa/")
def pwa_index():
    """Serve ``index.html`` from the PWA folder, or a plain-text 404 if absent."""
    index_page = PWA_DIR / "index.html"
    if not index_page.exists():
        return "PWA files not found. Place them in the pwa/ folder.", 404
    return send_file(index_page)
@flask_app.route("/pwa/<path:filename>")
def pwa_static(filename):
    """Serve a static file from the PWA folder.

    ``filename`` comes straight from the URL, so it is resolved and checked
    to lie inside ``PWA_DIR`` before anything is sent. Without that check an
    absolute or drive-qualified path (``C:/...``) would replace the base
    directory when joined, and ``..`` segments could climb out of it.

    Responds 404 for paths outside the folder, invalid paths and anything
    that is not a regular file.
    """
    try:
        base = PWA_DIR.resolve()
        target = (base / filename).resolve()
        # Raises ValueError when ``target`` is not located under ``base``.
        target.relative_to(base)
    except (OSError, ValueError):
        abort(404)
    if not target.is_file():
        abort(404)
    return send_file(target)
# Desktop web UI served at /
@flask_app.route("/")
def desktop_index():
    """Serve ``ui/index.html`` from next to this file, else the inline fallback."""
    ui_page = Path(__file__).parent / "ui" / "index.html"
    if not ui_page.exists():
        return INLINE_HTML  # fallback inline
    return send_file(ui_page)
# ─ Inline fallback HTML (minified at bottom) ─────────────────────────────────
# Returned by desktop_index() when ui/index.html does not exist.
# NOTE(review): no later assignment to INLINE_HTML is visible in this part of
# the file; confirm it is actually filled in, otherwise "/" serves an empty body.
INLINE_HTML = ""  # filled after
def run_flask():
    """Start the Flask server on all interfaces at ``WEB_PORT``.

    Debug mode and the reloader are disabled; request handling is threaded.
    """
    server_options = {
        "host": "0.0.0.0",
        "port": WEB_PORT,
        "debug": False,
        "use_reloader": False,
        "threaded": True,
    }
    flask_app.run(**server_options)
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
    """Application entry point.

    Steps, in order:

    1. Resolve the config directory and derive all file/folder paths from it.
    2. Create the wallpaper, overlay and mobile folders (including missing
       parents) and load the persisted settings.
    3. Remember the current wallpaper, load and apply the notification
       config, and schedule the first notification.
    4. Start the config file monitor and register the pause hotkey
       (a registration failure is logged as a warning, not fatal).
    5. Create the hidden Tk root, start the notification, scheduler, Flask
       and tray threads, and optionally show a startup toast and open the
       web UI in the browser.
    6. Enter the Tk main loop.
    """
    global APP_NAME, APP_ID, APPDATA_DIR, NOTIFICATIONS_FILE, ICON_FILE
    global WALLPAPERS_DIR, OVERLAY_DIR, MOBILE_DIR, SETTINGS_FILE
    global _default_wallpaper, _last_wallpaper, _file_mtime, _next_fire_at, _config_monitor
    global root_instance, _overlay_stretch

    # ── Paths ────────────────────────────────────────────────────────────────
    APPDATA_DIR = resolve_appdata_dir()
    log(f"Using config dir: {APPDATA_DIR}")
    NOTIFICATIONS_FILE = APPDATA_DIR / "notifications.txt"
    ICON_FILE = APPDATA_DIR / "icon.png"
    WALLPAPERS_DIR = APPDATA_DIR / "wallpapers"
    OVERLAY_DIR = APPDATA_DIR / "Overlay"
    MOBILE_DIR = APPDATA_DIR / "Mobile"
    SETTINGS_FILE = APPDATA_DIR / "settings.json"
    for d in (WALLPAPERS_DIR, OVERLAY_DIR, MOBILE_DIR):
        # parents=True: do not fail when the config dir itself is missing.
        d.mkdir(parents=True, exist_ok=True)

    # ── Settings and config ──────────────────────────────────────────────────
    load_settings()
    _overlay_stretch = get_setting("overlay_stretch", False)
    _default_wallpaper = get_current_wallpaper()
    app_name, min_sec, max_sec, entries = load_config()
    apply_config(app_name, min_sec, max_sec, entries, silent=True)
    _next_fire_at = time.time() + random.randint(min_sec, max_sec)
    _config_monitor = ConfigFileMonitor(NOTIFICATIONS_FILE, _reload_config_if_changed)
    _config_monitor.start()

    # Hotkey
    try:
        hotkey = get_setting("hotkey", HOTKEY)
        keyboard.add_hotkey(hotkey, toggle_pause, suppress=True)
    except Exception as e:
        log(f"Hotkey error: {e}", "warn")

    # ── UI root and worker threads ───────────────────────────────────────────
    root_instance = tk.Tk()
    root_instance.withdraw()  # hidden root window; only the main loop is needed
    threading.Thread(target=notification_loop, daemon=True).start()
    threading.Thread(target=schedule_runner, daemon=True).start()
    threading.Thread(target=run_flask, daemon=True).start()
    log(f"NotifyPulse V{VERSION} started — {min_sec//60}-{max_sec//60}min")

    if get_setting("startup_toast", True):
        send_toast(f"V{VERSION} running! Web UI → localhost:{WEB_PORT}")
    if get_setting("auto_open_browser", True):
        # Short delay before opening the browser, after the server thread starts.
        threading.Timer(1.5, webbrowser.open,
                        args=(f"http://localhost:{WEB_PORT}",)).start()

    threading.Thread(target=run_tray, daemon=True).start()
    root_instance.mainloop()
# Start the application only when this file is executed as a script.
if __name__ == "__main__":
    main()