Files
aza/AzA march 2026 - Kopie (27)/hetzner_diag_woo_403_phase1e.py
2026-05-20 00:09:28 +02:00

215 lines
7.7 KiB
Python

#!/usr/bin/env python3
"""
Phase 1e: read-only Woo REST 403 isolation. Prints only status codes and safe metadata.
No secrets, no emails, no full response bodies.
"""
from __future__ import annotations
import json
import os
import re
import sqlite3
import sys
from typing import Any, Dict, Optional, Tuple
import requests
from requests.auth import HTTPBasicAuth
# E-mail shape: local part, "@", dotted host, alphabetic TLD of two or more letters.
# Compiled once because every summarized response may be scanned with it.
_EMAIL_RE = re.compile(r"\b[\w.+-]+@[\w.-]+\.[a-zA-Z]{2,}\b")


def mask_pii(s: str) -> str:
    """Return *s* with every e-mail address replaced by the literal ``[email]``.

    Only e-mail addresses are masked; any other text is returned unchanged.
    """
    return _EMAIL_RE.sub("[email]", s)
def summarize_wp_error(r: requests.Response) -> Dict[str, Any]:
    """Reduce a response to a small summary dict that is safe to print.

    The summary always contains ``http`` (status code), ``content_type``
    (first 60 characters of the header) and ``json`` (whether the body parsed
    as JSON).

    * Non-JSON body: ``body_prefix`` holds the first 80 characters of the
      body with e-mail addresses masked and newlines replaced by spaces.
    * JSON object: ``wp_error_code``, ``message_paraphrase`` (masked, at most
      200 characters) and ``data_status`` are added when the corresponding
      ``code`` / ``message`` / ``data.status`` fields are present.
    * JSON array: ``json_top`` is ``"array"`` and ``json_len`` its length.
    * Any other JSON value: ``json_top`` is the Python type name.
    """
    out: Dict[str, Any] = {
        "http": r.status_code,
        "content_type": (r.headers.get("content-type") or "")[:60],
    }
    try:
        payload = r.json()
    except ValueError:
        # JSON decode errors raised by Response.json() derive from ValueError.
        out["json"] = False
        # Mask BEFORE truncating: cutting first could leave half an address
        # that no longer matches the e-mail pattern and would be printed raw.
        out["body_prefix"] = mask_pii(r.text or "")[:80].replace("\n", " ")
        return out

    out["json"] = True
    if isinstance(payload, dict):
        code = payload.get("code")
        if code is not None:
            out["wp_error_code"] = code
        msg = payload.get("message")
        if isinstance(msg, str) and msg:
            out["message_paraphrase"] = mask_pii(msg)[:200]
        data = payload.get("data")
        if isinstance(data, dict) and "status" in data:
            out["data_status"] = data.get("status")
    elif isinstance(payload, list):
        # Report only the shape of a list; never its items.
        out["json_top"] = "array"
        out["json_len"] = len(payload)
    else:
        out["json_top"] = type(payload).__name__
    return out
def get_auth() -> Tuple[str, HTTPBasicAuth]:
    """Build the base URL and basic-auth object from the environment.

    Reads ``AZA_WOOCOMMERCE_URL``, ``AZA_WOOCOMMERCE_CONSUMER_KEY`` and
    ``AZA_WOOCOMMERCE_CONSUMER_SECRET``. Surrounding whitespace is stripped
    from all three, so a whitespace-only value counts as missing and a stray
    trailing newline is not sent as part of the credential.

    Returns:
        ``(base, auth)`` where ``base`` has no trailing slash.

    Raises:
        SystemExit: with the message ``"missing env"`` when any of the three
            values is absent or blank.
    """

    def _env(name: str) -> str:
        return (os.environ.get(name) or "").strip()

    base = _env("AZA_WOOCOMMERCE_URL").rstrip("/")
    key = _env("AZA_WOOCOMMERCE_CONSUMER_KEY")
    sec = _env("AZA_WOOCOMMERCE_CONSUMER_SECRET")
    if not (base and key and sec):
        raise SystemExit("missing env")
    return base, HTTPBasicAuth(key, sec)
def fetch(method: str, url: str, auth: HTTPBasicAuth, **kw: Any) -> requests.Response:
    """Send one HTTP request with basic auth and a bounded timeout.

    Args:
        method: HTTP verb, e.g. ``"GET"`` or ``"OPTIONS"``.
        url: Absolute URL to request.
        auth: Basic-auth credentials attached to the request.
        **kw: Extra keyword arguments forwarded to ``requests.request``.

    Returns:
        The response object; the HTTP status is not checked here.
    """
    # Apply the (connect, read) default via setdefault so a caller may pass its
    # own timeout= without triggering a duplicate-keyword TypeError.
    kw.setdefault("timeout", (5, 30))
    return requests.request(method, url, auth=auth, **kw)
def _wc_numeric_id(kind: str, value: object) -> Optional[int]:
    """Return the digits of a ``wc_<kind>_<digits>`` reference, or None."""
    if not value:
        return None
    found = re.search(rf"wc_{kind}_(\d+)", str(value), re.I)
    return int(found.group(1)) if found else None


def pick_license_rows(db_path: str) -> Tuple[Optional[int], Optional[int]]:
    """Pick one sample order id and one sample subscription id from the DB.

    The database is opened read-only. The first active license whose
    ``subscription_id`` looks like ``wc_sub_<n>`` supplies the subscription
    number, and the order number too if its ``customer_id`` looks like
    ``wc_order_<n>``. If that yields no order number, any license with a
    ``wc_order_<n>`` customer id is used instead.

    Args:
        db_path: Filesystem path of the SQLite database.

    Returns:
        ``(order_numeric_id_or_none, sub_numeric_id_or_none)``.

    Raises:
        sqlite3.Error: if the database cannot be opened or queried.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote

    # The path is embedded in a URI, so characters such as "#", "?", "%" or
    # spaces must be percent-encoded or SQLite would misparse the filename.
    con = sqlite3.connect(f"file:{quote(db_path)}?mode=ro", uri=True)
    try:
        row = con.execute(
            """
            SELECT customer_id, subscription_id FROM licenses
            WHERE lower(trim(status))='active'
            AND subscription_id LIKE 'wc_sub_%'
            LIMIT 1
            """
        ).fetchone()
        sub_num: Optional[int] = None
        ord_num: Optional[int] = None
        if row:
            ord_num = _wc_numeric_id("order", row[0])
            sub_num = _wc_numeric_id("sub", row[1])
        if ord_num is None:
            # Fall back to any license row that carries an order reference.
            fallback = con.execute(
                """
                SELECT customer_id FROM licenses
                WHERE customer_id LIKE 'wc_order_%'
                LIMIT 1
                """
            ).fetchone()
            if fallback:
                ord_num = _wc_numeric_id("order", fallback[0])
        return ord_num, sub_num
    finally:
        con.close()
def _probe(label: str, method: str, url: str, auth: HTTPBasicAuth) -> requests.Response:
    """Send one request, print ``<label>_http`` and the safe summary, return the response."""
    resp = fetch(method, url, auth)
    print(label + "_http", resp.status_code)
    print(label, summarize_wp_error(resp))
    return resp


def _classification_hint(resp: requests.Response) -> str:
    """Map a subscription response to a coarse failure-category hint string."""
    try:
        payload = resp.json()
    except ValueError:
        return "parse_failed"
    if not (isinstance(payload, dict) and "code" in payload):
        return "no_wp_error_code_in_response"
    code = str(payload.get("code"))
    lowered = code.lower()
    if "authentication" in lowered:
        return "likely_category: authentication / invalid key"
    if "cannot_view" in lowered or code == "rest_forbidden":
        return "likely_category: authenticated_but_forbidden_cap"
    if resp.status_code == 404:
        return "likely_category: endpoint_or_resource_missing"
    return "likely_category: see wp_error_code above"


def main() -> None:
    """Run the read-only probes and print status codes plus safe metadata.

    Sections, in order: A) presence of the three environment variables,
    B1-B5) REST root, wc/v3 index, one order, one subscription and the
    subscription list, C) further list routes, an OPTIONS probe, and
    D) a classification hint derived from the single-subscription response.

    Raises:
        SystemExit: from ``get_auth`` when configuration is missing.
        requests.RequestException: propagated from any failed request.
    """
    # Report presence BEFORE get_auth(): get_auth exits on missing values, so
    # printing afterwards could never show which variable was absent.
    print("=== A) ENV (presence only) ===")
    for tag, var in (
        ("ENV_URL", "AZA_WOOCOMMERCE_URL"),
        ("ENV_KEY", "AZA_WOOCOMMERCE_CONSUMER_KEY"),
        ("ENV_SECRET", "AZA_WOOCOMMERCE_CONSUMER_SECRET"),
    ):
        print(tag, "yes" if os.environ.get(var, "").strip() else "no")
    base, auth = get_auth()

    print("\n=== B1) GET /wp-json ===")
    _probe("wp_json_root", "GET", base + "/wp-json", auth)

    print("\n=== B2) GET /wp-json/wc/v3 ===")
    resp = _probe("wc_v3_index", "GET", base + "/wp-json/wc/v3", auth)
    if resp.ok:
        try:
            payload = resp.json()
        except ValueError:
            print("wc_v3_index_json_parse", False)
        else:
            print("wc_v3_index_has_namespace", isinstance(payload, dict) and "namespace" in payload)

    db_path = os.environ.get("STRIPE_DB_PATH") or "/app/data/stripe_webhook.sqlite"
    print("\n=== license-derived ids (numeric only) ===")
    try:
        ord_id, sub_id = pick_license_rows(db_path)
    except sqlite3.Error as exc:
        # A missing or unreadable DB must not abort the remaining probes;
        # only the id-dependent sections are skipped.
        ord_id = sub_id = None
        print("license_db_error", type(exc).__name__, str(exc)[:120])
    print("sample_order_numeric_from_db", ord_id)
    print("sample_sub_numeric_from_db", sub_id)

    if ord_id is not None:
        print("\n=== B3) GET /wp-json/wc/v3/orders/{id} ===")
        resp = _probe("orders_single", "GET", f"{base}/wp-json/wc/v3/orders/{ord_id}", auth)
        if resp.ok:
            try:
                payload = resp.json()
            except ValueError:
                print("orders_json_parse", False)
            else:
                returned_id = payload.get("id") if isinstance(payload, dict) else None
                print("orders_json_id_matches", returned_id == ord_id)
    else:
        print("\n=== B3) orders test skipped (no wc_order_ id in DB) ===")

    # Kept for section D so the hint describes the response printed in B4.
    sub_resp: Optional[requests.Response] = None
    if sub_id is not None:
        print("\n=== B4) GET /wp-json/wc/v3/subscriptions/{id} ===")
        sub_resp = _probe(
            "subscriptions_single", "GET", f"{base}/wp-json/wc/v3/subscriptions/{sub_id}", auth
        )
    else:
        print("\n=== B4) subscriptions single skipped ===")

    print("\n=== B5) GET /wp-json/wc/v3/subscriptions?per_page=1 ===")
    _probe("subscriptions_list", "GET", base + "/wp-json/wc/v3/subscriptions?per_page=1", auth)

    print("\n=== C) More routes (GET only) ===")
    for path, label in (
        ("/wp-json/wc/v3/orders?per_page=1", "orders_list"),
        ("/wp-json/wps/v1/subscriptions?per_page=1", "wps_v1_subscriptions_list"),
    ):
        _probe(label, "GET", base + path, auth)

    print("\n=== OPTIONS /wp-json/wc/v3/subscriptions ===")
    resp = fetch("OPTIONS", base + "/wp-json/wc/v3/subscriptions", auth)
    print("subscriptions_options_http", resp.status_code)
    # Response headers are case-insensitive, so one lookup covers any casing.
    print("subscriptions_options_allow_header", (resp.headers.get("Allow") or "")[:120])

    print("\n=== D) Classification hint ===")
    if sub_resp is None:
        print("hint", "skipped (no subscription id)")
    else:
        print("hint", _classification_hint(sub_resp))
if __name__ == "__main__":
    try:
        main()
    except requests.RequestException as exc:
        # Network-level failure (DNS, TLS, timeout, ...): print only the
        # exception type and a short prefix of its message, then exit non-zero.
        print("REQUEST_ERROR", type(exc).__name__, str(exc)[:120])
        sys.exit(1)