Files
aza/AzA march 2026 - Kopie (18)/aza_ai_client.py
2026-04-22 22:33:46 +02:00

212 lines
6.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
Zentraler KI-Client für AZA.
Liefert entweder einen Backend-Proxy oder einen lokalen OpenAI-Client.
Im Remote-Backend-Modus wird KEIN lokaler API-Key benötigt –
alle Anfragen gehen über POST /v1/chat an das Backend.
"""
import os
import sys
from types import SimpleNamespace
from urllib.parse import urlparse

import requests
def _search_dirs() -> list[str]:
dirs: list[str] = []
if getattr(sys, "frozen", False):
_exe = os.path.dirname(os.path.abspath(sys.executable))
dirs.append(_exe)
dirs.append(os.path.join(_exe, "_internal"))
_src = os.path.dirname(os.path.abspath(__file__))
dirs.append(_src)
dirs.append(os.path.join(_src, "_internal"))
dirs.append(os.getcwd())
return dirs
def _read_file_value(filename: str) -> str | None:
    """Return the first non-empty content of *filename* found in any search
    directory, BOM-stripped and whitespace-trimmed; None if nothing usable
    was found.

    Unreadable files are skipped silently (best-effort config lookup).
    """
    visited: set[str] = set()
    for directory in _search_dirs():
        # Skip empty entries and directories we already checked.
        if not directory or directory in visited:
            continue
        visited.add(directory)
        candidate = os.path.join(directory, filename)
        if not os.path.isfile(candidate):
            continue
        try:
            with open(candidate, "r", encoding="utf-8-sig") as fh:
                content = fh.read().replace("\ufeff", "").strip()
        except Exception:
            continue
        if content:
            return content
    return None
def _clean(value) -> str | None:
if value is None:
return None
v = str(value).replace("\ufeff", "").strip()
return v if v else None
def read_backend_url() -> str | None:
    """Backend base URL without trailing slash, or None if unconfigured.

    The environment variable MEDWORK_BACKEND_URL wins; the file
    backend_url.txt is only consulted when the variable is unset/empty.
    """
    env_url = _clean(os.getenv("MEDWORK_BACKEND_URL"))
    if env_url:
        return env_url.rstrip("/")
    file_url = _read_file_value("backend_url.txt")
    if file_url:
        return file_url.rstrip("/")
    return None
def read_backend_token() -> str | None:
    """API token for the backend, or None.

    Lookup order: backend_token.txt, then the first comma-separated entry
    of MEDWORK_API_TOKENS, then MEDWORK_API_TOKEN.
    """
    from_file = _read_file_value("backend_token.txt")
    if from_file:
        return from_file
    multi = os.getenv("MEDWORK_API_TOKENS", "").strip()
    if multi:
        first = _clean(multi.split(",", 1)[0])
        if first:
            return first
    return _clean(os.getenv("MEDWORK_API_TOKEN"))
def has_remote_backend() -> bool:
    """Return True when a backend URL is configured and points to a
    non-local host.

    Fix over the previous version: the old raw substring test misclassified
    any URL that merely *contained* "localhost"/"127.0.0.1" somewhere in a
    longer hostname or in the path (e.g. "https://api.localhost-tunnel.example.com").
    We now compare the URL's parsed hostname against known loopback names.
    """
    url = read_backend_url()
    if not url:
        return False
    host = (urlparse(url).hostname or "").lower()
    if not host:
        # Schemeless or unparseable value (urlparse yields no hostname):
        # fall back to the old conservative substring check.
        return not any(h in url for h in ("127.0.0.1", "localhost", "0.0.0.0"))
    return host not in ("127.0.0.1", "localhost", "0.0.0.0", "::1")
# ── Backend-Proxy-Klassen (Drop-In für OpenAI-Client) ──────────────
class _BackendCompletions:
    """Mimics the OpenAI SDK's ``chat.completions`` endpoint by forwarding
    requests to the AZA backend's POST /v1/chat route."""

    def __init__(self, backend_url: str, backend_token: str):
        self._url = backend_url
        self._token = backend_token

    def create(self, **kwargs):
        """Send a chat request to the backend and return an OpenAI-shaped
        response (``choices``/``usage``/``model`` attribute namespaces).

        Raises RuntimeError when the backend reports failure, and lets
        ``requests`` raise on HTTP/transport errors.
        """
        def _as_message(m):
            # Accept both plain dicts and SDK-style message objects.
            if isinstance(m, dict):
                return {"role": m["role"], "content": m["content"]}
            return {"role": m.role, "content": m.content}

        payload: dict = {
            "model": kwargs.get("model", "gpt-4o"),
            "messages": [_as_message(m) for m in kwargs.get("messages", [])],
        }
        # Only forward sampling options the caller explicitly set.
        for option in ("temperature", "max_tokens", "top_p"):
            if kwargs.get(option) is not None:
                payload[option] = kwargs[option]

        response = requests.post(
            f"{self._url}/v1/chat",
            json=payload,
            headers={"X-API-Token": self._token},
            timeout=(5, 180),
        )
        response.raise_for_status()
        data = response.json()
        if not data.get("success"):
            raise RuntimeError(data.get("error", "Backend-Chat fehlgeschlagen"))

        # NOTE: every "ß" in the returned content is normalized to "ss".
        text = (data.get("content") or "").replace("ß", "ss")
        message = SimpleNamespace(content=text, role="assistant")
        choice = SimpleNamespace(
            message=message, finish_reason=data.get("finish_reason")
        )

        usage_info = data.get("usage")
        usage = (
            SimpleNamespace(
                prompt_tokens=usage_info.get("prompt_tokens", 0),
                completion_tokens=usage_info.get("completion_tokens", 0),
                total_tokens=usage_info.get("total_tokens", 0),
            )
            if usage_info
            else None
        )
        return SimpleNamespace(
            choices=[choice], usage=usage, model=data.get("model", "")
        )
class _BackendChat:
    """Namespace holder so ``proxy.chat.completions.create(...)`` resolves."""

    def __init__(self, backend_url: str, backend_token: str):
        self.completions = _BackendCompletions(backend_url, backend_token)
class _BackendTranscriptions:
    """Mimics the OpenAI SDK's ``audio.transcriptions`` endpoint by
    uploading audio to the backend's POST /v1/transcribe route."""

    def __init__(self, backend_url: str, backend_token: str):
        self._url = backend_url
        self._token = backend_token

    def create(self, **kwargs):
        """Upload an audio file for transcription.

        Returns an object with a ``.text`` attribute like the OpenAI SDK.
        Raises ValueError without a ``file`` argument and RuntimeError when
        the backend reports failure.
        """
        audio = kwargs.get("file")
        if audio is None:
            raise ValueError("Kein 'file'-Argument für Transkription übergeben.")

        filename = getattr(audio, "name", "audio.m4a")
        suffix = os.path.splitext(filename)[1].lower() if filename else ".m4a"
        # Only .m4a gets an explicit content type; everything else is
        # declared as WAV.
        mime = "audio/mp4" if suffix == ".m4a" else "audio/wav"

        response = requests.post(
            f"{self._url}/v1/transcribe",
            files={"file": (os.path.basename(filename), audio, mime)},
            data={
                "language": kwargs.get("language", "de"),
                "prompt": kwargs.get("prompt", ""),
            },
            headers={"X-API-Token": self._token},
            timeout=(5, 300),
        )
        response.raise_for_status()
        data = response.json()
        if not data.get("success"):
            raise RuntimeError(data.get("error", "Transkription fehlgeschlagen"))
        return SimpleNamespace(text=data.get("transcript", ""))
class _BackendAudio:
    """Namespace holder so ``proxy.audio.transcriptions.create(...)`` resolves."""

    def __init__(self, backend_url: str, backend_token: str):
        self.transcriptions = _BackendTranscriptions(backend_url, backend_token)
class BackendChatProxy:
    """Drop-in replacement for the OpenAI client in remote-backend mode.

    Supports:
      - ``proxy.chat.completions.create(model=..., messages=..., ...)``
      - ``proxy.audio.transcriptions.create(model=..., file=..., language=...)``
    """

    def __init__(self, backend_url: str, backend_token: str):
        self.chat = _BackendChat(backend_url, backend_token)
        self.audio = _BackendAudio(backend_url, backend_token)
def get_ai_client():
    """Return an AI client, preferring the remote backend.

    Priority:
      1. Remote backend configured  -> BackendChatProxy (no local key needed)
      2. Local OpenAI API key found -> ``openai.OpenAI`` client
      3. Neither                    -> RuntimeError
    """
    if has_remote_backend():
        backend_url = read_backend_url()
        backend_token = read_backend_token()
        if backend_url and backend_token:
            return BackendChatProxy(backend_url, backend_token)
    try:
        from openai_runtime_config import get_openai_api_key
        key = get_openai_api_key()
        if key:
            from openai import OpenAI
            return OpenAI(api_key=key)
    except Exception:
        # Best effort: a missing openai package or config falls through
        # to the final error below.
        pass
    raise RuntimeError(
        "KI-Verbindung nicht verfügbar.\n\n"
        "Weder ein Remote-Backend noch ein lokaler\n"
        "OpenAI-Schlüssel ist konfiguriert."
    )