#!/usr/bin/env python3 """ Phase 1e: read-only Woo REST 403 isolation. Prints only status codes and safe metadata. No secrets, no emails, no full response bodies. """ from __future__ import annotations import json import os import re import sqlite3 import sys from typing import Any, Dict, Optional, Tuple import requests from requests.auth import HTTPBasicAuth def mask_pii(s: str) -> str: s = re.sub(r"\b[\w.+-]+@[\w.-]+\.[a-zA-Z]{2,}\b", "[email]", s) return s def summarize_wp_error(r: requests.Response) -> Dict[str, Any]: out: Dict[str, Any] = { "http": r.status_code, "content_type": (r.headers.get("content-type") or "")[:60], } try: j = r.json() except Exception: out["json"] = False out["body_prefix"] = (r.text or "")[:80].replace("\n", " ") return out out["json"] = True if isinstance(j, dict): code = j.get("code") if code is not None: out["wp_error_code"] = code msg = j.get("message") if isinstance(msg, str) and msg: out["message_paraphrase"] = mask_pii(msg)[:200] data = j.get("data") if isinstance(data, dict) and "status" in data: out["data_status"] = data.get("status") elif isinstance(j, list): out["json_top"] = "array" out["json_len"] = len(j) else: out["json_top"] = type(j).__name__ return out def get_auth() -> Tuple[str, HTTPBasicAuth]: base = (os.environ.get("AZA_WOOCOMMERCE_URL") or "").strip().rstrip("/") key = os.environ.get("AZA_WOOCOMMERCE_CONSUMER_KEY") or "" sec = os.environ.get("AZA_WOOCOMMERCE_CONSUMER_SECRET") or "" if not (base and key and sec): raise SystemExit("missing env") return base, HTTPBasicAuth(key, sec) def fetch(method: str, url: str, auth: HTTPBasicAuth, **kw: Any) -> requests.Response: return requests.request(method, url, auth=auth, timeout=(5, 30), **kw) def pick_license_rows(db_path: str) -> Tuple[Optional[int], Optional[int]]: """Returns (order_numeric_id_or_none, sub_numeric_id_or_none) from wc_ prefixed cols.""" con = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) try: row = con.execute( """ SELECT customer_id, subscription_id FROM licenses WHERE lower(trim(status))='active' AND subscription_id LIKE 'wc_sub_%' LIMIT 1 """ ).fetchone() sub_num: Optional[int] = None ord_num: Optional[int] = None if row: cid, sid = row[0], row[1] if sid: m = re.search(r"wc_sub_(\d+)", str(sid), re.I) if m: sub_num = int(m.group(1)) if cid: m = re.search(r"wc_order_(\d+)", str(cid), re.I) if m: ord_num = int(m.group(1)) if ord_num is None: r2 = con.execute( """ SELECT customer_id FROM licenses WHERE customer_id LIKE 'wc_order_%' LIMIT 1 """ ).fetchone() if r2 and r2[0]: m = re.search(r"wc_order_(\d+)", str(r2[0]), re.I) if m: ord_num = int(m.group(1)) return ord_num, sub_num finally: con.close() def main() -> None: base, auth = get_auth() print("=== A) ENV (presence only) ===") print("ENV_URL", "yes" if os.environ.get("AZA_WOOCOMMERCE_URL", "").strip() else "no") print("ENV_KEY", "yes" if os.environ.get("AZA_WOOCOMMERCE_CONSUMER_KEY", "").strip() else "no") print("ENV_SECRET", "yes" if os.environ.get("AZA_WOOCOMMERCE_CONSUMER_SECRET", "").strip() else "no") print("\n=== B1) GET /wp-json ===") r = fetch("GET", base + "/wp-json", auth) print("wp_json_root_http", r.status_code) print("wp_json_root", summarize_wp_error(r)) print("\n=== B2) GET /wp-json/wc/v3 ===") r = fetch("GET", base + "/wp-json/wc/v3", auth) print("wc_v3_index_http", r.status_code) s = summarize_wp_error(r) print("wc_v3_index", s) if r.ok: try: j = r.json() print("wc_v3_index_has_namespace", isinstance(j, dict) and "namespace" in j) except Exception: pass db_path = os.environ.get("STRIPE_DB_PATH") or "/app/data/stripe_webhook.sqlite" ord_id, sub_id = pick_license_rows(db_path) print("\n=== license-derived ids (numeric only) ===") print("sample_order_numeric_from_db", ord_id) print("sample_sub_numeric_from_db", sub_id) if ord_id is not None: print("\n=== B3) GET /wp-json/wc/v3/orders/{id} ===") r = fetch("GET", f"{base}/wp-json/wc/v3/orders/{ord_id}", auth) print("orders_single_http", r.status_code) summ = summarize_wp_error(r) print("orders_single", summ) if r.ok: try: j = r.json() jid = j.get("id") if isinstance(j, dict) else None print("orders_json_id_matches", jid == ord_id) except Exception: print("orders_json_parse", False) else: print("\n=== B3) orders test skipped (no wc_order_ id in DB) ===") if sub_id is not None: print("\n=== B4) GET /wp-json/wc/v3/subscriptions/{id} ===") r = fetch("GET", f"{base}/wp-json/wc/v3/subscriptions/{sub_id}", auth) print("subscriptions_single_http", r.status_code) print("subscriptions_single", summarize_wp_error(r)) else: print("\n=== B4) subscriptions single skipped ===") print("\n=== B5) GET /wp-json/wc/v3/subscriptions?per_page=1 ===") r = fetch("GET", base + "/wp-json/wc/v3/subscriptions?per_page=1", auth) print("subscriptions_list_http", r.status_code) summ = summarize_wp_error(r) print("subscriptions_list", summ) print("\n=== C) More routes (GET only) ===") for path, label in [ ("/wp-json/wc/v3/orders?per_page=1", "orders_list"), ]: r = fetch("GET", base + path, auth) print(label + "_http", r.status_code) print(label, summarize_wp_error(r)) for path, label in [ ("/wp-json/wps/v1/subscriptions?per_page=1", "wps_v1_subscriptions_list"), ]: r = fetch("GET", base + path, auth) print(label + "_http", r.status_code) print(label, summarize_wp_error(r)) print("\n=== OPTIONS /wp-json/wc/v3/subscriptions ===") r = fetch("OPTIONS", base + "/wp-json/wc/v3/subscriptions", auth) print("subscriptions_options_http", r.status_code) allow = r.headers.get("Allow") or r.headers.get("allow") print("subscriptions_options_allow_header", (allow or "")[:120]) print("\n=== D) Classification hint ===") # re-read last subscription single response for code if sub_id is not None: r = fetch("GET", f"{base}/wp-json/wc/v3/subscriptions/{sub_id}", auth) try: j = r.json() if isinstance(j, dict) and "code" in j: c = str(j.get("code")) if "authentication" in c.lower(): print("hint", "likely_category: authentication / invalid key") elif "cannot_view" in c.lower() or c == "rest_forbidden": print("hint", "likely_category: authenticated_but_forbidden_cap") elif r.status_code == 404: print("hint", "likely_category: endpoint_or_resource_missing") else: print("hint", "likely_category: see wp_error_code above") except Exception: print("hint", "parse_failed") if __name__ == "__main__": try: main() except requests.RequestException as e: print("REQUEST_ERROR", type(e).__name__, str(e)[:120]) sys.exit(1)