# -*- coding: utf-8 -*- """STEP 11a – Audit-Log Integritaet Proof""" import os, sys, json, copy, shutil sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) os.chdir(os.path.dirname(os.path.abspath(__file__))) from pathlib import Path from aza_audit_log import ( log_event, verify_integrity, verify_all_rotations, get_log_stats, export_audit_log, _LOG_FILE, _GENESIS_HASH, _CHAIN_HEADER_PREFIX, ) if _LOG_FILE.exists(): _LOG_FILE.unlink() passed = 0 failed = 0 def check(name, condition): global passed, failed if condition: print(f" PASS: {name}") passed += 1 else: print(f" FAIL: {name}") failed += 1 print("=" * 70) print("AUDIT-LOG INTEGRITAET PROOF") print("=" * 70) # --- 1) Hash-Kette schreiben + verifizieren --- print("\n--- 1. HASH-KETTE SCHREIBEN + VERIFIZIEREN ---") log_event("APP_START", "user_1", detail="test") log_event("LOGIN_OK", "user_1") log_event("AI_CHAT", "user_1", detail="model=gpt-5.2") log_event("CONSENT_GRANT", "user_1") log_event("APP_STOP", "user_1") ok, errs = verify_integrity() check("5 Eintraege, Integritaet PASS", ok) with open(_LOG_FILE, "r", encoding="utf-8") as f: lines = [l.strip() for l in f if l.strip() and not l.startswith("#")] check(f"5 Zeilen (gefunden: {len(lines)})", len(lines) == 5) parts = [p.strip() for p in lines[0].split("|")] check("8 Felder pro Zeile", len(parts) == 8) check(f"prev_hash[0] = GENESIS", parts[6] == _GENESIS_HASH) check("entry_hash[0] hat 64 hex", len(parts[7]) == 64) parts2 = [p.strip() for p in lines[1].split("|")] check("prev_hash[1] == entry_hash[0]", parts2[6] == parts[7]) print("\n Beispiel-Zeile (sanitized):") print(f" {lines[0][:80]}...") # --- 2) Manipulation erkennen --- print("\n--- 2. MANIPULATION ERKENNEN ---") backup = str(_LOG_FILE) + ".bak" shutil.copy2(_LOG_FILE, backup) with open(_LOG_FILE, "r", encoding="utf-8") as f: content = f.read() manipulated = content.replace("LOGIN_OK", "LOGIN_XX", 1) with open(_LOG_FILE, "w", encoding="utf-8") as f: f.write(manipulated) ok2, errs2 = verify_integrity() check("Manipulation erkannt (FAIL)", not ok2) if errs2: print(f" Fehler: {errs2[0]}") shutil.copy2(backup, _LOG_FILE) os.remove(backup) ok3, _ = verify_integrity() check("Nach Restore: PASS", ok3) # --- 3) Rotation mit Ketten-Uebergabe --- print("\n--- 3. ROTATION MIT KETTEN-UEBERGABE ---") if _LOG_FILE.exists(): _LOG_FILE.unlink() rot1 = _LOG_FILE.parent / f"{_LOG_FILE.stem}.1{_LOG_FILE.suffix}" if rot1.exists(): rot1.unlink() log_event("PRE_ROTATE_1", "user_1") log_event("PRE_ROTATE_2", "user_1") last_hash_before = None with open(_LOG_FILE, "r", encoding="utf-8") as f: for line in f: line = line.strip() if line and not line.startswith("#"): parts = [p.strip() for p in line.split("|")] if len(parts) >= 8: last_hash_before = parts[7] shutil.copy2(_LOG_FILE, rot1) with open(_LOG_FILE, "w", encoding="utf-8") as f: f.write(f"{_CHAIN_HEADER_PREFIX}{last_hash_before}\n") log_event("POST_ROTATE_1", "user_1") log_event("POST_ROTATE_2", "user_1") ok_rot1, errs_rot1 = verify_integrity(rot1) check("Rotierte Datei (.1) intakt", ok_rot1) ok_main, errs_main = verify_integrity(_LOG_FILE) check("Aktuelle Datei intakt (Kette ab Chain-Header)", ok_main) with open(_LOG_FILE, "r", encoding="utf-8") as f: first_line = f.readline().strip() check("Chain-Header vorhanden", first_line.startswith(_CHAIN_HEADER_PREFIX)) header_hash = first_line[len(_CHAIN_HEADER_PREFIX):] check("Header-Hash == letzter Hash der rotierten Datei", header_hash == last_hash_before) with open(_LOG_FILE, "r", encoding="utf-8") as f: for line in f: line = line.strip() if line and not line.startswith("#"): first_entry_parts = [p.strip() for p in line.split("|")] break check("prev_hash[0 in neuem File] == Chain-Header-Hash", first_entry_parts[6] == last_hash_before) ok_all, res_all = verify_all_rotations() check("verify_all_rotations PASS", ok_all) if rot1.exists(): rot1.unlink() # --- 4) Stats + Export --- print("\n--- 4. STATS + EXPORT ---") stats = get_log_stats() check("Integritaet in Stats = PASS", stats["integrity"] == "PASS") path = export_audit_log() with open(path, "r", encoding="utf-8") as f: data = json.load(f) check("Export integrity = PASS", data["integrity"] == "PASS") check("Export hat entries", len(data["entries"]) > 0) check("Entries haben entry_hash", "entry_hash" in data["entries"][0]) os.remove(path) # --- 5) Data Minimization --- print("\n--- 5. DATA MINIMIZATION ---") with open(_LOG_FILE, "r", encoding="utf-8") as f: content = f.read() check("Kein Passwort im Log", "password" not in content.lower()) check("Kein API-Key im Log", "sk-" not in content) check("Kein Prompt/Transkript", "TRANSKRIPT:" not in content) # Cleanup if _LOG_FILE.exists(): _LOG_FILE.unlink() # --- Zusammenfassung --- print(f"\n{'='*70}") print("ZUSAMMENFASSUNG") print(f"{'='*70}") print(f"ERGEBNIS: {passed} PASS, {failed} FAIL") if failed == 0: print("ALLE TESTS BESTANDEN") else: sys.exit(1)