1416 lines
45 KiB
Python
1416 lines
45 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
AZA Desktop Updater — Startprompt + sichere Installation.
|
|
|
|
Beim Startpanel-Start: freundliche Nachfrage bei neuer Version.
|
|
Kein sichtbarer Update-Button, kein stilles Auto-Update.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import tkinter as tk
|
|
from pathlib import Path
|
|
from tkinter import messagebox, ttk
|
|
from typing import Any, Callable
|
|
|
|
from aza_update_core import (
|
|
check_update_for_startup,
|
|
check_update_from_manifest,
|
|
configure_test_install_dir,
|
|
create_pre_update_backup,
|
|
compute_sha256,
|
|
download_update_package,
|
|
evaluate_update,
|
|
extract_update_zip,
|
|
fetch_remote_manifest,
|
|
fetch_startup_manifest,
|
|
format_version_label,
|
|
get_install_dir,
|
|
get_project_root,
|
|
get_test_install_dir,
|
|
list_available_backups,
|
|
load_local_version,
|
|
load_manifest_from_source,
|
|
log_update_check,
|
|
resolve_manifest_sources,
|
|
rollback_from_backup,
|
|
save_local_version,
|
|
validate_update_install_ready,
|
|
get_auto_update_prompt_enabled,
|
|
set_auto_update_prompt_enabled,
|
|
save_update_pending,
|
|
load_update_pending,
|
|
clear_update_pending,
|
|
)
|
|
|
|
|
|
def _log(msg: str) -> None:
    """Print *msg* with the updater prefix and forward it to the update log."""
    console_line = f"[AZA Updater] {msg}"
    print(console_line)
    # The persistent log receives the raw message under the "updater" event.
    log_update_check("updater", message=msg)
|
|
|
|
|
|
def _apply_cli_env(install_dir: str | None = None) -> None:
|
|
if install_dir:
|
|
configure_test_install_dir(install_dir)
|
|
return
|
|
env_dir = (os.environ.get("AZA_UPDATE_TEST_INSTALL_DIR") or "").strip()
|
|
if env_dir:
|
|
configure_test_install_dir(env_dir)
|
|
|
|
|
|
def check_update_status(
    *,
    startup: bool = False,
    manifest: str | None = None,
) -> dict[str, Any]:
    """Check the update manifest and return the evaluation result.

    An explicit *manifest* (CLI/env use) takes precedence over everything
    else. With ``startup=True`` the start-panel check is used, optionally
    pointed at ``AZA_UPDATE_MANIFEST_URL``. Otherwise the manifest is fetched
    from the resolved sources and compared with the local version.
    """
    if manifest:
        return check_update_from_manifest(manifest)

    if startup:
        from_env = (os.environ.get("AZA_UPDATE_MANIFEST_URL") or "").strip()
        return check_update_for_startup(manifest=from_env or None)

    local_info = load_local_version()
    remote_info, fetch_error = fetch_startup_manifest(sources=resolve_manifest_sources())
    if fetch_error or not remote_info:
        failure: dict[str, Any] = {
            "status": "error",
            "message": "Manifest nicht erreichbar.",
            "detail": fetch_error,
            "local": local_info,
        }
        return failure

    evaluation = evaluate_update(local_info, remote_info)
    evaluation["detail"] = fetch_error
    return evaluation
|
|
|
|
|
|
def _stop_aza_processes(*, include_panel: bool = False) -> None:
|
|
if sys.platform != "win32":
|
|
return
|
|
names = [
|
|
"aza_desktop.exe",
|
|
"aza_controller.exe",
|
|
"AZA_EmpfangShell.exe",
|
|
"aza_office.exe",
|
|
]
|
|
if include_panel:
|
|
names.append("aza_start_panel.exe")
|
|
_NO_WINDOW = 0x08000000
|
|
for name in names:
|
|
try:
|
|
subprocess.run(
|
|
["taskkill", "/F", "/IM", name],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=15,
|
|
creationflags=_NO_WINDOW,
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def _is_dir_writable(path: Path) -> bool:
|
|
try:
|
|
path.mkdir(parents=True, exist_ok=True)
|
|
probe = path / f".aza_write_test_{os.getpid()}"
|
|
probe.write_text("x", encoding="utf-8")
|
|
try:
|
|
probe.unlink()
|
|
except OSError:
|
|
pass
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def _resolve_installed_updater_exe(target_install: Path) -> Path | None:
|
|
cand = target_install / "aza_updater.exe"
|
|
if cand.is_file():
|
|
return cand
|
|
if getattr(sys, "frozen", False):
|
|
here = Path(sys.executable).resolve().parent
|
|
cand2 = here / "aza_updater.exe"
|
|
if cand2.is_file():
|
|
return cand2
|
|
return None
|
|
|
|
|
|
def _copy_updater_to_temp(src_exe: Path) -> Path | None:
|
|
import tempfile
|
|
|
|
temp_root = Path(tempfile.gettempdir()) / "AzA_Updater_Run"
|
|
try:
|
|
temp_root.mkdir(parents=True, exist_ok=True)
|
|
except Exception as exc:
|
|
_log(f"temp_root mkdir failed: {type(exc).__name__}")
|
|
return None
|
|
dst = temp_root / "aza_updater.exe"
|
|
try:
|
|
if dst.is_file():
|
|
try:
|
|
dst.unlink()
|
|
except Exception:
|
|
pass
|
|
shutil.copy2(src_exe, dst)
|
|
return dst
|
|
except Exception as exc:
|
|
_log(f"copy_updater_to_temp failed: {type(exc).__name__}")
|
|
return None
|
|
|
|
|
|
def launch_external_installer(
    result: dict[str, Any],
    *,
    install_dir: Path | None = None,
    manifest_url: str | None = None,
    restart_exe: str = "aza_start_panel.exe",
) -> tuple[bool, str]:
    """Start ``aza_updater.exe`` as a detached external process.

    The updater is copied to the temp directory first and launched from
    there with ``--install-now``, so it does not run from inside the
    directory it is going to modify. On Windows, when the install directory
    is not writable, the process is started through the ``runas`` verb
    (UAC prompt).

    Args:
        result: Update check result; ``result["remote"]["_manifest_url"]``
            is used when *manifest_url* is not given.
        install_dir: Target installation; defaults to ``get_install_dir()``.
        manifest_url: Manifest to pass to the updater. Falls back to the
            check result and then to ``AZA_UPDATE_MANIFEST_URL``.
        restart_exe: Executable name passed as ``--restart-exe``; an empty
            value omits the option.

    Returns:
        ``(True, "ok")`` when the process was started, otherwise
        ``(False, <user-facing reason>)``.
    """
    target_install = (install_dir or get_install_dir()).resolve()
    src_updater = _resolve_installed_updater_exe(target_install)
    if src_updater is None:
        log_update_check("ext_launch_no_updater", target=str(target_install))
        return False, "aza_updater.exe nicht gefunden."

    temp_updater = _copy_updater_to_temp(src_updater)
    if temp_updater is None:
        log_update_check("ext_launch_copy_failed")
        return False, "Updater konnte nicht in TEMP kopiert werden."

    if not manifest_url:
        remote = result.get("remote") or {}
        manifest_url = str(remote.get("_manifest_url") or "").strip()
    if not manifest_url:
        manifest_url = (os.environ.get("AZA_UPDATE_MANIFEST_URL") or "").strip()

    args = ["--install-now"]
    if manifest_url:
        args += ["--manifest", manifest_url]
    args += ["--target-dir", str(target_install)]
    if restart_exe:
        args += ["--restart-exe", restart_exe]

    on_windows = sys.platform == "win32"
    # Short-circuit keeps the writability probe off non-Windows platforms.
    use_runas = on_windows and (not _is_dir_writable(target_install))
    log_update_check(
        "ext_launch",
        updater=str(temp_updater),
        target=str(target_install),
        manifest=manifest_url or "",
        runas=use_runas,
    )
    try:
        if use_runas:
            import ctypes

            # ShellExecuteW takes one parameter string. list2cmdline applies
            # the MS C runtime quoting rules (spaces, tabs, embedded quotes,
            # trailing backslashes), which naive '"%s"' wrapping does not.
            params = subprocess.list2cmdline(args)
            SW_HIDE = 0
            ret = ctypes.windll.shell32.ShellExecuteW(  # type: ignore[attr-defined]
                None, "runas", str(temp_updater), params, None, SW_HIDE
            )
            # ShellExecuteW signals failure with a value of 32 or below.
            if int(ret) <= 32:
                log_update_check("ext_launch_uac_failed", code=int(ret))
                return False, f"UAC-Start fehlgeschlagen (code={int(ret)})"
        else:
            # Popen rejects non-zero creationflags outside Windows, so the
            # flags are only assembled there.
            creationflags = 0
            if on_windows:
                CREATE_NO_WINDOW = 0x08000000
                creationflags = (
                    CREATE_NO_WINDOW
                    | getattr(subprocess, "CREATE_NEW_PROCESS_GROUP", 0)
                    | getattr(subprocess, "DETACHED_PROCESS", 0)
                )
            subprocess.Popen(
                [str(temp_updater)] + args,
                cwd=str(temp_updater.parent),
                close_fds=True,
                creationflags=creationflags,
            )
    except Exception as exc:
        log_update_check("ext_launch_exception", error=f"{type(exc).__name__}: {exc}"[:160])
        return False, f"Updater konnte nicht gestartet werden: {exc}"

    return True, "ok"
|
|
|
|
|
|
def _apply_downloaded_update(package: Path, result: dict[str, Any]) -> tuple[bool, str]:
    """Install a downloaded update package into the install directory.

    Steps: reject unsupported package formats, create a pre-update backup,
    extract (``.zip``) or copy (``.exe``) the package, roll back from the
    backup on failure, and finally persist the new version information.

    Args:
        package: Path of the downloaded ``.zip`` or ``.exe`` package.
        result: Update check result; supplies ``latest_version``,
            ``latest_build`` and the local channel.

    Returns:
        ``(ok, user-facing message)``.
    """
    install_dir = get_install_dir()
    log_update_check("apply_start", install_dir=str(install_dir), package=str(package))

    # Validate the format before touching the installation: an unsupported
    # package must not cost a full backup that is never used.
    suffix = package.suffix.lower()
    if suffix not in (".zip", ".exe"):
        log_update_check("apply_unknown_format", suffix=suffix)
        return False, "Unbekanntes Update-Format."

    try:
        backup_dir = create_pre_update_backup(install_dir)
        _log(f"Backup erstellt: {backup_dir}")
        log_update_check("apply_backup", backup=str(backup_dir))
    except Exception as exc:
        log_update_check("apply_backup_failed", error=f"{type(exc).__name__}: {exc}"[:160])
        return False, "Backup konnte nicht erstellt werden."

    if suffix == ".zip":
        ok, msg = extract_update_zip(package, install_dir)
        log_update_check("apply_extract", ok=ok, msg=str(msg)[:120])
        if not ok:
            rollback_from_backup(backup_dir, install_dir)
            log_update_check("apply_rollback_done")
            return False, f"Installation fehlgeschlagen.\nRollback durchgefuehrt.\nDetail: {msg}"
    else:  # ".exe": a single executable is copied next to the others
        target = install_dir / package.name
        try:
            shutil.copy2(package, target)
            log_update_check("apply_copy_exe", target=str(target))
        except Exception as exc:
            log_update_check("apply_copy_failed", error=f"{type(exc).__name__}: {exc}"[:160])
            rollback_from_backup(backup_dir, install_dir)
            return False, f"Installation fehlgeschlagen.\nRollback durchgefuehrt.\nDetail: {exc}"

    new_version = {
        "version": result.get("latest_version"),
        "build": result.get("latest_build"),
        "channel": (result.get("local") or {}).get("channel", "stable"),
    }
    try:
        # Only non-empty fields are written.
        save_local_version({k: v for k, v in new_version.items() if v})
        log_update_check("apply_save_version", version=new_version.get("version"))
    except Exception as exc:
        # The files are already installed; a failed version write is logged
        # but does not fail the update.
        _log(f"version.json write failed: {exc}")
        log_update_check("apply_savever_failed", error=f"{type(exc).__name__}: {exc}"[:160])

    return True, "Das Update wurde installiert.\nBitte starten Sie AZA neu."
|
|
|
|
|
|
def perform_confirmed_update(
    result: dict[str, Any],
    *,
    parent: tk.Misc | None = None,
) -> tuple[bool, str]:
    """Download the update package, stop AZA processes and install it.

    Runs only when ``validate_update_install_ready`` accepts the manifest
    result and the result lists at least one file.

    Args:
        result: Update check result (``files`` holds the package entries;
            the first entry is installed).
        parent: Accepted for interface compatibility; not used here.

    Returns:
        ``(ok, user-facing message)``.
    """
    not_available = (
        "Das Update ist noch nicht vollstaendig verfuegbar.\n"
        "Bitte versuchen Sie es spaeter erneut."
    )

    ready, reason = validate_update_install_ready(result)
    if not ready:
        _log(f"Update nicht installierbar: {reason}")
        return False, not_available

    files = result.get("files") or []
    if not files:
        # Same guard as the install-now worker: never index an empty list.
        _log("Update nicht installierbar: keine Dateien im Manifest")
        return False, not_available
    file_info = files[0]
    _log("Download starten")

    package, err = download_update_package(file_info)
    if not package:
        _log(f"Download fehlgeschlagen: {err}")
        return False, (
            "Das Update konnte nicht heruntergeladen werden.\n"
            "Bitte versuchen Sie es spaeter erneut."
        )

    _stop_aza_processes()
    return _apply_downloaded_update(package, result)
|
|
|
|
|
|
def _run_install_now_window(
    *,
    manifest: str,
    target_dir: str | None,
    restart_exe: str | None,
) -> int:
    """Show the external update window and run download + installation.

    The work happens in a daemon thread; the Tk main loop polls shared state
    for progress and completion. On Windows the process first tries to
    re-launch itself elevated when the install directory is not writable.

    Args:
        manifest: Manifest source to check; when empty the regular
            (non-startup) check is used.
        target_dir: Optional install directory override.
        restart_exe: Executable (relative to the install directory) started
            after a successful installation; falsy disables the restart.

    Returns:
        0 on success (or after handing over to an elevated instance),
        1 otherwise.
    """
    import time

    if target_dir:
        configure_test_install_dir(target_dir)

    # Self-elevation: start this executable again through the "runas" verb
    # and leave. If UAC is denied or fails, continue unelevated.
    if sys.platform == "win32" and not _is_dir_writable(get_install_dir()):
        try:
            import ctypes

            exe = sys.executable
            # list2cmdline applies the MS C runtime quoting rules (spaces,
            # tabs, embedded quotes, trailing backslashes); wrapping in
            # plain quotes breaks on e.g. a directory ending in a backslash.
            params = subprocess.list2cmdline(sys.argv[1:])
            SW_HIDE = 0
            ret = ctypes.windll.shell32.ShellExecuteW(  # type: ignore[attr-defined]
                None, "runas", exe, params, None, SW_HIDE
            )
            # ShellExecuteW reports success with a value greater than 32.
            if int(ret) > 32:
                log_update_check("install_now_self_elevate")
                return 0
            log_update_check("install_now_uac_denied", code=int(ret))
        except Exception as exc:
            log_update_check(
                "install_now_elevate_exc",
                error=f"{type(exc).__name__}: {exc}"[:120],
            )

    log_update_check(
        "install_now_start",
        manifest=manifest or "",
        target=target_dir or "",
        restart=restart_exe or "",
    )

    # Colours and progress-bar geometry of the window.
    BG = "#FFFFFF"
    TRACK = "#E1F6FC"
    FILL = "#1A8ACC"
    TEXT_STRONG = "#1A4D6D"
    SUBTLE = "#8899AA"
    BTN_BG = "#E8EFF5"
    PB_WIDTH = 464
    PB_HEIGHT = 10

    root = tk.Tk()
    root.title("AzA Update")
    root.resizable(False, False)
    try:
        # Start fully transparent; made opaque once the layout is built.
        root.attributes("-alpha", 0.0)
    except Exception:
        pass
    root.configure(bg=BG)

    def _find_logo_candidates(ext: str) -> list[Path]:
        """Return candidate paths for ``logo.<ext>`` in several directories."""
        cands: list[Path] = []
        if getattr(sys, "frozen", False):
            cands.append(Path(sys.executable).parent / f"logo.{ext}")
        cands.append(Path(__file__).resolve().parent / f"logo.{ext}")
        try:
            install = get_install_dir()
            cands.append(install / f"logo.{ext}")
            cands.append(install / "assets" / f"logo.{ext}")
        except Exception:
            pass
        if target_dir:
            td = Path(target_dir)
            cands.append(td / f"logo.{ext}")
            cands.append(td / "assets" / f"logo.{ext}")
        return cands

    try:
        for ico in _find_logo_candidates("ico"):
            if ico.is_file():
                root.iconbitmap(default=str(ico))
                break
    except Exception:
        pass

    W, H = 520, 232
    try:
        # Centre horizontally, slightly above the vertical centre.
        sw, sh = root.winfo_screenwidth(), root.winfo_screenheight()
        x = max(0, (sw - W) // 2)
        y = max(0, (sh - H) // 2 - 60)
        root.geometry(f"{W}x{H}+{x}+{y}")
    except Exception:
        pass

    container = tk.Frame(root, bg=BG, padx=28, pady=22)
    container.pack(fill="both", expand=True)

    header = tk.Frame(container, bg=BG)
    header.pack(fill="x", pady=(0, 14))

    try:
        for logo_path in _find_logo_candidates("png"):
            if logo_path.is_file():
                logo_tk = tk.PhotoImage(file=str(logo_path))
                # Integer subsampling so the logo ends up roughly 40 px wide.
                ratio = max(1, logo_tk.width() // 40)
                if ratio > 1:
                    logo_tk = logo_tk.subsample(ratio, ratio)
                _logo_lbl = tk.Label(header, image=logo_tk, bg=BG)
                # Keep a reference so the image is not garbage-collected.
                _logo_lbl.image = logo_tk  # type: ignore[attr-defined]
                _logo_lbl.pack(side="left", padx=(0, 14))
                break
    except Exception:
        pass

    title_col = tk.Frame(header, bg=BG)
    title_col.pack(side="left", fill="x", expand=True)
    tk.Label(
        title_col,
        text="AzA wird aktualisiert",
        bg=BG,
        fg=TEXT_STRONG,
        font=("Segoe UI", 14, "bold"),
        anchor="w",
    ).pack(anchor="w")
    tk.Label(
        title_col,
        text="Bitte einen Moment Geduld.",
        bg=BG,
        fg=SUBTLE,
        font=("Segoe UI", 9),
        anchor="w",
    ).pack(anchor="w", pady=(2, 0))

    status_var = tk.StringVar(value="Update wird vorbereitet …")
    tk.Label(
        container,
        textvariable=status_var,
        bg=BG,
        fg=TEXT_STRONG,
        font=("Segoe UI", 10),
        anchor="w",
        wraplength=PB_WIDTH,
        justify="left",
    ).pack(fill="x", pady=(0, 8))

    # Hand-drawn progress bar: a track rectangle plus a fill rectangle whose
    # coordinates are updated.
    pb_canvas = tk.Canvas(
        container,
        width=PB_WIDTH,
        height=PB_HEIGHT,
        bg=BG,
        highlightthickness=0,
        bd=0,
    )
    pb_canvas.pack(fill="x", pady=(0, 6))
    pb_canvas.create_rectangle(0, 0, PB_WIDTH, PB_HEIGHT, fill=TRACK, outline="")
    fill_rect = pb_canvas.create_rectangle(0, 0, 0, PB_HEIGHT, fill=FILL, outline="")

    pct_var = tk.StringVar(value="")
    tk.Label(
        container,
        textvariable=pct_var,
        bg=BG,
        fg=SUBTLE,
        font=("Segoe UI", 9),
        anchor="w",
    ).pack(anchor="w", pady=(0, 14))

    btn_row = tk.Frame(container, bg=BG)
    btn_row.pack(fill="x")
    close_btn = tk.Button(
        btn_row,
        text="Schließen",
        state="disabled",
        command=root.destroy,
        bg=BTN_BG,
        fg=TEXT_STRONG,
        activebackground="#D8E3EE",
        activeforeground=TEXT_STRONG,
        font=("Segoe UI", 9),
        bd=0,
        padx=18,
        pady=6,
        cursor="hand2",
        relief="flat",
    )
    close_btn.pack(side="right")

    anim_state = {"mode": "indeterminate", "x": -130, "after_id": None}

    def _anim_step() -> None:
        """Advance the indeterminate bar by one frame and reschedule."""
        if anim_state["mode"] != "indeterminate":
            return
        seg = 140
        anim_state["x"] += 6
        if anim_state["x"] > PB_WIDTH:
            anim_state["x"] = -seg
        x0 = max(0, anim_state["x"])
        x1 = min(PB_WIDTH, anim_state["x"] + seg)
        pb_canvas.coords(fill_rect, x0, 0, x1, PB_HEIGHT)
        anim_state["after_id"] = root.after(28, _anim_step)

    def set_indeterminate() -> None:
        """Start the indeterminate animation unless it is already running."""
        if anim_state["after_id"] is not None:
            return
        anim_state["mode"] = "indeterminate"
        anim_state["x"] = -140
        _anim_step()

    def stop_anim() -> None:
        """Cancel the pending animation callback, if any."""
        if anim_state["after_id"] is not None:
            try:
                root.after_cancel(anim_state["after_id"])
            except Exception:
                pass
            anim_state["after_id"] = None

    def set_determinate(pct: float) -> None:
        """Show a fixed fill of *pct* percent (clamped to 0..100)."""
        stop_anim()
        anim_state["mode"] = "determinate"
        pct = max(0.0, min(100.0, pct))
        w = int(PB_WIDTH * pct / 100.0)
        pb_canvas.coords(fill_rect, 0, 0, w, PB_HEIGHT)

    def set_status(text: str) -> None:
        """Set the status line; errors are ignored."""
        try:
            status_var.set(text)
        except Exception:
            pass

    set_indeterminate()
    root.update_idletasks()
    try:
        root.attributes("-alpha", 1.0)
    except Exception:
        pass
    try:
        # Briefly force the window to the front, then release topmost.
        root.attributes("-topmost", True)
        root.after(800, lambda: root.attributes("-topmost", False))
    except Exception:
        pass

    # Shared between the worker thread (writer) and the Tk callbacks (reader).
    state: dict[str, Any] = {
        "ok": False,
        "message": "",
        "done": False,
        "progress": None,
    }

    def schedule_progress(done: int, total: int | None) -> None:
        """Download callback: store the latest progress for the UI to pick up."""
        state["progress"] = (done, total)

    def pump_progress() -> None:
        """Apply the latest stored progress and reschedule until the worker is done."""
        p = state["progress"]
        if p is not None:
            done, total = p
            state["progress"] = None
            if total and total > 0:
                pct = 100.0 * done / total
                set_determinate(pct)
                pct_var.set(f"Herunterladen: {int(pct)} %")
            else:
                # Unknown total size: show the downloaded amount instead.
                mb = done // (1024 * 1024)
                pct_var.set(f"Heruntergeladen: {mb} MB")
        if not state["done"]:
            root.after(120, pump_progress)

    def worker() -> None:
        """Background job: check manifest, download, stop AZA, install."""
        try:
            time.sleep(0.4)
            root.after(0, lambda: set_status("Update wird vorbereitet …"))

            if manifest:
                result = check_update_from_manifest(manifest)
            else:
                result = check_update_status(startup=False)
            status = result.get("status")
            log_update_check("install_now_manifest", status=status)
            if status != "update_available":
                state["message"] = "Aktuell ist kein Update verfuegbar."
                return

            files = result.get("files") or []
            if not files:
                state["message"] = "Das Update-Paket ist noch nicht verfuegbar."
                return
            file_info = files[0]

            root.after(0, lambda: (
                set_status("Update wird heruntergeladen …"),
                pct_var.set("Herunterladen: 0 %"),
                set_determinate(0.0),
            ))
            log_update_check("install_now_download_start", url=file_info.get("url", ""))
            package, dl_msg = download_update_package(file_info, progress=schedule_progress)
            log_update_check("install_now_download", ok=bool(package), msg=str(dl_msg)[:120])
            if not package:
                state["message"] = "Das Update konnte nicht heruntergeladen werden."
                return

            root.after(0, lambda: (
                set_determinate(100.0),
                pct_var.set(""),
                set_status(
                    "AzA schließt sich kurz, um das Update fertigzustellen …"
                ),
            ))
            # Pauses around the process kill: let the user read the message,
            # then wait after taskkill before files are replaced.
            time.sleep(1.8)
            _stop_aza_processes(include_panel=True)
            time.sleep(1.4)

            root.after(0, lambda: (
                set_indeterminate(),
                set_status("Update wird installiert …"),
            ))
            ok, msg = _apply_downloaded_update(package, result)
            log_update_check("install_now_apply", ok=ok, msg=str(msg)[:120])
            state["ok"] = ok
            state["message"] = msg
        except Exception as exc:
            log_update_check(
                "install_now_exception",
                error=f"{type(exc).__name__}: {exc}"[:200],
            )
            state["message"] = "Update konnte nicht abgeschlossen werden."
        finally:
            state["done"] = True

    def finalize() -> None:
        """Poll for worker completion, then show the outcome and restart AZA."""
        if not state["done"]:
            root.after(200, finalize)
            return
        stop_anim()
        if state["ok"]:
            set_determinate(100.0)
            pct_var.set("")
            set_status("AzA wird neu gestartet …")
            log_update_check("install_now_result", ok=True)
            if restart_exe:
                install_dir = get_install_dir()
                exe = install_dir / restart_exe
                if exe.is_file():
                    try:
                        subprocess.Popen(
                            [str(exe)],
                            cwd=str(install_dir),
                            close_fds=True,
                            # CREATE_NO_WINDOW; Popen rejects non-zero
                            # creationflags outside Windows.
                            creationflags=0x08000000 if sys.platform == "win32" else 0,
                        )
                        log_update_check("install_now_restart", exe=str(exe))
                    except Exception as exc:
                        log_update_check(
                            "install_now_restart_failed",
                            error=f"{type(exc).__name__}: {exc}"[:120],
                        )
            close_btn.configure(state="normal")
            root.after(1800, root.destroy)
        else:
            # Failure: empty the bar, show the message, let the user close.
            pb_canvas.coords(fill_rect, 0, 0, 0, PB_HEIGHT)
            pct_var.set("")
            set_status(state.get("message") or "Update fehlgeschlagen.")
            log_update_check(
                "install_now_result",
                ok=False,
                msg=str(state.get("message"))[:200],
            )
            close_btn.configure(state="normal")

    threading.Thread(target=worker, daemon=True, name="aza-install-now").start()
    root.after(120, pump_progress)
    root.after(400, finalize)
    root.mainloop()
    return 0 if state["ok"] else 1
|
|
|
|
|
|
def _show_friendly_update_dialog(
    result: dict[str, Any],
    *,
    parent: tk.Misc | None = None,
) -> tuple[str, bool]:
    """Show the friendly three-option update dialog and wait for the answer.

    Args:
        result: Update check result (not read by the dialog itself).
        parent: Window to attach the dialog to. When None, a hidden
            temporary root is created and destroyed again afterwards.

    Returns:
        ``(choice, disable_auto_prompt)`` where *choice* is ``"now"``,
        ``"on_exit"`` or ``"later"`` (closing the window counts as
        ``"later"``) and *disable_auto_prompt* is the checkbox state.
    """
    owns = False
    root = parent
    if root is None:
        root = tk.Tk()
        root.withdraw()
        owns = True

    choice = {"value": "later"}

    dlg = tk.Toplevel(root)
    dlg.title("Update verfügbar")
    dlg.resizable(False, False)
    # A transient window inherits the state of its master when mapped. With
    # a withdrawn master (always the case for the temporary root) it would
    # open withdrawn and wait_window() would block on an invisible dialog.
    if root.winfo_viewable():
        dlg.transient(root)
    dlg.grab_set()
    try:
        # Briefly force the dialog to the front, then release topmost.
        dlg.attributes("-topmost", True)
        dlg.after(800, lambda: dlg.attributes("-topmost", False))
    except Exception:
        pass

    BG = "#FFFFFF"
    HDR = "#1A4D6D"
    TXT = "#1A3D55"
    SUB = "#607890"
    BTN = "#1A8ACC"

    # Header
    hdr = tk.Frame(dlg, bg=HDR, padx=18, pady=12)
    hdr.pack(fill="x")
    tk.Label(hdr, text="Update verfügbar", bg=HDR, fg="#FFFFFF",
             font=("Segoe UI", 12, "bold")).pack(anchor="w")

    # Body
    body = tk.Frame(dlg, bg=BG, padx=18, pady=14)
    body.pack(fill="both", expand=True)

    tk.Label(body, text="Eine neue AzA-Version ist verfügbar.",
             bg=BG, fg=TXT, font=("Segoe UI", 10, "bold")).pack(anchor="w", pady=(0, 6))
    tk.Label(body, text="Möchten Sie sie jetzt installieren oder beim Beenden von AzA?",
             bg=BG, fg=TXT, font=("Segoe UI", 10), wraplength=380, justify="left",
             ).pack(anchor="w", pady=(0, 14))

    btn_row = tk.Frame(body, bg=BG)
    btn_row.pack(fill="x", pady=(0, 12))

    def _make_btn(master, text, cmd, primary=False):
        """Create a flat button; *primary* selects the highlighted colours."""
        bg = BTN if primary else "#D4E7F5"
        fg = "#FFFFFF" if primary else "#1A4D6D"
        return tk.Button(master, text=text, command=cmd, bg=bg, fg=fg,
                         activebackground=bg, activeforeground=fg,
                         font=("Segoe UI", 9), bd=0, padx=12, pady=7,
                         cursor="hand2", relief="flat")

    def on_now():
        choice["value"] = "now"
        dlg.destroy()

    def on_exit():
        choice["value"] = "on_exit"
        dlg.destroy()

    def on_later():
        choice["value"] = "later"
        dlg.destroy()

    _make_btn(btn_row, "Jetzt installieren", on_now, primary=True).pack(
        side="left", padx=(0, 8))
    _make_btn(btn_row, "Beim Beenden installieren", on_exit).pack(side="left", padx=(0, 8))
    _make_btn(btn_row, "Später", on_later).pack(side="left")

    # Checkbox
    sep = tk.Frame(body, bg="#E0EAF4", height=1)
    sep.pack(fill="x", pady=(0, 8))
    auto_var = tk.BooleanVar(value=False)
    tk.Checkbutton(
        body, text="Nicht mehr automatisch nach Updates fragen",
        variable=auto_var,
        bg=BG, fg=SUB, activebackground=BG, selectcolor=BG,
        font=("Segoe UI", 8), anchor="w",
    ).pack(anchor="w")
    tk.Label(body,
             text="Sie können Updates jederzeit über Einstellungen → Updates suchen prüfen.",
             bg=BG, fg="#A0B8CC", font=("Segoe UI", 7), wraplength=380, justify="left",
             ).pack(anchor="w", pady=(2, 0))

    dlg.protocol("WM_DELETE_WINDOW", on_later)
    dlg.update_idletasks()
    try:
        # Centre the dialog on the screen using its requested size.
        sw, sh = dlg.winfo_screenwidth(), dlg.winfo_screenheight()
        dlg.geometry(
            f"+{max(0,(sw - dlg.winfo_reqwidth()) // 2)}"
            f"+{max(0,(sh - dlg.winfo_reqheight()) // 2)}"
        )
    except Exception:
        pass

    if owns:
        dlg.wait_window()
    else:
        root.wait_window(dlg)

    # Read the checkbox while the owning root still exists; only then tear
    # down the temporary root.
    disable_auto_prompt = bool(auto_var.get())
    if owns:
        try:
            root.destroy()
        except Exception:
            pass

    return choice["value"], disable_auto_prompt
|
|
|
|
|
|
def _show_update_dialog_native() -> str:
|
|
"""
|
|
Windows-nativer 3-Optionen-Dialog (thread-sicher neben pywebview).
|
|
Rueckgabe: 'now' | 'on_exit' | 'later'
|
|
"""
|
|
if sys.platform != "win32":
|
|
return "later"
|
|
import ctypes
|
|
|
|
MB_YESNOCANCEL = 0x00000003
|
|
MB_ICONINFORMATION = 0x00000040
|
|
MB_TOPMOST = 0x00040000
|
|
MB_SETFOREGROUND = 0x00010000
|
|
IDYES = 6
|
|
IDNO = 7
|
|
IDCANCEL = 2
|
|
|
|
text = (
|
|
"Eine neue AzA-Version ist verfügbar.\n\n"
|
|
"Ja = Jetzt installieren\n"
|
|
"Nein = Beim Beenden installieren\n"
|
|
"Abbrechen = Später"
|
|
)
|
|
ret = ctypes.windll.user32.MessageBoxW( # type: ignore[attr-defined]
|
|
0,
|
|
text,
|
|
"Update verfügbar",
|
|
MB_YESNOCANCEL | MB_ICONINFORMATION | MB_TOPMOST | MB_SETFOREGROUND,
|
|
)
|
|
if int(ret) == IDYES:
|
|
return "now"
|
|
if int(ret) == IDNO:
|
|
return "on_exit"
|
|
return "later"
|
|
|
|
|
|
def _notify_native(title: str, message: str) -> None:
|
|
if sys.platform != "win32":
|
|
return
|
|
import ctypes
|
|
|
|
MB_OK = 0
|
|
MB_ICONINFORMATION = 0x40
|
|
MB_TOPMOST = 0x40000
|
|
ctypes.windll.user32.MessageBoxW( # type: ignore[attr-defined]
|
|
0, message, title, MB_OK | MB_ICONINFORMATION | MB_TOPMOST
|
|
)
|
|
|
|
|
|
def _handle_update_prompt(
    result: dict[str, Any],
    *,
    parent: tk.Misc | None = None,
    use_native_dialog: bool = False,
) -> None:
    """Ask the user about an available update and act on the answer.

    Does nothing when the automatic prompt is disabled. ``"later"`` only
    logs, ``"on_exit"`` stores a pending-update marker, any other answer
    launches the external installer and schedules termination of this
    process.
    """

    def _inform(text: str) -> None:
        """Show *text* via the native box, the parent, or a throwaway root."""
        if use_native_dialog:
            _notify_native("Update", text)
        elif parent is not None:
            messagebox.showinfo("Update", text, parent=parent)
        else:
            scratch_root = tk.Tk()
            scratch_root.withdraw()
            messagebox.showinfo("Update", text, parent=scratch_root)
            scratch_root.destroy()

    if not get_auto_update_prompt_enabled():
        log_update_check("auto_prompt_skipped")
        return

    # The native message box has no checkbox, so it can never mute prompts.
    if use_native_dialog:
        answer, mute_prompts = _show_update_dialog_native(), False
    else:
        answer, mute_prompts = _show_friendly_update_dialog(result, parent=parent)

    if mute_prompts:
        set_auto_update_prompt_enabled(False)
        log_update_check("auto_prompt_disabled_by_user")

    if answer == "later":
        _log("startup update deferred by user")
        log_update_check("user_deferred")
        return

    if answer == "on_exit":
        remote_info = result.get("remote") or {}
        pending_url = str(remote_info.get("_manifest_url") or "").strip()
        if not pending_url:
            pending_url = (os.environ.get("AZA_UPDATE_MANIFEST_URL") or "").strip()
        pending_version = str(result.get("latest_version") or "").strip()
        save_update_pending(pending_url, pending_version)
        log_update_check("pending_on_exit", version=pending_version)
        note = "Das Update wird beim Beenden von AzA automatisch installiert."
        if use_native_dialog:
            _notify_native("Update vorgemerkt", note)
        elif parent is not None:
            try:
                messagebox.showinfo("Update vorgemerkt", note, parent=parent)
            except Exception:
                pass
        return

    # Everything else is treated as "install now".
    ready, _reason = validate_update_install_ready(result)
    if not ready:
        not_ready_text = (
            "Das Update-Modul wird vorbereitet.\n"
            "Bitte starten Sie die Aktualisierung später erneut."
        )
        log_update_check("install_not_ready")
        _inform(not_ready_text)
        return

    launched, launch_msg = launch_external_installer(result)
    log_update_check("install_launch", ok=launched, msg=str(launch_msg)[:160])

    if not launched:
        _inform(f"Update konnte nicht gestartet werden.\n{launch_msg}")
        return

    info = (
        "Update wird vorbereitet.\n"
        "AzA wird kurz geschlossen und automatisch neu gestartet."
    )
    try:
        if use_native_dialog:
            _notify_native("Update", info)
        elif parent is not None:
            parent.after(0, lambda: messagebox.showinfo("Update", info, parent=parent))
    except Exception:
        pass

    try:
        if parent is not None:
            parent.after(400, parent.destroy)
    except Exception:
        pass

    def _terminate() -> None:
        """Hard-exit this process so the updater can replace its files."""
        try:
            os._exit(0)
        except Exception:
            pass

    threading.Timer(1.8, _terminate).start()
|
|
|
|
|
|
def maybe_prompt_update_on_startup(
    *,
    parent: tk.Misc | None = None,
    use_native_dialog: bool = False,
) -> None:
    """Check silently at start-panel launch and prompt only for a real update.

    Any status other than ``"update_available"`` (manifest unavailable,
    current, channel mismatch, error, ...) shows nothing.
    """
    outcome = check_update_status(startup=True)
    current_status = outcome.get("status")
    _log(f"startup status={current_status} detail={outcome.get('detail')}")

    if current_status == "update_available":
        _handle_update_prompt(outcome, parent=parent, use_native_dialog=use_native_dialog)
|
|
|
|
|
|
def run_startup_update_check_in_background(
    *,
    delay_seconds: float = 1.2,
    parent: tk.Misc | None = None,
    use_native_dialog: bool = False,
    schedule_on_main: Callable[[Callable[[], None]], None] | None = None,
) -> None:
    """Start the non-blocking startup update check for the start panel.

    A daemon thread waits *delay_seconds* and checks the manifest. If an
    update is available, the prompt is handed to *schedule_on_main* when
    given, otherwise it is invoked directly from the worker thread.
    """

    def _check_then_prompt() -> None:
        import time

        time.sleep(max(0.0, delay_seconds))
        try:
            outcome = check_update_status(startup=True)
            current_status = outcome.get("status")
            _log(f"startup status={current_status} detail={outcome.get('detail')}")
            if current_status != "update_available":
                return

            def _prompt() -> None:
                _handle_update_prompt(
                    outcome,
                    parent=parent,
                    use_native_dialog=use_native_dialog,
                )

            if schedule_on_main is None:
                _prompt()
            else:
                schedule_on_main(_prompt)
        except Exception as exc:
            # A failed startup check must never disturb the application.
            _log(f"startup check skipped: {type(exc).__name__}")
            log_update_check("startup_error", error=type(exc).__name__)

    background = threading.Thread(
        target=_check_then_prompt,
        daemon=True,
        name="aza-startup-update",
    )
    background.start()
|
|
|
|
|
|
def _start_install_after_manual_check(
    result: dict[str, Any],
    *,
    parent: tk.Misc | None = None,
) -> None:
    """Launch the external installer after the user chose "install now".

    Unlike the startup prompt this neither consults the auto-prompt setting
    nor shows the choice dialog again: the user has already decided.
    """
    ready, _reason = validate_update_install_ready(result)
    if not ready:
        log_update_check("install_not_ready")
        messagebox.showinfo(
            "Update",
            "Das Update-Modul wird vorbereitet.\n"
            "Bitte starten Sie die Aktualisierung später erneut.",
            parent=parent,
        )
        return

    ok, msg = launch_external_installer(result)
    log_update_check("install_launch", ok=ok, msg=str(msg)[:160])
    if not ok:
        messagebox.showinfo(
            "Update",
            f"Update konnte nicht gestartet werden.\n{msg}",
            parent=parent,
        )
        return

    info = (
        "Update wird vorbereitet.\n"
        "AzA wird kurz geschlossen und automatisch neu gestartet."
    )
    if parent is not None:
        try:
            parent.after(0, lambda: messagebox.showinfo("Update", info, parent=parent))
        except Exception:
            pass
        try:
            parent.after(400, parent.destroy)
        except Exception:
            pass

    def _terminate() -> None:
        """Hard-exit this process so the updater can replace its files."""
        try:
            os._exit(0)
        except Exception:
            pass

    threading.Timer(1.8, _terminate).start()


def check_updates_interactive(*, parent: tk.Misc | None = None) -> None:
    """Manual update check for the settings dialog / controller.

    Shows the choice dialog when an update is available and acts on the
    answer. Otherwise reports that AZA is current or that no update is
    available. This works regardless of the automatic-prompt setting, which
    only governs the startup check.
    """
    result = check_update_status(startup=False)
    status = result.get("status")

    if status == "update_available":
        choice, disable_auto = _show_friendly_update_dialog(result, parent=parent)
        if disable_auto:
            set_auto_update_prompt_enabled(False)
            log_update_check("auto_prompt_disabled_by_user")

        if choice == "now":
            # Act directly: going through the startup prompt handler would
            # show the dialog a second time and silently do nothing when
            # automatic prompts are disabled.
            _start_install_after_manual_check(result, parent=parent)
        elif choice == "on_exit":
            remote = result.get("remote") or {}
            manifest_url = str(remote.get("_manifest_url") or "").strip()
            if not manifest_url:
                # Same fallback as the startup prompt.
                manifest_url = (os.environ.get("AZA_UPDATE_MANIFEST_URL") or "").strip()
            latest_version = str(result.get("latest_version") or "").strip()
            save_update_pending(manifest_url, latest_version)
            log_update_check("pending_on_exit", version=latest_version)
            messagebox.showinfo(
                "Update vorgemerkt",
                "Das Update wird beim Beenden von AzA installiert.",
                parent=parent,
            )
        return

    if status == "current":
        messagebox.showinfo(
            "Update",
            f"AZA ist aktuell ({format_version_label()}).",
            parent=parent,
        )
        return

    messagebox.showinfo(
        "Update",
        "Derzeit ist kein Update verfuegbar.",
        parent=parent,
    )
|
|
|
|
|
|
def show_app_info(*, parent: tk.Misc | None = None) -> None:
    """Show an info box with the locally installed version."""
    version_text = format_version_label(load_local_version())
    body = f"Version: {version_text}\n\nAzA Medwork"
    messagebox.showinfo("AzA", body, parent=parent)
|
|
|
|
|
|
def _read_marker(install_dir: Path) -> str:
|
|
p = install_dir / "app_marker.txt"
|
|
if p.is_file():
|
|
return p.read_text(encoding="utf-8").strip()
|
|
return ""
|
|
|
|
|
|
def _reset_test_install_dir(install_dir: Path) -> None:
|
|
install_dir.mkdir(parents=True, exist_ok=True)
|
|
backups = install_dir / "_update_backups"
|
|
if backups.is_dir():
|
|
shutil.rmtree(backups, ignore_errors=True)
|
|
(install_dir / "version.json").write_text(
|
|
json.dumps(
|
|
{
|
|
"version": "1.2.0",
|
|
"build": "TEST_OLD",
|
|
"channel": "stable",
|
|
"app": "AZA Desktop",
|
|
},
|
|
indent=2,
|
|
)
|
|
+ "\n",
|
|
encoding="utf-8",
|
|
)
|
|
(install_dir / "app_marker.txt").write_text("OLD_VERSION\n", encoding="utf-8")
|
|
for name in (
|
|
"aza_controller.exe",
|
|
"aza_office.exe",
|
|
"aza_praxis_chat.exe",
|
|
"aza_updater.exe",
|
|
):
|
|
p = install_dir / name
|
|
if not p.is_file():
|
|
p.write_bytes(b"OLD_DUMMY_EXE\n")
|
|
|
|
|
|
def _build_test_package(project_root: Path) -> tuple[Path, Path, str, int]:
    """Build the dummy "new version" package and zip it.

    The package folder ``_updater_test_package`` and the archive
    ``_updater_test_package.zip`` are recreated below *project_root*.

    Returns:
        ``(package_dir, zip_path, sha256_of_zip, zip_size_in_bytes)``.
    """
    import zipfile

    pkg_dir = project_root / "_updater_test_package"
    zip_path = project_root / "_updater_test_package.zip"

    # Always start from an empty package folder.
    if pkg_dir.is_dir():
        shutil.rmtree(pkg_dir)
    pkg_dir.mkdir(parents=True)

    new_version_info = {
        "version": "9.9.9",
        "build": "TEST_LOCAL",
        "channel": "stable",
        "app": "AZA Desktop",
    }
    (pkg_dir / "version.json").write_text(
        json.dumps(new_version_info, indent=2) + "\n",
        encoding="utf-8",
    )
    (pkg_dir / "app_marker.txt").write_text("NEW_VERSION\n", encoding="utf-8")

    exe_names = (
        "aza_controller.exe",
        "aza_office.exe",
        "aza_praxis_chat.exe",
        "aza_updater.exe",
    )
    for exe_name in exe_names:
        (pkg_dir / exe_name).write_bytes(b"NEW_DUMMY_EXE\n")

    if zip_path.is_file():
        zip_path.unlink()
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for member in pkg_dir.rglob("*"):
            if not member.is_file():
                continue
            # Archive names are relative to the package folder, with "/" separators.
            archive.write(member, member.relative_to(pkg_dir).as_posix())

    digest = compute_sha256(zip_path)
    archive_size = zip_path.stat().st_size
    return pkg_dir, zip_path, digest, archive_size
|
|
|
|
|
|
def _write_test_manifest(
|
|
project_root: Path,
|
|
zip_path: Path,
|
|
sha256: str,
|
|
size_bytes: int,
|
|
*,
|
|
bad_sha: str | None = None,
|
|
missing_sha: bool = False,
|
|
missing_zip: bool = False,
|
|
) -> Path:
|
|
from datetime import date
|
|
|
|
manifest_path = project_root / "test_update_manifest.json"
|
|
url = str(zip_path.resolve()) if not missing_zip else str(
|
|
(project_root / "_missing_update.zip").resolve()
|
|
)
|
|
entry = {
|
|
"name": zip_path.name,
|
|
"url": url,
|
|
"size_bytes": size_bytes,
|
|
}
|
|
if not missing_sha:
|
|
entry["sha256"] = bad_sha or sha256
|
|
payload = {
|
|
"product": "AZA Desktop",
|
|
"channel": "stable",
|
|
"latest_version": "9.9.9",
|
|
"latest_build": "TEST_LOCAL",
|
|
"min_supported_version": "1.0.0",
|
|
"mandatory": False,
|
|
"release_date": date.today().isoformat(),
|
|
"notes_de": ["Lokaler Updater-Test"],
|
|
"files": [entry],
|
|
}
|
|
manifest_path.write_text(
|
|
json.dumps(payload, indent=2, ensure_ascii=False) + "\n",
|
|
encoding="utf-8",
|
|
)
|
|
return manifest_path
|
|
|
|
|
|
def run_local_e2e_tests(project_root: Path | None = None) -> dict[str, Any]:
    """Run the automated local end-to-end updater test (no GUI).

    All update operations are directed at ``<root>/_updater_test_install``;
    the project root's own ``version.json`` is only read, to verify that it
    is not modified by the test run.

    Scenarios, in order: regular update, rollback from the first listed
    backup, manifest with a wrong SHA-256, manifest without SHA-256, manifest
    pointing to a missing ZIP, and "later" (update detected but not applied).

    Args:
        project_root: Root folder to work in; defaults to ``get_project_root()``.

    Returns:
        A report dict with paths, per-scenario booleans and messages.

    Raises:
        OSError / json.JSONDecodeError: if ``<root>/version.json`` cannot be
            read or parsed (raised before any test state is created).
    """
    root = (project_root or get_project_root()).resolve()
    install_dir = root / "_updater_test_install"
    project_version_file = root / "version.json"

    # Capture the baseline BEFORE any update scenario runs. Reading it only at
    # the end (and comparing it with a second read taken right afterwards)
    # would make the "project_root_unchanged" check always true.
    project_version_before = json.loads(
        project_version_file.read_text(encoding="utf-8-sig")
    )

    configure_test_install_dir(install_dir)
    try:
        _reset_test_install_dir(install_dir)

        _, zip_path, sha256, size_bytes = _build_test_package(root)
        manifest_path = _write_test_manifest(root, zip_path, sha256, size_bytes)

        report: dict[str, Any] = {
            "install_dir": str(install_dir),
            "zip_path": str(zip_path),
            "manifest_path": str(manifest_path),
            "sha256": sha256,
        }

        # Scenario 1: regular update.
        result = check_update_from_manifest(manifest_path)
        report["update_detected"] = result.get("status") == "update_available"

        ok, msg = perform_confirmed_update(result)
        report["update_applied"] = ok
        report["update_message"] = msg
        report["marker_after_update"] = _read_marker(install_dir)
        report["version_after_update"] = load_local_version().get("version")
        backups = list_available_backups()
        report["backup_created"] = bool(backups)

        # Scenario 2: rollback, only possible when a backup was listed.
        if backups:
            rb_ok, rb_msg = rollback_from_backup(backups[0], install_dir)
            report["rollback_tested"] = rb_ok
            report["rollback_message"] = rb_msg
            report["marker_after_rollback"] = _read_marker(install_dir)
            report["version_after_rollback"] = load_local_version().get("version")

        # Scenario 3: wrong checksum must not change the installation.
        _reset_test_install_dir(install_dir)
        bad_manifest = _write_test_manifest(
            root, zip_path, sha256, size_bytes, bad_sha="0" * 64
        )
        bad_result = check_update_from_manifest(bad_manifest)
        bad_ok, _ = perform_confirmed_update(bad_result)
        report["bad_sha_blocked"] = (
            (not bad_ok) and _read_marker(install_dir) == "OLD_VERSION"
        )

        # Scenario 4: missing checksum must fail the readiness validation.
        _reset_test_install_dir(install_dir)
        no_sha_manifest = _write_test_manifest(
            root, zip_path, sha256, size_bytes, missing_sha=True
        )
        no_sha_result = check_update_from_manifest(no_sha_manifest)
        no_sha_ready, _ = validate_update_install_ready(no_sha_result)
        report["missing_sha_blocked"] = not no_sha_ready

        # Scenario 5: missing ZIP must not change the installation.
        _reset_test_install_dir(install_dir)
        missing_zip_manifest = _write_test_manifest(
            root, zip_path, sha256, size_bytes, missing_zip=True
        )
        missing_result = check_update_from_manifest(missing_zip_manifest)
        missing_ok, _ = perform_confirmed_update(missing_result)
        report["missing_zip_blocked"] = (
            (not missing_ok) and _read_marker(install_dir) == "OLD_VERSION"
        )

        # Scenario 6: only checking ("later") leaves the old version in place.
        _reset_test_install_dir(install_dir)
        later_result = check_update_from_manifest(manifest_path)
        report["later_unchanged"] = (
            later_result.get("status") == "update_available"
            and _read_marker(install_dir) == "OLD_VERSION"
            and load_local_version().get("version") == "1.2.0"
        )

        report["project_root_version"] = project_version_before.get("version")
        report["sha256_verified"] = report.get("update_applied", False)

        # Compare the project root's version.json with the baseline taken
        # before the scenarios ran.
        project_version_after = json.loads(
            project_version_file.read_text(encoding="utf-8-sig")
        )
        report["project_root_unchanged"] = (
            project_version_after == project_version_before
        )
    finally:
        # Always leave test mode, even when a scenario raised.
        configure_test_install_dir(None)

    # Restore the regular (valid) test manifest for later manual runs.
    _write_test_manifest(root, zip_path, sha256, size_bytes)
    return report
|
|
|
|
|
|
def _parse_cli() -> argparse.Namespace:
    """Parse the updater's command-line options from ``sys.argv``."""
    parser = argparse.ArgumentParser(description="AZA Desktop Updater")

    # (flag, add_argument keyword options) in the order shown by --help.
    option_specs = (
        (
            "--e2e-local-test",
            {
                "action": "store_true",
                "help": "Lokalen End-to-End-Test ausfuehren (ohne GUI)",
            },
        ),
        (
            "--install-dir",
            {"help": "Test-Installationsordner (setzt AZA_UPDATE_TEST_INSTALL_DIR)"},
        ),
        (
            "--manifest",
            {"help": "Manifest-URL oder lokaler Pfad (ueberschreibt Standard-Quellen)"},
        ),
        (
            "--check-only",
            {
                "action": "store_true",
                "help": "Nur pruefen und Ergebnis als JSON ausgeben (kein Dialog)",
            },
        ),
        (
            "--install-now",
            {
                "action": "store_true",
                "help": "Externer Installations-Modus mit Statusfenster",
            },
        ),
        (
            "--target-dir",
            {"help": "Installationsordner fuer --install-now"},
        ),
        (
            "--restart-exe",
            {
                "default": "aza_start_panel.exe",
                "help": "Nach erfolgreichem Update zu startende EXE",
            },
        ),
        (
            "--yes",
            {
                "action": "store_true",
                "help": "Update ohne zusaetzlichen Dialog installieren (nur Testmodus)",
            },
        ),
    )
    for flag, options in option_specs:
        parser.add_argument(flag, **options)

    return parser.parse_args()
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point of the updater.

    Modes, checked in this order:

    * ``--install-now``: delegate to the external install window.
    * ``--e2e-local-test``: run the local end-to-end test and print the report.
    * manifest + ``--check-only``: print the check result as JSON, no dialog.
    * manifest + ``--yes``: install without any dialog (test mode).
    * manifest only: check, then ask the user via the friendly dialog.
    * no manifest: run the regular interactive check.

    The manifest comes from ``--manifest`` or ``AZA_UPDATE_MANIFEST_URL``.

    Returns:
        Process exit code (0 on success; 1 when the e2e test or the ``--yes``
        installation failed; the install window's own code for
        ``--install-now``).
    """
    args = _parse_cli()
    _apply_cli_env(args.install_dir)

    if args.install_now:
        manifest = (
            args.manifest or os.environ.get("AZA_UPDATE_MANIFEST_URL") or ""
        ).strip()
        return _run_install_now_window(
            manifest=manifest,
            target_dir=args.target_dir,
            restart_exe=args.restart_exe,
        )

    if args.e2e_local_test:
        report = run_local_e2e_tests()
        print(json.dumps(report, indent=2, ensure_ascii=False))
        required_flags = (
            "update_detected",
            "update_applied",
            "backup_created",
            "rollback_tested",
            "bad_sha_blocked",
            "missing_sha_blocked",
            "missing_zip_blocked",
            "later_unchanged",
            "project_root_unchanged",
        )
        ok = (
            all(report.get(k) for k in required_flags)
            and report.get("marker_after_update") == "NEW_VERSION"
            and report.get("marker_after_rollback") == "OLD_VERSION"
        )
        return 0 if ok else 1

    manifest = (
        args.manifest or os.environ.get("AZA_UPDATE_MANIFEST_URL") or ""
    ).strip() or None

    if manifest and args.check_only:
        result = check_update_from_manifest(manifest)
        local = result.get("local") or load_local_version()
        log_update_check(
            "cli_check",
            status=result.get("status"),
            manifest_url=manifest,
            local_version=local.get("version"),
        )
        print(json.dumps(result, indent=2, ensure_ascii=False, default=str))
        return 0

    if manifest and args.yes:
        result = check_update_from_manifest(manifest)
        ok, msg = perform_confirmed_update(result)
        print(msg)
        return 0 if ok else 1

    # Interactive modes. Check the manifest (if any) before creating a window.
    result = check_update_from_manifest(manifest) if manifest else None

    owns_root = False
    parent: tk.Misc | None = None
    if not tk._default_root:  # type: ignore[attr-defined]
        # No Tk root yet: create a hidden one so dialogs have a parent.
        parent = tk.Tk()
        parent.withdraw()
        owns_root = True

    try:
        if not manifest:
            check_updates_interactive(parent=parent)
            return 0

        status = result.get("status")
        if status == "update_available":
            # The dialog returns (choice, disable_auto). Testing the tuple for
            # truthiness would always be true and install the update even
            # when the user declined, so the choice is evaluated explicitly.
            choice, disable_auto = _show_friendly_update_dialog(result, parent=parent)
            if disable_auto:
                set_auto_update_prompt_enabled(False)
            if choice == "now":
                _ok, msg = perform_confirmed_update(result, parent=parent)
                messagebox.showinfo("Update", msg, parent=parent)
            elif choice == "on_exit":
                remote = result.get("remote") or {}
                # Fall back to the CLI/env manifest when the result carries no URL.
                manifest_url = str(remote.get("_manifest_url") or manifest).strip()
                save_update_pending(
                    manifest_url, str(result.get("latest_version") or "")
                )
                messagebox.showinfo(
                    "Update vorgemerkt",
                    "Das Update wird beim Beenden von AzA installiert.",
                    parent=parent,
                )
        elif status == "current":
            messagebox.showinfo(
                "Update",
                f"AZA ist aktuell ({format_version_label()}).",
                parent=parent,
            )
        else:
            messagebox.showinfo(
                "Update",
                f"Status: {status}\n{result.get('message', '')}",
                parent=parent,
            )
        return 0
    finally:
        # Destroy only a root created here, also when a dialog/update raised.
        if owns_root and parent is not None:
            parent.destroy()
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit code.
if __name__ == "__main__":
    sys.exit(main())
|