import asyncio import json import sys from pathlib import Path import tkinter as tk from tkinter import messagebox, simpledialog from aza_config import get_writable_data_dir try: from playwright.async_api import TimeoutError as PlaywrightTimeoutError from playwright.async_api import async_playwright except Exception as exc: # pragma: no cover async_playwright = None PlaywrightTimeoutError = Exception _PLAYWRIGHT_IMPORT_ERROR = exc else: _PLAYWRIGHT_IMPORT_ERROR = None DEFAULT_STEPS = [ {"action": "click", "selector": 'button:has-text("Einstellungen")'}, {"action": "click", "selector": 'li:has-text("Importieren")'}, {"action": "fill", "selector": "#textarea-input", "value": ""}, {"action": "click", "selector": "#submit-button"}, ] DEFAULT_PROFILE_NAME = "macro1" def _notify(title: str, text: str, is_error: bool = False) -> None: root = tk.Tk() root.withdraw() try: if is_error: messagebox.showerror(title, text) else: messagebox.showinfo(title, text) finally: root.destroy() def _ask_string(title: str, prompt: str, initial_value: str = "") -> str | None: root = tk.Tk() root.withdraw() try: return simpledialog.askstring(title, prompt, initialvalue=initial_value, parent=root) finally: root.destroy() def _macro_steps_path() -> Path: return Path(get_writable_data_dir()) / "macro_steps.json" def _profiles_path() -> Path: return Path(get_writable_data_dir()) / "macro_profiles.json" def _load_steps() -> list[dict]: path = _macro_steps_path() if not path.exists(): return DEFAULT_STEPS try: data = json.loads(path.read_text(encoding="utf-8")) if isinstance(data, list): return data except Exception: pass return DEFAULT_STEPS def _load_profiles() -> dict: path = _profiles_path() if not path.exists(): return {} try: data = json.loads(path.read_text(encoding="utf-8")) if isinstance(data, dict): return data except Exception: pass return {} def _save_profiles(data: dict) -> None: path = _profiles_path() path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") def _normalize_click_steps(raw_clicks: list[dict]) -> list[dict]: out = [] last_sel = None for item in raw_clicks: sel = str(item.get("selector", "")).strip() if not sel: continue if sel == last_sel: continue out.append({"action": "click", "selector": sel}) last_sel = sel return out class BrowserMacro: def __init__(self, cdp_url: str = "http://127.0.0.1:9222"): self.cdp_url = cdp_url self._pw = None self.browser = None self.context = None self.page = None async def connect_to_browser(self): """Verbindet sich via CDP mit einem laufenden Chromium-Browser.""" if async_playwright is None: raise RuntimeError( f"Playwright ist nicht installiert/importierbar: {_PLAYWRIGHT_IMPORT_ERROR}" ) self._pw = await async_playwright().start() self.browser = await self._pw.chromium.connect_over_cdp(self.cdp_url) if self.browser.contexts: self.context = self.browser.contexts[0] else: self.context = await self.browser.new_context() if self.context.pages: self.page = self.context.pages[0] else: self.page = await self.context.new_page() return self.page async def click_element(self, selector: str, timeout_ms: int = 12000): """Wartet strukturell auf ein sichtbares Element und klickt dann.""" await self.page.wait_for_selector(selector, state="visible", timeout=timeout_ms) await self.page.click(selector, timeout=timeout_ms) async def run_macro(self, steps: list[dict] | None = None): if self.page is None: await self.connect_to_browser() await self.page.wait_for_load_state("networkidle") await self.page.keyboard.press("Control+0") sequence = steps or DEFAULT_STEPS for idx, step in enumerate(sequence, start=1): action = str(step.get("action", "")).strip().lower() if action == "click": selector = step.get("selector") if not selector: raise ValueError(f"Schritt {idx}: selector fehlt.") await self.click_element(selector) elif action == "fill": selector = step.get("selector") value = step.get("value", "") if not selector: raise ValueError(f"Schritt {idx}: selector fehlt.") await self.page.wait_for_selector(selector, state="visible", timeout=12000) await self.page.fill(selector, str(value)) elif action == "press": key = step.get("key") if not key: raise ValueError(f"Schritt {idx}: key fehlt.") await self.page.keyboard.press(str(key)) elif action == "wait": ms = int(step.get("ms", 500)) await self.page.wait_for_timeout(ms) elif action == "goto": url = step.get("url") if not url: raise ValueError(f"Schritt {idx}: url fehlt.") await self.page.goto(str(url), wait_until="networkidle") else: raise ValueError(f"Schritt {idx}: Unbekannte action '{action}'.") async def run_saved_profile(self, profile_name: str): profiles = _load_profiles() profile = profiles.get(profile_name) if not isinstance(profile, dict): raise ValueError( f"Profil '{profile_name}' nicht gefunden. Rechtsklick auf 'Macro 1' zum Aufnehmen." ) url = str(profile.get("url", "")).strip() steps = profile.get("steps") if not isinstance(steps, list) or not steps: raise ValueError(f"Profil '{profile_name}' enthält keine Schritte.") if self.page is None: await self.connect_to_browser() if url: await self.page.goto(url, wait_until="networkidle") else: await self.page.wait_for_load_state("networkidle") await self.run_macro(steps) async def record_profile(self, profile_name: str): profiles = _load_profiles() old_profile = profiles.get(profile_name, {}) if isinstance(profiles.get(profile_name), dict) else {} initial_url = str(old_profile.get("url", "")).strip() or "https://" url = _ask_string("Makro aufnehmen", "Zielseite (URL) für Macro 1:", initial_url) if not url: return if self.page is None: await self.connect_to_browser() await self.page.goto(url, wait_until="networkidle") await self.page.keyboard.press("Control+0") await self.page.evaluate( """ () => { const cssEscape = (v) => { try { return CSS.escape(v); } catch (_) { return v.replace(/[^a-zA-Z0-9_-]/g, "_"); } }; const txt = (v) => (v || "").replace(/\\s+/g, " ").trim(); const escTxt = (v) => txt(v).replace(/"/g, '\\"'); const buildPath = (el) => { const parts = []; let cur = el; for (let i = 0; i < 5 && cur && cur.nodeType === 1; i++) { let part = cur.tagName.toLowerCase(); if (cur.id) { part += "#" + cssEscape(cur.id); parts.unshift(part); return parts.join(" > "); } const siblings = Array.from(cur.parentElement ? cur.parentElement.children : []).filter( (s) => s.tagName === cur.tagName ); if (siblings.length > 1) { const pos = siblings.indexOf(cur) + 1; part += `:nth-of-type(${pos})`; } parts.unshift(part); cur = cur.parentElement; } return parts.join(" > "); }; const chooseTarget = (el) => { return el.closest('button,a,input,textarea,select,[role="button"],[data-testid]') || el; }; const selectorFor = (el) => { const t = chooseTarget(el); if (!t) return ""; if (t.id) return "#" + cssEscape(t.id); const testId = t.getAttribute("data-testid"); if (testId) return `[data-testid="${testId.replace(/"/g, '\\"')}"]`; const name = t.getAttribute("name"); if (name) return `${t.tagName.toLowerCase()}[name="${name.replace(/"/g, '\\"')}"]`; const aria = t.getAttribute("aria-label"); if (aria) return `${t.tagName.toLowerCase()}[aria-label="${aria.replace(/"/g, '\\"')}"]`; const text = txt(t.innerText || t.textContent || ""); if (text && text.length <= 80) return `${t.tagName.toLowerCase()}:has-text("${escTxt(text)}")`; return buildPath(t); }; if (window.__macroRecorderCleanup) { window.__macroRecorderCleanup(); } window.__macroClicks = []; window.__macroStopFlag = false; const onClick = (ev) => { const selector = selectorFor(ev.target); if (!selector) return; window.__macroClicks.push({ action: "click", selector }); }; const onKey = (ev) => { if (ev.key === "F8") { window.__macroStopFlag = true; } }; document.addEventListener("click", onClick, true); document.addEventListener("keydown", onKey, true); window.__macroRecorderCleanup = () => { document.removeEventListener("click", onClick, true); document.removeEventListener("keydown", onKey, true); }; } """ ) _notify( "Makro-Aufnahme", "Aufnahme läuft.\n\n" "1) Klicke im Browser die gewünschten Schritte.\n" "2) Drücke F8 im Browser zum Beenden.\n\n" "Danach wird 'Macro 1' gespeichert.", is_error=False, ) while True: stop = await self.page.evaluate("() => !!window.__macroStopFlag") if stop: break await asyncio.sleep(0.25) raw_clicks = await self.page.evaluate( "() => (Array.isArray(window.__macroClicks) ? window.__macroClicks : [])" ) await self.page.evaluate( "() => { if (window.__macroRecorderCleanup) window.__macroRecorderCleanup(); }" ) steps = _normalize_click_steps(raw_clicks if isinstance(raw_clicks, list) else []) if not steps: raise ValueError("Keine Klicks aufgenommen. Bitte erneut probieren.") profiles[profile_name] = {"url": url, "steps": steps} _save_profiles(profiles) _notify( "Makro gespeichert", f"Profil '{profile_name}' gespeichert ({len(steps)} Klick-Schritte).\n" "Start per Linksklick auf 'Macro 1'.", is_error=False, ) async def close(self): if self._pw is not None: await self._pw.stop() self._pw = None async def _run_default(): macro = BrowserMacro() try: await macro.run_macro(_load_steps()) _notify( "Makro", "Makro erfolgreich gestartet/ausgeführt.\n\n" "Hinweis: Schritte können in 'macro_steps.json' angepasst werden.", is_error=False, ) except PlaywrightTimeoutError as exc: _notify("Makro-Timeout", f"Element nicht rechtzeitig gefunden:\n{exc}", is_error=True) except Exception as exc: _notify( "Makro-Fehler", "Makro konnte nicht ausgeführt werden.\n\n" "Prüfe:\n" "- Chrome/Edge mit --remote-debugging-port=9222 gestartet\n" "- korrekte Selektoren in macro_steps.json\n" f"- Details: {exc}", is_error=True, ) finally: await macro.close() async def _run_profile(profile_name: str): macro = BrowserMacro() try: await macro.run_saved_profile(profile_name) except Exception as exc: _notify("Makro-Fehler", str(exc), is_error=True) finally: await macro.close() async def _record_profile(profile_name: str): macro = BrowserMacro() try: await macro.record_profile(profile_name) except Exception as exc: _notify("Makro-Fehler", str(exc), is_error=True) finally: await macro.close() def main(): mode = (sys.argv[1].strip().lower() if len(sys.argv) > 1 else "") profile_name = (sys.argv[2].strip() if len(sys.argv) > 2 else DEFAULT_PROFILE_NAME) if mode == "record": asyncio.run(_record_profile(profile_name)) elif mode == "run": asyncio.run(_run_profile(profile_name)) else: asyncio.run(_run_default()) if __name__ == "__main__": main()