Files
paliad/frontend/src/client/paliadin-late-poll.ts
mAi cdd27d674e feat(paliadin): stream + honest late-recovery (t-paliad-235)
m's 14:56 observation: long Paliadin turns showed "Verbindung verloren —
Antwort wird nachgereicht …" but never delivered. The aichat backend
finished the turn upstream; paliad's HTTP client had given up at 130 s
and the legacy filesystem janitor never ran for the aichat path.

Three intertwined fixes, all shipped together because they share the
same wire shape and the same UI states:

1. Switch the aichat backend to /chat/turn/stream
   - new AichatPaliadinService.RunTurnStream relays incremental chunks
   - SSE parser handles default `data:` frames (chunk/meta/done/error)
     and named `event: heartbeat` frames per the upstream contract
   - no more 130 s hard ceiling — stream stays open as long as data or
     heartbeats flow; silenceTimeout (90 s) catches a true upstream
     stall instead

2. Proof-of-life thinking events
   - handler emits `event: thinking` every 5 s while the upstream is
     silent (synthesised locally) AND relays aichat's `heartbeat`
     events as thinking pings
   - frontend renders a lime-dot pulse + monospace counter inside the
     assistant bubble — the user can SEE the chat is still working

3. Honest disconnect copy + real late-recovery
   - new dispatching endpoint GET /api/paliadin/turns/{id}/recover
   - aichat backend: asks aichat via GET /chat/conversations and
     /chat/conversations/{id}/turns whether the turn actually finished
   - legacy backend: falls through to the local row read (janitor)
   - frontend swaps "wird nachgereicht" → "Lade frische Antwort …"
     while the recovery polls; on confirmed "lost" swaps to
     "Antwort konnte nicht zugestellt werden — bitte erneut stellen"
   - migration 118 adds aichat_conversation_id to paliadin_turns so
     the recovery has a fast path when the done frame arrived before
     the drop

Streaming + recovery are a no-op for PALIADIN_BACKEND=legacy: the
StreamingPaliadin interface is detected via type assertion, the
LocalPaliadinService stays on the one-shot RunTurn + filesystem
janitor path.

13 new unit tests cover the SSE parser, the conversation-API client,
and the match-assistant-response helper.

go build ./... + go test ./internal/... + go test ./cmd/server/...
+ bun run build all clean.
2026-05-22 15:17:24 +02:00

133 lines
4.1 KiB
TypeScript

// Late-response polling (t-paliad-235 rewrite).
//
// When the SSE stream closes mid-turn with an error event, the bubble
// can't tell from the wire whether (a) the upstream is still finishing
// the turn and we just lost transport, or (b) the upstream is truly
// dead.
//
// This module hits the dispatching recovery endpoint
// `/api/paliadin/turns/{id}/recover`, which knows the active backend:
//
// - aichat backend → asks aichat via its conversation API whether
// the turn actually completed upstream
// - legacy backend → reads the local row (paliad's filesystem
// janitor patches it when claude writes the
// response file late)
//
// The endpoint returns:
//
// recovery_state="recovered" → response is in the payload, render it
// recovery_state="pending" → keep polling
// recovery_state="lost" → upstream is truly gone, give up
export interface LateTurn {
turn_id: string;
response: string | null;
error_code: string | null;
finished_at: string | null;
duration_ms: number | null;
used_tools: string[];
rows_seen: number[];
chip_count: number;
classifier_tag: string | null;
}
export interface LatePollOptions {
turnId: string;
intervalMs?: number; // default 3000
maxDurationMs?: number; // default 600000 (10 min)
onLateResponse: (turn: LateTurn) => void;
// onLost — backend confirmed the turn is unrecoverable. Caller should
// swap the bubble copy to the "verloren" string. Distinct from
// onGiveUp (which fires only on the local timeout).
onLost?: () => void;
onGiveUp?: () => void;
}
export interface LatePollHandle {
cancel: () => void;
}
interface RecoverResponse {
turn_id: string;
started_at: string;
response: string | null;
error_code: string | null;
finished_at: string | null;
duration_ms: number | null;
used_tools: string[];
rows_seen: number[];
chip_count: number;
classifier_tag: string | null;
recovery_state: "recovered" | "pending" | "lost";
}
export function pollForLateResponse(opts: LatePollOptions): LatePollHandle {
const interval = opts.intervalMs ?? 3000;
const maxDuration = opts.maxDurationMs ?? 10 * 60 * 1000;
const startedAt = Date.now();
let cancelled = false;
let timer: number | undefined;
const tick = async () => {
if (cancelled) return;
if (Date.now() - startedAt > maxDuration) {
opts.onGiveUp?.();
return;
}
try {
const r = await fetch(`/api/paliadin/turns/${opts.turnId}/recover`, {
credentials: "same-origin",
});
if (r.ok) {
const body = (await r.json()) as RecoverResponse;
if (body.recovery_state === "recovered" && body.response) {
opts.onLateResponse(toLateTurn(body));
return;
}
if (body.recovery_state === "lost") {
opts.onLost?.();
return;
}
// pending — keep polling
} else if (r.status === 404) {
// Row gone — give up. Different signal from `lost`: a missing row
// is a paliad-side bookkeeping problem; aichat may still have the
// answer but we can't surface it without the row.
opts.onGiveUp?.();
return;
}
} catch {
// Transient network error; retry on next tick.
}
timer = window.setTimeout(tick, interval);
};
// First poll deliberately runs after one interval so we don't race
// the dispatch endpoint on the very first tick (gives the upstream a
// moment to actually settle the row after the stream drop).
timer = window.setTimeout(tick, interval);
return {
cancel: () => {
cancelled = true;
if (timer != null) window.clearTimeout(timer);
},
};
}
function toLateTurn(body: RecoverResponse): LateTurn {
return {
turn_id: body.turn_id,
response: body.response,
error_code: body.error_code,
finished_at: body.finished_at,
duration_ms: body.duration_ms,
used_tools: body.used_tools ?? [],
rows_seen: body.rows_seen ?? [],
chip_count: body.chip_count ?? 0,
classifier_tag: body.classifier_tag,
};
}