Merge: t-paliad-069 reminder ticker boundary alignment + startup catch-up

This commit is contained in:
m
2026-04-30 02:28:32 +02:00
2 changed files with 361 additions and 24 deletions

View File

@@ -52,11 +52,6 @@ import (
"mgit.msbls.de/m/patholo/internal/models"
)
// reminderTickInterval controls how often the service checks for due Deadlines.
// Hourly is enough given the slot-based dedup and "today / today+offset"
// granularity — we don't need minute precision.
const reminderTickInterval = time.Hour
// ReminderService wires the hourly reminder job. Construct with NewReminderService,
// start with Start(ctx), stop by cancelling the parent context.
type ReminderService struct {
@@ -89,54 +84,101 @@ func NewReminderService(db *sqlx.DB, mail *MailService, users *UserService, base
}
}
// Start spawns the hourly ticker goroutine. Returns immediately; the loop
// exits when ctx is cancelled.
// Start spawns the boundary-aligned scanner goroutine. Returns immediately;
// the loop exits when ctx is cancelled.
func (s *ReminderService) Start(ctx context.Context) {
go s.loop(ctx)
}
// nextTopOfHour returns the duration from now until the next HH:00:00 in
// absolute time. Used to align the scanner's wake-up to natural hour
// boundaries instead of the container-start offset (t-paliad-069).
//
// Pre-fix the loop used `time.NewTicker(time.Hour)` directly: a deploy at
// 13:27:50 produced ticks at HH:27:50 forever, drifting the user-visible
// arrival of a 09:00-Berlin digest anywhere in the 09:xx hour and — worse —
// completely missing slots when redeploys clustered inside the slot hour.
// time.Truncate operates on absolute time, so the boundary is HH:00:00 UTC;
// for whole-hour-offset zones (e.g. Europe/Berlin = UTC±N) that's also
// HH:00:00 wall-clock locally, which is what users care about.
func nextTopOfHour(now time.Time) time.Duration {
next := now.Truncate(time.Hour).Add(time.Hour)
return next.Sub(now)
}
func (s *ReminderService) loop(ctx context.Context) {
slog.Info("reminder: starting hourly scanner",
"interval", reminderTickInterval,
slog.Info("reminder: starting boundary-aligned scanner",
"mail_enabled", s.mail.Enabled())
// Run once immediately so a fresh deploy catches up without waiting an
// hour — paired with the slot dedup, this is safe.
s.RunOnce(ctx)
// Startup catch-up: fire any user/slot whose configured hour has already
// arrived today but no log row exists yet. Covers redeploys during or
// after a slot hour — without this, a single mistimed deploy can lose a
// day for affected users (the regular tick filter requires
// local.Hour() == slot_hour, which is only true for one hour per day).
// The slot_date dedup makes re-firing safe: if the previous container
// already logged the slot, this is a no-op.
s.runStartupCatchUp(ctx)
t := time.NewTicker(reminderTickInterval)
defer t.Stop()
// Aligned wait loop. nextTopOfHour is recomputed every iteration so any
// clock skew or RunOnce duration self-corrects rather than accumulating.
for {
timer := time.NewTimer(nextTopOfHour(s.clock()))
select {
case <-ctx.Done():
timer.Stop()
slog.Info("reminder: shutdown")
return
case <-t.C:
s.RunOnce(ctx)
case <-timer.C:
}
s.RunOnce(ctx)
}
}
// RunOnce performs one scan+send pass. Exposed so tests (and, later, an
// admin trigger endpoint) can exercise the path without waiting for the
// ticker. Errors on individual users are logged and swallowed so one bad
// row doesn't block the rest of the scan.
// RunOnce performs one scan+send pass for the regular hourly tick — fires
// only the slots whose configured hour matches the current local hour for
// each user. Exposed so tests (and, later, an admin trigger endpoint) can
// exercise the path without waiting for the ticker. Errors on individual
// users are logged and swallowed so one bad row doesn't block the scan.
func (s *ReminderService) RunOnce(ctx context.Context) {
s.scanForSlots(ctx, "tick", func(now time.Time, u models.User, slot string) bool {
return inSlot(now, u.ReminderTimezone, u.ReminderMorningTime, u.ReminderEveningTime, slot)
})
}
// runStartupCatchUp fires any user/slot whose configured hour has already
// arrived today (regardless of the current hour) but has no log row yet —
// see loop() for the rationale. Goes through the same runSlotForUser path
// as RunOnce, so the slot_date dedup, audience filter, and email shape all
// match the regular tick.
func (s *ReminderService) runStartupCatchUp(ctx context.Context) {
s.scanForSlots(ctx, "startup-catchup", func(now time.Time, u models.User, slot string) bool {
return slotPastDueToday(now, u.ReminderTimezone, u.ReminderMorningTime, u.ReminderEveningTime, slot)
})
}
// scanForSlots is the shared scan body: load all users, walk morning+evening,
// and call runSlotForUser for each that passes filterFn. label distinguishes
// the two callers in logs.
func (s *ReminderService) scanForSlots(
ctx context.Context,
label string,
filterFn func(now time.Time, u models.User, slot string) bool,
) {
now := s.clock()
if s.users == nil {
slog.Warn("reminder: UserService not wired, skipping tick")
slog.Warn("reminder: UserService not wired, skipping scan", "label", label)
return
}
users, err := s.users.List(ctx)
if err != nil {
slog.Warn("reminder: list users failed", "error", err)
slog.Warn("reminder: list users failed", "label", label, "error", err)
return
}
for _, u := range users {
for _, slot := range []string{"morning", "evening"} {
if !inSlot(now, u.ReminderTimezone, u.ReminderMorningTime, u.ReminderEveningTime, slot) {
if !filterFn(now, u, slot) {
continue
}
if !reminderEnabled(u.EmailPreferences, "deadline_reminders") {
@@ -144,7 +186,7 @@ func (s *ReminderService) RunOnce(ctx context.Context) {
}
if err := s.runSlotForUser(ctx, now, u, slot); err != nil {
slog.Warn("reminder: slot run failed",
"user_id", u.ID, "slot", slot, "error", err)
"label", label, "user_id", u.ID, "slot", slot, "error", err)
}
}
}
@@ -459,6 +501,42 @@ func inSlot(now time.Time, tz, morning, evening, slot string) bool {
return local.Hour() == hour
}
// slotPastDueToday reports whether the user's slot hour for today has
// already arrived (or is currently in progress) in the user's timezone.
// It's the relaxed sibling of inSlot: inSlot uses ==, slotPastDueToday
// uses >=. Used by runStartupCatchUp to redeliver slots that the regular
// tick missed because a redeploy moved the tick out of the slot hour.
//
// The slot_date dedup (paliad.reminder_log.slot/slot_date with partial
// UNIQUE INDEX) prevents this from double-firing if the slot already ran
// earlier today, so it's safe to run this opportunistically on every boot.
//
// A bad/empty timezone returns false (skip this user) — same defensive
// stance inSlot took for the same reason (t-paliad-064: alpine tzdata).
func slotPastDueToday(now time.Time, tz, morning, evening, slot string) bool {
loc, err := time.LoadLocation(tz)
if err != nil {
slog.Error("reminder: catch-up cannot load timezone, skipping user",
"tz", tz, "slot", slot, "error", err)
return false
}
local := now.In(loc)
target := morning
if slot == "evening" {
target = evening
}
hour, ok := parseHour(target)
if !ok {
if slot == "evening" {
hour = 16
} else {
hour = 9
}
}
return local.Hour() >= hour
}
// parseHour pulls the hour out of an "HH:MM" or "HH:MM:SS" string. Returns
// (0, false) on malformed input — callers fall back to the column defaults.
func parseHour(s string) (int, bool) {

View File

@@ -320,6 +320,150 @@ func TestTZDataEmbedded(t *testing.T) {
}
}
// TestNextTopOfHour locks the boundary-alignment math. The "fake clock"
// here is the time values fed to the pure function — we don't need a
// real goroutine + timer to verify scheduling, because the helper is what
// determines when the goroutine wakes up. Pre-fix the loop used
// time.NewTicker(time.Hour) directly off container start, which produced
// ticks at HH:MM:SS where MM/SS == container-start offset. Now every
// scheduled wake-up lands at HH:00:00, regardless of when the process
// booted (t-paliad-069).
func TestNextTopOfHour(t *testing.T) {
cases := []struct {
name string
now time.Time
want time.Duration
}{
// Exactly on the hour → wait a full hour to the next boundary.
{"on the hour", time.Date(2026, 4, 30, 13, 0, 0, 0, time.UTC), time.Hour},
// Halfway through.
{"halfway", time.Date(2026, 4, 30, 13, 30, 0, 0, time.UTC), 30 * time.Minute},
// One second before HH:00 → 1 second to wait.
{"one second to next hour", time.Date(2026, 4, 30, 13, 59, 59, 0, time.UTC), time.Second},
// The exact bug signature from the task brief.
{"container start signature 13:27:50", time.Date(2026, 4, 29, 13, 27, 50, 0, time.UTC), 32*time.Minute + 10*time.Second},
// Sub-second precision.
{"sub-second offset", time.Date(2026, 4, 30, 13, 0, 0, 250_000_000, time.UTC), time.Hour - 250*time.Millisecond},
// Across UTC midnight.
{"crosses midnight UTC", time.Date(2026, 4, 30, 23, 45, 0, 0, time.UTC), 15 * time.Minute},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got := nextTopOfHour(tc.now)
if got != tc.want {
t.Errorf("nextTopOfHour(%s) = %s, want %s", tc.now.Format(time.RFC3339Nano), got, tc.want)
}
})
}
}
// TestNextTopOfHour_AlwaysLandsOnBoundary fuzzes the helper across an hour's
// worth of starting offsets and confirms now+delay always lands exactly on
// HH:00:00 (well within the 5-second tolerance the task brief allows).
// This is what makes ticks stable across redeploys.
func TestNextTopOfHour_AlwaysLandsOnBoundary(t *testing.T) {
base := time.Date(2026, 4, 30, 13, 0, 0, 0, time.UTC)
for sec := 0; sec < 3600; sec += 137 { // sample non-aligned offsets
now := base.Add(time.Duration(sec) * time.Second)
delay := nextTopOfHour(now)
wakeup := now.Add(delay)
if wakeup.Minute() != 0 || wakeup.Second() != 0 || wakeup.Nanosecond() != 0 {
t.Errorf("from now=%s, wakeup=%s — not on HH:00:00",
now.Format(time.RFC3339Nano), wakeup.Format(time.RFC3339Nano))
}
}
}
// TestNextTopOfHour_StableAfterRunOnce locks the second acceptance: after a
// RunOnce executes at exactly HH:00:00 and the loop re-enters, the next
// wake-up is at (HH+1):00:00 — not at HH:00:00 + (whatever delay we
// happened to compute on the previous iteration). Verifies the
// recompute-per-iteration design self-stabilises rather than drifting.
func TestNextTopOfHour_StableAfterRunOnce(t *testing.T) {
// Container starts mid-hour at 13:27:50.
start := time.Date(2026, 4, 30, 13, 27, 50, 0, time.UTC)
delay1 := nextTopOfHour(start)
if want := 32*time.Minute + 10*time.Second; delay1 != want {
t.Fatalf("first delay = %s, want %s", delay1, want)
}
// First fire lands at 14:00:00 exactly. Simulate RunOnce taking ~zero
// time; the next iteration computes its own delay from "now".
afterFirstFire := start.Add(delay1)
if want := time.Date(2026, 4, 30, 14, 0, 0, 0, time.UTC); !afterFirstFire.Equal(want) {
t.Fatalf("first fire at %s, want %s", afterFirstFire, want)
}
delay2 := nextTopOfHour(afterFirstFire)
if delay2 != time.Hour {
t.Errorf("second delay = %s, want 1h (would have been %s in old broken design)", delay2, delay1)
}
// And the second fire lands at 15:00:00 — not at 14:32:10 (= 14:00 + delay1).
afterSecondFire := afterFirstFire.Add(delay2)
if want := time.Date(2026, 4, 30, 15, 0, 0, 0, time.UTC); !afterSecondFire.Equal(want) {
t.Errorf("second fire at %s, want %s", afterSecondFire, want)
}
}
// TestSlotPastDueToday locks the catch-up filter. Mirrors TestInSlot's
// table shape but with the looser `>=` predicate that the startup catch-up
// uses to redeliver slots a redeploy missed.
func TestSlotPastDueToday(t *testing.T) {
// 2026-04-30 in Berlin is CEST (UTC+2).
utc0830Berlin := time.Date(2026, 4, 30, 6, 30, 0, 0, time.UTC) // 08:30 Berlin
utc0930Berlin := time.Date(2026, 4, 30, 7, 30, 0, 0, time.UTC) // 09:30 Berlin
utc1150Berlin := time.Date(2026, 4, 30, 9, 50, 0, 0, time.UTC) // 11:50 Berlin (the redeploy-after-09:00 case)
utc1630Berlin := time.Date(2026, 4, 30, 14, 30, 0, 0, time.UTC) // 16:30 Berlin
tests := []struct {
name string
now time.Time
tz string
morning string
evening string
slot string
want bool
}{
// Before morning slot: not past due.
{"before morning slot, morning not due", utc0830Berlin, "Europe/Berlin", "09:00", "16:00", "morning", false},
{"before morning slot, evening not due", utc0830Berlin, "Europe/Berlin", "09:00", "16:00", "evening", false},
// Inside morning slot hour: past due (slot has begun).
{"inside morning slot, morning past due", utc0930Berlin, "Europe/Berlin", "09:00", "16:00", "morning", true},
{"inside morning slot, evening not yet due", utc0930Berlin, "Europe/Berlin", "09:00", "16:00", "evening", false},
// After morning slot, before evening: morning past due, evening not. This is the
// canonical bug from the task brief — redeploy at 11:50 Berlin after the
// 09:00 slot was missed; catch-up must fire it.
{"after morning, morning past due", utc1150Berlin, "Europe/Berlin", "09:00", "16:00", "morning", true},
{"after morning, evening still not due", utc1150Berlin, "Europe/Berlin", "09:00", "16:00", "evening", false},
// After evening slot: both past due (catch-up may fire either; dedup decides).
{"after evening, morning still past due", utc1630Berlin, "Europe/Berlin", "09:00", "16:00", "morning", true},
{"after evening, evening past due", utc1630Berlin, "Europe/Berlin", "09:00", "16:00", "evening", true},
// Custom slot times.
{"custom morning 11:00 not yet due at 10:30", time.Date(2026, 4, 30, 8, 30, 0, 0, time.UTC), "Europe/Berlin", "11:00", "16:00", "morning", false},
{"custom morning 11:00 due at 11:30", time.Date(2026, 4, 30, 9, 30, 0, 0, time.UTC), "Europe/Berlin", "11:00", "16:00", "morning", true},
// Bad / empty tz: skip user (mirror inSlot's defensive stance).
{"unknown tz skips user", utc0930Berlin, "Mars/Olympus", "09:00", "16:00", "morning", false},
{"empty tz skips user", utc0930Berlin, "", "09:00", "16:00", "morning", false},
// Empty HH:MM still falls back to defaults (09:00 / 16:00).
{"empty morning string falls back to default 09:00", utc0930Berlin, "Europe/Berlin", "", "", "morning", true},
{"empty evening string falls back to default 16:00", utc1630Berlin, "Europe/Berlin", "", "", "evening", true},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
got := slotPastDueToday(tc.now, tc.tz, tc.morning, tc.evening, tc.slot)
if got != tc.want {
t.Errorf("slotPastDueToday(now=%s, tz=%q, m=%q, e=%q, slot=%q) = %v, want %v",
tc.now.Format(time.RFC3339), tc.tz, tc.morning, tc.evening, tc.slot, got, tc.want)
}
})
}
}
// TestInSlot_BerlinAt0900_NotAt1100 is the headline regression test for the
// 11:16 prod surprise. With reminder_morning_time=09:00 and tz=Europe/Berlin,
// the slot must fire at 09:xx Berlin (07:xx UTC, the user's chosen time) and
@@ -481,6 +625,121 @@ func TestRunSlotForUser(t *testing.T) {
}
}
// TestRunStartupCatchUp_RecoversMissedMorningSlot is the live-DB version of
// the third acceptance criterion in the task brief: a redeploy at 09:50
// (after the 09:00 slot's regular tick has been skipped) must still
// deliver the morning digest to anyone whose slot has not yet been logged
// for today.
//
// Pre-fix this would fail because RunOnce checks local.Hour() == slot_hour
// and a "now" of 11:50 Berlin is in hour 11, not 9 — the slot was lost
// for the day. With runStartupCatchUp the missed slot fires immediately,
// and the slot_date dedup ensures a second startup the same day is a
// no-op.
func TestRunStartupCatchUp_RecoversMissedMorningSlot(t *testing.T) {
url := os.Getenv("TEST_DATABASE_URL")
if url == "" {
t.Skip("TEST_DATABASE_URL not set — skipping live DB test")
}
if err := db.ApplyMigrations(url); err != nil {
t.Fatalf("apply migrations: %v", err)
}
pool, err := sqlx.Connect("postgres", url)
if err != nil {
t.Fatalf("connect: %v", err)
}
defer pool.Close()
ctx := context.Background()
userID := uuid.New()
projectID := uuid.New()
dlToday := uuid.New()
cleanup := func() {
pool.ExecContext(ctx, `DELETE FROM paliad.reminder_log WHERE user_id = $1`, userID)
pool.ExecContext(ctx, `DELETE FROM paliad.deadlines WHERE id = $1`, dlToday)
pool.ExecContext(ctx, `DELETE FROM paliad.project_teams WHERE project_id = $1`, projectID)
pool.ExecContext(ctx, `DELETE FROM paliad.projects WHERE id = $1`, projectID)
pool.ExecContext(ctx, `DELETE FROM paliad.users WHERE id = $1`, userID)
pool.ExecContext(ctx, `DELETE FROM auth.users WHERE id = $1`, userID)
}
cleanup()
defer cleanup()
if _, err := pool.ExecContext(ctx,
`INSERT INTO auth.users (id, email) VALUES ($1, $2)`,
userID, "catchup-test@hlc.com"); err != nil {
t.Fatalf("seed auth.users: %v", err)
}
if _, err := pool.ExecContext(ctx,
`INSERT INTO paliad.users
(id, email, display_name, office, lang, email_preferences,
reminder_morning_time, reminder_evening_time, reminder_timezone,
reminder_warning_offset_days)
VALUES ($1, $2, 'Catch-Up Test', 'munich', 'de', '{}'::jsonb,
'09:00:00', '16:00:00', 'Europe/Berlin', 7)`,
userID, "catchup-test@hlc.com"); err != nil {
t.Fatalf("seed paliad.users: %v", err)
}
if _, err := pool.ExecContext(ctx,
`INSERT INTO paliad.projects (id, type, path, title, reference, status, created_by)
VALUES ($1, 'project', $1::text, 'Catch-Up Test', '2026/9999', 'active', $2)`,
projectID, userID); err != nil {
t.Fatalf("seed paliad.projects: %v", err)
}
today := time.Date(2026, 4, 28, 0, 0, 0, 0, time.UTC)
if _, err := pool.ExecContext(ctx,
`INSERT INTO paliad.deadlines (id, project_id, title, due_date, source, status, created_by)
VALUES ($1, $2, 'Heute fällig', $3, 'manual', 'pending', $4)`,
dlToday, projectID, today, userID); err != nil {
t.Fatalf("seed today deadline: %v", err)
}
mail, err := NewMailService()
if err != nil {
t.Fatalf("NewMailService: %v", err)
}
users := NewUserService(pool)
svc := NewReminderService(pool, mail, users, "https://paliad.test")
countLog := func(slot string, slotDate time.Time) int {
var n int
if err := pool.GetContext(ctx, &n,
`SELECT count(*) FROM paliad.reminder_log
WHERE user_id = $1 AND slot = $2 AND slot_date = $3`,
userID, slot, slotDate); err != nil {
t.Fatalf("count log: %v", err)
}
return n
}
// 11:50 Berlin (CEST) == 09:50 UTC — the exact "redeploy after the
// morning slot" signature from the task brief. Pre-fix RunOnce would
// see local.Hour()==11 and skip; runStartupCatchUp sees 11>=9 and fires.
svc.clock = func() time.Time {
return time.Date(2026, 4, 28, 9, 50, 0, 0, time.UTC)
}
svc.runStartupCatchUp(ctx)
if got := countLog("morning", today); got != 1 {
t.Errorf("after catch-up at 11:50 Berlin: morning rows = %d, want 1 (catch-up missed slot)", got)
}
// Evening slot is still in the future (16:00 > 11:50) — must not fire.
if got := countLog("evening", today); got != 0 {
t.Errorf("after catch-up at 11:50 Berlin: evening rows = %d, want 0 (slot not yet due)", got)
}
// Second startup-catch-up the same day must be a no-op (dedup).
svc.clock = func() time.Time {
return time.Date(2026, 4, 28, 12, 30, 0, 0, time.UTC)
}
svc.runStartupCatchUp(ctx)
if got := countLog("morning", today); got != 1 {
t.Errorf("second startup catch-up morning rows = %d, want still 1 (dedup failed)", got)
}
}
// TestRunSlotForUser_EmptyDigest verifies the no-spam rule: a user with no
// matching deadlines in their slot gets no email and no log row.
func TestRunSlotForUser_EmptyDigest(t *testing.T) {