Async write path for the flexsiebels owner-mode UI: flexsiebels INSERTs into imagen.jobs, the worker on mRiver claims pending rows via LISTEN/NOTIFY + 5s safety poll, runs the same generate pipeline that `imagen generate` uses, and writes the result through internal/cloud into imagen.images.

- Schema migration imagen_jobs_init: table + status CHECK + two indexes + owner-scoped RLS + grants + AFTER INSERT trigger publishing on the imagen_jobs channel via pg_notify.
- internal/worker: DB-agnostic loop over a Queue interface. Drains the whole pending backlog on each wake. Job-scoped contexts are derived from Background so SIGTERM lets the in-flight generation finish (no half-state). ResetStaleRunning at startup unsticks rows left over from a previous crash. Eight unit tests cover the done / failed / missing-id / drain / NOTIFY-wake / shutdown / transient-error paths against a fake queue (no real Postgres in CI).
- cmd/imagen/worker.go: pgx-backed Queue (one dedicated conn for LISTEN + UPDATE), plus the workerPipeline that reuses buildBackend + attachUsageSink + prompt.Apply + buildWriter + maybeCloudSync. The per-job owner_user_id overrides the env-level fallback so each row in imagen.images is attributed correctly.
- maybeCloudSync now returns (*cloud.SyncResult, error) so the worker can link imagen.jobs.image_id to the inserted imagen.images row. The CLI generate path keeps printing its stderr summary unchanged.
- scripts/imagen-worker.service + .env.example for the systemd --user unit on mRiver. EnvironmentFile lives in ~/.dotfiles and is never committed.
- docs/setup-worker-mriver.md walks through installation + the spec's SQL-INSERT smoke; docs/architecture.md grows an "async write path" section.
- worker_integration_test.go (env-guarded by IMAGEN_WORKER_INTEGRATION=1) drives one real job through the full pipeline against msupabase using the mock backend, then verifies imagen.images + Storage object landed and the row flipped to done with image_id linked.
Verified end-to-end: pickup latency ~7ms, total 74ms, failure path captures error text.
130 lines
4.0 KiB
Go
130 lines
4.0 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
|
|
"mgit.msbls.de/m/ImaGen/internal/config"
|
|
"mgit.msbls.de/m/ImaGen/internal/worker"
|
|
)
|
|
|
|
// TestWorker_Integration_EndToEnd runs the full pipeline against a real
|
|
// msupabase instance: insert a row into imagen.jobs, let the worker claim
|
|
// it, generate via the mock backend (no Replicate spend, no ComfyUI
|
|
// dependency), write to Supabase Storage + imagen.images, then flip the job
|
|
// to 'done' with the linked image_id.
|
|
//
|
|
// Guarded by IMAGEN_WORKER_INTEGRATION=1. Required env beyond that:
|
|
//
|
|
// IMAGEN_WORKER_DATABASE_URL postgres DSN (direct, not PostgREST)
|
|
// SUPABASE_URL e.g. https://supa.flexsiebels.de
|
|
// SUPABASE_SERVICE_KEY service-role JWT
|
|
// IMAGEN_OWNER_USER_ID UUID of an auth.users row (RLS fallback)
|
|
//
|
|
// The test creates and later deletes its own job row so repeated runs don't
|
|
// leave debris.
|
|
func TestWorker_Integration_EndToEnd(t *testing.T) {
|
|
if os.Getenv("IMAGEN_WORKER_INTEGRATION") != "1" {
|
|
t.Skip("set IMAGEN_WORKER_INTEGRATION=1 to run the integration test")
|
|
}
|
|
dsn := os.Getenv("IMAGEN_WORKER_DATABASE_URL")
|
|
if dsn == "" {
|
|
t.Fatal("IMAGEN_WORKER_DATABASE_URL must be set for the integration test")
|
|
}
|
|
if os.Getenv("SUPABASE_URL") == "" || os.Getenv("SUPABASE_SERVICE_KEY") == "" {
|
|
t.Fatal("SUPABASE_URL and SUPABASE_SERVICE_KEY must be set for the integration test")
|
|
}
|
|
owner := os.Getenv("IMAGEN_OWNER_USER_ID")
|
|
if owner == "" {
|
|
t.Fatal("IMAGEN_OWNER_USER_ID must be set for the integration test")
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
|
|
defer cancel()
|
|
|
|
q, err := dialQueue(ctx, dsn)
|
|
if err != nil {
|
|
t.Fatalf("dialQueue: %v", err)
|
|
}
|
|
defer q.Close()
|
|
|
|
// Insert the test job on a separate connection (the worker's conn is
|
|
// busy LISTENing). Mock backend = no external dependency.
|
|
insertConn, err := pgx.Connect(ctx, dsn)
|
|
if err != nil {
|
|
t.Fatalf("insert conn: %v", err)
|
|
}
|
|
defer insertConn.Close(ctx)
|
|
|
|
var jobID string
|
|
prompt := fmt.Sprintf("imagen integration test %d", time.Now().UnixNano())
|
|
err = insertConn.QueryRow(ctx, `
|
|
INSERT INTO imagen.jobs (owner_user_id, prompt, backend, width, height)
|
|
VALUES ($1, $2, 'mock', 64, 64)
|
|
RETURNING id`,
|
|
owner, prompt).Scan(&jobID)
|
|
if err != nil {
|
|
t.Fatalf("insert job: %v", err)
|
|
}
|
|
t.Logf("inserted imagen.jobs id=%s", jobID)
|
|
// Tidy up at the end of the test so a re-run starts clean.
|
|
defer func() {
|
|
cleanup, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
_, _ = insertConn.Exec(cleanup, `DELETE FROM imagen.jobs WHERE id=$1`, jobID)
|
|
}()
|
|
|
|
// Use a per-test temp dir so the generated PNG doesn't litter the repo.
|
|
tmpDir := t.TempDir()
|
|
cfg := &config.Config{Output: config.OutputConfig{Directory: tmpDir}}
|
|
p := &workerPipeline{cfg: cfg}
|
|
w := worker.New(q, p, worker.Config{
|
|
PollInterval: 1 * time.Second,
|
|
JobTimeout: 30 * time.Second,
|
|
Logger: func(format string, a ...any) { t.Logf("worker: "+format, a...) },
|
|
})
|
|
|
|
// Run the worker until it processes one job (the one we just inserted)
|
|
// or the test context times out.
|
|
runCtx, runCancel := context.WithCancel(ctx)
|
|
done := make(chan struct{})
|
|
go func() {
|
|
_ = w.Run(runCtx)
|
|
close(done)
|
|
}()
|
|
|
|
// Poll for completion.
|
|
deadline := time.Now().Add(60 * time.Second)
|
|
var status, imageID string
|
|
for time.Now().Before(deadline) {
|
|
err = insertConn.QueryRow(ctx,
|
|
`SELECT status, COALESCE(image_id::text,'') FROM imagen.jobs WHERE id=$1`,
|
|
jobID).Scan(&status, &imageID)
|
|
if err != nil {
|
|
t.Fatalf("poll: %v", err)
|
|
}
|
|
if status == "done" || status == "failed" {
|
|
break
|
|
}
|
|
time.Sleep(500 * time.Millisecond)
|
|
}
|
|
runCancel()
|
|
<-done
|
|
|
|
if status != "done" {
|
|
var errText string
|
|
_ = insertConn.QueryRow(ctx,
|
|
`SELECT COALESCE(error,'') FROM imagen.jobs WHERE id=$1`, jobID).Scan(&errText)
|
|
t.Fatalf("job not done within timeout: status=%q error=%q", status, errText)
|
|
}
|
|
if imageID == "" {
|
|
t.Fatalf("job done but image_id is empty")
|
|
}
|
|
t.Logf("job done: image_id=%s", imageID)
|
|
}
|