137 lines
4.3 KiB
Python
137 lines
4.3 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
AZA Security Vault – Verschlüsselter API-Key-Tresor.
|
||
|
||
Windows: DPAPI (CryptProtectData / CryptUnprotectData) – user-scoped.
|
||
Fallback: base64-Obfuscation (kein Klartext, aber kein kryptographischer Schutz).
|
||
"""
|
||
|
||
import base64
|
||
import json
|
||
import os
|
||
import sys
|
||
|
||
from aza_config import get_writable_data_dir
|
||
|
||
_VAULT_SUBDIR = "secure"
|
||
_VAULT_FILENAME = "vault.dat"
|
||
|
||
|
||
def _vault_path() -> str:
|
||
d = os.path.join(get_writable_data_dir(), _VAULT_SUBDIR)
|
||
os.makedirs(d, exist_ok=True)
|
||
return os.path.join(d, _VAULT_FILENAME)
|
||
|
||
|
||
# ─── Windows DPAPI ────────────────────────────────────────────────────────────
|
||
|
||
_USE_DPAPI = sys.platform == "win32"
|
||
|
||
if _USE_DPAPI:
|
||
import ctypes
|
||
import ctypes.wintypes as _wt
|
||
|
||
class _DATA_BLOB(ctypes.Structure):
|
||
_fields_ = [
|
||
("cbData", _wt.DWORD),
|
||
("pbData", ctypes.POINTER(ctypes.c_char)),
|
||
]
|
||
|
||
def _dpapi_encrypt(plaintext: bytes) -> bytes:
|
||
blob_in = _DATA_BLOB(len(plaintext), ctypes.create_string_buffer(plaintext, len(plaintext)))
|
||
blob_out = _DATA_BLOB()
|
||
ok = ctypes.windll.crypt32.CryptProtectData(
|
||
ctypes.byref(blob_in), None, None, None, None, 0, ctypes.byref(blob_out),
|
||
)
|
||
if not ok:
|
||
raise OSError("CryptProtectData failed")
|
||
buf = ctypes.create_string_buffer(blob_out.cbData)
|
||
ctypes.memmove(buf, blob_out.pbData, blob_out.cbData)
|
||
ctypes.windll.kernel32.LocalFree(blob_out.pbData)
|
||
return buf.raw
|
||
|
||
def _dpapi_decrypt(encrypted: bytes) -> bytes:
|
||
blob_in = _DATA_BLOB(len(encrypted), ctypes.create_string_buffer(encrypted, len(encrypted)))
|
||
blob_out = _DATA_BLOB()
|
||
ok = ctypes.windll.crypt32.CryptUnprotectData(
|
||
ctypes.byref(blob_in), None, None, None, None, 0, ctypes.byref(blob_out),
|
||
)
|
||
if not ok:
|
||
raise OSError("CryptUnprotectData failed")
|
||
buf = ctypes.create_string_buffer(blob_out.cbData)
|
||
ctypes.memmove(buf, blob_out.pbData, blob_out.cbData)
|
||
ctypes.windll.kernel32.LocalFree(blob_out.pbData)
|
||
return buf.raw
|
||
|
||
|
||
def _encrypt(data: bytes) -> bytes:
|
||
if _USE_DPAPI:
|
||
return _dpapi_encrypt(data)
|
||
return base64.b85encode(data)
|
||
|
||
|
||
def _decrypt(data: bytes) -> bytes:
|
||
if _USE_DPAPI:
|
||
return _dpapi_decrypt(data)
|
||
return base64.b85decode(data)
|
||
|
||
|
||
# ─── Public API ───────────────────────────────────────────────────────────────
|
||
|
||
def store_api_key(key: str) -> bool:
|
||
"""Speichert den API-Key verschlüsselt im Tresor."""
|
||
if not key or not key.strip():
|
||
return False
|
||
try:
|
||
encrypted = _encrypt(key.strip().encode("utf-8"))
|
||
payload = {
|
||
"k": base64.b64encode(encrypted).decode("ascii"),
|
||
"m": "dpapi" if _USE_DPAPI else "b85",
|
||
}
|
||
with open(_vault_path(), "w", encoding="utf-8") as f:
|
||
json.dump(payload, f)
|
||
return True
|
||
except Exception:
|
||
return False
|
||
|
||
|
||
def retrieve_api_key() -> str | None:
|
||
"""Liest den API-Key aus dem verschlüsselten Tresor."""
|
||
try:
|
||
path = _vault_path()
|
||
if not os.path.isfile(path):
|
||
return None
|
||
with open(path, "r", encoding="utf-8") as f:
|
||
payload = json.load(f)
|
||
encrypted = base64.b64decode(payload["k"])
|
||
return _decrypt(encrypted).decode("utf-8")
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def has_vault_key() -> bool:
|
||
"""True wenn ein verschlüsselter Key im Tresor existiert."""
|
||
return retrieve_api_key() is not None
|
||
|
||
|
||
def get_masked_key() -> str:
|
||
"""Gibt den Key maskiert zurück, z.B. 'sk-****...a1B2'."""
|
||
key = retrieve_api_key()
|
||
if not key:
|
||
return ""
|
||
if len(key) <= 8:
|
||
return "****-****"
|
||
prefix = key[:3] if key.startswith("sk-") else ""
|
||
return f"{prefix}****\u2026{key[-4:]}"
|
||
|
||
|
||
def delete_vault_key() -> bool:
|
||
"""Löscht den gespeicherten Key."""
|
||
try:
|
||
path = _vault_path()
|
||
if os.path.isfile(path):
|
||
os.remove(path)
|
||
return True
|
||
except Exception:
|
||
return False
|