Schema (applied via migration imagen_series_init):
- imagen.series parent table (prompt + params + count CHECK 1..10 + selected_image_id)
- imagen.jobs += series_id (FK) + series_idx
- imagen.images += series_id (FK)
- Owner-scoped RLS on series (SELECT/INSERT/UPDATE) + grants
- Partial indexes WHERE series_id IS NOT NULL on both child tables

Worker pipeline:
- worker.Job += SeriesID, populated from imagen.jobs.series_id via the claim query.
- cloud.SyncRequest += SeriesID; insertRow writes series_id when non-empty, omits the key when empty so solo runs leave the column NULL.
- maybeCloudSync threads seriesID from job.SeriesID through to the cloud sink. generate.go (CLI) always passes "" — solo path unchanged.

Tests:
- worker: SeriesID propagates from Job to fakePipeline.lastJob unchanged, solo job keeps it empty.
- cloud: SyncRequest.SeriesID lands as row.series_id in the POST body; empty SeriesID omits the key entirely.

Refs ImaGen#9.
293 lines
9.9 KiB
Go
293 lines
9.9 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
|
|
"mgit.msbls.de/m/ImaGen/internal/backend"
|
|
"mgit.msbls.de/m/ImaGen/internal/config"
|
|
"mgit.msbls.de/m/ImaGen/internal/output"
|
|
"mgit.msbls.de/m/ImaGen/internal/prompt"
|
|
"mgit.msbls.de/m/ImaGen/internal/worker"
|
|
)
|
|
|
|
// runWorker is the `imagen worker` subcommand: a long-running daemon that
|
|
// consumes the imagen.jobs queue and writes results into imagen.images via
|
|
// the same cloud-sync path generate uses.
|
|
func runWorker(ctx context.Context, args []string) error {
|
|
fs := flag.NewFlagSet("worker", flag.ContinueOnError)
|
|
var (
|
|
configPath string
|
|
pollInterval time.Duration
|
|
jobTimeout time.Duration
|
|
)
|
|
fs.StringVar(&configPath, "config", "", "config file path (default: ~/.config/imagen.yaml)")
|
|
fs.DurationVar(&pollInterval, "poll-interval", 5*time.Second, "safety-poll cadence between LISTEN wakeups")
|
|
fs.DurationVar(&jobTimeout, "job-timeout", 5*time.Minute, "max wall-time per job before the worker marks it failed")
|
|
fs.Usage = func() {
|
|
fmt.Fprintln(fs.Output(), `Usage: imagen worker [flags]
|
|
|
|
Long-running daemon. LISTENs on the Postgres 'imagen_jobs' channel and polls
|
|
imagen.jobs every --poll-interval as a safety net, claims pending rows, runs
|
|
the generation pipeline, then updates the row with status + image_id.
|
|
|
|
Env:
|
|
IMAGEN_WORKER_DATABASE_URL Postgres DSN for direct LISTEN + UPDATE.
|
|
Required (PostgREST cannot LISTEN).
|
|
SUPABASE_URL, SUPABASE_SERVICE_KEY, IMAGEN_OWNER_USER_ID
|
|
Reused from generate's cloud-sync path; the
|
|
worker writes imagen.images rows through the
|
|
same code path. Per-job owner_user_id from the
|
|
job row overrides IMAGEN_OWNER_USER_ID.`)
|
|
fs.PrintDefaults()
|
|
}
|
|
if err := fs.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
|
|
cfg, cfgErr := config.Load(configPath)
|
|
if cfgErr != nil && !os.IsNotExist(cfgErr) {
|
|
return cfgErr
|
|
}
|
|
|
|
dsn := os.Getenv("IMAGEN_WORKER_DATABASE_URL")
|
|
if dsn == "" {
|
|
return userErr("IMAGEN_WORKER_DATABASE_URL not set; the worker needs a direct Postgres DSN for LISTEN/NOTIFY")
|
|
}
|
|
|
|
q, err := dialQueue(ctx, dsn)
|
|
if err != nil {
|
|
return fmt.Errorf("queue: %w", err)
|
|
}
|
|
defer q.Close()
|
|
|
|
p := &workerPipeline{cfg: cfg}
|
|
w := worker.New(q, p, worker.Config{
|
|
PollInterval: pollInterval,
|
|
JobTimeout: jobTimeout,
|
|
Logger: func(format string, a ...any) { fmt.Fprintf(os.Stderr, format+"\n", a...) },
|
|
})
|
|
fmt.Fprintln(os.Stderr, "imagen worker: ready (poll-interval", pollInterval, "job-timeout", jobTimeout, ")")
|
|
return w.Run(ctx)
|
|
}
|
|
|
|
// pgxQueue is the production Queue. It opens one dedicated connection used
// for both LISTEN (long-lived) and UPDATE operations. A second connection
// would split state needlessly — a single worker process processes one job
// at a time so the connection is never contended.
//
// Instances are created by dialQueue and released with Close.
type pgxQueue struct {
	// conn is the single Postgres connection every method of the queue uses.
	conn *pgx.Conn
}
|
|
|
|
func dialQueue(ctx context.Context, dsn string) (*pgxQueue, error) {
|
|
conn, err := pgx.Connect(ctx, dsn)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("pgx.Connect: %w", err)
|
|
}
|
|
if _, err := conn.Exec(ctx, "LISTEN imagen_jobs"); err != nil {
|
|
conn.Close(ctx)
|
|
return nil, fmt.Errorf("LISTEN imagen_jobs: %w", err)
|
|
}
|
|
return &pgxQueue{conn: conn}, nil
|
|
}
|
|
|
|
func (q *pgxQueue) Close() {
|
|
if q == nil || q.conn == nil {
|
|
return
|
|
}
|
|
// Best-effort: a 5s budget is enough to send a polite TerminateMessage.
|
|
shutdown, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
_ = q.conn.Close(shutdown)
|
|
}
|
|
|
|
// ClaimNextPending atomically marks the oldest pending row 'running' and
|
|
// returns it. FOR UPDATE SKIP LOCKED is belt + braces against a second worker
|
|
// process — out of scope for v1 but cheap insurance.
|
|
func (q *pgxQueue) ClaimNextPending(ctx context.Context) (*worker.Job, error) {
|
|
// series_id is nullable on imagen.jobs (solo run when NULL); cast to text
|
|
// with COALESCE so pgx scans into a plain Go string. Empty string =
|
|
// solo run; the pipeline skips series propagation in that case.
|
|
const stmt = `
|
|
UPDATE imagen.jobs
|
|
SET status='running', started_at=now()
|
|
WHERE id = (
|
|
SELECT id FROM imagen.jobs
|
|
WHERE status='pending'
|
|
ORDER BY created_at
|
|
LIMIT 1
|
|
FOR UPDATE SKIP LOCKED
|
|
)
|
|
RETURNING id, owner_user_id, prompt, backend,
|
|
COALESCE(model,''),
|
|
COALESCE(width, 0), COALESCE(height, 0),
|
|
COALESCE(steps, 0), COALESCE(seed, 0),
|
|
COALESCE(style,''),
|
|
COALESCE(series_id::text, '')`
|
|
var j worker.Job
|
|
err := q.conn.QueryRow(ctx, stmt).Scan(
|
|
&j.ID, &j.OwnerUserID, &j.Prompt, &j.Backend,
|
|
&j.Model, &j.Width, &j.Height, &j.Steps, &j.Seed, &j.Style,
|
|
&j.SeriesID,
|
|
)
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &j, nil
|
|
}
|
|
|
|
func (q *pgxQueue) MarkDone(ctx context.Context, jobID, imageID string) error {
|
|
_, err := q.conn.Exec(ctx,
|
|
`UPDATE imagen.jobs SET status='done', image_id=$2, completed_at=now() WHERE id=$1`,
|
|
jobID, imageID)
|
|
return err
|
|
}
|
|
|
|
func (q *pgxQueue) MarkFailed(ctx context.Context, jobID, msg string) error {
|
|
// Trim outrageously long error text so a 10MB stack-trace doesn't end up
|
|
// in the row (callers see a summary, full text goes to stderr / logs).
|
|
const maxLen = 2000
|
|
if len(msg) > maxLen {
|
|
msg = msg[:maxLen] + "... [truncated]"
|
|
}
|
|
_, err := q.conn.Exec(ctx,
|
|
`UPDATE imagen.jobs SET status='failed', error=$2, completed_at=now() WHERE id=$1`,
|
|
jobID, msg)
|
|
return err
|
|
}
|
|
|
|
// WaitForJob blocks until a NOTIFY arrives on imagen_jobs, the timeout fires,
|
|
// or ctx is cancelled. Notifications during a previous processJob are queued
|
|
// by pgx and delivered on the next call — we don't lose wake-ups even when
|
|
// processing took longer than poll-interval.
|
|
func (q *pgxQueue) WaitForJob(ctx context.Context, timeout time.Duration) error {
|
|
waitCtx, cancel := context.WithTimeout(ctx, timeout)
|
|
defer cancel()
|
|
_, err := q.conn.WaitForNotification(waitCtx)
|
|
if err != nil {
|
|
if errors.Is(err, context.DeadlineExceeded) {
|
|
return nil // poll cadence fired
|
|
}
|
|
if errors.Is(err, context.Canceled) {
|
|
return context.Canceled
|
|
}
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ResetStaleRunning bumps any rows stuck in 'running' back to 'pending' so
|
|
// they get re-claimed. Called once at startup. A row stuck in 'running' came
|
|
// from a previous worker crash; without this, flexsiebels would poll
|
|
// forever on a job nobody is processing.
|
|
func (q *pgxQueue) ResetStaleRunning(ctx context.Context) error {
|
|
_, err := q.conn.Exec(ctx,
|
|
`UPDATE imagen.jobs SET status='pending', started_at=NULL WHERE status='running'`)
|
|
return err
|
|
}
|
|
|
|
// workerPipeline is the Pipeline implementation that drives a single job
// through buildBackend → prompt enrichment → generate → write disk →
// cloud-sync, then returns the imagen.images.id back to the worker so it
// can link the row.
type workerPipeline struct {
	// cfg is the loaded configuration handed to the backend, writer and
	// cloud-sync helpers. It may be nil; cloudModeOff treats nil as "not off".
	cfg *config.Config
}
|
|
|
|
func (p *workerPipeline) Run(ctx context.Context, job worker.Job) worker.Outcome {
|
|
if job.OwnerUserID == "" {
|
|
return worker.Outcome{Err: fmt.Errorf("job %s: missing owner_user_id", job.ID)}
|
|
}
|
|
if job.Prompt == "" {
|
|
return worker.Outcome{Err: fmt.Errorf("job %s: empty prompt", job.ID)}
|
|
}
|
|
if job.Backend == "" {
|
|
return worker.Outcome{Err: fmt.Errorf("job %s: missing backend", job.ID)}
|
|
}
|
|
|
|
be, err := buildBackend(p.cfg, job.Backend)
|
|
if err != nil {
|
|
return worker.Outcome{Err: fmt.Errorf("backend %q: %w", job.Backend, err)}
|
|
}
|
|
attachUsageSink(be)
|
|
|
|
finalPrompt, err := prompt.Apply(job.Prompt, job.Style)
|
|
if err != nil {
|
|
return worker.Outcome{Err: fmt.Errorf("style: %w", err)}
|
|
}
|
|
|
|
req := backend.Request{
|
|
Prompt: finalPrompt,
|
|
Width: job.Width,
|
|
Height: job.Height,
|
|
Steps: job.Steps,
|
|
Seed: job.Seed,
|
|
Style: job.Style,
|
|
}
|
|
res, err := be.Generate(ctx, req)
|
|
if err != nil {
|
|
return worker.Outcome{Err: fmt.Errorf("generate: %w", err)}
|
|
}
|
|
defer res.ImageReader.Close()
|
|
|
|
writer := buildWriter(p.cfg, false)
|
|
in := output.Inputs{
|
|
Prompt: job.Prompt,
|
|
Backend: be.Name(),
|
|
Seed: seedFromMetadata(res.Metadata, job.Seed),
|
|
Ext: extFromMime(res.MimeType),
|
|
Metadata: res.Metadata,
|
|
}
|
|
paths, err := writer.Write(res.ImageReader, in)
|
|
if err != nil {
|
|
return worker.Outcome{Err: fmt.Errorf("write disk: %w", err)}
|
|
}
|
|
|
|
// Worker is queue-driven: cloud-sync is mandatory because flexsiebels
|
|
// needs imagen.images.id to render the result. Pass cloud_sync=on via
|
|
// the override path (third arg = ownerUserID); we set the mode by
|
|
// disallowing the 'off' branch through the cfg later if the user
|
|
// explicitly turned it off in config.
|
|
if cloudModeOff(p.cfg) {
|
|
// We refuse to silently drop a queued job. If cloud sync is off in
|
|
// config, the worker can't serve flexsiebels at all.
|
|
return worker.Outcome{Err: fmt.Errorf("output.cloud_sync=off in config; the worker requires cloud_sync=on or auto")}
|
|
}
|
|
syncRes, syncErr := maybeCloudSync(ctx, p.cfg, false, job.OwnerUserID, job.SeriesID, paths, in, res, dimOrFallback(job.Width, res, "width"), dimOrFallback(job.Height, res, "height"))
|
|
if syncErr != nil {
|
|
return worker.Outcome{Err: fmt.Errorf("cloud sync: %w", syncErr)}
|
|
}
|
|
if syncRes == nil || syncRes.ImageID == "" {
|
|
return worker.Outcome{Err: fmt.Errorf("cloud sync returned no imagen.images id (check SUPABASE_URL + SUPABASE_SERVICE_KEY)")}
|
|
}
|
|
return worker.Outcome{ImageID: syncRes.ImageID}
|
|
}
|
|
|
|
func cloudModeOff(cfg *config.Config) bool {
|
|
if cfg == nil {
|
|
return false
|
|
}
|
|
return strings.EqualFold(cfg.Output.CloudSync, "off")
|
|
}
|
|
|
|
// dimOrFallback returns job.<dim> when the job specified one, otherwise the
|
|
// dimension reported by the backend's metadata. Some backends (Replicate
|
|
// when given an aspect ratio) round the requested size to their nearest
|
|
// supported value; this keeps the row honest about what was actually generated.
|
|
func dimOrFallback(jobDim int, res *backend.Result, key string) int {
|
|
if jobDim > 0 {
|
|
return jobDim
|
|
}
|
|
return metaInt(res.Metadata, key)
|
|
}
|