Finish basic namespace / permission implementation

This commit is contained in:
digimint 2025-11-17 23:12:09 -06:00
parent 1204e50c52
commit fe871625f4
Signed by: digimint
GPG key ID: 8DF1C6FD85ABF748
30 changed files with 893 additions and 245 deletions

View file

@ -1,3 +1,4 @@
from typing import Any
from flask import Flask, render_template
from taskflower.auth import taskflower_login_manager
@ -23,6 +24,26 @@ app.register_blueprint(hibp_bp)
APIBase.register(app)
# print(f'Routes: \n{"\n".join([str(r) for r in APIBase.routes])}')
@app.context_processor
def template_utility_fns():
def literal_call(fname: str, *args: Any) -> str: # pyright:ignore[reportAny,reportExplicitAny]
# Generate a hard-coded function call.
# e.g. ``literal_call('set_active', id)`` will convert ``id`` to a
# string ('12345' in this example) and then generate a string:
#
# >>> literal_call('set_active', 12345)
# 'set_active(\'12345\')'
return (
fname + '('
+ ','.join([f'{str(a)}' for a in args]) # pyright:ignore[reportAny]
+ ')'
)
return dict(
literal_call=literal_call
)
@app.route('/')
def index():
return render_template('home.html')

View file

@ -156,8 +156,4 @@ def password_breach_count(password: str) -> Option[int]:
lambda: _get_breach_count_through_local_cache(hash)
)
case HIBPMode.LOCAL_ONLY:
return _get_breach_count_through_local_cache(hash)
def report_incorrect_login_attempt(user: User):
# TODO: Implement account lockout
log.warning(f'Incorrect login attempt for user {user.username}!')
return _get_breach_count_through_local_cache(hash)

View file

@ -0,0 +1,5 @@
def not_found_or_not_authorized(
res_type: str,
res_id: str
) -> str:
return f'{res_type} with id {res_id} was not found, or you don\'t have permission to view it.'

View file

@ -1,12 +1,16 @@
from enum import IntFlag
from typing import override
from taskflower.db import db
from taskflower.db.model.namespace import Namespace
from taskflower.db.model.role import NamespaceRole, UserRole, UserToNamespaceRole
from taskflower.db.model.task import Task
from taskflower.db.model.user import User
from taskflower.types.either import Either, Left, Right
from taskflower.types.option import Option, Some
ProtectedResourceType = User|Task|Namespace|NamespaceRole|UserRole
class NamespacePermissionType(IntFlag):
NO_PERMS = 0
@ -147,4 +151,35 @@ def initialize_user_permissions(user: User):
)
).map(
lambda _: user
)
)
class AuthorizationError(Exception):
def __init__(
self,
user : User|Option[User]|None,
resource : ProtectedResourceType,
action : str,
user_perms : NamespacePermissionType,
required_perms : NamespacePermissionType
) -> None:
self.user : Option[User] = Option[User].ensure(user)
self.resource : ProtectedResourceType = resource
self.action : str = action
self.user_perms : NamespacePermissionType = user_perms
self.required_perms : NamespacePermissionType = required_perms
super().__init__(str(self))
@override
def __str__(self) -> str:
user_str = (
f'User ``{self.user.val.display_name}`` (username ``{self.user.val.username}``; id ``{self.user.val.id}``)'
) if isinstance(self.user, Some) else (
'Anonymous User'
)
return (
f'AuthorizationError: {user_str} tried to perform '
+ f'action ``{self.action}`` on {type(self.resource)} object '
+ f'``{str(self.resource)}``.'
+ f'\n - Required permissions: {str(self.required_perms)}'
+ f'\n - User has permissions: {str(self.user_perms)}'
)

View file

@ -0,0 +1,70 @@
from taskflower.auth.permission import AuthorizationError, NamespacePermissionType
from taskflower.auth.permission.lookups import get_user_perms_on_namespace, get_user_perms_on_task
from taskflower.db.model.namespace import Namespace
from taskflower.db.model.user import User
from taskflower.db.model.task import Task
from taskflower.types.either import Either, Left, Right
from taskflower.types.option import Some
def check_user_perms_on_namespace(
usr: User,
ns: Namespace,
perms: NamespacePermissionType
) -> bool:
return (
(nsperms:=get_user_perms_on_namespace(usr, ns) & perms)
== perms
) or NamespacePermissionType.ADMINISTRATE in nsperms
def assert_user_perms_on_namespace(
usr: User,
ns: Namespace,
perms: NamespacePermissionType,
action: str = '[unspecified action]'
) -> Either[Exception, Namespace]:
return (
(
Right(ns)
) if check_user_perms_on_namespace(usr, ns, perms)
else (
Left(AuthorizationError(
Some(usr),
ns,
action,
get_user_perms_on_namespace(usr, ns),
perms
))
)
)
def check_user_perms_on_task(
usr: User,
tsk: Task,
perms: NamespacePermissionType
) -> bool:
return (
(tperms:=get_user_perms_on_task(usr, tsk) & perms)
== perms
) or NamespacePermissionType.ADMINISTRATE in tperms
def assert_user_perms_on_task(
usr: User,
tsk: Task,
perms: NamespacePermissionType,
action: str = '[unspecified action]'
) -> Either[Exception, Task]:
return (
(
Right(tsk)
) if check_user_perms_on_task(usr, tsk, perms)
else(
Left(AuthorizationError(
Some(usr),
tsk,
action,
get_user_perms_on_task(usr, tsk),
perms
))
)
)

View file

@ -1,7 +1,10 @@
from taskflower.auth.permission import NamespacePermissionType
from taskflower.auth.permission.resolve import resolve_perms_on_namespace, resolve_perms_on_task
from taskflower.db import db
from taskflower.db.model.namespace import Namespace
from taskflower.db.model.role import NamespaceRole, UserToNamespaceRole
from taskflower.db.model.task import Task
from taskflower.db.model.user import User
def get_namespaces_for_user(user: User):
@ -15,4 +18,74 @@ def get_namespaces_for_user(user: User):
NamespaceRole.id == UserToNamespaceRole.role
).filter(
UserToNamespaceRole.user == user.id
).all()
).all()
def get_tasks_for_user(user: User) -> list[Task]:
return db.session.query(
Task
).join(
Namespace,
Namespace.id == Task.namespace
).join(
NamespaceRole,
Namespace.id == NamespaceRole.namespace
).join(
UserToNamespaceRole,
NamespaceRole.id == UserToNamespaceRole.role
).filter(
UserToNamespaceRole.user == user.id
).all()
def namespaces_where_user_can(
user: User,
permissions: NamespacePermissionType
):
return [
ns
for ns in get_namespaces_for_user(user)
if (
(nperms:=resolve_perms_on_namespace(user, ns) & permissions)
== permissions
) or (
NamespacePermissionType.ADMINISTRATE in nperms
)
]
def tasks_where_user_can(
user: User,
permissions: NamespacePermissionType
):
return [
tsk
for tsk in get_tasks_for_user(user)
if (
(tperms:=resolve_perms_on_task(user, tsk) & permissions)
== permissions
) or (
NamespacePermissionType.ADMINISTRATE in tperms
)
]
def get_user_perms_on_task(
usr: User,
tsk: Task
) -> NamespacePermissionType:
ns = db.session.query(
Namespace
).filter(
Namespace.id == tsk.namespace
).one()
return resolve_perms_on_namespace(
usr,
ns
)
def get_user_perms_on_namespace(
usr: User,
ns: Namespace
) -> NamespacePermissionType:
return resolve_perms_on_namespace(
usr,
ns
)

View file

@ -4,7 +4,9 @@ from taskflower.auth.permission import NamespacePermissionType
from taskflower.db import db
from taskflower.db.model.namespace import Namespace
from taskflower.db.model.role import NamespaceRole, UserToNamespaceRole
from taskflower.db.model.task import Task
from taskflower.db.model.user import User
from taskflower.types.option import Option
def resolve_perms_on_namespace(
user: User,
@ -37,4 +39,22 @@ def resolve_perms_on_namespace(
& ~(role.perms_deny)
)
return perms
return perms
def resolve_perms_on_task(
user: User,
task: Task
):
return Option[Namespace].encapsulate(
db.session.query(
Namespace
).filter(
Namespace.id == task.namespace
).one_or_none()
).and_then(
lambda ns: resolve_perms_on_namespace(
user,
ns
),
lambda: NamespacePermissionType.NO_PERMS
)

View file

@ -0,0 +1,20 @@
import logging
from taskflower.auth.permission import AuthorizationError
from taskflower.db.model.user import User
log = logging.getLogger(__name__)
def report_incorrect_login_attempt(user: User):
# TODO: Implement account lockout
log.warning(f'Incorrect login attempt for user {user.username}!')
def report_authorization_error(vio: AuthorizationError):
log.warning(f'[ACCESS VIOLATION] {str(vio)}')
def check_for_auth_err_and_report(vio: Exception):
''' Checks if an ``Exception`` is an ``AuthorizationError``. If it is,
calls ``report_authorization_error()``
'''
if isinstance(vio, AuthorizationError):
report_authorization_error(vio)

View file

@ -2,47 +2,10 @@ from datetime import datetime
from sqlalchemy.sql import func
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import Boolean, ForeignKey, Integer, String, DateTime
from typing import Self, override
from wtforms import DateTimeLocalField, Form, StringField, validators
import humanize
from taskflower.db import db
from taskflower.db.model.user import User
from taskflower.types import JSONSerializeableDict
from taskflower.types.either import Either, Left, Right
from taskflower.types.resource import APISerializable
def _due_str(due: datetime) -> str:
now = datetime.now()
delta = datetime.now() - due
if now > due:
return humanize.naturaldelta(
delta
) + ' ago'
else:
return 'in ' + humanize.naturaldelta(
delta
)
class TaskForm(Form):
name: StringField = StringField(
'Task Name',
[
validators.Length(min=1, message='Task name is too short!')
]
)
due: DateTimeLocalField = DateTimeLocalField(
'Due Date'
)
description: StringField = StringField(
'Description',
[
validators.Optional()
]
)
class Task(db.Model, APISerializable):
class Task(db.Model):
__tablename__: str = 'task'
id: Mapped[int] = mapped_column(Integer, primary_key=True)
@ -51,47 +14,4 @@ class Task(db.Model, APISerializable):
due: Mapped[datetime] = mapped_column(DateTime(timezone=True))
created: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
complete: Mapped[bool] = mapped_column(Boolean, default=False)
owner: Mapped[int] = mapped_column(Integer, ForeignKey('user.id'))
@override
def _serialize(self) -> Either[Exception, JSONSerializeableDict]:
return Right(
{
'id' : self.id,
'name' : self.name,
'due' : str(self.due),
'due_rel' : _due_str(self.due),
'description' : self.description,
'created' : str(self.created),
'complete' : self.complete
}
)
form: type[Form] = TaskForm
@classmethod
def from_form(
cls: type[Self],
form_data: Form,
current_user: User
) -> Either[Exception, Self]:
if isinstance(form_data, TaskForm):
if form_data.validate():
return Right(cls(
name=form_data.name.data, # pyright:ignore[reportCallIssue]
due=form_data.due.data, # pyright:ignore[reportCallIssue]
description=( # pyright:ignore[reportCallIssue]
form_data.description.data
if form_data.description.data
else ''
),
owner=current_user.id # pyright:ignore[reportCallIssue]
))
else:
return Left(ValueError(
'``form_data`` failed validation!'
))
else:
return Left(TypeError(
'``form_data`` must be a subclass of ``TaskForm``'
))
namespace: Mapped[int] = mapped_column(Integer, ForeignKey('namespace.id'))

View file

@ -13,7 +13,7 @@ class User(db.Model, UserMixin, APISerializable):
__tablename__: str = 'user'
id: Mapped[int] = mapped_column(Integer, primary_key=True)
username: Mapped[str] = mapped_column(String, unique=True)
username: Mapped[str] = mapped_column(String(32), unique=True)
display_name: Mapped[str] = mapped_column(String(256))
created: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())

View file

@ -0,0 +1,37 @@
from wtforms import Form
from taskflower.db.model.user import User
from taskflower.types.either import Either
class FormCreatesObject[T](Form):
''' Trait that indicates that this ``Form`` can be used directly to create
an object.
'''
def create_object(
self
) -> Either[Exception, T]:
''' Try to create a ``T``.
This function checks for authorization, and will return an
``AuthorizationError`` if the action is not authorized.
'''
raise NotImplementedError
class FormCreatesObjectWithUser[T](Form):
''' Trait that indicates that this ``Form`` cna be used to create an object,
if provided with a ``User`` object.
'''
def create_object(
self,
current_user: User # pyright:ignore[reportUnusedParameter]
) -> Either[Exception, T]:
''' Try to create a ``T`` on behalf of ``current_user``.
This function checks for authorization, and will return an
``AuthorizationError`` if ``current_user`` is not authorized to
create an object with the specified parameters.
'''
raise NotImplementedError

View file

@ -0,0 +1,89 @@
from typing import override
from wtforms import DateTimeLocalField, SelectField, StringField, ValidationError, validators
from taskflower.auth.permission import NamespacePermissionType
from taskflower.auth.permission.checks import assert_user_perms_on_namespace
from taskflower.auth.permission.lookups import namespaces_where_user_can
from taskflower.db import db
from taskflower.db.model.namespace import Namespace
from taskflower.db.model.task import Task
from taskflower.db.model.user import User
from taskflower.form import FormCreatesObjectWithUser
from taskflower.types.either import Either, Left, Right
from taskflower.types.option import Option
def task_form_for_user(
user: User
) -> type[FormCreatesObjectWithUser[Task]]:
namespace_choices = [
(ns.id, ns.name)
for ns in namespaces_where_user_can(
user,
NamespacePermissionType.CREATE_IN
)
]
class TaskForm(FormCreatesObjectWithUser[Task]):
name: StringField = StringField(
'Task Name',
[
validators.Length(min=1, message='Task name is too short!')
]
)
due: DateTimeLocalField = DateTimeLocalField(
'Due Date'
)
namespace: SelectField = SelectField(
'Select a namespace',
[
validators.DataRequired('Invalid namespace')
],
choices=namespace_choices,
coerce=int
)
description: StringField = StringField(
'Description',
[
validators.Optional()
]
)
@override
def create_object(
self,
current_user: User
) -> Either[Exception, Task]:
if self.validate():
return Option[Namespace].encapsulate(
db.session.query(
Namespace
).filter(
Namespace.id == self.namespace.data # pyright:ignore[reportAny]
).one_or_none()
).and_then(
lambda val: Right[Exception, Namespace](val),
lambda: Left[Exception, Namespace](KeyError('Namespace not found!'))
).flat_map(
lambda ns: assert_user_perms_on_namespace(
current_user,
ns,
NamespacePermissionType.CREATE_IN,
'Create task'
)
).map(
lambda ns: Task(
name=self.name.data, # pyright:ignore[reportCallIssue]
due=self.due.data, # pyright:ignore[reportCallIssue]
description=( # pyright:ignore[reportCallIssue]
self.description.data
if self.description.data
else ''
),
namespace=ns.id # pyright:ignore[reportCallIssue]
)
)
else:
return Left(ValidationError('Form data did not validate correctly!'))
return TaskForm

View file

View file

@ -0,0 +1,74 @@
from dataclasses import dataclass
from taskflower.auth.permission import NamespacePermissionType
from taskflower.auth.permission.resolve import resolve_perms_on_namespace
from taskflower.db.model.namespace import Namespace
from taskflower.db.model.user import User
from taskflower.sanitize.task import TaskForUser
from taskflower.types.either import Either, Left, Right
@dataclass(frozen=True)
class NamespacePermsForUser:
read: bool
create_in: bool
complete_own: bool
complete_all: bool
uncomplete_own: bool
uncomplete_all: bool
edit_own: bool
edit_all: bool
delete_own: bool
delete_all: bool
edit_roles: bool
administrate: bool
@dataclass(frozen=True)
class NamespaceForUser():
id: int
name: str
description: str
perms: NamespacePermsForUser
tasks: list[TaskForUser]
@staticmethod
def from_user(
ns: Namespace,
user: User,
tasks: list[TaskForUser]|None = None
) -> Either[Exception, 'NamespaceForUser']:
perms = resolve_perms_on_namespace(
user,
ns
)
if not (
NamespacePermissionType.READ in perms
or NamespacePermissionType.ADMINISTRATE in perms
):
return Left(KeyError('No such namespace or insufficient permissions.'))
return Right(
NamespaceForUser(
ns.id,
ns.name,
ns.description,
NamespacePermsForUser(
NamespacePermissionType.READ in perms,
NamespacePermissionType.CREATE_IN in perms,
NamespacePermissionType.COMPLETE_OWN in perms,
NamespacePermissionType.COMPLETE_ALL in perms,
NamespacePermissionType.UNCOMPLETE_OWN in perms,
NamespacePermissionType.UNCOMPLETE_ALL in perms,
NamespacePermissionType.EDIT_OWN in perms,
NamespacePermissionType.EDIT_ALL in perms,
NamespacePermissionType.DELETE_OWN in perms,
NamespacePermissionType.DELETE_ALL in perms,
NamespacePermissionType.EDIT_ROLES in perms,
NamespacePermissionType.ADMINISTRATE in perms
),
(
tasks if tasks
else []
)
)
)

View file

@ -0,0 +1,66 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Self
import humanize
from taskflower.auth.permission import NamespacePermissionType
from taskflower.auth.permission.checks import assert_user_perms_on_task
from taskflower.db.model.task import Task
from taskflower.db.model.user import User
from taskflower.types.either import Either
def _due_str(due: datetime) -> str:
now = datetime.now()
delta = datetime.now() - due
if now > due:
return humanize.naturaldelta(
delta
) + ' ago'
else:
return 'in ' + humanize.naturaldelta(
delta
)
@dataclass(frozen=True)
class TaskForUser:
id: int
name: str
description: str
due: datetime
due_rel: str
created: datetime
complete: bool
namespace_id: int
@classmethod
def from_task(
cls,
tsk: Task,
usr: User
) -> Either[Exception, Self]:
''' Returns a user-sanitized version of the task.
This function performs authorization checks on ``usr`` and will
return ``Left(AuthorizationError)`` if the action is unauthorized.
However, it is the caller's responsibility to report the violation
if warranted (use ``taskflower.auth.violations.check_for_auth_error_and_report``
in an ``lside_effect()``)
'''
return assert_user_perms_on_task(
usr,
tsk,
NamespacePermissionType.READ,
'Retrieve task'
).map(
lambda val: cls(
tsk.id,
tsk.name,
tsk.description,
tsk.due,
_due_str(tsk.due),
tsk.created,
tsk.complete,
tsk.namespace
)
)

View file

@ -52,8 +52,53 @@
font-style: italic;
}
.list .detail-view-elem h1 {
margin: 0;
.list .detail-view-elem hr {
color: var(--on-table-row);
}
.list .detail-view-elem .detail-view-header h1 {
margin: 0 1rem;
font-size: larger;
}
.list .detail-view-elem .detail-view-header {
display: flex;
margin-bottom: 1rem;
}
.list .detail-view-elem .detail-view-header::before{
content: "";
flex: 1 1;
background: repeating-linear-gradient(
60deg,
var(--on-table-row) 0,
var(--on-table-row) 0.25rem,
transparent 0.35rem,
transparent 1.0rem,
var(--on-table-row) 1.1rem
);
width: 100%;
max-width: 2rem;
}
.list .detail-view-elem .detail-view-header::after{
content: "";
flex: 1 1;
background: repeating-linear-gradient(
60deg,
var(--on-table-row) 0,
var(--on-table-row) 0.25rem,
transparent 0.35rem,
transparent 1.0rem,
var(--on-table-row) 1.1rem
);
width: 100%;
}
.list .detail-view-elem .main-description {

View file

@ -54,12 +54,8 @@ a {
color: var(--on-block-1);
display: flex;
flex-direction: column;
}
#sidebar a {
display: block;
border-top: 1px solid var(--on-block-1);
border-bottom: 1px solid var(--on-block-1);
max-width: 20%;
overflow: clip;
}
#sidebar-top {
@ -68,12 +64,25 @@ a {
}
#sidebar a {
display: block;
border-top: 1px solid var(--on-block-1);
border-bottom: 1px solid var(--on-block-1);
color: var(--on-block-1);
text-decoration: none;
font-family: var(--header-font);
padding: 0.5rem 2rem;
}
#sidebar-namespace-list, #sidebar-spacer {
flex: 1 1;
border-top: 1px solid var(--on-block-1);
border-bottom: 1px solid var(--on-block-1);
}
#sidebar-namespace-list a {
border: none;
}
#footer {
background-color: var(--footer);
color: var(--on-footer);
@ -125,10 +134,4 @@ h2 {
h3 {
font-size: large;
font-style: italic;
}
#sidebar-spacer {
flex: 1 1;
border-top: 1px solid var(--on-block-1);
border-bottom: 1px solid var(--on-block-1);
}

View file

@ -0,0 +1,11 @@
{% macro namespace_entry(ns) %}
<a href={{ url_for('web.namespace.get', id=ns.id) }}>{{ ns.name }}</a>
{% endmacro %}
{% macro namespace_list(namespaces) %}
<div id="sidebar-namespace-list">
{% for ns in namespaces %}
{{ namespace_entry(ns) }}
{% endfor %}
</div>
{% endmacro %}

View file

@ -1,4 +1,6 @@
{% extends "base.html" %}
{% from "_namespace-list.html" import namespace_list %}
{% block raw_title %}{% block title %}{% endblock %} | Taskflower🌺{% endblock %}
{% block content %}
<div id="sidebar">
@ -6,7 +8,11 @@
<a id="sidebar-top" href="/">Taskflower 🌺</a>
<a href={{ url_for("web.task.all") }}>My Tasks</a>
<a href={{ url_for("web.task.new") }}>Create Task</a>
{% if current_user.is_authenticated %}
{{ namespace_list(fetch_namespaces(current_user)) }}
{% else %}
<div id="sidebar-spacer"></div>
{% endif %}
{% if current_user.is_authenticated %}
<a href={{ url_for(
"web.user.profile",

View file

@ -0,0 +1,18 @@
{% extends "main.html" %}
{% from "task/_tasklist.html" import task_list, task_list_script with context %}
{% block head_extras %}
<link rel="stylesheet" href={{ url_for("static", filename="list-view.css") }} />
{% endblock %}
{% block title %}{{ namespace.name }}{% endblock %}
{% block main_content %}
<h1>{{ namespace.name }}</h1>
<p>{{ namespace.description }}</p>
<h2>Tasks</h2>
{{ task_list(namespace.tasks) }}
{{ task_list_script() }}
{% endblock %}

View file

@ -10,6 +10,7 @@
{{ render_field(form.name) }}
{{ render_field(form.due) }}
{{ render_field(form.description) }}
{{ render_field(form.namespace) }}
</dl>
<p><input type="submit">Create Task</input></p>
</form>

View file

@ -1,43 +1,46 @@
{% macro inline_task_header() %}
{% macro inline_task_header(list_id=0) %}
<tr class="task-header-row">
<th class="check-hdr" id="check-header"></th>
<th class="name-hdr" id="task-name-hdr">
<th class="check-hdr" id={{ "check-header-"~list_id }}></th>
<th class="name-hdr" id={{ "task-name-hdr-"~list_id }}>
<h3>Task</h3>
</th>
<th class="due-hdr" id="task-due-hdr">
<th class="due-hdr" id={{ "task-due-hdr"~list_id }}>
<h3>Due</h3>
</th>
</tr>
{% endmacro %}
{% macro inline_task(task) %}
<tr class="task-table-row" id={{ "row-"~task.id }}>
<td class="checkbox" id={{ "check-"~task.id }}>
{% macro inline_task(task, list_id=0) %}
<tr class="task-table-row" id="{{ tlist_cid('row', task.id, list_id) }}">
<td class="checkbox" id="{{ tlist_cid('check', task.id, list_id) }}">
{% if task.complete %}
<input id={{ "check-inner-"~task.id }} type="checkbox" checked>
<input id="{{ tlist_cid('check-inner', task.id, list_id) }}" type="checkbox" checked>
{% else %}
<input id={{ "check-inner-"~task.id }} type="checkbox">
<input id="{{ tlist_cid('check-inner', task.id, list_id) }}" type="checkbox">
{% endif %}
</td>
<td
class="task-name" id={{ "task-name-"~task.id }}
onclick={{"set_active('task-detail-"~task.id~"')"}}
class="task-name" id="{{ tlist_cid('task-name', task.id, list_id) }}"
onclick="set_active('{{ tlist_cid('task-detail', task.id, list_id) }}', {{list_id}})"
>
<label for={{ "check-inner-"~task.id }}>{{ task.name }}</label>
<label for="{{ tlist_cid('check-inner', task.id, list_id) }}">{{ task.name }}</label>
</td>
<td class="task-due" id={{ "task-due"~task.id }}>
<td class="task-due" id="{{ tlist_cid('task-due', task.id, list_id) }}">
<p>{{ task.due_rel }}</p>
</td>
</tr>
<tr></tr> <!-- placeholder for CSS styling -->
<tr class="detail-view" id={{"task-detail-"~task.id}}>
<tr class="detail-view tl-{{ list_id }}" id="{{ tlist_cid('task-detail', task.id, list_id) }}">
<td class="detail-view-elem" colspan=3>
<h1>Task Details</h1>
<div class="detail-view-header">
<h1>Task Details: {{ task.name }}</h1>
</div>
<p class="small-details">
Task ID: {{ task.id }}
<br/>Created: {{ task.created }}
<br/>Created: {{ task.created }} in namespace {{ task.namespace_id }}
<br/>Due: {{ task.due }}
</p>
</p>
<hr/>
<p class="main-description">{{ task.description }}</p>
</td>
</tr>

View file

@ -0,0 +1,42 @@
{% from "task/_shorttask.html" import inline_task, inline_task_header with context %}
{% macro task_list(tasks, list_id=0) %}
<table class="list">
<colgroup>
<col span="1" style="width: 0;"/>
<col span="1"/>
<col span="1" style="width: 20%;"/>
</colgroup>
<tbody class="list">
{{ inline_task_header(list_id) }}
{% for task in tasks %}
{{ inline_task(task, list_id) }}
{% endfor %}
</tbody>
</table>
{% endmacro %}
{% macro task_list_script() %}
<style>
.list .checkbox {
padding: 1rem;
}
.list .task-due {
padding-right: 1rem;
}
</style>
<script>
function set_active(task_id, list_id){
Array.from(document.querySelectorAll(
`.list .detail-view.shown.tl-${list_id}`
)).forEach(
(el) => el.classList.remove('shown')
)
document.getElementById(task_id).classList.add('shown')
}
</script>
{% endmacro %}

View file

@ -1,5 +1,5 @@
{% extends "main.html" %}
{% from "task/_shorttask.html" import inline_task, inline_task_header %}
{% from "task/_tasklist.html" import task_list, task_list_script with context %}
{% block head_extras %}
<link rel="stylesheet" href={{ url_for("static", filename="list-view.css") }} />
@ -9,41 +9,8 @@
{% block main_content %}
<h1>My Tasks</h1>
<table class="list">
<colgroup>
<col span="1" style="width: 0;"/>
<col span="1"/>
<col span="1" style="width: 20%;"/>
</colgroup>
<tbody class="list">
{{ inline_task_header() }}
{% for task in tasks %}
{{ inline_task(task) }}
{% endfor %}
</tbody>
</table>
<style>
.list .checkbox {
padding: 1rem;
}
.list .task-due {
padding-right: 1rem;
}
</style>
<script>
function set_active(id){
Array.from(document.querySelectorAll(
'.task-list .detail-view.shown'
)).forEach(
(el) => el.classList.remove('shown')
)
document.getElementById(id).classList.add('shown')
}
</script>
{{ task_list(tasks) }}
{{ task_list_script() }}
{% endblock %}

View file

@ -16,6 +16,8 @@ ArbitraryKwargs = dict[str, Any] # pyright:ignore[reportExplicitAny]
ViewFunction = Callable[..., FlaskViewReturnType]
JSONSerializableData: TypeAlias = 'str|int|float|dict[str, JSONSerializableData]|list[JSONSerializableData]|bool|NoneType'
Stringable = str|Any # pyright:ignore[reportExplicitAny]
def ann[T](x: T|None) -> T:
''' Assert Not None

View file

@ -23,12 +23,79 @@ class Option[T](ABC):
if_some: Callable[[T], Y],
if_none: Callable[[], Y]
) -> Y:
''' Extract the value from the option, returning some other type ``Y``.
If this ``Option`` is ``Some``, then ``if_some()`` will be called
with the ``Option`` contents as a parameter. Otherwise,
``if_none()`` will be called wiht no parameters. Both functions must
return the same type - the called function's return value will be
returned from ``and_then()``
>>> Some(1).and_then(
... lambda v: 2*v,
... lambda: 0
... )
4
>>> Nothing().and_then(
... lambda v: 2*v,
... lambda: 0
... )
0
>>> Some(1).and_then(
... lambda v: Right(v),
... lambda: Left(KeyError('Key Not Found'))
... )
Right(1)
>>> Nothing().and_then(
... lambda v: Right(v),
... lambda: Left(KeyError('Key Not Found'))
... )
Left(KeyError('Key Not Found'))
'''
raise NotImplementedError
@staticmethod
def encapsulate(v: T|None) -> 'Option[T]':
''' "Encapsulate" a nullable value (i.e. a value which can either be
an object of type ``T`` or ``None``) into an ``Option[T]``.
If ``v`` is ``None``, this returns ``Nothing()``; otherwise, it
returns ``Some(v)``
>>> Option.encapsulate(1)
Some(1)
>>> Option.encapsulate(None)
Nothing()
'''
return Some(v) if v is not None else Nothing()
@staticmethod
def ensure(v: 'Option[T]|T|None') -> 'Option[T]':
''' As ``encapsulate()``, but if ``v`` is already an ``Option``, it is
assumed to be an ``Option[T]`` and passed through transparently.
>>> Option.ensure(Some(1))
Some(1)
>>> Option.ensure(1)
Some(1)
>>> Option.ensure(None)
Nothing()
'''
return (
(
v
) if isinstance(v, Option) # pyright:ignore[reportUnknownVariableType]
else (
Some(v)
) if v is not None
else (
Nothing()
)
)
@final
class Some[T](Option[T]):

View file

@ -4,7 +4,7 @@ from flask_login import login_required, login_user, logout_user # pyright:ignore
from wtforms import Form, PasswordField, StringField
from wtforms.validators import Length
from taskflower.auth import report_incorrect_login_attempt
from taskflower.auth.violations import report_incorrect_login_attempt
from taskflower.auth.hash import check_hash
from taskflower.db import db
from taskflower.db.model.user import User

View file

@ -1,15 +1,20 @@
from dataclasses import dataclass
from flask import Blueprint, redirect, render_template, url_for
from flask import Blueprint, render_template, request
from flask_login import current_user, login_required # pyright:ignore[reportUnknownVariableType,reportMissingTypeStubs]
from taskflower.auth.messages import not_found_or_not_authorized
from taskflower.auth.permission import NamespacePermissionType
from taskflower.auth.permission.checks import assert_user_perms_on_namespace, assert_user_perms_on_task
from taskflower.auth.permission.lookups import get_namespaces_for_user
from taskflower.auth.permission.resolve import resolve_perms_on_namespace
from taskflower.auth.violations import check_for_auth_err_and_report
from taskflower.db import db
from taskflower.db.model.namespace import Namespace
from taskflower.db.model.task import Task
from taskflower.db.model.user import User
from taskflower.types.either import Either, Left, Right, gather_successes
from taskflower.sanitize.namespace import NamespaceForUser
from taskflower.sanitize.task import TaskForUser
from taskflower.types.either import Left, Right, gather_successes
from taskflower.types.option import Option
from taskflower.web.errors import ResponseErrorNotFound, response_from_exception
web_namespace = Blueprint(
'namespace',
@ -18,65 +23,23 @@ web_namespace = Blueprint(
url_prefix='/namespace'
)
@dataclass(frozen=True)
class NamespacePermsForUser:
read: bool
create_in: bool
complete_own: bool
complete_all: bool
uncomplete_own: bool
uncomplete_all: bool
edit_own: bool
edit_all: bool
delete_own: bool
delete_all: bool
edit_roles: bool
administrate: bool
@web_namespace.app_context_processor
def namespace_processor():
# Inject some namespace helper functions into the jinja template
# context.
def fetch_namespaces(user: User) -> list[NamespaceForUser]:
namespace_list = get_namespaces_for_user(user)
@dataclass(frozen=True)
class NamespaceForUser():
id: int
name: str
description: str
perms: NamespacePermsForUser
@staticmethod
def from_user(ns: Namespace, user: User) -> Either[Exception, 'NamespaceForUser']:
perms = resolve_perms_on_namespace(
user,
ns
)
if not (
NamespacePermissionType.READ in perms
or NamespacePermissionType.ADMINISTRATE in perms
):
return Left(KeyError('No such namespace or insufficient permissions.'))
return Right(
NamespaceForUser(
ns.id,
ns.name,
ns.description,
NamespacePermsForUser(
NamespacePermissionType.READ in perms,
NamespacePermissionType.CREATE_IN in perms,
NamespacePermissionType.COMPLETE_OWN in perms,
NamespacePermissionType.COMPLETE_ALL in perms,
NamespacePermissionType.UNCOMPLETE_OWN in perms,
NamespacePermissionType.UNCOMPLETE_ALL in perms,
NamespacePermissionType.EDIT_OWN in perms,
NamespacePermissionType.EDIT_ALL in perms,
NamespacePermissionType.DELETE_OWN in perms,
NamespacePermissionType.DELETE_ALL in perms,
NamespacePermissionType.EDIT_ROLES in perms,
NamespacePermissionType.ADMINISTRATE in perms
)
return gather_successes([
NamespaceForUser.from_user(
ns, user
)
)
for ns in namespace_list
])
return dict(
fetch_namespaces=fetch_namespaces
)
@web_namespace.route('/')
@login_required
@ -99,4 +62,78 @@ def all():
@web_namespace.route('/<int:id>')
@login_required
def get(id: int):
return redirect(url_for('web.namespace.all'))
def _fetch_tasks_for(
ns: Namespace,
usr: User
) -> list[TaskForUser]:
tsks = db.session.query(
Task
).filter(
Task.namespace == ns.id
).all()
return gather_successes(
[
assert_user_perms_on_task(
usr,
tsk,
NamespacePermissionType.READ,
'Read task'
).flat_map(
lambda t: TaskForUser.from_task(
t,
usr
)
)
for tsk in tsks
]
)
cur_usr: User = current_user # pyright:ignore[reportAssignmentType]
return Option[Namespace].encapsulate(
db.session.query(
Namespace
).filter(
Namespace.id == id
).one_or_none()
).and_then(
lambda val: Right[Exception, Namespace](val),
lambda: Left[Exception, Namespace](
ResponseErrorNotFound(
request.method,
request.path,
f'Namespace with id {id} not found!',
not_found_or_not_authorized(
'Namespace',
'id'
)
)
)
).flat_map(
lambda ns: assert_user_perms_on_namespace(
cur_usr,
ns,
NamespacePermissionType.READ,
'Read'
)
).lside_effect(
check_for_auth_err_and_report
).flat_map(
lambda ns: NamespaceForUser.from_user(
ns,
cur_usr,
_fetch_tasks_for(
ns,
cur_usr
)
)
).and_then(
lambda ns_usr: render_template(
'namespace/detail.html',
namespace=ns_usr
),
lambda exc: response_from_exception(
exc
)
)

View file

@ -1,9 +1,12 @@
from flask import Blueprint, redirect, render_template, request, url_for
from flask_login import current_user, login_required # pyright:ignore[reportUnknownVariableType,reportMissingTypeStubs]
from taskflower.db import db
from taskflower.auth.permission import NamespacePermissionType
from taskflower.auth.permission.lookups import tasks_where_user_can
from taskflower.auth.violations import check_for_auth_err_and_report
from taskflower.db.helpers import add_to_db
from taskflower.db.model.task import Task
from taskflower.form.task import task_form_for_user
from taskflower.sanitize.task import TaskForUser
from taskflower.types.either import reduce_either
from taskflower.web.errors import (
response_from_exception
@ -16,23 +19,37 @@ web_tasks = Blueprint(
url_prefix='/tasks'
)
@web_tasks.app_context_processor
def tasks_processor():
# Inject some task helper functions into the jinja template
# context.
def tlist_cid(component: str, task_id: str|int, list_id: str|int = 0) -> str:
# Generate a unique HTML ``id`` for a component in a task list.
return f'list-{list_id}-{component}-{task_id}'
return dict(
tlist_cid=tlist_cid
)
@web_tasks.route('/')
@login_required
def all():
task_list: list[Task] = db.session.execute(
db.select( # pyright:ignore[reportAssignmentType,reportAny]
Task
).filter_by(
owner=current_user.id # pyright:ignore[reportAny]
).order_by(
Task.due
)
).scalars()
task_list = tasks_where_user_can(
current_user, # pyright:ignore[reportArgumentType]
NamespacePermissionType.READ
)
return reduce_either([
t.sanitize()
for t in task_list
]).and_then(
return reduce_either(
[
TaskForUser.from_task(
t,
current_user # pyright:ignore[reportArgumentType]
).lside_effect(
check_for_auth_err_and_report
)
for t in task_list
]
).and_then(
lambda task_list_sanitized: render_template(
'task/list.html',
tasks=task_list_sanitized
@ -43,17 +60,20 @@ def all():
@web_tasks.route('/new', methods=['GET', 'POST'])
@login_required
def new():
form_data = Task.form(request.form)
form_data = task_form_for_user(
current_user # pyright:ignore[reportArgumentType]
)(request.form)
if request.method == 'POST' and form_data.validate():
return Task.from_form(
form_data,
return form_data.create_object(
current_user # pyright:ignore[reportArgumentType]
).lside_effect(
check_for_auth_err_and_report
).flat_map(
lambda task: add_to_db(task)
).and_then(
lambda task: redirect(url_for(
'web.task.all',
id=task.id
'web.task.all'
)),
lambda exc: response_from_exception(exc)
)

View file

@ -127,8 +127,8 @@ class CreateUserForm(Form):
message='Display names must be at least one character long!'
),
Length(
max=64,
message='Display names can\'t be longer than 64 characters!'
max=256,
message='Display names can\'t be longer than 256 characters!'
)
]
)
@ -271,4 +271,4 @@ def create_user_page():
@web_user.route('/profile/<int:id>')
def profile(id: int):
return redirect(url_for('web.task.all_tasks_view'))
return redirect(url_for('web.task.all'))