137 lines
4.3 KiB
Python
137 lines
4.3 KiB
Python
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
"""
|
|||
|
|
AZA Security Vault – Verschlüsselter API-Key-Tresor.
|
|||
|
|
|
|||
|
|
Windows: DPAPI (CryptProtectData / CryptUnprotectData) – user-scoped.
|
|||
|
|
Fallback: base64-Obfuscation (kein Klartext, aber kein kryptographischer Schutz).
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import base64
|
|||
|
|
import json
|
|||
|
|
import os
|
|||
|
|
import sys
|
|||
|
|
|
|||
|
|
from aza_config import get_writable_data_dir
|
|||
|
|
|
|||
|
|
# Name of the sub-directory (inside the directory returned by
# get_writable_data_dir()) that holds the vault file.
_VAULT_SUBDIR = "secure"
# File name of the vault inside that sub-directory.
_VAULT_FILENAME = "vault.dat"
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _vault_path() -> str:
    """Return the full path of the vault file.

    The containing directory (``<writable data dir>/<_VAULT_SUBDIR>``) is
    created as a side effect if it does not exist yet.
    """
    base_dir = get_writable_data_dir()
    vault_dir = os.path.join(base_dir, _VAULT_SUBDIR)
    os.makedirs(vault_dir, exist_ok=True)
    vault_file = os.path.join(vault_dir, _VAULT_FILENAME)
    return vault_file
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ─── Windows DPAPI ────────────────────────────────────────────────────────────
|
|||
|
|
|
|||
|
|
# DPAPI lives in crypt32.dll, so it is only usable on Windows.
_USE_DPAPI = sys.platform == "win32"

if _USE_DPAPI:
    import ctypes
    import ctypes.wintypes as _wt

    class _DATA_BLOB(ctypes.Structure):
        """ctypes mirror of the Win32 ``DATA_BLOB`` struct (byte count + data pointer)."""

        _fields_ = [
            ("cbData", _wt.DWORD),
            ("pbData", ctypes.POINTER(ctypes.c_char)),
        ]

    # Loaded with use_last_error=True so ctypes.get_last_error() reliably
    # returns the error code set by the most recent call into these DLLs.
    _crypt32 = ctypes.WinDLL("crypt32", use_last_error=True)
    _kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)

    def _dpapi_call(api_name: str, data: bytes) -> bytes:
        """Run the crypt32 function *api_name* on *data* and return its output.

        Shared implementation for ``CryptProtectData`` and
        ``CryptUnprotectData``; both take the same argument list here
        (no description, no extra entropy, no prompt, flags = 0).

        Raises:
            OSError: if the API call reports failure. The exception carries
                the Windows error code and the text ``"<api_name> failed"``.
        """
        blob_in = _DATA_BLOB(len(data), ctypes.create_string_buffer(data, len(data)))
        blob_out = _DATA_BLOB()
        api = getattr(_crypt32, api_name)
        ok = api(
            ctypes.byref(blob_in), None, None, None, None, 0, ctypes.byref(blob_out),
        )
        if not ok:
            raise ctypes.WinError(ctypes.get_last_error(), f"{api_name} failed")
        try:
            # Copy the result out of the API-allocated buffer into Python bytes.
            return ctypes.string_at(blob_out.pbData, blob_out.cbData)
        finally:
            # The output buffer is allocated by the API and must always be
            # released by the caller, even if the copy above raises.
            _kernel32.LocalFree(blob_out.pbData)

    def _dpapi_encrypt(plaintext: bytes) -> bytes:
        """Encrypt *plaintext* with ``CryptProtectData`` and return the blob."""
        return _dpapi_call("CryptProtectData", plaintext)

    def _dpapi_decrypt(encrypted: bytes) -> bytes:
        """Decrypt a blob produced by ``_dpapi_encrypt`` via ``CryptUnprotectData``."""
        return _dpapi_call("CryptUnprotectData", encrypted)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _encrypt(data: bytes) -> bytes:
    """Protect *data* with the scheme selected by ``_USE_DPAPI``.

    Uses ``_dpapi_encrypt`` when ``_USE_DPAPI`` is true; otherwise the data
    is only base85-encoded (obfuscation, not encryption).
    """
    if not _USE_DPAPI:
        return base64.b85encode(data)
    return _dpapi_encrypt(data)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _decrypt(data: bytes) -> bytes:
    """Reverse ``_encrypt`` using the scheme selected by ``_USE_DPAPI``.

    Uses ``_dpapi_decrypt`` when ``_USE_DPAPI`` is true; otherwise the data
    is base85-decoded.
    """
    if not _USE_DPAPI:
        return base64.b85decode(data)
    return _dpapi_decrypt(data)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ─── Public API ───────────────────────────────────────────────────────────────
|
|||
|
|
|
|||
|
|
def store_api_key(key: str) -> bool:
    """Store the API key in the vault file.

    The key is stripped of surrounding whitespace, passed through
    ``_encrypt`` and written as JSON with two fields: ``"k"`` (base64 of the
    protected bytes) and ``"m"`` (``"dpapi"`` or ``"b85"``, the scheme used).

    The payload is written to a temporary file first and then moved over the
    vault with ``os.replace``, so a failed write never truncates or corrupts
    a previously stored key. The file is created with mode 0o600.

    Args:
        key: The API key to store.

    Returns:
        True on success; False if *key* is empty/whitespace-only or if
        anything goes wrong while encrypting or writing.
    """
    if not key or not key.strip():
        return False
    tmp_path = None
    try:
        encrypted = _encrypt(key.strip().encode("utf-8"))
        payload = {
            "k": base64.b64encode(encrypted).decode("ascii"),
            "m": "dpapi" if _USE_DPAPI else "b85",
        }
        path = _vault_path()
        tmp_path = path + ".tmp"
        # O_BINARY only exists on Windows; it keeps the low-level descriptor
        # from applying its own newline translation.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
        fd = os.open(tmp_path, flags, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(payload, f)
        os.replace(tmp_path, path)
        return True
    except Exception:
        # Deliberate best-effort contract: callers only get a success flag.
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass
        return False
|
|||
|
|
|
|||
|
|
|
|||
|
|
def retrieve_api_key() -> str | None:
    """Read the API key back from the vault file.

    The vault is a JSON object with ``"k"`` (base64 of the protected bytes)
    and ``"m"`` (scheme marker, ``"dpapi"`` or ``"b85"``). If a marker is
    present and does not match the scheme this platform uses, the file is
    rejected instead of being fed to the wrong decoder. Files without a
    marker are decoded with the current scheme.

    Returns:
        The stored key, or None if the vault file does not exist, was
        written with a different scheme, or cannot be read or decoded.
    """
    try:
        path = _vault_path()
        if not os.path.isfile(path):
            return None
        with open(path, "r", encoding="utf-8") as f:
            payload = json.load(f)
        expected_method = "dpapi" if _USE_DPAPI else "b85"
        if payload.get("m", expected_method) != expected_method:
            return None
        encrypted = base64.b64decode(payload["k"])
        return _decrypt(encrypted).decode("utf-8")
    except Exception:
        # Deliberate best-effort contract: any failure means "no key".
        return None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def has_vault_key() -> bool:
    """Return True if ``retrieve_api_key`` yields a key (i.e. not None)."""
    stored = retrieve_api_key()
    return stored is not None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_masked_key() -> str:
    """Return the stored key in masked form, e.g. 'sk-****...a1B2'.

    Returns an empty string when no key is stored, the fixed placeholder
    ``"****-****"`` for keys of at most 8 characters, and otherwise the
    optional ``sk-`` prefix, four asterisks, an ellipsis and the last four
    characters of the key.
    """
    secret = retrieve_api_key()
    if not secret:
        return ""
    if len(secret) <= 8:
        return "****-****"
    head = "sk-" if secret.startswith("sk-") else ""
    tail = secret[-4:]
    return f"{head}****\u2026{tail}"
|
|||
|
|
|
|||
|
|
|
|||
|
|
def delete_vault_key() -> bool:
    """Delete the vault file if it exists.

    Returns:
        True if the file was removed or was not there in the first place;
        False if resolving the path or removing the file raised.
    """
    try:
        target = _vault_path()
        if not os.path.isfile(target):
            return True
        os.remove(target)
    except Exception:
        return False
    return True
|