package scheduler

import (
	"context"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"slices"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"mgit.msbls.de/m/mGPUmanager/internal/config"
	"mgit.msbls.de/m/mGPUmanager/internal/gpu"
	"mgit.msbls.de/m/mGPUmanager/internal/registry"
)

// vramCushionMiB is the minimum free VRAM (in MiB) the scheduler insists on
// having AFTER the target consumer is loaded. Keeps cudaMalloc headers from
// OOM-ing at the very edge of available memory.
const vramCushionMiB = 256

// maxEvictAttempts caps how many consumers the scheduler will unload in a
// single ensureFits cycle before giving up and returning an error. Five is
// generous — we only have four consumers configured.
const maxEvictAttempts = 5

// Evicting is the step 5 ("Schritt 5") scheduler: it wraps a Locked scheduler
// with VRAM-pressure-aware eviction.
//
// Flow per job:
//  1. ensureFits — if the live free VRAM minus a 256 MiB cushion is below
//     the target consumer's vram_resident_mib AND the target is not already
//     resident, unload the LRU non-coexistent consumer. Repeat until fit.
//  2. ensureLoaded — if the target was previously unloaded, call its
//     load endpoint (mvoice) or rely on implicit cold-start (whisper, etc.).
//  3. inner.Run — acquire the global GPU lock and run the job.
//
// Eviction state is scheduler-local: registry.Loaded (polled every 5 s) is
// authoritative when the consumer reports it, but for the seconds between an
// unload call and the next probe we rely on our own bookkeeping.
//
// NOTE(review): the code visible in this file never copies registry.Loaded
// back into the loaded map — confirm where that reconciliation happens.
type Evicting struct {
	cfg    *config.Config     // static configuration, including the consumer table
	reg    *registry.Registry // consulted for LastUsed when no local timestamp exists
	gpu    *gpu.Poller        // source of the most recent free/total VRAM sample
	inner  *Locked            // wrapped scheduler that actually runs the job
	logger *slog.Logger
	client *http.Client // used for the load/unload HTTP calls to consumers

	mu       sync.Mutex           // guards loaded and lastUsed
	loaded   map[string]bool      // consumer name -> believed-resident
	lastUsed map[string]time.Time // consumer name -> time of its last successful job

	evictions int64 // eviction counter; read and written via sync/atomic
}

// NewEvicting builds the step 5 ("Schritt 5") scheduler. Each consumer's
// initial believed-loaded state comes from initialLoaded: most consumers are
// assumed resident at startup, while consumers that only have an unload route
// start as not loaded. Later health probes are expected to correct any
// consumers whose real state differs (e.g. mvoice in 'unloaded' state) —
// that correction is not part of this constructor.
func NewEvicting(cfg *config.Config, reg *registry.Registry, gpuPoller *gpu.Poller, logger *slog.Logger) *Evicting { e := &Evicting{ cfg: cfg, reg: reg, gpu: gpuPoller, inner: NewLocked(reg, 1), logger: logger, client: &http.Client{Timeout: 30 * time.Second}, loaded: make(map[string]bool, len(cfg.Consumers)), lastUsed: make(map[string]time.Time, len(cfg.Consumers)), } for name, cons := range cfg.Consumers { e.loaded[name] = initialLoaded(cons) } return e } // initialLoaded picks the believed-loaded state for a consumer at scheduler // startup. The rule: // // - VRAM-managed (ollama): true — we never track or evict it. // - Has a load route AND an unload route (mvoice): true — the consumer // is set up to be controllable in both directions, and typically // preloads on its own systemd-managed startup. // - Has only an unload route, no load route (comfyui): false — lazy. // FLUX isn't resident until the first /prompt; until that happens we // don't account for its VRAM cost. // - Has a systemd_unit but no HTTP routes (whisper-server): true — these // are always-on services that load their model at process start. // - Neither: true — fallback, assume it's there if the consumer is up. // // Getting this right matters for the eviction smoke test: if comfyui were // believed loaded at startup, ensureFits would short-circuit on the first // /v1/image request and never trigger eviction. (m/mGPUmanager#1 live deploy.) func initialLoaded(cons *config.Consumer) bool { if cons.VRAMManaged { return true } if cons.Load == nil && cons.Unload != nil { return false } return true } // Run is the public Scheduler interface: ensure room + load + serialise. 
func (e *Evicting) Run(ctx context.Context, consumer string, fn Job) error { if err := e.ensureFits(ctx, consumer); err != nil { return fmt.Errorf("eviction: %w", err) } if err := e.ensureLoaded(ctx, consumer); err != nil { return fmt.Errorf("load %s: %w", consumer, err) } err := e.inner.Run(ctx, consumer, fn) if err == nil { e.mu.Lock() e.lastUsed[consumer] = time.Now() e.mu.Unlock() } return err } // Stats forwards from the inner scheduler and adds the eviction counter. func (e *Evicting) Stats() Stats { s := e.inner.Stats() s.Evictions = atomic.LoadInt64(&e.evictions) return s } // ───── ensureFits ──────────────────────────────────────────────────────── func (e *Evicting) ensureFits(ctx context.Context, target string) error { cons := e.cfg.Consumers[target] if cons == nil { return fmt.Errorf("unknown consumer %q", target) } if cons.VRAMResidentMiB == 0 || cons.VRAMManaged { // Self-managed (ollama) or unknown size — let the consumer figure // it out; no preemptive eviction. return nil } // Already resident? No eviction needed. e.mu.Lock() resident := e.loaded[target] e.mu.Unlock() if resident { return nil } for range maxEvictAttempts { if e.fits(cons) { return nil } victim := e.pickLRUVictim(target, cons) if victim == "" { // Nothing left to evict that we're allowed to touch. e.logger.Warn("no eviction candidates", "target", target, "need_mib", cons.VRAMResidentMiB, "free_mib", e.gpu.Last().FreeMiB) return nil } if err := e.unload(ctx, victim); err != nil { e.logger.Warn("evict failed", "victim", victim, "err", err) return fmt.Errorf("unload %s: %w", victim, err) } atomic.AddInt64(&e.evictions, 1) e.logger.Info("evicted consumer", "victim", victim, "target", target, "free_mib_after", e.gpu.Last().FreeMiB, "need_mib", cons.VRAMResidentMiB) // Give the GPU a moment to actually free the VRAM before re-checking. 
select { case <-time.After(1 * time.Second): case <-ctx.Done(): return ctx.Err() } } return fmt.Errorf("VRAM headroom still insufficient after %d evictions", maxEvictAttempts) } // fits returns true when the live nvidia-smi free VRAM minus the safety // cushion is enough for the target consumer's predicted footprint. // // Falls back to the static budget (cfg.GPU.AvailableMiB() minus the // non-coexistent loaded set) if the GPU poller has not produced a sample // yet (e.g. during the first second of process lifetime). func (e *Evicting) fits(cons *config.Consumer) bool { sample := e.gpu.Last() if sample.FreeMiB > 0 || sample.TotalMiB > 0 { return sample.FreeMiB >= cons.VRAMResidentMiB+vramCushionMiB } return e.fitsByBudget(cons) } func (e *Evicting) fitsByBudget(cons *config.Consumer) bool { headroom := e.cfg.GPU.AvailableMiB() e.mu.Lock() defer e.mu.Unlock() for name, loaded := range e.loaded { if !loaded { continue } other := e.cfg.Consumers[name] if other == nil || other.VRAMManaged { continue } if slices.Contains(cons.CanCoexistWith, name) { continue } headroom -= other.VRAMResidentMiB } return headroom >= cons.VRAMResidentMiB } // pickLRUVictim returns the name of the loaded consumer with the oldest // LastUsed that is NOT in target's can_coexist_with list, NOT the target // itself, NOT VRAM-managed, and has *some* way to be evicted. func (e *Evicting) pickLRUVictim(target string, cons *config.Consumer) string { snap := e.reg.Snapshot() e.mu.Lock() defer e.mu.Unlock() var best string var bestTime time.Time for name, loaded := range e.loaded { if !loaded || name == target { continue } other := e.cfg.Consumers[name] if other == nil || other.VRAMManaged { continue } if slices.Contains(cons.CanCoexistWith, name) { continue } if other.Unload == nil && other.SystemdUnit == "" { continue } // LastUsed: prefer scheduler-local (set on successful job exit) over // registry (set on probe completion). 
Scheduler-local is more // meaningful for LRU because it reflects real GPU work, not health // chatter. t := e.lastUsed[name] if t.IsZero() { t = snap[name].LastUsed } if best == "" || t.Before(bestTime) { best = name bestTime = t } } return best } // ───── unload + load ───────────────────────────────────────────────────── func (e *Evicting) unload(ctx context.Context, name string) error { cons := e.cfg.Consumers[name] if cons.Unload == nil { // systemd-unit-based unload is whisper-server's path; we don't shell // out to sudo from a server daemon in Phase 1. Mark unloaded so we // don't keep picking it as a victim, and let the next request // cold-start via systemd (whisper-server boots in <2 s). if cons.SystemdUnit != "" { e.mu.Lock() e.loaded[name] = false e.mu.Unlock() return nil } return fmt.Errorf("consumer %s: no unload route configured", name) } url := cons.URL + cons.Unload.Path var body io.Reader if cons.Unload.Body != "" { body = strings.NewReader(cons.Unload.Body) } req, err := http.NewRequestWithContext(ctx, cons.Unload.Method, url, body) if err != nil { return err } if cons.Unload.Body != "" { req.Header.Set("Content-Type", "application/json") } resp, err := e.client.Do(req) if err != nil { return err } defer resp.Body.Close() io.Copy(io.Discard, resp.Body) if resp.StatusCode >= 400 { return fmt.Errorf("unload %s returned status %d", name, resp.StatusCode) } e.mu.Lock() e.loaded[name] = false e.mu.Unlock() return nil } func (e *Evicting) ensureLoaded(ctx context.Context, name string) error { cons := e.cfg.Consumers[name] if cons == nil { return fmt.Errorf("unknown consumer %q", name) } e.mu.Lock() if e.loaded[name] { e.mu.Unlock() return nil } e.mu.Unlock() // No explicit load endpoint — rely on the consumer's own cold-start // behaviour (mvoice would auto-load if a request arrived, comfyui as // well). Mark loaded optimistically. 
if cons.Load == nil { e.mu.Lock() e.loaded[name] = true e.mu.Unlock() return nil } url := cons.URL + cons.Load.Path var body io.Reader if cons.Load.Body != "" { body = strings.NewReader(cons.Load.Body) } req, err := http.NewRequestWithContext(ctx, cons.Load.Method, url, body) if err != nil { return err } resp, err := e.client.Do(req) if err != nil { return err } defer resp.Body.Close() io.Copy(io.Discard, resp.Body) if resp.StatusCode >= 400 { return fmt.Errorf("load %s returned status %d", name, resp.StatusCode) } e.mu.Lock() e.loaded[name] = true e.mu.Unlock() return nil } // SetLoadedForTest overrides the believed-loaded state for one consumer. // Test-only — production code derives it from health probes + unload calls. func (e *Evicting) SetLoadedForTest(name string, loaded bool) { e.mu.Lock() defer e.mu.Unlock() e.loaded[name] = loaded } // Compile-time interface guard. var _ Scheduler = (*Evicting)(nil)