Files
aza/AzA march 2026 - Kopie (4)/aza_audit_log.py
2026-03-30 07:59:11 +02:00

366 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
AZA MedWork Audit-Logging (DSG-konform, tamper-evident).
Protokolliert sicherheitsrelevante Ereignisse in einer
Append-only-Logdatei mit SHA-256-Hash-Kette.
Keine Patientendaten, keine Prompts, keine KI-Antworten.
Format pro Zeile (pipe-separiert, 8 Felder):
TIMESTAMP | EVENT | USER | STATUS | SOURCE | DETAIL | PREV_HASH | ENTRY_HASH
Konfiguration:
AZA_AUDIT_LOG Pfad zur Logdatei (Standard: aza_audit.log)
AZA_AUDIT_ROTATE_MB Max. Groesse in MB vor Rotation (Standard: 10)
AZA_AUDIT_KEEP Anzahl rotierter Dateien (Standard: 12)
"""
import os
import sys
import json
import hashlib
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from aza_config import get_writable_data_dir
_LOG_FILE = Path(os.getenv("AZA_AUDIT_LOG", str(Path(get_writable_data_dir()) / "aza_audit.log")))
_ROTATE_MB = float(os.getenv("AZA_AUDIT_ROTATE_MB", "10"))
_KEEP_COUNT = int(os.getenv("AZA_AUDIT_KEEP", "12"))
_GENESIS_HASH = "0" * 64
_CHAIN_HEADER_PREFIX = "#CHAIN_FROM="
def _compute_entry_hash(payload: str, prev_hash: str) -> str:
"""SHA-256 ueber prev_hash + payload (ohne Hash-Felder)."""
raw = prev_hash + payload
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
def _get_last_hash(path: Path = None) -> str:
"""Liest den letzten entry_hash aus einer Logdatei."""
if path is None:
path = _LOG_FILE
if not path.exists():
return _GENESIS_HASH
last_hash = _GENESIS_HASH
try:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
if line.startswith(_CHAIN_HEADER_PREFIX):
last_hash = line[len(_CHAIN_HEADER_PREFIX):].strip()
continue
parts = [p.strip() for p in line.split("|")]
if len(parts) >= 8:
last_hash = parts[7]
except OSError:
pass
return last_hash
def _rotate_if_needed():
"""Rotiert die Logdatei wenn sie die Maximalgroesse ueberschreitet.
Speichert den letzten Hash als Chain-Header in der neuen Datei."""
if not _LOG_FILE.exists():
return False
try:
size_mb = _LOG_FILE.stat().st_size / (1024 * 1024)
except OSError:
return False
if size_mb < _ROTATE_MB:
return False
last_hash = _get_last_hash(_LOG_FILE)
for i in range(_KEEP_COUNT - 1, 0, -1):
src = _LOG_FILE.parent / f"{_LOG_FILE.stem}.{i}{_LOG_FILE.suffix}"
dst = _LOG_FILE.parent / f"{_LOG_FILE.stem}.{i + 1}{_LOG_FILE.suffix}"
if src.exists():
try:
shutil.move(str(src), str(dst))
except OSError:
pass
rotated = _LOG_FILE.parent / f"{_LOG_FILE.stem}.1{_LOG_FILE.suffix}"
try:
shutil.move(str(_LOG_FILE), str(rotated))
except OSError:
pass
try:
with open(_LOG_FILE, "w", encoding="utf-8") as f:
f.write(f"{_CHAIN_HEADER_PREFIX}{last_hash}\n")
except OSError:
pass
return True
def log_event(
event: str,
user_id: str = "",
success: bool = True,
detail: str = "",
source: str = "desktop",
):
"""Schreibt einen Audit-Eintrag mit Hash-Kette.
Format: TS | EVENT | USER | STATUS | SOURCE | DETAIL | PREV_HASH | ENTRY_HASH
"""
_rotate_if_needed()
ts = datetime.now(timezone.utc).isoformat(timespec="milliseconds")
status = "OK" if success else "FAIL"
safe_detail = detail.replace("|", "/").replace("\n", " ").replace("\r", "")
if len(safe_detail) > 200:
safe_detail = safe_detail[:197] + "..."
prev_hash = _get_last_hash()
payload = f"{ts} | {event} | {user_id} | {status} | {source} | {safe_detail}"
entry_hash = _compute_entry_hash(payload, prev_hash)
line = f"{payload} | {prev_hash} | {entry_hash}\n"
try:
_LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(_LOG_FILE, "a", encoding="utf-8") as f:
f.write(line)
except OSError as e:
print(f"AUDIT-LOG FEHLER: {e}", file=sys.stderr)
def verify_integrity(path: Path = None) -> tuple[bool, list]:
"""Prueft die Integritaet der Hash-Kette einer Logdatei.
Returns: (ok, errors) errors enthaelt Zeilennummer + Beschreibung.
"""
if path is None:
path = _LOG_FILE
path = Path(path)
if not path.exists():
return True, []
errors = []
prev_hash = _GENESIS_HASH
line_num = 0
try:
with open(path, "r", encoding="utf-8") as f:
for raw_line in f:
raw_line = raw_line.strip()
if not raw_line:
continue
if raw_line.startswith("#"):
if raw_line.startswith(_CHAIN_HEADER_PREFIX):
prev_hash = raw_line[len(_CHAIN_HEADER_PREFIX):].strip()
continue
line_num += 1
parts = [p.strip() for p in raw_line.split("|")]
if len(parts) < 8:
errors.append(f"Zeile {line_num}: Ungueltig ({len(parts)} Felder, erwartet 8)")
continue
stored_prev = parts[6]
stored_hash = parts[7]
if stored_prev != prev_hash:
errors.append(
f"Zeile {line_num}: prev_hash stimmt nicht "
f"(erwartet {prev_hash[:16]}..., gefunden {stored_prev[:16]}...)"
)
payload = " | ".join(parts[:6])
expected_hash = _compute_entry_hash(payload, stored_prev)
if stored_hash != expected_hash:
errors.append(
f"Zeile {line_num}: entry_hash stimmt nicht "
f"(erwartet {expected_hash[:16]}..., gefunden {stored_hash[:16]}...)"
)
prev_hash = stored_hash
except OSError as e:
errors.append(f"Dateifehler: {e}")
return len(errors) == 0, errors
def verify_all_rotations() -> tuple[bool, dict]:
"""Prueft die Integritaet ueber alle rotierten Logdateien hinweg.
Returns: (all_ok, results_per_file)
"""
results = {}
all_ok = True
rotation_files = []
for i in range(_KEEP_COUNT, 0, -1):
rp = _LOG_FILE.parent / f"{_LOG_FILE.stem}.{i}{_LOG_FILE.suffix}"
if rp.exists():
rotation_files.append(rp)
if _LOG_FILE.exists():
rotation_files.append(_LOG_FILE)
for fp in rotation_files:
ok, errs = verify_integrity(fp)
results[str(fp.name)] = {"ok": ok, "errors": errs}
if not ok:
all_ok = False
return all_ok, results
def export_audit_log(output_path: Optional[str] = None) -> str:
"""Exportiert das Audit-Log als JSON."""
entries = _parse_log_file(_LOG_FILE)
if output_path is None:
ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
output_path = str(Path(get_writable_data_dir()) / f"audit_export_{ts}.json")
ok, errs = verify_integrity(_LOG_FILE)
export_data = {
"export_timestamp": datetime.now(timezone.utc).isoformat(),
"total_entries": len(entries),
"source_file": str(_LOG_FILE),
"integrity": "PASS" if ok else "FAIL",
"integrity_errors": errs,
"entries": entries,
}
with open(output_path, "w", encoding="utf-8") as f:
json.dump(export_data, f, ensure_ascii=False, indent=2)
return output_path
def _parse_log_file(path: Path) -> list:
"""Parst eine Logdatei in eine Liste von Dicts."""
entries = []
if not path.exists():
return entries
try:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
parts = [p.strip() for p in line.split("|")]
if len(parts) >= 6:
entry = {
"timestamp": parts[0],
"event": parts[1],
"user_id": parts[2],
"status": parts[3],
"source": parts[4],
"detail": parts[5],
}
if len(parts) >= 8:
entry["prev_hash"] = parts[6]
entry["entry_hash"] = parts[7]
entries.append(entry)
except OSError:
pass
return entries
def get_log_stats() -> dict:
"""Gibt Statistiken ueber das Audit-Log zurueck."""
entries = _parse_log_file(_LOG_FILE)
ok, _ = verify_integrity(_LOG_FILE)
stats = {
"log_file": str(_LOG_FILE),
"exists": _LOG_FILE.exists(),
"size_mb": 0.0,
"total_lines": len(entries),
"integrity": "PASS" if ok else "FAIL",
"events": {},
"first_entry": None,
"last_entry": None,
}
if _LOG_FILE.exists():
try:
stats["size_mb"] = round(_LOG_FILE.stat().st_size / (1024 * 1024), 3)
except OSError:
pass
for e in entries:
ev = e.get("event", "")
stats["events"][ev] = stats["events"].get(ev, 0) + 1
if entries:
stats["first_entry"] = entries[0].get("timestamp")
stats["last_entry"] = entries[-1].get("timestamp")
return stats
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="AZA MedWork Audit-Log")
sub = parser.add_subparsers(dest="command")
p_verify = sub.add_parser("verify", help="Integritaet pruefen")
p_verify.add_argument("--file", help="Logdatei (Standard: aktuelle)", default=None)
p_verify.add_argument("--all", action="store_true", help="Alle Rotationsdateien pruefen")
sub.add_parser("stats", help="Statistiken anzeigen")
p_export = sub.add_parser("export", help="Log exportieren")
p_export.add_argument("--output", help="Ausgabepfad", default=None)
args = parser.parse_args()
if args.command == "verify":
if args.all:
ok, results = verify_all_rotations()
for fname, res in results.items():
status = "PASS" if res["ok"] else "FAIL"
print(f" {fname}: {status}")
for e in res["errors"]:
print(f" {e}")
print(f"\nGESAMT: {'PASS' if ok else 'FAIL'}")
sys.exit(0 if ok else 1)
else:
fp = Path(args.file) if args.file else _LOG_FILE
ok, errs = verify_integrity(fp)
print(f"Datei: {fp}")
print(f"Integritaet: {'PASS' if ok else 'FAIL'}")
for e in errs:
print(f" {e}")
sys.exit(0 if ok else 1)
elif args.command == "stats":
s = get_log_stats()
print(f"Datei: {s['log_file']}")
print(f"Existiert: {s['exists']}")
print(f"Groesse: {s['size_mb']} MB")
print(f"Eintraege: {s['total_lines']}")
print(f"Integritaet: {s['integrity']}")
if s["first_entry"]:
print(f"Erster: {s['first_entry']}")
print(f"Letzter: {s['last_entry']}")
if s["events"]:
print("Events:")
for ev, cnt in sorted(s["events"].items()):
print(f" {ev}: {cnt}")
elif args.command == "export":
path = export_audit_log(args.output)
print(f"Exportiert: {path}")
else:
parser.print_help()