""" Social Media Collection Downloader Downloads TikTok collections and Instagram saved posts, then removes them. Requires: yt-dlp, playwright, browser cookies exported via browser extension. """ import os import json import time import logging import subprocess import sys from pathlib import Path from datetime import datetime from typing import Optional # ── Logging ────────────────────────────────────────────────────────────────── LOG_DIR = Path(__file__).parent / "logs" LOG_DIR.mkdir(exist_ok=True) log_file = LOG_DIR / f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" import io _stream_handler = logging.StreamHandler(io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")) logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[ logging.FileHandler(log_file, encoding="utf-8"), _stream_handler, ], ) log = logging.getLogger(__name__) # ── Config ──────────────────────────────────────────────────────────────────── CONFIG_PATH = Path(__file__).parent / "config" / "config.json" DOWNLOADS_DIR = Path(__file__).parent / "downloads" def load_config() -> dict: if not CONFIG_PATH.exists(): log.error(f"Config not found at {CONFIG_PATH}. Run: python setup.py") sys.exit(1) with open(CONFIG_PATH) as f: return json.load(f) # ── yt-dlp helpers ─────────────────────────────────────────────────────────── def build_ytdlp_cmd(url: str, output_dir: Path, cookies_file: Optional[str] = None, cookies_from_browser: Optional[str] = None) -> list: cmd = [ "yt-dlp", "--no-warnings", "--quiet", "--progress", "-o", str(output_dir / "%(uploader)s - %(title).80s [%(id)s].%(ext)s"), "--write-info-json", "--no-overwrites", "--retries", "3", "--fragment-retries", "3", "--concurrent-fragments", "4", ] if cookies_file and Path(cookies_file).exists(): cmd += ["--cookies", cookies_file] elif cookies_from_browser: cmd += ["--cookies-from-browser", cookies_from_browser] cmd.append(url) return cmd def download_url(url: str, output_dir: Path, cookies_file: Optional[str] = None, cookies_from_browser: Optional[str] = None) -> bool: cmd = build_ytdlp_cmd(url, output_dir, cookies_file, cookies_from_browser) log.info(f"Downloading: {url}") result = subprocess.run(cmd, capture_output=False, text=True) if result.returncode == 0: log.info(f"[OK] Downloaded: {url}") return True else: log.error(f"[FAIL] Failed: {url}") return False # ── TikTok ─────────────────────────────────────────────────────────────────── def get_tiktok_collection_urls(collection_url: str, cookies_file: Optional[str], cookies_from_browser: Optional[str]) -> list[str]: """Use yt-dlp to extract all video URLs from a TikTok collection/playlist.""" cmd = [ "yt-dlp", "--flat-playlist", "--print", "url", "--no-warnings", "--quiet", ] if cookies_file and Path(cookies_file).exists(): cmd += ["--cookies", cookies_file] elif cookies_from_browser: cmd += ["--cookies-from-browser", cookies_from_browser] cmd.append(collection_url) log.info(f"Fetching TikTok collection URLs from: {collection_url}") result = subprocess.run(cmd, capture_output=True, text=True) urls = [line.strip() for line in result.stdout.splitlines() if line.strip().startswith("http")] log.info(f"Found {len(urls)} videos in collection") return urls def download_tiktok_collection(config: dict) -> list[str]: """Download all videos from configured TikTok collections. Returns list of downloaded URLs.""" tk_cfg = config.get("tiktok", {}) if not tk_cfg.get("enabled", False): log.info("TikTok disabled in config, skipping.") return [] collections = tk_cfg.get("collections", []) if not collections: log.warning("No TikTok collections configured.") return [] cookies_file = tk_cfg.get("cookies_file") cookies_from_browser = tk_cfg.get("cookies_from_browser") # e.g. "chrome", "firefox" output_dir = DOWNLOADS_DIR / "tiktok" output_dir.mkdir(parents=True, exist_ok=True) downloaded_urls = [] for collection_url in collections: urls = get_tiktok_collection_urls(collection_url, cookies_file, cookies_from_browser) for url in urls: success = download_url(url, output_dir, cookies_file, cookies_from_browser) if success: downloaded_urls.append(url) time.sleep(tk_cfg.get("delay_between_downloads", 2)) return downloaded_urls # ── Instagram ───────────────────────────────────────────────────────────────── def get_instagram_saved_urls(collection_url: str, cookies_file: Optional[str], cookies_from_browser: Optional[str]) -> list[str]: """Use yt-dlp to extract all post URLs from an Instagram saved collection.""" cmd = [ "yt-dlp", "--flat-playlist", "--print", "url", "--no-warnings", "--quiet", ] if cookies_file and Path(cookies_file).exists(): cmd += ["--cookies", cookies_file] elif cookies_from_browser: cmd += ["--cookies-from-browser", cookies_from_browser] cmd.append(collection_url) log.info(f"Fetching Instagram saved URLs from: {collection_url}") result = subprocess.run(cmd, capture_output=True, text=True) urls = [line.strip() for line in result.stdout.splitlines() if line.strip().startswith("http")] log.info(f"Found {len(urls)} posts in saved collection") return urls def download_instagram_collection(config: dict) -> list[str]: """Download all posts from configured Instagram saved collections.""" ig_cfg = config.get("instagram", {}) if not ig_cfg.get("enabled", False): log.info("Instagram disabled in config, skipping.") return [] collections = ig_cfg.get("collections", []) if not collections: log.warning("No Instagram collections configured.") return [] cookies_file = ig_cfg.get("cookies_file") cookies_from_browser = ig_cfg.get("cookies_from_browser") output_dir = DOWNLOADS_DIR / "instagram" output_dir.mkdir(parents=True, exist_ok=True) downloaded_urls = [] for collection_url in collections: urls = get_instagram_saved_urls(collection_url, cookies_file, cookies_from_browser) for url in urls: success = download_url(url, output_dir, cookies_file, cookies_from_browser) if success: downloaded_urls.append(url) time.sleep(ig_cfg.get("delay_between_downloads", 3)) return downloaded_urls # ── Unsave / Remove ─────────────────────────────────────────────────────────── def unsave_tiktok_videos(urls: list[str], config: dict): """Use Playwright to unsave/unlike downloaded TikTok videos.""" if not urls: return try: from playwright.sync_api import sync_playwright except ImportError: log.error("Playwright not installed. Run: pip install playwright && playwright install chromium") return tk_cfg = config.get("tiktok", {}) cookies_file = tk_cfg.get("cookies_file") log.info(f"Unsaving {len(urls)} TikTok videos...") with sync_playwright() as p: browser = p.chromium.launch(headless=tk_cfg.get("headless", False)) context = browser.new_context() if cookies_file and Path(cookies_file).exists(): with open(cookies_file) as f: raw = json.load(f) pw_cookies = [] for c in raw: if "tiktok.com" in c.get("domain", ""): pw_cookies.append({ "name": c["name"], "value": c["value"], "domain": c["domain"], "path": c.get("path", "/"), "httpOnly": c.get("httpOnly", False), "secure": c.get("secure", False), }) context.add_cookies(pw_cookies) page = context.new_page() for url in urls: try: log.info(f"Unsaving: {url}") page.goto(url, wait_until="networkidle", timeout=30000) time.sleep(2) # Try clicking bookmark/save button (TikTok uses aria-label) bookmark = page.query_selector('[data-e2e="bookmark-icon"], [aria-label*="Add to Favorites"], [aria-label*="Save"]') if bookmark: bookmark.click() time.sleep(1) log.info(f"[OK] Unsaved: {url}") else: log.warning(f"[WARN] Could not find bookmark button for: {url}") time.sleep(tk_cfg.get("delay_between_unsaves", 2)) except Exception as e: log.error(f"Error unsaving {url}: {e}") browser.close() def unsave_instagram_posts(urls: list[str], config: dict): """Use Playwright to unsave downloaded Instagram posts.""" if not urls: return try: from playwright.sync_api import sync_playwright except ImportError: log.error("Playwright not installed. Run: pip install playwright && playwright install chromium") return ig_cfg = config.get("instagram", {}) cookies_file = ig_cfg.get("cookies_file") log.info(f"Unsaving {len(urls)} Instagram posts...") with sync_playwright() as p: browser = p.chromium.launch(headless=ig_cfg.get("headless", False)) context = browser.new_context( user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" ) if cookies_file and Path(cookies_file).exists(): with open(cookies_file) as f: raw = json.load(f) pw_cookies = [] for c in raw: if "instagram.com" in c.get("domain", ""): pw_cookies.append({ "name": c["name"], "value": c["value"], "domain": c["domain"], "path": c.get("path", "/"), "httpOnly": c.get("httpOnly", False), "secure": c.get("secure", False), }) context.add_cookies(pw_cookies) page = context.new_page() for url in urls: try: log.info(f"Unsaving: {url}") page.goto(url, wait_until="networkidle", timeout=30000) time.sleep(2) # Instagram save button - look for bookmark SVG button save_btn = page.query_selector('svg[aria-label="Remove"]') if not save_btn: save_btn = page.query_selector('[aria-label="Unsave"]') if not save_btn: # Try finding bookmark icon that's currently "saved" (filled state) save_btn = page.query_selector('button svg[aria-label*="Save"]') if save_btn: save_btn.click() time.sleep(1) log.info(f"[OK] Unsaved: {url}") else: log.warning(f"[WARN] Could not find save button for: {url}") time.sleep(ig_cfg.get("delay_between_unsaves", 3)) except Exception as e: log.error(f"Error unsaving {url}: {e}") browser.close() # ── Main ────────────────────────────────────────────────────────────────────── def main(): log.info("=" * 60) log.info("Social Media Collection Downloader — Starting") log.info("=" * 60) config = load_config() # Download TikTok tiktok_downloaded = download_tiktok_collection(config) log.info(f"TikTok: downloaded {len(tiktok_downloaded)} videos") # Download Instagram instagram_downloaded = download_instagram_collection(config) log.info(f"Instagram: downloaded {len(instagram_downloaded)} posts") # Unsave TikTok videos if config.get("tiktok", {}).get("unsave_after_download", True): unsave_tiktok_videos(tiktok_downloaded, config) # Unsave Instagram posts if config.get("instagram", {}).get("unsave_after_download", True): unsave_instagram_posts(instagram_downloaded, config) log.info("=" * 60) log.info(f"Done. TikTok: {len(tiktok_downloaded)} | Instagram: {len(instagram_downloaded)}") log.info(f"Log saved to: {log_file}") log.info("=" * 60) if __name__ == "__main__": main()