Files
aza/AzA march 2026/aza_persistence.py

3126 lines
107 KiB
Python
Raw Normal View History

2026-03-25 22:03:39 +01:00
# -*- coding: utf-8 -*-
"""
Standalone persistence/load-save utilities extracted from basis14.py.
"""
import os
import json
import sys
import time
import re
2026-05-11 08:27:44 +02:00
import shutil
2026-03-25 22:03:39 +01:00
import html
import hashlib
import threading
from difflib import SequenceMatcher
from datetime import datetime, timedelta
2026-05-11 08:27:44 +02:00
from pathlib import Path
from typing import Optional
2026-03-25 22:03:39 +01:00
from tkinter import messagebox
from aza_config import (
CONFIG_FILENAME,
WINDOW_CONFIG_FILENAME,
SIGNATURE_CONFIG_FILENAME,
KORREKTUREN_CONFIG_FILENAME,
ABLAGE_BASE_DIR,
ABLAGE_SUBFOLDERS,
ABLAGE_LABELS,
PRUEFEN_WINDOW_CONFIG_FILENAME,
ORDNER_WINDOW_CONFIG_FILENAME,
TEXT_WINDOW_CONFIG_FILENAME,
DIKTAT_WINDOW_CONFIG_FILENAME,
DISKUSSION_WINDOW_CONFIG_FILENAME,
SETTINGS_WINDOW_CONFIG_FILENAME,
TEXTBLOECKE_CONFIG_FILENAME,
TEMPLATES_CONFIG_FILENAME,
OP_BERICHT_TEMPLATE_CONFIG_FILENAME,
ARZTBRIEF_VORLAGE_CONFIG_FILENAME,
TODO_CONFIG_FILENAME,
TODO_WINDOW_CONFIG_FILENAME,
TODO_INBOX_CONFIG_FILENAME,
TODO_SETTINGS_CONFIG_FILENAME,
NOTES_CONFIG_FILENAME,
CHECKLIST_CONFIG_FILENAME,
USER_PROFILE_CONFIG_FILENAME,
OPACITY_CONFIG_FILENAME,
AUTOTEXT_CONFIG_FILENAME,
FONT_SCALE_CONFIG_FILENAME,
BUTTON_SCALE_CONFIG_FILENAME,
TOKEN_USAGE_CONFIG_FILENAME,
KG_DETAIL_LEVEL_CONFIG_FILENAME,
SOAP_SECTION_LEVELS_CONFIG_FILENAME,
FONT_SIZES_CONFIG_FILENAME,
PANED_POSITIONS_CONFIG_FILENAME,
DEFAULT_OPACITY,
MIN_OPACITY,
DEFAULT_FONT_SCALE,
MIN_FONT_SCALE,
MAX_FONT_SCALE,
DEFAULT_BUTTON_SCALE,
MIN_BUTTON_SCALE,
MAX_BUTTON_SCALE,
_SOAP_SECTIONS,
_SOAP_LABELS,
_DEFAULT_KORREKTUREN,
ARZTBRIEF_VORLAGE_DEFAULT,
KOGU_GRUSS_OPTIONS,
KOGU_GRUSS_CONFIG_FILENAME,
KOGU_TEMPLATES_CONFIG_FILENAME,
DISKUSSION_VORLAGE_CONFIG_FILENAME,
ALLOWED_SUMMARY_MODELS,
DEFAULT_SUMMARY_MODEL,
COMMENT_KEYWORDS,
_SUPABASE_URL,
_SUPABASE_ANON_KEY,
SOAP_ORDER_CONFIG_FILENAME,
SOAP_VISIBILITY_CONFIG_FILENAME,
SOAP_PRESETS_CONFIG_FILENAME,
DEFAULT_SOAP_ORDER,
NUM_SOAP_PRESETS,
BRIEF_PRESETS_CONFIG_FILENAME,
NUM_BRIEF_PRESETS,
BRIEF_PROFILE_DEFAULTS,
LAUNCHER_CONFIG_FILENAME,
DEFAULT_TOKEN_QUOTA,
SOFT_LOCK_THRESHOLD,
AVG_TOKENS_PER_REPORT,
2026-05-11 08:27:44 +02:00
ACTIVATION_CONFIG_FILENAME,
2026-03-25 22:03:39 +01:00
get_writable_data_dir,
)
def _config_path():
    """Return the path of the main config file inside the writable data directory."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, CONFIG_FILENAME)
def _window_config_path():
    """Return the path of the main-window geometry file inside the writable data directory."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, WINDOW_CONFIG_FILENAME)
def _clamp_geometry_str(geom: str, min_w: int, min_h: int) -> str:
"""Begrenzt gespeicherte Geometrie-String auf Mindestgröße (alle Buttons sichtbar)."""
if not geom or "x" not in geom:
return f"{min_w}x{min_h}"
parts = geom.replace("+", "x").split("x")
try:
w = max(min_w, int(parts[0].strip()))
h = max(min_h, int(parts[1].strip()))
if len(parts) >= 4:
return f"{w}x{h}+{parts[2].strip()}+{parts[3].strip()}"
return f"{w}x{h}"
except (ValueError, IndexError):
return f"{min_w}x{min_h}"
def load_window_geometry():
    """Read the stored main-window geometry.

    The file holds whitespace-separated integers: width, height, x, y and
    optionally two sash positions.

    Returns:
        ``(w, h, x, y, sash_h, sash_v)`` where the sash values are ``None``
        when absent, or ``None`` if the file is missing, unreadable, has
        fewer than four fields, or describes a window smaller than 400x300.
    """
    try:
        cfg = _window_config_path()
        if not os.path.isfile(cfg):
            return None
        with open(cfg, "r", encoding="utf-8") as fh:
            fields = fh.read().strip().split()
        if len(fields) < 4:
            return None
        w, h, x, y = (int(value) for value in fields[:4])
        if w < 400 or h < 300:
            return None
        sash_h = int(fields[4]) if len(fields) >= 5 else None
        sash_v = int(fields[5]) if len(fields) >= 6 else None
        return (w, h, x, y, sash_h, sash_v)
    except Exception:
        return None
def save_window_geometry(
    width: int, height: int, x: int, y: int, sash: int = None, sash_transcript: int = None
) -> None:
    """Persist window size, position and (optionally) the sash positions.

    Writes one line of space-separated values. ``sash_transcript`` is only
    written when ``sash`` is given as well. Any error is silently ignored.
    """
    try:
        with open(_window_config_path(), "w", encoding="utf-8") as fh:
            values = [width, height, x, y]
            if sash is not None:
                values.append(sash)
                if sash_transcript is not None:
                    values.append(sash_transcript)
            fh.write(" ".join(f"{value}" for value in values) + "\n")
    except Exception:
        pass
def reset_all_window_positions() -> int:
    """Delete all stored window-geometry files.

    Besides the known geometry/paned-position files, every file in the data
    directory named ``kg_diktat_*_geometry.txt`` is removed. The KG
    detail-level and SOAP section-level config files are removed as well,
    but are not included in the returned count.

    Returns:
        Number of geometry files that were actually deleted.
    """
    base_dir = get_writable_data_dir()

    geometry_files = [
        WINDOW_CONFIG_FILENAME,
        PRUEFEN_WINDOW_CONFIG_FILENAME,
        ORDNER_WINDOW_CONFIG_FILENAME,
        TEXT_WINDOW_CONFIG_FILENAME,
        DIKTAT_WINDOW_CONFIG_FILENAME,
        DISKUSSION_WINDOW_CONFIG_FILENAME,
        SETTINGS_WINDOW_CONFIG_FILENAME,
        TODO_WINDOW_CONFIG_FILENAME,
        PANED_POSITIONS_CONFIG_FILENAME,
    ]

    # Generic per-toplevel geometry files. A missing or unreadable data
    # directory must not abort the reset, so listing errors are tolerated.
    try:
        dir_entries = os.listdir(base_dir)
    except OSError:
        dir_entries = []
    geometry_files.extend(
        fname
        for fname in dir_entries
        if fname.startswith("kg_diktat_") and fname.endswith("_geometry.txt")
    )

    # Reset KG detail level and SOAP section levels (best effort, not counted).
    for cfg_name in (KG_DETAIL_LEVEL_CONFIG_FILENAME, SOAP_SECTION_LEVELS_CONFIG_FILENAME):
        cfg_path = os.path.join(base_dir, cfg_name)
        try:
            if os.path.isfile(cfg_path):
                os.remove(cfg_path)
        except OSError:
            pass

    deleted = 0
    for fname in geometry_files:
        path = os.path.join(base_dir, fname)
        try:
            if os.path.isfile(path):
                os.remove(path)
                deleted += 1
        except OSError:
            pass
    return deleted
def _opacity_config_path():
    """Return the path of the window-opacity config file."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, OPACITY_CONFIG_FILENAME)
def load_opacity() -> float:
    """Read the stored window opacity, clamped to ``[MIN_OPACITY, 1.0]``.

    Returns ``DEFAULT_OPACITY`` when the file is missing or unreadable.
    """
    try:
        cfg = _opacity_config_path()
        if os.path.isfile(cfg):
            with open(cfg, "r", encoding="utf-8") as fh:
                stored = float(fh.read().strip())
            capped = min(1.0, stored)
            return max(MIN_OPACITY, capped)
    except Exception:
        pass
    return DEFAULT_OPACITY
def save_opacity(value: float) -> None:
    """Persist the window opacity, clamped to ``[MIN_OPACITY, 1.0]``; errors are ignored."""
    try:
        capped = min(1.0, value)
        clamped = max(MIN_OPACITY, capped)
        with open(_opacity_config_path(), "w", encoding="utf-8") as fh:
            fh.write(str(clamped))
    except Exception:
        pass
def _autotext_config_path():
    """Return the path of the autotext/settings JSON file."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, AUTOTEXT_CONFIG_FILENAME)
def _font_scale_config_path():
    """Return the path of the font-scale config file."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, FONT_SCALE_CONFIG_FILENAME)
def _button_scale_config_path():
    """Return the path of the button-scale config file."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, BUTTON_SCALE_CONFIG_FILENAME)
def load_font_scale() -> float:
    """Read the font scale factor, clamped to ``[MIN_FONT_SCALE, MAX_FONT_SCALE]``.

    Returns ``DEFAULT_FONT_SCALE`` when the file is missing or unreadable.
    """
    try:
        cfg = _font_scale_config_path()
        if os.path.isfile(cfg):
            with open(cfg, "r", encoding="utf-8") as fh:
                stored = float(fh.read().strip())
            capped = min(MAX_FONT_SCALE, stored)
            return max(MIN_FONT_SCALE, capped)
    except Exception:
        pass
    return DEFAULT_FONT_SCALE
def save_font_scale(value: float) -> None:
    """Persist the font scale factor, clamped to the allowed range; errors are ignored."""
    try:
        capped = min(MAX_FONT_SCALE, value)
        clamped = max(MIN_FONT_SCALE, capped)
        with open(_font_scale_config_path(), "w", encoding="utf-8") as fh:
            fh.write(str(clamped))
    except Exception:
        pass
def load_button_scale() -> float:
    """Read the button scale factor, clamped to ``[MIN_BUTTON_SCALE, MAX_BUTTON_SCALE]``.

    Returns ``DEFAULT_BUTTON_SCALE`` when the file is missing or unreadable.
    """
    try:
        cfg = _button_scale_config_path()
        if os.path.isfile(cfg):
            with open(cfg, "r", encoding="utf-8") as fh:
                stored = float(fh.read().strip())
            capped = min(MAX_BUTTON_SCALE, stored)
            return max(MIN_BUTTON_SCALE, capped)
    except Exception:
        pass
    return DEFAULT_BUTTON_SCALE
def save_button_scale(value: float) -> None:
    """Persist the button scale factor, clamped to the allowed range; errors are ignored."""
    try:
        capped = min(MAX_BUTTON_SCALE, value)
        clamped = max(MIN_BUTTON_SCALE, capped)
        with open(_button_scale_config_path(), "w", encoding="utf-8") as fh:
            fh.write(str(clamped))
    except Exception:
        pass
# (Textfeld-Schriftgrößen + add_text_font_size_control sind in aza_ui_helpers.py)
def _token_usage_config_path():
    """Return the path of the token-usage JSON file."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, TOKEN_USAGE_CONFIG_FILENAME)
def load_token_usage() -> dict:
    """Read the token-usage record.

    Returns:
        The stored JSON object. Keys written by this module are ``used``,
        ``total``, ``budget_dollars`` and ``used_dollars``. If the file is
        missing, unreadable, or does not contain a JSON object, a default
        record with zero usage is returned.
    """
    try:
        path = _token_usage_config_path()
        if os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            # Callers use ``.get`` on the result without a guard, so anything
            # other than a JSON object must fall through to the defaults.
            if isinstance(data, dict):
                return data
    except Exception:
        pass
    return {"used": 0, "total": 1000000, "budget_dollars": 0, "used_dollars": 0}
def save_token_usage(used: int = None, total: int = None, budget_dollars: float = None, used_dollars: float = None) -> None:
    """Update the token-usage record with the given values and persist it.

    Only arguments that are not ``None`` overwrite the stored values.
    Any error is silently ignored.
    """
    try:
        record = load_token_usage()
        updates = (
            ("used", used),
            ("total", total),
            ("budget_dollars", budget_dollars),
            ("used_dollars", used_dollars),
        )
        for key, value in updates:
            if value is not None:
                record[key] = value
        with open(_token_usage_config_path(), "w", encoding="utf-8") as fh:
            json.dump(record, fh)
    except Exception:
        pass
def add_token_usage(tokens: int) -> None:
    """Add ``tokens`` to the stored ``used`` counter; errors are ignored."""
    try:
        already_used = load_token_usage().get("used", 0)
        save_token_usage(used=already_used + tokens)
    except Exception:
        pass
def get_remaining_tokens() -> int:
    """Return the remaining token allowance (``total - used``), never below zero."""
    usage = load_token_usage()
    quota = usage.get("total", DEFAULT_TOKEN_QUOTA)
    consumed = usage.get("used", 0)
    return max(0, quota - consumed)
def get_capacity_fraction() -> float:
    """Return the remaining share of the allowance, in the range 0.0 to 1.0.

    A non-positive ``total`` yields 1.0.
    """
    usage = load_token_usage()
    quota = usage.get("total", DEFAULT_TOKEN_QUOTA)
    if quota <= 0:
        return 1.0
    share = (quota - usage.get("used", 0)) / quota
    capped = min(1.0, share)
    return max(0.0, capped)
def is_capacity_low() -> bool:
    """Return True when the remaining tokens are at or below ``SOFT_LOCK_THRESHOLD``."""
    remaining = get_remaining_tokens()
    return remaining <= SOFT_LOCK_THRESHOLD
def estimated_reports_remaining() -> int:
    """Estimate how many more reports fit into the remaining allowance.

    Uses ``AVG_TOKENS_PER_REPORT`` as the per-report cost; returns 0 when
    that constant is not positive.
    """
    remaining = get_remaining_tokens()
    per_report = AVG_TOKENS_PER_REPORT
    return 0 if per_report <= 0 else remaining // per_report
def reset_token_allowance(total: int = None) -> None:
    """Reset usage to zero and set the allowance (admin function).

    ``total`` defaults to ``DEFAULT_TOKEN_QUOTA`` when not given.
    """
    quota = DEFAULT_TOKEN_QUOTA if total is None else total
    save_token_usage(used=0, total=quota)
# ─── Installation location (anonymised) ──────────────────────────────────────
# File name, inside the writable data directory, of the stored location record.
_LOCATION_FILENAME = "aza_installation_location.json"
def _location_path() -> str:
    """Return the path of the stored installation-location file."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, _LOCATION_FILENAME)
def log_installation_location() -> dict | None:
    """Look up the approximate location via ip-api.com and store it.

    Only city, region, country, country code and a timestamp are written to
    disk; no IP address is stored. The request uses a 5 second timeout.
    NOTE(review): the original docstring says this runs in a background
    thread; that is up to the caller and not enforced here.

    Returns:
        The stored location dict, or ``None`` if the lookup or the write failed.
    """
    import urllib.request

    try:
        req = urllib.request.Request(
            "http://ip-api.com/json/?fields=status,city,regionName,country,countryCode",
            headers={"User-Agent": "AZA-Desktop/1.0"},
        )
        # Context manager ensures the HTTP response is closed on every path.
        with urllib.request.urlopen(req, timeout=5) as resp:
            data = json.loads(resp.read().decode("utf-8"))
        if data.get("status") != "success":
            return None
        location = {
            "city": data.get("city", ""),
            "region": data.get("regionName", ""),
            "country": data.get("country", ""),
            "country_code": data.get("countryCode", ""),
            "updated": datetime.now().isoformat(timespec="seconds"),
        }
        with open(_location_path(), "w", encoding="utf-8") as f:
            json.dump(location, f, ensure_ascii=False, indent=2)
        return location
    except Exception:
        return None
def load_installation_location() -> dict:
    """Read the stored installation location.

    Returns:
        The stored dict, or an empty dict if the file is missing,
        unreadable, or does not contain a JSON object.
    """
    try:
        path = _location_path()
        if os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            # Callers use ``.get`` on the result, so only a dict is acceptable.
            if isinstance(data, dict):
                return data
    except Exception:
        pass
    return {}
def get_location_display() -> str:
    """Return the stored location as ``"City, Region, Country"``.

    Region and country are omitted when empty. Without a stored city the
    placeholder text is returned.
    """
    loc = load_installation_location()
    if not (loc and loc.get("city")):
        return "Nicht ermittelt"
    pieces = [loc.get("city", "")]
    pieces.extend(loc[key] for key in ("region", "country") if loc.get(key))
    return ", ".join(pieces)
# Endpoint to which register_installation() POSTs the anonymous device record.
_REGISTRY_URL = "https://aza-medwork.ch/api/installations"
def register_installation() -> int | None:
    """Register this installation anonymously and return the total count.

    The device id is the first 16 hex characters of a SHA-256 hash over the
    host name, machine type and login name (``"unknown"`` if those cannot
    be determined). City and country code from the stored location are sent
    along. A valid count is also written to the local cache.

    Returns:
        The server-reported number of installations, or ``None`` if the
        server is unreachable or the response carries no integer count.
    """
    import urllib.request
    import platform

    try:
        raw = f"{platform.node()}-{platform.machine()}-{os.getlogin()}"
        device_hash = hashlib.sha256(raw.encode()).hexdigest()[:16]
    except Exception:
        device_hash = "unknown"

    loc = load_installation_location()
    payload = json.dumps({
        "device_id": device_hash,
        "city": loc.get("city", ""),
        "country_code": loc.get("country_code", ""),
    }).encode("utf-8")

    try:
        req = urllib.request.Request(
            _REGISTRY_URL,
            data=payload,
            headers={
                "Content-Type": "application/json",
                "User-Agent": "AZA-Desktop/1.0",
            },
            method="POST",
        )
        # Context manager ensures the HTTP response is closed on every path.
        with urllib.request.urlopen(req, timeout=5) as resp:
            data = json.loads(resp.read().decode("utf-8"))
        count = data.get("total_installations")
        if isinstance(count, int):
            _save_cached_install_count(count)
            return count
    except Exception:
        pass
    return None
def _cached_install_count_path() -> str:
    """Return the path of the cached installation-count file."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, "install_count_cache.json")
def _save_cached_install_count(count: int):
    """Write ``count`` plus the current timestamp to the cache file; errors are ignored."""
    try:
        cache_file = _cached_install_count_path()
        with open(cache_file, "w", encoding="utf-8") as fh:
            record = {"count": count, "updated": datetime.now().isoformat()}
            json.dump(record, fh)
    except Exception:
        pass
def get_install_count() -> tuple[int, bool]:
    """Return ``(count, is_live)``.

    Tries the server first via ``register_installation()``, then the local
    cache, and finally falls back to ``(1, False)``.
    """
    live_count = register_installation()
    if live_count is not None:
        return live_count, True

    try:
        cache_file = _cached_install_count_path()
        if os.path.isfile(cache_file):
            with open(cache_file, "r", encoding="utf-8") as fh:
                record = json.load(fh)
            cached = record.get("count", 1)
            if isinstance(cached, int) and cached > 0:
                return cached, False
    except Exception:
        pass
    return 1, False
def fetch_openai_usage(client) -> dict:
"""Ruft echte Verbrauchs-Daten von OpenAI ab."""
try:
# OpenAI API Key aus Client extrahieren
api_key = client.api_key if hasattr(client, 'api_key') else None
if not api_key:
return None
# Verwende httpx (bereits von openai installiert)
import httpx
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
# Verbrauch der letzten 30 Tage abrufen
end_date = datetime.now()
start_date = end_date - timedelta(days=30)
url = f"https://api.openai.com/v1/usage?start_date={start_date.strftime('%Y-%m-%d')}&end_date={end_date.strftime('%Y-%m-%d')}"
with httpx.Client(timeout=10.0) as http_client:
response = http_client.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
# Summiere Verbrauch aus allen Tagen
total_cost = 0
for day_data in data.get("data", []):
total_cost += day_data.get("cost", 0) / 100 # Cent to Dollar
return {
"used_dollars": total_cost,
"success": True
}
else:
return {
"error": f"API returned status {response.status_code}",
"success": False
}
except Exception as e:
return {
"error": str(e),
"success": False
}
return None
2026-05-06 22:43:22 +02:00
def _coerce_autotext_entry_values(entries):
"""Stellt sicher, dass Autotext-Einträge String-Werte haben (globaler Hook)."""
out = {}
if not isinstance(entries, dict):
return out
for k, v in entries.items():
if not isinstance(k, str) or not k.strip():
continue
if isinstance(v, str):
out[k] = v
elif isinstance(v, dict) and "text" in v:
out[k] = str(v.get("text") or "")
return out
2026-03-25 22:03:39 +01:00
def load_autotext() -> dict:
    """Load the autotext entries and related application settings.

    WARNING (from the original note): the global autotext listener in
    basis14.py uses an in-memory cache and deliberately does NOT call this
    function inside the hook callback (Windows LowLevelHooksTimeout). Do not
    call this function from on_press/on_release.

    Missing keys are filled with defaults. ``entries`` is normalised to a
    ``{str: str}`` mapping, ``entry_meta`` is forced to a dict, and the two
    integer settings fall back to their defaults individually when the
    stored value is not numeric, so one bad value no longer discards every
    other stored setting.

    Returns:
        The settings dict; pure defaults when the file is missing,
        unreadable or not a JSON object.
    """
    # Single source of truth for all defaults (built fresh on every call so
    # callers may mutate the result freely).
    defaults = {
        "enabled": True,
        "entries": {},
        "diktat_auto_start": True,
        "notizen_open_on_start": True,
        "textbloecke_visible": True,
        "addon_visible": True,
        "addon_buttons": {
            "uebersetzer": True,
            "email": True,
            "autotext": True,
            "whatsapp": True,
            "docapp": True,
        },
        "kg_auto_delete_old": False,
        "textbloecke_collapsed": False,
        "status_color": "#BD4500",
        "soap_collapsed": False,
        "entry_meta": {},
        "workspace_backup_ts": "",
        "office_sidebar_textbloecke_open": True,
        "dokumente_collapsed": False,
        "autoOpenNews": False,
        "autoOpenEvents": True,
        "newsTemplate": "all",
        "newsSelectedSpecialties": [],
        "newsSelectedRegions": ["CH", "EU"],
        "newsSort": "newest",
        "eventsSelectedSpecialties": ["general-medicine"],
        "eventsSelectedRegions": ["CH", "EU"],
        "eventsTemplate": "general_ch_eu",
        "eventsSort": "soonest",
        "eventsMonthsAhead": 13,
        "selectedLanguage": "system",
        "user_specialty_default": "dermatology",
        "user_specialties_selected": [],
        "ui_font_delta": -2,
        "global_right_click_paste": True,
        "todo_auto_open": False,
        "autocopy_after_diktat": True,
        "kommentare_auto_open": False,
        "empfang_auto_open": False,
        "empfang_was_open": False,
        "empfang_prefs": {},
        "medikament_quelle": "compendium.ch",
        "diagnose_quelle": "",
        "active_brief_profile": "",
        "stilprofil_enabled": False,
        "stilprofil_name": "",
        "stilprofil_default_brief": False,
    }
    try:
        path = _autotext_config_path()
        if os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            if isinstance(data, dict):
                out = {key: data.get(key, default) for key, default in defaults.items()}

                raw_entries = data.get("entries")
                out["entries"] = _coerce_autotext_entry_values(
                    raw_entries if isinstance(raw_entries, dict) else {})

                # Legacy key "diktat_open_on_start" is honoured as a fallback.
                out["notizen_open_on_start"] = data.get(
                    "notizen_open_on_start", data.get("diktat_open_on_start", True))

                meta = data.get("entry_meta")
                out["entry_meta"] = meta if isinstance(meta, dict) else {}

                # Kept as in the original: a missing timestamp in an existing
                # file is reported as None (not "").
                out["workspace_backup_ts"] = data.get("workspace_backup_ts")

                for int_key in ("eventsMonthsAhead", "ui_font_delta"):
                    try:
                        out[int_key] = int(data.get(int_key, defaults[int_key]))
                    except (TypeError, ValueError, OverflowError):
                        out[int_key] = defaults[int_key]
                return out
    except Exception:
        pass
    return defaults
def save_autotext(data: dict) -> None:
    """Persist autotext entries together with the news/event and UI settings.

    The payload is built and serialised completely BEFORE the target file is
    opened, so a bad value or an unserialisable object can no longer leave a
    truncated (empty) settings file behind. Non-numeric integer settings
    fall back to their defaults. Any error is silently ignored.

    Args:
        data: Settings dict; missing keys are written with their defaults.
    """
    def _as_int(value, fallback: int) -> int:
        # Tolerate corrupt values instead of aborting the whole save.
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return fallback

    try:
        entry_meta = data.get("entry_meta")
        payload = {
            "enabled": data.get("enabled", True),
            "entries": _coerce_autotext_entry_values(data.get("entries") or {}),
            "diktat_auto_start": data.get("diktat_auto_start", True),
            "notizen_open_on_start": data.get("notizen_open_on_start", data.get("diktat_open_on_start", True)),
            "textbloecke_visible": data.get("textbloecke_visible", True),
            "addon_visible": data.get("addon_visible", True),
            "kg_auto_delete_old": data.get("kg_auto_delete_old", False),
            "addon_buttons": data.get("addon_buttons", {}),
            "textbloecke_collapsed": data.get("textbloecke_collapsed", False),
            "status_color": data.get("status_color", "#BD4500"),
            "soap_collapsed": data.get("soap_collapsed", False),
            "entry_meta": entry_meta if isinstance(entry_meta, dict) else {},
            "workspace_backup_ts": data.get("workspace_backup_ts"),
            "office_sidebar_textbloecke_open": data.get(
                "office_sidebar_textbloecke_open", True),
            "autoOpenNews": bool(data.get("autoOpenNews", False)),
            "autoOpenEvents": bool(data.get("autoOpenEvents", True)),
            "newsTemplate": data.get("newsTemplate", "all"),
            "newsSelectedSpecialties": data.get("newsSelectedSpecialties", []),
            "newsSelectedRegions": data.get("newsSelectedRegions", ["CH", "EU"]),
            "newsSort": data.get("newsSort", "newest"),
            "eventsSelectedSpecialties": data.get("eventsSelectedSpecialties", ["general-medicine"]),
            "eventsSelectedRegions": data.get("eventsSelectedRegions", ["CH", "EU"]),
            "eventsTemplate": data.get("eventsTemplate", "general_ch_eu"),
            "eventsSort": data.get("eventsSort", "soonest"),
            "eventsMonthsAhead": _as_int(data.get("eventsMonthsAhead", 13), 13),
            "selectedLanguage": data.get("selectedLanguage", "system"),
            "user_specialty_default": data.get("user_specialty_default", "dermatology"),
            "user_specialties_selected": data.get("user_specialties_selected", []),
            "ui_font_delta": _as_int(data.get("ui_font_delta", -2), -2),
            "global_right_click_paste": bool(data.get("global_right_click_paste", True)),
            "todo_auto_open": bool(data.get("todo_auto_open", False)),
            "autocopy_after_diktat": bool(data.get("autocopy_after_diktat", True)),
            "kommentare_auto_open": bool(data.get("kommentare_auto_open", False)),
            "empfang_auto_open": bool(data.get("empfang_auto_open", False)),
            "empfang_was_open": bool(data.get("empfang_was_open", False)),
            "empfang_prefs": data.get("empfang_prefs", {}),
            "medikament_quelle": data.get("medikament_quelle", "compendium.ch"),
            "diagnose_quelle": data.get("diagnose_quelle", ""),
            "dokumente_collapsed": bool(data.get("dokumente_collapsed", False)),
            "active_brief_profile": data.get("active_brief_profile", ""),
            "stilprofil_enabled": bool(data.get("stilprofil_enabled", False)),
            "stilprofil_name": data.get("stilprofil_name", ""),
            "stilprofil_default_brief": bool(data.get("stilprofil_default_brief", False)),
        }
        # Serialise first: opening with "w" truncates the file immediately.
        serialized = json.dumps(payload, ensure_ascii=False, indent=2)
        with open(_autotext_config_path(), "w", encoding="utf-8") as f:
            f.write(serialized)
    except Exception:
        pass
def is_autocopy_after_diktat_enabled() -> bool:
    """Return whether text is auto-copied to the clipboard after dictation (default: True)."""
    try:
        settings = load_autotext()
        flag = settings.get("autocopy_after_diktat", True)
        return bool(flag)
    except Exception:
        return True
def is_global_right_click_paste_enabled() -> bool:
    """Return whether right-click paste in external apps is enabled (default: True)."""
    try:
        settings = load_autotext()
        flag = settings.get("global_right_click_paste", True)
        return bool(flag)
    except Exception:
        return True
def save_autocopy_prefs(autocopy: bool | None = None, global_right_click: bool | None = None) -> None:
    """Store the autocopy / right-click-paste preferences.

    Only arguments that are not ``None`` are updated. Errors are ignored.
    """
    try:
        settings = load_autotext()
        changes = (
            ("autocopy_after_diktat", autocopy),
            ("global_right_click_paste", global_right_click),
        )
        for key, value in changes:
            if value is not None:
                settings[key] = bool(value)
        save_autotext(settings)
    except Exception:
        pass
def _is_admin() -> bool:
"""Prüft, ob die Anwendung mit Administratorrechten läuft (für globalen Autotext)."""
if sys.platform != "win32":
return False
try:
import ctypes
return bool(ctypes.windll.shell32.IsUserAnAdmin())
except Exception:
return False
def _run_as_admin() -> bool:
"""Startet die Anwendung mit Administratorrechten neu. Beendet die aktuelle Instanz."""
if sys.platform != "win32":
return False
try:
import ctypes
args = " ".join([f'"{a}"' if " " in a else a for a in sys.argv])
ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, args, None, 1)
sys.exit(0)
return True
except Exception:
return False
def sanitize_markdown_for_plain_text(raw_text: str) -> str:
    """Strip Markdown markup so the text reads as clean plain text.

    Line endings are normalised to ``\\n``. Per line, heading markers,
    ordered-list numbers and bullet markers at the start are removed, then
    bold (``**``/``__``) and emphasis (``*``/``_``) markers. The result is
    stripped of leading and trailing whitespace.
    """
    normalized = (raw_text or "").replace("\r\n", "\n").replace("\r", "\n")
    # Applied in this exact order to every line.
    rules = (
        (r"^\s*#{1,6}\s+", ""),
        (r"^\s*\d+\.\s+", ""),
        (r"^\s*[-*•]\s+", ""),
        (r"\*\*(.+?)\*\*", r"\1"),
        (r"__(.+?)__", r"\1"),
        (r"(?<!\*)\*(?!\s)(.+?)(?<!\s)\*(?!\*)", r"\1"),
        (r"(?<!_)_(?!\s)(.+?)(?<!\s)_(?!_)", r"\1"),
    )
    cleaned = []
    for line in normalized.split("\n"):
        for pattern, replacement in rules:
            line = re.sub(pattern, replacement, line)
        cleaned.append(line)
    return "\n".join(cleaned).strip()
def _win_clipboard_set(text: str, html_fragment: str = None) -> bool:
"""Text in Windows-Zwischenablage (für globalen Autotext per Strg+V)."""
if sys.platform != "win32":
return False
try:
import ctypes
from ctypes import wintypes
CF_UNICODETEXT = 13
GMEM_MOVEABLE = 0x0002
GMEM_DDESHARE = 0x2000
alloc_flags = GMEM_MOVEABLE | GMEM_DDESHARE
kernel32 = ctypes.WinDLL("kernel32")
user32 = ctypes.WinDLL("user32")
user32.OpenClipboard.argtypes = [wintypes.HWND]
user32.OpenClipboard.restype = wintypes.BOOL
user32.CloseClipboard.argtypes = []
user32.EmptyClipboard.argtypes = []
user32.RegisterClipboardFormatW.argtypes = [wintypes.LPCWSTR]
user32.RegisterClipboardFormatW.restype = wintypes.UINT
user32.SetClipboardData.argtypes = [wintypes.UINT, wintypes.HANDLE]
user32.SetClipboardData.restype = wintypes.HANDLE
kernel32.GlobalAlloc.argtypes = [wintypes.UINT, ctypes.c_size_t]
kernel32.GlobalAlloc.restype = wintypes.HGLOBAL
kernel32.GlobalLock.argtypes = [wintypes.HGLOBAL]
kernel32.GlobalLock.restype = ctypes.c_void_p
kernel32.GlobalUnlock.argtypes = [wintypes.HGLOBAL]
def _set_clipboard_data(fmt: int, payload: bytes) -> bool:
h = kernel32.GlobalAlloc(alloc_flags, len(payload))
if not h:
return False
ptr = kernel32.GlobalLock(h)
if not ptr:
return False
ctypes.memmove(ptr, payload, len(payload))
kernel32.GlobalUnlock(h)
return bool(user32.SetClipboardData(fmt, h))
def _inline_markdown_to_html(line: str) -> str:
escaped = html.escape(line)
escaped = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", escaped)
escaped = re.sub(r"__(.+?)__", r"<strong>\1</strong>", escaped)
escaped = re.sub(r"(?<!\*)\*(?!\s)(.+?)(?<!\s)\*(?!\*)", r"<em>\1</em>", escaped)
escaped = re.sub(r"(?<!_)_(?!\s)(.+?)(?<!\s)_(?!_)", r"<em>\1</em>", escaped)
return escaped
def _markdown_like_to_html(raw_text: str) -> str:
lines = (raw_text or "").replace("\r\n", "\n").replace("\r", "\n").split("\n")
html_parts = []
in_ul = False
in_ol = False
def _close_lists():
nonlocal in_ul, in_ol
if in_ul:
html_parts.append("</ul>")
in_ul = False
if in_ol:
html_parts.append("</ol>")
in_ol = False
for raw_line in lines:
line = raw_line.rstrip()
stripped = line.strip()
if not stripped:
_close_lists()
html_parts.append("<br>")
continue
m_head = re.match(r"^(#{1,6})\s+(.*)$", stripped)
if m_head:
_close_lists()
level = min(6, len(m_head.group(1)))
html_parts.append(f"<h{level}>{_inline_markdown_to_html(m_head.group(2))}</h{level}>")
continue
m_ol = re.match(r"^\d+\.\s+(.*)$", stripped)
if m_ol:
if in_ul:
html_parts.append("</ul>")
in_ul = False
if not in_ol:
html_parts.append("<ol>")
in_ol = True
html_parts.append(f"<li>{_inline_markdown_to_html(m_ol.group(1))}</li>")
continue
m_ul = re.match(r"^[-*•]\s+(.*)$", stripped)
if m_ul:
if in_ol:
html_parts.append("</ol>")
in_ol = False
if not in_ul:
html_parts.append("<ul>")
in_ul = True
html_parts.append(f"<li>{_inline_markdown_to_html(m_ul.group(1))}</li>")
continue
_close_lists()
html_parts.append(f"<p>{_inline_markdown_to_html(stripped)}</p>")
_close_lists()
return "".join(html_parts) if html_parts else "<p></p>"
def _build_cf_html_payload(fragment_html: str) -> bytes:
full_html = (
"<html><body><!--StartFragment-->"
+ fragment_html
+ "<!--EndFragment--></body></html>"
)
marker_start = b"<!--StartFragment-->"
marker_end = b"<!--EndFragment-->"
header_template = (
"Version:0.9\r\n"
"StartHTML:{:010d}\r\n"
"EndHTML:{:010d}\r\n"
"StartFragment:{:010d}\r\n"
"EndFragment:{:010d}\r\n"
)
dummy_header = header_template.format(0, 0, 0, 0)
html_bytes = full_html.encode("utf-8")
start_html = len(dummy_header.encode("ascii"))
end_html = start_html + len(html_bytes)
start_fragment = start_html + html_bytes.index(marker_start) + len(marker_start)
end_fragment = start_html + html_bytes.index(marker_end)
header = header_template.format(start_html, end_html, start_fragment, end_fragment)
return header.encode("ascii") + html_bytes + b"\0"
for _ in range(5):
if user32.OpenClipboard(None):
break
time.sleep(0.03)
else:
return False
try:
user32.EmptyClipboard()
plain_text = sanitize_markdown_for_plain_text(text or "")
text_data = (plain_text + "\0").encode("utf-16-le")
ok_text = _set_clipboard_data(CF_UNICODETEXT, text_data)
html_format = user32.RegisterClipboardFormatW("HTML Format")
ok_html = False
if html_format:
fragment = html_fragment if html_fragment is not None else _markdown_like_to_html(text or "")
html_payload = _build_cf_html_payload(fragment)
ok_html = _set_clipboard_data(html_format, html_payload)
return bool(ok_text or ok_html)
finally:
user32.CloseClipboard()
except Exception:
return False
def _win_clipboard_get() -> str:
"""Text aus Windows-Zwischenablage lesen."""
if sys.platform != "win32":
return ""
try:
import ctypes
from ctypes import wintypes
CF_UNICODETEXT = 13
user32 = ctypes.WinDLL("user32")
kernel32 = ctypes.WinDLL("kernel32")
user32.OpenClipboard.argtypes = [wintypes.HWND]
user32.OpenClipboard.restype = wintypes.BOOL
user32.GetClipboardData.argtypes = [wintypes.UINT]
user32.GetClipboardData.restype = wintypes.HANDLE
user32.CloseClipboard.argtypes = []
user32.CloseClipboard.restype = wintypes.BOOL
kernel32.GlobalLock.argtypes = [wintypes.HGLOBAL]
kernel32.GlobalLock.restype = ctypes.c_void_p
kernel32.GlobalUnlock.argtypes = [wintypes.HGLOBAL]
if not user32.OpenClipboard(None):
return ""
try:
h = user32.GetClipboardData(CF_UNICODETEXT)
if not h:
return ""
ptr = kernel32.GlobalLock(h)
if not ptr:
return ""
try:
buf = (ctypes.c_char * 131072).from_address(ptr)
data = bytearray()
for i in range(0, 131070, 2):
if buf[i] == 0 and buf[i + 1] == 0:
break
data.extend([buf[i], buf[i + 1]])
return data.decode("utf-16-le", errors="ignore")
finally:
kernel32.GlobalUnlock(h)
finally:
user32.CloseClipboard()
except Exception:
return ""
def _signature_config_path():
    """Return the path of the signature-name config file."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, SIGNATURE_CONFIG_FILENAME)
def load_signature_name(fallback_to_profile: bool = True) -> str:
    """Read the stored signature name.

    If nothing is stored and ``fallback_to_profile`` is True, the ``name``
    field of the user profile is returned instead. Returns ``""`` when no
    name can be determined.
    """
    try:
        sig_file = _signature_config_path()
        if os.path.isfile(sig_file):
            with open(sig_file, "r", encoding="utf-8") as fh:
                stored = fh.read().strip()
            if stored:
                return stored
    except Exception:
        pass

    if not fallback_to_profile:
        return ""
    try:
        return load_user_profile().get("name", "")
    except Exception:
        return ""
def save_signature_name(name: str) -> None:
    """Persist the signature name (whitespace-stripped); errors are ignored."""
    try:
        target = _signature_config_path()
        with open(target, "w", encoding="utf-8") as fh:
            cleaned = (name or "").strip()
            fh.write(cleaned)
    except Exception:
        pass
def _korrekturen_config_path():
    """Return the path of the corrections file inside the writable data directory."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, KORREKTUREN_CONFIG_FILENAME)
def load_korrekturen() -> dict:
    """Load the corrections database: {'medikamente': {wrong: right}, 'diagnosen': {...}}.

    Entries from ``_DEFAULT_KORREKTUREN`` are added for every category/key that
    is missing in the stored data. If the file is missing, unreadable or does
    not contain a dict, a copy of the defaults is returned.
    """
    try:
        cfg_path = _korrekturen_config_path()
        if os.path.isfile(cfg_path):
            with open(cfg_path, "r", encoding="utf-8") as fh:
                stored = json.load(fh)
            if isinstance(stored, dict):
                for category, default_map in _DEFAULT_KORREKTUREN.items():
                    if category not in stored:
                        stored[category] = {}
                    bucket = stored[category]
                    for wrong, right in default_map.items():
                        if wrong not in bucket:
                            bucket[wrong] = right
                return stored
    except Exception:
        pass
    return {
        category: dict(default_map)
        for category, default_map in _DEFAULT_KORREKTUREN.items()
    }
def save_korrekturen(data: dict) -> None:
    """Write the corrections database as pretty-printed UTF-8 JSON (errors are ignored)."""
    try:
        target = _korrekturen_config_path()
        with open(target, "w", encoding="utf-8") as fh:
            json.dump(data, fh, ensure_ascii=False, indent=2)
    except Exception:
        pass
def _ablage_base_path():
"""Pfad zum Ablage-Basisordner: Dokumente/KG_Diktat_Ablage."""
docs = os.path.join(os.path.expanduser("~"), "Documents")
if not os.path.isdir(docs):
docs = os.path.expanduser("~")
return os.path.join(docs, "KG_Diktat_Ablage")
def _ablage_json_path():
    """Return the path of the single central archive file ``ablage.json``."""
    base_dir = _ablage_base_path()
    return os.path.join(base_dir, "ablage.json")
def ensure_ablage_dirs():
    """Create the archive base folder and one subfolder per ``ABLAGE_SUBFOLDERS`` entry."""
    root = _ablage_base_path()
    os.makedirs(root, exist_ok=True)
    for folder_name in ABLAGE_SUBFOLDERS:
        subfolder = os.path.join(root, folder_name)
        os.makedirs(subfolder, exist_ok=True)
def _load_ablage_json():
    """Load ``ablage.json``.

    Returns a dict such as ``{"KG": [{"content": "...", "name": "..."}], ...}``.
    Every category in ``ABLAGE_SUBFOLDERS`` is guaranteed to map to a list; a
    missing, unreadable or non-dict file yields empty lists for all categories.
    """
    def _empty():
        return {cat: [] for cat in ABLAGE_SUBFOLDERS}

    json_path = _ablage_json_path()
    if not os.path.isfile(json_path):
        return _empty()
    try:
        with open(json_path, "r", encoding="utf-8") as fh:
            loaded = json.load(fh)
        if not isinstance(loaded, dict):
            return _empty()
        for cat in ABLAGE_SUBFOLDERS:
            has_list = cat in loaded and isinstance(loaded[cat], list)
            if not has_list:
                loaded[cat] = []
        return loaded
    except Exception:
        return _empty()
def _save_ablage_json(data: dict) -> bool:
    """Write ``data`` to ``ablage.json`` (creating the folders first).

    Returns True on success and False if anything goes wrong.
    """
    try:
        ensure_ablage_dirs()
        target = _ablage_json_path()
        with open(target, "w", encoding="utf-8") as fh:
            json.dump(data, fh, ensure_ascii=False, indent=2)
    except Exception:
        return False
    return True
def save_to_ablage(category: str, content: str):
    """Append ``content`` as a new entry of ``category`` in the central ``ablage.json``.

    The entry name has the form ``"<n> <label> <dd.mm.yyyy> <HH:MM>.txt"``.
    ``n`` continues after the highest number already used in the category, so
    numbers stay increasing (and names unique) even after older entries were
    deleted.

    Args:
        category: One of ``ABLAGE_SUBFOLDERS``.
        content: Text to store; non-strings are converted with ``str``.

    Returns:
        The path of ``ablage.json`` on success; None if the category is
        unknown, the content is empty, or saving failed (a message box is
        shown for unknown category and for save errors).
    """
    if category not in ABLAGE_SUBFOLDERS:
        try:
            messagebox.showerror("Speichern", f"Unbekannte Kategorie: {category}")
        except Exception:
            pass
        return None
    raw = content if isinstance(content, str) else (str(content) if content is not None else "")
    content = raw.strip()
    if not content:
        return None
    try:
        ensure_ablage_dirs()
        label = ABLAGE_LABELS.get(category, category)
        now = datetime.now()
        date_str = now.strftime("%d.%m.%Y")
        time_str = now.strftime("%H:%M")
        data = _load_ablage_json()
        entries = data.setdefault(category, [])
        # len(entries) + 1 alone would reuse numbers once entries have been
        # deleted, so also look at the highest leading number in use.
        highest = 0
        for existing in entries:
            if not isinstance(existing, dict):
                continue
            m = re.match(r"^(\d+)", str(existing.get("name", "")))
            if m:
                highest = max(highest, int(m.group(1)))
        n = max(highest, len(entries)) + 1
        name = f"{n} {label} {date_str} {time_str}.txt"
        entries.append({"content": content, "name": name})
        if not _save_ablage_json(data):
            raise RuntimeError("ablage.json konnte nicht geschrieben werden.")
        return _ablage_json_path()
    except Exception as e:
        try:
            messagebox.showerror("Speichern fehlgeschlagen", f"Pfad: {_ablage_base_path()}\nFehler: {e}")
        except Exception:
            pass
        return None
def list_ablage_files(category: str):
    """List the entry names of ``category`` from ``ablage.json``.

    Names with a leading number are ordered by that number, highest first;
    ties and unnumbered names are ordered by the name itself.
    """
    archive = _load_ablage_json()
    names = []
    for entry in archive.get(category, []):
        if isinstance(entry, dict) and entry.get("name"):
            names.append(entry.get("name", ""))

    def _order(entry_name):
        hit = re.match(r"^(\d+)", str(entry_name))
        if not hit:
            return (0, entry_name)
        return (-int(hit.group(1)), entry_name)

    return sorted(names, key=_order)
def _parse_entry_date(name: str):
"""Parst Datum aus Eintragsnamen (z.B. '1 KG 04.02.2026 10.txt') → datetime oder None."""
if not name:
return None
m = re.search(r"(\d{2})\.(\d{2})\.(\d{4})", str(name))
if not m:
return None
try:
d, mo, y = int(m.group(1)), int(m.group(2)), int(m.group(3))
return datetime(y, mo, d)
except (ValueError, IndexError):
return None
def get_old_kg_entries(days: int = 14):
    """Return the "KG" entries whose name carries a date older than ``days`` days.

    Entries that are not dicts or whose name has no parseable date are skipped.
    """
    kg_entries = _load_ablage_json().get("KG", [])
    if not isinstance(kg_entries, list):
        return []
    cutoff = datetime.now() - timedelta(days=days)
    outdated = []
    for entry in kg_entries:
        if not isinstance(entry, dict):
            continue
        stamp = _parse_entry_date(entry.get("name", ""))
        if stamp and stamp < cutoff:
            outdated.append(entry)
    return outdated
def delete_kg_entries_older_than(days: int = 14) -> int:
    """Delete "KG" entries older than ``days`` days and return how many were removed.

    Thin convenience wrapper around ``delete_entries_older_than``.
    """
    removed = delete_entries_older_than("KG", days=days)
    return removed
def count_entries_older_than(category: str, days: int = 14) -> int:
    """Count the entries of ``category`` whose name date is older than ``days`` days.

    Returns 0 for unknown categories or when the stored value is not a list.
    """
    if category not in ABLAGE_SUBFOLDERS:
        return 0
    entries = _load_ablage_json().get(category, [])
    if not isinstance(entries, list):
        return 0
    cutoff = datetime.now() - timedelta(days=days)
    count = 0
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        stamp = _parse_entry_date(entry.get("name", ""))
        if stamp and stamp < cutoff:
            count += 1
    return count
def delete_entries_older_than(category: str, days: int = 14) -> int:
    """Delete the entries of ``category`` whose name date is older than ``days`` days.

    Entries that are not dicts or have no parseable date are always kept.

    Returns:
        The number of entries actually removed. 0 is returned for unknown
        categories, when nothing is old enough, or when the updated archive
        could not be written (in which case nothing was deleted on disk).
    """
    if category not in ABLAGE_SUBFOLDERS:
        return 0
    data = _load_ablage_json()
    entries = data.get(category, [])
    if not isinstance(entries, list):
        return 0
    cutoff = datetime.now() - timedelta(days=days)
    kept = []
    for entry in entries:
        stamp = _parse_entry_date(entry.get("name", "")) if isinstance(entry, dict) else None
        if not stamp or stamp >= cutoff:
            kept.append(entry)
    deleted = len(entries) - len(kept)
    if deleted > 0:
        data[category] = kept
        # Do not report deletions that never reached the file.
        if not _save_ablage_json(data):
            return 0
    return deleted
def delete_all_ablage_entries(category: str) -> int:
    """Delete every entry of ``category``.

    Returns:
        The number of entries removed; 0 for unknown or already empty
        categories, or when the emptied archive could not be written.
    """
    if category not in ABLAGE_SUBFOLDERS:
        return 0
    data = _load_ablage_json()
    count = len(data.get(category, []))
    if count > 0:
        data[category] = []
        # Do not report deletions that never reached the file.
        if not _save_ablage_json(data):
            return 0
    return count
def get_ablage_content(category: str, filename: str) -> str:
    """Return the stripped text of the entry named ``filename`` in ``category``.

    Only the entry's text is returned, never raw JSON. An empty ``filename``,
    the literal name ``"ablage.json"`` or an unknown entry yields "".
    """
    if not filename or filename == "ablage.json":
        return ""
    for entry in _load_ablage_json().get(category, []):
        if not isinstance(entry, dict):
            continue
        if entry.get("name") != filename:
            continue
        text = entry.get("content") or ""
        return text.strip() or ""
    return ""
def _pruefen_window_config_path():
    """Return the path of the review ("Prüfen") window geometry file."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, PRUEFEN_WINDOW_CONFIG_FILENAME)
def _ordner_window_config_path():
    """Return the path of the folder ("Ordner") window geometry file."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, ORDNER_WINDOW_CONFIG_FILENAME)
def _text_window_config_path():
    """Return the path of the text window geometry file."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, TEXT_WINDOW_CONFIG_FILENAME)
def _diktat_window_config_path():
    """Return the path of the dictation ("Diktat") window geometry file."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, DIKTAT_WINDOW_CONFIG_FILENAME)
def _diskussion_window_config_path():
    """Return the path of the discussion ("Diskussion") window geometry file."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, DISKUSSION_WINDOW_CONFIG_FILENAME)
def _settings_window_config_path():
    """Return the path of the settings window geometry file."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, SETTINGS_WINDOW_CONFIG_FILENAME)
def load_settings_geometry() -> str:
    """Return the stored geometry of the settings window (e.g. '460x300+100+50').

    Returns "" if nothing is stored or the file cannot be read.
    """
    try:
        path = _settings_window_config_path()
        if os.path.isfile(path):
            # Context manager so the file handle is closed deterministically.
            with open(path, "r", encoding="utf-8") as f:
                return f.read().strip()
    except Exception:
        pass
    return ""
def save_settings_geometry(geom: str) -> None:
    """Store size and position of the settings window.

    An empty ``geom`` is not written; write errors are ignored.
    """
    try:
        if not geom:
            return
        with open(_settings_window_config_path(), "w", encoding="utf-8") as fh:
            fh.write(geom + "\n")
    except Exception:
        pass
def _textbloecke_config_path():
    """Return the path of the text-blocks file inside the writable data directory."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, TEXTBLOECKE_CONFIG_FILENAME)
def load_textbloecke():
    """Load the text blocks: ``{"1": {"name": "...", "content": "..."}, ...}``.

    Only digit-string keys with dict values are accepted; each block is
    normalised to the keys "name", "content" and "updated_at". The result is
    ordered by numeric slot. If fewer than two valid slots exist (or loading
    fails), two empty default blocks are returned.
    """
    try:
        cfg_path = _textbloecke_config_path()
        if os.path.isfile(cfg_path):
            with open(cfg_path, "r", encoding="utf-8") as fh:
                raw = json.load(fh)
            if isinstance(raw, dict):
                blocks = {}
                for slot, block in raw.items():
                    valid = isinstance(slot, str) and slot.isdigit() and isinstance(block, dict)
                    if not valid:
                        continue
                    blocks[slot] = {
                        "name": (block.get("name") or "").strip(),
                        "content": block.get("content") or "",
                        "updated_at": (block.get("updated_at") or "").strip(),
                    }
                ordered_slots = sorted(blocks.keys(), key=int)
                if len(ordered_slots) >= 2:
                    return {slot: blocks[slot] for slot in ordered_slots}
    except Exception:
        pass
    return {"1": {"name": "Textblock 1", "content": ""}, "2": {"name": "Textblock 2", "content": ""}}
def save_textbloecke(data: dict) -> None:
    """Persist all text-block slots.

    Each block is normalised to "name", "content" and "updated_at". Nothing is
    written when fewer than two valid slots remain. After writing, the file is
    flushed and fsync'ed (fsync failures are ignored); all other errors are
    swallowed as well.
    """
    try:
        cleaned = {}
        for slot, block in (data or {}).items():
            if not (isinstance(slot, str) and isinstance(block, dict)):
                continue
            cleaned[slot] = {
                "name": (block.get("name") or "").strip(),
                "content": block.get("content") or "",
                "updated_at": (block.get("updated_at") or "").strip(),
            }
        if len(cleaned) < 2:
            return
        with open(_textbloecke_config_path(), "w", encoding="utf-8") as fh:
            json.dump(cleaned, fh, ensure_ascii=False, indent=2)
            fh.flush()
            try:
                os.fsync(fh.fileno())
            except Exception:
                pass
    except Exception:
        pass
def load_pruefen_geometry():
    """Read the stored size and position of the review ("Prüfen") window.

    Returns ``(w, h, x, y)`` when a position is stored, ``(w, h)`` when only a
    size is stored, or None if the file is missing/invalid or the size is
    below 300x250.
    """
    try:
        cfg = _pruefen_window_config_path()
        if not os.path.isfile(cfg):
            return None
        with open(cfg, "r", encoding="utf-8") as fh:
            fields = fh.read().strip().split()
        if len(fields) < 2:
            return None
        width, height = int(fields[0]), int(fields[1])
        if width < 300 or height < 250:
            return None
        if len(fields) >= 4:
            return (width, height, int(fields[2]), int(fields[3]))
        return (width, height)
    except Exception:
        return None
def save_pruefen_geometry(width: int, height: int, x: int = None, y: int = None) -> None:
    """Store size and, when both ``x`` and ``y`` are given, position of the review window.

    The file holds one space-separated line; write errors are ignored.
    """
    try:
        with open(_pruefen_window_config_path(), "w", encoding="utf-8") as fh:
            has_position = x is not None and y is not None
            if has_position:
                line = f"{width} {height} {x} {y}\n"
            else:
                line = f"{width} {height}\n"
            fh.write(line)
    except Exception:
        pass
def load_ordner_geometry() -> str:
    """Return the stored geometry of the folder window (e.g. '640x500+100+50').

    Returns "" if nothing is stored or the file cannot be read.
    """
    try:
        path = _ordner_window_config_path()
        if os.path.isfile(path):
            # Context manager so the file handle is closed deterministically.
            with open(path, "r", encoding="utf-8") as f:
                return f.read().strip()
    except Exception:
        pass
    return ""
def save_ordner_geometry(geom: str) -> None:
    """Store the geometry string of the folder window (write errors are ignored)."""
    try:
        target = _ordner_window_config_path()
        with open(target, "w", encoding="utf-8") as fh:
            fh.write(geom)
    except Exception:
        pass
def load_text_window_geometry() -> str:
    """Return the stored geometry string of the text window, or "" if unavailable."""
    try:
        path = _text_window_config_path()
        if os.path.isfile(path):
            # Context manager so the file handle is closed deterministically.
            with open(path, "r", encoding="utf-8") as f:
                return f.read().strip()
    except Exception:
        pass
    return ""
def save_text_window_geometry(geom: str) -> None:
    """Store the geometry string of the text window (write errors are ignored)."""
    try:
        target = _text_window_config_path()
        with open(target, "w", encoding="utf-8") as fh:
            fh.write(geom)
    except Exception:
        pass
def load_diktat_geometry() -> str:
    """Return the stored geometry string of the dictation window, or "" if unavailable."""
    try:
        path = _diktat_window_config_path()
        if os.path.isfile(path):
            # Context manager so the file handle is closed deterministically.
            with open(path, "r", encoding="utf-8") as f:
                return f.read().strip()
    except Exception:
        pass
    return ""
def save_diktat_geometry(geom: str) -> None:
    """Store the geometry string of the dictation window (write errors are ignored)."""
    try:
        target = _diktat_window_config_path()
        with open(target, "w", encoding="utf-8") as fh:
            fh.write(geom)
    except Exception:
        pass
def load_diskussion_geometry() -> str:
    """Return the stored geometry string of the discussion window, or "" if unavailable."""
    try:
        path = _diskussion_window_config_path()
        if os.path.isfile(path):
            # Context manager so the file handle is closed deterministically.
            with open(path, "r", encoding="utf-8") as f:
                return f.read().strip()
    except Exception:
        pass
    return ""
def save_diskussion_geometry(geom: str) -> None:
    """Store the geometry string of the discussion window (write errors are ignored)."""
    try:
        target = _diskussion_window_config_path()
        with open(target, "w", encoding="utf-8") as fh:
            fh.write(geom)
    except Exception:
        pass
def extract_diagnosen_therapie_procedere(text: str) -> str:
    """Extract only the diagnosis, therapy and procedure sections from ``text``.

    If the text contains "KRANKENGESCHICHTE:", only the part before
    "TRANSKRIPT:" is considered (with the marker removed). A section starts at
    a line beginning with one of the target headers and ends at the next
    non-empty line that ends with a colon. Sections are separated by one blank
    line. If no section is found, the considered text is returned unchanged.
    """
    if "KRANKENGESCHICHTE:" in text:
        before_transcript = text.split("TRANSKRIPT:")[0]
        kg_part = before_transcript.replace("KRANKENGESCHICHTE:", "").strip()
    else:
        kg_part = text
    headers = ("Diagnose:", "Diagnosen:", "Therapie:", "Procedere:")
    collected = []
    capturing = False
    for raw_line in kg_part.split("\n"):
        bare = raw_line.strip()
        if bare.startswith(headers):
            if collected:
                collected.append("")
            collected.append(raw_line)
            capturing = True
            continue
        if not capturing:
            continue
        if bare and bare.endswith(":"):
            # Any other header-like line closes the current section.
            capturing = False
        else:
            collected.append(raw_line)
    extracted = "\n".join(collected).strip()
    return extracted if extracted else kg_part
def _similarity(a: str, b: str) -> float:
return SequenceMatcher(None, a.lower(), b.lower()).ratio()
def apply_korrekturen(text: str, korrekturen: dict) -> tuple:
    """Apply the corrections database to ``text``.

    Two passes are made over all ``{wrong: right}`` pairs of every category:

    1. Exact pass: whole-word, case-insensitive replacement of ``wrong``.
    2. Fuzzy pass: for pairs not applied in pass 1, the first word (in order
       of appearance, at least 4 characters) whose similarity to ``wrong`` is
       at least 0.85 is replaced everywhere by ``right``.

    Finally every "ß" is replaced by "ss".

    Args:
        text: The text to correct.
        korrekturen: ``{category: {wrong: right}}``; non-dict categories and
            pairs with an empty side are skipped.

    Returns:
        ``(corrected_text, [(wrong, right), ...])`` with the applied pairs.
    """
    FUZZY_THRESHOLD = 0.85
    word_re = re.compile(r"[A-Za-zÄÖÜäöüß0-9\-]+")
    result = text
    applied = []

    def _pairs():
        # All usable (wrong, right) pairs across the categories.
        for mapping in korrekturen.values():
            if not isinstance(mapping, dict):
                continue
            for falsch, richtig in mapping.items():
                if falsch and richtig:
                    yield falsch, richtig

    for falsch, richtig in _pairs():
        pattern = r"\b" + re.escape(falsch) + r"\b"
        # A function replacement inserts `richtig` literally; passing it as a
        # template would interpret backslashes / group references in it.
        result, hits = re.subn(
            pattern, lambda _m, _r=richtig: _r, result, flags=re.IGNORECASE
        )
        if hits:
            applied.append((falsch, richtig))

    words = word_re.findall(result)
    for falsch, richtig in _pairs():
        if (falsch, richtig) in applied:
            continue
        # dict.fromkeys de-duplicates while keeping first-appearance order, so
        # the chosen fuzzy match does not depend on set iteration order.
        for w in dict.fromkeys(words):
            if len(w) < 4:
                continue
            if _similarity(w, falsch) >= FUZZY_THRESHOLD:
                pattern = r"\b" + re.escape(w) + r"\b"
                result = re.sub(pattern, lambda _m, _r=richtig: _r, result)
                applied.append((falsch, richtig))
                words = word_re.findall(result)
                break
    result = result.replace("ß", "ss")
    return result, applied
def _kogu_gruss_config_path():
    """Return the path of the KOGU closing-phrase file inside the writable data directory."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, KOGU_GRUSS_CONFIG_FILENAME)
def load_kogu_gruss() -> str:
    """Return the stored KOGU closing phrase.

    The stored value is only accepted if it is one of ``KOGU_GRUSS_OPTIONS``;
    otherwise (or on any error) the first option is returned.
    """
    try:
        gruss_path = _kogu_gruss_config_path()
        if os.path.isfile(gruss_path):
            with open(gruss_path, "r", encoding="utf-8") as fh:
                stored = fh.read().strip()
            if stored in KOGU_GRUSS_OPTIONS:
                return stored
    except Exception:
        pass
    return KOGU_GRUSS_OPTIONS[0]
def save_kogu_gruss(gruss: str) -> None:
    """Store the KOGU closing phrase (whitespace-stripped); write errors are ignored."""
    try:
        with open(_kogu_gruss_config_path(), "w", encoding="utf-8") as fh:
            cleaned = (gruss or "").strip()
            fh.write(cleaned)
    except Exception:
        pass
def _kogu_templates_config_path():
    """Return the path of the KOGU template file inside the writable data directory."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, KOGU_TEMPLATES_CONFIG_FILENAME)
def load_kogu_templates() -> str:
    """Return the stored template for cost-approval requests (user wishes on type/format/content).

    Returns "" if no file exists or it cannot be read.
    """
    try:
        template_path = _kogu_templates_config_path()
        if not os.path.isfile(template_path):
            return ""
        with open(template_path, "r", encoding="utf-8") as fh:
            content = fh.read()
        return content.strip()
    except Exception:
        return ""
def save_kogu_templates(text: str) -> None:
    """Store the template for cost-approval requests (stripped); write errors are ignored."""
    try:
        with open(_kogu_templates_config_path(), "w", encoding="utf-8") as fh:
            cleaned = (text or "").strip()
            fh.write(cleaned)
    except Exception:
        pass
def _diskussion_vorlage_config_path():
    """Return the path of the AI-discussion template file inside the writable data directory."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, DISKUSSION_VORLAGE_CONFIG_FILENAME)
def load_diskussion_vorlage() -> str:
    """Return the stored template that defines how the AI discusses with the user.

    Returns "" if no file exists or it cannot be read.
    """
    try:
        template_path = _diskussion_vorlage_config_path()
        if not os.path.isfile(template_path):
            return ""
        with open(template_path, "r", encoding="utf-8") as fh:
            content = fh.read()
        return content.strip()
    except Exception:
        return ""
def save_diskussion_vorlage(text: str) -> None:
    """Store the AI-discussion template (stripped); write errors are ignored."""
    try:
        with open(_diskussion_vorlage_config_path(), "w", encoding="utf-8") as fh:
            cleaned = (text or "").strip()
            fh.write(cleaned)
    except Exception:
        pass
def _op_bericht_template_config_path():
    """Return the path of the surgery-report template file inside the writable data directory."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, OP_BERICHT_TEMPLATE_CONFIG_FILENAME)
def load_op_bericht_template() -> str:
    """Return the stored template for the surgery report (user wishes on format/content).

    Returns "" if no file exists or it cannot be read.
    """
    try:
        template_path = _op_bericht_template_config_path()
        if not os.path.isfile(template_path):
            return ""
        with open(template_path, "r", encoding="utf-8") as fh:
            content = fh.read()
        return content.strip()
    except Exception:
        return ""
def save_op_bericht_template(text: str) -> None:
    """Store the surgery-report template (stripped); write errors are ignored."""
    try:
        with open(_op_bericht_template_config_path(), "w", encoding="utf-8") as fh:
            cleaned = (text or "").strip()
            fh.write(cleaned)
    except Exception:
        pass
def _arztbrief_vorlage_config_path():
    """Return the path of the doctor's-letter template file inside the writable data directory."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, ARZTBRIEF_VORLAGE_CONFIG_FILENAME)
def load_arztbrief_vorlage() -> str:
    """Return the stored template for the doctor's letter (section order + instructions).

    Falls back to ``ARZTBRIEF_VORLAGE_DEFAULT`` when no file exists or it
    cannot be read. An existing but empty file yields "".
    """
    try:
        template_path = _arztbrief_vorlage_config_path()
        if os.path.isfile(template_path):
            with open(template_path, "r", encoding="utf-8") as fh:
                content = fh.read()
            return content.strip()
    except Exception:
        pass
    return ARZTBRIEF_VORLAGE_DEFAULT
def save_arztbrief_vorlage(text: str) -> None:
    """Store the doctor's-letter template (stripped); write errors are ignored."""
    try:
        with open(_arztbrief_vorlage_config_path(), "w", encoding="utf-8") as fh:
            cleaned = (text or "").strip()
            fh.write(cleaned)
    except Exception:
        pass
def _todo_config_path():
    """Return the path of the to-do list file inside the writable data directory."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, TODO_CONFIG_FILENAME)
def _todo_window_config_path():
    """Return the path of the to-do window geometry file."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, TODO_WINDOW_CONFIG_FILENAME)
def load_todos() -> list:
    """Load the to-do list.

    Each item: {id, text, done, date (optional, 'YYYY-MM-DD'), created}.

    Returns:
        The stored list, or [] if the file is missing, unreadable, or does not
        contain a JSON list (so callers can always rely on list semantics).
    """
    try:
        path = _todo_config_path()
        if os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            if isinstance(data, list):
                return data
    except Exception:
        pass
    return []
def save_todos(todos: list) -> None:
    """Store the to-do list locally and push it to the cloud.

    The local write is best effort (errors are ignored). The cloud push is
    started afterwards in a daemon thread via ``cloud_push_todos``.
    """
    try:
        with open(_todo_config_path(), "w", encoding="utf-8") as fh:
            json.dump(todos, fh, indent=2, ensure_ascii=False)
    except Exception:
        pass
    uploader = threading.Thread(target=cloud_push_todos, args=(todos,), daemon=True)
    uploader.start()
def _notes_config_path():
    """Return the path of the notes file inside the writable data directory."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, NOTES_CONFIG_FILENAME)
def load_notes() -> list:
    """Load the notes list. Each item: {id, title, text, created}.

    Returns:
        The stored list, or [] if the file is missing, unreadable, or does not
        contain a JSON list.
    """
    try:
        path = _notes_config_path()
        if os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            if isinstance(data, list):
                return data
    except Exception:
        pass
    return []
def save_notes(notes: list) -> None:
    """Store the notes list locally and push it to the cloud.

    The local write is best effort (errors are ignored). The cloud push is
    started afterwards in a daemon thread via ``cloud_push_notes``.
    """
    try:
        with open(_notes_config_path(), "w", encoding="utf-8") as fh:
            json.dump(notes, fh, indent=2, ensure_ascii=False)
    except Exception:
        pass
    uploader = threading.Thread(target=cloud_push_notes, args=(notes,), daemon=True)
    uploader.start()
def _checklist_config_path():
    """Return the path of the checklists file inside the writable data directory."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, CHECKLIST_CONFIG_FILENAME)
def load_checklists() -> list:
    """Load the checklists. Each one: {id, title, items: [{text, done}], created}.

    Returns:
        The stored list, or [] if the file is missing, unreadable, or does not
        contain a JSON list.
    """
    try:
        path = _checklist_config_path()
        if os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            if isinstance(data, list):
                return data
    except Exception:
        pass
    return []
def save_checklists(checklists: list) -> None:
    """Store the checklists as pretty-printed UTF-8 JSON (write errors are ignored)."""
    try:
        target = _checklist_config_path()
        with open(target, "w", encoding="utf-8") as fh:
            json.dump(checklists, fh, indent=2, ensure_ascii=False)
    except Exception:
        pass
def load_todo_geometry() -> str:
    """Return the stored geometry string of the to-do window, or "" if unavailable."""
    try:
        path = _todo_window_config_path()
        if os.path.isfile(path):
            # Context manager so the file handle is closed deterministically.
            with open(path, "r", encoding="utf-8") as f:
                return f.read().strip()
    except Exception:
        pass
    return ""
def save_todo_geometry(geom: str) -> None:
    """Store the geometry string of the to-do window (write errors are ignored)."""
    try:
        target = _todo_window_config_path()
        with open(target, "w", encoding="utf-8") as fh:
            fh.write(geom)
    except Exception:
        pass
def _todo_settings_path():
    """Return the path of the to-do settings file inside the writable data directory."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, TODO_SETTINGS_CONFIG_FILENAME)
def load_todo_settings() -> dict:
    """Load the to-do window settings (active tab, active category, custom categories).

    Returns:
        The stored dict, or {} if the file is missing, unreadable, or does not
        contain a JSON object.
    """
    try:
        path = _todo_settings_path()
        if os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            if isinstance(data, dict):
                return data
    except Exception:
        pass
    return {}
def save_todo_settings(settings: dict) -> None:
    """Store the to-do window settings as pretty-printed UTF-8 JSON (write errors are ignored)."""
    try:
        target = _todo_settings_path()
        with open(target, "w", encoding="utf-8") as fh:
            json.dump(settings, fh, indent=2, ensure_ascii=False)
    except Exception:
        pass
def _todo_inbox_path():
    """Return the path of the to-do inbox file inside the writable data directory."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, TODO_INBOX_CONFIG_FILENAME)
def load_todo_inbox() -> list:
    """Load the inbox (to-dos received from other users).

    Returns:
        The stored list, or [] if the file is missing, unreadable, or does not
        contain a JSON list (``send_todo_to_inbox`` appends to the result).
    """
    try:
        path = _todo_inbox_path()
        if os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            if isinstance(data, list):
                return data
    except Exception:
        pass
    return []
def save_todo_inbox(inbox: list) -> None:
    """Store the inbox as pretty-printed UTF-8 JSON (write errors are ignored)."""
    try:
        target = _todo_inbox_path()
        with open(target, "w", encoding="utf-8") as fh:
            json.dump(inbox, fh, indent=2, ensure_ascii=False)
    except Exception:
        pass
def send_todo_to_inbox(todo_item: dict, sender_name: str, recipient: str) -> None:
    """Append a copy of ``todo_item`` to the file-based inbox (local use).

    The new entry gets a millisecond-timestamp id, is marked as not done and
    records sender, recipient and the send time (ISO format).
    """
    inbox = load_todo_inbox()
    entry_id = int(datetime.now().timestamp() * 1000)
    entry = {
        "id": entry_id,
        "text": todo_item.get("text", ""),
        "date": todo_item.get("date"),
        "priority": todo_item.get("priority", 0),
        "notes": todo_item.get("notes", ""),
        "done": False,
        "sender": sender_name,
        "recipient": recipient,
        "sent_at": datetime.now().isoformat(),
    }
    inbox.append(entry)
    save_todo_inbox(inbox)
# ─── Cloud sync (Supabase, free cloud database) ───
def cloud_push_todos(todos: list) -> bool:
    """Write the to-do list to Supabase (PATCH on row ``id=1`` of ``todo_sync``).

    Returns:
        True if the request completed without raising, False otherwise.
    """
    import urllib.request
    payload = json.dumps({"data": todos}).encode("utf-8")
    req = urllib.request.Request(
        f"{_SUPABASE_URL}/rest/v1/todo_sync?id=eq.1",
        data=payload, method="PATCH",
        headers={
            "apikey": _SUPABASE_ANON_KEY,
            "Authorization": f"Bearer {_SUPABASE_ANON_KEY}",
            "Content-Type": "application/json",
        }
    )
    try:
        # Context manager closes the HTTP response/connection right away.
        with urllib.request.urlopen(req, timeout=10):
            pass
        return True
    except Exception:
        return False
def cloud_pull_todos() -> list:
    """Read the to-do list from Supabase (row ``id=1`` of ``todo_sync``).

    Returns:
        The value of the row's "data" field, [] if no row was returned, or
        None if the request or decoding failed.
    """
    import urllib.request
    req = urllib.request.Request(
        f"{_SUPABASE_URL}/rest/v1/todo_sync?id=eq.1&select=data",
        headers={
            "apikey": _SUPABASE_ANON_KEY,
            "Authorization": f"Bearer {_SUPABASE_ANON_KEY}",
        }
    )
    try:
        # Context manager closes the HTTP response/connection right away.
        with urllib.request.urlopen(req, timeout=10) as resp:
            rows = json.loads(resp.read().decode("utf-8"))
        if rows and len(rows) > 0:
            return rows[0].get("data", [])
        return []
    except Exception:
        return None
def cloud_get_status() -> str:
    """Return a status text for the cloud connection.

    "Supabase verbunden" if pulling the to-dos did not fail, otherwise "".
    """
    try:
        if cloud_pull_todos() is not None:
            return "Supabase verbunden"
    except Exception:
        pass
    return ""
def cloud_push_notes(notes: list) -> bool:
    """Write the notes list to Supabase (row ``id=2`` of ``todo_sync``).

    A PATCH is tried first; if it raises, a POST (insert) is attempted.

    Returns:
        True if either request completed without raising, False otherwise.
    """
    import urllib.request
    payload = json.dumps({"id": 2, "data": notes}).encode("utf-8")
    req = urllib.request.Request(
        f"{_SUPABASE_URL}/rest/v1/todo_sync?id=eq.2",
        data=payload, method="PATCH",
        headers={
            "apikey": _SUPABASE_ANON_KEY,
            "Authorization": f"Bearer {_SUPABASE_ANON_KEY}",
            "Content-Type": "application/json",
        }
    )
    try:
        # Context manager closes the HTTP response/connection right away.
        with urllib.request.urlopen(req, timeout=10):
            pass
        return True
    except Exception:
        # Fallback intended for the case that the row does not exist yet: INSERT.
        # NOTE(review): verify that the API really answers a PATCH on a missing
        # row with an error; otherwise this fallback is never reached for it.
        try:
            req2 = urllib.request.Request(
                f"{_SUPABASE_URL}/rest/v1/todo_sync",
                data=payload, method="POST",
                headers={
                    "apikey": _SUPABASE_ANON_KEY,
                    "Authorization": f"Bearer {_SUPABASE_ANON_KEY}",
                    "Content-Type": "application/json",
                    "Prefer": "return=minimal",
                }
            )
            with urllib.request.urlopen(req2, timeout=10):
                pass
            return True
        except Exception:
            return False
def cloud_pull_notes() -> list:
    """Read the notes list from Supabase (row ``id=2`` of ``todo_sync``).

    Returns:
        The value of the row's "data" field, [] if no row was returned, or
        None if the request or decoding failed.
    """
    import urllib.request
    req = urllib.request.Request(
        f"{_SUPABASE_URL}/rest/v1/todo_sync?id=eq.2&select=data",
        headers={
            "apikey": _SUPABASE_ANON_KEY,
            "Authorization": f"Bearer {_SUPABASE_ANON_KEY}",
        }
    )
    try:
        # Context manager closes the HTTP response/connection right away.
        with urllib.request.urlopen(req, timeout=10) as resp:
            rows = json.loads(resp.read().decode("utf-8"))
        if rows and len(rows) > 0:
            return rows[0].get("data", [])
        return []
    except Exception:
        return None
def _user_profile_config_path():
    """Return the path of the user-profile file inside the writable data directory."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, USER_PROFILE_CONFIG_FILENAME)
def load_user_profile() -> dict:
    """Load the stored user profile.

    Returns:
        The stored dict (e.g. {name, specialty, clinic}), or {} if the file is
        missing, unreadable, or does not contain a JSON object (callers use
        ``.get`` on the result).
    """
    try:
        path = _user_profile_config_path()
        if os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            if isinstance(data, dict):
                return data
    except Exception:
        pass
    return {}
def save_user_profile(profile: dict) -> None:
    """Store the user profile as pretty-printed UTF-8 JSON (write errors are ignored)."""
    try:
        target = _user_profile_config_path()
        with open(target, "w", encoding="utf-8") as fh:
            json.dump(profile, fh, indent=2, ensure_ascii=False)
    except Exception:
        pass
2026-05-11 08:27:44 +02:00
def _read_json_dict(path: Path) -> Optional[dict]:
try:
with open(path, "r", encoding="utf-8") as f:
o = json.load(f)
return o if isinstance(o, dict) else None
except Exception:
return None
def _profile_needs_legacy_migration(profile_path: Path) -> bool:
if not profile_path.is_file():
return True
data = _read_json_dict(profile_path)
if not data:
return True
return not (str(data.get("practice_id") or "").strip())
def _autotext_needs_legacy_migration(autotext_path: Path) -> bool:
if not autotext_path.is_file():
return True
data = _read_json_dict(autotext_path)
if not data:
return True
ent = data.get("entries")
if not isinstance(ent, dict) or len(ent) == 0:
return True
return False
def _legacy_candidate_roots(dest: Path) -> list[Path]:
seen: set[str] = set()
out: list[Path] = []
appdata = (os.environ.get("APPDATA") or "").strip()
local = (os.environ.get("LOCALAPPDATA") or "").strip()
for name in ("AZA Desktop", "AzA Desktop", "AZA", "AzA"):
for base in (appdata, local):
if not base:
continue
p = Path(base) / name
try:
key = str(p.resolve())
except Exception:
key = str(p)
if key in seen:
continue
seen.add(key)
out.append(p)
return out
def _score_legacy_data_root(root: Path) -> int:
if not root.is_dir():
return 0
score = 0
pp = root / USER_PROFILE_CONFIG_FILENAME
if pp.is_file():
data = _read_json_dict(pp)
if data:
if str(data.get("practice_id") or "").strip():
score += 10_000_000
if str(data.get("empfang_user_id") or "").strip():
score += 500_000
if str(data.get("email") or "").strip():
score += 50_000
try:
score += min(int(pp.stat().st_mtime) % 1_000_000, 999_999)
except Exception:
pass
ap = root / AUTOTEXT_CONFIG_FILENAME
if ap.is_file():
data = _read_json_dict(ap)
if data and isinstance(data.get("entries"), dict):
score += min(len(data["entries"]) * 1000, 200_000)
try:
score += min(int(ap.stat().st_mtime) % 500_000, 499_999)
except Exception:
pass
return score
def _backup_file_before_migrate(dest_file: Path) -> None:
if not dest_file.is_file():
return
try:
ts = time.strftime("%Y%m%d_%H%M%S")
bak = dest_file.with_name(dest_file.name + f".pre_legacy_migrate_{ts}.bak")
shutil.copy2(dest_file, bak)
except Exception:
pass
def _backup_autotext_before_merge(autotext_path: Path) -> None:
if not autotext_path.is_file():
return
try:
ts = time.strftime("%Y%m%d_%H%M%S")
bak = autotext_path.with_name(
autotext_path.name + f".pre_autotext_merge_{ts}.bak",
)
shutil.copy2(autotext_path, bak)
except Exception:
pass
def _collect_autotext_merge_candidate_files(dest: Path) -> list[Path]:
    """Collect local autotext JSON files that may be merged into the current data.

    Searched are the legacy roots plus a few extra folder names below
    %APPDATA% / %LOCALAPPDATA%. In each existing root (other than ``dest``)
    the autotext file itself, the autotext file of every direct subfolder,
    and any ``*.json`` whose name contains "autotext" are taken. Each file is
    returned once (by resolved path); file-system errors are ignored.
    """
    found: list[Path] = []
    known: set[str] = set()

    def _remember(candidate: Path) -> None:
        try:
            ident = str(candidate.resolve())
        except Exception:
            ident = str(candidate)
        if ident in known or not candidate.is_file():
            return
        known.add(ident)
        found.append(candidate)

    search_roots: list[Path] = list(_legacy_candidate_roots(dest))
    env_bases = (
        (os.environ.get("APPDATA") or "").strip(),
        (os.environ.get("LOCALAPPDATA") or "").strip(),
    )
    for base in env_bases:
        if not base:
            continue
        base_dir = Path(base)
        for folder in ("AzA-Medwork", "AZA-MedWork", "aza-medwork"):
            search_roots.append(base_dir / folder)

    try:
        dest_resolved = dest.resolve()
    except Exception:
        dest_resolved = dest

    for root in search_roots:
        try:
            if not root.is_dir():
                continue
            try:
                if root.resolve() == dest_resolved:
                    continue
            except Exception:
                pass
            direct = root / AUTOTEXT_CONFIG_FILENAME
            if direct.is_file():
                _remember(direct)
            try:
                for child in root.iterdir():
                    if child.is_dir():
                        nested = child / AUTOTEXT_CONFIG_FILENAME
                        if nested.is_file():
                            _remember(nested)
            except Exception:
                pass
            try:
                for json_file in root.glob("*.json"):
                    if json_file.is_file() and "autotext" in json_file.name.lower():
                        _remember(json_file)
            except Exception:
                pass
        except Exception:
            continue
    return found
def merge_legacy_autotext_sources_into_current() -> None:
    """Add autotext entries from plausible legacy locations (frozen builds only, with backup).

    Merge rules per legacy entry: a key that is new is added if its value is
    non-empty; for an existing key the legacy value wins only if the current
    value is empty or the legacy value is longer. ``entry_meta`` keys are
    added only when missing. If anything changed, the current autotext file
    is backed up and the merged data is written with ``save_autotext``.
    Hook/expansion behaviour is not touched, only the persisted data.
    """
    if not getattr(sys, "frozen", False):
        return
    try:
        dest = Path(get_writable_data_dir()).resolve()
    except Exception:
        return
    auto_path = dest / AUTOTEXT_CONFIG_FILENAME
    try:
        current = load_autotext()
    except Exception:
        return

    entries: dict = dict(current.get("entries") or {})
    stored_meta = current.get("entry_meta")
    meta: dict = dict(stored_meta) if isinstance(stored_meta, dict) else {}
    dirty = False

    for src in _collect_autotext_merge_candidate_files(dest):
        try:
            # Never merge the current file into itself.
            if auto_path.is_file() and src.resolve() == auto_path.resolve():
                continue
        except Exception:
            pass
        legacy = _read_json_dict(src)
        if not legacy:
            continue
        legacy_entries = legacy.get("entries")
        if not isinstance(legacy_entries, dict):
            continue
        for abbrev, value in _coerce_autotext_entry_values(legacy_entries).items():
            if not abbrev:
                continue
            incoming = value or ""
            if not incoming:
                # Empty legacy values never add or replace anything.
                continue
            if abbrev not in entries:
                entries[abbrev] = incoming
                dirty = True
                continue
            existing = str(entries.get(abbrev) or "")
            if incoming != existing and (not existing or len(incoming) > len(existing)):
                entries[abbrev] = incoming
                dirty = True
        legacy_meta = legacy.get("entry_meta")
        if isinstance(legacy_meta, dict):
            for meta_key, meta_value in legacy_meta.items():
                if meta_key not in meta:
                    meta[meta_key] = meta_value
                    dirty = True

    if not dirty:
        return
    if auto_path.is_file():
        _backup_autotext_before_merge(auto_path)
    current["entries"] = entries
    current["entry_meta"] = meta
    try:
        save_autotext(current)
    except Exception:
        pass
def migrate_legacy_writable_data_if_needed() -> None:
    """Take over missing profile/autotext/device data from known legacy folders (frozen builds only).

    Intended to run before ``load_user_profile`` so that ``practice_id`` and
    autotexts are available again after a reinstall, in case the uninstaller
    left AppData in place or the data lives under a similarly named folder.

    The best-scoring legacy folder supplies every file that is currently
    needed. Profile and autotext are replaced (after a backup) when they are
    missing or incomplete; the other files are only copied when absent. In
    all cases an autotext merge from legacy sources is attempted at the end.
    """
    if not getattr(sys, "frozen", False):
        return

    def _merge_autotext_quietly() -> None:
        try:
            merge_legacy_autotext_sources_into_current()
        except Exception:
            pass

    try:
        dest = Path(get_writable_data_dir()).resolve()
    except Exception:
        return
    try:
        os.makedirs(str(dest), exist_ok=True)
    except Exception:
        pass

    prof = dest / USER_PROFILE_CONFIG_FILENAME
    auto = dest / AUTOTEXT_CONFIG_FILENAME
    korr = dest / KORREKTUREN_CONFIG_FILENAME
    act = dest / ACTIVATION_CONFIG_FILENAME
    lic = dest / "license_token.txt"
    dev = dest / "device_id.txt"
    # (file name in the legacy folder, destination, needed?, is profile/autotext?)
    plan = (
        (USER_PROFILE_CONFIG_FILENAME, prof, _profile_needs_legacy_migration(prof), True),
        (AUTOTEXT_CONFIG_FILENAME, auto, _autotext_needs_legacy_migration(auto), True),
        (KORREKTUREN_CONFIG_FILENAME, korr, not korr.is_file(), False),
        (ACTIVATION_CONFIG_FILENAME, act, not act.is_file(), False),
        ("license_token.txt", lic, not lic.is_file(), False),
        ("device_id.txt", dev, not dev.is_file(), False),
    )
    if not any(needed for _name, _dst, needed, _flag in plan):
        _merge_autotext_quietly()
        return

    best: Optional[Path] = None
    best_score = 0
    for root in _legacy_candidate_roots(dest):
        if not root.is_dir():
            continue
        try:
            is_dest = root.resolve() == dest
        except Exception:
            continue
        if is_dest:
            continue
        score = _score_legacy_data_root(root)
        if score > best_score:
            best_score = score
            best = root
    if best is None or best_score <= 0:
        _merge_autotext_quietly()
        return

    migrations: list[tuple[Path, Path, bool]] = []
    for file_name, target, needed, is_pr_or_auto in plan:
        if not needed:
            continue
        source = best / file_name
        if source.is_file():
            migrations.append((source, target, is_pr_or_auto))

    for source, target, is_pr_or_auto in migrations:
        if is_pr_or_auto:
            # Re-check right before copying: keep a file that is complete by now.
            if target == prof and target.is_file() and not _profile_needs_legacy_migration(target):
                continue
            if target == auto and target.is_file() and not _autotext_needs_legacy_migration(target):
                continue
        elif target.is_file():
            continue
        if target.is_file():
            _backup_file_before_migrate(target)
        try:
            shutil.copy2(source, target)
        except Exception:
            pass
    _merge_autotext_quietly()
2026-03-25 22:03:39 +01:00
def extract_date_from_todo_text(text: str):
    """Detect a German date phrase in dictated to-do text.

    The first recognised phrase is cut out of the text and converted to a
    ``datetime.date`` relative to ``date.today()``.

    Recognised patterns, tried in this order (first hit wins):
        - "heute", "morgen", "übermorgen" (each optionally prefixed by "bis")
        - "bis Ende Woche" (next Friday strictly after today), "bis Ende Monat"
        - "nächsten Montag", "am Montag", "kommenden Freitag"
          (next such weekday strictly after today)
        - "in 3 Tagen", "in einer Woche", "in zwei Wochen", "in einem Monat"
        - "bis 20. März", "am 15. Februar 2026"
        - "bis 3.4.", "bis 03.04.2026"
        - a bare "20. März" / "15. Februar 2026" without a preposition

    Dates given without a year that already lie in the past are moved to the
    following year; years below 100 are read as 20xx.

    Args:
        text: The dictated to-do text.

    Returns:
        ``(cleaned_text, found_date)`` when a date phrase was recognised, with
        the phrase removed and surplus whitespace/punctuation trimmed;
        otherwise ``(text, None)`` with ``text`` returned unchanged.
    """
    import calendar
    import re
    from datetime import date, timedelta

    if not text or not text.strip():
        return text, None

    # Match offsets are computed on the lowercased copy and then applied to
    # ``original``; both must therefore be derived from the same stripped
    # string, otherwise leading whitespace shifts every slice.
    original = text.strip()
    lowered = original.lower()
    today = date.today()

    WOCHENTAGE = {
        "montag": 0, "dienstag": 1, "mittwoch": 2, "donnerstag": 3,
        "freitag": 4, "samstag": 5, "sonntag": 6,
    }
    MONATE = {
        "januar": 1, "februar": 2, "märz": 3, "maerz": 3, "april": 4,
        "mai": 5, "juni": 6, "juli": 7, "august": 8, "september": 9,
        "oktober": 10, "november": 11, "dezember": 12,
        "jan": 1, "feb": 2, "mär": 3, "mar": 3, "apr": 4,
        "jun": 6, "jul": 7, "aug": 8, "sep": 9, "okt": 10, "nov": 11, "dez": 12,
    }
    ZAHLWOERTER = {
        "einem": 1, "einer": 1, "eins": 1, "ein": 1, "zwei": 2, "drei": 3,
        "vier": 4, "fünf": 5, "fuenf": 5, "sechs": 6, "sieben": 7, "acht": 8,
        "neun": 9, "zehn": 10, "elf": 11, "zwölf": 12, "zwoelf": 12,
    }

    def _cut_out(match, found):
        """Remove the matched phrase from the text and tidy what is left."""
        remainder = original[:match.start()] + original[match.end():]
        remainder = re.sub(r'\s{2,}', ' ', remainder).strip()
        remainder = re.sub(r'^[,.\s]+|[,.\s]+$', '', remainder).strip()
        return remainder, found

    def _calendar_date(day, month, year_text):
        """Build a date from day/month/optional year text; None if invalid."""
        year = today.year
        if year_text:
            year = int(year_text)
            if year < 100:
                year += 2000
        if not month or not 1 <= month <= 12 or not 1 <= day <= 31:
            return None
        try:
            candidate = date(year, month, day)
            # Without an explicit year, a past date means "next year".
            if candidate < today and not year_text:
                candidate = date(year + 1, month, day)
        except ValueError:
            return None
        return candidate

    # "heute" / "morgen" / "übermorgen"
    for word_pattern, day_offset in (
        (r'\b(bis\s+)?heute\b', 0),
        (r'\b(bis\s+)?morgen\b', 1),
        (r'\b(bis\s+)?[üu]bermorgen\b', 2),
    ):
        m = re.search(word_pattern, lowered)
        if m:
            return _cut_out(m, today + timedelta(days=day_offset))

    # "bis Ende Woche"
    m = re.search(r'\bbis\s+ende\s+(?:der\s+)?woche\b', lowered)
    if m:
        days_until_friday = (4 - today.weekday()) % 7 or 7
        return _cut_out(m, today + timedelta(days=days_until_friday))

    # "bis Ende Monat"
    m = re.search(r'\bbis\s+ende\s+(?:des\s+)?monat[s]?\b', lowered)
    if m:
        last_day = calendar.monthrange(today.year, today.month)[1]
        return _cut_out(m, date(today.year, today.month, last_day))

    # "nächsten/kommenden/am Montag/Dienstag/…"
    m = re.search(
        r'\b(?:(?:bis\s+)?(?:n[äa]chsten?|kommenden?|am)\s+)(' + "|".join(WOCHENTAGE) + r')\b',
        lowered,
    )
    if m:
        days_ahead = (WOCHENTAGE[m.group(1)] - today.weekday()) % 7 or 7
        return _cut_out(m, today + timedelta(days=days_ahead))

    # "in X Tagen/Wochen/Monaten"
    m = re.search(
        r'\bin\s+(\d+|' + '|'.join(ZAHLWOERTER) + r')\s+(tag(?:en?)?|woche[n]?|monat(?:en?)?)\b',
        lowered,
    )
    if m:
        amount = ZAHLWOERTER.get(m.group(1))
        if amount is None:
            try:
                amount = int(m.group(1))
            except ValueError:
                amount = 1
        unit = m.group(2)
        try:
            if "tag" in unit:
                found = today + timedelta(days=amount)
            elif "woche" in unit:
                found = today + timedelta(weeks=amount)
            else:
                month_index = today.month + amount - 1
                new_year = today.year + month_index // 12
                new_month = month_index % 12 + 1
                max_day = calendar.monthrange(new_year, new_month)[1]
                found = date(new_year, new_month, min(today.day, max_day))
        except (OverflowError, ValueError):
            # An absurdly large amount cannot be represented as a date;
            # treat the phrase as unrecognised instead of crashing.
            found = None
        if found is not None:
            return _cut_out(m, found)

    month_names = "|".join(MONATE)

    # "bis/am 20. März (2026)"
    m = re.search(
        r'\b(?:bis|am|vom)\s+(\d{1,2})\.\s*(' + month_names + r')(?:\s+(\d{2,4}))?\b',
        lowered,
    )
    if m:
        found = _calendar_date(int(m.group(1)), MONATE.get(m.group(2)), m.group(3))
        if found is not None:
            return _cut_out(m, found)

    # "bis/am 20.3." or "bis/am 20.03.2026". The word boundary belongs to the
    # optional year only: after the bare trailing dot there is no boundary
    # when a space or the end of the text follows.
    m = re.search(
        r'\b(?:bis|am|vom)\s+(\d{1,2})\.(\d{1,2})\.(?:(\d{2,4})\b)?',
        lowered,
    )
    if m:
        found = _calendar_date(int(m.group(1)), int(m.group(2)), m.group(3))
        if found is not None:
            return _cut_out(m, found)

    # Standalone date: "20. März", "15. Februar 2026" (no bis/am prefix)
    m = re.search(
        r'\b(\d{1,2})\.\s*(' + month_names + r')(?:\s+(\d{2,4}))?\b',
        lowered,
    )
    if m:
        found = _calendar_date(int(m.group(1)), MONATE.get(m.group(2)), m.group(3))
        if found is not None:
            return _cut_out(m, found)

    return text, None
def load_saved_model() -> str:
    """Return the KG model id last stored in the config file.

    Falls back to ``DEFAULT_SUMMARY_MODEL`` when the file is missing, cannot
    be read, or contains a value that is not in ``ALLOWED_SUMMARY_MODELS``.
    """
    try:
        config_file = _config_path()
        if os.path.isfile(config_file):
            with open(config_file, "r", encoding="utf-8") as handle:
                stored = handle.read().strip()
            if stored in ALLOWED_SUMMARY_MODELS:
                return stored
    except Exception:
        pass
    return DEFAULT_SUMMARY_MODEL
def save_model(model: str) -> None:
    """Write the chosen KG model id to the config file.

    Values outside ``ALLOWED_SUMMARY_MODELS`` are ignored; write errors are
    swallowed (best effort).
    """
    if model in ALLOWED_SUMMARY_MODELS:
        try:
            with open(_config_path(), "w", encoding="utf-8") as handle:
                handle.write(model)
        except Exception:
            pass
def _templates_config_path():
    """Return the path of the templates file inside the writable data dir."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, TEMPLATES_CONFIG_FILENAME)
# ─── KG Detail-Level (Kürzer/Ausführlicher-Stufe) ───
def _kg_detail_level_path():
    """Return the path of the KG detail-level file inside the writable data dir."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, KG_DETAIL_LEVEL_CONFIG_FILENAME)
def load_kg_detail_level() -> int:
    """Load the stored KG detail level.

    0 = standard, negative = shorter, positive = more detailed. Returns 0
    when the file is missing, unreadable, or does not contain an integer.
    """
    try:
        level_file = _kg_detail_level_path()
        if os.path.isfile(level_file):
            with open(level_file, "r", encoding="utf-8") as handle:
                raw = handle.read()
            return int(raw.strip())
    except Exception:
        pass
    return 0
def save_kg_detail_level(level: int) -> None:
    """Persist the KG detail level as plain text; write errors are swallowed."""
    try:
        target = _kg_detail_level_path()
        with open(target, "w", encoding="utf-8") as handle:
            handle.write(str(level))
    except Exception:
        pass
def get_kg_detail_instruction(level: int) -> str:
    """Return the prompt suffix that matches the KG detail level.

    Levels at or below -3 share the most compact instruction, levels at or
    above 3 the most verbose one. Level 0 (and any value that is not one of
    the known steps) yields an empty string.
    """
    instructions = {
        -3: ("\n\nWICHTIG STIL: Extrem knapp und kompakt. Nur Schlüsselwörter und Diagnosen mit ICD-10. "
             "Keine ganzen Sätze, nur Stichpunkte. Maximal komprimiert."),
        -2: ("\n\nWICHTIG STIL: Sehr kurz und kompakt. Kurze Stichpunkte, "
             "keine ausführlichen Beschreibungen. Nur das Wesentliche."),
        -1: ("\n\nWICHTIG STIL: Eher kurz und prägnant. Knapp formulieren, "
             "auf das Wesentliche beschränken."),
        1: ("\n\nWICHTIG STIL: Etwas ausführlicher als normal. Stichpunkte zu kurzen Sätzen ausformulieren, "
            "klinische Details ergänzen wo sinnvoll."),
        2: ("\n\nWICHTIG STIL: Ausführlich. Vollständige Sätze, detaillierte klinische Beschreibungen, "
            "differentialdiagnostische Überlegungen wo relevant."),
        3: ("\n\nWICHTIG STIL: Sehr ausführlich und detailliert. Vollständige Sätze, "
            "ausführliche klinische Beschreibungen, differentialdiagnostische Überlegungen, "
            "detaillierte Therapiebegründungen. Umfassende Dokumentation."),
    }
    if level == 0:
        return ""
    # Clamp everything beyond the outermost steps onto those steps.
    if level <= -3:
        step = -3
    elif level >= 3:
        step = 3
    else:
        step = level
    return instructions.get(step, "")
# ─── SOAP-Abschnitts-Detailstufen (S, O, D einzeln steuerbar) ───
def _soap_section_levels_path():
    """Return the path of the per-section SOAP levels file in the writable data dir."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, SOAP_SECTION_LEVELS_CONFIG_FILENAME)
def load_soap_section_levels() -> dict:
    """Load the per-section SOAP detail levels.

    Returns one integer per key in ``_SOAP_SECTIONS``; keys missing from the
    stored JSON object default to 0. When the file is missing, unreadable or
    not a JSON object, every section is 0.
    """
    try:
        levels_file = _soap_section_levels_path()
        if os.path.isfile(levels_file):
            with open(levels_file, "r", encoding="utf-8") as handle:
                stored = json.load(handle)
            if isinstance(stored, dict):
                loaded = {}
                for section in _SOAP_SECTIONS:
                    loaded[section] = int(stored.get(section, 0))
                return loaded
    except Exception:
        pass
    return dict.fromkeys(_SOAP_SECTIONS, 0)
def save_soap_section_levels(levels: dict) -> None:
    """Persist the per-section SOAP detail levels as JSON.

    Only the keys in ``_SOAP_SECTIONS`` are written (missing ones as 0);
    errors are swallowed.
    """
    try:
        target = _soap_section_levels_path()
        with open(target, "w", encoding="utf-8") as handle:
            payload = {}
            for section in _SOAP_SECTIONS:
                payload[section] = int(levels.get(section, 0))
            json.dump(payload, handle)
    except Exception:
        pass
def get_soap_section_instruction(levels: dict) -> str:
    """Build the prompt suffix for the individual SOAP section levels.

    Each section in ``_SOAP_SECTIONS`` with a non-zero level contributes one
    bullet line; levels beyond -3/3 are treated like -3/3. Returns an empty
    string when no section contributes a line.
    """
    wording = {
        -3: "Maximal komprimiert nur die wichtigsten 1-2 Stichworte pro Punkt. Vorhandene Informationen beibehalten, nur kürzer formulieren.",
        -2: "Deutlich kürzer formulieren gleiche Fakten, aber knapper auf den Punkt gebracht.",
        -1: "Leicht kürzer formulieren gleicher Inhalt, etwas knappere Wortwahl.",
        1: "Leicht ausführlicher formulieren gleiche Fakten in vollständigeren Sätzen statt Stichpunkten. KEINE neuen Informationen erfinden.",
        2: "Ausführlicher formulieren vorhandene Stichpunkte in ganzen Sätzen ausformulieren. NUR vorhandene Informationen verwenden, NICHTS dazuerfinden.",
        3: "In vollständigen, ausführlichen Sätzen ausformulieren. Alle vorhandenen Punkte in Fliesstext umwandeln. STRIKT NUR vorhandene Informationen verwenden KEINE neuen Fakten, Befunde oder Details erfinden.",
    }
    bullet_lines = []
    for section in _SOAP_SECTIONS:
        section_level = levels.get(section, 0)
        if section_level == 0:
            continue
        label = _SOAP_LABELS[section]
        if section_level <= -3:
            step = -3
        elif section_level >= 3:
            step = 3
        else:
            step = section_level
        text = wording.get(step)
        if text is not None:
            bullet_lines.append(f"- {label}: {text}")
    if bullet_lines:
        header = ("\n\nWICHTIG INDIVIDUELLE ABSCHNITTSLÄNGEN (zwingend einhalten):\n"
                  "ACHTUNG: Ausführlicher bedeutet NUR längere/vollständigere Formulierungen NIEMALS neue Fakten, "
                  "Befunde oder Details erfinden, die nicht im Original stehen!\n")
        return header + "\n".join(bullet_lines)
    return ""
def load_templates_text() -> str:
    """Read the stored template text (e.g. specialty/context for the AI).

    Returns the stripped file content, or an empty string when the file is
    missing or cannot be read.
    """
    try:
        templates_file = _templates_config_path()
        if os.path.isfile(templates_file):
            with open(templates_file, "r", encoding="utf-8") as handle:
                content = handle.read()
            return content.strip()
    except Exception:
        pass
    return ""
def save_templates_text(text: str) -> None:
    """Store the stripped template text; ``None`` is written as empty text.

    Write errors are swallowed.
    """
    try:
        target = _templates_config_path()
        with open(target, "w", encoding="utf-8") as handle:
            content = text or ""
            handle.write(content.strip())
    except Exception:
        pass
def strip_kg_warnings(text: str) -> str:
    """Remove typical warning/hint phrases from the KG output and tidy spacing.

    Parenthesised remarks containing one of the warning markers are dropped,
    "Therapie/Procedere" labels are normalised, and blank lines around
    bullets and section headers are normalised. The substitutions run in a
    fixed order; later ones rely on the output of earlier ones.
    """
    import re

    warning_markers = (
        "dokumentiert",
        "Weiterverordnung",
        "Überprüfung erforderlich",
    )

    def _drop_if_warning(match):
        # Keep the parenthesis unless its content carries a warning marker.
        inner = match.group(1)
        for marker in warning_markers:
            if marker in inner:
                return ""
        return match.group(0)

    cleaned = re.sub(r"\(([^)]*)\)", _drop_if_warning, text)
    cleaned = re.sub(r"(?<=\S) +", " ", cleaned)
    cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)

    # Normalise the Therapie/Procedere format.
    cleaned = re.sub(r"Therapie/Procedere\s*:", "Therapie:", cleaned, flags=re.IGNORECASE)
    for label_prefix in (r"^-\s*Therapie\s*:\s*", r"^-\s*Procedere\s*:\s*"):
        cleaned = re.sub(label_prefix, "- ", cleaned, flags=re.MULTILINE | re.IGNORECASE)
    cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)

    headers = (
        r"Anamnese|Subjektiv|Sozialanamnese|Familienanamnese|"
        r"Objektiv|Beurteilung|Diagnose|Diagnosen|Therapie|Procedere"
    )
    bullet = r"[ \t]{0,6}[\u2022\-\u2013]"

    # Close blank lines between consecutive bullet lines (repeated, because
    # one pass cannot join overlapping neighbours).
    bullet_gap = re.compile(
        r"(^" + bullet + r".*)\n\n+(" + bullet + r")",
        flags=re.MULTILINE)
    for _ in range(5):
        cleaned = bullet_gap.sub(r"\1\n\2", cleaned)

    # Close the blank line between a section header and its first bullet.
    header_gap = re.compile(
        r"(?m)^((?:" + headers + r")(?:\s*\(.*?\))?:?\s*)\n\s*\n(?=" + bullet + r")")
    for _ in range(3):
        cleaned = header_gap.sub(r"\1\n", cleaned)

    # Ensure a blank line in front of every section header line.
    before_header = re.compile(
        r"(?m)([^\n])\n(?=(?:" + headers + r")(?:\s*\(.*?\))?:?\s*$)")
    cleaned = before_header.sub(r"\1\n\n", cleaned)

    cleaned = re.sub(r"^[ \t]*(\u2022)", r" \1", cleaned, flags=re.MULTILINE)
    return cleaned.strip()
def _is_warning_comment(text: str) -> bool:
    """Return True when the lowercased, stripped text contains any entry of ``COMMENT_KEYWORDS``."""
    normalized = text.lower().strip()
    for keyword in COMMENT_KEYWORDS:
        if keyword in normalized:
            return True
    return False
def _is_icd10_code(text: str) -> bool:
"""True, wenn der Klammer-Text ein ICD-10-GM-Code ist (z. B. L57.0, M79.1). Diese bleiben in der KG."""
import re
t = text.strip()
return bool(re.match(r"^[A-Z][0-9]{2}(\.[0-9]{1,2})?$", t, re.IGNORECASE))
def extract_kg_comments(text: str) -> tuple:
    """Strip parenthesised remarks from the KG text, keeping ICD-10 codes.

    Every ``(...)`` group that is not an ICD-10 code is removed from its
    line. Removed remarks classified as warnings are collected as
    ``"- <line without remarks>: <remark>"`` entries.

    Returns:
        ``(cleaned_text, comments_text)``; ``comments_text`` is an empty
        string when no warning remark was found.
    """
    import re

    paren_group = re.compile(r"\(([^)]*)\)")
    kept_lines = []
    warnings = []
    for raw_line in text.split("\n"):
        removed = []

        def _keep_or_collect(match, _removed=removed):
            inner = match.group(1).strip()
            if _is_icd10_code(inner):
                return match.group(0)
            if inner:
                _removed.append(inner)
            return ""

        stripped_line = paren_group.sub(_keep_or_collect, raw_line)
        stripped_line = re.sub(r" +", " ", stripped_line).strip()
        if removed:
            # The line itself (minus a leading bullet) labels its remarks.
            label = stripped_line.strip()
            if label.startswith("- "):
                label = label[2:].strip()
            warnings.extend(
                f"- {label}: {remark}" for remark in removed if _is_warning_comment(remark)
            )
        kept_lines.append(stripped_line)
    cleaned = re.sub(r"\n{3,}", "\n\n", "\n".join(kept_lines)).strip()
    comments_text = "\n".join(warnings) if warnings else ""
    return cleaned, comments_text
# ─── SOAP-Reihenfolge ───
def _soap_order_config_path():
    """Return the path of the legacy SOAP order file in the writable data dir."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, SOAP_ORDER_CONFIG_FILENAME)
def _legacy_load_soap_order() -> list:
    """Read the old single-file SOAP order configuration.

    Returns the stored list, or ``None`` when the file is missing,
    unreadable or does not contain a JSON list.
    """
    try:
        legacy_file = _soap_order_config_path()
        if os.path.isfile(legacy_file):
            with open(legacy_file, "r", encoding="utf-8") as handle:
                stored = json.load(handle)
            if isinstance(stored, list):
                return stored
    except Exception:
        pass
    return None
def get_soap_order_instruction(order: list, visibility=None) -> str:
    """Build the prompt suffix for a user-defined SOAP section order.

    Sections hidden via ``visibility`` are left out first. When the
    remaining order equals ``DEFAULT_SOAP_ORDER`` no instruction is needed
    and an empty string is returned.
    """
    if visibility:
        shown = [key for key in order if visibility.get(key, True)]
    else:
        shown = order
    if shown == DEFAULT_SOAP_ORDER:
        return ""
    numbered_lines = []
    for position, key in enumerate(shown, start=1):
        label = _SOAP_LABELS.get(key, key)
        numbered_lines.append(f" {position}. {label}")
    numbered = "\n".join(numbered_lines)
    return (
        "\n\nWICHTIG BENUTZERDEFINIERTE ABSCHNITTS-REIHENFOLGE (zwingend einhalten):\n"
        "Ordne die Abschnitte der Krankengeschichte EXAKT in folgender Reihenfolge:\n"
        f"{numbered}\n"
        "Abschnitte, die nicht vorhanden sind, weglassen aber die Reihenfolge der vorhandenen Abschnitte MUSS dieser Vorgabe entsprechen."
    )
# ─── SOAP-Sichtbarkeit ───
def _soap_visibility_config_path():
    """Return the path of the legacy SOAP visibility file in the writable data dir."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, SOAP_VISIBILITY_CONFIG_FILENAME)
def _legacy_load_soap_visibility() -> dict:
    """Read the old single-file SOAP visibility configuration.

    Returns a bool per key in ``DEFAULT_SOAP_ORDER`` (missing keys count as
    visible), or ``None`` when the file is missing, unreadable or not a
    JSON object.
    """
    try:
        legacy_file = _soap_visibility_config_path()
        if os.path.isfile(legacy_file):
            with open(legacy_file, "r", encoding="utf-8") as handle:
                stored = json.load(handle)
            if isinstance(stored, dict):
                flags = {}
                for key in DEFAULT_SOAP_ORDER:
                    flags[key] = bool(stored.get(key, True))
                return flags
    except Exception:
        pass
    return None
def get_soap_visibility_instruction(visibility: dict) -> str:
    """Build the prompt suffix that lists hidden SOAP sections.

    Returns an empty string when every section in ``DEFAULT_SOAP_ORDER`` is
    visible.
    """
    hidden_labels = []
    for key in DEFAULT_SOAP_ORDER:
        if not visibility.get(key, True):
            hidden_labels.append(_SOAP_LABELS.get(key, key))
    if hidden_labels:
        hidden_str = ", ".join(hidden_labels)
        return (
            f"\n\nWICHTIG AUSGEBLENDETE ABSCHNITTE (zwingend einhalten):\n"
            f"Folgende Abschnitte dürfen NICHT in der Krankengeschichte erscheinen: {hidden_str}.\n"
            f"Lasse diese Abschnitte komplett weg keine Überschrift, kein Inhalt."
        )
    return ""
# ─── SOAP-Profile (KG) ───
def _soap_presets_path():
    """Return the path of the SOAP presets file in the writable data dir."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, SOAP_PRESETS_CONFIG_FILENAME)
def _default_soap_presets():
    """Build the default presets structure.

    Creates ``NUM_SOAP_PRESETS`` profiles named "Profil 1", "Profil 2", …,
    each with its own copy of the default order and every section visible;
    the first profile is active.
    """
    presets = []
    for number in range(1, NUM_SOAP_PRESETS + 1):
        presets.append({
            "name": f"Profil {number}",
            "order": list(DEFAULT_SOAP_ORDER),
            "visibility": dict.fromkeys(DEFAULT_SOAP_ORDER, True),
        })
    return {"active": 0, "presets": presets}
def load_soap_presets() -> dict:
    """Load the SOAP presets, completing stored entries where needed.

    When the presets file holds a JSON object with a "presets" key, every
    preset gets any key of ``DEFAULT_SOAP_ORDER`` it lacks: missing sections
    are inserted at the front of its "order" and marked visible. Presets
    that lack the "order" or "visibility" entry altogether are repaired in
    the same way instead of invalidating the whole file.

    Otherwise the defaults are returned; if legacy single-file settings
    exist, they are migrated into the first default preset.

    Returns:
        A dict with the keys "active" and "presets".
    """
    try:
        path = _soap_presets_path()
        if os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            if isinstance(data, dict) and "presets" in data:
                for preset in data["presets"]:
                    # setdefault rather than indexing: indexing a missing
                    # key raises KeyError, which the except below would turn
                    # into a silent reset of all presets to the defaults.
                    order = preset.setdefault("order", [])
                    visibility = preset.setdefault("visibility", {})
                    existing = set(order)
                    for key in DEFAULT_SOAP_ORDER:
                        if key not in existing:
                            order.insert(0, key)
                        if key not in visibility:
                            visibility[key] = True
                return data
    except Exception:
        pass
    defaults = _default_soap_presets()
    legacy_order = _legacy_load_soap_order()
    legacy_vis = _legacy_load_soap_visibility()
    if legacy_order or legacy_vis:
        first_preset = defaults["presets"][0]
        if legacy_order:
            for key in DEFAULT_SOAP_ORDER:
                if key not in legacy_order:
                    legacy_order.insert(0, key)
            first_preset["order"] = legacy_order
        if legacy_vis:
            first_preset["visibility"] = legacy_vis
    return defaults
def save_soap_presets(data: dict) -> None:
    """Write the SOAP presets as indented, non-ASCII-escaped JSON; errors are swallowed."""
    try:
        target = _soap_presets_path()
        with open(target, "w", encoding="utf-8") as handle:
            json.dump(data, handle, ensure_ascii=False, indent=2)
    except Exception:
        pass
def get_active_soap_preset(data: dict = None) -> dict:
    """Return the preset selected by ``data["active"]``.

    ``data`` is loaded from disk when omitted. When the active index is out
    of range, a fresh preset with the default order and all sections
    visible is returned.
    """
    source = load_soap_presets() if data is None else data
    active_index = source.get("active", 0)
    available = source.get("presets", [])
    if not (0 <= active_index < len(available)):
        return {
            "order": list(DEFAULT_SOAP_ORDER),
            "visibility": {key: True for key in DEFAULT_SOAP_ORDER},
        }
    return available[active_index]
def load_soap_order() -> list:
    """Return a copy of the SOAP section order of the active profile."""
    active = get_active_soap_preset()
    stored_order = active.get("order", DEFAULT_SOAP_ORDER)
    return list(stored_order)
def load_soap_visibility() -> dict:
    """Return the SOAP section visibility of the active profile.

    One bool per key in ``DEFAULT_SOAP_ORDER``; sections without a stored
    flag count as visible.
    """
    stored_flags = get_active_soap_preset().get("visibility", {})
    flags = {}
    for key in DEFAULT_SOAP_ORDER:
        flags[key] = bool(stored_flags.get(key, True))
    return flags
# ─── Brief-Profile ───
def _brief_presets_path():
    """Return the path of the letter presets file in the writable data dir."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, BRIEF_PRESETS_CONFIG_FILENAME)
def _default_brief_presets():
    """Return the default letter presets as deep copies of ``BRIEF_PROFILE_DEFAULTS``."""
    import copy

    presets = []
    for template in BRIEF_PROFILE_DEFAULTS:
        presets.append(copy.deepcopy(template))
    return {"active": 0, "presets": presets}
def load_brief_presets() -> dict:
    """Load the letter presets.

    Returns the stored JSON object when it has a "presets" key; otherwise
    (file missing, unreadable or malformed) the default presets.
    """
    try:
        presets_file = _brief_presets_path()
        if os.path.isfile(presets_file):
            with open(presets_file, "r", encoding="utf-8") as handle:
                stored = json.load(handle)
            if isinstance(stored, dict) and "presets" in stored:
                return stored
    except Exception:
        pass
    return _default_brief_presets()
def save_brief_presets(data: dict) -> None:
    """Write the letter presets as indented, non-ASCII-escaped JSON; errors are swallowed."""
    try:
        target = _brief_presets_path()
        with open(target, "w", encoding="utf-8") as handle:
            json.dump(data, handle, ensure_ascii=False, indent=2)
    except Exception:
        pass
def get_active_brief_preset() -> dict:
    """Return the active letter preset.

    Falls back to a deep copy of the first entry of
    ``BRIEF_PROFILE_DEFAULTS`` when the stored active index is out of range.
    """
    stored = load_brief_presets()
    active_index = stored.get("active", 0)
    available = stored.get("presets", [])
    if not (0 <= active_index < len(available)):
        import copy
        return copy.deepcopy(BRIEF_PROFILE_DEFAULTS[0])
    return available[active_index]
def get_brief_order_instruction() -> str:
    """Build the prompt suffix for section order and formatting of a letter.

    Uses the active letter preset. The visible sections are listed in
    order; hidden sections, the diagnosis section ("DI") and the
    full-sentence sections each add one further instruction line when
    present. Returns an empty string when no section is visible.
    """
    preset = get_active_brief_preset()
    order = preset.get("order", [])
    labels = preset.get("labels", {})
    flags = preset.get("visibility", {})

    shown_keys = [key for key in order if flags.get(key, True)]
    if not shown_keys:
        return ""

    numbered_lines = []
    for position, key in enumerate(shown_keys, start=1):
        numbered_lines.append(f" {position}. {labels.get(key, key)}")
    numbered = "\n".join(numbered_lines)

    hidden_labels = [labels.get(key, key) for key in order if not flags.get(key, True)]
    diagnosis_keys = {"DI"}
    prose_keys = {"AN", "BE", "ZF", "EP", "AE", "VL"}
    diagnosis_labels = [labels.get(key, key) for key in shown_keys if key in diagnosis_keys]
    prose_labels = [labels.get(key, key) for key in shown_keys if key in prose_keys]

    instruction = [
        "\n\nWICHTIG BRIEF-ABSCHNITTSREIHENFOLGE UND FORMATIERUNG (zwingend einhalten):\n"
        "Verwende EXAKT folgende Abschnitte in dieser Reihenfolge als Überschriften:\n"
        f"{numbered}\n"
        "Abschnitte, die nicht vorhanden sind, weglassen."
    ]
    if hidden_labels:
        hidden_joined = ", ".join(hidden_labels)
        instruction.append(
            f"Folgende Abschnitte NICHT im Brief verwenden: {hidden_joined}."
        )
    if diagnosis_labels:
        diagnosis_joined = ", ".join(diagnosis_labels)
        instruction.append(
            f"FORMATIERUNG Diagnosen ({diagnosis_joined}): Stichwortartig als Aufzählung "
            "jede Diagnose eine Zeile mit ICD-10-GM-Code in eckigen Klammern."
        )
    if prose_labels:
        prose_joined = ", ".join(prose_labels)
        instruction.append(
            f"FORMATIERUNG ({prose_joined}): In vollständigen, ausformulierten Sätzen schreiben "
            "wie in einem ärztlichen Brief üblich. Keine reinen Stichpunkte."
        )
    return "\n".join(instruction)
# ─── Launcher-Startpräferenz ───
def _launcher_config_path():
    """Return the path of the launcher preferences file in the writable data dir."""
    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, LAUNCHER_CONFIG_FILENAME)
def load_launcher_prefs() -> dict:
    """Load the launcher start preference.

    Returns ``{"default_module": ..., "auto_open": bool}``; the defaults are
    an empty module name and ``False`` when the file is missing, unreadable
    or not a JSON object.
    """
    try:
        prefs_file = _launcher_config_path()
        if os.path.isfile(prefs_file):
            with open(prefs_file, "r", encoding="utf-8") as handle:
                stored = json.load(handle)
            if isinstance(stored, dict):
                module_name = stored.get("default_module", "")
                auto_open = bool(stored.get("auto_open", False))
                return {"default_module": module_name, "auto_open": auto_open}
    except Exception:
        pass
    return {"default_module": "", "auto_open": False}
def save_launcher_prefs(default_module: str, auto_open: bool) -> None:
    """Store the launcher start preference as indented JSON; errors are swallowed."""
    try:
        target = _launcher_config_path()
        with open(target, "w", encoding="utf-8") as handle:
            prefs = {"default_module": default_module, "auto_open": auto_open}
            json.dump(prefs, handle, indent=2)
    except Exception:
        pass
# ─── Briefstil-Profile ───
# Built-in style profile names, listed ahead of the user-defined profiles.
SYSTEM_STYLE_PROFILES = ["Klinischer Bericht", "KISIM Bericht"]


def get_all_style_profile_choices() -> list:
    """Return the full profile list: "(keins)", system profiles, then user profiles.

    The bookkeeping key "_active_profile" of the stored profiles is not
    offered as a choice.
    """
    stored = load_brief_style_profiles()
    choices = ["(keins)"]
    choices.extend(SYSTEM_STYLE_PROFILES)
    choices.extend(name for name in stored if name != "_active_profile")
    return choices
def _brief_style_profiles_path():
    """Return the path of the letter style profiles file in the writable data dir."""
    from aza_config import BRIEF_STYLE_PROFILES_FILENAME

    data_dir = get_writable_data_dir()
    return os.path.join(data_dir, BRIEF_STYLE_PROFILES_FILENAME)
def load_brief_style_profiles() -> dict:
    """Load all stored letter style profiles.

    Format: ``{profile_name: {...}}``. Returns an empty dict when the file
    is missing, unreadable or not a JSON object.
    """
    try:
        profiles_file = _brief_style_profiles_path()
        if os.path.isfile(profiles_file):
            with open(profiles_file, "r", encoding="utf-8") as handle:
                stored = json.load(handle)
            if isinstance(stored, dict):
                return stored
    except Exception:
        pass
    return {}
def save_brief_style_profiles(profiles: dict) -> None:
    """Write all letter style profiles as indented, non-ASCII-escaped JSON; errors are swallowed."""
    try:
        target = _brief_style_profiles_path()
        with open(target, "w", encoding="utf-8") as handle:
            json.dump(profiles, handle, indent=2, ensure_ascii=False)
    except Exception:
        pass
def get_active_brief_style_profile_name() -> str:
    """Return the name of the active style profile (empty string = none active)."""
    stored = load_brief_style_profiles()
    active_name = stored.get("_active_profile", "")
    return active_name
def set_active_brief_style_profile(name: str) -> None:
    """Record ``name`` as the active style profile and save the profiles file."""
    stored = load_brief_style_profiles()
    stored.update({"_active_profile": name})
    save_brief_style_profiles(stored)
def get_active_brief_style_prompt() -> str:
    """Return the style prompt of the active profile.

    Returns an empty string when no profile is active, or when the active
    name has no stored entry or no "style_prompt".
    """
    stored = load_brief_style_profiles()
    active_name = stored.get("_active_profile", "")
    if active_name:
        return stored.get(active_name, {}).get("style_prompt", "")
    return ""
def extract_text_from_docx(file_path: str) -> str:
    """Extract the text of all non-blank paragraphs from a DOCX file.

    Paragraphs are joined with newlines. Any failure (including a missing
    ``docx`` package) is reported as an error marker string instead of
    being raised.
    """
    try:
        from docx import Document

        document = Document(file_path)
        kept = []
        for paragraph in document.paragraphs:
            if paragraph.text.strip():
                kept.append(paragraph.text)
        return "\n".join(kept)
    except Exception as exc:
        return f"[FEHLER beim Lesen von {os.path.basename(file_path)}: {exc}]"
def extract_texts_from_docx_files(file_paths: list) -> list:
    """Extract the text of several DOCX files.

    Returns a list of ``(file name, text)`` tuples in input order.
    """
    return [
        (os.path.basename(docx_path), extract_text_from_docx(docx_path))
        for docx_path in file_paths
    ]