first commit
This commit is contained in:
350
downloader.py
Normal file
350
downloader.py
Normal file
@@ -0,0 +1,350 @@
|
||||
"""
|
||||
Social Media Collection Downloader
|
||||
Downloads TikTok collections and Instagram saved posts, then removes them.
|
||||
Requires: yt-dlp, playwright, browser cookies exported via browser extension.
|
||||
"""
|
||||
|
||||
import os
|
||||
import json
|
||||
import time
|
||||
import logging
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
# ── Logging ──────────────────────────────────────────────────────────────────
|
||||
|
||||
LOG_DIR = Path(__file__).parent / "logs"
|
||||
LOG_DIR.mkdir(exist_ok=True)
|
||||
log_file = LOG_DIR / f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
|
||||
|
||||
import io
|
||||
_stream_handler = logging.StreamHandler(io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace"))
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s",
|
||||
handlers=[
|
||||
logging.FileHandler(log_file, encoding="utf-8"),
|
||||
_stream_handler,
|
||||
],
|
||||
)
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
# ── Config ────────────────────────────────────────────────────────────────────
|
||||
|
||||
# Both locations are anchored at the directory that contains this script.
_SCRIPT_DIR = Path(__file__).parent
CONFIG_PATH = _SCRIPT_DIR / "config" / "config.json"
DOWNLOADS_DIR = _SCRIPT_DIR / "downloads"
|
||||
|
||||
|
||||
def load_config() -> dict:
    """Load the JSON configuration stored at ``CONFIG_PATH``.

    Returns:
        The parsed configuration object.

    The process exits with status 1, after logging an error, when the file is
    missing, cannot be read, is not valid JSON, or does not contain a JSON
    object at the top level.
    """
    if not CONFIG_PATH.exists():
        log.error(f"Config not found at {CONFIG_PATH}. Run: python setup.py")
        sys.exit(1)

    try:
        # Decode explicitly as UTF-8 instead of the platform's locale default.
        with open(CONFIG_PATH, encoding="utf-8") as f:
            config = json.load(f)
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        log.error(f"Could not read config at {CONFIG_PATH}: {exc}")
        sys.exit(1)

    # The rest of the script calls config.get(...), so anything but an object
    # would fail later with a less helpful error.
    if not isinstance(config, dict):
        log.error(f"Config at {CONFIG_PATH} must contain a JSON object at the top level.")
        sys.exit(1)
    return config
|
||||
|
||||
|
||||
# ── yt-dlp helpers ───────────────────────────────────────────────────────────
|
||||
|
||||
def build_ytdlp_cmd(url: str, output_dir: Path, cookies_file: Optional[str] = None, cookies_from_browser: Optional[str] = None) -> list:
    """Assemble the yt-dlp argument list for downloading a single URL.

    Args:
        url: Page to download; always the final argument.
        output_dir: Directory the output filename template is placed in.
        cookies_file: Cookie file passed via ``--cookies``; used only when the
            path exists.
        cookies_from_browser: Browser name passed via
            ``--cookies-from-browser`` when no existing cookie file is given.

    Returns:
        The command as a list of strings.
    """
    output_template = output_dir / "%(uploader)s - %(title).80s [%(id)s].%(ext)s"

    # An existing cookie file wins over reading cookies from a browser.
    if cookies_file and Path(cookies_file).exists():
        auth_args = ["--cookies", cookies_file]
    elif cookies_from_browser:
        auth_args = ["--cookies-from-browser", cookies_from_browser]
    else:
        auth_args = []

    return [
        "yt-dlp",
        "--no-warnings",
        "--quiet",
        "--progress",
        "-o", str(output_template),
        "--write-info-json",
        "--no-overwrites",
        "--retries", "3",
        "--fragment-retries", "3",
        "--concurrent-fragments", "4",
        *auth_args,
        url,
    ]
|
||||
|
||||
|
||||
def download_url(url: str, output_dir: Path, cookies_file: Optional[str] = None, cookies_from_browser: Optional[str] = None) -> bool:
    """Download a single URL with yt-dlp into ``output_dir``.

    Args:
        url: Page to download.
        output_dir: Directory the downloaded files are written to.
        cookies_file: Optional cookie file forwarded to ``build_ytdlp_cmd``.
        cookies_from_browser: Optional browser name forwarded to
            ``build_ytdlp_cmd``.

    Returns:
        True when yt-dlp exited with status 0. False when it exited with any
        other status or could not be started at all.
    """
    cmd = build_ytdlp_cmd(url, output_dir, cookies_file, cookies_from_browser)
    log.info(f"Downloading: {url}")

    try:
        # Output is deliberately not captured so yt-dlp's progress display
        # reaches the console directly.
        result = subprocess.run(cmd, capture_output=False, text=True)
    except OSError as exc:
        # Typically yt-dlp is not installed or not on PATH. Report it as a
        # failed download instead of aborting the whole run.
        log.error(f"[FAIL] Could not run yt-dlp for {url}: {exc}")
        return False

    if result.returncode != 0:
        log.error(f"[FAIL] Failed: {url} (yt-dlp exit code {result.returncode})")
        return False

    log.info(f"[OK] Downloaded: {url}")
    return True
|
||||
|
||||
|
||||
# ── TikTok ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def get_tiktok_collection_urls(collection_url: str, cookies_file: Optional[str], cookies_from_browser: Optional[str]) -> list[str]:
    """Use yt-dlp to extract all video URLs from a TikTok collection/playlist.

    Args:
        collection_url: URL of the collection to enumerate.
        cookies_file: Cookie file passed via ``--cookies``; used only when the
            path exists.
        cookies_from_browser: Browser name passed via
            ``--cookies-from-browser`` when no existing cookie file is given.

    Returns:
        The lines of yt-dlp's output that start with ``http``, in output
        order. Empty when yt-dlp could not be started. A non-zero exit status
        is logged, and any URLs printed before the failure are still returned.
    """
    cmd = [
        "yt-dlp",
        "--flat-playlist",
        "--print", "url",
        "--no-warnings",
        "--quiet",
    ]
    if cookies_file and Path(cookies_file).exists():
        cmd += ["--cookies", cookies_file]
    elif cookies_from_browser:
        cmd += ["--cookies-from-browser", cookies_from_browser]
    cmd.append(collection_url)

    log.info(f"Fetching TikTok collection URLs from: {collection_url}")
    try:
        # errors="replace" keeps undecodable bytes in yt-dlp's output from
        # raising while the captured text is decoded.
        result = subprocess.run(cmd, capture_output=True, text=True, errors="replace")
    except OSError as exc:
        # Typically yt-dlp is not installed or not on PATH.
        log.error(f"Could not run yt-dlp for {collection_url}: {exc}")
        return []

    if result.returncode != 0:
        # Without this, a failed listing looks exactly like an empty collection.
        stderr = (result.stderr or "").strip()
        log.error(f"yt-dlp exited with code {result.returncode} for {collection_url}: {stderr}")

    stdout_lines = (result.stdout or "").splitlines()
    urls = [line for line in map(str.strip, stdout_lines) if line.startswith("http")]
    log.info(f"Found {len(urls)} videos in collection")
    return urls
|
||||
|
||||
|
||||
def download_tiktok_collection(config: dict) -> list[str]:
    """Download all videos from the configured TikTok collections.

    Reads the ``"tiktok"`` section of ``config``: ``enabled``, ``collections``,
    ``cookies_file``, ``cookies_from_browser`` and ``delay_between_downloads``
    (seconds, default 2). Files are written to ``DOWNLOADS_DIR / "tiktok"``.

    Args:
        config: Parsed configuration.

    Returns:
        The URLs that downloaded successfully, each listed once. Empty when
        TikTok is disabled or no collections are configured.
    """
    tk_cfg = config.get("tiktok", {})
    if not tk_cfg.get("enabled", False):
        log.info("TikTok disabled in config, skipping.")
        return []

    collections = tk_cfg.get("collections", [])
    if not collections:
        log.warning("No TikTok collections configured.")
        return []
    if isinstance(collections, str):
        # A single URL given as a bare string would otherwise be iterated
        # character by character.
        collections = [collections]

    cookies_file = tk_cfg.get("cookies_file")
    cookies_from_browser = tk_cfg.get("cookies_from_browser")  # e.g. "chrome", "firefox"
    delay = tk_cfg.get("delay_between_downloads", 2)
    output_dir = DOWNLOADS_DIR / "tiktok"
    output_dir.mkdir(parents=True, exist_ok=True)

    downloaded_urls = []
    seen = set()
    for collection_url in collections:
        urls = get_tiktok_collection_urls(collection_url, cookies_file, cookies_from_browser)
        for url in urls:
            # A video can sit in several collections. Handle it once so it is
            # neither fetched again nor returned twice to the unsave step.
            if url in seen:
                continue
            seen.add(url)
            if download_url(url, output_dir, cookies_file, cookies_from_browser):
                downloaded_urls.append(url)
            time.sleep(delay)

    return downloaded_urls
|
||||
|
||||
|
||||
# ── Instagram ─────────────────────────────────────────────────────────────────
|
||||
|
||||
def get_instagram_saved_urls(collection_url: str, cookies_file: Optional[str], cookies_from_browser: Optional[str]) -> list[str]:
    """Use yt-dlp to extract all post URLs from an Instagram saved collection.

    Args:
        collection_url: URL of the saved collection to enumerate.
        cookies_file: Cookie file passed via ``--cookies``; used only when the
            path exists.
        cookies_from_browser: Browser name passed via
            ``--cookies-from-browser`` when no existing cookie file is given.

    Returns:
        The lines of yt-dlp's output that start with ``http``, in output
        order. Empty when yt-dlp could not be started. A non-zero exit status
        is logged, and any URLs printed before the failure are still returned.
    """
    cmd = [
        "yt-dlp",
        "--flat-playlist",
        "--print", "url",
        "--no-warnings",
        "--quiet",
    ]
    if cookies_file and Path(cookies_file).exists():
        cmd += ["--cookies", cookies_file]
    elif cookies_from_browser:
        cmd += ["--cookies-from-browser", cookies_from_browser]
    cmd.append(collection_url)

    log.info(f"Fetching Instagram saved URLs from: {collection_url}")
    try:
        # errors="replace" keeps undecodable bytes in yt-dlp's output from
        # raising while the captured text is decoded.
        result = subprocess.run(cmd, capture_output=True, text=True, errors="replace")
    except OSError as exc:
        # Typically yt-dlp is not installed or not on PATH.
        log.error(f"Could not run yt-dlp for {collection_url}: {exc}")
        return []

    if result.returncode != 0:
        # Without this, a failed listing looks exactly like an empty collection.
        stderr = (result.stderr or "").strip()
        log.error(f"yt-dlp exited with code {result.returncode} for {collection_url}: {stderr}")

    stdout_lines = (result.stdout or "").splitlines()
    urls = [line for line in map(str.strip, stdout_lines) if line.startswith("http")]
    log.info(f"Found {len(urls)} posts in saved collection")
    return urls
|
||||
|
||||
|
||||
def download_instagram_collection(config: dict) -> list[str]:
    """Download all posts from the configured Instagram saved collections.

    Reads the ``"instagram"`` section of ``config``: ``enabled``,
    ``collections``, ``cookies_file``, ``cookies_from_browser`` and
    ``delay_between_downloads`` (seconds, default 3). Files are written to
    ``DOWNLOADS_DIR / "instagram"``.

    Args:
        config: Parsed configuration.

    Returns:
        The URLs that downloaded successfully, each listed once. Empty when
        Instagram is disabled or no collections are configured.
    """
    ig_cfg = config.get("instagram", {})
    if not ig_cfg.get("enabled", False):
        log.info("Instagram disabled in config, skipping.")
        return []

    collections = ig_cfg.get("collections", [])
    if not collections:
        log.warning("No Instagram collections configured.")
        return []
    if isinstance(collections, str):
        # A single URL given as a bare string would otherwise be iterated
        # character by character.
        collections = [collections]

    cookies_file = ig_cfg.get("cookies_file")
    cookies_from_browser = ig_cfg.get("cookies_from_browser")
    delay = ig_cfg.get("delay_between_downloads", 3)
    output_dir = DOWNLOADS_DIR / "instagram"
    output_dir.mkdir(parents=True, exist_ok=True)

    downloaded_urls = []
    seen = set()
    for collection_url in collections:
        urls = get_instagram_saved_urls(collection_url, cookies_file, cookies_from_browser)
        for url in urls:
            # A post can sit in several collections. Handle it once so it is
            # neither fetched again nor returned twice to the unsave step.
            if url in seen:
                continue
            seen.add(url)
            if download_url(url, output_dir, cookies_file, cookies_from_browser):
                downloaded_urls.append(url)
            time.sleep(delay)

    return downloaded_urls
|
||||
|
||||
|
||||
# ── Unsave / Remove ───────────────────────────────────────────────────────────
|
||||
|
||||
def _load_tiktok_cookies(cookies_file: str) -> list[dict]:
    """Read the TikTok cookies in ``cookies_file`` as Playwright cookie dicts.

    Accepts either a JSON array of cookie objects or a Netscape-format
    ``cookies.txt``. The same path is also handed to yt-dlp's ``--cookies``
    option, which takes the Netscape format. Returns an empty list, after
    logging, when the file cannot be read or parsed.
    """
    import http.cookiejar  # local import: only needed for the Netscape fallback

    try:
        with open(cookies_file, encoding="utf-8") as f:
            raw = json.load(f)
    except OSError as exc:
        log.error(f"Could not read cookies file {cookies_file}: {exc}")
        return []
    except ValueError:
        raw = None  # not JSON; try the Netscape format below

    if isinstance(raw, list):
        entries = [c for c in raw if isinstance(c, dict)]
    else:
        jar = http.cookiejar.MozillaCookieJar()
        try:
            jar.load(cookies_file, ignore_discard=True, ignore_expires=True)
        except (OSError, ValueError) as exc:
            log.error(f"Could not parse cookies file {cookies_file}: {exc}")
            return []
        entries = [
            {
                "name": c.name,
                "value": c.value or "",
                "domain": c.domain,
                "path": c.path,
                "httpOnly": c.has_nonstandard_attr("HTTPOnly"),
                "secure": c.secure,
            }
            for c in jar
        ]

    pw_cookies = []
    for c in entries:
        domain = str(c.get("domain") or "")
        # Skip other sites' cookies and entries missing a required field.
        if "tiktok.com" not in domain or "name" not in c or "value" not in c:
            continue
        pw_cookies.append({
            "name": c["name"],
            "value": c["value"],
            "domain": domain,
            "path": c.get("path", "/"),
            "httpOnly": c.get("httpOnly", False),
            "secure": c.get("secure", False),
        })
    return pw_cookies


def unsave_tiktok_videos(urls: list[str], config: dict) -> None:
    """Use Playwright to unsave/unlike downloaded TikTok videos.

    Reads the ``"tiktok"`` section of ``config``: ``cookies_file``,
    ``headless`` (default False) and ``delay_between_unsaves`` (seconds,
    default 2). Does nothing when ``urls`` is empty. Logs an error and returns
    when Playwright is not installed.

    Args:
        urls: Video page URLs to visit; duplicates are visited once.
        config: Parsed configuration.
    """
    if not urls:
        return
    try:
        from playwright.sync_api import sync_playwright
    except ImportError:
        log.error("Playwright not installed. Run: pip install playwright && playwright install chromium")
        return

    tk_cfg = config.get("tiktok", {})
    cookies_file = tk_cfg.get("cookies_file")
    unsave_delay = tk_cfg.get("delay_between_unsaves", 2)

    # The click below is a single button press, so visiting the same video
    # twice would press it twice. Keep the first occurrence of each URL.
    urls = list(dict.fromkeys(urls))

    log.info(f"Unsaving {len(urls)} TikTok videos...")
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=tk_cfg.get("headless", False))
        try:
            context = browser.new_context()

            if cookies_file and Path(cookies_file).exists():
                pw_cookies = _load_tiktok_cookies(cookies_file)
                if pw_cookies:
                    context.add_cookies(pw_cookies)

            page = context.new_page()

            for url in urls:
                try:
                    log.info(f"Unsaving: {url}")
                    page.goto(url, wait_until="networkidle", timeout=30000)
                    time.sleep(2)

                    # Try clicking bookmark/save button (TikTok uses aria-label)
                    # NOTE(review): these selectors do not distinguish a saved
                    # video from an unsaved one, so the click presumably
                    # toggles; confirm against TikTok's current markup.
                    bookmark = page.query_selector('[data-e2e="bookmark-icon"], [aria-label*="Add to Favorites"], [aria-label*="Save"]')
                    if bookmark:
                        bookmark.click()
                        time.sleep(1)
                        log.info(f"[OK] Unsaved: {url}")
                    else:
                        log.warning(f"[WARN] Could not find bookmark button for: {url}")

                    time.sleep(unsave_delay)
                except Exception as e:
                    # Per-URL boundary: one failing page must not stop the rest.
                    log.error(f"Error unsaving {url}: {e}")
        finally:
            # Close the browser even when context or cookie setup raises.
            browser.close()
|
||||
|
||||
|
||||
def _load_instagram_cookies(cookies_file: str) -> list[dict]:
    """Read the Instagram cookies in ``cookies_file`` as Playwright cookie dicts.

    Accepts either a JSON array of cookie objects or a Netscape-format
    ``cookies.txt``. The same path is also handed to yt-dlp's ``--cookies``
    option, which takes the Netscape format. Returns an empty list, after
    logging, when the file cannot be read or parsed.
    """
    import http.cookiejar  # local import: only needed for the Netscape fallback

    try:
        with open(cookies_file, encoding="utf-8") as f:
            raw = json.load(f)
    except OSError as exc:
        log.error(f"Could not read cookies file {cookies_file}: {exc}")
        return []
    except ValueError:
        raw = None  # not JSON; try the Netscape format below

    if isinstance(raw, list):
        entries = [c for c in raw if isinstance(c, dict)]
    else:
        jar = http.cookiejar.MozillaCookieJar()
        try:
            jar.load(cookies_file, ignore_discard=True, ignore_expires=True)
        except (OSError, ValueError) as exc:
            log.error(f"Could not parse cookies file {cookies_file}: {exc}")
            return []
        entries = [
            {
                "name": c.name,
                "value": c.value or "",
                "domain": c.domain,
                "path": c.path,
                "httpOnly": c.has_nonstandard_attr("HTTPOnly"),
                "secure": c.secure,
            }
            for c in jar
        ]

    pw_cookies = []
    for c in entries:
        domain = str(c.get("domain") or "")
        # Skip other sites' cookies and entries missing a required field.
        if "instagram.com" not in domain or "name" not in c or "value" not in c:
            continue
        pw_cookies.append({
            "name": c["name"],
            "value": c["value"],
            "domain": domain,
            "path": c.get("path", "/"),
            "httpOnly": c.get("httpOnly", False),
            "secure": c.get("secure", False),
        })
    return pw_cookies


def unsave_instagram_posts(urls: list[str], config: dict) -> None:
    """Use Playwright to unsave downloaded Instagram posts.

    Reads the ``"instagram"`` section of ``config``: ``cookies_file``,
    ``headless`` (default False) and ``delay_between_unsaves`` (seconds,
    default 3). Does nothing when ``urls`` is empty. Logs an error and returns
    when Playwright is not installed.

    Args:
        urls: Post page URLs to visit; duplicates are visited once.
        config: Parsed configuration.
    """
    if not urls:
        return
    try:
        from playwright.sync_api import sync_playwright
    except ImportError:
        log.error("Playwright not installed. Run: pip install playwright && playwright install chromium")
        return

    ig_cfg = config.get("instagram", {})
    cookies_file = ig_cfg.get("cookies_file")
    unsave_delay = ig_cfg.get("delay_between_unsaves", 3)

    # Visit each post once so its save button is never pressed twice.
    urls = list(dict.fromkeys(urls))

    log.info(f"Unsaving {len(urls)} Instagram posts...")
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=ig_cfg.get("headless", False))
        try:
            context = browser.new_context(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
            )

            if cookies_file and Path(cookies_file).exists():
                pw_cookies = _load_instagram_cookies(cookies_file)
                if pw_cookies:
                    context.add_cookies(pw_cookies)

            page = context.new_page()

            for url in urls:
                try:
                    log.info(f"Unsaving: {url}")
                    page.goto(url, wait_until="networkidle", timeout=30000)
                    time.sleep(2)

                    # Instagram save button - look for bookmark SVG button.
                    # Selectors are tried in order; the first match is clicked.
                    save_btn = page.query_selector('svg[aria-label="Remove"]')
                    if not save_btn:
                        save_btn = page.query_selector('[aria-label="Unsave"]')
                    if not save_btn:
                        # Try finding bookmark icon that's currently "saved" (filled state)
                        # NOTE(review): this selector matches any label that
                        # contains "Save", which may include the not-yet-saved
                        # state; confirm against Instagram's current markup.
                        save_btn = page.query_selector('button svg[aria-label*="Save"]')

                    if save_btn:
                        save_btn.click()
                        time.sleep(1)
                        log.info(f"[OK] Unsaved: {url}")
                    else:
                        log.warning(f"[WARN] Could not find save button for: {url}")

                    time.sleep(unsave_delay)
                except Exception as e:
                    # Per-URL boundary: one failing page must not stop the rest.
                    log.error(f"Error unsaving {url}: {e}")
        finally:
            # Close the browser even when context or cookie setup raises.
            browser.close()
|
||||
|
||||
|
||||
# ── Main ──────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
    """Run the whole pipeline: download both platforms, then unsave.

    Downloads the TikTok collections, then the Instagram collections. Each
    platform's downloaded items are then unsaved unless that platform's
    ``unsave_after_download`` setting is false (it defaults to true). A
    summary and the log file path are logged at the end.
    """
    banner = "=" * 60
    for line in (banner, "Social Media Collection Downloader — Starting", banner):
        log.info(line)

    settings = load_config()

    # Downloads run first for both platforms; nothing is unsaved until both finish.
    tiktok_urls = download_tiktok_collection(settings)
    log.info(f"TikTok: downloaded {len(tiktok_urls)} videos")

    instagram_urls = download_instagram_collection(settings)
    log.info(f"Instagram: downloaded {len(instagram_urls)} posts")

    # (config section, unsave function, URLs that were downloaded)
    unsave_steps = (
        ("tiktok", unsave_tiktok_videos, tiktok_urls),
        ("instagram", unsave_instagram_posts, instagram_urls),
    )
    for section, unsave, saved_urls in unsave_steps:
        if settings.get(section, {}).get("unsave_after_download", True):
            unsave(saved_urls, settings)

    log.info(banner)
    log.info(f"Done. TikTok: {len(tiktok_urls)} | Instagram: {len(instagram_urls)}")
    log.info(f"Log saved to: {log_file}")
    log.info(banner)


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user