Files
aza/AzA march 2026/aza_chat_device_capacity.py
2026-06-13 22:47:31 +02:00

220 lines
6.9 KiB
Python

# -*- coding: utf-8 -*-
"""Chat-Gerätekapazität: +5 Geräte pro aktiver AzA-Office-Lizenz (praxisbezogen).
Office-Geräte-Enforcement bleibt in aza_device_enforcement.py unverändert getrennt.
Dieses Modul liefert testbare Hilfsfunktionen für Limit-Berechnung und Geräte-Klassifikation.
"""
from __future__ import annotations
import sqlite3
from typing import Any, Iterable, Mapping, Optional
CHAT_DEVICES_PER_OFFICE_LICENSE = 5
DEVICE_SCOPE_OFFICE = "office"
DEVICE_SCOPE_CHAT = "chat"
def is_office_license_lookup_key(lookup_key: Optional[str]) -> bool:
"""True wenn die Lizenz Office umfasst (kein reines Chat-only-Produkt)."""
try:
from empfang_routes import _entitlements_from_lookup_key
office_ok, _chat_ok = _entitlements_from_lookup_key(lookup_key)
return bool(office_ok)
except Exception:
lk = (lookup_key or "").strip().lower()
if not lk:
return True
chat_only_markers = (
"chat_only", "chat-only", "aza_chat_only", "empfang_only",
"minichat_only", "chatonly",
)
return not any(m in lk for m in chat_only_markers)
def is_active_license_status(status: Optional[str]) -> bool:
return (status or "").strip().lower() == "active"
def count_active_office_licenses_for_practice(
license_rows: Iterable[Mapping[str, Any]],
practice_id: str,
) -> int:
pid = (practice_id or "").strip()
if not pid:
return 0
n = 0
for row in license_rows:
if not is_active_license_status(row.get("status")):
continue
if str(row.get("practice_id") or "").strip() != pid:
continue
if not is_office_license_lookup_key(row.get("lookup_key")):
continue
n += 1
return n
def chat_device_limit_for_practice(
license_rows: Iterable[Mapping[str, Any]],
practice_id: str,
) -> int:
return (
count_active_office_licenses_for_practice(license_rows, practice_id)
* CHAT_DEVICES_PER_OFFICE_LICENSE
)
def infer_device_scope(device_name: str = "", app_version: str = "") -> str:
"""Klassifiziert ein Gerät als chat oder office anhand bekannter App-/Gerätenamen."""
blob = f"{device_name or ''} {app_version or ''}".lower()
chat_markers = (
"empfangshell", "aza_empfangshell", "empfang",
"kontaktpanel", "aza_kontaktpanel", "kontakt",
"browser", "chat", "mobile", "tablet", "webchat",
)
for marker in chat_markers:
if marker in blob:
return DEVICE_SCOPE_CHAT
return DEVICE_SCOPE_OFFICE
def count_devices_by_scope(
devices_dict: Mapping[str, Any],
*,
practice_id: str = "",
scope: str = DEVICE_SCOPE_CHAT,
) -> int:
pid = (practice_id or "").strip()
n = 0
for d in devices_dict.values():
if not isinstance(d, dict):
continue
if pid and str(d.get("practice_id") or "").strip() != pid:
continue
name = str(d.get("device_name") or d.get("name") or "")
app_v = str(d.get("app_version") or d.get("app") or "")
explicit = str(d.get("device_scope") or d.get("device_category") or "").strip().lower()
if explicit in (DEVICE_SCOPE_CHAT, DEVICE_SCOPE_OFFICE):
dev_scope = explicit
else:
dev_scope = infer_device_scope(name, app_v)
if dev_scope == scope:
n += 1
return n
def count_active_office_licenses_sqlite(
conn: sqlite3.Connection,
practice_id: str,
) -> int:
pid = (practice_id or "").strip()
if not pid:
return 0
try:
rows = conn.execute(
"""
SELECT lookup_key FROM licenses
WHERE lower(trim(coalesce(status, ''))) = 'active'
AND trim(coalesce(practice_id, '')) = ?
""",
(pid,),
).fetchall()
except sqlite3.Error:
return 0
return sum(1 for (lk,) in rows if is_office_license_lookup_key(lk))
def count_chat_devices_sqlite(conn: sqlite3.Connection, practice_id: str) -> int:
pid = (practice_id or "").strip()
if not pid:
return 0
try:
cur = conn.execute(
"""
SELECT COUNT(*) FROM device_bindings
WHERE user_key = ?
AND lower(coalesce(device_scope, 'office')) = 'chat'
AND COALESCE(is_active, 1) = 1
""",
(pid,),
)
return int(cur.fetchone()[0])
except sqlite3.Error:
return 0
def count_office_devices_sqlite(
conn: sqlite3.Connection,
*,
customer_email: str = "",
user_key: str = "",
) -> int:
clauses = ["COALESCE(is_active, 1) = 1", "lower(coalesce(device_scope, 'office')) = 'office'"]
params: list[Any] = []
if customer_email:
clauses.append("lower(customer_email) = lower(?)")
params.append(customer_email)
if user_key:
clauses.append("user_key = ?")
params.append(user_key)
try:
cur = conn.execute(
f"SELECT COUNT(*) FROM device_bindings WHERE {' AND '.join(clauses)}",
params,
)
return int(cur.fetchone()[0])
except sqlite3.Error:
return 0
def practice_chat_capacity_from_db(db_path: str, practice_id: str) -> dict[str, Any]:
"""Read-only Chat-Kapazität aus SQLite (Backend / Kontroll-Hülle)."""
pid = (practice_id or "").strip()
if not pid:
return {
"practice_id": "",
"contributing_office_licenses": 0,
"chat_devices_per_license": CHAT_DEVICES_PER_OFFICE_LICENSE,
"chat_device_limit": 0,
"chat_devices_used": 0,
}
conn = sqlite3.connect(db_path)
try:
n_lic = count_active_office_licenses_sqlite(conn, pid)
chat_used = count_chat_devices_sqlite(conn, pid)
return {
"practice_id": pid,
"contributing_office_licenses": n_lic,
"chat_devices_per_license": CHAT_DEVICES_PER_OFFICE_LICENSE,
"chat_device_limit": n_lic * CHAT_DEVICES_PER_OFFICE_LICENSE,
"chat_devices_used": chat_used,
}
finally:
conn.close()
def practice_chat_capacity_snapshot(
license_rows: Iterable[Mapping[str, Any]],
devices_dict: Mapping[str, Any],
practice_id: str,
) -> dict[str, Any]:
"""Read-only Kapazitätsübersicht für eine Praxis."""
n_lic = count_active_office_licenses_for_practice(license_rows, practice_id)
chat_limit = n_lic * CHAT_DEVICES_PER_OFFICE_LICENSE
chat_used = count_devices_by_scope(
devices_dict, practice_id=practice_id, scope=DEVICE_SCOPE_CHAT,
)
office_used = count_devices_by_scope(
devices_dict, practice_id=practice_id, scope=DEVICE_SCOPE_OFFICE,
)
return {
"practice_id": (practice_id or "").strip(),
"contributing_office_licenses": n_lic,
"chat_devices_per_license": CHAT_DEVICES_PER_OFFICE_LICENSE,
"chat_device_limit": chat_limit,
"chat_devices_used": chat_used,
"office_devices_used": office_used,
}