Coverage for .tox / coverage / lib / python3.11 / site-packages / wuttaweb / views / users.py: 100%

224 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-20 21:14 -0500

1# -*- coding: utf-8; -*- 

2################################################################################ 

3# 

4# wuttaweb -- Web App for Wutta Framework 

5# Copyright © 2024-2026 Lance Edgar 

6# 

7# This file is part of Wutta Framework. 

8# 

9# Wutta Framework is free software: you can redistribute it and/or modify it 

10# under the terms of the GNU General Public License as published by the Free 

11# Software Foundation, either version 3 of the License, or (at your option) any 

12# later version. 

13# 

14# Wutta Framework is distributed in the hope that it will be useful, but 

15# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 

16# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for 

17# more details. 

18# 

19# You should have received a copy of the GNU General Public License along with 

20# Wutta Framework. If not, see <http://www.gnu.org/licenses/>. 

21# 

22################################################################################ 

23""" 

24Views for users 

25""" 

26 

27import sqlalchemy as sa 

28 

29from wuttjamaican.db.model import User 

30from wuttaweb.views import MasterView 

31from wuttaweb.forms import widgets 

32from wuttaweb.forms.schema import PersonRef, RoleRefs 

33 

34 

35class UserView(MasterView): # pylint: disable=abstract-method 

36 """ 

37 Master view for users. 

38 

39 Default route prefix is ``users``. 

40 

41 Notable URLs provided by this class: 

42 

43 * ``/users/`` 

44 * ``/users/new`` 

45 * ``/users/XXX`` 

46 * ``/users/XXX/edit`` 

47 * ``/users/XXX/delete`` 

48 """ 

49 

50 model_class = User 

51 

52 labels = { 

53 "api_tokens": "API Tokens", 

54 } 

55 

56 grid_columns = [ 

57 "username", 

58 "person", 

59 "active", 

60 ] 

61 

62 filter_defaults = { 

63 "username": {"active": True}, 

64 "active": {"active": True, "verb": "is_true"}, 

65 } 

66 sort_defaults = "username" 

67 

68 form_fields = [ 

69 "username", 

70 "person", 

71 "active", 

72 "prevent_edit", 

73 "roles", 

74 "api_tokens", 

75 ] 

76 

77 mergeable = True 

78 merge_additive_fields = ["roles"] 

79 

80 def get_query(self, session=None): # pylint: disable=empty-docstring 

81 """ """ 

82 query = super().get_query(session=session) 

83 

84 # nb. always join Person 

85 model = self.app.model 

86 query = query.outerjoin(model.Person) 

87 

88 return query 

89 

90 def configure_grid(self, grid): # pylint: disable=empty-docstring 

91 """ """ 

92 g = grid 

93 super().configure_grid(g) 

94 model = self.app.model 

95 

96 # never show these 

97 g.remove("person_uuid", "role_refs", "password") 

98 g.remove_filter("password") 

99 

100 # username 

101 g.set_link("username") 

102 

103 # person 

104 g.set_link("person") 

105 g.set_sorter("person", model.Person.full_name) 

106 g.set_filter("person", model.Person.full_name, label="Person Full Name") 

107 

108 def grid_row_class( # pylint: disable=empty-docstring,unused-argument 

109 self, user, data, i 

110 ): 

111 """ """ 

112 if not user.active: 

113 return "has-background-warning" 

114 return None 

115 

116 def is_editable(self, obj): # pylint: disable=empty-docstring 

117 """ """ 

118 user = obj 

119 

120 # only root can edit certain users 

121 if user.prevent_edit and not self.request.is_root: 

122 return False 

123 

124 return True 

125 

126 def is_deletable(self, obj): # pylint: disable=empty-docstring 

127 """ """ 

128 user = obj 

129 

130 # only root can delete certain users 

131 if user.prevent_edit and not self.request.is_root: 

132 return False 

133 

134 return True 

135 

136 def configure_form(self, form): # pylint: disable=empty-docstring 

137 """ """ 

138 f = form 

139 super().configure_form(f) 

140 user = f.model_instance 

141 

142 # username 

143 f.set_validator("username", self.unique_username) 

144 

145 # person 

146 if self.creating or self.editing: 

147 f.fields.insert_after("person", "first_name") 

148 f.set_required("first_name", False) 

149 f.fields.insert_after("first_name", "last_name") 

150 f.set_required("last_name", False) 

151 f.remove("person") 

152 if self.editing: 

153 person = user.person 

154 if person: 

155 f.set_default("first_name", person.first_name) 

156 f.set_default("last_name", person.last_name) 

157 else: 

158 f.set_node("person", PersonRef(self.request)) 

159 

160 # password 

161 # nb. we must avoid 'password' as field name since 

162 # ColanderAlchemy wants to handle the raw/hashed value 

163 f.remove("password") 

164 # nb. no need for password field if readonly 

165 if self.creating or self.editing: 

166 # nb. use 'set_password' as field name 

167 f.append("set_password") 

168 f.set_required("set_password", False) 

169 f.set_widget("set_password", widgets.CheckedPasswordWidget()) 

170 

171 # roles 

172 f.append("roles") 

173 f.set_node("roles", RoleRefs(self.request)) 

174 if not self.creating: 

175 f.set_default("roles", [role.uuid.hex for role in user.roles]) 

176 

177 # api_tokens 

178 if self.viewing and self.has_perm("manage_api_tokens"): 

179 f.set_grid("api_tokens", self.make_api_tokens_grid(user)) 

180 else: 

181 f.remove("api_tokens") 

182 

183 def unique_username(self, node, value): # pylint: disable=empty-docstring 

184 """ """ 

185 model = self.app.model 

186 session = self.Session() 

187 

188 query = session.query(model.User).filter(model.User.username == value) 

189 

190 if self.editing: 

191 uuid = self.request.matchdict["uuid"] 

192 query = query.filter(model.User.uuid != uuid) 

193 

194 if query.count(): 

195 node.raise_invalid("Username must be unique") 

196 

197 def objectify(self, form): # pylint: disable=empty-docstring 

198 """ """ 

199 auth = self.app.get_auth_handler() 

200 data = form.validated 

201 

202 # normal logic first 

203 user = super().objectify(form) 

204 

205 # maybe update person name 

206 if "first_name" in form and "last_name" in form: 

207 first_name = data.get("first_name") 

208 last_name = data.get("last_name") 

209 if first_name or last_name: 

210 user.person.full_name = self.app.make_full_name(first_name, last_name) 

211 else: 

212 user.person = None 

213 

214 # maybe set user password 

215 if "set_password" in form and data.get("set_password"): 

216 auth.set_user_password(user, data["set_password"]) 

217 

218 # update roles for user 

219 # TODO 

220 # if self.has_perm('edit_roles'): 

221 self.update_roles(user, form) 

222 

223 return user 

224 

225 def update_roles(self, user, form): # pylint: disable=empty-docstring 

226 """ """ 

227 # TODO 

228 # if not self.has_perm('edit_roles'): 

229 # return 

230 data = form.validated 

231 if "roles" not in data: 

232 return 

233 

234 model = self.app.model 

235 session = self.Session() 

236 auth = self.app.get_auth_handler() 

237 

238 old_roles = {role.uuid for role in user.roles} 

239 new_roles = data["roles"] 

240 

241 admin = auth.get_role_administrator(session) 

242 ignored = { 

243 auth.get_role_authenticated(session).uuid, 

244 auth.get_role_anonymous(session).uuid, 

245 } 

246 

247 # add any new roles for the user, taking care to avoid certain 

248 # unwanted operations for built-in roles 

249 for uuid in new_roles: 

250 if uuid in ignored: 

251 continue 

252 if uuid in old_roles: 

253 continue 

254 if uuid == admin.uuid and not self.request.is_root: 

255 continue 

256 role = session.get(model.Role, uuid) 

257 user.roles.append(role) 

258 

259 # remove any roles which were *not* specified, taking care to 

260 # avoid certain unwanted operations for built-in roles 

261 for uuid in old_roles: 

262 if uuid in new_roles: 

263 continue 

264 if uuid == admin.uuid and not self.request.is_root: 

265 continue 

266 role = session.get(model.Role, uuid) 

267 user.roles.remove(role) 

268 

269 def make_api_tokens_grid(self, user): 

270 """ 

271 Make and return the grid for the API Tokens field. 

272 

273 This is only shown when current user has permission to manage 

274 API tokens for other users. 

275 

276 :rtype: :class:`~wuttaweb.grids.base.Grid` 

277 """ 

278 route_prefix = self.get_route_prefix() 

279 

280 grid = self.make_grid( 

281 key=f"{route_prefix}.view.api_tokens", 

282 data=[self.normalize_api_token(t) for t in user.api_tokens], 

283 columns=[ 

284 "description", 

285 "created", 

286 ], 

287 sortable=True, 

288 sort_on_backend=False, 

289 sort_defaults=[("created", "desc")], 

290 ) 

291 

292 if self.has_perm("manage_api_tokens"): 

293 

294 # create token 

295 button = self.make_button( 

296 "New", 

297 primary=True, 

298 icon_left="plus", 

299 **{"@click": "$emit('new-token')"}, 

300 ) 

301 grid.add_tool(button, key="create") 

302 

303 # delete token 

304 grid.add_action( 

305 "delete", 

306 url="#", 

307 icon="trash", 

308 link_class="has-text-danger", 

309 click_handler="$emit('delete-token', props.row)", 

310 ) 

311 

312 return grid 

313 

314 def normalize_api_token(self, token): # pylint: disable=empty-docstring 

315 """ """ 

316 return { 

317 "uuid": token.uuid.hex, 

318 "description": token.description, 

319 "created": self.app.render_datetime(token.created), 

320 } 

321 

322 def add_api_token(self): 

323 """ 

324 AJAX view for adding a new user API token. 

325 

326 This calls 

327 :meth:`wuttjamaican:wuttjamaican.auth.AuthHandler.add_api_token()` 

328 for the creation logic. 

329 """ 

330 session = self.Session() 

331 auth = self.app.get_auth_handler() 

332 user = self.get_instance() 

333 data = self.request.json_body 

334 

335 token = auth.add_api_token(user, data["description"]) 

336 session.flush() 

337 session.refresh(token) 

338 

339 result = self.normalize_api_token(token) 

340 result["token_string"] = token.token_string 

341 result["_action_url_delete"] = "#" 

342 return result 

343 

344 def delete_api_token(self): 

345 """ 

346 AJAX view for deleting a user API token. 

347 

348 This calls 

349 :meth:`wuttjamaican:wuttjamaican.auth.AuthHandler.delete_api_token()` 

350 for the deletion logic. 

351 """ 

352 model = self.app.model 

353 session = self.Session() 

354 auth = self.app.get_auth_handler() 

355 user = self.get_instance() 

356 data = self.request.json_body 

357 

358 token = session.get(model.UserAPIToken, data["uuid"]) 

359 if not token: 

360 return {"error": "API token not found"} 

361 

362 if token.user is not user: 

363 return {"error": "API token not found"} 

364 

365 auth.delete_api_token(token) 

366 return {} 

367 

368 def merge_get_simple_fields(self): # pylint: disable=empty-docstring 

369 """ """ 

370 fields = super().merge_get_simple_fields() 

371 

372 if "password" in fields: 

373 fields.remove("password") 

374 

375 return fields 

376 

377 def merge_get_additive_fields(self): # pylint: disable=empty-docstring 

378 """ """ 

379 fields = super().merge_get_additive_fields() 

380 

381 if self.app.continuum_is_enabled(): 

382 if "transaction_count" not in fields: 

383 fields.append("transaction_count") 

384 

385 return fields 

386 

387 def merge_get_data(self, obj): # pylint: disable=empty-docstring 

388 """ """ 

389 data = super().merge_get_data(obj) 

390 model_class = self.get_model_class() 

391 session = self.Session() 

392 user = obj 

393 

394 data["roles"] = sorted([role.name for role in user.roles]) 

395 

396 if self.app.continuum_is_enabled(): 

397 import sqlalchemy_continuum as continuum # pylint: disable=import-outside-toplevel 

398 

399 txncls = continuum.transaction_class(model_class) 

400 data["transaction_count"] = ( 

401 session.query(txncls).filter(txncls.user == user).count() 

402 ) 

403 

404 return data 

405 

406 def merge_why_not(self, removing, keeping): 

407 """ 

408 This checks to ensure the *current* user is not the same as 

409 the "removing" user. 

410 

411 See also parent method: 

412 :meth:`~wuttaweb.views.master.MasterView.merge_why_not()` 

413 """ 

414 if removing is self.request.user: 

415 return "Cannot remove user who is currently logged in!" 

416 return None 

417 

418 def merge_execute(self, removing, keeping): 

419 """ 

420 The logic to merge 2 users is extended as follows: 

421 

422 The "keeping" user will be assigned to all roles to which the 

423 "removing" user belonged. 

424 

425 Any upgrades created or executed by the "removing" user will 

426 be updated to reference the "keeping" user instead. 

427 

428 Any versioning (SQLAlchemy-Continuum) transactions created by 

429 the "removing" user will be updated to reference the "keeping" 

430 user instead. 

431 

432 See also parent method: 

433 :meth:`~wuttaweb.views.master.MasterView.merge_execute()` 

434 """ 

435 model = self.app.model 

436 session = self.Session() 

437 model_class = self.get_model_class() 

438 

439 # transfer role membership 

440 for role in list(removing.roles): 

441 if role not in keeping.roles: 

442 keeping.roles.append(role) 

443 

444 # reassign upgrade "created by" 

445 stmt = ( 

446 sa.update(model.Upgrade) 

447 .where(model.Upgrade.created_by_uuid == removing.uuid) 

448 .values(created_by_uuid=keeping.uuid) 

449 ) 

450 session.execute(stmt, execution_options={"synchronize_session": False}) 

451 

452 # reassign upgrade "executed by" 

453 stmt = ( 

454 sa.update(model.Upgrade) 

455 .where(model.Upgrade.executed_by_uuid == removing.uuid) 

456 .values(executed_by_uuid=keeping.uuid) 

457 ) 

458 session.execute(stmt, execution_options={"synchronize_session": False}) 

459 

460 # reassign continuum transactions 

461 if self.app.continuum_is_enabled(): 

462 import sqlalchemy_continuum as continuum # pylint: disable=import-outside-toplevel 

463 

464 txncls = continuum.transaction_class(model_class) 

465 stmt = ( 

466 sa.update(txncls) 

467 .where(txncls.user_id == removing.uuid) 

468 .values(user_id=keeping.uuid) 

469 ) 

470 session.execute(stmt, execution_options={"synchronize_session": False}) 

471 

472 # continue default merge 

473 super().merge_execute(removing, keeping) 

474 

475 @classmethod 

476 def defaults(cls, config): # pylint: disable=empty-docstring 

477 """ """ 

478 

479 # nb. User may come from custom model 

480 wutta_config = config.registry.settings["wutta_config"] 

481 app = wutta_config.get_app() 

482 cls.model_class = app.model.User 

483 

484 cls._user_defaults(config) 

485 cls._defaults(config) 

486 

487 @classmethod 

488 def _user_defaults(cls, config): 

489 """ 

490 Provide extra default configuration for the User master view. 

491 """ 

492 route_prefix = cls.get_route_prefix() 

493 permission_prefix = cls.get_permission_prefix() 

494 instance_url_prefix = cls.get_instance_url_prefix() 

495 model_title = cls.get_model_title() 

496 

497 # manage API tokens 

498 config.add_wutta_permission( 

499 permission_prefix, 

500 f"{permission_prefix}.manage_api_tokens", 

501 f"Manage API tokens for any {model_title}", 

502 ) 

503 config.add_route( 

504 f"{route_prefix}.add_api_token", 

505 f"{instance_url_prefix}/add-api-token", 

506 request_method="POST", 

507 ) 

508 config.add_view( 

509 cls, 

510 attr="add_api_token", 

511 route_name=f"{route_prefix}.add_api_token", 

512 permission=f"{permission_prefix}.manage_api_tokens", 

513 renderer="json", 

514 ) 

515 config.add_route( 

516 f"{route_prefix}.delete_api_token", 

517 f"{instance_url_prefix}/delete-api-token", 

518 request_method="POST", 

519 ) 

520 config.add_view( 

521 cls, 

522 attr="delete_api_token", 

523 route_name=f"{route_prefix}.delete_api_token", 

524 permission=f"{permission_prefix}.manage_api_tokens", 

525 renderer="json", 

526 ) 

527 

528 

529def defaults(config, **kwargs): # pylint: disable=missing-function-docstring 

530 base = globals() 

531 

532 UserView = kwargs.get( # pylint: disable=invalid-name,redefined-outer-name 

533 "UserView", base["UserView"] 

534 ) 

535 UserView.defaults(config) 

536 

537 

538def includeme(config): # pylint: disable=missing-function-docstring 

539 defaults(config)