Symptom: paliadin chat returns "Verbindung verloren" because aichat's
paliadin persona is not configured with streaming support — every
RunTurnStream() call gets back HTTP 400 unsupported_streaming and the
SSE stream closes empty.
Fix: when RunTurnStream() detects "unsupported_streaming" in the
upstream error, transparently retry against /chat/turn (non-streaming)
with the same body. The full response gets emitted as a single
StreamChunk + StreamMeta so the SSE relay sees identical event
ordering. Persistence (completeTurn + markPrimed) mirrors the one-shot
RunTurn() path.
No real-time chunking until the persona is reconfigured upstream, but
the chat works end-to-end. Once the paliadin persona supports streaming
on aichat, this code path goes dormant — the unsupported_streaming
branch is only entered when the upstream actually returns that error.
Diagnostic logs from commit 937ff13 made this visible:
paliadin: backend returned error err=aichat: HTTP 400 (bad_request):
unsupported_streaming: persona paliadin does not support streaming
Refs m/paliad demo path.
737 lines
24 KiB
Go
737 lines
24 KiB
Go
package services
|
|
|
|
// Streaming + recovery support for AichatPaliadinService (t-paliad-235).
|
|
//
|
|
// =============================================================================
|
|
// Upstream contract — /chat/turn/stream
|
|
// =============================================================================
|
|
//
|
|
// Source of truth: m/mAi internal/aichat/api/stream.go. Captured here as
|
|
// inline doc so future debugging doesn't require chasing across repos:
|
|
//
|
|
// Request body: same shape as POST /chat/turn (TurnRequest mirror in
|
|
// aichat_paliadin.go). Persona must support streaming; when it does
// not, aichat answers HTTP 400 unsupported_streaming and RunTurnStream
// falls back to the one-shot POST /chat/turn.
|
|
//
|
|
// Response: text/event-stream. Two SSE event flavours:
|
|
//
|
|
// 1. The default unnamed `data:` event carries a discriminated-union
|
|
// JSON object keyed by `"type"`:
|
|
//
|
|
// {"type":"chunk","content":"…"}
|
|
// {"type":"meta","used_tools":[…],"rows_seen":[…],"classifier_tag":"…"}
|
|
// {"type":"done","turn_id":"…","conversation_id":"…",
|
|
// "duration_ms":1234,"pane_spawned":false,"resumed":false}
|
|
// {"type":"error","code":"…","message":"…","retryable":true}
|
|
//
|
|
// 2. The named `event: heartbeat` event carries:
|
|
//
|
|
// {"elapsed_seconds": N}
|
|
//
|
|
// Emitted every 5 s by the upstream while the runner has been
|
|
// silent (no content). aichat keeps emitting these for the lifetime
|
|
// of the runner so the client can render "Paliadin denkt nach
|
|
// (N s)" without conflating with actual content.
|
|
//
|
|
// Errors before the stream starts (auth failure, persona unknown,
|
|
// validation) come back as a normal JSON envelope with the appropriate
|
|
// HTTP status — not SSE. Those land in callHTTP via decodeAichatError.
|
|
//
|
|
// =============================================================================
|
|
// Conversation-based late recovery
|
|
// =============================================================================
|
|
//
|
|
// Aichat exposes:
|
|
//
|
|
// GET /chat/conversations?persona=…&username=…&user_id=…
|
|
// → list of ConversationSummary, ordered last_turn_at DESC
|
|
// GET /chat/conversations/{id}/turns
|
|
// → list of TurnRow (role=user|assistant, body, created_at)
|
|
//
|
|
// When paliad's stream drops mid-turn we:
|
|
// 1. Look up paliad.paliadin_turns.aichat_conversation_id for the row.
|
|
// 2. If unset (stream dropped before the `done` frame): list the user's
|
|
// conversations and take the most recent one for the persona —
|
|
// that's the pane our turn ran against (aichat owns one active
|
|
// conversation per persona+user, see m/mAi#243).
|
|
// 3. GET that conversation's turns. Find the latest assistant turn
|
|
// whose preceding user-role turn body matches our user_message.
|
|
// 4. Persist the response (completeTurnLate) and return it.
|
|
//
|
|
// If aichat returns no matching assistant turn → the turn is truly lost
|
|
// (transport drop + upstream crash). Recovery returns (nil, nil) and
|
|
// the handler degrades the UI to "verloren".
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// =============================================================================
|
|
// Streaming RunTurnStream
|
|
// =============================================================================
|
|
|
|
// RunTurnStream drives one /chat/turn/stream turn against aichat and
|
|
// relays incremental events onto `events`. Closes `events` before
|
|
// returning. Implements StreamingPaliadin.
|
|
func (s *AichatPaliadinService) RunTurnStream(ctx context.Context, req TurnRequest, events chan<- StreamEvent) (*TurnResult, error) {
|
|
defer close(events)
|
|
|
|
s.turnMu.Lock()
|
|
defer s.turnMu.Unlock()
|
|
|
|
turnID := uuid.New()
|
|
startedAt := time.Now().UTC()
|
|
|
|
if err := s.insertTurnRow(ctx, &PaliadinTurn{
|
|
TurnID: turnID,
|
|
UserID: req.UserID,
|
|
SessionID: req.SessionID,
|
|
StartedAt: startedAt,
|
|
UserMessage: req.UserMessage,
|
|
PageOrigin: optionalString(req.PageOrigin),
|
|
}, req.Context); err != nil {
|
|
return nil, fmt.Errorf("paliadin: insert turn row: %w", err)
|
|
}
|
|
|
|
if err := s.healthGate(ctx); err != nil {
|
|
_ = s.markTurnError(ctx, turnID, "mriver_unreachable")
|
|
safeSendStream(ctx, events, StreamEvent{
|
|
Kind: StreamError,
|
|
Code: "mriver_unreachable",
|
|
Message: err.Error(),
|
|
})
|
|
return nil, err
|
|
}
|
|
|
|
username := s.usernameFor(ctx, req.UserID)
|
|
session := s.cfg.Persona + ":" + username
|
|
primer := s.buildPrimerExchanges(ctx, session, req)
|
|
|
|
jwt, err := s.mintJWTIfConfigured(req.UserID)
|
|
if err != nil {
|
|
_ = s.markTurnError(ctx, turnID, "jwt_mint_failed")
|
|
safeSendStream(ctx, events, StreamEvent{
|
|
Kind: StreamError,
|
|
Code: "shim_error",
|
|
Message: fmt.Sprintf("mint turn jwt: %v", err),
|
|
})
|
|
return nil, fmt.Errorf("paliadin: mint turn jwt: %w", err)
|
|
}
|
|
|
|
body := aichatTurnRequest{
|
|
Persona: s.cfg.Persona,
|
|
Username: username,
|
|
UserID: req.UserID.String(),
|
|
SessionID: req.SessionID,
|
|
Message: sanitiseForTmux(req.UserMessage),
|
|
JWT: jwt,
|
|
Primer: primer,
|
|
Meta: buildAichatMeta(req),
|
|
}
|
|
|
|
// Stream the upstream call. acc accumulates the full text so we can
|
|
// persist the row + return a TurnResult on success.
|
|
var (
|
|
acc strings.Builder
|
|
streamMeta trailerMeta
|
|
convID string
|
|
paneSpawned bool
|
|
upstreamDoneMs int64
|
|
)
|
|
|
|
streamErr := s.callStreamingHTTP(ctx, "/chat/turn/stream", body, func(frame streamFrame) {
|
|
switch {
|
|
case frame.event == "heartbeat":
|
|
safeSendStream(ctx, events, StreamEvent{
|
|
Kind: StreamHeartbeat,
|
|
ElapsedSeconds: frame.heartbeat.ElapsedSeconds,
|
|
})
|
|
case frame.data.Type == "chunk":
|
|
if frame.data.Content == "" {
|
|
return
|
|
}
|
|
acc.WriteString(frame.data.Content)
|
|
safeSendStream(ctx, events, StreamEvent{
|
|
Kind: StreamChunk,
|
|
Content: frame.data.Content,
|
|
})
|
|
case frame.data.Type == "meta":
|
|
streamMeta = trailerMeta{
|
|
UsedTools: append([]string(nil), frame.data.UsedTools...),
|
|
RowsSeen: coerceAichatRowsSeen(frame.data.RowsSeen),
|
|
ClassifierTag: frame.data.ClassifierTag,
|
|
}
|
|
safeSendStream(ctx, events, StreamEvent{
|
|
Kind: StreamMeta,
|
|
UsedTools: streamMeta.UsedTools,
|
|
RowsSeen: streamMeta.RowsSeen,
|
|
ClassifierTag: streamMeta.ClassifierTag,
|
|
})
|
|
case frame.data.Type == "done":
|
|
if frame.data.ConversationID != "" {
|
|
convID = frame.data.ConversationID
|
|
safeSendStream(ctx, events, StreamEvent{
|
|
Kind: StreamConversation,
|
|
ConversationID: convID,
|
|
})
|
|
}
|
|
paneSpawned = frame.data.PaneSpawned
|
|
upstreamDoneMs = frame.data.DurationMs
|
|
case frame.data.Type == "error":
|
|
// Forward as a stream error AND mark for non-nil err
|
|
// propagation via the streamErr captured below.
|
|
safeSendStream(ctx, events, StreamEvent{
|
|
Kind: StreamError,
|
|
Code: frame.data.Code,
|
|
Message: frame.data.Message,
|
|
Retryable: frame.data.Retryable,
|
|
})
|
|
}
|
|
})
|
|
|
|
cleanBody := acc.String()
|
|
tokens := approxTokenCount(cleanBody)
|
|
chipCount := countChips(cleanBody)
|
|
finished := time.Now().UTC()
|
|
durationMS := int(finished.Sub(startedAt) / time.Millisecond)
|
|
if upstreamDoneMs > 0 {
|
|
durationMS = int(upstreamDoneMs)
|
|
}
|
|
|
|
// Persist the conversation id we learned (best-effort — failure here
|
|
// just means recovery for THIS turn will have to list conversations
|
|
// rather than fast-path to a single id).
|
|
if convID != "" {
|
|
if err := s.setAichatConversationID(ctx, turnID, convID); err != nil {
|
|
log.Printf("paliadin: persist aichat conversation id %s: %v", convID, err)
|
|
}
|
|
}
|
|
|
|
if streamErr != nil {
|
|
// Aichat persona without streaming support — graceful fallback to
|
|
// the one-shot /chat/turn endpoint. Same body shape; we adapt the
|
|
// non-streaming response into a single StreamChunk so the caller
|
|
// sees identical event ordering.
|
|
if strings.Contains(streamErr.Error(), "unsupported_streaming") {
|
|
log.Printf("paliadin: persona %q lacks streaming support — falling back to one-shot turn %s", s.cfg.Persona, turnID)
|
|
return s.fallbackOneShotFromStream(ctx, turnID, body, events, startedAt, session)
|
|
}
|
|
// Don't overwrite an existing error_code we may have set above.
|
|
_ = s.markTurnError(ctx, turnID, classifyAichatError(streamErr))
|
|
return nil, streamErr
|
|
}
|
|
|
|
// Aichat is stateless on user content; the client owns the primer.
|
|
if paneSpawned {
|
|
s.clearPrimed(session)
|
|
} else {
|
|
s.markPrimed(session)
|
|
}
|
|
|
|
if cleanBody == "" {
|
|
// Upstream closed cleanly with no error event but no content
|
|
// either (unexpected — log + treat as upstream_error so the
|
|
// handler doesn't ship an empty bubble).
|
|
_ = s.markTurnError(ctx, turnID, "shim_error")
|
|
return nil, errors.New("aichat: stream closed with no content and no error")
|
|
}
|
|
|
|
if err := s.completeTurn(ctx, turnID, finished, durationMS, cleanBody, tokens, streamMeta, chipCount); err != nil {
|
|
log.Printf("paliadin: complete turn %s: %v", turnID, err)
|
|
}
|
|
|
|
return &TurnResult{
|
|
TurnID: turnID,
|
|
Response: cleanBody,
|
|
UsedTools: streamMeta.UsedTools,
|
|
RowsSeen: streamMeta.RowsSeen,
|
|
ChipCount: chipCount,
|
|
ClassifierTag: streamMeta.ClassifierTag,
|
|
DurationMS: durationMS,
|
|
}, nil
|
|
}
|
|
|
|
// fallbackOneShotFromStream runs the same `body` against aichat's
|
|
// non-streaming /chat/turn endpoint and adapts the response into the
|
|
// StreamingPaliadin contract — a single StreamChunk + StreamMeta +
|
|
// StreamConversation, followed by `events` being closed by the
|
|
// outer RunTurnStream's defer. Used when the configured persona doesn't
|
|
// support streaming (aichat returns HTTP 400 unsupported_streaming).
|
|
//
|
|
// Identical persistence shape as the one-shot RunTurn: completeTurn +
|
|
// markPrimed/clearPrimed. No new turn row (already inserted by
|
|
// RunTurnStream). No primer rebuild (already in body).
|
|
func (s *AichatPaliadinService) fallbackOneShotFromStream(
|
|
ctx context.Context,
|
|
turnID uuid.UUID,
|
|
body aichatTurnRequest,
|
|
events chan<- StreamEvent,
|
|
startedAt time.Time,
|
|
session string,
|
|
) (*TurnResult, error) {
|
|
var resp aichatTurnResponse
|
|
if err := s.callHTTP(ctx, http.MethodPost, "/chat/turn", body, &resp); err != nil {
|
|
_ = s.markTurnError(ctx, turnID, classifyAichatError(err))
|
|
safeSendStream(ctx, events, StreamEvent{
|
|
Kind: StreamError,
|
|
Code: classifyAichatError(err),
|
|
Message: err.Error(),
|
|
})
|
|
return nil, err
|
|
}
|
|
|
|
if resp.PaneSpawned {
|
|
s.clearPrimed(session)
|
|
} else {
|
|
s.markPrimed(session)
|
|
}
|
|
|
|
cleanBody := resp.Response
|
|
tokens := approxTokenCount(cleanBody)
|
|
chipCount := countChips(cleanBody)
|
|
finished := time.Now().UTC()
|
|
durationMS := int(finished.Sub(startedAt) / time.Millisecond)
|
|
|
|
tmeta := trailerMeta{
|
|
UsedTools: resp.Meta.UsedTools,
|
|
ClassifierTag: resp.Meta.ClassifierTag,
|
|
RowsSeen: coerceAichatRowsSeen(resp.Meta.RowsSeen),
|
|
}
|
|
|
|
// Emit the response as a single chunk so the frontend renders it.
|
|
safeSendStream(ctx, events, StreamEvent{
|
|
Kind: StreamChunk,
|
|
Content: cleanBody,
|
|
})
|
|
safeSendStream(ctx, events, StreamEvent{
|
|
Kind: StreamMeta,
|
|
UsedTools: tmeta.UsedTools,
|
|
ClassifierTag: tmeta.ClassifierTag,
|
|
RowsSeen: tmeta.RowsSeen,
|
|
})
|
|
|
|
if err := s.completeTurn(ctx, turnID, finished, durationMS, cleanBody, tokens, tmeta, chipCount); err != nil {
|
|
log.Printf("paliadin: complete turn %s (fallback one-shot): %v", turnID, err)
|
|
}
|
|
|
|
return &TurnResult{
|
|
TurnID: turnID,
|
|
Response: cleanBody,
|
|
UsedTools: tmeta.UsedTools,
|
|
RowsSeen: tmeta.RowsSeen,
|
|
ChipCount: chipCount,
|
|
ClassifierTag: tmeta.ClassifierTag,
|
|
DurationMS: durationMS,
|
|
}, nil
|
|
}
|
|
|
|
// Wire-level representations of the SSE frames aichat emits on
// /chat/turn/stream.
type (
	// streamFrame is one decoded SSE event. Exactly one of data or
	// heartbeat is populated, depending on event.
	streamFrame struct {
		event     string // "" → default (data:) event
		data      streamDataFrame
		heartbeat streamHeartbeatFrame
	}

	// streamDataFrame is the JSON payload of an unnamed `data:` event.
	// Type discriminates which of the remaining fields are meaningful
	// ("chunk", "meta", "done" or "error").
	streamDataFrame struct {
		Type           string   `json:"type"`
		Content        string   `json:"content,omitempty"`
		UsedTools      []string `json:"used_tools,omitempty"`
		RowsSeen       []string `json:"rows_seen,omitempty"`
		ClassifierTag  string   `json:"classifier_tag,omitempty"`
		TurnID         string   `json:"turn_id,omitempty"`
		ConversationID string   `json:"conversation_id,omitempty"`
		DurationMs     int64    `json:"duration_ms,omitempty"`
		PaneSpawned    bool     `json:"pane_spawned,omitempty"`
		Resumed        bool     `json:"resumed,omitempty"`
		Code           string   `json:"code,omitempty"`
		Message        string   `json:"message,omitempty"`
		Retryable      bool     `json:"retryable,omitempty"`
	}

	// streamHeartbeatFrame is the JSON payload of an `event: heartbeat`
	// frame.
	streamHeartbeatFrame struct {
		ElapsedSeconds int `json:"elapsed_seconds"`
	}
)
|
|
|
|
// callStreamingHTTP opens a streaming POST to aichat and invokes `emit`
|
|
// for each parsed SSE frame. Returns once the stream closes; surfaces
|
|
// non-2xx responses via decodeAichatError, transport errors via the
|
|
// underlying http.Client error.
|
|
//
|
|
// Tests can override the parsing path by setting streamHook (kept null
|
|
// in production).
|
|
func (s *AichatPaliadinService) callStreamingHTTP(ctx context.Context, path string, body any, emit func(streamFrame)) error {
|
|
if s.streamHook != nil {
|
|
return s.streamHook(ctx, path, body, emit)
|
|
}
|
|
|
|
buf, err := json.Marshal(body)
|
|
if err != nil {
|
|
return fmt.Errorf("aichat: encode %s body: %w", path, err)
|
|
}
|
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, s.cfg.BaseURL+path, strings.NewReader(string(buf)))
|
|
if err != nil {
|
|
return fmt.Errorf("aichat: build %s request: %w", path, err)
|
|
}
|
|
httpReq.Header.Set("Content-Type", "application/json")
|
|
httpReq.Header.Set("Accept", "text/event-stream")
|
|
if s.cfg.BearerToken != "" {
|
|
httpReq.Header.Set("Authorization", "Bearer "+s.cfg.BearerToken)
|
|
}
|
|
|
|
// Use a dedicated client without the short Timeout — for streaming
|
|
// we rely on the silence_timeout watch (no events for > 90 s ⇒ fail)
|
|
// rather than a hard ceiling on the whole turn. The aichat upstream
|
|
// keeps emitting heartbeats while it's alive, so a true upstream
|
|
// stall is observable here.
|
|
client := s.streamingClient()
|
|
resp, err := client.Do(httpReq)
|
|
if err != nil {
|
|
return fmt.Errorf("aichat: POST %s: %w", path, err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
respBytes, _ := io.ReadAll(io.LimitReader(resp.Body, 64<<10))
|
|
return decodeAichatError(resp.StatusCode, respBytes)
|
|
}
|
|
|
|
return parseSSEStream(ctx, resp.Body, emit)
|
|
}
|
|
|
|
// streamingClient returns an HTTP client tuned for streaming — no
|
|
// per-request Timeout (kills mid-stream), but a long IdleConnTimeout so
|
|
// the connection stays usable for multi-minute turns.
|
|
func (s *AichatPaliadinService) streamingClient() *http.Client {
|
|
if s.cfg.HTTPClient == nil {
|
|
return &http.Client{Timeout: 0}
|
|
}
|
|
c := *s.cfg.HTTPClient
|
|
c.Timeout = 0
|
|
return &c
|
|
}
|
|
|
|
// parseSSEStream tokenises an SSE byte stream into streamFrame events
|
|
// and calls emit for each. Returns nil on clean EOF; returns the read
|
|
// error otherwise.
|
|
//
|
|
// Frame format (per https://html.spec.whatwg.org/multipage/server-sent-events.html):
|
|
//
|
|
// event: <name>\n
|
|
// data: <payload>\n
|
|
// <blank line>\n
|
|
//
|
|
// Multiple `data:` lines per event are concatenated with `\n`. Lines
|
|
// starting with `:` are comments and ignored.
|
|
func parseSSEStream(ctx context.Context, r io.Reader, emit func(streamFrame)) error {
|
|
br := bufio.NewReaderSize(r, 64<<10)
|
|
var (
|
|
eventName string
|
|
dataLines []string
|
|
)
|
|
flush := func() {
|
|
if len(dataLines) == 0 && eventName == "" {
|
|
return
|
|
}
|
|
payload := strings.Join(dataLines, "\n")
|
|
eventName = strings.TrimSpace(eventName)
|
|
dataLines = nil
|
|
eventOut := eventName
|
|
eventName = ""
|
|
if eventOut == "heartbeat" {
|
|
var hb streamHeartbeatFrame
|
|
if err := json.Unmarshal([]byte(payload), &hb); err != nil {
|
|
return
|
|
}
|
|
emit(streamFrame{event: "heartbeat", heartbeat: hb})
|
|
return
|
|
}
|
|
// Default event (unnamed) — discriminated by `type` field.
|
|
var d streamDataFrame
|
|
if err := json.Unmarshal([]byte(payload), &d); err != nil {
|
|
return
|
|
}
|
|
emit(streamFrame{event: "", data: d})
|
|
}
|
|
|
|
for {
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
line, err := br.ReadString('\n')
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
// Final frame may not be terminated by a blank line on
|
|
// abrupt close — flush whatever we accumulated.
|
|
if line != "" {
|
|
processSSELine(line, &eventName, &dataLines)
|
|
}
|
|
flush()
|
|
return nil
|
|
}
|
|
return fmt.Errorf("aichat: read sse: %w", err)
|
|
}
|
|
// Normalise line endings (some intermediaries send \r\n).
|
|
line = strings.TrimRight(line, "\r\n")
|
|
if line == "" {
|
|
flush()
|
|
continue
|
|
}
|
|
processSSELine(line, &eventName, &dataLines)
|
|
}
|
|
}
|
|
|
|
// processSSELine applies one line of the SSE wire format (already
// stripped of its line terminator) to the frame under construction.
//
//   - A line starting with ":" is a comment / keep-alive and is ignored.
//   - Otherwise the line is split at the first ":" into field and value;
//     a single leading space is removed from the value.
//   - A line without any ":" is a field name with an empty value, as the
//     SSE specification prescribes.
//
// Only two fields are acted upon: "event" overwrites *eventName and
// "data" appends to *dataLines. All other fields are ignored.
func processSSELine(line string, eventName *string, dataLines *[]string) {
	if strings.HasPrefix(line, ":") {
		return // comment / keep-alive
	}

	// Cut yields the whole line as field and "" as value when there is
	// no colon, which is exactly the spec's "field with no value" case.
	field, value, _ := strings.Cut(line, ":")
	value = strings.TrimPrefix(value, " ")

	switch field {
	case "event":
		*eventName = value
	case "data":
		*dataLines = append(*dataLines, value)
	}
}
|
|
|
|
// =============================================================================
|
|
// AichatRecoverer — late recovery via the conversation API
|
|
// =============================================================================
|
|
|
|
// RecoverTurn asks aichat whether the given paliad turn has a response.
|
|
// Returns the up-to-date row on success (including a freshly persisted
|
|
// response when aichat had one), nil + nil when aichat doesn't know
|
|
// either, or an error on transport / DB failures.
|
|
func (s *AichatPaliadinService) RecoverTurn(ctx context.Context, callerID, turnID uuid.UUID) (*PaliadinTurn, error) {
|
|
row, err := s.GetTurn(ctx, callerID, turnID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Fast path: the row already has a response (the janitor or a
|
|
// concurrent stream finished writing). Return it as-is.
|
|
if row.Response != nil && *row.Response != "" {
|
|
return row, nil
|
|
}
|
|
|
|
convID, err := s.resolveAichatConversationID(ctx, row)
|
|
if err != nil {
|
|
log.Printf("paliadin: recover %s: resolve conversation: %v", turnID, err)
|
|
return nil, nil
|
|
}
|
|
if convID == "" {
|
|
return nil, nil
|
|
}
|
|
|
|
turns, err := s.fetchAichatConversationTurns(ctx, convID)
|
|
if err != nil {
|
|
log.Printf("paliadin: recover %s: fetch turns: %v", turnID, err)
|
|
return nil, nil
|
|
}
|
|
|
|
assistantBody := matchAssistantResponse(turns, row.UserMessage)
|
|
if assistantBody == "" {
|
|
return nil, nil
|
|
}
|
|
|
|
finished := time.Now().UTC()
|
|
durationMS := int(finished.Sub(row.StartedAt) / time.Millisecond)
|
|
tokens := approxTokenCount(assistantBody)
|
|
chipCount := countChips(assistantBody)
|
|
|
|
if err := s.completeTurnLate(ctx, turnID, finished, durationMS, assistantBody, tokens, trailerMeta{}, chipCount); err != nil {
|
|
log.Printf("paliadin: recover %s: complete late: %v", turnID, err)
|
|
return nil, err
|
|
}
|
|
|
|
// Re-read so the caller gets a row that reflects the late-write.
|
|
return s.GetTurn(ctx, callerID, turnID)
|
|
}
|
|
|
|
// resolveAichatConversationID returns the conversation the turn lived
|
|
// in. Fast path: read the column on the row. Fallback: list aichat
|
|
// conversations for the user+persona and take the most recent.
|
|
func (s *AichatPaliadinService) resolveAichatConversationID(ctx context.Context, row *PaliadinTurn) (string, error) {
|
|
stored, err := s.getAichatConversationID(ctx, row.TurnID)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if stored != "" {
|
|
return stored, nil
|
|
}
|
|
username := s.usernameFor(ctx, row.UserID)
|
|
convs, err := s.listAichatConversations(ctx, username, row.UserID.String())
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if len(convs) == 0 {
|
|
return "", nil
|
|
}
|
|
// Aichat orders by last_turn_at DESC; the head is the most recently
|
|
// active conversation, which is the pane the lost turn ran against.
|
|
return convs[0].ID, nil
|
|
}
|
|
|
|
// matchAssistantResponse walks the aichat turn list and returns the
|
|
// body of the latest assistant turn whose preceding user-role turn body
|
|
// matches `userMessage` (verbatim — aichat persists the raw message
|
|
// the same way paliad does).
|
|
//
|
|
// Falls back to "the last assistant body in the conversation" when no
|
|
// match is found but the conversation has assistant content. This
|
|
// covers cases where aichat persisted the user turn with envelope
|
|
// prefixes that don't exactly match our user_message (e.g. an embedded
|
|
// [ctx …] block).
|
|
func matchAssistantResponse(turns []aichatConversationTurn, userMessage string) string {
|
|
wantedNorm := normaliseForMatch(userMessage)
|
|
|
|
for i := 0; i < len(turns)-1; i++ {
|
|
t := turns[i]
|
|
if t.Role != "user" {
|
|
continue
|
|
}
|
|
if normaliseForMatch(t.Body) != wantedNorm {
|
|
continue
|
|
}
|
|
next := turns[i+1]
|
|
if next.Role == "assistant" && next.Body != "" {
|
|
return next.Body
|
|
}
|
|
}
|
|
|
|
for i := len(turns) - 1; i >= 0; i-- {
|
|
t := turns[i]
|
|
if t.Role == "assistant" && t.Body != "" {
|
|
return t.Body
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// normaliseForMatch lowercases s, strips surrounding whitespace and
// collapses every internal run of whitespace (spaces, tabs, newlines)
// into a single space. It is used for comparison only — no semantic
// meaning beyond "did aichat persist the same prompt we sent".
//
// strings.Fields does the trimming and the collapsing in one pass and
// always terminates, unlike a replace-until-stable loop.
func normaliseForMatch(s string) string {
	return strings.Join(strings.Fields(strings.ToLower(s)), " ")
}
|
|
|
|
// =============================================================================
|
|
// aichat conversation API client helpers
|
|
// =============================================================================
|
|
|
|
// JSON shapes of aichat's conversation API, limited to the fields this
// package reads.
type (
	// aichatConversationSummary is one entry of the conversation list.
	aichatConversationSummary struct {
		ID         string `json:"id"`
		Persona    string `json:"persona"`
		LastTurnAt string `json:"last_turn_at"`
	}

	// aichatListConversationsResponse is the body of
	// GET /chat/conversations.
	aichatListConversationsResponse struct {
		Conversations []aichatConversationSummary `json:"conversations"`
	}

	// aichatConversationTurn is one persisted turn of a conversation.
	aichatConversationTurn struct {
		ID        string `json:"id"`
		Seq       int    `json:"seq"`
		Role      string `json:"role"`
		Body      string `json:"body"`
		CreatedAt string `json:"created_at"`
	}

	// aichatGetConversationTurnsResponse is the body of
	// GET /chat/conversations/{id}/turns.
	aichatGetConversationTurnsResponse struct {
		ConversationID string                   `json:"conversation_id"`
		Turns          []aichatConversationTurn `json:"turns"`
		HasMore        bool                     `json:"has_more"`
	}
)
|
|
|
|
// listAichatConversations calls GET /chat/conversations for the user.
|
|
func (s *AichatPaliadinService) listAichatConversations(ctx context.Context, username, userID string) ([]aichatConversationSummary, error) {
|
|
q := url.Values{}
|
|
q.Set("persona", s.cfg.Persona)
|
|
q.Set("username", username)
|
|
q.Set("user_id", userID)
|
|
q.Set("limit", "5")
|
|
path := "/chat/conversations?" + q.Encode()
|
|
var resp aichatListConversationsResponse
|
|
if err := s.callHTTP(ctx, http.MethodGet, path, nil, &resp); err != nil {
|
|
return nil, err
|
|
}
|
|
return resp.Conversations, nil
|
|
}
|
|
|
|
// fetchAichatConversationTurns calls GET /chat/conversations/{id}/turns.
|
|
func (s *AichatPaliadinService) fetchAichatConversationTurns(ctx context.Context, convID string) ([]aichatConversationTurn, error) {
|
|
q := url.Values{}
|
|
q.Set("persona", s.cfg.Persona)
|
|
q.Set("limit", "20")
|
|
path := "/chat/conversations/" + url.PathEscape(convID) + "/turns?" + q.Encode()
|
|
var resp aichatGetConversationTurnsResponse
|
|
if err := s.callHTTP(ctx, http.MethodGet, path, nil, &resp); err != nil {
|
|
return nil, err
|
|
}
|
|
return resp.Turns, nil
|
|
}
|
|
|
|
// =============================================================================
|
|
// DB helpers for paliadin_turns.aichat_conversation_id (migration 118)
|
|
// =============================================================================
|
|
|
|
func (s *AichatPaliadinService) setAichatConversationID(ctx context.Context, turnID uuid.UUID, convID string) error {
|
|
if convID == "" {
|
|
return nil
|
|
}
|
|
convUUID, err := uuid.Parse(convID)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid conversation id %q: %w", convID, err)
|
|
}
|
|
_, err = s.db.ExecContext(ctx, `
|
|
UPDATE paliad.paliadin_turns
|
|
SET aichat_conversation_id = $2
|
|
WHERE turn_id = $1
|
|
AND aichat_conversation_id IS DISTINCT FROM $2
|
|
`, turnID, convUUID)
|
|
return err
|
|
}
|
|
|
|
func (s *AichatPaliadinService) getAichatConversationID(ctx context.Context, turnID uuid.UUID) (string, error) {
|
|
var convID *uuid.UUID
|
|
err := s.db.QueryRowxContext(ctx,
|
|
`SELECT aichat_conversation_id FROM paliad.paliadin_turns WHERE turn_id = $1`,
|
|
turnID).Scan(&convID)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if convID == nil {
|
|
return "", nil
|
|
}
|
|
return convID.String(), nil
|
|
}
|
|
|
|
// Compile-time interface conformance — fail the build if a streaming
|
|
// method drifts off this backend.
|
|
var _ StreamingPaliadin = (*AichatPaliadinService)(nil)
|
|
var _ AichatRecoverer = (*AichatPaliadinService)(nil)
|