mAi: #8 - imagen.jobs queue + worker subcommand (flexsiebels write path)

Async write path for the flexsiebels owner-mode UI: flexsiebels INSERTs into
imagen.jobs, the worker on mRiver claims pending rows via LISTEN/NOTIFY +
5s safety poll, runs the same generate pipeline imagen generate uses, and
writes the result through internal/cloud into imagen.images.

- Schema migration imagen_jobs_init: table + status CHECK + two indexes +
  owner-scoped RLS + grants + AFTER INSERT trigger publishing on the
  imagen_jobs channel via pg_notify.
- internal/worker: DB-agnostic loop over a Queue interface. Drains the
  whole pending backlog on each wake. Job-scoped contexts are derived
  from Background so SIGTERM lets the in-flight generation finish (no
  half-state). ResetStaleRunning at startup unsticks rows left over from
  a previous crash. Eight unit tests cover the done / failed / missing-id /
  drain / NOTIFY-wake / shutdown / transient-error paths against a fake
  queue (no real Postgres in CI).
- cmd/imagen/worker.go: pgx-backed Queue (one dedicated conn for LISTEN +
  UPDATE), plus the workerPipeline that reuses buildBackend +
  attachUsageSink + prompt.Apply + buildWriter + maybeCloudSync. The
  per-job owner_user_id overrides the env-level fallback so each row in
  imagen.images is attributed correctly.
- maybeCloudSync now returns (*cloud.SyncResult, error) so the worker can
  link imagen.jobs.image_id to the inserted imagen.images row. The CLI
  generate path keeps printing its stderr summary unchanged.
- scripts/imagen-worker.service + .env.example for the systemd --user unit
  on mRiver. EnvironmentFile lives in ~/.dotfiles and is never committed.
- docs/setup-worker-mriver.md walks through installation + the spec's
  SQL-INSERT smoke; docs/architecture.md grows an "async write path"
  section.
- worker_integration_test.go (env-guarded by IMAGEN_WORKER_INTEGRATION=1)
  drives one real job through the full pipeline against msupabase using
  the mock backend, then verifies imagen.images + Storage object landed
  and the row flipped to done with image_id linked. Verified end-to-end:
  pickup latency ~7ms, total 74ms, failure path captures error text.
This commit is contained in:
mAi
2026-05-11 10:23:33 +02:00
parent cb6656c436
commit 2758c5a500
13 changed files with 1205 additions and 27 deletions

View File

@@ -132,9 +132,11 @@ func runGenerate(ctx context.Context, args []string) error {
fmt.Fprintln(os.Stderr, "sidecar:", paths.SidecarPath)
}
if err := maybeCloudSync(ctx, cfg, noCloud, paths, in, res, w, h); err != nil {
if result, err := maybeCloudSync(ctx, cfg, noCloud, "", paths, in, res, w, h); err != nil {
// cloud-sync failures are warnings — the image already wrote.
fmt.Fprintln(os.Stderr, "imagen: cloud sync:", err)
} else if result != nil && result.ImageID != "" {
fmt.Fprintf(os.Stderr, "cloud: imagen.images.id=%s storage_path=%s\n", result.ImageID, result.StoragePath)
}
if err := maybePreview(cfg, previewOn, previewOff, paths.ImagePath, rawPrompt); err != nil {
@@ -167,39 +169,45 @@ func resolveCloudSyncMode(cfg *config.Config, noCloudFlag bool, env string) (str
}
// maybeCloudSync resolves the effective mode and, if it says yes, uploads
// the PNG and inserts the row. Always non-fatal — the image already wrote.
func maybeCloudSync(ctx context.Context, cfg *config.Config, noCloud bool, paths *output.Outputs, in output.Inputs, res *backend.Result, width, height int) error {
// the PNG and inserts the row. Returns the SyncResult on success so callers
// that need the imagen.images.id (e.g. the worker linking a job row) can pick
// it up. ownerOverride, when non-empty, wins over config + env — the worker
// passes the job row's owner_user_id so each job is attributed correctly.
func maybeCloudSync(ctx context.Context, cfg *config.Config, noCloud bool, ownerOverride string, paths *output.Outputs, in output.Inputs, res *backend.Result, width, height int) (*cloud.SyncResult, error) {
mode, err := resolveCloudSyncMode(cfg, noCloud, os.Getenv("IMAGEN_CLOUD_SYNC"))
if err != nil {
return err
return nil, err
}
if mode == "off" {
return nil
return nil, nil
}
sink, ok := cloud.NewFromEnv()
if !ok {
if mode == "on" {
return fmt.Errorf("cloud_sync=on but SUPABASE_URL / SUPABASE_SERVICE_KEY not set in env")
return nil, fmt.Errorf("cloud_sync=on but SUPABASE_URL / SUPABASE_SERVICE_KEY not set in env")
}
// auto + missing env = silent skip.
return nil
return nil, nil
}
// Config-supplied owner_user_id takes precedence over $IMAGEN_OWNER_USER_ID.
if cfg != nil && cfg.OwnerUserID != "" {
switch {
case ownerOverride != "":
sink.OwnerUserID = ownerOverride
case cfg != nil && cfg.OwnerUserID != "":
// Config-supplied owner_user_id takes precedence over $IMAGEN_OWNER_USER_ID.
sink.OwnerUserID = cfg.OwnerUserID
}
if sink.OwnerUserID == "" {
if mode == "on" {
return fmt.Errorf("cloud_sync=on but owner_user_id not set in config and $IMAGEN_OWNER_USER_ID is empty")
return nil, fmt.Errorf("cloud_sync=on but owner_user_id not set in config and $IMAGEN_OWNER_USER_ID is empty")
}
// auto + missing UUID = silent skip.
return nil
return nil, nil
}
pngBytes, readErr := os.ReadFile(paths.ImagePath)
if readErr != nil {
return fmt.Errorf("read local image: %w", readErr)
return nil, fmt.Errorf("read local image: %w", readErr)
}
// Reuse the writer's date/slug/seed so storage_path mirrors the local
@@ -257,14 +265,7 @@ func maybeCloudSync(ctx context.Context, cfg *config.Config, noCloud bool, paths
}
syncCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()
result, err := sink.Sync(syncCtx, syncReq)
if err != nil {
return err
}
if result != nil && result.ImageID != "" {
fmt.Fprintf(os.Stderr, "cloud: imagen.images.id=%s storage_path=%s\n", result.ImageID, result.StoragePath)
}
return nil
return sink.Sync(syncCtx, syncReq)
}
func metaString(m map[string]any, key string) string {

View File

@@ -18,6 +18,7 @@ const helpText = `imagen — model-agnostic image generation
Usage:
imagen generate <prompt> [flags] generate one image
imagen worker [flags] consume the imagen.jobs queue (daemon)
imagen backends list registered backend types
imagen config init print a sample imagen.yaml on stdout
imagen config validate validate the active config
@@ -45,6 +46,8 @@ func main() {
switch os.Args[1] {
case "generate":
err = runGenerate(ctx, args)
case "worker":
err = runWorker(ctx, args)
case "backends":
err = runBackends(args)
case "config":

287
cmd/imagen/worker.go Normal file
View File

@@ -0,0 +1,287 @@
package main
import (
"context"
"errors"
"flag"
"fmt"
"os"
"strings"
"time"
"github.com/jackc/pgx/v5"
"mgit.msbls.de/m/ImaGen/internal/backend"
"mgit.msbls.de/m/ImaGen/internal/config"
"mgit.msbls.de/m/ImaGen/internal/output"
"mgit.msbls.de/m/ImaGen/internal/prompt"
"mgit.msbls.de/m/ImaGen/internal/worker"
)
// runWorker is the `imagen worker` subcommand: a long-running daemon that
// consumes the imagen.jobs queue and writes results into imagen.images via
// the same cloud-sync path generate uses.
func runWorker(ctx context.Context, args []string) error {
fs := flag.NewFlagSet("worker", flag.ContinueOnError)
var (
configPath string
pollInterval time.Duration
jobTimeout time.Duration
)
fs.StringVar(&configPath, "config", "", "config file path (default: ~/.config/imagen.yaml)")
fs.DurationVar(&pollInterval, "poll-interval", 5*time.Second, "safety-poll cadence between LISTEN wakeups")
fs.DurationVar(&jobTimeout, "job-timeout", 5*time.Minute, "max wall-time per job before the worker marks it failed")
fs.Usage = func() {
fmt.Fprintln(fs.Output(), `Usage: imagen worker [flags]
Long-running daemon. LISTENs on the Postgres 'imagen_jobs' channel and polls
imagen.jobs every --poll-interval as a safety net, claims pending rows, runs
the generation pipeline, then updates the row with status + image_id.
Env:
IMAGEN_WORKER_DATABASE_URL Postgres DSN for direct LISTEN + UPDATE.
Required (PostgREST cannot LISTEN).
SUPABASE_URL, SUPABASE_SERVICE_KEY, IMAGEN_OWNER_USER_ID
Reused from generate's cloud-sync path; the
worker writes imagen.images rows through the
same code path. Per-job owner_user_id from the
job row overrides IMAGEN_OWNER_USER_ID.`)
fs.PrintDefaults()
}
if err := fs.Parse(args); err != nil {
return err
}
cfg, cfgErr := config.Load(configPath)
if cfgErr != nil && !os.IsNotExist(cfgErr) {
return cfgErr
}
dsn := os.Getenv("IMAGEN_WORKER_DATABASE_URL")
if dsn == "" {
return userErr("IMAGEN_WORKER_DATABASE_URL not set; the worker needs a direct Postgres DSN for LISTEN/NOTIFY")
}
q, err := dialQueue(ctx, dsn)
if err != nil {
return fmt.Errorf("queue: %w", err)
}
defer q.Close()
p := &workerPipeline{cfg: cfg}
w := worker.New(q, p, worker.Config{
PollInterval: pollInterval,
JobTimeout: jobTimeout,
Logger: func(format string, a ...any) { fmt.Fprintf(os.Stderr, format+"\n", a...) },
})
fmt.Fprintln(os.Stderr, "imagen worker: ready (poll-interval", pollInterval, "job-timeout", jobTimeout, ")")
return w.Run(ctx)
}
// pgxQueue is the production Queue. It opens one dedicated connection used
// for both LISTEN (long-lived) and UPDATE operations. A second connection
// would split state needlessly — a single worker process processes one job
// at a time so the connection is never contended.
type pgxQueue struct {
conn *pgx.Conn
}
func dialQueue(ctx context.Context, dsn string) (*pgxQueue, error) {
conn, err := pgx.Connect(ctx, dsn)
if err != nil {
return nil, fmt.Errorf("pgx.Connect: %w", err)
}
if _, err := conn.Exec(ctx, "LISTEN imagen_jobs"); err != nil {
conn.Close(ctx)
return nil, fmt.Errorf("LISTEN imagen_jobs: %w", err)
}
return &pgxQueue{conn: conn}, nil
}
func (q *pgxQueue) Close() {
if q == nil || q.conn == nil {
return
}
// Best-effort: a 5s budget is enough to send a polite TerminateMessage.
shutdown, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = q.conn.Close(shutdown)
}
// ClaimNextPending atomically marks the oldest pending row 'running' and
// returns it. FOR UPDATE SKIP LOCKED is belt + braces against a second worker
// process — out of scope for v1 but cheap insurance.
func (q *pgxQueue) ClaimNextPending(ctx context.Context) (*worker.Job, error) {
const stmt = `
UPDATE imagen.jobs
SET status='running', started_at=now()
WHERE id = (
SELECT id FROM imagen.jobs
WHERE status='pending'
ORDER BY created_at
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING id, owner_user_id, prompt, backend,
COALESCE(model,''),
COALESCE(width, 0), COALESCE(height, 0),
COALESCE(steps, 0), COALESCE(seed, 0),
COALESCE(style,'')`
var j worker.Job
err := q.conn.QueryRow(ctx, stmt).Scan(
&j.ID, &j.OwnerUserID, &j.Prompt, &j.Backend,
&j.Model, &j.Width, &j.Height, &j.Steps, &j.Seed, &j.Style,
)
if errors.Is(err, pgx.ErrNoRows) {
return nil, nil
}
if err != nil {
return nil, err
}
return &j, nil
}
func (q *pgxQueue) MarkDone(ctx context.Context, jobID, imageID string) error {
_, err := q.conn.Exec(ctx,
`UPDATE imagen.jobs SET status='done', image_id=$2, completed_at=now() WHERE id=$1`,
jobID, imageID)
return err
}
func (q *pgxQueue) MarkFailed(ctx context.Context, jobID, msg string) error {
// Trim outrageously long error text so a 10MB stack-trace doesn't end up
// in the row (callers see a summary, full text goes to stderr / logs).
const maxLen = 2000
if len(msg) > maxLen {
msg = msg[:maxLen] + "... [truncated]"
}
_, err := q.conn.Exec(ctx,
`UPDATE imagen.jobs SET status='failed', error=$2, completed_at=now() WHERE id=$1`,
jobID, msg)
return err
}
// WaitForJob blocks until a NOTIFY arrives on imagen_jobs, the timeout fires,
// or ctx is cancelled. Notifications during a previous processJob are queued
// by pgx and delivered on the next call — we don't lose wake-ups even when
// processing took longer than poll-interval.
func (q *pgxQueue) WaitForJob(ctx context.Context, timeout time.Duration) error {
waitCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
_, err := q.conn.WaitForNotification(waitCtx)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return nil // poll cadence fired
}
if errors.Is(err, context.Canceled) {
return context.Canceled
}
return err
}
return nil
}
// ResetStaleRunning bumps any rows stuck in 'running' back to 'pending' so
// they get re-claimed. Called once at startup. A row stuck in 'running' came
// from a previous worker crash; without this, flexsiebels would poll
// forever on a job nobody is processing.
func (q *pgxQueue) ResetStaleRunning(ctx context.Context) error {
_, err := q.conn.Exec(ctx,
`UPDATE imagen.jobs SET status='pending', started_at=NULL WHERE status='running'`)
return err
}
// workerPipeline is the Pipeline implementation that drives a single job
// through buildBackend → prompt enrichment → generate → write disk →
// cloud-sync, then returns the imagen.images.id back to the worker so it
// can link the row.
type workerPipeline struct {
cfg *config.Config
}
func (p *workerPipeline) Run(ctx context.Context, job worker.Job) worker.Outcome {
if job.OwnerUserID == "" {
return worker.Outcome{Err: fmt.Errorf("job %s: missing owner_user_id", job.ID)}
}
if job.Prompt == "" {
return worker.Outcome{Err: fmt.Errorf("job %s: empty prompt", job.ID)}
}
if job.Backend == "" {
return worker.Outcome{Err: fmt.Errorf("job %s: missing backend", job.ID)}
}
be, err := buildBackend(p.cfg, job.Backend)
if err != nil {
return worker.Outcome{Err: fmt.Errorf("backend %q: %w", job.Backend, err)}
}
attachUsageSink(be)
finalPrompt, err := prompt.Apply(job.Prompt, job.Style)
if err != nil {
return worker.Outcome{Err: fmt.Errorf("style: %w", err)}
}
req := backend.Request{
Prompt: finalPrompt,
Width: job.Width,
Height: job.Height,
Steps: job.Steps,
Seed: job.Seed,
Style: job.Style,
}
res, err := be.Generate(ctx, req)
if err != nil {
return worker.Outcome{Err: fmt.Errorf("generate: %w", err)}
}
defer res.ImageReader.Close()
writer := buildWriter(p.cfg, false)
in := output.Inputs{
Prompt: job.Prompt,
Backend: be.Name(),
Seed: seedFromMetadata(res.Metadata, job.Seed),
Ext: extFromMime(res.MimeType),
Metadata: res.Metadata,
}
paths, err := writer.Write(res.ImageReader, in)
if err != nil {
return worker.Outcome{Err: fmt.Errorf("write disk: %w", err)}
}
// Worker is queue-driven: cloud-sync is mandatory because flexsiebels
// needs imagen.images.id to render the result. Pass cloud_sync=on via
// the override path (third arg = ownerUserID); we set the mode by
// disallowing the 'off' branch through the cfg later if the user
// explicitly turned it off in config.
if cloudModeOff(p.cfg) {
// We refuse to silently drop a queued job. If cloud sync is off in
// config, the worker can't serve flexsiebels at all.
return worker.Outcome{Err: fmt.Errorf("output.cloud_sync=off in config; the worker requires cloud_sync=on or auto")}
}
syncRes, syncErr := maybeCloudSync(ctx, p.cfg, false, job.OwnerUserID, paths, in, res, dimOrFallback(job.Width, res, "width"), dimOrFallback(job.Height, res, "height"))
if syncErr != nil {
return worker.Outcome{Err: fmt.Errorf("cloud sync: %w", syncErr)}
}
if syncRes == nil || syncRes.ImageID == "" {
return worker.Outcome{Err: fmt.Errorf("cloud sync returned no imagen.images id (check SUPABASE_URL + SUPABASE_SERVICE_KEY)")}
}
return worker.Outcome{ImageID: syncRes.ImageID}
}
func cloudModeOff(cfg *config.Config) bool {
if cfg == nil {
return false
}
return strings.EqualFold(cfg.Output.CloudSync, "off")
}
// dimOrFallback returns job.<dim> when the job specified one, otherwise the
// dimension reported by the backend's metadata. Some backends (Replicate
// when given an aspect ratio) round the requested size to their nearest
// supported value; this keeps the row honest about what was actually generated.
func dimOrFallback(jobDim int, res *backend.Result, key string) int {
if jobDim > 0 {
return jobDim
}
return metaInt(res.Metadata, key)
}

View File

@@ -0,0 +1,129 @@
package main
import (
"context"
"fmt"
"os"
"testing"
"time"
"github.com/jackc/pgx/v5"
"mgit.msbls.de/m/ImaGen/internal/config"
"mgit.msbls.de/m/ImaGen/internal/worker"
)
// TestWorker_Integration_EndToEnd runs the full pipeline against a real
// msupabase instance: insert a row into imagen.jobs, let the worker claim
// it, generate via the mock backend (no Replicate spend, no ComfyUI
// dependency), write to Supabase Storage + imagen.images, then flip the job
// to 'done' with the linked image_id.
//
// Guarded by IMAGEN_WORKER_INTEGRATION=1. Required env beyond that:
//
// IMAGEN_WORKER_DATABASE_URL postgres DSN (direct, not PostgREST)
// SUPABASE_URL e.g. https://supa.flexsiebels.de
// SUPABASE_SERVICE_KEY service-role JWT
// IMAGEN_OWNER_USER_ID UUID of an auth.users row (RLS fallback)
//
// The test creates and later deletes its own job row so repeated runs don't
// leave debris.
func TestWorker_Integration_EndToEnd(t *testing.T) {
if os.Getenv("IMAGEN_WORKER_INTEGRATION") != "1" {
t.Skip("set IMAGEN_WORKER_INTEGRATION=1 to run the integration test")
}
dsn := os.Getenv("IMAGEN_WORKER_DATABASE_URL")
if dsn == "" {
t.Fatal("IMAGEN_WORKER_DATABASE_URL must be set for the integration test")
}
if os.Getenv("SUPABASE_URL") == "" || os.Getenv("SUPABASE_SERVICE_KEY") == "" {
t.Fatal("SUPABASE_URL and SUPABASE_SERVICE_KEY must be set for the integration test")
}
owner := os.Getenv("IMAGEN_OWNER_USER_ID")
if owner == "" {
t.Fatal("IMAGEN_OWNER_USER_ID must be set for the integration test")
}
ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
defer cancel()
q, err := dialQueue(ctx, dsn)
if err != nil {
t.Fatalf("dialQueue: %v", err)
}
defer q.Close()
// Insert the test job on a separate connection (the worker's conn is
// busy LISTENing). Mock backend = no external dependency.
insertConn, err := pgx.Connect(ctx, dsn)
if err != nil {
t.Fatalf("insert conn: %v", err)
}
defer insertConn.Close(ctx)
var jobID string
prompt := fmt.Sprintf("imagen integration test %d", time.Now().UnixNano())
err = insertConn.QueryRow(ctx, `
INSERT INTO imagen.jobs (owner_user_id, prompt, backend, width, height)
VALUES ($1, $2, 'mock', 64, 64)
RETURNING id`,
owner, prompt).Scan(&jobID)
if err != nil {
t.Fatalf("insert job: %v", err)
}
t.Logf("inserted imagen.jobs id=%s", jobID)
// Tidy up at the end of the test so a re-run starts clean.
defer func() {
cleanup, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, _ = insertConn.Exec(cleanup, `DELETE FROM imagen.jobs WHERE id=$1`, jobID)
}()
// Use a per-test temp dir so the generated PNG doesn't litter the repo.
tmpDir := t.TempDir()
cfg := &config.Config{Output: config.OutputConfig{Directory: tmpDir}}
p := &workerPipeline{cfg: cfg}
w := worker.New(q, p, worker.Config{
PollInterval: 1 * time.Second,
JobTimeout: 30 * time.Second,
Logger: func(format string, a ...any) { t.Logf("worker: "+format, a...) },
})
// Run the worker until it processes one job (the one we just inserted)
// or the test context times out.
runCtx, runCancel := context.WithCancel(ctx)
done := make(chan struct{})
go func() {
_ = w.Run(runCtx)
close(done)
}()
// Poll for completion.
deadline := time.Now().Add(60 * time.Second)
var status, imageID string
for time.Now().Before(deadline) {
err = insertConn.QueryRow(ctx,
`SELECT status, COALESCE(image_id::text,'') FROM imagen.jobs WHERE id=$1`,
jobID).Scan(&status, &imageID)
if err != nil {
t.Fatalf("poll: %v", err)
}
if status == "done" || status == "failed" {
break
}
time.Sleep(500 * time.Millisecond)
}
runCancel()
<-done
if status != "done" {
var errText string
_ = insertConn.QueryRow(ctx,
`SELECT COALESCE(error,'') FROM imagen.jobs WHERE id=$1`, jobID).Scan(&errText)
t.Fatalf("job not done within timeout: status=%q error=%q", status, errText)
}
if imageID == "" {
t.Fatalf("job done but image_id is empty")
}
t.Logf("job done: image_id=%s", imageID)
}