Files
aza/AzA march 2026 - Kopie (28)/aza_ai_budget.py
2026-05-20 00:09:28 +02:00

565 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
Serverseitiges KI-Budget (Phase 1): SQLite-Events, Schätzkosten, Gating pro Subscription-Periode.
Keine Secrets loggen. Alle Beträge sind Schätzungen (estimated_cost_usd).
"""
from __future__ import annotations
import hashlib
import json
import os
import sqlite3
import time
import uuid
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple
# Default budget: 10 USD corresponds to 100 % of one Stripe billing period.
# Read once at import time from the AZA_AI_BUDGET_USD_DEFAULT environment variable.
DEFAULT_MONTHLY_AI_BUDGET_USD: float = float(os.environ.get("AZA_AI_BUDGET_USD_DEFAULT", "10.0"))
# Conservative estimate; intended to become overridable per lookup_key later.
# The marker names the estimation scheme and is written into event metadata
# ("engine") and into the blocked-call payload ("estimate_engine").
_ESTIMATE_MARKER = "openai_estimate_v1"
# USD per 1M tokens (input / output): central constants, not scattered across the code.
# Values are deliberately conservative; adjust only here or via ENV overrides (optional, later).
_MODEL_USD_PER_1M_INPUT: Dict[str, float] = {
    "gpt-4o-mini": 0.15,
    "gpt-4o": 2.5,
    "gpt-5.2": 5.0,
    "gpt-5-mini": 1.0,
    "gpt-5-nano": 0.3,
    "gpt-4o-mini-transcribe": 3.0, # approximation for the audio model
    "whisper-1": 1.0,
}
# Output-token prices; the transcription models are listed with 0.0.
_MODEL_USD_PER_1M_OUTPUT: Dict[str, float] = {
    "gpt-4o-mini": 0.60,
    "gpt-4o": 10.0,
    "gpt-5.2": 15.0,
    "gpt-5-mini": 4.0,
    "gpt-5-nano": 1.2,
    "gpt-4o-mini-transcribe": 0.0,
    "whisper-1": 0.0,
}
# Transcription: USD per minute (used when only the audio length is known).
# The "default" entry is the fallback for models without their own per-minute price.
_TRANSCRIBE_USD_PER_MINUTE: Dict[str, float] = {
    "gpt-4o-mini-transcribe": 0.012,
    "whisper-1": 0.006,
    "default": 0.012,
}
def _hash_device_id(device_id: str) -> str:
    """Return the hex-encoded SHA-256 digest of the UTF-8 encoded device id."""
    digest = hashlib.sha256()
    digest.update(device_id.encode("utf-8"))
    return digest.hexdigest()
def _norm_model(model: str) -> str:
    """Normalise a model name for lookups in the price tables.

    The name is stripped and lower-cased. Any name that starts with ``gpt-`` and
    contains ``-transcribe`` is collapsed to ``gpt-4o-mini-transcribe``; a falsy
    input yields the empty string.
    """
    name = (model or "").strip().lower()
    is_gpt_transcribe = name.startswith("gpt-") and "-transcribe" in name
    return "gpt-4o-mini-transcribe" if is_gpt_transcribe else name
def estimate_openai_cost_usd(
    *,
    model: str,
    input_tokens: int = 0,
    output_tokens: int = 0,
    audio_seconds: float = 0.0,
    operation_type: str = "other",
) -> float:
    """Return a uniform cost estimate in USD for one OpenAI call.

    The estimate is meant to be conservative; real billing may differ.

    Args:
        model: Model name; normalised with ``_norm_model`` before the price lookup.
        input_tokens: Input token count; negative values count as 0.
        output_tokens: Output token count; negative values count as 0.
        audio_seconds: Audio length; when positive, a per-minute price is added
            on top of the token cost.
        operation_type: Accepted for interface compatibility; not used in the estimate.

    Returns:
        The non-negative estimate, rounded to 6 decimal places.
    """
    _ = operation_type
    key = _norm_model(model)

    price_in = _MODEL_USD_PER_1M_INPUT.get(key)
    if price_in is None:
        # Unknown model: price it conservatively like an expensive chat model.
        price_in, price_out = 5.0, 15.0
    else:
        price_out = _MODEL_USD_PER_1M_OUTPUT.get(key)
        if price_out is None:
            price_out = 0.0

    total = (max(0, int(input_tokens)) / 1_000_000.0) * price_in
    total += (max(0, int(output_tokens)) / 1_000_000.0) * price_out

    if audio_seconds > 0:
        minute_price = _TRANSCRIBE_USD_PER_MINUTE.get(key) or _TRANSCRIBE_USD_PER_MINUTE["default"]
        total += (max(0.0, float(audio_seconds)) / 60.0) * minute_price

    return round(max(0.0, total), 6)
def estimate_audio_seconds_for_transcription(
    *,
    byte_size: int,
    file_path: Optional[str] = None,
    suffix: str = "",
) -> float:
    """Estimate the audio duration in seconds used for transcription costs.

    - WAV (suffix ``.wav`` / ``.wave`` and a non-empty path): the real length is
      read with the stdlib ``wave`` module when the file is readable.
    - Otherwise, or when reading fails: a byte heuristic of ``byte_size / 10000``.

    Both results are clamped to the range 3 .. 7200 seconds.
    """
    shortest, longest = 3.0, 7200.0
    extension = (suffix or "").lower()
    path = (file_path or "").strip()

    if path and extension in (".wav", ".wave"):
        try:
            import wave

            with wave.open(path, "rb") as reader:
                frame_count = reader.getnframes()
                frame_rate = reader.getframerate() or 1
            duration = frame_count / float(frame_rate)
            if duration > 0:
                return max(shortest, min(longest, float(duration)))
        except Exception:
            # Best effort only: any failure falls through to the byte heuristic.
            pass

    return float(max(shortest, min(longest, int(byte_size) / 10000.0)))
def budget_gate_blocked_payload_or_none(
    con: sqlite3.Connection,
    lic: LicenseBudgetRow,
    *,
    device_id: Optional[str],
    request_id: str,
    operation_type: str,
    model: str,
    gate_meta: Optional[Dict[str, Any]] = None,
) -> Optional[Dict[str, Any]]:
    """Run the budget gate before an OpenAI call.

    Returns ``None`` when the call is allowed. Otherwise a usage event with
    status ``blocked`` is recorded and a dict for the error response (intended
    as HTTP 402 content) is returned, with ``request_id`` filled in.
    """
    allowed, verdict = check_allows_openai_call(con, lic)
    if allowed:
        return None

    if not isinstance(verdict, dict):
        verdict = {"success": False, "error_code": "AI_BUDGET_EXCEEDED"}
    payload = dict(verdict)
    payload.update(request_id=request_id)

    start_ts, end_ts = _effective_period(lic.period_start, lic.period_end)
    event_meta = dict(gate_meta or {})
    event_meta.update(gate=operation_type)

    # A blocked call consumes nothing, so every counter and the cost are zero.
    blocked_event = dict(
        lic=lic,
        device_id=device_id,
        period_start=start_ts,
        period_end=end_ts,
        operation_type=operation_type,
        model=model,
        input_tokens=0,
        output_tokens=0,
        total_tokens=0,
        audio_seconds=0.0,
        estimated_cost_usd=0.0,
        request_id=request_id,
        status="blocked",
        error_code="AI_BUDGET_EXCEEDED",
        meta=event_meta,
    )
    insert_usage_event(con, **blocked_event)
    return payload
def record_success_after_openai(
    con: sqlite3.Connection,
    lic: LicenseBudgetRow,
    *,
    device_id: Optional[str],
    request_id: str,
    model: str,
    operation_type: str,
    input_tokens: int,
    output_tokens: int,
    total_tokens: int,
    audio_seconds: float,
) -> None:
    """Record a ``success`` usage event for a completed OpenAI call.

    The cost is estimated from the given token counts and audio length, and the
    event is stored under the license's effective billing period.
    """
    start_ts, end_ts = _effective_period(lic.period_start, lic.period_end)
    estimated = estimate_openai_cost_usd(
        model=model,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        audio_seconds=audio_seconds,
        operation_type=operation_type,
    )
    success_event = dict(
        lic=lic,
        device_id=device_id,
        period_start=start_ts,
        period_end=end_ts,
        operation_type=operation_type,
        model=model,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        total_tokens=total_tokens,
        audio_seconds=audio_seconds,
        estimated_cost_usd=estimated,
        request_id=request_id,
        status="success",
        meta={"op": operation_type},
    )
    insert_usage_event(con, **success_event)
def record_openai_error_event(
    con: sqlite3.Connection,
    lic: LicenseBudgetRow,
    *,
    device_id: Optional[str],
    request_id: str,
    model: str,
    operation_type: str,
    error_code: str,
    meta: Optional[Dict[str, Any]] = None,
) -> None:
    """Record an ``error`` usage event with zero tokens and zero cost.

    A falsy ``error_code`` is stored as ``openai_error``; the code is cut to
    64 characters.
    """
    start_ts, end_ts = _effective_period(lic.period_start, lic.period_end)
    stored_code = (error_code or "openai_error")[:64]
    error_event = dict(
        lic=lic,
        device_id=device_id,
        period_start=start_ts,
        period_end=end_ts,
        operation_type=operation_type,
        model=model,
        input_tokens=0,
        output_tokens=0,
        total_tokens=0,
        audio_seconds=0.0,
        estimated_cost_usd=0.0,
        request_id=request_id,
        status="error",
        error_code=stored_code,
        meta=meta,
    )
    insert_usage_event(con, **error_event)
def ensure_ai_budget_schema(con: sqlite3.Connection) -> None:
    """Create the ``ai_usage_events`` table and its indexes if missing, then commit.

    Every statement uses ``IF NOT EXISTS``, so calling this repeatedly is safe.
    """
    create_table = (
        "\n"
        "CREATE TABLE IF NOT EXISTS ai_usage_events (\n"
        "id TEXT PRIMARY KEY,\n"
        "created_at INTEGER NOT NULL,\n"
        "license_email TEXT,\n"
        "customer_id TEXT,\n"
        "subscription_id TEXT,\n"
        "practice_id TEXT,\n"
        "device_id_suffix TEXT,\n"
        "period_start INTEGER,\n"
        "period_end INTEGER,\n"
        "operation_type TEXT,\n"
        "model TEXT,\n"
        "input_tokens INTEGER DEFAULT 0,\n"
        "output_tokens INTEGER DEFAULT 0,\n"
        "total_tokens INTEGER DEFAULT 0,\n"
        "audio_seconds REAL DEFAULT 0,\n"
        "estimated_cost_usd REAL DEFAULT 0,\n"
        "request_id TEXT,\n"
        "status TEXT,\n"
        "error_code TEXT,\n"
        "meta_json TEXT\n"
        ")\n"
    )
    # Index matching the per-period usage sum (subscription, period, status).
    index_by_period = (
        "CREATE INDEX IF NOT EXISTS idx_ai_usage_sub_period "
        "ON ai_usage_events(subscription_id, period_start, period_end, status)"
    )
    index_by_created = (
        "CREATE INDEX IF NOT EXISTS idx_ai_usage_created ON ai_usage_events(created_at)"
    )
    for statement in (create_table, index_by_period, index_by_created):
        con.execute(statement)
    con.commit()
@dataclass(frozen=True)
class LicenseBudgetRow:
    """Immutable snapshot of the license columns used for budget accounting.

    Instances are built by the ``resolve_license_for_*`` functions from a row of
    the ``licenses`` table.
    """

    # Identifier that usage events are grouped and summed by.
    subscription_id: str
    customer_email: str
    customer_id: Optional[str]
    practice_id: Optional[str]
    # Reserved for plan-specific budgets; the current budget lookup ignores it.
    lookup_key: Optional[str]
    status: str
    # Taken from current_period_start / current_period_end; None when not stored.
    period_start: Optional[int]
    period_end: Optional[int]
def _effective_period(
    period_start: Optional[int],
    period_end: Optional[int],
) -> Tuple[int, int]:
    """Return a usable ``(start, end)`` pair of Unix timestamps for budget accounting.

    Usage is summed per exact ``(period_start, period_end)`` pair, so the result
    must be identical for every call that falls into the same period. The
    previous fallback derived the end from the current second (``now + 86400``),
    so no two events of a license without a stored period end ever shared a
    period and the usage sum stayed at zero.

    Rules:
        * ``period_end`` known: it is used as is; a missing start becomes
          ``end - 35 days`` (never below 0).
        * only ``period_start`` known: the start is kept and the end is the
          first multiple of 35 days after the start that lies in the future.
        * neither known: the current 35-day window aligned to the Unix epoch.

    Args:
        period_start: Stored period start, or None.
        period_end: Stored period end, or None.

    Returns:
        ``(start, end)`` as ints; stable for the duration of a window.
    """
    window = 3024000  # 35 days in seconds

    if period_end is not None:
        end_ts = int(period_end)
        if period_start is not None:
            return int(period_start), end_ts
        return max(0, end_ts - window), end_ts

    now = int(time.time())
    if period_start is not None:
        start_ts = int(period_start)
        # Roll the end forward in whole windows so it only changes at a window
        # boundary, not on every call.
        elapsed_windows = max(0, (now - start_ts) // window)
        return start_ts, start_ts + (elapsed_windows + 1) * window

    end_ts = (now // window + 1) * window
    return end_ts - window, end_ts
def resolve_license_for_practice_id(con: sqlite3.Connection, practice_id: str) -> Optional[LicenseBudgetRow]:
    """Map a tenant ``practice_id`` to its most recently updated active license.

    Use this only when ``practice_id`` comes from an authenticated session (no
    guessing). Returns ``None`` for a blank id or when no active license matches.
    """
    wanted = (practice_id or "").strip()
    if not wanted:
        return None

    query = (
        "\n"
        "SELECT subscription_id, customer_id, customer_email, practice_id, lookup_key, status,\n"
        "current_period_start, current_period_end\n"
        "FROM licenses\n"
        "WHERE practice_id = ?\n"
        "AND status = 'active'\n"
        "ORDER BY updated_at DESC\n"
        "LIMIT 1\n"
    )
    found = con.execute(query, (wanted,)).fetchone()
    if not found:
        return None

    sub_id, cust_id, email, pid, lookup, status, starts, ends = found
    return LicenseBudgetRow(
        subscription_id=str(sub_id or ""),
        customer_email=str(email or ""),
        customer_id=str(cust_id) if cust_id else None,
        practice_id=str(pid).strip() if pid else None,
        lookup_key=str(lookup).strip() if lookup else None,
        status=str(status or ""),
        period_start=int(starts) if starts is not None else None,
        period_end=int(ends) if ends is not None else None,
    )
def resolve_license_for_empfang(
    con: sqlite3.Connection,
    *,
    x_device_id: Optional[str],
    session_practice_id: Optional[str],
) -> Optional[LicenseBudgetRow]:
    """Resolve the license for the reception ("Empfang") context.

    The X-Device-Id header is tried first (same lookup as the desktop device
    bindings), then the session's practice id. If the header is set but the
    license found for the device carries a practice id that differs from the
    session's, the device match is discarded rather than guessed at.
    """
    device_header = (x_device_id or "").strip() or None
    practice = (session_practice_id or "").strip() or None

    by_device: Optional[LicenseBudgetRow] = (
        resolve_license_for_device(con, device_header) if device_header else None
    )
    if by_device and practice:
        bound_practice = (by_device.practice_id or "").strip()
        if bound_practice and bound_practice != practice:
            # Device and session point at different practices: drop the device match.
            by_device = None

    if by_device:
        return by_device
    return resolve_license_for_practice_id(con, practice) if practice else None
def resolve_license_for_device(con: sqlite3.Connection, device_id: Optional[str]) -> Optional[LicenseBudgetRow]:
    """Map an X-Device-Id to an active license via its device hash.

    The hashed id is looked up in ``device_bindings`` (active bindings only).
    The binding's e-mail is then matched case-insensitively against active
    licenses, and the most recently updated one wins. Returns ``None`` whenever
    a step finds nothing.
    """
    cleaned = str(device_id).strip() if device_id else ""
    if not cleaned:
        return None

    binding_query = (
        "\n"
        "SELECT d.customer_email, d.user_key\n"
        "FROM device_bindings d\n"
        "WHERE d.device_hash = ? AND COALESCE(d.is_active, 1) = 1\n"
        "LIMIT 1\n"
    )
    binding = con.execute(binding_query, (_hash_device_id(cleaned),)).fetchone()
    if not binding:
        return None

    email = str(binding[0] or "").strip()
    if not email:
        return None

    license_query = (
        "\n"
        "SELECT subscription_id, customer_id, customer_email, practice_id, lookup_key, status,\n"
        "current_period_start, current_period_end\n"
        "FROM licenses\n"
        "WHERE lower(customer_email) = lower(?)\n"
        "AND status = 'active'\n"
        "ORDER BY updated_at DESC\n"
        "LIMIT 1\n"
    )
    found = con.execute(license_query, (email,)).fetchone()
    if not found:
        return None

    sub_id, cust_id, lic_email, pid, lookup, status, starts, ends = found
    return LicenseBudgetRow(
        subscription_id=str(sub_id or ""),
        # Fall back to the binding's e-mail when the license row has none.
        customer_email=str(lic_email or email),
        customer_id=str(cust_id) if cust_id else None,
        practice_id=str(pid).strip() if pid else None,
        lookup_key=str(lookup).strip() if lookup else None,
        status=str(status or ""),
        period_start=int(starts) if starts is not None else None,
        period_end=int(ends) if ends is not None else None,
    )
def monthly_budget_usd_for_license(_row: LicenseBudgetRow) -> float:
    """Return the AI budget in USD for a license.

    Phase 1: the fixed module default, never below 0.01. The row is ignored for
    now; a lookup_key-dependent budget is planned.
    """
    _ = _row
    configured = float(DEFAULT_MONTHLY_AI_BUDGET_USD)
    return configured if configured > 0.01 else 0.01
def sum_usage_usd_for_period(
    con: sqlite3.Connection,
    subscription_id: str,
    period_start: int,
    period_end: int,
) -> float:
    """Sum ``estimated_cost_usd`` over the ``success`` events of one period.

    Only events whose subscription id, period start and period end all match
    exactly are counted. Returns 0.0 when nothing matches.
    """
    query = (
        "\n"
        "SELECT COALESCE(SUM(estimated_cost_usd), 0)\n"
        "FROM ai_usage_events\n"
        "WHERE subscription_id = ?\n"
        "AND period_start = ? AND period_end = ?\n"
        "AND status = 'success'\n"
    )
    cursor = con.execute(query, (subscription_id, period_start, period_end))
    (total,) = cursor.fetchone()
    return float(total or 0.0)
def compute_budget_snapshot(con: sqlite3.Connection, lic: LicenseBudgetRow) -> Dict[str, Any]:
    """Build the budget state of a license for its effective period.

    The result holds the budget, the used and remaining USD, the remaining share
    as a whole percentage clamped to 0..100, the period bounds, and a
    ``show_warning`` flag that is set while 1..20 % remain.
    """
    start_ts, end_ts = _effective_period(lic.period_start, lic.period_end)
    budget = monthly_budget_usd_for_license(lic)
    used = sum_usage_usd_for_period(con, lic.subscription_id, start_ts, end_ts)
    remaining = max(0.0, budget - used)

    if budget > 0:
        percent = int((remaining / budget) * 100.0)
    else:
        percent = 0
    percent = max(0, min(100, percent))

    return {
        "ok": True,
        "active": lic.status == "active",
        "budget_usd": round(budget, 2),
        "used_usd": round(used, 4),
        "remaining_usd": round(remaining, 4),
        "available_percent": percent,
        "period_start": start_ts,
        "period_end": end_ts,
        "subscription_id": lic.subscription_id,
        "show_warning": 0 < percent <= 20,
        "user_label": f"KI-Kontingent: {percent} % verfügbar",
    }
def check_allows_openai_call(
    con: sqlite3.Connection,
    lic: LicenseBudgetRow,
) -> Tuple[bool, Dict[str, Any]]:
    """Decide whether the license may make another OpenAI call.

    Returns ``(True, snapshot)`` while the used amount is below the budget.
    Otherwise returns ``(False, payload)``, where the payload carries the
    ``AI_BUDGET_EXCEEDED`` error for the client and an empty ``request_id``
    for the caller to fill in.
    """
    snapshot = compute_budget_snapshot(con, lic)
    budget = float(snapshot["budget_usd"])
    used = float(snapshot["used_usd"])

    # The small epsilon treats "used == budget" as exhausted despite float noise.
    if not used >= budget - 1e-9:
        return True, snapshot

    denial: Dict[str, Any] = {
        "success": False,
        "error_code": "AI_BUDGET_EXCEEDED",
        "message_user": (
            "Das monatliche KI-Kontingent dieser Lizenz ist aufgebraucht.\n\n"
            "Bitte wenden Sie sich an den Praxisadministrator."
        ),
        "available_percent": 0,
        "period_end": int(snapshot["period_end"]),
        "request_id": "",
        "estimate_engine": _ESTIMATE_MARKER,
    }
    return False, denial
def _usage_meta_json(meta: Dict[str, Any], marker: str, limit: int = 4000) -> str:
    """Serialise event metadata to valid JSON of at most ``limit`` characters."""

    def dumps(obj: Any) -> str:
        # default=str keeps values such as datetimes or sets from aborting the
        # insert; this metadata is diagnostic only.
        return json.dumps(obj, ensure_ascii=False, default=str)

    try:
        text = dumps(meta)
    except (TypeError, ValueError):
        return dumps({"engine": marker, "unserializable": True})
    if len(text) <= limit:
        return text

    # Too large: keep the entries that still fit instead of cutting the string,
    # which would leave invalid JSON in the column.
    kept: Dict[str, Any] = {"engine": marker, "truncated": True}
    for key, value in meta.items():
        candidate = dict(kept)
        candidate[key] = value
        if len(dumps(candidate)) <= limit:
            kept = candidate
    return dumps(kept)


def insert_usage_event(
    con: sqlite3.Connection,
    *,
    lic: Optional[LicenseBudgetRow],
    device_id: Optional[str],
    period_start: int,
    period_end: int,
    operation_type: str,
    model: str,
    input_tokens: int,
    output_tokens: int,
    total_tokens: int,
    audio_seconds: float,
    estimated_cost_usd: float,
    request_id: str,
    status: str,
    error_code: Optional[str] = None,
    meta: Optional[Dict[str, Any]] = None,
) -> str:
    """Insert one row into ``ai_usage_events`` and commit.

    Args:
        con: Open SQLite connection; the table must already exist.
        lic: License the event belongs to; with ``None`` the license columns
            are stored as NULL.
        device_id: Raw device id. Only the first 12 hex characters of its
            SHA-256 digest are stored.
        period_start, period_end: Period the event is accounted to.
        operation_type, model, request_id, status, error_code: Stored after
            being cut to 32 / 128 / 64 / 16 / 64 characters.
        input_tokens, output_tokens, total_tokens, audio_seconds,
        estimated_cost_usd: Usage figures, coerced to int / float.
        meta: Optional extra data. Entries whose key contains ``key``,
            ``token`` or ``secret`` (case-insensitive) are dropped so secrets
            are not persisted. Keys are converted to ``str`` and cut to 64
            characters.

    Returns:
        The generated event id (UUID4 string).

    Compared with the earlier version, metadata handling is robust: non-string
    keys no longer raise, non-serialisable values no longer abort the insert,
    and oversized metadata is reduced to a valid JSON document instead of being
    sliced mid-string.
    """
    eid = str(uuid.uuid4())
    dev_suf = (
        hashlib.sha256(str(device_id).encode("utf-8")).hexdigest()[:12] if device_id else None
    )

    meta_clean: Dict[str, Any] = {"engine": _ESTIMATE_MARKER}
    for raw_key, value in (meta or {}).items():
        key = str(raw_key)
        lowered = key.lower()
        if "key" in lowered or "token" in lowered or "secret" in lowered:
            continue
        meta_clean[key[:64]] = value
    meta_json = _usage_meta_json(meta_clean, _ESTIMATE_MARKER)

    sql = (
        "\n"
        "INSERT INTO ai_usage_events(\n"
        "id, created_at, license_email, customer_id, subscription_id, practice_id, device_id_suffix,\n"
        "period_start, period_end, operation_type, model,\n"
        "input_tokens, output_tokens, total_tokens, audio_seconds, estimated_cost_usd,\n"
        "request_id, status, error_code, meta_json\n"
        ") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)\n"
    )
    con.execute(
        sql,
        (
            eid,
            int(time.time()),
            (lic.customer_email if lic else None),
            (lic.customer_id if lic else None),
            (lic.subscription_id if lic else None),
            (lic.practice_id if lic else None),
            dev_suf,
            period_start,
            period_end,
            (operation_type or "")[:32],
            (model or "")[:128],
            int(input_tokens),
            int(output_tokens),
            int(total_tokens),
            float(audio_seconds),
            float(estimated_cost_usd),
            (request_id or "")[:64],
            (status or "")[:16],
            error_code[:64] if error_code else None,
            meta_json,
        ),
    )
    con.commit()
    return eid
def budget_json_for_client(snap: Dict[str, Any], *, include_operator_fields: bool = False) -> Dict[str, Any]:
    """Shape a budget snapshot into a client response.

    The desktop client gets the percentage view only. With
    ``include_operator_fields=True`` (admin view) the USD amounts, the period
    start and the subscription id are passed through from the snapshot as well.
    """
    payload: Dict[str, Any] = {}
    payload["ok"] = bool(snap.get("ok", True))
    payload["active"] = bool(snap.get("active", False))
    payload["available_percent"] = int(snap.get("available_percent", 0))
    payload["period_end"] = int(snap.get("period_end", 0))
    payload["show_warning"] = bool(snap.get("show_warning", False))
    payload["user_label"] = snap.get("user_label") or ""

    if include_operator_fields:
        operator_fields = (
            "budget_usd",
            "used_usd",
            "remaining_usd",
            "period_start",
            "subscription_id",
        )
        for field_name in operator_fields:
            payload[field_name] = snap.get(field_name)
    return payload