"""Aggregator for open-access medical news and congress events.""" from __future__ import annotations import json import re import threading import urllib.parse import urllib.request import xml.etree.ElementTree as ET from dataclasses import asdict, dataclass, field from datetime import date, datetime, timedelta, timezone from email.utils import parsedate_to_datetime from pathlib import Path from typing import Any NEWS_CACHE_TTL_SECONDS = 30 * 60 EVENTS_CACHE_TTL_SECONDS = 45 * 60 EVENT_URL_CHECK_TTL_SECONDS = 12 * 60 * 60 @dataclass(frozen=True) class NewsFilter: specialties: list[str] regions: list[str] language: str = "de" sort: str = "newest" limit: int = 30 @dataclass(frozen=True) class EventFilter: specialties: list[str] regions: list[str] from_date: date to_date: date sort: str = "soonest" limit: int = 100 @dataclass(frozen=True) class NewsItem: id: str source: str title: str url: str publishedAt: str tags: list[str] languageOriginal: str isOpenAccess: bool evidenceType: str summaryOriginal: str summaryTranslated: str | None = None regions: list[str] = field(default_factory=list) @dataclass(frozen=True) class EventItem: id: str name: str startDate: str endDate: str city: str country: str regions: list[str] tags: list[str] url: str description: str type: str cmeFlag: bool = False organizer: str = "" source: str = "" icsUrl: str | None = None @dataclass(frozen=True) class FeedSource: name: str url: str regions: list[str] language: str default_tags: list[str] evidence_type: str NEWS_SOURCES: tuple[FeedSource, ...] = ( FeedSource("WHO", "https://www.who.int/feeds/entity/news-room/releases/en/rss.xml", ["WORLD"], "en", ["public-health"], "official"), FeedSource("ECDC", "https://www.ecdc.europa.eu/en/rss.xml", ["EU"], "en", ["public-health"], "official"), FeedSource("CDC", "https://tools.cdc.gov/api/v2/resources/media/132608.rss", ["WORLD"], "en", ["public-health"], "official"), FeedSource("EMA", "https://www.ema.europa.eu/en/news-events/news/rss.xml", ["EU"], "en", ["drug-safety"], "official"), FeedSource("Swissmedic", "https://www.swissmedic.ch/swissmedic/en/home/news/rss-feed/_jcr_content/contentPar/rssfeed.rss", ["CH"], "en", ["drug-safety"], "official"), FeedSource("Cochrane", "https://www.cochrane.org/news/rss.xml", ["WORLD"], "en", ["evidence-based-medicine"], "review"), FeedSource("medRxiv", "https://connect.medrxiv.org/relate/feed/medrxiv.xml", ["WORLD"], "en", ["preprint", "dermatology"], "preprint"), FeedSource("bioRxiv", "https://connect.biorxiv.org/relate/feed/biorxiv.xml", ["WORLD"], "en", ["preprint", "research"], "preprint"), ) DEFAULT_SPECIALTY = "dermatology" DEFAULT_NEWS_REGIONS = ["CH", "EU"] DEFAULT_EVENT_REGIONS = ["CH", "EU"] _news_cache_lock = threading.Lock() _events_cache_lock = threading.Lock() _event_url_cache_lock = threading.Lock() _news_cache: dict[str, Any] = {"payload": [], "expires_at": 0.0} _events_cache: dict[str, Any] = {"payload": [], "expires_at": 0.0, "seed_mtime": 0.0} _event_url_status_cache: dict[str, tuple[bool, float]] = {} def _now_ts() -> float: return datetime.now(timezone.utc).timestamp() def _clean_text(text: str) -> str: cleaned = re.sub(r"<[^>]+>", " ", text or "") cleaned = re.sub(r"\s+", " ", cleaned).strip() return cleaned def _safe_summary(text: str) -> str: s = _clean_text(text) if len(s) > 520: return s[:517].rstrip() + "..." return s def _parse_datetime(raw: str) -> datetime: if not raw: return datetime.now(timezone.utc) try: dt = parsedate_to_datetime(raw) return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc) except Exception: pass try: dt2 = datetime.fromisoformat(raw.replace("Z", "+00:00")) return dt2 if dt2.tzinfo else dt2.replace(tzinfo=timezone.utc) except Exception: return datetime.now(timezone.utc) def _read_url(url: str, timeout: int = 12) -> bytes: req = urllib.request.Request(url, headers={"User-Agent": "AZA-News-Aggregator/1.0"}) with urllib.request.urlopen(req, timeout=timeout) as resp: return resp.read() def _is_live_event_url(url: str) -> bool: u = (url or "").strip() if not u: return False now = _now_ts() with _event_url_cache_lock: cached = _event_url_status_cache.get(u) if cached and cached[1] > now: return bool(cached[0]) ok = False try: req = urllib.request.Request( u, headers={"User-Agent": "AZA-News-Aggregator/1.0"}, method="HEAD", ) with urllib.request.urlopen(req, timeout=10) as resp: ok = int(getattr(resp, "status", 200)) < 400 except Exception: try: req = urllib.request.Request( u, headers={"User-Agent": "AZA-News-Aggregator/1.0"}, method="GET", ) with urllib.request.urlopen(req, timeout=12) as resp: ok = int(getattr(resp, "status", 200)) < 400 except Exception: ok = False with _event_url_cache_lock: _event_url_status_cache[u] = (ok, now + EVENT_URL_CHECK_TTL_SECONDS) return ok def _rss_items(source: FeedSource, limit: int = 25) -> list[NewsItem]: try: payload = _read_url(source.url) root = ET.fromstring(payload) except Exception: return [] channel = root.find("channel") if channel is not None: entries = channel.findall("item") else: entries = root.findall(".//{http://www.w3.org/2005/Atom}entry") items: list[NewsItem] = [] for idx, node in enumerate(entries): if idx >= limit: break title = _clean_text((node.findtext("title") or node.findtext("{http://www.w3.org/2005/Atom}title") or "Ohne Titel")) link = _clean_text(node.findtext("link") or "") if not link: atom_link = node.find("{http://www.w3.org/2005/Atom}link") if atom_link is not None: link = _clean_text(atom_link.attrib.get("href") or source.url) summary = node.findtext("description") or node.findtext("{http://www.w3.org/2005/Atom}summary") or "" pub_raw = node.findtext("pubDate") or node.findtext("{http://www.w3.org/2005/Atom}updated") or "" published_at = _parse_datetime(pub_raw).isoformat() items.append( NewsItem( id=f"{source.name.lower()}-{abs(hash((title, link, published_at)))}", source=source.name, title=title or "Ohne Titel", url=link or source.url, publishedAt=published_at, tags=list(source.default_tags), languageOriginal=source.language, isOpenAccess=True, evidenceType=source.evidence_type, summaryOriginal=_safe_summary(summary) or "Kurz-Zusammenfassung in der Quelle nicht verfügbar.", regions=list(source.regions), ) ) return items def _pubmed_open_access_news(limit: int = 12) -> list[NewsItem]: term = '(dermatology[Title/Abstract]) AND ("open access"[Filter])' query = urllib.parse.urlencode( { "db": "pubmed", "retmode": "json", "retmax": str(limit), "sort": "pub+date", "term": term, } ) search_url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?{query}" try: ids_payload = json.loads(_read_url(search_url).decode("utf-8", errors="ignore")) ids = ids_payload.get("esearchresult", {}).get("idlist", []) except Exception: return [] if not ids: return [] summary_url = ( "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi?" + urllib.parse.urlencode({"db": "pubmed", "retmode": "json", "id": ",".join(ids)}) ) try: summary_payload = json.loads(_read_url(summary_url).decode("utf-8", errors="ignore")) except Exception: return [] out: list[NewsItem] = [] for pmid in ids: rec = summary_payload.get("result", {}).get(pmid) or {} title = _clean_text(str(rec.get("title") or "")) if not title: continue pubdate = _clean_text(str(rec.get("pubdate") or "")) dt = _parse_datetime(pubdate) url = f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/" out.append( NewsItem( id=f"pubmed-{pmid}", source="PubMed", title=title, url=url, publishedAt=dt.isoformat(), tags=["dermatology", "open-access"], languageOriginal="en", isOpenAccess=True, evidenceType="peer-reviewed", summaryOriginal="Open-Access Eintrag aus PubMed. Volltext/Abstract je nach Journal frei verfügbar.", regions=["WORLD"], ) ) return out def _normalize_tokens(values: list[str] | None, default: list[str]) -> list[str]: out = [str(v).strip().lower() for v in (values or []) if str(v).strip()] return out or list(default) def _match_regions(item_regions: list[str], selected_regions: list[str]) -> bool: if not selected_regions: return True selected = {r.lower() for r in selected_regions} if "world" in selected or "worldwide" in selected: return True item_norm = {r.lower() for r in item_regions} # Strikter Regionsabgleich: EU zeigt nur EU/CH-Events, nicht WORLD/US/CA. if "eu" in selected and ("eu" in item_norm or "ch" in item_norm): return True if "ch" in selected and "ch" in item_norm: return True return bool(item_norm.intersection(selected)) def _match_specialties(item_tags: list[str], selected_specialties: list[str]) -> bool: if "all" in selected_specialties: return True tags = {t.lower() for t in item_tags} selected = set(selected_specialties) if "dermatology" in selected: selected.update({"skin"}) if "infectiology" in selected: selected.update({"public-health", "infectiology"}) if "general-medicine" in selected: selected.update({"public-health", "evidence-based-medicine", "internal-medicine"}) if "internal-medicine" in selected: selected.update({"general-medicine", "internal-medicine"}) return bool(tags.intersection(selected)) def _translate_summary_stub(summary: str, target_language: str, source_language: str) -> str | None: target = (target_language or "").strip().lower() source = (source_language or "").strip().lower() if not target or target in {"system", "auto", source}: return None return f"[Übersetzung nicht konfiguriert: {source}->{target}] {summary}" def get_news(filters: NewsFilter) -> list[NewsItem]: now = _now_ts() with _news_cache_lock: if float(_news_cache["expires_at"]) > now: all_items = list(_news_cache["payload"]) else: fetched: list[NewsItem] = [] for src in NEWS_SOURCES: fetched.extend(_rss_items(src, limit=24)) fetched.extend(_pubmed_open_access_news(limit=16)) fetched.sort(key=lambda x: x.publishedAt, reverse=True) _news_cache["payload"] = fetched _news_cache["expires_at"] = now + NEWS_CACHE_TTL_SECONDS all_items = fetched specialties = _normalize_tokens(filters.specialties, [DEFAULT_SPECIALTY]) regions = _normalize_tokens(filters.regions, DEFAULT_NEWS_REGIONS) filtered = [item for item in all_items if _match_specialties(item.tags, specialties) and _match_regions(item.regions, regions)] if filters.sort == "oldest": filtered.sort(key=lambda x: x.publishedAt) else: filtered.sort(key=lambda x: x.publishedAt, reverse=True) out: list[NewsItem] = [] for item in filtered[: max(1, min(filters.limit, 120))]: out.append( NewsItem( id=item.id, source=item.source, title=item.title, url=item.url, publishedAt=item.publishedAt, tags=item.tags, languageOriginal=item.languageOriginal, isOpenAccess=item.isOpenAccess, evidenceType=item.evidenceType, summaryOriginal=item.summaryOriginal, summaryTranslated=_translate_summary_stub(item.summaryOriginal, filters.language, item.languageOriginal), regions=item.regions, ) ) return out def _seed_events_path() -> Path: return Path(__file__).resolve().parent / "news_events_seed.json" def _load_seed_events() -> list[EventItem]: try: with open(_seed_events_path(), "r", encoding="utf-8") as f: payload = json.load(f) except Exception: return [] rows = payload.get("events") if isinstance(payload, dict) else None if not isinstance(rows, list): return [] out: list[EventItem] = [] for row in rows: if not isinstance(row, dict): continue try: out.append( EventItem( id=str(row["id"]), name=str(row["name"]), startDate=str(row["startDate"]), endDate=str(row["endDate"]), city=str(row.get("city") or ""), country=str(row.get("country") or ""), regions=[str(r).upper() for r in row.get("regions", []) if str(r).strip()], tags=[str(t).lower() for t in row.get("tags", []) if str(t).strip()], url=str(row.get("url") or ""), description=str(row.get("description") or ""), type=str(row.get("type") or "kongress"), cmeFlag=bool(row.get("cmeFlag", False)), organizer=str(row.get("organizer") or ""), source=str(row.get("source") or ""), icsUrl=(str(row.get("icsUrl")).strip() if row.get("icsUrl") else None), ) ) except Exception: continue return out def get_events(filters: EventFilter) -> list[EventItem]: now = _now_ts() try: seed_mtime = _seed_events_path().stat().st_mtime except Exception: seed_mtime = 0.0 with _events_cache_lock: cache_mtime = float(_events_cache.get("seed_mtime", 0.0)) if float(_events_cache["expires_at"]) > now and cache_mtime == seed_mtime: source_items = list(_events_cache["payload"]) else: source_items = _load_seed_events() _events_cache["payload"] = source_items _events_cache["expires_at"] = now + EVENTS_CACHE_TTL_SECONDS _events_cache["seed_mtime"] = seed_mtime specialties = _normalize_tokens(filters.specialties, [DEFAULT_SPECIALTY]) regions = _normalize_tokens(filters.regions, DEFAULT_EVENT_REGIONS) out: list[EventItem] = [] for item in source_items: try: start = date.fromisoformat(item.startDate) end = date.fromisoformat(item.endDate) except Exception: continue if end < filters.from_date or start > filters.to_date: continue if not _match_specialties(item.tags, specialties): continue if not _match_regions(item.regions, regions): continue if not _is_live_event_url(item.url): continue out.append(item) if filters.sort == "latest": out.sort(key=lambda x: x.startDate, reverse=True) else: out.sort(key=lambda x: x.startDate) return out[: max(1, min(filters.limit, 300))] # Backward-compatible wrappers used by backend_main.py def get_news_items(specialties: list[str] | None, lang: str = "de", region: str = "CH", limit: int = 30) -> list[dict[str, Any]]: region_values = [r.strip() for r in str(region or "CH").split(",") if r.strip()] rows = get_news(NewsFilter(specialties=specialties or [DEFAULT_SPECIALTY], regions=region_values, language=lang, limit=limit)) return [asdict(x) for x in rows] def get_event_items( specialties: list[str] | None, regions: list[str] | None, from_date: date | None, to_date: date | None, limit: int = 100, ) -> list[dict[str, Any]]: rows = get_events( EventFilter( specialties=specialties or [DEFAULT_SPECIALTY], regions=regions or list(DEFAULT_EVENT_REGIONS), from_date=from_date or date.today(), to_date=to_date or (date.today() + timedelta(days=396)), limit=limit, ) ) return [asdict(x) for x in rows]