Files
paliad/internal/services/aichat_paliadin_stream_test.go
mAi cdd27d674e feat(paliadin): stream + honest late-recovery (t-paliad-235)
m's 14:56 observation: long Paliadin turns showed "Verbindung verloren —
Antwort wird nachgereicht …" but never delivered. The aichat backend
finished the turn upstream; paliad's HTTP client had given up at 130 s
and the legacy filesystem janitor never ran for the aichat path.

Three intertwined fixes, all shipped together because they share the
same wire shape and the same UI states:

1. Switch the aichat backend to /chat/turn/stream
   - new AichatPaliadinService.RunTurnStream relays incremental chunks
   - SSE parser handles default `data:` frames (chunk/meta/done/error)
     and named `event: heartbeat` frames per the upstream contract
   - no more 130 s hard ceiling — stream stays open as long as data or
     heartbeats flow; silenceTimeout (90 s) catches a true upstream
     stall instead

2. Proof-of-life thinking events
   - handler emits `event: thinking` every 5 s while the upstream is
     silent (synthesised locally) AND relays aichat's `heartbeat`
     events as thinking pings
   - frontend renders a lime-dot pulse + monospace counter inside the
     assistant bubble — the user can SEE the chat is still working

3. Honest disconnect copy + real late-recovery
   - new dispatching endpoint GET /api/paliadin/turns/{id}/recover
   - aichat backend: asks aichat via GET /chat/conversations and
     /chat/conversations/{id}/turns whether the turn actually finished
   - legacy backend: falls through to the local row read (janitor)
   - frontend swaps "wird nachgereicht" → "Lade frische Antwort …"
     while the recovery polls; on confirmed "lost" swaps to
     "Antwort konnte nicht zugestellt werden — bitte erneut stellen"
   - migration 118 adds aichat_conversation_id to paliadin_turns so
     the recovery has a fast path when the done frame arrived before
     the drop

Streaming + recovery are a no-op for PALIADIN_BACKEND=legacy: the
StreamingPaliadin interface is detected via type assertion, the
LocalPaliadinService stays on the one-shot RunTurn + filesystem
janitor path.

13 new unit tests cover the SSE parser, the conversation-API client,
and the match-assistant-response helper.

go build ./... + go test ./internal/... + go test ./cmd/server/...
+ bun run build all clean.
2026-05-22 15:17:24 +02:00

260 lines
8.5 KiB
Go

package services
// Streaming + recovery tests for AichatPaliadinService (t-paliad-235).
//
// Like the sync-path tests next door, every test bypasses the HTTP wire
// (streamHook / httpHook). DB-write paths in RunTurnStream are out of
// scope here for the same reason — paliad has no sqlx mock. We focus
// on the SSE parser, the conversation-API client, and the
// match-assistant-response helper.
import (
"context"
"strings"
"testing"
)
// =============================================================================
// SSE parser
// =============================================================================
// TestParseSSEStream_DefaultEvents feeds parseSSEStream four unnamed
// (data-only) events — two chunks, a meta frame and a done frame — and
// checks that the callback receives each one, in order, with its JSON
// payload decoded into streamFrame.data.
//
// Every event in the fixture is terminated by a blank line. In the SSE wire
// format the blank line is what dispatches an event; adjacent `data:` lines
// without one are joined into a single payload, which would turn this
// fixture into one undecodable frame instead of four.
func TestParseSSEStream_DefaultEvents(t *testing.T) {
	events := []string{
		`data: {"type":"chunk","content":"Hello "}`,
		`data: {"type":"chunk","content":"world"}`,
		`data: {"type":"meta","used_tools":["search"],"rows_seen":["3"],"classifier_tag":"howto"}`,
		`data: {"type":"done","turn_id":"abc","conversation_id":"11111111-1111-1111-1111-111111111111","duration_ms":1234,"pane_spawned":false,"resumed":false}`,
	}
	// "\n\n" is end-of-line plus the blank line that terminates an event;
	// the trailing one closes the final event as well.
	body := strings.Join(events, "\n\n") + "\n\n"

	var frames []streamFrame
	err := parseSSEStream(context.Background(), strings.NewReader(body), func(f streamFrame) {
		frames = append(frames, f)
	})
	if err != nil {
		t.Fatalf("parseSSEStream: %v", err)
	}
	// Fatal here: the indexed checks below would panic on a short slice.
	if len(frames) != 4 {
		t.Fatalf("got %d frames; want 4 (%+v)", len(frames), frames)
	}
	if frames[0].data.Type != "chunk" || frames[0].data.Content != "Hello " {
		t.Errorf("frame 0 = %+v; want chunk Hello ", frames[0])
	}
	if frames[1].data.Type != "chunk" || frames[1].data.Content != "world" {
		t.Errorf("frame 1 = %+v; want chunk world", frames[1])
	}
	if frames[2].data.Type != "meta" || frames[2].data.ClassifierTag != "howto" {
		t.Errorf("frame 2 = %+v; want meta howto", frames[2])
	}
	if frames[3].data.Type != "done" || frames[3].data.ConversationID == "" {
		t.Errorf("frame 3 = %+v; want done with conversation_id", frames[3])
	}
}
// TestParseSSEStream_HeartbeatEvent checks that named `event: heartbeat`
// frames are surfaced with their event name and decoded elapsed_seconds,
// and that a following unnamed data frame is still decoded as a chunk.
//
// Each event is terminated by a blank line. Without the separators the
// three events would be read as a single heartbeat event whose payload is
// the three data lines joined together.
func TestParseSSEStream_HeartbeatEvent(t *testing.T) {
	events := []string{
		"event: heartbeat\n" + `data: {"elapsed_seconds": 5}`,
		"event: heartbeat\n" + `data: {"elapsed_seconds": 10}`,
		`data: {"type":"chunk","content":"Hi"}`,
	}
	// "\n\n" is end-of-line plus the blank line that terminates an event.
	body := strings.Join(events, "\n\n") + "\n\n"

	var frames []streamFrame
	err := parseSSEStream(context.Background(), strings.NewReader(body), func(f streamFrame) {
		frames = append(frames, f)
	})
	if err != nil {
		t.Fatalf("parseSSEStream: %v", err)
	}
	// Fatal here: the indexed checks below would panic on a short slice.
	if len(frames) != 3 {
		t.Fatalf("got %d frames; want 3", len(frames))
	}
	if frames[0].event != "heartbeat" || frames[0].heartbeat.ElapsedSeconds != 5 {
		t.Errorf("frame 0 = %+v; want heartbeat 5s", frames[0])
	}
	if frames[1].event != "heartbeat" || frames[1].heartbeat.ElapsedSeconds != 10 {
		t.Errorf("frame 1 = %+v; want heartbeat 10s", frames[1])
	}
	if frames[2].data.Type != "chunk" || frames[2].data.Content != "Hi" {
		t.Errorf("frame 2 = %+v; want chunk Hi", frames[2])
	}
}
// TestParseSSEStream_IgnoresComments checks that a line starting with ':'
// (an SSE comment, commonly used as a keep-alive) produces no frame and
// does not disturb the data event that follows it.
//
// The event ends with a blank line so the test does not depend on the
// parser flushing an unterminated event at end of input.
func TestParseSSEStream_IgnoresComments(t *testing.T) {
	body := ": keep-alive comment line\n" +
		`data: {"type":"chunk","content":"x"}` + "\n\n"

	var frames []streamFrame
	err := parseSSEStream(context.Background(), strings.NewReader(body), func(f streamFrame) {
		frames = append(frames, f)
	})
	if err != nil {
		t.Fatalf("parseSSEStream: %v", err)
	}
	// The length check short-circuits, so frames[0] is only read when present.
	if len(frames) != 1 || frames[0].data.Content != "x" {
		t.Errorf("frames = %+v; want 1 chunk", frames)
	}
}
// TestParseSSEStream_HandlesCRLF verifies that an event whose lines end in
// CRLF (including the CRLF-only blank line that terminates it) is decoded
// into exactly one chunk frame.
func TestParseSSEStream_HandlesCRLF(t *testing.T) {
	const wire = "data: {\"type\":\"chunk\",\"content\":\"crlf\"}\r\n\r\n"

	var got []streamFrame
	collect := func(f streamFrame) { got = append(got, f) }

	if err := parseSSEStream(context.Background(), strings.NewReader(wire), collect); err != nil {
		t.Fatalf("parseSSEStream: %v", err)
	}

	// Success path first; anything else is reported with the full slice.
	if len(got) == 1 && got[0].data.Content == "crlf" {
		return
	}
	t.Errorf("frames = %+v; want crlf chunk", got)
}
// TestParseSSEStream_MultilineData checks that two adjacent `data:` lines
// belonging to the same event are concatenated with "\n" before decoding:
// here the JSON object is split across the two lines and must still parse
// into a single chunk frame.
//
// The event ends with a blank line so the test does not depend on the
// parser flushing an unterminated event at end of input.
func TestParseSSEStream_MultilineData(t *testing.T) {
	// The two data lines must stay adjacent (no blank line between them);
	// only the final "\n\n" terminates the event.
	body := `data: {"type":"chunk",` + "\n" +
		`data: "content":"x"}` + "\n\n"

	var frames []streamFrame
	err := parseSSEStream(context.Background(), strings.NewReader(body), func(f streamFrame) {
		frames = append(frames, f)
	})
	if err != nil {
		t.Fatalf("parseSSEStream: %v", err)
	}
	// The length check short-circuits, so frames[0] is only read when present.
	if len(frames) != 1 || frames[0].data.Content != "x" {
		t.Errorf("frames = %+v; want 1 chunk x", frames)
	}
}
// =============================================================================
// matchAssistantResponse
// =============================================================================
// TestMatchAssistantResponse_PrefersUserPrecededAssistant: with two
// question/answer pairs in the history, asking for the second question must
// yield the answer that directly follows it.
func TestMatchAssistantResponse_PrefersUserPrecededAssistant(t *testing.T) {
	history := []aichatConversationTurn{
		{Role: "user", Body: "first question"},
		{Role: "assistant", Body: "first answer"},
		{Role: "user", Body: "second question"},
		{Role: "assistant", Body: "second answer"},
	}

	if answer := matchAssistantResponse(history, "second question"); answer != "second answer" {
		t.Errorf("got %q; want %q", answer, "second answer")
	}
}
// TestMatchAssistantResponse_NormaliseCase: the lookup message differs from
// the stored user turn in letter case and surrounding whitespace, and the
// paired assistant answer must still be returned.
func TestMatchAssistantResponse_NormaliseCase(t *testing.T) {
	history := []aichatConversationTurn{
		{Role: "user", Body: "Hello World"},
		{Role: "assistant", Body: "hi back"},
	}

	if answer := matchAssistantResponse(history, " hello world "); answer != "hi back" {
		t.Errorf("got %q; want %q", answer, "hi back")
	}
}
// TestMatchAssistantResponse_FallbackToLastAssistant: the stored user turn
// carries a prefix the lookup message lacks (aichat persisted it with a
// different envelope or wrapper), so no user turn matches. The expected
// result is the last assistant turn in the history.
func TestMatchAssistantResponse_FallbackToLastAssistant(t *testing.T) {
	history := []aichatConversationTurn{
		{Role: "user", Body: "[ctx route=x] my question"},
		{Role: "assistant", Body: "the answer"},
	}

	if answer := matchAssistantResponse(history, "my question"); answer != "the answer" {
		t.Errorf("got %q; want %q", answer, "the answer")
	}
}
// TestMatchAssistantResponse_NoAssistantTurns: a history holding only the
// user's message has nothing to recover, so the result must be empty.
func TestMatchAssistantResponse_NoAssistantTurns(t *testing.T) {
	history := []aichatConversationTurn{{Role: "user", Body: "lonely"}}

	if answer := matchAssistantResponse(history, "lonely"); answer != "" {
		t.Errorf("got %q; want empty", answer)
	}
}
// TestMatchAssistantResponse_EmptyAssistantSkipped: the assistant turn that
// immediately follows q1 has an empty body. It must be skipped, and the
// result must fall back to the last non-empty assistant body ("a2").
func TestMatchAssistantResponse_EmptyAssistantSkipped(t *testing.T) {
	history := []aichatConversationTurn{
		{Role: "user", Body: "q1"},
		{Role: "assistant", Body: ""},
		{Role: "user", Body: "q2"},
		{Role: "assistant", Body: "a2"},
	}

	answer := matchAssistantResponse(history, "q1")
	if answer == "a2" {
		return
	}
	t.Errorf("got %q; want %q (fallback)", answer, "a2")
}
// =============================================================================
// Conversation-API HTTP client
// =============================================================================
func TestListAichatConversations_BuildsExpectedQuery(t *testing.T) {
var seenPath string
s := newAichatService(t, nil, func(ctx context.Context, method, path string, body any, out any) error {
seenPath = path
if dst, ok := out.(*aichatListConversationsResponse); ok {
dst.Conversations = []aichatConversationSummary{{ID: "11111111-1111-1111-1111-111111111111"}}
}
return nil
})
got, err := s.listAichatConversations(context.Background(), "alice", "00000000-0000-0000-0000-000000000001")
if err != nil {
t.Fatalf("listAichatConversations: %v", err)
}
if len(got) != 1 || got[0].ID == "" {
t.Errorf("got = %+v; want one conversation", got)
}
for _, want := range []string{"/chat/conversations?", "persona=paliadin", "username=alice", "user_id=00000000-0000-0000-0000-000000000001", "limit=5"} {
if !strings.Contains(seenPath, want) {
t.Errorf("path %q missing %q", seenPath, want)
}
}
}
func TestFetchAichatConversationTurns_BuildsPath(t *testing.T) {
var seenPath string
s := newAichatService(t, nil, func(ctx context.Context, method, path string, body any, out any) error {
seenPath = path
if dst, ok := out.(*aichatGetConversationTurnsResponse); ok {
dst.Turns = []aichatConversationTurn{{Role: "assistant", Body: "answer"}}
}
return nil
})
turns, err := s.fetchAichatConversationTurns(context.Background(), "11111111-1111-1111-1111-111111111111")
if err != nil {
t.Fatalf("fetchAichatConversationTurns: %v", err)
}
if len(turns) != 1 || turns[0].Body != "answer" {
t.Errorf("turns = %+v; want one assistant", turns)
}
for _, want := range []string{"/chat/conversations/11111111-1111-1111-1111-111111111111/turns", "persona=paliadin", "limit=20"} {
if !strings.Contains(seenPath, want) {
t.Errorf("path %q missing %q", seenPath, want)
}
}
}
// =============================================================================
// Interface conformance
// =============================================================================
// TestAichatPaliadinService_ImplementsStreaming is a compile-time check:
// the file only builds if *AichatPaliadinService satisfies both the
// StreamingPaliadin and AichatRecoverer interfaces. Nothing runs at test
// time.
func TestAichatPaliadinService_ImplementsStreaming(t *testing.T) {
	var (
		_ StreamingPaliadin = (*AichatPaliadinService)(nil)
		_ AichatRecoverer   = (*AichatPaliadinService)(nil)
	)
}