202 lines
6.6 KiB
Python
202 lines
6.6 KiB
Python
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
"""
|
|||
|
|
STEP 10a – Consent Audit-Proof Skript.
|
|||
|
|
Erzeugt alle Nachweise fuer die Audit-Dokumentation.
|
|||
|
|
"""
|
|||
|
|
import os, sys, json, copy, shutil
|
|||
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|||
|
|
os.chdir(os.path.dirname(os.path.abspath(__file__)))
|
|||
|
|
|
|||
|
|
from aza_consent import (
|
|||
|
|
has_valid_consent, record_consent, record_revoke,
|
|||
|
|
get_consent_status, get_user_history, export_consent_log,
|
|||
|
|
verify_chain_integrity, _CONSENT_FILE, _load_log, _save_log,
|
|||
|
|
_get_consent_version,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
if _CONSENT_FILE.exists():
|
|||
|
|
_CONSENT_FILE.unlink()
|
|||
|
|
|
|||
|
|
print("=" * 70)
|
|||
|
|
print("CONSENT AUDIT-PROOF – AZA / MedWork")
|
|||
|
|
print("=" * 70)
|
|||
|
|
|
|||
|
|
# --- 1) Speicherort + Schema ---
|
|||
|
|
print("\n--- 1. SPEICHERORT + SCHEMA ---")
|
|||
|
|
print(f"Datei: {_CONSENT_FILE}")
|
|||
|
|
print(f"Format: JSON Array (append-only)")
|
|||
|
|
print(f"Version: {_get_consent_version()}")
|
|||
|
|
print(f"Quelle: legal/ai_consent.md -> Zeile mit 'Stand:'")
|
|||
|
|
|
|||
|
|
print("\nSchema pro Eintrag:")
|
|||
|
|
print(" user_id string Benutzer-ID")
|
|||
|
|
print(" consent_type string Immer 'ai_processing'")
|
|||
|
|
print(" consent_version string Stand-Datum aus ai_consent.md")
|
|||
|
|
print(" timestamp string UTC ISO-8601")
|
|||
|
|
print(" source string 'ui' / 'test' / 'admin'")
|
|||
|
|
print(" action string 'grant' oder 'revoke'")
|
|||
|
|
print(" prev_hash string SHA-256 des vorherigen Eintrags")
|
|||
|
|
print(" hash string SHA-256 dieses Eintrags")
|
|||
|
|
|
|||
|
|
# --- 2) Beispiel-Logeintraege ---
|
|||
|
|
print("\n--- 2. BEISPIEL-LOGEINTRAEGE (sanitized) ---")
|
|||
|
|
|
|||
|
|
print("\n2a) GRANT (Zustimmung):")
|
|||
|
|
e1 = record_consent("user_1", source="ui")
|
|||
|
|
print(json.dumps(e1, indent=2, ensure_ascii=False))
|
|||
|
|
|
|||
|
|
print("\n2b) REVOKE (Widerruf):")
|
|||
|
|
e2 = record_revoke("user_1", source="ui")
|
|||
|
|
print(json.dumps(e2, indent=2, ensure_ascii=False))
|
|||
|
|
|
|||
|
|
print("\n2c) RE-GRANT (erneute Zustimmung):")
|
|||
|
|
e3 = record_consent("user_1", source="ui")
|
|||
|
|
print(json.dumps(e3, indent=2, ensure_ascii=False))
|
|||
|
|
|
|||
|
|
print("\nTimestamp-Pruefung:")
|
|||
|
|
for i, e in enumerate([e1, e2, e3], 1):
|
|||
|
|
ts = e["timestamp"]
|
|||
|
|
is_utc = ts.endswith("+00:00") or ts.endswith("Z")
|
|||
|
|
print(f" Eintrag {i}: {ts} -> UTC: {'JA' if is_utc else 'NEIN'}")
|
|||
|
|
|
|||
|
|
# --- 3) Integritaetsbeweis ---
|
|||
|
|
print("\n--- 3. INTEGRITAETSBEWEIS ---")
|
|||
|
|
|
|||
|
|
print("\n3a) Intakte Log-Datei:")
|
|||
|
|
ok, errors = verify_chain_integrity()
|
|||
|
|
print(f" verify_chain_integrity() -> {'PASS' if ok else 'FAIL'}")
|
|||
|
|
if errors:
|
|||
|
|
for e in errors:
|
|||
|
|
print(f" {e}")
|
|||
|
|
|
|||
|
|
print("\n3b) Manipulierte Log-Datei:")
|
|||
|
|
backup_path = str(_CONSENT_FILE) + ".backup"
|
|||
|
|
shutil.copy2(_CONSENT_FILE, backup_path)
|
|||
|
|
|
|||
|
|
entries = _load_log()
|
|||
|
|
manipulated = copy.deepcopy(entries)
|
|||
|
|
if manipulated:
|
|||
|
|
original_ts = manipulated[0]["timestamp"]
|
|||
|
|
manipulated[0]["timestamp"] = manipulated[0]["timestamp"][:-1] + "X"
|
|||
|
|
_save_log(manipulated)
|
|||
|
|
|
|||
|
|
ok2, errors2 = verify_chain_integrity()
|
|||
|
|
print(f" Manipulation: timestamp[0] '{original_ts}' -> '...X'")
|
|||
|
|
print(f" verify_chain_integrity() -> {'PASS' if ok2 else 'FAIL'}")
|
|||
|
|
if errors2:
|
|||
|
|
for e in errors2:
|
|||
|
|
print(f" {e}")
|
|||
|
|
|
|||
|
|
shutil.copy2(backup_path, _CONSENT_FILE)
|
|||
|
|
os.remove(backup_path)
|
|||
|
|
|
|||
|
|
ok3, _ = verify_chain_integrity()
|
|||
|
|
print(f" Nach Restore: verify_chain_integrity() -> {'PASS' if ok3 else 'FAIL'}")
|
|||
|
|
|
|||
|
|
# --- 4) Enforcement-Beweis ---
|
|||
|
|
print("\n--- 4. ENFORCEMENT-BEWEIS ---")
|
|||
|
|
|
|||
|
|
if _CONSENT_FILE.exists():
|
|||
|
|
_CONSENT_FILE.unlink()
|
|||
|
|
|
|||
|
|
print("\n4a) Ohne Consent:")
|
|||
|
|
result_no = has_valid_consent("user_1")
|
|||
|
|
print(f" has_valid_consent('user_1') -> {result_no}")
|
|||
|
|
print(f" KI-Funktion: {'BLOCKIERT' if not result_no else 'ERLAUBT'}")
|
|||
|
|
print(f" UI-Meldung: 'KI-Einwilligung fehlt oder wurde widerrufen.' (RuntimeError)")
|
|||
|
|
|
|||
|
|
print("\n4b) Nach Consent:")
|
|||
|
|
record_consent("user_1", source="audit_test")
|
|||
|
|
result_yes = has_valid_consent("user_1")
|
|||
|
|
print(f" has_valid_consent('user_1') -> {result_yes}")
|
|||
|
|
print(f" KI-Funktion: {'BLOCKIERT' if not result_yes else 'ERLAUBT'}")
|
|||
|
|
|
|||
|
|
print("\n4c) Nach Widerruf:")
|
|||
|
|
record_revoke("user_1", source="audit_test")
|
|||
|
|
result_rev = has_valid_consent("user_1")
|
|||
|
|
print(f" has_valid_consent('user_1') -> {result_rev}")
|
|||
|
|
print(f" KI-Funktion: {'BLOCKIERT' if not result_rev else 'ERLAUBT'}")
|
|||
|
|
|
|||
|
|
print("\n4d) Version-Change Simulation:")
|
|||
|
|
record_consent("user_1", source="audit_test")
|
|||
|
|
result_before = has_valid_consent("user_1")
|
|||
|
|
print(f" Vor Version-Aenderung: has_valid_consent -> {result_before}")
|
|||
|
|
|
|||
|
|
entries = _load_log()
|
|||
|
|
for e in entries:
|
|||
|
|
if e.get("user_id") == "user_1" and e.get("action") == "grant":
|
|||
|
|
e["consent_version"] = "Januar 2025"
|
|||
|
|
_save_log(entries)
|
|||
|
|
|
|||
|
|
result_after = has_valid_consent("user_1")
|
|||
|
|
print(f" consent_version im Log auf 'Januar 2025' geaendert")
|
|||
|
|
print(f" Aktuelle Version: '{_get_consent_version()}'")
|
|||
|
|
print(f" has_valid_consent -> {result_after}")
|
|||
|
|
print(f" Ergebnis: {'Neu-Consent erforderlich (KORREKT)' if not result_after else 'FEHLER: sollte False sein'}")
|
|||
|
|
|
|||
|
|
# --- 5) Data Minimization Check ---
|
|||
|
|
print("\n--- 5. DATA MINIMIZATION CHECK ---")
|
|||
|
|
|
|||
|
|
entries = _load_log()
|
|||
|
|
all_keys = set()
|
|||
|
|
for e in entries:
|
|||
|
|
all_keys.update(e.keys())
|
|||
|
|
|
|||
|
|
print(f" Gespeicherte Felder: {sorted(all_keys)}")
|
|||
|
|
|
|||
|
|
sensitive_absent = True
|
|||
|
|
sensitive_fields = ["transcript", "prompt", "response", "api_key", "secret",
|
|||
|
|
"password", "audio", "kg_text", "patient_name", "diagnosis"]
|
|||
|
|
found_sensitive = []
|
|||
|
|
for key in all_keys:
|
|||
|
|
if key.lower() in sensitive_fields:
|
|||
|
|
found_sensitive.append(key)
|
|||
|
|
sensitive_absent = False
|
|||
|
|
|
|||
|
|
for e in entries:
|
|||
|
|
for key, val in e.items():
|
|||
|
|
if isinstance(val, str) and len(val) > 200:
|
|||
|
|
found_sensitive.append(f"{key} (Wert > 200 Zeichen)")
|
|||
|
|
sensitive_absent = False
|
|||
|
|
|
|||
|
|
if sensitive_absent:
|
|||
|
|
print(" Keine sensiblen Felder gefunden.")
|
|||
|
|
print(" Kein Transkript/Prompt/KG-Inhalt im Log.")
|
|||
|
|
print(" Kein API-Key/Secret/Passwort im Log.")
|
|||
|
|
print(" Data Minimization: PASS")
|
|||
|
|
else:
|
|||
|
|
print(f" WARNUNG: Sensible Felder gefunden: {found_sensitive}")
|
|||
|
|
print(" Data Minimization: FAIL")
|
|||
|
|
|
|||
|
|
# --- Cleanup ---
|
|||
|
|
if _CONSENT_FILE.exists():
|
|||
|
|
_CONSENT_FILE.unlink()
|
|||
|
|
|
|||
|
|
# --- Zusammenfassung ---
|
|||
|
|
print("\n" + "=" * 70)
|
|||
|
|
print("ZUSAMMENFASSUNG")
|
|||
|
|
print("=" * 70)
|
|||
|
|
|
|||
|
|
checks = {
|
|||
|
|
"Speicherort + Schema": True,
|
|||
|
|
"Beispiel-Logeintraege": True,
|
|||
|
|
"Integritaet intakt": ok,
|
|||
|
|
"Integritaet manipuliert erkannt": not ok2,
|
|||
|
|
"Enforcement ohne Consent": not result_no,
|
|||
|
|
"Enforcement mit Consent": result_yes,
|
|||
|
|
"Enforcement nach Widerruf": not result_rev,
|
|||
|
|
"Version-Change erfordert Neu-Consent": not result_after,
|
|||
|
|
"Data Minimization": sensitive_absent,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
all_pass = True
|
|||
|
|
for name, passed in checks.items():
|
|||
|
|
status = "PASS" if passed else "FAIL"
|
|||
|
|
if not passed:
|
|||
|
|
all_pass = False
|
|||
|
|
print(f" [{status}] {name}")
|
|||
|
|
|
|||
|
|
print(f"\nGESAMTERGEBNIS: {'ALLE TESTS BESTANDEN' if all_pass else 'TESTS FEHLGESCHLAGEN'}")
|
|||
|
|
if not all_pass:
|
|||
|
|
sys.exit(1)
|