Files
aza/AzA march 2026/aza_update_core.py

756 lines
24 KiB
Python
Raw Normal View History

2026-05-23 21:31:34 +02:00
# -*- coding: utf-8 -*-
"""
AZA Desktop gemeinsame Versions- und Update-Logik (Controller, Updater, Office).
Phase 1: Manifest lesen, Version vergleichen, SHA256, Backup/Rollback vorbereiten.
Kein stilles Auto-Update, keine Aenderung von Benutzerdaten in %%APPDATA%%\\AzA.
"""
from __future__ import annotations
import hashlib
import json
import os
import re
import shutil
import sys
import tempfile
import zipfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable
try:
import requests
except ImportError: # pragma: no cover
requests = None # type: ignore
# New update manifest (target) plus the existing endpoints (kept for compatibility).
UPDATE_CHANNEL_MANIFEST_URL = "https://api.aza-medwork.ch/downloads/updates/manifest.json"
# Candidate manifest URLs: the update channel first, then the two version.json endpoints.
UPDATE_MANIFEST_URLS = (
    UPDATE_CHANNEL_MANIFEST_URL,
    "https://api.aza-medwork.ch/download/version.json",
    "https://api.aza-medwork.ch/release/version.json",
)
# File name of the local test manifest.
LOCAL_TEST_MANIFEST_NAME = "test_update_manifest.json"
# Matches build stamps made of 8 digits, an underscore and 6 digits
# (presumably YYYYMMDD_HHMMSS — confirm against the build tooling).
_BUILD_STAMP_RE = re.compile(r"^\d{8}_\d{6}$")
# File name of the local version descriptor.
_VERSION_JSON_NAME = "version.json"
# Programmatic override for the test installation directory; None means "not set".
_test_install_dir_override: Path | None = None
def configure_test_install_dir(path: str | Path | None) -> None:
    """Set or clear the isolated test installation directory.

    Passing ``None`` or a blank value clears the override; any other value is
    stored as a resolved ``Path``. The point of the override is that tests
    never overwrite the project root.
    """
    global _test_install_dir_override
    has_value = path is not None and bool(str(path).strip())
    _test_install_dir_override = Path(path).resolve() if has_value else None
def get_test_install_dir() -> Path | None:
    """Return the test installation directory, or ``None`` when none is configured.

    The programmatic override takes precedence; otherwise the environment
    variable ``AZA_UPDATE_TEST_INSTALL_DIR`` is consulted.
    """
    override = _test_install_dir_override
    if override is not None:
        return override
    raw = (os.environ.get("AZA_UPDATE_TEST_INSTALL_DIR") or "").strip()
    return Path(raw).resolve() if raw else None
def get_install_dir() -> Path:
    """Return the directory updates are applied to.

    This is the test installation directory when one is configured,
    otherwise the project root.
    """
    candidate = get_test_install_dir()
    return get_project_root() if candidate is None else candidate
def get_project_root() -> Path:
    """Return the installation/project root.

    For a frozen build (``sys.frozen`` set) this is the directory of the
    executable; otherwise it is the directory containing this module.
    """
    is_frozen = getattr(sys, "frozen", False)
    anchor = sys.executable if is_frozen else __file__
    return Path(anchor).resolve().parent
def get_user_data_root() -> Path:
    """Return the user data directory — it must never be deleted by an update.

    Uses ``APPDATA``, then ``LOCALAPPDATA``; when neither is set, falls back
    to ``~/AppData/Roaming/AzA``.
    """
    env = os.environ
    base = env.get("APPDATA") or env.get("LOCALAPPDATA") or ""
    if not base:
        return Path.home() / "AppData" / "Roaming" / "AzA"
    return Path(base) / "AzA"
def get_update_backup_root() -> Path:
    """Return the rollback storage directory, creating it when possible.

    In test mode the directory lives below the test installation directory.
    Otherwise it is ``%PROGRAMDATA%/AzA/update_backups`` when ``PROGRAMDATA``
    is set, else ``update_backups`` inside the user data root.
    """
    test_dir = get_test_install_dir()
    if test_dir is not None:
        root = test_dir / "_update_backups"
    else:
        program_data = os.environ.get("PROGRAMDATA") or ""
        if program_data:
            root = Path(program_data) / "AzA" / "update_backups"
        else:
            root = get_user_data_root() / "update_backups"
    try:
        root.mkdir(parents=True, exist_ok=True)
    except OSError:
        # Best effort: the path is returned even when it could not be created.
        pass
    return root
def find_local_version_file() -> Path | None:
    """Locate the local ``version.json``.

    Looks in the install directory first. Outside test mode it also tries
    ``release/version.json`` below the project root. Returns ``None`` when
    no file is found.
    """
    candidate = get_install_dir() / _VERSION_JSON_NAME
    if candidate.is_file():
        return candidate
    if get_test_install_dir() is not None:
        # Test mode never falls back to files of the real project.
        return None
    fallback = get_project_root() / "release" / _VERSION_JSON_NAME
    return fallback if fallback.is_file() else None
def _parse_semver_tuple(v: str) -> tuple[int, ...]:
v = str(v or "").strip()
if not v:
return tuple()
parts: list[int] = []
for seg in v.split("."):
seg = seg.strip()
m = re.match(r"^(\d+)", seg)
parts.append(int(m.group(1)) if m else 0)
return tuple(parts)
def _semver_pad(a: tuple[int, ...], b: tuple[int, ...]) -> tuple[tuple[int, ...], tuple[int, ...]]:
n = max(len(a), len(b))
return a + (0,) * (n - len(a)), b + (0,) * (n - len(b))
def semver_gt(ver_a: str, ver_b: str) -> bool:
    """Return ``True`` when ``ver_a`` is strictly newer than ``ver_b``.

    Returns ``False`` when either version parses to an empty tuple.
    """
    left = _parse_semver_tuple(ver_a)
    right = _parse_semver_tuple(ver_b)
    if left and right:
        padded_left, padded_right = _semver_pad(left, right)
        return padded_left > padded_right
    return False
def semver_eq(ver_a: str, ver_b: str) -> bool:
    """Return ``True`` when both versions are equal after zero-padding.

    When neither version yields any numeric parts, the stripped raw strings
    are compared instead.
    """
    left = _parse_semver_tuple(ver_a)
    right = _parse_semver_tuple(ver_b)
    if left or right:
        padded_left, padded_right = _semver_pad(left, right)
        return padded_left == padded_right
    return str(ver_a).strip() == str(ver_b).strip()
def build_gt(remote: str, local: str) -> bool:
    """Return ``True`` when the remote build identifier sorts after the local one.

    The comparison is a plain string comparison of the stripped values. For
    fixed-width, zero-padded stamps of the form ``<8 digits>_<6 digits>`` this
    matches numeric order. Returns ``False`` when either side is empty.
    """
    r, l = (remote or "").strip(), (local or "").strip()
    if not r or not l:
        return False
    # The former stamp/non-stamp distinction returned the same expression in
    # both branches, so a single comparison is equivalent.
    return r > l
def _fallback_version_from_code() -> dict[str, Any]:
version = "0.0.0"
channel = "stable"
build = ""
try:
from aza_version import APP_VERSION, APP_CHANNEL
version = str(APP_VERSION).strip() or version
channel = str(APP_CHANNEL).strip() or channel
except Exception:
pass
if not build:
try:
from _build_info import BUILD_TIMESTAMP
build = str(BUILD_TIMESTAMP or "").strip()
except Exception:
pass
return {
"version": version,
"build": build,
"channel": channel,
"app": "AZA Desktop",
}
def load_local_version() -> dict[str, Any]:
    """Read ``version.json``, falling back to the values compiled into the code.

    Non-``None`` entries of the file override the base values. In test mode
    the base is a fixed default (``0.0.0`` / ``stable``) instead of the code
    fallback. An unreadable or non-object file is ignored.
    """

    def _test_defaults() -> dict[str, Any]:
        return {
            "version": "0.0.0",
            "build": "",
            "channel": "stable",
            "app": "AZA Desktop",
        }

    version_file = find_local_version_file()
    if version_file is not None:
        try:
            parsed = json.loads(version_file.read_text(encoding="utf-8"))
            if isinstance(parsed, dict):
                if get_test_install_dir() is not None:
                    merged = _test_defaults()
                else:
                    merged = _fallback_version_from_code()
                merged.update({k: v for k, v in parsed.items() if v is not None})
                return merged
        except Exception:
            # Best effort: a broken version.json falls through to the defaults.
            pass
    if get_test_install_dir() is not None:
        return _test_defaults()
    return _fallback_version_from_code()
def save_local_version(data: dict[str, Any]) -> Path:
    """Write ``version.json`` into the install directory and return its path.

    The current local version info is used as the base and ``data`` is
    merged on top of it.
    """
    destination = get_install_dir() / _VERSION_JSON_NAME
    merged = dict(load_local_version())
    merged.update(data)
    serialized = json.dumps(merged, indent=2, ensure_ascii=False) + "\n"
    destination.write_text(serialized, encoding="utf-8")
    return destination
def format_version_label(data: dict[str, Any] | None = None) -> str:
    """Format a display label such as ``v1.2.3 · <build>``.

    Uses ``data`` when given and non-empty, otherwise the local version
    info. The build part is omitted when empty.
    """
    info = data if data else load_local_version()
    version = str(info.get("version") or "?").strip()
    build = str(info.get("build") or "").strip()
    label = f"v{version}"
    if not build:
        return label
    return f"{label} · {build}"
def _normalize_notes(data: dict[str, Any]) -> list[str]:
raw = data.get("notes_de")
if raw is None:
raw = data.get("notes")
if raw is None:
raw = data.get("release_notes")
if isinstance(raw, str):
return [raw.strip()] if raw.strip() else []
if isinstance(raw, list):
return [str(x).strip() for x in raw if str(x).strip()]
return []
def _remote_version_fields(data: dict[str, Any]) -> tuple[str, str, str]:
version = str(
data.get("latest_version") or data.get("version") or ""
).strip()
build = str(data.get("latest_build") or data.get("build") or "").strip()
channel = str(data.get("channel") or "stable").strip()
return version, build, channel
def _min_required(data: dict[str, Any]) -> str:
for key in ("min_supported_version", "minimum_supported_version", "min_required_version"):
v = data.get(key)
if v is not None and str(v).strip():
return str(v).strip()
return ""
def _log_internal(msg: str) -> None:
print(f"[AZA Update] {msg}")
def resolve_startup_manifest_sources() -> list[str]:
    """Return the manifest sources for the startup update check, in priority order.

    No legacy installer manifest is included. Priority:
      1) ``AZA_UPDATE_MANIFEST_URL`` (URL or local path)
      2) ``test_update_manifest.json`` in the project root, when
         ``AZA_UPDATE_TEST_MANIFEST`` is ``1``/``true``/``yes`` and the file exists
      3) the public update channel manifest
    """
    sources: list[str] = []

    override = (os.environ.get("AZA_UPDATE_MANIFEST_URL") or "").strip()
    if override:
        sources.append(override)

    flag = (os.environ.get("AZA_UPDATE_TEST_MANIFEST") or "").strip().lower()
    if flag in ("1", "true", "yes"):
        local_manifest = get_project_root() / LOCAL_TEST_MANIFEST_NAME
        if local_manifest.is_file():
            sources.append(str(local_manifest.resolve()))

    # The public channel is always the last resort, but is never listed twice.
    if UPDATE_CHANNEL_MANIFEST_URL not in sources:
        sources.append(UPDATE_CHANNEL_MANIFEST_URL)
    return sources
def _load_manifest_json_text(text: str, *, source: str) -> dict[str, Any] | None:
try:
data = json.loads(text)
except ValueError:
_log_internal(f"invalid_json source={source}")
return None
if not isinstance(data, dict):
_log_internal(f"invalid_manifest_shape source={source}")
return None
data["_manifest_url"] = source
return data
def load_manifest_from_source(
    source: str,
    *,
    timeout: float = 8.0,
) -> tuple[dict[str, Any] | None, str | None]:
    """Load a manifest from an HTTP(S) URL or a local file.

    Args:
        source: URL, plain path or ``file://`` URL of the manifest.
        timeout: Timeout in seconds for the HTTP request (default 8.0, the
            value that was previously hard-coded).

    Returns:
        ``(manifest, None)`` on success, otherwise ``(None, error_code)``.
        This function reports failures through the error code; a local file
        that is not valid UTF-8 now yields ``file_read=UnicodeDecodeError``
        instead of raising.
    """
    src = (source or "").strip()
    if not src:
        return None, "empty_source"
    if src.lower().startswith("file://"):
        src = src[7:]
        # "file:///C:/x" leaves "/C:/x" on Windows; drop the leading slash.
        if sys.platform == "win32" and src.startswith("/") and len(src) > 2 and src[2] == ":":
            src = src[1:]
    path = Path(src)
    is_http = src.lower().startswith("http")
    if path.is_file() or (not is_http and path.exists()):
        try:
            text = path.read_text(encoding="utf-8-sig")
            resolved = str(path.resolve())
        except (OSError, UnicodeDecodeError) as exc:
            # UnicodeDecodeError is not an OSError, so it must be listed explicitly.
            return None, f"file_read={type(exc).__name__}"
        data = _load_manifest_json_text(text, source=resolved)
        return (data, None) if data else (None, "invalid_json")
    if requests is None:
        return None, "requests_not_installed"
    try:
        r = requests.get(src, timeout=timeout)
        if r.status_code != 200:
            return None, f"http_status={r.status_code} url={src}"
        text = r.content.decode("utf-8-sig")
        data = _load_manifest_json_text(text, source=src)
        return (data, None) if data else (None, "invalid_json")
    except Exception as exc:
        return None, f"exc={type(exc).__name__} url={src}"
def fetch_startup_manifest(
    *,
    sources: list[str] | None = None,
) -> tuple[dict[str, Any] | None, str | None]:
    """Fetch the manifest for the startup check from the update channel only.

    There is no fallback to the installer ``version.json``. Sources are tried
    in order; the first manifest that loads wins. Returns
    ``(None, last_error)`` (or ``"no_manifest"``) when none could be loaded.
    """
    last_error: str | None = None
    candidates = sources or resolve_startup_manifest_sources()
    for candidate in candidates:
        manifest, error = load_manifest_from_source(candidate)
        if manifest:
            return manifest, None
        last_error = error
        if error and "404" in error:
            _log_internal(f"Update manifest not available ({error})")
    return None, last_error or "no_manifest"
def _update_files_from_manifest(data: dict[str, Any]) -> list[dict[str, Any]]:
files = data.get("files")
if isinstance(files, list) and files:
out: list[dict[str, Any]] = []
for item in files:
if isinstance(item, dict) and item.get("url"):
out.append(dict(item))
return out
download_url = str(data.get("download_url") or "").strip()
if download_url:
return [
{
"name": Path(download_url).name or "aza_desktop_setup.exe",
"url": download_url,
"sha256": str(data.get("sha256") or "").strip(),
"size_bytes": data.get("size_bytes"),
}
]
return []
def fetch_remote_manifest(
    *,
    timeout: float = 8.0,
    urls: tuple[str, ...] | None = None,
) -> tuple[dict[str, Any] | None, str | None]:
    """Fetch a manifest, including the legacy installer endpoints.

    The URLs (default: ``UPDATE_MANIFEST_URLS``) are tried in order and the
    first manifest that loads is returned. On failure the result is
    ``(None, last_error)`` or ``(None, "no_manifest")``.

    NOTE(review): ``timeout`` is accepted but not forwarded — the loader is
    called with the URL only.
    """
    last_error: str | None = None
    candidates = urls or UPDATE_MANIFEST_URLS
    for candidate in candidates:
        manifest, error = load_manifest_from_source(candidate)
        if manifest:
            return manifest, None
        last_error = error
    return None, last_error or "no_manifest"
def check_update_from_manifest(manifest_source: str | Path) -> dict[str, Any]:
    """Check for an update against one explicit manifest (local file or URL).

    Returns the evaluation result, or an ``error`` result carrying the load
    error in ``detail`` when the manifest cannot be read.
    """
    local = load_local_version()
    remote, err = load_manifest_from_source(str(manifest_source))
    if remote and not err:
        return evaluate_update(local, remote)
    return {
        "status": "error",
        "message": "Manifest nicht lesbar.",
        "detail": err,
        "local": local,
    }
def check_update_for_startup() -> dict[str, Any]:
    """Run the startup update check used by the start panel.

    Possible ``status`` values:
      - ``manifest_unavailable`` (404 / no network; meant to stay silent for the user)
      - ``current``
      - ``update_available``
      - ``channel_mismatch``
      - ``error``
    """
    local = load_local_version()
    remote, err = fetch_startup_manifest()
    if remote and not err:
        result = evaluate_update(local, remote)
        result["detail"] = err
        return result

    # Missing manifest, missing HTTP client and request exceptions are "silent".
    silent = bool(
        err
        and (
            "404" in err
            or err in ("no_manifest", "requests_not_installed")
            or err.startswith("exc=")
        )
    )
    if silent:
        status, message = "manifest_unavailable", "Update manifest not available"
    else:
        status, message = "error", "Manifest nicht erreichbar."
    return {
        "status": status,
        "message": message,
        "detail": err,
        "local": local,
    }
def validate_update_install_ready(result: dict[str, Any]) -> tuple[bool, str]:
    """Check every precondition before an installation is attempted.

    Only the first file entry of ``result`` is inspected. Returns
    ``(True, "ok")`` when the update is available, has a URL, a SHA256, a
    version and a ``.zip`` or ``.exe`` package name; otherwise
    ``(False, reason)``.
    """
    if result.get("status") != "update_available":
        return False, "Kein Update verfuegbar."
    files = result.get("files") or []
    if not files:
        return False, "Kein Update-Paket im Manifest."

    package = files[0]
    url = str(package.get("url") or "").strip()
    if not url:
        return False, "Download-URL fehlt."
    if not str(package.get("sha256") or "").strip():
        return False, "SHA256 fehlt im Manifest."
    if not str(result.get("latest_version") or "").strip():
        return False, "Versionsangabe fehlt."

    name = str(package.get("name") or Path(url).name or "").strip()
    if name.lower().endswith((".zip", ".exe")):
        return True, "ok"
    return False, "Unbekanntes Update-Format."
def evaluate_update(
    local: dict[str, Any] | None = None,
    remote: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Compare the local version info with a remote manifest.

    The returned dict always has ``status``, ``message`` and ``local``.
    Statuses produced here are ``current``, ``update_available``,
    ``channel_mismatch`` and ``error``; being below the minimum supported
    version is reported as ``update_available`` with
    ``below_min_supported`` and ``mandatory`` set.
    """
    loc = local or load_local_version()
    if remote is None:
        return {
            "status": "error",
            "message": "Kein Remote-Manifest",
            "local": loc,
        }

    def _outcome(status: str, message: str, **extra: Any) -> dict[str, Any]:
        # Common result shape; keyword order defines the key order of the extras.
        return {"status": status, "message": message, "local": loc, "remote": remote, **extra}

    installed_version = str(loc.get("version") or "").strip()
    installed_build = str(loc.get("build") or "").strip()
    installed_channel = str(loc.get("channel") or "stable").strip()
    offered_version, offered_build, offered_channel = _remote_version_fields(remote)

    if offered_channel != installed_channel:
        return _outcome(
            "channel_mismatch",
            f"Kanal lokal={installed_channel}, remote={offered_channel}",
        )

    minimum = _min_required(remote)
    below_min = bool(minimum and semver_gt(minimum, installed_version))
    is_newer = semver_gt(offered_version, installed_version)
    # A build bump only counts when the versions are equal and both builds are known.
    is_build_bump = bool(
        semver_eq(offered_version, installed_version)
        and offered_build
        and installed_build
        and build_gt(offered_build, installed_build)
    )

    if semver_gt(installed_version, offered_version) and not below_min:
        return _outcome("current", "Lokal ist aktuell oder neuer.")
    if not (below_min or is_newer or is_build_bump):
        return _outcome("current", "Kein Update noetig.")

    return _outcome(
        "update_available",
        "Update verfuegbar.",
        latest_version=offered_version,
        latest_build=offered_build,
        below_min_supported=below_min,
        mandatory=bool(remote.get("mandatory")) or below_min,
        notes=_normalize_notes(remote),
        files=_update_files_from_manifest(remote),
        manifest_url=remote.get("_manifest_url"),
    )
def compute_sha256(path: Path, *, chunk_size: int = 1024 * 1024) -> str:
    """Return the SHA-256 of a file as an upper-case hex string.

    The file is read in blocks of ``chunk_size`` bytes.
    """
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        for block in iter(lambda: stream.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest().upper()
def verify_sha256(path: Path, expected: str) -> bool:
    """Return ``True`` when the file's SHA-256 equals ``expected`` (case-insensitive).

    A blank expectation or a missing file yields ``False``.
    """
    wanted = str(expected or "").strip().upper()
    if wanted and path.is_file():
        return compute_sha256(path) == wanted
    return False
def _resolve_package_source(url: str) -> Path | None:
src = (url or "").strip()
if not src:
return None
if src.lower().startswith("file://"):
src = src[7:]
if sys.platform == "win32" and src.startswith("/") and len(src) > 2 and src[2] == ":":
src = src[1:]
path = Path(src)
if path.is_file():
return path.resolve()
if not src.lower().startswith("http") and path.exists():
return path.resolve()
return None
def download_file(
    url: str,
    dest: Path,
    *,
    timeout: float = 120.0,
    progress: Callable[[int, int | None], None] | None = None,
) -> tuple[bool, str]:
    """Download ``url`` to ``dest``; nothing is installed.

    The data is streamed into ``<dest>.part`` and moved over ``dest`` once
    complete. ``progress`` is called with ``(bytes_done, total_or_None)``
    after every chunk written. Returns ``(True, "ok")`` or
    ``(False, reason)``; the partial file is removed on error.
    """
    if requests is None:
        return False, "requests_not_installed"
    dest.parent.mkdir(parents=True, exist_ok=True)
    partial = dest.with_suffix(dest.suffix + ".part")
    try:
        with requests.get(url, stream=True, timeout=timeout) as response:
            if response.status_code != 200:
                return False, f"http_status={response.status_code}"
            # A missing or zero Content-Length is reported as "unknown" (None).
            total = int(response.headers.get("Content-Length") or 0) or None
            received = 0
            with partial.open("wb") as sink:
                for chunk in response.iter_content(chunk_size=1024 * 256):
                    if chunk:
                        sink.write(chunk)
                        received += len(chunk)
                        if progress:
                            progress(received, total)
            partial.replace(dest)
            return True, "ok"
    except Exception as exc:
        try:
            if partial.is_file():
                partial.unlink()
        except OSError:
            pass
        return False, f"{type(exc).__name__}: {exc}"
def create_pre_update_backup(
    install_dir: Path | None = None,
    *,
    extra_names: tuple[str, ...] = (
        "aza_desktop.exe",
        "aza_controller.exe",
        "aza_office.exe",
        "aza_praxis_chat.exe",
        "aza_updater.exe",
        "AZA_EmpfangShell.exe",
        "version.json",
        "BUILD_INFO.txt",
    ),
) -> Path:
    """Back up the executables and ``version.json`` before an update.

    Files named in ``extra_names`` plus ``app_marker.txt`` are copied when
    present, as is the ``runtime`` directory. The backup is written to a
    timestamped folder below the backup root together with a
    ``backup_meta.json``. Only ``install_dir`` (default: the install
    directory) is read.

    Returns the backup directory.
    """
    src = install_dir or get_install_dir()
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_dir = get_update_backup_root() / stamp
    backup_dir.mkdir(parents=True, exist_ok=True)

    copied: list[str] = []
    for name in (*extra_names, "app_marker.txt"):
        candidate = src / name
        if candidate.is_file():
            shutil.copy2(candidate, backup_dir / name)
            copied.append(name)

    runtime_dir = src / "runtime"
    if runtime_dir.is_dir():
        shutil.copytree(runtime_dir, backup_dir / "runtime", dirs_exist_ok=True)
        copied.append("runtime/")

    meta = {
        "created_at": datetime.now(timezone.utc).isoformat(),
        "install_dir": str(src),
        "copied": copied,
    }
    meta_text = json.dumps(meta, indent=2, ensure_ascii=False) + "\n"
    (backup_dir / "backup_meta.json").write_text(meta_text, encoding="utf-8")
    return backup_dir
def rollback_from_backup(
    backup_dir: Path,
    install_dir: Path | None = None,
) -> tuple[bool, str]:
    """Restore the backed-up program files into the installation directory.

    Every entry of ``backup_dir`` except ``backup_meta.json`` is copied back;
    an existing target directory is removed before its backup is restored.
    Returns ``(True, summary)`` or ``(False, reason)``; restoring stops at
    the first failure.
    """
    if not backup_dir.is_dir():
        return False, "Backup-Ordner nicht gefunden."
    dst = install_dir or get_install_dir()
    restored: list[str] = []
    entries = (entry for entry in backup_dir.iterdir() if entry.name != "backup_meta.json")
    for entry in entries:
        target = dst / entry.name
        try:
            if entry.is_dir():
                if target.exists():
                    shutil.rmtree(target)
                shutil.copytree(entry, target)
            elif entry.is_file():
                shutil.copy2(entry, target)
            restored.append(entry.name)
        except Exception as exc:
            return False, f"Rollback fehlgeschlagen bei {entry.name}: {exc}"
    return True, f"Wiederhergestellt: {', '.join(restored)}"
def list_available_backups() -> list[Path]:
    """Return the backup directories below the backup root, sorted in descending order."""
    root = get_update_backup_root()
    if not root.is_dir():
        return []
    return sorted((entry for entry in root.iterdir() if entry.is_dir()), reverse=True)
def extract_update_zip(zip_path: Path, target_dir: Path) -> tuple[bool, str]:
    """Extract an update ZIP into ``target_dir``.

    Before anything is extracted, every member name is checked. Absolute
    paths, drive-letter paths and any ``..`` component are rejected,
    whether the separator is ``/`` or ``\\``. The check is therefore
    independent of the path flavour of the machine running the updater.

    Returns ``(True, "ok")`` on success, otherwise ``(False, reason)``; errors
    raised while reading or extracting are reported as ``"<Type>: <message>"``.
    """
    if not zip_path.is_file():
        return False, "ZIP nicht gefunden."
    try:
        with zipfile.ZipFile(zip_path, "r") as zf:
            for member in zf.namelist():
                # Normalise separators so "..\\x" is treated like "../x".
                normalized = member.replace("\\", "/")
                has_drive = (
                    len(normalized) >= 2
                    and normalized[1] == ":"
                    and normalized[0].isalpha()
                )
                if (
                    normalized.startswith("/")
                    or has_drive
                    or ".." in normalized.split("/")
                ):
                    return False, f"Unsicherer ZIP-Eintrag: {member}"
            zf.extractall(target_dir)
        return True, "ok"
    except Exception as exc:
        return False, f"{type(exc).__name__}: {exc}"
def download_update_package(
    file_info: dict[str, Any],
    *,
    work_dir: Path | None = None,
    progress: Callable[[int, int | None], None] | None = None,
) -> tuple[Path | None, str]:
    """Fetch an update package into the work directory and verify its SHA256.

    Args:
        file_info: Manifest file entry; ``url`` and ``sha256`` are required,
            ``name`` is optional.
        work_dir: Target directory (default: ``<tempdir>/aza_updates``).
        progress: Optional callback forwarded to the HTTP download.

    Returns:
        ``(path, "ok")`` for a verified package, otherwise ``(None, reason)``.

    The SHA256 is required up front, so an unverifiable package is never
    downloaded. The file name comes from the manifest and is reduced to its
    last path component, so it cannot point outside the work directory.
    """
    url = str(file_info.get("url") or "").strip()
    if not url:
        return None, "Keine Download-URL."
    expected = str(file_info.get("sha256") or "").strip()
    if not expected:
        return None, "SHA256 fehlt — Download abgebrochen."

    raw_name = str(file_info.get("name") or Path(url).name or "aza_update.zip")
    # Keep only the final component; treat both separator styles alike.
    name = Path(raw_name.replace("\\", "/")).name
    if name in ("", ".", ".."):
        name = "aza_update.zip"

    base = work_dir or Path(tempfile.gettempdir()) / "aza_updates"
    base.mkdir(parents=True, exist_ok=True)
    dest = base / name

    local_src = _resolve_package_source(url)
    if local_src is not None:
        try:
            shutil.copy2(local_src, dest)
        except Exception as exc:
            return None, f"{type(exc).__name__}: {exc}"
    elif not url.lower().startswith("http"):
        return None, "Paket nicht gefunden."
    else:
        ok, msg = download_file(url, dest, progress=progress)
        if not ok:
            return None, msg

    if not verify_sha256(dest, expected):
        try:
            dest.unlink()
        except OSError:
            pass
        return None, "SHA256 stimmt nicht ueberein — Download verworfen."
    return dest, "ok"