# -*- coding: utf-8 -*-
"""Central AI client for AZA.

Supplies either a backend proxy or a local OpenAI client.  In remote-backend
mode no local API key is needed -- every request is sent via
POST /v1/chat to the backend.
"""
import os
import sys
from types import SimpleNamespace

import requests


def _search_dirs() -> list[str]:
    """Return the ordered list of directories searched for config files.

    For frozen (bundled) builds the executable directory and its
    ``_internal`` subfolder come first; the source directory, its
    ``_internal`` subfolder, and the current working directory follow.
    """
    candidates: list[str] = []
    if getattr(sys, "frozen", False):
        exe_dir = os.path.dirname(os.path.abspath(sys.executable))
        candidates += [exe_dir, os.path.join(exe_dir, "_internal")]
    src_dir = os.path.dirname(os.path.abspath(__file__))
    candidates += [src_dir, os.path.join(src_dir, "_internal"), os.getcwd()]
    return candidates


def _read_file_value(filename: str) -> str | None:
    """Read *filename* from the first search directory that holds it.

    Returns the BOM-free, stripped file content, or ``None`` when no
    directory yields a non-empty value.  Read errors are skipped silently
    so a later directory can still provide the value.
    """
    visited: set[str] = set()
    for base in _search_dirs():
        if not base or base in visited:
            continue
        visited.add(base)
        path = os.path.join(base, filename)
        if not os.path.isfile(path):
            continue
        try:
            # utf-8-sig drops a leading BOM; stray BOMs inside are removed too.
            with open(path, "r", encoding="utf-8-sig") as fh:
                value = fh.read().replace("\ufeff", "").strip()
        except Exception:
            continue
        if value:
            return value
    return None


def _clean(value) -> str | None:
    """Normalize *value* to a stripped, BOM-free string; ``None`` if empty."""
    if value is None:
        return None
    text = str(value).replace("\ufeff", "").strip()
    return text or None


def read_backend_url() -> str | None:
    """Backend base URL from env var or ``backend_url.txt`` (no trailing slash)."""
    url = _clean(os.getenv("MEDWORK_BACKEND_URL"))
    if not url:
        url = _read_file_value("backend_url.txt")
    return url.rstrip("/") if url else None


def read_backend_token() -> str | None:
    """Backend API token, checked in order: token file, token-list env, single-token env."""
    token = _read_file_value("backend_token.txt")
    if token:
        return token
    raw = os.getenv("MEDWORK_API_TOKENS", "").strip()
    if raw:
        # A comma-separated list: the first entry wins.
        first = _clean(raw.split(",")[0])
        if first:
            return first
    return _clean(os.getenv("MEDWORK_API_TOKEN"))


def has_remote_backend() -> bool:
    """True when a backend URL is configured and does not point at this machine."""
    url = read_backend_url()
    if not url:
        return False
    # Substring check: any loopback/bind-all marker means "local", not remote.
    return all(host not in url for host in ("127.0.0.1", "localhost", "0.0.0.0"))


# -- Backend proxy classes (drop-in replacement for the OpenAI client) --------


class _BackendCompletions:
    """Chat-completion calls routed through the backend's /v1/chat endpoint."""

    def __init__(self, backend_url: str, backend_token: str):
        self._url = backend_url
        self._token = backend_token

    def create(self, **kwargs):
        """Mimic ``openai.chat.completions.create`` via the backend proxy.

        Accepts ``model``, ``messages`` (dicts or objects with ``role`` /
        ``content``), and optional ``temperature``, ``max_tokens``, ``top_p``.
        Returns a ``SimpleNamespace`` shaped like an OpenAI response.
        Raises ``RuntimeError`` when the backend reports failure.
        """
        messages = [
            {"role": m["role"], "content": m["content"]}
            if isinstance(m, dict)
            else {"role": m.role, "content": m.content}
            for m in kwargs.get("messages", [])
        ]
        payload: dict = {
            "model": kwargs.get("model", "gpt-4o"),
            "messages": messages,
        }
        # Forward only the sampling parameters the caller actually set.
        for key in ("temperature", "max_tokens", "top_p"):
            if kwargs.get(key) is not None:
                payload[key] = kwargs[key]
        response = requests.post(
            f"{self._url}/v1/chat",
            json=payload,
            headers={"X-API-Token": self._token},
            timeout=(5, 180),  # (connect, read) seconds
        )
        response.raise_for_status()
        data = response.json()
        if not data.get("success"):
            raise RuntimeError(data.get("error", "Backend-Chat fehlgeschlagen"))
        # Normalize "ß" to "ss" in the model output (matches original behavior;
        # presumably Swiss-German orthography -- TODO confirm with product owner).
        content = (data.get("content") or "").replace("ß", "ss")
        message = SimpleNamespace(content=content, role="assistant")
        choice = SimpleNamespace(
            message=message, finish_reason=data.get("finish_reason")
        )
        usage_data = data.get("usage")
        usage = (
            SimpleNamespace(
                prompt_tokens=usage_data.get("prompt_tokens", 0),
                completion_tokens=usage_data.get("completion_tokens", 0),
                total_tokens=usage_data.get("total_tokens", 0),
            )
            if usage_data
            else None
        )
        return SimpleNamespace(
            choices=[choice], usage=usage, model=data.get("model", "")
        )


class _BackendChat:
    """Namespace node mirroring ``client.chat`` of the OpenAI SDK."""

    def __init__(self, backend_url: str, backend_token: str):
        self.completions = _BackendCompletions(backend_url, backend_token)


class _BackendTranscriptions:
    """Audio transcription routed through the backend's /v1/transcribe endpoint."""

    def __init__(self, backend_url: str, backend_token: str):
        self._url = backend_url
        self._token = backend_token

    def create(self, **kwargs):
        """Mimic ``openai.audio.transcriptions.create`` via the backend proxy.

        Requires a ``file`` argument (file-like object); optional ``language``
        (default ``"de"``) and ``prompt``.  Returns a ``SimpleNamespace`` with
        a ``text`` attribute.  Raises ``ValueError`` for a missing file and
        ``RuntimeError`` when the backend reports failure.
        """
        file_obj = kwargs.get("file")
        if file_obj is None:
            raise ValueError("Kein 'file'-Argument für Transkription übergeben.")
        language = kwargs.get("language", "de")
        prompt = kwargs.get("prompt", "")
        fname = getattr(file_obj, "name", "audio.m4a")
        ext = os.path.splitext(fname)[1].lower() if fname else ".m4a"
        # Only two content types are distinguished: m4a, everything else as wav.
        content_type = "audio/mp4" if ext == ".m4a" else "audio/wav"
        response = requests.post(
            f"{self._url}/v1/transcribe",
            files={"file": (os.path.basename(fname), file_obj, content_type)},
            data={"language": language, "prompt": prompt},
            headers={"X-API-Token": self._token},
            timeout=(5, 300),  # (connect, read) seconds; audio uploads are slow
        )
        response.raise_for_status()
        data = response.json()
        if not data.get("success"):
            raise RuntimeError(data.get("error", "Transkription fehlgeschlagen"))
        return SimpleNamespace(text=data.get("transcript", ""))


class _BackendAudio:
    """Namespace node mirroring ``client.audio`` of the OpenAI SDK."""

    def __init__(self, backend_url: str, backend_token: str):
        self.transcriptions = _BackendTranscriptions(backend_url, backend_token)


class BackendChatProxy:
    """Drop-in replacement for the OpenAI client in remote-backend mode.

    Supports:
      - ``proxy.chat.completions.create(model=..., messages=..., ...)``
      - ``proxy.audio.transcriptions.create(model=..., file=..., language=...)``
    """

    def __init__(self, backend_url: str, backend_token: str):
        self.chat = _BackendChat(backend_url, backend_token)
        self.audio = _BackendAudio(backend_url, backend_token)


def get_ai_client():
    """Return an AI client.

    - Remote backend reachable -> ``BackendChatProxy`` (no local key needed)
    - Otherwise -> local OpenAI client (requires an API key)
    - Neither available -> ``RuntimeError``
    """
    if has_remote_backend():
        url = read_backend_url()
        token = read_backend_token()
        if url and token:
            return BackendChatProxy(url, token)
    # Best-effort local fallback: any failure (missing module, missing key)
    # drops through to the final error below.
    try:
        from openai_runtime_config import get_openai_api_key

        api_key = get_openai_api_key()
        if api_key:
            from openai import OpenAI

            return OpenAI(api_key=api_key)
    except Exception:
        pass
    raise RuntimeError(
        "KI-Verbindung nicht verfügbar.\n\n"
        "Weder ein Remote-Backend noch ein lokaler\n"
        "OpenAI-Schlüssel ist konfiguriert."
    )