m/paliad#77 Slice A. Folds the unbuilt t-paliad-214 Slice 3 (org async export) into a new "Backup Mode" surface gated by adminGate. m's calls (all 4 material picks per design §2): - Storage: local disk PALIAD_EXPORT_DIR (LocalDiskStore only) - Format: .zip bundle (xlsx + JSON + CSV + README) — no-lock-in preserved - paliadin_turns + paliadin_aichat_conversation: EXCLUDE structurally - Scheduler (Slice B): nightly 03:00 UTC, env-tunable Wiring: - mig 123 adds paliad.backups catalog table (kind/status/storage_uri/ size/row_counts/warnings/error/deleted_at + admin-only RLS). - ExportService.WriteOrg + orgSheetQueries enumerate 37 entity sheets + 12 ref sheets; REPEATABLE READ READ ONLY tx wraps the dump for snapshot consistency (design §3.3). - writeBundle + runSheetQuery refactored to take a sqlx.QueryerContext so both *sqlx.DB (personal/project paths, unchanged) and *sqlx.Tx (org snapshot path) work. - BackupRunner orchestrates: catalog INSERT → audit INSERT (event_type='backup_created') → WriteOrg → ArtifactStore.Put → patch catalog + audit on success/failure. - ArtifactStore interface + LocalDiskStore impl (defense-in-depth key validation + URI-outside-dir guard). - Sentinel actor for scheduled runs: actor_email='system@paliad', actor_id=NULL — no phantom user in paliad.users. - Admin handlers POST /api/admin/backups/run + GET list/get/download behind adminGate(users, …); /admin/backups page + sidebar entry + bilingual i18n keys. - BackupRunner only wired when PALIAD_EXPORT_DIR is set; routes return 503 otherwise (same shape as requireDB). Tests: 8 pure-function tests cover registry shape (no dups, paliadin absent both as sheet name and SQL substring, ref__* sheets unscoped, every sheet has ORDER BY) and LocalDiskStore (round-trip, bad-key rejection, URI-traversal rejection, mkdir on construction). go build ./... + go test ./internal/... clean. bun run build clean. 
Slice B (BackupScheduler + retention cleanup) and Slice C (UI polish) are separate follow-ups per head's instruction.
556 lines
20 KiB
Go
556 lines
20 KiB
Go
package services
|
|
|
|
// Backup Mode runtime (t-paliad-246 / m/paliad#77 Slice A).
|
|
//
|
|
// One file because all four pieces are tightly coupled:
|
|
//
|
|
// - ArtifactStore interface + LocalDiskStore implementation
|
|
// (storage abstraction; m picked local disk for v1, the interface
|
|
// stays so a future swap to Supabase Storage is one impl away).
|
|
//
|
|
// - BackupRunner — the orchestration the on-demand handler and the
|
|
// (Slice B) scheduler share. Wraps the export pipeline:
|
|
// 1. INSERT paliad.backups (status='running')
|
|
// 2. INSERT paliad.system_audit_log (event_type='backup_created')
|
|
// 3. ExportService.WriteOrg → in-memory buffer
|
|
// 4. ArtifactStore.Put → file
|
|
// 5. UPDATE paliad.backups (status='done', storage_uri, …)
|
|
// 6. PATCH paliad.system_audit_log metadata
|
|
//
|
|
// Design: docs/design-backup-mode-2026-05-25.md.
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/url"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/jmoiron/sqlx"
|
|
)
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// ArtifactStore interface + LocalDiskStore impl
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// ArtifactStore persists the bytes of a backup artifact. The interface
// is deliberately small so Slice B can drop in a SupabaseStorageStore
// (or any object-store implementation) without changing the runner.
//
// URIs returned by Put are opaque to callers — they round-trip through
// Get/Delete. v1's LocalDiskStore uses `file://<absolute-path>`.
type ArtifactStore interface {
	// Put writes the given body to the store under the given key and
	// returns the URI for later retrieval. Implementations must overwrite
	// an existing object at the same key (catalog rows make keys unique
	// in practice, but the contract is overwrite-on-conflict to keep
	// retries idempotent).
	Put(ctx context.Context, key string, body []byte) (uri string, err error)
	// Get streams the artifact bytes at the given URI. size is the
	// artifact's length in bytes. The caller owns rc and should Close it
	// when done reading.
	Get(ctx context.Context, uri string) (rc io.ReadCloser, size int64, err error)
	// Delete removes the artifact at the given URI. Returns nil if the
	// artifact is already absent (idempotent).
	Delete(ctx context.Context, uri string) error
}
|
|
|
|
// LocalDiskStore is the v1 ArtifactStore — writes artifacts to a local
// directory specified at construction time. Mode 0700 on the directory
// + 0600 on artifact files keeps the files private to the paliad
// process owner on the Dokploy host.
type LocalDiskStore struct {
	// dir is the absolute path of the store root (NewLocalDiskStore
	// resolves it with filepath.Abs). Every artifact lives directly
	// under it, and URIs are validated against it.
	dir string
}
|
|
|
|
// NewLocalDiskStore creates a LocalDiskStore rooted at dir. Creates the
|
|
// directory (0700) if it doesn't exist. Returns an error if dir is
|
|
// empty or the mkdir fails.
|
|
func NewLocalDiskStore(dir string) (*LocalDiskStore, error) {
|
|
if strings.TrimSpace(dir) == "" {
|
|
return nil, errors.New("LocalDiskStore: empty directory")
|
|
}
|
|
if err := os.MkdirAll(dir, 0o700); err != nil {
|
|
return nil, fmt.Errorf("LocalDiskStore mkdir %q: %w", dir, err)
|
|
}
|
|
abs, err := filepath.Abs(dir)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("LocalDiskStore abs %q: %w", dir, err)
|
|
}
|
|
return &LocalDiskStore{dir: abs}, nil
|
|
}
|
|
|
|
// Put writes body to <dir>/<key>. Returns a file:// URI.
|
|
func (s *LocalDiskStore) Put(_ context.Context, key string, body []byte) (string, error) {
|
|
if err := validateKey(key); err != nil {
|
|
return "", err
|
|
}
|
|
full := filepath.Join(s.dir, key)
|
|
if err := os.WriteFile(full, body, 0o600); err != nil {
|
|
return "", fmt.Errorf("LocalDiskStore write %q: %w", full, err)
|
|
}
|
|
return "file://" + full, nil
|
|
}
|
|
|
|
// Get opens the file referenced by uri. Returns a *os.File (io.ReadCloser)
|
|
// + the file's size in bytes.
|
|
func (s *LocalDiskStore) Get(_ context.Context, uri string) (io.ReadCloser, int64, error) {
|
|
path, err := s.pathFromURI(uri)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
info, err := os.Stat(path)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("LocalDiskStore stat %q: %w", path, err)
|
|
}
|
|
f, err := os.Open(path)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("LocalDiskStore open %q: %w", path, err)
|
|
}
|
|
return f, info.Size(), nil
|
|
}
|
|
|
|
// Delete removes the file referenced by uri. Idempotent — missing file
|
|
// is treated as success.
|
|
func (s *LocalDiskStore) Delete(_ context.Context, uri string) error {
|
|
path, err := s.pathFromURI(uri)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) {
|
|
return fmt.Errorf("LocalDiskStore remove %q: %w", path, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// pathFromURI parses a file:// URI and validates that the resolved
|
|
// path is inside this store's directory. Defense-in-depth against a
|
|
// malformed catalog row pointing at an arbitrary file.
|
|
func (s *LocalDiskStore) pathFromURI(uri string) (string, error) {
|
|
u, err := url.Parse(uri)
|
|
if err != nil {
|
|
return "", fmt.Errorf("LocalDiskStore parse uri %q: %w", uri, err)
|
|
}
|
|
if u.Scheme != "file" {
|
|
return "", fmt.Errorf("LocalDiskStore: unsupported uri scheme %q (want file://)", u.Scheme)
|
|
}
|
|
// url.Parse drops the leading "/" for file:// URIs into u.Path.
|
|
path := u.Path
|
|
if u.Host != "" {
|
|
// "file://host/path" — we don't issue these. Reject.
|
|
return "", fmt.Errorf("LocalDiskStore: file:// uri with host is unsupported (%q)", uri)
|
|
}
|
|
clean := filepath.Clean(path)
|
|
rel, err := filepath.Rel(s.dir, clean)
|
|
if err != nil || strings.HasPrefix(rel, "..") {
|
|
return "", fmt.Errorf("LocalDiskStore: uri %q resolves outside store dir %q", uri, s.dir)
|
|
}
|
|
return clean, nil
|
|
}
|
|
|
|
// validateKey rejects keys that would escape the store dir or fail to
// name a file inside it: the empty key, keys containing a path
// separator ('/' or '\'), keys containing "..", the current-directory
// key ".", and absolute paths. Backup runner uses "<uuid>.zip" so this
// is a defensive guard.
//
// Returns nil when the key is acceptable, otherwise an error naming
// the offending key.
func validateKey(key string) error {
	if key == "" {
		return errors.New("ArtifactStore: empty key")
	}
	if strings.ContainsAny(key, "/\\") {
		return fmt.Errorf("ArtifactStore: key %q contains path separator", key)
	}
	if strings.Contains(key, "..") {
		return fmt.Errorf("ArtifactStore: key %q contains traversal", key)
	}
	if key == "." {
		// filepath.Join(dir, ".") is dir itself, not a file in it.
		return fmt.Errorf("ArtifactStore: key %q refers to the store directory", key)
	}
	if filepath.IsAbs(key) {
		return fmt.Errorf("ArtifactStore: key %q is absolute", key)
	}
	return nil
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// BackupRunner
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// BackupKind discriminates a scheduled run from an on-demand one.
// These are the only two values BackupRunner.Run accepts for its kind
// argument; the value is stored in the paliad.backups kind column.
const (
	// BackupKindOnDemand marks a backup requested on demand.
	BackupKindOnDemand = "on_demand"
	// BackupKindScheduled marks a backup started by the scheduler.
	BackupKindScheduled = "scheduled"
)
|
|
|
|
// BackupStatus values mirror the paliad.backups status check constraint.
const (
	// BackupStatusRunning is the status a catalog row is inserted with.
	BackupStatusRunning = "running"
	// BackupStatusDone is set once the artifact is stored and the
	// catalog row has been patched with its URI and counts.
	BackupStatusDone = "done"
	// BackupStatusFailed is set by the failure-recovery path.
	BackupStatusFailed = "failed"
)
|
|
|
|
// SystemActorEmail is the sentinel actor_email written for scheduled
// backups (kind='scheduled'). Matches design §3.4 — we don't seed a
// phantom user, we just stamp the audit row with a stable sentinel.
// Pair it with a nil BackupActor.ID so actor_id is stored as NULL.
const SystemActorEmail = "system@paliad"
|
|
|
|
// BackupActor identifies who requested a backup. For kind='scheduled'
// pass (nil, SystemActorEmail, "Paliad Backup System"). For on-demand
// pass the calling admin's id/email/display_name.
type BackupActor struct {
	// ID is the requesting user's id; nil is written as SQL NULL in the
	// catalog and audit rows.
	ID *uuid.UUID
	// Email is required: Run and RecordDownload reject an empty value.
	Email string
	// Label is a display name, copied into the export spec's ActorLabel.
	Label string
}
|
|
|
|
// BackupResult is what Run returns to the caller. Empty on failure
// (the error gets the failure detail; the catalog/audit rows are
// already updated).
type BackupResult struct {
	// ID is the paliad.backups catalog row id.
	ID uuid.UUID
	// AuditID is the paliad.system_audit_log row id for this run.
	AuditID uuid.UUID
	// StorageURI is the URI returned by ArtifactStore.Put.
	StorageURI string
	// SizeBytes is the length of the generated bundle in bytes.
	SizeBytes int64
	// RowCounts is the row-count map reported by the export.
	RowCounts map[string]int
	// SheetCount is the number of entries in RowCounts.
	SheetCount int
}
|
|
|
|
// BackupRunner orchestrates one backup run. Stateless except for the
// wired dependencies; safe to share across goroutines (the handler
// holds one instance; the Slice B scheduler will hold the same one).
type BackupRunner struct {
	// db runs the catalog (paliad.backups) and audit
	// (paliad.system_audit_log) statements.
	db *sqlx.DB
	// export generates the bundle (Run calls its WriteOrg).
	export *ExportService
	// store receives the finished artifact and serves downloads.
	store ArtifactStore
}
|
|
|
|
// NewBackupRunner wires the runner. All three deps are required; the
|
|
// caller (cmd/server/main.go) is responsible for instantiating the
|
|
// ArtifactStore from env config.
|
|
func NewBackupRunner(db *sqlx.DB, export *ExportService, store ArtifactStore) *BackupRunner {
|
|
return &BackupRunner{db: db, export: export, store: store}
|
|
}
|
|
|
|
// Store returns the configured store. Exposed for the download handler
|
|
// to stream artifacts via Get.
|
|
func (r *BackupRunner) Store() ArtifactStore { return r.store }
|
|
|
|
// Run performs one backup. Writes catalog + audit rows, generates the
|
|
// bundle via ExportService.WriteOrg, uploads to the configured store,
|
|
// patches catalog + audit on success/failure.
|
|
//
|
|
// On any error after the catalog/audit rows are written, the rows are
|
|
// patched to status='failed' / event_type='backup_failed' before
|
|
// returning. The returned error is always the export/upload failure —
|
|
// catalog-update failures during the failure-recovery path are best-
|
|
// effort logged but not surfaced (the real error is the one to bubble).
|
|
func (r *BackupRunner) Run(ctx context.Context, kind string, actor BackupActor) (BackupResult, error) {
|
|
if kind != BackupKindOnDemand && kind != BackupKindScheduled {
|
|
return BackupResult{}, fmt.Errorf("BackupRunner.Run: invalid kind %q", kind)
|
|
}
|
|
if actor.Email == "" {
|
|
return BackupResult{}, errors.New("BackupRunner.Run: empty actor email")
|
|
}
|
|
|
|
now := time.Now().UTC()
|
|
spec := ExportSpec{
|
|
Scope: ExportScopeOrg,
|
|
ActorID: uuid.Nil, // overwritten below when actor.ID != nil
|
|
ActorEmail: actor.Email,
|
|
ActorLabel: actor.Label,
|
|
GeneratedAt: now,
|
|
}
|
|
if actor.ID != nil {
|
|
spec.ActorID = *actor.ID
|
|
}
|
|
|
|
// Step 1+2: catalog row (status='running') + audit row
|
|
// (event_type='backup_created'). Both happen before the export
|
|
// generation so failure paths can always find them.
|
|
catalogID, err := r.insertCatalogRow(ctx, kind, actor, uuid.Nil, now)
|
|
if err != nil {
|
|
return BackupResult{}, fmt.Errorf("backup catalog insert: %w", err)
|
|
}
|
|
auditID, err := r.insertAuditRow(ctx, kind, actor, catalogID, now)
|
|
if err != nil {
|
|
// Best-effort patch on the catalog row so it doesn't sit
|
|
// "running" forever.
|
|
r.patchCatalogRowFailed(context.Background(), catalogID, fmt.Errorf("audit insert: %w", err))
|
|
return BackupResult{}, fmt.Errorf("backup audit insert: %w", err)
|
|
}
|
|
// Back-link the audit id into the catalog row so the UI can JOIN.
|
|
if err := r.linkAuditID(ctx, catalogID, auditID); err != nil {
|
|
// Non-fatal — the link is for UI convenience, not correctness.
|
|
// The error is logged via the patch path; we keep going.
|
|
}
|
|
|
|
// Step 3: generate the bundle into an in-memory buffer. We materialise
|
|
// fully before uploading so a partial upload doesn't strand bytes in
|
|
// the store under a "done" catalog row.
|
|
var buf bytes.Buffer
|
|
meta, err := r.export.WriteOrg(ctx, &buf, spec)
|
|
if err != nil {
|
|
r.failRun(context.Background(), catalogID, auditID, fmt.Errorf("generate: %w", err))
|
|
return BackupResult{}, fmt.Errorf("backup generate: %w", err)
|
|
}
|
|
|
|
// Step 4: upload to storage. Key = "<catalog_id>.zip".
|
|
key := catalogID.String() + ".zip"
|
|
uri, err := r.store.Put(ctx, key, buf.Bytes())
|
|
if err != nil {
|
|
r.failRun(context.Background(), catalogID, auditID, fmt.Errorf("upload: %w", err))
|
|
return BackupResult{}, fmt.Errorf("backup upload: %w", err)
|
|
}
|
|
|
|
// Step 5+6: patch catalog + audit on success.
|
|
size := int64(buf.Len())
|
|
sheetCount := len(meta.RowCounts)
|
|
if err := r.patchCatalogRowDone(ctx, catalogID, uri, size, sheetCount, meta); err != nil {
|
|
// At this point the artifact is on disk, the audit row was
|
|
// inserted, and the only thing that failed is the catalog
|
|
// flip. Surface as an error so the handler can log; the
|
|
// artifact is recoverable manually via the audit metadata.
|
|
return BackupResult{}, fmt.Errorf("backup catalog patch: %w", err)
|
|
}
|
|
if err := r.patchAuditRowDone(ctx, auditID, uri, size, sheetCount, meta); err != nil {
|
|
// Non-fatal — the catalog row is already authoritative; the
|
|
// audit row is the audit-trail twin. Log via the caller.
|
|
}
|
|
|
|
return BackupResult{
|
|
ID: catalogID,
|
|
AuditID: auditID,
|
|
StorageURI: uri,
|
|
SizeBytes: size,
|
|
RowCounts: meta.RowCounts,
|
|
SheetCount: sheetCount,
|
|
}, nil
|
|
}
|
|
|
|
// RecordDownload writes a paliad.system_audit_log row of
|
|
// event_type='backup_downloaded' when an admin downloads a backup
|
|
// via /api/admin/backups/{id}/file. Separate row per click — the
|
|
// existing 'backup_created' row stays untouched.
|
|
func (r *BackupRunner) RecordDownload(ctx context.Context, backupID uuid.UUID, by BackupActor) error {
|
|
if by.Email == "" {
|
|
return errors.New("BackupRunner.RecordDownload: empty actor email")
|
|
}
|
|
meta, _ := json.Marshal(map[string]any{
|
|
"backup_id": backupID.String(),
|
|
"downloaded_by_email": by.Email,
|
|
"downloaded_at": time.Now().UTC().Format(time.RFC3339),
|
|
})
|
|
var actorID any
|
|
if by.ID != nil {
|
|
actorID = *by.ID
|
|
}
|
|
_, err := r.db.ExecContext(ctx,
|
|
`INSERT INTO paliad.system_audit_log
|
|
(event_type, actor_id, actor_email, scope, scope_root, metadata)
|
|
VALUES ('backup_downloaded', $1, $2, 'org', NULL, $3::jsonb)`,
|
|
actorID, by.Email, string(meta),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("backup_downloaded audit insert: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Catalog read helpers (List + Get for the admin UI)
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// BackupSummary is the row shape returned by ListBackups + GetBackup —
// shaped for the /admin/backups UI. Nullable columns are pointers.
//
// NOTE(review): RowCounts and Warnings are plain []byte, which
// encoding/json encodes as a base64 string rather than embedding the
// JSON document. Confirm whether the handler converts them (e.g. to
// json.RawMessage) before responding, or whether the UI decodes them.
type BackupSummary struct {
	ID               uuid.UUID  `db:"id" json:"id"`
	Kind             string     `db:"kind" json:"kind"`
	Status           string     `db:"status" json:"status"`
	RequestedBy      *uuid.UUID `db:"requested_by" json:"requested_by,omitempty"`
	RequestedByEmail string     `db:"requested_by_email" json:"requested_by_email"`
	AuditID          *uuid.UUID `db:"audit_id" json:"audit_id,omitempty"`
	StorageURI       *string    `db:"storage_uri" json:"storage_uri,omitempty"`
	SizeBytes        *int64     `db:"size_bytes" json:"size_bytes,omitempty"`
	// RowCounts holds the raw bytes of the row_counts column.
	RowCounts  []byte `db:"row_counts" json:"row_counts,omitempty"`
	SheetCount *int   `db:"sheet_count" json:"sheet_count,omitempty"`
	// Warnings holds the raw bytes of the warnings column.
	Warnings   []byte     `db:"warnings" json:"warnings,omitempty"`
	Error      *string    `db:"error" json:"error,omitempty"`
	StartedAt  time.Time  `db:"started_at" json:"started_at"`
	FinishedAt *time.Time `db:"finished_at" json:"finished_at,omitempty"`
	DeletedAt  *time.Time `db:"deleted_at" json:"deleted_at,omitempty"`
}
|
|
|
|
// ListBackups returns the most recent backups (highest started_at first),
|
|
// capped at limit. limit <= 0 means default (100).
|
|
func (r *BackupRunner) ListBackups(ctx context.Context, limit int) ([]BackupSummary, error) {
|
|
if limit <= 0 {
|
|
limit = 100
|
|
}
|
|
var rows []BackupSummary
|
|
err := r.db.SelectContext(ctx, &rows,
|
|
`SELECT id, kind, status, requested_by, requested_by_email, audit_id,
|
|
storage_uri, size_bytes, row_counts, sheet_count, warnings,
|
|
error, started_at, finished_at, deleted_at
|
|
FROM paliad.backups
|
|
ORDER BY started_at DESC
|
|
LIMIT $1`,
|
|
limit,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list backups: %w", err)
|
|
}
|
|
return rows, nil
|
|
}
|
|
|
|
// GetBackup fetches one backup by id. Returns sql.ErrNoRows when not
|
|
// found (caller maps to 404).
|
|
func (r *BackupRunner) GetBackup(ctx context.Context, id uuid.UUID) (BackupSummary, error) {
|
|
var row BackupSummary
|
|
err := r.db.GetContext(ctx, &row,
|
|
`SELECT id, kind, status, requested_by, requested_by_email, audit_id,
|
|
storage_uri, size_bytes, row_counts, sheet_count, warnings,
|
|
error, started_at, finished_at, deleted_at
|
|
FROM paliad.backups
|
|
WHERE id = $1`,
|
|
id,
|
|
)
|
|
if err != nil {
|
|
return BackupSummary{}, err
|
|
}
|
|
return row, nil
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Catalog + audit SQL helpers (private — used by Run + RecordDownload).
|
|
// ---------------------------------------------------------------------------
|
|
|
|
func (r *BackupRunner) insertCatalogRow(ctx context.Context, kind string, actor BackupActor, auditID uuid.UUID, now time.Time) (uuid.UUID, error) {
|
|
var actorID any
|
|
if actor.ID != nil {
|
|
actorID = *actor.ID
|
|
}
|
|
var auditArg any
|
|
if auditID != uuid.Nil {
|
|
auditArg = auditID
|
|
}
|
|
var id uuid.UUID
|
|
err := r.db.QueryRowxContext(ctx,
|
|
`INSERT INTO paliad.backups
|
|
(kind, status, requested_by, requested_by_email, audit_id, started_at)
|
|
VALUES ($1, 'running', $2, $3, $4, $5)
|
|
RETURNING id`,
|
|
kind, actorID, actor.Email, auditArg, now,
|
|
).Scan(&id)
|
|
if err != nil {
|
|
return uuid.Nil, err
|
|
}
|
|
return id, nil
|
|
}
|
|
|
|
func (r *BackupRunner) insertAuditRow(ctx context.Context, kind string, actor BackupActor, catalogID uuid.UUID, now time.Time) (uuid.UUID, error) {
|
|
meta, _ := json.Marshal(map[string]any{
|
|
"kind": kind,
|
|
"catalog_id": catalogID.String(),
|
|
"requested_by_email": actor.Email,
|
|
"requested_at": now.Format(time.RFC3339),
|
|
})
|
|
var actorID any
|
|
if actor.ID != nil {
|
|
actorID = *actor.ID
|
|
}
|
|
var id uuid.UUID
|
|
err := r.db.QueryRowxContext(ctx,
|
|
`INSERT INTO paliad.system_audit_log
|
|
(event_type, actor_id, actor_email, scope, scope_root, metadata)
|
|
VALUES ('backup_created', $1, $2, 'org', NULL, $3::jsonb)
|
|
RETURNING id`,
|
|
actorID, actor.Email, string(meta),
|
|
).Scan(&id)
|
|
if err != nil {
|
|
return uuid.Nil, err
|
|
}
|
|
return id, nil
|
|
}
|
|
|
|
func (r *BackupRunner) linkAuditID(ctx context.Context, catalogID, auditID uuid.UUID) error {
|
|
_, err := r.db.ExecContext(ctx,
|
|
`UPDATE paliad.backups SET audit_id = $2 WHERE id = $1`,
|
|
catalogID, auditID,
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (r *BackupRunner) patchCatalogRowDone(ctx context.Context, id uuid.UUID, uri string, size int64, sheetCount int, meta ExportMeta) error {
|
|
rcJSON, _ := json.Marshal(meta.RowCounts)
|
|
warnJSON, _ := json.Marshal(meta.Warnings)
|
|
if meta.Warnings == nil {
|
|
warnJSON = []byte("[]")
|
|
}
|
|
_, err := r.db.ExecContext(ctx,
|
|
`UPDATE paliad.backups
|
|
SET status = 'done',
|
|
storage_uri = $2,
|
|
size_bytes = $3,
|
|
sheet_count = $4,
|
|
row_counts = $5::jsonb,
|
|
warnings = $6::jsonb,
|
|
finished_at = now()
|
|
WHERE id = $1`,
|
|
id, uri, size, sheetCount, string(rcJSON), string(warnJSON),
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (r *BackupRunner) patchCatalogRowFailed(ctx context.Context, id uuid.UUID, runErr error) {
|
|
_, _ = r.db.ExecContext(ctx,
|
|
`UPDATE paliad.backups
|
|
SET status = 'failed',
|
|
error = $2,
|
|
finished_at = now()
|
|
WHERE id = $1`,
|
|
id, runErr.Error(),
|
|
)
|
|
}
|
|
|
|
func (r *BackupRunner) patchAuditRowDone(ctx context.Context, id uuid.UUID, uri string, size int64, sheetCount int, meta ExportMeta) error {
|
|
payload, _ := json.Marshal(map[string]any{
|
|
"row_counts": meta.RowCounts,
|
|
"file_size_bytes": size,
|
|
"sheet_count": sheetCount,
|
|
"storage_uri": uri,
|
|
"warnings": meta.Warnings,
|
|
"completed_at": time.Now().UTC().Format(time.RFC3339),
|
|
})
|
|
_, err := r.db.ExecContext(ctx,
|
|
`UPDATE paliad.system_audit_log
|
|
SET metadata = metadata || $2::jsonb,
|
|
updated_at = now()
|
|
WHERE id = $1`,
|
|
id, string(payload),
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (r *BackupRunner) patchAuditRowFailed(ctx context.Context, id uuid.UUID, runErr error) {
|
|
payload, _ := json.Marshal(map[string]any{
|
|
"error": runErr.Error(),
|
|
"failed_at": time.Now().UTC().Format(time.RFC3339),
|
|
})
|
|
_, _ = r.db.ExecContext(ctx,
|
|
`UPDATE paliad.system_audit_log
|
|
SET event_type = 'backup_failed',
|
|
metadata = metadata || $2::jsonb,
|
|
updated_at = now()
|
|
WHERE id = $1`,
|
|
id, string(payload),
|
|
)
|
|
}
|
|
|
|
// failRun is the shared failure-recovery path: patch the catalog row,
// then the audit row, to their failed states with runErr's message.
// Both patches are best effort and report nothing back.
//
// failRun uses whatever ctx it is given. Run passes
// context.Background() so the patches still happen when the run's own
// context is already cancelled.
func (r *BackupRunner) failRun(ctx context.Context, catalogID, auditID uuid.UUID, runErr error) {
	r.patchCatalogRowFailed(ctx, catalogID, runErr)
	r.patchAuditRowFailed(ctx, auditID, runErr)
}
|