package aggregate import ( "context" "log/slog" "slices" "sync" "time" "github.com/m/projax/caldav" "github.com/m/projax/gitea" "github.com/m/projax/store" ) // Ref-type constants for the projax.item_links rows the aggregator fans out // over. Mirrors the values web/caldav.go and web/gitea.go use today. const ( RefTypeCalDAV = "caldav-list" RefTypeGiteaRepo = "gitea-repo" ) // Default per-call worker count. Matches the pre-Phase-5a value used by all // five collect functions. const defaultWorkers = 4 // CalDAVClient is the slice of *caldav.Client the aggregator calls. Kept as // an interface so tests can stub network IO without spinning up a real DAV // server. type CalDAVClient interface { ListTodos(ctx context.Context, calendarURL string) ([]caldav.Todo, error) ListEvents(ctx context.Context, calendarURL string, opts caldav.ListEventsOpts) ([]caldav.Event, error) } // GiteaClient is the slice of *gitea.Client the aggregator calls. type GiteaClient interface { ListIssues(ctx context.Context, owner, repo string, opts gitea.ListOpts) ([]gitea.Issue, error) } // LinkLister is the slice of *store.Store the aggregator calls. type LinkLister interface { LinksByType(ctx context.Context, itemID, refType string) ([]*store.ItemLink, error) DatedLinksRange(ctx context.Context, from, to time.Time) ([]*store.ItemLinkWithItem, error) ItemsCreatedInRange(ctx context.Context, from, to time.Time) ([]*store.Item, error) } // IssueCache is the small TTL cache web/gitea.go already maintains for // repeated dashboard renders. Nil = no caching (used in unit tests). type IssueCache interface { Get(key string) ([]gitea.Issue, bool) Set(key string, issues []gitea.Issue) } // Aggregator owns the fan-out logic. Construct via New so the logger // fallback to slog.Default applies — every public method assumes Logger // non-nil. type Aggregator struct { Store LinkLister CalDAV CalDAVClient // nil = todo/event/issue methods return empty Gitea GiteaClient // nil = Issues returns empty IssueCache IssueCache // nil = no caching Logger *slog.Logger } // New builds an Aggregator with sensible defaults. Any of the deps may be // nil; the corresponding methods become no-ops (this matches the optional // CalDAV/Gitea integration shape the rest of projax already follows). func New(st LinkLister, cal CalDAVClient, git GiteaClient, cache IssueCache, logger *slog.Logger) *Aggregator { if logger == nil { logger = slog.Default() } return &Aggregator{ Store: st, CalDAV: cal, Gitea: git, IssueCache: cache, Logger: logger, } } // Window is the optional time range applied by Todos/Events/Docs/Creations. // Zero From + To means "no narrowing" — Todos and the dashboard use that // when they want every open task irrespective of due date. From is // inclusive, To is exclusive. type Window struct { From time.Time To time.Time } // IsZero reports whether the window has no bounds set. func (w Window) IsZero() bool { return w.From.IsZero() && w.To.IsZero() } // contains reports whether t falls in [From, To). Always true on a zero // window so callers can pass Window{} for "match everything". func (w Window) contains(t time.Time) bool { if w.IsZero() { return true } if t.Before(w.From) { return false } if !t.Before(w.To) { return false } return true } // AllOpts narrows what All fetches. Empty Kinds = all five. Window applies // to Todos/Events/Docs/Creations; Issues ignores it. type AllOpts struct { Window Window Kinds []string // subset of {KindTodo, KindEvent, KindDoc, KindCreation, "issue"} } func (o AllOpts) want(kind string) bool { if len(o.Kinds) == 0 { return true } return slices.Contains(o.Kinds, kind) } // All runs every fetch in one pass. The MCP timeline tool consumes the // result; the dashboard + web timeline call individual methods instead so // they don't pay for fetches they won't render. func (a *Aggregator) All(ctx context.Context, items []*store.Item, opts AllOpts) Result { var r Result if opts.want(KindTodo) { r.Todos = a.Todos(ctx, items, opts.Window) } if opts.want(KindEvent) { r.Events = a.Events(ctx, items, opts.Window) } if opts.want("issue") { r.Issues = a.Issues(ctx, items) } if opts.want(KindDoc) { r.Docs = a.Docs(ctx, items, opts.Window) } if opts.want(KindCreation) { r.Creations = a.Creations(ctx, items, opts.Window) } return r } // linkJob is one (item, link) pair the worker pool consumes. type linkJob struct { item *store.Item link *store.ItemLink } // jobsByLink walks every item, looks up item_links of refType, and emits // one job per (item, link). Per-item lookup errors are logged at WARN and // skipped — same behaviour as the pre-Phase-5a code so a single broken // item never blanks the section. func (a *Aggregator) jobsByLink(ctx context.Context, items []*store.Item, refType, scope string) []linkJob { jobs := make([]linkJob, 0, len(items)) for _, it := range items { links, err := a.Store.LinksByType(ctx, it.ID, refType) if err != nil { a.Logger.Warn("aggregate links", "scope", scope, "item", it.PrimaryPath(), "ref_type", refType, "err", err) continue } for _, l := range links { jobs = append(jobs, linkJob{item: it, link: l}) } } return jobs } // runLinkPool runs work(ctx, item, link) across at most defaultWorkers // goroutines. The worker emits zero or more rows per job by calling sink; // rows are collected into the returned slice in arbitrary order (callers // sort downstream). Each job may produce N rows so a channel sized to // len(jobs) would deadlock — we use a mutex-protected slice instead. func runLinkPool[R any](ctx context.Context, jobs []linkJob, work func(ctx context.Context, item *store.Item, link *store.ItemLink, sink func(R))) []R { if len(jobs) == 0 { return nil } var ( out []R mu sync.Mutex ) sink := func(r R) { mu.Lock() out = append(out, r) mu.Unlock() } in := make(chan linkJob, len(jobs)) var wg sync.WaitGroup for i := 0; i < defaultWorkers; i++ { wg.Add(1) go func() { defer wg.Done() for j := range in { work(ctx, j.item, j.link, sink) } }() } for _, j := range jobs { in <- j } close(in) wg.Wait() return out } // Todos returns one TodoRow per VTODO on every caldav-list link of the // given items. Empty Window = no narrowing (dashboard pattern). Otherwise // rows are filtered: // - open rows: kept when Due is in [From, To). // - done/cancelled rows: kept when LastModified (Due fallback) is in [From, To). // // Per-calendar errors are logged at WARN and the calendar is skipped; the // surviving rows still come back so one bad calendar doesn't blank the // surface. func (a *Aggregator) Todos(ctx context.Context, items []*store.Item, w Window) []TodoRow { if a.CalDAV == nil { return nil } jobs := a.jobsByLink(ctx, items, RefTypeCalDAV, "todos") return runLinkPool[TodoRow](ctx, jobs, func(ctx context.Context, item *store.Item, link *store.ItemLink, sink func(TodoRow)) { todos, err := a.CalDAV.ListTodos(ctx, link.RefID) if err != nil { a.Logger.Warn("aggregate list todos", "calendar", link.RefID, "err", err) return } for _, td := range todos { if !w.IsZero() { open := td.Status != "COMPLETED" && td.Status != "CANCELLED" var anchor *time.Time if open { anchor = td.Due } else if td.LastModified != nil { anchor = td.LastModified } else { anchor = td.Due } if anchor == nil { continue } if !w.contains(startOfDay(anchor.Local())) { continue } } sink(TodoRow{Item: item, CalendarURL: link.RefID, Todo: td}) } }) } // Events returns one EventRow per VEVENT on every caldav-list link of the // given items, within the Window's [From, To) range. The window is // REQUIRED here because the CalDAV REPORT for VEVENTs is time-bounded — // passing Window{} would refuse at the server. func (a *Aggregator) Events(ctx context.Context, items []*store.Item, w Window) []EventRow { if a.CalDAV == nil { return nil } jobs := a.jobsByLink(ctx, items, RefTypeCalDAV, "events") opts := caldav.ListEventsOpts{TimeMin: w.From, TimeMax: w.To} return runLinkPool[EventRow](ctx, jobs, func(ctx context.Context, item *store.Item, link *store.ItemLink, sink func(EventRow)) { events, err := a.CalDAV.ListEvents(ctx, link.RefID, opts) if err != nil { a.Logger.Warn("aggregate list events", "calendar", link.RefID, "err", err) return } for _, ev := range events { sink(EventRow{Item: item, Event: ev}) } }) } // Issues returns every open Gitea issue across the items' gitea-repo // links. Repeat callers benefit from the IssueCache when one is wired in // — same 3-minute TTL pattern web/gitea.go uses today. func (a *Aggregator) Issues(ctx context.Context, items []*store.Item) []IssueRow { if a.Gitea == nil { return nil } jobs := a.jobsByLink(ctx, items, RefTypeGiteaRepo, "issues") return runLinkPool[IssueRow](ctx, jobs, func(ctx context.Context, item *store.Item, link *store.ItemLink, sink func(IssueRow)) { owner, repo := gitea.ParseRepoRef(link.RefID) if owner == "" || repo == "" { return } key := link.RefID + "|open" var open []gitea.Issue if a.IssueCache != nil { if v, ok := a.IssueCache.Get(key); ok { open = v } } if open == nil { var err error open, err = a.Gitea.ListIssues(ctx, owner, repo, gitea.ListOpts{State: "open"}) if err != nil { a.Logger.Warn("aggregate list issues", "repo", link.RefID, "err", err) return } if a.IssueCache != nil { a.IssueCache.Set(key, open) } } for _, iss := range open { sink(IssueRow{Item: item, Repo: link.RefID, Issue: iss}) } }) } // Docs returns every dated item_link within the window whose owning item // is in the caller's allow-list. Wraps store.DatedLinksRange — the heavy // lifting (event_date scan, item join) happens in SQL; aggregate just // filters by the caller's items. func (a *Aggregator) Docs(ctx context.Context, items []*store.Item, w Window) []DocRow { rows, err := a.Store.DatedLinksRange(ctx, w.From, w.To) if err != nil { a.Logger.Warn("aggregate dated links", "err", err) return nil } if len(rows) == 0 { return nil } byID := map[string]*store.Item{} for _, it := range items { byID[it.ID] = it } out := make([]DocRow, 0, len(rows)) for _, r := range rows { it, ok := byID[r.Link.ItemID] if !ok { continue } link := r.Link out = append(out, DocRow{Item: it, Link: &link}) } return out } // Creations returns CreationRow per projax item created in the window, // filtered to items in the caller's allow-list. func (a *Aggregator) Creations(ctx context.Context, items []*store.Item, w Window) []CreationRow { created, err := a.Store.ItemsCreatedInRange(ctx, w.From, w.To) if err != nil { a.Logger.Warn("aggregate created", "err", err) return nil } if len(created) == 0 { return nil } byID := map[string]struct{}{} for _, it := range items { byID[it.ID] = struct{}{} } out := make([]CreationRow, 0, len(created)) for _, it := range created { if _, ok := byID[it.ID]; !ok { continue } out = append(out, CreationRow{Item: it}) } return out } // startOfDay zeroes a time to local midnight. Re-declared here (rather // than imported from web/) so the package stays free of cross-cuts. func startOfDay(t time.Time) time.Time { return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()) }