# Code-viewer metadata (not part of the module): 123 lines, 5.0 KiB, Python
# -*- coding: utf-8 -*-
|
|
"""Tests: weißes Chat-Fenster beim Erststart + Peer-Wechsel-Latenz (minimaler Fix)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import inspect
|
|
import re
|
|
import unittest
|
|
from pathlib import Path
|
|
|
|
# Directory that contains this test module; the inspected files are resolved
# relative to it.
ROOT = Path(__file__).resolve().parent

# Full text of the page under test. The "utf-8-sig" codec drops a leading BOM
# if the file has one.
HTML = ROOT.joinpath("web", "empfang.html").read_text(encoding="utf-8-sig")

# Source text of aza_empfang_webview.py, read as plain text (not imported here).
WEBVIEW = ROOT.joinpath("aza_empfang_webview.py").read_text(encoding="utf-8")
|
|
|
|
|
|
def _func_body(src: str, name: str) -> str:
|
|
marker = f"function {name}"
|
|
idx = src.find(marker)
|
|
if idx < 0:
|
|
return ""
|
|
rest = src[idx + len(marker) :]
|
|
nxt = rest.find("\nfunction ")
|
|
return rest[:nxt] if nxt >= 0 else rest
|
|
|
|
|
|
class ChatWhiteFirstOpenTests(unittest.TestCase):
    """Static text checks for the blank ("white") chat window on first open.

    Nothing is executed or rendered: every test searches the text of ``HTML``
    (``web/empfang.html``) or ``WEBVIEW`` (``aza_empfang_webview.py``) for
    marker strings and compares where they occur.
    """

    @classmethod
    def setUpClass(cls):
        """Slice the regions of ``HTML`` that the tests look at.

        Sets ``cls.init_block``, ``cls.pulse`` and ``cls.focus_block``. The
        ``str.find`` results are used unchecked here, so a missing marker is
        not detected at this point.
        """
        # From the "(async function init()" marker up to (not including) the
        # "})().catch(function(_eInitFatal)" marker.
        init_start = HTML.find("(async function init()")
        init_end = HTML.find("})().catch(function(_eInitFatal)")
        cls.init_block = HTML[init_start:init_end]

        cls.pulse = _func_body(HTML, "pulseTick")

        # From the focus listener registration to the first "});" after it;
        # + 3 keeps that closing "});" inside the slice.
        start = HTML.find("window.addEventListener('focus'")
        end = HTML.find("});", start) + 3
        cls.focus_block = HTML[start:end]

    def test_boot_ok_not_immediately_after_auth_success(self):
        """The first ``empfangShellBootOk();`` after the auth anchor must come
        after the first mention of ``empfangShellBootHideWhenUiReady``.

        If no ``empfangShellBootOk();`` follows the anchor at all, only the
        presence of the two markers is checked.
        """
        auth_idx = self.init_block.find("currentSession = me;")
        # A missing anchor yields -1; used as a start offset that would make
        # the search below begin at the last character and let the test pass
        # without checking anything.
        self.assertGreaterEqual(
            auth_idx, 0, "auth anchor 'currentSession = me;' not found in init block"
        )
        boot_early = self.init_block.find("empfangShellBootOk();", auth_idx)
        boot_ready_fn = self.init_block.find("empfangShellBootHideWhenUiReady")
        # str.find signals "not found" with -1, so 0 is a valid hit.
        self.assertGreaterEqual(boot_ready_fn, 0, "Boot-Hide-When-Ready fehlt")
        if boot_early >= 0:
            self.assertGreater(boot_early, boot_ready_fn, "empfangShellBootOk zu frueh nach Auth")

    def test_boot_waits_for_users_and_shell_context(self):
        """After the first mention of ``empfangShellBootHideWhenUiReady`` the
        init block must contain, in this order: the awaited user load, the
        awaited shell-context refresh, and ``empfangShellBootOk();``.
        """
        helper_at = self.init_block.find("empfangShellBootHideWhenUiReady")
        # Fail with a clear message instead of slicing from -1 (last character).
        self.assertGreaterEqual(helper_at, 0, "Boot-Hide-When-Ready fehlt")
        fn = self.init_block[helper_at:]

        self.assertIn("await loadUsersFromServer();", fn)
        self.assertIn("await refreshDesktopShellContextFromServer(true);", fn)
        self.assertIn("empfangShellBootOk();", fn)

        lu = fn.find("await loadUsersFromServer();")
        ctx = fn.find("await refreshDesktopShellContextFromServer(true);")
        ok = fn.find("empfangShellBootOk();")
        self.assertLess(lu, ctx)
        self.assertLess(ctx, ok)

    def test_no_fixed_380ms_shell_context_timeout(self):
        """The init block must not wrap ``refreshDesktopShellContextFromServer(true)``
        in a ``setTimeout(function() { try { ...`` callback.

        The "380ms" in the test name is not part of the searched text; only
        the call shape is checked.
        """
        # Whitespace-tolerant on purpose: an exact-literal "not in" check
        # passes trivially as soon as the page is indented differently.
        self.assertNotRegex(
            self.init_block,
            r"setTimeout\(function\(\)\s*\{\s*try\s*\{\s*"
            r"refreshDesktopShellContextFromServer\(true\);",
        )

    def test_health_probe_does_not_reload_on_boot_only(self):
        """The state tuple of the ``if state in (...) or
        state.startswith("eval_err"):`` branch in ``WEBVIEW`` must not mention
        ``boot_only``.

        NOTE(review): presumably this is the branch that triggers a reload;
        confirm against aza_empfang_webview.py.
        """
        m = re.search(
            r"if state in \(([^)]+)\) or state\.startswith\(\"eval_err\"\):",
            WEBVIEW,
        )
        self.assertIsNotNone(m, "Health-Probe-Branch nicht gefunden")
        self.assertNotIn("boot_only", m.group(1))

    def test_health_probe_still_reloads_blank(self):
        """``WEBVIEW`` must still contain the quoted state name ``"blank"``.

        NOTE(review): this matches anywhere in the file, not specifically in
        the health-probe branch.
        """
        self.assertIn('"blank"', WEBVIEW)
|
|
|
|
|
|
class ChatPeerSwitchLatencyTests(unittest.TestCase):
    """Static ordering checks for the peer-switch latency change.

    The tests only compare positions of marker strings inside the text of the
    ``pulseTick`` function and of the window ``focus`` listener taken from
    ``HTML``; no JavaScript is executed.
    """

    @classmethod
    def setUpClass(cls):
        """Extract ``cls.pulse`` and ``cls.focus_block`` from ``HTML``.

        The ``str.find`` results are used unchecked here; the individual
        tests assert that the markers they need are present.
        """
        cls.pulse = _func_body(HTML, "pulseTick")

        # From the focus listener registration to the first "});" after it;
        # + 3 keeps that closing "});" inside the slice.
        start = HTML.find("window.addEventListener('focus'")
        end = HTML.find("});", start) + 3
        cls.focus_block = HTML[start:end]

    def test_peer_refresh_before_pulse_fetch(self):
        """In ``pulseTick`` the peer-refresh call must precede
        ``await apiFetch(pulseUrl)``.
        """
        pr = self.pulse.find("empfangShellMaybeConsumePeerRefreshSignal")
        fetch = self.pulse.find("await apiFetch(pulseUrl)")
        # str.find signals "not found" with -1, so compare against that.
        self.assertGreaterEqual(pr, 0, "peer-refresh call not found in pulseTick")
        self.assertGreaterEqual(fetch, 0, "'await apiFetch(pulseUrl)' not found in pulseTick")
        self.assertLess(pr, fetch)

    def test_minichat_in_pulse_peer_refresh_block(self):
        """``isMiniChatMode()`` must occur before the first ``await apiFetch``
        in ``pulseTick``.
        """
        fetch_at = self.pulse.find("await apiFetch")
        # With -1 the slice below would be almost the whole body and the
        # check could pass without a fetch being present at all.
        self.assertGreaterEqual(fetch_at, 0, "'await apiFetch' not found in pulseTick")
        head = self.pulse[:fetch_at]
        self.assertIn("isMiniChatMode()", head)

    def test_minichat_in_pulse_context_throttle_block(self):
        """``isMiniChatMode()`` must occur at or after the first
        ``await apiFetch`` in ``pulseTick``.
        """
        fetch_at = self.pulse.find("await apiFetch")
        self.assertGreaterEqual(fetch_at, 0, "'await apiFetch' not found in pulseTick")
        tail = self.pulse[fetch_at:]
        self.assertIn("isMiniChatMode()", tail)

    def test_focus_consumes_peer_signal_before_debounce(self):
        """In the focus listener the peer-refresh call must precede the first
        use of ``_shellCtxFocusRefreshTs``.
        """
        deb = self.focus_block.find("_shellCtxFocusRefreshTs")
        pr = self.focus_block.find("empfangShellMaybeConsumePeerRefreshSignal")
        self.assertGreaterEqual(pr, 0, "peer-refresh call not found in focus listener")
        self.assertGreaterEqual(deb, 0, "'_shellCtxFocusRefreshTs' not found in focus listener")
        self.assertLess(pr, deb)

    def test_timing_simulation_peer_signal_not_blocked_by_pulse(self):
        """Peer refresh must logically come before the blocking /pulse call
        (timing proxy).

        Compares the indices of the first non-blank ``pulseTick`` lines that
        contain the peer-refresh call and ``await apiFetch(pulseUrl)``.
        """
        lines = [ln.strip() for ln in self.pulse.splitlines() if ln.strip()]
        # Default of None turns a missing marker into an assertion failure
        # instead of an uncaught StopIteration.
        peer_line = next(
            (i for i, ln in enumerate(lines) if "empfangShellMaybeConsumePeerRefreshSignal" in ln),
            None,
        )
        pulse_line = next(
            (i for i, ln in enumerate(lines) if "await apiFetch(pulseUrl)" in ln),
            None,
        )
        self.assertIsNotNone(peer_line, "peer-refresh call not found in pulseTick")
        self.assertIsNotNone(pulse_line, "'await apiFetch(pulseUrl)' not found in pulseTick")
        self.assertLess(peer_line, pulse_line)
|
|
|
|
|
|
class ChatWhiteLatencyRegressionTests(unittest.TestCase):
    """Regression guards: marker strings that must remain present in the
    page script and in ``EmpfangWebviewApi.focus_empfang_chat_shell``.
    """

    def test_existing_badge_tests_still_valid(self):
        """The text of ``kontaktPanelOpenMessagesBadgeInHuelle`` in ``HTML``
        must mention ``focus_empfang_chat_shell`` and ``dm_open_peer_user_id``.
        """
        kp = _func_body(HTML, "kontaktPanelOpenMessagesBadgeInHuelle")
        self.assertIn("focus_empfang_chat_shell", kp)
        self.assertIn("dm_open_peer_user_id", kp)

    def test_sso_and_office_ipc_unchanged(self):
        """The source of ``EmpfangWebviewApi.focus_empfang_chat_shell`` must
        mention ``request_office_open_empfang_chat_shell_ipc`` and
        ``touch_shell_peer_refresh_signal``.
        """
        # Imported inside the test, as before, so the module is loaded only
        # when this test runs. An ordinary import statement replaces the
        # direct __import__ call.
        from aza_empfang_webview import EmpfangWebviewApi

        focus_src = inspect.getsource(EmpfangWebviewApi.focus_empfang_chat_shell)
        self.assertIn("request_office_open_empfang_chat_shell_ipc", focus_src)
        self.assertIn("touch_shell_peer_refresh_signal", focus_src)
|
|
|
|
|
|
if __name__ == "__main__":
    # Run every TestCase in this module when the file is executed directly.
    unittest.main()
|