# -*- coding: utf-8 -*-
"""Standalone persistence/load-save utilities extracted from basis14.py."""
|
||
|
||
import hashlib
import html
import json
import os
import re
import sys
import threading
import time
from datetime import datetime, timedelta
from difflib import SequenceMatcher
from tkinter import messagebox

from aza_config import (
    CONFIG_FILENAME,
    WINDOW_CONFIG_FILENAME,
    SIGNATURE_CONFIG_FILENAME,
    KORREKTUREN_CONFIG_FILENAME,
    ABLAGE_BASE_DIR,
    ABLAGE_SUBFOLDERS,
    ABLAGE_LABELS,
    PRUEFEN_WINDOW_CONFIG_FILENAME,
    ORDNER_WINDOW_CONFIG_FILENAME,
    TEXT_WINDOW_CONFIG_FILENAME,
    DIKTAT_WINDOW_CONFIG_FILENAME,
    DISKUSSION_WINDOW_CONFIG_FILENAME,
    SETTINGS_WINDOW_CONFIG_FILENAME,
    TEXTBLOECKE_CONFIG_FILENAME,
    TEMPLATES_CONFIG_FILENAME,
    OP_BERICHT_TEMPLATE_CONFIG_FILENAME,
    ARZTBRIEF_VORLAGE_CONFIG_FILENAME,
    TODO_CONFIG_FILENAME,
    TODO_WINDOW_CONFIG_FILENAME,
    TODO_INBOX_CONFIG_FILENAME,
    TODO_SETTINGS_CONFIG_FILENAME,
    NOTES_CONFIG_FILENAME,
    CHECKLIST_CONFIG_FILENAME,
    USER_PROFILE_CONFIG_FILENAME,
    OPACITY_CONFIG_FILENAME,
    AUTOTEXT_CONFIG_FILENAME,
    FONT_SCALE_CONFIG_FILENAME,
    BUTTON_SCALE_CONFIG_FILENAME,
    TOKEN_USAGE_CONFIG_FILENAME,
    KG_DETAIL_LEVEL_CONFIG_FILENAME,
    SOAP_SECTION_LEVELS_CONFIG_FILENAME,
    FONT_SIZES_CONFIG_FILENAME,
    PANED_POSITIONS_CONFIG_FILENAME,
    DEFAULT_OPACITY,
    MIN_OPACITY,
    DEFAULT_FONT_SCALE,
    MIN_FONT_SCALE,
    MAX_FONT_SCALE,
    DEFAULT_BUTTON_SCALE,
    MIN_BUTTON_SCALE,
    MAX_BUTTON_SCALE,
    _SOAP_SECTIONS,
    _SOAP_LABELS,
    _DEFAULT_KORREKTUREN,
    ARZTBRIEF_VORLAGE_DEFAULT,
    KOGU_GRUSS_OPTIONS,
    KOGU_GRUSS_CONFIG_FILENAME,
    KOGU_TEMPLATES_CONFIG_FILENAME,
    DISKUSSION_VORLAGE_CONFIG_FILENAME,
    ALLOWED_SUMMARY_MODELS,
    DEFAULT_SUMMARY_MODEL,
    COMMENT_KEYWORDS,
    _SUPABASE_URL,
    _SUPABASE_ANON_KEY,
    SOAP_ORDER_CONFIG_FILENAME,
    SOAP_VISIBILITY_CONFIG_FILENAME,
    SOAP_PRESETS_CONFIG_FILENAME,
    DEFAULT_SOAP_ORDER,
    NUM_SOAP_PRESETS,
    BRIEF_PRESETS_CONFIG_FILENAME,
    NUM_BRIEF_PRESETS,
    BRIEF_PROFILE_DEFAULTS,
    LAUNCHER_CONFIG_FILENAME,
    DEFAULT_TOKEN_QUOTA,
    SOFT_LOCK_THRESHOLD,
    AVG_TOKENS_PER_REPORT,
    get_writable_data_dir,
)
|
||
|
||
|
||
def _config_path():
    """Return the absolute path of the main configuration file."""
    base_dir = get_writable_data_dir()
    return os.path.join(base_dir, CONFIG_FILENAME)
|
||
|
||
|
||
def _window_config_path():
    """Return the absolute path of the main-window geometry file."""
    base_dir = get_writable_data_dir()
    return os.path.join(base_dir, WINDOW_CONFIG_FILENAME)
|
||
|
||
|
||
def _clamp_geometry_str(geom: str, min_w: int, min_h: int) -> str:
|
||
"""Begrenzt gespeicherte Geometrie-String auf Mindestgröße (alle Buttons sichtbar)."""
|
||
if not geom or "x" not in geom:
|
||
return f"{min_w}x{min_h}"
|
||
parts = geom.replace("+", "x").split("x")
|
||
try:
|
||
w = max(min_w, int(parts[0].strip()))
|
||
h = max(min_h, int(parts[1].strip()))
|
||
if len(parts) >= 4:
|
||
return f"{w}x{h}+{parts[2].strip()}+{parts[3].strip()}"
|
||
return f"{w}x{h}"
|
||
except (ValueError, IndexError):
|
||
return f"{min_w}x{min_h}"
|
||
|
||
|
||
def load_window_geometry():
    """Read the persisted main-window geometry.

    Returns (width, height, x, y, sash_h, sash_v) where the two sash values
    may be None, or None when nothing usable is stored.
    """
    try:
        cfg = _window_config_path()
        if not os.path.isfile(cfg):
            return None
        with open(cfg, "r", encoding="utf-8") as fh:
            fields = fh.read().strip().split()
        if len(fields) < 4:
            return None
        w, h, x, y = (int(v) for v in fields[:4])
        # Reject implausibly small windows (would hide controls).
        if w < 400 or h < 300:
            return None
        sash_h = int(fields[4]) if len(fields) > 4 else None
        sash_v = int(fields[5]) if len(fields) > 5 else None
        return (w, h, x, y, sash_h, sash_v)
    except Exception:
        # Corrupt or unreadable file -> caller falls back to defaults.
        return None
|
||
|
||
|
||
def save_window_geometry(
    width: int, height: int, x: int, y: int, sash: int = None, sash_transcript: int = None
) -> None:
    """Persist main-window size, position, sash (width) and transcript height.

    The transcript sash is only written when the width sash is also given,
    matching the fixed field order of the config file.
    """
    fields = [width, height, x, y]
    if sash is not None:
        fields.append(sash)
        if sash_transcript is not None:
            fields.append(sash_transcript)
    line = " ".join(str(v) for v in fields)
    try:
        with open(_window_config_path(), "w", encoding="utf-8") as fh:
            fh.write(line + "\n")
    except Exception:
        pass  # best effort -- losing the geometry is not fatal
|
||
|
||
|
||
def reset_all_window_positions() -> int:
    """Delete all stored window-position files so every window opens centered
    on the next start.

    Also removes the KG detail-level and SOAP section-level configs (these
    are not counted). Returns the number of geometry files actually deleted.
    (The original annotation said ``-> None`` although the function has
    always returned this count.)
    """
    base_dir = get_writable_data_dir()
    # Known window-geometry config files.
    geometry_files = [
        WINDOW_CONFIG_FILENAME,
        PRUEFEN_WINDOW_CONFIG_FILENAME,
        ORDNER_WINDOW_CONFIG_FILENAME,
        TEXT_WINDOW_CONFIG_FILENAME,
        DIKTAT_WINDOW_CONFIG_FILENAME,
        DISKUSSION_WINDOW_CONFIG_FILENAME,
        SETTINGS_WINDOW_CONFIG_FILENAME,
        TODO_WINDOW_CONFIG_FILENAME,
        PANED_POSITIONS_CONFIG_FILENAME,
    ]
    # Also the generic toplevel geometry files (autotext, interaktionscheck,
    # ki_kontrolle, etc.), recognised by their naming pattern.
    for fname in os.listdir(base_dir):
        if fname.startswith("kg_diktat_") and fname.endswith("_geometry.txt"):
            geometry_files.append(fname)
    # Reset KG detail level and SOAP section levels as well.
    for cfg_name in (KG_DETAIL_LEVEL_CONFIG_FILENAME, SOAP_SECTION_LEVELS_CONFIG_FILENAME):
        cfg_path = os.path.join(base_dir, cfg_name)
        try:
            if os.path.isfile(cfg_path):
                os.remove(cfg_path)
        except Exception:
            pass  # best effort
    # Delete every geometry file, counting successes.
    deleted = 0
    for fname in geometry_files:
        path = os.path.join(base_dir, fname)
        try:
            if os.path.isfile(path):
                os.remove(path)
                deleted += 1
        except Exception:
            pass  # locked or already gone
    return deleted
|
||
|
||
|
||
def _opacity_config_path():
    """Return the absolute path of the window-opacity config file."""
    base_dir = get_writable_data_dir()
    return os.path.join(base_dir, OPACITY_CONFIG_FILENAME)
|
||
|
||
|
||
def load_opacity() -> float:
    """Return the stored window opacity, clamped to [MIN_OPACITY, 1.0].

    Falls back to DEFAULT_OPACITY when no valid value is stored.
    """
    try:
        cfg = _opacity_config_path()
        if os.path.isfile(cfg):
            with open(cfg, "r", encoding="utf-8") as fh:
                stored = float(fh.read().strip())
            return min(1.0, max(MIN_OPACITY, stored))
    except Exception:
        pass  # unreadable or corrupt -> default
    return DEFAULT_OPACITY
|
||
|
||
|
||
def save_opacity(value: float) -> None:
    """Persist the window opacity, clamped to the allowed range (best effort)."""
    try:
        clamped = min(1.0, max(MIN_OPACITY, value))
        with open(_opacity_config_path(), "w", encoding="utf-8") as fh:
            fh.write(str(clamped))
    except Exception:
        pass  # best effort
|
||
|
||
|
||
def _autotext_config_path():
    """Return the absolute path of the autotext/settings config file."""
    base_dir = get_writable_data_dir()
    return os.path.join(base_dir, AUTOTEXT_CONFIG_FILENAME)
|
||
|
||
|
||
def _font_scale_config_path():
    """Return the absolute path of the font-scale config file."""
    base_dir = get_writable_data_dir()
    return os.path.join(base_dir, FONT_SCALE_CONFIG_FILENAME)
|
||
|
||
|
||
def _button_scale_config_path():
    """Return the absolute path of the button-scale config file."""
    base_dir = get_writable_data_dir()
    return os.path.join(base_dir, BUTTON_SCALE_CONFIG_FILENAME)
|
||
|
||
|
||
def load_font_scale() -> float:
    """Return the stored font scaling factor, clamped to
    [MIN_FONT_SCALE, MAX_FONT_SCALE]; falls back to DEFAULT_FONT_SCALE."""
    try:
        cfg = _font_scale_config_path()
        if os.path.isfile(cfg):
            with open(cfg, "r", encoding="utf-8") as fh:
                stored = float(fh.read().strip())
            return min(MAX_FONT_SCALE, max(MIN_FONT_SCALE, stored))
    except Exception:
        pass  # unreadable or corrupt -> default
    return DEFAULT_FONT_SCALE
|
||
|
||
|
||
def save_font_scale(value: float) -> None:
    """Persist the font scaling factor, clamped to the allowed range (best effort)."""
    try:
        clamped = min(MAX_FONT_SCALE, max(MIN_FONT_SCALE, value))
        with open(_font_scale_config_path(), "w", encoding="utf-8") as fh:
            fh.write(str(clamped))
    except Exception:
        pass  # best effort
|
||
|
||
|
||
def load_button_scale() -> float:
    """Return the stored button scaling factor, clamped to
    [MIN_BUTTON_SCALE, MAX_BUTTON_SCALE]; falls back to DEFAULT_BUTTON_SCALE."""
    try:
        cfg = _button_scale_config_path()
        if os.path.isfile(cfg):
            with open(cfg, "r", encoding="utf-8") as fh:
                stored = float(fh.read().strip())
            return min(MAX_BUTTON_SCALE, max(MIN_BUTTON_SCALE, stored))
    except Exception:
        pass  # unreadable or corrupt -> default
    return DEFAULT_BUTTON_SCALE
|
||
|
||
|
||
def save_button_scale(value: float) -> None:
    """Persist the button scaling factor, clamped to the allowed range (best effort)."""
    try:
        clamped = min(MAX_BUTTON_SCALE, max(MIN_BUTTON_SCALE, value))
        with open(_button_scale_config_path(), "w", encoding="utf-8") as fh:
            fh.write(str(clamped))
    except Exception:
        pass  # best effort
|
||
|
||
# (Text-widget font sizes + add_text_font_size_control live in aza_ui_helpers.py)
|
||
|
||
|
||
def _token_usage_config_path():
    """Return the absolute path of the token-usage config file."""
    base_dir = get_writable_data_dir()
    return os.path.join(base_dir, TOKEN_USAGE_CONFIG_FILENAME)
|
||
|
||
|
||
def load_token_usage() -> dict:
    """Return the persisted token-usage record.

    Shape: {'used': int, 'total': int, 'budget_dollars': float,
    'used_dollars': float}. A default record is returned when the file is
    missing or unreadable.
    """
    try:
        cfg = _token_usage_config_path()
        if os.path.isfile(cfg):
            with open(cfg, "r", encoding="utf-8") as fh:
                return json.loads(fh.read().strip())
    except Exception:
        pass  # corrupt file -> defaults
    return {"used": 0, "total": 1000000, "budget_dollars": 0, "used_dollars": 0}
|
||
|
||
|
||
def save_token_usage(used: int = None, total: int = None, budget_dollars: float = None, used_dollars: float = None) -> None:
    """Merge the given (non-None) values into the stored token-usage record."""
    updates = {
        "used": used,
        "total": total,
        "budget_dollars": budget_dollars,
        "used_dollars": used_dollars,
    }
    try:
        record = load_token_usage()
        for key, value in updates.items():
            if value is not None:
                record[key] = value
        with open(_token_usage_config_path(), "w", encoding="utf-8") as fh:
            json.dump(record, fh)
    except Exception:
        pass  # best effort
|
||
|
||
|
||
def add_token_usage(tokens: int) -> None:
    """Add *tokens* to the consumed-token counter (best effort)."""
    try:
        consumed = load_token_usage().get("used", 0) + tokens
        save_token_usage(used=consumed)
    except Exception:
        pass  # best effort
|
||
|
||
|
||
def get_remaining_tokens() -> int:
    """Remaining AI units (token allowance), never negative."""
    record = load_token_usage()
    remaining = record.get("total", DEFAULT_TOKEN_QUOTA) - record.get("used", 0)
    return max(0, remaining)
|
||
|
||
|
||
def get_capacity_fraction() -> float:
    """Fraction of remaining capacity, clamped to [0.0, 1.0]."""
    record = load_token_usage()
    total = record.get("total", DEFAULT_TOKEN_QUOTA)
    # A non-positive quota means "unlimited" -> report full capacity.
    if total <= 0:
        return 1.0
    fraction = (total - record.get("used", 0)) / total
    return min(1.0, max(0.0, fraction))
|
||
|
||
|
||
def is_capacity_low() -> bool:
    """True when remaining capacity is at or below the soft-lock threshold."""
    remaining = get_remaining_tokens()
    return remaining <= SOFT_LOCK_THRESHOLD
|
||
|
||
|
||
def estimated_reports_remaining() -> int:
    """Estimated number of reports left, based on average token consumption."""
    remaining = get_remaining_tokens()
    # Guard against a zero/negative average (would divide by zero).
    return remaining // AVG_TOKENS_PER_REPORT if AVG_TOKENS_PER_REPORT > 0 else 0
|
||
|
||
|
||
def reset_token_allowance(total: int = None) -> None:
    """Reset the token allowance (admin function); usage counter returns to 0."""
    quota = DEFAULT_TOKEN_QUOTA if total is None else total
    save_token_usage(used=0, total=quota)
|
||
|
||
|
||
# ─── Installation location (anonymised) ──────────────────────────────────────

# Local cache file for the anonymised installation location (city/region/country).
_LOCATION_FILENAME = "aza_installation_location.json"
|
||
|
||
|
||
def _location_path() -> str:
    """Return the absolute path of the installation-location cache file."""
    base_dir = get_writable_data_dir()
    return os.path.join(base_dir, _LOCATION_FILENAME)
|
||
|
||
|
||
def log_installation_location() -> dict | None:
    """Look up the approximate location via IP (ip-api.com) and store it anonymised.

    Only city, region and country are stored -- never the IP address.
    Intended to run in a background thread; never raises, returns None on
    any failure.
    """
    import urllib.request
    try:
        request = urllib.request.Request(
            "http://ip-api.com/json/?fields=status,city,regionName,country,countryCode",
            headers={"User-Agent": "AZA-Desktop/1.0"},
        )
        response = urllib.request.urlopen(request, timeout=5)
        payload = json.loads(response.read().decode("utf-8"))
        if payload.get("status") != "success":
            return None
        location = {
            "city": payload.get("city", ""),
            "region": payload.get("regionName", ""),
            "country": payload.get("country", ""),
            "country_code": payload.get("countryCode", ""),
            "updated": datetime.now().isoformat(timespec="seconds"),
        }
        with open(_location_path(), "w", encoding="utf-8") as fh:
            json.dump(location, fh, ensure_ascii=False, indent=2)
        return location
    except Exception:
        return None
|
||
|
||
|
||
def load_installation_location() -> dict:
    """Return the cached installation location, or {} when unavailable."""
    try:
        cfg = _location_path()
        if os.path.isfile(cfg):
            with open(cfg, "r", encoding="utf-8") as fh:
                return json.load(fh)
    except Exception:
        pass  # unreadable -> empty
    return {}
|
||
|
||
|
||
def get_location_display() -> str:
    """Return the stored location as a readable "City, Region, Country" string."""
    loc = load_installation_location()
    if not loc.get("city"):
        return "Nicht ermittelt"
    parts = [loc.get("city", "")]
    parts.extend(loc[key] for key in ("region", "country") if loc.get(key))
    return ", ".join(parts)
|
||
|
||
|
||
_REGISTRY_URL = "https://aza-medwork.ch/api/installations"
|
||
|
||
|
||
def register_installation() -> int | None:
    """Anonymously register this installation with the AZA network.

    Returns the total count of unique installations, or None when the server
    is unreachable. The device ID is sent only as an anonymous SHA256-hash
    prefix; a successful count is cached locally.
    """
    import urllib.request
    import platform

    try:
        fingerprint = f"{platform.node()}-{platform.machine()}-{os.getlogin()}"
        device_hash = hashlib.sha256(fingerprint.encode()).hexdigest()[:16]
    except Exception:
        device_hash = "unknown"

    loc = load_installation_location()
    payload = json.dumps({
        "device_id": device_hash,
        "city": loc.get("city", ""),
        "country_code": loc.get("country_code", ""),
    }).encode("utf-8")

    try:
        request = urllib.request.Request(
            _REGISTRY_URL,
            data=payload,
            headers={
                "Content-Type": "application/json",
                "User-Agent": "AZA-Desktop/1.0",
            },
            method="POST",
        )
        response = urllib.request.urlopen(request, timeout=5)
        reply = json.loads(response.read().decode("utf-8"))
        count = reply.get("total_installations")
        if isinstance(count, int):
            _save_cached_install_count(count)
            return count
    except Exception:
        pass  # offline or server error
    return None
|
||
|
||
|
||
def _cached_install_count_path() -> str:
    """Return the absolute path of the cached installation-count file."""
    base_dir = get_writable_data_dir()
    return os.path.join(base_dir, "install_count_cache.json")
|
||
|
||
|
||
def _save_cached_install_count(count: int):
|
||
try:
|
||
path = _cached_install_count_path()
|
||
with open(path, "w", encoding="utf-8") as f:
|
||
json.dump({"count": count, "updated": datetime.now().isoformat()}, f)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def get_install_count() -> tuple[int, bool]:
    """Return (installation count, is_live).

    Tries the server first, then the local cache, finally defaults to 1.
    """
    live_count = register_installation()
    if live_count is not None:
        return live_count, True

    try:
        cache_file = _cached_install_count_path()
        if os.path.isfile(cache_file):
            with open(cache_file, "r", encoding="utf-8") as fh:
                cached = json.load(fh).get("count", 1)
            if isinstance(cached, int) and cached > 0:
                return cached, False
    except Exception:
        pass  # broken cache

    return 1, False
|
||
|
||
|
||
def fetch_openai_usage(client) -> dict:
|
||
"""Ruft echte Verbrauchs-Daten von OpenAI ab."""
|
||
try:
|
||
# OpenAI API Key aus Client extrahieren
|
||
api_key = client.api_key if hasattr(client, 'api_key') else None
|
||
if not api_key:
|
||
return None
|
||
|
||
# Verwende httpx (bereits von openai installiert)
|
||
import httpx
|
||
|
||
headers = {
|
||
"Authorization": f"Bearer {api_key}",
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
# Verbrauch der letzten 30 Tage abrufen
|
||
end_date = datetime.now()
|
||
start_date = end_date - timedelta(days=30)
|
||
|
||
url = f"https://api.openai.com/v1/usage?start_date={start_date.strftime('%Y-%m-%d')}&end_date={end_date.strftime('%Y-%m-%d')}"
|
||
|
||
with httpx.Client(timeout=10.0) as http_client:
|
||
response = http_client.get(url, headers=headers)
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
# Summiere Verbrauch aus allen Tagen
|
||
total_cost = 0
|
||
for day_data in data.get("data", []):
|
||
total_cost += day_data.get("cost", 0) / 100 # Cent to Dollar
|
||
|
||
return {
|
||
"used_dollars": total_cost,
|
||
"success": True
|
||
}
|
||
else:
|
||
return {
|
||
"error": f"API returned status {response.status_code}",
|
||
"success": False
|
||
}
|
||
|
||
except Exception as e:
|
||
return {
|
||
"error": str(e),
|
||
"success": False
|
||
}
|
||
|
||
return None
|
||
|
||
|
||
def load_autotext() -> dict:
    """Load autotext settings and general UI preferences as one flat dict.

    WARNING: the global autotext listener in basis14.py uses a RAM cache and
    does NOT call this function inside the hook callback. That is deliberate
    (Windows LowLevelHooksTimeout). Never call this from on_press/on_release.

    Every key is normalised here with a default, so callers may rely on all
    keys being present. On any read/parse error the complete default dict is
    returned. Fix: the fallback dict previously lacked the three ``empfang_*``
    keys that the file-backed branch always returns.
    """
    try:
        path = _autotext_config_path()
        if os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
                if isinstance(data, dict):
                    return {
                        "enabled": data.get("enabled", True),
                        # "entries" must be a dict; any other type is discarded.
                        "entries": data.get("entries") if isinstance(data.get("entries"), dict) else {},
                        "diktat_auto_start": data.get("diktat_auto_start", True),
                        # Legacy fallback: older configs used "diktat_open_on_start".
                        "notizen_open_on_start": data.get("notizen_open_on_start", data.get("diktat_open_on_start", True)),
                        "textbloecke_visible": data.get("textbloecke_visible", True),
                        "addon_visible": data.get("addon_visible", True),
                        "addon_buttons": data.get("addon_buttons", {
                            "uebersetzer": True,
                            "email": True,
                            "autotext": True,
                            "whatsapp": True,
                            "docapp": True
                        }),
                        "kg_auto_delete_old": data.get("kg_auto_delete_old", False),
                        "textbloecke_collapsed": data.get("textbloecke_collapsed", False),
                        "status_color": data.get("status_color", "#BD4500"),
                        "soap_collapsed": data.get("soap_collapsed", False),
                        "autoOpenNews": data.get("autoOpenNews", False),
                        "autoOpenEvents": data.get("autoOpenEvents", True),
                        "newsTemplate": data.get("newsTemplate", "all"),
                        "newsSelectedSpecialties": data.get("newsSelectedSpecialties", []),
                        "newsSelectedRegions": data.get("newsSelectedRegions", ["CH", "EU"]),
                        "newsSort": data.get("newsSort", "newest"),
                        "eventsSelectedSpecialties": data.get("eventsSelectedSpecialties", ["general-medicine"]),
                        "eventsSelectedRegions": data.get("eventsSelectedRegions", ["CH", "EU"]),
                        "eventsTemplate": data.get("eventsTemplate", "general_ch_eu"),
                        "eventsSort": data.get("eventsSort", "soonest"),
                        "eventsMonthsAhead": int(data.get("eventsMonthsAhead", 13)),
                        "selectedLanguage": data.get("selectedLanguage", "system"),
                        "user_specialty_default": data.get("user_specialty_default", "dermatology"),
                        "user_specialties_selected": data.get("user_specialties_selected", []),
                        "ui_font_delta": int(data.get("ui_font_delta", -2)),
                        "global_right_click_paste": data.get("global_right_click_paste", True),
                        "todo_auto_open": data.get("todo_auto_open", False),
                        "autocopy_after_diktat": data.get("autocopy_after_diktat", True),
                        "kommentare_auto_open": data.get("kommentare_auto_open", False),
                        "empfang_auto_open": data.get("empfang_auto_open", False),
                        "empfang_was_open": data.get("empfang_was_open", False),
                        "empfang_prefs": data.get("empfang_prefs", {}),
                        "medikament_quelle": data.get("medikament_quelle", "compendium.ch"),
                        "diagnose_quelle": data.get("diagnose_quelle", ""),
                        "dokumente_collapsed": data.get("dokumente_collapsed", False),
                        "active_brief_profile": data.get("active_brief_profile", ""),
                        "stilprofil_enabled": data.get("stilprofil_enabled", False),
                        "stilprofil_name": data.get("stilprofil_name", ""),
                        "stilprofil_default_brief": data.get("stilprofil_default_brief", False),
                    }
    except Exception:
        pass  # corrupt file -> full defaults below
    return {
        "enabled": True, "entries": {}, "diktat_auto_start": True,
        "notizen_open_on_start": True,
        "textbloecke_visible": True, "addon_visible": True,
        "addon_buttons": {
            "uebersetzer": True,
            "email": True,
            "autotext": True,
            "whatsapp": True,
            "docapp": True
        },
        "kg_auto_delete_old": False,
        "textbloecke_collapsed": False,
        "status_color": "#BD4500",
        "soap_collapsed": False,
        "dokumente_collapsed": False,
        "autoOpenNews": False,
        "autoOpenEvents": True,
        "newsTemplate": "all",
        "newsSelectedSpecialties": [],
        "newsSelectedRegions": ["CH", "EU"],
        "newsSort": "newest",
        "eventsSelectedSpecialties": ["general-medicine"],
        "eventsSelectedRegions": ["CH", "EU"],
        "eventsTemplate": "general_ch_eu",
        "eventsSort": "soonest",
        "eventsMonthsAhead": 13,
        "selectedLanguage": "system",
        "user_specialty_default": "dermatology",
        "user_specialties_selected": [],
        "ui_font_delta": -2,
        "global_right_click_paste": True,
        "todo_auto_open": False,
        "autocopy_after_diktat": True,
        "kommentare_auto_open": False,
        # Fix: these three keys were missing from the fallback although the
        # file-backed branch always returns them.
        "empfang_auto_open": False,
        "empfang_was_open": False,
        "empfang_prefs": {},
        "medikament_quelle": "compendium.ch",
        "diagnose_quelle": "",
        "active_brief_profile": "",
        "stilprofil_enabled": False,
        "stilprofil_name": "",
        "stilprofil_default_brief": False,
    }
|
||
|
||
|
||
def save_autotext(data: dict) -> None:
    """Persist autotext settings incl. news/event preferences.

    Only the known keys are written (unknown keys in *data* are dropped);
    missing keys are filled with their defaults. Best effort: any I/O error
    is swallowed.
    """
    try:
        with open(_autotext_config_path(), "w", encoding="utf-8") as f:
            json.dump(
                {
                    "enabled": data.get("enabled", True),
                    "entries": data.get("entries") or {},
                    "diktat_auto_start": data.get("diktat_auto_start", True),
                    # Legacy fallback: older configs used "diktat_open_on_start".
                    "notizen_open_on_start": data.get("notizen_open_on_start", data.get("diktat_open_on_start", True)),
                    "textbloecke_visible": data.get("textbloecke_visible", True),
                    "addon_visible": data.get("addon_visible", True),
                    "kg_auto_delete_old": data.get("kg_auto_delete_old", False),
                    "addon_buttons": data.get("addon_buttons", {}),
                    "textbloecke_collapsed": data.get("textbloecke_collapsed", False),
                    "status_color": data.get("status_color", "#BD4500"),
                    "soap_collapsed": data.get("soap_collapsed", False),
                    "autoOpenNews": bool(data.get("autoOpenNews", False)),
                    "autoOpenEvents": bool(data.get("autoOpenEvents", True)),
                    "newsTemplate": data.get("newsTemplate", "all"),
                    "newsSelectedSpecialties": data.get("newsSelectedSpecialties", []),
                    "newsSelectedRegions": data.get("newsSelectedRegions", ["CH", "EU"]),
                    "newsSort": data.get("newsSort", "newest"),
                    "eventsSelectedSpecialties": data.get("eventsSelectedSpecialties", ["general-medicine"]),
                    "eventsSelectedRegions": data.get("eventsSelectedRegions", ["CH", "EU"]),
                    "eventsTemplate": data.get("eventsTemplate", "general_ch_eu"),
                    "eventsSort": data.get("eventsSort", "soonest"),
                    "eventsMonthsAhead": int(data.get("eventsMonthsAhead", 13)),
                    "selectedLanguage": data.get("selectedLanguage", "system"),
                    "user_specialty_default": data.get("user_specialty_default", "dermatology"),
                    "user_specialties_selected": data.get("user_specialties_selected", []),
                    "ui_font_delta": int(data.get("ui_font_delta", -2)),
                    "global_right_click_paste": bool(data.get("global_right_click_paste", True)),
                    "todo_auto_open": bool(data.get("todo_auto_open", False)),
                    "autocopy_after_diktat": bool(data.get("autocopy_after_diktat", True)),
                    "kommentare_auto_open": bool(data.get("kommentare_auto_open", False)),
                    "empfang_auto_open": bool(data.get("empfang_auto_open", False)),
                    "empfang_was_open": bool(data.get("empfang_was_open", False)),
                    "empfang_prefs": data.get("empfang_prefs", {}),
                    "medikament_quelle": data.get("medikament_quelle", "compendium.ch"),
                    "diagnose_quelle": data.get("diagnose_quelle", ""),
                    "dokumente_collapsed": bool(data.get("dokumente_collapsed", False)),
                    "active_brief_profile": data.get("active_brief_profile", ""),
                    "stilprofil_enabled": bool(data.get("stilprofil_enabled", False)),
                    "stilprofil_name": data.get("stilprofil_name", ""),
                    "stilprofil_default_brief": bool(data.get("stilprofil_default_brief", False)),
                },
                f, ensure_ascii=False, indent=2,
            )
    except Exception:
        pass  # best effort
|
||
|
||
|
||
def is_autocopy_after_diktat_enabled() -> bool:
    """Whether transcripts are auto-copied to the clipboard after dictation (default: yes)."""
    try:
        settings = load_autotext()
    except Exception:
        return True
    return bool(settings.get("autocopy_after_diktat", True))
|
||
|
||
|
||
def is_global_right_click_paste_enabled() -> bool:
    """Whether right-click pastes directly in external apps (default: yes)."""
    try:
        settings = load_autotext()
    except Exception:
        return True
    return bool(settings.get("global_right_click_paste", True))
|
||
|
||
|
||
def save_autocopy_prefs(autocopy: bool | None = None, global_right_click: bool | None = None) -> None:
|
||
"""Speichert Autocopy/Rechtsklick-Einstellungen (nur gegebene Werte werden aktualisiert)."""
|
||
try:
|
||
data = load_autotext()
|
||
if autocopy is not None:
|
||
data["autocopy_after_diktat"] = bool(autocopy)
|
||
if global_right_click is not None:
|
||
data["global_right_click_paste"] = bool(global_right_click)
|
||
save_autotext(data)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def _is_admin() -> bool:
|
||
"""Prüft, ob die Anwendung mit Administratorrechten läuft (für globalen Autotext)."""
|
||
if sys.platform != "win32":
|
||
return False
|
||
try:
|
||
import ctypes
|
||
return bool(ctypes.windll.shell32.IsUserAnAdmin())
|
||
except Exception:
|
||
return False
|
||
|
||
|
||
def _run_as_admin() -> bool:
    """Relaunch the application with administrator rights (Windows only).

    On success the elevated instance is spawned and the current process
    terminates via sys.exit(0) -- SystemExit is not an Exception subclass,
    so the except-clause below deliberately does not catch it. Returns False
    on non-Windows platforms or when elevation could not be started.
    (A previous ``return True`` after sys.exit(0) was unreachable and has
    been removed.)
    """
    if sys.platform != "win32":
        return False
    try:
        import ctypes
        # Quote arguments containing spaces so they survive the relaunch.
        args = " ".join(f'"{a}"' if " " in a else a for a in sys.argv)
        ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, args, None, 1)
        sys.exit(0)
    except Exception:
        return False
|
||
|
||
|
||
def sanitize_markdown_for_plain_text(raw_text: str) -> str:
    """Strip Markdown syntax (headings, list markers, bold/italic) for clean plain text."""
    normalized = (raw_text or "").replace("\r\n", "\n").replace("\r", "\n")
    cleaned = []
    for line in normalized.split("\n"):
        # Leading heading / list markers.
        line = re.sub(r"^\s*#{1,6}\s+", "", line)
        line = re.sub(r"^\s*\d+\.\s+", "", line)
        line = re.sub(r"^\s*[-*•]\s+", "", line)
        # Inline emphasis: double delimiters first, then single-delimiter italics.
        line = re.sub(r"\*\*(.+?)\*\*", r"\1", line)
        line = re.sub(r"__(.+?)__", r"\1", line)
        line = re.sub(r"(?<!\*)\*(?!\s)(.+?)(?<!\s)\*(?!\*)", r"\1", line)
        line = re.sub(r"(?<!_)_(?!\s)(.+?)(?<!\s)_(?!_)", r"\1", line)
        cleaned.append(line)
    return "\n".join(cleaned).strip()
|
||
|
||
|
||
def _win_clipboard_set(text: str, html_fragment: str = None) -> bool:
    """Put text on the Windows clipboard (for global autotext via Ctrl+V).

    Writes two formats: CF_UNICODETEXT (markdown stripped to plain text)
    and the registered "HTML Format" (markdown converted to HTML) so rich
    editors can paste formatting. Returns True when at least one format was
    set; False on non-Windows platforms or any failure.
    """
    if sys.platform != "win32":
        return False
    try:
        import ctypes
        from ctypes import wintypes
        CF_UNICODETEXT = 13
        GMEM_MOVEABLE = 0x0002
        GMEM_DDESHARE = 0x2000
        alloc_flags = GMEM_MOVEABLE | GMEM_DDESHARE
        kernel32 = ctypes.WinDLL("kernel32")
        user32 = ctypes.WinDLL("user32")
        # Declare argument/return types so 64-bit handles are not truncated.
        user32.OpenClipboard.argtypes = [wintypes.HWND]
        user32.OpenClipboard.restype = wintypes.BOOL
        user32.CloseClipboard.argtypes = []
        user32.EmptyClipboard.argtypes = []
        user32.RegisterClipboardFormatW.argtypes = [wintypes.LPCWSTR]
        user32.RegisterClipboardFormatW.restype = wintypes.UINT
        user32.SetClipboardData.argtypes = [wintypes.UINT, wintypes.HANDLE]
        user32.SetClipboardData.restype = wintypes.HANDLE
        kernel32.GlobalAlloc.argtypes = [wintypes.UINT, ctypes.c_size_t]
        kernel32.GlobalAlloc.restype = wintypes.HGLOBAL
        kernel32.GlobalLock.argtypes = [wintypes.HGLOBAL]
        kernel32.GlobalLock.restype = ctypes.c_void_p
        kernel32.GlobalUnlock.argtypes = [wintypes.HGLOBAL]

        def _set_clipboard_data(fmt: int, payload: bytes) -> bool:
            # Copy the payload into a movable global-memory block and hand
            # ownership to the clipboard via SetClipboardData.
            h = kernel32.GlobalAlloc(alloc_flags, len(payload))
            if not h:
                return False
            ptr = kernel32.GlobalLock(h)
            if not ptr:
                return False
            ctypes.memmove(ptr, payload, len(payload))
            kernel32.GlobalUnlock(h)
            return bool(user32.SetClipboardData(fmt, h))

        def _inline_markdown_to_html(line: str) -> str:
            # HTML-escape first, then convert bold/italic markdown to tags.
            escaped = html.escape(line)
            escaped = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", escaped)
            escaped = re.sub(r"__(.+?)__", r"<strong>\1</strong>", escaped)
            escaped = re.sub(r"(?<!\*)\*(?!\s)(.+?)(?<!\s)\*(?!\*)", r"<em>\1</em>", escaped)
            escaped = re.sub(r"(?<!_)_(?!\s)(.+?)(?<!\s)_(?!_)", r"<em>\1</em>", escaped)
            return escaped

        def _markdown_like_to_html(raw_text: str) -> str:
            # Line-based markdown-to-HTML conversion: headings, ordered and
            # unordered lists, paragraphs; blank lines become <br>.
            lines = (raw_text or "").replace("\r\n", "\n").replace("\r", "\n").split("\n")
            html_parts = []
            in_ul = False
            in_ol = False

            def _close_lists():
                # Close any list that is currently open.
                nonlocal in_ul, in_ol
                if in_ul:
                    html_parts.append("</ul>")
                    in_ul = False
                if in_ol:
                    html_parts.append("</ol>")
                    in_ol = False

            for raw_line in lines:
                line = raw_line.rstrip()
                stripped = line.strip()
                if not stripped:
                    _close_lists()
                    html_parts.append("<br>")
                    continue

                m_head = re.match(r"^(#{1,6})\s+(.*)$", stripped)
                if m_head:
                    _close_lists()
                    level = min(6, len(m_head.group(1)))
                    html_parts.append(f"<h{level}>{_inline_markdown_to_html(m_head.group(2))}</h{level}>")
                    continue

                m_ol = re.match(r"^\d+\.\s+(.*)$", stripped)
                if m_ol:
                    # Switching list type closes the other list first.
                    if in_ul:
                        html_parts.append("</ul>")
                        in_ul = False
                    if not in_ol:
                        html_parts.append("<ol>")
                        in_ol = True
                    html_parts.append(f"<li>{_inline_markdown_to_html(m_ol.group(1))}</li>")
                    continue

                m_ul = re.match(r"^[-*•]\s+(.*)$", stripped)
                if m_ul:
                    if in_ol:
                        html_parts.append("</ol>")
                        in_ol = False
                    if not in_ul:
                        html_parts.append("<ul>")
                        in_ul = True
                    html_parts.append(f"<li>{_inline_markdown_to_html(m_ul.group(1))}</li>")
                    continue

                _close_lists()
                html_parts.append(f"<p>{_inline_markdown_to_html(stripped)}</p>")

            _close_lists()
            return "".join(html_parts) if html_parts else "<p></p>"

        def _build_cf_html_payload(fragment_html: str) -> bytes:
            # Build the CF_HTML payload: a header with byte offsets into the
            # UTF-8 HTML body, as required by the Windows "HTML Format".
            full_html = (
                "<html><body><!--StartFragment-->"
                + fragment_html
                + "<!--EndFragment--></body></html>"
            )
            marker_start = b"<!--StartFragment-->"
            marker_end = b"<!--EndFragment-->"
            header_template = (
                "Version:0.9\r\n"
                "StartHTML:{:010d}\r\n"
                "EndHTML:{:010d}\r\n"
                "StartFragment:{:010d}\r\n"
                "EndFragment:{:010d}\r\n"
            )
            # Fixed-width offsets keep the header length independent of the values.
            dummy_header = header_template.format(0, 0, 0, 0)
            html_bytes = full_html.encode("utf-8")
            start_html = len(dummy_header.encode("ascii"))
            end_html = start_html + len(html_bytes)
            start_fragment = start_html + html_bytes.index(marker_start) + len(marker_start)
            end_fragment = start_html + html_bytes.index(marker_end)
            header = header_template.format(start_html, end_html, start_fragment, end_fragment)
            return header.encode("ascii") + html_bytes + b"\0"

        # Another process may briefly hold the clipboard; retry a few times.
        for _ in range(5):
            if user32.OpenClipboard(None):
                break
            time.sleep(0.03)
        else:
            return False
        try:
            user32.EmptyClipboard()
            plain_text = sanitize_markdown_for_plain_text(text or "")
            text_data = (plain_text + "\0").encode("utf-16-le")
            ok_text = _set_clipboard_data(CF_UNICODETEXT, text_data)
            html_format = user32.RegisterClipboardFormatW("HTML Format")
            ok_html = False
            if html_format:
                fragment = html_fragment if html_fragment is not None else _markdown_like_to_html(text or "")
                html_payload = _build_cf_html_payload(fragment)
                ok_html = _set_clipboard_data(html_format, html_payload)
            return bool(ok_text or ok_html)
        finally:
            user32.CloseClipboard()
    except Exception:
        return False
|
||
|
||
|
||
def _win_clipboard_get() -> str:
    """Read Unicode text from the Windows clipboard.

    Returns the clipboard text, or "" on non-Windows platforms, when the
    clipboard is empty/locked, or on any error.
    """
    if sys.platform != "win32":
        return ""
    try:
        import ctypes
        from ctypes import wintypes

        CF_UNICODETEXT = 13
        user32 = ctypes.WinDLL("user32")
        kernel32 = ctypes.WinDLL("kernel32")
        user32.OpenClipboard.argtypes = [wintypes.HWND]
        user32.OpenClipboard.restype = wintypes.BOOL
        user32.GetClipboardData.argtypes = [wintypes.UINT]
        user32.GetClipboardData.restype = wintypes.HANDLE
        user32.CloseClipboard.argtypes = []
        user32.CloseClipboard.restype = wintypes.BOOL
        kernel32.GlobalLock.argtypes = [wintypes.HGLOBAL]
        kernel32.GlobalLock.restype = ctypes.c_void_p
        kernel32.GlobalUnlock.argtypes = [wintypes.HGLOBAL]
        if not user32.OpenClipboard(None):
            return ""
        try:
            h = user32.GetClipboardData(CF_UNICODETEXT)
            if not h:
                return ""
            ptr = kernel32.GlobalLock(h)
            if not ptr:
                return ""
            try:
                # BUGFIX: the old byte-wise scan indexed a c_char array,
                # which yields 1-byte ``bytes`` objects in Python 3;
                # ``buf[i] == 0`` was therefore always False and
                # ``bytearray.extend([b'..', ...])`` raised TypeError, so the
                # function always returned "".  ``wstring_at`` reads the
                # NUL-terminated UTF-16 string from the locked handle directly.
                return ctypes.wstring_at(ptr)
            finally:
                kernel32.GlobalUnlock(h)
        finally:
            user32.CloseClipboard()
    except Exception:
        return ""
|
||
|
||
|
||
def _signature_config_path():
    """Full path of the signature-name config file."""
    base = get_writable_data_dir()
    return os.path.join(base, SIGNATURE_CONFIG_FILENAME)
|
||
|
||
|
||
def load_signature_name(fallback_to_profile: bool = True) -> str:
    """Return the stored signature name.

    When nothing is stored and ``fallback_to_profile`` is True, the profile
    name is used instead; otherwise "" is returned.
    """
    try:
        path = _signature_config_path()
        if os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as fh:
                stored = fh.read().strip()
            if stored:
                return stored
    except Exception:
        pass
    if fallback_to_profile:
        try:
            return load_user_profile().get("name", "")
        except Exception:
            pass
    return ""
|
||
|
||
|
||
def save_signature_name(name: str) -> None:
    """Persist the name used for the signature; write errors are ignored."""
    cleaned = (name or "").strip()
    try:
        with open(_signature_config_path(), "w", encoding="utf-8") as fh:
            fh.write(cleaned)
    except Exception:
        pass
|
||
|
||
|
||
def _korrekturen_config_path():
    """Full path of the corrections-database config file."""
    base = get_writable_data_dir()
    return os.path.join(base, KORREKTUREN_CONFIG_FILENAME)
|
||
|
||
|
||
def load_korrekturen() -> dict:
    """Load the corrections DB: {'medikamente': {wrong: right}, 'diagnosen': {...}}.

    Default corrections are merged into the stored data; on any error a fresh
    copy of the defaults is returned.
    """
    try:
        path = _korrekturen_config_path()
        if os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as fh:
                data = json.load(fh)
            if isinstance(data, dict):
                # Merge defaults without overwriting user entries.
                for cat, defaults in _DEFAULT_KORREKTUREN.items():
                    bucket = data.setdefault(cat, {})
                    for wrong, right in defaults.items():
                        bucket.setdefault(wrong, right)
                return data
    except Exception:
        pass
    return {cat: dict(mapping) for cat, mapping in _DEFAULT_KORREKTUREN.items()}
|
||
|
||
|
||
def save_korrekturen(data: dict) -> None:
    """Persist the corrections database as UTF-8 JSON; errors are ignored."""
    try:
        with open(_korrekturen_config_path(), "w", encoding="utf-8") as fh:
            json.dump(data, fh, ensure_ascii=False, indent=2)
    except Exception:
        pass
|
||
|
||
|
||
def _ablage_base_path():
    """Base folder for filed dictations: Documents/KG_Diktat_Ablage.

    Falls back to the home directory when no Documents folder exists.
    """
    home = os.path.expanduser("~")
    docs = os.path.join(home, "Documents")
    root = docs if os.path.isdir(docs) else home
    return os.path.join(root, "KG_Diktat_Ablage")
|
||
|
||
|
||
def _ablage_json_path():
    """Path of the single central JSON file holding all filed content."""
    return os.path.join(_ablage_base_path(), "ablage.json")
|
||
|
||
|
||
def ensure_ablage_dirs():
    """Create the base folder and all category subfolders (idempotent)."""
    base = _ablage_base_path()
    folders = [base] + [os.path.join(base, sub) for sub in ABLAGE_SUBFOLDERS]
    for folder in folders:
        os.makedirs(folder, exist_ok=True)
|
||
|
||
|
||
def _load_ablage_json():
    """Load ablage.json.

    Returns {"KG": [{"content": "...", "name": "..."}], ...}; every category
    is guaranteed to map to a list, and any read/parse error yields an
    all-empty structure.
    """
    path = _ablage_json_path()
    if not os.path.isfile(path):
        return {c: [] for c in ABLAGE_SUBFOLDERS}
    try:
        with open(path, "r", encoding="utf-8") as fh:
            data = json.load(fh)
    except Exception:
        return {c: [] for c in ABLAGE_SUBFOLDERS}
    if not isinstance(data, dict):
        return {c: [] for c in ABLAGE_SUBFOLDERS}
    for c in ABLAGE_SUBFOLDERS:
        if not isinstance(data.get(c), list):
            data[c] = []
    return data
|
||
|
||
|
||
def _save_ablage_json(data: dict) -> bool:
    """Write ablage.json; returns True on success, False on any error."""
    try:
        ensure_ablage_dirs()
        with open(_ablage_json_path(), "w", encoding="utf-8") as fh:
            json.dump(data, fh, ensure_ascii=False, indent=2)
    except Exception:
        return False
    return True
|
||
|
||
|
||
def save_to_ablage(category: str, content: str):
    """Append one entry to ablage.json (the single central JSON store).

    Loading happens through the app (Ordner → select → "load selected file
    into app").  Returns the path of ablage.json on success, else None.
    """
    # Unknown categories are rejected with a user-visible dialog.
    if category not in ABLAGE_SUBFOLDERS:
        try:
            messagebox.showerror("Speichern", f"Unbekannte Kategorie: {category}")
        except Exception:
            pass  # no Tk root available (e.g. headless) — fail silently
        return None
    # Coerce to str, tolerate None, and refuse to store empty content.
    raw = content if isinstance(content, str) else (str(content) if content is not None else "")
    content = raw.strip()
    if not content:
        return None
    try:
        ensure_ablage_dirs()
        label = ABLAGE_LABELS.get(category, category)
        now = datetime.now()
        date_str = now.strftime("%d.%m.%Y")
        time_str = now.strftime("%H:%M")
        data = _load_ablage_json()
        # Entries are numbered sequentially per category; the number doubles
        # as the sort key for "newest first" listings.
        n = len(data.get(category, [])) + 1
        name = f"{n} {label} {date_str} {time_str}.txt"
        entry = {"content": content, "name": name}
        data.setdefault(category, []).append(entry)
        if not _save_ablage_json(data):
            raise RuntimeError("ablage.json konnte nicht geschrieben werden.")
        return _ablage_json_path()
    except Exception as e:
        try:
            messagebox.showerror("Speichern fehlgeschlagen", f"Pfad: {_ablage_base_path()}\nFehler: {e}")
        except Exception:
            pass
        return None
|
||
|
||
|
||
def list_ablage_files(category: str):
    """Entry names of a category, newest first (highest leading number)."""
    entries = _load_ablage_json().get(category, [])
    names = [e["name"] for e in entries if isinstance(e, dict) and e.get("name")]

    def _key(n):
        m = re.match(r"^(\d+)", str(n))
        return (-int(m.group(1)), n) if m else (0, n)

    return sorted(names, key=_key)
|
||
|
||
|
||
def _parse_entry_date(name: str):
    """Parse the date from an entry name (e.g. '1 KG 04.02.2026 10.txt').

    Returns a datetime at midnight, or None when no valid DD.MM.YYYY date
    appears in the name.
    """
    if not name:
        return None
    m = re.search(r"(\d{2})\.(\d{2})\.(\d{4})", str(name))
    if not m:
        return None
    try:
        d, mo, y = int(m.group(1)), int(m.group(2)), int(m.group(3))
        return datetime(y, mo, d)
    except (ValueError, IndexError):
        # e.g. "31.02.2026" — regex matched but not a real calendar date
        return None
|
||
|
||
|
||
def get_old_kg_entries(days: int = 14):
    """Return KG entries older than ``days`` days."""
    data = _load_ablage_json()
    entries = data.get("KG", [])
    if not isinstance(entries, list):
        return []
    cutoff = datetime.now() - timedelta(days=days)
    old = []
    for e in entries:
        if not isinstance(e, dict):
            continue
        # Parse once per entry (the old code parsed every name twice).
        entry_date = _parse_entry_date(e.get("name", ""))
        if entry_date and entry_date < cutoff:
            old.append(e)
    return old
|
||
|
||
|
||
def delete_kg_entries_older_than(days: int = 14) -> int:
    """Delete KG entries older than ``days`` days; returns number removed."""
    return delete_entries_older_than("KG", days=days)
|
||
|
||
|
||
def count_entries_older_than(category: str, days: int = 14) -> int:
    """Count entries of a category that are older than ``days`` days."""
    if category not in ABLAGE_SUBFOLDERS:
        return 0
    data = _load_ablage_json()
    entries = data.get(category, [])
    if not isinstance(entries, list):
        return 0
    cutoff = datetime.now() - timedelta(days=days)
    count = 0
    for e in entries:
        if not isinstance(e, dict):
            continue
        # Parse once per entry (the old code parsed every name twice).
        entry_date = _parse_entry_date(e.get("name", ""))
        if entry_date and entry_date < cutoff:
            count += 1
    return count
|
||
|
||
|
||
def delete_entries_older_than(category: str, days: int = 14) -> int:
    """Delete entries of a category older than ``days`` days.

    Returns the number of deleted entries.  Malformed entries and entries
    without a parsable date are always kept.
    """
    if category not in ABLAGE_SUBFOLDERS:
        return 0
    data = _load_ablage_json()
    entries = data.get(category, [])
    if not isinstance(entries, list):
        return 0
    cutoff = datetime.now() - timedelta(days=days)
    kept = []
    for e in entries:
        # Parse once per entry (the old code parsed every name twice).
        entry_date = _parse_entry_date(e.get("name", "")) if isinstance(e, dict) else None
        if entry_date is None or entry_date >= cutoff:
            kept.append(e)
    deleted = len(entries) - len(kept)
    if deleted > 0:
        data[category] = kept
        _save_ablage_json(data)
    return deleted
|
||
|
||
|
||
def delete_all_ablage_entries(category: str) -> int:
    """Remove every entry of a category; returns how many were removed."""
    if category not in ABLAGE_SUBFOLDERS:
        return 0
    data = _load_ablage_json()
    removed = len(data.get(category, []))
    if removed:
        data[category] = []
        _save_ablage_json(data)
    return removed
|
||
|
||
|
||
def get_ablage_content(category: str, filename: str) -> str:
    """Content of the entry named ``filename`` in a category.

    Returns plain text only, never raw JSON; "" when not found.
    """
    if not filename or filename == "ablage.json":
        return ""
    for entry in _load_ablage_json().get(category, []):
        if isinstance(entry, dict) and entry.get("name") == filename:
            return (entry.get("content") or "").strip()
    return ""
|
||
|
||
|
||
def _pruefen_window_config_path():
    """Path of the Prüfen window geometry config file."""
    base = get_writable_data_dir()
    return os.path.join(base, PRUEFEN_WINDOW_CONFIG_FILENAME)
|
||
|
||
|
||
def _ordner_window_config_path():
    """Path of the Ordner window geometry config file."""
    base = get_writable_data_dir()
    return os.path.join(base, ORDNER_WINDOW_CONFIG_FILENAME)
|
||
|
||
|
||
def _text_window_config_path():
    """Path of the text window geometry config file."""
    base = get_writable_data_dir()
    return os.path.join(base, TEXT_WINDOW_CONFIG_FILENAME)
|
||
|
||
|
||
def _diktat_window_config_path():
    """Path of the Diktat window geometry config file."""
    base = get_writable_data_dir()
    return os.path.join(base, DIKTAT_WINDOW_CONFIG_FILENAME)
|
||
|
||
|
||
def _diskussion_window_config_path():
    """Path of the Diskussion window geometry config file."""
    base = get_writable_data_dir()
    return os.path.join(base, DISKUSSION_WINDOW_CONFIG_FILENAME)
|
||
|
||
|
||
def _settings_window_config_path():
    """Path of the settings window geometry config file."""
    base = get_writable_data_dir()
    return os.path.join(base, SETTINGS_WINDOW_CONFIG_FILENAME)
|
||
|
||
|
||
def load_settings_geometry() -> str:
    """Return the saved geometry of the settings window (e.g. '460x300+100+50'), or ''. """
    try:
        path = _settings_window_config_path()
        if os.path.isfile(path):
            # Context manager closes the handle deterministically; the old
            # open(path).read() left the file object to the garbage collector.
            with open(path, "r", encoding="utf-8") as f:
                geom = f.read().strip()
            if geom:
                return geom
    except Exception:
        pass
    return ""
|
||
|
||
|
||
def save_settings_geometry(geom: str) -> None:
    """Persist size and position of the settings window; empty strings are skipped."""
    if not geom:
        return
    try:
        with open(_settings_window_config_path(), "w", encoding="utf-8") as fh:
            fh.write(geom + "\n")
    except Exception:
        pass
|
||
|
||
|
||
def _textbloecke_config_path():
    """Path of the text-blocks config file."""
    base = get_writable_data_dir()
    return os.path.join(base, TEXTBLOECKE_CONFIG_FILENAME)
|
||
|
||
|
||
def load_textbloecke():
    """Load the text blocks: {"1": {"name": ..., "content": ...}, ...}.

    Guarantees at least two slots; falls back to two empty default slots when
    the stored data is missing, unreadable, or has fewer than two slots.
    """
    fallback = {
        "1": {"name": "Textblock 1", "content": ""},
        "2": {"name": "Textblock 2", "content": ""},
    }
    try:
        path = _textbloecke_config_path()
        if os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as fh:
                raw = json.load(fh)
            if isinstance(raw, dict):
                cleaned = {
                    k: {"name": (v.get("name") or "").strip(), "content": v.get("content") or ""}
                    for k, v in raw.items()
                    if isinstance(k, str) and k.isdigit() and isinstance(v, dict)
                }
                if len(cleaned) >= 2:
                    return {s: cleaned[s] for s in sorted(cleaned, key=int)}
    except Exception:
        pass
    return fallback
|
||
|
||
|
||
def save_textbloecke(data: dict) -> None:
    """Persist all text-block slots.

    Refuses to write fewer than two slots (guards against clobbering the
    stored file with a partial state).  The file is flushed and fsynced so
    the data survives an application crash.
    """
    try:
        # Normalise: only str keys with dict values are kept.
        full = {}
        for k, v in (data or {}).items():
            if isinstance(k, str) and isinstance(v, dict):
                full[k] = {"name": (v.get("name") or "").strip(), "content": v.get("content") or ""}
        if len(full) < 2:
            return
        with open(_textbloecke_config_path(), "w", encoding="utf-8") as f:
            json.dump(full, f, ensure_ascii=False, indent=2)
            f.flush()
            try:
                os.fsync(f.fileno())
            except Exception:
                pass  # fsync may be unavailable on some platforms; best effort
    except Exception:
        pass
|
||
|
||
|
||
def load_pruefen_geometry():
    """Saved size/position of the Prüfen window.

    Returns (w, h, x, y), (w, h), or None.  Sizes below 300x250 are rejected.
    """
    try:
        with open(_pruefen_window_config_path(), "r", encoding="utf-8") as fh:
            parts = fh.read().split()
        if len(parts) >= 2:
            w, h = int(parts[0]), int(parts[1])
            if w >= 300 and h >= 250:
                if len(parts) >= 4:
                    return (w, h, int(parts[2]), int(parts[3]))
                return (w, h)
    except Exception:
        pass
    return None
|
||
|
||
|
||
def save_pruefen_geometry(width: int, height: int, x: int = None, y: int = None) -> None:
    """Persist size (and, when both are given, position) of the Prüfen window."""
    has_pos = x is not None and y is not None
    line = f"{width} {height} {x} {y}\n" if has_pos else f"{width} {height}\n"
    try:
        with open(_pruefen_window_config_path(), "w", encoding="utf-8") as fh:
            fh.write(line)
    except Exception:
        pass
|
||
|
||
|
||
def load_ordner_geometry() -> str:
    """Return the saved geometry of the Ordner window (e.g. '640x500+100+50'), or ''. """
    try:
        path = _ordner_window_config_path()
        if os.path.isfile(path):
            # Context manager closes the handle deterministically; the old
            # open(path).read() left the file object to the garbage collector.
            with open(path, "r", encoding="utf-8") as f:
                geom = f.read().strip()
            if geom:
                return geom
    except Exception:
        pass
    return ""
|
||
|
||
|
||
def save_ordner_geometry(geom: str) -> None:
    """Persist the Ordner window geometry string; errors are ignored."""
    try:
        with open(_ordner_window_config_path(), "w", encoding="utf-8") as fh:
            fh.write(geom)
    except Exception:
        pass
|
||
|
||
|
||
def load_text_window_geometry() -> str:
    """Return the saved text-window geometry string, or ''. """
    try:
        path = _text_window_config_path()
        if os.path.isfile(path):
            # Context manager closes the handle deterministically; the old
            # open(path).read() left the file object to the garbage collector.
            with open(path, "r", encoding="utf-8") as f:
                return f.read().strip()
    except Exception:
        pass
    return ""
|
||
|
||
|
||
def save_text_window_geometry(geom: str) -> None:
    """Persist the text window geometry string; errors are ignored."""
    try:
        with open(_text_window_config_path(), "w", encoding="utf-8") as fh:
            fh.write(geom)
    except Exception:
        pass
|
||
|
||
|
||
def load_diktat_geometry() -> str:
    """Return the saved Diktat window geometry string, or ''. """
    try:
        path = _diktat_window_config_path()
        if os.path.isfile(path):
            # Context manager closes the handle deterministically; the old
            # open(path).read() left the file object to the garbage collector.
            with open(path, "r", encoding="utf-8") as f:
                return f.read().strip()
    except Exception:
        pass
    return ""
|
||
|
||
|
||
def save_diktat_geometry(geom: str) -> None:
    """Persist the Diktat window geometry string; errors are ignored."""
    try:
        with open(_diktat_window_config_path(), "w", encoding="utf-8") as fh:
            fh.write(geom)
    except Exception:
        pass
|
||
|
||
|
||
def load_diskussion_geometry() -> str:
    """Return the saved Diskussion window geometry string, or ''. """
    try:
        path = _diskussion_window_config_path()
        if os.path.isfile(path):
            # Context manager closes the handle deterministically; the old
            # open(path).read() left the file object to the garbage collector.
            with open(path, "r", encoding="utf-8") as f:
                return f.read().strip()
    except Exception:
        pass
    return ""
|
||
|
||
|
||
def save_diskussion_geometry(geom: str) -> None:
    """Persist the Diskussion window geometry string; errors are ignored."""
    try:
        with open(_diskussion_window_config_path(), "w", encoding="utf-8") as fh:
            fh.write(geom)
    except Exception:
        pass
|
||
|
||
|
||
def extract_diagnosen_therapie_procedere(text: str) -> str:
    """Extract only the Diagnosen, Therapie and Procedere sections of the text.

    When no such section header is found, the (possibly trimmed) input is
    returned unchanged.
    """
    if "KRANKENGESCHICHTE:" in text:
        kg_part = text.split("TRANSKRIPT:")[0].replace("KRANKENGESCHICHTE:", "").strip()
    else:
        kg_part = text

    headers = ("Diagnose:", "Diagnosen:", "Therapie:", "Procedere:")
    collected = []
    collecting = False

    for line in kg_part.split("\n"):
        bare = line.strip()
        if bare.startswith(headers):
            # New target section: blank line between sections, then the header.
            if collected:
                collected.append("")
            collected.append(line)
            collecting = True
        elif collecting:
            if bare and bare.endswith(":"):
                # Any other "Header:" line ends the current section.
                collecting = False
            else:
                collected.append(line)

    extracted = "\n".join(collected).strip()
    return extracted or kg_part
|
||
|
||
|
||
def _similarity(a: str, b: str) -> float:
    """Case-insensitive similarity ratio (0.0–1.0) via difflib.SequenceMatcher."""
    return SequenceMatcher(None, a.lower(), b.lower()).ratio()
|
||
|
||
|
||
def apply_korrekturen(text: str, korrekturen: dict) -> tuple:
    """Apply corrections to ``text``.

    Returns (corrected_text, [(wrong, right), ...]) listing every applied
    pair.  Two passes are made: first exact whole-word replacement
    (case-insensitive), then a fuzzy pass that replaces words similar to a
    known misspelling.  Finally every 'ß' is normalised to 'ss'.
    """
    result = text
    applied = []
    FUZZY_THRESHOLD = 0.85  # minimum SequenceMatcher ratio for a fuzzy hit

    # Pass 1: exact (case-insensitive) whole-word replacement.
    for kategorie, mapping in korrekturen.items():
        if not isinstance(mapping, dict):
            continue
        for falsch, richtig in mapping.items():
            if not falsch or not richtig:
                continue
            pattern = r"\b" + re.escape(falsch) + r"\b"
            if re.search(pattern, result, re.IGNORECASE):
                result = re.sub(pattern, richtig, result, flags=re.IGNORECASE)
                applied.append((falsch, richtig))

    # Pass 2: fuzzy matching against the pairs that did not match exactly.
    words = re.findall(r"[A-Za-zÄÖÜäöüß0-9\-]+", result)
    for kategorie, mapping in korrekturen.items():
        if not isinstance(mapping, dict):
            continue
        for falsch, richtig in mapping.items():
            if not falsch or not richtig or (falsch, richtig) in applied:
                continue
            for w in set(words):
                if len(w) < 4:
                    # Very short words are too unreliable for fuzzy matching.
                    continue
                if _similarity(w, falsch) >= FUZZY_THRESHOLD:
                    pattern = r"\b" + re.escape(w) + r"\b"
                    result = re.sub(pattern, richtig, result)
                    applied.append((falsch, richtig))
                    # Re-tokenise: the replacement changed the word list.
                    words = re.findall(r"[A-Za-zÄÖÜäöüß0-9\-]+", result)
                    break

    # Swiss orthography: no 'ß'.
    result = result.replace("ß", "ss")

    return result, applied
|
||
|
||
|
||
def _kogu_gruss_config_path():
    """Path of the KOGU closing-phrase config file."""
    base = get_writable_data_dir()
    return os.path.join(base, KOGU_GRUSS_CONFIG_FILENAME)
|
||
|
||
|
||
def load_kogu_gruss() -> str:
    """Return the stored KOGU closing phrase.

    Only values from KOGU_GRUSS_OPTIONS are accepted; the first option is
    the default.
    """
    try:
        path = _kogu_gruss_config_path()
        if os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as fh:
                stored = fh.read().strip()
            if stored in KOGU_GRUSS_OPTIONS:
                return stored
    except Exception:
        pass
    return KOGU_GRUSS_OPTIONS[0]
|
||
|
||
|
||
def save_kogu_gruss(gruss: str) -> None:
    """Persist the KOGU closing phrase; errors are ignored."""
    value = (gruss or "").strip()
    try:
        with open(_kogu_gruss_config_path(), "w", encoding="utf-8") as fh:
            fh.write(value)
    except Exception:
        pass
|
||
|
||
|
||
def _kogu_templates_config_path():
    """Path of the KOGU templates config file."""
    base = get_writable_data_dir()
    return os.path.join(base, KOGU_TEMPLATES_CONFIG_FILENAME)
|
||
|
||
|
||
def load_kogu_templates() -> str:
    """Return the stored template for cost-approval letters (KOGU); '' if absent."""
    try:
        path = _kogu_templates_config_path()
        if os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as fh:
                raw = fh.read()
            return raw.strip()
    except Exception:
        pass
    return ""
|
||
|
||
|
||
def save_kogu_templates(text: str) -> None:
    """Persist the template for cost-approval letters; errors are ignored."""
    value = (text or "").strip()
    try:
        with open(_kogu_templates_config_path(), "w", encoding="utf-8") as fh:
            fh.write(value)
    except Exception:
        pass
|
||
|
||
|
||
def _diskussion_vorlage_config_path():
    """Path of the AI-discussion template config file."""
    base = get_writable_data_dir()
    return os.path.join(base, DISKUSSION_VORLAGE_CONFIG_FILENAME)
|
||
|
||
|
||
def load_diskussion_vorlage() -> str:
    """Return the stored AI-discussion template (how the AI should discuss); '' if absent."""
    try:
        path = _diskussion_vorlage_config_path()
        if os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as fh:
                raw = fh.read()
            return raw.strip()
    except Exception:
        pass
    return ""
|
||
|
||
|
||
def save_diskussion_vorlage(text: str) -> None:
    """Persist the AI-discussion template; errors are ignored."""
    value = (text or "").strip()
    try:
        with open(_diskussion_vorlage_config_path(), "w", encoding="utf-8") as fh:
            fh.write(value)
    except Exception:
        pass
|
||
|
||
|
||
def _op_bericht_template_config_path():
    """Path of the OP-report template config file."""
    base = get_writable_data_dir()
    return os.path.join(base, OP_BERICHT_TEMPLATE_CONFIG_FILENAME)
|
||
|
||
|
||
def load_op_bericht_template() -> str:
    """Return the stored OP-report template (custom format/content wishes); '' if absent."""
    try:
        path = _op_bericht_template_config_path()
        if os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as fh:
                raw = fh.read()
            return raw.strip()
    except Exception:
        pass
    return ""
|
||
|
||
|
||
def save_op_bericht_template(text: str) -> None:
    """Persist the OP-report template; errors are ignored."""
    value = (text or "").strip()
    try:
        with open(_op_bericht_template_config_path(), "w", encoding="utf-8") as fh:
            fh.write(value)
    except Exception:
        pass
|
||
|
||
|
||
|
||
|
||
def _arztbrief_vorlage_config_path():
    """Path of the doctor's-letter template config file."""
    base = get_writable_data_dir()
    return os.path.join(base, ARZTBRIEF_VORLAGE_CONFIG_FILENAME)
|
||
|
||
|
||
def load_arztbrief_vorlage() -> str:
    """Return the stored doctor's-letter template (section order + instructions).

    Falls back to ARZTBRIEF_VORLAGE_DEFAULT when nothing is stored.
    """
    try:
        path = _arztbrief_vorlage_config_path()
        if os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as fh:
                raw = fh.read()
            return raw.strip()
    except Exception:
        pass
    return ARZTBRIEF_VORLAGE_DEFAULT
|
||
|
||
|
||
def save_arztbrief_vorlage(text: str) -> None:
    """Persist the doctor's-letter template; errors are ignored."""
    value = (text or "").strip()
    try:
        with open(_arztbrief_vorlage_config_path(), "w", encoding="utf-8") as fh:
            fh.write(value)
    except Exception:
        pass
|
||
|
||
|
||
def _todo_config_path():
    """Path of the to-do list config file."""
    base = get_writable_data_dir()
    return os.path.join(base, TODO_CONFIG_FILENAME)
|
||
|
||
|
||
def _todo_window_config_path():
    """Path of the to-do window geometry config file."""
    base = get_writable_data_dir()
    return os.path.join(base, TODO_WINDOW_CONFIG_FILENAME)
|
||
|
||
|
||
def load_todos() -> list:
    """Load the to-do list.

    Each item: {id, text, done, date (optional 'YYYY-MM-DD'), created}.
    Returns [] when the file is missing or unreadable.
    """
    try:
        with open(_todo_config_path(), "r", encoding="utf-8") as fh:
            return json.load(fh)
    except Exception:
        pass  # missing or corrupt file → empty list
    return []
|
||
|
||
|
||
def save_todos(todos: list) -> None:
    """Save the to-do list locally AND push it to the cloud (async, best effort)."""
    try:
        with open(_todo_config_path(), "w", encoding="utf-8") as f:
            json.dump(todos, f, indent=2, ensure_ascii=False)
    except Exception:
        pass
    # ``threading`` is already imported at module level; the previous local
    # import was redundant.  The push runs as a daemon thread so saving never
    # blocks the UI.
    threading.Thread(target=cloud_push_todos, args=(todos,), daemon=True).start()
|
||
|
||
|
||
def _notes_config_path():
    """Path of the notes config file."""
    base = get_writable_data_dir()
    return os.path.join(base, NOTES_CONFIG_FILENAME)
|
||
|
||
|
||
def load_notes() -> list:
    """Load the notes list.

    Each item: {id, title, text, created}.  Returns [] when the file is
    missing or unreadable.
    """
    try:
        with open(_notes_config_path(), "r", encoding="utf-8") as fh:
            return json.load(fh)
    except Exception:
        pass  # missing or corrupt file → empty list
    return []
|
||
|
||
|
||
def save_notes(notes: list) -> None:
    """Save the notes list locally AND push it to the cloud (async, best effort)."""
    try:
        with open(_notes_config_path(), "w", encoding="utf-8") as f:
            json.dump(notes, f, indent=2, ensure_ascii=False)
    except Exception:
        pass
    # ``threading`` is already imported at module level; the previous local
    # import was redundant.  The push runs as a daemon thread so saving never
    # blocks the UI.
    threading.Thread(target=cloud_push_notes, args=(notes,), daemon=True).start()
|
||
|
||
|
||
def _checklist_config_path():
    """Path of the checklists config file."""
    base = get_writable_data_dir()
    return os.path.join(base, CHECKLIST_CONFIG_FILENAME)
|
||
|
||
|
||
def load_checklists() -> list:
    """Load the checklists.

    Each checklist: {id, title, items: [{text, done}], created}.  Returns []
    when the file is missing or unreadable.
    """
    try:
        with open(_checklist_config_path(), "r", encoding="utf-8") as fh:
            return json.load(fh)
    except Exception:
        pass  # missing or corrupt file → empty list
    return []
|
||
|
||
|
||
def save_checklists(checklists: list) -> None:
    """Persist the checklists; write errors are ignored."""
    try:
        with open(_checklist_config_path(), "w", encoding="utf-8") as fh:
            json.dump(checklists, fh, indent=2, ensure_ascii=False)
    except Exception:
        pass
|
||
|
||
|
||
def load_todo_geometry() -> str:
    """Return the saved to-do window geometry string, or ''. """
    try:
        path = _todo_window_config_path()
        if os.path.isfile(path):
            # Context manager closes the handle deterministically; the old
            # open(path).read() left the file object to the garbage collector.
            with open(path, "r", encoding="utf-8") as f:
                return f.read().strip()
    except Exception:
        pass
    return ""
|
||
|
||
|
||
def save_todo_geometry(geom: str) -> None:
    """Persist the to-do window geometry string; errors are ignored."""
    try:
        with open(_todo_window_config_path(), "w", encoding="utf-8") as fh:
            fh.write(geom)
    except Exception:
        pass
|
||
|
||
|
||
def _todo_settings_path():
    """Path of the to-do window settings config file."""
    base = get_writable_data_dir()
    return os.path.join(base, TODO_SETTINGS_CONFIG_FILENAME)
|
||
|
||
|
||
def load_todo_settings() -> dict:
    """Load to-do window settings (active tab, active category, custom categories).

    Returns {} when the file is missing or unreadable.
    """
    try:
        with open(_todo_settings_path(), "r", encoding="utf-8") as fh:
            return json.load(fh)
    except Exception:
        pass  # missing or corrupt file → empty settings
    return {}
|
||
|
||
|
||
def save_todo_settings(settings: dict) -> None:
    """Persist to-do window settings; write errors are ignored."""
    try:
        with open(_todo_settings_path(), "w", encoding="utf-8") as fh:
            json.dump(settings, fh, indent=2, ensure_ascii=False)
    except Exception:
        pass
|
||
|
||
|
||
def _todo_inbox_path():
    """Path of the to-do inbox config file."""
    base = get_writable_data_dir()
    return os.path.join(base, TODO_INBOX_CONFIG_FILENAME)
|
||
|
||
|
||
def load_todo_inbox() -> list:
    """Load the inbox (to-dos received from other users).

    Returns [] when the file is missing or unreadable.
    """
    try:
        with open(_todo_inbox_path(), "r", encoding="utf-8") as fh:
            return json.load(fh)
    except Exception:
        pass  # missing or corrupt file → empty inbox
    return []
|
||
|
||
|
||
def save_todo_inbox(inbox: list) -> None:
    """Persist the inbox; write errors are ignored."""
    try:
        with open(_todo_inbox_path(), "w", encoding="utf-8") as fh:
            json.dump(inbox, fh, indent=2, ensure_ascii=False)
    except Exception:
        pass
|
||
|
||
|
||
def send_todo_to_inbox(todo_item: dict, sender_name: str, recipient: str) -> None:
    """Deliver a to-do into the file-based inbox (for local use)."""
    from datetime import datetime as _dt
    now = _dt.now()
    entry = {
        # Millisecond timestamp doubles as a (practically) unique id.
        "id": int(now.timestamp() * 1000),
        "text": todo_item.get("text", ""),
        "date": todo_item.get("date"),
        "priority": todo_item.get("priority", 0),
        "notes": todo_item.get("notes", ""),
        "done": False,
        "sender": sender_name,
        "recipient": recipient,
        "sent_at": now.isoformat(),
    }
    inbox = load_todo_inbox()
    inbox.append(entry)
    save_todo_inbox(inbox)
|
||
|
||
|
||
# ─── Cloud-Sync (Supabase – kostenlose Cloud-DB) ───
|
||
|
||
|
||
def cloud_push_todos(todos: list) -> bool:
    """Write the to-do list to Supabase (row id=1 in todo_sync).

    Returns True on success, False on any error.
    """
    import urllib.request
    payload = json.dumps({"data": todos}).encode("utf-8")
    req = urllib.request.Request(
        f"{_SUPABASE_URL}/rest/v1/todo_sync?id=eq.1",
        data=payload, method="PATCH",
        headers={
            "apikey": _SUPABASE_ANON_KEY,
            "Authorization": f"Bearer {_SUPABASE_ANON_KEY}",
            "Content-Type": "application/json",
        }
    )
    try:
        # Close the HTTP response explicitly; the old code leaked the
        # connection until garbage collection.
        with urllib.request.urlopen(req, timeout=10):
            pass
        return True
    except Exception:
        return False
|
||
|
||
|
||
def cloud_pull_todos() -> list:
    """Read the to-do list from Supabase (row id=1).

    Returns the list on success ([] when no row exists); None on any error
    (note: the ``-> list`` annotation is kept for compatibility, but callers
    such as cloud_get_status rely on the None return).
    """
    import urllib.request
    req = urllib.request.Request(
        f"{_SUPABASE_URL}/rest/v1/todo_sync?id=eq.1&select=data",
        headers={
            "apikey": _SUPABASE_ANON_KEY,
            "Authorization": f"Bearer {_SUPABASE_ANON_KEY}",
        }
    )
    try:
        # Close the HTTP response explicitly; the old code leaked the
        # connection until garbage collection.
        with urllib.request.urlopen(req, timeout=10) as resp:
            rows = json.loads(resp.read().decode("utf-8"))
        if rows:
            return rows[0].get("data", [])
        return []
    except Exception:
        return None
|
||
|
||
|
||
def cloud_get_status() -> str:
    """Return 'Supabase verbunden' when the cloud is reachable, else ''. """
    try:
        if cloud_pull_todos() is not None:
            return "Supabase verbunden"
    except Exception:
        pass
    return ""
|
||
|
||
|
||
def cloud_push_notes(notes: list) -> bool:
    """Write the notes list to Supabase (row id=2 in todo_sync).

    Tries a PATCH first; when the row does not exist yet, falls back to a
    POST (insert).  Returns True on success, False on any error.
    """
    import urllib.request
    payload = json.dumps({"id": 2, "data": notes}).encode("utf-8")
    base_headers = {
        "apikey": _SUPABASE_ANON_KEY,
        "Authorization": f"Bearer {_SUPABASE_ANON_KEY}",
        "Content-Type": "application/json",
    }
    req = urllib.request.Request(
        f"{_SUPABASE_URL}/rest/v1/todo_sync?id=eq.2",
        data=payload, method="PATCH",
        headers=dict(base_headers),
    )
    try:
        # Close the HTTP response explicitly; the old code leaked the
        # connection until garbage collection.
        with urllib.request.urlopen(req, timeout=10):
            pass
        return True
    except Exception:
        # Row may not exist yet → INSERT.
        try:
            req2 = urllib.request.Request(
                f"{_SUPABASE_URL}/rest/v1/todo_sync",
                data=payload, method="POST",
                headers={**base_headers, "Prefer": "return=minimal"},
            )
            with urllib.request.urlopen(req2, timeout=10):
                pass
            return True
        except Exception:
            return False
|
||
|
||
|
||
def cloud_pull_notes() -> list:
    """Read the notes list from Supabase (row id=2).

    Returns the list on success ([] when no row exists); None on any error.
    """
    import urllib.request
    req = urllib.request.Request(
        f"{_SUPABASE_URL}/rest/v1/todo_sync?id=eq.2&select=data",
        headers={
            "apikey": _SUPABASE_ANON_KEY,
            "Authorization": f"Bearer {_SUPABASE_ANON_KEY}",
        }
    )
    try:
        # Close the HTTP response explicitly; the old code leaked the
        # connection until garbage collection.
        with urllib.request.urlopen(req, timeout=10) as resp:
            rows = json.loads(resp.read().decode("utf-8"))
        if rows:
            return rows[0].get("data", [])
        return []
    except Exception:
        return None
|
||
|
||
|
||
def _user_profile_config_path():
    """Path of the user profile config file."""
    base = get_writable_data_dir()
    return os.path.join(base, USER_PROFILE_CONFIG_FILENAME)
|
||
|
||
|
||
def load_user_profile() -> dict:
    """Load the stored user profile: {name, specialty, clinic}.

    Returns {} when the file is missing or unreadable.
    """
    try:
        with open(_user_profile_config_path(), "r", encoding="utf-8") as fh:
            return json.load(fh)
    except Exception:
        pass  # missing or corrupt file → empty profile
    return {}
|
||
|
||
|
||
def save_user_profile(profile: dict) -> None:
    """Persist the user profile; write errors are ignored."""
    try:
        with open(_user_profile_config_path(), "w", encoding="utf-8") as fh:
            json.dump(profile, fh, indent=2, ensure_ascii=False)
    except Exception:
        pass
|
||
|
||
|
||
def extract_date_from_todo_text(text: str):
    """Detect a German date phrase in dictated to-do text.

    Returns ``(cleaned_text, date_or_None)``: the first recognized date
    phrase is removed from *text* and returned as a ``datetime.date``.

    Recognized patterns:
    - "heute", "morgen", "übermorgen"
    - "nächsten Montag/Dienstag/…", "am Montag", "kommenden Freitag"
    - "in 3 Tagen", "in einer Woche", "in zwei Wochen", "in einem Monat"
    - "bis 20. März", "am 15. Februar 2026", "bis 3.4.", "bis 03.04.2026"
    - "bis Ende Woche", "bis Ende Monat"
    """
    import re
    from datetime import date, timedelta

    if not text or not text.strip():
        return text, None

    original = text
    today = date.today()

    WOCHENTAGE = {
        "montag": 0, "dienstag": 1, "mittwoch": 2, "donnerstag": 3,
        "freitag": 4, "samstag": 5, "sonntag": 6,
    }
    MONATE = {
        "januar": 1, "februar": 2, "märz": 3, "maerz": 3, "april": 4,
        "mai": 5, "juni": 6, "juli": 7, "august": 8, "september": 9,
        "oktober": 10, "november": 11, "dezember": 12,
        "jan": 1, "feb": 2, "mär": 3, "mar": 3, "apr": 4,
        "jun": 6, "jul": 7, "aug": 8, "sep": 9, "okt": 10, "nov": 11, "dez": 12,
    }
    ZAHLWOERTER = {
        "einem": 1, "einer": 1, "eins": 1, "ein": 1, "zwei": 2, "drei": 3,
        "vier": 4, "fünf": 5, "fuenf": 5, "sechs": 6, "sieben": 7, "acht": 8,
        "neun": 9, "zehn": 10, "elf": 11, "zwölf": 12, "zwoelf": 12,
    }

    # BUGFIX: no .strip() here. Match offsets taken from `lowered` are applied
    # to `original`, so both strings must stay index-aligned; stripping
    # leading whitespace shifted every offset and removed the wrong span.
    lowered = text.lower()
    found_date = None
    pattern_match = None

    # "heute"
    m = re.search(r'\b(bis\s+)?heute\b', lowered)
    if m:
        found_date = today
        pattern_match = m

    # "morgen"
    if not found_date:
        m = re.search(r'\b(bis\s+)?morgen\b', lowered)
        if m:
            found_date = today + timedelta(days=1)
            pattern_match = m

    # "übermorgen"
    if not found_date:
        m = re.search(r'\b(bis\s+)?[üu]bermorgen\b', lowered)
        if m:
            found_date = today + timedelta(days=2)
            pattern_match = m

    # "bis Ende Woche" -> the coming Friday (next week's if today is Friday+)
    if not found_date:
        m = re.search(r'\bbis\s+ende\s+(?:der\s+)?woche\b', lowered)
        if m:
            days_until_friday = (4 - today.weekday()) % 7
            if days_until_friday == 0:
                days_until_friday = 7
            found_date = today + timedelta(days=days_until_friday)
            pattern_match = m

    # "bis Ende Monat" -> last calendar day of the current month
    if not found_date:
        m = re.search(r'\bbis\s+ende\s+(?:des\s+)?monat[s]?\b', lowered)
        if m:
            import calendar as cal_m
            last_day = cal_m.monthrange(today.year, today.month)[1]
            found_date = date(today.year, today.month, last_day)
            pattern_match = m

    # "nächsten/kommenden/am <weekday>" -> next occurrence of that weekday
    if not found_date:
        wt_pattern = "|".join(WOCHENTAGE.keys())
        m = re.search(
            r'\b(?:(?:bis\s+)?(?:n[äa]chsten?|kommenden?|am)\s+)(' + wt_pattern + r')\b',
            lowered,
        )
        if m:
            target_wd = WOCHENTAGE[m.group(1)]
            days_ahead = (target_wd - today.weekday()) % 7
            if days_ahead == 0:
                days_ahead = 7
            found_date = today + timedelta(days=days_ahead)
            pattern_match = m

    # "in X Tagen/Wochen/Monaten" (digits or number words)
    if not found_date:
        m = re.search(
            r'\bin\s+(\d+|' + '|'.join(ZAHLWOERTER.keys()) + r')\s+(tag(?:en?)?|woche[n]?|monat(?:en?)?)\b',
            lowered,
        )
        if m:
            num_str = m.group(1)
            num = ZAHLWOERTER.get(num_str, None)
            if num is None:
                try:
                    num = int(num_str)
                except ValueError:
                    num = 1
            unit = m.group(2)
            if "tag" in unit:
                found_date = today + timedelta(days=num)
            elif "woche" in unit:
                found_date = today + timedelta(weeks=num)
            elif "monat" in unit:
                # Month arithmetic: clamp the day to the target month's length.
                new_month = today.month + num
                new_year = today.year + (new_month - 1) // 12
                new_month = ((new_month - 1) % 12) + 1
                import calendar as cal_m2
                max_day = cal_m2.monthrange(new_year, new_month)[1]
                found_date = date(new_year, new_month, min(today.day, max_day))
            pattern_match = m

    # "bis/am 20. März (2026)" or "bis/am 20. 3. (2026)" with month name
    if not found_date:
        monat_pattern = "|".join(MONATE.keys())
        m = re.search(
            r'\b(?:bis|am|vom)\s+(\d{1,2})\.\s*(' + monat_pattern + r')(?:\s+(\d{2,4}))?\b',
            lowered,
        )
        if m:
            day = int(m.group(1))
            month = MONATE.get(m.group(2), None)
            year = today.year
            if m.group(3):
                year = int(m.group(3))
                if year < 100:
                    year += 2000
            if month and 1 <= day <= 31:
                try:
                    found_date = date(year, month, day)
                    # No explicit year and the date already passed -> assume next year.
                    if found_date < today and not m.group(3):
                        found_date = date(year + 1, month, day)
                    pattern_match = m
                except ValueError:
                    found_date = None

    # "bis/am 20.3." or "bis/am 20.03.2026" (numeric day.month)
    if not found_date:
        m = re.search(
            r'\b(?:bis|am|vom)\s+(\d{1,2})\.(\d{1,2})\.(?:(\d{2,4}))?\b',
            lowered,
        )
        if m:
            day = int(m.group(1))
            month = int(m.group(2))
            year = today.year
            if m.group(3):
                year = int(m.group(3))
                if year < 100:
                    year += 2000
            if 1 <= month <= 12 and 1 <= day <= 31:
                try:
                    found_date = date(year, month, day)
                    if found_date < today and not m.group(3):
                        found_date = date(year + 1, month, day)
                    pattern_match = m
                except ValueError:
                    found_date = None

    # Standalone date: "20. März", "15. Februar 2026" (no bis/am prefix)
    if not found_date:
        monat_pattern = "|".join(MONATE.keys())
        m = re.search(
            r'\b(\d{1,2})\.\s*(' + monat_pattern + r')(?:\s+(\d{2,4}))?\b',
            lowered,
        )
        if m:
            day = int(m.group(1))
            month = MONATE.get(m.group(2), None)
            year = today.year
            if m.group(3):
                year = int(m.group(3))
                if year < 100:
                    year += 2000
            if month and 1 <= day <= 31:
                try:
                    found_date = date(year, month, day)
                    if found_date < today and not m.group(3):
                        found_date = date(year + 1, month, day)
                    pattern_match = m
                except ValueError:
                    found_date = None

    if found_date and pattern_match:
        # Remove the matched phrase from the ORIGINAL (case-preserving) text.
        cleaned = original[:pattern_match.start()] + original[pattern_match.end():]
        cleaned = re.sub(r'\s{2,}', ' ', cleaned).strip()
        cleaned = re.sub(r'^[,.\s]+|[,.\s]+$', '', cleaned).strip()
        return cleaned, found_date

    return text, None
|
||
|
||
|
||
def load_saved_model() -> str:
    """Return the last chosen KG model id, falling back to DEFAULT_SUMMARY_MODEL."""
    try:
        cfg = _config_path()
        if os.path.isfile(cfg):
            with open(cfg, "r", encoding="utf-8") as fh:
                candidate = fh.read().strip()
            # Only accept ids from the whitelist; anything else means fallback.
            if candidate in ALLOWED_SUMMARY_MODELS:
                return candidate
    except Exception:
        pass
    return DEFAULT_SUMMARY_MODEL
|
||
|
||
|
||
def save_model(model: str) -> None:
    """Persist the selected KG model id; unknown ids and I/O errors are ignored."""
    if model not in ALLOWED_SUMMARY_MODELS:
        return
    try:
        with open(_config_path(), "w", encoding="utf-8") as fh:
            fh.write(model)
    except Exception:
        pass
|
||
|
||
|
||
def _templates_config_path():
    """Absolute path of the templates config file."""
    base_dir = get_writable_data_dir()
    return os.path.join(base_dir, TEMPLATES_CONFIG_FILENAME)
|
||
|
||
|
||
# ─── KG Detail-Level (Kürzer/Ausführlicher-Stufe) ───
|
||
|
||
def _kg_detail_level_path():
    """Absolute path of the KG detail-level config file."""
    base_dir = get_writable_data_dir()
    return os.path.join(base_dir, KG_DETAIL_LEVEL_CONFIG_FILENAME)
|
||
|
||
|
||
def load_kg_detail_level() -> int:
    """Load the persisted KG detail level (0=default, negative=shorter, positive=longer)."""
    try:
        cfg = _kg_detail_level_path()
        if os.path.isfile(cfg):
            with open(cfg, "r", encoding="utf-8") as fh:
                return int(fh.read().strip())
    except Exception:
        pass  # missing/corrupt file -> neutral level
    return 0
|
||
|
||
|
||
def save_kg_detail_level(level: int) -> None:
    """Persist the KG detail level; best-effort, I/O errors are swallowed."""
    try:
        with open(_kg_detail_level_path(), "w", encoding="utf-8") as fh:
            fh.write(str(level))
    except Exception:
        pass
|
||
|
||
|
||
def get_kg_detail_instruction(level: int) -> str:
    """Return the style instruction for KG generation matching *level*.

    Levels outside [-3, 3] are clamped; level 0 yields no extra instruction.
    """
    instructions = {
        -3: ("\n\nWICHTIG – STIL: Extrem knapp und kompakt. Nur Schlüsselwörter und Diagnosen mit ICD-10. "
             "Keine ganzen Sätze, nur Stichpunkte. Maximal komprimiert."),
        -2: ("\n\nWICHTIG – STIL: Sehr kurz und kompakt. Kurze Stichpunkte, "
             "keine ausführlichen Beschreibungen. Nur das Wesentliche."),
        -1: ("\n\nWICHTIG – STIL: Eher kurz und prägnant. Knapp formulieren, "
             "auf das Wesentliche beschränken."),
        1: ("\n\nWICHTIG – STIL: Etwas ausführlicher als normal. Stichpunkte zu kurzen Sätzen ausformulieren, "
            "klinische Details ergänzen wo sinnvoll."),
        2: ("\n\nWICHTIG – STIL: Ausführlich. Vollständige Sätze, detaillierte klinische Beschreibungen, "
            "differentialdiagnostische Überlegungen wo relevant."),
        3: ("\n\nWICHTIG – STIL: Sehr ausführlich und detailliert. Vollständige Sätze, "
            "ausführliche klinische Beschreibungen, differentialdiagnostische Überlegungen, "
            "detaillierte Therapiebegründungen. Umfassende Dokumentation."),
    }
    clamped = max(-3, min(3, level))
    return instructions.get(clamped, "")
|
||
|
||
|
||
# ─── SOAP-Abschnitts-Detailstufen (S, O, D einzeln steuerbar) ───
|
||
|
||
|
||
def _soap_section_levels_path():
    """Absolute path of the per-section SOAP detail-level config file."""
    base_dir = get_writable_data_dir()
    return os.path.join(base_dir, SOAP_SECTION_LEVELS_CONFIG_FILENAME)
|
||
|
||
|
||
def load_soap_section_levels() -> dict:
    """Load the per-section SOAP detail levels, e.g. {"S": 0, "O": 0, "D": 0}.

    Any read/parse problem yields all-zero levels.
    """
    try:
        cfg = _soap_section_levels_path()
        if os.path.isfile(cfg):
            with open(cfg, "r", encoding="utf-8") as fh:
                raw = json.load(fh)
            if isinstance(raw, dict):
                return {section: int(raw.get(section, 0)) for section in _SOAP_SECTIONS}
    except Exception:
        pass
    return {section: 0 for section in _SOAP_SECTIONS}
|
||
|
||
|
||
def save_soap_section_levels(levels: dict) -> None:
    """Persist the per-section SOAP detail levels (ints, one per known section)."""
    try:
        normalized = {section: int(levels.get(section, 0)) for section in _SOAP_SECTIONS}
        with open(_soap_section_levels_path(), "w", encoding="utf-8") as fh:
            json.dump(normalized, fh)
    except Exception:
        pass
|
||
|
||
|
||
def get_soap_section_instruction(levels: dict) -> str:
    """Build a prompt instruction from the individual SOAP section detail levels.

    Sections at level 0 are skipped; levels are clamped to [-3, 3].
    Returns "" when no section deviates from the default.
    """
    templates = {
        -3: "- {name}: Maximal komprimiert – nur die wichtigsten 1-2 Stichworte pro Punkt. Vorhandene Informationen beibehalten, nur kürzer formulieren.",
        -2: "- {name}: Deutlich kürzer formulieren – gleiche Fakten, aber knapper auf den Punkt gebracht.",
        -1: "- {name}: Leicht kürzer formulieren – gleicher Inhalt, etwas knappere Wortwahl.",
        1: "- {name}: Leicht ausführlicher formulieren – gleiche Fakten in vollständigeren Sätzen statt Stichpunkten. KEINE neuen Informationen erfinden.",
        2: "- {name}: Ausführlicher formulieren – vorhandene Stichpunkte in ganzen Sätzen ausformulieren. NUR vorhandene Informationen verwenden, NICHTS dazuerfinden.",
        3: "- {name}: In vollständigen, ausführlichen Sätzen ausformulieren. Alle vorhandenen Punkte in Fliesstext umwandeln. STRIKT NUR vorhandene Informationen verwenden – KEINE neuen Fakten, Befunde oder Details erfinden.",
    }
    rules = []
    for key in _SOAP_SECTIONS:
        lv = levels.get(key, 0)
        if lv == 0:
            continue
        clamped = max(-3, min(3, lv))
        rules.append(templates[clamped].format(name=_SOAP_LABELS[key]))
    if not rules:
        return ""
    return ("\n\nWICHTIG – INDIVIDUELLE ABSCHNITTSLÄNGEN (zwingend einhalten):\n"
            "ACHTUNG: Ausführlicher bedeutet NUR längere/vollständigere Formulierungen – NIEMALS neue Fakten, "
            "Befunde oder Details erfinden, die nicht im Original stehen!\n"
            + "\n".join(rules))
|
||
|
||
|
||
def load_templates_text() -> str:
    """Return the stored template text (e.g. specialty/context for the AI); "" if unavailable."""
    try:
        cfg = _templates_config_path()
        if os.path.isfile(cfg):
            with open(cfg, "r", encoding="utf-8") as fh:
                return fh.read().strip()
    except Exception:
        pass
    return ""
|
||
|
||
|
||
def save_templates_text(text: str) -> None:
    """Persist the template text (stripped); best-effort, errors are swallowed."""
    try:
        payload = (text or "").strip()
        with open(_templates_config_path(), "w", encoding="utf-8") as fh:
            fh.write(payload)
    except Exception:
        pass
|
||
|
||
|
||
def strip_kg_warnings(text: str) -> str:
    """Remove typical warning/advisory phrases from the KG output and tidy the layout.

    Parenthesised remarks mentioning documentation/re-prescription checks are
    dropped, "Therapie/Procedere" heading variants are unified, bullet lists
    are re-compacted and section headers get exactly one blank line above them.
    """
    import re

    def _drop_if_warning(match):
        # Drop the whole "(...)" group when it carries a known warning phrase.
        inner = match.group(1)
        if any(
            marker in inner
            for marker in (
                "dokumentiert",
                "Weiterverordnung",
                "Überprüfung erforderlich",
            )
        ):
            return ""
        return match.group(0)

    out = re.sub(r"\(([^)]*)\)", _drop_if_warning, text)
    out = re.sub(r"(?<=\S) +", " ", out)   # collapse runs of spaces
    out = re.sub(r"\n{3,}", "\n\n", out)

    # Normalise "Therapie/Procedere" heading variants.
    out = re.sub(r"Therapie/Procedere\s*:", "Therapie:", out, flags=re.IGNORECASE)
    out = re.sub(r"^-\s*Therapie\s*:\s*", "- ", out, flags=re.MULTILINE | re.IGNORECASE)
    out = re.sub(r"^-\s*Procedere\s*:\s*", "- ", out, flags=re.MULTILINE | re.IGNORECASE)
    out = re.sub(r"\n{3,}", "\n\n", out)

    header_rx = (
        r"Anamnese|Subjektiv|Sozialanamnese|Familienanamnese|"
        r"Objektiv|Beurteilung|Diagnose|Diagnosen|Therapie|Procedere"
    )
    bullet_rx = r"[ \t]{0,6}[\u2022\-\u2013]"

    # Re-join bullet lines separated by blank lines (bounded fixpoint passes).
    for _ in range(5):
        out = re.sub(
            r"(^" + bullet_rx + r".*)\n\n+(" + bullet_rx + r")",
            r"\1\n\2", out, flags=re.MULTILINE)

    # Remove blank lines between a section header and its first bullet.
    for _ in range(3):
        out = re.sub(
            r"(?m)^((?:" + header_rx + r")(?:\s*\(.*?\))?:?\s*)\n\s*\n(?=" + bullet_rx + r")",
            r"\1\n", out)

    # Ensure a blank line before each section header.
    out = re.sub(
        r"(?m)([^\n])\n(?=(?:" + header_rx + r")(?:\s*\(.*?\))?:?\s*$)",
        r"\1\n\n", out)

    out = re.sub(r"^[ \t]*(\u2022)", r" \1", out, flags=re.MULTILINE)
    return out.strip()
|
||
|
||
|
||
|
||
|
||
def _is_warning_comment(text: str) -> bool:
    """True if a bracket remark is a caution/warning aimed at the physician."""
    normalized = text.lower().strip()
    return any(keyword in normalized for keyword in COMMENT_KEYWORDS)
|
||
|
||
|
||
def _is_icd10_code(text: str) -> bool:
|
||
"""True, wenn der Klammer-Text ein ICD-10-GM-Code ist (z. B. L57.0, M79.1). Diese bleiben in der KG."""
|
||
import re
|
||
t = text.strip()
|
||
return bool(re.match(r"^[A-Z][0-9]{2}(\.[0-9]{1,2})?$", t, re.IGNORECASE))
|
||
|
||
|
||
def extract_kg_comments(text: str) -> tuple:
    """Strip parenthesised remarks from the KG text, keeping ICD-10 codes inline.

    Returns ``(cleaned_text, comments_text)``; only warning-type remarks are
    collected (with their line context) for the grey comment field.
    """
    import re
    cleaned_lines = []
    warnings = []
    for raw_line in text.split("\n"):
        rebuilt = ""
        cursor = 0
        removed = []
        for paren in re.finditer(r"\(([^)]*)\)", raw_line):
            inner = paren.group(1).strip()
            if _is_icd10_code(inner):
                # ICD-10 codes stay in the text verbatim.
                rebuilt += raw_line[cursor:paren.end()]
            else:
                rebuilt += raw_line[cursor:paren.start()]
                if inner:
                    removed.append(inner)
            cursor = paren.end()
        rebuilt += raw_line[cursor:]
        rebuilt = re.sub(r" +", " ", rebuilt).strip()
        if removed:
            context = rebuilt
            if context.startswith("- "):
                context = context[2:].strip()
            for inner in removed:
                if _is_warning_comment(inner):
                    warnings.append(f"- {context}: {inner}")
        cleaned_lines.append(rebuilt)
    cleaned = re.sub(r"\n{3,}", "\n\n", "\n".join(cleaned_lines)).strip()
    comments_text = "\n".join(warnings) if warnings else ""
    return cleaned, comments_text
|
||
|
||
|
||
# ─── SOAP-Reihenfolge ───
|
||
|
||
def _soap_order_config_path():
    """Absolute path of the legacy SOAP-order config file."""
    base_dir = get_writable_data_dir()
    return os.path.join(base_dir, SOAP_ORDER_CONFIG_FILENAME)
|
||
|
||
|
||
def _legacy_load_soap_order() -> list:
    """Read the pre-presets single-file SOAP order (one-time migration helper).

    Returns the stored list, or None when the file is absent or invalid.
    """
    try:
        legacy_file = _soap_order_config_path()
        if os.path.isfile(legacy_file):
            with open(legacy_file, "r", encoding="utf-8") as fh:
                stored = json.load(fh)
            if isinstance(stored, list):
                return stored
    except Exception:
        pass
    return None
|
||
|
||
|
||
def get_soap_order_instruction(order: list, visibility=None) -> str:
    """Build a prompt instruction enforcing a custom SOAP section order.

    Hidden sections (per *visibility*) are excluded; returns "" when the
    visible order equals the default order.
    """
    if visibility:
        shown = [key for key in order if visibility.get(key, True)]
    else:
        shown = order
    if shown == DEFAULT_SOAP_ORDER:
        return ""
    listing = "\n".join(
        f" {pos}. {_SOAP_LABELS.get(key, key)}" for pos, key in enumerate(shown, 1)
    )
    return (
        "\n\nWICHTIG – BENUTZERDEFINIERTE ABSCHNITTS-REIHENFOLGE (zwingend einhalten):\n"
        "Ordne die Abschnitte der Krankengeschichte EXAKT in folgender Reihenfolge:\n"
        f"{listing}\n"
        "Abschnitte, die nicht vorhanden sind, weglassen – aber die Reihenfolge der vorhandenen Abschnitte MUSS dieser Vorgabe entsprechen."
    )
|
||
|
||
|
||
# ─── SOAP-Sichtbarkeit ───
|
||
|
||
def _soap_visibility_config_path():
    """Absolute path of the legacy SOAP-visibility config file."""
    base_dir = get_writable_data_dir()
    return os.path.join(base_dir, SOAP_VISIBILITY_CONFIG_FILENAME)
|
||
|
||
|
||
def _legacy_load_soap_visibility() -> dict:
    """Read the pre-presets single-file SOAP visibility (one-time migration helper).

    Returns a {section: bool} mapping, or None when the file is absent/invalid.
    """
    try:
        legacy_file = _soap_visibility_config_path()
        if os.path.isfile(legacy_file):
            with open(legacy_file, "r", encoding="utf-8") as fh:
                stored = json.load(fh)
            if isinstance(stored, dict):
                return {key: bool(stored.get(key, True)) for key in DEFAULT_SOAP_ORDER}
    except Exception:
        pass
    return None
|
||
|
||
|
||
def get_soap_visibility_instruction(visibility: dict) -> str:
    """Build a prompt instruction listing SOAP sections that must be omitted; "" when none are hidden."""
    hidden_names = [
        _SOAP_LABELS.get(key, key)
        for key in DEFAULT_SOAP_ORDER
        if not visibility.get(key, True)
    ]
    if not hidden_names:
        return ""
    hidden_str = ", ".join(hidden_names)
    return (
        f"\n\nWICHTIG – AUSGEBLENDETE ABSCHNITTE (zwingend einhalten):\n"
        f"Folgende Abschnitte dürfen NICHT in der Krankengeschichte erscheinen: {hidden_str}.\n"
        f"Lasse diese Abschnitte komplett weg – keine Überschrift, kein Inhalt."
    )
|
||
|
||
|
||
# ─── SOAP-Profile (KG) ───
|
||
|
||
def _soap_presets_path():
    """Absolute path of the SOAP presets config file."""
    base_dir = get_writable_data_dir()
    return os.path.join(base_dir, SOAP_PRESETS_CONFIG_FILENAME)
|
||
|
||
|
||
def _default_soap_presets():
    """Build the default preset structure: NUM_SOAP_PRESETS identical profiles, first one active."""
    presets = []
    for i in range(NUM_SOAP_PRESETS):
        presets.append({
            "name": f"Profil {i+1}",
            "order": list(DEFAULT_SOAP_ORDER),
            "visibility": dict.fromkeys(DEFAULT_SOAP_ORDER, True),
        })
    return {"active": 0, "presets": presets}
|
||
|
||
|
||
def load_soap_presets() -> dict:
    """Load the SOAP presets, healing each preset so it covers all known sections.

    Falls back to defaults when the file is missing or unreadable; in that
    case a legacy single-file order/visibility config is migrated into the
    first preset.
    """
    try:
        path = _soap_presets_path()
        if os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            if isinstance(data, dict) and "presets" in data:
                for p in data["presets"]:
                    # BUGFIX: a preset missing "order"/"visibility" used to raise
                    # KeyError here, silently discarding ALL stored presets via
                    # the except below. setdefault keeps the user's data intact.
                    order = p.setdefault("order", [])
                    visibility = p.setdefault("visibility", {})
                    existing = set(order)
                    for k in DEFAULT_SOAP_ORDER:
                        if k not in existing:
                            order.insert(0, k)
                        if k not in visibility:
                            visibility[k] = True
                return data
    except Exception:
        pass
    defaults = _default_soap_presets()
    legacy_order = _legacy_load_soap_order()
    legacy_vis = _legacy_load_soap_visibility()
    if legacy_order or legacy_vis:
        p0 = defaults["presets"][0]
        if legacy_order:
            for k in DEFAULT_SOAP_ORDER:
                if k not in legacy_order:
                    legacy_order.insert(0, k)
            p0["order"] = legacy_order
        if legacy_vis:
            p0["visibility"] = legacy_vis
    return defaults
|
||
|
||
|
||
def save_soap_presets(data: dict) -> None:
    """Persist the SOAP presets structure; best-effort, errors are swallowed."""
    try:
        with open(_soap_presets_path(), "w", encoding="utf-8") as fh:
            json.dump(data, fh, ensure_ascii=False, indent=2)
    except Exception:
        pass
|
||
|
||
|
||
def get_active_soap_preset(data: dict = None) -> dict:
    """Return the currently active SOAP preset; a fresh default when the index is invalid."""
    if data is None:
        data = load_soap_presets()
    presets = data.get("presets", [])
    idx = data.get("active", 0)
    if not (0 <= idx < len(presets)):
        return {
            "order": list(DEFAULT_SOAP_ORDER),
            "visibility": dict.fromkeys(DEFAULT_SOAP_ORDER, True),
        }
    return presets[idx]
|
||
|
||
|
||
def load_soap_order() -> list:
    """Return a copy of the SOAP section order of the active preset."""
    active = get_active_soap_preset()
    return list(active.get("order", DEFAULT_SOAP_ORDER))
|
||
|
||
|
||
def load_soap_visibility() -> dict:
    """Return the SOAP visibility flags of the active preset (unknown sections default to True)."""
    active = get_active_soap_preset()
    stored = active.get("visibility", {})
    return {key: bool(stored.get(key, True)) for key in DEFAULT_SOAP_ORDER}
|
||
|
||
|
||
# ─── Brief-Profile ───
|
||
|
||
def _brief_presets_path():
    """Absolute path of the letter presets config file."""
    base_dir = get_writable_data_dir()
    return os.path.join(base_dir, BRIEF_PRESETS_CONFIG_FILENAME)
|
||
|
||
|
||
def _default_brief_presets():
    """Fresh default letter presets (deep-copied so callers can mutate them safely)."""
    import copy
    return {
        "active": 0,
        "presets": [copy.deepcopy(preset) for preset in BRIEF_PROFILE_DEFAULTS],
    }
|
||
|
||
|
||
def load_brief_presets() -> dict:
    """Load the letter presets; falls back to defaults when missing or invalid."""
    try:
        cfg = _brief_presets_path()
        if os.path.isfile(cfg):
            with open(cfg, "r", encoding="utf-8") as fh:
                stored = json.load(fh)
            if isinstance(stored, dict) and "presets" in stored:
                return stored
    except Exception:
        pass
    return _default_brief_presets()
|
||
|
||
|
||
def save_brief_presets(data: dict) -> None:
    """Persist the letter presets structure; best-effort, errors are swallowed."""
    try:
        with open(_brief_presets_path(), "w", encoding="utf-8") as fh:
            json.dump(data, fh, ensure_ascii=False, indent=2)
    except Exception:
        pass
|
||
|
||
|
||
def get_active_brief_preset() -> dict:
    """Return the active letter preset; a deep copy of the first default when the index is invalid."""
    import copy
    data = load_brief_presets()
    presets = data.get("presets", [])
    idx = data.get("active", 0)
    if 0 <= idx < len(presets):
        return presets[idx]
    return copy.deepcopy(BRIEF_PROFILE_DEFAULTS[0])
|
||
|
||
|
||
def get_brief_order_instruction() -> str:
    """Build the prompt instruction for letter generation from the active preset.

    Covers: exact section order, sections to omit, bullet formatting for
    diagnoses and full-sentence formatting for prose sections.
    """
    preset = get_active_brief_preset()
    section_keys = preset.get("order", [])
    labels = preset.get("labels", {})
    shown_flags = preset.get("visibility", {})

    shown = [k for k in section_keys if shown_flags.get(k, True)]
    if not shown:
        return ""
    listing = "\n".join(
        f" {pos}. {labels.get(k, k)}" for pos, k in enumerate(shown, 1)
    )
    hidden_names = [labels.get(k, k) for k in section_keys if not shown_flags.get(k, True)]

    # DI is the diagnoses section (bullet format); the rest are prose sections.
    DIAG_KEYS = {"DI"}
    SENTENCE_KEYS = {"AN", "BE", "ZF", "EP", "AE", "VL"}
    diag_names = [labels.get(k, k) for k in shown if k in DIAG_KEYS]
    sentence_names = [labels.get(k, k) for k in shown if k in SENTENCE_KEYS]

    chunks = [
        "\n\nWICHTIG – BRIEF-ABSCHNITTSREIHENFOLGE UND FORMATIERUNG (zwingend einhalten):\n"
        "Verwende EXAKT folgende Abschnitte in dieser Reihenfolge als Überschriften:\n"
        f"{listing}\n"
        "Abschnitte, die nicht vorhanden sind, weglassen."
    ]
    if hidden_names:
        chunks.append(
            f"Folgende Abschnitte NICHT im Brief verwenden: {', '.join(hidden_names)}."
        )
    if diag_names:
        chunks.append(
            f"FORMATIERUNG Diagnosen ({', '.join(diag_names)}): Stichwortartig als Aufzählung – "
            "jede Diagnose eine Zeile mit ICD-10-GM-Code in eckigen Klammern."
        )
    if sentence_names:
        chunks.append(
            f"FORMATIERUNG ({', '.join(sentence_names)}): In vollständigen, ausformulierten Sätzen schreiben – "
            "wie in einem ärztlichen Brief üblich. Keine reinen Stichpunkte."
        )
    return "\n".join(chunks)
|
||
|
||
|
||
# ─── Launcher-Startpräferenz ───
|
||
|
||
def _launcher_config_path():
    """Absolute path of the launcher preferences file."""
    base_dir = get_writable_data_dir()
    return os.path.join(base_dir, LAUNCHER_CONFIG_FILENAME)
|
||
|
||
|
||
def load_launcher_prefs() -> dict:
    """Load the launcher start preferences as {default_module, auto_open}.

    Unknown or unreadable config yields the neutral defaults.
    """
    try:
        cfg = _launcher_config_path()
        if os.path.isfile(cfg):
            with open(cfg, "r", encoding="utf-8") as fh:
                stored = json.load(fh)
            if isinstance(stored, dict):
                return {
                    "default_module": stored.get("default_module", ""),
                    "auto_open": bool(stored.get("auto_open", False)),
                }
    except Exception:
        pass
    return {"default_module": "", "auto_open": False}
|
||
|
||
|
||
def save_launcher_prefs(default_module: str, auto_open: bool) -> None:
    """Persist the launcher start preferences; best-effort, errors are swallowed."""
    try:
        payload = {"default_module": default_module, "auto_open": auto_open}
        with open(_launcher_config_path(), "w", encoding="utf-8") as fh:
            json.dump(payload, fh, indent=2)
    except Exception:
        pass
|
||
|
||
|
||
# ─── Briefstil-Profile ───
|
||
|
||
# Built-in letter-style profiles, always offered before any user-defined ones.
SYSTEM_STYLE_PROFILES = ["Klinischer Bericht", "KISIM Bericht"]
|
||
|
||
|
||
def get_all_style_profile_choices() -> list:
    """Full profile choice list: "(keins)", the system profiles, then user-defined profiles."""
    stored = load_brief_style_profiles()
    # "_active_profile" is a bookkeeping key, not a profile name.
    custom_names = [name for name in stored if name != "_active_profile"]
    return ["(keins)"] + SYSTEM_STYLE_PROFILES + custom_names
|
||
|
||
|
||
def _brief_style_profiles_path():
    """Absolute path of the letter-style profiles file (filename imported lazily)."""
    from aza_config import BRIEF_STYLE_PROFILES_FILENAME
    base_dir = get_writable_data_dir()
    return os.path.join(base_dir, BRIEF_STYLE_PROFILES_FILENAME)
|
||
|
||
|
||
def load_brief_style_profiles() -> dict:
    """Load all stored letter-style profiles ({profile_name: {...}}); {} when unavailable."""
    try:
        cfg = _brief_style_profiles_path()
        if os.path.isfile(cfg):
            with open(cfg, "r", encoding="utf-8") as fh:
                stored = json.load(fh)
            if isinstance(stored, dict):
                return stored
    except Exception:
        pass
    return {}
|
||
|
||
|
||
def save_brief_style_profiles(profiles: dict) -> None:
    """Persist all letter-style profiles; best-effort, errors are swallowed."""
    try:
        with open(_brief_style_profiles_path(), "w", encoding="utf-8") as fh:
            json.dump(profiles, fh, indent=2, ensure_ascii=False)
    except Exception:
        pass
|
||
|
||
|
||
def get_active_brief_style_profile_name() -> str:
    """Name of the active style profile; "" when none is active."""
    return load_brief_style_profiles().get("_active_profile", "")
|
||
|
||
|
||
def set_active_brief_style_profile(name: str) -> None:
    """Mark *name* as the active style profile and persist the change."""
    stored = load_brief_style_profiles()
    stored["_active_profile"] = name
    save_brief_style_profiles(stored)
|
||
|
||
|
||
def get_active_brief_style_prompt() -> str:
    """Style prompt of the active profile; "" when no profile is active."""
    stored = load_brief_style_profiles()
    active_name = stored.get("_active_profile", "")
    if not active_name:
        return ""
    return stored.get(active_name, {}).get("style_prompt", "")
|
||
|
||
|
||
def extract_text_from_docx(file_path: str) -> str:
    """Extract the full paragraph text of a DOCX file.

    Empty paragraphs are skipped; on any failure (missing python-docx,
    unreadable file, …) a German error-marker string is returned instead.
    """
    try:
        from docx import Document
        document = Document(file_path)
        non_empty = (p.text for p in document.paragraphs if p.text.strip())
        return "\n".join(non_empty)
    except Exception as e:
        return f"[FEHLER beim Lesen von {os.path.basename(file_path)}: {e}]"
|
||
|
||
|
||
def extract_texts_from_docx_files(file_paths: list) -> list:
    """Extract the texts of several DOCX files as a list of (basename, text) tuples."""
    return [
        (os.path.basename(fp), extract_text_from_docx(fp))
        for fp in file_paths
    ]
|