Files
projax/internal/aggregate/aggregator.go
mAi 326f4c83b9 feat(aggregate): introduce internal/aggregate/ for fan-out + day-grouping
Phase 5a slice A: a new package that concentrates the "fan out across
linked items" pattern web/dashboard.go, web/timeline.go and mcp/tools.go
each had separate copies of. No callers touch it yet — slices B/C/D
migrate them in turn.

- Aggregator with five methods (Todos/Events/Issues/Docs/Creations) plus
  All convenience for the MCP timeline. Each method takes a *store.Item
  slice and (optionally) a Window, returns typed Row slices.
- Row types embed the underlying caldav.Todo / caldav.Event / gitea.Issue
  so existing html/template field accesses (.Todo.UID, .Event.Summary,
  …) keep resolving via Go field promotion in slices B/C.
- TimelineRow sum-type wrapper (with pointer slots per Kind) plus the
  flat template-friendly fields. Lifted-but-untouched from web/.
- BuildTimelineDays + SortTimelineRows + EventStartLabel +
  EventDurationHint lifted near-verbatim from web/timeline.go.
- CalDAV/Gitea/Store interfaces in the aggregator so unit tests stub IO
  cleanly. Real *caldav.Client / *gitea.Client / *store.Store satisfy
  by method set.
- Per-source error handling preserved: log at WARN + skip the bad
  fetch, return surviving rows.

Tests cover empty inputs, fan-out call counts, per-source error
recovery, window narrowing for todos, issue-cache hit path, doc/creation
allow-list filtering, BuildTimelineDays asc/desc order, sticky pills,
far-future fade, within-day sort.

Plan doc captures the slicing strategy + design decisions:
docs/plans/aggregator-refactor.md.

Task: t-projax-5a-aggregator
2026-05-21 23:57:54 +02:00

367 lines
11 KiB
Go

package aggregate
import (
"context"
"log/slog"
"slices"
"sync"
"time"
"github.com/m/projax/caldav"
"github.com/m/projax/gitea"
"github.com/m/projax/store"
)
// Ref-type constants for the projax.item_links rows the aggregator fans out
// over. Mirrors the values web/caldav.go and web/gitea.go use today.
const (
// RefTypeCalDAV is the ref type Todos and Events pass to jobsByLink; the
// matching link's RefID is then used as the calendar URL.
RefTypeCalDAV = "caldav-list"
// RefTypeGiteaRepo is the ref type Issues passes to jobsByLink; the
// matching link's RefID is split into owner and repo by gitea.ParseRepoRef.
RefTypeGiteaRepo = "gitea-repo"
)
// defaultWorkers is the default per-call worker count: the upper bound on
// the goroutines runLinkPool starts for one fan-out. Matches the
// pre-Phase-5a value used by all five collect functions.
const defaultWorkers = 4
// CalDAVClient is the slice of *caldav.Client the aggregator calls. Kept as
// an interface so tests can stub network IO without spinning up a real DAV
// server.
type CalDAVClient interface {
// ListTodos is called by Todos once per caldav-list link, with the link's
// RefID as calendarURL.
ListTodos(ctx context.Context, calendarURL string) ([]caldav.Todo, error)
// ListEvents is called by Events once per caldav-list link; opts carries
// the caller's Window as TimeMin/TimeMax.
ListEvents(ctx context.Context, calendarURL string, opts caldav.ListEventsOpts) ([]caldav.Event, error)
}
// GiteaClient is the slice of *gitea.Client the aggregator calls.
type GiteaClient interface {
// ListIssues is called by Issues once per gitea-repo link (on a cache
// miss), always with State "open".
ListIssues(ctx context.Context, owner, repo string, opts gitea.ListOpts) ([]gitea.Issue, error)
}
// LinkLister is the slice of *store.Store the aggregator calls.
type LinkLister interface {
// LinksByType is called by jobsByLink once per item to find that item's
// links of a single ref type.
LinksByType(ctx context.Context, itemID, refType string) ([]*store.ItemLink, error)
// DatedLinksRange is called by Docs with the Window's From and To.
DatedLinksRange(ctx context.Context, from, to time.Time) ([]*store.ItemLinkWithItem, error)
// ItemsCreatedInRange is called by Creations with the Window's From and To.
ItemsCreatedInRange(ctx context.Context, from, to time.Time) ([]*store.Item, error)
}
// IssueCache is the small TTL cache web/gitea.go already maintains for
// repeated dashboard renders. Nil = no caching (used in unit tests).
// Issues keys entries by the link's RefID followed by "|open".
type IssueCache interface {
// Get returns the issues stored under key; Issues only uses the slice
// when the bool is true.
Get(key string) ([]gitea.Issue, bool)
// Set stores issues under key; Issues calls it after a successful
// ListIssues fetch.
Set(key string, issues []gitea.Issue)
}
// Aggregator owns the fan-out logic. Construct via New so the logger
// fallback to slog.Default applies — every public method assumes Logger
// non-nil.
type Aggregator struct {
// Store backs the per-item link lookups in jobsByLink and the range
// queries issued by Docs and Creations.
Store LinkLister
CalDAV CalDAVClient // nil = Todos and Events return nil
Gitea GiteaClient // nil = Issues returns nil
IssueCache IssueCache // nil = no caching
// Logger receives the WARN records emitted when a lookup or fetch is
// skipped; New substitutes slog.Default() when handed nil.
Logger *slog.Logger
}
// New wires the given dependencies into a fresh Aggregator. The CalDAV,
// Gitea and cache dependencies may be nil, in which case the methods that
// rely on them return nothing (the same optional-integration shape used
// elsewhere in projax). A nil logger is replaced by slog.Default() so the
// methods can always log.
func New(st LinkLister, cal CalDAVClient, git GiteaClient, cache IssueCache, logger *slog.Logger) *Aggregator {
	agg := &Aggregator{
		Store:      st,
		CalDAV:     cal,
		Gitea:      git,
		IssueCache: cache,
		Logger:     logger,
	}
	if agg.Logger == nil {
		agg.Logger = slog.Default()
	}
	return agg
}
// Window is the optional half-open time range [From, To) accepted by
// Todos/Events/Docs/Creations. Leaving both bounds at their zero value
// means "no narrowing"; Todos treats that as "return every task
// irrespective of date".
type Window struct {
	From time.Time // inclusive lower bound
	To   time.Time // exclusive upper bound
}

// IsZero reports whether neither bound of the window is set.
func (w Window) IsZero() bool {
	if !w.From.IsZero() {
		return false
	}
	return w.To.IsZero()
}

// contains reports whether t lies in [From, To). A zero window matches
// every t, so callers can pass Window{} for "match everything".
//
// The bounds are compared literally: a window with only From set still has
// the zero time as its exclusive upper bound and therefore matches no
// ordinary timestamp, while a window with only To set matches everything
// before To.
func (w Window) contains(t time.Time) bool {
	return w.IsZero() || (!t.Before(w.From) && t.Before(w.To))
}
// AllOpts selects what All fetches. Window is forwarded to
// Todos/Events/Docs/Creations; Issues does not take a window.
type AllOpts struct {
	Window Window
	// Kinds is a subset of {KindTodo, KindEvent, KindDoc, KindCreation,
	// "issue"}. Leaving it empty selects every kind.
	Kinds []string
}

// want reports whether kind should be fetched: always true when Kinds is
// empty, otherwise only when kind is listed.
func (o AllOpts) want(kind string) bool {
	return len(o.Kinds) == 0 || slices.Contains(o.Kinds, kind)
}
// All performs every requested fetch in a single call and bundles the rows
// into a Result. It exists for the MCP timeline tool; the dashboard and web
// timeline call the individual methods so they only pay for what they
// render.
//
// Fetches run sequentially in a fixed order (todos, events, issues, docs,
// creations); a kind is skipped when opts.want rejects it. Issues is the
// only fetch that does not receive opts.Window.
func (a *Aggregator) All(ctx context.Context, items []*store.Item, opts AllOpts) Result {
	var res Result
	win := opts.Window

	fetches := []struct {
		kind string
		run  func()
	}{
		{KindTodo, func() { res.Todos = a.Todos(ctx, items, win) }},
		{KindEvent, func() { res.Events = a.Events(ctx, items, win) }},
		{"issue", func() { res.Issues = a.Issues(ctx, items) }},
		{KindDoc, func() { res.Docs = a.Docs(ctx, items, win) }},
		{KindCreation, func() { res.Creations = a.Creations(ctx, items, win) }},
	}
	for _, f := range fetches {
		if opts.want(f.kind) {
			f.run()
		}
	}
	return res
}
// linkJob is one (item, link) pair the worker pool consumes. jobsByLink
// produces them and runLinkPool hands both fields to its work callback.
type linkJob struct {
item *store.Item // item the link was looked up for
link *store.ItemLink // one of that item's links of the requested ref type
}
// jobsByLink walks every item, looks up its item_links of refType, and
// emits one job per (item, link) pair, in item order.
//
// scope is only used to label log records. A lookup error for one item is
// logged at WARN and that item is skipped — same behaviour as the
// pre-Phase-5a code, so a single broken item never blanks the section.
//
// A nil Store yields no jobs instead of panicking, honouring New's promise
// that a missing dependency turns the affected methods into no-ops.
func (a *Aggregator) jobsByLink(ctx context.Context, items []*store.Item, refType, scope string) []linkJob {
	if a.Store == nil {
		return nil
	}
	jobs := make([]linkJob, 0, len(items))
	for _, it := range items {
		links, err := a.Store.LinksByType(ctx, it.ID, refType)
		if err != nil {
			a.Logger.Warn("aggregate links", "scope", scope, "item", it.PrimaryPath(), "ref_type", refType, "err", err)
			continue
		}
		for _, l := range links {
			jobs = append(jobs, linkJob{item: it, link: l})
		}
	}
	return jobs
}
// runLinkPool runs work(ctx, item, link, sink) for every job, using at most
// defaultWorkers goroutines (fewer when there are fewer jobs, so small
// fan-outs do not start idle workers).
//
// work emits zero or more rows per job by calling sink, which is safe for
// concurrent use. Rows are collected into the returned slice in arbitrary
// order; callers sort downstream. Because one job may produce many rows, the
// results go into a mutex-protected slice rather than a channel sized to
// len(jobs), which could fill up and deadlock.
//
// It returns nil when jobs is empty and blocks until every job has been
// processed. ctx is only forwarded to work; cancellation is left to the
// callback.
func runLinkPool[R any](ctx context.Context, jobs []linkJob, work func(ctx context.Context, item *store.Item, link *store.ItemLink, sink func(R))) []R {
	if len(jobs) == 0 {
		return nil
	}

	var (
		mu  sync.Mutex
		out []R
	)
	sink := func(r R) {
		mu.Lock()
		defer mu.Unlock()
		out = append(out, r)
	}

	// The queue is buffered to len(jobs), so it can be filled and closed up
	// front without blocking; workers simply drain it.
	in := make(chan linkJob, len(jobs))
	for _, j := range jobs {
		in <- j
	}
	close(in)

	workers := defaultWorkers
	if len(jobs) < workers {
		workers = len(jobs)
	}

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for j := range in {
				work(ctx, j.item, j.link, sink)
			}
		}()
	}
	wg.Wait()
	return out
}
// Todos returns one TodoRow per VTODO found on every caldav-list link of
// the given items. It returns nil when no CalDAV client is configured.
//
// With a zero Window every todo is returned (the dashboard pattern).
// Otherwise each todo is matched against the window through an anchor time:
//   - open todos use Due;
//   - COMPLETED/CANCELLED todos use LastModified, falling back to Due.
//
// A todo without an anchor is dropped. The anchor is converted to local
// time and truncated to the start of its day before the [From, To) check.
//
// A calendar whose listing fails is logged at WARN and skipped; rows from
// the other calendars are still returned, so one bad calendar does not
// blank the surface.
func (a *Aggregator) Todos(ctx context.Context, items []*store.Item, w Window) []TodoRow {
	if a.CalDAV == nil {
		return nil
	}

	// keep reports whether a todo survives the window filter.
	keep := func(td caldav.Todo) bool {
		if w.IsZero() {
			return true
		}
		anchor := td.Due
		closed := td.Status == "COMPLETED" || td.Status == "CANCELLED"
		if closed && td.LastModified != nil {
			anchor = td.LastModified
		}
		return anchor != nil && w.contains(startOfDay(anchor.Local()))
	}

	fetch := func(ctx context.Context, item *store.Item, link *store.ItemLink, sink func(TodoRow)) {
		calendarURL := link.RefID
		todos, err := a.CalDAV.ListTodos(ctx, calendarURL)
		if err != nil {
			a.Logger.Warn("aggregate list todos", "calendar", calendarURL, "err", err)
			return
		}
		for _, td := range todos {
			if keep(td) {
				sink(TodoRow{Item: item, CalendarURL: calendarURL, Todo: td})
			}
		}
	}

	jobs := a.jobsByLink(ctx, items, RefTypeCalDAV, "todos")
	return runLinkPool[TodoRow](ctx, jobs, fetch)
}
// Events returns one EventRow per VEVENT found on every caldav-list link of
// the given items. It returns nil when no CalDAV client is configured.
//
// The Window is forwarded to the client as TimeMin/TimeMax and is
// effectively REQUIRED: the CalDAV REPORT for VEVENTs is time-bounded, so
// a Window{} is expected to be refused by the server. No local filtering is
// applied to the returned events.
//
// A calendar whose listing fails is logged at WARN and skipped.
func (a *Aggregator) Events(ctx context.Context, items []*store.Item, w Window) []EventRow {
	if a.CalDAV == nil {
		return nil
	}

	jobs := a.jobsByLink(ctx, items, RefTypeCalDAV, "events")
	query := caldav.ListEventsOpts{TimeMin: w.From, TimeMax: w.To}

	fetch := func(ctx context.Context, item *store.Item, link *store.ItemLink, sink func(EventRow)) {
		events, err := a.CalDAV.ListEvents(ctx, link.RefID, query)
		if err != nil {
			a.Logger.Warn("aggregate list events", "calendar", link.RefID, "err", err)
			return
		}
		for i := range events {
			sink(EventRow{Item: item, Event: events[i]})
		}
	}
	return runLinkPool[EventRow](ctx, jobs, fetch)
}
// Issues returns every open Gitea issue across the items' gitea-repo
// links. It returns nil when no Gitea client is configured.
//
// Links whose RefID does not parse into an owner and a repo are skipped
// silently. A repo whose listing fails is logged at WARN and skipped.
//
// When an IssueCache is wired in, results are looked up and stored under
// the key RefID + "|open" (expiry is whatever the cache implements — the
// web layer uses a short TTL). A cache hit is decided by the ok flag from
// Get, not by the slice being non-nil, so a repo cached with zero open
// issues is not refetched on every call.
func (a *Aggregator) Issues(ctx context.Context, items []*store.Item) []IssueRow {
	if a.Gitea == nil {
		return nil
	}
	jobs := a.jobsByLink(ctx, items, RefTypeGiteaRepo, "issues")
	return runLinkPool[IssueRow](ctx, jobs, func(ctx context.Context, item *store.Item, link *store.ItemLink, sink func(IssueRow)) {
		owner, repo := gitea.ParseRepoRef(link.RefID)
		if owner == "" || repo == "" {
			return
		}
		key := link.RefID + "|open"

		var (
			open   []gitea.Issue
			cached bool
		)
		if a.IssueCache != nil {
			open, cached = a.IssueCache.Get(key)
		}
		if !cached {
			fetched, err := a.Gitea.ListIssues(ctx, owner, repo, gitea.ListOpts{State: "open"})
			if err != nil {
				a.Logger.Warn("aggregate list issues", "repo", link.RefID, "err", err)
				return
			}
			open = fetched
			if a.IssueCache != nil {
				a.IssueCache.Set(key, open)
			}
		}
		for _, iss := range open {
			sink(IssueRow{Item: item, Repo: link.RefID, Issue: iss})
		}
	})
}
// Docs returns one DocRow per dated item_link in the window whose owning
// item is in the caller's allow-list (items). It wraps
// Store.DatedLinksRange, which does the date-range lookup; this method only
// filters the result by item ID and attaches the caller's *store.Item.
//
// It returns nil when the Store is nil (New allows missing dependencies),
// when the range query fails (logged at WARN), or when the query returns no
// rows.
func (a *Aggregator) Docs(ctx context.Context, items []*store.Item, w Window) []DocRow {
	if a.Store == nil {
		return nil
	}
	rows, err := a.Store.DatedLinksRange(ctx, w.From, w.To)
	if err != nil {
		a.Logger.Warn("aggregate dated links", "err", err)
		return nil
	}
	if len(rows) == 0 {
		return nil
	}

	allowed := make(map[string]*store.Item, len(items))
	for _, it := range items {
		allowed[it.ID] = it
	}

	out := make([]DocRow, 0, len(rows))
	for _, r := range rows {
		it, ok := allowed[r.Link.ItemID]
		if !ok {
			continue
		}
		// Copy the link so each row points at its own ItemLink value.
		link := r.Link
		out = append(out, DocRow{Item: it, Link: &link})
	}
	return out
}
// Creations returns one CreationRow per projax item created in the window,
// restricted to items in the caller's allow-list (matched by ID). The rows
// carry the *store.Item returned by Store.ItemsCreatedInRange, not the
// caller's copy.
//
// It returns nil when the Store is nil (New allows missing dependencies),
// when the range query fails (logged at WARN), or when the query returns no
// items.
func (a *Aggregator) Creations(ctx context.Context, items []*store.Item, w Window) []CreationRow {
	if a.Store == nil {
		return nil
	}
	created, err := a.Store.ItemsCreatedInRange(ctx, w.From, w.To)
	if err != nil {
		a.Logger.Warn("aggregate created", "err", err)
		return nil
	}
	if len(created) == 0 {
		return nil
	}

	allowed := make(map[string]struct{}, len(items))
	for _, it := range items {
		allowed[it.ID] = struct{}{}
	}

	out := make([]CreationRow, 0, len(created))
	for _, it := range created {
		if _, ok := allowed[it.ID]; ok {
			out = append(out, CreationRow{Item: it})
		}
	}
	return out
}
// startOfDay returns midnight at the start of t's calendar day, in t's own
// location (callers that want local midnight pass t.Local()). It is
// declared here rather than imported from web/ so the package stays free of
// cross-cuts.
func startOfDay(t time.Time) time.Time {
	year, month, day := t.Date()
	return time.Date(year, month, day, 0, 0, 0, 0, t.Location())
}