Finish implementing invite codes

Closes #1
This commit is contained in:
digimint 2025-11-18 11:50:29 -06:00
parent 03a12b7889
commit 9bb625afe6
Signed by: digimint
GPG key ID: 8DF1C6FD85ABF748
19 changed files with 691 additions and 302 deletions

View file

@ -1,10 +1,13 @@
from datetime import datetime
import humanize
from typing import Any
from flask import Flask, render_template
from taskflower.auth import taskflower_login_manager
from taskflower.config import config
from taskflower.config import SignUpMode, config
from taskflower.db import db
from taskflower.api import APIBase
from taskflower.db.model.user import User
from taskflower.web import web_base
from taskflower.tools.hibp import hibp_bp
@ -40,8 +43,36 @@ def template_utility_fns():
+ ')'
)
def reltime(until: datetime) -> str:
''' Turn a timestamp into a human-readable relative value, relative
to the moment this function is called.
'''
now = datetime.now()
delta = now - until
if now > until:
return humanize.naturaldelta(
delta
) + ' ago'
else:
return 'in ' + humanize.naturaldelta(
delta
)
def can_generate_sign_up_codes(usr: User) -> bool:
match config.sign_up_mode:
case SignUpMode.OPEN:
return False
case SignUpMode.USERS_CAN_INVITE:
return usr.enabled
case SignUpMode.ADMINS_CAN_INVITE:
return (
usr.enabled and usr.administrator
)
return dict(
literal_call=literal_call
literal_call=literal_call,
reltime=reltime,
can_generate_sign_up_codes=can_generate_sign_up_codes
)
@app.route('/')

View file

@ -3,6 +3,7 @@ from enum import IntFlag
from typing import override
from taskflower.db import db
from taskflower.db.model.codes import SignUpCode
from taskflower.db.model.namespace import Namespace
from taskflower.db.model.role import NamespaceRole, UserRole, UserToNamespaceRole
from taskflower.db.model.task import Task
@ -10,22 +11,24 @@ from taskflower.db.model.user import User
from taskflower.types.either import Either, Left, Right
from taskflower.types.option import Option, Some
ProtectedResourceType = User|Task|Namespace|NamespaceRole|UserRole
ProtectedResourceType = User|Task|Namespace|NamespaceRole|UserRole|SignUpCode
class NamespacePermissionType(IntFlag):
NO_PERMS = 0
READ = (1 << 0)
ADMINISTRATE = (1 << 1)
CREATE_IN = (1 << 2)
COMPLETE_OWN = (1 << 3)
COMPLETE_ALL = (1 << 4)
UNCOMPLETE_OWN = (1 << 5)
UNCOMPLETE_ALL = (1 << 6)
EDIT_OWN = (1 << 7)
EDIT_ALL = (1 << 8)
DELETE_OWN = (1 << 9)
DELETE_ALL = (1 << 10)
EDIT_ROLES = (1 << 11)
NO_PERMS = 0
READ = (1 << 0)
ADMINISTRATE = (1 << 1)
CREATE_TASKS_IN = (1 << 2)
COMPLETE_OWN_TASKS = (1 << 3)
COMPLETE_ALL_TASKS = (1 << 4)
UNCOMPLETE_OWN_TASKS = (1 << 5)
UNCOMPLETE_ALL_TASKS = (1 << 6)
EDIT_OWN_TASKS = (1 << 7)
EDIT_ALL_TASKS = (1 << 8)
DELETE_OWN_TASKS = (1 << 9)
DELETE_ALL_TASKS = (1 << 10)
EDIT_TAGS = (1 << 12)
EDIT_FIELDS = (1 << 13)
EDIT_ROLES = (1 << 11)
class UserPermissionType(IntFlag):
NO_PERMS = 0
@ -34,10 +37,7 @@ class UserPermissionType(IntFlag):
EDIT_USERNAME = (1 << 2)
EDIT_PROFILE = (1 << 3)
SEE_ALL_TASKS_OF = (1 << 4)
COMPLETE_ALL_TASKS_OF = (1 << 5)
UNCOMPLETE_ALL_TASKS_OF = (1 << 6)
EDIT_ALL_TASKS_OF = (1 << 7)
DELETE_ALL_TASKS_OF = (1 << 8)
ACT_AS = (1 << 5)
EDIT_ROLES = (1 << 9)
ADMINISTRATE = (1 << 10)
@ -47,10 +47,7 @@ SELF_USER_PERMISSIONS = (
| UserPermissionType.EDIT_USERNAME
| UserPermissionType.EDIT_PROFILE
| UserPermissionType.SEE_ALL_TASKS_OF
| UserPermissionType.COMPLETE_ALL_TASKS_OF
| UserPermissionType.UNCOMPLETE_ALL_TASKS_OF
| UserPermissionType.EDIT_ALL_TASKS_OF
| UserPermissionType.DELETE_ALL_TASKS_OF
| UserPermissionType.ACT_AS
| UserPermissionType.EDIT_ROLES
| UserPermissionType.ADMINISTRATE
)
@ -58,15 +55,16 @@ SELF_USER_PERMISSIONS = (
SELF_NAMESPACE_PERMISSIONS = (
NamespacePermissionType.READ
| NamespacePermissionType.ADMINISTRATE
| NamespacePermissionType.CREATE_IN
| NamespacePermissionType.COMPLETE_OWN
| NamespacePermissionType.COMPLETE_ALL
| NamespacePermissionType.UNCOMPLETE_OWN
| NamespacePermissionType.UNCOMPLETE_ALL
| NamespacePermissionType.EDIT_OWN
| NamespacePermissionType.EDIT_ALL
| NamespacePermissionType.DELETE_OWN
| NamespacePermissionType.DELETE_ALL
| NamespacePermissionType.CREATE_TASKS_IN
| NamespacePermissionType.COMPLETE_OWN_TASKS
| NamespacePermissionType.COMPLETE_ALL_TASKS
| NamespacePermissionType.UNCOMPLETE_OWN_TASKS
| NamespacePermissionType.UNCOMPLETE_ALL_TASKS
| NamespacePermissionType.EDIT_OWN_TASKS
| NamespacePermissionType.EDIT_ALL_TASKS
| NamespacePermissionType.DELETE_OWN_TASKS
| NamespacePermissionType.DELETE_ALL_TASKS
| NamespacePermissionType.EDIT_TAGS
| NamespacePermissionType.EDIT_ROLES
)
@ -174,14 +172,14 @@ class AuthorizationError(Exception):
user : User|Option[User]|None,
resource : ProtectedResourceType,
action : str,
user_perms : NamespacePermissionType,
required_perms : NamespacePermissionType
user_perms : NamespacePermissionType|UserPermissionType|None,
required_perms : NamespacePermissionType|UserPermissionType|None
) -> None:
self.user : Option[User] = Option[User].ensure(user)
self.resource : ProtectedResourceType = resource
self.action : str = action
self.user_perms : NamespacePermissionType = user_perms
self.required_perms : NamespacePermissionType = required_perms
self.user_perms : NamespacePermissionType|UserPermissionType|None = user_perms
self.required_perms : NamespacePermissionType|UserPermissionType|None = required_perms
super().__init__(str(self))
@override
@ -195,6 +193,6 @@ class AuthorizationError(Exception):
f'AuthorizationError: {user_str} tried to perform '
+ f'action ``{self.action}`` on {type(self.resource)} object '
+ f'``{str(self.resource)}``.'
+ f'\n - Required permissions: {str(self.required_perms)}'
+ f'\n - User has permissions: {str(self.user_perms)}'
+ f'\n - Required permissions: {str(self.required_perms) if self.required_perms else "N/A"}'
+ f'\n - User has permissions: {str(self.user_perms) if self.user_perms else "N/A"}'
)

View file

@ -58,6 +58,19 @@ class EnumFromEnv(Enum):
return Left(KeyError(f'No such key `{env_val}` in enum {cls.__name__}'))
class SignUpMode(EnumFromEnv):
''' Restrictions on who can sign up for an account.
OPEN: Anyone can sign up for an account, no registration code required.
USERS_CAN_INVITE: An invite code is required to create an account. Any
user can generate an invite code.
ADMINS_CAN_INVITE: An invite code is requird to create an account. Only
system administrators can generate an invite code.
'''
OPEN = auto()
USERS_CAN_INVITE = auto()
ADMINS_CAN_INVITE = auto()
class HIBPMode(EnumFromEnv):
''' Whether to download a local copy of the HaveIBeenPwned API. Note that
the database is very large (about 40GB at time of writing).
@ -110,6 +123,9 @@ class ConfigType:
# URL to submit issues to
issue_url: Option[str] = field(default_factory=Nothing[str])
# Users
sign_up_mode: SignUpMode = SignUpMode.ADMINS_CAN_INVITE
# Whether to keep a local copy of the HaveIBeenPwned API
hibp_mode: HIBPMode = HIBPMode.ONLINE_ONLY

View file

@ -0,0 +1,23 @@
from datetime import datetime
from sqlalchemy import Boolean, DateTime, ForeignKey, Integer, String
from sqlalchemy.sql import func
from sqlalchemy.orm import Mapped, mapped_column
from taskflower.db import db
class SignUpCode(db.Model):
id: Mapped[int] = mapped_column(Integer, primary_key=True)
code: Mapped[str] = mapped_column(String(32), unique=True)
created: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
expires: Mapped[datetime] = mapped_column(DateTime(timezone=True))
grants_admin: Mapped[bool] = mapped_column(Boolean, default=False)
created_by: Mapped[int] = mapped_column(Integer, ForeignKey('user.id'))
class NamespaceInviteCode(db.Model):
id: Mapped[int] = mapped_column(Integer, primary_key=True)
code: Mapped[str] = mapped_column(String(32), unique=True)
created: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
expires: Mapped[datetime] = mapped_column(DateTime(timezone=True))
created_by: Mapped[int] = mapped_column(Integer, ForeignKey('user.id'))
for_role: Mapped[int] = mapped_column(Integer, ForeignKey('namespace_role.id'))

View file

@ -1,15 +0,0 @@
from datetime import datetime
from sqlalchemy import DateTime, ForeignKey, Integer, String
from sqlalchemy.sql import func
from sqlalchemy.orm import Mapped, mapped_column
from taskflower.db import db
from taskflower.types.resource import APISerializable
class SignUpCode(db.Model, APISerializable):
id: Mapped[int] = mapped_column(Integer, primary_key=True)
code: Mapped[str] = mapped_column(String, unique=True)
created: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
expires: Mapped[datetime] = mapped_column(DateTime(timezone=True))
# Relations
created_by: Mapped[int] = mapped_column(Integer, ForeignKey('user.id'))

View file

@ -19,6 +19,7 @@ class User(db.Model, UserMixin, APISerializable):
# Status
enabled: Mapped[bool] = mapped_column(Boolean)
administrator: Mapped[bool] = mapped_column(Boolean, default=False)
# Pronouns
pr_sub: Mapped[str] = mapped_column(String)

View file

@ -19,7 +19,7 @@ def task_form_for_user(
(ns.id, ns.name)
for ns in namespaces_where_user_can(
user,
NamespacePermissionType.CREATE_IN
NamespacePermissionType.CREATE_TASKS_IN
)
]
@ -71,7 +71,7 @@ def task_form_for_user(
lambda ns: assert_user_perms_on_namespace(
current_user,
ns,
NamespacePermissionType.CREATE_IN,
NamespacePermissionType.CREATE_TASKS_IN,
'Create task'
)
).map(

300
src/taskflower/form/user.py Normal file
View file

@ -0,0 +1,300 @@
from typing import override
from wtforms import Field, Form, PasswordField, SelectField, StringField, ValidationError
from wtforms.validators import DataRequired, EqualTo, Length
from taskflower.auth import password_breach_count
from taskflower.auth.hash import make_hash_v1
from taskflower.config import SignUpMode, config
from taskflower.db import db
from taskflower.db.model.codes import SignUpCode
from taskflower.db.model.user import User
from taskflower.form import FormCreatesObject
from taskflower.types import ann
from taskflower.types.either import Either, Left, Right
from taskflower.types.option import Option, Some
def pwned_validator(_: Form, field: Field):
if isinstance(field.data, str): # pyright:ignore[reportAny]
breaches = password_breach_count(
field.data
).and_then(
lambda val: val,
lambda: 0
)
if breaches > 0:
raise ValidationError(f'Password has been breached {breaches} times!')
def optional_if_pronoun_specified(form: 'CreateUserForm|Form', field: Field):
if isinstance(form, CreateUserForm):
if form.pronouns.data == 'custom': # pyright:ignore[reportAny]
if not field.data: # pyright:ignore[reportAny]
raise ValidationError(f'{field.name} is required if using custom pronouns!')
def unique_username(_: Form, field: Field) :
if isinstance(field.data, str): # pyright:ignore[reportAny]
res = db.session.execute(
db.select( # pyright:ignore[reportAny]
User
).filter_by(
username=field.data
)
).scalar_one_or_none()
if res is not None:
raise ValidationError('Sorry, that username is taken!')
else:
raise ValidationError(f'Unexpected datatype {type(field.data)} passed to ``unique_username`` validator!') # pyright:ignore[reportAny]
def code_valid(_: Form, field: Field):
f_data: str|None = field.data # pyright:ignore[reportAny]
def _check_user_perms(code: SignUpCode) -> Either[Exception, SignUpCode]:
res = Option[User].encapsulate(
db.session.query(
User
).filter(
User.id == code.created_by
).one_or_none()
)
if isinstance(res, Some):
if not res.val.enabled:
return Left(ValidationError(
'The user that created that invite code has been deleted or disabled!'
))
if config.sign_up_mode == SignUpMode.ADMINS_CAN_INVITE:
if not res.val.administrator:
return Left(ValidationError(
'The user that created that invite code is not an administrator, and this instance is set to only allow invites from administrators.'
))
return Right(code)
else:
return Left(ValidationError(
'The user that created that invite code has been deleted or disabled!'
))
if config.sign_up_mode == SignUpMode.OPEN:
return # Don't check for a sign-up code on open instances.
if isinstance(f_data, str):
res = Option[SignUpCode].encapsulate(
db.session.query(
SignUpCode
).filter(
SignUpCode.code == f_data
).one_or_none()
).and_then(
lambda val: Right[Exception, SignUpCode](val),
lambda: Left[Exception, SignUpCode](ValidationError('Invalid sign-up code!'))
).flat_map(
_check_user_perms
)
if isinstance(res, Left):
raise res.val
else:
raise ValidationError(f'Unexpected datatype {type(f_data)}')
def _delete_sign_up_code(code: str):
count = db.session.query(
SignUpCode
).filter(
SignUpCode.code == code
).delete()
return count != 0
default_pronoun_sets = {
'they': (
'they',
'them',
'their',
'theirs',
'themself',
True
),
'it': (
'it',
'it',
'its',
'its',
'itself',
False
),
'fae': (
'fae',
'faer',
'faer',
'faers',
'faerself',
False
),
'xe': (
'xe',
'xem',
'xyr',
'xyrs',
'xyrself',
False
),
'she': (
'she',
'her',
'her',
'hers',
'herself',
False
),
'he': (
'he',
'him',
'his',
'his',
'himself',
False
)
}
class CreateUserForm(FormCreatesObject[User]):
username: StringField = StringField(
'Enter a unique username',
[
Length(
1,
message='Usernames must be at least one character long!'
),
Length(
max=32,
message='Usernames can\'t be longer than 32 characters!'
),
unique_username
]
)
display_name: StringField = StringField(
'Choose a display name',
[
Length(
min=1,
message='Display names must be at least one character long!'
),
Length(
max=256,
message='Display names can\'t be longer than 256 characters!'
)
]
)
pronouns: SelectField = SelectField(
'Select your pronouns',
choices=[
('they', 'they/them'),
('it', 'it/its'),
('fae', 'fae/faer'),
('xe', 'xe/xem'),
('she', 'she/her'),
('he', 'he/him'),
('custom', 'Custom...')
],
validators=[DataRequired()]
)
pr_sub: StringField = StringField(
'Custom Subjective Pronoun',
[optional_if_pronoun_specified]
)
pr_obj: StringField = StringField(
'Custom Objective Pronoun',
[optional_if_pronoun_specified]
)
pr_dep: StringField = StringField(
'Custom Dependent Possessive Pronoun',
[optional_if_pronoun_specified]
)
pr_ind: StringField = StringField(
'Custom Independent Possessive Pronoun',
[optional_if_pronoun_specified]
)
pr_ref: StringField = StringField(
'Custom Reflexive Pronoun',
[optional_if_pronoun_specified]
)
pr_plr: SelectField = SelectField(
'My pronouns are:',
choices=[
('singular', 'Singular (e.g. "she has")'),
('plural', 'Plural (e.g. "they have")')
],
validators=[optional_if_pronoun_specified]
)
password: PasswordField = PasswordField(
'Enter a secure password',
[
Length(
min = 16,
message='Passwords must be at least 16 characters long!'
),
Length(
max = 512,
message='The maximum password length is 512!'
),
pwned_validator,
EqualTo('confirm_password')
]
)
confirm_password: PasswordField = PasswordField(
'Type password again to confirm'
)
sign_up_code: StringField = StringField(
'Enter a sign-up code to register',
[
code_valid
]
)
code_required: bool = config.sign_up_mode != SignUpMode.OPEN
@override
def create_object(self) -> Either[Exception, User]:
if not self.validate():
return Left(ValidationError('Form data did not validate correctly!'))
if self.pronouns.data in default_pronoun_sets: # pyright:ignore[reportAny]
pronouns: tuple[
str, str,
str, str,
str, bool
] = default_pronoun_sets[self.pronouns.data] # pyright:ignore[reportAny]
else:
pronouns = (
ann(self.pr_sub.data), ann(self.pr_obj.data),
ann(self.pr_dep.data), ann(self.pr_ind.data),
ann(self.pr_ref.data), (
True if self.pr_plr.data == 'plural' # pyright:ignore[reportAny]
else False
)
)
return make_hash_v1(
self.password.data # pyright:ignore[reportArgumentType]
).map(
lambda hres:User(
username=self.username.data, # pyright:ignore[reportCallIssue]
display_name=self.display_name.data, # pyright:ignore[reportCallIssue]
enabled=True, # pyright:ignore[reportCallIssue]
pr_sub=pronouns[0], # pyright:ignore[reportCallIssue]
pr_obj=pronouns[1], # pyright:ignore[reportCallIssue]
pr_dep=pronouns[2], # pyright:ignore[reportCallIssue]
pr_ind=pronouns[3], # pyright:ignore[reportCallIssue]
pr_ref=pronouns[4], # pyright:ignore[reportCallIssue]
pr_plr=pronouns[5], # pyright:ignore[reportCallIssue]
password=hres.hash, # pyright:ignore[reportCallIssue]
salt=hres.salt, # pyright:ignore[reportCallIssue]
hash_params=hres.hash_params # pyright:ignore[reportCallIssue]
)
).side_effect(
lambda _: (
_delete_sign_up_code(self.sign_up_code.data)
) if self.sign_up_code.data is not None
else None
)

View file

@ -0,0 +1,27 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Self
from taskflower.db.model.codes import SignUpCode
from taskflower.types.either import Either, Right
@dataclass(frozen=True)
class SignUpCodeForUser:
id: int
code: str
created: datetime
expires: datetime
grants_admin: bool
@classmethod
def from_code(
cls,
code: SignUpCode
) -> Either[Exception, Self]:
return Right(cls(
code.id,
code.code,
code.created,
code.expires,
code.grants_admin
))

View file

@ -10,16 +10,18 @@ from taskflower.types.either import Either, Left, Right
@dataclass(frozen=True)
class NamespacePermsForUser:
read: bool
create_in: bool
complete_own: bool
complete_all: bool
uncomplete_own: bool
uncomplete_all: bool
edit_own: bool
edit_all: bool
delete_own: bool
delete_all: bool
create_tasks_in: bool
complete_own_tasks: bool
complete_all_tasks: bool
uncomplete_own_tasks: bool
uncomplete_all_tasks: bool
edit_own_tasks: bool
edit_all_tasks: bool
delete_own_tasks: bool
delete_all_tasks: bool
edit_roles: bool
edit_tags: bool
edit_fields: bool
administrate: bool
@dataclass(frozen=True)
@ -54,16 +56,18 @@ class NamespaceForUser():
ns.description,
NamespacePermsForUser(
NamespacePermissionType.READ in perms,
NamespacePermissionType.CREATE_IN in perms,
NamespacePermissionType.COMPLETE_OWN in perms,
NamespacePermissionType.COMPLETE_ALL in perms,
NamespacePermissionType.UNCOMPLETE_OWN in perms,
NamespacePermissionType.UNCOMPLETE_ALL in perms,
NamespacePermissionType.EDIT_OWN in perms,
NamespacePermissionType.EDIT_ALL in perms,
NamespacePermissionType.DELETE_OWN in perms,
NamespacePermissionType.DELETE_ALL in perms,
NamespacePermissionType.CREATE_TASKS_IN in perms,
NamespacePermissionType.COMPLETE_OWN_TASKS in perms,
NamespacePermissionType.COMPLETE_ALL_TASKS in perms,
NamespacePermissionType.UNCOMPLETE_OWN_TASKS in perms,
NamespacePermissionType.UNCOMPLETE_ALL_TASKS in perms,
NamespacePermissionType.EDIT_OWN_TASKS in perms,
NamespacePermissionType.EDIT_ALL_TASKS in perms,
NamespacePermissionType.DELETE_OWN_TASKS in perms,
NamespacePermissionType.DELETE_ALL_TASKS in perms,
NamespacePermissionType.EDIT_ROLES in perms,
NamespacePermissionType.EDIT_TAGS in perms,
NamespacePermissionType.EDIT_FIELDS in perms,
NamespacePermissionType.ADMINISTRATE in perms
),
(

View file

@ -0,0 +1,44 @@
{% extends "main.html" %}
{% block head_extras %}
<link rel="stylesheet" href={{ url_for("static", filename="list-view.css") }} />
{% endblock %}
{% block title %}My Sign-Up Codes{% endblock %}
{% macro list_entry(code) %}
<tr id="row-{{ code.id }}">
<td id="row-{{code.id}}-code">{{ code.code }}</td>
<td id="row-{{code.id}}-created">{{ reltime(code.created) }}</td>
<td id="row-{{code.id}}-expires">{{ reltime(code.expires) }}</td>
<td id="row-{{code.id}}-delete"><a href="{{url_for('web.invite.delete_sign_up', id=code.id)}}">🗑 DEL</a></td>
</tr>
{% endmacro %}
{% macro add_new() %}
<tr id="row-add-new">
<td colspan="4"><a href="{{url_for('web.invite.new_sign_up')}}">⊞ Generate a new code</a></td>
</tr>
{% endmacro %}
{% block main_content %}
<h1>My Sign-Up Codes</h1>
<table class="list">
<tbody>
<tr>
<th>Code</th>
<th>Created</th>
<th>Expires</th>
<th>Delete</th>
</tr>
{{ add_new() }}
{% for code in codes %}
{{ list_entry(code) }}
{% endfor %}
</tbody>
</table>
{% endblock%}

View file

@ -10,11 +10,14 @@
<a href={{ url_for("web.task.new") }}>Create Task</a>
{% if current_user.is_authenticated %}
{{ namespace_list(fetch_namespaces(current_user)) }}
<a href="{{ url_for('web.namespace.new') }}">+ New Namespace</a>
<a href="{{ url_for('web.namespace.new') }}">+ New Zone</a>
{% else %}
<div id="sidebar-spacer"></div>
{% endif %}
{% if current_user.is_authenticated %}
{% if can_generate_sign_up_codes(current_user) %}
<a href="{{url_for('web.invite.all_sign_up')}}">+ Invite</a>
{% endif %}
<a href={{ url_for(
"web.user.profile",
id=current_user.id

View file

@ -57,6 +57,12 @@
to make your life easier!
</p>
</div>
{% if form.code_required %}
<dl>
{{ render_field(form.sign_up_code) }}
</dl>
{% endif %}
<input id="submit-form" type="submit" value="REGISTER USER"/>
</form>

View file

@ -1,5 +1,6 @@
from flask import Blueprint
from taskflower.web.auth import web_auth
from taskflower.web.invite import web_invite
from taskflower.web.namespace import web_namespace
from taskflower.web.task import web_tasks
from taskflower.web.user import web_user
@ -9,4 +10,5 @@ web_base = Blueprint('web', __name__, url_prefix='/')
web_base.register_blueprint(web_tasks)
web_base.register_blueprint(web_user)
web_base.register_blueprint(web_auth)
web_base.register_blueprint(web_namespace)
web_base.register_blueprint(web_namespace)
web_base.register_blueprint(web_invite)

View file

View file

@ -5,6 +5,7 @@ from flask import render_template
from taskflower.auth.messages import not_found_or_not_authorized
from taskflower.auth.permission import AuthorizationError
from taskflower.auth.violations import report_authorization_error
from taskflower.config import config
from taskflower.types import FlaskViewReturnType
from taskflower.types.option import Nothing, Option, Some
@ -24,9 +25,9 @@ class ResponseErrorType(Exception):
@override
def __init__(
self,
method: HTTPMethod|str,
page_name: str,
reason: str = '',
method: HTTPMethod|str = HTTPMethod.GET,
page_name: str = '[unspecified]',
reason: str = '[unspecified]',
user_reason: str = '' # CAUTION: This message will be displayed to the user!
) -> None:
self.method: HTTPMethod = (
@ -73,6 +74,9 @@ class ResponseErrorForbidden(ResponseErrorType):
class ResponseErrorNotFound(ResponseErrorType):
status: HTTPStatus = HTTPStatus.NOT_FOUND
class ResponseErrorBadRequest(ResponseErrorType):
status: HTTPStatus = HTTPStatus.BAD_REQUEST
class ResponseErrorInternalServerError(ResponseErrorType):
status: HTTPStatus = HTTPStatus.INTERNAL_SERVER_ERROR
@ -120,6 +124,7 @@ def status_response(code: HTTPStatus, user_description: str|None = None) -> Flas
def response_from_exception(exc: Exception|None = None) -> FlaskViewReturnType:
if isinstance(exc, AuthorizationError):
report_authorization_error(exc)
return status_response(
HTTPStatus.NOT_FOUND,
not_found_or_not_authorized(

View file

@ -0,0 +1,163 @@
from datetime import datetime, timedelta
from secrets import token_hex
from flask import Blueprint, redirect, render_template, url_for
from flask_login import current_user, login_required # pyright:ignore[reportMissingTypeStubs, reportUnknownVariableType]
from taskflower.auth.permission import AuthorizationError
from taskflower.config import SignUpMode, config
from taskflower.db import db
from taskflower.db.model.codes import SignUpCode
from taskflower.db.model.user import User
from taskflower.sanitize.code import SignUpCodeForUser
from taskflower.types.either import Either, Left, Right, gather_successes
from taskflower.types.option import Option
from taskflower.web.errors import ResponseErrorBadRequest, ResponseErrorForbidden, ResponseErrorNotFound, response_from_exception
web_invite: Blueprint = Blueprint(
'invite',
__name__,
'/templates',
url_prefix='/invite'
)
def _can_make_sign_ups(
usr: User
) -> Either[Exception, User]:
match config.sign_up_mode:
case SignUpMode.OPEN:
return Left(
ResponseErrorBadRequest(
reason=f'User ID {usr.id} ({usr.username}) tried to access a sign-up code endpoint, but sign-up codes are not enabled!',
user_reason='This instance does not currently use sign-up codes for registration.'
)
)
case SignUpMode.ADMINS_CAN_INVITE:
if not (
usr.enabled and usr.administrator
):
return Left(
ResponseErrorForbidden(
reason=f'User ID {usr.id} ({usr.username}) tried to access a sign-up code endpoint, but sign-up codes are restricted to administrators and {usr.pr_sub} {"are" if usr.pr_plr else "is"} not an administrator',
user_reason='Only administrators are currently allowed to generate sign-up codes. If you think this is in error, please contact your instance administrator.'
)
)
case SignUpMode.USERS_CAN_INVITE:
if not (
usr.enabled
):
return Left(
ResponseErrorForbidden(
reason=f'User ID {usr.id} ({usr.username}) tried to access a sign-up code endpoint, but {usr.pr_dep} account is disabled!'
)
)
return Right(usr)
@web_invite.route('/sign-up')
@login_required
def all_sign_up():
cur_usr: User = current_user # pyright:ignore[reportAssignmentType]
return _can_make_sign_ups(
cur_usr
).flat_map(
lambda _: Right(
db.session.query(
SignUpCode
).filter(
SignUpCode.created_by == cur_usr.id
).all()
)
).flat_map(
lambda codes: Right(
gather_successes(
[
SignUpCodeForUser.from_code(co)
for co in codes
]
)
)
).and_then(
lambda codes: render_template(
'codes/all_sign_up_codes.html',
codes=codes
),
lambda exc: response_from_exception(exc)
)
@web_invite.route('/sign-up/new')
@login_required
def new_sign_up():
cur_usr: User = current_user # pyright:ignore[reportAssignmentType]
if isinstance(
cmsu:=_can_make_sign_ups(cur_usr),
Left
):
return response_from_exception(cmsu.val)
try:
code = SignUpCode(
code=token_hex(16), # pyright:ignore[reportCallIssue]
created=datetime.now(), # pyright:ignore[reportCallIssue]
expires=datetime.now() + timedelta(weeks=1), # pyright:ignore[reportCallIssue]
created_by=cur_usr.id # pyright:ignore[reportCallIssue]
)
db.session.add(code)
db.session.commit()
return redirect(url_for('web.invite.all_sign_up'))
except Exception as e:
return response_from_exception(e)
@web_invite.route('/sign-up/<int:id>/delete')
@login_required
def delete_sign_up(id: int):
cur_usr: User = current_user # pyright:ignore[reportAssignmentType]
def _do_delete(code: SignUpCode) -> Either[Exception, None]:
try:
db.session.delete(code)
db.session.commit()
return Right(None)
except Exception as e:
db.session.rollback()
return Left(e)
return Option[SignUpCode].encapsulate(
db.session.query(
SignUpCode
).filter(
SignUpCode.id == id
).one_or_none()
).and_then(
lambda val: Right[Exception, SignUpCode](val),
lambda: Left[Exception, SignUpCode](
ResponseErrorNotFound(
reason=f'User {cur_usr.id} ({cur_usr.username}) tried to delete a SignUpCode that doesn\'t exist!'
)
)
).flat_map(
lambda code: (
(
Right(code)
) if (
(code.created_by == cur_usr.id)
or cur_usr.administrator
) else (
Left(AuthorizationError(
cur_usr,
code,
'Delete',
None,
None
))
)
)
).flat_map(
_do_delete
).and_then(
lambda _: redirect(
url_for('web.invite.all_sign_up')
),
lambda exc: response_from_exception(exc)
)

View file

@ -129,7 +129,7 @@ def complete(id: int):
lambda tsk: assert_user_perms_on_task(
cur_usr,
tsk,
NamespacePermissionType.COMPLETE_ALL,
NamespacePermissionType.COMPLETE_ALL_TASKS,
'Complete task'
)
).lside_effect(
@ -180,7 +180,7 @@ def uncomplete(id: int):
lambda tsk: assert_user_perms_on_task(
cur_usr,
tsk,
NamespacePermissionType.UNCOMPLETE_ALL,
NamespacePermissionType.UNCOMPLETE_ALL_TASKS,
'Complete task'
)
).lside_effect(

View file

@ -1,14 +1,10 @@
from flask import Blueprint, redirect, render_template, request, url_for
from flask_login import login_user # pyright:ignore[reportMissingTypeStubs,reportUnknownVariableType]
from wtforms import Field, Form, PasswordField, SelectField, StringField, ValidationError
from wtforms.validators import DataRequired, EqualTo, Length
from taskflower.auth import password_breach_count
from taskflower.auth.hash import make_hash_v1
from taskflower.auth.permission import initialize_user_permissions
from taskflower.db import db
from taskflower.db.model.user import User
from taskflower.types import ann
from taskflower.form.user import CreateUserForm
from taskflower.types.either import Either, Left, Right
from taskflower.web.errors import ResponseErrorInternalServerError, response_from_exception
@ -20,223 +16,8 @@ web_user: Blueprint = Blueprint(
url_prefix='/users'
)
def pwned_validator(_: Form, field: Field):
if isinstance(field.data, str): # pyright:ignore[reportAny]
breaches = password_breach_count(
field.data
).and_then(
lambda val: val,
lambda: 0
)
if breaches > 0:
raise ValidationError(f'Password has been breached {breaches} times!')
def optional_if_pronoun_specified(form: 'CreateUserForm|Form', field: Field):
if isinstance(form, CreateUserForm):
if form.pronouns.data == 'custom': # pyright:ignore[reportAny]
if not field.data: # pyright:ignore[reportAny]
raise ValidationError(f'{field.name} is required if using custom pronouns!')
def unique_username(_: Form, field: Field) :
if isinstance(field.data, str): # pyright:ignore[reportAny]
res = db.session.execute(
db.select( # pyright:ignore[reportAny]
User
).filter_by(
username=field.data
)
).scalar_one_or_none()
if res is not None:
raise ValidationError('Sorry, that username is taken!')
else:
raise ValidationError(f'Unexpected datatype {type(field.data)} passed to ``unique_username`` validator!') # pyright:ignore[reportAny]
default_pronoun_sets = {
'they': (
'they',
'them',
'their',
'theirs',
'themself',
True
),
'it': (
'it',
'it',
'its',
'its',
'itself',
False
),
'fae': (
'fae',
'faer',
'faer',
'faers',
'faerself',
False
),
'xe': (
'xe',
'xem',
'xyr',
'xyrs',
'xyrself',
False
),
'she': (
'she',
'her',
'her',
'hers',
'herself',
False
),
'he': (
'he',
'him',
'his',
'his',
'himself',
False
)
}
class CreateUserForm(Form):
username: StringField = StringField(
'Enter a unique username',
[
Length(
1,
message='Usernames must be at least one character long!'
),
Length(
max=32,
message='Usernames can\'t be longer than 32 characters!'
),
unique_username
]
)
display_name: StringField = StringField(
'Choose a display name',
[
Length(
min=1,
message='Display names must be at least one character long!'
),
Length(
max=256,
message='Display names can\'t be longer than 256 characters!'
)
]
)
pronouns: SelectField = SelectField(
'Select your pronouns',
choices=[
('they', 'they/them'),
('it', 'it/its'),
('fae', 'fae/faer'),
('xe', 'xe/xem'),
('she', 'she/her'),
('he', 'he/him'),
('custom', 'Custom...')
],
validators=[DataRequired()]
)
pr_sub: StringField = StringField(
'Custom Subjective Pronoun',
[optional_if_pronoun_specified]
)
pr_obj: StringField = StringField(
'Custom Objective Pronoun',
[optional_if_pronoun_specified]
)
pr_dep: StringField = StringField(
'Custom Dependent Possessive Pronoun',
[optional_if_pronoun_specified]
)
pr_ind: StringField = StringField(
'Custom Independent Possessive Pronoun',
[optional_if_pronoun_specified]
)
pr_ref: StringField = StringField(
'Custom Reflexive Pronoun',
[optional_if_pronoun_specified]
)
pr_plr: SelectField = SelectField(
'My pronouns are:',
choices=[
('singular', 'Singular (e.g. "she has")'),
('plural', 'Plural (e.g. "they have")')
],
validators=[optional_if_pronoun_specified]
)
password: PasswordField = PasswordField(
'Enter a secure password',
[
Length(
min = 16,
message='Passwords must be at least 16 characters long!'
),
Length(
max = 512,
message='The maximum password length is 512!'
),
pwned_validator,
EqualTo('confirm_password')
]
)
confirm_password: PasswordField = PasswordField(
'Type password again to confirm'
)
@web_user.route('/new', methods=['GET', 'POST'])
def create_user_page():
def _user_from_form(form: CreateUserForm) -> Either[Exception, User]:
if not form.validate():
return Left(ResponseErrorInternalServerError(
request.method,
request.path
))
if form.pronouns.data in default_pronoun_sets: # pyright:ignore[reportAny]
pronouns: tuple[
str, str,
str, str,
str, bool
] = default_pronoun_sets[form.pronouns.data] # pyright:ignore[reportAny]
else:
pronouns = (
ann(form.pr_sub.data), ann(form.pr_obj.data),
ann(form.pr_dep.data), ann(form.pr_ind.data),
ann(form.pr_ref.data), (
True if form.pr_plr.data == 'plural' # pyright:ignore[reportAny]
else False
)
)
return make_hash_v1(
form.password.data # pyright:ignore[reportArgumentType]
).map(
lambda hres:User(
username=form.username.data, # pyright:ignore[reportCallIssue]
display_name=form.display_name.data, # pyright:ignore[reportCallIssue]
enabled=True, # pyright:ignore[reportCallIssue]
pr_sub=pronouns[0], # pyright:ignore[reportCallIssue]
pr_obj=pronouns[1], # pyright:ignore[reportCallIssue]
pr_dep=pronouns[2], # pyright:ignore[reportCallIssue]
pr_ind=pronouns[3], # pyright:ignore[reportCallIssue]
pr_ref=pronouns[4], # pyright:ignore[reportCallIssue]
pr_plr=pronouns[5], # pyright:ignore[reportCallIssue]
password=hres.hash, # pyright:ignore[reportCallIssue]
salt=hres.salt, # pyright:ignore[reportCallIssue]
hash_params=hres.hash_params # pyright:ignore[reportCallIssue]
)
)
def _add_to_db(user: User) -> Either[Exception, User]:
try:
db.session.add(user)
@ -253,7 +34,7 @@ def create_user_page():
form_data = CreateUserForm(request.form)
if request.method == 'POST' and form_data.validate():
return _user_from_form(form_data).flat_map(
return form_data.create_object().flat_map(
lambda usr: _add_to_db(usr)
).flat_map(
lambda usr: initialize_user_permissions(usr)