801 lines
30 KiB
Python
801 lines
30 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""Client- und Server-Sync für private und veröffentlichte Doku-Prompts.
|
||
|
||
Private Prompts: erweitert aza_sync_items (item_type=doku_prompt).
|
||
Öffentliche Prompts: eigene SQLite-Tabelle published_doku_prompts.
|
||
|
||
Kein Deploy in diesem Block — Routen werden lokal registriert.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import re
|
||
import sqlite3
|
||
import time
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
from fastapi import Depends, HTTPException, Request
|
||
from pydantic import BaseModel, Field
|
||
|
||
from aza_security import require_api_token
|
||
|
||
MAX_PROMPT_CHARS = 32000
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Server DB
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def ensure_published_doku_prompts_schema(con: sqlite3.Connection) -> None:
    """Create the ``published_doku_prompts`` table and its index if missing.

    Tables created before the ``city`` column existed get that column added.
    The changes are committed, and afterwards the ratings table is ensured
    through ``ensure_published_doku_prompt_ratings_schema``.

    Args:
        con: Open SQLite connection to operate on.
    """
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS published_doku_prompts (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            practice_id TEXT NOT NULL,
            doc_type TEXT NOT NULL,
            title TEXT NOT NULL,
            description TEXT,
            content TEXT NOT NULL,
            author_display TEXT,
            revision INTEGER DEFAULT 1,
            status TEXT DEFAULT 'published',
            language TEXT,
            specialty TEXT,
            created_at INTEGER,
            updated_at INTEGER
        )
        """
    create_index_sql = (
        "CREATE INDEX IF NOT EXISTS idx_pub_doku_type_status "
        "ON published_doku_prompts(doc_type, status, updated_at)"
    )
    con.execute(create_table_sql)
    con.execute(create_index_sql)

    # Column 1 of every PRAGMA table_info row is the column name.
    existing_columns = set()
    for column_info in con.execute("PRAGMA table_info(published_doku_prompts)").fetchall():
        existing_columns.add(column_info[1])
    if "city" not in existing_columns:
        con.execute("ALTER TABLE published_doku_prompts ADD COLUMN city TEXT DEFAULT ''")

    con.commit()
    ensure_published_doku_prompt_ratings_schema(con)
|
||
|
||
|
||
# Every rating value a user may submit: half-star steps from 0.5 up to 5.0.
VALID_HALF_STAR_RATINGS: Tuple[float, ...] = tuple(
    half_steps / 2 for half_steps in range(1, 11)
)
|
||
|
||
|
||
def ensure_published_doku_prompt_ratings_schema(con: sqlite3.Connection) -> None:
    """Create the ratings table and its lookup index if missing, then commit.

    The table keeps one row per (public_prompt_id, rater_user_id) pair; the
    UNIQUE constraint on that pair is what the rating upsert relies on.

    Args:
        con: Open SQLite connection to operate on.
    """
    statements = (
        """
        CREATE TABLE IF NOT EXISTS published_doku_prompt_ratings (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            public_prompt_id TEXT NOT NULL,
            rater_user_id TEXT NOT NULL,
            rating_value REAL NOT NULL,
            created_at INTEGER NOT NULL,
            updated_at INTEGER NOT NULL,
            UNIQUE(public_prompt_id, rater_user_id)
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_pub_doku_ratings_prompt "
        "ON published_doku_prompt_ratings(public_prompt_id)",
    )
    for ddl in statements:
        con.execute(ddl)
    con.commit()
|
||
|
||
|
||
def normalize_rating_value(value: Any) -> Optional[float]:
    """Return ``value`` as a valid half-star rating, or ``None`` if it is not one.

    Accepts anything ``float()`` can convert. The value must lie on a 0.5
    step (within a 1e-9 tolerance) and be a member of
    ``VALID_HALF_STAR_RATINGS``.

    Args:
        value: Candidate rating (number or numeric string).

    Returns:
        The rating snapped to the exact half step, or ``None`` for anything
        invalid — including NaN, +/-infinity and numbers too large for float.
    """
    import math  # function-scope import keeps this block self-contained

    try:
        v = float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: e.g. an int too large to be represented as float.
        return None
    # round() raises on NaN (ValueError) and on infinities (OverflowError),
    # so non-finite input has to be rejected before the step check below.
    if not math.isfinite(v):
        return None
    doubled = v * 2
    if abs(doubled - round(doubled)) > 1e-9:
        return None
    v = round(doubled) / 2.0
    if v not in VALID_HALF_STAR_RATINGS:
        return None
    return float(v)
|
||
|
||
|
||
def _round_rating_average(avg: Optional[float]) -> Optional[float]:
    """Round an average rating to one decimal place.

    Returns ``None`` when ``avg`` is ``None`` or cannot be converted
    (``TypeError`` / ``ValueError``).
    """
    if avg is not None:
        try:
            tenths = round(float(avg) * 10)
            return tenths / 10.0
        except (TypeError, ValueError):
            pass
    return None
|
||
|
||
|
||
def fetch_rating_aggregates_batch(
    con: sqlite3.Connection,
    public_ids: List[str],
    current_user_id: str,
) -> Dict[str, Dict[str, Any]]:
    """Load rating aggregates and the caller's own rating for many prompts.

    Runs one query per chunk of ids instead of one query per prompt (no N+1).
    The ids are processed in chunks so the ``IN (...)`` list can never exceed
    SQLite's limit on bound parameters, however many prompts are requested.

    Args:
        con: Open SQLite connection containing the ratings table.
        public_ids: Published prompt ids; blanks are dropped, duplicates merged.
        current_user_id: User whose own rating is reported as ``my_rating``;
            when blank, ``my_rating`` is always ``None``.

    Returns:
        Mapping id -> ``{"rating_average", "rating_count", "my_rating"}``.
        Ids without any rating keep ``None`` / ``0`` / ``None``.
    """
    # Strip, drop blanks and de-duplicate while keeping first-seen order.
    ids = list(dict.fromkeys(
        text for text in (str(raw_id).strip() for raw_id in (public_ids or [])) if text
    ))
    out: Dict[str, Dict[str, Any]] = {
        pid: {"rating_average": None, "rating_count": 0, "my_rating": None}
        for pid in ids
    }
    if not ids:
        return out

    uid = (current_user_id or "").strip()
    # Older SQLite builds allow only 999 bound parameters per statement;
    # 500 ids plus the user id stays well below that.
    chunk_size = 500
    for start in range(0, len(ids), chunk_size):
        chunk = ids[start:start + chunk_size]
        placeholders = ",".join("?" for _ in chunk)
        rows = con.execute(
            f"""
            SELECT public_prompt_id,
                   AVG(rating_value) AS avg_v,
                   COUNT(*) AS cnt,
                   MAX(CASE WHEN rater_user_id = ? THEN rating_value END) AS mine
            FROM published_doku_prompt_ratings
            WHERE public_prompt_id IN ({placeholders})
            GROUP BY public_prompt_id
            """,
            [uid, *chunk],
        ).fetchall()
        for pid, avg_v, cnt, mine in rows:
            out[str(pid)] = {
                "rating_average": _round_rating_average(avg_v),
                "rating_count": int(cnt or 0),
                # Without a user id there is no "own" rating to report.
                "my_rating": float(mine) if uid and mine is not None else None,
            }
    return out
|
||
|
||
|
||
def compute_can_rate(*, author_user_id: str, current_user_id: str, published: bool) -> bool:
    """Tell whether the current user may rate a template.

    Rating is allowed only for a published template whose author and viewer
    ids are both non-blank and differ from each other.
    """
    if published:
        author = (author_user_id or "").strip()
        viewer = (current_user_id or "").strip()
        if author and viewer:
            return author != viewer
    return False
|
||
|
||
|
||
def apply_rating_fields_to_public_item(
    item: Dict[str, Any],
    *,
    current_user_id: str,
    agg: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Return a copy of ``item`` with the rating fields filled in.

    Values present in ``agg`` take precedence over rating values already on
    ``item``. The input dict is not modified.

    Args:
        item: Public prompt dict; the author is read from ``author_user_id``
            or ``user_id``, the publication state from ``status``.
        current_user_id: Id of the viewing user, used for ``can_rate``.
        agg: Optional aggregate with ``rating_average``, ``rating_count``
            and ``my_rating``.

    Returns:
        Copy of ``item`` with ``rating_average`` (``None`` when there are no
        ratings), ``rating_count``, ``my_rating`` and ``can_rate`` set.
    """
    author_uid = str(item.get("author_user_id") or item.get("user_id") or "").strip()
    # A missing or empty status counts as published. The former extra
    # '"status" not in item' branch could never trigger and was dropped.
    published = str(item.get("status") or "published").strip().lower() == "published"

    base = dict(item)
    a = agg or {}
    avg = a.get("rating_average", item.get("rating_average"))
    cnt = int(a.get("rating_count", item.get("rating_count") or 0) or 0)
    my = a.get("my_rating", item.get("my_rating"))

    # An average without any ratings is meaningless, so it is blanked out.
    base["rating_average"] = _round_rating_average(avg) if cnt else None
    base["rating_count"] = cnt
    base["my_rating"] = normalize_rating_value(my) if my is not None else None
    base["can_rate"] = compute_can_rate(
        author_user_id=author_uid,
        current_user_id=current_user_id,
        published=published,
    )
    return base
|
||
|
||
|
||
def upsert_public_prompt_rating(
    con: sqlite3.Connection,
    *,
    public_prompt_id: str,
    rater_user_id: str,
    rating_value: float,
) -> Dict[str, Any]:
    """Store or replace one user's rating and return the fresh rating fields.

    Args:
        con: Open SQLite connection holding the prompt and rating tables.
        public_prompt_id: Id of the published prompt being rated.
        rater_user_id: Id of the rating user.
        rating_value: Rating; must normalise to a valid half-star value.

    Returns:
        Dict with ``id``, ``author_user_id``, ``status`` and the rating fields
        added by ``apply_rating_fields_to_public_item``.

    Raises:
        ValueError: An id is blank ("missing ids") or the rating is invalid.
        LookupError: No prompt with that id exists.
        PermissionError: The prompt is not published ("not published") or
            belongs to the rater ("own template").
    """
    prompt_id = (public_prompt_id or "").strip()
    rater_id = (rater_user_id or "").strip()
    stars = normalize_rating_value(rating_value)
    if not (prompt_id and rater_id):
        raise ValueError("missing ids")
    if stars is None:
        raise ValueError("invalid rating")

    prompt_row = con.execute(
        "SELECT user_id, status FROM published_doku_prompts WHERE id = ?",
        (prompt_id,),
    ).fetchone()
    if not prompt_row:
        raise LookupError("not found")
    author_uid = str(prompt_row[0] or "")
    status = str(prompt_row[1] or "")
    if status != "published":
        raise PermissionError("not published")
    if author_uid == rater_id:
        raise PermissionError("own template")

    stamp = _now_ts()
    # One row per (prompt, rater): a repeated rating overwrites the value and
    # updated_at but keeps the original created_at.
    con.execute(
        """
        INSERT INTO published_doku_prompt_ratings
        (public_prompt_id, rater_user_id, rating_value, created_at, updated_at)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(public_prompt_id, rater_user_id) DO UPDATE SET
        rating_value = excluded.rating_value,
        updated_at = excluded.updated_at
        """,
        (prompt_id, rater_id, stars, stamp, stamp),
    )
    con.commit()

    aggregates = fetch_rating_aggregates_batch(con, [prompt_id], rater_id)
    summary = {"id": prompt_id, "author_user_id": author_uid, "status": status}
    return apply_rating_fields_to_public_item(
        summary,
        current_user_id=rater_id,
        agg=aggregates.get(prompt_id, {}),
    )
|
||
|
||
|
||
def _now_ts() -> int:
    """Return the current Unix time in whole seconds."""
    seconds_since_epoch = time.time()
    return int(seconds_since_epoch)
|
||
|
||
|
||
def _sanitize_server_text(text: str) -> str:
    """Reduce ``text`` to plain text on the server side.

    Implemented with ``re`` only, so no desktop/tkinter module is imported.
    Removes ``<script>``/``<style>`` elements including their content, any
    other tags, ``javascript:`` and ``data:`` scheme prefixes and NUL bytes,
    then truncates to ``MAX_PROMPT_CHARS`` and strips surrounding whitespace.

    The removals are repeated until the text stops changing. A single pass
    can be bypassed because deleting one match may splice a new one together
    (``jajavascript:vascript:`` would otherwise leave ``javascript:``).

    Args:
        text: Raw user-supplied text; falsy input yields ``""``.

    Returns:
        The sanitised plain text.
    """
    if not text:
        return ""
    patterns = (
        r"(?is)<script[^>]*>.*?</script>",
        r"(?is)<style[^>]*>.*?</style>",
        r"<[^>]+>",
        r"(?i)javascript\s*:",
        r"(?i)data\s*:",
    )
    s = str(text)
    while True:
        cleaned = s
        for pattern in patterns:
            cleaned = re.sub(pattern, "", cleaned)
        cleaned = cleaned.replace("\x00", "")
        # Every change removes characters, so this loop always terminates.
        if cleaned == s:
            break
        s = cleaned
    if len(s) > MAX_PROMPT_CHARS:
        s = s[:MAX_PROMPT_CHARS]
    return s.strip()
|
||
|
||
|
||
def _resolve_auth_context(con: sqlite3.Connection, request: Request) -> Tuple[str, str]:
    """Determine ``(user_id, practice_id)`` for the incoming request.

    The practice id comes from ``aza_sync_items``; the user id is read from
    the ``X-User-Id`` header, falling back to ``X-Empfang-User-Id``.

    Raises:
        HTTPException: 403 when no user id is supplied or when the practice
            id resolves to ``"default"``.
    """
    from aza_sync_items import _resolve_practice_id_for_request

    practice_id = _resolve_practice_id_for_request(con, request)

    user_id = ""
    for header_name in ("X-User-Id", "X-Empfang-User-Id"):
        header_value = request.headers.get(header_name)
        if header_value:
            user_id = header_value
            break
    user_id = user_id.strip()

    if not user_id:
        raise HTTPException(status_code=403, detail="user_id required")
    if practice_id == "default":
        raise HTTPException(status_code=403, detail="invalid practice_id")
    return user_id, practice_id
|
||
|
||
|
||
# Request body of POST /v1/doku-prompts/publish.
class PublishDokuPromptIn(BaseModel):
    # Document type the prompt belongs to; stored after stripping whitespace.
    doc_type: str
    # Listing title, 1-200 characters.
    title: str = Field(min_length=1, max_length=200)
    # Optional short description, at most 1000 characters.
    description: str = Field(default="", max_length=1000)
    # Prompt text, 1..MAX_PROMPT_CHARS characters; the publish route
    # sanitises it before storing.
    content: str = Field(min_length=1, max_length=MAX_PROMPT_CHARS)
    # Author name shown with the listing, at most 120 characters.
    author_display: str = Field(default="", max_length=120)
    # When non-blank, the publish route uses this as the row id (update of an
    # existing publication); otherwise the route generates an id.
    server_id: Optional[str] = ""
    # Stored as given (stripped) by the publish route.
    language: Optional[str] = ""
    specialty: Optional[str] = ""
    city: Optional[str] = ""
    # NOTE(review): not read by the publish route in this file — confirm
    # whether it is used elsewhere.
    visibility: Optional[str] = "public"
|
||
|
||
|
||
# Request body of POST /v1/doku-prompts/rate/{pub_id}.
class RateDokuPromptIn(BaseModel):
    # Rating submitted by the client; the rate route passes it through
    # normalize_rating_value and answers 400 when that returns None.
    rating: float
|
||
|
||
|
||
def register_doku_prompt_routes(app, *, stripe_db_path_fn) -> None:
    """Register the ``/v1/doku-prompts/*`` routes on ``app``.

    Routes: ``GET public`` (list), ``POST rate/{pub_id}``, ``POST publish``
    and ``POST unpublish/{pub_id}``. All of them require the API token and
    resolve the caller through ``_resolve_auth_context``.

    Every route answers 503 when the database file does not exist and closes
    its SQLite connection when it finishes.

    Args:
        app: Application object providing the ``get``/``post`` route decorators.
        stripe_db_path_fn: Zero-argument callable returning the SQLite file path.
    """
    from contextlib import closing  # function-scope import keeps the block self-contained

    def _require_db_path():
        """Return the database path, or answer 503 when the file is missing."""
        db_path = stripe_db_path_fn()
        if not Path(db_path).exists():
            raise HTTPException(status_code=503, detail="database missing")
        return db_path

    def _open(db_path):
        """Open a connection that is really closed when its ``with`` ends."""
        # ``with connection:`` only commits or rolls back; it never closes
        # the connection, so ``closing`` is needed to release the handle.
        return closing(sqlite3.connect(db_path))

    @app.get("/v1/doku-prompts/public", dependencies=[Depends(require_api_token)])
    def doku_prompts_public_list(request: Request):
        db_path = _require_db_path()
        with _open(db_path) as con, con:
            ensure_published_doku_prompts_schema(con)
            user_id, _practice_id = _resolve_auth_context(con, request)
            rows = con.execute(
                """
                SELECT id, user_id, practice_id, doc_type, title, description, content,
                author_display, revision, updated_at, specialty, city
                FROM published_doku_prompts
                WHERE status = 'published'
                ORDER BY doc_type, updated_at DESC
                """
            ).fetchall()
            pub_ids = [str(r[0]) for r in rows]
            # One batched lookup for all rows instead of one query per row.
            agg_map = fetch_rating_aggregates_batch(con, pub_ids, user_id)
            items = []
            for r in rows:
                raw = normalize_public_doku_item({
                    "id": r[0],
                    "user_id": r[1],
                    "practice_id": r[2],
                    "doc_type": r[3],
                    "title": r[4],
                    "description": r[5] or "",
                    "content": r[6],
                    "author_display": r[7] or "",
                    "revision": int(r[8] or 1),
                    "updated_at": int(r[9] or 0),
                    "specialty": r[10] or "",
                    "city": r[11] or "",
                    "visibility": "public",
                    "status": "published",
                })
                items.append(
                    apply_rating_fields_to_public_item(
                        raw,
                        current_user_id=user_id,
                        agg=agg_map.get(str(r[0])),
                    )
                )
        return {"ok": True, "items": items, "count": len(items)}

    @app.post("/v1/doku-prompts/rate/{pub_id}", dependencies=[Depends(require_api_token)])
    def doku_prompts_rate(pub_id: str, request: Request, body: RateDokuPromptIn):
        db_path = _require_db_path()
        rv = normalize_rating_value(body.rating)
        if rv is None:
            raise HTTPException(status_code=400, detail="invalid rating")
        with _open(db_path) as con, con:
            ensure_published_doku_prompts_schema(con)
            user_id, _practice_id = _resolve_auth_context(con, request)
            try:
                result = upsert_public_prompt_rating(
                    con,
                    public_prompt_id=pub_id,
                    rater_user_id=user_id,
                    rating_value=rv,
                )
            except LookupError:
                raise HTTPException(status_code=404, detail="not found") from None
            except PermissionError as exc:
                msg = str(exc)
                if msg == "own template":
                    raise HTTPException(status_code=403, detail="cannot rate own template") from None
                raise HTTPException(status_code=403, detail="not published") from None
        return {"ok": True, **result}

    @app.post("/v1/doku-prompts/publish", dependencies=[Depends(require_api_token)])
    def doku_prompts_publish(request: Request, body: PublishDokuPromptIn):
        db_path = _require_db_path()
        content = _sanitize_server_text(body.content)
        if not content:
            raise HTTPException(status_code=400, detail="empty content")
        # The model's min_length does not catch whitespace-only values, which
        # would otherwise be stored as empty strings.
        doc_type = body.doc_type.strip()
        title = body.title.strip()
        if not doc_type:
            raise HTTPException(status_code=400, detail="doc_type required")
        if not title:
            raise HTTPException(status_code=400, detail="title required")
        with _open(db_path) as con, con:
            ensure_published_doku_prompts_schema(con)
            user_id, practice_id = _resolve_auth_context(con, request)
            now = _now_ts()
            pub_id = (body.server_id or "").strip() or f"pub_{user_id}_{now}"
            existing = con.execute(
                "SELECT user_id, revision, created_at FROM published_doku_prompts WHERE id = ?",
                (pub_id,),
            ).fetchone()
            # Only the original publisher may overwrite an existing row.
            if existing and existing[0] != user_id:
                raise HTTPException(status_code=403, detail="not owner")
            rev = int(existing[1] or 0) + 1 if existing else 1
            created_at = int(existing[2] or now) if existing else now
            con.execute(
                """
                INSERT INTO published_doku_prompts
                (id, user_id, practice_id, doc_type, title, description, content,
                author_display, revision, status, language, specialty, city,
                created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'published', ?, ?, ?, ?, ?)
                ON CONFLICT(id) DO UPDATE SET
                doc_type=excluded.doc_type, title=excluded.title, description=excluded.description,
                content=excluded.content, author_display=excluded.author_display,
                revision=excluded.revision, status='published',
                language=excluded.language, specialty=excluded.specialty,
                city=excluded.city, updated_at=excluded.updated_at
                """,
                (
                    pub_id, user_id, practice_id, doc_type,
                    title, body.description.strip(), content,
                    body.author_display.strip(), rev,
                    (body.language or "").strip(), (body.specialty or "").strip(),
                    (body.city or "").strip(), created_at, now,
                ),
            )
            con.commit()
        return {"ok": True, "id": pub_id, "revision": rev}

    @app.post("/v1/doku-prompts/unpublish/{pub_id}", dependencies=[Depends(require_api_token)])
    def doku_prompts_unpublish(pub_id: str, request: Request):
        # Without this check sqlite3.connect would silently create an empty
        # database file when the configured one is missing.
        db_path = _require_db_path()
        with _open(db_path) as con, con:
            ensure_published_doku_prompts_schema(con)
            user_id, _ = _resolve_auth_context(con, request)
            row = con.execute(
                "SELECT user_id FROM published_doku_prompts WHERE id = ?", (pub_id,),
            ).fetchone()
            if not row:
                raise HTTPException(status_code=404, detail="not found")
            if row[0] != user_id:
                raise HTTPException(status_code=403, detail="not owner")
            con.execute(
                "UPDATE published_doku_prompts SET status='unpublished', updated_at=? WHERE id=?",
                (_now_ts(), pub_id),
            )
            con.commit()
        return {"ok": True, "id": pub_id, "status": "unpublished"}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Client
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _sync_headers(app) -> Optional[Dict[str, str]]:
    """Build the request headers for the doku-prompt endpoints.

    Starts from the headers produced by ``aza_sync_items._sync_headers`` and,
    when ``app._empfang_self_user_id()`` yields a non-blank id, adds it as
    ``X-User-Id`` and ``X-Empfang-User-Id``.

    Returns:
        The header dict, or ``None`` when the base headers are empty or any
        step fails (best effort: every exception is swallowed).
    """
    try:
        from aza_sync_items import _sync_headers as build_base_headers

        headers = build_base_headers(app)
        if headers:
            own_id = ""
            if hasattr(app, "_empfang_self_user_id"):
                own_id = (app._empfang_self_user_id() or "").strip()
            if own_id:
                for header_name in ("X-User-Id", "X-Empfang-User-Id"):
                    headers[header_name] = own_id
            return headers
        return None
    except Exception:
        return None
|
||
|
||
|
||
def _http_get(url: str, headers: Dict[str, str]) -> Tuple[int, Any]:
    """GET ``url`` and return ``(status_code, parsed_json)``.

    Uses a 3 s connect / 15 s read timeout. A body that cannot be parsed as
    JSON is reported as ``{}``. Connection errors propagate to the caller.
    """
    import requests

    response = requests.get(url, headers=headers, timeout=(3, 15))
    try:
        parsed = response.json()
    except Exception:
        parsed = {}
    return response.status_code, parsed
|
||
|
||
|
||
def _http_post(url: str, headers: Dict[str, str], body: Any) -> Tuple[int, Any]:
    """POST ``body`` as JSON and return ``(status_code, parsed_json)``.

    Uses a 3 s connect / 20 s read timeout. When the response is not JSON,
    its stripped text is returned as ``{"detail": text}``, or ``{}`` if the
    text is empty. Connection errors propagate to the caller.
    """
    import requests

    response = requests.post(url, headers=headers, json=body, timeout=(3, 20))
    status = response.status_code
    try:
        parsed = response.json()
    except Exception:
        fallback_text = (response.text or "").strip()
        parsed = {"detail": fallback_text} if fallback_text else {}
    return status, parsed
|
||
|
||
|
||
def local_private_templates_to_sync_items(data: dict) -> List[Dict[str, Any]]:
    """Serialise all non-system templates in ``data`` as sync items.

    Every item has ``item_type == "doku_prompt"``, uses the doc type as
    ``trigger`` and carries the template fields as a JSON string in
    ``content``. ``sort_order`` counts from 1 in iteration order; templates
    without an id get ``doku_<doc_type>_<sort_order>``.
    """

    def _user_templates():
        # Yield (doc_type, template) for every dict entry that is not a
        # system template; non-list groups and non-dict entries are skipped.
        for group_name, group in (data.get("templates") or {}).items():
            if not isinstance(group, list):
                continue
            for candidate in group:
                if isinstance(candidate, dict) and not candidate.get("is_system"):
                    yield group_name, candidate

    items = []
    for position, (doc_type, tpl) in enumerate(_user_templates(), start=1):
        name = tpl.get("name") or ""
        owner = tpl.get("user_id") or ""
        payload = {
            "doc_type": doc_type,
            "name": name,
            "content": tpl.get("content") or "",
            "description": tpl.get("description") or "",
            "revision": int(tpl.get("revision") or 1),
            "updated_at": tpl.get("updated_at") or "",
            "server_id": tpl.get("server_id") or "",
        }
        items.append({
            "id": tpl.get("id") or f"doku_{doc_type}_{position}",
            "item_type": "doku_prompt",
            "title": name or doc_type,
            "trigger": doc_type,
            "content": json.dumps(payload, ensure_ascii=False),
            "sort_order": position,
            "is_active": True,
            "created_by": owner,
            "updated_by": owner,
        })
    return items
|
||
|
||
|
||
def merge_server_doku_items_local(data: dict, server_items: List[Dict[str, Any]]) -> tuple[dict, bool]:
    """Merge server-side doku-prompt items into the local template store.

    The server version replaces the local one unless the local template has
    both a higher revision and a later timestamp. In that case the local
    copy is kept and one conflict marker per template is recorded under
    ``data["sync_meta"]["conflicts"]``.

    Malformed server entries (non-dict items, payloads that are not a JSON
    object, missing id or doc type) are skipped instead of aborting the
    whole merge.

    Args:
        data: Local store; modified in place and also returned.
        server_items: Items as delivered by the server.

    Returns:
        ``(data, changed)`` — ``changed`` is ``True`` when at least one
        template was added or overwritten.
    """

    def _as_int(value: Any) -> int:
        # A revision that is not a number counts as 0 instead of raising.
        try:
            return int(value or 0)
        except (TypeError, ValueError):
            return 0

    changed = False
    conflicts = data.setdefault("sync_meta", {}).setdefault("conflicts", [])
    for it in server_items or []:
        if not isinstance(it, dict) or (it.get("item_type") or "") != "doku_prompt":
            continue
        try:
            payload = json.loads(it.get("content") or "{}")
        except (TypeError, ValueError):
            continue
        # Valid JSON that is not an object ("null", "[1]", ...) has no fields.
        if not isinstance(payload, dict):
            continue
        doc_type = payload.get("doc_type") or it.get("trigger") or ""
        tpl_id = it.get("id") or ""
        if not doc_type or not tpl_id:
            continue

        local_tpls = data.setdefault("templates", {}).setdefault(doc_type, [])
        local = next(
            (x for x in local_tpls if isinstance(x, dict) and x.get("id") == tpl_id),
            None,
        )
        srv_rev = _as_int(payload.get("revision"))
        loc_rev = _as_int(local.get("revision")) if local else 0
        srv_ts = payload.get("updated_at") or ""
        loc_ts = (local.get("updated_at") or "") if local else ""

        # Compared as strings so mixed str/int timestamps cannot raise.
        if local and loc_rev > srv_rev and str(loc_ts) > str(srv_ts):
            marked_at = datetime.now(timezone.utc).isoformat()
            known = next(
                (
                    c for c in conflicts
                    if isinstance(c, dict)
                    and c.get("id") == tpl_id
                    and c.get("doc_type") == doc_type
                ),
                None,
            )
            if known is not None:
                # Refresh the marker instead of appending one per sync run.
                known["at"] = marked_at
            else:
                conflicts.append({"id": tpl_id, "doc_type": doc_type, "at": marked_at})
            continue

        entry = {
            "id": tpl_id,
            "name": payload.get("name") or it.get("title") or "Sync-Vorlage",
            "is_system": False,
            "content": payload.get("content") or "",
            "description": payload.get("description") or "",
            "created_at": local.get("created_at") if local else srv_ts,
            "updated_at": srv_ts or datetime.now(timezone.utc).isoformat(),
            "revision": srv_rev or 1,
            "user_id": it.get("created_by") or "",
            "practice_id": "",
            "server_id": payload.get("server_id") or "",
            # Publication state and origin are local-only and survive the merge.
            "published": bool(local.get("published")) if local else False,
            "sync_updated_at": srv_ts,
            "source_author": local.get("source_author", "") if local else "",
            "source_server_id": local.get("source_server_id", "") if local else "",
        }
        if local:
            local.update(entry)
        else:
            local_tpls.append(entry)
        changed = True

    if changed:
        data["sync_meta"]["last_sync_at"] = datetime.now(timezone.utc).isoformat()
    return data, changed
|
||
|
||
|
||
def _utc_now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
|
||
|
||
|
||
def sync_doku_prompts_with_server(app) -> bool:
    """Run one two-way sync of the private doku prompts.

    Fetches the server items, merges them into the local store (saving only
    when the merge changed something) and then uploads all local non-system
    templates.

    Returns:
        ``False`` when no sync headers are available or the fetch returned
        ``None``; otherwise the result of the bulk upload, or ``True`` when
        there is nothing to upload.
    """
    from aza_doku_vorlagen import _load, _save
    from aza_sync_items import _fetch_server_items, _upload_items_bulk

    # The headers only serve as a "logged in and assigned" check here.
    if not _sync_headers(app):
        return False

    remote_items = _fetch_server_items(app, "doku_prompt")
    if remote_items is None:
        return False

    store = _load()
    if remote_items:
        store, store_changed = merge_server_doku_items_local(store, remote_items)
        if store_changed:
            _save(store)

    outgoing = local_private_templates_to_sync_items(store)
    if not outgoing:
        return True
    return _upload_items_bulk(app, outgoing)
|
||
|
||
|
||
def normalize_public_doku_item(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Convert one public doku prompt into the uniform client format.

    Alternative key names are accepted (for example ``name`` for ``title``,
    ``prompt`` for ``content``, ``location``/``region`` for ``city``). Text
    fields are stripped; ``id`` and ``server_id`` carry the same value, as do
    ``city`` and ``location``.

    Returns:
        The normalised dict, or ``{}`` when ``raw`` is not a dict.
    """
    if not isinstance(raw, dict):
        return {}

    def text(*keys: str, fallback: str = "") -> str:
        # First truthy value among ``keys`` as a stripped string.
        for key in keys:
            found = raw.get(key)
            if found:
                return str(found).strip()
        return str(fallback).strip()

    def number(key: str, fallback: int) -> int:
        return int(raw.get(key) or fallback)

    server_id = text("id", "server_id")
    place = text("city", "location", "region")
    return {
        "id": server_id,
        "server_id": server_id,
        "doc_type": text("doc_type"),
        "title": text("title", "name"),
        "description": text("description"),
        "content": text("content", "prompt"),
        "author_display": text("author_display", "author"),
        "author_user_id": text("author_user_id", "user_id"),
        "specialty": text("specialty"),
        "city": place,
        "location": place,
        "practice_id": text("practice_id"),
        "visibility": text("visibility", "scope", fallback="public"),
        "revision": number("revision", 1),
        "updated_at": number("updated_at", 0),
        "rating_average": raw.get("rating_average"),
        "rating_count": number("rating_count", 0),
        "my_rating": raw.get("my_rating"),
        "can_rate": bool(raw.get("can_rate")),
    }
|
||
|
||
|
||
def normalize_public_doku_items(items: List[Any]) -> List[Dict[str, Any]]:
    """Normalise a list of public prompts, dropping unusable entries.

    Non-dict entries are treated as empty. An entry is kept only when its
    normalised form has an ``id`` or a ``title``.
    """
    normalised = (
        normalize_public_doku_item(entry if isinstance(entry, dict) else {})
        for entry in items or []
    )
    return [item for item in normalised if item.get("id") or item.get("title")]
|
||
|
||
|
||
def fetch_public_doku_prompts_result(app) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    """Load the public templates from the backend.

    Returns:
        ``(items, error_tag)``. ``error_tag`` is ``"__ROUTE_MISSING__"`` when
        the route is not available (404/405 or a "not found" detail),
        ``"__CONN_ERROR__"`` when the backend URL or the request fails, and
        ``None`` otherwise — also for other failures, which just produce an
        empty list.
    """
    headers = _sync_headers(app)
    if not headers:
        return [], None

    try:
        base_url = app.get_backend_url().rstrip("/")
        status, payload = _http_get(f"{base_url}/v1/doku-prompts/public", headers)
    except Exception:
        return [], "__CONN_ERROR__"

    if status in (404, 405):
        return [], "__ROUTE_MISSING__"

    payload_is_dict = isinstance(payload, dict)
    if status == 200 and payload_is_dict and payload.get("ok"):
        entries = payload.get("items")
        if isinstance(entries, list):
            return normalize_public_doku_items(entries), None
        return [], None

    # Failure: a "not found" detail still means the route is not deployed.
    detail = ""
    if payload_is_dict:
        detail = str(payload.get("detail") or payload.get("message") or "").strip().lower()
    if detail in ("not found", "notfound"):
        return [], "__ROUTE_MISSING__"
    return [], None
|
||
|
||
|
||
def fetch_public_doku_prompts(app) -> List[Dict[str, Any]]:
    """Return only the item list of ``fetch_public_doku_prompts_result``."""
    return fetch_public_doku_prompts_result(app)[0]
|
||
|
||
|
||
def _profile_publish_extras(app) -> Dict[str, str]:
    """Read ``specialty`` and ``city`` from ``app._user_profile``.

    The city falls back to the profile's ``location`` and then ``region``.
    A missing or empty profile yields empty strings.
    """
    profile = getattr(app, "_user_profile", {}) or {}
    raw_city = profile.get("city") or profile.get("location") or profile.get("region") or ""
    raw_specialty = profile.get("specialty") or ""
    return {
        "specialty": str(raw_specialty).strip(),
        "city": str(raw_city).strip(),
    }
|
||
|
||
|
||
def resolve_author_display(app) -> str:
    """Return the author name to show with a published template.

    Prefers ``display_name`` and then ``name`` from ``app._user_profile``;
    otherwise falls back to the stored signature name. Any failure in either
    step is ignored, ending in ``""``.
    """
    try:
        profile = getattr(app, "_user_profile", {}) or {}
        candidates = (
            str(profile.get(key) or "").strip() for key in ("display_name", "name")
        )
        chosen = next((name for name in candidates if name), "")
        if chosen:
            return chosen
    except Exception:
        pass

    try:
        from aza_persistence import load_signature_name

        signature = load_signature_name() or ""
        return signature.strip()
    except Exception:
        return ""
|
||
|
||
|
||
def publish_doku_prompt(
    app, doc_type: str, tpl: dict, title: str, description: str = "",
) -> Tuple[bool, str]:
    """Publish a template on the server.

    Returns:
        ``(ok, message)``. On failure the message is either display text or
        one of the sentinels the UI translates itself, so that no raw
        "Not Found" reaches the user:
        ``"__ROUTE_MISSING__"`` — route not deployed (404/405 or a
        "not found" detail); ``"__CONN_ERROR__"`` — the request failed.
    """
    headers = _sync_headers(app)
    if not headers:
        return False, "Nicht angemeldet oder keine Praxis-Zuordnung."
    try:
        base_url = app.get_backend_url().rstrip("/")
    except Exception:
        return False, "Backend nicht konfiguriert."

    author_name = resolve_author_display(app)
    profile_extras = _profile_publish_extras(app)
    specialty = tpl.get("specialty") or profile_extras.get("specialty") or ""
    city = tpl.get("city") or tpl.get("location") or profile_extras.get("city") or ""
    request_body = {
        "doc_type": doc_type,
        "title": title,
        "description": description,
        "content": tpl.get("content") or "",
        "author_display": author_name,
        "server_id": tpl.get("server_id") or tpl.get("id") or "",
        "language": "de",
        "specialty": specialty.strip(),
        "city": city.strip(),
        "visibility": "public",
    }

    try:
        status, reply = _http_post(f"{base_url}/v1/doku-prompts/publish", headers, request_body)
    except Exception:
        return False, "__CONN_ERROR__"

    reply_is_dict = isinstance(reply, dict)
    if status == 200 and reply_is_dict and reply.get("ok"):
        return True, ""

    detail = ""
    if reply_is_dict:
        detail = str(reply.get("detail") or reply.get("message") or "")
    # The route exists locally but may not be deployed on the live server yet.
    if status in (404, 405) or detail.strip().lower() in ("not found", "notfound"):
        return False, "__ROUTE_MISSING__"
    return False, detail or f"Server-Fehler ({status})"
|
||
|
||
|
||
def unpublish_doku_prompt(app, pub_id: str) -> Tuple[bool, str]:
    """Withdraw the caller's own publication from the server.

    Args:
        app: Application object providing ``get_backend_url()``.
        pub_id: Id of the publication to withdraw.

    Returns:
        ``(ok, message)``. On failure the message is display text or one of
        the sentinels ``"__ROUTE_MISSING__"`` (404/405 or a "not found"
        detail) and ``"__CONN_ERROR__"`` (the request failed).
    """
    from urllib.parse import quote  # function-scope import keeps the block self-contained

    headers = _sync_headers(app)
    if not headers:
        return False, "Nicht angemeldet oder keine Praxis-Zuordnung."
    pid = (pub_id or "").strip()
    if not pid:
        return False, "Keine Veröffentlichungs-ID."
    try:
        bu = app.get_backend_url().rstrip("/")
    except Exception:
        return False, "Backend nicht konfiguriert."
    # Percent-encode the id so characters such as "?", "#" or spaces cannot
    # cut off or alter the request path.
    url = f"{bu}/v1/doku-prompts/unpublish/{quote(pid, safe='')}"
    try:
        code, data = _http_post(url, headers, {})
    except Exception:
        return False, "__CONN_ERROR__"
    if code == 200 and isinstance(data, dict) and data.get("ok"):
        return True, ""
    if code in (404, 405):
        return False, "__ROUTE_MISSING__"
    detail = ""
    if isinstance(data, dict):
        detail = str(data.get("detail") or data.get("message") or "")
    if detail.strip().lower() in ("not found", "notfound"):
        return False, "__ROUTE_MISSING__"
    return False, detail or f"Server-Fehler ({code})"
|
||
|
||
|
||
def rate_public_doku_prompt_result(
    app, pub_id: str, rating: float,
) -> Tuple[bool, str, Dict[str, Any]]:
    """Rate another user's public template.

    Args:
        app: Application object providing ``get_backend_url()``.
        pub_id: Id of the published template.
        rating: Rating in half-star steps; validated locally before sending.

    Returns:
        ``(ok, message, payload)``. On success ``payload`` holds
        ``rating_average``, ``rating_count``, ``my_rating`` and ``can_rate``
        from the server reply. On failure ``payload`` is empty and the
        message is display text or one of the sentinels
        ``"__ROUTE_MISSING__"`` / ``"__CONN_ERROR__"``.
    """
    from urllib.parse import quote  # function-scope import keeps the block self-contained

    headers = _sync_headers(app)
    if not headers:
        return False, "Nicht angemeldet oder keine Praxis-Zuordnung.", {}
    pid = (pub_id or "").strip()
    if not pid:
        return False, "Keine Veröffentlichungs-ID.", {}
    rv = normalize_rating_value(rating)
    if rv is None:
        return False, "Ungültige Bewertung (0,5–5,0 in 0,5-Schritten).", {}
    try:
        bu = app.get_backend_url().rstrip("/")
    except Exception:
        return False, "Backend nicht konfiguriert.", {}
    # Percent-encode the id so characters such as "?", "#" or spaces cannot
    # cut off or alter the request path.
    url = f"{bu}/v1/doku-prompts/rate/{quote(pid, safe='')}"
    try:
        code, data = _http_post(url, headers, {"rating": rv})
    except Exception:
        return False, "__CONN_ERROR__", {}
    if code == 200 and isinstance(data, dict) and data.get("ok"):
        payload = {k: data.get(k) for k in (
            "rating_average", "rating_count", "my_rating", "can_rate",
        )}
        return True, "", payload
    if code in (404, 405):
        return False, "__ROUTE_MISSING__", {}
    if code == 403:
        return False, "Diese Vorlage kann nicht bewertet werden.", {}
    if code == 400:
        return False, "Ungültige Bewertung.", {}
    detail = ""
    if isinstance(data, dict):
        detail = str(data.get("detail") or data.get("message") or "")
    if detail.strip().lower() in ("not found", "notfound"):
        return False, "__ROUTE_MISSING__", {}
    return False, detail or f"Server-Fehler ({code})", {}
|