Files
aza/AzA march 2026/desktop_update_check.py
2026-06-10 22:55:03 +02:00

581 lines
19 KiB
Python

"""
AZA Desktop — Phase-1-Updatepruefung (nur Hinweis + Browser-Download).
Keine automatische Installation, kein Beenden der App, keine E-Mail.
Diagnose: client_debug.log (UPDATE_*), ohne Lizenz-/Token-Daten.
"""
from __future__ import annotations
import json
import os
import re
import webbrowser
from datetime import datetime, timezone
from typing import Any
import requests
from aza_config import get_writable_data_dir
from aza_version import APP_CHANNEL, APP_VERSION
from aza_update_core import save_update_pending
# Session guard: within one process run, choosing "Later" must not make the
# dialog reappear immediately.
_update_dialog_shown_this_session: bool = False
# Server manifest (several endpoints for deploy compatibility); the endpoints
# are tried in the order listed.
UPDATE_MANIFEST_URLS = (
"https://api.aza-medwork.ch/download/version.json",
"https://api.aza-medwork.ch/release/version.json",
)
# Used when the manifest does not carry a "download_url" of its own.
DEFAULT_DOWNLOAD_URL = "https://api.aza-medwork.ch/downloads/aza_desktop_setup.exe"
# Build stamp shape: eight digits, an underscore, six digits (yyyyMMdd_HHmmss).
_BUILD_STAMP_RE = re.compile(r"^\d{8}_\d{6}$")
# Names of the IPC files kept in the writable data directory.
_OFFICE_UPDATE_HINT_FILE = "office_update_hint.json"
_OFFICE_UPDATE_REQUEST_FLAG = "office_update_request.flag"
def _office_update_ipc_path(name: str) -> str:
    """Return the path of the IPC file *name* inside the writable data directory."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, name)
def sync_office_update_hint(info: dict[str, Any] | None) -> None:
    """Publish (or withdraw) the update hint file read by the reception shell.

    Per the original note, Office is the sole update owner and the shell runs
    no installer itself; it only reads this hint.

    When *info* is empty or does not report ``update_available``, an existing
    hint file is removed. Otherwise a small JSON document with the keys
    ``available``, ``latest_version`` and ``update_level`` is written.
    Any error is swallowed: the hint is best effort.
    """
    hint_path = _office_update_ipc_path(_OFFICE_UPDATE_HINT_FILE)
    try:
        has_update = bool(info) and bool(info.get("update_available"))
        if has_update:
            latest = str(info.get("latest_version") or "").strip()
            level = str(info.get("update_level") or "recommended").strip()
            hint = {
                "available": True,
                "latest_version": latest,
                "update_level": level,
            }
            with open(hint_path, "w", encoding="utf-8") as fh:
                json.dump(hint, fh, ensure_ascii=False)
        elif os.path.isfile(hint_path):
            # Nothing to announce: withdraw a stale hint.
            os.remove(hint_path)
    except Exception:
        pass
def read_office_update_hint() -> dict[str, Any] | None:
    """Load the update hint file, if one exists.

    Returns the decoded JSON object when it is a dict with a truthy
    ``available`` entry. Returns None when the file is missing, unreadable,
    malformed, or does not announce an update.
    """
    try:
        hint_path = _office_update_ipc_path(_OFFICE_UPDATE_HINT_FILE)
        if os.path.isfile(hint_path):
            with open(hint_path, "r", encoding="utf-8") as fh:
                payload = json.load(fh)
            if isinstance(payload, dict) and payload.get("available"):
                return payload
    except Exception:
        pass
    return None
def request_office_update_dialog_ipc() -> None:
    """Shell -> Office: create the request flag file.

    Per the original note, Office opens the update dialog on its next poll.
    Errors while writing the flag are ignored.
    """
    try:
        flag_path = _office_update_ipc_path(_OFFICE_UPDATE_REQUEST_FLAG)
        with open(flag_path, "w", encoding="utf-8") as fh:
            fh.write("1")
    except Exception:
        pass
def consume_office_update_request_flag() -> bool:
    """Remove the request flag file and report whether it was present.

    Returns True when the flag existed and was deleted, False when it was
    absent or any error occurred.
    """
    try:
        flag_path = _office_update_ipc_path(_OFFICE_UPDATE_REQUEST_FLAG)
        flag_present = os.path.isfile(flag_path)
        if flag_present:
            os.remove(flag_path)
        return flag_present
    except Exception:
        return False
def _update_debug_log(msg: str) -> None:
    """Append *msg* with a UTC timestamp to ``client_debug.log``.

    The log lives in the writable data directory. Logging is best effort:
    every error is swallowed so diagnostics cannot break the update check.
    """
    try:
        import os

        log_path = os.path.join(get_writable_data_dir(), "client_debug.log")
        stamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        with open(log_path, "a", encoding="utf-8") as fh:
            fh.write(f"[{stamp}] {msg}\n")
    except Exception:
        pass
def _local_build_stamp() -> str:
try:
from _build_info import BUILD_TIMESTAMP
s = str(BUILD_TIMESTAMP).strip()
return s if _BUILD_STAMP_RE.match(s) else ""
except Exception:
return ""
def _installed_current_version() -> str:
    """Return the installed version, preferring the local version.json.

    The version is read through ``aza_update_core.load_local_version`` (per
    the original note, the same source the start panel and updater use).
    When that yields no version, or fails, ``APP_VERSION`` from the source
    code is returned instead.

    If the installed version differs from ``APP_VERSION``, only the version
    file path and both versions are logged (no tokens or patient data), so a
    wrong installed version can be diagnosed.
    """
    try:
        from aza_update_core import load_local_version, find_local_version_file

        installed = str(load_local_version().get("version") or "").strip()
        if installed:
            if not _semver_eq(installed, APP_VERSION):
                # Diagnostics only: a failure here must not hide the version.
                try:
                    version_file = find_local_version_file()
                    source = version_file if version_file else 'fallback_code'
                    _update_debug_log(
                        f"VERSION_SOURCE installed={installed} code={APP_VERSION} "
                        f"source={source}"
                    )
                except Exception:
                    pass
            return installed
    except Exception:
        pass
    return str(APP_VERSION).strip()
def _parse_semver_tuple(v: str) -> tuple[int, ...]:
"""Semver-artig: numerische Segmente, nicht lexikographisch ('1.10' > '1.2')."""
v = str(v).strip()
if not v:
return tuple()
parts: list[int] = []
for seg in v.split("."):
seg = seg.strip()
m = re.match(r"^(\d+)", seg)
parts.append(int(m.group(1)) if m else 0)
return tuple(parts)
def _semver_pad(
a: tuple[int, ...], b: tuple[int, ...]
) -> tuple[tuple[int, ...], tuple[int, ...]]:
n = max(len(a), len(b))
aa = a + (0,) * (n - len(a))
bb = b + (0,) * (n - len(b))
return aa, bb
def _semver_gt(ver_a: str, ver_b: str) -> bool:
    """Return True when *ver_a* is numerically greater than *ver_b*.

    If either version parses to an empty tuple the result is False.
    """
    left = _parse_semver_tuple(ver_a)
    right = _parse_semver_tuple(ver_b)
    if left and right:
        padded_left, padded_right = _semver_pad(left, right)
        return padded_left > padded_right
    return False
def _semver_eq(ver_a: str, ver_b: str) -> bool:
    """Return True when both versions are numerically equal after zero-padding.

    When neither version has any parsed segment, the stripped strings are
    compared directly instead.
    """
    left = _parse_semver_tuple(ver_a)
    right = _parse_semver_tuple(ver_b)
    if left or right:
        padded_left, padded_right = _semver_pad(left, right)
        return padded_left == padded_right
    return str(ver_a).strip() == str(ver_b).strip()
def _below_min_required(current: str, min_req: str) -> bool:
    """Return True when *current* is older than the minimum required version.

    A missing or blank *min_req* means there is no minimum, so the result is
    False.
    """
    required = (min_req or "").strip()
    return bool(required) and _semver_gt(required, current)
def _build_gt(remote: str, local: str) -> bool:
"""Tie-Breaker bei gleicher Versionsnummer (yyyyMMdd_HHmmss)."""
r, l = (remote or "").strip(), (local or "").strip()
if not r or not l:
return False
if _BUILD_STAMP_RE.match(r) and _BUILD_STAMP_RE.match(l):
return r > l
return r > l
def _normalize_notes(data: dict[str, Any]) -> list[str]:
raw = data.get("notes")
if raw is None:
raw = data.get("release_notes")
if raw is None:
return []
if isinstance(raw, str):
return [raw.strip()] if raw.strip() else []
if isinstance(raw, list):
return [str(x).strip() for x in raw if str(x).strip()]
return []
def _min_required_from_manifest(data: dict[str, Any]) -> str:
for key in ("min_required_version", "minimum_supported_version"):
v = data.get(key)
if v is not None and str(v).strip():
return str(v).strip()
return ""
def _normalize_update_level(raw: Any) -> str:
s = str(raw or "recommended").strip().lower()
if s in ("optional", "recommended", "required"):
return s
return "recommended"
def fetch_remote_manifest() -> tuple[dict[str, Any] | None, str | None]:
    """Fetch and decode the update manifest from the first endpoint that works.

    The URLs in ``UPDATE_MANIFEST_URLS`` are tried in order with a 6 second
    timeout each. Invalid responses are not passed on to the application.

    Returns:
        ``(manifest, None)`` for the first endpoint answering HTTP 200 with a
        JSON object, otherwise ``(None, reason)`` where *reason* describes the
        last failure (or "no_manifest" if none was recorded).
    """
    failure: str | None = None
    for url in UPDATE_MANIFEST_URLS:
        try:
            response = requests.get(url, timeout=6)
            if response.status_code == 200:
                try:
                    manifest = response.json()
                except ValueError:
                    failure = f"invalid_json url={url}"
                else:
                    if isinstance(manifest, dict):
                        return manifest, None
                    failure = f"invalid_manifest_shape url={url}"
            else:
                failure = f"http_status={response.status_code} url={url}"
        except requests.RequestException as exc:
            failure = f"request_exc={type(exc).__name__} url={url}"
        except Exception as exc:
            failure = f"exc={type(exc).__name__} url={url}"
    return None, failure or "no_manifest"
def _build_update_info(data: dict[str, Any]) -> dict[str, Any] | None:
    """Evaluate a manifest against the installed version.

    Args:
        data: Decoded manifest object.

    Returns:
        An info dict (with ``update_available`` set to True) when the manifest
        version is newer than the installed one, or when the installed version
        is below the manifest's minimum required version. None when the
        manifest names no version, the installed version cannot be determined,
        or no update is needed.
    """
    latest = str(data.get("version") or "").strip()
    # The current version comes from the installed version.json (falling back
    # to APP_VERSION) so that Office and the start panel compare exactly the
    # same installed version.
    current = _installed_current_version()
    if not latest or not current:
        return None
    min_req = _min_required_from_manifest(data)
    download_url = str(data.get("download_url") or "").strip() or DEFAULT_DOWNLOAD_URL
    update_level = _normalize_update_level(data.get("update_level"))
    notes = _normalize_notes(data)
    remote_build = str(data.get("build") or "").strip()
    sha256 = str(data.get("sha256") or "").strip()
    local_build = _local_build_stamp()
    below_min = _below_min_required(current, min_req)
    newer_semver = _semver_gt(latest, current)
    # build_bump is disabled for the stable channel: same version = no update.
    # After a manifest rebuild the build number can be higher than the
    # installed build although the same code version is installed. That would
    # otherwise cause an endless loop (1.3.5 204826 > 1.3.5 151416 -> update
    # offered every time).
    build_bump = False
    client_ahead = _semver_gt(current, latest)
    if client_ahead and not below_min:
        return None
    if not (below_min or newer_semver or build_bump):
        return None
    return {
        "update_available": True,
        "latest_version": latest,
        "current_version": current,
        "local_build": local_build,
        "download_url": download_url,
        "remote_build": remote_build,
        "sha256": sha256,
        "update_level": update_level,
        "min_required_version": min_req,
        "notes": notes,
        "below_min_required": below_min,
    }
def check_for_updates() -> dict[str, Any] | None:
    """Compatible entry point: fetch the manifest and evaluate it.

    No UI is shown and nothing is logged by this function itself. The office
    update hint is synchronised with the outcome: it is cleared when the fetch
    fails, the channel differs from ``APP_CHANNEL``, or no update is needed.

    Returns:
        The update info dict, or None when there is nothing to offer.
    """
    data, err = fetch_remote_manifest()
    info = None
    if not err and data:
        remote_channel = str(data.get("channel") or "stable").strip()
        if remote_channel == APP_CHANNEL:
            info = _build_update_info(data)
    sync_office_update_hint(info)
    return info
def prompt_update_if_available() -> None:
    """Startup check: fetch the manifest and show the dialog when needed.

    Network and evaluation errors are written to the debug log only. The
    dialog is subject to the startup rules in ``_startup_should_show_dialog``.

    The "local" version written to the log is the version the decision was
    actually based on (the installed version), not the source-code constant,
    so the log cannot contradict the decision.
    """
    urls = ",".join(UPDATE_MANIFEST_URLS)
    _update_debug_log(f"UPDATE_CHECK_START mode=startup urls={urls}")
    data, err = fetch_remote_manifest()
    if err or not data:
        _update_debug_log(f"UPDATE_CHECK_FAILED mode=startup reason=fetch detail={err}")
        return
    rv = str(data.get("version") or "")
    rb = str(data.get("build") or "")
    rc = str(data.get("channel") or "stable").strip()
    _update_debug_log(
        f"UPDATE_CHECK_OK mode=startup remote_version={rv} remote_build={rb} remote_channel={rc}"
    )
    if rc != APP_CHANNEL:
        _update_debug_log(
            f"UPDATE_NOT_NEEDED mode=startup reason=channel_mismatch local_channel={APP_CHANNEL} "
            f"remote_channel={rc}"
        )
        return
    try:
        info = _build_update_info(data)
    except Exception as e:
        _update_debug_log(f"UPDATE_CHECK_FAILED mode=startup reason=evaluate exc={type(e).__name__}")
        return
    if not info:
        # No info dict here, so ask for the installed version directly; this is
        # the same value _build_update_info compared against.
        _update_debug_log(
            f"UPDATE_NOT_NEEDED mode=startup reason=already_current "
            f"local_ver={_installed_current_version()} "
            f"local_build={_local_build_stamp()}"
        )
        return
    if not _startup_should_show_dialog(info):
        _update_debug_log(
            f"UPDATE_NOT_NEEDED mode=startup reason=optional_deferred_startup "
            f"level={info.get('update_level')}"
        )
        return
    lvl = info.get("update_level")
    bm = 1 if info.get("below_min_required") else 0
    _update_debug_log(
        f"UPDATE_AVAILABLE mode=startup level={lvl} remote={info.get('latest_version')} "
        f"local={info.get('current_version')} below_min_required={bm}"
    )
    try:
        _show_update_notification(info, parent=None)
    except Exception as e:
        _update_debug_log(f"UPDATE_CHECK_FAILED mode=startup reason=ui exc={type(e).__name__}")
def _trigger_app_close(parent) -> None:
"""Schliesst das Hauptfenster sauber, damit _on_close() das Pending-Update aufgreift.
WICHTIG: _on_close() statt shutdown_app_completely() verwenden, weil _on_close()
als Einziges den Pending-Update-Check enthaelt und launch_external_installer startet.
shutdown_app_completely() bypassed _on_close() und wuerde den Updater nie starten.
"""
try:
if hasattr(parent, "_on_close"):
parent._on_close()
elif hasattr(parent, "shutdown_app_completely"):
parent.shutdown_app_completely(reason="update_on_exit")
else:
parent.destroy()
except Exception:
pass
def manual_check_for_updates(parent=None) -> None:
    """Manual check: same evaluation as at startup, triggered by the user.

    Errors are written to the debug log only; no popup is shown when the
    application is current or the check fails. Unlike the startup check, the
    startup display rules are not applied here.

    The "local" version written to the log is the version the decision was
    actually based on (the installed version), not the source-code constant,
    so the log cannot contradict the decision.

    Args:
        parent: Window passed through to the update dialog as its parent.
    """
    urls = ",".join(UPDATE_MANIFEST_URLS)
    _update_debug_log(f"UPDATE_CHECK_START mode=manual urls={urls}")
    data, err = fetch_remote_manifest()
    if err or not data:
        _update_debug_log(f"UPDATE_CHECK_FAILED mode=manual reason=fetch detail={err}")
        return
    rv = str(data.get("version") or "")
    rb = str(data.get("build") or "")
    rc = str(data.get("channel") or "stable").strip()
    _update_debug_log(
        f"UPDATE_CHECK_OK mode=manual remote_version={rv} remote_build={rb} remote_channel={rc}"
    )
    if rc != APP_CHANNEL:
        _update_debug_log(
            f"UPDATE_NOT_NEEDED mode=manual reason=channel_mismatch local_channel={APP_CHANNEL} "
            f"remote_channel={rc}"
        )
        return
    try:
        info = _build_update_info(data)
    except Exception as e:
        _update_debug_log(f"UPDATE_CHECK_FAILED mode=manual reason=evaluate exc={type(e).__name__}")
        return
    if not info:
        # No info dict here, so ask for the installed version directly; this is
        # the same value _build_update_info compared against.
        _update_debug_log(
            f"UPDATE_NOT_NEEDED mode=manual reason=already_current "
            f"local_ver={_installed_current_version()} "
            f"local_build={_local_build_stamp()}"
        )
        return
    lvl = info.get("update_level")
    bm = 1 if info.get("below_min_required") else 0
    _update_debug_log(
        f"UPDATE_AVAILABLE mode=manual level={lvl} remote={info.get('latest_version')} "
        f"local={info.get('current_version')} below_min_required={bm}"
    )
    try:
        _show_update_notification(info, parent=parent)
    except Exception as e:
        _update_debug_log(f"UPDATE_CHECK_FAILED mode=manual reason=ui exc={type(e).__name__}")
def _startup_should_show_dialog(info: dict[str, Any]) -> bool:
global _update_dialog_shown_this_session
if _update_dialog_shown_this_session:
return False
if info.get("below_min_required"):
return True
if info.get("update_level") == "required":
return True
if info.get("update_level") == "optional":
return False
return True
# Prevents two update dialogs from being open at the same time
# (startup poll + manual button).
_update_dialog_open: bool = False
def _show_update_notification(info: dict[str, Any], parent) -> None:
    """Show the Office update dialog and act on the user's choice.

    The dialog itself is ``aza_updater._show_update_dialog_professional``
    (per the original note, the only dialog implementation). Its result is
    handled as one of three choices:

    * "later"   - nothing is stored and no installer is started.
    * "on_exit" - a pending update is saved; the application stays open.
    * otherwise - treated as "now": the pending entry is cleared, the external
      installer is launched and, on success, the application is closed.

    If the dialog cannot be shown, the choice falls back to "later". Calling
    this while a dialog is already open returns immediately.

    Args:
        info: Update info dict as produced by ``_build_update_info``.
        parent: Parent window for the dialog and error box; may be None.
    """
    global _update_dialog_shown_this_session, _update_dialog_open
    if _update_dialog_open:
        return
    _update_dialog_shown_this_session = True

    latest = str(info.get("latest_version", "") or "")
    current = str(info.get("current_version", "") or "")
    below_min = bool(info.get("below_min_required"))
    mandatory = below_min or info.get("update_level", "recommended") == "required"
    # Adapter onto the input format expected by the professional dialog.
    dialog_input = {
        "local": {"version": current},
        "latest_version": latest,
        "notes": info.get("notes") or [],
        "mandatory": mandatory,
        "below_min_supported": below_min,
    }

    _update_dialog_open = True
    try:
        from aza_updater import _show_update_dialog_professional

        choice = _show_update_dialog_professional(dialog_input, parent=parent)
    except Exception as exc:
        _update_debug_log(f"UPDATE_DIALOG_PRO_FAILED exc={type(exc).__name__}")
        choice = "later"
    finally:
        _update_dialog_open = False

    if choice == "later":
        # No pending entry, no installer. The session guard keeps the dialog
        # from reappearing in this session.
        _update_debug_log("UPDATE_DIALOG_CHOICE=later")
        return

    if choice == "on_exit":
        # Store exactly one pending entry; Office stays open. Per the original
        # note, _on_close() consumes it once.
        try:
            save_update_pending(UPDATE_MANIFEST_URLS[0], latest)
            _update_debug_log(f"UPDATE_DIALOG_CHOICE=on_exit version={latest}")
        except Exception as exc:
            _update_debug_log(f"UPDATE_PENDING_SAVE_FAILED exc={type(exc).__name__}")
        return

    # NOTE(review): every other result, including an unexpected one, is
    # handled as "now" - confirm the dialog can only return the three choices.
    _update_debug_log(f"UPDATE_DIALOG_CHOICE=now version={latest}")
    started, detail = False, ""
    try:
        from aza_update_core import clear_update_pending
        from aza_updater import launch_external_installer

        # Avoid a leftover pending entry triggering a second installer start.
        clear_update_pending()
        installer_request = {
            "remote": {"_manifest_url": UPDATE_MANIFEST_URLS[0]},
            "latest_version": latest,
        }
        started, detail = launch_external_installer(
            installer_request, restart_exe="aza_start_panel.exe"
        )
    except Exception as exc:
        _update_debug_log(f"UPDATE_LAUNCH_EXC exc={type(exc).__name__}")
        started, detail = False, str(exc)

    if started:
        # Close Office in a controlled way once the installer is running.
        if parent is not None:
            try:
                parent.after(300, lambda: _close_app_after_update(parent))
            except Exception:
                pass
        return

    try:
        import tkinter.messagebox as _mb

        _mb.showerror(
            "Update",
            f"Der Updater konnte nicht gestartet werden:\n{detail}\n\n"
            "Bitte versuchen Sie es spaeter erneut.",
            parent=parent,
        )
    except Exception:
        pass
def maybe_show_startup_update_dialog(info: dict[str, Any] | None, parent=None) -> None:
    """Show the startup dialog once, using an already loaded *info*.

    No manifest is fetched here. Nothing happens when *info* is empty or when
    ``_startup_should_show_dialog`` declines (session guard and the
    optional/required rules).
    """
    if info and _startup_should_show_dialog(info):
        _show_update_notification(info, parent=parent)
def _close_app_after_update(parent) -> None:
"""Schliesst die App nach gestartetem Updater sauber."""
try:
if hasattr(parent, "shutdown_app_completely"):
parent.shutdown_app_completely(reason="update_on_exit")
else:
parent.destroy()
except Exception:
pass
if __name__ == "__main__":
    # Manual smoke test: fetch the manifest and print the error and its keys.
    manifest, fetch_error = fetch_remote_manifest()
    manifest_keys = list(manifest.keys()) if manifest else None
    print("fetch_err", fetch_error, "keys", manifest_keys)