258 lines
8.5 KiB
Python
258 lines
8.5 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""Zentrale KI-Budget-Zuordnung: Gate, Practice-Fallback, keine Doppeltzählung (ohne Netzwerk)."""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import sqlite3
|
|
import tempfile
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
from aza_ai_budget import (
|
|
_norm_model,
|
|
compute_budget_snapshot,
|
|
ensure_ai_budget_schema,
|
|
estimate_openai_cost_usd,
|
|
insert_usage_event,
|
|
resolve_license_for_empfang,
|
|
sum_usage_usd_for_period,
|
|
)
|
|
|
|
|
|
def _mk_db(path: Path) -> None:
|
|
now = 1_700_000_000
|
|
con = sqlite3.connect(str(path))
|
|
con.execute(
|
|
"""
|
|
CREATE TABLE device_bindings (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
customer_email TEXT NOT NULL,
|
|
user_key TEXT NOT NULL,
|
|
device_hash TEXT NOT NULL,
|
|
first_seen_at INTEGER NOT NULL,
|
|
last_seen_at INTEGER NOT NULL,
|
|
is_active INTEGER DEFAULT 1,
|
|
UNIQUE(customer_email, user_key, device_hash)
|
|
)
|
|
"""
|
|
)
|
|
con.execute(
|
|
"""
|
|
CREATE TABLE licenses (
|
|
subscription_id TEXT PRIMARY KEY,
|
|
customer_id TEXT,
|
|
status TEXT,
|
|
lookup_key TEXT,
|
|
allowed_users INTEGER,
|
|
devices_per_user INTEGER,
|
|
customer_email TEXT,
|
|
client_reference_id TEXT,
|
|
current_period_end INTEGER,
|
|
current_period_start INTEGER,
|
|
updated_at INTEGER NOT NULL,
|
|
license_key TEXT,
|
|
practice_id TEXT
|
|
)
|
|
"""
|
|
)
|
|
con.execute(
|
|
"""
|
|
INSERT INTO licenses(subscription_id, customer_id, status, lookup_key, allowed_users, devices_per_user,
|
|
customer_email, client_reference_id, current_period_end, current_period_start, updated_at, license_key, practice_id)
|
|
VALUES ('sub_c', 'cus_x', 'active', 'aza_basic_monthly', 1, 2,
|
|
'central@example.test', NULL, ?, ?, ?, 'KEY', 'prac_central')
|
|
""",
|
|
(now + 86400 * 30, now, now),
|
|
)
|
|
con.commit()
|
|
con.close()
|
|
|
|
|
|
class TestModelNormalizationAndBudgetDefault(unittest.TestCase):
|
|
def test_norm_dated_gpt_4o_mini(self):
|
|
self.assertEqual(_norm_model("gpt-4o-mini-2024-07-18"), "gpt-4o-mini")
|
|
|
|
def test_dated_gpt_4o_mini_uses_cheap_price(self):
|
|
cost = estimate_openai_cost_usd(
|
|
model="gpt-4o-mini-2024-07-18",
|
|
input_tokens=1_000_000,
|
|
output_tokens=0,
|
|
)
|
|
self.assertAlmostEqual(cost, 0.15, places=6)
|
|
self.assertEqual(
|
|
cost,
|
|
estimate_openai_cost_usd(
|
|
model="gpt-4o-mini", input_tokens=1_000_000, output_tokens=0
|
|
),
|
|
)
|
|
|
|
def test_gpt_4o_mini_transcribe_unchanged(self):
|
|
self.assertEqual(_norm_model("gpt-4o-mini-transcribe"), "gpt-4o-mini-transcribe")
|
|
cost = estimate_openai_cost_usd(
|
|
model="gpt-4o-mini-transcribe", audio_seconds=60.0
|
|
)
|
|
self.assertAlmostEqual(cost, 0.012, places=6)
|
|
|
|
def test_unknown_model_stays_conservative_fallback(self):
|
|
cost = estimate_openai_cost_usd(
|
|
model="totally-unknown-model-xyz",
|
|
input_tokens=1_000_000,
|
|
output_tokens=0,
|
|
)
|
|
self.assertAlmostEqual(cost, 5.0, places=6)
|
|
|
|
def test_env_overrides_budget_default(self):
|
|
import importlib
|
|
import aza_ai_budget as mod
|
|
|
|
key = "AZA_AI_BUDGET_USD_DEFAULT"
|
|
old = os.environ.get(key)
|
|
try:
|
|
os.environ[key] = "42.5"
|
|
importlib.reload(mod)
|
|
self.assertEqual(mod.DEFAULT_MONTHLY_AI_BUDGET_USD, 42.5)
|
|
finally:
|
|
if old is None:
|
|
os.environ.pop(key, None)
|
|
else:
|
|
os.environ[key] = old
|
|
importlib.reload(mod)
|
|
|
|
|
|
class TestBackendGatePracticeFallback(unittest.TestCase):
|
|
def setUp(self):
|
|
self.tmp = tempfile.NamedTemporaryFile(suffix=".sqlite", delete=False)
|
|
self.tmp.close()
|
|
self.db_path = Path(self.tmp.name)
|
|
_mk_db(self.db_path)
|
|
|
|
def tearDown(self):
|
|
try:
|
|
os.unlink(self.db_path)
|
|
except OSError:
|
|
pass
|
|
|
|
def test_gate_blocks_without_mapping_when_practice_given(self):
|
|
import backend_main as bm
|
|
|
|
with patch("backend_main._stripe_db_path", return_value=self.db_path):
|
|
resp = bm._gate_ai_budget_or_none(
|
|
db_path=self.db_path,
|
|
device_id=None,
|
|
practice_id="prac_unknown",
|
|
request_id="t_gate",
|
|
operation_type="chat",
|
|
model="gpt-4o-mini",
|
|
)
|
|
self.assertIsNotNone(resp)
|
|
self.assertEqual(resp.status_code, 402)
|
|
|
|
def test_gate_allows_with_practice_only(self):
|
|
import backend_main as bm
|
|
|
|
with patch("backend_main._stripe_db_path", return_value=self.db_path):
|
|
resp = bm._gate_ai_budget_or_none(
|
|
db_path=self.db_path,
|
|
device_id=None,
|
|
practice_id="prac_central",
|
|
request_id="t_ok",
|
|
operation_type="chat",
|
|
model="gpt-4o-mini",
|
|
)
|
|
self.assertIsNone(resp)
|
|
|
|
def test_record_success_one_event_per_request(self):
|
|
import backend_main as bm
|
|
|
|
with patch("backend_main._stripe_db_path", return_value=self.db_path):
|
|
bm._record_ai_budget_success(
|
|
device_id=None,
|
|
practice_id="prac_central",
|
|
request_id="req_one",
|
|
model="gpt-4o-mini",
|
|
operation_type="chat",
|
|
input_tokens=1000,
|
|
output_tokens=200,
|
|
total_tokens=1200,
|
|
audio_seconds=0.0,
|
|
)
|
|
bm._record_ai_budget_success(
|
|
device_id=None,
|
|
practice_id="prac_central",
|
|
request_id="req_two",
|
|
model="gpt-4o-mini",
|
|
operation_type="transcription",
|
|
input_tokens=0,
|
|
output_tokens=0,
|
|
total_tokens=0,
|
|
audio_seconds=30.0,
|
|
)
|
|
with sqlite3.connect(str(self.db_path)) as con:
|
|
n = con.execute(
|
|
"SELECT COUNT(*) FROM ai_usage_events WHERE status='success'"
|
|
).fetchone()[0]
|
|
self.assertEqual(n, 2)
|
|
|
|
def test_admin_snapshot_aggregates_chat_and_transcription(self):
|
|
with sqlite3.connect(str(self.db_path)) as con:
|
|
ensure_ai_budget_schema(con)
|
|
lic = resolve_license_for_empfang(
|
|
con, x_device_id=None, session_practice_id="prac_central"
|
|
)
|
|
self.assertIsNotNone(lic)
|
|
ps, pe = int(lic.period_start or 0), int(lic.period_end or 0)
|
|
insert_usage_event(
|
|
con,
|
|
lic=lic,
|
|
device_id=None,
|
|
period_start=ps,
|
|
period_end=pe,
|
|
operation_type="chat",
|
|
model="gpt-4o-mini",
|
|
input_tokens=100,
|
|
output_tokens=50,
|
|
total_tokens=150,
|
|
audio_seconds=0.0,
|
|
estimated_cost_usd=0.05,
|
|
request_id="a",
|
|
status="success",
|
|
)
|
|
insert_usage_event(
|
|
con,
|
|
lic=lic,
|
|
device_id=None,
|
|
period_start=ps,
|
|
period_end=pe,
|
|
operation_type="transcription",
|
|
model="whisper-1",
|
|
input_tokens=0,
|
|
output_tokens=0,
|
|
total_tokens=0,
|
|
audio_seconds=60.0,
|
|
estimated_cost_usd=0.08,
|
|
request_id="b",
|
|
status="success",
|
|
)
|
|
used = sum_usage_usd_for_period(con, lic.subscription_id, ps, pe)
|
|
snap = compute_budget_snapshot(con, lic)
|
|
self.assertGreater(used, 0.12)
|
|
self.assertLess(snap["available_percent"], 100)
|
|
|
|
|
|
class TestEmpfangChatDoesNotTouchBudget(unittest.TestCase):
|
|
def test_budget_helpers_only_in_transcribe_budgeted(self):
|
|
import inspect
|
|
import empfang_routes as er
|
|
|
|
src = inspect.getsource(er._empfang_transcribe_openai_budgeted)
|
|
self.assertIn("record_success_after_openai", src)
|
|
self.assertIn("budget_gate_blocked_payload_or_none", src)
|
|
mod_src = Path(er.__file__).read_text(encoding="utf-8", errors="replace")
|
|
before_budget_fn = mod_src.split("def _empfang_transcribe_openai_budgeted")[0]
|
|
self.assertNotIn("record_success_after_openai", before_budget_fn)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|