Fix database migrations; add cat/field/tag editing

Progress toward #4, #5, and #26
This commit is contained in:
digimint 2025-11-28 13:09:21 -06:00
parent ce18ed2fd4
commit 875c5f7f06
Signed by: digimint
GPG key ID: 8DF1C6FD85ABF748
27 changed files with 1343 additions and 46 deletions

3
.gitignore vendored
View file

@ -4,4 +4,5 @@
__pycache__
/instance
/src/instance
/.local
/.local
.secrets

View file

@ -9,9 +9,11 @@ from alembic import context
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# for some reason, flask-migrate decided it was a good idea to override our
# logging config whenever we call any of their functions >:(
#
# this line has been commented out in order to remove this "feature"
# fileConfig(config.config_file_name)
logger = logging.getLogger('alembic.env')

View file

@ -0,0 +1,57 @@
"""Fix tag/field/category links
Revision ID: 067ee615c967
Revises: f34868d85e32
Create Date: 2025-11-28 12:03:26.164002
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '067ee615c967'
down_revision = 'f34868d85e32'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('cat_to_task',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('cat', sa.Integer(), nullable=False),
sa.Column('task', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['cat'], ['category.id'], name='fk_c2t_cat', ondelete='CASCADE'),
sa.ForeignKeyConstraint(['task'], ['task.id'], name='fk_c2t_task', ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
op.create_table('field_to_task',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('field', sa.Integer(), nullable=False),
sa.Column('task', sa.Integer(), nullable=False),
sa.Column('raw_val', sa.String(length=1024), nullable=False),
sa.ForeignKeyConstraint(['field'], ['field.id'], name='fk_f2t_field', ondelete='CASCADE'),
sa.ForeignKeyConstraint(['task'], ['task.id'], name='fk_f2t_task', ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
with op.batch_alter_table('field', schema=None) as batch_op:
batch_op.drop_column('raw_val')
with op.batch_alter_table('task', schema=None) as batch_op:
batch_op.add_column(sa.Column('soft_due', sa.DateTime(), nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('task', schema=None) as batch_op:
batch_op.drop_column('soft_due')
with op.batch_alter_table('field', schema=None) as batch_op:
batch_op.add_column(sa.Column('raw_val', sa.VARCHAR(length=1024), nullable=False))
op.drop_table('field_to_task')
op.drop_table('cat_to_task')
# ### end Alembic commands ###

View file

@ -0,0 +1,151 @@
"""base
Revision ID: 6db9f9b83a8d
Revises:
Create Date: 2025-11-28 11:36:25.704433
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '6db9f9b83a8d'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('namespace',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=64), nullable=False),
sa.Column('description', sa.String(), nullable=False),
sa.Column('created', sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_table('pwned_password',
sa.Column('hash', sa.String(), nullable=False),
sa.Column('count', sa.Integer(), nullable=False),
sa.PrimaryKeyConstraint('hash')
)
op.create_table('user',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('username', sa.String(length=32), nullable=False),
sa.Column('display_name', sa.String(length=256), nullable=False),
sa.Column('created', sa.DateTime(), nullable=False),
sa.Column('enabled', sa.Boolean(), nullable=False),
sa.Column('administrator', sa.Boolean(), nullable=False),
sa.Column('pr_sub', sa.String(), nullable=False),
sa.Column('pr_obj', sa.String(), nullable=False),
sa.Column('pr_dep', sa.String(), nullable=False),
sa.Column('pr_ind', sa.String(), nullable=False),
sa.Column('pr_ref', sa.String(), nullable=False),
sa.Column('pr_plr', sa.Boolean(), nullable=False),
sa.Column('password', sa.String(length=256), nullable=False),
sa.Column('salt', sa.String(length=256), nullable=False),
sa.Column('hash_params', sa.String(length=256), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('username')
)
op.create_table('namespace_role',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=32), nullable=False),
sa.Column('permissions', sa.Integer(), nullable=False),
sa.Column('perms_deny', sa.Integer(), nullable=False),
sa.Column('priority', sa.Integer(), nullable=False),
sa.Column('created', sa.DateTime(), nullable=False),
sa.Column('namespace', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['namespace'], ['namespace.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_table('sign_up_code',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('code', sa.String(length=32), nullable=False),
sa.Column('created', sa.DateTime(), nullable=False),
sa.Column('expires', sa.DateTime(), nullable=False),
sa.Column('grants_admin', sa.Boolean(), nullable=False),
sa.Column('created_by', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['created_by'], ['user.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('code')
)
op.create_table('task',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=64), nullable=False),
sa.Column('description', sa.String(), nullable=False),
sa.Column('due', sa.DateTime(), nullable=False),
sa.Column('created', sa.DateTime(), nullable=False),
sa.Column('complete', sa.Boolean(), nullable=False),
sa.Column('namespace', sa.Integer(), nullable=False),
sa.Column('owner', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['namespace'], ['namespace.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['owner'], ['user.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
op.create_table('user_role',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('is_self', sa.Boolean(), nullable=False),
sa.Column('name', sa.String(length=32), nullable=False),
sa.Column('permissions', sa.Integer(), nullable=False),
sa.Column('perms_deny', sa.Integer(), nullable=False),
sa.Column('priority', sa.Integer(), nullable=False),
sa.Column('created', sa.DateTime(), nullable=False),
sa.Column('user', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['user'], ['user.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
op.create_table('namespace_invite_code',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('code', sa.String(length=32), nullable=False),
sa.Column('created', sa.DateTime(), nullable=False),
sa.Column('expires', sa.DateTime(), nullable=False),
sa.Column('created_by', sa.Integer(), nullable=False),
sa.Column('for_role', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['created_by'], ['user.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['for_role'], ['namespace_role.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('code')
)
op.create_table('task_to_namespace',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('namespace', sa.Integer(), nullable=False),
sa.Column('task', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['namespace'], ['namespace.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['task'], ['task.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
op.create_table('user_to_namespace_role',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('user', sa.Integer(), nullable=False),
sa.Column('role', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['role'], ['namespace_role.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['user'], ['user.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
op.create_table('user_to_user_role',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('user', sa.Integer(), nullable=False),
sa.Column('role', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['role'], ['user_role.id'], ondelete='CASCADE'),
sa.ForeignKeyConstraint(['user'], ['user.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('user_to_user_role')
op.drop_table('user_to_namespace_role')
op.drop_table('task_to_namespace')
op.drop_table('namespace_invite_code')
op.drop_table('user_role')
op.drop_table('task')
op.drop_table('sign_up_code')
op.drop_table('namespace_role')
op.drop_table('user')
op.drop_table('pwned_password')
op.drop_table('namespace')
# ### end Alembic commands ###

View file

@ -11,7 +11,7 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '945140c35257'
down_revision = None
down_revision = '6db9f9b83a8d'
branch_labels = None
depends_on = None

View file

@ -1,7 +1,7 @@
import logging
from typing import Any
from flask import Flask, Request, render_template, url_for
from flask_migrate import Migrate
from flask_migrate import Migrate, upgrade
from taskflower.auth import taskflower_login_manager
from taskflower.auth.startup import startup_checks
@ -18,7 +18,14 @@ from taskflower.web import web_base
from taskflower.tools.hibp import hibp_bp
logging.basicConfig(level=logging.INFO)
def _init_logs():
logging.basicConfig(
format='[%(levelname)-8s] %(name)-24s: %(message)s',
level=logging.INFO,
force=True
)
_init_logs()
log = logging.getLogger(__name__)
@ -48,7 +55,11 @@ APIBase.register(app)
# print(f'Routes: \n{"\n".join([str(r) for r in APIBase.routes])}')
with app.app_context():
db.create_all()
# db.create_all()
log.info('Checking for and applying database migrations')
upgrade()
_init_logs() # flask-migrate overrides our logging config >:(
log = logging.getLogger(__name__)
log.info('Running startup checks...')
res = startup_checks(db)
@ -109,6 +120,9 @@ def template_utility_fns():
**new_args # pyright:ignore[reportAny]
)
def t_len(ls: list[Any]) -> int: # pyright:ignore[reportExplicitAny]
return len(ls)
return dict(
literal_call=literal_call,
reltime=render_reltime,
@ -116,7 +130,8 @@ def template_utility_fns():
can_generate_sign_up_codes=can_generate_sign_up_codes,
render_as_markdown=render_as_markdown,
icon=icon,
cur_page_with_variables=cur_page_with_variables
cur_page_with_variables=cur_page_with_variables,
len=t_len
)
@app.route('/')

View file

@ -27,6 +27,7 @@ class NamespacePermissionType(IntFlag):
DELETE_ALL_TASKS = (1 << 10)
EDIT_TAGS = (1 << 12)
EDIT_FIELDS = (1 << 13)
EDIT_CATEGORIES = (1 << 15)
EDIT_ROLES = (1 << 11)
RENAME = (1 << 14)
ADMINISTRATE = (1 << 1)
@ -71,6 +72,7 @@ SELF_NAMESPACE_PERMISSIONS = (
| NamespacePermissionType.DELETE_ALL_TASKS
| NamespacePermissionType.EDIT_TAGS
| NamespacePermissionType.EDIT_FIELDS
| NamespacePermissionType.EDIT_CATEGORIES
| NamespacePermissionType.EDIT_ROLES
| NamespacePermissionType.RENAME
)
@ -78,7 +80,7 @@ SELF_NAMESPACE_PERMISSIONS = (
def user_friendly_name(perm: NamespacePermissionType|UserPermissionType):
match perm:
case NamespacePermissionType.READ:
return 'See the namespace.'
return 'See the Zone.'
case NamespacePermissionType.CREATE_TASKS_IN:
return 'Create tasks.'
case NamespacePermissionType.COMPLETE_ALL_TASKS:
@ -86,15 +88,19 @@ def user_friendly_name(perm: NamespacePermissionType|UserPermissionType):
case NamespacePermissionType.UNCOMPLETE_ALL_TASKS:
return 'Uncomplete tasks.'
case NamespacePermissionType.EDIT_ALL_TASKS:
return 'Edit tasks, including adding/removing tags and fields.'
return 'Edit tasks, including adding/removing tags, fields, and categories.'
case NamespacePermissionType.DELETE_ALL_TASKS:
return 'Delete tasks.'
case NamespacePermissionType.EDIT_TAGS:
return 'Create new tags and delete existing ones.'
case NamespacePermissionType.EDIT_FIELDS:
return 'Create new fields and delete existing ones.'
case NamespacePermissionType.EDIT_CATEGORIES:
return 'Create new categories and delete existing ones.'
case NamespacePermissionType.EDIT_ROLES:
return 'Manage roles lower than this one.'
case NamespacePermissionType.RENAME:
return 'Change the name and description of this Zone.'
case NamespacePermissionType.ADMINISTRATE:
return 'Administrator. Users with this role bypass all other checks.'
case UserPermissionType.READ_PROFILE:

View file

@ -4,7 +4,9 @@ from types import NoneType
from typing import Self
from sqlalchemy import ForeignKey, Integer, String
from sqlalchemy.orm import Mapped, mapped_column
from taskflower.db import db
from taskflower.db import db, db_fetch_by_id
from taskflower.db.model.namespace import Namespace
from taskflower.db.model.task import Task
from taskflower.types import AssertType, CheckType
from taskflower.types.either import Either, Left, Right
from taskflower.util.time import destringify_dt, stringify_dt
@ -28,6 +30,14 @@ class FieldType(Enum):
FLOAT = float
DATETIME = datetime
@classmethod
def from_str(cls, val: str) -> Either[Exception, FieldType]:
for k, v in cls._member_map_.items():
if k.upper() == val.upper():
return CheckType(FieldType)(v)
return Left(KeyError(f'FieldType key {val} not found!'))
def _check_len(v: str) -> Either[Exception, str]:
if len(v) <= FIELD_VAL_MAX_LEN:
return Right(v)
@ -41,6 +51,15 @@ class Tag(db.Model):
name : Mapped[str] = mapped_column(String(TAG_FIELD_NAME_MAX_LEN))
namespace : Mapped[int] = mapped_column(Integer, ForeignKey('namespace.id', ondelete='CASCADE', name='fk_tag_ns'))
class TagToTask(db.Model):
__tablename__: str = 'tag_to_task'
id : Mapped[int] = mapped_column(Integer, primary_key=True)
tag : Mapped[int] = mapped_column(Integer, ForeignKey('tag.id', ondelete='CASCADE', name='fk_t2t_tag'))
task : Mapped[int] = mapped_column(Integer, ForeignKey('task.id', ondelete='CASCADE', name='fk_t2t_task'))
class TaskCategory(db.Model):
__tablename__: str = 'category'
@ -48,26 +67,73 @@ class TaskCategory(db.Model):
name : Mapped[str] = mapped_column(String(TAG_FIELD_NAME_MAX_LEN))
namespace : Mapped[int] = mapped_column(Integer, ForeignKey('namespace.id', ondelete='CASCADE', name='fk_cat_ns'))
class CatToTask(db.Model):
__tablename__: str = 'cat_to_task'
id : Mapped[int] = mapped_column(Integer, primary_key=True)
cat : Mapped[int] = mapped_column(Integer, ForeignKey('category.id', ondelete='CASCADE', name='fk_c2t_cat'))
task : Mapped[int] = mapped_column(Integer, ForeignKey('task.id', ondelete='CASCADE', name='fk_c2t_task'))
class Field(db.Model):
__tablename__: str = 'field'
id : Mapped[int] = mapped_column(Integer, primary_key=True)
name : Mapped[str] = mapped_column(String(TAG_FIELD_NAME_MAX_LEN))
raw_val : Mapped[str] = mapped_column(String(FIELD_VAL_MAX_LEN))
raw_dtype : Mapped[str] = mapped_column(String(FIELD_TYPE_NAME_MAX_LEN))
namespace : Mapped[int] = mapped_column(Integer, ForeignKey('namespace.id', ondelete='CASCADE', name='fk_field_ns'))
@property
def dtype(self) -> FieldType:
for k, v in FieldType._value2member_map_.items(): # pyright:ignore[reportAny]
for k, v in FieldType._member_map_.items():
if self.raw_dtype == k:
return AssertType(FieldType)(v)
return FieldType.INVALID
@classmethod
def from_data(
cls,
name: str,
dtype: FieldType,
ns: Namespace
) -> Either[Exception, Field]:
if dtype == FieldType.INVALID:
return Left(
TypeError('Can\'t create a Field with an invalid data-type!')
)
return Right(cls(
name=name, # pyright:ignore[reportCallIssue]
raw_dtype=dtype.name, # pyright:ignore[reportCallIssue]
namespace=ns.id # pyright:ignore[reportCallIssue]
))
class FieldToTask(db.Model):
__tablename__: str = 'field_to_task'
id: Mapped[int] = mapped_column(Integer, primary_key=True)
field: Mapped[int] = mapped_column(Integer, ForeignKey('field.id', ondelete='CASCADE', name='fk_f2t_field'))
task: Mapped[int] = mapped_column(Integer, ForeignKey('task.id', ondelete='CASCADE', name='fk_f2t_task'))
raw_val: Mapped[str] = mapped_column(String(FIELD_VAL_MAX_LEN))
@property
def dtype(self) -> Either[Exception, FieldType]:
return db_fetch_by_id(
Field,
self.field,
db
).map(
lambda fld: fld.dtype
)
@property
def val(self) -> Either[Exception, FieldTypeAlias]:
match self.dtype:
if isinstance(self.dtype, Left):
return Left(self.dtype.val)
match self.dtype.assert_right().val:
case FieldType.INVALID:
return Left(TypeError(
'Field dtype is `FieldType.INVALID`!'
@ -106,9 +172,14 @@ class Field(db.Model):
)
def val_as_type[T](self, t: type[T]) -> Either[Exception, T]:
if self.dtype.value is not t:
if isinstance(self.dtype, Left):
return Left(self.dtype.val)
dtype = self.dtype.assert_right().val
if dtype.value is not t:
return Left(TypeError(
f'`Field` dtype is `{self.dtype.name}`, which corresponds to python type `{self.dtype.value.__name__}`, not `{t.__name__}`!'
f'`Field` dtype is `{dtype.name}`, which corresponds to python type `{dtype.value.__name__}`, not `{t.__name__}`!'
))
return self.val.flat_map(
@ -116,24 +187,22 @@ class Field(db.Model):
)
@classmethod
def from_vals(
def from_data(
cls,
name: str,
dtype: FieldType,
val: FieldTypeAlias,
namespace_id: int
field: Field,
task: Task,
val: FieldTypeAlias
) -> Either[Exception, Self]:
def _mkobj(raw_data: str) -> Either[Exception, Self]:
return _check_len(raw_data).map(
lambda v: cls(
name = name, # pyright:ignore[reportCallIssue]
raw_val = v, # pyright:ignore[reportCallIssue]
raw_dtype = dtype.name, # pyright:ignore[reportCallIssue]
namespace = namespace_id # pyright:ignore[reportCallIssue]
field = field.id, # pyright:ignore[reportCallIssue]
task = task.id, # pyright:ignore[reportCallIssue]
raw_val = v, # pyright:ignore[reportCallIssue]
)
)
match dtype:
match field.dtype:
case FieldType.INVALID:
return Left(KeyError(
'Can\'t insert a value of type `FieldType.INVALID` into the database!'
@ -159,13 +228,6 @@ class Field(db.Model):
lambda v: _mkobj(stringify_dt(v))
)
class TagToTask(db.Model):
__tablename__: str = 'tag_to_task'
id : Mapped[int] = mapped_column(Integer, primary_key=True)
tag : Mapped[int] = mapped_column(Integer, ForeignKey('tag.id', ondelete='CASCADE', name='fk_t2t_tag'))
task : Mapped[int] = mapped_column(Integer, ForeignKey('task.id', ondelete='CASCADE', name='fk_t2t_task'))
# Run a check on import to catch invalid `FieldType` members.
for v in FieldType._member_names_:

View file

@ -15,6 +15,7 @@ class Task(db.Model):
name : Mapped[str] = mapped_column(String(64))
description : Mapped[str] = mapped_column(String)
soft_due : Mapped[datetime|None] = mapped_column(DateTime(timezone=False), nullable=True)
due : Mapped[datetime] = mapped_column(DateTime(timezone=False))
created : Mapped[datetime] = mapped_column(DateTime(timezone=False), default=now)
complete : Mapped[bool] = mapped_column(Boolean, default=False)

View file

@ -19,6 +19,7 @@ class FormCreatesObject[T](Form):
``AuthorizationError`` if the action is not authorized.
'''
raise NotImplementedError
FormCObj = FormCreatesObject
class FormCreatesObjectWithUser[T](Form):
''' Trait that indicates that this ``Form`` cna be used to create an object,
@ -36,7 +37,8 @@ class FormCreatesObjectWithUser[T](Form):
create an object with the specified parameters.
'''
raise NotImplementedError
FormCObjUser = FormCreatesObjectWithUser
class FormEditsObjectWithUser[T](Form):
''' Trait that indicates that this ``Form`` can be used to edit an object
if provided with a ``User`` object.
@ -54,6 +56,7 @@ class FormEditsObjectWithUser[T](Form):
edit an object with the specified parameters.
'''
raise NotImplementedError()
FormEObjUser = FormEditsObjectWithUser
class FormCreatesObjectWithUserAndNamespace[T](Form):
''' Trait that indicates that this ``Form`` can be used to create an object,
@ -72,7 +75,8 @@ class FormCreatesObjectWithUserAndNamespace[T](Form):
create an object with the specified parameters.
'''
raise NotImplementedError
FormCObjUserNS = FormCreatesObjectWithUserAndNamespace
class FormEditsObjectWithUserAndNamespace[T](Form):
''' Trait that indicates that this ``Form`` can be used to edit an object
if provided with a ``User`` object and a ``Namespace``.
@ -91,6 +95,7 @@ class FormEditsObjectWithUserAndNamespace[T](Form):
edit an object with the specified parameters.
'''
raise NotImplementedError()
FormEObjUserNS = FormEditsObjectWithUserAndNamespace
class FormCEObjUserNS[T](
FormCreatesObjectWithUserAndNamespace[T],

View file

@ -0,0 +1,72 @@
from typing import final, override
from wtforms import Field, Form, StringField, ValidationError
from wtforms.validators import DataRequired, Length
from taskflower.auth.permission import NPT
from taskflower.auth.permission.checks import assert_user_perms_on_namespace
from taskflower.db import db
from taskflower.db.model.namespace import Namespace
from taskflower.db.model.tag import TAG_FIELD_NAME_MAX_LEN, TaskCategory
from taskflower.db.model.user import User
from taskflower.form import FormCObjUser
from taskflower.types import CheckType, ann
from taskflower.types.either import Either, Left
@final
class Unique:
def __init__(self, ns: Namespace):
self._ns = ns
def __call__(self, _: Form, fld: Field):
data = CheckType(str)(
fld.data # pyright:ignore[reportAny]
).val_or_raise(
lambda exc: ValidationError(f'Category name is of the wrong type: {str(exc)}!')
)
if db.session.query(
TaskCategory
).filter(
TaskCategory.namespace == self._ns.id
).filter(
TaskCategory.name == data
).one_or_none() is not None:
raise ValidationError('A category with that name already exists!')
def get_category_form_for(ns: Namespace):
class CategoryForm(FormCObjUser[TaskCategory]):
name: StringField = StringField(
'Category Name',
[
Length(
1,
TAG_FIELD_NAME_MAX_LEN,
f'Category name must be between 1 and {TAG_FIELD_NAME_MAX_LEN} characters.'
),
DataRequired(),
Unique(ns)
]
)
@override
def create_object(
self,
current_user: User
) -> Either[Exception, TaskCategory]:
if self.validate():
return assert_user_perms_on_namespace(
current_user,
ns,
NPT.EDIT_CATEGORIES
).map(
lambda _: TaskCategory(
name=ann(self.name.data), # pyright:ignore[reportCallIssue]
namespace=ns.id # pyright:ignore[reportCallIssue]
)
)
else:
return Left(
ValidationError('Category form failed validation!')
)
return CategoryForm

View file

@ -0,0 +1,90 @@
from typing import final, override
from wtforms import Field, Form, RadioField, StringField, ValidationError
from wtforms.validators import DataRequired, Length
from taskflower.auth.permission import NPT
from taskflower.auth.permission.checks import assert_user_perms_on_namespace
from taskflower.db import db
from taskflower.db.model.namespace import Namespace
from taskflower.db.model.tag import TAG_FIELD_NAME_MAX_LEN, Field as TagField, FieldType
from taskflower.db.model.user import User
from taskflower.form import FormCObjUser
from taskflower.types import CheckType, ann
from taskflower.types.either import Either, Left
@final
class Unique:
def __init__(self, ns: Namespace):
self._ns = ns
def __call__(self, _: Form, fld: Field):
data = CheckType(str)(
fld.data # pyright:ignore[reportAny]
).val_or_raise(
lambda exc: ValidationError(f'Field name is of the wrong type: {str(exc)}!')
)
if db.session.query(
TagField
).filter(
TagField.namespace == self._ns.id
).filter(
TagField.name == data
).one_or_none() is not None:
raise ValidationError('A field with that name already exists!')
def _gen_field_choices():
return [
(k, k.title())
for k, _ in FieldType._member_map_.items()
if k != 'INVALID'
]
def get_field_form_for(ns: Namespace):
class TagFieldForm(FormCObjUser[TagField]):
name: StringField = StringField(
'Tag Name',
[
Length(
1,
TAG_FIELD_NAME_MAX_LEN,
f'Tag name must be between 1 and {TAG_FIELD_NAME_MAX_LEN} characters.'
),
DataRequired(),
Unique(ns)
]
)
data_type: RadioField = RadioField(
'Field data type',
[
DataRequired()
],
choices=_gen_field_choices
)
@override
def create_object(
self,
current_user: User
) -> Either[Exception, TagField]:
if self.validate():
return assert_user_perms_on_namespace(
current_user,
ns,
NPT.EDIT_TAGS
).flat_map(
lambda _: CheckType(str)(self.data_type.data) # pyright:ignore[reportAny]
).flat_map(
lambda dtype_name: FieldType.from_str(dtype_name)
).flat_map(
lambda dtype: TagField.from_data(
ann(self.name.data),
dtype,
ns
)
)
else:
return Left(
ValidationError('Category form failed validation!')
)
return TagFieldForm

View file

@ -0,0 +1,72 @@
from typing import final, override
from wtforms import Field, Form, StringField, ValidationError
from wtforms.validators import DataRequired, Length
from taskflower.auth.permission import NPT
from taskflower.auth.permission.checks import assert_user_perms_on_namespace
from taskflower.db import db
from taskflower.db.model.namespace import Namespace
from taskflower.db.model.tag import TAG_FIELD_NAME_MAX_LEN, Tag
from taskflower.db.model.user import User
from taskflower.form import FormCObjUser
from taskflower.types import CheckType, ann
from taskflower.types.either import Either, Left
@final
class Unique:
def __init__(self, ns: Namespace):
self._ns = ns
def __call__(self, _: Form, fld: Field):
data = CheckType(str)(
fld.data # pyright:ignore[reportAny]
).val_or_raise(
lambda exc: ValidationError(f'Tag name is of the wrong type: {str(exc)}!')
)
if db.session.query(
Tag
).filter(
Tag.namespace == self._ns.id
).filter(
Tag.name == data
).one_or_none() is not None:
raise ValidationError('A tag with that name already exists!')
def get_tag_form_for(ns: Namespace):
class TagForm(FormCObjUser[Tag]):
name: StringField = StringField(
'Tag Name',
[
Length(
1,
TAG_FIELD_NAME_MAX_LEN,
f'Tag name must be between 1 and {TAG_FIELD_NAME_MAX_LEN} characters.'
),
DataRequired(),
Unique(ns)
]
)
@override
def create_object(
self,
current_user: User
) -> Either[Exception, Tag]:
if self.validate():
return assert_user_perms_on_namespace(
current_user,
ns,
NPT.EDIT_TAGS
).map(
lambda _: Tag(
name=ann(self.name.data), # pyright:ignore[reportCallIssue]
namespace=ns.id # pyright:ignore[reportCallIssue]
)
)
else:
return Left(
ValidationError('Category form failed validation!')
)
return TagForm

View file

@ -0,0 +1,97 @@
{% macro link_button_elem(obj_type, action_type, enabled, url, btn_icon, extra_cls='') %}
<td class="pad-even">
{% if enabled %}
<a
class="icon-only-btn {{extra_cls}}"
aria-label="{{action_type}} {{obj_type}}"
href="{{url}}"
>{{btn_icon|safe}}</a>
{% else %}
<a
class="icon-only-btn disabled"
aria-label="Can't {{action_type}} {{obj_type}}"
title="You don't have permission to {{action_type}} this {{obj_type}}"
>{{icon('not-allowed')|safe}}</a>
{% endif %}
</td>
{% endmacro %}
{% macro obj_entry(ns, obj, obj_type, edit_endpoint, delete_endpoint, tab, entries) %}
<tr>
{{
link_button_elem(
obj_type,
'edit',
obj.can_edit,
url_for(
edit_endpoint,
id=ns.id,
tid=obj.id,
next=cur_page_with_variables(
request,
active_tab=tab
)
),
icon('cog')
)
}}
{{
link_button_elem(
obj_type,
'delete',
obj.can_edit,
url_for(
delete_endpoint,
id=ns.id,
tid=obj.id,
next=cur_page_with_variables(
request,
active_tab=tab
)
),
icon('delete'),
'red'
)
}}
{% for entry in entries %}
<td>{{entry}}</td>
{% endfor %}
</tr>
{% endmacro %}
{% macro obj_colgroup(entries) %}
<colgroup>
<col span="1" style="width: 0;"/>
<col span="1" style="width: 0;"/>
{% for entry in entries %}
<col span="1"/>
{% endfor %}
</colgroup>
{% endmacro %}
{% macro obj_header(entries) %}
<tr>
<th>Edit</th>
<th>Del</th>
{% for entry in entries %}
<th>{{entry}}</th>
{% endfor %}
</tr>
{% endmacro %}
{% macro obj_add_row(obj_type, can_add, url, entries) %}
<tr>
<td colspan="{{len(entries) + 2}}">
{% if can_add %}
<a
class="link-btn icon-btn"
href="{{url}}"
>{{icon('add')|safe}} Add a new {{obj_type}}</a>
{% else %}
<a
class="link-btn icon-btn disabled"
>{{icon('not-allowed')|safe}} Add a new {{obj_type}}</a>
{% endif %}
</td>
</tr>
{% endmacro %}

View file

@ -43,12 +43,12 @@
{% endif %}
<table class="list">
<colgroup>
<col span="1" style="width: 0;"/>
<col span="1"/>
<col span="1"/>
</colgroup>
<tbody>
<colgroup>
<col span="1" style="width: 0;"/>
<col span="1"/>
<col span="1"/>
</colgroup>
<tr>
<th>Edit</th>
<th>Display Name</th>

View file

@ -0,0 +1,129 @@
{% from "_formhelpers.html" import render_field with context %}
{% from "namespace/admin/_helpers.html" import obj_entry, obj_header, obj_colgroup, obj_add_row with context %}
{% macro cat_entry(ns, cat) %}
{{
obj_entry(
ns,
cat,
'category',
'web.namespace.category.edit',
'web.namespace.category.delete',
'tags',
[
cat.name
]
)
}}
{% endmacro %}
{% macro tag_entry(ns, tag)%}
{{
obj_entry(
ns,
tag,
'tag',
'web.namespace.tag.edit',
'web.namespace.tag.delete',
'tags',
[
tag.name
]
)
}}
{% endmacro %}
{% macro field_entry(ns, field) %}
{{
obj_entry(
ns,
field,
'field',
'web.namespace.field.edit',
'web.namespace.field.delete',
'tags',
[
field.name,
field.field_type
]
)
}}
{% endmacro %}
<h1>Tags, Fields & Categories</h1>
<h2>Tags</h2>
<table class="list">
{{obj_colgroup(['Name'])}}
<tbody>
{{obj_header(['Name'])}}
{{obj_add_row(
'tag',
can_create_tag,
url_for(
'web.namespace.tag.new',
id=ns.id,
next=cur_page_with_variables(
request,
active_tab='tags'
)
),
['Name']
)}}
{% for tag in tags %}
{{tag_entry(ns, tag)}}
{% endfor %}
</tbody>
</table>
<hr style="margin-top: 3rem;"/>
<h2>Fields</h2>
<table class="list">
{{obj_colgroup(['Name', 'Type'])}}
<tbody>
{{obj_header(['Name', 'Type'])}}
{{obj_add_row(
'field',
can_create_field,
url_for(
'web.namespace.field.new',
id=ns.id,
next=cur_page_with_variables(
request,
active_tab='tags'
)
),
['Name', 'Type']
)}}
{% for field in fields %}
{{field_entry(ns, field)}}
{% endfor %}
</tbody>
</table>
<hr style="margin-top: 3rem;"/>
<h2>Categories</h2>
<table class="list">
{{obj_colgroup(['Name'])}}
<tbody>
{{obj_header(['Name'])}}
{{obj_add_row(
'category',
can_create_cat,
url_for(
'web.namespace.category.new',
id=ns.id,
next=cur_page_with_variables(
request,
active_tab='tags'
)
),
['Name']
)}}
{% for cat in cats %}
{{cat_entry(ns, cat)}}
{% endfor %}
</tbody>
</table>

View file

@ -0,0 +1,16 @@
{% extends "main.html" %}
{% from "_formhelpers.html" import render_field %}
{% block head_extras %}
<link rel="stylesheet" href="{{ url_for('static', filename='forms.css') }}"
{% endblock %}
{% block main_content %}
<form class="default-form" id="create-category-form" method="POST">
<h1>Create a Category</h1>
<dl>
{{ render_field(form.name) }}
</dl>
<button class="icon-btn green" id="submit-form" type="submit">{{icon('add')|safe}}CREATE CATEGORY</button>
</form>
{% endblock %}

View file

@ -0,0 +1,17 @@
{% extends "main.html" %}
{% from "_formhelpers.html" import render_field %}
{% block head_extras %}
<link rel="stylesheet" href="{{ url_for('static', filename='forms.css') }}"
{% endblock %}
{% block main_content %}
<form class="default-form" id="create-field-form" method="POST">
<h1>Create a Field</h1>
<dl>
{{ render_field(form.name) }}
{{ render_field(form.data_type) }}
</dl>
<button class="icon-btn green" id="submit-form" type="submit">{{icon('add')|safe}}CREATE FIELD</button>
</form>
{% endblock %}

View file

@ -7,11 +7,11 @@
{% block main_content %}
<form class="default-form" id="create-namespace-form" method="POST">
<h1>Create a Namespace</h1>
<h1>Create a Zone</h1>
<dl>
{{ render_field(form.name) }}
{{ render_field(form.description) }}
</dl>
<button class="icon-btn green" id="submit-form" type="submit">{{icon('add')|safe}}CREATE NAMESPACE</button>
<button class="icon-btn green" id="submit-form" type="submit">{{icon('add')|safe}}CREATE ZONE</button>
</form>
{% endblock %}

View file

@ -0,0 +1,16 @@
{% extends "main.html" %}
{% from "_formhelpers.html" import render_field %}
{% block head_extras %}
<link rel="stylesheet" href="{{ url_for('static', filename='forms.css') }}"
{% endblock %}
{% block main_content %}
<form class="default-form" id="create-tag-form" method="POST">
<h1>Create a Tag</h1>
<dl>
{{ render_field(form.name) }}
</dl>
<button class="icon-btn green" id="submit-form" type="submit">{{icon('add')|safe}}CREATE TAG</button>
</form>
{% endblock %}

View file

@ -73,7 +73,7 @@ class CheckType[T]:
def __call__(self, x: T|Any) -> Either[Exception, T]: # pyright:ignore[reportExplicitAny]
try:
org = get_origin(self._t) or NoneType
org = get_origin(self._t) or self._t
if isinstance(x, org):
return Right[Exception, T](x)
else:

View file

@ -71,6 +71,19 @@ class Either[L, R](ABC):
) -> X:
raise NotImplementedError
@abstractmethod
def val_or_raise(
self,
exc: Callable[[L], Exception]|None=None
) -> R:
''' Returns ``R`` if this is a ``Right``; otherwise, raise an exception.
If ``exc`` is defined, it will be raised in case of a ``Left``. If
not and if this is a ``Left[Exception, *]``, then its ``val`` will
be raised. Otherwise, ``AssertionError(str(val))`` will be raised.
'''
raise NotImplementedError
@staticmethod
def do_assert(
x: bool,
@ -149,6 +162,18 @@ class Left[L, R](Either[L, R]):
) -> X:
return if_not(self.val)
@override
def val_or_raise(
self,
exc: Callable[[L], Exception]|None=None
) -> R:
if exc:
raise exc(self.val)
elif isinstance(self.val, Exception):
raise self.val
else:
raise AssertionError(str(self.val))
@override
def __eq__(self, value: object) -> bool:
if isinstance(value, Left):
@ -222,6 +247,13 @@ class Right[L, R](Either[L, R]):
) -> X:
return if_okay(self.val)
@override
def val_or_raise(
self,
exc: Callable[[L], Exception]|None=None
) -> R:
return self.val
@override
def __eq__(self, value: object) -> bool:
if isinstance(value, Right):

View file

@ -17,7 +17,10 @@ from taskflower.types.either import Either, Left, Right, gather_successes
from taskflower.types.option import Option
from taskflower.web.errors import ResponseErrorNotFound, response_from_exception
from taskflower.web.namespace.admin import web_namespace_admin
from taskflower.web.namespace.category import web_namespace_cat
from taskflower.web.namespace.field import web_namespace_field
from taskflower.web.namespace.roles import web_namespace_roles
from taskflower.web.namespace.tag import web_namespace_tag
web_namespace = Blueprint(
'namespace',
@ -28,6 +31,9 @@ web_namespace = Blueprint(
web_namespace.register_blueprint(web_namespace_roles)
web_namespace.register_blueprint(web_namespace_admin)
web_namespace.register_blueprint(web_namespace_tag)
web_namespace.register_blueprint(web_namespace_cat)
web_namespace.register_blueprint(web_namespace_field)
@web_namespace.app_context_processor
def namespace_processor():

View file

@ -9,6 +9,7 @@ from taskflower.auth.violations import check_for_auth_err_and_report
from taskflower.db import db, db_fetch_by_id, do_commit
from taskflower.db.model.namespace import Namespace
from taskflower.db.model.role import NamespaceRole
from taskflower.db.model.tag import Field, Tag, TaskCategory
from taskflower.db.model.user import User
from taskflower.form.namespace import namespace_edit_form_for_ns
from taskflower.types import assert_usr
@ -45,6 +46,67 @@ def render_general_page(
)
)
def render_tags_page(
ns: Namespace,
cur_usr: User
) -> Either[Exception, str]:
can_edit_tags = check_user_perms_on_namespace(
cur_usr,
ns,
NPT.EDIT_TAGS
)
can_edit_fields = check_user_perms_on_namespace(
cur_usr,
ns,
NPT.EDIT_FIELDS
)
can_edit_cats = check_user_perms_on_namespace(
cur_usr,
ns,
NPT.EDIT_CATEGORIES
)
tags = [
{'name': t.name, 'id': t.id, 'can_edit': can_edit_tags}
for t in db.session.query(
Tag
).filter(
Tag.namespace == ns.id
).all()
]
fields = [
{
'name': f.name,
'id': f.id,
'field_type': f.dtype.name.title(),
'can_edit': can_edit_fields
}
for f in db.session.query(
Field
).filter(
Field.namespace == ns.id
).all()
]
cats = [
{'name': c.name, 'id': c.id, 'can_edit': can_edit_cats}
for c in db.session.query(
TaskCategory
).filter(
TaskCategory.namespace == ns.id
).all()
]
return Right(render_template(
'namespace/admin/tags_fields_cats_page.html',
can_create_tag=can_edit_tags,
can_create_field=can_edit_fields,
can_create_cat=can_edit_cats,
tags=tags,
fields=fields,
cats=cats,
ns=ns
))
@web_namespace_admin.route('/')
@login_required
def settings(id: int):
@ -60,6 +122,15 @@ def settings(id: int):
Namespace,
id,
db
).flat_map(
lambda ns: assert_user_perms_on_namespace(
cur_usr,
ns,
NPT.READ,
'Open namespace settings'
)
).lside_effect(
check_for_auth_err_and_report
).map(
lambda ns: (
ns,
@ -76,12 +147,19 @@ def settings(id: int):
).and_then(
lambda val: val,
lambda exc: None
),
render_tags_page(
ns,
cur_usr
).and_then(
lambda val: val,
lambda exc: None
)
)
)
if isinstance(res, Right):
ns, role_page, general_page = res.val
ns, role_page, general_page, tags_page = res.val
tfc_page = '<h1>Tags, Fields and Categories</h1>'
if not active_tab:
active_tab = (
@ -97,7 +175,7 @@ def settings(id: int):
return render_template(
'namespace/admin/base.html',
general_page=general_page,
tfc_page='<h1>Tags, Fields and Categories</h1>',
tfc_page=tags_page,
role_page=role_page,
namespace_id=ns.id,
namespace_name=ns.name,

View file

@ -0,0 +1,124 @@
from flask import Blueprint, redirect, render_template, request, url_for
from flask_login import current_user, login_required # pyright:ignore[reportUnknownVariableType,reportMissingTypeStubs]
from taskflower.auth.permission import NPT
from taskflower.auth.permission.checks import assert_user_perms_on_namespace
from taskflower.auth.violations import check_for_auth_err_and_report
from taskflower.db import db, db_fetch_by_id, do_delete
from taskflower.db.helpers import add_to_db
from taskflower.db.model.namespace import Namespace
from taskflower.db.model.tag import TaskCategory
from taskflower.form.category import get_category_form_for
from taskflower.types import assert_usr
from taskflower.types.either import Right
from taskflower.web.errors import response_from_exception
from taskflower.web.utils.request import get_next
web_namespace_cat = Blueprint(
'category',
__name__,
url_prefix='/<int:id>/cat'
)
@web_namespace_cat.route('/new', methods=['GET', 'POST'])
@login_required
def new(id: int):
cur_usr = assert_usr(current_user)
next = get_next(
request,
get_ns:=url_for('web.namespace.get', id=id)
).and_then(
lambda val: val,
lambda err: get_ns
)
res = db_fetch_by_id(
Namespace,
id,
db
).flat_map(
lambda ns: assert_user_perms_on_namespace(
cur_usr,
ns,
NPT.EDIT_CATEGORIES,
'Create category'
).lside_effect(
check_for_auth_err_and_report
)
)
if isinstance(res, Right):
ns = res.val
form = get_category_form_for(ns)(request.form)
if request.method == 'POST' and form.validate():
return form.create_object(
cur_usr
).lside_effect(
check_for_auth_err_and_report
).flat_map(
add_to_db
).and_then(
lambda _: redirect(next),
lambda exc: response_from_exception(exc)
)
else:
return render_template(
'namespace/category/new.html',
form=form
)
else:
return response_from_exception(
res.assert_left().val
)
@web_namespace_cat.route('/<int:tid>/edit', methods=['GET', 'POST'])
@login_required
def edit(id: int, tid: int):
next = get_next(
request,
get_ns:=url_for('web.namespace.get', id=id)
).and_then(
lambda val: val,
lambda err: get_ns
)
return redirect(next)
@web_namespace_cat.route('/<int:tid>/delete')
@login_required
def delete(id: int, tid: int):
cur_usr = assert_usr(current_user)
next = get_next(
request,
get_ns:=url_for('web.namespace.get', id=id)
).and_then(
lambda val: val,
lambda err: get_ns
)
return db_fetch_by_id(
Namespace,
id,
db
).flat_map(
lambda ns: assert_user_perms_on_namespace(
cur_usr,
ns,
NPT.EDIT_CATEGORIES,
'Delete category'
).lside_effect(
check_for_auth_err_and_report
).flat_map(
lambda _: db_fetch_by_id(
TaskCategory,
tid,
db
)
).flat_map(
lambda cat: do_delete(cat, db)
)
).and_then(
lambda _: redirect(next),
lambda exc: response_from_exception(exc)
)

View file

@ -0,0 +1,124 @@
from flask import Blueprint, redirect, render_template, request, url_for
from flask_login import current_user, login_required # pyright:ignore[reportUnknownVariableType,reportMissingTypeStubs]
from taskflower.auth.permission import NPT
from taskflower.auth.permission.checks import assert_user_perms_on_namespace
from taskflower.auth.violations import check_for_auth_err_and_report
from taskflower.db import db, db_fetch_by_id, do_delete
from taskflower.db.helpers import add_to_db
from taskflower.db.model.namespace import Namespace
from taskflower.db.model.tag import Field
from taskflower.form.field import get_field_form_for
from taskflower.types import assert_usr
from taskflower.types.either import Right
from taskflower.web.errors import response_from_exception
from taskflower.web.utils.request import get_next
web_namespace_field = Blueprint(
'field',
__name__,
url_prefix='/<int:id>/field'
)
@web_namespace_field.route('/new', methods=['GET', 'POST'])
@login_required
def new(id: int):
cur_usr = assert_usr(current_user)
next = get_next(
request,
get_ns:=url_for('web.namespace.get', id=id)
).and_then(
lambda val: val,
lambda err: get_ns
)
res = db_fetch_by_id(
Namespace,
id,
db
).flat_map(
lambda ns: assert_user_perms_on_namespace(
cur_usr,
ns,
NPT.EDIT_FIELDS,
'Create field'
).lside_effect(
check_for_auth_err_and_report
)
)
if isinstance(res, Right):
ns = res.val
form = get_field_form_for(ns)(request.form)
if request.method == 'POST' and form.validate():
return form.create_object(
cur_usr
).lside_effect(
check_for_auth_err_and_report
).flat_map(
add_to_db
).and_then(
lambda _: redirect(next),
lambda exc: response_from_exception(exc)
)
else:
return render_template(
'namespace/field/new.html',
form=form
)
else:
return response_from_exception(
res.assert_left().val
)
@web_namespace_field.route('/<int:tid>/edit', methods=['GET', 'POST'])
@login_required
def edit(id: int, tid: int):
next = get_next(
request,
get_ns:=url_for('web.namespace.get', id=id)
).and_then(
lambda val: val,
lambda err: get_ns
)
return redirect(next)
@web_namespace_field.route('/<int:tid>/delete')
@login_required
def delete(id: int, tid: int):
cur_usr = assert_usr(current_user)
next = get_next(
request,
get_ns:=url_for('web.namespace.get', id=id)
).and_then(
lambda val: val,
lambda err: get_ns
)
return db_fetch_by_id(
Namespace,
id,
db
).flat_map(
lambda ns: assert_user_perms_on_namespace(
cur_usr,
ns,
NPT.EDIT_FIELDS,
'Delete field'
).lside_effect(
check_for_auth_err_and_report
).flat_map(
lambda _: db_fetch_by_id(
Field,
tid,
db
)
).flat_map(
lambda cat: do_delete(cat, db)
)
).and_then(
lambda _: redirect(next),
lambda exc: response_from_exception(exc)
)

View file

@ -0,0 +1,124 @@
from flask import Blueprint, redirect, render_template, request, url_for
from flask_login import current_user, login_required # pyright:ignore[reportUnknownVariableType,reportMissingTypeStubs]
from taskflower.auth.permission import NPT
from taskflower.auth.permission.checks import assert_user_perms_on_namespace
from taskflower.auth.violations import check_for_auth_err_and_report
from taskflower.db import db, db_fetch_by_id, do_delete
from taskflower.db.helpers import add_to_db
from taskflower.db.model.namespace import Namespace
from taskflower.db.model.tag import Tag
from taskflower.form.tag import get_tag_form_for
from taskflower.types import assert_usr
from taskflower.types.either import Right
from taskflower.web.errors import response_from_exception
from taskflower.web.utils.request import get_next
web_namespace_tag = Blueprint(
'tag',
__name__,
url_prefix='/<int:id>/tag'
)
@web_namespace_tag.route('/new', methods=['GET', 'POST'])
@login_required
def new(id: int):
cur_usr = assert_usr(current_user)
next = get_next(
request,
get_ns:=url_for('web.namespace.get', id=id)
).and_then(
lambda val: val,
lambda err: get_ns
)
res = db_fetch_by_id(
Namespace,
id,
db
).flat_map(
lambda ns: assert_user_perms_on_namespace(
cur_usr,
ns,
NPT.EDIT_TAGS,
'Create tag'
).lside_effect(
check_for_auth_err_and_report
)
)
if isinstance(res, Right):
ns = res.val
form = get_tag_form_for(ns)(request.form)
if request.method == 'POST' and form.validate():
return form.create_object(
cur_usr
).lside_effect(
check_for_auth_err_and_report
).flat_map(
add_to_db
).and_then(
lambda _: redirect(next),
lambda exc: response_from_exception(exc)
)
else:
return render_template(
'namespace/tag/new.html',
form=form
)
else:
return response_from_exception(
res.assert_left().val
)
@web_namespace_tag.route('/<int:tid>/edit', methods=['GET', 'POST'])
@login_required
def edit(id: int, tid: int):
next = get_next(
request,
get_ns:=url_for('web.namespace.get', id=id)
).and_then(
lambda val: val,
lambda err: get_ns
)
return redirect(next)
@web_namespace_tag.route('/<int:tid>/delete')
@login_required
def delete(id: int, tid: int):
cur_usr = assert_usr(current_user)
next = get_next(
request,
get_ns:=url_for('web.namespace.get', id=id)
).and_then(
lambda val: val,
lambda err: get_ns
)
return db_fetch_by_id(
Namespace,
id,
db
).flat_map(
lambda ns: assert_user_perms_on_namespace(
cur_usr,
ns,
NPT.EDIT_TAGS,
'Delete tag'
).lside_effect(
check_for_auth_err_and_report
).flat_map(
lambda _: db_fetch_by_id(
Tag,
tid,
db
)
).flat_map(
lambda tag: do_delete(tag, db)
)
).and_then(
lambda _: redirect(next),
lambda exc: response_from_exception(exc)
)