# -*- coding: utf-8 -*-
"""
Server-side AI budget (phase 1): SQLite events, estimated costs, gating per subscription period.

Never log secrets. All amounts are estimates (estimated_cost_usd).
"""

from __future__ import annotations

import hashlib
import json
import os
import re
import sqlite3
import time
import uuid
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple

# Default: 20 USD = 100 % per Stripe billing period (ENV AZA_AI_BUDGET_USD_DEFAULT takes precedence).
DEFAULT_MONTHLY_AI_BUDGET_USD: float = float(os.environ.get("AZA_AI_BUDGET_USD_DEFAULT", "20.0"))

# Conservative estimate; may later be overridden per lookup_key.
_ESTIMATE_MARKER = "openai_estimate_v1"

# USD per 1M tokens (input / output) - central constants, nothing scattered through the code.
# Values are deliberately conservative; adjust only here or via ENV overrides (optional, later).
_MODEL_USD_PER_1M_INPUT: Dict[str, float] = {
    "gpt-4o-mini": 0.15,
    "gpt-4o": 2.5,
    "gpt-5.2": 5.0,
    "gpt-5-mini": 1.0,
    "gpt-5-nano": 0.3,
    "gpt-4o-mini-transcribe": 3.0,  # approximation for the audio model
    "whisper-1": 1.0,
}

_MODEL_USD_PER_1M_OUTPUT: Dict[str, float] = {
    "gpt-4o-mini": 0.60,
    "gpt-4o": 10.0,
    "gpt-5.2": 15.0,
    "gpt-5-mini": 4.0,
    "gpt-5-nano": 1.2,
    "gpt-4o-mini-transcribe": 0.0,
    "whisper-1": 0.0,
}

# Transcription: USD per minute (used when only the audio length is known).
_TRANSCRIBE_USD_PER_MINUTE: Dict[str, float] = {
    "gpt-4o-mini-transcribe": 0.012,
    "whisper-1": 0.006,
    "default": 0.012,
}

# Length of the synthetic period used when a license carries no period data (35 days).
_FALLBACK_PERIOD_SECONDS = 3024000

# Upper bound for the serialized meta_json column.
_META_JSON_MAX_CHARS = 4000

# Meta keys containing one of these fragments are never persisted (may carry credentials).
_SENSITIVE_META_MARKERS = ("key", "token", "secret")


def _hash_device_id(device_id: str) -> str:
    """Return the hex SHA-256 digest of the UTF-8 encoded device id."""
    return hashlib.sha256(device_id.encode("utf-8")).hexdigest()


# OpenAI often returns dated model ids (e.g. gpt-4o-mini-2024-07-18) - strip only a trailing date.
_MODEL_DATE_SUFFIX_RE = re.compile(r"-\d{4}-\d{2}-\d{2}$")


def _norm_model(model: str) -> str:
    """Map a raw model id onto the key used by the price tables.

    Any ``gpt-*`` id containing ``-transcribe`` is priced as ``gpt-4o-mini-transcribe``.
    A trailing ``-YYYY-MM-DD`` is removed only when the remaining base name is a known model;
    otherwise the lower-cased, stripped id is returned unchanged.
    """
    m = (model or "").strip().lower()
    if m.startswith("gpt-") and "-transcribe" in m:
        return "gpt-4o-mini-transcribe"
    if _MODEL_DATE_SUFFIX_RE.search(m):
        base = _MODEL_DATE_SUFFIX_RE.sub("", m)
        if base in _MODEL_USD_PER_1M_INPUT:
            return base
    return m


def estimate_openai_cost_usd(
    *,
    model: str,
    input_tokens: int = 0,
    output_tokens: int = 0,
    audio_seconds: float = 0.0,
    operation_type: str = "other",
) -> float:
    """Return a uniform cost estimate in USD, rounded to 6 decimals.

    Always conservative; the real invoice may differ. Negative token counts or durations
    count as zero. ``operation_type`` is accepted for interface stability but not used.
    """
    _ = operation_type
    m = _norm_model(model)
    in_rate = _MODEL_USD_PER_1M_INPUT.get(m)
    out_rate = _MODEL_USD_PER_1M_OUTPUT.get(m)
    if in_rate is None:
        # Unknown model: price it conservatively like an expensive chat model.
        in_rate = 5.0
        out_rate = 15.0
    if out_rate is None:
        out_rate = 0.0
    cost = (max(0, int(input_tokens)) / 1_000_000.0) * in_rate
    cost += (max(0, int(output_tokens)) / 1_000_000.0) * out_rate
    if audio_seconds > 0:
        per_min = _TRANSCRIBE_USD_PER_MINUTE.get(m) or _TRANSCRIBE_USD_PER_MINUTE["default"]
        cost += (max(0.0, float(audio_seconds)) / 60.0) * per_min
    return round(max(0.0, cost), 6)


def estimate_audio_seconds_for_transcription(
    *,
    byte_size: int,
    file_path: Optional[str] = None,
    suffix: str = "",
) -> float:
    """Estimate the audio duration in seconds used for transcription costs.

    - WAV: real length via the stdlib ``wave`` module when the file is readable.
    - Otherwise: conservative byte heuristic (bytes / 10000).

    The result is always clamped to the range 3 ... 7200 seconds.
    """
    suf = (suffix or "").lower()
    fp = (file_path or "").strip()
    if fp and suf in (".wav", ".wave"):
        try:
            import wave

            with wave.open(fp, "rb") as wf:
                frames = wf.getnframes()
                rate = wf.getframerate() or 1
            sec = frames / float(rate)
            if sec > 0:
                return max(3.0, min(7200.0, float(sec)))
        except Exception:
            # Deliberate best effort: an unreadable/invalid WAV falls back to the heuristic.
            pass
    est = max(3.0, min(7200.0, int(byte_size) / 10000.0))
    return float(est)


def _auto_topup_failure_info(info: Any) -> Dict[str, Any]:
    """Return a copy of the gate info dict flagged with ``auto_topup_failed``."""
    if isinstance(info, dict):
        out = dict(info)
    else:
        out = {"success": False, "error_code": "AI_BUDGET_EXCEEDED"}
    out["auto_topup_failed"] = True
    return out


def budget_gate_blocked_payload_or_none(
    con: sqlite3.Connection,
    lic: LicenseBudgetRow,
    *,
    device_id: Optional[str],
    request_id: str,
    operation_type: str,
    model: str,
    gate_meta: Optional[Dict[str, Any]] = None,
) -> Optional[Dict[str, Any]]:
    """Gate an OpenAI call against the budget.

    Returns ``None`` when the call is allowed. Otherwise returns the dict intended as the
    HTTP 402 content and records a ``blocked`` usage event. When the license has a
    practice id, an automatic top-up (optional ``aza_ai_credit`` module) is attempted first;
    any failure of that attempt is ignored and the call stays blocked.
    """
    ok, info = check_allows_openai_call(con, lic)
    if ok:
        return None

    pid = (lic.practice_id or "").strip()
    if pid:
        try:
            from aza_ai_credit import _auto_topup_enabled_env, maybe_run_auto_topup_for_practice

            snap = compute_budget_snapshot(con, lic)
            pct = int(snap.get("available_percent", 0))
            if _auto_topup_enabled_env():
                auto = maybe_run_auto_topup_for_practice(
                    con,
                    practice_id=pid,
                    available_percent=pct,
                    dry_run=False,
                    subscription_id=lic.subscription_id,
                    customer_email=lic.customer_email,
                )
                if auto.get("charged"):
                    # Re-check after a successful charge: the call may now be allowed.
                    ok2, _ = check_allows_openai_call(con, lic)
                    if ok2:
                        return None
                if auto.get("reason") == "auto_topup_monthly_limit_reached":
                    info = _auto_topup_failure_info(info)
                    info["error_code"] = "AUTO_TOPUP_MONTHLY_LIMIT_REACHED"
                    info["message_user"] = auto.get("message_user") or (
                        "Das Monatslimit für automatische Aufladungen ist erreicht. "
                        "Sie können weiterhin manuell Zusatzguthaben kaufen."
                    )
                elif auto.get("paused") or auto.get("reason") == "payment_failed":
                    info = _auto_topup_failure_info(info)
                    info["message_user"] = (
                        "Auto-Aufladung konnte nicht durchgeführt werden. "
                        "Bitte laden Sie manuell Zusatzguthaben auf."
                    )
        except Exception:
            # Deliberate best effort: the top-up module is optional and must never
            # turn a budget block into a server error.
            pass

    if not isinstance(info, dict):
        info = {"success": False, "error_code": "AI_BUDGET_EXCEEDED"}
    detail = dict(info)
    detail["request_id"] = request_id

    ps, pe = _effective_period(lic.period_start, lic.period_end)
    meta = dict(gate_meta or {})
    meta["gate"] = operation_type
    insert_usage_event(
        con,
        lic=lic,
        device_id=device_id,
        period_start=ps,
        period_end=pe,
        operation_type=operation_type,
        model=model,
        input_tokens=0,
        output_tokens=0,
        total_tokens=0,
        audio_seconds=0.0,
        estimated_cost_usd=0.0,
        request_id=request_id,
        status="blocked",
        error_code="AI_BUDGET_EXCEEDED",
        meta=meta,
    )
    return detail


def record_success_after_openai(
    con: sqlite3.Connection,
    lic: LicenseBudgetRow,
    *,
    device_id: Optional[str],
    request_id: str,
    model: str,
    operation_type: str,
    input_tokens: int,
    output_tokens: int,
    total_tokens: int,
    audio_seconds: float,
) -> None:
    """Record a ``success`` usage event with the estimated cost of the finished call."""
    ps, pe = _effective_period(lic.period_start, lic.period_end)
    cost = estimate_openai_cost_usd(
        model=model,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        audio_seconds=audio_seconds,
        operation_type=operation_type,
    )
    insert_usage_event(
        con,
        lic=lic,
        device_id=device_id,
        period_start=ps,
        period_end=pe,
        operation_type=operation_type,
        model=model,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        total_tokens=total_tokens,
        audio_seconds=audio_seconds,
        estimated_cost_usd=cost,
        request_id=request_id,
        status="success",
        meta={"op": operation_type},
    )


def record_openai_error_event(
    con: sqlite3.Connection,
    lic: LicenseBudgetRow,
    *,
    device_id: Optional[str],
    request_id: str,
    model: str,
    operation_type: str,
    error_code: str,
    meta: Optional[Dict[str, Any]] = None,
) -> None:
    """Record an ``error`` usage event with zero cost (error code capped at 64 characters)."""
    ps, pe = _effective_period(lic.period_start, lic.period_end)
    insert_usage_event(
        con,
        lic=lic,
        device_id=device_id,
        period_start=ps,
        period_end=pe,
        operation_type=operation_type,
        model=model,
        input_tokens=0,
        output_tokens=0,
        total_tokens=0,
        audio_seconds=0.0,
        estimated_cost_usd=0.0,
        request_id=request_id,
        status="error",
        error_code=(error_code or "openai_error")[:64],
        meta=meta,
    )


def ensure_ai_budget_schema(con: sqlite3.Connection) -> None:
    """Idempotently create the ai_usage_events table and its indexes, then commit."""
    con.execute(
        """
        CREATE TABLE IF NOT EXISTS ai_usage_events (
            id TEXT PRIMARY KEY,
            created_at INTEGER NOT NULL,
            license_email TEXT,
            customer_id TEXT,
            subscription_id TEXT,
            practice_id TEXT,
            device_id_suffix TEXT,
            period_start INTEGER,
            period_end INTEGER,
            operation_type TEXT,
            model TEXT,
            input_tokens INTEGER DEFAULT 0,
            output_tokens INTEGER DEFAULT 0,
            total_tokens INTEGER DEFAULT 0,
            audio_seconds REAL DEFAULT 0,
            estimated_cost_usd REAL DEFAULT 0,
            request_id TEXT,
            status TEXT,
            error_code TEXT,
            meta_json TEXT
        )
        """
    )
    con.execute(
        "CREATE INDEX IF NOT EXISTS idx_ai_usage_sub_period "
        "ON ai_usage_events(subscription_id, period_start, period_end, status)"
    )
    con.execute(
        "CREATE INDEX IF NOT EXISTS idx_ai_usage_created ON ai_usage_events(created_at)"
    )
    con.commit()


@dataclass(frozen=True)
class LicenseBudgetRow:
    """Immutable view of the license columns the budget logic reads."""

    subscription_id: str
    customer_email: str
    customer_id: Optional[str]
    practice_id: Optional[str]
    lookup_key: Optional[str]
    status: str
    period_start: Optional[int]
    period_end: Optional[int]


def _effective_period(
    period_start: Optional[int],
    period_end: Optional[int],
) -> Tuple[int, int]:
    """Return a valid ``(start, end)`` pair of Unix timestamps for a license.

    Usage is summed by exact equality on ``(period_start, period_end)``, so the pair must be
    identical across calls within the same period:

    - end known: start is the given start, or ``end - 35 days`` (never below 0).
    - only start known: end is the next 35-day boundary after *now*, counted from start.
    - nothing known: the current epoch-aligned 35-day window.

    The previous fallback (``now + 86400``) changed every second, which meant recorded usage
    could never be found again and the budget gate never triggered for such licenses.
    """
    window = _FALLBACK_PERIOD_SECONDS
    if period_end is not None:
        pe = int(period_end)
        ps = int(period_start) if period_start is not None else max(0, pe - window)
        return ps, pe

    now = int(time.time())
    if period_start is not None:
        ps = int(period_start)
        elapsed = max(0, now - ps)
        return ps, ps + (elapsed // window + 1) * window

    pe = (now // window + 1) * window
    return pe - window, pe


def _license_row_from_db(row: Any, *, fallback_email: str = "") -> LicenseBudgetRow:
    """Convert a ``licenses`` result row (column order of the SELECTs below) to a dataclass."""
    return LicenseBudgetRow(
        subscription_id=str(row[0] or ""),
        customer_email=str(row[2] or fallback_email),
        customer_id=str(row[1]) if row[1] else None,
        practice_id=str(row[3]).strip() if row[3] else None,
        lookup_key=str(row[4]).strip() if row[4] else None,
        status=str(row[5] or ""),
        period_start=int(row[6]) if row[6] is not None else None,
        period_end=int(row[7]) if row[7] is not None else None,
    )


def resolve_license_for_practice_id(con: sqlite3.Connection, practice_id: str) -> Optional[LicenseBudgetRow]:
    """Map a tenant practice_id to its most recently updated active license.

    Use only when practice_id comes from an authenticated session (no guessing).
    Returns ``None`` for an empty id or when no active license exists.
    """
    pid = (practice_id or "").strip()
    if not pid:
        return None
    lic = con.execute(
        """
        SELECT subscription_id, customer_id, customer_email, practice_id,
               lookup_key, status, current_period_start, current_period_end
        FROM licenses
        WHERE practice_id = ? AND status = 'active'
        ORDER BY updated_at DESC
        LIMIT 1
        """,
        (pid,),
    ).fetchone()
    if not lic:
        return None
    return _license_row_from_db(lic)


def resolve_license_for_empfang(
    con: sqlite3.Connection,
    *,
    x_device_id: Optional[str],
    session_practice_id: Optional[str],
) -> Optional[LicenseBudgetRow]:
    """Resolve the license for reception/desktop clients.

    With a session practice id the practice's own license row is tried first, then the
    device/e-mail fallback. A device license bound to a *different* practice id is not used
    when a practice id was requested (a device license without any practice id is accepted).
    """
    hdr = (x_device_id or "").strip() or None
    sp = (session_practice_id or "").strip() or None

    if sp:
        lic_practice = resolve_license_for_practice_id(con, sp)
        if lic_practice:
            return lic_practice

    if hdr:
        lic_dev = resolve_license_for_device(con, hdr)
        if lic_dev:
            dev_pid = (lic_dev.practice_id or "").strip()
            if sp and dev_pid and dev_pid != sp:
                # Never attribute usage of one practice to another practice's license.
                return None
            return lic_dev
    return None


def resolve_license_for_device(con: sqlite3.Connection, device_id: Optional[str]) -> Optional[LicenseBudgetRow]:
    """Map an X-Device-Id to an active license via its device_hash binding.

    The binding yields the customer e-mail; the most recently updated active license for
    that e-mail (case-insensitive) is returned, or ``None`` when any step finds nothing.
    """
    if not device_id or not str(device_id).strip():
        return None
    dh = _hash_device_id(str(device_id).strip())
    row = con.execute(
        """
        SELECT d.customer_email, d.user_key
        FROM device_bindings d
        WHERE d.device_hash = ? AND COALESCE(d.is_active, 1) = 1
        LIMIT 1
        """,
        (dh,),
    ).fetchone()
    if not row:
        return None
    email = str(row[0] or "").strip()
    if not email:
        return None
    lic = con.execute(
        """
        SELECT subscription_id, customer_id, customer_email, practice_id,
               lookup_key, status, current_period_start, current_period_end
        FROM licenses
        WHERE lower(customer_email) = lower(?) AND status = 'active'
        ORDER BY updated_at DESC
        LIMIT 1
        """,
        (email,),
    ).fetchone()
    if not lic:
        return None
    return _license_row_from_db(lic, fallback_email=email)


def monthly_budget_usd_for_license(_row: LicenseBudgetRow) -> float:
    """Phase 1: fixed default budget (at least 0.01 USD); later lookup_key dependent."""
    _ = _row
    return max(0.01, float(DEFAULT_MONTHLY_AI_BUDGET_USD))


def sum_usage_usd_for_period(
    con: sqlite3.Connection,
    subscription_id: str,
    period_start: int,
    period_end: int,
) -> float:
    """Sum the estimated cost of all ``success`` events of one subscription period."""
    row = con.execute(
        """
        SELECT COALESCE(SUM(estimated_cost_usd), 0)
        FROM ai_usage_events
        WHERE subscription_id = ? AND period_start = ? AND period_end = ?
          AND status = 'success'
        """,
        (subscription_id, period_start, period_end),
    ).fetchone()
    return float(row[0] or 0.0)


def compute_budget_snapshot(con: sqlite3.Connection, lic: LicenseBudgetRow) -> Dict[str, Any]:
    """Build the budget snapshot (budget, usage, remaining percent) for a license.

    When the optional ``aza_ai_credit`` module is available and the license has a practice
    id, the snapshot is extended with extra-credit data; any failure there falls back to the
    plain monthly snapshot.
    """
    ps, pe = _effective_period(lic.period_start, lic.period_end)
    budget = monthly_budget_usd_for_license(lic)
    used = sum_usage_usd_for_period(con, lic.subscription_id, ps, pe)
    remaining = max(0.0, budget - used)
    pct = int((remaining / budget) * 100.0) if budget > 0 else 0
    pct = max(0, min(100, pct))
    show_warning = 0 < pct <= 20
    base = {
        "ok": True,
        "active": lic.status == "active",
        "budget_usd": round(budget, 2),
        "used_usd": round(used, 4),
        "remaining_usd": round(remaining, 4),
        "available_percent": pct,
        "period_start": ps,
        "period_end": pe,
        "subscription_id": lic.subscription_id,
        "practice_id": lic.practice_id,
        "show_warning": show_warning,
        "user_label": f"KI-Kontingent: {pct} % verfügbar",
    }
    try:
        from aza_ai_credit import (
            apply_extra_credit_to_snapshot,
            compute_extra_credit_remaining,
            ensure_ai_credit_schema,
        )

        ensure_ai_credit_schema(con)
        pid = (lic.practice_id or "").strip()
        if pid:
            extra = compute_extra_credit_remaining(
                con,
                practice_id=pid,
                subscription_id=lic.subscription_id,
                monthly_budget_usd=budget,
            )
            return apply_extra_credit_to_snapshot(base, extra_remaining=extra, monthly_budget=budget)
    except Exception:
        # Deliberate best effort: extra credit is optional; fall back to the base snapshot.
        pass
    return base


def check_allows_openai_call(
    con: sqlite3.Connection,
    lic: LicenseBudgetRow,
) -> Tuple[bool, Dict[str, Any]]:
    """Return ``(True, snapshot)`` when budget remains, else ``(False, error payload)``.

    The call is blocked once monthly remainder plus extra credit is (numerically) zero.
    """
    snap = compute_budget_snapshot(con, lic)
    budget = float(snap["budget_usd"])
    used = float(snap["used_usd"])
    monthly_remaining = max(0.0, budget - used)
    extra_remaining = float(snap.get("extra_credit_remaining_usd") or 0.0)
    if monthly_remaining + extra_remaining <= 1e-9:
        pe = int(snap["period_end"])
        return False, {
            "success": False,
            "error_code": "AI_BUDGET_EXCEEDED",
            "message_user": (
                "Ihr KI-Guthaben ist aufgebraucht. Sie können Zusatzguthaben kaufen, "
                "um sofort weiterzuarbeiten."
            ),
            "available_percent": 0,
            "period_end": pe,
            "request_id": "",
            "estimate_engine": _ESTIMATE_MARKER,
        }
    return True, snap


def _dump_meta_json(meta: Dict[str, Any]) -> str:
    """Serialize event meta to JSON that is always valid and at most 4000 characters.

    Values JSON cannot encode are stringified (``default=str``) so bookkeeping never fails
    because of diagnostic data. Oversized payloads are replaced by a small marker object
    carrying a preview instead of being cut mid-document.
    """
    try:
        text = json.dumps(meta, ensure_ascii=False, default=str)
    except (TypeError, ValueError):
        # e.g. circular references or unsupported keys in nested dicts
        text = json.dumps({"engine": _ESTIMATE_MARKER, "meta_error": "unserializable"})
    if len(text) <= _META_JSON_MAX_CHARS:
        return text
    # 1500 preview characters stay below the limit even if every character needs escaping.
    return json.dumps(
        {"engine": _ESTIMATE_MARKER, "meta_truncated": True, "meta_preview": text[:1500]},
        ensure_ascii=False,
    )


def insert_usage_event(
    con: sqlite3.Connection,
    *,
    lic: Optional[LicenseBudgetRow],
    device_id: Optional[str],
    period_start: int,
    period_end: int,
    operation_type: str,
    model: str,
    input_tokens: int,
    output_tokens: int,
    total_tokens: int,
    audio_seconds: float,
    estimated_cost_usd: float,
    request_id: str,
    status: str,
    error_code: Optional[str] = None,
    meta: Optional[Dict[str, Any]] = None,
) -> str:
    """Insert one row into ai_usage_events, commit, and return the new event id.

    Only the first 12 hex characters of the hashed device id are stored. Meta entries whose
    key contains "key", "token" or "secret" are dropped; keys are stringified and capped at
    64 characters. Text columns are truncated to their documented maximum lengths.
    """
    eid = str(uuid.uuid4())
    dev_suf = _hash_device_id(str(device_id))[:12] if device_id else ""

    meta_clean: Dict[str, Any] = {"engine": _ESTIMATE_MARKER}
    for k, v in (meta or {}).items():
        key = str(k)  # keys are not guaranteed to be strings
        lowered = key.lower()
        if any(marker in lowered for marker in _SENSITIVE_META_MARKERS):
            continue
        meta_clean[key[:64]] = v

    con.execute(
        """
        INSERT INTO ai_usage_events(
            id, created_at, license_email, customer_id, subscription_id, practice_id,
            device_id_suffix, period_start, period_end, operation_type, model,
            input_tokens, output_tokens, total_tokens, audio_seconds,
            estimated_cost_usd, request_id, status, error_code, meta_json
        ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
        """,
        (
            eid,
            int(time.time()),
            (lic.customer_email if lic else None),
            (lic.customer_id if lic else None),
            (lic.subscription_id if lic else None),
            (lic.practice_id if lic else None),
            dev_suf or None,
            period_start,
            period_end,
            (operation_type or "")[:32],
            (model or "")[:128],
            int(input_tokens),
            int(output_tokens),
            int(total_tokens),
            float(audio_seconds),
            float(estimated_cost_usd),
            (request_id or "")[:64],
            (status or "")[:16],
            (error_code or "")[:64] if error_code else None,
            _dump_meta_json(meta_clean),
        ),
    )
    con.commit()
    return eid


def budget_json_for_client(snap: Dict[str, Any], *, include_operator_fields: bool = False) -> Dict[str, Any]:
    """Build the response for the desktop client (percent only) or admin (+ USD fields).

    ``renewal_date_de`` is the period end formatted as ``DD.MM.YYYY`` in the server's local
    timezone, or an empty string when no period end is available.
    """
    pe = int(snap.get("period_end", 0) or 0)
    renewal_de = ""
    if pe > 0:
        try:
            from datetime import datetime, timezone

            renewal_de = (
                datetime.fromtimestamp(pe, tz=timezone.utc)
                .astimezone()
                .strftime("%d.%m.%Y")
            )
        except Exception:
            # Out-of-range timestamps must not break the status response.
            renewal_de = ""
    base = {
        "ok": bool(snap.get("ok", True)),
        "active": bool(snap.get("active", False)),
        "available_percent": int(snap.get("available_percent", 0) or 0),
        "period_end": pe,
        "renewal_date_de": renewal_de,
        "show_warning": bool(snap.get("show_warning", False)),
        "user_label": snap.get("user_label") or "",
    }
    if include_operator_fields:
        base["budget_usd"] = snap.get("budget_usd")
        base["used_usd"] = snap.get("used_usd")
        base["remaining_usd"] = snap.get("remaining_usd")
        base["extra_credit_remaining_usd"] = snap.get("extra_credit_remaining_usd")
        base["total_available_usd"] = snap.get("total_available_usd")
        base["extra_credit_active"] = snap.get("extra_credit_active")
        base["period_start"] = snap.get("period_start")
        base["subscription_id"] = snap.get("subscription_id")
        base["practice_id"] = snap.get("practice_id")
    return base