# -*- coding: utf-8 -*-
"""
AZA Empfang - backend routes V5: admin, devices, federation, channels.

Server-side auth, users, sessions, messages, tasks, device management,
channels and practice federation. All data is practice-scoped; the backend
is the single source of truth.
"""

import base64
import hashlib
import hmac
import json
import logging
import os
import re
from collections import defaultdict
import secrets
import tempfile
import time
import unicodedata
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Tuple
import asyncio

from fastapi import (
    APIRouter,
    Cookie,
    File,
    Form,
    HTTPException,
    Query,
    Request,
    Response,
    UploadFile,
    WebSocket,
    WebSocketDisconnect,
)
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse
from pydantic import BaseModel, Field

router = APIRouter()
_log = logging.getLogger(__name__)

# All persistent state lives in JSON files below this directory.
_DATA_DIR = Path(__file__).resolve().parent / "data"
_EMPFANG_FILE = _DATA_DIR / "empfang_nachrichten.json"
_EXTERNAL_DM_MESSAGES_FILE = _DATA_DIR / "empfang_external_messages.json"
_EXTERNAL_DM_TEXT_MAX = 16000
_EXTERNAL_DM_READS_FILE = _DATA_DIR / "empfang_external_dm_reads.json"
_ATTACHMENT_MAX_BYTES = 5 * 1024 * 1024
_ATTACHMENT_MAX_PER_MESSAGE = 3
_ATTACHMENT_ALLOWED_MIME = frozenset({
    "image/png",
    "image/jpeg",
    "image/webp",
    "audio/webm",
    "audio/ogg",
    "audio/opus",
    "audio/mpeg",
    "audio/mp4",
    "audio/wav",
    "audio/x-wav",
    "audio/aac",
})
_ATTACHMENTS_DIR = _DATA_DIR / "empfang_attachments"
_ATTACHMENTS_META_FILE = _DATA_DIR / "empfang_attachments_meta.json"
_PRACTICES_FILE = _DATA_DIR / "empfang_practices.json"
_ACCOUNTS_FILE = _DATA_DIR / "empfang_accounts.json"
_SESSIONS_FILE = _DATA_DIR / "empfang_sessions.json"
_TASKS_FILE = _DATA_DIR / "empfang_tasks.json"
_DEVICES_FILE = _DATA_DIR / "empfang_devices.json"
_CHANNELS_FILE = _DATA_DIR / "empfang_channels.json"
_CONNECTIONS_FILE = _DATA_DIR / "empfang_connections.json"

_LEGACY_DEFAULT_PID = "default"

SESSION_MAX_AGE = 30 * 24 * 3600  # 30 days


def _reset_link_ttl_from_env() -> int:
    """Read EMPFANG_RESET_TTL_SECONDS, clamped to the range 1 hour .. 7 days.

    A malformed value falls back to the 24 h default instead of raising at
    import time (which would make the whole module unimportable).
    """
    raw = os.environ.get("EMPFANG_RESET_TTL_SECONDS", str(86400))
    try:
        ttl = int(raw)
    except (TypeError, ValueError):
        _log.warning("EMPFANG_RESET_TTL_SECONDS=%r is not an integer; using 86400", raw)
        ttl = 86400
    return min(max(3600, ttl), 7 * 86400)


RESET_LINK_TTL_SEC = _reset_link_ttl_from_env()


def _generate_practice_id() -> str:
    """Return a new practice id of the form ``prac_<12 hex chars>``."""
    return f"prac_{uuid.uuid4().hex[:12]}"


def _resolve_practice_id(request: Request) -> str:
    """Determine the practice_id from session, header or query (in that order).

    There is no silent fallback to a default practice; returns '' when
    nothing is supplied.
    """
    s = _session_from_request(request)
    if s:
        return s["practice_id"]
    pid = request.headers.get("X-Practice-Id", "").strip()
    if pid:
        return pid
    pid = request.query_params.get("practice_id", "").strip()
    if pid:
        return pid
    return ""


def _require_practice_id(request: Request) -> str:
    """Like _resolve_practice_id, but raise HTTP 400 when no practice_id is found."""
    pid = _resolve_practice_id(request)
    if not pid:
        raise HTTPException(
            status_code=400,
            detail="practice_id erforderlich (X-Practice-Id Header, Session oder Query)")
    return pid


def _ensure_data_dir():
    """Create the data directory (and parents) if it does not exist yet."""
    _DATA_DIR.mkdir(parents=True, exist_ok=True)


# =====================================================================
# JSON helpers (atomic write)
# =====================================================================

def _load_json(path: Path, default=None):
    """Load JSON from *path*; return *default* (or ``[]``) if missing or unreadable.

    Read/parse failures stay best-effort, but are logged: callers usually
    save the returned fallback again, so a silently ignored corrupt file
    would be overwritten without any trace.
    """
    fallback = default if default is not None else []
    if not path.is_file():
        return fallback
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception:
        _log.warning("Could not read JSON file %s; using default", path, exc_info=True)
        return fallback


def _save_json(path: Path, data):
    """Write *data* as JSON to a sibling ``.tmp`` file, then replace *path* with it."""
    _ensure_data_dir()
    tmp = str(path) + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    os.replace(tmp, str(path))


# =====================================================================
# Password hashing (PBKDF2 – no external dependency)
# =====================================================================

def _hash_password(password: str, salt: Optional[str] = None) -> tuple[str, str]:
    """Return ``(hex_digest, salt)`` using PBKDF2-HMAC-SHA256 with 100,000 rounds.

    A random 16-byte hex salt is generated when *salt* is not given.
    """
    salt = salt or secrets.token_hex(16)
    dk = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 100_000)
    return dk.hex(), salt


def _verify_password(password: str, stored_hash: str, salt: str) -> bool:
    """Recompute the PBKDF2 digest and compare it in constant time."""
    dk = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 100_000)
    return hmac.compare_digest(dk.hex(), stored_hash)


# =====================================================================
# Practices
# =====================================================================

def _load_practices() -> dict:
    """Load the practices store (practice_id -> practice record)."""
    return _load_json(_PRACTICES_FILE, {})


def _save_practices(data: dict):
    """Persist the practices store."""
    _save_json(_PRACTICES_FILE, data)


def _invite_code_key(raw: str) -> str:
    """Normalize an invite code so comparison ignores spaces, case and dash variants."""
    s = (raw or "").strip().upper().replace(" ", "")
    # Map typographic hyphen/dash/minus characters to the ASCII hyphen.
    for ch in ("\u2011", "\u2013", "\u2014", "\u2212"):
        s = s.replace(ch, "-")
    return s


def _generate_chat_invite_code() -> str:
    """Return a readable chat invite code in the format CHAT-XXXX-XXXX.

    The code identifies a practice during login, so it is drawn from the
    ``secrets`` CSPRNG rather than the predictable ``random`` module.
    """
    chars = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789"
    part1 = "".join(secrets.choice(chars) for _ in range(4))
    part2 = "".join(secrets.choice(chars) for _ in range(4))
    return f"CHAT-{part1}-{part2}"


def _ensure_practice(practice_id: str, name: str = "Meine Praxis",
                     admin_email: str = "") -> dict:
    """Ensure a practice with this id exists; create and persist it if missing."""
    practices = _load_practices()
    if practice_id not in practices:
        practices[practice_id] = {
            "practice_id": practice_id,
            "name": name,
            "specialty": "",
            "invite_code": _generate_chat_invite_code(),
            "admin_email": admin_email,
            "created": time.strftime("%Y-%m-%d %H:%M:%S"),
        }
        _save_practices(practices)
    return practices[practice_id]


def _migrate_legacy_to_practice(new_pid: str):
    """Move all data of the legacy 'default' practice to *new_pid*.

    Per the original note this is invoked once, on first provisioning.
    Accounts, devices, sessions, messages, the practice record and channels
    are re-tagged; finally old plain-name users are converted to accounts.
    """
    accounts = _load_accounts()
    migrated = False
    for a in accounts.values():
        if a.get("practice_id") == _LEGACY_DEFAULT_PID:
            a["practice_id"] = new_pid
            migrated = True
    if migrated:
        _save_accounts(accounts)

    devices = _load_devices()
    for d in devices.values():
        if d.get("practice_id") == _LEGACY_DEFAULT_PID:
            d["practice_id"] = new_pid
    _save_devices(devices)

    sessions = _load_sessions()
    for s in sessions.values():
        if s.get("practice_id") == _LEGACY_DEFAULT_PID:
            s["practice_id"] = new_pid
    _save_sessions(sessions)

    messages = _load_messages()
    for m in messages:
        # Messages without any practice_id count as legacy as well.
        if m.get("practice_id", _LEGACY_DEFAULT_PID) == _LEGACY_DEFAULT_PID:
            m["practice_id"] = new_pid
    _save_messages(messages)

    practices = _load_practices()
    if _LEGACY_DEFAULT_PID in practices:
        old = practices.pop(_LEGACY_DEFAULT_PID)
        if new_pid not in practices:
            old["practice_id"] = new_pid
            practices[new_pid] = old
        _save_practices(practices)

    try:
        channels = _load_channels()
        for c in channels:
            if c.get("practice_id") == _LEGACY_DEFAULT_PID:
                c["practice_id"] = new_pid
        _save_channels(channels)
    except Exception:
        # Best-effort: a channel migration failure must not abort provisioning.
        _log.warning("Legacy channel migration to %s failed", new_pid, exc_info=True)

    _migrate_old_users(new_pid)


def _migrate_old_users(practice_id: str):
    """Convert plain name strings from the old empfang_users.json into accounts.

    NOTE(review): the initial password of a migrated account is the
    lower-cased display name, which is trivially guessable - consider
    forcing a password change for these accounts.
    """
    old_file = _DATA_DIR / "empfang_users.json"
    if not old_file.is_file():
        return
    try:
        names = json.loads(old_file.read_text(encoding="utf-8"))
        if not isinstance(names, list):
            return
        accounts = _load_accounts()
        for name in names:
            name = name.strip()
            if not name:
                continue
            exists = any(
                a["display_name"] == name and a["practice_id"] == practice_id
                for a in accounts.values()
            )
            if not exists:
                uid = uuid.uuid4().hex[:12]
                pw_hash, pw_salt = _hash_password(name.lower())
                ln_mu = _preferred_unique_login_for_display(
                    accounts, practice_id, name, "",
                )
                accounts[uid] = {
                    "user_id": uid,
                    "practice_id": practice_id,
                    "display_name": name,
                    "login_name": ln_mu,
                    "role": "mpa",
                    "pw_hash": pw_hash,
                    "pw_salt": pw_salt,
                    "created": time.strftime("%Y-%m-%d %H:%M:%S"),
                    "status": "active",
                    "last_login": "",
                    "email": "",
                }
        _save_accounts(accounts)
    except Exception:
        # Best-effort migration: keep going without the old users, but leave a trace.
        _log.warning("Migration of %s failed", old_file, exc_info=True)


# =====================================================================
# Accounts (practice-scoped users with auth)
# =====================================================================

def _load_accounts() -> dict:
    """Load the accounts store (user_id -> account record)."""
    return _load_json(_ACCOUNTS_FILE, {})


def _save_accounts(data: dict):
    """Persist the accounts store."""
    _save_json(_ACCOUNTS_FILE, data)


def _practice_users(practice_id: str) -> list[dict]:
    """Return public user records of one practice (no hash or salt, only a flag)."""
    accounts = _load_accounts()
    return [
        {
            "user_id": a["user_id"],
            "display_name": a["display_name"],
            "role": a["role"],
            "login_name": (a.get("login_name") or "").strip(),
            "email": (a.get("email") or "").strip(),
            "has_password_hash": bool((a.get("pw_hash") or "").strip()),
            "specialty": (a.get("specialty") or "").strip(),
            "title": (a.get("title") or "").strip(),
        }
        for a in accounts.values()
        if a.get("practice_id") == practice_id
    ]


# =====================================================================
# Login / password-reset helpers (tenant, shared e-mail addresses)
# =====================================================================

def _is_likely_email(s: str) -> bool:
    """Rough check whether *s* is an e-mail address rather than a user name."""
    s = (s or "").strip()
    if "@" not in s or len(s) < 5:
        return False
    parts = s.split("@", 1)
    if len(parts) != 2 or not parts[0] or not parts[1]:
        return False
    return "." in parts[1]


def _norm_email(e: str) -> str:
    """Return the e-mail stripped and lower-cased ('' for None)."""
    return (e or "").strip().lower()


def _practice_id_from_client(request: Request, body: dict) -> str:
    """practice_id for login/forgot without a session: body, then header, then query."""
    pid = (body.get("practice_id") or "").strip()
    if pid:
        return pid
    pid = request.headers.get("X-Practice-Id", "").strip()
    if pid:
        return pid
    return request.query_params.get("practice_id", "").strip()


def _audit_invite_tail(raw: str) -> str:
    """Return the last four characters of a code for logs (never the full code)."""
    s = (raw or "").strip().upper().replace(" ", "")
    if len(s) < 5:
        return "----"
    return s[-4:]


def _lookup_practice_id_by_invite(invite_raw: str) -> str:
    """Return the practice_id owning this invite code (normalized compare) or ''."""
    if not (invite_raw or "").strip():
        return ""
    practices = _load_practices()
    want = _invite_code_key(invite_raw)
    for pida, pdata in practices.items():
        if _invite_code_key(pdata.get("invite_code")) == want:
            return pida
    return ""


def _practice_id_for_login(request: Request, body: dict) -> tuple[str, str]:
    """Resolve the practice_id for a login, deliberately ignoring the session cookie.

    (Otherwise a stale session would override the invitation.)
    Precedence: invite_code > body > header > query.

    Returns ``(practice_id, source)`` with source one of
    'invite' | 'body' | 'header' | 'query' | '' | 'invite_invalid'.
    """
    invite_raw = (body.get("invite_code") or "").strip()
    if invite_raw:
        got = _lookup_practice_id_by_invite(invite_raw)
        if got:
            return got, "invite"
        return "", "invite_invalid"
    pid = (body.get("practice_id") or "").strip()
    if pid:
        return pid, "body"
    pid = request.headers.get("X-Practice-Id", "").strip()
    if pid:
        return pid, "header"
    pid = request.query_params.get("practice_id", "").strip()
    if pid:
        return pid, "query"
    return "", ""


def _practice_label(practices: dict, pid: str) -> str:
    """Return the practice's display name, falling back to its id."""
    p = practices.get(pid) or {}
    return (p.get("name") or "").strip() or pid


def _invite_join_clone_account(
    accounts: dict,
    source: dict,
    target_pid: str,
    now: str,
) -> dict:
    """Create a second account in the target practice with the same credentials.

    Used for a cross-practice chat join: prevents a silent login into the
    old practice when a valid invite code fixes the target practice_id.
    This is not a practice migration - the source account stays untouched.
    The clone is added to *accounts* in place (not saved here) and returned.

    Raises:
        HTTPException 400: the source account has no display name.
        HTTPException 409: the target practice already has a user whose
            normalized name matches.
    """
    display = (source.get("display_name") or "").strip()
    if not display:
        raise HTTPException(
            status_code=400,
            detail="Ungueltiges Quellkonto.",
        )
    if any(
        _normalize_login_username(a.get("display_name") or "")
        == _normalize_login_username(display)
        and a.get("practice_id") == target_pid
        for a in accounts.values()
    ):
        raise HTTPException(
            status_code=409,
            detail=(
                "In dieser Praxis existiert bereits ein Benutzer mit diesem Namen. "
                "Bitte melden Sie sich mit diesem Konto an."
            ),
        )
    uid = uuid.uuid4().hex[:12]
    pref_ln = (source.get("login_name") or "").strip() or display
    ln_assign = _preferred_unique_login_for_display(accounts, target_pid, pref_ln, "")
    # Admin rights are never carried over into another practice.
    role_s = (source.get("role") or "mpa").strip()
    if role_s == "admin":
        role_s = "mpa"
    elif role_s not in ("arzt", "mpa", "empfang"):
        role_s = "mpa"
    accounts[uid] = {
        "user_id": uid,
        "practice_id": target_pid,
        "display_name": display,
        "email": (source.get("email") or "").strip(),
        "login_name": ln_assign,
        "role": role_s,
        "pw_hash": source["pw_hash"],
        "pw_salt": source["pw_salt"],
        "created": now,
        "status": "active",
        "last_login": now,
    }
    if source.get("must_change_password"):
        accounts[uid]["must_change_password"] = True
    return accounts[uid]


def _mask_email_for_response(addr: str) -> str:
    """Mask an address for API responses (``a***@domain``); '' if not an address."""
    s = (addr or "").strip()
    if "@" not in s:
        return ""
    local, _, domain = s.partition("@")
    dom = domain.strip()
    loc = local.strip()
    if not loc or not dom:
        return ""
    head = loc[0] if loc else "?"
    return f"{head}***@{dom}"


def _forgot_password_neutral_payload() -> dict:
    """Neutral forgot-password response that makes no false delivery promise."""
    return {
        "success": True,
        "step": "none",
        "message": (
            "Wenn ein passendes Konto existiert, wurde ein Link an die hinterlegte "
            "E-Mail-Adresse gesendet."
        ),
        "reset_token_created": False,
        "target_email_masked": "",
        "mail_delivered": False,
        "attempted_delivery": False,
    }


def _send_reset_for_account(acc: dict) -> dict:
    """Create a reset token and mail the link; report honest status, never the token.

    The recipient is the account e-mail, else the practice's admin e-mail,
    else the license e-mail looked up via ``stripe_routes``. Expired tokens
    are pruned while saving. If sending fails the fresh token is revoked.
    """
    pid_acc = (acc.get("practice_id") or "").strip()
    practices = _load_practices()
    email_to = (acc.get("email") or "").strip()
    if not email_to and pid_acc:
        email_to = ((practices.get(pid_acc) or {}).get("admin_email") or "").strip()
    if not email_to and pid_acc:
        try:
            from stripe_routes import lookup_license_email_for_practice

            le = (lookup_license_email_for_practice(pid_acc) or "").strip()
            if le:
                email_to = le
        except Exception:
            # Optional integration: missing module or failed lookup just means no address.
            _log.debug("License e-mail lookup failed for practice %s", pid_acc, exc_info=True)
    if not email_to:
        return {
            "success": False,
            "step": "no_email",
            "message": (
                "Fuer dieses Konto ist keine direkte E-Mail hinterlegt und keine "
                "gueltige Praxis-/Lizenz-E-Mail wurde gefunden. Bitte einen Administrator "
                "in der Hauptinstallation bitten oder Passwort dort setzen."
            ),
            "reset_token_created": False,
            "target_email_masked": "",
            "mail_delivered": False,
            "attempted_delivery": False,
        }

    masked = _mask_email_for_response(email_to)
    reset_token = secrets.token_urlsafe(32)
    resets = _load_json(_DATA_DIR / "empfang_resets.json", {})
    resets[reset_token] = {
        "user_id": acc["user_id"],
        "email": _norm_email(email_to),
        "display_name": (acc.get("display_name") or "").strip(),
        "practice_id": pid_acc,
        "created": time.time(),
        "delivery_email": email_to.strip(),
    }
    # Drop expired tokens on every write so the file cannot grow without bound.
    for k in list(resets.keys()):
        if time.time() - resets[k].get("created", 0) > RESET_LINK_TTL_SEC:
            del resets[k]
    _save_json(_DATA_DIR / "empfang_resets.json", resets)

    _web_base = os.environ.get(
        "EMPFANG_WEB_BASE", "https://empfang.aza-medwork.ch/empfang"
    ).rstrip("/")
    reset_link = f"{_web_base}/?reset_token={reset_token}"

    ok = _send_reset_email(email_to, acc.get("display_name", ""), reset_link)
    if not ok:
        # An undelivered link must not remain usable.
        if reset_token in resets:
            del resets[reset_token]
            _save_json(_DATA_DIR / "empfang_resets.json", resets)
        return {
            "success": False,
            "step": "mail_failed",
            "message": (
                "Der Reset-Link konnte nicht per E-Mail zugestellt werden. "
                "Bitte konfigurieren Sie SMTP oder RESEND_API_KEY auf dem Server "
                "oder wenden Sie sich an Ihren Administrator."
            ),
            "reset_token_created": False,
            "target_email_masked": masked,
            "mail_delivered": False,
            "attempted_delivery": True,
        }
    return {
        "success": True,
        "step": "sent",
        "message": (
            "Ein Link zum Zurücksetzen wurde an die hinterlegte E-Mail-Adresse gesendet."
        ),
        "reset_token_created": True,
        "target_email_masked": masked,
        "mail_delivered": True,
        "attempted_delivery": True,
    }


# =====================================================================
# Sessions (with device_id)
# =====================================================================

def _load_sessions() -> dict:
    """Load the session store (token -> session record)."""
    return _load_json(_SESSIONS_FILE, {})


def _save_sessions(data: dict):
    """Persist the session store."""
    _save_json(_SESSIONS_FILE, data)


def _create_session(user_id: str, practice_id: str, display_name: str,
                    role: str, device_id: Optional[str] = None,
                    user_agent: str = "", ip_addr: str = "") -> str:
    """Create and persist a session; register the device when a device_id is given.

    Returns the new session token.
    """
    token = secrets.token_urlsafe(32)
    sessions = _load_sessions()
    sessions[token] = {
        "user_id": user_id,
        "practice_id": practice_id,
        "display_name": display_name,
        "role": role,
        "device_id": device_id or "",
        "created": time.time(),
        "last_active": time.time(),
    }
    _save_sessions(sessions)
    if device_id:
        _register_or_update_device(
            device_id=device_id,
            user_id=user_id,
            practice_id=practice_id,
            user_agent=user_agent,
            ip_addr=ip_addr,
        )
    return token


def _get_session(token: str) -> Optional[dict]:
    """Return the session for *token*, or None if unknown or expired.

    Expiry is measured from creation time (SESSION_MAX_AGE); an expired
    session is deleted. Every successful lookup updates ``last_active``
    and rewrites the session file.
    """
    if not token:
        return None
    sessions = _load_sessions()
    s = sessions.get(token)
    if not s:
        return None
    if time.time() - s.get("created", 0) > SESSION_MAX_AGE:
        del sessions[token]
        _save_sessions(sessions)
        return None
    s["last_active"] = time.time()
    sessions[token] = s
    _save_sessions(sessions)
    return s


def _delete_session(token: str):
    """Remove the session for *token* if it exists."""
    sessions = _load_sessions()
    if token in sessions:
        del sessions[token]
        _save_sessions(sessions)


def _session_from_request(request: Request) -> Optional[dict]:
    """Find the session token in cookie, Bearer header or query, and resolve it."""
    token = request.cookies.get("aza_session") or ""
    if not token:
        auth = request.headers.get("Authorization", "")
        if auth.startswith("Bearer "):
            token = auth[7:]
    if not token:
        token = request.query_params.get("session_token", "")
    return _get_session(token)


def _require_session(request: Request) -> dict:
    """Return the request's session or raise HTTP 401."""
    s = _session_from_request(request)
    if not s:
        raise HTTPException(status_code=401, detail="Nicht angemeldet")
    return s


# =====================================================================
# Devices (device management)
# =====================================================================

def _load_devices() -> dict:
    """Load the device store (device_id -> device record)."""
    return _load_json(_DEVICES_FILE, {})


def _save_devices(data: dict):
    """Persist the device store."""
    _save_json(_DEVICES_FILE, data)


def _parse_device_info(user_agent: str) -> dict:
    """Simple User-Agent heuristic for platform, device type and display name.

    A missing (None/empty) user agent is treated as an unknown browser.
    """
    ua = (user_agent or "").lower()
    if "iphone" in ua:
        platform, device_type = "iOS", "mobile"
    elif "ipad" in ua:
        platform, device_type = "iOS", "tablet"
    elif "android" in ua:
        if "mobile" in ua:
            platform, device_type = "Android", "mobile"
        else:
            platform, device_type = "Android", "tablet"
    elif "macintosh" in ua or "mac os" in ua:
        platform, device_type = "macOS", "browser"
    elif "windows" in ua:
        platform, device_type = "Windows", "browser"
    elif "linux" in ua:
        platform, device_type = "Linux", "browser"
    else:
        platform, device_type = "Unbekannt", "browser"

    # Desktop shells override whatever device type the platform implied.
    if "electron" in ua or "cursor" in ua:
        device_type = "desktop"

    browser = "Browser"
    if "edg/" in ua:
        browser = "Edge"
    elif "chrome" in ua and "chromium" not in ua:
        browser = "Chrome"
    elif "firefox" in ua:
        browser = "Firefox"
    elif "safari" in ua and "chrome" not in ua:
        browser = "Safari"

    device_name = f"{browser} auf {platform}"
    if device_type == "desktop":
        device_name = f"Desktop-App auf {platform}"
    elif device_type in ("mobile", "tablet"):
        device_name = f"{platform} {device_type.capitalize()}"

    return {
        "device_name": device_name,
        "platform": platform,
        "device_type": device_type,
    }


def _make_device_id(user_id: str, user_agent: str) -> str:
    """Derive a 12-hex-char device id from user id and User-Agent (SHA-256 prefix)."""
    raw = f"{user_id}:{user_agent}"
    return hashlib.sha256(raw.encode()).hexdigest()[:12]


def _record_practice_new_device_notice(
    practice_id: str, user_id: str, device_id: str, ip_addr: str
) -> None:
    """Append a new-device alert to the practice record, keeping the latest 80."""
    pid = (practice_id or "").strip()
    if not pid or not device_id:
        return
    practices = _load_practices()
    p = practices.get(pid)
    if not isinstance(p, dict):
        return
    alerts = list(p.get("pdevice_alerts") or [])
    alerts.append(
        {
            "ts": time.time(),
            "user_id": (user_id or "").strip(),
            # Only a suffix of the device id is stored in the alert.
            "device_suffix": str(device_id)[-8:],
            "ip": (ip_addr or "").strip(),
        }
    )
    p["pdevice_alerts"] = alerts[-80:]
    practices[pid] = p
    _save_practices(practices)


def _register_or_update_device(device_id: str, user_id: str, practice_id: str,
                               user_agent: str, ip_addr: str):
    """Refresh a known device, or register a new one and record a practice alert."""
    devices = _load_devices()
    now = time.strftime("%Y-%m-%d %H:%M:%S")
    info = _parse_device_info(user_agent)

    if device_id in devices:
        dev = devices[device_id]
        dev["last_active"] = now
        dev["ip_last"] = ip_addr
        dev["user_agent"] = user_agent
        dev["device_name"] = info["device_name"]
        dev["platform"] = info["platform"]
        dev["device_type"] = info["device_type"]
    else:
        devices[device_id] = {
            "device_id": device_id,
            "user_id": user_id,
            "practice_id": practice_id,
            "device_name": info["device_name"],
            "platform": info["platform"],
            "device_type": info["device_type"],
            "user_agent": user_agent,
            "first_seen": now,
            "last_active": now,
            "trust_status": "trusted",
            "ip_last": ip_addr,
        }
        try:
            _record_practice_new_device_notice(practice_id, user_id, device_id, ip_addr)
        except Exception:
            # The alert is advisory; registration must succeed regardless.
            _log.warning("Could not record new-device notice", exc_info=True)
    _save_devices(devices)


def _extract_client_ip(request: Request) -> str:
    """Return the client IP: first X-Forwarded-For entry, else the socket peer.

    NOTE(review): X-Forwarded-For is client-controlled unless a trusted
    reverse proxy overwrites it - confirm the deployment before relying on
    this value for security decisions.
    """
    forwarded = request.headers.get("X-Forwarded-For", "")
    if forwarded:
        return forwarded.split(",")[0].strip()
    if request.client:
        return request.client.host
    return ""


# =====================================================================
# Messages (practice-scoped)
# =====================================================================
EMPFANG_MESSAGE_RETENTION_DAYS = 14


def _utc_now_iso_z() -> str:
    """UTC timestamp for chat messages, ISO-8601 with trailing Z (unambiguous UTC)."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def _parse_msg_instant_utc_ts(raw: str) -> float:
    """Parse a stored chat timestamp into POSIX seconds (UTC).

    Naive legacy strings ('YYYY-MM-DD HH:MM:SS' / '...T...' without a time
    zone) are interpreted as UTC. Returns 0.0 for empty or unparseable input.
    """
    s = (raw or "").strip()
    if not s:
        return 0.0
    # Accept the legacy "date time" form by turning the first space into 'T'.
    s_norm = s.replace(" ", "T", 1)
    try:
        if s_norm.endswith("Z"):
            dt = datetime.fromisoformat(s_norm.replace("Z", "+00:00"))
            return dt.timestamp()
        dt = datetime.fromisoformat(s_norm)
        if dt.tzinfo is not None:
            return dt.timestamp()
        return dt.replace(tzinfo=timezone.utc).timestamp()
    except (ValueError, OverflowError, OSError):
        # ValueError: malformed string; OverflowError/OSError: out-of-range instant.
        return 0.0


def _msg_timestamp_for_retention(m: dict) -> str:
    """Return the message's 'empfangen' timestamp, else 'zeitstempel', else ''."""
    return (m.get("empfangen") or m.get("zeitstempel") or "").strip()


def _msg_chrono_sort_key(m: dict) -> tuple[float, str]:
    """Sort key: parsed UTC instant first, message id as tie-breaker."""
    raw = _msg_timestamp_for_retention(m)
    return (_parse_msg_instant_utc_ts(raw), str(m.get("id") or ""))


def _message_within_retention(m: dict, cutoff_ts: float) -> bool:
    """Keep a message if its timestamp is unknown/unparseable or >= cutoff (UTC s)."""
    t = _msg_timestamp_for_retention(m)
    if not t:
        return True
    ts = _parse_msg_instant_utc_ts(t)
    if ts <= 0:
        return True
    return ts >= cutoff_ts


def _prune_messages_by_retention(messages: list[dict]) -> tuple[list[dict], int]:
    """Return ``(kept_messages, removed_count)`` for the retention window."""
    cutoff_ts = time.time() - EMPFANG_MESSAGE_RETENTION_DAYS * 86400
    kept = [m for m in messages if _message_within_retention(m, cutoff_ts)]
    return kept, len(messages) - len(kept)


def _load_messages() -> list[dict]:
    """Load internal messages, pruning (and persisting) anything past retention.

    A store that is not a JSON list is treated as empty, matching
    ``_load_external_dm_messages``.
    """
    messages = _load_json(_EMPFANG_FILE, [])
    if not isinstance(messages, list):
        return []
    kept, removed = _prune_messages_by_retention(messages)
    if removed > 0:
        _save_messages(kept)
    return kept


def _save_messages(messages: list[dict]):
    """Persist the internal message list."""
    _save_json(_EMPFANG_FILE, messages)


def _load_external_dm_messages() -> list[dict]:
    """Load cross-practice direct messages (separate file, never mixed with internal DMs)."""
    data = _load_json(_EXTERNAL_DM_MESSAGES_FILE, [])
    if not isinstance(data, list):
        return []
    kept, removed = _prune_messages_by_retention(data)
    if removed > 0:
        _save_external_dm_messages(kept)
    return kept


def _save_external_dm_messages(messages: list[dict]):
    """Persist the cross-practice direct message list."""
    _save_json(_EXTERNAL_DM_MESSAGES_FILE, messages)


def _external_dm_conversation_id(pid_a: str, uid_a: str, pid_b: str, uid_b: str) -> str:
    """Stable, symmetric conversation id (32 hex chars) for two practice/user pairs.

    The 'external_dm|' prefix in the hashed material keeps these ids apart
    from keys of internal direct conversations.
    """
    t1 = f"{(pid_a or '').strip()}\t{(uid_a or '').strip()}"
    t2 = f"{(pid_b or '').strip()}\t{(uid_b or '').strip()}"
    # Order the two endpoints so (a, b) and (b, a) hash identically.
    lo, hi = (t1, t2) if t1 <= t2 else (t2, t1)
    raw = f"external_dm|{lo}|{hi}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:32]


def _account_record_for_practice(uid: str, pid: str) -> Optional[dict]:
    """Return the account *uid* only if it belongs to practice *pid*, else None."""
    if not uid or not pid:
        return None
    accounts = _load_accounts()
    acc = accounts.get(uid)
    if not isinstance(acc, dict):
        return None
    if (acc.get("practice_id") or "").strip() != pid:
        return None
    return acc


def _account_is_sendable(acc: Optional[dict]) -> bool:
    """True if the account exists and is not deactivated, deleted or inactive."""
    if not acc:
        return False
    st = str(acc.get("status") or "active").strip().lower()
    return st not in ("deactivated", "deleted", "inactive")


def _external_dm_authorize_send_direction(
    sender_pid: str,
    sender_uid: str,
    recipient_pid: str,
    recipient_uid: str,
) -> tuple[str, dict]:
    """Directional check: may *sender* write to *recipient*?

    An accepted practice-level link between the two practices authorizes
    any pair of users. Otherwise an accepted person-level link must match
    exactly these two users (in either orientation of the link). All
    accepted person links between the practices are examined, so several
    1:1 links between the same two practices do not shadow each other.

    Returns:
        ``(link_id, link)`` of the authorizing link.

    Raises:
        HTTPException 400: incomplete addressing or same practice.
        HTTPException 403: an account is missing/disabled or no link matches.
        HTTPException 500: the practice link has no id.
    """
    sender_pid = (sender_pid or "").strip()
    sender_uid = (sender_uid or "").strip()
    recipient_pid = (recipient_pid or "").strip()
    recipient_uid = (recipient_uid or "").strip()
    if not sender_pid or not sender_uid or not recipient_pid or not recipient_uid:
        raise HTTPException(status_code=400, detail="Adressaten unvollstaendig")
    if sender_pid == recipient_pid:
        raise HTTPException(
            status_code=400,
            detail="Innerhalb einer Praxis bitte den bestehenden Direktchat nutzen.",
        )
    s_acc = _account_record_for_practice(sender_uid, sender_pid)
    r_acc = _account_record_for_practice(recipient_uid, recipient_pid)
    if not _account_is_sendable(s_acc):
        raise HTTPException(status_code=403, detail="Absenderkonto nicht zulaessig")
    if not _account_is_sendable(r_acc):
        raise HTTPException(status_code=403, detail="Empfaenger nicht erreichbar")

    store = _load_practice_links_store()
    practice_link: Optional[dict] = None
    person_links: list[dict] = []
    for link in store.get("links") or []:
        if not isinstance(link, dict):
            continue
        st = str(link.get("status") or "").strip().lower()
        if st != "accepted":
            continue
        a = (link.get("source_practice_id") or "").strip()
        b = (link.get("target_practice_id") or "").strip()
        if {a, b} != {sender_pid, recipient_pid}:
            continue
        ct = _link_contact_type(link)
        if ct == _CONTACT_TYPE_PRACTICE:
            practice_link = link
        elif ct == _CONTACT_TYPE_PERSON:
            person_links.append(link)

    if practice_link is not None:
        lid = str(practice_link.get("id") or "").strip()
        if not lid:
            raise HTTPException(status_code=500, detail="Verbindungsdaten ungueltig")
        return lid, practice_link

    # Strictly 1:1 - only su may write to tu and vice versa.
    for person_link in person_links:
        sp = (person_link.get("source_practice_id") or "").strip()
        tp = (person_link.get("target_practice_id") or "").strip()
        su = (person_link.get("source_user_id") or "").strip()
        tu = _effective_person_target_user_id(person_link)
        if not su or not tu:
            continue
        lid = str(person_link.get("id") or "").strip()
        if not lid:
            continue
        forward = (
            sender_pid == sp and sender_uid == su
            and recipient_pid == tp and recipient_uid == tu
        )
        backward = (
            sender_pid == tp and sender_uid == tu
            and recipient_pid == sp and recipient_uid == su
        )
        if forward or backward:
            return lid, person_link
    raise HTTPException(status_code=403, detail="Keine freigegebene Verbindung")


def _external_dm_authorize_pair(
    a_pid: str,
    a_uid: str,
    b_pid: str,
    b_uid: str,
) -> tuple[str, dict]:
    """Thread access: is a conversation between a and b allowed in either direction?

    Tries a -> b first; on any HTTPException it tries b -> a and lets that
    call's error propagate.
    """
    try:
        return _external_dm_authorize_send_direction(a_pid, a_uid, b_pid, b_uid)
    except HTTPException:
        pass
    return _external_dm_authorize_send_direction(b_pid, b_uid, a_pid, a_uid)


def _external_dm_to_client_message(m: dict) -> dict:
    """Convert a stored external DM to the core message format of internal DMs (web UI)."""
    ex0 = m.get("extras") if isinstance(m.get("extras"), dict) else {}
    ex = dict(ex0)  # copy so the stored message is not mutated
    mid = str(m.get("id") or "").strip()
    s_pid = str(m.get("sender_practice_id") or "").strip()
    s_uid = str(m.get("sender_user_id") or "").strip()
    s_dn = str(m.get("sender_display_name") or "").strip()
    r_pid = str(m.get("recipient_practice_id") or "").strip()
    r_uid = str(m.get("recipient_user_id") or "").strip()
    r_dn = str(m.get("recipient_display_name") or "").strip()
    # setdefault: values already present in the stored extras win.
    ex.setdefault("sender_user_id", s_uid)
    ex.setdefault("recipient_user_id", r_uid)
    ex.setdefault("sender_practice_id", s_pid)
    ex.setdefault("recipient_practice_id", r_pid)
    ex.setdefault("conversation_id", str(m.get("conversation_id") or ""))
    ex.setdefault("external_link_id", str(m.get("external_link_id") or ""))
    ex.setdefault("external_dm", True)
    ex.setdefault("conv_type", "external_dm")
    ts = str(m.get("empfangen") or m.get("zeitstempel") or "").strip()
    return {
        "id": mid,
        "thread_id": mid,
        "kommentar": str(m.get("kommentar") or ""),
        "absender": (s_dn + " (Empfang)") if s_dn else " (Empfang)",
        "zeitstempel": ts,
        "empfangen": ts,
        "status": str(m.get("status") or "offen"),
        "extras": ex,
    }


def _msg_practice(m: dict) -> str:
    """Return the message's practice_id; messages without one count as legacy default."""
    return m.get("practice_id") or _LEGACY_DEFAULT_PID


def _filter_by_practice(messages: list[dict], pid: str) -> list[dict]:
    """Return only the messages that belong to practice *pid*."""
    return [m for m in messages if _msg_practice(m) == pid]


# =====================================================================
# Tasks (practice-scoped, server-side)
# =====================================================================

def _load_tasks() -> list[dict]:
    """Load the task list."""
    return _load_json(_TASKS_FILE, [])


def _save_tasks(tasks: list[dict]):
    """Persist the task list."""
    _save_json(_TASKS_FILE, tasks)


# =====================================================================
# Channels (practice-scoped)
# =====================================================================

def _load_channels() -> list[dict]:
    """Load the channel list."""
    return _load_json(_CHANNELS_FILE, [])


def _save_channels(channels: list[dict]):
    """Persist the channel list."""
    _save_json(_CHANNELS_FILE, channels)


# Channels created for a practice that has none yet. An empty
# allowed_roles list presumably means "no role restriction" - confirm
# against the code that enforces channel access.
_DEFAULT_CHANNEL_DEFS = [
    {"name": "Allgemein", "scope": "internal", "channel_type": "group", "allowed_roles": []},
    {"name": "Aerzte", "scope": "internal", "channel_type": "group", "allowed_roles": ["arzt", "admin"]},
    {"name": "MPA", "scope": "internal", "channel_type": "group", "allowed_roles": ["mpa", "admin"]},
    {"name": "Empfang", "scope": "internal", "channel_type": "group", "allowed_roles": ["empfang", "admin"]},
]


def _ensure_default_channels(practice_id: str):
    """Create the default channels for a practice that has no channels yet."""
    channels = _load_channels()
    practice_channels = [c for c in channels if c.get("practice_id") == practice_id]
    if practice_channels:
        return
    now = time.strftime("%Y-%m-%d %H:%M:%S")
    for defn in _DEFAULT_CHANNEL_DEFS:
        channels.append({
            "channel_id": uuid.uuid4().hex[:12],
            "practice_id": practice_id,
            "name": defn["name"],
            "scope": defn["scope"],
            "channel_type": defn["channel_type"],
            # Copy, so editing a channel's roles in memory can never
            # mutate the shared module-level defaults.
            "allowed_roles": list(defn["allowed_roles"]),
            "connection_id": "",
            "created": now,
            "created_by": "",
        })
    _save_channels(channels)

# =====================================================================
# Connections / Federation (practice-to-practice)
# =====================================================================

def _load_connections() -> list[dict]:
    """Load the connection records via ``_load_json`` (default: ``[]``)."""
    return _load_json(_CONNECTIONS_FILE, [])


def _save_connections(conns: list[dict]):
    """Write the given connection records to ``_CONNECTIONS_FILE``."""
    _save_json(_CONNECTIONS_FILE, conns)


# =====================================================================
# AUTH ENDPOINTS
# =====================================================================

async def _auth_json_body(request: Request) -> dict:
    """Return the request body as a JSON object, or ``{}`` if unusable.

    Both an unparsable body and valid JSON that is not an object (list,
    string, number, ``null``) yield ``{}``, so the auth handlers can call
    ``.get()`` without failing with a 500 on malformed client input.
    """
    try:
        payload = await request.json()
    except Exception:
        return {}
    return payload if isinstance(payload, dict) else {}


@router.post("/auth/setup")
async def auth_setup(request: Request):
    """Create the first admin user of a practice (and the practice itself).

    Body fields: ``name``, ``password`` (min. 4 characters), optional
    ``practice_name``, ``email`` and ``practice_id``. Without a
    ``practice_id`` a new one is generated.

    Raises:
        HTTPException 400: name or password missing / password too short.
        HTTPException 409: the practice already has at least one account.

    On success a session is created and returned as ``aza_session`` cookie.
    """
    body = await _auth_json_body(request)
    name = (body.get("name") or "").strip()
    password = (body.get("password") or "").strip()
    practice_name = (body.get("practice_name") or "").strip() or "Meine Praxis"
    admin_email = (body.get("email") or "").strip()
    pid = (body.get("practice_id") or "").strip()

    if not name or not password or len(password) < 4:
        raise HTTPException(status_code=400, detail="Name und Passwort (min. 4 Zeichen) erforderlich")

    if not pid:
        pid = _generate_practice_id()
    _ensure_practice(pid, name=practice_name, admin_email=admin_email)

    accounts = _load_accounts()
    if any(a.get("practice_id") == pid for a in accounts.values()):
        raise HTTPException(status_code=409, detail="Setup bereits abgeschlossen. Bitte Login verwenden.")

    # practice_name is never empty here (it falls back to "Meine Praxis"),
    # so the stored practice record is always refreshed with the given data.
    practices = _load_practices()
    practices[pid]["name"] = practice_name
    if admin_email:
        practices[pid]["admin_email"] = admin_email
    _save_practices(practices)
    practice = practices[pid]

    uid = uuid.uuid4().hex[:12]
    pw_hash, pw_salt = _hash_password(password)
    now = time.strftime("%Y-%m-%d %H:%M:%S")
    ln_u = _allocate_unique_login_name(accounts, pid, name)
    accounts[uid] = {
        "user_id": uid,
        "practice_id": pid,
        "display_name": name,
        "email": admin_email,
        "login_name": ln_u,
        "role": "admin",
        "pw_hash": pw_hash,
        "pw_salt": pw_salt,
        "created": now,
        "status": "active",
        "last_login": now,
    }
    _save_accounts(accounts)
    _ensure_default_channels(pid)

    ua = request.headers.get("User-Agent", "")
    ip = _extract_client_ip(request)
    dev_id = _make_device_id(uid, ua)
    token = _create_session(uid, pid, name, "admin", device_id=dev_id, user_agent=ua, ip_addr=ip)

    resp = JSONResponse(content={
        "success": True,
        "user_id": uid,
        "role": "admin",
        "display_name": name,
        "practice_id": pid,
        "practice_name": practice.get("name", ""),
        "invite_code": practice.get("invite_code", ""),
    })
    resp.set_cookie("aza_session", token, httponly=True, samesite="lax", max_age=SESSION_MAX_AGE)
    return resp


@router.post("/auth/login")
async def auth_login(request: Request):
    """Tenant-aware login with user name (display name) or e-mail + password.

    The practice is resolved by ``_practice_id_for_login``; accounts are
    searched within that practice first. For non-e-mail logins that find
    nothing there, a second search over all accounts is attempted
    ("recovered practice"). When the practice came from an invite code and
    the matching account belongs to another practice, the account is cloned
    into the invited practice after the password has been verified.

    Raises:
        HTTPException 400: login name / password missing.
        HTTPException 401: no matching account or wrong password.
        HTTPException 403: invalid invite code, practice mismatch, or
            deactivated account.
        HTTPException 409: the e-mail or user name is ambiguous.
    """
    body = await _auth_json_body(request)
    raw = (body.get("name") or "").strip()
    password = (body.get("password") or "").strip()

    pid, pid_src = _practice_id_for_login(request, body)
    if pid_src == "invite_invalid":
        raise HTTPException(
            status_code=403,
            detail="Ungueltiger Einladungscode — bitte aktuellen Link aus der Hauptinstallation verwenden.",
        )

    if not raw or not password:
        raise HTTPException(
            status_code=400,
            detail="Login-Name oder E-Mail und Passwort erforderlich",
        )

    accounts = _load_accounts()
    scoped = (
        [a for a in accounts.values() if a.get("practice_id") == pid]
        if pid
        else list(accounts.values())
    )

    target = None
    login_recovered_practice = False
    is_email_login = _is_likely_email(raw)

    if is_email_login:
        em = _norm_email(raw)
        matches = [
            a for a in scoped
            if em and _norm_email(a.get("email") or "") == em
        ]
        if len(matches) == 1:
            target = matches[0]
        elif len(matches) > 1:
            detail = {
                "code": "ambiguous_email",
                "message": (
                    "Diese E-Mail ist mehreren Benutzern zugeordnet. "
                    "Bitte melden Sie sich mit Ihrem Benutzernamen an."
                ),
            }
            # Candidate names are only listed when the search was already
            # restricted to a single practice.
            if pid:
                detail["candidates"] = [
                    {
                        "display_name": (a.get("display_name") or ""),
                        "login_name": (a.get("login_name") or "").strip(),
                    }
                    for a in matches
                ]
            raise HTTPException(status_code=409, detail=detail)
    else:
        matches, _via_login = _resolve_browser_login_matches(scoped, raw)
        if len(matches) == 1:
            target = matches[0]
        elif len(matches) > 1:
            raise HTTPException(
                status_code=409,
                detail={
                    "code": "ambiguous_username",
                    "message": (
                        "Diese Anmeldedaten sind in dieser Praxis nicht eindeutig. "
                        "Ein Administrator muss im Hauptfenster fuer die betroffenen Konten einen "
                        "eindeutigen Login-Namen festlegen. Bitte verwenden Sie danach diesen "
                        "Benutzernamen fuer die Anmeldung."
                    ),
                },
            )

    # Fallback for user-name logins: the practice hint may be stale, so try
    # all accounts and accept only an unambiguous hit.
    if not target and pid and not is_email_login:
        gmatches, _via_global = _resolve_browser_login_matches(
            list(accounts.values()), raw,
        )
        if len(gmatches) == 1:
            target = gmatches[0]
            if pid_src != "invite":
                login_recovered_practice = True

    if not target:
        raise HTTPException(
            status_code=401,
            detail="Benutzer nicht gefunden oder falsches Passwort",
        )

    tpid = (target.get("practice_id") or "").strip()
    invite_will_clone = bool(pid_src == "invite" and pid and tpid and tpid != pid)

    if pid and tpid != pid and not login_recovered_practice and not invite_will_clone:
        raise HTTPException(
            status_code=403,
            detail=(
                "Anmeldung passt nicht zur gewaehlten Praxis (Einladungscode / gespeicherte Praxis-ID). "
                "Bitte den Einladungslink der Hauptinstallation erneut oeffnen."
            ),
        )

    # Without a practice hint (or after recovery) the account's own practice
    # becomes the session practice -- unless an invite clone is pending.
    if (not pid or login_recovered_practice) and not invite_will_clone:
        pid = tpid

    if target.get("status") == "deactivated":
        raise HTTPException(
            status_code=403,
            detail="Konto deaktiviert. Bitte Administrator kontaktieren.",
        )

    if not _verify_password(password, target["pw_hash"], target["pw_salt"]):
        raise HTTPException(
            status_code=401,
            detail="Benutzer nicht gefunden oder falsches Passwort",
        )

    did_invite_clone = False
    now = time.strftime("%Y-%m-%d %H:%M:%S")
    if invite_will_clone:
        # Remember the source identity for the audit log before ``target``
        # is replaced by the cloned account.
        old_practice_before = (target.get("practice_id") or "").strip()
        source_user_before = (target.get("user_id") or "").strip()
        _ensure_practice(pid)
        _ensure_default_channels(pid)
        target = _invite_join_clone_account(accounts, target, pid, now)
        did_invite_clone = True
        _log.info(
            "AZA_EMPFANG_PRACTICE_JOIN_CLONE src_user=%s old_practice=%s new_practice=%s "
            "clone_user=%s role=%s admin_assigned=%s invite_tail=%s",
            (source_user_before or "")[:12],
            (old_practice_before or "")[:16],
            (pid or "")[:16],
            (target.get("user_id") or "")[:12],
            (target.get("role") or ""),
            str(_account_has_practice_admin_privileges(target)).lower(),
            _audit_invite_tail(body.get("invite_code")),
        )
    else:
        target["last_login"] = now
    # Persist in both paths (new clone or updated last_login).
    _save_accounts(accounts)

    dn = (target.get("display_name") or raw).strip()
    ua = request.headers.get("User-Agent", "")
    ip = _extract_client_ip(request)
    dev_id = body.get("device_id") or _make_device_id(target["user_id"], ua)
    token = _create_session(
        target["user_id"], pid, dn, target["role"],
        device_id=dev_id, user_agent=ua, ip_addr=ip,
    )

    # Tell the client how the practice binding of this session was derived.
    if did_invite_clone:
        bind_src = "invite_code_join"
    elif pid_src == "invite":
        bind_src = "invite_code"
    elif login_recovered_practice:
        bind_src = "username_recovered_practice"
    elif pid_src in ("body", "header", "query"):
        bind_src = "stored_practice_id"
    else:
        bind_src = "account"

    result = {
        "success": True,
        "user_id": target["user_id"],
        "role": target["role"],
        "display_name": dn,
        "practice_id": pid,
        "practice_bind_source": bind_src,
    }
    if target.get("must_change_password"):
        result["must_change_password"] = True

    resp = JSONResponse(content=result)
    resp.set_cookie(
        "aza_session", token,
        httponly=True, samesite="lax", max_age=SESSION_MAX_AGE,
    )
    return resp


@router.post("/auth/register")
async def auth_register(request: Request):
    """Register a new user in the practice identified by an invite code.

    Body fields: ``invite_code``, ``name``, ``password`` (min. 4 characters),
    optional ``role`` (``arzt``/``mpa``/``empfang``; anything else becomes
    ``mpa``) and ``email``.

    Raises:
        HTTPException 400: invite code, name or password missing / too short.
        HTTPException 403: the invite code resolves to no practice.
        HTTPException 409: the display name is already used in that practice.
    """
    body = await _auth_json_body(request)
    invite_code = (body.get("invite_code") or "").strip()
    name = (body.get("name") or "").strip()
    password = (body.get("password") or "").strip()
    role = (body.get("role") or "mpa").strip()
    email = (body.get("email") or "").strip()

    if not invite_code or not name or not password or len(password) < 4:
        raise HTTPException(status_code=400, detail="Einladungscode, Name und Passwort (min. 4 Zeichen) erforderlich")
    if role not in ("arzt", "mpa", "empfang"):
        role = "mpa"

    target_pid = _lookup_practice_id_by_invite(invite_code)
    if not target_pid:
        raise HTTPException(status_code=403, detail="Ungueltiger Einladungscode")

    accounts = _load_accounts()
    # Normalise the requested name once instead of once per account.
    wanted = _normalize_login_username(name)
    exists = any(
        _normalize_login_username(a.get("display_name") or "") == wanted
        and a.get("practice_id") == target_pid
        for a in accounts.values()
    )
    if exists:
        raise HTTPException(status_code=409, detail="Benutzername bereits vergeben")

    uid = uuid.uuid4().hex[:12]
    pw_hash, pw_salt = _hash_password(password)
    now = time.strftime("%Y-%m-%d %H:%M:%S")
    ln_assign = _preferred_unique_login_for_display(accounts, target_pid, name, "")
    accounts[uid] = {
        "user_id": uid,
        "practice_id": target_pid,
        "display_name": name,
        "email": email,
        "login_name": ln_assign,
        "role": role,
        "pw_hash": pw_hash,
        "pw_salt": pw_salt,
        "created": now,
        "status": "active",
        "last_login": now,
    }
    _save_accounts(accounts)
    _ensure_default_channels(target_pid)

    ua = request.headers.get("User-Agent", "")
    ip = _extract_client_ip(request)
    dev_id = _make_device_id(uid, ua)
    token = _create_session(uid, target_pid, name, role, device_id=dev_id, user_agent=ua, ip_addr=ip)

    resp = JSONResponse(content={
        "success": True,
        "user_id": uid,
        "role": role,
        "display_name": name,
        "practice_id": target_pid,
        "practice_bind_source": "invite_code",
    })
    resp.set_cookie("aza_session", token, httponly=True, samesite="lax", max_age=SESSION_MAX_AGE)
    return resp


@router.get("/auth/me")
async def auth_me(request: Request):
    """Check the current session; return its user data or a 401 response."""
    s = _session_from_request(request)
    if not s:
        return JSONResponse(status_code=401, content={"authenticated": False})
    return JSONResponse(content={
        "authenticated": True,
        "user_id": s["user_id"],
        "display_name": s["display_name"],
        "role": s["role"],
        "practice_id": s["practice_id"],
    })


@router.post("/auth/logout")
async def auth_logout(request: Request):
    """Delete the session named by the ``aza_session`` cookie and clear it.

    Clearing the user's presence is best effort: a failure there is logged
    and never prevents the logout itself.
    """
    token = request.cookies.get("aza_session", "")
    s = _session_from_request(request)
    if s:
        try:
            _presence_clear_user(
                (s.get("practice_id") or "").strip(),
                (s.get("user_id") or "").strip(),
            )
        except Exception:
            _log.warning("AZA_EMPFANG_LOGOUT presence clear failed", exc_info=True)
    _delete_session(token)
    resp = JSONResponse(content={"success": True})
    resp.delete_cookie("aza_session")
    return resp


@router.get("/auth/resolve_invite")
async def auth_resolve_invite(code: str = Query("")):
    """Resolve a chat invite code to its practice_id (no login required).

    Intended for a browser start with ``?invite=``. An empty or unknown code
    yields ``{"valid": False}`` (with a ``detail`` text for unknown codes).
    """
    raw = (code or "").strip()
    if not raw:
        return JSONResponse(content={"valid": False})
    pid = _lookup_practice_id_by_invite(raw)
    if not pid:
        return JSONResponse(
            content={"valid": False, "detail": "Ungueltiger oder veralteter Einladungscode"},
        )
    practices = _load_practices()
    pdata = practices.get(pid, {})
    return JSONResponse(content={
        "valid": True,
        "practice_id": pid,
        "practice_name": (pdata.get("name") or "").strip(),
        "invite_code": (pdata.get("invite_code") or "").strip(),
    })


# =====================================================================
# External practices (practice-to-practice links)
# =====================================================================
#
# Data model:
#   data/empfang_practice_links.json = {"links": [{
#     "id":"lnk_...",
#     "source_practice_id":"prac_A",
#     "target_practice_id":"prac_B",
#     "source_practice_name":"...",
#     "target_practice_name":"...",
#     "status":"accepted"|"pending_outgoing"|"pending_incoming"|
#              "rejected"|"removed",
#     "created_by_user_id":"u_...",
#     "created_at":"...Z",
#     "updated_at":"...Z",
#     "invite_code_used":"CHAT-..."
#   }]}
#
# Semantics:
#   - An external practice link is created exclusively VIA THE CHAT INVITE
#     CODE of the TARGET PRACTICE. The code counts as mutual approval
#     because the target practice deliberately exported it from its admin
#     console. Status after creation: "accepted".
#   - An external link does NOT change the own practice_id and creates NO
#     users in the target practice. Roles such as admin/mpa/arzt are NOT
#     carried over across practices.
#   - Both practices see the link in their own GET list.
#   - Auth: browser cookie session, or desktop with X-API-Token +
#     X-Practice-Id + X-AzA-Empfang-User-Id (same as presence/desktop shell).

_PRACTICE_LINKS_FILE = _DATA_DIR / "empfang_practice_links.json"


def _load_practice_links_store() -> dict:
    """Load the link store; always returns a dict whose ``links`` is a list."""
    data = _load_json(_PRACTICE_LINKS_FILE, {"links": []})
    if not isinstance(data, dict):
        return {"links": []}
    if not isinstance(data.get("links"), list):
        data["links"] = []
    return data


def _save_practice_links_store(data: dict) -> None:
    """Write the link store to ``_PRACTICE_LINKS_FILE``."""
    _save_json(_PRACTICE_LINKS_FILE, data)


def _generate_practice_link_id() -> str:
    """Return a new link id: ``lnk_<epoch millis>_<8 hex chars>``."""
    return f"lnk_{int(time.time() * 1000)}_{secrets.token_hex(4)}"


def _practice_links_visible_to(pid: str) -> list:
    """Return all links where ``pid`` is source or target and status is not
    "removed" (removed links stay in the store for audit purposes but are
    not returned here)."""
    store = _load_practice_links_store()
    out = []
    for link in store.get("links") or []:
        if not isinstance(link, dict):
            continue
        if (link.get("status") or "") == "removed":
            continue
        if link.get("source_practice_id") == pid or link.get("target_practice_id") == pid:
            out.append(link)
    return out


def _serialize_practice_link_for(pid: str, link: dict) -> dict:
    """Serialize ``link`` for GET responses from the point of view of ``pid``.

    ``direction`` is "outgoing" when ``pid`` is the source practice, else
    "incoming"; the ``peer_*`` fields describe the other practice. Display
    names are included deliberately so the client can render directly.
    """
    is_outgoing = link.get("source_practice_id") == pid
    peer_pid = link.get("target_practice_id") if is_outgoing else link.get("source_practice_id")
    peer_name = link.get("target_practice_name") if is_outgoing else link.get("source_practice_name")
    return {
        "id": str(link.get("id") or ""),
        "status": str(link.get("status") or ""),
        "direction": "outgoing" if is_outgoing else "incoming",
        "peer_practice_id": str(peer_pid or ""),
        "peer_practice_name": str(peer_name or ""),
        "created_at": str(link.get("created_at") or ""),
        "updated_at": str(link.get("updated_at") or ""),
        "created_by_user_id": str(link.get("created_by_user_id") or ""),
        "last_message_at": str(link.get("last_message_at") or ""),
    }


def _find_practice_link(link_id: str, pid: str) -> Optional[dict]:
    """Return the raw link with ``link_id`` if ``pid`` is one of its two
    practices; otherwise ``None``."""
    if not link_id or not pid:
        return None
    store = _load_practice_links_store()
    for link in store.get("links") or []:
        if not isinstance(link, dict):
            continue
        if link.get("id") != link_id:
            continue
        if link.get("source_practice_id") == pid or link.get("target_practice_id") == pid:
            return link
    return None


def _practice_name_safe(pid: str) -> str:
    """Return the stored name of practice ``pid``; ``""`` if unknown or if
    loading the practices fails."""
    if not pid:
        return ""
    try:
        practices = _load_practices()
        return str((practices.get(pid) or {}).get("name") or "").strip()
    except Exception:
        return ""


@router.get("/external-practices")
async def empfang_external_practices_list(request: Request):
    """List the own practice's external practice links, newest first.

    Only links of contact type ``external_practice`` are returned; ordering
    uses ``updated_at`` and falls back to ``created_at``.
    """
    s = _require_session(request)
    pid = (s.get("practice_id") or "").strip()
    if not pid:
        raise HTTPException(status_code=400, detail="Session unvollstaendig")
    own_links = [
        x for x in _practice_links_visible_to(pid)
        if _link_contact_type(x) == _CONTACT_TYPE_PRACTICE
    ]
    own_links.sort(key=lambda x: str(x.get("updated_at") or x.get("created_at") or ""), reverse=True)
    return JSONResponse(content={
        "success": True,
        "external_practices": [_serialize_practice_link_for(pid, link) for link in own_links],
    })


@router.post("/external-practices/link-by-code")
async def empfang_external_practices_link_by_code(request: Request):
    """Link the own practice to another practice via that practice's chat
    invite code. Does NOT change the own practice_id.

    Body: {"code": "CHAT-...-..."} (``invite_code`` is accepted as well).

    Safety:
      - The code is the mutual approval (it comes from the target's admin
        console).
      - A self link (source == target) is rejected.
      - Linking repeatedly is idempotent (update instead of duplicate).

    Raises:
        HTTPException 400: incomplete session, missing code, or self link.
        HTTPException 404: the code resolves to no practice.
    """
    s = _require_session(request)
    source_pid = (s.get("practice_id") or "").strip()
    source_uid = (s.get("user_id") or "").strip()
    if not source_pid or not source_uid:
        raise HTTPException(status_code=400, detail="Session unvollstaendig")

    try:
        body = await request.json()
    except Exception:
        body = {}
    raw_code = ""
    if isinstance(body, dict):
        # str(): a non-string code (e.g. a number) must not crash on .strip().
        raw_code = str(body.get("code") or body.get("invite_code") or "").strip()
    if not raw_code:
        raise HTTPException(status_code=400, detail="code erforderlich")

    target_pid = _lookup_practice_id_by_invite(raw_code)
    if not target_pid:
        raise HTTPException(
            status_code=404,
            detail="Code ist keiner Praxis zugeordnet oder bereits abgelaufen.",
        )
    if target_pid == source_pid:
        raise HTTPException(
            status_code=400,
            detail="Sie koennen Ihre eigene Praxis nicht als externen Kontakt hinzufuegen.",
        )

    practices = _load_practices()
    src_name = str((practices.get(source_pid) or {}).get("name") or "").strip()
    tgt_name = str((practices.get(target_pid) or {}).get("name") or "").strip()

    store = _load_practice_links_store()
    now = _now_z()

    # Look for an existing practice link between the two practices, no
    # matter in which direction it was stored.
    existing = None
    for link in store.get("links") or []:
        if not isinstance(link, dict):
            continue
        pair = {link.get("source_practice_id"), link.get("target_practice_id")}
        if pair == {source_pid, target_pid} \
                and _link_contact_type(link) == _CONTACT_TYPE_PRACTICE:
            existing = link
            break

    if existing is not None:
        existing["contact_type"] = _CONTACT_TYPE_PRACTICE
        existing["status"] = "accepted"
        existing["updated_at"] = now
        if not existing.get("invite_code_used"):
            existing["invite_code_used"] = raw_code
        # Refresh the display names (a practice may have been renamed),
        # respecting the direction in which the link is stored.
        if existing.get("source_practice_id") == source_pid:
            existing["source_practice_name"] = src_name
            existing["target_practice_name"] = tgt_name
        else:
            existing["source_practice_name"] = tgt_name
            existing["target_practice_name"] = src_name
        _save_practice_links_store(store)
        out = existing
        _log.info(
            "AZA_EMPFANG_EXTLINK_UPDATED source=%s target=%s status=%s",
            (source_pid or "")[:16], (target_pid or "")[:16], out["status"],
        )
    else:
        out = {
            "id": _generate_practice_link_id(),
            "contact_type": _CONTACT_TYPE_PRACTICE,
            "source_practice_id": source_pid,
            "target_practice_id": target_pid,
            "source_practice_name": src_name,
            "target_practice_name": tgt_name,
            "status": "accepted",  # code = mutual approval
            "created_by_user_id": source_uid,
            "created_at": now,
            "updated_at": now,
            "invite_code_used": raw_code,
        }
        store.setdefault("links", []).append(out)
        _save_practice_links_store(store)
        _log.info(
            "AZA_EMPFANG_EXTLINK_CREATED source=%s target=%s",
            (source_pid or "")[:16], (target_pid or "")[:16],
        )

    return JSONResponse(content={
        "success": True,
        "external_practice": _serialize_practice_link_for(source_pid, out),
    })


@router.post("/external-practices/{link_id}/accept")
async def empfang_external_practices_accept(link_id: str, request: Request):
    """Mark an incoming external practice link as accepted.

    Only members of the target practice (the receiving side) may accept;
    the requesting practice must not approve its own request or revert a
    rejection. Idempotent: an already accepted link stays accepted.

    Raises:
        HTTPException 400: incomplete session.
        HTTPException 403: the caller's practice is not the target practice.
        HTTPException 404: link unknown, not visible, or not a practice link.
    """
    s = _require_session(request)
    pid = (s.get("practice_id") or "").strip()
    if not pid:
        raise HTTPException(status_code=400, detail="Session unvollstaendig")
    link = _find_practice_link(link_id, pid)
    if not link or _link_contact_type(link) != _CONTACT_TYPE_PRACTICE:
        raise HTTPException(status_code=404, detail="Verbindung nicht gefunden")
    if (link.get("target_practice_id") or "") != pid:
        raise HTTPException(
            status_code=403,
            detail="Nur die Zielpraxis kann die Verbindung annehmen.",
        )
    link["status"] = "accepted"
    link["updated_at"] = _now_z()
    _persist_link_update(link_id, link)
    return JSONResponse(content={
        "success": True,
        "external_practice": _serialize_practice_link_for(pid, link),
    })


@router.post("/external-practices/{link_id}/reject")
async def empfang_external_practices_reject(link_id: str, request: Request):
    """Reject an external practice link.

    The link stays in the store with status "rejected" and remains visible
    to both practices; nothing is deleted on the other side.

    Raises:
        HTTPException 400: incomplete session.
        HTTPException 404: link unknown, not visible, or not a practice link.
    """
    s = _require_session(request)
    pid = (s.get("practice_id") or "").strip()
    if not pid:
        raise HTTPException(status_code=400, detail="Session unvollstaendig")
    link = _find_practice_link(link_id, pid)
    if not link or _link_contact_type(link) != _CONTACT_TYPE_PRACTICE:
        raise HTTPException(status_code=404, detail="Verbindung nicht gefunden")
    link["status"] = "rejected"
    link["updated_at"] = _now_z()
    _persist_link_update(link_id, link)
    return JSONResponse(content={
        "success": True,
        "external_practice": _serialize_practice_link_for(pid, link),
    })


@router.delete("/external-practices/{link_id}")
async def empfang_external_practices_remove(link_id: str, request: Request):
    """Soft-remove an external practice link (status=removed).

    The record is kept in the store, but ``_practice_links_visible_to``
    filters removed links out for both practices. Linking again via
    ``link-by-code`` reuses and re-activates the same record. An unknown
    link is answered with ``removed: 0`` instead of an error.

    Raises:
        HTTPException 400: incomplete session, or the link is a personal
            external contact (those are removed via /external-contacts).
    """
    s = _require_session(request)
    pid = (s.get("practice_id") or "").strip()
    if not pid:
        raise HTTPException(status_code=400, detail="Session unvollstaendig")
    link = _find_practice_link(link_id, pid)
    if not link:
        return JSONResponse(content={"success": True, "removed": 0})
    if _link_contact_type(link) != _CONTACT_TYPE_PRACTICE:
        raise HTTPException(
            status_code=400,
            detail="Persoenliche externe Kontakte bitte ueber /external-contacts entfernen.",
        )
    link["status"] = "removed"
    link["updated_at"] = _now_z()
    _persist_link_update(link_id, link)
    return JSONResponse(content={"success": True, "removed": 1})


# =====================================================================
# Personal user notes (per practice_id + user_id)
# =====================================================================
#
# Strictly separated from tasks, letters, chat messages and external
# contacts:
#   - Notes are PERSONAL to the signed-in user.
#   - Stored in data/empfang_user_notes.json:
#       {"notes":[{
#         "id":"note_...","practice_id":"prac_...","user_id":"u_...",
#         "title":"...optional","body":"...","pinned":false,
#         "client_id":"...optional","source":"manual",
#         "status":"active"|"deleted",
#         "created_at":"...Z","updated_at":"...Z","deleted_at":"...Z"|""
#       }]}
#   - Endpoints use exclusively the practice_id + user_id from the
#     server-side session. Body/header fields about user or practice
#     identity are ignored.
#   - Soft delete: status=deleted, deleted_at set; GET filters deleted
#     notes out.
#   - Audit log: only id and count, never the content of a note.
_USER_NOTES_FILE = _DATA_DIR / "empfang_user_notes.json"
_NOTES_MAX_TITLE = 200
_NOTES_MAX_BODY = 8000
_NOTES_MAX_PER_USER = 500


def _load_notes_store() -> dict:
    """Load the notes store; always returns a dict whose ``notes`` is a list."""
    data = _load_json(_USER_NOTES_FILE, {"notes": []})
    if not isinstance(data, dict):
        return {"notes": []}
    if not isinstance(data.get("notes"), list):
        data["notes"] = []
    return data


def _save_notes_store(data: dict) -> None:
    """Write the notes store to ``_USER_NOTES_FILE``."""
    _save_json(_USER_NOTES_FILE, data)


def _generate_note_id() -> str:
    """Return a new note id: ``note_<epoch millis>_<8 hex chars>``."""
    return f"note_{int(time.time() * 1000)}_{secrets.token_hex(4)}"


def _public_note(note: dict) -> dict:
    """Return the client-facing view of a note.

    Ownership and bookkeeping fields (practice_id, user_id, status,
    deleted_at) are deliberately not part of the result.
    """
    return {
        "id": str(note.get("id") or ""),
        "title": str(note.get("title") or ""),
        "body": str(note.get("body") or ""),
        "pinned": bool(note.get("pinned")),
        "source": str(note.get("source") or "manual"),
        "created_at": str(note.get("created_at") or ""),
        "updated_at": str(note.get("updated_at") or ""),
    }


def _is_own_note(note, pid: str, uid: str) -> bool:
    """True if ``note`` is a dict owned by user ``uid`` in practice ``pid``."""
    return (
        isinstance(note, dict)
        and note.get("practice_id") == pid
        and note.get("user_id") == uid
    )


def _user_notes(pid: str, uid: str) -> list:
    """Return the non-deleted notes of user ``uid`` in practice ``pid``."""
    store = _load_notes_store()
    return [
        n for n in store.get("notes") or []
        if _is_own_note(n, pid, uid) and (n.get("status") or "active") != "deleted"
    ]


@router.get("/notes")
async def empfang_notes_list(request: Request):
    """List the caller's active notes: pinned first, then by ``updated_at``
    descending (falling back to ``created_at``)."""
    s = _require_session(request)
    pid = (s.get("practice_id") or "").strip()
    uid = (s.get("user_id") or "").strip()
    if not pid or not uid:
        raise HTTPException(status_code=400, detail="Session unvollstaendig")
    notes = _user_notes(pid, uid)
    # Two stable sorts: newest first, then pinned to the front. The second
    # sort keeps the time order inside the pinned / unpinned groups.
    notes.sort(key=lambda n: str(n.get("updated_at") or n.get("created_at") or ""), reverse=True)
    notes.sort(key=lambda n: 0 if n.get("pinned") else 1)
    return JSONResponse(content={
        "success": True,
        "notes": [_public_note(n) for n in notes],
    })


@router.post("/notes")
async def empfang_notes_create(request: Request):
    """Create a new note for the caller (own practice_id, own user_id).

    Body: {"title": "...", "body": "...", "client_id": "...", "pinned": false}
    ``practice_id`` and ``user_id`` are always taken from the session. A
    ``client_id`` is only used as the note id if it starts with ``note_``
    and is not yet taken by any stored note; otherwise an id is generated.

    Raises:
        HTTPException 400: incomplete session, neither title nor body given,
            or the per-user limit of active notes is reached.
    """
    s = _require_session(request)
    pid = (s.get("practice_id") or "").strip()
    uid = (s.get("user_id") or "").strip()
    if not pid or not uid:
        raise HTTPException(status_code=400, detail="Session unvollstaendig")
    try:
        body = await request.json()
    except Exception:
        body = {}
    if not isinstance(body, dict):
        body = {}
    title = _clip_text(body.get("title", ""), _NOTES_MAX_TITLE).strip()
    content = _clip_text(body.get("body", ""), _NOTES_MAX_BODY)
    if not content.strip() and not title:
        raise HTTPException(status_code=400, detail="body oder title erforderlich")
    client_id = str(body.get("client_id") or "").strip()
    if client_id and not client_id.startswith("note_"):
        client_id = ""
    pinned = bool(body.get("pinned"))

    store = _load_notes_store()
    all_notes = store.get("notes") or []
    active_count = sum(
        1 for n in all_notes
        if _is_own_note(n, pid, uid) and (n.get("status") or "active") == "active"
    )
    if active_count >= _NOTES_MAX_PER_USER:
        raise HTTPException(
            status_code=400,
            detail=f"Maximal {_NOTES_MAX_PER_USER} Notizen pro Benutzer.",
        )
    # A client-supplied id must be unique across ALL notes, not only the
    # caller's; on collision fall back to a generated id.
    if client_id and any(
        isinstance(n, dict) and n.get("id") == client_id for n in all_notes
    ):
        client_id = ""
    note_id = client_id or _generate_note_id()
    now = _now_z()
    note = {
        "id": note_id,
        "practice_id": pid,
        "user_id": uid,
        "title": title,
        "body": content,
        "pinned": pinned,
        "source": "manual",
        "status": "active",
        "created_at": now,
        "updated_at": now,
        "deleted_at": "",
    }
    store.setdefault("notes", []).append(note)
    _save_notes_store(store)
    _log.info(
        "AZA_EMPFANG_NOTE_CREATED practice=%s user=%s note=%s",
        (pid or "")[:16], (uid or "")[:16], (note_id or "")[:18],
    )
    return JSONResponse(content={"success": True, "note": _public_note(note)})


@router.patch("/notes/{note_id}")
async def empfang_notes_update(note_id: str, request: Request):
    """Update ``title`` / ``body`` / ``pinned`` of one of the caller's notes.

    Only keys present in the body are changed; without any of them the note
    is returned unchanged and nothing is written.

    Raises:
        HTTPException 400: incomplete session.
        HTTPException 404: no active note with this id belongs to the caller.
    """
    s = _require_session(request)
    pid = (s.get("practice_id") or "").strip()
    uid = (s.get("user_id") or "").strip()
    if not pid or not uid:
        raise HTTPException(status_code=400, detail="Session unvollstaendig")
    try:
        body = await request.json()
    except Exception:
        body = {}
    if not isinstance(body, dict):
        body = {}

    store = _load_notes_store()
    target = None
    for n in store.get("notes") or []:
        if not _is_own_note(n, pid, uid) or n.get("id") != note_id:
            continue
        if (n.get("status") or "active") == "deleted":
            continue
        target = n
        break
    if target is None:
        raise HTTPException(status_code=404, detail="Notiz nicht gefunden")

    # ``target`` is the dict stored inside ``store``, so it is edited in place.
    changed = False
    if "title" in body:
        target["title"] = _clip_text(body.get("title", ""), _NOTES_MAX_TITLE).strip()
        changed = True
    if "body" in body:
        target["body"] = _clip_text(body.get("body", ""), _NOTES_MAX_BODY)
        changed = True
    if "pinned" in body:
        target["pinned"] = bool(body.get("pinned"))
        changed = True
    if changed:
        target["updated_at"] = _now_z()
        _save_notes_store(store)
        _log.info(
            "AZA_EMPFANG_NOTE_UPDATED practice=%s user=%s note=%s",
            (pid or "")[:16], (uid or "")[:16], (note_id or "")[:18],
        )
    return JSONResponse(content={"success": True, "note": _public_note(target)})


@router.delete("/notes/{note_id}")
async def empfang_notes_delete(note_id: str, request: Request):
    """Soft-delete one of the caller's notes.

    An already deleted note is answered with ``removed: 0``.

    Raises:
        HTTPException 400: incomplete session.
        HTTPException 404: no note with this id belongs to the caller.
    """
    s = _require_session(request)
    pid = (s.get("practice_id") or "").strip()
    uid = (s.get("user_id") or "").strip()
    if not pid or not uid:
        raise HTTPException(status_code=400, detail="Session unvollstaendig")
    store = _load_notes_store()
    for n in store.get("notes") or []:
        if not _is_own_note(n, pid, uid) or n.get("id") != note_id:
            continue
        if (n.get("status") or "active") == "deleted":
            return JSONResponse(content={"success": True, "removed": 0})
        # The dict lives inside ``store``; mutate it in place.
        n["status"] = "deleted"
        n["deleted_at"] = _now_z()
        n["updated_at"] = n["deleted_at"]
        break
    else:
        raise HTTPException(status_code=404, detail="Notiz nicht gefunden")
    _save_notes_store(store)
    _log.info(
        "AZA_EMPFANG_NOTE_DELETED practice=%s user=%s note=%s",
        (pid or "")[:16], (uid or "")[:16], (note_id or "")[:18],
    )
    return JSONResponse(content={"success": True, "removed": 1})


# =====================================================================
# External contacts (practice links AND individual external persons)
# =====================================================================
#
# Building on the existing practice-links store, entries are extended by
# the field ``contact_type``:
#   - "external_practice": practice-to-practice link
#                          (code = mutual approval -> accepted)
#   - "external_person"    (API alias: personal_external_contact):
#                          personal 1:1 contact across practice boundaries
#                          between exactly two users. No practice-wide list
#                          on the other side; no automatic new contacts.
#
# Existing entries without contact_type are still interpreted as
# external_practice (backward compatibility).
#
# Security model (external_person):
#   - The person remains a user of their source practice -- no locals.json
#     entry in the target practice, no role takeover.
#   - Visibility: only source_user_id and target_user_id (legacy without
#     target_user_id: see _person_link_visible_to_user).
#   - Accept/reject: the target user (target_user_id), not "all admins".
#     Legacy requests without target_user_id: still an admin of the target
#     practice.
# - Entfernen: die beiden Parteien (bzw. Legacy-Admin); Praxisbeitritt # entsteht dadurch nicht. _CONTACT_TYPE_PRACTICE = "external_practice" _CONTACT_TYPE_PERSON = "external_person" # API-Alias (Produktsprache); wird beim Lesen normalisiert, intern weiterhin # ``external_person`` als JSON-Wert gespeichert. _CONTACT_TYPE_PERSON_ALIAS = "personal_external_contact" _VALID_CONTACT_TYPES = { _CONTACT_TYPE_PRACTICE, _CONTACT_TYPE_PERSON, _CONTACT_TYPE_PERSON_ALIAS, } _VALID_LINK_STATUS = { "pending_outgoing", "pending_incoming", "accepted", "rejected", "blocked", "removed", } def _link_contact_type(link: dict) -> str: """Liest contact_type robust; Defaults auf external_practice.""" if not isinstance(link, dict): return _CONTACT_TYPE_PRACTICE t = (link.get("contact_type") or "").strip() if t == _CONTACT_TYPE_PERSON_ALIAS: return _CONTACT_TYPE_PERSON if t in (_CONTACT_TYPE_PRACTICE, _CONTACT_TYPE_PERSON): return t return _CONTACT_TYPE_PRACTICE def _effective_person_target_user_id(link: dict) -> str: """Zielbenutzer einer persoenlichen externen Verbindung (1:1). Neu: explizites ``target_user_id``. Legacy: bei ``accepted`` ohne Ziel-ID wird ``approved_by_user_id`` als Gegenpart angenommen (Admin hat frueher angenommen). 
""" if not isinstance(link, dict): return "" tu = (link.get("target_user_id") or "").strip() if tu: return tu st = str(link.get("status") or "").strip().lower() if st == "accepted": return ( (link.get("approved_by_user_id") or link.get("accepted_by_user_id") or "") .strip() ) return "" def _person_link_visible_to_user( link: dict, pid: str, uid: str, session_is_admin: bool, ) -> bool: """Sichtbarkeit strikt 1:1; Admins sehen keine neuen Personenanfragen, ausser Legacy.""" if _link_contact_type(link) != _CONTACT_TYPE_PERSON: return True pid = (pid or "").strip() uid = (uid or "").strip() sp = (link.get("source_practice_id") or "").strip() tp = (link.get("target_practice_id") or "").strip() su = (link.get("source_user_id") or "").strip() if pid == sp and su and uid == su: return True if pid != tp: return False tu_eff = (link.get("target_user_id") or "").strip() st = str(link.get("status") or "").strip().lower() if tu_eff: return uid == tu_eff # Legacy: keine explizite Zielperson — eingehend nur fuer Admins der Zielpraxis if st in ("pending_outgoing", "pending_incoming", "pending"): return bool(session_is_admin) if st == "accepted": ap = (link.get("approved_by_user_id") or "").strip() return bool(ap and uid == ap) return False def _practice_links_visible_for_user( pid: str, uid: str, session_is_admin: bool, ) -> list: """Wie _practice_links_visible_to, aber Persoenliche Kontakte nur fuer die zwei Parteien.""" pid = (pid or "").strip() uid = (uid or "").strip() store = _load_practice_links_store() out: list = [] for link in store.get("links") or []: if not isinstance(link, dict): continue if (link.get("status") or "") == "removed": continue if link.get("source_practice_id") != pid and link.get("target_practice_id") != pid: continue if _link_contact_type(link) == _CONTACT_TYPE_PERSON: if not _person_link_visible_to_user(link, pid, uid, session_is_admin): continue out.append(link) return out def _find_practice_link_for_user( link_id: str, pid: str, uid: str, 
session_is_admin: bool, ) -> Optional[dict]: link = _find_practice_link(link_id, pid) if not link: return None if _link_contact_type(link) == _CONTACT_TYPE_PERSON: if not _person_link_visible_to_user(link, pid, uid, session_is_admin): return None return link def _person_link_may_remove( link: dict, pid: str, uid: str, session_is_admin: bool, ) -> bool: if _link_contact_type(link) != _CONTACT_TYPE_PERSON: return True sp = (link.get("source_practice_id") or "").strip() tp = (link.get("target_practice_id") or "").strip() su = (link.get("source_user_id") or "").strip() tu = (link.get("target_user_id") or "").strip() if pid == sp and su and uid == su: return True if pid == tp and tu and uid == tu: return True if pid == tp and (not tu) and session_is_admin: return True ap = (link.get("approved_by_user_id") or "").strip() if pid == tp and (not tu) and ap and uid == ap: return True return False def _person_may_moderate_incoming(link: dict, uid: str, session_is_admin: bool) -> bool: """accept/reject/block: Zielbenutzer oder Legacy-Admin.""" if _link_contact_type(link) != _CONTACT_TYPE_PERSON: return session_is_admin uid = (uid or "").strip() if link.get("target_practice_id") and (link.get("source_practice_id") == link.get("target_practice_id")): return False tu = (link.get("target_user_id") or "").strip() if tu: return uid == tu return session_is_admin def _serialize_external_contact_for(pid: str, uid: str, link: dict) -> dict: """Generischer Serializer fuer beide Kontaktarten. Dreht 'direction' aus pid-Sicht (outgoing wenn source==pid, sonst incoming), und liefert je nach contact_type passende 'peer_*'-Felder: - external_practice: peer = Gegenpraxis - external_person: bei pid==target: peer_user_display = Anzeige der Person; bei pid==source: peer_user_display ist entweder leer (vor accept) oder ein evtl. spaeter eingetragener Zielname; peer_practice_name immer die Gegenpraxis. 
""" is_outgoing = link.get("source_practice_id") == pid ctype = _link_contact_type(link) peer_pid = link.get("target_practice_id") if is_outgoing else link.get("source_practice_id") peer_pname = link.get("target_practice_name") if is_outgoing else link.get("source_practice_name") own_pname = link.get("source_practice_name") if is_outgoing else link.get("target_practice_name") peer_user_display = "" if ctype == _CONTACT_TYPE_PERSON: if is_outgoing: peer_user_display = str(link.get("target_display_name") or "").strip() if not peer_user_display: tuid = (link.get("target_user_id") or "").strip() if tuid: acc_t = (_load_accounts().get(tuid) or {}) if isinstance(acc_t, dict) and (acc_t.get("practice_id") or "").strip() == str( link.get("target_practice_id") or "", ).strip(): peer_user_display = str(acc_t.get("display_name") or "").strip() else: peer_user_display = str(link.get("source_display_name") or "").strip() out = { "id": str(link.get("id") or ""), "contact_type": ctype, "status": str(link.get("status") or ""), "direction": "outgoing" if is_outgoing else "incoming", "peer_practice_id": str(peer_pid or ""), "peer_practice_name": str(peer_pname or ""), "peer_user_display": peer_user_display, "own_practice_name": str(own_pname or ""), "note": str(link.get("note") or "")[:500], "created_at": str(link.get("created_at") or ""), "updated_at": str(link.get("updated_at") or ""), "requested_by_user_id": str(link.get("requested_by_user_id") or link.get("created_by_user_id") or ""), "approved_by_user_id": str(link.get("approved_by_user_id") or ""), "accepted_by_user_id": str(link.get("accepted_by_user_id") or link.get("approved_by_user_id") or ""), "target_user_id": str(link.get("target_user_id") or ""), "last_message_at": str(link.get("last_message_at") or ""), } # Nur bei akzeptierter Personen-Verbindung: Gegen-user_id fuer 1:1-Chat. 
def _now_z() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    utc_now = datetime.now(timezone.utc)
    return utc_now.strftime("%Y-%m-%dT%H:%M:%SZ")
def _external_peer_practice_user_rows(peer_pid: str) -> list[dict]:
    """List the current users of a peer practice, read from the account store.

    The rows are built from ``_load_accounts()`` at call time (not from a
    snapshot stored on the link). Only accounts for which
    ``_account_is_sendable`` is true and that have a non-empty ``user_id``
    are included. The result is sorted by lower-cased display name, then
    by user id. An empty ``peer_pid`` yields an empty list.
    """
    practice_id = (peer_pid or "").strip()
    if not practice_id:
        return []

    rows: list[dict] = []
    for account in _load_accounts().values():
        if not isinstance(account, dict):
            continue
        in_practice = (account.get("practice_id") or "").strip() == practice_id
        if not in_practice or not _account_is_sendable(account):
            continue
        user_id = str(account.get("user_id") or "").strip()
        if not user_id:
            continue
        status = str(account.get("status") or "active").strip().lower()
        display_name = str(account.get("display_name") or "").strip()
        rows.append({
            "user_id": user_id,
            # Fall back to a shortened user id when no display name is set.
            "display_name": display_name or user_id[:12],
            "role": str(account.get("role") or "").strip(),
            "status": status or "active",
            "external_practice_id": practice_id,
        })

    def _order(row: dict) -> tuple:
        return (
            (row.get("display_name") or "").strip().lower(),
            row.get("user_id") or "",
        )

    return sorted(rows, key=_order)
- external_practice: **immer live** alle sendbaren Benutzer der Gegenpraxis (aktueller Stand aus der Account-DB; kein Snapshot am Link, kein Code neu). - external_person: auf **beiden** Seiten genau **ein** Benutzer (1:1), nie die gesamte Gegenpraxis-Liste. Keine E-Mail, keine Tokens. Keine Uebernahme von Rollen in die andere Praxis. """ s = _session_or_shell_identity(request) my_pid = (s.get("practice_id") or "").strip() my_uid = (s.get("user_id") or "").strip() if not my_pid or not my_uid: raise HTTPException(status_code=400, detail="Session unvollstaendig") is_adm = _is_admin_session(s) link = _find_practice_link_for_user(link_id, my_pid, my_uid, is_adm) if not link: raise HTTPException(status_code=404, detail="Verbindung nicht gefunden") st = str(link.get("status") or "").strip().lower() if st != "accepted": raise HTTPException( status_code=403, detail="Verbindung nicht freigegeben", ) ctype = _link_contact_type(link) peer_pid = "" out: list[dict] = [] if ctype == _CONTACT_TYPE_PRACTICE: is_outgoing = link.get("source_practice_id") == my_pid peer_pid = ( link.get("target_practice_id") if is_outgoing else link.get("source_practice_id") ) peer_pid = str(peer_pid or "").strip() if not peer_pid or peer_pid == my_pid: raise HTTPException(status_code=400, detail="Peer-Praxis ungueltig") out = _external_peer_practice_user_rows(peer_pid) elif ctype == _CONTACT_TYPE_PERSON: sp = (link.get("source_practice_id") or "").strip() tp = (link.get("target_practice_id") or "").strip() su = (link.get("source_user_id") or "").strip() tu_eff = _effective_person_target_user_id(link) if not sp or not tp or not su or not tu_eff: raise HTTPException(status_code=400, detail="Verbindung unvollstaendig") if my_pid == tp: peer_pid = sp acc = _account_record_for_practice(su, sp) if acc and _account_is_sendable(acc): acc_st = str(acc.get("status") or "active").strip().lower() out = [{ "user_id": su, "display_name": str(acc.get("display_name") or "").strip() or su[:12], "role": 
def _external_dm_reads_actor_key(pid: str, uid: str) -> str:
    """Build the ``practice_id|user_id`` key used in the external DM read store.

    Both parts are stripped; ``None`` or empty values become empty strings.
    """
    practice_part = (pid or "").strip()
    user_part = (uid or "").strip()
    return "|".join((practice_part, user_part))
def _external_dm_unread_aggregate(my_pid: str, my_uid: str) -> tuple[int, list[dict]]:
    """Summarise unread external direct messages for one user.

    Scans all stored external DM rows, keeps those where ``my_pid``/``my_uid``
    is sender or recipient and the peer pair passes
    ``_external_dm_authorize_pair``, and groups them by ``conversation_id``.
    A message counts as unread when it is incoming (addressed to the user and
    not sent by the user), its timestamp (``empfangen``, falling back to
    ``zeitstempel``) is later than the stored last-read marker of the
    conversation, and ``extras.chat_ack`` is not set.

    Returns:
        ``(total_unread, items)``. ``items`` holds one dict per conversation
        with at least one unread message, sorted by ``last_message_at``
        descending.
    """

    def _party_ids(msg: dict) -> tuple[str, str, str, str]:
        # (sender practice, sender user, recipient practice, recipient user)
        return (
            str(msg.get("sender_practice_id") or "").strip(),
            str(msg.get("sender_user_id") or "").strip(),
            str(msg.get("recipient_practice_id") or "").strip(),
            str(msg.get("recipient_user_id") or "").strip(),
        )

    # Authorization depends only on the peer pair, so it is evaluated once
    # per peer instead of once per stored message.
    verdicts: dict[tuple[str, str], bool] = {}

    def _peer_authorized(peer_practice: str, peer_user: str) -> bool:
        key = (peer_practice, peer_user)
        if key not in verdicts:
            try:
                _external_dm_authorize_pair(my_pid, my_uid, peer_practice, peer_user)
            except HTTPException:
                verdicts[key] = False
            else:
                verdicts[key] = True
        return verdicts[key]

    grouped: dict[str, list[dict]] = defaultdict(list)
    for m in _load_external_dm_messages():
        if not isinstance(m, dict) or str(m.get("conv_type") or "") != "external_dm":
            continue
        cid = str(m.get("conversation_id") or "").strip()
        if not cid:
            continue
        sp, su, rp, ru = _party_ids(m)
        sent_by_me = sp == my_pid and su == my_uid
        sent_to_me = rp == my_pid and ru == my_uid
        if not (sent_by_me or sent_to_me):
            continue
        peer_p, peer_u = (rp, ru) if sent_by_me else (sp, su)
        if _peer_authorized(peer_p, peer_u):
            grouped[cid].append(m)

    out: list[dict] = []
    total = 0
    for cid, msgs in grouped.items():
        # The peer of the conversation is taken from its first stored row.
        sp, su, rp, ru = _party_ids(msgs[0])
        if rp == my_pid and ru == my_uid:
            peer_p, peer_u = sp, su
        else:
            peer_p, peer_u = rp, ru

        lr_ts = _parse_msg_instant_utc_ts(
            _external_dm_get_last_read_iso(my_pid, my_uid, cid)
        )
        last_ts = 0.0
        last_iso = ""
        unread = 0
        for m in msgs:
            ts_raw = str(m.get("empfangen") or m.get("zeitstempel") or "").strip()
            ts_val = _parse_msg_instant_utc_ts(ts_raw)
            if ts_val >= last_ts:
                last_ts = ts_val
                last_iso = ts_raw
            sp_m, su_m, rp_m, ru_m = _party_ids(m)
            incoming = (
                rp_m == my_pid
                and ru_m == my_uid
                and (sp_m != my_pid or su_m != my_uid)
            )
            ex_m = m.get("extras") if isinstance(m.get("extras"), dict) else {}
            if incoming and ts_val > lr_ts and not bool(ex_m.get("chat_ack")):
                unread += 1

        if unread <= 0:
            # Fully read conversations are not reported, so the display-name
            # lookup below is skipped for them.
            continue

        acc = _account_record_for_practice(peer_u, peer_p) or {}
        peer_dn = str(acc.get("display_name") or "").strip()
        if not peer_dn:
            # Fall back to the display name stored on one of the messages.
            for m in msgs:
                if str(m.get("sender_user_id") or "").strip() == peer_u:
                    peer_dn = str(m.get("sender_display_name") or "").strip()
                    break
                if str(m.get("recipient_user_id") or "").strip() == peer_u:
                    peer_dn = str(m.get("recipient_display_name") or "").strip()
                    break

        total += unread
        out.append({
            "conversation_id": cid,
            "peer_practice_id": peer_p,
            "peer_practice_name": _practice_name_safe(peer_p),
            "peer_user_id": peer_u,
            "peer_display_name": peer_dn or peer_u[:12],
            "last_message_at": last_iso,
            "unread_count": unread,
        })

    out.sort(
        key=lambda x: _parse_msg_instant_utc_ts(str(x.get("last_message_at") or "")),
        reverse=True,
    )
    return total, out
def _safe_attachment_basename(name: str) -> str:
    """Reduce an uploaded file name to a safe bare file name.

    Steps:

    * control characters (code points below 32 and DEL) are removed, so the
      result cannot carry CR/LF/NUL into whatever later embeds the name;
    * backslashes are treated as path separators as well, so Windows-style
      client paths such as ``C:\\dir\\x.png`` or ``..\\..\\x.png`` are reduced
      to ``x.png`` regardless of the server platform;
    * only the final path component is kept and surrounding whitespace is
      stripped.

    Returns:
        The cleaned name, truncated to 180 characters, or the fallback
        ``"bild"`` when nothing usable remains (empty, ``"."`` or ``".."``).
    """
    raw = str(name or "")
    printable = "".join(ch for ch in raw if ord(ch) >= 32 and ord(ch) != 127)
    # Path() on POSIX only splits on "/", so normalise backslashes first.
    base = Path(printable.replace("\\", "/")).name.strip()
    if not base or base in (".", ".."):
        return "bild"
    return base[:180]
def _attachment_row_for_message(rec: dict) -> dict:
    """Convert an attachment metadata record into the row embedded in a message.

    Missing values fall back to an empty id, the name ``"bild"``, the MIME
    type ``"application/octet-stream"`` and size ``0``. ``kind`` is always
    ``"stored"``.
    """
    attachment_id = str(rec.get("id") or "")
    display_name = str(rec.get("original_name") or "bild")
    mime_type = str(rec.get("mime_type") or "application/octet-stream")
    size_bytes = int(rec.get("size") or 0)

    row: dict = {}
    row["attachment_id"] = attachment_id
    row["name"] = display_name
    row["mime_type"] = mime_type
    row["size"] = size_bytes
    row["kind"] = "stored"
    return row
# Request body of POST /external-messages/send.
# The sender is never part of the payload; empfang_external_dm_send takes
# practice_id / user_id from the session or shell identity only.
class ExternalDmSendIn(BaseModel):
    # Practice of the recipient; stripped by the endpoint before use.
    recipient_practice_id: str = ""
    # User id of the recipient inside that practice; stripped before use.
    recipient_user_id: str = ""
    # Message text; the endpoint strips it and clips it with
    # _clip_text(..., _EXTERNAL_DM_TEXT_MAX). May be empty if attachments are sent.
    text: str = ""
    # Ids of previously uploaded attachments. Blank entries are dropped by the
    # endpoint; _finalize_attachment_ids_external rejects more than
    # _ATTACHMENT_MAX_PER_MESSAGE ids.
    attachment_ids: list[str] = Field(default_factory=list)
""" s = _session_or_shell_identity(request) sender_pid = (s.get("practice_id") or "").strip() sender_uid = (s.get("user_id") or "").strip() if not sender_pid or not sender_uid: raise HTTPException(status_code=400, detail="Session unvollstaendig") recipient_pid = (payload.recipient_practice_id or "").strip() recipient_uid = (payload.recipient_user_id or "").strip() raw_text = (payload.text or "").strip() text = _clip_text(raw_text, _EXTERNAL_DM_TEXT_MAX) att_ids = [str(x or "").strip() for x in (payload.attachment_ids or []) if str(x or "").strip()] link_id, _link = _external_dm_authorize_send_direction( sender_pid, sender_uid, recipient_pid, recipient_uid, ) if not text and not att_ids: raise HTTPException( status_code=400, detail="Nachrichtstext oder Anhang erforderlich", ) s_acc = _account_record_for_practice(sender_uid, sender_pid) or {} r_acc = _account_record_for_practice(recipient_uid, recipient_pid) or {} s_dn = str(s_acc.get("display_name") or "").strip() or sender_uid[:12] r_dn = str(r_acc.get("display_name") or "").strip() or recipient_uid[:12] conv_id = _external_dm_conversation_id( sender_pid, sender_uid, recipient_pid, recipient_uid, ) att_rows = _finalize_attachment_ids_external( att_ids, sender_pid, sender_uid, recipient_pid, recipient_uid, conv_id, ) msg_id = uuid.uuid4().hex[:12] now = _utc_now_iso_z() extras = { "audience": "direct", "conv_type": "external_dm", "external_dm": True, "conversation_id": conv_id, "external_link_id": link_id, "sender_user_id": sender_uid, "recipient_user_id": recipient_uid, "sender_practice_id": sender_pid, "recipient_practice_id": recipient_pid, } if att_rows: extras["attachments"] = att_rows row = { "id": msg_id, "conv_type": "external_dm", "conversation_id": conv_id, "external_link_id": link_id, "sender_practice_id": sender_pid, "sender_user_id": sender_uid, "sender_display_name": s_dn, "recipient_practice_id": recipient_pid, "recipient_user_id": recipient_uid, "recipient_display_name": r_dn, "kommentar": text or 
@router.get("/external-messages/thread")
async def empfang_external_dm_thread(
    request: Request,
    peer_practice_id: str = Query("", alias="peer_practice_id"),
    peer_user_id: str = Query("", alias="peer_user_id"),
):
    """Return the messages of one external 1:1 conversation.

    The caller's identity comes from the session or shell identity. The peer
    pair must pass ``_external_dm_authorize_pair``. Messages are returned in
    the order defined by ``_msg_chrono_sort_key`` together with the current
    pulse tick of the caller's practice.

    Raises:
        HTTPException: 400 when the session lacks practice or user id; any
            error raised by ``_external_dm_authorize_pair`` is propagated.
    """
    identity = _session_or_shell_identity(request)
    own_practice = (identity.get("practice_id") or "").strip()
    own_user = (identity.get("user_id") or "").strip()
    if not (own_practice and own_user):
        raise HTTPException(status_code=400, detail="Session unvollstaendig")

    other_practice = (peer_practice_id or "").strip()
    other_user = (peer_user_id or "").strip()
    link_id, _unused_link = _external_dm_authorize_pair(
        own_practice, own_user, other_practice, other_user,
    )
    conversation = _external_dm_conversation_id(
        own_practice, own_user, other_practice, other_user,
    )

    ordered = sorted(
        (
            msg
            for msg in _load_external_dm_messages()
            if isinstance(msg, dict)
            and str(msg.get("conversation_id") or "") == conversation
        ),
        key=_msg_chrono_sort_key,
    )
    payload = {
        "success": True,
        "conversation_id": conversation,
        "external_link_id": link_id,
        "messages": [_external_dm_to_client_message(msg) for msg in ordered],
        "tick": int(_pulse_get(own_practice).get("tick", 0)),
    }
    return JSONResponse(content=payload)
# Request body of POST /external-messages/mark-read.
# Identifies the conversation by its peer; the caller's own ids come from the
# session. empfang_external_dm_mark_read answers 400 if either field is empty
# after stripping.
class ExternalDmMarkReadIn(BaseModel):
    # Practice id of the conversation partner.
    peer_practice_id: str = ""
    # User id of the conversation partner inside that practice.
    peer_user_id: str = ""
@router.post("/external-messages/mark-read")
async def empfang_external_dm_mark_read(request: Request, body: ExternalDmMarkReadIn):
    """Mark an external 1:1 thread as read for the calling user.

    The read marker is set to the current UTC time, or to the timestamp of a
    stored message of the conversation if that timestamp is not earlier than
    the current time.

    Raises:
        HTTPException: 400 when the session is incomplete or a peer id is
            missing; errors from ``_external_dm_authorize_pair`` propagate.
    """
    identity = _session_or_shell_identity(request)
    own_practice = (identity.get("practice_id") or "").strip()
    own_user = (identity.get("user_id") or "").strip()
    if not (own_practice and own_user):
        raise HTTPException(status_code=400, detail="Session unvollstaendig")

    other_practice = (body.peer_practice_id or "").strip()
    other_user = (body.peer_user_id or "").strip()
    if not (other_practice and other_user):
        raise HTTPException(status_code=400, detail="peer_practice_id und peer_user_id erforderlich")

    _external_dm_authorize_pair(own_practice, own_user, other_practice, other_user)
    conversation = _external_dm_conversation_id(
        own_practice, own_user, other_practice, other_user,
    )

    # Start from "now"; a message stamped at or after that moment wins.
    marker_iso = _utc_now_iso_z()
    marker_ts = _parse_msg_instant_utc_ts(marker_iso)
    for msg in _load_external_dm_messages():
        if not isinstance(msg, dict) or str(msg.get("conversation_id") or "") != conversation:
            continue
        stamp = str(msg.get("empfangen") or msg.get("zeitstempel") or "").strip()
        instant = _parse_msg_instant_utc_ts(stamp)
        if instant >= marker_ts:
            marker_ts, marker_iso = instant, stamp

    _external_dm_set_last_read_iso(own_practice, own_user, conversation, marker_iso)
    return JSONResponse(content={"success": True, "conversation_id": conversation})
"""OK/Quittierung fuer external_dm (``extras.chat_ack``), analog internem /messages/.../chat-ack.""" s = _session_or_shell_identity(request) my_pid = (s.get("practice_id") or "").strip() my_uid = (s.get("user_id") or "").strip() if not my_pid or not my_uid: raise HTTPException(status_code=400, detail="Session unvollstaendig") try: body = await request.json() except Exception: body = {} ack = bool(body.get("ack")) scope = str((body.get("scope") or "")).strip().lower() stored = _load_external_dm_messages() mid = (msg_id or "").strip() target = next( (m for m in stored if isinstance(m, dict) and str(m.get("id") or "").strip() == mid), None, ) if not target: raise HTTPException(status_code=404, detail="Nachricht nicht gefunden") if str(target.get("conv_type") or "") != "external_dm": raise HTTPException(status_code=400, detail="Keine externe Nachricht") sp = str(target.get("sender_practice_id") or "").strip() su = str(target.get("sender_user_id") or "").strip() rp = str(target.get("recipient_practice_id") or "").strip() ru = str(target.get("recipient_user_id") or "").strip() conv_id = str(target.get("conversation_id") or "").strip() if not conv_id or conv_id != _external_dm_conversation_id(my_pid, my_uid, sp, su): raise HTTPException(status_code=403, detail="Kein Zugriff") try: _external_dm_authorize_pair(my_pid, my_uid, sp, su) except HTTPException: raise HTTPException(status_code=403, detail="Kein Zugriff") def _is_incoming_to_session(m: dict) -> bool: if not isinstance(m, dict): return False return ( str(m.get("recipient_practice_id") or "").strip() == my_pid and str(m.get("recipient_user_id") or "").strip() == my_uid and ( str(m.get("sender_practice_id") or "").strip() != my_pid or str(m.get("sender_user_id") or "").strip() != my_uid ) ) if not _is_incoming_to_session(target): raise HTTPException(status_code=403, detail="Nur eingehende Nachrichten quittierbar") bulk_targets: list[dict] = [] if ack and scope == "thread_until_message": clk_chrono = 
_msg_chrono_sort_key(target) clk_sp = sp clk_su = su if conv_id and my_uid and rp == my_pid and ru == my_uid and sp and su and ( sp != my_pid or su != my_uid ): for m in stored: if not isinstance(m, dict): continue if str(m.get("conversation_id") or "").strip() != conv_id: continue if str(m.get("conv_type") or "") != "external_dm": continue ex_loop = m.get("extras") if isinstance(m.get("extras"), dict) else {} rpp = str(m.get("recipient_practice_id") or "").strip() ruu = str(m.get("recipient_user_id") or "").strip() spp = str(m.get("sender_practice_id") or "").strip() suu = str(m.get("sender_user_id") or "").strip() if rpp != my_pid or ruu != my_uid: continue if not spp or not suu or (spp == my_pid and suu == my_uid): continue if spp != clk_sp or suu != clk_su: continue if bool(ex_loop.get("chat_ack")): continue if _msg_chrono_sort_key(m) > clk_chrono: continue bulk_targets.append(m) affected = 0 target_id = str(target.get("id") or "").strip() if bulk_targets: for m in bulk_targets: ex_m = dict(m.get("extras") or {}) ex_m["chat_ack"] = True if str(m.get("id") or "").strip() == target_id: ex_m["chat_ack_visual"] = True else: ex_m.pop("chat_ack_visual", None) m["extras"] = ex_m affected += 1 ex_t = dict(target.get("extras") or {}) if not ex_t.get("chat_ack"): ex_t["chat_ack"] = True ex_t["chat_ack_visual"] = True target["extras"] = ex_t affected += 1 elif not ex_t.get("chat_ack_visual"): ex_t["chat_ack_visual"] = True target["extras"] = ex_t else: ex = dict(target.get("extras") or {}) if ack: if not ex.get("chat_ack"): ex["chat_ack"] = True affected = 1 ex["chat_ack_visual"] = True else: if ex.pop("chat_ack", None) is not None: affected = 1 ex.pop("chat_ack_visual", None) target["extras"] = ex _save_external_dm_messages(stored) if ack: try: max_iso = "" max_ts = -1.0 scan_list = list(bulk_targets) if bulk_targets else [target] for m in scan_list: ts_raw = str(m.get("empfangen") or m.get("zeitstempel") or "").strip() t = _parse_msg_instant_utc_ts(ts_raw) if t >= 
@router.post("/external-messages/{msg_id}/reaction")
async def empfang_external_dm_reaction(msg_id: str, request: Request):
    """Set or clear the caller's emoji reaction on one external_dm message.

    Request body (JSON): ``{"emoji": "<emoji>"}``. An empty or missing value
    removes the caller's reaction. Reactions are kept per actor in
    ``extras["user_reactions"]`` under the key produced by
    ``_external_dm_reaction_actor_key(practice_id, user_id)``.

    Both participants of the message may react. The conversation peer is
    derived from the caller's role (sender -> recipient, recipient -> sender)
    before the conversation id and the pair authorisation are checked;
    previously the sender was always used as the peer, which made the
    "caller is the sender" case impossible to reach.

    Returns:
        JSONResponse with ``success``, the stored ``emoji`` ("" when cleared)
        and the complete ``user_reactions`` mapping of the message.

    Raises:
        HTTPException 400: incomplete identity, unsupported emoji, or the
            message is not of type ``external_dm``.
        HTTPException 403: caller is not a participant, the conversation id
            does not match, or the pair is not authorised.
        HTTPException 404: no stored message has the given id.
    """
    ident = _session_or_shell_identity(request)
    my_pid = (ident.get("practice_id") or "").strip()
    my_uid = (ident.get("user_id") or "").strip()
    if not my_pid or not my_uid:
        raise HTTPException(status_code=400, detail="Session unvollstaendig")

    try:
        body = await request.json()
    except Exception:
        # A missing or malformed JSON body is treated like an empty one,
        # which means "clear my reaction".
        body = {}
    raw_emoji = str((body.get("emoji") if isinstance(body, dict) else "") or "")
    emoji = raw_emoji.strip()
    if emoji and emoji not in _ALLOWED_REACTION_EMOJIS:
        # Accept emojis that differ from an allowed one only by a trailing
        # variation selector (U+FE0F).
        allowed_bare = {e.rstrip("\uFE0F") for e in _ALLOWED_REACTION_EMOJIS}
        if emoji.rstrip("\uFE0F") not in allowed_bare:
            raise HTTPException(status_code=400, detail="Emoji nicht unterstuetzt")
        if emoji.startswith("\u2764"):
            # Hearts are always stored with the variation selector.
            emoji = "\u2764\uFE0F"

    stored = _load_external_dm_messages()
    mid = (msg_id or "").strip()
    target = next(
        (
            m for m in stored
            if isinstance(m, dict) and str(m.get("id") or "").strip() == mid
        ),
        None,
    )
    if not target:
        raise HTTPException(status_code=404, detail="Nachricht nicht gefunden")
    if str(target.get("conv_type") or "") != "external_dm":
        raise HTTPException(status_code=400, detail="Keine externe Nachricht")

    sp = str(target.get("sender_practice_id") or "").strip()
    su = str(target.get("sender_user_id") or "").strip()
    rp = str(target.get("recipient_practice_id") or "").strip()
    ru = str(target.get("recipient_user_id") or "").strip()
    conv_id = str(target.get("conversation_id") or "").strip()

    i_am_sender = sp == my_pid and su == my_uid
    i_am_recipient = rp == my_pid and ru == my_uid
    if not (i_am_sender or i_am_recipient):
        raise HTTPException(status_code=403, detail="Kein Zugriff")
    # The peer is "the other side" of the message. When the caller is the
    # recipient this is the sender, exactly as before the fix.
    if i_am_recipient:
        peer_pid, peer_uid = sp, su
    else:
        peer_pid, peer_uid = rp, ru

    if not conv_id or conv_id != _external_dm_conversation_id(
        my_pid, my_uid, peer_pid, peer_uid,
    ):
        raise HTTPException(status_code=403, detail="Kein Zugriff")
    try:
        _external_dm_authorize_pair(my_pid, my_uid, peer_pid, peer_uid)
    except HTTPException:
        # Collapse every authorisation failure into one uniform 403.
        raise HTTPException(status_code=403, detail="Kein Zugriff")

    extras = dict(target.get("extras") or {})
    actor_key = _external_dm_reaction_actor_key(my_pid, my_uid)
    reactions = dict(extras.get("user_reactions") or {})
    if emoji:
        reactions[actor_key] = emoji
    else:
        reactions.pop(actor_key, None)
    if reactions:
        extras["user_reactions"] = reactions
    else:
        # Do not keep an empty mapping on the message.
        extras.pop("user_reactions", None)
    target["extras"] = extras
    _save_external_dm_messages(stored)

    try:
        _pulse_bump(sp, sender="external_dm")
        _pulse_bump(rp, sender="external_dm")
    except Exception:
        # Best effort: the reaction is already persisted, so a failing pulse
        # bump must not fail the request. Leave a trace for diagnosis.
        _log.debug("AZA_EXT_CHAT_REACTION pulse bump failed", exc_info=True)

    _log.info(
        "AZA_EXT_CHAT_REACTION practice=%s msg=%s set=%s",
        (my_pid or "")[:16],
        (msg_id or "")[:16],
        bool(emoji),
    )
    return JSONResponse(content={
        "success": True,
        "emoji": emoji,
        "user_reactions": reactions,
    })
@router.get("/attachments/{attachment_id}/file")
async def empfang_attachment_get_file(attachment_id: str, request: Request):
    """Return the stored bytes of an attachment to an authorised caller.

    The caller identity comes from ``_session_or_shell_identity``; whether
    the caller may see the attachment is decided by ``_attachment_can_view``
    on the attachment's metadata record.

    Raises:
        HTTPException 401: the identity has no practice id or no user id.
        HTTPException 403: ``_attachment_can_view`` denies access.
        HTTPException 404: no metadata record exists, or the ``.bin`` file
            is missing on disk.
    """
    identity = _session_or_shell_identity(request)
    viewer_practice = (identity.get("practice_id") or "").strip()
    viewer_user = (identity.get("user_id") or "").strip()
    if not (viewer_practice and viewer_user):
        raise HTTPException(status_code=401, detail="Nicht angemeldet")

    attachment_key = (attachment_id or "").strip()
    meta = _attachment_meta_get(attachment_key)
    if not meta:
        raise HTTPException(status_code=404, detail="Nicht gefunden")
    if not _attachment_can_view(meta, viewer_practice, viewer_user):
        raise HTTPException(status_code=403, detail="Kein Zugriff")

    blob_path = _ATTACHMENTS_DIR / f"{attachment_key}.bin"
    if not blob_path.is_file():
        raise HTTPException(status_code=404, detail="Nicht gefunden")

    content_type = str(meta.get("mime_type") or "application/octet-stream")
    download_name = str(meta.get("original_name") or "bild")
    return FileResponse(
        blob_path,
        media_type=content_type,
        filename=download_name,
        # Attachment content must not be stored by caches.
        headers={"Cache-Control": "private, no-store"},
    )
@router.post("/external-contacts/request-person")
async def empfang_external_contacts_request_person(request: Request):
    """Create a personal 1:1 external contact request to one specific user.

    Body: ``{"code": "CHAT-...", "target_user_id": "...", "note": "..."}``

    - code (alias ``invite_code``): CHAT code of the **target practice**; it
      is only used to resolve the target practice id.
    - target_user_id (alias ``target_user``): required, id of the recipient
      inside the target practice.
    - note: optional, truncated to 500 characters.

    The link is stored with ``contact_type=external_person`` and status
    ``pending_outgoing``. An existing link for the same
    source user / target user pair (status not ``removed``/``rejected``) is
    updated in place instead of creating a duplicate.

    Raises:
        HTTPException 400: incomplete identity, missing code or target user,
            or the code belongs to the caller's own practice.
        HTTPException 403: an existing link for this pair is ``blocked``.
        HTTPException 404: unknown code, or the target account does not
            exist in that practice / is not sendable.
    """
    s = _session_or_shell_identity(request)
    source_pid = (s.get("practice_id") or "").strip()
    source_uid = (s.get("user_id") or "").strip()
    source_dn = (s.get("display_name") or "").strip()
    if not source_pid or not source_uid:
        raise HTTPException(status_code=400, detail="Session unvollstaendig")

    try:
        body = await request.json()
    except Exception:
        body = {}
    if not isinstance(body, dict):
        body = {}

    # The body is untrusted JSON: coerce to str so that numbers, lists etc.
    # cannot crash .strip()/slicing with an unhandled exception (HTTP 500).
    raw_code = str(body.get("code") or body.get("invite_code") or "").strip()
    note = str(body.get("note") or "")[:500]
    target_user_id = str(
        body.get("target_user_id") or body.get("target_user") or ""
    ).strip()
    if not raw_code:
        raise HTTPException(status_code=400, detail="code erforderlich")
    if not target_user_id:
        raise HTTPException(
            status_code=400,
            detail=(
                "target_user_id erforderlich: persoenliche Kontakte sind nur "
                "personenbezogen. Ohne Ziel-Benutzer-ID bitte "
                "«Externe Praxis verbinden» nutzen oder einen persoenlichen "
                "Einladungslink (Folgeausbaustufe)."
            ),
        )

    target_pid = _lookup_practice_id_by_invite(raw_code)
    if not target_pid:
        raise HTTPException(
            status_code=404,
            detail="Code ist keiner Praxis zugeordnet oder bereits abgelaufen.",
        )
    if target_pid == source_pid:
        raise HTTPException(
            status_code=400,
            detail="Sie koennen sich nicht als externen Kontakt Ihrer eigenen Praxis anfragen.",
        )
    tgt_acc = _account_record_for_practice(target_user_id, target_pid)
    if not tgt_acc or not _account_is_sendable(tgt_acc):
        raise HTTPException(
            status_code=404,
            detail="Zielbenutzer in dieser Praxis nicht gefunden oder nicht erreichbar.",
        )

    practices = _load_practices()
    src_name = str((practices.get(source_pid) or {}).get("name") or "").strip()
    tgt_name = str((practices.get(target_pid) or {}).get("name") or "").strip()
    if not source_dn:
        # The identity carried no display name: fall back to the stored
        # account, then to a generic label.
        try:
            accounts = _load_accounts()
            acc = accounts.get(source_uid) or {}
            source_dn = str(acc.get("display_name") or "").strip() or "Externer Benutzer"
        except Exception:
            source_dn = "Externer Benutzer"
    target_display_name = str(tgt_acc.get("display_name") or "").strip()

    store = _load_practice_links_store()
    existing = None
    for link in store.get("links") or []:
        if not isinstance(link, dict):
            continue
        if _link_contact_type(link) != _CONTACT_TYPE_PERSON:
            continue
        if (
            link.get("source_practice_id") == source_pid
            and link.get("target_practice_id") == target_pid
            and link.get("source_user_id") == source_uid
            and str(link.get("target_user_id") or "").strip() == target_user_id
            and link.get("status") not in ("removed", "rejected")
        ):
            existing = link
            break

    if existing is not None and existing.get("status") == "blocked":
        # A block is set by the target side. Re-sending the request must not
        # silently reset it to "pending_outgoing".
        raise HTTPException(status_code=403, detail="Diese Verbindung wurde blockiert.")

    now = _now_z()
    if existing is not None:
        # NOTE(review): an already "accepted" link is also reset to
        # "pending_outgoing" here - confirm that this downgrade is intended.
        existing["contact_type"] = _CONTACT_TYPE_PERSON
        existing["status"] = "pending_outgoing"
        existing["updated_at"] = now
        existing["source_display_name"] = source_dn
        existing["target_user_id"] = target_user_id
        existing["target_display_name"] = target_display_name
        if note:
            existing["note"] = note
        existing["source_practice_name"] = src_name
        existing["target_practice_name"] = tgt_name
        if not existing.get("invite_code_used"):
            existing["invite_code_used"] = raw_code
        _persist_link_update(existing["id"], existing)
        out = existing
    else:
        out = {
            "id": _generate_practice_link_id(),
            "contact_type": _CONTACT_TYPE_PERSON,
            "source_practice_id": source_pid,
            "target_practice_id": target_pid,
            "source_user_id": source_uid,
            "source_display_name": source_dn,
            "target_user_id": target_user_id,
            "target_display_name": target_display_name,
            "source_practice_name": src_name,
            "target_practice_name": tgt_name,
            "status": "pending_outgoing",
            "requested_by_user_id": source_uid,
            "created_by_user_id": source_uid,
            "created_at": now,
            "updated_at": now,
            "invite_code_used": raw_code,
            "note": note,
        }
        store.setdefault("links", []).append(out)
        _save_practice_links_store(store)

    _log.info(
        "AZA_EMPFANG_EXTCONT_PERSON_REQ source=%s target=%s tgtuser=%s status=%s",
        (source_pid or "")[:16],
        (target_pid or "")[:16],
        (target_user_id or "")[:12],
        out["status"],
    )
    return JSONResponse(content={
        "success": True,
        "external_contact": _serialize_external_contact_for(source_pid, source_uid, out),
    })
@router.post("/external-contacts/{link_id}/reject")
async def empfang_external_contacts_reject(link_id: str, request: Request):
    """Reject an incoming external contact request.

    Only the target practice of the link may reject. For practice links the
    caller must have an admin session; for person links the decision is
    delegated to ``_person_may_moderate_incoming``. On success the link gets
    status ``rejected`` and the acting user is recorded in
    ``approved_by_user_id``.

    Raises:
        HTTPException 400: incomplete session or unknown contact type.
        HTTPException 403: caller's practice is not the target, or the
            caller lacks the permission for this contact type.
        HTTPException 404: link not found for this user.
    """
    session = _require_session(request)
    practice_id = (session.get("practice_id") or "").strip()
    user_id = (session.get("user_id") or "").strip()
    if not (practice_id and user_id):
        raise HTTPException(status_code=400, detail="Session unvollstaendig")

    caller_is_admin = _is_admin_session(session)
    link = _find_practice_link_for_user(link_id, practice_id, user_id, caller_is_admin)
    if not link:
        raise HTTPException(status_code=404, detail="Verbindung nicht gefunden")
    if link.get("target_practice_id") != practice_id:
        raise HTTPException(status_code=403, detail="Nur die Zielpraxis darf Anfragen ablehnen.")

    # Work out, per contact type, whether the caller may moderate and which
    # message to send back if not.
    contact_type = _link_contact_type(link)
    if contact_type == _CONTACT_TYPE_PRACTICE:
        permitted = caller_is_admin
        refusal = "Nur Administratoren der Zielpraxis duerfen Anfragen ablehnen."
    elif contact_type == _CONTACT_TYPE_PERSON:
        permitted = _person_may_moderate_incoming(link, user_id, caller_is_admin)
        refusal = "Keine Berechtigung fuer diese persoenliche Anfrage."
    else:
        raise HTTPException(status_code=400, detail="Unbekannter Kontakttyp")
    if not permitted:
        raise HTTPException(status_code=403, detail=refusal)

    link.update({
        "status": "rejected",
        "approved_by_user_id": user_id,
        "updated_at": _now_z(),
    })
    _persist_link_update(link_id, link)

    _log.info(
        "AZA_EMPFANG_EXTCONT_REJECTED practice=%s link=%s type=%s",
        (practice_id or "")[:16],
        (link_id or "")[:16],
        contact_type,
    )
    response_body = {
        "success": True,
        "external_contact": _serialize_external_contact_for(practice_id, user_id, link),
    }
    return JSONResponse(content=response_body)
@router.delete("/external-contacts/{link_id}")
async def empfang_external_contacts_remove(link_id: str, request: Request):
    """Mark an external link as removed (``status=removed``).

    Whether the caller may remove the link is decided by
    ``_person_link_may_remove``. Per the original note: personal contacts
    may be removed only by the two users involved (or a legacy admin),
    practice-to-practice links by any member of an involved practice.
    A link that cannot be found is reported as success with ``removed: 0``.

    Raises:
        HTTPException 400: incomplete session.
        HTTPException 403: ``_person_link_may_remove`` denies the removal.
    """
    session = _require_session(request)
    practice_id = (session.get("practice_id") or "").strip()
    user_id = (session.get("user_id") or "").strip()
    if not (practice_id and user_id):
        raise HTTPException(status_code=400, detail="Session unvollstaendig")

    caller_is_admin = _is_admin_session(session)
    link = _find_practice_link_for_user(link_id, practice_id, user_id, caller_is_admin)
    if not link:
        # Nothing to remove counts as success.
        return JSONResponse(content={"success": True, "removed": 0})
    if not _person_link_may_remove(link, practice_id, user_id, caller_is_admin):
        raise HTTPException(
            status_code=403,
            detail="Keine Berechtigung zum Entfernen dieser Verbindung.",
        )

    link.update({"status": "removed", "updated_at": _now_z()})
    _persist_link_update(link_id, link)
    _log.info(
        "AZA_EMPFANG_EXTCONT_REMOVED practice=%s link=%s type=%s",
        (practice_id or "")[:16],
        (link_id or "")[:16],
        _link_contact_type(link),
    )
    return JSONResponse(content={"success": True, "removed": 1})


# =====================================================================
# Per-practice admin management (license-anchored, with last-admin protection)
# =====================================================================
#
# Security model:
# - The admin role is strictly a per-practice property. An admin of
#   practice A is NOT automatically an admin of practice B.
# - The very first admin is created by the main program during license
#   activation (see auth/provision: bootstrap_admin_allowed).
# - Later admins are only granted or revoked by an already authorised
#   admin (role=admin in the same practice_id). Exception: if a practice
#   has no admin at all, a member of that practice may make themselves
#   admin via /repair-no-admin ("self-repair", but ONLY in their own
#   practice).
# - External contacts are never treated as local admins: accounts.json
#   contains local users only; external persons are stored in
#   practice_links.json and are never part of accounts.
# - The last admin cannot be revoked (HTTP 400). Another admin has to be
#   set first.

_ADMIN_SOURCE_LICENSE = "license_activation"
_ADMIN_SOURCE_MANUAL = "manual_admin_assignment"
_ADMIN_SOURCE_REPAIR = "manual_admin_assignment"  # same source text in the audit
_ADMIN_SOURCE_LEGACY = "legacy"
_ADMIN_SOURCE_LICENSE_JOIN_EXISTING = "license_join_existing"


def _account_has_practice_admin_privileges(acc: Optional[dict]) -> bool:
    """Return True if the account counts as practice administrator.

    That is the case for role ``admin`` (case-insensitive, surrounding
    whitespace ignored) and for role ``arzt`` whose ``admin_source`` is
    ``license_join_existing``. Anything that is not a dict yields False.
    """
    if not isinstance(acc, dict):
        return False
    # str() keeps unexpected non-string values from crashing .strip().
    role = str(acc.get("role") or "").strip().lower()
    if role == "admin":
        return True
    admin_source = str(acc.get("admin_source") or "").strip()
    return role == "arzt" and admin_source == _ADMIN_SOURCE_LICENSE_JOIN_EXISTING


def _public_account_for_admin_ui(acc: dict) -> dict:
    """Build the compact, non-sensitive account record for the admin UI.

    Only the fields listed in the returned dict are exposed; password hash
    and salt are never copied. ``admin_source`` is filled only for accounts
    with admin privileges; an ``admin`` account without a stored source is
    reported as ``legacy`` (historical data).
    """
    role = str(acc.get("role") or "").strip()
    admin_source = ""
    if _account_has_practice_admin_privileges(acc):
        admin_source = str(acc.get("admin_source") or "").strip()
        if role.lower() == "admin" and not admin_source:
            admin_source = _ADMIN_SOURCE_LEGACY
    return {
        "user_id": str(acc.get("user_id") or ""),
        "display_name": str(acc.get("display_name") or ""),
        "login_name": str(acc.get("login_name") or ""),
        "email": str(acc.get("email") or ""),
        "role": role,
        "status": str(acc.get("status") or "active"),
        "admin_source": admin_source,
        "created": str(acc.get("created") or ""),
        "last_login": str(acc.get("last_login") or ""),
    }


def _admins_for_practice(accounts: dict, pid: str) -> list:
    """Return the non-deactivated admin accounts of practice ``pid``.

    Entries of ``accounts`` that are not dicts are skipped instead of
    raising AttributeError.
    """
    return [
        a for a in accounts.values()
        if isinstance(a, dict)
        and a.get("practice_id") == pid
        and _account_has_practice_admin_privileges(a)
        and (a.get("status") or "active") != "deactivated"
    ]


def _members_for_practice(accounts: dict, pid: str) -> list:
    """Return every account of practice ``pid``, deactivated ones included.

    Entries of ``accounts`` that are not dicts are skipped instead of
    raising AttributeError.
    """
    return [
        a for a in accounts.values()
        if isinstance(a, dict) and a.get("practice_id") == pid
    ]


def _require_self_practice_admin(request: "Request") -> dict:
    """Return the caller's session if it is an admin session.

    Raises:
        HTTPException 403: the session is not an admin session. Whatever
            ``_require_session`` raises for a missing session propagates.
    """
    s = _require_session(request)
    if not _is_admin_session(s):
        raise HTTPException(
            status_code=403,
            detail="Nur Administratoren der eigenen Praxis duerfen diese Aktion ausfuehren.",
        )
    return s
@router.get("/admin/admins")
async def empfang_admin_admins(request: Request):
    """Return the short list of admins of the caller's own practice.

    Any valid session of the practice may call this. Each admin is rendered
    through ``_public_account_for_admin_ui``.

    Raises:
        HTTPException 400: the session carries no practice id.
    """
    session = _require_session(request)
    practice_id = (session.get("practice_id") or "").strip()
    if not practice_id:
        raise HTTPException(status_code=400, detail="Session unvollstaendig")

    practice_admins = _admins_for_practice(_load_accounts(), practice_id)
    admin_views = [_public_account_for_admin_ui(adm) for adm in practice_admins]
    return JSONResponse(content={"success": True, "admins": admin_views})
@router.post("/admin/revoke-admin/{user_id}")
async def empfang_admin_revoke_admin(user_id: str, request: Request):
    """Revoke the admin rights of a user in the caller's own practice.

    The caller must hold an admin session. A target without admin
    privileges is answered with ``no_op: True`` and nothing is written.
    Otherwise the target's role is set to ``arzt``, its ``admin_source`` is
    cleared and the revoking user and time are recorded on the account.

    Last-admin protection: if no other admin (as returned by
    ``_admins_for_practice``) would remain, the request is refused.

    Raises:
        HTTPException 400: the target is the last remaining admin.
        HTTPException 404: the target is unknown or in another practice.
    """
    session = _require_self_practice_admin(request)
    practice_id = (session.get("practice_id") or "").strip()
    all_accounts = _load_accounts()
    account = all_accounts.get(user_id)
    if not account or account.get("practice_id") != practice_id:
        raise HTTPException(status_code=404, detail="Benutzer nicht in dieser Praxis gefunden")

    if not _account_has_practice_admin_privileges(account):
        # Nothing to revoke: report the unchanged account.
        unchanged = {
            "success": True,
            "user": _public_account_for_admin_ui(account),
            "no_op": True,
        }
        return JSONResponse(content=unchanged)

    remaining_admins = [
        adm
        for adm in _admins_for_practice(all_accounts, practice_id)
        if adm.get("user_id") != user_id
    ]
    if len(remaining_admins) == 0:
        raise HTTPException(
            status_code=400,
            detail="Letzter Admin kann nicht entzogen werden. Bitte zuerst einen anderen Benutzer als Admin festlegen.",
        )

    account.update({
        "role": "arzt",
        "admin_source": "",
        "admin_revoked_by_user_id": str(session.get("user_id") or ""),
        "admin_revoked_at": _now_z(),
    })
    all_accounts[user_id] = account
    _save_accounts(all_accounts)

    _log.info(
        "AZA_EMPFANG_ADMIN_REVOKE practice=%s by=%s target=%s",
        (practice_id or "")[:16],
        (session.get("user_id") or "")[:16],
        (user_id or "")[:16],
    )
    result = {"success": True, "user": _public_account_for_admin_ui(account)}
    return JSONResponse(content=result)
@router.post("/auth/regenerate_invite")
async def auth_regenerate_invite(request: Request):
    """Generate and store a new chat invite code for one practice.

    Two ways to call it:

    - with a session: the session must be an admin session; the practice id
      is taken from the session;
    - without a session but with an ``X-API-Token`` header: the practice id
      is taken from the ``X-Practice-Id`` header.

    Returns:
        JSONResponse with ``success`` and the new ``invite_code``.

    Raises:
        HTTPException 400: the session has no practice id, or the
            ``X-Practice-Id`` header is missing in token mode.
        HTTPException 401: neither a session nor an API token is present.
        HTTPException 403: the session is not an admin session.
        HTTPException 404: the practice record is still missing after
            ``_ensure_practice``; previously this case was answered with
            ``success: True`` and an empty code.
    """
    api_token = request.headers.get("X-API-Token", "")
    session = _session_from_request(request)
    if session:
        if not _is_admin_session(session):
            raise HTTPException(status_code=403, detail="Nur Admin darf Einladungscode erneuern")
        # .get + strip instead of session["practice_id"]: avoids a KeyError
        # and keeps an empty id from reaching _ensure_practice.
        pid = str(session.get("practice_id") or "").strip()
        if not pid:
            raise HTTPException(status_code=400, detail="Session unvollstaendig")
    elif api_token:
        # NOTE(review): the token value is only checked for being non-empty
        # in this function. Confirm that it is validated elsewhere (for
        # example by middleware); otherwise any caller could rotate the
        # invite code of an arbitrary practice.
        pid = request.headers.get("X-Practice-Id", "").strip()
        if not pid:
            raise HTTPException(status_code=400, detail="X-Practice-Id Header erforderlich")
    else:
        raise HTTPException(status_code=401, detail="Nicht authentifiziert")

    _ensure_practice(pid)
    practices = _load_practices()
    if pid not in practices:
        # Nothing was regenerated, so do not report success.
        raise HTTPException(status_code=404, detail="Praxis nicht gefunden")

    new_code = _generate_chat_invite_code()
    practices[pid]["invite_code"] = new_code
    _save_practices(practices)
    return JSONResponse(content={
        "success": True,
        "invite_code": new_code,
    })
# Normalised (lower-case, single-spaced) names that count as placeholders.
_PLACEHOLDER_PRACTICE_NAMES = frozenset({"meine praxis"})

# Substrings of a lookup_key that mark a chat-only product. Some entries are
# already covered by "chat_only"; they are kept to document the known keys.
_CHAT_ONLY_LOOKUP_MARKERS = (
    "chat_only",
    "chat-only",
    "aza_chat_only",
    "empfang_only",
    "minichat_only",
    "chatonly",
)


def _profile_field_text(record: Optional[dict], *keys: str) -> str:
    """Return the first truthy value among ``keys`` as a stripped string.

    Yields "" when ``record`` is not a dict or none of the keys holds a
    truthy value.
    """
    if not isinstance(record, dict):
        return ""
    for key in keys:
        value = record.get(key)
        if value:
            return str(value).strip()
    return ""


def _weak_practice_public_name(name: str) -> bool:
    """Return True if ``name`` is empty, a placeholder, or shorter than 2 chars.

    The name is compared after lower-casing and collapsing whitespace.
    """
    normalized = " ".join(str(name or "").strip().lower().split())
    return (
        not normalized
        or normalized in _PLACEHOLDER_PRACTICE_NAMES
        or len(normalized) < 2
    )


def _practice_profile_public_shape(p: dict, pid: str) -> dict:
    """Project a practice record onto its public profile fields.

    Every field except ``practice_id`` is a stripped string ("" when the
    value is missing). A ``None`` record yields the empty shape.
    """
    return {
        "practice_id": pid,
        "name": _profile_field_text(p, "name"),
        "specialty": _profile_field_text(p, "specialty"),
        "phone": _profile_field_text(p, "phone"),
        "address": _profile_field_text(p, "address"),
        "website": _profile_field_text(p, "website"),
        "contact_email": _profile_field_text(p, "contact_email"),
        "admin_email": _profile_field_text(p, "admin_email"),
        "profile_updated_at": _profile_field_text(p, "profile_updated_at"),
        "profile_updated_by_user_id": _profile_field_text(
            p, "profile_updated_by_user_id",
        ),
    }


def _user_profile_public_shape(acc: dict) -> dict:
    """Project an account record onto its public profile fields.

    ``job_function`` falls back to the ``function`` key; the account's
    ``specialty`` is exposed as ``specialty_user``. A ``None`` record yields
    the empty shape.
    """
    return {
        "user_id": _profile_field_text(acc, "user_id"),
        "display_name": _profile_field_text(acc, "display_name"),
        "title": _profile_field_text(acc, "title"),
        "role": _profile_field_text(acc, "role"),
        "email": _profile_field_text(acc, "email"),
        "job_function": _profile_field_text(acc, "job_function", "function"),
        "specialty_user": _profile_field_text(acc, "specialty"),
        "profile_updated_at": _profile_field_text(acc, "profile_updated_at"),
    }


def _practice_profile_warnings_payload(
    practice: dict,
    user: Optional[dict],
    license_customer_email: str,
) -> list[str]:
    """Collect warnings about an incomplete practice or user profile.

    Args:
        practice: practice record; reads ``name``, ``specialty`` and
            ``admin_email``. ``None`` is treated like an empty record.
        user: user profile in the shape of ``_user_profile_public_shape``
            (reads ``display_name``, ``email``, ``specialty_user``,
            ``title``); user checks are skipped when it is empty or not a
            dict.
        license_customer_email: if non-blank and the practice name is weak,
            one more warning is appended.

    Returns:
        The warning texts in a fixed order; empty when nothing is missing.
    """
    warnings: list[str] = []
    practice_name = _profile_field_text(practice, "name")
    name_is_weak = _weak_practice_public_name(practice_name)

    if name_is_weak:
        warnings.append("Praxisname fehlt oder ist nur Platzhalter")
    if not _profile_field_text(practice, "specialty"):
        warnings.append("Fachrichtung (Praxis) ist nicht hinterlegt")
    if not _profile_field_text(practice, "admin_email"):
        warnings.append("Admin-E-Mail der Praxis fehlt")

    if isinstance(user, dict) and user:
        if not _profile_field_text(user, "display_name"):
            warnings.append("Benutzerprofil: Anzeigename fehlt")
        if not _profile_field_text(user, "email"):
            warnings.append("Benutzerprofil: E-Mail fehlt")
        if not (
            _profile_field_text(user, "specialty_user")
            or _profile_field_text(user, "title")
        ):
            warnings.append("Benutzerprofil: Titel oder Fachrichtung fehlt")

    if str(license_customer_email or "").strip() and name_is_weak:
        warnings.append(
            "Lizenz-E-Mail ist bekannt, aber das Praxisprofil wirkt unvollstaendig",
        )
    return warnings


def _entitlements_from_lookup_key(lookup_key: Optional[str]) -> tuple[bool, bool]:
    """Return ``(office_allowed, chat_allowed)`` for a product lookup key.

    Backward compatible: a key without a marker (or no key at all) grants
    office and chat. Chat-only products are recognised solely by explicit
    substrings of the lookup key (no heuristics based on product names);
    for them office is denied and chat is granted.
    """
    lk = (lookup_key or "").strip().lower()
    if lk and any(marker in lk for marker in _CHAT_ONLY_LOOKUP_MARKERS):
        return False, True
    return True, True
def internal_license_join_existing_practice_account(
    *,
    target_practice_id: str,
    name: str,
    email: str,
    password: str,
    assign_admin: bool,
    license_customer_email: str,
    body: dict,
) -> dict:
    """Create or update an account inside an **existing** practice.

    Server-internal entry point (no Empfang API token involved). It is called
    by ``POST /license/join_existing_practice`` after the license has been
    linked successfully. It never creates a practice and never renames
    existing practice data.

    An existing account is reused when exactly one account of the practice
    carries the given e-mail; otherwise the first account whose display name
    matches ``name`` after login-name normalisation is used. A reused account
    receives the new password and e-mail, while display name, login name,
    specialty and title are only filled in when they are still empty.
    Without a match a new account with role ``"arzt"`` is created.

    Args:
        target_practice_id: Id of the practice to join; must already exist.
        name: Display name of the user.
        email: E-mail of the user; must equal ``license_customer_email``
            (case-insensitively) when that is given.
        password: New password, at least four characters after stripping.
        assign_admin: Mark the account with the license-join admin source.
        license_customer_email: Customer e-mail of the license, may be empty.
        body: Original request payload; only the optional specialty and title
            keys are read from it. Non-string values are ignored.

    Returns:
        A dict with ``success``, ``user_id``, ``display_name``, ``role``,
        ``admin``, ``practice_id`` and ``action`` (``"updated"`` or
        ``"created"``), extended by ``_auth_provision_profile_attach``.

    Raises:
        HTTPException: 400 for missing practice id or credentials, 403 when
            the e-mail does not match the license customer e-mail, 404 when
            the practice does not exist.
    """
    target_practice_id = (target_practice_id or "").strip()
    if not target_practice_id:
        raise HTTPException(status_code=400, detail="Ziel-practice_id fehlt")
    nm = (name or "").strip()
    em = (email or "").strip()
    pw = (password or "").strip()
    if not nm or not em or not pw or len(pw) < 4:
        raise HTTPException(
            status_code=400,
            detail="Name, E-Mail und Passwort (min. 4 Zeichen) erforderlich",
        )
    lce = (license_customer_email or "").strip().lower()
    if lce and em.lower() != lce:
        raise HTTPException(
            status_code=403,
            detail="E-Mail stimmt nicht mit der Lizenz-Kundenadresse ueberein.",
        )

    profile_source = body if isinstance(body, dict) else {}

    def _body_text(*keys: str) -> str:
        """Whitespace-collapsed first truthy value among ``keys``.

        Non-string values are treated as empty; calling ``.strip()`` on them
        would raise AttributeError and surface as a server error.
        """
        for key in keys:
            value = profile_source.get(key)
            if value:
                return " ".join(value.split()) if isinstance(value, str) else ""
        return ""

    specialty_in = _body_text("desktop_specialty", "specialty")
    title_in = _body_text("desktop_title", "title")

    practices = _load_practices()
    if target_practice_id not in practices:
        raise HTTPException(
            status_code=404,
            detail="Praxis zum Einladungscode nicht gefunden oder nicht provisioniert.",
        )
    # Only make sure the record exists (without overwriting its name).
    _ensure_practice(target_practice_id)

    accounts = _load_accounts()
    scoped_pr = [a for a in accounts.values() if a.get("practice_id") == target_practice_id]

    # ``em`` is guaranteed non-empty by the validation above.
    email_lower = em.lower()
    target = None
    email_matches = [
        a for a in scoped_pr
        if (a.get("email") or "").strip().lower() == email_lower
    ]
    # An e-mail shared by several accounts is not a safe identifier.
    if len(email_matches) == 1:
        target = email_matches[0]
    if target is None:
        wanted_login = _normalize_login_username(nm)
        for a in scoped_pr:
            if _normalize_login_username(a.get("display_name") or "") == wanted_login:
                target = a
                break

    if target:
        target["pw_hash"], target["pw_salt"] = _hash_password(pw)
        target["email"] = em
        if not (target.get("display_name") or "").strip():
            target["display_name"] = nm
        if not (target.get("login_name") or "").strip():
            target["login_name"] = _preferred_unique_login_for_display(
                accounts, target_practice_id, nm, str(target.get("user_id") or ""),
            )
        if specialty_in and len(specialty_in) <= 160 and not str(target.get("specialty") or "").strip():
            target["specialty"] = specialty_in
        if title_in and len(title_in) <= 80 and not str(target.get("title") or "").strip():
            target["title"] = title_in
        if assign_admin:
            r0 = (target.get("role") or "").strip().lower()
            # NOTE(review): nesting reconstructed from a whitespace-collapsed
            # source; confirm that admin_source is meant to be set only for
            # accounts that are not already "admin".
            if r0 != "admin":
                target["role"] = "arzt"
                target["admin_source"] = _ADMIN_SOURCE_LICENSE_JOIN_EXISTING
        _save_accounts(accounts)
        practices_snap = _load_practices()
        out: dict = {
            "success": True,
            "user_id": target["user_id"],
            "display_name": target.get("display_name"),
            "role": target.get("role"),
            "admin": _account_has_practice_admin_privileges(target),
            "practice_id": target_practice_id,
            "action": "updated",
        }
        out.update(_auth_provision_profile_attach(practices_snap, target_practice_id, target))
        return out

    role = "arzt"
    admin_source = _ADMIN_SOURCE_LICENSE_JOIN_EXISTING if assign_admin else ""
    uid = uuid.uuid4().hex[:12]
    pw_hash, pw_salt = _hash_password(pw)
    ln_pv = _preferred_unique_login_for_display(accounts, target_practice_id, nm, "")
    new_account = {
        "user_id": uid,
        "practice_id": target_practice_id,
        "display_name": nm,
        "email": em,
        "login_name": ln_pv,
        "role": role,
        "pw_hash": pw_hash,
        "pw_salt": pw_salt,
        "status": "active",
        "created": time.strftime("%Y-%m-%d %H:%M:%S"),
    }
    if specialty_in and len(specialty_in) <= 160:
        new_account["specialty"] = specialty_in
    if title_in and len(title_in) <= 80:
        new_account["title"] = title_in
    if admin_source:
        new_account["admin_source"] = admin_source
    accounts[uid] = new_account
    _save_accounts(accounts)
    _log.info(
        "AZA_EMPFANG_LICENSE_JOIN_ACCOUNT practice=%s uid=%s role=%s",
        (target_practice_id or "")[:16],
        (uid or "")[:16],
        role,
    )
    practices_snap = _load_practices()
    acc_ref = accounts.get(uid)
    out2: dict = {
        "success": True,
        "user_id": uid,
        "display_name": nm,
        "role": role,
        "admin": _account_has_practice_admin_privileges(acc_ref),
        "practice_id": target_practice_id,
        "action": "created",
    }
    out2.update(_auth_provision_profile_attach(practices_snap, target_practice_id, acc_ref))
    return out2
@router.post("/auth/provision")
async def auth_provision(request: Request):
    """Provision the server-side account for a desktop installation.

    Creates the account or updates a matching existing one, and creates a new
    practice with a real ``practice_id`` when none can be resolved.
    Authentication uses the ``X-API-Token`` header (backend token), not a
    session.

    The target practice is resolved in this order:

    1. ``invite_code`` in the body: deliberate join of an existing practice;
       it overrides ``X-Practice-Id`` and ``practice_id``.
    2. ``X-Practice-Id`` header, then ``practice_id`` in the body.
    3. License lookup by e-mail and ``license_key``; more than one candidate
       yields a ``choose_practice`` response instead of picking one.
    4. A freshly generated practice id; ``_migrate_legacy_to_practice`` is
       called first when data for the legacy default practice exists.

    A JSON body that is not an object is treated as empty, and non-string
    field values are treated as missing, so malformed input ends in the
    regular 400 validation errors.

    Returns:
        JSONResponse whose ``action`` is ``"updated"`` or ``"created"``, or
        the ``choose_practice`` payload.

    Raises:
        HTTPException: 401 without API token, 400 for a missing name or a
            password shorter than four characters, 403 for an unknown invite
            code.
    """
    # NOTE(review): only the presence of the token is checked here; confirm
    # that its value is verified elsewhere (e.g. middleware).
    api_token = request.headers.get("X-API-Token", "")
    if not api_token:
        raise HTTPException(status_code=401, detail="API-Token erforderlich")

    try:
        body = await request.json()
    except Exception:
        # Deliberate best effort: an unreadable body counts as empty.
        body = {}
    if not isinstance(body, dict):
        body = {}

    def _text(*keys: str) -> str:
        """Stripped first truthy body value among ``keys``; non-strings count as empty."""
        for key in keys:
            value = body.get(key)
            if value:
                return value.strip() if isinstance(value, str) else ""
        return ""

    def _collapsed(*keys: str) -> str:
        """Like ``_text`` but with inner whitespace runs collapsed to one space."""
        return " ".join(_text(*keys).split())

    name = _text("name")
    email = _text("email")
    password = _text("password")
    practice_name = _text("practice_name")
    invite_code_in = _text("invite_code")
    license_key_in = _text("license_key")
    specialty_in = _collapsed("desktop_specialty", "specialty")
    title_in = _collapsed("desktop_title", "title")

    if not name:
        raise HTTPException(status_code=400, detail="Name erforderlich")
    if not password or len(password) < 4:
        raise HTTPException(status_code=400, detail="Passwort (min. 4 Zeichen) erforderlich")

    # Deliberate join of an existing practice chat via invite code: this
    # overrides X-Practice-Id / practice_id in the body. Otherwise every new
    # device without a stored practice_id would create its own practice
    # (observed in production).
    if invite_code_in:
        practices = _load_practices()
        want = _invite_code_key(invite_code_in)
        target_pid = None
        # An empty normalised key must never match a practice that simply has
        # no invite code.
        if want:
            for pida, pdata in practices.items():
                if isinstance(pdata, dict) and _invite_code_key(pdata.get("invite_code")) == want:
                    target_pid = pida
                    break
        if not target_pid:
            raise HTTPException(
                status_code=403,
                detail="Ungueltiger Chat-Einladungscode — Praxis nicht gefunden.",
            )
        pid = target_pid
    else:
        pid = request.headers.get("X-Practice-Id", "").strip() or _text("practice_id")

    resolved_from_license = False
    resolved_practice_name = ""
    if not pid:
        cand_list: list[str] = []
        seen_c: set[str] = set()
        try:
            from stripe_routes import (
                list_distinct_practice_ids_for_license_email,
                list_distinct_practice_ids_for_license_key,
            )

            lookups = (
                (list_distinct_practice_ids_for_license_email, email),
                (list_distinct_practice_ids_for_license_key, license_key_in),
            )
            # Candidates are appended as they are found, so a failure in the
            # second lookup keeps the results of the first.
            for lookup, needle in lookups:
                for cand in lookup(needle):
                    cand = (cand or "").strip()
                    if cand and cand not in seen_c:
                        seen_c.add(cand)
                        cand_list.append(cand)
        except Exception as exc:
            # Best effort: provisioning continues without license resolution.
            _log.warning("[EMPFANG] license practice resolution: %s", exc)

        if len(cand_list) > 1:
            practices = _load_practices()
            candidates_payload = []
            for cand in cand_list:
                pdata = practices.get(cand) or {}
                pname = (pdata.get("name") or "").strip() or cand
                candidates_payload.append({"practice_id": cand, "practice_name": pname})
            return JSONResponse(content={
                "success": False,
                "step": "choose_practice",
                "message": (
                    "Mehrere Praxen passen zu dieser Lizenz bzw. E-Mail. "
                    "Bitte waehlen Sie die Praxis aus, der dieses Geraet zugehoeren soll."
                ),
                "candidates": candidates_payload,
            })
        if len(cand_list) == 1:
            pid = cand_list[0]
            resolved_from_license = True
            practices = _load_practices()
            pdata = practices.get(pid) or {}
            resolved_practice_name = (pdata.get("name") or "").strip()

    if not pid:
        practices = _load_practices()
        has_legacy = _LEGACY_DEFAULT_PID in practices
        accounts = _load_accounts()
        has_legacy_accounts = any(
            a.get("practice_id") == _LEGACY_DEFAULT_PID for a in accounts.values()
        )
        pid = _generate_practice_id()
        if has_legacy or has_legacy_accounts:
            _migrate_legacy_to_practice(pid)

    _ensure_practice(pid, name=practice_name or "Meine Praxis", admin_email=email)

    if practice_name or email:
        practices = _load_practices()
        raw_entry = practices.get(pid)
        entry = dict(raw_entry) if isinstance(raw_entry, dict) else {}
        entry["practice_id"] = pid
        changed_pr = False
        pnm = " ".join(practice_name.split())
        cur_nm = str(entry.get("name") or "").strip()
        # Only a missing/placeholder name is replaced. A deliberate rename of
        # a meaningful name belongs in PATCH /practice/profile, not here.
        if pnm and _weak_practice_public_name(cur_nm) and not _weak_practice_public_name(pnm):
            entry["name"] = pnm[:240]
            changed_pr = True
        if email:
            prev_ae = str(entry.get("admin_email") or "").strip()
            if not prev_ae:
                entry["admin_email"] = email[:240]
                changed_pr = True
        if changed_pr:
            practices[pid] = entry
            _save_practices(practices)

    accounts = _load_accounts()
    target = None
    scoped_pr = [a for a in accounts.values() if a.get("practice_id") == pid]
    email_lower = email.lower() if email else ""
    if email_lower:
        email_matches = [
            a for a in scoped_pr
            if (a.get("email") or "").strip().lower() == email_lower
        ]
        # An e-mail shared by several accounts is not a safe identifier.
        if len(email_matches) == 1:
            target = email_matches[0]
    if target is None:
        wanted_login = _normalize_login_username(name)
        for a in scoped_pr:
            if _normalize_login_username(a.get("display_name") or "") == wanted_login:
                target = a
                break

    if target:
        target["pw_hash"], target["pw_salt"] = _hash_password(password)
        if email:
            target["email"] = email
        if not (target.get("display_name") or "").strip():
            target["display_name"] = name
        if not (target.get("login_name") or "").strip():
            target["login_name"] = _preferred_unique_login_for_display(
                accounts, pid, name, str(target.get("user_id") or ""),
            )
        if specialty_in and len(specialty_in) <= 160 and not str(target.get("specialty") or "").strip():
            target["specialty"] = specialty_in
        if title_in and len(title_in) <= 80 and not str(target.get("title") or "").strip():
            target["title"] = title_in

        ps_practice = _collapsed(
            "practice_specialty",
            "desktop_practice_specialty",
            "desktop_specialty",
            "specialty",
        )
        if ps_practice and len(ps_practice) <= 160:
            practices_mut = _load_practices()
            ex = practices_mut.get(pid)
            # Fill the practice specialty only when it is still empty.
            if isinstance(ex, dict) and not str(ex.get("specialty") or "").strip():
                ex2 = dict(ex)
                ex2["specialty"] = ps_practice
                practices_mut[pid] = ex2
                _save_practices(practices_mut)

        # STRICT admin safety for an existing account in the target practice:
        # an already existing account is NOT silently promoted to admin just
        # because the practice currently has no admin. Auto-admin applies
        # only to the first bootstrap of a new practice via license
        # activation (see the account-creation path below). Practices without
        # an admin are repaired via /empfang/admin/repair-no-admin or
        # /empfang/admin/set-admin.
        _save_accounts(accounts)
        practices_snap = _load_practices()
        body_out = {
            "success": True,
            "user_id": target["user_id"],
            "display_name": target.get("display_name"),
            "role": target.get("role"),
            "practice_id": pid,
            "action": "updated",
        }
        body_out.update(_auth_provision_profile_attach(practices_snap, pid, target))
        if resolved_from_license:
            body_out["resolved_existing_practice"] = True
            if resolved_practice_name:
                body_out["resolved_practice_name"] = resolved_practice_name
        return JSONResponse(content=body_out)

    # STRICT auto-admin rule for a NEWLY created account. Admin only when ALL
    # conditions hold:
    #   (1) the practice currently has NO account at all (real bootstrap, not
    #       merely "no admin"),
    #   (2) NO invite_code in the request (join path excluded),
    #   (3) a license_key was sent, which ties the auto-admin to the license
    #       activation of the main program.
    # Otherwise the new account gets role="arzt" (not admin). Practices
    # without an admin can be repaired via /empfang/admin/repair-no-admin or
    # /empfang/admin/set-admin.
    has_admin = any(
        _account_has_practice_admin_privileges(a) and a.get("practice_id") == pid
        for a in accounts.values()
    )
    has_any_account_in_practice = any(
        a.get("practice_id") == pid for a in accounts.values()
    )
    license_key_provided = bool(license_key_in)
    bootstrap_admin_allowed = (
        not has_any_account_in_practice
        and not invite_code_in
        and license_key_provided
    )
    if bootstrap_admin_allowed:
        role = "admin"
        admin_source = "license_activation"
    else:
        role = "arzt"
        admin_source = ""

    uid = uuid.uuid4().hex[:12]
    pw_hash, pw_salt = _hash_password(password)
    ln_pv = _preferred_unique_login_for_display(accounts, pid, name, "")
    new_account = {
        "user_id": uid,
        "practice_id": pid,
        "display_name": name,
        "email": email,
        "login_name": ln_pv,
        "role": role,
        "pw_hash": pw_hash,
        "pw_salt": pw_salt,
        "status": "active",
        "created": time.strftime("%Y-%m-%d %H:%M:%S"),
    }
    if specialty_in and len(specialty_in) <= 160:
        new_account["specialty"] = specialty_in
    if title_in and len(title_in) <= 80:
        new_account["title"] = title_in
    if admin_source:
        new_account["admin_source"] = admin_source
    accounts[uid] = new_account
    _save_accounts(accounts)

    if bootstrap_admin_allowed:
        _log.info(
            "AZA_EMPFANG_ADMIN_BOOTSTRAP practice=%s uid=%s source=license_activation has_admin_before=%s",
            (pid or "")[:16],
            (uid or "")[:16],
            int(has_admin),
        )
    else:
        _log.info(
            "AZA_EMPFANG_ACCOUNT_CREATED practice=%s uid=%s role=%s "
            "has_admin=%s had_accounts=%s invite=%s license=%s",
            (pid or "")[:16],
            (uid or "")[:16],
            role,
            int(has_admin),
            int(has_any_account_in_practice),
            int(bool(invite_code_in)),
            int(license_key_provided),
        )

    body_created = {
        "success": True,
        "user_id": uid,
        "display_name": name,
        "role": role,
        "practice_id": pid,
        "action": "created",
    }
    practices_snap = _load_practices()
    acc_ref = accounts.get(uid)
    body_created.update(_auth_provision_profile_attach(practices_snap, pid, acc_ref))
    if resolved_from_license:
        body_created["resolved_existing_practice"] = True
        if resolved_practice_name:
            body_created["resolved_practice_name"] = resolved_practice_name
    return JSONResponse(content=body_created)
@router.post("/auth/forgot_password")
async def auth_forgot_password(request: Request):
    """Start a password reset for a login given as user name or e-mail.

    The login is read from the body keys ``login``, ``email`` or ``name``
    (first non-empty one). When a practice id can be determined for the
    client, only accounts of that practice are considered. An account is
    never chosen automatically when several accounts share the e-mail or
    user name: in that case an explanatory ``step`` payload is returned and
    no reset is sent.

    Raises:
        HTTPException: 400 when no login was supplied.
    """
    try:
        payload = await request.json()
    except Exception:
        payload = {}
    login = (
        payload.get("login") or payload.get("email") or payload.get("name") or ""
    ).strip()
    pid = _practice_id_from_client(request, payload)
    if not login:
        raise HTTPException(
            status_code=400,
            detail="Benutzername oder E-Mail erforderlich",
        )

    def _ambiguous(step: str, message: str) -> JSONResponse:
        # Shared shape of every "cannot pick an account" answer.
        return JSONResponse(
            content={
                "success": False,
                "step": step,
                "message": message,
                "reset_token_created": False,
                "target_email_masked": "",
                "mail_delivered": False,
                "attempted_delivery": False,
            },
        )

    accounts = _load_accounts()
    if pid:
        pool = [acc for acc in accounts.values() if acc.get("practice_id") == pid]
    else:
        pool = list(accounts.values())

    if _is_likely_email(login):
        wanted = _norm_email(login)
        hits = [acc for acc in pool if _norm_email(acc.get("email") or "") == wanted]
        if not hits:
            return JSONResponse(content=_forgot_password_neutral_payload())
        if len(hits) == 1:
            return JSONResponse(content=_send_reset_for_account(hits[0]))
        return _ambiguous(
            "ambiguous_email",
            "Diese E-Mail ist mehreren Benutzern zugeordnet. "
            "Geben Sie bitte Ihren Benutzernamen ein, damit das richtige Konto eindeutig ist.",
        )

    hits, _resolved_via = _resolve_browser_login_matches(pool, login)
    if len(hits) == 0:
        return JSONResponse(content=_forgot_password_neutral_payload())
    if len(hits) == 1:
        return JSONResponse(content=_send_reset_for_account(hits[0]))

    if pid:
        return _ambiguous(
            "ambiguous_username_in_practice",
            "Dieser Benutzername ist in dieser Praxis nicht eindeutig. "
            "Bitte verwenden Sie Ihren eindeutigen Login-Namen oder bitten Sie den "
            "Administrator im Hauptfenster, fuer die Konten eindeutige Login-Namen zu setzen.",
        )
    return _ambiguous(
        "ambiguous_login_no_practice",
        "Dieser Benutzername ist ohne gespeicherte Praxis mehrdeutig. "
        "Bitte laden Sie die Seite ueber den Einladungslink der Hauptinstallation "
        "oder geben Sie Ihre gemeinschaftliche E-Mail ein, wenn sie nur einem Konto gilt.",
    )
@router.get("/auth/reset_verify")
async def auth_reset_verify(reset_token: str = Query("")):
    """Check whether a reset token is still valid, without consuming it.

    Returns a JSON payload with ``valid``. An invalid token additionally
    carries a ``detail`` text; a valid one carries ``display_name`` (from the
    reset entry, falling back to the account) and ``email_masked``.
    """

    def _invalid(detail: str) -> JSONResponse:
        return JSONResponse(content={"valid": False, "detail": detail})

    token = (reset_token or "").strip()
    if not token:
        return _invalid("Kein Reset-Token angegeben.")

    entry = _load_json(_DATA_DIR / "empfang_resets.json", {}).get(token)
    if not entry:
        return _invalid("Ungültiger oder abgelaufener Link.")

    age_sec = time.time() - entry.get("created", 0)
    if age_sec > RESET_LINK_TTL_SEC:
        return _invalid("Der Link ist abgelaufen. Bitte neuen Reset anfordern.")

    email_raw = (entry.get("email") or "").strip()
    accounts = _load_accounts()
    uid = entry.get("user_id")
    account = (accounts.get(uid) if uid else None) or {}
    shown_name = (entry.get("display_name") or account.get("display_name") or "").strip()
    return JSONResponse(
        content={
            "valid": True,
            "display_name": shown_name,
            "email_masked": _mask_email_for_response(email_raw),
        },
    )
@router.post("/auth/reset_password")
async def auth_reset_password(request: Request):
    """Set a new password using a valid reset token.

    Reads ``reset_token`` and ``password`` from the JSON body. On success the
    password hash and salt of the account are replaced, the
    ``must_change_password`` flag is cleared and the token is removed. An
    expired token is removed as well.

    Malformed input (a body that is not a JSON object, non-string fields, a
    stored reset entry without ``user_id``) is answered with 400 instead of
    surfacing as an unhandled server error.

    Returns:
        JSONResponse with ``success``, ``message``, ``display_name``,
        ``login_name`` and ``email_masked``.

    Raises:
        HTTPException: 400 for missing/short input or an invalid or expired
            token, 404 when the account of the token no longer exists.
    """
    try:
        body = await request.json()
    except Exception:
        # Deliberate best effort: an unreadable body counts as empty.
        body = {}
    if not isinstance(body, dict):
        body = {}

    def _text(key: str) -> str:
        value = body.get(key)
        return value.strip() if isinstance(value, str) else ""

    token = _text("reset_token")
    new_password = _text("password")
    if not token or not new_password or len(new_password) < 4:
        raise HTTPException(
            status_code=400,
            detail="Reset-Token und neues Passwort (min. 4 Zeichen) erforderlich",
        )

    resets_path = _DATA_DIR / "empfang_resets.json"
    resets = _load_json(resets_path, {})
    entry = resets.get(token)
    if not entry or not isinstance(entry, dict):
        raise HTTPException(
            status_code=400, detail="Ungültiger oder abgelaufener Reset-Link"
        )
    if time.time() - entry.get("created", 0) > RESET_LINK_TTL_SEC:
        del resets[token]
        _save_json(resets_path, resets)
        raise HTTPException(
            status_code=400, detail="Reset-Link ist abgelaufen"
        )

    user_id = entry.get("user_id")
    if not user_id:
        # A reset entry without an account reference cannot be honoured.
        raise HTTPException(
            status_code=400, detail="Ungültiger oder abgelaufener Reset-Link"
        )
    accounts = _load_accounts()
    if user_id not in accounts:
        raise HTTPException(status_code=404, detail="Benutzer nicht gefunden")

    acc = accounts[user_id]
    acc["pw_hash"], acc["pw_salt"] = _hash_password(new_password)
    acc.pop("must_change_password", None)
    _save_accounts(accounts)

    # The token is single-use: remove it only after the password was stored.
    del resets[token]
    _save_json(resets_path, resets)

    # NOTE(review): this handler does not touch existing sessions of the
    # account; confirm whether they should be invalidated after a reset.
    dn_hint = (acc.get("display_name") or "").strip()
    ln_saved = (acc.get("login_name") or "").strip()
    em_raw = (acc.get("email") or "").strip()
    return JSONResponse(
        content={
            "success": True,
            "message": "Passwort wurde erfolgreich geändert.",
            "display_name": dn_hint,
            "login_name": ln_saved,
            "email_masked": _mask_email_for_response(em_raw),
        }
    )
Hallo {display_name},
" f"Sie haben eine Passwort-Zurücksetzung angefordert.
" f"" f"Der Link ist 1 Stunde gültig.
" f"Falls Sie diese Anfrage nicht gestellt haben, " f"ignorieren Sie diese E-Mail.