Schema (applied via migration imagen_series_init):
- imagen.series parent table (prompt + params + count CHECK 1..10 + selected_image_id)
- imagen.jobs += series_id (FK) + series_idx
- imagen.images += series_id (FK)
- Owner-scoped RLS on series (SELECT/INSERT/UPDATE) + grants
- Partial indexes WHERE series_id IS NOT NULL on both child tables

Worker pipeline:
- worker.Job += SeriesID, populated from imagen.jobs.series_id via the claim query.
- cloud.SyncRequest += SeriesID; insertRow writes series_id when it is non-empty and omits the key when it is empty, so solo runs leave the column NULL.
- maybeCloudSync threads seriesID from job.SeriesID through to the cloud sink. generate.go (CLI) always passes "", so the solo path is unchanged.

Tests:
- worker: SeriesID propagates unchanged from Job to fakePipeline.lastJob; a solo job keeps it empty.
- cloud: SyncRequest.SeriesID lands as row.series_id in the POST body; an empty SeriesID omits the key entirely.

Refs ImaGen#9.
366 lines
11 KiB
Go
366 lines
11 KiB
Go
// Package cloud syncs a generated image to Supabase Storage and inserts
// a row into imagen.images. Both steps are best-effort: callers log the
// returned error and proceed, because the local PNG + sidecar are already
// on disk by the time Sync runs and a cloud blip should not lose the
// artefact.
//
// The single source of truth for the row schema is the imagen_schema_init
// migration — see the internal docs in the body of issue #7.
|
|
package cloud
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"path"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
const (
	// supabaseSchema is the Postgres schema PostgREST serves the imagen
	// tables from. It is sent as the Accept-Profile / Content-Profile
	// header value, so it must be one of the exposed schemas (see
	// ALTER ROLE authenticator SET pgrst.db_schemas).
	supabaseSchema = "imagen"

	// bucketName is the Supabase Storage bucket that every generated image
	// is uploaded to and signed from.
	bucketName = "imagen-generated"
)
|
|
|
|
// Sink pushes each generation to Supabase: one object in Storage plus one
// row in imagen.images. Its methods only read these fields, so a single
// *Sink may be shared across goroutines once it is configured.
type Sink struct {
	// URL is the Supabase base URL (SUPABASE_URL), e.g.
	// https://supa.flexsiebels.de. Endpoints are built by appending
	// "/storage/..." or "/rest/..." to it, so it should carry no trailing
	// slash.
	URL string

	// APIKey is the service-role key (SUPABASE_SERVICE_KEY). It is sent as
	// both the apikey header and the bearer token, so uploads and inserts
	// bypass RLS; the table and bucket policies only constrain readers.
	APIKey string

	// OwnerUserID is m's auth.users.id and is written to owner_user_id on
	// every row. While it is empty the sink refuses to insert: the column
	// is NOT NULL, and the user-mode RLS policy depends on it.
	OwnerUserID string

	// HTTP is the client used for every request; tests swap in one that
	// talks to an httptest server.
	HTTP *http.Client

	// MaxRetries is how many extra attempts a retryable failure (transport
	// error or 5xx response) gets after the first try. Zero means a single
	// attempt.
	MaxRetries int

	// InitialBackoff is the pause before the first retry, doubled for each
	// retry after that. Tests set it very small.
	InitialBackoff time.Duration
}
|
|
|
|
// NewFromEnv returns a sink populated from SUPABASE_URL +
|
|
// SUPABASE_SERVICE_KEY (or MAI_SUPABASE_KEY) + IMAGEN_OWNER_USER_ID.
|
|
// Returns ok=false if the URL or key are missing — the caller treats that
|
|
// as "cloud-sync disabled by environment".
|
|
func NewFromEnv() (*Sink, bool) {
|
|
u := strings.TrimRight(os.Getenv("SUPABASE_URL"), "/")
|
|
if u == "" {
|
|
return nil, false
|
|
}
|
|
key := os.Getenv("SUPABASE_SERVICE_KEY")
|
|
if key == "" {
|
|
key = os.Getenv("MAI_SUPABASE_KEY")
|
|
}
|
|
if key == "" {
|
|
return nil, false
|
|
}
|
|
return &Sink{
|
|
URL: u,
|
|
APIKey: key,
|
|
OwnerUserID: os.Getenv("IMAGEN_OWNER_USER_ID"),
|
|
HTTP: &http.Client{Timeout: 30 * time.Second},
|
|
MaxRetries: 2,
|
|
InitialBackoff: time.Second,
|
|
}, true
|
|
}
|
|
|
|
// SyncRequest bundles everything Sync needs, whichever backend produced
// the image.
//
// Date (formatted YYYY-MM-DD), Slug, Seed and Ext are the same pieces the
// local filename is built from, so the storage_path mirrors the on-disk
// layout. The remaining fields feed imagen.images columns: Prompt and
// Backend are always written, and every other column is left out of the
// row when its field is zero, nil or empty.
type SyncRequest struct {
	// Storage-path ingredients and the payload itself. PNG holds the
	// encoded image bytes, whichever format Ext names; MimeType is the
	// upload Content-Type, with a default picked when it is empty.
	Date     string
	Slug     string
	Seed     int64
	Ext      string // "png", "jpg", "webp" — no leading dot
	PNG      []byte
	MimeType string

	// Row metadata.
	Prompt          string
	Backend         string
	Model           string
	Steps           int
	Width           int
	Height          int
	LatencyMs       int
	CostUSDEstimate *float64
	Sidecar         map[string]any

	// SeriesID names the parent imagen.series row when this image is one
	// of N tries in a batch. Leave it empty for a solo run: series_id then
	// stays NULL, which keeps the row on the main list-page query
	// (`WHERE series_id IS NULL`).
	SeriesID string
}
|
|
|
|
// SyncResult reports where a synced image ended up.
type SyncResult struct {
	// StoragePath is the object key inside the bucket,
	// e.g. "2026-05-11/lighthouse-42.png".
	StoragePath string

	// ImageID is the UUID of the inserted imagen.images row. Sync leaves it
	// empty when the upload succeeded but the row insert failed.
	ImageID string
}
|
|
|
|
// Sync uploads the bytes and inserts the metadata row. Returns the row's
|
|
// id and storage_path on success; any non-nil error is what the caller
|
|
// surfaces as "imagen: cloud sync: <err>" and otherwise ignores.
|
|
func (s *Sink) Sync(ctx context.Context, req SyncRequest) (*SyncResult, error) {
|
|
if s == nil {
|
|
return nil, fmt.Errorf("cloud sink not configured")
|
|
}
|
|
if s.OwnerUserID == "" {
|
|
return nil, fmt.Errorf("owner_user_id not set (config or $IMAGEN_OWNER_USER_ID); refusing to insert NULL into imagen.images")
|
|
}
|
|
if req.Date == "" || req.Slug == "" {
|
|
return nil, fmt.Errorf("date and slug are required for storage_path")
|
|
}
|
|
ext := req.Ext
|
|
if ext == "" {
|
|
ext = "png"
|
|
}
|
|
storagePath := fmt.Sprintf("%s/%s-%d.%s", req.Date, req.Slug, req.Seed, ext)
|
|
|
|
if err := s.upload(ctx, storagePath, req.PNG, req.MimeType); err != nil {
|
|
return nil, fmt.Errorf("storage upload: %w", err)
|
|
}
|
|
|
|
id, err := s.insertRow(ctx, storagePath, req)
|
|
if err != nil {
|
|
return &SyncResult{StoragePath: storagePath}, fmt.Errorf("db insert: %w", err)
|
|
}
|
|
return &SyncResult{StoragePath: storagePath, ImageID: id}, nil
|
|
}
|
|
|
|
// upload PUTs the PNG into the imagen-generated bucket. We use
|
|
// Content-Type so signed URLs render in the browser without a download
|
|
// prompt. POST would error on second-write; PUT (with x-upsert: true) is
|
|
// idempotent for re-runs of the same date+slug+seed.
|
|
func (s *Sink) upload(ctx context.Context, storagePath string, body []byte, mime string) error {
|
|
if mime == "" {
|
|
mime = "image/png"
|
|
}
|
|
endpoint := fmt.Sprintf("%s/storage/v1/object/%s/%s", s.URL, bucketName, pathEscape(storagePath))
|
|
return s.doRetry(ctx, func(ctx context.Context) (*http.Response, error) {
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPut, endpoint, bytes.NewReader(body))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req.Header.Set("apikey", s.APIKey)
|
|
req.Header.Set("Authorization", "Bearer "+s.APIKey)
|
|
req.Header.Set("Content-Type", mime)
|
|
req.Header.Set("x-upsert", "true")
|
|
return s.HTTP.Do(req)
|
|
})
|
|
}
|
|
|
|
// insertRow POSTs to PostgREST against the imagen schema. Prefer:
|
|
// return=representation gives us the inserted id back without a second
|
|
// round-trip.
|
|
func (s *Sink) insertRow(ctx context.Context, storagePath string, req SyncRequest) (string, error) {
|
|
row := map[string]any{
|
|
"owner_user_id": s.OwnerUserID,
|
|
"prompt": req.Prompt,
|
|
"prompt_hash": hashPrompt(req.Prompt),
|
|
"backend": req.Backend,
|
|
"storage_path": storagePath,
|
|
}
|
|
if req.Model != "" {
|
|
row["model"] = req.Model
|
|
}
|
|
if req.Seed != 0 {
|
|
row["seed"] = req.Seed
|
|
}
|
|
if req.Steps != 0 {
|
|
row["steps"] = req.Steps
|
|
}
|
|
if req.Width != 0 {
|
|
row["width"] = req.Width
|
|
}
|
|
if req.Height != 0 {
|
|
row["height"] = req.Height
|
|
}
|
|
if req.LatencyMs != 0 {
|
|
row["latency_ms"] = req.LatencyMs
|
|
}
|
|
if req.CostUSDEstimate != nil {
|
|
row["cost_usd_estimate"] = *req.CostUSDEstimate
|
|
}
|
|
if len(req.Sidecar) > 0 {
|
|
row["sidecar"] = req.Sidecar
|
|
}
|
|
if req.SeriesID != "" {
|
|
row["series_id"] = req.SeriesID
|
|
}
|
|
|
|
body, err := json.Marshal(row)
|
|
if err != nil {
|
|
return "", fmt.Errorf("marshal row: %w", err)
|
|
}
|
|
|
|
endpoint := s.URL + "/rest/v1/images"
|
|
|
|
respBody, err := s.doRetryRead(ctx, func(ctx context.Context) (*http.Response, error) {
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req.Header.Set("apikey", s.APIKey)
|
|
req.Header.Set("Authorization", "Bearer "+s.APIKey)
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req.Header.Set("Accept-Profile", supabaseSchema)
|
|
req.Header.Set("Content-Profile", supabaseSchema)
|
|
req.Header.Set("Prefer", "return=representation")
|
|
return s.HTTP.Do(req)
|
|
})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
var rows []struct {
|
|
ID string `json:"id"`
|
|
}
|
|
if err := json.Unmarshal(respBody, &rows); err != nil {
|
|
return "", fmt.Errorf("parse insert response: %w (body: %s)", err, snip(respBody))
|
|
}
|
|
if len(rows) == 0 {
|
|
return "", fmt.Errorf("insert returned 0 rows (body: %s)", snip(respBody))
|
|
}
|
|
return rows[0].ID, nil
|
|
}
|
|
|
|
// SignedURL asks the Storage API for a time-limited URL. ttlSeconds is
|
|
// the validity window. Returned URL is host-qualified and ready to hand
|
|
// to a browser.
|
|
func (s *Sink) SignedURL(ctx context.Context, storagePath string, ttlSeconds int) (string, error) {
|
|
if s == nil {
|
|
return "", fmt.Errorf("cloud sink not configured")
|
|
}
|
|
if ttlSeconds <= 0 {
|
|
ttlSeconds = 3600
|
|
}
|
|
endpoint := fmt.Sprintf("%s/storage/v1/object/sign/%s/%s", s.URL, bucketName, pathEscape(storagePath))
|
|
body, err := json.Marshal(map[string]any{"expiresIn": ttlSeconds})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
req.Header.Set("apikey", s.APIKey)
|
|
req.Header.Set("Authorization", "Bearer "+s.APIKey)
|
|
req.Header.Set("Content-Type", "application/json")
|
|
resp, err := s.HTTP.Do(req)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer resp.Body.Close()
|
|
respBody, _ := io.ReadAll(resp.Body)
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
return "", fmt.Errorf("sign %d: %s", resp.StatusCode, snip(respBody))
|
|
}
|
|
var parsed struct {
|
|
SignedURL string `json:"signedURL"`
|
|
}
|
|
if err := json.Unmarshal(respBody, &parsed); err != nil {
|
|
return "", fmt.Errorf("parse sign response: %w (body: %s)", err, snip(respBody))
|
|
}
|
|
if parsed.SignedURL == "" {
|
|
return "", fmt.Errorf("empty signedURL in response: %s", snip(respBody))
|
|
}
|
|
full := parsed.SignedURL
|
|
if strings.HasPrefix(full, "/") {
|
|
full = s.URL + full
|
|
}
|
|
return full, nil
|
|
}
|
|
|
|
// doRetry runs op up to MaxRetries+1 times. 5xx and transport errors are
|
|
// retried with exponential backoff; 4xx surfaces immediately as a
|
|
// permanent error (caller's bug in the row, not a network blip).
|
|
func (s *Sink) doRetry(ctx context.Context, op func(context.Context) (*http.Response, error)) error {
|
|
_, err := s.doRetryRead(ctx, op)
|
|
return err
|
|
}
|
|
|
|
// doRetryRead is the read-the-body variant. Returns the 2xx response
|
|
// body bytes; non-2xx is wrapped in an error. Same retry semantics as
|
|
// doRetry: 5xx/transport retries with exponential backoff, 4xx is fatal.
|
|
func (s *Sink) doRetryRead(ctx context.Context, op func(context.Context) (*http.Response, error)) ([]byte, error) {
|
|
backoff := s.InitialBackoff
|
|
if backoff == 0 {
|
|
backoff = time.Second
|
|
}
|
|
attempts := s.MaxRetries + 1
|
|
if attempts < 1 {
|
|
attempts = 1
|
|
}
|
|
var lastErr error
|
|
for i := 0; i < attempts; i++ {
|
|
if i > 0 {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
case <-time.After(backoff):
|
|
}
|
|
backoff *= 2
|
|
}
|
|
resp, err := op(ctx)
|
|
if err != nil {
|
|
lastErr = err
|
|
continue
|
|
}
|
|
body, readErr := io.ReadAll(resp.Body)
|
|
resp.Body.Close()
|
|
if readErr != nil {
|
|
lastErr = fmt.Errorf("read body: %w", readErr)
|
|
continue
|
|
}
|
|
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
|
|
return body, nil
|
|
}
|
|
if resp.StatusCode >= 400 && resp.StatusCode < 500 {
|
|
return nil, fmt.Errorf("%d: %s", resp.StatusCode, snip(body))
|
|
}
|
|
lastErr = fmt.Errorf("%d: %s", resp.StatusCode, snip(body))
|
|
}
|
|
return nil, lastErr
|
|
}
|
|
|
|
// hashPrompt returns the lowercase hex SHA-256 digest of p; insertRow
// stores it in the prompt_hash column.
func hashPrompt(p string) string {
	h := sha256.New()
	h.Write([]byte(p)) // hash.Hash.Write never returns an error
	return hex.EncodeToString(h.Sum(nil))
}
|
|
|
|
// pathEscape percent-encodes every '/'-separated segment of p with
// url.PathEscape and keeps the separators: the Storage API treats
// everything after the bucket name as a virtual file path with
// directories.
//
// The pieces are reassembled with path.Join, which also cleans the
// result: empty segments collapse ("a//b" -> "a/b"), a trailing slash is
// dropped, and "."/".." segments are resolved. Callers should pass
// already-clean keys so the escaped path names the same object as the
// unescaped storage_path.
func pathEscape(p string) string {
	segments := strings.Split(p, "/")
	escaped := make([]string, 0, len(segments))
	for _, seg := range segments {
		escaped = append(escaped, url.PathEscape(seg))
	}
	return path.Join(escaped...)
}
|
|
|
|
// snip renders a response body for an error message. It trims surrounding
// whitespace and caps the result at 500 bytes, appending "..." when
// something was cut. The cut is moved back to a rune boundary so a
// multi-byte UTF-8 character is never split into invalid bytes.
func snip(b []byte) string {
	const limit = 500 // named to avoid shadowing the builtin max
	s := strings.TrimSpace(string(b))
	if len(s) <= limit {
		return s
	}
	// Ranging over a string yields the byte offset of each rune start;
	// keep the last offset that still fits within the limit.
	cut := 0
	for i := range s {
		if i > limit {
			break
		}
		cut = i
	}
	return s[:cut] + "..."
}
|