# Listing header from the source viewer: 202 lines, 6.6 KiB, Python
# -*- coding: utf-8 -*-
"""
STEP 10a – Consent audit-proof script.

Generates all evidence for the audit documentation.
"""
import copy
import json
import os
import shutil
import sys

# Resolve the script's own directory once; it is used both as an import root
# and as the working directory for the rest of the run.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, _SCRIPT_DIR)
os.chdir(_SCRIPT_DIR)

# Imported only after the path setup above so that the sibling module
# aza_consent is found regardless of where the script is started from.
from aza_consent import (
    _CONSENT_FILE,
    _get_consent_version,
    _load_log,
    _save_log,
    export_consent_log,
    get_consent_status,
    get_user_history,
    has_valid_consent,
    record_consent,
    record_revoke,
    verify_chain_integrity,
)

# Start from an empty log so the evidence below only contains entries
# created by this run.
# NOTE(review): this deletes whatever consent log currently exists at
# _CONSENT_FILE -- confirm the script is only run against audit/test data.
if _CONSENT_FILE.exists():
    _CONSENT_FILE.unlink()

_BANNER_RULE = "=" * 70
print(_BANNER_RULE, "CONSENT AUDIT-PROOF – AZA / MedWork", _BANNER_RULE, sep="\n")

# --- 1) Storage location + schema ---
print("\n--- 1. SPEICHERORT + SCHEMA ---")
print(f"Datei: {_CONSENT_FILE}")
print("Format: JSON Array (append-only)")
print(f"Version: {_get_consent_version()}")
print("Quelle: legal/ai_consent.md -> Zeile mit 'Stand:'")

# Human-readable description of the fields of one log entry; purely
# informational output, nothing here is checked against the real log.
_SCHEMA_LINES = (
    " user_id string Benutzer-ID",
    " consent_type string Immer 'ai_processing'",
    " consent_version string Stand-Datum aus ai_consent.md",
    " timestamp string UTC ISO-8601",
    " source string 'ui' / 'test' / 'admin'",
    " action string 'grant' oder 'revoke'",
    " prev_hash string SHA-256 des vorherigen Eintrags",
    " hash string SHA-256 dieses Eintrags",
)
print("\nSchema pro Eintrag:")
for _schema_line in _SCHEMA_LINES:
    print(_schema_line)

# --- 2) Sample log entries ---
print("\n--- 2. BEISPIEL-LOGEINTRAEGE (sanitized) ---")


def _show_entry(entry):
    """Print one log entry as indented JSON, keeping non-ASCII characters."""
    print(json.dumps(entry, indent=2, ensure_ascii=False))


# Grant -> revoke -> grant again for the same user, printing what each
# call returned.
print("\n2a) GRANT (Zustimmung):")
e1 = record_consent("user_1", source="ui")
_show_entry(e1)

print("\n2b) REVOKE (Widerruf):")
e2 = record_revoke("user_1", source="ui")
_show_entry(e2)

print("\n2c) RE-GRANT (erneute Zustimmung):")
e3 = record_consent("user_1", source="ui")
_show_entry(e3)

# A timestamp counts as UTC when it carries an explicit "+00:00" offset
# or the "Z" suffix.
print("\nTimestamp-Pruefung:")
for idx, logged in enumerate((e1, e2, e3), start=1):
    stamp = logged["timestamp"]
    verdict = "JA" if stamp.endswith(("+00:00", "Z")) else "NEIN"
    print(f" Eintrag {idx}: {stamp} -> UTC: {verdict}")

# --- 3) Integrity proof ---
print("\n--- 3. INTEGRITAETSBEWEIS ---")

# 3a) The untouched log must verify.
print("\n3a) Intakte Log-Datei:")
ok, errors = verify_chain_integrity()
print(f" verify_chain_integrity() -> {'PASS' if ok else 'FAIL'}")
if errors:
    for err in errors:
        print(f" {err}")

# 3b) Tamper with the first entry's timestamp and expect verification to
# fail. The real log is copied aside first and restored in a `finally`
# block, so an exception during the tamper test can neither leave the
# manipulated log in place nor leave the backup file behind.
print("\n3b) Manipulierte Log-Datei:")
backup_path = str(_CONSENT_FILE) + ".backup"
shutil.copy2(_CONSENT_FILE, backup_path)
try:
    entries = _load_log()
    manipulated = copy.deepcopy(entries)
    if manipulated:
        original_ts = manipulated[0]["timestamp"]
        # Replace the last character so the stored value no longer matches
        # what was originally written.
        manipulated[0]["timestamp"] = original_ts[:-1] + "X"
        _save_log(manipulated)
        manipulation_note = f" Manipulation: timestamp[0] '{original_ts}' -> '...X'"
    else:
        # Nothing to tamper with; report that instead of failing with a
        # NameError on the unset timestamp.
        manipulation_note = " Manipulation: uebersprungen (Log ist leer)"

    ok2, errors2 = verify_chain_integrity()
    print(manipulation_note)
    print(f" verify_chain_integrity() -> {'PASS' if ok2 else 'FAIL'}")
    if errors2:
        for err in errors2:
            print(f" {err}")
finally:
    shutil.copy2(backup_path, _CONSENT_FILE)
    os.remove(backup_path)

# After the restore the log must verify again.
ok3, _ = verify_chain_integrity()
print(f" Nach Restore: verify_chain_integrity() -> {'PASS' if ok3 else 'FAIL'}")

# --- 4) Enforcement proof ---
print("\n--- 4. ENFORCEMENT-BEWEIS ---")

# Drop the log so that step 4a runs without any recorded entry.
if _CONSENT_FILE.exists():
    _CONSENT_FILE.unlink()


def _report_enforcement(allowed):
    """Print the consent-check result and the resulting AI-feature state."""
    print(f" has_valid_consent('user_1') -> {allowed}")
    print(f" KI-Funktion: {'ERLAUBT' if allowed else 'BLOCKIERT'}")


print("\n4a) Ohne Consent:")
result_no = has_valid_consent("user_1")
_report_enforcement(result_no)
print(" UI-Meldung: 'KI-Einwilligung fehlt oder wurde widerrufen.' (RuntimeError)")

print("\n4b) Nach Consent:")
record_consent("user_1", source="audit_test")
result_yes = has_valid_consent("user_1")
_report_enforcement(result_yes)

print("\n4c) Nach Widerruf:")
record_revoke("user_1", source="audit_test")
result_rev = has_valid_consent("user_1")
_report_enforcement(result_rev)

print("\n4d) Version-Change Simulation:")
record_consent("user_1", source="audit_test")
result_before = has_valid_consent("user_1")
print(f" Vor Version-Aenderung: has_valid_consent -> {result_before}")

# Simulate a changed consent text by rewriting the version stored on every
# grant of user_1 to a fixed older label, then re-check consent.
entries = _load_log()
for record in entries:
    if record.get("user_id") != "user_1" or record.get("action") != "grant":
        continue
    record["consent_version"] = "Januar 2025"
_save_log(entries)

result_after = has_valid_consent("user_1")
print(" consent_version im Log auf 'Januar 2025' geaendert")
print(f" Aktuelle Version: '{_get_consent_version()}'")
print(f" has_valid_consent -> {result_after}")
if result_after:
    outcome = "FEHLER: sollte False sein"
else:
    outcome = "Neu-Consent erforderlich (KORREKT)"
print(f" Ergebnis: {outcome}")

# --- 5) Data minimization check ---
print("\n--- 5. DATA MINIMIZATION CHECK ---")

# Collect every field name that occurs in any log entry.
entries = _load_log()
all_keys = {field for entry in entries for field in entry.keys()}

print(f" Gespeicherte Felder: {sorted(all_keys)}")

# Field names that must never show up in the consent log.
sensitive_fields = ["transcript", "prompt", "response", "api_key", "secret",
                    "password", "audio", "kg_text", "patient_name", "diagnosis"]

# Finding 1: a stored field whose lower-cased name is on the deny list.
found_sensitive = [field for field in all_keys if field.lower() in sensitive_fields]

# Finding 2: any string value longer than 200 characters is flagged as
# well, whatever the field is called.
found_sensitive.extend(
    f"{field} (Wert > 200 Zeichen)"
    for entry in entries
    for field, value in entry.items()
    if isinstance(value, str) and len(value) > 200
)

# The check passes only when neither rule produced a finding.
sensitive_absent = not found_sensitive

if not sensitive_absent:
    print(f" WARNUNG: Sensible Felder gefunden: {found_sensitive}")
    print(" Data Minimization: FAIL")
else:
    print(" Keine sensiblen Felder gefunden.")
    print(" Kein Transkript/Prompt/KG-Inhalt im Log.")
    print(" Kein API-Key/Secret/Passwort im Log.")
    print(" Data Minimization: PASS")

# --- Cleanup ---
# Remove the log written by this run.
if _CONSENT_FILE.exists():
    _CONSENT_FILE.unlink()

# --- Summary ---
_summary_rule = "=" * 70
print("\n" + _summary_rule)
print("ZUSAMMENFASSUNG")
print(_summary_rule)

# One line per proof step; a truthy value means the step passed. The first
# two steps only print information and are therefore always marked passed.
checks = {
    "Speicherort + Schema": True,
    "Beispiel-Logeintraege": True,
    "Integritaet intakt": ok,
    "Integritaet manipuliert erkannt": not ok2,
    "Enforcement ohne Consent": not result_no,
    "Enforcement mit Consent": result_yes,
    "Enforcement nach Widerruf": not result_rev,
    "Version-Change erfordert Neu-Consent": not result_after,
    "Data Minimization": sensitive_absent,
}

for name, passed in checks.items():
    print(f" [{'PASS' if passed else 'FAIL'}] {name}")

all_pass = all(checks.values())
verdict = "ALLE TESTS BESTANDEN" if all_pass else "TESTS FEHLGESCHLAGEN"
print(f"\nGESAMTERGEBNIS: {verdict}")

# Exit with a non-zero status when any step failed.
if not all_pass:
    sys.exit(1)