update
This commit is contained in:
@@ -80,6 +80,19 @@ class ScheduleItemIn(BaseModel):
|
||||
note: str = ""
|
||||
|
||||
|
||||
class LicenseJoinExistingPracticeIn(BaseModel):
|
||||
"""Bewusster Beitritt: eigene Office-Lizenz an bestehende Praxis (CHAT-Code)."""
|
||||
|
||||
license_key: str = Field(..., min_length=1)
|
||||
invite_code: str = Field(..., min_length=1)
|
||||
join_mode: str = Field(default="join_existing_practice")
|
||||
name: str = Field(..., min_length=1)
|
||||
email: str = Field(..., min_length=1)
|
||||
password: str = Field(..., min_length=4)
|
||||
desktop_specialty: str = ""
|
||||
desktop_title: str = ""
|
||||
|
||||
|
||||
class ScheduleItemUpdate(BaseModel):
|
||||
old: ScheduleItemIn
|
||||
new: ScheduleItemIn
|
||||
@@ -1756,13 +1769,14 @@ def license_status(
|
||||
current_period_end = None
|
||||
customer_email = None
|
||||
license_practice_id: Optional[str] = None
|
||||
lookup_key_val: Optional[str] = None
|
||||
try:
|
||||
with sqlite3.connect(db_path) as con:
|
||||
row = None
|
||||
if license_key and license_key.strip():
|
||||
row = con.execute(
|
||||
"""
|
||||
SELECT status, current_period_end, customer_email, practice_id
|
||||
SELECT status, current_period_end, customer_email, practice_id, lookup_key
|
||||
FROM licenses
|
||||
WHERE upper(license_key) = ?
|
||||
ORDER BY updated_at DESC
|
||||
@@ -1773,7 +1787,7 @@ def license_status(
|
||||
if row is None and email and email.strip():
|
||||
row = con.execute(
|
||||
"""
|
||||
SELECT status, current_period_end, customer_email, practice_id
|
||||
SELECT status, current_period_end, customer_email, practice_id, lookup_key
|
||||
FROM licenses
|
||||
WHERE lower(customer_email) = ?
|
||||
ORDER BY updated_at DESC
|
||||
@@ -1788,11 +1802,35 @@ def license_status(
|
||||
current_period_end = int(row[1]) if row[1] is not None else None
|
||||
customer_email = str(row[2]).strip() if row[2] is not None else None
|
||||
license_practice_id = str(row[3]).strip() if len(row) > 3 and row[3] else None
|
||||
lookup_key_val = str(row[4]).strip() if len(row) > 4 and row[4] else None
|
||||
except Exception:
|
||||
status = None
|
||||
current_period_end = None
|
||||
customer_email = None
|
||||
license_practice_id = None
|
||||
lookup_key_val = None
|
||||
|
||||
try:
|
||||
from empfang_routes import _entitlements_from_lookup_key
|
||||
|
||||
_oa, _ca = _entitlements_from_lookup_key(lookup_key_val)
|
||||
except Exception:
|
||||
_oa, _ca = True, True
|
||||
|
||||
practice_name_out: Optional[str] = None
|
||||
practice_specialty_out: Optional[str] = None
|
||||
if license_practice_id and str(license_practice_id).strip():
|
||||
try:
|
||||
from empfang_routes import _load_practices
|
||||
|
||||
_pdata = _load_practices().get(str(license_practice_id).strip())
|
||||
if isinstance(_pdata, dict):
|
||||
_pn = (_pdata.get("name") or "").strip()
|
||||
_ps = (_pdata.get("specialty") or "").strip()
|
||||
practice_name_out = _pn or None
|
||||
practice_specialty_out = _ps or None
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
decision = compute_license_decision(current_period_end=current_period_end, status=status)
|
||||
|
||||
@@ -1810,6 +1848,14 @@ def license_status(
|
||||
"device_allowed": True,
|
||||
"reason": "ok",
|
||||
"practice_id": license_practice_id or None,
|
||||
"customer_email": customer_email,
|
||||
"license_customer_email": customer_email,
|
||||
"practice_name": practice_name_out,
|
||||
"practice_specialty": practice_specialty_out,
|
||||
"lookup_key": lookup_key_val or "",
|
||||
"plan": lookup_key_val or "",
|
||||
"office_allowed": bool(_oa),
|
||||
"chat_allowed": bool(_ca),
|
||||
}
|
||||
|
||||
dev_user_key = license_practice_id.strip() if license_practice_id and license_practice_id.strip() else "default"
|
||||
@@ -1950,6 +1996,220 @@ def license_activate(
|
||||
return result
|
||||
|
||||
|
||||
def _license_key_audit_tail(key: str) -> str:
|
||||
k = (key or "").strip().upper()
|
||||
if len(k) < 5:
|
||||
return "----"
|
||||
return k[-4:]
|
||||
|
||||
|
||||
@app.post("/license/join_existing_practice")
|
||||
def license_join_existing_practice(
|
||||
request: Request,
|
||||
body: LicenseJoinExistingPracticeIn,
|
||||
):
|
||||
"""Bewusster Office-Beitritt: ``license_key`` an bestehende ``practice_id`` (per CHAT-Code) und Empfang-Konto.
|
||||
|
||||
Keine externe Praxisverbindung, kein rates Erraten — ``join_mode`` muss gesetzt sein.
|
||||
"""
|
||||
if (body.join_mode or "").strip() != "join_existing_practice":
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="join_mode muss «join_existing_practice» sein.",
|
||||
)
|
||||
|
||||
db_path = _stripe_db_path()
|
||||
if not db_path.exists():
|
||||
raise HTTPException(status_code=404, detail="Keine Lizenzdatenbank.")
|
||||
|
||||
try:
|
||||
from stripe_routes import ensure_license_schema
|
||||
except Exception as exc:
|
||||
print(f"[LICENSE-JOIN] ensure_license_schema import: {exc}")
|
||||
raise HTTPException(status_code=503, detail="Lizenzmodul nicht verfuegbar.")
|
||||
|
||||
try:
|
||||
ensure_license_schema(db_path)
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as exc:
|
||||
print(f"[LICENSE-JOIN] ensure_license_schema: {exc}")
|
||||
raise HTTPException(
|
||||
status_code=503,
|
||||
detail="Lizenzdatenbank konnte nicht vorbereitet werden.",
|
||||
)
|
||||
|
||||
key_clean = (body.license_key or "").strip().upper()
|
||||
with sqlite3.connect(db_path) as con:
|
||||
row = con.execute(
|
||||
"""
|
||||
SELECT subscription_id, status, current_period_end, customer_email,
|
||||
practice_id, lookup_key
|
||||
FROM licenses
|
||||
WHERE upper(license_key) = ?
|
||||
ORDER BY updated_at DESC
|
||||
LIMIT 1
|
||||
""",
|
||||
(key_clean,),
|
||||
).fetchone()
|
||||
|
||||
if not row:
|
||||
raise HTTPException(status_code=404, detail="Lizenzschluessel ungueltig.")
|
||||
|
||||
sub_id = row[0]
|
||||
status = row[1]
|
||||
cpe = row[2]
|
||||
cust_email = row[3]
|
||||
practice_id_raw = row[4]
|
||||
lookup_key_raw = row[5] if len(row) > 5 else None
|
||||
current_period_end = int(cpe) if cpe is not None else None
|
||||
old_pid = (str(practice_id_raw).strip() if practice_id_raw else "") or ""
|
||||
|
||||
decision = compute_license_decision(current_period_end=current_period_end, status=status)
|
||||
if not decision.valid:
|
||||
raise HTTPException(status_code=403, detail="Lizenz nicht aktiv oder abgelaufen.")
|
||||
|
||||
try:
|
||||
from empfang_routes import _entitlements_from_lookup_key
|
||||
|
||||
office_ok, _c_ok = _entitlements_from_lookup_key(
|
||||
str(lookup_key_raw).strip() if lookup_key_raw else None,
|
||||
)
|
||||
except Exception:
|
||||
office_ok = True
|
||||
if not office_ok:
|
||||
raise HTTPException(
|
||||
status_code=403,
|
||||
detail="Diese Lizenz erlaubt keinen AZA-Office-Beitritt als Arzt/Admin.",
|
||||
)
|
||||
|
||||
try:
|
||||
from empfang_routes import (
|
||||
_load_practices,
|
||||
_lookup_practice_id_by_invite,
|
||||
internal_license_join_existing_practice_account,
|
||||
)
|
||||
except Exception as exc:
|
||||
print(f"[LICENSE-JOIN] empfang import: {exc}")
|
||||
raise HTTPException(status_code=503, detail="Empfang-Modul nicht verfuegbar.")
|
||||
|
||||
target_pid = _lookup_practice_id_by_invite(body.invite_code)
|
||||
if not target_pid:
|
||||
raise HTTPException(status_code=404, detail="Ungueltiger oder unbekannter Einladungscode.")
|
||||
|
||||
practices = _load_practices()
|
||||
if target_pid not in practices:
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail="Zielpraxis fuer diesen Code existiert nicht auf dem Server.",
|
||||
)
|
||||
|
||||
device_id = request.headers.get("X-Device-Id")
|
||||
if not device_id:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="X-Device-Id Header fehlt. Bitte die AZA Desktop-App verwenden.",
|
||||
)
|
||||
|
||||
device_name = request.headers.get("X-Device-Name", "")
|
||||
app_version = request.headers.get("X-App-Version", "")
|
||||
device_fingerprint = request.headers.get("X-Device-Fingerprint", "")
|
||||
cust_email_s = str(cust_email or "").strip()
|
||||
|
||||
if cust_email_s:
|
||||
dd = enforce_and_touch_device(
|
||||
customer_email=cust_email_s,
|
||||
user_key=target_pid.strip(),
|
||||
device_id=device_id,
|
||||
db_path=str(db_path),
|
||||
device_name=device_name,
|
||||
app_version=app_version,
|
||||
device_fingerprint=device_fingerprint,
|
||||
)
|
||||
if not dd.allowed:
|
||||
raise HTTPException(
|
||||
status_code=403,
|
||||
detail=(
|
||||
f"Geraete-Limit erreicht: {dd.devices_used}/{dd.devices_allowed} "
|
||||
"Geraete belegt."
|
||||
),
|
||||
headers={"X-Device-Reason": dd.reason},
|
||||
)
|
||||
|
||||
rebind = old_pid != target_pid
|
||||
updated = False
|
||||
acc_payload: dict = {}
|
||||
now_ts = int(time.time())
|
||||
lk_tail = _license_key_audit_tail(key_clean)
|
||||
inv_raw = (body.invite_code or "").strip().upper().replace(" ", "")
|
||||
inv_tail = inv_raw[-4:] if len(inv_raw) >= 4 else "----"
|
||||
|
||||
try:
|
||||
if rebind:
|
||||
with sqlite3.connect(db_path) as con:
|
||||
con.execute(
|
||||
"""
|
||||
UPDATE licenses
|
||||
SET practice_id = ?, updated_at = ?
|
||||
WHERE subscription_id = ?
|
||||
""",
|
||||
(target_pid, now_ts, sub_id),
|
||||
)
|
||||
con.commit()
|
||||
updated = True
|
||||
|
||||
body_dict = body.model_dump()
|
||||
acc_payload = internal_license_join_existing_practice_account(
|
||||
target_practice_id=target_pid,
|
||||
name=body.name,
|
||||
email=body.email,
|
||||
password=body.password,
|
||||
assign_admin=True,
|
||||
license_customer_email=cust_email_s,
|
||||
body=body_dict,
|
||||
)
|
||||
except HTTPException:
|
||||
if updated and rebind:
|
||||
try:
|
||||
prev = old_pid if old_pid else None
|
||||
with sqlite3.connect(db_path) as con:
|
||||
con.execute(
|
||||
"""
|
||||
UPDATE licenses
|
||||
SET practice_id = ?, updated_at = ?
|
||||
WHERE subscription_id = ?
|
||||
""",
|
||||
(prev, int(time.time()), sub_id),
|
||||
)
|
||||
con.commit()
|
||||
except Exception as exc:
|
||||
print(f"[LICENSE-JOIN] Revert practice_id fehlgeschlagen: {exc}")
|
||||
raise
|
||||
|
||||
pdata = practices.get(target_pid) or {}
|
||||
pname = str(pdata.get("name") or "").strip()
|
||||
|
||||
role_as = str(acc_payload.get("role") or "")
|
||||
adm_flag = bool(acc_payload.get("admin"))
|
||||
print(
|
||||
"[LICENSE-JOIN-OK] "
|
||||
f"license_tail={lk_tail} invite_tail={inv_tail} "
|
||||
f"old_practice={(old_pid or '')[:16]!r} new_practice={(target_pid or '')[:16]!r} "
|
||||
f"user={(str(acc_payload.get('user_id') or ''))[:12]!r} "
|
||||
f"role={role_as!r} admin_assigned={adm_flag} "
|
||||
f"rebound={int(rebind)}"
|
||||
)
|
||||
|
||||
out = dict(acc_payload)
|
||||
out["practice_id"] = target_pid
|
||||
out["practice_name"] = pname or None
|
||||
out["license_customer_email"] = cust_email_s or None
|
||||
out["license_rebound_to_existing"] = bool(rebind)
|
||||
if old_pid:
|
||||
out["previous_practice_id"] = old_pid
|
||||
return JSONResponse(content=out)
|
||||
|
||||
|
||||
@app.get("/billing/success")
|
||||
def billing_success(session_id: Optional[str] = Query(None)) -> HTMLResponse:
|
||||
customer_email = ""
|
||||
|
||||
Reference in New Issue
Block a user