Files
aza/AzA march 2026/aza_consent.py

212 lines
6.4 KiB
Python
Raw Normal View History

2026-03-25 22:03:39 +01:00
# -*- coding: utf-8 -*-
"""
AZA MedWork Consent Management (Einwilligungs-Protokollierung).
Append-only Log mit SHA-256-Integritaetskette (jeder Eintrag
referenziert den Hash des vorherigen Eintrags).
"""
import os
import sys
import json
import hashlib
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from aza_config import get_writable_data_dir
_BUNDLE_DIR = Path(__file__).resolve().parent
_CONSENT_VERSION_FILE = _BUNDLE_DIR / "legal" / "ai_consent.md"
_CONSENT_FILE = Path(get_writable_data_dir()) / "aza_consent_log.json"
_CONSENT_TYPE_AI = "ai_processing"
def _get_consent_version() -> str:
"""Extrahiert die Version (Stand-Datum) aus ai_consent.md."""
try:
with open(_CONSENT_VERSION_FILE, "r", encoding="utf-8") as f:
for line in f:
if line.strip().lower().startswith("stand:"):
return line.strip().split(":", 1)[1].strip()
except (FileNotFoundError, OSError):
pass
return "unknown"
def _load_log() -> list:
if not _CONSENT_FILE.exists():
return []
try:
with open(_CONSENT_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, list):
return data
except (json.JSONDecodeError, OSError):
pass
return []
def _save_log(entries: list) -> None:
with open(_CONSENT_FILE, "w", encoding="utf-8") as f:
json.dump(entries, f, ensure_ascii=False, indent=2)
def _compute_hash(entry: dict, prev_hash: str) -> str:
"""SHA-256 ueber den Eintrag + vorherigen Hash (Integritaetskette)."""
canonical = json.dumps({
"user_id": entry.get("user_id", ""),
"consent_type": entry.get("consent_type", ""),
"consent_version": entry.get("consent_version", ""),
"timestamp": entry.get("timestamp", ""),
"source": entry.get("source", ""),
"action": entry.get("action", ""),
"prev_hash": prev_hash,
}, sort_keys=True, ensure_ascii=False)
return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
def _get_last_hash(entries: list) -> str:
if not entries:
return "0" * 64
return entries[-1].get("hash", "0" * 64)
def record_consent(user_id: str, source: str = "ui") -> dict:
"""Protokolliert eine neue Einwilligung (append-only)."""
entries = _load_log()
prev_hash = _get_last_hash(entries)
entry = {
"user_id": user_id,
"consent_type": _CONSENT_TYPE_AI,
"consent_version": _get_consent_version(),
"timestamp": datetime.now(timezone.utc).isoformat(),
"source": source,
"action": "grant",
}
entry["prev_hash"] = prev_hash
entry["hash"] = _compute_hash(entry, prev_hash)
entries.append(entry)
_save_log(entries)
return entry
def record_revoke(user_id: str, source: str = "ui") -> dict:
"""Protokolliert einen Widerruf (append-only)."""
entries = _load_log()
prev_hash = _get_last_hash(entries)
entry = {
"user_id": user_id,
"consent_type": _CONSENT_TYPE_AI,
"consent_version": _get_consent_version(),
"timestamp": datetime.now(timezone.utc).isoformat(),
"source": source,
"action": "revoke",
}
entry["prev_hash"] = prev_hash
entry["hash"] = _compute_hash(entry, prev_hash)
entries.append(entry)
_save_log(entries)
return entry
def has_valid_consent(user_id: str) -> bool:
"""Prueft ob eine gueltige, nicht widerrufene Einwilligung vorliegt
UND ob die Consent-Version aktuell ist."""
entries = _load_log()
current_version = _get_consent_version()
last_action = None
last_version = None
for e in entries:
if e.get("user_id") == user_id and e.get("consent_type") == _CONSENT_TYPE_AI:
last_action = e.get("action")
last_version = e.get("consent_version")
if last_action != "grant":
return False
if last_version != current_version:
return False
return True
def get_consent_status(user_id: str) -> dict:
"""Gibt den aktuellen Consent-Status zurueck."""
entries = _load_log()
current_version = _get_consent_version()
last_grant = None
last_revoke = None
for e in entries:
if e.get("user_id") == user_id and e.get("consent_type") == _CONSENT_TYPE_AI:
if e.get("action") == "grant":
last_grant = e
elif e.get("action") == "revoke":
last_revoke = e
return {
"user_id": user_id,
"has_consent": has_valid_consent(user_id),
"current_version": current_version,
"last_grant": last_grant,
"last_revoke": last_revoke,
"version_match": (last_grant or {}).get("consent_version") == current_version,
}
def get_user_history(user_id: str) -> list:
"""Gibt die komplette Consent-Historie eines Users zurueck."""
entries = _load_log()
return [e for e in entries if e.get("user_id") == user_id]
def export_consent_log(output_path: Optional[str] = None) -> str:
"""Exportiert das komplette Consent-Log als JSON.
Returns: Pfad zur exportierten Datei."""
entries = _load_log()
if output_path is None:
ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
output_path = str(Path(get_writable_data_dir()) / f"consent_export_{ts}.json")
export_data = {
"export_timestamp": datetime.now(timezone.utc).isoformat(),
"total_entries": len(entries),
"current_consent_version": _get_consent_version(),
"entries": entries,
}
with open(output_path, "w", encoding="utf-8") as f:
json.dump(export_data, f, ensure_ascii=False, indent=2)
return output_path
def verify_chain_integrity() -> tuple[bool, list]:
"""Prueft die Integritaet der Hash-Kette.
Returns: (ok, errors)"""
entries = _load_log()
errors = []
prev_hash = "0" * 64
for i, entry in enumerate(entries):
expected = _compute_hash(entry, prev_hash)
actual = entry.get("hash", "")
stored_prev = entry.get("prev_hash", "")
if stored_prev != prev_hash:
errors.append(f"Eintrag {i}: prev_hash stimmt nicht (erwartet {prev_hash[:16]}..., gefunden {stored_prev[:16]}...)")
if actual != expected:
errors.append(f"Eintrag {i}: Hash stimmt nicht (erwartet {expected[:16]}..., gefunden {actual[:16]}...)")
prev_hash = actual
return len(errors) == 0, errors