212 lines
6.4 KiB
Python
212 lines
6.4 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
AZA MedWork – Consent Management (Einwilligungs-Protokollierung).
|
||
|
||
Append-only Log mit SHA-256-Integritaetskette (jeder Eintrag
|
||
referenziert den Hash des vorherigen Eintrags).
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import json
|
||
import hashlib
|
||
import time
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
from aza_config import get_writable_data_dir
|
||
|
||
_BUNDLE_DIR = Path(__file__).resolve().parent
|
||
_CONSENT_VERSION_FILE = _BUNDLE_DIR / "legal" / "ai_consent.md"
|
||
_CONSENT_FILE = Path(get_writable_data_dir()) / "aza_consent_log.json"
|
||
|
||
_CONSENT_TYPE_AI = "ai_processing"
|
||
|
||
|
||
def _get_consent_version() -> str:
|
||
"""Extrahiert die Version (Stand-Datum) aus ai_consent.md."""
|
||
try:
|
||
with open(_CONSENT_VERSION_FILE, "r", encoding="utf-8") as f:
|
||
for line in f:
|
||
if line.strip().lower().startswith("stand:"):
|
||
return line.strip().split(":", 1)[1].strip()
|
||
except (FileNotFoundError, OSError):
|
||
pass
|
||
return "unknown"
|
||
|
||
|
||
def _load_log() -> list:
|
||
if not _CONSENT_FILE.exists():
|
||
return []
|
||
try:
|
||
with open(_CONSENT_FILE, "r", encoding="utf-8") as f:
|
||
data = json.load(f)
|
||
if isinstance(data, list):
|
||
return data
|
||
except (json.JSONDecodeError, OSError):
|
||
pass
|
||
return []
|
||
|
||
|
||
def _save_log(entries: list) -> None:
|
||
with open(_CONSENT_FILE, "w", encoding="utf-8") as f:
|
||
json.dump(entries, f, ensure_ascii=False, indent=2)
|
||
|
||
|
||
def _compute_hash(entry: dict, prev_hash: str) -> str:
|
||
"""SHA-256 ueber den Eintrag + vorherigen Hash (Integritaetskette)."""
|
||
canonical = json.dumps({
|
||
"user_id": entry.get("user_id", ""),
|
||
"consent_type": entry.get("consent_type", ""),
|
||
"consent_version": entry.get("consent_version", ""),
|
||
"timestamp": entry.get("timestamp", ""),
|
||
"source": entry.get("source", ""),
|
||
"action": entry.get("action", ""),
|
||
"prev_hash": prev_hash,
|
||
}, sort_keys=True, ensure_ascii=False)
|
||
return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
|
||
|
||
|
||
def _get_last_hash(entries: list) -> str:
|
||
if not entries:
|
||
return "0" * 64
|
||
return entries[-1].get("hash", "0" * 64)
|
||
|
||
|
||
def record_consent(user_id: str, source: str = "ui") -> dict:
|
||
"""Protokolliert eine neue Einwilligung (append-only)."""
|
||
entries = _load_log()
|
||
prev_hash = _get_last_hash(entries)
|
||
|
||
entry = {
|
||
"user_id": user_id,
|
||
"consent_type": _CONSENT_TYPE_AI,
|
||
"consent_version": _get_consent_version(),
|
||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||
"source": source,
|
||
"action": "grant",
|
||
}
|
||
entry["prev_hash"] = prev_hash
|
||
entry["hash"] = _compute_hash(entry, prev_hash)
|
||
|
||
entries.append(entry)
|
||
_save_log(entries)
|
||
return entry
|
||
|
||
|
||
def record_revoke(user_id: str, source: str = "ui") -> dict:
|
||
"""Protokolliert einen Widerruf (append-only)."""
|
||
entries = _load_log()
|
||
prev_hash = _get_last_hash(entries)
|
||
|
||
entry = {
|
||
"user_id": user_id,
|
||
"consent_type": _CONSENT_TYPE_AI,
|
||
"consent_version": _get_consent_version(),
|
||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||
"source": source,
|
||
"action": "revoke",
|
||
}
|
||
entry["prev_hash"] = prev_hash
|
||
entry["hash"] = _compute_hash(entry, prev_hash)
|
||
|
||
entries.append(entry)
|
||
_save_log(entries)
|
||
return entry
|
||
|
||
|
||
def has_valid_consent(user_id: str) -> bool:
|
||
"""Prueft ob eine gueltige, nicht widerrufene Einwilligung vorliegt
|
||
UND ob die Consent-Version aktuell ist."""
|
||
entries = _load_log()
|
||
current_version = _get_consent_version()
|
||
|
||
last_action = None
|
||
last_version = None
|
||
|
||
for e in entries:
|
||
if e.get("user_id") == user_id and e.get("consent_type") == _CONSENT_TYPE_AI:
|
||
last_action = e.get("action")
|
||
last_version = e.get("consent_version")
|
||
|
||
if last_action != "grant":
|
||
return False
|
||
if last_version != current_version:
|
||
return False
|
||
return True
|
||
|
||
|
||
def get_consent_status(user_id: str) -> dict:
|
||
"""Gibt den aktuellen Consent-Status zurueck."""
|
||
entries = _load_log()
|
||
current_version = _get_consent_version()
|
||
|
||
last_grant = None
|
||
last_revoke = None
|
||
|
||
for e in entries:
|
||
if e.get("user_id") == user_id and e.get("consent_type") == _CONSENT_TYPE_AI:
|
||
if e.get("action") == "grant":
|
||
last_grant = e
|
||
elif e.get("action") == "revoke":
|
||
last_revoke = e
|
||
|
||
return {
|
||
"user_id": user_id,
|
||
"has_consent": has_valid_consent(user_id),
|
||
"current_version": current_version,
|
||
"last_grant": last_grant,
|
||
"last_revoke": last_revoke,
|
||
"version_match": (last_grant or {}).get("consent_version") == current_version,
|
||
}
|
||
|
||
|
||
def get_user_history(user_id: str) -> list:
|
||
"""Gibt die komplette Consent-Historie eines Users zurueck."""
|
||
entries = _load_log()
|
||
return [e for e in entries if e.get("user_id") == user_id]
|
||
|
||
|
||
def export_consent_log(output_path: Optional[str] = None) -> str:
|
||
"""Exportiert das komplette Consent-Log als JSON.
|
||
Returns: Pfad zur exportierten Datei."""
|
||
entries = _load_log()
|
||
if output_path is None:
|
||
ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
|
||
output_path = str(Path(get_writable_data_dir()) / f"consent_export_{ts}.json")
|
||
|
||
export_data = {
|
||
"export_timestamp": datetime.now(timezone.utc).isoformat(),
|
||
"total_entries": len(entries),
|
||
"current_consent_version": _get_consent_version(),
|
||
"entries": entries,
|
||
}
|
||
|
||
with open(output_path, "w", encoding="utf-8") as f:
|
||
json.dump(export_data, f, ensure_ascii=False, indent=2)
|
||
|
||
return output_path
|
||
|
||
|
||
def verify_chain_integrity() -> tuple[bool, list]:
|
||
"""Prueft die Integritaet der Hash-Kette.
|
||
Returns: (ok, errors)"""
|
||
entries = _load_log()
|
||
errors = []
|
||
prev_hash = "0" * 64
|
||
|
||
for i, entry in enumerate(entries):
|
||
expected = _compute_hash(entry, prev_hash)
|
||
actual = entry.get("hash", "")
|
||
stored_prev = entry.get("prev_hash", "")
|
||
|
||
if stored_prev != prev_hash:
|
||
errors.append(f"Eintrag {i}: prev_hash stimmt nicht (erwartet {prev_hash[:16]}..., gefunden {stored_prev[:16]}...)")
|
||
if actual != expected:
|
||
errors.append(f"Eintrag {i}: Hash stimmt nicht (erwartet {expected[:16]}..., gefunden {actual[:16]}...)")
|
||
|
||
prev_hash = actual
|
||
|
||
return len(errors) == 0, errors
|