From 64120c27d7caf497039d8a056931873f4c0d86c1 Mon Sep 17 00:00:00 2001 From: mAi Date: Mon, 11 May 2026 10:48:12 +0200 Subject: [PATCH] mAi: #9 - imagen.series (batch tries 1-10 + selection) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Schema (applied via migration imagen_series_init): - imagen.series parent table (prompt + params + count CHECK 1..10 + selected_image_id) - imagen.jobs += series_id (FK) + series_idx - imagen.images += series_id (FK) - Owner-scoped RLS on series (SELECT/INSERT/UPDATE) + grants - Partial indexes WHERE series_id IS NOT NULL on both child tables Worker pipeline: - worker.Job += SeriesID, populated from imagen.jobs.series_id via the claim query. - cloud.SyncRequest += SeriesID; insertRow writes series_id when non-empty, omits the key when empty so solo runs leave the column NULL. - maybeCloudSync threads seriesID from job.SeriesID through to the cloud sink. generate.go (CLI) always passes "" — solo path unchanged. Tests: - worker: SeriesID propagates from Job to fakePipeline.lastJob unchanged, solo job keeps it empty. - cloud: SyncRequest.SeriesID lands as row.series_id in the POST body; empty SeriesID omits the key entirely. Refs ImaGen#9. --- cmd/imagen/generate.go | 8 ++++-- cmd/imagen/worker.go | 9 +++++-- internal/cloud/cloud.go | 9 +++++++ internal/cloud/cloud_test.go | 48 ++++++++++++++++++++++++++++++++++ internal/worker/worker.go | 4 +++ internal/worker/worker_test.go | 44 +++++++++++++++++++++++++++++++ 6 files changed, 118 insertions(+), 4 deletions(-) diff --git a/cmd/imagen/generate.go b/cmd/imagen/generate.go index cbd5ad6..dd6dba2 100644 --- a/cmd/imagen/generate.go +++ b/cmd/imagen/generate.go @@ -132,7 +132,7 @@ func runGenerate(ctx context.Context, args []string) error { fmt.Fprintln(os.Stderr, "sidecar:", paths.SidecarPath) } - if result, err := maybeCloudSync(ctx, cfg, noCloud, "", paths, in, res, w, h); err != nil { + if result, err := maybeCloudSync(ctx, cfg, noCloud, "", "", paths, in, res, w, h); err != nil { // cloud-sync failures are warnings — the image already wrote. fmt.Fprintln(os.Stderr, "imagen: cloud sync:", err) } else if result != nil && result.ImageID != "" { @@ -173,7 +173,10 @@ func resolveCloudSyncMode(cfg *config.Config, noCloudFlag bool, env string) (str // that need the imagen.images.id (e.g. the worker linking a job row) can pick // it up. ownerOverride, when non-empty, wins over config + env — the worker // passes the job row's owner_user_id so each job is attributed correctly. -func maybeCloudSync(ctx context.Context, cfg *config.Config, noCloud bool, ownerOverride string, paths *output.Outputs, in output.Inputs, res *backend.Result, width, height int) (*cloud.SyncResult, error) { +// seriesID, when non-empty, lands on imagen.images.series_id so the +// list-page query (`WHERE series_id IS NULL`) hides series members from +// the flat grid; empty means solo run. +func maybeCloudSync(ctx context.Context, cfg *config.Config, noCloud bool, ownerOverride, seriesID string, paths *output.Outputs, in output.Inputs, res *backend.Result, width, height int) (*cloud.SyncResult, error) { mode, err := resolveCloudSyncMode(cfg, noCloud, os.Getenv("IMAGEN_CLOUD_SYNC")) if err != nil { return nil, err @@ -262,6 +265,7 @@ func maybeCloudSync(ctx context.Context, cfg *config.Config, noCloud bool, owner LatencyMs: latency, CostUSDEstimate: cost, Sidecar: sidecar, + SeriesID: seriesID, } syncCtx, cancel := context.WithTimeout(ctx, 60*time.Second) defer cancel() diff --git a/cmd/imagen/worker.go b/cmd/imagen/worker.go index 33ee4e2..97546e5 100644 --- a/cmd/imagen/worker.go +++ b/cmd/imagen/worker.go @@ -112,6 +112,9 @@ func (q *pgxQueue) Close() { // returns it. FOR UPDATE SKIP LOCKED is belt + braces against a second worker // process — out of scope for v1 but cheap insurance. func (q *pgxQueue) ClaimNextPending(ctx context.Context) (*worker.Job, error) { + // series_id is nullable on imagen.jobs (solo run when NULL); cast to text + // with COALESCE so pgx scans into a plain Go string. Empty string = + // solo run; the pipeline skips series propagation in that case. const stmt = ` UPDATE imagen.jobs SET status='running', started_at=now() @@ -126,11 +129,13 @@ func (q *pgxQueue) ClaimNextPending(ctx context.Context) (*worker.Job, error) { COALESCE(model,''), COALESCE(width, 0), COALESCE(height, 0), COALESCE(steps, 0), COALESCE(seed, 0), - COALESCE(style,'')` + COALESCE(style,''), + COALESCE(series_id::text, '')` var j worker.Job err := q.conn.QueryRow(ctx, stmt).Scan( &j.ID, &j.OwnerUserID, &j.Prompt, &j.Backend, &j.Model, &j.Width, &j.Height, &j.Steps, &j.Seed, &j.Style, + &j.SeriesID, ) if errors.Is(err, pgx.ErrNoRows) { return nil, nil @@ -258,7 +263,7 @@ func (p *workerPipeline) Run(ctx context.Context, job worker.Job) worker.Outcome // config, the worker can't serve flexsiebels at all. return worker.Outcome{Err: fmt.Errorf("output.cloud_sync=off in config; the worker requires cloud_sync=on or auto")} } - syncRes, syncErr := maybeCloudSync(ctx, p.cfg, false, job.OwnerUserID, paths, in, res, dimOrFallback(job.Width, res, "width"), dimOrFallback(job.Height, res, "height")) + syncRes, syncErr := maybeCloudSync(ctx, p.cfg, false, job.OwnerUserID, job.SeriesID, paths, in, res, dimOrFallback(job.Width, res, "width"), dimOrFallback(job.Height, res, "height")) if syncErr != nil { return worker.Outcome{Err: fmt.Errorf("cloud sync: %w", syncErr)} } diff --git a/internal/cloud/cloud.go b/internal/cloud/cloud.go index 3ed8994..15f5e61 100644 --- a/internal/cloud/cloud.go +++ b/internal/cloud/cloud.go @@ -100,6 +100,12 @@ type SyncRequest struct { LatencyMs int CostUSDEstimate *float64 Sidecar map[string]any + + // SeriesID is the parent imagen.series row when this image is one of + // N tries in a batch. Empty means a solo run — the column stays NULL, + // which keeps the row visible on the main list-page query + // (`WHERE series_id IS NULL`). + SeriesID string } // SyncResult tells the caller what landed where. @@ -195,6 +201,9 @@ func (s *Sink) insertRow(ctx context.Context, storagePath string, req SyncReques if len(req.Sidecar) > 0 { row["sidecar"] = req.Sidecar } + if req.SeriesID != "" { + row["series_id"] = req.SeriesID + } body, err := json.Marshal(row) if err != nil { diff --git a/internal/cloud/cloud_test.go b/internal/cloud/cloud_test.go index 3fb5954..4e2cfcd 100644 --- a/internal/cloud/cloud_test.go +++ b/internal/cloud/cloud_test.go @@ -305,6 +305,54 @@ func TestSyncDBFailureSurfacesPathOnError(t *testing.T) { } } +// TestSyncWritesSeriesID is the second half of the ImaGen#9 propagation +// contract: when SeriesID is non-empty, the POST body to imagen.images +// carries `series_id`. When empty, the key is omitted entirely so the +// row's series_id stays NULL (solo-run path, list-page query +// `WHERE series_id IS NULL` keeps showing it). +func TestSyncWritesSeriesID(t *testing.T) { + const seriesID = "22222222-2222-2222-2222-222222222222" + f := newFakeSupabase(t) + s := newSink(f.server) + + _, err := s.Sync(context.Background(), SyncRequest{ + Date: "2026-05-11", Slug: "x", Seed: 1, Ext: "png", + PNG: []byte("p"), Prompt: "p", Backend: "b", + SeriesID: seriesID, + }) + if err != nil { + t.Fatalf("Sync: %v", err) + } + var row map[string]any + if err := json.Unmarshal(f.insertBody, &row); err != nil { + t.Fatalf("parse insert body: %v\n%s", err, f.insertBody) + } + if row["series_id"] != seriesID { + t.Fatalf("row.series_id = %v want %q", row["series_id"], seriesID) + } +} + +func TestSyncOmitsSeriesIDWhenEmpty(t *testing.T) { + f := newFakeSupabase(t) + s := newSink(f.server) + + _, err := s.Sync(context.Background(), SyncRequest{ + Date: "2026-05-11", Slug: "x", Seed: 1, Ext: "png", + PNG: []byte("p"), Prompt: "p", Backend: "b", + // SeriesID intentionally empty. + }) + if err != nil { + t.Fatalf("Sync: %v", err) + } + var row map[string]any + if err := json.Unmarshal(f.insertBody, &row); err != nil { + t.Fatalf("parse insert body: %v\n%s", err, f.insertBody) + } + if _, present := row["series_id"]; present { + t.Fatalf("solo run should omit series_id from POST body, got %v", row["series_id"]) + } +} + func TestPathEscape(t *testing.T) { cases := map[string]string{ "2026-05-11/lighthouse-42.png": "2026-05-11/lighthouse-42.png", diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 4474994..135b149 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -30,6 +30,10 @@ type Job struct { Steps int Seed int64 Style string + // SeriesID is the parent imagen.series row when this job is one of N + // tries in a batch. Empty means a solo run — the pipeline must not + // propagate a series_id onto the resulting imagen.images row. + SeriesID string } // Outcome is what the pipeline reports back per job. ImageID is the diff --git a/internal/worker/worker_test.go b/internal/worker/worker_test.go index 0bcec30..7e209a4 100644 --- a/internal/worker/worker_test.go +++ b/internal/worker/worker_test.go @@ -303,6 +303,50 @@ func TestWorker_InflightJobFinishesAfterShutdown(t *testing.T) { } } +// TestWorker_PropagatesSeriesIDToPipeline verifies the worker hands the +// Job's SeriesID through to the pipeline unchanged. The pipeline owns the +// cloud-sync side of the propagation (cloud.SyncRequest.SeriesID lands on +// imagen.images.series_id) — see cloud_test.go for that half — so the +// worker contract is simply: don't drop or rewrite SeriesID between +// claim and Run. +func TestWorker_PropagatesSeriesIDToPipeline(t *testing.T) { + const seriesID = "11111111-1111-1111-1111-111111111111" + q := newFakeQueue(Job{ + ID: "j-series", + Prompt: "p", + Backend: "mock", + SeriesID: seriesID, + }) + p := &fakePipeline{results: map[string]Outcome{"j-series": {ImageID: "img-series"}}} + w := New(q, p, Config{PollInterval: 10 * time.Millisecond, JobTimeout: time.Second}) + ctx, cancel := context.WithCancel(context.Background()) + go func() { time.Sleep(80 * time.Millisecond); cancel() }() + if err := w.Run(ctx); err != nil { + t.Fatalf("Run: %v", err) + } + if got := p.lastJob.SeriesID; got != seriesID { + t.Fatalf("pipeline saw SeriesID=%q want %q", got, seriesID) + } + if got := q.state["j-series"]; got != "done" { + t.Fatalf("state=%q want done", got) + } +} + +// TestWorker_SoloJobLeavesSeriesIDEmpty is the negative case — a job +// claimed with no series row keeps the field empty all the way to the +// pipeline so cloud-sync writes NULL into imagen.images.series_id. +func TestWorker_SoloJobLeavesSeriesIDEmpty(t *testing.T) { + q := newFakeQueue(Job{ID: "j-solo", Prompt: "p", Backend: "mock"}) + p := &fakePipeline{results: map[string]Outcome{"j-solo": {ImageID: "img-solo"}}} + w := New(q, p, Config{PollInterval: 10 * time.Millisecond, JobTimeout: time.Second}) + ctx, cancel := context.WithCancel(context.Background()) + go func() { time.Sleep(80 * time.Millisecond); cancel() }() + _ = w.Run(ctx) + if got := p.lastJob.SeriesID; got != "" { + t.Fatalf("solo job pipeline.lastJob.SeriesID=%q want empty", got) + } +} + func TestWorker_TransientClaimErrorDoesNotKillLoop(t *testing.T) { // First claim returns an error; the loop should log and try again on the // next wake — it must not propagate the error and exit.