# Repository web-view listing: 2716 lines, 94 KiB, Python; last change 2026-04-16 13:32:32 +02:00
# -*- coding: utf-8 -*-
"""
AZA backend server with TLS.

Start:
    python backend_main.py
Or without TLS (development only):
    AZA_TLS_REQUIRE=0 python backend_main.py
"""
import hashlib
import hmac
import json
import logging
import os
import re
import sqlite3
import sys
import tempfile
import threading
import time
import uuid
from collections import defaultdict
from contextlib import closing
from datetime import date, datetime, timedelta, timezone
from difflib import get_close_matches
from pathlib import Path
from typing import Any, Optional
from urllib.parse import urlparse
# Best-effort .env loading: any failure (including a missing dotenv package)
# is ignored so that it cannot stop the module from importing.
try:
    from dotenv import load_dotenv  # type: ignore
    _root_env_path = Path(__file__).resolve().parents[0] / ".env"
    _deploy_env_path = Path(__file__).resolve().parents[0] / "deploy" / ".env"
    if _root_env_path.exists():
        load_dotenv(dotenv_path=_root_env_path)
    if _deploy_env_path.exists():
        # Load deploy/.env as a supplementary source without overriding variables that are already set
        load_dotenv(dotenv_path=_deploy_env_path, override=False)
except Exception:
    pass
try:
    from openai_runtime_config import get_openai_api_key, is_openai_configured
except ImportError:
    # Fallback used when the openai_runtime_config module is not importable:
    # read the key straight from the environment.
    def get_openai_api_key():
        """Return the stripped OPENAI_API_KEY value, or None if unset/blank."""
        return os.getenv("OPENAI_API_KEY", "").strip() or None

    def is_openai_configured():
        """Return True when get_openai_api_key() yields a key."""
        return get_openai_api_key() is not None
from aza_tls import check_tls_or_exit, get_uvicorn_ssl_kwargs, has_tls_config
import json as _json
from fastapi import Body, Depends, FastAPI, Header, HTTPException, Query, UploadFile, File, Form, Request
from fastapi.responses import HTMLResponse, JSONResponse as _OrigJSONResponse, Response
from pydantic import BaseModel, Field
from openai import OpenAI
from aza_rate_limit import default_ip_limiter, default_token_limiter
from aza_security import require_api_token
from aza_license_logic import compute_license_decision
from aza_device_enforcement import enforce_and_touch_device, list_devices_for_email
from aza_news_backend import get_news_items, get_event_items
from services.live_event_search import SearchProviderConfigError
from services.event_llm_direct import query_events_direct
from services.news_llm_search import search_medical_news, NewsCandidate
class JSONResponse(_OrigJSONResponse):
    """JSON response that declares a UTF-8 charset and keeps non-ASCII text unescaped."""

    media_type = "application/json; charset=utf-8"

    def render(self, content) -> bytes:
        """Serialize *content* to UTF-8 encoded JSON bytes."""
        # ensure_ascii=False emits characters such as umlauts literally
        # instead of as \uXXXX escapes.
        serialized = _json.dumps(content, ensure_ascii=False)
        return serialized.encode("utf-8")
# Input model for a single schedule entry.
class ScheduleItemIn(BaseModel):
    employee: str = Field(..., min_length=1)  # required, at least one character
    # NOTE: the field name coincides with the imported `date` type used as its annotation.
    date: date
    type: str = Field(..., min_length=1)  # required, at least one character; values are not restricted here
    note: str = ""  # optional, defaults to an empty string
# Pair of schedule entries: `old` and `new`.
# Presumably `old` identifies the entry to replace with `new` - verify against the caller.
class ScheduleItemUpdate(BaseModel):
    old: ScheduleItemIn
    new: ScheduleItemIn
# Telemetry ping payload; `event`, `version` and `platform` are required.
class TelemetryPing(BaseModel):
    event: str
    version: str
    platform: str
    app: str = "AZA"  # defaults to "AZA" when omitted
    crash_type: str | None = None  # optional
    target_version: str | None = None  # optional
# Whitelist of telemetry event names (the check that uses it is outside this excerpt).
ALLOWED_TELEMETRY_EVENTS = {
    "app_start",
    "update_check",
    "download_click",
    "crash",
}
# One chat message; `role` must be exactly "system", "user" or "assistant".
class ChatMessage(BaseModel):
    role: str = Field(..., pattern=r"^(system|user|assistant)$")
    content: str
# Chat completion request body with range-validated sampling parameters.
class ChatRequest(BaseModel):
    model: str = Field(default="gpt-5.2")
    messages: list[ChatMessage] = Field(..., min_length=1)  # at least one message is required
    temperature: float | None = Field(default=None, ge=0.0, le=2.0)  # optional, 0.0..2.0
    max_tokens: int | None = Field(default=None, ge=1, le=128000)  # optional, 1..128000
    top_p: float | None = Field(default=None, ge=0.0, le=1.0)  # optional, 0.0..1.0
# Chat limits and whitelists; the code enforcing them is outside this excerpt.
ALLOWED_CHAT_MODELS = {
    "gpt-5.2", "gpt-5-mini", "gpt-5-nano",
    "gpt-4o", "gpt-4o-mini",
    "gpt-4o-mini-search-preview",
}
MAX_CHAT_MESSAGES = 64
MAX_CHAT_CONTENT_CHARS = 100_000
# Whitelist of crash type names for telemetry.
ALLOWED_CRASH_TYPES = {
    "startup_error",
    "update_error",
    "ui_error",
    "network_error",
    "unknown",
}
# Presumably: at most TELEMETRY_RATE_LIMIT pings per TELEMETRY_RATE_WINDOW_SECONDS - verify at the point of use.
TELEMETRY_RATE_LIMIT = 60
TELEMETRY_RATE_WINDOW_SECONDS = 60
# In-memory telemetry state (process-local, not persisted).
_telemetry_hits: dict[str, list[float]] = defaultdict(list)
_telemetry_event_counts: dict[str, int] = defaultdict(int)
# Naive UTC timestamp taken at import time.
_server_start_time = datetime.utcnow()
# Response item of GET /api/news. Field names are camelCase and part of the wire format.
class NewsItemOut(BaseModel):
    id: str
    source: str
    title: str
    url: str
    publishedAt: str
    tags: list[str]
    summaryOriginal: str
    summaryTranslated: Optional[str] = None  # optional
    languageOriginal: str
    isOpenAccess: bool
    evidenceType: str
    regions: list[str]
# Response item of GET /api/events. Dates are carried as strings.
class EventItemOut(BaseModel):
    id: str
    name: str
    startDate: str
    endDate: str
    city: str
    country: str
    regions: list[str]
    tags: list[str]
    description: str
    type: str
    cmeFlag: bool = False  # defaults to False when omitted
    organizer: str
    source: str
    url: str
    icsUrl: Optional[str] = None  # optional
# Verification details attached to a live event; every field has a default,
# so an empty dict validates against this model.
class LiveVerificationOut(BaseModel):
    httpStatus: int = 0
    finalUrl: str = ""
    redirectCount: int = 0
    isDirectEventPage: bool = False
    checkedAt: str = ""
# Response item of GET /api/events/live.
class LiveEventItemOut(BaseModel):
    id: str
    name: str
    startDate: str
    endDate: str
    city: str
    country: str
    regionTags: list[str]
    specialtyTags: list[str]
    url: str
    description: str
    organizer: Optional[str] = None  # optional
    source: str = "live_search"  # default source label
    confidence: float
    verification: LiveVerificationOut
# Runtime configuration, read once from the environment at import time.
TRANSCRIBE_MODEL = os.getenv("TRANSCRIBE_MODEL", "gpt-4o-mini-transcribe")
LANGUAGE = os.getenv("LANGUAGE", "de")
# Boolean flags: true only for "1", "true", "yes" or "on" (case-insensitive, surrounding whitespace ignored).
AZA_TRANSCRIBE_DEBUG = str(os.getenv("AZA_TRANSCRIBE_DEBUG", "0")).strip().lower() in {"1", "true", "yes", "on"}
AZA_ENABLE_WHISPER_FALLBACK = str(os.getenv("AZA_ENABLE_WHISPER_FALLBACK", "0")).strip().lower() in {"1", "true", "yes", "on"}
# int() raises ValueError at import time if PORT is set to a non-integer value.
PORT = int(os.getenv("PORT", "8000"))
def _resolve_api_token() -> str:
    """Resolve the API token: environment first, then backend_token.txt.

    The token file is looked up next to the executable first when running as
    a frozen app, then next to this source file. Unreadable files are
    skipped. Returns "" when no non-empty token is found.
    """
    env_token = os.getenv("MEDWORK_API_TOKEN", "").strip()
    if env_token:
        return env_token

    search_dirs = []
    if getattr(sys, "frozen", False):
        search_dirs.append(Path(sys.executable).resolve().parent)
    search_dirs.append(Path(__file__).resolve().parent)

    for directory in search_dirs:
        token_file = directory / "backend_token.txt"
        try:
            if not token_file.is_file():
                continue
            with open(token_file, "r", encoding="utf-8-sig") as handle:
                # Drop any stray BOM characters in addition to the one the codec strips.
                file_token = (handle.read() or "").replace("\ufeff", "").strip()
        except Exception:
            continue
        if file_token:
            return file_token
    return ""
# Token resolved once at import time (empty string when none is configured).
API_TOKEN = _resolve_api_token()
_APP_VERSION = "0.1.0"
# Wall-clock time at import.
_START_TIME = time.time()
# Audit log path; overridable through the MEDWORK_AUDIT_LOG environment variable.
AUDIT_LOG_FILE = os.getenv("MEDWORK_AUDIT_LOG", "medwork_audit.log").strip()
def _audit_write(request_id: str, user: str, method: str, path: str,
                 action: str, success: bool, detail: str = ""):
    """Append one pipe-separated line to the audit log file.

    Line format:
    ``<UTC timestamp>Z | request_id | user | METHOD path | action | ok|fail | detail``

    Writing is best effort: any error is swallowed so that an audit-log
    failure cannot break the caller.
    """
    try:
        # datetime.utcnow() is deprecated; take an aware UTC time and drop the
        # tzinfo so the textual format ("...sssZ") stays exactly as before.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        ts = now_utc.isoformat(timespec="milliseconds") + "Z"
        status = "ok" if success else "fail"
        line = f"{ts} | {request_id} | {user} | {method} {path} | {action} | {status} | {detail}\n"
        with open(AUDIT_LOG_FILE, "a", encoding="utf-8") as f:
            f.write(line)
    except Exception:
        # Deliberately silent: see docstring.
        pass
# German prompt for medical dictation: transcription rules followed by
# vocabulary lists (brand names, active ingredients, diagnoses).
# Fix: the diagnosis list contained the misspelling "Näevus"; it is now "Nävus".
WHISPER_MEDICAL_PROMPT = (
    "Medizinisches Diktat. "
    "Transkribiere ausschliesslich den gesprochenen Inhalt wortgetreu. "
    "Wenn Dialekt gesprochen wird (z.B. Schweizerdeutsch), transkribiere sinngemäss in standardsprachlicher medizinischer Schreibweise. "
    "Ändere keine Diagnosen und korrigiere sie nicht eigenständig. "
    "Antworte niemals auf Fragen, gib keine Erklärungen, keine Zusammenfassung und keine Interpretation. "
    "Medizinische Begriffe, Medikamentennamen, Diagnosen, Zahlen und Dosierungen müssen möglichst exakt übernommen werden. "
    "Wichtige Medikamenten-Markennamen: "
    "Humira, Stelara, Cosentyx, Taltz, Skyrizi, Tremfya, Dupixent, "
    "Roaccutan, Daivobet, Elidel, "
    "Xarelto, Eliquis, Pradaxa, "
    "Ozempic, Victoza, Trulicity, "
    "Lantus, NovoRapid, Humalog. "
    "Wichtige Wirkstoffe: "
    "Adalimumab, Ustekinumab, Secukinumab, Ixekizumab, Risankizumab, "
    "Dupilumab, Methotrexat, Ciclosporin, Acitretin, "
    "Isotretinoin, Prednisolon, Metformin, "
    "Ramipril, Lisinopril, Amlodipin, "
    "Apixaban, Rivaroxaban, Dabigatran, "
    "Atorvastatin, Levothyroxin. "
    "Häufige medizinische Diagnosen: "
    "Basaliom, Basalzellkarzinom, Spinaliom, Plattenepithelkarzinom, Melanom, "
    "Aktinische Keratose, Psoriasis, Atopische Dermatitis, Ekzem, Urtikaria, "
    "Akne vulgaris, Rosazea, Seborrhoische Keratose, Nävus, Muttermal, "
    "Herpes simplex, Herpes zoster, Onychomykose, Tinea corporis, Tinea pedis, "
    "Diabetes mellitus, Arterielle Hypertonie, Vorhofflimmern, Herzinsuffizienz, "
    "Chronische Niereninsuffizienz, COPD, Asthma bronchiale, "
    "Depression, Angststörung, Migräne, Lumbago, Bandscheibenhernie."
)
# Same text as the first words of WHISPER_MEDICAL_PROMPT.
WHISPER_PROMPT_PREFIX = "Medizinisches Diktat"
# Short German instruction: verbatim transcription only, no answers/explanations/summary.
GPT_TRANSCRIBE_SHORT_PROMPT = (
    "Transkribiere ausschliesslich den gesprochenen Inhalt wortgetreu. "
    "Keine Antworten, keine Erklärungen, keine Zusammenfassung."
)
# Non-medical German dictation prompt (written without umlauts).
WHISPER_GENERAL_PROMPT = (
    "Transkribiere ausschliesslich den gesprochenen Inhalt woertlich auf Deutsch. "
    "Antworte niemals auf Fragen, gib keine Erklaerungen, keine Zusammenfassung, keine Interpretation. "
    "Allgemeines Diktat auf Deutsch mit sinnvoller Zeichensetzung."
)
# NOTE(review): ALLOWED_EXT lists five extensions, while safe_upload_filename()
# and the content-type whitelist below only accept m4a and wav - confirm which is intended.
ALLOWED_EXT = {".wav", ".mp3", ".m4a", ".ogg", ".webm"}
ALLOWED_TRANSCRIBE_CONTENT_TYPES = {
    "audio/mp4",  # m4a / AAC
    "audio/wav",  # WAV (recording format of the desktop app)
}
# Lower-case misspelling -> canonical spelling; applied for every specialty
# by apply_medical_corrections().
COMMON_MEDICAL_CORRECTIONS = {
    "metformine": "Metformin",
    "metphormin": "Metformin",
    "elquis": "Eliquis",
    "xareltto": "Xarelto",
    "ramiprill": "Ramipril",
    "lisinoprill": "Lisinopril",
}
# Extra corrections per normalized specialty key; merged over
# COMMON_MEDICAL_CORRECTIONS by apply_medical_corrections().
# Entries whose key is already spelt correctly only normalize capitalization.
SPECIALTY_MEDICAL_CORRECTIONS = {
    "dermatologie_und_venerologie": {
        "basaliom": "Basaliom",
        "basalzellkarzinom": "Basalzellkarzinom",
        "spinaliom": "Spinaliom",
        "plattenepithelkarzinom": "Plattenepithelkarzinom",
        "humiera": "Humira",
        "cosentix": "Cosentyx",
        "stelarra": "Stelara",
        "skirizi": "Skyrizi",
        "tremfia": "Tremfya",
        "dupixan": "Dupixent",
        "elidell": "Elidel",
        "methotrexat": "Methotrexat",
        "ciclosporine": "Ciclosporin",
        "acitretine": "Acitretin",
        "isotretionin": "Isotretinoin",
    },
    "kardiologie": {
        "eliqis": "Eliquis",
        "xareltoh": "Xarelto",
        "amiodaronn": "Amiodaron",
    },
    "allgemeine_innere_medizin": {
        "pantoprazoll": "Pantoprazol",
        "levotyroxin": "Levothyroxin",
    },
}
# Medication names per specialty key. Only "dermatologie_und_venerologie" is
# populated; every other specialty is an empty placeholder list.
SPECIALTY_MEDICATION_LEXICON = {
    "allgemeine_innere_medizin": [],
    "allergologie_und_klinische_immunologie": [],
    "anaesthesiologie": [],
    "angiologie": [],
    "arbeitsmedizin": [],
    "chirurgie": [],
    "dermatologie_und_venerologie": [
        "Humira",
        "Stelara",
        "Cosentyx",
        "Taltz",
        "Skyrizi",
        "Tremfya",
        "Ilumetri",
        "Kyntheum",
        "Dupixent",
        "Adtralza",
        "Cibinqo",
        "Olumiant",
        "Rinvoq",
        "Roaccutan",
        "Curakne",
        "Daivobet",
        "Daivonex",
        "Elidel",
        "Protopic",
        "Advantan",
        "Elocom",
        "Dermovate",
        "Methotrexat",
        "Ciclosporin",
        "Acitretin",
        "Isotretinoin",
        "Tacrolimus",
        "Pimecrolimus",
        "Adalimumab",
        "Ustekinumab",
        "Secukinumab",
        "Ixekizumab",
        "Risankizumab",
        "Guselkumab",
        "Tildrakizumab",
        "Brodalumab",
        "Dupilumab",
        "Tralokinumab",
        "Abrocitinib",
        "Baricitinib",
        "Upadacitinib",
        "Calcipotriol",
        "Betamethason",
        "Mometason",
        "Clobetasol",
    ],
    "endokrinologie_und_diabetologie": [],
    "gastroenterologie": [],
    "gefaesschirurgie": [],
    "genetik_medizinische": [],
    "gynaekologie_und_geburtshilfe": [],
    "haematologie": [],
    "handchirurgie": [],
    "herz_und_thorakale_gefaesschirurgie": [],
    "infektiologie": [],
    "intensivmedizin": [],
    "kardiologie": [],
    "kiefer_und_gesichtschirurgie": [],
    "kinder_und_jugendmedizin": [],
    "kinder_und_jugendpsychiatrie_und_psychotherapie": [],
    "kinderchirurgie": [],
    "klinische_pharmakologie_und_toxikologie": [],
    "medizinische_onkologie": [],
    "nephrologie": [],
    "neurochirurgie": [],
    "neurologie": [],
    "nuklearmedizin": [],
    "ophthalmologie": [],
    "orthopaedische_chirurgie_und_traumatologie_des_bewegungsapparates": [],
    "oto_rhino_laryngologie": [],
    "pathologie": [],
    "pharmazeutische_medizin": [],
    "physikalische_medizin_und_rehabilitation": [],
    "plastische_rekonstruktive_und_aesthetische_chirurgie": [],
    "pneumologie": [],
    "praevention_und_public_health": [],
    "psychiatrie_und_psychotherapie": [],
    "radiologie": [],
    "radio_onkologie_strahlentherapie": [],
    "rechtsmedizin": [],
    "rheumatologie": [],
    "thoraxchirurgie": [],
    "urologie": [],
}
# English alias -> canonical (German) specialty key; consulted by normalize_specialty_key().
SPECIALTY_KEY_ALIASES = {
    "dermatology": "dermatologie_und_venerologie",
    "cardiology": "kardiologie",
    "general_medicine": "allgemeine_innere_medizin",
}
# Medication names included for every specialty by get_active_medication_lexicon().
COMMON_MEDICATION_LEXICON = [
    "Metformin",
    "Pantoprazol",
    "Levothyroxin",
    "Ramipril",
    "Amlodipin",
    "Bisoprolol",
    "Torasemid",
    "Xarelto",
    "Eliquis",
    "Pradaxa",
]
def normalize_specialty_key(specialty: str) -> str:
    """Return the canonical specialty key for *specialty*.

    The input is stripped and lower-cased (None/empty becomes ""); known
    aliases from SPECIALTY_KEY_ALIASES are mapped to their canonical key,
    anything else is returned in its normalized form.
    """
    normalized = (specialty or "").strip().lower()
    if normalized in SPECIALTY_KEY_ALIASES:
        return SPECIALTY_KEY_ALIASES[normalized]
    return normalized
def get_active_medication_lexicon(specialty: str) -> list[str]:
    """Return the common medication names followed by those of *specialty*.

    Names are stripped, blank entries are dropped, and duplicates are
    removed case-insensitively, keeping the first spelling encountered.
    """
    specialty_names = SPECIALTY_MEDICATION_LEXICON.get(normalize_specialty_key(specialty), [])
    # dict preserves insertion order, so setdefault() keeps the first spelling
    # seen for each lower-cased name.
    unique_by_lower: dict[str, str] = {}
    for raw_name in COMMON_MEDICATION_LEXICON + specialty_names:
        cleaned = (raw_name or "").strip()
        if cleaned:
            unique_by_lower.setdefault(cleaned.lower(), cleaned)
    return list(unique_by_lower.values())
def apply_medical_corrections(text: str, specialty: str = "") -> str:
    """Replace known misspellings in *text* with their canonical spelling.

    The common corrections are always applied; corrections registered for
    the normalized *specialty* are merged on top. Matching is whole-word and
    case-insensitive. Empty/None text is returned unchanged.
    """
    if not text:
        return text
    specialty_key = (normalize_specialty_key(specialty) or "").strip().lower()
    merged = {
        **COMMON_MEDICAL_CORRECTIONS,
        **SPECIALTY_MEDICAL_CORRECTIONS.get(specialty_key, {}),
    }
    for misspelling, canonical in merged.items():
        text = re.sub(rf"\b{re.escape(misspelling)}\b", canonical, text, flags=re.IGNORECASE)
    return text
# Lower-case phrase -> canonical spelling, applied after transcription.
# Entries whose key is already spelt correctly only normalize capitalization.
MEDICAL_POST_CORRECTIONS = {
    "basalzell ca": "Basalzellkarzinom",
    "basalzellkarzinom": "Basalzellkarzinom",
    "plattenepithel ca": "Plattenepithelkarzinom",
    "plattenepithelkarzinom": "Plattenepithelkarzinom",
    "spinaliom": "Spinaliom",
    "basaliom": "Basaliom",
    "humiera": "Humira",
    "eliquis": "Eliquis",
    "xarelto": "Xarelto",
    "metformine": "Metformin",
    "metphormin": "Metformin",
}


def apply_medical_post_corrections(text: str) -> str:
    """Apply every MEDICAL_POST_CORRECTIONS entry to *text*.

    Matching is whole-word and case-insensitive; empty/None text is
    returned unchanged.
    """
    if not text:
        return text
    for phrase, canonical in MEDICAL_POST_CORRECTIONS.items():
        text = re.sub(rf"\b{re.escape(phrase)}\b", canonical, text, flags=re.IGNORECASE)
    return text
# Feature flag, read once at import: true only for "1", "true", "yes" or "on".
_ENABLE_MED_FUZZY = str(os.getenv("AZA_ENABLE_MED_FUZZY", "0")).strip().lower() in {"1", "true", "yes", "on"}
# Lower-case lookup key -> canonical medication spelling for fuzzy matching.
MEDICATION_LEXICON: dict[str, str] = {
    "metformin": "Metformin",
    "eliquis": "Eliquis",
    "xarelto": "Xarelto",
    "ramipril": "Ramipril",
    "lisinopril": "Lisinopril",
    "amlodipin": "Amlodipin",
    "pantoprazol": "Pantoprazol",
    "levothyroxin": "Levothyroxin",
    "atorvastatin": "Atorvastatin",
}


def apply_medication_fuzzy_corrections(text: str) -> str:
    """
    Conservatively fix near-miss spellings of known medication names.

    Does nothing unless AZA_ENABLE_MED_FUZZY is enabled. Only words of at
    least six characters that are not already a lexicon key are considered,
    a similarity cutoff of 0.84 is required, and at most 25 replacements
    are made per call.
    """
    if not _ENABLE_MED_FUZZY or not text:
        return text
    tokens = re.findall(r"\b[0-9A-Za-zÄÖÜäöüß]+\b", text)
    if not tokens:
        return text

    lexicon_keys = list(MEDICATION_LEXICON.keys())
    corrected = text
    remaining_budget = 25
    for token in tokens:
        if remaining_budget <= 0:
            break
        lowered = token.lower()
        if len(lowered) < 6 or lowered in MEDICATION_LEXICON:
            continue
        nearest = get_close_matches(lowered, lexicon_keys, n=1, cutoff=0.84)
        canonical = MEDICATION_LEXICON.get(nearest[0]) if nearest else None
        if not canonical:
            continue
        # Replace one occurrence per token so repeated misspellings are
        # consumed one by one and each counts against the budget.
        corrected, hits = re.subn(
            rf"\b{re.escape(token)}\b", canonical, corrected, count=1, flags=re.IGNORECASE
        )
        if hits:
            remaining_budget -= 1
    return corrected
def safe_upload_filename(original_name: str) -> str:
    """
    Build a safe storage name for an uploaded audio file.

    Only the lower-cased extension of the client-supplied name is kept; the
    rest is replaced by a random UUID hex string.

    Raises:
        HTTPException: 415 if the extension is neither ".m4a" nor ".wav".
    """
    suffix = Path(os.path.basename(original_name or "")).suffix.lower()
    if suffix in {".m4a", ".wav"}:
        return f"{uuid.uuid4().hex}{suffix}"
    raise HTTPException(status_code=415, detail="unsupported audio file extension")
# Lazily created OpenAI client, shared process-wide; populated by _get_openai().
_client: Optional[OpenAI] = None
class _UvicornAccessHealthFilter(logging.Filter):
    """Logging filter that drops access-log records for GET /health."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Return False only for records whose message contains '"GET /health '."""
        try:
            rendered = record.getMessage()
        except Exception:
            # A record that cannot be formatted is never suppressed.
            return True
        is_health_probe = '"GET /health ' in rendered
        return not is_health_probe
def _get_openai() -> OpenAI:
    """Return the shared OpenAI client, creating it on first use.

    Raises:
        HTTPException: 503 when no API key is configured.
    """
    global _client
    if _client is not None:
        return _client

    api_key = get_openai_api_key()
    if not api_key:
        raise HTTPException(
            status_code=503,
            detail=(
                "OpenAI API key is not configured on this machine. "
                "Please set OPENAI_API_KEY in the AZA runtime config."
            ),
        )
    _client = OpenAI(api_key=api_key)
    return _client
def _runtime_base_dir() -> Path:
    """Return the directory that holds the application's files.

    In a frozen build this is ``sys._MEIPASS`` when that attribute exists,
    otherwise the executable's directory; in a normal run it is the
    directory of this source file.
    """
    if not getattr(sys, "frozen", False):
        return Path(__file__).resolve().parent
    if hasattr(sys, "_MEIPASS"):
        return Path(sys._MEIPASS)
    return Path(sys.executable).resolve().parent
def _stripe_db_path() -> Path:
    """Return the path of the Stripe webhook SQLite database.

    Frozen build: the database lives in a writable data directory, which is
    created if necessary. If no database exists there yet and a bundled copy
    is present, the bundled copy is copied over first.
    Normal run: the path comes from ``stripe_routes.DB_PATH`` when that
    import works, otherwise it is ``<base dir>/data/stripe_webhook.sqlite``.
    """
    if getattr(sys, "frozen", False):
        import shutil
        try:
            from aza_config import get_writable_data_dir
            writable = Path(get_writable_data_dir()) / "data"
        except Exception:
            # NOTE(review): with APPDATA unset this becomes the relative path "AZA Desktop/data".
            writable = Path(os.environ.get("APPDATA", "")) / "AZA Desktop" / "data"
        writable.mkdir(parents=True, exist_ok=True)
        dest = writable / "stripe_webhook.sqlite"
        if not dest.exists():
            # Seed the writable location from the bundled database, if there is one.
            bundled = _runtime_base_dir() / "data" / "stripe_webhook.sqlite"
            if bundled.exists():
                shutil.copy2(str(bundled), str(dest))
        return dest
    try:
        from stripe_routes import DB_PATH as STRIPE_DB_PATH  # type: ignore
        return Path(STRIPE_DB_PATH)
    except Exception:
        return _runtime_base_dir() / "data" / "stripe_webhook.sqlite"
def _has_any_active_license() -> bool:
    """Return True if the Stripe database contains at least one active license.

    Returns False when the database file does not exist or when any error
    occurs while querying (the error is printed, not raised).
    """
    db_path = _stripe_db_path()
    if not db_path.exists():
        return False
    try:
        # Ensure Stripe DB schema exists before querying (best effort).
        try:
            import stripe_routes  # type: ignore
            if hasattr(stripe_routes, "_ensure_storage"):
                stripe_routes._ensure_storage()  # type: ignore
        except Exception as e:
            print(f"[LICENSE] ensure_storage failed: {e}")
        # A sqlite3 connection used as a context manager only commits or
        # rolls back; it does not close. closing() releases the handle.
        with closing(sqlite3.connect(db_path)) as con:
            row = con.execute(
                """
                SELECT 1
                FROM licenses
                WHERE status = 'active'
                ORDER BY updated_at DESC
                LIMIT 1
                """,
            ).fetchone()
        return row is not None
    except Exception as e:
        print(f"[LICENSE] has_any_active_license failed: {e}")
        return False
def _active_license_count() -> int:
    """Return the number of licenses with status 'active' in the Stripe database.

    Returns 0 when the database file does not exist or when any error occurs
    while querying (the error is printed, not raised).
    """
    db_path = _stripe_db_path()
    if not db_path.exists():
        return 0
    try:
        # Ensure Stripe DB schema exists before querying (best effort).
        try:
            import stripe_routes  # type: ignore
            if hasattr(stripe_routes, "_ensure_storage"):
                stripe_routes._ensure_storage()  # type: ignore
        except Exception as e:
            print(f"[LICENSE] ensure_storage failed: {e}")
        # A sqlite3 connection used as a context manager only commits or
        # rolls back; it does not close. closing() releases the handle.
        with closing(sqlite3.connect(db_path)) as con:
            row = con.execute("SELECT COUNT(*) FROM licenses WHERE status='active'").fetchone()
        return int(row[0]) if row else 0
    except Exception as e:
        print(f"[LICENSE] active_license_count failed: {e}")
        return 0
# The ASGI application. Responses default to the UTF-8 JSONResponse defined in this module.
# NOTE(review): the version literal duplicates _APP_VERSION.
app = FastAPI(
    title="AZA Transkriptions-Backend",
    version="0.1.0",
    default_response_class=JSONResponse,
)
# Allowed CORS origins: the comma-separated AZA_CORS_ORIGINS variable, or the
# built-in defaults when it is unset or contains no non-empty entry.
_CORS_ORIGINS = [
    o.strip() for o in os.environ.get("AZA_CORS_ORIGINS", "").split(",") if o.strip()
] or [
    "https://aza-medwork.ch",
    "https://www.aza-medwork.ch",
    "http://127.0.0.1:8000",
    "http://localhost:8000",
]
# Register the CORS middleware; any failure here is silently ignored.
try:
    from fastapi.middleware.cors import CORSMiddleware
    app.add_middleware(
        CORSMiddleware,
        allow_origins=_CORS_ORIGINS,
        allow_methods=["GET", "POST", "OPTIONS"],
        allow_headers=["Content-Type", "X-API-Token", "X-Device-Id", "X-Device-Name", "X-App-Version"],
        allow_credentials=False,
    )
except Exception:
    pass
# Directories next to this file that are served as static content when they exist.
_app_root = Path(__file__).resolve().parent
_web_dir = _app_root / "web"
_release_dir = _app_root / "release"
# Mount /web (with html=True) and /release; any failure here is silently ignored.
try:
    from fastapi.staticfiles import StaticFiles
    if _web_dir.is_dir():
        app.mount("/web", StaticFiles(directory=str(_web_dir), html=True), name="web_static")
    if _release_dir.is_dir():
        app.mount("/release", StaticFiles(directory=str(_release_dir)), name="release_static")
except Exception:
    pass
MAX_REQUEST_BODY_BYTES = 1 * 1024 * 1024  # 1 MB
MAX_TRANSCRIBE_BODY_BYTES = 500 * 1024 * 1024  # 500 MB


@app.middleware("http")
async def request_size_limit_middleware(request: Request, call_next):
    """Reject requests whose body exceeds the limit for their path with 413.

    Paths starting with /v1/transcribe get the larger transcription limit;
    everything else gets MAX_REQUEST_BODY_BYTES.

    The 413 is returned as a response rather than raised: an HTTPException
    raised inside HTTP middleware is not seen by the application's exception
    handlers and would reach the client as a 500.
    """
    if request.url.path.startswith("/v1/transcribe"):
        max_bytes = MAX_TRANSCRIBE_BODY_BYTES
    else:
        max_bytes = MAX_REQUEST_BODY_BYTES

    def _too_large():
        # Same body shape as a raised HTTPException would produce.
        return JSONResponse(status_code=413, content={"detail": "request body too large"})

    # Cheap early rejection: trust a declared Content-Length that is already
    # over the limit so the oversized body is never buffered.
    try:
        declared_length = int(request.headers.get("content-length", ""))
    except ValueError:
        declared_length = None
    if declared_length is not None and declared_length > max_bytes:
        return _too_large()

    # Fallback for requests without a usable Content-Length.
    body = await request.body()
    if len(body) > max_bytes:
        return _too_large()
    return await call_next(request)
# Print a status banner at import time if project_status.json exists next to
# this file. Purely informational: every error is ignored.
try:
    status_file = Path(__file__).resolve().parent / "project_status.json"
    if status_file.is_file():
        with open(status_file, "r", encoding="utf-8") as f:
            s = json.load(f)
        print("\n=== AZA PROJECT STATUS ===")
        print("Phase:", s.get("phase"))
        print("Step:", s.get("current_step"))
        print("Next:", s.get("next_step"))
        print("Notes:", s.get("last_update"))
        print("==========================\n")
except Exception:
    pass
# Optional routers. Each one is registered on a best-effort basis: a missing
# module or dependency must not stop the server, but the reason is always
# printed so a missing route group can be diagnosed from the startup output.

# Stripe routes
try:
    from stripe_routes import router as stripe_router
    app.include_router(stripe_router, prefix="/stripe")
except Exception as _stripe_err:
    # Stripe is optional until env + deps are in place
    print(f"[STRIPE] stripe_routes not loaded: {_stripe_err}")
# Admin monitor routes
try:
    from admin_routes import router as admin_router
    app.include_router(admin_router, prefix="/admin")
except Exception as _admin_err:
    print(f"[ADMIN] admin_routes not loaded: {_admin_err}")
# WooCommerce license bridge
try:
    from wc_routes import router as wc_router
    app.include_router(wc_router, prefix="/wc")
except Exception as _wc_err:
    print(f"[WC] wc_routes not loaded: {_wc_err}")
# Project status route
try:
    from project_status_routes import router as project_status_router
    app.include_router(project_status_router)
except Exception as _status_err:
    print(f"[STATUS] project_status_routes not loaded: {_status_err}")
@app.on_event("startup")
def _print_routes():
    """Startup hook: refuse to start without an API token, then list all routes.

    Raises:
        RuntimeError: if no API token was resolved at import time.
    """
    if not API_TOKEN:
        raise RuntimeError("FEHLER: ENV MEDWORK_API_TOKEN ist nicht gesetzt. Server wird nicht gestartet.")
    print("=== Aktive Routes im Server-Prozess ===")
    for route in app.routes:
        verbs = getattr(route, "methods", None)
        if not verbs:
            # Entries without HTTP methods are not listed.
            continue
        route_path = getattr(route, "path", "?")
        verb_list = ", ".join(sorted(verbs))
        print(f" {verb_list:8s} {route_path}")
    print("========================================")
# Response body of the transcription endpoint (the endpoint itself is outside this excerpt).
class TranscribeResponse(BaseModel):
    success: bool
    transcript: str
    error: str
    request_id: str
    duration_ms: int
    model: str
    debug: dict | None = None  # optional
    model_used: str | None = None  # optional
def _read_expected_token() -> str:
    """Return the token requests must present: backend_token.txt first, then the environment.

    The file is looked up next to the executable first when running as a
    frozen app, then next to this source file. Unreadable files are skipped.
    Falls back to the stripped MEDWORK_API_TOKEN variable ("" if unset).
    """
    candidate_dirs = []
    if getattr(sys, "frozen", False):
        candidate_dirs.append(Path(sys.executable).resolve().parent)
    candidate_dirs.append(Path(__file__).resolve().parent)

    for directory in candidate_dirs:
        token_file = directory / "backend_token.txt"
        try:
            if not token_file.is_file():
                continue
            with open(token_file, "r", encoding="utf-8-sig") as handle:
                file_token = (handle.read() or "").replace("\ufeff", "").strip(" \t\r\n")
        except Exception:
            continue
        if file_token:
            return file_token
    return (os.environ.get("MEDWORK_API_TOKEN", "") or "").strip()
def _extract_request_token(request: Request) -> str:
    """Return the API token sent with *request*, or "" if there is none.

    The X-API-Token header takes precedence; otherwise an
    ``Authorization: Bearer <token>`` header is accepted. The scheme name is
    matched case-insensitively, as HTTP authentication schemes are
    case-insensitive.
    """
    token = (request.headers.get("X-API-Token", "") or "").strip()
    if token:
        return token
    auth = (request.headers.get("Authorization", "") or "").strip()
    scheme, separator, credentials = auth.partition(" ")
    if separator and scheme.lower() == "bearer":
        return credentials.strip()
    return ""
def _require_token(request: Request) -> None:
    """Raise 401 unless *request* carries the expected API token.

    A server without a configured token rejects every request.

    Raises:
        HTTPException: 401 when the token is missing, wrong, or none is configured.
    """
    expected = _read_expected_token()
    got = _extract_request_token(request)
    # Constant-time comparison so response timing does not reveal how much of
    # a guessed token is correct. Compare bytes: compare_digest() rejects
    # non-ASCII str arguments.
    tokens_match = hmac.compare_digest(got.encode("utf-8"), expected.encode("utf-8"))
    if not expected or not tokens_match:
        raise HTTPException(status_code=401, detail="Unauthorized")
def _check_token(request: Request) -> bool:
    """Return True when *request* is authorized; the non-raising form of _require_token()."""
    try:
        _require_token(request)
    except HTTPException:
        return False
    return True
def _get_user(request: Request) -> Optional[str]:
    """Return the stripped X-User header, or None when it is absent or blank."""
    return request.headers.get("X-User", "").strip() or None


# Error payload for requests that lack the X-User header.
_NO_USER_RESPONSE = {"success": False, "error": "X-User header required"}
def _split_csv_values(value: Optional[str], default: list[str]) -> list[str]:
    """Split a comma-separated string into stripped, non-empty parts.

    Returns a copy of *default* when *value* is None or yields no parts.
    """
    if value is None:
        return list(default)
    parts = []
    for chunk in str(value).split(","):
        chunk = chunk.strip()
        if chunk:
            parts.append(chunk)
    return parts if parts else list(default)
# Live-event search settings, read from the environment at import time
# (int() raises ValueError for non-integer values).
LIVE_EVENTS_CACHE_TTL_SECONDS = int(os.getenv("LIVE_EVENTS_CACHE_TTL_SECONDS", "43200"))  # 12h
LIVE_EVENTS_TOTAL_TIMEOUT_SECONDS = int(os.getenv("LIVE_EVENTS_TOTAL_TIMEOUT_SECONDS", "15"))
LIVE_EVENTS_MAX_QUERIES = int(os.getenv("LIVE_EVENTS_MAX_QUERIES", "8"))
LIVE_EVENTS_RESULTS_PER_QUERY = int(os.getenv("LIVE_EVENTS_RESULTS_PER_QUERY", "6"))
# In-memory result cache keyed by _live_cache_key(); guarded by the lock.
_live_events_cache_lock = threading.Lock()
_live_events_cache: dict[str, dict] = {}
def _norm_region_tokens(regions: list[str]) -> list[str]:
    """Return the region tokens stripped and upper-cased, dropping blanks.

    Falls back to ["CH", "EU"] when nothing usable remains.
    """
    normalized = []
    for region in regions or []:
        token = str(region).strip()
        if token:
            normalized.append(token.upper())
    if normalized:
        return normalized
    return ["CH", "EU"]
def _region_match(region_tags: list[str], selected_regions: list[str]) -> bool:
    """Return True when an item tagged *region_tags* matches the selected regions.

    Comparison is case-insensitive. An empty selection or "WORLD" matches
    everything; selecting "EU" also matches items tagged "CH"; otherwise at
    least one tag must be among the selected regions.
    """
    wanted = {region.upper() for region in selected_regions}
    present = {tag.upper() for tag in (region_tags or [])}
    if not wanted or "WORLD" in wanted:
        return True
    if "EU" in wanted and present & {"EU", "CH"}:
        return True
    return not wanted.isdisjoint(present)
def _live_cache_key(specialty: str, regions: list[str], from_date: date, to_date: date, lang: str, limit: int) -> str:
    """Return the SHA-256 hex digest identifying one live-event query.

    The key covers the normalized specialty, the sorted normalized regions,
    both dates, the language (default "de") and the limit.
    """
    fields = []
    fields.append(specialty.strip().lower())
    fields.append(",".join(sorted(_norm_region_tokens(regions))))
    fields.append(from_date.isoformat())
    fields.append(to_date.isoformat())
    fields.append(str(lang or "de").strip().lower())
    fields.append(str(limit))
    digest = hashlib.sha256("|".join(fields).encode("utf-8"))
    return digest.hexdigest()
def _event_id(name: str, start_date: str, city: str, url: str) -> str:
    """Return a stable 20-character id derived from an event's name, start date, city and URL.

    All parts are stripped; name, city and URL are also lower-cased.
    """
    parts = (
        name.strip().lower(),
        start_date.strip(),
        city.strip().lower(),
        url.strip().lower(),
    )
    digest = hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()
    return digest[:20]
def _dedupe_live_events(rows: list[dict]) -> list[dict]:
    """Return *rows* without duplicates, keeping the first occurrence.

    Two rows are duplicates when they share name, start date and city
    (name and city compared case-insensitively), or when their URLs share
    host and path (ignoring case and a trailing slash). Rows whose keys
    cannot be computed are dropped.
    """
    seen_identities: set[str] = set()
    seen_locations: set[str] = set()
    unique_rows: list[dict] = []
    for event in rows:
        try:
            name = str(event.get("name", "")).strip().lower()
            start = str(event.get("startDate", "")).strip()
            city = str(event.get("city", "")).strip().lower()
            identity_key = "|".join((name, start, city))
            parsed_url = urlparse(str(event.get("url", "")).strip().lower())
            location_key = (parsed_url.netloc + parsed_url.path).rstrip("/")
        except Exception:
            continue

        duplicate_identity = not identity_key or identity_key in seen_identities
        # An empty URL key never counts as a duplicate.
        duplicate_location = bool(location_key) and location_key in seen_locations
        if duplicate_identity or duplicate_location:
            continue

        seen_identities.add(identity_key)
        if location_key:
            seen_locations.add(location_key)
        unique_rows.append(event)
    return unique_rows
def _collect_live_events(
    specialty_values: list[str],
    region_values: list[str],
    from_date: date,
    to_date: date,
    limit: int,
    lang: str,
    force_refresh: bool,
) -> list[dict]:
    """Return live events for the first specialty, served from cache when possible.

    On a cache miss (or with *force_refresh*) the candidates come from
    query_events_direct(). Candidates without a parseable start date, or
    lying entirely outside [from_date, to_date], are dropped. The remainder
    is de-duplicated, sorted by start date, cut to at most min(limit, 120)
    rows (at least one), cached, and returned.
    """
    primary_specialty = specialty_values[0] if specialty_values else "dermatology"
    region_tokens = _norm_region_tokens(region_values)
    cache_key = _live_cache_key(primary_specialty, region_tokens, from_date, to_date, lang=lang, limit=limit)
    started_at = time.time()

    if not force_refresh:
        with _live_events_cache_lock:
            entry = _live_events_cache.get(cache_key)
            if entry and float(entry.get("expires_at", 0.0)) > started_at:
                return list(entry.get("items", []))

    # Ask for at least 40 candidates so that filtering and de-duplication
    # still leave enough rows.
    candidates = query_events_direct(
        specialty=primary_specialty,
        regions=region_tokens,
        from_date=from_date,
        to_date=to_date,
        lang=lang,
        limit=max(limit, 40),
    )

    collected: list[dict] = []
    for cand in candidates:
        if not cand.startDate:
            continue
        try:
            starts_on = date.fromisoformat(cand.startDate)
        except Exception:
            continue
        end_text = cand.endDate or cand.startDate
        try:
            ends_on = date.fromisoformat(end_text)
        except Exception:
            # An unparseable end date is treated as a one-day event.
            ends_on = starts_on
        overlaps_window = ends_on >= from_date and starts_on <= to_date
        if not overlaps_window:
            continue
        event_url = (cand.urlCandidate or "").strip()
        collected.append(
            {
                "id": _event_id(cand.name, cand.startDate, cand.city, event_url),
                "name": cand.name,
                "startDate": cand.startDate,
                "endDate": end_text,
                "city": cand.city,
                "country": cand.country,
                "regionTags": cand.regionTags or region_tokens,
                "specialtyTags": cand.specialtyTags or specialty_values,
                "url": event_url,
                "description": cand.shortDescription or "",
                "organizer": cand.organizer or "",
                "source": "live_search",
                # Clamp to [0, 1] and round to three decimals.
                "confidence": round(max(0.0, min(1.0, float(cand.confidence))), 3),
                "verification": {},
            }
        )

    ordered = sorted(_dedupe_live_events(collected), key=lambda r: str(r.get("startDate", "")))
    result = ordered[: max(1, min(limit, 120))]

    with _live_events_cache_lock:
        _live_events_cache[cache_key] = {
            "items": list(result),
            # Entries live for at least one hour, whatever the configured TTL.
            "expires_at": started_at + max(3600, LIVE_EVENTS_CACHE_TTL_SECONDS),
        }
    return result
# In-memory schedule entries (employee / ISO date / type / note).
# NOTE(review): hard-coded sample rows - presumably seed or demo data; confirm how they are used.
SCHEDULE_DATA = [
    {"employee": "Dr. Müller", "date": "2026-02-23", "type": "work", "note": ""},
    {"employee": "Dr. Müller", "date": "2026-02-24", "type": "work", "note": ""},
    {"employee": "Dr. Müller", "date": "2026-02-25", "type": "vacation", "note": "Skiferien"},
    {"employee": "Dr. Müller", "date": "2026-02-26", "type": "vacation", "note": "Skiferien"},
    {"employee": "Dr. Müller", "date": "2026-02-27", "type": "vacation", "note": "Skiferien"},
    {"employee": "Anna Meier", "date": "2026-02-23", "type": "work", "note": ""},
    {"employee": "Anna Meier", "date": "2026-02-24", "type": "sick", "note": "Grippe"},
    {"employee": "Anna Meier", "date": "2026-02-25", "type": "work", "note": ""},
    {"employee": "Lisa Brunner", "date": "2026-02-23", "type": "work", "note": ""},
    {"employee": "Lisa Brunner", "date": "2026-02-24", "type": "work", "note": ""},
    {"employee": "Lisa Brunner", "date": "2026-02-25", "type": "work", "note": "Bürodienst"},
]
@app.get("/health")
def health():
    """Unauthenticated liveness probe: status, version, uptime in seconds and TLS flag."""
    payload = {
        "status": "ok",
        "version": _APP_VERSION,
        "uptime_s": int(time.time() - _START_TIME),
        "tls": has_tls_config(),
    }
    return JSONResponse(content=payload)
@app.get("/api/news", response_model=list[NewsItemOut])
def api_news(
    request: Request,
    specialties: Optional[str] = Query("dermatology"),
    lang: str = Query("de"),
    regions: Optional[str] = Query(None),
    region: Optional[str] = Query(None),
    limit: int = Query(30, ge=1, le=100),
):
    """Return news items for the requested specialties and regions.

    ``regions`` and the alternative ``region`` parameter are both
    comma-separated lists; ``regions`` wins when both are sent, and
    "CH,EU" applies when neither is.

    ``regions`` defaults to None rather than "CH,EU": with a non-None
    default the ``regions is not None`` test below is always true and
    ``region`` can never take effect. The effective default is unchanged
    because _split_csv_values() supplies ["CH", "EU"].

    Raises:
        HTTPException: 401 without a valid token, 502 when the news source fails.
    """
    if not _check_token(request):
        raise HTTPException(status_code=401, detail="Unauthorized")
    try:
        specialty_values = _split_csv_values(specialties, ["dermatology"])
        region_values = _split_csv_values(regions if regions is not None else region, ["CH", "EU"])
        return get_news_items(
            specialties=specialty_values,
            lang=lang,
            region=",".join(region_values),
            limit=limit,
        )
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"News-Quelle nicht erreichbar: {exc}") from exc
# Response item of GET /api/news/live.
class LiveNewsItemOut(BaseModel):
    id: str
    title: str
    url: str
    source: str
    publishedAt: str
    summary: str
    tags: list[str]
    language: str
# In-memory cache of live-news results keyed by specialties and limit; guarded by the lock.
_live_news_cache: dict[str, Any] = {}
_live_news_cache_lock = threading.Lock()
_LIVE_NEWS_CACHE_TTL = int(os.getenv("LIVE_NEWS_CACHE_TTL_SECONDS", "3600"))


@app.get("/api/news/live", response_model=list[LiveNewsItemOut])
def api_news_live(
    request: Request,
    specialties: Optional[str] = Query("dermatology"),
    limit: int = Query(30, ge=1, le=60),
    refresh: int = Query(0, ge=0, le=1),
):
    """Return medical news from the LLM-backed search, newest first.

    Results are cached per (specialties, limit); ``refresh=1`` bypasses the
    cache. Each call is charged to the per-IP and per-token rate limiters.

    Raises:
        HTTPException: 401 without a valid token, 503 when no LLM key is
            configured, 502 when the search fails.
    """
    if not _check_token(request):
        raise HTTPException(status_code=401, detail="Unauthorized")
    ip = request.client.host if request.client else "unknown"
    # Use the same token extraction as authentication so that clients sending
    # "Authorization: Bearer" are not all charged to the shared "none" bucket.
    tok = _extract_request_token(request) or "none"
    default_ip_limiter.consume(f"ip:{ip}", cost=1.0)
    default_token_limiter.consume(f"tok:{tok}", cost=1.0)
    try:
        llm_key = get_openai_api_key() or os.getenv("GEMINI_API_KEY", "").strip()
        if not llm_key:
            raise SearchProviderConfigError(
                "OpenAI API key is not configured. Please set OPENAI_API_KEY in the AZA runtime config."
            )
        specialty_values = _split_csv_values(specialties, ["dermatology"])
        cache_key = f"news_live:{','.join(sorted(specialty_values))}:{limit}"
        now_ts = time.time()
        if not refresh:
            with _live_news_cache_lock:
                cached = _live_news_cache.get(cache_key)
                if cached and float(cached.get("expires_at", 0.0)) > now_ts:
                    return list(cached.get("items", []))
        candidates = search_medical_news(specialties=specialty_values, limit=limit)
        rows: list[dict] = [
            {
                # Stable id from title, URL and date (an identifier, not a security hash).
                "id": hashlib.sha1(f"{c.title}:{c.url}:{c.publishedAt}".encode()).hexdigest()[:20],
                "title": c.title,
                "url": c.url,
                "source": c.source,
                "publishedAt": c.publishedAt,
                "summary": c.summary,
                "tags": c.tags,
                "language": c.language,
            }
            for c in candidates
        ]
        rows.sort(key=lambda r: str(r.get("publishedAt", "")), reverse=True)
        with _live_news_cache_lock:
            _live_news_cache[cache_key] = {
                "items": list(rows),
                # Entries live for at least 30 minutes, whatever the configured TTL.
                "expires_at": now_ts + max(1800, _LIVE_NEWS_CACHE_TTL),
            }
        return rows
    except SearchProviderConfigError as exc:
        raise HTTPException(status_code=503, detail=str(exc)) from exc
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Live-News konnten nicht geladen werden: {exc}") from exc
@app.get("/api/events", response_model=list[EventItemOut])
def api_events(
    request: Request,
    specialties: Optional[str] = Query("dermatology"),
    regions: Optional[str] = Query("CH,EU"),
    from_date: Optional[date] = Query(None, alias="from"),
    to_date: Optional[date] = Query(None, alias="to"),
    limit: int = Query(100, ge=1, le=300),
):
    """Return events for the requested specialties, regions and date range.

    Raises:
        HTTPException: 401 without a valid token, 502 when the event source fails.
    """
    if not _check_token(request):
        raise HTTPException(status_code=401, detail="Unauthorized")
    try:
        return get_event_items(
            specialties=_split_csv_values(specialties, ["dermatology"]),
            regions=_split_csv_values(regions, ["CH", "EU"]),
            from_date=from_date,
            to_date=to_date,
            limit=limit,
        )
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Events-Quelle nicht erreichbar: {exc}")
@app.get("/api/events/live", response_model=list[LiveEventItemOut])
def api_events_live(
    request: Request,
    specialty: Optional[str] = Query("dermatology"),
    specialties: Optional[str] = Query(None),
    regions: Optional[str] = Query("CH,EU"),
    from_date: Optional[date] = Query(None, alias="from"),
    to_date: Optional[date] = Query(None, alias="to"),
    limit: int = Query(80, ge=1, le=120),
    lang: str = Query("de"),
    refresh: int = Query(0, ge=0, le=1),
):
    """Return live-searched events for a specialty, regions and date window.

    ``specialties`` takes precedence over ``specialty`` when sent. The
    window defaults to today through today + 396 days. ``refresh=1``
    bypasses the result cache. Each call is charged to the per-IP and
    per-token rate limiters.

    Raises:
        HTTPException: 401 without a valid token, 503 when no LLM key is
            configured, 502 when the search fails.
    """
    if not _check_token(request):
        raise HTTPException(status_code=401, detail="Unauthorized")
    ip = request.client.host if request.client else "unknown"
    # Use the same token extraction as authentication so that clients sending
    # "Authorization: Bearer" are not all charged to the shared "none" bucket.
    tok = _extract_request_token(request) or "none"
    default_ip_limiter.consume(f"ip:{ip}", cost=1.0)
    default_token_limiter.consume(f"tok:{tok}", cost=1.0)
    try:
        llm_key = get_openai_api_key() or os.getenv("GEMINI_API_KEY", "").strip()
        if not llm_key:
            raise SearchProviderConfigError(
                "OpenAI API key is not configured. Please set OPENAI_API_KEY in the AZA runtime config."
            )
        specialty_values = _split_csv_values(specialties if specialties is not None else specialty, ["dermatology"])
        region_values = _split_csv_values(regions, ["CH", "EU"])
        f_date = from_date or date.today()
        t_date = to_date or (date.today() + timedelta(days=396))
        rows = _collect_live_events(
            specialty_values=specialty_values,
            region_values=region_values,
            from_date=f_date,
            to_date=t_date,
            limit=limit,
            lang=lang,
            force_refresh=bool(refresh),
        )
        return rows
    except SearchProviderConfigError as exc:
        raise HTTPException(
            status_code=503,
            detail=str(exc),
        ) from exc
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Live-Events konnten nicht geladen werden: {exc}") from exc
# --- DuckDuckGo HTML fallback (keyless, robust) ──────────────────────────────
# Module-level state shared by the _ddg_* helpers below.
# The stdlib modules are imported under private aliases because the helpers use
# `html` as a local variable name, which would otherwise shadow the module.
import html as _html_mod
import re as _re_mod
# Search-result cache: cache key -> {"items": [...], "exp": <epoch seconds>}.
_ddg_cache: dict[str, dict] = {}
# Lifetime of one cache entry in seconds (added to time.time() when storing).
_DDG_CACHE_TTL = 300
# Lower-cased English month names and three-letter abbreviations -> month number.
_DDG_MONTH = {
"jan": 1, "january": 1, "feb": 2, "february": 2, "mar": 3, "march": 3,
"apr": 4, "april": 4, "may": 5, "jun": 6, "june": 6, "jul": 7, "july": 7,
"aug": 8, "august": 8, "sep": 9, "september": 9, "oct": 10, "october": 10,
"nov": 11, "november": 11, "dec": 12, "december": 12,
}
# Date pattern with three alternatives (capture-group numbers in parentheses):
#   1. ISO date "YYYY-MM-DD"          -> year (1), month (2), day (3)
#   2. day range "D-D Month YYYY"     -> first day (4), month (5), year (6)
#   3. "Month D[-D][,] YYYY"          -> month (7), first day (8), year (9)
# Month names are matched case-insensitively.
# NOTE(review): the separator class `[-]` only accepts an ASCII hyphen, so day
# ranges written with an en dash are not matched -- confirm this is intended.
# NOTE(review): a single-day "D Month YYYY" (e.g. "12 May 2026") is not covered
# by any of the three alternatives -- confirm whether it should be.
_DDG_DATE_RE = _re_mod.compile(
r'(\d{4})-(\d{2})-(\d{2})'
r'|(\d{1,2})\s*[-]\s*\d{1,2}\s+'
r'(Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|'
r'Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)'
r'\s+(\d{4})'
r'|(Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|'
r'Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)'
r'\s+(\d{1,2})(?:\s*[-]\s*\d{1,2})?,?\s*(\d{4})',
_re_mod.IGNORECASE,
)
def _ddg_extract_date(text: str) -> str | None:
    """Return the first valid calendar date found in *text* as ``YYYY-MM-DD``.

    *text* is scanned with ``_DDG_DATE_RE``, which recognises ISO dates,
    ``D-D Month YYYY`` ranges and ``Month D[-D][,] YYYY``. For ranges the first
    day is used. A match that does not form a real calendar date (for example
    ``2026-13-45`` or ``June 31, 2026``) is skipped and scanning continues
    with the next match, so callers never receive an impossible date string.

    Returns:
        The ISO date string, or ``None`` when *text* is empty or contains no
        valid date.
    """
    if not text:
        return None
    for m in _DDG_DATE_RE.finditer(text):
        if m.group(1):
            year, month, day = m.group(1), m.group(2), m.group(3)
        elif m.group(5) and m.group(6):
            year, month, day = m.group(6), _DDG_MONTH.get(m.group(5).lower(), 0), m.group(4)
        elif m.group(7) and m.group(9):
            year, month, day = m.group(9), _DDG_MONTH.get(m.group(7).lower(), 0), m.group(8) or 1
        else:
            continue
        try:
            # date() rejects month 0/13, day 0/32, 30 February, year 0, ...
            return date(int(year), int(month), int(day)).isoformat()
        except ValueError:
            continue
    return None
def _ddg_fetch(query: str, retry: int = 2) -> tuple[str, str]:
"""Returns (html, diag_info). Tries multiple times."""
import urllib.request as _ur
import urllib.parse as _up
url = "https://html.duckduckgo.com/html/?" + _up.urlencode({"q": query})
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml",
"Accept-Language": "de-CH,de;q=0.9,en;q=0.7",
}
last_err = ""
for attempt in range(retry):
try:
req = _ur.Request(url, headers=headers)
with _ur.urlopen(req, timeout=15) as resp:
html = resp.read().decode("utf-8", errors="replace")
return html, f"ok len={len(html)}"
except Exception as exc:
last_err = f"{type(exc).__name__}: {exc}"
if attempt < retry - 1:
time.sleep(0.5)
return "", f"fetch_failed after {retry} attempts: {last_err}"
def _ddg_parse(html: str) -> list[dict]:
"""Multi-pattern robust parser. Returns list of items."""
import urllib.parse as _up
if not html:
return []
snippets: dict[int, str] = {}
for sm in _re_mod.finditer(r'class="result__snippet"[^>]*>(.*?)</(?:a|td|div|span)', html, _re_mod.DOTALL):
txt = _re_mod.sub(r"<[^>]+>", "", sm.group(1))
snippets[sm.start()] = _html_mod.unescape(txt).strip()
items: list[dict] = []
patterns = [
_re_mod.compile(r'<a[^>]*class="result__a"[^>]*href="([^"]*)"[^>]*>(.*?)</a>', _re_mod.DOTALL),
_re_mod.compile(r'<a[^>]*href="([^"]*)"[^>]*class="result__a"[^>]*>(.*?)</a>', _re_mod.DOTALL),
_re_mod.compile(r'class="result__title"[^>]*>.*?<a[^>]*href="([^"]*)"[^>]*>(.*?)</a>', _re_mod.DOTALL),
]
seen: set[str] = set()
for pat in patterns:
for m in pat.finditer(html):
href = m.group(1).strip()
title_html = m.group(2).strip()
title = _re_mod.sub(r"<[^>]+>", "", title_html)
title = _html_mod.unescape(title).strip()
if not href or not title or len(title) < 5:
continue
if href.startswith("//duckduckgo.com/l/?uddg="):
try:
href = _up.unquote(_up.parse_qs(_up.urlparse(href).query).get("uddg", [href])[0])
except Exception:
pass
if not href.startswith("http"):
continue
norm = href.split("?")[0].rstrip("/").lower()
if norm in seen:
continue
seen.add(norm)
source = ""
try:
source = _up.urlparse(href).netloc
except Exception:
pass
snippet = ""
best_d = 999999
for sp, st in snippets.items():
d = abs(sp - m.start())
if d < best_d:
best_d = d
snippet = st
dt = _ddg_extract_date(title) or _ddg_extract_date(snippet)
items.append({"title": title, "url": href, "date": dt, "source": source})
if not items:
for um in _re_mod.finditer(r'uddg=([^&"]+)', html):
try:
href = _up.unquote(um.group(1))
if not href.startswith("http"):
continue
norm = href.split("?")[0].rstrip("/").lower()
if norm in seen:
continue
seen.add(norm)
source = _up.urlparse(href).netloc
items.append({"title": source, "url": href, "date": None, "source": source})
except Exception:
continue
return items
def _ddg_build_queries(specialty: str, region: str, year: str) -> list[str]:
r_up = region.upper().replace(",", " ")
eu_terms = "Europe European EU"
ch_terms = "Switzerland Swiss Schweiz Zurich Basel Bern Geneva Lausanne"
region_block = ""
if "EU" in r_up or "EUROPE" in r_up or not r_up.strip():
region_block = f"{eu_terms} {ch_terms}"
elif "CH" in r_up or "SCHWEIZ" in region.lower():
region_block = ch_terms
else:
region_block = region
return [
f"{specialty} congress {year} {region_block}",
f"{specialty} conference {year} {region_block}",
f"{specialty} meeting symposium {year} {region_block}",
f"{specialty} workshop course {year} {region_block}",
f"{specialty} kongress fortbildung {year} {region_block}",
f"{specialty} weiterbildung tagung kurs {year} {region_block}",
f"{specialty} congress conference {year} Switzerland Basel Zurich Bern",
f"{specialty} annual meeting {year} Europe",
]
def _search_ddg_congresses(
    specialty: str, region: str, from_date: str, to_date: str, limit: int = 30,
) -> tuple[list[dict], dict]:
    """Search DuckDuckGo for congresses and return ``(items, diagnostics)``.

    The search year is the four-digit prefix of *from_date*; when *from_date*
    is missing or does not start with four digits the current year is used.
    Several queries are run in sequence (0.25 s apart) until twice *limit*
    unique results are collected. Results are de-duplicated by URL and by
    title, sorted with dated items first (ascending), and cut to *limit*.

    Results are cached in ``_ddg_cache`` for ``_DDG_CACHE_TTL`` seconds.
    Expired entries are evicted whenever a new result is stored, so the cache
    cannot grow without bound from arbitrary specialty/region values.

    Args:
        specialty: Search term for the medical specialty.
        region: Region string handed to ``_ddg_build_queries``.
        from_date: Start date as string; only its year prefix is used.
        to_date: Accepted for interface symmetry; currently not used.
        limit: Maximum number of items returned.

    Returns:
        ``(items, diag)``. For a cache hit ``diag`` is ``{"source": "cache"}``;
        otherwise it holds ``queries_run``, ``total_raw``, ``fetch_results``,
        ``deduped`` and ``returned``.
    """
    year_prefix = (from_date or "")[:4]
    if len(year_prefix) == 4 and year_prefix.isascii() and year_prefix.isdigit():
        year = year_prefix
    else:
        year = str(date.today().year)

    cache_key = f"ddg2|{specialty}|{region}|{year}|{limit}"
    now = time.time()
    cached = _ddg_cache.get(cache_key)
    if cached and cached.get("exp", 0) > now:
        return list(cached.get("items", [])), {"source": "cache"}

    queries = _ddg_build_queries(specialty, region, year)
    all_items: list[dict] = []
    seen_urls: set[str] = set()
    seen_titles: set[str] = set()
    diag: dict[str, Any] = {"queries_run": 0, "total_raw": 0, "fetch_results": []}

    for qi, q in enumerate(queries):
        if len(all_items) >= limit * 2:
            break
        if qi > 0:
            # Small pause between consecutive requests to the search host.
            time.sleep(0.25)
        html, fetch_info = _ddg_fetch(q)
        diag["queries_run"] = qi + 1
        diag["fetch_results"].append({"q": q[:60], "info": fetch_info, "html_len": len(html)})
        if not html:
            continue
        page_items = _ddg_parse(html)
        diag["total_raw"] += len(page_items)
        for it in page_items:
            url_norm = it["url"].split("?")[0].rstrip("/").lower()
            title_norm = _re_mod.sub(r"\s+", " ", it["title"].lower().strip())[:80]
            if url_norm in seen_urls or title_norm in seen_titles:
                continue
            seen_urls.add(url_norm)
            seen_titles.add(title_norm)
            all_items.append(it)

    # Dated items first in ascending date order, undated items last.
    all_items.sort(key=lambda x: (0, x["date"]) if x.get("date") else (1, ""))
    result = all_items[:limit]
    diag["deduped"] = len(all_items)
    diag["returned"] = len(result)

    # Drop expired entries before storing; iterate over a snapshot because the
    # dict is modified while scanning.
    for stale_key, entry in list(_ddg_cache.items()):
        if entry.get("exp", 0) <= now:
            _ddg_cache.pop(stale_key, None)
    _ddg_cache[cache_key] = {"items": list(result), "exp": now + _DDG_CACHE_TTL}
    return result, diag
# --- Kongress 2 endpoints (both routes below are served by the DuckDuckGo search; no Google call is made here) ───
@app.get("/api/events/live_google_test")
def api_events_live_google_test(
    request: Request,
    specialty: str = Query("dermatology"),
    regions: str = Query("EU"),
    from_date: Optional[str] = Query("2026-01-01", alias="from"),
    to_date: Optional[str] = Query("2026-12-31", alias="to"),
    limit: int = Query(30, ge=1, le=30),
):
    """Run the DuckDuckGo congress search and return a condensed diagnostic.

    The response always has the keys ``ok``, ``error``, ``items``, ``provider``
    and ``diag``. Search failures are reported in the body (``ok: False``)
    instead of an HTTP error status.

    Raises:
        HTTPException: 401 when the token check fails.
    """
    if not _check_token(request):
        raise HTTPException(status_code=401, detail="Unauthorized")

    window_start = from_date if from_date else "2026-01-01"
    window_end = to_date if to_date else "2026-12-31"
    try:
        items, diag = _search_ddg_congresses(specialty, regions, window_start, window_end, limit)
        # Only the numeric counters are exposed; missing counters read as 0.
        counters = {
            field: diag.get(field, 0)
            for field in ("queries_run", "total_raw", "deduped", "returned")
        }
        return {
            "ok": True,
            "error": None,
            "items": items,
            "provider": "ddg",
            "diag": counters,
        }
    except Exception as exc:
        failure = f"Search failed: {type(exc).__name__}: {exc}"
        return {
            "ok": False,
            "error": failure,
            "items": [],
            "provider": "ddg",
            "diag": {"exception": str(exc)},
        }
@app.get("/api/events/live_ddg")
def api_events_live_ddg(
    request: Request,
    specialty: str = Query("dermatology"),
    regions: str = Query("EU"),
    from_date: Optional[str] = Query("2026-01-01", alias="from"),
    to_date: Optional[str] = Query("2026-12-31", alias="to"),
    limit: int = Query(30, ge=1, le=30),
):
    """Run the DuckDuckGo congress search and return items plus full diagnostics.

    On failure the body carries ``ok: False`` and only the exception type in
    ``error``; no ``diag`` key is included in that case.

    Raises:
        HTTPException: 401 when the token check fails.
    """
    if not _check_token(request):
        raise HTTPException(status_code=401, detail="Unauthorized")

    try:
        window_start = from_date or "2026-01-01"
        window_end = to_date or "2026-12-31"
        items, diag = _search_ddg_congresses(specialty, regions, window_start, window_end, limit)
    except Exception as exc:
        return {"ok": False, "error": f"DDG: {type(exc).__name__}", "items": [], "provider": "ddg"}
    return {"ok": True, "error": None, "items": items, "provider": "ddg", "diag": diag}
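# --- Minimal version info endpoint (requires a valid API token) ---
# Keep stable: {"name": "AZA", "build": "...", "version": "..."}; the values are read from
# release/version.json in the handler below (no AZA_BUILD env lookup happens there), with
# "AZA" / "dev" / "0.0.0" as defaults when the file is missing or unreadable.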
@app.get("/version")
def version(_: None = Depends(require_api_token)):
    """Return name, build and version from ``release/version.json``.

    Missing keys fall back to ``"AZA"``, ``"dev"`` and ``"0.0.0"``. When the
    file does not exist or cannot be read or parsed, the complete default
    record is returned instead of an error.
    """
    defaults = {"name": "AZA", "build": "dev", "version": "0.0.0"}
    try:
        manifest = Path(__file__).resolve().parent / "release" / "version.json"
        if manifest.exists():
            info = json.loads(manifest.read_text(encoding="utf-8"))
            return {
                "name": info.get("name", "AZA"),
                "build": info.get("build", "dev"),
                "version": info.get("version", "0.0.0"),
            }
    except Exception:
        # Best effort: any read/parse problem yields the defaults below.
        pass
    return defaults
@app.get("/download")
def download_info():
    """Return version, download URL and release notes from ``release/version.json``.

    When the file does not exist or cannot be read or parsed, ``version`` and
    ``download_url`` are ``None`` and ``release_notes`` is an empty string.
    """
    nothing_published = {
        "version": None,
        "download_url": None,
        "release_notes": "",
    }
    try:
        manifest = Path(__file__).resolve().parent / "release" / "version.json"
        if manifest.exists():
            info = json.loads(manifest.read_text(encoding="utf-8"))
            return {
                "version": info.get("version"),
                "download_url": info.get("download_url"),
                "release_notes": info.get("release_notes", ""),
            }
    except Exception:
        # Best effort: any read/parse problem yields the empty record below.
        pass
    return nothing_published
@app.post("/telemetry/ping")
def telemetry_ping(data: TelemetryPing, request: Request):
    """Accept one telemetry event, rate-limited per client IP.

    Hits older than ``TELEMETRY_RATE_WINDOW_SECONDS`` are discarded; when
    ``TELEMETRY_RATE_LIMIT`` hits remain in the window the request is rejected.
    Accepted hits are counted towards the limit even if the payload is then
    rejected by validation. Valid events are printed and counted in
    ``_telemetry_event_counts``; nothing is persisted.

    Raises:
        HTTPException: 429 when the rate limit is exceeded; 400 for an unknown
            event, a missing/unknown ``crash_type`` on a crash event, or a
            ``target_version`` on an event other than ``update_check``.
    """
    client_ip = request.client.host if request.client else "unknown"
    # time.time() is a real epoch value. datetime.utcnow().timestamp() is not:
    # the naive UTC datetime is interpreted as local time, which shifts the
    # value by the UTC offset and makes it jump at DST changes.
    now_ts = time.time()
    recent_hits = [
        ts for ts in _telemetry_hits[client_ip]
        if now_ts - ts < TELEMETRY_RATE_WINDOW_SECONDS
    ]
    _telemetry_hits[client_ip] = recent_hits
    if len(recent_hits) >= TELEMETRY_RATE_LIMIT:
        raise HTTPException(status_code=429, detail="telemetry rate limit exceeded")
    _telemetry_hits[client_ip].append(now_ts)

    if data.event not in ALLOWED_TELEMETRY_EVENTS:
        raise HTTPException(status_code=400, detail="invalid telemetry event")
    if data.event == "crash":
        if not data.crash_type or data.crash_type not in ALLOWED_CRASH_TYPES:
            raise HTTPException(status_code=400, detail="invalid crash_type")
    if data.event != "update_check" and data.target_version is not None:
        raise HTTPException(status_code=400, detail="target_version only allowed for update_check")

    # Minimal telemetry: no PHI, no persistence yet.
    print(
        "[telemetry]",
        {
            "time": datetime.utcnow().isoformat(),
            "event": data.event,
            "version": data.version,
            "platform": data.platform,
            "app": data.app,
            "crash_type": data.crash_type,
            "target_version": data.target_version,
        },
    )
    _telemetry_event_counts[data.event] += 1
    return {"status": "ok"}
@app.get("/admin/telemetry/stats")
def telemetry_stats():
    """Return server start time, uptime in whole seconds and per-event counters.

    NOTE(review): this handler performs no token check of its own -- confirm
    that access to the /admin path is restricted elsewhere.
    """
    running_for = datetime.utcnow() - _server_start_time
    started_at = _server_start_time.isoformat() + "Z"
    return {
        "server_start_time": started_at,
        "uptime_seconds": int(running_for.total_seconds()),
        "events": dict(_telemetry_event_counts),
    }
@app.get("/license/debug")
def license_debug():
    """Report where the license database lives and basic counts from it.

    Returns the resolved database path, whether the file exists, the number of
    rows with ``status='active'``, the largest ``current_period_end`` and the
    process working directory. Any database error resets both figures to
    ``0`` / ``None`` instead of failing the request.

    NOTE(review): this handler performs no token check of its own and exposes
    server paths -- confirm that access is restricted elsewhere.
    """
    db_path = _stripe_db_path()
    exists = db_path.exists()
    active_count = 0
    current_period_end = None
    if exists:
        con = None
        try:
            # sqlite3's connection context manager only commits or rolls back;
            # it does not close the connection, so close it explicitly.
            con = sqlite3.connect(db_path)
            row = con.execute("SELECT COUNT(*) FROM licenses WHERE status='active'").fetchone()
            active_count = int(row[0]) if row else 0
            row2 = con.execute("SELECT MAX(current_period_end) FROM licenses").fetchone()
            current_period_end = int(row2[0]) if row2 and row2[0] is not None else None
        except Exception:
            active_count = 0
            current_period_end = None
        finally:
            if con is not None:
                con.close()
    return JSONResponse(content={
        "stripe_db_path": str(db_path.resolve()),
        "exists": exists,
        "active_count": active_count,
        "current_period_end": current_period_end,
        "cwd": os.getcwd(),
    })
@app.get("/license/status")
def license_status(
    request: Request,
    email: Optional[str] = Query(None),
    license_key: Optional[str] = Query(None),
    x_api_token: Optional[str] = Header(default=None, alias="X-API-Token"),
):
    """Return the license state, optionally enforcing the device limit.

    Access requires either a valid ``X-API-Token`` or a license key.

    Lookup order:
      1. by license key (case-insensitive), if one was supplied;
      2. by e-mail (case-insensitive), only for token-authenticated callers;
      3. most recently updated license, only for token-authenticated callers.

    A caller without a valid API token is authenticated by the license key
    alone, so for such callers only step 1 is allowed. Without this
    restriction any non-matching key would fall through to step 2 or 3 and
    return (and register devices against) somebody else's license.

    When an ``X-Device-Id`` header is present and the license has a customer
    e-mail, the device is registered/checked via ``enforce_and_touch_device``
    and a disallowed device makes the result invalid.

    Raises:
        HTTPException: 401 when neither a valid API token nor a license key
            is supplied.
    """
    has_api_token = False
    if x_api_token:
        try:
            require_api_token(x_api_token=x_api_token)
            has_api_token = True
        except HTTPException:
            pass

    key_clean = (license_key or "").strip().upper()
    email_clean = (email or "").strip().lower()
    if not has_api_token and not key_clean:
        raise HTTPException(status_code=401, detail="Unauthorized API-Token oder Lizenzschluessel erforderlich.")

    db_path = _stripe_db_path()
    if not db_path.exists():
        return {"valid": False, "valid_until": None}

    status = None
    current_period_end = None
    customer_email = None
    try:
        try:
            import stripe_routes  # type: ignore
            if hasattr(stripe_routes, "_ensure_storage"):
                stripe_routes._ensure_storage()  # type: ignore
        except Exception:
            pass

        # sqlite3's connection context manager does not close the connection,
        # so it is closed explicitly here.
        con = sqlite3.connect(db_path)
        try:
            row = None
            if key_clean:
                row = con.execute(
                    """
                    SELECT status, current_period_end, customer_email
                    FROM licenses
                    WHERE upper(license_key) = ?
                    ORDER BY updated_at DESC
                    LIMIT 1
                    """,
                    (key_clean,),
                ).fetchone()
            if has_api_token:
                if row is None and email_clean:
                    row = con.execute(
                        """
                        SELECT status, current_period_end, customer_email
                        FROM licenses
                        WHERE lower(customer_email) = ?
                        ORDER BY updated_at DESC
                        LIMIT 1
                        """,
                        (email_clean,),
                    ).fetchone()
                if row is None:
                    row = con.execute(
                        """
                        SELECT status, current_period_end, customer_email
                        FROM licenses
                        ORDER BY updated_at DESC
                        LIMIT 1
                        """
                    ).fetchone()
        finally:
            con.close()

        if row:
            status = row[0]
            current_period_end = int(row[1]) if row[1] is not None else None
            customer_email = str(row[2]).strip() if row[2] is not None else None
    except Exception:
        # Best effort: a database problem is reported as "no license found".
        status = None
        current_period_end = None
        customer_email = None

    decision = compute_license_decision(current_period_end=current_period_end, status=status)
    device_id = request.headers.get("X-Device-Id")
    device_name = request.headers.get("X-Device-Name", "")
    app_version = request.headers.get("X-App-Version", "")
    result: dict = {
        "valid": bool(decision.valid),
        "valid_until": decision.valid_until if decision.valid else None,
        "license_active": bool(decision.valid),
        "allowed_devices": 0,
        "used_devices": 0,
        "device_allowed": True,
        "reason": "ok",
    }
    if device_id and customer_email:
        dd = enforce_and_touch_device(
            customer_email=customer_email, user_key="default",
            device_id=device_id, db_path=str(db_path),
            device_name=device_name, app_version=app_version,
        )
        result["allowed_devices"] = dd.devices_allowed
        result["used_devices"] = dd.devices_used
        result["license_active"] = dd.license_active
        result["device_allowed"] = dd.allowed
        result["reason"] = dd.reason
        if not dd.allowed:
            result["valid"] = False
            result["valid_until"] = None
    return result
@app.post("/license/activate")
def license_activate(
    request: Request,
    license_key: str = Body(..., embed=True),
):
    """Activate a license by its license key and register the calling device.

    No API token is needed; the key itself is the credential. The device is
    identified by the ``X-Device-Id`` header and is registered through
    ``enforce_and_touch_device`` when the license has a customer e-mail.

    Raises:
        HTTPException: 404 when the license database or the key does not
            exist, 400 when the key is empty or ``X-Device-Id`` is missing,
            403 (with an ``X-Device-Reason`` header) when the device limit is
            reached.
    """
    db_path = _stripe_db_path()
    if not db_path.exists():
        raise HTTPException(status_code=404, detail="Keine Lizenz gefunden.")

    key_clean = (license_key or "").strip().upper()
    if not key_clean:
        raise HTTPException(status_code=400, detail="Lizenzschluessel fehlt.")

    try:
        import stripe_routes  # type: ignore
        if hasattr(stripe_routes, "_ensure_storage"):
            stripe_routes._ensure_storage()
    except Exception:
        pass

    # sqlite3's connection context manager does not close the connection,
    # so it is closed explicitly here.
    con = sqlite3.connect(db_path)
    try:
        row = con.execute(
            """
            SELECT subscription_id, status, current_period_end, customer_email,
            allowed_users, devices_per_user
            FROM licenses
            WHERE upper(license_key) = ?
            ORDER BY updated_at DESC
            LIMIT 1
            """,
            (key_clean,),
        ).fetchone()
    finally:
        con.close()
    if not row:
        raise HTTPException(status_code=404, detail="Lizenzschluessel ungueltig.")

    # Only status, period end and e-mail are needed here.
    _sub_id, status, cpe, cust_email, _allowed_users, _devices_per_user = row
    current_period_end = int(cpe) if cpe is not None else None
    decision = compute_license_decision(current_period_end=current_period_end, status=status)

    device_id = request.headers.get("X-Device-Id")
    if not device_id:
        raise HTTPException(
            status_code=400,
            detail="X-Device-Id Header fehlt. Bitte die AZA Desktop-App fuer die Aktivierung verwenden.",
        )
    device_name = request.headers.get("X-Device-Name", "")
    app_version = request.headers.get("X-App-Version", "")

    is_valid = bool(decision.valid)
    result: dict = {
        "valid": is_valid,
        "valid_until": decision.valid_until if is_valid else None,
        "customer_email": cust_email or "",
        "status": status or "",
        "license_active": is_valid,
        "allowed_devices": 0,
        "used_devices": 0,
        "device_allowed": True,
        "reason": "ok",
    }
    if cust_email:
        dd = enforce_and_touch_device(
            customer_email=cust_email, user_key="default",
            device_id=device_id, db_path=str(db_path),
            device_name=device_name, app_version=app_version,
        )
        result["allowed_devices"] = dd.devices_allowed
        result["used_devices"] = dd.devices_used
        result["device_allowed"] = dd.allowed
        result["reason"] = dd.reason
        if not dd.allowed:
            raise HTTPException(
                status_code=403,
                detail=f"Geraete-Limit erreicht: {dd.devices_used}/{dd.devices_allowed} Geraete belegt.",
                headers={"X-Device-Reason": dd.reason},
            )
    return result
@app.get("/billing/success")
def billing_success(session_id: Optional[str] = Query(None)) -> HTMLResponse:
    """Render the thank-you page shown after a successful checkout.

    With a ``session_id`` the Stripe checkout session is looked up to obtain
    the customer's e-mail, which is then used to fetch the license key via
    ``stripe_routes.get_license_key_for_email``. The download link comes from
    ``release/version.json`` and defaults to ``/download/aza_desktop_setup.exe``.
    Every lookup is best effort: on failure the corresponding part of the page
    is simply left out.

    All dynamic values (e-mail, license key, download URL) are HTML-escaped
    before they are inserted into the page.
    """
    from html import escape as _escape

    customer_email = ""
    if session_id:
        try:
            import stripe as _stripe
            _stripe.api_key = os.environ.get("STRIPE_SECRET_KEY", "")
            sess = _stripe.checkout.Session.retrieve(session_id)
            customer_email = getattr(sess, "customer_email", "") or ""
            if not customer_email and getattr(sess, "customer_details", None):
                customer_email = sess.customer_details.get("email", "") or ""
        except Exception:
            pass

    download_url = "/download/aza_desktop_setup.exe"
    try:
        vf = Path(__file__).resolve().parent / "release" / "version.json"
        if vf.exists():
            with open(vf, "r", encoding="utf-8") as _f:
                _vd = json.load(_f)
            # A null/empty entry must not replace the default link.
            download_url = _vd.get("download_url") or download_url
    except Exception:
        pass

    license_key_display = ""
    if customer_email:
        try:
            from stripe_routes import get_license_key_for_email
            _lk = get_license_key_for_email(customer_email)
            if _lk:
                license_key_display = _lk
        except Exception:
            pass

    # Escape everything that is interpolated into markup or attributes.
    safe_email = _escape(str(customer_email))
    safe_download_url = _escape(str(download_url), quote=True)
    safe_license_key = _escape(str(license_key_display))

    email_line = ""
    if customer_email:
        email_line = f'<p style="margin-top:12px;font-size:14px;color:#555;">Ihr Konto: <strong>{safe_email}</strong></p>'

    license_block = ""
    if license_key_display:
        license_block = f'''
<div style="margin:24px 0; padding:20px 24px; background:#E8F4FD; border-radius:10px;
border:1px solid #B3D8F0; text-align:center;">
<p style="margin:0 0 8px; font-size:13px; color:#555;">Ihr Lizenzschluessel:</p>
<p style="margin:0; font-size:22px; font-weight:700; letter-spacing:2px; color:#0078D7;
font-family:Consolas,monospace;">{safe_license_key}</p>
<p style="margin:8px 0 0; font-size:12px; color:#888;">
Bitte notieren Sie diesen Schluessel. Sie benoetigen ihn zur Aktivierung in der App.</p>
</div>
'''

    page = f"""<!DOCTYPE html>
<html lang="de">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>AZA Vielen Dank</title>
<style>
body {{ font-family: 'Segoe UI', system-ui, sans-serif; margin:0; background:#F7F8FA; color:#1a1a2e; }}
.wrap {{ max-width:640px; margin:60px auto; background:#fff; border-radius:12px;
box-shadow:0 2px 12px rgba(0,0,0,.08); padding:48px 40px; }}
h1 {{ font-size:24px; margin:0 0 8px; color:#0078D7; }}
.sub {{ font-size:15px; color:#555; margin-bottom:32px; }}
.dl-btn {{ display:inline-block; padding:14px 32px; background:#0078D7; color:#fff;
text-decoration:none; border-radius:8px; font-size:16px; font-weight:600;
transition:background .2s; }}
.dl-btn:hover {{ background:#005fa3; }}
.steps {{ margin:32px 0 0; padding:0; list-style:none; counter-reset:step; }}
.steps li {{ position:relative; padding:0 0 20px 40px; font-size:14px; line-height:1.6; }}
.steps li::before {{ content:counter(step); counter-increment:step;
position:absolute; left:0; top:0; width:26px; height:26px; border-radius:50%;
background:#E8F4FD; color:#0078D7; font-weight:700; font-size:13px;
display:flex; align-items:center; justify-content:center; }}
.note {{ margin-top:28px; padding:16px 20px; background:#F0F7ED; border-radius:8px;
font-size:13px; color:#2E7D32; line-height:1.5; }}
.footer {{ margin-top:36px; font-size:12px; color:#999; text-align:center; }}
.footer a {{ color:#0078D7; text-decoration:none; }}
</style>
</head>
<body>
<div class="wrap">
<h1>Vielen Dank fuer Ihr Abonnement</h1>
<p class="sub">Ihr Zugang zu AZA Medical AI Assistant ist jetzt aktiv.</p>
{email_line}
<div style="text-align:center; margin:28px 0;">
<a class="dl-btn" href="{safe_download_url}">AZA Desktop herunterladen</a>
</div>
{license_block}
<h2 style="font-size:16px; margin-bottom:12px;">Installation in 3 Schritten</h2>
<ol class="steps">
<li><strong>Installer starten</strong> Doppelklick auf die heruntergeladene Datei.
Falls Windows SmartScreen warnt: &laquo;Weitere Informationen&raquo; &rarr; &laquo;Trotzdem ausfuehren&raquo;.</li>
<li><strong>Lizenzschluessel eingeben</strong> Oeffnen Sie in der App den Aktivierungsdialog
und geben Sie den oben angezeigten Schluessel ein.</li>
<li><strong>Loslegen</strong> Waehlen Sie im Startbildschirm Ihr gewuenschtes Modul
und beginnen Sie mit der Arbeit.</li>
</ol>
<div class="note">
<strong>Hinweis:</strong> Ihr Lizenzschluessel ist an Ihr Abonnement gebunden.
Bewahren Sie ihn sicher auf.
</div>
<div class="footer">
<p>Bei Fragen: <a href="mailto:support@aza-medwork.ch">support@aza-medwork.ch</a></p>
<p>&copy; AZA Medical AI Assistant aza-medwork.ch</p>
</div>
</div>
</body>
</html>"""
    return HTMLResponse(content=page)
@app.get("/billing/cancel")
def billing_cancel() -> HTMLResponse:
    """Render the static page shown when the checkout was cancelled."""
    page = """<!DOCTYPE html>
<html lang="de">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>AZA Checkout abgebrochen</title>
<style>
body { font-family: 'Segoe UI', system-ui, sans-serif; margin:0; background:#F7F8FA; color:#1a1a2e; }
.wrap { max-width:520px; margin:80px auto; background:#fff; border-radius:12px;
box-shadow:0 2px 12px rgba(0,0,0,.08); padding:48px 40px; text-align:center; }
h1 { font-size:22px; margin:0 0 12px; }
p { font-size:15px; color:#555; line-height:1.6; }
.btn { display:inline-block; margin-top:24px; padding:12px 28px; background:#0078D7;
color:#fff; text-decoration:none; border-radius:8px; font-size:15px; }
.btn:hover { background:#005fa3; }
</style>
</head>
<body>
<div class="wrap">
<h1>Checkout abgebrochen</h1>
<p>Der Bezahlvorgang wurde nicht abgeschlossen.<br>
Sie koennen jederzeit zurueckkehren und es erneut versuchen.</p>
<a class="btn" href="/">Zurueck zur Startseite</a>
</div>
</body>
</html>"""
    response = HTMLResponse(content=page)
    return response
@app.get("/v1/schedule")
def get_schedule(
    request: Request,
    start: Optional[str] = None,
    end: Optional[str] = None,
    employee: Optional[str] = None,
    date_from: Optional[str] = Query(None, alias="from"),
    date_to: Optional[str] = Query(None, alias="to"),
):
    """Return the calling user's schedule entries, optionally filtered.

    ``from``/``to`` take precedence over ``start``/``end``. With any filter
    set, a missing bound is open-ended; with no filter at all the window is
    today - 30 days to today + 90 days. The JSON envelope always contains
    ``success``, ``items``, ``error``, ``request_id`` and ``duration_ms``.

    Responds with 401 on a failed token check, 400 when no user can be
    determined or when a date (parameter or stored entry) is not ISO format.
    """
    request_id = f"srv_{uuid.uuid4().hex[:12]}"
    started = time.perf_counter()

    def _elapsed_ms() -> int:
        return int((time.perf_counter() - started) * 1000)

    if not _check_token(request):
        return JSONResponse(status_code=401, content={
            "success": False, "items": [], "error": "unauthorized",
            "request_id": request_id, "duration_ms": 0,
        })

    user = _get_user(request)
    if not user:
        return JSONResponse(status_code=400, content={**_NO_USER_RESPONSE, "items": [],
                                                      "request_id": request_id, "duration_ms": 0})

    try:
        lower_raw = date_from or start
        upper_raw = date_to or end
        if employee or lower_raw or upper_raw:
            window_start = date.fromisoformat(lower_raw) if lower_raw else date.min
            window_end = date.fromisoformat(upper_raw) if upper_raw else date.max
        else:
            reference = date.today()
            window_start = reference - timedelta(days=30)
            window_end = reference + timedelta(days=90)

        matches = []
        for entry in SCHEDULE_DATA:
            if entry.get("user") != user:
                continue
            if not (window_start <= date.fromisoformat(entry["date"]) <= window_end):
                continue
            if employee and entry["employee"] != employee:
                continue
            matches.append(entry)

        return JSONResponse(content={
            "success": True, "items": matches, "error": "",
            "request_id": request_id, "duration_ms": _elapsed_ms(),
        })
    except ValueError as e:
        return JSONResponse(status_code=400, content={
            "success": False, "items": [], "error": f"Ungültiges Datum: {e}",
            "request_id": request_id, "duration_ms": _elapsed_ms(),
        })
@app.get("/v1/backup")
def get_backup(request: Request):
    """Return all schedule entries of the calling user as a JSON download.

    The body is a version-1 backup document (``version``, ``created_at``,
    ``user``, ``schedule_items``) sent as an attachment named
    ``medwork_backup_<YYYYmmdd_HHMMSS>.json``.

    Responds with 401 on a failed token check and 400 when no user can be
    determined.
    """
    request_id = f"srv_{uuid.uuid4().hex[:12]}"
    if not _check_token(request):
        return JSONResponse(status_code=401, content={
            "success": False, "error": "unauthorized",
            "request_id": request_id, "duration_ms": 0,
        })

    user = _get_user(request)
    if not user:
        return JSONResponse(status_code=400, content={**_NO_USER_RESPONSE,
                                                      "request_id": request_id, "duration_ms": 0})

    owned = list(filter(lambda e: e.get("user") == user, SCHEDULE_DATA))
    stamp = datetime.now()
    document = {
        "version": 1,
        "created_at": stamp.isoformat(),
        "user": user,
        "schedule_items": owned,
    }
    encoded = _json.dumps(document, ensure_ascii=False, indent=2).encode("utf-8")
    filename = f"medwork_backup_{stamp.strftime('%Y%m%d_%H%M%S')}.json"
    disposition = f'attachment; filename="{filename}"'
    return Response(
        content=encoded,
        media_type="application/json; charset=utf-8",
        headers={"Content-Disposition": disposition},
    )
# Keys every item in a restored backup must provide (checked in /v1/restore).
_REQUIRED_ITEM_KEYS = {"employee", "date", "type", "note"}
# Accepted values for a schedule item's "type" when creating or updating items.
_VALID_TYPES = {"vacation", "sick", "work"}
@app.post("/v1/restore")
async def restore_backup(
    request: Request,
    file: UploadFile = File(...),
    dry_run: bool = True,
):
    """Validate an uploaded backup and, unless ``dry_run``, import it.

    The upload must be a UTF-8 JSON object with ``version == 1`` and a list
    ``schedule_items``. Every item must be an object with the keys in
    ``_REQUIRED_ITEM_KEYS`` and an ISO-format ``date`` string. The date check
    matters because ``GET /v1/schedule`` parses the stored date of every
    entry; a single malformed date would make that endpoint fail for the user.

    On a real import all existing entries of the calling user are replaced by
    the uploaded items (each stamped with the calling user) and the action is
    written to the audit log. Failed real imports are audited as well; dry
    runs are never audited.

    Responds with 401 on a failed token check and 400 when no user can be
    determined or the upload is invalid.
    """
    global SCHEDULE_DATA

    request_id = f"srv_{uuid.uuid4().hex[:12]}"
    t0 = time.perf_counter()
    if not _check_token(request):
        return JSONResponse(status_code=401, content={
            "success": False, "import_items": 0, "dry_run": dry_run,
            "error": "unauthorized", "request_id": request_id, "duration_ms": 0,
        })
    user = _get_user(request)
    if not user:
        return JSONResponse(status_code=400, content={**_NO_USER_RESPONSE,
                                                      "import_items": 0, "dry_run": dry_run,
                                                      "request_id": request_id, "duration_ms": 0})
    try:
        raw = await file.read()
        data = _json.loads(raw.decode("utf-8"))
        if not isinstance(data, dict) or data.get("version") != 1:
            raise ValueError("version muss 1 sein")
        items = data.get("schedule_items")
        if not isinstance(items, list):
            raise ValueError("schedule_items muss eine Liste sein")

        for i, entry in enumerate(items):
            if not isinstance(entry, dict):
                raise ValueError(f"Item [{i}] ist kein Objekt")
            missing = _REQUIRED_ITEM_KEYS - entry.keys()
            if missing:
                raise ValueError(f"Item [{i}]: fehlende Felder {sorted(missing)}")
            item_date = entry["date"]
            if not isinstance(item_date, str):
                raise ValueError(f"Item [{i}]: date muss ein ISO-Datum (YYYY-MM-DD) sein")
            try:
                date.fromisoformat(item_date)
            except ValueError:
                raise ValueError(f"Item [{i}]: ungueltiges Datum {item_date!r}") from None
            # Imported items always belong to the caller, whatever the file says.
            entry["user"] = user

        if not dry_run:
            SCHEDULE_DATA = [e for e in SCHEDULE_DATA if e.get("user") != user] + items
            _audit_write(request_id, user, "POST", "/v1/restore", "RESTORE", True, f"import_items={len(items)}")
        duration_ms = int((time.perf_counter() - t0) * 1000)
        return JSONResponse(content={
            "success": True, "import_items": len(items), "dry_run": dry_run,
            "error": "", "request_id": request_id, "duration_ms": duration_ms,
        })
    except Exception as e:
        duration_ms = int((time.perf_counter() - t0) * 1000)
        if not dry_run:
            _audit_write(request_id, user, "POST", "/v1/restore", "RESTORE", False, str(e))
        return JSONResponse(status_code=400, content={
            "success": False, "import_items": 0, "dry_run": dry_run,
            "error": str(e), "request_id": request_id, "duration_ms": duration_ms,
        })
@app.post("/v1/schedule/item")
def add_schedule_item(request: Request, payload: ScheduleItemIn):
    """Create one schedule entry for the calling user.

    An entry is identified by employee, date and type; a second entry with the
    same triple for the same user is rejected. Every outcome after
    authentication is written to the audit log.

    Responds with 201 on success, 401 on a failed token check, 400 when no
    user can be determined or the type is not in ``_VALID_TYPES``.

    Raises:
        HTTPException: 409 when an identical entry already exists.
    """
    request_id = f"srv_{uuid.uuid4().hex[:12]}"
    started = time.perf_counter()
    route = "/v1/schedule/item"

    if not _check_token(request):
        return JSONResponse(status_code=401, content={
            "success": False, "error": "unauthorized",
            "request_id": request_id, "duration_ms": 0,
        })

    user = _get_user(request)
    if not user:
        return JSONResponse(status_code=400, content={**_NO_USER_RESPONSE,
                                                      "request_id": request_id, "duration_ms": 0})

    employee = payload.employee.strip()
    raw_date = payload.date.isoformat()
    item_type = payload.type.strip()
    note = getattr(payload, "note", None) or ""

    if item_type not in _VALID_TYPES:
        _audit_write(request_id, user, "POST", route, "CREATE", False, f"invalid type={item_type}")
        return JSONResponse(status_code=400, content={
            "success": False, "error": f"type muss einer von {sorted(_VALID_TYPES)} sein",
            "request_id": request_id,
            "duration_ms": int((time.perf_counter() - started) * 1000),
        })

    already_present = any(
        row.get("user") == user
        and row["employee"] == employee
        and row["date"] == raw_date
        and row["type"] == item_type
        for row in SCHEDULE_DATA
    )
    if already_present:
        _audit_write(request_id, user, "POST", route, "CREATE", False, "duplicate")
        raise HTTPException(status_code=409, detail="duplicate")

    new_entry = {
        "employee": employee, "date": raw_date,
        "type": item_type, "note": note, "user": user,
    }
    SCHEDULE_DATA.append(new_entry)
    _audit_write(request_id, user, "POST", route, "CREATE", True, f"employee={employee} date={raw_date} type={item_type}")
    return JSONResponse(status_code=201, content={
        "success": True, "error": "",
        "request_id": request_id,
        "duration_ms": int((time.perf_counter() - started) * 1000),
    })
@app.delete("/v1/schedule/item")
def delete_schedule_item(request: Request, payload: ScheduleItemIn):
    """Delete the calling user's entry matching employee, date and type.

    Only the first matching entry is removed. Both outcomes are written to the
    audit log.

    Responds with 401 on a failed token check and 400 when no user can be
    determined.

    Raises:
        HTTPException: 404 when no matching entry exists.
    """
    request_id = f"srv_{uuid.uuid4().hex[:12]}"
    started = time.perf_counter()
    route = "/v1/schedule/item"

    if not _check_token(request):
        return JSONResponse(status_code=401, content={
            "success": False, "error": "unauthorized",
            "request_id": request_id, "duration_ms": 0,
        })

    user = _get_user(request)
    if not user:
        return JSONResponse(status_code=400, content={**_NO_USER_RESPONSE,
                                                      "request_id": request_id, "duration_ms": 0})

    employee = payload.employee.strip()
    raw_date = payload.date.isoformat()
    item_type = payload.type.strip()

    position = next(
        (
            idx for idx, row in enumerate(SCHEDULE_DATA)
            if row.get("user") == user
            and row["employee"] == employee
            and row["date"] == raw_date
            and row["type"] == item_type
        ),
        None,
    )
    if position is None:
        _audit_write(request_id, user, "DELETE", route, "DELETE", False, "not found")
        raise HTTPException(status_code=404, detail="not found")

    del SCHEDULE_DATA[position]
    _audit_write(request_id, user, "DELETE", route, "DELETE", True, f"employee={employee} date={raw_date} type={item_type}")
    return JSONResponse(content={
        "success": True, "error": "",
        "request_id": request_id,
        "duration_ms": int((time.perf_counter() - started) * 1000),
    })
@app.put("/v1/schedule/item")
async def update_schedule_item(request: Request):
    """Replace one of the caller's schedule entries with new values.

    The JSON body is validated as ``ScheduleItemUpdate``: its ``old`` part
    identifies the existing entry (employee, date, type) and its ``new``
    part carries the replacement values including the note.

    Responds with 401 when the token check fails and with 400 when no user
    can be resolved from the request.

    Raises:
        HTTPException: 422 for an unreadable or invalid body, 400 when the
            new type is not in ``_VALID_TYPES``, 404 when the old entry is
            not found, 409 when the new key already exists for this user.
    """
    request_id = f"srv_{uuid.uuid4().hex[:12]}"
    started_at = time.perf_counter()

    if not _check_token(request):
        return JSONResponse(status_code=401, content={
            "success": False, "error": "unauthorized",
            "request_id": request_id, "duration_ms": 0,
        })

    user = _get_user(request)
    if not user:
        return JSONResponse(status_code=400, content={
            **_NO_USER_RESPONSE,
            "request_id": request_id, "duration_ms": 0,
        })

    try:
        body = await request.json()
        try:
            update = ScheduleItemUpdate.model_validate(body)
        except AttributeError:
            # Fall back to the older validation entry point when
            # ``model_validate`` is not available.
            update = ScheduleItemUpdate.parse_obj(body)
    except Exception as e:
        raise HTTPException(status_code=422, detail=f"invalid body: {e}")

    previous, replacement = update.old, update.new
    prev_emp = previous.employee.strip()
    prev_date = previous.date.isoformat()
    prev_type = previous.type.strip()
    new_emp = replacement.employee.strip()
    new_date = replacement.date.isoformat()
    new_type = replacement.type.strip()
    new_note = replacement.note or ""

    if new_type not in _VALID_TYPES:
        _audit_write(request_id, user, "PUT", "/v1/schedule/item", "UPDATE", False, f"invalid type={new_type}")
        raise HTTPException(status_code=400, detail=f"type muss einer von {sorted(_VALID_TYPES)} sein")

    def _same_entry(row, emp_name, day, kind):
        # True when ``row`` belongs to the caller and carries the given key.
        return (
            row.get("user") == user
            and row["employee"] == emp_name
            and row["date"] == day
            and row["type"] == kind
        )

    target = next(
        (
            pos for pos, row in enumerate(SCHEDULE_DATA)
            if _same_entry(row, prev_emp, prev_date, prev_type)
        ),
        None,
    )
    if target is None:
        _audit_write(request_id, user, "PUT", "/v1/schedule/item", "UPDATE", False, "not found")
        raise HTTPException(status_code=404, detail="not found")

    # Only a changed key can collide with another entry of the same user.
    key_changed = (new_emp, new_date, new_type) != (prev_emp, prev_date, prev_type)
    if key_changed and any(
        _same_entry(row, new_emp, new_date, new_type) for row in SCHEDULE_DATA
    ):
        _audit_write(request_id, user, "PUT", "/v1/schedule/item", "UPDATE", False, "duplicate")
        raise HTTPException(status_code=409, detail="duplicate")

    SCHEDULE_DATA[target] = {
        "employee": new_emp, "date": new_date,
        "type": new_type, "note": new_note, "user": user,
    }
    _audit_write(request_id, user, "PUT", "/v1/schedule/item", "UPDATE", True, f"employee={new_emp} date={new_date} type={new_type}")
    return JSONResponse(content={
        "success": True, "error": "",
        "request_id": request_id,
        "duration_ms": int((time.perf_counter() - started_at) * 1000),
    })
@app.put("/v1/schedule/item/by_day")
async def update_schedule_item_by_day(request: Request):
    """Overwrite type and note of the caller's single entry for an employee and day.

    The JSON body is validated as ``ScheduleItemIn``. The entry is located
    by user, employee and date only; the type in the body is the new type.

    Responds with 401 when the token check fails and with 400 when no user
    can be resolved from the request.

    Raises:
        HTTPException: 422 for an unreadable or invalid body, 400 when the
            type is not in ``_VALID_TYPES``, 404 when no entry exists for
            that employee and day, 409 when several entries match
            ("ambiguous") or the new type would duplicate another entry.
    """
    request_id = f"srv_{uuid.uuid4().hex[:12]}"
    started_at = time.perf_counter()

    if not _check_token(request):
        return JSONResponse(status_code=401, content={
            "success": False, "error": "unauthorized",
            "request_id": request_id, "duration_ms": 0,
        })

    user = _get_user(request)
    if not user:
        return JSONResponse(status_code=400, content={
            **_NO_USER_RESPONSE,
            "request_id": request_id, "duration_ms": 0,
        })

    try:
        body = await request.json()
        try:
            item = ScheduleItemIn.model_validate(body)
        except AttributeError:
            # Fall back to the older validation entry point when
            # ``model_validate`` is not available.
            item = ScheduleItemIn.parse_obj(body)
    except Exception as e:
        raise HTTPException(status_code=422, detail=f"invalid body: {e}")

    emp = item.employee.strip()
    d = item.date.isoformat()
    new_type = item.type.strip()
    new_note = item.note or ""

    if new_type not in _VALID_TYPES:
        _audit_write(request_id, user, "PUT", "/v1/schedule/item/by_day", "UPDATE_BY_DAY", False, f"invalid type={new_type}")
        raise HTTPException(status_code=400, detail=f"type muss einer von {sorted(_VALID_TYPES)} sein")

    # Positions of all entries of this user for the given employee and day.
    hits = [
        pos for pos, row in enumerate(SCHEDULE_DATA)
        if row.get("user") == user and row["employee"] == emp and row["date"] == d
    ]
    if not hits:
        _audit_write(request_id, user, "PUT", "/v1/schedule/item/by_day", "UPDATE_BY_DAY", False, "not found")
        raise HTTPException(status_code=404, detail="not found")
    if len(hits) > 1:
        _audit_write(request_id, user, "PUT", "/v1/schedule/item/by_day", "UPDATE_BY_DAY", False, "ambiguous")
        raise HTTPException(status_code=409, detail="ambiguous")

    found_idx = hits[0]
    if SCHEDULE_DATA[found_idx]["type"] != new_type:
        # NOTE(review): with exactly one match for user/employee/date this
        # scan looks unreachable in practice; kept as a safeguard.
        clash = any(
            pos != found_idx
            and row.get("user") == user
            and row["employee"] == emp
            and row["date"] == d
            and row["type"] == new_type
            for pos, row in enumerate(SCHEDULE_DATA)
        )
        if clash:
            _audit_write(request_id, user, "PUT", "/v1/schedule/item/by_day", "UPDATE_BY_DAY", False, "duplicate")
            raise HTTPException(status_code=409, detail="duplicate")

    SCHEDULE_DATA[found_idx] = {
        "employee": emp, "date": d,
        "type": new_type, "note": new_note, "user": user,
    }
    _audit_write(request_id, user, "PUT", "/v1/schedule/item/by_day", "UPDATE_BY_DAY", True, f"employee={emp} date={d} new_type={new_type}")
    return JSONResponse(content={
        "success": True, "error": "",
        "request_id": request_id,
        "duration_ms": int((time.perf_counter() - started_at) * 1000),
    })
@app.delete("/v1/schedule/item/by_day")
async def delete_schedule_item_by_day(request: Request):
    """Delete the caller's single schedule entry for an employee and day.

    Expects a JSON object with ``employee`` and ``date``. The date is parsed
    with ``date.fromisoformat`` and normalised back to canonical ISO form
    before it is compared with the stored entries, so any spelling that the
    parser accepts matches the stored ``YYYY-MM-DD`` strings.

    Responds with 401 when the token check fails and with 400 when no user
    can be resolved from the request.

    Raises:
        HTTPException: 422 when the body is not a JSON object, a field is
            missing or the date cannot be parsed; 404 when no entry exists
            for that employee and day; 409 when several entries match.
    """
    request_id = f"srv_{uuid.uuid4().hex[:12]}"
    t0 = time.perf_counter()

    if not _check_token(request):
        return JSONResponse(status_code=401, content={
            "success": False, "error": "unauthorized",
            "request_id": request_id, "duration_ms": 0,
        })

    user = _get_user(request)
    if not user:
        return JSONResponse(status_code=400, content={
            **_NO_USER_RESPONSE,
            "request_id": request_id, "duration_ms": 0,
        })

    try:
        data = await request.json()
        if not isinstance(data, dict):
            # Give a clear message instead of an incidental AttributeError.
            raise ValueError("JSON-Objekt erwartet")
        emp = str(data.get("employee", "")).strip()
        raw_date = str(data.get("date", "")).strip()
        if not emp or not raw_date:
            raise ValueError("employee und date sind erforderlich")
        # Validate AND normalise: stored dates are canonical isoformat()
        # strings, so compare against the canonical form of the input.
        raw_date = date.fromisoformat(raw_date).isoformat()
    except Exception as e:
        raise HTTPException(status_code=422, detail=f"invalid body: {e}")

    matches = [
        i for i, entry in enumerate(SCHEDULE_DATA)
        if entry.get("user") == user
        and entry["employee"] == emp
        and entry["date"] == raw_date
    ]
    if not matches:
        _audit_write(request_id, user, "DELETE", "/v1/schedule/item/by_day", "DELETE_BY_DAY", False, "not found")
        raise HTTPException(status_code=404, detail="not found")
    if len(matches) > 1:
        _audit_write(request_id, user, "DELETE", "/v1/schedule/item/by_day", "DELETE_BY_DAY", False, "ambiguous")
        raise HTTPException(status_code=409, detail="ambiguous")

    SCHEDULE_DATA.pop(matches[0])
    _audit_write(request_id, user, "DELETE", "/v1/schedule/item/by_day", "DELETE_BY_DAY", True, f"employee={emp} date={raw_date}")
    return JSONResponse(content={
        "success": True, "error": "",
        "request_id": request_id,
        "duration_ms": int((time.perf_counter() - t0) * 1000),
    })
def _transcription_text(resp: Any) -> str:
    """Return the transcript text carried by a transcription response.

    Tries, in order, the ``text`` attribute, a plain ``dict`` payload and a
    ``model_dump()`` result. Returns ``""`` when none of them yields text.
    """
    text = getattr(resp, "text", "") or ""
    if not text and isinstance(resp, dict):
        text = resp.get("text", "") or ""
    if not text:
        try:
            if hasattr(resp, "model_dump"):
                dumped = resp.model_dump()
                if isinstance(dumped, dict):
                    text = dumped.get("text", "") or ""
        except Exception:
            # Best effort only: an unreadable response counts as "no text".
            pass
    return text


@app.post("/v1/transcribe", dependencies=[Depends(require_api_token)])
async def transcribe(
    request: Request,
    file: UploadFile = File(...),
    language: str = Form(LANGUAGE),
    prompt: str = Form(""),
    domain: str = Form("medical"),
    specialty: str = Form(""),
):
    """Transcribe an uploaded audio file through the OpenAI transcription API.

    The upload is streamed to a temporary file (always removed afterwards),
    sent to ``TRANSCRIBE_MODEL`` and the resulting text is passed through the
    medical correction helpers before it is returned.

    Args:
        request: Incoming request; used for rate limiting and the
            Content-Length pre-check.
        file: Uploaded audio file.
        language: Language passed to the transcription API.
        prompt: Optional explicit prompt; a default is chosen when empty.
        domain: ``"general"`` selects the general prompt and skips the
            medication lexicon; every other value uses the medical prompt.
        specialty: Forwarded to the medication lexicon and the corrections.

    Returns:
        A JSON response with ``success``, ``transcript``, ``error``,
        ``request_id``, ``duration_ms`` and ``model``; a successful response
        additionally carries ``debug`` and ``model_used``. Errors raised
        while processing the upload are reported in this JSON body with
        ``success=False`` instead of being raised.

    Raises:
        HTTPException: 413 when the declared Content-Length exceeds
            ``MAX_TRANSCRIBE_BODY_BYTES``.
    """
    # The OpenAI client calls below are synchronous; run them in the thread
    # pool so a long transcription does not stall the event loop.
    from fastapi.concurrency import run_in_threadpool

    # --- Abuse protection: rate limit + request size limit ---
    ip = request.client.host if request.client else "unknown"
    tok = request.headers.get("X-API-Token", "none")
    default_ip_limiter.consume(f"ip:{ip}", cost=1.0)
    default_token_limiter.consume(f"tok:{tok}", cost=1.0)

    content_length = request.headers.get("content-length")
    if content_length is not None:
        try:
            if int(content_length) > MAX_TRANSCRIBE_BODY_BYTES:
                raise HTTPException(status_code=413, detail="Request too large")
        except ValueError:
            # ignore malformed header; downstream may still fail safely
            pass

    request_id = f"srv_{uuid.uuid4().hex[:12]}"
    t0 = time.perf_counter()
    tmp_path = None
    fname = file.filename or "unknown"
    file_bytes = 0

    try:
        safe_name = safe_upload_filename(fname)
        ext = os.path.splitext(safe_name)[1].lower()

        content_type = (file.content_type or "").strip().lower()
        if content_type and content_type not in ALLOWED_TRANSCRIBE_CONTENT_TYPES:
            raise ValueError(
                f"Content-Type {content_type} nicht erlaubt, nur: {', '.join(sorted(ALLOWED_TRANSCRIBE_CONTENT_TYPES))}"
            )

        # Stream the upload to disk and enforce the size limit on the bytes
        # actually received, independent of the Content-Length header.
        with tempfile.NamedTemporaryFile(prefix="aza_", suffix=ext, delete=False) as tmp:
            tmp_path = tmp.name
            while True:
                chunk = await file.read(1024 * 1024)  # 1 MB chunks
                if not chunk:
                    break
                file_bytes += len(chunk)
                if file_bytes > MAX_TRANSCRIBE_BODY_BYTES:
                    raise ValueError("Datei zu gross (max 500 MB)")
                tmp.write(chunk)

        client = _get_openai()
        is_gpt_transcribe = "gpt-" in TRANSCRIBE_MODEL
        explicit_prompt = (prompt or "").strip()
        dom = str(domain or "").strip().lower()

        chosen_prompt = explicit_prompt
        if not chosen_prompt:
            chosen_prompt = WHISPER_GENERAL_PROMPT if dom == "general" else WHISPER_MEDICAL_PROMPT

        # Specialty-dependent medication lexicon as an additional prompt hint
        # (only the prompt is changed here, NOT the transcript text).
        if dom != "general":
            medication_lexicon = get_active_medication_lexicon(specialty)
            if medication_lexicon:
                chosen_prompt = (
                    chosen_prompt
                    + " Wichtige Medikamente und Wirkstoffe: "
                    + ", ".join(medication_lexicon[:50])
                    + "."
                )

        with open(tmp_path, "rb") as f:
            # NOTE: Some installed openai-python versions do not support an "instructions"
            # parameter for audio transcriptions. Use "prompt" for maximum compatibility.
            params = dict(model=TRANSCRIBE_MODEL, file=f, language=language)
            if is_gpt_transcribe:
                params["prompt"] = explicit_prompt or GPT_TRANSCRIBE_SHORT_PROMPT
            else:
                params["prompt"] = chosen_prompt
                # temperature is supported for whisper-style models; keep it conservative.
                params["temperature"] = 0.0
            resp = await run_in_threadpool(client.audio.transcriptions.create, **params)

        text = _transcription_text(resp)

        used_fallback = False
        if AZA_ENABLE_WHISPER_FALLBACK and (not text.strip()) and is_gpt_transcribe:
            # Retry with whisper-1 when the gpt model returned nothing, but
            # only for uploads large enough to plausibly contain speech.
            min_fallback_bytes = 250_000
            if int(file_bytes) >= min_fallback_bytes:
                try:
                    with open(tmp_path, "rb") as fb_f:
                        fb_params = dict(model="whisper-1", file=fb_f, language=language)
                        fb_params["prompt"] = WHISPER_GENERAL_PROMPT if dom == "general" else WHISPER_MEDICAL_PROMPT
                        fb_resp = await run_in_threadpool(
                            client.audio.transcriptions.create, **fb_params
                        )
                    text = _transcription_text(fb_resp)
                    if text.strip():
                        _t = text.strip().lower()
                        # Stock phrases that are discarded instead of being
                        # returned as a transcript.
                        _bad = {
                            "vielen dank fürs zuschauen.",
                            "vielen dank für's zuschauen.",
                            "thank you for watching.",
                            "thanks for watching.",
                        }
                        if _t in _bad:
                            text = ""
                        else:
                            used_fallback = True
                except Exception as fb_exc:
                    # The fallback is best effort; log the failure type and
                    # continue with whatever text is available.
                    print(f'TRANSCRIBE request_id={request_id} fallback=whisper-1 failed error="{type(fb_exc).__name__}"')

        # Drop the prompt prefix if the model echoed it at the start.
        t_stripped = text.lstrip()
        if t_stripped.startswith(WHISPER_PROMPT_PREFIX):
            text = t_stripped[len(WHISPER_PROMPT_PREFIX):].lstrip(" :\t\r\n-")

        text = text.replace("ß", "ss")
        text = apply_medical_corrections(text, specialty)
        text = apply_medical_post_corrections(text)
        text = apply_medication_fuzzy_corrections(text)

        duration_ms = int((time.perf_counter() - t0) * 1000)

        dbg = None
        if AZA_TRANSCRIBE_DEBUG and not text.strip():
            try:
                dbg = {
                    "file_name": getattr(file, "filename", "") or "",
                    "content_type": getattr(file, "content_type", "") or "",
                    "file_bytes": int(file_bytes),
                    "model": TRANSCRIBE_MODEL,
                    "is_gpt_transcribe": bool(is_gpt_transcribe),
                    "language": language,
                    "domain": dom,
                    "specialty": specialty,
                    "has_explicit_prompt": bool((prompt or "").strip()),
                }
            except Exception:
                dbg = {"debug": "failed"}
        if AZA_TRANSCRIBE_DEBUG:
            if dbg is None:
                dbg = {}
            if isinstance(dbg, dict):
                dbg["used_fallback_whisper1"] = bool(used_fallback)

        print(f'TRANSCRIBE request_id={request_id} file="{fname}" bytes={file_bytes} ms={duration_ms} success=true')
        model_used = "whisper-1" if used_fallback else TRANSCRIBE_MODEL
        return JSONResponse(content={
            "success": True,
            "transcript": text,
            "error": "",
            "request_id": request_id,
            "duration_ms": duration_ms,
            "model": TRANSCRIBE_MODEL,
            "debug": dbg if AZA_TRANSCRIBE_DEBUG else None,
            "model_used": model_used,
        })
    except Exception as e:
        duration_ms = int((time.perf_counter() - t0) * 1000)
        err_msg = str(e)
        # Same redaction as the chat proxy: never echo text that may contain
        # an API key or organisation id to the client or the log.
        if any(marker in err_msg for marker in ("sk-", "org-")):
            err_msg = "OpenAI-Anfrage fehlgeschlagen (interner Serverfehler)."
        print(f'TRANSCRIBE request_id={request_id} file="{fname}" bytes={file_bytes} ms={duration_ms} success=false error="{err_msg}"')
        return JSONResponse(content={
            "success": False,
            "transcript": "",
            "error": err_msg,
            "request_id": request_id,
            "duration_ms": duration_ms,
            "model": TRANSCRIBE_MODEL,
        })
    finally:
        # Always remove the temporary upload, on success and on failure.
        if tmp_path:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
@app.post("/v1/chat", dependencies=[Depends(require_api_token)])
async def chat_proxy(request: Request, body: ChatRequest):
    """Proxy for OpenAI chat completions. The OpenAI key stays server-side only.

    The request is rate limited per client IP and per API token, then
    checked against ``ALLOWED_CHAT_MODELS``, ``MAX_CHAT_MESSAGES`` and
    ``MAX_CHAT_CONTENT_CHARS`` before it is forwarded.

    Returns:
        A JSON response. 200 with ``content``, ``finish_reason``, ``model``,
        ``usage``, ``request_id`` and ``duration_ms`` on success; 400 when a
        validation limit is violated; 502 when the upstream call fails. In
        the 502 case an error text that contains a key-like marker is
        replaced by a generic message.

    Raises:
        HTTPException: re-raised unchanged when raised while preparing or
            performing the upstream call.
    """
    # The OpenAI client call is synchronous; run it in the thread pool so a
    # slow completion does not stall the event loop.
    from fastapi.concurrency import run_in_threadpool

    ip = request.client.host if request.client else "unknown"
    tok = request.headers.get("X-API-Token", "none")
    default_ip_limiter.consume(f"ip:{ip}", cost=1.0)
    default_token_limiter.consume(f"tok:{tok}", cost=1.0)

    request_id = f"chat_{uuid.uuid4().hex[:12]}"
    t0 = time.perf_counter()

    def _reject(message: str):
        # Uniform 400 response for all request-validation failures.
        return JSONResponse(
            status_code=400,
            content={
                "success": False,
                "error": message,
                "request_id": request_id,
            },
        )

    model = body.model
    if model not in ALLOWED_CHAT_MODELS:
        return _reject(
            f"Modell '{model}' nicht erlaubt. Erlaubt: {', '.join(sorted(ALLOWED_CHAT_MODELS))}"
        )
    if len(body.messages) > MAX_CHAT_MESSAGES:
        return _reject(f"Zu viele Nachrichten (max {MAX_CHAT_MESSAGES}).")
    if any(len(msg.content) > MAX_CHAT_CONTENT_CHARS for msg in body.messages):
        return _reject(f"Nachricht zu lang (max {MAX_CHAT_CONTENT_CHARS} Zeichen).")

    try:
        client = _get_openai()
        params: dict[str, Any] = {
            "model": model,
            "messages": [{"role": m.role, "content": m.content} for m in body.messages],
        }
        # Forward optional sampling parameters only when the caller set them.
        if body.temperature is not None:
            params["temperature"] = body.temperature
        if body.max_tokens is not None:
            params["max_tokens"] = body.max_tokens
        if body.top_p is not None:
            params["top_p"] = body.top_p

        resp = await run_in_threadpool(client.chat.completions.create, **params)

        choice = resp.choices[0] if resp.choices else None
        content = choice.message.content if choice and choice.message else ""
        finish_reason = choice.finish_reason if choice else None

        usage = None
        if resp.usage:
            usage = {
                "prompt_tokens": resp.usage.prompt_tokens,
                "completion_tokens": resp.usage.completion_tokens,
                "total_tokens": resp.usage.total_tokens,
            }

        duration_ms = int((time.perf_counter() - t0) * 1000)
        print(f"CHAT request_id={request_id} model={model} ms={duration_ms} success=true")
        return JSONResponse(content={
            "success": True,
            "content": content or "",
            "finish_reason": finish_reason,
            "model": resp.model or model,
            "usage": usage,
            "request_id": request_id,
            "duration_ms": duration_ms,
            "error": "",
        })
    except HTTPException:
        raise
    except Exception as e:
        duration_ms = int((time.perf_counter() - t0) * 1000)
        err_msg = str(e)
        # "sk-" also covers "sk-proj-" keys; never echo such text to clients.
        if any(marker in err_msg for marker in ("sk-", "org-")):
            err_msg = "OpenAI-Anfrage fehlgeschlagen (interner Serverfehler)."
        print(f"CHAT request_id={request_id} model={model} ms={duration_ms} success=false")
        return JSONResponse(
            status_code=502,
            content={
                "success": False,
                "content": "",
                "error": err_msg,
                "request_id": request_id,
                "duration_ms": duration_ms,
                "model": model,
            },
        )
if __name__ == "__main__":
    import uvicorn

    # Refuse to start without an API token.
    if not API_TOKEN:
        sys.exit("FEHLER: ENV MEDWORK_API_TOKEN ist nicht gesetzt. Server wird nicht gestartet.")

    logging.getLogger("uvicorn.access").addFilter(_UvicornAccessHealthFilter())

    # TLS: verify the configuration first, then derive the uvicorn SSL
    # arguments and the URL scheme used in the startup banner.
    check_tls_or_exit()
    ssl_kwargs = get_uvicorn_ssl_kwargs()
    scheme = "https" if has_tls_config() else "http"

    # Startup banner: license database, license mode, token and TLS status.
    dbp = _stripe_db_path()
    print(f"[LICENSE] STRIPE_DB_PATH={str(dbp)} exists={dbp.exists()}")
    print(f"[LICENSE] active_count={_active_license_count()}")
    print("Lizenzmodus: VOLL" if _has_any_active_license() else "Lizenzmodus: DEMO")
    print(f"API-Token aktiv ({len(API_TOKEN)} Zeichen)")
    print(f"TLS: {'AKTIV' if has_tls_config() else 'DEAKTIVIERT'}")
    print(f"Starte auf {scheme}://127.0.0.1:{PORT}")
    print(f"Swagger: {scheme}://127.0.0.1:{PORT}/docs")

    # The server binds to all interfaces; the banner shows the loopback URL.
    uvicorn.run(app, host="0.0.0.0", port=PORT, access_log=False, **ssl_kwargs)