Files
aza/AzA march 2026 - Kopie/_test_audit_integrity.py
2026-03-30 07:59:11 +02:00

174 lines
5.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""STEP 11a Audit-Log Integritaet Proof"""
import os, sys, json, copy, shutil
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
os.chdir(os.path.dirname(os.path.abspath(__file__)))
from pathlib import Path
from aza_audit_log import (
log_event, verify_integrity, verify_all_rotations,
get_log_stats, export_audit_log, _LOG_FILE, _GENESIS_HASH,
_CHAIN_HEADER_PREFIX,
)
# Start from a clean slate: remove any audit log left over from an earlier run
# so the entry counts asserted below are exact.
# NOTE(review): this deletes whatever file aza_audit_log._LOG_FILE points at --
# presumably the application's real audit log when run from the app directory;
# confirm this script is only ever run against a disposable copy.
if _LOG_FILE.exists():
    _LOG_FILE.unlink()
# Running totals of passed / failed checks; updated by check() and reported
# in the summary at the end of the script.
passed = 0
failed = 0


def check(name: str, condition: object) -> bool:
    """Record and print the outcome of one named check.

    Prints a ``PASS``/``FAIL`` line for *name* and increments the matching
    module-level counter (``passed`` or ``failed``).

    Args:
        name: Human-readable label printed next to the verdict.
        condition: Any value; its truthiness decides pass or fail.

    Returns:
        ``True`` if the check passed, ``False`` otherwise.
    """
    global passed, failed
    outcome = bool(condition)
    if outcome:
        print(f" PASS: {name}")
        passed += 1
    else:
        print(f" FAIL: {name}")
        failed += 1
    return outcome
# Title banner: a 70-character rule above and below the heading.
print("=" * 70, "AUDIT-LOG INTEGRITAET PROOF", "=" * 70, sep="\n")
# --- 1) Write the hash chain and verify it ---
# Logs five events, then checks the on-disk format: pipe-separated lines whose
# 7th field is the previous hash and whose 8th field is the entry hash.
print("\n--- 1. HASH-KETTE SCHREIBEN + VERIFIZIEREN ---")
log_event("APP_START", "user_1", detail="test")
log_event("LOGIN_OK", "user_1")
log_event("AI_CHAT", "user_1", detail="model=gpt-5.2")
log_event("CONSENT_GRANT", "user_1")
log_event("APP_STOP", "user_1")

ok, errs = verify_integrity()
check("5 Eintraege, Integritaet PASS", ok)
if not ok and errs:
    # Surface the first reported problem so a failure is diagnosable.
    print(f" Fehler: {errs[0]}")

# Collect the entry lines, skipping blank lines and '#' header/comment lines.
with open(_LOG_FILE, "r", encoding="utf-8") as f:
    stripped_lines = [raw.strip() for raw in f]
entry_lines = [ln for ln in stripped_lines if ln and not ln.startswith("#")]
check(f"5 Zeilen (gefunden: {len(entry_lines)})", len(entry_lines) == 5)

# Split the first two entries into fields; a missing entry yields an empty
# list so the checks below report FAIL instead of raising IndexError.
first_fields = (
    [p.strip() for p in entry_lines[0].split("|")] if entry_lines else []
)
second_fields = (
    [p.strip() for p in entry_lines[1].split("|")] if len(entry_lines) > 1 else []
)

check("8 Felder pro Zeile", len(first_fields) == 8)
check(
    "prev_hash[0] = GENESIS",
    len(first_fields) > 6 and first_fields[6] == _GENESIS_HASH,
)
check(
    "entry_hash[0] hat 64 hex",
    len(first_fields) > 7 and len(first_fields[7]) == 64,
)
check(
    "prev_hash[1] == entry_hash[0]",
    len(first_fields) > 7
    and len(second_fields) > 6
    and second_fields[6] == first_fields[7],
)

if entry_lines:
    # Show only the first 80 characters of the first entry as a sample.
    print("\n Beispiel-Zeile (sanitized):")
    print(f" {entry_lines[0][:80]}...")
# --- 2) Detect manipulation ---
# Alters one event name inside the log file and expects verify_integrity()
# to report a failure; afterwards the untouched backup is restored and must
# verify cleanly again.
print("\n--- 2. MANIPULATION ERKENNEN ---")
backup = str(_LOG_FILE) + ".bak"
shutil.copy2(_LOG_FILE, backup)
try:
    with open(_LOG_FILE, "r", encoding="utf-8") as f:
        content = f.read()
    # Change only the first occurrence so exactly one entry is tampered with.
    manipulated = content.replace("LOGIN_OK", "LOGIN_XX", 1)
    with open(_LOG_FILE, "w", encoding="utf-8") as f:
        f.write(manipulated)

    ok2, errs2 = verify_integrity()
    check("Manipulation erkannt (FAIL)", not ok2)
    if errs2:
        print(f" Fehler: {errs2[0]}")
finally:
    # Always put the original log back and drop the backup, even if the
    # verification above raised, so a tampered log is never left on disk.
    shutil.copy2(backup, _LOG_FILE)
    os.remove(backup)

ok3, _ = verify_integrity()
check("Nach Restore: PASS", ok3)
# --- 3) Rotation with chain hand-over ---
# Simulates a rotation by hand: the current log is copied to "<stem>.1<suffix>",
# the current file is restarted with a chain header carrying the last entry
# hash, and new events are appended. Both files must verify, and the first new
# entry must link back to the last hash of the rotated file.
print("\n--- 3. ROTATION MIT KETTEN-UEBERGABE ---")
if _LOG_FILE.exists():
    _LOG_FILE.unlink()
rot1 = _LOG_FILE.parent / f"{_LOG_FILE.stem}.1{_LOG_FILE.suffix}"
if rot1.exists():
    rot1.unlink()

try:
    log_event("PRE_ROTATE_1", "user_1")
    log_event("PRE_ROTATE_2", "user_1")

    # Remember the entry hash (8th field) of the last complete entry written
    # before the rotation.
    last_hash_before = None
    with open(_LOG_FILE, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            fields = [p.strip() for p in line.split("|")]
            if len(fields) >= 8:
                last_hash_before = fields[7]

    # "Rotate": keep a copy as .1 and restart the current file with only the
    # chain header that carries the previous file's last hash.
    shutil.copy2(_LOG_FILE, rot1)
    with open(_LOG_FILE, "w", encoding="utf-8") as f:
        f.write(f"{_CHAIN_HEADER_PREFIX}{last_hash_before}\n")
    log_event("POST_ROTATE_1", "user_1")
    log_event("POST_ROTATE_2", "user_1")

    ok_rot1, errs_rot1 = verify_integrity(rot1)
    check("Rotierte Datei (.1) intakt", ok_rot1)
    if not ok_rot1 and errs_rot1:
        print(f" Fehler: {errs_rot1[0]}")

    ok_main, errs_main = verify_integrity(_LOG_FILE)
    check("Aktuelle Datei intakt (Kette ab Chain-Header)", ok_main)
    if not ok_main and errs_main:
        print(f" Fehler: {errs_main[0]}")

    # Read the current file once; the header and first entry both come from it.
    with open(_LOG_FILE, "r", encoding="utf-8") as f:
        current_lines = [raw.strip() for raw in f]

    first_line = current_lines[0] if current_lines else ""
    check("Chain-Header vorhanden", first_line.startswith(_CHAIN_HEADER_PREFIX))
    header_hash = first_line[len(_CHAIN_HEADER_PREFIX):]
    check(
        "Header-Hash == letzter Hash der rotierten Datei",
        header_hash == last_hash_before,
    )

    # First real entry of the new file (skipping blanks and '#' lines). An
    # absent entry yields an empty list so the check fails instead of raising.
    first_entry_line = next(
        (ln for ln in current_lines if ln and not ln.startswith("#")), None
    )
    first_entry_parts = (
        [p.strip() for p in first_entry_line.split("|")]
        if first_entry_line is not None
        else []
    )
    check(
        "prev_hash[0 in neuem File] == Chain-Header-Hash",
        len(first_entry_parts) > 6 and first_entry_parts[6] == last_hash_before,
    )

    ok_all, _ = verify_all_rotations()
    check("verify_all_rotations PASS", ok_all)
finally:
    # Remove the simulated rotation file even when a step above raised.
    if rot1.exists():
        rot1.unlink()
# --- 4) Stats + export ---
# Both the stats summary and the JSON export must report integrity "PASS",
# and exported entries must carry their entry hash.
print("\n--- 4. STATS + EXPORT ---")
stats = get_log_stats()
check("Integritaet in Stats = PASS", stats["integrity"] == "PASS")

export_path = export_audit_log()
try:
    with open(export_path, "r", encoding="utf-8") as f:
        data = json.load(f)
finally:
    # The export is only needed in memory; delete it even if parsing failed.
    os.remove(export_path)

# A missing or empty "entries" key becomes an empty list so the checks below
# report FAIL instead of raising KeyError / IndexError.
entries = data.get("entries") or []
check("Export integrity = PASS", data.get("integrity") == "PASS")
check("Export hat entries", len(entries) > 0)
check("Entries haben entry_hash", bool(entries) and "entry_hash" in entries[0])
# --- 5) Data minimization ---
# Scans the raw log text for markers of data that must not be logged.
# NOTE(review): section 3 restarted the log file, so at this point the file
# presumably holds only the post-rotation entries -- confirm that this is the
# intended coverage for these checks.
print("\n--- 5. DATA MINIMIZATION ---")
with open(_LOG_FILE, "r", encoding="utf-8") as f:
    content = f.read()
check("Kein Passwort im Log", "password" not in content.lower())
# "sk-" is treated as the tell-tale prefix of an API key.
check("Kein API-Key im Log", "sk-" not in content)
check("Kein Prompt/Transkript", "TRANSKRIPT:" not in content)

# Cleanup: do not leave the test log behind.
if _LOG_FILE.exists():
    _LOG_FILE.unlink()
# --- Summary ---
# Prints the pass/fail totals; the process exits with status 1 when any check
# failed so callers (CI, shell scripts) can detect the failure.
print(f"\n{'=' * 70}")
print("ZUSAMMENFASSUNG")
print(f"{'=' * 70}")
print(f"ERGEBNIS: {passed} PASS, {failed} FAIL")
if failed == 0:
    print("ALLE TESTS BESTANDEN")
else:
    sys.exit(1)