Coverage for .tox / coverage / lib / python3.11 / site-packages / wuttaweb / views / alembic.py: 100%

208 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2025-12-28 15:23 -0600

1# -*- coding: utf-8; -*- 

2################################################################################ 

3# 

4# wuttaweb -- Web App for Wutta Framework 

5# Copyright © 2024-2025 Lance Edgar 

6# 

7# This file is part of Wutta Framework. 

8# 

9# Wutta Framework is free software: you can redistribute it and/or modify it 

10# under the terms of the GNU General Public License as published by the Free 

11# Software Foundation, either version 3 of the License, or (at your option) any 

12# later version. 

13# 

14# Wutta Framework is distributed in the hope that it will be useful, but 

15# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 

16# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for 

17# more details. 

18# 

19# You should have received a copy of the GNU General Public License along with 

20# Wutta Framework. If not, see <http://www.gnu.org/licenses/>. 

21# 

22################################################################################ 

23""" 

24Views for Alembic 

25""" 

26 

27import datetime 

28import logging 

29import os 

30import re 

31 

32from alembic import command as alembic_command 

33from alembic.migration import MigrationContext 

34from alembic.util import CommandError 

35 

36from wuttjamaican.db.conf import ( 

37 make_alembic_config, 

38 get_alembic_scriptdir, 

39 check_alembic_current, 

40) 

41 

42import colander 

43from webhelpers2.html import tags, HTML 

44 

45from wuttaweb.views import View, MasterView 

46from wuttaweb.forms import widgets 

47 

48 

49log = logging.getLogger(__name__) 

50 

51 

52def normalize_revision(config, rev): # pylint: disable=missing-function-docstring 

53 app = config.get_app() 

54 

55 created = None 

56 if match := re.search(r"Create Date: (\d{4}-\d{2}-\d{2}[\d:\. ]+\d)", rev.longdoc): 

57 created = datetime.datetime.fromisoformat(match.group(1)) 

58 created = app.localtime(created, from_utc=False) 

59 created = app.render_datetime(created) 

60 

61 return { 

62 "revision": rev.revision, 

63 "branch_labels": ", ".join(rev.branch_labels), 

64 "doc": rev.doc, 

65 "longdoc": rev.longdoc, 

66 "path": rev.path, 

67 "dependencies": rev.dependencies or "", 

68 "down_revision": rev.down_revision or "", 

69 "nextrev": ", ".join(rev.nextrev), 

70 "is_base": app.render_boolean(rev.is_base), 

71 "is_branch_point": app.render_boolean(rev.is_branch_point), 

72 "is_head": rev.is_head, 

73 "created": created, 

74 } 

75 

76 

77class AlembicDashboardView(View): 

78 """ 

79 Custom views for the Alembic Dashboard. 

80 """ 

81 

82 def dashboard(self): 

83 """ 

84 Main view for the Alembic Dashboard. 

85 

86 Route name is ``alembic.dashboard``; URL is 

87 ``/alembic/dashboard`` 

88 """ 

89 script = get_alembic_scriptdir(self.config) 

90 with self.config.appdb_engine.begin() as conn: 

91 context = MigrationContext.configure(conn) 

92 current_heads = context.get_current_heads() 

93 

94 def normalize(rev): 

95 normal = normalize_revision(self.config, rev) 

96 normal["is_current"] = rev.revision in current_heads 

97 

98 normal["revision"] = tags.link_to( 

99 normal["revision"], 

100 self.request.route_url( 

101 "alembic.migrations.view", revision=normal["revision"] 

102 ), 

103 ) 

104 

105 if normal["down_revision"]: 

106 normal["down_revision"] = tags.link_to( 

107 normal["down_revision"], 

108 self.request.route_url( 

109 "alembic.migrations.view", revision=normal["down_revision"] 

110 ), 

111 ) 

112 

113 return normal 

114 

115 script_heads = [] 

116 for head in script.get_heads(): 

117 rev = script.get_revision(head) 

118 script_heads.append(normalize(rev)) 

119 

120 db_heads = [] 

121 for head in current_heads: 

122 rev = script.get_revision(head) 

123 db_heads.append(normalize(rev)) 

124 

125 script_heads.sort(key=lambda rev: rev["branch_labels"]) 

126 db_heads.sort(key=lambda rev: rev["branch_labels"]) 

127 

128 return { 

129 "index_title": "Alembic Dashboard", 

130 "script": { 

131 "dir": script.dir, 

132 "version_locations": sorted(script.version_locations), 

133 "env_py_location": script.env_py_location, 

134 "file_template": script.file_template, 

135 }, 

136 "script_heads": script_heads, 

137 "db_heads": db_heads, 

138 } 

139 

140 def migrate(self): 

141 """ 

142 Action view to migrate the database. POST request must be used. 

143 

144 This directly invokes the :func:`alembic upgrade 

145 <alembic:alembic.command.upgrade>` (or :func:`alembic 

146 downgrade <alembic:alembic.command.downgrade>`) command. 

147 

148 It then sets a flash message per the command status, and 

149 redirects user back to the Dashboard (or other referrer). 

150 

151 The request must specify a ``revspec`` param, which we pass 

152 along as-is to the ``alembic`` command. We assume ``alembic 

153 upgrade`` unless the request sets ``direction`` param to 

154 ``"downgrade"``. 

155 """ 

156 referrer = self.request.get_referrer( 

157 default=self.request.route_url("alembic.dashboard") 

158 ) 

159 if self.request.method != "POST": 

160 return self.redirect(referrer) 

161 

162 revspec = self.request.POST.get("revspec") 

163 if not revspec: 

164 self.request.session.flash("You must provide a target revspec.", "error") 

165 

166 else: 

167 direction = self.request.POST.get("direction") 

168 if direction != "downgrade": 

169 direction = "upgrade" 

170 alembic = make_alembic_config(self.config) 

171 command = ( 

172 alembic_command.downgrade 

173 if direction == "downgrade" 

174 else alembic_command.upgrade 

175 ) 

176 

177 # invoke alembic upgrade/downgrade 

178 try: 

179 command(alembic, revspec) 

180 except Exception as err: # pylint: disable=broad-exception-caught 

181 log.exception( 

182 "database failed to %s using revspec: %s", direction, revspec 

183 ) 

184 self.request.session.flash(f"Migrate failed: {err}", "error") 

185 else: 

186 self.request.session.flash("Database has been migrated.") 

187 

188 return self.redirect(referrer) 

189 

190 @classmethod 

191 def defaults(cls, config): # pylint: disable=empty-docstring 

192 """ """ 

193 cls._defaults(config) 

194 

195 @classmethod 

196 def _defaults(cls, config): 

197 

198 # permission group 

199 config.add_wutta_permission_group( 

200 "alembic", "Alembic (General)", overwrite=False 

201 ) 

202 

203 # dashboard 

204 config.add_wutta_permission( 

205 "alembic", 

206 "alembic.dashboard", 

207 "Basic (view) access to the Alembic Dashboard", 

208 ) 

209 config.add_route("alembic.dashboard", "/alembic/dashboard") 

210 config.add_view( 

211 cls, 

212 attr="dashboard", 

213 route_name="alembic.dashboard", 

214 renderer="/alembic/dashboard.mako", 

215 permission="alembic.dashboard", 

216 ) 

217 

218 # migrate 

219 config.add_wutta_permission( 

220 "alembic", 

221 "alembic.migrate", 

222 "Run migration scripts on the database", 

223 ) 

224 config.add_route("alembic.migrate", "/alembic/migrate") 

225 config.add_view( 

226 cls, 

227 attr="migrate", 

228 route_name="alembic.migrate", 

229 permission="alembic.migrate", 

230 ) 

231 

232 

233class AlembicMigrationView(MasterView): # pylint: disable=abstract-method 

234 """ 

235 Master view for Alembic Migrations. 

236 

237 Route prefix is ``alembic.migrations``; notable URLs include: 

238 

239 * ``/alembic/migrations/`` 

240 * ``/alembic/migrations/new`` 

241 * ``/alembic/migrations/XXX`` 

242 """ 

243 

244 # pylint: disable=duplicate-code 

245 model_name = "alembic_migration" 

246 model_key = "revision" 

247 model_title = "Alembic Migration" 

248 route_prefix = "alembic.migrations" 

249 url_prefix = "/alembic/migrations" 

250 filterable = False 

251 sortable = True 

252 sort_on_backend = False 

253 paginated = True 

254 paginate_on_backend = False 

255 editable = False 

256 configurable = True 

257 # pylint: enable=duplicate-code 

258 

259 labels = { 

260 "doc": "Description", 

261 "longdoc": "Long Description", 

262 "nextrev": "Next Revision", 

263 } 

264 

265 grid_columns = [ 

266 "is_head", 

267 "revision", 

268 "doc", 

269 "branch_labels", 

270 "down_revision", 

271 "created", 

272 ] 

273 

274 sort_defaults = ("is_head", "desc") 

275 

276 form_fields = [ 

277 "revision", 

278 "doc", 

279 "longdoc", 

280 "branch_labels", 

281 "dependencies", 

282 "down_revision", 

283 "nextrev", 

284 "is_base", 

285 "is_branch_point", 

286 "is_head", 

287 "path", 

288 "created", 

289 ] 

290 

291 def get_grid_data( # pylint: disable=empty-docstring 

292 self, columns=None, session=None 

293 ): 

294 """ """ 

295 data = [] 

296 script = get_alembic_scriptdir(self.config) 

297 for rev in script.walk_revisions(): 

298 data.append(normalize_revision(self.config, rev)) 

299 return data 

300 

301 def configure_grid(self, grid): # pylint: disable=empty-docstring 

302 """ """ 

303 g = grid 

304 super().configure_grid(g) 

305 

306 # revision 

307 g.set_link("revision") 

308 g.set_searchable("revision") 

309 

310 # doc 

311 g.set_link("doc") 

312 g.set_searchable("doc") 

313 

314 # branch_labels 

315 g.set_searchable("branch_labels") 

316 

317 # is_head 

318 g.set_label("is_head", "Head") 

319 g.set_renderer("is_head", self.render_is_head) 

320 

321 def render_is_head( # pylint: disable=missing-function-docstring,unused-argument 

322 self, rev, field, value 

323 ): 

324 return self.app.render_boolean(value) if value else "" 

325 

326 def get_instance( 

327 self, **kwargs 

328 ): # pylint: disable=empty-docstring,arguments-differ,unused-argument 

329 """ """ 

330 if "_cached_instance" not in self.__dict__: 

331 revision = self.request.matchdict["revision"] 

332 script = get_alembic_scriptdir(self.config) 

333 try: 

334 rev = script.get_revision(revision) 

335 except CommandError: 

336 rev = None 

337 if not rev: 

338 raise self.notfound() 

339 self.__dict__["_cached_instance"] = normalize_revision(self.config, rev) 

340 return self.__dict__["_cached_instance"] 

341 

342 def get_instance_title(self, instance): # pylint: disable=empty-docstring 

343 """ """ 

344 text = f"({instance['branch_labels']}) {instance['doc']}" 

345 if instance.get("is_head"): 

346 text += " [head]" 

347 return text 

348 

349 def configure_form(self, form): # pylint: disable=empty-docstring 

350 """ """ 

351 f = form 

352 super().configure_form(f) 

353 

354 # revision 

355 f.set_widget("revision", widgets.CopyableTextWidget()) 

356 

357 # longdoc 

358 f.set_widget("longdoc", "notes") 

359 

360 # down_revision 

361 f.set_widget("down_revision", widgets.AlembicRevisionWidget(self.request)) 

362 

363 # nextrev 

364 f.set_widget("nextrev", widgets.AlembicRevisionsWidget(self.request)) 

365 

366 # is_head 

367 f.set_node("is_head", colander.Boolean()) 

368 

369 # path 

370 f.set_widget("path", widgets.CopyableTextWidget()) 

371 

372 def make_create_form(self): # pylint: disable=empty-docstring 

373 """ """ 

374 alembic = make_alembic_config(self.config) 

375 script = get_alembic_scriptdir(self.config, alembic) 

376 

377 schema = colander.Schema() 

378 schema.add(colander.SchemaNode(colander.String(), name="description")) 

379 

380 schema.add( 

381 colander.SchemaNode( 

382 colander.Boolean(), 

383 name="autogenerate", 

384 default=check_alembic_current(self.config, alembic), 

385 ) 

386 ) 

387 

388 schema.add( 

389 colander.SchemaNode( 

390 colander.String(), name="branching_option", default="revise" 

391 ) 

392 ) 

393 

394 branch_options = self.get_revise_branch_options(script) 

395 

396 revise_branch = colander.SchemaNode( 

397 colander.String(), 

398 name="revise_branch", 

399 missing=colander.null, 

400 validator=colander.OneOf(branch_options), 

401 widget=widgets.SelectWidget(values=[(b, b) for b in branch_options]), 

402 ) 

403 

404 branch = self.config.get(f"{self.config.appname}.alembic.default_revise_branch") 

405 if not branch and len(branch_options) == 1: 

406 branch = branch_options[0] 

407 if branch: 

408 revise_branch.default = branch 

409 

410 schema.add(revise_branch) 

411 

412 schema.add( 

413 colander.SchemaNode( 

414 colander.String(), name="new_branch", missing=colander.null 

415 ) 

416 ) 

417 

418 version_locations = sorted( 

419 self.config.parse_list(alembic.get_main_option("version_locations")) 

420 ) 

421 

422 schema.add( 

423 colander.SchemaNode( 

424 colander.String(), 

425 name="version_location", 

426 missing=colander.null, 

427 validator=colander.OneOf(version_locations), 

428 widget=widgets.SelectWidget(values=[(v, v) for v in version_locations]), 

429 ) 

430 ) 

431 

432 schema.validator = colander.All( 

433 self.validate_revise_branch, self.validate_new_branch 

434 ) 

435 

436 form = self.make_form( 

437 schema=schema, 

438 cancel_url_fallback=self.get_index_url(), 

439 button_label_submit="Write Script File", 

440 ) 

441 

442 form.set_label("revise_branch", "Branch") 

443 

444 return form 

445 

446 def validate_revise_branch( # pylint: disable=missing-function-docstring 

447 self, node, value 

448 ): 

449 if value["branching_option"] == "revise": 

450 if not value["revise_branch"]: 

451 node["revise_branch"].raise_invalid( 

452 "Must specify which branch to revise." 

453 ) 

454 

455 def validate_new_branch( # pylint: disable=missing-function-docstring 

456 self, node, value 

457 ): 

458 if value["branching_option"] == "new": 

459 

460 if not value["new_branch"]: 

461 node["new_branch"].raise_invalid("New branch requires a name.") 

462 

463 if not value["version_location"]: 

464 node["version_location"].raise_invalid( 

465 "New branch requires a version location." 

466 ) 

467 

468 def save_create_form(self, form): # pylint: disable=empty-docstring 

469 """ """ 

470 alembic = make_alembic_config(self.config) 

471 data = form.validated 

472 

473 # kwargs for `alembic revision` command 

474 kw = { 

475 "message": data["description"], 

476 "autogenerate": data["autogenerate"], 

477 } 

478 if data["branching_option"] == "new": 

479 kw["head"] = "base" 

480 kw["branch_label"] = data["new_branch"] 

481 kw["version_path"] = self.app.resource_path(data["version_location"]) 

482 else: 

483 assert data["branching_option"] == "revise" 

484 kw["head"] = f"{data['revise_branch']}@head" 

485 

486 # run `alembic revision` 

487 revision = alembic_command.revision(alembic, **kw) 

488 

489 intro = HTML.tag( 

490 "p", 

491 class_="block", 

492 c="New migration script has been created. " 

493 "Please review and modify the file contents as needed:", 

494 ) 

495 

496 path = HTML.tag( 

497 "p", 

498 class_="block has-background-white has-text-black is-family-monospace", 

499 style="padding: 0.5rem;", 

500 c=[HTML.tag("wutta-copyable-text", text=revision.path)], 

501 ) 

502 

503 outro = HTML.tag( 

504 "p", 

505 class_="block", 

506 c=[ 

507 "When satisfied, proceed to ", 

508 tags.link_to( 

509 "Migrate Database", self.request.route_url("alembic.dashboard") 

510 ), 

511 ".", 

512 ], 

513 ) 

514 

515 self.request.session.flash(HTML.tag("div", c=[intro, path, outro])) 

516 return revision 

517 

518 def save_delete_form(self, form): # pylint: disable=empty-docstring 

519 """ """ 

520 rev = self.get_instance() 

521 os.remove(rev["path"]) 

522 

523 # TODO: this is effectivey duplicated in AppTableView.get_migration_branch_options() 

524 def get_revise_branch_options( # pylint: disable=missing-function-docstring 

525 self, script 

526 ): 

527 branches = set() 

528 for rev in script.get_revisions(script.get_heads()): 

529 branches.update(rev.branch_labels) 

530 return sorted(branches) 

531 

532 def configure_get_simple_settings(self): # pylint: disable=empty-docstring 

533 """ """ 

534 return [ 

535 {"name": f"{self.config.appname}.alembic.default_revise_branch"}, 

536 ] 

537 

538 def configure_get_context( # pylint: disable=empty-docstring,arguments-differ 

539 self, **kwargs 

540 ): 

541 """ """ 

542 context = super().configure_get_context(**kwargs) 

543 

544 script = get_alembic_scriptdir(self.config) 

545 context["revise_branch_options"] = self.get_revise_branch_options(script) 

546 

547 return context 

548 

549 

550def defaults(config, **kwargs): # pylint: disable=missing-function-docstring 

551 base = globals() 

552 

553 AlembicDashboardView = ( # pylint: disable=invalid-name,redefined-outer-name 

554 kwargs.get("AlembicDashboardView", base["AlembicDashboardView"]) 

555 ) 

556 AlembicDashboardView.defaults(config) 

557 

558 AlembicMigrationView = ( # pylint: disable=invalid-name,redefined-outer-name 

559 kwargs.get("AlembicMigrationView", base["AlembicMigrationView"]) 

560 ) 

561 AlembicMigrationView.defaults(config) 

562 

563 

564def includeme(config): # pylint: disable=missing-function-docstring 

565 defaults(config)