# -*- coding: utf-8 -*- """Client- und Server-Sync für private und veröffentlichte Doku-Prompts. Private Prompts: erweitert aza_sync_items (item_type=doku_prompt). Öffentliche Prompts: eigene SQLite-Tabelle published_doku_prompts. Kein Deploy in diesem Block — Routen werden lokal registriert. """ from __future__ import annotations import json import re import sqlite3 import time from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from fastapi import Depends, HTTPException, Request from pydantic import BaseModel, Field from aza_security import require_api_token MAX_PROMPT_CHARS = 32000 # --------------------------------------------------------------------------- # Server DB # --------------------------------------------------------------------------- def ensure_published_doku_prompts_schema(con: sqlite3.Connection) -> None: con.execute( """ CREATE TABLE IF NOT EXISTS published_doku_prompts ( id TEXT PRIMARY KEY, user_id TEXT NOT NULL, practice_id TEXT NOT NULL, doc_type TEXT NOT NULL, title TEXT NOT NULL, description TEXT, content TEXT NOT NULL, author_display TEXT, revision INTEGER DEFAULT 1, status TEXT DEFAULT 'published', language TEXT, specialty TEXT, created_at INTEGER, updated_at INTEGER ) """ ) con.execute( "CREATE INDEX IF NOT EXISTS idx_pub_doku_type_status " "ON published_doku_prompts(doc_type, status, updated_at)" ) cols = {r[1] for r in con.execute("PRAGMA table_info(published_doku_prompts)").fetchall()} if "city" not in cols: con.execute("ALTER TABLE published_doku_prompts ADD COLUMN city TEXT DEFAULT ''") con.commit() ensure_published_doku_prompt_ratings_schema(con) VALID_HALF_STAR_RATINGS: Tuple[float, ...] = ( 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0, ) def ensure_published_doku_prompt_ratings_schema(con: sqlite3.Connection) -> None: con.execute( """ CREATE TABLE IF NOT EXISTS published_doku_prompt_ratings ( id INTEGER PRIMARY KEY AUTOINCREMENT, public_prompt_id TEXT NOT NULL, rater_user_id TEXT NOT NULL, rating_value REAL NOT NULL, created_at INTEGER NOT NULL, updated_at INTEGER NOT NULL, UNIQUE(public_prompt_id, rater_user_id) ) """ ) con.execute( "CREATE INDEX IF NOT EXISTS idx_pub_doku_ratings_prompt " "ON published_doku_prompt_ratings(public_prompt_id)" ) con.commit() def normalize_rating_value(value: Any) -> Optional[float]: try: v = float(value) except (TypeError, ValueError): return None if abs(v * 2 - round(v * 2)) > 1e-9: return None v = round(v * 2) / 2.0 if v not in VALID_HALF_STAR_RATINGS: return None return float(v) def _round_rating_average(avg: Optional[float]) -> Optional[float]: if avg is None: return None try: return round(float(avg) * 10) / 10.0 except (TypeError, ValueError): return None def fetch_rating_aggregates_batch( con: sqlite3.Connection, public_ids: List[str], current_user_id: str, ) -> Dict[str, Dict[str, Any]]: """Aggregate + my_rating fuer mehrere public_prompt_ids (kein N+1).""" ids = [str(i).strip() for i in (public_ids or []) if str(i).strip()] out: Dict[str, Dict[str, Any]] = { pid: {"rating_average": None, "rating_count": 0, "my_rating": None} for pid in ids } if not ids: return out placeholders = ",".join("?" for _ in ids) rows = con.execute( f""" SELECT public_prompt_id, AVG(rating_value) AS avg_v, COUNT(*) AS cnt FROM published_doku_prompt_ratings WHERE public_prompt_id IN ({placeholders}) GROUP BY public_prompt_id """, ids, ).fetchall() for pid, avg_v, cnt in rows: out[str(pid)] = { "rating_average": _round_rating_average(avg_v), "rating_count": int(cnt or 0), "my_rating": None, } uid = (current_user_id or "").strip() if uid: mine = con.execute( f""" SELECT public_prompt_id, rating_value FROM published_doku_prompt_ratings WHERE public_prompt_id IN ({placeholders}) AND rater_user_id = ? """, ids + [uid], ).fetchall() for pid, rv in mine: key = str(pid) if key in out: out[key]["my_rating"] = float(rv) return out def compute_can_rate(*, author_user_id: str, current_user_id: str, published: bool) -> bool: if not published: return False au = (author_user_id or "").strip() cu = (current_user_id or "").strip() if not cu or not au: return False return au != cu def apply_rating_fields_to_public_item( item: Dict[str, Any], *, current_user_id: str, agg: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: author_uid = str(item.get("author_user_id") or item.get("user_id") or "").strip() published = str(item.get("status") or "published").strip().lower() == "published" if not published and "status" not in item: published = True base = dict(item) a = agg or {} avg = a.get("rating_average", item.get("rating_average")) cnt = int(a.get("rating_count", item.get("rating_count") or 0) or 0) my = a.get("my_rating", item.get("my_rating")) base["rating_average"] = _round_rating_average(avg) if cnt else None base["rating_count"] = cnt base["my_rating"] = normalize_rating_value(my) if my is not None else None base["can_rate"] = compute_can_rate( author_user_id=author_uid, current_user_id=current_user_id, published=published, ) return base def upsert_public_prompt_rating( con: sqlite3.Connection, *, public_prompt_id: str, rater_user_id: str, rating_value: float, ) -> Dict[str, Any]: pid = (public_prompt_id or "").strip() uid = (rater_user_id or "").strip() rv = normalize_rating_value(rating_value) if not pid or not uid: raise ValueError("missing ids") if rv is None: raise ValueError("invalid rating") row = con.execute( "SELECT user_id, status FROM published_doku_prompts WHERE id = ?", (pid,), ).fetchone() if not row: raise LookupError("not found") author_uid, status = str(row[0] or ""), str(row[1] or "") if status != "published": raise PermissionError("not published") if author_uid == uid: raise PermissionError("own template") now = _now_ts() con.execute( """ INSERT INTO published_doku_prompt_ratings (public_prompt_id, rater_user_id, rating_value, created_at, updated_at) VALUES (?, ?, ?, ?, ?) ON CONFLICT(public_prompt_id, rater_user_id) DO UPDATE SET rating_value = excluded.rating_value, updated_at = excluded.updated_at """, (pid, uid, rv, now, now), ) con.commit() agg = fetch_rating_aggregates_batch(con, [pid], uid).get(pid, {}) return apply_rating_fields_to_public_item( {"id": pid, "author_user_id": author_uid, "status": status}, current_user_id=uid, agg=agg, ) def _now_ts() -> int: return int(time.time()) def _sanitize_server_text(text: str) -> str: """Plaintext only — serverseitig ohne Desktop/tkinter-Import.""" if not text: return "" s = str(text) s = re.sub(r"(?is)]*>.*?", "", s) s = re.sub(r"(?is)]*>.*?", "", s) s = re.sub(r"<[^>]+>", "", s) s = re.sub(r"(?i)javascript\s*:", "", s) s = re.sub(r"(?i)data\s*:", "", s) s = s.replace("\x00", "") if len(s) > MAX_PROMPT_CHARS: s = s[:MAX_PROMPT_CHARS] return s.strip() def _resolve_auth_context(con: sqlite3.Connection, request: Request) -> Tuple[str, str]: from aza_sync_items import _resolve_practice_id_for_request practice_id = _resolve_practice_id_for_request(con, request) user_id = (request.headers.get("X-User-Id") or request.headers.get("X-Empfang-User-Id") or "").strip() if not user_id: raise HTTPException(status_code=403, detail="user_id required") if practice_id == "default": raise HTTPException(status_code=403, detail="invalid practice_id") return user_id, practice_id class PublishDokuPromptIn(BaseModel): doc_type: str title: str = Field(min_length=1, max_length=200) description: str = Field(default="", max_length=1000) content: str = Field(min_length=1, max_length=MAX_PROMPT_CHARS) author_display: str = Field(default="", max_length=120) server_id: Optional[str] = "" language: Optional[str] = "" specialty: Optional[str] = "" city: Optional[str] = "" visibility: Optional[str] = "public" class RateDokuPromptIn(BaseModel): rating: float def register_doku_prompt_routes(app, *, stripe_db_path_fn) -> None: """Registriert /v1/doku-prompts/* — lokal, Deploy separat.""" @app.get("/v1/doku-prompts/public", dependencies=[Depends(require_api_token)]) def doku_prompts_public_list(request: Request): db_path = stripe_db_path_fn() if not Path(db_path).exists(): raise HTTPException(status_code=503, detail="database missing") with sqlite3.connect(db_path) as con: ensure_published_doku_prompts_schema(con) user_id, _practice_id = _resolve_auth_context(con, request) rows = con.execute( """ SELECT id, user_id, practice_id, doc_type, title, description, content, author_display, revision, updated_at, specialty, city FROM published_doku_prompts WHERE status = 'published' ORDER BY doc_type, updated_at DESC """ ).fetchall() pub_ids = [str(r[0]) for r in rows] agg_map = fetch_rating_aggregates_batch(con, pub_ids, user_id) items = [] for r in rows: raw = normalize_public_doku_item({ "id": r[0], "user_id": r[1], "practice_id": r[2], "doc_type": r[3], "title": r[4], "description": r[5] or "", "content": r[6], "author_display": r[7] or "", "revision": int(r[8] or 1), "updated_at": int(r[9] or 0), "specialty": r[10] or "", "city": r[11] or "", "visibility": "public", "status": "published", }) items.append( apply_rating_fields_to_public_item( raw, current_user_id=user_id, agg=agg_map.get(str(r[0])), ) ) return {"ok": True, "items": items, "count": len(items)} @app.post("/v1/doku-prompts/rate/{pub_id}", dependencies=[Depends(require_api_token)]) def doku_prompts_rate(pub_id: str, request: Request, body: RateDokuPromptIn): db_path = stripe_db_path_fn() if not Path(db_path).exists(): raise HTTPException(status_code=503, detail="database missing") rv = normalize_rating_value(body.rating) if rv is None: raise HTTPException(status_code=400, detail="invalid rating") with sqlite3.connect(db_path) as con: ensure_published_doku_prompts_schema(con) user_id, _practice_id = _resolve_auth_context(con, request) try: result = upsert_public_prompt_rating( con, public_prompt_id=pub_id, rater_user_id=user_id, rating_value=rv, ) except LookupError: raise HTTPException(status_code=404, detail="not found") from None except PermissionError as exc: msg = str(exc) if msg == "own template": raise HTTPException(status_code=403, detail="cannot rate own template") from None raise HTTPException(status_code=403, detail="not published") from None return {"ok": True, **result} @app.post("/v1/doku-prompts/publish", dependencies=[Depends(require_api_token)]) def doku_prompts_publish(request: Request, body: PublishDokuPromptIn): db_path = stripe_db_path_fn() if not Path(db_path).exists(): raise HTTPException(status_code=503, detail="database missing") content = _sanitize_server_text(body.content) if not content: raise HTTPException(status_code=400, detail="empty content") with sqlite3.connect(db_path) as con: ensure_published_doku_prompts_schema(con) user_id, practice_id = _resolve_auth_context(con, request) now = _now_ts() pub_id = (body.server_id or "").strip() or f"pub_{user_id}_{now}" existing = con.execute( "SELECT user_id, revision, created_at FROM published_doku_prompts WHERE id = ?", (pub_id,), ).fetchone() if existing and existing[0] != user_id: raise HTTPException(status_code=403, detail="not owner") rev = int(existing[1] or 0) + 1 if existing else 1 created_at = int(existing[2] or now) if existing else now con.execute( """ INSERT INTO published_doku_prompts (id, user_id, practice_id, doc_type, title, description, content, author_display, revision, status, language, specialty, city, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'published', ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET doc_type=excluded.doc_type, title=excluded.title, description=excluded.description, content=excluded.content, author_display=excluded.author_display, revision=excluded.revision, status='published', language=excluded.language, specialty=excluded.specialty, city=excluded.city, updated_at=excluded.updated_at """, ( pub_id, user_id, practice_id, body.doc_type.strip(), body.title.strip(), body.description.strip(), content, body.author_display.strip(), rev, (body.language or "").strip(), (body.specialty or "").strip(), (body.city or "").strip(), created_at, now, ), ) con.commit() return {"ok": True, "id": pub_id, "revision": rev} @app.post("/v1/doku-prompts/unpublish/{pub_id}", dependencies=[Depends(require_api_token)]) def doku_prompts_unpublish(pub_id: str, request: Request): db_path = stripe_db_path_fn() with sqlite3.connect(db_path) as con: ensure_published_doku_prompts_schema(con) user_id, _ = _resolve_auth_context(con, request) row = con.execute( "SELECT user_id FROM published_doku_prompts WHERE id = ?", (pub_id,), ).fetchone() if not row: raise HTTPException(status_code=404, detail="not found") if row[0] != user_id: raise HTTPException(status_code=403, detail="not owner") con.execute( "UPDATE published_doku_prompts SET status='unpublished', updated_at=? WHERE id=?", (_now_ts(), pub_id), ) con.commit() return {"ok": True, "id": pub_id, "status": "unpublished"} # --------------------------------------------------------------------------- # Client # --------------------------------------------------------------------------- def _sync_headers(app) -> Optional[Dict[str, str]]: try: from aza_sync_items import _sync_headers h = _sync_headers(app) if not h: return None uid = "" if hasattr(app, "_empfang_self_user_id"): uid = (app._empfang_self_user_id() or "").strip() if uid: h["X-User-Id"] = uid h["X-Empfang-User-Id"] = uid return h except Exception: return None def _http_get(url: str, headers: Dict[str, str]) -> Tuple[int, Any]: import requests r = requests.get(url, headers=headers, timeout=(3, 15)) try: return r.status_code, r.json() except Exception: return r.status_code, {} def _http_post(url: str, headers: Dict[str, str], body: Any) -> Tuple[int, Any]: import requests r = requests.post(url, headers=headers, json=body, timeout=(3, 20)) try: return r.status_code, r.json() except Exception: raw = (r.text or "").strip() return r.status_code, {"detail": raw} if raw else {} def local_private_templates_to_sync_items(data: dict) -> List[Dict[str, Any]]: """Serialisiert nicht-system Vorlagen für Sync.""" items = [] order = 0 for doc_type, tpls in (data.get("templates") or {}).items(): if not isinstance(tpls, list): continue for t in tpls: if not isinstance(t, dict) or t.get("is_system"): continue order += 1 payload = { "doc_type": doc_type, "name": t.get("name") or "", "content": t.get("content") or "", "description": t.get("description") or "", "revision": int(t.get("revision") or 1), "updated_at": t.get("updated_at") or "", "server_id": t.get("server_id") or "", } items.append( { "id": t.get("id") or f"doku_{doc_type}_{order}", "item_type": "doku_prompt", "title": t.get("name") or doc_type, "trigger": doc_type, "content": json.dumps(payload, ensure_ascii=False), "sort_order": order, "is_active": True, "created_by": t.get("user_id") or "", "updated_by": t.get("user_id") or "", } ) return items def merge_server_doku_items_local(data: dict, server_items: List[Dict[str, Any]]) -> tuple[dict, bool]: """Neueste Revision gewinnt; Konflikte werden markiert.""" changed = False conflicts = data.setdefault("sync_meta", {}).setdefault("conflicts", []) for it in server_items: if (it.get("item_type") or "") != "doku_prompt": continue try: payload = json.loads(it.get("content") or "{}") except Exception: continue doc_type = payload.get("doc_type") or it.get("trigger") or "" if not doc_type: continue tpl_id = it.get("id") or "" if not tpl_id: continue local_tpls = data.setdefault("templates", {}).setdefault(doc_type, []) local = next((x for x in local_tpls if isinstance(x, dict) and x.get("id") == tpl_id), None) srv_rev = int(payload.get("revision") or 0) loc_rev = int(local.get("revision") or 0) if local else 0 srv_ts = payload.get("updated_at") or "" loc_ts = local.get("updated_at") or "" if local else "" if local and loc_rev > srv_rev and loc_ts > srv_ts: conflicts.append({"id": tpl_id, "doc_type": doc_type, "at": datetime.now(timezone.utc).isoformat()}) continue entry = { "id": tpl_id, "name": payload.get("name") or it.get("title") or "Sync-Vorlage", "is_system": False, "content": payload.get("content") or "", "description": payload.get("description") or "", "created_at": local.get("created_at") if local else srv_ts, "updated_at": srv_ts or _utc_now_iso(), "revision": srv_rev or 1, "user_id": it.get("created_by") or "", "practice_id": "", "server_id": payload.get("server_id") or "", "published": bool(local.get("published")) if local else False, "sync_updated_at": srv_ts, "source_author": local.get("source_author", "") if local else "", "source_server_id": local.get("source_server_id", "") if local else "", } if local: local.update(entry) else: local_tpls.append(entry) changed = True if changed: data["sync_meta"]["last_sync_at"] = _utc_now_iso() return data, changed def _utc_now_iso() -> str: return datetime.now(timezone.utc).isoformat() def sync_doku_prompts_with_server(app) -> bool: from aza_doku_vorlagen import _load, _save from aza_sync_items import _fetch_server_items, _upload_items_bulk headers = _sync_headers(app) if not headers: return False server_items = _fetch_server_items(app, "doku_prompt") if server_items is None: return False data = _load() if server_items: data, changed = merge_server_doku_items_local(data, server_items) if changed: _save(data) local_items = local_private_templates_to_sync_items(data) if local_items: return _upload_items_bulk(app, local_items) return True def normalize_public_doku_item(raw: Dict[str, Any]) -> Dict[str, Any]: """Einheitliches Client-Format für öffentliche Doku-Prompts.""" if not isinstance(raw, dict): return {} sid = str(raw.get("id") or raw.get("server_id") or "").strip() author_uid = str(raw.get("author_user_id") or raw.get("user_id") or "").strip() city = str(raw.get("city") or raw.get("location") or raw.get("region") or "").strip() return { "id": sid, "server_id": sid, "doc_type": str(raw.get("doc_type") or "").strip(), "title": str(raw.get("title") or raw.get("name") or "").strip(), "description": str(raw.get("description") or "").strip(), "content": str(raw.get("content") or raw.get("prompt") or "").strip(), "author_display": str(raw.get("author_display") or raw.get("author") or "").strip(), "author_user_id": author_uid, "specialty": str(raw.get("specialty") or "").strip(), "city": city, "location": city, "practice_id": str(raw.get("practice_id") or "").strip(), "visibility": str(raw.get("visibility") or raw.get("scope") or "public").strip(), "revision": int(raw.get("revision") or 1), "updated_at": int(raw.get("updated_at") or 0), "rating_average": raw.get("rating_average"), "rating_count": int(raw.get("rating_count") or 0), "my_rating": raw.get("my_rating"), "can_rate": bool(raw.get("can_rate")), } def normalize_public_doku_items(items: List[Any]) -> List[Dict[str, Any]]: out: List[Dict[str, Any]] = [] for it in items or []: norm = normalize_public_doku_item(it if isinstance(it, dict) else {}) if norm.get("id") or norm.get("title"): out.append(norm) return out def fetch_public_doku_prompts_result(app) -> Tuple[List[Dict[str, Any]], Optional[str]]: """Lädt öffentliche Vorlagen; Fehler-Tags: __ROUTE_MISSING__, __CONN_ERROR__.""" headers = _sync_headers(app) if not headers: return [], None try: bu = app.get_backend_url().rstrip("/") except Exception: return [], "__CONN_ERROR__" try: code, data = _http_get(f"{bu}/v1/doku-prompts/public", headers) except Exception: return [], "__CONN_ERROR__" if code in (404, 405): return [], "__ROUTE_MISSING__" if code != 200 or not isinstance(data, dict) or not data.get("ok"): detail = "" if isinstance(data, dict): detail = str(data.get("detail") or data.get("message") or "").strip().lower() if detail in ("not found", "notfound"): return [], "__ROUTE_MISSING__" return [], None items = data.get("items") if not isinstance(items, list): return [], None return normalize_public_doku_items(items), None def fetch_public_doku_prompts(app) -> List[Dict[str, Any]]: items, _err = fetch_public_doku_prompts_result(app) return items def _profile_publish_extras(app) -> Dict[str, str]: prof = getattr(app, "_user_profile", {}) or {} specialty = str(prof.get("specialty") or "").strip() city = str(prof.get("city") or prof.get("location") or prof.get("region") or "").strip() return {"specialty": specialty, "city": city} def resolve_author_display(app) -> str: """Verfasser-Anzeigename: bevorzugt Profil-Anzeigename, sonst Signaturname.""" try: prof = getattr(app, "_user_profile", {}) or {} for k in ("display_name", "name"): v = str(prof.get(k) or "").strip() if v: return v except Exception: pass try: from aza_persistence import load_signature_name return (load_signature_name() or "").strip() except Exception: return "" def publish_doku_prompt( app, doc_type: str, tpl: dict, title: str, description: str = "", ) -> Tuple[bool, str]: """Veröffentlicht eine Vorlage. Rückgabe (ok, message). Sentinel-Meldungen für saubere UI-Übersetzung (kein rohes "Not Found"): "__ROUTE_MISSING__" → Route nicht deployt (404) "__CONN_ERROR__" → Server nicht erreichbar / Timeout """ headers = _sync_headers(app) if not headers: return False, "Nicht angemeldet oder keine Praxis-Zuordnung." try: bu = app.get_backend_url().rstrip("/") except Exception: return False, "Backend nicht konfiguriert." author = resolve_author_display(app) extras = _profile_publish_extras(app) body = { "doc_type": doc_type, "title": title, "description": description, "content": tpl.get("content") or "", "author_display": author, "server_id": tpl.get("server_id") or tpl.get("id") or "", "language": "de", "specialty": (tpl.get("specialty") or extras.get("specialty") or "").strip(), "city": (tpl.get("city") or tpl.get("location") or extras.get("city") or "").strip(), "visibility": "public", } try: code, data = _http_post(f"{bu}/v1/doku-prompts/publish", headers, body) except Exception: return False, "__CONN_ERROR__" if code == 200 and isinstance(data, dict) and data.get("ok"): return True, "" if code in (404, 405): # Route lokal vorhanden, aber auf dem Live-Server (noch) nicht deployt. return False, "__ROUTE_MISSING__" detail = "" if isinstance(data, dict): detail = str(data.get("detail") or data.get("message") or "") if detail.strip().lower() in ("not found", "notfound"): return False, "__ROUTE_MISSING__" return False, detail or f"Server-Fehler ({code})" def unpublish_doku_prompt(app, pub_id: str) -> Tuple[bool, str]: """Entfernt eigene Veröffentlichung vom Server (falls Route live).""" headers = _sync_headers(app) if not headers: return False, "Nicht angemeldet oder keine Praxis-Zuordnung." pid = (pub_id or "").strip() if not pid: return False, "Keine Veröffentlichungs-ID." try: bu = app.get_backend_url().rstrip("/") except Exception: return False, "Backend nicht konfiguriert." try: code, data = _http_post(f"{bu}/v1/doku-prompts/unpublish/{pid}", headers, {}) except Exception: return False, "__CONN_ERROR__" if code == 200 and isinstance(data, dict) and data.get("ok"): return True, "" if code in (404, 405): return False, "__ROUTE_MISSING__" detail = "" if isinstance(data, dict): detail = str(data.get("detail") or data.get("message") or "") if detail.strip().lower() in ("not found", "notfound"): return False, "__ROUTE_MISSING__" return False, detail or f"Server-Fehler ({code})" def rate_public_doku_prompt_result( app, pub_id: str, rating: float, ) -> Tuple[bool, str, Dict[str, Any]]: """Bewertet fremde oeffentliche Vorlage. Rueckgabe (ok, message, payload).""" headers = _sync_headers(app) if not headers: return False, "Nicht angemeldet oder keine Praxis-Zuordnung.", {} pid = (pub_id or "").strip() if not pid: return False, "Keine Veröffentlichungs-ID.", {} rv = normalize_rating_value(rating) if rv is None: return False, "Ungültige Bewertung (0,5–5,0 in 0,5-Schritten).", {} try: bu = app.get_backend_url().rstrip("/") except Exception: return False, "Backend nicht konfiguriert.", {} try: code, data = _http_post( f"{bu}/v1/doku-prompts/rate/{pid}", headers, {"rating": rv}, ) except Exception: return False, "__CONN_ERROR__", {} if code == 200 and isinstance(data, dict) and data.get("ok"): payload = {k: data.get(k) for k in ( "rating_average", "rating_count", "my_rating", "can_rate", )} return True, "", payload if code in (404, 405): return False, "__ROUTE_MISSING__", {} if code == 403: return False, "Diese Vorlage kann nicht bewertet werden.", {} if code == 400: return False, "Ungültige Bewertung.", {} detail = "" if isinstance(data, dict): detail = str(data.get("detail") or data.get("message") or "") if detail.strip().lower() in ("not found", "notfound"): return False, "__ROUTE_MISSING__", {} return False, detail or f"Server-Fehler ({code})", {}