# -*- coding: utf-8 -*-
"""
AZA MedWork – consent management (consent logging).

Append-only log with a SHA-256 integrity chain: every entry stores the hash
of the previous entry, so a later modification of an earlier entry is
detectable by ``verify_chain_integrity``.
"""

import os
import sys
import json
import hashlib
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from aza_config import get_writable_data_dir

_BUNDLE_DIR = Path(__file__).resolve().parent
_CONSENT_VERSION_FILE = _BUNDLE_DIR / "legal" / "ai_consent.md"
_CONSENT_FILE = Path(get_writable_data_dir()) / "aza_consent_log.json"

_CONSENT_TYPE_AI = "ai_processing"

# prev_hash used for the very first entry of the chain.
_GENESIS_HASH = "0" * 64

# Entry fields that are covered by the integrity hash (besides prev_hash).
_HASHED_FIELDS = (
    "user_id",
    "consent_type",
    "consent_version",
    "timestamp",
    "source",
    "action",
)


def _get_consent_version() -> str:
    """Extract the version (the ``Stand:`` date) from ai_consent.md.

    Returns:
        The text after the first ``Stand:`` line prefix (case-insensitive),
        or ``"unknown"`` if the file cannot be opened/read or contains no
        such line.
    """
    try:
        with open(_CONSENT_VERSION_FILE, "r", encoding="utf-8") as f:
            for line in f:
                stripped = line.strip()
                if stripped.lower().startswith("stand:"):
                    return stripped.split(":", 1)[1].strip()
    except OSError:
        # Covers FileNotFoundError as well; a missing file means "unknown".
        pass
    return "unknown"


def _read_log_file() -> Optional[list]:
    """Read the consent log and distinguish "empty" from "unreadable".

    Returns:
        The list of entries, ``[]`` if the log file does not exist, or
        ``None`` if the file exists but cannot be read, is not valid JSON,
        or does not contain a JSON array.
    """
    try:
        with open(_CONSENT_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return []
    except (json.JSONDecodeError, UnicodeDecodeError, OSError):
        return None
    return data if isinstance(data, list) else None


def _load_log() -> list:
    """Return all log entries; a missing or unreadable log yields ``[]``."""
    entries = _read_log_file()
    return entries if entries is not None else []


def _save_log(entries: list) -> None:
    """Write the complete log to disk atomically.

    The data is written to a temporary sibling file, flushed to disk and then
    moved over the real log with ``os.replace``. A crash or a full disk in the
    middle of the write therefore cannot leave a truncated log behind.

    Raises:
        OSError: if the temporary file cannot be written or moved.
    """
    tmp_path = _CONSENT_FILE.with_name(_CONSENT_FILE.name + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(entries, f, ensure_ascii=False, indent=2)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, _CONSENT_FILE)
    finally:
        # After a successful replace the temp file is gone; after a failure
        # it is a leftover that must not accumulate.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass


def _quarantine_unreadable_log() -> None:
    """Move an unreadable log aside so that it is never overwritten.

    The file is renamed to ``<log name>.corrupt-<UTC timestamp>`` in the same
    directory. A file that has vanished in the meantime is ignored.

    Raises:
        OSError: if the file exists but cannot be renamed. In that case the
            caller must not continue writing, otherwise the history is lost.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ")
    backup = _CONSENT_FILE.with_name(f"{_CONSENT_FILE.name}.corrupt-{stamp}")
    try:
        os.replace(_CONSENT_FILE, backup)
    except FileNotFoundError:
        pass


def _compute_hash(entry: dict, prev_hash: str) -> str:
    """Return the SHA-256 hex digest over the entry plus the previous hash.

    Only the fields in ``_HASHED_FIELDS`` and ``prev_hash`` are hashed;
    missing fields count as the empty string. The JSON form uses sorted keys,
    so the digest does not depend on the key order of ``entry``.
    """
    payload = {field: entry.get(field, "") for field in _HASHED_FIELDS}
    payload["prev_hash"] = prev_hash
    canonical = json.dumps(payload, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def _get_last_hash(entries: list) -> str:
    """Return the hash of the last entry, or the genesis hash if there is none."""
    if not entries:
        return _GENESIS_HASH
    return entries[-1].get("hash", _GENESIS_HASH)


def _append_entry(user_id: str, source: str, action: str) -> dict:
    """Append one chained entry with the given action and persist the log.

    If the existing log file is unreadable it is moved aside first (see
    ``_quarantine_unreadable_log``) and a new chain is started, instead of
    silently overwriting the damaged file.

    Returns:
        The newly written entry, including ``prev_hash`` and ``hash``.

    Raises:
        OSError: if the log cannot be written or a damaged log cannot be
            moved aside.
    """
    entries = _read_log_file()
    if entries is None:
        _quarantine_unreadable_log()
        entries = []

    prev_hash = _get_last_hash(entries)
    entry = {
        "user_id": user_id,
        "consent_type": _CONSENT_TYPE_AI,
        "consent_version": _get_consent_version(),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "source": source,
        "action": action,
        "prev_hash": prev_hash,
    }
    entry["hash"] = _compute_hash(entry, prev_hash)

    entries.append(entry)
    _save_log(entries)
    return entry


def _is_consent_valid(entries: list, user_id: str, current_version: str) -> bool:
    """Decide consent validity from an already loaded list of entries.

    The most recent AI-processing entry of the user decides: it must be a
    ``grant`` and its consent version must equal ``current_version``.
    """
    for e in reversed(entries):
        if e.get("user_id") == user_id and e.get("consent_type") == _CONSENT_TYPE_AI:
            return (
                e.get("action") == "grant"
                and e.get("consent_version") == current_version
            )
    return False


def record_consent(user_id: str, source: str = "ui") -> dict:
    """Log a newly granted consent (append-only).

    Args:
        user_id: Identifier of the consenting user.
        source: Origin of the action; stored verbatim in the entry.

    Returns:
        The written log entry.
    """
    return _append_entry(user_id, source, "grant")


def record_revoke(user_id: str, source: str = "ui") -> dict:
    """Log a revocation of consent (append-only).

    Args:
        user_id: Identifier of the revoking user.
        source: Origin of the action; stored verbatim in the entry.

    Returns:
        The written log entry.
    """
    return _append_entry(user_id, source, "revoke")


def has_valid_consent(user_id: str) -> bool:
    """Check for a valid, non-revoked consent with the current consent version."""
    return _is_consent_valid(_load_log(), user_id, _get_consent_version())


def get_consent_status(user_id: str) -> dict:
    """Return the current consent status of a user.

    The log and the consent version are read once, so all fields of the
    result describe the same snapshot.

    Returns:
        A dict with ``user_id``, ``has_consent``, ``current_version``,
        ``last_grant``, ``last_revoke`` (entries or ``None``) and
        ``version_match`` (whether the last grant has the current version).
    """
    entries = _load_log()
    current_version = _get_consent_version()

    last_grant = None
    last_revoke = None
    for e in entries:
        if e.get("user_id") == user_id and e.get("consent_type") == _CONSENT_TYPE_AI:
            action = e.get("action")
            if action == "grant":
                last_grant = e
            elif action == "revoke":
                last_revoke = e

    return {
        "user_id": user_id,
        "has_consent": _is_consent_valid(entries, user_id, current_version),
        "current_version": current_version,
        "last_grant": last_grant,
        "last_revoke": last_revoke,
        "version_match": (last_grant or {}).get("consent_version") == current_version,
    }


def get_user_history(user_id: str) -> list:
    """Return the complete consent history (all entries) of a user."""
    return [e for e in _load_log() if e.get("user_id") == user_id]


def export_consent_log(output_path: Optional[str] = None) -> str:
    """Export the complete consent log as JSON.

    Args:
        output_path: Target file. If ``None``, a file named
            ``consent_export_<local timestamp>.json`` is created in the
            writable data directory.

    Returns:
        Path of the exported file.
    """
    entries = _load_log()

    if output_path is None:
        ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        output_path = str(Path(get_writable_data_dir()) / f"consent_export_{ts}.json")

    export_data = {
        "export_timestamp": datetime.now(timezone.utc).isoformat(),
        "total_entries": len(entries),
        "current_consent_version": _get_consent_version(),
        "entries": entries,
    }

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(export_data, f, ensure_ascii=False, indent=2)

    return output_path


def verify_chain_integrity() -> tuple[bool, list]:
    """Verify the integrity of the hash chain.

    An unreadable log and malformed entries are reported as errors instead of
    being treated as an empty (and therefore intact) log or raising.

    Returns:
        ``(ok, errors)`` where ``errors`` is a list of messages and ``ok`` is
        ``True`` only if that list is empty.
    """
    entries = _read_log_file()
    if entries is None:
        return False, ["Consent-Log ist nicht lesbar oder kein gueltiges JSON-Array"]

    errors = []
    prev_hash = _GENESIS_HASH

    for i, entry in enumerate(entries):
        if not isinstance(entry, dict):
            errors.append(f"Eintrag {i}: ungueltiges Format (kein Objekt)")
            # Same as an entry without a hash: the next link cannot match.
            prev_hash = ""
            continue

        expected = _compute_hash(entry, prev_hash)
        actual = entry.get("hash", "")
        stored_prev = entry.get("prev_hash", "")

        # str() only guards the slicing against non-string values from a
        # tampered file; the comparisons use the raw values.
        if stored_prev != prev_hash:
            errors.append(f"Eintrag {i}: prev_hash stimmt nicht (erwartet {str(prev_hash)[:16]}..., gefunden {str(stored_prev)[:16]}...)")

        if actual != expected:
            errors.append(f"Eintrag {i}: Hash stimmt nicht (erwartet {expected[:16]}..., gefunden {str(actual)[:16]}...)")

        prev_hash = actual

    return len(errors) == 0, errors