Files
Ai-Swaper/face_swap.py
TutorialsGHG 1cad87752c new version
2026-05-11 21:04:04 +02:00

2834 lines
118 KiB
Python

#!/usr/bin/env python3
"""FaceSwap Batch Tool v5 - Mit Face Enhancer & Farbanpassung. Fuehre setup.bat aus bevor du dieses Skript startest."""
import os, sys, subprocess, importlib, platform, urllib.request, shutil, zipfile, json, sysconfig
from pathlib import Path
# Directory containing this script; the paths below are derived from it.
SCRIPT_DIR = Path(__file__).parent.resolve()
# Folder that holds the downloaded model files.
MODELS_DIR = SCRIPT_DIR / "models"
# Marker file written by run_setup() once it has finished successfully.
SETUP_FLAG = SCRIPT_DIR / ".setup_done"
# JSON file read by _load_config() and written by _save_config().
CONFIG_FILE = SCRIPT_DIR / "config.json"
IS_WINDOWS = platform.system() == "Windows"
# Set to True by _preload_onnxruntime_cuda() so the preload runs at most once.
_CUDA_PRELOAD_TRIED = False
# Memoised result of _get_providers(); None until it has been computed.
_PROVIDERS_CACHE = None
# Collects the objects returned by os.add_dll_directory().
_DLL_DIR_HANDLES = []
# Exact (major, minor) interpreter version checked by _require_python_version().
REQUIRED_PYTHON = (3, 12)
def _require_python_version():
    """Abort unless the running interpreter matches ``REQUIRED_PYTHON`` exactly.

    On a mismatch a Tk error dialog is attempted (any failure to show it is
    ignored) and a ``RuntimeError`` carrying the same text is raised.

    Raises:
        RuntimeError: If the interpreter's (major, minor) version differs
            from ``REQUIRED_PYTHON``.
    """
    running = sys.version_info[:2]
    if running == REQUIRED_PYTHON:
        return

    wanted = "{}.{}".format(REQUIRED_PYTHON[0], REQUIRED_PYTHON[1])
    message = "".join([
        "Falsche Python-Version erkannt.\n\n",
        f"Benoetigt: Python {wanted}\n",
        f"Aktuell: Python {sys.version.split()[0]}\n",
        f"Interpreter: {sys.executable}\n\n",
        f"Bitte so starten:\n py -{wanted} {Path(__file__).name}",
    ])

    # Best effort: show a dialog for users who started the script without a console.
    try:
        import tkinter as tk
        from tkinter import messagebox

        hidden_root = tk.Tk()
        hidden_root.withdraw()
        messagebox.showerror("FaceSwap - Python Version", message, parent=hidden_root)
        hidden_root.destroy()
    except Exception:
        pass

    raise RuntimeError(message)
def _load_config():
    """Return the parsed content of ``CONFIG_FILE``.

    Returns:
        The decoded JSON value, or an empty dict when the file does not
        exist or cannot be read or parsed.
    """
    try:
        if not CONFIG_FILE.exists():
            return {}
        raw_text = CONFIG_FILE.read_text(encoding="utf-8")
        return json.loads(raw_text)
    except Exception:
        return {}
def _save_config(data):
    """Write *data* as indented UTF-8 JSON to ``CONFIG_FILE``.

    Saving is best effort: serialisation and I/O errors are ignored.
    """
    try:
        serialized = json.dumps(data, indent=2, ensure_ascii=False)
        CONFIG_FILE.write_text(serialized, encoding="utf-8")
    except Exception:
        return
def _run_ok(*cmd):
try:
out = subprocess.check_output(list(cmd), stderr=subprocess.DEVNULL, timeout=8).decode()
return True, out
except Exception:
return False, ""
def _cuda_available():
    """Detect an NVIDIA driver by running ``nvidia-smi``.

    Returns:
        ``(True, version)`` when ``nvidia-smi`` runs successfully, where
        *version* is the value following "CUDA Version:" in its output or
        ``"unknown"`` if that text is absent; ``(False, "")`` otherwise.
    """
    succeeded, report = _run_ok("nvidia-smi")
    if not succeeded:
        return False, ""
    import re
    found = re.search(r"CUDA Version:\s*([\d.]+)", report)
    version = found.group(1) if found else "unknown"
    return True, version
def _ensure_torchvision_functional_tensor_alias(log_fn=None):
try:
import torchvision.transforms.functional_tensor # noqa: F401
return True
except Exception:
pass
try:
import sys
import types
from torchvision.transforms import functional as tvf
if not hasattr(tvf, "rgb_to_grayscale"):
return False
alias_mod = types.ModuleType("torchvision.transforms.functional_tensor")
alias_mod.rgb_to_grayscale = tvf.rgb_to_grayscale
sys.modules["torchvision.transforms.functional_tensor"] = alias_mod
if log_fn:
log_fn(" Hinweis: torchvision-Kompatibilitaetsmodus fuer GFPGAN aktiv.")
return True
except Exception:
return False
def _preload_onnxruntime_cuda(log_fn=None):
    """Make CUDA/cuDNN DLLs discoverable for ONNX Runtime (Windows only).

    Runs at most once per process (guarded by ``_CUDA_PRELOAD_TRIED``) and
    does nothing on non-Windows systems. Existing DLL directories are
    prepended to ``PATH`` and registered through ``os.add_dll_directory``
    where that function exists; afterwards ``onnxruntime.preload_dlls`` is
    called if the installed onnxruntime provides it. Any failure is reported
    through *log_fn* (when given) and otherwise ignored.

    Args:
        log_fn: Optional callable taking a single message string.
    """
    global _CUDA_PRELOAD_TRIED
    if _CUDA_PRELOAD_TRIED or not IS_WINDOWS:
        return
    # Mark as attempted up front so a failure below is not retried.
    _CUDA_PRELOAD_TRIED = True
    try:
        import site
        import onnxruntime as ort
        dll_dirs = []
        seen = set()
        def _push_dir(path_obj):
            # Record each existing directory once; comparison is case-insensitive.
            try:
                p = str(Path(path_obj).resolve())
            except Exception:
                return
            key = p.lower()
            if key in seen:
                return
            seen.add(key)
            if Path(p).is_dir():
                dll_dirs.append(p)
        # onnxruntime's own DLLs
        ort_pkg = Path(ort.__file__).resolve().parent
        _push_dir(ort_pkg / "capi")
        # NVIDIA runtime DLLs from all relevant site-packages directories
        site_roots = []
        try:
            site_roots.extend(site.getsitepackages())
        except Exception:
            pass
        try:
            site_roots.append(site.getusersitepackages())
        except Exception:
            pass
        # Also cover the interpreter-specific site-packages directory
        site_roots.append(sysconfig.get_paths().get("purelib", ""))
        for root in [Path(r) for r in site_roots if r]:
            _push_dir(root / "nvidia" / "cublas" / "bin")
            _push_dir(root / "nvidia" / "cuda_runtime" / "bin")
            _push_dir(root / "nvidia" / "cuda_nvrtc" / "bin")
            _push_dir(root / "nvidia" / "cudnn" / "bin")
            _push_dir(root / "nvidia" / "cufft" / "bin")
            _push_dir(root / "nvidia" / "nvjitlink" / "bin")
        # Extend the DLL search path for cuDNN sub-libraries that are loaded dynamically later
        path_parts = os.environ.get("PATH", "").split(os.pathsep)
        path_keys = {p.lower() for p in path_parts}
        for d in dll_dirs:
            if d.lower() not in path_keys:
                path_parts.insert(0, d)
                path_keys.add(d.lower())
            try:
                if hasattr(os, "add_dll_directory"):
                    # The returned handle is stored in the module-level list.
                    _DLL_DIR_HANDLES.append(os.add_dll_directory(d))
            except Exception:
                pass
        os.environ["PATH"] = os.pathsep.join(path_parts)
        # preload_dlls is looked up dynamically because it may be missing.
        preload = getattr(ort, "preload_dlls", None)
        if callable(preload):
            # NOTE(review): presumably an empty directory makes onnxruntime use
            # the DLL search path prepared above - confirm against its docs.
            preload(directory="")
        if log_fn:
            log_fn(f" ONNX Runtime CUDA-DLLs vorgeladen ({len(dll_dirs)} DLL-Ordner).")
    except Exception as e:
        if log_fn:
            log_fn(f" Hinweis: CUDA-DLL-Preload fehlgeschlagen ({e}).")
def _dl(url, dest, log=print):
def hook(count, block, total):
if total > 0 and count % 100 == 0:
pct = min(100, count * block * 100 // total)
log(f" ... {pct}% ({count*block/1_048_576:.0f} MB)")
urllib.request.urlretrieve(url, dest, reporthook=hook)
def _cv2_imread_unicode(path, flags=None):
    """Read an image with OpenCV, falling back to in-memory decoding.

    ``cv2.imread`` is tried first. If it yields ``None`` the file is read
    with ``numpy.fromfile`` and decoded via ``cv2.imdecode`` instead.

    Args:
        path: Image location.
        flags: OpenCV read flags; ``cv2.IMREAD_COLOR`` when omitted.

    Returns:
        The decoded image, or ``None`` if both attempts fail or the file is
        empty.
    """
    import cv2
    import numpy as np

    mode = cv2.IMREAD_COLOR if flags is None else flags
    filename = str(path)
    decoded = cv2.imread(filename, mode)
    if decoded is None:
        try:
            raw_bytes = np.fromfile(filename, dtype=np.uint8)
            decoded = cv2.imdecode(raw_bytes, mode) if raw_bytes.size != 0 else None
        except Exception:
            decoded = None
    return decoded
def _cv2_imwrite_unicode(path, img, params=None):
    """Write an image with OpenCV, falling back to encode-then-write.

    ``cv2.imwrite`` is tried first. If it fails or raises, the image is
    encoded with ``cv2.imencode`` (using the path's suffix, ``.png`` when
    there is none) and the buffer is written with ``ndarray.tofile``.

    Args:
        path: Target location.
        img: Image array to store.
        params: Optional OpenCV encoder parameters.

    Returns:
        True when the image was written by either method, False otherwise.
    """
    import cv2

    target = str(path)
    options = [] if params is None else params

    try:
        written = cv2.imwrite(target, img, options)
    except Exception:
        written = False
    if written:
        return True

    try:
        suffix = Path(target).suffix or ".png"
        encoded_ok, payload = cv2.imencode(suffix, img, options)
        if encoded_ok:
            payload.tofile(target)
            return True
        return False
    except Exception:
        return False
def _path_has_non_ascii(path):
try:
s = str(path)
except Exception:
return False
return any(ord(ch) > 127 for ch in s)
def _open_videocapture_unicode(source, log_fn=None):
    """Open a ``cv2.VideoCapture`` with a fallback for non-ASCII paths.

    Integer sources are opened directly. For paths the capture is opened
    normally first; if that fails on Windows and the path contains non-ASCII
    characters, the file is copied to a temporary file inside ``SCRIPT_DIR``
    and opened from there.

    Args:
        source: Device index (``int``) or video path.
        log_fn: Optional callable that is told when the fallback is used.

    Returns:
        ``(capture, temp_path)``. *temp_path* is the temporary copy the
        caller must delete after releasing the capture, or ``None`` when no
        copy is in use. The returned capture may be unopened.
    """
    import cv2
    import tempfile

    if isinstance(source, int):
        return cv2.VideoCapture(int(source)), None

    p = str(source)
    cap = cv2.VideoCapture(p)
    if cap.isOpened():
        return cap, None

    tmp_copy = None
    try:
        src_path = Path(p)
        if IS_WINDOWS and src_path.is_file() and _path_has_non_ascii(src_path):
            fd, tmp_name = tempfile.mkstemp(
                prefix="faceswap_vid_",
                suffix=(src_path.suffix or ".mp4"),
                dir=str(SCRIPT_DIR),
            )
            # Remember the file before anything else can fail, so that it is
            # removed on every failure path below.
            tmp_copy = Path(tmp_name)
            os.close(fd)
            shutil.copy2(src_path, tmp_copy)
            cap2 = cv2.VideoCapture(str(tmp_copy))
            if cap2.isOpened():
                cap.release()
                if log_fn:
                    log_fn(" Hinweis: Unicode-Video-Fallback aktiv (temp Datei).")
                return cap2, tmp_copy
            cap2.release()
    except Exception:
        pass

    # The fallback did not produce a usable capture. The caller only receives
    # None as temp path, so the copy has to be deleted here.
    _cleanup_temp_file(tmp_copy)
    return cap, None
def _cleanup_temp_file(path):
if path is None:
return
try:
Path(path).unlink(missing_ok=True)
except Exception:
pass
def run_setup(log=print):
    """Verify the Python environment and download missing model files.

    Steps, in order: detect CUDA via ``nvidia-smi``, check that the required
    packages import, probe the optional gfpgan package, enforce numpy < 2,
    download ``buffalo_l`` and ``inswapper_128.onnx`` into ``MODELS_DIR`` if
    absent, prepare the GFPGAN weights when gfpgan is usable, and finally
    write ``SETUP_FLAG``.

    Args:
        log: Callable receiving progress messages.

    Raises:
        RuntimeError: If required packages are missing, numpy is 2.x or
            newer, or a required model download fails.
    """
    # ``import importlib`` alone does not guarantee that the ``util``
    # submodule is loaded. This import has to stay at the top of the function
    # because it makes ``importlib`` a local name for the whole body.
    import importlib.util

    log("=" * 60)
    log(" FaceSwap Batch Tool - Modell-Download")
    log("=" * 60)

    log("\nPruefe NVIDIA GPU / CUDA ...")
    cuda, cuda_ver = _cuda_available()
    log(f" {'OK CUDA ' + cuda_ver + ' -> GPU-Modus' if cuda else 'Info: Kein CUDA -> CPU-Modus'}")

    log("\nPruefe Installation ...")
    missing = []
    for mod, name in [("cv2", "opencv-python"), ("numpy", "numpy"), ("insightface", "insightface"),
                      ("onnx", "onnx"), ("onnxruntime", "onnxruntime"), ("albumentations", "albumentations")]:
        try:
            importlib.import_module(mod)
            log(f" OK {name}")
        except Exception:
            log(f" FEHLT: {name}")
            missing.append(name)
    if missing:
        raise RuntimeError(
            f"Fehlende Pakete: {', '.join(missing)}\n\n"
            "Bitte fuehre zuerst setup.bat aus!"
        )

    # gfpgan is optional: a missing or broken install only disables the feature.
    log("\nPruefe GFPGAN ...")
    has_gfpgan = False
    try:
        if importlib.util.find_spec("gfpgan") is None:
            raise ModuleNotFoundError("gfpgan")
        _ensure_torchvision_functional_tensor_alias(log)
        importlib.import_module("gfpgan")
        has_gfpgan = True
        log(" OK gfpgan")
    except ModuleNotFoundError:
        log(" FEHLT: gfpgan - Funktion deaktiviert (optional)")
    except Exception as e:
        log(f" FEHLER: gfpgan installiert, aber nicht importierbar ({e})")
        log(" Funktion deaktiviert (optional)")

    import numpy as np
    major = int(np.__version__.split(".")[0])
    if major >= 2:
        raise RuntimeError(
            f"numpy {np.__version__} ist installiert, aber numpy<2.0 wird benoetigt.\n\n"
            "Bitte fuehre setup.bat aus um alle Pakete neu zu installieren."
        )
    log(f" OK numpy {np.__version__} (kompatibel)")

    log("\nPruefe KI-Modelle ...")
    MODELS_DIR.mkdir(exist_ok=True)

    buffalo_dir = MODELS_DIR / "buffalo_l"
    if buffalo_dir.exists() and any(buffalo_dir.iterdir()):
        log(" OK buffalo_l")
    else:
        log(" Lade buffalo_l (~200 MB) ...")
        zp = MODELS_DIR / "buffalo_l.zip"
        try:
            _dl("https://github.com/deepinsight/insightface/releases/download/v0.7/buffalo_l.zip", zp, log)
            with zipfile.ZipFile(zp) as z:
                # Detect the archive layout instead of assuming it: only an
                # archive that already contains a top-level "buffalo_l/"
                # folder may be unpacked into MODELS_DIR directly. A flat
                # archive must go into buffalo_dir, otherwise the check above
                # never succeeds.
                members = [n.replace("\\", "/") for n in z.namelist() if not n.endswith("/")]
                nested = bool(members) and all(m.startswith("buffalo_l/") for m in members)
                z.extractall(MODELS_DIR if nested else buffalo_dir)
            zp.unlink(missing_ok=True)
            log(" OK buffalo_l")
        except Exception as e:
            zp.unlink(missing_ok=True)
            raise RuntimeError(f"buffalo_l Download fehlgeschlagen: {e}") from e

    sm = MODELS_DIR / "inswapper_128.onnx"
    if sm.exists() and sm.stat().st_size > 100_000:
        log(" OK inswapper_128.onnx")
    else:
        log(" Lade inswapper_128.onnx (~500 MB) ...")
        urls = [
            "https://github.com/facefusion/facefusion-assets/releases/download/models/inswapper_128.onnx",
            "https://github.com/deepinsight/insightface/releases/download/v0.7/inswapper_128.onnx",
        ]
        ok = False
        # Try each mirror in turn; a result that is too small counts as failure.
        for url in urls:
            try:
                log(f" Versuche {url.split('/')[2]} ...")
                _dl(url, sm, log)
                if sm.exists() and sm.stat().st_size > 100_000:
                    ok = True
                    break
                sm.unlink(missing_ok=True)
            except Exception as e:
                log(f" Fehler: {e}")
                sm.unlink(missing_ok=True)
        if not ok:
            raise RuntimeError(
                "inswapper_128.onnx konnte nicht heruntergeladen werden.\n\n"
                "Manuell herunterladen von:\n"
                " https://huggingface.co/deepinsight/inswapper\n"
                f"Datei ablegen in: {MODELS_DIR}"
            )
        log(" OK inswapper_128.onnx")

    if has_gfpgan:
        try:
            FaceRestorer(log_fn=log).ensure_model()
            log(" OK GFPGANv1.4.pth")
        except Exception as e:
            log(f" WARNUNG: GFPGAN-Modell konnte nicht vorbereitet werden ({e})")

    SETUP_FLAG.write_text(f"cuda={cuda}\n")
    log("\nEinrichtung abgeschlossen!")
def _show_setup_window():
    """Show a Tk window that runs ``run_setup`` and displays its log output.

    The setup runs in a daemon thread while the window's main loop is
    active. The close button stays disabled until the setup has finished or
    failed. The function returns when the window is closed.
    """
    import tkinter as tk
    from tkinter import ttk, messagebox
    import threading
    root = tk.Tk()
    root.title("FaceSwap - Einrichtung")
    root.geometry("700x520")
    root.configure(bg="#090912")
    root.resizable(False, False)
    # Heading and sub-heading.
    tk.Label(root, text="Einrichtung", font=("Courier New", 17, "bold"),
             bg="#090912", fg="#e8d5b7").pack(pady=(18, 4))
    tk.Label(root, text="Pruefe Pakete & lade KI-Modelle ...",
             font=("Courier New", 9), bg="#090912", fg="#7a7a9a").pack()
    # Read-only log area with a vertical scrollbar.
    lf = tk.Frame(root, bg="#090912")
    lf.pack(fill="both", expand=True, padx=18, pady=8)
    lb = tk.Text(lf, bg="#04040a", fg="#8aff8a", font=("Courier New", 8), relief="flat", state="disabled")
    sb = tk.Scrollbar(lf, command=lb.yview)
    lb.configure(yscrollcommand=sb.set)
    lb.pack(side="left", fill="both", expand=True)
    sb.pack(side="right", fill="y")
    # Indeterminate progress bar that animates while the worker is running.
    style = ttk.Style(root)
    style.theme_use("default")
    style.configure("S.Horizontal.TProgressbar", troughcolor="#111120", background="#3adf6a", thickness=10)
    pb = ttk.Progressbar(root, mode="indeterminate", length=660, style="S.Horizontal.TProgressbar")
    pb.pack(padx=18, pady=4)
    # Status line below the progress bar.
    sv = tk.StringVar(value="Starte ...")
    tk.Label(root, textvariable=sv, font=("Courier New", 9), bg="#090912", fg="#c8a96a").pack()
    btn = tk.Button(root, text="Schliessen & Starten", font=("Courier New", 11, "bold"),
                    bg="#1a3a2a", fg="#8aff8a", relief="flat", state="disabled",
                    cursor="hand2", command=root.destroy)
    btn.pack(pady=10)
    def append(msg):
        # The text widget is kept disabled; enable it only while inserting.
        lb.configure(state="normal")
        lb.insert("end", msg + "\n")
        lb.see("end")
        lb.configure(state="disabled")
        root.update_idletasks()
    def worker():
        # NOTE(review): append() and the widget calls below run on this
        # worker thread, not on the thread that runs root.mainloop() -
        # confirm this is safe with the Tk build in use.
        pb.start(10)
        try:
            run_setup(log=append)
            sv.set("Fertig!")
            btn.configure(state="normal")
        except Exception as e:
            append(f"\nFehler: {e}")
            sv.set("Fehler - Details im Log")
            messagebox.showerror("Fehler", str(e), parent=root)
            # The button is enabled after a failure as well, so the window can be closed.
            btn.configure(state="normal")
        finally:
            pb.stop()
    threading.Thread(target=worker, daemon=True).start()
    root.mainloop()
def _get_providers(log_fn=None):
    """Return the ONNX Runtime execution providers to use, CUDA first if usable.

    The result is computed once and cached in ``_PROVIDERS_CACHE``. CUDA is
    selected when onnxruntime lists ``CUDAExecutionProvider`` and, if the
    probe model ``buffalo_l/1k3d68.onnx`` exists, a session created on it
    actually reports that provider. Any error falls back to CPU only.

    Args:
        log_fn: Optional callable for diagnostic messages.

    Returns:
        A new list of provider names on every call.
    """
    global _PROVIDERS_CACHE
    if _PROVIDERS_CACHE is not None:
        return list(_PROVIDERS_CACHE)

    cuda_name = "CUDAExecutionProvider"
    cpu_only = ["CPUExecutionProvider"]
    cuda_first = [cuda_name, "CPUExecutionProvider"]

    selected = cpu_only
    try:
        import onnxruntime as ort
        _preload_onnxruntime_cuda(log_fn)
        if cuda_name in ort.get_available_providers():
            selected = cuda_first
            probe_model = MODELS_DIR / "buffalo_l" / "1k3d68.onnx"
            if probe_model.exists():
                # Being listed as available is not enough; check that a real
                # session ends up on the CUDA provider.
                session = ort.InferenceSession(
                    str(probe_model),
                    providers=list(cuda_first),
                )
                if cuda_name not in session.get_providers():
                    if log_fn:
                        log_fn(" CUDA erkannt, aber ONNX Runtime initialisiert nur CPU. CPU-Fallback aktiv.")
                    selected = cpu_only
    except Exception:
        selected = cpu_only

    _PROVIDERS_CACHE = list(selected)
    return list(_PROVIDERS_CACHE)
def _enhance_face_region(img, bbox, sharpen=True, color_correct=True):
    """Sharpen the padded face box of *img* in place with a feathered edge.

    The box is grown by 10 px and clipped to the image. When *sharpen* is
    true an unsharp mask is applied to the region; the result is blended
    back through a blurred mask so the region border stays unchanged.

    Args:
        img: Image array, modified in place.
        bbox: ``(x1, y1, x2, y2)`` in pixels.
        sharpen: Whether to apply the unsharp mask.
        color_correct: Accepted but not used by this function.

    Returns:
        *img* (the same object).
    """
    import cv2, numpy as np

    margin = 10
    left, top, right, bottom = (int(v) for v in bbox)
    x_lo = max(0, left - margin)
    y_lo = max(0, top - margin)
    x_hi = min(img.shape[1], right + margin)
    y_hi = min(img.shape[0], bottom + margin)
    region_h = y_hi - y_lo
    region_w = x_hi - x_lo
    if region_h <= 0 or region_w <= 0:
        return img

    untouched = img[y_lo:y_hi, x_lo:x_hi].copy()
    if sharpen:
        softened = cv2.GaussianBlur(untouched, (0, 0), 2)
        processed = cv2.addWeighted(untouched, 1.4, softened, -0.4, 0)
    else:
        processed = untouched.copy()

    # Feather mask: 1 in the interior, fading to 0 towards the region border.
    feather = max(8, min(region_h, region_w) // 6)
    weights = np.zeros((region_h, region_w), dtype=np.float32)
    weights[feather:-feather, feather:-feather] = 1.0
    weights = cv2.GaussianBlur(weights, (0, 0), feather * 0.8)
    weights = np.clip(weights, 0, 1)[:, :, np.newaxis]

    mixed = processed.astype(np.float32) * weights + untouched.astype(np.float32) * (1 - weights)
    img[y_lo:y_hi, x_lo:x_hi] = np.clip(mixed, 0, 255).astype(np.uint8)
    return img
def _match_face_color(swapped_img, original_img, bbox):
import cv2, numpy as np
x1, y1, x2, y2 = [int(v) for v in bbox]
pad = 5
x1c = max(0, x1 - pad); y1c = max(0, y1 - pad)
x2c = min(swapped_img.shape[1], x2 + pad); y2c = min(swapped_img.shape[0], y2 + pad)
src_region = original_img[y1c:y2c, x1c:x2c].astype(np.float32)
dst_region = swapped_img[y1c:y2c, x1c:x2c].astype(np.float32)
if src_region.size == 0 or dst_region.size == 0:
return swapped_img
for c in range(3):
src_mean, src_std = src_region[:,:,c].mean(), src_region[:,:,c].std() + 1e-6
dst_mean, dst_std = dst_region[:,:,c].mean(), dst_region[:,:,c].std() + 1e-6
factor = 0.5
dst_region[:,:,c] = (dst_region[:,:,c] - dst_mean) * (src_std / dst_std) * factor \
+ dst_mean * factor + dst_region[:,:,c] * (1 - factor)
dst_region = np.clip(dst_region, 0, 255).astype(np.uint8)
result = swapped_img.copy()
result[y1c:y2c, x1c:x2c] = dst_region
return result
class FaceRestorer:
    """GFPGAN-based face restoration with lazily loaded model weights.

    The GFPGAN weights live in ``MODELS_DIR`` and are downloaded on demand.
    ``restore`` and ``restore_faces`` never raise: on failure they return
    the input image unchanged.
    """

    MODEL_URL = "https://github.com/TencentARC/GFPGAN/releases/download/v1.3.4/GFPGANv1.4.pth"
    MODEL_PATH = MODELS_DIR / "GFPGANv1.4.pth"
    # Auxiliary weights that are stored inside facexlib's own "weights" folder.
    _FACEXLIB_FILES = {
        "detection_Resnet50_Final.pth": "https://github.com/xinntao/facexlib/releases/download/v0.1.0/detection_Resnet50_Final.pth",
        "parsing_parsenet.pth": "https://github.com/xinntao/facexlib/releases/download/v0.2.2/parsing_parsenet.pth",
    }

    def __init__(self, log_fn=print):
        """Create an unloaded restorer.

        Args:
            log_fn: Callable receiving status messages.
        """
        self.log = log_fn
        self.restorer = None
        self.upscale = 1
        self.only_center_face = False
        # Crops narrower or lower than this many pixels are skipped in restore_faces().
        self.min_face_px = 72
        # Weight of the restored crop when it is mixed with the original crop.
        self.blend_alpha = 0.72
        # Padding around a face box, relative to the box size.
        self.pad_ratio = 0.22

    @staticmethod
    def is_available(log_fn=None):
        """Return True if the ``gfpgan`` package is installed and importable."""
        import importlib
        import importlib.util

        if importlib.util.find_spec("gfpgan") is None:
            return False
        _ensure_torchvision_functional_tensor_alias(log_fn)
        try:
            importlib.import_module("gfpgan")
            return True
        except Exception:
            return False

    def _fetch(self, url, dest):
        """Download *url* to *dest* and delete *dest* again if that fails.

        Without the cleanup a truncated file larger than the size threshold
        would be accepted as a valid model on the next start.
        """
        try:
            _dl(url, dest, self.log)
        except BaseException:
            try:
                Path(dest).unlink(missing_ok=True)
            except OSError:
                pass
            raise

    def ensure_model(self):
        """Download the GFPGAN weights if missing, then the facexlib weights.

        Raises:
            Exception: If the GFPGAN weight download fails.
        """
        MODELS_DIR.mkdir(exist_ok=True)
        if not self.MODEL_PATH.exists() or self.MODEL_PATH.stat().st_size < 100_000:
            self.log(" Lade GFPGANv1.4.pth (~330 MB) ...")
            self._fetch(self.MODEL_URL, self.MODEL_PATH)
        self._ensure_facexlib_weights()

    def _ensure_facexlib_weights(self):
        """Pre-download facexlib's helper weights; failures are only logged."""
        if not self.is_available():
            return
        try:
            import facexlib

            weights_dir = Path(facexlib.__file__).resolve().parent / "weights"
            weights_dir.mkdir(parents=True, exist_ok=True)
            for filename, url in self._FACEXLIB_FILES.items():
                dst = weights_dir / filename
                if dst.exists() and dst.stat().st_size > 100_000:
                    continue
                self.log(f" Lade {filename} ...")
                self._fetch(url, dst)
        except Exception as e:
            self.log(f" Hinweis: facexlib-Modelle konnten nicht vorab geladen werden ({e}).")

    def load(self):
        """Create the ``GFPGANer`` instance on first use.

        Raises:
            RuntimeError: If gfpgan is not installed.
        """
        if self.restorer is not None:
            return
        if not self.is_available(log_fn=self.log):
            raise RuntimeError("GFPGAN ist nicht installiert.")
        self.ensure_model()
        _ensure_torchvision_functional_tensor_alias(self.log)
        from gfpgan import GFPGANer

        providers = _get_providers(self.log)
        # A CUDA-capable ONNX Runtime says nothing about the installed torch
        # build. Ask torch itself before requesting the "cuda" device.
        device = "cpu"
        if "CUDAExecutionProvider" in providers:
            try:
                import torch

                if torch.cuda.is_available():
                    device = "cuda"
            except Exception:
                device = "cpu"
        self.restorer = GFPGANer(
            model_path=str(self.MODEL_PATH),
            upscale=self.upscale,
            arch="clean",
            channel_multiplier=2,
            bg_upsampler=None,
            device=device,
        )

    def restore(self, bgr_img):
        """Run GFPGAN on the whole image.

        Args:
            bgr_img: Input image, or ``None``.

        Returns:
            The restored image resized to the input's height and width, or
            *bgr_img* unchanged when it is ``None`` or restoration fails.
        """
        import cv2
        import numpy as np

        if bgr_img is None:
            return bgr_img
        try:
            self.load()
            _, _, restored = self.restorer.enhance(
                bgr_img,
                has_aligned=False,
                only_center_face=self.only_center_face,
                paste_back=True,
            )
            if restored is None:
                return bgr_img
            if restored.shape[:2] != bgr_img.shape[:2]:
                restored = cv2.resize(restored, (bgr_img.shape[1], bgr_img.shape[0]), interpolation=cv2.INTER_AREA)
            return np.clip(restored, 0, 255).astype(np.uint8)
        except Exception as e:
            self.log(f" Hinweis: GFPGAN-Restore fehlgeschlagen ({e}).")
            return bgr_img

    @staticmethod
    def _bbox_to_rect(bbox, img_w, img_h, pad_ratio=0.22):
        """Convert a face box into a padded pixel rectangle clipped to the image.

        Padding is *pad_ratio* of the box size, at least 8 px per side.

        Returns:
            ``(x1, y1, x2, y2)`` as ints, or ``None`` if *bbox* is not four
            numbers or the clipped rectangle is at most 2 px wide or high.
        """
        try:
            x1, y1, x2, y2 = [float(v) for v in bbox]
        except Exception:
            return None
        bw = max(1.0, x2 - x1)
        bh = max(1.0, y2 - y1)
        pad_x = max(8, int(round(bw * pad_ratio)))
        pad_y = max(8, int(round(bh * pad_ratio)))
        rx1 = max(0, int(round(x1)) - pad_x)
        ry1 = max(0, int(round(y1)) - pad_y)
        rx2 = min(int(img_w), int(round(x2)) + pad_x)
        ry2 = min(int(img_h), int(round(y2)) + pad_y)
        if rx2 <= rx1 + 2 or ry2 <= ry1 + 2:
            return None
        return rx1, ry1, rx2, ry2

    def restore_faces(self, bgr_img, bboxes):
        """Run GFPGAN on each padded face crop and blend the result back.

        Args:
            bgr_img: Input image, or ``None``.
            bboxes: Iterable of ``(x1, y1, x2, y2)`` face boxes in pixels.

        Returns:
            A new image with restored faces, or *bgr_img* itself when it is
            ``None``, *bboxes* is empty, the model cannot be loaded, or no
            face was restored.
        """
        import cv2
        import numpy as np

        if bgr_img is None:
            return bgr_img
        if not bboxes:
            return bgr_img
        try:
            self.load()
        except Exception as e:
            self.log(f" Hinweis: GFPGAN nicht bereit ({e}).")
            return bgr_img

        out = bgr_img.copy()
        h, w = out.shape[:2]
        alpha = max(0.0, min(1.0, float(self.blend_alpha)))
        restored_any = False
        for bbox in bboxes:
            rect = self._bbox_to_rect(bbox, w, h, pad_ratio=self.pad_ratio)
            if rect is None:
                continue
            x1, y1, x2, y2 = rect
            if (x2 - x1) < self.min_face_px or (y2 - y1) < self.min_face_px:
                continue
            crop = out[y1:y2, x1:x2]
            if crop.size == 0:
                continue
            try:
                _, _, restored = self.restorer.enhance(
                    crop,
                    has_aligned=False,
                    only_center_face=True,
                    paste_back=True,
                )
            except Exception:
                # A failure on one face must not prevent the remaining faces
                # from being processed.
                continue
            if restored is None:
                continue
            if restored.shape[:2] != crop.shape[:2]:
                restored = cv2.resize(restored, (crop.shape[1], crop.shape[0]), interpolation=cv2.INTER_AREA)
            mixed = cv2.addWeighted(restored.astype(np.float32), alpha, crop.astype(np.float32), 1.0 - alpha, 0.0)
            out[y1:y2, x1:x2] = np.clip(mixed, 0, 255).astype(np.uint8)
            restored_any = True
        if restored_any:
            return out
        return bgr_img
class FaceLibrary:
    """On-disk collection of named source faces.

    Each entry is a sub-folder of ``ROOT`` named after its slug and holds
    ``source.jpg`` (the full image), ``thumb.png`` (a 96x96 face crop) and
    ``meta.json`` (slug, display name, date added).
    """

    ROOT = SCRIPT_DIR / "face_library"

    def __init__(self, log_fn=print):
        """Create the library folder if necessary.

        Args:
            log_fn: Callable receiving warning messages.
        """
        self.log = log_fn
        self.ROOT.mkdir(exist_ok=True)
        self._detector = None

    @staticmethod
    def _slugify(name):
        """Turn *name* into a lowercase ASCII slug of ``[a-z0-9_]``.

        Empty input, or input that ends up empty, yields ``"gesicht"``.
        """
        import re
        import unicodedata

        raw = (name or "").strip().lower()
        if not raw:
            raw = "gesicht"
        # Decompose accented characters so their ASCII base letter survives.
        raw = unicodedata.normalize("NFKD", raw)
        raw = raw.encode("ascii", "ignore").decode("ascii")
        raw = raw.replace(" ", "_")
        raw = re.sub(r"[^a-z0-9_]+", "", raw)
        raw = re.sub(r"_+", "_", raw).strip("_")
        return raw or "gesicht"

    def _ensure_detector(self):
        """Return the cached Haar cascade face detector, loading it on first use.

        Raises:
            RuntimeError: If the cascade file cannot be loaded.
        """
        if self._detector is not None:
            return self._detector
        import cv2

        cascade_path = Path(cv2.data.haarcascades) / "haarcascade_frontalface_default.xml"
        detector = cv2.CascadeClassifier(str(cascade_path))
        if detector.empty():
            raise RuntimeError("Gesichtsdetektor konnte nicht geladen werden.")
        self._detector = detector
        return detector

    def _detect_largest_face(self, bgr_img):
        """Return ``(x, y, w, h)`` of the largest detected face, or ``None``."""
        import cv2

        det = self._ensure_detector()
        gray = cv2.cvtColor(bgr_img, cv2.COLOR_BGR2GRAY)
        faces = det.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=4, minSize=(30, 30))
        if len(faces) == 0:
            return None
        x, y, w, h = max(faces, key=lambda f: int(f[2]) * int(f[3]))
        return int(x), int(y), int(w), int(h)

    def add(self, name, image_path):
        """Add the image at *image_path* to the library under *name*.

        The largest face is detected on a copy scaled down to at most
        1920 px on its longer side, mapped back to full resolution and
        padded by 20 % to produce the thumbnail. If the slug is already
        taken, ``_2``, ``_3``, ... is appended.

        Args:
            name: Display name of the entry.
            image_path: Path of the image to import.

        Returns:
            Dict with ``slug``, ``name``, ``source_path``, ``thumb_path``
            and ``added``.

        Raises:
            RuntimeError: If the file is missing or unreadable, no face is
                found, or an entry file cannot be written. A partially
                written entry folder is removed before the error propagates.
        """
        import cv2
        import datetime

        image_path = Path(image_path)
        if not image_path.is_file():
            raise RuntimeError(f"Datei nicht gefunden: {image_path}")
        src_img = _cv2_imread_unicode(image_path)
        if src_img is None:
            raise RuntimeError("Bild konnte nicht geladen werden.")

        h0, w0 = src_img.shape[:2]
        max_side = max(h0, w0)
        scale = min(1.0, 1920.0 / max(1.0, float(max_side)))
        if scale < 1.0:
            work = cv2.resize(src_img, (int(round(w0 * scale)), int(round(h0 * scale))), interpolation=cv2.INTER_AREA)
        else:
            work = src_img

        face = self._detect_largest_face(work)
        if face is None:
            raise RuntimeError("Kein Gesicht im Bild erkannt.")
        # Map the detection from the working copy back to full resolution.
        inv = 1.0 / max(scale, 1e-9)
        fx, fy, fw, fh = (int(round(v * inv)) for v in face)
        pad_x = int(round(fw * 0.20))
        pad_y = int(round(fh * 0.20))
        x1 = max(0, fx - pad_x)
        y1 = max(0, fy - pad_y)
        x2 = min(w0, fx + fw + pad_x)
        y2 = min(h0, fy + fh + pad_y)
        crop = src_img[y1:y2, x1:x2]
        if crop.size == 0:
            raise RuntimeError("Kein gueltiger Gesichtsausschnitt gefunden.")

        base_slug = self._slugify(name)
        slug = base_slug
        idx = 2
        while (self.ROOT / slug).exists():
            slug = f"{base_slug}_{idx}"
            idx += 1

        face_dir = self.ROOT / slug
        face_dir.mkdir(parents=True, exist_ok=True)
        source_path = face_dir / "source.jpg"
        thumb_path = face_dir / "thumb.png"
        meta_path = face_dir / "meta.json"
        try:
            if not _cv2_imwrite_unicode(source_path, src_img):
                raise RuntimeError("source.jpg konnte nicht gespeichert werden.")
            thumb = cv2.resize(crop, (96, 96), interpolation=cv2.INTER_AREA)
            if not _cv2_imwrite_unicode(thumb_path, thumb):
                raise RuntimeError("thumb.png konnte nicht gespeichert werden.")
            meta = {
                "slug": slug,
                "name": (name or "").strip() or slug,
                "added": datetime.date.today().isoformat(),
            }
            meta_path.write_text(json.dumps(meta, indent=2, ensure_ascii=False), encoding="utf-8")
        except Exception:
            # face_dir was created above for this entry only. Remove it so a
            # failed import leaves no half-written entry that blocks the slug.
            shutil.rmtree(face_dir, ignore_errors=True)
            raise

        return {
            "slug": slug,
            "name": meta["name"],
            "source_path": str(source_path),
            "thumb_path": str(thumb_path),
            "added": meta["added"],
        }

    def remove(self, slug):
        """Delete the entry folder for *slug* if it exists; errors are ignored."""
        slug = self._slugify(slug)
        target = self.ROOT / slug
        if target.exists():
            shutil.rmtree(target, ignore_errors=True)

    def list_entries(self):
        """Return all entries that have a ``source.jpg``, sorted by name.

        Folders without ``source.jpg`` are skipped with a warning. Missing
        or unreadable ``meta.json`` files fall back to the folder name and
        an empty date.

        Returns:
            List of dicts with ``slug``, ``name``, ``source_path``,
            ``thumb_path`` and ``added``.
        """
        entries = []
        if not self.ROOT.exists():
            return entries
        for d in self.ROOT.iterdir():
            if not d.is_dir():
                continue
            source_path = d / "source.jpg"
            meta_path = d / "meta.json"
            thumb_path = d / "thumb.png"
            if not source_path.exists():
                self.log(f"WARNUNG Bibliothekseintrag ohne source.jpg uebersprungen: {d.name}")
                continue
            name = d.name
            added = ""
            if meta_path.exists():
                try:
                    meta = json.loads(meta_path.read_text(encoding="utf-8"))
                    name = (meta.get("name") or name).strip() or name
                    added = (meta.get("added") or "").strip()
                except Exception:
                    pass
            entries.append({
                "slug": d.name,
                "name": name,
                "source_path": str(source_path),
                "thumb_path": str(thumb_path),
                "added": added,
            })
        entries.sort(key=lambda x: x["name"].lower())
        return entries

    def get_source_path(self, slug):
        """Return the path of the entry's ``source.jpg``.

        Raises:
            RuntimeError: If the entry has no ``source.jpg``.
        """
        p = self.ROOT / self._slugify(slug) / "source.jpg"
        if not p.exists():
            raise RuntimeError(f"Eintrag nicht gefunden: {slug}")
        return p
class FaceSwapper:
def __init__(self, log_fn=print):
self.log = log_fn
self.app = self.swapper = None
self.restorer = None
self.use_restoration = False
self._restoration_warned_cpu = False
self.enhance = True
self.color = True
self.high_sensitivity = True
self.det_size = (640, 640)
self.det_thresh = 0.5
# Video-Stabilitaet: nur konsistente Face-Tracks werden geswappt.
self.video_track_single_face = True
self.video_min_det_score = 0.30
self.video_start_det_score = 0.30
self.video_new_face_det_score = 0.55
self.video_min_face_size_px = 32
self.video_min_iou = 0.02
self.video_max_center_jump = 0.28
self.video_min_area_ratio = 0.20
self.video_max_area_ratio = 4.00
self.video_track_memory = 10
self.video_min_embed_sim = 0.05
self.video_track_accept_score = 0.15 # was 0.30 — too strict, rejected valid tracked faces
self.video_fade_in_step = 1.0
self.video_fade_out_step = 0.0
self.video_occlusion_hold = 0
self.video_occlusion_mouth_ratio_min = 0.30 # was 0.58 — too aggressive, falsely blocked swap
self.video_occlusion_texture_drop = 0.30 # was 0.55 — too sensitive to normal lighting change
self.video_occluder_diff_thresh = 24
self.video_occluder_min_coverage = 0.08
self.video_occluder_max_coverage = 0.46
self.video_clean_ref_max_coverage = 0.35 # was 0.05 — too strict, clean_original never saved
self.video_abs_mouth_width_min = 0.10 # was 0.45 — way too high, flagged almost all faces
self.video_abs_mouth_drop_min = 0.10 # was 0.32 — too high, caused false occlusion
def init_models(self):
    """Load the ``buffalo_l`` face analyser and the inswapper model.

    Detection is prepared at 1024x1024 on GPU or 768x768 on CPU with a
    threshold of 0.25. If that preparation raises, a fresh analyser is
    prepared at 640x640 instead.

    Raises:
        RuntimeError: If ``inswapper_128.onnx`` is missing from ``MODELS_DIR``.
    """
    providers = _get_providers(self.log)
    from insightface.app import FaceAnalysis
    import insightface.model_zoo as mz

    gpu = "CUDAExecutionProvider" in providers
    ctx_id = 0 if gpu else -1
    self.log(f" {'GPU (CUDA) ' if gpu else 'CPU'}")
    self.log(" Lade buffalo_l ...")

    def new_analyser():
        return FaceAnalysis(name="buffalo_l", root=str(MODELS_DIR.parent), providers=providers)

    wanted_size = (1024, 1024) if gpu else (768, 768)
    self.det_thresh = 0.25
    self.app = new_analyser()
    try:
        self.app.prepare(ctx_id=ctx_id, det_size=wanted_size, det_thresh=self.det_thresh)
    except Exception as e:
        fallback_size = (640, 640)
        self.log(f" Hohe Detektions-Aufloesung fehlgeschlagen ({e}). Fallback auf {fallback_size[0]}x{fallback_size[1]}.")
        self.app = new_analyser()
        self.app.prepare(ctx_id=ctx_id, det_size=fallback_size, det_thresh=self.det_thresh)
        self.det_size = fallback_size
    else:
        self.det_size = wanted_size
    self.log(f" Detektion: {self.det_size[0]}x{self.det_size[1]}, Schwelle {self.det_thresh:.2f}")

    swap_path = MODELS_DIR / "inswapper_128.onnx"
    if not swap_path.exists():
        raise RuntimeError(f"inswapper_128.onnx fehlt in {MODELS_DIR}")
    self.log(" Lade inswapper ...")
    self.swapper = mz.get_model(str(swap_path), providers=providers)
    self.log(" Modelle geladen.")
@staticmethod
def _face_area(face):
x1, y1, x2, y2 = [float(v) for v in face.bbox]
return max(0.0, x2 - x1) * max(0.0, y2 - y1)
def _pick_primary_face(self, faces):
if not faces:
return None
return max(faces, key=lambda f: (self._face_area(f), float(getattr(f, "det_score", 0.0))))
@staticmethod
def _norm_face_metrics(face, w, h):
x1, y1, x2, y2 = [float(v) for v in face.bbox]
bw = max(1.0, x2 - x1)
bh = max(1.0, y2 - y1)
nx1 = max(0.0, min(1.0, x1 / max(1.0, float(w))))
ny1 = max(0.0, min(1.0, y1 / max(1.0, float(h))))
nx2 = max(0.0, min(1.0, x2 / max(1.0, float(w))))
ny2 = max(0.0, min(1.0, y2 / max(1.0, float(h))))
cx = (nx1 + nx2) * 0.5
cy = (ny1 + ny2) * 0.5
area = max(1e-6, (nx2 - nx1) * (ny2 - ny1))
return {
"bbox": (nx1, ny1, nx2, ny2),
"cx": cx,
"cy": cy,
"area": area,
"px_w": bw,
"px_h": bh,
"score": float(getattr(face, "det_score", 0.0)),
}
@staticmethod
def _extract_embedding(face):
import numpy as np
emb = getattr(face, "normed_embedding", None)
if emb is None:
emb = getattr(face, "embedding", None)
if emb is None:
return None
arr = np.asarray(emb, dtype=np.float32).reshape(-1)
if arr.size == 0:
return None
norm = float(np.linalg.norm(arr))
if norm < 1e-8:
return None
return arr / norm
@staticmethod
def _extract_kps(face, w, h):
import numpy as np
kps = getattr(face, "kps", None)
if kps is None:
return None
arr = np.asarray(kps, dtype=np.float32)
if arr.ndim != 2 or arr.shape[1] != 2 or arr.shape[0] < 3:
return None
arr = arr[:5, :].copy()
arr[:, 0] = np.clip(arr[:, 0] / max(1.0, float(w)), 0.0, 1.0)
arr[:, 1] = np.clip(arr[:, 1] / max(1.0, float(h)), 0.0, 1.0)
return arr
@staticmethod
def _embedding_similarity(a, b):
import numpy as np
if a is None or b is None:
return None
return float(np.clip(np.dot(a, b), -1.0, 1.0))
@staticmethod
def _kps_similarity(a, b):
import numpy as np
if a is None or b is None:
return None
n = min(int(a.shape[0]), int(b.shape[0]))
if n < 3:
return None
dist = float(np.linalg.norm(a[:n] - b[:n], axis=1).mean())
# 0.0 Distanz => 1.0 Similarity; >0.20 gilt als deutlich instabil.
return max(0.0, min(1.0, 1.0 - dist / 0.20))
def _build_video_face_entry(self, face, w, h):
m = self._norm_face_metrics(face, w, h)
return {
"face": face,
"bbox": m["bbox"],
"cx": m["cx"],
"cy": m["cy"],
"area": m["area"],
"px_w": m["px_w"],
"px_h": m["px_h"],
"det_score": m["score"],
"embedding": self._extract_embedding(face),
"kps": self._extract_kps(face, w, h),
}
@staticmethod
def _trim_track_entry(entry):
return {
"bbox": entry["bbox"],
"cx": entry["cx"],
"cy": entry["cy"],
"area": entry["area"],
"det_score": entry["det_score"],
"embedding": entry["embedding"],
"kps": entry["kps"],
}
@staticmethod
def _kps_geometry(kps):
import numpy as np
if kps is None:
return None
arr = np.asarray(kps, dtype=np.float32)
if arr.ndim != 2 or arr.shape[0] < 5 or arr.shape[1] != 2:
return None
eye_a, eye_b, nose, mouth_a, mouth_b = arr[:5]
eye_dist = float(np.linalg.norm(eye_a - eye_b))
if eye_dist < 1e-6:
return None
eye_mid = (eye_a + eye_b) * 0.5
mouth_mid = (mouth_a + mouth_b) * 0.5
return {
"eye_dist": eye_dist,
"mouth_width": float(np.linalg.norm(mouth_a - mouth_b)) / eye_dist,
"nose_drop": float(np.linalg.norm(nose - eye_mid)) / eye_dist,
"mouth_drop": float(np.linalg.norm(mouth_mid - nose)) / eye_dist,
"mouth_offset_y": float((mouth_mid[1] - eye_mid[1]) / eye_dist),
}
@staticmethod
def _bbox_to_pixel_rect(bbox, w, h):
x1 = max(0, min(w - 1, int(round(float(bbox[0]) * w))))
y1 = max(0, min(h - 1, int(round(float(bbox[1]) * h))))
x2 = max(0, min(w, int(round(float(bbox[2]) * w))))
y2 = max(0, min(h, int(round(float(bbox[3]) * h))))
if x2 <= x1 + 2 or y2 <= y1 + 2:
return None
return x1, y1, x2, y2
@staticmethod
def _face_texture_ratio(gray, bbox):
    """Compare texture in the lower and upper part of a face box.

    Texture is measured as the variance of the Laplacian. The upper band
    covers the top 42 % of the box, the lower band starts at 50 %.

    Args:
        gray: Single-channel image.
        bbox: Normalised ``(x1, y1, x2, y2)`` box.

    Returns:
        ``lower / upper`` variance clamped to ``[0, 4]``, or ``None`` if the
        box is unusable, lower than 16 px, or the upper band has (almost)
        no texture.
    """
    import cv2

    img_h, img_w = gray.shape[:2]
    rect = FaceSwapper._bbox_to_pixel_rect(bbox, img_w, img_h)
    if rect is None:
        return None
    left, top, right, bottom = rect

    face_h = bottom - top
    if face_h < 16:
        return None
    upper_stop = top + int(face_h * 0.42)
    lower_begin = top + int(face_h * 0.50)
    if upper_stop <= top + 4 or lower_begin >= bottom - 4:
        return None

    upper_band = gray[top:upper_stop, left:right]
    lower_band = gray[lower_begin:bottom, left:right]
    if upper_band.size == 0 or lower_band.size == 0:
        return None

    def texture(patch):
        return float(cv2.Laplacian(patch, cv2.CV_32F).var())

    upper_var = texture(upper_band)
    lower_var = texture(lower_band)
    if upper_var < 1e-6:
        return None
    return max(0.0, min(4.0, lower_var / upper_var))
@staticmethod
def _blend_frames(swapped, original, alpha):
    """Cross-fade between *original* (alpha 0) and *swapped* (alpha 1).

    Within 1e-3 of either end the corresponding input is returned as is,
    without blending.
    """
    import cv2

    tolerance = 1e-3
    if alpha <= tolerance:
        return original
    if alpha >= 1.0 - tolerance:
        return swapped
    swapped_weight = float(alpha)
    original_weight = float(1.0 - alpha)
    return cv2.addWeighted(swapped, swapped_weight, original, original_weight, 0.0)
@staticmethod
def _keep_border_components(raw_mask, min_area_ratio=0.015, lateral_only=False):
    """Keep only the connected mask regions that reach the mask border.

    Args:
        raw_mask: Mask whose non-zero pixels are foreground.
        min_area_ratio: Minimum component area as a fraction of the mask
            area (never fewer than 8 pixels).
        lateral_only: If true, only the left and right borders count.

    Returns:
        A uint8 mask with kept components set to 255, or *raw_mask* itself
        when it is ``None`` or empty.
    """
    import cv2, numpy as np

    if raw_mask is None or raw_mask.size == 0:
        return raw_mask

    height, width = raw_mask.shape[:2]
    binary = (raw_mask > 0).astype(np.uint8)
    component_count, labels, stats, _ = cv2.connectedComponentsWithStats(binary, 8)

    result = np.zeros((height, width), dtype=np.uint8)
    area_floor = max(8, int(height * width * min_area_ratio))
    margin = max(3, int(min(height, width) * 0.06))

    # Label 0 is the background, so start at 1.
    for idx in range(1, component_count):
        cx, cy, cw, ch, area = stats[idx]
        if area < area_floor:
            continue
        near_side = cx <= margin or cx + cw >= width - margin
        near_top_or_bottom = cy <= margin or cy + ch >= height - margin
        if near_side or (not lateral_only and near_top_or_bottom):
            result[labels == idx] = 255
    return result
@staticmethod
def _apply_float_mask(foreground, background, mask):
import numpy as np
if mask is None:
return foreground
m = np.clip(mask.astype(np.float32), 0.0, 1.0)
if m.ndim == 2:
m = m[:, :, None]
out = foreground.astype(np.float32) * (1.0 - m) + background.astype(np.float32) * m
return np.clip(out, 0, 255).astype(np.uint8)
@staticmethod
def _soft_bbox_mask(shape, bbox):
    """Build a float mask that is 1 inside the face box with blurred edges.

    Args:
        shape: Image shape; only height and width are used.
        bbox: Normalised ``(x1, y1, x2, y2)`` box.

    Returns:
        A float32 ``(H, W)`` mask in ``[0, 1]``; all zeros if the box is
        unusable.
    """
    import cv2, numpy as np

    height, width = shape[:2]
    canvas = np.zeros((height, width), dtype=np.float32)
    rect = FaceSwapper._bbox_to_pixel_rect(bbox, width, height)
    if rect is None:
        return canvas

    left, top, right, bottom = rect
    box_w = right - left
    box_h = bottom - top
    # Shrink the solid core slightly so the blur falls off inside the box.
    inset_x = max(2, int(box_w * 0.05))
    inset_y = max(2, int(box_h * 0.06))
    canvas[top + inset_y:bottom - inset_y, left + inset_x:right - inset_x] = 1.0

    sigma = max(3.0, min(box_w, box_h) * 0.045)
    canvas = cv2.GaussianBlur(canvas, (0, 0), sigma)
    return np.clip(canvas, 0.0, 1.0)
def _occluder_mask_from_reference(self, original, bbox, track_state):
    """Estimate an occluder mask by differencing against a clean reference.

    The face region of *original* is compared with the same region of
    ``track_state["clean_original"]``. Differences above
    ``video_occluder_diff_thresh`` are cleaned up morphologically and only
    components touching the left or right edge of the region are kept.

    Args:
        original: Current frame (BGR).
        bbox: Normalised face box, or ``None``.
        track_state: Tracking dict, or ``None``.

    Returns:
        ``(mask, coverage)``. *mask* is a float32 frame-sized array in
        ``[0, 1]``, or ``None`` when no reference or usable box is
        available or *coverage* is below ``video_occluder_min_coverage``.
        *coverage* is the fraction of the face region that was flagged
        (0.0 when nothing could be computed).
    """
    import cv2, numpy as np

    nothing = (None, 0.0)
    if track_state is None or bbox is None:
        return nothing
    reference = track_state.get("clean_original")
    if reference is None:
        return nothing
    if getattr(reference, "shape", None) != original.shape:
        reference = cv2.resize(reference, (original.shape[1], original.shape[0]), interpolation=cv2.INTER_AREA)

    frame_h, frame_w = original.shape[:2]
    rect = self._bbox_to_pixel_rect(bbox, frame_w, frame_h)
    if rect is None:
        return nothing
    left, top, right, bottom = rect

    current_roi = original[top:bottom, left:right]
    reference_roi = reference[top:bottom, left:right]
    if current_roi.size == 0 or reference_roi.size == 0:
        return nothing

    delta = cv2.cvtColor(cv2.absdiff(current_roi, reference_roi), cv2.COLOR_BGR2GRAY)
    _, binary = cv2.threshold(delta, self.video_occluder_diff_thresh, 255, cv2.THRESH_BINARY)

    # Odd kernel size proportional to the smaller side of the region.
    kernel_size = max(3, int(min(right - left, bottom - top) * 0.045) | 1)
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kernel_size, kernel_size))
    binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel, iterations=2)
    binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel, iterations=1)
    binary = self._keep_border_components(binary, min_area_ratio=0.012, lateral_only=True)

    coverage = float((binary > 0).mean())
    if coverage < self.video_occluder_min_coverage:
        return None, coverage

    feathered = cv2.GaussianBlur(binary.astype(np.float32) / 255.0, (0, 0), max(2.0, kernel_size * 0.6))
    full_mask = np.zeros((frame_h, frame_w), dtype=np.float32)
    full_mask[top:bottom, left:right] = np.clip(feathered, 0.0, 1.0)
    return full_mask, coverage
def _carry_forward_swap(self, original, track_state):
    """Re-use the previous swapped result when no face is accepted this frame.

    The face area of ``track_state["last_result"]`` (located by
    ``track_state["last_bbox"]``) is pasted onto ``original`` through a soft
    box mask. Areas flagged by the reference-based or the current-frame
    occluder mask are excluded from the paste.

    Returns ``(frame, 1)`` when a previous result was pasted, otherwise
    ``(original, 0)``.
    """
    import cv2, numpy as np
    if track_state is None:
        return original, 0
    previous = track_state.get("last_result")
    bbox = track_state.get("last_bbox")
    if previous is None or bbox is None:
        return original, 0
    if getattr(previous, "shape", None) != original.shape:
        target_size = (original.shape[1], original.shape[0])
        previous = cv2.resize(previous, target_size, interpolation=cv2.INTER_AREA)
    paste_mask = self._soft_bbox_mask(original.shape, bbox)
    ref_occ, _ = self._occluder_mask_from_reference(original, bbox, track_state)
    live_occ, _ = self._current_occluder_mask(original, bbox)
    # Union of both occluder estimates (element-wise maximum).
    if live_occ is None:
        combined = ref_occ
    elif ref_occ is None:
        combined = live_occ
    else:
        combined = np.maximum(ref_occ, live_occ)
    if combined is not None:
        paste_mask = paste_mask * (1.0 - combined)
    return self._apply_float_mask(original, previous, paste_mask), 1
def _current_occluder_mask(self, original, bbox):
    """Estimate an occluder mask from the current frame alone.

    Inside the bounding box, pixels are flagged when they fall into the
    YCrCb range used below as a skin-tone test or when their gray value is
    at most 92. Only rows between 10 % and 94 % of the box height are
    considered; after morphological clean-up only components touching the
    left/right edge of the box are kept.

    Returns ``(mask, coverage)``. ``mask`` is a full-frame float32 array in
    ``[0, 1]``, or ``None`` when the box is unusable or the flagged share is
    below ``self.video_occluder_min_coverage``.
    """
    import cv2, numpy as np
    frame_h, frame_w = original.shape[:2]
    rect = self._bbox_to_pixel_rect(bbox, frame_w, frame_h)
    if rect is None:
        return None, 0.0
    left, top, right, bottom = rect
    patch = original[top:bottom, left:right]
    if patch.size == 0:
        return None, 0.0
    luma = cv2.cvtColor(patch, cv2.COLOR_BGR2GRAY)
    chroma = cv2.cvtColor(patch, cv2.COLOR_BGR2YCrCb)
    skin_like = cv2.inRange(chroma, (0, 133, 77), (255, 184, 138))
    dark = cv2.inRange(luma, 0, 92)
    binary = cv2.bitwise_or(skin_like, dark)
    patch_h, patch_w = luma.shape[:2]
    # Row band that limits where occluders may be reported.
    band = np.zeros((patch_h, patch_w), dtype=np.uint8)
    band[int(patch_h * 0.10):int(patch_h * 0.94), :] = 255
    binary = cv2.bitwise_and(binary, band)
    ksize = max(3, int(min(patch_w, patch_h) * 0.045) | 1)
    element = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (ksize, ksize))
    for morph_op in (cv2.MORPH_CLOSE, cv2.MORPH_OPEN):
        binary = cv2.morphologyEx(binary, morph_op, element, iterations=1)
    binary = self._keep_border_components(binary, min_area_ratio=0.010, lateral_only=True)
    share = float((binary > 0).mean())
    if share < self.video_occluder_min_coverage:
        return None, share
    feathered = cv2.GaussianBlur(binary.astype(np.float32) / 255.0, (0, 0), max(2.0, ksize * 0.55))
    full_mask = np.zeros((frame_h, frame_w), dtype=np.float32)
    full_mask[top:bottom, left:right] = np.clip(feathered, 0.0, 1.0)
    return full_mask, share
def _smooth_skin_occluder_mask(self, original, bbox):
    """Estimate an occluder mask from smooth, skin-toned areas in the box.

    A pixel is flagged when it falls into the YCrCb range used below as a
    skin-tone test and its absolute Laplacian response is below 8.0 (low
    texture). Only a central window of the box is considered (22-90 % of
    its height, 12-94 % of its width); after morphological clean-up only
    components touching the left/right edge are kept.

    Returns ``(mask, coverage)``. ``mask`` is ``None`` when the box is
    unusable or when the flagged share lies outside ``[0.12, 0.42]``.
    """
    import cv2, numpy as np
    frame_h, frame_w = original.shape[:2]
    rect = self._bbox_to_pixel_rect(bbox, frame_w, frame_h)
    if rect is None:
        return None, 0.0
    left, top, right, bottom = rect
    patch = original[top:bottom, left:right]
    if patch.size == 0:
        return None, 0.0
    luma = cv2.cvtColor(patch, cv2.COLOR_BGR2GRAY)
    chroma = cv2.cvtColor(patch, cv2.COLOR_BGR2YCrCb)
    skin_like = cv2.inRange(chroma, (0, 133, 77), (255, 180, 135))
    detail = cv2.Laplacian(luma, cv2.CV_32F)
    flat = (np.abs(detail) < 8.0).astype(np.uint8) * 255
    binary = cv2.bitwise_and(skin_like, flat)
    patch_h, patch_w = luma.shape[:2]
    window = np.zeros((patch_h, patch_w), dtype=np.uint8)
    window[int(patch_h * 0.22):int(patch_h * 0.90), int(patch_w * 0.12):int(patch_w * 0.94)] = 255
    binary = cv2.bitwise_and(binary, window)
    ksize = max(5, int(min(patch_w, patch_h) * 0.055) | 1)
    element = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (ksize, ksize))
    binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, element, iterations=2)
    binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, element, iterations=1)
    binary = self._keep_border_components(binary, min_area_ratio=0.025, lateral_only=True)
    share = float((binary > 0).mean())
    # Shares outside this band are rejected.
    if share < 0.12 or share > 0.42:
        return None, share
    feathered = cv2.GaussianBlur(binary.astype(np.float32) / 255.0, (0, 0), max(2.0, ksize * 0.5))
    full_mask = np.zeros((frame_h, frame_w), dtype=np.float32)
    full_mask[top:bottom, left:right] = np.clip(feathered, 0.0, 1.0)
    return full_mask, share
def _is_occluded_mouth(self, candidate, reference, gray, track_state):
    """Decide whether the candidate face's mouth area looks covered.

    Three checks can each flag an occlusion:

    * geometry relative to ``reference``: mouth width or mouth drop shrank
      below ``self.video_occlusion_mouth_ratio_min`` times the reference;
    * absolute geometry: mouth width/drop below the ``video_abs_*`` minima;
    * texture: the face texture ratio fell below
      ``track_state["texture_ref"] * self.video_occlusion_texture_drop``.

    When a texture ratio is available and nothing was flagged,
    ``track_state["texture_ref"]`` is initialised or updated as a 90/10
    running average. Returns ``False`` for a missing candidate.
    """
    if candidate is None:
        return False
    reference_kps = reference.get("kps") if reference else None
    ref_geo = self._kps_geometry(reference_kps)
    cur_geo = self._kps_geometry(candidate.get("kps"))
    occluded = False
    # Geometry check: mouth proportions should stay stable across frames.
    if ref_geo and cur_geo:
        base_width = max(1e-6, ref_geo["mouth_width"])
        base_drop = max(1e-6, ref_geo["mouth_drop"])
        width_ratio = cur_geo["mouth_width"] / base_width
        drop_ratio = cur_geo["mouth_drop"] / base_drop
        if (width_ratio < self.video_occlusion_mouth_ratio_min
                or drop_ratio < self.video_occlusion_mouth_ratio_min):
            occluded = True
    if cur_geo and (cur_geo["mouth_width"] < self.video_abs_mouth_width_min
                    or cur_geo["mouth_drop"] < self.video_abs_mouth_drop_min):
        occluded = True
    # Texture check: a drop against the running reference counts as occlusion.
    texture_ratio = self._face_texture_ratio(gray, candidate["bbox"])
    if texture_ratio is None:
        return occluded
    tex_ref = track_state.get("texture_ref")
    if tex_ref is not None and texture_ratio < tex_ref * self.video_occlusion_texture_drop:
        occluded = True
    if occluded:
        # Do not update the reference from a frame that was flagged.
        return occluded
    if tex_ref is None:
        track_state["texture_ref"] = texture_ratio
    else:
        track_state["texture_ref"] = 0.90 * float(tex_ref) + 0.10 * texture_ratio
    return occluded
@staticmethod
def _bbox_iou(a, b):
ax1, ay1, ax2, ay2 = a
bx1, by1, bx2, by2 = b
ix1 = max(ax1, bx1)
iy1 = max(ay1, by1)
ix2 = min(ax2, bx2)
iy2 = min(ay2, by2)
iw = max(0.0, ix2 - ix1)
ih = max(0.0, iy2 - iy1)
inter = iw * ih
if inter <= 0:
return 0.0
a_area = max(1e-9, (ax2 - ax1) * (ay2 - ay1))
b_area = max(1e-9, (bx2 - bx1) * (by2 - by1))
return inter / max(1e-9, a_area + b_area - inter)
def _filter_video_faces(self, faces, frame_shape, track_state):
    """Pick the one detection that starts or continues the tracked face.

    Detections are converted to entries via ``_build_video_face_entry`` and
    filtered by pixel size and detector score. Without an active track the
    largest entry whose score reaches ``self.video_start_det_score`` seeds
    the track. With an active track each entry is gated on area ratio,
    IoU/centre shift and embedding similarity, then scored; the best entry
    must reach ``self.video_track_accept_score``.

    Side effects on ``track_state``: ``"candidate"`` is reset and set to the
    chosen entry, ``"miss"`` is incremented on failure and zeroed on success,
    and after ``self.video_track_memory`` misses ``"active"``,
    ``"texture_ref"`` and ``"hold"`` are cleared.

    Returns a list holding the chosen face object, or an empty list.
    """
    h, w = frame_shape[:2]
    active = track_state.get("active")
    track_state["candidate"] = None
    candidates = []
    # When a face is already being tracked we accept lower-confidence detections.
    # A hand partially covering the tracked face drops the detector score, but
    # the face is still there and should be swapped.
    effective_min_score = (self.video_min_det_score * 0.5
                           if active is not None else self.video_min_det_score)
    for face in faces:
        entry = self._build_video_face_entry(face, w, h)
        if entry["px_w"] < self.video_min_face_size_px or entry["px_h"] < self.video_min_face_size_px:
            continue
        if entry["det_score"] < effective_min_score:
            continue
        candidates.append(entry)
    if not candidates:
        # Nothing usable this frame: count a miss and drop the track once the
        # miss budget is exhausted.
        track_state["miss"] = int(track_state.get("miss", 0)) + 1
        if track_state["miss"] >= self.video_track_memory:
            track_state["active"] = None
            track_state["texture_ref"] = None
            track_state["hold"] = 0
        return []
    # Largest face first; detector score breaks ties.
    candidates.sort(key=lambda c: (c["area"], c["det_score"]), reverse=True)
    if active is None:
        # No track yet: seed from the first (largest) sufficiently confident entry.
        seed = None
        for c in candidates:
            if c["det_score"] >= self.video_start_det_score:
                seed = c
                break
        if seed is None:
            track_state["miss"] = int(track_state.get("miss", 0)) + 1
            return []
        track_state["candidate"] = seed
        track_state["miss"] = 0
        return [seed["face"]]
    best = None
    best_score = -1.0
    # When a face is already tracked, allow a smaller detected area (hand can
    # shrink the visible face region by up to 70 % without losing the track).
    # NOTE(review): `active` is never None at this point (handled above), so
    # the else-branch of this expression is not reachable here.
    effective_min_area = (self.video_min_area_ratio * 0.30
                          if active is not None else self.video_min_area_ratio)
    for c in candidates:
        iou = self._bbox_iou(c["bbox"], active["bbox"])
        dx = c["cx"] - active["cx"]
        dy = c["cy"] - active["cy"]
        center_shift = (dx * dx + dy * dy) ** 0.5
        area_ratio = c["area"] / max(1e-9, active["area"])
        # Hard gates: size change, position continuity, identity.
        if not (effective_min_area <= area_ratio <= self.video_max_area_ratio):
            continue
        # Rejected only when BOTH overlap is too low and the centre jumped too far.
        if iou < self.video_min_iou and center_shift > self.video_max_center_jump:
            continue
        emb_sim = self._embedding_similarity(c["embedding"], active["embedding"])
        if emb_sim is not None and emb_sim < self.video_min_embed_sim:
            continue
        kps_sim = self._kps_similarity(c["kps"], active["kps"])
        # Soft score: every component is normalised to [0, 1] before weighting.
        iou_score = max(0.0, min(1.0, iou / 0.35))
        motion_score = max(0.0, min(1.0, 1.0 - center_shift / max(self.video_max_center_jump, 1e-6)))
        det_score = max(0.0, min(1.0, c["det_score"]))
        # Missing similarity values fall back to a neutral 0.5.
        emb_score = 0.5 if emb_sim is None else max(0.0, min(1.0, (emb_sim + 1.0) * 0.5))
        kps_score = 0.5 if kps_sim is None else kps_sim
        track_score = (0.30 * iou_score + 0.24 * motion_score + 0.16 * det_score
                       + 0.20 * emb_score + 0.10 * kps_score)
        # Without an embedding, weakly detected entries are penalised.
        if emb_sim is None and c["det_score"] < self.video_new_face_det_score:
            track_score *= 0.86
        if track_score > best_score:
            best_score = track_score
            best = c
    if best is None or best_score < self.video_track_accept_score:
        track_state["miss"] = int(track_state.get("miss", 0)) + 1
        if track_state["miss"] >= self.video_track_memory:
            track_state["active"] = None
            track_state["texture_ref"] = None
            track_state["hold"] = 0
        return []
    track_state["candidate"] = best
    track_state["miss"] = 0
    return [best["face"]]
def _detect_faces(self, frame):
    """Detect faces, retrying on an upscaled frame and with a lower threshold.

    Returns ``(faces, image, scale)`` where ``image`` is the frame the faces
    were detected on (the original or the upscaled copy) and ``scale`` is the
    factor between that image and ``frame``.

    The retries only run when ``self.high_sensitivity`` is set and the first
    pass found nothing: first an upscaled copy (longest side capped at
    1920 px), then a pass with the detector threshold temporarily lowered to
    0.10 on the original and, if still empty, on the upscaled copy.
    """
    import cv2
    faces = self.app.get(frame)
    if faces or not self.high_sensitivity:
        return faces, frame, 1.0
    h, w = frame.shape[:2]
    short_edge = min(h, w)
    # Smaller frames get a stronger upscale.
    if short_edge < 720: scale = 2.0
    elif short_edge < 1080: scale = 1.5
    else: scale = 1.25
    up_w = max(2, int(round(w * scale)))
    up_h = max(2, int(round(h * scale)))
    max_side = max(up_w, up_h)
    if max_side > 1920:
        # Cap the longest side at 1920 px; this can push `scale` to <= 1,
        # in which case no upscaled copy is created below.
        clamp = 1920.0 / max_side
        up_w = max(2, int(round(up_w * clamp)))
        up_h = max(2, int(round(up_h * clamp)))
        scale *= clamp
    upscaled = None
    if scale > 1.01:
        upscaled = cv2.resize(frame, (up_w, up_h), interpolation=cv2.INTER_CUBIC)
        faces = self.app.get(upscaled)
        if faces:
            return faces, upscaled, scale
    # Low-threshold fallback: faces partially covered by a hand have reduced
    # detector confidence. Drop the threshold temporarily to find them.
    det_model = getattr(self.app, 'det_model', None)
    if det_model is None:
        # Older insightface builds store the model in self.app.models dict
        for _m in getattr(self.app, 'models', {}).values():
            if hasattr(_m, 'det_thresh'):
                det_model = _m
                break
    if det_model is not None and hasattr(det_model, 'det_thresh'):
        orig_thresh = det_model.det_thresh
        try:
            det_model.det_thresh = 0.10  # much lower so partially-occluded faces are found
            faces = self.app.get(frame)
            if not faces and upscaled is not None:
                faces_up = self.app.get(upscaled)
                if faces_up:
                    return faces_up, upscaled, scale
        finally:
            det_model.det_thresh = orig_thresh  # always restore
        if faces:
            return faces, frame, 1.0
    # Nothing found: when an upscaled copy exists it is what gets returned,
    # so callers receive a scale != 1.0 together with an empty face list.
    if upscaled is not None:
        return [], upscaled, scale
    return [], frame, 1.0
def _swap_faces_in_frame(self, frame, src_face, track_state=None):
    """Swap the detected face(s) in one frame, optionally with tracking.

    Without ``track_state`` every detected face is swapped. With a
    ``track_state`` dict only the tracked face (chosen by
    ``_filter_video_faces``) is swapped, the result is faded in/out via
    ``track_state["alpha"]``, occluded areas are restored from the original
    frame, and the previous result is carried forward when the face is lost.

    Returns ``(result, face_count)``; ``result`` has the size of ``frame``.
    """
    import cv2, numpy as np
    def _note_drop(work_img):
        # Face lost this frame: reset the lock, fade the blend factor out and
        # fall back to the previously swapped result.
        if track_state is None:
            return work_img, 0
        track_state["locked_frames"] = 0
        track_state["candidate"] = None
        track_state["alpha"] = max(0.0, float(track_state.get("alpha", 1.0)) - self.video_fade_out_step)
        return self._carry_forward_swap(work_img, track_state)
    # `work_img` may be an upscaled copy of `frame`; `scale` relates the two.
    faces, work_img, scale = self._detect_faces(frame)
    if not faces:
        if track_state is not None:
            track_state["miss"] = int(track_state.get("miss", 0)) + 1
            if track_state["miss"] >= self.video_track_memory:
                track_state["active"] = None
                track_state["texture_ref"] = None
                track_state["hold"] = 0
            result, face_count = _note_drop(work_img)
            if scale != 1.0:
                result = cv2.resize(result, (frame.shape[1], frame.shape[0]), interpolation=cv2.INTER_AREA)
            return result, face_count
        return frame, 0
    if track_state is not None:
        # Reduce the detections to the tracked face (at most one).
        faces = self._filter_video_faces(faces, work_img.shape, track_state)
        if not faces:
            result, face_count = _note_drop(work_img)
            if scale != 1.0:
                result = cv2.resize(result, (frame.shape[1], frame.shape[0]), interpolation=cv2.INTER_AREA)
            return result, face_count
    candidate = track_state.get("candidate") if track_state is not None else None
    prev_active = track_state.get("active") if track_state is not None else None
    gray = cv2.cvtColor(work_img, cv2.COLOR_BGR2GRAY) if track_state is not None else None
    mouth_occluded = False
    if track_state is not None:
        # A remaining "hold" count keeps the occlusion flag set for this frame
        # and is decremented; a fresh detection re-arms it.
        hold = int(track_state.get("hold", 0))
        if hold > 0:
            track_state["hold"] = max(0, hold - 1)
            mouth_occluded = True
        if self._is_occluded_mouth(candidate, prev_active, gray, track_state):
            track_state["hold"] = self.video_occlusion_hold
            mouth_occluded = True
        if not mouth_occluded:
            track_state["hold"] = 0
        if candidate is not None:
            track_state["active"] = self._trim_track_entry(candidate)
        track_state["locked_frames"] = int(track_state.get("locked_frames", 0)) + 1
        # Fade the swap in step by step.
        alpha_prev = float(track_state.get("alpha", 0.0))
        track_state["alpha"] = min(1.0, alpha_prev + self.video_fade_in_step)
    original = work_img.copy()
    result = work_img.copy()
    swapped_bboxes = []
    for face in faces:
        result = self.swapper.get(result, face, src_face, paste_back=True)
        swapped_bboxes.append(face.bbox)
        if self.color:
            result = _match_face_color(result, original, face.bbox)
        if self.enhance:
            result = _enhance_face_region(result, face.bbox, sharpen=True)
    if self.use_restoration and self.restorer is not None:
        providers = _get_providers()
        # Warn once when restoration runs without the CUDA provider.
        if not self._restoration_warned_cpu and "CUDAExecutionProvider" not in providers:
            self.log("  Hinweis: GFPGAN im CPU-Modus ist langsam (ca. 1-3 s pro Frame moeglich).")
            self._restoration_warned_cpu = True
        result = self.restorer.restore_faces(result, swapped_bboxes)
    if track_state is not None:
        result = self._blend_frames(result, original, float(track_state.get("alpha", 1.0)))
        active = track_state.get("active")
        bbox = active.get("bbox") if active else (candidate.get("bbox") if candidate else None)
        occ_mask, coverage = self._occluder_mask_from_reference(original, bbox, track_state)
        # Only run current_occluder_mask when we have a clean reference — otherwise the face's
        # own skin colour triggers false occlusion detection on every frame.
        has_clean_ref = track_state.get("clean_original") is not None
        current_mask, current_coverage = (self._current_occluder_mask(original, bbox)
                                          if bbox is not None and has_clean_ref else (None, 0.0))
        if current_mask is not None:
            occ_mask = current_mask if occ_mask is None else np.maximum(occ_mask, current_mask)
            coverage = max(coverage, current_coverage)
        if mouth_occluded and bbox is not None and has_clean_ref:
            smooth_mask, smooth_coverage = self._smooth_skin_occluder_mask(original, bbox)
            if smooth_mask is not None:
                occ_mask = smooth_mask if occ_mask is None else np.maximum(occ_mask, smooth_mask)
                coverage = max(coverage, smooth_coverage)
        if occ_mask is not None:
            # Show the original pixels wherever an occluder was detected.
            result = self._apply_float_mask(result, original, occ_mask)
        if candidate is not None:
            track_state["last_bbox"] = candidate["bbox"]
        # Remember this frame for carry-forward on later drops.
        track_state["last_result"] = result.copy()
        track_state["last_original"] = original.copy()
        # Refresh the clean reference only from frames without notable occlusion.
        if bbox is not None and not mouth_occluded and coverage <= self.video_clean_ref_max_coverage:
            track_state["clean_original"] = original.copy()
    if scale != 1.0:
        # Detection ran on a rescaled copy; bring the result back to frame size.
        result = cv2.resize(result, (frame.shape[1], frame.shape[0]), interpolation=cv2.INTER_AREA)
    return result, len(faces)
def get_first_face(self, img):
    """Detect faces in ``img`` and return the one ``_pick_primary_face`` selects."""
    detected, _work_img, _scale = self._detect_faces(img)
    primary = self._pick_primary_face(detected)
    return primary
def swap_image(self, src_face, target, out):
    """Swap faces in the image file ``target`` and write the result to ``out``.

    Returns ``False`` when the image cannot be read or no face was swapped;
    otherwise returns whatever ``_cv2_imwrite_unicode`` reports for the write.
    """
    source_img = _cv2_imread_unicode(target)
    if source_img is None:
        return False
    swapped, n_faces = self._swap_faces_in_frame(source_img, src_face)
    if n_faces != 0:
        return _cv2_imwrite_unicode(out, swapped)
    return False
def swap_video(self, src_face, target_video, out_video, progress_cb=None, cancel_check=None):
    """Swap every detected face in a video and write the result.

    Frames are processed one by one without tracking or occlusion handling
    and written to a temporary ``mp4v`` file in ``SCRIPT_DIR``. Afterwards,
    if ``ffmpeg`` is on the PATH, the audio track of ``target_video`` is
    muxed in; otherwise (or if muxing fails) the silent video is moved to
    ``out_video``.

    Args:
        src_face: source face passed to the swapper.
        target_video: path of the input video.
        out_video: path of the output video.
        progress_cb: optional ``callable(done, total)`` invoked per frame.
        cancel_check: optional ``callable()``; a truthy result stops the
            frame loop (frames written so far are still finalised).

    Returns:
        dict with ``frames_processed``, ``frames_total``, ``frames_swapped``
        and ``faces_swapped``.

    Raises:
        RuntimeError: if the input video or the video writer cannot be opened.
    """
    import cv2, subprocess, shutil, tempfile
    import numpy as np
    cap, cap_tmp_copy = _open_videocapture_unicode(target_video, log_fn=self.log)
    if not cap.isOpened():
        # Release the capture and its temp copy here too, mirroring the
        # writer-failure path below.
        cap.release()
        _cleanup_temp_file(cap_tmp_copy)
        raise RuntimeError(f"Video konnte nicht geoeffnet werden: {target_video}")
    fps = cap.get(cv2.CAP_PROP_FPS) or 25
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fd_tmp, tmp_name = tempfile.mkstemp(prefix="faceswap_noaudio_", suffix=".mp4", dir=str(SCRIPT_DIR))
    os.close(fd_tmp)
    tmp_video = Path(tmp_name)
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(str(tmp_video), fourcc, fps, (width, height))
    if not writer.isOpened():
        cap.release()
        _cleanup_temp_file(cap_tmp_copy)
        _cleanup_temp_file(tmp_video)
        raise RuntimeError(f"Video-Writer konnte nicht geoeffnet werden: {out_video}")
    done = 0
    swapped_frames = 0
    swapped_faces = 0
    try:
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if cancel_check and cancel_check():
                    break
                # Detect faces directly — no tracking, no occlusion logic
                faces = self.app.get(frame)
                if done == 0:
                    # Log first-frame diagnostics once
                    src_emb = getattr(src_face, 'normed_embedding', None)
                    self.log(f"  [Info] Frame 1: {len(faces)} Gesicht(er) erkannt | "
                             f"src_embedding={'OK' if src_emb is not None else 'FEHLT'} | "
                             f"Frame {width}x{height}")
                result = frame.copy()
                for face in faces:
                    before = result.copy()
                    result = self.swapper.get(result, face, src_face, paste_back=True)
                    # Mean absolute pixel change; 0 means the swapper left the
                    # frame untouched, so post-processing is skipped.
                    diff = float(np.abs(result.astype(np.float32) - before.astype(np.float32)).mean())
                    if done == 0:
                        tgt_emb = getattr(face, 'normed_embedding', None)
                        self.log(f"  [Info] Swap-Differenz Frame 1: {diff:.4f} | "
                                 f"tgt_embedding={'OK' if tgt_emb is not None else 'FEHLT'}")
                    if diff > 0 and self.color:
                        result = _match_face_color(result, frame, face.bbox)
                    if diff > 0 and self.enhance:
                        result = _enhance_face_region(result, face.bbox, sharpen=True)
                    swapped_faces += 1
                writer.write(result)
                if len(faces) > 0:
                    swapped_frames += 1
                done += 1
                if progress_cb:
                    progress_cb(done, total)
        finally:
            # Release the handles first so the temp files can be deleted or
            # moved afterwards.
            cap.release()
            writer.release()
            _cleanup_temp_file(cap_tmp_copy)
    except BaseException:
        # The frame loop failed: do not leave the partial temp video behind.
        _cleanup_temp_file(tmp_video)
        raise
    ffmpeg = shutil.which("ffmpeg")
    if ffmpeg:
        try:
            # Copy the processed video stream and take the (optional) first
            # audio stream from the original file.
            cmd = [ffmpeg, "-y", "-i", str(tmp_video), "-i", str(target_video),
                   "-c:v", "copy", "-c:a", "aac", "-map", "0:v:0", "-map", "1:a:0?",
                   "-shortest", str(out_video)]
            subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            _cleanup_temp_file(tmp_video)
        except Exception as e:
            # Best effort: keep the silent video, but tell the user why the
            # audio is missing instead of failing silently.
            self.log(f"WARNUNG: Audio konnte nicht uebernommen werden ({e}) - speichere Video ohne Ton.")
            shutil.move(str(tmp_video), str(out_video))
    else:
        shutil.move(str(tmp_video), str(out_video))
    return {"frames_processed": done, "frames_total": total,
            "frames_swapped": swapped_frames, "faces_swapped": swapped_faces}
def swap_webcam(
    self,
    src_face,
    camera_index=0,
    record_path=None,
    fps_target=25.0,
    cancel_check=None,
    frame_cb=None,
    stats_cb=None,
    use_tracking=False,
):
    """Run a live face swap on a camera until cancelled or the stream ends.

    Args:
        src_face: source face passed to ``_swap_faces_in_frame``.
        camera_index: index handed to ``cv2.VideoCapture``.
        record_path: optional output file; when set, swapped frames are also
            written as an ``mp4v`` video.
        fps_target: pacing target; also the fallback frame rate for the
            recording when the camera reports none.
        cancel_check: optional ``callable()``; a truthy result stops the loop.
        frame_cb: optional ``callable(frame)`` receiving every swapped frame.
        stats_cb: optional ``callable(fps, face_count)`` called every 30 frames.
        use_tracking: when true a tracking state dict is passed to the swap.

    Raises:
        RuntimeError: if the camera cannot be opened.
    """
    import cv2
    import time
    import tempfile
    cap = cv2.VideoCapture(int(camera_index))
    if not cap.isOpened():
        raise RuntimeError(f"Kamera konnte nicht geoeffnet werden (Index {camera_index}).")
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 0)
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0)
    src_fps = cap.get(cv2.CAP_PROP_FPS) or float(fps_target or 25.0)
    writer = None
    record_tmp = None
    if record_path:
        out_path = Path(record_path)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(str(out_path), fourcc, src_fps, (width, height))
        if not writer.isOpened():
            # Fallback: record into a temp file inside SCRIPT_DIR and move it
            # to `record_path` when the session ends.
            try:
                fd_tmp, tmp_name = tempfile.mkstemp(prefix="faceswap_webcam_", suffix=".mp4", dir=str(SCRIPT_DIR))
                os.close(fd_tmp)
                record_tmp = Path(tmp_name)
                writer = cv2.VideoWriter(str(record_tmp), fourcc, src_fps, (width, height))
            except Exception:
                writer = None
            if writer is None or not writer.isOpened():
                # Recording is disabled; the live swap still runs.
                writer = None
                _cleanup_temp_file(record_tmp)
                record_tmp = None
                self.log(f"WARNUNG: Aufnahme konnte nicht gestartet werden: {record_path}")
            else:
                self.log("  Hinweis: Unicode-Ausgabe-Fallback aktiv (temp Datei).")
    frame_count = 0
    track_state = {} if use_tracking else None
    fps_smooth = 0.0
    try:
        while True:
            if cancel_check and cancel_check():
                break
            started = time.perf_counter()
            ret, frame = cap.read()
            if not ret:
                break
            result, face_count = self._swap_faces_in_frame(frame, src_face, track_state=track_state)
            if frame_cb:
                frame_cb(result)
            if writer is not None:
                writer.write(result)
            frame_count += 1
            # Processing time of this frame (measured before the pacing sleep below).
            elapsed = max(1e-6, time.perf_counter() - started)
            inst_fps = 1.0 / elapsed
            # Exponential moving average of the frame rate (90 % old, 10 % new).
            fps_smooth = inst_fps if fps_smooth <= 0 else (0.90 * fps_smooth + 0.10 * inst_fps)
            if stats_cb and frame_count % 30 == 0:
                stats_cb(float(fps_smooth), int(face_count))
            if fps_target and fps_target > 0:
                wait_s = (1.0 / float(fps_target)) - elapsed
                if wait_s > 0:
                    # Sleep off the remaining frame time, but at most 20 ms per frame.
                    time.sleep(min(wait_s, 0.02))
    finally:
        cap.release()
        if writer is not None:
            writer.release()
        if record_tmp is not None and record_path:
            # Deliver the temp recording to the requested path.
            try:
                shutil.move(str(record_tmp), str(record_path))
            except Exception as e:
                self.log(f"WARNUNG: Temp-Aufnahme konnte nicht verschoben werden ({e})")
            finally:
                _cleanup_temp_file(record_tmp)
class VoiceCloner:
    """Voice cloning on top of Coqui TTS.

    Two modes are offered: text-to-speech in a reference voice (XTTS v2) and
    audio-to-audio voice conversion (FreeVC). Models are loaded lazily on
    first use and placed on CUDA when ``torch`` reports it as available.
    """

    XTTS_MODEL = "tts_models/multilingual/multi-dataset/xtts_v2"
    VC_MODEL = "voice_conversion_models/multilingual/vctk/freevc24"
    SUPPORTED_LANGS = (
        "en", "es", "fr", "de", "it", "pt", "pl", "tr", "ru",
        "nl", "cs", "ar", "zh-cn", "ja", "hu", "ko", "hi"
    )
    SUPPORTED_AUDIO = {".wav", ".mp3", ".m4a", ".flac", ".ogg", ".aac", ".wma"}
    # Segment length in seconds for long inputs (lower it, e.g. to 20, if
    # memory runs out during conversion).
    CHUNK_SEC = 30

    def __init__(self, log_fn=print):
        """Store the log callback; no model is loaded yet."""
        self.log = log_fn
        self.device = "cpu"
        self.tts = None
        self.vc = None

    def _load_runtime(self):
        """Import torch/TTS, pick the device and return the ``TTS`` class.

        Raises:
            RuntimeError: if the voice-cloning packages cannot be imported.
        """
        try:
            import torch
            from TTS.api import TTS
        except Exception as e:
            raise RuntimeError(
                "Voice-Cloning ist nicht installiert.\n\n"
                "Bitte installiere zuerst:\n"
                "  python -m pip install coqui-tts torch torchaudio\n\n"
                f"Details: {e}"
            ) from e
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        return TTS

    def _ensure_xtts(self):
        """Load the XTTS model once."""
        if self.tts is not None:
            return
        TTS = self._load_runtime()
        self.log(f"VOICE: Lade XTTS v2 ({self.device}) ...")
        self.tts = TTS(self.XTTS_MODEL).to(self.device)
        self.log("VOICE: XTTS bereit.")

    def _ensure_vc(self):
        """Load the FreeVC voice-conversion model once."""
        if self.vc is not None:
            return
        TTS = self._load_runtime()
        self.log(f"VOICE: Lade FreeVC ({self.device}) ...")
        self.vc = TTS(self.VC_MODEL).to(self.device)
        self.log("VOICE: FreeVC bereit.")

    @staticmethod
    def _check_audio(path, label):
        """Validate that ``path`` is an existing file with a supported suffix.

        Returns the path as a string.

        Raises:
            RuntimeError: if the file is missing or its suffix is not in
                ``SUPPORTED_AUDIO``. ``label`` is used in the message.
        """
        p = Path(path)
        if not p.is_file():
            raise RuntimeError(f"{label} nicht gefunden:\n{path}")
        if p.suffix.lower() not in VoiceCloner.SUPPORTED_AUDIO:
            raise RuntimeError(
                f"{label} hat ein nicht unterstuetztes Format: {p.suffix}\n"
                f"Erlaubt: {', '.join(sorted(VoiceCloner.SUPPORTED_AUDIO))}"
            )
        return str(p)

    @staticmethod
    def _resample_linear(samples, src_sr, dst_sr):
        """Resample ``samples`` from ``src_sr`` to ``dst_sr`` by linear interpolation.

        Handles 1-D (mono) and 2-D ``(frames, channels)`` arrays. Equal rates
        or empty input return ``samples`` unchanged. No anti-aliasing filter
        is applied; this only keeps merged segments at a common rate.
        """
        import numpy as np
        if src_sr == dst_sr or len(samples) == 0:
            return samples
        dst_len = max(1, int(round(len(samples) * dst_sr / src_sr)))
        src_t = np.arange(len(samples)) / src_sr
        dst_t = np.arange(dst_len) / dst_sr
        if samples.ndim == 1:
            return np.interp(dst_t, src_t, samples)
        return np.stack(
            [np.interp(dst_t, src_t, samples[:, ch]) for ch in range(samples.shape[1])],
            axis=1,
        )

    def clone_from_text(self, speaker_wav, text, language, out_file):
        """Synthesize ``text`` in the voice of ``speaker_wav`` and save it.

        ``language`` defaults to ``"de"`` when empty and must be one of
        ``SUPPORTED_LANGS``. Returns the output path as a string.

        Raises:
            RuntimeError: on empty/missing text, unsupported language, an
                invalid reference file or a missing TTS installation.
        """
        # `text or ""` also rejects None instead of failing with AttributeError.
        if not (text or "").strip():
            raise RuntimeError("Bitte Text eingeben.")
        language = (language or "de").strip().lower()
        if language not in self.SUPPORTED_LANGS:
            raise RuntimeError(f"Sprache '{language}' nicht unterstuetzt. Nutze z.B.: {', '.join(self.SUPPORTED_LANGS)}")
        speaker_wav = self._check_audio(speaker_wav, "Referenz-Stimme")
        self._ensure_xtts()
        out_path = Path(out_file)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        self.tts.tts_to_file(text=text, speaker_wav=speaker_wav, language=language, file_path=str(out_path))
        return str(out_path)

    def clone_from_audio(self, speaker_wav, source_wav, out_file):
        """Convert the voice in ``source_wav`` to the voice of ``speaker_wav``.

        Inputs longer than ``CHUNK_SEC`` seconds are split into segments that
        are converted one by one and concatenated. Returns the output path as
        a string.

        Raises:
            RuntimeError: on invalid input files, a missing TTS installation
                or a missing ``soundfile`` package.
        """
        speaker_wav = self._check_audio(speaker_wav, "Referenz-Stimme")
        source_wav = self._check_audio(source_wav, "Eingabe-Audio")
        self._ensure_vc()
        import math
        import tempfile
        try:
            import soundfile as sf
            import numpy as np
        except ImportError as e:
            raise RuntimeError(
                "Bitte installiere soundfile:\n"
                "  python -m pip install soundfile"
            ) from e
        out_path = Path(out_file)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        # Load the source to decide whether it has to be split into segments.
        try:
            data, sr = sf.read(source_wav, always_2d=False)
        except RuntimeError as e:
            # soundfile could not decode this file. Splitting is then not
            # possible, so fall back to converting the file in one pass,
            # which does not go through soundfile.
            self.log(f"VOICE: Eingabe-Audio kann nicht segmentiert werden ({e}) - verarbeite Datei am Stueck ...")
            data, sr = None, 0
        chunk_samples = int(self.CHUNK_SEC * sr)
        total_samples = 0 if data is None else len(data)
        num_chunks = math.ceil(total_samples / chunk_samples) if chunk_samples > 0 else 1
        if num_chunks <= 1:
            # Short file: convert directly.
            self.vc.voice_conversion_to_file(
                source_wav=source_wav, target_wav=speaker_wav, file_path=str(out_path)
            )
            return str(out_path)
        self.log(f"VOICE: Datei zu lang — teile in {num_chunks} Segmente à {self.CHUNK_SEC}s ...")
        results = []
        with tempfile.TemporaryDirectory() as tmpdir:
            for i in range(num_chunks):
                start = i * chunk_samples
                end = min(start + chunk_samples, total_samples)
                chunk_in = Path(tmpdir) / f"chunk_{i:04d}_in.wav"
                chunk_out = Path(tmpdir) / f"chunk_{i:04d}_out.wav"
                sf.write(str(chunk_in), data[start:end], sr)
                self.log(f"VOICE: Segment {i+1}/{num_chunks} ...")
                self.vc.voice_conversion_to_file(
                    source_wav=str(chunk_in), target_wav=speaker_wav,
                    file_path=str(chunk_out)
                )
                out_data, out_sr = sf.read(str(chunk_out))
                results.append((out_data, out_sr))
        # Merge the segments at the sample rate of the first one; segments
        # with a different rate are resampled instead of being appended as-is.
        self.log("VOICE: Füge Segmente zusammen ...")
        target_sr = results[0][1]
        merged = np.concatenate(
            [self._resample_linear(seg, seg_sr, target_sr) for seg, seg_sr in results],
            axis=0,
        )
        sf.write(str(out_path), merged, target_sr)
        return str(out_path)
class MainApp:
SUPPORTED = {".jpg", ".jpeg", ".png", ".bmp", ".webp"}
VIDEO_SUPPORTED = {".mp4", ".avi", ".mov", ".mkv", ".wmv", ".webm", ".flv"}
# Alle Felder die gespeichert werden
_CONFIG_KEYS = [
"source", "input_dir", "output_dir",
"video_input_dir", "video_output_dir",
"voice_ref", "voice_source_audio", "voice_output", "voice_language",
"webcam_index", "webcam_output", "last_library_face",
"restoration", "webcam_record"
]
def __init__(self):
    """Create the helper objects, build the scrollable main window and run it.

    Note that this constructor does not return until the window is closed,
    because it ends by entering the Tk main loop.
    """
    import tkinter as tk
    from tkinter import ttk, filedialog, messagebox
    # Keep the tkinter modules on the instance so other methods can use them.
    self.tk, self.ttk, self.fd, self.mb = tk, ttk, filedialog, messagebox
    providers = _get_providers()
    self.gpu = "CUDAExecutionProvider" in providers
    self.swapper = FaceSwapper(log_fn=self._log)
    self.voice = VoiceCloner(log_fn=self._log)
    self.library = FaceLibrary(log_fn=self._log)
    self.restorer = FaceRestorer(log_fn=self._log)
    # The swapper uses the shared restorer instance.
    self.swapper.restorer = self.restorer
    self._selected_library_slug = None
    self._library_images = {}
    self._webcam_thread = None
    self._webcam_running = False
    self._webcam_cancel = None
    self._webcam_last_faces = 0
    self._cfg = _load_config()  # load the saved configuration
    self._root_real = tk.Tk()
    self._root_real.title("FaceSwap Batch Tool")
    self._root_real.geometry("760x700")
    self._root_real.resizable(True, True)
    self._root_real.configure(bg="#0d0d12")
    self._root_real.protocol("WM_DELETE_WINDOW", self._on_close)
    # Scrollable layout: a canvas with a vertical scrollbar hosts one frame
    # that holds all widgets.
    canvas = tk.Canvas(self._root_real, bg="#0d0d12", highlightthickness=0)
    scrollbar = tk.Scrollbar(self._root_real, orient="vertical", command=canvas.yview)
    canvas.configure(yscrollcommand=scrollbar.set)
    scrollbar.pack(side="right", fill="y")
    canvas.pack(side="left", fill="both", expand=True)
    self._scroll_frame = tk.Frame(canvas, bg="#0d0d12")
    self._scroll_window = canvas.create_window((0, 0), window=self._scroll_frame, anchor="nw")
    def _on_resize(event):
        # Keep the inner frame as wide as the canvas.
        canvas.itemconfig(self._scroll_window, width=event.width)
    canvas.bind("<Configure>", _on_resize)
    def _on_frame_resize(event):
        # Update the scrollable area whenever the content size changes.
        canvas.configure(scrollregion=canvas.bbox("all"))
    self._scroll_frame.bind("<Configure>", _on_frame_resize)
    def _on_mousewheel(event):
        # NOTE(review): dividing by 120 assumes wheel deltas in multiples of
        # 120 (as on Windows) — confirm if other platforms must be supported.
        canvas.yview_scroll(int(-1 * (event.delta / 120)), "units")
    canvas.bind_all("<MouseWheel>", _on_mousewheel)
    # Widgets are created inside the scrollable frame, which acts as "root".
    self.root = self._scroll_frame
    self._build()
    self._root_real.mainloop()
def _build(self):
    """Create all widgets of the main window inside ``self.root``.

    Order of work: title and GPU/CPU badge, Tk variables (seeded from
    ``self._cfg``), source-image row with library panel and preview, the
    notebook with its tabs, the quality options, progress bar, status line,
    log box and the start/setup buttons.
    """
    tk, ttk = self.tk, self.ttk
    tk.Label(self.root, text="FaceSwap Batch", font=("Courier New", 22, "bold"),
             bg="#0d0d12", fg="#e8d5b7").pack(pady=(20, 3))
    tk.Label(self.root, text="Ersetze Gesichter + klone Stimmen lokal",
             font=("Courier New", 9), bg="#0d0d12", fg="#7a7a9a").pack()
    # Badge colours and text depend on whether the CUDA provider was found.
    bc = "#142814" if self.gpu else "#281414"
    bt = "GPU-Modus | CUDA aktiv" if self.gpu else "CPU-Modus"
    bf = "#5aff5a" if self.gpu else "#ff7a5a"
    tk.Label(self.root, text=bt, font=("Courier New", 9, "bold"),
             bg=bc, fg=bf, padx=14, pady=5).pack(pady=(8, 0))
    # Create the variables and load the saved values.
    # These must be created BEFORE trace_add is set on _vars, because
    # _save_now() references them and trace callbacks may fire during setup.
    self._var_voice_mode = tk.StringVar(value=self._cfg.get("voice_mode", "text"))
    self._var_enhance = tk.BooleanVar(value=self._cfg.get("enhance", True))
    self._var_color = tk.BooleanVar(value=self._cfg.get("color", True))
    self._var_restoration = tk.BooleanVar(value=bool(self._cfg.get("restoration", False)))
    self._var_webcam_record = tk.BooleanVar(value=bool(self._cfg.get("webcam_record", False)))
    # One StringVar per config key; every write triggers _on_var_change.
    self._vars = {}
    for k in self._CONFIG_KEYS:
        v = tk.StringVar(value=self._cfg.get(k, ""))
        v.trace_add("write", lambda *_, key=k: self._on_var_change(key))
        self._vars[k] = v
    # Sync the string and boolean variants of "restoration": a non-empty
    # string value decides the checkbox, otherwise the string is seeded
    # from the checkbox state.
    if self._vars["restoration"].get():
        self._var_restoration.set(str(self._vars["restoration"].get()).strip().lower() in ("1", "true", "yes", "on"))
    else:
        self._vars["restoration"].set("1" if self._var_restoration.get() else "0")
    # Same synchronisation for "webcam_record".
    if self._vars["webcam_record"].get():
        self._var_webcam_record.set(str(self._vars["webcam_record"].get()).strip().lower() in ("1", "true", "yes", "on"))
    else:
        self._vars["webcam_record"].set("1" if self._var_webcam_record.get() else "0")
    # Defaults for fields that are still empty.
    if not self._vars["webcam_index"].get():
        self._vars["webcam_index"].set("0")
    if not self._vars["webcam_output"].get():
        self._vars["webcam_output"].set(str(SCRIPT_DIR / "webcam_recording.mp4"))
    if not self._vars["voice_language"].get():
        self._vars["voice_language"].set("de")
    # From here on every change of these variables is saved immediately.
    self._var_voice_mode.trace_add("write", lambda *_: self._save_now())
    self._var_enhance.trace_add("write", lambda *_: self._save_now())
    self._var_color.trace_add("write", lambda *_: self._save_now())
    self._var_restoration.trace_add("write", lambda *_: self._save_now())
    self._var_webcam_record.trace_add("write", lambda *_: self._save_now())
    self._section("1 QUELLBILD - Gesicht, das eingefuegt wird")
    self._row("source", self._pick_source)
    self._build_library_panel()
    self._prev_lbl = tk.Label(self.root, bg="#0d0d12")
    self._prev_lbl.pack()
    # Load the preview image if a source image was saved.
    if self._vars["source"].get():
        self._load_preview(self._vars["source"].get())
    self._refresh_library_grid()
    style = ttk.Style(self.root)
    style.theme_use("default")
    style.configure("TNotebook", background="#0d0d12", borderwidth=0)
    style.configure("TNotebook.Tab", background="#1a1a2c", foreground="#8a8aff",
                    font=("Courier New", 9, "bold"), padding=(14, 5))
    style.map("TNotebook.Tab",
              background=[("selected", "#0d0d12")],
              foreground=[("selected", "#e8d5b7")])
    nb = ttk.Notebook(self.root)
    nb.pack(fill="x", padx=20, pady=(10, 0))
    # Tab 1: images
    img_tab = tk.Frame(nb, bg="#0d0d12")
    nb.add(img_tab, text="Bilder (Batch)")
    tk.Label(img_tab, text="2 EINGABE-ORDNER - Bilder, die bearbeitet werden",
             font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a",
             anchor="w").pack(fill="x", pady=(10, 2))
    self._row_in(img_tab, "input_dir", self._pick_indir)
    tk.Label(img_tab, text="3 AUSGABE-ORDNER - Zielort fuer fertige Bilder",
             font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a",
             anchor="w").pack(fill="x", pady=(8, 2))
    self._row_in(img_tab, "output_dir", self._pick_outdir)
    # Tab 2: video
    vid_tab = tk.Frame(nb, bg="#0d0d12")
    nb.add(vid_tab, text="Video (Batch)")
    tk.Label(vid_tab, text="2 EINGABE-ORDNER - Videos, die bearbeitet werden",
             font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a",
             anchor="w").pack(fill="x", pady=(10, 2))
    self._row_in(vid_tab, "video_input_dir", self._pick_video_indir)
    tk.Label(vid_tab, text="3 AUSGABE-ORDNER - Zielort fuer fertige Videos (mp4)",
             font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a",
             anchor="w").pack(fill="x", pady=(8, 2))
    self._row_in(vid_tab, "video_output_dir", self._pick_video_outdir)
    # The note (and its colour) tells the user whether ffmpeg is on PATH.
    ffmpeg_note = "ffmpeg gefunden - Audio wird beibehalten" if shutil.which("ffmpeg") \
        else "Achtung: ffmpeg nicht gefunden - kein Audio im Ausgabevideo"
    ffmpeg_col = "#5aff5a" if shutil.which("ffmpeg") else "#ffaa44"
    tk.Label(vid_tab, text=ffmpeg_note, font=("Courier New", 8),
             bg="#0d0d12", fg=ffmpeg_col).pack(anchor="w", padx=4, pady=(4, 0))
    # Tab 3: voice cloning
    voice_tab = tk.Frame(nb, bg="#0d0d12")
    nb.add(voice_tab, text="Stimme klonen")
    tk.Label(voice_tab, text="2 REFERENZ-STIMME - Audio mit Zielstimme",
             font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a",
             anchor="w").pack(fill="x", pady=(10, 2))
    self._row_in(voice_tab, "voice_ref", self._pick_voice_ref)
    tk.Label(voice_tab, text="3 MODUS",
             font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a",
             anchor="w").pack(fill="x", pady=(8, 2))
    mode_row = tk.Frame(voice_tab, bg="#0d0d12")
    mode_row.pack(fill="x")
    tk.Radiobutton(mode_row, text="Text -> Stimme", value="text", variable=self._var_voice_mode,
                   font=("Courier New", 9), bg="#0d0d12", fg="#8aff8a",
                   selectcolor="#0d0d12", activebackground="#0d0d12").pack(side="left", padx=(0, 16))
    tk.Radiobutton(mode_row, text="Audio -> Stimme", value="audio", variable=self._var_voice_mode,
                   font=("Courier New", 9), bg="#0d0d12", fg="#8aff8a",
                   selectcolor="#0d0d12", activebackground="#0d0d12").pack(side="left")
    tk.Label(voice_tab, text="4 TEXT (nur fuer Text-Modus)",
             font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a",
             anchor="w").pack(fill="x", pady=(8, 2))
    self._voice_text = tk.Text(voice_tab, height=4, bg="#14141e", fg="#d8d8f0",
                               font=("Courier New", 9), relief="flat", insertbackground="white")
    self._voice_text.pack(fill="x")
    # Restore the saved text.
    saved_text = self._cfg.get("voice_text", "")
    if saved_text:
        self._voice_text.insert("1.0", saved_text)
    # The Text widget has no Tk variable, so save after every key release.
    self._voice_text.bind("<KeyRelease>", lambda e: self._save_now())
    tk.Label(voice_tab, text="5 EINGABE-AUDIO (nur fuer Audio-Modus)",
             font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a",
             anchor="w").pack(fill="x", pady=(8, 2))
    self._row_in(voice_tab, "voice_source_audio", self._pick_voice_source_audio)
    tk.Label(voice_tab, text="6 SPRACHE (Text-Modus, z.B. de/en/fr)",
             font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a",
             anchor="w").pack(fill="x", pady=(8, 2))
    lang_row = tk.Frame(voice_tab, bg="#14141e")
    lang_row.pack(fill="x", pady=2)
    self._voice_lang = ttk.Combobox(
        lang_row, textvariable=self._vars["voice_language"],
        values=list(VoiceCloner.SUPPORTED_LANGS), state="readonly", font=("Courier New", 9)
    )
    self._voice_lang.pack(side="left", padx=8, pady=6)
    tk.Label(voice_tab, text="7 AUSGABE-AUDIO (.wav)",
             font=("Courier New", 9, "bold"), bg="#0d0d12", fg="#c8a96a",
             anchor="w").pack(fill="x", pady=(8, 2))
    self._row_in(voice_tab, "voice_output", self._pick_voice_out)
    tk.Label(voice_tab, text="Hinweis: Beim ersten Lauf werden Sprachmodelle automatisch geladen.",
             font=("Courier New", 8), bg="#0d0d12", fg="#7a7a9a").pack(anchor="w", padx=4, pady=(4, 0))
    # The webcam tab is added as a further tab by its own builder.
    self._build_webcam_tab(nb)
    # _start() reads the selected tab from this notebook.
    self._nb = nb
    # Quality options
    qf = tk.Frame(self.root, bg="#0d0d12")
    qf.pack(fill="x", padx=20, pady=(10, 0))
    tk.Label(qf, text="4 QUALITAETS-OPTIONEN", font=("Courier New", 9, "bold"),
             bg="#0d0d12", fg="#c8a96a").pack(side="left")
    tk.Checkbutton(qf, text="Schaerfen", variable=self._var_enhance,
                   font=("Courier New", 9), bg="#0d0d12", fg="#8aff8a",
                   selectcolor="#0d0d12", activebackground="#0d0d12",
                   command=self._update_quality).pack(side="left", padx=(20, 0))
    tk.Checkbutton(qf, text="Farbanpassung", variable=self._var_color,
                   font=("Courier New", 9), bg="#0d0d12", fg="#8aff8a",
                   selectcolor="#0d0d12", activebackground="#0d0d12",
                   command=self._update_quality).pack(side="left", padx=(12, 0))
    self._chk_restoration = tk.Checkbutton(
        qf, text="Gesichtswiederherstellung (GFPGAN)", variable=self._var_restoration,
        font=("Courier New", 9), bg="#0d0d12", fg="#8aff8a",
        selectcolor="#0d0d12", activebackground="#0d0d12",
        command=self._update_quality
    )
    self._chk_restoration.pack(side="left", padx=(12, 0))
    # Disable (and untick) the restoration option when the restorer
    # reports that it is not available.
    if not self.restorer.is_available():
        self._chk_restoration.configure(state="disabled", text="GFPGAN nicht installiert")
        self._var_restoration.set(False)
    style.configure("G.Horizontal.TProgressbar", troughcolor="#101020", background="#3adf6a", thickness=14)
    self._pb = ttk.Progressbar(self.root, length=700, mode="determinate", style="G.Horizontal.TProgressbar")
    self._pb.pack(padx=20, pady=(14, 4))
    # Status line below the progress bar.
    self._sv = self.tk.StringVar(value="Bereit.")
    self.tk.Label(self.root, textvariable=self._sv, font=("Courier New", 9),
                  bg="#0d0d12", fg="#5a8a6a").pack()
    # Log box with its own scrollbar; _log() writes into self._lb.
    lf = tk.Frame(self.root, bg="#0d0d12")
    lf.pack(fill="both", expand=True, padx=20, pady=(8, 0))
    self._lb = tk.Text(lf, height=7, bg="#060610", fg="#8aff8a", font=("Courier New", 9),
                       relief="flat", insertbackground="#8aff8a")
    sb = tk.Scrollbar(lf, command=self._lb.yview)
    self._lb.configure(yscrollcommand=sb.set)
    self._lb.pack(side="left", fill="both", expand=True)
    sb.pack(side="right", fill="y")
    # Button row: start the action of the selected tab / repeat the setup.
    bf2 = tk.Frame(self.root, bg="#0d0d12")
    bf2.pack(pady=14)
    self._btn = tk.Button(bf2, text="STARTEN", font=("Courier New", 13, "bold"),
                          bg="#1a3a2a", fg="#8aff8a", activebackground="#2a5a3a",
                          relief="flat", cursor="hand2", padx=22, command=self._start)
    self._btn.pack(side="left", padx=8)
    tk.Button(bf2, text="Setup wiederholen", font=("Courier New", 9),
              bg="#1a1a2c", fg="#8a8aff", activebackground="#252540",
              relief="flat", cursor="hand2", command=self._redo_setup).pack(side="left", padx=8)
    # Push the initial checkbox states to the swapper.
    self._update_quality()
def _on_var_change(self, key):
"""Wird aufgerufen wenn sich ein Pfad-Feld aendert -> sofort speichern."""
self._save_now()
def _save_now(self):
"""Aktuelle Einstellungen in config.json speichern."""
if not hasattr(self, "_vars"):
return
if "restoration" not in self._vars or "webcam_record" not in self._vars:
return
restoration_val = "1" if self._var_restoration.get() else "0"
webcam_record_val = "1" if self._var_webcam_record.get() else "0"
if self._vars["restoration"].get() != restoration_val:
self._vars["restoration"].set(restoration_val)
if self._vars["webcam_record"].get() != webcam_record_val:
self._vars["webcam_record"].set(webcam_record_val)
data = {k: self._vars[k].get() for k in self._CONFIG_KEYS}
data["voice_mode"] = self._var_voice_mode.get()
data["enhance"] = self._var_enhance.get()
data["color"] = self._var_color.get()
data["restoration"] = self._var_restoration.get()
data["webcam_record"] = self._var_webcam_record.get()
try:
data["voice_text"] = self._voice_text.get("1.0", "end-1c")
except Exception:
pass
_save_config(data)
def _load_preview(self, path):
try:
from PIL import Image, ImageTk
img = Image.open(path).convert("RGB")
img.thumbnail((110, 110))
self._tkimg = ImageTk.PhotoImage(img)
self._prev_lbl.configure(image=self._tkimg)
except Exception:
pass
def _section(self, txt):
self.tk.Label(self.root, text=txt, font=("Courier New", 9, "bold"),
bg="#0d0d12", fg="#c8a96a", anchor="w").pack(fill="x", padx=20, pady=(12, 2))
def _row(self, key, cmd):
f = self.tk.Frame(self.root, bg="#14141e")
f.pack(fill="x", padx=20, pady=2)
self.tk.Entry(f, textvariable=self._vars[key], font=("Courier New", 9),
bg="#14141e", fg="#d8d8f0", relief="flat",
insertbackground="white").pack(side="left", padx=8, pady=6, fill="x", expand=True)
self.tk.Button(f, text="...", bg="#22223c", fg="#d0d0e0", relief="flat",
cursor="hand2", command=cmd).pack(side="right", padx=4)
def _row_in(self, parent, key, cmd):
f = self.tk.Frame(parent, bg="#14141e")
f.pack(fill="x", pady=2)
self.tk.Entry(f, textvariable=self._vars[key], font=("Courier New", 9),
bg="#14141e", fg="#d8d8f0", relief="flat",
insertbackground="white").pack(side="left", padx=8, pady=6, fill="x", expand=True)
self.tk.Button(f, text="...", bg="#22223c", fg="#d0d0e0", relief="flat",
cursor="hand2", command=cmd).pack(side="right", padx=4)
def _build_library_panel(self):
    """Create the toggle button and the face-library panel.

    The panel frame is built here but not packed; ``_toggle_library_panel``
    shows and hides it.
    """
    tk = self.tk
    mono = ("Courier New", 9)
    panel_bg = "#11111a"

    outer = tk.Frame(self.root, bg="#0d0d12")
    outer.pack(fill="x", padx=20, pady=(6, 0))

    self._library_open = False
    self._btn_library_toggle = tk.Button(
        outer, text="Bibliothek oeffnen", font=mono,
        bg="#1a1a2c", fg="#8a8aff", activebackground="#252540",
        relief="flat", cursor="hand2", command=self._toggle_library_panel,
    )
    self._btn_library_toggle.pack(anchor="w", pady=(0, 4))

    self._library_panel = tk.Frame(outer, bg=panel_bg, bd=1, relief="flat")

    # Toolbar: name field plus add / use / delete buttons.
    toolbar = tk.Frame(self._library_panel, bg=panel_bg)
    toolbar.pack(fill="x", padx=8, pady=(8, 4))
    name_caption = tk.Label(toolbar, text="Name:", font=mono, bg=panel_bg, fg="#d8d8f0")
    name_caption.pack(side="left")
    self._var_library_name = tk.StringVar(value="")
    name_field = tk.Entry(
        toolbar, textvariable=self._var_library_name, font=mono,
        bg="#14141e", fg="#d8d8f0", relief="flat",
        insertbackground="white", width=18,
    )
    name_field.pack(side="left", padx=8)
    add_button = tk.Button(
        toolbar, text="+ Hinzufuegen", font=mono,
        bg="#1a3a2a", fg="#8aff8a", activebackground="#2a5a3a",
        relief="flat", cursor="hand2", command=self._library_add_from_file,
    )
    add_button.pack(side="left", padx=(0, 6))
    use_button = tk.Button(
        toolbar, text="Als Quelle verwenden", font=mono,
        bg="#22223c", fg="#d0d0e0", activebackground="#2e2e4e",
        relief="flat", cursor="hand2", command=self._library_use_selected,
    )
    use_button.pack(side="left", padx=(8, 6))
    delete_button = tk.Button(
        toolbar, text="Loeschen", font=mono,
        bg="#3a1a1a", fg="#ffb0b0", activebackground="#4a2222",
        relief="flat", cursor="hand2", command=self._library_delete_selected,
    )
    delete_button.pack(side="left")

    # Thumbnail grid; starts out with a placeholder text.
    self._library_grid = tk.Frame(self._library_panel, bg=panel_bg)
    self._library_grid.pack(fill="x", padx=8, pady=(4, 8))
    self._library_placeholder = tk.Label(
        self._library_grid, text="Noch keine Gesichter gespeichert.",
        font=mono, bg=panel_bg, fg="#7a7a9a",
    )
    self._library_placeholder.grid(row=0, column=0, sticky="w")
def _toggle_library_panel(self):
self._library_open = not self._library_open
if self._library_open:
self._library_panel.pack(fill="x")
self._btn_library_toggle.configure(text="Bibliothek schliessen")
self._refresh_library_grid()
else:
self._library_panel.pack_forget()
self._btn_library_toggle.configure(text="Bibliothek oeffnen")
def _refresh_library_grid(self):
if not hasattr(self, "_library_grid"):
return
for child in list(self._library_grid.winfo_children()):
child.destroy()
self._library_images = {}
entries = self.library.list_entries()
if not entries:
self._library_placeholder = self.tk.Label(
self._library_grid, text="Noch keine Gesichter gespeichert.",
font=("Courier New", 9), bg="#11111a", fg="#7a7a9a"
)
self._library_placeholder.grid(row=0, column=0, sticky="w")
return
from PIL import Image, ImageTk
last_slug = (self._vars.get("last_library_face").get().strip()
if "last_library_face" in self._vars else "")
if self._selected_library_slug is None and last_slug:
self._selected_library_slug = last_slug
for i, entry in enumerate(entries):
col = i % 4
row = i // 4
cell = self.tk.Frame(self._library_grid, bg="#11111a", bd=0)
cell.grid(row=row, column=col, padx=6, pady=6, sticky="n")
thumb_path = Path(entry["thumb_path"])
if thumb_path.exists():
img = Image.open(thumb_path).convert("RGB")
else:
img = Image.new("RGB", (96, 96), "#1a1a2c")
photo = ImageTk.PhotoImage(img)
self._library_images[entry["slug"]] = photo
btn = self.tk.Button(
cell, image=photo, relief="solid", bd=3,
highlightthickness=0, bg="#11111a", activebackground="#1a1a2c",
command=lambda slug=entry["slug"]: self._select_library_entry(slug)
)
btn.pack()
self.tk.Label(cell, text=entry["name"], font=("Courier New", 8),
bg="#11111a", fg="#d8d8f0").pack(pady=(2, 0))
cell._slug = entry["slug"]
cell._btn = btn
self._select_library_entry(self._selected_library_slug, save=False)
def _select_library_entry(self, slug, save=True):
if slug:
self._selected_library_slug = slug
if not hasattr(self, "_library_grid"):
return
selected = self._selected_library_slug
for cell in self._library_grid.winfo_children():
b = getattr(cell, "_btn", None)
s = getattr(cell, "_slug", None)
if b is None:
continue
if s == selected:
b.configure(bg="#28442a")
else:
b.configure(bg="#11111a")
if save and selected:
self._vars["last_library_face"].set(selected)
self._save_now()
def _library_add_from_file(self):
p = self.fd.askopenfilename(
title="Gesicht fuer Bibliothek waehlen",
filetypes=[("Bilder", "*.jpg *.jpeg *.png *.bmp *.webp"), ("Alle", "*.*")]
)
if not p:
return
name = self._var_library_name.get().strip() or Path(p).stem
try:
entry = self.library.add(name, p)
self._selected_library_slug = entry["slug"]
self._vars["last_library_face"].set(entry["slug"])
self._refresh_library_grid()
self.mb.showinfo("Bibliothek", f"Gesicht gespeichert: {entry['name']}")
except Exception as e:
self.mb.showerror("Bibliothek", str(e))
def _library_use_selected(self):
slug = self._selected_library_slug
if not slug:
return self.mb.showerror("Bibliothek", "Bitte zuerst ein Gesicht auswaehlen.")
try:
src = self.library.get_source_path(slug)
self._vars["source"].set(str(src))
self._load_preview(str(src))
self._save_now()
except Exception as e:
self.mb.showerror("Bibliothek", str(e))
def _library_delete_selected(self):
slug = self._selected_library_slug
if not slug:
return self.mb.showerror("Bibliothek", "Bitte zuerst ein Gesicht auswaehlen.")
if not self.mb.askyesno("Bibliothek", f"Eintrag '{slug}' wirklich loeschen?"):
return
try:
self.library.remove(slug)
self._selected_library_slug = None
self._vars["last_library_face"].set("")
self._refresh_library_grid()
except Exception as e:
self.mb.showerror("Bibliothek", str(e))
def _build_webcam_tab(self, notebook):
    """Add the "Webcam" tab to *notebook*.

    The tab holds the camera index selector, the recording option with its
    output path, the preview area, a statistics line and the start/stop
    button.
    """
    tk, ttk = self.tk, self.ttk
    dark = "#0d0d12"
    mono = ("Courier New", 9)

    page = tk.Frame(notebook, bg=dark)
    notebook.add(page, text="Webcam")

    # Row 1: camera index and the resolution read-out.
    index_row = tk.Frame(page, bg=dark)
    index_row.pack(fill="x", pady=(10, 2))
    index_caption = tk.Label(
        index_row, text="Kamera-Index", font=("Courier New", 9, "bold"),
        bg=dark, fg="#c8a96a",
    )
    index_caption.pack(side="left", padx=(0, 8))
    self._webcam_index_box = ttk.Combobox(
        index_row, textvariable=self._vars["webcam_index"],
        values=[str(i) for i in range(5)], state="readonly", width=6, font=mono,
    )
    self._webcam_index_box.pack(side="left")
    self._webcam_resolution_label = tk.Label(
        index_row, text="Aufloesung: -", font=("Courier New", 8), bg=dark, fg="#7a7a9a",
    )
    self._webcam_resolution_label.pack(side="left", padx=(12, 0))

    # Row 2: recording checkbox, followed by the output path row.
    record_row = tk.Frame(page, bg=dark)
    record_row.pack(fill="x", pady=(8, 2))
    record_box = tk.Checkbutton(
        record_row, text="Aufnahme aktiv", variable=self._var_webcam_record,
        font=mono, bg=dark, fg="#8aff8a",
        selectcolor=dark, activebackground=dark,
        command=self._save_now,
    )
    record_box.pack(side="left")
    self._row_in(page, "webcam_output", self._pick_webcam_out)

    # Preview area: fixed-height box whose height is switched between the
    # collapsed value and the preview target height.
    self._webcam_preview_target = (640, 360)
    self._webcam_preview_collapsed_h = 96
    self._webcam_preview_box = tk.Frame(
        page, bg="#05050c", height=self._webcam_preview_collapsed_h
    )
    self._webcam_preview_box.pack(fill="x", padx=4, pady=(8, 6))
    self._webcam_preview_box.pack_propagate(False)
    self._webcam_preview = tk.Label(
        self._webcam_preview_box, bg="#05050c", fg="#7a7a9a",
        text="Webcam Vorschau (Starten fuer Live-Preview)",
    )
    self._webcam_preview.pack(fill="both", expand=True)

    self._webcam_stats = tk.Label(
        page, text="FPS: - | Gesichter: -", font=mono, bg=dark, fg="#8a8aff",
    )
    self._webcam_stats.pack(anchor="w", padx=4, pady=(0, 8))

    self._btn_webcam = tk.Button(
        page, text="Starten", font=("Courier New", 10, "bold"),
        bg="#1a3a2a", fg="#8aff8a", activebackground="#2a5a3a",
        relief="flat", cursor="hand2", command=self._toggle_webcam,
    )
    self._btn_webcam.pack(anchor="w", padx=4, pady=(0, 10))

    self._set_webcam_preview_collapsed(True)
def _toggle_webcam(self):
if self._webcam_running:
self._stop_webcam()
else:
self._start_webcam()
def _pick_webcam_out(self):
p = self.fd.asksaveasfilename(
title="Webcam-Aufnahme speichern",
defaultextension=".mp4",
filetypes=[("MP4 Video", "*.mp4"), ("Alle", "*.*")]
)
if p:
self._vars["webcam_output"].set(p)
def _set_webcam_preview_collapsed(self, collapsed):
if not hasattr(self, "_webcam_preview_box"):
return
if collapsed:
self._webcam_preview_box.configure(height=self._webcam_preview_collapsed_h)
self._webcam_preview.configure(image="", text="Webcam Vorschau (Starten fuer Live-Preview)")
self._webcam_preview.image = None
else:
self._webcam_preview_box.configure(height=int(self._webcam_preview_target[1]))
def _start_webcam(self):
    """Validate the inputs, prepare the models and start the webcam worker thread.

    Every failed check shows an error box and returns without starting
    anything: missing source image, invalid camera index, camera that
    cannot be opened, model initialisation error, unreadable source image
    or no face in the source image.
    """
    import cv2
    import threading

    source_path = self._vars["source"].get().strip()
    if not (source_path and Path(source_path).is_file()):
        return self.mb.showerror("Webcam", "Bitte zuerst ein gueltiges Quellbild waehlen.")

    try:
        cam_idx = int(self._vars["webcam_index"].get().strip() or "0")
    except Exception:
        return self.mb.showerror("Webcam", "Ungueltiger Kamera-Index.")

    # Open the camera once to check that it works and to read its resolution.
    probe = cv2.VideoCapture(cam_idx)
    if not probe.isOpened():
        probe.release()
        # Try the indices 0-4 so the message can list the ones that open.
        usable = []
        for idx in range(5):
            candidate = cv2.VideoCapture(idx)
            opened = candidate.isOpened()
            candidate.release()
            if opened:
                usable.append(idx)
        return self.mb.showerror(
            "Webcam",
            f"Kamera konnte nicht geoeffnet werden (Index {cam_idx}).\n"
            f"Verfuegbare Indizes: {usable or 'keine'}"
        )
    width = int(probe.get(cv2.CAP_PROP_FRAME_WIDTH) or 0)
    height = int(probe.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0)
    probe.release()
    self._webcam_resolution_label.configure(text=f"Aufloesung: {width}x{height}")

    self._update_quality()
    try:
        self.swapper.init_models()
    except Exception as e:
        return self.mb.showerror("Webcam", f"Modellfehler: {e}")

    source_image = _cv2_imread_unicode(source_path)
    if source_image is None:
        return self.mb.showerror("Webcam", "Quellbild konnte nicht geladen werden.")
    source_face = self.swapper.get_first_face(source_image)
    if source_face is None:
        return self.mb.showerror("Webcam", "Kein Gesicht im Quellbild gefunden!")

    # All checks passed: switch the UI to "running" and start the worker.
    self._webcam_cancel = threading.Event()
    self._webcam_running = True
    self._btn_webcam.configure(text="Stoppen", bg="#3a1a1a", fg="#ffb0b0", activebackground="#4a2222")
    self._webcam_stats.configure(text="FPS: - | Gesichter: -")
    self._set_webcam_preview_collapsed(False)

    if self._var_webcam_record.get():
        record_path = self._vars["webcam_output"].get().strip()
    else:
        record_path = None

    worker = threading.Thread(
        target=self._webcam_worker,
        args=(source_face, cam_idx, record_path),
        daemon=True,
    )
    self._webcam_thread = worker
    worker.start()
def _stop_webcam(self):
if self._webcam_cancel is not None:
self._webcam_cancel.set()
th = self._webcam_thread
if th is not None and th.is_alive():
th.join(timeout=1.5)
self._webcam_running = False
self._webcam_thread = None
self._webcam_cancel = None
if hasattr(self, "_btn_webcam"):
self._btn_webcam.configure(text="Starten", bg="#1a3a2a", fg="#8aff8a", activebackground="#2a5a3a")
if hasattr(self, "_webcam_preview"):
self._set_webcam_preview_collapsed(True)
def _webcam_worker(self, src_face, cam_idx, record_path):
    """Thread target: run the webcam swap and feed frames and stats to the UI.

    Args:
        src_face: Face object of the source image, passed on to
            ``swapper.swap_webcam``.
        cam_idx: Index of the camera to open.
        record_path: Output path for the recording, or None.

    UI updates are scheduled with ``after`` on the Tk root. When the swap
    ends, with or without an error, ``_stop_webcam`` is scheduled as well.
    """
    import cv2
    from PIL import Image

    def on_frame(frame):
        # Convert the BGR frame to a PIL image and letterbox it into the
        # fixed preview size.
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img = Image.fromarray(rgb)
        target_w, target_h = self._webcam_preview_target
        src_w, src_h = img.size
        scale = min(target_w / max(1, src_w), target_h / max(1, src_h))
        new_w = max(1, int(round(src_w * scale)))
        new_h = max(1, int(round(src_h * scale)))
        # Newer Pillow exposes the filters under Image.Resampling, older
        # versions directly on the Image module.
        resampling = getattr(Image, "Resampling", Image).BILINEAR
        fitted = img.resize((new_w, new_h), resampling)
        canvas = Image.new("RGB", (target_w, target_h), (5, 5, 12))
        off_x = (target_w - new_w) // 2
        off_y = (target_h - new_h) // 2
        canvas.paste(fitted, (off_x, off_y))
        self._root_real.after(1, self._apply_webcam_frame, canvas)

    def on_stats(fps, faces):
        self._webcam_last_faces = int(faces)
        text = f"FPS: {fps:.1f} | Gesichter: {int(faces)}"
        self._root_real.after(1, lambda: self._webcam_stats.configure(text=text))

    try:
        self.swapper.swap_webcam(
            src_face=src_face,
            camera_index=cam_idx,
            record_path=record_path,
            fps_target=25.0,
            cancel_check=(self._webcam_cancel.is_set if self._webcam_cancel is not None else None),
            frame_cb=on_frame,
            stats_cb=on_stats,
            use_tracking=False,
        )
    except Exception as e:
        # The name bound by "except ... as e" is deleted when the clause
        # ends, so a callback that runs later must not refer to it. The
        # message is therefore built now and passed as an argument.
        message = str(e)
        self._root_real.after(1, self.mb.showerror, "Webcam", message)
    finally:
        self._root_real.after(1, self._stop_webcam)
def _apply_webcam_frame(self, pil_img):
    """Show the PIL image *pil_img* in the webcam preview label."""
    from PIL import ImageTk

    preview = getattr(self, "_webcam_preview", None)
    if not hasattr(self, "_webcam_preview"):
        return
    tk_frame = ImageTk.PhotoImage(pil_img)
    preview.configure(image=tk_frame, text="")
    # Store the PhotoImage on the label so a reference to it stays alive.
    preview.image = tk_frame
def _pick_source(self):
p = self.fd.askopenfilename(title="Quellbild waehlen",
filetypes=[("Bilder", "*.jpg *.jpeg *.png *.bmp *.webp"), ("Alle", "*.*")])
if not p: return
self._vars["source"].set(p)
self._load_preview(p)
def _pick_indir(self):
p = self.fd.askdirectory(title="Eingabe-Ordner waehlen")
if p: self._vars["input_dir"].set(p)
def _pick_outdir(self):
p = self.fd.askdirectory(title="Ausgabe-Ordner waehlen")
if p: self._vars["output_dir"].set(p)
def _pick_video_indir(self):
p = self.fd.askdirectory(title="Video-Eingabe-Ordner waehlen")
if p:
self._vars["video_input_dir"].set(p)
if not self._vars["video_output_dir"].get():
self._vars["video_output_dir"].set(str(Path(p) / "output_videos"))
def _pick_video_outdir(self):
p = self.fd.askdirectory(title="Video-Ausgabe-Ordner waehlen")
if p: self._vars["video_output_dir"].set(p)
def _pick_voice_ref(self):
p = self.fd.askopenfilename(
title="Referenz-Stimme waehlen",
filetypes=[("Audio", "*.wav *.mp3 *.m4a *.flac *.ogg *.aac *.wma"), ("Alle", "*.*")]
)
if p:
self._vars["voice_ref"].set(p)
if not self._vars["voice_output"].get():
base = Path(p).with_suffix("").name
self._vars["voice_output"].set(str(Path(p).parent / f"{base}_cloned.wav"))
def _pick_voice_source_audio(self):
p = self.fd.askopenfilename(
title="Eingabe-Audio waehlen",
filetypes=[("Audio", "*.wav *.mp3 *.m4a *.flac *.ogg *.aac *.wma"), ("Alle", "*.*")]
)
if p:
self._vars["voice_source_audio"].set(p)
if not self._vars["voice_output"].get():
base = Path(p).with_suffix("").name
self._vars["voice_output"].set(str(Path(p).parent / f"{base}_voiceclone.wav"))
def _pick_voice_out(self):
p = self.fd.asksaveasfilename(
title="Ausgabe-Audio speichern",
defaultextension=".wav",
filetypes=[("WAV-Audio", "*.wav"), ("Alle", "*.*")]
)
if p: self._vars["voice_output"].set(p)
def _log(self, msg):
self._lb.insert("end", msg + "\n")
self._lb.see("end")
self._root_real.update_idletasks()
def _update_quality(self):
self.swapper.enhance = self._var_enhance.get()
self.swapper.color = self._var_color.get()
allow_restoration = self._var_restoration.get() and self.restorer.is_available()
self.swapper.use_restoration = bool(allow_restoration)
self._save_now()
def _redo_setup(self):
    """Run the setup again and then open a fresh main window.

    Stops a running webcam session, removes the setup flag file, closes
    this window, shows the setup window and finally creates a new
    ``MainApp``.
    """
    webcam_active = self._webcam_running
    if webcam_active:
        self._stop_webcam()
    # Without the flag file the setup counts as not done.
    SETUP_FLAG.unlink(missing_ok=True)
    self._root_real.destroy()
    _show_setup_window()
    # Clear the import system's finder caches before the new window is built.
    importlib.invalidate_caches()
    MainApp()
def _on_close(self):
if self._webcam_running:
self._stop_webcam()
self._root_real.destroy()
def _start(self):
self._btn.configure(state="disabled")
import threading
tab_idx = self._nb.index(self._nb.select())
if tab_idx == 3:
self._btn.configure(state="normal")
self._toggle_webcam()
return
if tab_idx == 1:
target = self._run_video
elif tab_idx == 2:
target = self._run_voice
else:
target = self._run
threading.Thread(target=target, daemon=True).start()
def _run(self):
import cv2
self._update_quality()
src = self._vars["source"].get().strip()
indir = self._vars["input_dir"].get().strip()
outdir = self._vars["output_dir"].get().strip()
def err(t, m):
self.mb.showerror(t, m)
self._btn.configure(state="normal")
if not all([src, indir, outdir]): return err("Fehler", "Bitte alle drei Felder ausfuellen.")
if not Path(src).is_file(): return err("Fehler", f"Quellbild nicht gefunden:\n{src}")
if not Path(indir).is_dir(): return err("Fehler", f"Eingabe-Ordner existiert nicht:\n{indir}")
Path(outdir).mkdir(parents=True, exist_ok=True)
try:
self.swapper.init_models()
except Exception as e:
return err("Modellfehler", str(e))
src_img = _cv2_imread_unicode(src)
if src_img is None: return err("Fehler", "Quellbild konnte nicht geladen werden.")
src_face = self.swapper.get_first_face(src_img)
if src_face is None: return err("Fehler", "Kein Gesicht im Quellbild gefunden!")
self._log(f"OK Quellgesicht erkannt: {Path(src).name}")
images = sorted(p for p in Path(indir).iterdir()
if p.suffix.lower() in self.SUPPORTED and p.is_file())
if not images:
self.mb.showinfo("Keine Bilder", "Keine unterstuetzten Bilder im Eingabe-Ordner.")
self._btn.configure(state="normal")
return
self._log(f"{len(images)} Bild(er) gefunden ...\n")
self._pb["maximum"] = len(images)
self._pb["value"] = 0
ok = 0; failed = 0
failed_dir = Path(outdir) / "failed"
for i, imgp in enumerate(images, 1):
outp = Path(outdir) / imgp.name
self._log(f"[{i}/{len(images)}] {imgp.name}")
self._sv.set(f"Verarbeite {imgp.name} ({i}/{len(images)}) ...")
swapped = False
try:
swapped = self.swapper.swap_image(src_face, imgp, outp)
except Exception as e:
self._log(f" FEHLER: {e}")
if swapped:
ok += 1
self._log(" OK gespeichert")
else:
failed_dir.mkdir(parents=True, exist_ok=True)
shutil.copy2(imgp, failed_dir / imgp.name)
failed += 1
self._log(f" WARNUNG -> failed/{imgp.name} (kein Gesicht erkannt)")
self._pb["value"] = i
summary = f"{ok} erfolgreich"
if failed: summary += f" | {failed} fehlgeschlagen -> Ordner: failed/"
self._sv.set(summary)
self._log(f"\nFertig: {ok}/{len(images)} Bilder bearbeitet.")
if failed: self._log(f"WARNUNG: {failed} Bild(er) ohne Gesicht -> {failed_dir}")
self.mb.showinfo("Fertig",
f"{ok} von {len(images)} erfolgreich.\n"
+ (f"{failed} ohne Gesicht -> Ordner 'failed'\n" if failed else "")
+ f"\nAusgabe:\n{outdir}")
self._btn.configure(state="normal")
def _run_video(self):
import cv2
self._update_quality()
src = self._vars["source"].get().strip()
video_indir = self._vars["video_input_dir"].get().strip()
video_outdir = self._vars["video_output_dir"].get().strip()
def err(t, m):
self.mb.showerror(t, m)
self._btn.configure(state="normal")
if not all([src, video_indir, video_outdir]):
return err("Fehler", "Bitte Quellbild, Video-Eingabe-Ordner und Video-Ausgabe-Ordner angeben.")
if not Path(src).is_file(): return err("Fehler", f"Quellbild nicht gefunden:\n{src}")
if not Path(video_indir).is_dir(): return err("Fehler", f"Video-Eingabe-Ordner existiert nicht:\n{video_indir}")
Path(video_outdir).mkdir(parents=True, exist_ok=True)
videos = sorted(p for p in Path(video_indir).iterdir()
if p.is_file() and p.suffix.lower() in self.VIDEO_SUPPORTED)
if not videos:
self.mb.showinfo("Keine Videos", "Keine unterstuetzten Videos im Eingabe-Ordner.")
self._btn.configure(state="normal")
return
try:
self.swapper.init_models()
except Exception as e:
return err("Modellfehler", str(e))
src_img = _cv2_imread_unicode(src)
if src_img is None: return err("Fehler", "Quellbild konnte nicht geladen werden.")
src_face = self.swapper.get_first_face(src_img)
if src_face is None: return err("Fehler", "Kein Gesicht im Quellbild gefunden!")
self._log(f"OK Quellgesicht erkannt: {Path(src).name}")
self._log(f"{len(videos)} Video(s) im Eingabe-Ordner gefunden.")
total_frames = 0
for vp in videos:
cap, cap_tmp_copy = _open_videocapture_unicode(vp, log_fn=self._log)
if cap.isOpened():
total_frames += max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 1)
else:
total_frames += 1
cap.release()
_cleanup_temp_file(cap_tmp_copy)
self._pb["maximum"] = max(total_frames, 1)
self._pb["value"] = 0
done_total = 0; ok = 0; failed = 0; no_face = 0
failed_dir = Path(video_outdir) / "failed"
for i, vp in enumerate(videos, 1):
outp = Path(video_outdir) / f"{vp.stem}_faceswap.mp4"
self._log(f"[{i}/{len(videos)}] {vp.name}")
self._sv.set(f"Verarbeite Video {i}/{len(videos)}: {vp.name}")
offset = done_total
max_total = int(float(self._pb["maximum"]))
def on_progress(done, total, name=vp.name, base=offset):
combined = base + done
self._pb["value"] = min(combined, max_total)
self._sv.set(f"{name}: Frame {done}/{max(total, 1)} | Gesamt {combined}/{max_total}")
self._root_real.update_idletasks()
try:
stats = self.swapper.swap_video(src_face, vp, outp, progress_cb=on_progress)
done_total += int(stats.get("frames_processed", 0))
swapped_frames = int(stats.get("frames_swapped", 0))
swapped_faces = int(stats.get("faces_swapped", 0))
ok += 1
if swapped_frames == 0: no_face += 1
self._log(f" OK gespeichert: {outp.name} (Frames mit Face: {swapped_frames}, getauschte Gesichter: {swapped_faces})")
except Exception as e:
failed += 1
failed_dir.mkdir(parents=True, exist_ok=True)
try: shutil.copy2(vp, failed_dir / vp.name)
except Exception: pass
self._log(f" FEHLER: {e}")
self._log(f" WARNUNG: Original kopiert nach failed/{vp.name}")
summary = f"{ok} Video(s) verarbeitet"
if no_face: summary += f" | {no_face} ohne erkannten Face-Frame"
if failed: summary += f" | {failed} fehlgeschlagen"
self._sv.set(summary)
self._pb["value"] = self._pb["maximum"]
self._log(f"\nVideo-Batch fertig: {ok}/{len(videos)} verarbeitet.")
if no_face: self._log(f"WARNUNG: {no_face} Video(s) hatten keinen erkannten Face-Frame.")
if failed: self._log(f"FEHLER: {failed} Video(s) fehlgeschlagen -> {failed_dir}")
self.mb.showinfo("Fertig",
f"{ok} von {len(videos)} Video(s) verarbeitet.\n"
+ (f"{no_face} ohne erkannten Face-Frame.\n" if no_face else "")
+ (f"{failed} fehlgeschlagen -> Ordner 'failed'\n" if failed else "")
+ f"\nAusgabe:\n{video_outdir}")
self._btn.configure(state="normal")
def _run_voice(self):
    """Run one voice-cloning job using the values currently entered in the GUI.

    Reads the mode, reference voice, output file, language, text and source
    audio from the widgets. Mode ``"audio"`` converts the source audio via
    ``self.voice.clone_from_audio``; every other mode value is treated as
    text mode and synthesizes the entered text via
    ``self.voice.clone_from_text`` (language falls back to ``"de"``).

    All inputs are validated *before* the progress bar and status line are
    switched to their "running" state, so a validation error never leaves
    the UI looking like a job is in progress. Failures during cloning are
    logged, shown in an error dialog, and reset progress/status. The start
    button is re-enabled on every exit path.
    """
    mode = self._var_voice_mode.get().strip().lower()
    ref = self._vars["voice_ref"].get().strip()
    out_file = self._vars["voice_output"].get().strip()
    lang = self._vars["voice_language"].get().strip().lower() or "de"
    text = self._voice_text.get("1.0", "end").strip()
    source_audio = self._vars["voice_source_audio"].get().strip()
    audio_mode = mode == "audio"

    def err(t, m):
        # Show the problem and give the start button back to the user.
        self.mb.showerror(t, m)
        self._btn.configure(state="normal")

    # Validate everything up front; nothing below has touched the UI yet.
    if not ref:
        return err("Fehler", "Bitte eine Referenz-Stimme waehlen.")
    if not out_file:
        return err("Fehler", "Bitte eine Ausgabe-Audio-Datei waehlen.")
    if audio_mode and not source_audio:
        return err("Fehler", "Bitte Eingabe-Audio waehlen (Audio-Modus).")
    if not audio_mode and not text:
        return err("Fehler", "Bitte Text eingeben (Text-Modus).")

    self._pb["maximum"] = 100
    self._pb["value"] = 5
    self._sv.set("Starte Voice-Cloning ...")
    self._log("VOICE: Starte Verarbeitung ...")
    try:
        if audio_mode:
            self._sv.set("VOICE: Lade Model und konvertiere Audio ...")
            self._pb["value"] = 35
            result = self.voice.clone_from_audio(ref, source_audio, out_file)
            done_status = "OK Voice-Cloning abgeschlossen (Audio-Modus)"
            done_log = f"VOICE: Fertig (Audio-Modus) -> {result}"
        else:
            self._sv.set("VOICE: Lade XTTS und generiere Sprache ...")
            self._pb["value"] = 35
            result = self.voice.clone_from_text(ref, text, lang, out_file)
            done_status = "OK Voice-Cloning abgeschlossen (Text-Modus)"
            done_log = f"VOICE: Fertig (Text-Modus, Sprache={lang}) -> {result}"
        self._pb["value"] = 100
        self._sv.set(done_status)
        self._log(done_log)
        self.mb.showinfo("Fertig", f"Voice-Cloning fertig.\n\nAusgabe:\n{result}")
    except Exception as e:
        # Deliberately broad: this is the GUI boundary, so any failure is
        # reported to the user instead of propagating. Reset the progress
        # display so it does not keep showing a half-finished job.
        self._pb["value"] = 0
        self._sv.set("Voice-Cloning fehlgeschlagen.")
        self._log(f"VOICE: Fehler: {e}")
        self.mb.showerror("Voice-Cloning Fehler", str(e))
    finally:
        self._btn.configure(state="normal")
def main():
    """Script entry point.

    Checks the interpreter version first (``_require_python_version`` raises
    ``RuntimeError`` on a mismatch), shows the setup window when the setup
    marker file does not exist yet, and finally constructs ``MainApp``.
    """
    _require_python_version()
    setup_pending = not SETUP_FLAG.exists()
    if setup_pending:
        _show_setup_window()
        # Presumably so that packages installed by the setup step become
        # importable in this already-running process - confirm.
        importlib.invalidate_caches()
    MainApp()
# Start the application only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()