# Source: aza/AzA march 2026/_test_consent_audit.py (Python, 202 lines, 6.6 KiB, 2026-03-25 22:03:39 +01:00)
# -*- coding: utf-8 -*-
"""
STEP 10a consent audit-proof script.

Generates all evidence for the audit documentation.
"""
import copy
import json
import os
import shutil
import sys

# Make the script's own directory importable and the working directory
# before aza_consent is imported below.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, _SCRIPT_DIR)
os.chdir(_SCRIPT_DIR)

from aza_consent import (
    _CONSENT_FILE,
    _get_consent_version,
    _load_log,
    _save_log,
    export_consent_log,
    get_consent_status,
    get_user_history,
    has_valid_consent,
    record_consent,
    record_revoke,
    verify_chain_integrity,
)

# Start every run from an empty log.
# NOTE(review): this deletes whatever file currently sits at _CONSENT_FILE;
# confirm the script is never run next to a log that must be kept.
if _CONSENT_FILE.exists():
    _CONSENT_FILE.unlink()
# --- Report banner ---
_RULE = "=" * 70
print(_RULE)
print("CONSENT AUDIT-PROOF AZA / MedWork")
print(_RULE)

# --- 1) Storage location + schema ---
print("\n--- 1. SPEICHERORT + SCHEMA ---")
print(f"Datei: {_CONSENT_FILE}")
print("Format: JSON Array (append-only)")
print(f"Version: {_get_consent_version()}")
print("Quelle: legal/ai_consent.md -> Zeile mit 'Stand:'")

# The schema description is static text: one literal line per entry field.
print("\nSchema pro Eintrag:")
for _schema_line in (
    " user_id string Benutzer-ID",
    " consent_type string Immer 'ai_processing'",
    " consent_version string Stand-Datum aus ai_consent.md",
    " timestamp string UTC ISO-8601",
    " source string 'ui' / 'test' / 'admin'",
    " action string 'grant' oder 'revoke'",
    " prev_hash string SHA-256 des vorherigen Eintrags",
    " hash string SHA-256 dieses Eintrags",
):
    print(_schema_line)
# --- 2) Sample log entries ---
def _show_entry(entry):
    """Print one recorded log entry as indented JSON without ASCII escaping."""
    print(json.dumps(entry, indent=2, ensure_ascii=False))


print("\n--- 2. BEISPIEL-LOGEINTRAEGE (sanitized) ---")

print("\n2a) GRANT (Zustimmung):")
e1 = record_consent("user_1", source="ui")
_show_entry(e1)

print("\n2b) REVOKE (Widerruf):")
e2 = record_revoke("user_1", source="ui")
_show_entry(e2)

print("\n2c) RE-GRANT (erneute Zustimmung):")
e3 = record_consent("user_1", source="ui")
_show_entry(e3)

# A timestamp is reported as UTC when it carries a "+00:00" or "Z" suffix.
print("\nTimestamp-Pruefung:")
for position, recorded in enumerate((e1, e2, e3), start=1):
    stamp = recorded["timestamp"]
    utc_label = "JA" if stamp.endswith(("+00:00", "Z")) else "NEIN"
    print(f" Eintrag {position}: {stamp} -> UTC: {utc_label}")
# --- 3) Integrity proof ---
# 3a verifies the untouched log, 3b tampers with the first entry's timestamp
# and verifies again, then the original file is restored and re-verified.
print("\n--- 3. INTEGRITAETSBEWEIS ---")
print("\n3a) Intakte Log-Datei:")
ok, errors = verify_chain_integrity()
print(f" verify_chain_integrity() -> {'PASS' if ok else 'FAIL'}")
if errors:
    for err in errors:
        print(f" {err}")

print("\n3b) Manipulierte Log-Datei:")
backup_path = str(_CONSENT_FILE) + ".backup"
shutil.copy2(_CONSENT_FILE, backup_path)
try:
    entries = _load_log()
    manipulated = copy.deepcopy(entries)
    if manipulated:
        # Swap the last character of the first timestamp for "X" and write
        # the altered copy back to the log file.
        original_ts = manipulated[0]["timestamp"]
        manipulated[0]["timestamp"] = original_ts[:-1] + "X"
        _save_log(manipulated)
        ok2, errors2 = verify_chain_integrity()
        print(f" Manipulation: timestamp[0] '{original_ts}' -> '...X'")
        print(f" verify_chain_integrity() -> {'PASS' if ok2 else 'FAIL'}")
        if errors2:
            for err in errors2:
                print(f" {err}")
    else:
        # Nothing to tamper with. Keep ok2 True so the summary check
        # "not ok2" reports FAIL instead of passing without evidence
        # (previously this path ended in a NameError).
        ok2, errors2 = True, []
        print(" Log ist leer - Manipulationstest nicht durchfuehrbar")
finally:
    # Always put the original log back and drop the backup, even when
    # saving or verifying the tampered log raised.
    shutil.copy2(backup_path, _CONSENT_FILE)
    os.remove(backup_path)

ok3, _ = verify_chain_integrity()
print(f" Nach Restore: verify_chain_integrity() -> {'PASS' if ok3 else 'FAIL'}")
# --- 4) Enforcement proof ---
def _print_enforcement(allowed):
    """Print the consent lookup result for user_1 and the resulting AI gate state."""
    print(f" has_valid_consent('user_1') -> {allowed}")
    print(f" KI-Funktion: {'ERLAUBT' if allowed else 'BLOCKIERT'}")


print("\n--- 4. ENFORCEMENT-BEWEIS ---")

# Begin this section with no log file on disk.
if _CONSENT_FILE.exists():
    _CONSENT_FILE.unlink()

print("\n4a) Ohne Consent:")
result_no = has_valid_consent("user_1")
_print_enforcement(result_no)
print(" UI-Meldung: 'KI-Einwilligung fehlt oder wurde widerrufen.' (RuntimeError)")

print("\n4b) Nach Consent:")
record_consent("user_1", source="audit_test")
result_yes = has_valid_consent("user_1")
_print_enforcement(result_yes)

print("\n4c) Nach Widerruf:")
record_revoke("user_1", source="audit_test")
result_rev = has_valid_consent("user_1")
_print_enforcement(result_rev)

print("\n4d) Version-Change Simulation:")
record_consent("user_1", source="audit_test")
result_before = has_valid_consent("user_1")
print(f" Vor Version-Aenderung: has_valid_consent -> {result_before}")

# Rewrite the stored version of every grant by user_1 to an older value,
# save the log, and query the consent state again.
entries = _load_log()
for logged in entries:
    if logged.get("user_id") != "user_1":
        continue
    if logged.get("action") == "grant":
        logged["consent_version"] = "Januar 2025"
_save_log(entries)

result_after = has_valid_consent("user_1")
if result_after:
    verdict = "FEHLER: sollte False sein"
else:
    verdict = "Neu-Consent erforderlich (KORREKT)"
print(" consent_version im Log auf 'Januar 2025' geaendert")
print(f" Aktuelle Version: '{_get_consent_version()}'")
print(f" has_valid_consent -> {result_after}")
print(f" Ergebnis: {verdict}")
# --- 5) Data minimization check ---
print("\n--- 5. DATA MINIMIZATION CHECK ---")
entries = _load_log()

# Collect every field name that occurs in any log entry.
all_keys = set()
all_keys.update(*(logged.keys() for logged in entries))
print(f" Gespeicherte Felder: {sorted(all_keys)}")

sensitive_fields = ["transcript", "prompt", "response", "api_key", "secret",
                    "password", "audio", "kg_text", "patient_name", "diagnosis"]

# First the field names that match the deny-list (case-insensitive) ...
found_sensitive = [name for name in all_keys if name.lower() in sensitive_fields]
# ... then one finding per string value longer than 200 characters.
for logged in entries:
    oversized = [
        name for name, value in logged.items()
        if isinstance(value, str) and len(value) > 200
    ]
    found_sensitive.extend(f"{name} (Wert > 200 Zeichen)" for name in oversized)

# The check passes exactly when nothing was collected.
sensitive_absent = not found_sensitive

if not sensitive_absent:
    print(f" WARNUNG: Sensible Felder gefunden: {found_sensitive}")
    print(" Data Minimization: FAIL")
else:
    print(" Keine sensiblen Felder gefunden.")
    print(" Kein Transkript/Prompt/KG-Inhalt im Log.")
    print(" Kein API-Key/Secret/Passwort im Log.")
    print(" Data Minimization: PASS")
# --- Cleanup ---
# Remove the log file produced by this run.
if _CONSENT_FILE.exists():
    _CONSENT_FILE.unlink()

# --- Summary ---
summary_rule = "=" * 70
print("\n" + summary_rule)
print("ZUSAMMENFASSUNG")
print(summary_rule)

# Maps each check's display name to its outcome; the first two are
# unconditional, the rest come from the results gathered in sections 3-5.
checks = {
    "Speicherort + Schema": True,
    "Beispiel-Logeintraege": True,
    "Integritaet intakt": ok,
    "Integritaet manipuliert erkannt": not ok2,
    "Enforcement ohne Consent": not result_no,
    "Enforcement mit Consent": result_yes,
    "Enforcement nach Widerruf": not result_rev,
    "Version-Change erfordert Neu-Consent": not result_after,
    "Data Minimization": sensitive_absent,
}

for label, outcome in checks.items():
    print(f" [{'PASS' if outcome else 'FAIL'}] {label}")

all_pass = all(checks.values())
overall = "ALLE TESTS BESTANDEN" if all_pass else "TESTS FEHLGESCHLAGEN"
print(f"\nGESAMTERGEBNIS: {overall}")

# Exit with status 1 when any check failed.
if not all_pass:
    sys.exit(1)