2834 lines
118 KiB
Python
2834 lines
118 KiB
Python
#!/usr/bin/env python3
|
|
"""FaceSwap Batch Tool v5 - Mit Face Enhancer & Farbanpassung. Fuehre setup.bat aus bevor du dieses Skript startest."""
|
|
|
|
import os, sys, subprocess, importlib, platform, urllib.request, shutil, zipfile, json, sysconfig
|
|
from pathlib import Path
|
|
|
|
# --- Interpreter / platform -------------------------------------------------
# Exact (major, minor) version the tool insists on at start-up.
REQUIRED_PYTHON = (3, 12)
IS_WINDOWS = platform.system() == "Windows"

# --- Filesystem layout: everything lives next to this script ----------------
SCRIPT_DIR = Path(__file__).parent.resolve()
MODELS_DIR = SCRIPT_DIR / "models"
SETUP_FLAG = SCRIPT_DIR / ".setup_done"
CONFIG_FILE = SCRIPT_DIR / "config.json"

# --- Process-wide mutable state ----------------------------------------------
# Set to True the first time the CUDA DLL preload is attempted.
_CUDA_PRELOAD_TRIED = False
# Cached ONNX Runtime provider list; None until the first detection ran.
_PROVIDERS_CACHE = None
# Handles returned by os.add_dll_directory(); kept referenced here.
_DLL_DIR_HANDLES = []
|
|
|
|
|
|
def _require_python_version():
    """Refuse to continue unless the interpreter matches ``REQUIRED_PYTHON``.

    When the running (major, minor) version differs, a Tk error dialog with
    the details is attempted (best effort: any failure to show it is ignored)
    and a ``RuntimeError`` carrying the same text is raised.

    Raises:
        RuntimeError: if ``sys.version_info[:2] != REQUIRED_PYTHON``.
    """
    if sys.version_info[:2] != REQUIRED_PYTHON:
        wanted = f"{REQUIRED_PYTHON[0]}.{REQUIRED_PYTHON[1]}"
        running = sys.version.split()[0]
        text = "".join(
            [
                "Falsche Python-Version erkannt.\n\n",
                f"Benoetigt: Python {wanted}\n",
                f"Aktuell: Python {running}\n",
                f"Interpreter: {sys.executable}\n\n",
                f"Bitte so starten:\n py -{wanted} {Path(__file__).name}",
            ]
        )

        def _popup():
            # Imported lazily so a missing tkinter only disables the dialog.
            import tkinter as tk
            from tkinter import messagebox

            hidden_root = tk.Tk()
            hidden_root.withdraw()
            messagebox.showerror("FaceSwap - Python Version", text, parent=hidden_root)
            hidden_root.destroy()

        try:
            _popup()
        except Exception:
            # The dialog is a courtesy; the exception below is what matters.
            pass
        raise RuntimeError(text)
|
|
|
|
|
|
def _load_config():
    """Load the persisted settings from ``CONFIG_FILE``.

    Returns:
        dict: The parsed JSON object. An empty dict is returned when the file
        is missing, unreadable, not valid JSON, or when its top-level value is
        not a JSON object (e.g. ``null`` or a list), so callers can always
        treat the result as a mapping.
    """
    try:
        if CONFIG_FILE.exists():
            loaded = json.loads(CONFIG_FILE.read_text(encoding="utf-8"))
            # A syntactically valid file may still hold a non-object value;
            # returning it would break the "always a dict" fallback contract.
            if isinstance(loaded, dict):
                return loaded
    except (OSError, ValueError, RecursionError):
        # I/O problems, invalid UTF-8 / JSON (both ValueError subclasses) or
        # absurdly nested documents all degrade to "no configuration".
        pass
    return {}
|
|
|
|
def _save_config(data):
    """Write *data* as pretty-printed UTF-8 JSON to ``CONFIG_FILE``.

    Persisting settings is best effort: serialisation and I/O errors are
    swallowed and the function returns ``None`` either way.
    """
    try:
        serialized = json.dumps(data, indent=2, ensure_ascii=False)
        CONFIG_FILE.write_text(serialized, encoding="utf-8")
    except Exception:
        # Deliberately silent - a failed save must not interrupt the caller.
        return
|
|
|
|
|
|
def _run_ok(*cmd):
|
|
try:
|
|
out = subprocess.check_output(list(cmd), stderr=subprocess.DEVNULL, timeout=8).decode()
|
|
return True, out
|
|
except Exception:
|
|
return False, ""
|
|
|
|
def _cuda_available():
    """Probe for an NVIDIA driver by running ``nvidia-smi``.

    Returns:
        tuple[bool, str]: ``(True, version)`` when ``nvidia-smi`` ran
        successfully, where *version* is the value following
        ``"CUDA Version:"`` in its output or ``"unknown"`` if that text is
        absent; ``(False, "")`` otherwise.
    """
    import re

    succeeded, smi_text = _run_ok("nvidia-smi")
    if not succeeded:
        return False, ""
    found = re.search(r"CUDA Version:\s*([\d.]+)", smi_text)
    version = found.group(1) if found else "unknown"
    return True, version
|
|
|
|
|
|
def _ensure_torchvision_functional_tensor_alias(log_fn=None):
|
|
try:
|
|
import torchvision.transforms.functional_tensor # noqa: F401
|
|
return True
|
|
except Exception:
|
|
pass
|
|
try:
|
|
import sys
|
|
import types
|
|
from torchvision.transforms import functional as tvf
|
|
if not hasattr(tvf, "rgb_to_grayscale"):
|
|
return False
|
|
alias_mod = types.ModuleType("torchvision.transforms.functional_tensor")
|
|
alias_mod.rgb_to_grayscale = tvf.rgb_to_grayscale
|
|
sys.modules["torchvision.transforms.functional_tensor"] = alias_mod
|
|
if log_fn:
|
|
log_fn(" Hinweis: torchvision-Kompatibilitaetsmodus fuer GFPGAN aktiv.")
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def _preload_onnxruntime_cuda(log_fn=None):
    """Make the pip-installed NVIDIA runtime DLLs findable (Windows only).

    Runs at most once per process and does nothing on other platforms. It
    collects ``onnxruntime/capi`` plus the ``nvidia/<component>/bin`` folders
    below every known site-packages root, prepends the existing ones to
    ``PATH``, registers them via ``os.add_dll_directory`` when available and
    finally calls ``onnxruntime.preload_dlls(directory="")`` if that function
    exists.

    Args:
        log_fn: Optional callable that receives a success or failure notice.
            Failures are never raised.
    """
    global _CUDA_PRELOAD_TRIED
    if not IS_WINDOWS or _CUDA_PRELOAD_TRIED:
        return
    _CUDA_PRELOAD_TRIED = True

    try:
        import site

        import onnxruntime as ort

        dll_dirs = []
        visited = set()

        def _register(candidate):
            """Record *candidate* once (case-insensitively) if it is a directory."""
            try:
                resolved = str(Path(candidate).resolve())
            except Exception:
                return
            folded = resolved.lower()
            if folded in visited:
                return
            visited.add(folded)
            if Path(resolved).is_dir():
                dll_dirs.append(resolved)

        # DLLs shipped inside the onnxruntime package itself.
        _register(Path(ort.__file__).resolve().parent / "capi")

        # NVIDIA runtime DLLs from every relevant site-packages directory.
        roots = []
        try:
            roots.extend(site.getsitepackages())
        except Exception:
            pass
        try:
            roots.append(site.getusersitepackages())
        except Exception:
            pass
        # Also cover the interpreter-specific site-packages location.
        roots.append(sysconfig.get_paths().get("purelib", ""))

        components = ("cublas", "cuda_runtime", "cuda_nvrtc", "cudnn", "cufft", "nvjitlink")
        for base in [Path(r) for r in roots if r]:
            for component in components:
                _register(base / "nvidia" / component / "bin")

        # Extend the DLL search path for cuDNN sub-libraries loaded later on.
        entries = os.environ.get("PATH", "").split(os.pathsep)
        known = {entry.lower() for entry in entries}
        for folder in dll_dirs:
            folded = folder.lower()
            if folded not in known:
                entries.insert(0, folder)
                known.add(folded)
            try:
                if hasattr(os, "add_dll_directory"):
                    _DLL_DIR_HANDLES.append(os.add_dll_directory(folder))
            except Exception:
                pass
        os.environ["PATH"] = os.pathsep.join(entries)

        preload = getattr(ort, "preload_dlls", None)
        if callable(preload):
            preload(directory="")
        if log_fn:
            log_fn(f" ONNX Runtime CUDA-DLLs vorgeladen ({len(dll_dirs)} DLL-Ordner).")
    except Exception as e:
        if log_fn:
            log_fn(f" Hinweis: CUDA-DLL-Preload fehlgeschlagen ({e}).")
|
|
|
|
def _dl(url, dest, log=print):
|
|
def hook(count, block, total):
|
|
if total > 0 and count % 100 == 0:
|
|
pct = min(100, count * block * 100 // total)
|
|
log(f" ... {pct}% ({count*block/1_048_576:.0f} MB)")
|
|
urllib.request.urlretrieve(url, dest, reporthook=hook)
|
|
|
|
|
|
def _cv2_imread_unicode(path, flags=None):
    """Read an image with OpenCV, retrying through an in-memory decode.

    ``cv2.imread`` is tried first. If it yields ``None`` the file bytes are
    loaded with ``numpy.fromfile`` and decoded with ``cv2.imdecode`` instead.

    Args:
        path: Image location (anything ``str()`` turns into a path).
        flags: OpenCV read flags; ``cv2.IMREAD_COLOR`` when ``None``.

    Returns:
        The decoded image array, or ``None`` if the file is empty, cannot be
        read or cannot be decoded.
    """
    import cv2
    import numpy as np

    mode = cv2.IMREAD_COLOR if flags is None else flags
    location = str(path)

    direct = cv2.imread(location, mode)
    if direct is not None:
        return direct

    try:
        raw = np.fromfile(location, dtype=np.uint8)
        return cv2.imdecode(raw, mode) if raw.size else None
    except Exception:
        return None
|
|
|
|
|
|
def _cv2_imwrite_unicode(path, img, params=None):
    """Write an image with OpenCV, retrying through an in-memory encode.

    ``cv2.imwrite`` is tried first. If it fails or raises, the image is
    encoded with ``cv2.imencode`` (using the path's suffix, ``.png`` when
    there is none) and the buffer is written with ``ndarray.tofile``.

    Args:
        path: Destination path.
        img: Image array to store.
        params: Optional OpenCV encoder parameter list.

    Returns:
        bool: ``True`` when either strategy stored the image.
    """
    import cv2

    target = str(path)
    options = [] if params is None else params

    try:
        written = cv2.imwrite(target, img, options)
    except Exception:
        written = False
    if written:
        return True

    try:
        suffix = Path(target).suffix or ".png"
        encoded_ok, buffer = cv2.imencode(suffix, img, options)
        if encoded_ok:
            buffer.tofile(target)
            return True
        return False
    except Exception:
        return False
|
|
|
|
|
|
def _path_has_non_ascii(path):
|
|
try:
|
|
s = str(path)
|
|
except Exception:
|
|
return False
|
|
return any(ord(ch) > 127 for ch in s)
|
|
|
|
|
|
def _open_videocapture_unicode(source, log_fn=None):
    """Open a ``cv2.VideoCapture``, working around non-ASCII paths on Windows.

    Integer sources are opened directly as device indices. For path sources
    a normal open is tried first; if that fails on Windows and the path holds
    non-ASCII characters, the file is copied to a temporary file inside
    ``SCRIPT_DIR`` and opened from there.

    Args:
        source: Device index (``int``) or a video file path.
        log_fn: Optional callable informed when the temp-copy fallback is used.

    Returns:
        tuple: ``(capture, temp_path)``. ``temp_path`` is the ``Path`` of the
        temporary copy the caller must delete once the capture is released,
        or ``None`` when no copy is in use. The capture may be unopened;
        callers have to check ``isOpened()``.
    """
    import cv2
    import tempfile

    if isinstance(source, int):
        return cv2.VideoCapture(int(source)), None

    p = str(source)
    cap = cv2.VideoCapture(p)
    if cap.isOpened():
        return cap, None

    tmp_copy = None
    try:
        src_path = Path(p)
        if IS_WINDOWS and src_path.is_file() and _path_has_non_ascii(src_path):
            fd, tmp_name = tempfile.mkstemp(
                prefix="faceswap_vid_", suffix=(src_path.suffix or ".mp4"), dir=str(SCRIPT_DIR)
            )
            # Remember the file before anything else can fail, so it is
            # always cleaned up below.
            tmp_copy = Path(tmp_name)
            os.close(fd)
            shutil.copy2(src_path, tmp_copy)
            cap2 = cv2.VideoCapture(str(tmp_copy))
            if cap2.isOpened():
                cap.release()
                if log_fn:
                    log_fn(" Hinweis: Unicode-Video-Fallback aktiv (temp Datei).")
                return cap2, tmp_copy
            cap2.release()
    except Exception:
        # The fallback is best effort; the original capture is returned below.
        pass

    # The fallback did not produce a usable capture: do not leave a (possibly
    # large) orphaned video copy behind, since the caller gets no handle to it.
    _cleanup_temp_file(tmp_copy)
    return cap, None
|
|
|
|
|
|
def _cleanup_temp_file(path):
|
|
if path is None:
|
|
return
|
|
try:
|
|
Path(path).unlink(missing_ok=True)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def run_setup(log=print):
    """Verify the Python environment and download any missing model files.

    Steps, in order: CUDA probe, import check of the required packages,
    optional GFPGAN check, numpy major-version check, download of the
    ``buffalo_l`` pack and ``inswapper_128.onnx``, optional GFPGAN weights,
    and finally writing ``SETUP_FLAG``.

    Args:
        log: Callable receiving human-readable progress lines.

    Raises:
        RuntimeError: if required packages are missing, numpy >= 2 is
            installed, or a mandatory model download fails.
    """
    # `import importlib` alone does not guarantee that the `util` submodule
    # is loaded; import it explicitly before calling find_spec() below.
    import importlib.util

    log("=" * 60)
    log(" FaceSwap Batch Tool - Modell-Download")
    log("=" * 60)

    log("\nPruefe NVIDIA GPU / CUDA ...")
    cuda, cuda_ver = _cuda_available()
    log(f" {'OK CUDA ' + cuda_ver + ' -> GPU-Modus' if cuda else 'Info: Kein CUDA -> CPU-Modus'}")

    log("\nPruefe Installation ...")
    required_modules = [
        ("cv2", "opencv-python"),
        ("numpy", "numpy"),
        ("insightface", "insightface"),
        ("onnx", "onnx"),
        ("onnxruntime", "onnxruntime"),
        ("albumentations", "albumentations"),
    ]
    missing = []
    for mod, name in required_modules:
        try:
            importlib.import_module(mod)
            log(f" OK {name}")
        except Exception:
            log(f" FEHLT: {name}")
            missing.append(name)

    if missing:
        raise RuntimeError(
            f"Fehlende Pakete: {', '.join(missing)}\n\n"
            "Bitte fuehre zuerst setup.bat aus!"
        )

    log("\nPruefe GFPGAN ...")
    has_gfpgan = False
    try:
        if importlib.util.find_spec("gfpgan") is None:
            raise ModuleNotFoundError("gfpgan")
        _ensure_torchvision_functional_tensor_alias(log)
        importlib.import_module("gfpgan")
        has_gfpgan = True
        log(" OK gfpgan")
    except ModuleNotFoundError:
        log(" FEHLT: gfpgan - Funktion deaktiviert (optional)")
    except Exception as e:
        log(f" FEHLER: gfpgan installiert, aber nicht importierbar ({e})")
        log(" Funktion deaktiviert (optional)")

    import numpy as np

    major = int(np.__version__.split(".")[0])
    if major >= 2:
        raise RuntimeError(
            f"numpy {np.__version__} ist installiert, aber numpy<2.0 wird benoetigt.\n\n"
            "Bitte fuehre setup.bat aus um alle Pakete neu zu installieren."
        )
    log(f" OK numpy {np.__version__} (kompatibel)")

    log("\nPruefe KI-Modelle ...")
    MODELS_DIR.mkdir(exist_ok=True)

    buffalo_dir = MODELS_DIR / "buffalo_l"
    if buffalo_dir.is_dir() and any(buffalo_dir.iterdir()):
        log(" OK buffalo_l")
    else:
        log(" Lade buffalo_l (~200 MB) ...")
        zp = MODELS_DIR / "buffalo_l.zip"
        try:
            _dl("https://github.com/deepinsight/insightface/releases/download/v0.7/buffalo_l.zip", zp, log)
            with zipfile.ZipFile(zp) as z:
                # The archive layout is not visible from here: it may hold the
                # model files at its root or inside a "buffalo_l/" folder.
                # Either way they must end up in MODELS_DIR/buffalo_l, which
                # is the directory the existence check above looks at.
                members = [n for n in z.namelist() if not n.endswith("/")]
                nested = bool(members) and all(
                    n.replace("\\", "/").startswith("buffalo_l/") for n in members
                )
                z.extractall(MODELS_DIR if nested else buffalo_dir)
            log(" OK buffalo_l")
        except Exception as e:
            raise RuntimeError(f"buffalo_l Download fehlgeschlagen: {e}") from e
        finally:
            # The archive is only a transport container; drop it in any case.
            zp.unlink(missing_ok=True)

    sm = MODELS_DIR / "inswapper_128.onnx"
    if sm.exists() and sm.stat().st_size > 100_000:
        log(" OK inswapper_128.onnx")
    else:
        log(" Lade inswapper_128.onnx (~500 MB) ...")
        urls = [
            "https://github.com/facefusion/facefusion-assets/releases/download/models/inswapper_128.onnx",
            "https://github.com/deepinsight/insightface/releases/download/v0.7/inswapper_128.onnx",
        ]
        ok = False
        for url in urls:
            try:
                log(f" Versuche {url.split('/')[2]} ...")
                _dl(url, sm, log)
                if sm.exists() and sm.stat().st_size > 100_000:
                    ok = True
                    break
                # Too small to be the model (e.g. an error page): discard it.
                sm.unlink(missing_ok=True)
            except Exception as e:
                log(f" Fehler: {e}")
                sm.unlink(missing_ok=True)
        if not ok:
            raise RuntimeError(
                "inswapper_128.onnx konnte nicht heruntergeladen werden.\n\n"
                "Manuell herunterladen von:\n"
                " https://huggingface.co/deepinsight/inswapper\n"
                f"Datei ablegen in: {MODELS_DIR}"
            )
        log(" OK inswapper_128.onnx")

    if has_gfpgan:
        # GFPGAN is optional, so a failed weight download is only a warning.
        try:
            FaceRestorer(log_fn=log).ensure_model()
            log(" OK GFPGANv1.4.pth")
        except Exception as e:
            log(f" WARNUNG: GFPGAN-Modell konnte nicht vorbereitet werden ({e})")

    SETUP_FLAG.write_text(f"cuda={cuda}\n", encoding="utf-8")
    log("\nEinrichtung abgeschlossen!")
|
|
|
|
|
|
def _show_setup_window():
    """Show the setup dialog and run ``run_setup`` while it is open.

    ``run_setup`` executes in a daemon thread. Tk widgets must only be
    touched from the thread that runs the main loop, so the worker never
    calls into Tk: it posts ``(kind, payload)`` events into a queue which the
    main thread drains periodically via ``root.after``. The function returns
    once the window has been closed.
    """
    import queue
    import threading
    import tkinter as tk
    from tkinter import messagebox, ttk

    root = tk.Tk()
    root.title("FaceSwap - Einrichtung")
    root.geometry("700x520")
    root.configure(bg="#090912")
    root.resizable(False, False)

    tk.Label(root, text="Einrichtung", font=("Courier New", 17, "bold"),
             bg="#090912", fg="#e8d5b7").pack(pady=(18, 4))
    tk.Label(root, text="Pruefe Pakete & lade KI-Modelle ...",
             font=("Courier New", 9), bg="#090912", fg="#7a7a9a").pack()

    lf = tk.Frame(root, bg="#090912")
    lf.pack(fill="both", expand=True, padx=18, pady=8)
    lb = tk.Text(lf, bg="#04040a", fg="#8aff8a", font=("Courier New", 8), relief="flat", state="disabled")
    sb = tk.Scrollbar(lf, command=lb.yview)
    lb.configure(yscrollcommand=sb.set)
    lb.pack(side="left", fill="both", expand=True)
    sb.pack(side="right", fill="y")

    style = ttk.Style(root)
    style.theme_use("default")
    style.configure("S.Horizontal.TProgressbar", troughcolor="#111120", background="#3adf6a", thickness=10)
    pb = ttk.Progressbar(root, mode="indeterminate", length=660, style="S.Horizontal.TProgressbar")
    pb.pack(padx=18, pady=4)

    sv = tk.StringVar(value="Starte ...")
    tk.Label(root, textvariable=sv, font=("Courier New", 9), bg="#090912", fg="#c8a96a").pack()

    btn = tk.Button(root, text="Schliessen & Starten", font=("Courier New", 11, "bold"),
                    bg="#1a3a2a", fg="#8aff8a", relief="flat", state="disabled",
                    cursor="hand2", command=root.destroy)
    btn.pack(pady=10)

    events = queue.Queue()
    poll_ms = 50

    def append(msg):
        """Append one line to the log widget (main thread only)."""
        lb.configure(state="normal")
        lb.insert("end", msg + "\n")
        lb.see("end")
        lb.configure(state="disabled")

    def worker():
        """Run the setup in the background, reporting through the queue."""
        try:
            run_setup(log=lambda msg: events.put(("log", msg)))
        except Exception as e:
            events.put(("error", e))
        else:
            events.put(("done", None))

    def pump():
        """Apply queued worker events to the widgets; re-arms itself until done."""
        finished = False
        try:
            while True:
                kind, payload = events.get_nowait()
                if kind == "log":
                    append(payload)
                elif kind == "done":
                    finished = True
                    pb.stop()
                    sv.set("Fertig!")
                    btn.configure(state="normal")
                else:
                    finished = True
                    pb.stop()
                    append(f"\nFehler: {payload}")
                    sv.set("Fehler - Details im Log")
                    btn.configure(state="normal")
                    messagebox.showerror("Fehler", str(payload), parent=root)
        except queue.Empty:
            pass
        if not finished:
            root.after(poll_ms, pump)

    pb.start(10)
    threading.Thread(target=worker, daemon=True).start()
    root.after(poll_ms, pump)
    root.mainloop()
|
|
|
|
|
|
def _get_providers(log_fn=None):
    """Return the ONNX Runtime execution providers to use (cached).

    The first call decides once per process: CUDA is selected when ONNX
    Runtime lists ``CUDAExecutionProvider`` and - provided the probe model
    ``buffalo_l/1k3d68.onnx`` exists - a test session really initialises with
    CUDA. Any exception during detection results in CPU only.

    Args:
        log_fn: Optional callable for diagnostic messages.

    Returns:
        list[str]: A fresh copy of the cached provider list.
    """
    global _PROVIDERS_CACHE

    def _detect():
        import onnxruntime as ort

        _preload_onnxruntime_cuda(log_fn)
        if "CUDAExecutionProvider" not in ort.get_available_providers():
            return ["CPUExecutionProvider"]
        probe_model = MODELS_DIR / "buffalo_l" / "1k3d68.onnx"
        if probe_model.exists():
            session = ort.InferenceSession(
                str(probe_model),
                providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
            )
            if "CUDAExecutionProvider" not in session.get_providers():
                if log_fn:
                    log_fn(" CUDA erkannt, aber ONNX Runtime initialisiert nur CPU. CPU-Fallback aktiv.")
                return ["CPUExecutionProvider"]
        return ["CUDAExecutionProvider", "CPUExecutionProvider"]

    if _PROVIDERS_CACHE is None:
        try:
            detected = _detect()
        except Exception:
            detected = ["CPUExecutionProvider"]
        _PROVIDERS_CACHE = detected
    # Hand out a copy so callers cannot mutate the cache.
    return list(_PROVIDERS_CACHE)
|
|
|
|
|
|
def _enhance_face_region(img, bbox, sharpen=True, color_correct=True):
    """Sharpen the face area of *img* in place with a feathered blend.

    The bbox is grown by 10 px and clipped to the image. With *sharpen* set,
    an unsharp mask (Gaussian sigma 2, weights 1.4 / -0.4) is applied to that
    region and blended back through a blurred rectangular mask so the edit
    fades out towards the region border.

    Args:
        img: Image array; modified in place.
        bbox: ``(x1, y1, x2, y2)`` in pixels.
        sharpen: Apply the unsharp mask when true.
        color_correct: Accepted but not used by this function.

    Returns:
        The same *img* object.
    """
    import cv2, numpy as np

    left, top, right, bottom = (int(v) for v in bbox)
    margin = 10
    x_lo, y_lo = max(0, left - margin), max(0, top - margin)
    x_hi, y_hi = min(img.shape[1], right + margin), min(img.shape[0], bottom + margin)
    roi_h, roi_w = y_hi - y_lo, x_hi - x_lo
    if roi_h <= 0 or roi_w <= 0:
        return img

    before = img[y_lo:y_hi, x_lo:x_hi].copy()
    after = before.copy()
    if sharpen:
        soft = cv2.GaussianBlur(after, (0, 0), 2)
        after = cv2.addWeighted(after, 1.4, soft, -0.4, 0)

    # Feathered weight map: 1 inside, fading to 0 towards the region border.
    feather = max(8, min(roi_h, roi_w) // 6)
    weight = np.zeros((roi_h, roi_w), dtype=np.float32)
    weight[feather:-feather, feather:-feather] = 1.0
    weight = cv2.GaussianBlur(weight, (0, 0), feather * 0.8)
    weight = np.clip(weight, 0, 1)[:, :, np.newaxis]

    mixed = after.astype(np.float32) * weight + before.astype(np.float32) * (1 - weight)
    img[y_lo:y_hi, x_lo:x_hi] = np.clip(mixed, 0, 255).astype(np.uint8)
    return img
|
|
|
|
|
|
def _match_face_color(swapped_img, original_img, bbox):
|
|
import cv2, numpy as np
|
|
x1, y1, x2, y2 = [int(v) for v in bbox]
|
|
pad = 5
|
|
x1c = max(0, x1 - pad); y1c = max(0, y1 - pad)
|
|
x2c = min(swapped_img.shape[1], x2 + pad); y2c = min(swapped_img.shape[0], y2 + pad)
|
|
src_region = original_img[y1c:y2c, x1c:x2c].astype(np.float32)
|
|
dst_region = swapped_img[y1c:y2c, x1c:x2c].astype(np.float32)
|
|
if src_region.size == 0 or dst_region.size == 0:
|
|
return swapped_img
|
|
for c in range(3):
|
|
src_mean, src_std = src_region[:,:,c].mean(), src_region[:,:,c].std() + 1e-6
|
|
dst_mean, dst_std = dst_region[:,:,c].mean(), dst_region[:,:,c].std() + 1e-6
|
|
factor = 0.5
|
|
dst_region[:,:,c] = (dst_region[:,:,c] - dst_mean) * (src_std / dst_std) * factor \
|
|
+ dst_mean * factor + dst_region[:,:,c] * (1 - factor)
|
|
dst_region = np.clip(dst_region, 0, 255).astype(np.uint8)
|
|
result = swapped_img.copy()
|
|
result[y1c:y2c, x1c:x2c] = dst_region
|
|
return result
|
|
|
|
|
|
class FaceRestorer:
    """Optional GFPGAN-based face restoration.

    The heavy model is created lazily by :meth:`load`. All restore methods
    are best effort and fall back to returning the input image.
    """

    MODEL_URL = "https://github.com/TencentARC/GFPGAN/releases/download/v1.3.4/GFPGANv1.4.pth"
    MODEL_PATH = MODELS_DIR / "GFPGANv1.4.pth"
    # Auxiliary weights that are pre-fetched into facexlib's "weights" folder.
    _FACEXLIB_FILES = {
        "detection_Resnet50_Final.pth": "https://github.com/xinntao/facexlib/releases/download/v0.1.0/detection_Resnet50_Final.pth",
        "parsing_parsenet.pth": "https://github.com/xinntao/facexlib/releases/download/v0.2.2/parsing_parsenet.pth",
    }

    def __init__(self, log_fn=print):
        """Store the logger and the default restoration settings.

        Args:
            log_fn: Callable receiving status messages.
        """
        self.log = log_fn
        self.restorer = None  # GFPGANer instance, created by load()
        self.upscale = 1
        self.only_center_face = False
        self.min_face_px = 72  # padded regions smaller than this are skipped
        self.blend_alpha = 0.72  # weight of the restored pixels in the mix
        self.pad_ratio = 0.22  # bbox padding relative to the bbox size

    @staticmethod
    def is_available(log_fn=None):
        """Return ``True`` if the ``gfpgan`` package can actually be imported."""
        import importlib
        import importlib.util

        if importlib.util.find_spec("gfpgan") is None:
            return False
        _ensure_torchvision_functional_tensor_alias(log_fn)
        try:
            importlib.import_module("gfpgan")
            return True
        except Exception:
            return False

    @staticmethod
    def _torch_cuda_available():
        """Return ``True`` only if torch imports and reports a usable CUDA device."""
        try:
            import torch

            return bool(torch.cuda.is_available())
        except Exception:
            return False

    def ensure_model(self):
        """Download the GFPGAN weights (and facexlib helpers) when missing.

        Raises:
            Exception: if downloading ``GFPGANv1.4.pth`` fails. Whatever was
                written to ``MODEL_PATH`` is removed first, so a truncated
                file cannot pass the size check on the next run.
        """
        MODELS_DIR.mkdir(exist_ok=True)
        if not self.MODEL_PATH.exists() or self.MODEL_PATH.stat().st_size < 100_000:
            self.log(" Lade GFPGANv1.4.pth (~330 MB) ...")
            try:
                _dl(self.MODEL_URL, self.MODEL_PATH, self.log)
            except Exception:
                self.MODEL_PATH.unlink(missing_ok=True)
                raise
        self._ensure_facexlib_weights()

    def _ensure_facexlib_weights(self):
        """Pre-fetch the facexlib weights; failures are only logged."""
        if not self.is_available():
            return
        try:
            import facexlib

            weights_dir = Path(facexlib.__file__).resolve().parent / "weights"
            weights_dir.mkdir(parents=True, exist_ok=True)
            for filename, url in self._FACEXLIB_FILES.items():
                dst = weights_dir / filename
                if dst.exists() and dst.stat().st_size > 100_000:
                    continue
                self.log(f" Lade {filename} ...")
                try:
                    _dl(url, dst, self.log)
                except Exception:
                    # Do not leave a partial file that later looks complete.
                    dst.unlink(missing_ok=True)
                    raise
        except Exception as e:
            self.log(f" Hinweis: facexlib-Modelle konnten nicht vorab geladen werden ({e}).")

    def load(self):
        """Create the ``GFPGANer`` instance on first use.

        Raises:
            RuntimeError: if the ``gfpgan`` package is not importable.
        """
        if self.restorer is not None:
            return
        if not self.is_available(log_fn=self.log):
            raise RuntimeError("GFPGAN ist nicht installiert.")
        self.ensure_model()
        _ensure_torchvision_functional_tensor_alias(self.log)
        from gfpgan import GFPGANer

        providers = _get_providers(self.log)
        # GFPGAN runs on torch, not on ONNX Runtime: an available ONNX CUDA
        # provider says nothing about torch's CUDA support (e.g. a CPU-only
        # torch wheel). Require both before selecting the GPU.
        use_cuda = "CUDAExecutionProvider" in providers and self._torch_cuda_available()
        device = "cuda" if use_cuda else "cpu"
        self.restorer = GFPGANer(
            model_path=str(self.MODEL_PATH),
            upscale=self.upscale,
            arch="clean",
            channel_multiplier=2,
            bg_upsampler=None,
            device=device,
        )

    def restore(self, bgr_img):
        """Run GFPGAN over the whole image.

        Args:
            bgr_img: Input image array or ``None``.

        Returns:
            The restored image resized to the input size, or *bgr_img*
            unchanged when it is ``None`` or restoration fails.
        """
        import cv2
        import numpy as np

        if bgr_img is None:
            return bgr_img
        try:
            self.load()
            _, _, restored = self.restorer.enhance(
                bgr_img,
                has_aligned=False,
                only_center_face=self.only_center_face,
                paste_back=True,
            )
            if restored is None:
                return bgr_img
            if restored.shape[:2] != bgr_img.shape[:2]:
                restored = cv2.resize(restored, (bgr_img.shape[1], bgr_img.shape[0]), interpolation=cv2.INTER_AREA)
            return np.clip(restored, 0, 255).astype(np.uint8)
        except Exception as e:
            self.log(f" Hinweis: GFPGAN-Restore fehlgeschlagen ({e}).")
            return bgr_img

    @staticmethod
    def _bbox_to_rect(bbox, img_w, img_h, pad_ratio=0.22):
        """Pad a pixel bbox and clip it to the image.

        Padding is ``pad_ratio`` of the bbox size per side, at least 8 px.

        Returns:
            ``(x1, y1, x2, y2)`` as ints, or ``None`` when *bbox* cannot be
            read or the resulting rectangle is not more than 2 px wide/high.
        """
        try:
            x1, y1, x2, y2 = [float(v) for v in bbox]
        except Exception:
            return None
        bw = max(1.0, x2 - x1)
        bh = max(1.0, y2 - y1)
        pad_x = max(8, int(round(bw * pad_ratio)))
        pad_y = max(8, int(round(bh * pad_ratio)))
        rx1 = max(0, int(round(x1)) - pad_x)
        ry1 = max(0, int(round(y1)) - pad_y)
        rx2 = min(int(img_w), int(round(x2)) + pad_x)
        ry2 = min(int(img_h), int(round(y2)) + pad_y)
        if rx2 <= rx1 + 2 or ry2 <= ry1 + 2:
            return None
        return rx1, ry1, rx2, ry2

    def restore_faces(self, bgr_img, bboxes):
        """Restore only the given face regions and blend them back.

        Args:
            bgr_img: Input image array or ``None``; never modified.
            bboxes: Iterable of ``(x1, y1, x2, y2)`` pixel boxes.

        Returns:
            A new image with the restored regions mixed in using
            ``blend_alpha``, or *bgr_img* itself when nothing was restored.
        """
        import cv2
        import numpy as np

        if bgr_img is None:
            return bgr_img
        if not bboxes:
            return bgr_img
        try:
            self.load()
        except Exception as e:
            self.log(f" Hinweis: GFPGAN nicht bereit ({e}).")
            return bgr_img

        out = bgr_img.copy()
        h, w = out.shape[:2]
        alpha = max(0.0, min(1.0, float(self.blend_alpha)))
        restored_any = False
        for bbox in bboxes:
            rect = self._bbox_to_rect(bbox, w, h, pad_ratio=self.pad_ratio)
            if rect is None:
                continue
            x1, y1, x2, y2 = rect
            if (x2 - x1) < self.min_face_px or (y2 - y1) < self.min_face_px:
                continue
            crop = out[y1:y2, x1:x2]
            if crop.size == 0:
                continue
            try:
                _, _, restored = self.restorer.enhance(
                    crop,
                    has_aligned=False,
                    only_center_face=True,
                    paste_back=True,
                )
            except Exception:
                # One failing face must not abort the remaining ones.
                continue
            if restored is None:
                continue
            if restored.shape[:2] != crop.shape[:2]:
                restored = cv2.resize(restored, (crop.shape[1], crop.shape[0]), interpolation=cv2.INTER_AREA)
            mixed = cv2.addWeighted(restored.astype(np.float32), alpha, crop.astype(np.float32), 1.0 - alpha, 0.0)
            out[y1:y2, x1:x2] = np.clip(mixed, 0, 255).astype(np.uint8)
            restored_any = True
        if restored_any:
            return out
        return bgr_img
|
|
|
|
|
|
class FaceLibrary:
    """On-disk collection of source faces below ``face_library/``.

    Every entry is a directory named after its slug, holding ``source.jpg``
    (the full image), ``thumb.png`` (96x96 face crop) and ``meta.json``.
    """

    ROOT = SCRIPT_DIR / "face_library"

    def __init__(self, log_fn=print):
        """Create the library folder if needed.

        Args:
            log_fn: Callable receiving warnings.
        """
        self.log = log_fn
        self.ROOT.mkdir(exist_ok=True)
        self._detector = None  # Haar cascade, created on first use

    @staticmethod
    def _slugify(name):
        """Turn a display name into a lowercase ``[a-z0-9_]`` directory name.

        Empty input, or input with no usable characters, yields ``"gesicht"``.
        """
        import re
        import unicodedata

        raw = (name or "").strip().lower()
        if not raw:
            raw = "gesicht"
        # Decompose accented characters, then drop everything non-ASCII.
        raw = unicodedata.normalize("NFKD", raw)
        raw = raw.encode("ascii", "ignore").decode("ascii")
        raw = raw.replace(" ", "_")
        raw = re.sub(r"[^a-z0-9_]+", "", raw)
        raw = re.sub(r"_+", "_", raw).strip("_")
        return raw or "gesicht"

    def _ensure_detector(self):
        """Return the cached OpenCV frontal-face Haar cascade.

        Raises:
            RuntimeError: if the cascade file cannot be loaded.
        """
        if self._detector is not None:
            return self._detector
        import cv2

        cascade_path = Path(cv2.data.haarcascades) / "haarcascade_frontalface_default.xml"
        detector = cv2.CascadeClassifier(str(cascade_path))
        if detector.empty():
            raise RuntimeError("Gesichtsdetektor konnte nicht geladen werden.")
        self._detector = detector
        return detector

    def _detect_largest_face(self, bgr_img):
        """Return ``(x, y, w, h)`` of the largest detected face, or ``None``."""
        import cv2

        det = self._ensure_detector()
        gray = cv2.cvtColor(bgr_img, cv2.COLOR_BGR2GRAY)
        faces = det.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=4, minSize=(30, 30))
        if len(faces) == 0:
            return None
        x, y, w, h = max(faces, key=lambda f: int(f[2]) * int(f[3]))
        return int(x), int(y), int(w), int(h)

    def add(self, name, image_path):
        """Add the image at *image_path* to the library under *name*.

        The face is detected on a copy scaled down to at most 1920 px on the
        long side; the box is mapped back and padded by 20 % for the
        thumbnail. If a slug is taken, ``_2``, ``_3``, ... is appended.

        Args:
            name: Display name of the new entry.
            image_path: Path of the source image.

        Returns:
            dict: ``slug``, ``name``, ``source_path``, ``thumb_path``, ``added``.

        Raises:
            RuntimeError: if the file is missing or unreadable, contains no
                detectable face, or the entry files cannot be written.
        """
        import cv2
        import datetime

        image_path = Path(image_path)
        if not image_path.is_file():
            raise RuntimeError(f"Datei nicht gefunden: {image_path}")

        src_img = _cv2_imread_unicode(image_path)
        if src_img is None:
            raise RuntimeError("Bild konnte nicht geladen werden.")

        h0, w0 = src_img.shape[:2]
        max_side = max(h0, w0)
        scale = min(1.0, 1920.0 / max(1.0, float(max_side)))
        if scale < 1.0:
            work = cv2.resize(src_img, (int(round(w0 * scale)), int(round(h0 * scale))), interpolation=cv2.INTER_AREA)
        else:
            work = src_img

        face = self._detect_largest_face(work)
        if face is None:
            raise RuntimeError("Kein Gesicht im Bild erkannt.")
        # Map the detection from the scaled copy back to source coordinates.
        inv = 1.0 / max(scale, 1e-9)
        fx, fy, fw, fh = (int(round(v * inv)) for v in face)

        pad_x = int(round(fw * 0.20))
        pad_y = int(round(fh * 0.20))
        x1 = max(0, fx - pad_x)
        y1 = max(0, fy - pad_y)
        x2 = min(w0, fx + fw + pad_x)
        y2 = min(h0, fy + fh + pad_y)
        crop = src_img[y1:y2, x1:x2]
        if crop.size == 0:
            raise RuntimeError("Kein gueltiger Gesichtsausschnitt gefunden.")

        base_slug = self._slugify(name)
        slug = base_slug
        idx = 2
        while (self.ROOT / slug).exists():
            slug = f"{base_slug}_{idx}"
            idx += 1

        face_dir = self.ROOT / slug
        face_dir.mkdir(parents=True, exist_ok=True)
        source_path = face_dir / "source.jpg"
        thumb_path = face_dir / "thumb.png"
        meta_path = face_dir / "meta.json"

        meta = {
            "slug": slug,
            "name": (name or "").strip() or slug,
            "added": datetime.date.today().isoformat(),
        }
        try:
            if not _cv2_imwrite_unicode(source_path, src_img):
                raise RuntimeError("source.jpg konnte nicht gespeichert werden.")
            thumb = cv2.resize(crop, (96, 96), interpolation=cv2.INTER_AREA)
            if not _cv2_imwrite_unicode(thumb_path, thumb):
                raise RuntimeError("thumb.png konnte nicht gespeichert werden.")
            meta_path.write_text(json.dumps(meta, indent=2, ensure_ascii=False), encoding="utf-8")
        except Exception:
            # Do not leave a half-written entry behind: it would occupy the
            # slug and show up as a broken entry in list_entries().
            shutil.rmtree(face_dir, ignore_errors=True)
            raise

        return {
            "slug": slug,
            "name": meta["name"],
            "source_path": str(source_path),
            "thumb_path": str(thumb_path),
            "added": meta["added"],
        }

    def remove(self, slug):
        """Delete the entry directory for *slug*; missing entries are ignored."""
        slug = self._slugify(slug)
        target = self.ROOT / slug
        if target.exists():
            shutil.rmtree(target, ignore_errors=True)

    def list_entries(self):
        """Return all valid entries sorted case-insensitively by name.

        Directories without ``source.jpg`` are skipped with a warning. An
        unreadable ``meta.json`` falls back to the directory name and an
        empty ``added`` date.

        Returns:
            list[dict]: Dicts with ``slug``, ``name``, ``source_path``,
            ``thumb_path`` and ``added``.
        """
        entries = []
        if not self.ROOT.exists():
            return entries
        for d in self.ROOT.iterdir():
            if not d.is_dir():
                continue
            source_path = d / "source.jpg"
            meta_path = d / "meta.json"
            thumb_path = d / "thumb.png"
            if not source_path.exists():
                self.log(f"WARNUNG Bibliothekseintrag ohne source.jpg uebersprungen: {d.name}")
                continue
            name = d.name
            added = ""
            if meta_path.exists():
                try:
                    meta = json.loads(meta_path.read_text(encoding="utf-8"))
                    name = (meta.get("name") or name).strip() or name
                    added = (meta.get("added") or "").strip()
                except Exception:
                    # Broken metadata: keep the defaults derived above.
                    pass
            entries.append({
                "slug": d.name,
                "name": name,
                "source_path": str(source_path),
                "thumb_path": str(thumb_path),
                "added": added,
            })
        entries.sort(key=lambda x: x["name"].lower())
        return entries

    def get_source_path(self, slug):
        """Return the ``source.jpg`` path of *slug*.

        Raises:
            RuntimeError: if the entry has no ``source.jpg``.
        """
        p = self.ROOT / self._slugify(slug) / "source.jpg"
        if not p.exists():
            raise RuntimeError(f"Eintrag nicht gefunden: {slug}")
        return p
|
|
|
|
|
|
class FaceSwapper:
|
|
def __init__(self, log_fn=print):
|
|
self.log = log_fn
|
|
self.app = self.swapper = None
|
|
self.restorer = None
|
|
self.use_restoration = False
|
|
self._restoration_warned_cpu = False
|
|
self.enhance = True
|
|
self.color = True
|
|
self.high_sensitivity = True
|
|
self.det_size = (640, 640)
|
|
self.det_thresh = 0.5
|
|
# Video-Stabilitaet: nur konsistente Face-Tracks werden geswappt.
|
|
self.video_track_single_face = True
|
|
self.video_min_det_score = 0.30
|
|
self.video_start_det_score = 0.30
|
|
self.video_new_face_det_score = 0.55
|
|
self.video_min_face_size_px = 32
|
|
self.video_min_iou = 0.02
|
|
self.video_max_center_jump = 0.28
|
|
self.video_min_area_ratio = 0.20
|
|
self.video_max_area_ratio = 4.00
|
|
self.video_track_memory = 10
|
|
self.video_min_embed_sim = 0.05
|
|
self.video_track_accept_score = 0.15 # was 0.30 — too strict, rejected valid tracked faces
|
|
self.video_fade_in_step = 1.0
|
|
self.video_fade_out_step = 0.0
|
|
self.video_occlusion_hold = 0
|
|
self.video_occlusion_mouth_ratio_min = 0.30 # was 0.58 — too aggressive, falsely blocked swap
|
|
self.video_occlusion_texture_drop = 0.30 # was 0.55 — too sensitive to normal lighting change
|
|
self.video_occluder_diff_thresh = 24
|
|
self.video_occluder_min_coverage = 0.08
|
|
self.video_occluder_max_coverage = 0.46
|
|
self.video_clean_ref_max_coverage = 0.35 # was 0.05 — too strict, clean_original never saved
|
|
self.video_abs_mouth_width_min = 0.10 # was 0.45 — way too high, flagged almost all faces
|
|
self.video_abs_mouth_drop_min = 0.10 # was 0.32 — too high, caused false occlusion
|
|
|
|
def init_models(self):
    """Load the ``buffalo_l`` face analysis pack and the inswapper model.

    The detector is prepared at 1024x1024 (GPU) or 768x768 (CPU); if that
    fails a fresh ``FaceAnalysis`` is prepared at 640x640 instead. The
    detection threshold is set to 0.25 and the size actually in use is stored
    in ``self.det_size``.

    Raises:
        RuntimeError: if ``inswapper_128.onnx`` is missing from ``MODELS_DIR``.
    """
    providers = _get_providers(self.log)
    from insightface.app import FaceAnalysis
    import insightface.model_zoo as mz

    use_gpu = "CUDAExecutionProvider" in providers
    self.log(f" {'GPU (CUDA) ' if use_gpu else 'CPU'}")
    self.log(" Lade buffalo_l ...")

    ctx_id = 0 if use_gpu else -1
    self.det_thresh = 0.25

    def _new_analyser():
        return FaceAnalysis(name="buffalo_l", root=str(MODELS_DIR.parent), providers=providers)

    active_size = (1024, 1024) if use_gpu else (768, 768)
    self.app = _new_analyser()
    try:
        self.app.prepare(ctx_id=ctx_id, det_size=active_size, det_thresh=self.det_thresh)
    except Exception as e:
        active_size = (640, 640)
        self.log(f" Hohe Detektions-Aufloesung fehlgeschlagen ({e}). Fallback auf {active_size[0]}x{active_size[1]}.")
        # Start over with a fresh instance for the fallback attempt.
        self.app = _new_analyser()
        self.app.prepare(ctx_id=ctx_id, det_size=active_size, det_thresh=self.det_thresh)
    self.det_size = active_size
    self.log(f" Detektion: {self.det_size[0]}x{self.det_size[1]}, Schwelle {self.det_thresh:.2f}")

    swap_path = MODELS_DIR / "inswapper_128.onnx"
    if not swap_path.exists():
        raise RuntimeError(f"inswapper_128.onnx fehlt in {MODELS_DIR}")
    self.log(" Lade inswapper ...")
    self.swapper = mz.get_model(str(swap_path), providers=providers)
    self.log(" Modelle geladen.")
|
|
|
|
@staticmethod
|
|
def _face_area(face):
|
|
x1, y1, x2, y2 = [float(v) for v in face.bbox]
|
|
return max(0.0, x2 - x1) * max(0.0, y2 - y1)
|
|
|
|
def _pick_primary_face(self, faces):
|
|
if not faces:
|
|
return None
|
|
return max(faces, key=lambda f: (self._face_area(f), float(getattr(f, "det_score", 0.0))))
|
|
|
|
@staticmethod
|
|
def _norm_face_metrics(face, w, h):
|
|
x1, y1, x2, y2 = [float(v) for v in face.bbox]
|
|
bw = max(1.0, x2 - x1)
|
|
bh = max(1.0, y2 - y1)
|
|
nx1 = max(0.0, min(1.0, x1 / max(1.0, float(w))))
|
|
ny1 = max(0.0, min(1.0, y1 / max(1.0, float(h))))
|
|
nx2 = max(0.0, min(1.0, x2 / max(1.0, float(w))))
|
|
ny2 = max(0.0, min(1.0, y2 / max(1.0, float(h))))
|
|
cx = (nx1 + nx2) * 0.5
|
|
cy = (ny1 + ny2) * 0.5
|
|
area = max(1e-6, (nx2 - nx1) * (ny2 - ny1))
|
|
return {
|
|
"bbox": (nx1, ny1, nx2, ny2),
|
|
"cx": cx,
|
|
"cy": cy,
|
|
"area": area,
|
|
"px_w": bw,
|
|
"px_h": bh,
|
|
"score": float(getattr(face, "det_score", 0.0)),
|
|
}
|
|
|
|
@staticmethod
|
|
def _extract_embedding(face):
|
|
import numpy as np
|
|
emb = getattr(face, "normed_embedding", None)
|
|
if emb is None:
|
|
emb = getattr(face, "embedding", None)
|
|
if emb is None:
|
|
return None
|
|
arr = np.asarray(emb, dtype=np.float32).reshape(-1)
|
|
if arr.size == 0:
|
|
return None
|
|
norm = float(np.linalg.norm(arr))
|
|
if norm < 1e-8:
|
|
return None
|
|
return arr / norm
|
|
|
|
@staticmethod
|
|
def _extract_kps(face, w, h):
|
|
import numpy as np
|
|
kps = getattr(face, "kps", None)
|
|
if kps is None:
|
|
return None
|
|
arr = np.asarray(kps, dtype=np.float32)
|
|
if arr.ndim != 2 or arr.shape[1] != 2 or arr.shape[0] < 3:
|
|
return None
|
|
arr = arr[:5, :].copy()
|
|
arr[:, 0] = np.clip(arr[:, 0] / max(1.0, float(w)), 0.0, 1.0)
|
|
arr[:, 1] = np.clip(arr[:, 1] / max(1.0, float(h)), 0.0, 1.0)
|
|
return arr
|
|
|
|
@staticmethod
|
|
def _embedding_similarity(a, b):
|
|
import numpy as np
|
|
if a is None or b is None:
|
|
return None
|
|
return float(np.clip(np.dot(a, b), -1.0, 1.0))
|
|
|
|
@staticmethod
|
|
def _kps_similarity(a, b):
|
|
import numpy as np
|
|
if a is None or b is None:
|
|
return None
|
|
n = min(int(a.shape[0]), int(b.shape[0]))
|
|
if n < 3:
|
|
return None
|
|
dist = float(np.linalg.norm(a[:n] - b[:n], axis=1).mean())
|
|
# 0.0 Distanz => 1.0 Similarity; >0.20 gilt als deutlich instabil.
|
|
return max(0.0, min(1.0, 1.0 - dist / 0.20))
|
|
|
|
def _build_video_face_entry(self, face, w, h):
|
|
m = self._norm_face_metrics(face, w, h)
|
|
return {
|
|
"face": face,
|
|
"bbox": m["bbox"],
|
|
"cx": m["cx"],
|
|
"cy": m["cy"],
|
|
"area": m["area"],
|
|
"px_w": m["px_w"],
|
|
"px_h": m["px_h"],
|
|
"det_score": m["score"],
|
|
"embedding": self._extract_embedding(face),
|
|
"kps": self._extract_kps(face, w, h),
|
|
}
|
|
|
|
@staticmethod
|
|
def _trim_track_entry(entry):
|
|
return {
|
|
"bbox": entry["bbox"],
|
|
"cx": entry["cx"],
|
|
"cy": entry["cy"],
|
|
"area": entry["area"],
|
|
"det_score": entry["det_score"],
|
|
"embedding": entry["embedding"],
|
|
"kps": entry["kps"],
|
|
}
|
|
|
|
@staticmethod
|
|
def _kps_geometry(kps):
|
|
import numpy as np
|
|
if kps is None:
|
|
return None
|
|
arr = np.asarray(kps, dtype=np.float32)
|
|
if arr.ndim != 2 or arr.shape[0] < 5 or arr.shape[1] != 2:
|
|
return None
|
|
eye_a, eye_b, nose, mouth_a, mouth_b = arr[:5]
|
|
eye_dist = float(np.linalg.norm(eye_a - eye_b))
|
|
if eye_dist < 1e-6:
|
|
return None
|
|
eye_mid = (eye_a + eye_b) * 0.5
|
|
mouth_mid = (mouth_a + mouth_b) * 0.5
|
|
return {
|
|
"eye_dist": eye_dist,
|
|
"mouth_width": float(np.linalg.norm(mouth_a - mouth_b)) / eye_dist,
|
|
"nose_drop": float(np.linalg.norm(nose - eye_mid)) / eye_dist,
|
|
"mouth_drop": float(np.linalg.norm(mouth_mid - nose)) / eye_dist,
|
|
"mouth_offset_y": float((mouth_mid[1] - eye_mid[1]) / eye_dist),
|
|
}
|
|
|
|
@staticmethod
|
|
def _bbox_to_pixel_rect(bbox, w, h):
|
|
x1 = max(0, min(w - 1, int(round(float(bbox[0]) * w))))
|
|
y1 = max(0, min(h - 1, int(round(float(bbox[1]) * h))))
|
|
x2 = max(0, min(w, int(round(float(bbox[2]) * w))))
|
|
y2 = max(0, min(h, int(round(float(bbox[3]) * h))))
|
|
if x2 <= x1 + 2 or y2 <= y1 + 2:
|
|
return None
|
|
return x1, y1, x2, y2
|
|
|
|
@staticmethod
def _face_texture_ratio(gray, bbox):
    """Compare detail in the lower part of a face box with the upper part.

    ``bbox`` is a fractional box resolved against ``gray`` via
    ``_bbox_to_pixel_rect``. The value is the Laplacian variance of the
    lower band (from 50 % of the box height down) divided by that of the
    upper band (top 42 %), clamped to [0, 4]. Returns ``None`` when the box
    is unusable, shorter than 16 px, or the upper band has no texture.
    """
    import cv2

    frame_h, frame_w = gray.shape[:2]
    rect = FaceSwapper._bbox_to_pixel_rect(bbox, frame_w, frame_h)
    if rect is None:
        return None
    left, top, right, bottom = rect
    face_h = bottom - top
    if face_h < 16:
        return None
    upper_stop = top + int(face_h * 0.42)
    lower_begin = top + int(face_h * 0.50)
    if upper_stop <= top + 4 or lower_begin >= bottom - 4:
        return None
    upper_band = gray[top:upper_stop, left:right]
    lower_band = gray[lower_begin:bottom, left:right]
    if upper_band.size == 0 or lower_band.size == 0:
        return None

    def _detail(band):
        # Variance of the Laplacian response serves as the texture measure.
        return float(cv2.Laplacian(band, cv2.CV_32F).var())

    upper_var = _detail(upper_band)
    lower_var = _detail(lower_band)
    if upper_var < 1e-6:
        return None
    return max(0.0, min(4.0, lower_var / upper_var))
|
|
|
|
@staticmethod
def _blend_frames(swapped, original, alpha):
    """Cross-fade between the original and the swapped frame.

    ``alpha`` is the weight of ``swapped``. Values within 1e-3 of 0 or 1
    return the respective input unchanged (no copy); anything in between
    is mixed with ``cv2.addWeighted``.
    """
    import cv2

    eps = 1e-3
    if alpha <= eps:
        return original
    if alpha >= 1.0 - eps:
        return swapped
    swap_weight = float(alpha)
    orig_weight = float(1.0 - alpha)
    return cv2.addWeighted(swapped, swap_weight, original, orig_weight, 0.0)
|
|
|
|
@staticmethod
def _keep_border_components(raw_mask, min_area_ratio=0.015, lateral_only=False):
    """Keep only the mask blobs that are large enough and reach the mask edge.

    Connected components (8-connectivity) smaller than
    ``max(8, h * w * min_area_ratio)`` pixels are discarded. A component
    counts as touching the edge when its bounding box comes within
    ``max(3, 6 % of the shorter side)`` pixels of it; with
    ``lateral_only`` only the left and right edges qualify. Returns a new
    uint8 mask holding 0/255, or ``raw_mask`` itself when it is ``None`` or
    empty.
    """
    import cv2, numpy as np

    if raw_mask is None or raw_mask.size == 0:
        return raw_mask
    height, width = raw_mask.shape[:2]
    binary = (raw_mask > 0).astype(np.uint8)
    n_labels, label_map, stats, _centroids = cv2.connectedComponentsWithStats(binary, 8)
    result = np.zeros((height, width), dtype=np.uint8)
    area_floor = max(8, int(height * width * min_area_ratio))
    margin = max(3, int(min(height, width) * 0.06))
    # Label 0 is the background, so iteration starts at 1.
    for idx in range(1, n_labels):
        left, top, comp_w, comp_h, comp_area = stats[idx]
        if comp_area < area_floor:
            continue
        reaches_edge = left <= margin or left + comp_w >= width - margin
        if not lateral_only and not reaches_edge:
            reaches_edge = top <= margin or top + comp_h >= height - margin
        if reaches_edge:
            result[label_map == idx] = 255
    return result
|
|
|
|
@staticmethod
|
|
def _apply_float_mask(foreground, background, mask):
|
|
import numpy as np
|
|
if mask is None:
|
|
return foreground
|
|
m = np.clip(mask.astype(np.float32), 0.0, 1.0)
|
|
if m.ndim == 2:
|
|
m = m[:, :, None]
|
|
out = foreground.astype(np.float32) * (1.0 - m) + background.astype(np.float32) * m
|
|
return np.clip(out, 0, 255).astype(np.uint8)
|
|
|
|
@staticmethod
def _soft_bbox_mask(shape, bbox):
    """Build a feathered float32 mask (0..1) covering a fractional face box.

    The box is shrunk by 5 % horizontally and 6 % vertically (at least
    2 px each), filled with 1.0 and Gaussian-blurred. An unusable box
    yields an all-zero mask of the frame size.
    """
    import cv2, numpy as np

    frame_h, frame_w = shape[:2]
    rect = FaceSwapper._bbox_to_pixel_rect(bbox, frame_w, frame_h)
    canvas = np.zeros((frame_h, frame_w), dtype=np.float32)
    if rect is None:
        return canvas
    left, top, right, bottom = rect
    box_w = right - left
    box_h = bottom - top
    inset_x = max(2, int(box_w * 0.05))
    inset_y = max(2, int(box_h * 0.06))
    canvas[top + inset_y:bottom - inset_y, left + inset_x:right - inset_x] = 1.0
    sigma = max(3.0, min(box_w, box_h) * 0.045)
    feathered = cv2.GaussianBlur(canvas, (0, 0), sigma)
    return np.clip(feathered, 0.0, 1.0)
|
|
|
|
def _occluder_mask_from_reference(self, original, bbox, track_state):
    """Find objects covering the face by differencing against a stored clean frame.

    The face box of ``original`` is compared with the same region of
    ``track_state["clean_original"]`` (resized if the shapes differ). The
    thresholded difference is closed/opened morphologically and reduced to
    blobs that reach the left or right edge of the box.

    Returns ``(mask, coverage)``: ``mask`` is a full-frame float32 map in
    0..1, or ``None`` when there is no reference or box, or when coverage is
    below ``video_occluder_min_coverage``. ``coverage`` is the masked
    fraction of the face box.
    """
    import cv2, numpy as np

    if track_state is None or bbox is None:
        return None, 0.0
    reference = track_state.get("clean_original")
    if reference is None:
        return None, 0.0
    if getattr(reference, "shape", None) != original.shape:
        target_size = (original.shape[1], original.shape[0])
        reference = cv2.resize(reference, target_size, interpolation=cv2.INTER_AREA)
    frame_h, frame_w = original.shape[:2]
    rect = self._bbox_to_pixel_rect(bbox, frame_w, frame_h)
    if rect is None:
        return None, 0.0
    left, top, right, bottom = rect
    current_patch = original[top:bottom, left:right]
    reference_patch = reference[top:bottom, left:right]
    if current_patch.size == 0 or reference_patch.size == 0:
        return None, 0.0

    delta = cv2.cvtColor(cv2.absdiff(current_patch, reference_patch), cv2.COLOR_BGR2GRAY)
    _, binary = cv2.threshold(delta, self.video_occluder_diff_thresh, 255, cv2.THRESH_BINARY)
    # Odd kernel size proportional to the shorter box side.
    ksize = max(3, int(min(right - left, bottom - top) * 0.045) | 1)
    element = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (ksize, ksize))
    binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, element, iterations=2)
    binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, element, iterations=1)
    binary = self._keep_border_components(binary, min_area_ratio=0.012, lateral_only=True)

    coverage = float((binary > 0).mean())
    if coverage < self.video_occluder_min_coverage:
        return None, coverage
    feathered = cv2.GaussianBlur(binary.astype(np.float32) / 255.0, (0, 0), max(2.0, ksize * 0.6))
    full_mask = np.zeros((frame_h, frame_w), dtype=np.float32)
    full_mask[top:bottom, left:right] = np.clip(feathered, 0.0, 1.0)
    return full_mask, coverage
|
|
|
|
def _carry_forward_swap(self, original, track_state):
    """Re-use the last swapped frame's face region when no face is usable now.

    The region of ``track_state["last_result"]`` under a soft mask of
    ``track_state["last_bbox"]`` is pasted onto ``original``, with any
    detected occluder areas cut out of that mask.

    Returns ``(frame, 1)`` when a paste happened and ``(original, 0)`` when
    there is no track state, no previous result, or no previous box.
    """
    import cv2, numpy as np

    if track_state is None:
        return original, 0
    previous = track_state.get("last_result")
    box = track_state.get("last_bbox")
    if previous is None or box is None:
        return original, 0
    if getattr(previous, "shape", None) != original.shape:
        target_size = (original.shape[1], original.shape[0])
        previous = cv2.resize(previous, target_size, interpolation=cv2.INTER_AREA)

    paste_mask = self._soft_bbox_mask(original.shape, box)
    reference_mask, _ = self._occluder_mask_from_reference(original, box, track_state)
    live_mask, _ = self._current_occluder_mask(original, box)
    # Union of both occluder estimates; either one may be absent.
    if reference_mask is None:
        blocked = live_mask
    elif live_mask is None:
        blocked = reference_mask
    else:
        blocked = np.maximum(reference_mask, live_mask)
    if blocked is not None:
        paste_mask = paste_mask * (1.0 - blocked)
    return self._apply_float_mask(original, previous, paste_mask), 1
|
|
|
|
def _current_occluder_mask(self, original, bbox):
    """Estimate occluders inside the face box from the current frame alone.

    Pixels in a fixed YCrCb range or darker than gray level 92 are
    collected between 10 % and 94 % of the box height, cleaned
    morphologically and reduced to blobs that reach the left or right edge
    of the box.

    Returns ``(mask, coverage)``: a full-frame float32 mask in 0..1, or
    ``None`` when the box is unusable or coverage is below
    ``video_occluder_min_coverage``, plus the masked fraction of the box.
    """
    import cv2, numpy as np

    frame_h, frame_w = original.shape[:2]
    rect = self._bbox_to_pixel_rect(bbox, frame_w, frame_h)
    if rect is None:
        return None, 0.0
    left, top, right, bottom = rect
    patch = original[top:bottom, left:right]
    if patch.size == 0:
        return None, 0.0

    patch_gray = cv2.cvtColor(patch, cv2.COLOR_BGR2GRAY)
    patch_ycrcb = cv2.cvtColor(patch, cv2.COLOR_BGR2YCrCb)
    skin_like = cv2.inRange(patch_ycrcb, (0, 133, 77), (255, 184, 138))
    dark_like = cv2.inRange(patch_gray, 0, 92)
    binary = cv2.bitwise_or(skin_like, dark_like)

    patch_h, patch_w = patch_gray.shape[:2]
    # Only the vertical band between 10 % and 94 % of the box is considered.
    band = np.zeros((patch_h, patch_w), dtype=np.uint8)
    band[int(patch_h * 0.10):int(patch_h * 0.94), :] = 255
    binary = cv2.bitwise_and(binary, band)

    ksize = max(3, int(min(patch_w, patch_h) * 0.045) | 1)
    element = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (ksize, ksize))
    binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, element, iterations=1)
    binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, element, iterations=1)
    binary = self._keep_border_components(binary, min_area_ratio=0.010, lateral_only=True)

    coverage = float((binary > 0).mean())
    if coverage < self.video_occluder_min_coverage:
        return None, coverage
    feathered = cv2.GaussianBlur(binary.astype(np.float32) / 255.0, (0, 0), max(2.0, ksize * 0.55))
    full_mask = np.zeros((frame_h, frame_w), dtype=np.float32)
    full_mask[top:bottom, left:right] = np.clip(feathered, 0.0, 1.0)
    return full_mask, coverage
|
|
|
|
def _smooth_skin_occluder_mask(self, original, bbox):
    """Mask smooth skin-coloured areas inside the face box.

    Pixels that fall in a fixed YCrCb range and have an absolute Laplacian
    response below 8 are collected inside an inner window of the box,
    cleaned morphologically and reduced to blobs reaching the left or
    right edge of the box.

    Returns ``(mask, coverage)``. The mask is ``None`` unless coverage lies
    within 0.12..0.42 of the box; otherwise it is a full-frame float32 map
    in 0..1.
    """
    import cv2, numpy as np

    frame_h, frame_w = original.shape[:2]
    rect = self._bbox_to_pixel_rect(bbox, frame_w, frame_h)
    if rect is None:
        return None, 0.0
    left, top, right, bottom = rect
    patch = original[top:bottom, left:right]
    if patch.size == 0:
        return None, 0.0

    patch_gray = cv2.cvtColor(patch, cv2.COLOR_BGR2GRAY)
    patch_ycrcb = cv2.cvtColor(patch, cv2.COLOR_BGR2YCrCb)
    skin_like = cv2.inRange(patch_ycrcb, (0, 133, 77), (255, 180, 135))
    detail = cv2.Laplacian(patch_gray, cv2.CV_32F)
    flat = (np.abs(detail) < 8.0).astype(np.uint8) * 255
    binary = cv2.bitwise_and(skin_like, flat)

    patch_h, patch_w = patch_gray.shape[:2]
    window = np.zeros((patch_h, patch_w), dtype=np.uint8)
    window[int(patch_h * 0.22):int(patch_h * 0.90), int(patch_w * 0.12):int(patch_w * 0.94)] = 255
    binary = cv2.bitwise_and(binary, window)

    ksize = max(5, int(min(patch_w, patch_h) * 0.055) | 1)
    element = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (ksize, ksize))
    binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, element, iterations=2)
    binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, element, iterations=1)
    binary = self._keep_border_components(binary, min_area_ratio=0.025, lateral_only=True)

    coverage = float((binary > 0).mean())
    # Both too little and too much coverage are rejected.
    if coverage < 0.12 or coverage > 0.42:
        return None, coverage
    feathered = cv2.GaussianBlur(binary.astype(np.float32) / 255.0, (0, 0), max(2.0, ksize * 0.5))
    full_mask = np.zeros((frame_h, frame_w), dtype=np.float32)
    full_mask[top:bottom, left:right] = np.clip(feathered, 0.0, 1.0)
    return full_mask, coverage
|
|
|
|
def _is_occluded_mouth(self, candidate, reference, gray, track_state):
|
|
if candidate is None:
|
|
return False
|
|
occluded = False
|
|
# Geometrie-Pruefung: Mund-/Nasenverhaeltnis sollte ueber Frames stabil bleiben.
|
|
ref_geo = self._kps_geometry(reference.get("kps") if reference else None)
|
|
cur_geo = self._kps_geometry(candidate.get("kps"))
|
|
if ref_geo and cur_geo:
|
|
ref_mouth = max(1e-6, ref_geo["mouth_width"])
|
|
ref_drop = max(1e-6, ref_geo["mouth_drop"])
|
|
mouth_ratio = cur_geo["mouth_width"] / ref_mouth
|
|
drop_ratio = cur_geo["mouth_drop"] / ref_drop
|
|
if mouth_ratio < self.video_occlusion_mouth_ratio_min or drop_ratio < self.video_occlusion_mouth_ratio_min:
|
|
occluded = True
|
|
if cur_geo:
|
|
if (cur_geo["mouth_width"] < self.video_abs_mouth_width_min or
|
|
cur_geo["mouth_drop"] < self.video_abs_mouth_drop_min):
|
|
occluded = True
|
|
# Textur-Pruefung: Bei Hand vor Mund bricht Detail im unteren Gesichtsbereich stark ein.
|
|
texture_ratio = self._face_texture_ratio(gray, candidate["bbox"])
|
|
if texture_ratio is not None:
|
|
tex_ref = track_state.get("texture_ref")
|
|
if tex_ref is not None and texture_ratio < tex_ref * self.video_occlusion_texture_drop:
|
|
occluded = True
|
|
if not occluded:
|
|
if tex_ref is None:
|
|
track_state["texture_ref"] = texture_ratio
|
|
else:
|
|
track_state["texture_ref"] = 0.90 * float(tex_ref) + 0.10 * texture_ratio
|
|
return occluded
|
|
|
|
@staticmethod
|
|
def _bbox_iou(a, b):
|
|
ax1, ay1, ax2, ay2 = a
|
|
bx1, by1, bx2, by2 = b
|
|
ix1 = max(ax1, bx1)
|
|
iy1 = max(ay1, by1)
|
|
ix2 = min(ax2, bx2)
|
|
iy2 = min(ay2, by2)
|
|
iw = max(0.0, ix2 - ix1)
|
|
ih = max(0.0, iy2 - iy1)
|
|
inter = iw * ih
|
|
if inter <= 0:
|
|
return 0.0
|
|
a_area = max(1e-9, (ax2 - ax1) * (ay2 - ay1))
|
|
b_area = max(1e-9, (bx2 - bx1) * (by2 - by1))
|
|
return inter / max(1e-9, a_area + b_area - inter)
|
|
|
|
def _filter_video_faces(self, faces, frame_shape, track_state):
|
|
h, w = frame_shape[:2]
|
|
active = track_state.get("active")
|
|
track_state["candidate"] = None
|
|
candidates = []
|
|
# When a face is already being tracked we accept lower-confidence detections.
|
|
# A hand partially covering the tracked face drops the detector score, but
|
|
# the face is still there and should be swapped.
|
|
effective_min_score = (self.video_min_det_score * 0.5
|
|
if active is not None else self.video_min_det_score)
|
|
for face in faces:
|
|
entry = self._build_video_face_entry(face, w, h)
|
|
if entry["px_w"] < self.video_min_face_size_px or entry["px_h"] < self.video_min_face_size_px:
|
|
continue
|
|
if entry["det_score"] < effective_min_score:
|
|
continue
|
|
candidates.append(entry)
|
|
if not candidates:
|
|
track_state["miss"] = int(track_state.get("miss", 0)) + 1
|
|
if track_state["miss"] >= self.video_track_memory:
|
|
track_state["active"] = None
|
|
track_state["texture_ref"] = None
|
|
track_state["hold"] = 0
|
|
return []
|
|
candidates.sort(key=lambda c: (c["area"], c["det_score"]), reverse=True)
|
|
|
|
if active is None:
|
|
seed = None
|
|
for c in candidates:
|
|
if c["det_score"] >= self.video_start_det_score:
|
|
seed = c
|
|
break
|
|
if seed is None:
|
|
track_state["miss"] = int(track_state.get("miss", 0)) + 1
|
|
return []
|
|
track_state["candidate"] = seed
|
|
track_state["miss"] = 0
|
|
return [seed["face"]]
|
|
|
|
best = None
|
|
best_score = -1.0
|
|
# When a face is already tracked, allow a smaller detected area (hand can
|
|
# shrink the visible face region by up to 70 % without losing the track).
|
|
effective_min_area = (self.video_min_area_ratio * 0.30
|
|
if active is not None else self.video_min_area_ratio)
|
|
for c in candidates:
|
|
iou = self._bbox_iou(c["bbox"], active["bbox"])
|
|
dx = c["cx"] - active["cx"]
|
|
dy = c["cy"] - active["cy"]
|
|
center_shift = (dx * dx + dy * dy) ** 0.5
|
|
area_ratio = c["area"] / max(1e-9, active["area"])
|
|
if not (effective_min_area <= area_ratio <= self.video_max_area_ratio):
|
|
continue
|
|
if iou < self.video_min_iou and center_shift > self.video_max_center_jump:
|
|
continue
|
|
emb_sim = self._embedding_similarity(c["embedding"], active["embedding"])
|
|
if emb_sim is not None and emb_sim < self.video_min_embed_sim:
|
|
continue
|
|
kps_sim = self._kps_similarity(c["kps"], active["kps"])
|
|
iou_score = max(0.0, min(1.0, iou / 0.35))
|
|
motion_score = max(0.0, min(1.0, 1.0 - center_shift / max(self.video_max_center_jump, 1e-6)))
|
|
det_score = max(0.0, min(1.0, c["det_score"]))
|
|
emb_score = 0.5 if emb_sim is None else max(0.0, min(1.0, (emb_sim + 1.0) * 0.5))
|
|
kps_score = 0.5 if kps_sim is None else kps_sim
|
|
track_score = (0.30 * iou_score + 0.24 * motion_score + 0.16 * det_score
|
|
+ 0.20 * emb_score + 0.10 * kps_score)
|
|
if emb_sim is None and c["det_score"] < self.video_new_face_det_score:
|
|
track_score *= 0.86
|
|
if track_score > best_score:
|
|
best_score = track_score
|
|
best = c
|
|
|
|
if best is None or best_score < self.video_track_accept_score:
|
|
track_state["miss"] = int(track_state.get("miss", 0)) + 1
|
|
if track_state["miss"] >= self.video_track_memory:
|
|
track_state["active"] = None
|
|
track_state["texture_ref"] = None
|
|
track_state["hold"] = 0
|
|
return []
|
|
|
|
track_state["candidate"] = best
|
|
track_state["miss"] = 0
|
|
return [best["face"]]
|
|
|
|
def _detect_faces(self, frame):
    """Detect faces, retrying on an upscaled frame and with a lowered threshold.

    Returns ``(faces, image, scale)``, where ``image`` is the frame the
    faces were detected on and ``scale`` its size factor relative to
    ``frame``. The retries only run when ``self.high_sensitivity`` is set
    and the first pass found nothing:

    1. detection on an upscaled copy (factor 2.0 / 1.5 / 1.25 by short edge,
       with the longer side capped at 1920 px);
    2. detection with the detector's ``det_thresh`` temporarily set to
       0.10, first on the frame, then on the upscaled copy.

    If nothing is found the upscaled copy (when one was made) is returned
    with an empty list.
    """
    import cv2

    detections = self.app.get(frame)
    if detections or not self.high_sensitivity:
        return detections, frame, 1.0

    frame_h, frame_w = frame.shape[:2]
    shortest = min(frame_h, frame_w)
    if shortest < 720:
        factor = 2.0
    elif shortest < 1080:
        factor = 1.5
    else:
        factor = 1.25
    target_w = max(2, int(round(frame_w * factor)))
    target_h = max(2, int(round(frame_h * factor)))
    longest = max(target_w, target_h)
    if longest > 1920:
        shrink = 1920.0 / longest
        target_w = max(2, int(round(target_w * shrink)))
        target_h = max(2, int(round(target_h * shrink)))
        factor *= shrink

    enlarged = None
    if factor > 1.01:
        enlarged = cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_CUBIC)
        detections = self.app.get(enlarged)
        if detections:
            return detections, enlarged, factor

    # Low-threshold fallback: a face partly covered by a hand scores lower, so
    # the detector threshold is dropped for this one attempt.
    detector = getattr(self.app, 'det_model', None)
    if detector is None:
        # Fall back to the first entry of app.models that exposes det_thresh.
        registry = getattr(self.app, 'models', {})
        detector = next((m for m in registry.values() if hasattr(m, 'det_thresh')), None)
    if detector is not None and hasattr(detector, 'det_thresh'):
        saved_thresh = detector.det_thresh
        try:
            detector.det_thresh = 0.10
            detections = self.app.get(frame)
            if not detections and enlarged is not None:
                enlarged_hits = self.app.get(enlarged)
                if enlarged_hits:
                    return enlarged_hits, enlarged, factor
        finally:
            # Always put the original threshold back, also on early return.
            detector.det_thresh = saved_thresh
        if detections:
            return detections, frame, 1.0

    if enlarged is not None:
        return [], enlarged, factor
    return [], frame, 1.0
|
|
|
|
def _swap_faces_in_frame(self, frame, src_face, track_state=None):
    """Swap the detected face(s) in one frame; returns ``(image, face_count)``.

    Without ``track_state`` every detected face is swapped, and the
    unmodified frame with count 0 is returned when none is found. With a
    ``track_state`` dict only the tracked face is swapped: the result is
    faded in via ``track_state["alpha"]``, detected occluders are restored
    from the original frame, and frames without a usable face fall back to
    ``_carry_forward_swap``. The returned image always has the size of
    ``frame``.

    NOTE(review): statement nesting was reconstructed from a flattened
    listing. The tracked reference (``active``) is assumed to be refreshed
    only while the mouth is not considered occluded - confirm against the
    original file.
    """
    import cv2, numpy as np

    tracking = track_state is not None

    def _to_frame_size(image, factor):
        # Undo the detection upscale so callers always get frame-sized output.
        if factor != 1.0:
            return cv2.resize(image, (frame.shape[1], frame.shape[0]), interpolation=cv2.INTER_AREA)
        return image

    def _note_drop(work):
        # Register a frame without a usable face and fall back to the last swap.
        if not tracking:
            return work, 0
        track_state["locked_frames"] = 0
        track_state["candidate"] = None
        track_state["alpha"] = max(0.0, float(track_state.get("alpha", 1.0)) - self.video_fade_out_step)
        return self._carry_forward_swap(work, track_state)

    faces, work_img, scale = self._detect_faces(frame)
    if not faces:
        if not tracking:
            return frame, 0
        track_state["miss"] = int(track_state.get("miss", 0)) + 1
        if track_state["miss"] >= self.video_track_memory:
            track_state["active"] = None
            track_state["texture_ref"] = None
            track_state["hold"] = 0
        carried, carried_count = _note_drop(work_img)
        return _to_frame_size(carried, scale), carried_count

    if tracking:
        faces = self._filter_video_faces(faces, work_img.shape, track_state)
        if not faces:
            carried, carried_count = _note_drop(work_img)
            return _to_frame_size(carried, scale), carried_count

    candidate = track_state.get("candidate") if tracking else None
    prev_active = track_state.get("active") if tracking else None
    gray = cv2.cvtColor(work_img, cv2.COLOR_BGR2GRAY) if tracking else None
    mouth_occluded = False
    if tracking:
        hold = int(track_state.get("hold", 0))
        if hold > 0:
            # Still inside the hold window of an earlier occlusion.
            track_state["hold"] = max(0, hold - 1)
            mouth_occluded = True
        if self._is_occluded_mouth(candidate, prev_active, gray, track_state):
            track_state["hold"] = self.video_occlusion_hold
            mouth_occluded = True
        if not mouth_occluded:
            track_state["hold"] = 0
            if candidate is not None:
                track_state["active"] = self._trim_track_entry(candidate)
        track_state["locked_frames"] = int(track_state.get("locked_frames", 0)) + 1
        alpha_prev = float(track_state.get("alpha", 0.0))
        track_state["alpha"] = min(1.0, alpha_prev + self.video_fade_in_step)

    original = work_img.copy()
    result = work_img.copy()
    swapped_bboxes = []
    for face in faces:
        result = self.swapper.get(result, face, src_face, paste_back=True)
        swapped_bboxes.append(face.bbox)
        if self.color:
            result = _match_face_color(result, original, face.bbox)
        if self.enhance:
            result = _enhance_face_region(result, face.bbox, sharpen=True)

    if self.use_restoration and self.restorer is not None:
        providers = _get_providers()
        if not self._restoration_warned_cpu and "CUDAExecutionProvider" not in providers:
            self.log("  Hinweis: GFPGAN im CPU-Modus ist langsam (ca. 1-3 s pro Frame moeglich).")
            self._restoration_warned_cpu = True
        result = self.restorer.restore_faces(result, swapped_bboxes)

    if tracking:
        result = self._blend_frames(result, original, float(track_state.get("alpha", 1.0)))
        active = track_state.get("active")
        if active:
            bbox = active.get("bbox")
        else:
            bbox = candidate.get("bbox") if candidate else None
        occ_mask, coverage = self._occluder_mask_from_reference(original, bbox, track_state)
        # The colour-based occluder mask only runs once a clean reference exists;
        # otherwise the face's own skin colour would flag occlusion on every frame.
        has_clean_ref = track_state.get("clean_original") is not None

        def _merge(mask_a, mask_b):
            # Pixel-wise union of two optional masks (mask_b is never None here).
            return mask_b if mask_a is None else np.maximum(mask_a, mask_b)

        if bbox is not None and has_clean_ref:
            current_mask, current_coverage = self._current_occluder_mask(original, bbox)
        else:
            current_mask, current_coverage = None, 0.0
        if current_mask is not None:
            occ_mask = _merge(occ_mask, current_mask)
            coverage = max(coverage, current_coverage)
        if mouth_occluded and bbox is not None and has_clean_ref:
            smooth_mask, smooth_coverage = self._smooth_skin_occluder_mask(original, bbox)
            if smooth_mask is not None:
                occ_mask = _merge(occ_mask, smooth_mask)
                coverage = max(coverage, smooth_coverage)
        if occ_mask is not None:
            result = self._apply_float_mask(result, original, occ_mask)

        if candidate is not None:
            track_state["last_bbox"] = candidate["bbox"]
        track_state["last_result"] = result.copy()
        track_state["last_original"] = original.copy()
        if bbox is not None and not mouth_occluded and coverage <= self.video_clean_ref_max_coverage:
            track_state["clean_original"] = original.copy()

    return _to_frame_size(result, scale), len(faces)
|
|
|
|
def get_first_face(self, img):
    """Detect faces in ``img`` and return what ``_pick_primary_face`` selects."""
    detected, _work_img, _scale = self._detect_faces(img)
    primary = self._pick_primary_face(detected)
    return primary
|
|
|
|
def swap_image(self, src_face, target, out):
    """Swap the faces of the image at ``target`` and save the result to ``out``.

    Returns ``False`` when the image cannot be read or contains no face;
    otherwise returns whatever the image writer reports.
    """
    source_image = _cv2_imread_unicode(target)
    if source_image is None:
        return False
    swapped, n_faces = self._swap_faces_in_frame(source_image, src_face)
    if n_faces == 0:
        return False
    return _cv2_imwrite_unicode(out, swapped)
|
|
|
|
def swap_video(self, src_face, target_video, out_video, progress_cb=None, cancel_check=None):
    """Swap every detected face in ``target_video`` and write ``out_video``.

    Frames are processed one at a time with plain per-frame detection (no
    tracking, no occlusion logic) and written to a temporary ``mp4v`` file
    inside ``SCRIPT_DIR``. If ``ffmpeg`` is on PATH, the first audio stream
    of the source (when present) is muxed in; otherwise, or when ffmpeg
    fails, the silent temp file is moved to ``out_video``.

    ``progress_cb(done, total)`` is called after each written frame. A
    truthy ``cancel_check()`` stops reading; the frames written so far are
    still finalised into ``out_video``.

    Returns a dict with ``frames_processed``, ``frames_total``,
    ``frames_swapped`` (frames with at least one detected face) and
    ``faces_swapped`` (faces passed to the swapper).

    Raises:
        RuntimeError: the input video or the video writer could not be opened.
    """
    import cv2, subprocess, shutil, tempfile
    import numpy as np

    cap, cap_tmp_copy = _open_videocapture_unicode(target_video, log_fn=self.log)
    if not cap.isOpened():
        # Do not leave the capture handle or its temp copy behind.
        cap.release()
        _cleanup_temp_file(cap_tmp_copy)
        raise RuntimeError(f"Video konnte nicht geoeffnet werden: {target_video}")

    writer = None
    tmp_video = None
    finished = False
    total = 0
    done = 0
    swapped_frames = 0
    swapped_faces = 0
    try:
        fps = cap.get(cv2.CAP_PROP_FPS) or 25
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fd_tmp, tmp_name = tempfile.mkstemp(prefix="faceswap_noaudio_", suffix=".mp4", dir=str(SCRIPT_DIR))
        os.close(fd_tmp)
        tmp_video = Path(tmp_name)
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(str(tmp_video), fourcc, fps, (width, height))
        if not writer.isOpened():
            raise RuntimeError(f"Video-Writer konnte nicht geoeffnet werden: {out_video}")

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if cancel_check and cancel_check():
                break

            # Detect faces directly - no tracking, no occlusion logic.
            faces = self.app.get(frame)

            if done == 0:
                # Log first-frame diagnostics once.
                src_emb = getattr(src_face, 'normed_embedding', None)
                self.log(f"  [Info] Frame 1: {len(faces)} Gesicht(er) erkannt | "
                         f"src_embedding={'OK' if src_emb is not None else 'FEHLT'} | "
                         f"Frame {width}x{height}")

            result = frame.copy()
            for face in faces:
                before = result.copy()
                result = self.swapper.get(result, face, src_face, paste_back=True)
                # Post-processing only makes sense if the swap changed any pixel.
                # A boolean comparison avoids two full-frame float32 copies per face.
                changed = bool(np.any(result != before))
                if done == 0:
                    diff = float(np.abs(result.astype(np.float32) - before.astype(np.float32)).mean())
                    tgt_emb = getattr(face, 'normed_embedding', None)
                    self.log(f"  [Info] Swap-Differenz Frame 1: {diff:.4f} | "
                             f"tgt_embedding={'OK' if tgt_emb is not None else 'FEHLT'}")
                if changed and self.color:
                    result = _match_face_color(result, frame, face.bbox)
                if changed and self.enhance:
                    result = _enhance_face_region(result, face.bbox, sharpen=True)
                swapped_faces += 1

            writer.write(result)
            if len(faces) > 0:
                swapped_frames += 1
            done += 1
            if progress_cb:
                progress_cb(done, total)
        finished = True
    finally:
        cap.release()
        if writer is not None:
            writer.release()
        _cleanup_temp_file(cap_tmp_copy)
        # On any failure the half-written temp video is removed as well. This
        # happens after writer.release() so the file is no longer held open.
        if not finished and tmp_video is not None:
            _cleanup_temp_file(tmp_video)

    muxed = False
    ffmpeg = shutil.which("ffmpeg")
    if ffmpeg:
        cmd = [ffmpeg, "-y", "-i", str(tmp_video), "-i", str(target_video),
               "-c:v", "copy", "-c:a", "aac", "-map", "0:v:0", "-map", "1:a:0?",
               "-shortest", str(out_video)]
        try:
            subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            muxed = True
        except (subprocess.SubprocessError, OSError) as e:
            # Keep the best-effort fallback, but tell the user why audio is missing.
            self.log(f"  Hinweis: ffmpeg-Audio-Muxing fehlgeschlagen ({e}); Video wird ohne Ton gespeichert.")
    if muxed:
        _cleanup_temp_file(tmp_video)
    else:
        shutil.move(str(tmp_video), str(out_video))
    return {"frames_processed": done, "frames_total": total,
            "frames_swapped": swapped_frames, "faces_swapped": swapped_faces}
|
|
|
|
def swap_webcam(
    self,
    src_face,
    camera_index=0,
    record_path=None,
    fps_target=25.0,
    cancel_check=None,
    frame_cb=None,
    stats_cb=None,
    use_tracking=False,
):
    """Run a live face swap on a camera until cancelled or the camera stops.

    Args:
        src_face: source face handed to ``_swap_faces_in_frame``.
        camera_index: index passed to ``cv2.VideoCapture``.
        record_path: optional output file. If the writer cannot open it
            directly, recording goes to a temp file in ``SCRIPT_DIR`` that
            is moved to ``record_path`` at the end.
        fps_target: frame-rate cap; falsy or non-positive disables the cap.
        cancel_check: callable polled before every frame; truthy ends the loop.
        frame_cb: called with every processed frame.
        stats_cb: called every 30 frames with ``(smoothed_fps, face_count)``.
        use_tracking: enables the per-session track state.

    Raises:
        RuntimeError: the camera could not be opened.
    """
    import cv2
    import time
    import tempfile

    cap = cv2.VideoCapture(int(camera_index))
    if not cap.isOpened():
        cap.release()
        raise RuntimeError(f"Kamera konnte nicht geoeffnet werden (Index {camera_index}).")

    writer = None
    record_tmp = None
    # Recording setup sits inside the try block: if it raises (for example
    # mkdir on a read-only location), the camera is still released.
    try:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 0)
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0)
        src_fps = cap.get(cv2.CAP_PROP_FPS) or float(fps_target or 25.0)

        if record_path:
            out_path = Path(record_path)
            out_path.parent.mkdir(parents=True, exist_ok=True)
            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
            writer = cv2.VideoWriter(str(out_path), fourcc, src_fps, (width, height))
            if not writer.isOpened():
                # Drop the failed writer before retrying on a temp path.
                writer.release()
                writer = None
                try:
                    fd_tmp, tmp_name = tempfile.mkstemp(prefix="faceswap_webcam_", suffix=".mp4", dir=str(SCRIPT_DIR))
                    os.close(fd_tmp)
                    record_tmp = Path(tmp_name)
                    writer = cv2.VideoWriter(str(record_tmp), fourcc, src_fps, (width, height))
                except Exception:
                    writer = None
                if writer is None or not writer.isOpened():
                    if writer is not None:
                        writer.release()
                    writer = None
                    _cleanup_temp_file(record_tmp)
                    record_tmp = None
                    self.log(f"WARNUNG: Aufnahme konnte nicht gestartet werden: {record_path}")
                else:
                    self.log("  Hinweis: Unicode-Ausgabe-Fallback aktiv (temp Datei).")

        frame_count = 0
        track_state = {} if use_tracking else None
        fps_smooth = 0.0
        while True:
            if cancel_check and cancel_check():
                break
            started = time.perf_counter()
            ret, frame = cap.read()
            if not ret:
                break
            result, face_count = self._swap_faces_in_frame(frame, src_face, track_state=track_state)
            if frame_cb:
                frame_cb(result)
            if writer is not None:
                writer.write(result)

            frame_count += 1
            elapsed = max(1e-6, time.perf_counter() - started)
            inst_fps = 1.0 / elapsed
            # Exponential smoothing keeps the reported frame rate readable.
            fps_smooth = inst_fps if fps_smooth <= 0 else (0.90 * fps_smooth + 0.10 * inst_fps)
            if stats_cb and frame_count % 30 == 0:
                stats_cb(float(fps_smooth), int(face_count))

            if fps_target and fps_target > 0:
                wait_s = (1.0 / float(fps_target)) - elapsed
                if wait_s > 0:
                    # Sleep is capped at 20 ms per frame.
                    time.sleep(min(wait_s, 0.02))
    finally:
        cap.release()
        if writer is not None:
            writer.release()
        if record_tmp is not None and record_path:
            try:
                shutil.move(str(record_tmp), str(record_path))
            except Exception as e:
                self.log(f"WARNUNG: Temp-Aufnahme konnte nicht verschoben werden ({e})")
            finally:
                _cleanup_temp_file(record_tmp)
|
|
|
|
|
|
class VoiceCloner:
    """Voice cloning front end built on the ``TTS`` package.

    ``clone_from_text`` speaks text in the reference voice using the XTTS
    model; ``clone_from_audio`` converts an existing recording to the
    reference voice using the FreeVC model. Models are loaded lazily on
    first use and kept on the instance.
    """

    XTTS_MODEL = "tts_models/multilingual/multi-dataset/xtts_v2"
    VC_MODEL = "voice_conversion_models/multilingual/vctk/freevc24"
    SUPPORTED_LANGS = (
        "en", "es", "fr", "de", "it", "pt", "pl", "tr", "ru",
        "nl", "cs", "ar", "zh-cn", "ja", "hu", "ko", "hi"
    )
    SUPPORTED_AUDIO = {".wav", ".mp3", ".m4a", ".flac", ".ogg", ".aac", ".wma"}

    def __init__(self, log_fn=print):
        """Store the log callback; no model is loaded yet."""
        self.log = log_fn
        self.device = "cpu"
        self.tts = None
        self.vc = None

    def _load_runtime(self):
        """Import torch and TTS, pick the device, and return the ``TTS`` class.

        Raises:
            RuntimeError: the voice packages cannot be imported.
        """
        try:
            import torch
            from TTS.api import TTS
        except Exception as e:
            raise RuntimeError(
                "Voice-Cloning ist nicht installiert.\n\n"
                "Bitte installiere zuerst:\n"
                "  python -m pip install coqui-tts torch torchaudio\n\n"
                f"Details: {e}"
            ) from e
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        return TTS

    def _ensure_xtts(self):
        """Load the XTTS model once."""
        if self.tts is not None:
            return
        TTS = self._load_runtime()
        self.log(f"VOICE: Lade XTTS v2 ({self.device}) ...")
        self.tts = TTS(self.XTTS_MODEL).to(self.device)
        self.log("VOICE: XTTS bereit.")

    def _ensure_vc(self):
        """Load the FreeVC voice-conversion model once."""
        if self.vc is not None:
            return
        TTS = self._load_runtime()
        self.log(f"VOICE: Lade FreeVC ({self.device}) ...")
        self.vc = TTS(self.VC_MODEL).to(self.device)
        self.log("VOICE: FreeVC bereit.")

    @staticmethod
    def _check_audio(path, label):
        """Validate that ``path`` is an existing file with an accepted suffix.

        Returns the path as a string. ``label`` names the input in the
        error message.

        Raises:
            RuntimeError: the file is missing or its suffix is not accepted.
        """
        p = Path(path)
        if not p.is_file():
            raise RuntimeError(f"{label} nicht gefunden:\n{path}")
        if p.suffix.lower() not in VoiceCloner.SUPPORTED_AUDIO:
            raise RuntimeError(
                f"{label} hat ein nicht unterstuetztes Format: {p.suffix}\n"
                f"Erlaubt: {', '.join(sorted(VoiceCloner.SUPPORTED_AUDIO))}"
            )
        return str(p)

    @staticmethod
    def _resample_linear(samples, src_sr, dst_sr):
        """Resample audio from ``src_sr`` to ``dst_sr`` by linear interpolation.

        ``samples`` is 1-D (mono) or 2-D (frames x channels). The input is
        returned as an array unchanged when the rates match, a rate is not
        positive, or there are no samples.
        """
        import numpy as np

        arr = np.asarray(samples)
        if arr.ndim == 0 or arr.shape[0] == 0:
            return arr
        if src_sr == dst_sr or src_sr <= 0 or dst_sr <= 0:
            return arr
        n_out = max(1, int(round(arr.shape[0] * float(dst_sr) / float(src_sr))))
        t_src = np.arange(arr.shape[0], dtype=np.float64) / float(src_sr)
        t_dst = np.arange(n_out, dtype=np.float64) / float(dst_sr)
        if arr.ndim == 1:
            return np.interp(t_dst, t_src, arr)
        channels = [np.interp(t_dst, t_src, arr[:, c]) for c in range(arr.shape[1])]
        return np.stack(channels, axis=1)

    def clone_from_text(self, speaker_wav, text, language, out_file):
        """Synthesise ``text`` in the voice of ``speaker_wav`` and write ``out_file``.

        ``language`` defaults to ``"de"`` when empty and must be one of
        ``SUPPORTED_LANGS``. Returns the output path as a string.

        Raises:
            RuntimeError: empty text, unsupported language, invalid
                reference audio, or missing voice packages.
        """
        # A missing text (None) is reported like an empty one, not as AttributeError.
        if not text or not text.strip():
            raise RuntimeError("Bitte Text eingeben.")
        language = (language or "de").strip().lower()
        if language not in self.SUPPORTED_LANGS:
            raise RuntimeError(f"Sprache '{language}' nicht unterstuetzt. Nutze z.B.: {', '.join(self.SUPPORTED_LANGS)}")
        speaker_wav = self._check_audio(speaker_wav, "Referenz-Stimme")
        self._ensure_xtts()
        out_path = Path(out_file)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        self.tts.tts_to_file(text=text, speaker_wav=speaker_wav, language=language, file_path=str(out_path))
        return str(out_path)

    def clone_from_audio(self, speaker_wav, source_wav, out_file):
        """Convert ``source_wav`` to the voice of ``speaker_wav`` and write ``out_file``.

        Recordings longer than one 30-second segment are converted segment
        by segment and concatenated. If the source cannot be decoded for
        segmenting, it is handed to the converter in one piece instead.
        Returns the output path as a string.

        Raises:
            RuntimeError: invalid input files, or missing voice packages or
                ``soundfile``.
        """
        speaker_wav = self._check_audio(speaker_wav, "Referenz-Stimme")
        source_wav = self._check_audio(source_wav, "Eingabe-Audio")
        self._ensure_vc()

        import tempfile, math
        try:
            import soundfile as sf
            import numpy as np
        except ImportError as e:
            raise RuntimeError(
                "Bitte installiere soundfile:\n"
                "  python -m pip install soundfile"
            ) from e

        out_path = Path(out_file)
        out_path.parent.mkdir(parents=True, exist_ok=True)

        # Load the audio file and split it into segments.
        CHUNK_SEC = 30  # segment length in seconds (lower it, e.g. to 20, if RAM is tight)
        try:
            data, sr = sf.read(source_wav, always_2d=False)
        except (RuntimeError, OSError, ValueError) as e:
            # NOTE(review): soundfile presumably cannot decode every suffix in
            # SUPPORTED_AUDIO. Such files are converted in one piece, not rejected.
            self.log(f"VOICE: Eingabe-Audio nicht segmentierbar ({e}); verarbeite Datei am Stueck ...")
            data, sr = None, 0

        if data is None or sr <= 0:
            num_chunks = 0
            chunk_samples = 0
            total_samples = 0
        else:
            chunk_samples = max(1, int(CHUNK_SEC * sr))
            total_samples = len(data)
            num_chunks = math.ceil(total_samples / chunk_samples)

        if num_chunks <= 1:
            # Short (or non-segmentable) file: convert directly.
            self.vc.voice_conversion_to_file(
                source_wav=source_wav, target_wav=speaker_wav, file_path=str(out_path)
            )
            return str(out_path)

        self.log(f"VOICE: Datei zu lang — teile in {num_chunks} Segmente à {CHUNK_SEC}s ...")
        results = []
        with tempfile.TemporaryDirectory() as tmpdir:
            for i in range(num_chunks):
                start = i * chunk_samples
                end = min(start + chunk_samples, total_samples)
                chunk = data[start:end]
                chunk_in = Path(tmpdir) / f"chunk_{i:04d}_in.wav"
                chunk_out = Path(tmpdir) / f"chunk_{i:04d}_out.wav"
                sf.write(str(chunk_in), chunk, sr)
                self.log(f"VOICE: Segment {i+1}/{num_chunks} ...")
                self.vc.voice_conversion_to_file(
                    source_wav=str(chunk_in), target_wav=speaker_wav,
                    file_path=str(chunk_out)
                )
                out_data, out_sr = sf.read(str(chunk_out))
                results.append((out_data, out_sr))

        # Merge the segments; one whose sample rate differs from the first
        # segment's is resampled first.
        self.log("VOICE: Füge Segmente zusammen ...")
        target_sr = results[0][1]
        merged = np.concatenate(
            [self._resample_linear(piece, piece_sr, target_sr) for piece, piece_sr in results],
            axis=0,
        )
        sf.write(str(out_path), merged, target_sr)

        return str(out_path)
|
|
|
|
|
|
# Tk-based main window of the tool. The methods of this class build the UI,
# persist the settings and dispatch the image / video / voice / webcam actions.
class MainApp:
|
|
# File suffixes (compared lower-cased) that the image batch run accepts.
SUPPORTED = {".jpg", ".jpeg", ".png", ".bmp", ".webp"}
|
|
# Video file suffixes; presumably consumed by the video batch run, which is
# not visible in this part of the file - TODO confirm.
VIDEO_SUPPORTED = {".mp4", ".avi", ".mov", ".mkv", ".wmv", ".webm", ".flv"}
|
|
|
|
# All fields that are persisted: _build() creates one StringVar per key and
# _save_now() writes their values to config.json.
|
|
_CONFIG_KEYS = [
|
|
"source", "input_dir", "output_dir",
|
|
"video_input_dir", "video_output_dir",
|
|
"voice_ref", "voice_source_audio", "voice_output", "voice_language",
|
|
"webcam_index", "webcam_output", "last_library_face",
|
|
"restoration", "webcam_record"
|
|
]
|
|
|
|
def __init__(self):
    """Create the back-end helpers, build the scrollable window and run Tk.

    This call only returns once the window has been closed, because the Tk
    main loop is entered as its last step.
    """
    import tkinter as tk
    from tkinter import ttk, filedialog, messagebox

    # Keep the tkinter modules on the instance so other methods can use them.
    self.tk = tk
    self.ttk = ttk
    self.fd = filedialog
    self.mb = messagebox

    self.gpu = "CUDAExecutionProvider" in _get_providers()

    # Back-end helpers; each one reports through this window's log method.
    self.swapper = FaceSwapper(log_fn=self._log)
    self.voice = VoiceCloner(log_fn=self._log)
    self.library = FaceLibrary(log_fn=self._log)
    self.restorer = FaceRestorer(log_fn=self._log)
    self.swapper.restorer = self.restorer

    # Face-library and webcam bookkeeping.
    self._selected_library_slug = None
    self._library_images = {}
    self._webcam_thread = None
    self._webcam_running = False
    self._webcam_cancel = None
    self._webcam_last_faces = 0

    # Load the previously saved configuration.
    self._cfg = _load_config()

    background = "#0d0d12"
    window = tk.Tk()
    self._root_real = window
    window.title("FaceSwap Batch Tool")
    window.geometry("760x700")
    window.resizable(True, True)
    window.configure(bg=background)
    window.protocol("WM_DELETE_WINDOW", self._on_close)

    # The whole UI lives in a frame embedded in a vertically scrollable canvas.
    canvas = tk.Canvas(window, bg=background, highlightthickness=0)
    scrollbar = tk.Scrollbar(window, orient="vertical", command=canvas.yview)
    canvas.configure(yscrollcommand=scrollbar.set)
    scrollbar.pack(side="right", fill="y")
    canvas.pack(side="left", fill="both", expand=True)

    self._scroll_frame = tk.Frame(canvas, bg=background)
    self._scroll_window = canvas.create_window(
        (0, 0), window=self._scroll_frame, anchor="nw"
    )

    def match_canvas_width(event):
        # Stretch the embedded frame to the current canvas width.
        canvas.itemconfig(self._scroll_window, width=event.width)

    def refresh_scroll_region(event):
        # The scrollable area follows the size of the embedded content.
        canvas.configure(scrollregion=canvas.bbox("all"))

    def scroll_with_wheel(event):
        # NOTE(review): assumes wheel deltas come in multiples of 120 - confirm
        # for the platforms this tool is meant to run on.
        canvas.yview_scroll(int(-1 * (event.delta / 120)), "units")

    canvas.bind("<Configure>", match_canvas_width)
    self._scroll_frame.bind("<Configure>", refresh_scroll_region)
    canvas.bind_all("<MouseWheel>", scroll_with_wheel)

    # All widgets are parented to the scrollable frame, not to the Tk root.
    self.root = self._scroll_frame

    self._build()
    window.mainloop()
|
|
|
|
def _build(self):
    """Create every widget of the main window and restore the saved settings.

    The Tk variables are created first and only then get their write traces,
    because the trace callbacks end up in ``_save_now()``, which reads them.
    ffmpeg is looked up once so that the note text and its colour always
    agree.
    """
    tk, ttk = self.tk, self.ttk
    bg = "#0d0d12"
    mono = "Courier New"
    accent = "#8aff8a"
    cfg = self._cfg

    def heading(parent, text, top):
        # Gold caption line used for the numbered steps inside the tabs.
        tk.Label(parent, text=text, font=(mono, 9, "bold"), bg=bg, fg="#c8a96a",
                 anchor="w").pack(fill="x", pady=(top, 2))

    # --- Title and GPU/CPU badge -------------------------------------
    tk.Label(self.root, text="FaceSwap Batch", font=(mono, 22, "bold"),
             bg=bg, fg="#e8d5b7").pack(pady=(20, 3))
    tk.Label(self.root, text="Ersetze Gesichter + klone Stimmen lokal",
             font=(mono, 9), bg=bg, fg="#7a7a9a").pack()
    if self.gpu:
        badge_bg, badge_text, badge_fg = "#142814", "GPU-Modus | CUDA aktiv", "#5aff5a"
    else:
        badge_bg, badge_text, badge_fg = "#281414", "CPU-Modus", "#ff7a5a"
    tk.Label(self.root, text=badge_text, font=(mono, 9, "bold"),
             bg=badge_bg, fg=badge_fg, padx=14, pady=5).pack(pady=(8, 0))

    # --- Variables (created before any trace is attached) ------------
    self._var_voice_mode = tk.StringVar(value=cfg.get("voice_mode", "text"))
    self._var_enhance = tk.BooleanVar(value=cfg.get("enhance", True))
    self._var_color = tk.BooleanVar(value=cfg.get("color", True))
    self._var_restoration = tk.BooleanVar(value=bool(cfg.get("restoration", False)))
    self._var_webcam_record = tk.BooleanVar(value=bool(cfg.get("webcam_record", False)))

    self._vars = {}
    for k in self._CONFIG_KEYS:
        v = tk.StringVar(value=cfg.get(k, ""))
        # key=k binds the current key; a plain closure would see the last one.
        v.trace_add("write", lambda *_, key=k: self._on_var_change(key))
        self._vars[k] = v

    # The two boolean options exist both as StringVar (persisted) and as
    # BooleanVar (bound to the checkbox); bring each pair in line.
    truthy = ("1", "true", "yes", "on")
    for key, flag in (("restoration", self._var_restoration),
                      ("webcam_record", self._var_webcam_record)):
        stored = self._vars[key].get()
        if stored:
            flag.set(str(stored).strip().lower() in truthy)
        else:
            self._vars[key].set("1" if flag.get() else "0")

    # Defaults for fields that must never be empty.
    for key, default in (("webcam_index", "0"),
                         ("webcam_output", str(SCRIPT_DIR / "webcam_recording.mp4")),
                         ("voice_language", "de")):
        if not self._vars[key].get():
            self._vars[key].set(default)

    for var in (self._var_voice_mode, self._var_enhance, self._var_color,
                self._var_restoration, self._var_webcam_record):
        var.trace_add("write", lambda *_: self._save_now())

    # --- Step 1: source face ------------------------------------------
    self._section("1 QUELLBILD - Gesicht, das eingefuegt wird")
    self._row("source", self._pick_source)
    self._build_library_panel()
    self._prev_lbl = tk.Label(self.root, bg=bg)
    self._prev_lbl.pack()

    # Show the preview right away if a source image was saved.
    if self._vars["source"].get():
        self._load_preview(self._vars["source"].get())
    self._refresh_library_grid()

    # --- Notebook with one tab per mode ------------------------------
    style = ttk.Style(self.root)
    style.theme_use("default")
    style.configure("TNotebook", background=bg, borderwidth=0)
    style.configure("TNotebook.Tab", background="#1a1a2c", foreground="#8a8aff",
                    font=(mono, 9, "bold"), padding=(14, 5))
    style.map("TNotebook.Tab",
              background=[("selected", bg)],
              foreground=[("selected", "#e8d5b7")])
    nb = ttk.Notebook(self.root)
    nb.pack(fill="x", padx=20, pady=(10, 0))

    # Tab 1: images
    img_tab = tk.Frame(nb, bg=bg)
    nb.add(img_tab, text="Bilder (Batch)")
    heading(img_tab, "2 EINGABE-ORDNER - Bilder, die bearbeitet werden", 10)
    self._row_in(img_tab, "input_dir", self._pick_indir)
    heading(img_tab, "3 AUSGABE-ORDNER - Zielort fuer fertige Bilder", 8)
    self._row_in(img_tab, "output_dir", self._pick_outdir)

    # Tab 2: video
    vid_tab = tk.Frame(nb, bg=bg)
    nb.add(vid_tab, text="Video (Batch)")
    heading(vid_tab, "2 EINGABE-ORDNER - Videos, die bearbeitet werden", 10)
    self._row_in(vid_tab, "video_input_dir", self._pick_video_indir)
    heading(vid_tab, "3 AUSGABE-ORDNER - Zielort fuer fertige Videos (mp4)", 8)
    self._row_in(vid_tab, "video_output_dir", self._pick_video_outdir)
    if shutil.which("ffmpeg"):
        ffmpeg_note, ffmpeg_col = "ffmpeg gefunden - Audio wird beibehalten", "#5aff5a"
    else:
        ffmpeg_note = "Achtung: ffmpeg nicht gefunden - kein Audio im Ausgabevideo"
        ffmpeg_col = "#ffaa44"
    tk.Label(vid_tab, text=ffmpeg_note, font=(mono, 8),
             bg=bg, fg=ffmpeg_col).pack(anchor="w", padx=4, pady=(4, 0))

    # Tab 3: voice cloning
    voice_tab = tk.Frame(nb, bg=bg)
    nb.add(voice_tab, text="Stimme klonen")
    heading(voice_tab, "2 REFERENZ-STIMME - Audio mit Zielstimme", 10)
    self._row_in(voice_tab, "voice_ref", self._pick_voice_ref)
    heading(voice_tab, "3 MODUS", 8)
    mode_row = tk.Frame(voice_tab, bg=bg)
    mode_row.pack(fill="x")
    for label, value, pack_opts in (
        ("Text -> Stimme", "text", {"side": "left", "padx": (0, 16)}),
        ("Audio -> Stimme", "audio", {"side": "left"}),
    ):
        tk.Radiobutton(mode_row, text=label, value=value, variable=self._var_voice_mode,
                       font=(mono, 9), bg=bg, fg=accent,
                       selectcolor=bg, activebackground=bg).pack(**pack_opts)
    heading(voice_tab, "4 TEXT (nur fuer Text-Modus)", 8)
    self._voice_text = tk.Text(voice_tab, height=4, bg="#14141e", fg="#d8d8f0",
                               font=(mono, 9), relief="flat", insertbackground="white")
    self._voice_text.pack(fill="x")
    # Restore the saved text.
    saved_text = cfg.get("voice_text", "")
    if saved_text:
        self._voice_text.insert("1.0", saved_text)
    self._voice_text.bind("<KeyRelease>", lambda e: self._save_now())

    heading(voice_tab, "5 EINGABE-AUDIO (nur fuer Audio-Modus)", 8)
    self._row_in(voice_tab, "voice_source_audio", self._pick_voice_source_audio)
    heading(voice_tab, "6 SPRACHE (Text-Modus, z.B. de/en/fr)", 8)
    lang_row = tk.Frame(voice_tab, bg="#14141e")
    lang_row.pack(fill="x", pady=2)
    self._voice_lang = ttk.Combobox(
        lang_row, textvariable=self._vars["voice_language"],
        values=list(VoiceCloner.SUPPORTED_LANGS), state="readonly", font=(mono, 9)
    )
    self._voice_lang.pack(side="left", padx=8, pady=6)
    heading(voice_tab, "7 AUSGABE-AUDIO (.wav)", 8)
    self._row_in(voice_tab, "voice_output", self._pick_voice_out)
    tk.Label(voice_tab, text="Hinweis: Beim ersten Lauf werden Sprachmodelle automatisch geladen.",
             font=(mono, 8), bg=bg, fg="#7a7a9a").pack(anchor="w", padx=4, pady=(4, 0))

    # Tab 4: webcam (``_start`` relies on this being tab index 3).
    self._build_webcam_tab(nb)

    self._nb = nb

    # --- Quality options ----------------------------------------------
    qf = tk.Frame(self.root, bg=bg)
    qf.pack(fill="x", padx=20, pady=(10, 0))
    tk.Label(qf, text="4 QUALITAETS-OPTIONEN", font=(mono, 9, "bold"),
             bg=bg, fg="#c8a96a").pack(side="left")
    for label, var, left_pad in (("Schaerfen", self._var_enhance, 20),
                                 ("Farbanpassung", self._var_color, 12)):
        tk.Checkbutton(qf, text=label, variable=var,
                       font=(mono, 9), bg=bg, fg=accent,
                       selectcolor=bg, activebackground=bg,
                       command=self._update_quality).pack(side="left", padx=(left_pad, 0))
    self._chk_restoration = tk.Checkbutton(
        qf, text="Gesichtswiederherstellung (GFPGAN)", variable=self._var_restoration,
        font=(mono, 9), bg=bg, fg=accent,
        selectcolor=bg, activebackground=bg,
        command=self._update_quality
    )
    self._chk_restoration.pack(side="left", padx=(12, 0))
    if not self.restorer.is_available():
        # Without the restorer the option is shown disabled and switched off.
        self._chk_restoration.configure(state="disabled", text="GFPGAN nicht installiert")
        self._var_restoration.set(False)

    # --- Progress bar, status line and log ----------------------------
    style.configure("G.Horizontal.TProgressbar", troughcolor="#101020",
                    background="#3adf6a", thickness=14)
    self._pb = ttk.Progressbar(self.root, length=700, mode="determinate",
                               style="G.Horizontal.TProgressbar")
    self._pb.pack(padx=20, pady=(14, 4))
    self._sv = self.tk.StringVar(value="Bereit.")
    self.tk.Label(self.root, textvariable=self._sv, font=(mono, 9),
                  bg=bg, fg="#5a8a6a").pack()
    lf = tk.Frame(self.root, bg=bg)
    lf.pack(fill="both", expand=True, padx=20, pady=(8, 0))
    self._lb = tk.Text(lf, height=7, bg="#060610", fg=accent, font=(mono, 9),
                       relief="flat", insertbackground=accent)
    sb = tk.Scrollbar(lf, command=self._lb.yview)
    self._lb.configure(yscrollcommand=sb.set)
    self._lb.pack(side="left", fill="both", expand=True)
    sb.pack(side="right", fill="y")

    # --- Action buttons -----------------------------------------------
    bf2 = tk.Frame(self.root, bg=bg)
    bf2.pack(pady=14)
    self._btn = tk.Button(bf2, text="STARTEN", font=(mono, 13, "bold"),
                          bg="#1a3a2a", fg=accent, activebackground="#2a5a3a",
                          relief="flat", cursor="hand2", padx=22, command=self._start)
    self._btn.pack(side="left", padx=8)
    tk.Button(bf2, text="Setup wiederholen", font=(mono, 9),
              bg="#1a1a2c", fg="#8a8aff", activebackground="#252540",
              relief="flat", cursor="hand2", command=self._redo_setup).pack(side="left", padx=8)

    # Push the checkbox states into the swapper and save once.
    self._update_quality()
|
|
|
|
def _on_var_change(self, key):
    """Save the settings as soon as one of the tracked fields changes.

    ``key`` names the field that was written; it is not needed here because
    the complete configuration is always saved.
    """
    self._save_now()
|
|
|
|
def _save_now(self):
    """Write the current settings to config.json.

    Does nothing until ``_build()`` has created the variable table. The
    whole file is rewritten on every call, so when the voice text widget
    cannot be read (it is created late in ``_build()``, after the first
    saves have already been triggered by variable traces) the text loaded
    from disk is kept instead of being dropped from the file.
    """
    if not hasattr(self, "_vars"):
        return
    if "restoration" not in self._vars or "webcam_record" not in self._vars:
        return

    # Mirror the checkbox states into their persisted string twins. Setting
    # a StringVar re-enters this method through its trace; the comparison
    # keeps that from looping.
    for key, flag in (("restoration", self._var_restoration),
                      ("webcam_record", self._var_webcam_record)):
        wanted = "1" if flag.get() else "0"
        if self._vars[key].get() != wanted:
            self._vars[key].set(wanted)

    data = {k: self._vars[k].get() for k in self._CONFIG_KEYS}
    data["voice_mode"] = self._var_voice_mode.get()
    data["enhance"] = self._var_enhance.get()
    data["color"] = self._var_color.get()
    # The two flags are stored as real booleans, overriding the strings above.
    data["restoration"] = self._var_restoration.get()
    data["webcam_record"] = self._var_webcam_record.get()
    try:
        data["voice_text"] = self._voice_text.get("1.0", "end-1c")
    except Exception:
        # Best effort: the widget is missing or unusable. Fall back to the
        # text that was loaded at start-up so an early save cannot erase it.
        stored = getattr(self, "_cfg", None)
        if isinstance(stored, dict) and "voice_text" in stored:
            data["voice_text"] = stored["voice_text"]
    _save_config(data)
|
|
|
|
def _load_preview(self, path):
    """Show a thumbnail (at most 110x110) of ``path`` in the preview label.

    Best effort: any failure (Pillow missing, unreadable file, ...) is
    ignored and the preview is left as it was.
    """
    try:
        from PIL import Image, ImageTk

        picture = Image.open(path).convert("RGB")
        picture.thumbnail((110, 110))
        photo = ImageTk.PhotoImage(picture)
        # Keep a reference on the instance; Tk does not hold one itself.
        self._tkimg = photo
        self._prev_lbl.configure(image=photo)
    except Exception:
        pass
|
|
|
|
def _section(self, txt):
    """Add a gold section caption with the text ``txt`` to the main frame."""
    caption = self.tk.Label(
        self.root,
        text=txt,
        font=("Courier New", 9, "bold"),
        bg="#0d0d12",
        fg="#c8a96a",
        anchor="w",
    )
    caption.pack(fill="x", padx=20, pady=(12, 2))
|
|
|
|
def _row(self, key, cmd):
    """Add an entry bound to ``self._vars[key]`` plus a "..." button to the main frame.

    ``cmd`` is the callback run when the button is clicked.
    """
    tk = self.tk
    field_bg = "#14141e"

    holder = tk.Frame(self.root, bg=field_bg)
    holder.pack(fill="x", padx=20, pady=2)

    entry = tk.Entry(
        holder, textvariable=self._vars[key], font=("Courier New", 9),
        bg=field_bg, fg="#d8d8f0", relief="flat", insertbackground="white",
    )
    entry.pack(side="left", padx=8, pady=6, fill="x", expand=True)

    browse = tk.Button(
        holder, text="...", bg="#22223c", fg="#d0d0e0", relief="flat",
        cursor="hand2", command=cmd,
    )
    browse.pack(side="right", padx=4)
|
|
|
|
def _row_in(self, parent, key, cmd):
    """Add an entry bound to ``self._vars[key]`` plus a "..." button inside ``parent``.

    ``cmd`` is the callback run when the button is clicked.
    """
    tk = self.tk
    field_bg = "#14141e"

    holder = tk.Frame(parent, bg=field_bg)
    holder.pack(fill="x", pady=2)

    entry = tk.Entry(
        holder, textvariable=self._vars[key], font=("Courier New", 9),
        bg=field_bg, fg="#d8d8f0", relief="flat", insertbackground="white",
    )
    entry.pack(side="left", padx=8, pady=6, fill="x", expand=True)

    browse = tk.Button(
        holder, text="...", bg="#22223c", fg="#d0d0e0", relief="flat",
        cursor="hand2", command=cmd,
    )
    browse.pack(side="right", padx=4)
|
|
|
|
def _build_library_panel(self):
    """Create the (initially hidden) face-library panel and its toggle button.

    The panel itself is not packed here; ``_toggle_library_panel`` shows and
    hides it.
    """
    tk = self.tk
    panel_bg = "#11111a"
    small = ("Courier New", 9)

    wrap = tk.Frame(self.root, bg="#0d0d12")
    wrap.pack(fill="x", padx=20, pady=(6, 0))

    self._library_open = False
    self._btn_library_toggle = tk.Button(
        wrap, text="Bibliothek oeffnen", font=small,
        bg="#1a1a2c", fg="#8a8aff", activebackground="#252540",
        relief="flat", cursor="hand2", command=self._toggle_library_panel,
    )
    self._btn_library_toggle.pack(anchor="w", pady=(0, 4))

    self._library_panel = tk.Frame(wrap, bg=panel_bg, bd=1, relief="flat")

    # Top bar: name field and the three action buttons.
    top = tk.Frame(self._library_panel, bg=panel_bg)
    top.pack(fill="x", padx=8, pady=(8, 4))

    name_label = tk.Label(top, text="Name:", font=small, bg=panel_bg, fg="#d8d8f0")
    name_label.pack(side="left")

    self._var_library_name = tk.StringVar(value="")
    name_entry = tk.Entry(
        top, textvariable=self._var_library_name, font=small,
        bg="#14141e", fg="#d8d8f0", relief="flat",
        insertbackground="white", width=18,
    )
    name_entry.pack(side="left", padx=8)

    add_button = tk.Button(
        top, text="+ Hinzufuegen", font=small,
        bg="#1a3a2a", fg="#8aff8a", activebackground="#2a5a3a",
        relief="flat", cursor="hand2", command=self._library_add_from_file,
    )
    add_button.pack(side="left", padx=(0, 6))

    use_button = tk.Button(
        top, text="Als Quelle verwenden", font=small,
        bg="#22223c", fg="#d0d0e0", activebackground="#2e2e4e",
        relief="flat", cursor="hand2", command=self._library_use_selected,
    )
    use_button.pack(side="left", padx=(8, 6))

    delete_button = tk.Button(
        top, text="Loeschen", font=small,
        bg="#3a1a1a", fg="#ffb0b0", activebackground="#4a2222",
        relief="flat", cursor="hand2", command=self._library_delete_selected,
    )
    delete_button.pack(side="left")

    # Thumbnail grid; starts out with a placeholder text.
    self._library_grid = tk.Frame(self._library_panel, bg=panel_bg)
    self._library_grid.pack(fill="x", padx=8, pady=(4, 8))
    self._library_placeholder = tk.Label(
        self._library_grid, text="Noch keine Gesichter gespeichert.",
        font=small, bg=panel_bg, fg="#7a7a9a",
    )
    self._library_placeholder.grid(row=0, column=0, sticky="w")
|
|
|
|
def _toggle_library_panel(self):
    """Show or hide the face-library panel and relabel the toggle button."""
    now_open = not self._library_open
    self._library_open = now_open
    if not now_open:
        self._library_panel.pack_forget()
        self._btn_library_toggle.configure(text="Bibliothek oeffnen")
        return
    self._library_panel.pack(fill="x")
    self._btn_library_toggle.configure(text="Bibliothek schliessen")
    # Rebuild the thumbnails each time the panel is opened.
    self._refresh_library_grid()
|
|
|
|
def _refresh_library_grid(self):
    """Rebuild the thumbnail grid of the face library (four tiles per row).

    Does nothing before the grid frame exists. With no entries a placeholder
    text is shown. A thumbnail that is missing or cannot be decoded is
    replaced by a plain 96x96 tile, so one damaged file cannot abort the
    refresh (and with it ``_build()``, which calls this during start-up).
    """
    if not hasattr(self, "_library_grid"):
        return
    for child in list(self._library_grid.winfo_children()):
        child.destroy()
    # Dropping the old PhotoImage references releases the old thumbnails.
    self._library_images = {}

    entries = self.library.list_entries()
    if not entries:
        self._library_placeholder = self.tk.Label(
            self._library_grid, text="Noch keine Gesichter gespeichert.",
            font=("Courier New", 9), bg="#11111a", fg="#7a7a9a"
        )
        self._library_placeholder.grid(row=0, column=0, sticky="w")
        return

    from PIL import Image, ImageTk

    # Fall back to the persisted selection when nothing is selected yet.
    last_slug = (self._vars.get("last_library_face").get().strip()
                 if "last_library_face" in self._vars else "")
    if self._selected_library_slug is None and last_slug:
        self._selected_library_slug = last_slug

    for i, entry in enumerate(entries):
        row, col = divmod(i, 4)
        cell = self.tk.Frame(self._library_grid, bg="#11111a", bd=0)
        cell.grid(row=row, column=col, padx=6, pady=6, sticky="n")

        img = None
        thumb_path = Path(entry["thumb_path"])
        if thumb_path.exists():
            try:
                # convert() returns an independent image, so the file can be
                # closed as soon as the block is left.
                with Image.open(thumb_path) as opened:
                    img = opened.convert("RGB")
            except (OSError, ValueError):
                # Unreadable or corrupt thumbnail: use the placeholder tile.
                img = None
        if img is None:
            img = Image.new("RGB", (96, 96), "#1a1a2c")

        photo = ImageTk.PhotoImage(img)
        # Tk keeps no reference to the image; hold one per slug.
        self._library_images[entry["slug"]] = photo
        btn = self.tk.Button(
            cell, image=photo, relief="solid", bd=3,
            highlightthickness=0, bg="#11111a", activebackground="#1a1a2c",
            command=lambda slug=entry["slug"]: self._select_library_entry(slug)
        )
        btn.pack()
        self.tk.Label(cell, text=entry["name"], font=("Courier New", 8),
                      bg="#11111a", fg="#d8d8f0").pack(pady=(2, 0))
        # Tag the cell so _select_library_entry can find button and slug.
        cell._slug = entry["slug"]
        cell._btn = btn

    # Re-apply the highlight without writing the selection back to disk.
    self._select_library_entry(self._selected_library_slug, save=False)
|
|
|
|
def _select_library_entry(self, slug, save=True):
    """Mark ``slug`` as the selected library face and highlight its tile.

    A falsy ``slug`` keeps the current selection and only refreshes the
    highlighting. With ``save`` true the selection is also stored in the
    ``last_library_face`` setting and saved.
    """
    if slug:
        self._selected_library_slug = slug
    if not hasattr(self, "_library_grid"):
        return

    selected = self._selected_library_slug
    for cell in self._library_grid.winfo_children():
        button = getattr(cell, "_btn", None)
        if button is None:
            # Not a thumbnail cell (e.g. the placeholder label).
            continue
        is_selected = getattr(cell, "_slug", None) == selected
        button.configure(bg="#28442a" if is_selected else "#11111a")

    if save and selected:
        self._vars["last_library_face"].set(selected)
        self._save_now()
|
|
|
|
def _library_add_from_file(self):
    """Let the user pick an image file and store it in the face library.

    The entry is named after the text in the name field or, if that is
    empty, after the file name without its suffix. Errors from the library
    are shown in a message box.
    """
    chosen = self.fd.askopenfilename(
        title="Gesicht fuer Bibliothek waehlen",
        filetypes=[("Bilder", "*.jpg *.jpeg *.png *.bmp *.webp"), ("Alle", "*.*")],
    )
    if not chosen:
        return  # dialog cancelled

    name = self._var_library_name.get().strip() or Path(chosen).stem
    try:
        entry = self.library.add(name, chosen)
        slug = entry["slug"]
        self._selected_library_slug = slug
        self._vars["last_library_face"].set(slug)
        self._refresh_library_grid()
        self.mb.showinfo("Bibliothek", f"Gesicht gespeichert: {entry['name']}")
    except Exception as e:
        self.mb.showerror("Bibliothek", str(e))
|
|
|
|
def _library_use_selected(self):
    """Use the selected library face as the source image.

    Shows an error box when nothing is selected or the library cannot
    resolve the entry.
    """
    slug = self._selected_library_slug
    if not slug:
        return self.mb.showerror("Bibliothek", "Bitte zuerst ein Gesicht auswaehlen.")
    try:
        source_path = self.library.get_source_path(slug)
        self._vars["source"].set(str(source_path))
        self._load_preview(str(source_path))
        self._save_now()
    except Exception as e:
        self.mb.showerror("Bibliothek", str(e))
|
|
|
|
def _library_delete_selected(self):
    """Delete the selected library entry after a yes/no confirmation.

    On success the selection and the ``last_library_face`` setting are
    cleared and the grid is rebuilt; library errors are shown in a box.
    """
    slug = self._selected_library_slug
    if not slug:
        return self.mb.showerror("Bibliothek", "Bitte zuerst ein Gesicht auswaehlen.")

    confirmed = self.mb.askyesno("Bibliothek", f"Eintrag '{slug}' wirklich loeschen?")
    if not confirmed:
        return

    try:
        self.library.remove(slug)
        self._selected_library_slug = None
        self._vars["last_library_face"].set("")
        self._refresh_library_grid()
    except Exception as e:
        self.mb.showerror("Bibliothek", str(e))
|
|
|
|
def _build_webcam_tab(self, notebook):
    """Add the "Webcam" tab (camera choice, recording, preview) to ``notebook``."""
    tk, ttk = self.tk, self.ttk
    bg = "#0d0d12"
    preview_bg = "#05050c"
    small = ("Courier New", 9)

    tab = tk.Frame(notebook, bg=bg)
    notebook.add(tab, text="Webcam")

    # Row 1: camera index and the resolution read-out.
    camera_row = tk.Frame(tab, bg=bg)
    camera_row.pack(fill="x", pady=(10, 2))
    index_caption = tk.Label(camera_row, text="Kamera-Index", font=("Courier New", 9, "bold"),
                             bg=bg, fg="#c8a96a")
    index_caption.pack(side="left", padx=(0, 8))
    self._webcam_index_box = ttk.Combobox(
        camera_row, textvariable=self._vars["webcam_index"],
        values=list(map(str, range(5))), state="readonly", width=6, font=small,
    )
    self._webcam_index_box.pack(side="left")
    self._webcam_resolution_label = tk.Label(
        camera_row, text="Aufloesung: -", font=("Courier New", 8), bg=bg, fg="#7a7a9a",
    )
    self._webcam_resolution_label.pack(side="left", padx=(12, 0))

    # Row 2: recording switch, followed by the output-file row.
    record_row = tk.Frame(tab, bg=bg)
    record_row.pack(fill="x", pady=(8, 2))
    record_box = tk.Checkbutton(
        record_row, text="Aufnahme aktiv", variable=self._var_webcam_record,
        font=small, bg=bg, fg="#8aff8a",
        selectcolor=bg, activebackground=bg,
        command=self._save_now,
    )
    record_box.pack(side="left")
    self._row_in(tab, "webcam_output", self._pick_webcam_out)

    # Preview area: a fixed-height box that is enlarged while the webcam runs.
    self._webcam_preview_target = (640, 360)
    self._webcam_preview_collapsed_h = 96
    self._webcam_preview_box = tk.Frame(tab, bg=preview_bg,
                                        height=self._webcam_preview_collapsed_h)
    self._webcam_preview_box.pack(fill="x", padx=4, pady=(8, 6))
    # Keep the configured height instead of shrinking to the label.
    self._webcam_preview_box.pack_propagate(False)
    self._webcam_preview = tk.Label(
        self._webcam_preview_box, bg=preview_bg, fg="#7a7a9a",
        text="Webcam Vorschau (Starten fuer Live-Preview)",
    )
    self._webcam_preview.pack(fill="both", expand=True)

    self._webcam_stats = tk.Label(
        tab, text="FPS: - | Gesichter: -", font=small, bg=bg, fg="#8a8aff",
    )
    self._webcam_stats.pack(anchor="w", padx=4, pady=(0, 8))

    self._btn_webcam = tk.Button(
        tab, text="Starten", font=("Courier New", 10, "bold"),
        bg="#1a3a2a", fg="#8aff8a", activebackground="#2a5a3a",
        relief="flat", cursor="hand2", command=self._toggle_webcam,
    )
    self._btn_webcam.pack(anchor="w", padx=4, pady=(0, 10))

    self._set_webcam_preview_collapsed(True)
|
|
|
|
def _toggle_webcam(self):
    """Stop the webcam swap if it is running, otherwise start it."""
    action = self._stop_webcam if self._webcam_running else self._start_webcam
    action()
|
|
|
|
def _pick_webcam_out(self):
    """Ask for the recording file (.mp4 by default) and store the choice."""
    chosen = self.fd.asksaveasfilename(
        title="Webcam-Aufnahme speichern",
        defaultextension=".mp4",
        filetypes=[("MP4 Video", "*.mp4"), ("Alle", "*.*")],
    )
    if not chosen:
        return  # dialog cancelled, keep the current value
    self._vars["webcam_output"].set(chosen)
|
|
|
|
def _set_webcam_preview_collapsed(self, collapsed):
    """Shrink the preview box to its idle height or enlarge it for live video.

    Collapsing also removes the current image and restores the hint text.
    Does nothing before the preview box exists.
    """
    if not hasattr(self, "_webcam_preview_box"):
        return
    if not collapsed:
        # Expanded: as tall as the preview target.
        self._webcam_preview_box.configure(height=int(self._webcam_preview_target[1]))
        return
    self._webcam_preview_box.configure(height=self._webcam_preview_collapsed_h)
    preview = self._webcam_preview
    preview.configure(image="", text="Webcam Vorschau (Starten fuer Live-Preview)")
    preview.image = None
|
|
|
|
def _start_webcam(self):
    """Validate the inputs, prepare the models and start the webcam worker.

    Every failed check shows an error box and returns without starting
    anything. On success the UI switches to its "running" state and a
    daemon thread runs ``_webcam_worker``.
    """
    import cv2
    import threading

    src = self._vars["source"].get().strip()
    if not (src and Path(src).is_file()):
        return self.mb.showerror("Webcam", "Bitte zuerst ein gueltiges Quellbild waehlen.")
    try:
        cam_idx = int(self._vars["webcam_index"].get().strip() or "0")
    except Exception:
        return self.mb.showerror("Webcam", "Ungueltiger Kamera-Index.")

    # Open the camera briefly to verify it and to read its frame size.
    probe = cv2.VideoCapture(cam_idx)
    if not probe.isOpened():
        probe.release()
        # Try the first five indices so the message can list those that open.
        tried = []
        for candidate in range(5):
            capture = cv2.VideoCapture(candidate)
            opened = capture.isOpened()
            capture.release()
            if opened:
                tried.append(candidate)
        return self.mb.showerror(
            "Webcam",
            f"Kamera konnte nicht geoeffnet werden (Index {cam_idx}).\n"
            f"Verfuegbare Indizes: {tried if tried else 'keine'}"
        )
    w = int(probe.get(cv2.CAP_PROP_FRAME_WIDTH) or 0)
    h = int(probe.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0)
    probe.release()
    self._webcam_resolution_label.configure(text=f"Aufloesung: {w}x{h}")

    self._update_quality()
    try:
        self.swapper.init_models()
    except Exception as e:
        return self.mb.showerror("Webcam", f"Modellfehler: {e}")

    # Extract the face that will be inserted into the camera frames.
    src_img = _cv2_imread_unicode(src)
    if src_img is None:
        return self.mb.showerror("Webcam", "Quellbild konnte nicht geladen werden.")
    src_face = self.swapper.get_first_face(src_img)
    if src_face is None:
        return self.mb.showerror("Webcam", "Kein Gesicht im Quellbild gefunden!")

    # Switch the UI to the running state.
    self._webcam_cancel = threading.Event()
    self._webcam_running = True
    self._btn_webcam.configure(text="Stoppen", bg="#3a1a1a", fg="#ffb0b0",
                               activebackground="#4a2222")
    self._webcam_stats.configure(text="FPS: - | Gesichter: -")
    self._set_webcam_preview_collapsed(False)

    # Record only when the checkbox is ticked.
    if self._var_webcam_record.get():
        record_path = self._vars["webcam_output"].get().strip()
    else:
        record_path = None

    worker = threading.Thread(
        target=self._webcam_worker,
        args=(src_face, cam_idx, record_path),
        daemon=True,
    )
    self._webcam_thread = worker
    worker.start()
|
|
|
|
def _stop_webcam(self):
    """Signal the webcam worker to stop and put the UI back to idle.

    Waits at most 1.5 seconds for the worker thread before the state is
    reset regardless.
    """
    cancel = self._webcam_cancel
    if cancel is not None:
        cancel.set()

    worker = self._webcam_thread
    if worker is not None and worker.is_alive():
        worker.join(timeout=1.5)

    self._webcam_running = False
    self._webcam_thread = None
    self._webcam_cancel = None

    # The widgets may not exist yet, hence the guards.
    if hasattr(self, "_btn_webcam"):
        self._btn_webcam.configure(text="Starten", bg="#1a3a2a", fg="#8aff8a",
                                   activebackground="#2a5a3a")
    if hasattr(self, "_webcam_preview"):
        self._set_webcam_preview_collapsed(True)
|
|
|
|
def _webcam_worker(self, src_face, cam_idx, record_path):
    """Thread body of the live webcam swap.

    Runs ``self.swapper.swap_webcam`` and forwards frames and statistics to
    the UI through ``after`` callbacks on the Tk root. Whatever happens, a
    final ``_stop_webcam`` is scheduled so the UI returns to idle.

    Args:
        src_face: Face object taken from the source image.
        cam_idx: Index of the camera to open.
        record_path: Target file for the recording, or ``None`` for none.
    """
    try:
        # Imported inside the ``try`` so that a missing package is reported
        # through the error dialog and the ``finally`` below still resets
        # the UI instead of leaving it stuck in the "running" state.
        import cv2
        from PIL import Image

        # Older Pillow versions have no ``Image.Resampling`` namespace.
        resampling = getattr(Image, "Resampling", Image).BILINEAR

        def on_frame(frame):
            # Convert the BGR frame, scale it to fit the preview target and
            # centre it on a dark canvas (letterboxing).
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            img = Image.fromarray(rgb)
            target_w, target_h = self._webcam_preview_target
            src_w, src_h = img.size
            scale = min(target_w / max(1, src_w), target_h / max(1, src_h))
            new_w = max(1, int(round(src_w * scale)))
            new_h = max(1, int(round(src_h * scale)))
            fitted = img.resize((new_w, new_h), resampling)
            canvas = Image.new("RGB", (target_w, target_h), (5, 5, 12))
            off_x = (target_w - new_w) // 2
            off_y = (target_h - new_h) // 2
            canvas.paste(fitted, (off_x, off_y))
            # Widgets are only touched from callbacks scheduled on the root.
            self._root_real.after(1, self._apply_webcam_frame, canvas)

        def on_stats(fps, faces):
            self._webcam_last_faces = int(faces)
            self._root_real.after(
                1,
                lambda: self._webcam_stats.configure(
                    text=f"FPS: {fps:.1f} | Gesichter: {int(faces)}"
                ),
            )

        cancel = self._webcam_cancel
        self.swapper.swap_webcam(
            src_face=src_face,
            camera_index=cam_idx,
            record_path=record_path,
            fps_target=25.0,
            cancel_check=(cancel.is_set if cancel is not None else None),
            frame_cb=on_frame,
            stats_cb=on_stats,
            use_tracking=False,
        )
    except Exception as e:
        # ``e`` is unbound as soon as this clause ends, but the callback runs
        # later - so the message is captured now instead of closing over ``e``.
        message = str(e)
        self._root_real.after(1, lambda: self.mb.showerror("Webcam", message))
    finally:
        self._root_real.after(1, self._stop_webcam)
|
|
|
|
def _apply_webcam_frame(self, pil_img):
    """Show ``pil_img`` (a PIL image) in the webcam preview label."""
    from PIL import ImageTk

    if hasattr(self, "_webcam_preview"):
        preview = self._webcam_preview
        photo = ImageTk.PhotoImage(pil_img)
        preview.configure(image=photo, text="")
        # Keep a reference on the widget; Tk does not hold one itself.
        preview.image = photo
|
|
|
|
def _pick_source(self):
    """Ask for the source image, store its path and show the preview."""
    chosen = self.fd.askopenfilename(
        title="Quellbild waehlen",
        filetypes=[("Bilder", "*.jpg *.jpeg *.png *.bmp *.webp"), ("Alle", "*.*")],
    )
    if chosen:
        self._vars["source"].set(chosen)
        self._load_preview(chosen)
|
|
|
|
def _pick_indir(self):
    """Ask for the image input folder and store it unless the dialog was cancelled."""
    folder = self.fd.askdirectory(title="Eingabe-Ordner waehlen")
    if not folder:
        return
    self._vars["input_dir"].set(folder)
|
|
|
|
def _pick_outdir(self):
    """Ask for the image output folder and store it unless the dialog was cancelled."""
    folder = self.fd.askdirectory(title="Ausgabe-Ordner waehlen")
    if not folder:
        return
    self._vars["output_dir"].set(folder)
|
|
|
|
def _pick_video_indir(self):
    """Ask for the video input folder and store it.

    If no video output folder is set yet, ``output_videos`` inside the
    chosen folder is suggested.
    """
    folder = self.fd.askdirectory(title="Video-Eingabe-Ordner waehlen")
    if not folder:
        return
    self._vars["video_input_dir"].set(folder)
    output_var = self._vars["video_output_dir"]
    if not output_var.get():
        output_var.set(str(Path(folder) / "output_videos"))
|
|
|
|
def _pick_video_outdir(self):
    """Ask for the video output folder and store it unless the dialog was cancelled."""
    folder = self.fd.askdirectory(title="Video-Ausgabe-Ordner waehlen")
    if not folder:
        return
    self._vars["video_output_dir"].set(folder)
|
|
|
|
def _pick_voice_ref(self):
    """Ask for the reference-voice audio file and store its path.

    If no output file is set yet, ``<name>_cloned.wav`` next to the chosen
    file is suggested.
    """
    chosen = self.fd.askopenfilename(
        title="Referenz-Stimme waehlen",
        filetypes=[("Audio", "*.wav *.mp3 *.m4a *.flac *.ogg *.aac *.wma"), ("Alle", "*.*")],
    )
    if not chosen:
        return
    self._vars["voice_ref"].set(chosen)
    output_var = self._vars["voice_output"]
    if not output_var.get():
        picked = Path(chosen)
        base = picked.with_suffix("").name
        output_var.set(str(picked.parent / f"{base}_cloned.wav"))
|
|
|
|
def _pick_voice_source_audio(self):
    """Ask for the audio file to convert and store its path.

    If no output file is set yet, ``<name>_voiceclone.wav`` next to the
    chosen file is suggested.
    """
    chosen = self.fd.askopenfilename(
        title="Eingabe-Audio waehlen",
        filetypes=[("Audio", "*.wav *.mp3 *.m4a *.flac *.ogg *.aac *.wma"), ("Alle", "*.*")],
    )
    if not chosen:
        return
    self._vars["voice_source_audio"].set(chosen)
    output_var = self._vars["voice_output"]
    if not output_var.get():
        picked = Path(chosen)
        base = picked.with_suffix("").name
        output_var.set(str(picked.parent / f"{base}_voiceclone.wav"))
|
|
|
|
def _pick_voice_out(self):
    """Ask where to save the generated audio (.wav by default) and store the choice."""
    chosen = self.fd.asksaveasfilename(
        title="Ausgabe-Audio speichern",
        defaultextension=".wav",
        filetypes=[("WAV-Audio", "*.wav"), ("Alle", "*.*")],
    )
    if not chosen:
        return
    self._vars["voice_output"].set(chosen)
|
|
|
|
def _log(self, msg):
    """Append ``msg`` as a new line to the log box and scroll to the end."""
    log_box = self._lb
    log_box.insert("end", msg + "\n")
    log_box.see("end")
    # Process pending redraws so the line shows up during long operations.
    self._root_real.update_idletasks()
|
|
|
|
def _update_quality(self):
    """Copy the quality checkboxes into the swapper and save the settings.

    Face restoration is only enabled when the checkbox is ticked and the
    restorer reports itself as available.
    """
    engine = self.swapper
    engine.enhance = self._var_enhance.get()
    engine.color = self._var_color.get()
    restoration_on = self._var_restoration.get() and self.restorer.is_available()
    engine.use_restoration = bool(restoration_on)
    self._save_now()
|
|
|
|
# Handler of the "Setup wiederholen" button: close this window, run the setup
# window again and then open a fresh main window. The order of the steps
# below matters.
def _redo_setup(self):
|
|
# A running webcam swap is stopped first.
if self._webcam_running:
|
|
self._stop_webcam()
|
|
# Remove the ".setup_done" marker file (no error if it is already gone).
SETUP_FLAG.unlink(missing_ok=True)
|
|
# Tear down the current window before the setup window is shown.
self._root_real.destroy()
|
|
# Defined elsewhere in this file; presumably re-runs the installation UI
# and returns when it is finished - TODO confirm.
_show_setup_window()
|
|
# Clear the import system's finder caches before the new window is created.
importlib.invalidate_caches()
|
|
# MainApp.__init__ builds the window and enters its own main loop, so this
# call only returns when the new window is closed.
MainApp()
|
|
|
|
def _on_close(self):
    """Stop a running webcam session, then destroy the root window."""
    webcam_active = self._webcam_running
    if webcam_active:
        self._stop_webcam()
    root = self._root_real
    root.destroy()
|
|
|
|
def _start(self):
    """Start the job that belongs to the currently selected notebook tab.

    The start button is disabled first. Tab index 3 re-enables the button
    and toggles the webcam on the calling thread. Tab index 1 runs
    ``_run_video`` and tab index 2 runs ``_run_voice``; every other index
    runs ``_run``. These three jobs are executed on a daemon thread.
    """
    self._btn.configure(state="disabled")
    import threading

    tab_idx = self._nb.index(self._nb.select())

    if tab_idx == 3:
        self._btn.configure(state="normal")
        self._toggle_webcam()
        return

    # Tab index -> worker; anything not listed falls back to the image job.
    workers = {1: self._run_video, 2: self._run_voice}
    worker = workers.get(tab_idx, self._run)
    threading.Thread(target=worker, daemon=True).start()
|
|
|
|
def _run(self):
    """Swap the source face into every supported image of the input folder.

    Runs on the worker thread started by ``_start``. The source image,
    input folder and output folder are read from ``self._vars``. Each
    result is written to the output folder under the original file name;
    images that could not be swapped (falsy result or exception from
    ``swap_image``) are copied into a ``failed`` sub-folder of the output.

    The start button is re-enabled on every exit path, including
    unexpected exceptions, so a failure can never leave the UI locked.
    """

    def err(t, m):
        # Report a problem to the user and unlock the start button.
        self.mb.showerror(t, m)
        self._btn.configure(state="normal")

    try:
        # NOTE(review): cv2 is not referenced directly in this method; the
        # import is kept from the original in case its import side effects
        # are relied upon -- confirm before removing.
        import cv2  # noqa: F401

        self._update_quality()
        src = self._vars["source"].get().strip()
        indir = self._vars["input_dir"].get().strip()
        outdir = self._vars["output_dir"].get().strip()

        if not all([src, indir, outdir]):
            return err("Fehler", "Bitte alle drei Felder ausfuellen.")
        if not Path(src).is_file():
            return err("Fehler", f"Quellbild nicht gefunden:\n{src}")
        if not Path(indir).is_dir():
            return err("Fehler", f"Eingabe-Ordner existiert nicht:\n{indir}")
        out_root = Path(outdir)
        out_root.mkdir(parents=True, exist_ok=True)

        try:
            self.swapper.init_models()
        except Exception as e:
            return err("Modellfehler", str(e))

        src_img = _cv2_imread_unicode(src)
        if src_img is None:
            return err("Fehler", "Quellbild konnte nicht geladen werden.")
        src_face = self.swapper.get_first_face(src_img)
        if src_face is None:
            return err("Fehler", "Kein Gesicht im Quellbild gefunden!")
        self._log(f"OK Quellgesicht erkannt: {Path(src).name}")

        images = sorted(p for p in Path(indir).iterdir()
                        if p.suffix.lower() in self.SUPPORTED and p.is_file())
        if not images:
            self.mb.showinfo("Keine Bilder", "Keine unterstuetzten Bilder im Eingabe-Ordner.")
            return

        self._log(f"{len(images)} Bild(er) gefunden ...\n")
        self._pb["maximum"] = len(images)
        self._pb["value"] = 0
        ok = 0
        failed = 0
        failed_dir = out_root / "failed"

        for i, imgp in enumerate(images, 1):
            outp = out_root / imgp.name
            self._log(f"[{i}/{len(images)}] {imgp.name}")
            self._sv.set(f"Verarbeite {imgp.name} ({i}/{len(images)}) ...")

            swapped = False
            swap_error = None
            try:
                swapped = self.swapper.swap_image(src_face, imgp, outp)
            except Exception as e:
                # One broken image must not abort the whole batch.
                swap_error = e
                self._log(f" FEHLER: {e}")

            if swapped:
                ok += 1
                self._log(" OK gespeichert")
            else:
                failed += 1
                try:
                    failed_dir.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(imgp, failed_dir / imgp.name)
                except OSError as copy_err:
                    # Keeping a copy of the original is best effort only.
                    self._log(f" WARNUNG: Kopie nach failed/{imgp.name} fehlgeschlagen: {copy_err}")
                else:
                    # Name the real cause instead of always blaming face detection.
                    reason = "kein Gesicht erkannt" if swap_error is None else "Verarbeitungsfehler"
                    self._log(f" WARNUNG -> failed/{imgp.name} ({reason})")
            self._pb["value"] = i

        summary = f"{ok} erfolgreich"
        if failed:
            summary += f" | {failed} fehlgeschlagen -> Ordner: failed/"
        self._sv.set(summary)
        self._log(f"\nFertig: {ok}/{len(images)} Bilder bearbeitet.")
        if failed:
            self._log(f"WARNUNG: {failed} Bild(er) ohne Gesicht -> {failed_dir}")
        self.mb.showinfo("Fertig",
                         f"{ok} von {len(images)} erfolgreich.\n"
                         + (f"{failed} ohne Gesicht -> Ordner 'failed'\n" if failed else "")
                         + f"\nAusgabe:\n{outdir}")
    except Exception as e:
        # Last-resort boundary of the worker thread: without it the
        # exception would only reach the thread's excepthook and the
        # button would stay disabled.
        self._log(f"FEHLER: {e}")
        self.mb.showerror("Fehler", str(e))
    finally:
        self._btn.configure(state="normal")
|
|
|
|
def _run_video(self):
    """Swap the source face into every supported video of the input folder.

    Runs on the worker thread started by ``_start``. The source image and
    the video input/output folders are read from ``self._vars``. Each
    result is written as ``<stem>_faceswap.mp4`` into the output folder;
    the originals of videos whose processing raised are copied into a
    ``failed`` sub-folder of the output (best effort).

    The progress bar spans the summed frame counts reported for all
    videos. The start button is re-enabled on every exit path, including
    unexpected exceptions.
    """

    def err(t, m):
        # Report a problem to the user and unlock the start button.
        self.mb.showerror(t, m)
        self._btn.configure(state="normal")

    try:
        import cv2

        self._update_quality()
        src = self._vars["source"].get().strip()
        video_indir = self._vars["video_input_dir"].get().strip()
        video_outdir = self._vars["video_output_dir"].get().strip()

        if not all([src, video_indir, video_outdir]):
            return err("Fehler", "Bitte Quellbild, Video-Eingabe-Ordner und Video-Ausgabe-Ordner angeben.")
        if not Path(src).is_file():
            return err("Fehler", f"Quellbild nicht gefunden:\n{src}")
        if not Path(video_indir).is_dir():
            return err("Fehler", f"Video-Eingabe-Ordner existiert nicht:\n{video_indir}")
        out_root = Path(video_outdir)
        out_root.mkdir(parents=True, exist_ok=True)

        videos = sorted(p for p in Path(video_indir).iterdir()
                        if p.is_file() and p.suffix.lower() in self.VIDEO_SUPPORTED)
        if not videos:
            self.mb.showinfo("Keine Videos", "Keine unterstuetzten Videos im Eingabe-Ordner.")
            return

        try:
            self.swapper.init_models()
        except Exception as e:
            return err("Modellfehler", str(e))

        src_img = _cv2_imread_unicode(src)
        if src_img is None:
            return err("Fehler", "Quellbild konnte nicht geladen werden.")
        src_face = self.swapper.get_first_face(src_img)
        if src_face is None:
            return err("Fehler", "Kein Gesicht im Quellbild gefunden!")
        self._log(f"OK Quellgesicht erkannt: {Path(src).name}")
        self._log(f"{len(videos)} Video(s) im Eingabe-Ordner gefunden.")

        # Estimated frame count per video (at least 1 each) for the overall
        # progress bar.
        frame_counts = []
        for vp in videos:
            cap, cap_tmp_copy = _open_videocapture_unicode(vp, log_fn=self._log)
            try:
                if cap.isOpened():
                    frame_counts.append(max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 1))
                else:
                    frame_counts.append(1)
            finally:
                # Release the capture and its temp copy even if counting raised.
                try:
                    cap.release()
                finally:
                    _cleanup_temp_file(cap_tmp_copy)

        max_total = max(sum(frame_counts), 1)
        self._pb["maximum"] = max_total
        self._pb["value"] = 0
        done_total = 0
        ok = 0
        failed = 0
        no_face = 0
        failed_dir = out_root / "failed"

        for i, (vp, est_frames) in enumerate(zip(videos, frame_counts), 1):
            outp = out_root / f"{vp.stem}_faceswap.mp4"
            self._log(f"[{i}/{len(videos)}] {vp.name}")
            self._sv.set(f"Verarbeite Video {i}/{len(videos)}: {vp.name}")

            # Defaults bind the current video name and frame offset, so the
            # callback does not see later loop values.
            def on_progress(done, total, name=vp.name, base=done_total):
                combined = base + done
                self._pb["value"] = min(combined, max_total)
                self._sv.set(f"{name}: Frame {done}/{max(total, 1)} | Gesamt {combined}/{max_total}")
                self._root_real.update_idletasks()

            try:
                stats = self.swapper.swap_video(src_face, vp, outp, progress_cb=on_progress)
                done_total += int(stats.get("frames_processed", 0))
                swapped_frames = int(stats.get("frames_swapped", 0))
                swapped_faces = int(stats.get("faces_swapped", 0))
                ok += 1
                if swapped_frames == 0:
                    no_face += 1
                self._log(f" OK gespeichert: {outp.name} (Frames mit Face: {swapped_frames}, getauschte Gesichter: {swapped_faces})")
            except Exception as e:
                failed += 1
                # Count the failed video as done so the offset of the
                # following videos stays in step with the progress bar.
                done_total += est_frames
                self._pb["value"] = min(done_total, max_total)
                self._log(f" FEHLER: {e}")
                try:
                    failed_dir.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(vp, failed_dir / vp.name)
                except OSError as copy_err:
                    # Best effort only; report instead of claiming success.
                    self._log(f" WARNUNG: Kopie nach failed/{vp.name} fehlgeschlagen: {copy_err}")
                else:
                    self._log(f" WARNUNG: Original kopiert nach failed/{vp.name}")

        summary = f"{ok} Video(s) verarbeitet"
        if no_face:
            summary += f" | {no_face} ohne erkannten Face-Frame"
        if failed:
            summary += f" | {failed} fehlgeschlagen"
        self._sv.set(summary)
        self._pb["value"] = self._pb["maximum"]
        self._log(f"\nVideo-Batch fertig: {ok}/{len(videos)} verarbeitet.")
        if no_face:
            self._log(f"WARNUNG: {no_face} Video(s) hatten keinen erkannten Face-Frame.")
        if failed:
            self._log(f"FEHLER: {failed} Video(s) fehlgeschlagen -> {failed_dir}")
        self.mb.showinfo("Fertig",
                         f"{ok} von {len(videos)} Video(s) verarbeitet.\n"
                         + (f"{no_face} ohne erkannten Face-Frame.\n" if no_face else "")
                         + (f"{failed} fehlgeschlagen -> Ordner 'failed'\n" if failed else "")
                         + f"\nAusgabe:\n{video_outdir}")
    except Exception as e:
        # Last-resort boundary of the worker thread: without it the
        # exception would only reach the thread's excepthook and the
        # button would stay disabled.
        self._log(f"FEHLER: {e}")
        self.mb.showerror("Fehler", str(e))
    finally:
        self._btn.configure(state="normal")
|
|
|
|
def _run_voice(self):
    """Run voice cloning in audio or text mode.

    Mode ``"audio"`` converts the selected input audio with
    ``self.voice.clone_from_audio``; every other mode value synthesises the
    entered text with ``self.voice.clone_from_text`` (language falls back
    to ``"de"`` when the field is empty).

    All inputs are validated before progress bar, status line or log are
    touched, so a rejected input leaves no "running" state behind. If
    cloning raises, the progress bar is reset and the error is logged and
    shown. The start button is re-enabled on every exit path.
    """
    mode = self._var_voice_mode.get().strip().lower()
    ref = self._vars["voice_ref"].get().strip()
    out_file = self._vars["voice_output"].get().strip()
    lang = self._vars["voice_language"].get().strip().lower() or "de"
    text = self._voice_text.get("1.0", "end").strip()
    source_audio = self._vars["voice_source_audio"].get().strip()
    audio_mode = mode == "audio"

    def err(t, m):
        # Report a problem to the user and unlock the start button.
        self.mb.showerror(t, m)
        self._btn.configure(state="normal")

    if not ref:
        return err("Fehler", "Bitte eine Referenz-Stimme waehlen.")
    if not out_file:
        return err("Fehler", "Bitte eine Ausgabe-Audio-Datei waehlen.")
    if audio_mode and not source_audio:
        return err("Fehler", "Bitte Eingabe-Audio waehlen (Audio-Modus).")
    if not audio_mode and not text:
        return err("Fehler", "Bitte Text eingeben (Text-Modus).")

    self._pb["maximum"] = 100
    self._pb["value"] = 5
    self._sv.set("Starte Voice-Cloning ...")
    self._log("VOICE: Starte Verarbeitung ...")

    try:
        if audio_mode:
            self._sv.set("VOICE: Lade Model und konvertiere Audio ...")
            self._pb["value"] = 35
            result = self.voice.clone_from_audio(ref, source_audio, out_file)
            done_status = "OK Voice-Cloning abgeschlossen (Audio-Modus)"
            done_log = f"VOICE: Fertig (Audio-Modus) -> {result}"
        else:
            self._sv.set("VOICE: Lade XTTS und generiere Sprache ...")
            self._pb["value"] = 35
            result = self.voice.clone_from_text(ref, text, lang, out_file)
            done_status = "OK Voice-Cloning abgeschlossen (Text-Modus)"
            done_log = f"VOICE: Fertig (Text-Modus, Sprache={lang}) -> {result}"

        self._pb["value"] = 100
        self._sv.set(done_status)
        self._log(done_log)
        self.mb.showinfo("Fertig", f"Voice-Cloning fertig.\n\nAusgabe:\n{result}")
    except Exception as e:
        # Clear the stale "loading" state before reporting the failure.
        self._pb["value"] = 0
        self._sv.set("FEHLER: Voice-Cloning fehlgeschlagen")
        self._log(f"VOICE: Fehler: {e}")
        self.mb.showerror("Voice-Cloning Fehler", str(e))
    finally:
        self._btn.configure(state="normal")
|
|
|
|
|
|
def main():
    """Check the interpreter version, run first-time setup if needed, start the app.

    The setup window is shown only while the ``SETUP_FLAG`` marker file
    does not exist.
    """
    _require_python_version()

    setup_done = SETUP_FLAG.exists()
    if not setup_done:
        _show_setup_window()
        # Presumably so that modules installed during setup become
        # importable in this process -- confirm against the setup code.
        importlib.invalidate_caches()

    MainApp()
|
|
|
|
|
|
# Start the application only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()
|