Coverage for .tox / coverage / lib / python3.11 / site-packages / wuttaweb / views / users.py: 100%
224 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-20 21:14 -0500
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-20 21:14 -0500
1# -*- coding: utf-8; -*-
2################################################################################
3#
4# wuttaweb -- Web App for Wutta Framework
5# Copyright © 2024-2026 Lance Edgar
6#
7# This file is part of Wutta Framework.
8#
9# Wutta Framework is free software: you can redistribute it and/or modify it
10# under the terms of the GNU General Public License as published by the Free
11# Software Foundation, either version 3 of the License, or (at your option) any
12# later version.
13#
14# Wutta Framework is distributed in the hope that it will be useful, but
15# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
16# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
17# more details.
18#
19# You should have received a copy of the GNU General Public License along with
20# Wutta Framework. If not, see <http://www.gnu.org/licenses/>.
21#
22################################################################################
23"""
24Views for users
25"""
27import sqlalchemy as sa
29from wuttjamaican.db.model import User
30from wuttaweb.views import MasterView
31from wuttaweb.forms import widgets
32from wuttaweb.forms.schema import PersonRef, RoleRefs
35class UserView(MasterView): # pylint: disable=abstract-method
36 """
37 Master view for users.
39 Default route prefix is ``users``.
41 Notable URLs provided by this class:
43 * ``/users/``
44 * ``/users/new``
45 * ``/users/XXX``
46 * ``/users/XXX/edit``
47 * ``/users/XXX/delete``
48 """
50 model_class = User
52 labels = {
53 "api_tokens": "API Tokens",
54 }
56 grid_columns = [
57 "username",
58 "person",
59 "active",
60 ]
62 filter_defaults = {
63 "username": {"active": True},
64 "active": {"active": True, "verb": "is_true"},
65 }
66 sort_defaults = "username"
68 form_fields = [
69 "username",
70 "person",
71 "active",
72 "prevent_edit",
73 "roles",
74 "api_tokens",
75 ]
77 mergeable = True
78 merge_additive_fields = ["roles"]
80 def get_query(self, session=None): # pylint: disable=empty-docstring
81 """ """
82 query = super().get_query(session=session)
84 # nb. always join Person
85 model = self.app.model
86 query = query.outerjoin(model.Person)
88 return query
90 def configure_grid(self, grid): # pylint: disable=empty-docstring
91 """ """
92 g = grid
93 super().configure_grid(g)
94 model = self.app.model
96 # never show these
97 g.remove("person_uuid", "role_refs", "password")
98 g.remove_filter("password")
100 # username
101 g.set_link("username")
103 # person
104 g.set_link("person")
105 g.set_sorter("person", model.Person.full_name)
106 g.set_filter("person", model.Person.full_name, label="Person Full Name")
108 def grid_row_class( # pylint: disable=empty-docstring,unused-argument
109 self, user, data, i
110 ):
111 """ """
112 if not user.active:
113 return "has-background-warning"
114 return None
116 def is_editable(self, obj): # pylint: disable=empty-docstring
117 """ """
118 user = obj
120 # only root can edit certain users
121 if user.prevent_edit and not self.request.is_root:
122 return False
124 return True
126 def is_deletable(self, obj): # pylint: disable=empty-docstring
127 """ """
128 user = obj
130 # only root can delete certain users
131 if user.prevent_edit and not self.request.is_root:
132 return False
134 return True
136 def configure_form(self, form): # pylint: disable=empty-docstring
137 """ """
138 f = form
139 super().configure_form(f)
140 user = f.model_instance
142 # username
143 f.set_validator("username", self.unique_username)
145 # person
146 if self.creating or self.editing:
147 f.fields.insert_after("person", "first_name")
148 f.set_required("first_name", False)
149 f.fields.insert_after("first_name", "last_name")
150 f.set_required("last_name", False)
151 f.remove("person")
152 if self.editing:
153 person = user.person
154 if person:
155 f.set_default("first_name", person.first_name)
156 f.set_default("last_name", person.last_name)
157 else:
158 f.set_node("person", PersonRef(self.request))
160 # password
161 # nb. we must avoid 'password' as field name since
162 # ColanderAlchemy wants to handle the raw/hashed value
163 f.remove("password")
164 # nb. no need for password field if readonly
165 if self.creating or self.editing:
166 # nb. use 'set_password' as field name
167 f.append("set_password")
168 f.set_required("set_password", False)
169 f.set_widget("set_password", widgets.CheckedPasswordWidget())
171 # roles
172 f.append("roles")
173 f.set_node("roles", RoleRefs(self.request))
174 if not self.creating:
175 f.set_default("roles", [role.uuid.hex for role in user.roles])
177 # api_tokens
178 if self.viewing and self.has_perm("manage_api_tokens"):
179 f.set_grid("api_tokens", self.make_api_tokens_grid(user))
180 else:
181 f.remove("api_tokens")
183 def unique_username(self, node, value): # pylint: disable=empty-docstring
184 """ """
185 model = self.app.model
186 session = self.Session()
188 query = session.query(model.User).filter(model.User.username == value)
190 if self.editing:
191 uuid = self.request.matchdict["uuid"]
192 query = query.filter(model.User.uuid != uuid)
194 if query.count():
195 node.raise_invalid("Username must be unique")
197 def objectify(self, form): # pylint: disable=empty-docstring
198 """ """
199 auth = self.app.get_auth_handler()
200 data = form.validated
202 # normal logic first
203 user = super().objectify(form)
205 # maybe update person name
206 if "first_name" in form and "last_name" in form:
207 first_name = data.get("first_name")
208 last_name = data.get("last_name")
209 if first_name or last_name:
210 user.person.full_name = self.app.make_full_name(first_name, last_name)
211 else:
212 user.person = None
214 # maybe set user password
215 if "set_password" in form and data.get("set_password"):
216 auth.set_user_password(user, data["set_password"])
218 # update roles for user
219 # TODO
220 # if self.has_perm('edit_roles'):
221 self.update_roles(user, form)
223 return user
225 def update_roles(self, user, form): # pylint: disable=empty-docstring
226 """ """
227 # TODO
228 # if not self.has_perm('edit_roles'):
229 # return
230 data = form.validated
231 if "roles" not in data:
232 return
234 model = self.app.model
235 session = self.Session()
236 auth = self.app.get_auth_handler()
238 old_roles = {role.uuid for role in user.roles}
239 new_roles = data["roles"]
241 admin = auth.get_role_administrator(session)
242 ignored = {
243 auth.get_role_authenticated(session).uuid,
244 auth.get_role_anonymous(session).uuid,
245 }
247 # add any new roles for the user, taking care to avoid certain
248 # unwanted operations for built-in roles
249 for uuid in new_roles:
250 if uuid in ignored:
251 continue
252 if uuid in old_roles:
253 continue
254 if uuid == admin.uuid and not self.request.is_root:
255 continue
256 role = session.get(model.Role, uuid)
257 user.roles.append(role)
259 # remove any roles which were *not* specified, taking care to
260 # avoid certain unwanted operations for built-in roles
261 for uuid in old_roles:
262 if uuid in new_roles:
263 continue
264 if uuid == admin.uuid and not self.request.is_root:
265 continue
266 role = session.get(model.Role, uuid)
267 user.roles.remove(role)
269 def make_api_tokens_grid(self, user):
270 """
271 Make and return the grid for the API Tokens field.
273 This is only shown when current user has permission to manage
274 API tokens for other users.
276 :rtype: :class:`~wuttaweb.grids.base.Grid`
277 """
278 route_prefix = self.get_route_prefix()
280 grid = self.make_grid(
281 key=f"{route_prefix}.view.api_tokens",
282 data=[self.normalize_api_token(t) for t in user.api_tokens],
283 columns=[
284 "description",
285 "created",
286 ],
287 sortable=True,
288 sort_on_backend=False,
289 sort_defaults=[("created", "desc")],
290 )
292 if self.has_perm("manage_api_tokens"):
294 # create token
295 button = self.make_button(
296 "New",
297 primary=True,
298 icon_left="plus",
299 **{"@click": "$emit('new-token')"},
300 )
301 grid.add_tool(button, key="create")
303 # delete token
304 grid.add_action(
305 "delete",
306 url="#",
307 icon="trash",
308 link_class="has-text-danger",
309 click_handler="$emit('delete-token', props.row)",
310 )
312 return grid
314 def normalize_api_token(self, token): # pylint: disable=empty-docstring
315 """ """
316 return {
317 "uuid": token.uuid.hex,
318 "description": token.description,
319 "created": self.app.render_datetime(token.created),
320 }
322 def add_api_token(self):
323 """
324 AJAX view for adding a new user API token.
326 This calls
327 :meth:`wuttjamaican:wuttjamaican.auth.AuthHandler.add_api_token()`
328 for the creation logic.
329 """
330 session = self.Session()
331 auth = self.app.get_auth_handler()
332 user = self.get_instance()
333 data = self.request.json_body
335 token = auth.add_api_token(user, data["description"])
336 session.flush()
337 session.refresh(token)
339 result = self.normalize_api_token(token)
340 result["token_string"] = token.token_string
341 result["_action_url_delete"] = "#"
342 return result
344 def delete_api_token(self):
345 """
346 AJAX view for deleting a user API token.
348 This calls
349 :meth:`wuttjamaican:wuttjamaican.auth.AuthHandler.delete_api_token()`
350 for the deletion logic.
351 """
352 model = self.app.model
353 session = self.Session()
354 auth = self.app.get_auth_handler()
355 user = self.get_instance()
356 data = self.request.json_body
358 token = session.get(model.UserAPIToken, data["uuid"])
359 if not token:
360 return {"error": "API token not found"}
362 if token.user is not user:
363 return {"error": "API token not found"}
365 auth.delete_api_token(token)
366 return {}
368 def merge_get_simple_fields(self): # pylint: disable=empty-docstring
369 """ """
370 fields = super().merge_get_simple_fields()
372 if "password" in fields:
373 fields.remove("password")
375 return fields
377 def merge_get_additive_fields(self): # pylint: disable=empty-docstring
378 """ """
379 fields = super().merge_get_additive_fields()
381 if self.app.continuum_is_enabled():
382 if "transaction_count" not in fields:
383 fields.append("transaction_count")
385 return fields
387 def merge_get_data(self, obj): # pylint: disable=empty-docstring
388 """ """
389 data = super().merge_get_data(obj)
390 model_class = self.get_model_class()
391 session = self.Session()
392 user = obj
394 data["roles"] = sorted([role.name for role in user.roles])
396 if self.app.continuum_is_enabled():
397 import sqlalchemy_continuum as continuum # pylint: disable=import-outside-toplevel
399 txncls = continuum.transaction_class(model_class)
400 data["transaction_count"] = (
401 session.query(txncls).filter(txncls.user == user).count()
402 )
404 return data
406 def merge_why_not(self, removing, keeping):
407 """
408 This checks to ensure the *current* user is not the same as
409 the "removing" user.
411 See also parent method:
412 :meth:`~wuttaweb.views.master.MasterView.merge_why_not()`
413 """
414 if removing is self.request.user:
415 return "Cannot remove user who is currently logged in!"
416 return None
418 def merge_execute(self, removing, keeping):
419 """
420 The logic to merge 2 users is extended as follows:
422 The "keeping" user will be assigned to all roles to which the
423 "removing" user belonged.
425 Any upgrades created or executed by the "removing" user will
426 be updated to reference the "keeping" user instead.
428 Any versioning (SQLAlchemy-Continuum) transactions created by
429 the "removing" user will be updated to reference the "keeping"
430 user instead.
432 See also parent method:
433 :meth:`~wuttaweb.views.master.MasterView.merge_execute()`
434 """
435 model = self.app.model
436 session = self.Session()
437 model_class = self.get_model_class()
439 # transfer role membership
440 for role in list(removing.roles):
441 if role not in keeping.roles:
442 keeping.roles.append(role)
444 # reassign upgrade "created by"
445 stmt = (
446 sa.update(model.Upgrade)
447 .where(model.Upgrade.created_by_uuid == removing.uuid)
448 .values(created_by_uuid=keeping.uuid)
449 )
450 session.execute(stmt, execution_options={"synchronize_session": False})
452 # reassign upgrade "executed by"
453 stmt = (
454 sa.update(model.Upgrade)
455 .where(model.Upgrade.executed_by_uuid == removing.uuid)
456 .values(executed_by_uuid=keeping.uuid)
457 )
458 session.execute(stmt, execution_options={"synchronize_session": False})
460 # reassign continuum transactions
461 if self.app.continuum_is_enabled():
462 import sqlalchemy_continuum as continuum # pylint: disable=import-outside-toplevel
464 txncls = continuum.transaction_class(model_class)
465 stmt = (
466 sa.update(txncls)
467 .where(txncls.user_id == removing.uuid)
468 .values(user_id=keeping.uuid)
469 )
470 session.execute(stmt, execution_options={"synchronize_session": False})
472 # continue default merge
473 super().merge_execute(removing, keeping)
475 @classmethod
476 def defaults(cls, config): # pylint: disable=empty-docstring
477 """ """
479 # nb. User may come from custom model
480 wutta_config = config.registry.settings["wutta_config"]
481 app = wutta_config.get_app()
482 cls.model_class = app.model.User
484 cls._user_defaults(config)
485 cls._defaults(config)
487 @classmethod
488 def _user_defaults(cls, config):
489 """
490 Provide extra default configuration for the User master view.
491 """
492 route_prefix = cls.get_route_prefix()
493 permission_prefix = cls.get_permission_prefix()
494 instance_url_prefix = cls.get_instance_url_prefix()
495 model_title = cls.get_model_title()
497 # manage API tokens
498 config.add_wutta_permission(
499 permission_prefix,
500 f"{permission_prefix}.manage_api_tokens",
501 f"Manage API tokens for any {model_title}",
502 )
503 config.add_route(
504 f"{route_prefix}.add_api_token",
505 f"{instance_url_prefix}/add-api-token",
506 request_method="POST",
507 )
508 config.add_view(
509 cls,
510 attr="add_api_token",
511 route_name=f"{route_prefix}.add_api_token",
512 permission=f"{permission_prefix}.manage_api_tokens",
513 renderer="json",
514 )
515 config.add_route(
516 f"{route_prefix}.delete_api_token",
517 f"{instance_url_prefix}/delete-api-token",
518 request_method="POST",
519 )
520 config.add_view(
521 cls,
522 attr="delete_api_token",
523 route_name=f"{route_prefix}.delete_api_token",
524 permission=f"{permission_prefix}.manage_api_tokens",
525 renderer="json",
526 )
529def defaults(config, **kwargs): # pylint: disable=missing-function-docstring
530 base = globals()
532 UserView = kwargs.get( # pylint: disable=invalid-name,redefined-outer-name
533 "UserView", base["UserView"]
534 )
535 UserView.defaults(config)
538def includeme(config): # pylint: disable=missing-function-docstring
539 defaults(config)