# -*- coding: utf-8 -*-
"""
AZA Security Vault - encrypted API-key vault.

Windows:  DPAPI (CryptProtectData / CryptUnprotectData), scoped to the user.
Fallback: base85 obfuscation (not plain text, but NOT cryptographic
          protection).

The vault is a small JSON file with two fields:
    "k"  base64 text of the protected key bytes
    "m"  protection method used when writing: "dpapi" or "b85"
"""

import base64
import json
import logging
import os
import sys
import tempfile

from aza_config import get_writable_data_dir

_log = logging.getLogger(__name__)

_VAULT_SUBDIR = "secure"
_VAULT_FILENAME = "vault.dat"

# Values of the "m" field in the vault payload.
_METHOD_DPAPI = "dpapi"
_METHOD_B85 = "b85"


def _vault_path() -> str:
    """Return the path of the vault file, creating its directory if needed."""
    d = os.path.join(get_writable_data_dir(), _VAULT_SUBDIR)
    # Owner-only directory on POSIX; the mode has no real effect on Windows.
    os.makedirs(d, mode=0o700, exist_ok=True)
    return os.path.join(d, _VAULT_FILENAME)


# ─── Windows DPAPI ────────────────────────────────────────────────────────────

_USE_DPAPI = sys.platform == "win32"

# Method used by this platform when writing, and assumed for payloads that
# carry no "m" marker.
_DEFAULT_METHOD = _METHOD_DPAPI if _USE_DPAPI else _METHOD_B85

if _USE_DPAPI:
    import ctypes
    import ctypes.wintypes as _wt

    class _DATA_BLOB(ctypes.Structure):
        """ctypes mirror of the Win32 DATA_BLOB structure (length + pointer)."""

        _fields_ = [
            ("cbData", _wt.DWORD),
            ("pbData", ctypes.POINTER(ctypes.c_char)),
        ]

    def _dpapi_call(func_name: str, data: bytes) -> bytes:
        """Run *data* through the crypt32 function *func_name* and return the result.

        Raises:
            OSError: if the API call reports failure.
        """
        blob_in = _DATA_BLOB(
            len(data), ctypes.create_string_buffer(data, len(data))
        )
        blob_out = _DATA_BLOB()
        func = getattr(ctypes.windll.crypt32, func_name)
        ok = func(
            ctypes.byref(blob_in), None, None, None, None, 0,
            ctypes.byref(blob_out),
        )
        if not ok:
            raise OSError(f"{func_name} failed")
        try:
            buf = ctypes.create_string_buffer(blob_out.cbData)
            ctypes.memmove(buf, blob_out.pbData, blob_out.cbData)
            return buf.raw
        finally:
            # The output buffer is allocated by the API; release it even if
            # the copy above fails.
            ctypes.windll.kernel32.LocalFree(blob_out.pbData)

    def _dpapi_encrypt(plaintext: bytes) -> bytes:
        """Protect *plaintext* with CryptProtectData."""
        return _dpapi_call("CryptProtectData", plaintext)

    def _dpapi_decrypt(encrypted: bytes) -> bytes:
        """Recover the plaintext of a blob produced by CryptProtectData."""
        return _dpapi_call("CryptUnprotectData", encrypted)


def _encrypt(data: bytes) -> bytes:
    """Protect *data* with this platform's method (DPAPI, else base85)."""
    if _USE_DPAPI:
        return _dpapi_encrypt(data)
    return base64.b85encode(data)


def _decrypt(data: bytes, method: str | None = None) -> bytes:
    """Reverse the protection applied to *data*.

    Args:
        data: protected bytes as stored in the vault.
        method: the vault's "m" marker ("dpapi" or "b85"). ``None`` selects
            this platform's default, matching payloads without a marker.

    Raises:
        OSError: if the data is DPAPI-protected but DPAPI is unavailable
            here, or if the DPAPI call fails.
        ValueError: if *method* is not a known marker, or the base85 data
            is malformed.
    """
    if method is None:
        method = _DEFAULT_METHOD
    if method == _METHOD_B85:
        # NOTE(review): this also accepts a base85 vault on Windows (e.g. a
        # data directory copied from another OS). It stays mere obfuscation
        # until the key is stored again.
        return base64.b85decode(data)
    if method == _METHOD_DPAPI:
        if not _USE_DPAPI:
            raise OSError("DPAPI-protected vault cannot be read on this platform")
        return _dpapi_decrypt(data)
    raise ValueError(f"unknown vault method: {method!r}")


def _write_vault(path: str, payload: dict) -> None:
    """Write *payload* as JSON to *path* atomically.

    The data goes to a temporary file in the same directory (created
    owner-only by ``mkstemp`` on POSIX) and is then moved over *path*, so a
    failed write never leaves a truncated vault behind.
    """
    fd, tmp_path = tempfile.mkstemp(
        dir=os.path.dirname(path),
        prefix=_VAULT_FILENAME + ".",
        suffix=".tmp",
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(payload, f)
        os.replace(tmp_path, path)
    except BaseException:
        # Best-effort cleanup of the temporary file; the original error is
        # the one worth reporting.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise


# ─── Public API ───────────────────────────────────────────────────────────────

def store_api_key(key: str) -> bool:
    """Store the API key in the vault in protected form.

    Leading and trailing whitespace is stripped before storing.

    Returns:
        True on success; False if *key* is empty or blank, or if protecting
        or writing the key fails.
    """
    if not key or not key.strip():
        return False
    try:
        encrypted = _encrypt(key.strip().encode("utf-8"))
        payload = {
            "k": base64.b64encode(encrypted).decode("ascii"),
            "m": _DEFAULT_METHOD,
        }
        _write_vault(_vault_path(), payload)
        return True
    except Exception as exc:
        # Deliberate best-effort boundary: callers only get a bool. Log the
        # exception type only, never key material.
        _log.debug("store_api_key failed: %s", type(exc).__name__)
        return False


def retrieve_api_key() -> str | None:
    """Read the API key back from the vault.

    The key is decoded with the method recorded in the vault's "m" field;
    payloads without that field use this platform's default method.

    Returns:
        The key, or None if no vault file exists or it cannot be read or
        decoded.
    """
    try:
        path = _vault_path()
        if not os.path.isfile(path):
            return None
        with open(path, "r", encoding="utf-8") as f:
            payload = json.load(f)
        encrypted = base64.b64decode(payload["k"])
        return _decrypt(encrypted, payload.get("m")).decode("utf-8")
    except Exception as exc:
        # Deliberate best-effort boundary: an unreadable vault is reported
        # as "no key". Log the exception type only, never key material.
        _log.debug("retrieve_api_key failed: %s", type(exc).__name__)
        return None


def has_vault_key() -> bool:
    """Return True if a key can be retrieved from the vault."""
    return retrieve_api_key() is not None


def get_masked_key() -> str:
    """Return the key in masked form, e.g. 'sk-****…a1B2'.

    Returns an empty string when no key is stored. Keys of eight characters
    or fewer are masked completely.
    """
    key = retrieve_api_key()
    if not key:
        return ""
    if len(key) <= 8:
        return "****-****"
    prefix = key[:3] if key.startswith("sk-") else ""
    return f"{prefix}****\u2026{key[-4:]}"


def delete_vault_key() -> bool:
    """Delete the stored key.

    Returns:
        True if the vault file was removed or did not exist; False if an
        error occurred.
    """
    try:
        path = _vault_path()
        if os.path.isfile(path):
            os.remove(path)
        return True
    except Exception as exc:
        _log.debug("delete_vault_key failed: %s", type(exc).__name__)
        return False