Add startup-check to generate an admin sign-up code if no admins are present

Closes #10
This commit is contained in:
digimint 2025-11-18 23:44:25 -06:00
parent 9508a8b132
commit 9707dbe45e
Signed by: digimint
GPG key ID: 8DF1C6FD85ABF748
6 changed files with 135 additions and 5 deletions

View file

@ -1,4 +1,5 @@
from datetime import datetime from datetime import datetime
import logging
import humanize import humanize
from typing import Any from typing import Any
from flask import Flask, render_template from flask import Flask, render_template
@ -13,15 +14,25 @@ from taskflower.web import web_base
from taskflower.tools.hibp import hibp_bp from taskflower.tools.hibp import hibp_bp
log = logging.getLogger(__name__)
log.info('Initializing Taskflower...')
app = Flask(__name__) app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = config.db_url app.config['SQLALCHEMY_DATABASE_URI'] = config.db_url
app.config['SECRET_KEY'] = config.app_secret app.config['SECRET_KEY'] = config.app_secret
log.info(' > Initializing login manager...')
taskflower_login_manager.init_app(app) # pyright:ignore[reportUnknownMemberType] taskflower_login_manager.init_app(app) # pyright:ignore[reportUnknownMemberType]
log.info(' > Initializing database...')
db.init_app(app) db.init_app(app)
log.info(' > Building routes...')
app.register_blueprint(web_base) app.register_blueprint(web_base)
app.register_blueprint(hibp_bp) app.register_blueprint(hibp_bp)

View file

@ -1,6 +1,20 @@
from taskflower import app, db import logging
from taskflower import app
from taskflower.auth.startup import startup_checks
from taskflower.db import db
from taskflower.types.either import Left
log = logging.getLogger(__name__)
if __name__ == '__main__': if __name__ == '__main__':
with app.app_context(): with app.app_context():
db.create_all() db.create_all()
log.info('Running startup checks...')
res = startup_checks(db)
if isinstance(res, Left):
log.error(f'Startup checks failed: {res.val}')
raise res.val
else:
log.error('Startup checks succeeded!')
app.run(debug=True) app.run(debug=True)

View file

@ -0,0 +1,80 @@
from datetime import datetime, timedelta
import logging
from secrets import token_hex
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import MultipleResultsFound
from taskflower.config import SignUpMode, config
from taskflower.db.model.codes import SignUpCode
from taskflower.db.model.user import User
from taskflower.types.either import Either, Left, Right
log = logging.getLogger(__name__)
def _ensure_at_least_one_admin(db: SQLAlchemy) -> Either[Exception, None]:
def _extant_admin_code() -> SignUpCode|None:
try:
return db.session.query(
SignUpCode
).filter(
SignUpCode.expires > datetime.now()
).filter(
SignUpCode.grants_admin
).filter(
SignUpCode.created_by == -1
).one_or_none()
except Exception as e:
log.error(f'Error while querying the database for admin sign-up codes: {e}!')
if isinstance(e, MultipleResultsFound):
log.warning('Multiple admin sign-up keys found. Purging...')
delete_count = db.session.query(
SignUpCode
).filter(
SignUpCode.expires > datetime.now()
).filter(
SignUpCode.grants_admin
).filter(
SignUpCode.created_by == -1
).delete()
log.warning(f'Deleted {delete_count} admin sign-up keys.')
return None
try:
num_admins = db.session.query(
User
).filter(
User.administrator
).filter(
User.enabled
).count()
except Exception:
num_admins = 0
if num_admins > 0:
return Right(None)
else:
if config.sign_up_mode == SignUpMode.OPEN:
return Left(AssertionError(
'There are no admins and ``SignUpMode`` is OPEN. Please set ``SignUpMode`` to either ``USERS_CAN_INVITE`` or ``ADMINS_CAN_INVITE``, so that an administrator sign-up code can be generated.'
))
elif (extant_code:=_extant_admin_code()) is not None:
log.warning(f'There are no active administrators registered. A one-time administrator sign-up key has been generated. It will expire soon. Use this to sign up for an account: {extant_code.code}')
return Right(None)
else:
code = SignUpCode(
code=token_hex(16), # pyright:ignore[reportCallIssue]
created=datetime.now(), # pyright:ignore[reportCallIssue]
expires=datetime.now() + timedelta(hours=1), # pyright:ignore[reportCallIssue]
created_by=-1, # pyright:ignore[reportCallIssue]
grants_admin=True # pyright:ignore[reportCallIssue]
)
db.session.add(code)
db.session.commit()
log.warning(f'There are no active administrators registered. A one-time administrator sign-up key has been generated. It will expire in one hour. Use this to sign up for an account: {code.code}')
return Right(None)
def startup_checks(db: SQLAlchemy) -> Either[Exception, None]:
''' Runs various start-up checks.
'''
return _ensure_at_least_one_admin(db)

View file

@ -18,8 +18,8 @@ def do_commit(
database.session.rollback() database.session.rollback()
return Left(e) return Left(e)
def do_delete[T]( def do_delete(
to_delete: T, to_delete: object,
database: SQLAlchemy database: SQLAlchemy
) -> Either[Exception, None]: ) -> Either[Exception, None]:
try: try:

View file

@ -46,10 +46,32 @@ def unique_username(_: Form, field: Field) :
else: else:
raise ValidationError(f'Unexpected datatype {type(field.data)} passed to ``unique_username`` validator!') # pyright:ignore[reportAny] raise ValidationError(f'Unexpected datatype {type(field.data)} passed to ``unique_username`` validator!') # pyright:ignore[reportAny]
def code_grants_admin(code: str|None) -> bool:
if not code:
return False
try:
if db.session.query(
SignUpCode
).filter(
SignUpCode.code == code
).filter(
SignUpCode.grants_admin
).one_or_none():
return True
else:
return False
except Exception:
return False
def code_valid(_: Form, field: Field): def code_valid(_: Form, field: Field):
f_data: str|None = field.data # pyright:ignore[reportAny] f_data: str|None = field.data # pyright:ignore[reportAny]
def _check_user_perms(code: SignUpCode) -> Either[Exception, SignUpCode]: def _check_user_perms(code: SignUpCode) -> Either[Exception, SignUpCode]:
if code.created_by == -1:
# codes created by the system have UID -1. Bypass checks on these codes.
return Right(code)
res = Option[User].encapsulate( res = Option[User].encapsulate(
db.session.query( db.session.query(
User User
@ -275,6 +297,8 @@ class CreateUserForm(FormCreatesObject[User]):
) )
) )
grant_admin = code_grants_admin(self.sign_up_code.data)
return make_hash_v1( return make_hash_v1(
self.password.data # pyright:ignore[reportArgumentType] self.password.data # pyright:ignore[reportArgumentType]
).map( ).map(
@ -290,7 +314,8 @@ class CreateUserForm(FormCreatesObject[User]):
pr_plr=pronouns[5], # pyright:ignore[reportCallIssue] pr_plr=pronouns[5], # pyright:ignore[reportCallIssue]
password=hres.hash, # pyright:ignore[reportCallIssue] password=hres.hash, # pyright:ignore[reportCallIssue]
salt=hres.salt, # pyright:ignore[reportCallIssue] salt=hres.salt, # pyright:ignore[reportCallIssue]
hash_params=hres.hash_params # pyright:ignore[reportCallIssue] hash_params=hres.hash_params, # pyright:ignore[reportCallIssue]
administrator=grant_admin # pyright:ignore[reportCallIssue]
) )
).side_effect( ).side_effect(
lambda _: ( lambda _: (

View file

@ -11,7 +11,7 @@ from taskflower.db import commit_update, db, do_delete
from taskflower.db.helpers import add_to_db from taskflower.db.helpers import add_to_db
from taskflower.db.model.task import Task from taskflower.db.model.task import Task
from taskflower.db.model.user import User from taskflower.db.model.user import User
from taskflower.form import FormEditsObjectWithUser, task from taskflower.form import FormEditsObjectWithUser
from taskflower.form.task import task_edit_form_for_task, task_form_for_user from taskflower.form.task import task_edit_form_for_task, task_form_for_user
from taskflower.sanitize.task import TaskForUser from taskflower.sanitize.task import TaskForUser
from taskflower.types.either import Either, Left, Right, reduce_either from taskflower.types.either import Either, Left, Right, reduce_either