215 lines
7.7 KiB
Python
215 lines
7.7 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""
|
||
|
|
Phase 1e: read-only Woo REST 403 isolation. Prints only status codes and safe metadata.
|
||
|
|
No secrets, no emails, no full response bodies.
|
||
|
|
"""
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import json
|
||
|
|
import os
|
||
|
|
import re
|
||
|
|
import sqlite3
|
||
|
|
import sys
|
||
|
|
from typing import Any, Dict, Optional, Tuple
|
||
|
|
|
||
|
|
import requests
|
||
|
|
from requests.auth import HTTPBasicAuth
|
||
|
|
|
||
|
|
|
||
|
|
def mask_pii(s: str) -> str:
|
||
|
|
s = re.sub(r"\b[\w.+-]+@[\w.-]+\.[a-zA-Z]{2,}\b", "[email]", s)
|
||
|
|
return s
|
||
|
|
|
||
|
|
|
||
|
|
def summarize_wp_error(r: requests.Response) -> Dict[str, Any]:
|
||
|
|
out: Dict[str, Any] = {
|
||
|
|
"http": r.status_code,
|
||
|
|
"content_type": (r.headers.get("content-type") or "")[:60],
|
||
|
|
}
|
||
|
|
try:
|
||
|
|
j = r.json()
|
||
|
|
except Exception:
|
||
|
|
out["json"] = False
|
||
|
|
out["body_prefix"] = (r.text or "")[:80].replace("\n", " ")
|
||
|
|
return out
|
||
|
|
out["json"] = True
|
||
|
|
if isinstance(j, dict):
|
||
|
|
code = j.get("code")
|
||
|
|
if code is not None:
|
||
|
|
out["wp_error_code"] = code
|
||
|
|
msg = j.get("message")
|
||
|
|
if isinstance(msg, str) and msg:
|
||
|
|
out["message_paraphrase"] = mask_pii(msg)[:200]
|
||
|
|
data = j.get("data")
|
||
|
|
if isinstance(data, dict) and "status" in data:
|
||
|
|
out["data_status"] = data.get("status")
|
||
|
|
elif isinstance(j, list):
|
||
|
|
out["json_top"] = "array"
|
||
|
|
out["json_len"] = len(j)
|
||
|
|
else:
|
||
|
|
out["json_top"] = type(j).__name__
|
||
|
|
return out
|
||
|
|
|
||
|
|
|
||
|
|
def get_auth() -> Tuple[str, HTTPBasicAuth]:
|
||
|
|
base = (os.environ.get("AZA_WOOCOMMERCE_URL") or "").strip().rstrip("/")
|
||
|
|
key = os.environ.get("AZA_WOOCOMMERCE_CONSUMER_KEY") or ""
|
||
|
|
sec = os.environ.get("AZA_WOOCOMMERCE_CONSUMER_SECRET") or ""
|
||
|
|
if not (base and key and sec):
|
||
|
|
raise SystemExit("missing env")
|
||
|
|
return base, HTTPBasicAuth(key, sec)
|
||
|
|
|
||
|
|
|
||
|
|
def fetch(method: str, url: str, auth: HTTPBasicAuth, **kw: Any) -> requests.Response:
|
||
|
|
return requests.request(method, url, auth=auth, timeout=(5, 30), **kw)
|
||
|
|
|
||
|
|
|
||
|
|
def pick_license_rows(db_path: str) -> Tuple[Optional[int], Optional[int]]:
|
||
|
|
"""Returns (order_numeric_id_or_none, sub_numeric_id_or_none) from wc_ prefixed cols."""
|
||
|
|
con = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
|
||
|
|
try:
|
||
|
|
row = con.execute(
|
||
|
|
"""
|
||
|
|
SELECT customer_id, subscription_id FROM licenses
|
||
|
|
WHERE lower(trim(status))='active'
|
||
|
|
AND subscription_id LIKE 'wc_sub_%'
|
||
|
|
LIMIT 1
|
||
|
|
"""
|
||
|
|
).fetchone()
|
||
|
|
sub_num: Optional[int] = None
|
||
|
|
ord_num: Optional[int] = None
|
||
|
|
if row:
|
||
|
|
cid, sid = row[0], row[1]
|
||
|
|
if sid:
|
||
|
|
m = re.search(r"wc_sub_(\d+)", str(sid), re.I)
|
||
|
|
if m:
|
||
|
|
sub_num = int(m.group(1))
|
||
|
|
if cid:
|
||
|
|
m = re.search(r"wc_order_(\d+)", str(cid), re.I)
|
||
|
|
if m:
|
||
|
|
ord_num = int(m.group(1))
|
||
|
|
if ord_num is None:
|
||
|
|
r2 = con.execute(
|
||
|
|
"""
|
||
|
|
SELECT customer_id FROM licenses
|
||
|
|
WHERE customer_id LIKE 'wc_order_%'
|
||
|
|
LIMIT 1
|
||
|
|
"""
|
||
|
|
).fetchone()
|
||
|
|
if r2 and r2[0]:
|
||
|
|
m = re.search(r"wc_order_(\d+)", str(r2[0]), re.I)
|
||
|
|
if m:
|
||
|
|
ord_num = int(m.group(1))
|
||
|
|
return ord_num, sub_num
|
||
|
|
finally:
|
||
|
|
con.close()
|
||
|
|
|
||
|
|
|
||
|
|
def main() -> None:
|
||
|
|
base, auth = get_auth()
|
||
|
|
print("=== A) ENV (presence only) ===")
|
||
|
|
print("ENV_URL", "yes" if os.environ.get("AZA_WOOCOMMERCE_URL", "").strip() else "no")
|
||
|
|
print("ENV_KEY", "yes" if os.environ.get("AZA_WOOCOMMERCE_CONSUMER_KEY", "").strip() else "no")
|
||
|
|
print("ENV_SECRET", "yes" if os.environ.get("AZA_WOOCOMMERCE_CONSUMER_SECRET", "").strip() else "no")
|
||
|
|
|
||
|
|
print("\n=== B1) GET /wp-json ===")
|
||
|
|
r = fetch("GET", base + "/wp-json", auth)
|
||
|
|
print("wp_json_root_http", r.status_code)
|
||
|
|
print("wp_json_root", summarize_wp_error(r))
|
||
|
|
|
||
|
|
print("\n=== B2) GET /wp-json/wc/v3 ===")
|
||
|
|
r = fetch("GET", base + "/wp-json/wc/v3", auth)
|
||
|
|
print("wc_v3_index_http", r.status_code)
|
||
|
|
s = summarize_wp_error(r)
|
||
|
|
print("wc_v3_index", s)
|
||
|
|
if r.ok:
|
||
|
|
try:
|
||
|
|
j = r.json()
|
||
|
|
print("wc_v3_index_has_namespace", isinstance(j, dict) and "namespace" in j)
|
||
|
|
except Exception:
|
||
|
|
pass
|
||
|
|
|
||
|
|
db_path = os.environ.get("STRIPE_DB_PATH") or "/app/data/stripe_webhook.sqlite"
|
||
|
|
ord_id, sub_id = pick_license_rows(db_path)
|
||
|
|
print("\n=== license-derived ids (numeric only) ===")
|
||
|
|
print("sample_order_numeric_from_db", ord_id)
|
||
|
|
print("sample_sub_numeric_from_db", sub_id)
|
||
|
|
|
||
|
|
if ord_id is not None:
|
||
|
|
print("\n=== B3) GET /wp-json/wc/v3/orders/{id} ===")
|
||
|
|
r = fetch("GET", f"{base}/wp-json/wc/v3/orders/{ord_id}", auth)
|
||
|
|
print("orders_single_http", r.status_code)
|
||
|
|
summ = summarize_wp_error(r)
|
||
|
|
print("orders_single", summ)
|
||
|
|
if r.ok:
|
||
|
|
try:
|
||
|
|
j = r.json()
|
||
|
|
jid = j.get("id") if isinstance(j, dict) else None
|
||
|
|
print("orders_json_id_matches", jid == ord_id)
|
||
|
|
except Exception:
|
||
|
|
print("orders_json_parse", False)
|
||
|
|
else:
|
||
|
|
print("\n=== B3) orders test skipped (no wc_order_ id in DB) ===")
|
||
|
|
|
||
|
|
if sub_id is not None:
|
||
|
|
print("\n=== B4) GET /wp-json/wc/v3/subscriptions/{id} ===")
|
||
|
|
r = fetch("GET", f"{base}/wp-json/wc/v3/subscriptions/{sub_id}", auth)
|
||
|
|
print("subscriptions_single_http", r.status_code)
|
||
|
|
print("subscriptions_single", summarize_wp_error(r))
|
||
|
|
else:
|
||
|
|
print("\n=== B4) subscriptions single skipped ===")
|
||
|
|
|
||
|
|
print("\n=== B5) GET /wp-json/wc/v3/subscriptions?per_page=1 ===")
|
||
|
|
r = fetch("GET", base + "/wp-json/wc/v3/subscriptions?per_page=1", auth)
|
||
|
|
print("subscriptions_list_http", r.status_code)
|
||
|
|
summ = summarize_wp_error(r)
|
||
|
|
print("subscriptions_list", summ)
|
||
|
|
|
||
|
|
print("\n=== C) More routes (GET only) ===")
|
||
|
|
for path, label in [
|
||
|
|
("/wp-json/wc/v3/orders?per_page=1", "orders_list"),
|
||
|
|
]:
|
||
|
|
r = fetch("GET", base + path, auth)
|
||
|
|
print(label + "_http", r.status_code)
|
||
|
|
print(label, summarize_wp_error(r))
|
||
|
|
|
||
|
|
for path, label in [
|
||
|
|
("/wp-json/wps/v1/subscriptions?per_page=1", "wps_v1_subscriptions_list"),
|
||
|
|
]:
|
||
|
|
r = fetch("GET", base + path, auth)
|
||
|
|
print(label + "_http", r.status_code)
|
||
|
|
print(label, summarize_wp_error(r))
|
||
|
|
|
||
|
|
print("\n=== OPTIONS /wp-json/wc/v3/subscriptions ===")
|
||
|
|
r = fetch("OPTIONS", base + "/wp-json/wc/v3/subscriptions", auth)
|
||
|
|
print("subscriptions_options_http", r.status_code)
|
||
|
|
allow = r.headers.get("Allow") or r.headers.get("allow")
|
||
|
|
print("subscriptions_options_allow_header", (allow or "")[:120])
|
||
|
|
|
||
|
|
print("\n=== D) Classification hint ===")
|
||
|
|
# re-read last subscription single response for code
|
||
|
|
if sub_id is not None:
|
||
|
|
r = fetch("GET", f"{base}/wp-json/wc/v3/subscriptions/{sub_id}", auth)
|
||
|
|
try:
|
||
|
|
j = r.json()
|
||
|
|
if isinstance(j, dict) and "code" in j:
|
||
|
|
c = str(j.get("code"))
|
||
|
|
if "authentication" in c.lower():
|
||
|
|
print("hint", "likely_category: authentication / invalid key")
|
||
|
|
elif "cannot_view" in c.lower() or c == "rest_forbidden":
|
||
|
|
print("hint", "likely_category: authenticated_but_forbidden_cap")
|
||
|
|
elif r.status_code == 404:
|
||
|
|
print("hint", "likely_category: endpoint_or_resource_missing")
|
||
|
|
else:
|
||
|
|
print("hint", "likely_category: see wp_error_code above")
|
||
|
|
except Exception:
|
||
|
|
print("hint", "parse_failed")
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
try:
|
||
|
|
main()
|
||
|
|
except requests.RequestException as e:
|
||
|
|
print("REQUEST_ERROR", type(e).__name__, str(e)[:120])
|
||
|
|
sys.exit(1)
|