package notification import ( "context" "crypto/rand" "crypto/sha1" "crypto/tls" "database/sql" "encoding/hex" "encoding/json" "errors" "fmt" "log" "math" "net" "net/mail" "net/smtp" "os" "sort" "strconv" "strings" "sync" "text/template" "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/segmentio/kafka-go" ) const ( statusPending = "pending" statusSent = "sent" statusFailed = "failed" statusDead = "dead" ) type Config struct { Enabled bool KafkaBrokers []string NotifyTopic string DispatchBatch int SchedulerTick time.Duration MaxRetry int BackoffBase time.Duration DispatchLease time.Duration DueSoonWindow time.Duration OverdueThrottle time.Duration AlertFailureThreshold float64 AlertPendingThreshold int AlertDLQThreshold int ConsumerGroup string SMTPHost string SMTPPort string SMTPUser string SMTPPass string SMTPFrom string SMTPUseTLS bool DailySummaryDefaultLoc string } func LoadConfigFromEnv() Config { cfg := Config{ Enabled: parseBoolEnv("NOTIFY_ENABLED", true), KafkaBrokers: splitCSV(os.Getenv("KAFKA_BROKERS")), NotifyTopic: getenv("NOTIFY_TOPIC", "notification.jobs"), DispatchBatch: parseIntEnv("NOTIFY_DISPATCH_BATCH", 200), SchedulerTick: time.Duration(parseIntEnv("NOTIFY_SCHED_TICK_SECONDS", 60)) * time.Second, MaxRetry: parseIntEnv("NOTIFY_MAX_RETRY", 5), BackoffBase: time.Duration(parseIntEnv("NOTIFY_BACKOFF_BASE_SECONDS", 30)) * time.Second, DispatchLease: 10 * time.Second, DueSoonWindow: 60 * time.Minute, OverdueThrottle: 30 * time.Minute, AlertFailureThreshold: 0.2, AlertPendingThreshold: 1000, AlertDLQThreshold: 50, ConsumerGroup: getenv("NOTIFY_CONSUMER_GROUP", "supertodo-notification-worker"), SMTPHost: strings.TrimSpace(os.Getenv("SMTP_HOST")), SMTPPort: getenv("SMTP_PORT", "465"), SMTPUser: strings.TrimSpace(os.Getenv("SMTP_USER")), SMTPPass: strings.TrimSpace(os.Getenv("SMTP_PASS")), SMTPFrom: strings.TrimSpace(os.Getenv("SMTP_FROM")), SMTPUseTLS: parseBoolEnv("SMTP_USE_TLS", true), DailySummaryDefaultLoc: getenv("NOTIFY_DEFAULT_LOCALE", "zh"), } if cfg.SMTPFrom == "" { cfg.SMTPFrom = cfg.SMTPUser } if cfg.DispatchBatch <= 0 { cfg.DispatchBatch = 200 } if cfg.SchedulerTick <= 0 { cfg.SchedulerTick = time.Minute } if cfg.MaxRetry <= 0 { cfg.MaxRetry = 5 } if cfg.BackoffBase <= 0 { cfg.BackoffBase = 30 * time.Second } return cfg } type Service struct { pool *pgxpool.Pool cfg Config writer *kafka.Writer reader *kafka.Reader email *EmailAdapter wg sync.WaitGroup } func NewService(pool *pgxpool.Pool, cfg Config) *Service { s := &Service{pool: pool, cfg: cfg} if cfg.Enabled && len(cfg.KafkaBrokers) > 0 { s.writer = &kafka.Writer{ Addr: kafka.TCP(cfg.KafkaBrokers...), Topic: cfg.NotifyTopic, Balancer: &kafka.LeastBytes{}, } s.reader = kafka.NewReader(kafka.ReaderConfig{ Brokers: cfg.KafkaBrokers, Topic: cfg.NotifyTopic, GroupID: cfg.ConsumerGroup, MinBytes: 1, MaxBytes: 10e6, }) } s.email = NewEmailAdapter(cfg) return s } func (s *Service) Close() { if s.writer != nil { _ = s.writer.Close() } if s.reader != nil { _ = s.reader.Close() } s.wg.Wait() } func (s *Service) Start(ctx context.Context) { if !s.cfg.Enabled { log.Println("notification service disabled") return } s.wg.Add(1) go func() { defer s.wg.Done() s.runDueScheduler(ctx) }() s.wg.Add(1) go func() { defer s.wg.Done() s.runDailySummaryScheduler(ctx) }() s.wg.Add(1) go func() { defer s.wg.Done() s.runJobDispatcher(ctx) }() s.wg.Add(1) go func() { defer s.wg.Done() s.runWorker(ctx) }() s.wg.Add(1) go func() { defer s.wg.Done() s.runAlertChecker(ctx) }() } type UserNotificationPrefs struct { Subscribed bool `json:"subscribed"` DNDStart string `json:"dnd_start"` DNDEnd string `json:"dnd_end"` Locale string `json:"locale"` Timezone string `json:"timezone"` DailySummaryEnabled bool `json:"daily_summary_enabled"` DailySummaryTime string `json:"daily_summary_time"` } type NotificationGroup struct { ID int64 `json:"id"` Name string `json:"name"` Emails []string `json:"emails"` } type DLQItem struct { JobID string `json:"job_id"` TemplateID string `json:"template_id"` ToEmails []string `json:"to_emails"` Reason string `json:"reason"` FailedAt string `json:"failed_at"` RetryCount int `json:"retry_count"` TraceID string `json:"trace_id"` ErrorCode string `json:"error_code"` } type FailureReason struct { Code string `json:"code"` Count int64 `json:"count"` } type Metrics struct { From string `json:"from"` To string `json:"to"` SentCount int64 `json:"sent_count"` FailedCount int64 `json:"failed_count"` DeliveryRate float64 `json:"delivery_rate"` AvgRetryCount float64 `json:"avg_retry_count"` P95LatencyMS float64 `json:"p95_latency_ms"` PendingBacklog int64 `json:"pending_backlog"` FailureReasons []FailureReason `json:"failure_reasons"` DLQInLast10Mins int64 `json:"dlq_last_10m"` } func (s *Service) GetPrefs(ctx context.Context, userID int64) (UserNotificationPrefs, error) { if err := s.ensureUserPrefs(ctx, userID); err != nil { return UserNotificationPrefs{}, err } var prefs UserNotificationPrefs err := s.pool.QueryRow(ctx, ` SELECT subscribed, COALESCE(to_char(dnd_start, 'HH24:MI'), ''), COALESCE(to_char(dnd_end, 'HH24:MI'), ''), locale, timezone, daily_summary_enabled, COALESCE(to_char(daily_summary_time, 'HH24:MI'), '09:30') FROM user_notification_prefs WHERE user_id = $1`, userID, ).Scan( &prefs.Subscribed, &prefs.DNDStart, &prefs.DNDEnd, &prefs.Locale, &prefs.Timezone, &prefs.DailySummaryEnabled, &prefs.DailySummaryTime, ) if err != nil { return UserNotificationPrefs{}, err } if prefs.Locale == "" { prefs.Locale = "zh" } if prefs.Timezone == "" { prefs.Timezone = "Asia/Shanghai" } if prefs.DailySummaryTime == "" { prefs.DailySummaryTime = "09:30" } return prefs, nil } func (s *Service) UpdatePrefs(ctx context.Context, userID int64, input UserNotificationPrefs) (UserNotificationPrefs, error) { if err := s.ensureUserPrefs(ctx, userID); err != nil { return UserNotificationPrefs{}, err } input.Locale = normalizeLocale(input.Locale) input.Timezone = strings.TrimSpace(input.Timezone) if input.Timezone == "" { input.Timezone = "Asia/Shanghai" } if _, err := time.LoadLocation(input.Timezone); err != nil { return UserNotificationPrefs{}, fmt.Errorf("invalid timezone") } if input.DailySummaryTime == "" { input.DailySummaryTime = "09:30" } if !isValidHHMM(input.DailySummaryTime) { return UserNotificationPrefs{}, fmt.Errorf("invalid daily_summary_time") } if input.DNDStart != "" && !isValidHHMM(input.DNDStart) { return UserNotificationPrefs{}, fmt.Errorf("invalid dnd_start") } if input.DNDEnd != "" && !isValidHHMM(input.DNDEnd) { return UserNotificationPrefs{}, fmt.Errorf("invalid dnd_end") } _, err := s.pool.Exec(ctx, ` UPDATE user_notification_prefs SET subscribed = $2, dnd_start = NULLIF($3, '')::time, dnd_end = NULLIF($4, '')::time, locale = $5, timezone = $6, daily_summary_enabled = $7, daily_summary_time = NULLIF($8, '')::time, updated_at = now() WHERE user_id = $1`, userID, input.Subscribed, input.DNDStart, input.DNDEnd, input.Locale, input.Timezone, input.DailySummaryEnabled, input.DailySummaryTime, ) if err != nil { return UserNotificationPrefs{}, err } return s.GetPrefs(ctx, userID) } func (s *Service) ListGroups(ctx context.Context, userID int64) ([]NotificationGroup, error) { rows, err := s.pool.Query(ctx, `SELECT id, name, emails FROM notification_groups WHERE user_id = $1 ORDER BY id DESC`, userID) if err != nil { return nil, err } defer rows.Close() out := make([]NotificationGroup, 0) for rows.Next() { var item NotificationGroup if err := rows.Scan(&item.ID, &item.Name, &item.Emails); err != nil { return nil, err } out = append(out, item) } return out, rows.Err() } func (s *Service) CreateGroup(ctx context.Context, userID int64, group NotificationGroup) (NotificationGroup, error) { group.Name = strings.TrimSpace(group.Name) group.Emails = normalizeEmails(group.Emails) if group.Name == "" { return NotificationGroup{}, fmt.Errorf("group name required") } if len(group.Emails) == 0 { return NotificationGroup{}, fmt.Errorf("emails required") } err := s.pool.QueryRow(ctx, ` INSERT INTO notification_groups (user_id, name, emails, created_at, updated_at) VALUES ($1, $2, $3, now(), now()) RETURNING id, name, emails`, userID, group.Name, group.Emails, ).Scan(&group.ID, &group.Name, &group.Emails) if err != nil { return NotificationGroup{}, err } return group, nil } func (s *Service) UpdateGroup(ctx context.Context, userID, id int64, group NotificationGroup) (NotificationGroup, error) { group.Name = strings.TrimSpace(group.Name) group.Emails = normalizeEmails(group.Emails) if group.Name == "" { return NotificationGroup{}, fmt.Errorf("group name required") } if len(group.Emails) == 0 { return NotificationGroup{}, fmt.Errorf("emails required") } err := s.pool.QueryRow(ctx, ` UPDATE notification_groups SET name = $3, emails = $4, updated_at = now() WHERE id = $1 AND user_id = $2 RETURNING id, name, emails`, id, userID, group.Name, group.Emails, ).Scan(&group.ID, &group.Name, &group.Emails) if err != nil { return NotificationGroup{}, err } return group, nil } func (s *Service) DeleteGroup(ctx context.Context, userID, id int64) error { res, err := s.pool.Exec(ctx, `DELETE FROM notification_groups WHERE id = $1 AND user_id = $2`, id, userID) if err != nil { return err } if res.RowsAffected() == 0 { return pgx.ErrNoRows } return nil } func (s *Service) ListDLQ(ctx context.Context, userID int64, page, pageSize int) ([]DLQItem, error) { if page < 1 { page = 1 } if pageSize <= 0 || pageSize > 200 { pageSize = 20 } offset := (page - 1) * pageSize rows, err := s.pool.Query(ctx, ` SELECT j.job_id, j.template_id, j.to_emails, d.reason, d.failed_at, j.retry_count, j.trace_id, COALESCE(j.last_error_code, '') FROM notification_dlq d JOIN notification_jobs j ON j.job_id = d.job_id WHERE j.user_id = $1 ORDER BY d.failed_at DESC LIMIT $2 OFFSET $3`, userID, pageSize, offset, ) if err != nil { return nil, err } defer rows.Close() out := make([]DLQItem, 0) for rows.Next() { var item DLQItem var failedAt time.Time if err := rows.Scan(&item.JobID, &item.TemplateID, &item.ToEmails, &item.Reason, &failedAt, &item.RetryCount, &item.TraceID, &item.ErrorCode); err != nil { return nil, err } item.FailedAt = failedAt.UTC().Format(time.RFC3339) out = append(out, item) } return out, rows.Err() } func (s *Service) GetMetrics(ctx context.Context, from, to time.Time) (Metrics, error) { if from.IsZero() { from = time.Now().Add(-24 * time.Hour) } if to.IsZero() { to = time.Now() } out := Metrics{From: from.UTC().Format(time.RFC3339), To: to.UTC().Format(time.RFC3339)} _ = s.pool.QueryRow(ctx, `SELECT count(*) FROM notification_jobs WHERE status = 'sent' AND sent_at BETWEEN $1 AND $2`, from, to).Scan(&out.SentCount) _ = s.pool.QueryRow(ctx, `SELECT count(*) FROM notification_jobs WHERE status IN ('failed','dead') AND updated_at BETWEEN $1 AND $2`, from, to).Scan(&out.FailedCount) _ = s.pool.QueryRow(ctx, `SELECT COALESCE(avg(retry_count), 0) FROM notification_jobs WHERE created_at BETWEEN $1 AND $2`, from, to).Scan(&out.AvgRetryCount) _ = s.pool.QueryRow(ctx, `SELECT COALESCE(percentile_cont(0.95) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM (sent_at - scheduled_at))*1000), 0) FROM notification_jobs WHERE status='sent' AND sent_at BETWEEN $1 AND $2`, from, to).Scan(&out.P95LatencyMS) _ = s.pool.QueryRow(ctx, `SELECT count(*) FROM notification_jobs WHERE status='pending'`).Scan(&out.PendingBacklog) _ = s.pool.QueryRow(ctx, `SELECT count(*) FROM notification_dlq WHERE failed_at >= now() - interval '10 minutes'`).Scan(&out.DLQInLast10Mins) total := out.SentCount + out.FailedCount if total > 0 { out.DeliveryRate = float64(out.SentCount) / float64(total) } rows, err := s.pool.Query(ctx, ` SELECT COALESCE(last_error_code,''), count(*) FROM notification_jobs WHERE status IN ('failed','dead') AND updated_at BETWEEN $1 AND $2 GROUP BY COALESCE(last_error_code,'') ORDER BY count(*) DESC LIMIT 10`, from, to, ) if err == nil { defer rows.Close() for rows.Next() { var item FailureReason if err := rows.Scan(&item.Code, &item.Count); err == nil { out.FailureReasons = append(out.FailureReasons, item) } } } return out, nil } type TaskSnapshot struct { ID int64 UserID int64 Title string Description string Status string DueAt string Priority int Tags []string NotifyGroupIDs []int64 } func (s *Service) PublishTaskEvent(ctx context.Context, eventType string, task TaskSnapshot) error { eventID := randomID("evt") traceID := randomID("trace") if eventType == "task.due_soon" { eventID = fmt.Sprintf("task-due-soon:%d:%s", task.ID, task.DueAt) traceID = eventID } if eventType == "task.overdue" { eventID = fmt.Sprintf("task-overdue:%d:%s", task.ID, task.DueAt) traceID = eventID } payload := map[string]any{ "task_id": task.ID, "title": task.Title, "description": task.Description, "status": task.Status, "due_at": task.DueAt, "priority": task.Priority, "tags": task.Tags, "notify_group_ids": task.NotifyGroupIDs, } inserted, err := s.insertEvent(ctx, eventID, eventType, task.UserID, traceID, payload) if err != nil { return err } if !inserted { return nil } return s.evaluateAndCreateJob(ctx, eventID, eventType, traceID, task.UserID, payload) } func (s *Service) PublishDailySummaryEvent(ctx context.Context, userID int64, localDate string, payload map[string]any) error { eventID := fmt.Sprintf("daily-summary:%d:%s", userID, localDate) traceID := eventID inserted, err := s.insertEvent(ctx, eventID, "task.daily_summary", userID, traceID, payload) if err != nil { return err } if !inserted { return nil } return s.evaluateAndCreateJob(ctx, eventID, "task.daily_summary", traceID, userID, payload) } func (s *Service) insertEvent(ctx context.Context, eventID, eventType string, userID int64, traceID string, payload map[string]any) (bool, error) { data, _ := json.Marshal(payload) res, err := s.pool.Exec(ctx, ` INSERT INTO notification_events (event_id, event_type, user_id, occurred_at, payload, trace_id, created_at) VALUES ($1, $2, $3, now(), $4, $5, now()) ON CONFLICT (event_id) DO NOTHING`, eventID, eventType, userID, data, traceID, ) if err != nil { return false, err } return res.RowsAffected() > 0, nil } func (s *Service) evaluateAndCreateJob(ctx context.Context, eventID, eventType, traceID string, userID int64, payload map[string]any) error { prefs, err := s.GetPrefs(ctx, userID) if err != nil { return err } if !prefs.Subscribed { return nil } templateID := mapTemplateID(eventType) if templateID == "" { return nil } locale := normalizeLocale(prefs.Locale) tz := prefs.Timezone if tz == "" { tz = "Asia/Shanghai" } var recipients []string if eventType == "task.daily_summary" { email, err := s.getUserEmail(ctx, userID) if err != nil { return nil } recipients = []string{email} } else { recipients, err = s.resolveRecipientsByPayload(ctx, userID, payload) if err != nil { return err } } recipients = normalizeEmails(recipients) if len(recipients) == 0 { return nil } now := time.Now().UTC() scheduledAt := now if eventType == "task.daily_summary" { scheduledAt = s.nextDailyTime(now, prefs) } else { scheduledAt = applyDND(now, prefs) } if scheduledAt.Before(now) { scheduledAt = now } params := map[string]any{ "event_type": eventType, "locale": locale, "timezone": tz, "payload": payload, } jobID := randomID("job") idem := idempotencyKey(eventID, templateID, recipients) paramsJSON, _ := json.Marshal(params) _, err = s.pool.Exec(ctx, ` INSERT INTO notification_jobs ( job_id, event_id, trace_id, user_id, channel, to_emails, template_id, params, idempotency_key, status, scheduled_at, available_at, retry_count, max_retry, created_at, updated_at ) VALUES ($1, $2, $3, $4, 'email', $5, $6, $7, $8, 'pending', $9, $9, 0, $10, now(), now()) ON CONFLICT (idempotency_key) DO NOTHING`, jobID, eventID, traceID, userID, recipients, templateID, paramsJSON, idem, scheduledAt, s.cfg.MaxRetry, ) if err != nil { return err } return nil } func (s *Service) resolveRecipientsByPayload(ctx context.Context, userID int64, payload map[string]any) ([]string, error) { rawIDs, ok := payload["notify_group_ids"] if !ok { return nil, nil } groupIDs := parseAnyInt64Slice(rawIDs) if len(groupIDs) == 0 { return nil, nil } rows, err := s.pool.Query(ctx, `SELECT emails FROM notification_groups WHERE user_id = $1 AND id = ANY($2)`, userID, groupIDs) if err != nil { return nil, err } defer rows.Close() emails := make([]string, 0) for rows.Next() { var list []string if err := rows.Scan(&list); err != nil { return nil, err } emails = append(emails, list...) } return normalizeEmails(emails), rows.Err() } func (s *Service) getUserEmail(ctx context.Context, userID int64) (string, error) { var email string err := s.pool.QueryRow(ctx, `SELECT email FROM users WHERE id = $1`, userID).Scan(&email) if err != nil { return "", err } return strings.TrimSpace(strings.ToLower(email)), nil } func (s *Service) ensureUserPrefs(ctx context.Context, userID int64) error { _, err := s.pool.Exec(ctx, ` INSERT INTO user_notification_prefs ( user_id, subscribed, locale, timezone, daily_summary_enabled, daily_summary_time, updated_at ) VALUES ($1, true, 'zh', 'Asia/Shanghai', true, '09:30:00', now()) ON CONFLICT (user_id) DO NOTHING`, userID, ) return err } func (s *Service) runDueScheduler(ctx context.Context) { ticker := time.NewTicker(s.cfg.SchedulerTick) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: s.scheduleDueEvents(ctx) } } } func (s *Service) scheduleDueEvents(ctx context.Context) { rows, err := s.pool.Query(ctx, ` SELECT id, user_id, title, description, status, due_at, priority, tags, notify_group_ids FROM tasks WHERE status <> 'done' AND due_at IS NOT NULL AND due_at <> ''`, ) if err != nil { log.Printf("notification due scheduler query failed: %v", err) return } defer rows.Close() now := time.Now().UTC() for rows.Next() { var task TaskSnapshot if err := rows.Scan(&task.ID, &task.UserID, &task.Title, &task.Description, &task.Status, &task.DueAt, &task.Priority, &task.Tags, &task.NotifyGroupIDs); err != nil { continue } dueAt, err := time.Parse(time.RFC3339, task.DueAt) if err != nil { continue } if dueAt.After(now) && dueAt.Before(now.Add(s.cfg.DueSoonWindow)) { if s.hasExistingTaskEvent(ctx, task.ID, "task.due_soon", task.DueAt) { continue } _ = s.PublishTaskEvent(ctx, "task.due_soon", task) } if dueAt.Before(now) { if s.hasExistingTaskEvent(ctx, task.ID, "task.overdue", task.DueAt) { continue } _ = s.PublishTaskEvent(ctx, "task.overdue", task) } } } func (s *Service) hasExistingTaskEvent(ctx context.Context, taskID int64, eventType, dueAt string) bool { var exists bool err := s.pool.QueryRow(ctx, ` SELECT EXISTS ( SELECT 1 FROM notification_events WHERE event_type = $1 AND payload->>'task_id' = $2 AND payload->>'due_at' = $3 )`, eventType, strconv.FormatInt(taskID, 10), dueAt, ).Scan(&exists) if err != nil { return false } return exists } func (s *Service) runDailySummaryScheduler(ctx context.Context) { ticker := time.NewTicker(s.cfg.SchedulerTick) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: s.scheduleDailySummary(ctx) } } } func (s *Service) scheduleDailySummary(ctx context.Context) { rows, err := s.pool.Query(ctx, ` SELECT user_id, COALESCE(timezone, 'Asia/Shanghai'), COALESCE(locale, 'zh'), COALESCE(to_char(daily_summary_time, 'HH24:MI'), '09:30'), subscribed, daily_summary_enabled, COALESCE(to_char(dnd_start, 'HH24:MI'), ''), COALESCE(to_char(dnd_end, 'HH24:MI'), '') FROM user_notification_prefs WHERE daily_summary_enabled = true`, ) if err != nil { log.Printf("daily summary query failed: %v", err) return } defer rows.Close() nowUTC := time.Now().UTC() for rows.Next() { var userID int64 var timezone, locale, summaryAt, dndStart, dndEnd string var subscribed, enabled bool if err := rows.Scan(&userID, &timezone, &locale, &summaryAt, &subscribed, &enabled, &dndStart, &dndEnd); err != nil { continue } if !subscribed || !enabled { continue } loc, err := time.LoadLocation(timezone) if err != nil { loc = time.FixedZone("UTC", 0) } localNow := nowUTC.In(loc) if localNow.Format("15:04") != summaryAt { continue } openCount, overdueCount := s.summaryCounts(ctx, userID, nowUTC) payload := map[string]any{ "open_count": openCount, "overdue_count": overdueCount, "summary_date": localNow.Format("2006-01-02"), "timezone": timezone, "locale": locale, "dnd_start": dndStart, "dnd_end": dndEnd, } _ = s.PublishDailySummaryEvent(ctx, userID, localNow.Format("2006-01-02"), payload) } } func (s *Service) summaryCounts(ctx context.Context, userID int64, now time.Time) (int64, int64) { var openCount int64 _ = s.pool.QueryRow(ctx, `SELECT count(*) FROM tasks WHERE user_id = $1 AND status <> 'done'`, userID).Scan(&openCount) var overdueCount int64 _ = s.pool.QueryRow(ctx, ` SELECT count(*) FROM tasks WHERE user_id = $1 AND status <> 'done' AND due_at IS NOT NULL AND due_at <> '' AND due_at::timestamptz < $2`, userID, now, ).Scan(&overdueCount) return openCount, overdueCount } func (s *Service) runJobDispatcher(ctx context.Context) { if s.writer == nil { log.Println("notification dispatcher disabled: kafka writer not configured") return } ticker := time.NewTicker(time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: s.dispatchPendingJobs(ctx) } } } type jobRecord struct { JobID string TraceID string Channel string ToEmails []string TemplateID string Params map[string]any RetryCount int IdempotencyKey string ScheduledAt time.Time } func (s *Service) dispatchPendingJobs(ctx context.Context) { jobs, err := s.reservePendingJobs(ctx, s.cfg.DispatchBatch, s.cfg.DispatchLease) if err != nil || len(jobs) == 0 { return } messages := make([]kafka.Message, 0, len(jobs)) for _, job := range jobs { data, _ := json.Marshal(map[string]any{ "job_id": job.JobID, "trace_id": job.TraceID, "channel": job.Channel, "to": job.ToEmails, "template_id": job.TemplateID, "params": job.Params, "retry_count": job.RetryCount, "idempotency_key": job.IdempotencyKey, "scheduled_at": job.ScheduledAt.UTC().Format(time.RFC3339), }) messages = append(messages, kafka.Message{Key: []byte(job.JobID), Value: data}) } writeCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() if err := s.writer.WriteMessages(writeCtx, messages...); err != nil { _ = s.resetReservedJobs(ctx, jobs) log.Printf("notification dispatcher publish failed: %v", err) } } func (s *Service) reservePendingJobs(ctx context.Context, batch int, lease time.Duration) ([]jobRecord, error) { tx, err := s.pool.Begin(ctx) if err != nil { return nil, err } defer tx.Rollback(ctx) rows, err := tx.Query(ctx, ` WITH picked AS ( SELECT job_id FROM notification_jobs WHERE status = 'pending' AND available_at <= now() ORDER BY available_at ASC LIMIT $1 FOR UPDATE SKIP LOCKED ) UPDATE notification_jobs nj SET available_at = now() + $2::interval, updated_at = now() FROM picked WHERE nj.job_id = picked.job_id RETURNING nj.job_id, nj.trace_id, nj.channel, nj.to_emails, nj.template_id, nj.params, nj.retry_count, nj.idempotency_key, nj.scheduled_at`, batch, fmt.Sprintf("%d seconds", int(lease.Seconds())), ) if err != nil { return nil, err } defer rows.Close() jobs := make([]jobRecord, 0) for rows.Next() { var item jobRecord var paramsBytes []byte if err := rows.Scan(&item.JobID, &item.TraceID, &item.Channel, &item.ToEmails, &item.TemplateID, ¶msBytes, &item.RetryCount, &item.IdempotencyKey, &item.ScheduledAt); err != nil { return nil, err } _ = json.Unmarshal(paramsBytes, &item.Params) jobs = append(jobs, item) } if err := rows.Err(); err != nil { return nil, err } if err := tx.Commit(ctx); err != nil { return nil, err } return jobs, nil } func (s *Service) resetReservedJobs(ctx context.Context, jobs []jobRecord) error { ids := make([]string, 0, len(jobs)) for _, job := range jobs { ids = append(ids, job.JobID) } if len(ids) == 0 { return nil } _, err := s.pool.Exec(ctx, `UPDATE notification_jobs SET available_at = now(), updated_at = now() WHERE job_id = ANY($1)`, ids) return err } func (s *Service) runWorker(ctx context.Context) { if s.reader == nil { log.Println("notification worker disabled: kafka reader not configured") return } for { msg, err := s.reader.FetchMessage(ctx) if err != nil { if ctx.Err() != nil { return } log.Printf("notification worker fetch failed: %v", err) continue } if err := s.handleMessage(ctx, msg); err != nil { log.Printf("notification worker handle failed: %v", err) continue } if err := s.reader.CommitMessages(ctx, msg); err != nil { log.Printf("notification worker commit failed: %v", err) } } } func (s *Service) handleMessage(ctx context.Context, msg kafka.Message) error { var payload struct { JobID string `json:"job_id"` } if err := json.Unmarshal(msg.Value, &payload); err != nil { return nil } if payload.JobID == "" { return nil } job, err := s.loadJob(ctx, payload.JobID) if err != nil { if errors.Is(err, pgx.ErrNoRows) { return nil } return err } if job.Status == statusSent || job.Status == statusDead { return nil } if s.isDuplicateSent(ctx, job.IdempotencyKey, job.JobID) { _, _ = s.pool.Exec(ctx, `UPDATE notification_jobs SET status='sent', sent_at=now(), updated_at=now() WHERE job_id = $1`, job.JobID) return nil } start := time.Now() subject, body, err := s.renderTemplate(job.TemplateID, job.Params) if err != nil { s.failJob(ctx, job, "TEMPLATE_RENDER", err.Error(), false) return nil } sendErr := s.email.Send(job.ToEmails, subject, body) latency := int(time.Since(start).Milliseconds()) if sendErr == nil { _, _ = s.pool.Exec(ctx, `UPDATE notification_jobs SET status='sent', sent_at=now(), updated_at=now(), last_error_code=NULL, last_error_message=NULL WHERE job_id = $1`, job.JobID) _, _ = s.pool.Exec(ctx, `INSERT INTO notification_attempts (job_id, attempt_no, success, latency_ms, created_at) VALUES ($1, $2, true, $3, now())`, job.JobID, job.RetryCount+1, latency) log.Printf("notification sent trace_id=%s job_id=%s status=sent latency_ms=%d", job.TraceID, job.JobID, latency) return nil } retryable := isRetryableError(sendErr.Code) s.failJob(ctx, job, sendErr.Code, sendErr.Error(), retryable) _, _ = s.pool.Exec(ctx, `INSERT INTO notification_attempts (job_id, attempt_no, success, error_code, error_message, latency_ms, created_at) VALUES ($1, $2, false, $3, $4, $5, now())`, job.JobID, job.RetryCount+1, sendErr.Code, truncate(sendErr.Error(), 400), latency) log.Printf("notification failed trace_id=%s job_id=%s status=%s error_code=%s latency_ms=%d", job.TraceID, job.JobID, statusFailed, sendErr.Code, latency) return nil } type jobRow struct { JobID string Status string TraceID string ToEmails []string TemplateID string Params map[string]any RetryCount int MaxRetry int IdempotencyKey string } func (s *Service) loadJob(ctx context.Context, jobID string) (jobRow, error) { var out jobRow var paramsBytes []byte err := s.pool.QueryRow(ctx, ` SELECT job_id, status, trace_id, to_emails, template_id, params, retry_count, max_retry, idempotency_key FROM notification_jobs WHERE job_id = $1`, jobID, ).Scan(&out.JobID, &out.Status, &out.TraceID, &out.ToEmails, &out.TemplateID, ¶msBytes, &out.RetryCount, &out.MaxRetry, &out.IdempotencyKey) if err != nil { return jobRow{}, err } _ = json.Unmarshal(paramsBytes, &out.Params) return out, nil } func (s *Service) isDuplicateSent(ctx context.Context, idempotencyKey, jobID string) bool { var count int _ = s.pool.QueryRow(ctx, `SELECT count(*) FROM notification_jobs WHERE idempotency_key = $1 AND status = 'sent' AND job_id <> $2`, idempotencyKey, jobID).Scan(&count) return count > 0 } func (s *Service) failJob(ctx context.Context, job jobRow, code, message string, retryable bool) { if retryable && job.RetryCount < job.MaxRetry { nextRetry := job.RetryCount + 1 backoff := s.retryDelay(nextRetry) _, _ = s.pool.Exec(ctx, ` UPDATE notification_jobs SET status = 'pending', retry_count = $2, available_at = now() + $3::interval, last_error_code = $4, last_error_message = $5, updated_at = now() WHERE job_id = $1`, job.JobID, nextRetry, fmt.Sprintf("%d seconds", int(backoff.Seconds())), code, truncate(message, 400), ) return } _, _ = s.pool.Exec(ctx, ` UPDATE notification_jobs SET status='dead', last_error_code=$2, last_error_message=$3, updated_at=now() WHERE job_id=$1`, job.JobID, code, truncate(message, 400), ) snapshot, _ := json.Marshal(job) _, _ = s.pool.Exec(ctx, ` INSERT INTO notification_dlq (job_id, reason, failed_at, snapshot) VALUES ($1, $2, now(), $3) ON CONFLICT (job_id) DO UPDATE SET reason = excluded.reason, failed_at = excluded.failed_at, snapshot = excluded.snapshot`, job.JobID, fmt.Sprintf("%s: %s", code, truncate(message, 200)), snapshot, ) } func (s *Service) retryDelay(retryCount int) time.Duration { x := math.Pow(2, float64(retryCount)) d := time.Duration(x) * s.cfg.BackoffBase maxBackoff := 30 * time.Minute if d > maxBackoff { return maxBackoff } return d } func (s *Service) runAlertChecker(ctx context.Context) { ticker := time.NewTicker(time.Minute) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: from := time.Now().Add(-10 * time.Minute) metrics, err := s.GetMetrics(ctx, from, time.Now()) if err != nil { continue } failureRate := 0.0 total := metrics.SentCount + metrics.FailedCount if total > 0 { failureRate = float64(metrics.FailedCount) / float64(total) } if failureRate > s.cfg.AlertFailureThreshold { log.Printf("[ALERT] notification failure rate spike: %.2f", failureRate) } if metrics.PendingBacklog > int64(s.cfg.AlertPendingThreshold) { log.Printf("[ALERT] notification pending backlog: %d", metrics.PendingBacklog) } if metrics.DLQInLast10Mins > int64(s.cfg.AlertDLQThreshold) { log.Printf("[ALERT] notification DLQ growth: %d in last 10m", metrics.DLQInLast10Mins) } } } } func (s *Service) renderTemplate(templateID string, params map[string]any) (string, string, error) { locale := normalizeLocale(anyString(params["locale"])) payload, _ := params["payload"].(map[string]any) if payload == nil { payload = map[string]any{} } var subjectTpl, bodyTpl string switch templateID { case "task_created_notice": subjectTpl = mapLocale(locale, "新任务创建:{{.title}}", "Task created: {{.title}}") bodyTpl = mapLocale(locale, "任务 {{.title}} 已创建,优先级 P{{.priority}},截止 {{.due_at}}", "Task {{.title}} created, priority P{{.priority}}, due {{.due_at}}") case "task_updated_notice": subjectTpl = mapLocale(locale, "任务更新:{{.title}}", "Task updated: {{.title}}") bodyTpl = mapLocale(locale, "任务 {{.title}} 已更新,当前状态 {{.status}}", "Task {{.title}} updated, current status {{.status}}") case "task_status_changed_notice": subjectTpl = mapLocale(locale, "任务状态变更:{{.title}}", "Task status changed: {{.title}}") bodyTpl = mapLocale(locale, "任务 {{.title}} 状态已变更为 {{.status}}", "Task {{.title}} status changed to {{.status}}") case "task_due_soon": subjectTpl = mapLocale(locale, "任务即将到期:{{.title}}", "Task due soon: {{.title}}") bodyTpl = mapLocale(locale, "任务 {{.title}} 将在 {{.due_at}} 到期", "Task {{.title}} is due at {{.due_at}}") case "task_overdue": subjectTpl = mapLocale(locale, "任务已逾期:{{.title}}", "Task overdue: {{.title}}") bodyTpl = mapLocale(locale, "任务 {{.title}} 已逾期,截止时间 {{.due_at}}", "Task {{.title}} is overdue, due at {{.due_at}}") case "daily_summary": subjectTpl = mapLocale(locale, "每日摘要({{.summary_date}})", "Daily summary ({{.summary_date}})") bodyTpl = mapLocale(locale, "未完成任务 {{.open_count}},逾期任务 {{.overdue_count}}", "Open tasks {{.open_count}}, overdue tasks {{.overdue_count}}") default: return "", "", fmt.Errorf("template not found") } render := func(tpl string) (string, error) { t, err := template.New("msg").Parse(tpl) if err != nil { return "", err } var b strings.Builder if err := t.Execute(&b, payload); err != nil { return "", err } return b.String(), nil } subject, err := render(subjectTpl) if err != nil { return "", "", err } body, err := render(bodyTpl) if err != nil { return "", "", err } return subject, body, nil } func mapTemplateID(eventType string) string { switch eventType { case "task.created": return "task_created_notice" case "task.updated": return "task_updated_notice" case "task.status_changed": return "task_status_changed_notice" case "task.due_soon": return "task_due_soon" case "task.overdue": return "task_overdue" case "task.daily_summary": return "daily_summary" default: return "" } } func applyDND(now time.Time, prefs UserNotificationPrefs) time.Time { if prefs.DNDStart == "" || prefs.DNDEnd == "" { return now } loc, err := time.LoadLocation(prefs.Timezone) if err != nil { loc = time.UTC } localNow := now.In(loc) start, ok1 := parseClock(localNow, prefs.DNDStart) end, ok2 := parseClock(localNow, prefs.DNDEnd) if !ok1 || !ok2 { return now } if start.Equal(end) { return now } if end.After(start) { if !localNow.Before(start) && localNow.Before(end) { return end.UTC() } return now } if !localNow.Before(start) { return end.Add(24 * time.Hour).UTC() } if localNow.Before(end) { return end.UTC() } return now } func (s *Service) nextDailyTime(now time.Time, prefs UserNotificationPrefs) time.Time { loc, err := time.LoadLocation(prefs.Timezone) if err != nil { loc = time.UTC } localNow := now.In(loc) target, ok := parseClock(localNow, prefs.DailySummaryTime) if !ok { target = time.Date(localNow.Year(), localNow.Month(), localNow.Day(), 9, 30, 0, 0, loc) } if localNow.After(target) { target = target.Add(24 * time.Hour) } shifted := applyDND(target.UTC(), prefs) return shifted } func parseClock(base time.Time, hhmm string) (time.Time, bool) { parts := strings.Split(hhmm, ":") if len(parts) != 2 { return time.Time{}, false } h, err1 := strconv.Atoi(parts[0]) m, err2 := strconv.Atoi(parts[1]) if err1 != nil || err2 != nil || h < 0 || h > 23 || m < 0 || m > 59 { return time.Time{}, false } return time.Date(base.Year(), base.Month(), base.Day(), h, m, 0, 0, base.Location()), true } func isValidHHMM(v string) bool { _, ok := parseClock(time.Now(), v) return ok } func normalizeLocale(v string) string { v = strings.TrimSpace(strings.ToLower(v)) if v != "en" { return "zh" } return v } func normalizeEmails(input []string) []string { m := map[string]struct{}{} out := make([]string, 0, len(input)) for _, e := range input { e = strings.TrimSpace(strings.ToLower(e)) if e == "" { continue } if _, err := mail.ParseAddress(e); err != nil { continue } if _, ok := m[e]; ok { continue } m[e] = struct{}{} out = append(out, e) } sort.Strings(out) return out } func parseAnyInt64Slice(v any) []int64 { out := make([]int64, 0) switch items := v.(type) { case []any: for _, item := range items { switch n := item.(type) { case float64: out = append(out, int64(n)) case int64: out = append(out, n) case int: out = append(out, int64(n)) } } case []int64: out = append(out, items...) } return dedupeInt64(out) } func dedupeInt64(in []int64) []int64 { if len(in) == 0 { return in } m := make(map[int64]struct{}, len(in)) out := make([]int64, 0, len(in)) for _, id := range in { if id <= 0 { continue } if _, ok := m[id]; ok { continue } m[id] = struct{}{} out = append(out, id) } sort.Slice(out, func(i, j int) bool { return out[i] < out[j] }) return out } func idempotencyKey(eventID, templateID string, recipients []string) string { h := sha1.New() _, _ = h.Write([]byte(eventID)) _, _ = h.Write([]byte("|" + templateID + "|" + strings.Join(recipients, ","))) return hex.EncodeToString(h.Sum(nil)) } func randomID(prefix string) string { buf := make([]byte, 12) if _, err := rand.Read(buf); err != nil { n := time.Now().UnixNano() return fmt.Sprintf("%s-%d", prefix, n) } return fmt.Sprintf("%s-%s", prefix, hex.EncodeToString(buf)) } func isRetryableError(code string) bool { switch code { case "SMTP_TIMEOUT", "SMTP_RATE_LIMIT", "SMTP_TEMP_NETWORK", "SMTP_4XX", "SMTP_SEND_FAILED": return true default: return false } } func truncate(v string, n int) string { if len(v) <= n { return v } return v[:n] } func mapLocale(locale, zh, en string) string { if locale == "en" { return en } return zh } func anyString(v any) string { if s, ok := v.(string); ok { return s } return "" } type SendError struct { Code string Err error } func (e *SendError) Error() string { if e == nil || e.Err == nil { return "" } return e.Err.Error() } type EmailAdapter struct { cfg Config } func NewEmailAdapter(cfg Config) *EmailAdapter { return &EmailAdapter{cfg: cfg} } func (a *EmailAdapter) Send(to []string, subject, body string) *SendError { if len(to) == 0 { return &SendError{Code: "SMTP_INVALID_ADDRESS", Err: fmt.Errorf("empty recipient")} } for _, item := range to { if _, err := mail.ParseAddress(item); err != nil { return &SendError{Code: "SMTP_INVALID_ADDRESS", Err: fmt.Errorf("invalid recipient %q", item)} } } if a.cfg.SMTPHost == "" || a.cfg.SMTPUser == "" || a.cfg.SMTPPass == "" || a.cfg.SMTPFrom == "" { return &SendError{Code: "SMTP_CONFIG", Err: fmt.Errorf("smtp config incomplete")} } addr := net.JoinHostPort(a.cfg.SMTPHost, a.cfg.SMTPPort) msg := buildMail(a.cfg.SMTPFrom, to, subject, body) conn, err := net.DialTimeout("tcp", addr, 8*time.Second) if err != nil { return &SendError{Code: "SMTP_TEMP_NETWORK", Err: err} } defer conn.Close() if a.cfg.SMTPUseTLS { tlsConn := tls.Client(conn, &tls.Config{ServerName: a.cfg.SMTPHost}) if err := tlsConn.Handshake(); err != nil { return &SendError{Code: "SMTP_TIMEOUT", Err: err} } conn = tlsConn } client, err := smtp.NewClient(conn, a.cfg.SMTPHost) if err != nil { return &SendError{Code: "SMTP_SEND_FAILED", Err: err} } defer client.Close() if err := client.Auth(smtp.PlainAuth("", a.cfg.SMTPUser, a.cfg.SMTPPass, a.cfg.SMTPHost)); err != nil { return &SendError{Code: "SMTP_SEND_FAILED", Err: err} } if err := client.Mail(a.cfg.SMTPFrom); err != nil { return classifySMTPError(err) } for _, recipient := range to { if err := client.Rcpt(recipient); err != nil { return classifySMTPError(err) } } writer, err := client.Data() if err != nil { return classifySMTPError(err) } if _, err := writer.Write([]byte(msg)); err != nil { _ = writer.Close() return classifySMTPError(err) } if err := writer.Close(); err != nil { return classifySMTPError(err) } if err := client.Quit(); err != nil { return classifySMTPError(err) } return nil } func buildMail(from string, to []string, subject, body string) string { headers := []string{ "From: " + from, "To: " + strings.Join(to, ", "), "Subject: " + mimeEncodeHeader(subject), "MIME-Version: 1.0", "Content-Type: text/plain; charset=UTF-8", "", body, } return strings.Join(headers, "\r\n") } func mimeEncodeHeader(v string) string { if isASCII(v) { return v } return "=?UTF-8?B?" + b64Encode(v) + "?=" } func isASCII(v string) bool { for i := 0; i < len(v); i++ { if v[i] > 127 { return false } } return true } func b64Encode(v string) string { return strings.TrimRight(base64Encoding.EncodeToString([]byte(v)), "=") } var base64Encoding = base64NoPadding{} type base64NoPadding struct{} func (base64NoPadding) EncodeToString(src []byte) string { const table = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/" if len(src) == 0 { return "" } var out strings.Builder for i := 0; i < len(src); i += 3 { var b0, b1, b2 byte b0 = src[i] if i+1 < len(src) { b1 = src[i+1] } if i+2 < len(src) { b2 = src[i+2] } out.WriteByte(table[b0>>2]) out.WriteByte(table[((b0&0x03)<<4)|(b1>>4)]) if i+1 < len(src) { out.WriteByte(table[((b1&0x0f)<<2)|(b2>>6)]) } if i+2 < len(src) { out.WriteByte(table[b2&0x3f]) } } return out.String() } func classifySMTPError(err error) *SendError { if err == nil { return nil } msg := strings.ToLower(err.Error()) if strings.Contains(msg, "timeout") || strings.Contains(msg, "i/o timeout") { return &SendError{Code: "SMTP_TIMEOUT", Err: err} } if strings.Contains(msg, " 4") || strings.HasPrefix(msg, "4") { return &SendError{Code: "SMTP_4XX", Err: err} } if strings.Contains(msg, "rate") || strings.Contains(msg, "limit") { return &SendError{Code: "SMTP_RATE_LIMIT", Err: err} } if strings.Contains(msg, "invalid") || strings.Contains(msg, "mailbox unavailable") || strings.Contains(msg, "user unknown") { return &SendError{Code: "SMTP_INVALID_ADDRESS", Err: err} } var netErr net.Error if errors.As(err, &netErr) { return &SendError{Code: "SMTP_TEMP_NETWORK", Err: err} } return &SendError{Code: "SMTP_SEND_FAILED", Err: err} } func splitCSV(value string) []string { value = strings.TrimSpace(value) if value == "" { return nil } items := strings.Split(value, ",") out := make([]string, 0, len(items)) for _, item := range items { item = strings.TrimSpace(item) if item != "" { out = append(out, item) } } return out } func parseBoolEnv(name string, fallback bool) bool { v := strings.TrimSpace(os.Getenv(name)) if v == "" { return fallback } b, err := strconv.ParseBool(v) if err != nil { return fallback } return b } func parseIntEnv(name string, fallback int) int { v := strings.TrimSpace(os.Getenv(name)) if v == "" { return fallback } n, err := strconv.Atoi(v) if err != nil { return fallback } return n } func getenv(name, fallback string) string { v := strings.TrimSpace(os.Getenv(name)) if v == "" { return fallback } return v } func parseRangeTime(fromStr, toStr string) (time.Time, time.Time, error) { var from, to time.Time var err error if strings.TrimSpace(fromStr) != "" { from, err = time.Parse(time.RFC3339, fromStr) if err != nil { return time.Time{}, time.Time{}, err } } if strings.TrimSpace(toStr) != "" { to, err = time.Parse(time.RFC3339, toStr) if err != nil { return time.Time{}, time.Time{}, err } } return from, to, nil } func NullStringFromString(v string) sql.NullString { v = strings.TrimSpace(v) if v == "" { return sql.NullString{} } return sql.NullString{String: v, Valid: true} }