update
This commit is contained in:
@@ -16,10 +16,132 @@ from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
|
||||
|
||||
|
||||
def _strip_minichat_query(href: str) -> str:
|
||||
"""URL wie Browser-Chat: Query-Parameter minichat und mode=minichat entfernen."""
|
||||
raw = str(href or "").strip()
|
||||
if not raw:
|
||||
return ""
|
||||
try:
|
||||
p = urlparse(raw)
|
||||
qs = parse_qs(p.query, keep_blank_values=True)
|
||||
qs.pop("minichat", None)
|
||||
if "mode" in qs:
|
||||
modes = [str(x).lower() for x in qs.get("mode") or []]
|
||||
if modes == ["minichat"]:
|
||||
qs.pop("mode", None)
|
||||
return urlunparse(p._replace(query=urlencode(qs, doseq=True)))
|
||||
except Exception:
|
||||
return raw
|
||||
|
||||
|
||||
def _own_top_level_hwnds() -> list[int]:
|
||||
"""Top-Level-HWNDs des EIGENEN Prozesses (Windows; sichtbare Fenster)."""
|
||||
if sys.platform != "win32":
|
||||
return []
|
||||
try:
|
||||
import ctypes
|
||||
from ctypes import wintypes
|
||||
|
||||
user32 = ctypes.windll.user32
|
||||
my_pid = int(os.getpid())
|
||||
out: list[int] = []
|
||||
|
||||
@ctypes.WINFUNCTYPE(wintypes.BOOL, wintypes.HWND, wintypes.LPARAM)
|
||||
def _enum(hwnd, _lp):
|
||||
try:
|
||||
if not user32.IsWindowVisible(hwnd):
|
||||
return True
|
||||
pid = wintypes.DWORD()
|
||||
user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
|
||||
if int(pid.value) == my_pid:
|
||||
out.append(int(hwnd))
|
||||
except Exception:
|
||||
pass
|
||||
return True
|
||||
|
||||
user32.EnumWindows(_enum, 0)
|
||||
return out
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
|
||||
def _apply_win32_topmost(value: bool) -> bool:
|
||||
"""Erzwingt TOPMOST/NOTOPMOST per SetWindowPos auf alle eigenen Top-Level-Fenster.
|
||||
|
||||
Workaround fuer pywebview/WinForms+WebView2: ``window.on_top = True`` setzt
|
||||
das WinForms-TopMost-Flag, greift aber nicht zuverlaessig auf das echte
|
||||
Top-Level-Fenster im WebView2-Host. Ein einmaliger SetWindowPos-Aufruf nach
|
||||
dem Setter sorgt zuverlaessig fuer das tatsaechliche Always-on-top.
|
||||
Kein SetForegroundWindow, kein Loop.
|
||||
"""
|
||||
if sys.platform != "win32":
|
||||
return False
|
||||
try:
|
||||
import ctypes
|
||||
|
||||
user32 = ctypes.windll.user32
|
||||
HWND_TOPMOST = -1
|
||||
HWND_NOTOPMOST = -2
|
||||
SWP_NOMOVE = 0x0002
|
||||
SWP_NOSIZE = 0x0001
|
||||
SWP_NOACTIVATE = 0x0010
|
||||
flags = SWP_NOMOVE | SWP_NOSIZE | SWP_NOACTIVATE
|
||||
target = HWND_TOPMOST if value else HWND_NOTOPMOST
|
||||
any_ok = False
|
||||
for hwnd in _own_top_level_hwnds():
|
||||
try:
|
||||
if user32.SetWindowPos(hwnd, target, 0, 0, 0, 0, flags):
|
||||
any_ok = True
|
||||
except Exception:
|
||||
pass
|
||||
return any_ok
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _focus_other_empfang_host_window() -> bool:
|
||||
"""Bringt ein anderes AzA-Empfang-Fenster (anderer Prozess) in den Vordergrund."""
|
||||
if sys.platform != "win32":
|
||||
return False
|
||||
try:
|
||||
import ctypes
|
||||
from ctypes import wintypes
|
||||
|
||||
user32 = ctypes.windll.user32
|
||||
my_pid = int(os.getpid())
|
||||
handles: list[int] = []
|
||||
|
||||
@ctypes.WINFUNCTYPE(wintypes.BOOL, wintypes.HWND, wintypes.LPARAM)
|
||||
def _enum(hwnd, _lp):
|
||||
if not user32.IsWindowVisible(hwnd):
|
||||
return True
|
||||
buf = ctypes.create_unicode_buffer(260)
|
||||
user32.GetWindowTextW(hwnd, buf, 260)
|
||||
if buf.value != "AzA-Empfang":
|
||||
return True
|
||||
pid = wintypes.DWORD()
|
||||
user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
|
||||
if int(pid.value) != my_pid:
|
||||
handles.append(int(hwnd))
|
||||
return True
|
||||
|
||||
user32.EnumWindows(_enum, 0)
|
||||
if not handles:
|
||||
return False
|
||||
hwnd = wintypes.HWND(handles[0])
|
||||
sw_restore = 9
|
||||
user32.ShowWindow(hwnd, sw_restore)
|
||||
user32.SetForegroundWindow(hwnd)
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _empfang_shell_icon_path() -> str | None:
|
||||
@@ -70,36 +192,12 @@ class EmpfangWebviewApi:
|
||||
self._window = None
|
||||
self._on_top = _load_shell_pin_on_top()
|
||||
self._pin_lock = threading.Lock()
|
||||
self._pin_initial_applied = False
|
||||
|
||||
def bind_window(self, window) -> None:
|
||||
self._window = window
|
||||
self._schedule_initial_pin_apply()
|
||||
|
||||
def _schedule_initial_pin_apply(self) -> None:
|
||||
"""Persistierten Pin-Zustand EINMAL anwenden, ohne UI-Thread zu blockieren.
|
||||
|
||||
Wir haengen uns NICHT an ``window.events.loaded`` (das wuerde bei jedem
|
||||
Reload/Navigation erneut feuern und kann unter pywebview/Windows den
|
||||
UI-Thread blockieren -> "Keine Rueckmeldung"). Statt dessen einmaliges,
|
||||
kurz verzoegertes Setzen aus einem Daemon-Thread.
|
||||
"""
|
||||
|
||||
def _apply_once() -> None:
|
||||
if self._pin_initial_applied:
|
||||
return
|
||||
self._pin_initial_applied = True
|
||||
time.sleep(0.6)
|
||||
try:
|
||||
if self._window is not None:
|
||||
self._window.on_top = self._on_top
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
threading.Thread(target=_apply_once, daemon=True).start()
|
||||
|
||||
def toggle_on_top(self):
|
||||
"""Wechselt always-on-top (nur dieses Empfang-Fenster). Wie aza_empfang_app._Api."""
|
||||
"""Wechselt always-on-top zuverlaessig: pywebview-Setter UND Win32 SetWindowPos."""
|
||||
if not self._pin_lock.acquire(blocking=False):
|
||||
return self._on_top
|
||||
try:
|
||||
@@ -110,20 +208,107 @@ class EmpfangWebviewApi:
|
||||
|
||||
_save_shell_pin_on_top(new_val)
|
||||
|
||||
def _apply() -> None:
|
||||
try:
|
||||
win = self._window
|
||||
if win is not None:
|
||||
win.on_top = new_val
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
threading.Thread(target=_apply, daemon=True).start()
|
||||
try:
|
||||
win = self._window
|
||||
if win is not None:
|
||||
win.on_top = new_val
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
_apply_win32_topmost(new_val)
|
||||
except Exception:
|
||||
pass
|
||||
return new_val
|
||||
|
||||
def get_on_top(self):
|
||||
return self._on_top
|
||||
|
||||
def apply_saved_on_top(self) -> dict:
|
||||
"""Persistierten Pin-Zustand einmal anwenden (nach pywebviewready vom JS aufgerufen).
|
||||
|
||||
Synchron im pywebview-API-Thread, KEIN Daemon-Thread, KEIN loaded-Hook.
|
||||
Setzt zusaetzlich SetWindowPos(HWND_TOPMOST), weil pywebview/WinForms
|
||||
unter WebView2 sonst oft kein echtes TopMost erzwingt.
|
||||
"""
|
||||
try:
|
||||
win = self._window
|
||||
if win is not None:
|
||||
win.on_top = self._on_top
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
_apply_win32_topmost(self._on_top)
|
||||
except Exception:
|
||||
pass
|
||||
return {"ok": True, "on_top": bool(self._on_top)}
|
||||
|
||||
def open_minichat(self, href: str) -> dict:
|
||||
"""Zweites schmales Empfang-Fenster (subprocess wie Desktop-Starter).
|
||||
|
||||
Nicht-blockierend: Popen in Daemon-Thread. Damit hat das MiniChat-Fenster
|
||||
eine eigene pywebview-Instanz inkl. Pin (on_top), ohne window.open-Popup ohne API.
|
||||
"""
|
||||
url = str(href or "").strip()
|
||||
if not url:
|
||||
return {"ok": False, "reason": "empty"}
|
||||
try:
|
||||
p = urlparse(url)
|
||||
qs = parse_qs(p.query, keep_blank_values=True)
|
||||
qs["minichat"] = ["1"]
|
||||
full = urlunparse(p._replace(query=urlencode(qs, doseq=True)))
|
||||
except Exception:
|
||||
full = url
|
||||
w, h = 600, 820
|
||||
|
||||
def _spawn() -> None:
|
||||
try:
|
||||
script = Path(__file__).resolve()
|
||||
if getattr(sys, "frozen", False):
|
||||
cmd = [sys.executable, full, str(w), str(h)]
|
||||
cwd = str(Path(sys.executable).resolve().parent)
|
||||
else:
|
||||
cmd = [sys.executable, str(script), full, str(w), str(h)]
|
||||
cwd = str(script.parent)
|
||||
subprocess.Popen(
|
||||
cmd,
|
||||
cwd=cwd,
|
||||
close_fds=(sys.platform != "win32"),
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
threading.Thread(target=_spawn, daemon=True).start()
|
||||
return {"ok": True}
|
||||
|
||||
def open_browser_chat(self, href: str) -> dict:
|
||||
"""Volle Empfang-Ansicht ohne minichat: erst anderes Fenster fokussieren, sonst neuer Prozess."""
|
||||
url = _strip_minichat_query(href)
|
||||
if not url:
|
||||
return {"ok": False, "reason": "empty"}
|
||||
w_main, h_main = 1180, 820
|
||||
|
||||
def _work() -> None:
|
||||
if _focus_other_empfang_host_window():
|
||||
return
|
||||
try:
|
||||
script = Path(__file__).resolve()
|
||||
if getattr(sys, "frozen", False):
|
||||
cmd = [sys.executable, url, str(w_main), str(h_main)]
|
||||
cwd = str(Path(sys.executable).resolve().parent)
|
||||
else:
|
||||
cmd = [sys.executable, str(script), url, str(w_main), str(h_main)]
|
||||
cwd = str(script.parent)
|
||||
subprocess.Popen(
|
||||
cmd,
|
||||
cwd=cwd,
|
||||
close_fds=(sys.platform != "win32"),
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
threading.Thread(target=_work, daemon=True).start()
|
||||
return {"ok": True}
|
||||
|
||||
@staticmethod
|
||||
def _pinsel_eval_js(win, js_code: str, label: str) -> None:
|
||||
"""Lokal nur stdout/stderr; keine Clipboard-/Patient-Inhalte loggen."""
|
||||
@@ -249,16 +434,140 @@ class EmpfangWebviewApi:
|
||||
return {"ok": True, "scheduled": True}
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
|
||||
argv = argv if argv is not None else sys.argv[1:]
|
||||
if not argv or not argv[0].strip():
|
||||
def _append_query_marker(url: str, key: str, value: str) -> str:
|
||||
"""Setzt/ersetzt einen Query-Parameter, ohne andere Parameter zu zerstoeren."""
|
||||
try:
|
||||
p = urlparse(url)
|
||||
qs = parse_qs(p.query, keep_blank_values=True)
|
||||
qs[key] = [value]
|
||||
return urlunparse(p._replace(query=urlencode(qs, doseq=True)))
|
||||
except Exception:
|
||||
sep = "&" if "?" in url else "?"
|
||||
return f"{url}{sep}{key}={value}"
|
||||
|
||||
|
||||
def _build_default_empfang_chat_shell_url() -> str:
|
||||
"""Standard-URL der separaten Empfang-Chat-Huelle (kein Arzt-Desktop):
|
||||
nutzt aza_empfang_app._empfang_url() bzw. den oeffentlichen Fallback und setzt
|
||||
den Marker ?empfang_chat_shell=1 fuer empfang.html (Login-/Handoff-UI).
|
||||
"""
|
||||
try:
|
||||
from aza_empfang_app import _empfang_url # noqa: WPS433 (lazy)
|
||||
base = _empfang_url()
|
||||
except Exception:
|
||||
base = "https://empfang.aza-medwork.ch/empfang/"
|
||||
return _append_query_marker(base, "empfang_chat_shell", "1")
|
||||
|
||||
|
||||
def _normalize_handoff_code(raw: str) -> str:
|
||||
s = (raw or "").strip().upper().replace(" ", "")
|
||||
for ch in ("\u2011", "\u2013", "\u2014", "\u2212", "_"):
|
||||
s = s.replace(ch, "-")
|
||||
return s
|
||||
|
||||
|
||||
def _resolve_handoff_code_to_launch_url(base_url: str, code: str) -> str | None:
|
||||
"""Loest XXXX-XXXX serverseitig in /empfang/shell/launch?token=... auf.
|
||||
Sicherheit: Code IST das Geheimnis; einmaliger Verbrauch durch Server.
|
||||
"""
|
||||
try:
|
||||
from urllib.parse import quote
|
||||
from urllib.request import urlopen, Request as _Req
|
||||
except Exception:
|
||||
return None
|
||||
try:
|
||||
p = urlparse(base_url)
|
||||
origin = f"{p.scheme}://{p.netloc}" if p.scheme and p.netloc else None
|
||||
if not origin:
|
||||
return None
|
||||
c = _normalize_handoff_code(code)
|
||||
if not c:
|
||||
return None
|
||||
url = f"{origin}/empfang/handoff/lookup?code={quote(c, safe='')}"
|
||||
req = _Req(url, method="GET", headers={
|
||||
"User-Agent": "AzA-EmpfangShell/handoff",
|
||||
"Accept": "application/json",
|
||||
})
|
||||
import ssl
|
||||
try:
|
||||
ctx = ssl.create_default_context()
|
||||
with urlopen(req, timeout=10, context=ctx) as r:
|
||||
raw = r.read(8192)
|
||||
except ssl.SSLError:
|
||||
ctx = ssl._create_unverified_context()
|
||||
with urlopen(req, timeout=10, context=ctx) as r:
|
||||
raw = r.read(8192)
|
||||
data = json.loads(raw.decode("utf-8", errors="replace"))
|
||||
if not isinstance(data, dict) or not data.get("success"):
|
||||
return None
|
||||
path = str(data.get("launch_path") or "").strip()
|
||||
if not path or not path.startswith("/"):
|
||||
return None
|
||||
return f"{origin}{path}"
|
||||
except Exception as exc:
|
||||
print(
|
||||
"Usage:\n"
|
||||
' python aza_empfang_webview.py "https://host/empfang/shell/launch?token=..."',
|
||||
f"[handoff] lookup failed: {type(exc).__name__}",
|
||||
file=sys.stderr,
|
||||
flush=True,
|
||||
)
|
||||
return 2
|
||||
url = argv[0].strip()
|
||||
return None
|
||||
|
||||
|
||||
def _split_argv_for_handoff(argv: list[str]) -> tuple[list[str], str, str]:
|
||||
"""Filtert --handoff-token=... und --handoff-code=... aus argv heraus.
|
||||
|
||||
Rueckgabe: (uebriges_argv, handoff_token, handoff_code).
|
||||
"""
|
||||
rest: list[str] = []
|
||||
tok = ""
|
||||
code = ""
|
||||
for a in argv:
|
||||
s = str(a or "")
|
||||
ls = s.strip()
|
||||
if ls.startswith("--handoff-token="):
|
||||
tok = ls.split("=", 1)[1].strip()
|
||||
continue
|
||||
if ls.startswith("--handoff-code="):
|
||||
code = ls.split("=", 1)[1].strip()
|
||||
continue
|
||||
rest.append(s)
|
||||
return rest, tok, code
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
|
||||
argv = list(argv if argv is not None else sys.argv[1:])
|
||||
argv, handoff_token, handoff_code = _split_argv_for_handoff(argv)
|
||||
|
||||
if not argv or not str(argv[0]).strip():
|
||||
# Standalone / Doppelklick: separate Empfang-Chat-Huelle. Marker fuer empfang.html setzen.
|
||||
url = _build_default_empfang_chat_shell_url()
|
||||
argv = [url]
|
||||
url = str(argv[0]).strip()
|
||||
|
||||
if handoff_token and "/empfang/shell/launch" not in url:
|
||||
# Direkter Token-Start (z.B. aus AzA-Desktop): bevorzugt verwenden.
|
||||
try:
|
||||
from urllib.parse import quote as _q
|
||||
p = urlparse(url)
|
||||
origin = f"{p.scheme}://{p.netloc}" if p.scheme and p.netloc else ""
|
||||
if origin:
|
||||
url = (
|
||||
f"{origin}/empfang/shell/launch?token={_q(handoff_token, safe='')}"
|
||||
f"&target=empfang_chat_shell"
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
elif handoff_code:
|
||||
# Verbindungscode (XXXX-XXXX) per Lookup in einen einmaligen Launch-Token tauschen.
|
||||
resolved = _resolve_handoff_code_to_launch_url(url, handoff_code)
|
||||
if resolved:
|
||||
url = resolved
|
||||
else:
|
||||
print(
|
||||
"[handoff] Verbindungscode konnte nicht eingeloest werden — starte Login-Seite.",
|
||||
file=sys.stderr, flush=True,
|
||||
)
|
||||
|
||||
w = int(argv[1]) if len(argv) > 1 and str(argv[1]).isdigit() else 1180
|
||||
h = int(argv[2]) if len(argv) > 2 and str(argv[2]).isdigit() else 820
|
||||
|
||||
@@ -298,4 +607,13 @@ def main(argv: list[str] | None = None) -> int:
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if sys.platform == "win32":
|
||||
try:
|
||||
import ctypes
|
||||
|
||||
ctypes.windll.shell32.SetCurrentProcessExplicitAppUserModelID(
|
||||
"ch.aza-medwork.empfang.shellwebview",
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
raise SystemExit(main())
|
||||
|
||||
Reference in New Issue
Block a user