Files
aza/AzA march 2026/aza_workspace_sync.py
2026-05-06 22:43:22 +02:00

566 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""Hybride Persistenz: lokale JSON-Dateien als Hauptquelle, Supabase todo_sync nur als Backup.
Die globale Autotext-Expansion liest weiterhin über load_autotext() vom Datenträger
keine Serverabhängigkeit im Hook.
Synchronisation: nach Start kurz zusammenführen + im Hintergrund Backup pushen.
**Lizenz-Workspace**: Zeilen in ``todo_sync`` sind pro Lizenz-Fingerprint vergeben
(siehe ``aza_workspace_license.workspace_cloud_row_ids``). Die älteren festen IDs
``3``/``4`` werden nicht mehr für neuen Traffic verwendet (keine Vermischung
zwischen Lizenzen).
Konflikte je Kürzel/Textblock über entry_meta.updated_at / slot.updated_at; ohne
Zeitstempel gilt bei Text-Unterschied Offline-Primat (lokaler Wert bleibt).
Gleicher Text → Vereinigung ohne Verlust.
**Nicht** Bestandteil dieses Workspace-Backup: Praxis-/Chat-spezifische Daten,
Empfangspräferenzen usw.; diese bleiben rein lokal bzw. folgen anderen Pfaden.
"""
from __future__ import annotations
import json
import threading
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone
from typing import Any, Dict, Optional, Tuple
try:
from aza_workspace_license import (
DEMO_WORKSPACE_TAG,
resolve_workspace_identity_tag,
workspace_cloud_row_ids,
)
except Exception: # pragma: no cover — Falls Modul beim Pack fehlt, defensiv ausweichen
def resolve_workspace_identity_tag() -> str: # type: ignore
return ""
DEMO_WORKSPACE_TAG = "__demo__"
def workspace_cloud_row_ids(_workspace_tag: str) -> Tuple[Optional[int], Optional[int]]: # type: ignore
return None, None
try:
from aza_config import _SUPABASE_ANON_KEY, _SUPABASE_URL
except Exception: # pragma: no cover
_SUPABASE_URL = ""
_SUPABASE_ANON_KEY = ""
# Historische globale Rows (ältere Builds): nicht mehr aktiv nutzen.
WORKSPACE_AUTOTEXT_SYNC_ID = 3
WORKSPACE_TEXTBLOECKE_SYNC_ID = 4
def _effective_workspace_backup_row_ids() -> Tuple[Optional[int], Optional[int]]:
tag = resolve_workspace_identity_tag()
if not tag or tag == DEMO_WORKSPACE_TAG:
return None, None
return workspace_cloud_row_ids(tag)
_SYNC_PUSH_DELAY_SEC = 0.65
_push_timer_lock = threading.Lock()
_push_timer_state: Optional[threading.Timer] = None
def utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def parse_iso_ts(s: Optional[str]) -> Optional[datetime]:
if not s or not isinstance(s, str):
return None
try:
return datetime.fromisoformat(s.replace("Z", "+00:00"))
except Exception:
return None
def normalize_autotext_entries(entries: Any) -> Dict[str, str]:
"""Nur String-Werte kompatibel mit globalem Listener."""
out: Dict[str, str] = {}
if not isinstance(entries, dict):
return out
for k, v in entries.items():
if not isinstance(k, str) or not k.strip():
continue
if isinstance(v, str):
out[k] = v
elif isinstance(v, dict) and "text" in v:
out[k] = str(v.get("text") or "")
return out
def _workspace_tag_backup_meta() -> Dict[str, Any]:
try:
t = resolve_workspace_identity_tag()
if t and t != DEMO_WORKSPACE_TAG:
return {"workspace_identity_tag_sha256_prefix": t[:16]}
except Exception:
pass
return {}
def cloud_push_workspace_row(row_id: int, data: Any) -> bool:
if not (_SUPABASE_URL and _SUPABASE_ANON_KEY):
return False
payload = json.dumps({"id": row_id, "data": data}).encode("utf-8")
req = urllib.request.Request(
f"{_SUPABASE_URL}/rest/v1/todo_sync?id=eq.{row_id}",
data=payload,
method="PATCH",
headers={
"apikey": _SUPABASE_ANON_KEY,
"Authorization": f"Bearer {_SUPABASE_ANON_KEY}",
"Content-Type": "application/json",
},
)
try:
urllib.request.urlopen(req, timeout=14)
return True
except Exception:
try:
req2 = urllib.request.Request(
f"{_SUPABASE_URL}/rest/v1/todo_sync",
data=payload,
method="POST",
headers={
"apikey": _SUPABASE_ANON_KEY,
"Authorization": f"Bearer {_SUPABASE_ANON_KEY}",
"Content-Type": "application/json",
"Prefer": "return=minimal",
},
)
urllib.request.urlopen(req2, timeout=14)
return True
except Exception:
return False
def cloud_pull_workspace_row(row_id: int) -> Optional[Any]:
if not (_SUPABASE_URL and _SUPABASE_ANON_KEY):
return None
req = urllib.request.Request(
f"{_SUPABASE_URL}/rest/v1/todo_sync?id=eq.{row_id}&select=data",
headers={
"apikey": _SUPABASE_ANON_KEY,
"Authorization": f"Bearer {_SUPABASE_ANON_KEY}",
},
)
try:
resp = urllib.request.urlopen(req, timeout=14)
rows = json.loads(resp.read().decode("utf-8"))
if rows and isinstance(rows, list) and rows:
return rows[0].get("data")
except (urllib.error.URLError, OSError, ValueError, json.JSONDecodeError):
pass
return None
def _prefer_nonempty_iso(local_iso: Optional[str], remote_iso: Optional[str]) -> str:
if local_iso and remote_iso:
ld = parse_iso_ts(local_iso)
rd = parse_iso_ts(remote_iso)
if ld and rd:
return remote_iso if rd >= ld else local_iso
if rd:
return remote_iso
if ld:
return local_iso
return local_iso or utc_now_iso()
return remote_iso or local_iso or utc_now_iso()
def merge_autotext_fragments(
local: Dict[str, Any],
remote: Optional[Dict[str, Any]],
) -> Tuple[Dict[str, str], Dict[str, str], bool]:
"""entries, entry_meta, enabled (global-Schalter wie in load_autotext)."""
loc_ent = normalize_autotext_entries(local.get("entries"))
loc_meta_raw = local.get("entry_meta") or {}
loc_meta = dict(loc_meta_raw) if isinstance(loc_meta_raw, dict) else {}
if not isinstance(remote, dict):
return dict(loc_ent), dict(loc_meta), bool(local.get("enabled", True))
rem_ent = normalize_autotext_entries(remote.get("entries"))
rem_meta_raw = remote.get("entry_meta") or {}
rem_meta = dict(rem_meta_raw) if isinstance(rem_meta_raw, dict) else {}
out_ent: Dict[str, str] = {}
out_meta: Dict[str, str] = {}
ls_snap = parse_iso_ts(remote.get("workspace_backup_ts"))
lw_snap = parse_iso_ts(local.get("workspace_backup_ts"))
remote_snapshot_newer = ls_snap is not None and (lw_snap is None or ls_snap > lw_snap)
keys = sorted(set(loc_ent.keys()) | set(rem_ent.keys()))
for abbr in keys:
lt = loc_ent.get(abbr)
rt = rem_ent.get(abbr)
l_iso = loc_meta.get(abbr)
r_iso = rem_meta.get(abbr)
l_dt = parse_iso_ts(l_iso)
r_dt = parse_iso_ts(r_iso)
if lt is None:
out_ent[abbr] = str(rt or "")
out_meta[abbr] = r_iso or utc_now_iso()
continue
if rt is None:
out_ent[abbr] = str(lt or "")
out_meta[abbr] = l_iso or utc_now_iso()
continue
s_l = str(lt or "")
s_r = str(rt or "")
if s_l == s_r:
out_ent[abbr] = s_l
out_meta[abbr] = _prefer_nonempty_iso(l_iso, r_iso)
continue
if remote_snapshot_newer and (r_iso or not l_iso):
if r_dt is not None and (l_dt is None or r_dt >= l_dt):
out_ent[abbr] = s_r
out_meta[abbr] = r_iso or utc_now_iso()
continue
if r_dt is not None and l_dt is not None:
if r_dt > l_dt:
out_ent[abbr] = s_r
out_meta[abbr] = r_iso or utc_now_iso()
else:
out_ent[abbr] = s_l
out_meta[abbr] = l_iso or utc_now_iso()
continue
if r_dt is not None and l_dt is None:
out_ent[abbr] = s_r
out_meta[abbr] = r_iso or utc_now_iso()
continue
out_ent[abbr] = s_l
out_meta[abbr] = l_iso or utc_now_iso()
merged_global_enabled = bool(local.get("enabled", True))
r_en = remote.get("enabled")
if remote_snapshot_newer and isinstance(r_en, bool):
merged_global_enabled = r_en
return out_ent, out_meta, merged_global_enabled
def merge_textbloecke_dict(
local: Dict[str, Any],
remote: Optional[Dict[str, Any]],
) -> Dict[str, Dict[str, str]]:
"""Je Slot: neueres updated_at gewinnt; sonst lokaler Inhalt (Offline-Primat)."""
loc = strip_internal_textbloecke_meta(dict(local or {}))
rem_blocks: Dict[str, Any] = {}
if isinstance(remote, dict):
inner = remote.get("blocks")
if isinstance(inner, dict):
rem_blocks = dict(inner)
keys = sorted(set(loc.keys()) | set(rem_blocks.keys()), key=lambda x: int(str(x)))
out: Dict[str, Dict[str, str]] = {}
for k in keys:
if not str(k).isdigit():
continue
a = dict(loc.get(k) or {})
b = dict(rem_blocks.get(k) or {})
if not a:
out[str(k)] = {
"name": str(b.get("name") or "").strip() or f"Textblock {k}",
"content": str(b.get("content") or ""),
"updated_at": str(b.get("updated_at") or utc_now_iso()),
}
continue
if not b:
out[str(k)] = {
"name": str(a.get("name") or "").strip() or f"Textblock {k}",
"content": str(a.get("content") or ""),
"updated_at": str(a.get("updated_at") or utc_now_iso()),
}
continue
na = str(a.get("name") or "").strip() or f"Textblock {k}"
nb = str(b.get("name") or "").strip() or f"Textblock {k}"
ca = str(a.get("content") or "")
cb = str(b.get("content") or "")
if na == nb and ca == cb:
out[str(k)] = {
"name": na,
"content": ca,
"updated_at": _prefer_nonempty_iso(
str(a.get("updated_at")), str(b.get("updated_at"))),
}
continue
ta = parse_iso_ts(a.get("updated_at"))
tb_dt = parse_iso_ts(b.get("updated_at"))
if tb_dt is not None and (ta is None or tb_dt > ta):
src = b
elif ta is not None and (tb_dt is None or ta >= tb_dt):
src = a
else:
src = a
out[str(k)] = {
"name": str(src.get("name") or "").strip() or f"Textblock {k}",
"content": str(src.get("content") or ""),
"updated_at": str(src.get("updated_at") or utc_now_iso()),
}
slots = sorted(out.keys(), key=int)
if len(slots) < 2:
return {
"1": {"name": "Textblock 1", "content": "", "updated_at": utc_now_iso()},
"2": {"name": "Textblock 2", "content": "", "updated_at": utc_now_iso()},
}
return out
def _later_workspace_ts(local_ts: Optional[str], remote_ts: Optional[str]) -> str:
lt, rt = parse_iso_ts(local_ts), parse_iso_ts(remote_ts)
if lt and rt:
return remote_ts if rt >= lt else (local_ts or remote_ts or utc_now_iso())
return remote_ts or local_ts or utc_now_iso()
def autotext_backup_payload(
enabled: bool,
entries: Dict[str, Any],
entry_meta: Dict[str, Any],
) -> Dict[str, Any]:
out = {
"v": 1,
"enabled": enabled,
"entries": normalize_autotext_entries(entries),
"entry_meta": dict(entry_meta or {}),
"workspace_backup_ts": utc_now_iso(),
}
meta = _workspace_tag_backup_meta()
if meta:
out.update(meta)
return out
def textbloecke_backup_payload(blocks: Dict[str, Any]) -> Dict[str, Any]:
out = {
"v": 1,
"blocks": dict(blocks or {}),
"workspace_backup_ts": utc_now_iso(),
}
meta = _workspace_tag_backup_meta()
if meta:
out.update(meta)
return out
def schedule_workspace_cloud_push(delay: Optional[float] = None) -> None:
"""Nach lokalem Speichern: debounced Autotext+Textblöcke ins Backup."""
global _push_timer_state
delay_sec = delay if delay is not None else _SYNC_PUSH_DELAY_SEC
def push_both() -> None:
try:
id_at, id_tb = _effective_workspace_backup_row_ids()
if id_at is None or id_tb is None:
return
from aza_persistence import load_autotext
at = load_autotext()
snap = autotext_backup_payload(
bool(at.get("enabled", True)),
at.get("entries") or {},
at.get("entry_meta") or {},
)
cloud_push_workspace_row(id_at, snap)
from aza_persistence import load_textbloecke
tb_clean = strip_internal_textbloecke_meta(load_textbloecke())
cloud_push_workspace_row(
id_tb,
textbloecke_backup_payload(tb_clean),
)
at["workspace_backup_ts"] = snap["workspace_backup_ts"]
from aza_persistence import save_autotext
save_autotext(at)
except Exception:
pass
def arm() -> None:
global _push_timer_state
with _push_timer_lock:
if _push_timer_state is not None:
try:
_push_timer_state.cancel()
except Exception:
pass
t = threading.Timer(
delay_sec,
lambda: threading.Thread(target=push_both, daemon=True).start(),
)
t.daemon = True
_push_timer_state = t
t.start()
try:
arm()
except Exception:
pass
def strip_internal_textbloecke_meta(d: Dict[str, Any]) -> Dict[str, Any]:
return {
sk: sv for sk, sv in dict(d).items()
if not str(sk).startswith("__")
}
def start_background_workspace_hybrid_sync(app) -> None:
def job():
time.sleep(1.8)
row_at, row_tb = _effective_workspace_backup_row_ids()
try:
from aza_persistence import (
load_autotext,
load_textbloecke,
save_autotext,
save_textbloecke,
)
local_at = load_autotext()
rat: Optional[Any] = None
rtl: Optional[Any] = None
if row_at is not None:
rat = cloud_pull_workspace_row(row_at)
if row_tb is not None:
rtl = cloud_pull_workspace_row(row_tb)
merged_e, merged_m, merged_en = merge_autotext_fragments(
local_at,
rat if isinstance(rat, dict) else None,
)
merged_at_data = dict(local_at)
merged_at_data["entries"] = merged_e
merged_at_data["entry_meta"] = merged_m
merged_at_data["enabled"] = merged_en
rts = rat.get("workspace_backup_ts") if isinstance(rat, dict) else None
merged_at_data["workspace_backup_ts"] = _later_workspace_ts(
local_at.get("workspace_backup_ts"), rts if isinstance(rts, str) else None,
)
ltbl = load_textbloecke()
remote_wrap = rtl if isinstance(rtl, dict) else None
merged_tb = merge_textbloecke_dict(ltbl, remote_wrap)
def apply_ui():
try:
tgt = getattr(app, "_autotext_data", None)
merged = merged_at_data
if isinstance(tgt, dict):
en_m = normalize_autotext_entries(merged.get("entries"))
tgt["enabled"] = bool(merged.get("enabled", True))
te = tgt.get("entries")
if isinstance(te, dict):
te.clear()
te.update(en_m)
else:
tgt["entries"] = dict(en_m)
em_tgt = tgt.get("entry_meta")
em_src = merged.get("entry_meta") or {}
if isinstance(em_tgt, dict):
em_tgt.clear()
if isinstance(em_src, dict):
em_tgt.update(em_src)
else:
tgt["entry_meta"] = dict(em_src) if isinstance(em_src, dict) else {}
wts = merged.get("workspace_backup_ts")
if isinstance(wts, str):
tgt["workspace_backup_ts"] = wts
save_autotext(tgt)
else:
setattr(app, "_autotext_data", merged)
save_autotext(merged)
save_textbloecke(strip_internal_textbloecke_meta(merged_tb))
shell = getattr(app, "_aza_office_v1", None)
if shell is not None and hasattr(
shell, "refresh_sidebar_textbloecke_section"):
shell.refresh_sidebar_textbloecke_section()
except Exception:
pass
try:
app.after(0, apply_ui)
except Exception:
apply_ui()
time.sleep(0.5)
snap_a = autotext_backup_payload(
bool(merged_at_data.get("enabled", True)),
merged_at_data.get("entries") or {},
merged_at_data.get("entry_meta") or {},
)
if row_at is not None and row_tb is not None:
cloud_push_workspace_row(row_at, snap_a)
cloud_push_workspace_row(
row_tb,
textbloecke_backup_payload(
strip_internal_textbloecke_meta(merged_tb)),
)
except Exception:
pass
try:
threading.Thread(target=job, daemon=True).start()
except Exception:
pass
def touch_autotext_entry_meta(data: dict, abbrev: str) -> None:
if not isinstance(data, dict) or not abbrev:
return
em = data.setdefault("entry_meta", {})
if isinstance(em, dict):
em[abbrev.strip()] = utc_now_iso()
def prune_autotext_meta(data: dict, valid_abbrevs: set) -> None:
em = data.get("entry_meta")
if not isinstance(em, dict):
data["entry_meta"] = {}
return
for k in list(em.keys()):
if k not in valid_abbrevs:
try:
del em[k]
except Exception:
pass
__all__ = [
"WORKSPACE_AUTOTEXT_SYNC_ID",
"WORKSPACE_TEXTBLOECKE_SYNC_ID",
"utc_now_iso",
"normalize_autotext_entries",
"schedule_workspace_cloud_push",
"start_background_workspace_hybrid_sync",
"touch_autotext_entry_meta",
"prune_autotext_meta",
"autotext_backup_payload",
"textbloecke_backup_payload",
"merge_autotext_fragments",
"merge_textbloecke_dict",
]