Replaces the golang-migrate single-counter tracker with a hand-rolled runner over embed.FS that tracks applied state as a set in paliad.applied_migrations (version PK, name, applied_at, checksum).

Closes the parallel-merge skip-hole the 2026-05-20 mig-103 incident exposed (m/paliad#44): a migration whose version is missing from applied_migrations runs on the next deploy regardless of which higher versions are already applied. Gaps are first-class.

Slice 1 of the design at docs/design-migration-runner-applied-set-2026-05-20.md. All eight design decisions m-picked = inventor recommendation.

Runner contract:

- Ensure paliad schema → pg_advisory_lock(hash('paliad.applied_migrations')) → CREATE TABLE IF NOT EXISTS applied_migrations.
- bootstrapFromLegacyTracker: if applied_migrations is empty and the legacy paliad.paliad_schema_migrations row is present and clean, INSERT rows 1..N for every on-disk version with checksum=NULL via ON CONFLICT DO NOTHING. Hard-fail if legacy tracker is dirty (operator must recover).
- scanEmbeddedMigrations: hard-fail on two .up.sql files sharing a version prefix — the failure mode the post-mortem exposed.
- checkNameAgreement: hard-fail on rename-after-apply mismatch (disk name for an already-applied version != DB name).
- applyOne: SQL body + INSERT(version, name, now(), sha256(file_bytes)) in one transaction. All-or-nothing per migration.

Checksums populated on apply for future drift detection; rows backfilled from the legacy tracker carry NULL (we can't fabricate a hash for what golang-migrate applied historically). Verify-on-deploy intentionally deferred to a focused follow-up — single if-block flip when m wants it.

Up-only runner. .down.sql files stay in embed.FS as reference; manual roll-back path is psql + DELETE FROM paliad.applied_migrations WHERE version=N. Zero call sites for migrate.Down in the codebase today.

Drops github.com/golang-migrate/migrate/v4 from go.mod (no other importers; verified via grep).

Tests:

- internal/db/migrate_test.go: TestMigrations_DryRun walks pending = on_disk \ applied (read from paliad.applied_migrations, missing-table → empty set), runs each in BEGIN/ROLLBACK against the scratch DB.
- cmd/server/main_smoke_test.go: TestBootSmoke asserts the applied set equals the on-disk set exactly (not just max-version-match) — catches the skip class the post-mortem documented. Dirty-flag check removed (rows are committed or absent, not 'dirty').
- All 45 service-test call sites of db.ApplyMigrations work unchanged (same signature, same fresh-DB behavior).

Follow-up: mig 108_drop_legacy_trackers (DROP paliad.paliad_schema_migrations and public.paliad_schema_migrations) after one or two deploys of burn-in on this slice.
333 lines
11 KiB
Go
// Package db owns the Paliad Postgres connection and embedded schema migrations.
//
// Migrations are NNN_description.up.sql / .down.sql files in the migrations/
// subdirectory, embedded into the binary so a single artifact ships with its
// schema. The server applies pending migrations at startup before binding
// the HTTP listener.
//
// The runner tracks applied state as a set, not a counter: every applied
// migration gets its own row in paliad.applied_migrations(version PK, name,
// applied_at, checksum). On every deploy, pending = on_disk \ applied, in
// ascending version order. Gaps in the version space are first-class — a
// version that's missing from applied_migrations runs on the next deploy,
// regardless of which higher versions are already applied.
//
// This is what closes the parallel-merge skip-hole that the single-counter
// tracker (golang-migrate) silently fell into on 2026-05-20 (m/paliad#44).
// Background and design: docs/design-migration-runner-applied-set-2026-05-20.md.
//
// .down.sql files ship in the embedded FS as reference material but are not
// auto-applied — there are no call sites for rolling back, and operator
// recovery (psql .down.sql + DELETE FROM paliad.applied_migrations WHERE
// version=N) is the documented path. If a real call site for auto-rollback
// materializes later, add it as a focused follow-up.
package db

import (
	"crypto/sha256"
	"database/sql"
	"embed"
	"errors"
	"fmt"
	"hash/fnv"
	"sort"
	"strconv"
	"strings"

	_ "github.com/lib/pq"
)

//go:embed migrations/*.sql
var migrationFS embed.FS

// advisoryLockID is the Postgres advisory-lock id the runner takes around
// the apply loop. Derived once from the table name so the value is stable
// across processes — two concurrent deploys (rolling Dokploy update, dev
// laptop hitting the same scratch DB as CI) serialize on this id rather
// than racing on the pending set.
//
// FNV-1a-64 is good enough: the id only has to be a stable int64, not
// cryptographically uniform. Process-wide constant.
var advisoryLockID = func() int64 {
	h := fnv.New64a()
	_, _ = h.Write([]byte("paliad.applied_migrations"))
	return int64(h.Sum64())
}()

// migration is one *.up.sql file from the embedded FS.
type migration struct {
	version  int
	name     string
	filename string
}

// ApplyMigrations applies every pending up-migration to the given database.
//
// Safe to call repeatedly; a fully-applied tree is a no-op. Returns the
// first error encountered (with the offending migration filename wrapped
// in the message) and leaves the rest of pending unapplied — same fail-fast
// posture as the previous golang-migrate runner.
//
// On first deploy of this code path against a database that still has the
// legacy paliad.paliad_schema_migrations counter at version N, the runner
// seeds paliad.applied_migrations with rows 1..N (checksum NULL) before
// applying anything new. The first deploy is therefore effectively a
// no-op against the schema — the bootstrap just relabels existing state.
func ApplyMigrations(databaseURL string) error {
	if databaseURL == "" {
		return errors.New("database URL is empty")
	}

	conn, err := sql.Open("postgres", databaseURL)
	if err != nil {
		return fmt.Errorf("open database: %w", err)
	}
	defer conn.Close()
	// pg_advisory_lock is session-scoped, and *sql.DB is a pool. Cap the
	// pool at one connection so the lock, the apply loop, and the unlock
	// all run on the same session.
	conn.SetMaxOpenConns(1)
	if err := conn.Ping(); err != nil {
		return fmt.Errorf("ping database: %w", err)
	}

	// Ensure the paliad schema exists. Mig 001 also creates it; the
	// applied_migrations table lives in paliad.* and gets created before
	// any migrations run, so the schema must exist first.
	if _, err := conn.Exec(`CREATE SCHEMA IF NOT EXISTS paliad`); err != nil {
		return fmt.Errorf("ensure paliad schema: %w", err)
	}

	if _, err := conn.Exec(`SELECT pg_advisory_lock($1)`, advisoryLockID); err != nil {
		return fmt.Errorf("acquire advisory lock: %w", err)
	}
	defer func() {
		_, _ = conn.Exec(`SELECT pg_advisory_unlock($1)`, advisoryLockID)
	}()

	if _, err := conn.Exec(`
		CREATE TABLE IF NOT EXISTS paliad.applied_migrations (
			version int NOT NULL PRIMARY KEY,
			name text NOT NULL,
			applied_at timestamptz NOT NULL DEFAULT now(),
			checksum text NULL
		)
	`); err != nil {
		return fmt.Errorf("create applied_migrations: %w", err)
	}

	onDisk, err := scanEmbeddedMigrations()
	if err != nil {
		return fmt.Errorf("scan embedded migrations: %w", err)
	}

	if err := bootstrapFromLegacyTracker(conn, onDisk); err != nil {
		return fmt.Errorf("bootstrap from legacy tracker: %w", err)
	}

	applied, err := readAppliedMigrations(conn)
	if err != nil {
		return fmt.Errorf("read applied_migrations: %w", err)
	}

	if err := checkNameAgreement(onDisk, applied); err != nil {
		return err
	}

	for _, m := range onDisk {
		if _, ok := applied[m.version]; ok {
			continue
		}
		if err := applyOne(conn, m); err != nil {
			return fmt.Errorf("apply %s: %w", m.filename, err)
		}
	}
	return nil
}

// scanEmbeddedMigrations returns every NNN_*.up.sql in the embedded FS,
// sorted by version ascending. Hard-fails on two files sharing the same
// version prefix — that's the failure mode the parallel-merge incident
// exposed, and the runner refuses to start rather than silently picking one.
func scanEmbeddedMigrations() ([]migration, error) {
	entries, err := migrationFS.ReadDir("migrations")
	if err != nil {
		return nil, fmt.Errorf("read migrations dir: %w", err)
	}
	seen := map[int]string{}
	var out []migration
	for _, e := range entries {
		name := e.Name()
		if !strings.HasSuffix(name, ".up.sql") {
			continue
		}
		v, n, ok := parseMigrationFilename(name)
		if !ok {
			return nil, fmt.Errorf("unparseable migration filename %q "+
				"(expected NNN_description.up.sql)", name)
		}
		if prior, dup := seen[v]; dup {
			return nil, fmt.Errorf("two migrations at version %d: %q and %q — "+
				"rename one and redeploy", v, prior, name)
		}
		seen[v] = name
		out = append(out, migration{version: v, name: n, filename: name})
	}
	sort.Slice(out, func(i, j int) bool { return out[i].version < out[j].version })
	return out, nil
}

// parseMigrationFilename splits "NNN_description.up.sql" into (NNN, description).
// Returns ok=false on any deviation from that shape.
func parseMigrationFilename(filename string) (version int, name string, ok bool) {
	base := strings.TrimSuffix(filename, ".up.sql")
	if base == filename {
		return 0, "", false
	}
	underscore := strings.IndexByte(base, '_')
	if underscore <= 0 {
		return 0, "", false
	}
	v, err := strconv.Atoi(base[:underscore])
	if err != nil {
		return 0, "", false
	}
	return v, base[underscore+1:], true
}

// readAppliedMigrations returns a map version → name from
// paliad.applied_migrations. Returns an empty map (no error) if the table
// is missing — that's the fresh-DB path before the CREATE TABLE in
// ApplyMigrations runs against it.
func readAppliedMigrations(conn *sql.DB) (map[int]string, error) {
	rows, err := conn.Query(`SELECT version, name FROM paliad.applied_migrations`)
	if err != nil {
		if strings.Contains(err.Error(), "does not exist") {
			return map[int]string{}, nil
		}
		return nil, err
	}
	defer rows.Close()
	out := map[int]string{}
	for rows.Next() {
		var v int
		var n string
		if err := rows.Scan(&v, &n); err != nil {
			return nil, err
		}
		out[v] = n
	}
	return out, rows.Err()
}

// bootstrapFromLegacyTracker seeds paliad.applied_migrations from
// paliad.paliad_schema_migrations on the first deploy of the new runner
// against a DB that previously ran golang-migrate.
//
// Behavior:
//   - applied_migrations already has rows → no-op (idempotent).
//   - applied_migrations empty AND legacy tracker missing → no-op
//     (virgin DB; the apply loop will run everything from scratch).
//   - applied_migrations empty AND legacy tracker present, clean, version N
//     → INSERT rows for every on-disk version ≤ N with checksum NULL.
//   - applied_migrations empty AND legacy tracker dirty → hard-fail.
//     The operator must recover the legacy tracker first (it being dirty
//     means a prior golang-migrate run crashed mid-flight); we will not
//     paper over an unknown state by guessing what landed.
//
// Backfilled rows have checksum NULL because the legacy runner didn't hash
// anything — we can't fabricate a provenance hash today without falsely
// claiming we know the byte-identity of what shipped historically.
func bootstrapFromLegacyTracker(conn *sql.DB, onDisk []migration) error {
	var count int
	if err := conn.QueryRow(`SELECT count(*) FROM paliad.applied_migrations`).Scan(&count); err != nil {
		return fmt.Errorf("count applied_migrations: %w", err)
	}
	if count > 0 {
		return nil
	}

	var legacyVer int
	var legacyDirty bool
	err := conn.QueryRow(`SELECT version, dirty FROM paliad.paliad_schema_migrations LIMIT 1`).
		Scan(&legacyVer, &legacyDirty)
	if errors.Is(err, sql.ErrNoRows) {
		return nil
	}
	if err != nil {
		if strings.Contains(err.Error(), "does not exist") {
			return nil
		}
		return fmt.Errorf("read legacy tracker: %w", err)
	}
	if legacyDirty {
		return fmt.Errorf("legacy paliad.paliad_schema_migrations is dirty at version %d — "+
			"recover manually before deploying", legacyVer)
	}

	for _, m := range onDisk {
		if m.version > legacyVer {
			continue
		}
		if _, err := conn.Exec(`
			INSERT INTO paliad.applied_migrations(version, name, applied_at, checksum)
			VALUES ($1, $2, now(), NULL)
			ON CONFLICT (version) DO NOTHING
		`, m.version, m.name); err != nil {
			return fmt.Errorf("backfill version %d: %w", m.version, err)
		}
	}
	return nil
}

// checkNameAgreement hard-fails if a version that's already applied has a
// different name on disk than in the DB. Catches the post-merge rename
// accident where someone renames `098_foo.up.sql` to `098_bar.up.sql` —
// the SQL has already run on prod with the old name, so the rename is a
// lie about history. Operator recovery: revert the rename, or update the
// DB row if the rename is intentional.
//
// Backfilled rows have a name pulled from the on-disk filename, so an
// out-of-the-box backfill never trips this check.
func checkNameAgreement(onDisk []migration, applied map[int]string) error {
	for _, m := range onDisk {
		dbName, ok := applied[m.version]
		if !ok {
			continue
		}
		if dbName != m.name {
			return fmt.Errorf("migration %d: disk name %q != DB name %q "+
				"(renamed after apply? revert the rename, or UPDATE paliad.applied_migrations "+
				"SET name=%q WHERE version=%d if the rename is intentional)",
				m.version, m.name, dbName, m.name, m.version)
		}
	}
	return nil
}

// applyOne runs one migration's .up.sql plus its INSERT row in a single
// transaction. All-or-nothing per migration: if the SQL fails, the row
// isn't inserted and the next deploy re-tries from the same point. If
// the INSERT fails (e.g. PK violation because the lock wasn't held), the
// SQL rolls back too.
func applyOne(conn *sql.DB, m migration) error {
	body, err := migrationFS.ReadFile("migrations/" + m.filename)
	if err != nil {
		return fmt.Errorf("read %s: %w", m.filename, err)
	}
	checksum := fmt.Sprintf("%x", sha256.Sum256(body))

	tx, err := conn.Begin()
	if err != nil {
		return fmt.Errorf("begin tx: %w", err)
	}
	defer func() { _ = tx.Rollback() }()

	if _, err := tx.Exec(string(body)); err != nil {
		return fmt.Errorf("exec sql: %w", err)
	}
	if _, err := tx.Exec(`
		INSERT INTO paliad.applied_migrations(version, name, applied_at, checksum)
		VALUES ($1, $2, now(), $3)
	`, m.version, m.name, checksum); err != nil {
		return fmt.Errorf("record applied: %w", err)
	}
	return tx.Commit()
}
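The checksum that applyOne records is not compared against anything yet; verify-on-deploy is deferred. As a rough sketch of what such a check might look like, assuming the same hex-encoded SHA-256 and treating a NULL checksum (a row backfilled from the legacy tracker) as "nothing to verify": the names checksumOf and verifyChecksum are hypothetical and do not exist in the file above.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// checksumOf mirrors the hex-encoded SHA-256 that applyOne records.
func checksumOf(body []byte) string {
	return fmt.Sprintf("%x", sha256.Sum256(body))
}

// verifyChecksum is a hypothetical helper, not part of the runner.
// stored == nil models a NULL checksum column (a backfilled row), which
// is skipped because there is no recorded hash to compare against.
func verifyChecksum(version int, stored *string, body []byte) error {
	if stored == nil {
		return nil
	}
	if got := checksumOf(body); got != *stored {
		return fmt.Errorf("migration %d: on-disk checksum %s != recorded %s",
			version, got, *stored)
	}
	return nil
}

func main() {
	body := []byte("CREATE TABLE example (id int);")
	recorded := checksumOf(body)
	edited := []byte("CREATE TABLE example (id bigint);")

	fmt.Println(verifyChecksum(1, nil, body))         // backfilled row: skipped
	fmt.Println(verifyChecksum(2, &recorded, body))   // file unchanged since apply
	fmt.Println(verifyChecksum(3, &recorded, edited)) // file edited after apply
}
```

In the runner, `stored` would presumably come from scanning the checksum column into a `sql.NullString`; whether a mismatch should hard-fail or only warn is a design decision the source leaves open.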