package main import ( "context" "fmt" "os" "testing" "time" "github.com/jackc/pgx/v5" "mgit.msbls.de/m/ImaGen/internal/config" "mgit.msbls.de/m/ImaGen/internal/worker" ) // TestWorker_Integration_EndToEnd runs the full pipeline against a real // msupabase instance: insert a row into imagen.jobs, let the worker claim // it, generate via the mock backend (no Replicate spend, no ComfyUI // dependency), write to Supabase Storage + imagen.images, then flip the job // to 'done' with the linked image_id. // // Guarded by IMAGEN_WORKER_INTEGRATION=1. Required env beyond that: // // IMAGEN_WORKER_DATABASE_URL postgres DSN (direct, not PostgREST) // SUPABASE_URL e.g. https://supa.flexsiebels.de // SUPABASE_SERVICE_KEY service-role JWT // IMAGEN_OWNER_USER_ID UUID of an auth.users row (RLS fallback) // // The test creates and later deletes its own job row so repeated runs don't // leave debris. func TestWorker_Integration_EndToEnd(t *testing.T) { if os.Getenv("IMAGEN_WORKER_INTEGRATION") != "1" { t.Skip("set IMAGEN_WORKER_INTEGRATION=1 to run the integration test") } dsn := os.Getenv("IMAGEN_WORKER_DATABASE_URL") if dsn == "" { t.Fatal("IMAGEN_WORKER_DATABASE_URL must be set for the integration test") } if os.Getenv("SUPABASE_URL") == "" || os.Getenv("SUPABASE_SERVICE_KEY") == "" { t.Fatal("SUPABASE_URL and SUPABASE_SERVICE_KEY must be set for the integration test") } owner := os.Getenv("IMAGEN_OWNER_USER_ID") if owner == "" { t.Fatal("IMAGEN_OWNER_USER_ID must be set for the integration test") } ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) defer cancel() q, err := dialQueue(ctx, dsn) if err != nil { t.Fatalf("dialQueue: %v", err) } defer q.Close() // Insert the test job on a separate connection (the worker's conn is // busy LISTENing). Mock backend = no external dependency. insertConn, err := pgx.Connect(ctx, dsn) if err != nil { t.Fatalf("insert conn: %v", err) } defer insertConn.Close(ctx) var jobID string prompt := fmt.Sprintf("imagen integration test %d", time.Now().UnixNano()) err = insertConn.QueryRow(ctx, ` INSERT INTO imagen.jobs (owner_user_id, prompt, backend, width, height) VALUES ($1, $2, 'mock', 64, 64) RETURNING id`, owner, prompt).Scan(&jobID) if err != nil { t.Fatalf("insert job: %v", err) } t.Logf("inserted imagen.jobs id=%s", jobID) // Tidy up at the end of the test so a re-run starts clean. defer func() { cleanup, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _, _ = insertConn.Exec(cleanup, `DELETE FROM imagen.jobs WHERE id=$1`, jobID) }() // Use a per-test temp dir so the generated PNG doesn't litter the repo. tmpDir := t.TempDir() cfg := &config.Config{Output: config.OutputConfig{Directory: tmpDir}} p := &workerPipeline{cfg: cfg} w := worker.New(q, p, worker.Config{ PollInterval: 1 * time.Second, JobTimeout: 30 * time.Second, Logger: func(format string, a ...any) { t.Logf("worker: "+format, a...) }, }) // Run the worker until it processes one job (the one we just inserted) // or the test context times out. runCtx, runCancel := context.WithCancel(ctx) done := make(chan struct{}) go func() { _ = w.Run(runCtx) close(done) }() // Poll for completion. deadline := time.Now().Add(60 * time.Second) var status, imageID string for time.Now().Before(deadline) { err = insertConn.QueryRow(ctx, `SELECT status, COALESCE(image_id::text,'') FROM imagen.jobs WHERE id=$1`, jobID).Scan(&status, &imageID) if err != nil { t.Fatalf("poll: %v", err) } if status == "done" || status == "failed" { break } time.Sleep(500 * time.Millisecond) } runCancel() <-done if status != "done" { var errText string _ = insertConn.QueryRow(ctx, `SELECT COALESCE(error,'') FROM imagen.jobs WHERE id=$1`, jobID).Scan(&errText) t.Fatalf("job not done within timeout: status=%q error=%q", status, errText) } if imageID == "" { t.Fatalf("job done but image_id is empty") } t.Logf("job done: image_id=%s", imageID) }