# -*- coding: utf-8 -*-
"""
Kongresse & Weiterbildungen – search window
===========================================

SpecialtyService        – FMH/SIWF specialist titles (online + cache + fallback)
MajorCongressRegistry   – curated congresses from data/major_congresses.json
EventDatabase           – persistent local event DB (cache/events_db.json)
SeedCrawler             – lightweight HTML crawler for seed sources
CongressSearchEngine    – GPT web search, JSON extraction, merge/dedup
CongressWindow          – Tkinter UI with card layout

cache/events_db.json schema:
{
  "last_refresh": "YYYY-MM-DDTHH:MM:SS",   // ISO UTC
  "events": [
    {
      "title": str,
      "start_date": "YYYY-MM-DD" or "",
      "end_date": "YYYY-MM-DD" or "",
      "location": str,
      "format": "onsite|online|hybrid|unknown",
      "cme": str,
      "type": "congress|symposium|workshop|course|webinar|unknown",
      "description": str,
      "url": str,
      "specialties": [str],
      "source": "registry|seed|search"
    },
    ...
  ]
}
"""

from __future__ import annotations

import json
import os
import re
import ssl
import threading
import time
import webbrowser
from datetime import date, datetime, timedelta
from html.parser import HTMLParser
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
from urllib.parse import urlparse, urljoin, parse_qs, urlencode, urlunparse

import tkinter as tk
from tkinter import ttk

from aza_config import get_writable_data_dir

# Bundled (read-only, ships with the app) vs. writable data/cache locations.
_BUNDLE_DIR = Path(__file__).resolve().parent
_BUNDLE_DATA = _BUNDLE_DIR / "data"
_WRITABLE_DATA = Path(get_writable_data_dir()) / "data"
_CACHE = Path(get_writable_data_dir()) / "cache"

# ═══════════════════════════════════════════════════════════════════════════
# SpecialtyService
# ═══════════════════════════════════════════════════════════════════════════

_SPEC_CACHE = _WRITABLE_DATA / "fmh_specialties_cache.json"

# Offline fallback list of FMH specialist titles, used when neither the cache
# nor the online source is available.
_FALLBACK: List[str] = [
    "Allergologie und klinische Immunologie",
    "Allgemeine Innere Medizin",
    "Anästhesiologie",
    "Angiologie",
    "Arbeitsmedizin",
    "Chirurgie",
    "Dermatologie und Venerologie",
    "Endokrinologie-Diabetologie",
    "Gastroenterologie",
    "Gefässchirurgie",
    "Gynäkologie und Geburtshilfe",
    "Hämatologie",
    "Handchirurgie",
    "Herz- und thorakale Gefässchirurgie",
    "Infektiologie",
    "Intensivmedizin",
    "Kardiologie",
    "Kinder- und Jugendmedizin",
    "Kinder- und Jugendpsychiatrie und -psychotherapie",
    "Kinderchirurgie",
    "Klinische Pharmakologie und Toxikologie",
    "Medizinische Genetik",
    "Medizinische Onkologie",
    "Mund-, Kiefer- und Gesichtschirurgie",
    "Nephrologie",
    "Neurochirurgie",
    "Neurologie",
    "Neuropathologie",
    "Nuklearmedizin",
    "Ophthalmologie",
    "Orthopädische Chirurgie und Traumatologie des Bewegungsapparates",
    "Oto-Rhino-Laryngologie",
    "Pathologie",
    "Pharmazeutische Medizin",
    "Physikalische Medizin und Rehabilitation",
    "Plastische, Rekonstruktive und Ästhetische Chirurgie",
    "Pneumologie",
    "Prävention und Gesundheitswesen",
    "Psychiatrie und Psychotherapie",
    "Radiologie",
    "Radio-Onkologie / Strahlentherapie",
    "Rechtsmedizin",
    "Rheumatologie",
    "Thoraxchirurgie",
    "Tropen- und Reisemedizin",
    "Urologie",
    "Viszeralchirurgie",
]
class _FMHParser(HTMLParser):
    """Collects candidate specialist-title strings from <li>/<a> element text."""

    def __init__(self):
        super().__init__()
        self._in = False          # currently inside an <li>/<a> subtree
        self._d = 0               # nesting depth of li/a tags
        self._buf = ""            # accumulated text of the current subtree
        self.titles: List[str] = []

    def handle_starttag(self, tag, attrs):
        if tag in ("li", "a"):
            self._in = True
            self._d += 1

    def handle_endtag(self, tag):
        if tag in ("li", "a") and self._in:
            self._d -= 1
            if self._d <= 0:
                self._in = False
                self._d = 0
                t = self._buf.strip()
                # Keep only plausible titles: drop very short junk and raw URLs.
                if t and len(t) > 4 and not t.startswith("http"):
                    self.titles.append(t)
                self._buf = ""

    def handle_data(self, data):
        if self._in:
            self._buf += data


def _fetch_fmh() -> Optional[List[str]]:
    """Try to download the current FMH/SIWF specialist-title list.

    Returns a sorted, de-duplicated list when at least 15 plausible titles
    were found on one of the known pages, otherwise None (the caller then
    falls back to the cache or ``_FALLBACK``).
    """
    try:
        import urllib.request
        ctx = ssl.create_default_context()
        for url in [
            "https://www.siwf.ch/weiterbildung/facharzttitel-und-schwerpunkte.cfm",
            "https://www.fmh.ch/bildung-siwf/fachgebiete/facharzttitel-und-schwerpunkte.html",
        ]:
            try:
                req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
                with urllib.request.urlopen(req, timeout=12, context=ctx) as resp:
                    html = resp.read().decode("utf-8", errors="replace")
                p = _FMHParser()
                p.feed(html)
                # Keyword filter keeps medical-specialty-looking strings only.
                kw = ["medizin", "chirurgie", "logie", "iatrie", "pathie"]
                out = [
                    t for t in p.titles
                    if any(k in t.lower() for k in kw)
                    or "Ophthalmologie" in t
                    or "Anästhes" in t
                    or "Rechtsmedizin" in t
                ]
                if len(out) >= 15:
                    return sorted(set(out))
            except Exception:
                continue
    except Exception:
        pass
    return None
class SpecialtyService:
    """Provides FMH specialist titles: cached list → fallback → async refresh."""

    def __init__(self):
        self._titles: List[str] = []
        self._load()

    def _load(self):
        # Prefer the cached list when it looks complete enough (>= 10 entries).
        if _SPEC_CACHE.is_file():
            try:
                with open(_SPEC_CACHE, "r", encoding="utf-8") as f:
                    d = json.load(f)
                if isinstance(d, list) and len(d) >= 10:
                    self._titles = d
                    return
            except Exception:
                pass
        self._titles = list(_FALLBACK)

    def _save(self, titles):
        # Best-effort cache write; failures are non-fatal.
        try:
            _WRITABLE_DATA.mkdir(parents=True, exist_ok=True)
            with open(_SPEC_CACHE, "w", encoding="utf-8") as f:
                json.dump(titles, f, ensure_ascii=False, indent=2)
        except Exception:
            pass

    def get_titles(self) -> List[str]:
        """Return a copy of the current title list (never empty)."""
        return list(self._titles) if self._titles else list(_FALLBACK)

    def refresh(self, callback=None):
        """Re-fetch the title list in a daemon thread; callback(success, count)."""
        def _j():
            r = _fetch_fmh()
            if r and len(r) >= 15:
                self._titles = r
                self._save(r)
                if callback:
                    callback(True, len(r))
            elif callback:
                callback(False, 0)
        threading.Thread(target=_j, daemon=True).start()


_spec_svc = SpecialtyService()

# ═══════════════════════════════════════════════════════════════════════════
# MajorCongressRegistry
# ═══════════════════════════════════════════════════════════════════════════

_CONGRESS_JSON = _BUNDLE_DATA / "major_congresses.json"


class MajorCongressRegistry:
    """Curated major congresses per specialty, lazily loaded from bundled JSON."""

    _cache: Optional[Dict[str, List[Dict[str, str]]]] = None

    @classmethod
    def _load(cls) -> Dict[str, List[Dict[str, str]]]:
        if cls._cache is not None:
            return cls._cache
        if _CONGRESS_JSON.is_file():
            try:
                with open(_CONGRESS_JSON, "r", encoding="utf-8") as f:
                    cls._cache = json.load(f)
                return cls._cache
            except Exception:
                pass
        cls._cache = {}
        return cls._cache

    @classmethod
    def get_for(cls, selected: Set[str]) -> List[Dict[str, str]]:
        """Registry entries whose specialty key fuzzily matches a selection."""
        data = cls._load()
        out: List[Dict[str, str]] = []
        seen: Set[str] = set()
        for spec in sorted(selected):
            for key, entries in data.items():
                # Substring match in either direction tolerates partial names.
                if key.lower() in spec.lower() or spec.lower() in key.lower():
                    for e in entries:
                        nk = e["name"].lower()
                        if nk not in seen:
                            seen.add(nk)
                            out.append({**e, "specialty": key})
        return out

    @classmethod
    def names_for(cls, selected: Set[str]) -> List[str]:
        return [e["name"] for e in cls.get_for(selected)]

    @classmethod
    def as_db_events(cls, selected: Set[str]) -> List[Dict[str, Any]]:
        """Registry entries converted to the events_db.json event shape."""
        out = []
        for e in cls.get_for(selected):
            out.append({
                "title": e["name"],
                "start_date": "",
                "end_date": "",
                "location": "",
                "format": "unknown",
                "cme": "",
                "type": "congress",
                "description": e.get("org", ""),
                "url": e["url"],
                "specialties": [e.get("specialty", "")],
                "source": "registry",
                "_org": e.get("org", ""),
            })
        return out
# ═══════════════════════════════════════════════════════════════════════════
# Utility functions
# ═══════════════════════════════════════════════════════════════════════════

# Query parameters stripped during URL canonicalization (tracking noise).
_TRACKING_PARAMS = {"utm_source", "utm_medium", "utm_campaign", "utm_term",
                    "utm_content", "fbclid", "gclid", "mc_cid", "mc_eid"}


def _canonical_url(raw: str) -> str:
    """Normalize a URL for deduplication.

    Lower-cases scheme/host, strips tracking parameters, trailing slash and
    fragment.  Falls back to the lightly cleaned input on parse errors.
    """
    u = raw.strip().rstrip(".,;)>").lstrip("<")
    if u.startswith("www."):
        u = "https://" + u
    try:
        p = urlparse(u)
        qs = parse_qs(p.query, keep_blank_values=False)
        cleaned = {k: v for k, v in qs.items() if k.lower() not in _TRACKING_PARAMS}
        new_q = urlencode(cleaned, doseq=True)
        path = p.path.rstrip("/") or "/"
        return urlunparse((p.scheme.lower(), p.netloc.lower(), path,
                           p.params, new_q, ""))
    except Exception:
        return u


def _clean_url(raw: str) -> str:
    """Trim punctuation around a URL and add https:// to bare www hosts."""
    u = raw.strip().rstrip(".,;)>").lstrip("<")
    if u.startswith("www."):
        u = "https://" + u
    return u


def _valid_url(u: str) -> bool:
    """True if *u* parses as an absolute http(s) URL with a host."""
    try:
        p = urlparse(u)
        return p.scheme in ("http", "https") and bool(p.netloc)
    except Exception:
        return False


def _is_url_reachable(url: str) -> bool:
    """HEAD-probe *url*; retry with GET when the server rejects HEAD (405/403)."""
    if not url or not _valid_url(url):
        return False
    try:
        import urllib.request
        ctx = ssl.create_default_context()
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        }
        req = urllib.request.Request(url, method="HEAD", headers=headers)
        with urllib.request.urlopen(req, timeout=7, context=ctx) as resp:
            return resp.status < 400
    except Exception as e:
        err = str(e)
        if "405" in err or "403" in err:
            # Some servers disallow HEAD; fall back to a small GET read.
            try:
                import urllib.request as _ur
                req2 = _ur.Request(url, headers=headers)
                with _ur.urlopen(req2, timeout=8, context=ctx) as resp2:
                    resp2.read(1024)
                    return resp2.status < 400
            except Exception:
                return False
        return False
def _validate_search_event_url(ev: Dict[str, Any]) -> None:
    """Verify and, where possible, repair a search event's URL and dates in place.

    Clears ``ev["url"]`` when the link is unreachable or not event-specific
    (the UI then falls back to a Google search), and cross-checks the claimed
    start date against dates actually present on the linked page.
    """
    url = ev.get("url", "")
    if not url:
        return
    # More robust link check: resolve the final event URL where possible.
    try:
        from services.link_verify import verify_event_url  # type: ignore
        v = verify_event_url(url, ev.get("title", ""))
        if v.finalUrl:
            ev["url"] = v.finalUrl
            url = v.finalUrl
        if (v.httpStatus and v.httpStatus >= 400) or v.httpStatus == 0:
            ev["url"] = ""
            return
        # Registry homepages are often not event-specific: better to show a
        # Google search than a potentially wrong link.
        if ev.get("source") == "registry" and not bool(v.isDirectEventPage):
            ev["url"] = ""
            return
    except Exception:
        pass
    html = _fetch_page(url, timeout=8)
    if html is None:
        # No stably reachable content: hide the link.  The event stays
        # visible and automatically gets the Google fallback.
        ev["url"] = ""
        return
    extracted = _extract_event_from_html(html, url) or {}
    if extracted.get("location") and not ev.get("location"):
        ev["location"] = extracted.get("location", "")
    if extracted.get("format") and ev.get("format", "unknown") == "unknown":
        ev["format"] = extracted.get("format", "unknown")
    extracted_sd = extracted.get("start_date", "") or ""
    extracted_score = int(extracted.get("_date_score", -999))
    extracted_conf = extracted.get("_date_confidence", "low")
    if extracted_conf:
        ev["_date_confidence"] = extracted_conf
    sd = ev.get("start_date", "")
    if not sd:
        # No claimed date: adopt the page's date only with medium confidence.
        if extracted_sd and extracted_score >= 2:
            ev["start_date"] = extracted_sd
            if extracted.get("end_date"):
                ev["end_date"] = extracted.get("end_date", "")
        return
    if sd:
        page_dates = _DATE_RE.findall(html[:30000])
        found_on_page = False
        ev_date = _parse_date(sd)
        if ev_date:
            for group in page_dates:
                for raw in group:
                    if not raw:
                        continue
                    pd = _parse_date(raw)
                    if pd and pd == ev_date:
                        found_on_page = True
                        break
                    # Tolerate ±2 days (multi-day events, timezone slack).
                    if pd and abs((pd - ev_date).days) <= 2:
                        found_on_page = True
                        break
                if found_on_page:
                    break
            if not found_on_page:
                # Weaker check: is at least year + month name mentioned?
                year_str = str(ev_date.year)
                month_found = False
                for mname, mnum in _MONTH_MAP.items():
                    if mnum == ev_date.month and mname in html[:30000].lower():
                        month_found = True
                        break
                if not (year_str in html[:30000] and month_found):
                    # Existing date does not match the page -> only replace it
                    # when a context-strong alternative was found.
                    if extracted_sd and extracted_score >= 3:
                        ev["start_date"] = extracted_sd
                        ev["end_date"] = extracted.get("end_date", "") or ""
                    else:
                        # Do not keep showing uncertain data as a factual
                        # event date.
                        ev["start_date"] = ""
                        ev["end_date"] = ""
def _norm_title(t: str) -> str:
    """Lower-case alphanumeric skeleton of a title, for dedup keys."""
    return re.sub(r"[^a-z0-9]", "", t.lower())


# English + German month names/abbreviations -> month number.
_MONTH_MAP = {
    "january": 1, "jan": 1, "januar": 1,
    "february": 2, "feb": 2, "februar": 2,
    "march": 3, "mar": 3, "märz": 3,
    "april": 4, "apr": 4,
    "may": 5, "mai": 5,
    "june": 6, "jun": 6, "juni": 6,
    "july": 7, "jul": 7, "juli": 7,
    "august": 8, "aug": 8,
    "september": 9, "sep": 9, "sept": 9,
    "october": 10, "oct": 10, "oktober": 10,
    "november": 11, "nov": 11,
    "december": 12, "dec": 12, "dezember": 12,
}


def _parse_date(s: str) -> Optional[date]:
    """Parse many ISO/German/English date spellings; None when unparseable.

    Range expressions like "21.-23. August 2026" are reduced to their first
    day before parsing; month-only strings resolve to the 1st of the month.
    """
    if not s:
        return None
    s = s.strip()
    # Drop the second half of a day range ("21-23 August" -> "21 August").
    s = re.sub(r"(?<=\d)[\s]*(?:–|-|to|bis)[\s]*\d{1,2}\.?(?=\s+[A-Za-zÄÖÜäöü])", "", s).strip()
    for fmt in ("%Y-%m-%d", "%d.%m.%Y", "%d/%m/%Y", "%B %d, %Y", "%B %d %Y",
                "%d %B %Y", "%b %d, %Y", "%b %d %Y", "%d %b %Y"):
        try:
            return datetime.strptime(s, fmt).date()
        except Exception:
            continue
    m = re.match(r"(\d{1,2})\.?\s+(\w+)\s+(\d{4})", s)
    if m:
        mn = _MONTH_MAP.get(m.group(2).lower())
        if mn:
            try:
                return date(int(m.group(3)), mn, int(m.group(1)))
            except ValueError:
                pass
    m = re.match(r"(\w+)\s+(\d{1,2}),?\s+(\d{4})", s)
    if m:
        mn = _MONTH_MAP.get(m.group(1).lower())
        if mn:
            try:
                return date(int(m.group(3)), mn, int(m.group(2)))
            except ValueError:
                pass
    m = re.match(r"(\w+)\s+(\d{4})", s)
    if m:
        mn = _MONTH_MAP.get(m.group(1).lower())
        if mn:
            try:
                return date(int(m.group(2)), mn, 1)
            except ValueError:
                pass
    return None


def _month_label(d: date) -> str:
    """German month/year label, e.g. "August 2026"."""
    months = ["", "Januar", "Februar", "März", "April", "Mai", "Juni", "Juli",
              "August", "September", "Oktober", "November", "Dezember"]
    return f"{months[d.month]} {d.year}"


# ═══════════════════════════════════════════════════════════════════════════
# EventDatabase
# ═══════════════════════════════════════════════════════════════════════════

_DB_FILE = _CACHE / "events_db.json"


class EventDatabase:
    """Persistent local event store backed by cache/events_db.json (thread-safe)."""

    def __init__(self):
        self._events: List[Dict[str, Any]] = []
        self._last_refresh: Optional[str] = None
        self._lock = threading.Lock()
        self.load()

    def load(self) -> List[Dict[str, Any]]:
        """(Re)load events from disk; returns a copy of the event list."""
        with self._lock:
            if _DB_FILE.is_file():
                try:
                    with open(_DB_FILE, "r", encoding="utf-8") as f:
                        data = json.load(f)
                    self._events = data.get("events", [])
                    self._last_refresh = data.get("last_refresh")
                except Exception:
                    self._events = []
                    self._last_refresh = None
            return list(self._events)

    def save(self, events: Optional[List[Dict[str, Any]]] = None):
        """Persist events (optionally replacing them) and stamp last_refresh."""
        with self._lock:
            if events is not None:
                self._events = events
            self._last_refresh = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
            try:
                _CACHE.mkdir(parents=True, exist_ok=True)
                # BUGFIX: query()/parse_response attach private "_"-prefixed
                # keys holding datetime.date objects; json.dump used to raise
                # TypeError on those, which the bare except silently
                # swallowed — so the DB never persisted afterwards.  Strip
                # private keys, matching the documented schema.
                payload = {
                    "last_refresh": self._last_refresh,
                    "events": [
                        {k: v for k, v in ev.items() if not k.startswith("_")}
                        for ev in self._events
                    ],
                }
                with open(_DB_FILE, "w", encoding="utf-8") as f:
                    json.dump(payload, f, ensure_ascii=False, indent=1)
            except Exception:
                pass

    def merge_in(self, new_events: List[Dict[str, Any]]):
        """Merge new events into the store, deduplicating by URL or title|date.

        Lower source-priority numbers win (registry < seed < search).
        """
        with self._lock:
            source_priority = {"registry": 0, "seed": 1, "search": 2}
            combined = list(self._events) + new_events
            by_key: Dict[str, Dict[str, Any]] = {}
            for ev in combined:
                url = ev.get("url", "")
                canon = _canonical_url(url) if url else ""
                nt = _norm_title(ev.get("title", ""))
                sd = ev.get("start_date", "")
                key = canon if canon else f"{nt}|{sd}"
                if not key:
                    continue
                existing = by_key.get(key)
                if existing is None:
                    by_key[key] = ev
                else:
                    ep = source_priority.get(existing.get("source", "search"), 2)
                    np = source_priority.get(ev.get("source", "search"), 2)
                    if np < ep:
                        by_key[key] = ev
                    elif np == ep:
                        # NOTE(review): field back-fill only runs in the
                        # "has location" branch — preserved as-is; confirm
                        # whether it was meant to run unconditionally.
                        if ev.get("start_date") and not existing.get("start_date"):
                            by_key[key] = ev
                        elif ev.get("location") and not existing.get("location"):
                            for field in ("location", "start_date", "end_date",
                                          "cme", "format", "description"):
                                if ev.get(field) and not existing.get(field):
                                    existing[field] = ev[field]
            self._events = list(by_key.values())

    def query(self, specialties: Set[str], months: int,
              regions: List[str]) -> List[Dict[str, Any]]:
        """Events matching the specialties within [today, today + months*30d].

        Events without any specialty tag always pass the specialty filter;
        ``regions`` is currently unused (kept for interface stability).
        """
        today = date.today()
        end = today + timedelta(days=months * 30)
        out = []
        for ev in self._events:
            ev_specs = set(ev.get("specialties", []))
            if specialties and not ev_specs.intersection(specialties):
                # Fuzzy fallback: substring match in either direction.
                overlap = False
                for s in specialties:
                    for es in ev_specs:
                        if s.lower() in es.lower() or es.lower() in s.lower():
                            overlap = True
                            break
                    if overlap:
                        break
                if not overlap and ev_specs:
                    continue
            sd = _parse_date(ev.get("start_date", ""))
            if sd and sd < today:
                continue
            if sd and sd > end:
                continue
            ev["_parsed_start"] = sd
            ev["_parsed_end"] = _parse_date(ev.get("end_date", ""))
            out.append(ev)

        def _sk(e):
            # Dated events first (chronological), undated events last.
            d = e.get("_parsed_start")
            return (0, d) if d else (1, date.max)

        out.sort(key=_sk)
        return out

    def needs_refresh(self, max_age_days: int = 7) -> bool:
        """True when the DB was never refreshed or is older than max_age_days."""
        if not self._last_refresh:
            return True
        try:
            lr = datetime.strptime(self._last_refresh, "%Y-%m-%dT%H:%M:%S")
            return (datetime.utcnow() - lr).days >= max_age_days
        except Exception:
            return True

    def last_refresh_time(self) -> str:
        """Human-readable last refresh timestamp ("nie" when never refreshed)."""
        if not self._last_refresh:
            return "nie"
        try:
            lr = datetime.strptime(self._last_refresh, "%Y-%m-%dT%H:%M:%S")
            return lr.strftime("%d.%m.%Y %H:%M")
        except Exception:
            return self._last_refresh or "nie"

    def count(self) -> int:
        return len(self._events)
_event_db = EventDatabase()

# ═══════════════════════════════════════════════════════════════════════════
# SeedCrawler
# ═══════════════════════════════════════════════════════════════════════════

_SEED_JSON = _BUNDLE_DATA / "seed_sources.json"

# Links whose URL contains one of these words are treated as event pages.
_EVENT_KEYWORDS = re.compile(
    r"event|congress|meeting|conference|course|cme|"
    r"fortbildung|kongress|symposium|workshop|webinar|"
    r"annual|session|hands.on|masterclass",
    re.IGNORECASE,
)

# English + German month names for the date regex below.
_MONTH_NAMES = (
    r"January|February|March|April|May|June|July|August|September|October|November|December"
    r"|Jan|Feb|Mar|Apr|Jun|Jul|Aug|Sep|Sept|Oct|Nov|Dec"
    r"|Januar|Februar|März|Mai|Juni|Juli|August|Oktober|Dezember"
)

# Alternatives (one capture group each): numeric d.m.y, numeric y-m-d,
# "Month d, y", day-range + month + year, "d. Month y", "Month y".
_DATE_RE = re.compile(
    r"(\d{1,2}[./\-]\d{1,2}[./\-]\d{2,4})"
    r"|(\d{4}[./\-]\d{1,2}[./\-]\d{1,2})"
    rf"|((?:{_MONTH_NAMES})\s+\d{{1,2}},?\s+\d{{4}})"
    rf"|(\d{{1,2}}[\.\s]*(?:–|-|to|bis)\s*\d{{1,2}}\.?\s+(?:{_MONTH_NAMES})\s+\d{{4}})"
    rf"|(\d{{1,2}}\.?\s+(?:{_MONTH_NAMES})\s+\d{{4}})"
    rf"|((?:{_MONTH_NAMES})\s+\d{{4}})",
    re.IGNORECASE,
)

_CME_RE = re.compile(r"(CME|EACCME|ECMEC|CPD|AMA\s*PRA|credit)[:\s]*(\d+)?", re.IGNORECASE)
_ONLINE_RE = re.compile(r"\b(online|virtual|webinar|digital)\b", re.IGNORECASE)
_HYBRID_RE = re.compile(r"\b(hybrid)\b", re.IGNORECASE)

# "Venue:"-style labels followed by a short free-text location.
_LOC_RE = re.compile(
    r"(?:Location|Venue|Ort|Where|Place|Lieu|City|Tagungsort|Veranstaltungsort|"
    r"Kongressort|Convention\s*Center|Conference\s*Venue|Austragungsort)"
    r"[:\s]+([^\n<]{5,120})",
    re.IGNORECASE,
)

# Known congress cities as a location fallback when no labeled venue is found.
_CITY_RE = re.compile(
    r"\b(Zürich|Zurich|Bern|Basel|Genf|Geneva|Genève|Lausanne|Luzern|Lugano|St\.\s*Gallen|"
    r"Wien|Vienna|Berlin|München|Munich|Hamburg|Frankfurt|Köln|Düsseldorf|Stuttgart|"
    r"Paris|Lyon|Marseille|London|Manchester|Birmingham|Edinburgh|"
    r"Amsterdam|Rotterdam|Den\s*Haag|Brüssel|Brussels|Bruxelles|"
    r"Rom|Rome|Roma|Mailand|Milan|Milano|Florenz|Florence|Bologna|"
    r"Madrid|Barcelona|Sevilla|Lissabon|Lisbon|Lisboa|"
    r"Prag|Prague|Praha|Budapest|Warschau|Warsaw|Warszawa|Krakau|Krakow|"
    r"Kopenhagen|Copenhagen|Stockholm|Oslo|Helsinki|"
    r"New\s*York|Chicago|Boston|San\s*Francisco|Los\s*Angeles|Philadelphia|"
    r"Washington|Houston|Dallas|Atlanta|Miami|Seattle|Denver|"
    r"Toronto|Montréal|Montreal|Vancouver|"
    r"Dubai|Abu\s*Dhabi|Singapur|Singapore|Tokio|Tokyo|Seoul|Sydney|Melbourne|"
    r"Kapstadt|Cape\s*Town|Johannesburg|São\s*Paulo|Buenos\s*Aires|"
    r"Athen|Athens|Istanbul|Bukarest|Bucharest|Dublin|Reykjavik|"
    r"Innsbruck|Graz|Salzburg|Davos|Interlaken|Montreux)\b",
    re.IGNORECASE,
)

# Context words that make a nearby date MORE likely to be the event date …
_DATE_POSITIVE_CTX = (
    "congress", "kongress", "meeting", "conference", "symposium",
    "workshop", "course", "cme", "fortbildung", "weiterbildung",
    "annual", "jahreskongress", "event", "veranstaltung",
)

# … and words that make it LESS likely (deadlines, footer noise, etc.).
_DATE_NEGATIVE_CTX = (
    "deadline", "abstract", "submission", "registr", "early bird",
    "newsletter", "published", "posted", "last updated", "copyright",
    "sponsor", "exhibit", "accommodation", "hotel", "application",
    "anmeldung bis", "einsendeschluss",
)
def _html_to_text(snippet: str) -> str:
    """Strip tags and collapse whitespace from an HTML snippet."""
    txt = re.sub(r"<[^>]+>", " ", snippet or "")
    txt = re.sub(r"\s+", " ", txt).strip()
    return txt


def _title_tokens_for_date_match(title: str) -> List[str]:
    """Significant lowercase tokens (>= 5 chars) of *title*, max 8."""
    toks = [t for t in re.split(r"[^a-z0-9äöüß]+", (title or "").lower()) if len(t) >= 5]
    # Drop frequent function words so real event terms carry more weight.
    stop = {"congress", "kongress", "annual", "meeting", "conference", "society"}
    return [t for t in toks if t not in stop][:8]


def _extract_title_year(title: str) -> Optional[int]:
    """Return a 20xx year mentioned in *title*, or None."""
    m = re.search(r"\b(20\d{2})\b", title or "")
    if not m:
        return None
    try:
        return int(m.group(1))
    except Exception:
        return None


def _score_date_candidate(raw_html: str, match_start: int, match_end: int,
                          dt: date, title: str) -> int:
    """Heuristically score how likely a date found in *raw_html* is the event date.

    NOTE(review): the tail of this function was lost in a formatting
    accident in the source; everything after the context extraction below is
    a reconstruction based on the helpers above, the context-keyword tuples
    (_DATE_POSITIVE_CTX / _DATE_NEGATIVE_CTX) and the score thresholds used
    by the callers (>= 2 medium, >= 3 strong, >= 6 high confidence).
    Confirm against version-control history.
    """
    left = max(0, match_start - 260)
    right = min(len(raw_html), match_end + 260)
    raw_ctx = raw_html[left:right]
    ctx = _html_to_text(raw_ctx).lower()
    score = 0
    # Approximate "prominently displayed": date near heading/<time> markup.
    low_raw = raw_ctx.lower()
    if "<h1" in low_raw or "<h2" in low_raw or "<time" in low_raw:
        score += 2
    if any(k in ctx for k in _DATE_POSITIVE_CTX):
        score += 2
    if any(k in ctx for k in _DATE_NEGATIVE_CTX):
        score -= 3
    toks = _title_tokens_for_date_match(title)
    if toks and any(t in ctx for t in toks):
        score += 2
    ty = _extract_title_year(title)
    if ty is not None:
        score += 2 if ty == dt.year else -2
    return score
def _pick_best_date_from_html(raw_html: str, title: str) -> Tuple[Optional[date], int]:
    """Return the best-scoring (date, score) pair found in *raw_html*.

    NOTE(review): the original signature line was lost in a formatting
    accident; it was reconstructed from the call site
    (``_pick_best_date_from_html(html, title)``) and the visible return
    type/body — confirm against version-control history.
    """
    best_date: Optional[date] = None
    best_score = -999
    for m in _DATE_RE.finditer(raw_html[:120000]):
        raw = ""
        for g in m.groups():
            if g:
                raw = g
                break
        if not raw:
            continue
        pd = _parse_date(raw)
        if not pd:
            continue
        # Ignore extreme outliers (archive pages, far-future typos).
        if pd.year < 2020 or pd.year > (date.today().year + 5):
            continue
        sc = _score_date_candidate(raw_html, m.start(), m.end(), pd, title)
        # Higher score wins; on a tie prefer the earlier date.
        if sc > best_score or (sc == best_score and best_date is not None and pd < best_date):
            best_score = sc
            best_date = pd
    return best_date, best_score


class _LinkExtractor(HTMLParser):
    """Collects absolute links, the <title> text and <h1>/<h2> headings."""

    def __init__(self, base_url: str):
        super().__init__()
        self._base = base_url
        self.links: List[str] = []
        self._in_title = False
        self._in_h = False
        self._title = ""
        self._h_text = ""
        self.page_title = ""
        self.headings: List[str] = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            for k, v in attrs:
                if k == "href" and v:
                    full = urljoin(self._base, v)
                    self.links.append(full)
        if tag == "title":
            self._in_title = True
            self._title = ""
        if tag in ("h1", "h2"):
            self._in_h = True
            self._h_text = ""

    def handle_endtag(self, tag):
        if tag == "title" and self._in_title:
            self._in_title = False
            self.page_title = self._title.strip()
        if tag in ("h1", "h2") and self._in_h:
            self._in_h = False
            t = self._h_text.strip()
            if t and len(t) > 3:
                self.headings.append(t)

    def handle_data(self, data):
        if self._in_title:
            self._title += data
        if self._in_h:
            self._h_text += data


def _fetch_page(url: str, timeout: int = 11) -> Optional[str]:
    """Download up to 500 kB of an HTML/text page; None on any failure."""
    try:
        import urllib.request
        ctx = ssl.create_default_context()
        req = urllib.request.Request(url, headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Accept": "text/html,application/xhtml+xml",
        })
        with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp:
            ct = resp.headers.get("Content-Type", "")
            if "html" not in ct.lower() and "text" not in ct.lower():
                return None
            return resp.read(500_000).decode("utf-8", errors="replace")
    except Exception:
        return None
# NOTE(review): the closing-tag half of this pattern was lost in a formatting
# accident; reconstructed as the obvious "strip whole <style>/<script>
# elements" regex — confirm against version-control history.
_CSS_JS_RE = re.compile(
    r"<\s*(?:style|script)[^>]*>.*?</\s*(?:style|script)\s*>",
    re.DOTALL | re.IGNORECASE,
)
# Fragments like "{color: red" betray CSS leaking into extracted text.
_CSS_FRAG_RE = re.compile(r"[{};]\s*\w[\w-]*\s*:\s*\w")


def _is_css_junk(text: str) -> bool:
    """True when *text* looks like CSS rather than human-readable content."""
    return bool(_CSS_FRAG_RE.search(text)) or text.strip().startswith(".")


def _extract_event_from_html(html: str, url: str) -> Optional[Dict[str, Any]]:
    """Best-effort extraction of an event record from a raw HTML page.

    Returns None when no plausible title can be found; otherwise a dict in
    the events_db.json shape plus private "_date_score"/"_date_confidence"
    diagnostics.
    """
    html = _CSS_JS_RE.sub(" ", html)
    parser = _LinkExtractor(url)
    try:
        parser.feed(html)
    except Exception:
        pass
    # Prefer the first heading over <title> (usually less boilerplate).
    title = parser.page_title
    if parser.headings:
        title = parser.headings[0]
    if not title or len(title) < 4:
        return None
    if _is_css_junk(title):
        title = parser.page_title or ""
    if not title or len(title) < 4 or _is_css_junk(title):
        return None
    title = re.sub(r"\s*[\|–—-]\s*$", "", title).strip()
    title = re.sub(r"\s+", " ", title)
    if len(title) > 150:
        title = title[:147] + "…"
    best_date, best_score = _pick_best_date_from_html(html, title)
    start_date = best_date.isoformat() if best_date and best_score >= 2 else ""
    cme = ""
    cme_evidence = ""
    cme_match = _CME_RE.search(html[:20000])
    if cme_match:
        # Keep a short plain-text context as evidence for the CME claim.
        start = max(0, cme_match.start() - 60)
        end_ctx = min(len(html), cme_match.end() + 60)
        context = re.sub(r"<[^>]+>", " ", html[start:end_ctx]).strip()
        context = re.sub(r"\s+", " ", context)[:120]
        cme = cme_match.group(0).strip()
        cme_evidence = context
    fmt = "unknown"
    if _HYBRID_RE.search(html[:10000]):
        fmt = "hybrid"
    elif _ONLINE_RE.search(html[:10000]):
        fmt = "online"
    elif start_date:
        fmt = "onsite"
    loc = ""
    loc_m = _LOC_RE.search(html[:30000])
    if loc_m:
        loc = loc_m.group(1).strip()
        loc = re.sub(r"<[^>]+>", "", loc).strip()
        loc = re.sub(r"\s+", " ", loc)
        if len(loc) > 80:
            loc = loc[:77] + "…"
    if not loc:
        # Fallback: first known congress city mentioned in the page text.
        text_chunk = re.sub(r"<[^>]+>", " ", html[:20000])
        city_m = _CITY_RE.search(text_chunk)
        if city_m:
            loc = city_m.group(1).strip()
    return {
        "title": title,
        "start_date": start_date,
        "end_date": "",
        "location": loc,
        "format": fmt,
        "cme": cme,
        "cme_evidence": cme_evidence,
        "type": "unknown",
        "description": "",
        "url": url,
        "_date_score": best_score,
        "_date_confidence": "high" if best_score >= 6 else ("medium" if best_score >= 2 else "low"),
    }
class SeedCrawler:
    """Crawls configured seed pages for event-looking links and extracts events."""

    @staticmethod
    def _load_seeds() -> Dict[str, List[Dict[str, str]]]:
        """Load seed sources (specialty -> list of {name, url}) from bundled JSON."""
        if _SEED_JSON.is_file():
            try:
                with open(_SEED_JSON, "r", encoding="utf-8") as f:
                    return json.load(f)
            except Exception:
                pass
        return {}

    @staticmethod
    def crawl_for_specialties(specialties: Set[str], progress_cb=None) -> List[Dict[str, Any]]:
        """Crawl seeds matching *specialties*; progress_cb(done, total, name).

        Follows at most 10 event-keyword links per seed page and returns
        extracted events tagged with source="seed".
        """
        seeds = SeedCrawler._load_seeds()
        relevant_seeds: List[Tuple[str, Dict[str, str]]] = []
        for spec in specialties:
            for key, entries in seeds.items():
                if key.lower() in spec.lower() or spec.lower() in key.lower():
                    for e in entries:
                        relevant_seeds.append((key, e))
        events: List[Dict[str, Any]] = []
        seen_urls: Set[str] = set()
        total = len(relevant_seeds)
        for idx, (specialty, seed) in enumerate(relevant_seeds):
            if progress_cb:
                progress_cb(idx + 1, total, seed.get("name", ""))
            seed_url = seed.get("url", "")
            if not seed_url or not _valid_url(seed_url):
                continue
            html = _fetch_page(seed_url)
            if not html:
                continue
            parser = _LinkExtractor(seed_url)
            try:
                parser.feed(html)
            except Exception:
                continue
            event_links = []
            for link in parser.links:
                if _canonical_url(link) in seen_urls:
                    continue
                if _EVENT_KEYWORDS.search(link):
                    event_links.append(link)
            for link in event_links[:10]:
                canon = _canonical_url(link)
                if canon in seen_urls:
                    continue
                seen_urls.add(canon)
                page_html = _fetch_page(link)
                if not page_html:
                    continue
                ev = _extract_event_from_html(page_html, link)
                if ev and ev.get("title"):
                    ev["specialties"] = [specialty]
                    ev["source"] = "seed"
                    events.append(ev)
        return events


# ═══════════════════════════════════════════════════════════════════════════
# CongressSearchEngine
# ═══════════════════════════════════════════════════════════════════════════

def _validate_cme_batch(events: List[Dict[str, Any]]) -> None:
    """Drop CME claims that look hallucinated, mutating *events* in place.

    A CME value is cleared when (a) the same credit number appears across a
    suspiciously large share of the batch (copy/paste hallucination), or
    (b) no supporting ``cme_evidence`` text accompanies it.
    """
    freq: Dict[str, int] = {}
    for ev in events:
        c = str(ev.get("cme", "")).strip()
        ev["cme"] = c
        if c:
            # Key by the bare digits so "CME 12" and "12 credits" collide.
            num = re.sub(r"\D", "", c)
            key = num if num else c
            freq[key] = freq.get(key, 0) + 1
    threshold = max(2, int(len(events) * 0.15))
    suspicious = {v for v, cnt in freq.items() if cnt >= threshold}
    for ev in events:
        c = ev.get("cme", "")
        evidence = ev.get("cme_evidence", "")
        if not c:
            continue
        num = re.sub(r"\D", "", c)
        key = num if num else c
        if key in suspicious:
            ev["cme"] = ""
            ev["cme_evidence"] = ""
            continue
        if not evidence:
            ev["cme"] = ""
            continue
class CongressSearchEngine:
    """Builds GPT web-search prompts, parses JSON-lines replies, merges results."""

    @staticmethod
    def build_prompt(specs: List[str], months: int, regions: List[str],
                     major_names: List[str]) -> Tuple[str, str]:
        """Return (system, user) prompt strings for the GPT web search."""
        spec_text = ", ".join(specs)
        today_s = date.today().isoformat()
        end_s = (date.today() + timedelta(days=months * 30)).isoformat()
        region_text = ", ".join(regions) if regions else "CH, EU"
        neighbor_hint = ""
        if "CH" in regions:
            # Swiss users should also see congresses in neighbouring countries.
            neighbor_hint = (
                "WICHTIG – Der Benutzer ist in der Schweiz. Du MUSST gezielt auch in diesen "
                "Nachbarländern und Städten nach Kongressen suchen:\n"
                " - DEUTSCHLAND: Berlin, München, Hamburg, Frankfurt, Düsseldorf, Köln, Stuttgart, Leipzig, Dresden\n"
                " - ÖSTERREICH: Wien, Salzburg, Innsbruck, Graz\n"
                " - FRANKREICH: Paris, Lyon, Strasbourg, Marseille, Nizza\n"
                " - ITALIEN: Mailand/Milano, Rom/Roma, Bologna, Florenz, Turin\n"
                " - SCHWEIZ: Zürich, Bern, Basel, Genf, Lausanne, Luzern, St. Gallen, Davos\n"
                "Suche aktiv nach Kongressen in diesen Städten! Mindestens 5 Events aus Nachbarländern.\n"
            )
        major_block = "\n".join(f" - {n}" for n in major_names) if major_names else "(keine)"
        system = (
            "Du bist ein weltweiter medizinischer Kongresskalender-Assistent. "
            "Du suchst im Internet und gibst Ergebnisse als JSON-Zeilen zurück. "
            "JEDE Zeile ist ein eigenständiges JSON-Objekt. KEIN Array-Wrapper. "
            "KEIN Markdown. KEIN erklärender Text. NUR JSON-Zeilen."
        )
        user = (
            f"Suche im Internet nach medizinischen Kongressen, Symposien, Workshops, "
            f"Masterclasses, CME-Kursen, Fortbildungen, Weiterbildungen, Hands-on-Trainings "
            f"und Webinaren WELTWEIT für: {spec_text}.\n"
            f"Zeitraum: {today_s} bis {end_s}.\n"
            f"Regionen: {region_text}.\n"
            f"{neighbor_hint}\n"
            f"PFLICHT – suche zwingend Datum, Ort und URL für:\n{major_block}\n\n"
            f"Ergänze weitere Events: Workshops, CME-Kurse, Masterclasses, Hands-on, "
            f"Fortbildungen, Webinare.\n"
            f"Suche auch auf DEUTSCH: \"Kongress {spec_text}\", \"Fortbildung {specs[0] if specs else ''}\", "
            f"\"Jahrestagung {specs[0] if specs else ''}\", \"Symposium {specs[0] if specs else ''}\".\n"
            f"Suche auch auf FRANZÖSISCH: \"congrès\", \"formation continue\".\n"
            f"Suche auch auf ITALIENISCH: \"congresso\", \"formazione\".\n\n"
            f"Antworte AUSSCHLIESSLICH mit JSON-Zeilen. Eine Zeile pro Event:\n"
            f'{{"title":"...","start_date":"YYYY-MM-DD","end_date":"YYYY-MM-DD",'
            f'"location":"...","format":"onsite|online|hybrid|unknown",'
            f'"cme":"...","cme_evidence":"...",'
            f'"type":"congress|symposium|workshop|course|webinar|unknown",'
            f'"description":"...","url":"https://...","specialties":["{specs[0] if specs else ""}"]}}\n\n'
            f"Regeln:\n"
            f"- URL: EXTREM WICHTIG – jede URL muss die ECHTE offizielle Event-Seite sein.\n"
            f" Öffne die URL gedanklich und prüfe, ob sie zum genannten Event passt.\n"
            f" KEINE erfundenen URLs! KEINE generischen Homepages wenn es eine Event-Unterseite gibt.\n"
            f" Wenn du dir bei einer URL nicht sicher bist: url=\"\" (lieber leer als falsch).\n"
            f"- DATUM + URL müssen zusammenpassen: Das Datum in start_date/end_date MUSS\n"
            f" das Datum sein, das auf der verlinkten Webseite steht.\n"
            f" NIEMALS ein Datum erfinden oder raten!\n"
            f"- URLs immer mit https://.\n"
            f"- CME/Credits: EXTREM WICHTIG – folge diesen Regeln strikt:\n"
            f" 1) Setze cme nur dann, wenn du auf der offiziellen Event-Webseite eine EXPLIZITE Angabe findest\n"
            f" wie \"CME: 12 Credits\", \"EACCME: 18\", \"AMA PRA Category 1: 25\", \"CPD: 8 points\".\n"
            f" 2) Kopiere den exakten Originaltext in cme_evidence (z.B. \"EACCME®: 18 European CME credits\").\n"
            f" 3) Wenn die Event-Seite KEINE explizite CME/Credit-Angabe enthält: cme=\"\" und cme_evidence=\"\".\n"
            f" 4) NIEMALS eine CME-Zahl schätzen, raten oder erfinden.\n"
            f" 5) Im Zweifelsfall: cme=\"\" und cme_evidence=\"\". Lieber weglassen als falsch.\n"
            f"- Wenn Format unbekannt: \"unknown\".\n"
            f"- DATUM: EXTREM WICHTIG – folge diesen Regeln strikt:\n"
            f" 1) Das Datum MUSS direkt von der offiziellen Event-Webseite stammen.\n"
            f" 2) Öffne die Event-URL und lies das Datum von dort ab.\n"
            f" 3) NIEMALS ein Datum schätzen, raten oder aus dem Gedächtnis nehmen!\n"
            f" 4) Wenn auf der Webseite z.B. \"August 21-23, 2026\" steht, dann start_date=\"2026-08-21\".\n"
            f" 5) Wenn du das Datum auf der Webseite NICHT findest: start_date=\"\" und end_date=\"\".\n"
            f" 6) Lieber kein Datum als ein falsches Datum!\n"
            f"- ORT: Suche IMMER gezielt nach dem Veranstaltungsort (Stadt, Land).\n"
            f" Gib location als \"Stadt, Land\" an (z.B. \"Wien, Österreich\", \"Barcelona, Spain\").\n"
            f" Bei Online-Events: \"Online\". Nur wenn unklar: leerer String.\n"
            f"- Mindestens 30 Events finden, davon mindestens 5 in CH/DE/AT/FR/IT.\n"
            f"- specialties: Array mit passenden Fachrichtungen aus: {spec_text}\n"
            f"- KEIN Markdown, KEIN Text ausserhalb der JSON-Zeilen."
        )
        return system, user

    @staticmethod
    def parse_response(raw: str, fallback_specs: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Parse GPT JSON-lines output into event dicts; invalid lines are skipped."""
        events: List[Dict[str, Any]] = []
        for line in raw.split("\n"):
            line = line.strip()
            if not line or not line.startswith("{"):
                continue
            try:
                obj = json.loads(line)
                if isinstance(obj, dict) and "title" in obj:
                    url = _clean_url(obj.get("url", ""))
                    if not _valid_url(url):
                        continue
                    obj["url"] = url
                    obj["start_date"] = obj.get("start_date", "") or ""
                    obj["end_date"] = obj.get("end_date", "") or ""
                    obj.setdefault("source", "search")
                    obj.setdefault("cme_evidence", "")
                    if not obj.get("specialties") and fallback_specs:
                        obj["specialties"] = fallback_specs
                    obj["_parsed_start"] = _parse_date(obj["start_date"])
                    obj["_parsed_end"] = _parse_date(obj["end_date"])
                    events.append(obj)
            except (json.JSONDecodeError, ValueError):
                continue
        return events

    @staticmethod
    def merge_all(registry_events: List[Dict[str, Any]],
                  db_events: List[Dict[str, Any]],
                  search_events: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Merge events from all three sources, dedup, drop past events, sort.

        Registry entries beat seed entries beat search entries for the same
        key; lower-priority duplicates only back-fill missing fields.
        """
        source_priority = {"registry": 0, "seed": 1, "search": 2}
        by_key: Dict[str, Dict[str, Any]] = {}
        for ev_list in [registry_events, db_events, search_events]:
            for ev in ev_list:
                url = ev.get("url", "")
                canon = _canonical_url(url) if url else ""
                nt = _norm_title(ev.get("title", ""))
                sd = ev.get("start_date", "")
                key = canon if canon else f"{nt}|{sd}"
                if not key:
                    continue
                existing = by_key.get(key)
                if existing is None:
                    by_key[key] = dict(ev)
                else:
                    ep = source_priority.get(existing.get("source", "search"), 2)
                    np = source_priority.get(ev.get("source", "search"), 2)
                    if np < ep:
                        by_key[key] = dict(ev)
                    else:
                        for field in ("start_date", "end_date", "location",
                                      "cme", "format", "description", "type"):
                            if ev.get(field) and not existing.get(field):
                                existing[field] = ev[field]
        merged = list(by_key.values())
        for ev in merged:
            ev["_parsed_start"] = _parse_date(ev.get("start_date", ""))
            ev["_parsed_end"] = _parse_date(ev.get("end_date", ""))
        today = date.today()
        # Keep undated events and everything from today onwards.
        merged = [ev for ev in merged
                  if not ev.get("_parsed_start") or ev["_parsed_start"] >= today]

        def _sk(e):
            d = e.get("_parsed_start")
            return (0, d) if d else (1, date.max)

        merged.sort(key=_sk)
        return merged
for ev in merged if not ev.get("_parsed_start") or ev["_parsed_start"] >= today] def _sk(e): d = e.get("_parsed_start") return (0, d) if d else (1, date.max) merged.sort(key=_sk) return merged # ═══════════════════════════════════════════════════════════════════════════ # CongressWindow # ═══════════════════════════════════════════════════════════════════════════ class CongressWindow: _BG = "#f7fafc" _HDR_BG = "#e3ecf4" _HDR_FG = "#1e4060" _CARD_BG = "#ffffff" _TEXT_FG = "#23404f" _LINK_FG = "#186aa5" _SEP_FG = "#dce6f0" _TYPE_LABELS = { "congress": "Kongress", "symposium": "Symposium", "workshop": "Workshop", "course": "Kurs / CME", "webinar": "Webinar", "unknown": "", } _FORMAT_LABELS = { "onsite": "", "online": "Online", "hybrid": "Hybrid", "unknown": "", } def __init__(self, parent, openai_client, autotext_data: dict, save_fn): self._parent = parent self._client = openai_client self._data = autotext_data self._save_fn = save_fn self._link_cnt = 0 self._font_size = 9 self._search_running = False self._search_start = 0.0 self._inline_msg = "" self._inline_var = tk.StringVar() existing = getattr(parent, "_kongress_window", None) if existing is not None: try: if existing.winfo_exists(): existing.deiconify() existing.lift() existing.focus_force() return except Exception: pass saved_specs = self._data.get("kongress_specialties") if isinstance(saved_specs, list) and saved_specs: self._selected: Set[str] = set(saved_specs) else: self._selected = {"Dermatologie und Venerologie"} self._months: int = self._data.get("kongress_months_ahead", 12) self._regions: List[str] = list( self._data.get("kongress_regions", ["CH", "EU", "Weltweit"]) ) win = tk.Toplevel(parent) self._win = win parent._kongress_window = win win.title("Kongresse & Weiterbildungen") win.configure(bg=self._BG) win.minsize(600, 520) try: sw = max(1200, int(parent.winfo_screenwidth())) sh = max(800, int(parent.winfo_screenheight())) w, h = max(620, int(sw * 0.36)), max(640, int(sh * 0.88)) 
win.geometry(f"{w}x{h}+8+40") except Exception: pass self._build_header() self._build_text_area() self._build_status_bar() self._run_pipeline() # ── Header ────────────────────────────────────────────────────────── def _build_header(self): hdr = tk.Frame(self._win, bg=self._HDR_BG, padx=8, pady=5) hdr.pack(fill="x") tk.Label(hdr, text="Kongresse & Weiterbildungen", bg=self._HDR_BG, fg=self._HDR_FG, font=("Segoe UI", 10, "bold")).pack(side="left") right = tk.Frame(hdr, bg=self._HDR_BG) right.pack(side="right") rf = tk.Frame(right, bg=self._HDR_BG) rf.pack(side="left", padx=(0, 6)) self._rv = {} for r in ["CH", "EU", "Weltweit"]: v = tk.BooleanVar(value=(r in self._regions)) self._rv[r] = v ttk.Checkbutton(rf, text=r, variable=v, command=self._on_regions).pack(side="left", padx=1) self._mv = tk.StringVar(value=f"{self._months} Monate") cb = ttk.Combobox(right, textvariable=self._mv, width=10, values=["6 Monate", "12 Monate", "24 Monate"], state="readonly") cb.pack(side="left", padx=(0, 4)) cb.bind("<>", self._on_months) ttk.Button(right, text="Fachrichtungen…", command=self._open_spec_dlg).pack(side="left", padx=2) ttk.Button(right, text="Suche starten", command=self._run_pipeline).pack(side="left", padx=2) ttk.Button(right, text="DB aktualisieren", command=self._force_refresh).pack(side="left", padx=2) self._db_label = tk.Label(right, text="", bg=self._HDR_BG, fg="#5a7a8c", font=("Segoe UI", 7)) self._db_label.pack(side="left", padx=(6, 0)) self._update_db_label() zf = tk.Frame(right, bg=self._HDR_BG) zf.pack(side="left", padx=(8, 0)) tk.Label(zf, text="▲", bg=self._HDR_BG, fg="#3060a0", font=("Segoe UI", 7, "bold"), cursor="hand2").pack(side="top", pady=0) tk.Label(zf, text="▼", bg=self._HDR_BG, fg="#3060a0", font=("Segoe UI", 7, "bold"), cursor="hand2").pack(side="top", pady=0) for child in zf.winfo_children(): if child.cget("text") == "▲": child.bind("", lambda e: self._change_font_size(1)) else: child.bind("", lambda e: self._change_font_size(-1)) def 
_update_db_label(self): t = _event_db.last_refresh_time() n = _event_db.count() self._db_label.configure(text=f"DB: {t} · {n} Events") def _on_months(self, _e=None): try: self._months = int(self._mv.get().split()[0]) except Exception: self._months = 12 self._data["kongress_months_ahead"] = self._months self._persist() def _on_regions(self): self._regions = [r for r, v in self._rv.items() if v.get()] if not self._regions: self._regions = ["CH", "EU", "Weltweit"] for v in self._rv.values(): v.set(True) self._data["kongress_regions"] = self._regions self._persist() # ── Specialty dialog ──────────────────────────────────────────────── def _open_spec_dlg(self): dlg = tk.Toplevel(self._win) dlg.title("Fachrichtungen auswählen") dlg.configure(bg="#f7fafc") dlg.geometry("380x580") dlg.transient(self._win) dlg.grab_set() self._center_on_screen(dlg, 380, 580) top = tk.Frame(dlg, bg="#f7fafc") top.pack(fill="x", padx=8, pady=(8, 4)) tk.Label(top, text="FMH/SIWF Facharzttitel", bg="#f7fafc", fg="#1e4060", font=("Segoe UI", 9, "bold")).pack(side="left") sv = tk.StringVar() ttk.Entry(top, textvariable=sv, width=22).pack(side="right", padx=(6, 0)) tk.Label(top, text="Filter:", bg="#f7fafc", fg="#555", font=("Segoe UI", 8)).pack(side="right") lf = tk.Frame(dlg, bg="#fff") lf.pack(fill="both", expand=True, padx=8, pady=4) cvs = tk.Canvas(lf, bg="#fff", highlightthickness=0) sb = ttk.Scrollbar(lf, orient="vertical", command=cvs.yview) inner = tk.Frame(cvs, bg="#fff") inner.bind("", lambda e: cvs.configure(scrollregion=cvs.bbox("all"))) cvs.create_window((0, 0), window=inner, anchor="nw") cvs.configure(yscrollcommand=sb.set) sb.pack(side="right", fill="y") cvs.pack(side="left", fill="both", expand=True) titles = _spec_svc.get_titles() cvar: dict[str, tk.BooleanVar] = {} cwid: list[tuple[str, ttk.Checkbutton]] = [] for t in titles: v = tk.BooleanVar(value=(t in self._selected)) cvar[t] = v c = ttk.Checkbutton(inner, text=t, variable=v) c.pack(anchor="w", padx=6) cwid.append((t, c)) 
def _filt(*_): q = sv.get().lower().strip() for t, c in cwid: if q and q not in t.lower(): c.pack_forget() else: c.pack(anchor="w", padx=6) sv.trace_add("write", _filt) def _mw(e): cvs.yview_scroll(int(-1 * (e.delta / 120)), "units") cvs.bind_all("", _mw) bf = tk.Frame(dlg, bg="#f7fafc") bf.pack(fill="x", padx=8, pady=6) ttk.Button(bf, text="Alle", command=lambda: [v.set(True) for v in cvar.values()]).pack(side="left", padx=2) ttk.Button(bf, text="Keine", command=lambda: [v.set(False) for v in cvar.values()]).pack(side="left", padx=2) rl = tk.Label(bf, text="", bg="#f7fafc", fg="#555", font=("Segoe UI", 8)) rl.pack(side="left", padx=8) def _rd(ok, n): self._win.after(0, lambda: rl.configure( text=f"{n} Titel geladen" if ok else "Fehlgeschlagen")) ttk.Button(bf, text="Liste aktualisieren", command=lambda: (rl.configure(text="Lade…"), _spec_svc.refresh(callback=_rd))).pack(side="left", padx=2) def _apply(): cvs.unbind_all("") ch = {k for k, v in cvar.items() if v.get()} if not ch: self._show_select_specialty_hint(dlg) return self._selected = ch self._data["kongress_specialties"] = sorted(self._selected) self._persist() dlg.destroy() def _apply_search(): _apply() self._run_pipeline() def _close(): cvs.unbind_all("") dlg.destroy() ttk.Button(bf, text="Übernehmen", command=_apply).pack(side="right", padx=2) ttk.Button(bf, text="Neue Suche starten", command=_apply_search).pack(side="right", padx=2) dlg.protocol("WM_DELETE_WINDOW", _close) def _center_on_screen(self, win: tk.Toplevel, width: int, height: int): try: sw = max(800, int(win.winfo_screenwidth())) sh = max(600, int(win.winfo_screenheight())) x = max(0, (sw - width) // 2) y = max(0, (sh - height) // 2) win.geometry(f"{width}x{height}+{x}+{y}") except Exception: pass def _show_select_specialty_hint(self, parent: tk.Toplevel): hint = tk.Toplevel(parent) hint.title("Fachrichtung wählen") hint.transient(parent) hint.grab_set() hint.configure(bg="#f7fafc") self._center_on_screen(hint, 420, 150) body = tk.Frame(hint, 
bg="#f7fafc", padx=16, pady=14) body.pack(fill="both", expand=True) tk.Label( body, text="Bitte mindestens eine Fachrichtung auswählen,\n" "damit die Kongresssuche gezielt bleibt und keine unnötigen Tokens verbraucht.", bg="#f7fafc", fg="#1e4060", justify="left", anchor="w", font=("Segoe UI", 9), ).pack(fill="x", pady=(0, 12)) ttk.Button(body, text="OK", command=hint.destroy).pack(anchor="e") # ── Text area ─────────────────────────────────────────────────────── def _build_text_area(self): f = tk.Frame(self._win, bg=self._CARD_BG, bd=0) f.pack(fill="both", expand=True, padx=6, pady=(2, 4)) self._major_wrap = tk.Frame(f, bg=self._CARD_BG, bd=0) self._major_wrap.pack(fill="x", side="top", pady=(0, 4)) self._major_wrap.pack_propagate(False) self._major_text = tk.Text( self._major_wrap, wrap="word", font=("Segoe UI", 9), bg=self._CARD_BG, fg=self._TEXT_FG, relief="flat", padx=10, pady=8, cursor="arrow", spacing1=1, spacing3=1 ) sb_major = ttk.Scrollbar(self._major_wrap, orient="vertical", command=self._major_text.yview) self._major_text.configure(yscrollcommand=sb_major.set) sb_major.pack(side="right", fill="y") self._major_text.pack(side="left", fill="both", expand=True) self._major_text.configure(state="disabled") self._main_wrap = tk.Frame(f, bg=self._CARD_BG, bd=0) self._main_wrap.pack(fill="both", expand=True, side="top") self._text = tk.Text( self._main_wrap, wrap="word", font=("Segoe UI", 9), bg=self._CARD_BG, fg=self._TEXT_FG, relief="flat", padx=10, pady=8, cursor="arrow", spacing1=1, spacing3=1 ) sb = ttk.Scrollbar(self._main_wrap, orient="vertical", command=self._text.yview) self._text.configure(yscrollcommand=sb.set) sb.pack(side="right", fill="y") self._text.pack(side="left", fill="both", expand=True) self._text.configure(state="disabled") self._configure_text_tags(self._major_text) self._configure_text_tags(self._text) self._update_major_panel_height() self._win.bind("", self._on_window_resize_for_major_panel, add="+") def _configure_text_tags(self, 
widget: tk.Text): widget.tag_configure("month_hdr", font=("Segoe UI", 10, "bold"), foreground="#0e3350", spacing1=10, spacing3=4, background="#e0ecf5") widget.tag_configure("section", font=("Segoe UI", 9, "bold"), foreground="#2060a0", spacing1=6, spacing3=2) widget.tag_configure("title", font=("Segoe UI", 9, "bold"), foreground="#0e3350") widget.tag_configure("normal", font=("Segoe UI", 9), foreground="#2b4a5c") widget.tag_configure("meta", font=("Segoe UI", 8), foreground="#5a7a8c") widget.tag_configure("type_tag", font=("Segoe UI", 7, "bold"), foreground="#ffffff", background="#5090c0") widget.tag_configure("loading", font=("Segoe UI", 9), foreground="#6a9ab0") widget.tag_configure("warn", font=("Segoe UI", 9), foreground="#b07020") widget.tag_configure("sep", font=("Segoe UI", 2), foreground=self._SEP_FG) def _on_window_resize_for_major_panel(self, _event=None): self._update_major_panel_height() def _update_major_panel_height(self): try: h = int(self._win.winfo_height()) max_h = max(180, min(520, h // 2)) self._major_wrap.configure(height=max_h) except Exception: pass def _build_status_bar(self): self._status = tk.StringVar(value="") tk.Label(self._win, textvariable=self._status, bg=self._HDR_BG, fg="#4a7a8c", font=("Segoe UI", 8), anchor="w", padx=8).pack(fill="x", side="bottom") def _change_font_size(self, delta: int): new = max(6, min(18, self._font_size + delta)) if new == self._font_size: return self._font_size = new self._apply_font_tags() def _apply_font_tags(self): s = self._font_size for widget in (self._major_text, self._text): widget.configure(font=("Segoe UI", s)) widget.tag_configure("month_hdr", font=("Segoe UI", s + 1, "bold")) widget.tag_configure("section", font=("Segoe UI", s, "bold")) widget.tag_configure("title", font=("Segoe UI", s, "bold")) widget.tag_configure("normal", font=("Segoe UI", s)) widget.tag_configure("meta", font=("Segoe UI", s - 1)) widget.tag_configure("type_tag", font=("Segoe UI", s - 2, "bold")) 
widget.tag_configure("loading", font=("Segoe UI", s)) widget.tag_configure("warn", font=("Segoe UI", s)) for i in range(1, self._link_cnt + 1): widget.tag_configure(f"cl_{i}", font=("Segoe UI", s - 1, "underline")) # ── Render helpers ────────────────────────────────────────────────── def _open_url(self, url): try: webbrowser.open(url) except Exception: pass def _insert_link(self, text_widget: tk.Text, url: str, label: Optional[str] = None): self._link_cnt += 1 tag = f"cl_{self._link_cnt}" text_widget.tag_configure(tag, font=("Segoe UI", 8, "underline"), foreground=self._LINK_FG) text_widget.tag_bind(tag, "", lambda e, u=url: self._open_url(u)) text_widget.tag_bind(tag, "", lambda e, w=text_widget: w.configure(cursor="hand2")) text_widget.tag_bind(tag, "", lambda e, w=text_widget: w.configure(cursor="arrow")) text_widget.tag_bind(tag, "", lambda e, u=url, w=text_widget: self._show_link_menu(w, e, u)) text_widget.insert("end", label or url, tag) def _show_link_menu(self, text_widget: tk.Text, event, url: str): m = tk.Menu(text_widget, tearoff=0, font=("Segoe UI", 9)) m.add_command(label="Link kopieren", command=lambda: self._copy(url)) m.add_command(label="Im Browser öffnen", command=lambda: self._open_url(url)) try: m.tk_popup(event.x_root, event.y_root) finally: m.grab_release() def _insert_btn(self, text_widget: tk.Text, label, bg, fg, cmd): btn = tk.Label(text_widget, text=f" {label} ", bg=bg, fg=fg, font=("Segoe UI", 7, "bold"), cursor="hand2", relief="flat", padx=3) btn.bind("", lambda e: cmd()) text_widget.window_create("end", window=btn, padx=2) def _copy(self, text): try: self._win.clipboard_clear() self._win.clipboard_append(text) except Exception: pass def _format_date_range(self, ev: Dict) -> str: sd = ev.get("_parsed_start") ed = ev.get("_parsed_end") if sd and ed and sd != ed: return f"{sd.strftime('%d.%m.%Y')} – {ed.strftime('%d.%m.%Y')}" if sd: return sd.strftime("%d.%m.%Y") raw_s = ev.get("start_date", "") raw_e = ev.get("end_date", "") if raw_s: 
            return f"{raw_s} – {raw_e}" if raw_e and raw_e != raw_s else raw_s
        return ""

    def _render_card(self, text_widget: tk.Text, ev: Dict):
        """Render one event as a card: type badge, title, meta line,
        CME evidence, description, and either a link or a Google-search
        fallback button when no URL is known."""
        url = ev.get("url", "")
        title = ev.get("title", "Unbekannt")
        typ = ev.get("type", "unknown")
        fmt = ev.get("format", "unknown")
        cme = ev.get("cme", "")
        loc = ev.get("location", "")
        desc = ev.get("description", "")
        org = ev.get("_org", "")
        # scrub fields that look like scraped CSS/markup junk
        if _is_css_junk(loc):
            loc = ""
        if _is_css_junk(desc):
            desc = ""
        if _is_css_junk(org):
            org = ""
        if _is_css_junk(title):
            title = "Unbekannt"
        type_label = self._TYPE_LABELS.get(typ, "")
        fmt_label = self._FORMAT_LABELS.get(fmt, "")
        date_str = self._format_date_range(ev)
        text_widget.insert("end", " ")
        if type_label:
            text_widget.insert("end", f" {type_label} ", "type_tag")
            text_widget.insert("end", " ")
        text_widget.insert("end", title, "title")
        text_widget.insert("end", "\n")
        meta = []
        if date_str:
            # annotate the date with its verification confidence
            conf = str(ev.get("_date_confidence", "") or "").lower().strip()
            if conf == "high":
                date_note = "Datum: verifiziert"
            elif conf == "medium":
                date_note = "Datum: bitte prüfen"
            else:
                date_note = "Datum: unsicher"
            meta.append(f"{date_str} · {date_note}")
        if loc:
            meta.append(loc)
        if fmt_label:
            meta.append(fmt_label)
        if cme:
            meta.append(f"CME: {cme}")
        if meta:
            text_widget.insert("end", f" {' · '.join(meta)}\n", "meta")
        cme_ev = ev.get("cme_evidence", "")
        # only show evidence when a CME value is actually displayed
        if cme_ev and cme:
            text_widget.insert("end", f" CME Hinweis: {cme_ev}\n", "meta")
        if org:
            text_widget.insert("end", f" {org}\n", "meta")
        if desc:
            text_widget.insert("end", f" {desc}\n", "normal")
        if url:
            text_widget.insert("end", " ")
            self._insert_link(text_widget, url)
            text_widget.insert("end", "\n\n")
        else:
            # no URL known → offer a pre-filled Google search instead
            search_q = title
            if loc:
                search_q += f" {loc}"
            if date_str:
                search_q += f" {date_str}"
            from urllib.parse import quote_plus as _qp
            google_url = "https://www.google.com/search?q=" + _qp(search_q)
            text_widget.insert("end", " ")
            self._insert_btn(text_widget, "Im Google suchen", "#f0e8d8",
                             "#6a5020", lambda u=google_url: self._open_url(u))
            text_widget.insert("end", "\n\n")

    def _render_events(self, text_widget: tk.Text, events: List[Dict]):
        """Render *events* grouped under month headers; undated events are
        collected under a single 'Weiteres' header at the end."""
        current_month = ""
        for ev in events:
            sd = ev.get("_parsed_start")
            if sd:
                ml = _month_label(sd)
                if ml != current_month:
                    current_month = ml
                    text_widget.insert("end", f" {ml}\n", "month_hdr")
                    text_widget.insert("end", "\n")
            elif current_month != "__unknown__":
                current_month = "__unknown__"
                text_widget.insert("end", " Weiteres\n", "month_hdr")
                text_widget.insert("end", "\n")
            self._render_card(text_widget, ev)

    # ── Pipeline ────────────────────────────────────────────────────────
    def _persist(self):
        """Best-effort save of the settings dict via the injected callback."""
        try:
            self._save_fn(self._data)
        except Exception:
            pass

    def _force_refresh(self):
        self._run_pipeline(force_refresh=True)

    def _query_events_with_fallback(
        self, specialties: Set[str], months: int, regions: List[str]
    ) -> tuple[List[Dict[str, Any]], str]:
        """Query the local event DB, progressively relaxing filters when the
        strict query yields too few (< 20) results.

        Returns:
            ``(events, note)`` where *note* is a German UI hint describing
            which filters were relaxed, or ``""`` for the strict result.
        """
        primary = _event_db.query(specialties, months, regions)
        if len(primary) >= 20:
            return primary, ""
        # 1st relaxation: drop the specialty filter
        relaxed_specs = _event_db.query(set(), months, regions)
        if len(relaxed_specs) > len(primary):
            return relaxed_specs, "Erweiterte Anzeige: alle Fachrichtungen."
        # 2nd relaxation: additionally widen regions and horizon to 24 months
        relaxed_months = max(24, int(months))
        relaxed_all = _event_db.query(set(), relaxed_months,
                                      ["CH", "EU", "Weltweit"])
        if len(relaxed_all) > len(primary):
            return relaxed_all, "Erweiterte Anzeige: alle Fachrichtungen, Regionen und 24 Monate."
        return primary, ""

    def _set_inline_status(self, text: str):
        """Update the inline progress label with *text* and elapsed seconds."""
        self._inline_msg = text
        elapsed = int(time.time() - getattr(self, "_search_start", time.time()))
        self._inline_var.set(f" {text} ({elapsed} Sek.)")

    def _tick_inline_timer(self):
        """Once-a-second timer that refreshes the elapsed-time display while
        a search is running; stops itself when the search flag clears."""
        if not getattr(self, "_search_running", False):
            return
        elapsed = int(time.time() - self._search_start)
        msg = getattr(self, "_inline_msg", "")
        full = f" {msg} ({elapsed} Sek.)" if msg else f" ({elapsed} Sek.)"
        self._inline_var.set(full)
        self._status.set(f"{msg} ({elapsed} Sek.)" if msg else f"({elapsed} Sek.)")
        self._win.after(1000, self._tick_inline_timer)

    def _run_pipeline(self, force_refresh: bool = False):
        """Main entry point: render cached data immediately, then (if the DB
        is stale or *force_refresh*) run seed crawl + GPT web search in a
        background thread and re-render when done."""
        self._major_text.configure(state="normal")
        self._major_text.delete("1.0", "end")
        self._text.configure(state="normal")
        self._text.delete("1.0", "end")
        self._link_cnt = 0
        # curated major congresses always render first, in their own pane
        registry_events = MajorCongressRegistry.as_db_events(self._selected)
        if registry_events:
            self._major_text.insert("end", " Wichtige Hauptkongresse\n", "month_hdr")
            self._major_text.insert("end", "\n")
            cur_spec = ""
            for ev in registry_events:
                specs = ev.get("specialties", [])
                sp = specs[0] if specs else ""
                if sp != cur_spec:
                    cur_spec = sp
                    self._major_text.insert("end", f" {sp}\n", "section")
                self._render_card(self._major_text, ev)
            self._major_text.insert("end", "─" * 50 + "\n", "sep")
            self._major_text.insert("end", "\n")
            self._major_wrap.pack(fill="x", side="top", pady=(0, 4))
        else:
            self._major_wrap.pack_forget()
        cached, fallback_note = self._query_events_with_fallback(
            self._selected, self._months, self._regions
        )
        _validate_cme_batch(cached)
        if cached:
            self._text.insert("end", f" Gespeicherte Events ({len(cached)})\n",
                              "month_hdr")
            self._text.insert("end", "\n")
            self._render_events(self._text, cached)
            self._text.insert("end", "─" * 50 + "\n", "sep")
            self._text.insert("end", "\n")
        if fallback_note:
            self._text.insert("end", f" {fallback_note}\n", "meta")
            self._text.insert("end", "\n")
        do_refresh = force_refresh or _event_db.needs_refresh()
        if do_refresh:
            self._text.insert("end", " Aktualisiere Datenbank …\n", "loading")
            # embedded label so the inline progress text can update live
            self._inline_var = tk.StringVar(value=" ")
            lbl = tk.Label(self._text, textvariable=self._inline_var,
                           bg=self._CARD_BG, fg="#6a9ab0",
                           font=("Segoe UI", 9), anchor="w")
            self._text.window_create("end", window=lbl)
            self._text.insert("end", "\n")
        else:
            self._text.insert("end",
                              f" Datenbank aktuell (Stand: {_event_db.last_refresh_time()})\n",
                              "meta")
        self._major_text.configure(state="disabled")
        self._text.configure(state="disabled")
        self._update_db_label()
        specs = sorted(self._selected) if self._selected else ["Dermatologie und Venerologie"]
        self._status.set(f"{'Aktualisiere' if do_refresh else 'Geladen'} · {', '.join(specs)}")
        if not do_refresh:
            return
        self._search_start = time.time()
        self._search_running = True
        self._tick_inline_timer()

        def _bg_job():
            # runs on a daemon thread; all UI access goes through after(0, …)
            all_new: List[Dict[str, Any]] = []

            def _update_inline(txt):
                self._win.after(0, lambda: self._set_inline_status(txt))

            def _progress(i, t, name):
                msg = f"Seed-Crawling {i}/{t}: {name}"
                self._win.after(0, lambda: self._status.set(msg))
                _update_inline(msg)

            # stage 1: lightweight seed crawling (best effort)
            try:
                seed_events = SeedCrawler.crawl_for_specialties(
                    self._selected, progress_cb=_progress)
                all_new.extend(seed_events)
                msg = f"Seed: {len(seed_events)} Events · starte Websuche …"
                self._win.after(0, lambda: self._status.set(msg))
                _update_inline(msg)
            except Exception:
                pass
            # stage 2: GPT web search (best effort, only with a client)
            try:
                if self._client:
                    major_names = MajorCongressRegistry.names_for(self._selected)
                    system, user = CongressSearchEngine.build_prompt(
                        specs, self._months, self._regions, major_names)
                    model = os.getenv(
                        "CONGRESS_SEARCH_MODEL",
                        os.getenv("NEWS_SEARCH_MODEL", "gpt-4o-mini-search-preview")
                    ).strip()
                    resp = self._client.chat.completions.create(
                        model=model,
                        messages=[
                            {"role": "system", "content": system},
                            {"role": "user", "content": user},
                        ],
                    )
                    raw = (resp.choices[0].message.content or "").strip()
                    search_events = CongressSearchEngine.parse_response(raw, specs)
                    all_new.extend(search_events)
            except Exception:
                pass
            _validate_cme_batch(all_new)
            _update_inline("Link-Prüfung …")
            # stage 3: verify URLs of non-registry events (capped at 180)
            checkable = [e for e in all_new
                         if e.get("source") != "registry" and e.get("url")]
            check_count = 0
            for ev in checkable:
                if check_count >= 180:
                    break
                check_count += 1
                msg = f"Link-Prüfung {check_count}/{min(len(checkable), 180)} …"
                self._win.after(0, lambda m=msg: self._status.set(m))
                _update_inline(msg)
                _validate_search_event_url(ev)
            _event_db.merge_in(all_new)
            _event_db.save()
            self._search_running = False
            final_cached, _ = self._query_events_with_fallback(
                self._selected, self._months, self._regions
            )
            _validate_cme_batch(final_cached)
            final = CongressSearchEngine.merge_all(registry_events, final_cached, [])
            # Verify links before display where possible: never remove
            # events, only switch broken links to the Google fallback.
            registry_first = [e for e in final
                              if e.get("source") == "registry" and e.get("url")]
            others = [e for e in final
                      if e.get("source") != "registry" and e.get("url")]
            checked = 0
            for ev in registry_first + others:
                if checked >= 220:
                    break
                checked += 1
                _validate_search_event_url(ev)
            # hand the final list back to the UI thread
            self._win.after(0, lambda: self._show_final(final))

        threading.Thread(target=_bg_job, daemon=True).start()

    def _show_final(self, events: List[Dict]):
        """Re-render both panes with the final merged event list after the
        background refresh has completed."""
        self._major_text.configure(state="normal")
        self._major_text.delete("1.0", "end")
        self._text.configure(state="normal")
        self._text.delete("1.0", "end")
        self._link_cnt = 0
        self._update_db_label()
        registry_events = MajorCongressRegistry.as_db_events(self._selected)
        if registry_events:
            self._major_text.insert("end", " Wichtige Hauptkongresse\n", "month_hdr")
            self._major_text.insert("end", "\n")
            cur_spec = ""
            for ev in registry_events:
                specs = ev.get("specialties", [])
                sp = specs[0] if specs else ""
                if sp != cur_spec:
                    cur_spec = sp
                    self._major_text.insert("end", f" {sp}\n", "section")
                self._render_card(self._major_text, ev)
            self._major_text.insert("end", "─" * 50 + "\n", "sep")
            self._major_text.insert("end", "\n")
            self._major_wrap.pack(fill="x", side="top", pady=(0, 4))
        else:
            self._major_wrap.pack_forget()
        non_registry = [e for e in events if e.get("source") != "registry"]
        if not non_registry and not registry_events:
            self._text.insert("end", " Keine Events gefunden.\n", "warn")
            self._major_text.configure(state="disabled")
            self._text.configure(state="disabled")
            self._status.set("Keine Events")
            return
        if non_registry:
            self._text.insert("end",
                              f" Kongresse & Weiterbildungen ({len(non_registry)})\n",
                              "month_hdr")
            self._text.insert("end", "\n")
            self._render_events(self._text, non_registry)
        self._major_text.configure(state="disabled")
        self._text.configure(state="disabled")
        total = len(registry_events) + len(non_registry)
        self._status.set(
            f"Fertig · {total} Events · DB: {_event_db.last_refresh_time()}")