This commit is contained in:
2026-05-05 23:36:13 +02:00
parent 72ecd579de
commit 2363564013
10324 changed files with 1666654 additions and 180 deletions

View File

@@ -522,8 +522,36 @@ def _extract_client_ip(request: Request) -> str:
# Messages (practice-scoped)
# =====================================================================
EMPFANG_MESSAGE_RETENTION_DAYS = 14
def _msg_timestamp_for_retention(m: dict) -> str:
return (m.get("empfangen") or m.get("zeitstempel") or "").strip()
def _message_within_retention(m: dict, cutoff_str: str) -> bool:
"""Behalten wenn Zeitstempel unbekannt oder >= cutoff (ISO-artige Strings)."""
t = _msg_timestamp_for_retention(m)
if not t:
return True
return t >= cutoff_str
def _prune_messages_by_retention(messages: list[dict]) -> tuple[list[dict], int]:
cutoff_str = time.strftime(
"%Y-%m-%d %H:%M:%S",
time.localtime(time.time() - EMPFANG_MESSAGE_RETENTION_DAYS * 86400),
)
kept = [m for m in messages if _message_within_retention(m, cutoff_str)]
return kept, len(messages) - len(kept)
def _load_messages() -> list[dict]:
return _load_json(_EMPFANG_FILE, [])
messages = _load_json(_EMPFANG_FILE, [])
kept, removed = _prune_messages_by_retention(messages)
if removed > 0:
_save_messages(kept)
return kept
def _save_messages(messages: list[dict]):
@@ -929,13 +957,32 @@ async def auth_provision(request: Request):
email = (body.get("email") or "").strip()
password = (body.get("password") or "").strip()
practice_name = (body.get("practice_name") or "").strip()
invite_code_in = (body.get("invite_code") or "").strip()
if not name:
raise HTTPException(status_code=400, detail="Name erforderlich")
if not password or len(password) < 4:
raise HTTPException(status_code=400, detail="Passwort (min. 4 Zeichen) erforderlich")
pid = request.headers.get("X-Practice-Id", "").strip()
pid = pid or (body.get("practice_id") or "").strip()
pid = ""
# Bewusster Beitritt zu einem bestehenden Praxis-Chat per Einladungscode:
# ueberschreibt X-Practice-Id / practice_id im Body — sonst legt jedes neue
# Geraet ohne gespeicherte practice_id eine eigene Praxis an (Realbefund).
if invite_code_in:
practices = _load_practices()
target_pid = None
for pida, pdata in practices.items():
if (pdata.get("invite_code") or "").strip() == invite_code_in:
target_pid = pida
break
if not target_pid:
raise HTTPException(
status_code=403,
detail="Ungueltiger Chat-Einladungscode — Praxis nicht gefunden.",
)
pid = target_pid
else:
pid = request.headers.get("X-Practice-Id", "").strip()
pid = pid or (body.get("practice_id") or "").strip()
if not pid and email:
try:
@@ -1725,6 +1772,14 @@ async def empfang_send(msg: EmpfangMessage, request: Request):
messages.insert(0, entry)
_save_messages(messages)
# Live-Pulse: sofortiger Tick, damit empfangsseitige Clients das
# Signal in <1 s aufnehmen koennen, statt auf den naechsten 510 s
# Polling-Tick zu warten.
try:
_pulse_bump(pid, sender=absender)
except Exception:
pass
return JSONResponse(content={
"success": True, "id": msg_id, "thread_id": thread_id,
"practice_id": pid,
@@ -1741,6 +1796,194 @@ async def empfang_list(request: Request, practice_id: Optional[str] = Query(None
return JSONResponse(content={"success": True, "messages": filtered})
# =====================================================================
# CONVERSATION + LIVE-PULSE
# Eine einzige serverseitige Wahrheit fuer Browser, Hülle und
# "An Empfang senden". Kein Client-Filter, keine lokale Sonderwahrheit.
# =====================================================================
# In-Memory Pulse: bei jedem POST /send wird der Tick erhoeht.
# Clients koennen mit kurzen Polls (z. B. 800 ms) auf den Tick lauschen
# und nur dann die volle Conversation neu holen, wenn der Tick wechselt.
# Damit wirkt das Signal sofort, ohne traege Sekunden-Lags.
_PRACTICE_PULSE: dict[str, dict] = {}
def _pulse_bump(practice_id: str, sender: str = ""):
p = _PRACTICE_PULSE.setdefault(practice_id, {"tick": 0, "ts": 0.0, "last_sender": ""})
p["tick"] = int(p.get("tick", 0)) + 1
p["ts"] = time.time()
p["last_sender"] = sender or ""
_PRACTICE_PULSE[practice_id] = p
def _pulse_get(practice_id: str) -> dict:
p = _PRACTICE_PULSE.get(practice_id)
if not p:
# Beim ersten Abruf einen Tick aus den Daten ableiten, damit
# Clients nach Server-Restart nicht alle "neue Nachricht!" denken.
msgs = _filter_by_practice(_load_messages(), practice_id)
latest = ""
for m in msgs:
t = (m.get("empfangen") or m.get("zeitstempel") or "")
if t > latest:
latest = t
p = {"tick": 1, "ts": time.time(), "last_sender": "", "boot": latest}
_PRACTICE_PULSE[practice_id] = p
return p
def _norm_name(s: str) -> str:
return (s or "").strip().lower()
def _sender_core(absender: str) -> str:
"""Aus 'Vorname Nachname (HOST)' -> 'Vorname Nachname'."""
s = (absender or "").split("(")[0].strip()
return s
def _msg_recipient(m: dict) -> str:
extras = m.get("extras") or {}
return (extras.get("recipient") or "").strip()
def _normalized_group_key_from_extras(extras: dict) -> str:
"""Canonical key 'name|name|...' lowercase for multi-recipient threads."""
if not isinstance(extras, dict):
return ""
rlist = extras.get("recipients")
if isinstance(rlist, list) and len(rlist) >= 2:
parts = sorted({_norm_name(str(x)) for x in rlist if str(x).strip()})
return "|".join(parts) if parts else ""
rcpt = (extras.get("recipient") or "").strip()
if "," in rcpt:
parts = sorted({_norm_name(p) for p in rcpt.split(",") if p.strip()})
if len(parts) >= 2:
return "|".join(parts)
return ""
def _normalized_group_key_from_message(m: dict) -> str:
return _normalized_group_key_from_extras(m.get("extras") or {})
def _conversation_for_audience(messages: list[dict], me: str, audience: str) -> list[dict]:
"""
Audience-Modell:
- audience leer / 'alle' / 'all' / 'allgemein' -> Broadcast-Verlauf:
Nachrichten ohne expliziten Einzel-/Gruppen-Empfaenger (multi).
- audience = '<benutzername>' -> 1:1 inkl. Thread
(Antworten mit nur reply_to werden ueber thread_id einbezogen).
- audience beginnt mit 'GROUP|' + 'a|b|c' -> Gruppen-Verlauf inkl. Thread.
Sortiert aufsteigend nach Empfangszeit (chronologisch).
"""
me_n = _norm_name(me)
aud_raw = (audience or "").strip()
aud_lower = aud_raw.lower()
is_broadcast = aud_lower in ("", "alle", "all", "allgemein")
def _tid(msg: dict) -> str:
return str(msg.get("thread_id") or msg.get("id") or "")
out: list[dict] = []
if is_broadcast:
for m in messages:
if _normalized_group_key_from_message(m):
continue
rcpt_n = _norm_name(_msg_recipient(m))
if rcpt_n in ("", "alle"):
out.append(m)
out.sort(key=lambda x: (x.get("empfangen") or x.get("zeitstempel") or ""))
return out
# --- Gruppen-Chat ---
if aud_lower.startswith("group|"):
target_key = aud_lower[len("group|"):].strip()
thread_ids: set[str] = set()
for m in messages:
gk = _normalized_group_key_from_message(m)
if gk == target_key:
thread_ids.add(_tid(m))
for m in messages:
if _tid(m) in thread_ids:
out.append(m)
out.sort(key=lambda x: (x.get("empfangen") or x.get("zeitstempel") or ""))
return out
# --- 1:1 Direktverlauf (Thread-Schließe fuer reply_to-Antworten) ---
aud_n = _norm_name(aud_raw)
thread_ids: set[str] = set()
for m in messages:
if _normalized_group_key_from_message(m):
continue
tid = _tid(m)
rcpt_n = _norm_name(_msg_recipient(m))
sender_n = _norm_name(_sender_core(m.get("absender", "")))
if me_n:
if sender_n == me_n and rcpt_n == aud_n:
thread_ids.add(tid)
elif sender_n == aud_n and rcpt_n == me_n:
thread_ids.add(tid)
else:
if rcpt_n == aud_n:
thread_ids.add(tid)
for m in messages:
if _tid(m) in thread_ids:
out.append(m)
out.sort(key=lambda x: (x.get("empfangen") or x.get("zeitstempel") or ""))
return out
@router.get("/conversation")
async def empfang_conversation(
request: Request,
audience: str = Query(""),
me: str = Query(""),
practice_id: Optional[str] = Query(None),
):
"""Liefert den vollstaendigen, serverseitig gefilterten Verlauf.
Eine Quelle fuer Browser, Hülle und Desktop-Dialog "An Empfang senden".
"""
pid = (practice_id or "").strip() or _resolve_practice_id(request)
if not pid:
return JSONResponse(content={"success": True, "messages": [], "tick": 0})
s = _session_from_request(request)
me_eff = (me or "").strip() or (s.get("display_name") if s else "")
messages = _filter_by_practice(_load_messages(), pid)
conv = _conversation_for_audience(messages, me_eff, audience)
pulse = _pulse_get(pid)
return JSONResponse(content={
"success": True,
"messages": conv,
"audience": audience or "",
"me": me_eff,
"tick": int(pulse.get("tick", 0)),
"ts": pulse.get("ts", 0.0),
})
@router.get("/pulse")
async def empfang_pulse(request: Request, practice_id: Optional[str] = Query(None)):
"""Sehr leichter Endpoint fuer Live-Pulse.
Clients pollen kurz (z. B. 800 ms) und holen die Conversation nur dann
neu, wenn sich 'tick' geaendert hat. Damit erscheint das Signal sofort
und ohne 510 s Verzoegerung der alten Polling-Loop.
"""
pid = (practice_id or "").strip() or _resolve_practice_id(request)
if not pid:
return JSONResponse(content={"tick": 0, "ts": 0.0})
p = _pulse_get(pid)
return JSONResponse(content={
"tick": int(p.get("tick", 0)),
"ts": float(p.get("ts", 0.0)),
"last_sender": p.get("last_sender", ""),
})
@router.get("/thread/{thread_id}")
async def empfang_thread(thread_id: str, request: Request,
practice_id: Optional[str] = Query(None)):
@@ -1766,6 +2009,10 @@ async def empfang_done(msg_id: str):
if m.get("thread_id") == tid and _msg_practice(m) == pid:
m["status"] = "erledigt"
_save_messages(messages)
try:
_pulse_bump(pid, sender="")
except Exception:
pass
return JSONResponse(content={"success": True})
@@ -1784,6 +2031,10 @@ async def empfang_delete(msg_id: str):
else:
new = [m for m in messages if m.get("id") != msg_id]
_save_messages(new)
try:
_pulse_bump(pid, sender="")
except Exception:
pass
return JSONResponse(content={"success": True})