m's 14:56 observation: long Paliadin turns showed "Verbindung verloren —
Antwort wird nachgereicht …", but the answer was never delivered. The aichat
backend had finished the turn upstream, but paliad's HTTP client had given up
at 130 s and the legacy filesystem janitor never ran for the aichat path.
Three intertwined fixes, all shipped together because they share the
same wire shape and the same UI states:
1. Switch the aichat backend to /chat/turn/stream
   - new AichatPaliadinService.RunTurnStream relays incremental chunks
   - SSE parser handles default `data:` frames (chunk/meta/done/error)
     and named `event: heartbeat` frames per the upstream contract
   - no more 130 s hard ceiling — the stream stays open as long as data
     or heartbeats flow; silenceTimeout (90 s) catches a true upstream
     stall instead
2. Proof-of-life thinking events
   - handler emits `event: thinking` every 5 s while the upstream is
     silent (synthesised locally) AND relays aichat's `heartbeat`
     events as thinking pings
   - frontend renders a lime-dot pulse + monospace counter inside the
     assistant bubble — the user can SEE the chat is still working
3. Honest disconnect copy + real late-recovery
   - new dispatching endpoint GET /api/paliadin/turns/{id}/recover
     - aichat backend: asks aichat via GET /chat/conversations and
       /chat/conversations/{id}/turns whether the turn actually finished
     - legacy backend: falls through to the local row read (janitor)
   - frontend swaps "wird nachgereicht" → "Lade frische Antwort …"
     while the recovery polls; on confirmed "lost" it swaps to
     "Antwort konnte nicht zugestellt werden — bitte erneut stellen"
   - migration 118 adds aichat_conversation_id to paliadin_turns so
     the recovery has a fast path when the done frame arrived before
     the drop
Streaming + recovery are a no-op for PALIADIN_BACKEND=legacy: the
StreamingPaliadin interface is detected via type assertion, the
LocalPaliadinService stays on the one-shot RunTurn + filesystem
janitor path.
13 new unit tests cover the SSE parser, the conversation-API client,
and the match-assistant-response helper.
go build ./... + go test ./internal/... + go test ./cmd/server/...
+ bun run build all clean.
260 lines
8.5 KiB
Go
260 lines
8.5 KiB
Go
package services
|
|
|
|
// Streaming + recovery tests for AichatPaliadinService (t-paliad-235).
//
// Like the sync-path tests next door, every test bypasses the HTTP wire
// (streamHook / httpHook). DB-write paths in RunTurnStream are out of
// scope here for the same reason — paliad has no sqlx mock. We focus
// on the SSE parser, the conversation-API client, and the
// match-assistant-response helper.
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
"testing"
|
|
)
|
|
|
|
// =============================================================================
|
|
// SSE parser
|
|
// =============================================================================
|
|
|
|
func TestParseSSEStream_DefaultEvents(t *testing.T) {
|
|
body := `data: {"type":"chunk","content":"Hello "}
|
|
|
|
data: {"type":"chunk","content":"world"}
|
|
|
|
data: {"type":"meta","used_tools":["search"],"rows_seen":["3"],"classifier_tag":"howto"}
|
|
|
|
data: {"type":"done","turn_id":"abc","conversation_id":"11111111-1111-1111-1111-111111111111","duration_ms":1234,"pane_spawned":false,"resumed":false}
|
|
|
|
`
|
|
var frames []streamFrame
|
|
err := parseSSEStream(context.Background(), strings.NewReader(body), func(f streamFrame) {
|
|
frames = append(frames, f)
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("parseSSEStream: %v", err)
|
|
}
|
|
if len(frames) != 4 {
|
|
t.Fatalf("got %d frames; want 4 (%+v)", len(frames), frames)
|
|
}
|
|
if frames[0].data.Type != "chunk" || frames[0].data.Content != "Hello " {
|
|
t.Errorf("frame 0 = %+v; want chunk Hello ", frames[0])
|
|
}
|
|
if frames[1].data.Type != "chunk" || frames[1].data.Content != "world" {
|
|
t.Errorf("frame 1 = %+v; want chunk world", frames[1])
|
|
}
|
|
if frames[2].data.Type != "meta" || frames[2].data.ClassifierTag != "howto" {
|
|
t.Errorf("frame 2 = %+v; want meta howto", frames[2])
|
|
}
|
|
if frames[3].data.Type != "done" || frames[3].data.ConversationID == "" {
|
|
t.Errorf("frame 3 = %+v; want done with conversation_id", frames[3])
|
|
}
|
|
}
|
|
|
|
func TestParseSSEStream_HeartbeatEvent(t *testing.T) {
|
|
body := `event: heartbeat
|
|
data: {"elapsed_seconds": 5}
|
|
|
|
event: heartbeat
|
|
data: {"elapsed_seconds": 10}
|
|
|
|
data: {"type":"chunk","content":"Hi"}
|
|
|
|
`
|
|
var frames []streamFrame
|
|
err := parseSSEStream(context.Background(), strings.NewReader(body), func(f streamFrame) {
|
|
frames = append(frames, f)
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("parseSSEStream: %v", err)
|
|
}
|
|
if len(frames) != 3 {
|
|
t.Fatalf("got %d frames; want 3", len(frames))
|
|
}
|
|
if frames[0].event != "heartbeat" || frames[0].heartbeat.ElapsedSeconds != 5 {
|
|
t.Errorf("frame 0 = %+v; want heartbeat 5s", frames[0])
|
|
}
|
|
if frames[1].event != "heartbeat" || frames[1].heartbeat.ElapsedSeconds != 10 {
|
|
t.Errorf("frame 1 = %+v; want heartbeat 10s", frames[1])
|
|
}
|
|
if frames[2].data.Type != "chunk" || frames[2].data.Content != "Hi" {
|
|
t.Errorf("frame 2 = %+v; want chunk Hi", frames[2])
|
|
}
|
|
}
|
|
|
|
func TestParseSSEStream_IgnoresComments(t *testing.T) {
|
|
body := `: keep-alive comment line
|
|
|
|
data: {"type":"chunk","content":"x"}
|
|
|
|
`
|
|
var frames []streamFrame
|
|
err := parseSSEStream(context.Background(), strings.NewReader(body), func(f streamFrame) {
|
|
frames = append(frames, f)
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("parseSSEStream: %v", err)
|
|
}
|
|
if len(frames) != 1 || frames[0].data.Content != "x" {
|
|
t.Errorf("frames = %+v; want 1 chunk", frames)
|
|
}
|
|
}
|
|
|
|
func TestParseSSEStream_HandlesCRLF(t *testing.T) {
|
|
body := "data: {\"type\":\"chunk\",\"content\":\"crlf\"}\r\n\r\n"
|
|
var frames []streamFrame
|
|
err := parseSSEStream(context.Background(), strings.NewReader(body), func(f streamFrame) {
|
|
frames = append(frames, f)
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("parseSSEStream: %v", err)
|
|
}
|
|
if len(frames) != 1 || frames[0].data.Content != "crlf" {
|
|
t.Errorf("frames = %+v; want crlf chunk", frames)
|
|
}
|
|
}
|
|
|
|
func TestParseSSEStream_MultilineData(t *testing.T) {
|
|
// Two data: lines for the same event must concatenate with \n.
|
|
body := `data: {"type":"chunk",
|
|
data: "content":"x"}
|
|
|
|
`
|
|
var frames []streamFrame
|
|
err := parseSSEStream(context.Background(), strings.NewReader(body), func(f streamFrame) {
|
|
frames = append(frames, f)
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("parseSSEStream: %v", err)
|
|
}
|
|
if len(frames) != 1 || frames[0].data.Content != "x" {
|
|
t.Errorf("frames = %+v; want 1 chunk x", frames)
|
|
}
|
|
}
|
|
|
|
// =============================================================================
|
|
// matchAssistantResponse
|
|
// =============================================================================
|
|
|
|
func TestMatchAssistantResponse_PrefersUserPrecededAssistant(t *testing.T) {
|
|
turns := []aichatConversationTurn{
|
|
{Role: "user", Body: "first question"},
|
|
{Role: "assistant", Body: "first answer"},
|
|
{Role: "user", Body: "second question"},
|
|
{Role: "assistant", Body: "second answer"},
|
|
}
|
|
got := matchAssistantResponse(turns, "second question")
|
|
if got != "second answer" {
|
|
t.Errorf("got %q; want %q", got, "second answer")
|
|
}
|
|
}
|
|
|
|
func TestMatchAssistantResponse_NormaliseCase(t *testing.T) {
|
|
turns := []aichatConversationTurn{
|
|
{Role: "user", Body: "Hello World"},
|
|
{Role: "assistant", Body: "hi back"},
|
|
}
|
|
got := matchAssistantResponse(turns, " hello world ")
|
|
if got != "hi back" {
|
|
t.Errorf("got %q; want %q", got, "hi back")
|
|
}
|
|
}
|
|
|
|
func TestMatchAssistantResponse_FallbackToLastAssistant(t *testing.T) {
|
|
// User message doesn't match (aichat persisted with a different
|
|
// envelope or wrapper). Fallback: take the last assistant turn.
|
|
turns := []aichatConversationTurn{
|
|
{Role: "user", Body: "[ctx route=x] my question"},
|
|
{Role: "assistant", Body: "the answer"},
|
|
}
|
|
got := matchAssistantResponse(turns, "my question")
|
|
if got != "the answer" {
|
|
t.Errorf("got %q; want %q", got, "the answer")
|
|
}
|
|
}
|
|
|
|
func TestMatchAssistantResponse_NoAssistantTurns(t *testing.T) {
|
|
turns := []aichatConversationTurn{
|
|
{Role: "user", Body: "lonely"},
|
|
}
|
|
got := matchAssistantResponse(turns, "lonely")
|
|
if got != "" {
|
|
t.Errorf("got %q; want empty", got)
|
|
}
|
|
}
|
|
|
|
func TestMatchAssistantResponse_EmptyAssistantSkipped(t *testing.T) {
|
|
turns := []aichatConversationTurn{
|
|
{Role: "user", Body: "q1"},
|
|
{Role: "assistant", Body: ""},
|
|
{Role: "user", Body: "q2"},
|
|
{Role: "assistant", Body: "a2"},
|
|
}
|
|
got := matchAssistantResponse(turns, "q1")
|
|
if got != "a2" {
|
|
// q1's immediate next is the empty-body assistant — we skip it
|
|
// and fall back to the last non-empty assistant body.
|
|
t.Errorf("got %q; want %q (fallback)", got, "a2")
|
|
}
|
|
}
|
|
|
|
// =============================================================================
|
|
// Conversation-API HTTP client
|
|
// =============================================================================
|
|
|
|
func TestListAichatConversations_BuildsExpectedQuery(t *testing.T) {
|
|
var seenPath string
|
|
s := newAichatService(t, nil, func(ctx context.Context, method, path string, body any, out any) error {
|
|
seenPath = path
|
|
if dst, ok := out.(*aichatListConversationsResponse); ok {
|
|
dst.Conversations = []aichatConversationSummary{{ID: "11111111-1111-1111-1111-111111111111"}}
|
|
}
|
|
return nil
|
|
})
|
|
got, err := s.listAichatConversations(context.Background(), "alice", "00000000-0000-0000-0000-000000000001")
|
|
if err != nil {
|
|
t.Fatalf("listAichatConversations: %v", err)
|
|
}
|
|
if len(got) != 1 || got[0].ID == "" {
|
|
t.Errorf("got = %+v; want one conversation", got)
|
|
}
|
|
for _, want := range []string{"/chat/conversations?", "persona=paliadin", "username=alice", "user_id=00000000-0000-0000-0000-000000000001", "limit=5"} {
|
|
if !strings.Contains(seenPath, want) {
|
|
t.Errorf("path %q missing %q", seenPath, want)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestFetchAichatConversationTurns_BuildsPath(t *testing.T) {
|
|
var seenPath string
|
|
s := newAichatService(t, nil, func(ctx context.Context, method, path string, body any, out any) error {
|
|
seenPath = path
|
|
if dst, ok := out.(*aichatGetConversationTurnsResponse); ok {
|
|
dst.Turns = []aichatConversationTurn{{Role: "assistant", Body: "answer"}}
|
|
}
|
|
return nil
|
|
})
|
|
turns, err := s.fetchAichatConversationTurns(context.Background(), "11111111-1111-1111-1111-111111111111")
|
|
if err != nil {
|
|
t.Fatalf("fetchAichatConversationTurns: %v", err)
|
|
}
|
|
if len(turns) != 1 || turns[0].Body != "answer" {
|
|
t.Errorf("turns = %+v; want one assistant", turns)
|
|
}
|
|
for _, want := range []string{"/chat/conversations/11111111-1111-1111-1111-111111111111/turns", "persona=paliadin", "limit=20"} {
|
|
if !strings.Contains(seenPath, want) {
|
|
t.Errorf("path %q missing %q", seenPath, want)
|
|
}
|
|
}
|
|
}
|
|
|
|
// =============================================================================
|
|
// Interface conformance
|
|
// =============================================================================
|
|
|
|
func TestAichatPaliadinService_ImplementsStreaming(t *testing.T) {
|
|
var _ StreamingPaliadin = (*AichatPaliadinService)(nil)
|
|
var _ AichatRecoverer = (*AichatPaliadinService)(nil)
|
|
}
|