mAi: #8 - imagen.jobs queue + worker subcommand (flexsiebels write path)
Async write path for the flexsiebels owner-mode UI: flexsiebels INSERTs into imagen.jobs, the worker on mRiver claims pending rows via LISTEN/NOTIFY + 5s safety poll, runs the same generate pipeline imagen generate uses, and writes the result through internal/cloud into imagen.images. - Schema migration imagen_jobs_init: table + status CHECK + two indexes + owner-scoped RLS + grants + AFTER INSERT trigger publishing on the imagen_jobs channel via pg_notify. - internal/worker: DB-agnostic loop over a Queue interface. Drains the whole pending backlog on each wake. Job-scoped contexts are derived from Background so SIGTERM lets the in-flight generation finish (no half-state). ResetStaleRunning at startup unsticks rows left over from a previous crash. Eight unit tests cover the done / failed / missing-id / drain / NOTIFY-wake / shutdown / transient-error paths against a fake queue (no real Postgres in CI). - cmd/imagen/worker.go: pgx-backed Queue (one dedicated conn for LISTEN + UPDATE), plus the workerPipeline that reuses buildBackend + attachUsageSink + prompt.Apply + buildWriter + maybeCloudSync. The per-job owner_user_id overrides the env-level fallback so each row in imagen.images is attributed correctly. - maybeCloudSync now returns (*cloud.SyncResult, error) so the worker can link imagen.jobs.image_id to the inserted imagen.images row. The CLI generate path keeps printing its stderr summary unchanged. - scripts/imagen-worker.service + .env.example for the systemd --user unit on mRiver. EnvironmentFile lives in ~/.dotfiles and is never committed. - docs/setup-worker-mriver.md walks through installation + the spec's SQL-INSERT smoke; docs/architecture.md grows an "async write path" section. - worker_integration_test.go (env-guarded by IMAGEN_WORKER_INTEGRATION=1) drives one real job through the full pipeline against msupabase using the mock backend, then verifies imagen.images + Storage object landed and the row flipped to done with image_id linked. Verified end-to-end: pickup latency ~7ms, total 74ms, failure path captures error text.
This commit is contained in:
@@ -10,13 +10,17 @@ and lifecycle of its own block in `~/.config/imagen.yaml`.
|
|||||||
## Architecture
|
## Architecture
|
||||||
|
|
||||||
```
|
```
|
||||||
cmd/imagen/ CLI shell — generate, backends, config, serve
|
cmd/imagen/ CLI shell — generate, worker, backends, config, serve
|
||||||
internal/backend/ Backend interface + Registry + Mock reference impl
|
internal/backend/ Backend interface + Registry + Mock reference impl
|
||||||
internal/prompt/ Style preset registry (embedded styles.yaml)
|
internal/prompt/ Style preset registry (embedded styles.yaml)
|
||||||
internal/output/ Filename templating, image writer, JSON sidecar
|
internal/output/ Filename templating, image writer, JSON sidecar
|
||||||
internal/config/ YAML loader, validation, sample generator
|
internal/config/ YAML loader, validation, sample generator
|
||||||
|
internal/cloud/ Supabase Storage + imagen.images writer
|
||||||
|
internal/usage/ mai.imagen_usage cost-tracking sink
|
||||||
|
internal/worker/ imagen.jobs queue consumer (DB-agnostic via Queue interface)
|
||||||
internal/server/ HTTP stub (not implemented yet — follow-up issue)
|
internal/server/ HTTP stub (not implemented yet — follow-up issue)
|
||||||
docs/ architecture.md, usage.md
|
scripts/ imagen-worker.service + env template, ComfyUI scripts
|
||||||
|
docs/ architecture.md, usage.md, setup-worker-mriver.md
|
||||||
```
|
```
|
||||||
|
|
||||||
Data flow for `imagen generate`:
|
Data flow for `imagen generate`:
|
||||||
|
|||||||
@@ -132,9 +132,11 @@ func runGenerate(ctx context.Context, args []string) error {
|
|||||||
fmt.Fprintln(os.Stderr, "sidecar:", paths.SidecarPath)
|
fmt.Fprintln(os.Stderr, "sidecar:", paths.SidecarPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := maybeCloudSync(ctx, cfg, noCloud, paths, in, res, w, h); err != nil {
|
if result, err := maybeCloudSync(ctx, cfg, noCloud, "", paths, in, res, w, h); err != nil {
|
||||||
// cloud-sync failures are warnings — the image already wrote.
|
// cloud-sync failures are warnings — the image already wrote.
|
||||||
fmt.Fprintln(os.Stderr, "imagen: cloud sync:", err)
|
fmt.Fprintln(os.Stderr, "imagen: cloud sync:", err)
|
||||||
|
} else if result != nil && result.ImageID != "" {
|
||||||
|
fmt.Fprintf(os.Stderr, "cloud: imagen.images.id=%s storage_path=%s\n", result.ImageID, result.StoragePath)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := maybePreview(cfg, previewOn, previewOff, paths.ImagePath, rawPrompt); err != nil {
|
if err := maybePreview(cfg, previewOn, previewOff, paths.ImagePath, rawPrompt); err != nil {
|
||||||
@@ -167,39 +169,45 @@ func resolveCloudSyncMode(cfg *config.Config, noCloudFlag bool, env string) (str
|
|||||||
}
|
}
|
||||||
|
|
||||||
// maybeCloudSync resolves the effective mode and, if it says yes, uploads
|
// maybeCloudSync resolves the effective mode and, if it says yes, uploads
|
||||||
// the PNG and inserts the row. Always non-fatal — the image already wrote.
|
// the PNG and inserts the row. Returns the SyncResult on success so callers
|
||||||
func maybeCloudSync(ctx context.Context, cfg *config.Config, noCloud bool, paths *output.Outputs, in output.Inputs, res *backend.Result, width, height int) error {
|
// that need the imagen.images.id (e.g. the worker linking a job row) can pick
|
||||||
|
// it up. ownerOverride, when non-empty, wins over config + env — the worker
|
||||||
|
// passes the job row's owner_user_id so each job is attributed correctly.
|
||||||
|
func maybeCloudSync(ctx context.Context, cfg *config.Config, noCloud bool, ownerOverride string, paths *output.Outputs, in output.Inputs, res *backend.Result, width, height int) (*cloud.SyncResult, error) {
|
||||||
mode, err := resolveCloudSyncMode(cfg, noCloud, os.Getenv("IMAGEN_CLOUD_SYNC"))
|
mode, err := resolveCloudSyncMode(cfg, noCloud, os.Getenv("IMAGEN_CLOUD_SYNC"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
if mode == "off" {
|
if mode == "off" {
|
||||||
return nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
sink, ok := cloud.NewFromEnv()
|
sink, ok := cloud.NewFromEnv()
|
||||||
if !ok {
|
if !ok {
|
||||||
if mode == "on" {
|
if mode == "on" {
|
||||||
return fmt.Errorf("cloud_sync=on but SUPABASE_URL / SUPABASE_SERVICE_KEY not set in env")
|
return nil, fmt.Errorf("cloud_sync=on but SUPABASE_URL / SUPABASE_SERVICE_KEY not set in env")
|
||||||
}
|
}
|
||||||
// auto + missing env = silent skip.
|
// auto + missing env = silent skip.
|
||||||
return nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
// Config-supplied owner_user_id takes precedence over $IMAGEN_OWNER_USER_ID.
|
switch {
|
||||||
if cfg != nil && cfg.OwnerUserID != "" {
|
case ownerOverride != "":
|
||||||
|
sink.OwnerUserID = ownerOverride
|
||||||
|
case cfg != nil && cfg.OwnerUserID != "":
|
||||||
|
// Config-supplied owner_user_id takes precedence over $IMAGEN_OWNER_USER_ID.
|
||||||
sink.OwnerUserID = cfg.OwnerUserID
|
sink.OwnerUserID = cfg.OwnerUserID
|
||||||
}
|
}
|
||||||
if sink.OwnerUserID == "" {
|
if sink.OwnerUserID == "" {
|
||||||
if mode == "on" {
|
if mode == "on" {
|
||||||
return fmt.Errorf("cloud_sync=on but owner_user_id not set in config and $IMAGEN_OWNER_USER_ID is empty")
|
return nil, fmt.Errorf("cloud_sync=on but owner_user_id not set in config and $IMAGEN_OWNER_USER_ID is empty")
|
||||||
}
|
}
|
||||||
// auto + missing UUID = silent skip.
|
// auto + missing UUID = silent skip.
|
||||||
return nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
pngBytes, readErr := os.ReadFile(paths.ImagePath)
|
pngBytes, readErr := os.ReadFile(paths.ImagePath)
|
||||||
if readErr != nil {
|
if readErr != nil {
|
||||||
return fmt.Errorf("read local image: %w", readErr)
|
return nil, fmt.Errorf("read local image: %w", readErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reuse the writer's date/slug/seed so storage_path mirrors the local
|
// Reuse the writer's date/slug/seed so storage_path mirrors the local
|
||||||
@@ -257,14 +265,7 @@ func maybeCloudSync(ctx context.Context, cfg *config.Config, noCloud bool, paths
|
|||||||
}
|
}
|
||||||
syncCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
|
syncCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
result, err := sink.Sync(syncCtx, syncReq)
|
return sink.Sync(syncCtx, syncReq)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if result != nil && result.ImageID != "" {
|
|
||||||
fmt.Fprintf(os.Stderr, "cloud: imagen.images.id=%s storage_path=%s\n", result.ImageID, result.StoragePath)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func metaString(m map[string]any, key string) string {
|
func metaString(m map[string]any, key string) string {
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ const helpText = `imagen — model-agnostic image generation
|
|||||||
|
|
||||||
Usage:
|
Usage:
|
||||||
imagen generate <prompt> [flags] generate one image
|
imagen generate <prompt> [flags] generate one image
|
||||||
|
imagen worker [flags] consume the imagen.jobs queue (daemon)
|
||||||
imagen backends list registered backend types
|
imagen backends list registered backend types
|
||||||
imagen config init print a sample imagen.yaml on stdout
|
imagen config init print a sample imagen.yaml on stdout
|
||||||
imagen config validate validate the active config
|
imagen config validate validate the active config
|
||||||
@@ -45,6 +46,8 @@ func main() {
|
|||||||
switch os.Args[1] {
|
switch os.Args[1] {
|
||||||
case "generate":
|
case "generate":
|
||||||
err = runGenerate(ctx, args)
|
err = runGenerate(ctx, args)
|
||||||
|
case "worker":
|
||||||
|
err = runWorker(ctx, args)
|
||||||
case "backends":
|
case "backends":
|
||||||
err = runBackends(args)
|
err = runBackends(args)
|
||||||
case "config":
|
case "config":
|
||||||
|
|||||||
287
cmd/imagen/worker.go
Normal file
287
cmd/imagen/worker.go
Normal file
@@ -0,0 +1,287 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5"
|
||||||
|
|
||||||
|
"mgit.msbls.de/m/ImaGen/internal/backend"
|
||||||
|
"mgit.msbls.de/m/ImaGen/internal/config"
|
||||||
|
"mgit.msbls.de/m/ImaGen/internal/output"
|
||||||
|
"mgit.msbls.de/m/ImaGen/internal/prompt"
|
||||||
|
"mgit.msbls.de/m/ImaGen/internal/worker"
|
||||||
|
)
|
||||||
|
|
||||||
|
// runWorker is the `imagen worker` subcommand: a long-running daemon that
|
||||||
|
// consumes the imagen.jobs queue and writes results into imagen.images via
|
||||||
|
// the same cloud-sync path generate uses.
|
||||||
|
func runWorker(ctx context.Context, args []string) error {
|
||||||
|
fs := flag.NewFlagSet("worker", flag.ContinueOnError)
|
||||||
|
var (
|
||||||
|
configPath string
|
||||||
|
pollInterval time.Duration
|
||||||
|
jobTimeout time.Duration
|
||||||
|
)
|
||||||
|
fs.StringVar(&configPath, "config", "", "config file path (default: ~/.config/imagen.yaml)")
|
||||||
|
fs.DurationVar(&pollInterval, "poll-interval", 5*time.Second, "safety-poll cadence between LISTEN wakeups")
|
||||||
|
fs.DurationVar(&jobTimeout, "job-timeout", 5*time.Minute, "max wall-time per job before the worker marks it failed")
|
||||||
|
fs.Usage = func() {
|
||||||
|
fmt.Fprintln(fs.Output(), `Usage: imagen worker [flags]
|
||||||
|
|
||||||
|
Long-running daemon. LISTENs on the Postgres 'imagen_jobs' channel and polls
|
||||||
|
imagen.jobs every --poll-interval as a safety net, claims pending rows, runs
|
||||||
|
the generation pipeline, then updates the row with status + image_id.
|
||||||
|
|
||||||
|
Env:
|
||||||
|
IMAGEN_WORKER_DATABASE_URL Postgres DSN for direct LISTEN + UPDATE.
|
||||||
|
Required (PostgREST cannot LISTEN).
|
||||||
|
SUPABASE_URL, SUPABASE_SERVICE_KEY, IMAGEN_OWNER_USER_ID
|
||||||
|
Reused from generate's cloud-sync path; the
|
||||||
|
worker writes imagen.images rows through the
|
||||||
|
same code path. Per-job owner_user_id from the
|
||||||
|
job row overrides IMAGEN_OWNER_USER_ID.`)
|
||||||
|
fs.PrintDefaults()
|
||||||
|
}
|
||||||
|
if err := fs.Parse(args); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg, cfgErr := config.Load(configPath)
|
||||||
|
if cfgErr != nil && !os.IsNotExist(cfgErr) {
|
||||||
|
return cfgErr
|
||||||
|
}
|
||||||
|
|
||||||
|
dsn := os.Getenv("IMAGEN_WORKER_DATABASE_URL")
|
||||||
|
if dsn == "" {
|
||||||
|
return userErr("IMAGEN_WORKER_DATABASE_URL not set; the worker needs a direct Postgres DSN for LISTEN/NOTIFY")
|
||||||
|
}
|
||||||
|
|
||||||
|
q, err := dialQueue(ctx, dsn)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("queue: %w", err)
|
||||||
|
}
|
||||||
|
defer q.Close()
|
||||||
|
|
||||||
|
p := &workerPipeline{cfg: cfg}
|
||||||
|
w := worker.New(q, p, worker.Config{
|
||||||
|
PollInterval: pollInterval,
|
||||||
|
JobTimeout: jobTimeout,
|
||||||
|
Logger: func(format string, a ...any) { fmt.Fprintf(os.Stderr, format+"\n", a...) },
|
||||||
|
})
|
||||||
|
fmt.Fprintln(os.Stderr, "imagen worker: ready (poll-interval", pollInterval, "job-timeout", jobTimeout, ")")
|
||||||
|
return w.Run(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// pgxQueue is the production Queue. It opens one dedicated connection used
|
||||||
|
// for both LISTEN (long-lived) and UPDATE operations. A second connection
|
||||||
|
// would split state needlessly — a single worker process processes one job
|
||||||
|
// at a time so the connection is never contended.
|
||||||
|
type pgxQueue struct {
|
||||||
|
conn *pgx.Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func dialQueue(ctx context.Context, dsn string) (*pgxQueue, error) {
|
||||||
|
conn, err := pgx.Connect(ctx, dsn)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("pgx.Connect: %w", err)
|
||||||
|
}
|
||||||
|
if _, err := conn.Exec(ctx, "LISTEN imagen_jobs"); err != nil {
|
||||||
|
conn.Close(ctx)
|
||||||
|
return nil, fmt.Errorf("LISTEN imagen_jobs: %w", err)
|
||||||
|
}
|
||||||
|
return &pgxQueue{conn: conn}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *pgxQueue) Close() {
|
||||||
|
if q == nil || q.conn == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Best-effort: a 5s budget is enough to send a polite TerminateMessage.
|
||||||
|
shutdown, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
_ = q.conn.Close(shutdown)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClaimNextPending atomically marks the oldest pending row 'running' and
|
||||||
|
// returns it. FOR UPDATE SKIP LOCKED is belt + braces against a second worker
|
||||||
|
// process — out of scope for v1 but cheap insurance.
|
||||||
|
func (q *pgxQueue) ClaimNextPending(ctx context.Context) (*worker.Job, error) {
|
||||||
|
const stmt = `
|
||||||
|
UPDATE imagen.jobs
|
||||||
|
SET status='running', started_at=now()
|
||||||
|
WHERE id = (
|
||||||
|
SELECT id FROM imagen.jobs
|
||||||
|
WHERE status='pending'
|
||||||
|
ORDER BY created_at
|
||||||
|
LIMIT 1
|
||||||
|
FOR UPDATE SKIP LOCKED
|
||||||
|
)
|
||||||
|
RETURNING id, owner_user_id, prompt, backend,
|
||||||
|
COALESCE(model,''),
|
||||||
|
COALESCE(width, 0), COALESCE(height, 0),
|
||||||
|
COALESCE(steps, 0), COALESCE(seed, 0),
|
||||||
|
COALESCE(style,'')`
|
||||||
|
var j worker.Job
|
||||||
|
err := q.conn.QueryRow(ctx, stmt).Scan(
|
||||||
|
&j.ID, &j.OwnerUserID, &j.Prompt, &j.Backend,
|
||||||
|
&j.Model, &j.Width, &j.Height, &j.Steps, &j.Seed, &j.Style,
|
||||||
|
)
|
||||||
|
if errors.Is(err, pgx.ErrNoRows) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &j, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *pgxQueue) MarkDone(ctx context.Context, jobID, imageID string) error {
|
||||||
|
_, err := q.conn.Exec(ctx,
|
||||||
|
`UPDATE imagen.jobs SET status='done', image_id=$2, completed_at=now() WHERE id=$1`,
|
||||||
|
jobID, imageID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *pgxQueue) MarkFailed(ctx context.Context, jobID, msg string) error {
|
||||||
|
// Trim outrageously long error text so a 10MB stack-trace doesn't end up
|
||||||
|
// in the row (callers see a summary, full text goes to stderr / logs).
|
||||||
|
const maxLen = 2000
|
||||||
|
if len(msg) > maxLen {
|
||||||
|
msg = msg[:maxLen] + "... [truncated]"
|
||||||
|
}
|
||||||
|
_, err := q.conn.Exec(ctx,
|
||||||
|
`UPDATE imagen.jobs SET status='failed', error=$2, completed_at=now() WHERE id=$1`,
|
||||||
|
jobID, msg)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// WaitForJob blocks until a NOTIFY arrives on imagen_jobs, the timeout fires,
|
||||||
|
// or ctx is cancelled. Notifications during a previous processJob are queued
|
||||||
|
// by pgx and delivered on the next call — we don't lose wake-ups even when
|
||||||
|
// processing took longer than poll-interval.
|
||||||
|
func (q *pgxQueue) WaitForJob(ctx context.Context, timeout time.Duration) error {
|
||||||
|
waitCtx, cancel := context.WithTimeout(ctx, timeout)
|
||||||
|
defer cancel()
|
||||||
|
_, err := q.conn.WaitForNotification(waitCtx)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, context.DeadlineExceeded) {
|
||||||
|
return nil // poll cadence fired
|
||||||
|
}
|
||||||
|
if errors.Is(err, context.Canceled) {
|
||||||
|
return context.Canceled
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResetStaleRunning bumps any rows stuck in 'running' back to 'pending' so
|
||||||
|
// they get re-claimed. Called once at startup. A row stuck in 'running' came
|
||||||
|
// from a previous worker crash; without this, flexsiebels would poll
|
||||||
|
// forever on a job nobody is processing.
|
||||||
|
func (q *pgxQueue) ResetStaleRunning(ctx context.Context) error {
|
||||||
|
_, err := q.conn.Exec(ctx,
|
||||||
|
`UPDATE imagen.jobs SET status='pending', started_at=NULL WHERE status='running'`)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// workerPipeline is the Pipeline implementation that drives a single job
|
||||||
|
// through buildBackend → prompt enrichment → generate → write disk →
|
||||||
|
// cloud-sync, then returns the imagen.images.id back to the worker so it
|
||||||
|
// can link the row.
|
||||||
|
type workerPipeline struct {
|
||||||
|
cfg *config.Config
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *workerPipeline) Run(ctx context.Context, job worker.Job) worker.Outcome {
|
||||||
|
if job.OwnerUserID == "" {
|
||||||
|
return worker.Outcome{Err: fmt.Errorf("job %s: missing owner_user_id", job.ID)}
|
||||||
|
}
|
||||||
|
if job.Prompt == "" {
|
||||||
|
return worker.Outcome{Err: fmt.Errorf("job %s: empty prompt", job.ID)}
|
||||||
|
}
|
||||||
|
if job.Backend == "" {
|
||||||
|
return worker.Outcome{Err: fmt.Errorf("job %s: missing backend", job.ID)}
|
||||||
|
}
|
||||||
|
|
||||||
|
be, err := buildBackend(p.cfg, job.Backend)
|
||||||
|
if err != nil {
|
||||||
|
return worker.Outcome{Err: fmt.Errorf("backend %q: %w", job.Backend, err)}
|
||||||
|
}
|
||||||
|
attachUsageSink(be)
|
||||||
|
|
||||||
|
finalPrompt, err := prompt.Apply(job.Prompt, job.Style)
|
||||||
|
if err != nil {
|
||||||
|
return worker.Outcome{Err: fmt.Errorf("style: %w", err)}
|
||||||
|
}
|
||||||
|
|
||||||
|
req := backend.Request{
|
||||||
|
Prompt: finalPrompt,
|
||||||
|
Width: job.Width,
|
||||||
|
Height: job.Height,
|
||||||
|
Steps: job.Steps,
|
||||||
|
Seed: job.Seed,
|
||||||
|
Style: job.Style,
|
||||||
|
}
|
||||||
|
res, err := be.Generate(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return worker.Outcome{Err: fmt.Errorf("generate: %w", err)}
|
||||||
|
}
|
||||||
|
defer res.ImageReader.Close()
|
||||||
|
|
||||||
|
writer := buildWriter(p.cfg, false)
|
||||||
|
in := output.Inputs{
|
||||||
|
Prompt: job.Prompt,
|
||||||
|
Backend: be.Name(),
|
||||||
|
Seed: seedFromMetadata(res.Metadata, job.Seed),
|
||||||
|
Ext: extFromMime(res.MimeType),
|
||||||
|
Metadata: res.Metadata,
|
||||||
|
}
|
||||||
|
paths, err := writer.Write(res.ImageReader, in)
|
||||||
|
if err != nil {
|
||||||
|
return worker.Outcome{Err: fmt.Errorf("write disk: %w", err)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Worker is queue-driven: cloud-sync is mandatory because flexsiebels
|
||||||
|
// needs imagen.images.id to render the result. Pass cloud_sync=on via
|
||||||
|
// the override path (third arg = ownerUserID); we set the mode by
|
||||||
|
// disallowing the 'off' branch through the cfg later if the user
|
||||||
|
// explicitly turned it off in config.
|
||||||
|
if cloudModeOff(p.cfg) {
|
||||||
|
// We refuse to silently drop a queued job. If cloud sync is off in
|
||||||
|
// config, the worker can't serve flexsiebels at all.
|
||||||
|
return worker.Outcome{Err: fmt.Errorf("output.cloud_sync=off in config; the worker requires cloud_sync=on or auto")}
|
||||||
|
}
|
||||||
|
syncRes, syncErr := maybeCloudSync(ctx, p.cfg, false, job.OwnerUserID, paths, in, res, dimOrFallback(job.Width, res, "width"), dimOrFallback(job.Height, res, "height"))
|
||||||
|
if syncErr != nil {
|
||||||
|
return worker.Outcome{Err: fmt.Errorf("cloud sync: %w", syncErr)}
|
||||||
|
}
|
||||||
|
if syncRes == nil || syncRes.ImageID == "" {
|
||||||
|
return worker.Outcome{Err: fmt.Errorf("cloud sync returned no imagen.images id (check SUPABASE_URL + SUPABASE_SERVICE_KEY)")}
|
||||||
|
}
|
||||||
|
return worker.Outcome{ImageID: syncRes.ImageID}
|
||||||
|
}
|
||||||
|
|
||||||
|
func cloudModeOff(cfg *config.Config) bool {
|
||||||
|
if cfg == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return strings.EqualFold(cfg.Output.CloudSync, "off")
|
||||||
|
}
|
||||||
|
|
||||||
|
// dimOrFallback returns job.<dim> when the job specified one, otherwise the
|
||||||
|
// dimension reported by the backend's metadata. Some backends (Replicate
|
||||||
|
// when given an aspect ratio) round the requested size to their nearest
|
||||||
|
// supported value; this keeps the row honest about what was actually generated.
|
||||||
|
func dimOrFallback(jobDim int, res *backend.Result, key string) int {
|
||||||
|
if jobDim > 0 {
|
||||||
|
return jobDim
|
||||||
|
}
|
||||||
|
return metaInt(res.Metadata, key)
|
||||||
|
}
|
||||||
129
cmd/imagen/worker_integration_test.go
Normal file
129
cmd/imagen/worker_integration_test.go
Normal file
@@ -0,0 +1,129 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5"
|
||||||
|
|
||||||
|
"mgit.msbls.de/m/ImaGen/internal/config"
|
||||||
|
"mgit.msbls.de/m/ImaGen/internal/worker"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestWorker_Integration_EndToEnd runs the full pipeline against a real
|
||||||
|
// msupabase instance: insert a row into imagen.jobs, let the worker claim
|
||||||
|
// it, generate via the mock backend (no Replicate spend, no ComfyUI
|
||||||
|
// dependency), write to Supabase Storage + imagen.images, then flip the job
|
||||||
|
// to 'done' with the linked image_id.
|
||||||
|
//
|
||||||
|
// Guarded by IMAGEN_WORKER_INTEGRATION=1. Required env beyond that:
|
||||||
|
//
|
||||||
|
// IMAGEN_WORKER_DATABASE_URL postgres DSN (direct, not PostgREST)
|
||||||
|
// SUPABASE_URL e.g. https://supa.flexsiebels.de
|
||||||
|
// SUPABASE_SERVICE_KEY service-role JWT
|
||||||
|
// IMAGEN_OWNER_USER_ID UUID of an auth.users row (RLS fallback)
|
||||||
|
//
|
||||||
|
// The test creates and later deletes its own job row so repeated runs don't
|
||||||
|
// leave debris.
|
||||||
|
func TestWorker_Integration_EndToEnd(t *testing.T) {
|
||||||
|
if os.Getenv("IMAGEN_WORKER_INTEGRATION") != "1" {
|
||||||
|
t.Skip("set IMAGEN_WORKER_INTEGRATION=1 to run the integration test")
|
||||||
|
}
|
||||||
|
dsn := os.Getenv("IMAGEN_WORKER_DATABASE_URL")
|
||||||
|
if dsn == "" {
|
||||||
|
t.Fatal("IMAGEN_WORKER_DATABASE_URL must be set for the integration test")
|
||||||
|
}
|
||||||
|
if os.Getenv("SUPABASE_URL") == "" || os.Getenv("SUPABASE_SERVICE_KEY") == "" {
|
||||||
|
t.Fatal("SUPABASE_URL and SUPABASE_SERVICE_KEY must be set for the integration test")
|
||||||
|
}
|
||||||
|
owner := os.Getenv("IMAGEN_OWNER_USER_ID")
|
||||||
|
if owner == "" {
|
||||||
|
t.Fatal("IMAGEN_OWNER_USER_ID must be set for the integration test")
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
q, err := dialQueue(ctx, dsn)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("dialQueue: %v", err)
|
||||||
|
}
|
||||||
|
defer q.Close()
|
||||||
|
|
||||||
|
// Insert the test job on a separate connection (the worker's conn is
|
||||||
|
// busy LISTENing). Mock backend = no external dependency.
|
||||||
|
insertConn, err := pgx.Connect(ctx, dsn)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("insert conn: %v", err)
|
||||||
|
}
|
||||||
|
defer insertConn.Close(ctx)
|
||||||
|
|
||||||
|
var jobID string
|
||||||
|
prompt := fmt.Sprintf("imagen integration test %d", time.Now().UnixNano())
|
||||||
|
err = insertConn.QueryRow(ctx, `
|
||||||
|
INSERT INTO imagen.jobs (owner_user_id, prompt, backend, width, height)
|
||||||
|
VALUES ($1, $2, 'mock', 64, 64)
|
||||||
|
RETURNING id`,
|
||||||
|
owner, prompt).Scan(&jobID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("insert job: %v", err)
|
||||||
|
}
|
||||||
|
t.Logf("inserted imagen.jobs id=%s", jobID)
|
||||||
|
// Tidy up at the end of the test so a re-run starts clean.
|
||||||
|
defer func() {
|
||||||
|
cleanup, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
_, _ = insertConn.Exec(cleanup, `DELETE FROM imagen.jobs WHERE id=$1`, jobID)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Use a per-test temp dir so the generated PNG doesn't litter the repo.
|
||||||
|
tmpDir := t.TempDir()
|
||||||
|
cfg := &config.Config{Output: config.OutputConfig{Directory: tmpDir}}
|
||||||
|
p := &workerPipeline{cfg: cfg}
|
||||||
|
w := worker.New(q, p, worker.Config{
|
||||||
|
PollInterval: 1 * time.Second,
|
||||||
|
JobTimeout: 30 * time.Second,
|
||||||
|
Logger: func(format string, a ...any) { t.Logf("worker: "+format, a...) },
|
||||||
|
})
|
||||||
|
|
||||||
|
// Run the worker until it processes one job (the one we just inserted)
|
||||||
|
// or the test context times out.
|
||||||
|
runCtx, runCancel := context.WithCancel(ctx)
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
_ = w.Run(runCtx)
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Poll for completion.
|
||||||
|
deadline := time.Now().Add(60 * time.Second)
|
||||||
|
var status, imageID string
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
err = insertConn.QueryRow(ctx,
|
||||||
|
`SELECT status, COALESCE(image_id::text,'') FROM imagen.jobs WHERE id=$1`,
|
||||||
|
jobID).Scan(&status, &imageID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("poll: %v", err)
|
||||||
|
}
|
||||||
|
if status == "done" || status == "failed" {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
}
|
||||||
|
runCancel()
|
||||||
|
<-done
|
||||||
|
|
||||||
|
if status != "done" {
|
||||||
|
var errText string
|
||||||
|
_ = insertConn.QueryRow(ctx,
|
||||||
|
`SELECT COALESCE(error,'') FROM imagen.jobs WHERE id=$1`, jobID).Scan(&errText)
|
||||||
|
t.Fatalf("job not done within timeout: status=%q error=%q", status, errText)
|
||||||
|
}
|
||||||
|
if imageID == "" {
|
||||||
|
t.Fatalf("job done but image_id is empty")
|
||||||
|
}
|
||||||
|
t.Logf("job done: image_id=%s", imageID)
|
||||||
|
}
|
||||||
@@ -7,7 +7,7 @@ upstream API. Each adapter only ever sees its own slice of `imagen.yaml`.
|
|||||||
|
|
||||||
```
|
```
|
||||||
┌───────────────────────┐
|
┌───────────────────────┐
|
||||||
│ cmd/imagen │ CLI dispatch
|
│ cmd/imagen │ CLI dispatch (generate / worker / …)
|
||||||
│ (or HTTP server) │
|
│ (or HTTP server) │
|
||||||
└──────────┬────────────┘
|
└──────────┬────────────┘
|
||||||
│
|
│
|
||||||
@@ -18,6 +18,7 @@ upstream API. Each adapter only ever sees its own slice of `imagen.yaml`.
|
|||||||
│ internal/preview │ tmux-img window spawner
|
│ internal/preview │ tmux-img window spawner
|
||||||
│ internal/cloud │ Supabase Storage + imagen.images
|
│ internal/cloud │ Supabase Storage + imagen.images
|
||||||
│ internal/usage │ mai.imagen_usage cost-tracking
|
│ internal/usage │ mai.imagen_usage cost-tracking
|
||||||
|
│ internal/worker │ imagen.jobs queue consumer
|
||||||
└──────────┬────────────┘
|
└──────────┬────────────┘
|
||||||
│
|
│
|
||||||
┌──────────▼────────────┐
|
┌──────────▼────────────┐
|
||||||
@@ -105,9 +106,37 @@ contains the prompt, backend instance name, seed, ISO timestamp, and the
|
|||||||
- Network errors during `Generate` — wrap and return; no retry policy yet
|
- Network errors during `Generate` — wrap and return; no retry policy yet
|
||||||
(decide per-adapter, or move to a shared retry helper if a pattern emerges).
|
(decide per-adapter, or move to a shared retry helper if a pattern emerges).
|
||||||
|
|
||||||
|
## Async write path: `imagen worker` + `imagen.jobs`
|
||||||
|
|
||||||
|
`imagen generate` is the synchronous CLI. For web callers (flexsiebels'
|
||||||
|
owner-mode UI) `cmd/imagen worker` runs as a daemon that consumes the
|
||||||
|
`imagen.jobs` table.
|
||||||
|
|
||||||
|
```
|
||||||
|
flexsiebels POST imagen worker (mRiver, systemd)
|
||||||
|
→ INSERT INTO LISTEN imagen_jobs ◄── pg_notify trigger
|
||||||
|
imagen.jobs(pending) claim row (UPDATE … RETURNING)
|
||||||
|
dispatch through internal/backend
|
||||||
|
write disk + cloud-sync via internal/cloud
|
||||||
|
UPDATE imagen.jobs SET status='done', image_id=…
|
||||||
|
```
|
||||||
|
|
||||||
|
The queue table lives next to `imagen.images` in the same `imagen` schema.
|
||||||
|
Owner-scoped RLS lets the flexsiebels user INSERT + read their own rows;
|
||||||
|
the worker writes (status updates + image_id link) via service-role which
|
||||||
|
bypasses RLS. A 5-second safety poll fires on every wake-up to cover
|
||||||
|
dropped NOTIFY events and worker cold starts with a non-empty queue. See
|
||||||
|
`docs/setup-worker-mriver.md` for the systemd installation.
|
||||||
|
|
||||||
|
The worker reuses `internal/backend`, `internal/output`, and
|
||||||
|
`internal/cloud` unchanged — it is purely an orchestration layer around
|
||||||
|
the same pipeline `imagen generate` drives.
|
||||||
|
|
||||||
## Out of scope (today)
|
## Out of scope (today)
|
||||||
|
|
||||||
- Image post-processing (cropping, watermarking).
|
- Image post-processing (cropping, watermarking).
|
||||||
- Cost-tracking (lands with the Replicate adapter, since only API backends bill).
|
|
||||||
- Multi-image `n>1` per request — backends that support it can expose it via
|
- Multi-image `n>1` per request — backends that support it can expose it via
|
||||||
`BackendOpts`; the framework doesn't have a first-class field yet.
|
`BackendOpts`; the framework doesn't have a first-class field yet.
|
||||||
|
- Job cancellation / kill switch — separate follow-up issue.
|
||||||
|
- Concurrent workers / multi-host scale-out — `FOR UPDATE SKIP LOCKED` in
|
||||||
|
the claim query makes it cheap to add, but a single worker is the v1 setup.
|
||||||
|
|||||||
97
docs/setup-worker-mriver.md
Normal file
97
docs/setup-worker-mriver.md
Normal file
@@ -0,0 +1,97 @@
|
|||||||
|
# `imagen worker` on mRiver
|
||||||
|
|
||||||
|
The worker is a long-running daemon that consumes the `imagen.jobs` queue
|
||||||
|
(written by flexsiebels' owner-mode UI) and writes the resulting image to
|
||||||
|
Supabase Storage + `imagen.images` via the same cloud-sync path the CLI
|
||||||
|
`imagen generate` uses.
|
||||||
|
|
||||||
|
## Architecture
|
||||||
|
|
||||||
|
```
|
||||||
|
flexsiebels (owner UI)
|
||||||
|
|
|
||||||
|
v INSERT INTO imagen.jobs (...)
|
||||||
|
|
|
||||||
|
msupabase Postgres
|
||||||
|
|
|
||||||
|
| AFTER INSERT trigger:
|
||||||
|
| pg_notify('imagen_jobs', NEW.id)
|
||||||
|
v
|
||||||
|
imagen worker (mRiver) ── LISTEN imagen_jobs
|
||||||
|
|
|
||||||
|
| 1. claim oldest 'pending' row (status='running')
|
||||||
|
| 2. dispatch to backend (FLUX schnell local / FLUX dev replicate / …)
|
||||||
|
| 3. write PNG to disk
|
||||||
|
| 4. upload to Storage + INSERT into imagen.images
|
||||||
|
| 5. UPDATE imagen.jobs SET status='done', image_id=...
|
||||||
|
v
|
||||||
|
flexsiebels polls GET .../jobs/<id> → renders the rendered card
|
||||||
|
```
|
||||||
|
|
||||||
|
A 5-second safety poll covers dropped NOTIFY events and worker cold starts
|
||||||
|
with a non-empty queue.
|
||||||
|
|
||||||
|
## One-time setup
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 1. Build the binary (or `task build`).
|
||||||
|
cd ~/dev/ImaGen
|
||||||
|
go build -o bin/imagen ./cmd/imagen
|
||||||
|
|
||||||
|
# 2. Write the environment file.
|
||||||
|
cp scripts/imagen-worker.env.example ~/.dotfiles/.env.imagen-worker
|
||||||
|
chmod 600 ~/.dotfiles/.env.imagen-worker
|
||||||
|
$EDITOR ~/.dotfiles/.env.imagen-worker # fill in real DSN, service key
|
||||||
|
|
||||||
|
# 3. Install the user systemd unit.
|
||||||
|
mkdir -p ~/.config/systemd/user
|
||||||
|
cp scripts/imagen-worker.service ~/.config/systemd/user/imagen-worker.service
|
||||||
|
systemctl --user daemon-reload
|
||||||
|
systemctl --user enable --now imagen-worker.service
|
||||||
|
|
||||||
|
# 4. Tail the logs.
|
||||||
|
journalctl --user -u imagen-worker -f
|
||||||
|
```
|
||||||
|
|
||||||
|
## Required env vars
|
||||||
|
|
||||||
|
See `scripts/imagen-worker.env.example` for the canonical list. Required:
|
||||||
|
|
||||||
|
- `IMAGEN_WORKER_DATABASE_URL` — direct Postgres DSN. PostgREST cannot LISTEN.
|
||||||
|
- `SUPABASE_URL`, `SUPABASE_SERVICE_KEY` — same pair `imagen generate`
|
||||||
|
reads for the cloud-sync writer.
|
||||||
|
- `IMAGEN_OWNER_USER_ID` — fallback owner UUID; per-job row's
|
||||||
|
`owner_user_id` overrides this.
|
||||||
|
|
||||||
|
Optional, depending on enabled backends:
|
||||||
|
|
||||||
|
- `REPLICATE_API_TOKEN` if any job will request a Replicate-typed backend.
|
||||||
|
|
||||||
|
## Operating
|
||||||
|
|
||||||
|
```bash
|
||||||
|
systemctl --user status imagen-worker # health
|
||||||
|
systemctl --user restart imagen-worker # pick up a new binary
|
||||||
|
journalctl --user -u imagen-worker -n 200 # recent log lines
|
||||||
|
```
|
||||||
|
|
||||||
|
On startup the worker calls `ResetStaleRunning` once, flipping any rows
|
||||||
|
left in `'running'` from a previous crash back to `'pending'` so they get
|
||||||
|
re-claimed by the 5-second poll.
|
||||||
|
|
||||||
|
## Smoke test
|
||||||
|
|
||||||
|
With the worker running, INSERT a test job:
|
||||||
|
|
||||||
|
```sql
|
||||||
|
INSERT INTO imagen.jobs (owner_user_id, prompt, backend, width, height)
|
||||||
|
VALUES (
|
||||||
|
'ac6c9501-3757-4a6d-8b97-2cff4288382b',
|
||||||
|
'a tiny owl wearing wire-rim glasses, photo',
|
||||||
|
'flux-schnell-local', 1024, 1024
|
||||||
|
);
|
||||||
|
```
|
||||||
|
|
||||||
|
Within ~10 seconds the row should show `status='done'`, a populated
|
||||||
|
`image_id` linking to a real `imagen.images` row, and a Storage object at
|
||||||
|
`<YYYY-MM-DD>/<slug>-<seed>.png` in the `imagen-generated` bucket.
|
||||||
15
go.mod
15
go.mod
@@ -1,5 +1,16 @@
|
|||||||
module mgit.msbls.de/m/ImaGen
|
module mgit.msbls.de/m/ImaGen
|
||||||
|
|
||||||
go 1.24
|
go 1.25.0
|
||||||
|
|
||||||
require gopkg.in/yaml.v3 v3.0.1
|
require (
|
||||||
|
github.com/jackc/pgx/v5 v5.9.2
|
||||||
|
gopkg.in/yaml.v3 v3.0.1
|
||||||
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||||
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||||
|
github.com/kr/text v0.2.0 // indirect
|
||||||
|
github.com/rogpeppe/go-internal v1.14.1 // indirect
|
||||||
|
golang.org/x/text v0.29.0 // indirect
|
||||||
|
)
|
||||||
|
|||||||
33
go.sum
33
go.sum
@@ -1,4 +1,35 @@
|
|||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||||
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||||
|
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
||||||
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
||||||
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
|
||||||
|
github.com/jackc/pgx/v5 v5.9.2 h1:3ZhOzMWnR4yJ+RW1XImIPsD1aNSz4T4fyP7zlQb56hw=
|
||||||
|
github.com/jackc/pgx/v5 v5.9.2/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4=
|
||||||
|
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
|
||||||
|
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||||
|
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
|
||||||
|
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
|
||||||
|
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||||
|
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
|
||||||
|
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
|
||||||
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
|
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||||
|
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
|
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||||
|
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||||
|
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
|
||||||
|
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
|
||||||
|
golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk=
|
||||||
|
golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||||
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||||
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
|||||||
213
internal/worker/worker.go
Normal file
213
internal/worker/worker.go
Normal file
@@ -0,0 +1,213 @@
|
|||||||
|
// Package worker consumes the imagen.jobs queue. It claims pending rows via
|
||||||
|
// an UPDATE-returning lock (single source of truth, no double-claim window),
|
||||||
|
// runs the supplied generation pipeline, then writes status + image_id back.
|
||||||
|
//
|
||||||
|
// The package is DB-agnostic: it talks to two small interfaces (Queue +
|
||||||
|
// Pipeline) so unit tests can drive the claim/transition logic with no real
|
||||||
|
// Postgres connection. cmd/imagen wires the pgx implementation.
|
||||||
|
package worker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Job is the slice of an imagen.jobs row the worker needs to drive a
|
||||||
|
// generation. Null columns from the DB are represented as zero values; the
|
||||||
|
// pipeline treats zero values as "use backend default" (same convention as
|
||||||
|
// backend.Request).
|
||||||
|
type Job struct {
|
||||||
|
ID string
|
||||||
|
OwnerUserID string
|
||||||
|
Prompt string
|
||||||
|
Backend string
|
||||||
|
Model string
|
||||||
|
Width int
|
||||||
|
Height int
|
||||||
|
Steps int
|
||||||
|
Seed int64
|
||||||
|
Style string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Outcome is what the pipeline reports back per job. ImageID is the
|
||||||
|
// imagen.images.id the cloud-sync produced. Empty ImageID with nil Err means
|
||||||
|
// the cloud-sync was skipped (config off) — we treat that as a failure for
|
||||||
|
// the worker since flexsiebels needs the image_id to render the result.
|
||||||
|
type Outcome struct {
|
||||||
|
ImageID string
|
||||||
|
Err error
|
||||||
|
}
|
||||||
|
|
||||||
|
// Queue is the persistence layer for the imagen.jobs table. Implementations
|
||||||
|
// must be safe for serialised single-worker use (concurrent claim across
|
||||||
|
// multiple worker processes is out of scope for v1 — the FOR UPDATE SKIP
|
||||||
|
// LOCKED clause in the pgx claim query covers it cheaply anyway).
|
||||||
|
type Queue interface {
|
||||||
|
// ClaimNextPending atomically marks the oldest pending row 'running' and
|
||||||
|
// returns it. Returns (nil, nil) when the queue is empty.
|
||||||
|
ClaimNextPending(ctx context.Context) (*Job, error)
|
||||||
|
// MarkDone records success: status='done', image_id, completed_at=now().
|
||||||
|
MarkDone(ctx context.Context, jobID, imageID string) error
|
||||||
|
// MarkFailed records failure: status='failed', error=msg, completed_at=now().
|
||||||
|
MarkFailed(ctx context.Context, jobID, errMsg string) error
|
||||||
|
// WaitForJob blocks until either a NOTIFY arrives on imagen_jobs, the
|
||||||
|
// timeout expires, or ctx is cancelled. Returns nil on notification or
|
||||||
|
// timeout; returns ctx.Err() on cancellation. Transient connection errors
|
||||||
|
// are returned so the caller can decide to reconnect.
|
||||||
|
WaitForJob(ctx context.Context, timeout time.Duration) error
|
||||||
|
// ResetStaleRunning marks any rows stuck in 'running' (e.g. left over
|
||||||
|
// from a crash before this process started) back to 'pending'. Called
|
||||||
|
// once at worker startup so the cold-start safety poll can pick them up.
|
||||||
|
ResetStaleRunning(ctx context.Context) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pipeline runs one generation and reports back the imagen.images.id (or an
|
||||||
|
// error). The implementation owns backend dispatch, prompt enrichment, disk
|
||||||
|
// write, and cloud-sync; the worker only orchestrates queue state.
|
||||||
|
type Pipeline interface {
|
||||||
|
Run(ctx context.Context, job Job) Outcome
|
||||||
|
}
|
||||||
|
|
||||||
|
// Config is the runtime knob set for the worker loop.
|
||||||
|
type Config struct {
|
||||||
|
// PollInterval is the safety-poll cadence between LISTEN wakeups. Picking
|
||||||
|
// this too low wastes DB roundtrips; too high lets a dropped NOTIFY
|
||||||
|
// stall the queue. 5s is the spec'd default.
|
||||||
|
PollInterval time.Duration
|
||||||
|
// JobTimeout caps any single Pipeline.Run. A backend hang shouldn't
|
||||||
|
// freeze the queue forever.
|
||||||
|
JobTimeout time.Duration
|
||||||
|
// Logger receives one-line status events. nil means silent.
|
||||||
|
Logger func(format string, args ...any)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Worker is the orchestration loop. It is not reusable across Run calls.
|
||||||
|
type Worker struct {
|
||||||
|
q Queue
|
||||||
|
p Pipeline
|
||||||
|
cfg Config
|
||||||
|
|
||||||
|
// processingMu guards the in-flight job so SIGTERM-triggered shutdown
|
||||||
|
// waits for it to complete before returning.
|
||||||
|
processingMu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// New constructs a Worker.
|
||||||
|
func New(q Queue, p Pipeline, cfg Config) *Worker {
|
||||||
|
if cfg.PollInterval <= 0 {
|
||||||
|
cfg.PollInterval = 5 * time.Second
|
||||||
|
}
|
||||||
|
if cfg.JobTimeout <= 0 {
|
||||||
|
cfg.JobTimeout = 5 * time.Minute
|
||||||
|
}
|
||||||
|
return &Worker{q: q, p: p, cfg: cfg}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run drives the consume loop until ctx is cancelled or a fatal queue error
|
||||||
|
// (e.g. unrecoverable DB drop) is returned. A LISTEN wait can fail with a
|
||||||
|
// transient transport error; the worker logs and continues so a temporary
|
||||||
|
// network blip doesn't take it down.
|
||||||
|
func (w *Worker) Run(ctx context.Context) error {
|
||||||
|
if err := w.q.ResetStaleRunning(ctx); err != nil {
|
||||||
|
w.log("worker: reset stale running rows: %v", err)
|
||||||
|
// Don't return — a stale row will eventually be visible to the poll
|
||||||
|
// path once flexsiebels gives up and resubmits, and we'd rather keep
|
||||||
|
// serving fresh jobs than crash here.
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
if err := ctx.Err(); err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// Drain the queue: claim and process until empty.
|
||||||
|
if err := w.drain(ctx); err != nil && !errors.Is(err, context.Canceled) {
|
||||||
|
w.log("worker: drain: %v", err)
|
||||||
|
}
|
||||||
|
if err := ctx.Err(); err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// Wait for the next wake. WaitForJob covers both LISTEN and the
|
||||||
|
// timeout-based poll fallback; either returns nil and we loop.
|
||||||
|
if err := w.q.WaitForJob(ctx, w.cfg.PollInterval); err != nil {
|
||||||
|
if errors.Is(err, context.Canceled) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
w.log("worker: wait: %v (continuing)", err)
|
||||||
|
// Pace the retries so a totally-broken DB doesn't busy-spin.
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
case <-time.After(w.cfg.PollInterval):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// drain claims and processes every currently-pending job. The job-scoped
|
||||||
|
// context is derived from context.Background() so that a SIGTERM mid-job
|
||||||
|
// still lets the pipeline finish — that's the "no half-state on shutdown"
|
||||||
|
// guarantee the issue calls for.
|
||||||
|
func (w *Worker) drain(ctx context.Context) error {
|
||||||
|
for {
|
||||||
|
if err := ctx.Err(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
job, err := w.q.ClaimNextPending(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("claim: %w", err)
|
||||||
|
}
|
||||||
|
if job == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
w.processOne(*job)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// processOne runs the pipeline for one already-claimed job and writes the
|
||||||
|
// outcome back to the queue. The job context is independent of the outer
|
||||||
|
// ctx so an in-flight job can finish even after SIGTERM.
|
||||||
|
func (w *Worker) processOne(job Job) {
|
||||||
|
w.processingMu.Lock()
|
||||||
|
defer w.processingMu.Unlock()
|
||||||
|
|
||||||
|
w.log("worker: processing job %s backend=%s", job.ID, job.Backend)
|
||||||
|
jobCtx, cancel := context.WithTimeout(context.Background(), w.cfg.JobTimeout)
|
||||||
|
defer cancel()
|
||||||
|
out := w.p.Run(jobCtx, job)
|
||||||
|
|
||||||
|
// Status-update uses Background ctx with a short timeout — we must
|
||||||
|
// always be able to record the outcome, otherwise the row sits in
|
||||||
|
// 'running' forever.
|
||||||
|
updCtx, updCancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
|
defer updCancel()
|
||||||
|
if out.Err != nil {
|
||||||
|
w.log("worker: job %s failed: %v", job.ID, out.Err)
|
||||||
|
if err := w.q.MarkFailed(updCtx, job.ID, out.Err.Error()); err != nil {
|
||||||
|
w.log("worker: mark failed for %s: %v", job.ID, err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if out.ImageID == "" {
|
||||||
|
// Pipeline reported success but no imagen.images row — treat as
|
||||||
|
// failure because flexsiebels has nothing to link.
|
||||||
|
const msg = "pipeline did not return an imagen.images id (cloud sync misconfigured?)"
|
||||||
|
w.log("worker: job %s: %s", job.ID, msg)
|
||||||
|
if err := w.q.MarkFailed(updCtx, job.ID, msg); err != nil {
|
||||||
|
w.log("worker: mark failed for %s: %v", job.ID, err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := w.q.MarkDone(updCtx, job.ID, out.ImageID); err != nil {
|
||||||
|
w.log("worker: mark done for %s: %v", job.ID, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.log("worker: job %s done image_id=%s", job.ID, out.ImageID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Worker) log(format string, args ...any) {
|
||||||
|
if w.cfg.Logger != nil {
|
||||||
|
w.cfg.Logger(format, args...)
|
||||||
|
}
|
||||||
|
}
|
||||||
332
internal/worker/worker_test.go
Normal file
332
internal/worker/worker_test.go
Normal file
@@ -0,0 +1,332 @@
|
|||||||
|
package worker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// fakeQueue is a hand-rolled in-memory queue that mirrors the contract of a
|
||||||
|
// real Postgres-backed implementation: ClaimNextPending atomically takes one
|
||||||
|
// pending row and flips its status to "running", MarkDone/MarkFailed are
|
||||||
|
// idempotent terminal transitions, WaitForJob blocks until notified or until
|
||||||
|
// the timeout elapses.
|
||||||
|
type fakeQueue struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
pending []Job
|
||||||
|
state map[string]string // jobID -> status
|
||||||
|
last map[string]string // jobID -> error msg or image_id
|
||||||
|
notify chan struct{}
|
||||||
|
|
||||||
|
claimErr error
|
||||||
|
doneErr error
|
||||||
|
failErr error
|
||||||
|
resetErr error
|
||||||
|
|
||||||
|
claimed int
|
||||||
|
done int
|
||||||
|
failed int
|
||||||
|
resets int
|
||||||
|
}
|
||||||
|
|
||||||
|
func newFakeQueue(jobs ...Job) *fakeQueue {
|
||||||
|
q := &fakeQueue{
|
||||||
|
state: make(map[string]string),
|
||||||
|
last: make(map[string]string),
|
||||||
|
notify: make(chan struct{}, 16),
|
||||||
|
}
|
||||||
|
for _, j := range jobs {
|
||||||
|
q.pending = append(q.pending, j)
|
||||||
|
q.state[j.ID] = "pending"
|
||||||
|
}
|
||||||
|
return q
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *fakeQueue) ClaimNextPending(ctx context.Context) (*Job, error) {
|
||||||
|
q.mu.Lock()
|
||||||
|
defer q.mu.Unlock()
|
||||||
|
if q.claimErr != nil {
|
||||||
|
return nil, q.claimErr
|
||||||
|
}
|
||||||
|
if len(q.pending) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
j := q.pending[0]
|
||||||
|
q.pending = q.pending[1:]
|
||||||
|
q.state[j.ID] = "running"
|
||||||
|
q.claimed++
|
||||||
|
return &j, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *fakeQueue) MarkDone(ctx context.Context, jobID, imageID string) error {
|
||||||
|
q.mu.Lock()
|
||||||
|
defer q.mu.Unlock()
|
||||||
|
if q.doneErr != nil {
|
||||||
|
return q.doneErr
|
||||||
|
}
|
||||||
|
q.state[jobID] = "done"
|
||||||
|
q.last[jobID] = imageID
|
||||||
|
q.done++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *fakeQueue) MarkFailed(ctx context.Context, jobID, msg string) error {
|
||||||
|
q.mu.Lock()
|
||||||
|
defer q.mu.Unlock()
|
||||||
|
if q.failErr != nil {
|
||||||
|
return q.failErr
|
||||||
|
}
|
||||||
|
q.state[jobID] = "failed"
|
||||||
|
q.last[jobID] = msg
|
||||||
|
q.failed++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *fakeQueue) WaitForJob(ctx context.Context, timeout time.Duration) error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case <-q.notify:
|
||||||
|
return nil
|
||||||
|
case <-time.After(timeout):
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *fakeQueue) ResetStaleRunning(ctx context.Context) error {
|
||||||
|
q.mu.Lock()
|
||||||
|
defer q.mu.Unlock()
|
||||||
|
q.resets++
|
||||||
|
return q.resetErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// pingNotify simulates an INSERT-trigger NOTIFY by waking WaitForJob.
|
||||||
|
func (q *fakeQueue) pingNotify() {
|
||||||
|
select {
|
||||||
|
case q.notify <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// stub pipeline.
|
||||||
|
type fakePipeline struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
results map[string]Outcome // by job.ID; "" key = default outcome
|
||||||
|
calls int
|
||||||
|
delay time.Duration
|
||||||
|
lastJob Job
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *fakePipeline) Run(ctx context.Context, job Job) Outcome {
|
||||||
|
p.mu.Lock()
|
||||||
|
p.calls++
|
||||||
|
p.lastJob = job
|
||||||
|
delay := p.delay
|
||||||
|
out, ok := p.results[job.ID]
|
||||||
|
if !ok {
|
||||||
|
out = p.results[""]
|
||||||
|
}
|
||||||
|
p.mu.Unlock()
|
||||||
|
if delay > 0 {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return Outcome{Err: ctx.Err()}
|
||||||
|
case <-time.After(delay):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWorker_DonePath(t *testing.T) {
|
||||||
|
q := newFakeQueue(
|
||||||
|
Job{ID: "j1", Prompt: "a", Backend: "mock"},
|
||||||
|
)
|
||||||
|
p := &fakePipeline{results: map[string]Outcome{"j1": {ImageID: "img-1"}}}
|
||||||
|
w := New(q, p, Config{PollInterval: 10 * time.Millisecond, JobTimeout: time.Second})
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
go func() {
|
||||||
|
time.Sleep(80 * time.Millisecond)
|
||||||
|
cancel()
|
||||||
|
}()
|
||||||
|
if err := w.Run(ctx); err != nil {
|
||||||
|
t.Fatalf("Run: %v", err)
|
||||||
|
}
|
||||||
|
if got := q.state["j1"]; got != "done" {
|
||||||
|
t.Fatalf("state=%q want done", got)
|
||||||
|
}
|
||||||
|
if got := q.last["j1"]; got != "img-1" {
|
||||||
|
t.Fatalf("image_id=%q want img-1", got)
|
||||||
|
}
|
||||||
|
if q.done != 1 || q.failed != 0 {
|
||||||
|
t.Fatalf("counts: done=%d failed=%d", q.done, q.failed)
|
||||||
|
}
|
||||||
|
if p.calls != 1 {
|
||||||
|
t.Fatalf("pipeline calls=%d want 1", p.calls)
|
||||||
|
}
|
||||||
|
if q.resets != 1 {
|
||||||
|
t.Fatalf("ResetStaleRunning calls=%d want 1", q.resets)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWorker_FailedPath_RecordsErrorText(t *testing.T) {
|
||||||
|
q := newFakeQueue(Job{ID: "j1", Prompt: "a", Backend: "mock"})
|
||||||
|
p := &fakePipeline{results: map[string]Outcome{"j1": {Err: errors.New("backend unreachable")}}}
|
||||||
|
w := New(q, p, Config{PollInterval: 10 * time.Millisecond, JobTimeout: time.Second})
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
go func() { time.Sleep(80 * time.Millisecond); cancel() }()
|
||||||
|
_ = w.Run(ctx)
|
||||||
|
|
||||||
|
if got := q.state["j1"]; got != "failed" {
|
||||||
|
t.Fatalf("state=%q want failed", got)
|
||||||
|
}
|
||||||
|
if got := q.last["j1"]; got != "backend unreachable" {
|
||||||
|
t.Fatalf("error=%q want %q", got, "backend unreachable")
|
||||||
|
}
|
||||||
|
if q.done != 0 || q.failed != 1 {
|
||||||
|
t.Fatalf("counts: done=%d failed=%d", q.done, q.failed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWorker_MissingImageID_TreatedAsFailure(t *testing.T) {
|
||||||
|
q := newFakeQueue(Job{ID: "j1", Prompt: "a", Backend: "mock"})
|
||||||
|
// Outcome has neither Err nor ImageID — pipeline silently swallowed
|
||||||
|
// cloud-sync. flexsiebels needs the image_id; without it, fail the job.
|
||||||
|
p := &fakePipeline{results: map[string]Outcome{"j1": {}}}
|
||||||
|
w := New(q, p, Config{PollInterval: 10 * time.Millisecond, JobTimeout: time.Second})
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
go func() { time.Sleep(80 * time.Millisecond); cancel() }()
|
||||||
|
_ = w.Run(ctx)
|
||||||
|
|
||||||
|
if got := q.state["j1"]; got != "failed" {
|
||||||
|
t.Fatalf("state=%q want failed", got)
|
||||||
|
}
|
||||||
|
if q.last["j1"] == "" {
|
||||||
|
t.Fatalf("expected non-empty error explanation for missing image_id")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWorker_DrainsMultipleBeforeWaiting(t *testing.T) {
|
||||||
|
q := newFakeQueue(
|
||||||
|
Job{ID: "j1", Backend: "mock"},
|
||||||
|
Job{ID: "j2", Backend: "mock"},
|
||||||
|
Job{ID: "j3", Backend: "mock"},
|
||||||
|
)
|
||||||
|
p := &fakePipeline{results: map[string]Outcome{"": {ImageID: "img"}}}
|
||||||
|
w := New(q, p, Config{PollInterval: 200 * time.Millisecond, JobTimeout: time.Second})
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
go func() { time.Sleep(60 * time.Millisecond); cancel() }()
|
||||||
|
_ = w.Run(ctx)
|
||||||
|
|
||||||
|
for _, id := range []string{"j1", "j2", "j3"} {
|
||||||
|
if got := q.state[id]; got != "done" {
|
||||||
|
t.Fatalf("%s state=%q want done", id, got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if q.done != 3 {
|
||||||
|
t.Fatalf("done=%d want 3", q.done)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWorker_NotifyWakesEarlierThanPoll(t *testing.T) {
|
||||||
|
q := newFakeQueue()
|
||||||
|
p := &fakePipeline{results: map[string]Outcome{"": {ImageID: "img"}}}
|
||||||
|
// Set poll interval high so a working LISTEN is required to see the job
|
||||||
|
// promptly. Without NOTIFY plumbing this test would time out the worker
|
||||||
|
// before drain ever runs.
|
||||||
|
w := New(q, p, Config{PollInterval: 5 * time.Second, JobTimeout: time.Second})
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
_ = w.Run(ctx)
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
// Append a job and ping the wake channel.
|
||||||
|
q.mu.Lock()
|
||||||
|
q.pending = append(q.pending, Job{ID: "late", Backend: "mock"})
|
||||||
|
q.state["late"] = "pending"
|
||||||
|
q.mu.Unlock()
|
||||||
|
q.pingNotify()
|
||||||
|
|
||||||
|
// Give the worker a beat to claim + process.
|
||||||
|
deadline := time.Now().Add(500 * time.Millisecond)
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
q.mu.Lock()
|
||||||
|
s := q.state["late"]
|
||||||
|
q.mu.Unlock()
|
||||||
|
if s == "done" {
|
||||||
|
cancel()
|
||||||
|
<-done
|
||||||
|
return
|
||||||
|
}
|
||||||
|
time.Sleep(5 * time.Millisecond)
|
||||||
|
}
|
||||||
|
t.Fatalf("worker did not pick up the late job within the 500ms window — NOTIFY wake-up path is broken")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWorker_HonoursContextCancellation(t *testing.T) {
|
||||||
|
q := newFakeQueue()
|
||||||
|
p := &fakePipeline{results: map[string]Outcome{"": {ImageID: "img"}}}
|
||||||
|
w := New(q, p, Config{PollInterval: 10 * time.Millisecond, JobTimeout: time.Second})
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond)
|
||||||
|
defer cancel()
|
||||||
|
start := time.Now()
|
||||||
|
if err := w.Run(ctx); err != nil {
|
||||||
|
t.Fatalf("Run: %v", err)
|
||||||
|
}
|
||||||
|
if dur := time.Since(start); dur > 200*time.Millisecond {
|
||||||
|
t.Fatalf("worker did not exit promptly on ctx cancel: %v", dur)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWorker_InflightJobFinishesAfterShutdown(t *testing.T) {
|
||||||
|
q := newFakeQueue(Job{ID: "long", Backend: "mock"})
|
||||||
|
p := &fakePipeline{
|
||||||
|
results: map[string]Outcome{"long": {ImageID: "img-long"}},
|
||||||
|
delay: 120 * time.Millisecond,
|
||||||
|
}
|
||||||
|
// Short JobTimeout would also kill the in-flight job; give it enough
|
||||||
|
// budget so the test exercises the shutdown-during-job path.
|
||||||
|
w := New(q, p, Config{PollInterval: 10 * time.Millisecond, JobTimeout: 5 * time.Second})
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
go func() {
|
||||||
|
// Let the job start, then cancel mid-flight.
|
||||||
|
time.Sleep(30 * time.Millisecond)
|
||||||
|
cancel()
|
||||||
|
}()
|
||||||
|
_ = w.Run(ctx)
|
||||||
|
if got := q.state["long"]; got != "done" {
|
||||||
|
t.Fatalf("state=%q want done (in-flight job should finish even on shutdown)", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWorker_TransientClaimErrorDoesNotKillLoop(t *testing.T) {
|
||||||
|
// First claim returns an error; the loop should log and try again on the
|
||||||
|
// next wake — it must not propagate the error and exit.
|
||||||
|
q := newFakeQueue(Job{ID: "j1", Backend: "mock"})
|
||||||
|
q.claimErr = fmt.Errorf("transient: connection reset")
|
||||||
|
p := &fakePipeline{results: map[string]Outcome{"j1": {ImageID: "img"}}}
|
||||||
|
w := New(q, p, Config{PollInterval: 20 * time.Millisecond, JobTimeout: time.Second})
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
// Heal the claim error after a beat so the second drain succeeds.
|
||||||
|
go func() {
|
||||||
|
time.Sleep(40 * time.Millisecond)
|
||||||
|
q.mu.Lock()
|
||||||
|
q.claimErr = nil
|
||||||
|
q.mu.Unlock()
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
cancel()
|
||||||
|
}()
|
||||||
|
if err := w.Run(ctx); err != nil {
|
||||||
|
t.Fatalf("Run returned: %v (transient claim errors should not kill the loop)", err)
|
||||||
|
}
|
||||||
|
if got := q.state["j1"]; got != "done" {
|
||||||
|
t.Fatalf("state=%q want done", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
22
scripts/imagen-worker.env.example
Normal file
22
scripts/imagen-worker.env.example
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
# Environment for the imagen-worker.service systemd unit.
|
||||||
|
# Copy to ~/.dotfiles/.env.imagen-worker and fill in real values.
|
||||||
|
# Never commit the populated file — it carries the Supabase service-role key.
|
||||||
|
|
||||||
|
# Direct Postgres DSN for LISTEN/NOTIFY + imagen.jobs UPDATE statements.
|
||||||
|
# PostgREST cannot LISTEN, so the worker connects to Postgres directly.
|
||||||
|
# Host + port + password come from the msupabase compose env on mlake.
|
||||||
|
IMAGEN_WORKER_DATABASE_URL=postgres://postgres:CHANGE_ME@100.99.98.201:6789/postgres?sslmode=disable
|
||||||
|
|
||||||
|
# PostgREST endpoint for the imagen.images cloud-sync writer (same as
|
||||||
|
# `imagen generate`'s cloud-sync code path).
|
||||||
|
SUPABASE_URL=https://supa.flexsiebels.de
|
||||||
|
SUPABASE_SERVICE_KEY=CHANGE_ME
|
||||||
|
|
||||||
|
# Default owner_user_id. Per-job owner from the imagen.jobs row overrides
|
||||||
|
# this, so it's only used as a fallback when a job arrives with a NULL
|
||||||
|
# owner_user_id — which the schema disallows. Keep it set for safety.
|
||||||
|
IMAGEN_OWNER_USER_ID=ac6c9501-3757-4a6d-8b97-2cff4288382b
|
||||||
|
|
||||||
|
# Optional: REPLICATE_API_TOKEN if any imagen.jobs.backend may resolve to
|
||||||
|
# a Replicate adapter instance.
|
||||||
|
# REPLICATE_API_TOKEN=CHANGE_ME
|
||||||
19
scripts/imagen-worker.service
Normal file
19
scripts/imagen-worker.service
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
[Unit]
|
||||||
|
Description=ImaGen worker (consumes imagen.jobs queue)
|
||||||
|
Documentation=https://mgit.msbls.de/m/ImaGen/issues/8
|
||||||
|
Wants=network-online.target
|
||||||
|
After=network-online.target
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
Type=simple
|
||||||
|
ExecStart=%h/dev/ImaGen/bin/imagen worker
|
||||||
|
WorkingDirectory=%h/dev/ImaGen
|
||||||
|
EnvironmentFile=%h/.dotfiles/.env.imagen-worker
|
||||||
|
Restart=on-failure
|
||||||
|
RestartSec=5
|
||||||
|
# Give the worker time to finish an in-flight generation on shutdown
|
||||||
|
# (FLUX dev up to ~30s, plus the cloud-sync write-back).
|
||||||
|
TimeoutStopSec=60
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=default.target
|
||||||
Reference in New Issue
Block a user