#!/usr/bin/env python3 """FaceSwap Batch Tool v5 - Mit Face Enhancer & Farbanpassung. Fuehre setup.bat aus bevor du dieses Skript startest.""" import os, sys, subprocess, importlib, platform, urllib.request, shutil, zipfile, json, sysconfig from pathlib import Path SCRIPT_DIR = Path(__file__).parent.resolve() MODELS_DIR = SCRIPT_DIR / "models" SETUP_FLAG = SCRIPT_DIR / ".setup_done" CONFIG_FILE = SCRIPT_DIR / "config.json" IS_WINDOWS = platform.system() == "Windows" _CUDA_PRELOAD_TRIED = False _PROVIDERS_CACHE = None _DLL_DIR_HANDLES = [] REQUIRED_PYTHON = (3, 12) def _require_python_version(): if sys.version_info[:2] == REQUIRED_PYTHON: return required = f"{REQUIRED_PYTHON[0]}.{REQUIRED_PYTHON[1]}" msg = ( "Falsche Python-Version erkannt.\n\n" f"Benoetigt: Python {required}\n" f"Aktuell: Python {sys.version.split()[0]}\n" f"Interpreter: {sys.executable}\n\n" f"Bitte so starten:\n py -{required} {Path(__file__).name}" ) try: import tkinter as tk from tkinter import messagebox root = tk.Tk() root.withdraw() messagebox.showerror("FaceSwap - Python Version", msg, parent=root) root.destroy() except Exception: pass raise RuntimeError(msg) def _load_config(): try: if CONFIG_FILE.exists(): return json.loads(CONFIG_FILE.read_text(encoding="utf-8")) except Exception: pass return {} def _save_config(data): try: CONFIG_FILE.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8") except Exception: pass def _run_ok(*cmd): try: out = subprocess.check_output(list(cmd), stderr=subprocess.DEVNULL, timeout=8).decode() return True, out except Exception: return False, "" def _cuda_available(): ok, out = _run_ok("nvidia-smi") if ok: import re m = re.search(r"CUDA Version:\s*([\d.]+)", out) return True, (m.group(1) if m else "unknown") return False, "" def _ensure_torchvision_functional_tensor_alias(log_fn=None): try: import torchvision.transforms.functional_tensor # noqa: F401 return True except Exception: pass try: import sys import types from torchvision.transforms import functional as tvf if not hasattr(tvf, "rgb_to_grayscale"): return False alias_mod = types.ModuleType("torchvision.transforms.functional_tensor") alias_mod.rgb_to_grayscale = tvf.rgb_to_grayscale sys.modules["torchvision.transforms.functional_tensor"] = alias_mod if log_fn: log_fn(" Hinweis: torchvision-Kompatibilitaetsmodus fuer GFPGAN aktiv.") return True except Exception: return False def _preload_onnxruntime_cuda(log_fn=None): global _CUDA_PRELOAD_TRIED if _CUDA_PRELOAD_TRIED or not IS_WINDOWS: return _CUDA_PRELOAD_TRIED = True try: import site import onnxruntime as ort dll_dirs = [] seen = set() def _push_dir(path_obj): try: p = str(Path(path_obj).resolve()) except Exception: return key = p.lower() if key in seen: return seen.add(key) if Path(p).is_dir(): dll_dirs.append(p) # onnxruntime eigene DLLs ort_pkg = Path(ort.__file__).resolve().parent _push_dir(ort_pkg / "capi") # NVIDIA Runtime-DLLs aus allen relevanten site-packages site_roots = [] try: site_roots.extend(site.getsitepackages()) except Exception: pass try: site_roots.append(site.getusersitepackages()) except Exception: pass # Interpreter-spezifisches site-packages ebenfalls absichern site_roots.append(sysconfig.get_paths().get("purelib", "")) for root in [Path(r) for r in site_roots if r]: _push_dir(root / "nvidia" / "cublas" / "bin") _push_dir(root / "nvidia" / "cuda_runtime" / "bin") _push_dir(root / "nvidia" / "cuda_nvrtc" / "bin") _push_dir(root / "nvidia" / "cudnn" / "bin") _push_dir(root / "nvidia" / "cufft" / "bin") _push_dir(root / "nvidia" / "nvjitlink" / "bin") # DLL-Suchpfad fuer spaeter dynamisch geladene cuDNN-Teillibs erweitern path_parts = os.environ.get("PATH", "").split(os.pathsep) path_keys = {p.lower() for p in path_parts} for d in dll_dirs: if d.lower() not in path_keys: path_parts.insert(0, d) path_keys.add(d.lower()) try: if hasattr(os, "add_dll_directory"): _DLL_DIR_HANDLES.append(os.add_dll_directory(d)) except Exception: pass os.environ["PATH"] = os.pathsep.join(path_parts) preload = getattr(ort, "preload_dlls", None) if callable(preload): preload(directory="") if log_fn: log_fn(f" ONNX Runtime CUDA-DLLs vorgeladen ({len(dll_dirs)} DLL-Ordner).") except Exception as e: if log_fn: log_fn(f" Hinweis: CUDA-DLL-Preload fehlgeschlagen ({e}).") def _dl(url, dest, log=print): def hook(count, block, total): if total > 0 and count % 100 == 0: pct = min(100, count * block * 100 // total) log(f" ... {pct}% ({count*block/1_048_576:.0f} MB)") urllib.request.urlretrieve(url, dest, reporthook=hook) def _cv2_imread_unicode(path, flags=None): import cv2 import numpy as np if flags is None: flags = cv2.IMREAD_COLOR p = str(path) img = cv2.imread(p, flags) if img is not None: return img try: data = np.fromfile(p, dtype=np.uint8) if data.size == 0: return None return cv2.imdecode(data, flags) except Exception: return None def _cv2_imwrite_unicode(path, img, params=None): import cv2 p = str(path) if params is None: params = [] try: if cv2.imwrite(p, img, params): return True except Exception: pass try: ext = Path(p).suffix or ".png" ok, buf = cv2.imencode(ext, img, params) if not ok: return False buf.tofile(p) return True except Exception: return False def _path_has_non_ascii(path): try: s = str(path) except Exception: return False return any(ord(ch) > 127 for ch in s) def _open_videocapture_unicode(source, log_fn=None): import cv2 import tempfile if isinstance(source, int): return cv2.VideoCapture(int(source)), None p = str(source) cap = cv2.VideoCapture(p) if cap.isOpened(): return cap, None tmp_copy = None try: src_path = Path(p) if IS_WINDOWS and src_path.is_file() and _path_has_non_ascii(src_path): fd, tmp_name = tempfile.mkstemp(prefix="faceswap_vid_", suffix=(src_path.suffix or ".mp4"), dir=str(SCRIPT_DIR)) os.close(fd) tmp_copy = Path(tmp_name) shutil.copy2(src_path, tmp_copy) cap2 = cv2.VideoCapture(str(tmp_copy)) if cap2.isOpened(): cap.release() if log_fn: log_fn(" Hinweis: Unicode-Video-Fallback aktiv (temp Datei).") return cap2, tmp_copy cap2.release() except Exception: pass return cap, None def _cleanup_temp_file(path): if path is None: return try: Path(path).unlink(missing_ok=True) except Exception: pass def run_setup(log=print): log("=" * 60) log(" FaceSwap Batch Tool - Modell-Download") log("=" * 60) log("\nPruefe NVIDIA GPU / CUDA ...") cuda, cuda_ver = _cuda_available() log(f" {'OK CUDA ' + cuda_ver + ' -> GPU-Modus' if cuda else 'Info: Kein CUDA -> CPU-Modus'}") log("\nPruefe Installation ...") missing = [] for mod, name in [("cv2","opencv-python"), ("numpy","numpy"), ("insightface","insightface"), ("onnx","onnx"), ("onnxruntime","onnxruntime"), ("albumentations","albumentations")]: try: importlib.import_module(mod) log(f" OK {name}") except Exception: log(f" FEHLT: {name}") missing.append(name) if missing: raise RuntimeError( f"Fehlende Pakete: {', '.join(missing)}\n\n" "Bitte fuehre zuerst setup.bat aus!" ) log("\nPruefe GFPGAN ...") has_gfpgan = False try: if importlib.util.find_spec("gfpgan") is None: raise ModuleNotFoundError("gfpgan") _ensure_torchvision_functional_tensor_alias(log) importlib.import_module("gfpgan") has_gfpgan = True log(" OK gfpgan") except ModuleNotFoundError: log(" FEHLT: gfpgan - Funktion deaktiviert (optional)") except Exception as e: log(f" FEHLER: gfpgan installiert, aber nicht importierbar ({e})") log(" Funktion deaktiviert (optional)") import numpy as np major = int(np.__version__.split(".")[0]) if major >= 2: raise RuntimeError( f"numpy {np.__version__} ist installiert, aber numpy<2.0 wird benoetigt.\n\n" "Bitte fuehre setup.bat aus um alle Pakete neu zu installieren." ) log(f" OK numpy {np.__version__} (kompatibel)") log("\nPruefe KI-Modelle ...") MODELS_DIR.mkdir(exist_ok=True) buffalo_dir = MODELS_DIR / "buffalo_l" if buffalo_dir.exists() and any(buffalo_dir.iterdir()): log(" OK buffalo_l") else: log(" Lade buffalo_l (~200 MB) ...") zp = MODELS_DIR / "buffalo_l.zip" try: _dl("https://github.com/deepinsight/insightface/releases/download/v0.7/buffalo_l.zip", zp, log) with zipfile.ZipFile(zp) as z: z.extractall(MODELS_DIR) zp.unlink(missing_ok=True) log(" OK buffalo_l") except Exception as e: zp.unlink(missing_ok=True) raise RuntimeError(f"buffalo_l Download fehlgeschlagen: {e}") sm = MODELS_DIR / "inswapper_128.onnx" if sm.exists() and sm.stat().st_size > 100_000: log(" OK inswapper_128.onnx") else: log(" Lade inswapper_128.onnx (~500 MB) ...") urls = [ "https://github.com/facefusion/facefusion-assets/releases/download/models/inswapper_128.onnx", "https://github.com/deepinsight/insightface/releases/download/v0.7/inswapper_128.onnx", ] ok = False for url in urls: try: log(f" Versuche {url.split('/')[2]} ...") _dl(url, sm, log) if sm.exists() and sm.stat().st_size > 100_000: ok = True break sm.unlink(missing_ok=True) except Exception as e: log(f" Fehler: {e}") sm.unlink(missing_ok=True) if not ok: raise RuntimeError( "inswapper_128.onnx konnte nicht heruntergeladen werden.\n\n" "Manuell herunterladen von:\n" " https://huggingface.co/deepinsight/inswapper\n" f"Datei ablegen in: {MODELS_DIR}" ) log(" OK inswapper_128.onnx") if has_gfpgan: try: FaceRestorer(log_fn=log).ensure_model() log(" OK GFPGANv1.4.pth") except Exception as e: log(f" WARNUNG: GFPGAN-Modell konnte nicht vorbereitet werden ({e})") SETUP_FLAG.write_text(f"cuda={cuda}\n") log("\nEinrichtung abgeschlossen!") def _show_setup_window(): import tkinter as tk from tkinter import ttk, messagebox import threading root = tk.Tk() root.title("FaceSwap - Einrichtung") root.geometry("700x520") root.configure(bg="#090912") root.resizable(False, False) tk.Label(root, text="Einrichtung", font=("Courier New", 17, "bold"), bg="#090912", fg="#e8d5b7").pack(pady=(18, 4)) tk.Label(root, text="Pruefe Pakete & lade KI-Modelle ...", font=("Courier New", 9), bg="#090912", fg="#7a7a9a").pack() lf = tk.Frame(root, bg="#090912") lf.pack(fill="both", expand=True, padx=18, pady=8) lb = tk.Text(lf, bg="#04040a", fg="#8aff8a", font=("Courier New", 8), relief="flat", state="disabled") sb = tk.Scrollbar(lf, command=lb.yview) lb.configure(yscrollcommand=sb.set) lb.pack(side="left", fill="both", expand=True) sb.pack(side="right", fill="y") style = ttk.Style(root) style.theme_use("default") style.configure("S.Horizontal.TProgressbar", troughcolor="#111120", background="#3adf6a", thickness=10) pb = ttk.Progressbar(root, mode="indeterminate", length=660, style="S.Horizontal.TProgressbar") pb.pack(padx=18, pady=4) sv = tk.StringVar(value="Starte ...") tk.Label(root, textvariable=sv, font=("Courier New", 9), bg="#090912", fg="#c8a96a").pack() btn = tk.Button(root, text="Schliessen & Starten", font=("Courier New", 11, "bold"), bg="#1a3a2a", fg="#8aff8a", relief="flat", state="disabled", cursor="hand2", command=root.destroy) btn.pack(pady=10) def append(msg): lb.configure(state="normal") lb.insert("end", msg + "\n") lb.see("end") lb.configure(state="disabled") root.update_idletasks() def worker(): pb.start(10) try: run_setup(log=append) sv.set("Fertig!") btn.configure(state="normal") except Exception as e: append(f"\nFehler: {e}") sv.set("Fehler - Details im Log") messagebox.showerror("Fehler", str(e), parent=root) btn.configure(state="normal") finally: pb.stop() threading.Thread(target=worker, daemon=True).start() root.mainloop() def _get_providers(log_fn=None): global _PROVIDERS_CACHE if _PROVIDERS_CACHE is not None: return list(_PROVIDERS_CACHE) try: import onnxruntime as ort _preload_onnxruntime_cuda(log_fn) if "CUDAExecutionProvider" in ort.get_available_providers(): probe_model = MODELS_DIR / "buffalo_l" / "1k3d68.onnx" if probe_model.exists(): session = ort.InferenceSession( str(probe_model), providers=["CUDAExecutionProvider", "CPUExecutionProvider"], ) if "CUDAExecutionProvider" not in session.get_providers(): if log_fn: log_fn(" CUDA erkannt, aber ONNX Runtime initialisiert nur CPU. CPU-Fallback aktiv.") _PROVIDERS_CACHE = ["CPUExecutionProvider"] return list(_PROVIDERS_CACHE) _PROVIDERS_CACHE = ["CUDAExecutionProvider", "CPUExecutionProvider"] return list(_PROVIDERS_CACHE) except Exception: pass _PROVIDERS_CACHE = ["CPUExecutionProvider"] return list(_PROVIDERS_CACHE) def _enhance_face_region(img, bbox, sharpen=True, color_correct=True): import cv2, numpy as np x1, y1, x2, y2 = [int(v) for v in bbox] pad = 10 x1c = max(0, x1 - pad); y1c = max(0, y1 - pad) x2c = min(img.shape[1], x2 + pad); y2c = min(img.shape[0], y2 + pad) h, w = y2c - y1c, x2c - x1c if h <= 0 or w <= 0: return img original_region = img[y1c:y2c, x1c:x2c].copy() enhanced = original_region.copy() if sharpen: blur = cv2.GaussianBlur(enhanced, (0, 0), 2) enhanced = cv2.addWeighted(enhanced, 1.4, blur, -0.4, 0) mask = np.zeros((h, w), dtype=np.float32) border = max(8, min(h, w) // 6) mask[border:-border, border:-border] = 1.0 mask = cv2.GaussianBlur(mask, (0, 0), border * 0.8) mask = np.clip(mask, 0, 1)[:, :, np.newaxis] blended = (enhanced.astype(np.float32) * mask + original_region.astype(np.float32) * (1 - mask)) img[y1c:y2c, x1c:x2c] = np.clip(blended, 0, 255).astype(np.uint8) return img def _match_face_color(swapped_img, original_img, bbox): import cv2, numpy as np x1, y1, x2, y2 = [int(v) for v in bbox] pad = 5 x1c = max(0, x1 - pad); y1c = max(0, y1 - pad) x2c = min(swapped_img.shape[1], x2 + pad); y2c = min(swapped_img.shape[0], y2 + pad) src_region = original_img[y1c:y2c, x1c:x2c].astype(np.float32) dst_region = swapped_img[y1c:y2c, x1c:x2c].astype(np.float32) if src_region.size == 0 or dst_region.size == 0: return swapped_img for c in range(3): src_mean, src_std = src_region[:,:,c].mean(), src_region[:,:,c].std() + 1e-6 dst_mean, dst_std = dst_region[:,:,c].mean(), dst_region[:,:,c].std() + 1e-6 factor = 0.5 dst_region[:,:,c] = (dst_region[:,:,c] - dst_mean) * (src_std / dst_std) * factor \ + dst_mean * factor + dst_region[:,:,c] * (1 - factor) dst_region = np.clip(dst_region, 0, 255).astype(np.uint8) result = swapped_img.copy() result[y1c:y2c, x1c:x2c] = dst_region return result class FaceRestorer: MODEL_URL = "https://github.com/TencentARC/GFPGAN/releases/download/v1.3.4/GFPGANv1.4.pth" MODEL_PATH = MODELS_DIR / "GFPGANv1.4.pth" _FACEXLIB_FILES = { "detection_Resnet50_Final.pth": "https://github.com/xinntao/facexlib/releases/download/v0.1.0/detection_Resnet50_Final.pth", "parsing_parsenet.pth": "https://github.com/xinntao/facexlib/releases/download/v0.2.2/parsing_parsenet.pth", } def __init__(self, log_fn=print): self.log = log_fn self.restorer = None self.upscale = 1 self.only_center_face = False self.min_face_px = 72 self.blend_alpha = 0.72 self.pad_ratio = 0.22 @staticmethod def is_available(log_fn=None): import importlib import importlib.util if importlib.util.find_spec("gfpgan") is None: return False _ensure_torchvision_functional_tensor_alias(log_fn) try: importlib.import_module("gfpgan") return True except Exception: return False def ensure_model(self): MODELS_DIR.mkdir(exist_ok=True) if not self.MODEL_PATH.exists() or self.MODEL_PATH.stat().st_size < 100_000: self.log(" Lade GFPGANv1.4.pth (~330 MB) ...") _dl(self.MODEL_URL, self.MODEL_PATH, self.log) self._ensure_facexlib_weights() def _ensure_facexlib_weights(self): if not self.is_available(): return try: import facexlib weights_dir = Path(facexlib.__file__).resolve().parent / "weights" weights_dir.mkdir(parents=True, exist_ok=True) for filename, url in self._FACEXLIB_FILES.items(): dst = weights_dir / filename if dst.exists() and dst.stat().st_size > 100_000: continue self.log(f" Lade {filename} ...") _dl(url, dst, self.log) except Exception as e: self.log(f" Hinweis: facexlib-Modelle konnten nicht vorab geladen werden ({e}).") def load(self): if self.restorer is not None: return if not self.is_available(log_fn=self.log): raise RuntimeError("GFPGAN ist nicht installiert.") self.ensure_model() _ensure_torchvision_functional_tensor_alias(self.log) from gfpgan import GFPGANer providers = _get_providers(self.log) device = "cuda" if "CUDAExecutionProvider" in providers else "cpu" self.restorer = GFPGANer( model_path=str(self.MODEL_PATH), upscale=self.upscale, arch="clean", channel_multiplier=2, bg_upsampler=None, device=device, ) def restore(self, bgr_img): import cv2 import numpy as np if bgr_img is None: return bgr_img try: self.load() _, _, restored = self.restorer.enhance( bgr_img, has_aligned=False, only_center_face=self.only_center_face, paste_back=True, ) if restored is None: return bgr_img if restored.shape[:2] != bgr_img.shape[:2]: restored = cv2.resize(restored, (bgr_img.shape[1], bgr_img.shape[0]), interpolation=cv2.INTER_AREA) return np.clip(restored, 0, 255).astype(np.uint8) except Exception as e: self.log(f" Hinweis: GFPGAN-Restore fehlgeschlagen ({e}).") return bgr_img @staticmethod def _bbox_to_rect(bbox, img_w, img_h, pad_ratio=0.22): try: x1, y1, x2, y2 = [float(v) for v in bbox] except Exception: return None bw = max(1.0, x2 - x1) bh = max(1.0, y2 - y1) pad_x = max(8, int(round(bw * pad_ratio))) pad_y = max(8, int(round(bh * pad_ratio))) rx1 = max(0, int(round(x1)) - pad_x) ry1 = max(0, int(round(y1)) - pad_y) rx2 = min(int(img_w), int(round(x2)) + pad_x) ry2 = min(int(img_h), int(round(y2)) + pad_y) if rx2 <= rx1 + 2 or ry2 <= ry1 + 2: return None return rx1, ry1, rx2, ry2 def restore_faces(self, bgr_img, bboxes): import cv2 import numpy as np if bgr_img is None: return bgr_img if not bboxes: return bgr_img try: self.load() except Exception as e: self.log(f" Hinweis: GFPGAN nicht bereit ({e}).") return bgr_img out = bgr_img.copy() h, w = out.shape[:2] restored_any = False for bbox in bboxes: rect = self._bbox_to_rect(bbox, w, h, pad_ratio=self.pad_ratio) if rect is None: continue x1, y1, x2, y2 = rect rw = x2 - x1 rh = y2 - y1 if rw < self.min_face_px or rh < self.min_face_px: continue crop = out[y1:y2, x1:x2] if crop.size == 0: continue try: _, _, restored = self.restorer.enhance( crop, has_aligned=False, only_center_face=True, paste_back=True, ) except Exception: continue if restored is None: continue if restored.shape[:2] != crop.shape[:2]: restored = cv2.resize(restored, (crop.shape[1], crop.shape[0]), interpolation=cv2.INTER_AREA) alpha = max(0.0, min(1.0, float(self.blend_alpha))) mixed = cv2.addWeighted(restored.astype(np.float32), alpha, crop.astype(np.float32), 1.0 - alpha, 0.0) out[y1:y2, x1:x2] = np.clip(mixed, 0, 255).astype(np.uint8) restored_any = True if restored_any: return out return bgr_img class FaceLibrary: ROOT = SCRIPT_DIR / "face_library" def __init__(self, log_fn=print): self.log = log_fn self.ROOT.mkdir(exist_ok=True) self._detector = None @staticmethod def _slugify(name): import re import unicodedata raw = (name or "").strip().lower() if not raw: raw = "gesicht" raw = unicodedata.normalize("NFKD", raw) raw = raw.encode("ascii", "ignore").decode("ascii") raw = raw.replace(" ", "_") raw = re.sub(r"[^a-z0-9_]+", "", raw) raw = re.sub(r"_+", "_", raw).strip("_") return raw or "gesicht" def _ensure_detector(self): if self._detector is not None: return self._detector import cv2 cascade_path = Path(cv2.data.haarcascades) / "haarcascade_frontalface_default.xml" detector = cv2.CascadeClassifier(str(cascade_path)) if detector.empty(): raise RuntimeError("Gesichtsdetektor konnte nicht geladen werden.") self._detector = detector return detector def _detect_largest_face(self, bgr_img): import cv2 det = self._ensure_detector() gray = cv2.cvtColor(bgr_img, cv2.COLOR_BGR2GRAY) faces = det.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=4, minSize=(30, 30)) if len(faces) == 0: return None x, y, w, h = max(faces, key=lambda f: int(f[2]) * int(f[3])) return int(x), int(y), int(w), int(h) def add(self, name, image_path): import cv2 import datetime image_path = Path(image_path) if not image_path.is_file(): raise RuntimeError(f"Datei nicht gefunden: {image_path}") src_img = _cv2_imread_unicode(image_path) if src_img is None: raise RuntimeError("Bild konnte nicht geladen werden.") h0, w0 = src_img.shape[:2] max_side = max(h0, w0) scale = min(1.0, 1920.0 / max(1.0, float(max_side))) if scale < 1.0: work = cv2.resize(src_img, (int(round(w0 * scale)), int(round(h0 * scale))), interpolation=cv2.INTER_AREA) else: work = src_img face = self._detect_largest_face(work) if face is None: raise RuntimeError("Kein Gesicht im Bild erkannt.") fx, fy, fw, fh = face inv = 1.0 / max(scale, 1e-9) fx = int(round(fx * inv)); fy = int(round(fy * inv)) fw = int(round(fw * inv)); fh = int(round(fh * inv)) pad_x = int(round(fw * 0.20)) pad_y = int(round(fh * 0.20)) x1 = max(0, fx - pad_x); y1 = max(0, fy - pad_y) x2 = min(w0, fx + fw + pad_x); y2 = min(h0, fy + fh + pad_y) crop = src_img[y1:y2, x1:x2] if crop.size == 0: raise RuntimeError("Kein gueltiger Gesichtsausschnitt gefunden.") base_slug = self._slugify(name) slug = base_slug idx = 2 while (self.ROOT / slug).exists(): slug = f"{base_slug}_{idx}" idx += 1 face_dir = self.ROOT / slug face_dir.mkdir(parents=True, exist_ok=True) source_path = face_dir / "source.jpg" thumb_path = face_dir / "thumb.png" meta_path = face_dir / "meta.json" if not _cv2_imwrite_unicode(source_path, src_img): raise RuntimeError("source.jpg konnte nicht gespeichert werden.") thumb = cv2.resize(crop, (96, 96), interpolation=cv2.INTER_AREA) if not _cv2_imwrite_unicode(thumb_path, thumb): raise RuntimeError("thumb.png konnte nicht gespeichert werden.") meta = { "slug": slug, "name": (name or "").strip() or slug, "added": datetime.date.today().isoformat(), } meta_path.write_text(json.dumps(meta, indent=2, ensure_ascii=False), encoding="utf-8") return { "slug": slug, "name": meta["name"], "source_path": str(source_path), "thumb_path": str(thumb_path), "added": meta["added"], } def remove(self, slug): slug = self._slugify(slug) target = self.ROOT / slug if target.exists(): shutil.rmtree(target, ignore_errors=True) def list_entries(self): entries = [] if not self.ROOT.exists(): return entries for d in self.ROOT.iterdir(): if not d.is_dir(): continue source_path = d / "source.jpg" meta_path = d / "meta.json" thumb_path = d / "thumb.png" if not source_path.exists(): self.log(f"WARNUNG Bibliothekseintrag ohne source.jpg uebersprungen: {d.name}") continue name = d.name added = "" if meta_path.exists(): try: meta = json.loads(meta_path.read_text(encoding="utf-8")) name = (meta.get("name") or name).strip() or name added = (meta.get("added") or "").strip() except Exception: pass entries.append({ "slug": d.name, "name": name, "source_path": str(source_path), "thumb_path": str(thumb_path), "added": added, }) entries.sort(key=lambda x: x["name"].lower()) return entries def get_source_path(self, slug): p = self.ROOT / self._slugify(slug) / "source.jpg" if not p.exists(): raise RuntimeError(f"Eintrag nicht gefunden: {slug}") return p class FaceSwapper: def __init__(self, log_fn=print): self.log = log_fn self.app = self.swapper = None self.restorer = None self.use_restoration = False self._restoration_warned_cpu = False self.enhance = True self.color = True self.high_sensitivity = True self.det_size = (640, 640) self.det_thresh = 0.5 # Video-Stabilitaet: nur konsistente Face-Tracks werden geswappt. self.video_track_single_face = True self.video_min_det_score = 0.30 self.video_start_det_score = 0.30 self.video_new_face_det_score = 0.55 self.video_min_face_size_px = 32 self.video_min_iou = 0.02 self.video_max_center_jump = 0.28 self.video_min_area_ratio = 0.20 self.video_max_area_ratio = 4.00 self.video_track_memory = 10 self.video_min_embed_sim = 0.05 self.video_track_accept_score = 0.15 # was 0.30 — too strict, rejected valid tracked faces self.video_fade_in_step = 1.0 self.video_fade_out_step = 0.0 self.video_occlusion_hold = 0 self.video_occlusion_mouth_ratio_min = 0.30 # was 0.58 — too aggressive, falsely blocked swap self.video_occlusion_texture_drop = 0.30 # was 0.55 — too sensitive to normal lighting change self.video_occluder_diff_thresh = 24 self.video_occluder_min_coverage = 0.08 self.video_occluder_max_coverage = 0.46 self.video_clean_ref_max_coverage = 0.35 # was 0.05 — too strict, clean_original never saved self.video_abs_mouth_width_min = 0.10 # was 0.45 — way too high, flagged almost all faces self.video_abs_mouth_drop_min = 0.10 # was 0.32 — too high, caused false occlusion def init_models(self): providers = _get_providers(self.log) from insightface.app import FaceAnalysis import insightface.model_zoo as mz gpu = "CUDAExecutionProvider" in providers self.log(f" {'GPU (CUDA) ' if gpu else 'CPU'}") self.log(" Lade buffalo_l ...") preferred_det_size = (1024, 1024) if gpu else (768, 768) self.det_thresh = 0.25 self.app = FaceAnalysis(name="buffalo_l", root=str(MODELS_DIR.parent), providers=providers) try: self.app.prepare(ctx_id=0 if gpu else -1, det_size=preferred_det_size, det_thresh=self.det_thresh) self.det_size = preferred_det_size except Exception as e: fallback_size = (640, 640) self.log(f" Hohe Detektions-Aufloesung fehlgeschlagen ({e}). Fallback auf {fallback_size[0]}x{fallback_size[1]}.") self.app = FaceAnalysis(name="buffalo_l", root=str(MODELS_DIR.parent), providers=providers) self.app.prepare(ctx_id=0 if gpu else -1, det_size=fallback_size, det_thresh=self.det_thresh) self.det_size = fallback_size self.log(f" Detektion: {self.det_size[0]}x{self.det_size[1]}, Schwelle {self.det_thresh:.2f}") swap_path = MODELS_DIR / "inswapper_128.onnx" if not swap_path.exists(): raise RuntimeError(f"inswapper_128.onnx fehlt in {MODELS_DIR}") self.log(" Lade inswapper ...") self.swapper = mz.get_model(str(swap_path), providers=providers) self.log(" Modelle geladen.") @staticmethod def _face_area(face): x1, y1, x2, y2 = [float(v) for v in face.bbox] return max(0.0, x2 - x1) * max(0.0, y2 - y1) def _pick_primary_face(self, faces): if not faces: return None return max(faces, key=lambda f: (self._face_area(f), float(getattr(f, "det_score", 0.0)))) @staticmethod def _norm_face_metrics(face, w, h): x1, y1, x2, y2 = [float(v) for v in face.bbox] bw = max(1.0, x2 - x1) bh = max(1.0, y2 - y1) nx1 = max(0.0, min(1.0, x1 / max(1.0, float(w)))) ny1 = max(0.0, min(1.0, y1 / max(1.0, float(h)))) nx2 = max(0.0, min(1.0, x2 / max(1.0, float(w)))) ny2 = max(0.0, min(1.0, y2 / max(1.0, float(h)))) cx = (nx1 + nx2) * 0.5 cy = (ny1 + ny2) * 0.5 area = max(1e-6, (nx2 - nx1) * (ny2 - ny1)) return { "bbox": (nx1, ny1, nx2, ny2), "cx": cx, "cy": cy, "area": area, "px_w": bw, "px_h": bh, "score": float(getattr(face, "det_score", 0.0)), } @staticmethod def _extract_embedding(face): import numpy as np emb = getattr(face, "normed_embedding", None) if emb is None: emb = getattr(face, "embedding", None) if emb is None: return None arr = np.asarray(emb, dtype=np.float32).reshape(-1) if arr.size == 0: return None norm = float(np.linalg.norm(arr)) if norm < 1e-8: return None return arr / norm @staticmethod def _extract_kps(face, w, h): import numpy as np kps = getattr(face, "kps", None) if kps is None: return None arr = np.asarray(kps, dtype=np.float32) if arr.ndim != 2 or arr.shape[1] != 2 or arr.shape[0] < 3: return None arr = arr[:5, :].copy() arr[:, 0] = np.clip(arr[:, 0] / max(1.0, float(w)), 0.0, 1.0) arr[:, 1] = np.clip(arr[:, 1] / max(1.0, float(h)), 0.0, 1.0) return arr @staticmethod def _embedding_similarity(a, b): import numpy as np if a is None or b is None: return None return float(np.clip(np.dot(a, b), -1.0, 1.0)) @staticmethod def _kps_similarity(a, b): import numpy as np if a is None or b is None: return None n = min(int(a.shape[0]), int(b.shape[0])) if n < 3: return None dist = float(np.linalg.norm(a[:n] - b[:n], axis=1).mean()) # 0.0 Distanz => 1.0 Similarity; >0.20 gilt als deutlich instabil. return max(0.0, min(1.0, 1.0 - dist / 0.20)) def _build_video_face_entry(self, face, w, h): m = self._norm_face_metrics(face, w, h) return { "face": face, "bbox": m["bbox"], "cx": m["cx"], "cy": m["cy"], "area": m["area"], "px_w": m["px_w"], "px_h": m["px_h"], "det_score": m["score"], "embedding": self._extract_embedding(face), "kps": self._extract_kps(face, w, h), } @staticmethod def _trim_track_entry(entry): return { "bbox": entry["bbox"], "cx": entry["cx"], "cy": entry["cy"], "area": entry["area"], "det_score": entry["det_score"], "embedding": entry["embedding"], "kps": entry["kps"], } @staticmethod def _kps_geometry(kps): import numpy as np if kps is None: return None arr = np.asarray(kps, dtype=np.float32) if arr.ndim != 2 or arr.shape[0] < 5 or arr.shape[1] != 2: return None eye_a, eye_b, nose, mouth_a, mouth_b = arr[:5] eye_dist = float(np.linalg.norm(eye_a - eye_b)) if eye_dist < 1e-6: return None eye_mid = (eye_a + eye_b) * 0.5 mouth_mid = (mouth_a + mouth_b) * 0.5 return { "eye_dist": eye_dist, "mouth_width": float(np.linalg.norm(mouth_a - mouth_b)) / eye_dist, "nose_drop": float(np.linalg.norm(nose - eye_mid)) / eye_dist, "mouth_drop": float(np.linalg.norm(mouth_mid - nose)) / eye_dist, "mouth_offset_y": float((mouth_mid[1] - eye_mid[1]) / eye_dist), } @staticmethod def _bbox_to_pixel_rect(bbox, w, h): x1 = max(0, min(w - 1, int(round(float(bbox[0]) * w)))) y1 = max(0, min(h - 1, int(round(float(bbox[1]) * h)))) x2 = max(0, min(w, int(round(float(bbox[2]) * w)))) y2 = max(0, min(h, int(round(float(bbox[3]) * h)))) if x2 <= x1 + 2 or y2 <= y1 + 2: return None return x1, y1, x2, y2 @staticmethod def _face_texture_ratio(gray, bbox): import cv2 h, w = gray.shape[:2] rect = FaceSwapper._bbox_to_pixel_rect(bbox, w, h) if rect is None: return None x1, y1, x2, y2 = rect fh = y2 - y1 if fh < 16: return None upper_end = y1 + int(fh * 0.42) lower_start = y1 + int(fh * 0.50) if upper_end <= y1 + 4 or lower_start >= y2 - 4: return None upper = gray[y1:upper_end, x1:x2] lower = gray[lower_start:y2, x1:x2] if upper.size == 0 or lower.size == 0: return None upper_var = float(cv2.Laplacian(upper, cv2.CV_32F).var()) lower_var = float(cv2.Laplacian(lower, cv2.CV_32F).var()) if upper_var < 1e-6: return None ratio = lower_var / upper_var return max(0.0, min(4.0, ratio)) @staticmethod def _blend_frames(swapped, original, alpha): import cv2 if alpha <= 1e-3: return original if alpha >= 1.0 - 1e-3: return swapped return cv2.addWeighted(swapped, float(alpha), original, float(1.0 - alpha), 0.0) @staticmethod def _keep_border_components(raw_mask, min_area_ratio=0.015, lateral_only=False): import cv2, numpy as np if raw_mask is None or raw_mask.size == 0: return raw_mask h, w = raw_mask.shape[:2] labels_count, labels, stats, _ = cv2.connectedComponentsWithStats((raw_mask > 0).astype(np.uint8), 8) kept = np.zeros((h, w), dtype=np.uint8) min_area = max(8, int(h * w * min_area_ratio)) border = max(3, int(min(h, w) * 0.06)) for label in range(1, labels_count): x, y, bw, bh, area = stats[label] if area < min_area: continue if lateral_only: touches_border = x <= border or x + bw >= w - border else: touches_border = ( x <= border or y <= border or x + bw >= w - border or y + bh >= h - border ) if touches_border: kept[labels == label] = 255 return kept @staticmethod def _apply_float_mask(foreground, background, mask): import numpy as np if mask is None: return foreground m = np.clip(mask.astype(np.float32), 0.0, 1.0) if m.ndim == 2: m = m[:, :, None] out = foreground.astype(np.float32) * (1.0 - m) + background.astype(np.float32) * m return np.clip(out, 0, 255).astype(np.uint8) @staticmethod def _soft_bbox_mask(shape, bbox): import cv2, numpy as np h, w = shape[:2] rect = FaceSwapper._bbox_to_pixel_rect(bbox, w, h) mask = np.zeros((h, w), dtype=np.float32) if rect is None: return mask x1, y1, x2, y2 = rect bw = x2 - x1 bh = y2 - y1 pad_x = max(2, int(bw * 0.05)) pad_y = max(2, int(bh * 0.06)) mask[y1 + pad_y:y2 - pad_y, x1 + pad_x:x2 - pad_x] = 1.0 mask = cv2.GaussianBlur(mask, (0, 0), max(3.0, min(bw, bh) * 0.045)) return np.clip(mask, 0.0, 1.0) def _occluder_mask_from_reference(self, original, bbox, track_state): import cv2, numpy as np if track_state is None or bbox is None: return None, 0.0 ref = track_state.get("clean_original") if ref is None: return None, 0.0 if getattr(ref, "shape", None) != original.shape: ref = cv2.resize(ref, (original.shape[1], original.shape[0]), interpolation=cv2.INTER_AREA) h, w = original.shape[:2] rect = self._bbox_to_pixel_rect(bbox, w, h) if rect is None: return None, 0.0 x1, y1, x2, y2 = rect cur_roi = original[y1:y2, x1:x2] ref_roi = ref[y1:y2, x1:x2] if cur_roi.size == 0 or ref_roi.size == 0: return None, 0.0 diff = cv2.absdiff(cur_roi, ref_roi) diff_gray = cv2.cvtColor(diff, cv2.COLOR_BGR2GRAY) _, raw = cv2.threshold(diff_gray, self.video_occluder_diff_thresh, 255, cv2.THRESH_BINARY) k = max(3, int(min(x2 - x1, y2 - y1) * 0.045) | 1) kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (k, k)) raw = cv2.morphologyEx(raw, cv2.MORPH_CLOSE, kernel, iterations=2) raw = cv2.morphologyEx(raw, cv2.MORPH_OPEN, kernel, iterations=1) raw = self._keep_border_components(raw, min_area_ratio=0.012, lateral_only=True) coverage = float((raw > 0).mean()) if coverage < self.video_occluder_min_coverage: return None, coverage soft = cv2.GaussianBlur(raw.astype(np.float32) / 255.0, (0, 0), max(2.0, k * 0.6)) mask = np.zeros((h, w), dtype=np.float32) mask[y1:y2, x1:x2] = np.clip(soft, 0.0, 1.0) return mask, coverage def _carry_forward_swap(self, original, track_state): import cv2, numpy as np if track_state is None: return original, 0 last_result = track_state.get("last_result") bbox = track_state.get("last_bbox") if last_result is None or bbox is None: return original, 0 if getattr(last_result, "shape", None) != original.shape: last_result = cv2.resize(last_result, (original.shape[1], original.shape[0]), interpolation=cv2.INTER_AREA) face_mask = self._soft_bbox_mask(original.shape, bbox) occ_mask, _ = self._occluder_mask_from_reference(original, bbox, track_state) current_mask, _ = self._current_occluder_mask(original, bbox) if current_mask is not None: occ_mask = current_mask if occ_mask is None else np.maximum(occ_mask, current_mask) if occ_mask is not None: face_mask = face_mask * (1.0 - occ_mask) return self._apply_float_mask(original, last_result, face_mask), 1 def _current_occluder_mask(self, original, bbox): import cv2, numpy as np h, w = original.shape[:2] rect = self._bbox_to_pixel_rect(bbox, w, h) if rect is None: return None, 0.0 x1, y1, x2, y2 = rect roi = original[y1:y2, x1:x2] if roi.size == 0: return None, 0.0 gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) ycrcb = cv2.cvtColor(roi, cv2.COLOR_BGR2YCrCb) skin = cv2.inRange(ycrcb, (0, 133, 77), (255, 184, 138)) dark_lace = cv2.inRange(gray, 0, 92) raw = cv2.bitwise_or(skin, dark_lace) fh, fw = gray.shape[:2] gate = np.zeros((fh, fw), dtype=np.uint8) gate[int(fh * 0.10):int(fh * 0.94), :] = 255 raw = cv2.bitwise_and(raw, gate) k = max(3, int(min(fw, fh) * 0.045) | 1) kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (k, k)) raw = cv2.morphologyEx(raw, cv2.MORPH_CLOSE, kernel, iterations=1) raw = cv2.morphologyEx(raw, cv2.MORPH_OPEN, kernel, iterations=1) raw = self._keep_border_components(raw, min_area_ratio=0.010, lateral_only=True) coverage = float((raw > 0).mean()) if coverage < self.video_occluder_min_coverage: return None, coverage soft = cv2.GaussianBlur(raw.astype(np.float32) / 255.0, (0, 0), max(2.0, k * 0.55)) mask = np.zeros((h, w), dtype=np.float32) mask[y1:y2, x1:x2] = np.clip(soft, 0.0, 1.0) return mask, coverage def _smooth_skin_occluder_mask(self, original, bbox): import cv2, numpy as np h, w = original.shape[:2] rect = self._bbox_to_pixel_rect(bbox, w, h) if rect is None: return None, 0.0 x1, y1, x2, y2 = rect roi = original[y1:y2, x1:x2] if roi.size == 0: return None, 0.0 gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) ycrcb = cv2.cvtColor(roi, cv2.COLOR_BGR2YCrCb) skin = cv2.inRange(ycrcb, (0, 133, 77), (255, 180, 135)) texture = cv2.Laplacian(gray, cv2.CV_32F) low_texture = (np.abs(texture) < 8.0).astype(np.uint8) * 255 raw = cv2.bitwise_and(skin, low_texture) fh, fw = gray.shape[:2] gate = np.zeros((fh, fw), dtype=np.uint8) gate[int(fh * 0.22):int(fh * 0.90), int(fw * 0.12):int(fw * 0.94)] = 255 raw = cv2.bitwise_and(raw, gate) k = max(5, int(min(fw, fh) * 0.055) | 1) kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (k, k)) raw = cv2.morphologyEx(raw, cv2.MORPH_CLOSE, kernel, iterations=2) raw = cv2.morphologyEx(raw, cv2.MORPH_OPEN, kernel, iterations=1) raw = self._keep_border_components(raw, min_area_ratio=0.025, lateral_only=True) coverage = float((raw > 0).mean()) if coverage < 0.12: return None, coverage if coverage > 0.42: return None, coverage soft = cv2.GaussianBlur(raw.astype(np.float32) / 255.0, (0, 0), max(2.0, k * 0.5)) mask = np.zeros((h, w), dtype=np.float32) mask[y1:y2, x1:x2] = np.clip(soft, 0.0, 1.0) return mask, coverage def _is_occluded_mouth(self, candidate, reference, gray, track_state): if candidate is None: return False occluded = False # Geometrie-Pruefung: Mund-/Nasenverhaeltnis sollte ueber Frames stabil bleiben. ref_geo = self._kps_geometry(reference.get("kps") if reference else None) cur_geo = self._kps_geometry(candidate.get("kps")) if ref_geo and cur_geo: ref_mouth = max(1e-6, ref_geo["mouth_width"]) ref_drop = max(1e-6, ref_geo["mouth_drop"]) mouth_ratio = cur_geo["mouth_width"] / ref_mouth drop_ratio = cur_geo["mouth_drop"] / ref_drop if mouth_ratio < self.video_occlusion_mouth_ratio_min or drop_ratio < self.video_occlusion_mouth_ratio_min: occluded = True if cur_geo: if (cur_geo["mouth_width"] < self.video_abs_mouth_width_min or cur_geo["mouth_drop"] < self.video_abs_mouth_drop_min): occluded = True # Textur-Pruefung: Bei Hand vor Mund bricht Detail im unteren Gesichtsbereich stark ein. texture_ratio = self._face_texture_ratio(gray, candidate["bbox"]) if texture_ratio is not None: tex_ref = track_state.get("texture_ref") if tex_ref is not None and texture_ratio < tex_ref * self.video_occlusion_texture_drop: occluded = True if not occluded: if tex_ref is None: track_state["texture_ref"] = texture_ratio else: track_state["texture_ref"] = 0.90 * float(tex_ref) + 0.10 * texture_ratio return occluded @staticmethod def _bbox_iou(a, b): ax1, ay1, ax2, ay2 = a bx1, by1, bx2, by2 = b ix1 = max(ax1, bx1) iy1 = max(ay1, by1) ix2 = min(ax2, bx2) iy2 = min(ay2, by2) iw = max(0.0, ix2 - ix1) ih = max(0.0, iy2 - iy1) inter = iw * ih if inter <= 0: return 0.0 a_area = max(1e-9, (ax2 - ax1) * (ay2 - ay1)) b_area = max(1e-9, (bx2 - bx1) * (by2 - by1)) return inter / max(1e-9, a_area + b_area - inter) def _filter_video_faces(self, faces, frame_shape, track_state): h, w = frame_shape[:2] active = track_state.get("active") track_state["candidate"] = None candidates = [] # When a face is already being tracked we accept lower-confidence detections. # A hand partially covering the tracked face drops the detector score, but # the face is still there and should be swapped. effective_min_score = (self.video_min_det_score * 0.5 if active is not None else self.video_min_det_score) for face in faces: entry = self._build_video_face_entry(face, w, h) if entry["px_w"] < self.video_min_face_size_px or entry["px_h"] < self.video_min_face_size_px: continue if entry["det_score"] < effective_min_score: continue candidates.append(entry) if not candidates: track_state["miss"] = int(track_state.get("miss", 0)) + 1 if track_state["miss"] >= self.video_track_memory: track_state["active"] = None track_state["texture_ref"] = None track_state["hold"] = 0 return [] candidates.sort(key=lambda c: (c["area"], c["det_score"]), reverse=True) if active is None: seed = None for c in candidates: if c["det_score"] >= self.video_start_det_score: seed = c break if seed is None: track_state["miss"] = int(track_state.get("miss", 0)) + 1 return [] track_state["candidate"] = seed track_state["miss"] = 0 return [seed["face"]] best = None best_score = -1.0 # When a face is already tracked, allow a smaller detected area (hand can # shrink the visible face region by up to 70 % without losing the track). effective_min_area = (self.video_min_area_ratio * 0.30 if active is not None else self.video_min_area_ratio) for c in candidates: iou = self._bbox_iou(c["bbox"], active["bbox"]) dx = c["cx"] - active["cx"] dy = c["cy"] - active["cy"] center_shift = (dx * dx + dy * dy) ** 0.5 area_ratio = c["area"] / max(1e-9, active["area"]) if not (effective_min_area <= area_ratio <= self.video_max_area_ratio): continue if iou < self.video_min_iou and center_shift > self.video_max_center_jump: continue emb_sim = self._embedding_similarity(c["embedding"], active["embedding"]) if emb_sim is not None and emb_sim < self.video_min_embed_sim: continue kps_sim = self._kps_similarity(c["kps"], active["kps"]) iou_score = max(0.0, min(1.0, iou / 0.35)) motion_score = max(0.0, min(1.0, 1.0 - center_shift / max(self.video_max_center_jump, 1e-6))) det_score = max(0.0, min(1.0, c["det_score"])) emb_score = 0.5 if emb_sim is None else max(0.0, min(1.0, (emb_sim + 1.0) * 0.5)) kps_score = 0.5 if kps_sim is None else kps_sim track_score = (0.30 * iou_score + 0.24 * motion_score + 0.16 * det_score + 0.20 * emb_score + 0.10 * kps_score) if emb_sim is None and c["det_score"] < self.video_new_face_det_score: track_score *= 0.86 if track_score > best_score: best_score = track_score best = c if best is None or best_score < self.video_track_accept_score: track_state["miss"] = int(track_state.get("miss", 0)) + 1 if track_state["miss"] >= self.video_track_memory: track_state["active"] = None track_state["texture_ref"] = None track_state["hold"] = 0 return [] track_state["candidate"] = best track_state["miss"] = 0 return [best["face"]] def _detect_faces(self, frame): import cv2 faces = self.app.get(frame) if faces or not self.high_sensitivity: return faces, frame, 1.0 h, w = frame.shape[:2] short_edge = min(h, w) if short_edge < 720: scale = 2.0 elif short_edge < 1080: scale = 1.5 else: scale = 1.25 up_w = max(2, int(round(w * scale))) up_h = max(2, int(round(h * scale))) max_side = max(up_w, up_h) if max_side > 1920: clamp = 1920.0 / max_side up_w = max(2, int(round(up_w * clamp))) up_h = max(2, int(round(up_h * clamp))) scale *= clamp upscaled = None if scale > 1.01: upscaled = cv2.resize(frame, (up_w, up_h), interpolation=cv2.INTER_CUBIC) faces = self.app.get(upscaled) if faces: return faces, upscaled, scale # Low-threshold fallback: faces partially covered by a hand have reduced # detector confidence. Drop the threshold temporarily to find them. det_model = getattr(self.app, 'det_model', None) if det_model is None: # Older insightface builds store the model in self.app.models dict for _m in getattr(self.app, 'models', {}).values(): if hasattr(_m, 'det_thresh'): det_model = _m break if det_model is not None and hasattr(det_model, 'det_thresh'): orig_thresh = det_model.det_thresh try: det_model.det_thresh = 0.10 # much lower so partially-occluded faces are found faces = self.app.get(frame) if not faces and upscaled is not None: faces_up = self.app.get(upscaled) if faces_up: return faces_up, upscaled, scale finally: det_model.det_thresh = orig_thresh # always restore if faces: return faces, frame, 1.0 if upscaled is not None: return [], upscaled, scale return [], frame, 1.0 def _swap_faces_in_frame(self, frame, src_face, track_state=None): import cv2, numpy as np def _note_drop(work_img): if track_state is None: return work_img, 0 track_state["locked_frames"] = 0 track_state["candidate"] = None track_state["alpha"] = max(0.0, float(track_state.get("alpha", 1.0)) - self.video_fade_out_step) return self._carry_forward_swap(work_img, track_state) faces, work_img, scale = self._detect_faces(frame) if not faces: if track_state is not None: track_state["miss"] = int(track_state.get("miss", 0)) + 1 if track_state["miss"] >= self.video_track_memory: track_state["active"] = None track_state["texture_ref"] = None track_state["hold"] = 0 result, face_count = _note_drop(work_img) if scale != 1.0: result = cv2.resize(result, (frame.shape[1], frame.shape[0]), interpolation=cv2.INTER_AREA) return result, face_count return frame, 0 if track_state is not None: faces = self._filter_video_faces(faces, work_img.shape, track_state) if not faces: result, face_count = _note_drop(work_img) if scale != 1.0: result = cv2.resize(result, (frame.shape[1], frame.shape[0]), interpolation=cv2.INTER_AREA) return result, face_count candidate = track_state.get("candidate") if track_state is not None else None prev_active = track_state.get("active") if track_state is not None else None gray = cv2.cvtColor(work_img, cv2.COLOR_BGR2GRAY) if track_state is not None else None mouth_occluded = False if track_state is not None: hold = int(track_state.get("hold", 0)) if hold > 0: track_state["hold"] = max(0, hold - 1) mouth_occluded = True if self._is_occluded_mouth(candidate, prev_active, gray, track_state): track_state["hold"] = self.video_occlusion_hold mouth_occluded = True if not mouth_occluded: track_state["hold"] = 0 if candidate is not None: track_state["active"] = self._trim_track_entry(candidate) track_state["locked_frames"] = int(track_state.get("locked_frames", 0)) + 1 alpha_prev = float(track_state.get("alpha", 0.0)) track_state["alpha"] = min(1.0, alpha_prev + self.video_fade_in_step) original = work_img.copy() result = work_img.copy() swapped_bboxes = [] for face in faces: result = self.swapper.get(result, face, src_face, paste_back=True) swapped_bboxes.append(face.bbox) if self.color: result = _match_face_color(result, original, face.bbox) if self.enhance: result = _enhance_face_region(result, face.bbox, sharpen=True) if self.use_restoration and self.restorer is not None: providers = _get_providers() if not self._restoration_warned_cpu and "CUDAExecutionProvider" not in providers: self.log(" Hinweis: GFPGAN im CPU-Modus ist langsam (ca. 1-3 s pro Frame moeglich).") self._restoration_warned_cpu = True result = self.restorer.restore_faces(result, swapped_bboxes) if track_state is not None: result = self._blend_frames(result, original, float(track_state.get("alpha", 1.0))) active = track_state.get("active") bbox = active.get("bbox") if active else (candidate.get("bbox") if candidate else None) occ_mask, coverage = self._occluder_mask_from_reference(original, bbox, track_state) # Only run current_occluder_mask when we have a clean reference — otherwise the face's # own skin colour triggers false occlusion detection on every frame. has_clean_ref = track_state.get("clean_original") is not None current_mask, current_coverage = (self._current_occluder_mask(original, bbox) if bbox is not None and has_clean_ref else (None, 0.0)) if current_mask is not None: occ_mask = current_mask if occ_mask is None else np.maximum(occ_mask, current_mask) coverage = max(coverage, current_coverage) if mouth_occluded and bbox is not None and has_clean_ref: smooth_mask, smooth_coverage = self._smooth_skin_occluder_mask(original, bbox) if smooth_mask is not None: occ_mask = smooth_mask if occ_mask is None else np.maximum(occ_mask, smooth_mask) coverage = max(coverage, smooth_coverage) if occ_mask is not None: result = self._apply_float_mask(result, original, occ_mask) if candidate is not None: track_state["last_bbox"] = candidate["bbox"] track_state["last_result"] = result.copy() track_state["last_original"] = original.copy() if bbox is not None and not mouth_occluded and coverage <= self.video_clean_ref_max_coverage: track_state["clean_original"] = original.copy() if scale != 1.0: result = cv2.resize(result, (frame.shape[1], frame.shape[0]), interpolation=cv2.INTER_AREA) return result, len(faces) def get_first_face(self, img): faces, _, _ = self._detect_faces(img) return self._pick_primary_face(faces) def swap_image(self, src_face, target, out): original = _cv2_imread_unicode(target) if original is None: return False result, face_count = self._swap_faces_in_frame(original, src_face) if face_count == 0: return False return _cv2_imwrite_unicode(out, result) def swap_video(self, src_face, target_video, out_video, progress_cb=None, cancel_check=None): import cv2, subprocess, shutil, tempfile cap, cap_tmp_copy = _open_videocapture_unicode(target_video, log_fn=self.log) if not cap.isOpened(): raise RuntimeError(f"Video konnte nicht geoeffnet werden: {target_video}") fps = cap.get(cv2.CAP_PROP_FPS) or 25 width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) fd_tmp, tmp_name = tempfile.mkstemp(prefix="faceswap_noaudio_", suffix=".mp4", dir=str(SCRIPT_DIR)) os.close(fd_tmp) tmp_video = Path(tmp_name) fourcc = cv2.VideoWriter_fourcc(*"mp4v") writer = cv2.VideoWriter(str(tmp_video), fourcc, fps, (width, height)) if not writer.isOpened(): cap.release() _cleanup_temp_file(cap_tmp_copy) _cleanup_temp_file(tmp_video) raise RuntimeError(f"Video-Writer konnte nicht geoeffnet werden: {out_video}") done = 0; swapped_frames = 0; swapped_faces = 0 import numpy as np try: while True: ret, frame = cap.read() if not ret: break if cancel_check and cancel_check(): break # Detect faces directly — no tracking, no occlusion logic faces = self.app.get(frame) if done == 0: # Log first-frame diagnostics once src_emb = getattr(src_face, 'normed_embedding', None) self.log(f" [Info] Frame 1: {len(faces)} Gesicht(er) erkannt | " f"src_embedding={'OK' if src_emb is not None else 'FEHLT'} | " f"Frame {width}x{height}") result = frame.copy() for face in faces: before = result.copy() result = self.swapper.get(result, face, src_face, paste_back=True) diff = float(np.abs(result.astype(np.float32) - before.astype(np.float32)).mean()) if done == 0: tgt_emb = getattr(face, 'normed_embedding', None) self.log(f" [Info] Swap-Differenz Frame 1: {diff:.4f} | " f"tgt_embedding={'OK' if tgt_emb is not None else 'FEHLT'}") if diff > 0 and self.color: result = _match_face_color(result, frame, face.bbox) if diff > 0 and self.enhance: result = _enhance_face_region(result, face.bbox, sharpen=True) if face is not None: swapped_faces += 1 writer.write(result) if len(faces) > 0: swapped_frames += 1 done += 1 if progress_cb: progress_cb(done, total) finally: cap.release() writer.release() _cleanup_temp_file(cap_tmp_copy) ffmpeg = shutil.which("ffmpeg") if ffmpeg: try: cmd = [ffmpeg, "-y", "-i", str(tmp_video), "-i", str(target_video), "-c:v", "copy", "-c:a", "aac", "-map", "0:v:0", "-map", "1:a:0?", "-shortest", str(out_video)] subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) _cleanup_temp_file(tmp_video) except Exception: shutil.move(str(tmp_video), str(out_video)) else: shutil.move(str(tmp_video), str(out_video)) return {"frames_processed": done, "frames_total": total, "frames_swapped": swapped_frames, "faces_swapped": swapped_faces} def swap_webcam( self, src_face, camera_index=0, record_path=None, fps_target=25.0, cancel_check=None, frame_cb=None, stats_cb=None, use_tracking=False, ): import cv2 import time import tempfile cap = cv2.VideoCapture(int(camera_index)) if not cap.isOpened(): raise RuntimeError(f"Kamera konnte nicht geoeffnet werden (Index {camera_index}).") width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 0) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0) src_fps = cap.get(cv2.CAP_PROP_FPS) or float(fps_target or 25.0) writer = None record_tmp = None if record_path: out_path = Path(record_path) out_path.parent.mkdir(parents=True, exist_ok=True) fourcc = cv2.VideoWriter_fourcc(*"mp4v") writer = cv2.VideoWriter(str(out_path), fourcc, src_fps, (width, height)) if not writer.isOpened(): try: fd_tmp, tmp_name = tempfile.mkstemp(prefix="faceswap_webcam_", suffix=".mp4", dir=str(SCRIPT_DIR)) os.close(fd_tmp) record_tmp = Path(tmp_name) writer = cv2.VideoWriter(str(record_tmp), fourcc, src_fps, (width, height)) except Exception: writer = None if writer is None or not writer.isOpened(): writer = None _cleanup_temp_file(record_tmp) record_tmp = None self.log(f"WARNUNG: Aufnahme konnte nicht gestartet werden: {record_path}") else: self.log(" Hinweis: Unicode-Ausgabe-Fallback aktiv (temp Datei).") frame_count = 0 track_state = {} if use_tracking else None fps_smooth = 0.0 try: while True: if cancel_check and cancel_check(): break started = time.perf_counter() ret, frame = cap.read() if not ret: break result, face_count = self._swap_faces_in_frame(frame, src_face, track_state=track_state) if frame_cb: frame_cb(result) if writer is not None: writer.write(result) frame_count += 1 elapsed = max(1e-6, time.perf_counter() - started) inst_fps = 1.0 / elapsed fps_smooth = inst_fps if fps_smooth <= 0 else (0.90 * fps_smooth + 0.10 * inst_fps) if stats_cb and frame_count % 30 == 0: stats_cb(float(fps_smooth), int(face_count)) if fps_target and fps_target > 0: wait_s = (1.0 / float(fps_target)) - elapsed if wait_s > 0: time.sleep(min(wait_s, 0.02)) finally: cap.release() if writer is not None: writer.release() if record_tmp is not None and record_path: try: shutil.move(str(record_tmp), str(record_path)) except Exception as e: self.log(f"WARNUNG: Temp-Aufnahme konnte nicht verschoben werden ({e})") finally: _cleanup_temp_file(record_tmp) class VoiceCloner: XTTS_MODEL = "tts_models/multilingual/multi-dataset/xtts_v2" VC_MODEL = "voice_conversion_models/multilingual/vctk/freevc24" SUPPORTED_LANGS = ( "en", "es", "fr", "de", "it", "pt", "pl", "tr", "ru", "nl", "cs", "ar", "zh-cn", "ja", "hu", "ko", "hi" ) SUPPORTED_AUDIO = {".wav", ".mp3", ".m4a", ".flac", ".ogg", ".aac", ".wma"} def __init__(self, log_fn=print): self.log = log_fn self.device = "cpu" self.tts = None self.vc = None def _load_runtime(self): try: import torch from TTS.api import TTS except Exception as e: raise RuntimeError( "Voice-Cloning ist nicht installiert.\n\n" "Bitte installiere zuerst:\n" " python -m pip install coqui-tts torch torchaudio\n\n" f"Details: {e}" ) from e self.device = "cuda" if torch.cuda.is_available() else "cpu" return TTS def _ensure_xtts(self): if self.tts is not None: return TTS = self._load_runtime() self.log(f"VOICE: Lade XTTS v2 ({self.device}) ...") self.tts = TTS(self.XTTS_MODEL).to(self.device) self.log("VOICE: XTTS bereit.") def _ensure_vc(self): if self.vc is not None: return TTS = self._load_runtime() self.log(f"VOICE: Lade FreeVC ({self.device}) ...") self.vc = TTS(self.VC_MODEL).to(self.device) self.log("VOICE: FreeVC bereit.") @staticmethod def _check_audio(path, label): p = Path(path) if not p.is_file(): raise RuntimeError(f"{label} nicht gefunden:\n{path}") if p.suffix.lower() not in VoiceCloner.SUPPORTED_AUDIO: raise RuntimeError( f"{label} hat ein nicht unterstuetztes Format: {p.suffix}\n" f"Erlaubt: {', '.join(sorted(VoiceCloner.SUPPORTED_AUDIO))}" ) return str(p) def clone_from_text(self, speaker_wav, text, language, out_file): if not text.strip(): raise RuntimeError("Bitte Text eingeben.") language = (language or "de").strip().lower() if language not in self.SUPPORTED_LANGS: raise RuntimeError(f"Sprache '{language}' nicht unterstuetzt. Nutze z.B.: {', '.join(self.SUPPORTED_LANGS)}") speaker_wav = self._check_audio(speaker_wav, "Referenz-Stimme") self._ensure_xtts() out_path = Path(out_file) out_path.parent.mkdir(parents=True, exist_ok=True) self.tts.tts_to_file(text=text, speaker_wav=speaker_wav, language=language, file_path=str(out_path)) return str(out_path) def clone_from_audio(self, speaker_wav, source_wav, out_file): speaker_wav = self._check_audio(speaker_wav, "Referenz-Stimme") source_wav = self._check_audio(source_wav, "Eingabe-Audio") self._ensure_vc() import tempfile, math try: import soundfile as sf import numpy as np except ImportError: raise RuntimeError( "Bitte installiere soundfile:\n" " python -m pip install soundfile" ) out_path = Path(out_file) out_path.parent.mkdir(parents=True, exist_ok=True) # Audiodatei laden und in Segmente aufteilen CHUNK_SEC = 30 # Segmentlänge in Sekunden (bei RAM-Problemen kleiner wählen, z.B. 20) data, sr = sf.read(source_wav, always_2d=False) chunk_samples = int(CHUNK_SEC * sr) total_samples = len(data) num_chunks = math.ceil(total_samples / chunk_samples) if num_chunks <= 1: # Kurze Datei: direkt verarbeiten self.vc.voice_conversion_to_file( source_wav=source_wav, target_wav=speaker_wav, file_path=str(out_path) ) return str(out_path) self.log(f"VOICE: Datei zu lang — teile in {num_chunks} Segmente à {CHUNK_SEC}s ...") results = [] with tempfile.TemporaryDirectory() as tmpdir: for i in range(num_chunks): start = i * chunk_samples end = min(start + chunk_samples, total_samples) chunk = data[start:end] chunk_in = Path(tmpdir) / f"chunk_{i:04d}_in.wav" chunk_out = Path(tmpdir) / f"chunk_{i:04d}_out.wav" sf.write(str(chunk_in), chunk, sr) self.log(f"VOICE: Segment {i+1}/{num_chunks} ...") self.vc.voice_conversion_to_file( source_wav=str(chunk_in), target_wav=speaker_wav, file_path=str(chunk_out) ) out_data, out_sr = sf.read(str(chunk_out)) results.append((out_data, out_sr)) # Segmente zusammenführen self.log("VOICE: Füge Segmente zusammen ...") target_sr = results[0][1] merged = np.concatenate( [r if sr == target_sr else r # ggf. Resampling hier einfügen for r, sr in results], axis=0 ) sf.write(str(out_path), merged, target_sr) return str(out_path) class MainApp: SUPPORTED = {".jpg", ".jpeg", ".png", ".bmp", ".webp"} VIDEO_SUPPORTED = {".mp4", ".avi", ".mov", ".mkv", ".wmv", ".webm", ".flv"} # Alle Felder die gespeichert werden _CONFIG_KEYS = [ "source", "input_dir", "output_dir", "video_input_dir", "video_output_dir", "voice_ref", "voice_source_audio", "voice_output", "voice_language", "webcam_index", "webcam_output", "last_library_face", "restoration", "webcam_record" ] def __init__(self): import tkinter as tk from tkinter import ttk, filedialog, messagebox self.tk, self.ttk, self.fd, self.mb = tk, ttk, filedialog, messagebox providers = _get_providers() self.gpu = "CUDAExecutionProvider" in providers self.swapper = FaceSwapper(log_fn=self._log) self.voice = VoiceCloner(log_fn=self._log) self.library = FaceLibrary(log_fn=self._log) self.restorer = FaceRestorer(log_fn=self._log) self.swapper.restorer = self.restorer self._selected_library_slug = None self._library_images = {} self._webcam_thread = None self._webcam_running = False self._webcam_cancel = None self._webcam_last_faces = 0 self._cfg = _load_config() # Gespeicherte Konfiguration laden self._root_real = tk.Tk() self._root_real.title("FaceSwap Batch Tool") self._root_real.geometry("760x700") self._root_real.resizable(True, True) self._root_real.configure(bg="#0d0d12") self._root_real.protocol("WM_DELETE_WINDOW", self._on_close) canvas = tk.Canvas(self._root_real, bg="#0d0d12", highlightthickness=0) scrollbar = tk.Scrollbar(self._root_real, orient="vertical", command=canvas.yview) canvas.configure(yscrollcommand=scrollbar.set) scrollbar.pack(side="right", fill="y") canvas.pack(side="left", fill="both", expand=True) self._scroll_frame = tk.Frame(canvas, bg="#0d0d12") self._scroll_window = canvas.create_window((0, 0), window=self._scroll_frame, anchor="nw") def _on_resize(event): canvas.itemconfig(self._scroll_window, width=event.width) canvas.bind("", _on_resize) def _on_frame_resize(event): canvas.configure(scrollregion=canvas.bbox("all")) self._scroll_frame.bind("", _on_frame_resize) def _on_mousewheel(event): canvas.yview_scroll(int(-1 * (event.delta / 120)), "units") canvas.bind_all("", _on_mousewheel) self.root = self._scroll_frame self._build() self._root_real.mainloop() def _build(self): tk, ttk = self.tk, self.ttk tk.Label(self.root, text="FaceSwap Batch", font=("Courier New", 22, "bold"), bg="#0d0d12", fg="#e8d5b7").pack(pady=(20, 3)) tk.Label(self.root, text="Ersetze Gesichter + klone Stimmen lokal", font=("Courier New", 9), bg="#0d0d12", fg="#7a7a9a").pack() bc = "#142814" if self.gpu else "#281414" bt = "GPU-Modus | CUDA aktiv" if self.gpu else "CPU-Modus" bf = "#5aff5a" if self.gpu else "#ff7a5a" tk.Label(self.root, text=bt, font=("Courier New", 9, "bold"), bg=bc, fg=bf, padx=14, pady=5).pack(pady=(8, 0)) # Variablen anlegen und gespeicherte Werte laden # These must be created BEFORE trace_add is set on _vars, because # _save_now() references them and trace callbacks may fire during setup. self._var_voice_mode = tk.StringVar(value=self._cfg.get("voice_mode", "text")) self._var_enhance = tk.BooleanVar(value=self._cfg.get("enhance", True)) self._var_color = tk.BooleanVar(value=self._cfg.get("color", True)) self._var_restoration = tk.BooleanVar(value=bool(self._cfg.get("restoration", False))) self._var_webcam_record = tk.BooleanVar(value=bool(self._cfg.get("webcam_record", False))) self._vars = {} for k in self._CONFIG_KEYS: v = tk.StringVar(value=self._cfg.get(k, "")) v.trace_add("write", lambda *_, key=k: self._on_var_change(key)) self._vars[k] = v if self._vars["restoration"].get(): self._var_restoration.set(str(self._vars["restoration"].get()).strip().lower() in ("1", "true", "yes", "on")) else: self._vars["restoration"].set("1" if self._var_restoration.get() else "0") if self._vars["webcam_record"].get(): self._var_webcam_record.set(str(self._vars["webcam_record"].get()).strip().lower() in ("1", "true", "yes", "on")) else: self._vars["webcam_record"].set("1" if self._var_webcam_record.get() else "0") if not self._vars["webcam_index"].get(): self._vars["webcam_index"].set("0") if not self._vars["webcam_output"].get(): self._vars["webcam_output"].set(str(SCRIPT_DIR / "webcam_recording.mp4")) if not self._vars["voice_language"].get(): self._vars["voice_language"].set("de") self._var_voice_mode.trace_add("write", lambda *_: self._save_now()) self._var_enhance.trace_add("write", lambda *_: self._save_now()) self._var_color.trace_add("write", lambda *_: self._save_now()) self._var_restoration.trace_add("write", lambda *_: self._save_now()) self._var_webcam_record.trace_add("write", lambda *_: self._save_now()) self._section("1 QUELLBILD - Gesicht, das eingefuegt wird") self._row("source", self._pick_source) self._build_library_panel() self._prev_lbl = tk.Label(self.root, bg="#0d0d12") self._prev_lbl.pack() # Vorschaubild laden falls Quellbild gespeichert if self._vars["source"].get(): self._load_preview(self._vars["source"].get()) self._refresh_library_grid() style = ttk.Style(self.root) style.theme_use("default") style.configure("TNotebook", background="#0d0d12", borderwidth=0) style.configure("TNotebook.Tab", background="#1a1a2c", foreground="#8a8aff", font=("Courier New", 9, "bold"), padding=(14, 5)) style.map("TNotebook.Tab", background=[("selected", "#0d0d12")], foreground=[("selected", "#e8d5b7")]) nb = ttk.Notebook(self.root) nb.pack(fill="x", padx=20, pady=(10, 0)) # Tab 1: Bilder img_tab = tk.Frame(nb, bg="#0d0d12") nb.add(img_tab, text="Bilder (Batch)") tk.Label(img_tab, text="2 EINGABE-ORDNER - Bilder, die bearbeitet werden", font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a", anchor="w").pack(fill="x", pady=(10, 2)) self._row_in(img_tab, "input_dir", self._pick_indir) tk.Label(img_tab, text="3 AUSGABE-ORDNER - Zielort fuer fertige Bilder", font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a", anchor="w").pack(fill="x", pady=(8, 2)) self._row_in(img_tab, "output_dir", self._pick_outdir) # Tab 2: Video vid_tab = tk.Frame(nb, bg="#0d0d12") nb.add(vid_tab, text="Video (Batch)") tk.Label(vid_tab, text="2 EINGABE-ORDNER - Videos, die bearbeitet werden", font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a", anchor="w").pack(fill="x", pady=(10, 2)) self._row_in(vid_tab, "video_input_dir", self._pick_video_indir) tk.Label(vid_tab, text="3 AUSGABE-ORDNER - Zielort fuer fertige Videos (mp4)", font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a", anchor="w").pack(fill="x", pady=(8, 2)) self._row_in(vid_tab, "video_output_dir", self._pick_video_outdir) ffmpeg_note = "ffmpeg gefunden - Audio wird beibehalten" if shutil.which("ffmpeg") \ else "Achtung: ffmpeg nicht gefunden - kein Audio im Ausgabevideo" ffmpeg_col = "#5aff5a" if shutil.which("ffmpeg") else "#ffaa44" tk.Label(vid_tab, text=ffmpeg_note, font=("Courier New", 8), bg="#0d0d12", fg=ffmpeg_col).pack(anchor="w", padx=4, pady=(4, 0)) # Tab 3: Voice Cloning voice_tab = tk.Frame(nb, bg="#0d0d12") nb.add(voice_tab, text="Stimme klonen") tk.Label(voice_tab, text="2 REFERENZ-STIMME - Audio mit Zielstimme", font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a", anchor="w").pack(fill="x", pady=(10, 2)) self._row_in(voice_tab, "voice_ref", self._pick_voice_ref) tk.Label(voice_tab, text="3 MODUS", font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a", anchor="w").pack(fill="x", pady=(8, 2)) mode_row = tk.Frame(voice_tab, bg="#0d0d12") mode_row.pack(fill="x") tk.Radiobutton(mode_row, text="Text -> Stimme", value="text", variable=self._var_voice_mode, font=("Courier New", 9), bg="#0d0d12", fg="#8aff8a", selectcolor="#0d0d12", activebackground="#0d0d12").pack(side="left", padx=(0, 16)) tk.Radiobutton(mode_row, text="Audio -> Stimme", value="audio", variable=self._var_voice_mode, font=("Courier New", 9), bg="#0d0d12", fg="#8aff8a", selectcolor="#0d0d12", activebackground="#0d0d12").pack(side="left") tk.Label(voice_tab, text="4 TEXT (nur fuer Text-Modus)", font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a", anchor="w").pack(fill="x", pady=(8, 2)) self._voice_text = tk.Text(voice_tab, height=4, bg="#14141e", fg="#d8d8f0", font=("Courier New", 9), relief="flat", insertbackground="white") self._voice_text.pack(fill="x") # Gespeicherten Text wiederherstellen saved_text = self._cfg.get("voice_text", "") if saved_text: self._voice_text.insert("1.0", saved_text) self._voice_text.bind("", lambda e: self._save_now()) tk.Label(voice_tab, text="5 EINGABE-AUDIO (nur fuer Audio-Modus)", font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a", anchor="w").pack(fill="x", pady=(8, 2)) self._row_in(voice_tab, "voice_source_audio", self._pick_voice_source_audio) tk.Label(voice_tab, text="6 SPRACHE (Text-Modus, z.B. de/en/fr)", font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a", anchor="w").pack(fill="x", pady=(8, 2)) lang_row = tk.Frame(voice_tab, bg="#14141e") lang_row.pack(fill="x", pady=2) self._voice_lang = ttk.Combobox( lang_row, textvariable=self._vars["voice_language"], values=list(VoiceCloner.SUPPORTED_LANGS), state="readonly", font=("Courier New", 9) ) self._voice_lang.pack(side="left", padx=8, pady=6) tk.Label(voice_tab, text="7 AUSGABE-AUDIO (.wav)", font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a", anchor="w").pack(fill="x", pady=(8, 2)) self._row_in(voice_tab, "voice_output", self._pick_voice_out) tk.Label(voice_tab, text="Hinweis: Beim ersten Lauf werden Sprachmodelle automatisch geladen.", font=("Courier New", 8), bg="#0d0d12", fg="#7a7a9a").pack(anchor="w", padx=4, pady=(4, 0)) self._build_webcam_tab(nb) self._nb = nb # Qualitaets-Optionen qf = tk.Frame(self.root, bg="#0d0d12") qf.pack(fill="x", padx=20, pady=(10, 0)) tk.Label(qf, text="4 QUALITAETS-OPTIONEN", font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a").pack(side="left") tk.Checkbutton(qf, text="Schaerfen", variable=self._var_enhance, font=("Courier New", 9), bg="#0d0d12", fg="#8aff8a", selectcolor="#0d0d12", activebackground="#0d0d12", command=self._update_quality).pack(side="left", padx=(20, 0)) tk.Checkbutton(qf, text="Farbanpassung", variable=self._var_color, font=("Courier New", 9), bg="#0d0d12", fg="#8aff8a", selectcolor="#0d0d12", activebackground="#0d0d12", command=self._update_quality).pack(side="left", padx=(12, 0)) self._chk_restoration = tk.Checkbutton( qf, text="Gesichtswiederherstellung (GFPGAN)", variable=self._var_restoration, font=("Courier New", 9), bg="#0d0d12", fg="#8aff8a", selectcolor="#0d0d12", activebackground="#0d0d12", command=self._update_quality ) self._chk_restoration.pack(side="left", padx=(12, 0)) if not self.restorer.is_available(): self._chk_restoration.configure(state="disabled", text="GFPGAN nicht installiert") self._var_restoration.set(False) style.configure("G.Horizontal.TProgressbar", troughcolor="#101020", background="#3adf6a", thickness=14) self._pb = ttk.Progressbar(self.root, length=700, mode="determinate", style="G.Horizontal.TProgressbar") self._pb.pack(padx=20, pady=(14, 4)) self._sv = self.tk.StringVar(value="Bereit.") self.tk.Label(self.root, textvariable=self._sv, font=("Courier New", 9), bg="#0d0d12", fg="#5a8a6a").pack() lf = tk.Frame(self.root, bg="#0d0d12") lf.pack(fill="both", expand=True, padx=20, pady=(8, 0)) self._lb = tk.Text(lf, height=7, bg="#060610", fg="#8aff8a", font=("Courier New", 9), relief="flat", insertbackground="#8aff8a") sb = tk.Scrollbar(lf, command=self._lb.yview) self._lb.configure(yscrollcommand=sb.set) self._lb.pack(side="left", fill="both", expand=True) sb.pack(side="right", fill="y") bf2 = tk.Frame(self.root, bg="#0d0d12") bf2.pack(pady=14) self._btn = tk.Button(bf2, text="STARTEN", font=("Courier New", 13, "bold"), bg="#1a3a2a", fg="#8aff8a", activebackground="#2a5a3a", relief="flat", cursor="hand2", padx=22, command=self._start) self._btn.pack(side="left", padx=8) tk.Button(bf2, text="Setup wiederholen", font=("Courier New", 9), bg="#1a1a2c", fg="#8a8aff", activebackground="#252540", relief="flat", cursor="hand2", command=self._redo_setup).pack(side="left", padx=8) self._update_quality() def _on_var_change(self, key): """Wird aufgerufen wenn sich ein Pfad-Feld aendert -> sofort speichern.""" self._save_now() def _save_now(self): """Aktuelle Einstellungen in config.json speichern.""" if not hasattr(self, "_vars"): return if "restoration" not in self._vars or "webcam_record" not in self._vars: return restoration_val = "1" if self._var_restoration.get() else "0" webcam_record_val = "1" if self._var_webcam_record.get() else "0" if self._vars["restoration"].get() != restoration_val: self._vars["restoration"].set(restoration_val) if self._vars["webcam_record"].get() != webcam_record_val: self._vars["webcam_record"].set(webcam_record_val) data = {k: self._vars[k].get() for k in self._CONFIG_KEYS} data["voice_mode"] = self._var_voice_mode.get() data["enhance"] = self._var_enhance.get() data["color"] = self._var_color.get() data["restoration"] = self._var_restoration.get() data["webcam_record"] = self._var_webcam_record.get() try: data["voice_text"] = self._voice_text.get("1.0", "end-1c") except Exception: pass _save_config(data) def _load_preview(self, path): try: from PIL import Image, ImageTk img = Image.open(path).convert("RGB") img.thumbnail((110, 110)) self._tkimg = ImageTk.PhotoImage(img) self._prev_lbl.configure(image=self._tkimg) except Exception: pass def _section(self, txt): self.tk.Label(self.root, text=txt, font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a", anchor="w").pack(fill="x", padx=20, pady=(12, 2)) def _row(self, key, cmd): f = self.tk.Frame(self.root, bg="#14141e") f.pack(fill="x", padx=20, pady=2) self.tk.Entry(f, textvariable=self._vars[key], font=("Courier New", 9), bg="#14141e", fg="#d8d8f0", relief="flat", insertbackground="white").pack(side="left", padx=8, pady=6, fill="x", expand=True) self.tk.Button(f, text="...", bg="#22223c", fg="#d0d0e0", relief="flat", cursor="hand2", command=cmd).pack(side="right", padx=4) def _row_in(self, parent, key, cmd): f = self.tk.Frame(parent, bg="#14141e") f.pack(fill="x", pady=2) self.tk.Entry(f, textvariable=self._vars[key], font=("Courier New", 9), bg="#14141e", fg="#d8d8f0", relief="flat", insertbackground="white").pack(side="left", padx=8, pady=6, fill="x", expand=True) self.tk.Button(f, text="...", bg="#22223c", fg="#d0d0e0", relief="flat", cursor="hand2", command=cmd).pack(side="right", padx=4) def _build_library_panel(self): tk = self.tk wrap = tk.Frame(self.root, bg="#0d0d12") wrap.pack(fill="x", padx=20, pady=(6, 0)) self._library_open = False self._btn_library_toggle = tk.Button( wrap, text="Bibliothek oeffnen", font=("Courier New", 9), bg="#1a1a2c", fg="#8a8aff", activebackground="#252540", relief="flat", cursor="hand2", command=self._toggle_library_panel ) self._btn_library_toggle.pack(anchor="w", pady=(0, 4)) self._library_panel = tk.Frame(wrap, bg="#11111a", bd=1, relief="flat") top = tk.Frame(self._library_panel, bg="#11111a") top.pack(fill="x", padx=8, pady=(8, 4)) tk.Label(top, text="Name:", font=("Courier New", 9), bg="#11111a", fg="#d8d8f0").pack(side="left") self._var_library_name = tk.StringVar(value="") tk.Entry(top, textvariable=self._var_library_name, font=("Courier New", 9), bg="#14141e", fg="#d8d8f0", relief="flat", insertbackground="white", width=18).pack(side="left", padx=8) tk.Button(top, text="+ Hinzufuegen", font=("Courier New", 9), bg="#1a3a2a", fg="#8aff8a", activebackground="#2a5a3a", relief="flat", cursor="hand2", command=self._library_add_from_file).pack(side="left", padx=(0, 6)) tk.Button(top, text="Als Quelle verwenden", font=("Courier New", 9), bg="#22223c", fg="#d0d0e0", activebackground="#2e2e4e", relief="flat", cursor="hand2", command=self._library_use_selected).pack(side="left", padx=(8, 6)) tk.Button(top, text="Loeschen", font=("Courier New", 9), bg="#3a1a1a", fg="#ffb0b0", activebackground="#4a2222", relief="flat", cursor="hand2", command=self._library_delete_selected).pack(side="left") self._library_grid = tk.Frame(self._library_panel, bg="#11111a") self._library_grid.pack(fill="x", padx=8, pady=(4, 8)) self._library_placeholder = tk.Label( self._library_grid, text="Noch keine Gesichter gespeichert.", font=("Courier New", 9), bg="#11111a", fg="#7a7a9a" ) self._library_placeholder.grid(row=0, column=0, sticky="w") def _toggle_library_panel(self): self._library_open = not self._library_open if self._library_open: self._library_panel.pack(fill="x") self._btn_library_toggle.configure(text="Bibliothek schliessen") self._refresh_library_grid() else: self._library_panel.pack_forget() self._btn_library_toggle.configure(text="Bibliothek oeffnen") def _refresh_library_grid(self): if not hasattr(self, "_library_grid"): return for child in list(self._library_grid.winfo_children()): child.destroy() self._library_images = {} entries = self.library.list_entries() if not entries: self._library_placeholder = self.tk.Label( self._library_grid, text="Noch keine Gesichter gespeichert.", font=("Courier New", 9), bg="#11111a", fg="#7a7a9a" ) self._library_placeholder.grid(row=0, column=0, sticky="w") return from PIL import Image, ImageTk last_slug = (self._vars.get("last_library_face").get().strip() if "last_library_face" in self._vars else "") if self._selected_library_slug is None and last_slug: self._selected_library_slug = last_slug for i, entry in enumerate(entries): col = i % 4 row = i // 4 cell = self.tk.Frame(self._library_grid, bg="#11111a", bd=0) cell.grid(row=row, column=col, padx=6, pady=6, sticky="n") thumb_path = Path(entry["thumb_path"]) if thumb_path.exists(): img = Image.open(thumb_path).convert("RGB") else: img = Image.new("RGB", (96, 96), "#1a1a2c") photo = ImageTk.PhotoImage(img) self._library_images[entry["slug"]] = photo btn = self.tk.Button( cell, image=photo, relief="solid", bd=3, highlightthickness=0, bg="#11111a", activebackground="#1a1a2c", command=lambda slug=entry["slug"]: self._select_library_entry(slug) ) btn.pack() self.tk.Label(cell, text=entry["name"], font=("Courier New", 8), bg="#11111a", fg="#d8d8f0").pack(pady=(2, 0)) cell._slug = entry["slug"] cell._btn = btn self._select_library_entry(self._selected_library_slug, save=False) def _select_library_entry(self, slug, save=True): if slug: self._selected_library_slug = slug if not hasattr(self, "_library_grid"): return selected = self._selected_library_slug for cell in self._library_grid.winfo_children(): b = getattr(cell, "_btn", None) s = getattr(cell, "_slug", None) if b is None: continue if s == selected: b.configure(bg="#28442a") else: b.configure(bg="#11111a") if save and selected: self._vars["last_library_face"].set(selected) self._save_now() def _library_add_from_file(self): p = self.fd.askopenfilename( title="Gesicht fuer Bibliothek waehlen", filetypes=[("Bilder", "*.jpg *.jpeg *.png *.bmp *.webp"), ("Alle", "*.*")] ) if not p: return name = self._var_library_name.get().strip() or Path(p).stem try: entry = self.library.add(name, p) self._selected_library_slug = entry["slug"] self._vars["last_library_face"].set(entry["slug"]) self._refresh_library_grid() self.mb.showinfo("Bibliothek", f"Gesicht gespeichert: {entry['name']}") except Exception as e: self.mb.showerror("Bibliothek", str(e)) def _library_use_selected(self): slug = self._selected_library_slug if not slug: return self.mb.showerror("Bibliothek", "Bitte zuerst ein Gesicht auswaehlen.") try: src = self.library.get_source_path(slug) self._vars["source"].set(str(src)) self._load_preview(str(src)) self._save_now() except Exception as e: self.mb.showerror("Bibliothek", str(e)) def _library_delete_selected(self): slug = self._selected_library_slug if not slug: return self.mb.showerror("Bibliothek", "Bitte zuerst ein Gesicht auswaehlen.") if not self.mb.askyesno("Bibliothek", f"Eintrag '{slug}' wirklich loeschen?"): return try: self.library.remove(slug) self._selected_library_slug = None self._vars["last_library_face"].set("") self._refresh_library_grid() except Exception as e: self.mb.showerror("Bibliothek", str(e)) def _build_webcam_tab(self, notebook): tk, ttk = self.tk, self.ttk tab = tk.Frame(notebook, bg="#0d0d12") notebook.add(tab, text="Webcam") row1 = tk.Frame(tab, bg="#0d0d12") row1.pack(fill="x", pady=(10, 2)) tk.Label(row1, text="Kamera-Index", font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a").pack(side="left", padx=(0, 8)) self._webcam_index_box = ttk.Combobox( row1, textvariable=self._vars["webcam_index"], values=[str(i) for i in range(5)], state="readonly", width=6, font=("Courier New", 9) ) self._webcam_index_box.pack(side="left") self._webcam_resolution_label = tk.Label( row1, text="Aufloesung: -", font=("Courier New", 8), bg="#0d0d12", fg="#7a7a9a" ) self._webcam_resolution_label.pack(side="left", padx=(12, 0)) row2 = tk.Frame(tab, bg="#0d0d12") row2.pack(fill="x", pady=(8, 2)) tk.Checkbutton(row2, text="Aufnahme aktiv", variable=self._var_webcam_record, font=("Courier New", 9), bg="#0d0d12", fg="#8aff8a", selectcolor="#0d0d12", activebackground="#0d0d12", command=self._save_now).pack(side="left") self._row_in(tab, "webcam_output", self._pick_webcam_out) self._webcam_preview_target = (640, 360) self._webcam_preview_collapsed_h = 96 self._webcam_preview_box = tk.Frame(tab, bg="#05050c", height=self._webcam_preview_collapsed_h) self._webcam_preview_box.pack(fill="x", padx=4, pady=(8, 6)) self._webcam_preview_box.pack_propagate(False) self._webcam_preview = tk.Label( self._webcam_preview_box, bg="#05050c", fg="#7a7a9a", text="Webcam Vorschau (Starten fuer Live-Preview)" ) self._webcam_preview.pack(fill="both", expand=True) self._webcam_stats = tk.Label( tab, text="FPS: - | Gesichter: -", font=("Courier New", 9), bg="#0d0d12", fg="#8a8aff" ) self._webcam_stats.pack(anchor="w", padx=4, pady=(0, 8)) self._btn_webcam = tk.Button( tab, text="Starten", font=("Courier New", 10, "bold"), bg="#1a3a2a", fg="#8aff8a", activebackground="#2a5a3a", relief="flat", cursor="hand2", command=self._toggle_webcam ) self._btn_webcam.pack(anchor="w", padx=4, pady=(0, 10)) self._set_webcam_preview_collapsed(True) def _toggle_webcam(self): if self._webcam_running: self._stop_webcam() else: self._start_webcam() def _pick_webcam_out(self): p = self.fd.asksaveasfilename( title="Webcam-Aufnahme speichern", defaultextension=".mp4", filetypes=[("MP4 Video", "*.mp4"), ("Alle", "*.*")] ) if p: self._vars["webcam_output"].set(p) def _set_webcam_preview_collapsed(self, collapsed): if not hasattr(self, "_webcam_preview_box"): return if collapsed: self._webcam_preview_box.configure(height=self._webcam_preview_collapsed_h) self._webcam_preview.configure(image="", text="Webcam Vorschau (Starten fuer Live-Preview)") self._webcam_preview.image = None else: self._webcam_preview_box.configure(height=int(self._webcam_preview_target[1])) def _start_webcam(self): import cv2 import threading src = self._vars["source"].get().strip() if not src or not Path(src).is_file(): return self.mb.showerror("Webcam", "Bitte zuerst ein gueltiges Quellbild waehlen.") try: cam_idx = int(self._vars["webcam_index"].get().strip() or "0") except Exception: return self.mb.showerror("Webcam", "Ungueltiger Kamera-Index.") probe = cv2.VideoCapture(cam_idx) if not probe.isOpened(): probe.release() tried = [] for i in range(5): c = cv2.VideoCapture(i) ok = c.isOpened() c.release() if ok: tried.append(i) return self.mb.showerror( "Webcam", f"Kamera konnte nicht geoeffnet werden (Index {cam_idx}).\n" f"Verfuegbare Indizes: {tried if tried else 'keine'}" ) w = int(probe.get(cv2.CAP_PROP_FRAME_WIDTH) or 0) h = int(probe.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0) probe.release() self._webcam_resolution_label.configure(text=f"Aufloesung: {w}x{h}") self._update_quality() try: self.swapper.init_models() except Exception as e: return self.mb.showerror("Webcam", f"Modellfehler: {e}") src_img = _cv2_imread_unicode(src) if src_img is None: return self.mb.showerror("Webcam", "Quellbild konnte nicht geladen werden.") src_face = self.swapper.get_first_face(src_img) if src_face is None: return self.mb.showerror("Webcam", "Kein Gesicht im Quellbild gefunden!") self._webcam_cancel = threading.Event() self._webcam_running = True self._btn_webcam.configure(text="Stoppen", bg="#3a1a1a", fg="#ffb0b0", activebackground="#4a2222") self._webcam_stats.configure(text="FPS: - | Gesichter: -") self._set_webcam_preview_collapsed(False) record_path = self._vars["webcam_output"].get().strip() if self._var_webcam_record.get() else None self._webcam_thread = threading.Thread( target=self._webcam_worker, args=(src_face, cam_idx, record_path), daemon=True, ) self._webcam_thread.start() def _stop_webcam(self): if self._webcam_cancel is not None: self._webcam_cancel.set() th = self._webcam_thread if th is not None and th.is_alive(): th.join(timeout=1.5) self._webcam_running = False self._webcam_thread = None self._webcam_cancel = None if hasattr(self, "_btn_webcam"): self._btn_webcam.configure(text="Starten", bg="#1a3a2a", fg="#8aff8a", activebackground="#2a5a3a") if hasattr(self, "_webcam_preview"): self._set_webcam_preview_collapsed(True) def _webcam_worker(self, src_face, cam_idx, record_path): import cv2 from PIL import Image def on_frame(frame): rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) img = Image.fromarray(rgb) target_w, target_h = self._webcam_preview_target src_w, src_h = img.size scale = min(target_w / max(1, src_w), target_h / max(1, src_h)) new_w = max(1, int(round(src_w * scale))) new_h = max(1, int(round(src_h * scale))) resampling = getattr(Image, "Resampling", Image).BILINEAR fitted = img.resize((new_w, new_h), resampling) canvas = Image.new("RGB", (target_w, target_h), (5, 5, 12)) off_x = (target_w - new_w) // 2 off_y = (target_h - new_h) // 2 canvas.paste(fitted, (off_x, off_y)) self._root_real.after(1, self._apply_webcam_frame, canvas) def on_stats(fps, faces): self._webcam_last_faces = int(faces) self._root_real.after(1, lambda: self._webcam_stats.configure(text=f"FPS: {fps:.1f} | Gesichter: {int(faces)}")) try: self.swapper.swap_webcam( src_face=src_face, camera_index=cam_idx, record_path=record_path, fps_target=25.0, cancel_check=(self._webcam_cancel.is_set if self._webcam_cancel is not None else None), frame_cb=on_frame, stats_cb=on_stats, use_tracking=False, ) except Exception as e: self._root_real.after(1, lambda: self.mb.showerror("Webcam", str(e))) finally: self._root_real.after(1, self._stop_webcam) def _apply_webcam_frame(self, pil_img): from PIL import ImageTk if not hasattr(self, "_webcam_preview"): return photo = ImageTk.PhotoImage(pil_img) self._webcam_preview.configure(image=photo, text="") self._webcam_preview.image = photo def _pick_source(self): p = self.fd.askopenfilename(title="Quellbild waehlen", filetypes=[("Bilder", "*.jpg *.jpeg *.png *.bmp *.webp"), ("Alle", "*.*")]) if not p: return self._vars["source"].set(p) self._load_preview(p) def _pick_indir(self): p = self.fd.askdirectory(title="Eingabe-Ordner waehlen") if p: self._vars["input_dir"].set(p) def _pick_outdir(self): p = self.fd.askdirectory(title="Ausgabe-Ordner waehlen") if p: self._vars["output_dir"].set(p) def _pick_video_indir(self): p = self.fd.askdirectory(title="Video-Eingabe-Ordner waehlen") if p: self._vars["video_input_dir"].set(p) if not self._vars["video_output_dir"].get(): self._vars["video_output_dir"].set(str(Path(p) / "output_videos")) def _pick_video_outdir(self): p = self.fd.askdirectory(title="Video-Ausgabe-Ordner waehlen") if p: self._vars["video_output_dir"].set(p) def _pick_voice_ref(self): p = self.fd.askopenfilename( title="Referenz-Stimme waehlen", filetypes=[("Audio", "*.wav *.mp3 *.m4a *.flac *.ogg *.aac *.wma"), ("Alle", "*.*")] ) if p: self._vars["voice_ref"].set(p) if not self._vars["voice_output"].get(): base = Path(p).with_suffix("").name self._vars["voice_output"].set(str(Path(p).parent / f"{base}_cloned.wav")) def _pick_voice_source_audio(self): p = self.fd.askopenfilename( title="Eingabe-Audio waehlen", filetypes=[("Audio", "*.wav *.mp3 *.m4a *.flac *.ogg *.aac *.wma"), ("Alle", "*.*")] ) if p: self._vars["voice_source_audio"].set(p) if not self._vars["voice_output"].get(): base = Path(p).with_suffix("").name self._vars["voice_output"].set(str(Path(p).parent / f"{base}_voiceclone.wav")) def _pick_voice_out(self): p = self.fd.asksaveasfilename( title="Ausgabe-Audio speichern", defaultextension=".wav", filetypes=[("WAV-Audio", "*.wav"), ("Alle", "*.*")] ) if p: self._vars["voice_output"].set(p) def _log(self, msg): self._lb.insert("end", msg + "\n") self._lb.see("end") self._root_real.update_idletasks() def _update_quality(self): self.swapper.enhance = self._var_enhance.get() self.swapper.color = self._var_color.get() allow_restoration = self._var_restoration.get() and self.restorer.is_available() self.swapper.use_restoration = bool(allow_restoration) self._save_now() def _redo_setup(self): if self._webcam_running: self._stop_webcam() SETUP_FLAG.unlink(missing_ok=True) self._root_real.destroy() _show_setup_window() importlib.invalidate_caches() MainApp() def _on_close(self): if self._webcam_running: self._stop_webcam() self._root_real.destroy() def _start(self): self._btn.configure(state="disabled") import threading tab_idx = self._nb.index(self._nb.select()) if tab_idx == 3: self._btn.configure(state="normal") self._toggle_webcam() return if tab_idx == 1: target = self._run_video elif tab_idx == 2: target = self._run_voice else: target = self._run threading.Thread(target=target, daemon=True).start() def _run(self): import cv2 self._update_quality() src = self._vars["source"].get().strip() indir = self._vars["input_dir"].get().strip() outdir = self._vars["output_dir"].get().strip() def err(t, m): self.mb.showerror(t, m) self._btn.configure(state="normal") if not all([src, indir, outdir]): return err("Fehler", "Bitte alle drei Felder ausfuellen.") if not Path(src).is_file(): return err("Fehler", f"Quellbild nicht gefunden:\n{src}") if not Path(indir).is_dir(): return err("Fehler", f"Eingabe-Ordner existiert nicht:\n{indir}") Path(outdir).mkdir(parents=True, exist_ok=True) try: self.swapper.init_models() except Exception as e: return err("Modellfehler", str(e)) src_img = _cv2_imread_unicode(src) if src_img is None: return err("Fehler", "Quellbild konnte nicht geladen werden.") src_face = self.swapper.get_first_face(src_img) if src_face is None: return err("Fehler", "Kein Gesicht im Quellbild gefunden!") self._log(f"OK Quellgesicht erkannt: {Path(src).name}") images = sorted(p for p in Path(indir).iterdir() if p.suffix.lower() in self.SUPPORTED and p.is_file()) if not images: self.mb.showinfo("Keine Bilder", "Keine unterstuetzten Bilder im Eingabe-Ordner.") self._btn.configure(state="normal") return self._log(f"{len(images)} Bild(er) gefunden ...\n") self._pb["maximum"] = len(images) self._pb["value"] = 0 ok = 0; failed = 0 failed_dir = Path(outdir) / "failed" for i, imgp in enumerate(images, 1): outp = Path(outdir) / imgp.name self._log(f"[{i}/{len(images)}] {imgp.name}") self._sv.set(f"Verarbeite {imgp.name} ({i}/{len(images)}) ...") swapped = False try: swapped = self.swapper.swap_image(src_face, imgp, outp) except Exception as e: self._log(f" FEHLER: {e}") if swapped: ok += 1 self._log(" OK gespeichert") else: failed_dir.mkdir(parents=True, exist_ok=True) shutil.copy2(imgp, failed_dir / imgp.name) failed += 1 self._log(f" WARNUNG -> failed/{imgp.name} (kein Gesicht erkannt)") self._pb["value"] = i summary = f"{ok} erfolgreich" if failed: summary += f" | {failed} fehlgeschlagen -> Ordner: failed/" self._sv.set(summary) self._log(f"\nFertig: {ok}/{len(images)} Bilder bearbeitet.") if failed: self._log(f"WARNUNG: {failed} Bild(er) ohne Gesicht -> {failed_dir}") self.mb.showinfo("Fertig", f"{ok} von {len(images)} erfolgreich.\n" + (f"{failed} ohne Gesicht -> Ordner 'failed'\n" if failed else "") + f"\nAusgabe:\n{outdir}") self._btn.configure(state="normal") def _run_video(self): import cv2 self._update_quality() src = self._vars["source"].get().strip() video_indir = self._vars["video_input_dir"].get().strip() video_outdir = self._vars["video_output_dir"].get().strip() def err(t, m): self.mb.showerror(t, m) self._btn.configure(state="normal") if not all([src, video_indir, video_outdir]): return err("Fehler", "Bitte Quellbild, Video-Eingabe-Ordner und Video-Ausgabe-Ordner angeben.") if not Path(src).is_file(): return err("Fehler", f"Quellbild nicht gefunden:\n{src}") if not Path(video_indir).is_dir(): return err("Fehler", f"Video-Eingabe-Ordner existiert nicht:\n{video_indir}") Path(video_outdir).mkdir(parents=True, exist_ok=True) videos = sorted(p for p in Path(video_indir).iterdir() if p.is_file() and p.suffix.lower() in self.VIDEO_SUPPORTED) if not videos: self.mb.showinfo("Keine Videos", "Keine unterstuetzten Videos im Eingabe-Ordner.") self._btn.configure(state="normal") return try: self.swapper.init_models() except Exception as e: return err("Modellfehler", str(e)) src_img = _cv2_imread_unicode(src) if src_img is None: return err("Fehler", "Quellbild konnte nicht geladen werden.") src_face = self.swapper.get_first_face(src_img) if src_face is None: return err("Fehler", "Kein Gesicht im Quellbild gefunden!") self._log(f"OK Quellgesicht erkannt: {Path(src).name}") self._log(f"{len(videos)} Video(s) im Eingabe-Ordner gefunden.") total_frames = 0 for vp in videos: cap, cap_tmp_copy = _open_videocapture_unicode(vp, log_fn=self._log) if cap.isOpened(): total_frames += max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 1) else: total_frames += 1 cap.release() _cleanup_temp_file(cap_tmp_copy) self._pb["maximum"] = max(total_frames, 1) self._pb["value"] = 0 done_total = 0; ok = 0; failed = 0; no_face = 0 failed_dir = Path(video_outdir) / "failed" for i, vp in enumerate(videos, 1): outp = Path(video_outdir) / f"{vp.stem}_faceswap.mp4" self._log(f"[{i}/{len(videos)}] {vp.name}") self._sv.set(f"Verarbeite Video {i}/{len(videos)}: {vp.name}") offset = done_total max_total = int(float(self._pb["maximum"])) def on_progress(done, total, name=vp.name, base=offset): combined = base + done self._pb["value"] = min(combined, max_total) self._sv.set(f"{name}: Frame {done}/{max(total, 1)} | Gesamt {combined}/{max_total}") self._root_real.update_idletasks() try: stats = self.swapper.swap_video(src_face, vp, outp, progress_cb=on_progress) done_total += int(stats.get("frames_processed", 0)) swapped_frames = int(stats.get("frames_swapped", 0)) swapped_faces = int(stats.get("faces_swapped", 0)) ok += 1 if swapped_frames == 0: no_face += 1 self._log(f" OK gespeichert: {outp.name} (Frames mit Face: {swapped_frames}, getauschte Gesichter: {swapped_faces})") except Exception as e: failed += 1 failed_dir.mkdir(parents=True, exist_ok=True) try: shutil.copy2(vp, failed_dir / vp.name) except Exception: pass self._log(f" FEHLER: {e}") self._log(f" WARNUNG: Original kopiert nach failed/{vp.name}") summary = f"{ok} Video(s) verarbeitet" if no_face: summary += f" | {no_face} ohne erkannten Face-Frame" if failed: summary += f" | {failed} fehlgeschlagen" self._sv.set(summary) self._pb["value"] = self._pb["maximum"] self._log(f"\nVideo-Batch fertig: {ok}/{len(videos)} verarbeitet.") if no_face: self._log(f"WARNUNG: {no_face} Video(s) hatten keinen erkannten Face-Frame.") if failed: self._log(f"FEHLER: {failed} Video(s) fehlgeschlagen -> {failed_dir}") self.mb.showinfo("Fertig", f"{ok} von {len(videos)} Video(s) verarbeitet.\n" + (f"{no_face} ohne erkannten Face-Frame.\n" if no_face else "") + (f"{failed} fehlgeschlagen -> Ordner 'failed'\n" if failed else "") + f"\nAusgabe:\n{video_outdir}") self._btn.configure(state="normal") def _run_voice(self): mode = self._var_voice_mode.get().strip().lower() ref = self._vars["voice_ref"].get().strip() out_file = self._vars["voice_output"].get().strip() lang = self._vars["voice_language"].get().strip().lower() or "de" text = self._voice_text.get("1.0", "end").strip() source_audio = self._vars["voice_source_audio"].get().strip() def err(t, m): self.mb.showerror(t, m) self._btn.configure(state="normal") if not ref: return err("Fehler", "Bitte eine Referenz-Stimme waehlen.") if not out_file: return err("Fehler", "Bitte eine Ausgabe-Audio-Datei waehlen.") self._pb["maximum"] = 100 self._pb["value"] = 5 self._sv.set("Starte Voice-Cloning ...") self._log("VOICE: Starte Verarbeitung ...") try: if mode == "audio": if not source_audio: return err("Fehler", "Bitte Eingabe-Audio waehlen (Audio-Modus).") self._sv.set("VOICE: Lade Model und konvertiere Audio ...") self._pb["value"] = 35 result = self.voice.clone_from_audio(ref, source_audio, out_file) self._pb["value"] = 100 self._sv.set("OK Voice-Cloning abgeschlossen (Audio-Modus)") self._log(f"VOICE: Fertig (Audio-Modus) -> {result}") self.mb.showinfo("Fertig", f"Voice-Cloning fertig.\n\nAusgabe:\n{result}") else: if not text: return err("Fehler", "Bitte Text eingeben (Text-Modus).") self._sv.set("VOICE: Lade XTTS und generiere Sprache ...") self._pb["value"] = 35 result = self.voice.clone_from_text(ref, text, lang, out_file) self._pb["value"] = 100 self._sv.set("OK Voice-Cloning abgeschlossen (Text-Modus)") self._log(f"VOICE: Fertig (Text-Modus, Sprache={lang}) -> {result}") self.mb.showinfo("Fertig", f"Voice-Cloning fertig.\n\nAusgabe:\n{result}") except Exception as e: self._log(f"VOICE: Fehler: {e}") err("Voice-Cloning Fehler", str(e)) finally: self._btn.configure(state="normal") def main(): _require_python_version() if not SETUP_FLAG.exists(): _show_setup_window() importlib.invalidate_caches() MainApp() if __name__ == "__main__": main()