import hashlib import sqlite3 import sys import time from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional, Tuple DEVICES_PER_LICENSE_FLOOR = 2 def _runtime_base_dir() -> Path: if getattr(sys, "frozen", False): return Path(getattr(sys, "_MEIPASS", Path(sys.executable).resolve().parent)) return Path(__file__).resolve().parent def _resolve_db_path() -> str: if getattr(sys, "frozen", False): import os as _os try: from aza_config import get_writable_data_dir writable = Path(get_writable_data_dir()) / "data" except Exception: writable = Path(_os.environ.get("APPDATA", "")) / "AZA Desktop" / "data" writable.mkdir(parents=True, exist_ok=True) return str(writable / "stripe_webhook.sqlite") return str(_runtime_base_dir() / "data" / "stripe_webhook.sqlite") DB_PATH = _resolve_db_path() @dataclass(frozen=True) class DeviceDecision: allowed: bool reason: str # "ok"|"missing_device_id"|"license_not_found"|"device_limit_reached"|"user_limit_reached" devices_used: int devices_allowed: int users_used: int users_allowed: int license_active: bool = False license_count: int = 0 def _hash_device_id(device_id: str) -> str: return hashlib.sha256(device_id.encode("utf-8")).hexdigest() def ensure_device_table(conn: sqlite3.Connection) -> None: conn.execute( """ CREATE TABLE IF NOT EXISTS device_bindings ( id INTEGER PRIMARY KEY AUTOINCREMENT, customer_email TEXT NOT NULL, user_key TEXT NOT NULL, device_hash TEXT NOT NULL, first_seen_at INTEGER NOT NULL, last_seen_at INTEGER NOT NULL, UNIQUE(customer_email, user_key, device_hash) ); """ ) cols = [row[1] for row in conn.execute("PRAGMA table_info(device_bindings)").fetchall()] if "device_name" not in cols: conn.execute("ALTER TABLE device_bindings ADD COLUMN device_name TEXT DEFAULT ''") if "is_active" not in cols: conn.execute("ALTER TABLE device_bindings ADD COLUMN is_active INTEGER DEFAULT 1") if "app_version" not in cols: conn.execute("ALTER TABLE device_bindings ADD COLUMN app_version TEXT DEFAULT ''") conn.commit() def _count_active_licenses(conn: sqlite3.Connection, customer_email: str) -> Tuple[int, int]: """Returns (n_active_licenses, total_allowed_devices). Multiple active licenses for the same email stack: 1 license = 2 devices, 2 licenses = 4 devices, etc. """ cur = conn.execute( """SELECT COUNT(*), COALESCE(SUM( CASE WHEN devices_per_user IS NULL OR devices_per_user < ? THEN ? ELSE devices_per_user END ), 0) FROM licenses WHERE lower(customer_email) = lower(?) AND status = 'active'""", (DEVICES_PER_LICENSE_FLOOR, DEVICES_PER_LICENSE_FLOOR, customer_email), ) row = cur.fetchone() if not row or int(row[0]) == 0: return (0, 0) return (int(row[0]), int(row[1])) def _get_license_row(conn: sqlite3.Connection, customer_email: str) -> Optional[Tuple[int, int]]: """Backward-compat wrapper. Returns (allowed_users, total_devices).""" n_licenses, total_devices = _count_active_licenses(conn, customer_email) if n_licenses == 0: return None return (1, total_devices) def enforce_and_touch_device( customer_email: str, user_key: str, device_id: Optional[str], db_path: Optional[str] = None, device_name: str = "", app_version: str = "", ) -> DeviceDecision: """Enforce device limit for a customer email. Rules: - License not active → deny - device_id already known → allow, update last_seen - New device + under limit → register + allow - New device + at/over limit → deny (device_limit_reached) """ if not device_id: return DeviceDecision( allowed=False, reason="missing_device_id", devices_used=0, devices_allowed=0, users_used=0, users_allowed=0, ) device_hash = _hash_device_id(device_id) now = int(time.time()) conn = sqlite3.connect(db_path or DB_PATH) try: ensure_device_table(conn) n_licenses, total_devices = _count_active_licenses(conn, customer_email) if n_licenses == 0: return DeviceDecision( allowed=False, reason="license_not_found", devices_used=0, devices_allowed=0, users_used=0, users_allowed=0, license_active=False, license_count=0, ) cur_devices = conn.execute( """SELECT COUNT(*) FROM device_bindings WHERE lower(customer_email) = lower(?) AND user_key = ? AND COALESCE(is_active, 1) = 1""", (customer_email, user_key), ) used_devices = int(cur_devices.fetchone()[0]) cur_users = conn.execute( "SELECT COUNT(DISTINCT user_key) FROM device_bindings WHERE lower(customer_email) = lower(?)", (customer_email,), ) users_used = int(cur_users.fetchone()[0]) # Is this device already bound? cur = conn.execute( """SELECT id FROM device_bindings WHERE lower(customer_email) = lower(?) AND user_key = ? AND device_hash = ? LIMIT 1""", (customer_email, user_key, device_hash), ) existing = cur.fetchone() if existing: conn.execute( """UPDATE device_bindings SET last_seen_at = ?, device_name = COALESCE(NULLIF(?, ''), device_name), app_version = COALESCE(NULLIF(?, ''), app_version), is_active = 1 WHERE id = ?""", (now, device_name, app_version, int(existing[0])), ) conn.commit() return DeviceDecision( allowed=True, reason="ok", devices_used=used_devices, devices_allowed=total_devices, users_used=users_used, users_allowed=1, license_active=True, license_count=n_licenses, ) # New device — check limit if used_devices >= total_devices: return DeviceDecision( allowed=False, reason="device_limit_reached", devices_used=used_devices, devices_allowed=total_devices, users_used=users_used, users_allowed=1, license_active=True, license_count=n_licenses, ) # Register new device conn.execute( """INSERT INTO device_bindings (customer_email, user_key, device_hash, first_seen_at, last_seen_at, device_name, is_active, app_version) VALUES (?, ?, ?, ?, ?, ?, 1, ?)""", (customer_email, user_key, device_hash, now, now, device_name, app_version), ) conn.commit() return DeviceDecision( allowed=True, reason="ok", devices_used=used_devices + 1, devices_allowed=total_devices, users_used=users_used if used_devices > 0 else users_used + 1, users_allowed=1, license_active=True, license_count=n_licenses, ) finally: conn.close() def list_devices_for_email( customer_email: str, db_path: Optional[str] = None, ) -> Dict[str, Any]: """Admin/debug: list all registered devices and license info for an email.""" conn = sqlite3.connect(db_path or DB_PATH) try: ensure_device_table(conn) n_licenses, total_devices = _count_active_licenses(conn, customer_email) rows = conn.execute( """SELECT device_hash, device_name, COALESCE(is_active, 1), COALESCE(app_version, ''), first_seen_at, last_seen_at FROM device_bindings WHERE lower(customer_email) = lower(?) ORDER BY last_seen_at DESC""", (customer_email,), ).fetchall() devices: List[Dict[str, Any]] = [] for r in rows: devices.append({ "device_hash_short": (r[0] or "")[:12] + "…", "device_name": r[1] or "", "is_active": bool(r[2]), "app_version": r[3] or "", "first_seen": r[4], "last_seen": r[5], }) return { "email": customer_email, "active_licenses": n_licenses, "allowed_devices": total_devices, "registered_devices": len(devices), "active_devices": sum(1 for d in devices if d["is_active"]), "devices": devices, } finally: conn.close()