# -*- coding: utf-8 -*- """ AZA MedWork – Audit-Logging (DSG-konform, tamper-evident). Protokolliert sicherheitsrelevante Ereignisse in einer Append-only-Logdatei mit SHA-256-Hash-Kette. Keine Patientendaten, keine Prompts, keine KI-Antworten. Format pro Zeile (pipe-separiert, 8 Felder): TIMESTAMP | EVENT | USER | STATUS | SOURCE | DETAIL | PREV_HASH | ENTRY_HASH Konfiguration: AZA_AUDIT_LOG – Pfad zur Logdatei (Standard: aza_audit.log) AZA_AUDIT_ROTATE_MB – Max. Groesse in MB vor Rotation (Standard: 10) AZA_AUDIT_KEEP – Anzahl rotierter Dateien (Standard: 12) """ import os import sys import json import hashlib import shutil from datetime import datetime, timezone from pathlib import Path from typing import Optional from aza_config import get_writable_data_dir _LOG_FILE = Path(os.getenv("AZA_AUDIT_LOG", str(Path(get_writable_data_dir()) / "aza_audit.log"))) _ROTATE_MB = float(os.getenv("AZA_AUDIT_ROTATE_MB", "10")) _KEEP_COUNT = int(os.getenv("AZA_AUDIT_KEEP", "12")) _GENESIS_HASH = "0" * 64 _CHAIN_HEADER_PREFIX = "#CHAIN_FROM=" def _compute_entry_hash(payload: str, prev_hash: str) -> str: """SHA-256 ueber prev_hash + payload (ohne Hash-Felder).""" raw = prev_hash + payload return hashlib.sha256(raw.encode("utf-8")).hexdigest() def _get_last_hash(path: Path = None) -> str: """Liest den letzten entry_hash aus einer Logdatei.""" if path is None: path = _LOG_FILE if not path.exists(): return _GENESIS_HASH last_hash = _GENESIS_HASH try: with open(path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line or line.startswith("#"): if line.startswith(_CHAIN_HEADER_PREFIX): last_hash = line[len(_CHAIN_HEADER_PREFIX):].strip() continue parts = [p.strip() for p in line.split("|")] if len(parts) >= 8: last_hash = parts[7] except OSError: pass return last_hash def _rotate_if_needed(): """Rotiert die Logdatei wenn sie die Maximalgroesse ueberschreitet. Speichert den letzten Hash als Chain-Header in der neuen Datei.""" if not _LOG_FILE.exists(): return False try: size_mb = _LOG_FILE.stat().st_size / (1024 * 1024) except OSError: return False if size_mb < _ROTATE_MB: return False last_hash = _get_last_hash(_LOG_FILE) for i in range(_KEEP_COUNT - 1, 0, -1): src = _LOG_FILE.parent / f"{_LOG_FILE.stem}.{i}{_LOG_FILE.suffix}" dst = _LOG_FILE.parent / f"{_LOG_FILE.stem}.{i + 1}{_LOG_FILE.suffix}" if src.exists(): try: shutil.move(str(src), str(dst)) except OSError: pass rotated = _LOG_FILE.parent / f"{_LOG_FILE.stem}.1{_LOG_FILE.suffix}" try: shutil.move(str(_LOG_FILE), str(rotated)) except OSError: pass try: with open(_LOG_FILE, "w", encoding="utf-8") as f: f.write(f"{_CHAIN_HEADER_PREFIX}{last_hash}\n") except OSError: pass return True def log_event( event: str, user_id: str = "", success: bool = True, detail: str = "", source: str = "desktop", ): """Schreibt einen Audit-Eintrag mit Hash-Kette. Format: TS | EVENT | USER | STATUS | SOURCE | DETAIL | PREV_HASH | ENTRY_HASH """ _rotate_if_needed() ts = datetime.now(timezone.utc).isoformat(timespec="milliseconds") status = "OK" if success else "FAIL" safe_detail = detail.replace("|", "/").replace("\n", " ").replace("\r", "") if len(safe_detail) > 200: safe_detail = safe_detail[:197] + "..." prev_hash = _get_last_hash() payload = f"{ts} | {event} | {user_id} | {status} | {source} | {safe_detail}" entry_hash = _compute_entry_hash(payload, prev_hash) line = f"{payload} | {prev_hash} | {entry_hash}\n" try: _LOG_FILE.parent.mkdir(parents=True, exist_ok=True) with open(_LOG_FILE, "a", encoding="utf-8") as f: f.write(line) except OSError as e: print(f"AUDIT-LOG FEHLER: {e}", file=sys.stderr) def verify_integrity(path: Path = None) -> tuple[bool, list]: """Prueft die Integritaet der Hash-Kette einer Logdatei. Returns: (ok, errors) – errors enthaelt Zeilennummer + Beschreibung. """ if path is None: path = _LOG_FILE path = Path(path) if not path.exists(): return True, [] errors = [] prev_hash = _GENESIS_HASH line_num = 0 try: with open(path, "r", encoding="utf-8") as f: for raw_line in f: raw_line = raw_line.strip() if not raw_line: continue if raw_line.startswith("#"): if raw_line.startswith(_CHAIN_HEADER_PREFIX): prev_hash = raw_line[len(_CHAIN_HEADER_PREFIX):].strip() continue line_num += 1 parts = [p.strip() for p in raw_line.split("|")] if len(parts) < 8: errors.append(f"Zeile {line_num}: Ungueltig ({len(parts)} Felder, erwartet 8)") continue stored_prev = parts[6] stored_hash = parts[7] if stored_prev != prev_hash: errors.append( f"Zeile {line_num}: prev_hash stimmt nicht " f"(erwartet {prev_hash[:16]}..., gefunden {stored_prev[:16]}...)" ) payload = " | ".join(parts[:6]) expected_hash = _compute_entry_hash(payload, stored_prev) if stored_hash != expected_hash: errors.append( f"Zeile {line_num}: entry_hash stimmt nicht " f"(erwartet {expected_hash[:16]}..., gefunden {stored_hash[:16]}...)" ) prev_hash = stored_hash except OSError as e: errors.append(f"Dateifehler: {e}") return len(errors) == 0, errors def verify_all_rotations() -> tuple[bool, dict]: """Prueft die Integritaet ueber alle rotierten Logdateien hinweg. Returns: (all_ok, results_per_file) """ results = {} all_ok = True rotation_files = [] for i in range(_KEEP_COUNT, 0, -1): rp = _LOG_FILE.parent / f"{_LOG_FILE.stem}.{i}{_LOG_FILE.suffix}" if rp.exists(): rotation_files.append(rp) if _LOG_FILE.exists(): rotation_files.append(_LOG_FILE) for fp in rotation_files: ok, errs = verify_integrity(fp) results[str(fp.name)] = {"ok": ok, "errors": errs} if not ok: all_ok = False return all_ok, results def export_audit_log(output_path: Optional[str] = None) -> str: """Exportiert das Audit-Log als JSON.""" entries = _parse_log_file(_LOG_FILE) if output_path is None: ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") output_path = str(Path(get_writable_data_dir()) / f"audit_export_{ts}.json") ok, errs = verify_integrity(_LOG_FILE) export_data = { "export_timestamp": datetime.now(timezone.utc).isoformat(), "total_entries": len(entries), "source_file": str(_LOG_FILE), "integrity": "PASS" if ok else "FAIL", "integrity_errors": errs, "entries": entries, } with open(output_path, "w", encoding="utf-8") as f: json.dump(export_data, f, ensure_ascii=False, indent=2) return output_path def _parse_log_file(path: Path) -> list: """Parst eine Logdatei in eine Liste von Dicts.""" entries = [] if not path.exists(): return entries try: with open(path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue parts = [p.strip() for p in line.split("|")] if len(parts) >= 6: entry = { "timestamp": parts[0], "event": parts[1], "user_id": parts[2], "status": parts[3], "source": parts[4], "detail": parts[5], } if len(parts) >= 8: entry["prev_hash"] = parts[6] entry["entry_hash"] = parts[7] entries.append(entry) except OSError: pass return entries def get_log_stats() -> dict: """Gibt Statistiken ueber das Audit-Log zurueck.""" entries = _parse_log_file(_LOG_FILE) ok, _ = verify_integrity(_LOG_FILE) stats = { "log_file": str(_LOG_FILE), "exists": _LOG_FILE.exists(), "size_mb": 0.0, "total_lines": len(entries), "integrity": "PASS" if ok else "FAIL", "events": {}, "first_entry": None, "last_entry": None, } if _LOG_FILE.exists(): try: stats["size_mb"] = round(_LOG_FILE.stat().st_size / (1024 * 1024), 3) except OSError: pass for e in entries: ev = e.get("event", "") stats["events"][ev] = stats["events"].get(ev, 0) + 1 if entries: stats["first_entry"] = entries[0].get("timestamp") stats["last_entry"] = entries[-1].get("timestamp") return stats if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="AZA MedWork Audit-Log") sub = parser.add_subparsers(dest="command") p_verify = sub.add_parser("verify", help="Integritaet pruefen") p_verify.add_argument("--file", help="Logdatei (Standard: aktuelle)", default=None) p_verify.add_argument("--all", action="store_true", help="Alle Rotationsdateien pruefen") sub.add_parser("stats", help="Statistiken anzeigen") p_export = sub.add_parser("export", help="Log exportieren") p_export.add_argument("--output", help="Ausgabepfad", default=None) args = parser.parse_args() if args.command == "verify": if args.all: ok, results = verify_all_rotations() for fname, res in results.items(): status = "PASS" if res["ok"] else "FAIL" print(f" {fname}: {status}") for e in res["errors"]: print(f" {e}") print(f"\nGESAMT: {'PASS' if ok else 'FAIL'}") sys.exit(0 if ok else 1) else: fp = Path(args.file) if args.file else _LOG_FILE ok, errs = verify_integrity(fp) print(f"Datei: {fp}") print(f"Integritaet: {'PASS' if ok else 'FAIL'}") for e in errs: print(f" {e}") sys.exit(0 if ok else 1) elif args.command == "stats": s = get_log_stats() print(f"Datei: {s['log_file']}") print(f"Existiert: {s['exists']}") print(f"Groesse: {s['size_mb']} MB") print(f"Eintraege: {s['total_lines']}") print(f"Integritaet: {s['integrity']}") if s["first_entry"]: print(f"Erster: {s['first_entry']}") print(f"Letzter: {s['last_entry']}") if s["events"]: print("Events:") for ev, cnt in sorted(s["events"].items()): print(f" {ev}: {cnt}") elif args.command == "export": path = export_audit_log(args.output) print(f"Exportiert: {path}") else: parser.print_help()