// Package db owns the Paliad Postgres connection and embedded schema migrations. // // Migrations are NNN_description.up.sql / .down.sql files in the migrations/ // subdirectory, embedded into the binary so a single artifact ships with its // schema. The server applies pending migrations at startup before binding // the HTTP listener. // // The runner tracks applied state as a set, not a counter: every applied // migration gets its own row in paliad.applied_migrations(version PK, name, // applied_at, checksum). On every deploy, pending = on_disk \ applied, in // ascending version order. Gaps in the version space are first-class — a // version that's missing from applied_migrations runs on the next deploy, // regardless of which higher versions are already applied. // // This is what closes the parallel-merge skip-hole that the single-counter // tracker (golang-migrate) silently fell into on 2026-05-20 (m/paliad#44). // Background and design: docs/design-migration-runner-applied-set-2026-05-20.md. // // .down.sql files ship in the embedded FS as reference material but are not // auto-applied — there are no call sites for rolling back, and operator // recovery (psql .down.sql + DELETE FROM paliad.applied_migrations WHERE // version=N) is the documented path. If a real call site for auto-rollback // materializes later, add it as a focused follow-up. package db import ( "crypto/sha256" "database/sql" "embed" "errors" "fmt" "hash/fnv" "sort" "strconv" "strings" _ "github.com/lib/pq" ) //go:embed migrations/*.sql var migrationFS embed.FS // advisoryLockID is the Postgres advisory-lock id the runner takes around // the apply loop. Derived once from the table name so the value is stable // across processes — two concurrent deploys (rolling Dokploy update, dev // laptop hitting the same scratch DB as CI) serialize on this id rather // than racing on the pending set. // // FNV-1a-64 is good enough: the id only has to be a stable int64, not // cryptographically uniform. Process-wide constant. var advisoryLockID = func() int64 { h := fnv.New64a() _, _ = h.Write([]byte("paliad.applied_migrations")) return int64(h.Sum64()) }() // migration is one *.up.sql file from the embedded FS. type migration struct { version int name string filename string } // ApplyMigrations applies every pending up-migration to the given database. // // Safe to call repeatedly; a fully-applied tree is a no-op. Returns the // first error encountered (with the offending migration filename wrapped // in the message) and leaves the rest of pending unapplied — same fail-fast // posture as the previous golang-migrate runner. // // On first deploy of this code path against a database that still has the // legacy paliad.paliad_schema_migrations counter at version N, the runner // seeds paliad.applied_migrations with rows 1..N (checksum NULL) before // applying anything new. The first deploy is therefore effectively a // no-op against the schema — the bootstrap just relabels existing state. func ApplyMigrations(databaseURL string) error { if databaseURL == "" { return errors.New("database URL is empty") } conn, err := sql.Open("postgres", databaseURL) if err != nil { return fmt.Errorf("open database: %w", err) } defer conn.Close() if err := conn.Ping(); err != nil { return fmt.Errorf("ping database: %w", err) } // Ensure the paliad schema exists. Mig 001 also creates it; the // applied_migrations table lives in paliad.* and gets created before // any migrations run, so the schema must exist first. if _, err := conn.Exec(`CREATE SCHEMA IF NOT EXISTS paliad`); err != nil { return fmt.Errorf("ensure paliad schema: %w", err) } if _, err := conn.Exec(`SELECT pg_advisory_lock($1)`, advisoryLockID); err != nil { return fmt.Errorf("acquire advisory lock: %w", err) } defer func() { _, _ = conn.Exec(`SELECT pg_advisory_unlock($1)`, advisoryLockID) }() if _, err := conn.Exec(` CREATE TABLE IF NOT EXISTS paliad.applied_migrations ( version int NOT NULL PRIMARY KEY, name text NOT NULL, applied_at timestamptz NOT NULL DEFAULT now(), checksum text NULL ) `); err != nil { return fmt.Errorf("create applied_migrations: %w", err) } onDisk, err := scanEmbeddedMigrations() if err != nil { return fmt.Errorf("scan embedded migrations: %w", err) } if err := bootstrapFromLegacyTracker(conn, onDisk); err != nil { return fmt.Errorf("bootstrap from legacy tracker: %w", err) } applied, err := readAppliedMigrations(conn) if err != nil { return fmt.Errorf("read applied_migrations: %w", err) } if err := checkNameAgreement(onDisk, applied); err != nil { return err } for _, m := range onDisk { if _, ok := applied[m.version]; ok { continue } if err := applyOne(conn, m); err != nil { return fmt.Errorf("apply %s: %w", m.filename, err) } } return nil } // scanEmbeddedMigrations returns every NNN_*.up.sql in the embedded FS, // sorted by version ascending. Hard-fails on two files sharing the same // version prefix — that's the failure mode the parallel-merge incident // exposed, and the runner refuses to start rather than silently picking one. func scanEmbeddedMigrations() ([]migration, error) { entries, err := migrationFS.ReadDir("migrations") if err != nil { return nil, fmt.Errorf("read migrations dir: %w", err) } seen := map[int]string{} var out []migration for _, e := range entries { name := e.Name() if !strings.HasSuffix(name, ".up.sql") { continue } v, n, ok := parseMigrationFilename(name) if !ok { return nil, fmt.Errorf("unparseable migration filename %q "+ "(expected NNN_description.up.sql)", name) } if prior, dup := seen[v]; dup { return nil, fmt.Errorf("two migrations at version %d: %q and %q — "+ "rename one and redeploy", v, prior, name) } seen[v] = name out = append(out, migration{version: v, name: n, filename: name}) } sort.Slice(out, func(i, j int) bool { return out[i].version < out[j].version }) return out, nil } // parseMigrationFilename splits "NNN_description.up.sql" into (NNN, description). // Returns ok=false on any deviation from that shape. func parseMigrationFilename(filename string) (version int, name string, ok bool) { base := strings.TrimSuffix(filename, ".up.sql") if base == filename { return 0, "", false } underscore := strings.IndexByte(base, '_') if underscore <= 0 { return 0, "", false } v, err := strconv.Atoi(base[:underscore]) if err != nil { return 0, "", false } return v, base[underscore+1:], true } // readAppliedMigrations returns a map version → name from // paliad.applied_migrations. Returns an empty map (no error) if the table // is missing — that's the fresh-DB path before the CREATE TABLE in // ApplyMigrations runs against it. func readAppliedMigrations(conn *sql.DB) (map[int]string, error) { rows, err := conn.Query(`SELECT version, name FROM paliad.applied_migrations`) if err != nil { if strings.Contains(err.Error(), "does not exist") { return map[int]string{}, nil } return nil, err } defer rows.Close() out := map[int]string{} for rows.Next() { var v int var n string if err := rows.Scan(&v, &n); err != nil { return nil, err } out[v] = n } return out, rows.Err() } // bootstrapFromLegacyTracker seeds paliad.applied_migrations from // paliad.paliad_schema_migrations on the first deploy of the new runner // against a DB that previously ran golang-migrate. // // Behavior: // - applied_migrations already has rows → no-op (idempotent). // - applied_migrations empty AND legacy tracker missing → no-op // (virgin DB; the apply loop will run everything from scratch). // - applied_migrations empty AND legacy tracker present, clean, version N // → INSERT rows for every on-disk version ≤ N with checksum NULL. // - applied_migrations empty AND legacy tracker dirty → hard-fail. // The operator must recover the legacy tracker first (it being dirty // means a prior golang-migrate run crashed mid-flight); we will not // paper over an unknown state by guessing what landed. // // Backfilled rows have checksum NULL because the legacy runner didn't hash // anything — we can't fabricate a provenance hash today without falsely // claiming we know the byte-identity of what shipped historically. func bootstrapFromLegacyTracker(conn *sql.DB, onDisk []migration) error { var count int if err := conn.QueryRow(`SELECT count(*) FROM paliad.applied_migrations`).Scan(&count); err != nil { return fmt.Errorf("count applied_migrations: %w", err) } if count > 0 { return nil } var legacyVer int var legacyDirty bool err := conn.QueryRow(`SELECT version, dirty FROM paliad.paliad_schema_migrations LIMIT 1`). Scan(&legacyVer, &legacyDirty) if errors.Is(err, sql.ErrNoRows) { return nil } if err != nil { if strings.Contains(err.Error(), "does not exist") { return nil } return fmt.Errorf("read legacy tracker: %w", err) } if legacyDirty { return fmt.Errorf("legacy paliad.paliad_schema_migrations is dirty at version %d — "+ "recover manually before deploying", legacyVer) } for _, m := range onDisk { if m.version > legacyVer { continue } if _, err := conn.Exec(` INSERT INTO paliad.applied_migrations(version, name, applied_at, checksum) VALUES ($1, $2, now(), NULL) ON CONFLICT (version) DO NOTHING `, m.version, m.name); err != nil { return fmt.Errorf("backfill version %d: %w", m.version, err) } } return nil } // checkNameAgreement hard-fails if a version that's already applied has a // different name on disk than in the DB. Catches the post-merge rename // accident where someone renames `098_foo.up.sql` to `098_bar.up.sql` — // the SQL has already run on prod with the old name, so the rename is a // lie about history. Operator recovery: revert the rename, or update the // DB row if the rename is intentional. // // Backfilled rows have a name pulled from the on-disk filename, so an // out-of-the-box backfill never trips this check. func checkNameAgreement(onDisk []migration, applied map[int]string) error { for _, m := range onDisk { dbName, ok := applied[m.version] if !ok { continue } if dbName != m.name { return fmt.Errorf("migration %d: disk name %q != DB name %q "+ "(renamed after apply? revert the rename, or UPDATE paliad.applied_migrations "+ "SET name=%q WHERE version=%d if the rename is intentional)", m.version, m.name, dbName, m.name, m.version) } } return nil } // applyOne runs one migration's .up.sql plus its INSERT row in a single // transaction. All-or-nothing per migration: if the SQL fails, the row // isn't inserted and the next deploy re-tries from the same point. If // the INSERT fails (e.g. PK violation because the lock wasn't held), the // SQL rolls back too. func applyOne(conn *sql.DB, m migration) error { body, err := migrationFS.ReadFile("migrations/" + m.filename) if err != nil { return fmt.Errorf("read %s: %w", m.filename, err) } checksum := fmt.Sprintf("%x", sha256.Sum256(body)) tx, err := conn.Begin() if err != nil { return fmt.Errorf("begin tx: %w", err) } defer func() { _ = tx.Rollback() }() if _, err := tx.Exec(string(body)); err != nil { return fmt.Errorf("exec sql: %w", err) } if _, err := tx.Exec(` INSERT INTO paliad.applied_migrations(version, name, applied_at, checksum) VALUES ($1, $2, now(), $3) `, m.version, m.name, checksum); err != nil { return fmt.Errorf("record applied: %w", err) } return tx.Commit() }