Files
aza/AzA march 2026 - Kopie (6)/aza_device_enforcement.py
2026-04-16 13:32:32 +02:00

240 lines
7.4 KiB
Python

import hashlib
import sqlite3
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Tuple
def _runtime_base_dir() -> Path:
if getattr(sys, "frozen", False):
return Path(getattr(sys, "_MEIPASS", Path(sys.executable).resolve().parent))
return Path(__file__).resolve().parent
def _resolve_db_path() -> str:
if getattr(sys, "frozen", False):
import os as _os
try:
from aza_config import get_writable_data_dir
writable = Path(get_writable_data_dir()) / "data"
except Exception:
writable = Path(_os.environ.get("APPDATA", "")) / "AZA Desktop" / "data"
writable.mkdir(parents=True, exist_ok=True)
return str(writable / "stripe_webhook.sqlite")
return str(_runtime_base_dir() / "data" / "stripe_webhook.sqlite")
DB_PATH = _resolve_db_path()
@dataclass(frozen=True)
class DeviceDecision:
allowed: bool
reason: str # "ok"|"missing_device_id"|"license_not_found"|"limit_reached"|"user_limit_reached"
devices_used: int
devices_allowed: int
users_used: int
users_allowed: int
def _hash_device_id(device_id: str) -> str:
# store only hashed device identifiers (privacy + avoids accidental leaks)
return hashlib.sha256(device_id.encode("utf-8")).hexdigest()
def ensure_device_table(conn: sqlite3.Connection) -> None:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS device_bindings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
customer_email TEXT NOT NULL,
user_key TEXT NOT NULL,
device_hash TEXT NOT NULL,
first_seen_at INTEGER NOT NULL,
last_seen_at INTEGER NOT NULL,
UNIQUE(customer_email, user_key, device_hash)
);
"""
)
conn.commit()
def _get_license_row(conn: sqlite3.Connection, customer_email: str) -> Optional[Tuple[int, int]]:
"""
Returns (allowed_users, devices_per_user) from licenses table for given customer_email.
Assumes licenses table already exists in your system.
"""
cur = conn.execute(
"SELECT allowed_users, devices_per_user FROM licenses WHERE customer_email = ? LIMIT 1;",
(customer_email,),
)
row = cur.fetchone()
if not row:
return None
try:
allowed_users = int(row[0]) if row[0] is not None else 1
devices_per_user = int(row[1]) if row[1] is not None else 1
return allowed_users, devices_per_user
except Exception:
# fail safe defaults
return 1, 1
def enforce_and_touch_device(
customer_email: str,
user_key: str,
device_id: Optional[str],
db_path: Optional[str] = None,
) -> DeviceDecision:
"""
Enforces devices_per_user for (customer_email, user_key).
- customer_email: taken from license record (stable identifier)
- user_key: stable per-user identifier on client side (e.g. "default", or later real user id)
- device_id: client-provided stable device identifier (NOT secret); we hash it before storing.
- db_path: absolute path to stripe_webhook.sqlite; falls back to module-level DB_PATH if not given.
Behavior:
- If device already known: update last_seen_at, allow.
- If new device and below limit: insert, allow.
- If new device and limit reached: deny.
"""
if not device_id:
return DeviceDecision(
allowed=False,
reason="missing_device_id",
devices_used=0,
devices_allowed=0,
users_used=0,
users_allowed=0,
)
device_hash = _hash_device_id(device_id)
now = int(time.time())
conn = sqlite3.connect(db_path or DB_PATH)
try:
ensure_device_table(conn)
lic = _get_license_row(conn, customer_email)
if not lic:
return DeviceDecision(
allowed=False,
reason="license_not_found",
devices_used=0,
devices_allowed=0,
users_used=0,
users_allowed=0,
)
allowed_users, devices_per_user = lic
cur_user = conn.execute(
"""
SELECT COUNT(*) FROM device_bindings
WHERE customer_email = ? AND user_key = ?;
""",
(customer_email, user_key),
)
user_bindings_count = int(cur_user.fetchone()[0])
cur_users = conn.execute(
"""
SELECT COUNT(DISTINCT user_key) FROM device_bindings
WHERE customer_email = ?;
""",
(customer_email,),
)
users_used = int(cur_users.fetchone()[0])
if user_bindings_count == 0 and users_used >= int(allowed_users):
return DeviceDecision(
allowed=False,
reason="user_limit_reached",
devices_used=0,
devices_allowed=int(devices_per_user),
users_used=users_used,
users_allowed=int(allowed_users),
)
# is device already bound?
cur = conn.execute(
"""
SELECT id FROM device_bindings
WHERE customer_email = ? AND user_key = ? AND device_hash = ?
LIMIT 1;
""",
(customer_email, user_key, device_hash),
)
row = cur.fetchone()
if row:
conn.execute(
"""
UPDATE device_bindings
SET last_seen_at = ?
WHERE id = ?;
""",
(now, int(row[0])),
)
conn.commit()
# devices used:
cur2 = conn.execute(
"""
SELECT COUNT(*) FROM device_bindings
WHERE customer_email = ? AND user_key = ?;
""",
(customer_email, user_key),
)
used = int(cur2.fetchone()[0])
return DeviceDecision(
allowed=True,
reason="ok",
devices_used=used,
devices_allowed=int(devices_per_user),
users_used=users_used,
users_allowed=int(allowed_users),
)
# new device -> check limit
cur3 = conn.execute(
"""
SELECT COUNT(*) FROM device_bindings
WHERE customer_email = ? AND user_key = ?;
""",
(customer_email, user_key),
)
used = int(cur3.fetchone()[0])
if used >= int(devices_per_user):
return DeviceDecision(
allowed=False,
reason="limit_reached",
devices_used=used,
devices_allowed=int(devices_per_user),
users_used=users_used,
users_allowed=int(allowed_users),
)
# claim new device
conn.execute(
"""
INSERT INTO device_bindings (customer_email, user_key, device_hash, first_seen_at, last_seen_at)
VALUES (?, ?, ?, ?, ?);
""",
(customer_email, user_key, device_hash, now, now),
)
conn.commit()
return DeviceDecision(
allowed=True,
reason="ok",
devices_used=used + 1,
devices_allowed=int(devices_per_user),
users_used=users_used if user_bindings_count > 0 else users_used + 1,
users_allowed=int(allowed_users),
)
finally:
conn.close()