Files
aza/AzA march 2026/aza_wc_period_sync.py

308 lines
10 KiB
Python
Raw Permalink Normal View History

2026-05-20 00:09:28 +02:00
# -*- coding: utf-8 -*-
"""
WooCommerce Subscriptions -> licenses.current_period_* (read-only REST, no payments).
Configuration via environment variables (at minimum the site URL and the API keys are required):
AZA_WOOCOMMERCE_URL Base URL of the shop (https://, no trailing slash)
AZA_WOOCOMMERCE_CONSUMER_KEY
AZA_WOOCOMMERCE_CONSUMER_SECRET
Alternative names (fallback, same meaning):
WOOCOMMERCE_URL, WOOCOMMERCE_CONSUMER_KEY, WOOCOMMERCE_CONSUMER_SECRET
Never log secrets. Only subscription GETs: /wp-json/wc/v3/subscriptions/{id}
"""
from __future__ import annotations
import calendar
import os
import re
import sqlite3
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import quote
# Optional: nur wenn requests verfügbar (Backend-Dependencies)
try:
import requests
from requests.auth import HTTPBasicAuth
except ImportError:
requests = None # type: ignore
HTTPBasicAuth = None # type: ignore
# Matches exactly "wc_sub_<digits>" (case-insensitive), optionally followed by whitespace.
_WC_SUB_PREFIX_RE = re.compile(r"^wc_sub_(\d+)\s*$", re.IGNORECASE)


def extract_wc_subscription_numeric_id(subscription_id: str) -> Optional[int]:
    """Return the numeric part of a ``wc_sub_<digits>`` identifier.

    The input is stripped of surrounding whitespace first; a falsy input is
    treated as the empty string. Anything that does not consist of the
    ``wc_sub_`` prefix followed only by digits yields ``None`` -- no guessing.
    """
    candidate = (subscription_id or "").strip()
    matched = _WC_SUB_PREFIX_RE.match(candidate)
    if matched is None:
        return None
    digits = matched.group(1)
    try:
        return int(digits)
    except ValueError:
        # int() rejected the digit run; report "no id" instead of raising.
        return None
def _woo_rest_credentials() -> Optional[Tuple[str, str, str]]:
    """Read the WooCommerce REST credentials from the environment.

    Each setting is looked up under its ``AZA_``-prefixed name first; if that
    is unset or blank after stripping, the unprefixed alias is used instead.
    Trailing slashes are removed from the base URL.

    Returns:
        ``(base_url, consumer_key, consumer_secret)``, or ``None`` when any of
        the three values ends up empty.
    """
    lookups = (
        ("AZA_WOOCOMMERCE_URL", "WOOCOMMERCE_URL"),
        ("AZA_WOOCOMMERCE_CONSUMER_KEY", "WOOCOMMERCE_CONSUMER_KEY"),
        ("AZA_WOOCOMMERCE_CONSUMER_SECRET", "WOOCOMMERCE_CONSUMER_SECRET"),
    )
    resolved = []
    for primary, alias in lookups:
        value = os.environ.get(primary, "").strip()
        if not value:
            value = os.environ.get(alias, "").strip()
        resolved.append(value)

    base, key, secret = resolved
    base = base.rstrip("/")
    if base and key and secret:
        return base, key, secret
    return None
def _stripe_db_path() -> Path:
    """Locate the SQLite database that holds the ``licenses`` table.

    Uses ``stripe_routes.DB_PATH`` when that module can be imported and the
    value converts to a ``Path``; on any failure falls back to
    ``data/stripe_webhook.sqlite`` next to this file.
    """
    try:
        from stripe_routes import DB_PATH as configured  # type: ignore

        located = Path(configured)
    except Exception:
        module_dir = Path(__file__).resolve().parent
        located = module_dir / "data" / "stripe_webhook.sqlite"
    return located
def _parse_wc_datetime(value: Any) -> Optional[int]:
    """Convert an ISO-8601-like date/time value to Unix seconds (UTC).

    ``None`` and blank input give ``None``. A trailing ``Z`` is rewritten to
    ``+00:00`` before parsing. Values without a UTC offset are interpreted as
    UTC; values with an offset are converted to UTC. Anything that cannot be
    parsed gives ``None`` rather than raising.
    """
    text = "" if value is None else str(value).strip()
    if not text:
        return None
    if text.endswith("Z"):
        text = f"{text[:-1]}+00:00"
    try:
        parsed = datetime.fromisoformat(text)
        aware = (
            parsed.replace(tzinfo=timezone.utc)
            if parsed.tzinfo is None
            else parsed.astimezone(timezone.utc)
        )
        return int(aware.timestamp())
    except Exception:
        return None
def _meta_find(meta: Any, keys: Tuple[str, ...]) -> Optional[str]:
    """Return the first non-blank value stored under one of ``keys``.

    ``meta`` is expected to be a list of ``{"key": ..., "value": ...}`` dicts;
    any other type gives ``None`` and non-dict list entries are ignored. Key
    comparison is case-insensitive and ignores whitespace around the entry's
    key. Entries whose value is ``None`` or blank are skipped, so a later
    entry with a usable value can still match.
    """
    if not isinstance(meta, list):
        return None
    wanted = frozenset(name.lower() for name in keys)
    for entry in meta:
        if not isinstance(entry, dict):
            continue
        entry_key = str(entry.get("key") or "").strip().lower()
        if entry_key not in wanted:
            continue
        raw = entry.get("value")
        if raw is None:
            continue
        text = str(raw).strip()
        if text:
            return text
    return None
def _subtract_billing_period(end_ts: int, billing_period: str, interval: int) -> int:
    """Derive period_start by stepping one billing interval back from period_end (UTC).

    Args:
        end_ts: Period end as Unix seconds.
        billing_period: ``"month"``, ``"year"``, ``"week"`` or ``"day"``
            (case-insensitive, surrounding whitespace ignored). A falsy value
            means ``"month"``; any other value is treated as 30 days.
        interval: Number of billing periods to step back; falsy or values
            below 1 are treated as 1.

    Returns:
        Period start as Unix seconds.

    For ``month`` and ``year`` the day of month is clamped to the last day of
    the target month, so e.g. 31 March minus one month gives the last day of
    February, and 29 February minus one year gives 28 February instead of
    raising ``ValueError``.
    """
    period = (billing_period or "month").strip().lower()
    count = max(1, int(interval) if interval else 1)
    end = datetime.fromtimestamp(end_ts, tz=timezone.utc)

    if period in ("month", "year"):
        # Treat a year as twelve months so both units share the same
        # day-clamping calendar arithmetic.
        months_back = count * 12 if period == "year" else count
        month_index = end.year * 12 + (end.month - 1) - months_back
        year, month_zero_based = divmod(month_index, 12)
        month = month_zero_based + 1
        day = min(end.day, calendar.monthrange(year, month)[1])
        start = end.replace(year=year, month=month, day=day)
    elif period == "week":
        start = end - timedelta(weeks=count)
    elif period == "day":
        start = end - timedelta(days=count)
    else:
        # Unknown unit: approximate one period as 30 days.
        start = end - timedelta(days=30 * count)
    return int(start.timestamp())
def compute_period_unix_from_wc_subscription(obj: Dict[str, Any]) -> Tuple[Optional[int], Optional[int]]:
    """
    Return ``(period_start, period_end)`` in Unix seconds (UTC) for a subscription object.

    period_end is the next payment date, taken from the first source that
    parses: ``next_payment_date_gmt``, then ``next_payment_date``, then the
    ``_schedule_next_payment`` / ``schedule_next_payment`` entry in
    ``meta_data``. The ``*_gmt`` field is preferred because values without a
    UTC offset are interpreted as UTC when parsed.

    period_start is period_end minus ``billing_interval`` x ``billing_period``
    (defaults: 1 x month). If the subscription's start date
    (``start_date_gmt``, ``start_date`` or ``date_created``) falls strictly
    inside that window, it is used as period_start instead.

    Returns ``(None, None)`` when no next payment date can be derived.
    """
    period_end: Optional[int] = None
    for field in ("next_payment_date_gmt", "next_payment_date"):
        period_end = _parse_wc_datetime(obj.get(field))
        if period_end is not None:
            break
    if period_end is None:
        # Fall back to the schedule entry in the metadata list.
        scheduled = _meta_find(
            obj.get("meta_data"),
            (
                "_schedule_next_payment",
                "schedule_next_payment",
            ),
        )
        period_end = _parse_wc_datetime(scheduled)
    if period_end is None:
        return None, None

    billing_period = str(obj.get("billing_period") or "month")
    raw_interval = obj.get("billing_interval")
    try:
        billing_interval = int(raw_interval) if raw_interval is not None else 1
    except (TypeError, ValueError):
        billing_interval = 1
    period_start = _subtract_billing_period(period_end, billing_period, billing_interval)

    # A subscription that began inside the computed window cannot have a
    # period starting before it existed.
    started = _parse_wc_datetime(
        obj.get("start_date_gmt") or obj.get("start_date") or obj.get("date_created")
    )
    if started is not None and period_start < started < period_end:
        period_start = started

    # Safety net: never return an empty or inverted period.
    if period_start >= period_end:
        period_start = period_end - 86400
    return period_start, period_end
def _fetch_wc_subscription(wc_sub_id: int) -> Dict[str, Any]:
    """Fetch one subscription object from the WooCommerce REST API.

    Issues ``GET /wp-json/wc/v3/subscriptions/{id}`` with HTTP basic auth. On
    a 404 it retries through the collection endpoint with ``search=<id>`` and
    accepts the result only if exactly one item comes back and its ``id``
    equals ``wc_sub_id``.

    Raises:
        RuntimeError: ``requests_not_available``, ``woo_credentials_missing``,
            ``subscription_not_found_http_404`` or ``invalid_json_shape``.
        Whatever ``requests`` raises for transport errors or for non-404
        error statuses (via ``raise_for_status``).
    """
    if requests is None or HTTPBasicAuth is None:
        raise RuntimeError("requests_not_available")
    credentials = _woo_rest_credentials()
    if not credentials:
        raise RuntimeError("woo_credentials_missing")
    base, key, secret = credentials
    auth = HTTPBasicAuth(key, secret)
    timeouts = (5, 30)

    direct_url = f"{base}/wp-json/wc/v3/subscriptions/{int(wc_sub_id)}"
    response = requests.get(direct_url, auth=auth, timeout=timeouts)

    if response.status_code != 404:
        response.raise_for_status()
        payload = response.json()
        if not isinstance(payload, dict):
            raise RuntimeError("invalid_json_shape")
        return payload

    # Direct lookup returned 404: try the search endpoint before giving up.
    search_url = f"{base}/wp-json/wc/v3/subscriptions?search={quote(str(wc_sub_id))}&per_page=10"
    search_response = requests.get(search_url, auth=auth, timeout=timeouts)
    if search_response.ok:
        candidates = search_response.json()
        single_dict_hit = (
            isinstance(candidates, list)
            and len(candidates) == 1
            and isinstance(candidates[0], dict)
        )
        if single_dict_hit and int(candidates[0].get("id") or 0) == wc_sub_id:
            return candidates[0]
    raise RuntimeError("subscription_not_found_http_404")
def sync_active_license_periods_from_woocommerce_only() -> Dict[str, Any]:
    """
    Refresh the billing-period columns of active WooCommerce-backed licenses.

    Only rows whose ``subscription_id`` looks like ``wc_sub_%`` and whose
    status is ``active`` are considered. For each one the WooCommerce
    subscription is read and ``current_period_start`` / ``current_period_end``
    / ``updated_at`` are written. No other column is touched, and nothing is
    written anywhere except this SQLite UPDATE.

    Rows are skipped (and listed in ``errors``) when the id is not
    ``wc_sub_<digits>``, when WooCommerce reports the subscription as
    cancelled/expired/trashed, or when no valid period can be derived. Any
    exception while handling a row is recorded by exception type name only and
    does not stop the run.

    Each successful UPDATE is committed immediately so the write transaction
    is not held open across the HTTP requests for the remaining rows, and the
    connection is always closed on exit.

    Returns:
        On a precondition failure: ``{"ok": False, "error_code": ..., "message": ...}``.
        Otherwise a summary with ``ok``, ``total_active_wc_sub_rows``,
        ``updated``, ``skipped``, ``failed_count`` (number of ``errors``
        entries, skipped rows included), ``errors`` (first 25) and
        ``estimate_engine``.
    """
    if not _woo_rest_credentials():
        return {
            "ok": False,
            "error_code": "WOO_CREDENTIALS_MISSING",
            "message": "Set AZA_WOOCOMMERCE_URL + AZA_WOOCOMMERCE_CONSUMER_KEY + AZA_WOOCOMMERCE_CONSUMER_SECRET (or WOOCOMMERCE_* aliases).",
        }
    if requests is None:
        return {
            "ok": False,
            "error_code": "REQUESTS_MISSING",
            "message": "Python package requests required for Woo REST.",
        }
    db_path = _stripe_db_path()
    if not db_path.exists():
        return {"ok": False, "error_code": "DB_MISSING", "message": "stripe_webhook.sqlite not found"}

    now = int(time.time())
    total = 0
    updated = 0
    skipped = 0
    errors: List[Dict[str, str]] = []

    def record(sid: str, detail: str) -> None:
        # Only a 16-character prefix of the id is reported.
        errors.append({"subscription_id_prefix": sid[:16], "detail": detail})

    # sqlite3's connection context manager only commits or rolls back; it does
    # not close the connection, so close it explicitly.
    con = sqlite3.connect(str(db_path))
    try:
        rows = con.execute(
            """
            SELECT subscription_id FROM licenses
            WHERE lower(trim(status))='active'
            AND subscription_id IS NOT NULL
            AND subscription_id LIKE 'wc_sub_%'
            """
        ).fetchall()
        for (sid_raw,) in rows:
            sid = str(sid_raw or "").strip()
            wc_num = extract_wc_subscription_numeric_id(sid)
            if wc_num is None:
                skipped += 1
                record(sid, "not_wc_sub_numeric")
                continue
            total += 1
            try:
                sub = _fetch_wc_subscription(wc_num)
                st = str(sub.get("status") or "").lower().replace("_", "-")
                if st in ("cancelled", "canceled", "expired", "trash"):
                    skipped += 1
                    record(sid, f"woo_status_{st}")
                    continue
                ps, pe = compute_period_unix_from_wc_subscription(sub)
                if ps is None or pe is None:
                    skipped += 1
                    record(sid, "no_next_payment_derivable")
                    continue
                if ps >= pe:
                    skipped += 1
                    record(sid, "invalid_period_order")
                    continue
                cur = con.execute(
                    """
                    UPDATE licenses
                    SET current_period_start=?,
                    current_period_end=?,
                    updated_at=?
                    WHERE subscription_id=?
                    AND lower(trim(status))='active'
                    """,
                    # Match on the stored value: the stripped form would miss
                    # rows whose id carries trailing whitespace.
                    (ps, pe, now, sid_raw),
                )
                # Commit per row so the write lock is released before the
                # next (possibly slow) HTTP request.
                con.commit()
                if cur.rowcount:
                    updated += 1
            except Exception as exc:
                # Drop any uncommitted write so a failed row cannot ride along
                # with a later row's commit.
                if con.in_transaction:
                    con.rollback()
                record(sid, type(exc).__name__)
    finally:
        con.close()

    return {
        "ok": True,
        "total_active_wc_sub_rows": total,
        "updated": updated,
        "skipped": skipped,
        "failed_count": len(errors),
        "errors": errors[:25],
        "estimate_engine": "woocommerce_subscription_rest_v1",
    }