Files
aza/AzA march 2026 - Kopie (28)/aza_admin_control_shell.py
2026-05-20 00:09:28 +02:00

2389 lines
93 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
AZA Kontroll-Hülle — separates read-only Kontrollwerkzeug für Empfang-/Hetzner-Daten.
Start:
python -u .\\aza_admin_control_shell.py
Umgebungsvariablen (optional):
AZA_CONTROL_SSH_HOST — Default root@178.104.51.177
AZA_CONTROL_REMOTE_DATA — Remote data-Pfad (/root/aza-app/data)
"""
from __future__ import annotations
import csv
import hashlib
import html
import json
import os
import queue
import re
import sqlite3
import subprocess
import threading
import unicodedata
import urllib.error
import urllib.parse
import urllib.request
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple
import shlex
import tkinter as tk
from tkinter import filedialog, messagebox, scrolledtext, ttk
# --- Konstanten / Pfade ---
DEFAULT_SSH_SPEC = os.environ.get("AZA_CONTROL_SSH_HOST", "root@178.104.51.177").strip()
DEFAULT_REMOTE_DATA = os.environ.get("AZA_CONTROL_REMOTE_DATA", "/root/aza-app/data").strip().rstrip("/")
_SNAP_ROOT_WIN = Path(r"C:\Users\surov\Documents\AzA Drive") / "AZA_CONTROL_SNAPSHOTS"
REMOTE_CORE_REQUIRED_POST_SCP = [
"empfang_practices.json",
"empfang_accounts.json",
]
REMOTE_COPY_OPTIONAL_ORDERED = [
"empfang_sessions.json",
"empfang_devices.json",
"empfang_practice_links.json",
"empfang_user_notes.json",
"empfang_tasks.json",
"stripe_events.log.jsonl",
"stripe_webhook.sqlite",
"license_status_cache.json",
]
STALE_SESSION_SEC = int(os.environ.get("AZA_CONTROL_STALE_SESSION_DAYS", "180")) * 86400
SOLL_PROFIL = {
"lindengut": {
"practice_id": "prac_883ddc21fb6a",
"name_must_contain": "lindengut",
"expected_display_names_contains": frozenset(
{
"andre m surovy",
"andre m. surovy",
"anja",
"empfang laptop",
"jelena empfang",
"jelena",
"test",
"zeno",
"empfang chat tester",
}
),
"expect_admin_contains": ("andre", "surovy"),
"expect_admin_email": frozenset({"andre.surovy@haut-winterthur.ch".lower()}),
},
"obergasse": {
"practice_id": "prac_e864d294474e",
"name_must_contain": "obergasse",
"admin_display_names": frozenset({"birgit"}),
"license_customer_email_expect": frozenset({"dermapraxis.meier@hin.ch".lower()}),
"roles": {"susanne": "empfang", "birgit": "admin"},
"andre_must_not_admin": True,
"andre_display_hints": frozenset({"andre m. surovy", "andre", "andre m surovy"}),
},
}
# --- Hilfen ---
def _now_local_stamp() -> str:
return datetime.now().strftime("%Y%m%d_%H%M%S")
def _norm_name_key(s: str) -> str:
s = unicodedata.normalize("NFKD", (s or "")).strip().lower()
for ch in (".", ",", "-", "", "'", ""):
s = s.replace(ch, " ")
return " ".join(s.split())
def _license_suffix(key: Optional[str]) -> str:
v = (key or "").strip()
if not v:
return ""
groups = [g for g in re.split(r"[-_\s]", v.upper()) if g]
return groups[-1] if groups else v[-12:]
def _license_hash_hex(key: Optional[str]) -> str:
k = (key or "").strip()
if not k:
return ""
return hashlib.sha256(k.encode("utf-8")).hexdigest()
def _mask_license(val: Optional[str]) -> str:
v = (val or "").strip()
if not v:
return ""
suf = _license_suffix(v)
return f"{_license_hash_hex(v)[:12]}…*{suf}"
def _mask_az_keys_in_line(s: str) -> str:
pat = re.compile(r"\bAZA-(?:[A-Za-z0-9]{4}-){3}[A-Za-z0-9]{4}\b")
def _rep(m: re.Match) -> str:
return _mask_license(m.group(0))
return pat.sub(_rep, s)
def _short(s: Optional[str], n: int = 14) -> str:
t = str(s or "").strip()
if len(t) <= n:
return t
return t[: max(6, n - 1)] + ""
def _sha256_file(path: Path) -> Optional[str]:
try:
h = hashlib.sha256()
with path.open("rb") as bf:
for chunk in iter(lambda: bf.read(65536), b""):
h.update(chunk)
return h.hexdigest()
except Exception:
return None
_IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
def shutil_which(cmd: str) -> Optional[str]:
import shutil
return shutil.which(cmd)
def _first_str(obj: Mapping[str, Any], names: Iterable[str]) -> str:
for n in names:
v = obj.get(n)
if v is None:
continue
s = str(v).strip()
if s:
return s[:512]
return ""
def _parse_ts_maybe(v: Optional[str]) -> Optional[float]:
if v is None:
return None
if isinstance(v, (int, float)):
try:
return float(v)
except Exception:
return None
s = str(v).strip()
if not s:
return None
try:
if s.isdigit() or (s.startswith("-") and s[1:].isdigit()):
return float(int(s))
except Exception:
pass
try:
s2 = s.replace("Z", "+00:00")
return datetime.fromisoformat(s2).timestamp()
except Exception:
return None
def _ts_iso_or_raw(v: Optional[str]) -> str:
if not v:
return ""
ts = _parse_ts_maybe(v)
if ts is None:
return str(v)[:64]
try:
return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
except Exception:
return str(v)[:64]
def _open_sqlite_ro(path: Path) -> Optional[sqlite3.Connection]:
if not path.is_file():
return None
try:
uri = path.resolve().as_uri()
return sqlite3.connect(f"{uri}?mode=ro", uri=True)
except Exception:
return None
def _sqlite_license_rows(sqlite_path: Path) -> Tuple[List[str], List[sqlite3.Row]]:
con = _open_sqlite_ro(sqlite_path)
if not con:
return [], []
try:
con.row_factory = sqlite3.Row
cur = con.cursor()
try:
cur.execute("SELECT * FROM licenses ORDER BY subscription_id COLLATE NOCASE")
rows = list(cur.fetchall())
names = rows[0].keys() if rows else [c[1] for c in cur.execute("PRAGMA table_info(licenses)").fetchall()]
if rows:
names = list(rows[0].keys())
return names, rows
except sqlite3.Error:
try:
cur.execute("SELECT * FROM sqlite_master WHERE type='table'")
return [], []
except sqlite3.Error:
return [], []
finally:
try:
con.close()
except Exception:
pass
def _row_to_license_rec(row: sqlite3.Row) -> Dict[str, Any]:
d = dict(row)
lk = str(d.get("license_key") or "").strip()
return {
"subscription_id": str(d.get("subscription_id") or "").strip(),
"customer_id_stripe": str(d.get("customer_id") or "").strip(),
"status": str(d.get("status") or "").strip(),
"lookup_key": str(d.get("lookup_key") or "").strip(),
"allowed_users": str(d.get("allowed_users") or ""),
"devices_per_user": str(d.get("devices_per_user") or ""),
"customer_email_license": str(d.get("customer_email") or "").strip(),
"client_reference_id": str(d.get("client_reference_id") or "").strip(),
"current_period_end_raw": str(d.get("current_period_end") or ""),
"updated_at_raw": str(d.get("updated_at") or ""),
"license_key_plain": lk,
"practice_id": str(d.get("practice_id") or "").strip(),
}
def _woo_order_from_ref(ref: str) -> str:
ref = (ref or "").strip()
if not ref:
return ""
mo = re.search(r"(?:woo|(?:wc))[_-]?order[_-]?(\d+)", ref, re.I)
if mo:
return mo.group(1)
mo2 = re.search(r"\b(?:order[#\s_-]*)(\d{3,})\b", ref, re.I)
return mo2.group(1) if mo2 else ""
def _period_end_human(val: Optional[str]) -> str:
v = val and str(val).strip()
if not v:
return ""
try:
secs = int(v)
return datetime.fromtimestamp(secs, tz=timezone.utc).strftime("%Y-%m-%d %H:%MZ")
except Exception:
return v[:32]
def _unix_db_ts_display(val: Optional[str]) -> str:
"""Unix-Sekunden aus SQLite (updated_at o.ä.) als UTC lesbar."""
v = val and str(val).strip()
if not v:
return ""
try:
secs = int(float(v))
return datetime.fromtimestamp(secs, tz=timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
except Exception:
return v[:28]
def _jsonl_erstes_event_utc(path: Path, needle: str, max_lines: int = 48000) -> str:
"""Früheste Log-Zeile in stripe_events, die needle enthält (ts oben im JSON). Näherung für 'Checkout/Lizenz im Log'."""
needle = (needle or "").strip()
if not needle or not path.is_file():
return ""
best: Optional[int] = None
n = 0
try:
with path.open("r", encoding="utf-8", errors="replace") as fh:
for line in fh:
n += 1
if n > max_lines:
break
if needle not in line:
continue
try:
rec = json.loads(line)
except Exception:
continue
ts = rec.get("ts")
if ts is None:
continue
try:
ti = int(ts)
except (TypeError, ValueError):
continue
if best is None or ti < best:
best = ti
except Exception:
return ""
if best is None:
return ""
return datetime.fromtimestamp(best, tz=timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
def _tree_column_heading(col: str) -> str:
"""Kurze Überschriften; Fallback wie bisher."""
pretty = {
"license_key_display": "lizenz anzeige",
"stripe_letzte_db_aenderung_utc": "letzte stripe-db änderung",
"erstes_passendes_stripe_log": "erstes log-event (abo)",
"billing_or_customer_snippet": "rechnung/kunde (stichwort)",
}
if col in pretty:
return pretty[col]
return col.replace("_", " ")
def _tree_sort_key_tuple(column: str, raw: str) -> Tuple[Any, ...]:
v = (raw or "").strip()
if column in ("user_count", "admin_count", "idx", "sessions_count_approx"):
try:
return (0, int(v))
except ValueError:
return (1, v.lower())
# alles andere: case-insensitive String
return (2, v.lower())
def _stripe_jsonl_billing_hints_by_sub(sqlite_sub_ids: Iterable[str], path: Path, cap_lines: int = 28000) -> Dict[str, str]:
by_sub = {sid: [] for sid in sqlite_sub_ids if sid}
if not path.is_file() or not by_sub:
return {k: "" for k in by_sub}
tails = defaultdict(list)
try:
with path.open("r", encoding="utf-8", errors="replace") as fh:
n = 0
for line in fh:
n += 1
if n > cap_lines:
break
low = line.lower()
if "billing" not in low and "address" not in low:
continue
for sid in by_sub.keys():
if sid and sid in line:
tails[sid].append(line.strip()[:400])
except Exception:
pass
out: Dict[str, str] = {}
for sid, chunks in tails.items():
seen = []
for c in chunks[:12]:
if c not in seen:
seen.append(c)
out[sid] = (" | ".join(seen))[:1600]
return out
def _extract_billing_friendly(snippet: str) -> str:
if not snippet:
return ""
out = []
emails = set(re.findall(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", snippet))
for em in sorted(emails)[:6]:
out.append(em)
m = re.search(r'"city"\s*:\s*"([^"]{2,})"', snippet)
if m:
out.append(f"stadt:{m.group(1)}")
m2 = re.search(r'"line1"\s*:\s*"([^"]{4,})"', snippet)
if m2:
out.append(f"Zeile1:{m2.group(1)[:80]}")
m3 = re.search(r'"postal_code"\s*:\s*"([^"]+)"', snippet)
if m3:
out.append(f"PLZ:{m3.group(1)}")
return "; ".join(out)[:520]
def _account_license_supplement_accounts(accounts: Dict[str, dict], seen_plain_keys: set) -> List[Dict[str, Any]]:
extra: List[Dict[str, Any]] = []
for uk, ac in accounts.items():
lk = _first_str(ac, ("license_key", "activation_key", "stripe_license_key"))
if not lk:
continue
key_n = lk.strip().upper()
if key_n in seen_plain_keys:
continue
seen_plain_keys.add(key_n)
pid = str(ac.get("practice_id") or "").strip()
extra.append(
{
"subscription_id": "",
"customer_id_stripe": "",
"status": "",
"lookup_key": _first_str(ac, ("lookup_key", "plan_key", "tier")),
"allowed_users": "",
"devices_per_user": "",
"customer_email_license": _first_str(ac, ("stripe_customer_email", "billing_email", "email")),
"client_reference_id": "",
"current_period_end_raw": "",
"updated_at_raw": "",
"license_key_plain": lk.strip(),
"practice_id": pid,
"_source_primary": "empfang_accounts.json",
}
)
return extra
def _merge_license_customer_email_per_practice(license_bundle_rows: List[Dict[str, Any]]) -> Dict[str, str]:
best_ts: Dict[str, int] = {}
best_mail: Dict[str, str] = {}
for r in license_bundle_rows:
pid = str(r.get("practice_id") or "").strip()
mail = str(r.get("customer_email_license") or "").strip()
if not pid or not mail or "@" not in mail:
continue
try:
u = int(float(str(r.get("updated_at_raw") or "0") or "0"))
except Exception:
u = 0
prev = best_ts.get(pid, -1)
if u >= prev:
best_ts[pid] = u
best_mail[pid] = mail
return best_mail
def _build_license_display_rows(stripe_sql: Path, stripe_jsonl: Path, practices: Mapping, accounts: Dict[str, dict]) -> List[Dict[str, Any]]:
_, sql_rows = _sqlite_license_rows(stripe_sql)
recs: List[Dict[str, Any]] = []
plain_seen: set = set()
for row in sql_rows:
rr = _row_to_license_rec(row)
if rr["license_key_plain"]:
plain_seen.add(rr["license_key_plain"].strip().upper())
rr["_source_primary"] = "stripe_webhook.sqlite (+ ggf. JSONL)"
recs.append(rr)
recs.extend(_account_license_supplement_accounts(accounts, plain_seen))
sid_set = list({str(r["subscription_id"]) for r in recs if str(r["subscription_id"])} )
hints = _stripe_jsonl_billing_hints_by_sub(set(sid_set), stripe_jsonl)
pname = {pid: str((practices.get(pid) or {}).get("name") or "").strip() for pid in practices if isinstance(practices.get(pid), dict)}
out_rows: List[Dict[str, str]] = []
for r in recs:
pid = str(r.get("practice_id") or "").strip()
lk = str(r.get("license_key_plain") or "")
sub = str(r.get("subscription_id") or "")
bill_raw = _extract_billing_friendly(hints.get(sub, ""))
woo = _woo_order_from_ref(str(r.get("client_reference_id") or ""))
u_at = str(r.get("updated_at_raw") or "").strip()
first_log = _jsonl_erstes_event_utc(stripe_jsonl, sub) if sub else ""
stand_db = _unix_db_ts_display(u_at)
row = {
"license_key_plain": lk,
"license_key_masked": _mask_license(lk),
"license_suffix": _license_suffix(lk),
"license_sha256": _license_hash_hex(lk),
"customer_email_license": str(r.get("customer_email_license") or ""),
"practice_id": pid,
"practice_name": pname.get(pid, ""),
"subscription_id": sub,
"woo_order_id": woo,
"stripe_customer_id": str(r.get("customer_id_stripe") or ""),
"lookup_key": str(r.get("lookup_key") or ""),
"status": str(r.get("status") or ""),
"stripe_letzte_db_aenderung_utc": stand_db,
"erstes_passendes_stripe_log": first_log,
"current_period_end": _period_end_human(str(r.get("current_period_end_raw") or "")),
"billing_or_customer_snippet": bill_raw,
"sources": str(r.get("_source_primary") or "gemischt"),
"allowed_users": str(r.get("allowed_users") or ""),
"devices_per_user": str(r.get("devices_per_user") or ""),
"client_reference_id": str(r.get("client_reference_id") or "")[:120],
"updated_at_raw": str(r.get("updated_at_raw") or ""),
}
out_rows.append(row)
return out_rows
def _session_device_ip(s: Mapping[str, Any]) -> str:
return _first_str(
s,
(
"ip",
"ip_address",
"remote_ip",
"client_ip",
"last_ip",
"login_ip",
),
)
def _user_agent_str(s: Mapping[str, Any]) -> str:
return _first_str(s, ("user_agent", "userAgent", "ua", "browser_ua"))[:400]
def _browser_os_guess(ua: str) -> str:
if not ua:
return ""
parts = []
low = ua.lower()
if "edg/" in low or "edge" in low:
parts.append("Edge")
elif "chrome" in low and "edg" not in low:
parts.append("Chrome")
elif "firefox" in low:
parts.append("Firefox")
elif "safari" in low and "chrome" not in low:
parts.append("Safari")
if "windows" in low:
parts.append("Windows")
elif "mac os" in low or "macintosh" in low:
parts.append("macOS")
elif "android" in low:
parts.append("Android")
elif "iphone" in low or "ipad" in low:
parts.append("iOS")
return " / ".join(parts)[:120]
def _device_platform_str(d: Mapping[str, Any]) -> str:
parts = [
_first_str(d, ("device_name", "name", "label")),
_first_str(d, ("browser", "browser_name")),
_first_str(d, ("os", "os_name", "platform")),
]
ua = _user_agent_str(d)
if ua and not any(parts):
parts.append(_browser_os_guess(ua))
return " ".join(p for p in parts if p).strip()[:200]
def _count_json_notes(path: Path) -> str:
if not path.is_file():
return "fehlt im Snapshot"
d = _load_json(path, None)
if d is None or isinstance(d, dict) and d.get("__error__"):
return ""
if isinstance(d, dict):
for k in ("notes", "items", "data", "list"):
v = d.get(k)
if isinstance(v, (list, dict)):
return str(len(v))
return str(len(d))
if isinstance(d, list):
return str(len(d))
return "? " + type(d).__name__
def _count_generic_json(path: Path) -> str:
if not path.is_file():
return "fehlt im Snapshot"
d = _load_json(path, None)
if d is None:
return "— fehlt"
if isinstance(d, dict):
err = d.get("__error__")
if isinstance(err, str):
return f"(Fehler: {err[:80]})"
return f"Dict mit {len(d)} Schlüsseln (Inhalte nicht ausgewertet)"
if isinstance(d, list):
return str(len(d)) + " Einträge (Inhalte nicht ausgewertet)"
return "Unbekanntes Format"
def _stripe_jsonl_summary(path: Path, limit_lines: int = 8000) -> Dict[str, Any]:
out: Dict[str, Any] = {"lines_seen": 0, "stripe_like_lines": 0, "emails_seen": [], "skipped_tail": False}
if not path.is_file():
out["notes"] = "Datei nicht im Snapshot vorhanden"
return out
email_pat = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")
emails_found: List[str] = []
stripe_kw = ("subscription", "invoice", "customer", "checkout", "license", "stripe")
lines_seen = 0
with path.open("r", encoding="utf-8", errors="replace") as f:
for line in f:
lines_seen += 1
if lines_seen > limit_lines:
out["skipped_tail"] = True
break
low = line.lower()
if any(k in low for k in stripe_kw):
out["stripe_like_lines"] += 1
for em in email_pat.findall(line)[:15]:
if em.lower() not in [x.lower() for x in emails_found]:
emails_found.append(em)
if len(emails_found) >= 40:
break
out["lines_seen"] = lines_seen
out["emails_seen"] = emails_found[:30]
return out
def _sqlite_summary(sqlite_path: Path) -> str:
con = _open_sqlite_ro(sqlite_path)
if not con:
return "nicht lesbar oder fehlend"
try:
cur = con.cursor()
cur.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
tables = [r[0] for r in cur.fetchall()]
return f"{len(tables)} Tabellen: " + ", ".join(tables[:12]) + ("" if len(tables) > 12 else "")
except Exception as exc:
return f"Fehler: {exc}"
finally:
try:
con.close()
except Exception:
pass
def _load_json(path: Path, default):
try:
if path.is_file():
return json.loads(path.read_text(encoding="utf-8"))
except Exception as exc:
return {"__error__": str(exc)}
return default
def lookup_ip_geo_optional(ip: str, timeout: int = 14) -> str:
"""Optional externe Abfrage (nur auf Knopfdruck). Fehler -> leerer String."""
ip = (ip or "").strip()
if not ip or not _IP_RE.fullmatch(ip):
return ""
q = urllib.parse.urlencode({"fields": "status,message,country,regionName,city,isp"})
url = f"http://ip-api.com/json/{ip}?{q}"
try:
req = urllib.request.Request(url, headers={"User-Agent": "AZA-Kontroll-Huelle/1 (read-only admin)"})
with urllib.request.urlopen(req, timeout=timeout) as resp:
data = json.loads(resp.read().decode("utf-8", errors="replace"))
if data.get("status") != "success":
return f"API: {data.get('message') or 'fail'}"
bits = [data.get("city"), data.get("regionName"), data.get("country"), data.get("isp")]
return ", ".join(str(b) for b in bits if b)[:240]
except Exception as exc:
return f"Fehler: {type(exc).__name__}"
def analyse_snapshot(
raw_copy_dir: Path,
scp_optional_misses: Optional[List[str]] = None,
) -> Tuple[Dict[str, Any], List[str]]:
warnings: List[str] = []
if scp_optional_misses:
warnings.extend(scp_optional_misses)
paths = {
"practices": raw_copy_dir / "empfang_practices.json",
"accounts": raw_copy_dir / "empfang_accounts.json",
"sessions": raw_copy_dir / "empfang_sessions.json",
"devices": raw_copy_dir / "empfang_devices.json",
"links": raw_copy_dir / "empfang_practice_links.json",
"notes": raw_copy_dir / "empfang_user_notes.json",
"tasks": raw_copy_dir / "empfang_tasks.json",
"stripe_log": raw_copy_dir / "stripe_events.log.jsonl",
"stripe_sql": raw_copy_dir / "stripe_webhook.sqlite",
}
for label, pth in paths.items():
if label in ("practices", "accounts"):
continue
if not pth.is_file():
warnings.append(f"Optionale Datei nicht vorhanden: {pth.name}")
practices_raw = _load_json(paths["practices"], {})
accounts_raw = _load_json(paths["accounts"], {})
sessions_raw = _load_json(paths["sessions"], {})
devices_raw = _load_json(paths["devices"], {})
links_raw = _load_json(paths["links"], {"links": []})
if isinstance(practices_raw, dict) and practices_raw.get("__error__"):
warnings.append(f"empfang_practices.json: {practices_raw.get('__error__')}")
practices: Dict[str, Any] = {}
else:
practices = practices_raw if isinstance(practices_raw, dict) else {}
if isinstance(accounts_raw, dict) and accounts_raw.get("__error__"):
warnings.append(f"empfang_accounts.json: {accounts_raw.get('__error__')}")
accounts: Dict[str, dict] = {}
else:
accounts = {k: v for k, v in (accounts_raw or {}).items() if isinstance(v, dict)}
if isinstance(sessions_raw, dict) and sessions_raw.get("__error__"):
warnings.append(f"empfang_sessions.json: {sessions_raw.get('__error__')}")
sessions_dict: Dict[str, dict] = {}
else:
sessions_dict = sessions_raw if isinstance(sessions_raw, dict) else {}
if isinstance(devices_raw, dict) and devices_raw.get("__error__"):
warnings.append(f"empfang_devices.json: {devices_raw.get('__error__')}")
devices_dict: Dict[str, dict] = {}
else:
devices_dict = devices_raw if isinstance(devices_raw, dict) else {}
if isinstance(links_raw, dict) and links_raw.get("__error__"):
warnings.append(f"empfang_practice_links.json: {links_raw.get('__error__')}")
links_list: List[Any] = []
elif isinstance(links_raw, dict):
ll = links_raw.get("links")
links_list = ll if isinstance(ll, list) else []
if not isinstance(ll, list):
warnings.append("empfang_practice_links.json: Schlüssel »links« ist keine Liste")
else:
links_list = []
warnings.append("empfang_practice_links.json: nicht als Objekt lesbar")
by_practice: Dict[str, List[dict]] = defaultdict(list)
for uid_k, a in accounts.items():
pid = str(a.get("practice_id") or "").strip()
if pid:
by_practice[pid].append(a)
license_rows_ui = _build_license_display_rows(paths["stripe_sql"], paths["stripe_log"], practices, accounts)
lic_customer_by_pid = _merge_license_customer_email_per_practice(license_rows_ui)
dup_names = defaultdict(list)
for pid, pdata in practices.items():
nm = _norm_name_key(pdata.get("name") or "") if isinstance(pdata, dict) else ""
if nm:
dup_names[nm].append(pid)
practice_rows = []
for pid in sorted(set(practices.keys()) | set(by_practice.keys())):
pdata = practices.get(pid) if isinstance(practices.get(pid), dict) else {}
name = str(pdata.get("name") or "").strip() if pdata else "(kein Stammsatz in practices.json)"
invite = str(pdata.get("invite_code") or "").strip()
practice_meta_email = _first_str(pdata, ("admin_email", "contact_email", "meta_email"))
users = sorted(by_practice.get(pid, []), key=lambda x: str(x.get("display_name") or "").lower())
admin_users = [u for u in users if str(u.get("role") or "").lower() == "admin"]
admin_names_join = "; ".join(sorted((str(a.get("display_name") or "?") for a in admin_users), key=str.lower))
admin_account_emails = "; ".join(
sorted({str(a.get("email") or "").strip() for a in admin_users if str(a.get("email") or "").strip()}, key=str.lower)
)
lic_cust_mail = lic_customer_by_pid.get(pid, "").strip()
hn: List[str] = []
nm_key = _norm_name_key(name)
if nm_key:
collide = dup_names.get(nm_key) or []
if len(collide) > 1:
hn.append("Namensgleich mehrerer IDs (Praxisname)")
wmsg = "Gleicher Praxisname mehrfacher Datensatz (normalisiert): " + "; ".join(sorted(collide))
if wmsg not in warnings:
warnings.append(wmsg)
if not admin_users:
hn.append("keine Admin-Konten")
warnings.append(f"Praxis '{name}' [{pid}] hat keine role=admin-Konten laut Kontenliste.")
if len(admin_users) > 1:
hn.append("mehrere Admins")
adm_mail_set_lower = {
str(a.get("email") or "").strip().lower()
for a in admin_users
if str(a.get("email") or "").strip()
}
if lic_cust_mail and adm_mail_set_lower and lic_cust_mail.lower() not in adm_mail_set_lower:
hn.append("Admin-E-Mail weicht von Lizenz-/Kunden-E-Mail ab")
warnings.append(f"Abweichende E-Mail: Praxis {_short(pid)} — Admin-Konto vs. Stripe-Kunden-Mail")
hint = "; ".join(hn) if hn else "OK"
practice_rows.append(
{
"practice_name": name,
"practice_id": pid,
"invite_code": invite,
"user_count": str(len(users)),
"admin_count": str(len(admin_users)),
"admin_names": admin_names_join or "",
"admin_account_emails": admin_account_emails or "",
"practice_meta_email": practice_meta_email or "",
"license_customer_email": lic_cust_mail or "",
"hints": hint,
}
)
users_rows = []
for uid, ac in sorted(accounts.items(), key=lambda x: str(x[1].get("display_name") or "").lower()):
pid = str(ac.get("practice_id") or "").strip()
pname = ""
pdata = practices.get(pid) if isinstance(practices.get(pid), dict) else {}
if pdata:
pname = str(pdata.get("name") or "").strip()
role = str(ac.get("role") or "").strip()
admin_src = "account.role" if role.lower() == "admin" else ""
lic_mail_pr = lic_customer_by_pid.get(pid, "").strip()
lk_plain = ac.get("license_key") or ac.get("activation_key")
users_rows.append(
{
"practice_name": pname or "(unbekannt)",
"practice_id": pid,
"user_id_short": _short(ac.get("user_id") or uid, 16),
"user_id_full": str(ac.get("user_id") or uid or "").strip(),
"display_name": str(ac.get("display_name") or "").strip(),
"login_name": str(ac.get("login_name") or "").strip(),
"email": str(ac.get("email") or "").strip(),
"practice_license_customer_email": lic_mail_pr or "",
"role": role,
"admin_source": admin_src,
"status": str(ac.get("status") or "active").strip(),
"updated": str(ac.get("updated") or ac.get("created") or "").strip(),
"license_key_plain": str(lk_plain or "").strip(),
"license_key_masked": _mask_license(str(lk_plain or "").strip()),
}
)
if not str(ac.get("email") or "").strip():
warnings.append(f"Leere E-Mail bei Konto {ac.get('display_name')!r} ({_short(uid, 10)})")
practice_ids_set = set(practices.keys()) | set(by_practice.keys())
practice_name_by_id = {
k: str((v or {}).get("name") or "").strip() for k, v in practices.items() if isinstance(v, dict)
}
session_rows = []
acc_by_uid = {str(a.get("user_id") or "").strip(): a for a in accounts.values()}
OB_ID = "prac_e864d294474e"
for idx, (tok, s) in enumerate(sorted(sessions_dict.items(), key=lambda x: str(x[0])[:12])):
if not isinstance(s, dict):
continue
pid = str(s.get("practice_id") or "").strip()
uid = str(s.get("user_id") or "").strip()
s_role = str(s.get("role") or "").strip()
acc = acc_by_uid.get(uid) or {}
acc_role = str(acc.get("role") or "").strip()
ua = _user_agent_str(s)
dip = _session_device_ip(s)
mism_parts: List[str] = []
if not uid.strip():
warnings.append(f"Session #{idx + 1} ohne user_id ({_session_device_ip(s) or tok[:8]}…)")
if acc and s_role and acc_role and s_role.lower() != acc_role.lower():
mism_parts.append(f"Session ≠ Konto Rolle ({s_role!r}/{acc_role!r})")
warnings.append(f"Session #{idx + 1} user {_short(uid)}: Session-Rolle {s_role!r} ≠ Kontorolle {acc_role!r}")
acc_pid = str(acc.get("practice_id") or "").strip()
if acc and pid and acc_pid and acc_pid != pid:
mism_parts.append("Session-practice≠Konto-practice")
warnings.append(f"Session #{idx + 1} Session-Praxis {pid} ≠ Konto-praxis {acc_pid}")
mismatch_display = "; ".join(mism_parts)
dsp = str(s.get("display_name") or acc.get("display_name") or "").strip()
if pid == OB_ID and "birgit" in _norm_name_key(dsp):
if s_role.lower() == "arzt" and acc_role.lower() == "admin":
bw = (
"Obergasse Birgit-Session zeigt weiter Rolle=arzt bei Kontoadmin — bitte Kontext prüfen"
)
warnings.append(bw)
fst = (
_first_str(s, ("first_seen", "created_at", "created"))
or _first_str(acc, ("created_at",))
)
last_raw = (
_first_str(s, ("last_seen", "updated_at", "updated", "ts", "heartbeat"))
or _first_str(acc, ("updated",))
)
ts_last = _parse_ts_maybe(last_raw)
if ts_last:
ago = datetime.now(timezone.utc).timestamp() - ts_last
if ago > STALE_SESSION_SEC:
warnings.append(f"Sehr alte Session (>{STALE_SESSION_SEC // 86400}d idle): {_short(uid)} @{pid}")
session_rows.append(
{
"idx": str(idx + 1),
"practice_name": practice_name_by_id.get(pid, ""),
"practice_id": pid,
"user_id_short": _short(uid, 22),
"user_id_full": uid,
"display_name": dsp,
"session_role": s_role or "",
"account_role": acc_role or "",
"device_id_short": _short(_first_str(s, ("device_id", "hardware_id")), 14),
"ip_address": dip or "",
"ort_nach_ip": "nicht aufgelöst",
"first_seen": _ts_iso_or_raw(fst),
"last_seen": _ts_iso_or_raw(last_raw),
"user_agent": ua[:260],
"browser_os": _browser_os_guess(ua),
"mismatch": mismatch_display or "",
"_source_row": "empfang_sessions.json",
"_ip_key": dip,
}
)
if pid and pid not in practice_ids_set:
warnings.append(f"Session verweist auf unbekannte practice_id: {pid}")
device_rows = []
for dk, d in sorted(devices_dict.items(), key=lambda x: str(x[0])):
if not isinstance(d, dict):
continue
pid = str(d.get("practice_id") or "").strip()
uid = str(d.get("user_id") or "").strip()
ua_d = _user_agent_str(d)
dip_d = _session_device_ip(d)
fst = _first_str(d, ("created_at", "first_seen", "registered_at"))
last_active = _first_str(d, ("last_active", "last_seen", "updated_at"))
dsp_dev = ""
if uid:
acc_d = acc_by_uid.get(uid) or {}
dsp_dev = str(acc_d.get("display_name") or "").strip()
device_rows.append(
{
"display_name": dsp_dev or "",
"device_id_short": _short(dk, 18),
"device_id_full": str(dk),
"practice_id": pid,
"practice_name": practice_name_by_id.get(pid, ""),
"user_id_short": _short(uid, 18),
"user_id_full": uid,
"device_name_browser_os": (_device_platform_str(d) + (" | " + _browser_os_guess(ua_d) if ua_d else ""))[
:260
],
"user_agent": ua_d[:260],
"ip_address": dip_d,
"ort_nach_ip": "nicht aufgelöst",
"first_seen": _ts_iso_or_raw(fst),
"last_seen": _ts_iso_or_raw(last_active),
"trust": str(d.get("trust_status") or "").strip(),
"_source_row": "empfang_devices.json",
"_ip_key": dip_d,
}
)
if not uid.strip():
warnings.append(f"Gerät {_short(str(dk), 12)} ohne user_id (@{pid or '?'})")
norm_dev_id = defaultdict(list)
for r in device_rows:
dk = str(r.get("device_id_full") or "").strip()
if dk:
norm_dev_id[dk].append(r.get("practice_id") or "")
for dk_full, plist in norm_dev_id.items():
uniq = sorted({p for p in plist if p})
if len(uniq) > 1:
warnings.append(f"gleiche device_id in mehreren Praxen: {_short(dk_full,14)}{uniq}")
ip_to_practices: Dict[str, set] = defaultdict(set)
ip_to_sess = defaultdict(int)
for r in session_rows:
ipa = str(r.get("ip_address") or "").strip()
if ipa and _IP_RE.fullmatch(ipa):
pid = str(r.get("practice_id") or "").strip()
ip_to_sess[ipa] += 1
if pid:
ip_to_practices[ipa].add(pid)
for r in device_rows:
ipa = str(r.get("ip_address") or "").strip()
if ipa and _IP_RE.fullmatch(ipa):
pid = str(r.get("practice_id") or "").strip()
if pid:
ip_to_practices[ipa].add(pid)
for ipa, plist in ip_to_practices.items():
if len(plist) > 1:
warnings.append(f"gleiche IP in mehreren Praxen ({ipa}): {sorted(plist)}")
unknown_practice_dev = [r for r in device_rows if r.get("practice_id") and str(r["practice_id"]) not in practice_ids_set]
for r in unknown_practice_dev[:50]:
warnings.append(f"Gerät mit unbekannter practice_id: {r.get('practice_id')} / dev {_short(r.get('device_id_full'),14)}")
link_rows: List[Dict[str, str]] = []
for L in links_list or []:
if not isinstance(L, dict):
continue
st = str(L.get("status") or "").strip()
ct = str(L.get("contact_type") or "").strip()
sp = str(L.get("source_practice_id") or "").strip()
tp = str(L.get("target_practice_id") or "").strip()
inv = str(L.get("invite_code_used") or L.get("invite_code") or "").strip()
lid = str(L.get("id") or "").strip()
cr = _ts_iso_or_raw(L.get("created_at") or L.get("updated_at") or "")
link_rows.append(
{
"status": st,
"contact_type": ct,
"source_practice": practice_name_by_id.get(sp, sp),
"target_practice": practice_name_by_id.get(tp, tp),
"source_id": _short(sp, 18),
"target_id": _short(tp, 18),
"invite_used": inv[:40],
"direction": str(L.get("direction") or ""),
"created_at": cr[:32],
"link_id_short": _short(lid, 14),
}
)
if not link_rows:
p_links = paths["links"]
if not p_links.is_file():
hint = "Hinweis: empfang_practice_links.json fehlt im Snapshot (optional)."
elif isinstance(links_raw, dict) and links_raw.get("__error__"):
hint = "Hinweis: practice_links konnte nicht geparst werden (siehe Warnliste)."
else:
hint = "Hinweis: Datei vorhanden, aber »links« ist leer [] (noch keine Praxis-Verbindungen)."
link_rows.append(
{
"status": "",
"contact_type": "",
"source_practice": hint,
"target_practice": "",
"source_id": "",
"target_id": "",
"invite_used": "",
"direction": "",
"created_at": "",
"link_id_short": "",
}
)
email_to_practices: Dict[str, set] = defaultdict(set)
login_to_practices: Dict[str, set] = defaultdict(set)
for ac in accounts.values():
pid = str(ac.get("practice_id") or "").strip()
em = str(ac.get("email") or "").strip().lower()
ln = str(ac.get("login_name") or "").strip().lower()
if em:
email_to_practices[em].add(pid)
if ln:
login_to_practices[ln].add(pid)
for em, pids in email_to_practices.items():
if len(pids) > 1:
warnings.append(f"E-Mail mehrfacher Praxis-Zuordnung: {em} -> Praxen {sorted(pids)}")
for ln, pids in login_to_practices.items():
if len(pids) > 1 and ln:
warnings.append(f"gleicher login_name mehrerer Praxen: {ln!r} -> {sorted(pids)}")
soll_messages: List[Dict[str, str]] = []
def _practice_row_by_id(target_pid: str) -> Optional[dict]:
for rr in practice_rows:
if rr.get("practice_id") == target_pid:
return rr
return None
lg = SOLL_PROFIL["lindengut"]
row_lg = _practice_row_by_id(lg["practice_id"])
if not row_lg:
soll_messages.append({"level": "red", "text": f"Lindengut-Soll: ID {lg['practice_id']} fehlt"})
else:
nm = row_lg.get("practice_name", "")
if lg["name_must_contain"].lower() not in nm.lower():
soll_messages.append({"level": "yellow", "text": f"Lindengut-Soll: Name erwartet Praxis Lindengut-Kontext — ist {nm}"})
else:
soll_messages.append({"level": "green", "text": "Lindengut-Soll: Name plausibel."})
admins_ok = []
for ac in by_practice.get(lg["practice_id"], []):
if str(ac.get("role")).lower() != "admin":
continue
nm_a = _norm_name_key(ac.get("display_name") or "")
if all(h in nm_a for h in lg["expect_admin_contains"]):
admins_ok.append(ac)
em_set = lg["expect_admin_email"]
if not admins_ok:
soll_messages.append({"level": "yellow", "text": "Lindengut-Soll: kein André M. Surovy als Admin erkannt"})
else:
good_em = False
for aa in admins_ok:
mail = str(aa.get("email") or "").strip().lower()
if mail and mail in em_set:
good_em = True
soll_messages.append(
{
"level": "green" if good_em else "yellow",
"text": (
"Lindengut-Soll: André-Admin gefunden mit erwarteter E-Mail."
if good_em
else "Lindengut-Soll: André-Admin nicht mit andre.surovy@haut-winterthur.ch verknüpft"
),
}
)
dnames_by_norm = defaultdict(list)
for ac in by_practice.get(lg["practice_id"], []):
dnames_by_norm[_norm_name_key(ac.get("display_name") or "")].append(ac)
missed = []
for exp in lg["expected_display_names_contains"]:
if not any(exp in k or k.startswith(exp[:5]) for k in dnames_by_norm.keys()):
missed.append(exp)
if missed:
soll_messages.append(
{"level": "yellow", "text": f"Lindengut-Soll: fehlende erwartete Anzeigenamen-Stichprobe: {', '.join(missed)}",}
)
else:
soll_messages.append({"level": "green", "text": "Lindengut-Soll: Kernbenutzer-Stichprobe weitgehend ok."})
ob = SOLL_PROFIL["obergasse"]
row_ob = _practice_row_by_id(ob["practice_id"])
lic_ob = lic_customer_by_pid.get(ob["practice_id"], "").strip().lower()
lexp = next(iter(ob["license_customer_email_expect"])) if ob["license_customer_email_expect"] else ""
if not row_ob:
soll_messages.append({"level": "red", "text": f"Obergasse-Soll: ID {ob['practice_id']} fehlt"})
else:
nm = row_ob.get("practice_name", "")
if ob["name_must_contain"].lower() not in nm.lower():
soll_messages.append({"level": "yellow", "text": f"Obergasse-Soll: Name erwartet Obergasse-Kontext — ist {nm}"})
else:
soll_messages.append({"level": "green", "text": "Obergasse-Soll: Name plausibel."})
if lexp:
if not lic_ob:
soll_messages.append(
{"level": "yellow", "text": "Obergasse-Soll: keine Lizenz-/Kunden-E-Mail aus Stripe-Tabelle ermittelbar",}
)
elif lexp != lic_ob:
soll_messages.append(
{
"level": "yellow",
"text": f"Obergasse-Soll: Lizenz-Mail ist {lic_ob}, erwartet dermapraxis.meier@hin.ch",
},
)
else:
soll_messages.append({"level": "green", "text": "Obergasse-Soll: Stripe-Kundenmail wie erwartet."})
members = {_norm_name_key(a.get("display_name") or ""): a for a in by_practice.get(ob["practice_id"], [])}
ok_b_admin = False
for dk, ac in members.items():
if "birgit" not in dk:
continue
if str(ac.get("role") or "").lower() == "admin":
ok_b_admin = True
if not ok_b_admin:
soll_messages.append({"level": "red", "text": "Obergasse-Soll: keine Birgit-Admin gefunden"})
else:
soll_messages.append({"level": "green", "text": "Obergasse-Soll: Birgit als Admin gefunden"})
suc = False
for dk, ac in members.items():
if "susanne" in dk and str(ac.get("role") or "").lower() == "empfang":
suc = True
soll_messages.append(
{"level": "green" if suc else "yellow", "text": "Obergasse-Soll: Susanne empfang" if suc else "Obergasse-Soll: Susanne nicht als empfang",}
)
andre_problem = []
if ob.get("andre_must_not_admin"):
for dk, ac in members.items():
if not any(h in dk for h in ob.get("andre_display_hints", ())):
continue
if str(ac.get("role") or "").lower() == "admin":
andre_problem.append(str(ac.get("display_name")))
if andre_problem:
soll_messages.append(
{"level": "yellow", "text": "Obergasse-Soll: André als admin gelistet: " + ", ".join(andre_problem),}
)
else:
soll_messages.append({"level": "green", "text": "Obergasse-Soll: kein André-Admin in dieser Praxis."})
ip_summary_rows = []
for ipa in sorted(ip_to_practices.keys()):
ip_summary_rows.append(
{
"ip": ipa,
"ort_nach_ip": "nicht aufgelöst",
"practices": "; ".join(sorted(ip_to_practices[ipa])),
"sessions_count_approx": str(ip_to_sess.get(ipa, 0)),
}
)
stats_block = {
"notes_summary": _count_json_notes(paths["notes"]),
"tasks_summary": _count_generic_json(paths["tasks"]),
"stripe_events": _stripe_jsonl_summary(paths["stripe_log"]),
"stripe_sqlite": _sqlite_summary(paths["stripe_sql"]) if paths["stripe_sql"].exists() else "(nicht kopiert oder fehlt)",
}
result = {
"practice_rows": practice_rows,
"users_rows": users_rows,
"session_rows": session_rows,
"device_rows": device_rows,
"link_rows": link_rows,
"license_rows": license_rows_ui,
"ip_summary_rows": ip_summary_rows,
"warnings": sorted(set(warnings)),
"soll_messages": soll_messages,
"stats_block": stats_block,
"_meta": {"raw_copy_dir": str(raw_copy_dir.resolve())},
}
return result, sorted(set(warnings))
def _bundle_for_inventory_json(bundle: Mapping[str, Any]) -> Dict[str, Any]:
out = {}
skip_root = {"_meta", "_ip_geo_cache_live"}
for k, v in bundle.items():
if k in skip_root:
continue
if k == "license_rows":
slim = []
for r in v or []:
if isinstance(r, dict):
rr = dict(r)
kk = rr.get("license_key_plain")
rr.pop("license_key_plain", None)
rr["license_key_masked"] = _mask_license(str(kk or ""))
slim.append(rr)
out[k] = slim
continue
if k == "users_rows":
slim_u = []
for r in v or []:
if isinstance(r, dict):
rr = dict(r)
lk = rr.get("license_key_plain")
rr.pop("license_key_plain", None)
if lk:
rr["license_key_masked"] = _mask_license(str(lk))
slim_u.append(rr)
out[k] = slim_u
continue
out[k] = v
return out
def _warnings_for_txt(bundle: Mapping[str, Any]) -> List[str]:
lines = []
for w in bundle.get("warnings", []) or []:
lines.append(_mask_az_keys_in_line(str(w)))
for sm in bundle.get("soll_messages", []) or []:
lines.append(_mask_az_keys_in_line(str(sm.get("text", ""))))
return lines
def export_snapshot_folder(snap_dir: Path, bundle: Mapping[str, Any], server_spec: str, remote_path: str) -> Tuple[Path, Path, Path]:
snap_dir.mkdir(parents=True, exist_ok=True)
rd = snap_dir / "raw_data_copy"
if not rd.is_dir():
raise FileNotFoundError(f"Kein Ordner raw_data_copy unter {snap_dir}")
readme_lines = [
"AZA Kontroll-Hülle Snapshot",
f"Zeit UTC: {datetime.now(timezone.utc).isoformat(timespec='seconds')}Z",
f"Konfig read-only Kopie/Hinweis: server_label={server_spec} remote_dir={remote_path}",
"",
"Hinweis: Keine Patienten-/Chat-/Notiz-/Aufgabeninhalte. Lizenzschlüssel in README/warnings/HTML nur maskiert.",
"",
"SHA256 kopierter Rohdateien (raw_data_copy):",
"",
]
for p in sorted(rd.iterdir()):
if p.is_file():
hs = _sha256_file(p)
if hs:
readme_lines.append(f" {p.name} SHA256:{hs}")
inv_path = snap_dir / "practice_inventory.json"
csv_pr = snap_dir / "practice_inventory.csv"
usr_csv = snap_dir / "users.csv"
adm_csv = snap_dir / "admins.csv"
lic_csv = snap_dir / "licenses.csv"
sess_csv = snap_dir / "sessions.csv"
dev_csv = snap_dir / "devices.csv"
ip_csv = snap_dir / "ip_summary.csv"
warn_path = snap_dir / "warnings.txt"
html_path = snap_dir / "report.html"
inv_path.write_text(json.dumps(_bundle_for_inventory_json(bundle), indent=2, ensure_ascii=False), encoding="utf-8")
pcols = [
"practice_name",
"practice_id",
"invite_code",
"user_count",
"admin_count",
"admin_names",
"admin_account_emails",
"practice_meta_email",
"license_customer_email",
"hints",
]
def _csv_write(path_: Path, header: Sequence[str], rows_: Sequence[Mapping[str, Any]]) -> None:
with path_.open("w", newline="", encoding="utf-8-sig") as fh:
w = csv.writer(fh, delimiter=";")
w.writerow(list(header))
for rr in rows_:
w.writerow([(str(rr.get(c, "") if isinstance(rr, dict) else "") or "").replace("\r\n", " ")[:8000] for c in header])
_csv_write(csv_pr, pcols, bundle.get("practice_rows", []) or [])
ucols = [
"practice_name",
"practice_id",
"user_id_short",
"user_id_full",
"display_name",
"login_name",
"email",
"practice_license_customer_email",
"role",
"admin_source",
"status",
"updated",
"license_key_masked",
]
adm_rows_export: List[Dict[str, str]] = []
with usr_csv.open("w", newline="", encoding="utf-8-sig") as fh_u:
wu = csv.writer(fh_u, delimiter=";")
wu.writerow(ucols)
for rr in bundle.get("users_rows", []) or []:
if not isinstance(rr, dict):
continue
rw = dict(rr)
lk = rw.get("license_key_plain")
rw.pop("license_key_plain", None)
rw["license_key_masked"] = _mask_license(str(lk or "").strip())
if str(rw.get("role") or "").lower() == "admin":
adm_rows_export.append(rw)
wu.writerow([str(rw.get(c, "") or "") for c in ucols])
acols = [
"practice_name",
"practice_id",
"user_id_short",
"display_name",
"login_name",
"email",
"practice_license_customer_email",
"role",
"status",
"license_key_masked",
]
with adm_csv.open("w", newline="", encoding="utf-8-sig") as fh_a:
wa = csv.writer(fh_a, delimiter=";")
wa.writerow(acols)
for rw in adm_rows_export:
wa.writerow([str(rw.get(c, "") or "") for c in acols])
lcols = [
"license_key_masked",
"license_suffix",
"license_sha256",
"customer_email_license",
"practice_name",
"practice_id",
"subscription_id",
"stripe_customer_id",
"woo_order_id",
"lookup_key",
"status",
"current_period_end",
"stripe_letzte_db_aenderung_utc",
"erstes_passendes_stripe_log",
"allowed_users",
"devices_per_user",
"billing_or_customer_snippet",
"sources",
"client_reference_id",
]
with lic_csv.open("w", newline="", encoding="utf-8-sig") as fh_l:
wl = csv.writer(fh_l, delimiter=";")
wl.writerow(lcols)
for rw in bundle.get("license_rows", []) or []:
if not isinstance(rw, dict):
continue
lk = rw.get("license_key_plain") or ""
rr = dict(rw)
rr.pop("license_key_plain", None)
rr["license_key_masked"] = _mask_license(str(lk).strip())
wl.writerow([str(rr.get(c, "") or "") for c in lcols])
sess_cols_export = [
"idx",
"practice_name",
"practice_id",
"user_id_short",
"display_name",
"session_role",
"account_role",
"ip_address",
"ort_nach_ip",
"first_seen",
"last_seen",
"device_id_short",
"browser_os",
"mismatch",
"user_agent",
"source",
]
with sess_csv.open("w", newline="", encoding="utf-8-sig") as fh_s:
ws = csv.writer(fh_s, delimiter=";")
ws.writerow(sess_cols_export)
for r in bundle.get("session_rows", []) or []:
if not isinstance(r, dict):
continue
rr = {k: v for k, v in r.items() if not str(k).startswith("_")}
rr.setdefault("source", "empfang_sessions.json")
ws.writerow([str(rr.get(c, "") or "") for c in sess_cols_export])
dev_cols_export = [
"practice_name",
"practice_id",
"display_name",
"user_id_short",
"device_name_browser_os",
"device_id_short",
"ip_address",
"ort_nach_ip",
"first_seen",
"last_seen",
"trust",
"user_agent",
"source",
]
with dev_csv.open("w", newline="", encoding="utf-8-sig") as fh_d:
wd = csv.writer(fh_d, delimiter=";")
wd.writerow(dev_cols_export)
for r in bundle.get("device_rows", []) or []:
if not isinstance(r, dict):
continue
rr = {k: v for k, v in r.items() if not str(k).startswith("_")}
rr.setdefault("source", "empfang_devices.json")
wd.writerow([str(rr.get(c, "") or "") for c in dev_cols_export])
ipc = ["ip", "ort_nach_ip", "practices", "sessions_count_approx"]
with ip_csv.open("w", newline="", encoding="utf-8-sig") as fh_i:
wi = csv.writer(fh_i, delimiter=";")
wi.writerow(ipc)
for r in bundle.get("ip_summary_rows", []) or []:
if isinstance(r, dict):
wi.writerow([str(r.get(c, "") or "") for c in ipc])
warn_path.write_text("\n".join(_warnings_for_txt(bundle)).strip() + "\n", encoding="utf-8")
def esc_rows(rows: Iterable[Dict[str, Any]], keys: Sequence[str]) -> str:
out_ln: List[str] = []
out_ln.append("<tr>" + "".join(f"<th>{html.escape(str(k))}</th>" for k in keys) + "</tr>")
for rw in rows:
out_ln.append(
"<tr>"
+ "".join(
f"<td>{html.escape(str(((rw.get(k) if isinstance(rw, dict) else '') or '')))}</td>"
for k in keys
)
+ "</tr>"
)
return "\n".join(out_ln)
htm: List[str] = []
htm.append("<!DOCTYPE html><html lang='de'><head><meta charset='utf-8'><title>AZA Kontroll-Report</title>")
htm.append(
"<style>body{font-family:Segoe UI,sans-serif;background:#eaf1f8;color:#1a2a3a;padding:14px;}"
"table{border-collapse:collapse;background:#fff;margin:10px 0;} th,td{border:1px solid #cddbeb;"
"padding:6px 8px;text-align:left;font-size:.85rem;} th{background:#5B8DB3;color:#fff;} "
"h2{font-size:1.05rem;color:#356488;} .yellow{background:#fff8e6;padding:10px;} "
".green{background:#e8f5e9;} .red{background:#fdecea;} .muted{color:#55708a;font-size:.85rem;} </style></head><body>"
)
htm.append("<h1>AZA Kontroll-Hülle — Report</h1>")
htm.append(
"<p>Hinweise: keine exakte Adresse aus einer öffentlichen IP ableitbar. "
"Optional aufgelöster Ort entspricht meist ISP/Regionalnetz.</p>"
)
htm.append(f"<p class='muted'>SSH-Label: <code>{html.escape(server_spec)}</code> · Remote: <code>{html.escape(remote_path)}</code></p>")
htm.append("<h2>Soll-Prüfung</h2>")
for sm in bundle.get("soll_messages", []) or []:
lvl = html.escape(str(sm.get("level", "")))
htm.append(f"<div class='{lvl}'><strong>[{lvl}]</strong> {html.escape(str(sm.get('text','')))}</div>")
htm.append("<h2>Lizenzen (Schlüssel maskiert)</h2><table>")
lk_html = [
"license_key_masked",
"license_suffix",
"license_sha256",
"customer_email_license",
"practice_name",
"practice_id",
"subscription_id",
"woo_order_id",
"lookup_key",
"status",
"current_period_end",
"stripe_letzte_db_aenderung_utc",
"erstes_passendes_stripe_log",
"billing_or_customer_snippet",
"sources",
]
lic_html_rows = []
for rr in bundle.get("license_rows", []) or []:
if not isinstance(rr, dict):
continue
row = dict(rr)
lk = row.get("license_key_plain") or ""
row.pop("license_key_plain", None)
row["license_key_masked"] = _mask_license(str(lk).strip())
lic_html_rows.append(row)
htm.append(esc_rows(lic_html_rows, lk_html))
htm.append("</table><h2>Praxen</h2><table>")
htm.append(esc_rows(bundle.get("practice_rows", []) or [], pcols))
htm.append("</table><h2>Benutzer (Auszug)</h2><table>")
ukeys = ["practice_name", "practice_id", "user_id_short", "display_name", "login_name", "email",
"practice_license_customer_email", "role", "status", "license_key_masked"]
htm.append(esc_rows((bundle.get("users_rows") or [])[:360], ukeys))
htm.append("</table><h2>Sessions (Auszug)</h2><table>")
sess_rows_html = []
for r in (bundle.get("session_rows") or [])[:340]:
if not isinstance(r, dict):
continue
rr = {k: v for k, v in r.items() if not str(k).startswith("_")}
rr.setdefault("source", "empfang_sessions.json")
sess_rows_html.append(rr)
htm.append(esc_rows(sess_rows_html, sess_cols_export))
htm.append("</table><h2>Geräte (Auszug)</h2><table>")
dev_rows_html = []
for r in (bundle.get("device_rows") or [])[:340]:
if not isinstance(r, dict):
continue
rr = {k: v for k, v in r.items() if not str(k).startswith("_")}
rr.setdefault("source", "empfang_devices.json")
dev_rows_html.append(rr)
htm.append(esc_rows(dev_rows_html, dev_cols_export))
htm.append("</table><h2>IP-Zusammenfassung</h2><table>")
htm.append(esc_rows(bundle.get("ip_summary_rows") or [], ipc))
htm.append("</table><h2>Warnungen (maskierte Schlüsselformate)</h2>")
htm.append(
"<pre style='white-space:pre-wrap;background:#fff;padding:12px;border:1px solid #cddbeb;'>"
+ html.escape("\n".join(_warnings_for_txt(bundle)))
+ "</pre>"
)
sb = bundle.get("stats_block") or {}
htm.append("<h2>Zähl-Statistik (keine PHI)</h2>")
htm.append(f"<p><code>{html.escape(str(sb.get('notes_summary')))}</code></p>")
htm.append(f"<p><code>{html.escape(str(sb.get('tasks_summary')))}</code></p>")
se = sb.get("stripe_events") or {}
htm.append(f"<p>stripe JSONL-Stichprobe: Zeilen=<code>{se.get('lines_seen')}</code> · stripe-artig="
f"<code>{se.get('stripe_like_lines')}</code></p>")
htm.append(f"<p>stripe sqlite: <code>{html.escape(str(sb.get('stripe_sqlite')))}</code></p>")
htm.append("</body></html>")
html_path.write_text("\n".join(htm), encoding="utf-8")
readme_path_disk = snap_dir / "README.txt"
readme_path_disk.write_text("\n".join(readme_lines).strip() + "\n", encoding="utf-8")
return inv_path, html_path, warn_path
COPY_BLOCK = REMOTE_CORE_REQUIRED_POST_SCP + REMOTE_COPY_OPTIONAL_ORDERED
class AdminControlShell(tk.Tk):
"""Hauptfenster Tkinter — read-only Kontrolle eines lokal kopierten Snapshots."""
def __init__(self) -> None:
super().__init__()
self.title("AZA Kontroll-Hülle")
self.configure(bg="#eaf1f8")
self.minsize(1080, 700)
self._queue: queue.Queue[tuple[str, Any]] = queue.Queue()
self._server_spec = DEFAULT_SSH_SPEC
self._remote_data = DEFAULT_REMOTE_DATA
self._snap_root = _SNAP_ROOT_WIN
self._current_snap: Optional[Path] = None
self._last_bundle: Optional[Dict[str, Any]] = None
self._ssh_ok = tk.StringVar(value="unbekannt")
self._ro_mode = tk.StringVar(value="READ-ONLY")
self._last_snap_var = tk.StringVar(value="keiner")
self._soft_notes_var = tk.StringVar(value="")
self._ip_geo_cache: Dict[str, str] = {}
self._ip_disclaimed = False
self._var_show_full_license = tk.BooleanVar(value=True)
self._tree_sort_state: Dict[int, Dict[str, Any]] = {}
self._tree_clipboard_bound = False
self._build_ui()
self.after(160, self._poll_queue)
# --- UI-Aufbau ---
def _build_ui(self) -> None:
hdr = tk.Frame(self, bg="#eaf1f8")
hdr.pack(fill="x", padx=12, pady=(10, 6))
tk.Label(hdr, text="AZA Kontroll-Hülle", font=("Segoe UI", 16, "bold"), fg="#356488", bg="#eaf1f8").pack(side="left")
bf = tk.Frame(hdr, bg="#eaf1f8")
bf.pack(side="right")
tk.Button(bf, text="Hetzner-Daten aktualisieren", bg="#5B8DB3", fg="white",
relief="flat", padx=12, pady=6, command=self._on_refresh_remote, cursor="hand2").pack(side="left", padx=4)
tk.Button(bf, text="Snapshot öffnen", relief="flat", padx=10, pady=6, bg="#dceaf4",
command=self._on_open_manual, cursor="hand2").pack(side="left", padx=4)
tk.Button(bf, text="Report exportieren", relief="flat", padx=10, pady=6, bg="#dceaf4",
command=self._on_export_reports, cursor="hand2").pack(side="left", padx=4)
tk.Checkbutton(bf, text="Lizenzen komplett anzeigen", variable=self._var_show_full_license,
bg="#eaf1f8", command=self._on_toggle_license).pack(side="left", padx=10)
stf = tk.Frame(self, bg="#dceaf4", highlightthickness=1, highlightbackground="#c8dae8")
stf.pack(fill="x", padx=12, pady=(0, 6))
tk.Label(stf, textvariable=self._last_snap_var, fg="#334e66", bg="#dceaf4",
font=("Segoe UI", 9)).pack(side="left", padx=8, pady=4)
tk.Label(stf, text=" | SSH/OpenSSH: ", fg="#334e66", bg="#dceaf4").pack(side="left")
tk.Label(stf, textvariable=self._ssh_ok, fg="#334e66", bg="#dceaf4",
font=("Segoe UI", 9, "bold")).pack(side="left")
tk.Label(stf, text=" | SCP-Hinweise: ", fg="#334e66", bg="#dceaf4").pack(side="left")
tk.Label(stf, textvariable=self._soft_notes_var, fg="#6a5720", bg="#dceaf4",
wraplength=480, justify="left", font=("Segoe UI", 8)).pack(side="left", padx=4)
tk.Label(stf, text=" | ", fg="#334e66", bg="#dceaf4").pack(side="left")
tk.Label(stf, textvariable=self._ro_mode, fg="#2E7D32", bg="#dceaf4",
font=("Segoe UI", 9, "bold")).pack(side="left", padx=4)
self._nb = ttk.Notebook(self)
self._nb.pack(fill="both", expand=True, padx=10, pady=(0, 12))
self._tree_practices = self._mk_plain_tree(self._nb, "Praxen")
self._tree_users = self._mk_plain_tree(self._nb, "Benutzer")
self._text_admins = self._mk_text(self._nb, "Admin-Kontrolle")
lf = tk.Frame(self._nb, bg="#eaf1f8")
self._nb.add(lf, text="Lizenzen")
lf_top = tk.Frame(lf, bg="#eaf1f8")
lf_top.pack(fill="x", padx=4, pady=2)
tk.Label(lf_top, bg="#eaf1f8", fg="#54708f", font=("Segoe UI", 8),
wraplength=900, justify="left",
text="Quellen kombiniert: stripe_webhook.sqlite, stripe_events.log.jsonl, empfang_accounts.json. "
"„Rechnungs-/Kundenstichwort“ kann von Stripe-Adresse kommen ≠ Registrierungsort.").pack(anchor="w")
lf_body = tk.Frame(lf, bg="#eaf1f8")
lf_body.pack(fill="both", expand=True)
self._tv_license = ttk.Treeview(lf_body, columns=(), show="headings")
yl = ttk.Scrollbar(lf_body, orient="vertical", command=self._tv_license.yview)
self._tv_license.configure(yscrollcommand=yl.set)
self._tv_license.grid(row=0, column=0, sticky="nsew")
yl.grid(row=0, column=1, sticky="ns")
lf_body.grid_rowconfigure(0, weight=1)
lf_body.grid_columnconfigure(0, weight=1)
self._tree_links = self._mk_plain_tree(self._nb, "Codes / Verbindungen")
sf = tk.Frame(self._nb, bg="#eaf1f8")
self._nb.add(sf, text="Sessions")
sbt = tk.Frame(sf, bg="#eaf1f8")
sbt.pack(fill="x", padx=4, pady=2)
tk.Button(sbt, text="IP-Orte auflösen (optional, nachfragen)", bg="#dceaf4", command=self._on_resolve_ips).pack(side="left")
tk.Label(sbt, bg="#eaf1f8", fg="#667e93", font=("Segoe UI", 8),
text=" Kein Auto beim Start · nur näherungsweise Stadt/ISP · keine exakte Hausadresse.").pack(side="left", padx=6)
sfb = tk.Frame(sf, bg="#eaf1f8")
sfb.pack(fill="both", expand=True)
self._tv_sess = ttk.Treeview(sfb, columns=(), show="headings")
ys = ttk.Scrollbar(sfb, orient="vertical", command=self._tv_sess.yview)
self._tv_sess.configure(yscrollcommand=ys.set)
self._tv_sess.grid(row=0, column=0, sticky="nsew")
ys.grid(row=0, column=1, sticky="ns")
sfb.grid_rowconfigure(0, weight=1)
sfb.grid_columnconfigure(0, weight=1)
df = tk.Frame(self._nb, bg="#eaf1f8")
self._nb.add(df, text="Geräte")
dt = tk.Frame(df, bg="#eaf1f8")
dt.pack(fill="x", padx=4)
tk.Label(dt, bg="#eaf1f8", fg="#667e93", font=("Segoe UI", 8),
text="IP-Ort-Spalten nutzen denselben Button wie beim SessionsTab.").pack(anchor="w")
df_b = tk.Frame(df, bg="#eaf1f8")
df_b.pack(fill="both", expand=True)
self._tv_dev = ttk.Treeview(df_b, columns=(), show="headings")
yd = ttk.Scrollbar(df_b, orient="vertical", command=self._tv_dev.yview)
self._tv_dev.configure(yscrollcommand=yd.set)
self._tv_dev.grid(row=0, column=0, sticky="nsew")
yd.grid(row=0, column=1, sticky="ns")
df_b.grid_rowconfigure(0, weight=1)
df_b.grid_columnconfigure(0, weight=1)
kib = tk.Frame(self._nb, bg="#eaf1f8")
self._nb.add(kib, text="KI-Budget (API)")
kt = tk.Frame(kib, bg="#eaf1f8")
kt.pack(fill="x", padx=6, pady=4)
tk.Label(kt, text="API-Basis", bg="#eaf1f8", font=("Segoe UI", 9)).pack(side="left")
self._var_ai_api_base = tk.StringVar(
value=os.environ.get("AZA_ADMIN_API_BASE", "https://api.aza-medwork.ch").strip()
)
tk.Entry(kt, textvariable=self._var_ai_api_base, width=48).pack(side="left", padx=6)
tk.Label(kt, text="X-Admin-Token", bg="#eaf1f8", font=("Segoe UI", 9)).pack(side="left", padx=(10, 0))
self._var_ai_admin_tok = tk.StringVar(value=os.environ.get("AZA_ADMIN_TOKEN", "").strip())
tk.Entry(kt, textvariable=self._var_ai_admin_tok, width=32, show="*").pack(side="left", padx=6)
tk.Button(
kt, text="Übersicht laden", command=self._on_load_ai_budget,
bg="#5B8DB3", fg="white", relief="flat", padx=10, pady=4,
).pack(side="left", padx=6)
tk.Button(
kt, text="CSV speichern…", command=self._on_export_ai_budget_csv,
bg="#dceaf4", relief="flat", padx=10, pady=4,
).pack(side="left", padx=4)
khelp = tk.Frame(kib, bg="#eaf1f8")
khelp.pack(fill="x", padx=8)
tk.Label(
khelp,
bg="#eaf1f8",
fg="#667e93",
font=("Segoe UI", 8),
justify="left",
text="GET /admin/ai_budget_overview · CSV /admin/ai_budget_export.csv · nur lesend, Admin-Token wie andere /admin/*-Routen.",
wraplength=980,
).pack(anchor="w")
kb = tk.Frame(kib, bg="#eaf1f8")
kb.pack(fill="both", expand=True, padx=4, pady=(0, 6))
self._txt_ai_budget = scrolledtext.ScrolledText(
kb, wrap="word", font=("Consolas", 9), bg="#fdfefe", fg="#1a3550"
)
self._txt_ai_budget.pack(fill="both", expand=True)
self._diag = self._mk_text(self._nb, "Diagnose")
self._footer()
self._check_ssh_clients()
self._wire_tree_clipboard_sort()
def _footer(self) -> None:
tk.Label(
self,
fg="#667e93",
bg="#eaf1f8",
font=("Segoe UI", 8),
justify="left",
wraplength=1040,
text="Snapshots: Documents\\AzA Drive\\AZA_CONTROL_SNAPSHOTS · OpenSSH ssh/scp (Schlüssel/Agent, kein PW-Speicher). "
"EXE nutzt ebenfalls externes Windows-OpenSSH.",
).pack(fill="x", padx=14, pady=(0, 8))
def _mk_plain_tree(self, nb: ttk.Notebook, title: str) -> ttk.Treeview:
fr = tk.Frame(nb, bg="#eaf1f8")
nb.add(fr, text=title)
wrap = tk.Frame(fr, bg="#eaf1f8")
wrap.pack(fill="both", expand=True)
tv = ttk.Treeview(wrap, columns=(), show="headings")
ys = ttk.Scrollbar(wrap, orient="vertical", command=tv.yview)
tv.configure(yscrollcommand=ys.set)
tv.grid(row=0, column=0, sticky="nsew")
ys.grid(row=0, column=1, sticky="ns")
wrap.grid_rowconfigure(0, weight=1)
wrap.grid_columnconfigure(0, weight=1)
return tv
def _mk_text(self, nb: ttk.Notebook, title: str) -> scrolledtext.ScrolledText:
fr = tk.Frame(nb, bg="#eaf1f8")
nb.add(fr, text=title)
txt = scrolledtext.ScrolledText(fr, wrap="word", font=("Consolas", 9), bg="#fdfefe", fg="#1a3550")
txt.pack(fill="both", expand=True, padx=4, pady=4)
txt.configure(state="disabled")
return txt
def _check_ssh_clients(self) -> None:
ok_scp = shutil_which("scp") is not None
ok_ssh = shutil_which("ssh") is not None
if ok_ssh and ok_scp:
self._ssh_ok.set("scp/ssh vorhanden")
else:
miss = []
if not ok_ssh:
miss.append("ssh")
if not ok_scp:
miss.append("scp")
self._ssh_ok.set("fehlt: " + ",".join(miss))
def _poll_queue(self) -> None:
try:
while True:
kind, payload = self._queue.get_nowait()
self._dispatch_queue(kind, payload)
except queue.Empty:
pass
self.after(260, self._poll_queue)
def _dispatch_queue(self, kind: str, payload: Any) -> None:
if kind == "toast":
self._set_status(str(payload))
elif kind == "ai_budget_text":
try:
self._txt_ai_budget.configure(state="normal")
self._txt_ai_budget.delete("1.0", "end")
self._txt_ai_budget.insert("1.0", str(payload))
self._txt_ai_budget.configure(state="disabled")
except Exception:
pass
elif kind == "snap_done":
path, bundle, opt_notes = payload
self._current_snap = Path(path)
self._last_bundle = bundle
self._ip_geo_cache.clear()
self._inject_geo(bundle)
self._populate_all(bundle)
self._last_snap_var.set(str(self._current_snap.resolve()))
txt = "; ".join(opt_notes[:6])[:520] + ("" if len(opt_notes) > 6 else "")
self._soft_notes_var.set(txt or "(optional vollständig)")
self._ssh_ok.set("scp/ssh Snapshot gelesen")
elif kind == "snap_fail":
self._ssh_ok.set("Fehler / nicht erreichbar")
messagebox.showerror("Snapshot fehlgeschlagen", str(payload))
elif kind == "ip_done":
self._inject_geo(self._last_bundle or {})
if self._last_bundle:
self._populate_sessions_devices(self._last_bundle)
self._set_status("IP-Näherungsdaten angefordert")
def _on_load_ai_budget(self) -> None:
base = (self._var_ai_api_base.get() or "").strip().rstrip("/")
tok = (self._var_ai_admin_tok.get() or "").strip()
if not base or not tok:
messagebox.showwarning("KI-Budget", "API-Basis und Admin-Token angeben.", parent=self)
return
def worker():
try:
url = base + "/admin/ai_budget_overview?status=active"
req = urllib.request.Request(url, headers={"X-Admin-Token": tok})
with urllib.request.urlopen(req, timeout=90) as resp:
body = resp.read().decode("utf-8", errors="replace")
data = json.loads(body)
pretty = json.dumps(data, ensure_ascii=False, indent=2)
self._queue.put(("ai_budget_text", pretty))
except Exception as e:
self._queue.put(("ai_budget_text", f"FEHLER: {e}"))
threading.Thread(target=worker, daemon=True).start()
def _on_export_ai_budget_csv(self) -> None:
base = (self._var_ai_api_base.get() or "").strip().rstrip("/")
tok = (self._var_ai_admin_tok.get() or "").strip()
if not base or not tok:
messagebox.showwarning("KI-Budget", "API-Basis und Admin-Token angeben.", parent=self)
return
path = filedialog.asksaveasfilename(
parent=self,
defaultextension=".csv",
filetypes=[("CSV", "*.csv"), ("Alle", "*.*")],
title="KI-Budget CSV speichern",
)
if not path:
return
def worker():
try:
url = base + "/admin/ai_budget_export.csv?status=active"
req = urllib.request.Request(url, headers={"X-Admin-Token": tok})
with urllib.request.urlopen(req, timeout=120) as resp:
raw = resp.read()
Path(path).write_bytes(raw)
self._queue.put(("toast", f"CSV geschrieben: {path}"))
except Exception as e:
self._queue.put(("toast", f"CSV-Fehler: {e}"))
threading.Thread(target=worker, daemon=True).start()
def _set_status(self, msg: str) -> None:
self.title("AZA Kontroll-Hülle — " + msg[:78])
def _inject_geo(self, bundle: Mapping[str, Any]) -> None:
cmap = self._ip_geo_cache
if not cmap:
return
br = bundle # mutate ok
for r in list(br.get("session_rows") or []):
if isinstance(r, dict):
ipa = str(r.get("ip_address") or "").strip()
if ipa in cmap:
r["ort_nach_ip"] = cmap[ipa]
for r in list(br.get("device_rows") or []):
if isinstance(r, dict):
ipa = str(r.get("ip_address") or "").strip()
if ipa in cmap:
r["ort_nach_ip"] = cmap[ipa]
for r in list(br.get("ip_summary_rows") or []):
if isinstance(r, dict):
ipa = str(r.get("ip") or "").strip()
if ipa in cmap:
r["ort_nach_ip"] = cmap[ipa]
# --- Trees ---
def _wire_tree_clipboard_sort(self) -> None:
if self._tree_clipboard_bound:
return
self._tree_clipboard_bound = True
self.bind_all("<Control-c>", self._on_tree_ctrl_c, add="+")
self.bind_class("Treeview", "<Double-1>", self._on_tree_double_cell, add="+")
def _on_tree_ctrl_c(self, event: tk.Event) -> Optional[str]:
w = self.focus_get()
if not isinstance(w, ttk.Treeview):
return None
cols = getattr(w, "_cols_order", None)
if not cols:
return None
sel = w.selection()
if not sel:
return None
lines: List[str] = []
for iid in sel:
vals = w.item(iid, "values")
lines.append("\t".join(str(x) for x in vals))
try:
self.clipboard_clear()
self.clipboard_append("\n".join(lines))
except tk.TclError:
pass
return "break"
def _on_tree_double_cell(self, event: tk.Event) -> None:
w = event.widget
if not isinstance(w, ttk.Treeview):
return
cols = getattr(w, "_cols_order", None)
if not cols:
return
row_id = w.identify_row(event.y)
col_id = w.identify_column(event.x)
if not row_id or not col_id or col_id == "#0":
return
try:
idx = int(col_id.replace("#", "")) - 1
except ValueError:
return
if idx < 0 or idx >= len(cols):
return
vals = w.item(row_id, "values")
if idx >= len(vals):
return
try:
self.clipboard_clear()
self.clipboard_append(str(vals[idx]))
except tk.TclError:
pass
def _clear_fill(self, tv: ttk.Treeview, cols: Sequence[str], rows: Sequence[Any]) -> None:
col_list = list(cols)
tv.delete(*tv.get_children())
tv.configure(columns=col_list)
norm: List[Dict[str, Any]] = []
for rw in rows:
if isinstance(rw, dict):
norm.append(dict(rw))
else:
norm.append({c: str(rw) for c in col_list})
tv._cols_order = col_list # type: ignore[attr-defined]
tv._rows_raw = norm # type: ignore[attr-defined]
sid = id(tv)
self._tree_sort_state[sid] = {"col": None, "reverse": False}
st = self._tree_sort_state[sid]
def refill_from_sorted(sorted_rows: List[Dict[str, Any]]) -> None:
tv.delete(*tv.get_children())
tv._rows_raw = sorted_rows # type: ignore[attr-defined]
for rw in sorted_rows:
tv.insert("", "end", values=tuple(str(rw.get(c, "") or "") for c in col_list))
def on_head(col: str) -> None:
if st.get("col") == col:
st["reverse"] = not bool(st.get("reverse"))
else:
st["col"] = col
st["reverse"] = False
rev = bool(st.get("reverse"))
cur = list(getattr(tv, "_rows_raw", []) or [])
def sk(r: Dict[str, Any]) -> Tuple[Any, ...]:
return _tree_sort_key_tuple(col, str(r.get(col, "") or ""))
cur.sort(key=sk, reverse=rev)
refill_from_sorted(cur)
for c in col_list:
tv.heading(c, text=_tree_column_heading(c), command=lambda cc=c: on_head(cc))
tv.column(c, width=126, anchor="w")
refill_from_sorted(norm)
def _on_toggle_license(self) -> None:
if self._last_bundle:
self._populate_licenses(dict(self._last_bundle))
def _license_cell(self, r: Mapping[str, Any]) -> str:
full = bool(self._var_show_full_license.get())
lk = str((r.get("license_key_plain") if isinstance(r, dict) else "") or "").strip()
mk = str((r.get("license_key_masked") if isinstance(r, dict) else "") or "").strip()
if full and lk:
return lk
return mk or _mask_license(lk)
def _populate_licenses(self, bundle: Mapping[str, Any]) -> None:
cols = [
"license_key_display",
"license_suffix",
"license_sha256",
"customer_email_license",
"practice_name",
"practice_id",
"subscription_id",
"woo_order_id",
"stripe_customer_id",
"lookup_key",
"status",
"current_period_end",
"stripe_letzte_db_aenderung_utc",
"erstes_passendes_stripe_log",
"billing_or_customer_snippet",
"sources",
]
rows: List[Dict[str, Any]] = []
for rr in bundle.get("license_rows", []) or []:
if not isinstance(rr, dict):
continue
row = dict(rr)
row["license_key_display"] = self._license_cell(row)
rows.append(row)
self._clear_fill(self._tv_license, cols, rows)
def _populate_sessions_devices(self, bundle: Mapping[str, Any]) -> None:
scols = [
"idx",
"practice_name",
"practice_id",
"display_name",
"user_id_short",
"session_role",
"account_role",
"ip_address",
"ort_nach_ip",
"first_seen",
"last_seen",
"device_id_short",
"browser_os",
"mismatch",
"user_agent",
"source",
]
s_rows: List[Dict[str, Any]] = []
for r in bundle.get("session_rows", []) or []:
if not isinstance(r, dict):
continue
rr = dict(r)
rr["source"] = rr.get("_source_row", "empfang_sessions.json")
if not rr.get("ort_nach_ip"):
rr["ort_nach_ip"] = "nicht aufgelöst"
s_rows.append(rr)
self._clear_fill(self._tv_sess, scols, s_rows)
dcols = [
"practice_name",
"practice_id",
"display_name",
"user_id_short",
"device_name_browser_os",
"device_id_short",
"ip_address",
"ort_nach_ip",
"first_seen",
"last_seen",
"trust",
"user_agent",
"source",
]
d_rows = []
for r in bundle.get("device_rows", []) or []:
if not isinstance(r, dict):
continue
rr = dict(r)
rr["source"] = rr.get("_source_row", "empfang_devices.json")
if not rr.get("ort_nach_ip"):
rr["ort_nach_ip"] = "nicht aufgelöst"
d_rows.append(rr)
self._clear_fill(self._tv_dev, dcols, d_rows)
def _populate_all(self, bundle: Dict[str, Any]) -> None:
p_cols = [
"practice_name",
"practice_id",
"invite_code",
"user_count",
"admin_count",
"admin_names",
"admin_account_emails",
"practice_meta_email",
"license_customer_email",
"hints",
]
self._clear_fill(self._tree_practices, p_cols, bundle.get("practice_rows") or [])
u_cols = [
"practice_name",
"practice_id",
"user_id_short",
"display_name",
"login_name",
"email",
"practice_license_customer_email",
"role",
"admin_source",
"status",
"updated",
"license_key_masked",
]
u_vis = []
for rr in bundle.get("users_rows", []) or []:
rw = dict(rr)
lk_plain = rw.get("license_key_plain") or ""
rw.pop("license_key_plain", None)
mm = _mask_license(str(lk_plain).strip())
if bool(self._var_show_full_license.get()):
rw["license_key_masked"] = str(lk_plain).strip() or mm or ""
else:
rw["license_key_masked"] = mm or ""
u_vis.append(rw)
self._clear_fill(self._tree_users, u_cols, u_vis)
adm_lines = ["=== Nach Praxis gruppierte Admins (read-only Konten-JSON) ===\n"]
byp: Dict[str, List[str]] = defaultdict(list)
for rr in bundle.get("users_rows", []) or []:
if str(rr.get("role")).lower() == "admin":
byp[str(rr.get("practice_id"))].append(
f"{rr.get('display_name')} (login={rr.get('login_name')}, Konto-Mail={rr.get('email')}, "
f"Praxis-Mail-Lizenz={rr.get('practice_license_customer_email')})"
)
pname_map = {r["practice_id"]: r.get("practice_name", "") for r in bundle.get("practice_rows", []) or []}
for pid in sorted(byp.keys(), key=lambda x: (str(pname_map.get(x)), x)):
admins = byp.get(pid, [])
adm_lines.append(f"\n[{pid}] {pname_map.get(pid, '')}")
if not admins:
adm_lines.append(" (!) keine Admins")
else:
if len(admins) > 1:
adm_lines.append(f" (!) mehrere ({len(admins)}):")
for ln in admins:
adm_lines.append(str(ln))
adm_out = "\n".join(adm_lines)
if not byp:
adm_out += "\nKeine Konten sind als Rolle »admin« markiert.\n"
self._set_text_widget(self._text_admins, adm_out)
self._populate_licenses(bundle)
self._populate_sessions_devices(bundle)
link_cols = [
"status",
"contact_type",
"source_practice",
"target_practice",
"invite_used",
"created_at",
"direction",
"link_id_short",
]
self._clear_fill(self._tree_links, link_cols, bundle.get("link_rows", []) or [])
diag = ["=== Automatisch ermittelte Warnungen/Hinweise (maskierte Lizenztokens) ===\n"]
for ln in bundle.get("warnings") or []:
diag.append("- " + ln)
diag.append("\n=== Soll-Prüfung ===")
for sm in bundle.get("soll_messages") or []:
diag.append(f"[{sm.get('level','')}] {sm.get('text','')}")
diag.append("\n=== stripe / notes / tasks Zählwerte ===")
st = bundle.get("stats_block") or {}
diag.append(str(st))
diag.append("\nExakte Hausadresse ist aus öffentlicher IP typischerweise nicht ableitbar; optionaler Button nutzt Fremd-API (näherungsweise Stadt/ISP).")
self._set_text_widget(self._diag, "\n".join(diag))
def _set_text_widget(self, w: scrolledtext.ScrolledText, content: str) -> None:
w.configure(state="normal")
w.delete("1.0", tk.END)
w.insert(tk.END, content)
w.configure(state="disabled")
def _gather_ips_from_bundle(self) -> List[str]:
if not self._last_bundle:
return []
ips: set[str] = set()
for rr in list(self._last_bundle.get("session_rows") or []) + list(self._last_bundle.get("device_rows") or []):
if isinstance(rr, dict):
ipa = str(rr.get("ip_address") or "").strip()
if _IP_RE.fullmatch(ipa):
ips.add(ipa)
return sorted(ips)
def _on_resolve_ips(self) -> None:
if not self._last_bundle:
messagebox.showinfo("IP-Ortung", "Zuerst Snapshot laden oder aktualisieren.")
return
ips = self._gather_ips_from_bundle()
if not ips:
messagebox.showinfo("IP-Ortung", "Keine IPv4-Adressen in Sessions/Geräten erkannt.")
return
if not self._ip_disclaimed:
messagebox.showinfo(
"Hinweis",
"Ort über öffentliche IP ist ungenau und zeigt häufig nur Provider/Region/Stadt "
"(keine Zuverlässigkeit für konkrete Adresse). Datenabfrage nutzt externes HTTP (ip-api.com). "
"Nur wenn Sie diese Näherung bewusst wünschen.",
)
self._ip_disclaimed = True
self._queue.put(("toast", "Frage öffentliche IP-Näherungsdaten extern ab …"))
def run_geo() -> None:
cmap: Dict[str, str] = {}
for ipa in ips:
cmap[ipa] = lookup_ip_geo_optional(ipa) or "nicht aufgelöst"
self._ip_geo_cache.update(cmap)
self._queue.put(("ip_done", None))
threading.Thread(target=run_geo, daemon=True).start()
def _on_refresh_remote(self) -> None:
if not shutil_which("scp") or not shutil_which("ssh"):
messagebox.showerror("OpenSSH benötigt", "Installieren oder PATH setzen.")
return
stamp = _now_local_stamp()
snap_name = f"AZA_CONTROL_SNAPSHOT_{stamp}"
try:
self._snap_root.mkdir(parents=True, exist_ok=True)
except Exception as exc:
messagebox.showerror("Snapshot-Ordner", str(exc))
return
snap_dir = self._snap_root / snap_name
rd_path = snap_dir / "raw_data_copy"
def worker() -> None:
notes: List[str] = []
try:
self._queue.put(("toast", "Lese remote data (read-only) …"))
rd_path.mkdir(parents=True, exist_ok=True)
remote_base = self._remote_data.rstrip("/")
r_esc = shlex.quote(remote_base.replace("\\", "/"))
probe = subprocess.run(
[
"ssh",
"-o",
"BatchMode=yes",
"-o",
"ConnectTimeout=12",
"-o",
"StrictHostKeyChecking=accept-new",
self._server_spec,
"echo SSH_OK && test -d " + r_esc,
],
capture_output=True,
text=True,
encoding="utf-8",
errors="replace",
timeout=22,
)
outp = (probe.stdout or "") + (probe.stderr or "")
if probe.returncode != 0:
raise RuntimeError(outp[:2400])
for fname in COPY_BLOCK:
remote_path = remote_base + "/" + fname
lp = rd_path / fname
pr = subprocess.run(
[
"scp",
"-q",
"-o",
"BatchMode=yes",
"-o",
"ConnectTimeout=42",
"-o",
"StrictHostKeyChecking=accept-new",
f"{self._server_spec}:{remote_path}",
str(lp),
],
capture_output=True,
timeout=560,
text=True,
encoding="utf-8",
errors="replace",
)
req = fname in REMOTE_CORE_REQUIRED_POST_SCP
if pr.returncode != 0:
err = str(pr.stderr or pr.stdout or "")[:400]
if req:
raise RuntimeError(f"Kern-Datei {fname} konnte nicht kopiert werden: {err}")
notes.append(fname + ": SCP optional fehlgeschlagen")
try:
if lp.exists() and lp.stat().st_size == 0:
lp.unlink(missing_ok=True)
except Exception:
pass
for req in REMOTE_CORE_REQUIRED_POST_SCP:
pth = rd_path / req
if not pth.is_file():
raise RuntimeError(f"Kern-Datei fehlt lokal nach SCP: {req}")
bundle_, _ = analyse_snapshot(rd_path, scp_optional_misses=sorted(set(notes)))
export_snapshot_folder(
snap_dir,
bundle_,
server_spec=self._server_spec,
remote_path=self._remote_data,
)
self._queue.put(("snap_done", (str(snap_dir), bundle_, sorted(set(notes)))))
except Exception as exc:
import traceback as tb_lib
self._queue.put(("snap_fail", str(exc) + "\n\n" + tb_lib.format_exc(limit=12)[-1500:]))
threading.Thread(target=worker, daemon=True).start()
def _on_open_manual(self) -> None:
sel = filedialog.askdirectory(title="raw_data_copy wählen (oder Snapshot-übergeordnet)", parent=self)
if not sel:
return
pth = Path(sel)
rd = pth if pth.name == "raw_data_copy" else (pth / "raw_data_copy")
if not rd.is_dir():
rz = filedialog.askdirectory(title='Ordner „raw_data_copy“', initialdir=str(pth), parent=self)
if not rz:
return
rd = Path(rz)
if rd.name != "raw_data_copy":
messagebox.showwarning("Pfad", "Es muss raw_data_copy heißen.")
return
try:
bundle_local, warns = analyse_snapshot(rd)
bundle_local["_meta"] = {"raw_copy_dir": str(rd.resolve())}
self._current_snap = rd.parent
self._last_bundle = bundle_local
self._inject_geo(bundle_local)
self._populate_all(bundle_local)
self._last_snap_var.set(str(self._current_snap.resolve()))
self._soft_notes_var.set("(lokal geöffnet)")
self._ssh_ok.set("offline / ohne SCP")
if warns:
messagebox.showinfo("Hinweise Einlesen", "\n".join(warns[:24]) + ("\n" if len(warns) > 24 else ""))
except Exception as exc:
messagebox.showerror("Öffnen", str(exc))
def _on_export_reports(self) -> None:
if not self._last_bundle or not self._current_snap:
messagebox.showinfo("Export", "Zuerst Snapshot laden oder aktualisieren.")
return
try:
export_snapshot_folder(Path(self._current_snap), self._last_bundle, self._server_spec, self._remote_data)
messagebox.showinfo("Export", f"Bereits gespeichert unter:\n{Path(self._current_snap).resolve()}")
except Exception as exc:
messagebox.showerror("Export-Fehler", str(exc))
if __name__ == "__main__":
AdminControlShell().mainloop()