ImaGen/docs/architecture.md
mAi 2758c5a500 mAi: #8 - imagen.jobs queue + worker subcommand (flexsiebels write path)
Async write path for the flexsiebels owner-mode UI: flexsiebels INSERTs into
imagen.jobs, the worker on mRiver claims pending rows via LISTEN/NOTIFY +
5s safety poll, runs the same generate pipeline imagen generate uses, and
writes the result through internal/cloud into imagen.images.

- Schema migration imagen_jobs_init: table + status CHECK + two indexes +
  owner-scoped RLS + grants + AFTER INSERT trigger publishing on the
  imagen_jobs channel via pg_notify.
- internal/worker: DB-agnostic loop over a Queue interface. Drains the
  whole pending backlog on each wake. Job-scoped contexts are derived
  from Background so SIGTERM lets the in-flight generation finish (no
  half-state). ResetStaleRunning at startup unsticks rows left over from
  a previous crash. Eight unit tests cover the done / failed / missing-id /
  drain / NOTIFY-wake / shutdown / transient-error paths against a fake
  queue (no real Postgres in CI).
- cmd/imagen/worker.go: pgx-backed Queue (one dedicated conn for LISTEN +
  UPDATE), plus the workerPipeline that reuses buildBackend +
  attachUsageSink + prompt.Apply + buildWriter + maybeCloudSync. The
  per-job owner_user_id overrides the env-level fallback so each row in
  imagen.images is attributed correctly.
- maybeCloudSync now returns (*cloud.SyncResult, error) so the worker can
  link imagen.jobs.image_id to the inserted imagen.images row. The CLI
  generate path keeps printing its stderr summary unchanged.
- scripts/imagen-worker.service + .env.example for the systemd --user unit
  on mRiver. EnvironmentFile lives in ~/.dotfiles and is never committed.
- docs/setup-worker-mriver.md walks through installation + the spec's
  SQL-INSERT smoke; docs/architecture.md grows an "async write path"
  section.
- worker_integration_test.go (env-guarded by IMAGEN_WORKER_INTEGRATION=1)
  drives one real job through the full pipeline against msupabase using
  the mock backend, then verifies imagen.images + Storage object landed
  and the row flipped to done with image_id linked. Verified end-to-end:
  pickup latency ~7ms, total 74ms, failure path captures error text.
2026-05-11 10:23:33 +02:00


ImaGen architecture

ImaGen is intentionally small. The framework owns plumbing; adapters own the upstream API. Each adapter only ever sees its own slice of imagen.yaml.

Layers

```
        ┌───────────────────────┐
        │   cmd/imagen          │   CLI dispatch (generate / worker / …)
        │   (or HTTP server)    │
        └──────────┬────────────┘
                   │
        ┌──────────▼────────────┐
        │   internal/prompt     │   style preset → prompt suffix
        │   internal/output     │   filename templating, sidecar
        │   internal/config     │   YAML loader, validation
        │   internal/preview    │   tmux-img window spawner
        │   internal/cloud      │   Supabase Storage + imagen.images
        │   internal/usage      │   mai.imagen_usage cost-tracking
        │   internal/worker     │   imagen.jobs queue consumer
        └──────────┬────────────┘
                   │
        ┌──────────▼────────────┐
        │   internal/backend    │   Backend interface + Registry
        └──────────┬────────────┘
                   │
        ┌──────────▼────────────┐
        │   adapters            │   ComfyUI · Replicate · OpenAI · …
        │   (each one register- │   each registers a `type` name on
        │    s in init())       │   `backend.Default` at init time.
        └───────────────────────┘
```

The Backend contract

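```go
package backend

import (
	"context"
	"io"
)

// Request carries the adapter-agnostic generation parameters.
type Request struct {
	Prompt         string
	NegativePrompt string
	Width, Height  int
	Steps          int
	Seed           int64
	Style          string
	BackendOpts    map[string]any // adapter-specific options
}

// Result is what an adapter hands back; the caller must Close ImageReader.
type Result struct {
	ImageReader io.ReadCloser
	MimeType    string
	Metadata    map[string]any // copied verbatim into the sidecar JSON
}

// Backend is the only interface an adapter has to implement.
type Backend interface {
	Name() string
	Generate(ctx context.Context, req Request) (*Result, error)
}
```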

Adapters translate Request into whatever the upstream expects. Fields they can't honour (e.g. NegativePrompt on DALL-E) are silently ignored.

Registry

backend.Default holds the process-wide name → constructor map. Each adapter calls backend.Register("<type>", NewX) from its init(). The CLI imports internal/backend (which transitively triggers the mock's init) and any extra adapter packages.

Config flow

```
imagen.yaml
  backends:
    flux-schnell-local:
      type: comfyui                  ──┐
      base_url: http://mrock:8188      │  framework keeps `type`,
      model: flux1-schnell.safetensors │  hands the rest to the
      default_steps: 4                 │  comfyui adapter as cfg map[string]any
                                     ──┘
```

The framework never inspects any field besides type. Everything else is the adapter's contract with itself, expressed however the adapter wants (typed struct, map lookups, JSON tags: its call).
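The split can be sketched as follows. The framework pops type and passes the remainder untouched to the adapter. This assumes the backends entry has already been decoded by a YAML library into map[string]any. splitBackendConfig is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"sort"
)

// splitBackendConfig separates the one key the framework owns (`type`) from
// the adapter-owned remainder. The input map is not modified.
func splitBackendConfig(instance string, raw map[string]any) (string, map[string]any, error) {
	typ, ok := raw["type"].(string)
	if !ok || typ == "" {
		return "", nil, fmt.Errorf("backend %q: missing or non-string type", instance)
	}
	rest := make(map[string]any, len(raw)-1)
	for k, v := range raw {
		if k != "type" {
			rest[k] = v // passed through verbatim; never inspected here
		}
	}
	return typ, rest, nil
}

func main() {
	// The flux-schnell-local block above, as a YAML decoder would yield it.
	raw := map[string]any{
		"type":          "comfyui",
		"base_url":      "http://mrock:8188",
		"model":         "flux1-schnell.safetensors",
		"default_steps": 4,
	}
	typ, cfg, err := splitBackendConfig("flux-schnell-local", raw)
	if err != nil {
		panic(err)
	}
	keys := make([]string, 0, len(cfg))
	for k := range cfg {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	fmt.Println(typ, keys) // comfyui [base_url default_steps model]
}
```

The returned type selects the constructor from the registry, and cfg is what that constructor receives.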

Output

```yaml
output:
  directory: ~/Pictures/imagen
  naming: "{date}-{slug}-{seed}.png"
  write_metadata_json: true
```

Placeholders: {date}, {time}, {slug} (lowercased prompt, alnum-only, truncated to 40 chars), {seed}, {backend}, {ext}. The sidecar JSON contains the prompt, backend instance name, seed, ISO timestamp, and the Result.Metadata map verbatim.

Where adapters fail fast

  • Missing required field in their config block — return an error from the constructor; the CLI surfaces it as `imagen: backend "X": <err>`.
  • Unset env-var for credentials — same.
  • Network errors during Generate — wrap and return; no retry policy yet (decide per-adapter, or move to a shared retry helper if a pattern emerges).

Async write path: imagen worker + imagen.jobs

imagen generate is the synchronous CLI. For web callers (flexsiebels' owner-mode UI), the worker subcommand in cmd/imagen runs as a daemon that consumes the imagen.jobs table.

```
flexsiebels POST          imagen worker (mRiver, systemd)
  → INSERT INTO              LISTEN imagen_jobs  ◄── pg_notify trigger
    imagen.jobs(pending)     claim row (UPDATE … RETURNING)
                             dispatch through internal/backend
                             write disk + cloud-sync via internal/cloud
                             UPDATE imagen.jobs SET status='done', image_id=…
```

The queue table lives next to imagen.images in the same imagen schema. Owner-scoped RLS lets the flexsiebels user INSERT and read their own rows. The worker performs its writes (status updates and the image_id link) as the service role, which bypasses RLS. The worker wakes on a NOTIFY and also on a 5-second safety poll, and every wake-up drains the whole pending backlog. This covers dropped NOTIFY events and worker cold starts with a non-empty queue. See docs/setup-worker-mriver.md for the systemd installation.

The worker reuses internal/backend, internal/output, and internal/cloud unchanged — it is purely an orchestration layer around the same pipeline imagen generate drives.
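The orchestration around one claimed job can be sketched like this: run the shared pipeline, then record either done plus the linked image_id, or failed plus the error text. Pipeline and Store are hypothetical interfaces. They stand in for the same generate, write and cloud-sync path that imagen generate drives, and for the status UPDATEs on imagen.jobs. The string image ID and the field names are assumptions.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Job is a claimed imagen.jobs row (trimmed; field names are illustrative).
type Job struct {
	ID          int64
	Prompt      string
	OwnerUserID string
}

// Pipeline stands in for generate → write → cloud sync and returns the id
// of the inserted imagen.images row.
type Pipeline interface {
	Run(ctx context.Context, prompt, owner string) (imageID string, err error)
}

// Store stands in for the status UPDATEs on imagen.jobs.
type Store interface {
	MarkDone(ctx context.Context, jobID int64, imageID string) error
	MarkFailed(ctx context.Context, jobID int64, errText string) error
}

// processJob runs one job to completion on a context derived from
// Background, so a shutdown signal cannot cut it off halfway.
func processJob(p Pipeline, s Store, job Job, fallbackOwner string) error {
	ctx := context.Background()
	owner := job.OwnerUserID // the per-job owner wins over the env-level fallback
	if owner == "" {
		owner = fallbackOwner
	}
	imageID, err := p.Run(ctx, job.Prompt, owner)
	if err != nil {
		return s.MarkFailed(ctx, job.ID, err.Error())
	}
	return s.MarkDone(ctx, job.ID, imageID)
}

type fakePipeline struct{ fail bool }

func (f fakePipeline) Run(_ context.Context, prompt, owner string) (string, error) {
	if f.fail {
		return "", errors.New("upstream 503")
	}
	return "img-" + owner, nil
}

type memStore struct{ status, imageID, errText string }

func (m *memStore) MarkDone(_ context.Context, _ int64, imageID string) error {
	m.status, m.imageID = "done", imageID
	return nil
}

func (m *memStore) MarkFailed(_ context.Context, _ int64, errText string) error {
	m.status, m.errText = "failed", errText
	return nil
}

func main() {
	s := &memStore{}
	if err := processJob(fakePipeline{}, s, Job{ID: 1, Prompt: "fox"}, "env-owner"); err != nil {
		panic(err)
	}
	fmt.Println(s.status, s.imageID) // done img-env-owner
}
```

A pipeline failure is recorded on the row rather than returned, so one bad job does not stop the worker; only a failure to write the status itself propagates.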

Out of scope (today)

  • Image post-processing (cropping, watermarking).
  • Multi-image n>1 per request — backends that support it can expose it via BackendOpts; the framework doesn't have a first-class field yet.
  • Job cancellation / kill switch — separate follow-up issue.
  • Concurrent workers / multi-host scale-out — FOR UPDATE SKIP LOCKED in the claim query makes it cheap to add, but a single worker is the v1 setup.