# -*- coding: utf-8 -*- """ Kongress 2 – Live Congress Search (Provider Test / Fallback). Visual style matches the main "Kongresse & Weiterbildungen" window. Calls /api/events/live_google_test (DDG fallback). GUARANTEE: result area is NEVER empty — always shows items or diagnostics. """ from __future__ import annotations import json import os import re import webbrowser import tkinter as tk from tkinter import ttk from datetime import date, datetime from pathlib import Path from urllib.parse import urlparse, quote_plus _BASE_DIR = Path(__file__).resolve().parent _TOKEN_FILE = _BASE_DIR / "backend_token.txt" _API_BASE = "http://127.0.0.1:8000" _API_URL = _API_BASE + "/api/events/live_google_test" _DEVICE_ID = "pc-local" _MAX_LIMIT = 30 _BG = "#f7fafc" _HDR_BG = "#e3ecf4" _HDR_FG = "#1e4060" _CARD_BG = "#ffffff" _TEXT_FG = "#23404f" _LINK_FG = "#186aa5" _SEP_FG = "#dce6f0" _FONT = "Segoe UI" _MONTH_NAMES_DE = [ "", "Januar", "Februar", "März", "April", "Mai", "Juni", "Juli", "August", "September", "Oktober", "November", "Dezember", ] _ISO_RE = re.compile(r"\d{4}-\d{2}-\d{2}") def _read_token() -> str: tok = os.environ.get("MEDWORK_API_TOKEN", "").strip() if not tok: try: with open(str(_TOKEN_FILE), "r", encoding="utf-8") as f: tok = (f.readline() or "").strip() except Exception: tok = "" return tok or "" def _call_api(specialty: str, regions: str, from_d: str, to_d: str, limit: int) -> dict: from urllib.request import Request, urlopen from urllib.parse import urlencode tok = _read_token() if not tok: return {"ok": False, "error": "Kein Token (backend_token.txt fehlt)", "items": [], "diag": {"reason": "no token"}} params = urlencode({ "specialty": specialty, "regions": regions, "from": from_d, "to": to_d, "limit": min(max(limit, 1), _MAX_LIMIT), }) url = f"{_API_URL}?{params}" req = Request(url, headers={ "X-API-Token": tok, "X-Device-Id": _DEVICE_ID, }) try: with urlopen(req, timeout=60) as resp: body = resp.read().decode("utf-8", errors="replace") data = json.loads(body) data.setdefault("_request_url", url.split("?")[0]) data.setdefault("_resp_len", len(body)) return data except Exception as exc: return { "ok": False, "error": f"Backend-Fehler: {type(exc).__name__}: {exc}", "items": [], "diag": {"exception": str(exc), "url": url.split("?")[0]}, } def _parse_iso(s: str): if not s: return None try: return datetime.strptime(s[:10], "%Y-%m-%d").date() except Exception: return None def _format_date_eu(item: dict) -> str: d = item.get("date") or "" if d and _ISO_RE.match(d): p = d.split("-") if len(p) == 3: return f"{p[2]}.{p[1]}.{p[0]}" return "" def _month_label(d: date) -> str: return f"{_MONTH_NAMES_DE[d.month]} {d.year}" class Kongress2Window(tk.Toplevel): _TYPE_LABELS = { "congress": "Kongress", "symposium": "Symposium", "workshop": "Workshop", "course": "Kurs / CME", "webinar": "Webinar", } _FORMAT_LABELS = {"online": "Online", "hybrid": "Hybrid"} def __init__(self, parent): super().__init__(parent) self.title("Kongress 2 – Provider Test (Fallback)") self.configure(bg=_BG) self.minsize(620, 500) self._link_cnt = 0 try: sw = max(1200, int(parent.winfo_screenwidth())) sh = max(800, int(parent.winfo_screenheight())) w, h = max(640, int(sw * 0.36)), max(640, int(sh * 0.85)) self.geometry(f"{w}x{h}+30+50") except Exception: self.geometry("850x700") try: parent._register_window(self) except Exception: pass self._items: list[dict] = [] self._build_header() self._build_text_area() self._build_status_bar() # ── Header ──────────────────────────────────────────────────────────── def _build_header(self): hdr = tk.Frame(self, bg=_HDR_BG, padx=8, pady=5) hdr.pack(fill="x") tk.Label(hdr, text="Kongresse & Weiterbildungen (Live-Suche)", bg=_HDR_BG, fg=_HDR_FG, font=(_FONT, 10, "bold")).pack(side="left") right = tk.Frame(hdr, bg=_HDR_BG) right.pack(side="right") self._provider_lbl = tk.Label( right, text="Provider: –", bg=_HDR_BG, fg="#5a7a8c", font=(_FONT, 7)) self._provider_lbl.pack(side="left", padx=(0, 8)) form = tk.Frame(self, bg=_BG, padx=8, pady=4) form.pack(fill="x") fields = [ ("Specialty", "dermatology", 18), ("Regions", "EU", 8), ("From", "2026-01-01", 12), ("To", "2026-12-31", 12), ("Limit", "30", 5), ] self._entries: dict[str, tk.Entry] = {} for label, default, w in fields: tk.Label(form, text=label + ":", font=(_FONT, 8), bg=_BG, fg=_HDR_FG).pack(side="left", padx=(4, 1)) ent = tk.Entry(form, font=(_FONT, 9), width=w, bg="#ffffff", fg=_TEXT_FG, bd=1, relief="solid") ent.insert(0, default) ent.pack(side="left", padx=(0, 4)) self._entries[label.lower()] = ent ttk.Button(form, text="Suche starten", command=self._do_search).pack(side="left", padx=6) # ── Text area (same structure as CongressWindow) ────────────────────── def _build_text_area(self): f = tk.Frame(self, bg=_CARD_BG, bd=0) f.pack(fill="both", expand=True, padx=6, pady=(2, 4)) self._text = tk.Text( f, wrap="word", font=(_FONT, 9), bg=_CARD_BG, fg=_TEXT_FG, relief="flat", padx=10, pady=8, cursor="arrow", spacing1=1, spacing3=1, ) sb = ttk.Scrollbar(f, orient="vertical", command=self._text.yview) self._text.configure(yscrollcommand=sb.set) sb.pack(side="right", fill="y") self._text.pack(side="left", fill="both", expand=True) self._text.configure(state="disabled") self._configure_tags() def _configure_tags(self): w = self._text w.tag_configure("month_hdr", font=(_FONT, 10, "bold"), foreground="#0e3350", spacing1=10, spacing3=4, background="#e0ecf5") w.tag_configure("section", font=(_FONT, 9, "bold"), foreground="#2060a0", spacing1=6, spacing3=2) w.tag_configure("title", font=(_FONT, 9, "bold"), foreground="#0e3350") w.tag_configure("normal", font=(_FONT, 9), foreground="#2b4a5c") w.tag_configure("meta", font=(_FONT, 8), foreground="#5a7a8c") w.tag_configure("type_tag", font=(_FONT, 7, "bold"), foreground="#ffffff", background="#5090c0") w.tag_configure("loading", font=(_FONT, 9), foreground="#6a9ab0") w.tag_configure("warn", font=(_FONT, 9), foreground="#b07020") w.tag_configure("error", font=(_FONT, 9, "bold"), foreground="#c0392b") w.tag_configure("diag", font=("Consolas", 8), foreground="#888") w.tag_configure("sep", font=(_FONT, 2), foreground=_SEP_FG) # ── Status bar ──────────────────────────────────────────────────────── def _build_status_bar(self): self._status = tk.StringVar( value="Klick auf einen Eintrag oeffnet die Webseite") tk.Label(self, textvariable=self._status, bg=_HDR_BG, fg="#4a7a8c", font=(_FONT, 8), anchor="w", padx=8).pack(fill="x", side="bottom") # ── Render helpers (copied from CongressWindow pattern) ─────────────── def _open_url(self, url): try: webbrowser.open(url) except Exception: pass def _insert_link(self, url: str, label: str | None = None): self._link_cnt += 1 tag = f"cl_{self._link_cnt}" self._text.tag_configure(tag, font=(_FONT, 8, "underline"), foreground=_LINK_FG) self._text.tag_bind(tag, "", lambda _e, u=url: self._open_url(u)) self._text.tag_bind(tag, "", lambda _e: self._text.configure(cursor="hand2")) self._text.tag_bind(tag, "", lambda _e: self._text.configure(cursor="arrow")) self._text.tag_bind( tag, "", lambda e, u=url: self._show_link_menu(e, u)) self._text.insert("end", label or url, tag) def _show_link_menu(self, event, url: str): m = tk.Menu(self._text, tearoff=0, font=(_FONT, 9)) m.add_command(label="Link kopieren", command=lambda: self._copy(url)) m.add_command(label="Im Browser oeffnen", command=lambda: self._open_url(url)) try: m.tk_popup(event.x_root, event.y_root) finally: m.grab_release() def _copy(self, text): try: self.clipboard_clear() self.clipboard_append(text) except Exception: pass def _render_card(self, item: dict, idx: int): """Render one result card exactly like CongressWindow._render_card.""" w = self._text url = item.get("url", "") title = item.get("title", "Unbekannt") source = item.get("source", "") date_eu = _format_date_eu(item) date_iso = item.get("date") or "" parsed = _parse_iso(date_iso) if date_iso else None typ = item.get("type", "") type_label = self._TYPE_LABELS.get(typ, "") fmt = item.get("format", "") fmt_label = self._FORMAT_LABELS.get(fmt, "") w.insert("end", " ") if type_label: w.insert("end", f" {type_label} ", "type_tag") w.insert("end", " ") if url: self._link_cnt += 1 ttag = f"ct_{self._link_cnt}" w.tag_configure(ttag, font=(_FONT, 9, "bold"), foreground="#0e3350") w.tag_bind(ttag, "", lambda _e, u=url: self._open_url(u)) w.tag_bind(ttag, "", lambda _e: w.configure(cursor="hand2")) w.tag_bind(ttag, "", lambda _e: w.configure(cursor="arrow")) w.insert("end", title, ttag) else: w.insert("end", title, "title") w.insert("end", "\n") meta = [] if date_eu: meta.append(date_eu) elif date_iso: meta.append(date_iso) else: meta.append("Datum unbek.") if fmt_label: meta.append(fmt_label) if source: try: domain = urlparse(url).netloc if url else source except Exception: domain = source meta.append(domain) if meta: w.insert("end", f" {' · '.join(meta)}\n", "meta") if url: w.insert("end", " ") self._insert_link(url) w.insert("end", "\n\n") else: search_q = title google_url = "https://www.google.com/search?q=" + quote_plus(search_q) w.insert("end", " ") self._insert_link(google_url, "Im Google suchen") w.insert("end", "\n\n") def _render_results(self, items: list[dict]): """Render results grouped by month, like CongressWindow._render_events.""" current_month = "" for idx, item in enumerate(items): date_iso = item.get("date") or "" parsed = _parse_iso(date_iso) if parsed: ml = _month_label(parsed) if ml != current_month: current_month = ml self._text.insert("end", f" {ml}\n", "month_hdr") self._text.insert("end", "\n") elif current_month != "__unknown__": current_month = "__unknown__" self._text.insert("end", " Weiteres\n", "month_hdr") self._text.insert("end", "\n") self._render_card(item, idx) # ── Search + diagnostics ────────────────────────────────────────────── def _do_search(self): self._text.configure(state="normal") self._text.delete("1.0", "end") self._link_cnt = 0 self._items = [] self._text.insert("end", " Suche laeuft ...\n", "loading") self._text.configure(state="disabled") self._status.set("Suche laeuft (kann 10-30 Sek. dauern) ...") self.update_idletasks() specialty = self._entries["specialty"].get().strip() or "dermatology" regions = self._entries["regions"].get().strip() or "EU" from_d = self._entries["from"].get().strip() or "2026-01-01" to_d = self._entries["to"].get().strip() or "2026-12-31" try: limit = int(self._entries["limit"].get().strip()) except ValueError: limit = _MAX_LIMIT limit = min(max(limit, 1), _MAX_LIMIT) data: dict = {} try: data = _call_api(specialty, regions, from_d, to_d, limit) except Exception as exc: data = {"ok": False, "error": str(exc), "items": [], "diag": {"exception": str(exc)}} self._text.configure(state="normal") self._text.delete("1.0", "end") self._link_cnt = 0 provider = data.get("provider", "?") prov_display = { "ddg": "DuckDuckGo (Fallback)", "google": "Google CSE", "none": "keiner", }.get(provider, provider) self._provider_lbl.config(text=f"Provider: {prov_display}") if not data.get("ok", False): err = data.get("error", "Unbekannter Fehler") self._text.insert("end", f" FEHLER\n", "month_hdr") self._text.insert("end", "\n") self._text.insert("end", f" {err}\n\n", "error") self._show_diagnostics(data, specialty, regions, from_d, to_d, limit, prov_display) self._text.configure(state="disabled") self._status.set(f"Fehler – {err}") return items = data.get("items", []) if not items: self._text.insert("end", " 0 Ergebnisse\n", "month_hdr") self._text.insert("end", "\n") self._text.insert( "end", " Provider lieferte keine Treffer.\n" " Moeglicherweise blockt der Provider oder die Query\n" " liefert nichts Passendes.\n\n", "warn", ) self._show_diagnostics(data, specialty, regions, from_d, to_d, limit, prov_display) self._text.configure(state="disabled") self._status.set(f"0 Ergebnisse via {prov_display}") return items.sort(key=lambda x: ( (0, x["date"]) if x.get("date") and _ISO_RE.match(x["date"]) else (1, "") )) self._items = items shown = len(items) self._text.insert( "end", f" Live-Kongresse ({shown} Ergebnisse)\n", "month_hdr") self._text.insert("end", "\n") self._render_results(items) self._text.insert("end", "\n" + "─" * 60 + "\n", "sep") diag = data.get("diag", {}) if diag: self._text.insert( "end", f" Queries: {diag.get('queries_run', '?')} · " f"Raw: {diag.get('total_raw', '?')} · " f"Dedup: {diag.get('deduped', '?')} · " f"Gezeigt: {diag.get('returned', shown)}\n", "meta", ) self._text.configure(state="disabled") if shown >= limit: info = f"{shown} Ergebnisse via {prov_display}" else: info = f"{shown} gefunden via {prov_display}" self._status.set(info) def _show_diagnostics(self, data: dict, specialty: str, regions: str, from_d: str, to_d: str, limit: int, prov_display: str): """Append diagnostic block so the result area is never empty.""" w = self._text w.insert("end", " Diagnose\n", "section") w.insert("end", f" Provider: {prov_display}\n", "diag") w.insert("end", f" Specialty: {specialty}\n", "diag") w.insert("end", f" Regions: {regions}\n", "diag") w.insert("end", f" Zeitraum: {from_d} – {to_d}\n", "diag") w.insert("end", f" Limit: {limit}\n", "diag") diag = data.get("diag", {}) if diag: w.insert("end", "\n Backend-Diagnose:\n", "diag") for k, v in diag.items(): if k == "fetch_results": w.insert("end", f" Fetch-Details:\n", "diag") for fr in (v or [])[:8]: q_short = fr.get("q", "?")[:55] info = fr.get("info", "?") hl = fr.get("html_len", 0) w.insert("end", f" [{info}] len={hl} q={q_short}\n", "diag") else: w.insert("end", f" {k}: {v}\n", "diag") resp_len = data.get("_resp_len", "?") w.insert("end", f"\n Response-Laenge: {resp_len}\n", "diag") w.insert("end", "\n")