# Files
# aza/backup 24.2.26 - Kopie (61)/aza_macro.py
# 2026-03-25 13:42:48 +01:00
#
# 389 lines
# 14 KiB
# Python

import asyncio
import json
import sys
from pathlib import Path
import tkinter as tk
from tkinter import messagebox, simpledialog
from aza_config import get_writable_data_dir
# Playwright is an optional dependency: when it cannot be imported the module
# must still load so that the failure can be reported through a dialog later
# (see BrowserMacro.connect_to_browser) instead of crashing at import time.
try:
    from playwright.async_api import TimeoutError as PlaywrightTimeoutError
    from playwright.async_api import async_playwright
except Exception as exc:  # pragma: no cover
    # Fallbacks keep every later reference to these names valid.
    async_playwright = None
    PlaywrightTimeoutError = Exception
    _PLAYWRIGHT_IMPORT_ERROR = exc
else:
    _PLAYWRIGHT_IMPORT_ERROR = None
# Built-in macro used when no readable "macro_steps.json" is available
# (_load_steps) or when run_macro is called without steps. Each step is a dict
# with an "action" key plus the fields that action needs ("selector", "value").
# NOTE(review): run_macro passes "value" to page.fill() unchanged, so
# "<clipboard>" is typed literally; presumably it was meant as a placeholder
# for the clipboard content - confirm.
DEFAULT_STEPS = [
{"action": "click", "selector": 'button:has-text("Einstellungen")'},
{"action": "click", "selector": 'li:has-text("Importieren")'},
{"action": "fill", "selector": "#textarea-input", "value": "<clipboard>"},
{"action": "click", "selector": "#submit-button"},
]
# Profile name used by main() when no profile is given on the command line.
DEFAULT_PROFILE_NAME = "macro1"
def _notify(title: str, text: str, is_error: bool = False) -> None:
    """Show a modal message box and block until the user dismisses it.

    Args:
        title: Window title of the dialog.
        text: Message body.
        is_error: Show an error dialog when true, an info dialog otherwise.
    """
    # A Tk root is required for message boxes; it is hidden so that only the
    # dialog itself becomes visible.
    root = tk.Tk()
    root.withdraw()
    try:
        if is_error:
            messagebox.showerror(title, text)
        else:
            messagebox.showinfo(title, text)
    finally:
        # Always tear the hidden root down, even if the dialog call fails.
        root.destroy()
def _ask_string(title: str, prompt: str, initial_value: str = "") -> str | None:
    """Ask the user for a single line of text in a modal dialog.

    Args:
        title: Window title of the dialog.
        prompt: Text shown above the entry field.
        initial_value: Text pre-filled into the entry field.

    Returns:
        Whatever ``simpledialog.askstring`` returns: the entered string, or
        ``None`` when the dialog is cancelled.
    """
    # Hidden root window that only serves as the dialog's parent.
    root = tk.Tk()
    root.withdraw()
    try:
        return simpledialog.askstring(title, prompt, initialvalue=initial_value, parent=root)
    finally:
        root.destroy()
def _macro_steps_path() -> Path:
    """Return the path of "macro_steps.json" inside the writable data directory."""
    return Path(get_writable_data_dir()) / "macro_steps.json"
def _profiles_path() -> Path:
    """Return the path of "macro_profiles.json" inside the writable data directory."""
    return Path(get_writable_data_dir()) / "macro_profiles.json"
def _load_steps() -> list[dict]:
    """Load the macro steps from "macro_steps.json".

    Loading is best-effort: a missing, unreadable or malformed file, or a file
    whose top-level JSON value is not a list, yields the built-in default steps.

    Returns:
        The list stored in the file, or a copy of ``DEFAULT_STEPS``.
    """
    path = _macro_steps_path()
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: file missing/unreadable. ValueError: invalid JSON or
        # invalid UTF-8 (JSONDecodeError and UnicodeDecodeError derive from it).
        data = None
    if isinstance(data, list):
        return data
    # Hand out a copy so callers can never mutate the module-level defaults.
    return [dict(step) for step in DEFAULT_STEPS]
def _load_profiles() -> dict:
    """Load all saved macro profiles from "macro_profiles.json".

    Loading is best-effort: a missing, unreadable or malformed file, or a file
    whose top-level JSON value is not an object, yields an empty dict.

    Returns:
        The dict stored in the file, or ``{}``.
    """
    path = _profiles_path()
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: file missing/unreadable. ValueError: invalid JSON or
        # invalid UTF-8 (JSONDecodeError and UnicodeDecodeError derive from it).
        return {}
    return data if isinstance(data, dict) else {}
def _save_profiles(data: dict) -> None:
    """Write all macro profiles to "macro_profiles.json" as UTF-8 JSON.

    The file is written to a temporary sibling first and then moved into
    place, so an interrupted write cannot leave a truncated profile file
    (which ``_load_profiles`` would silently treat as "no profiles").

    Args:
        data: Mapping of profile name to profile; must be JSON-serializable.
    """
    path = _profiles_path()
    # Serialize before touching the file system so a serialization error
    # leaves the existing file untouched.
    payload = json.dumps(data, ensure_ascii=False, indent=2)
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_name(path.name + ".tmp")
    tmp_path.write_text(payload, encoding="utf-8")
    tmp_path.replace(path)
def _normalize_click_steps(raw_clicks: list[dict]) -> list[dict]:
out = []
last_sel = None
for item in raw_clicks:
sel = str(item.get("selector", "")).strip()
if not sel:
continue
if sel == last_sel:
continue
out.append({"action": "click", "selector": sel})
last_sel = sel
return out
class BrowserMacro:
def __init__(self, cdp_url: str = "http://127.0.0.1:9222"):
self.cdp_url = cdp_url
self._pw = None
self.browser = None
self.context = None
self.page = None
async def connect_to_browser(self):
"""Verbindet sich via CDP mit einem laufenden Chromium-Browser."""
if async_playwright is None:
raise RuntimeError(
f"Playwright ist nicht installiert/importierbar: {_PLAYWRIGHT_IMPORT_ERROR}"
)
self._pw = await async_playwright().start()
self.browser = await self._pw.chromium.connect_over_cdp(self.cdp_url)
if self.browser.contexts:
self.context = self.browser.contexts[0]
else:
self.context = await self.browser.new_context()
if self.context.pages:
self.page = self.context.pages[0]
else:
self.page = await self.context.new_page()
return self.page
async def click_element(self, selector: str, timeout_ms: int = 12000):
"""Wartet strukturell auf ein sichtbares Element und klickt dann."""
await self.page.wait_for_selector(selector, state="visible", timeout=timeout_ms)
await self.page.click(selector, timeout=timeout_ms)
async def run_macro(self, steps: list[dict] | None = None):
if self.page is None:
await self.connect_to_browser()
await self.page.wait_for_load_state("networkidle")
await self.page.keyboard.press("Control+0")
sequence = steps or DEFAULT_STEPS
for idx, step in enumerate(sequence, start=1):
action = str(step.get("action", "")).strip().lower()
if action == "click":
selector = step.get("selector")
if not selector:
raise ValueError(f"Schritt {idx}: selector fehlt.")
await self.click_element(selector)
elif action == "fill":
selector = step.get("selector")
value = step.get("value", "")
if not selector:
raise ValueError(f"Schritt {idx}: selector fehlt.")
await self.page.wait_for_selector(selector, state="visible", timeout=12000)
await self.page.fill(selector, str(value))
elif action == "press":
key = step.get("key")
if not key:
raise ValueError(f"Schritt {idx}: key fehlt.")
await self.page.keyboard.press(str(key))
elif action == "wait":
ms = int(step.get("ms", 500))
await self.page.wait_for_timeout(ms)
elif action == "goto":
url = step.get("url")
if not url:
raise ValueError(f"Schritt {idx}: url fehlt.")
await self.page.goto(str(url), wait_until="networkidle")
else:
raise ValueError(f"Schritt {idx}: Unbekannte action '{action}'.")
async def run_saved_profile(self, profile_name: str):
profiles = _load_profiles()
profile = profiles.get(profile_name)
if not isinstance(profile, dict):
raise ValueError(
f"Profil '{profile_name}' nicht gefunden. Rechtsklick auf 'Macro 1' zum Aufnehmen."
)
url = str(profile.get("url", "")).strip()
steps = profile.get("steps")
if not isinstance(steps, list) or not steps:
raise ValueError(f"Profil '{profile_name}' enthält keine Schritte.")
if self.page is None:
await self.connect_to_browser()
if url:
await self.page.goto(url, wait_until="networkidle")
else:
await self.page.wait_for_load_state("networkidle")
await self.run_macro(steps)
async def record_profile(self, profile_name: str):
profiles = _load_profiles()
old_profile = profiles.get(profile_name, {}) if isinstance(profiles.get(profile_name), dict) else {}
initial_url = str(old_profile.get("url", "")).strip() or "https://"
url = _ask_string("Makro aufnehmen", "Zielseite (URL) für Macro 1:", initial_url)
if not url:
return
if self.page is None:
await self.connect_to_browser()
await self.page.goto(url, wait_until="networkidle")
await self.page.keyboard.press("Control+0")
await self.page.evaluate(
"""
() => {
const cssEscape = (v) => {
try { return CSS.escape(v); } catch (_) { return v.replace(/[^a-zA-Z0-9_-]/g, "_"); }
};
const txt = (v) => (v || "").replace(/\\s+/g, " ").trim();
const escTxt = (v) => txt(v).replace(/"/g, '\\"');
const buildPath = (el) => {
const parts = [];
let cur = el;
for (let i = 0; i < 5 && cur && cur.nodeType === 1; i++) {
let part = cur.tagName.toLowerCase();
if (cur.id) {
part += "#" + cssEscape(cur.id);
parts.unshift(part);
return parts.join(" > ");
}
const siblings = Array.from(cur.parentElement ? cur.parentElement.children : []).filter(
(s) => s.tagName === cur.tagName
);
if (siblings.length > 1) {
const pos = siblings.indexOf(cur) + 1;
part += `:nth-of-type(${pos})`;
}
parts.unshift(part);
cur = cur.parentElement;
}
return parts.join(" > ");
};
const chooseTarget = (el) => {
return el.closest('button,a,input,textarea,select,[role="button"],[data-testid]') || el;
};
const selectorFor = (el) => {
const t = chooseTarget(el);
if (!t) return "";
if (t.id) return "#" + cssEscape(t.id);
const testId = t.getAttribute("data-testid");
if (testId) return `[data-testid="${testId.replace(/"/g, '\\"')}"]`;
const name = t.getAttribute("name");
if (name) return `${t.tagName.toLowerCase()}[name="${name.replace(/"/g, '\\"')}"]`;
const aria = t.getAttribute("aria-label");
if (aria) return `${t.tagName.toLowerCase()}[aria-label="${aria.replace(/"/g, '\\"')}"]`;
const text = txt(t.innerText || t.textContent || "");
if (text && text.length <= 80) return `${t.tagName.toLowerCase()}:has-text("${escTxt(text)}")`;
return buildPath(t);
};
if (window.__macroRecorderCleanup) {
window.__macroRecorderCleanup();
}
window.__macroClicks = [];
window.__macroStopFlag = false;
const onClick = (ev) => {
const selector = selectorFor(ev.target);
if (!selector) return;
window.__macroClicks.push({ action: "click", selector });
};
const onKey = (ev) => {
if (ev.key === "F8") {
window.__macroStopFlag = true;
}
};
document.addEventListener("click", onClick, true);
document.addEventListener("keydown", onKey, true);
window.__macroRecorderCleanup = () => {
document.removeEventListener("click", onClick, true);
document.removeEventListener("keydown", onKey, true);
};
}
"""
)
_notify(
"Makro-Aufnahme",
"Aufnahme läuft.\n\n"
"1) Klicke im Browser die gewünschten Schritte.\n"
"2) Drücke F8 im Browser zum Beenden.\n\n"
"Danach wird 'Macro 1' gespeichert.",
is_error=False,
)
while True:
stop = await self.page.evaluate("() => !!window.__macroStopFlag")
if stop:
break
await asyncio.sleep(0.25)
raw_clicks = await self.page.evaluate(
"() => (Array.isArray(window.__macroClicks) ? window.__macroClicks : [])"
)
await self.page.evaluate(
"() => { if (window.__macroRecorderCleanup) window.__macroRecorderCleanup(); }"
)
steps = _normalize_click_steps(raw_clicks if isinstance(raw_clicks, list) else [])
if not steps:
raise ValueError("Keine Klicks aufgenommen. Bitte erneut probieren.")
profiles[profile_name] = {"url": url, "steps": steps}
_save_profiles(profiles)
_notify(
"Makro gespeichert",
f"Profil '{profile_name}' gespeichert ({len(steps)} Klick-Schritte).\n"
"Start per Linksklick auf 'Macro 1'.",
is_error=False,
)
async def close(self):
if self._pw is not None:
await self._pw.stop()
self._pw = None
async def _run_default():
    """Run the steps from "macro_steps.json" and report the outcome in a dialog.

    Timeouts and all other errors are shown to the user instead of being
    raised; the Playwright connection is closed in every case.
    """
    macro = BrowserMacro()
    try:
        await macro.run_macro(_load_steps())
        _notify(
            "Makro",
            "Makro erfolgreich gestartet/ausgeführt.\n\n"
            "Hinweis: Schritte können in 'macro_steps.json' angepasst werden.",
            is_error=False,
        )
    except PlaywrightTimeoutError as exc:
        _notify("Makro-Timeout", f"Element nicht rechtzeitig gefunden:\n{exc}", is_error=True)
    except Exception as exc:
        # Top-level boundary of the script: every failure ends up in a dialog.
        _notify(
            "Makro-Fehler",
            "Makro konnte nicht ausgeführt werden.\n\n"
            "Prüfe:\n"
            "- Chrome/Edge mit --remote-debugging-port=9222 gestartet\n"
            "- korrekte Selektoren in macro_steps.json\n"
            f"- Details: {exc}",
            is_error=True,
        )
    finally:
        await macro.close()
async def _run_profile(profile_name: str):
    """Replay the saved profile ``profile_name``; show any error in a dialog."""
    macro = BrowserMacro()
    try:
        await macro.run_saved_profile(profile_name)
    except Exception as exc:
        # Top-level boundary of the script: report instead of crashing.
        _notify("Makro-Fehler", str(exc), is_error=True)
    finally:
        await macro.close()
async def _record_profile(profile_name: str):
    """Record a new profile named ``profile_name``; show any error in a dialog."""
    macro = BrowserMacro()
    try:
        await macro.record_profile(profile_name)
    except Exception as exc:
        # Top-level boundary of the script: report instead of crashing.
        _notify("Makro-Fehler", str(exc), is_error=True)
    finally:
        await macro.close()
def main():
    """Command-line entry point.

    Usage: ``aza_macro.py [record|run] [profile_name]``.
    "record" records a profile, "run" replays one; any other (or no) mode runs
    the steps from "macro_steps.json". The profile name defaults to
    ``DEFAULT_PROFILE_NAME`` when it is missing or blank.
    """
    args = sys.argv[1:]
    mode = args[0].strip().lower() if args else ""
    # "or" makes a blank argument fall back to the default profile as well.
    profile_name = (args[1].strip() if len(args) > 1 else "") or DEFAULT_PROFILE_NAME
    if mode == "record":
        asyncio.run(_record_profile(profile_name))
    elif mode == "run":
        asyncio.run(_run_profile(profile_name))
    else:
        asyncio.run(_run_default())
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()