# -*- coding: utf-8 -*-
"""Simple token-based authentication (JWT).

Provides bcrypt password hashing helpers, creation of signed access tokens,
and FastAPI dependencies that resolve the bearer token of a request to an
active ``Employee`` and optionally restrict access to certain roles.
"""
import datetime
from typing import Callable, Optional

import bcrypt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
from sqlalchemy.orm import Session

from ..config import SECRET_KEY, ACCESS_TOKEN_EXPIRE_MINUTES
from ..database import get_db
from ..core.models import Employee
from ..core.enums import EmployeeRole

# auto_error=False lets get_current_user see missing credentials (a falsy
# value) and answer with its own 401 instead of the scheme's built-in error.
_security = HTTPBearer(auto_error=False)
ALGORITHM = "HS256"


def hash_password(password: str) -> str:
    """Return a bcrypt hash of *password* using a freshly generated salt."""
    salt = bcrypt.gensalt()
    return bcrypt.hashpw(password.encode("utf-8"), salt).decode("utf-8")


def verify_password(plain: str, hashed: str) -> bool:
    """Return True if *plain* matches the bcrypt hash *hashed*.

    A missing, empty or malformed stored hash is treated as "no match"
    instead of propagating an error to the caller.
    """
    if not hashed:
        # There is nothing to compare against, so it can never match.
        return False
    try:
        return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8"))
    except ValueError:
        # bcrypt rejects input it cannot process (e.g. an invalid salt in
        # the stored hash); report that as a failed verification.
        return False


def create_access_token(employee_id: str, role: str) -> str:
    """Create a signed JWT for the given employee.

    The token carries ``sub`` (employee id), ``role`` and ``exp``; it expires
    ACCESS_TOKEN_EXPIRE_MINUTES after creation and is signed with SECRET_KEY
    using ALGORITHM.
    """
    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    now = datetime.datetime.now(datetime.timezone.utc)
    expire = now + datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    claims = {"sub": employee_id, "role": role, "exp": expire}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)


def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 error that carries the Bearer challenge header."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


def get_current_user(
    creds: Optional[HTTPAuthorizationCredentials] = Depends(_security),
    db: Session = Depends(get_db),
) -> Employee:
    """FastAPI dependency: resolve the bearer token to an active employee.

    Raises:
        HTTPException: 401 if the token is missing, cannot be decoded, has
            no ``sub`` claim, or the referenced employee does not exist or
            is not active.
    """
    if not creds:
        raise _unauthorized("Token fehlt")

    # Only the decode call can raise JWTError, so keep the try body minimal.
    try:
        payload = jwt.decode(creds.credentials, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        raise _unauthorized("Ungültiger Token") from exc

    emp_id = payload.get("sub")
    if not emp_id:
        raise _unauthorized("Ungültiger Token")

    emp = db.query(Employee).filter(Employee.id == emp_id).first()
    if not emp or not emp.is_active:
        raise _unauthorized("Benutzer nicht gefunden")
    return emp


def require_role(*roles: EmployeeRole) -> Callable[..., Employee]:
    """Dependency factory: only allow users with one of the given roles.

    Returns a dependency that yields the current user, or raises a 403
    HTTPException when the user's role is not among *roles*.
    """

    def _check(user: Employee = Depends(get_current_user)) -> Employee:
        if user.role not in roles:
            raise HTTPException(status.HTTP_403_FORBIDDEN, "Keine Berechtigung")
        return user

    return _check