feat(paliadin): stream + honest late-recovery (t-paliad-235)
m's 14:56 observation: long Paliadin turns showed "Verbindung verloren —
Antwort wird nachgereicht …" but never delivered. The aichat backend
finished the turn upstream; paliad's HTTP client had given up at 130 s
and the legacy filesystem janitor never ran for the aichat path.
Three intertwined fixes, all shipped together because they share the
same wire shape and the same UI states:
1. Switch the aichat backend to /chat/turn/stream
- new AichatPaliadinService.RunTurnStream relays incremental chunks
- SSE parser handles default `data:` frames (chunk/meta/done/error)
and named `event: heartbeat` frames per the upstream contract
- no more 130 s hard ceiling — stream stays open as long as data or
heartbeats flow; silenceTimeout (90 s) catches a true upstream
stall instead
2. Proof-of-life thinking events
- handler emits `event: thinking` every 5 s while the upstream is
silent (synthesised locally) AND relays aichat's `heartbeat`
events as thinking pings
- frontend renders a lime-dot pulse + monospace counter inside the
assistant bubble — the user can SEE the chat is still working
3. Honest disconnect copy + real late-recovery
- new dispatching endpoint GET /api/paliadin/turns/{id}/recover
- aichat backend: asks aichat via GET /chat/conversations and
/chat/conversations/{id}/turns whether the turn actually finished
- legacy backend: falls through to the local row read (janitor)
- frontend swaps "wird nachgereicht" → "Lade frische Antwort …"
while the recovery polls; on confirmed "lost" swaps to
"Antwort konnte nicht zugestellt werden — bitte erneut stellen"
- migration 118 adds aichat_conversation_id to paliadin_turns so
the recovery has a fast path when the done frame arrived before
the drop
Streaming + recovery are a no-op for PALIADIN_BACKEND=legacy: the
StreamingPaliadin interface is detected via type assertion, the
LocalPaliadinService stays on the one-shot RunTurn + filesystem
janitor path.
13 new unit tests cover the SSE parser, the conversation-API client,
and the match-assistant-response helper.
go build ./... + go test ./internal/... + go test ./cmd/server/...
+ bun run build all clean.
This commit is contained in:
@@ -2048,8 +2048,13 @@ const translations: Record<Lang, Record<string, string>> = {
|
||||
"paliadin.error.timeout": "Paliadin antwortet nicht (Timeout 60s). Nochmal versuchen.",
|
||||
"paliadin.error.connection_lost": "Verbindung verloren.",
|
||||
"paliadin.error.upstream": "Fehler beim Senden.",
|
||||
"paliadin.error.upstream_silence": "Paliadin meldet sich nicht mehr — Verbindung wird beendet.",
|
||||
"paliadin.late.waiting": "Antwort wird nachgereicht, sobald sie eintrifft …",
|
||||
"paliadin.late.checking": "Verbindung verloren — Paliadin denkt vielleicht noch. Lade frische Antwort …",
|
||||
"paliadin.late.lost": "Antwort konnte nicht zugestellt werden — bitte Frage erneut stellen.",
|
||||
"paliadin.late.marker": "verspätet",
|
||||
"paliadin.thinking": "Paliadin denkt nach",
|
||||
"paliadin.thinking.seconds": "{seconds}s",
|
||||
"paliadin.widget.title": "Paliadin",
|
||||
"paliadin.widget.trigger": "Paliadin (Cmd+J)",
|
||||
"paliadin.widget.empty": "Was kann ich für dich tun?",
|
||||
@@ -4907,8 +4912,13 @@ const translations: Record<Lang, Record<string, string>> = {
|
||||
"paliadin.error.timeout": "Paliadin didn't respond in time (60s). Try again.",
|
||||
"paliadin.error.connection_lost": "Connection lost.",
|
||||
"paliadin.error.upstream": "Send failed.",
|
||||
"paliadin.error.upstream_silence": "Paliadin went silent — closing the connection.",
|
||||
"paliadin.late.waiting": "Will fill in the response when it arrives …",
|
||||
"paliadin.late.checking": "Connection lost — Paliadin may still be thinking. Fetching fresh answer …",
|
||||
"paliadin.late.lost": "Answer couldn't be delivered — please ask again.",
|
||||
"paliadin.late.marker": "late",
|
||||
"paliadin.thinking": "Paliadin is thinking",
|
||||
"paliadin.thinking.seconds": "{seconds}s",
|
||||
"paliadin.widget.title": "Paliadin",
|
||||
"paliadin.widget.trigger": "Paliadin (Cmd+J)",
|
||||
"paliadin.widget.empty": "What can I help you with?",
|
||||
|
||||
@@ -1,15 +1,24 @@
|
||||
// Late-response polling. The Go backend's pollForResponse window is
|
||||
// 60 s; if Claude writes the response file after that (because the
|
||||
// tmux pane was busy mid-turn when the message arrived), the SSE
|
||||
// stream has already closed with an `error` event. The Janitor
|
||||
// (services.LocalPaliadinService.runJanitor) then patches the
|
||||
// paliadin_turns row when the file lands.
|
||||
// Late-response polling (t-paliad-235 rewrite).
|
||||
//
|
||||
// This module is the FE half of that loop: after the bubble shows an
|
||||
// error, the caller registers the turn here. We poll
|
||||
// `/api/paliadin/turns/{id}` every 3 s for up to 10 minutes; once the
|
||||
// row has a non-empty response, we hand it back so the caller can
|
||||
// swap the bubble content in place.
|
||||
// When the SSE stream closes mid-turn with an error event, the bubble
|
||||
// can't tell from the wire whether (a) the upstream is still finishing
|
||||
// the turn and we just lost transport, or (b) the upstream is truly
|
||||
// dead.
|
||||
//
|
||||
// This module hits the dispatching recovery endpoint
|
||||
// `/api/paliadin/turns/{id}/recover`, which knows the active backend:
|
||||
//
|
||||
// - aichat backend → asks aichat via its conversation API whether
|
||||
// the turn actually completed upstream
|
||||
// - legacy backend → reads the local row (paliad's filesystem
|
||||
// janitor patches it when claude writes the
|
||||
// response file late)
|
||||
//
|
||||
// The endpoint returns:
|
||||
//
|
||||
// recovery_state="recovered" → response is in the payload, render it
|
||||
// recovery_state="pending" → keep polling
|
||||
// recovery_state="lost" → upstream is truly gone, give up
|
||||
|
||||
export interface LateTurn {
|
||||
turn_id: string;
|
||||
@@ -28,6 +37,10 @@ export interface LatePollOptions {
|
||||
intervalMs?: number; // default 3000
|
||||
maxDurationMs?: number; // default 600000 (10 min)
|
||||
onLateResponse: (turn: LateTurn) => void;
|
||||
// onLost — backend confirmed the turn is unrecoverable. Caller should
|
||||
// swap the bubble copy to the "verloren" string. Distinct from
|
||||
// onGiveUp (which fires only on the local timeout).
|
||||
onLost?: () => void;
|
||||
onGiveUp?: () => void;
|
||||
}
|
||||
|
||||
@@ -35,6 +48,20 @@ export interface LatePollHandle {
|
||||
cancel: () => void;
|
||||
}
|
||||
|
||||
interface RecoverResponse {
|
||||
turn_id: string;
|
||||
started_at: string;
|
||||
response: string | null;
|
||||
error_code: string | null;
|
||||
finished_at: string | null;
|
||||
duration_ms: number | null;
|
||||
used_tools: string[];
|
||||
rows_seen: number[];
|
||||
chip_count: number;
|
||||
classifier_tag: string | null;
|
||||
recovery_state: "recovered" | "pending" | "lost";
|
||||
}
|
||||
|
||||
export function pollForLateResponse(opts: LatePollOptions): LatePollHandle {
|
||||
const interval = opts.intervalMs ?? 3000;
|
||||
const maxDuration = opts.maxDurationMs ?? 10 * 60 * 1000;
|
||||
@@ -50,18 +77,24 @@ export function pollForLateResponse(opts: LatePollOptions): LatePollHandle {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const r = await fetch(`/api/paliadin/turns/${opts.turnId}`, {
|
||||
const r = await fetch(`/api/paliadin/turns/${opts.turnId}/recover`, {
|
||||
credentials: "same-origin",
|
||||
});
|
||||
if (r.ok) {
|
||||
const turn = (await r.json()) as LateTurn;
|
||||
if (turn.response && turn.response.length > 0) {
|
||||
opts.onLateResponse(turn);
|
||||
const body = (await r.json()) as RecoverResponse;
|
||||
if (body.recovery_state === "recovered" && body.response) {
|
||||
opts.onLateResponse(toLateTurn(body));
|
||||
return;
|
||||
}
|
||||
}
|
||||
// 404: row gone (very unlikely) — give up.
|
||||
if (r.status === 404) {
|
||||
if (body.recovery_state === "lost") {
|
||||
opts.onLost?.();
|
||||
return;
|
||||
}
|
||||
// pending — keep polling
|
||||
} else if (r.status === 404) {
|
||||
// Row gone — give up. Different signal from `lost`: a missing row
|
||||
// is a paliad-side bookkeeping problem; aichat may still have the
|
||||
// answer but we can't surface it without the row.
|
||||
opts.onGiveUp?.();
|
||||
return;
|
||||
}
|
||||
@@ -72,7 +105,8 @@ export function pollForLateResponse(opts: LatePollOptions): LatePollHandle {
|
||||
};
|
||||
|
||||
// First poll deliberately runs after one interval so we don't race
|
||||
// the 60 s timeout on the very first tick.
|
||||
// the dispatch endpoint on the very first tick (gives the upstream a
|
||||
// moment to actually settle the row after the stream drop).
|
||||
timer = window.setTimeout(tick, interval);
|
||||
|
||||
return {
|
||||
@@ -82,3 +116,17 @@ export function pollForLateResponse(opts: LatePollOptions): LatePollHandle {
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function toLateTurn(body: RecoverResponse): LateTurn {
|
||||
return {
|
||||
turn_id: body.turn_id,
|
||||
response: body.response,
|
||||
error_code: body.error_code,
|
||||
finished_at: body.finished_at,
|
||||
duration_ms: body.duration_ms,
|
||||
used_tools: body.used_tools ?? [],
|
||||
rows_seen: body.rows_seen ?? [],
|
||||
chip_count: body.chip_count ?? 0,
|
||||
classifier_tag: body.classifier_tag,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -381,11 +381,32 @@ async function sendTurn(): Promise<void> {
|
||||
const es = new EventSource(turnRes.sse_url);
|
||||
activeStream = es;
|
||||
|
||||
startWidgetThinking(placeholder);
|
||||
|
||||
let fullText = "";
|
||||
es.addEventListener("thinking", (ev) => {
|
||||
let elapsed = 0;
|
||||
try {
|
||||
const data = JSON.parse((ev as MessageEvent).data || "{}");
|
||||
if (typeof data.elapsed_seconds === "number") elapsed = data.elapsed_seconds;
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
updateWidgetThinking(placeholder, elapsed);
|
||||
});
|
||||
es.addEventListener("content", (ev) => {
|
||||
try {
|
||||
const data = JSON.parse((ev as MessageEvent).data);
|
||||
if (typeof data.delta === "string" && data.delta) {
|
||||
// Streamed delta (aichat backend) — append.
|
||||
stopWidgetThinking(placeholder);
|
||||
fullText += data.delta;
|
||||
setBubbleText(placeholder, fullText);
|
||||
return;
|
||||
}
|
||||
// Legacy one-shot full-text payload.
|
||||
fullText = String(data.text || "");
|
||||
stopWidgetThinking(placeholder);
|
||||
setBubbleText(placeholder, fullText);
|
||||
} catch {
|
||||
/* ignore parse error */
|
||||
@@ -393,13 +414,15 @@ async function sendTurn(): Promise<void> {
|
||||
});
|
||||
es.addEventListener("end", () => {
|
||||
placeholder.dataset.streaming = "false";
|
||||
stopWidgetThinking(placeholder);
|
||||
history.push({ role: "assistant", text: fullText || "", ts: new Date().toISOString() });
|
||||
saveHistory();
|
||||
cleanupStream();
|
||||
});
|
||||
es.addEventListener("error", () => {
|
||||
stopWidgetThinking(placeholder);
|
||||
const errText = t("paliadin.error.connection_lost");
|
||||
setBubbleText(placeholder, errText + " " + t("paliadin.late.waiting"));
|
||||
setBubbleText(placeholder, errText + " " + t("paliadin.late.checking"));
|
||||
placeholder.classList.add("paliadin-widget-bubble--error");
|
||||
placeholder.classList.add("paliadin-widget-bubble--late-pending");
|
||||
placeholder.dataset.streaming = "false";
|
||||
@@ -412,6 +435,39 @@ async function sendTurn(): Promise<void> {
|
||||
});
|
||||
}
|
||||
|
||||
function startWidgetThinking(bubble: HTMLElement): void {
|
||||
if (bubble.querySelector(".paliadin-widget-thinking")) return;
|
||||
// Clear the static placeholder text — the live pulse + counter is
|
||||
// the canonical "denkt nach" signal.
|
||||
const textNode = bubble.querySelector(".paliadin-widget-bubble-text");
|
||||
if (textNode) textNode.textContent = "";
|
||||
const node = document.createElement("div");
|
||||
node.className = "paliadin-widget-thinking";
|
||||
node.innerHTML = `
|
||||
<span class="paliadin-widget-thinking-dot" aria-hidden="true"></span>
|
||||
<span class="paliadin-widget-thinking-label"></span>
|
||||
<span class="paliadin-widget-thinking-elapsed"></span>
|
||||
`;
|
||||
const label = node.querySelector(".paliadin-widget-thinking-label")!;
|
||||
label.textContent = t("paliadin.thinking");
|
||||
bubble.appendChild(node);
|
||||
updateWidgetThinking(bubble, 0);
|
||||
}
|
||||
|
||||
function updateWidgetThinking(bubble: HTMLElement, elapsedSeconds: number): void {
|
||||
const node = bubble.querySelector(".paliadin-widget-thinking") as HTMLElement | null;
|
||||
if (!node) return;
|
||||
const elapsed = node.querySelector(".paliadin-widget-thinking-elapsed");
|
||||
if (elapsed) {
|
||||
const s = elapsedSeconds < 0 ? 0 : Math.round(elapsedSeconds);
|
||||
elapsed.textContent = t("paliadin.thinking.seconds").replace("{seconds}", String(s));
|
||||
}
|
||||
}
|
||||
|
||||
function stopWidgetThinking(bubble: HTMLElement): void {
|
||||
bubble.querySelector(".paliadin-widget-thinking")?.remove();
|
||||
}
|
||||
|
||||
function cleanupStream(): void {
|
||||
activeStream?.close();
|
||||
activeStream = null;
|
||||
@@ -427,13 +483,24 @@ function startWidgetLatePoll(turnId: string, bubble: HTMLElement): void {
|
||||
lateWidgetPolls.delete(turnId);
|
||||
applyWidgetLateResponse(bubble, turn);
|
||||
},
|
||||
onLost: () => {
|
||||
lateWidgetPolls.delete(turnId);
|
||||
applyWidgetLost(bubble);
|
||||
},
|
||||
onGiveUp: () => {
|
||||
lateWidgetPolls.delete(turnId);
|
||||
applyWidgetLost(bubble);
|
||||
},
|
||||
});
|
||||
lateWidgetPolls.set(turnId, handle);
|
||||
}
|
||||
|
||||
function applyWidgetLost(bubble: HTMLElement): void {
|
||||
bubble.classList.remove("paliadin-widget-bubble--late-pending");
|
||||
bubble.classList.add("paliadin-widget-bubble--lost");
|
||||
setBubbleText(bubble, t("paliadin.late.lost"));
|
||||
}
|
||||
|
||||
function applyWidgetLateResponse(bubble: HTMLElement, turn: LateTurn): void {
|
||||
if (!turn.response) return;
|
||||
bubble.classList.remove(
|
||||
|
||||
@@ -3,16 +3,25 @@ import { initSidebar } from "./sidebar";
|
||||
import { renderResponseHTML } from "./paliadin-render";
|
||||
import { pollForLateResponse, type LateTurn, type LatePollHandle } from "./paliadin-late-poll";
|
||||
|
||||
// Paliadin chat panel client (t-paliad-146 PoC).
|
||||
// Paliadin chat panel client (t-paliad-146 PoC, streaming upgrade
|
||||
// t-paliad-235).
|
||||
//
|
||||
// State machine: empty → typing → sending → streaming → done.
|
||||
// State machine: empty → typing → sending → thinking → streaming → done.
|
||||
// History lives in localStorage under "paliadin:history:<sessionId>"
|
||||
// — design §0.5.4 session-only persistence.
|
||||
//
|
||||
// SSE consumer subscribes to `event: meta`, `event: content`,
|
||||
// `event: end`, `event: error`, `event: ping`. Backend currently
|
||||
// emits one `content` blob per turn (real chunked streaming is
|
||||
// production-v1; PoC simulates with a typewriter effect).
|
||||
// `event: thinking`, `event: end`, `event: error`, `event: ping`.
|
||||
//
|
||||
// `content` events from the aichat backend arrive as incremental
|
||||
// `{delta: "..."}` chunks; the bubble accumulates them in real time —
|
||||
// no typewriter simulation needed. Legacy backends still emit a single
|
||||
// `{text: "..."}` payload and we fall back to the typewriter for that
|
||||
// shape.
|
||||
//
|
||||
// `thinking` events fire while the upstream is alive but hasn't
|
||||
// produced content yet (or stalled mid-stream); the bubble renders a
|
||||
// pulse + counter so the user can SEE the chat is still working.
|
||||
|
||||
interface HistoryEntry {
|
||||
role: "user" | "assistant";
|
||||
@@ -167,25 +176,53 @@ async function sendTurn(text: string): Promise<void> {
|
||||
const es = new EventSource(turnRes.sse_url);
|
||||
currentEventSource = es;
|
||||
|
||||
// Show the thinking pulse immediately — the placeholder text already
|
||||
// says "denkt nach", but the visible pulse + counter is the live
|
||||
// proof-of-life signal m needs to trust that the chat is working.
|
||||
startThinkingIndicator(placeholder);
|
||||
// Reset the streamed accumulator for this turn.
|
||||
placeholder.dataset.fullText = "";
|
||||
|
||||
es.addEventListener("meta", () => {
|
||||
// Could surface a "thinking" indicator; placeholder text already does.
|
||||
});
|
||||
|
||||
es.addEventListener("thinking", (ev) => {
|
||||
let elapsed = 0;
|
||||
try {
|
||||
const data = JSON.parse((ev as MessageEvent).data || "{}");
|
||||
if (typeof data.elapsed_seconds === "number") {
|
||||
elapsed = data.elapsed_seconds;
|
||||
}
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
updateThinkingIndicator(placeholder, elapsed);
|
||||
});
|
||||
|
||||
es.addEventListener("content", (ev) => {
|
||||
const data = JSON.parse((ev as MessageEvent).data);
|
||||
const delta = typeof data.delta === "string" ? data.delta : "";
|
||||
if (delta) {
|
||||
// Aichat streaming path — accumulate the delta into the bubble.
|
||||
stopThinkingIndicator(placeholder);
|
||||
const current = placeholder.dataset.fullText ?? "";
|
||||
const next = current + delta;
|
||||
placeholder.dataset.fullText = next;
|
||||
writeStreamedText(placeholder, next);
|
||||
return;
|
||||
}
|
||||
// Legacy one-shot path — full body in `text`.
|
||||
const text = String(data.text || "");
|
||||
// Cache the full text on the bubble so finishBubble can render the
|
||||
// complete response even when the typewriter is mid-flight when end
|
||||
// arrives. textContent reflects only what's been typed so far and
|
||||
// would otherwise truncate the rendered Markdown (m, 2026-05-08 —
|
||||
// saw "## Proje" instead of the full 1408-byte body).
|
||||
placeholder.dataset.fullText = text;
|
||||
stopThinkingIndicator(placeholder);
|
||||
typewriter(placeholder, text);
|
||||
});
|
||||
|
||||
es.addEventListener("end", (ev) => {
|
||||
const data = JSON.parse((ev as MessageEvent).data);
|
||||
placeholder.dataset.streaming = "false";
|
||||
stopThinkingIndicator(placeholder);
|
||||
finishBubble(placeholder, data);
|
||||
history.push({
|
||||
role: "assistant",
|
||||
@@ -210,12 +247,12 @@ async function sendTurn(text: string): Promise<void> {
|
||||
|
||||
es.addEventListener("error", (ev) => {
|
||||
const errText = friendlyErrorMessage((ev as MessageEvent).data);
|
||||
// Annotate the error bubble with a "warten auf späte Antwort" hint
|
||||
// so m knows the turn isn't dead; if Claude finishes after the
|
||||
// 60 s window the Janitor (services.LocalPaliadinService.runJanitor)
|
||||
// patches the row and pollForLateResponse swaps in the real reply.
|
||||
stopThinkingIndicator(placeholder);
|
||||
// Honest copy: we don't claim "nachgereicht" because the recovery
|
||||
// path may report "lost". Frame it as "checking" while we ask the
|
||||
// backend whether the turn actually completed upstream.
|
||||
placeholder.querySelector(".paliadin-bubble-text")!.textContent =
|
||||
errText + " " + t("paliadin.late.waiting");
|
||||
errText + " " + t("paliadin.late.checking");
|
||||
placeholder.classList.add("paliadin-bubble--error");
|
||||
placeholder.classList.add("paliadin-bubble--late-pending");
|
||||
placeholder.dataset.streaming = "false";
|
||||
@@ -232,6 +269,65 @@ async function sendTurn(text: string): Promise<void> {
|
||||
});
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// thinking indicator — proof-of-life pulse + elapsed counter
|
||||
// =============================================================================
|
||||
|
||||
function startThinkingIndicator(bubble: HTMLElement): void {
|
||||
// Append a thinking node next to the bubble text (sibling, so the
|
||||
// typewriter rewriting text content doesn't clobber it). The node
|
||||
// shows a pulse dot + the elapsed counter.
|
||||
let node = bubble.querySelector(".paliadin-thinking") as HTMLElement | null;
|
||||
if (node) return; // already running
|
||||
// Clear the static placeholder text — the live pulse + counter is
|
||||
// now the canonical "denkt nach" signal. Leaving the text in place
|
||||
// would render the same phrase twice.
|
||||
const textNode = bubble.querySelector(".paliadin-bubble-text");
|
||||
if (textNode) textNode.textContent = "";
|
||||
node = document.createElement("div");
|
||||
node.className = "paliadin-thinking";
|
||||
node.innerHTML = `
|
||||
<span class="paliadin-thinking-dot" aria-hidden="true"></span>
|
||||
<span class="paliadin-thinking-label"></span>
|
||||
<span class="paliadin-thinking-elapsed"></span>
|
||||
`;
|
||||
const label = node.querySelector(".paliadin-thinking-label")!;
|
||||
label.textContent = t("paliadin.thinking");
|
||||
bubble.appendChild(node);
|
||||
// Initial 0s — replaced as soon as a thinking event arrives or our
|
||||
// local ticker fires.
|
||||
updateThinkingIndicator(bubble, 0);
|
||||
}
|
||||
|
||||
function updateThinkingIndicator(bubble: HTMLElement, elapsedSeconds: number): void {
|
||||
const node = bubble.querySelector(".paliadin-thinking") as HTMLElement | null;
|
||||
if (!node) return;
|
||||
const elapsed = node.querySelector(".paliadin-thinking-elapsed");
|
||||
if (elapsed) {
|
||||
elapsed.textContent = formatThinkingSeconds(elapsedSeconds);
|
||||
}
|
||||
}
|
||||
|
||||
function stopThinkingIndicator(bubble: HTMLElement): void {
|
||||
bubble.querySelector(".paliadin-thinking")?.remove();
|
||||
}
|
||||
|
||||
function formatThinkingSeconds(s: number): string {
|
||||
if (s < 0) s = 0;
|
||||
return t("paliadin.thinking.seconds").replace("{seconds}", String(Math.round(s)));
|
||||
}
|
||||
|
||||
// writeStreamedText fills the bubble with raw text as it accumulates.
|
||||
// Cheaper than the typewriter — we already have the real cadence from
|
||||
// the wire, no need to simulate it.
|
||||
function writeStreamedText(bubble: HTMLElement, text: string): void {
|
||||
const node = bubble.querySelector(".paliadin-bubble-text");
|
||||
if (!node) return;
|
||||
node.textContent = text;
|
||||
const stream = document.getElementById("paliadin-stream");
|
||||
if (stream) stream.scrollTop = stream.scrollHeight;
|
||||
}
|
||||
|
||||
// Server emits SSE error events as JSON `{code, message}`. Map known
|
||||
// codes to localised, user-friendly text; fall through to a generic
|
||||
// "connection lost" for anything we don't recognise (including raw
|
||||
@@ -361,11 +457,12 @@ function finishBubble(bubble: HTMLElement, data: any): void {
|
||||
}
|
||||
|
||||
|
||||
// startLatePoll registers the Janitor-patched row poller for one
|
||||
// errored turn. When the row gains a response we swap the bubble's
|
||||
// content + drop the error class + retroactively replace the history
|
||||
// entry (which was never written for the failed turn — append now so
|
||||
// reload renders the late reply).
|
||||
// startLatePoll registers the recovery-endpoint poller for one errored
|
||||
// turn. When the row gains a response we swap the bubble's content +
|
||||
// drop the error class + retroactively replace the history entry
|
||||
// (which was never written for the failed turn — append now so reload
|
||||
// renders the late reply). When the backend confirms the turn is
|
||||
// "lost", we swap the bubble to the honest "verloren" copy.
|
||||
function startLatePoll(turnId: string, bubble: HTMLElement): void {
|
||||
// Avoid duplicate pollers for the same turn (e.g. SSE error fires
|
||||
// twice in some browsers when the connection drops).
|
||||
@@ -376,13 +473,25 @@ function startLatePoll(turnId: string, bubble: HTMLElement): void {
|
||||
latePolls.delete(turnId);
|
||||
applyLateResponse(bubble, turn);
|
||||
},
|
||||
onLost: () => {
|
||||
latePolls.delete(turnId);
|
||||
applyLostResponse(bubble);
|
||||
},
|
||||
onGiveUp: () => {
|
||||
latePolls.delete(turnId);
|
||||
applyLostResponse(bubble);
|
||||
},
|
||||
});
|
||||
latePolls.set(turnId, handle);
|
||||
}
|
||||
|
||||
function applyLostResponse(bubble: HTMLElement): void {
|
||||
bubble.classList.remove("paliadin-bubble--late-pending");
|
||||
bubble.classList.add("paliadin-bubble--lost");
|
||||
const node = bubble.querySelector(".paliadin-bubble-text");
|
||||
if (node) node.textContent = t("paliadin.late.lost");
|
||||
}
|
||||
|
||||
function applyLateResponse(bubble: HTMLElement, turn: LateTurn): void {
|
||||
if (!turn.response) return;
|
||||
bubble.classList.remove("paliadin-bubble--error", "paliadin-bubble--late-pending");
|
||||
|
||||
@@ -1985,8 +1985,11 @@ export type I18nKey =
|
||||
| "paliadin.error.shim_error"
|
||||
| "paliadin.error.timeout"
|
||||
| "paliadin.error.upstream"
|
||||
| "paliadin.error.upstream_silence"
|
||||
| "paliadin.heading"
|
||||
| "paliadin.input.placeholder"
|
||||
| "paliadin.late.checking"
|
||||
| "paliadin.late.lost"
|
||||
| "paliadin.late.marker"
|
||||
| "paliadin.late.waiting"
|
||||
| "paliadin.reset"
|
||||
@@ -1996,6 +1999,8 @@ export type I18nKey =
|
||||
| "paliadin.starter.week"
|
||||
| "paliadin.stop"
|
||||
| "paliadin.tagline"
|
||||
| "paliadin.thinking"
|
||||
| "paliadin.thinking.seconds"
|
||||
| "paliadin.title"
|
||||
| "paliadin.widget.close"
|
||||
| "paliadin.widget.context.on_page"
|
||||
|
||||
@@ -13353,6 +13353,48 @@ dialog.quick-add-sheet::backdrop {
|
||||
font-style: italic;
|
||||
}
|
||||
|
||||
/* lost: backend confirmed the turn is unrecoverable (t-paliad-235).
|
||||
Different from error: the upstream had a chance to finish but the
|
||||
conversation lookup didn't find a response — show the honest
|
||||
"verloren" copy. */
|
||||
.paliadin-bubble--lost {
|
||||
color: var(--status-red-fg);
|
||||
border-color: var(--status-red-border);
|
||||
background: var(--status-red-bg);
|
||||
opacity: 0.9;
|
||||
}
|
||||
|
||||
/* Thinking indicator (t-paliad-235) — proof-of-life pulse + elapsed
|
||||
counter while the upstream is alive but no content has streamed
|
||||
yet. Lives as a sibling node inside the assistant bubble; removed
|
||||
once the first chunk arrives. */
|
||||
.paliadin-thinking {
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
gap: 0.5rem;
|
||||
margin-top: 0.5rem;
|
||||
font-size: 0.8rem;
|
||||
color: var(--color-text-muted);
|
||||
font-family: monospace;
|
||||
}
|
||||
|
||||
.paliadin-thinking-dot {
|
||||
width: 0.5rem;
|
||||
height: 0.5rem;
|
||||
border-radius: 50%;
|
||||
background: var(--color-bg-lime);
|
||||
animation: paliadin-thinking-pulse 1.4s ease-in-out infinite;
|
||||
}
|
||||
|
||||
.paliadin-thinking-elapsed {
|
||||
font-variant-numeric: tabular-nums;
|
||||
}
|
||||
|
||||
@keyframes paliadin-thinking-pulse {
|
||||
0%, 100% { opacity: 0.4; transform: scale(0.9); }
|
||||
50% { opacity: 1.0; transform: scale(1.1); }
|
||||
}
|
||||
|
||||
.paliadin-bubble-role {
|
||||
font-size: 0.75rem;
|
||||
font-weight: 600;
|
||||
@@ -14718,6 +14760,55 @@ dialog.quick-add-sheet::backdrop {
|
||||
border: 1px solid var(--status-red-border, var(--color-border));
|
||||
}
|
||||
|
||||
/* late-pending: stream dropped, recovery endpoint still polling. */
|
||||
.paliadin-widget-bubble--late-pending {
|
||||
opacity: 0.85;
|
||||
}
|
||||
|
||||
/* late: response arrived after the stream closed. */
|
||||
.paliadin-widget-bubble--late {
|
||||
color: inherit;
|
||||
background: var(--color-surface);
|
||||
border: 1px solid var(--color-border);
|
||||
}
|
||||
|
||||
.paliadin-widget-bubble-late-tag {
|
||||
color: var(--color-text-muted);
|
||||
font-style: italic;
|
||||
margin-left: 0.25rem;
|
||||
}
|
||||
|
||||
/* lost: backend confirmed the turn is unrecoverable (t-paliad-235). */
|
||||
.paliadin-widget-bubble--lost {
|
||||
background: var(--status-red-bg, var(--color-surface-2));
|
||||
color: var(--status-red-fg, var(--color-text));
|
||||
border: 1px solid var(--status-red-border, var(--color-border));
|
||||
opacity: 0.9;
|
||||
}
|
||||
|
||||
/* Thinking indicator inside widget bubbles (t-paliad-235). */
|
||||
.paliadin-widget-thinking {
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
gap: 0.5rem;
|
||||
margin-top: 0.5rem;
|
||||
font-size: 0.8rem;
|
||||
color: var(--color-text-muted);
|
||||
font-family: monospace;
|
||||
}
|
||||
|
||||
.paliadin-widget-thinking-dot {
|
||||
width: 0.5rem;
|
||||
height: 0.5rem;
|
||||
border-radius: 50%;
|
||||
background: var(--color-bg-lime);
|
||||
animation: paliadin-thinking-pulse 1.4s ease-in-out infinite;
|
||||
}
|
||||
|
||||
.paliadin-widget-thinking-elapsed {
|
||||
font-variant-numeric: tabular-nums;
|
||||
}
|
||||
|
||||
.paliadin-widget-form {
|
||||
display: flex;
|
||||
align-items: flex-end;
|
||||
|
||||
@@ -0,0 +1,2 @@
|
||||
ALTER TABLE paliad.paliadin_turns
|
||||
DROP COLUMN IF EXISTS aichat_conversation_id;
|
||||
@@ -0,0 +1,14 @@
|
||||
-- t-paliad-235: track aichat conversation id on each paliadin turn so the
|
||||
-- recovery endpoint can ask aichat for the late-arriving response when
|
||||
-- paliad's stream connection drops mid-turn.
|
||||
--
|
||||
-- The PALIADIN_BACKEND=aichat path persists this from the upstream
|
||||
-- /chat/turn/stream `done` frame's conversation_id. PALIADIN_BACKEND=legacy
|
||||
-- turns leave it NULL — the filesystem janitor is still the recovery path
|
||||
-- there.
|
||||
|
||||
ALTER TABLE paliad.paliadin_turns
|
||||
ADD COLUMN IF NOT EXISTS aichat_conversation_id uuid;
|
||||
|
||||
COMMENT ON COLUMN paliad.paliadin_turns.aichat_conversation_id IS
|
||||
'Aichat backend conversation id (t-paliad-235). Set when the streaming /chat/turn/stream done frame arrives, or when the recovery endpoint asks aichat to disambiguate which conversation this turn lives in. NULL for legacy backend turns and for aichat turns that errored before the conversation id was resolved.';
|
||||
@@ -649,6 +649,11 @@ func Register(mux *http.ServeMux, client *auth.Client, giteaAPIToken string, svc
|
||||
protected.HandleFunc("POST /api/paliadin/turn", handlePaliadinTurn)
|
||||
protected.HandleFunc("GET /api/paliadin/stream/{id}", handlePaliadinStream)
|
||||
protected.HandleFunc("GET /api/paliadin/turns/{id}", handlePaliadinTurnGet)
|
||||
// Recovery endpoint (t-paliad-235): when the SSE stream drops mid-turn,
|
||||
// the frontend hits this to ask whether aichat actually finished the
|
||||
// turn upstream. Dispatches per backend — aichat hits the conversation
|
||||
// API; legacy backends fall through to the local row read + janitor.
|
||||
protected.HandleFunc("GET /api/paliadin/turns/{id}/recover", handlePaliadinTurnRecover)
|
||||
// Crash-resistant history hydrate (t-paliad-161 follow-up): both
|
||||
// Paliadin surfaces use this to seed their UI from the DB before
|
||||
// consulting localStorage.
|
||||
|
||||
@@ -185,16 +185,21 @@ func handlePaliadinTurn(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// runPaliadinTurnAsync executes the turn and writes events into ch.
|
||||
// Uses a 150 s hard timeout independently of the originating request,
|
||||
// which leaves headroom over the shim's 120 s run-turn cap + SSH
|
||||
// overhead (t-paliad-155: cold-start safety for skill + MCP discovery).
|
||||
//
|
||||
// Backend dispatch:
|
||||
// - StreamingPaliadin (aichat) → drives runStreamingTurn which relays
|
||||
// incremental chunks + upstream heartbeats. No hard ceiling on
|
||||
// stream duration; falls back to silence_timeout (silenceTimeout)
|
||||
// if the upstream goes dark.
|
||||
// - Plain Paliadin (legacy local/remote) → one-shot RunTurn with the
|
||||
// original 150 s ceiling (matches the shim's 120 s run-turn cap +
|
||||
// SSH overhead per t-paliad-155).
|
||||
func runPaliadinTurnAsync(turnID uuid.UUID, req services.TurnRequest, ch chan<- turnEvent) {
|
||||
defer func() {
|
||||
// Drain + close. The SSE handler reads until the channel closes.
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
// Send a meta event so the client can show "Paliadin denkt nach …"
|
||||
send(ch, turnEvent{
|
||||
Kind: "meta",
|
||||
Data: map[string]any{
|
||||
@@ -203,6 +208,16 @@ func runPaliadinTurnAsync(turnID uuid.UUID, req services.TurnRequest, ch chan<-
|
||||
},
|
||||
})
|
||||
|
||||
if streamer, ok := paliadinSvc.(services.StreamingPaliadin); ok {
|
||||
runStreamingTurn(turnID, req, ch, streamer)
|
||||
return
|
||||
}
|
||||
runOneShotTurn(turnID, req, ch)
|
||||
}
|
||||
|
||||
// runOneShotTurn drives the legacy synchronous backends (local-tmux PoC,
|
||||
// remote ssh+paliadin-shim). Preserves the original 150 s ceiling.
|
||||
func runOneShotTurn(turnID uuid.UUID, req services.TurnRequest, ch chan<- turnEvent) {
|
||||
ctx, cancel := newDetachedContext(150 * time.Second)
|
||||
defer cancel()
|
||||
|
||||
@@ -220,9 +235,7 @@ func runPaliadinTurnAsync(turnID uuid.UUID, req services.TurnRequest, ch chan<-
|
||||
}
|
||||
|
||||
// One-shot content event with the full body. The frontend simulates
|
||||
// streaming with a typewriter effect (cf. design §0.5.5: real
|
||||
// chunked streaming would require Claude to write the response file
|
||||
// progressively — out of PoC scope).
|
||||
// streaming with a typewriter effect.
|
||||
send(ch, turnEvent{
|
||||
Kind: "content",
|
||||
Data: map[string]any{"text": result.Response},
|
||||
@@ -241,6 +254,224 @@ func runPaliadinTurnAsync(turnID uuid.UUID, req services.TurnRequest, ch chan<-
|
||||
})
|
||||
}
|
||||
|
||||
// silenceTimeout is the longest the aichat upstream may stay silent
|
||||
// (no chunk, no heartbeat) before runStreamingTurn gives up and fires
|
||||
// an error frame. 90 s comfortably exceeds aichat's 5 s heartbeat
|
||||
// cadence so a transient stall (model wedge, GC pause) doesn't kill
|
||||
// the turn, while still catching a hard upstream drop.
|
||||
const silenceTimeout = 90 * time.Second
|
||||
|
||||
// streamingThinkingInterval is the cadence at which we emit a synthetic
|
||||
// `thinking` event when the upstream has gone quiet but the connection
|
||||
// is still alive. 5 s matches aichat's own heartbeat tick so the UI
|
||||
// pulse never falls more than 5 s out of date.
|
||||
const streamingThinkingInterval = 5 * time.Second
|
||||
|
||||
// streamingTurnDeadline is the upper bound for a single streaming turn.
|
||||
// Far above any realistic Claude turn but finite so a runaway upstream
|
||||
// (or a paliad bug that never closes the channel) can't leak forever.
|
||||
const streamingTurnDeadline = 30 * time.Minute
|
||||
|
||||
// runStreamingTurn drives an incremental turn against the StreamingPaliadin
|
||||
// backend. Relays chunks → content events, upstream heartbeats →
|
||||
// thinking events, errors → error events. Adds its own silence-watch:
|
||||
// if the upstream emits no event for silenceTimeout, fire an error
|
||||
// frame so the client doesn't sit on a dead stream forever.
|
||||
func runStreamingTurn(turnID uuid.UUID, req services.TurnRequest, ch chan<- turnEvent, streamer services.StreamingPaliadin) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), streamingTurnDeadline)
|
||||
defer cancel()
|
||||
|
||||
events := make(chan services.StreamEvent, 32)
|
||||
startedAt := time.Now()
|
||||
|
||||
// streamerDone closes when the backend's RunTurnStream returns. We
|
||||
// race the silence watcher and the event pump against it so the
|
||||
// goroutine exit is clean either way.
|
||||
type runResult struct {
|
||||
result *services.TurnResult
|
||||
err error
|
||||
}
|
||||
runCh := make(chan runResult, 1)
|
||||
go func() {
|
||||
res, err := streamer.RunTurnStream(ctx, req, events)
|
||||
runCh <- runResult{res, err}
|
||||
}()
|
||||
|
||||
var (
|
||||
lastEventAt = time.Now()
|
||||
usedTools []string
|
||||
rowsSeen []int
|
||||
classifierTag string
|
||||
convID string
|
||||
gotChunk bool
|
||||
errorEmitted bool
|
||||
)
|
||||
|
||||
silenceTicker := time.NewTicker(streamingThinkingInterval)
|
||||
defer silenceTicker.Stop()
|
||||
|
||||
emitThinking := func(elapsedSeconds int) {
|
||||
// Don't emit `thinking` after the first real chunk arrives —
|
||||
// the frontend hides the pulse once content starts flowing
|
||||
// anyway, but we save bandwidth by stopping emission.
|
||||
send(ch, turnEvent{
|
||||
Kind: "thinking",
|
||||
Data: map[string]any{
|
||||
"elapsed_seconds": elapsedSeconds,
|
||||
"since_first": gotChunk,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case ev, more := <-events:
|
||||
if !more {
|
||||
events = nil // disable case
|
||||
continue
|
||||
}
|
||||
lastEventAt = time.Now()
|
||||
switch ev.Kind {
|
||||
case services.StreamChunk:
|
||||
gotChunk = true
|
||||
send(ch, turnEvent{
|
||||
Kind: "content",
|
||||
Data: map[string]any{
|
||||
"delta": ev.Content,
|
||||
"streamed": true,
|
||||
},
|
||||
})
|
||||
case services.StreamHeartbeat:
|
||||
// Upstream is alive but no chunks yet (or a mid-stream
|
||||
// stall). Pass through with our own thinking shape.
|
||||
send(ch, turnEvent{
|
||||
Kind: "thinking",
|
||||
Data: map[string]any{
|
||||
"elapsed_seconds": ev.ElapsedSeconds,
|
||||
"since_first": gotChunk,
|
||||
"upstream": true,
|
||||
},
|
||||
})
|
||||
case services.StreamMeta:
|
||||
usedTools = ev.UsedTools
|
||||
rowsSeen = ev.RowsSeen
|
||||
classifierTag = ev.ClassifierTag
|
||||
case services.StreamConversation:
|
||||
convID = ev.ConversationID
|
||||
case services.StreamError:
|
||||
errorEmitted = true
|
||||
send(ch, turnEvent{
|
||||
Kind: "error",
|
||||
Data: map[string]any{
|
||||
"code": ev.Code,
|
||||
"message": ev.Message,
|
||||
"retryable": ev.Retryable,
|
||||
},
|
||||
})
|
||||
}
|
||||
case <-silenceTicker.C:
|
||||
elapsed := time.Since(lastEventAt)
|
||||
if elapsed >= silenceTimeout {
|
||||
send(ch, turnEvent{
|
||||
Kind: "error",
|
||||
Data: map[string]any{
|
||||
"code": "upstream_silence",
|
||||
"message": "aichat upstream went silent for over " + silenceTimeout.String(),
|
||||
},
|
||||
})
|
||||
// Cancel the backend so it doesn't keep running.
|
||||
cancel()
|
||||
continue
|
||||
}
|
||||
emitThinking(int(time.Since(startedAt).Seconds()))
|
||||
case res := <-runCh:
|
||||
// Drain any remaining events the backend pushed before
|
||||
// closing the channel.
|
||||
if events != nil {
|
||||
for ev := range events {
|
||||
switch ev.Kind {
|
||||
case services.StreamChunk:
|
||||
gotChunk = true
|
||||
send(ch, turnEvent{
|
||||
Kind: "content",
|
||||
Data: map[string]any{
|
||||
"delta": ev.Content,
|
||||
"streamed": true,
|
||||
},
|
||||
})
|
||||
case services.StreamMeta:
|
||||
usedTools = ev.UsedTools
|
||||
rowsSeen = ev.RowsSeen
|
||||
classifierTag = ev.ClassifierTag
|
||||
case services.StreamConversation:
|
||||
convID = ev.ConversationID
|
||||
case services.StreamError:
|
||||
errorEmitted = true
|
||||
send(ch, turnEvent{
|
||||
Kind: "error",
|
||||
Data: map[string]any{
|
||||
"code": ev.Code,
|
||||
"message": ev.Message,
|
||||
"retryable": ev.Retryable,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
if res.err != nil {
|
||||
if !errorEmitted {
|
||||
send(ch, turnEvent{
|
||||
Kind: "error",
|
||||
Data: map[string]any{
|
||||
"code": "upstream_error",
|
||||
"message": res.err.Error(),
|
||||
},
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
result := res.result
|
||||
if result == nil {
|
||||
// Shouldn't happen — backend contract returns either err
|
||||
// or a result. Defensive bail.
|
||||
if !errorEmitted {
|
||||
send(ch, turnEvent{
|
||||
Kind: "error",
|
||||
Data: map[string]any{
|
||||
"code": "upstream_error",
|
||||
"message": "stream closed without result",
|
||||
},
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
if result.UsedTools != nil {
|
||||
usedTools = result.UsedTools
|
||||
}
|
||||
if result.RowsSeen != nil {
|
||||
rowsSeen = result.RowsSeen
|
||||
}
|
||||
if classifierTag == "" && result.ClassifierTag != "" {
|
||||
classifierTag = result.ClassifierTag
|
||||
}
|
||||
endData := map[string]any{
|
||||
"turn_id": turnID.String(),
|
||||
"used_tools": usedTools,
|
||||
"rows_seen": rowsSeen,
|
||||
"chip_count": result.ChipCount,
|
||||
"classifier_tag": classifierTag,
|
||||
"duration_ms": result.DurationMS,
|
||||
"streamed": true,
|
||||
}
|
||||
if convID != "" {
|
||||
endData["aichat_conversation_id"] = convID
|
||||
}
|
||||
send(ch, turnEvent{Kind: "end", Data: endData})
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handlePaliadinStream is the SSE endpoint the EventSource subscribes
|
||||
// to. Reads from the per-turn channel + writes SSE-framed events.
|
||||
func handlePaliadinStream(w http.ResponseWriter, r *http.Request) {
|
||||
@@ -354,6 +585,114 @@ func handlePaliadinTurnGet(w http.ResponseWriter, r *http.Request) {
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
// handlePaliadinTurnRecover is the dispatching late-recovery endpoint
|
||||
// (t-paliad-235). Replaces the legacy direct-row-read for the aichat
|
||||
// backend. When the backend implements services.AichatRecoverer (the
|
||||
// PALIADIN_BACKEND=aichat path), we ask aichat directly via its
|
||||
// conversation API whether the turn actually completed upstream after
|
||||
// our stream connection dropped. When it doesn't implement it (legacy
|
||||
// local/remote backends), we fall back to reading the local row —
|
||||
// services.LocalPaliadinService.runJanitor is still the recovery path
|
||||
// there.
|
||||
//
|
||||
// Response shape mirrors handlePaliadinTurnGet so the frontend
|
||||
// late-poll module doesn't need a backend-specific code path.
|
||||
// Additional field `recovery_state` distinguishes:
|
||||
//
|
||||
// "recovered" — the response is in the row (already there, or freshly
|
||||
// written from the upstream check)
|
||||
// "pending" — still no response; caller should keep polling
|
||||
// "lost" — backend confirms the turn is gone (aichat doesn't
|
||||
// have it either). UI should degrade to "verloren".
|
||||
func handlePaliadinTurnRecover(w http.ResponseWriter, r *http.Request) {
|
||||
if !requirePaliadinOwner(w, r) {
|
||||
return
|
||||
}
|
||||
uid, _ := requireUser(w, r)
|
||||
turnID, err := uuid.Parse(r.PathValue("id"))
|
||||
if err != nil {
|
||||
http.Error(w, "invalid turn_id", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Quick read first — gives us the row regardless of backend.
|
||||
row, err := paliadinSvc.GetTurn(r.Context(), uid, turnID)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
http.Error(w, "not found", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
http.Error(w, "lookup failed", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
state := recoveryStateFor(row)
|
||||
|
||||
// Aichat backend: when the row still has no response, ask aichat
|
||||
// whether the turn actually finished upstream.
|
||||
if state == "pending" {
|
||||
if rec, ok := paliadinSvc.(services.AichatRecoverer); ok {
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 8*time.Second)
|
||||
defer cancel()
|
||||
recovered, recErr := rec.RecoverTurn(ctx, uid, turnID)
|
||||
if recErr != nil {
|
||||
// Log + fall through to a plain pending response — a
|
||||
// transient aichat hiccup shouldn't flip the UI to
|
||||
// "lost".
|
||||
_ = recErr
|
||||
} else if recovered != nil {
|
||||
row = recovered
|
||||
state = recoveryStateFor(row)
|
||||
} else {
|
||||
// Aichat returned a clean "no, I don't have it either".
|
||||
// Only mark as lost when the turn is older than the
|
||||
// upstream's plausible turn budget — otherwise the
|
||||
// recovery just hit the window between paliad's stream
|
||||
// dropping and aichat finishing the run.
|
||||
if recoveryShouldGiveUp(row) {
|
||||
state = "lost"
|
||||
}
|
||||
}
|
||||
} else if recoveryShouldGiveUp(row) {
|
||||
// Legacy backends: rely on the janitor. If we're past the
|
||||
// give-up threshold and still no response, surface "lost".
|
||||
state = "lost"
|
||||
}
|
||||
}
|
||||
|
||||
resp := map[string]any{
|
||||
"turn_id": row.TurnID.String(),
|
||||
"started_at": row.StartedAt.Format(time.RFC3339),
|
||||
"response": row.Response,
|
||||
"error_code": row.ErrorCode,
|
||||
"finished_at": row.FinishedAt,
|
||||
"duration_ms": row.DurationMS,
|
||||
"used_tools": []string(row.UsedTools),
|
||||
"rows_seen": []int64(row.RowsSeen),
|
||||
"chip_count": row.ChipCount,
|
||||
"classifier_tag": row.ClassifierTag,
|
||||
"recovery_state": state,
|
||||
}
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
// recoveryStateFor returns the lifecycle state of a paliadin turn from
|
||||
// the recovery endpoint's perspective.
|
||||
func recoveryStateFor(row *services.PaliadinTurn) string {
|
||||
if row.Response != nil && *row.Response != "" {
|
||||
return "recovered"
|
||||
}
|
||||
return "pending"
|
||||
}
|
||||
|
||||
// recoveryShouldGiveUp returns true when a turn has been pending long
|
||||
// enough that we should surface "lost" rather than asking the user to
|
||||
// keep waiting. 12 minutes is comfortably beyond the longest realistic
|
||||
// Claude turn (cold-start + reasoning + tool calls all bundled).
|
||||
func recoveryShouldGiveUp(row *services.PaliadinTurn) bool {
|
||||
return time.Since(row.StartedAt) > 12*time.Minute
|
||||
}
|
||||
|
||||
// handlePaliadinHistory returns the caller's prior turns for a given
|
||||
// browser session id, oldest → newest. Both Paliadin surfaces (the
|
||||
// inline drawer and the standalone /paliadin page) hit this on mount
|
||||
|
||||
@@ -112,6 +112,11 @@ type AichatPaliadinService struct {
|
||||
// Hook for tests — when non-nil, callHTTP delegates here instead
|
||||
// of hitting the wire. Production code never sets this.
|
||||
httpHook func(ctx context.Context, method, path string, body any, out any) error
|
||||
|
||||
// Hook for tests — when non-nil, callStreamingHTTP delegates here
|
||||
// instead of opening a real SSE connection. Production code never
|
||||
// sets this.
|
||||
streamHook func(ctx context.Context, path string, body any, emit func(streamFrame)) error
|
||||
}
|
||||
|
||||
// ErrAichatAuthFailed signals the aichat service rejected the bearer
|
||||
|
||||
654
internal/services/aichat_paliadin_stream.go
Normal file
654
internal/services/aichat_paliadin_stream.go
Normal file
@@ -0,0 +1,654 @@
|
||||
package services
|
||||
|
||||
// Streaming + recovery support for AichatPaliadinService (t-paliad-235).
|
||||
//
|
||||
// =============================================================================
|
||||
// Upstream contract — /chat/turn/stream
|
||||
// =============================================================================
|
||||
//
|
||||
// Source of truth: m/mAi internal/aichat/api/stream.go. Captured here as
|
||||
// inline doc so future debugging doesn't require chasing across repos:
|
||||
//
|
||||
// Request body: same shape as POST /chat/turn (TurnRequest mirror in
|
||||
// aichat_paliadin.go). Persona must support streaming; paliad's
|
||||
// "paliadin" persona does.
|
||||
//
|
||||
// Response: text/event-stream. Two SSE event flavours:
|
||||
//
|
||||
// 1. The default unnamed `data:` event carries a discriminated-union
|
||||
// JSON object keyed by `"type"`:
|
||||
//
|
||||
// {"type":"chunk","content":"…"}
|
||||
// {"type":"meta","used_tools":[…],"rows_seen":[…],"classifier_tag":"…"}
|
||||
// {"type":"done","turn_id":"…","conversation_id":"…",
|
||||
// "duration_ms":1234,"pane_spawned":false,"resumed":false}
|
||||
// {"type":"error","code":"…","message":"…","retryable":true}
|
||||
//
|
||||
// 2. The named `event: heartbeat` event carries:
|
||||
//
|
||||
// {"elapsed_seconds": N}
|
||||
//
|
||||
// Emitted every 5 s by the upstream while the runner has been
|
||||
// silent (no content). aichat keeps emitting these for the lifetime
|
||||
// of the runner so the client can render "Paliadin denkt nach
|
||||
// (N s)" without conflating with actual content.
|
||||
//
|
||||
// Errors before the stream starts (auth failure, persona unknown,
|
||||
// validation) come back as a normal JSON envelope with the appropriate
|
||||
// HTTP status — not SSE. Those land in callHTTP via decodeAichatError.
|
||||
//
|
||||
// =============================================================================
|
||||
// Conversation-based late recovery
|
||||
// =============================================================================
|
||||
//
|
||||
// Aichat exposes:
|
||||
//
|
||||
// GET /chat/conversations?persona=…&username=…&user_id=…
|
||||
// → list of ConversationSummary, ordered last_turn_at DESC
|
||||
// GET /chat/conversations/{id}/turns
|
||||
// → list of TurnRow (role=user|assistant, body, created_at)
|
||||
//
|
||||
// When paliad's stream drops mid-turn we:
|
||||
// 1. Look up paliad.paliadin_turns.aichat_conversation_id for the row.
|
||||
// 2. If unset (stream dropped before the `done` frame): list the user's
|
||||
// conversations and take the most recent one for the persona —
|
||||
// that's the pane our turn ran against (aichat owns one active
|
||||
// conversation per persona+user, see m/mAi#243).
|
||||
// 3. GET that conversation's turns. Find the latest assistant turn
|
||||
// whose preceding user-role turn body matches our user_message.
|
||||
// 4. Persist the response (completeTurnLate) and return it.
|
||||
//
|
||||
// If aichat returns no matching assistant turn → the turn is truly lost
|
||||
// (transport drop + upstream crash). Recovery returns (nil, nil) and
|
||||
// the handler degrades the UI to "verloren".
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// =============================================================================
|
||||
// Streaming RunTurnStream
|
||||
// =============================================================================
|
||||
|
||||
// RunTurnStream drives one /chat/turn/stream turn against aichat and
|
||||
// relays incremental events onto `events`. Closes `events` before
|
||||
// returning. Implements StreamingPaliadin.
|
||||
func (s *AichatPaliadinService) RunTurnStream(ctx context.Context, req TurnRequest, events chan<- StreamEvent) (*TurnResult, error) {
|
||||
defer close(events)
|
||||
|
||||
s.turnMu.Lock()
|
||||
defer s.turnMu.Unlock()
|
||||
|
||||
turnID := uuid.New()
|
||||
startedAt := time.Now().UTC()
|
||||
|
||||
if err := s.insertTurnRow(ctx, &PaliadinTurn{
|
||||
TurnID: turnID,
|
||||
UserID: req.UserID,
|
||||
SessionID: req.SessionID,
|
||||
StartedAt: startedAt,
|
||||
UserMessage: req.UserMessage,
|
||||
PageOrigin: optionalString(req.PageOrigin),
|
||||
}, req.Context); err != nil {
|
||||
return nil, fmt.Errorf("paliadin: insert turn row: %w", err)
|
||||
}
|
||||
|
||||
if err := s.healthGate(ctx); err != nil {
|
||||
_ = s.markTurnError(ctx, turnID, "mriver_unreachable")
|
||||
safeSendStream(ctx, events, StreamEvent{
|
||||
Kind: StreamError,
|
||||
Code: "mriver_unreachable",
|
||||
Message: err.Error(),
|
||||
})
|
||||
return nil, err
|
||||
}
|
||||
|
||||
username := s.usernameFor(ctx, req.UserID)
|
||||
session := s.cfg.Persona + ":" + username
|
||||
primer := s.buildPrimerExchanges(ctx, session, req)
|
||||
|
||||
jwt, err := s.mintJWTIfConfigured(req.UserID)
|
||||
if err != nil {
|
||||
_ = s.markTurnError(ctx, turnID, "jwt_mint_failed")
|
||||
safeSendStream(ctx, events, StreamEvent{
|
||||
Kind: StreamError,
|
||||
Code: "shim_error",
|
||||
Message: fmt.Sprintf("mint turn jwt: %v", err),
|
||||
})
|
||||
return nil, fmt.Errorf("paliadin: mint turn jwt: %w", err)
|
||||
}
|
||||
|
||||
body := aichatTurnRequest{
|
||||
Persona: s.cfg.Persona,
|
||||
Username: username,
|
||||
UserID: req.UserID.String(),
|
||||
SessionID: req.SessionID,
|
||||
Message: sanitiseForTmux(req.UserMessage),
|
||||
JWT: jwt,
|
||||
Primer: primer,
|
||||
Meta: buildAichatMeta(req),
|
||||
}
|
||||
|
||||
// Stream the upstream call. acc accumulates the full text so we can
|
||||
// persist the row + return a TurnResult on success.
|
||||
var (
|
||||
acc strings.Builder
|
||||
streamMeta trailerMeta
|
||||
convID string
|
||||
paneSpawned bool
|
||||
upstreamDoneMs int64
|
||||
)
|
||||
|
||||
streamErr := s.callStreamingHTTP(ctx, "/chat/turn/stream", body, func(frame streamFrame) {
|
||||
switch {
|
||||
case frame.event == "heartbeat":
|
||||
safeSendStream(ctx, events, StreamEvent{
|
||||
Kind: StreamHeartbeat,
|
||||
ElapsedSeconds: frame.heartbeat.ElapsedSeconds,
|
||||
})
|
||||
case frame.data.Type == "chunk":
|
||||
if frame.data.Content == "" {
|
||||
return
|
||||
}
|
||||
acc.WriteString(frame.data.Content)
|
||||
safeSendStream(ctx, events, StreamEvent{
|
||||
Kind: StreamChunk,
|
||||
Content: frame.data.Content,
|
||||
})
|
||||
case frame.data.Type == "meta":
|
||||
streamMeta = trailerMeta{
|
||||
UsedTools: append([]string(nil), frame.data.UsedTools...),
|
||||
RowsSeen: coerceAichatRowsSeen(frame.data.RowsSeen),
|
||||
ClassifierTag: frame.data.ClassifierTag,
|
||||
}
|
||||
safeSendStream(ctx, events, StreamEvent{
|
||||
Kind: StreamMeta,
|
||||
UsedTools: streamMeta.UsedTools,
|
||||
RowsSeen: streamMeta.RowsSeen,
|
||||
ClassifierTag: streamMeta.ClassifierTag,
|
||||
})
|
||||
case frame.data.Type == "done":
|
||||
if frame.data.ConversationID != "" {
|
||||
convID = frame.data.ConversationID
|
||||
safeSendStream(ctx, events, StreamEvent{
|
||||
Kind: StreamConversation,
|
||||
ConversationID: convID,
|
||||
})
|
||||
}
|
||||
paneSpawned = frame.data.PaneSpawned
|
||||
upstreamDoneMs = frame.data.DurationMs
|
||||
case frame.data.Type == "error":
|
||||
// Forward as a stream error AND mark for non-nil err
|
||||
// propagation via the streamErr captured below.
|
||||
safeSendStream(ctx, events, StreamEvent{
|
||||
Kind: StreamError,
|
||||
Code: frame.data.Code,
|
||||
Message: frame.data.Message,
|
||||
Retryable: frame.data.Retryable,
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
cleanBody := acc.String()
|
||||
tokens := approxTokenCount(cleanBody)
|
||||
chipCount := countChips(cleanBody)
|
||||
finished := time.Now().UTC()
|
||||
durationMS := int(finished.Sub(startedAt) / time.Millisecond)
|
||||
if upstreamDoneMs > 0 {
|
||||
durationMS = int(upstreamDoneMs)
|
||||
}
|
||||
|
||||
// Persist the conversation id we learned (best-effort — failure here
|
||||
// just means recovery for THIS turn will have to list conversations
|
||||
// rather than fast-path to a single id).
|
||||
if convID != "" {
|
||||
if err := s.setAichatConversationID(ctx, turnID, convID); err != nil {
|
||||
log.Printf("paliadin: persist aichat conversation id %s: %v", convID, err)
|
||||
}
|
||||
}
|
||||
|
||||
if streamErr != nil {
|
||||
// Don't overwrite an existing error_code we may have set above.
|
||||
_ = s.markTurnError(ctx, turnID, classifyAichatError(streamErr))
|
||||
return nil, streamErr
|
||||
}
|
||||
|
||||
// Aichat is stateless on user content; the client owns the primer.
|
||||
if paneSpawned {
|
||||
s.clearPrimed(session)
|
||||
} else {
|
||||
s.markPrimed(session)
|
||||
}
|
||||
|
||||
if cleanBody == "" {
|
||||
// Upstream closed cleanly with no error event but no content
|
||||
// either (unexpected — log + treat as upstream_error so the
|
||||
// handler doesn't ship an empty bubble).
|
||||
_ = s.markTurnError(ctx, turnID, "shim_error")
|
||||
return nil, errors.New("aichat: stream closed with no content and no error")
|
||||
}
|
||||
|
||||
if err := s.completeTurn(ctx, turnID, finished, durationMS, cleanBody, tokens, streamMeta, chipCount); err != nil {
|
||||
log.Printf("paliadin: complete turn %s: %v", turnID, err)
|
||||
}
|
||||
|
||||
return &TurnResult{
|
||||
TurnID: turnID,
|
||||
Response: cleanBody,
|
||||
UsedTools: streamMeta.UsedTools,
|
||||
RowsSeen: streamMeta.RowsSeen,
|
||||
ChipCount: chipCount,
|
||||
ClassifierTag: streamMeta.ClassifierTag,
|
||||
DurationMS: durationMS,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// streamFrame is one decoded SSE event.
|
||||
type streamFrame struct {
|
||||
event string // "" → default (data:) event
|
||||
data streamDataFrame
|
||||
heartbeat streamHeartbeatFrame
|
||||
}
|
||||
|
||||
type streamDataFrame struct {
|
||||
Type string `json:"type"`
|
||||
Content string `json:"content,omitempty"`
|
||||
UsedTools []string `json:"used_tools,omitempty"`
|
||||
RowsSeen []string `json:"rows_seen,omitempty"`
|
||||
ClassifierTag string `json:"classifier_tag,omitempty"`
|
||||
TurnID string `json:"turn_id,omitempty"`
|
||||
ConversationID string `json:"conversation_id,omitempty"`
|
||||
DurationMs int64 `json:"duration_ms,omitempty"`
|
||||
PaneSpawned bool `json:"pane_spawned,omitempty"`
|
||||
Resumed bool `json:"resumed,omitempty"`
|
||||
Code string `json:"code,omitempty"`
|
||||
Message string `json:"message,omitempty"`
|
||||
Retryable bool `json:"retryable,omitempty"`
|
||||
}
|
||||
|
||||
type streamHeartbeatFrame struct {
|
||||
ElapsedSeconds int `json:"elapsed_seconds"`
|
||||
}
|
||||
|
||||
// callStreamingHTTP opens a streaming POST to aichat and invokes `emit`
|
||||
// for each parsed SSE frame. Returns once the stream closes; surfaces
|
||||
// non-2xx responses via decodeAichatError, transport errors via the
|
||||
// underlying http.Client error.
|
||||
//
|
||||
// Tests can override the parsing path by setting streamHook (kept null
|
||||
// in production).
|
||||
func (s *AichatPaliadinService) callStreamingHTTP(ctx context.Context, path string, body any, emit func(streamFrame)) error {
|
||||
if s.streamHook != nil {
|
||||
return s.streamHook(ctx, path, body, emit)
|
||||
}
|
||||
|
||||
buf, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("aichat: encode %s body: %w", path, err)
|
||||
}
|
||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, s.cfg.BaseURL+path, strings.NewReader(string(buf)))
|
||||
if err != nil {
|
||||
return fmt.Errorf("aichat: build %s request: %w", path, err)
|
||||
}
|
||||
httpReq.Header.Set("Content-Type", "application/json")
|
||||
httpReq.Header.Set("Accept", "text/event-stream")
|
||||
if s.cfg.BearerToken != "" {
|
||||
httpReq.Header.Set("Authorization", "Bearer "+s.cfg.BearerToken)
|
||||
}
|
||||
|
||||
// Use a dedicated client without the short Timeout — for streaming
|
||||
// we rely on the silence_timeout watch (no events for > 90 s ⇒ fail)
|
||||
// rather than a hard ceiling on the whole turn. The aichat upstream
|
||||
// keeps emitting heartbeats while it's alive, so a true upstream
|
||||
// stall is observable here.
|
||||
client := s.streamingClient()
|
||||
resp, err := client.Do(httpReq)
|
||||
if err != nil {
|
||||
return fmt.Errorf("aichat: POST %s: %w", path, err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
respBytes, _ := io.ReadAll(io.LimitReader(resp.Body, 64<<10))
|
||||
return decodeAichatError(resp.StatusCode, respBytes)
|
||||
}
|
||||
|
||||
return parseSSEStream(ctx, resp.Body, emit)
|
||||
}
|
||||
|
||||
// streamingClient returns an HTTP client tuned for streaming — no
|
||||
// per-request Timeout (kills mid-stream), but a long IdleConnTimeout so
|
||||
// the connection stays usable for multi-minute turns.
|
||||
func (s *AichatPaliadinService) streamingClient() *http.Client {
|
||||
if s.cfg.HTTPClient == nil {
|
||||
return &http.Client{Timeout: 0}
|
||||
}
|
||||
c := *s.cfg.HTTPClient
|
||||
c.Timeout = 0
|
||||
return &c
|
||||
}
|
||||
|
||||
// parseSSEStream tokenises an SSE byte stream into streamFrame events
|
||||
// and calls emit for each. Returns nil on clean EOF; returns the read
|
||||
// error otherwise.
|
||||
//
|
||||
// Frame format (per https://html.spec.whatwg.org/multipage/server-sent-events.html):
|
||||
//
|
||||
// event: <name>\n
|
||||
// data: <payload>\n
|
||||
// <blank line>\n
|
||||
//
|
||||
// Multiple `data:` lines per event are concatenated with `\n`. Lines
|
||||
// starting with `:` are comments and ignored.
|
||||
func parseSSEStream(ctx context.Context, r io.Reader, emit func(streamFrame)) error {
|
||||
br := bufio.NewReaderSize(r, 64<<10)
|
||||
var (
|
||||
eventName string
|
||||
dataLines []string
|
||||
)
|
||||
flush := func() {
|
||||
if len(dataLines) == 0 && eventName == "" {
|
||||
return
|
||||
}
|
||||
payload := strings.Join(dataLines, "\n")
|
||||
eventName = strings.TrimSpace(eventName)
|
||||
dataLines = nil
|
||||
eventOut := eventName
|
||||
eventName = ""
|
||||
if eventOut == "heartbeat" {
|
||||
var hb streamHeartbeatFrame
|
||||
if err := json.Unmarshal([]byte(payload), &hb); err != nil {
|
||||
return
|
||||
}
|
||||
emit(streamFrame{event: "heartbeat", heartbeat: hb})
|
||||
return
|
||||
}
|
||||
// Default event (unnamed) — discriminated by `type` field.
|
||||
var d streamDataFrame
|
||||
if err := json.Unmarshal([]byte(payload), &d); err != nil {
|
||||
return
|
||||
}
|
||||
emit(streamFrame{event: "", data: d})
|
||||
}
|
||||
|
||||
for {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
line, err := br.ReadString('\n')
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
// Final frame may not be terminated by a blank line on
|
||||
// abrupt close — flush whatever we accumulated.
|
||||
if line != "" {
|
||||
processSSELine(line, &eventName, &dataLines)
|
||||
}
|
||||
flush()
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("aichat: read sse: %w", err)
|
||||
}
|
||||
// Normalise line endings (some intermediaries send \r\n).
|
||||
line = strings.TrimRight(line, "\r\n")
|
||||
if line == "" {
|
||||
flush()
|
||||
continue
|
||||
}
|
||||
processSSELine(line, &eventName, &dataLines)
|
||||
}
|
||||
}
|
||||
|
||||
// processSSELine handles one line of the SSE wire format.
|
||||
func processSSELine(line string, eventName *string, dataLines *[]string) {
|
||||
if strings.HasPrefix(line, ":") {
|
||||
return // comment / keep-alive
|
||||
}
|
||||
if idx := strings.IndexByte(line, ':'); idx >= 0 {
|
||||
field := line[:idx]
|
||||
value := line[idx+1:]
|
||||
if strings.HasPrefix(value, " ") {
|
||||
value = value[1:]
|
||||
}
|
||||
switch field {
|
||||
case "event":
|
||||
*eventName = value
|
||||
case "data":
|
||||
*dataLines = append(*dataLines, value)
|
||||
}
|
||||
return
|
||||
}
|
||||
// Field with no value (rare). Treat the whole line as field name
|
||||
// per spec.
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// AichatRecoverer — late recovery via the conversation API
|
||||
// =============================================================================
|
||||
|
||||
// RecoverTurn asks aichat whether the given paliad turn has a response.
|
||||
// Returns the up-to-date row on success (including a freshly persisted
|
||||
// response when aichat had one), nil + nil when aichat doesn't know
|
||||
// either, or an error on transport / DB failures.
|
||||
func (s *AichatPaliadinService) RecoverTurn(ctx context.Context, callerID, turnID uuid.UUID) (*PaliadinTurn, error) {
|
||||
row, err := s.GetTurn(ctx, callerID, turnID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Fast path: the row already has a response (the janitor or a
|
||||
// concurrent stream finished writing). Return it as-is.
|
||||
if row.Response != nil && *row.Response != "" {
|
||||
return row, nil
|
||||
}
|
||||
|
||||
convID, err := s.resolveAichatConversationID(ctx, row)
|
||||
if err != nil {
|
||||
log.Printf("paliadin: recover %s: resolve conversation: %v", turnID, err)
|
||||
return nil, nil
|
||||
}
|
||||
if convID == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
turns, err := s.fetchAichatConversationTurns(ctx, convID)
|
||||
if err != nil {
|
||||
log.Printf("paliadin: recover %s: fetch turns: %v", turnID, err)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
assistantBody := matchAssistantResponse(turns, row.UserMessage)
|
||||
if assistantBody == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
finished := time.Now().UTC()
|
||||
durationMS := int(finished.Sub(row.StartedAt) / time.Millisecond)
|
||||
tokens := approxTokenCount(assistantBody)
|
||||
chipCount := countChips(assistantBody)
|
||||
|
||||
if err := s.completeTurnLate(ctx, turnID, finished, durationMS, assistantBody, tokens, trailerMeta{}, chipCount); err != nil {
|
||||
log.Printf("paliadin: recover %s: complete late: %v", turnID, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Re-read so the caller gets a row that reflects the late-write.
|
||||
return s.GetTurn(ctx, callerID, turnID)
|
||||
}
|
||||
|
||||
// resolveAichatConversationID returns the conversation the turn lived
|
||||
// in. Fast path: read the column on the row. Fallback: list aichat
|
||||
// conversations for the user+persona and take the most recent.
|
||||
func (s *AichatPaliadinService) resolveAichatConversationID(ctx context.Context, row *PaliadinTurn) (string, error) {
|
||||
stored, err := s.getAichatConversationID(ctx, row.TurnID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if stored != "" {
|
||||
return stored, nil
|
||||
}
|
||||
username := s.usernameFor(ctx, row.UserID)
|
||||
convs, err := s.listAichatConversations(ctx, username, row.UserID.String())
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if len(convs) == 0 {
|
||||
return "", nil
|
||||
}
|
||||
// Aichat orders by last_turn_at DESC; the head is the most recently
|
||||
// active conversation, which is the pane the lost turn ran against.
|
||||
return convs[0].ID, nil
|
||||
}
|
||||
|
||||
// matchAssistantResponse walks the aichat turn list and returns the
|
||||
// body of the latest assistant turn whose preceding user-role turn body
|
||||
// matches `userMessage` (verbatim — aichat persists the raw message
|
||||
// the same way paliad does).
|
||||
//
|
||||
// Falls back to "the last assistant body in the conversation" when no
|
||||
// match is found but the conversation has assistant content. This
|
||||
// covers cases where aichat persisted the user turn with envelope
|
||||
// prefixes that don't exactly match our user_message (e.g. an embedded
|
||||
// [ctx …] block).
|
||||
func matchAssistantResponse(turns []aichatConversationTurn, userMessage string) string {
|
||||
wantedNorm := normaliseForMatch(userMessage)
|
||||
|
||||
for i := 0; i < len(turns)-1; i++ {
|
||||
t := turns[i]
|
||||
if t.Role != "user" {
|
||||
continue
|
||||
}
|
||||
if normaliseForMatch(t.Body) != wantedNorm {
|
||||
continue
|
||||
}
|
||||
next := turns[i+1]
|
||||
if next.Role == "assistant" && next.Body != "" {
|
||||
return next.Body
|
||||
}
|
||||
}
|
||||
|
||||
for i := len(turns) - 1; i >= 0; i-- {
|
||||
t := turns[i]
|
||||
if t.Role == "assistant" && t.Body != "" {
|
||||
return t.Body
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// normaliseForMatch lowercases, strips surrounding whitespace, and
|
||||
// collapses internal whitespace runs. Comparison only — no semantic
|
||||
// meaning beyond "did aichat persist the same prompt we sent".
|
||||
func normaliseForMatch(s string) string {
|
||||
s = strings.TrimSpace(strings.ToLower(s))
|
||||
for strings.Contains(s, " ") {
|
||||
s = strings.ReplaceAll(s, " ", " ")
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// aichat conversation API client helpers
|
||||
// =============================================================================
|
||||
|
||||
type aichatConversationSummary struct {
|
||||
ID string `json:"id"`
|
||||
Persona string `json:"persona"`
|
||||
LastTurnAt string `json:"last_turn_at"`
|
||||
}
|
||||
|
||||
type aichatListConversationsResponse struct {
|
||||
Conversations []aichatConversationSummary `json:"conversations"`
|
||||
}
|
||||
|
||||
type aichatConversationTurn struct {
|
||||
ID string `json:"id"`
|
||||
Seq int `json:"seq"`
|
||||
Role string `json:"role"`
|
||||
Body string `json:"body"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
}
|
||||
|
||||
type aichatGetConversationTurnsResponse struct {
|
||||
ConversationID string `json:"conversation_id"`
|
||||
Turns []aichatConversationTurn `json:"turns"`
|
||||
HasMore bool `json:"has_more"`
|
||||
}
|
||||
|
||||
// listAichatConversations calls GET /chat/conversations for the user.
|
||||
func (s *AichatPaliadinService) listAichatConversations(ctx context.Context, username, userID string) ([]aichatConversationSummary, error) {
|
||||
q := url.Values{}
|
||||
q.Set("persona", s.cfg.Persona)
|
||||
q.Set("username", username)
|
||||
q.Set("user_id", userID)
|
||||
q.Set("limit", "5")
|
||||
path := "/chat/conversations?" + q.Encode()
|
||||
var resp aichatListConversationsResponse
|
||||
if err := s.callHTTP(ctx, http.MethodGet, path, nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp.Conversations, nil
|
||||
}
|
||||
|
||||
// fetchAichatConversationTurns calls GET /chat/conversations/{id}/turns.
|
||||
func (s *AichatPaliadinService) fetchAichatConversationTurns(ctx context.Context, convID string) ([]aichatConversationTurn, error) {
|
||||
q := url.Values{}
|
||||
q.Set("persona", s.cfg.Persona)
|
||||
q.Set("limit", "20")
|
||||
path := "/chat/conversations/" + url.PathEscape(convID) + "/turns?" + q.Encode()
|
||||
var resp aichatGetConversationTurnsResponse
|
||||
if err := s.callHTTP(ctx, http.MethodGet, path, nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp.Turns, nil
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// DB helpers for paliadin_turns.aichat_conversation_id (migration 118)
|
||||
// =============================================================================
|
||||
|
||||
func (s *AichatPaliadinService) setAichatConversationID(ctx context.Context, turnID uuid.UUID, convID string) error {
|
||||
if convID == "" {
|
||||
return nil
|
||||
}
|
||||
convUUID, err := uuid.Parse(convID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid conversation id %q: %w", convID, err)
|
||||
}
|
||||
_, err = s.db.ExecContext(ctx, `
|
||||
UPDATE paliad.paliadin_turns
|
||||
SET aichat_conversation_id = $2
|
||||
WHERE turn_id = $1
|
||||
AND aichat_conversation_id IS DISTINCT FROM $2
|
||||
`, turnID, convUUID)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *AichatPaliadinService) getAichatConversationID(ctx context.Context, turnID uuid.UUID) (string, error) {
|
||||
var convID *uuid.UUID
|
||||
err := s.db.QueryRowxContext(ctx,
|
||||
`SELECT aichat_conversation_id FROM paliad.paliadin_turns WHERE turn_id = $1`,
|
||||
turnID).Scan(&convID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if convID == nil {
|
||||
return "", nil
|
||||
}
|
||||
return convID.String(), nil
|
||||
}
|
||||
|
||||
// Compile-time interface conformance — fail the build if a streaming
|
||||
// method drifts off this backend.
|
||||
var _ StreamingPaliadin = (*AichatPaliadinService)(nil)
|
||||
var _ AichatRecoverer = (*AichatPaliadinService)(nil)
|
||||
259
internal/services/aichat_paliadin_stream_test.go
Normal file
259
internal/services/aichat_paliadin_stream_test.go
Normal file
@@ -0,0 +1,259 @@
|
||||
package services
|
||||
|
||||
// Streaming + recovery tests for AichatPaliadinService (t-paliad-235).
|
||||
//
|
||||
// Like the sync-path tests next door, every test bypasses the HTTP wire
|
||||
// (streamHook / httpHook). DB-write paths in RunTurnStream are out of
|
||||
// scope here for the same reason — paliad has no sqlx mock. We focus
|
||||
// on the SSE parser, the conversation-API client, and the
|
||||
// match-assistant-response helper.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// =============================================================================
|
||||
// SSE parser
|
||||
// =============================================================================
|
||||
|
||||
func TestParseSSEStream_DefaultEvents(t *testing.T) {
|
||||
body := `data: {"type":"chunk","content":"Hello "}
|
||||
|
||||
data: {"type":"chunk","content":"world"}
|
||||
|
||||
data: {"type":"meta","used_tools":["search"],"rows_seen":["3"],"classifier_tag":"howto"}
|
||||
|
||||
data: {"type":"done","turn_id":"abc","conversation_id":"11111111-1111-1111-1111-111111111111","duration_ms":1234,"pane_spawned":false,"resumed":false}
|
||||
|
||||
`
|
||||
var frames []streamFrame
|
||||
err := parseSSEStream(context.Background(), strings.NewReader(body), func(f streamFrame) {
|
||||
frames = append(frames, f)
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("parseSSEStream: %v", err)
|
||||
}
|
||||
if len(frames) != 4 {
|
||||
t.Fatalf("got %d frames; want 4 (%+v)", len(frames), frames)
|
||||
}
|
||||
if frames[0].data.Type != "chunk" || frames[0].data.Content != "Hello " {
|
||||
t.Errorf("frame 0 = %+v; want chunk Hello ", frames[0])
|
||||
}
|
||||
if frames[1].data.Type != "chunk" || frames[1].data.Content != "world" {
|
||||
t.Errorf("frame 1 = %+v; want chunk world", frames[1])
|
||||
}
|
||||
if frames[2].data.Type != "meta" || frames[2].data.ClassifierTag != "howto" {
|
||||
t.Errorf("frame 2 = %+v; want meta howto", frames[2])
|
||||
}
|
||||
if frames[3].data.Type != "done" || frames[3].data.ConversationID == "" {
|
||||
t.Errorf("frame 3 = %+v; want done with conversation_id", frames[3])
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseSSEStream_HeartbeatEvent(t *testing.T) {
|
||||
body := `event: heartbeat
|
||||
data: {"elapsed_seconds": 5}
|
||||
|
||||
event: heartbeat
|
||||
data: {"elapsed_seconds": 10}
|
||||
|
||||
data: {"type":"chunk","content":"Hi"}
|
||||
|
||||
`
|
||||
var frames []streamFrame
|
||||
err := parseSSEStream(context.Background(), strings.NewReader(body), func(f streamFrame) {
|
||||
frames = append(frames, f)
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("parseSSEStream: %v", err)
|
||||
}
|
||||
if len(frames) != 3 {
|
||||
t.Fatalf("got %d frames; want 3", len(frames))
|
||||
}
|
||||
if frames[0].event != "heartbeat" || frames[0].heartbeat.ElapsedSeconds != 5 {
|
||||
t.Errorf("frame 0 = %+v; want heartbeat 5s", frames[0])
|
||||
}
|
||||
if frames[1].event != "heartbeat" || frames[1].heartbeat.ElapsedSeconds != 10 {
|
||||
t.Errorf("frame 1 = %+v; want heartbeat 10s", frames[1])
|
||||
}
|
||||
if frames[2].data.Type != "chunk" || frames[2].data.Content != "Hi" {
|
||||
t.Errorf("frame 2 = %+v; want chunk Hi", frames[2])
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseSSEStream_IgnoresComments(t *testing.T) {
|
||||
body := `: keep-alive comment line
|
||||
|
||||
data: {"type":"chunk","content":"x"}
|
||||
|
||||
`
|
||||
var frames []streamFrame
|
||||
err := parseSSEStream(context.Background(), strings.NewReader(body), func(f streamFrame) {
|
||||
frames = append(frames, f)
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("parseSSEStream: %v", err)
|
||||
}
|
||||
if len(frames) != 1 || frames[0].data.Content != "x" {
|
||||
t.Errorf("frames = %+v; want 1 chunk", frames)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseSSEStream_HandlesCRLF(t *testing.T) {
|
||||
body := "data: {\"type\":\"chunk\",\"content\":\"crlf\"}\r\n\r\n"
|
||||
var frames []streamFrame
|
||||
err := parseSSEStream(context.Background(), strings.NewReader(body), func(f streamFrame) {
|
||||
frames = append(frames, f)
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("parseSSEStream: %v", err)
|
||||
}
|
||||
if len(frames) != 1 || frames[0].data.Content != "crlf" {
|
||||
t.Errorf("frames = %+v; want crlf chunk", frames)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseSSEStream_MultilineData(t *testing.T) {
|
||||
// Two data: lines for the same event must concatenate with \n.
|
||||
body := `data: {"type":"chunk",
|
||||
data: "content":"x"}
|
||||
|
||||
`
|
||||
var frames []streamFrame
|
||||
err := parseSSEStream(context.Background(), strings.NewReader(body), func(f streamFrame) {
|
||||
frames = append(frames, f)
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("parseSSEStream: %v", err)
|
||||
}
|
||||
if len(frames) != 1 || frames[0].data.Content != "x" {
|
||||
t.Errorf("frames = %+v; want 1 chunk x", frames)
|
||||
}
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// matchAssistantResponse
|
||||
// =============================================================================
|
||||
|
||||
func TestMatchAssistantResponse_PrefersUserPrecededAssistant(t *testing.T) {
|
||||
turns := []aichatConversationTurn{
|
||||
{Role: "user", Body: "first question"},
|
||||
{Role: "assistant", Body: "first answer"},
|
||||
{Role: "user", Body: "second question"},
|
||||
{Role: "assistant", Body: "second answer"},
|
||||
}
|
||||
got := matchAssistantResponse(turns, "second question")
|
||||
if got != "second answer" {
|
||||
t.Errorf("got %q; want %q", got, "second answer")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMatchAssistantResponse_NormaliseCase(t *testing.T) {
|
||||
turns := []aichatConversationTurn{
|
||||
{Role: "user", Body: "Hello World"},
|
||||
{Role: "assistant", Body: "hi back"},
|
||||
}
|
||||
got := matchAssistantResponse(turns, " hello world ")
|
||||
if got != "hi back" {
|
||||
t.Errorf("got %q; want %q", got, "hi back")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMatchAssistantResponse_FallbackToLastAssistant(t *testing.T) {
|
||||
// User message doesn't match (aichat persisted with a different
|
||||
// envelope or wrapper). Fallback: take the last assistant turn.
|
||||
turns := []aichatConversationTurn{
|
||||
{Role: "user", Body: "[ctx route=x] my question"},
|
||||
{Role: "assistant", Body: "the answer"},
|
||||
}
|
||||
got := matchAssistantResponse(turns, "my question")
|
||||
if got != "the answer" {
|
||||
t.Errorf("got %q; want %q", got, "the answer")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMatchAssistantResponse_NoAssistantTurns(t *testing.T) {
|
||||
turns := []aichatConversationTurn{
|
||||
{Role: "user", Body: "lonely"},
|
||||
}
|
||||
got := matchAssistantResponse(turns, "lonely")
|
||||
if got != "" {
|
||||
t.Errorf("got %q; want empty", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMatchAssistantResponse_EmptyAssistantSkipped(t *testing.T) {
|
||||
turns := []aichatConversationTurn{
|
||||
{Role: "user", Body: "q1"},
|
||||
{Role: "assistant", Body: ""},
|
||||
{Role: "user", Body: "q2"},
|
||||
{Role: "assistant", Body: "a2"},
|
||||
}
|
||||
got := matchAssistantResponse(turns, "q1")
|
||||
if got != "a2" {
|
||||
// q1's immediate next is the empty-body assistant — we skip it
|
||||
// and fall back to the last non-empty assistant body.
|
||||
t.Errorf("got %q; want %q (fallback)", got, "a2")
|
||||
}
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Conversation-API HTTP client
|
||||
// =============================================================================
|
||||
|
||||
func TestListAichatConversations_BuildsExpectedQuery(t *testing.T) {
|
||||
var seenPath string
|
||||
s := newAichatService(t, nil, func(ctx context.Context, method, path string, body any, out any) error {
|
||||
seenPath = path
|
||||
if dst, ok := out.(*aichatListConversationsResponse); ok {
|
||||
dst.Conversations = []aichatConversationSummary{{ID: "11111111-1111-1111-1111-111111111111"}}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
got, err := s.listAichatConversations(context.Background(), "alice", "00000000-0000-0000-0000-000000000001")
|
||||
if err != nil {
|
||||
t.Fatalf("listAichatConversations: %v", err)
|
||||
}
|
||||
if len(got) != 1 || got[0].ID == "" {
|
||||
t.Errorf("got = %+v; want one conversation", got)
|
||||
}
|
||||
for _, want := range []string{"/chat/conversations?", "persona=paliadin", "username=alice", "user_id=00000000-0000-0000-0000-000000000001", "limit=5"} {
|
||||
if !strings.Contains(seenPath, want) {
|
||||
t.Errorf("path %q missing %q", seenPath, want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFetchAichatConversationTurns_BuildsPath(t *testing.T) {
|
||||
var seenPath string
|
||||
s := newAichatService(t, nil, func(ctx context.Context, method, path string, body any, out any) error {
|
||||
seenPath = path
|
||||
if dst, ok := out.(*aichatGetConversationTurnsResponse); ok {
|
||||
dst.Turns = []aichatConversationTurn{{Role: "assistant", Body: "answer"}}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
turns, err := s.fetchAichatConversationTurns(context.Background(), "11111111-1111-1111-1111-111111111111")
|
||||
if err != nil {
|
||||
t.Fatalf("fetchAichatConversationTurns: %v", err)
|
||||
}
|
||||
if len(turns) != 1 || turns[0].Body != "answer" {
|
||||
t.Errorf("turns = %+v; want one assistant", turns)
|
||||
}
|
||||
for _, want := range []string{"/chat/conversations/11111111-1111-1111-1111-111111111111/turns", "persona=paliadin", "limit=20"} {
|
||||
if !strings.Contains(seenPath, want) {
|
||||
t.Errorf("path %q missing %q", seenPath, want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Interface conformance
|
||||
// =============================================================================
|
||||
|
||||
func TestAichatPaliadinService_ImplementsStreaming(t *testing.T) {
|
||||
var _ StreamingPaliadin = (*AichatPaliadinService)(nil)
|
||||
var _ AichatRecoverer = (*AichatPaliadinService)(nil)
|
||||
}
|
||||
127
internal/services/paliadin_streaming.go
Normal file
127
internal/services/paliadin_streaming.go
Normal file
@@ -0,0 +1,127 @@
|
||||
package services
|
||||
|
||||
// Streaming support for the Paliadin chat surface (t-paliad-235).
|
||||
//
|
||||
// The legacy LocalPaliadinService.RunTurn returns the full response in
|
||||
// one shot — the chat UI gets one `content` blob and the typewriter
|
||||
// simulates streaming. That falls apart on long turns: the HTTP client
|
||||
// hits its 130 s ceiling, paliad's SSE stream closes, the bubble shows
|
||||
// "Verbindung verloren" and the response is lost.
|
||||
//
|
||||
// The aichat backend exposes a real streaming variant at
|
||||
// /chat/turn/stream that emits incremental chunks + named heartbeat
|
||||
// events while claude is thinking. AichatPaliadinService implements
|
||||
// the StreamingPaliadin interface defined here; the handler probes
|
||||
// for it via a type assertion and falls back to the one-shot RunTurn
|
||||
// when the backend doesn't support streaming (legacy path).
|
||||
//
|
||||
// Recovery (a separate axis): when the transport drops mid-turn,
|
||||
// the AichatRecoverer interface lets the handler ask the backend to
|
||||
// look up the late response via aichat's conversation API rather than
|
||||
// rely on the legacy filesystem janitor — which only knows about
|
||||
// LocalPaliadinService's per-turn response files.
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// StreamEvent is one increment of a streaming turn. The handler
|
||||
// receives these via the channel passed to RunTurnStream and forwards
|
||||
// them as SSE frames to the browser.
|
||||
//
|
||||
// Exactly one of Kind's payloads is meaningful per event:
|
||||
//
|
||||
// StreamChunk → Content holds the next slice of assistant text
|
||||
// StreamHeartbeat → ElapsedSeconds holds upstream "still thinking" tick
|
||||
// StreamMeta → UsedTools / RowsSeen / ClassifierTag populated
|
||||
// StreamError → Code / Message / Retryable populated
|
||||
//
|
||||
// StreamDone is implicit: when the channel closes without an error
|
||||
// event, the turn completed. The accompanying *TurnResult returned by
|
||||
// RunTurnStream carries the final accumulated body + meta + conversation
|
||||
// id for persistence and recovery.
|
||||
type StreamEvent struct {
|
||||
Kind StreamEventKind
|
||||
|
||||
// StreamChunk
|
||||
Content string
|
||||
|
||||
// StreamHeartbeat
|
||||
ElapsedSeconds int
|
||||
|
||||
// StreamMeta (terminal-side; may also be merged into final TurnResult)
|
||||
UsedTools []string
|
||||
RowsSeen []int
|
||||
ClassifierTag string
|
||||
|
||||
// StreamError
|
||||
Code string
|
||||
Message string
|
||||
Retryable bool
|
||||
|
||||
// StreamConversation — aichat sometimes resolves the conversation id
|
||||
// before the first chunk arrives. We surface it as soon as we have
|
||||
// it so the handler can persist it for recovery, even if the stream
|
||||
// is later interrupted.
|
||||
ConversationID string
|
||||
}
|
||||
|
||||
// StreamEventKind enumerates the meaningful flavours.
|
||||
type StreamEventKind string
|
||||
|
||||
const (
|
||||
StreamChunk StreamEventKind = "chunk"
|
||||
StreamHeartbeat StreamEventKind = "heartbeat"
|
||||
StreamMeta StreamEventKind = "meta"
|
||||
StreamError StreamEventKind = "error"
|
||||
StreamConversation StreamEventKind = "conversation"
|
||||
)
|
||||
|
||||
// StreamingPaliadin is the optional extension the AichatPaliadinService
|
||||
// implements. Handlers detect it via type assertion; backends that don't
|
||||
// implement it (the legacy local + remote paths) fall back to the
|
||||
// one-shot Paliadin.RunTurn.
|
||||
//
|
||||
// Contract:
|
||||
// - RunTurnStream MUST close `events` before returning, so the handler
|
||||
// loop terminates cleanly.
|
||||
// - Returning a non-nil error implies the audit row was already
|
||||
// stamped with an error_code; the handler does not double-stamp.
|
||||
// - The *TurnResult is populated even on partial failure when the
|
||||
// upstream produced any meaningful body — handlers may render it as
|
||||
// a salvaged best-effort result instead of an error.
|
||||
type StreamingPaliadin interface {
|
||||
Paliadin
|
||||
|
||||
// RunTurnStream drives one turn against the streaming upstream and
|
||||
// pushes StreamEvents onto `events` as they arrive. Blocks until the
|
||||
// upstream finishes or the context cancels. `events` is closed by
|
||||
// the implementation before this method returns.
|
||||
RunTurnStream(ctx context.Context, req TurnRequest, events chan<- StreamEvent) (*TurnResult, error)
|
||||
}
|
||||
|
||||
// AichatRecoverer is the optional extension that knows how to ask the
|
||||
// aichat backend "did this turn actually complete?" when paliad's local
|
||||
// audit row never got a response (because the transport dropped mid
|
||||
// turn). Implementations look up the persisted aichat_conversation_id,
|
||||
// query aichat's GET /chat/conversations/{id}/turns, find the matching
|
||||
// assistant turn, and write the response back to paliad's row.
|
||||
//
|
||||
// Returns (nil, nil) when aichat doesn't have the response either —
|
||||
// i.e. the turn is truly lost and the UI must degrade to "verloren"
|
||||
// copy rather than "wird nachgereicht".
|
||||
type AichatRecoverer interface {
|
||||
RecoverTurn(ctx context.Context, callerID, turnID uuid.UUID) (*PaliadinTurn, error)
|
||||
}
|
||||
|
||||
// safeSendStream pushes an event onto the channel, dropping on context
|
||||
// cancel. Mirrors the handler-side `send` helper but works against a
|
||||
// generic chan StreamEvent.
|
||||
func safeSendStream(ctx context.Context, ch chan<- StreamEvent, ev StreamEvent) {
|
||||
select {
|
||||
case ch <- ev:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user