Files
aza/AzA march 2026 - Kopie/_test_audit_integrity.py

174 lines
5.0 KiB
Python
Raw Permalink Normal View History

2026-03-25 13:42:48 +01:00
# -*- coding: utf-8 -*-
"""STEP 11a Audit-Log Integritaet Proof"""
import os, sys, json, copy, shutil
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
os.chdir(os.path.dirname(os.path.abspath(__file__)))
from pathlib import Path
from aza_audit_log import (
log_event, verify_integrity, verify_all_rotations,
get_log_stats, export_audit_log, _LOG_FILE, _GENESIS_HASH,
_CHAIN_HEADER_PREFIX,
)
if _LOG_FILE.exists():
_LOG_FILE.unlink()
passed = 0
failed = 0
def check(name, condition):
global passed, failed
if condition:
print(f" PASS: {name}")
passed += 1
else:
print(f" FAIL: {name}")
failed += 1
print("=" * 70)
print("AUDIT-LOG INTEGRITAET PROOF")
print("=" * 70)
# --- 1) Hash-Kette schreiben + verifizieren ---
print("\n--- 1. HASH-KETTE SCHREIBEN + VERIFIZIEREN ---")
log_event("APP_START", "user_1", detail="test")
log_event("LOGIN_OK", "user_1")
log_event("AI_CHAT", "user_1", detail="model=gpt-5.2")
log_event("CONSENT_GRANT", "user_1")
log_event("APP_STOP", "user_1")
ok, errs = verify_integrity()
check("5 Eintraege, Integritaet PASS", ok)
with open(_LOG_FILE, "r", encoding="utf-8") as f:
lines = [l.strip() for l in f if l.strip() and not l.startswith("#")]
check(f"5 Zeilen (gefunden: {len(lines)})", len(lines) == 5)
parts = [p.strip() for p in lines[0].split("|")]
check("8 Felder pro Zeile", len(parts) == 8)
check(f"prev_hash[0] = GENESIS", parts[6] == _GENESIS_HASH)
check("entry_hash[0] hat 64 hex", len(parts[7]) == 64)
parts2 = [p.strip() for p in lines[1].split("|")]
check("prev_hash[1] == entry_hash[0]", parts2[6] == parts[7])
print("\n Beispiel-Zeile (sanitized):")
print(f" {lines[0][:80]}...")
# --- 2) Manipulation erkennen ---
print("\n--- 2. MANIPULATION ERKENNEN ---")
backup = str(_LOG_FILE) + ".bak"
shutil.copy2(_LOG_FILE, backup)
with open(_LOG_FILE, "r", encoding="utf-8") as f:
content = f.read()
manipulated = content.replace("LOGIN_OK", "LOGIN_XX", 1)
with open(_LOG_FILE, "w", encoding="utf-8") as f:
f.write(manipulated)
ok2, errs2 = verify_integrity()
check("Manipulation erkannt (FAIL)", not ok2)
if errs2:
print(f" Fehler: {errs2[0]}")
shutil.copy2(backup, _LOG_FILE)
os.remove(backup)
ok3, _ = verify_integrity()
check("Nach Restore: PASS", ok3)
# --- 3) Rotation mit Ketten-Uebergabe ---
print("\n--- 3. ROTATION MIT KETTEN-UEBERGABE ---")
if _LOG_FILE.exists():
_LOG_FILE.unlink()
rot1 = _LOG_FILE.parent / f"{_LOG_FILE.stem}.1{_LOG_FILE.suffix}"
if rot1.exists():
rot1.unlink()
log_event("PRE_ROTATE_1", "user_1")
log_event("PRE_ROTATE_2", "user_1")
last_hash_before = None
with open(_LOG_FILE, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
parts = [p.strip() for p in line.split("|")]
if len(parts) >= 8:
last_hash_before = parts[7]
shutil.copy2(_LOG_FILE, rot1)
with open(_LOG_FILE, "w", encoding="utf-8") as f:
f.write(f"{_CHAIN_HEADER_PREFIX}{last_hash_before}\n")
log_event("POST_ROTATE_1", "user_1")
log_event("POST_ROTATE_2", "user_1")
ok_rot1, errs_rot1 = verify_integrity(rot1)
check("Rotierte Datei (.1) intakt", ok_rot1)
ok_main, errs_main = verify_integrity(_LOG_FILE)
check("Aktuelle Datei intakt (Kette ab Chain-Header)", ok_main)
with open(_LOG_FILE, "r", encoding="utf-8") as f:
first_line = f.readline().strip()
check("Chain-Header vorhanden", first_line.startswith(_CHAIN_HEADER_PREFIX))
header_hash = first_line[len(_CHAIN_HEADER_PREFIX):]
check("Header-Hash == letzter Hash der rotierten Datei", header_hash == last_hash_before)
with open(_LOG_FILE, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
first_entry_parts = [p.strip() for p in line.split("|")]
break
check("prev_hash[0 in neuem File] == Chain-Header-Hash",
first_entry_parts[6] == last_hash_before)
ok_all, res_all = verify_all_rotations()
check("verify_all_rotations PASS", ok_all)
if rot1.exists():
rot1.unlink()
# --- 4) Stats + Export ---
print("\n--- 4. STATS + EXPORT ---")
stats = get_log_stats()
check("Integritaet in Stats = PASS", stats["integrity"] == "PASS")
path = export_audit_log()
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
check("Export integrity = PASS", data["integrity"] == "PASS")
check("Export hat entries", len(data["entries"]) > 0)
check("Entries haben entry_hash", "entry_hash" in data["entries"][0])
os.remove(path)
# --- 5) Data Minimization ---
print("\n--- 5. DATA MINIMIZATION ---")
with open(_LOG_FILE, "r", encoding="utf-8") as f:
content = f.read()
check("Kein Passwort im Log", "password" not in content.lower())
check("Kein API-Key im Log", "sk-" not in content)
check("Kein Prompt/Transkript", "TRANSKRIPT:" not in content)
# Cleanup
if _LOG_FILE.exists():
_LOG_FILE.unlink()
# --- Zusammenfassung ---
print(f"\n{'='*70}")
print("ZUSAMMENFASSUNG")
print(f"{'='*70}")
print(f"ERGEBNIS: {passed} PASS, {failed} FAIL")
if failed == 0:
print("ALLE TESTS BESTANDEN")
else:
sys.exit(1)