Files
aza/AzA march 2026/empfang_routes.py

3747 lines
130 KiB
Python
Raw Normal View History

2026-04-16 15:23:14 +02:00
# -*- coding: utf-8 -*-
"""
AZA Empfang - Backend-Routen V5: Admin, Devices, Federation, Channels.
Serverseitige Auth, Benutzer, Sessions, Nachrichten, Aufgaben,
Geraeteverwaltung, Kanaele, Praxis-Federation.
Alle Daten practice-scoped. Backend ist die einzige Wahrheit.
2026-04-16 15:23:14 +02:00
"""
import hashlib
import hmac
2026-04-16 15:23:14 +02:00
import json
2026-05-08 14:53:53 +02:00
import logging
2026-04-16 15:23:14 +02:00
import os
import re
import secrets
2026-04-16 15:23:14 +02:00
import time
2026-05-06 22:43:22 +02:00
import unicodedata
2026-04-16 15:23:14 +02:00
import uuid
2026-05-08 14:53:53 +02:00
from datetime import datetime, timezone
2026-04-16 15:23:14 +02:00
from pathlib import Path
2026-04-21 10:00:36 +02:00
from typing import Optional, Tuple
2026-04-16 15:23:14 +02:00
from fastapi import APIRouter, Cookie, HTTPException, Query, Request, Response
2026-05-08 14:53:53 +02:00
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse
2026-04-16 15:23:14 +02:00
from pydantic import BaseModel, Field
router = APIRouter()
2026-05-08 14:53:53 +02:00
_log = logging.getLogger(__name__)
2026-04-16 15:23:14 +02:00
_DATA_DIR = Path(__file__).resolve().parent / "data"
_EMPFANG_FILE = _DATA_DIR / "empfang_nachrichten.json"
_PRACTICES_FILE = _DATA_DIR / "empfang_practices.json"
_ACCOUNTS_FILE = _DATA_DIR / "empfang_accounts.json"
_SESSIONS_FILE = _DATA_DIR / "empfang_sessions.json"
_TASKS_FILE = _DATA_DIR / "empfang_tasks.json"
_DEVICES_FILE = _DATA_DIR / "empfang_devices.json"
_CHANNELS_FILE = _DATA_DIR / "empfang_channels.json"
_CONNECTIONS_FILE = _DATA_DIR / "empfang_connections.json"
2026-04-21 10:00:36 +02:00
_LEGACY_DEFAULT_PID = "default"
SESSION_MAX_AGE = 30 * 24 * 3600 # 30 Tage
2026-04-16 15:23:14 +02:00
2026-04-21 10:00:36 +02:00
def _generate_practice_id() -> str:
return f"prac_{uuid.uuid4().hex[:12]}"
def _resolve_practice_id(request: Request) -> str:
"""Ermittelt die practice_id aus Session, Header oder Query.
Kein stiller Fallback auf eine Default-Praxis."""
s = _session_from_request(request)
if s:
return s["practice_id"]
pid = request.headers.get("X-Practice-Id", "").strip()
if pid:
return pid
pid = request.query_params.get("practice_id", "").strip()
if pid:
return pid
return ""
def _require_practice_id(request: Request) -> str:
"""Wie _resolve_practice_id, aber wirft 400 wenn keine practice_id."""
pid = _resolve_practice_id(request)
if not pid:
raise HTTPException(
status_code=400,
detail="practice_id erforderlich (X-Practice-Id Header, Session oder Query)")
return pid
2026-04-16 15:23:14 +02:00
def _ensure_data_dir():
_DATA_DIR.mkdir(parents=True, exist_ok=True)
# =====================================================================
# JSON helpers (atomic write)
# =====================================================================
def _load_json(path: Path, default=None):
if not path.is_file():
return default if default is not None else []
2026-04-16 15:23:14 +02:00
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
2026-04-16 15:23:14 +02:00
except Exception:
return default if default is not None else []
2026-04-16 15:23:14 +02:00
def _save_json(path: Path, data):
2026-04-16 15:23:14 +02:00
_ensure_data_dir()
tmp = str(path) + ".tmp"
2026-04-16 15:23:14 +02:00
with open(tmp, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
os.replace(tmp, str(path))
# =====================================================================
# Password hashing (PBKDF2 no external dependency)
# =====================================================================
def _hash_password(password: str, salt: str = None) -> tuple[str, str]:
salt = salt or secrets.token_hex(16)
dk = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 100_000)
return dk.hex(), salt
def _verify_password(password: str, stored_hash: str, salt: str) -> bool:
dk = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 100_000)
return hmac.compare_digest(dk.hex(), stored_hash)
# =====================================================================
# Practices
# =====================================================================
def _load_practices() -> dict:
return _load_json(_PRACTICES_FILE, {})
def _save_practices(data: dict):
_save_json(_PRACTICES_FILE, data)
2026-05-06 22:43:22 +02:00
def _invite_code_key(raw: str) -> str:
"""Vergleicht Einladungscodes unabhaengig von Leerzeichen und Gedankenstrich-Varianten."""
s = (raw or "").strip().upper().replace(" ", "")
for ch in ("\u2011", "\u2013", "\u2014", "\u2212"):
s = s.replace(ch, "-")
return s
2026-04-21 10:00:36 +02:00
def _generate_chat_invite_code() -> str:
"""Lesbarer Chat-Einladungscode im Format CHAT-XXXX-XXXX."""
import random
chars = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789"
part1 = "".join(random.choices(chars, k=4))
part2 = "".join(random.choices(chars, k=4))
return f"CHAT-{part1}-{part2}"
def _ensure_practice(practice_id: str, name: str = "Meine Praxis",
admin_email: str = "") -> dict:
"""Stellt sicher, dass eine Praxis mit dieser ID existiert."""
practices = _load_practices()
2026-04-21 10:00:36 +02:00
if practice_id not in practices:
practices[practice_id] = {
"practice_id": practice_id,
"name": name,
"invite_code": _generate_chat_invite_code(),
"admin_email": admin_email,
"created": time.strftime("%Y-%m-%d %H:%M:%S"),
}
_save_practices(practices)
2026-04-21 10:00:36 +02:00
return practices[practice_id]
def _migrate_legacy_to_practice(new_pid: str):
"""Migriert alle Daten von der alten 'default'-Praxis zur neuen practice_id.
Wird einmalig beim ersten Provisioning aufgerufen."""
accounts = _load_accounts()
migrated = False
for a in accounts.values():
if a.get("practice_id") == _LEGACY_DEFAULT_PID:
a["practice_id"] = new_pid
migrated = True
if migrated:
_save_accounts(accounts)
devices = _load_devices()
for d in devices.values():
if d.get("practice_id") == _LEGACY_DEFAULT_PID:
d["practice_id"] = new_pid
_save_devices(devices)
sessions = _load_sessions()
for s in sessions.values():
if s.get("practice_id") == _LEGACY_DEFAULT_PID:
s["practice_id"] = new_pid
_save_sessions(sessions)
messages = _load_messages()
for m in messages:
if m.get("practice_id", _LEGACY_DEFAULT_PID) == _LEGACY_DEFAULT_PID:
m["practice_id"] = new_pid
_save_messages(messages)
practices = _load_practices()
if _LEGACY_DEFAULT_PID in practices:
old = practices.pop(_LEGACY_DEFAULT_PID)
if new_pid not in practices:
old["practice_id"] = new_pid
practices[new_pid] = old
_save_practices(practices)
try:
channels = _load_channels()
for c in channels:
if c.get("practice_id") == _LEGACY_DEFAULT_PID:
c["practice_id"] = new_pid
_save_channels(channels)
except Exception:
pass
_migrate_old_users(new_pid)
def _migrate_old_users(practice_id: str):
"""Migriert alte empfang_users.json Strings zu echten Accounts."""
old_file = _DATA_DIR / "empfang_users.json"
if not old_file.is_file():
return
try:
names = json.loads(old_file.read_text(encoding="utf-8"))
if not isinstance(names, list):
return
accounts = _load_accounts()
for name in names:
name = name.strip()
if not name:
continue
exists = any(
a["display_name"] == name and a["practice_id"] == practice_id
for a in accounts.values()
)
if not exists:
uid = uuid.uuid4().hex[:12]
pw_hash, pw_salt = _hash_password(name.lower())
accounts[uid] = {
"user_id": uid,
"practice_id": practice_id,
"display_name": name,
"role": "mpa",
"pw_hash": pw_hash,
"pw_salt": pw_salt,
"created": time.strftime("%Y-%m-%d %H:%M:%S"),
"status": "active",
"last_login": "",
"email": "",
}
_save_accounts(accounts)
except Exception:
pass
# =====================================================================
# Accounts (practice-scoped users with auth)
# =====================================================================
def _load_accounts() -> dict:
return _load_json(_ACCOUNTS_FILE, {})
def _save_accounts(data: dict):
_save_json(_ACCOUNTS_FILE, data)
def _practice_users(practice_id: str) -> list[dict]:
accounts = _load_accounts()
return [
{"user_id": a["user_id"], "display_name": a["display_name"], "role": a["role"]}
for a in accounts.values()
if a.get("practice_id") == practice_id
]
2026-04-21 10:00:36 +02:00
# =====================================================================
# Login / Passwort-Reset Hilfen (Mandant, Mehrfach-E-Mail)
# =====================================================================
def _is_likely_email(s: str) -> bool:
"""Grobe Erkennung E-Mail vs. Benutzername (Anzeigename)."""
s = (s or "").strip()
if "@" not in s or len(s) < 5:
return False
parts = s.split("@", 1)
if len(parts) != 2 or not parts[0] or not parts[1]:
return False
return "." in parts[1]
def _norm_email(e: str) -> str:
return (e or "").strip().lower()
def _practice_id_from_client(request: Request, body: dict) -> str:
"""practice_id fuer Login/Forgot ohne Session: Body, Header, Query."""
pid = (body.get("practice_id") or "").strip()
if pid:
return pid
pid = request.headers.get("X-Practice-Id", "").strip()
if pid:
return pid
return request.query_params.get("practice_id", "").strip()
2026-05-08 14:53:53 +02:00
def _lookup_practice_id_by_invite(invite_raw: str) -> str:
"""Liefert practice_id fuer einen Einladungscode (normalisierter Vergleich) oder ''."""
if not (invite_raw or "").strip():
return ""
practices = _load_practices()
want = _invite_code_key(invite_raw)
for pida, pdata in practices.items():
if _invite_code_key(pdata.get("invite_code")) == want:
return pida
return ""
def _practice_id_for_login(request: Request, body: dict) -> tuple[str, str]:
"""Login: practice_id ohne Session-Cookie (sonst ueberholt alte Session die Einladung).
Reihenfolge: invite_code > Body > Header > Query.
Rueckgabe: (practice_id, quelle) mit quelle in
'invite'|'body'|'header'|'query'|''|'invite_invalid'
"""
invite_raw = (body.get("invite_code") or "").strip()
if invite_raw:
got = _lookup_practice_id_by_invite(invite_raw)
if got:
return got, "invite"
return "", "invite_invalid"
pid = (body.get("practice_id") or "").strip()
if pid:
return pid, "body"
pid = request.headers.get("X-Practice-Id", "").strip()
if pid:
return pid, "header"
pid = request.query_params.get("practice_id", "").strip()
if pid:
return pid, "query"
return "", ""
2026-04-21 10:00:36 +02:00
def _practice_label(practices: dict, pid: str) -> str:
p = practices.get(pid) or {}
return (p.get("name") or "").strip() or pid
def _send_reset_for_account(acc: dict) -> dict:
"""Token erstellen, Mail senden — Inhalt fuer JSONResponse(content=...)."""
email_to = (acc.get("email") or "").strip()
if not email_to:
return {
"success": False,
"step": "no_email",
"message": (
"Für dieses Konto ist keine E-Mail-Adresse hinterlegt. Bitte lassen Sie das "
"Passwort von einer Administratorin oder einem Administrator der Praxis "
"zurücksetzen."
),
}
reset_token = secrets.token_urlsafe(32)
resets = _load_json(_DATA_DIR / "empfang_resets.json", {})
resets[reset_token] = {
"user_id": acc["user_id"],
"email": _norm_email(email_to),
"display_name": (acc.get("display_name") or "").strip(),
"practice_id": (acc.get("practice_id") or "").strip(),
"created": time.time(),
}
for k in list(resets.keys()):
if time.time() - resets[k].get("created", 0) > 3600:
del resets[k]
_save_json(_DATA_DIR / "empfang_resets.json", resets)
_web_base = os.environ.get(
"EMPFANG_WEB_BASE", "https://empfang.aza-medwork.ch/empfang"
).rstrip("/")
reset_link = f"{_web_base}/?reset_token={reset_token}"
_send_reset_email(email_to, acc.get("display_name", ""), reset_link)
return {
"success": True,
"step": "sent",
"message": (
"Ein Link zum Zurücksetzen wurde an die hinterlegte E-Mail-Adresse gesendet."
),
}
# =====================================================================
# Sessions (mit device_id)
# =====================================================================
2026-04-16 15:23:14 +02:00
def _load_sessions() -> dict:
return _load_json(_SESSIONS_FILE, {})
def _save_sessions(data: dict):
_save_json(_SESSIONS_FILE, data)
def _create_session(user_id: str, practice_id: str, display_name: str,
role: str, device_id: str = None,
user_agent: str = "", ip_addr: str = "") -> str:
token = secrets.token_urlsafe(32)
sessions = _load_sessions()
sessions[token] = {
"user_id": user_id,
"practice_id": practice_id,
"display_name": display_name,
"role": role,
"device_id": device_id or "",
"created": time.time(),
"last_active": time.time(),
}
_save_sessions(sessions)
if device_id:
_register_or_update_device(
device_id=device_id,
user_id=user_id,
practice_id=practice_id,
user_agent=user_agent,
ip_addr=ip_addr,
)
return token
def _get_session(token: str) -> Optional[dict]:
if not token:
return None
sessions = _load_sessions()
s = sessions.get(token)
if not s:
return None
if time.time() - s.get("created", 0) > SESSION_MAX_AGE:
del sessions[token]
_save_sessions(sessions)
return None
s["last_active"] = time.time()
sessions[token] = s
_save_sessions(sessions)
return s
def _delete_session(token: str):
sessions = _load_sessions()
if token in sessions:
del sessions[token]
_save_sessions(sessions)
def _session_from_request(request: Request) -> Optional[dict]:
token = request.cookies.get("aza_session") or ""
if not token:
auth = request.headers.get("Authorization", "")
if auth.startswith("Bearer "):
token = auth[7:]
if not token:
token = request.query_params.get("session_token", "")
return _get_session(token)
def _require_session(request: Request) -> dict:
s = _session_from_request(request)
if not s:
raise HTTPException(status_code=401, detail="Nicht angemeldet")
return s
# =====================================================================
# Devices (Geraeteverwaltung)
# =====================================================================
def _load_devices() -> dict:
return _load_json(_DEVICES_FILE, {})
def _save_devices(data: dict):
_save_json(_DEVICES_FILE, data)
def _parse_device_info(user_agent: str) -> dict:
"""Einfache Heuristik zum Erkennen von Plattform, Geraetetyp und Name."""
ua = user_agent.lower()
if "iphone" in ua:
platform, device_type = "iOS", "mobile"
elif "ipad" in ua:
platform, device_type = "iOS", "tablet"
elif "android" in ua:
if "mobile" in ua:
platform, device_type = "Android", "mobile"
else:
platform, device_type = "Android", "tablet"
elif "macintosh" in ua or "mac os" in ua:
platform, device_type = "macOS", "browser"
elif "windows" in ua:
platform, device_type = "Windows", "browser"
elif "linux" in ua:
platform, device_type = "Linux", "browser"
else:
platform, device_type = "Unbekannt", "browser"
if "electron" in ua or "cursor" in ua:
device_type = "desktop"
browser = "Browser"
if "edg/" in ua:
browser = "Edge"
elif "chrome" in ua and "chromium" not in ua:
browser = "Chrome"
elif "firefox" in ua:
browser = "Firefox"
elif "safari" in ua and "chrome" not in ua:
browser = "Safari"
device_name = f"{browser} auf {platform}"
if device_type == "desktop":
device_name = f"Desktop-App auf {platform}"
elif device_type in ("mobile", "tablet"):
device_name = f"{platform} {device_type.capitalize()}"
return {
"device_name": device_name,
"platform": platform,
"device_type": device_type,
}
def _make_device_id(user_id: str, user_agent: str) -> str:
raw = f"{user_id}:{user_agent}"
return hashlib.sha256(raw.encode()).hexdigest()[:12]
def _register_or_update_device(device_id: str, user_id: str,
practice_id: str, user_agent: str,
ip_addr: str):
devices = _load_devices()
now = time.strftime("%Y-%m-%d %H:%M:%S")
info = _parse_device_info(user_agent)
if device_id in devices:
dev = devices[device_id]
dev["last_active"] = now
dev["ip_last"] = ip_addr
dev["user_agent"] = user_agent
dev["device_name"] = info["device_name"]
dev["platform"] = info["platform"]
dev["device_type"] = info["device_type"]
else:
devices[device_id] = {
"device_id": device_id,
"user_id": user_id,
"practice_id": practice_id,
"device_name": info["device_name"],
"platform": info["platform"],
"device_type": info["device_type"],
"user_agent": user_agent,
"first_seen": now,
"last_active": now,
"trust_status": "trusted",
"ip_last": ip_addr,
}
_save_devices(devices)
def _extract_client_ip(request: Request) -> str:
forwarded = request.headers.get("X-Forwarded-For", "")
if forwarded:
return forwarded.split(",")[0].strip()
if request.client:
return request.client.host
return ""
# =====================================================================
# Messages (practice-scoped)
# =====================================================================
2026-05-05 23:36:13 +02:00
EMPFANG_MESSAGE_RETENTION_DAYS = 14
def _msg_timestamp_for_retention(m: dict) -> str:
return (m.get("empfangen") or m.get("zeitstempel") or "").strip()
def _message_within_retention(m: dict, cutoff_str: str) -> bool:
"""Behalten wenn Zeitstempel unbekannt oder >= cutoff (ISO-artige Strings)."""
t = _msg_timestamp_for_retention(m)
if not t:
return True
return t >= cutoff_str
def _prune_messages_by_retention(messages: list[dict]) -> tuple[list[dict], int]:
cutoff_str = time.strftime(
"%Y-%m-%d %H:%M:%S",
time.localtime(time.time() - EMPFANG_MESSAGE_RETENTION_DAYS * 86400),
)
kept = [m for m in messages if _message_within_retention(m, cutoff_str)]
return kept, len(messages) - len(kept)
def _load_messages() -> list[dict]:
2026-05-05 23:36:13 +02:00
messages = _load_json(_EMPFANG_FILE, [])
kept, removed = _prune_messages_by_retention(messages)
if removed > 0:
_save_messages(kept)
return kept
def _save_messages(messages: list[dict]):
_save_json(_EMPFANG_FILE, messages)
def _msg_practice(m: dict) -> str:
2026-04-21 10:00:36 +02:00
return m.get("practice_id") or _LEGACY_DEFAULT_PID
def _filter_by_practice(messages: list[dict], pid: str) -> list[dict]:
return [m for m in messages if _msg_practice(m) == pid]
# =====================================================================
# Tasks (practice-scoped, server-side)
# =====================================================================
def _load_tasks() -> list[dict]:
return _load_json(_TASKS_FILE, [])
def _save_tasks(tasks: list[dict]):
_save_json(_TASKS_FILE, tasks)
# =====================================================================
# Channels (Kanaele, practice-scoped)
# =====================================================================
def _load_channels() -> list[dict]:
return _load_json(_CHANNELS_FILE, [])
def _save_channels(channels: list[dict]):
_save_json(_CHANNELS_FILE, channels)
_DEFAULT_CHANNEL_DEFS = [
{"name": "Allgemein", "scope": "internal", "channel_type": "group", "allowed_roles": []},
{"name": "Aerzte", "scope": "internal", "channel_type": "group", "allowed_roles": ["arzt", "admin"]},
{"name": "MPA", "scope": "internal", "channel_type": "group", "allowed_roles": ["mpa", "admin"]},
{"name": "Empfang", "scope": "internal", "channel_type": "group", "allowed_roles": ["empfang", "admin"]},
]
def _ensure_default_channels(practice_id: str):
channels = _load_channels()
practice_channels = [c for c in channels if c.get("practice_id") == practice_id]
if practice_channels:
return
now = time.strftime("%Y-%m-%d %H:%M:%S")
for defn in _DEFAULT_CHANNEL_DEFS:
channels.append({
"channel_id": uuid.uuid4().hex[:12],
"practice_id": practice_id,
"name": defn["name"],
"scope": defn["scope"],
"channel_type": defn["channel_type"],
"allowed_roles": defn["allowed_roles"],
"connection_id": "",
"created": now,
"created_by": "",
})
_save_channels(channels)
# =====================================================================
# Connections / Federation (Praxis-zu-Praxis)
# =====================================================================
def _load_connections() -> list[dict]:
return _load_json(_CONNECTIONS_FILE, [])
def _save_connections(conns: list[dict]):
_save_json(_CONNECTIONS_FILE, conns)
# =====================================================================
# AUTH ENDPOINTS
# =====================================================================
@router.post("/auth/setup")
async def auth_setup(request: Request):
2026-04-21 10:00:36 +02:00
"""Erstellt den ersten Admin-Benutzer und eine neue Praxis."""
try:
body = await request.json()
except Exception:
body = {}
name = (body.get("name") or "").strip()
password = (body.get("password") or "").strip()
2026-04-21 10:00:36 +02:00
practice_name = (body.get("practice_name") or "").strip() or "Meine Praxis"
admin_email = (body.get("email") or "").strip()
2026-04-21 10:00:36 +02:00
pid = (body.get("practice_id") or "").strip()
if not name or not password or len(password) < 4:
raise HTTPException(status_code=400,
detail="Name und Passwort (min. 4 Zeichen) erforderlich")
2026-04-21 10:00:36 +02:00
if not pid:
pid = _generate_practice_id()
practice = _ensure_practice(pid, name=practice_name, admin_email=admin_email)
accounts = _load_accounts()
practice_accounts = [a for a in accounts.values()
if a.get("practice_id") == pid]
if practice_accounts:
raise HTTPException(status_code=409,
detail="Setup bereits abgeschlossen. Bitte Login verwenden.")
if practice_name:
practices = _load_practices()
2026-04-21 10:00:36 +02:00
practices[pid]["name"] = practice_name
if admin_email:
2026-04-21 10:00:36 +02:00
practices[pid]["admin_email"] = admin_email
_save_practices(practices)
2026-04-21 10:00:36 +02:00
practice = practices[pid]
uid = uuid.uuid4().hex[:12]
pw_hash, pw_salt = _hash_password(password)
now = time.strftime("%Y-%m-%d %H:%M:%S")
accounts[uid] = {
"user_id": uid,
2026-04-21 10:00:36 +02:00
"practice_id": pid,
"display_name": name,
"email": admin_email,
"role": "admin",
"pw_hash": pw_hash,
"pw_salt": pw_salt,
"created": now,
"status": "active",
"last_login": now,
}
_save_accounts(accounts)
2026-04-21 10:00:36 +02:00
_ensure_default_channels(pid)
ua = request.headers.get("User-Agent", "")
ip = _extract_client_ip(request)
dev_id = _make_device_id(uid, ua)
2026-04-21 10:00:36 +02:00
token = _create_session(uid, pid, name, "admin",
device_id=dev_id, user_agent=ua, ip_addr=ip)
resp = JSONResponse(content={
"success": True, "user_id": uid, "role": "admin",
2026-04-21 10:00:36 +02:00
"display_name": name, "practice_id": pid,
"practice_name": practice.get("name", ""),
"invite_code": practice.get("invite_code", ""),
})
resp.set_cookie("aza_session", token, httponly=True, samesite="lax",
max_age=SESSION_MAX_AGE)
return resp
@router.post("/auth/login")
async def auth_login(request: Request):
2026-04-21 10:00:36 +02:00
"""Login mit Benutzername (Anzeigename) oder E-Mail + Passwort, mandantenbewusst."""
try:
body = await request.json()
except Exception:
body = {}
2026-04-21 10:00:36 +02:00
raw = (body.get("name") or "").strip()
password = (body.get("password") or "").strip()
2026-05-08 14:53:53 +02:00
pid, pid_src = _practice_id_for_login(request, body)
if pid_src == "invite_invalid":
raise HTTPException(
status_code=403,
detail="Ungueltiger Einladungscode — bitte aktuellen Link aus der Hauptinstallation verwenden.",
)
2026-04-21 10:00:36 +02:00
if not raw or not password:
raise HTTPException(
status_code=400,
detail="Benutzername oder E-Mail und Passwort erforderlich",
)
accounts = _load_accounts()
2026-04-21 10:00:36 +02:00
scoped = (
[a for a in accounts.values() if a.get("practice_id") == pid]
if pid
else list(accounts.values())
)
target = None
2026-04-21 10:00:36 +02:00
if _is_likely_email(raw):
em = _norm_email(raw)
matches = [
a
for a in scoped
if em and _norm_email(a.get("email") or "") == em
]
if len(matches) == 1:
target = matches[0]
elif len(matches) == 0:
target = None
else:
if pid:
raise HTTPException(
status_code=409,
detail={
"code": "ambiguous_email",
"message": (
"Mehrere Benutzer mit dieser E-Mail in dieser Praxis. "
"Bitte melden Sie sich mit Ihrem Benutzernamen an."
),
"candidates": [
{"display_name": (a.get("display_name") or "")}
for a in matches
],
},
)
raise HTTPException(
status_code=409,
detail={
"code": "ambiguous_email",
"message": (
"Diese E-Mail ist mehreren Konten zugeordnet. "
"Bitte melden Sie sich mit Ihrem Benutzernamen an."
),
},
)
else:
matches = [a for a in scoped if (a.get("display_name") or "") == raw]
if len(matches) == 1:
target = matches[0]
elif len(matches) == 0:
target = None
else:
raise HTTPException(
status_code=409,
detail={
"code": "ambiguous_username",
"message": (
"Dieser Benutzername ist mehrdeutig. Bitte wenden Sie sich an Ihre Praxis."
),
},
)
if not target:
2026-04-21 10:00:36 +02:00
raise HTTPException(
status_code=401,
detail="Benutzer nicht gefunden oder falsches Passwort",
)
2026-05-08 14:53:53 +02:00
tpid = (target.get("practice_id") or "").strip()
if pid and tpid != pid:
raise HTTPException(
status_code=403,
detail=(
"Anmeldung passt nicht zur gewaehlten Praxis (Einladungscode / gespeicherte Praxis-ID). "
"Bitte den Einladungslink der Hauptinstallation erneut oeffnen."
),
)
2026-04-21 10:00:36 +02:00
if not pid:
2026-05-08 14:53:53 +02:00
pid = tpid
if target.get("status") == "deactivated":
2026-04-21 10:00:36 +02:00
raise HTTPException(
status_code=403,
detail="Konto deaktiviert. Bitte Administrator kontaktieren.",
)
if not _verify_password(password, target["pw_hash"], target["pw_salt"]):
2026-04-21 10:00:36 +02:00
raise HTTPException(
status_code=401,
detail="Benutzer nicht gefunden oder falsches Passwort",
)
now = time.strftime("%Y-%m-%d %H:%M:%S")
target["last_login"] = now
_save_accounts(accounts)
2026-04-21 10:00:36 +02:00
dn = (target.get("display_name") or raw).strip()
ua = request.headers.get("User-Agent", "")
ip = _extract_client_ip(request)
dev_id = body.get("device_id") or _make_device_id(target["user_id"], ua)
2026-04-21 10:00:36 +02:00
token = _create_session(
target["user_id"], pid, dn, target["role"],
device_id=dev_id, user_agent=ua, ip_addr=ip,
)
2026-05-08 14:53:53 +02:00
bind_src = (
"invite_code"
if pid_src == "invite"
else (
"stored_practice_id"
if pid_src in ("body", "header", "query")
else "account"
)
)
result = {
2026-04-21 10:00:36 +02:00
"success": True,
"user_id": target["user_id"],
"role": target["role"],
"display_name": dn,
"practice_id": pid,
2026-05-08 14:53:53 +02:00
"practice_bind_source": bind_src,
}
if target.get("must_change_password"):
result["must_change_password"] = True
resp = JSONResponse(content=result)
2026-04-21 10:00:36 +02:00
resp.set_cookie(
"aza_session", token, httponly=True, samesite="lax",
max_age=SESSION_MAX_AGE,
)
return resp
@router.post("/auth/register")
async def auth_register(request: Request):
"""Neuen Benutzer registrieren mit Einladungscode."""
try:
body = await request.json()
except Exception:
body = {}
invite_code = (body.get("invite_code") or "").strip()
name = (body.get("name") or "").strip()
password = (body.get("password") or "").strip()
role = (body.get("role") or "mpa").strip()
email = (body.get("email") or "").strip()
if not invite_code or not name or not password or len(password) < 4:
raise HTTPException(status_code=400,
detail="Einladungscode, Name und Passwort (min. 4 Zeichen) erforderlich")
if role not in ("arzt", "mpa", "empfang"):
role = "mpa"
2026-05-08 14:53:53 +02:00
target_pid = _lookup_practice_id_by_invite(invite_code)
if not target_pid:
raise HTTPException(status_code=403, detail="Ungueltiger Einladungscode")
accounts = _load_accounts()
exists = any(a["display_name"] == name and a.get("practice_id") == target_pid
for a in accounts.values())
if exists:
raise HTTPException(status_code=409, detail="Benutzername bereits vergeben")
uid = uuid.uuid4().hex[:12]
pw_hash, pw_salt = _hash_password(password)
now = time.strftime("%Y-%m-%d %H:%M:%S")
accounts[uid] = {
"user_id": uid,
"practice_id": target_pid,
"display_name": name,
"email": email,
"role": role,
"pw_hash": pw_hash,
"pw_salt": pw_salt,
"created": now,
"status": "active",
"last_login": now,
}
_save_accounts(accounts)
_ensure_default_channels(target_pid)
ua = request.headers.get("User-Agent", "")
ip = _extract_client_ip(request)
dev_id = _make_device_id(uid, ua)
token = _create_session(uid, target_pid, name, role,
device_id=dev_id, user_agent=ua, ip_addr=ip)
resp = JSONResponse(content={
"success": True, "user_id": uid, "role": role,
"display_name": name, "practice_id": target_pid,
2026-05-08 14:53:53 +02:00
"practice_bind_source": "invite_code",
})
resp.set_cookie("aza_session", token, httponly=True, samesite="lax",
max_age=SESSION_MAX_AGE)
return resp
@router.get("/auth/me")
async def auth_me(request: Request):
"""Aktuelle Session pruefen. Liefert User-Daten oder 401."""
s = _session_from_request(request)
if not s:
return JSONResponse(status_code=401, content={"authenticated": False})
return JSONResponse(content={
"authenticated": True,
"user_id": s["user_id"],
"display_name": s["display_name"],
"role": s["role"],
"practice_id": s["practice_id"],
})
@router.post("/auth/logout")
async def auth_logout(request: Request):
token = request.cookies.get("aza_session", "")
2026-05-08 14:53:53 +02:00
s = _session_from_request(request)
if s:
try:
_presence_clear_user(
(s.get("practice_id") or "").strip(),
(s.get("user_id") or "").strip(),
)
except Exception:
pass
_delete_session(token)
resp = JSONResponse(content={"success": True})
resp.delete_cookie("aza_session")
return resp
2026-05-08 14:53:53 +02:00
@router.get("/auth/resolve_invite")
async def auth_resolve_invite(code: str = Query("")):
"""Loesst einen Chat-Einladungscode in practice_id auf (ohne Login). Fuer Browser-Start mit ?invite=."""
raw = (code or "").strip()
if not raw:
return JSONResponse(content={"valid": False})
pid = _lookup_practice_id_by_invite(raw)
if not pid:
return JSONResponse(
content={"valid": False, "detail": "Ungueltiger oder veralteter Einladungscode"},
)
practices = _load_practices()
pdata = practices.get(pid, {})
return JSONResponse(content={
"valid": True,
"practice_id": pid,
"practice_name": (pdata.get("name") or "").strip(),
"invite_code": (pdata.get("invite_code") or "").strip(),
})
@router.post("/auth/regenerate_invite")
async def auth_regenerate_invite(request: Request):
2026-04-21 10:00:36 +02:00
"""Erzeugt einen neuen Chat-Einladungscode (Admin-Session oder API-Token)."""
api_token = request.headers.get("X-API-Token", "")
s = _session_from_request(request)
if s:
if s.get("role") != "admin":
raise HTTPException(status_code=403, detail="Nur Admin darf Einladungscode erneuern")
pid = s["practice_id"]
elif api_token:
pid = request.headers.get("X-Practice-Id", "").strip()
if not pid:
raise HTTPException(status_code=400, detail="X-Practice-Id Header erforderlich")
else:
raise HTTPException(status_code=401, detail="Nicht authentifiziert")
_ensure_practice(pid)
practices = _load_practices()
if pid in practices:
2026-04-21 10:00:36 +02:00
practices[pid]["invite_code"] = _generate_chat_invite_code()
_save_practices(practices)
return JSONResponse(content={
"success": True,
"invite_code": practices.get(pid, {}).get("invite_code", ""),
})
2026-04-21 10:00:36 +02:00
@router.post("/auth/provision")
async def auth_provision(request: Request):
"""Provisioning: Desktop-App erstellt/findet Server-Account.
Authentifiziert via X-API-Token (Backend-Token), nicht via Session.
Erstellt bei Bedarf eine neue Praxis mit echter practice_id."""
api_token = request.headers.get("X-API-Token", "")
if not api_token:
raise HTTPException(status_code=401, detail="API-Token erforderlich")
try:
body = await request.json()
except Exception:
body = {}
2026-04-21 10:00:36 +02:00
name = (body.get("name") or "").strip()
email = (body.get("email") or "").strip()
password = (body.get("password") or "").strip()
practice_name = (body.get("practice_name") or "").strip()
2026-05-05 23:36:13 +02:00
invite_code_in = (body.get("invite_code") or "").strip()
2026-04-21 10:00:36 +02:00
if not name:
raise HTTPException(status_code=400, detail="Name erforderlich")
if not password or len(password) < 4:
raise HTTPException(status_code=400, detail="Passwort (min. 4 Zeichen) erforderlich")
2026-05-05 23:36:13 +02:00
pid = ""
# Bewusster Beitritt zu einem bestehenden Praxis-Chat per Einladungscode:
# ueberschreibt X-Practice-Id / practice_id im Body — sonst legt jedes neue
# Geraet ohne gespeicherte practice_id eine eigene Praxis an (Realbefund).
if invite_code_in:
practices = _load_practices()
2026-05-06 22:43:22 +02:00
want = _invite_code_key(invite_code_in)
2026-05-05 23:36:13 +02:00
target_pid = None
for pida, pdata in practices.items():
2026-05-06 22:43:22 +02:00
if _invite_code_key(pdata.get("invite_code")) == want:
2026-05-05 23:36:13 +02:00
target_pid = pida
break
if not target_pid:
raise HTTPException(
status_code=403,
detail="Ungueltiger Chat-Einladungscode — Praxis nicht gefunden.",
)
pid = target_pid
else:
pid = request.headers.get("X-Practice-Id", "").strip()
pid = pid or (body.get("practice_id") or "").strip()
2026-04-21 10:00:36 +02:00
if not pid and email:
try:
from stripe_routes import lookup_practice_id_for_license_email
lp = lookup_practice_id_for_license_email(email)
if lp:
pid = lp.strip()
except Exception as exc:
print(f"[EMPFANG] lookup_practice_id_for_license_email: {exc}")
if not pid:
practices = _load_practices()
has_legacy = _LEGACY_DEFAULT_PID in practices
accounts = _load_accounts()
has_legacy_accounts = any(
a.get("practice_id") == _LEGACY_DEFAULT_PID for a in accounts.values())
if has_legacy or has_legacy_accounts:
pid = _generate_practice_id()
_migrate_legacy_to_practice(pid)
else:
pid = _generate_practice_id()
practice = _ensure_practice(pid, name=practice_name or "Meine Praxis",
admin_email=email)
if practice_name:
practices = _load_practices()
practices[pid]["name"] = practice_name
if email:
practices[pid]["admin_email"] = email
_save_practices(practices)
accounts = _load_accounts()
target = None
2026-04-21 10:00:36 +02:00
email_lower = email.lower() if email else ""
for a in accounts.values():
2026-04-21 10:00:36 +02:00
if a.get("practice_id") != pid:
continue
if email_lower and (a.get("email") or "").strip().lower() == email_lower:
target = a
break
2026-04-21 10:00:36 +02:00
if a["display_name"] == name:
target = a
break
if target:
pw_hash, pw_salt = _hash_password(password)
target["pw_hash"] = pw_hash
target["pw_salt"] = pw_salt
if email:
target["email"] = email
target["display_name"] = name
if not target.get("role") or target.get("role") == "mpa":
has_admin = any(a.get("role") == "admin" and a.get("practice_id") == pid
for a in accounts.values())
if not has_admin:
target["role"] = "admin"
_save_accounts(accounts)
return JSONResponse(content={
"success": True, "user_id": target["user_id"],
"display_name": target["display_name"], "role": target["role"],
"practice_id": pid,
"action": "updated",
})
has_admin = any(a.get("role") == "admin" and a.get("practice_id") == pid
for a in accounts.values())
role = "admin" if not has_admin else "arzt"
uid = uuid.uuid4().hex[:12]
pw_hash, pw_salt = _hash_password(password)
accounts[uid] = {
"user_id": uid,
"practice_id": pid,
"display_name": name,
"email": email,
2026-04-21 10:00:36 +02:00
"role": role,
"pw_hash": pw_hash,
"pw_salt": pw_salt,
"status": "active",
"created": time.strftime("%Y-%m-%d %H:%M:%S"),
}
2026-04-21 10:00:36 +02:00
_save_accounts(accounts)
return JSONResponse(content={
"success": True, "user_id": uid,
"display_name": name, "role": role,
"practice_id": pid,
"action": "created",
})
@router.post("/auth/forgot_password")
async def auth_forgot_password(request: Request):
"""Passwort-Reset: Benutzername oder E-Mail; bei mehreren Konten mit gleicher E-Mail zweiter Schritt."""
try:
body = await request.json()
except Exception:
body = {}
raw = (body.get("login") or body.get("email") or body.get("name") or "").strip()
chosen_display_name = (body.get("display_name") or body.get("username") or "").strip()
pid = _practice_id_from_client(request, body)
if not raw:
raise HTTPException(
status_code=400, detail="Benutzername oder E-Mail erforderlich",
)
accounts = _load_accounts()
practices = _load_practices()
_neutral = {
"success": True,
"step": "sent",
"message": (
"Wenn ein passendes Konto existiert, wurde ein Link an die hinterlegte "
"E-Mail-Adresse gesendet."
),
}
# Zweiter Schritt: E-Mail + gewählter Benutzername (eindeutig)
if chosen_display_name and _is_likely_email(raw):
if not pid:
raise HTTPException(
status_code=400,
detail=(
"Praxis-Kontext fehlt. Bitte laden Sie die Seite neu oder "
"kontaktieren Sie Ihre Praxis."
),
)
em = _norm_email(raw)
scoped = [a for a in accounts.values() if a.get("practice_id") == pid]
picked = [
a
for a in scoped
if _norm_email(a.get("email") or "") == em
and (a.get("display_name") or "") == chosen_display_name
]
if len(picked) == 1:
return JSONResponse(content=_send_reset_for_account(picked[0]))
return JSONResponse(content=_neutral)
if _is_likely_email(raw):
em = _norm_email(raw)
if pid:
scoped = [a for a in accounts.values() if a.get("practice_id") == pid]
matches = [a for a in scoped if _norm_email(a.get("email") or "") == em]
else:
matches = [
a for a in accounts.values()
if _norm_email(a.get("email") or "") == em
]
if len(matches) == 0:
return JSONResponse(content=_neutral)
if len(matches) == 1:
return JSONResponse(content=_send_reset_for_account(matches[0]))
cands = []
for a in matches:
pida = a.get("practice_id", "")
cands.append({
"display_name": a.get("display_name", ""),
"practice_id": pida,
"practice_name": _practice_label(practices, pida),
})
return JSONResponse(
content={
"success": True,
"step": "pick_user",
"login": raw,
"candidates": cands,
},
)
if pid:
scoped = [a for a in accounts.values() if a.get("practice_id") == pid]
matches = [a for a in scoped if (a.get("display_name") or "") == raw]
else:
matches = [
a for a in accounts.values()
if (a.get("display_name") or "") == raw
]
if len(matches) == 0:
return JSONResponse(content=_neutral)
if len(matches) > 1:
return JSONResponse(
content={
"success": False,
"step": "ambiguous_practice",
"message": (
"Dieser Benutzername ist in mehreren Praxen registriert. Bitte "
"setzen Sie das Passwort über Ihre E-Mail-Adresse zurück oder "
"wenden Sie sich an Ihre Praxis."
),
},
)
return JSONResponse(content=_send_reset_for_account(matches[0]))
@router.get("/auth/reset_verify")
async def auth_reset_verify(reset_token: str = Query("")):
"""Prüft, ob ein Reset-Token noch gültig ist (ohne Verbrauch)."""
token = (reset_token or "").strip()
if not token:
return JSONResponse(
content={"valid": False, "detail": "Kein Reset-Token angegeben."}
)
resets = _load_json(_DATA_DIR / "empfang_resets.json", {})
entry = resets.get(token)
if not entry:
return JSONResponse(
content={"valid": False, "detail": "Ungültiger oder abgelaufener Link."}
)
if time.time() - entry.get("created", 0) > 3600:
return JSONResponse(
content={
"valid": False,
"detail": "Der Link ist abgelaufen (max. 1 Stunde).",
}
)
email = (entry.get("email") or "").strip()
accounts = _load_accounts()
uid = entry.get("user_id")
acc = accounts.get(uid) if uid else None
display_name = (entry.get("display_name") or (acc or {}).get("display_name") or "").strip()
return JSONResponse(
content={"valid": True, "email": email, "display_name": display_name}
)
@router.post("/auth/reset_password")
async def auth_reset_password(request: Request):
2026-04-21 10:00:36 +02:00
"""Setzt das Passwort mit einem gültigen Reset-Token."""
try:
body = await request.json()
except Exception:
body = {}
token = (body.get("reset_token") or "").strip()
new_password = (body.get("password") or "").strip()
if not token or not new_password or len(new_password) < 4:
raise HTTPException(status_code=400,
detail="Reset-Token und neues Passwort (min. 4 Zeichen) erforderlich")
resets = _load_json(_DATA_DIR / "empfang_resets.json", {})
entry = resets.get(token)
if not entry:
2026-04-21 10:00:36 +02:00
raise HTTPException(
status_code=400, detail="Ungültiger oder abgelaufener Reset-Link"
)
if time.time() - entry.get("created", 0) > 3600:
del resets[token]
_save_json(_DATA_DIR / "empfang_resets.json", resets)
2026-04-21 10:00:36 +02:00
raise HTTPException(
status_code=400, detail="Reset-Link ist abgelaufen (max. 1 Stunde)"
)
user_id = entry["user_id"]
accounts = _load_accounts()
if user_id not in accounts:
raise HTTPException(status_code=404, detail="Benutzer nicht gefunden")
pw_hash, pw_salt = _hash_password(new_password)
accounts[user_id]["pw_hash"] = pw_hash
accounts[user_id]["pw_salt"] = pw_salt
accounts[user_id].pop("must_change_password", None)
_save_accounts(accounts)
del resets[token]
_save_json(_DATA_DIR / "empfang_resets.json", resets)
2026-04-21 10:00:36 +02:00
acc = accounts[user_id]
email_hint = (acc.get("email") or "").strip()
dn_hint = (acc.get("display_name") or "").strip()
return JSONResponse(
content={
"success": True,
"message": "Passwort wurde erfolgreich geändert.",
"email": email_hint,
"display_name": dn_hint,
}
)
2026-04-21 10:00:36 +02:00
def _reset_email_subject_body(display_name: str, reset_link: str) -> Tuple[str, str, str]:
"""Betreff, Plain-Text, HTML für Passwort-Reset."""
subject = "AZA Praxis-Chat Passwort zurücksetzen"
text = (
f"Hallo {display_name},\n\n"
2026-04-21 10:00:36 +02:00
f"Sie haben eine Passwort-Zurücksetzung angefordert.\n\n"
f"Klicken Sie auf diesen Link, um Ihr Passwort neu zu setzen:\n"
f"{reset_link}\n\n"
2026-04-21 10:00:36 +02:00
f"Der Link ist 1 Stunde gültig.\n\n"
f"Falls Sie diese Anfrage nicht gestellt haben, ignorieren Sie diese E-Mail.\n\n"
f"AZA Praxis-Chat"
)
html = (
f"<div style='font-family:Segoe UI,sans-serif;max-width:480px;margin:0 auto'>"
2026-04-21 10:00:36 +02:00
f"<h2 style='color:#5B8DB3'>Passwort zurücksetzen</h2>"
f"<p>Hallo {display_name},</p>"
2026-04-21 10:00:36 +02:00
f"<p>Sie haben eine Passwort-Zurücksetzung angefordert.</p>"
f"<p><a href='{reset_link}' style='display:inline-block;background:#5B8DB3;"
f"color:white;padding:10px 24px;border-radius:6px;text-decoration:none;"
2026-04-22 22:33:46 +02:00
f"font-weight:600'>Neues Passwort wählen</a></p>"
2026-04-21 10:00:36 +02:00
f"<p style='color:#888;font-size:13px'>Der Link ist 1 Stunde gültig.</p>"
f"<p style='color:#888;font-size:12px'>Falls Sie diese Anfrage nicht gestellt haben, "
f"ignorieren Sie diese E-Mail.</p></div>"
)
2026-04-21 10:00:36 +02:00
return subject, text, html
def _send_reset_via_resend(to_email: str, subject: str, text: str, html: str) -> bool:
"""Resend HTTP API (gleiche Umgebung wie Lizenz-Mail in stripe_routes)."""
import json
import urllib.error
import urllib.request
api_key = os.environ.get("RESEND_API_KEY", "").strip()
sender = os.environ.get("MAIL_FROM", "AZA MedWork <noreply@aza-medwork.ch>").strip()
if not api_key:
return False
payload = json.dumps({
"from": sender,
"to": [to_email],
"subject": subject,
"html": html,
"text": text,
}).encode("utf-8")
req = urllib.request.Request(
"https://api.resend.com/emails",
data=payload,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"User-Agent": "AZA-MedWork/1.0",
},
method="POST",
)
try:
2026-04-21 10:00:36 +02:00
with urllib.request.urlopen(req, timeout=15) as resp:
if resp.status in (200, 201):
print(f"[RESET-MAIL] Resend OK -> {to_email}")
return True
body = resp.read().decode()[:300]
print(f"[RESET-MAIL] Resend HTTP {resp.status}: {body}")
return False
except urllib.error.HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")[:300] if exc.fp else ""
print(f"[RESET-MAIL] Resend HTTP {exc.code}: {body}")
return False
except Exception as exc:
2026-04-21 10:00:36 +02:00
print(f"[RESET-MAIL] Resend {type(exc).__name__}: {exc}")
return False
def _send_reset_email(to_email: str, display_name: str, reset_link: str):
"""Sendet Passwort-Reset: zuerst SMTP (falls vollstaendig), sonst Resend API."""
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
subject, text, html = _reset_email_subject_body(display_name, reset_link)
host = os.environ.get("SMTP_HOST", "").strip()
port_str = os.environ.get("SMTP_PORT", "587").strip()
user = os.environ.get("SMTP_USER", "").strip()
password = os.environ.get("SMTP_PASS", "").strip()
sender = os.environ.get("SMTP_FROM", "").strip() or user
if all([host, user, password]):
try:
msg = MIMEMultipart("alternative")
msg["From"] = sender
msg["To"] = to_email
msg["Subject"] = subject
msg.attach(MIMEText(text, "plain", "utf-8"))
msg.attach(MIMEText(html, "html", "utf-8"))
port = int(port_str)
if port == 465:
with smtplib.SMTP_SSL(host, port, timeout=15) as srv:
srv.login(user, password)
srv.sendmail(sender, [to_email], msg.as_string())
else:
with smtplib.SMTP(host, port, timeout=15) as srv:
srv.ehlo()
srv.starttls()
srv.ehlo()
srv.login(user, password)
srv.sendmail(sender, [to_email], msg.as_string())
print(f"[RESET-MAIL] SMTP OK -> {to_email}")
return
except Exception as exc:
print(f"[RESET-MAIL] SMTP FEHLER: {exc} versuche Resend …")
if _send_reset_via_resend(to_email, subject, text, html):
return
print(
"[RESET-MAIL] Weder SMTP noch Resend erfolgreich. "
"Setzen Sie SMTP_HOST/SMTP_USER/SMTP_PASS oder RESEND_API_KEY (+ MAIL_FROM). "
f"Reset-Link (nur Server-Log): {reset_link}"
)
@router.get("/auth/needs_setup")
2026-04-21 10:00:36 +02:00
async def auth_needs_setup(request: Request):
"""Pruefen ob Setup noetig ist (keine Accounts vorhanden)."""
2026-04-21 10:00:36 +02:00
pid = _resolve_practice_id(request)
if not pid:
accounts = _load_accounts()
return JSONResponse(content={
"needs_setup": len(accounts) == 0,
"invite_code": "",
})
_ensure_practice(pid)
accounts = _load_accounts()
2026-04-21 10:00:36 +02:00
has_accounts = any(a.get("practice_id") == pid for a in accounts.values())
practices = _load_practices()
2026-04-21 10:00:36 +02:00
invite_code = practices.get(pid, {}).get("invite_code", "")
return JSONResponse(content={
"needs_setup": not has_accounts,
"invite_code": invite_code if not has_accounts else "",
})
# =====================================================================
# ADMIN ENDPOINTS (nur Rolle admin)
# =====================================================================
def _require_admin(request: Request) -> dict:
s = _require_session(request)
if s.get("role") != "admin":
raise HTTPException(status_code=403, detail="Admin-Berechtigung erforderlich")
return s
@router.get("/admin/users")
async def admin_list_users(request: Request):
"""Alle Benutzer der Praxis mit vollen Details."""
s = _require_admin(request)
pid = s["practice_id"]
accounts = _load_accounts()
result = []
for a in accounts.values():
if a.get("practice_id") != pid:
continue
result.append({
"user_id": a["user_id"],
"display_name": a["display_name"],
"role": a.get("role", "mpa"),
"status": a.get("status", "active"),
"created": a.get("created", ""),
"last_login": a.get("last_login", ""),
"email": a.get("email", ""),
})
return JSONResponse(content={"success": True, "users": result})
@router.post("/admin/users/{user_id}/role")
async def admin_change_role(user_id: str, request: Request):
"""Rolle eines Benutzers aendern."""
s = _require_admin(request)
try:
body = await request.json()
except Exception:
body = {}
new_role = (body.get("role") or "").strip()
if not new_role:
raise HTTPException(status_code=400, detail="Rolle erforderlich")
if new_role not in ("admin", "arzt", "mpa", "empfang"):
raise HTTPException(status_code=400, detail="Ungueltige Rolle")
if user_id == s["user_id"] and new_role != "admin":
raise HTTPException(status_code=400,
detail="Eigene Admin-Rolle kann nicht entfernt werden")
accounts = _load_accounts()
if user_id not in accounts:
raise HTTPException(status_code=404, detail="Benutzer nicht gefunden")
if accounts[user_id].get("practice_id") != s["practice_id"]:
raise HTTPException(status_code=403, detail="Benutzer gehoert zu anderer Praxis")
accounts[user_id]["role"] = new_role
_save_accounts(accounts)
return JSONResponse(content={"success": True, "user_id": user_id, "role": new_role})
@router.post("/admin/users/{user_id}/deactivate")
async def admin_deactivate_user(user_id: str, request: Request):
"""Benutzer deaktivieren und alle Sessions loeschen."""
s = _require_admin(request)
if user_id == s["user_id"]:
raise HTTPException(status_code=400, detail="Eigenes Konto kann nicht deaktiviert werden")
accounts = _load_accounts()
if user_id not in accounts:
raise HTTPException(status_code=404, detail="Benutzer nicht gefunden")
if accounts[user_id].get("practice_id") != s["practice_id"]:
raise HTTPException(status_code=403, detail="Benutzer gehoert zu anderer Praxis")
accounts[user_id]["status"] = "deactivated"
_save_accounts(accounts)
sessions = _load_sessions()
sessions = {k: v for k, v in sessions.items() if v.get("user_id") != user_id}
_save_sessions(sessions)
return JSONResponse(content={"success": True, "user_id": user_id, "status": "deactivated"})
@router.post("/admin/users/{user_id}/activate")
async def admin_activate_user(user_id: str, request: Request):
"""Benutzer reaktivieren."""
s = _require_admin(request)
accounts = _load_accounts()
if user_id not in accounts:
raise HTTPException(status_code=404, detail="Benutzer nicht gefunden")
if accounts[user_id].get("practice_id") != s["practice_id"]:
raise HTTPException(status_code=403, detail="Benutzer gehoert zu anderer Praxis")
accounts[user_id]["status"] = "active"
_save_accounts(accounts)
return JSONResponse(content={"success": True, "user_id": user_id, "status": "active"})
@router.delete("/admin/users/{user_id}")
async def admin_delete_user(user_id: str, request: Request):
"""Benutzer permanent loeschen inkl. Sessions und Geraete."""
s = _require_admin(request)
if user_id == s["user_id"]:
raise HTTPException(status_code=400, detail="Eigenes Konto kann nicht geloescht werden")
accounts = _load_accounts()
if user_id not in accounts:
raise HTTPException(status_code=404, detail="Benutzer nicht gefunden")
if accounts[user_id].get("practice_id") != s["practice_id"]:
raise HTTPException(status_code=403, detail="Benutzer gehoert zu anderer Praxis")
2026-05-08 14:53:53 +02:00
acc_del = accounts.get(user_id)
if acc_del and acc_del.get("role") == "admin":
other_adm = [
a for a in accounts.values()
if a.get("practice_id") == s["practice_id"]
and a.get("user_id") != user_id
and a.get("role") == "admin"
]
if not other_adm:
raise HTTPException(
status_code=400,
detail="Letzter Administrator kann nicht geloescht werden.",
)
del accounts[user_id]
_save_accounts(accounts)
sessions = _load_sessions()
sessions = {k: v for k, v in sessions.items() if v.get("user_id") != user_id}
_save_sessions(sessions)
devices = _load_devices()
devices = {k: v for k, v in devices.items() if v.get("user_id") != user_id}
_save_devices(devices)
return JSONResponse(content={"success": True, "deleted": user_id})
@router.post("/admin/users/{user_id}/reset_password")
async def admin_reset_password(user_id: str, request: Request):
"""Temporaeres Passwort generieren. Benutzer muss es beim naechsten Login aendern."""
s = _require_admin(request)
accounts = _load_accounts()
if user_id not in accounts:
raise HTTPException(status_code=404, detail="Benutzer nicht gefunden")
if accounts[user_id].get("practice_id") != s["practice_id"]:
raise HTTPException(status_code=403, detail="Benutzer gehoert zu anderer Praxis")
temp_pw = secrets.token_urlsafe(8)
pw_hash, pw_salt = _hash_password(temp_pw)
accounts[user_id]["pw_hash"] = pw_hash
accounts[user_id]["pw_salt"] = pw_salt
accounts[user_id]["must_change_password"] = True
_save_accounts(accounts)
return JSONResponse(content={
"success": True, "user_id": user_id,
"temp_password": temp_pw,
})
@router.get("/admin/devices")
async def admin_list_devices(request: Request):
"""Alle Geraete aller Benutzer der Praxis."""
s = _require_admin(request)
pid = s["practice_id"]
devices = _load_devices()
accounts = _load_accounts()
user_names = {a["user_id"]: a["display_name"] for a in accounts.values()}
result = []
for d in devices.values():
if d.get("practice_id") != pid:
continue
entry = dict(d)
entry["user_name"] = user_names.get(d.get("user_id"), d.get("user_id", ""))
result.append(entry)
result.sort(key=lambda d: d.get("last_active", ""), reverse=True)
return JSONResponse(content={"success": True, "devices": result})
@router.post("/admin/devices/{device_id}/block")
async def admin_block_device(device_id: str, request: Request):
"""Geraet blockieren und zugehoerige Sessions loeschen."""
s = _require_admin(request)
devices = _load_devices()
if device_id not in devices:
raise HTTPException(status_code=404, detail="Geraet nicht gefunden")
dev = devices[device_id]
if dev.get("practice_id") != s["practice_id"]:
raise HTTPException(status_code=403, detail="Geraet gehoert zu anderer Praxis")
dev["trust_status"] = "blocked"
_save_devices(devices)
sessions = _load_sessions()
sessions = {k: v for k, v in sessions.items()
if v.get("device_id") != device_id}
_save_sessions(sessions)
return JSONResponse(content={"success": True, "device_id": device_id, "trust_status": "blocked"})
@router.delete("/admin/devices/{device_id}")
async def admin_delete_device(device_id: str, request: Request):
"""Geraetedatensatz loeschen."""
s = _require_admin(request)
devices = _load_devices()
if device_id not in devices:
raise HTTPException(status_code=404, detail="Geraet nicht gefunden")
if devices[device_id].get("practice_id") != s["practice_id"]:
raise HTTPException(status_code=403, detail="Geraet gehoert zu anderer Praxis")
del devices[device_id]
_save_devices(devices)
return JSONResponse(content={"success": True, "deleted": device_id})
@router.post("/admin/users/{user_id}/logout_all")
async def admin_logout_all(user_id: str, request: Request):
"""Alle Sessions eines Benutzers loeschen."""
s = _require_admin(request)
accounts = _load_accounts()
if user_id not in accounts:
raise HTTPException(status_code=404, detail="Benutzer nicht gefunden")
if accounts[user_id].get("practice_id") != s["practice_id"]:
raise HTTPException(status_code=403, detail="Benutzer gehoert zu anderer Praxis")
sessions = _load_sessions()
removed = sum(1 for v in sessions.values() if v.get("user_id") == user_id)
sessions = {k: v for k, v in sessions.items() if v.get("user_id") != user_id}
_save_sessions(sessions)
return JSONResponse(content={"success": True, "user_id": user_id, "sessions_removed": removed})
# =====================================================================
# USER MANAGEMENT (admin only for invite/role changes)
# =====================================================================
2026-05-08 14:53:53 +02:00
def _attach_devices_to_practice_users(users: list, pid: str) -> None:
"""Last aktiv / Presence pro Benutzer; identisch zum frueheren API-Token-Zweig."""
devices = _load_devices()
user_devices: dict[str, list] = {}
for d in devices.values():
if d.get("practice_id") != pid:
continue
uid = d.get("user_id", "")
user_devices.setdefault(uid, []).append({
"device_name": d.get("device_name", ""),
"platform": d.get("platform", ""),
"last_active": d.get("last_active", ""),
"ip_last": d.get("ip_last", ""),
})
for u in users:
u["devices"] = user_devices.get(u.get("user_id", ""), [])
def _attach_presence_to_practice_users(users: list, pid: str) -> None:
"""Reichert users_full mit serverseitiger Session-Presence an (RAM, TTL)."""
pid = (pid or "").strip()
if not pid:
return
for u in users:
if not isinstance(u, dict):
continue
uid = str(u.get("user_id") or "").strip()
snap = _presence_snapshot_for_user(pid, uid)
u["presence_online"] = snap["presence_online"]
u["presence_last_seen"] = snap["presence_last_seen"]
u["presence_source"] = snap["presence_source"]
u["presence_age_seconds"] = snap["presence_age_seconds"]
@router.get("/users")
async def empfang_users(request: Request):
"""Liefert Benutzer der Praxis. Offen fuer alle authentifizierten + Legacy."""
s = _session_from_request(request)
if s:
users = _practice_users(s["practice_id"])
2026-05-08 14:53:53 +02:00
_attach_devices_to_practice_users(users, s["practice_id"])
_attach_presence_to_practice_users(users, s["practice_id"])
return JSONResponse(content={
"users": [u["display_name"] for u in users],
"users_full": users,
"practice_id": s["practice_id"],
})
2026-04-21 10:00:36 +02:00
api_token = request.headers.get("X-API-Token", "")
pid = _resolve_practice_id(request)
if not pid:
return JSONResponse(content={"users": [], "practice_id": ""})
users = _practice_users(pid)
if users:
2026-04-21 10:00:36 +02:00
result: dict = {
"users": [u["display_name"] for u in users],
"practice_id": pid,
2026-04-21 10:00:36 +02:00
}
if api_token:
2026-05-08 14:53:53 +02:00
_attach_devices_to_practice_users(users, pid)
_attach_presence_to_practice_users(users, pid)
2026-04-21 10:00:36 +02:00
result["users_full"] = users
return JSONResponse(content=result)
old_file = _DATA_DIR / "empfang_users.json"
if old_file.is_file():
try:
names = json.loads(old_file.read_text(encoding="utf-8"))
if isinstance(names, list):
return JSONResponse(content={"users": names, "practice_id": pid})
except Exception:
pass
return JSONResponse(content={"users": [], "practice_id": pid})
@router.post("/users")
async def empfang_register_user(request: Request):
"""Legacy-kompatibel: Benutzer anlegen/umbenennen/loeschen."""
try:
body = await request.json()
except Exception:
body = {}
name = (body.get("name") or "").strip()
action = (body.get("action") or "add").strip()
2026-04-21 10:00:36 +02:00
pid = _resolve_practice_id(request)
if not pid or not name:
return JSONResponse(content={"success": False})
accounts = _load_accounts()
if action == "delete":
2026-05-08 14:53:53 +02:00
sess = _session_from_request(request)
actor_name = (body.get("actor_display_name") or "").strip()
to_del = [uid for uid, a in accounts.items()
if a["display_name"] == name and a.get("practice_id") == pid]
2026-05-08 14:53:53 +02:00
if not to_del:
return JSONResponse(content={"success": False, "detail": "Benutzer nicht gefunden"}, status_code=404)
for uid in list(to_del):
acc = accounts.get(uid)
if not acc:
continue
if sess and acc.get("user_id") == sess.get("user_id"):
return JSONResponse(
content={"success": False, "detail": "Eigenes Konto kann nicht geloescht werden"},
status_code=400,
)
if actor_name and (acc.get("display_name") or "").strip() == actor_name:
return JSONResponse(
content={"success": False,
"detail": "Der aktive Benutzer kann hier nicht geloescht werden"},
status_code=400,
)
if acc.get("role") == "admin":
others = [
a for a in accounts.values()
if a.get("practice_id") == pid and a.get("role") == "admin"
and a.get("user_id") != uid
]
if not others:
return JSONResponse(
content={"success": False,
"detail": "Letzter Administrator kann nicht geloescht werden"},
status_code=400,
)
del accounts[uid]
_save_accounts(accounts)
elif action == "rename":
new_name = (body.get("new_name") or "").strip()
if new_name:
for a in accounts.values():
if a["display_name"] == name and a.get("practice_id") == pid:
a["display_name"] = new_name
_save_accounts(accounts)
2026-05-06 22:43:22 +02:00
elif action == "add_secure":
s = _session_from_request(request)
if not s:
raise HTTPException(status_code=401, detail="Nicht angemeldet")
sess_role = str(s.get("role") or "").strip().lower()
if sess_role not in ("admin", "empfang"):
raise HTTPException(status_code=403, detail="Keine Berechtigung Benutzer anzulegen")
pid = str(s.get("practice_id") or "").strip()
if not pid:
return JSONResponse(content={"success": False, "detail": "Keine Praxis"}, status_code=400)
pw = (body.get("password") or "").strip()
pw2 = (body.get("password_repeat") or body.get("password2") or "").strip()
if pw != pw2:
return JSONResponse(content={"success": False, "detail": "Passwoerter stimmen nicht ueberein"}, status_code=400)
if len(pw) < 4:
return JSONResponse(content={"success": False, "detail": "Passwort mindestens 4 Zeichen"}, status_code=400)
allowed_roles = {"mpa", "arzt"}
if sess_role == "admin":
allowed_roles.update({"admin", "empfang"})
role_new = str(body.get("role") or "mpa").strip().lower()
if role_new not in allowed_roles:
role_new = "mpa"
exists = any(a["display_name"] == name and a.get("practice_id") == pid
for a in accounts.values())
if not exists:
uid = uuid.uuid4().hex[:12]
pw_hash, pw_salt = _hash_password(pw)
accounts[uid] = {
"user_id": uid,
"practice_id": pid,
"display_name": name,
"role": role_new,
"pw_hash": pw_hash,
"pw_salt": pw_salt,
"created": time.strftime("%Y-%m-%d %H:%M:%S"),
"status": "active",
"last_login": "",
"email": "",
}
_save_accounts(accounts)
else:
return JSONResponse(content={"success": False, "detail": "Name bereits vergeben"}, status_code=409)
else:
exists = any(a["display_name"] == name and a.get("practice_id") == pid
for a in accounts.values())
if not exists:
uid = uuid.uuid4().hex[:12]
pw_hash, pw_salt = _hash_password(name.lower())
accounts[uid] = {
"user_id": uid,
"practice_id": pid,
"display_name": name,
"role": "mpa",
"pw_hash": pw_hash,
"pw_salt": pw_salt,
"created": time.strftime("%Y-%m-%d %H:%M:%S"),
"status": "active",
"last_login": "",
"email": "",
}
_save_accounts(accounts)
users = _practice_users(pid)
return JSONResponse(content={
"success": True,
"users": [u["display_name"] for u in users],
"practice_id": pid,
})
# =====================================================================
# MESSAGE ROUTES (practice-scoped)
# =====================================================================
2026-04-16 15:23:14 +02:00
class EmpfangMessage(BaseModel):
medikamente: str = ""
therapieplan: str = ""
procedere: str = ""
kommentar: str = ""
patient: str = ""
absender: str = ""
zeitstempel: str = ""
practice_id: str = ""
2026-04-16 15:23:14 +02:00
extras: dict = Field(default_factory=dict)
@router.post("/send")
async def empfang_send(msg: EmpfangMessage, request: Request):
s = _session_from_request(request)
2026-04-21 10:00:36 +02:00
pid = msg.practice_id.strip() or _resolve_practice_id(request)
if not pid:
raise HTTPException(status_code=400, detail="practice_id erforderlich")
absender = msg.absender.strip()
if s and not absender:
absender = s["display_name"]
2026-05-08 14:53:53 +02:00
# Strikte Direktchat-Validierung: wenn der Client explizit einen Direktchat
# markiert (audience=direct ODER recipient_user_id gesetzt) MUSS die
# Empfaenger-User-ID auf ein Konto in dieser Praxis aufloesbar sein.
# Verhindert: stille Speicherung als Allgemein, wenn der Browser/Desktop
# einen Direktchat anzeigt.
_ex_in = msg.extras or {}
_aud_in = str(_ex_in.get("audience") or "").strip().lower()
_ru_in = str(_ex_in.get("recipient_user_id") or "").strip()
_r_in = str(_ex_in.get("recipient") or "").strip()
_is_multi_in = (
isinstance(_ex_in.get("recipients"), list)
and len(_ex_in.get("recipients") or []) >= 2
) or ("," in _r_in and _r_in.count(",") >= 1)
_claims_dm = (_aud_in == "direct") or (_ru_in and not _is_multi_in)
if _claims_dm:
_acc = _accounts_by_practice(pid)
_resolved_ru = _ru_in if _ru_in in _acc else (
_resolve_user_uid_in_practice(pid, _r_in) if _r_in else ""
)
_r_lower = _r_in.lower()
if not _resolved_ru or _r_lower in ("alle", "all", "allgemein", "an alle"):
raise HTTPException(
status_code=400,
detail=(
"Direktchat konnte technisch nicht eindeutig zugeordnet "
"werden (recipient_user_id fehlt oder ist ungueltig). "
"Bitte den Empfaenger erneut auswaehlen."
),
)
_sess_uid_chk = (s.get("user_id") if s else "") or ""
_claim_su_chk = str(_ex_in.get("sender_user_id") or "").strip()
if _sess_uid_chk:
pass
elif _claim_su_chk and _claim_su_chk in _acc:
pass
else:
raise HTTPException(
status_code=400,
detail=(
"Direktchat: gueltige Session oder sender_user_id fuer "
"diese Praxis erforderlich."
),
)
2026-04-19 20:41:37 +02:00
msg_id = uuid.uuid4().hex[:12]
messages = _load_messages()
thread_id = msg_id
reply_to = (msg.extras or {}).get("reply_to", "")
if reply_to:
for m in messages:
if m.get("id") == reply_to:
thread_id = m.get("thread_id", reply_to)
break
else:
thread_id = reply_to
2026-04-16 15:23:14 +02:00
entry = {
2026-04-19 20:41:37 +02:00
"id": msg_id,
"thread_id": thread_id,
"practice_id": pid,
2026-04-16 15:23:14 +02:00
"medikamente": msg.medikamente.strip(),
"therapieplan": msg.therapieplan.strip(),
"procedere": msg.procedere.strip(),
"kommentar": msg.kommentar.strip(),
"patient": msg.patient.strip(),
"absender": absender,
2026-04-16 15:23:14 +02:00
"zeitstempel": msg.zeitstempel.strip() or time.strftime("%Y-%m-%d %H:%M:%S"),
"empfangen": time.strftime("%Y-%m-%d %H:%M:%S"),
"status": "offen",
"user_id": s["user_id"] if s else "",
2026-04-16 15:23:14 +02:00
}
2026-05-08 14:53:53 +02:00
ex_raw = dict(msg.extras or {})
session_uid_out = (s["user_id"] if s else "") or ""
entry["extras"] = _enrich_outgoing_direct_extras(pid, absender, ex_raw, session_uid_out)
exo_dbg = entry.get("extras") or {}
if _claims_dm:
_send_mode = "direct"
elif _extras_indicates_broadcast(exo_dbg):
_send_mode = "all"
else:
_send_mode = "other"
_log.info(
"empfang_send mode=%s practice_id=%s sender_uid=%s recipient_uid=%s "
"conv_key=%s msg_id=%s",
_send_mode,
pid,
str(exo_dbg.get("sender_user_id") or ""),
str(exo_dbg.get("recipient_user_id") or ""),
str(exo_dbg.get("direct_conv_key") or ""),
msg_id,
)
2026-04-16 15:23:14 +02:00
messages.insert(0, entry)
_save_messages(messages)
2026-05-05 23:36:13 +02:00
try:
_pulse_bump(pid, sender=absender)
except Exception:
pass
return JSONResponse(content={
"success": True, "id": msg_id, "thread_id": thread_id,
"practice_id": pid,
})
2026-04-16 15:23:14 +02:00
@router.get("/messages")
async def empfang_list(request: Request, practice_id: Optional[str] = Query(None)):
2026-04-21 10:00:36 +02:00
pid = (practice_id or "").strip() or _resolve_practice_id(request)
if not pid:
return JSONResponse(content={"success": True, "messages": []})
2026-04-16 15:23:14 +02:00
messages = _load_messages()
filtered = _filter_by_practice(messages, pid)
return JSONResponse(content={"success": True, "messages": filtered})
2026-04-16 15:23:14 +02:00
2026-05-05 23:36:13 +02:00
# =====================================================================
# CONVERSATION + LIVE-PULSE
# Eine einzige serverseitige Wahrheit fuer Browser, Hülle und
# "An Empfang senden". Kein Client-Filter, keine lokale Sonderwahrheit.
# =====================================================================
# In-Memory Pulse: bei jedem POST /send wird der Tick erhoeht.
# Clients koennen mit kurzen Polls (z. B. 800 ms) auf den Tick lauschen
# und nur dann die volle Conversation neu holen, wenn der Tick wechselt.
# Damit wirkt das Signal sofort, ohne traege Sekunden-Lags.
_PRACTICE_PULSE: dict[str, dict] = {}
def _pulse_bump(practice_id: str, sender: str = ""):
p = _PRACTICE_PULSE.setdefault(practice_id, {"tick": 0, "ts": 0.0, "last_sender": ""})
p["tick"] = int(p.get("tick", 0)) + 1
p["ts"] = time.time()
p["last_sender"] = sender or ""
_PRACTICE_PULSE[practice_id] = p
def _pulse_get(practice_id: str) -> dict:
p = _PRACTICE_PULSE.get(practice_id)
if not p:
# Beim ersten Abruf einen Tick aus den Daten ableiten, damit
# Clients nach Server-Restart nicht alle "neue Nachricht!" denken.
msgs = _filter_by_practice(_load_messages(), practice_id)
latest = ""
for m in msgs:
t = (m.get("empfangen") or m.get("zeitstempel") or "")
if t > latest:
latest = t
p = {"tick": 1, "ts": time.time(), "last_sender": "", "boot": latest}
_PRACTICE_PULSE[practice_id] = p
return p
2026-05-08 14:53:53 +02:00
# =====================================================================
# Client-Presence (Ping pro angemeldeter Empfang-Instanz, practice-scoped)
# RAM-beschraenkt wie Pulse; TTL definiert „online“ für /empfang/users.
# =====================================================================
EMPFANG_PRESENCE_TTL_SECONDS = 120
_PRACTICE_USER_PRESENCE: dict[str, dict] = {}
def _presence_key(pid: str, uid: str) -> str:
return f"{(pid or '').strip()}|{(uid or '').strip()}"
def _presence_record_ping(pid: str, uid: str, source: str = "web") -> None:
if not pid or not uid:
return
src = ((source or "web").strip() or "web")[:32]
_PRACTICE_USER_PRESENCE[_presence_key(pid, uid)] = {
"last_seen": time.time(),
"source": src,
}
def _presence_clear_user(pid: str, uid: str) -> None:
if not pid or not uid:
return
_PRACTICE_USER_PRESENCE.pop(_presence_key(pid, uid), None)
def _presence_iso_utc(ts: float) -> str:
if ts <= 0:
return ""
return datetime.fromtimestamp(ts, tz=timezone.utc).strftime(
"%Y-%m-%dT%H:%M:%SZ")
def _presence_snapshot_for_user(pid: str, uid: str) -> dict:
rec = _PRACTICE_USER_PRESENCE.get(_presence_key(pid, uid))
now = time.time()
absent = {
"presence_online": False,
"presence_last_seen": None,
"presence_source": "",
"presence_age_seconds": None,
}
if not rec:
return absent
last = float(rec.get("last_seen", 0))
age = max(0.0, now - last)
online = age <= float(EMPFANG_PRESENCE_TTL_SECONDS)
return {
"presence_online": online,
"presence_last_seen": _presence_iso_utc(last) if last > 0 else None,
"presence_source": str(rec.get("source") or ""),
"presence_age_seconds": int(age),
}
2026-05-05 23:36:13 +02:00
def _norm_name(s: str) -> str:
2026-05-06 22:43:22 +02:00
"""Vergleichts-String fuer Namen: lower, trim, Akzente ueber NFKD entfernen."""
t = (s or "").strip().lower()
t = unicodedata.normalize("NFKD", t)
return "".join(ch for ch in t if unicodedata.combining(ch) == "")
2026-05-05 23:36:13 +02:00
2026-05-08 14:53:53 +02:00
def _accounts_by_practice(pid: str) -> dict[str, dict]:
accounts = _load_accounts()
return {a["user_id"]: a for a in accounts.values() if a.get("practice_id") == pid}
def _resolve_user_uid_in_practice(pid: str, hint: str) -> str:
"""Loesst Kurz-ID oder display_name innerhalb einer Praxis zu user_id auf."""
hint = (hint or "").strip()
if not hint or not pid:
return ""
by_uid = _accounts_by_practice(pid)
if hint in by_uid:
return hint
hn = _norm_name(hint)
best = ""
for uid, a in by_uid.items():
if _norm_name(a.get("display_name") or "") == hn:
return uid
return best
def _resolve_user_uid_in_practice_loose(pid: str, hint: str) -> str:
"""Eindeutige Teilstring-Zuordnung display_name ↔ user_id innerhalb einer Praxis.
Nur wenn genau ein Konto matched; sonst leer keine automatische Fusion.
Mindestlaenge fuer Hint, um kurze Artefakte wie ``test`` zu vermeiden.
"""
hint = (hint or "").strip()
if not hint or not pid:
return ""
ex = _resolve_user_uid_in_practice(pid, hint)
if ex:
return ex
hn = _norm_name(hint)
if len(hn) < 5:
return ""
cand: list[str] = []
for uid, a in _accounts_by_practice(pid).items():
dn = _norm_name(a.get("display_name") or "")
if not dn:
continue
if hn in dn or dn in hn:
cand.append(uid)
if len(cand) != 1:
return ""
return cand[0]
def _resolve_user_uid_unique_last_token(pid: str, full_name_core: str) -> str:
"""Wenn nach normiertem Nachnamen nur genau ein Konto in der Praxis matched."""
hn = _norm_name(full_name_core)
parts = [p for p in hn.split() if p]
if len(parts) < 2:
return ""
last = parts[-1]
if len(last) < 4:
return ""
cand: list[str] = []
for uid, a in _accounts_by_practice(pid).items():
dn = _norm_name(a.get("display_name") or "")
dp = [p for p in dn.split() if p]
if not dp:
continue
if dp[-1] == last:
cand.append(uid)
if len(cand) != 1:
return ""
return cand[0]
def _extras_indicates_broadcast(ex: dict) -> bool:
if not isinstance(ex, dict):
return False
if ex.get("rcpt_broadcast") in (True, "true", "1", 1):
return True
aud = str(ex.get("audience") or "").strip().lower()
if aud in ("all", "everyone", "broadcast", "general"):
return True
return False
def _thread_id_stable(m: dict) -> str:
return str(m.get("thread_id") or m.get("id") or "").strip()
def _thread_requires_broadcast_exclusion(
messages: list[dict],
pid: str,
tid: str,
) -> bool:
"""Threads mit klarem (Nicht-)Broadcast-Adressaten nicht unter «An alle» listen."""
if not tid or not pid:
return False
acc_map = _accounts_by_practice(pid)
for m in messages:
if _thread_id_stable(m) != tid:
continue
if _normalized_group_key_from_message(m):
continue
ex = m.get("extras") or {}
if _extras_indicates_broadcast(ex):
continue
su = str(ex.get("sender_user_id") or "").strip()
ru = str(ex.get("recipient_user_id") or "").strip()
if su in acc_map and ru in acc_map and su != ru:
return True
rcpt_raw = (ex.get("recipient") or "").strip()
rl = rcpt_raw.lower()
if rcpt_raw and rl not in ("alle", "all", "allgemein", "an alle"):
return True
dk = str(ex.get("direct_conv_key") or "").strip()
if dk and "|direct|" in dk:
return True
return False
def _uid_pair_from_message_for_practice(m: dict, pid: str) -> tuple[str, str]:
"""Liefert zwei user_ids wenn aus extras + Stammdaten konservativ ableitbar."""
acc_map = _accounts_by_practice(pid)
ex = m.get("extras") or {}
su_ex = str(ex.get("sender_user_id") or "").strip()
ru_ex = str(ex.get("recipient_user_id") or "").strip()
su = su_ex if su_ex in acc_map else ""
ru = ru_ex if ru_ex in acc_map else ""
core_s = _sender_core(m.get("absender", ""))
r_s = (ex.get("recipient") or "").strip()
rl = r_s.lower()
if rl in ("alle", "all", "allgemein", "an alle"):
return ("", "")
if _extras_indicates_broadcast(ex):
return ("", "")
rs = _resolve_user_uid_in_practice(pid, core_s)
rr = _resolve_user_uid_in_practice(pid, r_s) if r_s else ""
if not rs:
rs = _resolve_user_uid_in_practice_loose(pid, core_s)
if not rs:
rs = _resolve_user_uid_unique_last_token(pid, core_s)
if not rr and r_s:
rr = _resolve_user_uid_in_practice_loose(pid, r_s)
if not rr and r_s:
rr = _resolve_user_uid_unique_last_token(pid, r_s)
if not su and rs:
su = rs
if not ru and rr:
ru = rr
if _normalized_group_key_from_message(m):
return ("", "")
if su and ru and su != ru:
return (su, ru)
return ("", "")
def _direct_conv_key(pid: str, uid_a: str, uid_b: str) -> str:
ua, ub = sorted([uid_a, uid_b])
return f"{pid}|direct|{ua}|{ub}"
2026-05-05 23:36:13 +02:00
def _sender_core(absender: str) -> str:
"""Aus 'Vorname Nachname (HOST)' -> 'Vorname Nachname'."""
s = (absender or "").split("(")[0].strip()
return s
2026-05-08 14:53:53 +02:00
def _enrich_outgoing_direct_extras(pid: str, absender: str, extras: dict,
session_uid: str) -> dict:
"""DM: sender_user_id, recipient_user_id, direct_conv_key (stabil)."""
ex = dict(extras or {})
recipient_raw = (ex.get("recipient") or "").strip()
rlist = ex.get("recipients")
is_multi = (
isinstance(rlist, list) and len(rlist) >= 2
) or ("," in recipient_raw and recipient_raw.count(",") >= 1)
broadcast_rcpt = not recipient_raw or recipient_raw.lower() in ("alle", "all", "allgemein")
if broadcast_rcpt or is_multi:
return ex
by_uid = _accounts_by_practice(pid)
core = _sender_core(absender)
sender_uid = (session_uid or "").strip()
if not sender_uid:
su_claim = str(ex.get("sender_user_id") or "").strip()
resolved = _resolve_user_uid_in_practice(pid, core)
if su_claim and su_claim in by_uid and su_claim == resolved:
sender_uid = su_claim
elif su_claim and su_claim in by_uid and not resolved:
sender_uid = su_claim
else:
sender_uid = resolved
else:
if sender_uid not in by_uid:
fb = _resolve_user_uid_in_practice(pid, core)
if fb:
sender_uid = fb
if sender_uid:
ex["sender_user_id"] = sender_uid
recipient_uid = _resolve_user_uid_in_practice(pid, recipient_raw)
ru_claim = str(ex.get("recipient_user_id") or "").strip()
if ru_claim and ru_claim in by_uid:
if not recipient_uid or ru_claim == recipient_uid:
recipient_uid = ru_claim
if recipient_uid:
ex["recipient_user_id"] = recipient_uid
if sender_uid and recipient_uid:
ex["direct_conv_key"] = _direct_conv_key(pid, sender_uid, recipient_uid)
# Explizites DM-Tagging: garantiert, dass diese Nachricht nicht als
# Allgemein-/Broadcast-Inbox-Treffer ausgewertet wird.
if recipient_raw and not is_multi:
ex["audience"] = "direct"
ex["rcpt_broadcast"] = False
return ex
2026-05-05 23:36:13 +02:00
def _msg_recipient(m: dict) -> str:
extras = m.get("extras") or {}
return (extras.get("recipient") or "").strip()
def _normalized_group_key_from_extras(extras: dict) -> str:
"""Canonical key 'name|name|...' lowercase for multi-recipient threads."""
if not isinstance(extras, dict):
return ""
rlist = extras.get("recipients")
if isinstance(rlist, list) and len(rlist) >= 2:
parts = sorted({_norm_name(str(x)) for x in rlist if str(x).strip()})
return "|".join(parts) if parts else ""
rcpt = (extras.get("recipient") or "").strip()
if "," in rcpt:
parts = sorted({_norm_name(p) for p in rcpt.split(",") if p.strip()})
if len(parts) >= 2:
return "|".join(parts)
return ""
def _normalized_group_key_from_message(m: dict) -> str:
return _normalized_group_key_from_extras(m.get("extras") or {})
2026-05-08 14:53:53 +02:00
def _dm_message_matches_pair(m: dict, me_n: str, peer_n: str) -> bool:
"""True, wenn Nachricht zum 1:1-Paar (mit display_name-normalisierten Kernen) gehoert."""
if _normalized_group_key_from_message(m):
return False
sender_n = _norm_name(_sender_core(m.get("absender", "")))
rcpt_n = _norm_name(_msg_recipient(m))
ex = m.get("extras") or {}
has_reply = bool(str(ex.get("reply_to") or "").strip())
if me_n and peer_n:
if sender_n and sender_n not in (me_n, peer_n):
return False
if rcpt_n in ("", "alle"):
# Leerer Empfaenger: nur echte Thread-Antwort (kein Rundschreiben ohne Adresse).
return has_reply and bool(sender_n) and sender_n in (me_n, peer_n)
if rcpt_n not in (me_n, peer_n):
return False
return (
(sender_n == me_n and rcpt_n == peer_n)
or (sender_n == peer_n and rcpt_n == me_n)
)
# Wenig 'me' vom Client: sehr konservativ, keine Rundmails ohne reply_to reinziehen.
if not peer_n:
return False
if rcpt_n in ("", "alle"):
return has_reply and sender_n == peer_n
if sender_n == peer_n and rcpt_n and rcpt_n not in ("alle",) and rcpt_n != peer_n:
return True
if rcpt_n == peer_n and sender_n and sender_n != peer_n:
return True
return False
def _msg_by_id_index(messages: list[dict]) -> dict[str, dict]:
return {str(m.get("id")): m for m in messages if m.get("id")}
def _thread_root_msg(m: dict, by_id: dict[str, dict]) -> Optional[dict]:
cur: Optional[dict] = m
steps = 0
while cur is not None and steps < 500:
steps += 1
rto = str((cur.get("extras") or {}).get("reply_to") or "").strip()
if not rto:
return cur
nxt = by_id.get(rto)
if nxt is None:
return cur
cur = nxt
return cur
def _root_is_broadcast_inbox(root: Optional[dict], pid: str = "") -> bool:
if not root:
return False
if _normalized_group_key_from_message(root):
return False
pid = (pid or "").strip()
exroot = root.get("extras") or {}
if pid and isinstance(exroot, dict) and not _extras_indicates_broadcast(exroot):
acc_map = _accounts_by_practice(pid)
su = str(exroot.get("sender_user_id") or "").strip()
ru = str(exroot.get("recipient_user_id") or "").strip()
if su in acc_map and ru in acc_map and su != ru:
return False
rcpt_n = _norm_name(_msg_recipient(root))
return rcpt_n in ("", "alle")
def _dm_extras_uid_symmetric_match(
ex: dict, me_uid: str, peer_uid: str, acc_map: dict[str, dict]
) -> bool:
su = str(ex.get("sender_user_id") or "").strip()
ru = str(ex.get("recipient_user_id") or "").strip()
if su not in acc_map or ru not in acc_map or su == ru:
return False
return {su, ru} == {me_uid, peer_uid}
def _dm_uid_pair_matches_message(m: dict, pid: str, me_uid: str, peer_uid: str) -> bool:
"""True, wenn aus Absender-/Empfaenger-/extras eindeutig dasselbe 1:1-Paar wird."""
if not me_uid or not peer_uid:
return False
a, b = _uid_pair_from_message_for_practice(m, pid)
return bool(a and b and {a, b} == {me_uid, peer_uid})
def _conversation_dm_by_key_or_names(
messages: list[dict],
pid: str,
me_uid: str,
peer_uid: str,
me_display: str,
peer_display_fallback: str,
) -> list[dict]:
acc_map = _accounts_by_practice(pid)
peer_dn = peer_display_fallback
if peer_uid and peer_uid in acc_map:
peer_dn = (acc_map[peer_uid].get("display_name") or "").strip() or peer_dn
me_dn = me_display
if me_uid and me_uid in acc_map:
me_dn = (acc_map[me_uid].get("display_name") or "").strip() or me_dn
me_n = _norm_name(me_dn)
peer_n = _norm_name(peer_dn)
key_need = ""
if me_uid and peer_uid:
key_need = _direct_conv_key(pid, me_uid, peer_uid)
out: list[dict] = []
for m in messages:
if _normalized_group_key_from_message(m):
continue
ex = m.get("extras") or {}
rcpt_raw_l = (ex.get("recipient") or "").strip().lower()
if rcpt_raw_l in ("alle", "all", "allgemein", "an alle"):
continue
if _extras_indicates_broadcast(ex):
continue
matched = False
if key_need:
km = str(ex.get("direct_conv_key") or "").strip()
if km == key_need:
matched = True
elif _dm_extras_uid_symmetric_match(ex, me_uid, peer_uid, acc_map):
matched = True
elif km and km != key_need:
# Alter/falscher Key: nur bei Nachweis desselben Teilnehmerpaares
# oder konservativem Legacy-Namenmatch zulassen (nicht fremdes DM).
if _dm_uid_pair_matches_message(m, pid, me_uid, peer_uid):
matched = True
elif me_n and peer_n and _dm_message_matches_pair(m, me_n, peer_n):
matched = True
else:
continue
if not matched and me_uid and peer_uid:
if _dm_uid_pair_matches_message(m, pid, me_uid, peer_uid):
matched = True
if not matched and me_n and peer_n and _dm_message_matches_pair(m, me_n, peer_n):
matched = True
if matched:
out.append(m)
return out
def _conversation_for_audience(
messages: list[dict],
practice_id_scope: str,
me_display: str,
audience: str,
me_user_id: str = "",
peer_user_id: str = "",
) -> list[dict]:
2026-05-05 23:36:13 +02:00
"""
Audience-Modell:
2026-05-08 14:53:53 +02:00
- Optional me_user_id + peer_user_id: stabiler Direktchat (direct_conv_key).
- Sonst Fallback ueber normierte display_name-Paare.
2026-05-05 23:36:13 +02:00
"""
aud_raw = (audience or "").strip()
aud_lower = aud_raw.lower()
2026-05-08 14:53:53 +02:00
if aud_lower in ("__noop__", "__multi__"):
return []
2026-05-05 23:36:13 +02:00
is_broadcast = aud_lower in ("", "alle", "all", "allgemein")
2026-05-08 14:53:53 +02:00
pid = (practice_id_scope or "").strip()
2026-05-05 23:36:13 +02:00
def _tid(msg: dict) -> str:
return str(msg.get("thread_id") or msg.get("id") or "")
out: list[dict] = []
if is_broadcast:
2026-05-08 14:53:53 +02:00
by_id = _msg_by_id_index(messages)
2026-05-05 23:36:13 +02:00
for m in messages:
if _normalized_group_key_from_message(m):
continue
rcpt_n = _norm_name(_msg_recipient(m))
2026-05-08 14:53:53 +02:00
if rcpt_n not in ("", "alle"):
continue
root = _thread_root_msg(m, by_id)
if not _root_is_broadcast_inbox(root, pid):
continue
tid = _tid(m)
if _thread_requires_broadcast_exclusion(messages, pid, tid):
continue
out.append(m)
2026-05-05 23:36:13 +02:00
out.sort(key=lambda x: (x.get("empfangen") or x.get("zeitstempel") or ""))
return out
# --- Gruppen-Chat ---
if aud_lower.startswith("group|"):
target_key = aud_lower[len("group|"):].strip()
2026-05-08 14:53:53 +02:00
participants = {_norm_name(p) for p in target_key.split("|") if p.strip()}
2026-05-05 23:36:13 +02:00
thread_ids: set[str] = set()
for m in messages:
gk = _normalized_group_key_from_message(m)
if gk == target_key:
thread_ids.add(_tid(m))
for m in messages:
2026-05-08 14:53:53 +02:00
if _tid(m) not in thread_ids:
continue
gk2 = _normalized_group_key_from_message(m)
if gk2 == target_key:
2026-05-05 23:36:13 +02:00
out.append(m)
2026-05-08 14:53:53 +02:00
continue
if not gk2:
sn = _norm_name(_sender_core(m.get("absender", "")))
if sn and sn in participants:
out.append(m)
2026-05-05 23:36:13 +02:00
out.sort(key=lambda x: (x.get("empfangen") or x.get("zeitstempel") or ""))
return out
2026-05-08 14:53:53 +02:00
# --- 1:1 Direktverlauf ---
mu = (me_user_id or "").strip() or ""
pu = (peer_user_id or "").strip() or _resolve_user_uid_in_practice(pid, aud_raw)
dm_list = _conversation_dm_by_key_or_names(
messages,
pid,
mu,
pu,
me_display,
aud_raw,
)
out.extend(dm_list)
2026-05-05 23:36:13 +02:00
out.sort(key=lambda x: (x.get("empfangen") or x.get("zeitstempel") or ""))
return out
@router.get("/conversation")
async def empfang_conversation(
request: Request,
audience: str = Query(""),
me: str = Query(""),
2026-05-08 14:53:53 +02:00
me_user_id: str = Query(""),
peer_user_id: str = Query(""),
2026-05-05 23:36:13 +02:00
practice_id: Optional[str] = Query(None),
):
"""Liefert den vollstaendigen, serverseitig gefilterten Verlauf.
Eine Quelle fuer Browser, Hülle und Desktop-Dialog "An Empfang senden".
2026-05-08 14:53:53 +02:00
Optional: me_user_id + peer_user_id fuer stabilen Direktchat (gleiche Logik ueberall).
2026-05-05 23:36:13 +02:00
"""
pid = (practice_id or "").strip() or _resolve_practice_id(request)
if not pid:
2026-05-08 14:53:53 +02:00
return JSONResponse(
content={"success": True, "messages": [], "tick": 0},
headers={"Cache-Control": "no-store, no-cache, must-revalidate"},
)
2026-05-05 23:36:13 +02:00
s = _session_from_request(request)
me_eff = (me or "").strip() or (s.get("display_name") if s else "")
2026-05-08 14:53:53 +02:00
me_uid_eff = (me_user_id or "").strip() or (str(s.get("user_id") or "").strip() if s else "")
if not me_uid_eff and me_eff:
# Fallback: aus dem Anzeigename in der Praxis aufloesen, damit
# Desktop ohne Browser-Session denselben direct_conv_key trifft.
me_uid_eff = _resolve_user_uid_in_practice(pid, me_eff)
peer_uid_eff = (peer_user_id or "").strip()
if not peer_uid_eff:
aud_raw = (audience or "").strip()
aud_lower = aud_raw.lower()
if aud_raw and aud_lower not in ("", "alle", "all", "allgemein", "__noop__", "__multi__") and not aud_lower.startswith("group|"):
peer_uid_eff = _resolve_user_uid_in_practice(pid, aud_raw)
2026-05-05 23:36:13 +02:00
messages = _filter_by_practice(_load_messages(), pid)
2026-05-08 14:53:53 +02:00
conv = _conversation_for_audience(
messages,
pid,
me_eff,
audience,
me_uid_eff,
peer_uid_eff,
)
2026-05-05 23:36:13 +02:00
pulse = _pulse_get(pid)
2026-05-08 14:53:53 +02:00
_log.info(
"empfang_conversation load practice_id=%s me_uid=%s peer_uid=%s "
"audience_key=%s msg_count=%s tick=%s",
pid,
me_uid_eff,
peer_uid_eff,
(audience or "")[:80] if audience else "",
len(conv),
int(pulse.get("tick", 0)),
)
return JSONResponse(
content={
"success": True,
"messages": conv,
"audience": audience or "",
"me": me_eff,
"me_user_id": me_uid_eff,
"peer_user_id_used": peer_uid_eff,
"tick": int(pulse.get("tick", 0)),
"ts": pulse.get("ts", 0.0),
},
headers={
"Cache-Control": "no-store, no-cache, must-revalidate",
"Pragma": "no-cache",
},
)
2026-05-05 23:36:13 +02:00
@router.get("/pulse")
async def empfang_pulse(request: Request, practice_id: Optional[str] = Query(None)):
"""Sehr leichter Endpoint fuer Live-Pulse.
Clients pollen kurz (z. B. 800 ms) und holen die Conversation nur dann
neu, wenn sich 'tick' geaendert hat. Damit erscheint das Signal sofort
und ohne 510 s Verzoegerung der alten Polling-Loop.
"""
pid = (practice_id or "").strip() or _resolve_practice_id(request)
if not pid:
2026-05-08 14:53:53 +02:00
return JSONResponse(
content={"tick": 0, "ts": 0.0},
headers={"Cache-Control": "no-store, no-cache, must-revalidate"},
)
2026-05-05 23:36:13 +02:00
p = _pulse_get(pid)
2026-05-08 14:53:53 +02:00
return JSONResponse(
content={
"tick": int(p.get("tick", 0)),
"ts": float(p.get("ts", 0.0)),
"last_sender": p.get("last_sender", ""),
},
headers={
"Cache-Control": "no-store, no-cache, must-revalidate",
"Pragma": "no-cache",
},
)
@router.post("/presence/ping")
async def empfang_presence_ping(request: Request):
"""Setzt fuer die aktuelle Praxis/User-Kombination last_seen (TTL = online).
Cookie-Session: Web / WebView. Optional: Desktop mit X-API-Token +
X-Practice-Id + X-AzA-Empfang-User-Id (gleiche Pruefung wie Shell-Erzeugung).
"""
now = time.time()
ttl = int(EMPFANG_PRESENCE_TTL_SECONDS)
s = _session_from_request(request)
if s:
pid = (s.get("practice_id") or "").strip()
uid = (s.get("user_id") or "").strip()
if pid and uid:
_presence_record_ping(pid, uid, "web")
return JSONResponse(
content={
"success": True,
"server_time": now,
"ttl_seconds": ttl,
"own_user_id": uid,
},
headers={"Cache-Control": "no-store, no-cache, must-revalidate"},
)
api_raw = (request.headers.get("X-API-Token") or "").strip()
if api_raw:
pid, uid = _require_shell_api_identity(request)
_presence_record_ping(pid, uid, "desktop")
return JSONResponse(
content={
"success": True,
"server_time": now,
"ttl_seconds": ttl,
"own_user_id": uid,
},
headers={"Cache-Control": "no-store, no-cache, must-revalidate"},
)
raise HTTPException(status_code=401, detail="Nicht angemeldet")
# =====================================================================
# DM v2 — Direct-Only, Fail-Closed
# Eigene, isolierte API ausschliesslich fuer Personenchats:
# - kein Allgemein/Broadcast-Pfad
# - kein Namens-/Heuristik-Matching
# - Speicherung und Laden nur ueber direct_conv_key
# - Verifizierbar (msg_id + conversation_key in Antwort)
# =====================================================================
class DmSendIn(BaseModel):
practice_id: str = ""
sender_user_id: str = ""
recipient_user_id: str = ""
text: str = ""
attachments: list = Field(default_factory=list)
client_msg_id: str = ""
def _dm_v2_load_for_pair(pid: str, uid_a: str, uid_b: str) -> tuple[list[dict], str]:
"""Liefert genau die Nachrichten dieses 1:1-Schluessels, sortiert chronologisch."""
conv_key = _direct_conv_key(pid, uid_a, uid_b)
msgs = _filter_by_practice(_load_messages(), pid)
out: list[dict] = []
for m in msgs:
ex = m.get("extras") or {}
if str(ex.get("direct_conv_key") or "").strip() == conv_key:
out.append(m)
out.sort(key=lambda x: (x.get("empfangen") or x.get("zeitstempel") or ""))
return out, conv_key
@router.post("/dm/send")
async def empfang_dm_send(payload: DmSendIn, request: Request):
"""Direct-Only Senden. Fail-Closed:
- practice_id Pflicht (Body oder Session)
- sender_user_id Pflicht (Body oder Session)
- recipient_user_id Pflicht
- sender != recipient
- beide Konten muessen zur Praxis gehoeren
- Kein Fallback auf Allgemein. Kein audience=all.
"""
s = _session_from_request(request)
pid = (payload.practice_id or "").strip() or _resolve_practice_id(request)
if not pid:
raise HTTPException(status_code=400, detail="practice_id erforderlich")
sender_uid = (payload.sender_user_id or "").strip()
if not sender_uid and s:
sender_uid = str(s.get("user_id") or "").strip()
recipient_uid = (payload.recipient_user_id or "").strip()
text = (payload.text or "").strip()
if not sender_uid:
raise HTTPException(status_code=400, detail="sender_user_id erforderlich")
if not recipient_uid:
raise HTTPException(status_code=400, detail="recipient_user_id erforderlich")
if sender_uid == recipient_uid:
raise HTTPException(status_code=400, detail="Selbstchat nicht erlaubt")
acc_map = _accounts_by_practice(pid)
if sender_uid not in acc_map:
raise HTTPException(status_code=400, detail="sender_user_id gehoert nicht zu dieser Praxis")
if recipient_uid not in acc_map:
raise HTTPException(status_code=400, detail="recipient_user_id gehoert nicht zu dieser Praxis")
has_attachments = bool(payload.attachments)
if not text and not has_attachments:
raise HTTPException(status_code=400, detail="Leere Nachricht ohne Anhang nicht erlaubt")
sender_dn = (acc_map[sender_uid].get("display_name") or "").strip()
recipient_dn = (acc_map[recipient_uid].get("display_name") or "").strip()
conv_key = _direct_conv_key(pid, sender_uid, recipient_uid)
msg_id = uuid.uuid4().hex[:12]
now = time.strftime("%Y-%m-%d %H:%M:%S")
extras = {
"audience": "direct",
"rcpt_broadcast": False,
"recipient": recipient_dn,
"recipient_user_id": recipient_uid,
"sender_user_id": sender_uid,
"direct_conv_key": conv_key,
"dm_v2": True,
}
if payload.client_msg_id:
extras["client_msg_id"] = str(payload.client_msg_id)[:64]
if has_attachments:
extras["attachments"] = payload.attachments
entry = {
"id": msg_id,
"thread_id": msg_id,
"practice_id": pid,
"medikamente": "",
"therapieplan": "",
"procedere": "",
"kommentar": text or ("\u200b" if has_attachments else ""),
"patient": "Direkt: " + recipient_dn,
"absender": sender_dn + " (Empfang)",
"zeitstempel": now,
"empfangen": now,
"status": "offen",
"user_id": sender_uid,
"extras": extras,
}
messages = _load_messages()
messages.insert(0, entry)
_save_messages(messages)
try:
_pulse_bump(pid, sender=sender_dn)
except Exception:
pass
_log.info(
"AZA_CHAT_SEND mode=direct practice=%s sender=%s recipient=%s conv=%s msg=%s",
pid, sender_uid, recipient_uid, conv_key, msg_id,
)
return JSONResponse(
content={
"success": True,
"ok": True,
"mode": "direct",
"message_id": msg_id,
"thread_id": msg_id,
"practice_id": pid,
"sender_user_id": sender_uid,
"recipient_user_id": recipient_uid,
"conversation_key": conv_key,
"created_at": now,
},
headers={
"Cache-Control": "no-store, no-cache, must-revalidate",
"Pragma": "no-cache",
},
)
@router.get("/dm/conversation")
async def empfang_dm_conversation(
request: Request,
sender_user_id: str = Query(""),
recipient_user_id: str = Query(""),
practice_id: Optional[str] = Query(None),
):
"""Liefert ausschliesslich Direct-Nachrichten dieses 1:1-Paares (per direct_conv_key).
Keine Heuristik, kein Broadcast-Fallback.
"""
s = _session_from_request(request)
pid = (practice_id or "").strip() or _resolve_practice_id(request)
if not pid:
raise HTTPException(status_code=400, detail="practice_id erforderlich")
me_uid = (sender_user_id or "").strip()
if not me_uid and s:
me_uid = str(s.get("user_id") or "").strip()
peer_uid = (recipient_user_id or "").strip()
if not me_uid:
raise HTTPException(status_code=400, detail="sender_user_id erforderlich")
if not peer_uid:
raise HTTPException(status_code=400, detail="recipient_user_id erforderlich")
if me_uid == peer_uid:
raise HTTPException(status_code=400, detail="Selbstchat nicht erlaubt")
acc_map = _accounts_by_practice(pid)
if me_uid not in acc_map:
raise HTTPException(status_code=400, detail="sender_user_id gehoert nicht zu dieser Praxis")
if peer_uid not in acc_map:
raise HTTPException(status_code=400, detail="recipient_user_id gehoert nicht zu dieser Praxis")
msgs, conv_key = _dm_v2_load_for_pair(pid, me_uid, peer_uid)
pulse = _pulse_get(pid)
_log.info(
"AZA_CHAT_LOAD mode=direct practice=%s me=%s peer=%s conv=%s count=%s",
pid, me_uid, peer_uid, conv_key, len(msgs),
)
return JSONResponse(
content={
"success": True,
"ok": True,
"mode": "direct",
"practice_id": pid,
"sender_user_id": me_uid,
"recipient_user_id": peer_uid,
"conversation_key": conv_key,
"messages": msgs,
"count": len(msgs),
"tick": int(pulse.get("tick", 0)),
"ts": pulse.get("ts", 0.0),
},
headers={
"Cache-Control": "no-store, no-cache, must-revalidate",
"Pragma": "no-cache",
},
)
2026-05-05 23:36:13 +02:00
2026-04-19 20:41:37 +02:00
@router.get("/thread/{thread_id}")
async def empfang_thread(thread_id: str, request: Request,
practice_id: Optional[str] = Query(None)):
2026-04-21 10:00:36 +02:00
pid = (practice_id or "").strip() or _resolve_practice_id(request)
if not pid:
return JSONResponse(content={"success": True, "messages": []})
2026-04-19 20:41:37 +02:00
messages = _load_messages()
thread = [m for m in messages
if m.get("thread_id") == thread_id and _msg_practice(m) == pid]
2026-04-19 20:41:37 +02:00
thread.sort(key=lambda m: m.get("empfangen", ""))
return JSONResponse(content={"success": True, "messages": thread})
2026-04-16 15:23:14 +02:00
@router.post("/messages/{msg_id}/done")
async def empfang_done(msg_id: str):
messages = _load_messages()
2026-04-19 20:41:37 +02:00
target = next((m for m in messages if m.get("id") == msg_id), None)
if not target:
raise HTTPException(status_code=404, detail="Nachricht nicht gefunden")
tid = target.get("thread_id", msg_id)
pid = _msg_practice(target)
2026-04-16 15:23:14 +02:00
for m in messages:
if m.get("thread_id") == tid and _msg_practice(m) == pid:
2026-04-16 15:23:14 +02:00
m["status"] = "erledigt"
2026-04-19 20:41:37 +02:00
_save_messages(messages)
2026-05-05 23:36:13 +02:00
try:
_pulse_bump(pid, sender="")
except Exception:
pass
2026-04-19 20:41:37 +02:00
return JSONResponse(content={"success": True})
2026-04-16 15:23:14 +02:00
@router.delete("/messages/{msg_id}")
async def empfang_delete(msg_id: str):
messages = _load_messages()
2026-04-19 20:41:37 +02:00
target = next((m for m in messages if m.get("id") == msg_id), None)
if not target:
2026-04-16 15:23:14 +02:00
raise HTTPException(status_code=404, detail="Nachricht nicht gefunden")
2026-04-19 20:41:37 +02:00
tid = target.get("thread_id", msg_id)
pid = _msg_practice(target)
2026-04-19 20:41:37 +02:00
if tid == msg_id:
new = [m for m in messages
if not (m.get("thread_id", m.get("id")) == msg_id and _msg_practice(m) == pid)
and not (m.get("id") == msg_id)]
2026-04-19 20:41:37 +02:00
else:
new = [m for m in messages if m.get("id") != msg_id]
2026-04-16 15:23:14 +02:00
_save_messages(new)
2026-05-05 23:36:13 +02:00
try:
_pulse_bump(pid, sender="")
except Exception:
pass
2026-04-16 15:23:14 +02:00
return JSONResponse(content={"success": True})
# =====================================================================
# TASKS (practice-scoped, server-side)
# =====================================================================
2026-04-19 20:41:37 +02:00
@router.get("/tasks")
async def empfang_tasks_list(request: Request):
2026-04-21 10:00:36 +02:00
pid = _resolve_practice_id(request)
if not pid:
return JSONResponse(content={"success": True, "tasks": []})
tasks = _load_tasks()
2026-04-21 10:00:36 +02:00
filtered = [t for t in tasks if t.get("practice_id") == pid]
return JSONResponse(content={"success": True, "tasks": filtered})
2026-04-19 20:41:37 +02:00
@router.post("/tasks")
async def empfang_tasks_create(request: Request):
2026-04-21 10:00:36 +02:00
pid = _require_practice_id(request)
2026-05-06 22:43:22 +02:00
s = _session_from_request(request)
2026-04-19 20:41:37 +02:00
try:
body = await request.json()
2026-04-19 20:41:37 +02:00
except Exception:
body = {}
text = (body.get("text") or "").strip()
if not text:
raise HTTPException(status_code=400, detail="Text erforderlich")
2026-05-06 22:43:22 +02:00
title_opt = (body.get("title") or "").strip()
meta_opt = (body.get("source_meta") or "").strip()
peer_opt = (body.get("source_peer") or "").strip()
stid_opt = (body.get("source_thread_id") or "").strip()
task = {
"task_id": uuid.uuid4().hex[:12],
"practice_id": pid,
"text": text,
2026-05-06 22:43:22 +02:00
"title": title_opt or "",
"source_meta": meta_opt or "",
"source_peer": peer_opt or "",
"source_thread_id": stid_opt or "",
"done": False,
"assignee": (body.get("assignee") or "").strip(),
2026-05-06 22:43:22 +02:00
"created_by": (s or {}).get("display_name", "") if s else "",
"created": time.strftime("%Y-%m-%d %H:%M:%S"),
2026-05-06 22:43:22 +02:00
"source_msg_id": (body.get("source_msg_id") or "").strip(),
}
tasks = _load_tasks()
tasks.insert(0, task)
_save_tasks(tasks)
return JSONResponse(content={"success": True, "task": task})
2026-04-19 20:41:37 +02:00
@router.post("/tasks/{task_id}/update")
async def empfang_tasks_update(task_id: str, request: Request):
2026-04-19 20:41:37 +02:00
try:
body = await request.json()
except Exception:
body = {}
tasks = _load_tasks()
target = next((t for t in tasks if t.get("task_id") == task_id), None)
if not target:
raise HTTPException(status_code=404, detail="Aufgabe nicht gefunden")
if "done" in body:
target["done"] = bool(body["done"])
if "text" in body:
target["text"] = (body["text"] or "").strip() or target["text"]
2026-05-06 22:43:22 +02:00
if "title" in body:
target["title"] = (body.get("title") or "").strip()
if "assignee" in body:
2026-05-06 22:43:22 +02:00
target["assignee"] = (body.get("assignee") or "").strip()
if "source_meta" in body:
target["source_meta"] = (body.get("source_meta") or "").strip()
_save_tasks(tasks)
return JSONResponse(content={"success": True, "task": target})
@router.delete("/tasks/{task_id}")
async def empfang_tasks_delete(task_id: str):
tasks = _load_tasks()
tasks = [t for t in tasks if t.get("task_id") != task_id]
_save_tasks(tasks)
return JSONResponse(content={"success": True})
2026-04-19 20:41:37 +02:00
# =====================================================================
# CHANNEL ENDPOINTS (Kanaele)
# =====================================================================
@router.get("/channels")
async def channels_list(request: Request):
"""Kanaele anzeigen, gefiltert nach Rolle des Benutzers."""
s = _require_session(request)
pid = s["practice_id"]
role = s.get("role", "mpa")
_ensure_default_channels(pid)
channels = _load_channels()
visible = []
for c in channels:
if c.get("practice_id") != pid:
continue
allowed = c.get("allowed_roles", [])
if not allowed or role in allowed:
visible.append(c)
return JSONResponse(content={"success": True, "channels": visible})
@router.post("/channels")
async def channels_create(request: Request):
"""Neuen Kanal erstellen (nur Admin)."""
s = _require_admin(request)
try:
body = await request.json()
except Exception:
body = {}
name = (body.get("name") or "").strip()
if not name:
raise HTTPException(status_code=400, detail="Kanalname erforderlich")
scope = body.get("scope", "internal")
if scope not in ("internal", "external"):
scope = "internal"
channel_type = body.get("channel_type", "group")
if channel_type not in ("group", "direct", "external"):
channel_type = "group"
allowed_roles = body.get("allowed_roles", [])
if not isinstance(allowed_roles, list):
allowed_roles = []
channel = {
"channel_id": uuid.uuid4().hex[:12],
"practice_id": s["practice_id"],
"name": name,
"scope": scope,
"channel_type": channel_type,
"allowed_roles": allowed_roles,
"connection_id": body.get("connection_id", ""),
"created": time.strftime("%Y-%m-%d %H:%M:%S"),
"created_by": s["user_id"],
}
channels = _load_channels()
channels.append(channel)
_save_channels(channels)
return JSONResponse(content={"success": True, "channel": channel})
@router.post("/channels/{channel_id}/update")
async def channels_update(channel_id: str, request: Request):
"""Kanal aktualisieren (nur Admin)."""
s = _require_admin(request)
try:
body = await request.json()
except Exception:
body = {}
channels = _load_channels()
target = None
for c in channels:
if c.get("channel_id") == channel_id and c.get("practice_id") == s["practice_id"]:
target = c
break
if not target:
raise HTTPException(status_code=404, detail="Kanal nicht gefunden")
if "name" in body:
new_name = (body["name"] or "").strip()
if new_name:
target["name"] = new_name
if "allowed_roles" in body:
ar = body["allowed_roles"]
if isinstance(ar, list):
target["allowed_roles"] = ar
_save_channels(channels)
return JSONResponse(content={"success": True, "channel": target})
@router.delete("/channels/{channel_id}")
async def channels_delete(channel_id: str, request: Request):
"""Kanal loeschen (nur Admin, keine Default-Kanaele)."""
s = _require_admin(request)
channels = _load_channels()
target = None
for c in channels:
if c.get("channel_id") == channel_id and c.get("practice_id") == s["practice_id"]:
target = c
break
if not target:
raise HTTPException(status_code=404, detail="Kanal nicht gefunden")
default_names = {d["name"] for d in _DEFAULT_CHANNEL_DEFS}
if target.get("name") in default_names and target.get("scope") == "internal":
raise HTTPException(status_code=400,
detail="Standard-Kanaele koennen nicht geloescht werden")
channels = [c for c in channels if c.get("channel_id") != channel_id]
_save_channels(channels)
return JSONResponse(content={"success": True, "deleted": channel_id})
# =====================================================================
# FEDERATION ENDPOINTS (Praxis-zu-Praxis-Verbindungen)
# =====================================================================
@router.post("/federation/invite")
async def federation_invite(request: Request):
"""Einladung zur Praxis-Verbindung erstellen."""
s = _require_admin(request)
try:
body = await request.json()
except Exception:
body = {}
pid = s["practice_id"]
practices = _load_practices()
practice_name = practices.get(pid, {}).get("name", "Unbekannte Praxis")
conn = {
"connection_id": uuid.uuid4().hex[:12],
"practice_a_id": pid,
"practice_b_id": "",
"status": "pending",
"invite_token": secrets.token_urlsafe(24),
"created_by": s["user_id"],
"accepted_by": "",
"created_at": time.strftime("%Y-%m-%d %H:%M:%S"),
"accepted_at": "",
"revoked_at": "",
"practice_a_name": practice_name,
"practice_b_name": "",
"message": (body.get("message") or "").strip(),
}
conns = _load_connections()
conns.append(conn)
_save_connections(conns)
return JSONResponse(content={
"success": True,
"connection_id": conn["connection_id"],
"invite_token": conn["invite_token"],
})
@router.post("/federation/accept")
async def federation_accept(request: Request):
"""Verbindungseinladung annehmen."""
s = _require_admin(request)
try:
body = await request.json()
except Exception:
body = {}
invite_token = (body.get("invite_token") or "").strip()
if not invite_token:
raise HTTPException(status_code=400, detail="invite_token erforderlich")
conns = _load_connections()
target = None
for c in conns:
if c.get("invite_token") == invite_token and c.get("status") == "pending":
target = c
break
if not target:
raise HTTPException(status_code=404, detail="Einladung nicht gefunden oder bereits verwendet")
pid_b = s["practice_id"]
if target["practice_a_id"] == pid_b:
raise HTTPException(status_code=400, detail="Kann eigene Einladung nicht annehmen")
practices = _load_practices()
practice_b_name = practices.get(pid_b, {}).get("name", "Unbekannte Praxis")
now = time.strftime("%Y-%m-%d %H:%M:%S")
target["practice_b_id"] = pid_b
target["practice_b_name"] = practice_b_name
target["status"] = "active"
target["accepted_by"] = s["user_id"]
target["accepted_at"] = now
_save_connections(conns)
channel_name = f"{target['practice_a_name']} \u2194 {practice_b_name}"
conn_id = target["connection_id"]
channels = _load_channels()
for practice_id in (target["practice_a_id"], pid_b):
channels.append({
"channel_id": uuid.uuid4().hex[:12],
"practice_id": practice_id,
"name": channel_name,
"scope": "external",
"channel_type": "external",
"allowed_roles": [],
"connection_id": conn_id,
"created": now,
"created_by": s["user_id"],
})
_save_channels(channels)
return JSONResponse(content={
"success": True,
"connection_id": conn_id,
"practice_a": target["practice_a_name"],
"practice_b": practice_b_name,
})
@router.get("/federation/connections")
async def federation_connections(request: Request):
"""Alle Verbindungen der eigenen Praxis anzeigen."""
s = _require_admin(request)
pid = s["practice_id"]
conns = _load_connections()
result = [c for c in conns
if c.get("practice_a_id") == pid or c.get("practice_b_id") == pid]
return JSONResponse(content={"success": True, "connections": result})
@router.post("/federation/connections/{connection_id}/revoke")
async def federation_revoke(connection_id: str, request: Request):
"""Verbindung widerrufen / trennen."""
s = _require_admin(request)
pid = s["practice_id"]
conns = _load_connections()
target = None
for c in conns:
if c.get("connection_id") == connection_id:
if c.get("practice_a_id") == pid or c.get("practice_b_id") == pid:
target = c
break
if not target:
raise HTTPException(status_code=404, detail="Verbindung nicht gefunden")
if target["status"] == "revoked":
raise HTTPException(status_code=400, detail="Verbindung bereits widerrufen")
target["status"] = "revoked"
target["revoked_at"] = time.strftime("%Y-%m-%d %H:%M:%S")
_save_connections(conns)
return JSONResponse(content={"success": True, "connection_id": connection_id, "status": "revoked"})
@router.get("/federation/practices")
async def federation_practices(request: Request):
"""Verbundene Praxen anzeigen (fuer alle authentifizierten Benutzer)."""
s = _require_session(request)
pid = s["practice_id"]
conns = _load_connections()
result = []
for c in conns:
if c.get("status") != "active":
continue
if c.get("practice_a_id") == pid:
result.append({
"practice_id": c.get("practice_b_id"),
"practice_name": c.get("practice_b_name", ""),
"connection_id": c.get("connection_id"),
"status": c.get("status"),
})
elif c.get("practice_b_id") == pid:
result.append({
"practice_id": c.get("practice_a_id"),
"practice_name": c.get("practice_a_name", ""),
"connection_id": c.get("connection_id"),
"status": c.get("status"),
})
return JSONResponse(content={"success": True, "practices": result})
# =====================================================================
# CLEANUP + PRACTICE INFO
# =====================================================================
2026-04-19 20:41:37 +02:00
@router.post("/cleanup")
async def empfang_cleanup(request: Request):
try:
body = await request.json()
except Exception:
body = {}
max_days = int(body.get("max_age_days", 30))
2026-04-21 10:00:36 +02:00
pid = (body.get("practice_id") or "").strip() or _resolve_practice_id(request)
if not pid:
return JSONResponse(content={"success": True, "removed": 0, "remaining": 0})
cutoff = time.strftime(
"%Y-%m-%d %H:%M:%S",
time.localtime(time.time() - max_days * 86400),
)
2026-04-19 20:41:37 +02:00
messages = _load_messages()
before = len(messages)
kept = [
m for m in messages
if _msg_practice(m) != pid
or (m.get("empfangen") or m.get("zeitstempel", "")) >= cutoff
]
2026-04-19 20:41:37 +02:00
removed = before - len(kept)
if removed > 0:
_save_messages(kept)
return JSONResponse(content={
"success": True, "removed": removed, "remaining": len(kept),
})
@router.get("/practice/info")
async def empfang_practice_info(request: Request):
2026-04-21 10:00:36 +02:00
api_token = request.headers.get("X-API-Token", "")
s = _session_from_request(request)
2026-04-21 10:00:36 +02:00
pid = _resolve_practice_id(request)
if not pid:
return JSONResponse(content={"practice_id": "", "practice_name": "",
"user_count": 0, "message_count": 0, "open_count": 0})
_ensure_practice(pid)
users = _practice_users(pid)
messages = _filter_by_practice(_load_messages(), pid)
open_count = sum(1 for m in messages if m.get("status") == "offen")
practices = _load_practices()
p = practices.get(pid, {})
result = {
"practice_id": pid,
"practice_name": p.get("name", ""),
"user_count": len(users),
"message_count": len(messages),
"open_count": open_count,
}
2026-05-08 14:53:53 +02:00
role_l = str(s.get("role") or "").strip().lower() if s else ""
show_invite = bool(api_token) or role_l in ("admin", "empfang")
if show_invite:
result["invite_code"] = p.get("invite_code", "")
2026-05-08 14:53:53 +02:00
if api_token or role_l == "admin":
result["admin_email"] = p.get("admin_email", "")
return JSONResponse(content=result)
2026-04-19 20:41:37 +02:00
2026-05-08 14:53:53 +02:00
# =====================================================================
# Shell session (kurzlebiger Desktop-Web-Huelle-Bootstrap, API-validiert)
# =====================================================================
#
# Nur POST /shell/session darf mittels gueltigem MEDWORK-X-API-Token + Practice
# einen shell_token ausstellen (kein Browser-Cookie als Identitaetsgrundlage).
#
# POST /shell/consume tauscht bearer shell_token gegen normale HttpOnly-Session —
# ohne API-Token, damit spaetere Web-Huelle den Token ohne Desktop-Shared-Secret
# einloesen kann.
_SHELL_TTL_DEFAULT = 300
_SHELL_PURPOSE = "send_to_reception_shell"
_shell_store: dict[str, dict] = {}
def _shell_cleanup_expired() -> None:
now = time.time()
stale = [
tok for tok, rec in _shell_store.items()
if isinstance(rec, dict) and float(rec.get("expires_at", 0)) < now
]
for tok in stale:
try:
del _shell_store[tok]
except KeyError:
pass
def _require_shell_api_identity(request: Request) -> Tuple[str, str]:
"""Validiert X-API-Token wie aza_security; liefert (practice_id, desktop_user_id)."""
api_raw = (request.headers.get("X-API-Token") or "").strip()
if not api_raw:
raise HTTPException(status_code=401, detail="API-Token erforderlich")
try:
from aza_security import get_required_api_tokens
allowed = get_required_api_tokens()
except RuntimeError:
raise HTTPException(status_code=503, detail="API token nicht konfiguriert")
if not any(hmac.compare_digest(api_raw, t) for t in allowed):
raise HTTPException(status_code=401, detail="Unauthorized")
pid = request.headers.get("X-Practice-Id", "").strip()
if not pid:
raise HTTPException(
status_code=400,
detail="X-Practice-Id erforderlich",
)
claimed_uid = (request.headers.get("X-AzA-Empfang-User-Id") or "").strip()
if not claimed_uid:
raise HTTPException(
status_code=403,
detail=(
"X-AzA-Empfang-User-Id fehlt: Desktop muss vom Server ermittelte "
"user_id aus dem Provisionierungs-/Users-Endpunkt mitschicken."
),
)
accounts = _load_accounts()
acc = accounts.get(claimed_uid)
if not acc or (acc.get("practice_id") or "").strip() != pid:
raise HTTPException(
status_code=403,
detail="Benutzer gehoert nicht zu dieser Praxis oder user_id ungueltig",
)
if (acc.get("status") or "active") != "active":
raise HTTPException(status_code=403, detail="Benutzerkonto nicht aktiv")
return pid, claimed_uid
def _consume_shell_token_core(request: Request, shell_raw: str) -> Tuple[str, dict]:
"""POP shell_token, erstelle Serversession. Liefert (aza_session-Wert, Kennzeichen ohne Geheimnis)."""
_shell_cleanup_expired()
stok = (shell_raw or "").strip()
if not stok:
raise HTTPException(
status_code=400,
detail="shell_token fehlt",
)
rec = _shell_store.get(stok)
if not rec:
raise HTTPException(
status_code=401,
detail="Shell-Token ungueltig, verbraucht oder abgelaufen",
)
if time.time() > float(rec.get("expires_at", 0)):
try:
del _shell_store[stok]
except KeyError:
pass
raise HTTPException(status_code=401, detail="Shell-Token abgelaufen")
try:
del _shell_store[stok]
except KeyError:
raise HTTPException(
status_code=401,
detail="Shell-Token ungueltig, verbraucht oder abgelaufen",
)
uid = (rec.get("user_id") or "").strip()
pid = (rec.get("practice_id") or "").strip()
dn = (rec.get("display_name") or "").strip()
role = (rec.get("role") or "mpa").strip()
ua = request.headers.get("User-Agent") or "AzA-Shell-Client"
ip = _extract_client_ip(request)
sess_token = _create_session(
uid,
pid,
dn,
role,
device_id="",
user_agent=ua,
ip_addr=ip,
)
_log.info(
"AZA_SHELL_SESSION_CONSUMED practice=%s user=%s purpose=%s",
pid,
uid,
_SHELL_PURPOSE,
)
public = {
"user_id": uid,
"practice_id": pid,
"display_name": dn,
"role": role,
"purpose": _SHELL_PURPOSE,
}
return sess_token, public
@router.post("/shell/session")
async def empfang_shell_session_create(request: Request):
"""Erzeugt kurzlebigen shell_token (nur mit Desktop-API-Token + validierter user_id)."""
_shell_cleanup_expired()
pid, uid = _require_shell_api_identity(request)
accounts = _load_accounts()
acc = accounts.get(uid) or {}
display_name = (acc.get("display_name") or "").strip() or "Benutzer"
role = (acc.get("role") or "mpa").strip()
now = time.time()
expires_at = now + float(_SHELL_TTL_DEFAULT)
shell_token = secrets.token_urlsafe(32)
_shell_store[shell_token] = {
"practice_id": pid,
"user_id": uid,
"display_name": display_name,
"role": role,
"purpose": _SHELL_PURPOSE,
"expires_at": expires_at,
"created_at": now,
}
expires_iso = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(expires_at))
_log.info(
"AZA_SHELL_SESSION_CREATED practice=%s user=%s purpose=%s ttl=%s",
pid,
uid,
_SHELL_PURPOSE,
int(_SHELL_TTL_DEFAULT),
)
return JSONResponse(content={
"shell_token": shell_token,
"expires_at": expires_iso,
"expires_at_unix": int(expires_at),
"ttl_seconds": int(_SHELL_TTL_DEFAULT),
"practice_id": pid,
"user_id": uid,
"display_name": display_name,
"role": role,
"purpose": _SHELL_PURPOSE,
})
@router.post("/shell/consume")
async def empfang_shell_consume(request: Request):
"""Tauscht shell_token gegen aza_session (HttpOnly Cookie). Einmal-Verbrauch."""
try:
body = await request.json()
except Exception:
body = {}
shell_raw = ""
if isinstance(body, dict):
shell_raw = (body.get("shell_token") or "").strip()
if not shell_raw:
raise HTTPException(
status_code=400,
detail="shell_token im JSON Body erforderlich",
)
sess_token, pub = _consume_shell_token_core(request, shell_raw)
resp = JSONResponse(content={"success": True, **pub})
resp.set_cookie(
"aza_session",
sess_token,
httponly=True,
samesite="lax",
max_age=SESSION_MAX_AGE,
)
return resp
@router.get("/shell/launch")
async def empfang_shell_launch(
request: Request,
token: str = Query("", description="kurzlebiger Shell-Token (einmaliger Verbrauch)"),
):
"""Web-Huelle: Token per GET einloesen — setzt Cookie, Redirect ohne Token in URL.
Einmal-Verbrauch. Kein MEDWORK_API-Token hier (Browser/WebView ohne Desktop-Secret).
"""
sess_token, _pub = _consume_shell_token_core(request, token)
loc = "/empfang/?desktop_shell=1&shell_source=aza_desktop"
resp = RedirectResponse(url=loc, status_code=302)
resp.set_cookie(
"aza_session",
sess_token,
httponly=True,
samesite="lax",
max_age=SESSION_MAX_AGE,
)
return resp
# =====================================================================
# HTML PAGE
# =====================================================================
2026-04-19 20:41:37 +02:00
2026-04-22 22:33:46 +02:00
_HTML_NO_CACHE = {
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0",
"Content-Security-Policy": "frame-ancestors *;",
}
2026-05-08 14:53:53 +02:00
_LOGO_CACHE = {
"Cache-Control": "public, max-age=86400",
}
@router.get("/aza_logo.png")
async def empfang_logo_png():
"""Statisches Logo fuer Nav-Leiste (liegt neben empfang.html in web/)."""
p = Path(__file__).resolve().parent / "web" / "aza_logo.png"
if p.is_file():
return FileResponse(
path=p,
media_type="image/png",
headers=_LOGO_CACHE,
)
raise HTTPException(status_code=404, detail="aza_logo.png nicht gefunden")
2026-04-22 22:33:46 +02:00
2026-04-16 15:23:14 +02:00
@router.get("/", response_class=HTMLResponse)
async def empfang_page(request: Request):
html_path = Path(__file__).resolve().parent / "web" / "empfang.html"
if html_path.is_file():
2026-04-19 20:41:37 +02:00
return HTMLResponse(
content=html_path.read_text(encoding="utf-8"),
2026-04-22 22:33:46 +02:00
headers=_HTML_NO_CACHE,
2026-04-19 20:41:37 +02:00
)
2026-04-16 15:23:14 +02:00
return HTMLResponse(content="<h1>empfang.html nicht gefunden</h1>", status_code=404)
2026-04-22 22:33:46 +02:00
@router.get("/chatwin.html", response_class=HTMLResponse)
async def empfang_chatwin_page():
"""Kompaktes Chat-Fenster (neues Fenster / Tab), z. B. 1:1 oder Allgemein."""
html_path = Path(__file__).resolve().parent / "web" / "empfang_chat_minimal.html"
if html_path.is_file():
return HTMLResponse(
content=html_path.read_text(encoding="utf-8"),
headers=_HTML_NO_CACHE,
)
return HTMLResponse(content="<h1>chatwin nicht gefunden</h1>", status_code=404)