# NotifyPulse - Background notification app
# Config folder: AppData/Roaming/NotifyPulse/
# Web UI: http://localhost:5000

import sys
import os
import ctypes
import ctypes.wintypes
import threading
import time
import random
import re
import json
import webbrowser
import tkinter as tk
from datetime import datetime
from pathlib import Path
from collections import deque

# ── Third-party imports ──────────────────────────────────────────────────────
try:
    import keyboard
    from winotify import Notification, audio
    import pystray
    from PIL import Image, ImageDraw, ImageTk
    import schedule
    from flask import Flask, jsonify, request, Response
except ImportError as e:
    # Report the missing package in a dialog, then exit with status 1.
    root = tk.Tk()
    root.withdraw()
    from tkinter import messagebox
    messagebox.showerror(
        "Missing dependency",
        f"Install requirements first:\n\npip install -r requirements.txt\n\nError: {e}",
    )
    sys.exit(1)

HOTKEY = "F13"
WEB_PORT = 5000
DEFAULT_APP_NAME = "NotifyPulse"
WALLPAPER_MAGIC = "change.wallpaper"
OVERLAY_MAGIC = "show.overlay"  # also matches show.overlay.N


def parse_overlay_duration(text: str) -> int:
    """Return overlay duration in ms from text like 'show.overlay' or 'show.overlay.10'.

    The third dot-separated field is read as a whole number of seconds and
    clamped to at least 1 second. When that field is missing or is not an
    integer, the default of 6000 ms is returned.
    """
    # parts: ['show', 'overlay'] or ['show', 'overlay', 'N']
    parts = text.strip().lower().split(".")
    if len(parts) >= 3:
        try:
            return max(1, int(parts[2])) * 1000
        except ValueError:
            pass
    return 6000  # default 6 s


def is_overlay_entry(text: str) -> bool:
    """Return True when *text* is 'show.overlay' or starts with 'show.overlay.'.

    Surrounding whitespace and letter case are ignored.
    """
    t = text.strip().lower()
    return t == OVERLAY_MAGIC or t.startswith(OVERLAY_MAGIC + ".")


# ── Global state ──────────────────────────────────────────────────────────────
paused: bool = False
notification_queue = []
state_lock = threading.Lock()
tray_icon = None

APP_NAME: str = DEFAULT_APP_NAME
APP_ID: str = "NotifyPulse"

# The path globals start as None; the annotations are quoted so the optional
# type is stated without being evaluated at import time.
APPDATA_DIR: "Path | None" = None
NOTIFICATIONS_FILE: "Path | None" = None
ICON_FILE: "Path | None" = None

# Active overlays: list of dicts with the keys
#   "window": tk.Toplevel, "remaining_ms": int, "after_id": str or None,
#   "started_at": float (time.monotonic()), "schedule_dismiss": callable
_active_overlays: list = []
_overlays_lock = threading.Lock()

WALLPAPERS_DIR: "Path | None" = None
OVERLAY_DIR: Path = None _default_wallpaper: str = "" _last_wallpaper: str = "" _overlay_stretch: bool = False # True = stretch to fill, False = fit + letterbox MIN_INTERVAL_SEC: int = 10 * 60 MAX_INTERVAL_SEC: int = 30 * 60 _entries = [] _entries_lock = threading.Lock() _file_mtime = 0.0 # Countdown tracking _next_fire_at: float = 0.0 # unix timestamp of next notification # Log (last 100 events) _log: deque = deque(maxlen=100) _log_lock = threading.Lock() def log(msg: str): ts = datetime.now().strftime("%H:%M:%S") with _log_lock: _log.appendleft({"time": ts, "msg": msg}) # ── AppData setup ───────────────────────────────────────────────────────────── def resolve_appdata_dir() -> Path: appdata_env = os.environ.get("APPDATA", "").strip() base = Path(appdata_env) if appdata_env else Path.home() / "AppData" / "Roaming" folder = base / "NotifyPulse" try: folder.mkdir(parents=True, exist_ok=True) probe = folder / ".probe" probe.write_text("ok", encoding="utf-8") probe.unlink() return folder except Exception as exc: root = tk.Tk() root.withdraw() from tkinter import messagebox messagebox.showerror("NotifyPulse - Startup Error", f"Cannot create config folder:\n\n{folder}\n\nError: {exc}") sys.exit(1) # ── Config file ─────────────────────────────────────────────────────────────── class Entry: def __init__(self, text, weight, trigger_time): self.text = text self.weight = weight self.trigger_time = trigger_time def create_default_config(): NOTIFICATIONS_FILE.write_text( "# NotifyPulse - notifications.txt\n" "# Changes are picked up automatically - no restart needed!\n" "#\n" "# @name Your App Name <- name shown in tray + notifications\n" "# @interval 10 30 <- min and max minutes between notifications\n" "#\n" "# Text | 30% <- random, picked 30% of the time\n" "# Text | 14:30 <- fires every day at 14:30\n" "# change.wallpaper | 20% <- picks a random wallpaper\n" "# show.overlay | 10% <- shows random image from Overlay folder (6s default)\n" "# show.overlay.10 | 10% <- shows 
overlay for 10 seconds\n" "\n" "@name NotifyPulse\n" "@interval 10 30\n" "\n" "Take a short break and stretch! | 35%\n" "Drink some water - stay hydrated! | 30%\n" "Check your posture! | 20%\n" "Deep breath - you got this! | 15%\n" "\n" "# change.wallpaper | 20%\n" "# show.overlay | 15%\n" "\n" "Morning standup time! | 09:00\n" "Afternoon focus block | 14:00\n" "End of day - wrap up! | 17:30\n", encoding="utf-8", ) def load_config(): if not NOTIFICATIONS_FILE.exists(): create_default_config() app_name = DEFAULT_APP_NAME min_sec = 10 * 60 max_sec = 30 * 60 entries = [] with open(NOTIFICATIONS_FILE, encoding="utf-8") as f: for raw in f: line = raw.strip() if not line or line.startswith("#"): continue if line.startswith("@"): parts = line[1:].split() key = parts[0].lower() if parts else "" if key == "name" and len(parts) >= 2: app_name = " ".join(parts[1:]) elif key == "interval" and len(parts) == 3: try: mn = max(1, int(parts[1])) mx = max(mn, int(parts[2])) min_sec = mn * 60 max_sec = mx * 60 except ValueError: pass continue if "|" in line: text, tag = [p.strip() for p in line.split("|", 1)] else: text, tag = line, None if tag and re.match(r"^\d{1,2}:\d{2}$", tag): h, m = tag.split(":") tag = f"{int(h):02d}:{m}" entries.append(Entry(text, None, tag)) elif tag and tag.endswith("%"): try: w = float(tag[:-1]) except ValueError: w = 1.0 entries.append(Entry(text, w, None)) else: entries.append(Entry(text, 1.0, None)) return app_name, min_sec, max_sec, entries def apply_config(app_name, min_sec, max_sec, entries, silent=False): global APP_NAME, APP_ID, MIN_INTERVAL_SEC, MAX_INTERVAL_SEC, _entries, _file_mtime APP_NAME = app_name APP_ID = app_name.replace(" ", "") MIN_INTERVAL_SEC = min_sec MAX_INTERVAL_SEC = max_sec with _entries_lock: _entries = entries schedule.clear() for entry in [e for e in entries if e.trigger_time]: def make_job(e): def job(): fire_entry(e) return job schedule.every().day.at(entry.trigger_time).do(make_job(entry)) _file_mtime = 
NOTIFICATIONS_FILE.stat().st_mtime update_tray_title() if not silent: log(f"Config reloaded - interval {min_sec//60}-{max_sec//60}min, {len(entries)} entries") # ── File watcher ────────────────────────────────────────────────────────────── def file_watcher(): while True: time.sleep(5) try: mtime = NOTIFICATIONS_FILE.stat().st_mtime if mtime != _file_mtime: app_name, min_sec, max_sec, entries = load_config() apply_config(app_name, min_sec, max_sec, entries) send_toast("Config reloaded!") except Exception: pass # ── Wallpaper ───────────────────────────────────────────────────────────────── SPI_SETDESKWALLPAPER = 0x0014 SPIF_UPDATEINIFILE = 0x01 SPIF_SENDCHANGE = 0x02 def get_current_wallpaper() -> str: try: import winreg key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Control Panel\Desktop") val, _ = winreg.QueryValueEx(key, "WallPaper") winreg.CloseKey(key) return val except Exception: return "" def set_wallpaper(path: str): ctypes.windll.user32.SystemParametersInfoW( SPI_SETDESKWALLPAPER, 0, path, SPIF_UPDATEINIFILE | SPIF_SENDCHANGE) def do_wallpaper_change(): global _last_wallpaper if not WALLPAPERS_DIR or not WALLPAPERS_DIR.exists(): send_toast(f"Wallpaper folder not found!") return files = [] for ext in ["*.jpg", "*.jpeg", "*.png", "*.bmp", "*.webp"]: files.extend(WALLPAPERS_DIR.glob(ext)) if not files: send_toast(f"No images found in wallpapers folder!") return wp = str(random.choice(files)) _last_wallpaper = wp set_wallpaper(wp) log(f"Wallpaper changed: {Path(wp).name}") # ── Screen Overlay ──────────────────────────────────────────────────────────── def _get_monitors(): """Return list of (x, y, w, h) tuples for every connected monitor.""" monitors = [] try: # EnumDisplayMonitors via ctypes MonitorEnumProc = ctypes.WINFUNCTYPE( ctypes.c_bool, ctypes.c_ulong, # hMonitor ctypes.c_ulong, # hdcMonitor ctypes.POINTER(ctypes.wintypes.RECT), ctypes.c_double, # dwData ) def _cb(hMon, hdcMon, lprcMon, dwData): r = lprcMon.contents monitors.append((r.left, r.top, 
r.right - r.left, r.bottom - r.top)) return True ctypes.windll.user32.EnumDisplayMonitors(None, None, MonitorEnumProc(_cb), 0) except Exception: pass if not monitors: # Fallback: single primary screen via tkinter (filled in create_overlay) monitors = [None] return monitors def do_screen_overlay(duration_ms: int = 6000): if not OVERLAY_DIR or not OVERLAY_DIR.exists(): send_toast("Overlay folder not found!") return files = [] for ext in ["*.jpg", "*.jpeg", "*.png", "*.bmp", "*.webp"]: files.extend(OVERLAY_DIR.glob(ext)) if not files: send_toast("No images found in Overlay folder!") return img_path = str(random.choice(files)) monitors = _get_monitors() log(f"Showing overlay: {Path(img_path).name} for {duration_ms//1000}s on {len(monitors)} screen(s)") def _make_tk_image(pil_img, sw, sh): """Resize image for a monitor. Stretch to fill, or fit with letterbox.""" if _overlay_stretch: resized = pil_img.resize((sw, sh), Image.Resampling.LANCZOS) return ImageTk.PhotoImage(resized) else: img_w, img_h = pil_img.size scale = min(sw / img_w, sh / img_h) new_w = int(img_w * scale) new_h = int(img_h * scale) resized = pil_img.resize((new_w, new_h), Image.Resampling.LANCZOS) canvas = Image.new("RGB", (sw, sh), "black") canvas.paste(resized, ((sw - new_w) // 2, (sh - new_h) // 2)) return ImageTk.PhotoImage(canvas) def create_overlay(): try: pil_img = Image.open(img_path) except Exception as e: log(f"Overlay error opening image: {e}") return with state_lock: is_paused = paused for mon in monitors: # Determine monitor geometry if mon is None: # Fallback to primary screen tmp = tk.Toplevel(); tmp.withdraw(); tmp.update_idletasks() sw, sh, mx, my = tmp.winfo_screenwidth(), tmp.winfo_screenheight(), 0, 0 tmp.destroy() else: mx, my, sw, sh = mon ov = tk.Toplevel() ov.attributes("-topmost", True) ov.attributes("-alpha", 0.4) ov.overrideredirect(True) ov.update_idletasks() # Make click-through h_wnd = ctypes.windll.user32.GetAncestor(ov.winfo_id(), 2) if not h_wnd: h_wnd = ov.winfo_id() 
style = ctypes.windll.user32.GetWindowLongW(h_wnd, -20) ctypes.windll.user32.SetWindowLongW(h_wnd, -20, style | 0x80000 | 0x20) ov.geometry(f"{sw}x{sh}+{mx}+{my}") try: tk_img = _make_tk_image(pil_img, sw, sh) lbl = tk.Label(ov, image=tk_img, bg='black') lbl.image = tk_img lbl.pack(fill="both", expand=True) except Exception as e: log(f"Overlay render error: {e}") ov.destroy() continue entry = { "window": ov, "remaining_ms": duration_ms, "after_id": None, "started_at": time.monotonic(), } def on_dismiss(e=entry, w=ov): with _overlays_lock: if e in _active_overlays: _active_overlays.remove(e) if w.winfo_exists(): w.destroy() def schedule_dismiss(e=entry, w=ov): e["started_at"] = time.monotonic() e["after_id"] = w.after(e["remaining_ms"], lambda: on_dismiss(e, w)) entry["schedule_dismiss"] = schedule_dismiss with _overlays_lock: _active_overlays.append(entry) if is_paused: ov.withdraw() else: schedule_dismiss() root_instance.after(0, create_overlay) def pause_overlays(): """Hide all active overlays and freeze their remaining time.""" def _do(): with _overlays_lock: overlays = list(_active_overlays) for entry in overlays: ov = entry["window"] if not ov.winfo_exists(): continue if entry["after_id"] is not None: try: ov.after_cancel(entry["after_id"]) except Exception: pass elapsed_ms = int((time.monotonic() - entry["started_at"]) * 1000) entry["remaining_ms"] = max(0, entry["remaining_ms"] - elapsed_ms) entry["after_id"] = None ov.withdraw() root_instance.after(0, _do) def resume_overlays(): """Show all paused overlays and restart their timers with remaining time.""" def _do(): with _overlays_lock: overlays = list(_active_overlays) for entry in overlays: ov = entry["window"] if not ov.winfo_exists(): with _overlays_lock: if entry in _active_overlays: _active_overlays.remove(entry) continue if entry["remaining_ms"] <= 0: ov.destroy() with _overlays_lock: if entry in _active_overlays: _active_overlays.remove(entry) continue ov.deiconify() entry["schedule_dismiss"]() 
root_instance.after(0, _do) # ── Icon ────────────────────────────────────────────────────────────────────── def prepare_icons(): if not ICON_FILE.exists(): return None try: img = Image.open(ICON_FILE).convert("RGBA") ico = APPDATA_DIR / "icon.ico" sizes = [256, 128, 64, 48, 32, 16] frames = [] for s in sizes: f = img.copy(); f.thumbnail((s, s), Image.LANCZOS); frames.append(f) frames[0].save(ico, format="ICO", sizes=[(s,s) for s in sizes], append_images=frames[1:]) return img except Exception: return None def make_default_icon(): size = 256 img = Image.new("RGBA", (size, size), (0, 0, 0, 0)) d = ImageDraw.Draw(img) s = size / 64 d.ellipse([8*s, 20*s, 56*s, 56*s], fill="#4f8ef7") d.polygon([(32*s, 4*s), (16*s, 24*s), (48*s, 24*s)], fill="#4f8ef7") d.rectangle([24*s, 56*s, 40*s, 62*s], fill="#4f8ef7") return img def load_tray_icon(): img = prepare_icons() return img if img is not None else make_default_icon() # ── Toast ───────────────────────────────────────────────────────────────────── def send_toast(message: str): if ICON_FILE and ICON_FILE.exists(): icon_path = str(ICON_FILE) elif APPDATA_DIR and (APPDATA_DIR / "icon.ico").exists(): icon_path = str(APPDATA_DIR / "icon.ico") else: icon_path = "" notif = Notification(app_id=APP_ID, title=APP_NAME, msg=message, duration="short", icon=icon_path) notif.set_audio(audio.Default, loop=False) notif.show() log(f"Notification: {message}") # ── Notification logic ──────────────────────────────────────────────────────── def pick_weighted(entries): weighted = [e for e in entries if e.weight is not None] if not weighted: return None return random.choices(weighted, weights=[e.weight for e in weighted], k=1)[0] def fire_entry(entry): is_wallpaper = entry.text.strip().lower() == WALLPAPER_MAGIC is_overlay = is_overlay_entry(entry.text) with state_lock: if paused: notification_queue.append(entry.text) log(f"Queued (paused): {entry.text}") return if is_wallpaper: do_wallpaper_change() elif is_overlay: dur = 
parse_overlay_duration(entry.text) threading.Thread(target=do_screen_overlay, args=(dur,), daemon=True).start() else: send_toast(entry.text) def flush_queued(queued: list): for item in queued: low = item.strip().lower() if low == WALLPAPER_MAGIC: do_wallpaper_change() elif is_overlay_entry(item): dur = parse_overlay_duration(item) do_screen_overlay(dur) else: send_toast(item) time.sleep(1.5) def notification_loop(): global _next_fire_at while True: with _entries_lock: entries = list(_entries) chosen = pick_weighted(entries) if chosen: fire_entry(chosen) interval = random.randint(MIN_INTERVAL_SEC, MAX_INTERVAL_SEC) _next_fire_at = time.time() + interval log(f"Next notification in {interval//60}m {interval%60}s") while time.time() < _next_fire_at: time.sleep(5) def schedule_runner(): while True: schedule.run_pending() time.sleep(10) # ── Hotkey ──────────────────────────────────────────────────────────────────── def toggle_pause(): global paused queued = [] with state_lock: paused = not paused if not paused: queued = notification_queue.copy() notification_queue.clear() if paused: log("Paused") pause_overlays() # instant – runs on Tk thread immediately if _default_wallpaper: threading.Thread(target=set_wallpaper, args=(_default_wallpaper,), daemon=True).start() else: log("Resumed") resume_overlays() # instant if _last_wallpaper: threading.Thread(target=set_wallpaper, args=(_last_wallpaper,), daemon=True).start() threading.Thread(target=flush_queued, args=(queued,), daemon=True).start() update_tray_title() def update_tray_title(): if tray_icon: tray_icon.title = f"{APP_NAME} - {'PAUSED' if paused else 'Running'}" tray_icon.update_menu() # ── Web UI ──────────────────────────────────────────────────────────────────── flask_app = Flask(__name__) HTML = r"""