Files
aza/AzA march 2026 - Kopie (11)/security_vault.py

137 lines
4.3 KiB
Python
Raw Normal View History

2026-04-16 13:32:32 +02:00
# -*- coding: utf-8 -*-
"""
AZA Security Vault Verschlüsselter API-Key-Tresor.
Windows: DPAPI (CryptProtectData / CryptUnprotectData) user-scoped.
Fallback: base64-Obfuscation (kein Klartext, aber kein kryptographischer Schutz).
"""
import base64
import json
import os
import sys
from aza_config import get_writable_data_dir
# Subdirectory, below the writable data directory, that holds the vault file.
_VAULT_SUBDIR = "secure"
# File name of the vault inside that subdirectory.
_VAULT_FILENAME = "vault.dat"
def _vault_path() -> str:
    """Return the full path of the vault file, creating its directory if needed."""
    vault_dir = os.path.join(get_writable_data_dir(), _VAULT_SUBDIR)
    # Creating the directory here means callers can open the path right away.
    os.makedirs(vault_dir, exist_ok=True)
    vault_file = os.path.join(vault_dir, _VAULT_FILENAME)
    return vault_file
# ─── Windows DPAPI ────────────────────────────────────────────────────────────
# True only on Windows, where DPAPI is available; selects the real encryption path.
_USE_DPAPI = sys.platform == "win32"

if _USE_DPAPI:
    import ctypes
    import ctypes.wintypes as _wt

    class _DATA_BLOB(ctypes.Structure):
        """ctypes layout of the Win32 ``DATA_BLOB`` struct: byte count plus data pointer."""

        _fields_ = [
            ("cbData", _wt.DWORD),
            ("pbData", ctypes.POINTER(ctypes.c_char)),
        ]

    # Loaded with use_last_error=True so the Win32 error code of a failed call
    # can be read reliably through ctypes.get_last_error().
    _crypt32 = ctypes.WinDLL("crypt32", use_last_error=True)
    _kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)

    def _dpapi_call(func_name: str, data: bytes) -> bytes:
        """Run ``CryptProtectData`` or ``CryptUnprotectData`` on *data*.

        Args:
            func_name: Name of the crypt32 export to call.
            data: Input bytes (plaintext for protect, ciphertext for unprotect).

        Returns:
            The bytes produced by the DPAPI call.

        Raises:
            OSError: If the DPAPI call reports failure; carries the Win32
                error code in ``winerror``.
        """
        # Keep a named reference so the input buffer outlives the native call.
        in_buf = ctypes.create_string_buffer(data, len(data))
        blob_in = _DATA_BLOB(len(data), in_buf)
        blob_out = _DATA_BLOB()
        ok = getattr(_crypt32, func_name)(
            ctypes.byref(blob_in), None, None, None, None, 0, ctypes.byref(blob_out),
        )
        if not ok:
            code = ctypes.get_last_error()
            detail = ctypes.FormatError(code).strip()
            raise ctypes.WinError(code, f"{func_name} failed: {detail}")
        try:
            # Copy the result into Python-owned bytes before releasing it.
            return ctypes.string_at(blob_out.pbData, blob_out.cbData)
        finally:
            # The output buffer is allocated by the API and must be released
            # with LocalFree, even if the copy above raises.
            _kernel32.LocalFree(blob_out.pbData)

    def _dpapi_encrypt(plaintext: bytes) -> bytes:
        """Encrypt *plaintext* with DPAPI (flags 0, no extra entropy)."""
        return _dpapi_call("CryptProtectData", plaintext)

    def _dpapi_decrypt(encrypted: bytes) -> bytes:
        """Decrypt a blob previously produced by ``_dpapi_encrypt``."""
        return _dpapi_call("CryptUnprotectData", encrypted)
def _encrypt(data: bytes) -> bytes:
    """Protect *data*: DPAPI when enabled, otherwise Base85 encoding (obfuscation only)."""
    if not _USE_DPAPI:
        return base64.b85encode(data)
    return _dpapi_encrypt(data)
def _decrypt(data: bytes) -> bytes:
    """Reverse ``_encrypt``: DPAPI when enabled, otherwise Base85 decoding."""
    if not _USE_DPAPI:
        return base64.b85decode(data)
    return _dpapi_decrypt(data)
# ─── Public API ───────────────────────────────────────────────────────────────
def store_api_key(key: str) -> bool:
    """Store the API key encrypted in the vault.

    The key is stripped, encrypted via ``_encrypt`` and written as JSON with
    two fields: ``"k"`` (Base64 of the encrypted bytes) and ``"m"`` (the
    method marker, ``"dpapi"`` or ``"b85"``).

    The file is first written to a temporary sibling created with owner-only
    permissions and then moved over the vault with ``os.replace``. A failed
    or interrupted write therefore never leaves a truncated vault behind.

    Args:
        key: The API key to store.

    Returns:
        True on success; False if *key* is empty/blank or anything fails.
    """
    if not key or not key.strip():
        return False
    try:
        encrypted = _encrypt(key.strip().encode("utf-8"))
        payload = {
            "k": base64.b64encode(encrypted).decode("ascii"),
            "m": "dpapi" if _USE_DPAPI else "b85",
        }
        path = _vault_path()
        tmp_path = path + ".tmp"
        # Drop a stale temp file so O_EXCL below creates a fresh one that is
        # guaranteed to carry the restrictive mode.
        try:
            os.remove(tmp_path)
        except FileNotFoundError:
            pass
        flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0)
        # 0o600: in the fallback mode the content is only obfuscated, so the
        # file must not be readable by other users.
        fd = os.open(tmp_path, flags, 0o600)
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(payload, f)
            os.replace(tmp_path, path)
        except Exception:
            # Best-effort cleanup of the partial temp file; the existing
            # vault (if any) is left untouched.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise
        return True
    except Exception:
        # Deliberate best-effort API: callers only get a success flag.
        return False
def retrieve_api_key() -> str | None:
    """Read the API key back from the encrypted vault.

    Returns:
        The decrypted key, or None when no vault file exists or when reading,
        parsing or decrypting it fails for any reason.
    """
    try:
        vault_file = _vault_path()
        if os.path.isfile(vault_file):
            with open(vault_file, "r", encoding="utf-8") as fh:
                stored = json.load(fh)
            # "k" holds the Base64-wrapped encrypted bytes.
            ciphertext = base64.b64decode(stored["k"])
            plaintext = _decrypt(ciphertext)
            return plaintext.decode("utf-8")
        return None
    except Exception:
        return None
def has_vault_key() -> bool:
    """Return True when a key can be read from the vault."""
    stored_key = retrieve_api_key()
    return stored_key is not None
def get_masked_key() -> str:
    """Return the stored key in masked form, e.g. 'sk-****...a1B2'.

    Returns an empty string when no key is available, and a fixed
    placeholder for keys of at most 8 characters.
    """
    secret = retrieve_api_key()
    if not secret:
        return ""
    if len(secret) < 9:
        return "****-****"
    # Only the well-known "sk-" prefix is shown in clear text.
    head = "sk-" if secret.startswith("sk-") else ""
    tail = secret[-4:]
    return head + "****\u2026" + tail
def delete_vault_key() -> bool:
    """Delete the stored key.

    Returns:
        True when the vault file is gone afterwards (including when it did
        not exist), False if anything raised.
    """
    try:
        vault_file = _vault_path()
        if not os.path.isfile(vault_file):
            return True
        os.remove(vault_file)
    except Exception:
        return False
    return True