diff --git a/face_swap.py b/face_swap.py index 2c920c3..442e6c6 100644 --- a/face_swap.py +++ b/face_swap.py @@ -12,6 +12,30 @@ IS_WINDOWS = platform.system() == "Windows" _CUDA_PRELOAD_TRIED = False _PROVIDERS_CACHE = None _DLL_DIR_HANDLES = [] +REQUIRED_PYTHON = (3, 12) + + +def _require_python_version(): + if sys.version_info[:2] == REQUIRED_PYTHON: + return + required = f"{REQUIRED_PYTHON[0]}.{REQUIRED_PYTHON[1]}" + msg = ( + "Falsche Python-Version erkannt.\n\n" + f"Benoetigt: Python {required}\n" + f"Aktuell: Python {sys.version.split()[0]}\n" + f"Interpreter: {sys.executable}\n\n" + f"Bitte so starten:\n py -{required} {Path(__file__).name}" + ) + try: + import tkinter as tk + from tkinter import messagebox + root = tk.Tk() + root.withdraw() + messagebox.showerror("FaceSwap - Python Version", msg, parent=root) + root.destroy() + except Exception: + pass + raise RuntimeError(msg) def _load_config(): @@ -45,6 +69,28 @@ def _cuda_available(): return False, "" +def _ensure_torchvision_functional_tensor_alias(log_fn=None): + try: + import torchvision.transforms.functional_tensor # noqa: F401 + return True + except Exception: + pass + try: + import sys + import types + from torchvision.transforms import functional as tvf + if not hasattr(tvf, "rgb_to_grayscale"): + return False + alias_mod = types.ModuleType("torchvision.transforms.functional_tensor") + alias_mod.rgb_to_grayscale = tvf.rgb_to_grayscale + sys.modules["torchvision.transforms.functional_tensor"] = alias_mod + if log_fn: + log_fn(" Hinweis: torchvision-Kompatibilitaetsmodus fuer GFPGAN aktiv.") + return True + except Exception: + return False + + def _preload_onnxruntime_cuda(log_fn=None): global _CUDA_PRELOAD_TRIED if _CUDA_PRELOAD_TRIED or not IS_WINDOWS: @@ -124,6 +170,94 @@ def _dl(url, dest, log=print): urllib.request.urlretrieve(url, dest, reporthook=hook) +def _cv2_imread_unicode(path, flags=None): + import cv2 + import numpy as np + + if flags is None: + flags = cv2.IMREAD_COLOR + p = str(path) + img = cv2.imread(p, flags) + if img is not None: + return img + try: + data = np.fromfile(p, dtype=np.uint8) + if data.size == 0: + return None + return cv2.imdecode(data, flags) + except Exception: + return None + + +def _cv2_imwrite_unicode(path, img, params=None): + import cv2 + + p = str(path) + if params is None: + params = [] + try: + if cv2.imwrite(p, img, params): + return True + except Exception: + pass + try: + ext = Path(p).suffix or ".png" + ok, buf = cv2.imencode(ext, img, params) + if not ok: + return False + buf.tofile(p) + return True + except Exception: + return False + + +def _path_has_non_ascii(path): + try: + s = str(path) + except Exception: + return False + return any(ord(ch) > 127 for ch in s) + + +def _open_videocapture_unicode(source, log_fn=None): + import cv2 + import tempfile + + if isinstance(source, int): + return cv2.VideoCapture(int(source)), None + p = str(source) + cap = cv2.VideoCapture(p) + if cap.isOpened(): + return cap, None + tmp_copy = None + try: + src_path = Path(p) + if IS_WINDOWS and src_path.is_file() and _path_has_non_ascii(src_path): + fd, tmp_name = tempfile.mkstemp(prefix="faceswap_vid_", suffix=(src_path.suffix or ".mp4"), dir=str(SCRIPT_DIR)) + os.close(fd) + tmp_copy = Path(tmp_name) + shutil.copy2(src_path, tmp_copy) + cap2 = cv2.VideoCapture(str(tmp_copy)) + if cap2.isOpened(): + cap.release() + if log_fn: + log_fn(" Hinweis: Unicode-Video-Fallback aktiv (temp Datei).") + return cap2, tmp_copy + cap2.release() + except Exception: + pass + return cap, None + + +def _cleanup_temp_file(path): + if path is None: + return + try: + Path(path).unlink(missing_ok=True) + except Exception: + pass + + def run_setup(log=print): log("=" * 60) log(" FaceSwap Batch Tool - Modell-Download") @@ -150,6 +284,21 @@ def run_setup(log=print): "Bitte fuehre zuerst setup.bat aus!" ) + log("\nPruefe GFPGAN ...") + has_gfpgan = False + try: + if importlib.util.find_spec("gfpgan") is None: + raise ModuleNotFoundError("gfpgan") + _ensure_torchvision_functional_tensor_alias(log) + importlib.import_module("gfpgan") + has_gfpgan = True + log(" OK gfpgan") + except ModuleNotFoundError: + log(" FEHLT: gfpgan - Funktion deaktiviert (optional)") + except Exception as e: + log(f" FEHLER: gfpgan installiert, aber nicht importierbar ({e})") + log(" Funktion deaktiviert (optional)") + import numpy as np major = int(np.__version__.split(".")[0]) if major >= 2: @@ -208,6 +357,13 @@ def run_setup(log=print): ) log(" OK inswapper_128.onnx") + if has_gfpgan: + try: + FaceRestorer(log_fn=log).ensure_model() + log(" OK GFPGANv1.4.pth") + except Exception as e: + log(f" WARNUNG: GFPGAN-Modell konnte nicht vorbereitet werden ({e})") + SETUP_FLAG.write_text(f"cuda={cuda}\n") log("\nEinrichtung abgeschlossen!") @@ -349,10 +505,337 @@ def _match_face_color(swapped_img, original_img, bbox): return result +class FaceRestorer: + MODEL_URL = "https://github.com/TencentARC/GFPGAN/releases/download/v1.3.4/GFPGANv1.4.pth" + MODEL_PATH = MODELS_DIR / "GFPGANv1.4.pth" + _FACEXLIB_FILES = { + "detection_Resnet50_Final.pth": "https://github.com/xinntao/facexlib/releases/download/v0.1.0/detection_Resnet50_Final.pth", + "parsing_parsenet.pth": "https://github.com/xinntao/facexlib/releases/download/v0.2.2/parsing_parsenet.pth", + } + + def __init__(self, log_fn=print): + self.log = log_fn + self.restorer = None + self.upscale = 1 + self.only_center_face = False + self.min_face_px = 72 + self.blend_alpha = 0.72 + self.pad_ratio = 0.22 + + @staticmethod + def is_available(log_fn=None): + import importlib + import importlib.util + if importlib.util.find_spec("gfpgan") is None: + return False + _ensure_torchvision_functional_tensor_alias(log_fn) + try: + importlib.import_module("gfpgan") + return True + except Exception: + return False + + def ensure_model(self): + MODELS_DIR.mkdir(exist_ok=True) + if not self.MODEL_PATH.exists() or self.MODEL_PATH.stat().st_size < 100_000: + self.log(" Lade GFPGANv1.4.pth (~330 MB) ...") + _dl(self.MODEL_URL, self.MODEL_PATH, self.log) + self._ensure_facexlib_weights() + + def _ensure_facexlib_weights(self): + if not self.is_available(): + return + try: + import facexlib + weights_dir = Path(facexlib.__file__).resolve().parent / "weights" + weights_dir.mkdir(parents=True, exist_ok=True) + for filename, url in self._FACEXLIB_FILES.items(): + dst = weights_dir / filename + if dst.exists() and dst.stat().st_size > 100_000: + continue + self.log(f" Lade {filename} ...") + _dl(url, dst, self.log) + except Exception as e: + self.log(f" Hinweis: facexlib-Modelle konnten nicht vorab geladen werden ({e}).") + + def load(self): + if self.restorer is not None: + return + if not self.is_available(log_fn=self.log): + raise RuntimeError("GFPGAN ist nicht installiert.") + self.ensure_model() + _ensure_torchvision_functional_tensor_alias(self.log) + from gfpgan import GFPGANer + providers = _get_providers(self.log) + device = "cuda" if "CUDAExecutionProvider" in providers else "cpu" + self.restorer = GFPGANer( + model_path=str(self.MODEL_PATH), + upscale=self.upscale, + arch="clean", + channel_multiplier=2, + bg_upsampler=None, + device=device, + ) + + def restore(self, bgr_img): + import cv2 + import numpy as np + + if bgr_img is None: + return bgr_img + try: + self.load() + _, _, restored = self.restorer.enhance( + bgr_img, + has_aligned=False, + only_center_face=self.only_center_face, + paste_back=True, + ) + if restored is None: + return bgr_img + if restored.shape[:2] != bgr_img.shape[:2]: + restored = cv2.resize(restored, (bgr_img.shape[1], bgr_img.shape[0]), interpolation=cv2.INTER_AREA) + return np.clip(restored, 0, 255).astype(np.uint8) + except Exception as e: + self.log(f" Hinweis: GFPGAN-Restore fehlgeschlagen ({e}).") + return bgr_img + + @staticmethod + def _bbox_to_rect(bbox, img_w, img_h, pad_ratio=0.22): + try: + x1, y1, x2, y2 = [float(v) for v in bbox] + except Exception: + return None + bw = max(1.0, x2 - x1) + bh = max(1.0, y2 - y1) + pad_x = max(8, int(round(bw * pad_ratio))) + pad_y = max(8, int(round(bh * pad_ratio))) + rx1 = max(0, int(round(x1)) - pad_x) + ry1 = max(0, int(round(y1)) - pad_y) + rx2 = min(int(img_w), int(round(x2)) + pad_x) + ry2 = min(int(img_h), int(round(y2)) + pad_y) + if rx2 <= rx1 + 2 or ry2 <= ry1 + 2: + return None + return rx1, ry1, rx2, ry2 + + def restore_faces(self, bgr_img, bboxes): + import cv2 + import numpy as np + + if bgr_img is None: + return bgr_img + if not bboxes: + return bgr_img + try: + self.load() + except Exception as e: + self.log(f" Hinweis: GFPGAN nicht bereit ({e}).") + return bgr_img + + out = bgr_img.copy() + h, w = out.shape[:2] + restored_any = False + for bbox in bboxes: + rect = self._bbox_to_rect(bbox, w, h, pad_ratio=self.pad_ratio) + if rect is None: + continue + x1, y1, x2, y2 = rect + rw = x2 - x1 + rh = y2 - y1 + if rw < self.min_face_px or rh < self.min_face_px: + continue + crop = out[y1:y2, x1:x2] + if crop.size == 0: + continue + try: + _, _, restored = self.restorer.enhance( + crop, + has_aligned=False, + only_center_face=True, + paste_back=True, + ) + except Exception: + continue + if restored is None: + continue + if restored.shape[:2] != crop.shape[:2]: + restored = cv2.resize(restored, (crop.shape[1], crop.shape[0]), interpolation=cv2.INTER_AREA) + alpha = max(0.0, min(1.0, float(self.blend_alpha))) + mixed = cv2.addWeighted(restored.astype(np.float32), alpha, crop.astype(np.float32), 1.0 - alpha, 0.0) + out[y1:y2, x1:x2] = np.clip(mixed, 0, 255).astype(np.uint8) + restored_any = True + if restored_any: + return out + return bgr_img + + +class FaceLibrary: + ROOT = SCRIPT_DIR / "face_library" + + def __init__(self, log_fn=print): + self.log = log_fn + self.ROOT.mkdir(exist_ok=True) + self._detector = None + + @staticmethod + def _slugify(name): + import re + import unicodedata + + raw = (name or "").strip().lower() + if not raw: + raw = "gesicht" + raw = unicodedata.normalize("NFKD", raw) + raw = raw.encode("ascii", "ignore").decode("ascii") + raw = raw.replace(" ", "_") + raw = re.sub(r"[^a-z0-9_]+", "", raw) + raw = re.sub(r"_+", "_", raw).strip("_") + return raw or "gesicht" + + def _ensure_detector(self): + if self._detector is not None: + return self._detector + import cv2 + + cascade_path = Path(cv2.data.haarcascades) / "haarcascade_frontalface_default.xml" + detector = cv2.CascadeClassifier(str(cascade_path)) + if detector.empty(): + raise RuntimeError("Gesichtsdetektor konnte nicht geladen werden.") + self._detector = detector + return detector + + def _detect_largest_face(self, bgr_img): + import cv2 + + det = self._ensure_detector() + gray = cv2.cvtColor(bgr_img, cv2.COLOR_BGR2GRAY) + faces = det.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=4, minSize=(30, 30)) + if len(faces) == 0: + return None + x, y, w, h = max(faces, key=lambda f: int(f[2]) * int(f[3])) + return int(x), int(y), int(w), int(h) + + def add(self, name, image_path): + import cv2 + import datetime + + image_path = Path(image_path) + if not image_path.is_file(): + raise RuntimeError(f"Datei nicht gefunden: {image_path}") + + src_img = _cv2_imread_unicode(image_path) + if src_img is None: + raise RuntimeError("Bild konnte nicht geladen werden.") + + h0, w0 = src_img.shape[:2] + max_side = max(h0, w0) + scale = min(1.0, 1920.0 / max(1.0, float(max_side))) + if scale < 1.0: + work = cv2.resize(src_img, (int(round(w0 * scale)), int(round(h0 * scale))), interpolation=cv2.INTER_AREA) + else: + work = src_img + + face = self._detect_largest_face(work) + if face is None: + raise RuntimeError("Kein Gesicht im Bild erkannt.") + fx, fy, fw, fh = face + inv = 1.0 / max(scale, 1e-9) + fx = int(round(fx * inv)); fy = int(round(fy * inv)) + fw = int(round(fw * inv)); fh = int(round(fh * inv)) + + pad_x = int(round(fw * 0.20)) + pad_y = int(round(fh * 0.20)) + x1 = max(0, fx - pad_x); y1 = max(0, fy - pad_y) + x2 = min(w0, fx + fw + pad_x); y2 = min(h0, fy + fh + pad_y) + crop = src_img[y1:y2, x1:x2] + if crop.size == 0: + raise RuntimeError("Kein gueltiger Gesichtsausschnitt gefunden.") + + base_slug = self._slugify(name) + slug = base_slug + idx = 2 + while (self.ROOT / slug).exists(): + slug = f"{base_slug}_{idx}" + idx += 1 + + face_dir = self.ROOT / slug + face_dir.mkdir(parents=True, exist_ok=True) + source_path = face_dir / "source.jpg" + thumb_path = face_dir / "thumb.png" + meta_path = face_dir / "meta.json" + + if not _cv2_imwrite_unicode(source_path, src_img): + raise RuntimeError("source.jpg konnte nicht gespeichert werden.") + thumb = cv2.resize(crop, (96, 96), interpolation=cv2.INTER_AREA) + if not _cv2_imwrite_unicode(thumb_path, thumb): + raise RuntimeError("thumb.png konnte nicht gespeichert werden.") + + meta = { + "slug": slug, + "name": (name or "").strip() or slug, + "added": datetime.date.today().isoformat(), + } + meta_path.write_text(json.dumps(meta, indent=2, ensure_ascii=False), encoding="utf-8") + return { + "slug": slug, + "name": meta["name"], + "source_path": str(source_path), + "thumb_path": str(thumb_path), + "added": meta["added"], + } + + def remove(self, slug): + slug = self._slugify(slug) + target = self.ROOT / slug + if target.exists(): + shutil.rmtree(target, ignore_errors=True) + + def list_entries(self): + entries = [] + if not self.ROOT.exists(): + return entries + for d in self.ROOT.iterdir(): + if not d.is_dir(): + continue + source_path = d / "source.jpg" + meta_path = d / "meta.json" + thumb_path = d / "thumb.png" + if not source_path.exists(): + self.log(f"WARNUNG Bibliothekseintrag ohne source.jpg uebersprungen: {d.name}") + continue + name = d.name + added = "" + if meta_path.exists(): + try: + meta = json.loads(meta_path.read_text(encoding="utf-8")) + name = (meta.get("name") or name).strip() or name + added = (meta.get("added") or "").strip() + except Exception: + pass + entries.append({ + "slug": d.name, + "name": name, + "source_path": str(source_path), + "thumb_path": str(thumb_path), + "added": added, + }) + entries.sort(key=lambda x: x["name"].lower()) + return entries + + def get_source_path(self, slug): + p = self.ROOT / self._slugify(slug) / "source.jpg" + if not p.exists(): + raise RuntimeError(f"Eintrag nicht gefunden: {slug}") + return p + + class FaceSwapper: def __init__(self, log_fn=print): self.log = log_fn self.app = self.swapper = None + self.restorer = None + self.use_restoration = False + self._restoration_warned_cpu = False self.enhance = True self.color = True self.high_sensitivity = True @@ -995,12 +1478,20 @@ class FaceSwapper: original = work_img.copy() result = work_img.copy() + swapped_bboxes = [] for face in faces: result = self.swapper.get(result, face, src_face, paste_back=True) + swapped_bboxes.append(face.bbox) if self.color: result = _match_face_color(result, original, face.bbox) if self.enhance: result = _enhance_face_region(result, face.bbox, sharpen=True) + if self.use_restoration and self.restorer is not None: + providers = _get_providers() + if not self._restoration_warned_cpu and "CUDAExecutionProvider" not in providers: + self.log(" Hinweis: GFPGAN im CPU-Modus ist langsam (ca. 1-3 s pro Frame moeglich).") + self._restoration_warned_cpu = True + result = self.restorer.restore_faces(result, swapped_bboxes) if track_state is not None: result = self._blend_frames(result, original, float(track_state.get("alpha", 1.0))) active = track_state.get("active") @@ -1036,28 +1527,33 @@ class FaceSwapper: return self._pick_primary_face(faces) def swap_image(self, src_face, target, out): - import cv2 - original = cv2.imread(str(target)) + original = _cv2_imread_unicode(target) if original is None: return False result, face_count = self._swap_faces_in_frame(original, src_face) if face_count == 0: return False - cv2.imwrite(str(out), result) - return True + return _cv2_imwrite_unicode(out, result) def swap_video(self, src_face, target_video, out_video, progress_cb=None, cancel_check=None): - import cv2, subprocess, shutil - cap = cv2.VideoCapture(str(target_video)) + import cv2, subprocess, shutil, tempfile + cap, cap_tmp_copy = _open_videocapture_unicode(target_video, log_fn=self.log) if not cap.isOpened(): raise RuntimeError(f"Video konnte nicht geoeffnet werden: {target_video}") fps = cap.get(cv2.CAP_PROP_FPS) or 25 width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) - tmp_video = Path(out_video).with_suffix(".tmp_noaudio.mp4") + fd_tmp, tmp_name = tempfile.mkstemp(prefix="faceswap_noaudio_", suffix=".mp4", dir=str(SCRIPT_DIR)) + os.close(fd_tmp) + tmp_video = Path(tmp_name) fourcc = cv2.VideoWriter_fourcc(*"mp4v") writer = cv2.VideoWriter(str(tmp_video), fourcc, fps, (width, height)) + if not writer.isOpened(): + cap.release() + _cleanup_temp_file(cap_tmp_copy) + _cleanup_temp_file(tmp_video) + raise RuntimeError(f"Video-Writer konnte nicht geoeffnet werden: {out_video}") done = 0; swapped_frames = 0; swapped_faces = 0 import numpy as np try: @@ -1103,6 +1599,7 @@ class FaceSwapper: finally: cap.release() writer.release() + _cleanup_temp_file(cap_tmp_copy) ffmpeg = shutil.which("ffmpeg") if ffmpeg: try: @@ -1110,14 +1607,99 @@ class FaceSwapper: "-c:v", "copy", "-c:a", "aac", "-map", "0:v:0", "-map", "1:a:0?", "-shortest", str(out_video)] subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - tmp_video.unlink(missing_ok=True) + _cleanup_temp_file(tmp_video) except Exception: - tmp_video.rename(out_video) + shutil.move(str(tmp_video), str(out_video)) else: - tmp_video.rename(out_video) + shutil.move(str(tmp_video), str(out_video)) return {"frames_processed": done, "frames_total": total, "frames_swapped": swapped_frames, "faces_swapped": swapped_faces} + def swap_webcam( + self, + src_face, + camera_index=0, + record_path=None, + fps_target=25.0, + cancel_check=None, + frame_cb=None, + stats_cb=None, + use_tracking=False, + ): + import cv2 + import time + import tempfile + + cap = cv2.VideoCapture(int(camera_index)) + if not cap.isOpened(): + raise RuntimeError(f"Kamera konnte nicht geoeffnet werden (Index {camera_index}).") + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 0) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0) + src_fps = cap.get(cv2.CAP_PROP_FPS) or float(fps_target or 25.0) + + writer = None + record_tmp = None + if record_path: + out_path = Path(record_path) + out_path.parent.mkdir(parents=True, exist_ok=True) + fourcc = cv2.VideoWriter_fourcc(*"mp4v") + writer = cv2.VideoWriter(str(out_path), fourcc, src_fps, (width, height)) + if not writer.isOpened(): + try: + fd_tmp, tmp_name = tempfile.mkstemp(prefix="faceswap_webcam_", suffix=".mp4", dir=str(SCRIPT_DIR)) + os.close(fd_tmp) + record_tmp = Path(tmp_name) + writer = cv2.VideoWriter(str(record_tmp), fourcc, src_fps, (width, height)) + except Exception: + writer = None + if writer is None or not writer.isOpened(): + writer = None + _cleanup_temp_file(record_tmp) + record_tmp = None + self.log(f"WARNUNG: Aufnahme konnte nicht gestartet werden: {record_path}") + else: + self.log(" Hinweis: Unicode-Ausgabe-Fallback aktiv (temp Datei).") + + frame_count = 0 + track_state = {} if use_tracking else None + fps_smooth = 0.0 + try: + while True: + if cancel_check and cancel_check(): + break + started = time.perf_counter() + ret, frame = cap.read() + if not ret: + break + result, face_count = self._swap_faces_in_frame(frame, src_face, track_state=track_state) + if frame_cb: + frame_cb(result) + if writer is not None: + writer.write(result) + + frame_count += 1 + elapsed = max(1e-6, time.perf_counter() - started) + inst_fps = 1.0 / elapsed + fps_smooth = inst_fps if fps_smooth <= 0 else (0.90 * fps_smooth + 0.10 * inst_fps) + if stats_cb and frame_count % 30 == 0: + stats_cb(float(fps_smooth), int(face_count)) + + if fps_target and fps_target > 0: + wait_s = (1.0 / float(fps_target)) - elapsed + if wait_s > 0: + time.sleep(min(wait_s, 0.02)) + finally: + cap.release() + if writer is not None: + writer.release() + if record_tmp is not None and record_path: + try: + shutil.move(str(record_tmp), str(record_path)) + except Exception as e: + self.log(f"WARNUNG: Temp-Aufnahme konnte nicht verschoben werden ({e})") + finally: + _cleanup_temp_file(record_tmp) + class VoiceCloner: XTTS_MODEL = "tts_models/multilingual/multi-dataset/xtts_v2" @@ -1256,7 +1838,9 @@ class MainApp: _CONFIG_KEYS = [ "source", "input_dir", "output_dir", "video_input_dir", "video_output_dir", - "voice_ref", "voice_source_audio", "voice_output", "voice_language" + "voice_ref", "voice_source_audio", "voice_output", "voice_language", + "webcam_index", "webcam_output", "last_library_face", + "restoration", "webcam_record" ] def __init__(self): @@ -1267,6 +1851,15 @@ class MainApp: self.gpu = "CUDAExecutionProvider" in providers self.swapper = FaceSwapper(log_fn=self._log) self.voice = VoiceCloner(log_fn=self._log) + self.library = FaceLibrary(log_fn=self._log) + self.restorer = FaceRestorer(log_fn=self._log) + self.swapper.restorer = self.restorer + self._selected_library_slug = None + self._library_images = {} + self._webcam_thread = None + self._webcam_running = False + self._webcam_cancel = None + self._webcam_last_faces = 0 self._cfg = _load_config() # Gespeicherte Konfiguration laden @@ -1275,6 +1868,7 @@ class MainApp: self._root_real.geometry("760x700") self._root_real.resizable(True, True) self._root_real.configure(bg="#0d0d12") + self._root_real.protocol("WM_DELETE_WINDOW", self._on_close) canvas = tk.Canvas(self._root_real, bg="#0d0d12", highlightthickness=0) scrollbar = tk.Scrollbar(self._root_real, orient="vertical", command=canvas.yview) @@ -1320,27 +1914,45 @@ class MainApp: self._var_voice_mode = tk.StringVar(value=self._cfg.get("voice_mode", "text")) self._var_enhance = tk.BooleanVar(value=self._cfg.get("enhance", True)) self._var_color = tk.BooleanVar(value=self._cfg.get("color", True)) + self._var_restoration = tk.BooleanVar(value=bool(self._cfg.get("restoration", False))) + self._var_webcam_record = tk.BooleanVar(value=bool(self._cfg.get("webcam_record", False))) self._vars = {} for k in self._CONFIG_KEYS: v = tk.StringVar(value=self._cfg.get(k, "")) v.trace_add("write", lambda *_, key=k: self._on_var_change(key)) self._vars[k] = v + if self._vars["restoration"].get(): + self._var_restoration.set(str(self._vars["restoration"].get()).strip().lower() in ("1", "true", "yes", "on")) + else: + self._vars["restoration"].set("1" if self._var_restoration.get() else "0") + if self._vars["webcam_record"].get(): + self._var_webcam_record.set(str(self._vars["webcam_record"].get()).strip().lower() in ("1", "true", "yes", "on")) + else: + self._vars["webcam_record"].set("1" if self._var_webcam_record.get() else "0") + if not self._vars["webcam_index"].get(): + self._vars["webcam_index"].set("0") + if not self._vars["webcam_output"].get(): + self._vars["webcam_output"].set(str(SCRIPT_DIR / "webcam_recording.mp4")) if not self._vars["voice_language"].get(): self._vars["voice_language"].set("de") self._var_voice_mode.trace_add("write", lambda *_: self._save_now()) self._var_enhance.trace_add("write", lambda *_: self._save_now()) self._var_color.trace_add("write", lambda *_: self._save_now()) + self._var_restoration.trace_add("write", lambda *_: self._save_now()) + self._var_webcam_record.trace_add("write", lambda *_: self._save_now()) self._section("1 QUELLBILD - Gesicht, das eingefuegt wird") self._row("source", self._pick_source) + self._build_library_panel() self._prev_lbl = tk.Label(self.root, bg="#0d0d12") self._prev_lbl.pack() # Vorschaubild laden falls Quellbild gespeichert if self._vars["source"].get(): self._load_preview(self._vars["source"].get()) + self._refresh_library_grid() style = ttk.Style(self.root) style.theme_use("default") @@ -1433,6 +2045,8 @@ class MainApp: tk.Label(voice_tab, text="Hinweis: Beim ersten Lauf werden Sprachmodelle automatisch geladen.", font=("Courier New", 8), bg="#0d0d12", fg="#7a7a9a").pack(anchor="w", padx=4, pady=(4, 0)) + self._build_webcam_tab(nb) + self._nb = nb # Qualitaets-Optionen @@ -1448,6 +2062,16 @@ class MainApp: font=("Courier New", 9), bg="#0d0d12", fg="#8aff8a", selectcolor="#0d0d12", activebackground="#0d0d12", command=self._update_quality).pack(side="left", padx=(12, 0)) + self._chk_restoration = tk.Checkbutton( + qf, text="Gesichtswiederherstellung (GFPGAN)", variable=self._var_restoration, + font=("Courier New", 9), bg="#0d0d12", fg="#8aff8a", + selectcolor="#0d0d12", activebackground="#0d0d12", + command=self._update_quality + ) + self._chk_restoration.pack(side="left", padx=(12, 0)) + if not self.restorer.is_available(): + self._chk_restoration.configure(state="disabled", text="GFPGAN nicht installiert") + self._var_restoration.set(False) style.configure("G.Horizontal.TProgressbar", troughcolor="#101020", background="#3adf6a", thickness=14) self._pb = ttk.Progressbar(self.root, length=700, mode="determinate", style="G.Horizontal.TProgressbar") @@ -1472,6 +2096,7 @@ class MainApp: tk.Button(bf2, text="Setup wiederholen", font=("Courier New", 9), bg="#1a1a2c", fg="#8a8aff", activebackground="#252540", relief="flat", cursor="hand2", command=self._redo_setup).pack(side="left", padx=8) + self._update_quality() def _on_var_change(self, key): """Wird aufgerufen wenn sich ein Pfad-Feld aendert -> sofort speichern.""" @@ -1479,10 +2104,22 @@ class MainApp: def _save_now(self): """Aktuelle Einstellungen in config.json speichern.""" + if not hasattr(self, "_vars"): + return + if "restoration" not in self._vars or "webcam_record" not in self._vars: + return + restoration_val = "1" if self._var_restoration.get() else "0" + webcam_record_val = "1" if self._var_webcam_record.get() else "0" + if self._vars["restoration"].get() != restoration_val: + self._vars["restoration"].set(restoration_val) + if self._vars["webcam_record"].get() != webcam_record_val: + self._vars["webcam_record"].set(webcam_record_val) data = {k: self._vars[k].get() for k in self._CONFIG_KEYS} data["voice_mode"] = self._var_voice_mode.get() data["enhance"] = self._var_enhance.get() data["color"] = self._var_color.get() + data["restoration"] = self._var_restoration.get() + data["webcam_record"] = self._var_webcam_record.get() try: data["voice_text"] = self._voice_text.get("1.0", "end-1c") except Exception: @@ -1521,6 +2158,360 @@ class MainApp: self.tk.Button(f, text="...", bg="#22223c", fg="#d0d0e0", relief="flat", cursor="hand2", command=cmd).pack(side="right", padx=4) + def _build_library_panel(self): + tk = self.tk + wrap = tk.Frame(self.root, bg="#0d0d12") + wrap.pack(fill="x", padx=20, pady=(6, 0)) + self._library_open = False + self._btn_library_toggle = tk.Button( + wrap, text="Bibliothek oeffnen", font=("Courier New", 9), + bg="#1a1a2c", fg="#8a8aff", activebackground="#252540", + relief="flat", cursor="hand2", command=self._toggle_library_panel + ) + self._btn_library_toggle.pack(anchor="w", pady=(0, 4)) + + self._library_panel = tk.Frame(wrap, bg="#11111a", bd=1, relief="flat") + top = tk.Frame(self._library_panel, bg="#11111a") + top.pack(fill="x", padx=8, pady=(8, 4)) + tk.Label(top, text="Name:", font=("Courier New", 9), bg="#11111a", fg="#d8d8f0").pack(side="left") + self._var_library_name = tk.StringVar(value="") + tk.Entry(top, textvariable=self._var_library_name, font=("Courier New", 9), + bg="#14141e", fg="#d8d8f0", relief="flat", + insertbackground="white", width=18).pack(side="left", padx=8) + tk.Button(top, text="+ Hinzufuegen", font=("Courier New", 9), + bg="#1a3a2a", fg="#8aff8a", activebackground="#2a5a3a", + relief="flat", cursor="hand2", command=self._library_add_from_file).pack(side="left", padx=(0, 6)) + tk.Button(top, text="Als Quelle verwenden", font=("Courier New", 9), + bg="#22223c", fg="#d0d0e0", activebackground="#2e2e4e", + relief="flat", cursor="hand2", command=self._library_use_selected).pack(side="left", padx=(8, 6)) + tk.Button(top, text="Loeschen", font=("Courier New", 9), + bg="#3a1a1a", fg="#ffb0b0", activebackground="#4a2222", + relief="flat", cursor="hand2", command=self._library_delete_selected).pack(side="left") + + self._library_grid = tk.Frame(self._library_panel, bg="#11111a") + self._library_grid.pack(fill="x", padx=8, pady=(4, 8)) + self._library_placeholder = tk.Label( + self._library_grid, text="Noch keine Gesichter gespeichert.", + font=("Courier New", 9), bg="#11111a", fg="#7a7a9a" + ) + self._library_placeholder.grid(row=0, column=0, sticky="w") + + def _toggle_library_panel(self): + self._library_open = not self._library_open + if self._library_open: + self._library_panel.pack(fill="x") + self._btn_library_toggle.configure(text="Bibliothek schliessen") + self._refresh_library_grid() + else: + self._library_panel.pack_forget() + self._btn_library_toggle.configure(text="Bibliothek oeffnen") + + def _refresh_library_grid(self): + if not hasattr(self, "_library_grid"): + return + for child in list(self._library_grid.winfo_children()): + child.destroy() + self._library_images = {} + entries = self.library.list_entries() + if not entries: + self._library_placeholder = self.tk.Label( + self._library_grid, text="Noch keine Gesichter gespeichert.", + font=("Courier New", 9), bg="#11111a", fg="#7a7a9a" + ) + self._library_placeholder.grid(row=0, column=0, sticky="w") + return + + from PIL import Image, ImageTk + + last_slug = (self._vars.get("last_library_face").get().strip() + if "last_library_face" in self._vars else "") + if self._selected_library_slug is None and last_slug: + self._selected_library_slug = last_slug + + for i, entry in enumerate(entries): + col = i % 4 + row = i // 4 + cell = self.tk.Frame(self._library_grid, bg="#11111a", bd=0) + cell.grid(row=row, column=col, padx=6, pady=6, sticky="n") + + thumb_path = Path(entry["thumb_path"]) + if thumb_path.exists(): + img = Image.open(thumb_path).convert("RGB") + else: + img = Image.new("RGB", (96, 96), "#1a1a2c") + photo = ImageTk.PhotoImage(img) + self._library_images[entry["slug"]] = photo + btn = self.tk.Button( + cell, image=photo, relief="solid", bd=3, + highlightthickness=0, bg="#11111a", activebackground="#1a1a2c", + command=lambda slug=entry["slug"]: self._select_library_entry(slug) + ) + btn.pack() + self.tk.Label(cell, text=entry["name"], font=("Courier New", 8), + bg="#11111a", fg="#d8d8f0").pack(pady=(2, 0)) + cell._slug = entry["slug"] + cell._btn = btn + + self._select_library_entry(self._selected_library_slug, save=False) + + def _select_library_entry(self, slug, save=True): + if slug: + self._selected_library_slug = slug + if not hasattr(self, "_library_grid"): + return + selected = self._selected_library_slug + for cell in self._library_grid.winfo_children(): + b = getattr(cell, "_btn", None) + s = getattr(cell, "_slug", None) + if b is None: + continue + if s == selected: + b.configure(bg="#28442a") + else: + b.configure(bg="#11111a") + if save and selected: + self._vars["last_library_face"].set(selected) + self._save_now() + + def _library_add_from_file(self): + p = self.fd.askopenfilename( + title="Gesicht fuer Bibliothek waehlen", + filetypes=[("Bilder", "*.jpg *.jpeg *.png *.bmp *.webp"), ("Alle", "*.*")] + ) + if not p: + return + name = self._var_library_name.get().strip() or Path(p).stem + try: + entry = self.library.add(name, p) + self._selected_library_slug = entry["slug"] + self._vars["last_library_face"].set(entry["slug"]) + self._refresh_library_grid() + self.mb.showinfo("Bibliothek", f"Gesicht gespeichert: {entry['name']}") + except Exception as e: + self.mb.showerror("Bibliothek", str(e)) + + def _library_use_selected(self): + slug = self._selected_library_slug + if not slug: + return self.mb.showerror("Bibliothek", "Bitte zuerst ein Gesicht auswaehlen.") + try: + src = self.library.get_source_path(slug) + self._vars["source"].set(str(src)) + self._load_preview(str(src)) + self._save_now() + except Exception as e: + self.mb.showerror("Bibliothek", str(e)) + + def _library_delete_selected(self): + slug = self._selected_library_slug + if not slug: + return self.mb.showerror("Bibliothek", "Bitte zuerst ein Gesicht auswaehlen.") + if not self.mb.askyesno("Bibliothek", f"Eintrag '{slug}' wirklich loeschen?"): + return + try: + self.library.remove(slug) + self._selected_library_slug = None + self._vars["last_library_face"].set("") + self._refresh_library_grid() + except Exception as e: + self.mb.showerror("Bibliothek", str(e)) + + def _build_webcam_tab(self, notebook): + tk, ttk = self.tk, self.ttk + tab = tk.Frame(notebook, bg="#0d0d12") + notebook.add(tab, text="Webcam") + + row1 = tk.Frame(tab, bg="#0d0d12") + row1.pack(fill="x", pady=(10, 2)) + tk.Label(row1, text="Kamera-Index", font=("Courier New", 9, "bold"), + bg="#0d0d12", fg="#c8a96a").pack(side="left", padx=(0, 8)) + self._webcam_index_box = ttk.Combobox( + row1, textvariable=self._vars["webcam_index"], + values=[str(i) for i in range(5)], state="readonly", width=6, font=("Courier New", 9) + ) + self._webcam_index_box.pack(side="left") + self._webcam_resolution_label = tk.Label( + row1, text="Aufloesung: -", font=("Courier New", 8), bg="#0d0d12", fg="#7a7a9a" + ) + self._webcam_resolution_label.pack(side="left", padx=(12, 0)) + + row2 = tk.Frame(tab, bg="#0d0d12") + row2.pack(fill="x", pady=(8, 2)) + tk.Checkbutton(row2, text="Aufnahme aktiv", variable=self._var_webcam_record, + font=("Courier New", 9), bg="#0d0d12", fg="#8aff8a", + selectcolor="#0d0d12", activebackground="#0d0d12", + command=self._save_now).pack(side="left") + self._row_in(tab, "webcam_output", self._pick_webcam_out) + + self._webcam_preview_target = (640, 360) + self._webcam_preview_collapsed_h = 96 + self._webcam_preview_box = tk.Frame(tab, bg="#05050c", height=self._webcam_preview_collapsed_h) + self._webcam_preview_box.pack(fill="x", padx=4, pady=(8, 6)) + self._webcam_preview_box.pack_propagate(False) + self._webcam_preview = tk.Label( + self._webcam_preview_box, bg="#05050c", fg="#7a7a9a", + text="Webcam Vorschau (Starten fuer Live-Preview)" + ) + self._webcam_preview.pack(fill="both", expand=True) + self._webcam_stats = tk.Label( + tab, text="FPS: - | Gesichter: -", font=("Courier New", 9), + bg="#0d0d12", fg="#8a8aff" + ) + self._webcam_stats.pack(anchor="w", padx=4, pady=(0, 8)) + self._btn_webcam = tk.Button( + tab, text="Starten", font=("Courier New", 10, "bold"), + bg="#1a3a2a", fg="#8aff8a", activebackground="#2a5a3a", + relief="flat", cursor="hand2", command=self._toggle_webcam + ) + self._btn_webcam.pack(anchor="w", padx=4, pady=(0, 10)) + self._set_webcam_preview_collapsed(True) + + def _toggle_webcam(self): + if self._webcam_running: + self._stop_webcam() + else: + self._start_webcam() + + def _pick_webcam_out(self): + p = self.fd.asksaveasfilename( + title="Webcam-Aufnahme speichern", + defaultextension=".mp4", + filetypes=[("MP4 Video", "*.mp4"), ("Alle", "*.*")] + ) + if p: + self._vars["webcam_output"].set(p) + + def _set_webcam_preview_collapsed(self, collapsed): + if not hasattr(self, "_webcam_preview_box"): + return + if collapsed: + self._webcam_preview_box.configure(height=self._webcam_preview_collapsed_h) + self._webcam_preview.configure(image="", text="Webcam Vorschau (Starten fuer Live-Preview)") + self._webcam_preview.image = None + else: + self._webcam_preview_box.configure(height=int(self._webcam_preview_target[1])) + + def _start_webcam(self): + import cv2 + import threading + + src = self._vars["source"].get().strip() + if not src or not Path(src).is_file(): + return self.mb.showerror("Webcam", "Bitte zuerst ein gueltiges Quellbild waehlen.") + try: + cam_idx = int(self._vars["webcam_index"].get().strip() or "0") + except Exception: + return self.mb.showerror("Webcam", "Ungueltiger Kamera-Index.") + + probe = cv2.VideoCapture(cam_idx) + if not probe.isOpened(): + probe.release() + tried = [] + for i in range(5): + c = cv2.VideoCapture(i) + ok = c.isOpened() + c.release() + if ok: + tried.append(i) + return self.mb.showerror( + "Webcam", + f"Kamera konnte nicht geoeffnet werden (Index {cam_idx}).\n" + f"Verfuegbare Indizes: {tried if tried else 'keine'}" + ) + w = int(probe.get(cv2.CAP_PROP_FRAME_WIDTH) or 0) + h = int(probe.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0) + probe.release() + self._webcam_resolution_label.configure(text=f"Aufloesung: {w}x{h}") + + self._update_quality() + try: + self.swapper.init_models() + except Exception as e: + return self.mb.showerror("Webcam", f"Modellfehler: {e}") + + src_img = _cv2_imread_unicode(src) + if src_img is None: + return self.mb.showerror("Webcam", "Quellbild konnte nicht geladen werden.") + src_face = self.swapper.get_first_face(src_img) + if src_face is None: + return self.mb.showerror("Webcam", "Kein Gesicht im Quellbild gefunden!") + + self._webcam_cancel = threading.Event() + self._webcam_running = True + self._btn_webcam.configure(text="Stoppen", bg="#3a1a1a", fg="#ffb0b0", activebackground="#4a2222") + self._webcam_stats.configure(text="FPS: - | Gesichter: -") + self._set_webcam_preview_collapsed(False) + + record_path = self._vars["webcam_output"].get().strip() if self._var_webcam_record.get() else None + self._webcam_thread = threading.Thread( + target=self._webcam_worker, + args=(src_face, cam_idx, record_path), + daemon=True, + ) + self._webcam_thread.start() + + def _stop_webcam(self): + if self._webcam_cancel is not None: + self._webcam_cancel.set() + th = self._webcam_thread + if th is not None and th.is_alive(): + th.join(timeout=1.5) + self._webcam_running = False + self._webcam_thread = None + self._webcam_cancel = None + if hasattr(self, "_btn_webcam"): + self._btn_webcam.configure(text="Starten", bg="#1a3a2a", fg="#8aff8a", activebackground="#2a5a3a") + if hasattr(self, "_webcam_preview"): + self._set_webcam_preview_collapsed(True) + + def _webcam_worker(self, src_face, cam_idx, record_path): + import cv2 + from PIL import Image + + def on_frame(frame): + rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + img = Image.fromarray(rgb) + target_w, target_h = self._webcam_preview_target + src_w, src_h = img.size + scale = min(target_w / max(1, src_w), target_h / max(1, src_h)) + new_w = max(1, int(round(src_w * scale))) + new_h = max(1, int(round(src_h * scale))) + resampling = getattr(Image, "Resampling", Image).BILINEAR + fitted = img.resize((new_w, new_h), resampling) + canvas = Image.new("RGB", (target_w, target_h), (5, 5, 12)) + off_x = (target_w - new_w) // 2 + off_y = (target_h - new_h) // 2 + canvas.paste(fitted, (off_x, off_y)) + self._root_real.after(1, self._apply_webcam_frame, canvas) + + def on_stats(fps, faces): + self._webcam_last_faces = int(faces) + self._root_real.after(1, lambda: self._webcam_stats.configure(text=f"FPS: {fps:.1f} | Gesichter: {int(faces)}")) + + try: + self.swapper.swap_webcam( + src_face=src_face, + camera_index=cam_idx, + record_path=record_path, + fps_target=25.0, + cancel_check=(self._webcam_cancel.is_set if self._webcam_cancel is not None else None), + frame_cb=on_frame, + stats_cb=on_stats, + use_tracking=False, + ) + except Exception as e: + self._root_real.after(1, lambda: self.mb.showerror("Webcam", str(e))) + finally: + self._root_real.after(1, self._stop_webcam) + + def _apply_webcam_frame(self, pil_img): + from PIL import ImageTk + if not hasattr(self, "_webcam_preview"): + return + photo = ImageTk.PhotoImage(pil_img) + self._webcam_preview.configure(image=photo, text="") + self._webcam_preview.image = photo + def _pick_source(self): p = self.fd.askopenfilename(title="Quellbild waehlen", filetypes=[("Bilder", "*.jpg *.jpeg *.png *.bmp *.webp"), ("Alle", "*.*")]) @@ -1585,22 +2576,38 @@ class MainApp: def _update_quality(self): self.swapper.enhance = self._var_enhance.get() self.swapper.color = self._var_color.get() + allow_restoration = self._var_restoration.get() and self.restorer.is_available() + self.swapper.use_restoration = bool(allow_restoration) self._save_now() def _redo_setup(self): + if self._webcam_running: + self._stop_webcam() SETUP_FLAG.unlink(missing_ok=True) self._root_real.destroy() _show_setup_window() importlib.invalidate_caches() MainApp() + def _on_close(self): + if self._webcam_running: + self._stop_webcam() + self._root_real.destroy() + def _start(self): self._btn.configure(state="disabled") import threading tab_idx = self._nb.index(self._nb.select()) - if tab_idx == 1: target = self._run_video - elif tab_idx == 2: target = self._run_voice - else: target = self._run + if tab_idx == 3: + self._btn.configure(state="normal") + self._toggle_webcam() + return + if tab_idx == 1: + target = self._run_video + elif tab_idx == 2: + target = self._run_voice + else: + target = self._run threading.Thread(target=target, daemon=True).start() def _run(self): @@ -1624,7 +2631,7 @@ class MainApp: except Exception as e: return err("Modellfehler", str(e)) - src_img = cv2.imread(src) + src_img = _cv2_imread_unicode(src) if src_img is None: return err("Fehler", "Quellbild konnte nicht geladen werden.") src_face = self.swapper.get_first_face(src_img) if src_face is None: return err("Fehler", "Kein Gesicht im Quellbild gefunden!") @@ -1702,7 +2709,7 @@ class MainApp: except Exception as e: return err("Modellfehler", str(e)) - src_img = cv2.imread(src) + src_img = _cv2_imread_unicode(src) if src_img is None: return err("Fehler", "Quellbild konnte nicht geladen werden.") src_face = self.swapper.get_first_face(src_img) if src_face is None: return err("Fehler", "Kein Gesicht im Quellbild gefunden!") @@ -1711,12 +2718,13 @@ class MainApp: total_frames = 0 for vp in videos: - cap = cv2.VideoCapture(str(vp)) + cap, cap_tmp_copy = _open_videocapture_unicode(vp, log_fn=self._log) if cap.isOpened(): total_frames += max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 1) else: total_frames += 1 cap.release() + _cleanup_temp_file(cap_tmp_copy) self._pb["maximum"] = max(total_frames, 1) self._pb["value"] = 0 @@ -1814,6 +2822,7 @@ class MainApp: def main(): + _require_python_version() if not SETUP_FLAG.exists(): _show_setup_window() importlib.invalidate_caches() @@ -1821,4 +2830,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/hay.mp3 b/hay.mp3 new file mode 100644 index 0000000..5709f6a Binary files /dev/null and b/hay.mp3 differ diff --git a/models/GFPGANv1.4.pth b/models/GFPGANv1.4.pth new file mode 100644 index 0000000..a71ee4e Binary files /dev/null and b/models/GFPGANv1.4.pth differ diff --git a/requirements.txt b/requirements.txt index 4cb00e8..f2afce7 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/setup.bat b/setup.bat index ec55997..4bd0609 100644 --- a/setup.bat +++ b/setup.bat @@ -18,9 +18,16 @@ echo. echo Schritt 5: Weitere Pakete installieren... py -3.12 -m pip install "onnx==1.16.2" "protobuf" "ml_dtypes" py -3.12 -m pip install "scipy" "scikit-learn" "scikit-image" "tqdm" "requests" "Pillow" "easydict" "prettytable" "matplotlib" -py -3.12 -m pip install "albumentations==1.3.1" +py -3.12 -m pip install "qudida==0.0.4" --no-deps --force-reinstall +py -3.12 -m pip install "albumentations==1.3.1" --no-deps --force-reinstall py -3.12 -m pip install "coqui-tts" "torch" "torchaudio" echo. +echo Schritt 5b: Optional GFPGAN installieren... +py -3.12 -m pip install "gfpgan" +if errorlevel 1 ( + echo WARNUNG: gfpgan konnte nicht installiert werden - Funktion bleibt optional deaktiviert. +) +echo. echo Schritt 6: insightface Wheel installieren... py -3.12 -m pip install "https://github.com/Gourieff/Assets/raw/main/Insightface/insightface-0.7.3-cp312-cp312-win_amd64.whl" --no-deps echo.