1544 lines
53 KiB
Python
1544 lines
53 KiB
Python
"""
|
||
AzA Intern – separate internal web portal (phase 1).

Not part of the productive AzA main system.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import os
|
||
import re
|
||
import secrets
|
||
import sqlite3
|
||
import uuid
|
||
import html as html_module
|
||
from html.parser import HTMLParser
|
||
from contextlib import contextmanager
|
||
from datetime import date, datetime
|
||
from pathlib import Path
|
||
from typing import Any, Generator, Optional
|
||
|
||
from fastapi import Depends, FastAPI, File, Form, HTTPException, Request, UploadFile
|
||
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse
|
||
from fastapi.staticfiles import StaticFiles
|
||
from fastapi.templating import Jinja2Templates
|
||
from passlib.context import CryptContext
|
||
from starlette.middleware.sessions import SessionMiddleware
|
||
|
||
# --- Filesystem locations -------------------------------------------------
# Database, upload and backup locations can be overridden through environment
# variables; templates and static assets always live next to this module.
BASE_DIR = Path(__file__).resolve().parent
DB_PATH = Path(os.environ.get("AZA_INTERN_DB_PATH", str(BASE_DIR / "intern.db")))
UPLOAD_DIR = Path(os.environ.get("AZA_INTERN_UPLOAD_DIR", str(BASE_DIR / "uploads")))
BACKUP_DIR = Path(os.environ.get("AZA_INTERN_BACKUP_DIR", str(BASE_DIR / "backups")))
TEMPLATES_DIR = BASE_DIR / "templates"
STATIC_DIR = BASE_DIR / "static"

# --- Upload limits --------------------------------------------------------
# MAX_UPLOAD_MB is the single source of truth; the byte limit and the
# user-facing hint are derived from it so the three can never disagree.
MAX_UPLOAD_MB = 200
MAX_UPLOAD_BYTES = MAX_UPLOAD_MB * 1024 * 1024

# File suffixes (lower-case, including the dot) accepted for upload.
ALLOWED_EXTENSIONS = frozenset({
    ".pdf", ".png", ".jpg", ".jpeg", ".webp", ".gif", ".svg",
    ".doc", ".docx", ".odt", ".rtf",
    ".xls", ".xlsx", ".ods", ".csv",
    ".ppt", ".pptx", ".odp",
    ".txt", ".md",
    ".zip", ".7z", ".rar",
    ".eml", ".msg",
    ".xml", ".json",
})

# File suffixes that are rejected with a dedicated "blocked" error.
BLOCKED_EXTENSIONS = frozenset({
    ".exe", ".bat", ".cmd", ".ps1", ".sh", ".msi", ".dll",
    ".js", ".vbs", ".scr", ".jar", ".app", ".dmg",
})

# User-facing upload hints (German UI text, passed to the templates).
UPLOAD_HINT_LINE1 = f"Max. {MAX_UPLOAD_MB} MB pro Datei"
UPLOAD_HINT_LINE2 = "Bürodateien, PDFs, Bilder, Tabellen, Präsentationen, Archive und E-Mails erlaubt"
UPLOAD_HINT_LINE3 = "Keine Patientendaten, API-Keys, Passwörter oder Secrets hochladen"

# --- Domain vocabularies --------------------------------------------------
TASK_STATUSES = ["Neu", "In Arbeit", "Warten auf André", "Erledigt", "Archiv"]
TASK_PRIORITIES = ["niedrig", "normal", "hoch"]
CATEGORIES = [
    "Recherche", "Webseite", "Flyer / Marketing", "Screenshots / Bugs",
    "Anleitung", "Recht / Datenschutz", "Installer / Release", "Sonstiges",
]
ROLES = ["admin", "assistant"]
|
||
|
||
# Password hashing context (bcrypt scheme via passlib).
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# The interactive API documentation endpoints are switched off.
app = FastAPI(title="AzA Intern", docs_url=None, redoc_url=None)

# Session signing secret: taken from the environment, otherwise generated
# randomly when the module is imported.
# NOTE(review): with the random fallback, existing session cookies presumably
# stop validating after a restart - confirm and set AZA_INTERN_SESSION_SECRET
# in deployments.
session_secret = os.environ.get("AZA_INTERN_SESSION_SECRET") or secrets.token_hex(32)
app.add_middleware(
    SessionMiddleware,
    secret_key=session_secret,
    https_only=False,
    same_site="lax",
)

templates = Jinja2Templates(directory=str(TEMPLATES_DIR))
app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static")

# Make sure the writable directories exist before the first request.
for _directory in (UPLOAD_DIR, BACKUP_DIR):
    _directory.mkdir(parents=True, exist_ok=True)
|
||
|
||
|
||
def now_iso() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DD HH:MM:SS``.

    An aware ``datetime.now(timezone.utc)`` is used instead of the deprecated
    ``datetime.utcnow()``; the formatted text is the same.
    """
    # Local import: the module header only imports ``date`` and ``datetime``.
    from datetime import timezone

    return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
|
||
@contextmanager
def get_db() -> Generator[sqlite3.Connection, None, None]:
    """Yield a SQLite connection to ``DB_PATH`` scoped to one transaction.

    Rows are returned as ``sqlite3.Row`` and foreign-key enforcement is
    switched on. The transaction is committed when the ``with`` body finishes
    normally and rolled back (then re-raised) when it raises an ``Exception``.
    The connection is closed in every case.
    """
    connection = sqlite3.connect(str(DB_PATH))
    connection.row_factory = sqlite3.Row
    connection.execute("PRAGMA foreign_keys = ON")
    try:
        yield connection
        connection.commit()
    except Exception:
        connection.rollback()
        raise
    finally:
        connection.close()
|
||
|
||
|
||
def init_db() -> None:
    """Create all application tables that do not exist yet.

    Every statement uses ``CREATE TABLE IF NOT EXISTS``, so running this
    against an already initialised database leaves existing tables alone.
    """
    schema = """
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT NOT NULL UNIQUE,
            password_hash TEXT NOT NULL,
            display_name TEXT,
            role TEXT NOT NULL DEFAULT 'assistant',
            is_active INTEGER NOT NULL DEFAULT 1,
            deleted_at TEXT,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS tasks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            description TEXT,
            status TEXT NOT NULL DEFAULT 'Neu',
            priority TEXT NOT NULL DEFAULT 'normal',
            category TEXT,
            assigned_to INTEGER,
            due_date TEXT,
            created_by INTEGER NOT NULL,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            FOREIGN KEY (assigned_to) REFERENCES users(id),
            FOREIGN KEY (created_by) REFERENCES users(id)
        );
        CREATE TABLE IF NOT EXISTS task_comments (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            task_id INTEGER NOT NULL,
            user_id INTEGER NOT NULL,
            comment TEXT NOT NULL,
            created_at TEXT NOT NULL,
            FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE,
            FOREIGN KEY (user_id) REFERENCES users(id)
        );
        CREATE TABLE IF NOT EXISTS files (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            original_filename TEXT NOT NULL,
            stored_filename TEXT NOT NULL UNIQUE,
            content_type TEXT,
            size_bytes INTEGER NOT NULL,
            category TEXT,
            description TEXT,
            task_id INTEGER,
            uploaded_by INTEGER NOT NULL,
            created_at TEXT NOT NULL,
            FOREIGN KEY (uploaded_by) REFERENCES users(id),
            FOREIGN KEY (task_id) REFERENCES tasks(id)
        );
        CREATE TABLE IF NOT EXISTS notes (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            body TEXT,
            category TEXT,
            created_by INTEGER NOT NULL,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            FOREIGN KEY (created_by) REFERENCES users(id)
        );
        CREATE TABLE IF NOT EXISTS tags (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL UNIQUE
        );
        CREATE TABLE IF NOT EXISTS item_tags (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            tag_id INTEGER NOT NULL,
            item_type TEXT NOT NULL,
            item_id INTEGER NOT NULL,
            FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE,
            UNIQUE(tag_id, item_type, item_id)
        );
        CREATE TABLE IF NOT EXISTS documents (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            content_html TEXT DEFAULT '',
            category TEXT,
            task_id INTEGER,
            created_by INTEGER NOT NULL,
            updated_by INTEGER,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            deleted_at TEXT,
            FOREIGN KEY (task_id) REFERENCES tasks(id),
            FOREIGN KEY (created_by) REFERENCES users(id),
            FOREIGN KEY (updated_by) REFERENCES users(id)
        );
        """
    with get_db() as connection:
        connection.executescript(schema)
|
||
|
||
|
||
def migrate_db() -> None:
    """Bring an older database up to the current schema.

    Adds the columns ``files.task_id``, ``users.display_name`` and
    ``users.deleted_at`` when they are missing, then creates the ``documents``
    table if it does not exist.
    """
    # (table, ((column, DDL to add it), ...)) - one PRAGMA lookup per table.
    pending_columns = (
        ("files", (
            ("task_id", "ALTER TABLE files ADD COLUMN task_id INTEGER REFERENCES tasks(id)"),
        )),
        ("users", (
            ("display_name", "ALTER TABLE users ADD COLUMN display_name TEXT"),
            ("deleted_at", "ALTER TABLE users ADD COLUMN deleted_at TEXT"),
        )),
    )
    documents_ddl = """
        CREATE TABLE IF NOT EXISTS documents (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            content_html TEXT DEFAULT '',
            category TEXT,
            task_id INTEGER,
            created_by INTEGER NOT NULL,
            updated_by INTEGER,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            deleted_at TEXT,
            FOREIGN KEY (task_id) REFERENCES tasks(id),
            FOREIGN KEY (created_by) REFERENCES users(id),
            FOREIGN KEY (updated_by) REFERENCES users(id)
        )
        """
    with get_db() as connection:
        for table, columns in pending_columns:
            # Index 1 of each PRAGMA table_info row is the column name.
            present = {
                info[1]
                for info in connection.execute(f"PRAGMA table_info({table})").fetchall()
            }
            for column, ddl in columns:
                if column not in present:
                    connection.execute(ddl)
        connection.execute(documents_ddl)
|
||
|
||
|
||
def hash_password(password: str) -> str:
    """Return the hash of ``password`` produced by the module's ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed
|
||
|
||
|
||
def verify_password(plain: str, hashed: str) -> bool:
    """Return whether ``plain`` matches the stored hash ``hashed``."""
    matches = pwd_context.verify(plain, hashed)
    return matches
|
||
|
||
|
||
def user_count() -> int:
    """Return the number of rows in the ``users`` table.

    The query has no filter, so every stored user row is counted.
    """
    with get_db() as connection:
        (total,) = connection.execute("SELECT COUNT(*) AS c FROM users").fetchone()
    return int(total)
|
||
|
||
|
||
def create_admin_from_env() -> bool:
    """Create the first admin account from environment variables.

    Reads ``AZA_INTERN_ADMIN_USER`` and ``AZA_INTERN_ADMIN_PASSWORD`` (both
    stripped of surrounding whitespace). Nothing is created when either value
    is empty or when the ``users`` table already contains a row.

    Returns:
        ``True`` when an admin row was inserted, otherwise ``False``.
    """
    admin_name = os.environ.get("AZA_INTERN_ADMIN_USER", "").strip()
    admin_password = os.environ.get("AZA_INTERN_ADMIN_PASSWORD", "").strip()
    if not (admin_name and admin_password):
        return False
    if user_count() > 0:
        return False

    stamp = now_iso()
    insert_sql = (
        "INSERT INTO users (username, password_hash, role, is_active, created_at, updated_at) VALUES (?, ?, 'admin', 1, ?, ?)"
    )
    with get_db() as connection:
        connection.execute(insert_sql, (admin_name, hash_password(admin_password), stamp, stamp))
    return True
|
||
|
||
|
||
def get_csrf_token(request: Request) -> str:
    """Return the session's CSRF token, creating one on first use.

    A missing token is generated with ``secrets.token_hex(32)`` and stored in
    ``request.session`` under ``"csrf_token"``.
    """
    session = request.session
    if "csrf_token" not in session:
        session["csrf_token"] = secrets.token_hex(32)
    return session["csrf_token"]
|
||
|
||
|
||
def verify_csrf(request: Request, token: str) -> None:
    """Validate a submitted CSRF token against the one stored in the session.

    Args:
        request: Current request; the expected token is read from
            ``request.session["csrf_token"]``.
        token: Token value submitted by the client.

    Raises:
        HTTPException: 403 when the session has no token, the submitted token
            is empty, or the two values differ.
    """
    expected = request.session.get("csrf_token")
    if not expected or not token:
        raise HTTPException(status_code=403, detail="CSRF-Token ungültig")
    # compare_digest() raises TypeError for str arguments containing non-ASCII
    # characters. Comparing the UTF-8 bytes keeps the constant-time comparison
    # and turns a malformed token into a 403 instead of an unhandled error.
    if not secrets.compare_digest(expected.encode("utf-8"), token.encode("utf-8")):
        raise HTTPException(status_code=403, detail="CSRF-Token ungültig")
|
||
|
||
|
||
def get_current_user(request: Request) -> Optional[dict[str, Any]]:
    """Return the logged-in user as a dict, or ``None``.

    The user id comes from ``request.session["user_id"]``. ``None`` is
    returned when no id is stored, when no non-deleted user row has that id,
    or when the row's ``is_active`` flag is falsy.
    """
    user_id = request.session.get("user_id")
    if not user_id:
        return None
    lookup_sql = (
        "SELECT id, username, display_name, role, is_active FROM users WHERE id = ? AND deleted_at IS NULL"
    )
    with get_db() as connection:
        record = connection.execute(lookup_sql, (user_id,)).fetchone()
    if record and record["is_active"]:
        return dict(record)
    return None
|
||
|
||
|
||
def require_login(request: Request) -> dict[str, Any]:
    """Return the current user or abort with a redirect to the login page.

    Raises:
        HTTPException: status 303 with a ``Location: /login`` header when
            ``get_current_user`` yields no user.
    """
    current = get_current_user(request)
    if current:
        return current
    raise HTTPException(status_code=303, headers={"Location": "/login"})
|
||
|
||
|
||
def require_admin(request: Request) -> dict[str, Any]:
    """Return the current user if their role is ``"admin"``.

    Raises:
        HTTPException: whatever ``require_login`` raises for anonymous
            requests, or 403 when the logged-in user is not an admin.
    """
    current = require_login(request)
    if current["role"] == "admin":
        return current
    raise HTTPException(status_code=403, detail="Admin-Rechte erforderlich")
|
||
|
||
|
||
def template_ctx(request: Request, user: Optional[dict] = None, **extra) -> dict:
    """Build the context dict passed to every template.

    Args:
        request: Current request (also used to obtain the CSRF token).
        user: Logged-in user dict, or ``None``.
        **extra: Additional entries; they override the defaults on key clash.

    Returns:
        Dict with the request, user, CSRF token, the selectable vocabularies
        and the upload limits/hints, merged with ``extra``.
    """
    defaults = {
        "request": request,
        "user": user,
        "csrf_token": get_csrf_token(request),
        "categories": CATEGORIES,
        "task_statuses": TASK_STATUSES,
        "task_priorities": TASK_PRIORITIES,
        "roles": ROLES,
        "max_upload_bytes": MAX_UPLOAD_BYTES,
        "max_upload_mb": MAX_UPLOAD_MB,
        "upload_allowed_extensions": sorted(ALLOWED_EXTENSIONS),
        "upload_blocked_extensions": sorted(BLOCKED_EXTENSIONS),
        "upload_hint_line1": UPLOAD_HINT_LINE1,
        "upload_hint_line2": UPLOAD_HINT_LINE2,
        "upload_hint_line3": UPLOAD_HINT_LINE3,
    }
    return {**defaults, **extra}
|
||
|
||
|
||
def safe_filename(name: str) -> str:
    """Return a sanitised display filename of at most 200 characters.

    Directory components are dropped and every character other than word
    characters, ``.``, ``-`` and space is replaced by ``_``. Over-long names
    are shortened in the stem so the extension survives; plain truncation
    could cut the suffix off and make a valid upload fail the extension
    check.

    Args:
        name: Client-supplied filename; ``None`` or ``""`` is treated as
            ``"file"``.

    Returns:
        The sanitised name, or ``"file"`` when nothing is left.
    """
    max_len = 200
    name = os.path.basename(name or "file")
    name = re.sub(r"[^\w.\- ]", "_", name, flags=re.UNICODE)
    if len(name) > max_len:
        stem, ext = os.path.splitext(name)
        if ext and len(ext) < max_len:
            # Shorten only the stem so the extension is preserved.
            name = stem[: max_len - len(ext)] + ext
        else:
            name = name[:max_len]
    return name or "file"
|
||
|
||
|
||
def upload_extension_error(filename: str) -> Optional[str]:
    """Classify a filename's extension for upload.

    Returns:
        ``"blocked"`` when the lower-cased suffix is in ``BLOCKED_EXTENSIONS``,
        ``"type"`` when there is no suffix or it is not in
        ``ALLOWED_EXTENSIONS``, and ``None`` when the suffix is acceptable.
    """
    suffix = Path(filename or "").suffix.lower()
    if suffix in BLOCKED_EXTENSIONS:
        return "blocked"
    if suffix and suffix in ALLOWED_EXTENSIONS:
        return None
    return "type"
|
||
|
||
|
||
def allowed_extension(filename: str) -> bool:
    """Return ``True`` when ``upload_extension_error`` reports no problem."""
    problem = upload_extension_error(filename)
    return problem is None
|
||
|
||
|
||
def is_odt_file(filename: str) -> bool:
    """Return whether ``filename`` ends in ``.odt`` (case-insensitive)."""
    suffix = Path(filename or "").suffix
    return suffix.lower() == ".odt"
|
||
|
||
|
||
ALLOWED_HTML_TAGS = frozenset({"b", "strong", "i", "em", "u", "p", "br", "ul", "ol", "li", "h1", "h2", "h3", "a"})
|
||
ALLOWED_HTML_ATTRS = frozenset({"href", "title", "target"})
|
||
|
||
|
||
class _HtmlSanitizer(HTMLParser):
|
||
def __init__(self) -> None:
|
||
super().__init__()
|
||
self._parts: list[str] = []
|
||
|
||
def handle_starttag(self, tag: str, attrs: list[tuple[str, Optional[str]]]) -> None:
|
||
tag = tag.lower()
|
||
if tag not in ALLOWED_HTML_TAGS:
|
||
return
|
||
if tag == "br":
|
||
self._parts.append("<br>")
|
||
return
|
||
safe_attrs: list[tuple[str, str]] = []
|
||
for key, val in attrs:
|
||
key = key.lower()
|
||
if key not in ALLOWED_HTML_ATTRS or val is None:
|
||
continue
|
||
val = val.strip()
|
||
if key == "href" and val.lower().startswith(("javascript:", "data:")):
|
||
continue
|
||
safe_attrs.append((key, html_module.escape(val, quote=True)))
|
||
attr_str = "".join(f' {k}="{v}"' for k, v in safe_attrs)
|
||
self._parts.append(f"<{tag}{attr_str}>")
|
||
|
||
def handle_endtag(self, tag: str) -> None:
|
||
tag = tag.lower()
|
||
if tag in ALLOWED_HTML_TAGS and tag != "br":
|
||
self._parts.append(f"</{tag}>")
|
||
|
||
def handle_data(self, data: str) -> None:
|
||
self._parts.append(html_module.escape(data))
|
||
|
||
def get_html(self) -> str:
|
||
return "".join(self._parts)
|
||
|
||
|
||
def sanitize_html(raw: str) -> str:
    """Return ``raw`` reduced to the allowlisted HTML subset.

    Regex passes first remove ``script``/``iframe``/``object``/``embed``/
    ``style`` elements (paired, then stray tags) and ``on...=`` attributes
    (quoted, then unquoted). The remainder is rebuilt by ``_HtmlSanitizer``.
    Empty or ``None`` input yields ``""``.
    """
    if not raw:
        return ""
    removal_passes = (
        (r"<\s*(script|iframe|object|embed|style)[^>]*>.*?<\s*/\s*\1\s*>", re.I | re.S),
        (r"<\s*(script|iframe|object|embed|style)[^>]*/?\s*>", re.I),
        (r"\s+on\w+\s*=\s*(['\"]).*?\1", re.I),
        (r"\s+on\w+\s*=\s*[^\s>]+", re.I),
    )
    text = raw
    for pattern, flags in removal_passes:
        text = re.sub(pattern, "", text, flags=flags)

    sanitizer = _HtmlSanitizer()
    sanitizer.feed(text)
    sanitizer.close()
    return sanitizer.get_html()
|
||
|
||
|
||
def strip_html_text(raw: str) -> str:
    """Replace every ``<...>`` tag in ``raw`` by a space and trim the result.

    ``None`` or empty input yields ``""``.
    """
    source = raw or ""
    without_tags = re.sub(r"<[^>]+>", " ", source)
    return without_tags.strip()
|
||
|
||
|
||
def all_tasks_for_select() -> list[dict]:
    """Return ``id``/``title`` of up to 200 non-archived tasks, newest first."""
    query = """
        SELECT id, title FROM tasks
        WHERE status != 'Archiv'
        ORDER BY updated_at DESC LIMIT 200
        """
    with get_db() as connection:
        records = connection.execute(query).fetchall()
        return [dict(record) for record in records]
|
||
|
||
|
||
def get_document(doc_id: int, include_deleted: bool = False) -> Optional[dict]:
    """Load one document with creator/updater usernames and task title.

    Args:
        doc_id: Primary key of the document.
        include_deleted: When ``False`` (default), rows with a ``deleted_at``
            value are not returned.

    Returns:
        The row as a dict, or ``None`` when nothing matches.
    """
    base_query = """
        SELECT d.*, c.username AS creator_username, u.username AS updater_username,
               t.title AS task_title
        FROM documents d
        JOIN users c ON c.id = d.created_by
        LEFT JOIN users u ON u.id = d.updated_by
        LEFT JOIN tasks t ON t.id = d.task_id
        WHERE d.id = ?
        """
    deleted_filter = "" if include_deleted else " AND d.deleted_at IS NULL"
    with get_db() as connection:
        record = connection.execute(base_query + deleted_filter, (doc_id,)).fetchone()
        if record:
            return dict(record)
        return None
|
||
|
||
|
||
async def save_uploaded_file(
    upload: UploadFile,
    user_id: int,
    *,
    category: str = "",
    description: str = "",
    tags: str = "",
    task_id: Optional[int] = None,
) -> tuple[bool, str, Optional[dict[str, Any]]]:
    """Validate an upload, store it under ``UPLOAD_DIR`` and record it.

    The upload is streamed to disk in chunks and aborted as soon as it
    exceeds ``MAX_UPLOAD_BYTES``, so an oversized request is never held in
    memory as a whole. The stored file is removed again whenever the upload
    is rejected or the database work fails, so no orphan file stays behind.

    Args:
        upload: Incoming file.
        user_id: Id stored as ``uploaded_by``.
        category: Optional category; empty is stored as NULL.
        description: Optional description; blank is stored as NULL.
        tags: Comma/semicolon separated tag names.
        task_id: Optional task to attach the file to.

    Returns:
        ``(ok, code, file_row)``. ``code`` is ``"ok"`` on success, otherwise
        one of ``"blocked"``, ``"type"``, ``"size"``, ``"invalid"`` or
        ``"task_not_found"``; ``file_row`` is the stored row (with ``tags``)
        on success and ``None`` on failure.
    """
    orig = safe_filename(upload.filename or "file")
    ext_err = upload_extension_error(orig)
    if ext_err is not None:
        return False, ext_err, None

    # The on-disk name is random; only the validated suffix is kept.
    stored = f"{uuid.uuid4().hex}{Path(orig).suffix.lower()}"
    if ".." in stored or "/" in stored or "\\" in stored:
        return False, "invalid", None
    dest = UPLOAD_DIR / stored

    chunk_size = 1024 * 1024
    size = 0
    try:
        with dest.open("wb") as out:
            while True:
                chunk = await upload.read(chunk_size)
                if not chunk:
                    break
                size += len(chunk)
                if size > MAX_UPLOAD_BYTES:
                    break  # stop reading; the partial file is removed below
                out.write(chunk)
    except BaseException:
        # Also covers task cancellation: never leave a partial file behind.
        dest.unlink(missing_ok=True)
        raise
    if size > MAX_UPLOAD_BYTES:
        dest.unlink(missing_ok=True)
        return False, "size", None

    ts = now_iso()
    try:
        with get_db() as conn:
            if task_id is not None:
                task = conn.execute("SELECT id FROM tasks WHERE id = ?", (task_id,)).fetchone()
                if not task:
                    dest.unlink(missing_ok=True)
                    return False, "task_not_found", None
            cur = conn.execute(
                """
                INSERT INTO files (original_filename, stored_filename, content_type, size_bytes,
                                   category, description, task_id, uploaded_by, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    orig,
                    stored,
                    upload.content_type or "application/octet-stream",
                    size,
                    category or None,
                    description.strip() or None,
                    task_id,
                    user_id,
                    ts,
                ),
            )
            file_id = int(cur.lastrowid)
            set_item_tags(conn, "file", file_id, tags)
            if task_id is not None:
                # Attaching a file counts as activity on the task.
                conn.execute("UPDATE tasks SET updated_at = ? WHERE id = ?", (ts, task_id))
            row = conn.execute(
                """
                SELECT f.*, u.username AS uploader
                FROM files f JOIN users u ON u.id = f.uploaded_by
                WHERE f.id = ?
                """,
                (file_id,),
            ).fetchone()
            file_data = dict(row) if row else None
            if file_data:
                # Same connection: the tags written above are visible here.
                file_data["tags"] = get_item_tags(conn, "file", file_id)
    except Exception:
        # The transaction was rolled back; drop the file that belongs to it.
        dest.unlink(missing_ok=True)
        raise
    return True, "ok", file_data
|
||
|
||
|
||
def get_or_create_tag(conn: sqlite3.Connection, name: str) -> int:
    """Return the id of the tag called ``name``, inserting it if needed.

    Args:
        conn: Open connection whose rows support access by column name.
        name: Tag name; surrounding whitespace is ignored.

    Raises:
        ValueError: when ``name`` is empty after stripping.
    """
    cleaned = name.strip()
    if not cleaned:
        raise ValueError("Leerer Tag")
    existing = conn.execute("SELECT id FROM tags WHERE name = ?", (cleaned,)).fetchone()
    if existing is None:
        inserted = conn.execute("INSERT INTO tags (name) VALUES (?)", (cleaned,))
        return int(inserted.lastrowid)
    return int(existing["id"])
|
||
|
||
|
||
def set_item_tags(conn: sqlite3.Connection, item_type: str, item_id: int, tag_names: str) -> None:
    """Replace all tags of one item with the names listed in ``tag_names``.

    Existing links for ``(item_type, item_id)`` are deleted first. The names
    are split on ``,`` and ``;``, blank entries are skipped, and missing tags
    are created through ``get_or_create_tag``.
    """
    conn.execute(
        "DELETE FROM item_tags WHERE item_type = ? AND item_id = ?",
        (item_type, item_id),
    )
    candidates = (piece.strip() for piece in re.split(r"[,;]", tag_names or ""))
    for tag in candidates:
        if tag:
            conn.execute(
                "INSERT OR IGNORE INTO item_tags (tag_id, item_type, item_id) VALUES (?, ?, ?)",
                (get_or_create_tag(conn, tag), item_type, item_id),
            )
|
||
|
||
|
||
def get_item_tags(conn: sqlite3.Connection, item_type: str, item_id: int) -> list[str]:
    """Return the tag names linked to one item, sorted by name."""
    query = """
        SELECT t.name FROM tags t
        JOIN item_tags it ON it.tag_id = t.id
        WHERE it.item_type = ? AND it.item_id = ?
        ORDER BY t.name
        """
    cursor = conn.execute(query, (item_type, item_id))
    return [record["name"] for record in cursor]
|
||
|
||
|
||
def all_users(active_only: bool = True) -> list[dict]:
    """Return all non-deleted users ordered by username.

    Args:
        active_only: When ``True`` (default), only rows with
            ``is_active = 1`` are returned.
    """
    query = "SELECT id, username, display_name, role, is_active, created_at FROM users WHERE deleted_at IS NULL"
    if active_only:
        query += " AND is_active = 1"
    query += " ORDER BY username"
    with get_db() as connection:
        records = connection.execute(query).fetchall()
        return [dict(record) for record in records]
|
||
|
||
|
||
def get_user_profile(user_id: int) -> Optional[dict[str, Any]]:
    """Return the profile fields of one non-deleted user, or ``None``."""
    profile_sql = (
        "SELECT id, username, display_name, role, is_active, created_at, updated_at FROM users WHERE id = ? AND deleted_at IS NULL"
    )
    with get_db() as connection:
        record = connection.execute(profile_sql, (user_id,)).fetchone()
        if record:
            return dict(record)
        return None
|
||
|
||
|
||
@app.on_event("startup")
def on_startup() -> None:
    """Prepare the database on application start.

    Runs schema creation, then migrations, then the optional admin bootstrap
    from the environment - in that order.
    """
    for step in (init_db, migrate_db, create_admin_from_env):
        step()
|
||
|
||
|
||
@app.get("/health")
def health() -> dict:
    """Return a static status payload for health checks."""
    payload = {"status": "ok", "service": "aza-intern"}
    return payload
|
||
|
||
|
||
@app.get("/login", response_class=HTMLResponse)
def login_page(request: Request):
    """Render the login form.

    Logged-in users are redirected to ``/``; when no user exists yet the
    request is redirected to ``/setup``.
    """
    if get_current_user(request):
        return RedirectResponse("/", status_code=303)
    if user_count() == 0:
        return RedirectResponse("/setup", status_code=303)
    error_flag = request.query_params.get("error")
    context = template_ctx(request, error=error_flag)
    return templates.TemplateResponse("login.html", context)
|
||
|
||
|
||
@app.post("/login")
def login_submit(
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
    csrf_token: str = Form(...),
):
    """Check the submitted credentials and start a session.

    On success the user id is stored in the session and the client is
    redirected to ``/``. An unknown, deleted or inactive user, or a wrong
    password, redirects to ``/login?error=1``.
    """
    verify_csrf(request, csrf_token)
    lookup_sql = (
        "SELECT id, password_hash, is_active FROM users WHERE username = ? AND deleted_at IS NULL"
    )
    with get_db() as connection:
        account = connection.execute(lookup_sql, (username.strip(),)).fetchone()

    credentials_ok = (
        bool(account)
        and bool(account["is_active"])
        and verify_password(password, account["password_hash"])
    )
    if not credentials_ok:
        return RedirectResponse("/login?error=1", status_code=303)
    request.session["user_id"] = account["id"]
    return RedirectResponse("/", status_code=303)
|
||
|
||
|
||
@app.get("/setup", response_class=HTMLResponse)
def setup_page(request: Request):
    """Render the first-run setup form; redirect to ``/login`` once a user exists."""
    already_configured = user_count() > 0
    if already_configured:
        return RedirectResponse("/login", status_code=303)
    context = template_ctx(request)
    return templates.TemplateResponse("setup.html", context)
|
||
|
||
|
||
@app.post("/setup")
def setup_submit(
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
    password_confirm: str = Form(...),
    csrf_token: str = Form(...),
):
    """Create the first admin account and log it in.

    Validation failures re-render ``setup.html`` with an error message. On
    success the id of the inserted row is stored in the session; it is taken
    from the insert itself instead of being assumed to be ``1``, which is
    wrong whenever the AUTOINCREMENT sequence has been used before.

    Raises:
        HTTPException: 403 for an invalid CSRF token or when a user already
            exists.
    """
    verify_csrf(request, csrf_token)
    if user_count() > 0:
        raise HTTPException(status_code=403, detail="Setup bereits abgeschlossen")

    username = username.strip()
    error = None
    if len(username) < 3:
        error = "Benutzername mindestens 3 Zeichen."
    elif len(password) < 8:
        error = "Passwort mindestens 8 Zeichen."
    elif password != password_confirm:
        error = "Passwörter stimmen nicht überein."
    if error is not None:
        return templates.TemplateResponse("setup.html", template_ctx(request, error=error))

    ts = now_iso()
    with get_db() as conn:
        cur = conn.execute(
            "INSERT INTO users (username, password_hash, role, is_active, created_at, updated_at) VALUES (?, ?, 'admin', 1, ?, ?)",
            (username, hash_password(password), ts, ts),
        )
        new_user_id = int(cur.lastrowid)
    request.session["user_id"] = new_user_id
    return RedirectResponse("/", status_code=303)
|
||
|
||
|
||
@app.post("/logout")
def logout(request: Request, csrf_token: str = Form(...)):
    """Clear the whole session after a CSRF check and redirect to ``/login``."""
    verify_csrf(request, csrf_token)
    session = request.session
    session.clear()
    return RedirectResponse("/login", status_code=303)
|
||
|
||
|
||
@app.get("/", response_class=HTMLResponse)
def dashboard(request: Request):
    """Render the dashboard for the logged-in user.

    Shows up to 10 open tasks (by priority, then most recently updated), up
    to 5 tasks in status 'Warten auf André', the 5 newest files and the 5
    most recently updated notes. Anonymous requests are redirected to
    ``/login``.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse("/login", status_code=303)

    open_tasks_sql = """
        SELECT t.*, u.username AS assigned_username
        FROM tasks t
        LEFT JOIN users u ON u.id = t.assigned_to
        WHERE t.status NOT IN ('Erledigt', 'Archiv')
        ORDER BY CASE t.priority WHEN 'hoch' THEN 0 WHEN 'normal' THEN 1 ELSE 2 END, t.updated_at DESC
        LIMIT 10
        """
    waiting_tasks_sql = """
        SELECT t.*, u.username AS assigned_username
        FROM tasks t
        LEFT JOIN users u ON u.id = t.assigned_to
        WHERE t.status = 'Warten auf André'
        ORDER BY t.updated_at DESC LIMIT 5
        """
    recent_files_sql = """
        SELECT f.*, u.username AS uploader
        FROM files f JOIN users u ON u.id = f.uploaded_by
        ORDER BY f.created_at DESC LIMIT 5
        """
    recent_notes_sql = """
        SELECT n.*, u.username AS author
        FROM notes n JOIN users u ON u.id = n.created_by
        ORDER BY n.updated_at DESC LIMIT 5
        """
    with get_db() as connection:
        open_rows = connection.execute(open_tasks_sql).fetchall()
        waiting_rows = connection.execute(waiting_tasks_sql).fetchall()
        file_rows = connection.execute(recent_files_sql).fetchall()
        note_rows = connection.execute(recent_notes_sql).fetchall()

    context = template_ctx(
        request,
        user,
        open_tasks=[dict(row) for row in open_rows],
        waiting_tasks=[dict(row) for row in waiting_rows],
        recent_files=[dict(row) for row in file_rows],
        recent_notes=[dict(row) for row in note_rows],
    )
    return templates.TemplateResponse("dashboard.html", context)
|
||
|
||
|
||
@app.get("/tasks", response_class=HTMLResponse)
def tasks_list(request: Request, status: str = "", category: str = ""):
    """List tasks, optionally filtered by status and/or category.

    Each task dict gets a ``tags`` list. Anonymous requests are redirected
    to ``/login``.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse("/login", status_code=303)

    query = """
        SELECT t.*, u.username AS assigned_username, c.username AS creator_username
        FROM tasks t
        LEFT JOIN users u ON u.id = t.assigned_to
        LEFT JOIN users c ON c.id = t.created_by
        WHERE 1=1
        """
    params: list[Any] = []
    # Only non-empty filter values add a condition.
    optional_filters = (
        (" AND t.status = ?", status),
        (" AND t.category = ?", category),
    )
    for clause, value in optional_filters:
        if value:
            query += clause
            params.append(value)
    query += " ORDER BY t.updated_at DESC"

    with get_db() as connection:
        tasks = [dict(row) for row in connection.execute(query, params).fetchall()]
        for task in tasks:
            task["tags"] = get_item_tags(connection, "task", task["id"])

    context = template_ctx(
        request, user, tasks=tasks, filter_status=status, filter_category=category
    )
    return templates.TemplateResponse("tasks_list.html", context)
|
||
|
||
|
||
@app.get("/tasks/new", response_class=HTMLResponse)
def task_new_page(request: Request):
    """Render the empty task form; anonymous requests go to ``/login``."""
    user = get_current_user(request)
    if not user:
        return RedirectResponse("/login", status_code=303)
    context = template_ctx(request, user, task=None, users=all_users())
    return templates.TemplateResponse("task_form.html", context)
|
||
|
||
|
||
@app.post("/tasks/new")
def task_create(
    request: Request,
    title: str = Form(...),
    description: str = Form(""),
    status: str = Form("Neu"),
    priority: str = Form("normal"),
    category: str = Form(""),
    assigned_to: str = Form(""),
    due_date: str = Form(""),
    tags: str = Form(""),
    csrf_token: str = Form(...),
):
    """Create a task from the submitted form and redirect to its page.

    Unknown status/priority values fall back to ``"Neu"``/``"normal"``.
    ``assigned_to`` is used only when it is a decimal number; anything else
    leaves the task unassigned.
    """
    user = require_login(request)
    verify_csrf(request, csrf_token)
    if status not in TASK_STATUSES:
        status = "Neu"
    if priority not in TASK_PRIORITIES:
        priority = "normal"
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as "²" that int() rejects, which made the request fail with an
    # unhandled ValueError.
    assign_id = int(assigned_to) if assigned_to.isdecimal() else None
    ts = now_iso()
    with get_db() as conn:
        cur = conn.execute(
            """
            INSERT INTO tasks (title, description, status, priority, category, assigned_to, due_date, created_by, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (title.strip(), description.strip(), status, priority, category or None,
             assign_id, due_date or None, user["id"], ts, ts),
        )
        task_id = int(cur.lastrowid)
        set_item_tags(conn, "task", task_id, tags)
    return RedirectResponse(f"/tasks/{task_id}", status_code=303)
|
||
|
||
|
||
@app.get("/tasks/{task_id}", response_class=HTMLResponse)
def task_detail(request: Request, task_id: int):
    """Render one task with its comments, tags, files and documents.

    Anonymous requests are redirected to ``/login``.

    Raises:
        HTTPException: 404 when the task does not exist.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse("/login", status_code=303)

    task_sql = """
        SELECT t.*, u.username AS assigned_username, c.username AS creator_username
        FROM tasks t
        LEFT JOIN users u ON u.id = t.assigned_to
        LEFT JOIN users c ON c.id = t.created_by
        WHERE t.id = ?
        """
    comments_sql = """
        SELECT tc.*, u.username FROM task_comments tc
        JOIN users u ON u.id = tc.user_id
        WHERE tc.task_id = ? ORDER BY tc.created_at ASC
        """
    files_sql = """
        SELECT f.*, u.username AS uploader
        FROM files f JOIN users u ON u.id = f.uploaded_by
        WHERE f.task_id = ?
        ORDER BY f.created_at DESC
        """
    documents_sql = """
        SELECT d.id, d.title, d.category, d.updated_at, c.username AS creator_username
        FROM documents d
        JOIN users c ON c.id = d.created_by
        WHERE d.task_id = ? AND d.deleted_at IS NULL
        ORDER BY d.updated_at DESC
        """
    key = (task_id,)
    with get_db() as connection:
        task_row = connection.execute(task_sql, key).fetchone()
        if not task_row:
            raise HTTPException(status_code=404)
        comment_rows = connection.execute(comments_sql, key).fetchall()
        tags = get_item_tags(connection, "task", task_id)
        attached_files = [dict(row) for row in connection.execute(files_sql, key).fetchall()]
        for attached in attached_files:
            attached["tags"] = get_item_tags(connection, "file", attached["id"])
        document_rows = connection.execute(documents_sql, key).fetchall()

    context = template_ctx(
        request,
        user,
        task=dict(task_row),
        comments=[dict(row) for row in comment_rows],
        tags=tags,
        users=all_users(),
        task_files=attached_files,
        task_documents=[dict(row) for row in document_rows],
    )
    return templates.TemplateResponse("task_detail.html", context)
|
||
|
||
|
||
@app.get("/tasks/{task_id}/edit", response_class=HTMLResponse)
def task_edit_page(request: Request, task_id: int):
    """Render the task form pre-filled with an existing task.

    The task dict gets a ``tags_str`` entry with the tags joined by ", ".
    Anonymous requests are redirected to ``/login``.

    Raises:
        HTTPException: 404 when the task does not exist.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse("/login", status_code=303)
    with get_db() as connection:
        record = connection.execute("SELECT * FROM tasks WHERE id = ?", (task_id,)).fetchone()
        if not record:
            raise HTTPException(status_code=404)
        tag_names = get_item_tags(connection, "task", task_id)
    form_task = dict(record)
    form_task["tags_str"] = ", ".join(tag_names)
    context = template_ctx(request, user, task=form_task, users=all_users())
    return templates.TemplateResponse("task_form.html", context)
|
||
|
||
|
||
@app.post("/tasks/{task_id}/edit")
def task_update(
    request: Request,
    task_id: int,
    title: str = Form(...),
    description: str = Form(""),
    status: str = Form("Neu"),
    priority: str = Form("normal"),
    category: str = Form(""),
    assigned_to: str = Form(""),
    due_date: str = Form(""),
    tags: str = Form(""),
    csrf_token: str = Form(...),
):
    """Update an existing task and its tags, then redirect to its page.

    Unknown status/priority values fall back to ``"Neu"``/``"normal"``.
    ``assigned_to`` is used only when it is a decimal number.

    Raises:
        HTTPException: 404 when no task with ``task_id`` exists. Raising
            inside the transaction rolls it back, so no tags are written for
            a task that is not there.
    """
    user = require_login(request)
    verify_csrf(request, csrf_token)
    if status not in TASK_STATUSES:
        status = "Neu"
    if priority not in TASK_PRIORITIES:
        priority = "normal"
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as "²" that int() rejects.
    assign_id = int(assigned_to) if assigned_to.isdecimal() else None
    ts = now_iso()
    with get_db() as conn:
        cur = conn.execute(
            """
            UPDATE tasks SET title=?, description=?, status=?, priority=?, category=?,
                assigned_to=?, due_date=?, updated_at=? WHERE id=?
            """,
            (title.strip(), description.strip(), status, priority, category or None,
             assign_id, due_date or None, ts, task_id),
        )
        if cur.rowcount == 0:
            # Same response as the GET handlers for an unknown task.
            raise HTTPException(status_code=404)
        set_item_tags(conn, "task", task_id, tags)
    return RedirectResponse(f"/tasks/{task_id}", status_code=303)
|
||
|
||
|
||
@app.post("/tasks/{task_id}/comment")
def task_add_comment(
    request: Request,
    task_id: int,
    comment: str = Form(...),
    csrf_token: str = Form(...),
):
    """Add a comment to a task and bump the task's ``updated_at``.

    A blank comment is ignored and the client is simply redirected back.

    Raises:
        HTTPException: 404 when the task does not exist. Without this check
            the insert would hit the foreign-key constraint and surface as
            an unhandled database error.
    """
    user = require_login(request)
    verify_csrf(request, csrf_token)
    text = comment.strip()
    if not text:
        return RedirectResponse(f"/tasks/{task_id}", status_code=303)
    ts = now_iso()
    with get_db() as conn:
        task = conn.execute("SELECT id FROM tasks WHERE id = ?", (task_id,)).fetchone()
        if not task:
            raise HTTPException(status_code=404)
        conn.execute(
            "INSERT INTO task_comments (task_id, user_id, comment, created_at) VALUES (?, ?, ?, ?)",
            (task_id, user["id"], text, ts),
        )
        conn.execute("UPDATE tasks SET updated_at = ? WHERE id = ?", (ts, task_id))
    return RedirectResponse(f"/tasks/{task_id}", status_code=303)
|
||
|
||
|
||
@app.get("/files", response_class=HTMLResponse)
def files_list(request: Request, category: str = ""):
    """List uploaded files, newest first, optionally filtered by category.

    Each file dict gets a ``tags`` list. Anonymous requests are redirected
    to ``/login``.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse("/login", status_code=303)

    query = """
        SELECT f.*, u.username AS uploader
        FROM files f JOIN users u ON u.id = f.uploaded_by
        WHERE 1=1
        """
    params: list[Any] = []
    if category:
        query += " AND f.category = ?"
        params.append(category)
    query += " ORDER BY f.created_at DESC"

    with get_db() as connection:
        files = [dict(row) for row in connection.execute(query, params).fetchall()]
        for entry in files:
            entry["tags"] = get_item_tags(connection, "file", entry["id"])

    context = template_ctx(request, user, files=files, filter_category=category)
    return templates.TemplateResponse("files_list.html", context)
|
||
|
||
|
||
@app.post("/files/upload")
async def file_upload(
    request: Request,
    upload: UploadFile = File(...),
    category: str = Form(""),
    description: str = Form(""),
    tags: str = Form(""),
    task_id: str = Form(""),
    csrf_token: str = Form(...),
):
    """Handle a form upload and redirect with the outcome in the query string.

    The redirect goes to the task page when a (truthy) task id was given,
    otherwise to ``/files``. It carries ``upload=ok`` on success or
    ``error=<code>`` on failure.
    """
    user = require_login(request)
    verify_csrf(request, csrf_token)
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as "²" that int() rejects, which made the request fail with an
    # unhandled ValueError.
    tid = int(task_id) if task_id.isdecimal() else None
    ok, err, _ = await save_uploaded_file(
        upload, user["id"], category=category, description=description, tags=tags, task_id=tid,
    )
    target = f"/tasks/{tid}" if tid else "/files"
    outcome = "upload=ok" if ok else f"error={err}"
    return RedirectResponse(f"{target}?{outcome}", status_code=303)
|
||
|
||
|
||
@app.post("/api/files/upload")
async def api_file_upload(
    request: Request,
    upload: UploadFile = File(...),
    category: str = Form(""),
    description: str = Form(""),
    tags: str = Form(""),
    task_id: str = Form(""),
    csrf_token: str = Form(...),
):
    """Handle an upload and answer with JSON.

    Returns ``{"ok": True, "file": ...}`` on success, otherwise
    ``{"ok": False, "error": <code>, "message": <German text>}``.
    """
    user = require_login(request)
    verify_csrf(request, csrf_token)
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as "²" that int() rejects, which made the request fail with an
    # unhandled ValueError.
    tid = int(task_id) if task_id.isdecimal() else None
    ok, err, file_data = await save_uploaded_file(
        upload, user["id"], category=category, description=description, tags=tags, task_id=tid,
    )
    if not ok:
        messages = {
            "blocked": "Dieser Dateityp ist aus Sicherheitsgründen nicht erlaubt.",
            "type": "Dateityp nicht erlaubt.",
            "size": f"Datei zu gross (max. {MAX_UPLOAD_MB} MB).",
            "task_not_found": "Aufgabe nicht gefunden.",
            "invalid": "Ungültiger Dateiname.",
        }
        return JSONResponse({"ok": False, "error": err, "message": messages.get(err, "Upload fehlgeschlagen.")})
    return JSONResponse({"ok": True, "file": file_data})
|
||
|
||
|
||
@app.get("/files/{file_id}/download")
def file_download(request: Request, file_id: int):
    """Send a stored upload to the browser under its original filename.

    Visitors without a session are redirected to ``/login``. A 404 is raised
    when the database row is missing, when the stored name does not point to
    a regular file, or when it would resolve outside ``UPLOAD_DIR``.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse("/login", status_code=303)

    with get_db() as conn:
        row = conn.execute("SELECT * FROM files WHERE id = ?", (file_id,)).fetchone()
    if not row:
        raise HTTPException(status_code=404)

    stored_name = row["stored_filename"]
    upload_root = UPLOAD_DIR.resolve()
    path = (upload_root / stored_name).resolve()

    # The substring check alone does not stop an absolute stored name:
    # joining an absolute path discards UPLOAD_DIR entirely. Comparing the
    # resolved locations closes that gap.
    inside_upload_dir = upload_root in path.parents
    if ".." in stored_name or not inside_upload_dir or not path.is_file():
        raise HTTPException(status_code=404)

    return FileResponse(
        path,
        filename=row["original_filename"],
        media_type=row["content_type"] or "application/octet-stream",
    )
|
||
|
||
|
||
@app.get("/files/{file_id}/edit", response_class=HTMLResponse)
def file_edit_page(request: Request, file_id: int):
    """Render the edit page for one stored file.

    Redirects to ``/login`` without a session and raises 404 for an unknown
    file id. The template receives the file row (including the uploader's
    username) and the result of ``is_odt_file`` for its original filename.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse("/login", status_code=303)

    lookup_sql = """
        SELECT f.*, u.username AS uploader
        FROM files f JOIN users u ON u.id = f.uploaded_by
        WHERE f.id = ?
        """
    with get_db() as conn:
        record = conn.execute(lookup_sql, (file_id,)).fetchone()
    if not record:
        raise HTTPException(status_code=404)

    file_info = dict(record)
    context = template_ctx(
        request,
        user,
        file=file_info,
        is_odt=is_odt_file(file_info["original_filename"]),
    )
    return templates.TemplateResponse("file_edit.html", context)
|
||
|
||
|
||
@app.get("/notes", response_class=HTMLResponse)
def notes_list(request: Request, category: str = ""):
    """Render the notes overview, most recently updated first.

    An optional ``category`` restricts the listing. Visitors without a
    session are redirected to ``/login``. Every note carries its author's
    username and a ``tags`` entry.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse("/login", status_code=303)

    sql_parts = [
        """
        SELECT n.*, u.username AS author
        FROM notes n JOIN users u ON u.id = n.created_by
        WHERE 1=1
        """
    ]
    bind_values: list[Any] = []
    if category:
        sql_parts.append(" AND n.category = ?")
        bind_values.append(category)
    sql_parts.append(" ORDER BY n.updated_at DESC")
    sql = "".join(sql_parts)

    with get_db() as conn:
        notes = [dict(record) for record in conn.execute(sql, bind_values).fetchall()]
        for entry in notes:
            entry["tags"] = get_item_tags(conn, "note", entry["id"])

    context = template_ctx(request, user, notes=notes, filter_category=category)
    return templates.TemplateResponse("notes_list.html", context)
|
||
|
||
|
||
@app.get("/notes/new", response_class=HTMLResponse)
def note_new_page(request: Request):
    """Render the empty note form (``note`` is ``None`` in the template)."""
    user = get_current_user(request)
    if user:
        context = template_ctx(request, user, note=None)
        return templates.TemplateResponse("note_form.html", context)
    # No session: send the visitor to the login page instead.
    return RedirectResponse("/login", status_code=303)
|
||
|
||
|
||
@app.post("/notes/new")
def note_create(
    request: Request,
    title: str = Form(...),
    body: str = Form(""),
    category: str = Form(""),
    tags: str = Form(""),
    csrf_token: str = Form(...),
):
    """Insert a new note, attach its tags and redirect to the notes list.

    Title and body are stored stripped of surrounding whitespace; an empty
    category is stored as NULL.
    """
    user = require_login(request)
    verify_csrf(request, csrf_token)

    stamp = now_iso()
    # Created and updated timestamps start out identical.
    values = (title.strip(), body.strip(), category or None, user["id"], stamp, stamp)
    with get_db() as conn:
        cursor = conn.execute(
            "INSERT INTO notes (title, body, category, created_by, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)",
            values,
        )
        new_note_id = int(cursor.lastrowid)
        set_item_tags(conn, "note", new_note_id, tags)
    return RedirectResponse("/notes", status_code=303)
|
||
|
||
|
||
@app.get("/notes/{note_id}/edit", response_class=HTMLResponse)
def note_edit_page(request: Request, note_id: int):
    """Render the note form pre-filled with an existing note.

    Redirects to ``/login`` without a session and raises 404 for an unknown
    note id. The note's tags are passed to the template as one
    comma-separated string under ``tags_str``.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse("/login", status_code=303)

    with get_db() as conn:
        record = conn.execute("SELECT * FROM notes WHERE id = ?", (note_id,)).fetchone()
        if not record:
            raise HTTPException(status_code=404)
        tag_names = get_item_tags(conn, "note", note_id)

    form_data = dict(record)
    form_data["tags_str"] = ", ".join(tag_names)
    context = template_ctx(request, user, note=form_data)
    return templates.TemplateResponse("note_form.html", context)
|
||
|
||
|
||
@app.post("/notes/{note_id}/edit")
def note_update(
    request: Request,
    note_id: int,
    title: str = Form(...),
    body: str = Form(""),
    category: str = Form(""),
    tags: str = Form(""),
    csrf_token: str = Form(...),
):
    """Save changes to an existing note and redirect to the notes list.

    Title and body are stored stripped of surrounding whitespace; an empty
    category is stored as NULL.

    Raises:
        HTTPException: 404 when no note with ``note_id`` exists.
    """
    user = require_login(request)
    verify_csrf(request, csrf_token)
    _ = user  # only the login check is needed here

    ts = now_iso()
    with get_db() as conn:
        # Without this check an unknown id updated nothing but still wrote
        # tag links for a note that does not exist.
        existing = conn.execute("SELECT id FROM notes WHERE id = ?", (note_id,)).fetchone()
        if not existing:
            raise HTTPException(status_code=404)
        conn.execute(
            "UPDATE notes SET title=?, body=?, category=?, updated_at=? WHERE id=?",
            (title.strip(), body.strip(), category or None, ts, note_id),
        )
        set_item_tags(conn, "note", note_id, tags)
    return RedirectResponse("/notes", status_code=303)
|
||
|
||
|
||
@app.get("/documents", response_class=HTMLResponse)
def documents_list(request: Request):
    """Render the list of documents that are not soft-deleted.

    Each entry includes the creator's username and, when linked, the title
    of the associated task. The optional ``msg`` query parameter is passed
    to the template as ``message``.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse("/login", status_code=303)

    listing_sql = """
        SELECT d.id, d.title, d.category, d.updated_at, d.created_at,
        c.username AS creator_username, t.title AS task_title, d.task_id
        FROM documents d
        JOIN users c ON c.id = d.created_by
        LEFT JOIN tasks t ON t.id = d.task_id
        WHERE d.deleted_at IS NULL
        ORDER BY d.updated_at DESC
        """
    with get_db() as conn:
        records = conn.execute(listing_sql).fetchall()
        docs = [dict(record) for record in records]

    flash = request.query_params.get("msg")
    context = template_ctx(request, user, documents=docs, message=flash)
    return templates.TemplateResponse("documents_list.html", context)
|
||
|
||
|
||
@app.get("/documents/new", response_class=HTMLResponse)
def document_new_page(request: Request, task_id: str = ""):
    """Render the "new document" form.

    An optional numeric ``task_id`` query parameter is passed to the
    template as ``pre_task_id``; any non-numeric value becomes ``None``.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse("/login", status_code=303)

    # str.isdigit() also accepts characters such as "²" that int() rejects,
    # so a crafted query string caused an unhandled ValueError.
    # str.isdecimal() only accepts characters int() can parse.
    pre_task = int(task_id) if task_id.isdecimal() else None

    return templates.TemplateResponse(
        "document_new.html",
        template_ctx(request, user, tasks=all_tasks_for_select(), pre_task_id=pre_task),
    )
|
||
|
||
|
||
@app.post("/documents/new")
def document_create(
    request: Request,
    title: str = Form(...),
    category: str = Form(""),
    task_id: str = Form(""),
    csrf_token: str = Form(...),
):
    """Create an empty document and redirect to its editor page.

    A blank title falls back to "Unbenanntes Dokument". A ``task_id`` that
    is not numeric or does not match an existing task is dropped, so the
    document is created without a task link.
    """
    user = require_login(request)
    verify_csrf(request, csrf_token)

    title = title.strip() or "Unbenanntes Dokument"
    # str.isdigit() also accepts characters such as "²" that int() rejects,
    # which turned a malformed form value into an unhandled ValueError.
    # str.isdecimal() only accepts characters int() can parse.
    tid = int(task_id) if task_id.isdecimal() else None
    ts = now_iso()

    with get_db() as conn:
        if tid is not None:
            task = conn.execute("SELECT id FROM tasks WHERE id = ?", (tid,)).fetchone()
            if not task:
                tid = None
        cur = conn.execute(
            """
            INSERT INTO documents (title, content_html, category, task_id, created_by, updated_by, created_at, updated_at)
            VALUES (?, '', ?, ?, ?, ?, ?, ?)
            """,
            (title, category or None, tid, user["id"], user["id"], ts, ts),
        )
        doc_id = int(cur.lastrowid)
    return RedirectResponse(f"/documents/{doc_id}", status_code=303)
|
||
|
||
|
||
@app.get("/documents/{document_id}", response_class=HTMLResponse)
def document_edit_page(request: Request, document_id: int):
    """Render the editor for one document.

    Redirects to ``/login`` without a session and raises 404 when
    ``get_document`` returns nothing for the id.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse("/login", status_code=303)

    document = get_document(document_id)
    if not document:
        raise HTTPException(status_code=404)

    context = template_ctx(
        request,
        user,
        document=document,
        tasks=all_tasks_for_select(),
    )
    return templates.TemplateResponse("document_edit.html", context)
|
||
|
||
|
||
@app.post("/api/documents/{document_id}/save")
async def api_document_save(
    request: Request,
    document_id: int,
    title: str = Form(...),
    content_html: str = Form(""),
    category: str = Form(""),
    task_id: str = Form(""),
    csrf_token: str = Form(...),
):
    """Persist editor content for a document and report the result as JSON.

    The submitted HTML is passed through ``sanitize_html`` before storage.
    A blank title falls back to "Unbenanntes Dokument". A ``task_id`` that
    is not numeric or does not match an existing task is stored as NULL.

    Returns:
        ``{"ok": True, "updated_at": ..., "title": ...}`` on success, or a
        404 JSON response with ``error: "not_found"`` when ``get_document``
        finds nothing for the id.
    """
    user = require_login(request)
    verify_csrf(request, csrf_token)

    doc = get_document(document_id)
    if not doc:
        return JSONResponse({"ok": False, "error": "not_found", "message": "Dokument nicht gefunden."}, status_code=404)

    title = title.strip() or "Unbenanntes Dokument"
    # str.isdigit() also accepts characters such as "²" that int() rejects,
    # which turned a malformed form value into an unhandled ValueError.
    # str.isdecimal() only accepts characters int() can parse.
    tid = int(task_id) if task_id.isdecimal() else None
    safe_html = sanitize_html(content_html)
    ts = now_iso()

    # The task lookup and the update share one database session.
    with get_db() as conn:
        if tid is not None:
            task = conn.execute("SELECT id FROM tasks WHERE id = ?", (tid,)).fetchone()
            if not task:
                tid = None
        conn.execute(
            """
            UPDATE documents SET title=?, content_html=?, category=?, task_id=?,
            updated_by=?, updated_at=? WHERE id=? AND deleted_at IS NULL
            """,
            (title, safe_html, category or None, tid, user["id"], ts, document_id),
        )
    return JSONResponse({
        "ok": True,
        "updated_at": ts,
        "title": title,
    })
|
||
|
||
|
||
@app.post("/documents/{document_id}/delete")
def document_delete(
    request: Request,
    document_id: int,
    csrf_token: str = Form(...),
):
    """Soft-delete a document by stamping ``deleted_at``, then redirect.

    Raises 404 when ``get_document`` returns nothing for the id. The row is
    kept; only ``deleted_at``, ``updated_by`` and ``updated_at`` are written.
    """
    user = require_login(request)
    verify_csrf(request, csrf_token)

    if not get_document(document_id):
        raise HTTPException(status_code=404)

    stamp = now_iso()
    soft_delete_sql = "UPDATE documents SET deleted_at = ?, updated_by = ?, updated_at = ? WHERE id = ?"
    with get_db() as conn:
        conn.execute(soft_delete_sql, (stamp, user["id"], stamp, document_id))
    return RedirectResponse("/documents?msg=deleted", status_code=303)
|
||
|
||
|
||
@app.get("/search", response_class=HTMLResponse)
def search_page(request: Request, q: str = ""):
    """Search tasks, files, notes and documents for a substring.

    Each of the four kinds is searched with a LIKE match over its text
    columns, limited to 30 rows. Tasks, files and notes whose tag name
    matches the term are then appended if they are not already listed.
    A blank query renders the page with empty result lists.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse("/login", status_code=303)

    results: dict[str, list] = {"tasks": [], "files": [], "notes": [], "documents": []}
    term = q.strip()
    if term:
        # NOTE(review): "%" and "_" inside the term act as LIKE wildcards;
        # that behaviour is unchanged here.
        like = f"%{term}%"
        with get_db() as conn:
            results["tasks"] = [
                dict(r) for r in conn.execute(
                    """
                    SELECT id, title, status, category, updated_at FROM tasks
                    WHERE title LIKE ? OR description LIKE ? OR category LIKE ?
                    ORDER BY updated_at DESC LIMIT 30
                    """,
                    (like, like, like),
                ).fetchall()
            ]
            results["files"] = [
                dict(r) for r in conn.execute(
                    """
                    SELECT id, original_filename, category, description, created_at FROM files
                    WHERE original_filename LIKE ? OR description LIKE ? OR category LIKE ?
                    ORDER BY created_at DESC LIMIT 30
                    """,
                    (like, like, like),
                ).fetchall()
            ]
            results["notes"] = [
                dict(r) for r in conn.execute(
                    """
                    SELECT id, title, category, updated_at FROM notes
                    WHERE title LIKE ? OR body LIKE ? OR category LIKE ?
                    ORDER BY updated_at DESC LIMIT 30
                    """,
                    (like, like, like),
                ).fetchall()
            ]
            # content_html is only needed for matching, so it is no longer
            # fetched just to be thrown away.
            results["documents"] = [
                dict(r) for r in conn.execute(
                    """
                    SELECT id, title, category, updated_at FROM documents
                    WHERE deleted_at IS NULL
                    AND (title LIKE ? OR category LIKE ? OR content_html LIKE ?)
                    ORDER BY updated_at DESC LIMIT 30
                    """,
                    (like, like, like),
                ).fetchall()
            ]

            tag_rows = conn.execute(
                "SELECT item_type, item_id FROM item_tags it JOIN tags t ON t.id = it.tag_id WHERE t.name LIKE ?",
                (like,),
            ).fetchall()

            # item_type -> (result bucket, single-row lookup query)
            tag_lookups = {
                "task": ("tasks", "SELECT id, title, status, category, updated_at FROM tasks WHERE id=?"),
                "file": ("files", "SELECT id, original_filename, category, description, created_at FROM files WHERE id=?"),
                "note": ("notes", "SELECT id, title, category, updated_at FROM notes WHERE id=?"),
            }
            # Ids already listed (or already looked up) per bucket, so a
            # duplicate is detected without rescanning the result list.
            seen_ids = {
                bucket: {item["id"] for item in results[bucket]}
                for bucket, _ in tag_lookups.values()
            }
            for tr in tag_rows:
                lookup = tag_lookups.get(tr["item_type"])
                if lookup is None:
                    continue
                bucket, lookup_sql = lookup
                iid = tr["item_id"]
                if iid in seen_ids[bucket]:
                    continue
                seen_ids[bucket].add(iid)
                row = conn.execute(lookup_sql, (iid,)).fetchone()
                if row:
                    results[bucket].append(dict(row))

    return templates.TemplateResponse(
        "search.html",
        template_ctx(request, user, query=term, results=results),
    )
|
||
|
||
|
||
@app.get("/admin", response_class=HTMLResponse)
def admin_page(request: Request):
    """Render the user administration page.

    Redirects to ``/login`` without a session and raises 403 for any user
    whose role is not ``admin``. All users are listed ordered by username;
    the optional ``msg`` query parameter is passed through as ``message``.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse("/login", status_code=303)
    if user["role"] != "admin":
        raise HTTPException(status_code=403, detail="Nur für Administratoren")

    listing_sql = """
        SELECT id, username, display_name, role, is_active, deleted_at, created_at, updated_at
        FROM users ORDER BY username
        """
    with get_db() as conn:
        accounts = [dict(record) for record in conn.execute(listing_sql).fetchall()]

    flash = request.query_params.get("msg")
    context = template_ctx(request, user, users=accounts, message=flash)
    return templates.TemplateResponse("admin.html", context)
|
||
|
||
|
||
@app.get("/profile", response_class=HTMLResponse)
def profile_page(request: Request):
    """Render the profile page of the logged-in user.

    Redirects to ``/login`` when there is no session or when
    ``get_user_profile`` returns nothing for the user. The optional ``msg``
    query parameter is passed to the template as ``message``.
    """
    user = get_current_user(request)
    profile = get_user_profile(user["id"]) if user else None
    if not user or not profile:
        return RedirectResponse("/login", status_code=303)

    flash = request.query_params.get("msg")
    context = template_ctx(request, user, profile=profile, message=flash)
    return templates.TemplateResponse("profile.html", context)
|
||
|
||
|
||
@app.post("/profile/update")
def profile_update(
    request: Request,
    display_name: str = Form(""),
    csrf_token: str = Form(...),
):
    """Store the logged-in user's display name and redirect to the profile.

    A blank (or whitespace-only) name is stored as NULL.
    """
    user = require_login(request)
    verify_csrf(request, csrf_token)

    cleaned_name = display_name.strip()
    values = (cleaned_name if cleaned_name else None, now_iso(), user["id"])
    with get_db() as conn:
        conn.execute("UPDATE users SET display_name = ?, updated_at = ? WHERE id = ?", values)
    return RedirectResponse("/profile?msg=profile_saved", status_code=303)
|
||
|
||
|
||
@app.post("/profile/password")
def profile_password(
    request: Request,
    current_password: str = Form(...),
    new_password: str = Form(...),
    new_password_confirm: str = Form(...),
    csrf_token: str = Form(...),
):
    """Change the logged-in user's password.

    The outcome is reported through the ``msg`` query parameter of the
    redirect: ``password_mismatch``, ``password_short`` (fewer than 8
    characters), ``password_wrong`` (current password not verified) or
    ``password_changed``.
    """
    user = require_login(request)
    verify_csrf(request, csrf_token)

    # Cheap form checks first; the database is only touched afterwards.
    rejection = None
    if new_password != new_password_confirm:
        rejection = "/profile?msg=password_mismatch"
    elif len(new_password) < 8:
        rejection = "/profile?msg=password_short"
    if rejection is not None:
        return RedirectResponse(rejection, status_code=303)

    account_id = user["id"]
    with get_db() as conn:
        stored = conn.execute("SELECT password_hash FROM users WHERE id = ?", (account_id,)).fetchone()
        current_ok = bool(stored) and verify_password(current_password, stored["password_hash"])
        if not current_ok:
            return RedirectResponse("/profile?msg=password_wrong", status_code=303)
        conn.execute(
            "UPDATE users SET password_hash = ?, updated_at = ? WHERE id = ?",
            (hash_password(new_password), now_iso(), account_id),
        )
    return RedirectResponse("/profile?msg=password_changed", status_code=303)
|
||
|
||
|
||
@app.post("/admin/users/new")
def admin_create_user(
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
    display_name: str = Form(""),
    role: str = Form("assistant"),
    csrf_token: str = Form(...),
):
    """Create a new active user account (admin only).

    Redirects back to ``/admin`` with ``msg=invalid`` when the username has
    fewer than 3 or the password fewer than 8 characters, ``msg=exists``
    when the insert raises an integrity error, and ``msg=created``
    otherwise. A role outside ``ROLES`` is replaced by ``assistant``.
    """
    require_admin(request)  # the returned admin record is not needed
    verify_csrf(request, csrf_token)

    username = username.strip()
    too_short = len(username) < 3 or len(password) < 8
    if too_short:
        return RedirectResponse("/admin?msg=invalid", status_code=303)

    effective_role = role if role in ROLES else "assistant"
    stamp = now_iso()
    insert_sql = """
        INSERT INTO users (username, password_hash, display_name, role, is_active, created_at, updated_at)
        VALUES (?, ?, ?, ?, 1, ?, ?)
        """
    try:
        with get_db() as conn:
            values = (
                username,
                hash_password(password),
                display_name.strip() or None,
                effective_role,
                stamp,
                stamp,
            )
            conn.execute(insert_sql, values)
    except sqlite3.IntegrityError:
        return RedirectResponse("/admin?msg=exists", status_code=303)
    return RedirectResponse("/admin?msg=created", status_code=303)
|
||
|
||
|
||
@app.post("/admin/users/{user_id}/edit")
def admin_edit_user(
    request: Request,
    user_id: int,
    display_name: str = Form(""),
    role: str = Form("assistant"),
    password: str = Form(""),
    csrf_token: str = Form(...),
):
    """Update display name, role and optionally the password of a user.

    Admin only. A role outside ``ROLES`` is replaced by ``assistant``.
    An admin cannot change their own role away from ``admin``
    (``msg=self_role``), and a non-empty password shorter than 8 characters
    is rejected (``msg=invalid``). Raises 404 when the user does not exist
    or is soft-deleted. The password is left untouched when the field is
    empty.
    """
    admin = require_admin(request)
    verify_csrf(request, csrf_token)

    if role not in ROLES:
        role = "assistant"

    demotes_self = user_id == admin["id"] and role != "admin"
    if demotes_self:
        return RedirectResponse("/admin?msg=self_role", status_code=303)
    if password and len(password) < 8:
        return RedirectResponse("/admin?msg=invalid", status_code=303)

    stamp = now_iso()
    with get_db() as conn:
        target = conn.execute("SELECT id FROM users WHERE id = ? AND deleted_at IS NULL", (user_id,)).fetchone()
        if not target:
            raise HTTPException(status_code=404)

        shown_name = display_name.strip() or None
        if password:
            statement = "UPDATE users SET display_name=?, role=?, password_hash=?, updated_at=? WHERE id=?"
            values = (shown_name, role, hash_password(password), stamp, user_id)
        else:
            statement = "UPDATE users SET display_name=?, role=?, updated_at=? WHERE id=?"
            values = (shown_name, role, stamp, user_id)
        conn.execute(statement, values)
    return RedirectResponse("/admin?msg=updated", status_code=303)
|
||
|
||
|
||
@app.post("/admin/users/{user_id}/toggle")
def admin_toggle_user(
    request: Request,
    user_id: int,
    csrf_token: str = Form(...),
):
    """Flip the ``is_active`` flag of a user (admin only).

    An admin cannot toggle their own account (``msg=self``). Raises 404
    when the user does not exist or is soft-deleted.
    """
    admin = require_admin(request)
    verify_csrf(request, csrf_token)

    acting_on_self = user_id == admin["id"]
    if acting_on_self:
        return RedirectResponse("/admin?msg=self", status_code=303)

    stamp = now_iso()
    with get_db() as conn:
        current = conn.execute(
            "SELECT is_active FROM users WHERE id = ? AND deleted_at IS NULL", (user_id,),
        ).fetchone()
        if not current:
            raise HTTPException(status_code=404)
        # Active becomes 0, inactive becomes 1.
        flipped = int(not current["is_active"])
        conn.execute(
            "UPDATE users SET is_active = ?, updated_at = ? WHERE id = ?",
            (flipped, stamp, user_id),
        )
    return RedirectResponse("/admin?msg=toggled", status_code=303)
|
||
|
||
|
||
@app.post("/admin/users/{user_id}/delete")
def admin_delete_user(
    request: Request,
    user_id: int,
    csrf_token: str = Form(...),
):
    """Soft-delete a user: deactivate the account and stamp ``deleted_at``.

    Admin only. An admin cannot delete their own account (``msg=self``).
    Raises 404 when the user does not exist or is already soft-deleted.
    """
    admin = require_admin(request)
    verify_csrf(request, csrf_token)

    if admin["id"] == user_id:
        return RedirectResponse("/admin?msg=self", status_code=303)

    stamp = now_iso()
    find_sql = "SELECT id FROM users WHERE id = ? AND deleted_at IS NULL"
    retire_sql = "UPDATE users SET is_active = 0, deleted_at = ?, updated_at = ? WHERE id = ?"
    with get_db() as conn:
        target = conn.execute(find_sql, (user_id,)).fetchone()
        if not target:
            raise HTTPException(status_code=404)
        conn.execute(retire_sql, (stamp, stamp, user_id))
    return RedirectResponse("/admin?msg=deleted", status_code=303)
|
||
|
||
|
||
if __name__ == "__main__":
    # Direct script start: serve "app:app" with uvicorn on 127.0.0.1:8088
    # with reload switched on.
    from uvicorn import run as serve

    serve("app:app", host="127.0.0.1", port=8088, reload=True)
|