351 lines
13 KiB
Python
351 lines
13 KiB
Python
"""
Social Media Collection Downloader

Downloads TikTok collections and Instagram saved posts, then unsaves them.
Requires: yt-dlp, playwright, and browser cookies exported via a browser extension.
"""
|
|
|
|
import os
|
|
import json
|
|
import time
|
|
import logging
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
# ── Logging ──────────────────────────────────────────────────────────────────
|
|
|
|
import io

# Every run gets its own timestamped log file in a "logs" directory that
# sits next to this script.
LOG_DIR = Path(__file__).parent.joinpath("logs")
LOG_DIR.mkdir(exist_ok=True)
log_file = LOG_DIR / f"session_{datetime.now():%Y%m%d_%H%M%S}.log"

# Console output is written through a UTF-8 text wrapper around stdout's
# byte stream; characters that cannot be encoded are replaced, not raised.
_stream_handler = logging.StreamHandler(
    io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
)
_file_handler = logging.FileHandler(log_file, encoding="utf-8")

# Root logger: INFO and above, mirrored to the session file and the console.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[_file_handler, _stream_handler],
)
log = logging.getLogger(__name__)
|
|
|
|
# ── Config ────────────────────────────────────────────────────────────────────
|
|
|
|
# Both locations are siblings of this script file.
CONFIG_PATH = Path(__file__).parent.joinpath("config", "config.json")
DOWNLOADS_DIR = Path(__file__).with_name("downloads")
|
|
|
|
|
|
def load_config() -> dict:
    """Load the downloader configuration from ``CONFIG_PATH``.

    Returns:
        The parsed top-level JSON object.

    The process exits with status 1, after logging an error, when the file
    is missing, cannot be decoded as UTF-8 JSON, or does not hold a JSON
    object at the top level (callers use ``config.get(...)``).
    """
    if not CONFIG_PATH.exists():
        log.error(f"Config not found at {CONFIG_PATH}. Run: python setup.py")
        sys.exit(1)

    try:
        # Decode explicitly as UTF-8 instead of relying on the platform's
        # default text encoding.
        with open(CONFIG_PATH, encoding="utf-8") as f:
            config = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        log.error(f"Config at {CONFIG_PATH} is not valid JSON: {exc}")
        sys.exit(1)

    if not isinstance(config, dict):
        log.error(f"Config at {CONFIG_PATH} must contain a JSON object at the top level.")
        sys.exit(1)
    return config
|
|
|
|
|
|
# ── yt-dlp helpers ───────────────────────────────────────────────────────────
|
|
|
|
def build_ytdlp_cmd(url: str, output_dir: Path, cookies_file: Optional[str] = None, cookies_from_browser: Optional[str] = None) -> list:
    """Assemble the yt-dlp argument list used to download a single URL.

    Args:
        url: Address of the item to download; always the last argument.
        output_dir: Directory that the output filename template is rooted in.
        cookies_file: Path of a cookie file. Passed via ``--cookies`` only
            when the path exists on disk.
        cookies_from_browser: Browser name passed via
            ``--cookies-from-browser``; used only when no existing
            ``cookies_file`` was supplied.

    Returns:
        The command as a list of strings, starting with ``"yt-dlp"``.
    """
    output_template = str(output_dir / "%(uploader)s - %(title).80s [%(id)s].%(ext)s")

    # An existing cookie file takes precedence over reading from a browser.
    auth_args = []
    if cookies_file and Path(cookies_file).exists():
        auth_args = ["--cookies", cookies_file]
    elif cookies_from_browser:
        auth_args = ["--cookies-from-browser", cookies_from_browser]

    return [
        "yt-dlp",
        "--no-warnings",
        "--quiet",
        "--progress",
        "-o", output_template,
        "--write-info-json",
        "--no-overwrites",
        "--retries", "3",
        "--fragment-retries", "3",
        "--concurrent-fragments", "4",
        *auth_args,
        url,
    ]
|
|
|
|
|
|
def download_url(url: str, output_dir: Path, cookies_file: Optional[str] = None, cookies_from_browser: Optional[str] = None) -> bool:
    """Download one URL with yt-dlp and report whether it succeeded.

    The command comes from ``build_ytdlp_cmd``. The subprocess output is not
    captured, so it goes to this process's own stdout/stderr.

    Returns:
        ``True`` when yt-dlp exits with status 0, ``False`` otherwise.
    """
    ytdlp_args = build_ytdlp_cmd(url, output_dir, cookies_file, cookies_from_browser)
    log.info(f"Downloading: {url}")
    completed = subprocess.run(ytdlp_args, capture_output=False, text=True)

    if completed.returncode != 0:
        log.error(f"[FAIL] Failed: {url}")
        return False

    log.info(f"[OK] Downloaded: {url}")
    return True
|
|
|
|
|
|
# ── TikTok ───────────────────────────────────────────────────────────────────
|
|
|
|
def get_tiktok_collection_urls(collection_url: str, cookies_file: Optional[str], cookies_from_browser: Optional[str]) -> list[str]:
    """Use yt-dlp to extract all video URLs from a TikTok collection/playlist.

    Args:
        collection_url: URL of the collection to enumerate.
        cookies_file: Cookie file passed via ``--cookies`` when it exists.
        cookies_from_browser: Browser name passed via
            ``--cookies-from-browser`` when no existing cookie file is given.

    Returns:
        Every stdout line that starts with ``http``, stripped. If yt-dlp
        exits with a non-zero status the failure is logged and whatever URLs
        were printed before the failure are still returned.
    """
    cmd = [
        "yt-dlp",
        "--flat-playlist",
        "--print", "url",
        "--no-warnings",
        "--quiet",
    ]
    if cookies_file and Path(cookies_file).exists():
        cmd += ["--cookies", cookies_file]
    elif cookies_from_browser:
        cmd += ["--cookies-from-browser", cookies_from_browser]
    cmd.append(collection_url)

    log.info(f"Fetching TikTok collection URLs from: {collection_url}")
    # errors="replace" keeps undecodable bytes in yt-dlp's output from
    # raising while the captured text is decoded.
    result = subprocess.run(cmd, capture_output=True, text=True, errors="replace")
    if result.returncode != 0:
        # Without this, a failed listing is indistinguishable from an empty
        # collection in the log.
        log.error(
            f"yt-dlp exited with code {result.returncode} while listing "
            f"{collection_url}: {result.stderr.strip()}"
        )

    stripped_lines = (line.strip() for line in result.stdout.splitlines())
    urls = [line for line in stripped_lines if line.startswith("http")]
    log.info(f"Found {len(urls)} videos in collection")
    return urls
|
|
|
|
|
|
def download_tiktok_collection(config: dict) -> list[str]:
    """Download every video from the configured TikTok collections.

    Reads the ``"tiktok"`` section of *config* and saves the files under
    ``DOWNLOADS_DIR / "tiktok"``.

    Returns:
        The URLs whose download succeeded. The list is empty when TikTok is
        disabled or no collections are configured.
    """
    settings = config.get("tiktok", {})
    if not settings.get("enabled", False):
        log.info("TikTok disabled in config, skipping.")
        return []

    sources = settings.get("collections", [])
    if not sources:
        log.warning("No TikTok collections configured.")
        return []

    cookie_path = settings.get("cookies_file")
    browser_name = settings.get("cookies_from_browser")  # e.g. "chrome", "firefox"
    target_dir = DOWNLOADS_DIR / "tiktok"
    target_dir.mkdir(parents=True, exist_ok=True)
    pause_seconds = settings.get("delay_between_downloads", 2)

    fetched = []
    for source in sources:
        for video_url in get_tiktok_collection_urls(source, cookie_path, browser_name):
            if download_url(video_url, target_dir, cookie_path, browser_name):
                fetched.append(video_url)
            # Pause after every attempt, whether or not it succeeded.
            time.sleep(pause_seconds)

    return fetched
|
|
|
|
|
|
# ── Instagram ─────────────────────────────────────────────────────────────────
|
|
|
|
def get_instagram_saved_urls(collection_url: str, cookies_file: Optional[str], cookies_from_browser: Optional[str]) -> list[str]:
    """Use yt-dlp to extract all post URLs from an Instagram saved collection.

    Args:
        collection_url: URL of the saved collection to enumerate.
        cookies_file: Cookie file passed via ``--cookies`` when it exists.
        cookies_from_browser: Browser name passed via
            ``--cookies-from-browser`` when no existing cookie file is given.

    Returns:
        Every stdout line that starts with ``http``, stripped. If yt-dlp
        exits with a non-zero status the failure is logged and whatever URLs
        were printed before the failure are still returned.
    """
    cmd = [
        "yt-dlp",
        "--flat-playlist",
        "--print", "url",
        "--no-warnings",
        "--quiet",
    ]
    if cookies_file and Path(cookies_file).exists():
        cmd += ["--cookies", cookies_file]
    elif cookies_from_browser:
        cmd += ["--cookies-from-browser", cookies_from_browser]
    cmd.append(collection_url)

    log.info(f"Fetching Instagram saved URLs from: {collection_url}")
    # errors="replace" keeps undecodable bytes in yt-dlp's output from
    # raising while the captured text is decoded.
    result = subprocess.run(cmd, capture_output=True, text=True, errors="replace")
    if result.returncode != 0:
        # Without this, a failed listing is indistinguishable from an empty
        # collection in the log.
        log.error(
            f"yt-dlp exited with code {result.returncode} while listing "
            f"{collection_url}: {result.stderr.strip()}"
        )

    stripped_lines = (line.strip() for line in result.stdout.splitlines())
    urls = [line for line in stripped_lines if line.startswith("http")]
    log.info(f"Found {len(urls)} posts in saved collection")
    return urls
|
|
|
|
|
|
def download_instagram_collection(config: dict) -> list[str]:
    """Download every post from the configured Instagram saved collections.

    Reads the ``"instagram"`` section of *config* and saves the files under
    ``DOWNLOADS_DIR / "instagram"``.

    Returns:
        The URLs whose download succeeded. The list is empty when Instagram
        is disabled or no collections are configured.
    """
    settings = config.get("instagram", {})
    if not settings.get("enabled", False):
        log.info("Instagram disabled in config, skipping.")
        return []

    sources = settings.get("collections", [])
    if not sources:
        log.warning("No Instagram collections configured.")
        return []

    cookie_path = settings.get("cookies_file")
    browser_name = settings.get("cookies_from_browser")
    target_dir = DOWNLOADS_DIR / "instagram"
    target_dir.mkdir(parents=True, exist_ok=True)
    pause_seconds = settings.get("delay_between_downloads", 3)

    fetched = []
    for source in sources:
        for post_url in get_instagram_saved_urls(source, cookie_path, browser_name):
            if download_url(post_url, target_dir, cookie_path, browser_name):
                fetched.append(post_url)
            # Pause after every attempt, whether or not it succeeded.
            time.sleep(pause_seconds)

    return fetched
|
|
|
|
|
|
# ── Unsave / Remove ───────────────────────────────────────────────────────────
|
|
|
|
def _load_playwright_cookies(cookies_file: str, domain_fragment: str) -> list[dict]:
    """Read an exported cookie file and build Playwright cookie dicts for one site.

    Two export formats are accepted, because the same ``cookies_file`` setting
    is also handed to yt-dlp's ``--cookies`` option, which reads Netscape
    ``cookies.txt`` files rather than JSON:

    * JSON: a list of cookie objects, or an object with a ``"cookies"`` list.
    * Netscape ``cookies.txt``: seven tab-separated fields per line
      (domain, subdomain flag, path, secure flag, expiry, name, value).

    Args:
        cookies_file: Path of the exported cookie file.
        domain_fragment: Only cookies whose domain contains this text are kept.

    Returns:
        A list of dicts with the keys ``name``, ``value``, ``domain``,
        ``path``, ``httpOnly`` and ``secure``. Entries that lack a name,
        value or domain are skipped.
    """
    with open(cookies_file, encoding="utf-8") as f:
        text = f.read()

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        parsed = None  # not JSON; fall through to the Netscape parser

    if parsed is not None:
        entries = parsed.get("cookies", []) if isinstance(parsed, dict) else parsed
        if not isinstance(entries, list):
            return []
        return [
            {
                "name": c["name"],
                "value": c["value"],
                "domain": c["domain"],
                "path": c.get("path", "/"),
                "httpOnly": c.get("httpOnly", False),
                "secure": c.get("secure", False),
            }
            for c in entries
            if isinstance(c, dict)
            and "name" in c
            and "value" in c
            and domain_fragment in c.get("domain", "")
        ]

    http_only_prefix = "#HttpOnly_"
    cookies = []
    for line in text.splitlines():
        # HttpOnly cookies are written as a "#HttpOnly_"-prefixed line, so
        # that prefix must be recognised before ordinary "#" comments are
        # dropped.
        http_only = line.startswith(http_only_prefix)
        if http_only:
            line = line[len(http_only_prefix):]
        elif not line.strip() or line.startswith("#"):
            continue
        fields = line.split("\t")
        if len(fields) != 7:
            continue
        domain, _subdomains, path, secure, _expires, name, value = fields
        if not name or domain_fragment not in domain:
            continue
        cookies.append({
            "name": name,
            "value": value,
            "domain": domain,
            "path": path or "/",
            "httpOnly": http_only,
            "secure": secure.upper() == "TRUE",
        })
    return cookies


def unsave_tiktok_videos(urls: list[str], config: dict):
    """Use Playwright to unsave/unlike downloaded TikTok videos.

    Opens each URL in a Chromium page and clicks the first element matching
    the bookmark selectors. Errors on an individual URL are logged and the
    loop continues with the next one.

    Args:
        urls: Video URLs to process. Nothing happens when the list is empty.
        config: Full configuration; the ``"tiktok"`` section supplies
            ``cookies_file``, ``headless`` and ``delay_between_unsaves``.
    """
    if not urls:
        return
    try:
        from playwright.sync_api import sync_playwright
    except ImportError:
        log.error("Playwright not installed. Run: pip install playwright && playwright install chromium")
        return

    tk_cfg = config.get("tiktok", {})
    cookies_file = tk_cfg.get("cookies_file")
    unsave_delay = tk_cfg.get("delay_between_unsaves", 2)

    log.info(f"Unsaving {len(urls)} TikTok videos...")
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=tk_cfg.get("headless", False))
        try:
            context = browser.new_context()

            if cookies_file and Path(cookies_file).exists():
                pw_cookies = _load_playwright_cookies(cookies_file, "tiktok.com")
                if pw_cookies:
                    context.add_cookies(pw_cookies)
                else:
                    log.warning(f"No tiktok.com cookies found in {cookies_file}")

            page = context.new_page()

            for url in urls:
                try:
                    log.info(f"Unsaving: {url}")
                    page.goto(url, wait_until="networkidle", timeout=30000)
                    time.sleep(2)

                    # Try clicking bookmark/save button (TikTok uses aria-label)
                    bookmark = page.query_selector('[data-e2e="bookmark-icon"], [aria-label*="Add to Favorites"], [aria-label*="Save"]')
                    if bookmark:
                        bookmark.click()
                        time.sleep(1)
                        log.info(f"[OK] Unsaved: {url}")
                    else:
                        log.warning(f"[WARN] Could not find bookmark button for: {url}")

                    time.sleep(unsave_delay)
                except Exception as e:
                    # Best effort: one failing page must not stop the rest.
                    log.error(f"Error unsaving {url}: {e}")
        finally:
            # Close the browser even if setting up the context or page fails.
            browser.close()


def unsave_instagram_posts(urls: list[str], config: dict):
    """Use Playwright to unsave downloaded Instagram posts.

    Opens each URL in a Chromium page with a fixed desktop user agent and
    clicks the first save-button candidate found. Errors on an individual URL
    are logged and the loop continues with the next one.

    Args:
        urls: Post URLs to process. Nothing happens when the list is empty.
        config: Full configuration; the ``"instagram"`` section supplies
            ``cookies_file``, ``headless`` and ``delay_between_unsaves``.
    """
    if not urls:
        return
    try:
        from playwright.sync_api import sync_playwright
    except ImportError:
        log.error("Playwright not installed. Run: pip install playwright && playwright install chromium")
        return

    ig_cfg = config.get("instagram", {})
    cookies_file = ig_cfg.get("cookies_file")
    unsave_delay = ig_cfg.get("delay_between_unsaves", 3)

    log.info(f"Unsaving {len(urls)} Instagram posts...")
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=ig_cfg.get("headless", False))
        try:
            context = browser.new_context(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
            )

            if cookies_file and Path(cookies_file).exists():
                pw_cookies = _load_playwright_cookies(cookies_file, "instagram.com")
                if pw_cookies:
                    context.add_cookies(pw_cookies)
                else:
                    log.warning(f"No instagram.com cookies found in {cookies_file}")

            page = context.new_page()

            for url in urls:
                try:
                    log.info(f"Unsaving: {url}")
                    page.goto(url, wait_until="networkidle", timeout=30000)
                    time.sleep(2)

                    # Instagram save button - look for bookmark SVG button,
                    # trying the selectors in order and stopping at the first hit.
                    save_btn = (
                        page.query_selector('svg[aria-label="Remove"]')
                        or page.query_selector('[aria-label="Unsave"]')
                        or page.query_selector('button svg[aria-label*="Save"]')
                    )

                    if save_btn:
                        save_btn.click()
                        time.sleep(1)
                        log.info(f"[OK] Unsaved: {url}")
                    else:
                        log.warning(f"[WARN] Could not find save button for: {url}")

                    time.sleep(unsave_delay)
                except Exception as e:
                    # Best effort: one failing page must not stop the rest.
                    log.error(f"Error unsaving {url}: {e}")
        finally:
            # Close the browser even if setting up the context or page fails.
            browser.close()
|
|
|
|
|
|
# ── Main ──────────────────────────────────────────────────────────────────────
|
|
|
|
def main():
    """Run one full session: download both platforms, then unsave.

    Downloads the TikTok and Instagram collections first. Each platform's
    unsave step then runs on the successfully downloaded URLs, unless that
    platform's ``unsave_after_download`` setting is false (default true).
    """
    rule = "=" * 60
    log.info(rule)
    log.info("Social Media Collection Downloader — Starting")
    log.info(rule)

    config = load_config()

    # Both downloads finish before anything is unsaved.
    tiktok_done = download_tiktok_collection(config)
    log.info(f"TikTok: downloaded {len(tiktok_done)} videos")

    instagram_done = download_instagram_collection(config)
    log.info(f"Instagram: downloaded {len(instagram_done)} posts")

    tiktok_unsave = config.get("tiktok", {}).get("unsave_after_download", True)
    if tiktok_unsave:
        unsave_tiktok_videos(tiktok_done, config)

    instagram_unsave = config.get("instagram", {}).get("unsave_after_download", True)
    if instagram_unsave:
        unsave_instagram_posts(instagram_done, config)

    log.info(rule)
    log.info(f"Done. TikTok: {len(tiktok_done)} | Instagram: {len(instagram_done)}")
    log.info(f"Log saved to: {log_file}")
    log.info(rule)
|
|
|
|
|
|
# Run the downloader only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()
|