mAi: #3 - Replicate adapter, mai.imagen_usage cost-tracking, usage CLI
Implements the Replicate API backend (FLUX schnell / FLUX dev) per ImaGen issue #3: - internal/backend/replicate.go — Backend adapter. Supports model refs as "owner/name" (uses /v1/models/{owner}/{name}/predictions) and "owner/name:hash" (uses /v1/predictions with explicit version). Polls /v1/predictions/{id} every 500ms with model-aware timeout (60s schnell, 120s dev). Resilience: 401 names api_token_env, 429 with exp backoff up to 3 retries (honours Retry-After), 5xx retries once, image download retries once on transient failure. - internal/backend/replicate_pricing.go — hardcoded per-image USD rates for known FLUX models, snapshotted from replicate.com/pricing with a refresh TODO. - internal/backend/replicate_test.go — mocked-HTTP unit tests covering happy path (model + version-pinned), 401, 429 retry policy, failed prediction, poll timeout, image-download retry, ctx cancel, BackendOpts passthrough, default_steps, aspect-ratio reduction, sha256 prompt hash. - internal/usage/usage.go — Supabase REST sink + read-side query for mai.imagen_usage. Adapter writes are best-effort: failures warn but the image still lands. - cmd/imagen/usage.go — `imagen usage [--since DATE] [--raw]` reads the table and prints a tab-aligned grouped or raw table with totals. - cmd/imagen/backends.go — instances of type=replicate now report "ok" or "not configured (set REPLICATE_API_TOKEN)" depending on env. - internal/config/config.go — sample adds flux-schnell-replicate + flux-dev-replicate; default_backend stays flux-schnell-local. - Supabase migration mai.imagen_usage (id, created_at, backend, model, seed, prompt_hash, latency_ms, cost_usd_estimate, caller) + indexes on (created_at DESC) and (caller). The raw prompt is never stored. Caller identity resolves from MAI_FROM_ID, then the tmux pane's @mai-name option, mirroring the maimcp identity logic. Prompt hash is sha256 of the user-facing prompt; raw prompt never reaches the table.
This commit is contained in:
@@ -10,6 +10,27 @@ import (
|
||||
"mgit.msbls.de/m/ImaGen/internal/config"
|
||||
)
|
||||
|
||||
// instanceStatus checks adapter-specific preconditions (e.g. the
|
||||
// Replicate API token env var being set) and returns a short
|
||||
// user-facing status string.
|
||||
func instanceStatus(spec config.BackendSpec) string {
|
||||
if !backend.Default.Has(spec.Type) {
|
||||
return fmt.Sprintf("type %q not compiled in", spec.Type)
|
||||
}
|
||||
switch spec.Type {
|
||||
case backend.ReplicateType:
|
||||
envName, _ := spec.Raw["api_token_env"].(string)
|
||||
if envName == "" {
|
||||
envName = "REPLICATE_API_TOKEN"
|
||||
}
|
||||
if os.Getenv(envName) == "" {
|
||||
return fmt.Sprintf("not configured (set %s)", envName)
|
||||
}
|
||||
return "ok"
|
||||
}
|
||||
return "registered"
|
||||
}
|
||||
|
||||
func runBackends(args []string) error {
|
||||
fs := flag.NewFlagSet("backends", flag.ContinueOnError)
|
||||
var configPath string
|
||||
@@ -27,10 +48,7 @@ func runBackends(args []string) error {
|
||||
fmt.Fprintln(tw, "INSTANCE\tTYPE\tSTATUS")
|
||||
if cfg != nil {
|
||||
for name, spec := range cfg.Backends {
|
||||
status := "registered"
|
||||
if !backend.Default.Has(spec.Type) {
|
||||
status = fmt.Sprintf("type %q not compiled in", spec.Type)
|
||||
}
|
||||
status := instanceStatus(spec)
|
||||
marker := ""
|
||||
if name == cfg.DefaultBackend {
|
||||
marker = " (default)"
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"mgit.msbls.de/m/ImaGen/internal/output"
|
||||
"mgit.msbls.de/m/ImaGen/internal/preview"
|
||||
"mgit.msbls.de/m/ImaGen/internal/prompt"
|
||||
"mgit.msbls.de/m/ImaGen/internal/usage"
|
||||
)
|
||||
|
||||
func runGenerate(ctx context.Context, args []string) error {
|
||||
@@ -81,6 +82,7 @@ func runGenerate(ctx context.Context, args []string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
attachUsageSink(be)
|
||||
|
||||
finalPrompt, err := prompt.Apply(rawPrompt, style)
|
||||
if err != nil {
|
||||
@@ -219,6 +221,21 @@ func parseSize(s string) (int, int, error) {
|
||||
return w, h, nil
|
||||
}
|
||||
|
||||
// attachUsageSink wires a Supabase cost-tracking sink into the backend
|
||||
// when it accepts one and the env is configured. Adapters that record
|
||||
// usage expose a public Sink field of type backend.UsageSink.
|
||||
func attachUsageSink(be backend.Backend) {
|
||||
r, ok := be.(*backend.Replicate)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
sink, ok := usage.NewSupabaseSinkFromEnv()
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
r.Sink = sink
|
||||
}
|
||||
|
||||
func buildBackend(cfg *config.Config, name string) (backend.Backend, error) {
|
||||
if cfg != nil {
|
||||
spec, ok := cfg.Backends[name]
|
||||
|
||||
@@ -14,7 +14,7 @@ import (
|
||||
_ "mgit.msbls.de/m/ImaGen/internal/backend"
|
||||
)
|
||||
|
||||
const usage = `imagen — model-agnostic image generation
|
||||
const helpText = `imagen — model-agnostic image generation
|
||||
|
||||
Usage:
|
||||
imagen generate <prompt> [flags] generate one image
|
||||
@@ -22,6 +22,7 @@ Usage:
|
||||
imagen config init print a sample imagen.yaml on stdout
|
||||
imagen config validate validate the active config
|
||||
imagen serve [--addr :8080] (stub) start the HTTP server
|
||||
imagen usage [--since DATE] show cost-tracking rows
|
||||
imagen version print version
|
||||
imagen help show this help
|
||||
|
||||
@@ -33,7 +34,7 @@ var Version = "dev"
|
||||
|
||||
func main() {
|
||||
if len(os.Args) < 2 {
|
||||
fmt.Fprint(os.Stderr, usage)
|
||||
fmt.Fprint(os.Stderr, helpText)
|
||||
os.Exit(2)
|
||||
}
|
||||
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
||||
@@ -50,12 +51,14 @@ func main() {
|
||||
err = runConfig(args)
|
||||
case "serve":
|
||||
err = runServe(args)
|
||||
case "usage":
|
||||
err = runUsage(ctx, args)
|
||||
case "version", "-v", "--version":
|
||||
fmt.Println(Version)
|
||||
case "help", "-h", "--help":
|
||||
fmt.Print(usage)
|
||||
fmt.Print(helpText)
|
||||
default:
|
||||
fmt.Fprintf(os.Stderr, "imagen: unknown subcommand %q\n\n%s", os.Args[1], usage)
|
||||
fmt.Fprintf(os.Stderr, "imagen: unknown subcommand %q\n\n%s", os.Args[1], helpText)
|
||||
os.Exit(2)
|
||||
}
|
||||
if err != nil {
|
||||
|
||||
189
cmd/imagen/usage.go
Normal file
189
cmd/imagen/usage.go
Normal file
@@ -0,0 +1,189 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
"text/tabwriter"
|
||||
"time"
|
||||
|
||||
"mgit.msbls.de/m/ImaGen/internal/usage"
|
||||
)
|
||||
|
||||
// runUsage handles `imagen usage [--since DATE]`. Reads mai.imagen_usage
|
||||
// via Supabase REST and prints a tab-aligned table grouped by week +
|
||||
// backend + model + caller, with totals at the bottom.
|
||||
func runUsage(ctx context.Context, args []string) error {
|
||||
fs := flag.NewFlagSet("usage", flag.ContinueOnError)
|
||||
var (
|
||||
since string
|
||||
raw bool
|
||||
)
|
||||
fs.StringVar(&since, "since", "", "ISO date (YYYY-MM-DD) — only rows on/after this UTC date")
|
||||
fs.BoolVar(&raw, "raw", false, "print one line per row instead of grouped")
|
||||
fs.Usage = func() {
|
||||
fmt.Fprintln(fs.Output(), "Usage: imagen usage [--since YYYY-MM-DD] [--raw]")
|
||||
fs.PrintDefaults()
|
||||
}
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var sinceT time.Time
|
||||
if since != "" {
|
||||
t, err := time.Parse("2006-01-02", since)
|
||||
if err != nil {
|
||||
return userErr("--since must be YYYY-MM-DD: %v", err)
|
||||
}
|
||||
sinceT = t
|
||||
}
|
||||
|
||||
sink, ok := usage.NewSupabaseSinkFromEnv()
|
||||
if !ok {
|
||||
return userErr("SUPABASE_URL and SUPABASE_SERVICE_KEY (or MAI_SUPABASE_KEY) must be set to read mai.imagen_usage")
|
||||
}
|
||||
rows, err := sink.Query(ctx, sinceT)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if raw {
|
||||
printRawRows(rows)
|
||||
return nil
|
||||
}
|
||||
printGroupedRows(rows)
|
||||
return nil
|
||||
}
|
||||
|
||||
func printRawRows(rows []usage.Row) {
|
||||
tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
|
||||
fmt.Fprintln(tw, "TIME\tBACKEND\tMODEL\tCALLER\tLATENCY_MS\tCOST_USD")
|
||||
var totalCost float64
|
||||
for _, r := range rows {
|
||||
fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%s\t%s\n",
|
||||
r.CreatedAt.Local().Format("2006-01-02 15:04"),
|
||||
r.Backend,
|
||||
r.Model,
|
||||
derefString(r.Caller),
|
||||
intOrDash(r.LatencyMs),
|
||||
costOrDash(r.CostUSDEstimate),
|
||||
)
|
||||
if r.CostUSDEstimate != nil {
|
||||
totalCost += *r.CostUSDEstimate
|
||||
}
|
||||
}
|
||||
fmt.Fprintf(tw, "\t\t\t\t%d rows\t%.4f USD\n", len(rows), totalCost)
|
||||
_ = tw.Flush()
|
||||
}
|
||||
|
||||
type group struct {
|
||||
week string
|
||||
backend string
|
||||
model string
|
||||
caller string
|
||||
count int
|
||||
cost float64
|
||||
costSet bool
|
||||
}
|
||||
|
||||
type groupKey struct {
|
||||
week, backend, model, caller string
|
||||
}
|
||||
|
||||
func printGroupedRows(rows []usage.Row) {
|
||||
groups := map[groupKey]*group{}
|
||||
for _, r := range rows {
|
||||
caller := derefString(r.Caller)
|
||||
k := groupKey{
|
||||
week: weekStart(r.CreatedAt).Format("2006-01-02"),
|
||||
backend: r.Backend,
|
||||
model: r.Model,
|
||||
caller: caller,
|
||||
}
|
||||
g, ok := groups[k]
|
||||
if !ok {
|
||||
g = &group{week: k.week, backend: r.Backend, model: r.Model, caller: caller}
|
||||
groups[k] = g
|
||||
}
|
||||
g.count++
|
||||
if r.CostUSDEstimate != nil {
|
||||
g.cost += *r.CostUSDEstimate
|
||||
g.costSet = true
|
||||
}
|
||||
}
|
||||
|
||||
keys := make([]groupKey, 0, len(groups))
|
||||
for k := range groups {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Slice(keys, func(i, j int) bool {
|
||||
if keys[i].week != keys[j].week {
|
||||
return keys[i].week > keys[j].week // newest first
|
||||
}
|
||||
if keys[i].backend != keys[j].backend {
|
||||
return keys[i].backend < keys[j].backend
|
||||
}
|
||||
if keys[i].model != keys[j].model {
|
||||
return keys[i].model < keys[j].model
|
||||
}
|
||||
return keys[i].caller < keys[j].caller
|
||||
})
|
||||
|
||||
tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
|
||||
fmt.Fprintln(tw, "WEEK_OF\tBACKEND\tMODEL\tCALLER\tCOUNT\tCOST_USD")
|
||||
var totalCount int
|
||||
var totalCost float64
|
||||
for _, k := range keys {
|
||||
g := groups[k]
|
||||
fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%d\t%s\n",
|
||||
g.week, g.backend, g.model, g.caller, g.count, costStr(g.cost, g.costSet),
|
||||
)
|
||||
totalCount += g.count
|
||||
totalCost += g.cost
|
||||
}
|
||||
fmt.Fprintf(tw, "\t\t\tTOTAL\t%d\t%.4f USD\n", totalCount, totalCost)
|
||||
_ = tw.Flush()
|
||||
}
|
||||
|
||||
// weekStart returns the Monday of the week containing t (UTC).
|
||||
func weekStart(t time.Time) time.Time {
|
||||
t = t.UTC()
|
||||
wd := int(t.Weekday())
|
||||
if wd == 0 {
|
||||
wd = 7 // shift Sunday to end-of-week
|
||||
}
|
||||
delta := time.Duration(wd-1) * -24 * time.Hour
|
||||
d := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
|
||||
return d.Add(delta)
|
||||
}
|
||||
|
||||
func derefString(s *string) string {
|
||||
if s == nil {
|
||||
return ""
|
||||
}
|
||||
return *s
|
||||
}
|
||||
|
||||
func intOrDash(p *int) string {
|
||||
if p == nil {
|
||||
return "-"
|
||||
}
|
||||
return fmt.Sprintf("%d", *p)
|
||||
}
|
||||
|
||||
func costOrDash(p *float64) string {
|
||||
if p == nil {
|
||||
return "-"
|
||||
}
|
||||
return fmt.Sprintf("%.4f", *p)
|
||||
}
|
||||
|
||||
func costStr(v float64, set bool) string {
|
||||
if !set {
|
||||
return "-"
|
||||
}
|
||||
return strings.TrimSpace(fmt.Sprintf("%.4f", v))
|
||||
}
|
||||
Reference in New Issue
Block a user