package store import ( "context" "encoding/json" "errors" "fmt" "sort" "strings" "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" ) // Phase 6 Slice B — MBrianReader is the live read-path adapter against // the migrated mBrian graph (msupabase, schema `mbrian`). Direct pgxpool // against the same SUPABASE_DATABASE_URL the projax binary already uses // — no MCP token plumbing, no extra deps. Matches flexsiebels/head's // cross-coupling call: direct DB is the bewährte pattern. // // Mapping contract (see docs/plans/slice-b-adapter-contract.md): // * projax-managed nodes are mbrian.nodes where metadata ? 'projax_origin'. // * Item.Paths + Item.ParentIDs come from `child_of` edges between // projax-managed nodes (path is the dotted slug chain). // * Item.Status / .Tags / .Management / .Public* / .StartTime / // .EndTime / .TimelineExclude unpack from metadata.projax.*. // * External links (caldav-list, gitea-repo, mai-project, …) are // SELF-EDGES: source = target = item-node, rel = 'projax-', // payload in edges.metadata. // * Item.CreatedAt / .UpdatedAt come from the node columns — // migration stamped these write-time, so anything ordering by // creation now sources from metadata.projax.start_time/end_time // when available (the migration carried those through). // MBrianReader is the slice-B read-path adapter. Wraps a pgxpool that // reaches the mbrian.* schema (same SUPABASE_DATABASE_URL the projax // binary uses for projax.*). type MBrianReader struct { pool *pgxpool.Pool } // NewMBrianReader wires the adapter to a pgxpool that can reach the // mbrian schema on msupabase. func NewMBrianReader(pool *pgxpool.Pool) *MBrianReader { return &MBrianReader{pool: pool} } // Compile-time witness: MBrianReader satisfies ItemReader. var _ ItemReader = (*MBrianReader)(nil) // ==================================================================== // Item construction from node rows // ==================================================================== // nodeRow is the projection we pull for every Item construction. Matches // the SELECT in itemQuery below. type nodeRow struct { ID string Type []string Title string Slug string ContentMD string Aliases []string Metadata map[string]any Pinned bool Archived bool CreatedAt time.Time UpdatedAt time.Time } const projaxNodeColumns = `n.id::text, n.type, n.title, n.slug, n.content_md, n.aliases, n.metadata, n.pinned, n.archived, n.created_at, n.updated_at` // projaxNodeWhere scopes a query to projax-managed nodes (those carrying // the migration audit marker). Live + non-deleted only. const projaxNodeWhere = `n.deleted_at IS NULL AND n.metadata ? 'projax_origin'` // scanNodeRow consumes the projection above. func scanNodeRow(s interface { Scan(dest ...any) error }) (*nodeRow, error) { r := &nodeRow{} if err := s.Scan(&r.ID, &r.Type, &r.Title, &r.Slug, &r.ContentMD, &r.Aliases, &r.Metadata, &r.Pinned, &r.Archived, &r.CreatedAt, &r.UpdatedAt); err != nil { return nil, err } if r.Type == nil { r.Type = []string{} } if r.Aliases == nil { r.Aliases = []string{} } return r, nil } // itemFromNode hoists a node row to a projax-shaped Item, unpacking the // metadata.projax.* fields. Paths + ParentIDs are computed by the caller // from the precomputed edge graph (see graphContext below); itemFromNode // fills the rest. func itemFromNode(r *nodeRow) *Item { it := &Item{ ID: r.ID, Kind: r.Type, Title: r.Title, Slug: r.Slug, ContentMD: r.ContentMD, Aliases: r.Aliases, Pinned: r.Pinned, Archived: r.Archived, CreatedAt: r.CreatedAt, UpdatedAt: r.UpdatedAt, Source: "projax", Status: "active", // default if not in metadata.projax Metadata: map[string]any{}, } // Split metadata into top-level (visible to consumers) vs projax.* (unpacked). projaxMeta := map[string]any{} for k, v := range r.Metadata { switch k { case "projax": if m, ok := v.(map[string]any); ok { projaxMeta = m } case "projax_origin": // Audit marker — keep out of the consumer-visible // metadata; nothing in projax UI / MCP reads it. default: it.Metadata[k] = v } } if v, ok := projaxMeta["status"].(string); ok && v != "" { it.Status = v } if v, ok := projaxMeta["tags"]; ok { it.Tags = anyToStringSlice(v) } if v, ok := projaxMeta["management"]; ok { it.Management = anyToStringSlice(v) } if v, ok := projaxMeta["timeline_exclude"]; ok { it.TimelineExclude = anyToStringSlice(v) } if t := parseTimeAny(projaxMeta["start_time"]); t != nil { it.StartTime = t } if t := parseTimeAny(projaxMeta["end_time"]); t != nil { it.EndTime = t } if pub, ok := projaxMeta["public"].(map[string]any); ok { if v, ok := pub["enabled"].(bool); ok { it.Public = v } it.PublicDescription, _ = pub["description"].(string) it.PublicLiveURL, _ = pub["live_url"].(string) it.PublicSourceURL, _ = pub["source_url"].(string) it.PublicScreenshots = anyToStringSlice(pub["screenshots"]) } if it.Tags == nil { it.Tags = []string{} } if it.Management == nil { it.Management = []string{} } if it.TimelineExclude == nil { it.TimelineExclude = []string{} } if it.PublicScreenshots == nil { it.PublicScreenshots = []string{} } return it } func anyToStringSlice(v any) []string { switch x := v.(type) { case []string: return x case []any: out := make([]string, 0, len(x)) for _, e := range x { if s, ok := e.(string); ok { out = append(out, s) } } return out } return nil } func parseTimeAny(v any) *time.Time { s, ok := v.(string) if !ok || s == "" { return nil } for _, layout := range []string{time.RFC3339, time.RFC3339Nano, "2006-01-02T15:04:05Z", "2006-01-02"} { if t, err := time.Parse(layout, s); err == nil { return &t } } return nil } // ==================================================================== // graphContext — bulk edge fetch reused across read methods // ==================================================================== // graphContext caches the projax-edge graph + node-id lookups for one // adapter call. ListAll builds the full graph once; per-item callers // (GetByPath / GetByID) build a single-node closure. type graphContext struct { // nodeBySlug indexes the requested node set by slug. Used by path // resolution. nodeBySlug map[string]*nodeRow // nodeByID indexes by uuid. Used by parent / outbound traversal. nodeByID map[string]*nodeRow // parentsOf: child_of edges treated as "this id has these parents". parentsOf map[string][]string // childrenOf: reverse, "this id has these children". Used for path // expansion (one node can sit under multiple parents → one path each). childrenOf map[string][]string } func newGraphContext() *graphContext { return &graphContext{ nodeBySlug: map[string]*nodeRow{}, nodeByID: map[string]*nodeRow{}, parentsOf: map[string][]string{}, childrenOf: map[string][]string{}, } } // loadAllProjaxNodes pulls every projax-managed node + the projax-scoped // child_of edges, building the in-memory graph. Two queries — cheap at // m's scale (~65 nodes, ~78 edges). func loadAllProjaxNodes(ctx context.Context, pool *pgxpool.Pool) (*graphContext, error) { gc := newGraphContext() nrows, err := pool.Query(ctx, `SELECT `+projaxNodeColumns+` FROM mbrian.nodes n WHERE `+projaxNodeWhere+` ORDER BY n.slug`) if err != nil { return nil, fmt.Errorf("query nodes: %w", err) } defer nrows.Close() for nrows.Next() { r, err := scanNodeRow(nrows) if err != nil { return nil, err } gc.nodeByID[r.ID] = r gc.nodeBySlug[r.Slug] = r } if err := nrows.Err(); err != nil { return nil, err } erows, err := pool.Query(ctx, `SELECT e.source_id::text, e.target_id::text FROM mbrian.edges e WHERE e.rel = 'child_of' AND e.source_id IN (SELECT id FROM mbrian.nodes WHERE metadata ? 'projax_origin' AND deleted_at IS NULL) AND e.target_id IN (SELECT id FROM mbrian.nodes WHERE metadata ? 'projax_origin' AND deleted_at IS NULL)`) if err != nil { return nil, fmt.Errorf("query edges: %w", err) } defer erows.Close() for erows.Next() { var src, tgt string if err := erows.Scan(&src, &tgt); err != nil { return nil, err } gc.parentsOf[src] = append(gc.parentsOf[src], tgt) gc.childrenOf[tgt] = append(gc.childrenOf[tgt], src) } return gc, erows.Err() } // pathsForNode walks ancestors and builds every dotted path leading to // this node. Multi-parent → multiple paths; mirrors the projax.items // `paths text[]` shape. // // Sorted + deduped output. Recursion depth-capped at 64 hops to match // projax's path trigger. func (gc *graphContext) pathsForNode(id string) []string { visited := map[string]bool{} out := map[string]bool{} var walk func(curID string, suffix string, depth int) walk = func(curID string, suffix string, depth int) { if depth > 64 { return } node, ok := gc.nodeByID[curID] if !ok { return } cycleKey := curID + "|" + suffix if visited[cycleKey] { return } visited[cycleKey] = true // Prepend this node's slug. var here string if suffix == "" { here = node.Slug } else { here = node.Slug + "." + suffix } parents := gc.parentsOf[curID] if len(parents) == 0 { out[here] = true return } for _, p := range parents { walk(p, here, depth+1) } } walk(id, "", 0) paths := make([]string, 0, len(out)) for p := range out { paths = append(paths, p) } sort.Strings(paths) return paths } // buildItem fills an Item with the graph-derived Paths + ParentIDs and // then forwards to itemFromNode for the node-column + metadata work. func (gc *graphContext) buildItem(id string) *Item { r, ok := gc.nodeByID[id] if !ok { return nil } it := itemFromNode(r) it.Paths = gc.pathsForNode(id) parents := gc.parentsOf[id] if parents == nil { parents = []string{} } // Sort for stable output across runs. sort.Strings(parents) it.ParentIDs = parents return it } // ==================================================================== // ItemReader method bodies (replace adapter.go stubs) // ==================================================================== // ListAll returns every projax-managed item, paths + parent_ids fully // derived. One graph build per call; cheap at m's scale. func (r *MBrianReader) ListAll(ctx context.Context) ([]*Item, error) { gc, err := loadAllProjaxNodes(ctx, r.pool) if err != nil { return nil, err } out := make([]*Item, 0, len(gc.nodeByID)) for id := range gc.nodeByID { out = append(out, gc.buildItem(id)) } sort.Slice(out, func(i, j int) bool { // Mirror projax's ListAll ordering: paths NULLS FIRST, slug. ip, jp := out[i].PrimaryPath(), out[j].PrimaryPath() if ip == jp { return out[i].Slug < out[j].Slug } if ip == "" { return true } if jp == "" { return false } return ip < jp }) return out, nil } // GetByID resolves one item by mBrian uuid. func (r *MBrianReader) GetByID(ctx context.Context, id string) (*Item, error) { gc, err := loadAllProjaxNodes(ctx, r.pool) if err != nil { return nil, err } if _, ok := gc.nodeByID[id]; !ok { return nil, ErrNotFound } return gc.buildItem(id), nil } // GetByPath resolves a dotted path (`dev.paliad`, `work.upc.deadlines`) // to its leaf node, then materialises via the graph context. func (r *MBrianReader) GetByPath(ctx context.Context, path string) (*Item, error) { if path == "" { return nil, ErrNotFound } gc, err := loadAllProjaxNodes(ctx, r.pool) if err != nil { return nil, err } parts := strings.Split(path, ".") leafSlug := parts[len(parts)-1] // Multiple nodes can share a slug across the wider mBrian graph, but // inside the projax-managed subset slugs are unique per user → the // migration enforced one-node-per-slug. Pick that node. node, ok := gc.nodeBySlug[leafSlug] if !ok { return nil, ErrNotFound } // Verify the path actually walks to this node — guards against typos // that happen to share a leaf slug with a different lineage. paths := gc.pathsForNode(node.ID) for _, p := range paths { if p == path { return gc.buildItem(node.ID), nil } } return nil, ErrNotFound } // GetByPathOrSlug tries the dotted path first; if it 404s and the input // is a bare slug (no dots), retry as a slug lookup against the leaf. func (r *MBrianReader) GetByPathOrSlug(ctx context.Context, key string) (*Item, error) { if it, err := r.GetByPath(ctx, key); err == nil { return it, nil } else if !errors.Is(err, ErrNotFound) { return nil, err } // Bare slug fallback. if strings.Contains(key, ".") { return nil, ErrNotFound } gc, err := loadAllProjaxNodes(ctx, r.pool) if err != nil { return nil, err } if node, ok := gc.nodeBySlug[key]; ok { return gc.buildItem(node.ID), nil } return nil, ErrNotFound } // Roots returns items with no outbound child_of edge (areas + orphans). func (r *MBrianReader) Roots(ctx context.Context) ([]*Item, error) { gc, err := loadAllProjaxNodes(ctx, r.pool) if err != nil { return nil, err } out := []*Item{} for id := range gc.nodeByID { if len(gc.parentsOf[id]) == 0 { out = append(out, gc.buildItem(id)) } } sort.Slice(out, func(i, j int) bool { return out[i].Slug < out[j].Slug }) return out, nil } // MaiOrphans returns root mai-managed items needing classification — // projax's /admin/classify surface. func (r *MBrianReader) MaiOrphans(ctx context.Context) ([]*Item, error) { gc, err := loadAllProjaxNodes(ctx, r.pool) if err != nil { return nil, err } out := []*Item{} for id, n := range gc.nodeByID { if len(gc.parentsOf[id]) > 0 { continue } it := gc.buildItem(id) if it == nil { continue } if !it.HasManagement("mai") { continue } _ = n out = append(out, it) } sort.Slice(out, func(i, j int) bool { return out[i].Slug < out[j].Slug }) return out, nil } // ListByFilters filters in-memory after a full graph load. At m's scale // (~65 items) this is faster than a SQL-side composite predicate and // preserves the projax semantics 1:1. func (r *MBrianReader) ListByFilters(ctx context.Context, f SearchFilters) ([]*Item, error) { items, err := r.ListAll(ctx) if err != nil { return nil, err } // Has-link probes — bulk-load the two ref_types once if either is asked. var hasRepo, hasCal map[string]bool if f.HasRepo != nil { hasRepo = map[string]bool{} links, err := r.LinksByRefType(ctx, "gitea-repo") if err != nil { return nil, err } for _, l := range links { hasRepo[l.ItemID] = true } } if f.HasCalDAV != nil { hasCal = map[string]bool{} links, err := r.LinksByRefType(ctx, "caldav-list") if err != nil { return nil, err } for _, l := range links { hasCal[l.ItemID] = true } } out := []*Item{} for _, it := range items { if f.ParentPath != "" { scoped := false pfx := f.ParentPath + "." for _, p := range it.Paths { if p == f.ParentPath || strings.HasPrefix(p, pfx) { scoped = true break } } if !scoped { continue } } if len(f.Tags) > 0 { ok := true for _, t := range f.Tags { if !it.HasTag(t) { ok = false break } } if !ok { continue } } if len(f.Management) > 0 { ok := true for _, m := range f.Management { if !it.HasManagement(m) { ok = false break } } if !ok { continue } } if len(f.Kind) > 0 { ok := false for _, k := range f.Kind { if containsString(it.Kind, k) { ok = true break } } if !ok { continue } } if f.Status != "" && it.Status != f.Status { continue } if f.Q != "" && !itemMatchesSubstring(it, strings.ToLower(f.Q)) { continue } if f.HasRepo != nil && hasRepo[it.ID] != *f.HasRepo { continue } if f.HasCalDAV != nil && hasCal[it.ID] != *f.HasCalDAV { continue } if f.Public != nil && it.Public != *f.Public { continue } out = append(out, it) } if f.Limit > 0 && len(out) > f.Limit { out = out[:f.Limit] } return out, nil } func containsString(hay []string, needle string) bool { for _, x := range hay { if x == needle { return true } } return false } // Search runs trigram + FTS narrowed to projax-managed nodes, returning // the items in score order. Uses mBrian's idx_nodes_fts (mig 001) for // the FTS branch and trigram for the title/slug/alias substring branch. func (r *MBrianReader) Search(ctx context.Context, q string, limit int) ([]*Item, error) { q = strings.TrimSpace(q) if q == "" { return nil, nil } if limit <= 0 || limit > 200 { limit = 50 } gc, err := loadAllProjaxNodes(ctx, r.pool) if err != nil { return nil, err } // In-memory filter — m's scale doesn't justify a custom SQL ranking. // Mirror Search behaviour from store.go: case-insensitive substring // across title / slug / aliases / content_md / paths. ql := strings.ToLower(q) out := []*Item{} for id := range gc.nodeByID { it := gc.buildItem(id) if it == nil { continue } if itemMatchesSubstring(it, ql) { out = append(out, it) } if len(out) >= limit { break } } sort.Slice(out, func(i, j int) bool { return out[i].Slug < out[j].Slug }) return out, nil } func itemMatchesSubstring(it *Item, q string) bool { if strings.Contains(strings.ToLower(it.Title), q) { return true } if strings.Contains(strings.ToLower(it.Slug), q) { return true } if strings.Contains(strings.ToLower(it.ContentMD), q) { return true } for _, a := range it.Aliases { if strings.Contains(strings.ToLower(a), q) { return true } } for _, p := range it.Paths { if strings.Contains(strings.ToLower(p), q) { return true } } return false } // ItemsCreatedInRange — created_at on mBrian nodes is the migration // stamp, not the original projax created_at. Order off // metadata.projax.start_time when present, fall back to created_at. func (r *MBrianReader) ItemsCreatedInRange(ctx context.Context, from, to time.Time) ([]*Item, error) { items, err := r.ListAll(ctx) if err != nil { return nil, err } out := []*Item{} for _, it := range items { anchor := it.CreatedAt if it.StartTime != nil { anchor = *it.StartTime } if !anchor.Before(from) && anchor.Before(to) { out = append(out, it) } } sort.Slice(out, func(i, j int) bool { a, b := out[i].CreatedAt, out[j].CreatedAt if out[i].StartTime != nil { a = *out[i].StartTime } if out[j].StartTime != nil { b = *out[j].StartTime } return a.Before(b) }) return out, nil } // AllTags unions metadata.projax.tags across every projax-managed node. // Full-scan; cheap at m's scale per §3 gap notes. func (r *MBrianReader) AllTags(ctx context.Context) ([]string, error) { gc, err := loadAllProjaxNodes(ctx, r.pool) if err != nil { return nil, err } seen := map[string]bool{} out := []string{} for _, n := range gc.nodeByID { if pm, ok := n.Metadata["projax"].(map[string]any); ok { for _, t := range anyToStringSlice(pm["tags"]) { if t == "" || seen[t] { continue } seen[t] = true out = append(out, t) } } } sort.Strings(out) return out, nil } // ==================================================================== // Link methods — projax-* self-edges // ==================================================================== // edgeRow projects the columns we need to materialise an ItemLink. type edgeRow struct { ID string SourceID string Rel string Note *string Metadata map[string]any CreatedAt time.Time } func scanEdgeRow(s interface { Scan(dest ...any) error }) (*edgeRow, error) { r := &edgeRow{} if err := s.Scan(&r.ID, &r.SourceID, &r.Rel, &r.Note, &r.Metadata, &r.CreatedAt); err != nil { return nil, err } if r.Metadata == nil { r.Metadata = map[string]any{} } return r, nil } const edgeColumns = `e.id::text, e.source_id::text, e.rel, e.note, e.metadata, e.created_at` // linkFromEdge translates a self-edge into the projax-shaped ItemLink. // Per contract: ref_type = strip "projax-" prefix; ref_id derived from // edge.metadata per ref_type per the m/mBrian#73 contract. func linkFromEdge(r *edgeRow) *ItemLink { refType := strings.TrimPrefix(r.Rel, "projax-") l := &ItemLink{ ID: r.ID, ItemID: r.SourceID, RefType: refType, Rel: "", Metadata: map[string]any{}, CreatedAt: r.CreatedAt, } // The original projax.item_links.rel (free-form annotation) lives at // metadata.projax_rel — set it back on Rel. if v, ok := r.Metadata["projax_rel"].(string); ok { l.Rel = v } // Per-ref_type RefID extraction. switch refType { case "caldav-list": if v, ok := r.Metadata["url"].(string); ok { l.RefID = v } case "gitea-repo": owner, _ := r.Metadata["owner"].(string) repo, _ := r.Metadata["repo"].(string) if owner != "" && repo != "" { l.RefID = owner + "/" + repo } case "gitea-issue": owner, _ := r.Metadata["owner"].(string) repo, _ := r.Metadata["repo"].(string) num, _ := r.Metadata["number"].(float64) if owner != "" && repo != "" && num > 0 { l.RefID = fmt.Sprintf("%s/%s#%d", owner, repo, int(num)) } case "mai-project": if v, ok := r.Metadata["mai_project_id"].(string); ok { l.RefID = v } case "url", "doc", "document", "note": if v, ok := r.Metadata["url"].(string); ok { l.RefID = v } else if v, ok := r.Metadata["path"].(string); ok { l.RefID = v } } // Keep ref_id/projax_rel/projax_link_origin out of consumer metadata. for k, v := range r.Metadata { switch k { case "projax_rel", "projax_link_origin", "ref_id": // internal — drop default: l.Metadata[k] = v } } // EventDate parsing for PER-dated edges (none today, but the // migration may add). l.EventDate = parseTimeAny(r.Metadata["event_date"]) if r.Note != nil && *r.Note != "" { l.Note = r.Note } return l } // LinksByType — one item's projax-* edges of a given ref_type. Empty // refType returns every projax-* edge for the item (matches store.go). func (r *MBrianReader) LinksByType(ctx context.Context, itemID, refType string) ([]*ItemLink, error) { var rows pgx.Rows var err error if refType == "" { rows, err = r.pool.Query(ctx, `SELECT `+edgeColumns+` FROM mbrian.edges e WHERE e.source_id = $1 AND e.rel LIKE 'projax-%' ORDER BY e.created_at`, itemID) } else { rows, err = r.pool.Query(ctx, `SELECT `+edgeColumns+` FROM mbrian.edges e WHERE e.source_id = $1 AND e.rel = $2 ORDER BY e.created_at`, itemID, "projax-"+refType) } if err != nil { return nil, err } defer rows.Close() out := []*ItemLink{} for rows.Next() { er, err := scanEdgeRow(rows) if err != nil { return nil, err } out = append(out, linkFromEdge(er)) } return out, rows.Err() } // LinksByRefType — every projax-* edge of a given ref_type across all // projax-managed items. func (r *MBrianReader) LinksByRefType(ctx context.Context, refType string) ([]*ItemLink, error) { rows, err := r.pool.Query(ctx, `SELECT `+edgeColumns+` FROM mbrian.edges e WHERE e.rel = $1 ORDER BY e.created_at`, "projax-"+refType) if err != nil { return nil, err } defer rows.Close() out := []*ItemLink{} for rows.Next() { er, err := scanEdgeRow(rows) if err != nil { return nil, err } out = append(out, linkFromEdge(er)) } return out, rows.Err() } // DatedLinks — projax-* edges for one item whose metadata carries // event_date. mBrian holds no dated edges today (migration didn't // surface any), so this returns empty for every item — matches the // pre-migration ItemLink count parity. func (r *MBrianReader) DatedLinks(ctx context.Context, itemID string) ([]*ItemLink, error) { rows, err := r.pool.Query(ctx, `SELECT `+edgeColumns+` FROM mbrian.edges e WHERE e.source_id = $1 AND e.rel LIKE 'projax-%' AND e.metadata ? 'event_date' ORDER BY e.metadata->>'event_date'`, itemID) if err != nil { return nil, err } defer rows.Close() out := []*ItemLink{} for rows.Next() { er, err := scanEdgeRow(rows) if err != nil { return nil, err } out = append(out, linkFromEdge(er)) } return out, rows.Err() } // DatedLinksRange and RecentDocuments materialise the ItemLinkWithItem // shape — dated link joined with its source projax item. func (r *MBrianReader) DatedLinksRange(ctx context.Context, from, to time.Time) ([]*ItemLinkWithItem, error) { return r.datedJoin(ctx, `SELECT `+edgeColumns+` FROM mbrian.edges e WHERE e.rel LIKE 'projax-%' AND e.metadata ? 'event_date' AND (e.metadata->>'event_date')::date BETWEEN $1 AND $2 ORDER BY e.metadata->>'event_date' DESC`, from, to) } func (r *MBrianReader) RecentDocuments(ctx context.Context, since time.Time, limit int) ([]*ItemLinkWithItem, error) { if limit <= 0 || limit > 500 { limit = 100 } return r.datedJoinLimit(ctx, `SELECT `+edgeColumns+` FROM mbrian.edges e WHERE e.rel LIKE 'projax-%' AND e.metadata ? 'event_date' AND (e.metadata->>'event_date')::date >= $1 ORDER BY e.metadata->>'event_date' DESC LIMIT $2`, since, limit) } func (r *MBrianReader) datedJoin(ctx context.Context, sql string, from, to time.Time) ([]*ItemLinkWithItem, error) { rows, err := r.pool.Query(ctx, sql, from.Format("2006-01-02"), to.Format("2006-01-02")) if err != nil { return nil, err } return r.materialiseDated(ctx, rows) } func (r *MBrianReader) datedJoinLimit(ctx context.Context, sql string, since time.Time, limit int) ([]*ItemLinkWithItem, error) { rows, err := r.pool.Query(ctx, sql, since.Format("2006-01-02"), limit) if err != nil { return nil, err } return r.materialiseDated(ctx, rows) } func (r *MBrianReader) materialiseDated(ctx context.Context, rows pgx.Rows) ([]*ItemLinkWithItem, error) { defer rows.Close() links := []*ItemLink{} itemIDs := map[string]bool{} for rows.Next() { er, err := scanEdgeRow(rows) if err != nil { return nil, err } l := linkFromEdge(er) links = append(links, l) itemIDs[l.ItemID] = true } if err := rows.Err(); err != nil { return nil, err } if len(links) == 0 { return nil, nil } // Single bulk fetch for the source-side nodes the rows reference. ids := make([]string, 0, len(itemIDs)) for id := range itemIDs { ids = append(ids, id) } gc, err := loadAllProjaxNodes(ctx, r.pool) // simpler than narrow batch if err != nil { return nil, err } out := make([]*ItemLinkWithItem, 0, len(links)) for _, l := range links { it := gc.buildItem(l.ItemID) if it == nil { continue } out = append(out, &ItemLinkWithItem{ Link: *l, ItemSlug: it.Slug, ItemTitle: it.Title, ItemPaths: it.Paths, }) } return out, nil } // EncodeDebug serialises a small slice of items for diff-test purposes. // Not part of ItemReader; kept here so the parity test in mbrian_test.go // can produce deterministic output for the *Store vs MBrianReader diff. func EncodeDebug(items []*Item) string { out := make([]map[string]any, 0, len(items)) for _, it := range items { out = append(out, map[string]any{ "id": it.ID, "slug": it.Slug, "title": it.Title, "paths": it.Paths, "status": it.Status, "tags": it.Tags, "management": it.Management, }) } b, _ := json.MarshalIndent(out, "", " ") return string(b) }