Files
aza/AzA march 2026/services/event_extract_llm.py

221 lines
7.0 KiB
Python
Raw Normal View History

2026-03-25 22:03:39 +01:00
from __future__ import annotations
import json
import os
import re
import urllib.parse
import urllib.request
from dataclasses import dataclass
from typing import Any
from openai import OpenAI
from services.live_event_search import SearchResult
@dataclass(frozen=True)
class EventCandidate:
    """One medical congress/training candidate extracted by the LLM."""

    name: str  # event title; rows without it are dropped during normalization
    startDate: str | None  # ISO YYYY-MM-DD, or None when the date was unclear
    endDate: str | None  # ISO YYYY-MM-DD; normalization falls back to startDate
    city: str
    country: str
    urlCandidate: str  # source URL; rows without it are dropped during normalization
    shortDescription: str  # trimmed to 600 chars during normalization
    organizer: str
    specialtyTags: list[str]  # lower-cased during normalization
    regionTags: list[str]  # upper-cased during normalization
    rationale: str  # LLM justification, trimmed to 300 chars during normalization
    confidence: float  # clamped to [0.0, 1.0] during normalization
def _clean_text(t: str) -> str:
t = re.sub(r"<[^>]+>", " ", t or "")
t = re.sub(r"\s+", " ", t).strip()
return t
def _safe_date(value: Any) -> str | None:
s = str(value or "").strip()
if not s:
return None
if re.fullmatch(r"\d{4}-\d{2}-\d{2}", s):
return s
return None
def _extract_page_excerpt(url: str, max_chars: int = 14000) -> str:
u = (url or "").strip()
if not u:
return ""
try:
req = urllib.request.Request(
u,
headers={"User-Agent": "AZA-LiveEventSearch/1.0"},
method="GET",
)
with urllib.request.urlopen(req, timeout=8) as resp:
raw = resp.read(max_chars * 2).decode("utf-8", errors="ignore")
return _clean_text(raw)[:max_chars]
except Exception:
return ""
def _make_prompt(
search_rows: list[SearchResult],
specialty: str,
regions: list[str],
excerpts: list[dict[str, str]],
) -> str:
search_payload = [
{"title": r.title, "snippet": r.snippet, "url": r.url}
for r in search_rows
]
rules = (
"Du extrahierst medizinische Kongresse/Weiterbildungen als STRICT JSON.\n"
"NUR aus den gelieferten Daten, NICHT raten, NICHT erfinden.\n"
"Wenn Datum/Ort unklar: null setzen und confidence reduzieren.\n"
"urlCandidate MUSS eine gelieferte URL oder eine URL aus pageExcerpts sein.\n"
"Antwortformat: JSON-Objekt mit key 'events' (Array).\n"
"Jedes Event: {name,startDate,endDate,city,country,urlCandidate,shortDescription,organizer,specialtyTags,regionTags,rationale,confidence}\n"
"startDate/endDate als ISO YYYY-MM-DD oder null.\n"
"confidence zwischen 0 und 1."
)
payload = {
"specialty": specialty,
"regions": regions,
"searchResults": search_payload,
"pageExcerpts": excerpts,
}
return rules + "\n\nDATA:\n" + json.dumps(payload, ensure_ascii=False)
def _extract_json_block(text: str) -> dict:
try:
data = json.loads(text)
if isinstance(data, dict):
return data
except Exception:
pass
match = re.search(r"\{.*\}", text, flags=re.DOTALL)
if not match:
return {"events": []}
try:
data = json.loads(match.group(0))
if isinstance(data, dict):
return data
except Exception:
pass
return {"events": []}
def _call_openai_json(prompt: str) -> dict:
key = os.getenv("OPENAI_API_KEY", "").strip()
if not key:
raise RuntimeError("OPENAI_API_KEY nicht gesetzt")
model = os.getenv("EVENT_LLM_MODEL", "gpt-4o-mini").strip()
client = OpenAI(api_key=key)
resp = client.chat.completions.create(
model=model,
temperature=0,
messages=[
{"role": "system", "content": "Gib nur valides JSON zurück."},
{"role": "user", "content": prompt},
],
)
txt = ""
try:
txt = (resp.choices[0].message.content or "").strip()
except Exception:
txt = ""
return _extract_json_block(txt)
def _call_gemini_json(prompt: str) -> dict:
key = os.getenv("GEMINI_API_KEY", "").strip()
if not key:
raise RuntimeError("GEMINI_API_KEY nicht gesetzt")
model = os.getenv("GEMINI_MODEL", "gemini-1.5-flash").strip()
url = (
f"https://generativelanguage.googleapis.com/v1beta/models/"
f"{urllib.parse.quote(model)}:generateContent?key={urllib.parse.quote(key)}"
)
body = {
"contents": [{"parts": [{"text": prompt}]}],
"generationConfig": {"temperature": 0},
}
req = urllib.request.Request(
url,
headers={"Content-Type": "application/json"},
data=json.dumps(body).encode("utf-8"),
method="POST",
)
with urllib.request.urlopen(req, timeout=20) as resp:
raw = resp.read().decode("utf-8", errors="ignore")
data = json.loads(raw)
txt = ""
try:
txt = data["candidates"][0]["content"]["parts"][0]["text"]
except Exception:
txt = ""
return _extract_json_block(txt)
def _normalize_candidate(row: dict, default_specialty: str, default_regions: list[str]) -> EventCandidate | None:
if not isinstance(row, dict):
return None
name = str(row.get("name") or "").strip()
url_candidate = str(row.get("urlCandidate") or "").strip()
if not name or not url_candidate:
return None
try:
confidence = float(row.get("confidence", 0.0))
except Exception:
confidence = 0.0
confidence = max(0.0, min(1.0, confidence))
specialty_tags = row.get("specialtyTags") if isinstance(row.get("specialtyTags"), list) else [default_specialty]
region_tags = row.get("regionTags") if isinstance(row.get("regionTags"), list) else list(default_regions)
return EventCandidate(
name=name,
startDate=_safe_date(row.get("startDate")),
endDate=_safe_date(row.get("endDate")) or _safe_date(row.get("startDate")),
city=str(row.get("city") or "").strip(),
country=str(row.get("country") or "").strip(),
urlCandidate=url_candidate,
shortDescription=str(row.get("shortDescription") or "").strip()[:600],
organizer=str(row.get("organizer") or "").strip(),
specialtyTags=[str(x).strip().lower() for x in specialty_tags if str(x).strip()],
regionTags=[str(x).strip().upper() for x in region_tags if str(x).strip()],
rationale=str(row.get("rationale") or "").strip()[:300],
confidence=confidence,
)
def extract_event_candidates(
    search_rows: list[SearchResult],
    specialty: str,
    regions: list[str],
) -> list[EventCandidate]:
    """Run the search -> page-fetch -> LLM -> normalize pipeline.

    Takes at most the first 10 search rows, enriches them with page
    excerpts, sends everything to the provider selected by LLM_PROVIDER
    ("gemini" or anything else -> OpenAI), and returns the normalized
    candidates. Returns [] immediately for empty input.
    """
    if not search_rows:
        return []
    selected = list(search_rows[:10])
    page_excerpts = []
    for item in selected:
        excerpt = _extract_page_excerpt(item.url)
        if excerpt:
            page_excerpts.append({"url": item.url, "excerpt": excerpt})
    prompt = _make_prompt(selected, specialty=specialty, regions=regions, excerpts=page_excerpts)
    if os.getenv("LLM_PROVIDER", "openai").strip().lower() == "gemini":
        response = _call_gemini_json(prompt)
    else:
        response = _call_openai_json(prompt)
    event_rows = response.get("events")
    if not isinstance(event_rows, list):
        event_rows = []
    candidates: list[EventCandidate] = []
    for entry in event_rows:
        normalized = _normalize_candidate(entry, default_specialty=specialty, default_regions=regions)
        if normalized is not None:
            candidates.append(normalized)
    return candidates