diff --git a/cmd/imagen/config.go b/cmd/imagen/config.go index 617ab1a..c5ef3bc 100644 --- a/cmd/imagen/config.go +++ b/cmd/imagen/config.go @@ -39,6 +39,18 @@ func runConfig(args []string) error { } fmt.Fprintf(os.Stdout, "OK — %d backend(s) defined, default=%q\n", len(cfg.Backends), cfg.DefaultBackend) + // Soft warnings — surfaced on stderr so they're visible but don't + // fail the validate exit code. + cloudMode := cfg.Output.CloudSync + if cloudMode == "" { + cloudMode = "auto" + } + if cloudMode != "off" && cfg.OwnerUserID == "" { + fmt.Fprintln(os.Stderr, + "warning: cloud_sync is "+cloudMode+" but owner_user_id is empty — DB inserts will be skipped.") + fmt.Fprintln(os.Stderr, + " look it up: SELECT id FROM auth.users WHERE email = '';") + } return nil default: return userErr("unknown config subcommand %q (init|validate|path)", args[0]) diff --git a/cmd/imagen/generate.go b/cmd/imagen/generate.go index 00ba642..b01cce1 100644 --- a/cmd/imagen/generate.go +++ b/cmd/imagen/generate.go @@ -2,13 +2,17 @@ package main import ( "context" + "encoding/json" "flag" "fmt" "os" + "path/filepath" "strconv" "strings" + "time" "mgit.msbls.de/m/ImaGen/internal/backend" + "mgit.msbls.de/m/ImaGen/internal/cloud" "mgit.msbls.de/m/ImaGen/internal/config" "mgit.msbls.de/m/ImaGen/internal/output" "mgit.msbls.de/m/ImaGen/internal/preview" @@ -30,6 +34,7 @@ func runGenerate(ctx context.Context, args []string) error { noSidecar bool previewOn bool previewOff bool + noCloud bool ) fs.StringVar(&backendName, "backend", "", "backend instance name (default: config.default_backend)") fs.StringVar(&size, "size", "1024x1024", "WxH, e.g. 1024x1024") @@ -42,6 +47,7 @@ func runGenerate(ctx context.Context, args []string) error { fs.BoolVar(&noSidecar, "no-sidecar", false, "skip the JSON sidecar even if config enables it") fs.BoolVar(&previewOn, "preview", false, "force tmux preview window on (errors outside $TMUX)") fs.BoolVar(&previewOff, "no-preview", false, "skip the tmux preview window") + fs.BoolVar(&noCloud, "no-cloud", false, "skip Supabase upload + imagen.images insert for this generation") fs.Usage = func() { fmt.Fprintln(fs.Output(), `Usage: imagen generate "" [flags]`) fs.PrintDefaults() @@ -126,6 +132,11 @@ func runGenerate(ctx context.Context, args []string) error { fmt.Fprintln(os.Stderr, "sidecar:", paths.SidecarPath) } + if err := maybeCloudSync(ctx, cfg, noCloud, paths, in, res, w, h); err != nil { + // cloud-sync failures are warnings — the image already wrote. + fmt.Fprintln(os.Stderr, "imagen: cloud sync:", err) + } + if err := maybePreview(cfg, previewOn, previewOff, paths.ImagePath, rawPrompt); err != nil { // preview failures are warnings — the image already wrote. fmt.Fprintln(os.Stderr, "imagen: preview:", err) @@ -133,6 +144,175 @@ func runGenerate(ctx context.Context, args []string) error { return nil } +// resolveCloudSyncMode applies the precedence chain config -> env -> flag. +// Flags win, env beats config, config beats the implicit auto default. +// Mirrors resolvePreviewMode shape. +func resolveCloudSyncMode(cfg *config.Config, noCloudFlag bool, env string) (string, error) { + mode := "auto" + if cfg != nil && cfg.Output.CloudSync != "" { + mode = cfg.Output.CloudSync + } + if env != "" { + switch env { + case "auto", "on", "off": + mode = env + default: + return "", fmt.Errorf("$IMAGEN_CLOUD_SYNC = %q (must be auto|on|off)", env) + } + } + if noCloudFlag { + mode = "off" + } + return mode, nil +} + +// maybeCloudSync resolves the effective mode and, if it says yes, uploads +// the PNG and inserts the row. Always non-fatal — the image already wrote. +func maybeCloudSync(ctx context.Context, cfg *config.Config, noCloud bool, paths *output.Outputs, in output.Inputs, res *backend.Result, width, height int) error { + mode, err := resolveCloudSyncMode(cfg, noCloud, os.Getenv("IMAGEN_CLOUD_SYNC")) + if err != nil { + return err + } + if mode == "off" { + return nil + } + + sink, ok := cloud.NewFromEnv() + if !ok { + if mode == "on" { + return fmt.Errorf("cloud_sync=on but SUPABASE_URL / SUPABASE_SERVICE_KEY not set in env") + } + // auto + missing env = silent skip. + return nil + } + // Config-supplied owner_user_id takes precedence over $IMAGEN_OWNER_USER_ID. + if cfg != nil && cfg.OwnerUserID != "" { + sink.OwnerUserID = cfg.OwnerUserID + } + if sink.OwnerUserID == "" { + if mode == "on" { + return fmt.Errorf("cloud_sync=on but owner_user_id not set in config and $IMAGEN_OWNER_USER_ID is empty") + } + // auto + missing UUID = silent skip. + return nil + } + + pngBytes, readErr := os.ReadFile(paths.ImagePath) + if readErr != nil { + return fmt.Errorf("read local image: %w", readErr) + } + + // Reuse the writer's date/slug/seed so storage_path mirrors the local + // filename's prefix exactly — viewers can join `imagen.images` on + // either side without timezone drift. + date := paths.Date + slug := paths.Slug + if date == "" || slug == "" { + now := time.Now() + date = now.Format("2006-01-02") + slug = output.Slug(in.Prompt) + } + ext := in.Ext + if ext == "" { + ext = strings.TrimPrefix(filepath.Ext(paths.ImagePath), ".") + } + if ext == "" { + ext = "png" + } + + // Snapshot the sidecar (if it exists) so the row carries the same + // metadata view a downstream viewer would see on disk. + var sidecar map[string]any + if paths.SidecarPath != "" { + if scBytes, err := os.ReadFile(paths.SidecarPath); err == nil { + _ = json.Unmarshal(scBytes, &sidecar) + } + } + + model := metaString(res.Metadata, "model") + steps := metaInt(res.Metadata, "steps") + cost := metaFloatPtr(res.Metadata, "cost_usd_estimate") + latency := metaInt(res.Metadata, "latency_ms") + + seed := paths.Seed + if seed == 0 { + seed = in.Seed + } + syncReq := cloud.SyncRequest{ + Date: date, + Slug: slug, + Seed: seed, + Ext: ext, + PNG: pngBytes, + MimeType: res.MimeType, + Prompt: in.Prompt, + Backend: in.Backend, + Model: model, + Steps: steps, + Width: width, + Height: height, + LatencyMs: latency, + CostUSDEstimate: cost, + Sidecar: sidecar, + } + syncCtx, cancel := context.WithTimeout(ctx, 60*time.Second) + defer cancel() + result, err := sink.Sync(syncCtx, syncReq) + if err != nil { + return err + } + if result != nil && result.ImageID != "" { + fmt.Fprintf(os.Stderr, "cloud: imagen.images.id=%s storage_path=%s\n", result.ImageID, result.StoragePath) + } + return nil +} + +func metaString(m map[string]any, key string) string { + if v, ok := m[key]; ok { + if s, ok := v.(string); ok { + return s + } + } + return "" +} + +func metaInt(m map[string]any, key string) int { + v, ok := m[key] + if !ok { + return 0 + } + switch n := v.(type) { + case int: + return n + case int64: + return int(n) + case float64: + return int(n) + } + return 0 +} + +func metaFloatPtr(m map[string]any, key string) *float64 { + v, ok := m[key] + if !ok { + return nil + } + switch n := v.(type) { + case float64: + return &n + case float32: + f := float64(n) + return &f + case int: + f := float64(n) + return &f + case int64: + f := float64(n) + return &f + } + return nil +} + // resolvePreviewMode applies the precedence chain config -> env -> flag. // Flags win, env beats config, config beats the implicit auto default. func resolvePreviewMode(cfg *config.Config, flagOn, flagOff bool, env string) (preview.Mode, error) { diff --git a/cmd/imagen/generate_test.go b/cmd/imagen/generate_test.go index c979df0..f7cf8cc 100644 --- a/cmd/imagen/generate_test.go +++ b/cmd/imagen/generate_test.go @@ -48,3 +48,40 @@ func TestResolvePreviewMode(t *testing.T) { }) } } + +func TestResolveCloudSyncMode(t *testing.T) { + type tc struct { + name string + cfg *config.Config + noCloud bool + env string + want string + wantError bool + } + cases := []tc{ + {name: "all-empty-defaults-to-auto", want: "auto"}, + {name: "config-on", cfg: &config.Config{Output: config.OutputConfig{CloudSync: "on"}}, want: "on"}, + {name: "config-off", cfg: &config.Config{Output: config.OutputConfig{CloudSync: "off"}}, want: "off"}, + {name: "env-overrides-config", cfg: &config.Config{Output: config.OutputConfig{CloudSync: "on"}}, env: "off", want: "off"}, + {name: "flag-overrides-env-and-config", cfg: &config.Config{Output: config.OutputConfig{CloudSync: "on"}}, env: "on", noCloud: true, want: "off"}, + {name: "flag-overrides-config-on", cfg: &config.Config{Output: config.OutputConfig{CloudSync: "on"}}, noCloud: true, want: "off"}, + {name: "bad-env-errors", env: "yes", wantError: true}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + got, err := resolveCloudSyncMode(c.cfg, c.noCloud, c.env) + if c.wantError { + if err == nil { + t.Fatalf("expected error, got mode %q", got) + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got != c.want { + t.Errorf("mode = %q, want %q", got, c.want) + } + }) + } +} diff --git a/docs/architecture.md b/docs/architecture.md index 72c519f..3510c3d 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -16,6 +16,8 @@ upstream API. Each adapter only ever sees its own slice of `imagen.yaml`. │ internal/output │ filename templating, sidecar │ internal/config │ YAML loader, validation │ internal/preview │ tmux-img window spawner + │ internal/cloud │ Supabase Storage + imagen.images + │ internal/usage │ mai.imagen_usage cost-tracking └──────────┬────────────┘ │ ┌──────────▼────────────┐ diff --git a/docs/usage.md b/docs/usage.md index 01fd1ea..75585fc 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -26,6 +26,7 @@ imagen version print version | `--no-sidecar` | `false` | Skip the JSON sidecar even if config enables it | | `--preview` | (auto) | Force open a tmux preview window via `tmux-img` | | `--no-preview` | (auto) | Suppress the preview window (use for batch / CI callers) | +| `--no-cloud` | `false` | Skip Supabase upload + `imagen.images` insert for this call | | `--config` | `~/.config/imagen.yaml` | Override config path | ### Preview window @@ -114,3 +115,30 @@ imagen usage --since 2026-05-01 --raw Per-model rates live in `internal/backend/replicate_pricing.go` — they are snapshotted from and refreshed on a quarterly cadence. + +## Cloud-sync (Supabase) + +Successful generations also upload the PNG to the private Supabase +Storage bucket `imagen-generated` (path: `/-.png`) +and insert a row into `imagen.images`. The row carries the prompt, +sha256-hashed prompt, backend, model, seed/steps/width/height, latency, +cost estimate, the full local sidecar JSON, and an empty `tags` array +ready for the flexsiebels viewer to fill in. + +Configuration: + +- `owner_user_id` in `imagen.yaml` — m's `auth.users.id`. Empty disables + inserts (the column is `NOT NULL`). +- `output.cloud_sync` in `imagen.yaml`: `auto` (default — on iff + SUPABASE creds + `owner_user_id` are set), `on` (errors if either is + missing), `off`. +- `IMAGEN_CLOUD_SYNC=auto|on|off` overrides config. +- `--no-cloud` overrides everything for one call. + +Reuses the same Supabase env (`SUPABASE_URL` + `SUPABASE_SERVICE_KEY` or +`MAI_SUPABASE_KEY`) as cost-tracking. Service-role bypasses RLS for +inserts; the `owner_user_id = auth.uid()` policy on the table gates the +read path the flexsiebels viewer hits. + +Failures (Storage 5xx, DB unreachable) emit `imagen: cloud sync: ` +to stderr and the local PNG + sidecar stay put. Exit code is unchanged. diff --git a/internal/cloud/cloud.go b/internal/cloud/cloud.go new file mode 100644 index 0000000..3ed8994 --- /dev/null +++ b/internal/cloud/cloud.go @@ -0,0 +1,356 @@ +// Package cloud syncs a generated image to Supabase Storage and inserts +// a row into imagen.images. Both steps are best-effort: callers log the +// returned error and proceed, because the local PNG + sidecar are already +// on disk by the time Sync runs and a cloud blip should not lose the +// artefact. +// +// The single source of truth for the row schema is the imagen_schema_init +// migration — see internal docs in the issue body for #7. +package cloud + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "os" + "path" + "strings" + "time" +) + +// supabaseSchema is the PostgREST profile header value the imagen schema +// is exposed under (see ALTER ROLE authenticator SET pgrst.db_schemas). +const supabaseSchema = "imagen" + +// bucketName is the Supabase Storage bucket all generated images land in. +const bucketName = "imagen-generated" + +// Sink writes one PNG + one row per generation. It is safe to share +// across goroutines. +type Sink struct { + // URL is SUPABASE_URL — e.g. https://supa.flexsiebels.de. + URL string + // APIKey is the service-role key (SUPABASE_SERVICE_KEY). Storage uploads + // and DB inserts both bypass RLS with this key — the policies on the + // table + bucket are the contract for the read side. + APIKey string + // OwnerUserID is m's auth.users.id. It populates owner_user_id on every + // row. Empty means the sink refuses to insert (the column is NOT NULL + // and the user-mode reader needs it for the RLS policy). + OwnerUserID string + // HTTP is the http client; tests inject one pointing at httptest. + HTTP *http.Client + // MaxRetries is the number of additional attempts after the first + // failure for retryable (5xx) responses. Zero means single-shot. + MaxRetries int + // InitialBackoff is the wait before the first retry; doubles per attempt. + // Set very small in tests. + InitialBackoff time.Duration +} + +// NewFromEnv returns a sink populated from SUPABASE_URL + +// SUPABASE_SERVICE_KEY (or MAI_SUPABASE_KEY) + IMAGEN_OWNER_USER_ID. +// Returns ok=false if the URL or key are missing — the caller treats that +// as "cloud-sync disabled by environment". +func NewFromEnv() (*Sink, bool) { + u := strings.TrimRight(os.Getenv("SUPABASE_URL"), "/") + if u == "" { + return nil, false + } + key := os.Getenv("SUPABASE_SERVICE_KEY") + if key == "" { + key = os.Getenv("MAI_SUPABASE_KEY") + } + if key == "" { + return nil, false + } + return &Sink{ + URL: u, + APIKey: key, + OwnerUserID: os.Getenv("IMAGEN_OWNER_USER_ID"), + HTTP: &http.Client{Timeout: 30 * time.Second}, + MaxRetries: 2, + InitialBackoff: time.Second, + }, true +} + +// SyncRequest is the cross-backend ingredient set Sync needs. Date is +// formatted as YYYY-MM-DD; Slug + Seed are reused from the local +// filename so storage_path mirrors disk layout. +type SyncRequest struct { + Date string + Slug string + Seed int64 + Ext string // "png", "jpg", "webp" — no leading dot + PNG []byte + MimeType string + + Prompt string + Backend string + Model string + Steps int + Width int + Height int + LatencyMs int + CostUSDEstimate *float64 + Sidecar map[string]any +} + +// SyncResult tells the caller what landed where. +type SyncResult struct { + StoragePath string // e.g. "2026-05-11/lighthouse-42.png" + ImageID string // imagen.images.id (UUID) +} + +// Sync uploads the bytes and inserts the metadata row. Returns the row's +// id and storage_path on success; any non-nil error is what the caller +// surfaces as "imagen: cloud sync: " and otherwise ignores. +func (s *Sink) Sync(ctx context.Context, req SyncRequest) (*SyncResult, error) { + if s == nil { + return nil, fmt.Errorf("cloud sink not configured") + } + if s.OwnerUserID == "" { + return nil, fmt.Errorf("owner_user_id not set (config or $IMAGEN_OWNER_USER_ID); refusing to insert NULL into imagen.images") + } + if req.Date == "" || req.Slug == "" { + return nil, fmt.Errorf("date and slug are required for storage_path") + } + ext := req.Ext + if ext == "" { + ext = "png" + } + storagePath := fmt.Sprintf("%s/%s-%d.%s", req.Date, req.Slug, req.Seed, ext) + + if err := s.upload(ctx, storagePath, req.PNG, req.MimeType); err != nil { + return nil, fmt.Errorf("storage upload: %w", err) + } + + id, err := s.insertRow(ctx, storagePath, req) + if err != nil { + return &SyncResult{StoragePath: storagePath}, fmt.Errorf("db insert: %w", err) + } + return &SyncResult{StoragePath: storagePath, ImageID: id}, nil +} + +// upload PUTs the PNG into the imagen-generated bucket. We use +// Content-Type so signed URLs render in the browser without a download +// prompt. POST would error on second-write; PUT (with x-upsert: true) is +// idempotent for re-runs of the same date+slug+seed. +func (s *Sink) upload(ctx context.Context, storagePath string, body []byte, mime string) error { + if mime == "" { + mime = "image/png" + } + endpoint := fmt.Sprintf("%s/storage/v1/object/%s/%s", s.URL, bucketName, pathEscape(storagePath)) + return s.doRetry(ctx, func(ctx context.Context) (*http.Response, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodPut, endpoint, bytes.NewReader(body)) + if err != nil { + return nil, err + } + req.Header.Set("apikey", s.APIKey) + req.Header.Set("Authorization", "Bearer "+s.APIKey) + req.Header.Set("Content-Type", mime) + req.Header.Set("x-upsert", "true") + return s.HTTP.Do(req) + }) +} + +// insertRow POSTs to PostgREST against the imagen schema. Prefer: +// return=representation gives us the inserted id back without a second +// round-trip. +func (s *Sink) insertRow(ctx context.Context, storagePath string, req SyncRequest) (string, error) { + row := map[string]any{ + "owner_user_id": s.OwnerUserID, + "prompt": req.Prompt, + "prompt_hash": hashPrompt(req.Prompt), + "backend": req.Backend, + "storage_path": storagePath, + } + if req.Model != "" { + row["model"] = req.Model + } + if req.Seed != 0 { + row["seed"] = req.Seed + } + if req.Steps != 0 { + row["steps"] = req.Steps + } + if req.Width != 0 { + row["width"] = req.Width + } + if req.Height != 0 { + row["height"] = req.Height + } + if req.LatencyMs != 0 { + row["latency_ms"] = req.LatencyMs + } + if req.CostUSDEstimate != nil { + row["cost_usd_estimate"] = *req.CostUSDEstimate + } + if len(req.Sidecar) > 0 { + row["sidecar"] = req.Sidecar + } + + body, err := json.Marshal(row) + if err != nil { + return "", fmt.Errorf("marshal row: %w", err) + } + + endpoint := s.URL + "/rest/v1/images" + + respBody, err := s.doRetryRead(ctx, func(ctx context.Context) (*http.Response, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) + if err != nil { + return nil, err + } + req.Header.Set("apikey", s.APIKey) + req.Header.Set("Authorization", "Bearer "+s.APIKey) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept-Profile", supabaseSchema) + req.Header.Set("Content-Profile", supabaseSchema) + req.Header.Set("Prefer", "return=representation") + return s.HTTP.Do(req) + }) + if err != nil { + return "", err + } + var rows []struct { + ID string `json:"id"` + } + if err := json.Unmarshal(respBody, &rows); err != nil { + return "", fmt.Errorf("parse insert response: %w (body: %s)", err, snip(respBody)) + } + if len(rows) == 0 { + return "", fmt.Errorf("insert returned 0 rows (body: %s)", snip(respBody)) + } + return rows[0].ID, nil +} + +// SignedURL asks the Storage API for a time-limited URL. ttlSeconds is +// the validity window. Returned URL is host-qualified and ready to hand +// to a browser. +func (s *Sink) SignedURL(ctx context.Context, storagePath string, ttlSeconds int) (string, error) { + if s == nil { + return "", fmt.Errorf("cloud sink not configured") + } + if ttlSeconds <= 0 { + ttlSeconds = 3600 + } + endpoint := fmt.Sprintf("%s/storage/v1/object/sign/%s/%s", s.URL, bucketName, pathEscape(storagePath)) + body, err := json.Marshal(map[string]any{"expiresIn": ttlSeconds}) + if err != nil { + return "", err + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) + if err != nil { + return "", err + } + req.Header.Set("apikey", s.APIKey) + req.Header.Set("Authorization", "Bearer "+s.APIKey) + req.Header.Set("Content-Type", "application/json") + resp, err := s.HTTP.Do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() + respBody, _ := io.ReadAll(resp.Body) + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return "", fmt.Errorf("sign %d: %s", resp.StatusCode, snip(respBody)) + } + var parsed struct { + SignedURL string `json:"signedURL"` + } + if err := json.Unmarshal(respBody, &parsed); err != nil { + return "", fmt.Errorf("parse sign response: %w (body: %s)", err, snip(respBody)) + } + if parsed.SignedURL == "" { + return "", fmt.Errorf("empty signedURL in response: %s", snip(respBody)) + } + full := parsed.SignedURL + if strings.HasPrefix(full, "/") { + full = s.URL + full + } + return full, nil +} + +// doRetry runs op up to MaxRetries+1 times. 5xx and transport errors are +// retried with exponential backoff; 4xx surfaces immediately as a +// permanent error (caller's bug in the row, not a network blip). +func (s *Sink) doRetry(ctx context.Context, op func(context.Context) (*http.Response, error)) error { + _, err := s.doRetryRead(ctx, op) + return err +} + +// doRetryRead is the read-the-body variant. Returns the 2xx response +// body bytes; non-2xx is wrapped in an error. Same retry semantics as +// doRetry: 5xx/transport retries with exponential backoff, 4xx is fatal. +func (s *Sink) doRetryRead(ctx context.Context, op func(context.Context) (*http.Response, error)) ([]byte, error) { + backoff := s.InitialBackoff + if backoff == 0 { + backoff = time.Second + } + attempts := s.MaxRetries + 1 + if attempts < 1 { + attempts = 1 + } + var lastErr error + for i := 0; i < attempts; i++ { + if i > 0 { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(backoff): + } + backoff *= 2 + } + resp, err := op(ctx) + if err != nil { + lastErr = err + continue + } + body, readErr := io.ReadAll(resp.Body) + resp.Body.Close() + if readErr != nil { + lastErr = fmt.Errorf("read body: %w", readErr) + continue + } + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return body, nil + } + if resp.StatusCode >= 400 && resp.StatusCode < 500 { + return nil, fmt.Errorf("%d: %s", resp.StatusCode, snip(body)) + } + lastErr = fmt.Errorf("%d: %s", resp.StatusCode, snip(body)) + } + return nil, lastErr +} + +func hashPrompt(p string) string { + sum := sha256.Sum256([]byte(p)) + return hex.EncodeToString(sum[:]) +} + +// pathEscape encodes each path segment but keeps the slashes — the +// Storage API treats the part after the bucket name as a virtual file +// path with directory separators. +func pathEscape(p string) string { + parts := strings.Split(p, "/") + for i, seg := range parts { + parts[i] = url.PathEscape(seg) + } + return path.Join(parts...) +} + +func snip(b []byte) string { + const max = 500 + s := strings.TrimSpace(string(b)) + if len(s) > max { + s = s[:max] + "..." + } + return s +} diff --git a/internal/cloud/cloud_test.go b/internal/cloud/cloud_test.go new file mode 100644 index 0000000..3fb5954 --- /dev/null +++ b/internal/cloud/cloud_test.go @@ -0,0 +1,326 @@ +package cloud + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "sync/atomic" + "testing" + "time" +) + +// fakeSupabase is a tiny stand-in for Supabase Storage + PostgREST. It +// records what came in and returns canned responses based on path. +type fakeSupabase struct { + t *testing.T + mux *http.ServeMux + server *httptest.Server + uploadCalls int32 + insertCalls int32 + uploadBytes []byte + uploadHdr http.Header + insertBody []byte + insertHdr http.Header +} + +func newFakeSupabase(t *testing.T, opts ...func(*fakeSupabase)) *fakeSupabase { + f := &fakeSupabase{t: t} + f.mux = http.NewServeMux() + // Storage upload — anything under /storage/v1/object//... + f.mux.HandleFunc("/storage/v1/object/imagen-generated/", func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&f.uploadCalls, 1) + body, _ := io.ReadAll(r.Body) + f.uploadBytes = body + f.uploadHdr = r.Header.Clone() + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"Key":"imagen-generated/somepath"}`)) + }) + // Storage sign URL + f.mux.HandleFunc("/storage/v1/object/sign/imagen-generated/", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(`{"signedURL":"/storage/v1/object/sign/imagen-generated/some.png?token=abc"}`)) + }) + // PostgREST insert + f.mux.HandleFunc("/rest/v1/images", func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&f.insertCalls, 1) + body, _ := io.ReadAll(r.Body) + f.insertBody = body + f.insertHdr = r.Header.Clone() + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + w.Write([]byte(`[{"id":"00000000-0000-0000-0000-000000000abc"}]`)) + }) + for _, opt := range opts { + opt(f) + } + f.server = httptest.NewServer(f.mux) + t.Cleanup(f.server.Close) + return f +} + +func newSink(server *httptest.Server) *Sink { + return &Sink{ + URL: server.URL, + APIKey: "fake-service-key", + OwnerUserID: "00000000-0000-0000-0000-000000000001", + HTTP: server.Client(), + MaxRetries: 2, + InitialBackoff: time.Millisecond, + } +} + +func TestSyncHappyPath(t *testing.T) { + f := newFakeSupabase(t) + s := newSink(f.server) + + cost := 0.003 + res, err := s.Sync(context.Background(), SyncRequest{ + Date: "2026-05-11", + Slug: "lighthouse", + Seed: 42, + Ext: "png", + PNG: []byte("PNGbytes"), + MimeType: "image/png", + Prompt: "a tiny lighthouse on a stormy cliff", + Backend: "flux-schnell-local", + Model: "flux1-schnell", + Steps: 4, + Width: 1024, + Height: 1024, + LatencyMs: 1500, + CostUSDEstimate: &cost, + Sidecar: map[string]any{ + "timestamp": "2026-05-11T01:30:00Z", + "backend": "flux-schnell-local", + }, + }) + if err != nil { + t.Fatalf("Sync: %v", err) + } + if res.StoragePath != "2026-05-11/lighthouse-42.png" { + t.Errorf("storage_path = %q", res.StoragePath) + } + if res.ImageID != "00000000-0000-0000-0000-000000000abc" { + t.Errorf("image_id = %q", res.ImageID) + } + if got := atomic.LoadInt32(&f.uploadCalls); got != 1 { + t.Errorf("upload calls = %d, want 1", got) + } + if got := atomic.LoadInt32(&f.insertCalls); got != 1 { + t.Errorf("insert calls = %d, want 1", got) + } + if !bytes.Equal(f.uploadBytes, []byte("PNGbytes")) { + t.Errorf("uploaded bytes = %q", f.uploadBytes) + } + + // Verify the row payload carries the prompt + computed hash + non-zero + // metadata. Empty fields should be omitted from the JSON body so RLS + // won't see surprise keys. + var row map[string]any + if err := json.Unmarshal(f.insertBody, &row); err != nil { + t.Fatalf("insert body parse: %v\n%s", err, f.insertBody) + } + if row["prompt"] != "a tiny lighthouse on a stormy cliff" { + t.Errorf("row.prompt = %v", row["prompt"]) + } + if row["owner_user_id"] != "00000000-0000-0000-0000-000000000001" { + t.Errorf("row.owner_user_id = %v", row["owner_user_id"]) + } + if row["storage_path"] != "2026-05-11/lighthouse-42.png" { + t.Errorf("row.storage_path = %v", row["storage_path"]) + } + hash, _ := row["prompt_hash"].(string) + if len(hash) != 64 { + t.Errorf("prompt_hash should be 64-char sha256 hex, got %q", hash) + } + if row["backend"] != "flux-schnell-local" { + t.Errorf("row.backend = %v", row["backend"]) + } + if row["seed"].(float64) != 42 { + t.Errorf("row.seed = %v", row["seed"]) + } + if row["latency_ms"].(float64) != 1500 { + t.Errorf("row.latency_ms = %v", row["latency_ms"]) + } + if row["cost_usd_estimate"].(float64) != 0.003 { + t.Errorf("row.cost = %v", row["cost_usd_estimate"]) + } + if row["sidecar"] == nil { + t.Errorf("row.sidecar missing") + } + + // PostgREST schema headers — hardcoded to "imagen". + if got := f.insertHdr.Get("Accept-Profile"); got != "imagen" { + t.Errorf("Accept-Profile = %q", got) + } + if got := f.insertHdr.Get("Content-Profile"); got != "imagen" { + t.Errorf("Content-Profile = %q", got) + } + if got := f.insertHdr.Get("Authorization"); !strings.HasPrefix(got, "Bearer ") { + t.Errorf("Authorization = %q", got) + } + + // Storage upsert should be set so re-runs of the same date+slug+seed + // don't fail with 409. + if got := f.uploadHdr.Get("x-upsert"); got != "true" { + t.Errorf("x-upsert = %q", got) + } +} + +func TestSyncRetryOn5xx(t *testing.T) { + var uploadAttempts int32 + mux := http.NewServeMux() + mux.HandleFunc("/storage/v1/object/imagen-generated/", func(w http.ResponseWriter, r *http.Request) { + n := atomic.AddInt32(&uploadAttempts, 1) + // Two 503s, then OK. + if n < 3 { + http.Error(w, "service unavailable", http.StatusServiceUnavailable) + return + } + w.WriteHeader(http.StatusOK) + }) + mux.HandleFunc("/rest/v1/images", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusCreated) + w.Write([]byte(`[{"id":"row-id"}]`)) + }) + srv := httptest.NewServer(mux) + defer srv.Close() + s := newSink(srv) + + res, err := s.Sync(context.Background(), SyncRequest{ + Date: "2026-05-11", Slug: "x", Seed: 1, Ext: "png", + PNG: []byte("p"), Prompt: "p", Backend: "b", + }) + if err != nil { + t.Fatalf("Sync (with retry): %v", err) + } + if got := atomic.LoadInt32(&uploadAttempts); got != 3 { + t.Errorf("upload attempts = %d, want 3", got) + } + if res.ImageID != "row-id" { + t.Errorf("image_id = %q", res.ImageID) + } +} + +func TestSyncNoRetryOn4xx(t *testing.T) { + var uploadAttempts int32 + mux := http.NewServeMux() + mux.HandleFunc("/storage/v1/object/imagen-generated/", func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&uploadAttempts, 1) + http.Error(w, `{"message":"bad request"}`, http.StatusBadRequest) + }) + srv := httptest.NewServer(mux) + defer srv.Close() + s := newSink(srv) + + _, err := s.Sync(context.Background(), SyncRequest{ + Date: "2026-05-11", Slug: "x", Seed: 1, Ext: "png", + PNG: []byte("p"), Prompt: "p", Backend: "b", + }) + if err == nil { + t.Fatal("expected error on 400") + } + if !strings.Contains(err.Error(), "400") { + t.Errorf("error should mention 400 status: %v", err) + } + if got := atomic.LoadInt32(&uploadAttempts); got != 1 { + t.Errorf("upload attempts = %d, want 1 (no retry on 4xx)", got) + } +} + +func TestSyncMissingOwnerUserID(t *testing.T) { + srv := httptest.NewServer(http.NewServeMux()) + defer srv.Close() + s := &Sink{ + URL: srv.URL, + APIKey: "k", + // OwnerUserID intentionally empty. + HTTP: srv.Client(), + InitialBackoff: time.Millisecond, + } + _, err := s.Sync(context.Background(), SyncRequest{ + Date: "2026-05-11", Slug: "x", Seed: 1, Ext: "png", + PNG: []byte("p"), Prompt: "p", Backend: "b", + }) + if err == nil { + t.Fatal("expected error when owner_user_id unset") + } + if !strings.Contains(err.Error(), "owner_user_id") { + t.Errorf("error should mention owner_user_id: %v", err) + } +} + +func TestSyncRequiresDateAndSlug(t *testing.T) { + srv := httptest.NewServer(http.NewServeMux()) + defer srv.Close() + s := newSink(srv) + _, err := s.Sync(context.Background(), SyncRequest{ + Slug: "x", Seed: 1, Ext: "png", + PNG: []byte("p"), Prompt: "p", Backend: "b", + }) + if err == nil { + t.Fatal("expected error for missing date") + } +} + +func TestSignedURL(t *testing.T) { + f := newFakeSupabase(t) + s := newSink(f.server) + got, err := s.SignedURL(context.Background(), "2026-05-11/x.png", 60) + if err != nil { + t.Fatalf("SignedURL: %v", err) + } + want := f.server.URL + "/storage/v1/object/sign/imagen-generated/some.png?token=abc" + if got != want { + t.Errorf("signed URL = %q, want %q", got, want) + } +} + +func TestSyncDBFailureSurfacesPathOnError(t *testing.T) { + mux := http.NewServeMux() + mux.HandleFunc("/storage/v1/object/imagen-generated/", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }) + mux.HandleFunc("/rest/v1/images", func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "schema cache miss", http.StatusInternalServerError) + }) + srv := httptest.NewServer(mux) + defer srv.Close() + s := newSink(srv) + res, err := s.Sync(context.Background(), SyncRequest{ + Date: "2026-05-11", Slug: "x", Seed: 9, Ext: "png", + PNG: []byte("p"), Prompt: "p", Backend: "b", + }) + if err == nil { + t.Fatal("expected error from DB insert failure") + } + // Storage upload succeeded — caller can still see the upload landed. + if res == nil || res.StoragePath != "2026-05-11/x-9.png" { + t.Errorf("expected storage_path on partial success, got %+v", res) + } +} + +func TestPathEscape(t *testing.T) { + cases := map[string]string{ + "2026-05-11/lighthouse-42.png": "2026-05-11/lighthouse-42.png", + "2026-05-11/two words.png": "2026-05-11/two%20words.png", + "with#hash/and?query.png": "with%23hash/and%3Fquery.png", + } + for in, want := range cases { + got := pathEscape(in) + if got != want { + t.Errorf("pathEscape(%q) = %q, want %q", in, got, want) + } + // Sanity: every part should round-trip via url.PathUnescape. + for _, seg := range strings.Split(got, "/") { + if _, err := url.PathUnescape(seg); err != nil { + t.Errorf("segment %q failed unescape: %v", seg, err) + } + } + } +} diff --git a/internal/config/config.go b/internal/config/config.go index 5c86149..a8a809e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -15,6 +15,10 @@ import ( // Config is the top-level shape of imagen.yaml. type Config struct { DefaultBackend string `yaml:"default_backend"` + // OwnerUserID is m's auth.users.id on msupabase. The cloud-sync writer + // uses it to populate imagen.images.owner_user_id (NOT NULL, owns RLS). + // Empty disables DB inserts even when cloud_sync is on. + OwnerUserID string `yaml:"owner_user_id"` Output OutputConfig `yaml:"output"` Backends map[string]BackendSpec `yaml:"backends"` } @@ -29,6 +33,11 @@ type OutputConfig struct { // Empty / unset is treated as "auto". $IMAGEN_PREVIEW and the // --preview/--no-preview flags override this in turn. Preview string `yaml:"preview"` + // CloudSync controls whether successful generations also upload to + // Supabase Storage and insert into imagen.images. Tri-state mirroring + // Preview: "auto" (default — on when SUPABASE_URL + SUPABASE_SERVICE_KEY + // are set), "on" (errors if env unset), "off". --no-cloud overrides. + CloudSync string `yaml:"cloud_sync"` } // BackendSpec is one entry under `backends:`. Type identifies the adapter; @@ -88,6 +97,11 @@ func (c *Config) Validate() error { default: return fmt.Errorf("output.preview = %q (must be auto|on|off)", c.Output.Preview) } + switch c.Output.CloudSync { + case "", "auto", "on", "off": + default: + return fmt.Errorf("output.cloud_sync = %q (must be auto|on|off)", c.Output.CloudSync) + } for name, spec := range c.Backends { if name == "" { return errors.New("empty backend name") @@ -107,6 +121,11 @@ const Sample = `# imagen.yaml — config for the imagen CLI. default_backend: flux-schnell-local +# Owner UUID for the cloud-sync row in imagen.images. Look up via: +# SELECT id FROM auth.users WHERE email = ''; +# Empty disables imagen.images inserts even when cloud_sync is on. +owner_user_id: "" + output: directory: ~/Pictures/imagen naming: "{date}-{slug}-{seed}.png" @@ -116,6 +135,13 @@ output: # on: always preview (errors outside a tmux session). # off: never preview (use this for batch / CI callers). preview: auto + # Sync the PNG to Supabase Storage (bucket: imagen-generated) and insert + # a row into imagen.images. Reads SUPABASE_URL + SUPABASE_SERVICE_KEY + # from env (same as mai.imagen_usage cost-tracking). + # auto (default): on iff env is configured AND owner_user_id is set. + # on: always upload (errors if env or owner_user_id is missing). + # off: never upload. --no-cloud also forces off per-call. + cloud_sync: auto backends: flux-schnell-local: diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 222c788..1278978 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -73,6 +73,39 @@ func TestValidatePreviewMode(t *testing.T) { } } +func TestValidateCloudSyncMode(t *testing.T) { + for _, mode := range []string{"", "auto", "on", "off"} { + c := &Config{Output: OutputConfig{CloudSync: mode}} + if err := c.Validate(); err != nil { + t.Errorf("cloud_sync=%q: unexpected error %v", mode, err) + } + } + bad := &Config{Output: OutputConfig{CloudSync: "yes"}} + if err := bad.Validate(); err == nil { + t.Errorf("expected error for invalid cloud_sync value") + } +} + +func TestSampleParsesCloudSyncAuto(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "imagen.yaml") + if err := os.WriteFile(path, []byte(Sample), 0o644); err != nil { + t.Fatalf("write sample: %v", err) + } + cfg, err := Load(path) + if err != nil { + t.Fatalf("Load: %v", err) + } + if cfg.Output.CloudSync != "auto" { + t.Errorf("Output.CloudSync = %q, want auto", cfg.Output.CloudSync) + } + // owner_user_id is intentionally empty in the sample — operators fill + // it in after looking up their auth.users.id. + if cfg.OwnerUserID != "" { + t.Errorf("Sample OwnerUserID should be empty, got %q", cfg.OwnerUserID) + } +} + func TestSampleParsesPreviewAuto(t *testing.T) { dir := t.TempDir() path := filepath.Join(dir, "imagen.yaml") diff --git a/internal/output/output.go b/internal/output/output.go index 878f224..c67d2fd 100644 --- a/internal/output/output.go +++ b/internal/output/output.go @@ -35,6 +35,13 @@ type Inputs struct { type Outputs struct { ImagePath string SidecarPath string + // Date is the YYYY-MM-DD the writer used for the filename. Cloud sync + // reuses this so storage_path matches the local filename's date. + Date string + // Slug is the filename-safe prompt fragment the writer used. + Slug string + // Seed is the seed value baked into the filename. + Seed int64 } // Write streams img to disk and, if enabled, writes a sidecar. The image @@ -50,10 +57,12 @@ func (w *Writer) Write(img io.Reader, in Inputs) (*Outputs, error) { if tmpl == "" { tmpl = "{date}-{slug}-{seed}.{ext}" } + date := now.Format("2006-01-02") + slug := Slug(in.Prompt) name := renderTemplate(tmpl, map[string]string{ - "date": now.Format("2006-01-02"), + "date": date, "time": now.Format("150405"), - "slug": Slug(in.Prompt), + "slug": slug, "seed": fmt.Sprintf("%d", in.Seed), "backend": in.Backend, "ext": strings.TrimPrefix(ext, "."), @@ -80,7 +89,7 @@ func (w *Writer) Write(img io.Reader, in Inputs) (*Outputs, error) { return nil, fmt.Errorf("close %s: %w", imagePath, err) } - out := &Outputs{ImagePath: imagePath} + out := &Outputs{ImagePath: imagePath, Date: date, Slug: slug, Seed: in.Seed} if w.WriteSidecar { sidecar := imagePath + ".json" @@ -122,7 +131,7 @@ func (w *Writer) WriteToPath(img io.Reader, path string, in Inputs) (*Outputs, e if err := f.Close(); err != nil { return nil, fmt.Errorf("close %s: %w", path, err) } - out := &Outputs{ImagePath: path} + out := &Outputs{ImagePath: path, Date: now.Format("2006-01-02"), Slug: Slug(in.Prompt), Seed: in.Seed} if w.WriteSidecar { sidecar := path + ".json" body := map[string]any{