# -*- coding: utf-8 -*- """STEP 12 – Monitoring & Health Checks Proof""" import os, sys, json sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) os.chdir(os.path.dirname(os.path.abspath(__file__))) from pathlib import Path from aza_audit_log import log_event, verify_integrity, _LOG_FILE from aza_monitoring import ( collect_metrics, get_alert_metrics, check_integrity, run_nightly, ) if _LOG_FILE.exists(): _LOG_FILE.unlink() passed = 0 failed = 0 def check(name, condition): global passed, failed if condition: print(f" PASS: {name}") passed += 1 else: print(f" FAIL: {name}") failed += 1 print("=" * 70) print("MONITORING & HEALTH CHECKS PROOF") print("=" * 70) # --- 1) Metriken mit Audit-Daten --- print("\n--- 1. METRIKEN MIT AUDIT-DATEN ---") log_event("APP_START", "user_1") log_event("LOGIN_OK", "user_1") log_event("LOGIN_FAIL", "attacker", success=False, detail="wrong pw") log_event("LOGIN_FAIL", "attacker", success=False, detail="wrong pw") log_event("AI_CHAT", "user_1", detail="model=gpt-5.2") log_event("AI_TRANSCRIBE", "user_1") log_event("AI_BLOCKED", "user_2", success=False, detail="kein Consent") log_event("2FA_FAIL", "user_1", success=False) m = collect_metrics() check("Audit-Log Eintraege > 0", m["audit_log"]["total_lines"] > 0) check("Audit-Log Integritaet PASS", m["audit_log"]["integrity"] == "PASS") check("Backup-Count vorhanden", "count" in m["backup"]) # --- 2) Alert-Metriken --- print("\n--- 2. ALERT-METRIKEN ---") alerts = get_alert_metrics() alert_names = [a["metric"] for a in alerts] check("login_fail erkannt", "login_fail_count" in alert_names) check("ai_calls_total vorhanden", "ai_calls_total" in alert_names) check("ai_blocked erkannt", "ai_blocked_count" in alert_names) check("2fa_fail erkannt", "2fa_fail_count" in alert_names) ai_total = next((a for a in alerts if a["metric"] == "ai_calls_total"), None) check("AI-Calls = 2 (chat+transcribe)", ai_total and ai_total["value"] == 2) login_fail = next((a for a in alerts if a["metric"] == "login_fail_count"), None) check("Login-Fail = 2", login_fail and login_fail["value"] == 2) # --- 3) Integritaet --- print("\n--- 3. INTEGRITAETS-CHECK ---") r = check_integrity() check("Audit-Log Integritaet PASS", r["audit_log"]["status"] == "PASS") check("Consent-Log Status vorhanden", "status" in r["consent_log"]) # --- 4) Manipulation erkennen --- print("\n--- 4. MANIPULATION ERKENNEN ---") import shutil backup = str(_LOG_FILE) + ".bak" shutil.copy2(_LOG_FILE, backup) with open(_LOG_FILE, "r", encoding="utf-8") as f: content = f.read() manipulated = content.replace("LOGIN_OK", "LOGIN_XX", 1) with open(_LOG_FILE, "w", encoding="utf-8") as f: f.write(manipulated) r2 = check_integrity() check("Manipulation erkannt (FAIL)", r2["audit_log"]["status"] == "FAIL") shutil.copy2(backup, _LOG_FILE) os.remove(backup) r3 = check_integrity() check("Nach Restore: PASS", r3["audit_log"]["status"] == "PASS") # --- 5) Nightly-Report --- print("\n--- 5. NIGHTLY-REPORT ---") report = run_nightly() check("Nightly hat 'overall'", "overall" in report) check("Nightly hat 'integrity'", "integrity" in report) check("Nightly hat 'alerts'", "alerts" in report) check("Nightly hat 'metrics'", "metrics" in report) nightly_file = Path("monitoring_nightly_test.json") with open(nightly_file, "w", encoding="utf-8") as f: json.dump(report, f, ensure_ascii=False, indent=2) check("Nightly JSON geschrieben", nightly_file.exists()) nightly_file.unlink() # --- 6) Data Minimization --- print("\n--- 6. DATA MINIMIZATION ---") with open(_LOG_FILE, "r", encoding="utf-8") as f: content = f.read() check("Kein Passwort im Audit-Log", "password" not in content.lower() or "wrong pw" in content.lower()) check("Kein API-Key im Audit-Log", "sk-" not in content) check("Kein Transkript im Audit-Log", "TRANSKRIPT:" not in content) report_json = json.dumps(report, ensure_ascii=False) check("Kein Passwort im Report", "password" not in report_json.lower() or "wrong pw" not in report_json) check("Kein API-Key im Report", "sk-" not in report_json) # --- 7) Health-Check Format --- print("\n--- 7. HEALTH-CHECK FORMAT ---") import importlib backend = importlib.import_module("backend_main") check("backend_main hat _APP_VERSION", hasattr(backend, "_APP_VERSION")) check("backend_main hat _START_TIME", hasattr(backend, "_START_TIME")) transcribe = importlib.import_module("transcribe_server") check("transcribe_server hat _APP_VERSION", hasattr(transcribe, "_APP_VERSION")) check("transcribe_server hat _START_TIME", hasattr(transcribe, "_START_TIME")) # Cleanup if _LOG_FILE.exists(): _LOG_FILE.unlink() # Nightly report cleanup for f in Path(".").glob("monitoring_nightly_*.json"): f.unlink() # --- Zusammenfassung --- print(f"\n{'='*70}") print("ZUSAMMENFASSUNG") print(f"{'='*70}") print(f"ERGEBNIS: {passed} PASS, {failed} FAIL") if failed == 0: print("ALLE TESTS BESTANDEN") else: sys.exit(1)