paliad/internal/services/broadcast_service.go
m 52ee319fd8 feat(t-paliad-147): bulk team email — send to filtered selection from /team page
Implements issue #7. Adds an "E-Mail an Auswahl" button on /team that sends
personalised emails to a filter-narrowed subset of the team. Each recipient
gets their own envelope (per-recipient privacy, no shared To: list); From
stays on the SMTP infrastructure address with Reply-To set to the human
sender so replies route correctly without forging DKIM/SPF.
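
The From/Reply-To split described above can be sketched with the standard library alone. This is a minimal illustration, not the project's buildMIMEWithReplyTo helper: `buildHeaders`, `headerValue`, and the example.org addresses are hypothetical, and the subject is kept ASCII so no header encoding is needed.

```go
package main

import (
	"fmt"
	"net/mail"
	"strings"
)

// buildHeaders (hypothetical) writes the header block for one recipient.
// From carries the infrastructure address, Reply-To the human sender, and
// To holds exactly one address so recipients never see each other.
func buildHeaders(infraName, infraAddr, senderAddr, rcptAddr, subject string) string {
	from := mail.Address{Name: infraName, Address: infraAddr}
	var b strings.Builder
	fmt.Fprintf(&b, "From: %s\r\n", from.String())
	fmt.Fprintf(&b, "Reply-To: <%s>\r\n", senderAddr)
	fmt.Fprintf(&b, "To: <%s>\r\n", rcptAddr)
	fmt.Fprintf(&b, "Subject: %s\r\n", subject) // ASCII only in this sketch
	b.WriteString("\r\n")                        // blank line ends the headers
	return b.String()
}

// headerValue (hypothetical) parses the block back and returns one header,
// or the empty string if the block does not parse.
func headerValue(raw, key string) string {
	msg, err := mail.ReadMessage(strings.NewReader(raw))
	if err != nil {
		return ""
	}
	return msg.Header.Get(key)
}

func main() {
	raw := buildHeaders("Paliad", "noreply@example.org",
		"lead@example.org", "member@example.org", "Status update")
	for _, key := range []string{"From", "Reply-To", "To"} {
		fmt.Printf("%s: %s\n", key, headerValue(raw, key))
	}
}
```

Because From never changes, the sending domain's DKIM/SPF alignment is unaffected; only Reply-To varies with the human sender.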

Backend
- Migration 057: paliad.email_broadcasts (subject, body, sender_id,
  template_key, recipient_filter jsonb, recipient_user_ids uuid[],
  send_report jsonb, sent_at). RLS: senders read own rows, global_admin
  reads all; inserts must self-attribute. No CHECK-constraint extension to
  partner_unit_events — broadcasts get their own table per the lock.
- BroadcastService (internal/services/broadcast_service.go): validates
  subject/body/recipient cap (100), enforces project_lead-OR-global_admin,
  persists audit row, dispatches via 5-deep goroutine pool with 15s
  per-send timeout. Send report (sent/failed counts + per-recipient errors)
  is captured back into email_broadcasts.send_report.
- markdown.go: minimal Markdown→safe HTML renderer (paragraphs, **bold**,
  *italic*, `code`, [text](url), bullet lists). Inputs are HTML-escaped
  first; only whitelisted tags re-emitted. Script tags and javascript:
  URLs can't slip through.
- Placeholder substitution: {{name}}, {{first_name}},
  {{role_on_project}} (whitespace tolerated). Unknown {{...}} tokens pass
  through unchanged.
- mail_service.go: buildMIMEWithReplyTo helper layers a Reply-To header
  on top of the existing multipart/alternative envelope.
- TeamService.ListMembershipsIndex: visibility-gated user→project_ids
  index. Powers the /team project multi-select filter without N round
  trips per project.
- Handlers: POST /api/team/broadcast (gateOnboarded; service enforces
  authority), GET /api/team/memberships, GET /api/admin/broadcasts (list),
  GET /api/admin/broadcasts/{id} (detail), GET /admin/broadcasts (page).
  /admin/broadcasts is gateOnboarded (not adminGate) so leads can see
  their own sends; the service applies the per-row visibility filter.
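
The dispatch behaviour in the BroadcastService bullet (bounded concurrency, per-send timeout, failures recorded without aborting the batch) can be sketched as follows. `sendFunc`, `fakeSend`, `demo`, and the addresses are hypothetical stand-ins for the real SMTP path; the pool size and timeout are shrunk so the example finishes quickly.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// sendFunc is a hypothetical stand-in for the per-recipient SMTP call.
type sendFunc func(ctx context.Context, addr string) error

// report mirrors the sent/failed counts plus per-recipient errors.
type report struct {
	Sent, Failed int
	Errors       map[string]string
}

// dispatch runs send for every address with at most `workers` in flight
// and a per-send timeout. A failure is recorded and the batch continues.
func dispatch(ctx context.Context, addrs []string, workers int, timeout time.Duration, send sendFunc) report {
	type result struct {
		addr string
		err  error
	}
	results := make(chan result, len(addrs)) // buffered: writers never block
	sem := make(chan struct{}, workers)      // counting semaphore
	var wg sync.WaitGroup
	for _, a := range addrs {
		wg.Add(1)
		go func(addr string) {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()
			sendCtx, cancel := context.WithTimeout(ctx, timeout)
			defer cancel()
			results <- result{addr: addr, err: send(sendCtx, addr)}
		}(a)
	}
	wg.Wait()
	close(results)

	rep := report{Errors: map[string]string{}}
	for r := range results {
		if r.err != nil {
			rep.Failed++
			rep.Errors[r.addr] = r.err.Error()
			continue
		}
		rep.Sent++
	}
	return rep
}

// fakeSend rejects one address and hangs until the timeout for another.
func fakeSend(ctx context.Context, addr string) error {
	switch addr {
	case "bad@example.org":
		return errors.New("550 mailbox unavailable")
	case "slow@example.org":
		<-ctx.Done() // simulate a hung connection
		return ctx.Err()
	}
	return nil
}

var demoAddrs = []string{"a@example.org", "bad@example.org", "slow@example.org", "b@example.org"}

// demo runs the sketch with two workers and a 50ms per-send timeout.
func demo() report {
	return dispatch(context.Background(), demoAddrs, 2, 50*time.Millisecond, fakeSend)
}

func main() {
	rep := demo()
	fmt.Printf("sent=%d failed=%d\n", rep.Sent, rep.Failed)
	for _, addr := range demoAddrs {
		if msg, ok := rep.Errors[addr]; ok {
			fmt.Printf("  %s: %s\n", addr, msg)
		}
	}
}
```

The semaphore is acquired inside each goroutine, so one goroutine is started per recipient but only `workers` of them hold a connection at a time. With a cap of 100 recipients that overhead is small.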

Frontend
- /team gains a project multi-select chip dropdown (visible projects
  loaded from /api/projects, intersected against the memberships index)
  alongside the existing office and role filters.
- "E-Mail an Auswahl (N)" button appears only when canBroadcast() is
  true (global_admin always; non-admin needs lead-ship on selected
  projects, or at least one project when no filter is set). Server still
  re-checks per send.
- Compose modal (broadcast.ts): subject + body textarea + optional
  template dropdown (loads existing email templates and strips Go-template
  directives) + recipient preview (first 5 + expand) + send. Hard-blocks
  empty subject/body and N=0. Shows per-send report on success.
- /admin/broadcasts viewer: read-only list with click-row-to-expand
  detail (subject, body, recipient list, send_report counts).

Tests
- broadcast_service_test.go: placeholder substitution table-driven,
  Markdown safe-render incl. XSS guards (<script>, javascript: URLs),
  validation cases (empty subject/body, recipient cap, invalid email),
  signature rendering DE/EN.
- broadcast_service_live_test.go: end-to-end Send + List + Get + visibility
  rules (lead can send on own project, member cannot, admin sees all,
  member can't read lead's row). Skips when TEST_DATABASE_URL is unset.
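
A table-driven check in the spirit of the placeholder tests might look like the sketch below. `recipient` and `substitute` are simplified stand-ins that mirror the strings.NewReplacer approach in broadcast_service.go; the sample names are made up. The last case shows that only the compact and single-space forms are matched.

```go
package main

import (
	"fmt"
	"strings"
)

// recipient is a trimmed-down stand-in for BroadcastRecipient.
type recipient struct {
	DisplayName, FirstName, RoleOnProject string
}

// substitute replaces the three known placeholders, each in its compact
// and single-space spelling. Anything else is left as typed.
func substitute(src string, rec recipient) string {
	return strings.NewReplacer(
		"{{name}}", rec.DisplayName,
		"{{ name }}", rec.DisplayName,
		"{{first_name}}", rec.FirstName,
		"{{ first_name }}", rec.FirstName,
		"{{role_on_project}}", rec.RoleOnProject,
		"{{ role_on_project }}", rec.RoleOnProject,
	).Replace(src)
}

func main() {
	rec := recipient{DisplayName: "Ada Lovelace", FirstName: "Ada", RoleOnProject: "lead"}
	cases := []struct{ name, in, want string }{
		{"compact", "Hallo {{first_name}}", "Hallo Ada"},
		{"single space", "Hallo {{ name }}", "Hallo Ada Lovelace"},
		{"role", "Rolle: {{role_on_project}}", "Rolle: lead"},
		{"unknown token", "literal {{example}}", "literal {{example}}"},
		{"uneven spacing", "Hallo {{name }}", "Hallo {{name }}"},
	}
	for _, tc := range cases {
		got := substitute(tc.in, rec)
		status := "ok"
		if got != tc.want {
			status = "FAIL"
		}
		fmt.Printf("%-4s %-14s %q\n", status, tc.name, got)
	}
}
```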

i18n: 60 new keys × 2 langs (broadcast modal labels, error messages,
recipient summary, /admin/broadcasts viewer, common.close/loading/forbidden/
load_error).
2026-05-07 20:58:57 +02:00


// Package services — BroadcastService — bulk team-email send.
//
// Backs the /team page "E-Mail an Auswahl" flow (t-paliad-147 / issue #7).
// Each call:
//
//  1. Validates the sender's authority (project lead OR global_admin)
//     and the recipient cap.
//  2. Renders the per-recipient body (Markdown → HTML, with
//     {{name}} / {{first_name}} / {{role_on_project}} placeholder
//     substitution) inside the standard email base wrapper.
//  3. Dispatches through MailService with Reply-To set to the
//     sender's address. From: stays on the SMTP infra address so
//     DKIM/SPF still hold, and replies route back to the human.
//  4. Persists a paliad.email_broadcasts row capturing subject,
//     body, sender, filter snapshot, and per-recipient send report
//     (the row is inserted before dispatch; the report is filled in
//     afterwards).
//
// Per-recipient privacy: each recipient gets their own envelope. We
// never put more than one address on the To: header. Recipients can't
// see each other.
//
// Concurrency: a fixed 5-deep goroutine pool dispatches sends with a
// per-send timeout. SMTP failures are logged into the report and the
// batch continues — one bad address never blocks the rest.
package services
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/mail"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"mgit.msbls.de/m/paliad/internal/branding"
"mgit.msbls.de/m/paliad/internal/models"
)
// BroadcastRecipientCap is the soft maximum number of recipients per
// broadcast. m-locked at 100 (2026-05-07) — admin-tweakable later if
// HLC's regular use case grows.
const BroadcastRecipientCap = 100
// BroadcastSendConcurrency caps the number of in-flight SMTP
// connections during a single broadcast. Five is generous enough to
// finish a 100-recipient batch in a few seconds while leaving headroom
// for the reminder job's own SMTP usage.
const BroadcastSendConcurrency = 5
// BroadcastSendTimeout bounds a single per-recipient SMTP delivery.
// Hostinger's submission endpoint typically returns within a second;
// 15s gives plenty of slack for transient slowness without holding the
// HTTP request open indefinitely.
const BroadcastSendTimeout = 15 * time.Second
// Sentinel errors. Handlers map these to HTTP status codes.
var (
ErrBroadcastForbidden = errors.New("broadcast: caller is neither project lead nor global_admin")
ErrBroadcastNoRecipients = errors.New("broadcast: empty recipient list")
ErrBroadcastTooManyRecipients = errors.New("broadcast: recipient cap exceeded")
ErrBroadcastEmptySubject = errors.New("broadcast: empty subject")
ErrBroadcastEmptyBody = errors.New("broadcast: empty body")
ErrBroadcastInvalidEmail = errors.New("broadcast: invalid recipient email")
)
// BroadcastService wires the bulk-send flow.
type BroadcastService struct {
db *sqlx.DB
mail *MailService
users *UserService
team *TeamService
templates *EmailTemplateService
// clock isolates time.Now for tests.
clock func() time.Time
}
// NewBroadcastService wires the service. mail/users/team/templates
// must all be non-nil — the service is only constructed in the DB-backed
// path.
func NewBroadcastService(db *sqlx.DB, mail *MailService, users *UserService, team *TeamService, templates *EmailTemplateService) *BroadcastService {
return &BroadcastService{
db: db,
mail: mail,
users: users,
team: team,
templates: templates,
clock: time.Now,
}
}
// BroadcastRecipient is one row in the resolved addressee list.
// DisplayName, FirstName, and RoleOnProject supply the per-recipient
// values for the {{name}}, {{first_name}}, and {{role_on_project}}
// placeholders.
type BroadcastRecipient struct {
UserID uuid.UUID
Email string
DisplayName string
FirstName string
RoleOnProject string
}
// BroadcastInput is what a handler hands to Send.
type BroadcastInput struct {
// ProjectID identifies the project the broadcast is scoped to. The
// caller must be a 'lead' on this project (or a global_admin) for
// the send to proceed. nil/zero means "no specific project" —
// only global_admin may send in that case.
ProjectID *uuid.UUID
Subject string
// Body is the Markdown source the sender typed. Per-recipient
// placeholders ({{name}}, {{first_name}}, {{role_on_project}})
// are substituted before Markdown rendering.
Body string
// TemplateKey is optional — when set, the broadcast is recorded as
// having started from a template, but Subject/Body are still the
// authoritative source (we don't re-fetch from the template at
// send time).
TemplateKey string
// RecipientFilter is the snapshot of filter chips the sender had
// selected. Persisted into email_broadcasts.recipient_filter for
// future audit.
RecipientFilter map[string]any
Recipients []BroadcastRecipient
// Lang controls the wrapper template language. Defaults to "de".
Lang string
}
// BroadcastReport summarises a send.
type BroadcastReport struct {
BroadcastID uuid.UUID `json:"broadcast_id"`
Total int `json:"total"`
Sent int `json:"sent"`
Failed int `json:"failed"`
Errors map[string]string `json:"errors,omitempty"` // user_id → error
SentAt time.Time `json:"sent_at"`
}
// Send dispatches a broadcast and returns a per-send report that
// carries the persisted broadcast ID. The full pipeline runs even when
// MailService is disabled: the audit row still lands, so deploys
// without SMTP can be exercised.
func (s *BroadcastService) Send(ctx context.Context, callerID uuid.UUID, in BroadcastInput) (*BroadcastReport, error) {
// --- Validation (cheap checks first) ----------------------------
subject := strings.TrimSpace(in.Subject)
if subject == "" {
return nil, ErrBroadcastEmptySubject
}
body := strings.TrimSpace(in.Body)
if body == "" {
return nil, ErrBroadcastEmptyBody
}
if len(in.Recipients) == 0 {
return nil, ErrBroadcastNoRecipients
}
if len(in.Recipients) > BroadcastRecipientCap {
return nil, fmt.Errorf("%w: %d > %d", ErrBroadcastTooManyRecipients, len(in.Recipients), BroadcastRecipientCap)
}
for _, r := range in.Recipients {
if _, err := mail.ParseAddress(r.Email); err != nil {
return nil, fmt.Errorf("%w: %q", ErrBroadcastInvalidEmail, r.Email)
}
}
// --- Authorisation ---------------------------------------------
sender, err := s.users.GetByID(ctx, callerID)
if err != nil {
return nil, fmt.Errorf("load sender: %w", err)
}
if sender == nil {
return nil, ErrBroadcastForbidden
}
if err := s.assertCanBroadcast(ctx, sender, in.ProjectID); err != nil {
return nil, err
}
// --- Persist audit row ahead of send so a partial-batch crash
// still leaves a record of intent. send_report is filled in
// post-dispatch via UPDATE.
lang := in.Lang
if lang == "" {
lang = "de"
}
broadcastID := uuid.New()
recipientIDs := make([]uuid.UUID, 0, len(in.Recipients))
for _, r := range in.Recipients {
recipientIDs = append(recipientIDs, r.UserID)
}
filterJSON, err := json.Marshal(filterMapOrEmpty(in.RecipientFilter))
if err != nil {
return nil, fmt.Errorf("marshal filter: %w", err)
}
templateKey := strings.TrimSpace(in.TemplateKey)
var templateKeyArg any
if templateKey != "" {
templateKeyArg = templateKey
}
if _, err := s.db.ExecContext(ctx, `
INSERT INTO paliad.email_broadcasts
(id, subject, body, sender_id, template_key, recipient_filter, recipient_user_ids, send_report, sent_at)
VALUES ($1, $2, $3, $4, $5, $6::jsonb, $7, '{}'::jsonb, now())`,
broadcastID, subject, body, callerID, templateKeyArg, string(filterJSON), pq.Array(recipientIDs),
); err != nil {
return nil, fmt.Errorf("insert broadcast: %w", err)
}
// --- Dispatch -------------------------------------------------
report, sendErr := s.dispatch(ctx, *sender, broadcastID, subject, body, lang, in.Recipients)
report.BroadcastID = broadcastID
// Persist the report regardless of dispatch outcome. Per-recipient
// failures are recorded in the report itself (dispatch currently
// always returns a nil error), so the UI can show a partial-success
// toast from the Sent/Failed counts.
reportJSON, marshalErr := json.Marshal(report)
if marshalErr != nil {
// Truly unexpected — fall back to an empty report shape rather
// than wedging the audit row.
slog.Error("broadcast: marshal report failed", "broadcast_id", broadcastID, "error", marshalErr)
reportJSON = []byte(`{}`)
}
if _, err := s.db.ExecContext(ctx,
`UPDATE paliad.email_broadcasts SET send_report = $1::jsonb WHERE id = $2`,
string(reportJSON), broadcastID,
); err != nil {
slog.Error("broadcast: persist report failed", "broadcast_id", broadcastID, "error", err)
}
if sendErr != nil {
return report, sendErr
}
return report, nil
}
// assertCanBroadcast enforces project_lead-OR-global_admin. global_admin
// always wins; otherwise the sender must have role='lead' on the
// project identified by projectID. A nil projectID is admin-only.
func (s *BroadcastService) assertCanBroadcast(ctx context.Context, sender *models.User, projectID *uuid.UUID) error {
if sender.GlobalRole == "global_admin" {
return nil
}
if projectID == nil {
return ErrBroadcastForbidden
}
var count int
if err := s.db.GetContext(ctx, &count,
`SELECT COUNT(*) FROM paliad.project_teams
WHERE project_id = $1 AND user_id = $2 AND role = 'lead'`,
*projectID, sender.ID,
); err != nil {
return fmt.Errorf("check lead role: %w", err)
}
if count == 0 {
return ErrBroadcastForbidden
}
return nil
}
// dispatch fans out the per-recipient sends through a bounded pool and
// collects the report.
func (s *BroadcastService) dispatch(ctx context.Context, sender models.User, broadcastID uuid.UUID, subject, body, lang string, recipients []BroadcastRecipient) (*BroadcastReport, error) {
type result struct {
userID uuid.UUID
err error
}
results := make(chan result, len(recipients))
sem := make(chan struct{}, BroadcastSendConcurrency)
var wg sync.WaitGroup
for _, r := range recipients {
wg.Add(1)
go func(rec BroadcastRecipient) {
defer wg.Done()
sem <- struct{}{}
defer func() { <-sem }()
sendCtx, cancel := context.WithTimeout(ctx, BroadcastSendTimeout)
defer cancel()
err := s.sendOne(sendCtx, sender, broadcastID, subject, body, lang, rec)
results <- result{userID: rec.UserID, err: err}
}(r)
}
wg.Wait()
close(results)
report := &BroadcastReport{
Total: len(recipients),
Errors: map[string]string{},
SentAt: s.clock(),
}
for res := range results {
if res.err != nil {
report.Failed++
report.Errors[res.userID.String()] = res.err.Error()
slog.Warn("broadcast: send failed",
"broadcast_id", broadcastID, "user_id", res.userID, "error", res.err)
} else {
report.Sent++
}
}
return report, nil
}
// sendOne renders one personalised email and dispatches it. The
// MailService no-ops cleanly when disabled — that path still treats
// the recipient as "sent" for the purposes of the report so dev
// deploys aren't littered with phantom failures.
func (s *BroadcastService) sendOne(ctx context.Context, sender models.User, broadcastID uuid.UUID, subject, body, lang string, rec BroadcastRecipient) error {
// Subject can carry placeholders too ("Hallo {{first_name}}, …").
renderedSubject := substitutePlaceholders(subject, rec)
personalisedBody := substitutePlaceholders(body, rec)
htmlBody, err := s.renderBroadcastBody(ctx, lang, personalisedBody, sender)
if err != nil {
return fmt.Errorf("render body: %w", err)
}
textBody := htmlToText(htmlBody)
if !s.mail.Enabled() {
slog.Debug("broadcast: sendOne skipped (mail disabled)",
"broadcast_id", broadcastID, "to", rec.Email)
return nil
}
// Custom envelope: Reply-To is the sender, so replies route to the
// human who composed the broadcast.
msg := buildMIMEWithReplyTo(s.mail.cfg.From, s.mail.cfg.FromName, sender.Email,
rec.Email, renderedSubject, htmlBody, textBody)
deliverDone := make(chan error, 1)
go func() {
deliverDone <- s.mail.deliver(rec.Email, msg)
}()
select {
case err := <-deliverDone:
return err
case <-ctx.Done():
return ctx.Err()
}
}
// renderBroadcastBody wraps the personalised Markdown body in the
// standard base.html (DB override or embedded fallback) so broadcast
// emails look like the rest of Paliad's mail.
func (s *BroadcastService) renderBroadcastBody(ctx context.Context, lang, markdownBody string, sender models.User) (string, error) {
htmlContent := renderMarkdownSafe(markdownBody)
signature := senderSignature(lang, sender)
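// Build the {{define "content"}} block expected by base.html. The
// inner HTML is treated as trusted output (we generated it from
// known-safe Markdown rules). Senders can't sneak script tags
// because renderMarkdownSafe escapes everything before re-introducing
// the whitelisted markup. Assumption worth verifying: the block is
// parsed as a Go template and unknown {{...}} tokens survive
// substitutePlaceholders, so this also relies on renderMarkdownSafe
// neutralising any literal {{...}} left in the body.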
contentBlock := fmt.Sprintf(`{{define "content"}}%s%s{{end}}`, htmlContent, signature)
// Look up base.html (key='base'). Same fallback discipline as
// MailService.RenderTemplate — if the active row is malformed we
// retry with the embedded default.
var (
baseBody string
err error
)
if s.templates != nil {
row, lookupErr := s.templates.GetActive(ctx, EmailTemplateKeyBase, lang)
if lookupErr != nil {
return "", fmt.Errorf("lookup base template: %w", lookupErr)
}
baseBody = row.Body
} else {
baseBody, err = readEmbeddedBody(EmailTemplateKeyBase, lang)
if err != nil {
return "", fmt.Errorf("read embedded base: %w", err)
}
}
payload := map[string]any{
"Lang": lang,
"Firm": branding.Name,
"Subject": "", // base.html title field; we don't need it here.
}
html, err := renderBaseAndContent(baseBody, contentBlock, payload)
if err == nil {
return html, nil
}
// Active row malformed — fall back to embedded.
slog.Error("broadcast: base render failed, falling back to embedded",
"lang", lang, "error", err)
fbBase, fbErr := readEmbeddedBody(EmailTemplateKeyBase, lang)
if fbErr != nil {
return "", fmt.Errorf("fallback base: %w", fbErr)
}
return renderBaseAndContent(fbBase, contentBlock, payload)
}
// substitutePlaceholders replaces {{name}}, {{first_name}}, and
// {{role_on_project}} with the per-recipient values. Exactly one
// space on each side of the name is also accepted ({{ name }}); any
// other spacing is not matched. Unmatched {{...}} tokens pass through
// untouched so a sender's accidental "literal {{example}}" stays
// readable in the rendered mail.
func substitutePlaceholders(src string, rec BroadcastRecipient) string {
repl := strings.NewReplacer(
"{{name}}", rec.DisplayName,
"{{ name }}", rec.DisplayName,
"{{first_name}}", rec.FirstName,
"{{ first_name }}", rec.FirstName,
"{{role_on_project}}", rec.RoleOnProject,
"{{ role_on_project }}", rec.RoleOnProject,
)
return repl.Replace(src)
}
// senderSignature builds a "Gesendet von <DisplayName> <email>" footer
// ("Sent by" for lang "en") that is placed below the body so the
// recipient sees who wrote the mail even though From: is the SMTP
// infrastructure address.
func senderSignature(lang string, sender models.User) string {
prefix := "Gesendet von"
if lang == "en" {
prefix = "Sent by"
}
if sender.DisplayName == "" {
return fmt.Sprintf(`<p style="margin-top:24px;font-size:13px;color:#78716c;">%s <a href="mailto:%s">%s</a></p>`,
prefix, escapeHTML(sender.Email), escapeHTML(sender.Email))
}
return fmt.Sprintf(`<p style="margin-top:24px;font-size:13px;color:#78716c;">%s %s &lt;<a href="mailto:%s">%s</a>&gt;</p>`,
prefix, escapeHTML(sender.DisplayName), escapeHTML(sender.Email), escapeHTML(sender.Email))
}
// filterMapOrEmpty normalises a nil filter map to an empty one for
// jsonb persistence.
func filterMapOrEmpty(in map[string]any) map[string]any {
if in == nil {
return map[string]any{}
}
return in
}
// --- broadcast list / get queries ----------------------------------
// BroadcastListEntry is one row on the /admin/broadcasts list.
type BroadcastListEntry struct {
ID uuid.UUID `db:"id" json:"id"`
Subject string `db:"subject" json:"subject"`
SenderID uuid.UUID `db:"sender_id" json:"sender_id"`
SenderName string `db:"sender_name" json:"sender_name"`
SenderEmail string `db:"sender_email" json:"sender_email"`
RecipientCount int `db:"recipient_count" json:"recipient_count"`
SentAt time.Time `db:"sent_at" json:"sent_at"`
TemplateKey *string `db:"template_key" json:"template_key,omitempty"`
}
// BroadcastDetail is the per-row detail view.
type BroadcastDetail struct {
BroadcastListEntry
Body string `db:"body" json:"body"`
RecipientFilter json.RawMessage `db:"recipient_filter" json:"recipient_filter"`
SendReport json.RawMessage `db:"send_report" json:"send_report"`
Recipients []BroadcastDetailRecipient `json:"recipients"`
}
// BroadcastDetailRecipient is one resolved addressee on the detail page.
// Names are joined from paliad.users at read time so the most recent
// display_name shows up; the audit row only retains the user_id.
type BroadcastDetailRecipient struct {
UserID uuid.UUID `db:"id" json:"id"`
Email string `db:"email" json:"email"`
DisplayName string `db:"display_name" json:"display_name"`
}
// List returns broadcasts visible to the caller. global_admin sees
// every row; everyone else sees only their own sends.
func (s *BroadcastService) List(ctx context.Context, callerID uuid.UUID, limit int) ([]BroadcastListEntry, error) {
if limit <= 0 || limit > 200 {
limit = 50
}
caller, err := s.users.GetByID(ctx, callerID)
if err != nil {
return nil, fmt.Errorf("load caller: %w", err)
}
if caller == nil {
return nil, ErrBroadcastForbidden
}
var (
rows []BroadcastListEntry
q string
args []any
)
if caller.GlobalRole == "global_admin" {
q = listBroadcastsSQL + ` ORDER BY b.sent_at DESC LIMIT $1`
args = []any{limit}
} else {
q = listBroadcastsSQL + ` WHERE b.sender_id = $1 ORDER BY b.sent_at DESC LIMIT $2`
args = []any{callerID, limit}
}
if err := s.db.SelectContext(ctx, &rows, q, args...); err != nil {
return nil, fmt.Errorf("list broadcasts: %w", err)
}
return rows, nil
}
// Get returns one broadcast plus its resolved recipient list. Same
// visibility rules as List.
func (s *BroadcastService) Get(ctx context.Context, callerID, id uuid.UUID) (*BroadcastDetail, error) {
caller, err := s.users.GetByID(ctx, callerID)
if err != nil {
return nil, fmt.Errorf("load caller: %w", err)
}
if caller == nil {
return nil, ErrBroadcastForbidden
}
var detail BroadcastDetail
q := `
SELECT b.id, b.subject, b.sender_id, b.template_key,
COALESCE(array_length(b.recipient_user_ids, 1), 0) AS recipient_count,
b.sent_at, b.body, b.recipient_filter, b.send_report,
u.display_name AS sender_name, u.email AS sender_email
FROM paliad.email_broadcasts b
LEFT JOIN paliad.users u ON u.id = b.sender_id
WHERE b.id = $1`
if err := s.db.GetContext(ctx, &detail, q, id); err != nil {
return nil, fmt.Errorf("get broadcast: %w", err)
}
if caller.GlobalRole != "global_admin" && detail.SenderID != callerID {
return nil, ErrBroadcastForbidden
}
// Resolve recipient names. The audit row stores user_ids only; we
// re-join paliad.users at read time so renames flow through. The
// uuid[] column is cast to text[] and scanned into a pq.StringArray,
// then parsed back into uuid.UUID values.
var idArr pq.StringArray
if err := s.db.GetContext(ctx, &idArr,
`SELECT recipient_user_ids::text[] FROM paliad.email_broadcasts WHERE id = $1`, id); err != nil {
return nil, fmt.Errorf("load recipient ids: %w", err)
}
recipientIDs := make([]uuid.UUID, 0, len(idArr))
// The loop variable is named raw so it does not shadow the receiver s.
for _, raw := range idArr {
if uid, err := uuid.Parse(raw); err == nil {
recipientIDs = append(recipientIDs, uid)
}
}
if len(recipientIDs) > 0 {
var rec []BroadcastDetailRecipient
if err := s.db.SelectContext(ctx, &rec,
`SELECT id, email, display_name
FROM paliad.users
WHERE id = ANY($1)`, pq.Array(recipientIDs),
); err != nil {
return nil, fmt.Errorf("load recipients: %w", err)
}
// Preserve the audit-row order — clients want the original
// dispatch list, not whatever paliad.users ordered them by.
byID := make(map[uuid.UUID]BroadcastDetailRecipient, len(rec))
for _, r := range rec {
byID[r.UserID] = r
}
ordered := make([]BroadcastDetailRecipient, 0, len(recipientIDs))
for _, uid := range recipientIDs {
if r, ok := byID[uid]; ok {
ordered = append(ordered, r)
continue
}
// User row was deleted post-broadcast. Show the bare ID so
// the audit page still accounts for the slot.
ordered = append(ordered, BroadcastDetailRecipient{UserID: uid})
}
detail.Recipients = ordered
}
return &detail, nil
}
const listBroadcastsSQL = `
SELECT b.id, b.subject, b.sender_id, b.template_key,
COALESCE(array_length(b.recipient_user_ids, 1), 0) AS recipient_count,
b.sent_at,
u.display_name AS sender_name, u.email AS sender_email
FROM paliad.email_broadcasts b
LEFT JOIN paliad.users u ON u.id = b.sender_id
`