"""Aggregator for open-access medical news and congress events."""
from __future__ import annotations

import hashlib
import json
import re
import threading
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
from dataclasses import asdict, dataclass, field, replace
from datetime import date, datetime, timedelta, timezone
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import Any
# Cache lifetimes (seconds) for the module-level caches below.
NEWS_CACHE_TTL_SECONDS = 30 * 60  # aggregated news feed cache: 30 minutes
EVENTS_CACHE_TTL_SECONDS = 45 * 60  # events seed cache: 45 minutes
EVENT_URL_CHECK_TTL_SECONDS = 12 * 60 * 60  # per-URL liveness probe result: 12 hours
@dataclass(frozen=True)
class NewsFilter:
    """User-selected filter set for the news endpoint."""

    specialties: list[str]  # specialty tokens, e.g. ["dermatology"]; normalized downstream
    regions: list[str]  # region codes, e.g. ["CH", "EU"]
    language: str = "de"  # target language handed to the translation stub
    sort: str = "newest"  # "oldest" sorts ascending; anything else sorts newest-first
    limit: int = 30  # clamped to [1, 120] in get_news
@dataclass(frozen=True)
class EventFilter:
    """User-selected filter set for the events endpoint."""

    specialties: list[str]  # specialty tokens; normalized downstream
    regions: list[str]  # region codes, e.g. ["CH", "EU"]
    from_date: date  # inclusive start of the date window
    to_date: date  # inclusive end of the date window
    sort: str = "soonest"  # "latest" sorts descending by start date; default ascending
    limit: int = 100  # clamped to [1, 300] in get_events
@dataclass(frozen=True)
class NewsItem:
    """One aggregated news entry (camelCase fields mirror the JSON payload)."""

    id: str  # source-prefixed identifier
    source: str  # feed/source name, e.g. "WHO" or "PubMed"
    title: str
    url: str
    publishedAt: str  # ISO-8601 timestamp string
    tags: list[str]
    languageOriginal: str  # language code of the source feed
    isOpenAccess: bool
    evidenceType: str  # e.g. "official", "review", "preprint", "peer-reviewed"
    summaryOriginal: str
    summaryTranslated: str | None = None  # filled by the translation stub when applicable
    regions: list[str] = field(default_factory=list)  # region codes for filtering
@dataclass(frozen=True)
class EventItem:
    """One congress/event entry (camelCase fields mirror the JSON payload)."""

    id: str
    name: str
    startDate: str  # ISO date (YYYY-MM-DD); parsed with date.fromisoformat
    endDate: str  # ISO date (YYYY-MM-DD), inclusive
    city: str
    country: str
    regions: list[str]  # upper-cased region codes (see _load_seed_events)
    tags: list[str]  # lower-cased specialty tags (see _load_seed_events)
    url: str  # event homepage; probed for liveness before listing
    description: str
    type: str  # seed default is "kongress"
    cmeFlag: bool = False  # presumably marks CME-accredited events — confirm with seed data
    organizer: str = ""
    source: str = ""
    icsUrl: str | None = None  # optional ICS calendar link
@dataclass(frozen=True)
class FeedSource:
    """Static configuration of one RSS/Atom news feed."""

    name: str  # display/source name, lower-cased into item IDs
    url: str  # feed URL; also the fallback item URL
    regions: list[str]  # region codes copied onto every item from this feed
    language: str  # language code copied onto every item from this feed
    default_tags: list[str]  # tags copied onto every item from this feed
    evidence_type: str  # evidenceType copied onto every item from this feed
# Curated open-access RSS/Atom feeds polled by get_news(); each entry carries
# the default regions/tags/evidence type applied to all of its items.
NEWS_SOURCES: tuple[FeedSource, ...] = (
    FeedSource("WHO", "https://www.who.int/feeds/entity/news-room/releases/en/rss.xml", ["WORLD"], "en", ["public-health"], "official"),
    FeedSource("ECDC", "https://www.ecdc.europa.eu/en/rss.xml", ["EU"], "en", ["public-health"], "official"),
    FeedSource("CDC", "https://tools.cdc.gov/api/v2/resources/media/132608.rss", ["WORLD"], "en", ["public-health"], "official"),
    FeedSource("EMA", "https://www.ema.europa.eu/en/news-events/news/rss.xml", ["EU"], "en", ["drug-safety"], "official"),
    FeedSource("Swissmedic", "https://www.swissmedic.ch/swissmedic/en/home/news/rss-feed/_jcr_content/contentPar/rssfeed.rss", ["CH"], "en", ["drug-safety"], "official"),
    FeedSource("Cochrane", "https://www.cochrane.org/news/rss.xml", ["WORLD"], "en", ["evidence-based-medicine"], "review"),
    FeedSource("medRxiv", "https://connect.medrxiv.org/relate/feed/medrxiv.xml", ["WORLD"], "en", ["preprint", "dermatology"], "preprint"),
    FeedSource("bioRxiv", "https://connect.biorxiv.org/relate/feed/biorxiv.xml", ["WORLD"], "en", ["preprint", "research"], "preprint"),
)
# Fallback filter values when a caller supplies none.
DEFAULT_SPECIALTY = "dermatology"
DEFAULT_NEWS_REGIONS = ["CH", "EU"]
DEFAULT_EVENT_REGIONS = ["CH", "EU"]

# Locks guarding the module-level caches below.
_news_cache_lock = threading.Lock()
_events_cache_lock = threading.Lock()
_event_url_cache_lock = threading.Lock()

# "payload": cached items; "expires_at": POSIX timestamp after which to refresh.
_news_cache: dict[str, Any] = {"payload": [], "expires_at": 0.0}
# "seed_mtime" additionally invalidates the cache when the seed file changes on disk.
_events_cache: dict[str, Any] = {"payload": [], "expires_at": 0.0, "seed_mtime": 0.0}
# url -> (is_live, cache_expiry_timestamp)
_event_url_status_cache: dict[str, tuple[bool, float]] = {}
def _now_ts() -> float:
return datetime.now(timezone.utc).timestamp()
def _clean_text(text: str) -> str:
cleaned = re.sub(r"<[^>]+>", " ", text or "")
cleaned = re.sub(r"\s+", " ", cleaned).strip()
return cleaned
def _safe_summary(text: str) -> str:
    """Clean *text* and cap it at 520 characters, ellipsizing longer input."""
    summary = _clean_text(text)
    if len(summary) <= 520:
        return summary
    return summary[:517].rstrip() + "..."
def _parse_datetime(raw: str) -> datetime:
if not raw:
return datetime.now(timezone.utc)
try:
dt = parsedate_to_datetime(raw)
return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
except Exception:
pass
try:
dt2 = datetime.fromisoformat(raw.replace("Z", "+00:00"))
return dt2 if dt2.tzinfo else dt2.replace(tzinfo=timezone.utc)
except Exception:
return datetime.now(timezone.utc)
def _read_url(url: str, timeout: int = 12) -> bytes:
    """GET *url* with the aggregator's User-Agent and return the raw body."""
    request = urllib.request.Request(
        url,
        headers={"User-Agent": "AZA-News-Aggregator/1.0"},
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read()
def _is_live_event_url(url: str) -> bool:
    """Best-effort liveness probe for an event URL, cached for 12 hours.

    A HEAD request is tried first; if it raises, a GET is attempted (some
    servers reject HEAD). Any HTTP status below 400 counts as live. Both
    positive and negative results are cached per URL.
    """
    target = (url or "").strip()
    if not target:
        return False
    now = _now_ts()
    with _event_url_cache_lock:
        entry = _event_url_status_cache.get(target)
        if entry is not None and entry[1] > now:
            return bool(entry[0])
    alive = False
    for method, probe_timeout in (("HEAD", 10), ("GET", 12)):
        try:
            request = urllib.request.Request(
                target,
                headers={"User-Agent": "AZA-News-Aggregator/1.0"},
                method=method,
            )
            with urllib.request.urlopen(request, timeout=probe_timeout) as response:
                alive = int(getattr(response, "status", 200)) < 400
            break
        except Exception:
            alive = False
    with _event_url_cache_lock:
        _event_url_status_cache[target] = (alive, now + EVENT_URL_CHECK_TTL_SECONDS)
    return alive
def _rss_items(source: FeedSource, limit: int = 25) -> list[NewsItem]:
    """Fetch *source* and convert up to *limit* RSS/Atom entries to NewsItems.

    Best-effort: any download or XML parse error returns an empty list so a
    single broken feed cannot break the aggregated view. Items fall back to
    the feed URL / a placeholder title when fields are missing.
    """
    try:
        payload = _read_url(source.url)
        root = ET.fromstring(payload)
    except Exception:
        return []
    channel = root.find("channel")
    if channel is not None:
        entries = channel.findall("item")  # RSS 2.0 layout
    else:
        # Atom feeds have <entry> elements instead of <channel>/<item>.
        entries = root.findall(".//{http://www.w3.org/2005/Atom}entry")
    atom = "{http://www.w3.org/2005/Atom}"
    items: list[NewsItem] = []
    for node in entries[:limit]:
        title = _clean_text(node.findtext("title") or node.findtext(f"{atom}title") or "Ohne Titel")
        link = _clean_text(node.findtext("link") or "")
        if not link:
            atom_link = node.find(f"{atom}link")
            if atom_link is not None:
                link = _clean_text(atom_link.attrib.get("href") or source.url)
        summary = node.findtext("description") or node.findtext(f"{atom}summary") or ""
        pub_raw = node.findtext("pubDate") or node.findtext(f"{atom}updated") or ""
        published_at = _parse_datetime(pub_raw).isoformat()
        # BUGFIX: the previous id used abs(hash(...)), but builtin hash() is
        # salted per process (PYTHONHASHSEED), so the same article received a
        # different id on every restart. Use a stable digest instead.
        digest = hashlib.sha1(f"{title}|{link}|{published_at}".encode("utf-8")).hexdigest()[:16]
        items.append(
            NewsItem(
                id=f"{source.name.lower()}-{digest}",
                source=source.name,
                title=title or "Ohne Titel",
                url=link or source.url,
                publishedAt=published_at,
                tags=list(source.default_tags),
                languageOriginal=source.language,
                isOpenAccess=True,
                evidenceType=source.evidence_type,
                summaryOriginal=_safe_summary(summary) or "Kurz-Zusammenfassung in der Quelle nicht verfügbar.",
                regions=list(source.regions),
            )
        )
    return items
def _pubmed_open_access_news(limit: int = 12) -> list[NewsItem]:
    """Query PubMed E-utilities for recent open-access dermatology records.

    Two-step flow: esearch returns PMIDs, esummary returns titles/dates.
    Returns [] on any network or JSON failure (best-effort source).
    """
    term = '(dermatology[Title/Abstract]) AND ("open access"[Filter])'
    query = urllib.parse.urlencode(
        {
            "db": "pubmed",
            "retmode": "json",
            "retmax": str(limit),
            # BUGFIX: the value must be "pub_date". The previous "pub+date"
            # was percent-encoded by urlencode to "pub%2Bdate", which
            # E-utilities does not recognize as the publication-date sort.
            "sort": "pub_date",
            "term": term,
        }
    )
    search_url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi?{query}"
    try:
        ids_payload = json.loads(_read_url(search_url).decode("utf-8", errors="ignore"))
        ids = ids_payload.get("esearchresult", {}).get("idlist", [])
    except Exception:
        return []
    if not ids:
        return []
    summary_url = (
        "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi?"
        + urllib.parse.urlencode({"db": "pubmed", "retmode": "json", "id": ",".join(ids)})
    )
    try:
        summary_payload = json.loads(_read_url(summary_url).decode("utf-8", errors="ignore"))
    except Exception:
        return []
    out: list[NewsItem] = []
    for pmid in ids:
        rec = summary_payload.get("result", {}).get(pmid) or {}
        title = _clean_text(str(rec.get("title") or ""))
        if not title:
            continue  # records without a usable title are skipped
        pubdate = _clean_text(str(rec.get("pubdate") or ""))
        dt = _parse_datetime(pubdate)
        url = f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/"
        out.append(
            NewsItem(
                id=f"pubmed-{pmid}",
                source="PubMed",
                title=title,
                url=url,
                publishedAt=dt.isoformat(),
                tags=["dermatology", "open-access"],
                languageOriginal="en",
                isOpenAccess=True,
                evidenceType="peer-reviewed",
                summaryOriginal="Open-Access Eintrag aus PubMed. Volltext/Abstract je nach Journal frei verfügbar.",
                regions=["WORLD"],
            )
        )
    return out
def _normalize_tokens(values: list[str] | None, default: list[str]) -> list[str]:
out = [str(v).strip().lower() for v in (values or []) if str(v).strip()]
return out or list(default)
def _match_regions(item_regions: list[str], selected_regions: list[str]) -> bool:
if not selected_regions:
return True
selected = {r.lower() for r in selected_regions}
if "world" in selected or "worldwide" in selected:
return True
item_norm = {r.lower() for r in item_regions}
# Strikter Regionsabgleich: EU zeigt nur EU/CH-Events, nicht WORLD/US/CA.
if "eu" in selected and ("eu" in item_norm or "ch" in item_norm):
return True
if "ch" in selected and "ch" in item_norm:
return True
return bool(item_norm.intersection(selected))
def _match_specialties(item_tags: list[str], selected_specialties: list[str]) -> bool:
if "all" in selected_specialties:
return True
tags = {t.lower() for t in item_tags}
selected = set(selected_specialties)
if "dermatology" in selected:
selected.update({"skin"})
if "infectiology" in selected:
selected.update({"public-health", "infectiology"})
if "general-medicine" in selected:
selected.update({"public-health", "evidence-based-medicine", "internal-medicine"})
if "internal-medicine" in selected:
selected.update({"general-medicine", "internal-medicine"})
return bool(tags.intersection(selected))
def _translate_summary_stub(summary: str, target_language: str, source_language: str) -> str | None:
target = (target_language or "").strip().lower()
source = (source_language or "").strip().lower()
if not target or target in {"system", "auto", source}:
return None
return f"[Übersetzung nicht konfiguriert: {source}->{target}] {summary}"
def get_news(filters: NewsFilter) -> list[NewsItem]:
    """Return filtered, sorted news items, refreshing the shared cache if stale.

    The module-level cache is refreshed at most every NEWS_CACHE_TTL_SECONDS.
    NOTE(review): the network fetch happens while _news_cache_lock is held,
    so concurrent callers block for the duration of one refresh — confirm
    this is acceptable under expected load.
    """
    now = _now_ts()
    with _news_cache_lock:
        if float(_news_cache["expires_at"]) > now:
            all_items = list(_news_cache["payload"])
        else:
            fetched: list[NewsItem] = []
            for src in NEWS_SOURCES:
                fetched.extend(_rss_items(src, limit=24))
            fetched.extend(_pubmed_open_access_news(limit=16))
            # NOTE(review): publishedAt is an ISO string; lexicographic sort is
            # only chronologically exact when all offsets match — confirm feeds
            # are normalized to UTC if strict ordering matters.
            fetched.sort(key=lambda x: x.publishedAt, reverse=True)
            _news_cache["payload"] = fetched
            _news_cache["expires_at"] = now + NEWS_CACHE_TTL_SECONDS
            all_items = fetched
    specialties = _normalize_tokens(filters.specialties, [DEFAULT_SPECIALTY])
    regions = _normalize_tokens(filters.regions, DEFAULT_NEWS_REGIONS)
    filtered = [
        item
        for item in all_items
        if _match_specialties(item.tags, specialties) and _match_regions(item.regions, regions)
    ]
    filtered.sort(key=lambda x: x.publishedAt, reverse=(filters.sort != "oldest"))
    # Clamp the requested page size, then attach the (stub) translation via
    # dataclasses.replace instead of rebuilding every field by hand.
    limit = max(1, min(filters.limit, 120))
    return [
        replace(
            item,
            summaryTranslated=_translate_summary_stub(
                item.summaryOriginal, filters.language, item.languageOriginal
            ),
        )
        for item in filtered[:limit]
    ]
def _seed_events_path() -> Path:
return Path(__file__).resolve().parent / "news_events_seed.json"
def _load_seed_events() -> list[EventItem]:
try:
with open(_seed_events_path(), "r", encoding="utf-8") as f:
payload = json.load(f)
except Exception:
return []
rows = payload.get("events") if isinstance(payload, dict) else None
if not isinstance(rows, list):
return []
out: list[EventItem] = []
for row in rows:
if not isinstance(row, dict):
continue
try:
out.append(
EventItem(
id=str(row["id"]),
name=str(row["name"]),
startDate=str(row["startDate"]),
endDate=str(row["endDate"]),
city=str(row.get("city") or ""),
country=str(row.get("country") or ""),
regions=[str(r).upper() for r in row.get("regions", []) if str(r).strip()],
tags=[str(t).lower() for t in row.get("tags", []) if str(t).strip()],
url=str(row.get("url") or ""),
description=str(row.get("description") or ""),
type=str(row.get("type") or "kongress"),
cmeFlag=bool(row.get("cmeFlag", False)),
organizer=str(row.get("organizer") or ""),
source=str(row.get("source") or ""),
icsUrl=(str(row.get("icsUrl")).strip() if row.get("icsUrl") else None),
)
)
except Exception:
continue
return out
def get_events(filters: EventFilter) -> list[EventItem]:
    """Return seed events filtered by date window, specialty, region and URL liveness.

    The seed file is cached; the cache is refreshed when its TTL expires or
    the seed file's mtime changes on disk.
    """
    now = _now_ts()
    try:
        seed_mtime = _seed_events_path().stat().st_mtime
    except Exception:
        seed_mtime = 0.0
    with _events_cache_lock:
        cache_fresh = (
            float(_events_cache["expires_at"]) > now
            and float(_events_cache.get("seed_mtime", 0.0)) == seed_mtime
        )
        if cache_fresh:
            source_items = list(_events_cache["payload"])
        else:
            source_items = _load_seed_events()
            _events_cache["payload"] = source_items
            _events_cache["expires_at"] = now + EVENTS_CACHE_TTL_SECONDS
            _events_cache["seed_mtime"] = seed_mtime
    specialties = _normalize_tokens(filters.specialties, [DEFAULT_SPECIALTY])
    regions = _normalize_tokens(filters.regions, DEFAULT_EVENT_REGIONS)
    selected: list[EventItem] = []
    for event in source_items:
        try:
            start = date.fromisoformat(event.startDate)
            end = date.fromisoformat(event.endDate)
        except Exception:
            continue  # unparseable dates: drop the event
        if end < filters.from_date or start > filters.to_date:
            continue  # no overlap with the requested window
        if not _match_specialties(event.tags, specialties):
            continue
        if not _match_regions(event.regions, regions):
            continue
        if not _is_live_event_url(event.url):
            continue  # dead link: hide the event
        selected.append(event)
    selected.sort(key=lambda e: e.startDate, reverse=(filters.sort == "latest"))
    return selected[: max(1, min(filters.limit, 300))]
# Backward-compatible wrappers used by backend_main.py
def get_news_items(specialties: list[str] | None, lang: str = "de", region: str = "CH", limit: int = 30) -> list[dict[str, Any]]:
    """Legacy wrapper: comma-separated region string in, plain dicts out."""
    region_values = [part.strip() for part in str(region or "CH").split(",") if part.strip()]
    news_filter = NewsFilter(
        specialties=specialties or [DEFAULT_SPECIALTY],
        regions=region_values,
        language=lang,
        limit=limit,
    )
    return [asdict(item) for item in get_news(news_filter)]
def get_event_items(
    specialties: list[str] | None,
    regions: list[str] | None,
    from_date: date | None,
    to_date: date | None,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """Legacy wrapper: defaulted ~13-month date window, plain dicts out."""
    event_filter = EventFilter(
        specialties=specialties or [DEFAULT_SPECIALTY],
        regions=regions or list(DEFAULT_EVENT_REGIONS),
        from_date=from_date or date.today(),
        to_date=to_date or (date.today() + timedelta(days=396)),
        limit=limit,
    )
    return [asdict(item) for item in get_events(event_filter)]