316 lines
11 KiB
Python
316 lines
11 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
AZA MedWork – Monitoring & Integritaetspruefung.
|
||
|
||
Sammelt Health-Status, Metriken aus Audit-/Consent-Logs
|
||
und fuehrt Integritaetschecks durch. Keine Patientendaten.
|
||
|
||
Nutzung:
|
||
python aza_monitoring.py health -> Health-Checks
|
||
python aza_monitoring.py metrics -> Metriken aus Logs
|
||
python aza_monitoring.py integrity -> Integritaetspruefung
|
||
python aza_monitoring.py all -> Alles zusammen
|
||
python aza_monitoring.py nightly -> Nightly-Check (Integrity + Alert-Metriken)
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import sys
|
||
import time
|
||
import urllib.request
|
||
import ssl
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
|
||
_BASE_DIR = Path(__file__).resolve().parent
|
||
|
||
|
||
# =====================================================================
|
||
# 1) HEALTH CHECKS
|
||
# =====================================================================
|
||
|
||
_SERVICES = [
|
||
{"name": "backend_main", "url": os.getenv("AZA_BACKEND_URL", "https://127.0.0.1:8000/health")},
|
||
{"name": "transcribe_server", "url": os.getenv("AZA_TRANSCRIBE_URL", "https://127.0.0.1:8090/health")},
|
||
{"name": "todo_server", "url": os.getenv("AZA_TODO_URL", "https://127.0.0.1:5111/health")},
|
||
]
|
||
|
||
|
||
def check_health(services=None) -> list:
|
||
"""Prueft /health fuer alle konfigurierten Services."""
|
||
if services is None:
|
||
services = _SERVICES
|
||
|
||
ctx = ssl.create_default_context()
|
||
ctx.check_hostname = False
|
||
ctx.verify_mode = ssl.CERT_NONE
|
||
|
||
results = []
|
||
for svc in services:
|
||
entry = {"name": svc["name"], "url": svc["url"], "status": "FAIL", "detail": ""}
|
||
try:
|
||
req = urllib.request.Request(svc["url"], method="GET")
|
||
resp = urllib.request.urlopen(req, timeout=5, context=ctx)
|
||
data = json.loads(resp.read().decode("utf-8"))
|
||
entry["status"] = "OK" if data.get("status") == "ok" else "WARN"
|
||
entry["version"] = data.get("version", "?")
|
||
entry["uptime_s"] = data.get("uptime_s", 0)
|
||
entry["tls"] = data.get("tls", False)
|
||
except Exception as e:
|
||
entry["detail"] = str(e)[:120]
|
||
results.append(entry)
|
||
return results
|
||
|
||
|
||
# =====================================================================
|
||
# 2) MONITORING-METRIKEN
|
||
# =====================================================================
|
||
|
||
def collect_metrics() -> dict:
|
||
"""Sammelt Metriken aus Audit-Log und Backup-Status."""
|
||
metrics = {
|
||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||
"audit_log": {},
|
||
"consent_log": {},
|
||
"backup": {},
|
||
}
|
||
|
||
try:
|
||
from aza_audit_log import get_log_stats
|
||
metrics["audit_log"] = get_log_stats()
|
||
except Exception as e:
|
||
metrics["audit_log"] = {"error": str(e)[:100]}
|
||
|
||
try:
|
||
from aza_consent import verify_chain_integrity
|
||
ok, errs = verify_chain_integrity()
|
||
consent_file = _BASE_DIR / "aza_consent_log.json"
|
||
count = 0
|
||
if consent_file.exists():
|
||
try:
|
||
with open(consent_file, "r", encoding="utf-8") as f:
|
||
count = len(json.load(f))
|
||
except Exception:
|
||
pass
|
||
metrics["consent_log"] = {
|
||
"entries": count,
|
||
"integrity": "PASS" if ok else "FAIL",
|
||
}
|
||
except Exception as e:
|
||
metrics["consent_log"] = {"error": str(e)[:100]}
|
||
|
||
try:
|
||
backup_dir = _BASE_DIR / "backups"
|
||
if backup_dir.exists():
|
||
zips = sorted(backup_dir.glob("aza_backup_*.zip"), key=lambda p: p.stat().st_mtime, reverse=True)
|
||
metrics["backup"] = {
|
||
"count": len(zips),
|
||
"latest": zips[0].name if zips else None,
|
||
"latest_time": datetime.fromtimestamp(zips[0].stat().st_mtime, tz=timezone.utc).isoformat() if zips else None,
|
||
}
|
||
else:
|
||
metrics["backup"] = {"count": 0, "latest": None}
|
||
except Exception as e:
|
||
metrics["backup"] = {"error": str(e)[:100]}
|
||
|
||
return metrics
|
||
|
||
|
||
def get_alert_metrics() -> list:
|
||
"""Extrahiert sicherheitsrelevante Zaehler fuer Alerting."""
|
||
alerts = []
|
||
try:
|
||
from aza_audit_log import get_log_stats
|
||
stats = get_log_stats()
|
||
events = stats.get("events", {})
|
||
|
||
login_fail = events.get("LOGIN_FAIL", 0)
|
||
if login_fail > 0:
|
||
alerts.append({"metric": "login_fail_count", "value": login_fail, "severity": "WARN" if login_fail < 10 else "HIGH"})
|
||
|
||
ai_blocked = events.get("AI_BLOCKED", 0)
|
||
if ai_blocked > 0:
|
||
alerts.append({"metric": "ai_blocked_count", "value": ai_blocked, "severity": "INFO"})
|
||
|
||
ai_calls = events.get("AI_CHAT", 0) + events.get("AI_TRANSCRIBE", 0)
|
||
alerts.append({"metric": "ai_calls_total", "value": ai_calls, "severity": "INFO"})
|
||
|
||
twofa_fail = events.get("2FA_FAIL", 0)
|
||
if twofa_fail > 0:
|
||
alerts.append({"metric": "2fa_fail_count", "value": twofa_fail, "severity": "WARN" if twofa_fail < 5 else "HIGH"})
|
||
|
||
if stats.get("integrity") == "FAIL":
|
||
alerts.append({"metric": "audit_log_integrity", "value": "FAIL", "severity": "CRITICAL"})
|
||
|
||
except Exception as e:
|
||
alerts.append({"metric": "audit_log_read_error", "value": str(e)[:80], "severity": "HIGH"})
|
||
|
||
return alerts
|
||
|
||
|
||
# =====================================================================
|
||
# 3) INTEGRITAETS-CHECKS
|
||
# =====================================================================
|
||
|
||
def check_integrity() -> dict:
|
||
"""Prueft Audit-Log und Consent-Log Integritaet."""
|
||
results = {"timestamp": datetime.now(timezone.utc).isoformat(), "audit_log": {}, "consent_log": {}}
|
||
|
||
try:
|
||
from aza_audit_log import verify_integrity, verify_all_rotations, _LOG_FILE
|
||
if _LOG_FILE.exists():
|
||
ok_all, res_all = verify_all_rotations()
|
||
results["audit_log"] = {
|
||
"status": "PASS" if ok_all else "FAIL",
|
||
"files": {k: {"ok": v["ok"], "errors": v["errors"]} for k, v in res_all.items()},
|
||
}
|
||
else:
|
||
results["audit_log"] = {"status": "PASS", "note": "Keine Logdatei vorhanden"}
|
||
except Exception as e:
|
||
results["audit_log"] = {"status": "ERROR", "error": str(e)[:120]}
|
||
|
||
try:
|
||
from aza_consent import verify_chain_integrity
|
||
consent_file = _BASE_DIR / "aza_consent_log.json"
|
||
if consent_file.exists():
|
||
ok, errs = verify_chain_integrity()
|
||
results["consent_log"] = {
|
||
"status": "PASS" if ok else "FAIL",
|
||
"errors": errs,
|
||
}
|
||
else:
|
||
results["consent_log"] = {"status": "PASS", "note": "Keine Logdatei vorhanden"}
|
||
except Exception as e:
|
||
results["consent_log"] = {"status": "ERROR", "error": str(e)[:120]}
|
||
|
||
if results["audit_log"].get("status") == "FAIL" or results["consent_log"].get("status") == "FAIL":
|
||
try:
|
||
from aza_audit_log import log_event
|
||
log_event("INTEGRITY_FAIL", source="monitoring",
|
||
detail=f"audit={results['audit_log'].get('status')} consent={results['consent_log'].get('status')}")
|
||
except Exception:
|
||
pass
|
||
|
||
return results
|
||
|
||
|
||
# =====================================================================
|
||
# 4) NIGHTLY CHECK (alle Pruefungen + Ausgabe)
|
||
# =====================================================================
|
||
|
||
def run_nightly() -> dict:
|
||
"""Fuehrt alle naechtlichen Pruefungen durch."""
|
||
report = {
|
||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||
"integrity": check_integrity(),
|
||
"alerts": get_alert_metrics(),
|
||
"metrics": collect_metrics(),
|
||
}
|
||
|
||
all_ok = (
|
||
report["integrity"]["audit_log"].get("status") in ("PASS", None)
|
||
and report["integrity"]["consent_log"].get("status") in ("PASS", None)
|
||
and not any(a.get("severity") in ("HIGH", "CRITICAL") for a in report["alerts"])
|
||
)
|
||
report["overall"] = "PASS" if all_ok else "ATTENTION"
|
||
|
||
return report
|
||
|
||
|
||
# =====================================================================
|
||
# CLI
|
||
# =====================================================================
|
||
|
||
def _print_health(results):
|
||
print(f"\n{'='*60}")
|
||
print("HEALTH CHECKS")
|
||
print(f"{'='*60}")
|
||
for r in results:
|
||
status = r["status"]
|
||
line = f" {r['name']:25s} {status:4s}"
|
||
if status == "OK":
|
||
line += f" v{r.get('version','?')} uptime={r.get('uptime_s',0)}s tls={r.get('tls','?')}"
|
||
else:
|
||
line += f" {r.get('detail','')}"
|
||
print(line)
|
||
|
||
|
||
def _print_metrics(m):
|
||
print(f"\n{'='*60}")
|
||
print("METRIKEN")
|
||
print(f"{'='*60}")
|
||
al = m.get("audit_log", {})
|
||
print(f" Audit-Log: {al.get('total_lines', '?')} Eintraege, "
|
||
f"Integritaet={al.get('integrity','?')}, "
|
||
f"Groesse={al.get('size_mb','?')} MB")
|
||
for ev, cnt in sorted(al.get("events", {}).items()):
|
||
print(f" {ev}: {cnt}")
|
||
|
||
cl = m.get("consent_log", {})
|
||
print(f" Consent-Log: {cl.get('entries','?')} Eintraege, Integritaet={cl.get('integrity','?')}")
|
||
|
||
bk = m.get("backup", {})
|
||
print(f" Backups: {bk.get('count','?')} vorhanden, letztes={bk.get('latest','keins')}")
|
||
if bk.get("latest_time"):
|
||
print(f" Zeitpunkt: {bk['latest_time']}")
|
||
|
||
|
||
def _print_integrity(r):
|
||
print(f"\n{'='*60}")
|
||
print("INTEGRITAETS-CHECKS")
|
||
print(f"{'='*60}")
|
||
for name in ("audit_log", "consent_log"):
|
||
info = r.get(name, {})
|
||
status = info.get("status", "?")
|
||
print(f" {name:15s} {status}")
|
||
for e in info.get("errors", []):
|
||
print(f" {e}")
|
||
if info.get("note"):
|
||
print(f" ({info['note']})")
|
||
|
||
|
||
def _print_alerts(alerts):
|
||
print(f"\n{'='*60}")
|
||
print("SICHERHEITS-ALERTS")
|
||
print(f"{'='*60}")
|
||
if not alerts:
|
||
print(" Keine Alerts.")
|
||
for a in alerts:
|
||
print(f" [{a['severity']:8s}] {a['metric']}: {a['value']}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
cmd = sys.argv[1] if len(sys.argv) > 1 else "all"
|
||
|
||
if cmd == "health":
|
||
_print_health(check_health())
|
||
elif cmd == "metrics":
|
||
_print_metrics(collect_metrics())
|
||
elif cmd == "integrity":
|
||
r = check_integrity()
|
||
_print_integrity(r)
|
||
ok = all(r[k].get("status") in ("PASS", None) for k in ("audit_log", "consent_log"))
|
||
sys.exit(0 if ok else 1)
|
||
elif cmd == "alerts":
|
||
_print_alerts(get_alert_metrics())
|
||
elif cmd == "nightly":
|
||
report = run_nightly()
|
||
_print_integrity(report["integrity"])
|
||
_print_alerts(report["alerts"])
|
||
print(f"\n GESAMT: {report['overall']}")
|
||
out = _BASE_DIR / f"monitoring_nightly_{datetime.now().strftime('%Y-%m-%d')}.json"
|
||
with open(out, "w", encoding="utf-8") as f:
|
||
json.dump(report, f, ensure_ascii=False, indent=2)
|
||
print(f" Report: {out}")
|
||
sys.exit(0 if report["overall"] == "PASS" else 1)
|
||
elif cmd == "all":
|
||
_print_health(check_health())
|
||
m = collect_metrics()
|
||
_print_metrics(m)
|
||
r = check_integrity()
|
||
_print_integrity(r)
|
||
_print_alerts(get_alert_metrics())
|
||
else:
|
||
print("Nutzung: python aza_monitoring.py [health|metrics|integrity|alerts|nightly|all]")
|
||
sys.exit(1)
|