m's 14:56 observation: long Paliadin turns showed "Verbindung verloren —
Antwort wird nachgereicht …" ("Connection lost, answer will follow …") but
never delivered the answer. The aichat backend finished the turn upstream,
but paliad's HTTP client had given up at 130 s and the legacy filesystem
janitor never ran for the aichat path.

Three intertwined fixes, all shipped together because they share the
same wire shape and the same UI states:

1. Switch the aichat backend to /chat/turn/stream
   - new AichatPaliadinService.RunTurnStream relays incremental chunks
   - SSE parser handles default `data:` frames (chunk/meta/done/error)
     and named `event: heartbeat` frames per the upstream contract
   - no more 130 s hard ceiling: the stream stays open as long as data
     or heartbeats flow; silenceTimeout (90 s) catches a true upstream
     stall instead

2. Proof-of-life thinking events
   - handler emits `event: thinking` every 5 s while the upstream is
     silent (synthesised locally) AND relays aichat's `heartbeat`
     events as thinking pings
   - frontend renders a lime-dot pulse + monospace counter inside the
     assistant bubble, so the user can SEE the chat is still working

3. Honest disconnect copy + real late-recovery
   - new dispatching endpoint GET /api/paliadin/turns/{id}/recover
     - aichat backend: asks aichat via GET /chat/conversations and
       /chat/conversations/{id}/turns whether the turn actually finished
     - legacy backend: falls through to the local row read (janitor)
   - frontend swaps "wird nachgereicht" ("will follow") →
     "Lade frische Antwort …" ("Loading fresh answer …") while the
     recovery polls; on confirmed "lost" it swaps to
     "Antwort konnte nicht zugestellt werden — bitte erneut stellen"
     ("Answer could not be delivered, please ask again")
   - migration 118 adds aichat_conversation_id to paliadin_turns so
     the recovery has a fast path when the done frame arrived before
     the drop
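
As a rough illustration of the frame handling described in item 1, the sketch below parses default `data:` frames and named `event:` frames from a text stream. It is not the parser from this commit: `sseFrame`, `parseSSE`, `parseSSEString`, and the JSON payloads in `main` are hypothetical names and shapes chosen for the example. It also simplifies the SSE format in two ways: a frame with an event name but no data is still emitted, and a final frame without a trailing blank line is accepted.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// sseFrame is a hypothetical parsed frame. Event is empty for a default
// frame and holds the name (for example "heartbeat") for a named one.
type sseFrame struct {
	Event string
	Data  string // data lines joined with "\n"
}

// parseSSE reads frames from r until EOF. A blank line ends a frame.
// bufio.Scanner caps a single line at 64 KiB by default, which a real
// parser for large chunks would need to raise.
func parseSSE(r io.Reader) ([]sseFrame, error) {
	var (
		frames []sseFrame
		event  string
		data   []string
	)
	flush := func() {
		if event != "" || len(data) > 0 {
			frames = append(frames, sseFrame{Event: event, Data: strings.Join(data, "\n")})
		}
		event, data = "", nil
	}

	sc := bufio.NewScanner(r)
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "":
			flush()
		case strings.HasPrefix(line, ":"):
			// Comment line, ignored.
		case strings.HasPrefix(line, "event:"):
			event = strings.TrimSpace(strings.TrimPrefix(line, "event:"))
		case strings.HasPrefix(line, "data:"):
			// Strip the field name and at most one leading space.
			data = append(data, strings.TrimPrefix(strings.TrimPrefix(line, "data:"), " "))
		}
	}
	flush() // simplification: accept a last frame without a blank line
	return frames, sc.Err()
}

// parseSSEString is a convenience wrapper for string input.
func parseSSEString(s string) ([]sseFrame, error) {
	return parseSSE(strings.NewReader(s))
}

func main() {
	// The payload shapes below are assumptions made for the example.
	stream := "data: {\"type\":\"chunk\",\"content\":\"Hal\"}\n\n" +
		"event: heartbeat\ndata: {\"elapsed\":5}\n\n" +
		"data: {\"type\":\"done\"}\n\n"

	frames, err := parseSSEString(stream)
	if err != nil {
		panic(err)
	}
	for _, f := range frames {
		fmt.Printf("event=%q data=%s\n", f.Event, f.Data)
	}
}
```

A caller would typically branch on `Event` first (heartbeat versus default) and only then decode `Data`, so a heartbeat never has to pass through the chunk decoder.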

Streaming and recovery are no-ops for PALIADIN_BACKEND=legacy: the
StreamingPaliadin interface is detected via type assertion, which
LocalPaliadinService does not satisfy, so it stays on the one-shot
RunTurn + filesystem janitor path.
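
The type-assertion probe mentioned above can be sketched roughly as follows. The interfaces here are deliberately simplified stand-ins (string prompts and string events instead of TurnRequest, StreamEvent, and *TurnResult), and `legacyBackend`, `streamingBackend`, `dispatch`, and `runOnce` are hypothetical names used only for this illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Paliadin is a simplified stand-in for the base interface.
type Paliadin interface {
	RunTurn(ctx context.Context, prompt string) (string, error)
}

// StreamingPaliadin is the optional extension, also simplified.
type StreamingPaliadin interface {
	Paliadin
	RunTurnStream(ctx context.Context, prompt string, events chan<- string) (string, error)
}

type legacyBackend struct{}

func (legacyBackend) RunTurn(_ context.Context, prompt string) (string, error) {
	return "legacy: " + prompt, nil
}

type streamingBackend struct{}

func (streamingBackend) RunTurn(_ context.Context, prompt string) (string, error) {
	return "echo: " + prompt, nil
}

// RunTurnStream closes events before returning, so the consumer loop ends.
func (streamingBackend) RunTurnStream(ctx context.Context, prompt string, events chan<- string) (string, error) {
	defer close(events)
	var body string
	for _, chunk := range []string{"echo: ", prompt} {
		select {
		case events <- chunk:
			body += chunk
		case <-ctx.Done():
			return body, ctx.Err()
		}
	}
	return body, nil
}

// dispatch picks the streaming path only when the backend offers it.
func dispatch(ctx context.Context, p Paliadin, prompt string) (mode, body string, err error) {
	sp, ok := p.(StreamingPaliadin)
	if !ok {
		body, err = p.RunTurn(ctx, prompt)
		return "one-shot", body, err
	}

	type result struct {
		body string
		err  error
	}
	events := make(chan string)
	done := make(chan result, 1)
	go func() {
		b, e := sp.RunTurnStream(ctx, prompt, events)
		done <- result{b, e}
	}()
	for range events {
		// A real handler would forward each chunk to the browser here.
	}
	r := <-done
	return "stream", r.body, r.err
}

// runOnce is a helper for the demo; it uses a background context.
func runOnce(p Paliadin, prompt string) (string, string, error) {
	return dispatch(context.Background(), p, prompt)
}

func main() {
	for _, p := range []Paliadin{legacyBackend{}, streamingBackend{}} {
		mode, body, err := runOnce(p, "hi")
		fmt.Println(mode, body, err)
	}
}
```

The caller never needs a configuration switch for this: a backend that lacks RunTurnStream simply fails the assertion and takes the one-shot branch.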

13 new unit tests cover the SSE parser, the conversation-API client,
and the match-assistant-response helper.

go build ./..., go test ./internal/..., go test ./cmd/server/..., and
bun run build are all clean.
128 lines
4.9 KiB
Go
package services

// Streaming support for the Paliadin chat surface (t-paliad-235).
//
// The legacy LocalPaliadinService.RunTurn returns the full response in
// one shot: the chat UI gets one `content` blob and the typewriter
// simulates streaming. That falls apart on long turns: the HTTP client
// hits its 130 s ceiling, paliad's SSE stream closes, the bubble shows
// "Verbindung verloren" ("connection lost") and the response is lost.
//
// The aichat backend exposes a real streaming variant at
// /chat/turn/stream that emits incremental chunks + named heartbeat
// events while claude is thinking. AichatPaliadinService implements
// the StreamingPaliadin interface defined here; the handler probes
// for it via a type assertion and falls back to the one-shot RunTurn
// when the backend doesn't support streaming (legacy path).
//
// Recovery (a separate axis): when the transport drops mid-turn,
// the AichatRecoverer interface lets the handler ask the backend to
// look up the late response via aichat's conversation API rather than
// rely on the legacy filesystem janitor, which only knows about
// LocalPaliadinService's per-turn response files.

import (
	"context"

	"github.com/google/uuid"
)

// StreamEvent is one increment of a streaming turn. The handler
// receives these via the channel passed to RunTurnStream and forwards
// them as SSE frames to the browser.
//
// Exactly one of Kind's payloads is meaningful per event:
//
//	StreamChunk        → Content holds the next slice of assistant text
//	StreamHeartbeat    → ElapsedSeconds holds upstream "still thinking" tick
//	StreamMeta         → UsedTools / RowsSeen / ClassifierTag populated
//	StreamError        → Code / Message / Retryable populated
//	StreamConversation → ConversationID populated
//
// StreamDone is implicit: when the channel closes without an error
// event, the turn completed. The accompanying *TurnResult returned by
// RunTurnStream carries the final accumulated body + meta + conversation
// id for persistence and recovery.
type StreamEvent struct {
	Kind StreamEventKind

	// StreamChunk
	Content string

	// StreamHeartbeat
	ElapsedSeconds int

	// StreamMeta (terminal-side; may also be merged into final TurnResult)
	UsedTools     []string
	RowsSeen      []int
	ClassifierTag string

	// StreamError
	Code      string
	Message   string
	Retryable bool

	// StreamConversation: aichat sometimes resolves the conversation id
	// before the first chunk arrives. We surface it as soon as we have
	// it so the handler can persist it for recovery, even if the stream
	// is later interrupted.
	ConversationID string
}

// StreamEventKind enumerates the meaningful flavours.
type StreamEventKind string

const (
	StreamChunk        StreamEventKind = "chunk"
	StreamHeartbeat    StreamEventKind = "heartbeat"
	StreamMeta         StreamEventKind = "meta"
	StreamError        StreamEventKind = "error"
	StreamConversation StreamEventKind = "conversation"
)

// StreamingPaliadin is the optional extension the AichatPaliadinService
// implements. Handlers detect it via type assertion; backends that don't
// implement it (the legacy local + remote paths) fall back to the
// one-shot Paliadin.RunTurn.
//
// Contract:
//   - RunTurnStream MUST close `events` before returning, so the handler
//     loop terminates cleanly.
//   - Returning a non-nil error implies the audit row was already
//     stamped with an error_code; the handler does not double-stamp.
//   - The *TurnResult is populated even on partial failure when the
//     upstream produced any meaningful body; handlers may render it as
//     a salvaged best-effort result instead of an error.
type StreamingPaliadin interface {
	Paliadin

	// RunTurnStream drives one turn against the streaming upstream and
	// pushes StreamEvents onto `events` as they arrive. Blocks until the
	// upstream finishes or the context cancels. `events` is closed by
	// the implementation before this method returns.
	RunTurnStream(ctx context.Context, req TurnRequest, events chan<- StreamEvent) (*TurnResult, error)
}

// AichatRecoverer is the optional extension that knows how to ask the
// aichat backend "did this turn actually complete?" when paliad's local
// audit row never got a response (because the transport dropped
// mid-turn). Implementations look up the persisted aichat_conversation_id,
// query aichat's GET /chat/conversations/{id}/turns, find the matching
// assistant turn, and write the response back to paliad's row.
//
// Returns (nil, nil) when aichat doesn't have the response either,
// i.e. the turn is truly lost and the UI must degrade to "verloren"
// ("lost") copy rather than "wird nachgereicht" ("will follow").
type AichatRecoverer interface {
	RecoverTurn(ctx context.Context, callerID, turnID uuid.UUID) (*PaliadinTurn, error)
}

// safeSendStream pushes an event onto the channel, dropping it if the
// context is cancelled first. Mirrors the handler-side `send` helper but
// works against a generic chan StreamEvent.
func safeSendStream(ctx context.Context, ch chan<- StreamEvent, ev StreamEvent) {
	select {
	case ch <- ev:
	case <-ctx.Done():
	}
}
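
The handler side of this contract is not part of this file. As a hedged sketch of how a consumer might combine the "close before returning" rule with the locally synthesised thinking pings and a silence timeout, consider the loop below. Everything in it is hypothetical: `event` is a trimmed-down stand-in for StreamEvent, `drain`, `demoTurn`, and `demoStall` are illustration-only names, and the durations are shortened so the demo finishes quickly (the commit message cites 5 s pings and a 90 s silence timeout). Folding both timers into one loop is a simplification made for the example.

```go
package main

import (
	"fmt"
	"time"
)

// event is a hypothetical, trimmed-down stand-in for StreamEvent.
type event struct {
	Kind    string
	Content string
}

// drain forwards events until the channel closes or nothing arrives for
// silenceTimeout. While the upstream is quiet it records a synthetic
// "thinking" ping every thinkEvery. It returns the frames it would have
// written and whether the loop ended because of a stall.
func drain(events <-chan event, thinkEvery, silenceTimeout time.Duration) ([]string, bool) {
	var out []string
	ticker := time.NewTicker(thinkEvery)
	defer ticker.Stop()
	silence := time.NewTimer(silenceTimeout)
	defer silence.Stop()

	for {
		select {
		case ev, ok := <-events:
			if !ok {
				return out, false // producer closed the channel: turn finished
			}
			out = append(out, ev.Kind+":"+ev.Content)
			// Real upstream traffic restarts both clocks.
			ticker.Reset(thinkEvery)
			if !silence.Stop() {
				select {
				case <-silence.C:
				default:
				}
			}
			silence.Reset(silenceTimeout)
		case <-ticker.C:
			// Synthetic pings do not count as upstream activity.
			out = append(out, "thinking")
		case <-silence.C:
			return out, true
		}
	}
}

// demoTurn simulates a producer that pauses between two chunks.
func demoTurn() ([]string, bool) {
	events := make(chan event)
	go func() {
		defer close(events) // mirrors the "close before returning" rule
		events <- event{"chunk", "Hal"}
		time.Sleep(50 * time.Millisecond)
		events <- event{"chunk", "lo"}
	}()
	return drain(events, 20*time.Millisecond, time.Second)
}

// demoStall simulates an upstream that never sends anything.
func demoStall() ([]string, bool) {
	events := make(chan event) // never written to, never closed
	return drain(events, 10*time.Millisecond, 35*time.Millisecond)
}

func main() {
	out, stalled := demoTurn()
	fmt.Println(out, stalled)

	out, stalled = demoStall()
	fmt.Println(len(out), stalled)
}
```

Because the producer always closes the channel, the consumer needs no separate "done" signal; the only other exit is the silence timer, which fires solely when the upstream itself goes quiet.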