# -*- coding: utf-8 -*- """ AZA TOTP – Zwei-Faktor-Authentifizierung (RFC 6238) ENV-Variablen: AZA_2FA_ENABLED – "1" = 2FA-Feature verfügbar (Default: "1") AZA_2FA_REQUIRED – "1" = 2FA ist Pflicht für alle Benutzer (Default: "0") """ import os import sys import time import secrets import hashlib import hmac import base64 import json import pyotp _2FA_ENABLED = os.getenv("AZA_2FA_ENABLED", "1").strip() == "1" _2FA_REQUIRED = os.getenv("AZA_2FA_REQUIRED", "0").strip() == "1" _NUM_BACKUP_CODES = 8 _RATE_LIMIT_MAX_ATTEMPTS = 5 _RATE_LIMIT_WINDOW_SECONDS = 300 _totp_attempts: dict[str, list[float]] = {} def is_2fa_enabled() -> bool: return _2FA_ENABLED def is_2fa_required() -> bool: return _2FA_REQUIRED def generate_totp_secret() -> str: """Generiert ein neues TOTP-Secret (Base32, 160 bit).""" return pyotp.random_base32(length=32) def generate_backup_codes(count: int = _NUM_BACKUP_CODES) -> list[str]: """Generiert Einmal-Backup-Codes (8 Zeichen, alphanumerisch).""" codes = [] for _ in range(count): code = secrets.token_hex(4).upper() codes.append(code) return codes def hash_backup_code(code: str) -> str: """Hasht einen Backup-Code für sichere Speicherung.""" return hashlib.sha256(code.strip().upper().encode("utf-8")).hexdigest() def get_provisioning_uri(secret: str, user_name: str, issuer: str = "AZA MedWork") -> str: """Erzeugt die otpauth:// URI für QR-Code-Generierung.""" totp = pyotp.TOTP(secret) return totp.provisioning_uri(name=user_name, issuer_name=issuer) def verify_totp(secret: str, code: str, user_id: str = "default") -> bool: """Prüft einen TOTP-Code mit Rate-Limiting. Erlaubt +-1 Window (30s).""" if not _check_rate_limit(user_id): return False _record_attempt(user_id) totp = pyotp.TOTP(secret) return totp.verify(code.strip(), valid_window=1) def verify_backup_code(code: str, stored_hashes: list[str]) -> int | None: """Prüft einen Backup-Code. Gibt den Index zurück oder None.""" h = hash_backup_code(code) for i, stored in enumerate(stored_hashes): if stored and hmac.compare_digest(h, stored): return i return None def encrypt_secret(secret: str, key: str) -> str: """Verschlüsselt das TOTP-Secret mit XOR + Base64 (symmetrisch). Für Desktop-App ausreichend; Secret liegt nie im Klartext auf Disk.""" key_bytes = hashlib.sha256(key.encode("utf-8")).digest() secret_bytes = secret.encode("utf-8") encrypted = bytes(s ^ key_bytes[i % len(key_bytes)] for i, s in enumerate(secret_bytes)) return base64.b64encode(encrypted).decode("ascii") def decrypt_secret(encrypted: str, key: str) -> str: """Entschlüsselt das TOTP-Secret. Gibt leeren String bei falschem Key zurück.""" try: key_bytes = hashlib.sha256(key.encode("utf-8")).digest() encrypted_bytes = base64.b64decode(encrypted.encode("ascii")) decrypted = bytes(e ^ key_bytes[i % len(key_bytes)] for i, e in enumerate(encrypted_bytes)) return decrypted.decode("utf-8") except (UnicodeDecodeError, Exception): return "" def is_rate_limited(user_id: str = "default") -> bool: """Prüft ob der Benutzer aktuell rate-limited ist.""" return not _check_rate_limit(user_id) def _check_rate_limit(user_id: str) -> bool: now = time.time() attempts = _totp_attempts.get(user_id, []) recent = [t for t in attempts if now - t < _RATE_LIMIT_WINDOW_SECONDS] _totp_attempts[user_id] = recent return len(recent) < _RATE_LIMIT_MAX_ATTEMPTS def _record_attempt(user_id: str): if user_id not in _totp_attempts: _totp_attempts[user_id] = [] _totp_attempts[user_id].append(time.time()) def get_2fa_profile_data(profile: dict) -> dict: """Extrahiert 2FA-relevante Daten aus dem Profil.""" return { "totp_secret_enc": profile.get("totp_secret_enc", ""), "totp_active": profile.get("totp_active", False), "backup_codes": profile.get("backup_codes", []), }