"""Per-license device binding enforcement backed by a SQLite database.

Device bindings live in the ``device_bindings`` table (created/migrated by
:func:`ensure_device_table`); license data is read from a ``licenses`` table
that is expected to exist already in the same database file.
"""

import hashlib
import sqlite3
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Minimum number of devices every active license contributes, even when its
# ``devices_per_user`` value is NULL or smaller.
DEVICES_PER_LICENSE_FLOOR = 2


def _runtime_base_dir() -> Path:
    """Return the directory used as the base for bundled/runtime files.

    When ``sys.frozen`` is set, ``sys._MEIPASS`` is used if present, otherwise
    the directory of the executable. In a normal interpreter run it is the
    directory containing this source file.
    """
    if getattr(sys, "frozen", False):
        return Path(getattr(sys, "_MEIPASS", Path(sys.executable).resolve().parent))
    return Path(__file__).resolve().parent


def _resolve_db_path() -> str:
    """Return the path of the SQLite database file as a string.

    Frozen builds use a writable ``data`` directory (created if missing),
    taken from ``aza_config.get_writable_data_dir()`` or, if that fails for
    any reason, from ``%APPDATA%/AZA Desktop``. Non-frozen runs use
    ``<base dir>/data`` and do NOT create the directory.
    """
    if getattr(sys, "frozen", False):
        import os as _os

        try:
            from aza_config import get_writable_data_dir

            writable = Path(get_writable_data_dir()) / "data"
        except Exception:
            # Deliberate best-effort fallback: any failure in the optional
            # config module falls back to the APPDATA location.
            writable = Path(_os.environ.get("APPDATA", "")) / "AZA Desktop" / "data"
        writable.mkdir(parents=True, exist_ok=True)
        return str(writable / "stripe_webhook.sqlite")
    return str(_runtime_base_dir() / "data" / "stripe_webhook.sqlite")


DB_PATH = _resolve_db_path()


@dataclass(frozen=True)
class DeviceDecision:
    """Immutable result of a device-limit check."""

    # True when the device may proceed.
    allowed: bool
    # "ok"|"missing_device_id"|"license_not_found"|"device_limit_reached"|"user_limit_reached"
    reason: str
    # Active bindings for this email/user_key after the call.
    devices_used: int
    # Total devices granted by all active licenses of the email.
    devices_allowed: int
    # Distinct user_key values bound to the email (active or not).
    users_used: int
    # Allowed users; this module always reports 1 for licensed emails.
    users_allowed: int
    license_active: bool = False
    license_count: int = 0


def _hash_device_id(device_id: str) -> str:
    """Return the hex SHA-256 digest of the UTF-8 encoded device id."""
    return hashlib.sha256(device_id.encode("utf-8")).hexdigest()


def ensure_device_table(conn: sqlite3.Connection) -> None:
    """Create ``device_bindings`` if needed and add any missing columns.

    Columns added after the original schema (``device_name``, ``is_active``,
    ``app_version``, ``device_fingerprint``) are appended with ALTER TABLE so
    that older databases are upgraded in place. Commits on ``conn``.
    """
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS device_bindings (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            customer_email TEXT NOT NULL,
            user_key TEXT NOT NULL,
            device_hash TEXT NOT NULL,
            first_seen_at INTEGER NOT NULL,
            last_seen_at INTEGER NOT NULL,
            UNIQUE(customer_email, user_key, device_hash)
        );
        """
    )
    # PRAGMA table_info rows carry the column name at index 1.
    cols = [row[1] for row in conn.execute("PRAGMA table_info(device_bindings)").fetchall()]
    if "device_name" not in cols:
        conn.execute("ALTER TABLE device_bindings ADD COLUMN device_name TEXT DEFAULT ''")
    if "is_active" not in cols:
        conn.execute("ALTER TABLE device_bindings ADD COLUMN is_active INTEGER DEFAULT 1")
    if "app_version" not in cols:
        conn.execute("ALTER TABLE device_bindings ADD COLUMN app_version TEXT DEFAULT ''")
    if "device_fingerprint" not in cols:
        conn.execute("ALTER TABLE device_bindings ADD COLUMN device_fingerprint TEXT DEFAULT ''")
    conn.commit()


def _count_active_licenses(conn: sqlite3.Connection, customer_email: str) -> Tuple[int, int]:
    """Returns (n_active_licenses, total_allowed_devices).

    Multiple active licenses for the same email stack:
    1 license = 2 devices, 2 licenses = 4 devices, etc.
    Each license contributes at least ``DEVICES_PER_LICENSE_FLOOR`` devices.
    The email comparison is case-insensitive. Returns ``(0, 0)`` when there
    is no active license.
    """
    cur = conn.execute(
        """SELECT COUNT(*), COALESCE(SUM(
               CASE WHEN devices_per_user IS NULL OR devices_per_user < ?
                    THEN ? ELSE devices_per_user END
           ), 0)
           FROM licenses
           WHERE lower(customer_email) = lower(?) AND status = 'active'""",
        (DEVICES_PER_LICENSE_FLOOR, DEVICES_PER_LICENSE_FLOOR, customer_email),
    )
    row = cur.fetchone()
    if not row or int(row[0]) == 0:
        return (0, 0)
    return (int(row[0]), int(row[1]))


def _get_license_row(conn: sqlite3.Connection, customer_email: str) -> Optional[Tuple[int, int]]:
    """Backward-compat wrapper. Returns (allowed_users, total_devices).

    Returns ``None`` when the email has no active license; ``allowed_users``
    is always 1 otherwise.
    """
    n_licenses, total_devices = _count_active_licenses(conn, customer_email)
    if n_licenses == 0:
        return None
    return (1, total_devices)


def _count_distinct_users(conn: sqlite3.Connection, customer_email: str) -> int:
    """Return the number of distinct user_key values bound to the email."""
    row = conn.execute(
        "SELECT COUNT(DISTINCT user_key) FROM device_bindings WHERE lower(customer_email) = lower(?)",
        (customer_email,),
    ).fetchone()
    return int(row[0])


def enforce_and_touch_device(
    customer_email: str,
    user_key: str,
    device_id: Optional[str],
    db_path: Optional[str] = None,
    device_name: str = "",
    app_version: str = "",
    device_fingerprint: str = "",
) -> DeviceDecision:
    """Enforce device limit for a customer email.

    Rules (in order):
    1. device_hash match       -> allow, update last_seen + store fingerprint
    2. device_fingerprint match -> rebind hash (same HW, new install)
    3. Legacy hostname match    -> rebind hash + store fingerprint (migration)
    4. Under limit              -> register new device
    5. At/over limit            -> deny

    Args:
        customer_email: License owner's email; matched case-insensitively.
        user_key: User identifier the binding is scoped to.
        device_id: Raw device id; only its SHA-256 hash is stored. A falsy
            value is rejected with ``reason="missing_device_id"``.
        db_path: SQLite file to use; defaults to ``DB_PATH``.
        device_name: Optional host/device name (used for legacy matching).
        app_version: Optional client version stored with the binding.
        device_fingerprint: Optional hardware fingerprint.

    Returns:
        A ``DeviceDecision``. ``devices_used`` and ``users_used`` reflect the
        state after any change made by this call.
    """
    if not device_id:
        print("[DEVICE-ENFORCE] REJECT: missing device_id")
        return DeviceDecision(
            allowed=False,
            reason="missing_device_id",
            devices_used=0,
            devices_allowed=0,
            users_used=0,
            users_allowed=0,
        )

    device_hash = _hash_device_id(device_id)
    now = int(time.time())
    print(f"[DEVICE-ENFORCE] enforce email={customer_email} "
          f"hash={device_hash[:12]}... name={device_name} "
          f"fp={device_fingerprint[:12] + '...' if device_fingerprint else 'none'}")

    conn = sqlite3.connect(db_path or DB_PATH)
    try:
        ensure_device_table(conn)
        n_licenses, total_devices = _count_active_licenses(conn, customer_email)
        if n_licenses == 0:
            print(f"[DEVICE-ENFORCE] no active license for {customer_email}")
            return DeviceDecision(
                allowed=False,
                reason="license_not_found",
                devices_used=0,
                devices_allowed=0,
                users_used=0,
                users_allowed=0,
                license_active=False,
                license_count=0,
            )

        # Active bindings of this user before any change made below.
        used_devices = int(
            conn.execute(
                """SELECT COUNT(*) FROM device_bindings
                   WHERE lower(customer_email) = lower(?) AND user_key = ?
                     AND COALESCE(is_active, 1) = 1""",
                (customer_email, user_key),
            ).fetchone()[0]
        )
        users_used = _count_distinct_users(conn, customer_email)

        def licensed(allowed: bool, reason: str, devices: int, users: int) -> DeviceDecision:
            # Every outcome past the license check shares these license fields.
            return DeviceDecision(
                allowed=allowed,
                reason=reason,
                devices_used=devices,
                devices_allowed=total_devices,
                users_used=users,
                users_allowed=1,
                license_active=True,
                license_count=n_licenses,
            )

        # --- Step 1: exact device_hash match (normal case) ---
        existing = conn.execute(
            """SELECT id, COALESCE(is_active, 1) FROM device_bindings
               WHERE lower(customer_email) = lower(?) AND user_key = ? AND device_hash = ?
               LIMIT 1""",
            (customer_email, user_key, device_hash),
        ).fetchone()
        if existing:
            was_active = bool(existing[1])
            # NOTE(review): a previously deactivated binding is reactivated
            # here without re-checking the device limit — confirm intended.
            conn.execute(
                """UPDATE device_bindings
                   SET last_seen_at = ?,
                       device_name = COALESCE(NULLIF(?, ''), device_name),
                       app_version = COALESCE(NULLIF(?, ''), app_version),
                       device_fingerprint = COALESCE(NULLIF(?, ''), device_fingerprint),
                       is_active = 1
                   WHERE id = ?""",
                (now, device_name, app_version, device_fingerprint, int(existing[0])),
            )
            conn.commit()
            print(f"[DEVICE-ENFORCE] step1 hash-match id={existing[0]} -> allowed")
            # Reactivating an inactive row adds one active device to the
            # count taken before the UPDATE.
            return licensed(
                True, "ok", used_devices if was_active else used_devices + 1, users_used
            )

        # --- Step 2: fingerprint match (reinstall, new device_id, same HW) ---
        if device_fingerprint:
            fp_row = conn.execute(
                """SELECT id FROM device_bindings
                   WHERE lower(customer_email) = lower(?) AND user_key = ?
                     AND device_fingerprint = ? AND device_fingerprint != ''
                     AND COALESCE(is_active, 1) = 1
                   LIMIT 1""",
                (customer_email, user_key, device_fingerprint),
            ).fetchone()
            if fp_row:
                conn.execute(
                    """UPDATE device_bindings
                       SET device_hash = ?, last_seen_at = ?,
                           device_name = COALESCE(NULLIF(?, ''), device_name),
                           app_version = COALESCE(NULLIF(?, ''), app_version),
                           is_active = 1
                       WHERE id = ?""",
                    (device_hash, now, device_name, app_version, int(fp_row[0])),
                )
                conn.commit()
                print(f"[DEVICE-ENFORCE] step2 fingerprint-rebind id={fp_row[0]} -> allowed")
                return licensed(True, "ok", used_devices, users_used)

        # --- Step 3: legacy hostname match (old entry without fingerprint) ---
        if device_fingerprint and device_name:
            legacy_row = conn.execute(
                """SELECT id, device_hash FROM device_bindings
                   WHERE lower(customer_email) = lower(?) AND user_key = ?
                     AND (device_fingerprint IS NULL OR device_fingerprint = '')
                     AND device_name = ?
                     AND COALESCE(is_active, 1) = 1
                   LIMIT 1""",
                (customer_email, user_key, device_name),
            ).fetchone()
            if legacy_row:
                rebind_id = int(legacy_row[0])
                conn.execute(
                    """UPDATE device_bindings
                       SET device_hash = ?, device_fingerprint = ?, last_seen_at = ?,
                           app_version = COALESCE(NULLIF(?, ''), app_version),
                           is_active = 1
                       WHERE id = ?""",
                    (device_hash, device_fingerprint, now, app_version, rebind_id),
                )
                # Other fingerprint-less rows with the same hostname are
                # deactivated so they stop occupying slots.
                stale = conn.execute(
                    """UPDATE device_bindings SET is_active = 0
                       WHERE lower(customer_email) = lower(?) AND user_key = ?
                         AND device_name = ?
                         AND (device_fingerprint IS NULL OR device_fingerprint = '')
                         AND id != ?
                         AND COALESCE(is_active, 1) = 1""",
                    (customer_email, user_key, device_name, rebind_id),
                )
                stale_count = stale.rowcount
                conn.commit()
                used_after = used_devices - stale_count
                print(f"[DEVICE-ENFORCE] step3 legacy-hostname-rebind id={rebind_id} "
                      f"old_hash={legacy_row[1][:12]}... stale_deactivated={stale_count} "
                      f"slots={used_after}/{total_devices} -> allowed")
                return licensed(True, "ok", max(1, used_after), users_used)

        # --- Step 4: new device -- check limit ---
        if used_devices >= total_devices:
            print(f"[DEVICE-ENFORCE] step4 limit-reached {used_devices}/{total_devices} "
                  f"name={device_name} fp={device_fingerprint[:12] if device_fingerprint else 'none'}")
            return licensed(False, "device_limit_reached", used_devices, users_used)

        # --- Step 5: register new device ---
        conn.execute(
            """INSERT INTO device_bindings
               (customer_email, user_key, device_hash, first_seen_at, last_seen_at,
                device_name, is_active, app_version, device_fingerprint)
               VALUES (?, ?, ?, ?, ?, ?, 1, ?, ?)""",
            (customer_email, user_key, device_hash, now, now,
             device_name, app_version, device_fingerprint),
        )
        conn.commit()
        print(f"[DEVICE-ENFORCE] step5 new-device registered name={device_name} "
              f"{used_devices + 1}/{total_devices}")
        # Recount instead of guessing from the active-device count: a user
        # whose only bindings are inactive is already part of users_used and
        # must not be counted a second time.
        return licensed(True, "ok", used_devices + 1, _count_distinct_users(conn, customer_email))
    finally:
        conn.close()


def list_devices_for_email(
    customer_email: str,
    db_path: Optional[str] = None,
) -> Dict[str, Any]:
    """Admin/debug: list all registered devices and license info for an email.

    Returns a dict with the email, the number of active licenses, the total
    allowed devices, counts of registered/active bindings and a ``devices``
    list ordered by ``last_seen_at`` descending. Device hashes are truncated
    to 12 characters.
    """
    conn = sqlite3.connect(db_path or DB_PATH)
    try:
        ensure_device_table(conn)
        n_licenses, total_devices = _count_active_licenses(conn, customer_email)
        rows = conn.execute(
            """SELECT device_hash, device_name, COALESCE(is_active, 1),
                      COALESCE(app_version, ''), first_seen_at, last_seen_at
               FROM device_bindings
               WHERE lower(customer_email) = lower(?)
               ORDER BY last_seen_at DESC""",
            (customer_email,),
        ).fetchall()
        devices: List[Dict[str, Any]] = [
            {
                "device_hash_short": (r[0] or "")[:12] + "…",
                "device_name": r[1] or "",
                "is_active": bool(r[2]),
                "app_version": r[3] or "",
                "first_seen": r[4],
                "last_seen": r[5],
            }
            for r in rows
        ]
        return {
            "email": customer_email,
            "active_licenses": n_licenses,
            "allowed_devices": total_devices,
            "registered_devices": len(devices),
            "active_devices": sum(1 for d in devices if d["is_active"]),
            "devices": devices,
        }
    finally:
        conn.close()