# NOTE(review): removed non-Python scraper/header residue that preceded the
# module ("Files", path, line/size counts, "Raw Permalink", timestamp) — it
# would have been a SyntaxError. Original file: aza/AzA march 2026/aza_backup.py
# -*- coding: utf-8 -*-
"""
AZA MedWork automated backup & restore.

Creates versioned backups of all practice data as ZIP archives with a
SHA-256 manifest, and can verify, restore, list and clean them up.

Configuration via environment variables:
    AZA_BACKUP_DIR        target directory (default: ./backups)
    AZA_BACKUP_KEEP_DAYS  retention period in days (default: 90)
    AZA_BACKUP_PASSWORD   password for ZIP encryption (optional)

NOTE(review): ZIP encryption / AZA_BACKUP_PASSWORD is documented here but
not implemented anywhere in this file — archives are written unencrypted.
Confirm whether encryption happens elsewhere or the doc is aspirational.
"""
import os
import sys
import json
import shutil
import zipfile
import hashlib
import time
from datetime import datetime, timedelta
from pathlib import Path
# Directory containing this script; all data files are resolved relative to it.
_BASE_DIR = Path(__file__).resolve().parent
# Backup destination directory, overridable via AZA_BACKUP_DIR.
_BACKUP_DIR = Path(os.getenv("AZA_BACKUP_DIR", str(_BASE_DIR / "backups")))
# Retention period in days used by cleanup_old_backups().
_KEEP_DAYS = int(os.getenv("AZA_BACKUP_KEEP_DAYS", "90"))
# Patient/medical JSON data files (manifest category "medical").
_MEDICAL_JSON_FILES = [
    "kg_diktat_user_profile.json",
    "kg_diktat_todos.json",
    "kg_diktat_todo_inbox.json",
    "kg_diktat_notes.json",
    "kg_diktat_checklists.json",
    "kg_diktat_korrekturen.json",
    "kg_diktat_textbloecke.json",
    "kg_diktat_autotext.json",
    "kg_diktat_soap_presets.json",
    "kg_diktat_soap_order.json",
    "kg_diktat_soap_visibility.json",
    "kg_diktat_brief_presets.json",
    "kg_diktat_medwork_contacts.json",
    "aza_email_contacts.json",
    "aza_medwork_messages.json",
    "medwork_backup.json",
    "kg_diktat_cloud_sync.json",
]
# Application configuration files (manifest category "config").
_CONFIG_FILES = [
    "kg_diktat_config.txt",
    "kg_diktat_signature.txt",
    "kg_diktat_arztbrief_vorlage.txt",
    "kg_diktat_op_bericht_template.txt",
    "kg_diktat_todo_settings.json",
    "aza_email_config.json",
    "aza_docapp_config.json",
    "translate_config.json",
    "aza_whatsapp_config.json",
    "text_font_sizes.json",
    "paned_positions.json",
    "kg_diktat_button_heat.json",
]
# Window geometry / UI state files (manifest category "ui_state").
_UI_STATE_FILES = [
    "kg_diktat_window.txt",
    "kg_diktat_todo_window.txt",
    "kg_diktat_pruefen_window.txt",
    "kg_diktat_ordner_window.txt",
    "kg_diktat_text_window.txt",
    "kg_diktat_diktat_window.txt",
    "kg_diktat_notizen_geometry.txt",
    "kg_diktat_arbeitsplan_geometry.txt",
    "kg_diktat_brief_vorlage_geometry.txt",
    "kg_diktat_opacity.txt",
    "kg_diktat_token_usage.txt",
]
# Document archive directory (patient documents; always flagged sensitive).
_ABLAGE_DIR = "kg_diktat_ablage"
# Learning-mode export directory (manifest category "learning").
_LERNMODUS_DIR = "Lernmodus_Export"
# Workforce planner SQLite database file (manifest category "database").
_WP_DB_FILE = "workforce_planner.db"
# NOTE(review): not referenced anywhere in this file — presumably intended
# for sensitive-content scanning; confirm before removing.
_SENSITIVE_PATTERNS = [
    "password", "secret", "token", "api_key", "anon_key",
]
def _sha256_file(path: Path) -> str:
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
h.update(chunk)
return h.hexdigest()
def _is_sensitive_content(filepath: Path) -> bool:
"""Prüft ob eine Datei sensible Daten enthält (für Manifest-Markierung)."""
name = filepath.name.lower()
return any(p in name for p in ("user_profile", "email_config", "contact"))
def _stage_tree(src: Path, dst: Path, staging: Path, manifest: dict,
                category: str, sensitive: bool) -> int:
    """Copy directory *src* to *dst* inside the staging area and record
    every contained file in *manifest* with size and SHA-256.

    Returns the number of files recorded.
    """
    shutil.copytree(src, dst, dirs_exist_ok=True)
    count = 0
    for root, _dirs, files in os.walk(dst):
        for fname in files:
            fp = Path(root) / fname
            manifest["files"].append({
                "path": str(fp.relative_to(staging)).replace("\\", "/"),
                "category": category,
                "size": fp.stat().st_size,
                "sha256": _sha256_file(fp),
                "sensitive": sensitive,
            })
            count += 1
    return count


def create_backup(label: str = "") -> Path:
    """Create a full backup of all practice data as a ZIP archive.

    Copies every known data/config/UI-state file, the document archive,
    the learning exports and the workforce-planner database into a
    staging directory, writes a manifest with SHA-256 checksums, zips
    everything and removes the staging directory.

    Args:
        label: Optional suffix appended to the archive name.

    Returns:
        Path to the created backup archive.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    backup_name = f"aza_backup_{timestamp}"
    if label:
        backup_name += f"_{label}"
    _BACKUP_DIR.mkdir(parents=True, exist_ok=True)
    staging = _BACKUP_DIR / f".staging_{backup_name}"
    staging.mkdir(parents=True, exist_ok=True)
    manifest = {
        "backup_version": 1,
        "created_at": datetime.now().isoformat(),
        "label": label,
        "source_dir": str(_BASE_DIR),
        "hostname": os.environ.get("COMPUTERNAME", os.environ.get("HOSTNAME", "unknown")),
        "files": [],
    }
    copied = 0
    # Individually known files (medical JSON, config, UI state).
    for filename in _MEDICAL_JSON_FILES + _CONFIG_FILES + _UI_STATE_FILES:
        src = _BASE_DIR / filename
        if not src.exists():
            continue
        dst_dir = staging / "data"
        dst_dir.mkdir(exist_ok=True)
        shutil.copy2(src, dst_dir / filename)
        manifest["files"].append({
            # BUGFIX: was the literal f"data/(unknown)" for every entry,
            # which broke verify_backup/restore_backup path lookups.
            "path": f"data/{filename}",
            "category": "medical" if filename in _MEDICAL_JSON_FILES else
                        "config" if filename in _CONFIG_FILES else "ui_state",
            "size": src.stat().st_size,
            "sha256": _sha256_file(src),
            "sensitive": _is_sensitive_content(src),
        })
        copied += 1
    # Whole directories (patient document archive, learning exports).
    for dirname, category, sensitive in (
        (_ABLAGE_DIR, "medical_documents", True),
        (_LERNMODUS_DIR, "learning", False),
    ):
        src_dir = _BASE_DIR / dirname
        if src_dir.exists() and src_dir.is_dir():
            copied += _stage_tree(src_dir, staging / "data" / dirname,
                                  staging, manifest, category, sensitive)
    # Workforce-planner SQLite database.
    wp_db = _BASE_DIR / _WP_DB_FILE
    if wp_db.exists():
        db_dir = staging / "data"
        db_dir.mkdir(exist_ok=True)
        shutil.copy2(wp_db, db_dir / _WP_DB_FILE)
        manifest["files"].append({
            "path": f"data/{_WP_DB_FILE}",
            "category": "database",
            "size": wp_db.stat().st_size,
            "sha256": _sha256_file(wp_db),
            "sensitive": True,
        })
        copied += 1
    manifest["total_files"] = copied
    manifest_path = staging / "manifest.json"
    with open(manifest_path, "w", encoding="utf-8") as f:
        json.dump(manifest, f, ensure_ascii=False, indent=2)
    # Zip the staging tree, then remove it.
    zip_path = _BACKUP_DIR / f"{backup_name}.zip"
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED, compresslevel=9) as zf:
        for root, _dirs, files in os.walk(staging):
            for fname in files:
                fp = Path(root) / fname
                zf.write(fp, fp.relative_to(staging))
    shutil.rmtree(staging)
    size_mb = zip_path.stat().st_size / (1024 * 1024)
    print(f"BACKUP ERSTELLT: {zip_path}")
    print(f" Dateien: {copied}")
    print(f" Groesse: {size_mb:.2f} MB")
    print(f" Zeitstempel: {timestamp}")
    return zip_path
def verify_backup(zip_path: Path) -> bool:
    """Verify the integrity of a backup archive.

    Runs a CRC check over all members, then compares each manifest
    entry's SHA-256 against the stored file. The archive is opened only
    once (the original reopened it for the hash pass).

    Args:
        zip_path: Path to the backup archive.

    Returns:
        True if the archive is intact, False otherwise (details are
        printed to stderr).
    """
    zip_path = Path(zip_path)
    if not zip_path.exists():
        print(f"FEHLER: Backup nicht gefunden: {zip_path}", file=sys.stderr)
        return False
    with zipfile.ZipFile(zip_path, "r") as zf:
        # Cheap CRC pass first; catches raw corruption.
        bad = zf.testzip()
        if bad:
            print(f"FEHLER: Korrupte Datei im Archiv: {bad}", file=sys.stderr)
            return False
        try:
            manifest = json.loads(zf.read("manifest.json"))
        except (KeyError, json.JSONDecodeError) as e:
            print(f"FEHLER: Manifest nicht lesbar: {e}", file=sys.stderr)
            return False
        errors = 0
        for entry in manifest.get("files", []):
            fpath = entry["path"]
            expected_hash = entry.get("sha256", "")
            try:
                data = zf.read(fpath)
            except KeyError:
                print(f" DATEI FEHLT: {fpath}", file=sys.stderr)
                errors += 1
                continue
            if hashlib.sha256(data).hexdigest() != expected_hash:
                print(f" HASH MISMATCH: {fpath}", file=sys.stderr)
                errors += 1
    if errors == 0:
        print(f"BACKUP VERIFIZIERT: {zip_path} ({manifest.get('total_files', '?')} Dateien, OK)")
        return True
    print(f"BACKUP FEHLERHAFT: {errors} Fehler in {zip_path}", file=sys.stderr)
    return False
def restore_backup(zip_path: Path, target_dir: Path = None, dry_run: bool = False) -> bool:
    """Restore a backup archive.

    The archive is verified first; every file that would be overwritten
    is copied to a timestamped pre-restore safety directory beforehand.

    Args:
        zip_path: Path to the backup archive.
        target_dir: Destination directory (default: original directory
            recorded in the manifest).
        dry_run: If True, only report what would happen.

    Returns:
        True on success, False when the archive is missing or fails
        verification.
    """
    zip_path = Path(zip_path)
    if not zip_path.exists():
        print(f"FEHLER: Backup nicht gefunden: {zip_path}", file=sys.stderr)
        return False
    if not verify_backup(zip_path):
        print("FEHLER: Backup-Verifikation fehlgeschlagen. Restore abgebrochen.", file=sys.stderr)
        return False

    def _strip_data_prefix(p: str) -> str:
        # BUGFIX: str.replace("data/", "", 1) removed the first "data/"
        # occurring ANYWHERE in the path; only an anchored prefix must go.
        return p[len("data/"):] if p.startswith("data/") else p

    with zipfile.ZipFile(zip_path, "r") as zf:
        manifest = json.loads(zf.read("manifest.json"))
        if target_dir is None:
            target_dir = Path(manifest.get("source_dir", str(_BASE_DIR)))
        target_dir = Path(target_dir)
        if dry_run:
            print(f"DRY-RUN: Restore von {zip_path}")
            print(f" Ziel: {target_dir}")
            print(f" Dateien: {manifest.get('total_files', '?')}")
            for entry in manifest.get("files", []):
                dest = target_dir / _strip_data_prefix(entry["path"])
                exists = dest.exists()
                print(f" {'UEBERSCHREIBEN' if exists else 'NEU'}: {dest}")
            return True
        pre_restore_dir = _BACKUP_DIR / f".pre_restore_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        pre_restore_dir.mkdir(parents=True, exist_ok=True)
        restored = 0
        for entry in manifest.get("files", []):
            fpath = entry["path"]
            dest_rel = _strip_data_prefix(fpath)
            dest = target_dir / dest_rel
            if dest.exists():
                # Safety copy of the file we are about to overwrite.
                pre_dest = pre_restore_dir / dest_rel
                pre_dest.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(dest, pre_dest)
            dest.parent.mkdir(parents=True, exist_ok=True)
            with open(dest, "wb") as f:
                f.write(zf.read(fpath))
            restored += 1
    print(f"RESTORE ABGESCHLOSSEN: {restored} Dateien wiederhergestellt")
    print(f" Quelle: {zip_path}")
    print(f" Ziel: {target_dir}")
    print(f" Pre-Restore-Sicherung: {pre_restore_dir}")
    return True
def cleanup_old_backups():
    """Delete backup archives older than AZA_BACKUP_KEEP_DAYS days.

    A summary line is printed whether or not anything was removed.
    """
    if not _BACKUP_DIR.exists():
        return
    cutoff = time.time() - _KEEP_DAYS * 86400
    removed = 0
    for archive in _BACKUP_DIR.glob("aza_backup_*.zip"):
        if archive.stat().st_mtime >= cutoff:
            continue
        archive.unlink()
        print(f"ENTFERNT: {archive.name} (aelter als {_KEEP_DAYS} Tage)")
        removed += 1
    if removed:
        print(f"CLEANUP: {removed} alte Backups entfernt")
    else:
        print(f"CLEANUP: Keine alten Backups (Aufbewahrung: {_KEEP_DAYS} Tage)")
def list_backups():
    """Print an overview of all existing backups, newest first.

    Returns:
        The list of backup archive paths (newest first), or an empty
        list when the directory or any backups are missing.
    """
    if not _BACKUP_DIR.exists():
        print("Kein Backup-Verzeichnis vorhanden.")
        return []
    backups = sorted(
        _BACKUP_DIR.glob("aza_backup_*.zip"),
        key=lambda p: p.stat().st_mtime,
        reverse=True,
    )
    if not backups:
        print("Keine Backups vorhanden.")
        return []
    print(f"BACKUPS IN: {_BACKUP_DIR}")
    print(f"{'Nr':>3} {'Datum':20} {'Groesse':>10} {'Datei'}")
    print("-" * 70)
    for idx, archive in enumerate(backups, 1):
        info = archive.stat()
        stamp = datetime.fromtimestamp(info.st_mtime).strftime("%Y-%m-%d %H:%M:%S")
        megabytes = info.st_size / (1024 * 1024)
        print(f"{idx:3d} {stamp:20} {megabytes:>8.2f} MB {archive.name}")
    return backups
def delete_patient_data(patient_name: str, dry_run: bool = True) -> dict:
    """Delete all local data of one patient (right to be forgotten).

    WARNING: only local JSON files and the document archive are checked.
    Cloud data (Supabase) and existing backups must be handled separately.

    Args:
        patient_name: Patient name (case-insensitive substring match on
            file names and serialized JSON entries).
        dry_run: If True, only report what would be deleted.

    Returns:
        Summary dict with keys: patient, dry_run, found_in,
        deleted_from, errors, backup_warning.
    """
    result = {
        "patient": patient_name,
        "dry_run": dry_run,
        "found_in": [],
        "deleted_from": [],
        "errors": [],
        "backup_warning": False,
    }
    # Hoisted: the original lowercased patient_name on every comparison.
    needle = patient_name.lower()
    # Document archive: one loop covers all per-category subdirectories.
    # (The original handled "KG" with a verbatim copy of the same code.)
    for subdir in ("KG", "Briefe", "Rezepte", "Kostengutsprachen", "Diktat"):
        sub_path = _BASE_DIR / _ABLAGE_DIR / subdir
        if not sub_path.exists():
            continue
        for f in sub_path.iterdir():
            if needle not in f.name.lower():
                continue
            result["found_in"].append(str(f))
            if not dry_run:
                try:
                    f.unlink()
                    result["deleted_from"].append(str(f))
                except OSError as e:
                    result["errors"].append(f"Fehler beim Loeschen {f}: {e}")
    # JSON lists: drop any entry whose serialized form mentions the patient.
    text_files = ["kg_diktat_notes.json", "kg_diktat_todos.json", "kg_diktat_todo_inbox.json"]
    for fname in text_files:
        fpath = _BASE_DIR / fname
        if not fpath.exists():
            continue
        try:
            with open(fpath, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (json.JSONDecodeError, OSError):
            # Unreadable file: skip silently, same best-effort as before.
            continue
        if isinstance(data, list):
            original_len = len(data)
            filtered = [item for item in data
                        if needle not in json.dumps(item, ensure_ascii=False).lower()]
            if len(filtered) < original_len:
                result["found_in"].append(f"{fname} ({original_len - len(filtered)} Eintraege)")
                if not dry_run:
                    with open(fpath, "w", encoding="utf-8") as f:
                        json.dump(filtered, f, ensure_ascii=False, indent=2)
                    result["deleted_from"].append(fname)
    # any() avoids materializing the whole glob just to test emptiness.
    if _BACKUP_DIR.exists() and any(_BACKUP_DIR.glob("aza_backup_*.zip")):
        result["backup_warning"] = True
    if dry_run:
        print(f"\nDRY-RUN: Loeschung fuer Patient '{patient_name}'")
    else:
        print(f"\nLOESCHUNG DURCHGEFUEHRT: Patient '{patient_name}'")
    if result["found_in"]:
        print(" Gefunden in:")
        for loc in result["found_in"]:
            print(f" - {loc}")
    else:
        print(" Keine Daten gefunden.")
    if result["backup_warning"]:
        print("\n WARNUNG: Bestehende Backups enthalten moeglicherweise noch Daten")
        print(" dieses Patienten. Diese muessen separat behandelt werden.")
        print(" Cloud-Daten (Supabase) muessen manuell geloescht werden.")
    return result
if __name__ == "__main__":
    # CLI entry point: backup / list / cleanup / verify / restore / delete-patient.
    import argparse

    parser = argparse.ArgumentParser(description="AZA MedWork Backup & Restore")
    sub = parser.add_subparsers(dest="command")
    sub.add_parser("backup", help="Backup erstellen")
    sub.add_parser("list", help="Backups auflisten")
    sub.add_parser("cleanup", help="Alte Backups entfernen")
    p_verify = sub.add_parser("verify", help="Backup verifizieren")
    p_verify.add_argument("file", help="Pfad zum Backup-Archiv")
    p_restore = sub.add_parser("restore", help="Backup wiederherstellen")
    p_restore.add_argument("file", help="Pfad zum Backup-Archiv")
    p_restore.add_argument("--target", help="Zielverzeichnis", default=None)
    p_restore.add_argument("--dry-run", action="store_true", help="Nur pruefen")
    p_delete = sub.add_parser("delete-patient", help="Patientendaten loeschen")
    p_delete.add_argument("name", help="Patientenname")
    p_delete.add_argument("--execute", action="store_true", help="Tatsaechlich loeschen (ohne: dry-run)")
    args = parser.parse_args()
    if args.command == "backup":
        # Return value intentionally discarded (was assigned to a dead local).
        create_backup()
        cleanup_old_backups()
    elif args.command == "list":
        list_backups()
    elif args.command == "cleanup":
        cleanup_old_backups()
    elif args.command == "verify":
        ok = verify_backup(Path(args.file))
        sys.exit(0 if ok else 1)
    elif args.command == "restore":
        target = Path(args.target) if args.target else None
        ok = restore_backup(Path(args.file), target, dry_run=args.dry_run)
        sys.exit(0 if ok else 1)
    elif args.command == "delete-patient":
        # Safe by default: deletion requires an explicit --execute flag.
        delete_patient_data(args.name, dry_run=not args.execute)
    else:
        parser.print_help()