Async write path for the flexsiebels owner-mode UI: flexsiebels INSERTs into imagen.jobs, the worker on mRiver claims pending rows via LISTEN/NOTIFY + 5s safety poll, runs the same generate pipeline imagen generate uses, and writes the result through internal/cloud into imagen.images.

- Schema migration imagen_jobs_init: table + status CHECK + two indexes + owner-scoped RLS + grants + AFTER INSERT trigger publishing on the imagen_jobs channel via pg_notify.
- internal/worker: DB-agnostic loop over a Queue interface. Drains the whole pending backlog on each wake. Job-scoped contexts are derived from Background so SIGTERM lets the in-flight generation finish (no half-state). ResetStaleRunning at startup unsticks rows left over from a previous crash. Eight unit tests cover the done / failed / missing-id / drain / NOTIFY-wake / shutdown / transient-error paths against a fake queue (no real Postgres in CI).
- cmd/imagen/worker.go: pgx-backed Queue (one dedicated conn for LISTEN + UPDATE), plus the workerPipeline that reuses buildBackend + attachUsageSink + prompt.Apply + buildWriter + maybeCloudSync. The per-job owner_user_id overrides the env-level fallback so each row in imagen.images is attributed correctly.
- maybeCloudSync now returns (*cloud.SyncResult, error) so the worker can link imagen.jobs.image_id to the inserted imagen.images row. The CLI generate path keeps printing its stderr summary unchanged.
- scripts/imagen-worker.service + .env.example for the systemd --user unit on mRiver. EnvironmentFile lives in ~/.dotfiles and is never committed.
- docs/setup-worker-mriver.md walks through installation + the spec's SQL-INSERT smoke; docs/architecture.md grows an "async write path" section.
- worker_integration_test.go (env-guarded by IMAGEN_WORKER_INTEGRATION=1) drives one real job through the full pipeline against msupabase using the mock backend, then verifies imagen.images + Storage object landed and the row flipped to done with image_id linked.

Verified end-to-end: pickup latency ~7ms, total 74ms, failure path captures error text.
# `imagen worker` on mRiver

The worker is a long-running daemon that consumes the `imagen.jobs` queue
(written by flexsiebels' owner-mode UI) and writes the resulting image to
Supabase Storage + `imagen.images` via the same cloud-sync path the CLI
`imagen generate` uses.

## Architecture

```
flexsiebels (owner UI)
        |
        v  INSERT INTO imagen.jobs (...)
        |
msupabase Postgres
        |
        |  AFTER INSERT trigger:
        |    pg_notify('imagen_jobs', NEW.id)
        v
imagen worker (mRiver)  ── LISTEN imagen_jobs
        |
        |  1. claim oldest 'pending' row (status='running')
        |  2. dispatch to backend (FLUX schnell local / FLUX dev replicate / …)
        |  3. write PNG to disk
        |  4. upload to Storage + INSERT into imagen.images
        |  5. UPDATE imagen.jobs SET status='done', image_id=...
        v
flexsiebels polls GET .../jobs/<id>  →  renders the rendered card
```

A 5-second safety poll covers dropped NOTIFY events and worker cold starts
with a non-empty queue.

## One-time setup

```bash
# 1. Build the binary (or `task build`).
cd ~/dev/ImaGen
go build -o bin/imagen ./cmd/imagen

# 2. Write the environment file.
cp scripts/imagen-worker.env.example ~/.dotfiles/.env.imagen-worker
chmod 600 ~/.dotfiles/.env.imagen-worker
$EDITOR ~/.dotfiles/.env.imagen-worker   # fill in real DSN, service key

# 3. Install the user systemd unit.
mkdir -p ~/.config/systemd/user
cp scripts/imagen-worker.service ~/.config/systemd/user/imagen-worker.service
systemctl --user daemon-reload
systemctl --user enable --now imagen-worker.service

# 4. Tail the logs.
journalctl --user -u imagen-worker -f
```

## Required env vars

See `scripts/imagen-worker.env.example` for the canonical list. Required:

- `IMAGEN_WORKER_DATABASE_URL` — direct Postgres DSN. PostgREST cannot LISTEN.
- `SUPABASE_URL`, `SUPABASE_SERVICE_KEY` — same pair `imagen generate`
  reads for the cloud-sync writer.
- `IMAGEN_OWNER_USER_ID` — fallback owner UUID; per-job row's
  `owner_user_id` overrides this.

Optional, depending on enabled backends:

- `REPLICATE_API_TOKEN` if any job will request a Replicate-typed backend.

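A quick pre-flight check for the required variables listed above might look like the sketch below. It is an illustration only: `requiredEnv` and `missingEnv` are hypothetical names, and nothing here is taken from how the worker itself validates its environment.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// requiredEnv mirrors the "Required" list in this document.
var requiredEnv = []string{
	"IMAGEN_WORKER_DATABASE_URL",
	"SUPABASE_URL",
	"SUPABASE_SERVICE_KEY",
	"IMAGEN_OWNER_USER_ID",
}

// missingEnv returns the names in required that are unset or blank
// according to lookup (os.LookupEnv in normal use).
func missingEnv(required []string, lookup func(string) (string, bool)) []string {
	var missing []string
	for _, name := range required {
		if v, ok := lookup(name); !ok || strings.TrimSpace(v) == "" {
			missing = append(missing, name)
		}
	}
	return missing
}

func main() {
	if missing := missingEnv(requiredEnv, os.LookupEnv); len(missing) > 0 {
		fmt.Println("missing required env vars:", strings.Join(missing, ", "))
		return
	}
	fmt.Println("all required env vars are set")
}
```

Passing the lookup function in as a parameter keeps the check testable without touching the real process environment.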
## Operating

```bash
systemctl --user status imagen-worker      # health
systemctl --user restart imagen-worker     # pick up a new binary
journalctl --user -u imagen-worker -n 200  # recent log lines
```

On startup the worker calls `ResetStaleRunning` once, flipping any rows
left in `'running'` from a previous crash back to `'pending'` so they get
re-claimed by the 5-second poll.

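The effect of that startup reset can be illustrated on an in-memory slice. This is only a sketch of the status transition: the `job` type and `resetStaleRunning` function are hypothetical, and the real worker applies the change to `imagen.jobs` in Postgres with a statement this document does not show.

```go
package main

import "fmt"

// job is a hypothetical, trimmed-down view of a row in imagen.jobs.
type job struct {
	ID     string
	Status string // e.g. "pending", "running", "done"
}

// resetStaleRunning flips every "running" job back to "pending" and
// returns how many jobs were changed. Other statuses are left alone.
func resetStaleRunning(jobs []job) int {
	n := 0
	for i := range jobs {
		if jobs[i].Status == "running" {
			jobs[i].Status = "pending"
			n++
		}
	}
	return n
}

func main() {
	jobs := []job{
		{ID: "a", Status: "running"}, // left over from a crash
		{ID: "b", Status: "done"},
		{ID: "c", Status: "pending"},
	}
	n := resetStaleRunning(jobs)
	fmt.Println("reset", n, "stale job(s):", jobs)
}
```

Running the reset once at startup, before any job is claimed, assumes that no other worker instance is active; otherwise a row that is legitimately in progress elsewhere would also be flipped.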
## Smoke test

With the worker running, INSERT a test job:

```sql
INSERT INTO imagen.jobs (owner_user_id, prompt, backend, width, height)
VALUES (
  'ac6c9501-3757-4a6d-8b97-2cff4288382b',
  'a tiny owl wearing wire-rim glasses, photo',
  'flux-schnell-local', 1024, 1024
);
```

Within ~10 seconds the row should show `status='done'`, a populated
`image_id` linking to a real `imagen.images` row, and a Storage object at
`<YYYY-MM-DD>/<slug>-<seed>.png` in the `imagen-generated` bucket.