# -*- coding: utf-8 -*- """ AZA Empfang - Backend-Routen V5: Admin, Devices, Federation, Channels. Serverseitige Auth, Benutzer, Sessions, Nachrichten, Aufgaben, Geraeteverwaltung, Kanaele, Praxis-Federation. Alle Daten practice-scoped. Backend ist die einzige Wahrheit. """ import hashlib import hmac import json import logging import os import re import secrets import time import unicodedata import uuid from datetime import datetime, timezone from pathlib import Path from typing import Optional, Tuple from fastapi import APIRouter, Cookie, HTTPException, Query, Request, Response from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse from pydantic import BaseModel, Field router = APIRouter() _log = logging.getLogger(__name__) _DATA_DIR = Path(__file__).resolve().parent / "data" _EMPFANG_FILE = _DATA_DIR / "empfang_nachrichten.json" _PRACTICES_FILE = _DATA_DIR / "empfang_practices.json" _ACCOUNTS_FILE = _DATA_DIR / "empfang_accounts.json" _SESSIONS_FILE = _DATA_DIR / "empfang_sessions.json" _TASKS_FILE = _DATA_DIR / "empfang_tasks.json" _DEVICES_FILE = _DATA_DIR / "empfang_devices.json" _CHANNELS_FILE = _DATA_DIR / "empfang_channels.json" _CONNECTIONS_FILE = _DATA_DIR / "empfang_connections.json" _LEGACY_DEFAULT_PID = "default" SESSION_MAX_AGE = 30 * 24 * 3600 # 30 Tage def _generate_practice_id() -> str: return f"prac_{uuid.uuid4().hex[:12]}" def _resolve_practice_id(request: Request) -> str: """Ermittelt die practice_id aus Session, Header oder Query. Kein stiller Fallback auf eine Default-Praxis.""" s = _session_from_request(request) if s: return s["practice_id"] pid = request.headers.get("X-Practice-Id", "").strip() if pid: return pid pid = request.query_params.get("practice_id", "").strip() if pid: return pid return "" def _require_practice_id(request: Request) -> str: """Wie _resolve_practice_id, aber wirft 400 wenn keine practice_id.""" pid = _resolve_practice_id(request) if not pid: raise HTTPException( status_code=400, detail="practice_id erforderlich (X-Practice-Id Header, Session oder Query)") return pid def _ensure_data_dir(): _DATA_DIR.mkdir(parents=True, exist_ok=True) # ===================================================================== # JSON helpers (atomic write) # ===================================================================== def _load_json(path: Path, default=None): if not path.is_file(): return default if default is not None else [] try: with open(path, "r", encoding="utf-8") as f: return json.load(f) except Exception: return default if default is not None else [] def _save_json(path: Path, data): _ensure_data_dir() tmp = str(path) + ".tmp" with open(tmp, "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) os.replace(tmp, str(path)) # ===================================================================== # Password hashing (PBKDF2 – no external dependency) # ===================================================================== def _hash_password(password: str, salt: str = None) -> tuple[str, str]: salt = salt or secrets.token_hex(16) dk = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 100_000) return dk.hex(), salt def _verify_password(password: str, stored_hash: str, salt: str) -> bool: dk = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 100_000) return hmac.compare_digest(dk.hex(), stored_hash) # ===================================================================== # Practices # ===================================================================== def _load_practices() -> dict: return _load_json(_PRACTICES_FILE, {}) def _save_practices(data: dict): _save_json(_PRACTICES_FILE, data) def _invite_code_key(raw: str) -> str: """Vergleicht Einladungscodes unabhaengig von Leerzeichen und Gedankenstrich-Varianten.""" s = (raw or "").strip().upper().replace(" ", "") for ch in ("\u2011", "\u2013", "\u2014", "\u2212"): s = s.replace(ch, "-") return s def _generate_chat_invite_code() -> str: """Lesbarer Chat-Einladungscode im Format CHAT-XXXX-XXXX.""" import random chars = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789" part1 = "".join(random.choices(chars, k=4)) part2 = "".join(random.choices(chars, k=4)) return f"CHAT-{part1}-{part2}" def _ensure_practice(practice_id: str, name: str = "Meine Praxis", admin_email: str = "") -> dict: """Stellt sicher, dass eine Praxis mit dieser ID existiert.""" practices = _load_practices() if practice_id not in practices: practices[practice_id] = { "practice_id": practice_id, "name": name, "invite_code": _generate_chat_invite_code(), "admin_email": admin_email, "created": time.strftime("%Y-%m-%d %H:%M:%S"), } _save_practices(practices) return practices[practice_id] def _migrate_legacy_to_practice(new_pid: str): """Migriert alle Daten von der alten 'default'-Praxis zur neuen practice_id. Wird einmalig beim ersten Provisioning aufgerufen.""" accounts = _load_accounts() migrated = False for a in accounts.values(): if a.get("practice_id") == _LEGACY_DEFAULT_PID: a["practice_id"] = new_pid migrated = True if migrated: _save_accounts(accounts) devices = _load_devices() for d in devices.values(): if d.get("practice_id") == _LEGACY_DEFAULT_PID: d["practice_id"] = new_pid _save_devices(devices) sessions = _load_sessions() for s in sessions.values(): if s.get("practice_id") == _LEGACY_DEFAULT_PID: s["practice_id"] = new_pid _save_sessions(sessions) messages = _load_messages() for m in messages: if m.get("practice_id", _LEGACY_DEFAULT_PID) == _LEGACY_DEFAULT_PID: m["practice_id"] = new_pid _save_messages(messages) practices = _load_practices() if _LEGACY_DEFAULT_PID in practices: old = practices.pop(_LEGACY_DEFAULT_PID) if new_pid not in practices: old["practice_id"] = new_pid practices[new_pid] = old _save_practices(practices) try: channels = _load_channels() for c in channels: if c.get("practice_id") == _LEGACY_DEFAULT_PID: c["practice_id"] = new_pid _save_channels(channels) except Exception: pass _migrate_old_users(new_pid) def _migrate_old_users(practice_id: str): """Migriert alte empfang_users.json Strings zu echten Accounts.""" old_file = _DATA_DIR / "empfang_users.json" if not old_file.is_file(): return try: names = json.loads(old_file.read_text(encoding="utf-8")) if not isinstance(names, list): return accounts = _load_accounts() for name in names: name = name.strip() if not name: continue exists = any( a["display_name"] == name and a["practice_id"] == practice_id for a in accounts.values() ) if not exists: uid = uuid.uuid4().hex[:12] pw_hash, pw_salt = _hash_password(name.lower()) accounts[uid] = { "user_id": uid, "practice_id": practice_id, "display_name": name, "role": "mpa", "pw_hash": pw_hash, "pw_salt": pw_salt, "created": time.strftime("%Y-%m-%d %H:%M:%S"), "status": "active", "last_login": "", "email": "", } _save_accounts(accounts) except Exception: pass # ===================================================================== # Accounts (practice-scoped users with auth) # ===================================================================== def _load_accounts() -> dict: return _load_json(_ACCOUNTS_FILE, {}) def _save_accounts(data: dict): _save_json(_ACCOUNTS_FILE, data) def _practice_users(practice_id: str) -> list[dict]: accounts = _load_accounts() return [ {"user_id": a["user_id"], "display_name": a["display_name"], "role": a["role"]} for a in accounts.values() if a.get("practice_id") == practice_id ] # ===================================================================== # Login / Passwort-Reset Hilfen (Mandant, Mehrfach-E-Mail) # ===================================================================== def _is_likely_email(s: str) -> bool: """Grobe Erkennung E-Mail vs. Benutzername (Anzeigename).""" s = (s or "").strip() if "@" not in s or len(s) < 5: return False parts = s.split("@", 1) if len(parts) != 2 or not parts[0] or not parts[1]: return False return "." in parts[1] def _norm_email(e: str) -> str: return (e or "").strip().lower() def _practice_id_from_client(request: Request, body: dict) -> str: """practice_id fuer Login/Forgot ohne Session: Body, Header, Query.""" pid = (body.get("practice_id") or "").strip() if pid: return pid pid = request.headers.get("X-Practice-Id", "").strip() if pid: return pid return request.query_params.get("practice_id", "").strip() def _lookup_practice_id_by_invite(invite_raw: str) -> str: """Liefert practice_id fuer einen Einladungscode (normalisierter Vergleich) oder ''.""" if not (invite_raw or "").strip(): return "" practices = _load_practices() want = _invite_code_key(invite_raw) for pida, pdata in practices.items(): if _invite_code_key(pdata.get("invite_code")) == want: return pida return "" def _practice_id_for_login(request: Request, body: dict) -> tuple[str, str]: """Login: practice_id ohne Session-Cookie (sonst ueberholt alte Session die Einladung). Reihenfolge: invite_code > Body > Header > Query. Rueckgabe: (practice_id, quelle) mit quelle in 'invite'|'body'|'header'|'query'|''|'invite_invalid' """ invite_raw = (body.get("invite_code") or "").strip() if invite_raw: got = _lookup_practice_id_by_invite(invite_raw) if got: return got, "invite" return "", "invite_invalid" pid = (body.get("practice_id") or "").strip() if pid: return pid, "body" pid = request.headers.get("X-Practice-Id", "").strip() if pid: return pid, "header" pid = request.query_params.get("practice_id", "").strip() if pid: return pid, "query" return "", "" def _practice_label(practices: dict, pid: str) -> str: p = practices.get(pid) or {} return (p.get("name") or "").strip() or pid def _send_reset_for_account(acc: dict) -> dict: """Token erstellen, Mail senden — Inhalt fuer JSONResponse(content=...).""" email_to = (acc.get("email") or "").strip() if not email_to: return { "success": False, "step": "no_email", "message": ( "Für dieses Konto ist keine E-Mail-Adresse hinterlegt. Bitte lassen Sie das " "Passwort von einer Administratorin oder einem Administrator der Praxis " "zurücksetzen." ), } reset_token = secrets.token_urlsafe(32) resets = _load_json(_DATA_DIR / "empfang_resets.json", {}) resets[reset_token] = { "user_id": acc["user_id"], "email": _norm_email(email_to), "display_name": (acc.get("display_name") or "").strip(), "practice_id": (acc.get("practice_id") or "").strip(), "created": time.time(), } for k in list(resets.keys()): if time.time() - resets[k].get("created", 0) > 3600: del resets[k] _save_json(_DATA_DIR / "empfang_resets.json", resets) _web_base = os.environ.get( "EMPFANG_WEB_BASE", "https://empfang.aza-medwork.ch/empfang" ).rstrip("/") reset_link = f"{_web_base}/?reset_token={reset_token}" _send_reset_email(email_to, acc.get("display_name", ""), reset_link) return { "success": True, "step": "sent", "message": ( "Ein Link zum Zurücksetzen wurde an die hinterlegte E-Mail-Adresse gesendet." ), } # ===================================================================== # Sessions (mit device_id) # ===================================================================== def _load_sessions() -> dict: return _load_json(_SESSIONS_FILE, {}) def _save_sessions(data: dict): _save_json(_SESSIONS_FILE, data) def _create_session(user_id: str, practice_id: str, display_name: str, role: str, device_id: str = None, user_agent: str = "", ip_addr: str = "") -> str: token = secrets.token_urlsafe(32) sessions = _load_sessions() sessions[token] = { "user_id": user_id, "practice_id": practice_id, "display_name": display_name, "role": role, "device_id": device_id or "", "created": time.time(), "last_active": time.time(), } _save_sessions(sessions) if device_id: _register_or_update_device( device_id=device_id, user_id=user_id, practice_id=practice_id, user_agent=user_agent, ip_addr=ip_addr, ) return token def _get_session(token: str) -> Optional[dict]: if not token: return None sessions = _load_sessions() s = sessions.get(token) if not s: return None if time.time() - s.get("created", 0) > SESSION_MAX_AGE: del sessions[token] _save_sessions(sessions) return None s["last_active"] = time.time() sessions[token] = s _save_sessions(sessions) return s def _delete_session(token: str): sessions = _load_sessions() if token in sessions: del sessions[token] _save_sessions(sessions) def _session_from_request(request: Request) -> Optional[dict]: token = request.cookies.get("aza_session") or "" if not token: auth = request.headers.get("Authorization", "") if auth.startswith("Bearer "): token = auth[7:] if not token: token = request.query_params.get("session_token", "") return _get_session(token) def _require_session(request: Request) -> dict: s = _session_from_request(request) if not s: raise HTTPException(status_code=401, detail="Nicht angemeldet") return s # ===================================================================== # Devices (Geraeteverwaltung) # ===================================================================== def _load_devices() -> dict: return _load_json(_DEVICES_FILE, {}) def _save_devices(data: dict): _save_json(_DEVICES_FILE, data) def _parse_device_info(user_agent: str) -> dict: """Einfache Heuristik zum Erkennen von Plattform, Geraetetyp und Name.""" ua = user_agent.lower() if "iphone" in ua: platform, device_type = "iOS", "mobile" elif "ipad" in ua: platform, device_type = "iOS", "tablet" elif "android" in ua: if "mobile" in ua: platform, device_type = "Android", "mobile" else: platform, device_type = "Android", "tablet" elif "macintosh" in ua or "mac os" in ua: platform, device_type = "macOS", "browser" elif "windows" in ua: platform, device_type = "Windows", "browser" elif "linux" in ua: platform, device_type = "Linux", "browser" else: platform, device_type = "Unbekannt", "browser" if "electron" in ua or "cursor" in ua: device_type = "desktop" browser = "Browser" if "edg/" in ua: browser = "Edge" elif "chrome" in ua and "chromium" not in ua: browser = "Chrome" elif "firefox" in ua: browser = "Firefox" elif "safari" in ua and "chrome" not in ua: browser = "Safari" device_name = f"{browser} auf {platform}" if device_type == "desktop": device_name = f"Desktop-App auf {platform}" elif device_type in ("mobile", "tablet"): device_name = f"{platform} {device_type.capitalize()}" return { "device_name": device_name, "platform": platform, "device_type": device_type, } def _make_device_id(user_id: str, user_agent: str) -> str: raw = f"{user_id}:{user_agent}" return hashlib.sha256(raw.encode()).hexdigest()[:12] def _register_or_update_device(device_id: str, user_id: str, practice_id: str, user_agent: str, ip_addr: str): devices = _load_devices() now = time.strftime("%Y-%m-%d %H:%M:%S") info = _parse_device_info(user_agent) if device_id in devices: dev = devices[device_id] dev["last_active"] = now dev["ip_last"] = ip_addr dev["user_agent"] = user_agent dev["device_name"] = info["device_name"] dev["platform"] = info["platform"] dev["device_type"] = info["device_type"] else: devices[device_id] = { "device_id": device_id, "user_id": user_id, "practice_id": practice_id, "device_name": info["device_name"], "platform": info["platform"], "device_type": info["device_type"], "user_agent": user_agent, "first_seen": now, "last_active": now, "trust_status": "trusted", "ip_last": ip_addr, } _save_devices(devices) def _extract_client_ip(request: Request) -> str: forwarded = request.headers.get("X-Forwarded-For", "") if forwarded: return forwarded.split(",")[0].strip() if request.client: return request.client.host return "" # ===================================================================== # Messages (practice-scoped) # ===================================================================== EMPFANG_MESSAGE_RETENTION_DAYS = 14 def _msg_timestamp_for_retention(m: dict) -> str: return (m.get("empfangen") or m.get("zeitstempel") or "").strip() def _message_within_retention(m: dict, cutoff_str: str) -> bool: """Behalten wenn Zeitstempel unbekannt oder >= cutoff (ISO-artige Strings).""" t = _msg_timestamp_for_retention(m) if not t: return True return t >= cutoff_str def _prune_messages_by_retention(messages: list[dict]) -> tuple[list[dict], int]: cutoff_str = time.strftime( "%Y-%m-%d %H:%M:%S", time.localtime(time.time() - EMPFANG_MESSAGE_RETENTION_DAYS * 86400), ) kept = [m for m in messages if _message_within_retention(m, cutoff_str)] return kept, len(messages) - len(kept) def _load_messages() -> list[dict]: messages = _load_json(_EMPFANG_FILE, []) kept, removed = _prune_messages_by_retention(messages) if removed > 0: _save_messages(kept) return kept def _save_messages(messages: list[dict]): _save_json(_EMPFANG_FILE, messages) def _msg_practice(m: dict) -> str: return m.get("practice_id") or _LEGACY_DEFAULT_PID def _filter_by_practice(messages: list[dict], pid: str) -> list[dict]: return [m for m in messages if _msg_practice(m) == pid] # ===================================================================== # Tasks (practice-scoped, server-side) # ===================================================================== def _load_tasks() -> list[dict]: return _load_json(_TASKS_FILE, []) def _save_tasks(tasks: list[dict]): _save_json(_TASKS_FILE, tasks) # ===================================================================== # Channels (Kanaele, practice-scoped) # ===================================================================== def _load_channels() -> list[dict]: return _load_json(_CHANNELS_FILE, []) def _save_channels(channels: list[dict]): _save_json(_CHANNELS_FILE, channels) _DEFAULT_CHANNEL_DEFS = [ {"name": "Allgemein", "scope": "internal", "channel_type": "group", "allowed_roles": []}, {"name": "Aerzte", "scope": "internal", "channel_type": "group", "allowed_roles": ["arzt", "admin"]}, {"name": "MPA", "scope": "internal", "channel_type": "group", "allowed_roles": ["mpa", "admin"]}, {"name": "Empfang", "scope": "internal", "channel_type": "group", "allowed_roles": ["empfang", "admin"]}, ] def _ensure_default_channels(practice_id: str): channels = _load_channels() practice_channels = [c for c in channels if c.get("practice_id") == practice_id] if practice_channels: return now = time.strftime("%Y-%m-%d %H:%M:%S") for defn in _DEFAULT_CHANNEL_DEFS: channels.append({ "channel_id": uuid.uuid4().hex[:12], "practice_id": practice_id, "name": defn["name"], "scope": defn["scope"], "channel_type": defn["channel_type"], "allowed_roles": defn["allowed_roles"], "connection_id": "", "created": now, "created_by": "", }) _save_channels(channels) # ===================================================================== # Connections / Federation (Praxis-zu-Praxis) # ===================================================================== def _load_connections() -> list[dict]: return _load_json(_CONNECTIONS_FILE, []) def _save_connections(conns: list[dict]): _save_json(_CONNECTIONS_FILE, conns) # ===================================================================== # AUTH ENDPOINTS # ===================================================================== @router.post("/auth/setup") async def auth_setup(request: Request): """Erstellt den ersten Admin-Benutzer und eine neue Praxis.""" try: body = await request.json() except Exception: body = {} name = (body.get("name") or "").strip() password = (body.get("password") or "").strip() practice_name = (body.get("practice_name") or "").strip() or "Meine Praxis" admin_email = (body.get("email") or "").strip() pid = (body.get("practice_id") or "").strip() if not name or not password or len(password) < 4: raise HTTPException(status_code=400, detail="Name und Passwort (min. 4 Zeichen) erforderlich") if not pid: pid = _generate_practice_id() practice = _ensure_practice(pid, name=practice_name, admin_email=admin_email) accounts = _load_accounts() practice_accounts = [a for a in accounts.values() if a.get("practice_id") == pid] if practice_accounts: raise HTTPException(status_code=409, detail="Setup bereits abgeschlossen. Bitte Login verwenden.") if practice_name: practices = _load_practices() practices[pid]["name"] = practice_name if admin_email: practices[pid]["admin_email"] = admin_email _save_practices(practices) practice = practices[pid] uid = uuid.uuid4().hex[:12] pw_hash, pw_salt = _hash_password(password) now = time.strftime("%Y-%m-%d %H:%M:%S") accounts[uid] = { "user_id": uid, "practice_id": pid, "display_name": name, "email": admin_email, "role": "admin", "pw_hash": pw_hash, "pw_salt": pw_salt, "created": now, "status": "active", "last_login": now, } _save_accounts(accounts) _ensure_default_channels(pid) ua = request.headers.get("User-Agent", "") ip = _extract_client_ip(request) dev_id = _make_device_id(uid, ua) token = _create_session(uid, pid, name, "admin", device_id=dev_id, user_agent=ua, ip_addr=ip) resp = JSONResponse(content={ "success": True, "user_id": uid, "role": "admin", "display_name": name, "practice_id": pid, "practice_name": practice.get("name", ""), "invite_code": practice.get("invite_code", ""), }) resp.set_cookie("aza_session", token, httponly=True, samesite="lax", max_age=SESSION_MAX_AGE) return resp @router.post("/auth/login") async def auth_login(request: Request): """Login mit Benutzername (Anzeigename) oder E-Mail + Passwort, mandantenbewusst.""" try: body = await request.json() except Exception: body = {} raw = (body.get("name") or "").strip() password = (body.get("password") or "").strip() pid, pid_src = _practice_id_for_login(request, body) if pid_src == "invite_invalid": raise HTTPException( status_code=403, detail="Ungueltiger Einladungscode — bitte aktuellen Link aus der Hauptinstallation verwenden.", ) if not raw or not password: raise HTTPException( status_code=400, detail="Benutzername oder E-Mail und Passwort erforderlich", ) accounts = _load_accounts() scoped = ( [a for a in accounts.values() if a.get("practice_id") == pid] if pid else list(accounts.values()) ) target = None if _is_likely_email(raw): em = _norm_email(raw) matches = [ a for a in scoped if em and _norm_email(a.get("email") or "") == em ] if len(matches) == 1: target = matches[0] elif len(matches) == 0: target = None else: if pid: raise HTTPException( status_code=409, detail={ "code": "ambiguous_email", "message": ( "Mehrere Benutzer mit dieser E-Mail in dieser Praxis. " "Bitte melden Sie sich mit Ihrem Benutzernamen an." ), "candidates": [ {"display_name": (a.get("display_name") or "")} for a in matches ], }, ) raise HTTPException( status_code=409, detail={ "code": "ambiguous_email", "message": ( "Diese E-Mail ist mehreren Konten zugeordnet. " "Bitte melden Sie sich mit Ihrem Benutzernamen an." ), }, ) else: matches = [a for a in scoped if (a.get("display_name") or "") == raw] if len(matches) == 1: target = matches[0] elif len(matches) == 0: target = None else: raise HTTPException( status_code=409, detail={ "code": "ambiguous_username", "message": ( "Dieser Benutzername ist mehrdeutig. Bitte wenden Sie sich an Ihre Praxis." ), }, ) if not target: raise HTTPException( status_code=401, detail="Benutzer nicht gefunden oder falsches Passwort", ) tpid = (target.get("practice_id") or "").strip() if pid and tpid != pid: raise HTTPException( status_code=403, detail=( "Anmeldung passt nicht zur gewaehlten Praxis (Einladungscode / gespeicherte Praxis-ID). " "Bitte den Einladungslink der Hauptinstallation erneut oeffnen." ), ) if not pid: pid = tpid if target.get("status") == "deactivated": raise HTTPException( status_code=403, detail="Konto deaktiviert. Bitte Administrator kontaktieren.", ) if not _verify_password(password, target["pw_hash"], target["pw_salt"]): raise HTTPException( status_code=401, detail="Benutzer nicht gefunden oder falsches Passwort", ) now = time.strftime("%Y-%m-%d %H:%M:%S") target["last_login"] = now _save_accounts(accounts) dn = (target.get("display_name") or raw).strip() ua = request.headers.get("User-Agent", "") ip = _extract_client_ip(request) dev_id = body.get("device_id") or _make_device_id(target["user_id"], ua) token = _create_session( target["user_id"], pid, dn, target["role"], device_id=dev_id, user_agent=ua, ip_addr=ip, ) bind_src = ( "invite_code" if pid_src == "invite" else ( "stored_practice_id" if pid_src in ("body", "header", "query") else "account" ) ) result = { "success": True, "user_id": target["user_id"], "role": target["role"], "display_name": dn, "practice_id": pid, "practice_bind_source": bind_src, } if target.get("must_change_password"): result["must_change_password"] = True resp = JSONResponse(content=result) resp.set_cookie( "aza_session", token, httponly=True, samesite="lax", max_age=SESSION_MAX_AGE, ) return resp @router.post("/auth/register") async def auth_register(request: Request): """Neuen Benutzer registrieren mit Einladungscode.""" try: body = await request.json() except Exception: body = {} invite_code = (body.get("invite_code") or "").strip() name = (body.get("name") or "").strip() password = (body.get("password") or "").strip() role = (body.get("role") or "mpa").strip() email = (body.get("email") or "").strip() if not invite_code or not name or not password or len(password) < 4: raise HTTPException(status_code=400, detail="Einladungscode, Name und Passwort (min. 4 Zeichen) erforderlich") if role not in ("arzt", "mpa", "empfang"): role = "mpa" target_pid = _lookup_practice_id_by_invite(invite_code) if not target_pid: raise HTTPException(status_code=403, detail="Ungueltiger Einladungscode") accounts = _load_accounts() exists = any(a["display_name"] == name and a.get("practice_id") == target_pid for a in accounts.values()) if exists: raise HTTPException(status_code=409, detail="Benutzername bereits vergeben") uid = uuid.uuid4().hex[:12] pw_hash, pw_salt = _hash_password(password) now = time.strftime("%Y-%m-%d %H:%M:%S") accounts[uid] = { "user_id": uid, "practice_id": target_pid, "display_name": name, "email": email, "role": role, "pw_hash": pw_hash, "pw_salt": pw_salt, "created": now, "status": "active", "last_login": now, } _save_accounts(accounts) _ensure_default_channels(target_pid) ua = request.headers.get("User-Agent", "") ip = _extract_client_ip(request) dev_id = _make_device_id(uid, ua) token = _create_session(uid, target_pid, name, role, device_id=dev_id, user_agent=ua, ip_addr=ip) resp = JSONResponse(content={ "success": True, "user_id": uid, "role": role, "display_name": name, "practice_id": target_pid, "practice_bind_source": "invite_code", }) resp.set_cookie("aza_session", token, httponly=True, samesite="lax", max_age=SESSION_MAX_AGE) return resp @router.get("/auth/me") async def auth_me(request: Request): """Aktuelle Session pruefen. Liefert User-Daten oder 401.""" s = _session_from_request(request) if not s: return JSONResponse(status_code=401, content={"authenticated": False}) return JSONResponse(content={ "authenticated": True, "user_id": s["user_id"], "display_name": s["display_name"], "role": s["role"], "practice_id": s["practice_id"], }) @router.post("/auth/logout") async def auth_logout(request: Request): token = request.cookies.get("aza_session", "") s = _session_from_request(request) if s: try: _presence_clear_user( (s.get("practice_id") or "").strip(), (s.get("user_id") or "").strip(), ) except Exception: pass _delete_session(token) resp = JSONResponse(content={"success": True}) resp.delete_cookie("aza_session") return resp @router.get("/auth/resolve_invite") async def auth_resolve_invite(code: str = Query("")): """Loesst einen Chat-Einladungscode in practice_id auf (ohne Login). Fuer Browser-Start mit ?invite=.""" raw = (code or "").strip() if not raw: return JSONResponse(content={"valid": False}) pid = _lookup_practice_id_by_invite(raw) if not pid: return JSONResponse( content={"valid": False, "detail": "Ungueltiger oder veralteter Einladungscode"}, ) practices = _load_practices() pdata = practices.get(pid, {}) return JSONResponse(content={ "valid": True, "practice_id": pid, "practice_name": (pdata.get("name") or "").strip(), "invite_code": (pdata.get("invite_code") or "").strip(), }) @router.post("/auth/regenerate_invite") async def auth_regenerate_invite(request: Request): """Erzeugt einen neuen Chat-Einladungscode (Admin-Session oder API-Token).""" api_token = request.headers.get("X-API-Token", "") s = _session_from_request(request) if s: if s.get("role") != "admin": raise HTTPException(status_code=403, detail="Nur Admin darf Einladungscode erneuern") pid = s["practice_id"] elif api_token: pid = request.headers.get("X-Practice-Id", "").strip() if not pid: raise HTTPException(status_code=400, detail="X-Practice-Id Header erforderlich") else: raise HTTPException(status_code=401, detail="Nicht authentifiziert") _ensure_practice(pid) practices = _load_practices() if pid in practices: practices[pid]["invite_code"] = _generate_chat_invite_code() _save_practices(practices) return JSONResponse(content={ "success": True, "invite_code": practices.get(pid, {}).get("invite_code", ""), }) @router.post("/auth/provision") async def auth_provision(request: Request): """Provisioning: Desktop-App erstellt/findet Server-Account. Authentifiziert via X-API-Token (Backend-Token), nicht via Session. Erstellt bei Bedarf eine neue Praxis mit echter practice_id.""" api_token = request.headers.get("X-API-Token", "") if not api_token: raise HTTPException(status_code=401, detail="API-Token erforderlich") try: body = await request.json() except Exception: body = {} name = (body.get("name") or "").strip() email = (body.get("email") or "").strip() password = (body.get("password") or "").strip() practice_name = (body.get("practice_name") or "").strip() invite_code_in = (body.get("invite_code") or "").strip() if not name: raise HTTPException(status_code=400, detail="Name erforderlich") if not password or len(password) < 4: raise HTTPException(status_code=400, detail="Passwort (min. 4 Zeichen) erforderlich") pid = "" # Bewusster Beitritt zu einem bestehenden Praxis-Chat per Einladungscode: # ueberschreibt X-Practice-Id / practice_id im Body — sonst legt jedes neue # Geraet ohne gespeicherte practice_id eine eigene Praxis an (Realbefund). if invite_code_in: practices = _load_practices() want = _invite_code_key(invite_code_in) target_pid = None for pida, pdata in practices.items(): if _invite_code_key(pdata.get("invite_code")) == want: target_pid = pida break if not target_pid: raise HTTPException( status_code=403, detail="Ungueltiger Chat-Einladungscode — Praxis nicht gefunden.", ) pid = target_pid else: pid = request.headers.get("X-Practice-Id", "").strip() pid = pid or (body.get("practice_id") or "").strip() if not pid and email: try: from stripe_routes import lookup_practice_id_for_license_email lp = lookup_practice_id_for_license_email(email) if lp: pid = lp.strip() except Exception as exc: print(f"[EMPFANG] lookup_practice_id_for_license_email: {exc}") if not pid: practices = _load_practices() has_legacy = _LEGACY_DEFAULT_PID in practices accounts = _load_accounts() has_legacy_accounts = any( a.get("practice_id") == _LEGACY_DEFAULT_PID for a in accounts.values()) if has_legacy or has_legacy_accounts: pid = _generate_practice_id() _migrate_legacy_to_practice(pid) else: pid = _generate_practice_id() practice = _ensure_practice(pid, name=practice_name or "Meine Praxis", admin_email=email) if practice_name: practices = _load_practices() practices[pid]["name"] = practice_name if email: practices[pid]["admin_email"] = email _save_practices(practices) accounts = _load_accounts() target = None email_lower = email.lower() if email else "" for a in accounts.values(): if a.get("practice_id") != pid: continue if email_lower and (a.get("email") or "").strip().lower() == email_lower: target = a break if a["display_name"] == name: target = a break if target: pw_hash, pw_salt = _hash_password(password) target["pw_hash"] = pw_hash target["pw_salt"] = pw_salt if email: target["email"] = email target["display_name"] = name if not target.get("role") or target.get("role") == "mpa": has_admin = any(a.get("role") == "admin" and a.get("practice_id") == pid for a in accounts.values()) if not has_admin: target["role"] = "admin" _save_accounts(accounts) return JSONResponse(content={ "success": True, "user_id": target["user_id"], "display_name": target["display_name"], "role": target["role"], "practice_id": pid, "action": "updated", }) has_admin = any(a.get("role") == "admin" and a.get("practice_id") == pid for a in accounts.values()) role = "admin" if not has_admin else "arzt" uid = uuid.uuid4().hex[:12] pw_hash, pw_salt = _hash_password(password) accounts[uid] = { "user_id": uid, "practice_id": pid, "display_name": name, "email": email, "role": role, "pw_hash": pw_hash, "pw_salt": pw_salt, "status": "active", "created": time.strftime("%Y-%m-%d %H:%M:%S"), } _save_accounts(accounts) return JSONResponse(content={ "success": True, "user_id": uid, "display_name": name, "role": role, "practice_id": pid, "action": "created", }) @router.post("/auth/forgot_password") async def auth_forgot_password(request: Request): """Passwort-Reset: Benutzername oder E-Mail; bei mehreren Konten mit gleicher E-Mail zweiter Schritt.""" try: body = await request.json() except Exception: body = {} raw = (body.get("login") or body.get("email") or body.get("name") or "").strip() chosen_display_name = (body.get("display_name") or body.get("username") or "").strip() pid = _practice_id_from_client(request, body) if not raw: raise HTTPException( status_code=400, detail="Benutzername oder E-Mail erforderlich", ) accounts = _load_accounts() practices = _load_practices() _neutral = { "success": True, "step": "sent", "message": ( "Wenn ein passendes Konto existiert, wurde ein Link an die hinterlegte " "E-Mail-Adresse gesendet." ), } # Zweiter Schritt: E-Mail + gewählter Benutzername (eindeutig) if chosen_display_name and _is_likely_email(raw): if not pid: raise HTTPException( status_code=400, detail=( "Praxis-Kontext fehlt. Bitte laden Sie die Seite neu oder " "kontaktieren Sie Ihre Praxis." ), ) em = _norm_email(raw) scoped = [a for a in accounts.values() if a.get("practice_id") == pid] picked = [ a for a in scoped if _norm_email(a.get("email") or "") == em and (a.get("display_name") or "") == chosen_display_name ] if len(picked) == 1: return JSONResponse(content=_send_reset_for_account(picked[0])) return JSONResponse(content=_neutral) if _is_likely_email(raw): em = _norm_email(raw) if pid: scoped = [a for a in accounts.values() if a.get("practice_id") == pid] matches = [a for a in scoped if _norm_email(a.get("email") or "") == em] else: matches = [ a for a in accounts.values() if _norm_email(a.get("email") or "") == em ] if len(matches) == 0: return JSONResponse(content=_neutral) if len(matches) == 1: return JSONResponse(content=_send_reset_for_account(matches[0])) cands = [] for a in matches: pida = a.get("practice_id", "") cands.append({ "display_name": a.get("display_name", ""), "practice_id": pida, "practice_name": _practice_label(practices, pida), }) return JSONResponse( content={ "success": True, "step": "pick_user", "login": raw, "candidates": cands, }, ) if pid: scoped = [a for a in accounts.values() if a.get("practice_id") == pid] matches = [a for a in scoped if (a.get("display_name") or "") == raw] else: matches = [ a for a in accounts.values() if (a.get("display_name") or "") == raw ] if len(matches) == 0: return JSONResponse(content=_neutral) if len(matches) > 1: return JSONResponse( content={ "success": False, "step": "ambiguous_practice", "message": ( "Dieser Benutzername ist in mehreren Praxen registriert. Bitte " "setzen Sie das Passwort über Ihre E-Mail-Adresse zurück oder " "wenden Sie sich an Ihre Praxis." ), }, ) return JSONResponse(content=_send_reset_for_account(matches[0])) @router.get("/auth/reset_verify") async def auth_reset_verify(reset_token: str = Query("")): """Prüft, ob ein Reset-Token noch gültig ist (ohne Verbrauch).""" token = (reset_token or "").strip() if not token: return JSONResponse( content={"valid": False, "detail": "Kein Reset-Token angegeben."} ) resets = _load_json(_DATA_DIR / "empfang_resets.json", {}) entry = resets.get(token) if not entry: return JSONResponse( content={"valid": False, "detail": "Ungültiger oder abgelaufener Link."} ) if time.time() - entry.get("created", 0) > 3600: return JSONResponse( content={ "valid": False, "detail": "Der Link ist abgelaufen (max. 1 Stunde).", } ) email = (entry.get("email") or "").strip() accounts = _load_accounts() uid = entry.get("user_id") acc = accounts.get(uid) if uid else None display_name = (entry.get("display_name") or (acc or {}).get("display_name") or "").strip() return JSONResponse( content={"valid": True, "email": email, "display_name": display_name} ) @router.post("/auth/reset_password") async def auth_reset_password(request: Request): """Setzt das Passwort mit einem gültigen Reset-Token.""" try: body = await request.json() except Exception: body = {} token = (body.get("reset_token") or "").strip() new_password = (body.get("password") or "").strip() if not token or not new_password or len(new_password) < 4: raise HTTPException(status_code=400, detail="Reset-Token und neues Passwort (min. 4 Zeichen) erforderlich") resets = _load_json(_DATA_DIR / "empfang_resets.json", {}) entry = resets.get(token) if not entry: raise HTTPException( status_code=400, detail="Ungültiger oder abgelaufener Reset-Link" ) if time.time() - entry.get("created", 0) > 3600: del resets[token] _save_json(_DATA_DIR / "empfang_resets.json", resets) raise HTTPException( status_code=400, detail="Reset-Link ist abgelaufen (max. 1 Stunde)" ) user_id = entry["user_id"] accounts = _load_accounts() if user_id not in accounts: raise HTTPException(status_code=404, detail="Benutzer nicht gefunden") pw_hash, pw_salt = _hash_password(new_password) accounts[user_id]["pw_hash"] = pw_hash accounts[user_id]["pw_salt"] = pw_salt accounts[user_id].pop("must_change_password", None) _save_accounts(accounts) del resets[token] _save_json(_DATA_DIR / "empfang_resets.json", resets) acc = accounts[user_id] email_hint = (acc.get("email") or "").strip() dn_hint = (acc.get("display_name") or "").strip() return JSONResponse( content={ "success": True, "message": "Passwort wurde erfolgreich geändert.", "email": email_hint, "display_name": dn_hint, } ) def _reset_email_subject_body(display_name: str, reset_link: str) -> Tuple[str, str, str]: """Betreff, Plain-Text, HTML für Passwort-Reset.""" subject = "AZA Praxis-Chat – Passwort zurücksetzen" text = ( f"Hallo {display_name},\n\n" f"Sie haben eine Passwort-Zurücksetzung angefordert.\n\n" f"Klicken Sie auf diesen Link, um Ihr Passwort neu zu setzen:\n" f"{reset_link}\n\n" f"Der Link ist 1 Stunde gültig.\n\n" f"Falls Sie diese Anfrage nicht gestellt haben, ignorieren Sie diese E-Mail.\n\n" f"AZA Praxis-Chat" ) html = ( f"
" f"

Passwort zurücksetzen

" f"

Hallo {display_name},

" f"

Sie haben eine Passwort-Zurücksetzung angefordert.

" f"

Neues Passwort wählen

" f"

Der Link ist 1 Stunde gültig.

" f"

Falls Sie diese Anfrage nicht gestellt haben, " f"ignorieren Sie diese E-Mail.

" ) return subject, text, html def _send_reset_via_resend(to_email: str, subject: str, text: str, html: str) -> bool: """Resend HTTP API (gleiche Umgebung wie Lizenz-Mail in stripe_routes).""" import json import urllib.error import urllib.request api_key = os.environ.get("RESEND_API_KEY", "").strip() sender = os.environ.get("MAIL_FROM", "AZA MedWork ").strip() if not api_key: return False payload = json.dumps({ "from": sender, "to": [to_email], "subject": subject, "html": html, "text": text, }).encode("utf-8") req = urllib.request.Request( "https://api.resend.com/emails", data=payload, headers={ "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", "User-Agent": "AZA-MedWork/1.0", }, method="POST", ) try: with urllib.request.urlopen(req, timeout=15) as resp: if resp.status in (200, 201): print(f"[RESET-MAIL] Resend OK -> {to_email}") return True body = resp.read().decode()[:300] print(f"[RESET-MAIL] Resend HTTP {resp.status}: {body}") return False except urllib.error.HTTPError as exc: body = exc.read().decode("utf-8", errors="replace")[:300] if exc.fp else "" print(f"[RESET-MAIL] Resend HTTP {exc.code}: {body}") return False except Exception as exc: print(f"[RESET-MAIL] Resend {type(exc).__name__}: {exc}") return False def _send_reset_email(to_email: str, display_name: str, reset_link: str): """Sendet Passwort-Reset: zuerst SMTP (falls vollstaendig), sonst Resend API.""" import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart subject, text, html = _reset_email_subject_body(display_name, reset_link) host = os.environ.get("SMTP_HOST", "").strip() port_str = os.environ.get("SMTP_PORT", "587").strip() user = os.environ.get("SMTP_USER", "").strip() password = os.environ.get("SMTP_PASS", "").strip() sender = os.environ.get("SMTP_FROM", "").strip() or user if all([host, user, password]): try: msg = MIMEMultipart("alternative") msg["From"] = sender msg["To"] = to_email msg["Subject"] = subject msg.attach(MIMEText(text, "plain", "utf-8")) msg.attach(MIMEText(html, "html", "utf-8")) port = int(port_str) if port == 465: with smtplib.SMTP_SSL(host, port, timeout=15) as srv: srv.login(user, password) srv.sendmail(sender, [to_email], msg.as_string()) else: with smtplib.SMTP(host, port, timeout=15) as srv: srv.ehlo() srv.starttls() srv.ehlo() srv.login(user, password) srv.sendmail(sender, [to_email], msg.as_string()) print(f"[RESET-MAIL] SMTP OK -> {to_email}") return except Exception as exc: print(f"[RESET-MAIL] SMTP FEHLER: {exc} – versuche Resend …") if _send_reset_via_resend(to_email, subject, text, html): return print( "[RESET-MAIL] Weder SMTP noch Resend erfolgreich. " "Setzen Sie SMTP_HOST/SMTP_USER/SMTP_PASS oder RESEND_API_KEY (+ MAIL_FROM). " f"Reset-Link (nur Server-Log): {reset_link}" ) @router.get("/auth/needs_setup") async def auth_needs_setup(request: Request): """Pruefen ob Setup noetig ist (keine Accounts vorhanden).""" pid = _resolve_practice_id(request) if not pid: accounts = _load_accounts() return JSONResponse(content={ "needs_setup": len(accounts) == 0, "invite_code": "", }) _ensure_practice(pid) accounts = _load_accounts() has_accounts = any(a.get("practice_id") == pid for a in accounts.values()) practices = _load_practices() invite_code = practices.get(pid, {}).get("invite_code", "") return JSONResponse(content={ "needs_setup": not has_accounts, "invite_code": invite_code if not has_accounts else "", }) # ===================================================================== # ADMIN ENDPOINTS (nur Rolle admin) # ===================================================================== def _require_admin(request: Request) -> dict: s = _require_session(request) if s.get("role") != "admin": raise HTTPException(status_code=403, detail="Admin-Berechtigung erforderlich") return s @router.get("/admin/users") async def admin_list_users(request: Request): """Alle Benutzer der Praxis mit vollen Details.""" s = _require_admin(request) pid = s["practice_id"] accounts = _load_accounts() result = [] for a in accounts.values(): if a.get("practice_id") != pid: continue result.append({ "user_id": a["user_id"], "display_name": a["display_name"], "role": a.get("role", "mpa"), "status": a.get("status", "active"), "created": a.get("created", ""), "last_login": a.get("last_login", ""), "email": a.get("email", ""), }) return JSONResponse(content={"success": True, "users": result}) @router.post("/admin/users/{user_id}/role") async def admin_change_role(user_id: str, request: Request): """Rolle eines Benutzers aendern.""" s = _require_admin(request) try: body = await request.json() except Exception: body = {} new_role = (body.get("role") or "").strip() if not new_role: raise HTTPException(status_code=400, detail="Rolle erforderlich") if new_role not in ("admin", "arzt", "mpa", "empfang"): raise HTTPException(status_code=400, detail="Ungueltige Rolle") if user_id == s["user_id"] and new_role != "admin": raise HTTPException(status_code=400, detail="Eigene Admin-Rolle kann nicht entfernt werden") accounts = _load_accounts() if user_id not in accounts: raise HTTPException(status_code=404, detail="Benutzer nicht gefunden") if accounts[user_id].get("practice_id") != s["practice_id"]: raise HTTPException(status_code=403, detail="Benutzer gehoert zu anderer Praxis") accounts[user_id]["role"] = new_role _save_accounts(accounts) return JSONResponse(content={"success": True, "user_id": user_id, "role": new_role}) @router.post("/admin/users/{user_id}/deactivate") async def admin_deactivate_user(user_id: str, request: Request): """Benutzer deaktivieren und alle Sessions loeschen.""" s = _require_admin(request) if user_id == s["user_id"]: raise HTTPException(status_code=400, detail="Eigenes Konto kann nicht deaktiviert werden") accounts = _load_accounts() if user_id not in accounts: raise HTTPException(status_code=404, detail="Benutzer nicht gefunden") if accounts[user_id].get("practice_id") != s["practice_id"]: raise HTTPException(status_code=403, detail="Benutzer gehoert zu anderer Praxis") accounts[user_id]["status"] = "deactivated" _save_accounts(accounts) sessions = _load_sessions() sessions = {k: v for k, v in sessions.items() if v.get("user_id") != user_id} _save_sessions(sessions) return JSONResponse(content={"success": True, "user_id": user_id, "status": "deactivated"}) @router.post("/admin/users/{user_id}/activate") async def admin_activate_user(user_id: str, request: Request): """Benutzer reaktivieren.""" s = _require_admin(request) accounts = _load_accounts() if user_id not in accounts: raise HTTPException(status_code=404, detail="Benutzer nicht gefunden") if accounts[user_id].get("practice_id") != s["practice_id"]: raise HTTPException(status_code=403, detail="Benutzer gehoert zu anderer Praxis") accounts[user_id]["status"] = "active" _save_accounts(accounts) return JSONResponse(content={"success": True, "user_id": user_id, "status": "active"}) @router.delete("/admin/users/{user_id}") async def admin_delete_user(user_id: str, request: Request): """Benutzer permanent loeschen inkl. Sessions und Geraete.""" s = _require_admin(request) if user_id == s["user_id"]: raise HTTPException(status_code=400, detail="Eigenes Konto kann nicht geloescht werden") accounts = _load_accounts() if user_id not in accounts: raise HTTPException(status_code=404, detail="Benutzer nicht gefunden") if accounts[user_id].get("practice_id") != s["practice_id"]: raise HTTPException(status_code=403, detail="Benutzer gehoert zu anderer Praxis") acc_del = accounts.get(user_id) if acc_del and acc_del.get("role") == "admin": other_adm = [ a for a in accounts.values() if a.get("practice_id") == s["practice_id"] and a.get("user_id") != user_id and a.get("role") == "admin" ] if not other_adm: raise HTTPException( status_code=400, detail="Letzter Administrator kann nicht geloescht werden.", ) del accounts[user_id] _save_accounts(accounts) sessions = _load_sessions() sessions = {k: v for k, v in sessions.items() if v.get("user_id") != user_id} _save_sessions(sessions) devices = _load_devices() devices = {k: v for k, v in devices.items() if v.get("user_id") != user_id} _save_devices(devices) return JSONResponse(content={"success": True, "deleted": user_id}) @router.post("/admin/users/{user_id}/reset_password") async def admin_reset_password(user_id: str, request: Request): """Temporaeres Passwort generieren. Benutzer muss es beim naechsten Login aendern.""" s = _require_admin(request) accounts = _load_accounts() if user_id not in accounts: raise HTTPException(status_code=404, detail="Benutzer nicht gefunden") if accounts[user_id].get("practice_id") != s["practice_id"]: raise HTTPException(status_code=403, detail="Benutzer gehoert zu anderer Praxis") temp_pw = secrets.token_urlsafe(8) pw_hash, pw_salt = _hash_password(temp_pw) accounts[user_id]["pw_hash"] = pw_hash accounts[user_id]["pw_salt"] = pw_salt accounts[user_id]["must_change_password"] = True _save_accounts(accounts) return JSONResponse(content={ "success": True, "user_id": user_id, "temp_password": temp_pw, }) @router.get("/admin/devices") async def admin_list_devices(request: Request): """Alle Geraete aller Benutzer der Praxis.""" s = _require_admin(request) pid = s["practice_id"] devices = _load_devices() accounts = _load_accounts() user_names = {a["user_id"]: a["display_name"] for a in accounts.values()} result = [] for d in devices.values(): if d.get("practice_id") != pid: continue entry = dict(d) entry["user_name"] = user_names.get(d.get("user_id"), d.get("user_id", "")) result.append(entry) result.sort(key=lambda d: d.get("last_active", ""), reverse=True) return JSONResponse(content={"success": True, "devices": result}) @router.post("/admin/devices/{device_id}/block") async def admin_block_device(device_id: str, request: Request): """Geraet blockieren und zugehoerige Sessions loeschen.""" s = _require_admin(request) devices = _load_devices() if device_id not in devices: raise HTTPException(status_code=404, detail="Geraet nicht gefunden") dev = devices[device_id] if dev.get("practice_id") != s["practice_id"]: raise HTTPException(status_code=403, detail="Geraet gehoert zu anderer Praxis") dev["trust_status"] = "blocked" _save_devices(devices) sessions = _load_sessions() sessions = {k: v for k, v in sessions.items() if v.get("device_id") != device_id} _save_sessions(sessions) return JSONResponse(content={"success": True, "device_id": device_id, "trust_status": "blocked"}) @router.delete("/admin/devices/{device_id}") async def admin_delete_device(device_id: str, request: Request): """Geraetedatensatz loeschen.""" s = _require_admin(request) devices = _load_devices() if device_id not in devices: raise HTTPException(status_code=404, detail="Geraet nicht gefunden") if devices[device_id].get("practice_id") != s["practice_id"]: raise HTTPException(status_code=403, detail="Geraet gehoert zu anderer Praxis") del devices[device_id] _save_devices(devices) return JSONResponse(content={"success": True, "deleted": device_id}) @router.post("/admin/users/{user_id}/logout_all") async def admin_logout_all(user_id: str, request: Request): """Alle Sessions eines Benutzers loeschen.""" s = _require_admin(request) accounts = _load_accounts() if user_id not in accounts: raise HTTPException(status_code=404, detail="Benutzer nicht gefunden") if accounts[user_id].get("practice_id") != s["practice_id"]: raise HTTPException(status_code=403, detail="Benutzer gehoert zu anderer Praxis") sessions = _load_sessions() removed = sum(1 for v in sessions.values() if v.get("user_id") == user_id) sessions = {k: v for k, v in sessions.items() if v.get("user_id") != user_id} _save_sessions(sessions) return JSONResponse(content={"success": True, "user_id": user_id, "sessions_removed": removed}) # ===================================================================== # USER MANAGEMENT (admin only for invite/role changes) # ===================================================================== def _attach_devices_to_practice_users(users: list, pid: str) -> None: """Last aktiv / Presence pro Benutzer; identisch zum frueheren API-Token-Zweig.""" devices = _load_devices() user_devices: dict[str, list] = {} for d in devices.values(): if d.get("practice_id") != pid: continue uid = d.get("user_id", "") user_devices.setdefault(uid, []).append({ "device_name": d.get("device_name", ""), "platform": d.get("platform", ""), "last_active": d.get("last_active", ""), "ip_last": d.get("ip_last", ""), }) for u in users: u["devices"] = user_devices.get(u.get("user_id", ""), []) def _attach_presence_to_practice_users(users: list, pid: str) -> None: """Reichert users_full mit serverseitiger Session-Presence an (RAM, TTL).""" pid = (pid or "").strip() if not pid: return for u in users: if not isinstance(u, dict): continue uid = str(u.get("user_id") or "").strip() snap = _presence_snapshot_for_user(pid, uid) u["presence_online"] = snap["presence_online"] u["presence_last_seen"] = snap["presence_last_seen"] u["presence_source"] = snap["presence_source"] u["presence_age_seconds"] = snap["presence_age_seconds"] @router.get("/users") async def empfang_users(request: Request): """Liefert Benutzer der Praxis. Offen fuer alle authentifizierten + Legacy.""" s = _session_from_request(request) if s: users = _practice_users(s["practice_id"]) _attach_devices_to_practice_users(users, s["practice_id"]) _attach_presence_to_practice_users(users, s["practice_id"]) return JSONResponse(content={ "users": [u["display_name"] for u in users], "users_full": users, "practice_id": s["practice_id"], }) api_token = request.headers.get("X-API-Token", "") pid = _resolve_practice_id(request) if not pid: return JSONResponse(content={"users": [], "practice_id": ""}) users = _practice_users(pid) if users: result: dict = { "users": [u["display_name"] for u in users], "practice_id": pid, } if api_token: _attach_devices_to_practice_users(users, pid) _attach_presence_to_practice_users(users, pid) result["users_full"] = users return JSONResponse(content=result) old_file = _DATA_DIR / "empfang_users.json" if old_file.is_file(): try: names = json.loads(old_file.read_text(encoding="utf-8")) if isinstance(names, list): return JSONResponse(content={"users": names, "practice_id": pid}) except Exception: pass return JSONResponse(content={"users": [], "practice_id": pid}) @router.post("/users") async def empfang_register_user(request: Request): """Legacy-kompatibel: Benutzer anlegen/umbenennen/loeschen.""" try: body = await request.json() except Exception: body = {} name = (body.get("name") or "").strip() action = (body.get("action") or "add").strip() pid = _resolve_practice_id(request) if not pid or not name: return JSONResponse(content={"success": False}) accounts = _load_accounts() if action == "delete": sess = _session_from_request(request) actor_name = (body.get("actor_display_name") or "").strip() to_del = [uid for uid, a in accounts.items() if a["display_name"] == name and a.get("practice_id") == pid] if not to_del: return JSONResponse(content={"success": False, "detail": "Benutzer nicht gefunden"}, status_code=404) for uid in list(to_del): acc = accounts.get(uid) if not acc: continue if sess and acc.get("user_id") == sess.get("user_id"): return JSONResponse( content={"success": False, "detail": "Eigenes Konto kann nicht geloescht werden"}, status_code=400, ) if actor_name and (acc.get("display_name") or "").strip() == actor_name: return JSONResponse( content={"success": False, "detail": "Der aktive Benutzer kann hier nicht geloescht werden"}, status_code=400, ) if acc.get("role") == "admin": others = [ a for a in accounts.values() if a.get("practice_id") == pid and a.get("role") == "admin" and a.get("user_id") != uid ] if not others: return JSONResponse( content={"success": False, "detail": "Letzter Administrator kann nicht geloescht werden"}, status_code=400, ) del accounts[uid] _save_accounts(accounts) elif action == "rename": new_name = (body.get("new_name") or "").strip() if new_name: for a in accounts.values(): if a["display_name"] == name and a.get("practice_id") == pid: a["display_name"] = new_name _save_accounts(accounts) elif action == "add_secure": s = _session_from_request(request) if not s: raise HTTPException(status_code=401, detail="Nicht angemeldet") sess_role = str(s.get("role") or "").strip().lower() if sess_role not in ("admin", "empfang"): raise HTTPException(status_code=403, detail="Keine Berechtigung Benutzer anzulegen") pid = str(s.get("practice_id") or "").strip() if not pid: return JSONResponse(content={"success": False, "detail": "Keine Praxis"}, status_code=400) pw = (body.get("password") or "").strip() pw2 = (body.get("password_repeat") or body.get("password2") or "").strip() if pw != pw2: return JSONResponse(content={"success": False, "detail": "Passwoerter stimmen nicht ueberein"}, status_code=400) if len(pw) < 4: return JSONResponse(content={"success": False, "detail": "Passwort mindestens 4 Zeichen"}, status_code=400) allowed_roles = {"mpa", "arzt"} if sess_role == "admin": allowed_roles.update({"admin", "empfang"}) role_new = str(body.get("role") or "mpa").strip().lower() if role_new not in allowed_roles: role_new = "mpa" exists = any(a["display_name"] == name and a.get("practice_id") == pid for a in accounts.values()) if not exists: uid = uuid.uuid4().hex[:12] pw_hash, pw_salt = _hash_password(pw) accounts[uid] = { "user_id": uid, "practice_id": pid, "display_name": name, "role": role_new, "pw_hash": pw_hash, "pw_salt": pw_salt, "created": time.strftime("%Y-%m-%d %H:%M:%S"), "status": "active", "last_login": "", "email": "", } _save_accounts(accounts) else: return JSONResponse(content={"success": False, "detail": "Name bereits vergeben"}, status_code=409) else: exists = any(a["display_name"] == name and a.get("practice_id") == pid for a in accounts.values()) if not exists: uid = uuid.uuid4().hex[:12] pw_hash, pw_salt = _hash_password(name.lower()) accounts[uid] = { "user_id": uid, "practice_id": pid, "display_name": name, "role": "mpa", "pw_hash": pw_hash, "pw_salt": pw_salt, "created": time.strftime("%Y-%m-%d %H:%M:%S"), "status": "active", "last_login": "", "email": "", } _save_accounts(accounts) users = _practice_users(pid) return JSONResponse(content={ "success": True, "users": [u["display_name"] for u in users], "practice_id": pid, }) # ===================================================================== # MESSAGE ROUTES (practice-scoped) # ===================================================================== class EmpfangMessage(BaseModel): medikamente: str = "" therapieplan: str = "" procedere: str = "" kommentar: str = "" patient: str = "" absender: str = "" zeitstempel: str = "" practice_id: str = "" extras: dict = Field(default_factory=dict) @router.post("/send") async def empfang_send(msg: EmpfangMessage, request: Request): s = _session_from_request(request) pid = msg.practice_id.strip() or _resolve_practice_id(request) if not pid: raise HTTPException(status_code=400, detail="practice_id erforderlich") absender = msg.absender.strip() if s and not absender: absender = s["display_name"] # Strikte Direktchat-Validierung: wenn der Client explizit einen Direktchat # markiert (audience=direct ODER recipient_user_id gesetzt) MUSS die # Empfaenger-User-ID auf ein Konto in dieser Praxis aufloesbar sein. # Verhindert: stille Speicherung als Allgemein, wenn der Browser/Desktop # einen Direktchat anzeigt. _ex_in = msg.extras or {} _aud_in = str(_ex_in.get("audience") or "").strip().lower() _ru_in = str(_ex_in.get("recipient_user_id") or "").strip() _r_in = str(_ex_in.get("recipient") or "").strip() _is_multi_in = ( isinstance(_ex_in.get("recipients"), list) and len(_ex_in.get("recipients") or []) >= 2 ) or ("," in _r_in and _r_in.count(",") >= 1) _claims_dm = (_aud_in == "direct") or (_ru_in and not _is_multi_in) if _claims_dm: _acc = _accounts_by_practice(pid) _resolved_ru = _ru_in if _ru_in in _acc else ( _resolve_user_uid_in_practice(pid, _r_in) if _r_in else "" ) _r_lower = _r_in.lower() if not _resolved_ru or _r_lower in ("alle", "all", "allgemein", "an alle"): raise HTTPException( status_code=400, detail=( "Direktchat konnte technisch nicht eindeutig zugeordnet " "werden (recipient_user_id fehlt oder ist ungueltig). " "Bitte den Empfaenger erneut auswaehlen." ), ) _sess_uid_chk = (s.get("user_id") if s else "") or "" _claim_su_chk = str(_ex_in.get("sender_user_id") or "").strip() if _sess_uid_chk: pass elif _claim_su_chk and _claim_su_chk in _acc: pass else: raise HTTPException( status_code=400, detail=( "Direktchat: gueltige Session oder sender_user_id fuer " "diese Praxis erforderlich." ), ) msg_id = uuid.uuid4().hex[:12] messages = _load_messages() thread_id = msg_id reply_to = (msg.extras or {}).get("reply_to", "") if reply_to: for m in messages: if m.get("id") == reply_to: thread_id = m.get("thread_id", reply_to) break else: thread_id = reply_to entry = { "id": msg_id, "thread_id": thread_id, "practice_id": pid, "medikamente": msg.medikamente.strip(), "therapieplan": msg.therapieplan.strip(), "procedere": msg.procedere.strip(), "kommentar": msg.kommentar.strip(), "patient": msg.patient.strip(), "absender": absender, "zeitstempel": msg.zeitstempel.strip() or time.strftime("%Y-%m-%d %H:%M:%S"), "empfangen": time.strftime("%Y-%m-%d %H:%M:%S"), "status": "offen", "user_id": s["user_id"] if s else "", } ex_raw = dict(msg.extras or {}) session_uid_out = (s["user_id"] if s else "") or "" entry["extras"] = _enrich_outgoing_direct_extras(pid, absender, ex_raw, session_uid_out) exo_dbg = entry.get("extras") or {} if _claims_dm: _send_mode = "direct" elif _extras_indicates_broadcast(exo_dbg): _send_mode = "all" else: _send_mode = "other" _log.info( "empfang_send mode=%s practice_id=%s sender_uid=%s recipient_uid=%s " "conv_key=%s msg_id=%s", _send_mode, pid, str(exo_dbg.get("sender_user_id") or ""), str(exo_dbg.get("recipient_user_id") or ""), str(exo_dbg.get("direct_conv_key") or ""), msg_id, ) messages.insert(0, entry) _save_messages(messages) try: _pulse_bump(pid, sender=absender) except Exception: pass return JSONResponse(content={ "success": True, "id": msg_id, "thread_id": thread_id, "practice_id": pid, }) @router.get("/messages") async def empfang_list(request: Request, practice_id: Optional[str] = Query(None)): pid = (practice_id or "").strip() or _resolve_practice_id(request) if not pid: return JSONResponse(content={"success": True, "messages": []}) messages = _load_messages() filtered = _filter_by_practice(messages, pid) return JSONResponse(content={"success": True, "messages": filtered}) # ===================================================================== # CONVERSATION + LIVE-PULSE # Eine einzige serverseitige Wahrheit fuer Browser, Hülle und # "An Empfang senden". Kein Client-Filter, keine lokale Sonderwahrheit. # ===================================================================== # In-Memory Pulse: bei jedem POST /send wird der Tick erhoeht. # Clients koennen mit kurzen Polls (z. B. 800 ms) auf den Tick lauschen # und nur dann die volle Conversation neu holen, wenn der Tick wechselt. # Damit wirkt das Signal sofort, ohne traege Sekunden-Lags. _PRACTICE_PULSE: dict[str, dict] = {} def _pulse_bump(practice_id: str, sender: str = ""): p = _PRACTICE_PULSE.setdefault(practice_id, {"tick": 0, "ts": 0.0, "last_sender": ""}) p["tick"] = int(p.get("tick", 0)) + 1 p["ts"] = time.time() p["last_sender"] = sender or "" _PRACTICE_PULSE[practice_id] = p def _pulse_get(practice_id: str) -> dict: p = _PRACTICE_PULSE.get(practice_id) if not p: # Beim ersten Abruf einen Tick aus den Daten ableiten, damit # Clients nach Server-Restart nicht alle "neue Nachricht!" denken. msgs = _filter_by_practice(_load_messages(), practice_id) latest = "" for m in msgs: t = (m.get("empfangen") or m.get("zeitstempel") or "") if t > latest: latest = t p = {"tick": 1, "ts": time.time(), "last_sender": "", "boot": latest} _PRACTICE_PULSE[practice_id] = p return p # ===================================================================== # Client-Presence (Ping pro angemeldeter Empfang-Instanz, practice-scoped) # RAM-beschraenkt wie Pulse; TTL definiert „online“ für /empfang/users. # ===================================================================== EMPFANG_PRESENCE_TTL_SECONDS = 120 _PRACTICE_USER_PRESENCE: dict[str, dict] = {} def _presence_key(pid: str, uid: str) -> str: return f"{(pid or '').strip()}|{(uid or '').strip()}" def _presence_record_ping(pid: str, uid: str, source: str = "web") -> None: if not pid or not uid: return src = ((source or "web").strip() or "web")[:32] _PRACTICE_USER_PRESENCE[_presence_key(pid, uid)] = { "last_seen": time.time(), "source": src, } def _presence_clear_user(pid: str, uid: str) -> None: if not pid or not uid: return _PRACTICE_USER_PRESENCE.pop(_presence_key(pid, uid), None) def _presence_iso_utc(ts: float) -> str: if ts <= 0: return "" return datetime.fromtimestamp(ts, tz=timezone.utc).strftime( "%Y-%m-%dT%H:%M:%SZ") def _presence_snapshot_for_user(pid: str, uid: str) -> dict: rec = _PRACTICE_USER_PRESENCE.get(_presence_key(pid, uid)) now = time.time() absent = { "presence_online": False, "presence_last_seen": None, "presence_source": "", "presence_age_seconds": None, } if not rec: return absent last = float(rec.get("last_seen", 0)) age = max(0.0, now - last) online = age <= float(EMPFANG_PRESENCE_TTL_SECONDS) return { "presence_online": online, "presence_last_seen": _presence_iso_utc(last) if last > 0 else None, "presence_source": str(rec.get("source") or ""), "presence_age_seconds": int(age), } def _norm_name(s: str) -> str: """Vergleichts-String fuer Namen: lower, trim, Akzente ueber NFKD entfernen.""" t = (s or "").strip().lower() t = unicodedata.normalize("NFKD", t) return "".join(ch for ch in t if unicodedata.combining(ch) == "") def _accounts_by_practice(pid: str) -> dict[str, dict]: accounts = _load_accounts() return {a["user_id"]: a for a in accounts.values() if a.get("practice_id") == pid} def _resolve_user_uid_in_practice(pid: str, hint: str) -> str: """Loesst Kurz-ID oder display_name innerhalb einer Praxis zu user_id auf.""" hint = (hint or "").strip() if not hint or not pid: return "" by_uid = _accounts_by_practice(pid) if hint in by_uid: return hint hn = _norm_name(hint) best = "" for uid, a in by_uid.items(): if _norm_name(a.get("display_name") or "") == hn: return uid return best def _resolve_user_uid_in_practice_loose(pid: str, hint: str) -> str: """Eindeutige Teilstring-Zuordnung display_name ↔ user_id innerhalb einer Praxis. Nur wenn genau ein Konto matched; sonst leer — keine automatische Fusion. Mindestlaenge fuer Hint, um kurze Artefakte wie ``test`` zu vermeiden. """ hint = (hint or "").strip() if not hint or not pid: return "" ex = _resolve_user_uid_in_practice(pid, hint) if ex: return ex hn = _norm_name(hint) if len(hn) < 5: return "" cand: list[str] = [] for uid, a in _accounts_by_practice(pid).items(): dn = _norm_name(a.get("display_name") or "") if not dn: continue if hn in dn or dn in hn: cand.append(uid) if len(cand) != 1: return "" return cand[0] def _resolve_user_uid_unique_last_token(pid: str, full_name_core: str) -> str: """Wenn nach normiertem Nachnamen nur genau ein Konto in der Praxis matched.""" hn = _norm_name(full_name_core) parts = [p for p in hn.split() if p] if len(parts) < 2: return "" last = parts[-1] if len(last) < 4: return "" cand: list[str] = [] for uid, a in _accounts_by_practice(pid).items(): dn = _norm_name(a.get("display_name") or "") dp = [p for p in dn.split() if p] if not dp: continue if dp[-1] == last: cand.append(uid) if len(cand) != 1: return "" return cand[0] def _extras_indicates_broadcast(ex: dict) -> bool: if not isinstance(ex, dict): return False if ex.get("rcpt_broadcast") in (True, "true", "1", 1): return True aud = str(ex.get("audience") or "").strip().lower() if aud in ("all", "everyone", "broadcast", "general"): return True return False def _thread_id_stable(m: dict) -> str: return str(m.get("thread_id") or m.get("id") or "").strip() def _thread_requires_broadcast_exclusion( messages: list[dict], pid: str, tid: str, ) -> bool: """Threads mit klarem (Nicht-)Broadcast-Adressaten nicht unter «An alle» listen.""" if not tid or not pid: return False acc_map = _accounts_by_practice(pid) for m in messages: if _thread_id_stable(m) != tid: continue if _normalized_group_key_from_message(m): continue ex = m.get("extras") or {} if _extras_indicates_broadcast(ex): continue su = str(ex.get("sender_user_id") or "").strip() ru = str(ex.get("recipient_user_id") or "").strip() if su in acc_map and ru in acc_map and su != ru: return True rcpt_raw = (ex.get("recipient") or "").strip() rl = rcpt_raw.lower() if rcpt_raw and rl not in ("alle", "all", "allgemein", "an alle"): return True dk = str(ex.get("direct_conv_key") or "").strip() if dk and "|direct|" in dk: return True return False def _uid_pair_from_message_for_practice(m: dict, pid: str) -> tuple[str, str]: """Liefert zwei user_ids wenn aus extras + Stammdaten konservativ ableitbar.""" acc_map = _accounts_by_practice(pid) ex = m.get("extras") or {} su_ex = str(ex.get("sender_user_id") or "").strip() ru_ex = str(ex.get("recipient_user_id") or "").strip() su = su_ex if su_ex in acc_map else "" ru = ru_ex if ru_ex in acc_map else "" core_s = _sender_core(m.get("absender", "")) r_s = (ex.get("recipient") or "").strip() rl = r_s.lower() if rl in ("alle", "all", "allgemein", "an alle"): return ("", "") if _extras_indicates_broadcast(ex): return ("", "") rs = _resolve_user_uid_in_practice(pid, core_s) rr = _resolve_user_uid_in_practice(pid, r_s) if r_s else "" if not rs: rs = _resolve_user_uid_in_practice_loose(pid, core_s) if not rs: rs = _resolve_user_uid_unique_last_token(pid, core_s) if not rr and r_s: rr = _resolve_user_uid_in_practice_loose(pid, r_s) if not rr and r_s: rr = _resolve_user_uid_unique_last_token(pid, r_s) if not su and rs: su = rs if not ru and rr: ru = rr if _normalized_group_key_from_message(m): return ("", "") if su and ru and su != ru: return (su, ru) return ("", "") def _direct_conv_key(pid: str, uid_a: str, uid_b: str) -> str: ua, ub = sorted([uid_a, uid_b]) return f"{pid}|direct|{ua}|{ub}" def _sender_core(absender: str) -> str: """Aus 'Vorname Nachname (HOST)' -> 'Vorname Nachname'.""" s = (absender or "").split("(")[0].strip() return s def _enrich_outgoing_direct_extras(pid: str, absender: str, extras: dict, session_uid: str) -> dict: """DM: sender_user_id, recipient_user_id, direct_conv_key (stabil).""" ex = dict(extras or {}) recipient_raw = (ex.get("recipient") or "").strip() rlist = ex.get("recipients") is_multi = ( isinstance(rlist, list) and len(rlist) >= 2 ) or ("," in recipient_raw and recipient_raw.count(",") >= 1) broadcast_rcpt = not recipient_raw or recipient_raw.lower() in ("alle", "all", "allgemein") if broadcast_rcpt or is_multi: return ex by_uid = _accounts_by_practice(pid) core = _sender_core(absender) sender_uid = (session_uid or "").strip() if not sender_uid: su_claim = str(ex.get("sender_user_id") or "").strip() resolved = _resolve_user_uid_in_practice(pid, core) if su_claim and su_claim in by_uid and su_claim == resolved: sender_uid = su_claim elif su_claim and su_claim in by_uid and not resolved: sender_uid = su_claim else: sender_uid = resolved else: if sender_uid not in by_uid: fb = _resolve_user_uid_in_practice(pid, core) if fb: sender_uid = fb if sender_uid: ex["sender_user_id"] = sender_uid recipient_uid = _resolve_user_uid_in_practice(pid, recipient_raw) ru_claim = str(ex.get("recipient_user_id") or "").strip() if ru_claim and ru_claim in by_uid: if not recipient_uid or ru_claim == recipient_uid: recipient_uid = ru_claim if recipient_uid: ex["recipient_user_id"] = recipient_uid if sender_uid and recipient_uid: ex["direct_conv_key"] = _direct_conv_key(pid, sender_uid, recipient_uid) # Explizites DM-Tagging: garantiert, dass diese Nachricht nicht als # Allgemein-/Broadcast-Inbox-Treffer ausgewertet wird. if recipient_raw and not is_multi: ex["audience"] = "direct" ex["rcpt_broadcast"] = False return ex def _msg_recipient(m: dict) -> str: extras = m.get("extras") or {} return (extras.get("recipient") or "").strip() def _normalized_group_key_from_extras(extras: dict) -> str: """Canonical key 'name|name|...' lowercase for multi-recipient threads.""" if not isinstance(extras, dict): return "" rlist = extras.get("recipients") if isinstance(rlist, list) and len(rlist) >= 2: parts = sorted({_norm_name(str(x)) for x in rlist if str(x).strip()}) return "|".join(parts) if parts else "" rcpt = (extras.get("recipient") or "").strip() if "," in rcpt: parts = sorted({_norm_name(p) for p in rcpt.split(",") if p.strip()}) if len(parts) >= 2: return "|".join(parts) return "" def _normalized_group_key_from_message(m: dict) -> str: return _normalized_group_key_from_extras(m.get("extras") or {}) def _dm_message_matches_pair(m: dict, me_n: str, peer_n: str) -> bool: """True, wenn Nachricht zum 1:1-Paar (mit display_name-normalisierten Kernen) gehoert.""" if _normalized_group_key_from_message(m): return False sender_n = _norm_name(_sender_core(m.get("absender", ""))) rcpt_n = _norm_name(_msg_recipient(m)) ex = m.get("extras") or {} has_reply = bool(str(ex.get("reply_to") or "").strip()) if me_n and peer_n: if sender_n and sender_n not in (me_n, peer_n): return False if rcpt_n in ("", "alle"): # Leerer Empfaenger: nur echte Thread-Antwort (kein Rundschreiben ohne Adresse). return has_reply and bool(sender_n) and sender_n in (me_n, peer_n) if rcpt_n not in (me_n, peer_n): return False return ( (sender_n == me_n and rcpt_n == peer_n) or (sender_n == peer_n and rcpt_n == me_n) ) # Wenig 'me' vom Client: sehr konservativ, keine Rundmails ohne reply_to reinziehen. if not peer_n: return False if rcpt_n in ("", "alle"): return has_reply and sender_n == peer_n if sender_n == peer_n and rcpt_n and rcpt_n not in ("alle",) and rcpt_n != peer_n: return True if rcpt_n == peer_n and sender_n and sender_n != peer_n: return True return False def _msg_by_id_index(messages: list[dict]) -> dict[str, dict]: return {str(m.get("id")): m for m in messages if m.get("id")} def _thread_root_msg(m: dict, by_id: dict[str, dict]) -> Optional[dict]: cur: Optional[dict] = m steps = 0 while cur is not None and steps < 500: steps += 1 rto = str((cur.get("extras") or {}).get("reply_to") or "").strip() if not rto: return cur nxt = by_id.get(rto) if nxt is None: return cur cur = nxt return cur def _root_is_broadcast_inbox(root: Optional[dict], pid: str = "") -> bool: if not root: return False if _normalized_group_key_from_message(root): return False pid = (pid or "").strip() exroot = root.get("extras") or {} if pid and isinstance(exroot, dict) and not _extras_indicates_broadcast(exroot): acc_map = _accounts_by_practice(pid) su = str(exroot.get("sender_user_id") or "").strip() ru = str(exroot.get("recipient_user_id") or "").strip() if su in acc_map and ru in acc_map and su != ru: return False rcpt_n = _norm_name(_msg_recipient(root)) return rcpt_n in ("", "alle") def _dm_extras_uid_symmetric_match( ex: dict, me_uid: str, peer_uid: str, acc_map: dict[str, dict] ) -> bool: su = str(ex.get("sender_user_id") or "").strip() ru = str(ex.get("recipient_user_id") or "").strip() if su not in acc_map or ru not in acc_map or su == ru: return False return {su, ru} == {me_uid, peer_uid} def _dm_uid_pair_matches_message(m: dict, pid: str, me_uid: str, peer_uid: str) -> bool: """True, wenn aus Absender-/Empfaenger-/extras eindeutig dasselbe 1:1-Paar wird.""" if not me_uid or not peer_uid: return False a, b = _uid_pair_from_message_for_practice(m, pid) return bool(a and b and {a, b} == {me_uid, peer_uid}) def _conversation_dm_by_key_or_names( messages: list[dict], pid: str, me_uid: str, peer_uid: str, me_display: str, peer_display_fallback: str, ) -> list[dict]: acc_map = _accounts_by_practice(pid) peer_dn = peer_display_fallback if peer_uid and peer_uid in acc_map: peer_dn = (acc_map[peer_uid].get("display_name") or "").strip() or peer_dn me_dn = me_display if me_uid and me_uid in acc_map: me_dn = (acc_map[me_uid].get("display_name") or "").strip() or me_dn me_n = _norm_name(me_dn) peer_n = _norm_name(peer_dn) key_need = "" if me_uid and peer_uid: key_need = _direct_conv_key(pid, me_uid, peer_uid) out: list[dict] = [] for m in messages: if _normalized_group_key_from_message(m): continue ex = m.get("extras") or {} rcpt_raw_l = (ex.get("recipient") or "").strip().lower() if rcpt_raw_l in ("alle", "all", "allgemein", "an alle"): continue if _extras_indicates_broadcast(ex): continue matched = False if key_need: km = str(ex.get("direct_conv_key") or "").strip() if km == key_need: matched = True elif _dm_extras_uid_symmetric_match(ex, me_uid, peer_uid, acc_map): matched = True elif km and km != key_need: # Alter/falscher Key: nur bei Nachweis desselben Teilnehmerpaares # oder konservativem Legacy-Namenmatch zulassen (nicht fremdes DM). if _dm_uid_pair_matches_message(m, pid, me_uid, peer_uid): matched = True elif me_n and peer_n and _dm_message_matches_pair(m, me_n, peer_n): matched = True else: continue if not matched and me_uid and peer_uid: if _dm_uid_pair_matches_message(m, pid, me_uid, peer_uid): matched = True if not matched and me_n and peer_n and _dm_message_matches_pair(m, me_n, peer_n): matched = True if matched: out.append(m) return out def _conversation_for_audience( messages: list[dict], practice_id_scope: str, me_display: str, audience: str, me_user_id: str = "", peer_user_id: str = "", ) -> list[dict]: """ Audience-Modell: - Optional me_user_id + peer_user_id: stabiler Direktchat (direct_conv_key). - Sonst Fallback ueber normierte display_name-Paare. """ aud_raw = (audience or "").strip() aud_lower = aud_raw.lower() if aud_lower in ("__noop__", "__multi__"): return [] is_broadcast = aud_lower in ("", "alle", "all", "allgemein") pid = (practice_id_scope or "").strip() def _tid(msg: dict) -> str: return str(msg.get("thread_id") or msg.get("id") or "") out: list[dict] = [] if is_broadcast: by_id = _msg_by_id_index(messages) for m in messages: if _normalized_group_key_from_message(m): continue rcpt_n = _norm_name(_msg_recipient(m)) if rcpt_n not in ("", "alle"): continue root = _thread_root_msg(m, by_id) if not _root_is_broadcast_inbox(root, pid): continue tid = _tid(m) if _thread_requires_broadcast_exclusion(messages, pid, tid): continue out.append(m) out.sort(key=lambda x: (x.get("empfangen") or x.get("zeitstempel") or "")) return out # --- Gruppen-Chat --- if aud_lower.startswith("group|"): target_key = aud_lower[len("group|"):].strip() participants = {_norm_name(p) for p in target_key.split("|") if p.strip()} thread_ids: set[str] = set() for m in messages: gk = _normalized_group_key_from_message(m) if gk == target_key: thread_ids.add(_tid(m)) for m in messages: if _tid(m) not in thread_ids: continue gk2 = _normalized_group_key_from_message(m) if gk2 == target_key: out.append(m) continue if not gk2: sn = _norm_name(_sender_core(m.get("absender", ""))) if sn and sn in participants: out.append(m) out.sort(key=lambda x: (x.get("empfangen") or x.get("zeitstempel") or "")) return out # --- 1:1 Direktverlauf --- mu = (me_user_id or "").strip() or "" pu = (peer_user_id or "").strip() or _resolve_user_uid_in_practice(pid, aud_raw) dm_list = _conversation_dm_by_key_or_names( messages, pid, mu, pu, me_display, aud_raw, ) out.extend(dm_list) out.sort(key=lambda x: (x.get("empfangen") or x.get("zeitstempel") or "")) return out @router.get("/conversation") async def empfang_conversation( request: Request, audience: str = Query(""), me: str = Query(""), me_user_id: str = Query(""), peer_user_id: str = Query(""), practice_id: Optional[str] = Query(None), ): """Liefert den vollstaendigen, serverseitig gefilterten Verlauf. Eine Quelle fuer Browser, Hülle und Desktop-Dialog "An Empfang senden". Optional: me_user_id + peer_user_id fuer stabilen Direktchat (gleiche Logik ueberall). """ pid = (practice_id or "").strip() or _resolve_practice_id(request) if not pid: return JSONResponse( content={"success": True, "messages": [], "tick": 0}, headers={"Cache-Control": "no-store, no-cache, must-revalidate"}, ) s = _session_from_request(request) me_eff = (me or "").strip() or (s.get("display_name") if s else "") me_uid_eff = (me_user_id or "").strip() or (str(s.get("user_id") or "").strip() if s else "") if not me_uid_eff and me_eff: # Fallback: aus dem Anzeigename in der Praxis aufloesen, damit # Desktop ohne Browser-Session denselben direct_conv_key trifft. me_uid_eff = _resolve_user_uid_in_practice(pid, me_eff) peer_uid_eff = (peer_user_id or "").strip() if not peer_uid_eff: aud_raw = (audience or "").strip() aud_lower = aud_raw.lower() if aud_raw and aud_lower not in ("", "alle", "all", "allgemein", "__noop__", "__multi__") and not aud_lower.startswith("group|"): peer_uid_eff = _resolve_user_uid_in_practice(pid, aud_raw) messages = _filter_by_practice(_load_messages(), pid) conv = _conversation_for_audience( messages, pid, me_eff, audience, me_uid_eff, peer_uid_eff, ) pulse = _pulse_get(pid) _log.info( "empfang_conversation load practice_id=%s me_uid=%s peer_uid=%s " "audience_key=%s msg_count=%s tick=%s", pid, me_uid_eff, peer_uid_eff, (audience or "")[:80] if audience else "", len(conv), int(pulse.get("tick", 0)), ) return JSONResponse( content={ "success": True, "messages": conv, "audience": audience or "", "me": me_eff, "me_user_id": me_uid_eff, "peer_user_id_used": peer_uid_eff, "tick": int(pulse.get("tick", 0)), "ts": pulse.get("ts", 0.0), }, headers={ "Cache-Control": "no-store, no-cache, must-revalidate", "Pragma": "no-cache", }, ) @router.get("/pulse") async def empfang_pulse(request: Request, practice_id: Optional[str] = Query(None)): """Sehr leichter Endpoint fuer Live-Pulse. Clients pollen kurz (z. B. 800 ms) und holen die Conversation nur dann neu, wenn sich 'tick' geaendert hat. Damit erscheint das Signal sofort und ohne 5–10 s Verzoegerung der alten Polling-Loop. """ pid = (practice_id or "").strip() or _resolve_practice_id(request) if not pid: return JSONResponse( content={"tick": 0, "ts": 0.0}, headers={"Cache-Control": "no-store, no-cache, must-revalidate"}, ) p = _pulse_get(pid) return JSONResponse( content={ "tick": int(p.get("tick", 0)), "ts": float(p.get("ts", 0.0)), "last_sender": p.get("last_sender", ""), }, headers={ "Cache-Control": "no-store, no-cache, must-revalidate", "Pragma": "no-cache", }, ) @router.post("/presence/ping") async def empfang_presence_ping(request: Request): """Setzt fuer die aktuelle Praxis/User-Kombination last_seen (TTL = online). Cookie-Session: Web / WebView. Optional: Desktop mit X-API-Token + X-Practice-Id + X-AzA-Empfang-User-Id (gleiche Pruefung wie Shell-Erzeugung). """ now = time.time() ttl = int(EMPFANG_PRESENCE_TTL_SECONDS) s = _session_from_request(request) if s: pid = (s.get("practice_id") or "").strip() uid = (s.get("user_id") or "").strip() if pid and uid: _presence_record_ping(pid, uid, "web") return JSONResponse( content={ "success": True, "server_time": now, "ttl_seconds": ttl, "own_user_id": uid, }, headers={"Cache-Control": "no-store, no-cache, must-revalidate"}, ) api_raw = (request.headers.get("X-API-Token") or "").strip() if api_raw: pid, uid = _require_shell_api_identity(request) _presence_record_ping(pid, uid, "desktop") return JSONResponse( content={ "success": True, "server_time": now, "ttl_seconds": ttl, "own_user_id": uid, }, headers={"Cache-Control": "no-store, no-cache, must-revalidate"}, ) raise HTTPException(status_code=401, detail="Nicht angemeldet") # ===================================================================== # DM v2 — Direct-Only, Fail-Closed # Eigene, isolierte API ausschliesslich fuer Personenchats: # - kein Allgemein/Broadcast-Pfad # - kein Namens-/Heuristik-Matching # - Speicherung und Laden nur ueber direct_conv_key # - Verifizierbar (msg_id + conversation_key in Antwort) # ===================================================================== class DmSendIn(BaseModel): practice_id: str = "" sender_user_id: str = "" recipient_user_id: str = "" text: str = "" attachments: list = Field(default_factory=list) client_msg_id: str = "" def _dm_v2_load_for_pair(pid: str, uid_a: str, uid_b: str) -> tuple[list[dict], str]: """Liefert genau die Nachrichten dieses 1:1-Schluessels, sortiert chronologisch.""" conv_key = _direct_conv_key(pid, uid_a, uid_b) msgs = _filter_by_practice(_load_messages(), pid) out: list[dict] = [] for m in msgs: ex = m.get("extras") or {} if str(ex.get("direct_conv_key") or "").strip() == conv_key: out.append(m) out.sort(key=lambda x: (x.get("empfangen") or x.get("zeitstempel") or "")) return out, conv_key @router.post("/dm/send") async def empfang_dm_send(payload: DmSendIn, request: Request): """Direct-Only Senden. Fail-Closed: - practice_id Pflicht (Body oder Session) - sender_user_id Pflicht (Body oder Session) - recipient_user_id Pflicht - sender != recipient - beide Konten muessen zur Praxis gehoeren - Kein Fallback auf Allgemein. Kein audience=all. """ s = _session_from_request(request) pid = (payload.practice_id or "").strip() or _resolve_practice_id(request) if not pid: raise HTTPException(status_code=400, detail="practice_id erforderlich") sender_uid = (payload.sender_user_id or "").strip() if not sender_uid and s: sender_uid = str(s.get("user_id") or "").strip() recipient_uid = (payload.recipient_user_id or "").strip() text = (payload.text or "").strip() if not sender_uid: raise HTTPException(status_code=400, detail="sender_user_id erforderlich") if not recipient_uid: raise HTTPException(status_code=400, detail="recipient_user_id erforderlich") if sender_uid == recipient_uid: raise HTTPException(status_code=400, detail="Selbstchat nicht erlaubt") acc_map = _accounts_by_practice(pid) if sender_uid not in acc_map: raise HTTPException(status_code=400, detail="sender_user_id gehoert nicht zu dieser Praxis") if recipient_uid not in acc_map: raise HTTPException(status_code=400, detail="recipient_user_id gehoert nicht zu dieser Praxis") has_attachments = bool(payload.attachments) if not text and not has_attachments: raise HTTPException(status_code=400, detail="Leere Nachricht ohne Anhang nicht erlaubt") sender_dn = (acc_map[sender_uid].get("display_name") or "").strip() recipient_dn = (acc_map[recipient_uid].get("display_name") or "").strip() conv_key = _direct_conv_key(pid, sender_uid, recipient_uid) msg_id = uuid.uuid4().hex[:12] now = time.strftime("%Y-%m-%d %H:%M:%S") extras = { "audience": "direct", "rcpt_broadcast": False, "recipient": recipient_dn, "recipient_user_id": recipient_uid, "sender_user_id": sender_uid, "direct_conv_key": conv_key, "dm_v2": True, } if payload.client_msg_id: extras["client_msg_id"] = str(payload.client_msg_id)[:64] if has_attachments: extras["attachments"] = payload.attachments entry = { "id": msg_id, "thread_id": msg_id, "practice_id": pid, "medikamente": "", "therapieplan": "", "procedere": "", "kommentar": text or ("\u200b" if has_attachments else ""), "patient": "Direkt: " + recipient_dn, "absender": sender_dn + " (Empfang)", "zeitstempel": now, "empfangen": now, "status": "offen", "user_id": sender_uid, "extras": extras, } messages = _load_messages() messages.insert(0, entry) _save_messages(messages) try: _pulse_bump(pid, sender=sender_dn) except Exception: pass _log.info( "AZA_CHAT_SEND mode=direct practice=%s sender=%s recipient=%s conv=%s msg=%s", pid, sender_uid, recipient_uid, conv_key, msg_id, ) return JSONResponse( content={ "success": True, "ok": True, "mode": "direct", "message_id": msg_id, "thread_id": msg_id, "practice_id": pid, "sender_user_id": sender_uid, "recipient_user_id": recipient_uid, "conversation_key": conv_key, "created_at": now, }, headers={ "Cache-Control": "no-store, no-cache, must-revalidate", "Pragma": "no-cache", }, ) @router.get("/dm/conversation") async def empfang_dm_conversation( request: Request, sender_user_id: str = Query(""), recipient_user_id: str = Query(""), practice_id: Optional[str] = Query(None), ): """Liefert ausschliesslich Direct-Nachrichten dieses 1:1-Paares (per direct_conv_key). Keine Heuristik, kein Broadcast-Fallback. """ s = _session_from_request(request) pid = (practice_id or "").strip() or _resolve_practice_id(request) if not pid: raise HTTPException(status_code=400, detail="practice_id erforderlich") me_uid = (sender_user_id or "").strip() if not me_uid and s: me_uid = str(s.get("user_id") or "").strip() peer_uid = (recipient_user_id or "").strip() if not me_uid: raise HTTPException(status_code=400, detail="sender_user_id erforderlich") if not peer_uid: raise HTTPException(status_code=400, detail="recipient_user_id erforderlich") if me_uid == peer_uid: raise HTTPException(status_code=400, detail="Selbstchat nicht erlaubt") acc_map = _accounts_by_practice(pid) if me_uid not in acc_map: raise HTTPException(status_code=400, detail="sender_user_id gehoert nicht zu dieser Praxis") if peer_uid not in acc_map: raise HTTPException(status_code=400, detail="recipient_user_id gehoert nicht zu dieser Praxis") msgs, conv_key = _dm_v2_load_for_pair(pid, me_uid, peer_uid) pulse = _pulse_get(pid) _log.info( "AZA_CHAT_LOAD mode=direct practice=%s me=%s peer=%s conv=%s count=%s", pid, me_uid, peer_uid, conv_key, len(msgs), ) return JSONResponse( content={ "success": True, "ok": True, "mode": "direct", "practice_id": pid, "sender_user_id": me_uid, "recipient_user_id": peer_uid, "conversation_key": conv_key, "messages": msgs, "count": len(msgs), "tick": int(pulse.get("tick", 0)), "ts": pulse.get("ts", 0.0), }, headers={ "Cache-Control": "no-store, no-cache, must-revalidate", "Pragma": "no-cache", }, ) @router.get("/thread/{thread_id}") async def empfang_thread(thread_id: str, request: Request, practice_id: Optional[str] = Query(None)): pid = (practice_id or "").strip() or _resolve_practice_id(request) if not pid: return JSONResponse(content={"success": True, "messages": []}) messages = _load_messages() thread = [m for m in messages if m.get("thread_id") == thread_id and _msg_practice(m) == pid] thread.sort(key=lambda m: m.get("empfangen", "")) return JSONResponse(content={"success": True, "messages": thread}) @router.post("/messages/{msg_id}/done") async def empfang_done(msg_id: str): messages = _load_messages() target = next((m for m in messages if m.get("id") == msg_id), None) if not target: raise HTTPException(status_code=404, detail="Nachricht nicht gefunden") tid = target.get("thread_id", msg_id) pid = _msg_practice(target) for m in messages: if m.get("thread_id") == tid and _msg_practice(m) == pid: m["status"] = "erledigt" _save_messages(messages) try: _pulse_bump(pid, sender="") except Exception: pass return JSONResponse(content={"success": True}) @router.delete("/messages/{msg_id}") async def empfang_delete(msg_id: str): messages = _load_messages() target = next((m for m in messages if m.get("id") == msg_id), None) if not target: raise HTTPException(status_code=404, detail="Nachricht nicht gefunden") tid = target.get("thread_id", msg_id) pid = _msg_practice(target) if tid == msg_id: new = [m for m in messages if not (m.get("thread_id", m.get("id")) == msg_id and _msg_practice(m) == pid) and not (m.get("id") == msg_id)] else: new = [m for m in messages if m.get("id") != msg_id] _save_messages(new) try: _pulse_bump(pid, sender="") except Exception: pass return JSONResponse(content={"success": True}) # ===================================================================== # TASKS (practice-scoped, server-side) # ===================================================================== @router.get("/tasks") async def empfang_tasks_list(request: Request): pid = _resolve_practice_id(request) if not pid: return JSONResponse(content={"success": True, "tasks": []}) tasks = _load_tasks() filtered = [t for t in tasks if t.get("practice_id") == pid] return JSONResponse(content={"success": True, "tasks": filtered}) @router.post("/tasks") async def empfang_tasks_create(request: Request): pid = _require_practice_id(request) s = _session_from_request(request) try: body = await request.json() except Exception: body = {} text = (body.get("text") or "").strip() if not text: raise HTTPException(status_code=400, detail="Text erforderlich") title_opt = (body.get("title") or "").strip() meta_opt = (body.get("source_meta") or "").strip() peer_opt = (body.get("source_peer") or "").strip() stid_opt = (body.get("source_thread_id") or "").strip() task = { "task_id": uuid.uuid4().hex[:12], "practice_id": pid, "text": text, "title": title_opt or "", "source_meta": meta_opt or "", "source_peer": peer_opt or "", "source_thread_id": stid_opt or "", "done": False, "assignee": (body.get("assignee") or "").strip(), "created_by": (s or {}).get("display_name", "") if s else "", "created": time.strftime("%Y-%m-%d %H:%M:%S"), "source_msg_id": (body.get("source_msg_id") or "").strip(), } tasks = _load_tasks() tasks.insert(0, task) _save_tasks(tasks) return JSONResponse(content={"success": True, "task": task}) @router.post("/tasks/{task_id}/update") async def empfang_tasks_update(task_id: str, request: Request): try: body = await request.json() except Exception: body = {} tasks = _load_tasks() target = next((t for t in tasks if t.get("task_id") == task_id), None) if not target: raise HTTPException(status_code=404, detail="Aufgabe nicht gefunden") if "done" in body: target["done"] = bool(body["done"]) if "text" in body: target["text"] = (body["text"] or "").strip() or target["text"] if "title" in body: target["title"] = (body.get("title") or "").strip() if "assignee" in body: target["assignee"] = (body.get("assignee") or "").strip() if "source_meta" in body: target["source_meta"] = (body.get("source_meta") or "").strip() _save_tasks(tasks) return JSONResponse(content={"success": True, "task": target}) @router.delete("/tasks/{task_id}") async def empfang_tasks_delete(task_id: str): tasks = _load_tasks() tasks = [t for t in tasks if t.get("task_id") != task_id] _save_tasks(tasks) return JSONResponse(content={"success": True}) # ===================================================================== # CHANNEL ENDPOINTS (Kanaele) # ===================================================================== @router.get("/channels") async def channels_list(request: Request): """Kanaele anzeigen, gefiltert nach Rolle des Benutzers.""" s = _require_session(request) pid = s["practice_id"] role = s.get("role", "mpa") _ensure_default_channels(pid) channels = _load_channels() visible = [] for c in channels: if c.get("practice_id") != pid: continue allowed = c.get("allowed_roles", []) if not allowed or role in allowed: visible.append(c) return JSONResponse(content={"success": True, "channels": visible}) @router.post("/channels") async def channels_create(request: Request): """Neuen Kanal erstellen (nur Admin).""" s = _require_admin(request) try: body = await request.json() except Exception: body = {} name = (body.get("name") or "").strip() if not name: raise HTTPException(status_code=400, detail="Kanalname erforderlich") scope = body.get("scope", "internal") if scope not in ("internal", "external"): scope = "internal" channel_type = body.get("channel_type", "group") if channel_type not in ("group", "direct", "external"): channel_type = "group" allowed_roles = body.get("allowed_roles", []) if not isinstance(allowed_roles, list): allowed_roles = [] channel = { "channel_id": uuid.uuid4().hex[:12], "practice_id": s["practice_id"], "name": name, "scope": scope, "channel_type": channel_type, "allowed_roles": allowed_roles, "connection_id": body.get("connection_id", ""), "created": time.strftime("%Y-%m-%d %H:%M:%S"), "created_by": s["user_id"], } channels = _load_channels() channels.append(channel) _save_channels(channels) return JSONResponse(content={"success": True, "channel": channel}) @router.post("/channels/{channel_id}/update") async def channels_update(channel_id: str, request: Request): """Kanal aktualisieren (nur Admin).""" s = _require_admin(request) try: body = await request.json() except Exception: body = {} channels = _load_channels() target = None for c in channels: if c.get("channel_id") == channel_id and c.get("practice_id") == s["practice_id"]: target = c break if not target: raise HTTPException(status_code=404, detail="Kanal nicht gefunden") if "name" in body: new_name = (body["name"] or "").strip() if new_name: target["name"] = new_name if "allowed_roles" in body: ar = body["allowed_roles"] if isinstance(ar, list): target["allowed_roles"] = ar _save_channels(channels) return JSONResponse(content={"success": True, "channel": target}) @router.delete("/channels/{channel_id}") async def channels_delete(channel_id: str, request: Request): """Kanal loeschen (nur Admin, keine Default-Kanaele).""" s = _require_admin(request) channels = _load_channels() target = None for c in channels: if c.get("channel_id") == channel_id and c.get("practice_id") == s["practice_id"]: target = c break if not target: raise HTTPException(status_code=404, detail="Kanal nicht gefunden") default_names = {d["name"] for d in _DEFAULT_CHANNEL_DEFS} if target.get("name") in default_names and target.get("scope") == "internal": raise HTTPException(status_code=400, detail="Standard-Kanaele koennen nicht geloescht werden") channels = [c for c in channels if c.get("channel_id") != channel_id] _save_channels(channels) return JSONResponse(content={"success": True, "deleted": channel_id}) # ===================================================================== # FEDERATION ENDPOINTS (Praxis-zu-Praxis-Verbindungen) # ===================================================================== @router.post("/federation/invite") async def federation_invite(request: Request): """Einladung zur Praxis-Verbindung erstellen.""" s = _require_admin(request) try: body = await request.json() except Exception: body = {} pid = s["practice_id"] practices = _load_practices() practice_name = practices.get(pid, {}).get("name", "Unbekannte Praxis") conn = { "connection_id": uuid.uuid4().hex[:12], "practice_a_id": pid, "practice_b_id": "", "status": "pending", "invite_token": secrets.token_urlsafe(24), "created_by": s["user_id"], "accepted_by": "", "created_at": time.strftime("%Y-%m-%d %H:%M:%S"), "accepted_at": "", "revoked_at": "", "practice_a_name": practice_name, "practice_b_name": "", "message": (body.get("message") or "").strip(), } conns = _load_connections() conns.append(conn) _save_connections(conns) return JSONResponse(content={ "success": True, "connection_id": conn["connection_id"], "invite_token": conn["invite_token"], }) @router.post("/federation/accept") async def federation_accept(request: Request): """Verbindungseinladung annehmen.""" s = _require_admin(request) try: body = await request.json() except Exception: body = {} invite_token = (body.get("invite_token") or "").strip() if not invite_token: raise HTTPException(status_code=400, detail="invite_token erforderlich") conns = _load_connections() target = None for c in conns: if c.get("invite_token") == invite_token and c.get("status") == "pending": target = c break if not target: raise HTTPException(status_code=404, detail="Einladung nicht gefunden oder bereits verwendet") pid_b = s["practice_id"] if target["practice_a_id"] == pid_b: raise HTTPException(status_code=400, detail="Kann eigene Einladung nicht annehmen") practices = _load_practices() practice_b_name = practices.get(pid_b, {}).get("name", "Unbekannte Praxis") now = time.strftime("%Y-%m-%d %H:%M:%S") target["practice_b_id"] = pid_b target["practice_b_name"] = practice_b_name target["status"] = "active" target["accepted_by"] = s["user_id"] target["accepted_at"] = now _save_connections(conns) channel_name = f"{target['practice_a_name']} \u2194 {practice_b_name}" conn_id = target["connection_id"] channels = _load_channels() for practice_id in (target["practice_a_id"], pid_b): channels.append({ "channel_id": uuid.uuid4().hex[:12], "practice_id": practice_id, "name": channel_name, "scope": "external", "channel_type": "external", "allowed_roles": [], "connection_id": conn_id, "created": now, "created_by": s["user_id"], }) _save_channels(channels) return JSONResponse(content={ "success": True, "connection_id": conn_id, "practice_a": target["practice_a_name"], "practice_b": practice_b_name, }) @router.get("/federation/connections") async def federation_connections(request: Request): """Alle Verbindungen der eigenen Praxis anzeigen.""" s = _require_admin(request) pid = s["practice_id"] conns = _load_connections() result = [c for c in conns if c.get("practice_a_id") == pid or c.get("practice_b_id") == pid] return JSONResponse(content={"success": True, "connections": result}) @router.post("/federation/connections/{connection_id}/revoke") async def federation_revoke(connection_id: str, request: Request): """Verbindung widerrufen / trennen.""" s = _require_admin(request) pid = s["practice_id"] conns = _load_connections() target = None for c in conns: if c.get("connection_id") == connection_id: if c.get("practice_a_id") == pid or c.get("practice_b_id") == pid: target = c break if not target: raise HTTPException(status_code=404, detail="Verbindung nicht gefunden") if target["status"] == "revoked": raise HTTPException(status_code=400, detail="Verbindung bereits widerrufen") target["status"] = "revoked" target["revoked_at"] = time.strftime("%Y-%m-%d %H:%M:%S") _save_connections(conns) return JSONResponse(content={"success": True, "connection_id": connection_id, "status": "revoked"}) @router.get("/federation/practices") async def federation_practices(request: Request): """Verbundene Praxen anzeigen (fuer alle authentifizierten Benutzer).""" s = _require_session(request) pid = s["practice_id"] conns = _load_connections() result = [] for c in conns: if c.get("status") != "active": continue if c.get("practice_a_id") == pid: result.append({ "practice_id": c.get("practice_b_id"), "practice_name": c.get("practice_b_name", ""), "connection_id": c.get("connection_id"), "status": c.get("status"), }) elif c.get("practice_b_id") == pid: result.append({ "practice_id": c.get("practice_a_id"), "practice_name": c.get("practice_a_name", ""), "connection_id": c.get("connection_id"), "status": c.get("status"), }) return JSONResponse(content={"success": True, "practices": result}) # ===================================================================== # CLEANUP + PRACTICE INFO # ===================================================================== @router.post("/cleanup") async def empfang_cleanup(request: Request): try: body = await request.json() except Exception: body = {} max_days = int(body.get("max_age_days", 30)) pid = (body.get("practice_id") or "").strip() or _resolve_practice_id(request) if not pid: return JSONResponse(content={"success": True, "removed": 0, "remaining": 0}) cutoff = time.strftime( "%Y-%m-%d %H:%M:%S", time.localtime(time.time() - max_days * 86400), ) messages = _load_messages() before = len(messages) kept = [ m for m in messages if _msg_practice(m) != pid or (m.get("empfangen") or m.get("zeitstempel", "")) >= cutoff ] removed = before - len(kept) if removed > 0: _save_messages(kept) return JSONResponse(content={ "success": True, "removed": removed, "remaining": len(kept), }) @router.get("/practice/info") async def empfang_practice_info(request: Request): api_token = request.headers.get("X-API-Token", "") s = _session_from_request(request) pid = _resolve_practice_id(request) if not pid: return JSONResponse(content={"practice_id": "", "practice_name": "", "user_count": 0, "message_count": 0, "open_count": 0}) _ensure_practice(pid) users = _practice_users(pid) messages = _filter_by_practice(_load_messages(), pid) open_count = sum(1 for m in messages if m.get("status") == "offen") practices = _load_practices() p = practices.get(pid, {}) result = { "practice_id": pid, "practice_name": p.get("name", ""), "user_count": len(users), "message_count": len(messages), "open_count": open_count, } role_l = str(s.get("role") or "").strip().lower() if s else "" show_invite = bool(api_token) or role_l in ("admin", "empfang") if show_invite: result["invite_code"] = p.get("invite_code", "") if api_token or role_l == "admin": result["admin_email"] = p.get("admin_email", "") return JSONResponse(content=result) # ===================================================================== # Shell session (kurzlebiger Desktop-Web-Huelle-Bootstrap, API-validiert) # ===================================================================== # # Nur POST /shell/session darf mittels gueltigem MEDWORK-X-API-Token + Practice # einen shell_token ausstellen (kein Browser-Cookie als Identitaetsgrundlage). # # POST /shell/consume tauscht bearer shell_token gegen normale HttpOnly-Session — # ohne API-Token, damit spaetere Web-Huelle den Token ohne Desktop-Shared-Secret # einloesen kann. _SHELL_TTL_DEFAULT = 300 _SHELL_PURPOSE = "send_to_reception_shell" _shell_store: dict[str, dict] = {} def _shell_cleanup_expired() -> None: now = time.time() stale = [ tok for tok, rec in _shell_store.items() if isinstance(rec, dict) and float(rec.get("expires_at", 0)) < now ] for tok in stale: try: del _shell_store[tok] except KeyError: pass def _require_shell_api_identity(request: Request) -> Tuple[str, str]: """Validiert X-API-Token wie aza_security; liefert (practice_id, desktop_user_id).""" api_raw = (request.headers.get("X-API-Token") or "").strip() if not api_raw: raise HTTPException(status_code=401, detail="API-Token erforderlich") try: from aza_security import get_required_api_tokens allowed = get_required_api_tokens() except RuntimeError: raise HTTPException(status_code=503, detail="API token nicht konfiguriert") if not any(hmac.compare_digest(api_raw, t) for t in allowed): raise HTTPException(status_code=401, detail="Unauthorized") pid = request.headers.get("X-Practice-Id", "").strip() if not pid: raise HTTPException( status_code=400, detail="X-Practice-Id erforderlich", ) claimed_uid = (request.headers.get("X-AzA-Empfang-User-Id") or "").strip() if not claimed_uid: raise HTTPException( status_code=403, detail=( "X-AzA-Empfang-User-Id fehlt: Desktop muss vom Server ermittelte " "user_id aus dem Provisionierungs-/Users-Endpunkt mitschicken." ), ) accounts = _load_accounts() acc = accounts.get(claimed_uid) if not acc or (acc.get("practice_id") or "").strip() != pid: raise HTTPException( status_code=403, detail="Benutzer gehoert nicht zu dieser Praxis oder user_id ungueltig", ) if (acc.get("status") or "active") != "active": raise HTTPException(status_code=403, detail="Benutzerkonto nicht aktiv") return pid, claimed_uid def _consume_shell_token_core(request: Request, shell_raw: str) -> Tuple[str, dict]: """POP shell_token, erstelle Serversession. Liefert (aza_session-Wert, Kennzeichen ohne Geheimnis).""" _shell_cleanup_expired() stok = (shell_raw or "").strip() if not stok: raise HTTPException( status_code=400, detail="shell_token fehlt", ) rec = _shell_store.get(stok) if not rec: raise HTTPException( status_code=401, detail="Shell-Token ungueltig, verbraucht oder abgelaufen", ) if time.time() > float(rec.get("expires_at", 0)): try: del _shell_store[stok] except KeyError: pass raise HTTPException(status_code=401, detail="Shell-Token abgelaufen") try: del _shell_store[stok] except KeyError: raise HTTPException( status_code=401, detail="Shell-Token ungueltig, verbraucht oder abgelaufen", ) uid = (rec.get("user_id") or "").strip() pid = (rec.get("practice_id") or "").strip() dn = (rec.get("display_name") or "").strip() role = (rec.get("role") or "mpa").strip() ua = request.headers.get("User-Agent") or "AzA-Shell-Client" ip = _extract_client_ip(request) sess_token = _create_session( uid, pid, dn, role, device_id="", user_agent=ua, ip_addr=ip, ) _log.info( "AZA_SHELL_SESSION_CONSUMED practice=%s user=%s purpose=%s", pid, uid, _SHELL_PURPOSE, ) public = { "user_id": uid, "practice_id": pid, "display_name": dn, "role": role, "purpose": _SHELL_PURPOSE, } return sess_token, public @router.post("/shell/session") async def empfang_shell_session_create(request: Request): """Erzeugt kurzlebigen shell_token (nur mit Desktop-API-Token + validierter user_id).""" _shell_cleanup_expired() pid, uid = _require_shell_api_identity(request) accounts = _load_accounts() acc = accounts.get(uid) or {} display_name = (acc.get("display_name") or "").strip() or "Benutzer" role = (acc.get("role") or "mpa").strip() now = time.time() expires_at = now + float(_SHELL_TTL_DEFAULT) shell_token = secrets.token_urlsafe(32) _shell_store[shell_token] = { "practice_id": pid, "user_id": uid, "display_name": display_name, "role": role, "purpose": _SHELL_PURPOSE, "expires_at": expires_at, "created_at": now, } expires_iso = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(expires_at)) _log.info( "AZA_SHELL_SESSION_CREATED practice=%s user=%s purpose=%s ttl=%s", pid, uid, _SHELL_PURPOSE, int(_SHELL_TTL_DEFAULT), ) return JSONResponse(content={ "shell_token": shell_token, "expires_at": expires_iso, "expires_at_unix": int(expires_at), "ttl_seconds": int(_SHELL_TTL_DEFAULT), "practice_id": pid, "user_id": uid, "display_name": display_name, "role": role, "purpose": _SHELL_PURPOSE, }) @router.post("/shell/consume") async def empfang_shell_consume(request: Request): """Tauscht shell_token gegen aza_session (HttpOnly Cookie). Einmal-Verbrauch.""" try: body = await request.json() except Exception: body = {} shell_raw = "" if isinstance(body, dict): shell_raw = (body.get("shell_token") or "").strip() if not shell_raw: raise HTTPException( status_code=400, detail="shell_token im JSON Body erforderlich", ) sess_token, pub = _consume_shell_token_core(request, shell_raw) resp = JSONResponse(content={"success": True, **pub}) resp.set_cookie( "aza_session", sess_token, httponly=True, samesite="lax", max_age=SESSION_MAX_AGE, ) return resp @router.get("/shell/launch") async def empfang_shell_launch( request: Request, token: str = Query("", description="kurzlebiger Shell-Token (einmaliger Verbrauch)"), ): """Web-Huelle: Token per GET einloesen — setzt Cookie, Redirect ohne Token in URL. Einmal-Verbrauch. Kein MEDWORK_API-Token hier (Browser/WebView ohne Desktop-Secret). """ sess_token, _pub = _consume_shell_token_core(request, token) loc = "/empfang/?desktop_shell=1&shell_source=aza_desktop" resp = RedirectResponse(url=loc, status_code=302) resp.set_cookie( "aza_session", sess_token, httponly=True, samesite="lax", max_age=SESSION_MAX_AGE, ) return resp # ===================================================================== # HTML PAGE # ===================================================================== _HTML_NO_CACHE = { "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0", "Content-Security-Policy": "frame-ancestors *;", } _LOGO_CACHE = { "Cache-Control": "public, max-age=86400", } @router.get("/aza_logo.png") async def empfang_logo_png(): """Statisches Logo fuer Nav-Leiste (liegt neben empfang.html in web/).""" p = Path(__file__).resolve().parent / "web" / "aza_logo.png" if p.is_file(): return FileResponse( path=p, media_type="image/png", headers=_LOGO_CACHE, ) raise HTTPException(status_code=404, detail="aza_logo.png nicht gefunden") @router.get("/", response_class=HTMLResponse) async def empfang_page(request: Request): html_path = Path(__file__).resolve().parent / "web" / "empfang.html" if html_path.is_file(): return HTMLResponse( content=html_path.read_text(encoding="utf-8"), headers=_HTML_NO_CACHE, ) return HTMLResponse(content="

empfang.html nicht gefunden

", status_code=404) @router.get("/chatwin.html", response_class=HTMLResponse) async def empfang_chatwin_page(): """Kompaktes Chat-Fenster (neues Fenster / Tab), z. B. 1:1 oder Allgemein.""" html_path = Path(__file__).resolve().parent / "web" / "empfang_chat_minimal.html" if html_path.is_file(): return HTMLResponse( content=html_path.read_text(encoding="utf-8"), headers=_HTML_NO_CACHE, ) return HTMLResponse(content="

chatwin nicht gefunden

", status_code=404)