# -*- coding: utf-8 -*- """ AZA Desktop Updater — Startprompt + sichere Installation. Beim Startpanel-Start: freundliche Nachfrage bei neuer Version. Kein sichtbarer Update-Button, kein stilles Auto-Update. """ from __future__ import annotations import argparse import json import os import shutil import subprocess import sys import threading import tkinter as tk from pathlib import Path from tkinter import messagebox, ttk from typing import Any, Callable from aza_update_core import ( check_update_for_startup, check_update_from_manifest, configure_test_install_dir, create_pre_update_backup, compute_sha256, download_update_package, evaluate_update, extract_update_zip, fetch_remote_manifest, fetch_startup_manifest, format_version_label, get_install_dir, get_project_root, get_test_install_dir, list_available_backups, load_local_version, load_manifest_from_source, log_update_check, resolve_manifest_sources, rollback_from_backup, save_local_version, validate_update_install_ready, get_auto_update_prompt_enabled, set_auto_update_prompt_enabled, save_update_pending, load_update_pending, clear_update_pending, ) def _log(msg: str) -> None: print(f"[AZA Updater] {msg}") log_update_check("updater", message=msg) def _apply_cli_env(install_dir: str | None = None) -> None: if install_dir: configure_test_install_dir(install_dir) return env_dir = (os.environ.get("AZA_UPDATE_TEST_INSTALL_DIR") or "").strip() if env_dir: configure_test_install_dir(env_dir) def check_update_status( *, startup: bool = False, manifest: str | None = None, ) -> dict[str, Any]: """Manifest pruefen — startup=True fuer Startpanel, manifest= fuer CLI/Env.""" if manifest: return check_update_from_manifest(manifest) if startup: env_manifest = (os.environ.get("AZA_UPDATE_MANIFEST_URL") or "").strip() or None return check_update_for_startup(manifest=env_manifest) local = load_local_version() sources = resolve_manifest_sources() remote, err = fetch_startup_manifest(sources=sources) if err or not remote: return { 
"status": "error", "message": "Manifest nicht erreichbar.", "detail": err, "local": local, } result = evaluate_update(local, remote) result["detail"] = err return result def _stop_aza_processes(*, include_panel: bool = False) -> None: if sys.platform != "win32": return names = [ "aza_desktop.exe", "aza_controller.exe", "AZA_EmpfangShell.exe", "aza_office.exe", ] if include_panel: names.append("aza_start_panel.exe") _NO_WINDOW = 0x08000000 for name in names: try: subprocess.run( ["taskkill", "/F", "/IM", name], capture_output=True, text=True, timeout=15, creationflags=_NO_WINDOW, ) except Exception: pass def _is_dir_writable(path: Path) -> bool: try: path.mkdir(parents=True, exist_ok=True) probe = path / f".aza_write_test_{os.getpid()}" probe.write_text("x", encoding="utf-8") try: probe.unlink() except OSError: pass return True except Exception: return False def _resolve_installed_updater_exe(target_install: Path) -> Path | None: cand = target_install / "aza_updater.exe" if cand.is_file(): return cand if getattr(sys, "frozen", False): here = Path(sys.executable).resolve().parent cand2 = here / "aza_updater.exe" if cand2.is_file(): return cand2 return None def _copy_updater_to_temp(src_exe: Path) -> Path | None: import tempfile temp_root = Path(tempfile.gettempdir()) / "AzA_Updater_Run" try: temp_root.mkdir(parents=True, exist_ok=True) except Exception as exc: _log(f"temp_root mkdir failed: {type(exc).__name__}") return None dst = temp_root / "aza_updater.exe" try: if dst.is_file(): try: dst.unlink() except Exception: pass shutil.copy2(src_exe, dst) return dst except Exception as exc: _log(f"copy_updater_to_temp failed: {type(exc).__name__}") return None def launch_external_installer( result: dict[str, Any], *, install_dir: Path | None = None, manifest_url: str | None = None, restart_exe: str = "aza_start_panel.exe", ) -> tuple[bool, str]: """ Startet aza_updater.exe als externen Prozess (UAC falls noetig) und gibt die Kontrolle ab. 
Der externe Updater installiert sicher, weil er aus %TEMP% laeuft und das Startpanel selbst ersetzen kann. """ target_install = (install_dir or get_install_dir()).resolve() src_updater = _resolve_installed_updater_exe(target_install) if src_updater is None: log_update_check("ext_launch_no_updater", target=str(target_install)) return False, "aza_updater.exe nicht gefunden." temp_updater = _copy_updater_to_temp(src_updater) if temp_updater is None: log_update_check("ext_launch_copy_failed") return False, "Updater konnte nicht in TEMP kopiert werden." if not manifest_url: remote = result.get("remote") or {} manifest_url = str(remote.get("_manifest_url") or "").strip() if not manifest_url: manifest_url = (os.environ.get("AZA_UPDATE_MANIFEST_URL") or "").strip() args = ["--install-now"] if manifest_url: args += ["--manifest", manifest_url] args += ["--target-dir", str(target_install)] if restart_exe: args += ["--restart-exe", restart_exe] use_runas = (sys.platform == "win32") and (not _is_dir_writable(target_install)) log_update_check( "ext_launch", updater=str(temp_updater), target=str(target_install), manifest=manifest_url or "", runas=use_runas, ) try: if use_runas: import ctypes params = " ".join(f'"{a}"' if (" " in a or a == "") else a for a in args) SW_HIDE = 0 ret = ctypes.windll.shell32.ShellExecuteW( # type: ignore[attr-defined] None, "runas", str(temp_updater), params, None, SW_HIDE ) if int(ret) <= 32: log_update_check("ext_launch_uac_failed", code=int(ret)) return False, f"UAC-Start fehlgeschlagen (code={int(ret)})" else: CREATE_NO_WINDOW = 0x08000000 creationflags = CREATE_NO_WINDOW if sys.platform == "win32": creationflags |= ( getattr(subprocess, "CREATE_NEW_PROCESS_GROUP", 0) | getattr(subprocess, "DETACHED_PROCESS", 0) ) subprocess.Popen( [str(temp_updater)] + args, cwd=str(temp_updater.parent), close_fds=True, creationflags=creationflags, ) except Exception as exc: log_update_check("ext_launch_exception", error=f"{type(exc).__name__}: {exc}"[:160]) 
return False, f"Updater konnte nicht gestartet werden: {exc}" return True, "ok" def _apply_downloaded_update(package: Path, result: dict[str, Any]) -> tuple[bool, str]: install_dir = get_install_dir() log_update_check("apply_start", install_dir=str(install_dir), package=str(package)) try: backup_dir = create_pre_update_backup(install_dir) _log(f"Backup erstellt: {backup_dir}") log_update_check("apply_backup", backup=str(backup_dir)) except Exception as exc: log_update_check("apply_backup_failed", error=f"{type(exc).__name__}: {exc}"[:160]) return False, "Backup konnte nicht erstellt werden." suffix = package.suffix.lower() if suffix == ".zip": ok, msg = extract_update_zip(package, install_dir) log_update_check("apply_extract", ok=ok, msg=str(msg)[:120]) if not ok: rollback_from_backup(backup_dir, install_dir) log_update_check("apply_rollback_done") return False, f"Installation fehlgeschlagen.\nRollback durchgefuehrt.\nDetail: {msg}" elif suffix == ".exe": target = install_dir / package.name try: shutil.copy2(package, target) log_update_check("apply_copy_exe", target=str(target)) except Exception as exc: log_update_check("apply_copy_failed", error=f"{type(exc).__name__}: {exc}"[:160]) rollback_from_backup(backup_dir, install_dir) return False, f"Installation fehlgeschlagen.\nRollback durchgefuehrt.\nDetail: {exc}" else: return False, "Unbekanntes Update-Format." new_version = { "version": result.get("latest_version"), "build": result.get("latest_build"), "channel": (result.get("local") or {}).get("channel", "stable"), } try: save_local_version({k: v for k, v in new_version.items() if v}) log_update_check("apply_save_version", version=new_version.get("version")) except Exception as exc: _log(f"version.json write failed: {exc}") log_update_check("apply_savever_failed", error=f"{type(exc).__name__}: {exc}"[:160]) return True, "Das Update wurde installiert.\nBitte starten Sie AZA neu." 
def perform_confirmed_update(
    result: dict[str, Any],
    *,
    parent: tk.Misc | None = None,
) -> tuple[bool, str]:
    """Download, back up and install the update described by *result*.

    Runs only when ``validate_update_install_ready`` accepts the manifest
    and it lists at least one file. Checksum verification is presumably done
    inside ``download_update_package`` — not visible from this module.

    Args:
        result: Update check result (must contain ``files``).
        parent: Unused here; kept for interface compatibility.

    Returns:
        ``(ok, user-facing message)``.
    """
    ready, _reason = validate_update_install_ready(result)
    files = result.get("files") or []
    # An empty file list is treated like "not ready" instead of raising
    # IndexError on files[0].
    if not ready or not files:
        return False, (
            "Das Update ist noch nicht vollstaendig verfuegbar.\n"
            "Bitte versuchen Sie es spaeter erneut."
        )

    file_info = files[0]
    _log("Download starten")
    package, err = download_update_package(file_info)
    if not package:
        _log(f"Download fehlgeschlagen: {err}")
        return False, (
            "Das Update konnte nicht heruntergeladen werden.\n"
            "Bitte versuchen Sie es spaeter erneut."
        )

    _stop_aza_processes()
    return _apply_downloaded_update(package, result)


def _run_install_now_window(
    *,
    manifest: str,
    target_dir: str | None,
    restart_exe: str | None,
) -> int:
    """Run the external update window (download + install) and return an exit code.

    The work happens in a daemon worker thread; the Tk main loop polls the
    shared ``state`` dict for progress and completion.

    Args:
        manifest: Manifest URL/path; empty means "use the default sources".
        target_dir: Install directory override (``--target-dir``).
        restart_exe: Executable (relative to the install dir) to start after
            a successful update.

    Returns:
        0 on success (or after handing over to an elevated copy), 1 otherwise.
    """
    import time

    if target_dir:
        configure_test_install_dir(target_dir)

    # Self-elevate when the install directory is not writable.
    if sys.platform == "win32" and not _is_dir_writable(get_install_dir()):
        try:
            import ctypes

            shell32 = ctypes.windll.shell32  # type: ignore[attr-defined]
            # If we are already elevated, elevating again cannot help and
            # would relaunch this process over and over.
            if shell32.IsUserAnAdmin():
                log_update_check("install_now_already_elevated")
            else:
                exe = sys.executable
                if getattr(sys, "frozen", False):
                    relaunch_args = sys.argv[1:]
                else:
                    # Unfrozen: sys.executable is the interpreter, so the
                    # script path has to be passed along as well.
                    relaunch_args = [os.path.abspath(sys.argv[0])] + sys.argv[1:]
                # MS C runtime compatible quoting of the argument list.
                params = subprocess.list2cmdline(relaunch_args)
                SW_HIDE = 0
                ret = shell32.ShellExecuteW(None, "runas", exe, params, None, SW_HIDE)
                # ShellExecuteW signals success with a value greater than 32.
                if int(ret) > 32:
                    log_update_check("install_now_self_elevate")
                    return 0
                log_update_check("install_now_uac_denied", code=int(ret))
        except Exception as exc:
            log_update_check(
                "install_now_elevate_exc",
                error=f"{type(exc).__name__}: {exc}"[:120],
            )

    log_update_check(
        "install_now_start",
        manifest=manifest or "",
        target=target_dir or "",
        restart=restart_exe or "",
    )

    BG = "#FFFFFF"
    TRACK = "#E1F6FC"
    FILL = "#1A8ACC"
    TEXT_STRONG = "#1A4D6D"
    SUBTLE = "#8899AA"
    BTN_BG = "#E8EFF5"
    PB_WIDTH = 464
    PB_HEIGHT = 10

    root = tk.Tk()
    root.title("AzA Update")
    root.resizable(False, False)
    try:
        # Start fully transparent; made opaque once the layout is built.
        root.attributes("-alpha", 0.0)
    except Exception:
        pass
    root.configure(bg=BG)

    def _find_logo_candidates(ext: str) -> list[Path]:
        """Return possible locations of ``logo.<ext>`` (also works from the temp dir)."""
        cands: list[Path] = []
        if getattr(sys, "frozen", False):
            cands.append(Path(sys.executable).parent / f"logo.{ext}")
        cands.append(Path(__file__).resolve().parent / f"logo.{ext}")
        try:
            install = get_install_dir()
            cands.append(install / f"logo.{ext}")
            cands.append(install / "assets" / f"logo.{ext}")
        except Exception:
            pass
        if target_dir:
            td = Path(target_dir)
            cands.append(td / f"logo.{ext}")
            cands.append(td / "assets" / f"logo.{ext}")
        return cands

    try:
        for ico in _find_logo_candidates("ico"):
            if ico.is_file():
                root.iconbitmap(default=str(ico))
                break
    except Exception:
        pass

    W, H = 520, 232
    try:
        sw, sh = root.winfo_screenwidth(), root.winfo_screenheight()
        x = max(0, (sw - W) // 2)
        y = max(0, (sh - H) // 2 - 60)
        root.geometry(f"{W}x{H}+{x}+{y}")
    except Exception:
        pass

    container = tk.Frame(root, bg=BG, padx=28, pady=22)
    container.pack(fill="both", expand=True)

    header = tk.Frame(container, bg=BG)
    header.pack(fill="x", pady=(0, 14))

    try:
        for logo_path in _find_logo_candidates("png"):
            if logo_path.is_file():
                logo_tk = tk.PhotoImage(file=str(logo_path))
                # Shrink by an integer factor to roughly 40 px width.
                ratio = max(1, logo_tk.width() // 40)
                if ratio > 1:
                    logo_tk = logo_tk.subsample(ratio, ratio)
                _logo_lbl = tk.Label(header, image=logo_tk, bg=BG)
                # Keep a reference so the image is not garbage-collected.
                _logo_lbl.image = logo_tk  # type: ignore[attr-defined]
                _logo_lbl.pack(side="left", padx=(0, 14))
                break
    except Exception:
        pass

    title_col = tk.Frame(header, bg=BG)
    title_col.pack(side="left", fill="x", expand=True)
    tk.Label(
        title_col,
        text="AzA wird aktualisiert",
        bg=BG,
        fg=TEXT_STRONG,
        font=("Segoe UI", 14, "bold"),
        anchor="w",
    ).pack(anchor="w")
    tk.Label(
        title_col,
        text="Bitte einen Moment Geduld.",
        bg=BG,
        fg=SUBTLE,
        font=("Segoe UI", 9),
        anchor="w",
    ).pack(anchor="w", pady=(2, 0))

    status_var = tk.StringVar(value="Update wird vorbereitet …")
    tk.Label(
        container,
        textvariable=status_var,
        bg=BG,
        fg=TEXT_STRONG,
        font=("Segoe UI", 10),
        anchor="w",
        wraplength=PB_WIDTH,
        justify="left",
    ).pack(fill="x", pady=(0, 8))

    pb_canvas = tk.Canvas(
        container,
        width=PB_WIDTH,
        height=PB_HEIGHT,
        bg=BG,
        highlightthickness=0,
        bd=0,
    )
    pb_canvas.pack(fill="x", pady=(0, 6))
    pb_canvas.create_rectangle(0, 0, PB_WIDTH, PB_HEIGHT, fill=TRACK, outline="")
    fill_rect = pb_canvas.create_rectangle(0, 0, 0, PB_HEIGHT, fill=FILL, outline="")

    pct_var = tk.StringVar(value="")
    tk.Label(
        container,
        textvariable=pct_var,
        bg=BG,
        fg=SUBTLE,
        font=("Segoe UI", 9),
        anchor="w",
    ).pack(anchor="w", pady=(0, 14))

    btn_row = tk.Frame(container, bg=BG)
    btn_row.pack(fill="x")
    close_btn = tk.Button(
        btn_row,
        text="Schließen",
        state="disabled",
        command=root.destroy,
        bg=BTN_BG,
        fg=TEXT_STRONG,
        activebackground="#D8E3EE",
        activeforeground=TEXT_STRONG,
        font=("Segoe UI", 9),
        bd=0,
        padx=18,
        pady=6,
        cursor="hand2",
        relief="flat",
    )
    close_btn.pack(side="right")

    anim_state = {"mode": "indeterminate", "x": -130, "after_id": None}

    def _anim_step() -> None:
        """Advance the sliding segment of the indeterminate progress bar."""
        if anim_state["mode"] != "indeterminate":
            return
        seg = 140
        anim_state["x"] += 6
        if anim_state["x"] > PB_WIDTH:
            anim_state["x"] = -seg
        x0 = max(0, anim_state["x"])
        x1 = min(PB_WIDTH, anim_state["x"] + seg)
        pb_canvas.coords(fill_rect, x0, 0, x1, PB_HEIGHT)
        anim_state["after_id"] = root.after(28, _anim_step)

    def set_indeterminate() -> None:
        """Start the indeterminate animation unless it is already running."""
        if anim_state["after_id"] is not None:
            return
        anim_state["mode"] = "indeterminate"
        anim_state["x"] = -140
        _anim_step()

    def stop_anim() -> None:
        """Cancel a scheduled animation step, if any."""
        if anim_state["after_id"] is not None:
            try:
                root.after_cancel(anim_state["after_id"])
            except Exception:
                pass
            anim_state["after_id"] = None

    def set_determinate(pct: float) -> None:
        """Show a fixed fill of *pct* percent (clamped to 0..100)."""
        stop_anim()
        anim_state["mode"] = "determinate"
        pct = max(0.0, min(100.0, pct))
        w = int(PB_WIDTH * pct / 100.0)
        pb_canvas.coords(fill_rect, 0, 0, w, PB_HEIGHT)

    def set_status(text: str) -> None:
        """Update the status line; errors (e.g. destroyed window) are ignored."""
        try:
            status_var.set(text)
        except Exception:
            pass

    set_indeterminate()
    root.update_idletasks()
    try:
        root.attributes("-alpha", 1.0)
    except Exception:
        pass
    try:
        # Raise above other windows briefly, then release topmost again.
        root.attributes("-topmost", True)
        root.after(800, lambda: root.attributes("-topmost", False))
    except Exception:
        pass

    # Shared between the worker thread and the Tk callbacks.
    state: dict[str, Any] = {
        "ok": False,
        "message": "",
        "done": False,
        "progress": None,
    }

    def _on_close_request() -> None:
        """Ignore the window's close button until the worker has finished.

        Closing earlier would end the main loop and with it the process,
        killing the daemon worker in the middle of the installation.
        """
        if state["done"]:
            root.destroy()

    root.protocol("WM_DELETE_WINDOW", _on_close_request)

    def schedule_progress(done: int, total: int | None) -> None:
        """Download callback (worker thread): store the latest progress."""
        state["progress"] = (done, total)

    def pump_progress() -> None:
        """Main-thread poll: render the most recent progress value."""
        p = state["progress"]
        if p is not None:
            done, total = p
            state["progress"] = None
            if total and total > 0:
                pct = 100.0 * done / total
                set_determinate(pct)
                pct_var.set(f"Herunterladen: {int(pct)} %")
            else:
                # Total size unknown: show the downloaded amount instead.
                mb = done // (1024 * 1024)
                pct_var.set(f"Heruntergeladen: {mb} MB")
        if not state["done"]:
            root.after(120, pump_progress)

    def worker() -> None:
        """Check the manifest, download, stop AZA and apply the update."""
        try:
            time.sleep(0.4)
            root.after(0, lambda: set_status("Update wird vorbereitet …"))

            if manifest:
                result = check_update_from_manifest(manifest)
            else:
                result = check_update_status(startup=False)
            status = result.get("status")
            log_update_check("install_now_manifest", status=status)
            if status != "update_available":
                state["message"] = "Aktuell ist kein Update verfuegbar."
                return

            # Same gate as perform_confirmed_update: never install from a
            # manifest that the validator does not accept.
            ready, _reason = validate_update_install_ready(result)
            if not ready:
                log_update_check("install_not_ready")
                state["message"] = "Das Update-Paket ist noch nicht verfuegbar."
                return

            files = result.get("files") or []
            if not files:
                state["message"] = "Das Update-Paket ist noch nicht verfuegbar."
                return
            file_info = files[0]

            root.after(0, lambda: (
                set_status("Update wird heruntergeladen …"),
                pct_var.set("Herunterladen: 0 %"),
                set_determinate(0.0),
            ))
            log_update_check("install_now_download_start", url=file_info.get("url", ""))
            package, dl_msg = download_update_package(file_info, progress=schedule_progress)
            log_update_check("install_now_download", ok=bool(package), msg=str(dl_msg)[:120])
            if not package:
                state["message"] = "Das Update konnte nicht heruntergeladen werden."
                return

            root.after(0, lambda: (
                set_determinate(100.0),
                pct_var.set(""),
                set_status(
                    "AzA schließt sich kurz, um das Update fertigzustellen …"
                ),
            ))
            time.sleep(1.8)
            _stop_aza_processes(include_panel=True)
            time.sleep(1.4)

            root.after(0, lambda: (
                set_indeterminate(),
                set_status("Update wird installiert …"),
            ))
            ok, msg = _apply_downloaded_update(package, result)
            log_update_check("install_now_apply", ok=ok, msg=str(msg)[:120])
            state["ok"] = ok
            state["message"] = msg
        except Exception as exc:
            log_update_check(
                "install_now_exception",
                error=f"{type(exc).__name__}: {exc}"[:200],
            )
            state["message"] = "Update konnte nicht abgeschlossen werden."
        finally:
            state["done"] = True

    def finalize() -> None:
        """Main-thread poll: once the worker is done, show the outcome."""
        if not state["done"]:
            root.after(200, finalize)
            return
        stop_anim()
        if state["ok"]:
            set_determinate(100.0)
            pct_var.set("")
            set_status("AzA wird neu gestartet …")
            log_update_check("install_now_result", ok=True)
            if restart_exe:
                install_dir = get_install_dir()
                exe = install_dir / restart_exe
                if exe.is_file():
                    try:
                        subprocess.Popen(
                            [str(exe)],
                            cwd=str(install_dir),
                            close_fds=True,
                            # creationflags is Windows-only (CREATE_NO_WINDOW).
                            creationflags=0x08000000 if sys.platform == "win32" else 0,
                        )
                        log_update_check("install_now_restart", exe=str(exe))
                    except Exception as exc:
                        log_update_check(
                            "install_now_restart_failed",
                            error=f"{type(exc).__name__}: {exc}"[:120],
                        )
            close_btn.configure(state="normal")
            root.after(1800, root.destroy)
        else:
            pb_canvas.coords(fill_rect, 0, 0, 0, PB_HEIGHT)
            pct_var.set("")
            set_status(state.get("message") or "Update fehlgeschlagen.")
            log_update_check(
                "install_now_result",
                ok=False,
                msg=str(state.get("message"))[:200],
            )
            close_btn.configure(state="normal")

    threading.Thread(target=worker, daemon=True, name="aza-install-now").start()
    root.after(120, pump_progress)
    root.after(400, finalize)
    root.mainloop()
    return 0 if state["ok"] else 1


def _show_friendly_update_dialog(
    result: dict[str, Any],
    *,
    parent: tk.Misc | None = None,
) -> tuple[str, bool]:
    """Show the friendly three-option update dialog and wait for the answer.

    Args:
        result: Update check result (not read by the dialog itself).
        parent: Existing Tk widget to attach to; a hidden temporary root is
            created (and destroyed afterwards) when omitted.

    Returns:
        ``(choice, disable_auto_prompt)`` where *choice* is ``"now"``,
        ``"on_exit"`` or ``"later"``. Closing the window counts as
        ``"later"``.
    """
    owns = False
    root = parent
    if root is None:
        root = tk.Tk()
        root.withdraw()
        owns = True

    choice = {"value": "later"}
    disable_auto = {"value": False}

    dlg = tk.Toplevel(root)
    dlg.title("Update verfügbar")
    dlg.resizable(False, False)
    try:
        # A transient window inherits the state of its master; attaching to
        # a withdrawn (hidden) master would keep the dialog hidden too.
        if root.winfo_viewable():
            dlg.transient(root)
    except Exception:
        pass
    dlg.grab_set()
    try:
        dlg.attributes("-topmost", True)
        dlg.after(800, lambda: dlg.attributes("-topmost", False))
    except Exception:
        pass

    BG = "#FFFFFF"
    HDR = "#1A4D6D"
    TXT = "#1A3D55"
    SUB = "#607890"
    BTN = "#1A8ACC"

    # Bound to the dialog so it lives in the same Tk interpreter.
    auto_var = tk.BooleanVar(master=dlg, value=False)

    # Header
    hdr = tk.Frame(dlg, bg=HDR, padx=18, pady=12)
    hdr.pack(fill="x")
    tk.Label(
        hdr,
        text="Update verfügbar",
        bg=HDR,
        fg="#FFFFFF",
        font=("Segoe UI", 12, "bold"),
    ).pack(anchor="w")

    # Body
    body = tk.Frame(dlg, bg=BG, padx=18, pady=14)
    body.pack(fill="both", expand=True)
    tk.Label(
        body,
        text="Eine neue AzA-Version ist verfügbar.",
        bg=BG,
        fg=TXT,
        font=("Segoe UI", 10, "bold"),
    ).pack(anchor="w", pady=(0, 6))
    tk.Label(
        body,
        text="Möchten Sie sie jetzt installieren oder beim Beenden von AzA?",
        bg=BG,
        fg=TXT,
        font=("Segoe UI", 10),
        wraplength=380,
        justify="left",
    ).pack(anchor="w", pady=(0, 14))

    btn_row = tk.Frame(body, bg=BG)
    btn_row.pack(fill="x", pady=(0, 12))

    def _make_btn(parent, text, cmd, primary=False):
        """Create a flat button; *primary* selects the accent colours."""
        bg = BTN if primary else "#D4E7F5"
        fg = "#FFFFFF" if primary else "#1A4D6D"
        b = tk.Button(
            parent,
            text=text,
            command=cmd,
            bg=bg,
            fg=fg,
            activebackground=bg,
            activeforeground=fg,
            font=("Segoe UI", 9),
            bd=0,
            padx=12,
            pady=7,
            cursor="hand2",
            relief="flat",
        )
        return b

    def _finish(value: str) -> None:
        """Record the answer and the checkbox state, then close the dialog.

        The checkbox is read here, while the window still exists, rather
        than after the (possibly temporary) root has been destroyed.
        """
        choice["value"] = value
        try:
            disable_auto["value"] = bool(auto_var.get())
        except Exception:
            pass
        dlg.destroy()

    def on_now():
        _finish("now")

    def on_exit():
        _finish("on_exit")

    def on_later():
        _finish("later")

    _make_btn(btn_row, "Jetzt installieren", on_now, primary=True).pack(
        side="left", padx=(0, 8))
    _make_btn(btn_row, "Beim Beenden installieren", on_exit).pack(side="left", padx=(0, 8))
    _make_btn(btn_row, "Später", on_later).pack(side="left")

    # Checkbox
    sep = tk.Frame(body, bg="#E0EAF4", height=1)
    sep.pack(fill="x", pady=(0, 8))
    tk.Checkbutton(
        body,
        text="Nicht mehr automatisch nach Updates fragen",
        variable=auto_var,
        bg=BG,
        fg=SUB,
        activebackground=BG,
        selectcolor=BG,
        font=("Segoe UI", 8),
        anchor="w",
    ).pack(anchor="w")
    tk.Label(
        body,
        text="Sie können Updates jederzeit über Einstellungen → Updates suchen prüfen.",
        bg=BG,
        fg="#A0B8CC",
        font=("Segoe UI", 7),
        wraplength=380,
        justify="left",
    ).pack(anchor="w", pady=(2, 0))

    dlg.protocol("WM_DELETE_WINDOW", on_later)
    dlg.update_idletasks()
    try:
        # Centre the dialog on the screen.
        sw, sh = dlg.winfo_screenwidth(), dlg.winfo_screenheight()
        dlg.geometry(
            f"+{max(0,(sw - dlg.winfo_reqwidth()) // 2)}"
            f"+{max(0,(sh - dlg.winfo_reqheight()) // 2)}"
        )
    except Exception:
        pass

    if owns:
        dlg.wait_window()
        try:
            root.destroy()
        except Exception:
            pass
    else:
        root.wait_window(dlg)
    return choice["value"], disable_auto["value"]


def _show_update_dialog_native() -> str:
    """Show a native Windows Yes/No/Cancel message box for the update prompt.

    Returns:
        ``"now"`` (Yes), ``"on_exit"`` (No) or ``"later"`` (Cancel, or any
        non-Windows platform).
    """
    if sys.platform != "win32":
        return "later"
    import ctypes

    MB_YESNOCANCEL = 0x00000003
    MB_ICONINFORMATION = 0x00000040
    MB_TOPMOST = 0x00040000
    MB_SETFOREGROUND = 0x00010000
    IDYES = 6
    IDNO = 7
    IDCANCEL = 2
    text = (
        "Eine neue AzA-Version ist verfügbar.\n\n"
        "Ja = Jetzt installieren\n"
        "Nein = Beim Beenden installieren\n"
        "Abbrechen = Später"
    )
    ret = ctypes.windll.user32.MessageBoxW(  # type: ignore[attr-defined]
        0,
        text,
        "Update verfügbar",
        MB_YESNOCANCEL | MB_ICONINFORMATION | MB_TOPMOST | MB_SETFOREGROUND,
    )
    if int(ret) == IDYES:
        return "now"
    if int(ret) == IDNO:
        return "on_exit"
    return "later"


def _notify_native(title: str, message: str) -> None:
    """Show a native Windows information box; no-op on other platforms."""
    if sys.platform != "win32":
        return
    import ctypes

    MB_OK = 0
    MB_ICONINFORMATION = 0x40
    MB_TOPMOST = 0x40000
    ctypes.windll.user32.MessageBoxW(  # type: ignore[attr-defined]
        0, message, title, MB_OK | MB_ICONINFORMATION | MB_TOPMOST
    )


def _show_update_message(
    title: str,
    message: str,
    *,
    parent: tk.Misc | None,
    use_native_dialog: bool,
) -> None:
    """Show an information box natively, on *parent*, or on a temporary root."""
    if use_native_dialog:
        _notify_native(title, message)
    elif parent is not None:
        messagebox.showinfo(title, message, parent=parent)
    else:
        root = tk.Tk()
        root.withdraw()
        try:
            messagebox.showinfo(title, message, parent=root)
        finally:
            root.destroy()


def _remember_update_on_exit(result: dict[str, Any], *, fallback_manifest: str = "") -> str:
    """Persist the "install on exit" marker for *result*.

    The manifest URL comes from ``result["remote"]["_manifest_url"]``, then
    *fallback_manifest*, then ``AZA_UPDATE_MANIFEST_URL``.

    Returns:
        The version string that was stored.
    """
    remote = result.get("remote") or {}
    manifest_url = str(remote.get("_manifest_url") or "").strip()
    if not manifest_url:
        manifest_url = (
            fallback_manifest or os.environ.get("AZA_UPDATE_MANIFEST_URL") or ""
        ).strip()
    latest_version = str(result.get("latest_version") or "").strip()
    save_update_pending(manifest_url, latest_version)
    log_update_check("pending_on_exit", version=latest_version)
    return latest_version


def _install_update_now(
    result: dict[str, Any],
    *,
    parent: tk.Misc | None = None,
    use_native_dialog: bool = False,
) -> None:
    """Validate *result* and hand over to the external installer.

    On a successful launch the user is informed, *parent* is scheduled for
    destruction and the current process is terminated after a short delay.
    On failure only a message is shown.
    """
    ready, _reason = validate_update_install_ready(result)
    if not ready:
        msg = (
            "Das Update-Modul wird vorbereitet.\n"
            "Bitte starten Sie die Aktualisierung später erneut."
        )
        log_update_check("install_not_ready")
        _show_update_message("Update", msg, parent=parent, use_native_dialog=use_native_dialog)
        return

    ok, msg = launch_external_installer(result)
    log_update_check("install_launch", ok=ok, msg=str(msg)[:160])
    if not ok:
        err = f"Update konnte nicht gestartet werden.\n{msg}"
        _show_update_message("Update", err, parent=parent, use_native_dialog=use_native_dialog)
        return

    info = (
        "Update wird vorbereitet.\n"
        "AzA wird kurz geschlossen und automatisch neu gestartet."
    )
    try:
        if use_native_dialog:
            _notify_native("Update", info)
        elif parent is not None:
            parent.after(0, lambda: messagebox.showinfo("Update", info, parent=parent))
    except Exception:
        # The notification is best effort; the installer is already running.
        pass
    try:
        if parent is not None:
            parent.after(400, parent.destroy)
    except Exception:
        pass

    def _terminate() -> None:
        try:
            # Hard exit so the external updater can replace our files.
            os._exit(0)
        except Exception:
            pass

    threading.Timer(1.8, _terminate).start()


def _handle_update_prompt(
    result: dict[str, Any],
    *,
    parent: tk.Misc | None = None,
    use_native_dialog: bool = False,
) -> None:
    """Ask the user how to proceed and act on the answer (automatic prompt).

    Does nothing when the automatic prompt has been disabled. Depending on
    the answer the update is deferred, remembered for installation on exit,
    or installed immediately through the external installer.
    """
    if not get_auto_update_prompt_enabled():
        log_update_check("auto_prompt_skipped")
        return

    if use_native_dialog:
        choice = _show_update_dialog_native()
        disable_auto = False
    else:
        choice, disable_auto = _show_friendly_update_dialog(result, parent=parent)

    if disable_auto:
        set_auto_update_prompt_enabled(False)
        log_update_check("auto_prompt_disabled_by_user")

    if choice == "later":
        _log("startup update deferred by user")
        log_update_check("user_deferred")
        return

    if choice == "on_exit":
        _remember_update_on_exit(result)
        info = "Das Update wird beim Beenden von AzA automatisch installiert."
        if use_native_dialog:
            _notify_native("Update vorgemerkt", info)
        elif parent is not None:
            try:
                messagebox.showinfo("Update vorgemerkt", info, parent=parent)
            except Exception:
                pass
        return

    # choice == "now"
    _install_update_now(result, parent=parent, use_native_dialog=use_native_dialog)


def maybe_prompt_update_on_startup(
    *,
    parent: tk.Misc | None = None,
    use_native_dialog: bool = False,
) -> None:
    """Check silently at start-panel launch and prompt only if an update exists.

    Any status other than ``"update_available"`` (unreachable manifest,
    current version, channel mismatch, error, ...) shows nothing.
    """
    result = check_update_status(startup=True)
    status = result.get("status")
    _log(f"startup status={status} detail={result.get('detail')}")
    if status != "update_available":
        return
    _handle_update_prompt(result, parent=parent, use_native_dialog=use_native_dialog)


def run_startup_update_check_in_background(
    *,
    delay_seconds: float = 1.2,
    parent: tk.Misc | None = None,
    use_native_dialog: bool = False,
    schedule_on_main: Callable[[Callable[[], None]], None] | None = None,
) -> None:
    """Start the non-blocking startup check in a daemon thread.

    The worker only checks the manifest. The prompt is handed to
    *schedule_on_main* when given; otherwise it is invoked directly from the
    worker thread.

    Args:
        delay_seconds: Delay before the check starts (negative = no delay).
        parent: Tk parent for dialogs.
        use_native_dialog: Use the native Win32 dialog instead of Tk.
        schedule_on_main: Callable that runs the prompt on the main thread.
    """

    def worker() -> None:
        import time

        time.sleep(max(0.0, delay_seconds))
        try:
            result = check_update_status(startup=True)
            status = result.get("status")
            _log(f"startup status={status} detail={result.get('detail')}")
            if status != "update_available":
                return

            def prompt() -> None:
                _handle_update_prompt(
                    result,
                    parent=parent,
                    use_native_dialog=use_native_dialog,
                )

            if schedule_on_main is not None:
                schedule_on_main(prompt)
            else:
                prompt()
        except Exception as exc:
            _log(f"startup check skipped: {type(exc).__name__}")
            log_update_check("startup_error", error=type(exc).__name__)

    threading.Thread(
        target=worker,
        daemon=True,
        name="aza-startup-update",
    ).start()


def check_updates_interactive(*, parent: tk.Misc | None = None) -> None:
    """Manual update check (settings/controller) with user feedback.

    Unlike the automatic prompt, this path is not suppressed by the
    "do not ask automatically" setting, and the dialog is shown only once.
    """
    result = check_update_status(startup=False)
    status = result.get("status")

    if status == "update_available":
        choice, disable_auto = _show_friendly_update_dialog(result, parent=parent)
        if disable_auto:
            set_auto_update_prompt_enabled(False)
        if choice == "now":
            # Install directly: going through _handle_update_prompt would
            # ask again and would be skipped once auto-prompts are disabled.
            _install_update_now(result, parent=parent, use_native_dialog=False)
        elif choice == "on_exit":
            _remember_update_on_exit(result)
            messagebox.showinfo(
                "Update vorgemerkt",
                "Das Update wird beim Beenden von AzA installiert.",
                parent=parent,
            )
        return

    if status == "current":
        messagebox.showinfo(
            "Update",
            f"AZA ist aktuell ({format_version_label()}).",
            parent=parent,
        )
        return

    messagebox.showinfo(
        "Update",
        "Derzeit ist kein Update verfuegbar.",
        parent=parent,
    )


def show_app_info(*, parent: tk.Misc | None = None) -> None:
    """Show an info box with the locally installed version."""
    local = load_local_version()
    messagebox.showinfo(
        "AzA",
        f"Version: {format_version_label(local)}\n\nAzA Medwork",
        parent=parent,
    )


def _read_marker(install_dir: Path) -> str:
    """Return the stripped content of ``app_marker.txt`` or ``""`` if absent."""
    p = install_dir / "app_marker.txt"
    if p.is_file():
        return p.read_text(encoding="utf-8").strip()
    return ""


def _reset_test_install_dir(install_dir: Path) -> None:
    """Reset the test install dir to the "old version" fixture state.

    Removes ``_update_backups``, rewrites ``version.json`` (1.2.0) and the
    marker file, and creates dummy executables that do not exist yet.
    """
    install_dir.mkdir(parents=True, exist_ok=True)
    backups = install_dir / "_update_backups"
    if backups.is_dir():
        shutil.rmtree(backups, ignore_errors=True)
    (install_dir / "version.json").write_text(
        json.dumps(
            {
                "version": "1.2.0",
                "build": "TEST_OLD",
                "channel": "stable",
                "app": "AZA Desktop",
            },
            indent=2,
        )
        + "\n",
        encoding="utf-8",
    )
    (install_dir / "app_marker.txt").write_text("OLD_VERSION\n", encoding="utf-8")
    for name in (
        "aza_controller.exe",
        "aza_office.exe",
        "aza_praxis_chat.exe",
        "aza_updater.exe",
    ):
        p = install_dir / name
        if not p.is_file():
            p.write_bytes(b"OLD_DUMMY_EXE\n")


def _build_test_package(project_root: Path) -> tuple[Path, Path, str, int]:
    """Build the "new version" test package and zip it.

    Returns:
        ``(package_dir, zip_path, sha256_of_zip, zip_size_in_bytes)``.
    """
    import zipfile

    pkg_dir = project_root / "_updater_test_package"
    zip_path = project_root / "_updater_test_package.zip"
    if pkg_dir.is_dir():
        shutil.rmtree(pkg_dir)
    pkg_dir.mkdir(parents=True)
    (pkg_dir / "version.json").write_text(
        json.dumps(
            {
                "version": "9.9.9",
                "build": "TEST_LOCAL",
                "channel": "stable",
                "app": "AZA Desktop",
            },
            indent=2,
        )
        + "\n",
        encoding="utf-8",
    )
    (pkg_dir / "app_marker.txt").write_text("NEW_VERSION\n", encoding="utf-8")
    for name in (
        "aza_controller.exe",
        "aza_office.exe",
        "aza_praxis_chat.exe",
        "aza_updater.exe",
    ):
        (pkg_dir / name).write_bytes(b"NEW_DUMMY_EXE\n")

    if zip_path.is_file():
        zip_path.unlink()
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for fp in pkg_dir.rglob("*"):
            if fp.is_file():
                zf.write(fp, fp.relative_to(pkg_dir).as_posix())
    sha = compute_sha256(zip_path)
    return pkg_dir, zip_path, sha, zip_path.stat().st_size


def _write_test_manifest(
    project_root: Path,
    zip_path: Path,
    sha256: str,
    size_bytes: int,
    *,
    bad_sha: str | None = None,
    missing_sha: bool = False,
    missing_zip: bool = False,
) -> Path:
    """Write ``test_update_manifest.json`` for the local tests.

    Args:
        project_root: Directory that receives the manifest.
        zip_path: Package the manifest points to.
        sha256: Correct checksum of the package.
        size_bytes: Package size.
        bad_sha: Checksum to write instead of *sha256*.
        missing_sha: Omit the ``sha256`` field entirely.
        missing_zip: Point the URL at a non-existent file.

    Returns:
        Path of the written manifest.
    """
    from datetime import date

    manifest_path = project_root / "test_update_manifest.json"
    url = str(zip_path.resolve()) if not missing_zip else str(
        (project_root / "_missing_update.zip").resolve()
    )
    entry = {
        "name": zip_path.name,
        "url": url,
        "size_bytes": size_bytes,
    }
    if not missing_sha:
        entry["sha256"] = bad_sha or sha256
    payload = {
        "product": "AZA Desktop",
        "channel": "stable",
        "latest_version": "9.9.9",
        "latest_build": "TEST_LOCAL",
        "min_supported_version": "1.0.0",
        "mandatory": False,
        "release_date": date.today().isoformat(),
        "notes_de": ["Lokaler Updater-Test"],
        "files": [entry],
    }
    manifest_path.write_text(
        json.dumps(payload, indent=2, ensure_ascii=False) + "\n",
        encoding="utf-8",
    )
    return manifest_path


def _read_project_version(project_root: Path) -> Any:
    """Return ``version`` from ``<project_root>/version.json`` or ``None``."""
    try:
        data = json.loads((project_root / "version.json").read_text(encoding="utf-8-sig"))
    except (OSError, ValueError):
        return None
    return data.get("version") if isinstance(data, dict) else None


def run_local_e2e_tests(project_root: Path | None = None) -> dict[str, Any]:
    """Run the automated local end-to-end test (no GUI) and return a report.

    All installs target ``<root>/_updater_test_install``. The project root's
    own ``version.json`` is snapshotted *before* the run so that
    ``project_root_unchanged`` really detects accidental writes.
    """
    root = (project_root or get_project_root()).resolve()
    install_dir = root / "_updater_test_install"

    # Snapshot first; comparing two reads taken after the run would always match.
    project_version_before = _read_project_version(root)

    configure_test_install_dir(install_dir)
    try:
        _reset_test_install_dir(install_dir)
        _, zip_path, sha256, size_bytes = _build_test_package(root)
        manifest_path = _write_test_manifest(root, zip_path, sha256, size_bytes)

        report: dict[str, Any] = {
            "install_dir": str(install_dir),
            "zip_path": str(zip_path),
            "manifest_path": str(manifest_path),
            "sha256": sha256,
        }

        # 1) Regular update followed by a rollback.
        result = check_update_from_manifest(manifest_path)
        report["update_detected"] = result.get("status") == "update_available"
        ok, msg = perform_confirmed_update(result)
        report["update_applied"] = ok
        report["update_message"] = msg
        report["marker_after_update"] = _read_marker(install_dir)
        report["version_after_update"] = load_local_version().get("version")

        backups = list_available_backups()
        report["backup_created"] = bool(backups)
        if backups:
            rb_ok, rb_msg = rollback_from_backup(backups[0], install_dir)
            report["rollback_tested"] = rb_ok
            report["rollback_message"] = rb_msg
            report["marker_after_rollback"] = _read_marker(install_dir)
            report["version_after_rollback"] = load_local_version().get("version")

        # 2) Wrong checksum must not install.
        _reset_test_install_dir(install_dir)
        bad_manifest = _write_test_manifest(
            root, zip_path, sha256, size_bytes, bad_sha="0" * 64
        )
        bad_result = check_update_from_manifest(bad_manifest)
        bad_ok, _ = perform_confirmed_update(bad_result)
        report["bad_sha_blocked"] = (not bad_ok) and _read_marker(install_dir) == "OLD_VERSION"

        # 3) Missing checksum must be rejected by the validator.
        _reset_test_install_dir(install_dir)
        no_sha_manifest = _write_test_manifest(
            root, zip_path, sha256, size_bytes, missing_sha=True
        )
        no_sha_result = check_update_from_manifest(no_sha_manifest)
        no_sha_ready, _ = validate_update_install_ready(no_sha_result)
        report["missing_sha_blocked"] = not no_sha_ready

        # 4) Missing package must not install.
        _reset_test_install_dir(install_dir)
        missing_zip_manifest = _write_test_manifest(
            root, zip_path, sha256, size_bytes, missing_zip=True
        )
        missing_result = check_update_from_manifest(missing_zip_manifest)
        missing_ok, _ = perform_confirmed_update(missing_result)
        report["missing_zip_blocked"] = (
            (not missing_ok) and _read_marker(install_dir) == "OLD_VERSION"
        )

        # 5) Only checking ("later") must leave the installation untouched.
        _reset_test_install_dir(install_dir)
        later_result = check_update_from_manifest(manifest_path)
        report["later_unchanged"] = (
            later_result.get("status") == "update_available"
            and _read_marker(install_dir) == "OLD_VERSION"
            and load_local_version().get("version") == "1.2.0"
        )

        report["project_root_version"] = project_version_before
        report["sha256_verified"] = report.get("update_applied", False)
        report["project_root_unchanged"] = (
            _read_project_version(root) == project_version_before
        )
    finally:
        # Always leave test mode, even if a step raised.
        configure_test_install_dir(None)

    # Leave a valid manifest behind for manual follow-up tests.
    _write_test_manifest(root, zip_path, sha256, size_bytes)
    return report


def _parse_cli() -> argparse.Namespace:
    """Parse the updater's command-line arguments."""
    import argparse

    p = argparse.ArgumentParser(description="AZA Desktop Updater")
    p.add_argument(
        "--e2e-local-test",
        action="store_true",
        help="Lokalen End-to-End-Test ausfuehren (ohne GUI)",
    )
    p.add_argument(
        "--install-dir",
        help="Test-Installationsordner (setzt AZA_UPDATE_TEST_INSTALL_DIR)",
    )
    p.add_argument(
        "--manifest",
        help="Manifest-URL oder lokaler Pfad (ueberschreibt Standard-Quellen)",
    )
    p.add_argument(
        "--check-only",
        action="store_true",
        help="Nur pruefen und Ergebnis als JSON ausgeben (kein Dialog)",
    )
    p.add_argument(
        "--install-now",
        action="store_true",
        help="Externer Installations-Modus mit Statusfenster",
    )
    p.add_argument(
        "--target-dir",
        help="Installationsordner fuer --install-now",
    )
    p.add_argument(
        "--restart-exe",
        default="aza_start_panel.exe",
        help="Nach erfolgreichem Update zu startende EXE",
    )
    p.add_argument(
        "--yes",
        action="store_true",
        help="Update ohne zusaetzlichen Dialog installieren (nur Testmodus)",
    )
    return p.parse_args()


def main() -> int:
    """CLI entry point; returns the process exit code."""
    args = _parse_cli()
    _apply_cli_env(args.install_dir)

    if args.install_now:
        manifest = (args.manifest or os.environ.get("AZA_UPDATE_MANIFEST_URL") or "").strip()
        return _run_install_now_window(
            manifest=manifest,
            target_dir=args.target_dir,
            restart_exe=args.restart_exe,
        )

    if args.e2e_local_test:
        report = run_local_e2e_tests()
        print(json.dumps(report, indent=2, ensure_ascii=False))
        ok = all(
            report.get(k)
            for k in (
                "update_detected",
                "update_applied",
                "backup_created",
                "rollback_tested",
                "bad_sha_blocked",
                "missing_sha_blocked",
                "missing_zip_blocked",
                "later_unchanged",
                "project_root_unchanged",
            )
        ) and report.get("marker_after_update") == "NEW_VERSION" and report.get(
            "marker_after_rollback"
        ) == "OLD_VERSION"
        return 0 if ok else 1

    manifest = (args.manifest or os.environ.get("AZA_UPDATE_MANIFEST_URL") or "").strip() or None

    if manifest and args.check_only:
        result = check_update_from_manifest(manifest)
        local = result.get("local") or load_local_version()
        log_update_check(
            "cli_check",
            status=result.get("status"),
            manifest_url=manifest,
            local_version=local.get("version"),
        )
        print(json.dumps(result, indent=2, ensure_ascii=False, default=str))
        return 0

    if manifest and args.yes:
        result = check_update_from_manifest(manifest)
        ok, msg = perform_confirmed_update(result)
        print(msg)
        return 0 if ok else 1

    if manifest:
        result = check_update_from_manifest(manifest)
        status = result.get("status")
        owns = False
        parent: tk.Misc | None = None
        if not tk._default_root:  # type: ignore[attr-defined]
            parent = tk.Tk()
            parent.withdraw()
            owns = True
        if status == "update_available":
            # The dialog returns (choice, disable_auto); the tuple itself is
            # always truthy, so the choice must be inspected explicitly.
            choice, disable_auto = _show_friendly_update_dialog(result, parent=parent)
            if disable_auto:
                set_auto_update_prompt_enabled(False)
            if choice == "now":
                ok, msg = perform_confirmed_update(result, parent=parent)
                messagebox.showinfo("Update", msg, parent=parent)
            elif choice == "on_exit":
                _remember_update_on_exit(result, fallback_manifest=manifest)
                messagebox.showinfo(
                    "Update vorgemerkt",
                    "Das Update wird beim Beenden von AzA installiert.",
                    parent=parent,
                )
        elif status == "current":
            messagebox.showinfo(
                "Update",
                f"AZA ist aktuell ({format_version_label()}).",
                parent=parent,
            )
        else:
            messagebox.showinfo(
                "Update",
                f"Status: {status}\n{result.get('message', '')}",
                parent=parent,
            )
        if owns and parent is not None:
            parent.destroy()
        return 0

    owns = False
    parent = None
    if not tk._default_root:  # type: ignore[attr-defined]
        parent = tk.Tk()
        parent.withdraw()
        owns = True
    check_updates_interactive(parent=parent)
    if owns and parent is not None:
        parent.destroy()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())