// Package cloud syncs a generated image to Supabase Storage and inserts
// a row into imagen.images. Both steps are best-effort: callers log the
// returned error and proceed, because the local PNG + sidecar are already
// on disk by the time Sync runs and a cloud blip should not lose the
// artefact.
//
// The single source of truth for the row schema is the imagen_schema_init
// migration — see internal docs in the issue body for #7.
package cloud

import (
	"bytes"
	"context"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"os"
	"path"
	"strings"
	"time"
)

// supabaseSchema is the PostgREST profile header value the imagen schema
// is exposed under (see ALTER ROLE authenticator SET pgrst.db_schemas).
// It is sent as both Accept-Profile and Content-Profile on row inserts.
const supabaseSchema = "imagen"

// bucketName is the Supabase Storage bucket all generated images land in.
const bucketName = "imagen-generated"

// Sink writes one PNG + one row per generation. It is safe to share
// across goroutines.
type Sink struct {
	// URL is SUPABASE_URL — e.g. https://supa.flexsiebels.de. Endpoint
	// paths are appended to it verbatim, so it should carry no trailing
	// slash (NewFromEnv trims any).
	URL string
	// APIKey is the service-role key (SUPABASE_SERVICE_KEY). Storage uploads
	// and DB inserts both bypass RLS with this key — the policies on the
	// table + bucket are the contract for the read side. It is sent as both
	// the apikey header and the Bearer token.
	APIKey string
	// OwnerUserID is m's auth.users.id. It populates owner_user_id on every
	// row. Empty means the sink refuses to insert (the column is NOT NULL
	// and the user-mode reader needs it for the RLS policy).
	OwnerUserID string
	// HTTP is the HTTP client used for every request; tests inject one
	// pointing at httptest. It must be non-nil: the methods call
	// HTTP.Do without a fallback.
	HTTP *http.Client
	// MaxRetries is the number of additional attempts after the first
	// failure for retryable outcomes (5xx responses and transport errors).
	// Zero, or any negative value, means single-shot.
	MaxRetries int
	// InitialBackoff is the wait before the first retry; doubles per attempt.
	// Zero falls back to one second. Set very small in tests.
	InitialBackoff time.Duration
}

// NewFromEnv returns a sink populated from SUPABASE_URL +
// SUPABASE_SERVICE_KEY (or MAI_SUPABASE_KEY) + IMAGEN_OWNER_USER_ID.
// Returns ok=false if the URL or key are missing — the caller treats that // as "cloud-sync disabled by environment". func NewFromEnv() (*Sink, bool) { u := strings.TrimRight(os.Getenv("SUPABASE_URL"), "/") if u == "" { return nil, false } key := os.Getenv("SUPABASE_SERVICE_KEY") if key == "" { key = os.Getenv("MAI_SUPABASE_KEY") } if key == "" { return nil, false } return &Sink{ URL: u, APIKey: key, OwnerUserID: os.Getenv("IMAGEN_OWNER_USER_ID"), HTTP: &http.Client{Timeout: 30 * time.Second}, MaxRetries: 2, InitialBackoff: time.Second, }, true } // SyncRequest is the cross-backend ingredient set Sync needs. Date is // formatted as YYYY-MM-DD; Slug + Seed are reused from the local // filename so storage_path mirrors disk layout. type SyncRequest struct { Date string Slug string Seed int64 Ext string // "png", "jpg", "webp" — no leading dot PNG []byte MimeType string Prompt string Backend string Model string Steps int Width int Height int LatencyMs int CostUSDEstimate *float64 Sidecar map[string]any } // SyncResult tells the caller what landed where. type SyncResult struct { StoragePath string // e.g. "2026-05-11/lighthouse-42.png" ImageID string // imagen.images.id (UUID) } // Sync uploads the bytes and inserts the metadata row. Returns the row's // id and storage_path on success; any non-nil error is what the caller // surfaces as "imagen: cloud sync: " and otherwise ignores. 
func (s *Sink) Sync(ctx context.Context, req SyncRequest) (*SyncResult, error) { if s == nil { return nil, fmt.Errorf("cloud sink not configured") } if s.OwnerUserID == "" { return nil, fmt.Errorf("owner_user_id not set (config or $IMAGEN_OWNER_USER_ID); refusing to insert NULL into imagen.images") } if req.Date == "" || req.Slug == "" { return nil, fmt.Errorf("date and slug are required for storage_path") } ext := req.Ext if ext == "" { ext = "png" } storagePath := fmt.Sprintf("%s/%s-%d.%s", req.Date, req.Slug, req.Seed, ext) if err := s.upload(ctx, storagePath, req.PNG, req.MimeType); err != nil { return nil, fmt.Errorf("storage upload: %w", err) } id, err := s.insertRow(ctx, storagePath, req) if err != nil { return &SyncResult{StoragePath: storagePath}, fmt.Errorf("db insert: %w", err) } return &SyncResult{StoragePath: storagePath, ImageID: id}, nil } // upload PUTs the PNG into the imagen-generated bucket. We use // Content-Type so signed URLs render in the browser without a download // prompt. POST would error on second-write; PUT (with x-upsert: true) is // idempotent for re-runs of the same date+slug+seed. func (s *Sink) upload(ctx context.Context, storagePath string, body []byte, mime string) error { if mime == "" { mime = "image/png" } endpoint := fmt.Sprintf("%s/storage/v1/object/%s/%s", s.URL, bucketName, pathEscape(storagePath)) return s.doRetry(ctx, func(ctx context.Context) (*http.Response, error) { req, err := http.NewRequestWithContext(ctx, http.MethodPut, endpoint, bytes.NewReader(body)) if err != nil { return nil, err } req.Header.Set("apikey", s.APIKey) req.Header.Set("Authorization", "Bearer "+s.APIKey) req.Header.Set("Content-Type", mime) req.Header.Set("x-upsert", "true") return s.HTTP.Do(req) }) } // insertRow POSTs to PostgREST against the imagen schema. Prefer: // return=representation gives us the inserted id back without a second // round-trip. 
func (s *Sink) insertRow(ctx context.Context, storagePath string, req SyncRequest) (string, error) { row := map[string]any{ "owner_user_id": s.OwnerUserID, "prompt": req.Prompt, "prompt_hash": hashPrompt(req.Prompt), "backend": req.Backend, "storage_path": storagePath, } if req.Model != "" { row["model"] = req.Model } if req.Seed != 0 { row["seed"] = req.Seed } if req.Steps != 0 { row["steps"] = req.Steps } if req.Width != 0 { row["width"] = req.Width } if req.Height != 0 { row["height"] = req.Height } if req.LatencyMs != 0 { row["latency_ms"] = req.LatencyMs } if req.CostUSDEstimate != nil { row["cost_usd_estimate"] = *req.CostUSDEstimate } if len(req.Sidecar) > 0 { row["sidecar"] = req.Sidecar } body, err := json.Marshal(row) if err != nil { return "", fmt.Errorf("marshal row: %w", err) } endpoint := s.URL + "/rest/v1/images" respBody, err := s.doRetryRead(ctx, func(ctx context.Context) (*http.Response, error) { req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) if err != nil { return nil, err } req.Header.Set("apikey", s.APIKey) req.Header.Set("Authorization", "Bearer "+s.APIKey) req.Header.Set("Content-Type", "application/json") req.Header.Set("Accept-Profile", supabaseSchema) req.Header.Set("Content-Profile", supabaseSchema) req.Header.Set("Prefer", "return=representation") return s.HTTP.Do(req) }) if err != nil { return "", err } var rows []struct { ID string `json:"id"` } if err := json.Unmarshal(respBody, &rows); err != nil { return "", fmt.Errorf("parse insert response: %w (body: %s)", err, snip(respBody)) } if len(rows) == 0 { return "", fmt.Errorf("insert returned 0 rows (body: %s)", snip(respBody)) } return rows[0].ID, nil } // SignedURL asks the Storage API for a time-limited URL. ttlSeconds is // the validity window. Returned URL is host-qualified and ready to hand // to a browser. 
func (s *Sink) SignedURL(ctx context.Context, storagePath string, ttlSeconds int) (string, error) { if s == nil { return "", fmt.Errorf("cloud sink not configured") } if ttlSeconds <= 0 { ttlSeconds = 3600 } endpoint := fmt.Sprintf("%s/storage/v1/object/sign/%s/%s", s.URL, bucketName, pathEscape(storagePath)) body, err := json.Marshal(map[string]any{"expiresIn": ttlSeconds}) if err != nil { return "", err } req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) if err != nil { return "", err } req.Header.Set("apikey", s.APIKey) req.Header.Set("Authorization", "Bearer "+s.APIKey) req.Header.Set("Content-Type", "application/json") resp, err := s.HTTP.Do(req) if err != nil { return "", err } defer resp.Body.Close() respBody, _ := io.ReadAll(resp.Body) if resp.StatusCode < 200 || resp.StatusCode >= 300 { return "", fmt.Errorf("sign %d: %s", resp.StatusCode, snip(respBody)) } var parsed struct { SignedURL string `json:"signedURL"` } if err := json.Unmarshal(respBody, &parsed); err != nil { return "", fmt.Errorf("parse sign response: %w (body: %s)", err, snip(respBody)) } if parsed.SignedURL == "" { return "", fmt.Errorf("empty signedURL in response: %s", snip(respBody)) } full := parsed.SignedURL if strings.HasPrefix(full, "/") { full = s.URL + full } return full, nil } // doRetry runs op up to MaxRetries+1 times. 5xx and transport errors are // retried with exponential backoff; 4xx surfaces immediately as a // permanent error (caller's bug in the row, not a network blip). func (s *Sink) doRetry(ctx context.Context, op func(context.Context) (*http.Response, error)) error { _, err := s.doRetryRead(ctx, op) return err } // doRetryRead is the read-the-body variant. Returns the 2xx response // body bytes; non-2xx is wrapped in an error. Same retry semantics as // doRetry: 5xx/transport retries with exponential backoff, 4xx is fatal. 
func (s *Sink) doRetryRead(ctx context.Context, op func(context.Context) (*http.Response, error)) ([]byte, error) { backoff := s.InitialBackoff if backoff == 0 { backoff = time.Second } attempts := s.MaxRetries + 1 if attempts < 1 { attempts = 1 } var lastErr error for i := 0; i < attempts; i++ { if i > 0 { select { case <-ctx.Done(): return nil, ctx.Err() case <-time.After(backoff): } backoff *= 2 } resp, err := op(ctx) if err != nil { lastErr = err continue } body, readErr := io.ReadAll(resp.Body) resp.Body.Close() if readErr != nil { lastErr = fmt.Errorf("read body: %w", readErr) continue } if resp.StatusCode >= 200 && resp.StatusCode < 300 { return body, nil } if resp.StatusCode >= 400 && resp.StatusCode < 500 { return nil, fmt.Errorf("%d: %s", resp.StatusCode, snip(body)) } lastErr = fmt.Errorf("%d: %s", resp.StatusCode, snip(body)) } return nil, lastErr } func hashPrompt(p string) string { sum := sha256.Sum256([]byte(p)) return hex.EncodeToString(sum[:]) } // pathEscape encodes each path segment but keeps the slashes — the // Storage API treats the part after the bucket name as a virtual file // path with directory separators. func pathEscape(p string) string { parts := strings.Split(p, "/") for i, seg := range parts { parts[i] = url.PathEscape(seg) } return path.Join(parts...) } func snip(b []byte) string { const max = 500 s := strings.TrimSpace(string(b)) if len(s) > max { s = s[:max] + "..." } return s }