Live deploy on mRock surfaced a Schritt 5 bug: comfyui was always
treated as preloaded at scheduler startup, which made ensureFits()
short-circuit on the very first /v1/image request — exactly the
scenario eviction is supposed to handle. mvoice was never picked as
a victim, so ComfyUI then OOM'd loading FLUX on top of the still-resident
mvoice.
Fix: replace the blanket 'every consumer starts loaded' init with a
heuristic — initialLoaded(cons):
- VRAMManaged (ollama): true. We never track/evict it; the consumer
runs its own LRU.
- Load+Unload both present (mvoice): true. Designed to be controllable;
typically preloads in its own lifespan.
- Unload only, no Load (comfyui): false. Lazy — FLUX isn't resident
until the first /prompt, so we shouldn't bill its 13 GiB against the
GPU budget until then.
- SystemdUnit only (whisper-server): true. Always-on, model loaded at
process start.
- Empty: true. Safe fallback.
Verified live on mRock (2026-05-15):
Before /v1/image: nvidia-smi 8963 MiB used; mvoice gpu_resident_mib 2345
POST /v1/image: HTTP 400 from upstream (empty workflow), broker did
trigger eviction before forwarding
After: nvidia-smi 6547 MiB used; mvoice gpu_resident_mib 9
(~CUDA context only); scheduler.evictions = 2
POST /v1/tts: audio_url returned, tts_ms 670, audio 3.5 s
After reload: nvidia-smi 8943 MiB used; mvoice gpu_resident_mib 2917
Test: TestInitialLoadedHeuristic pins the four cases down so this
doesn't regress when someone adds a fifth consumer type.
Refs: m/mGPUmanager#1 (live deploy).
355 lines
11 KiB
Go
355 lines
11 KiB
Go
package scheduler
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net/http"
|
|
"slices"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"mgit.msbls.de/m/mGPUmanager/internal/config"
|
|
"mgit.msbls.de/m/mGPUmanager/internal/gpu"
|
|
"mgit.msbls.de/m/mGPUmanager/internal/registry"
|
|
)
|
|
|
|
const (
	// vramCushionMiB is the amount of VRAM, in MiB, that must still be free
	// AFTER the target consumer has been loaded. The margin keeps cudaMalloc
	// headers from OOM-ing right at the edge of available memory.
	vramCushionMiB = 256

	// maxEvictAttempts bounds the number of consumers a single ensureFits
	// cycle may unload before it gives up with an error. Five is generous —
	// only four consumers are configured.
	maxEvictAttempts = 5
)
|
|
|
|
// Evicting is the Schritt 5 scheduler: it wraps a Locked scheduler with
|
|
// VRAM-pressure-aware eviction.
|
|
//
|
|
// Flow per job:
|
|
// 1. ensureFits — if the live free VRAM minus a 256 MiB cushion is below
|
|
// the target consumer's vram_resident_mib AND the target is not already
|
|
// resident, unload the LRU non-coexistent consumer. Repeat until fit.
|
|
// 2. ensureLoaded — if the target was previously unloaded, call its
|
|
// load endpoint (mvoice) or rely on implicit cold-start (whisper, etc.).
|
|
// 3. inner.Run — acquire the global GPU lock and run the job.
|
|
//
|
|
// Eviction state is scheduler-local: registry.Loaded (polled every 5 s) is
|
|
// authoritative when the consumer reports it, but for the seconds between an
|
|
// unload call and the next probe we rely on our own bookkeeping.
|
|
type Evicting struct {
|
|
cfg *config.Config
|
|
reg *registry.Registry
|
|
gpu *gpu.Poller
|
|
inner *Locked
|
|
logger *slog.Logger
|
|
client *http.Client
|
|
|
|
mu sync.Mutex
|
|
loaded map[string]bool // consumer name -> believed-resident
|
|
lastUsed map[string]time.Time
|
|
evictions int64
|
|
}
|
|
|
|
// NewEvicting builds the Schritt 5 scheduler. All consumers are assumed
|
|
// resident at startup — the first health probe will correct any consumers
|
|
// that actually aren't (e.g. mvoice in 'unloaded' state).
|
|
func NewEvicting(cfg *config.Config, reg *registry.Registry, gpuPoller *gpu.Poller, logger *slog.Logger) *Evicting {
|
|
e := &Evicting{
|
|
cfg: cfg,
|
|
reg: reg,
|
|
gpu: gpuPoller,
|
|
inner: NewLocked(reg, 1),
|
|
logger: logger,
|
|
client: &http.Client{Timeout: 30 * time.Second},
|
|
loaded: make(map[string]bool, len(cfg.Consumers)),
|
|
lastUsed: make(map[string]time.Time, len(cfg.Consumers)),
|
|
}
|
|
for name, cons := range cfg.Consumers {
|
|
e.loaded[name] = initialLoaded(cons)
|
|
}
|
|
return e
|
|
}
|
|
|
|
// initialLoaded picks the believed-loaded state for a consumer at scheduler
|
|
// startup. The rule:
|
|
//
|
|
// - VRAM-managed (ollama): true — we never track or evict it.
|
|
// - Has a load route AND an unload route (mvoice): true — the consumer
|
|
// is set up to be controllable in both directions, and typically
|
|
// preloads on its own systemd-managed startup.
|
|
// - Has only an unload route, no load route (comfyui): false — lazy.
|
|
// FLUX isn't resident until the first /prompt; until that happens we
|
|
// don't account for its VRAM cost.
|
|
// - Has a systemd_unit but no HTTP routes (whisper-server): true — these
|
|
// are always-on services that load their model at process start.
|
|
// - Neither: true — fallback, assume it's there if the consumer is up.
|
|
//
|
|
// Getting this right matters for the eviction smoke test: if comfyui were
|
|
// believed loaded at startup, ensureFits would short-circuit on the first
|
|
// /v1/image request and never trigger eviction. (m/mGPUmanager#1 live deploy.)
|
|
func initialLoaded(cons *config.Consumer) bool {
|
|
if cons.VRAMManaged {
|
|
return true
|
|
}
|
|
if cons.Load == nil && cons.Unload != nil {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// Run is the public Scheduler interface: ensure room + load + serialise.
|
|
func (e *Evicting) Run(ctx context.Context, consumer string, fn Job) error {
|
|
if err := e.ensureFits(ctx, consumer); err != nil {
|
|
return fmt.Errorf("eviction: %w", err)
|
|
}
|
|
if err := e.ensureLoaded(ctx, consumer); err != nil {
|
|
return fmt.Errorf("load %s: %w", consumer, err)
|
|
}
|
|
err := e.inner.Run(ctx, consumer, fn)
|
|
if err == nil {
|
|
e.mu.Lock()
|
|
e.lastUsed[consumer] = time.Now()
|
|
e.mu.Unlock()
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Stats forwards from the inner scheduler and adds the eviction counter.
|
|
func (e *Evicting) Stats() Stats {
|
|
s := e.inner.Stats()
|
|
s.Evictions = atomic.LoadInt64(&e.evictions)
|
|
return s
|
|
}
|
|
|
|
// ───── ensureFits ────────────────────────────────────────────────────────
|
|
|
|
func (e *Evicting) ensureFits(ctx context.Context, target string) error {
|
|
cons := e.cfg.Consumers[target]
|
|
if cons == nil {
|
|
return fmt.Errorf("unknown consumer %q", target)
|
|
}
|
|
if cons.VRAMResidentMiB == 0 || cons.VRAMManaged {
|
|
// Self-managed (ollama) or unknown size — let the consumer figure
|
|
// it out; no preemptive eviction.
|
|
return nil
|
|
}
|
|
// Already resident? No eviction needed.
|
|
e.mu.Lock()
|
|
resident := e.loaded[target]
|
|
e.mu.Unlock()
|
|
if resident {
|
|
return nil
|
|
}
|
|
|
|
for range maxEvictAttempts {
|
|
if e.fits(cons) {
|
|
return nil
|
|
}
|
|
victim := e.pickLRUVictim(target, cons)
|
|
if victim == "" {
|
|
// Nothing left to evict that we're allowed to touch.
|
|
e.logger.Warn("no eviction candidates", "target", target,
|
|
"need_mib", cons.VRAMResidentMiB,
|
|
"free_mib", e.gpu.Last().FreeMiB)
|
|
return nil
|
|
}
|
|
if err := e.unload(ctx, victim); err != nil {
|
|
e.logger.Warn("evict failed", "victim", victim, "err", err)
|
|
return fmt.Errorf("unload %s: %w", victim, err)
|
|
}
|
|
atomic.AddInt64(&e.evictions, 1)
|
|
e.logger.Info("evicted consumer",
|
|
"victim", victim, "target", target,
|
|
"free_mib_after", e.gpu.Last().FreeMiB,
|
|
"need_mib", cons.VRAMResidentMiB)
|
|
// Give the GPU a moment to actually free the VRAM before re-checking.
|
|
select {
|
|
case <-time.After(1 * time.Second):
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
return fmt.Errorf("VRAM headroom still insufficient after %d evictions", maxEvictAttempts)
|
|
}
|
|
|
|
// fits returns true when the live nvidia-smi free VRAM minus the safety
|
|
// cushion is enough for the target consumer's predicted footprint.
|
|
//
|
|
// Falls back to the static budget (cfg.GPU.AvailableMiB() minus the
|
|
// non-coexistent loaded set) if the GPU poller has not produced a sample
|
|
// yet (e.g. during the first second of process lifetime).
|
|
func (e *Evicting) fits(cons *config.Consumer) bool {
|
|
sample := e.gpu.Last()
|
|
if sample.FreeMiB > 0 || sample.TotalMiB > 0 {
|
|
return sample.FreeMiB >= cons.VRAMResidentMiB+vramCushionMiB
|
|
}
|
|
return e.fitsByBudget(cons)
|
|
}
|
|
|
|
func (e *Evicting) fitsByBudget(cons *config.Consumer) bool {
|
|
headroom := e.cfg.GPU.AvailableMiB()
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
for name, loaded := range e.loaded {
|
|
if !loaded {
|
|
continue
|
|
}
|
|
other := e.cfg.Consumers[name]
|
|
if other == nil || other.VRAMManaged {
|
|
continue
|
|
}
|
|
if slices.Contains(cons.CanCoexistWith, name) {
|
|
continue
|
|
}
|
|
headroom -= other.VRAMResidentMiB
|
|
}
|
|
return headroom >= cons.VRAMResidentMiB
|
|
}
|
|
|
|
// pickLRUVictim returns the name of the loaded consumer with the oldest
|
|
// LastUsed that is NOT in target's can_coexist_with list, NOT the target
|
|
// itself, NOT VRAM-managed, and has *some* way to be evicted.
|
|
func (e *Evicting) pickLRUVictim(target string, cons *config.Consumer) string {
|
|
snap := e.reg.Snapshot()
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
var best string
|
|
var bestTime time.Time
|
|
for name, loaded := range e.loaded {
|
|
if !loaded || name == target {
|
|
continue
|
|
}
|
|
other := e.cfg.Consumers[name]
|
|
if other == nil || other.VRAMManaged {
|
|
continue
|
|
}
|
|
if slices.Contains(cons.CanCoexistWith, name) {
|
|
continue
|
|
}
|
|
if other.Unload == nil && other.SystemdUnit == "" {
|
|
continue
|
|
}
|
|
// LastUsed: prefer scheduler-local (set on successful job exit) over
|
|
// registry (set on probe completion). Scheduler-local is more
|
|
// meaningful for LRU because it reflects real GPU work, not health
|
|
// chatter.
|
|
t := e.lastUsed[name]
|
|
if t.IsZero() {
|
|
t = snap[name].LastUsed
|
|
}
|
|
if best == "" || t.Before(bestTime) {
|
|
best = name
|
|
bestTime = t
|
|
}
|
|
}
|
|
return best
|
|
}
|
|
|
|
// ───── unload + load ─────────────────────────────────────────────────────
|
|
|
|
func (e *Evicting) unload(ctx context.Context, name string) error {
|
|
cons := e.cfg.Consumers[name]
|
|
if cons.Unload == nil {
|
|
// systemd-unit-based unload is whisper-server's path; we don't shell
|
|
// out to sudo from a server daemon in Phase 1. Mark unloaded so we
|
|
// don't keep picking it as a victim, and let the next request
|
|
// cold-start via systemd (whisper-server boots in <2 s).
|
|
if cons.SystemdUnit != "" {
|
|
e.mu.Lock()
|
|
e.loaded[name] = false
|
|
e.mu.Unlock()
|
|
return nil
|
|
}
|
|
return fmt.Errorf("consumer %s: no unload route configured", name)
|
|
}
|
|
|
|
url := cons.URL + cons.Unload.Path
|
|
var body io.Reader
|
|
if cons.Unload.Body != "" {
|
|
body = strings.NewReader(cons.Unload.Body)
|
|
}
|
|
req, err := http.NewRequestWithContext(ctx, cons.Unload.Method, url, body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if cons.Unload.Body != "" {
|
|
req.Header.Set("Content-Type", "application/json")
|
|
}
|
|
resp, err := e.client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
io.Copy(io.Discard, resp.Body)
|
|
if resp.StatusCode >= 400 {
|
|
return fmt.Errorf("unload %s returned status %d", name, resp.StatusCode)
|
|
}
|
|
e.mu.Lock()
|
|
e.loaded[name] = false
|
|
e.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (e *Evicting) ensureLoaded(ctx context.Context, name string) error {
|
|
cons := e.cfg.Consumers[name]
|
|
if cons == nil {
|
|
return fmt.Errorf("unknown consumer %q", name)
|
|
}
|
|
e.mu.Lock()
|
|
if e.loaded[name] {
|
|
e.mu.Unlock()
|
|
return nil
|
|
}
|
|
e.mu.Unlock()
|
|
|
|
// No explicit load endpoint — rely on the consumer's own cold-start
|
|
// behaviour (mvoice would auto-load if a request arrived, comfyui as
|
|
// well). Mark loaded optimistically.
|
|
if cons.Load == nil {
|
|
e.mu.Lock()
|
|
e.loaded[name] = true
|
|
e.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
url := cons.URL + cons.Load.Path
|
|
var body io.Reader
|
|
if cons.Load.Body != "" {
|
|
body = strings.NewReader(cons.Load.Body)
|
|
}
|
|
req, err := http.NewRequestWithContext(ctx, cons.Load.Method, url, body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
resp, err := e.client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
io.Copy(io.Discard, resp.Body)
|
|
if resp.StatusCode >= 400 {
|
|
return fmt.Errorf("load %s returned status %d", name, resp.StatusCode)
|
|
}
|
|
e.mu.Lock()
|
|
e.loaded[name] = true
|
|
e.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// SetLoadedForTest overrides the believed-loaded state for one consumer.
|
|
// Test-only — production code derives it from health probes + unload calls.
|
|
func (e *Evicting) SetLoadedForTest(name string, loaded bool) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
e.loaded[name] = loaded
|
|
}
|
|
|
|
// Compile-time check that *Evicting satisfies the Scheduler interface.
|
|
var _ Scheduler = (*Evicting)(nil)
|