Implements the Replicate API backend (FLUX schnell / FLUX dev) per ImaGen issue #3:

- internal/backend/replicate.go — Backend adapter. Supports model refs as "owner/name" (uses /v1/models/{owner}/{name}/predictions) and "owner/name:hash" (uses /v1/predictions with explicit version). Polls /v1/predictions/{id} every 500ms with model-aware timeout (60s schnell, 120s dev). Resilience: 401 names api_token_env, 429 with exponential backoff up to 3 retries (honours Retry-After), 5xx retries once, image download retries once on transient failure.
- internal/backend/replicate_pricing.go — hardcoded per-image USD rates for known FLUX models, snapshotted from replicate.com/pricing with a refresh TODO.
- internal/backend/replicate_test.go — mocked-HTTP unit tests covering happy path (model + version-pinned), 401, 429 retry policy, failed prediction, poll timeout, image-download retry, ctx cancel, BackendOpts passthrough, default_steps, aspect-ratio reduction, sha256 prompt hash.
- internal/usage/usage.go — Supabase REST sink + read-side query for mai.imagen_usage. Adapter writes are best-effort: failures warn but the image still lands.
- cmd/imagen/usage.go — `imagen usage [--since DATE] [--raw]` reads the table and prints a tab-aligned grouped or raw table with totals.
- cmd/imagen/backends.go — instances of type=replicate now report "ok" or "not configured (set REPLICATE_API_TOKEN)" depending on env.
- internal/config/config.go — sample adds flux-schnell-replicate + flux-dev-replicate; default_backend stays flux-schnell-local.
- Supabase migration mai.imagen_usage (id, created_at, backend, model, seed, prompt_hash, latency_ms, cost_usd_estimate, caller) + indexes on (created_at DESC) and (caller). The raw prompt is never stored.

Caller identity resolves from MAI_FROM_ID, then the tmux pane's @mai-name option, mirroring the maimcp identity logic. Prompt hash is sha256 of the user-facing prompt; raw prompt never reaches the table.
161 lines
4.9 KiB
Go
161 lines
4.9 KiB
Go
// Package usage records per-call cost-tracking rows for the imagen CLI
|
|
// to mai.imagen_usage on Supabase. The writer is best-effort by design —
|
|
// the calling adapter logs failures and proceeds, because the image
|
|
// itself has already landed on disk by the time we record.
|
|
package usage
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
"mgit.msbls.de/m/ImaGen/internal/backend"
|
|
)
|
|
|
|
// supabaseSchema is the PostgREST schema name sent in the Accept-Profile
// and Content-Profile headers, so requests resolve against
// mai.imagen_usage.
const supabaseSchema = "mai"
|
|
|
|
// SupabaseSink reads and writes usage rows through Supabase's PostgREST
// endpoint. Requests carry Accept-Profile / Content-Profile headers so
// they target the mai schema instead of public.
type SupabaseSink struct {
	// URL is the Supabase base URL (SUPABASE_URL), e.g.
	// https://msup.msbls.de. Request paths are appended directly to it.
	URL string

	// APIKey is the service key (SUPABASE_SERVICE_KEY). It is sent both as
	// the apikey header and as the bearer token.
	APIKey string

	// HTTP is the client used to perform the requests.
	HTTP *http.Client
}
|
|
|
|
// NewSupabaseSinkFromEnv reads SUPABASE_URL and SUPABASE_SERVICE_KEY
|
|
// (falling back to MAI_SUPABASE_KEY) and returns a sink ready to use.
|
|
// Returns nil + ok=false if the env vars are not configured — the CLI
|
|
// uses that to skip cost-tracking gracefully.
|
|
func NewSupabaseSinkFromEnv() (*SupabaseSink, bool) {
|
|
u := strings.TrimRight(os.Getenv("SUPABASE_URL"), "/")
|
|
if u == "" {
|
|
return nil, false
|
|
}
|
|
key := os.Getenv("SUPABASE_SERVICE_KEY")
|
|
if key == "" {
|
|
key = os.Getenv("MAI_SUPABASE_KEY")
|
|
}
|
|
if key == "" {
|
|
return nil, false
|
|
}
|
|
return &SupabaseSink{
|
|
URL: u,
|
|
APIKey: key,
|
|
HTTP: &http.Client{Timeout: 10 * time.Second},
|
|
}, true
|
|
}
|
|
|
|
// supabaseRow is the JSON body POSTed for one usage record. Seed,
// CostUSDEstimate and Caller are left out of the payload when unset
// (omitempty); the remaining fields are always sent.
type supabaseRow struct {
	Backend         string   `json:"backend"`
	Model           string   `json:"model"`
	Seed            *int64   `json:"seed,omitempty"`
	PromptHash      string   `json:"prompt_hash"`
	LatencyMs       int      `json:"latency_ms"`
	CostUSDEstimate *float64 `json:"cost_usd_estimate,omitempty"`
	Caller          string   `json:"caller,omitempty"`
}
|
|
|
|
// Record inserts one row into mai.imagen_usage.
|
|
func (s *SupabaseSink) Record(ctx context.Context, row backend.UsageRow) error {
|
|
body, err := json.Marshal(supabaseRow{
|
|
Backend: row.Backend,
|
|
Model: row.Model,
|
|
Seed: row.Seed,
|
|
PromptHash: row.PromptHash,
|
|
LatencyMs: row.LatencyMs,
|
|
CostUSDEstimate: row.CostUSDEstimate,
|
|
Caller: row.Caller,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("usage: marshal: %w", err)
|
|
}
|
|
|
|
endpoint := s.URL + "/rest/v1/imagen_usage"
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Header.Set("apikey", s.APIKey)
|
|
req.Header.Set("Authorization", "Bearer "+s.APIKey)
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req.Header.Set("Accept-Profile", supabaseSchema)
|
|
req.Header.Set("Content-Profile", supabaseSchema)
|
|
req.Header.Set("Prefer", "return=minimal")
|
|
|
|
resp, err := s.HTTP.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("usage: POST: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
respBody, _ := io.ReadAll(resp.Body)
|
|
return fmt.Errorf("usage: POST %d: %s", resp.StatusCode, snip(respBody))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Row is one record as read back from mai.imagen_usage, limited to the
// columns the CLI needs. Seed, LatencyMs, CostUSDEstimate and Caller are
// pointers and stay nil when the JSON value is null.
type Row struct {
	CreatedAt       time.Time `json:"created_at"`
	Backend         string    `json:"backend"`
	Model           string    `json:"model"`
	Seed            *int64    `json:"seed"`
	PromptHash      string    `json:"prompt_hash"`
	LatencyMs       *int      `json:"latency_ms"`
	CostUSDEstimate *float64  `json:"cost_usd_estimate"`
	Caller          *string   `json:"caller"`
}
|
|
|
|
// Query returns rows from mai.imagen_usage filtered by created_at >= since.
|
|
// Pass zero time to fetch the full table (capped server-side by PostgREST
|
|
// — we set a hard 5000-row limit here too).
|
|
func (s *SupabaseSink) Query(ctx context.Context, since time.Time) ([]Row, error) {
|
|
q := url.Values{}
|
|
q.Set("select", "created_at,backend,model,seed,prompt_hash,latency_ms,cost_usd_estimate,caller")
|
|
q.Set("order", "created_at.desc")
|
|
q.Set("limit", "5000")
|
|
if !since.IsZero() {
|
|
q.Set("created_at", "gte."+since.UTC().Format(time.RFC3339))
|
|
}
|
|
endpoint := s.URL + "/rest/v1/imagen_usage?" + q.Encode()
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req.Header.Set("apikey", s.APIKey)
|
|
req.Header.Set("Authorization", "Bearer "+s.APIKey)
|
|
req.Header.Set("Accept-Profile", supabaseSchema)
|
|
|
|
resp, err := s.HTTP.Do(req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("usage: GET: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
body, _ := io.ReadAll(resp.Body)
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
return nil, fmt.Errorf("usage: GET %d: %s", resp.StatusCode, snip(body))
|
|
}
|
|
var rows []Row
|
|
if err := json.Unmarshal(body, &rows); err != nil {
|
|
return nil, fmt.Errorf("usage: parse rows: %w (body: %s)", err, snip(body))
|
|
}
|
|
return rows, nil
|
|
}
|
|
|
|
// snip turns a response body into a short string for error messages.
//
// Surrounding whitespace is trimmed. If the result is longer than 500
// bytes it is cut to at most 500 bytes and "..." is appended. The cut
// backs up (by at most three bytes) so that it never lands in the middle
// of a multi-byte UTF-8 sequence.
func snip(b []byte) string {
	const limit = 500
	s := strings.TrimSpace(string(b))
	if len(s) <= limit {
		return s
	}

	// s[cut] is the first byte that gets dropped. If it is a UTF-8
	// continuation byte (10xxxxxx) the rune before it would be split, so
	// move the cut back to that rune's first byte. A rune has at most three
	// continuation bytes, which bounds the walk even for invalid input.
	cut := limit
	for back := 0; back < 3 && cut > 0 && s[cut]&0xC0 == 0x80; back++ {
		cut--
	}
	return s[:cut] + "..."
}
|