389 lines
14 KiB
Python
389 lines
14 KiB
Python
import asyncio
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
import tkinter as tk
|
|
from tkinter import messagebox, simpledialog
|
|
|
|
from aza_config import get_writable_data_dir
|
|
|
|
try:
|
|
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
|
|
from playwright.async_api import async_playwright
|
|
except Exception as exc: # pragma: no cover
|
|
async_playwright = None
|
|
PlaywrightTimeoutError = Exception
|
|
_PLAYWRIGHT_IMPORT_ERROR = exc
|
|
else:
|
|
_PLAYWRIGHT_IMPORT_ERROR = None
|
|
|
|
|
|
DEFAULT_STEPS = [
|
|
{"action": "click", "selector": 'button:has-text("Einstellungen")'},
|
|
{"action": "click", "selector": 'li:has-text("Importieren")'},
|
|
{"action": "fill", "selector": "#textarea-input", "value": "<clipboard>"},
|
|
{"action": "click", "selector": "#submit-button"},
|
|
]
|
|
DEFAULT_PROFILE_NAME = "macro1"
|
|
|
|
|
|
def _notify(title: str, text: str, is_error: bool = False) -> None:
|
|
root = tk.Tk()
|
|
root.withdraw()
|
|
try:
|
|
if is_error:
|
|
messagebox.showerror(title, text)
|
|
else:
|
|
messagebox.showinfo(title, text)
|
|
finally:
|
|
root.destroy()
|
|
|
|
|
|
def _ask_string(title: str, prompt: str, initial_value: str = "") -> str | None:
|
|
root = tk.Tk()
|
|
root.withdraw()
|
|
try:
|
|
return simpledialog.askstring(title, prompt, initialvalue=initial_value, parent=root)
|
|
finally:
|
|
root.destroy()
|
|
|
|
|
|
def _macro_steps_path() -> Path:
|
|
return Path(get_writable_data_dir()) / "macro_steps.json"
|
|
|
|
|
|
def _profiles_path() -> Path:
|
|
return Path(get_writable_data_dir()) / "macro_profiles.json"
|
|
|
|
|
|
def _load_steps() -> list[dict]:
|
|
path = _macro_steps_path()
|
|
if not path.exists():
|
|
return DEFAULT_STEPS
|
|
try:
|
|
data = json.loads(path.read_text(encoding="utf-8"))
|
|
if isinstance(data, list):
|
|
return data
|
|
except Exception:
|
|
pass
|
|
return DEFAULT_STEPS
|
|
|
|
|
|
def _load_profiles() -> dict:
|
|
path = _profiles_path()
|
|
if not path.exists():
|
|
return {}
|
|
try:
|
|
data = json.loads(path.read_text(encoding="utf-8"))
|
|
if isinstance(data, dict):
|
|
return data
|
|
except Exception:
|
|
pass
|
|
return {}
|
|
|
|
|
|
def _save_profiles(data: dict) -> None:
|
|
path = _profiles_path()
|
|
path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
|
|
|
|
def _normalize_click_steps(raw_clicks: list[dict]) -> list[dict]:
|
|
out = []
|
|
last_sel = None
|
|
for item in raw_clicks:
|
|
sel = str(item.get("selector", "")).strip()
|
|
if not sel:
|
|
continue
|
|
if sel == last_sel:
|
|
continue
|
|
out.append({"action": "click", "selector": sel})
|
|
last_sel = sel
|
|
return out
|
|
|
|
|
|
class BrowserMacro:
|
|
def __init__(self, cdp_url: str = "http://127.0.0.1:9222"):
|
|
self.cdp_url = cdp_url
|
|
self._pw = None
|
|
self.browser = None
|
|
self.context = None
|
|
self.page = None
|
|
|
|
async def connect_to_browser(self):
|
|
"""Verbindet sich via CDP mit einem laufenden Chromium-Browser."""
|
|
if async_playwright is None:
|
|
raise RuntimeError(
|
|
f"Playwright ist nicht installiert/importierbar: {_PLAYWRIGHT_IMPORT_ERROR}"
|
|
)
|
|
|
|
self._pw = await async_playwright().start()
|
|
self.browser = await self._pw.chromium.connect_over_cdp(self.cdp_url)
|
|
|
|
if self.browser.contexts:
|
|
self.context = self.browser.contexts[0]
|
|
else:
|
|
self.context = await self.browser.new_context()
|
|
|
|
if self.context.pages:
|
|
self.page = self.context.pages[0]
|
|
else:
|
|
self.page = await self.context.new_page()
|
|
|
|
return self.page
|
|
|
|
async def click_element(self, selector: str, timeout_ms: int = 12000):
|
|
"""Wartet strukturell auf ein sichtbares Element und klickt dann."""
|
|
await self.page.wait_for_selector(selector, state="visible", timeout=timeout_ms)
|
|
await self.page.click(selector, timeout=timeout_ms)
|
|
|
|
async def run_macro(self, steps: list[dict] | None = None):
|
|
if self.page is None:
|
|
await self.connect_to_browser()
|
|
|
|
await self.page.wait_for_load_state("networkidle")
|
|
await self.page.keyboard.press("Control+0")
|
|
|
|
sequence = steps or DEFAULT_STEPS
|
|
for idx, step in enumerate(sequence, start=1):
|
|
action = str(step.get("action", "")).strip().lower()
|
|
if action == "click":
|
|
selector = step.get("selector")
|
|
if not selector:
|
|
raise ValueError(f"Schritt {idx}: selector fehlt.")
|
|
await self.click_element(selector)
|
|
elif action == "fill":
|
|
selector = step.get("selector")
|
|
value = step.get("value", "")
|
|
if not selector:
|
|
raise ValueError(f"Schritt {idx}: selector fehlt.")
|
|
await self.page.wait_for_selector(selector, state="visible", timeout=12000)
|
|
await self.page.fill(selector, str(value))
|
|
elif action == "press":
|
|
key = step.get("key")
|
|
if not key:
|
|
raise ValueError(f"Schritt {idx}: key fehlt.")
|
|
await self.page.keyboard.press(str(key))
|
|
elif action == "wait":
|
|
ms = int(step.get("ms", 500))
|
|
await self.page.wait_for_timeout(ms)
|
|
elif action == "goto":
|
|
url = step.get("url")
|
|
if not url:
|
|
raise ValueError(f"Schritt {idx}: url fehlt.")
|
|
await self.page.goto(str(url), wait_until="networkidle")
|
|
else:
|
|
raise ValueError(f"Schritt {idx}: Unbekannte action '{action}'.")
|
|
|
|
async def run_saved_profile(self, profile_name: str):
|
|
profiles = _load_profiles()
|
|
profile = profiles.get(profile_name)
|
|
if not isinstance(profile, dict):
|
|
raise ValueError(
|
|
f"Profil '{profile_name}' nicht gefunden. Rechtsklick auf 'Macro 1' zum Aufnehmen."
|
|
)
|
|
url = str(profile.get("url", "")).strip()
|
|
steps = profile.get("steps")
|
|
if not isinstance(steps, list) or not steps:
|
|
raise ValueError(f"Profil '{profile_name}' enthält keine Schritte.")
|
|
|
|
if self.page is None:
|
|
await self.connect_to_browser()
|
|
|
|
if url:
|
|
await self.page.goto(url, wait_until="networkidle")
|
|
else:
|
|
await self.page.wait_for_load_state("networkidle")
|
|
await self.run_macro(steps)
|
|
|
|
async def record_profile(self, profile_name: str):
|
|
profiles = _load_profiles()
|
|
old_profile = profiles.get(profile_name, {}) if isinstance(profiles.get(profile_name), dict) else {}
|
|
initial_url = str(old_profile.get("url", "")).strip() or "https://"
|
|
url = _ask_string("Makro aufnehmen", "Zielseite (URL) für Macro 1:", initial_url)
|
|
if not url:
|
|
return
|
|
|
|
if self.page is None:
|
|
await self.connect_to_browser()
|
|
|
|
await self.page.goto(url, wait_until="networkidle")
|
|
await self.page.keyboard.press("Control+0")
|
|
await self.page.evaluate(
|
|
"""
|
|
() => {
|
|
const cssEscape = (v) => {
|
|
try { return CSS.escape(v); } catch (_) { return v.replace(/[^a-zA-Z0-9_-]/g, "_"); }
|
|
};
|
|
const txt = (v) => (v || "").replace(/\\s+/g, " ").trim();
|
|
const escTxt = (v) => txt(v).replace(/"/g, '\\"');
|
|
const buildPath = (el) => {
|
|
const parts = [];
|
|
let cur = el;
|
|
for (let i = 0; i < 5 && cur && cur.nodeType === 1; i++) {
|
|
let part = cur.tagName.toLowerCase();
|
|
if (cur.id) {
|
|
part += "#" + cssEscape(cur.id);
|
|
parts.unshift(part);
|
|
return parts.join(" > ");
|
|
}
|
|
const siblings = Array.from(cur.parentElement ? cur.parentElement.children : []).filter(
|
|
(s) => s.tagName === cur.tagName
|
|
);
|
|
if (siblings.length > 1) {
|
|
const pos = siblings.indexOf(cur) + 1;
|
|
part += `:nth-of-type(${pos})`;
|
|
}
|
|
parts.unshift(part);
|
|
cur = cur.parentElement;
|
|
}
|
|
return parts.join(" > ");
|
|
};
|
|
|
|
const chooseTarget = (el) => {
|
|
return el.closest('button,a,input,textarea,select,[role="button"],[data-testid]') || el;
|
|
};
|
|
const selectorFor = (el) => {
|
|
const t = chooseTarget(el);
|
|
if (!t) return "";
|
|
if (t.id) return "#" + cssEscape(t.id);
|
|
const testId = t.getAttribute("data-testid");
|
|
if (testId) return `[data-testid="${testId.replace(/"/g, '\\"')}"]`;
|
|
const name = t.getAttribute("name");
|
|
if (name) return `${t.tagName.toLowerCase()}[name="${name.replace(/"/g, '\\"')}"]`;
|
|
const aria = t.getAttribute("aria-label");
|
|
if (aria) return `${t.tagName.toLowerCase()}[aria-label="${aria.replace(/"/g, '\\"')}"]`;
|
|
const text = txt(t.innerText || t.textContent || "");
|
|
if (text && text.length <= 80) return `${t.tagName.toLowerCase()}:has-text("${escTxt(text)}")`;
|
|
return buildPath(t);
|
|
};
|
|
|
|
if (window.__macroRecorderCleanup) {
|
|
window.__macroRecorderCleanup();
|
|
}
|
|
window.__macroClicks = [];
|
|
window.__macroStopFlag = false;
|
|
|
|
const onClick = (ev) => {
|
|
const selector = selectorFor(ev.target);
|
|
if (!selector) return;
|
|
window.__macroClicks.push({ action: "click", selector });
|
|
};
|
|
const onKey = (ev) => {
|
|
if (ev.key === "F8") {
|
|
window.__macroStopFlag = true;
|
|
}
|
|
};
|
|
|
|
document.addEventListener("click", onClick, true);
|
|
document.addEventListener("keydown", onKey, true);
|
|
|
|
window.__macroRecorderCleanup = () => {
|
|
document.removeEventListener("click", onClick, true);
|
|
document.removeEventListener("keydown", onKey, true);
|
|
};
|
|
}
|
|
"""
|
|
)
|
|
|
|
_notify(
|
|
"Makro-Aufnahme",
|
|
"Aufnahme läuft.\n\n"
|
|
"1) Klicke im Browser die gewünschten Schritte.\n"
|
|
"2) Drücke F8 im Browser zum Beenden.\n\n"
|
|
"Danach wird 'Macro 1' gespeichert.",
|
|
is_error=False,
|
|
)
|
|
|
|
while True:
|
|
stop = await self.page.evaluate("() => !!window.__macroStopFlag")
|
|
if stop:
|
|
break
|
|
await asyncio.sleep(0.25)
|
|
|
|
raw_clicks = await self.page.evaluate(
|
|
"() => (Array.isArray(window.__macroClicks) ? window.__macroClicks : [])"
|
|
)
|
|
await self.page.evaluate(
|
|
"() => { if (window.__macroRecorderCleanup) window.__macroRecorderCleanup(); }"
|
|
)
|
|
|
|
steps = _normalize_click_steps(raw_clicks if isinstance(raw_clicks, list) else [])
|
|
if not steps:
|
|
raise ValueError("Keine Klicks aufgenommen. Bitte erneut probieren.")
|
|
|
|
profiles[profile_name] = {"url": url, "steps": steps}
|
|
_save_profiles(profiles)
|
|
_notify(
|
|
"Makro gespeichert",
|
|
f"Profil '{profile_name}' gespeichert ({len(steps)} Klick-Schritte).\n"
|
|
"Start per Linksklick auf 'Macro 1'.",
|
|
is_error=False,
|
|
)
|
|
|
|
async def close(self):
|
|
if self._pw is not None:
|
|
await self._pw.stop()
|
|
self._pw = None
|
|
|
|
|
|
async def _run_default():
|
|
macro = BrowserMacro()
|
|
try:
|
|
await macro.run_macro(_load_steps())
|
|
_notify(
|
|
"Makro",
|
|
"Makro erfolgreich gestartet/ausgeführt.\n\n"
|
|
"Hinweis: Schritte können in 'macro_steps.json' angepasst werden.",
|
|
is_error=False,
|
|
)
|
|
except PlaywrightTimeoutError as exc:
|
|
_notify("Makro-Timeout", f"Element nicht rechtzeitig gefunden:\n{exc}", is_error=True)
|
|
except Exception as exc:
|
|
_notify(
|
|
"Makro-Fehler",
|
|
"Makro konnte nicht ausgeführt werden.\n\n"
|
|
"Prüfe:\n"
|
|
"- Chrome/Edge mit --remote-debugging-port=9222 gestartet\n"
|
|
"- korrekte Selektoren in macro_steps.json\n"
|
|
f"- Details: {exc}",
|
|
is_error=True,
|
|
)
|
|
finally:
|
|
await macro.close()
|
|
|
|
|
|
async def _run_profile(profile_name: str):
|
|
macro = BrowserMacro()
|
|
try:
|
|
await macro.run_saved_profile(profile_name)
|
|
except Exception as exc:
|
|
_notify("Makro-Fehler", str(exc), is_error=True)
|
|
finally:
|
|
await macro.close()
|
|
|
|
|
|
async def _record_profile(profile_name: str):
|
|
macro = BrowserMacro()
|
|
try:
|
|
await macro.record_profile(profile_name)
|
|
except Exception as exc:
|
|
_notify("Makro-Fehler", str(exc), is_error=True)
|
|
finally:
|
|
await macro.close()
|
|
|
|
|
|
def main():
|
|
mode = (sys.argv[1].strip().lower() if len(sys.argv) > 1 else "")
|
|
profile_name = (sys.argv[2].strip() if len(sys.argv) > 2 else DEFAULT_PROFILE_NAME)
|
|
|
|
if mode == "record":
|
|
asyncio.run(_record_profile(profile_name))
|
|
elif mode == "run":
|
|
asyncio.run(_run_profile(profile_name))
|
|
else:
|
|
asyncio.run(_run_default())
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|