Files
aza/AzA march 2026/aza_ai_budget.py

654 lines
21 KiB
Python
Raw Permalink Normal View History

2026-05-20 00:09:28 +02:00
# -*- coding: utf-8 -*-
"""
Serverseitiges KI-Budget (Phase 1): SQLite-Events, Schätzkosten, Gating pro Subscription-Periode.
Keine Secrets loggen. Alle Beträge sind Schätzungen (estimated_cost_usd).
"""
from __future__ import annotations
import hashlib
import json
import os
2026-05-23 21:31:34 +02:00
import re
2026-05-20 00:09:28 +02:00
import sqlite3
import time
import uuid
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple
2026-05-23 21:31:34 +02:00
# Default: 20 USD = 100 % per Stripe billing period (the AZA_AI_BUDGET_USD_DEFAULT env var takes precedence).
# NOTE(review): a non-numeric env value makes float() raise ValueError while the module is imported.
DEFAULT_MONTHLY_AI_BUDGET_USD: float = float(os.environ.get("AZA_AI_BUDGET_USD_DEFAULT", "20.0"))
2026-05-20 00:09:28 +02:00
# Conservative estimate; may later become overridable per lookup_key.
_ESTIMATE_MARKER = "openai_estimate_v1"
# USD per 1M tokens (input / output) - central constants, nothing scattered through the code.
# Values are deliberately conservative; adjust only here or via ENV overrides (optional, later).
_MODEL_USD_PER_1M_INPUT: Dict[str, float] = {
    "gpt-4o-mini": 0.15,
    "gpt-4o": 2.5,
    "gpt-5.2": 5.0,
    "gpt-5-mini": 1.0,
    "gpt-5-nano": 0.3,
    "gpt-4o-mini-transcribe": 3.0,  # approximation for the audio model
    "whisper-1": 1.0,
}
_MODEL_USD_PER_1M_OUTPUT: Dict[str, float] = {
    "gpt-4o-mini": 0.60,
    "gpt-4o": 10.0,
    "gpt-5.2": 15.0,
    "gpt-5-mini": 4.0,
    "gpt-5-nano": 1.2,
    "gpt-4o-mini-transcribe": 0.0,
    "whisper-1": 0.0,
}
# Transcription: USD per minute (used when only the audio length is known).
# The "default" entry is the fallback rate for models without their own entry.
_TRANSCRIBE_USD_PER_MINUTE: Dict[str, float] = {
    "gpt-4o-mini-transcribe": 0.012,
    "whisper-1": 0.006,
    "default": 0.012,
}
def _hash_device_id(device_id: str) -> str:
    """Return the hex-encoded SHA-256 digest of the UTF-8 encoded device id."""
    digest = hashlib.sha256()
    digest.update(device_id.encode("utf-8"))
    return digest.hexdigest()
2026-05-23 21:31:34 +02:00
# OpenAI often returns dated model IDs (e.g. gpt-4o-mini-2024-07-18); this pattern matches
# only a trailing "-YYYY-MM-DD" so that just the date at the end gets stripped.
_MODEL_DATE_SUFFIX_RE = re.compile(r"-\d{4}-\d{2}-\d{2}$")
2026-05-20 00:09:28 +02:00
def _norm_model(model: str) -> str:
    """
    Map a raw model id to the key used in the pricing tables.

    The id is stripped and lower-cased. Any ``gpt-*`` id containing
    ``-transcribe`` collapses to ``gpt-4o-mini-transcribe``. A trailing
    ``-YYYY-MM-DD`` date is removed only when the remaining base name is a
    known key of the input-price table; otherwise the cleaned id is returned
    unchanged.
    """
    name = (model or "").strip().lower()
    is_gpt_transcribe = name.startswith("gpt-") and "-transcribe" in name
    if is_gpt_transcribe:
        return "gpt-4o-mini-transcribe"
    undated, replaced = _MODEL_DATE_SUFFIX_RE.subn("", name)
    if replaced and undated in _MODEL_USD_PER_1M_INPUT:
        return undated
    return name
def estimate_openai_cost_usd(
    *,
    model: str,
    input_tokens: int = 0,
    output_tokens: int = 0,
    audio_seconds: float = 0.0,
    operation_type: str = "other",
) -> float:
    """
    Uniform cost estimate in USD; always conservative, real billing may differ.

    Token cost uses the per-1M-token tables for the normalized model. A model
    missing from the input table is priced at 5.0 / 15.0 USD per 1M
    input / output tokens. A positive ``audio_seconds`` adds a per-minute
    charge (model-specific rate, else the table's ``"default"`` rate).
    ``operation_type`` is accepted but does not influence the result.
    Negative token counts are treated as zero. The result is rounded to six
    decimals and is never negative.
    """
    key = _norm_model(model)
    input_rate = _MODEL_USD_PER_1M_INPUT.get(key)
    if input_rate is None:
        # Unknown model: price it like an expensive chat model.
        input_rate, output_rate = 5.0, 15.0
    else:
        output_rate = _MODEL_USD_PER_1M_OUTPUT.get(key)
        if output_rate is None:
            output_rate = 0.0

    billable_in = max(0, int(input_tokens))
    total = (billable_in / 1_000_000.0) * input_rate
    billable_out = max(0, int(output_tokens))
    total += (billable_out / 1_000_000.0) * output_rate

    if audio_seconds > 0:
        minute_rate = _TRANSCRIBE_USD_PER_MINUTE.get(key) or _TRANSCRIBE_USD_PER_MINUTE["default"]
        minutes = max(0.0, float(audio_seconds)) / 60.0
        total += minutes * minute_rate

    return round(max(0.0, total), 6)
def estimate_audio_seconds_for_transcription(
    *,
    byte_size: int,
    file_path: Optional[str] = None,
    suffix: str = "",
) -> float:
    """
    Estimate the audio duration in seconds used for transcription cost.

    - WAV: the real length is read with the stdlib ``wave`` module when the
      file is readable.
    - Otherwise: conservative byte heuristic (``byte_size / 10000``).

    The result is always clamped to the range 3 to 7200 seconds.

    Args:
        byte_size: Size of the upload in bytes; ``None`` is treated as 0.
        file_path: Optional path of the audio file on disk.
        suffix: File extension, with or without leading dot, any case. When
            empty, the extension of ``file_path`` is used instead.

    Returns:
        Estimated duration in seconds as a float within [3.0, 7200.0].
    """
    path = (file_path or "").strip()
    ext = (suffix or "").strip().lower()
    if not ext and path:
        # No explicit suffix given: fall back to the extension of the path.
        ext = os.path.splitext(path)[1].lower()
    if ext and not ext.startswith("."):
        ext = "." + ext

    if path and ext in (".wav", ".wave"):
        try:
            import wave

            with wave.open(path, "rb") as wav_file:
                frame_count = wav_file.getnframes()
                frame_rate = wav_file.getframerate() or 1
            duration = frame_count / float(frame_rate)
            if duration > 0:
                return max(3.0, min(7200.0, float(duration)))
        except Exception:
            # Deliberate best effort: an unreadable or non-PCM file falls
            # through to the byte heuristic instead of failing the request.
            pass

    return float(max(3.0, min(7200.0, int(byte_size or 0) / 10000.0)))
def budget_gate_blocked_payload_or_none(
    con: sqlite3.Connection,
    lic: LicenseBudgetRow,
    *,
    device_id: Optional[str],
    request_id: str,
    operation_type: str,
    model: str,
    gate_meta: Optional[Dict[str, Any]] = None,
) -> Optional[Dict[str, Any]]:
    """
    Gate an OpenAI call against the budget of ``lic``.

    Returns ``None`` when the call is allowed. Otherwise returns the dict to
    send as HTTP 402 content and records a usage event with status
    ``"blocked"``.

    When the license has a practice id, an automatic top-up is attempted
    first (only if the ``aza_ai_credit`` module reports it as enabled). If the
    top-up was charged and the budget check passes afterwards, the call is
    allowed. Top-up failures only annotate the returned payload; any
    exception in that path is swallowed so the gate itself never fails
    because of it.
    """
    allowed, info = check_allows_openai_call(con, lic)
    if allowed:
        return None

    def _failure_copy(current: Any) -> Dict[str, Any]:
        # Work on a copy so the dict returned by the budget check is not mutated.
        if isinstance(current, dict):
            return dict(current)
        return {"success": False, "error_code": "AI_BUDGET_EXCEEDED"}

    practice = (lic.practice_id or "").strip()
    if practice:
        try:
            from aza_ai_credit import _auto_topup_enabled_env, maybe_run_auto_topup_for_practice

            snapshot = compute_budget_snapshot(con, lic)
            percent_left = int(snapshot.get("available_percent", 0))
            if _auto_topup_enabled_env():
                topup = maybe_run_auto_topup_for_practice(
                    con,
                    practice_id=practice,
                    available_percent=percent_left,
                    dry_run=False,
                    subscription_id=lic.subscription_id,
                    customer_email=lic.customer_email,
                )
                if topup.get("charged"):
                    # Re-check: the fresh credit may already unblock the call.
                    allowed_now, _ = check_allows_openai_call(con, lic)
                    if allowed_now:
                        return None
                if topup.get("reason") == "auto_topup_monthly_limit_reached":
                    info = _failure_copy(info)
                    info["auto_topup_failed"] = True
                    info["error_code"] = "AUTO_TOPUP_MONTHLY_LIMIT_REACHED"
                    info["message_user"] = topup.get("message_user") or (
                        "Das Monatslimit für automatische Aufladungen ist erreicht. "
                        "Sie können weiterhin manuell Zusatzguthaben kaufen."
                    )
                elif topup.get("paused") or topup.get("reason") == "payment_failed":
                    info = _failure_copy(info)
                    info["auto_topup_failed"] = True
                    info["message_user"] = (
                        "Auto-Aufladung konnte nicht durchgeführt werden. "
                        "Bitte laden Sie manuell Zusatzguthaben auf."
                    )
        except Exception:
            # Best effort: a failing top-up path must not break the gate.
            pass

    payload = _failure_copy(info)
    payload["request_id"] = request_id

    period_from, period_to = _effective_period(lic.period_start, lic.period_end)
    event_meta = dict(gate_meta or {})
    event_meta["gate"] = operation_type
    insert_usage_event(
        con,
        lic=lic,
        device_id=device_id,
        period_start=period_from,
        period_end=period_to,
        operation_type=operation_type,
        model=model,
        input_tokens=0,
        output_tokens=0,
        total_tokens=0,
        audio_seconds=0.0,
        estimated_cost_usd=0.0,
        request_id=request_id,
        status="blocked",
        error_code="AI_BUDGET_EXCEEDED",
        meta=event_meta,
    )
    return payload
def record_success_after_openai(
    con: sqlite3.Connection,
    lic: LicenseBudgetRow,
    *,
    device_id: Optional[str],
    request_id: str,
    model: str,
    operation_type: str,
    input_tokens: int,
    output_tokens: int,
    total_tokens: int,
    audio_seconds: float,
) -> None:
    """
    Record a successful OpenAI call as a usage event with status "success".

    The estimated cost is derived from the model, the token counts and the
    audio length; the event is stored under the effective period of ``lic``.
    """
    period_from, period_to = _effective_period(lic.period_start, lic.period_end)
    usage = {
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "audio_seconds": audio_seconds,
    }
    estimated = estimate_openai_cost_usd(
        model=model,
        operation_type=operation_type,
        **usage,
    )
    insert_usage_event(
        con,
        lic=lic,
        device_id=device_id,
        period_start=period_from,
        period_end=period_to,
        operation_type=operation_type,
        model=model,
        total_tokens=total_tokens,
        estimated_cost_usd=estimated,
        request_id=request_id,
        status="success",
        meta={"op": operation_type},
        **usage,
    )
def record_openai_error_event(
    con: sqlite3.Connection,
    lic: LicenseBudgetRow,
    *,
    device_id: Optional[str],
    request_id: str,
    model: str,
    operation_type: str,
    error_code: str,
    meta: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Record a failed OpenAI call as a usage event with status "error".

    The event carries zero tokens and zero cost. An empty ``error_code``
    becomes "openai_error"; the code is cut to 64 characters.
    """
    window = _effective_period(lic.period_start, lic.period_end)
    code = (error_code or "openai_error")[:64]
    zero_usage = {
        "input_tokens": 0,
        "output_tokens": 0,
        "total_tokens": 0,
        "audio_seconds": 0.0,
        "estimated_cost_usd": 0.0,
    }
    insert_usage_event(
        con,
        lic=lic,
        device_id=device_id,
        period_start=window[0],
        period_end=window[1],
        operation_type=operation_type,
        model=model,
        request_id=request_id,
        status="error",
        error_code=code,
        meta=meta,
        **zero_usage,
    )
def ensure_ai_budget_schema(con: sqlite3.Connection) -> None:
    """Create the ai_usage_events table and its indexes if missing, then commit (idempotent)."""
    ddl_statements = (
        """
        CREATE TABLE IF NOT EXISTS ai_usage_events (
        id TEXT PRIMARY KEY,
        created_at INTEGER NOT NULL,
        license_email TEXT,
        customer_id TEXT,
        subscription_id TEXT,
        practice_id TEXT,
        device_id_suffix TEXT,
        period_start INTEGER,
        period_end INTEGER,
        operation_type TEXT,
        model TEXT,
        input_tokens INTEGER DEFAULT 0,
        output_tokens INTEGER DEFAULT 0,
        total_tokens INTEGER DEFAULT 0,
        audio_seconds REAL DEFAULT 0,
        estimated_cost_usd REAL DEFAULT 0,
        request_id TEXT,
        status TEXT,
        error_code TEXT,
        meta_json TEXT
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_ai_usage_sub_period "
        "ON ai_usage_events(subscription_id, period_start, period_end, status)",
        "CREATE INDEX IF NOT EXISTS idx_ai_usage_created ON ai_usage_events(created_at)",
    )
    # Table first, then the indexes that reference it.
    for statement in ddl_statements:
        con.execute(statement)
    con.commit()
@dataclass(frozen=True)
class LicenseBudgetRow:
    """Immutable set of license fields used by the budget functions of this module."""

    # Identity of the subscription and its owner.
    subscription_id: str
    customer_email: str
    customer_id: Optional[str]
    practice_id: Optional[str]
    # Plan lookup key and license status as stored in the licenses table.
    lookup_key: Optional[str]
    status: str
    # Billing period as Unix timestamps; None when the license row has no value.
    period_start: Optional[int]
    period_end: Optional[int]
def _effective_period(
    period_start: Optional[int],
    period_end: Optional[int],
) -> Tuple[int, int]:
    """
    Return a usable ``(start, end)`` pair of Unix timestamps for a license.

    - End known: the given end is used; a missing start falls back to
      ``end - 35 days`` (never below 0).
    - End unknown: a stable 35-day window containing the current time is
      returned. Windows are counted from ``period_start`` when it is known,
      otherwise from the Unix epoch.

    The unknown-end case must be stable between calls because usage is
    summed by exact ``(period_start, period_end)`` match; an end derived
    from the current second would give every event its own period, so
    earlier usage would never be counted against the budget.
    """
    window = 3024000  # 35 days in seconds
    if period_end is not None:
        end = int(period_end)
        start = int(period_start) if period_start is not None else max(0, end - window)
        return start, end

    now = int(time.time())
    anchor = int(period_start) if period_start is not None else 0
    # Number of whole windows elapsed since the anchor; a future anchor
    # yields 0, i.e. the first window.
    elapsed_windows = max(0, (now - anchor) // window)
    start = anchor + elapsed_windows * window
    return start, start + window
def resolve_license_for_practice_id(con: sqlite3.Connection, practice_id: str) -> Optional[LicenseBudgetRow]:
    """
    Map a tenant practice_id to its most recently updated active license.

    Use only when practice_id comes from an authenticated session (no guessing).
    Returns None for an empty practice_id or when no active license exists.
    """
    wanted = (practice_id or "").strip()
    if not wanted:
        return None

    cursor = con.execute(
        """
        SELECT subscription_id, customer_id, customer_email, practice_id, lookup_key, status,
        current_period_start, current_period_end
        FROM licenses
        WHERE practice_id = ?
        AND status = 'active'
        ORDER BY updated_at DESC
        LIMIT 1
        """,
        (wanted,),
    )
    record = cursor.fetchone()
    if not record:
        return None

    # Column order follows the SELECT list above.
    sub_id, cust_id, email, row_practice, lookup, status, start, end = record
    return LicenseBudgetRow(
        subscription_id=str(sub_id or ""),
        customer_email=str(email or ""),
        customer_id=str(cust_id) if cust_id else None,
        practice_id=str(row_practice).strip() if row_practice else None,
        lookup_key=str(lookup).strip() if lookup else None,
        status=str(status or ""),
        period_start=int(start) if start is not None else None,
        period_end=int(end) if end is not None else None,
    )
def resolve_license_for_empfang(
    con: sqlite3.Connection,
    *,
    x_device_id: Optional[str],
    session_practice_id: Optional[str],
) -> Optional[LicenseBudgetRow]:
    """
    Resolve the license for reception/desktop requests.

    When ``session_practice_id`` is set, the license of that practice is
    tried first; afterwards the device/e-mail lookup is used as fallback.
    A device license whose practice_id differs from the requested practice
    id is not used. A device license without any practice_id is still
    accepted as fallback.

    Returns None when neither lookup yields a usable license.
    """
    device_header = (x_device_id or "").strip() or None
    wanted_practice = (session_practice_id or "").strip() or None

    if wanted_practice:
        by_practice = resolve_license_for_practice_id(con, wanted_practice)
        if by_practice:
            return by_practice

    if not device_header:
        return None
    by_device = resolve_license_for_device(con, device_header)
    if not by_device:
        return None

    # Do not attribute usage of one practice to the license of another one.
    device_practice = (by_device.practice_id or "").strip()
    if wanted_practice and device_practice and device_practice != wanted_practice:
        return None
    return by_device
def resolve_license_for_device(con: sqlite3.Connection, device_id: Optional[str]) -> Optional[LicenseBudgetRow]:
    """
    Map an X-Device-Id to an active license via its device hash.

    The hashed device id is looked up in device_bindings (active bindings
    only); the bound customer e-mail then selects the most recently updated
    active license, compared case-insensitively. Returns None when any step
    finds nothing.
    """
    cleaned = str(device_id).strip() if device_id else ""
    if not cleaned:
        return None

    binding = con.execute(
        """
        SELECT d.customer_email, d.user_key
        FROM device_bindings d
        WHERE d.device_hash = ? AND COALESCE(d.is_active, 1) = 1
        LIMIT 1
        """,
        (_hash_device_id(cleaned),),
    ).fetchone()
    if not binding:
        return None

    bound_email = str(binding[0] or "").strip()
    if not bound_email:
        return None

    record = con.execute(
        """
        SELECT subscription_id, customer_id, customer_email, practice_id, lookup_key, status,
        current_period_start, current_period_end
        FROM licenses
        WHERE lower(customer_email) = lower(?)
        AND status = 'active'
        ORDER BY updated_at DESC
        LIMIT 1
        """,
        (bound_email,),
    ).fetchone()
    if not record:
        return None

    # Column order follows the SELECT list above.
    sub_id, cust_id, email, row_practice, lookup, status, start, end = record
    return LicenseBudgetRow(
        subscription_id=str(sub_id or ""),
        customer_email=str(email or bound_email),
        customer_id=str(cust_id) if cust_id else None,
        practice_id=str(row_practice).strip() if row_practice else None,
        lookup_key=str(lookup).strip() if lookup else None,
        status=str(status or ""),
        period_start=int(start) if start is not None else None,
        period_end=int(end) if end is not None else None,
    )
def monthly_budget_usd_for_license(_row: LicenseBudgetRow) -> float:
    """Phase 1: fixed default budget for every license (at least 0.01 USD); later lookup_key dependent."""
    # The license row is intentionally not consulted yet.
    configured = float(DEFAULT_MONTHLY_AI_BUDGET_USD)
    return configured if configured > 0.01 else 0.01
def sum_usage_usd_for_period(
    con: sqlite3.Connection,
    subscription_id: str,
    period_start: int,
    period_end: int,
) -> float:
    """
    Sum the estimated cost of all "success" events of one subscription.

    Only events stored with exactly this period_start and period_end are
    counted; the result is 0.0 when nothing matches.
    """
    query = """
        SELECT COALESCE(SUM(estimated_cost_usd), 0)
        FROM ai_usage_events
        WHERE subscription_id = ?
        AND period_start = ? AND period_end = ?
        AND status = 'success'
        """
    params = (subscription_id, period_start, period_end)
    result = con.execute(query, params).fetchone()
    total = result[0]
    return float(total or 0.0)
def compute_budget_snapshot(con: sqlite3.Connection, lic: LicenseBudgetRow) -> Dict[str, Any]:
    """
    Build the budget snapshot (budget, usage, remaining amount, percent) for a license.

    The base snapshot covers the monthly budget of the effective period.
    When the license has a practice id, the snapshot is passed through the
    ``aza_ai_credit`` helpers together with the remaining extra credit and
    their result is returned instead. Any exception in that optional step
    is swallowed and the base snapshot is returned.
    """
    period_from, period_to = _effective_period(lic.period_start, lic.period_end)
    monthly = monthly_budget_usd_for_license(lic)
    spent = sum_usage_usd_for_period(con, lic.subscription_id, period_from, period_to)
    left = max(0.0, monthly - spent)

    if monthly > 0:
        percent = int((left / monthly) * 100.0)
    else:
        percent = 0
    percent = max(0, min(100, percent))
    # Warn in the last fifth of the budget, but not once it is fully used.
    low_budget = 0 < percent <= 20

    snapshot = {
        "ok": True,
        "active": lic.status == "active",
        "budget_usd": round(monthly, 2),
        "used_usd": round(spent, 4),
        "remaining_usd": round(left, 4),
        "available_percent": percent,
        "period_start": period_from,
        "period_end": period_to,
        "subscription_id": lic.subscription_id,
        "practice_id": lic.practice_id,
        "show_warning": low_budget,
        "user_label": f"KI-Kontingent: {percent} % verfügbar",
    }

    try:
        from aza_ai_credit import (
            apply_extra_credit_to_snapshot,
            compute_extra_credit_remaining,
            ensure_ai_credit_schema,
        )

        ensure_ai_credit_schema(con)
        practice = (lic.practice_id or "").strip()
        if practice:
            extra = compute_extra_credit_remaining(
                con,
                practice_id=practice,
                subscription_id=lic.subscription_id,
                monthly_budget_usd=monthly,
            )
            return apply_extra_credit_to_snapshot(snapshot, extra_remaining=extra, monthly_budget=monthly)
    except Exception:
        # Best effort: without the credit module the base snapshot is used.
        pass
    return snapshot
2026-05-20 00:09:28 +02:00
def check_allows_openai_call(
    con: sqlite3.Connection,
    lic: LicenseBudgetRow,
) -> Tuple[bool, Dict[str, Any]]:
    """
    Decide whether the license may make another OpenAI call.

    Returns ``(True, snapshot)`` while monthly budget plus extra credit is
    left, otherwise ``(False, error_payload)`` with error code
    "AI_BUDGET_EXCEEDED".
    """
    snapshot = compute_budget_snapshot(con, lic)
    monthly_left = max(0.0, float(snapshot["budget_usd"]) - float(snapshot["used_usd"]))
    extra_left = float(snapshot.get("extra_credit_remaining_usd") or 0.0)

    # Tiny epsilon so float noise around zero still counts as exhausted.
    exhausted = monthly_left + extra_left <= 1e-9
    if exhausted:
        blocked_payload = {
            "success": False,
            "error_code": "AI_BUDGET_EXCEEDED",
            "message_user": (
                "Ihr KI-Guthaben ist aufgebraucht. Sie können Zusatzguthaben kaufen, "
                "um sofort weiterzuarbeiten."
            ),
            "available_percent": 0,
            "period_end": int(snapshot["period_end"]),
            "request_id": "",
            "estimate_engine": _ESTIMATE_MARKER,
        }
        return False, blocked_payload
    return True, snapshot
def _usage_meta_to_json(meta_clean: Dict[str, Any], limit: int = 4000) -> str:
    """Serialize event metadata to valid JSON of at most ``limit`` characters."""
    try:
        encoded = json.dumps(meta_clean, ensure_ascii=False, default=str)
    except (TypeError, ValueError):
        encoded = ""
    if encoded and len(encoded) <= limit:
        return encoded

    # Too large (or not serializable as a whole): keep the entries that fit
    # rather than cutting the JSON text in the middle, which would store an
    # unparseable document.
    reduced: Dict[str, Any] = {"engine": meta_clean.get("engine"), "meta_truncated": True}
    for key, value in meta_clean.items():
        if key in reduced:
            continue
        candidate = dict(reduced)
        candidate[key] = value
        try:
            candidate_json = json.dumps(candidate, ensure_ascii=False, default=str)
        except (TypeError, ValueError):
            continue
        if len(candidate_json) <= limit:
            reduced = candidate
    return json.dumps(reduced, ensure_ascii=False, default=str)


def insert_usage_event(
    con: sqlite3.Connection,
    *,
    lic: Optional[LicenseBudgetRow],
    device_id: Optional[str],
    period_start: int,
    period_end: int,
    operation_type: str,
    model: str,
    input_tokens: int,
    output_tokens: int,
    total_tokens: int,
    audio_seconds: float,
    estimated_cost_usd: float,
    request_id: str,
    status: str,
    error_code: Optional[str] = None,
    meta: Optional[Dict[str, Any]] = None,
) -> str:
    """
    Insert one row into ai_usage_events and commit.

    Args:
        con: Open SQLite connection; the ai_usage_events table must exist.
        lic: License the event belongs to; ``None`` stores NULL license columns.
        device_id: Raw device id; only the first 12 hex characters of its
            SHA-256 digest are stored.
        period_start, period_end: Period the event is accounted to.
        operation_type, model, request_id, status, error_code: Stored cut to
            32 / 128 / 64 / 16 / 64 characters.
        input_tokens, output_tokens, total_tokens, audio_seconds,
            estimated_cost_usd: Usage figures of the call.
        meta: Optional extra data. Entries whose key contains "key", "token"
            or "secret" are dropped so that no secrets are stored. Values
            that are not JSON-serializable are stored as their ``str()`` form.

    Returns:
        The generated event id (UUID4 string).
    """
    event_id = str(uuid.uuid4())

    device_suffix = None
    if device_id:
        device_suffix = hashlib.sha256(str(device_id).encode("utf-8")).hexdigest()[:12]

    meta_clean: Dict[str, Any] = {"engine": _ESTIMATE_MARKER}
    for raw_key, value in (meta or {}).items():
        key = str(raw_key)  # tolerate non-string keys
        lowered = key.lower()
        if "key" in lowered or "token" in lowered or "secret" in lowered:
            continue
        meta_clean[key[:64]] = value

    con.execute(
        """
        INSERT INTO ai_usage_events(
        id, created_at, license_email, customer_id, subscription_id, practice_id, device_id_suffix,
        period_start, period_end, operation_type, model,
        input_tokens, output_tokens, total_tokens, audio_seconds, estimated_cost_usd,
        request_id, status, error_code, meta_json
        ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
        """,
        (
            event_id,
            int(time.time()),
            (lic.customer_email if lic else None),
            (lic.customer_id if lic else None),
            (lic.subscription_id if lic else None),
            (lic.practice_id if lic else None),
            device_suffix,
            period_start,
            period_end,
            (operation_type or "")[:32],
            (model or "")[:128],
            int(input_tokens),
            int(output_tokens),
            int(total_tokens),
            float(audio_seconds),
            float(estimated_cost_usd),
            (request_id or "")[:64],
            (status or "")[:16],
            (error_code or "")[:64] if error_code else None,
            _usage_meta_to_json(meta_clean),
        ),
    )
    con.commit()
    return event_id
def budget_json_for_client(snap: Dict[str, Any], *, include_operator_fields: bool = False) -> Dict[str, Any]:
    """
    Build the budget response for the desktop client (percent only) or for admins (+ USD).

    ``renewal_date_de`` is the period end formatted as DD.MM.YYYY in the
    local time zone of the process; it is empty when no period end is known
    or the timestamp cannot be converted.
    """
    period_end_ts = int(snap.get("period_end", 0) or 0)

    renewal_label = ""
    if period_end_ts > 0:
        try:
            from datetime import datetime, timezone

            utc_moment = datetime.fromtimestamp(period_end_ts, tz=timezone.utc)
            renewal_label = utc_moment.astimezone().strftime("%d.%m.%Y")
        except Exception:
            renewal_label = ""

    payload = {
        "ok": bool(snap.get("ok", True)),
        "active": bool(snap.get("active", False)),
        "available_percent": int(snap.get("available_percent", 0)),
        "period_end": period_end_ts,
        "renewal_date_de": renewal_label,
        "show_warning": bool(snap.get("show_warning", False)),
        "user_label": snap.get("user_label") or "",
    }
    if not include_operator_fields:
        return payload

    # Operator view: copy the monetary and identifying fields through as-is.
    operator_fields = (
        "budget_usd",
        "used_usd",
        "remaining_usd",
        "extra_credit_remaining_usd",
        "total_available_usd",
        "extra_credit_active",
        "period_start",
        "subscription_id",
        "practice_id",
    )
    for field in operator_fields:
        payload[field] = snap.get(field)
    return payload