Coverage for .tox / coverage / lib / python3.11 / site-packages / wuttaweb / views / alembic.py: 100%

213 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-15 10:16 -0500

1# -*- coding: utf-8; -*- 

2################################################################################ 

3# 

4# wuttaweb -- Web App for Wutta Framework 

5# Copyright © 2024-2025 Lance Edgar 

6# 

7# This file is part of Wutta Framework. 

8# 

9# Wutta Framework is free software: you can redistribute it and/or modify it 

10# under the terms of the GNU General Public License as published by the Free 

11# Software Foundation, either version 3 of the License, or (at your option) any 

12# later version. 

13# 

14# Wutta Framework is distributed in the hope that it will be useful, but 

15# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 

16# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for 

17# more details. 

18# 

19# You should have received a copy of the GNU General Public License along with 

20# Wutta Framework. If not, see <http://www.gnu.org/licenses/>. 

21# 

22################################################################################ 

23""" 

24Views for Alembic 

25""" 

26 

27import datetime 

28import logging 

29import os 

30import re 

31 

32from alembic import command as alembic_command 

33from alembic.migration import MigrationContext 

34from alembic.util import CommandError 

35 

36from wuttjamaican.db.conf import ( 

37 make_alembic_config, 

38 get_alembic_scriptdir, 

39 check_alembic_current, 

40) 

41 

42import colander 

43from webhelpers2.html import tags, HTML 

44 

45from wuttaweb.views import View, MasterView 

46from wuttaweb.forms import widgets 

47from wuttaweb.forms.schema import WuttaDateTime 

48 

49 

50log = logging.getLogger(__name__) 

51 

52 

def normalize_revision(config, rev, json_safe=False):
    """
    Convert an Alembic revision script object into a plain dict.

    :param config: App config object; only ``config.get_app()`` is
       used here.

    :param rev: Revision script object.  Its ``revision``,
       ``branch_labels``, ``doc``, ``longdoc``, ``path``,
       ``dependencies``, ``down_revision``, ``nextrev``, ``is_base``,
       ``is_branch_point`` and ``is_head`` attributes are read.

    :param json_safe: If true, the ``created`` value is passed
       through ``app.render_datetime()`` instead of being returned as
       a datetime object.

    :returns: Dict with one key per attribute listed above, plus
       ``created``.  Multi-valued fields (``branch_labels``,
       ``nextrev``) are joined into a sorted, comma-separated string.
       ``created`` is ``None`` when no parseable "Create Date" is
       found in the script's long description.
    """
    app = config.get_app()

    # The creation timestamp is not exposed as an attribute; it is
    # recovered from the "Create Date: ..." line of the script docstring.
    created = None
    if match := re.search(
        r"Create Date: (\d{4}-\d{2}-\d{2}[\d:\. ]+\d)", rev.longdoc or ""
    ):
        try:
            created = datetime.datetime.fromisoformat(match.group(1))
        except ValueError:
            # A hand-edited / malformed date must not break every page
            # which lists this revision; report it and carry on.
            logging.getLogger(__name__).warning(
                "ignoring unparseable Create Date %r in %s",
                match.group(1),
                rev.path,
            )
        else:
            created = app.localtime(created, from_utc=False)
            if json_safe:
                created = app.render_datetime(created)

    return {
        "revision": rev.revision,
        # these are unordered collections; sort so the rendered text
        # (and anything sorted by it) is stable from one run to the next
        "branch_labels": ", ".join(sorted(rev.branch_labels)),
        "doc": rev.doc,
        "longdoc": rev.longdoc,
        "path": rev.path,
        "dependencies": rev.dependencies or "",
        "down_revision": rev.down_revision or "",
        "nextrev": ", ".join(sorted(rev.nextrev)),
        "is_base": app.render_boolean(rev.is_base),
        "is_branch_point": app.render_boolean(rev.is_branch_point),
        # nb. left as a real boolean (not rendered) unlike the two above
        "is_head": rev.is_head,
        "created": created,
    }

79 

80 

class AlembicDashboardView(View):
    """
    Views behind the Alembic Dashboard page and its "migrate" action.
    """

    def dashboard(self):
        """
        Main view for the Alembic Dashboard.

        Route name is ``alembic.dashboard``; URL is
        ``/alembic/dashboard``

        The template context holds some facts about the script
        directory, plus two lists of normalized revisions: the heads
        known to the script directory, and the heads currently
        recorded in the database.
        """
        script = get_alembic_scriptdir(self.config)

        # ask the app database which revision(s) it is currently at
        with self.config.appdb_engine.begin() as conn:
            current_heads = MigrationContext.configure(conn).get_current_heads()

        def revision_link(revision):
            # anchor tag pointing at the "view migration" page
            url = self.request.route_url("alembic.migrations.view", revision=revision)
            return tags.link_to(revision, url)

        def normalize(rev):
            info = normalize_revision(self.config, rev, json_safe=True)
            info["is_current"] = rev.revision in current_heads
            info["revision"] = revision_link(info["revision"])
            if info["down_revision"]:
                info["down_revision"] = revision_link(info["down_revision"])
            return info

        def by_branch(info):
            return info["branch_labels"]

        script_heads = sorted(
            [normalize(script.get_revision(head)) for head in script.get_heads()],
            key=by_branch,
        )
        db_heads = sorted(
            [normalize(script.get_revision(head)) for head in current_heads],
            key=by_branch,
        )

        script_info = {
            "dir": script.dir,
            "version_locations": sorted(script.version_locations),
            "env_py_location": script.env_py_location,
            "file_template": script.file_template,
        }

        return {
            "index_title": "Alembic Dashboard",
            "script": script_info,
            "script_heads": script_heads,
            "db_heads": db_heads,
        }

    def migrate(self):
        """
        Action view to migrate the database.  POST request must be used.

        This directly invokes the :func:`alembic upgrade
        <alembic:alembic.command.upgrade>` (or :func:`alembic
        downgrade <alembic:alembic.command.downgrade>`) command.

        It then sets a flash message per the command status, and
        redirects user back to the Dashboard (or other referrer).

        The request must specify a ``revspec`` param, which we pass
        along as-is to the ``alembic`` command.  We assume ``alembic
        upgrade`` unless the request sets ``direction`` param to
        ``"downgrade"``.
        """
        request = self.request
        referrer = request.get_referrer(default=request.route_url("alembic.dashboard"))

        # anything but POST is bounced straight back
        if request.method != "POST":
            return self.redirect(referrer)

        revspec = request.POST.get("revspec")
        if not revspec:
            request.session.flash("You must provide a target revspec.", "error")
            return self.redirect(referrer)

        # any direction other than "downgrade" is treated as "upgrade"
        if request.POST.get("direction") == "downgrade":
            direction = "downgrade"
        else:
            direction = "upgrade"
        alembic = make_alembic_config(self.config)
        command = getattr(alembic_command, direction)

        # invoke alembic upgrade/downgrade
        try:
            command(alembic, revspec)
        except Exception as err:  # pylint: disable=broad-exception-caught
            log.exception("database failed to %s using revspec: %s", direction, revspec)
            request.session.flash(f"Migrate failed: {err}", "error")
        else:
            request.session.flash("Database has been migrated.")

        return self.redirect(referrer)

    @classmethod
    def defaults(cls, config):  # pylint: disable=empty-docstring
        """ """
        # public hook; the actual registration lives in _defaults()
        cls._defaults(config)

    @classmethod
    def _defaults(cls, config):
        # Registers the permission group, permissions, routes and views
        # for this class on the given config.

        # permission group
        config.add_wutta_permission_group(
            "alembic",
            "Alembic (General)",
            overwrite=False,
        )

        # dashboard
        config.add_wutta_permission(
            "alembic",
            "alembic.dashboard",
            "Basic (view) access to the Alembic Dashboard",
        )
        config.add_route("alembic.dashboard", "/alembic/dashboard")
        config.add_view(
            cls,
            attr="dashboard",
            route_name="alembic.dashboard",
            renderer="/alembic/dashboard.mako",
            permission="alembic.dashboard",
        )

        # migrate
        config.add_wutta_permission(
            "alembic",
            "alembic.migrate",
            "Run migration scripts on the database",
        )
        config.add_route("alembic.migrate", "/alembic/migrate")
        config.add_view(
            cls,
            attr="migrate",
            route_name="alembic.migrate",
            permission="alembic.migrate",
        )

235 

236 

class AlembicMigrationView(MasterView):  # pylint: disable=abstract-method
    """
    Master view for Alembic Migrations.

    Route prefix is ``alembic.migrations``; notable URLs include:

    * ``/alembic/migrations/``
    * ``/alembic/migrations/new``
    * ``/alembic/migrations/XXX``

    The "records" for this view are plain dicts produced by
    :func:`normalize_revision()` from the Alembic script directory.
    Creating a record writes a new migration script file; deleting a
    record removes its script file.
    """

    # pylint: disable=duplicate-code
    model_name = "alembic_migration"
    model_key = "revision"
    model_title = "Alembic Migration"
    route_prefix = "alembic.migrations"
    url_prefix = "/alembic/migrations"
    filterable = False
    sortable = True
    sort_on_backend = False
    paginated = True
    paginate_on_backend = False
    editable = False
    configurable = True
    # pylint: enable=duplicate-code

    labels = {
        "doc": "Description",
        "longdoc": "Long Description",
        "nextrev": "Next Revision",
    }

    grid_columns = [
        "is_head",
        "revision",
        "doc",
        "branch_labels",
        "down_revision",
        "created",
    ]

    sort_defaults = ("is_head", "desc")

    form_fields = [
        "revision",
        "doc",
        "longdoc",
        "branch_labels",
        "dependencies",
        "down_revision",
        "nextrev",
        "is_base",
        "is_branch_point",
        "is_head",
        "path",
        "created",
    ]

    def get_grid_data(  # pylint: disable=empty-docstring
        self, columns=None, session=None
    ):
        """ """
        # one normalized dict per revision found in the script directory;
        # the ``columns`` and ``session`` args are not used here
        script = get_alembic_scriptdir(self.config)
        return [normalize_revision(self.config, rev) for rev in script.walk_revisions()]

    def configure_grid(self, grid):  # pylint: disable=empty-docstring
        """ """
        g = grid
        super().configure_grid(g)

        # revision
        g.set_link("revision")
        g.set_searchable("revision")

        # doc
        g.set_link("doc")
        g.set_searchable("doc")

        # branch_labels
        g.set_searchable("branch_labels")

        # is_head
        g.set_label("is_head", "Head")
        g.set_renderer("is_head", self.render_is_head)

        # created
        g.set_renderer("created", "datetime")

    def render_is_head(  # pylint: disable=unused-argument
        self, rev, field, value
    ):
        """
        Grid renderer for the ``is_head`` column.

        Returns the app's boolean rendering for a true value, and an
        empty string otherwise (so non-head rows show nothing).
        """
        return self.app.render_boolean(value) if value else ""

    def get_instance(
        self, **kwargs
    ):  # pylint: disable=empty-docstring,arguments-differ,unused-argument
        """ """
        # The normalized revision is looked up once per view instance
        # and then cached in the instance dict.
        if "_cached_instance" not in self.__dict__:
            revision = self.request.matchdict["revision"]
            script = get_alembic_scriptdir(self.config)
            try:
                rev = script.get_revision(revision)
            except CommandError:
                rev = None
            if not rev:
                raise self.notfound()
            self.__dict__["_cached_instance"] = normalize_revision(self.config, rev)
        return self.__dict__["_cached_instance"]

    def get_instance_title(self, instance):  # pylint: disable=empty-docstring
        """ """
        # e.g. "(branch) description [head]"
        text = f"({instance['branch_labels']}) {instance['doc']}"
        if instance.get("is_head"):
            text += " [head]"
        return text

    def configure_form(self, form):  # pylint: disable=empty-docstring
        """ """
        f = form
        super().configure_form(f)

        # revision
        f.set_widget("revision", widgets.CopyableTextWidget())

        # longdoc
        f.set_widget("longdoc", "notes")

        # down_revision
        f.set_widget("down_revision", widgets.AlembicRevisionWidget(self.request))

        # nextrev
        f.set_widget("nextrev", widgets.AlembicRevisionsWidget(self.request))

        # is_head
        f.set_node("is_head", colander.Boolean())

        # path
        f.set_widget("path", widgets.CopyableTextWidget())

        # created
        f.set_node("created", WuttaDateTime())
        f.set_widget("created", widgets.WuttaDateTimeWidget(self.request))

    def make_create_form(self):  # pylint: disable=empty-docstring
        """ """
        # Builds the "new migration" form.  Its fields become the
        # arguments for `alembic revision`; see save_create_form().
        alembic = make_alembic_config(self.config)
        script = get_alembic_scriptdir(self.config, alembic)

        schema = colander.Schema()
        schema.add(colander.SchemaNode(colander.String(), name="description"))

        schema.add(
            colander.SchemaNode(
                colander.Boolean(),
                name="autogenerate",
                default=check_alembic_current(self.config, alembic),
            )
        )

        # nb. only these two options are understood by the validators
        # below and by save_create_form(), so reject anything else here
        # instead of letting it slip through to the save step.
        schema.add(
            colander.SchemaNode(
                colander.String(),
                name="branching_option",
                default="revise",
                validator=colander.OneOf(["revise", "new"]),
            )
        )

        branch_options = self.get_revise_branch_options(script)

        revise_branch = colander.SchemaNode(
            colander.String(),
            name="revise_branch",
            missing=colander.null,
            validator=colander.OneOf(branch_options),
            widget=widgets.SelectWidget(values=[(b, b) for b in branch_options]),
        )

        # pre-select the configured default branch, or else the only
        # branch there is
        branch = self.config.get(f"{self.config.appname}.alembic.default_revise_branch")
        if not branch and len(branch_options) == 1:
            branch = branch_options[0]
        if branch:
            revise_branch.default = branch

        schema.add(revise_branch)

        schema.add(
            colander.SchemaNode(
                colander.String(), name="new_branch", missing=colander.null
            )
        )

        version_locations = sorted(
            self.config.parse_list(alembic.get_main_option("version_locations"))
        )

        schema.add(
            colander.SchemaNode(
                colander.String(),
                name="version_location",
                missing=colander.null,
                validator=colander.OneOf(version_locations),
                widget=widgets.SelectWidget(values=[(v, v) for v in version_locations]),
            )
        )

        # cross-field rules: which fields are required depends on the
        # branching option chosen
        schema.validator = colander.All(
            self.validate_revise_branch, self.validate_new_branch
        )

        form = self.make_form(
            schema=schema,
            cancel_url_fallback=self.get_index_url(),
            button_label_submit="Write Script File",
        )

        form.set_label("revise_branch", "Branch")

        return form

    def validate_revise_branch(self, node, value):
        """
        Schema-level validator: when revising an existing branch, the
        ``revise_branch`` field must have a value.
        """
        if value["branching_option"] == "revise":
            if not value["revise_branch"]:
                node["revise_branch"].raise_invalid(
                    "Must specify which branch to revise."
                )

    def validate_new_branch(self, node, value):
        """
        Schema-level validator: when starting a new branch, both the
        ``new_branch`` and ``version_location`` fields must have a
        value.
        """
        if value["branching_option"] == "new":

            if not value["new_branch"]:
                node["new_branch"].raise_invalid("New branch requires a name.")

            if not value["version_location"]:
                node["version_location"].raise_invalid(
                    "New branch requires a version location."
                )

    def save_create_form(self, form):  # pylint: disable=empty-docstring
        """ """
        # Runs `alembic revision` per the validated form data, flashes
        # a message showing the path of the new script, and returns
        # whatever the alembic command returned.
        alembic = make_alembic_config(self.config)
        data = form.validated

        # kwargs for `alembic revision` command
        kw = {
            "message": data["description"],
            "autogenerate": data["autogenerate"],
        }
        option = data["branching_option"]
        if option == "new":
            kw["head"] = "base"
            kw["branch_label"] = data["new_branch"]
            kw["version_path"] = self.app.resource_path(data["version_location"])
        elif option == "revise":
            kw["head"] = f"{data['revise_branch']}@head"
        else:
            # explicit error (not `assert`) so the check survives `python -O`
            raise ValueError(f"unsupported branching option: {option!r}")

        # run `alembic revision`
        revision = alembic_command.revision(alembic, **kw)

        intro = HTML.tag(
            "p",
            class_="block",
            c="New migration script has been created.  "
            "Please review and modify the file contents as needed:",
        )

        path = HTML.tag(
            "p",
            class_="block has-background-white has-text-black is-family-monospace",
            style="padding: 0.5rem;",
            c=[HTML.tag("wutta-copyable-text", text=revision.path)],
        )

        outro = HTML.tag(
            "p",
            class_="block",
            c=[
                "When satisfied, proceed to ",
                tags.link_to(
                    "Migrate Database", self.request.route_url("alembic.dashboard")
                ),
                ".",
            ],
        )

        self.request.session.flash(HTML.tag("div", c=[intro, path, outro]))
        return revision

    def save_delete_form(self, form):  # pylint: disable=empty-docstring
        """ """
        # "deleting" a migration means removing its script file
        rev = self.get_instance()
        os.remove(rev["path"])

    # TODO: this is effectively duplicated in AppTableView.get_migration_branch_options()
    def get_revise_branch_options(self, script):
        """
        Return a sorted list of the branch labels found on the current
        head revisions of the given script directory.
        """
        branches = set()
        for rev in script.get_revisions(script.get_heads()):
            branches.update(rev.branch_labels)
        return sorted(branches)

    def configure_get_simple_settings(self):  # pylint: disable=empty-docstring
        """ """
        return [
            {"name": f"{self.config.appname}.alembic.default_revise_branch"},
        ]

    def configure_get_context(  # pylint: disable=empty-docstring,arguments-differ
        self, **kwargs
    ):
        """ """
        context = super().configure_get_context(**kwargs)

        # the configure page offers the known branches as choices
        script = get_alembic_scriptdir(self.config)
        context["revise_branch_options"] = self.get_revise_branch_options(script)

        return context

559 

560 

def defaults(config, **kwargs):
    """
    Register the Alembic views on the given config.

    Each view class may be overridden by passing a keyword argument
    named after it (``AlembicDashboardView``, ``AlembicMigrationView``);
    otherwise the class of that name from this module is used.
    """
    module_scope = globals()

    # nb. order matters: dashboard view first, then the migrations view
    for name in ("AlembicDashboardView", "AlembicMigrationView"):
        view_class = kwargs.get(name, module_scope[name])
        view_class.defaults(config)

573 

574 

def includeme(config):
    """
    Register this module's views on ``config``, using the view classes
    defined in this module (no overrides).
    """
    defaults(config)