m's 14:56 observation: long Paliadin turns showed "Verbindung verloren —
Antwort wird nachgereicht …" but never delivered. The aichat backend
finished the turn upstream; paliad's HTTP client had given up at 130 s
and the legacy filesystem janitor never ran for the aichat path.
Three intertwined fixes, all shipped together because they share the
same wire shape and the same UI states:
1. Switch the aichat backend to /chat/turn/stream
- new AichatPaliadinService.RunTurnStream relays incremental chunks
- SSE parser handles default `data:` frames (chunk/meta/done/error)
and named `event: heartbeat` frames per the upstream contract
- no more 130 s hard ceiling — stream stays open as long as data or
heartbeats flow; silenceTimeout (90 s) catches a true upstream
stall instead
2. Proof-of-life thinking events
- handler emits `event: thinking` every 5 s while the upstream is
silent (synthesised locally) AND relays aichat's `heartbeat`
events as thinking pings
- frontend renders a lime-dot pulse + monospace counter inside the
assistant bubble — the user can SEE the chat is still working
3. Honest disconnect copy + real late-recovery
- new dispatching endpoint GET /api/paliadin/turns/{id}/recover
- aichat backend: asks aichat via GET /chat/conversations and
/chat/conversations/{id}/turns whether the turn actually finished
- legacy backend: falls through to the local row read (janitor)
- frontend swaps "wird nachgereicht" → "Lade frische Antwort …"
while the recovery polls; on confirmed "lost" swaps to
"Antwort konnte nicht zugestellt werden — bitte erneut stellen"
- migration 118 adds aichat_conversation_id to paliadin_turns so
the recovery has a fast path when the done frame arrived before
the drop
Streaming and aichat-side recovery are no-ops for PALIADIN_BACKEND=legacy:
the StreamingPaliadin and AichatRecoverer interfaces are detected via type
assertions, so LocalPaliadinService stays on the one-shot RunTurn +
filesystem-janitor path (the recover endpoint just reads the local row).
13 new unit tests cover the SSE parser, the conversation-API client,
and the match-assistant-response helper.
go build ./... + go test ./internal/... + go test ./cmd/server/...
+ bun run build all clean.
791 lines · 25 KiB · Go
package handlers
|
|
|
|
// paliadin.go — HTTP/SSE handlers for the Paliadin PoC (t-paliad-146).
|
|
//
|
|
// Design: docs/design-paliadin-2026-05-07.md §0.5.
|
|
//
|
|
// Routes served by this file:
//
// GET /paliadin — chat panel page shell
// POST /api/paliadin/turn — initiate a turn, returns {turn_id, sse_url}
// GET /api/paliadin/stream/{id} — SSE stream of the turn
// GET /api/paliadin/turns/{id}/recover — late-recovery poll (t-paliad-235)
// POST /api/paliadin/reset — /clear the conversation
// GET /admin/paliadin — monitoring dashboard (Paliadin owner only)
// GET /api/admin/paliadin/stats — stats JSON
// GET /api/admin/paliadin/turns — recent turns JSON
//
// plus the single-turn lookup (handlePaliadinTurnGet) and the per-session
// history (handlePaliadinHistory) JSON endpoints.
|
|
//
|
|
// Routes register only when the PaliadinService is wired (which only
|
|
// happens when PALIADIN_ENABLED=true at boot). On prod, where it's
|
|
// false by default, none of these URLs exist — they 404 like any
|
|
// unrouted path.
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
|
|
"mgit.msbls.de/m/paliad/internal/services"
|
|
)
|
|
|
|
// newDetachedContext hands back a timeout-bounded context whose parent is
// context.Background() rather than any incoming request. Work kicked off
// from a handler (such as the Claude-via-tmux turn) therefore keeps running
// after the originating POST has already been answered. The caller must
// invoke the returned CancelFunc to release the timer.
func newDetachedContext(timeout time.Duration) (context.Context, context.CancelFunc) {
	parent := context.Background()
	ctx, cancel := context.WithTimeout(parent, timeout)
	return ctx, cancel
}
|
|
|
|
// paliadinSvc is the live Paliadin backend. nil when DATABASE_URL was
|
|
// unset (the service depends on the audit table). Set by Register() at
|
|
// boot. The concrete type is decided in cmd/server/main.go: local-tmux
|
|
// PoC, remote-via-SSH (mRiver), or a disabled stub.
|
|
var paliadinSvc services.Paliadin
|
|
|
|
// requirePaliadinOwner gates every paliadin handler to the single
|
|
// owner email (services.PaliadinOwnerEmail = m). Anyone else gets a
|
|
// 404 — the gate is a "this URL doesn't exist for you" pretence
|
|
// rather than a 403, so a curious colleague can't even confirm the
|
|
// route is wired.
|
|
func requirePaliadinOwner(w http.ResponseWriter, r *http.Request) bool {
|
|
if paliadinSvc == nil {
|
|
http.NotFound(w, r)
|
|
return false
|
|
}
|
|
uid, ok := requireUser(w, r)
|
|
if !ok {
|
|
return false
|
|
}
|
|
owner, err := paliadinSvc.IsOwner(r.Context(), uid)
|
|
if err != nil {
|
|
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
|
return false
|
|
}
|
|
if !owner {
|
|
http.NotFound(w, r)
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// pendingTurns is an in-memory map of turn_id → result channel. The POST
|
|
// /api/paliadin/turn endpoint kicks off the work + writes a synthetic
|
|
// turn record; the GET /api/paliadin/stream/{id} endpoint reads from
|
|
// the channel + emits SSE events. PoC scope: single user, in-process
|
|
// state. Production v1 would use a Postgres-backed queue or pgnotify.
|
|
var (
|
|
pendingMu sync.Mutex
|
|
pendingTurns = map[uuid.UUID]chan turnEvent{}
|
|
)
|
|
|
|
// turnEvent is one SSE frame for a turn in flight. handlePaliadinStream
// writes Kind as the SSE `event:` name and the JSON encoding of Data as the
// `data:` payload.
//
// Kinds produced by this file:
//
//	meta     — turn started (turn_id, started_at)
//	content  — response text (full "text" for one-shot backends, or an
//	           incremental "delta" for streaming backends)
//	thinking — proof-of-life pulse while the upstream is working
//	end      — turn finished successfully, with turn metadata
//	error    — turn failed (code, message, optionally retryable)
type turnEvent struct {
	Kind string         `json:"kind"`
	Data map[string]any `json:"data"`
}
|
|
|
|
// turnRequest is the JSON body of POST /api/paliadin/turn.
|
|
//
|
|
// Context (t-paliad-161) is the structured page-context payload from the
|
|
// inline widget. The standalone /paliadin page omits it and only sets
|
|
// PageOrigin (the cosmetic URL). When both are present, Context is the
|
|
// authoritative source — PageOrigin still gets persisted for legacy
|
|
// dashboard queries that filter on path.
|
|
type turnRequest struct {
|
|
UserMessage string `json:"user_message"`
|
|
SessionID string `json:"session_id"`
|
|
PageOrigin string `json:"page_origin,omitempty"`
|
|
Context *services.TurnContext `json:"context,omitempty"`
|
|
}
|
|
|
|
// turnResponse is the JSON reply to POST /api/paliadin/turn: the new turn's
// id together with the SSE URL the client should subscribe to.
type turnResponse struct {
	TurnID string `json:"turn_id"` // UUID string of the new turn
	SSEURL string `json:"sse_url"` // "/api/paliadin/stream/<turn_id>"
}
|
|
|
|
// handlePaliadinPage serves the static /paliadin chat panel. Gated to
|
|
// the single Paliadin owner (m); every other authenticated user gets
|
|
// a 404 — the route effectively does not exist for them.
|
|
func handlePaliadinPage(w http.ResponseWriter, r *http.Request) {
|
|
if !requirePaliadinOwner(w, r) {
|
|
return
|
|
}
|
|
http.ServeFile(w, r, "dist/paliadin.html")
|
|
}
|
|
|
|
// handleAdminPaliadinPage serves the /admin/paliadin monitoring page.
|
|
// Same owner gate — even other global_admins can't see this surface.
|
|
func handleAdminPaliadinPage(w http.ResponseWriter, r *http.Request) {
|
|
if !requirePaliadinOwner(w, r) {
|
|
return
|
|
}
|
|
http.ServeFile(w, r, "dist/admin-paliadin.html")
|
|
}
|
|
|
|
// handlePaliadinTurn kicks off a turn and returns the SSE URL.
|
|
//
|
|
// We don't block here; the actual Claude work runs in a goroutine that
|
|
// pushes events into the per-turn channel. The client immediately opens
|
|
// EventSource on the returned URL and reads as the goroutine writes.
|
|
func handlePaliadinTurn(w http.ResponseWriter, r *http.Request) {
|
|
if !requirePaliadinOwner(w, r) {
|
|
return
|
|
}
|
|
uid, _ := requireUser(w, r) // already validated by requirePaliadinOwner
|
|
var req turnRequest
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid JSON"})
|
|
return
|
|
}
|
|
if req.UserMessage == "" {
|
|
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "user_message required"})
|
|
return
|
|
}
|
|
if req.SessionID == "" {
|
|
req.SessionID = uuid.New().String()
|
|
}
|
|
|
|
turnID := uuid.New()
|
|
ch := make(chan turnEvent, 16)
|
|
pendingMu.Lock()
|
|
pendingTurns[turnID] = ch
|
|
pendingMu.Unlock()
|
|
|
|
// Backwards compat: if the structured context arrived but PageOrigin
|
|
// is empty, mirror context.PageOrigin into the top-level field so
|
|
// admin queries that filter on page_origin still work.
|
|
pageOrigin := req.PageOrigin
|
|
if pageOrigin == "" && req.Context != nil {
|
|
pageOrigin = req.Context.PageOrigin
|
|
}
|
|
|
|
// Goroutine drives the actual Claude turn. We use a fresh context
|
|
// (not r.Context()) because the request is going to return as soon
|
|
// as we hand back the SSE URL — we don't want the whole turn to
|
|
// cancel when the POST completes.
|
|
go runPaliadinTurnAsync(turnID, services.TurnRequest{
|
|
UserID: uid,
|
|
SessionID: req.SessionID,
|
|
UserMessage: req.UserMessage,
|
|
PageOrigin: pageOrigin,
|
|
Context: req.Context,
|
|
}, ch)
|
|
|
|
writeJSON(w, http.StatusOK, turnResponse{
|
|
TurnID: turnID.String(),
|
|
SSEURL: "/api/paliadin/stream/" + turnID.String(),
|
|
})
|
|
}
|
|
|
|
// runPaliadinTurnAsync executes the turn and writes events into ch.
|
|
//
|
|
// Backend dispatch:
|
|
// - StreamingPaliadin (aichat) → drives runStreamingTurn which relays
|
|
// incremental chunks + upstream heartbeats. No hard ceiling on
|
|
// stream duration; falls back to silence_timeout (silenceTimeout)
|
|
// if the upstream goes dark.
|
|
// - Plain Paliadin (legacy local/remote) → one-shot RunTurn with the
|
|
// original 150 s ceiling (matches the shim's 120 s run-turn cap +
|
|
// SSH overhead per t-paliad-155).
|
|
func runPaliadinTurnAsync(turnID uuid.UUID, req services.TurnRequest, ch chan<- turnEvent) {
|
|
defer func() {
|
|
// Drain + close. The SSE handler reads until the channel closes.
|
|
close(ch)
|
|
}()
|
|
|
|
send(ch, turnEvent{
|
|
Kind: "meta",
|
|
Data: map[string]any{
|
|
"turn_id": turnID.String(),
|
|
"started_at": time.Now().UTC().Format(time.RFC3339),
|
|
},
|
|
})
|
|
|
|
if streamer, ok := paliadinSvc.(services.StreamingPaliadin); ok {
|
|
runStreamingTurn(turnID, req, ch, streamer)
|
|
return
|
|
}
|
|
runOneShotTurn(turnID, req, ch)
|
|
}
|
|
|
|
// runOneShotTurn drives the legacy synchronous backends (local-tmux PoC,
|
|
// remote ssh+paliadin-shim). Preserves the original 150 s ceiling.
|
|
func runOneShotTurn(turnID uuid.UUID, req services.TurnRequest, ch chan<- turnEvent) {
|
|
ctx, cancel := newDetachedContext(150 * time.Second)
|
|
defer cancel()
|
|
|
|
result, err := paliadinSvc.RunTurn(ctx, req)
|
|
if err != nil {
|
|
errCode := "upstream_error"
|
|
if errors.Is(err, services.ErrTmuxUnavailable) {
|
|
errCode = "tmux_unavailable"
|
|
}
|
|
send(ch, turnEvent{
|
|
Kind: "error",
|
|
Data: map[string]any{"code": errCode, "message": err.Error()},
|
|
})
|
|
return
|
|
}
|
|
|
|
// One-shot content event with the full body. The frontend simulates
|
|
// streaming with a typewriter effect.
|
|
send(ch, turnEvent{
|
|
Kind: "content",
|
|
Data: map[string]any{"text": result.Response},
|
|
})
|
|
|
|
send(ch, turnEvent{
|
|
Kind: "end",
|
|
Data: map[string]any{
|
|
"turn_id": turnID.String(),
|
|
"used_tools": result.UsedTools,
|
|
"rows_seen": result.RowsSeen,
|
|
"chip_count": result.ChipCount,
|
|
"classifier_tag": result.ClassifierTag,
|
|
"duration_ms": result.DurationMS,
|
|
},
|
|
})
|
|
}
|
|
|
|
// Timing knobs for runStreamingTurn.
const (
	// silenceTimeout is how long the aichat upstream may send nothing at all
	// (no chunk, no heartbeat) before runStreamingTurn emits an error frame.
	// 90 s leaves ample headroom over aichat's 5 s heartbeat cadence, so a
	// transient stall (model wedge, GC pause) survives while a hard upstream
	// drop is still caught.
	silenceTimeout = 90 * time.Second

	// streamingThinkingInterval is the tick period for the locally
	// synthesised `thinking` events. It matches aichat's own 5 s heartbeat
	// tick, keeping the UI pulse at most ~5 s out of date.
	streamingThinkingInterval = 5 * time.Second

	// streamingTurnDeadline bounds a single streaming turn. It is far above
	// any realistic Claude turn but finite, so a runaway upstream (or a
	// paliad bug that never closes the channel) cannot leak forever.
	streamingTurnDeadline = 30 * time.Minute
)
|
|
|
|
// runStreamingTurn drives an incremental turn against the StreamingPaliadin
|
|
// backend. Relays chunks → content events, upstream heartbeats →
|
|
// thinking events, errors → error events. Adds its own silence-watch:
|
|
// if the upstream emits no event for silenceTimeout, fire an error
|
|
// frame so the client doesn't sit on a dead stream forever.
|
|
func runStreamingTurn(turnID uuid.UUID, req services.TurnRequest, ch chan<- turnEvent, streamer services.StreamingPaliadin) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), streamingTurnDeadline)
|
|
defer cancel()
|
|
|
|
events := make(chan services.StreamEvent, 32)
|
|
startedAt := time.Now()
|
|
|
|
// streamerDone closes when the backend's RunTurnStream returns. We
|
|
// race the silence watcher and the event pump against it so the
|
|
// goroutine exit is clean either way.
|
|
type runResult struct {
|
|
result *services.TurnResult
|
|
err error
|
|
}
|
|
runCh := make(chan runResult, 1)
|
|
go func() {
|
|
res, err := streamer.RunTurnStream(ctx, req, events)
|
|
runCh <- runResult{res, err}
|
|
}()
|
|
|
|
var (
|
|
lastEventAt = time.Now()
|
|
usedTools []string
|
|
rowsSeen []int
|
|
classifierTag string
|
|
convID string
|
|
gotChunk bool
|
|
errorEmitted bool
|
|
)
|
|
|
|
silenceTicker := time.NewTicker(streamingThinkingInterval)
|
|
defer silenceTicker.Stop()
|
|
|
|
emitThinking := func(elapsedSeconds int) {
|
|
// Don't emit `thinking` after the first real chunk arrives —
|
|
// the frontend hides the pulse once content starts flowing
|
|
// anyway, but we save bandwidth by stopping emission.
|
|
send(ch, turnEvent{
|
|
Kind: "thinking",
|
|
Data: map[string]any{
|
|
"elapsed_seconds": elapsedSeconds,
|
|
"since_first": gotChunk,
|
|
},
|
|
})
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case ev, more := <-events:
|
|
if !more {
|
|
events = nil // disable case
|
|
continue
|
|
}
|
|
lastEventAt = time.Now()
|
|
switch ev.Kind {
|
|
case services.StreamChunk:
|
|
gotChunk = true
|
|
send(ch, turnEvent{
|
|
Kind: "content",
|
|
Data: map[string]any{
|
|
"delta": ev.Content,
|
|
"streamed": true,
|
|
},
|
|
})
|
|
case services.StreamHeartbeat:
|
|
// Upstream is alive but no chunks yet (or a mid-stream
|
|
// stall). Pass through with our own thinking shape.
|
|
send(ch, turnEvent{
|
|
Kind: "thinking",
|
|
Data: map[string]any{
|
|
"elapsed_seconds": ev.ElapsedSeconds,
|
|
"since_first": gotChunk,
|
|
"upstream": true,
|
|
},
|
|
})
|
|
case services.StreamMeta:
|
|
usedTools = ev.UsedTools
|
|
rowsSeen = ev.RowsSeen
|
|
classifierTag = ev.ClassifierTag
|
|
case services.StreamConversation:
|
|
convID = ev.ConversationID
|
|
case services.StreamError:
|
|
errorEmitted = true
|
|
send(ch, turnEvent{
|
|
Kind: "error",
|
|
Data: map[string]any{
|
|
"code": ev.Code,
|
|
"message": ev.Message,
|
|
"retryable": ev.Retryable,
|
|
},
|
|
})
|
|
}
|
|
case <-silenceTicker.C:
|
|
elapsed := time.Since(lastEventAt)
|
|
if elapsed >= silenceTimeout {
|
|
send(ch, turnEvent{
|
|
Kind: "error",
|
|
Data: map[string]any{
|
|
"code": "upstream_silence",
|
|
"message": "aichat upstream went silent for over " + silenceTimeout.String(),
|
|
},
|
|
})
|
|
// Cancel the backend so it doesn't keep running.
|
|
cancel()
|
|
continue
|
|
}
|
|
emitThinking(int(time.Since(startedAt).Seconds()))
|
|
case res := <-runCh:
|
|
// Drain any remaining events the backend pushed before
|
|
// closing the channel.
|
|
if events != nil {
|
|
for ev := range events {
|
|
switch ev.Kind {
|
|
case services.StreamChunk:
|
|
gotChunk = true
|
|
send(ch, turnEvent{
|
|
Kind: "content",
|
|
Data: map[string]any{
|
|
"delta": ev.Content,
|
|
"streamed": true,
|
|
},
|
|
})
|
|
case services.StreamMeta:
|
|
usedTools = ev.UsedTools
|
|
rowsSeen = ev.RowsSeen
|
|
classifierTag = ev.ClassifierTag
|
|
case services.StreamConversation:
|
|
convID = ev.ConversationID
|
|
case services.StreamError:
|
|
errorEmitted = true
|
|
send(ch, turnEvent{
|
|
Kind: "error",
|
|
Data: map[string]any{
|
|
"code": ev.Code,
|
|
"message": ev.Message,
|
|
"retryable": ev.Retryable,
|
|
},
|
|
})
|
|
}
|
|
}
|
|
}
|
|
if res.err != nil {
|
|
if !errorEmitted {
|
|
send(ch, turnEvent{
|
|
Kind: "error",
|
|
Data: map[string]any{
|
|
"code": "upstream_error",
|
|
"message": res.err.Error(),
|
|
},
|
|
})
|
|
}
|
|
return
|
|
}
|
|
result := res.result
|
|
if result == nil {
|
|
// Shouldn't happen — backend contract returns either err
|
|
// or a result. Defensive bail.
|
|
if !errorEmitted {
|
|
send(ch, turnEvent{
|
|
Kind: "error",
|
|
Data: map[string]any{
|
|
"code": "upstream_error",
|
|
"message": "stream closed without result",
|
|
},
|
|
})
|
|
}
|
|
return
|
|
}
|
|
if result.UsedTools != nil {
|
|
usedTools = result.UsedTools
|
|
}
|
|
if result.RowsSeen != nil {
|
|
rowsSeen = result.RowsSeen
|
|
}
|
|
if classifierTag == "" && result.ClassifierTag != "" {
|
|
classifierTag = result.ClassifierTag
|
|
}
|
|
endData := map[string]any{
|
|
"turn_id": turnID.String(),
|
|
"used_tools": usedTools,
|
|
"rows_seen": rowsSeen,
|
|
"chip_count": result.ChipCount,
|
|
"classifier_tag": classifierTag,
|
|
"duration_ms": result.DurationMS,
|
|
"streamed": true,
|
|
}
|
|
if convID != "" {
|
|
endData["aichat_conversation_id"] = convID
|
|
}
|
|
send(ch, turnEvent{Kind: "end", Data: endData})
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// handlePaliadinStream is the SSE endpoint the EventSource subscribes
|
|
// to. Reads from the per-turn channel + writes SSE-framed events.
|
|
func handlePaliadinStream(w http.ResponseWriter, r *http.Request) {
|
|
if !requirePaliadinOwner(w, r) {
|
|
return
|
|
}
|
|
turnIDStr := r.PathValue("id")
|
|
turnID, err := uuid.Parse(turnIDStr)
|
|
if err != nil {
|
|
http.Error(w, "invalid turn_id", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
pendingMu.Lock()
|
|
ch, ok := pendingTurns[turnID]
|
|
pendingMu.Unlock()
|
|
if !ok {
|
|
http.Error(w, "unknown turn_id (already finished, or never started)", http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
// SSE headers.
|
|
w.Header().Set("Content-Type", "text/event-stream")
|
|
w.Header().Set("Cache-Control", "no-cache, no-transform")
|
|
w.Header().Set("Connection", "keep-alive")
|
|
w.Header().Set("X-Accel-Buffering", "no") // disable nginx/Traefik buffering
|
|
|
|
flusher, ok := w.(http.Flusher)
|
|
if !ok {
|
|
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// Heartbeat ticker keeps reverse proxies from reaping the connection.
|
|
heartbeat := time.NewTicker(25 * time.Second)
|
|
defer heartbeat.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-r.Context().Done():
|
|
// Client closed the connection — don't drain, leave the
|
|
// channel for the goroutine to finish into. Pending-turns
|
|
// cleanup happens after end/error events flush below.
|
|
return
|
|
case <-heartbeat.C:
|
|
fmt.Fprint(w, "event: ping\ndata: {}\n\n")
|
|
flusher.Flush()
|
|
case ev, more := <-ch:
|
|
if !more {
|
|
// Goroutine finished. Tidy up the pending-turns map.
|
|
pendingMu.Lock()
|
|
delete(pendingTurns, turnID)
|
|
pendingMu.Unlock()
|
|
return
|
|
}
|
|
payload, _ := json.Marshal(ev.Data)
|
|
fmt.Fprintf(w, "event: %s\ndata: %s\n\n", ev.Kind, payload)
|
|
flusher.Flush()
|
|
if ev.Kind == "end" || ev.Kind == "error" {
|
|
// Don't return immediately — wait for the channel to
|
|
// close so the cleanup branch above runs and the client
|
|
// gets a clean EOF.
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// handlePaliadinTurnGet returns a single turn the caller can see. The
|
|
// chat UI hits this endpoint to recover late responses: when SSE
|
|
// closed with an error (timeout / upstream_error) the bubble registers
|
|
// the turn for polling, and this endpoint reports the row's current
|
|
// state. The janitor (services.LocalPaliadinService.runJanitor) is the
|
|
// other half of that loop — it patches the row when Claude writes the
|
|
// response file after the 60 s pollForResponse window expired.
|
|
//
|
|
// Returns:
|
|
// 200 + {response, error_code, finished_at, duration_ms, used_tools,
|
|
// rows_seen, chip_count, classifier_tag}
|
|
// 404 when the turn is invisible to the caller / absent
|
|
func handlePaliadinTurnGet(w http.ResponseWriter, r *http.Request) {
|
|
if !requirePaliadinOwner(w, r) {
|
|
return
|
|
}
|
|
uid, _ := requireUser(w, r)
|
|
turnID, err := uuid.Parse(r.PathValue("id"))
|
|
if err != nil {
|
|
http.Error(w, "invalid turn_id", http.StatusBadRequest)
|
|
return
|
|
}
|
|
row, err := paliadinSvc.GetTurn(r.Context(), uid, turnID)
|
|
if err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
http.Error(w, "not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
http.Error(w, "lookup failed", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
resp := map[string]any{
|
|
"turn_id": row.TurnID.String(),
|
|
"started_at": row.StartedAt.Format(time.RFC3339),
|
|
"response": row.Response,
|
|
"error_code": row.ErrorCode,
|
|
"finished_at": row.FinishedAt,
|
|
"duration_ms": row.DurationMS,
|
|
"used_tools": []string(row.UsedTools),
|
|
"rows_seen": []int64(row.RowsSeen),
|
|
"chip_count": row.ChipCount,
|
|
"classifier_tag": row.ClassifierTag,
|
|
}
|
|
writeJSON(w, http.StatusOK, resp)
|
|
}
|
|
|
|
// handlePaliadinTurnRecover is the dispatching late-recovery endpoint
|
|
// (t-paliad-235). Replaces the legacy direct-row-read for the aichat
|
|
// backend. When the backend implements services.AichatRecoverer (the
|
|
// PALIADIN_BACKEND=aichat path), we ask aichat directly via its
|
|
// conversation API whether the turn actually completed upstream after
|
|
// our stream connection dropped. When it doesn't implement it (legacy
|
|
// local/remote backends), we fall back to reading the local row —
|
|
// services.LocalPaliadinService.runJanitor is still the recovery path
|
|
// there.
|
|
//
|
|
// Response shape mirrors handlePaliadinTurnGet so the frontend
|
|
// late-poll module doesn't need a backend-specific code path.
|
|
// Additional field `recovery_state` distinguishes:
|
|
//
|
|
// "recovered" — the response is in the row (already there, or freshly
|
|
// written from the upstream check)
|
|
// "pending" — still no response; caller should keep polling
|
|
// "lost" — backend confirms the turn is gone (aichat doesn't
|
|
// have it either). UI should degrade to "verloren".
|
|
func handlePaliadinTurnRecover(w http.ResponseWriter, r *http.Request) {
|
|
if !requirePaliadinOwner(w, r) {
|
|
return
|
|
}
|
|
uid, _ := requireUser(w, r)
|
|
turnID, err := uuid.Parse(r.PathValue("id"))
|
|
if err != nil {
|
|
http.Error(w, "invalid turn_id", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
// Quick read first — gives us the row regardless of backend.
|
|
row, err := paliadinSvc.GetTurn(r.Context(), uid, turnID)
|
|
if err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
http.Error(w, "not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
http.Error(w, "lookup failed", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
state := recoveryStateFor(row)
|
|
|
|
// Aichat backend: when the row still has no response, ask aichat
|
|
// whether the turn actually finished upstream.
|
|
if state == "pending" {
|
|
if rec, ok := paliadinSvc.(services.AichatRecoverer); ok {
|
|
ctx, cancel := context.WithTimeout(r.Context(), 8*time.Second)
|
|
defer cancel()
|
|
recovered, recErr := rec.RecoverTurn(ctx, uid, turnID)
|
|
if recErr != nil {
|
|
// Log + fall through to a plain pending response — a
|
|
// transient aichat hiccup shouldn't flip the UI to
|
|
// "lost".
|
|
_ = recErr
|
|
} else if recovered != nil {
|
|
row = recovered
|
|
state = recoveryStateFor(row)
|
|
} else {
|
|
// Aichat returned a clean "no, I don't have it either".
|
|
// Only mark as lost when the turn is older than the
|
|
// upstream's plausible turn budget — otherwise the
|
|
// recovery just hit the window between paliad's stream
|
|
// dropping and aichat finishing the run.
|
|
if recoveryShouldGiveUp(row) {
|
|
state = "lost"
|
|
}
|
|
}
|
|
} else if recoveryShouldGiveUp(row) {
|
|
// Legacy backends: rely on the janitor. If we're past the
|
|
// give-up threshold and still no response, surface "lost".
|
|
state = "lost"
|
|
}
|
|
}
|
|
|
|
resp := map[string]any{
|
|
"turn_id": row.TurnID.String(),
|
|
"started_at": row.StartedAt.Format(time.RFC3339),
|
|
"response": row.Response,
|
|
"error_code": row.ErrorCode,
|
|
"finished_at": row.FinishedAt,
|
|
"duration_ms": row.DurationMS,
|
|
"used_tools": []string(row.UsedTools),
|
|
"rows_seen": []int64(row.RowsSeen),
|
|
"chip_count": row.ChipCount,
|
|
"classifier_tag": row.ClassifierTag,
|
|
"recovery_state": state,
|
|
}
|
|
writeJSON(w, http.StatusOK, resp)
|
|
}
|
|
|
|
// recoveryStateFor returns the lifecycle state of a paliadin turn from
|
|
// the recovery endpoint's perspective.
|
|
func recoveryStateFor(row *services.PaliadinTurn) string {
|
|
if row.Response != nil && *row.Response != "" {
|
|
return "recovered"
|
|
}
|
|
return "pending"
|
|
}
|
|
|
|
// recoveryShouldGiveUp returns true when a turn has been pending long
|
|
// enough that we should surface "lost" rather than asking the user to
|
|
// keep waiting. 12 minutes is comfortably beyond the longest realistic
|
|
// Claude turn (cold-start + reasoning + tool calls all bundled).
|
|
func recoveryShouldGiveUp(row *services.PaliadinTurn) bool {
|
|
return time.Since(row.StartedAt) > 12*time.Minute
|
|
}
|
|
|
|
// handlePaliadinHistory returns the caller's prior turns for a given
|
|
// browser session id, oldest → newest. Both Paliadin surfaces (the
|
|
// inline drawer and the standalone /paliadin page) hit this on mount
|
|
// to seed their UI with the canonical conversation BEFORE rendering
|
|
// any localStorage cache, so a crash / device swap / cross-surface
|
|
// jump shows the same threading.
|
|
//
|
|
// Query params:
|
|
// session — browser session id (required; empty → empty array)
|
|
// limit — max rows to return (default 50, capped at 200)
|
|
func handlePaliadinHistory(w http.ResponseWriter, r *http.Request) {
|
|
if !requirePaliadinOwner(w, r) {
|
|
return
|
|
}
|
|
uid, _ := requireUser(w, r)
|
|
sessionID := strings.TrimSpace(r.URL.Query().Get("session"))
|
|
limit := 50
|
|
if raw := r.URL.Query().Get("limit"); raw != "" {
|
|
if n, err := strconv.Atoi(raw); err == nil && n > 0 {
|
|
limit = n
|
|
}
|
|
}
|
|
rows, err := paliadinSvc.ListHistoryForSession(r.Context(), uid, sessionID, limit)
|
|
if err != nil {
|
|
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, rows)
|
|
}
|
|
|
|
// handlePaliadinReset kills the caller's Paliadin tmux session so the
|
|
// next turn boots a fresh claude pane (per-user — see t-paliad-155).
|
|
func handlePaliadinReset(w http.ResponseWriter, r *http.Request) {
|
|
if !requirePaliadinOwner(w, r) {
|
|
return
|
|
}
|
|
uid, _ := requireUser(w, r) // already validated by requirePaliadinOwner
|
|
ctx, cancel := newDetachedContext(10 * time.Second)
|
|
defer cancel()
|
|
if err := paliadinSvc.ResetSession(ctx, uid); err != nil {
|
|
writeJSON(w, http.StatusInternalServerError, map[string]string{
|
|
"error": "reset failed: " + err.Error(),
|
|
})
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]bool{"ok": true})
|
|
}
|
|
|
|
// =============================================================================
|
|
// /admin/paliadin — monitoring dashboard.
|
|
// =============================================================================
|
|
|
|
// handleAdminPaliadinStats returns the aggregate stats for the dashboard.
|
|
func handleAdminPaliadinStats(w http.ResponseWriter, r *http.Request) {
|
|
if !requirePaliadinOwner(w, r) {
|
|
return
|
|
}
|
|
uid, _ := requireUser(w, r)
|
|
stats, err := paliadinSvc.Stats(r.Context(), uid)
|
|
if err != nil {
|
|
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, stats)
|
|
}
|
|
|
|
// handleAdminPaliadinTurns returns the most recent turn rows.
|
|
func handleAdminPaliadinTurns(w http.ResponseWriter, r *http.Request) {
|
|
if !requirePaliadinOwner(w, r) {
|
|
return
|
|
}
|
|
uid, _ := requireUser(w, r)
|
|
turns, err := paliadinSvc.ListRecentTurns(r.Context(), uid, 50)
|
|
if err != nil {
|
|
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, turns)
|
|
}
|
|
|
|
// =============================================================================
|
|
// helpers.
|
|
// =============================================================================
|
|
|
|
// send pushes an event onto the channel without blocking — drops on
|
|
// overflow. PoC scope: 16-deep buffer, single subscriber, very unlikely
|
|
// to overflow even with the slow Claude-via-tmux path.
|
|
func send(ch chan<- turnEvent, ev turnEvent) {
|
|
select {
|
|
case ch <- ev:
|
|
default:
|
|
// Channel full — drop. Logged by the SSE consumer's gap (it
|
|
// keeps reading; only "end"/"error" matter for completion).
|
|
}
|
|
}
|