package services // Streaming + recovery support for AichatPaliadinService (t-paliad-235). // // ============================================================================= // Upstream contract — /chat/turn/stream // ============================================================================= // // Source of truth: m/mAi internal/aichat/api/stream.go. Captured here as // inline doc so future debugging doesn't require chasing across repos: // // Request body: same shape as POST /chat/turn (TurnRequest mirror in // aichat_paliadin.go). Persona must support streaming; paliad's // "paliadin" persona does. // // Response: text/event-stream. Two SSE event flavours: // // 1. The default unnamed `data:` event carries a discriminated-union // JSON object keyed by `"type"`: // // {"type":"chunk","content":"…"} // {"type":"meta","used_tools":[…],"rows_seen":[…],"classifier_tag":"…"} // {"type":"done","turn_id":"…","conversation_id":"…", // "duration_ms":1234,"pane_spawned":false,"resumed":false} // {"type":"error","code":"…","message":"…","retryable":true} // // 2. The named `event: heartbeat` event carries: // // {"elapsed_seconds": N} // // Emitted every 5 s by the upstream while the runner has been // silent (no content). aichat keeps emitting these for the lifetime // of the runner so the client can render "Paliadin denkt nach // (N s)" without conflating with actual content. // // Errors before the stream starts (auth failure, persona unknown, // validation) come back as a normal JSON envelope with the appropriate // HTTP status — not SSE. Those land in callHTTP via decodeAichatError. 
// // ============================================================================= // Conversation-based late recovery // ============================================================================= // // Aichat exposes: // // GET /chat/conversations?persona=…&username=…&user_id=… // → list of ConversationSummary, ordered last_turn_at DESC // GET /chat/conversations/{id}/turns // → list of TurnRow (role=user|assistant, body, created_at) // // When paliad's stream drops mid-turn we: // 1. Look up paliad.paliadin_turns.aichat_conversation_id for the row. // 2. If unset (stream dropped before the `done` frame): list the user's // conversations and take the most recent one for the persona — // that's the pane our turn ran against (aichat owns one active // conversation per persona+user, see m/mAi#243). // 3. GET that conversation's turns. Find the latest assistant turn // whose preceding user-role turn body matches our user_message. // 4. Persist the response (completeTurnLate) and return it. // // If aichat returns no matching assistant turn → the turn is truly lost // (transport drop + upstream crash). Recovery returns (nil, nil) and // the handler degrades the UI to "verloren". import ( "bufio" "context" "encoding/json" "errors" "fmt" "io" "log" "net/http" "net/url" "strings" "time" "github.com/google/uuid" ) // ============================================================================= // Streaming RunTurnStream // ============================================================================= // RunTurnStream drives one /chat/turn/stream turn against aichat and // relays incremental events onto `events`. Closes `events` before // returning. Implements StreamingPaliadin. 
func (s *AichatPaliadinService) RunTurnStream(ctx context.Context, req TurnRequest, events chan<- StreamEvent) (*TurnResult, error) { defer close(events) s.turnMu.Lock() defer s.turnMu.Unlock() turnID := uuid.New() startedAt := time.Now().UTC() if err := s.insertTurnRow(ctx, &PaliadinTurn{ TurnID: turnID, UserID: req.UserID, SessionID: req.SessionID, StartedAt: startedAt, UserMessage: req.UserMessage, PageOrigin: optionalString(req.PageOrigin), }, req.Context); err != nil { return nil, fmt.Errorf("paliadin: insert turn row: %w", err) } if err := s.healthGate(ctx); err != nil { _ = s.markTurnError(ctx, turnID, "mriver_unreachable") safeSendStream(ctx, events, StreamEvent{ Kind: StreamError, Code: "mriver_unreachable", Message: err.Error(), }) return nil, err } username := s.usernameFor(ctx, req.UserID) session := s.cfg.Persona + ":" + username primer := s.buildPrimerExchanges(ctx, session, req) jwt, err := s.mintJWTIfConfigured(req.UserID) if err != nil { _ = s.markTurnError(ctx, turnID, "jwt_mint_failed") safeSendStream(ctx, events, StreamEvent{ Kind: StreamError, Code: "shim_error", Message: fmt.Sprintf("mint turn jwt: %v", err), }) return nil, fmt.Errorf("paliadin: mint turn jwt: %w", err) } body := aichatTurnRequest{ Persona: s.cfg.Persona, Username: username, UserID: req.UserID.String(), SessionID: req.SessionID, Message: sanitiseForTmux(req.UserMessage), JWT: jwt, Primer: primer, Meta: buildAichatMeta(req), } // Stream the upstream call. acc accumulates the full text so we can // persist the row + return a TurnResult on success. 
var ( acc strings.Builder streamMeta trailerMeta convID string paneSpawned bool upstreamDoneMs int64 ) streamErr := s.callStreamingHTTP(ctx, "/chat/turn/stream", body, func(frame streamFrame) { switch { case frame.event == "heartbeat": safeSendStream(ctx, events, StreamEvent{ Kind: StreamHeartbeat, ElapsedSeconds: frame.heartbeat.ElapsedSeconds, }) case frame.data.Type == "chunk": if frame.data.Content == "" { return } acc.WriteString(frame.data.Content) safeSendStream(ctx, events, StreamEvent{ Kind: StreamChunk, Content: frame.data.Content, }) case frame.data.Type == "meta": streamMeta = trailerMeta{ UsedTools: append([]string(nil), frame.data.UsedTools...), RowsSeen: coerceAichatRowsSeen(frame.data.RowsSeen), ClassifierTag: frame.data.ClassifierTag, } safeSendStream(ctx, events, StreamEvent{ Kind: StreamMeta, UsedTools: streamMeta.UsedTools, RowsSeen: streamMeta.RowsSeen, ClassifierTag: streamMeta.ClassifierTag, }) case frame.data.Type == "done": if frame.data.ConversationID != "" { convID = frame.data.ConversationID safeSendStream(ctx, events, StreamEvent{ Kind: StreamConversation, ConversationID: convID, }) } paneSpawned = frame.data.PaneSpawned upstreamDoneMs = frame.data.DurationMs case frame.data.Type == "error": // Forward as a stream error AND mark for non-nil err // propagation via the streamErr captured below. safeSendStream(ctx, events, StreamEvent{ Kind: StreamError, Code: frame.data.Code, Message: frame.data.Message, Retryable: frame.data.Retryable, }) } }) cleanBody := acc.String() tokens := approxTokenCount(cleanBody) chipCount := countChips(cleanBody) finished := time.Now().UTC() durationMS := int(finished.Sub(startedAt) / time.Millisecond) if upstreamDoneMs > 0 { durationMS = int(upstreamDoneMs) } // Persist the conversation id we learned (best-effort — failure here // just means recovery for THIS turn will have to list conversations // rather than fast-path to a single id). 
if convID != "" { if err := s.setAichatConversationID(ctx, turnID, convID); err != nil { log.Printf("paliadin: persist aichat conversation id %s: %v", convID, err) } } if streamErr != nil { // Aichat persona without streaming support — graceful fallback to // the one-shot /chat/turn endpoint. Same body shape; we adapt the // non-streaming response into a single StreamChunk so the caller // sees identical event ordering. if strings.Contains(streamErr.Error(), "unsupported_streaming") { log.Printf("paliadin: persona %q lacks streaming support — falling back to one-shot turn %s", s.cfg.Persona, turnID) return s.fallbackOneShotFromStream(ctx, turnID, body, events, startedAt, session) } // Don't overwrite an existing error_code we may have set above. _ = s.markTurnError(ctx, turnID, classifyAichatError(streamErr)) return nil, streamErr } // Aichat is stateless on user content; the client owns the primer. if paneSpawned { s.clearPrimed(session) } else { s.markPrimed(session) } if cleanBody == "" { // Upstream closed cleanly with no error event but no content // either (unexpected — log + treat as upstream_error so the // handler doesn't ship an empty bubble). _ = s.markTurnError(ctx, turnID, "shim_error") return nil, errors.New("aichat: stream closed with no content and no error") } if err := s.completeTurn(ctx, turnID, finished, durationMS, cleanBody, tokens, streamMeta, chipCount); err != nil { log.Printf("paliadin: complete turn %s: %v", turnID, err) } return &TurnResult{ TurnID: turnID, Response: cleanBody, UsedTools: streamMeta.UsedTools, RowsSeen: streamMeta.RowsSeen, ChipCount: chipCount, ClassifierTag: streamMeta.ClassifierTag, DurationMS: durationMS, }, nil } // fallbackOneShotFromStream runs the same `body` against aichat's // non-streaming /chat/turn endpoint and adapts the response into the // StreamingPaliadin contract — a single StreamChunk + StreamMeta + // StreamConversation, followed by `events` being closed by the // outer RunTurnStream's defer. 
Used when the configured persona doesn't // support streaming (aichat returns HTTP 400 unsupported_streaming). // // Identical persistence shape as the one-shot RunTurn: completeTurn + // markPrimed/clearPrimed. No new turn row (already inserted by // RunTurnStream). No primer rebuild (already in body). func (s *AichatPaliadinService) fallbackOneShotFromStream( ctx context.Context, turnID uuid.UUID, body aichatTurnRequest, events chan<- StreamEvent, startedAt time.Time, session string, ) (*TurnResult, error) { var resp aichatTurnResponse if err := s.callHTTP(ctx, http.MethodPost, "/chat/turn", body, &resp); err != nil { _ = s.markTurnError(ctx, turnID, classifyAichatError(err)) safeSendStream(ctx, events, StreamEvent{ Kind: StreamError, Code: classifyAichatError(err), Message: err.Error(), }) return nil, err } if resp.PaneSpawned { s.clearPrimed(session) } else { s.markPrimed(session) } cleanBody := resp.Response tokens := approxTokenCount(cleanBody) chipCount := countChips(cleanBody) finished := time.Now().UTC() durationMS := int(finished.Sub(startedAt) / time.Millisecond) tmeta := trailerMeta{ UsedTools: resp.Meta.UsedTools, ClassifierTag: resp.Meta.ClassifierTag, RowsSeen: coerceAichatRowsSeen(resp.Meta.RowsSeen), } // Emit the response as a single chunk so the frontend renders it. safeSendStream(ctx, events, StreamEvent{ Kind: StreamChunk, Content: cleanBody, }) safeSendStream(ctx, events, StreamEvent{ Kind: StreamMeta, UsedTools: tmeta.UsedTools, ClassifierTag: tmeta.ClassifierTag, RowsSeen: tmeta.RowsSeen, }) if err := s.completeTurn(ctx, turnID, finished, durationMS, cleanBody, tokens, tmeta, chipCount); err != nil { log.Printf("paliadin: complete turn %s (fallback one-shot): %v", turnID, err) } return &TurnResult{ TurnID: turnID, Response: cleanBody, UsedTools: tmeta.UsedTools, RowsSeen: tmeta.RowsSeen, ChipCount: chipCount, ClassifierTag: tmeta.ClassifierTag, DurationMS: durationMS, }, nil } // streamFrame is one decoded SSE event. 
type streamFrame struct {
	event     string // "" → default (data:) event
	data      streamDataFrame      // populated for default (non-heartbeat) events
	heartbeat streamHeartbeatFrame // populated when event == "heartbeat"
}

// streamDataFrame is the JSON payload of a default SSE event. Which
// fields are set depends on Type ("chunk", "meta", "done", "error");
// fields belonging to other frame types stay at their zero value.
type streamDataFrame struct {
	Type string `json:"type"`
	// chunk
	Content string `json:"content,omitempty"`
	// meta
	UsedTools     []string `json:"used_tools,omitempty"`
	RowsSeen      []string `json:"rows_seen,omitempty"`
	ClassifierTag string   `json:"classifier_tag,omitempty"`
	// done
	TurnID         string `json:"turn_id,omitempty"`
	ConversationID string `json:"conversation_id,omitempty"`
	DurationMs     int64  `json:"duration_ms,omitempty"`
	PaneSpawned    bool   `json:"pane_spawned,omitempty"`
	Resumed        bool   `json:"resumed,omitempty"`
	// error
	Code      string `json:"code,omitempty"`
	Message   string `json:"message,omitempty"`
	Retryable bool   `json:"retryable,omitempty"`
}

// streamHeartbeatFrame is the JSON payload of a named `heartbeat` event:
// how long, in seconds, the upstream runner has been working on the turn.
type streamHeartbeatFrame struct {
	ElapsedSeconds int `json:"elapsed_seconds"`
}

// callStreamingHTTP opens a streaming POST to aichat and invokes `emit`
// for each parsed SSE frame. Returns once the stream closes; surfaces
// non-2xx responses via decodeAichatError, transport errors via the
// underlying http.Client error.
//
// Tests can override the transport and parsing path by setting
// streamHook (nil in production): when set, it receives the request
// arguments and `emit` directly and no HTTP request is made.
func (s *AichatPaliadinService) callStreamingHTTP(ctx context.Context, path string, body any, emit func(streamFrame)) error { if s.streamHook != nil { return s.streamHook(ctx, path, body, emit) } buf, err := json.Marshal(body) if err != nil { return fmt.Errorf("aichat: encode %s body: %w", path, err) } httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, s.cfg.BaseURL+path, strings.NewReader(string(buf))) if err != nil { return fmt.Errorf("aichat: build %s request: %w", path, err) } httpReq.Header.Set("Content-Type", "application/json") httpReq.Header.Set("Accept", "text/event-stream") if s.cfg.BearerToken != "" { httpReq.Header.Set("Authorization", "Bearer "+s.cfg.BearerToken) } // Use a dedicated client without the short Timeout — for streaming // we rely on the silence_timeout watch (no events for > 90 s ⇒ fail) // rather than a hard ceiling on the whole turn. The aichat upstream // keeps emitting heartbeats while it's alive, so a true upstream // stall is observable here. client := s.streamingClient() resp, err := client.Do(httpReq) if err != nil { return fmt.Errorf("aichat: POST %s: %w", path, err) } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { respBytes, _ := io.ReadAll(io.LimitReader(resp.Body, 64<<10)) return decodeAichatError(resp.StatusCode, respBytes) } return parseSSEStream(ctx, resp.Body, emit) } // streamingClient returns an HTTP client tuned for streaming — no // per-request Timeout (kills mid-stream), but a long IdleConnTimeout so // the connection stays usable for multi-minute turns. func (s *AichatPaliadinService) streamingClient() *http.Client { if s.cfg.HTTPClient == nil { return &http.Client{Timeout: 0} } c := *s.cfg.HTTPClient c.Timeout = 0 return &c } // parseSSEStream tokenises an SSE byte stream into streamFrame events // and calls emit for each. Returns nil on clean EOF; returns the read // error otherwise. 
// // Frame format (per https://html.spec.whatwg.org/multipage/server-sent-events.html): // // event: \n // data: \n // \n // // Multiple `data:` lines per event are concatenated with `\n`. Lines // starting with `:` are comments and ignored. func parseSSEStream(ctx context.Context, r io.Reader, emit func(streamFrame)) error { br := bufio.NewReaderSize(r, 64<<10) var ( eventName string dataLines []string ) flush := func() { if len(dataLines) == 0 && eventName == "" { return } payload := strings.Join(dataLines, "\n") eventName = strings.TrimSpace(eventName) dataLines = nil eventOut := eventName eventName = "" if eventOut == "heartbeat" { var hb streamHeartbeatFrame if err := json.Unmarshal([]byte(payload), &hb); err != nil { return } emit(streamFrame{event: "heartbeat", heartbeat: hb}) return } // Default event (unnamed) — discriminated by `type` field. var d streamDataFrame if err := json.Unmarshal([]byte(payload), &d); err != nil { return } emit(streamFrame{event: "", data: d}) } for { if err := ctx.Err(); err != nil { return err } line, err := br.ReadString('\n') if err != nil { if errors.Is(err, io.EOF) { // Final frame may not be terminated by a blank line on // abrupt close — flush whatever we accumulated. if line != "" { processSSELine(line, &eventName, &dataLines) } flush() return nil } return fmt.Errorf("aichat: read sse: %w", err) } // Normalise line endings (some intermediaries send \r\n). line = strings.TrimRight(line, "\r\n") if line == "" { flush() continue } processSSELine(line, &eventName, &dataLines) } } // processSSELine handles one line of the SSE wire format. 
func processSSELine(line string, eventName *string, dataLines *[]string) { if strings.HasPrefix(line, ":") { return // comment / keep-alive } if idx := strings.IndexByte(line, ':'); idx >= 0 { field := line[:idx] value := line[idx+1:] if strings.HasPrefix(value, " ") { value = value[1:] } switch field { case "event": *eventName = value case "data": *dataLines = append(*dataLines, value) } return } // Field with no value (rare). Treat the whole line as field name // per spec. } // ============================================================================= // AichatRecoverer — late recovery via the conversation API // ============================================================================= // RecoverTurn asks aichat whether the given paliad turn has a response. // Returns the up-to-date row on success (including a freshly persisted // response when aichat had one), nil + nil when aichat doesn't know // either, or an error on transport / DB failures. func (s *AichatPaliadinService) RecoverTurn(ctx context.Context, callerID, turnID uuid.UUID) (*PaliadinTurn, error) { row, err := s.GetTurn(ctx, callerID, turnID) if err != nil { return nil, err } // Fast path: the row already has a response (the janitor or a // concurrent stream finished writing). Return it as-is. 
if row.Response != nil && *row.Response != "" { return row, nil } convID, err := s.resolveAichatConversationID(ctx, row) if err != nil { log.Printf("paliadin: recover %s: resolve conversation: %v", turnID, err) return nil, nil } if convID == "" { return nil, nil } turns, err := s.fetchAichatConversationTurns(ctx, convID) if err != nil { log.Printf("paliadin: recover %s: fetch turns: %v", turnID, err) return nil, nil } assistantBody := matchAssistantResponse(turns, row.UserMessage) if assistantBody == "" { return nil, nil } finished := time.Now().UTC() durationMS := int(finished.Sub(row.StartedAt) / time.Millisecond) tokens := approxTokenCount(assistantBody) chipCount := countChips(assistantBody) if err := s.completeTurnLate(ctx, turnID, finished, durationMS, assistantBody, tokens, trailerMeta{}, chipCount); err != nil { log.Printf("paliadin: recover %s: complete late: %v", turnID, err) return nil, err } // Re-read so the caller gets a row that reflects the late-write. return s.GetTurn(ctx, callerID, turnID) } // resolveAichatConversationID returns the conversation the turn lived // in. Fast path: read the column on the row. Fallback: list aichat // conversations for the user+persona and take the most recent. func (s *AichatPaliadinService) resolveAichatConversationID(ctx context.Context, row *PaliadinTurn) (string, error) { stored, err := s.getAichatConversationID(ctx, row.TurnID) if err != nil { return "", err } if stored != "" { return stored, nil } username := s.usernameFor(ctx, row.UserID) convs, err := s.listAichatConversations(ctx, username, row.UserID.String()) if err != nil { return "", err } if len(convs) == 0 { return "", nil } // Aichat orders by last_turn_at DESC; the head is the most recently // active conversation, which is the pane the lost turn ran against. 
return convs[0].ID, nil } // matchAssistantResponse walks the aichat turn list and returns the // body of the latest assistant turn whose preceding user-role turn body // matches `userMessage` (verbatim — aichat persists the raw message // the same way paliad does). // // Falls back to "the last assistant body in the conversation" when no // match is found but the conversation has assistant content. This // covers cases where aichat persisted the user turn with envelope // prefixes that don't exactly match our user_message (e.g. an embedded // [ctx …] block). func matchAssistantResponse(turns []aichatConversationTurn, userMessage string) string { wantedNorm := normaliseForMatch(userMessage) for i := 0; i < len(turns)-1; i++ { t := turns[i] if t.Role != "user" { continue } if normaliseForMatch(t.Body) != wantedNorm { continue } next := turns[i+1] if next.Role == "assistant" && next.Body != "" { return next.Body } } for i := len(turns) - 1; i >= 0; i-- { t := turns[i] if t.Role == "assistant" && t.Body != "" { return t.Body } } return "" } // normaliseForMatch lowercases, strips surrounding whitespace, and // collapses internal whitespace runs. Comparison only — no semantic // meaning beyond "did aichat persist the same prompt we sent". 
func normaliseForMatch(s string) string { s = strings.TrimSpace(strings.ToLower(s)) for strings.Contains(s, " ") { s = strings.ReplaceAll(s, " ", " ") } return s } // ============================================================================= // aichat conversation API client helpers // ============================================================================= type aichatConversationSummary struct { ID string `json:"id"` Persona string `json:"persona"` LastTurnAt string `json:"last_turn_at"` } type aichatListConversationsResponse struct { Conversations []aichatConversationSummary `json:"conversations"` } type aichatConversationTurn struct { ID string `json:"id"` Seq int `json:"seq"` Role string `json:"role"` Body string `json:"body"` CreatedAt string `json:"created_at"` } type aichatGetConversationTurnsResponse struct { ConversationID string `json:"conversation_id"` Turns []aichatConversationTurn `json:"turns"` HasMore bool `json:"has_more"` } // listAichatConversations calls GET /chat/conversations for the user. func (s *AichatPaliadinService) listAichatConversations(ctx context.Context, username, userID string) ([]aichatConversationSummary, error) { q := url.Values{} q.Set("persona", s.cfg.Persona) q.Set("username", username) q.Set("user_id", userID) q.Set("limit", "5") path := "/chat/conversations?" + q.Encode() var resp aichatListConversationsResponse if err := s.callHTTP(ctx, http.MethodGet, path, nil, &resp); err != nil { return nil, err } return resp.Conversations, nil } // fetchAichatConversationTurns calls GET /chat/conversations/{id}/turns. func (s *AichatPaliadinService) fetchAichatConversationTurns(ctx context.Context, convID string) ([]aichatConversationTurn, error) { q := url.Values{} q.Set("persona", s.cfg.Persona) q.Set("limit", "20") path := "/chat/conversations/" + url.PathEscape(convID) + "/turns?" 
+ q.Encode() var resp aichatGetConversationTurnsResponse if err := s.callHTTP(ctx, http.MethodGet, path, nil, &resp); err != nil { return nil, err } return resp.Turns, nil } // ============================================================================= // DB helpers for paliadin_turns.aichat_conversation_id (migration 118) // ============================================================================= func (s *AichatPaliadinService) setAichatConversationID(ctx context.Context, turnID uuid.UUID, convID string) error { if convID == "" { return nil } convUUID, err := uuid.Parse(convID) if err != nil { return fmt.Errorf("invalid conversation id %q: %w", convID, err) } _, err = s.db.ExecContext(ctx, ` UPDATE paliad.paliadin_turns SET aichat_conversation_id = $2 WHERE turn_id = $1 AND aichat_conversation_id IS DISTINCT FROM $2 `, turnID, convUUID) return err } func (s *AichatPaliadinService) getAichatConversationID(ctx context.Context, turnID uuid.UUID) (string, error) { var convID *uuid.UUID err := s.db.QueryRowxContext(ctx, `SELECT aichat_conversation_id FROM paliad.paliadin_turns WHERE turn_id = $1`, turnID).Scan(&convID) if err != nil { return "", err } if convID == nil { return "", nil } return convID.String(), nil } // Compile-time interface conformance — fail the build if a streaming // method drifts off this backend. var _ StreamingPaliadin = (*AichatPaliadinService)(nil) var _ AichatRecoverer = (*AichatPaliadinService)(nil)