1595 lines
44 KiB
Go
1595 lines
44 KiB
Go
package notification
|
||
|
||
import (
|
||
"context"
|
||
"crypto/rand"
|
||
"crypto/sha1"
|
||
"crypto/tls"
|
||
"database/sql"
|
||
"encoding/hex"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"log"
|
||
"math"
|
||
"net"
|
||
"net/mail"
|
||
"net/smtp"
|
||
"os"
|
||
"sort"
|
||
"strconv"
|
||
"strings"
|
||
"sync"
|
||
"text/template"
|
||
"time"
|
||
|
||
"github.com/jackc/pgx/v5"
|
||
"github.com/jackc/pgx/v5/pgxpool"
|
||
"github.com/segmentio/kafka-go"
|
||
)
|
||
|
||
const (
|
||
statusPending = "pending"
|
||
statusSent = "sent"
|
||
statusFailed = "failed"
|
||
statusDead = "dead"
|
||
)
|
||
|
||
type Config struct {
|
||
Enabled bool
|
||
KafkaBrokers []string
|
||
NotifyTopic string
|
||
DispatchBatch int
|
||
SchedulerTick time.Duration
|
||
MaxRetry int
|
||
BackoffBase time.Duration
|
||
DispatchLease time.Duration
|
||
DueSoonWindow time.Duration
|
||
OverdueThrottle time.Duration
|
||
AlertFailureThreshold float64
|
||
AlertPendingThreshold int
|
||
AlertDLQThreshold int
|
||
ConsumerGroup string
|
||
SMTPHost string
|
||
SMTPPort string
|
||
SMTPUser string
|
||
SMTPPass string
|
||
SMTPFrom string
|
||
SMTPUseTLS bool
|
||
DailySummaryDefaultLoc string
|
||
}
|
||
|
||
func LoadConfigFromEnv() Config {
|
||
cfg := Config{
|
||
Enabled: parseBoolEnv("NOTIFY_ENABLED", true),
|
||
KafkaBrokers: splitCSV(os.Getenv("KAFKA_BROKERS")),
|
||
NotifyTopic: getenv("NOTIFY_TOPIC", "notification.jobs"),
|
||
DispatchBatch: parseIntEnv("NOTIFY_DISPATCH_BATCH", 200),
|
||
SchedulerTick: time.Duration(parseIntEnv("NOTIFY_SCHED_TICK_SECONDS", 60)) * time.Second,
|
||
MaxRetry: parseIntEnv("NOTIFY_MAX_RETRY", 5),
|
||
BackoffBase: time.Duration(parseIntEnv("NOTIFY_BACKOFF_BASE_SECONDS", 30)) * time.Second,
|
||
DispatchLease: 10 * time.Second,
|
||
DueSoonWindow: 60 * time.Minute,
|
||
OverdueThrottle: 30 * time.Minute,
|
||
AlertFailureThreshold: 0.2,
|
||
AlertPendingThreshold: 1000,
|
||
AlertDLQThreshold: 50,
|
||
ConsumerGroup: getenv("NOTIFY_CONSUMER_GROUP", "supertodo-notification-worker"),
|
||
SMTPHost: strings.TrimSpace(os.Getenv("SMTP_HOST")),
|
||
SMTPPort: getenv("SMTP_PORT", "465"),
|
||
SMTPUser: strings.TrimSpace(os.Getenv("SMTP_USER")),
|
||
SMTPPass: strings.TrimSpace(os.Getenv("SMTP_PASS")),
|
||
SMTPFrom: strings.TrimSpace(os.Getenv("SMTP_FROM")),
|
||
SMTPUseTLS: parseBoolEnv("SMTP_USE_TLS", true),
|
||
DailySummaryDefaultLoc: getenv("NOTIFY_DEFAULT_LOCALE", "zh"),
|
||
}
|
||
if cfg.SMTPFrom == "" {
|
||
cfg.SMTPFrom = cfg.SMTPUser
|
||
}
|
||
if cfg.DispatchBatch <= 0 {
|
||
cfg.DispatchBatch = 200
|
||
}
|
||
if cfg.SchedulerTick <= 0 {
|
||
cfg.SchedulerTick = time.Minute
|
||
}
|
||
if cfg.MaxRetry <= 0 {
|
||
cfg.MaxRetry = 5
|
||
}
|
||
if cfg.BackoffBase <= 0 {
|
||
cfg.BackoffBase = 30 * time.Second
|
||
}
|
||
return cfg
|
||
}
|
||
|
||
type Service struct {
|
||
pool *pgxpool.Pool
|
||
cfg Config
|
||
writer *kafka.Writer
|
||
reader *kafka.Reader
|
||
email *EmailAdapter
|
||
|
||
wg sync.WaitGroup
|
||
}
|
||
|
||
func NewService(pool *pgxpool.Pool, cfg Config) *Service {
|
||
s := &Service{pool: pool, cfg: cfg}
|
||
if cfg.Enabled && len(cfg.KafkaBrokers) > 0 {
|
||
s.writer = &kafka.Writer{
|
||
Addr: kafka.TCP(cfg.KafkaBrokers...),
|
||
Topic: cfg.NotifyTopic,
|
||
Balancer: &kafka.LeastBytes{},
|
||
}
|
||
s.reader = kafka.NewReader(kafka.ReaderConfig{
|
||
Brokers: cfg.KafkaBrokers,
|
||
Topic: cfg.NotifyTopic,
|
||
GroupID: cfg.ConsumerGroup,
|
||
MinBytes: 1,
|
||
MaxBytes: 10e6,
|
||
})
|
||
}
|
||
s.email = NewEmailAdapter(cfg)
|
||
return s
|
||
}
|
||
|
||
func (s *Service) Close() {
|
||
if s.writer != nil {
|
||
_ = s.writer.Close()
|
||
}
|
||
if s.reader != nil {
|
||
_ = s.reader.Close()
|
||
}
|
||
s.wg.Wait()
|
||
}
|
||
|
||
func (s *Service) Start(ctx context.Context) {
|
||
if !s.cfg.Enabled {
|
||
log.Println("notification service disabled")
|
||
return
|
||
}
|
||
|
||
s.wg.Add(1)
|
||
go func() {
|
||
defer s.wg.Done()
|
||
s.runDueScheduler(ctx)
|
||
}()
|
||
|
||
s.wg.Add(1)
|
||
go func() {
|
||
defer s.wg.Done()
|
||
s.runDailySummaryScheduler(ctx)
|
||
}()
|
||
|
||
s.wg.Add(1)
|
||
go func() {
|
||
defer s.wg.Done()
|
||
s.runJobDispatcher(ctx)
|
||
}()
|
||
|
||
s.wg.Add(1)
|
||
go func() {
|
||
defer s.wg.Done()
|
||
s.runWorker(ctx)
|
||
}()
|
||
|
||
s.wg.Add(1)
|
||
go func() {
|
||
defer s.wg.Done()
|
||
s.runAlertChecker(ctx)
|
||
}()
|
||
}
|
||
|
||
type UserNotificationPrefs struct {
|
||
Subscribed bool `json:"subscribed"`
|
||
DNDStart string `json:"dnd_start"`
|
||
DNDEnd string `json:"dnd_end"`
|
||
Locale string `json:"locale"`
|
||
Timezone string `json:"timezone"`
|
||
DailySummaryEnabled bool `json:"daily_summary_enabled"`
|
||
DailySummaryTime string `json:"daily_summary_time"`
|
||
}
|
||
|
||
type NotificationGroup struct {
|
||
ID int64 `json:"id"`
|
||
Name string `json:"name"`
|
||
Emails []string `json:"emails"`
|
||
}
|
||
|
||
type DLQItem struct {
|
||
JobID string `json:"job_id"`
|
||
TemplateID string `json:"template_id"`
|
||
ToEmails []string `json:"to_emails"`
|
||
Reason string `json:"reason"`
|
||
FailedAt string `json:"failed_at"`
|
||
RetryCount int `json:"retry_count"`
|
||
TraceID string `json:"trace_id"`
|
||
ErrorCode string `json:"error_code"`
|
||
}
|
||
|
||
type FailureReason struct {
|
||
Code string `json:"code"`
|
||
Count int64 `json:"count"`
|
||
}
|
||
|
||
type Metrics struct {
|
||
From string `json:"from"`
|
||
To string `json:"to"`
|
||
SentCount int64 `json:"sent_count"`
|
||
FailedCount int64 `json:"failed_count"`
|
||
DeliveryRate float64 `json:"delivery_rate"`
|
||
AvgRetryCount float64 `json:"avg_retry_count"`
|
||
P95LatencyMS float64 `json:"p95_latency_ms"`
|
||
PendingBacklog int64 `json:"pending_backlog"`
|
||
FailureReasons []FailureReason `json:"failure_reasons"`
|
||
DLQInLast10Mins int64 `json:"dlq_last_10m"`
|
||
}
|
||
|
||
func (s *Service) GetPrefs(ctx context.Context, userID int64) (UserNotificationPrefs, error) {
|
||
if err := s.ensureUserPrefs(ctx, userID); err != nil {
|
||
return UserNotificationPrefs{}, err
|
||
}
|
||
var prefs UserNotificationPrefs
|
||
err := s.pool.QueryRow(ctx, `
|
||
SELECT subscribed,
|
||
COALESCE(to_char(dnd_start, 'HH24:MI'), ''),
|
||
COALESCE(to_char(dnd_end, 'HH24:MI'), ''),
|
||
locale,
|
||
timezone,
|
||
daily_summary_enabled,
|
||
COALESCE(to_char(daily_summary_time, 'HH24:MI'), '09:30')
|
||
FROM user_notification_prefs
|
||
WHERE user_id = $1`, userID,
|
||
).Scan(
|
||
&prefs.Subscribed,
|
||
&prefs.DNDStart,
|
||
&prefs.DNDEnd,
|
||
&prefs.Locale,
|
||
&prefs.Timezone,
|
||
&prefs.DailySummaryEnabled,
|
||
&prefs.DailySummaryTime,
|
||
)
|
||
if err != nil {
|
||
return UserNotificationPrefs{}, err
|
||
}
|
||
if prefs.Locale == "" {
|
||
prefs.Locale = "zh"
|
||
}
|
||
if prefs.Timezone == "" {
|
||
prefs.Timezone = "Asia/Shanghai"
|
||
}
|
||
if prefs.DailySummaryTime == "" {
|
||
prefs.DailySummaryTime = "09:30"
|
||
}
|
||
return prefs, nil
|
||
}
|
||
|
||
func (s *Service) UpdatePrefs(ctx context.Context, userID int64, input UserNotificationPrefs) (UserNotificationPrefs, error) {
|
||
if err := s.ensureUserPrefs(ctx, userID); err != nil {
|
||
return UserNotificationPrefs{}, err
|
||
}
|
||
input.Locale = normalizeLocale(input.Locale)
|
||
input.Timezone = strings.TrimSpace(input.Timezone)
|
||
if input.Timezone == "" {
|
||
input.Timezone = "Asia/Shanghai"
|
||
}
|
||
if _, err := time.LoadLocation(input.Timezone); err != nil {
|
||
return UserNotificationPrefs{}, fmt.Errorf("invalid timezone")
|
||
}
|
||
if input.DailySummaryTime == "" {
|
||
input.DailySummaryTime = "09:30"
|
||
}
|
||
if !isValidHHMM(input.DailySummaryTime) {
|
||
return UserNotificationPrefs{}, fmt.Errorf("invalid daily_summary_time")
|
||
}
|
||
if input.DNDStart != "" && !isValidHHMM(input.DNDStart) {
|
||
return UserNotificationPrefs{}, fmt.Errorf("invalid dnd_start")
|
||
}
|
||
if input.DNDEnd != "" && !isValidHHMM(input.DNDEnd) {
|
||
return UserNotificationPrefs{}, fmt.Errorf("invalid dnd_end")
|
||
}
|
||
|
||
_, err := s.pool.Exec(ctx, `
|
||
UPDATE user_notification_prefs
|
||
SET subscribed = $2,
|
||
dnd_start = NULLIF($3, '')::time,
|
||
dnd_end = NULLIF($4, '')::time,
|
||
locale = $5,
|
||
timezone = $6,
|
||
daily_summary_enabled = $7,
|
||
daily_summary_time = NULLIF($8, '')::time,
|
||
updated_at = now()
|
||
WHERE user_id = $1`,
|
||
userID,
|
||
input.Subscribed,
|
||
input.DNDStart,
|
||
input.DNDEnd,
|
||
input.Locale,
|
||
input.Timezone,
|
||
input.DailySummaryEnabled,
|
||
input.DailySummaryTime,
|
||
)
|
||
if err != nil {
|
||
return UserNotificationPrefs{}, err
|
||
}
|
||
return s.GetPrefs(ctx, userID)
|
||
}
|
||
|
||
func (s *Service) ListGroups(ctx context.Context, userID int64) ([]NotificationGroup, error) {
|
||
rows, err := s.pool.Query(ctx, `SELECT id, name, emails FROM notification_groups WHERE user_id = $1 ORDER BY id DESC`, userID)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer rows.Close()
|
||
out := make([]NotificationGroup, 0)
|
||
for rows.Next() {
|
||
var item NotificationGroup
|
||
if err := rows.Scan(&item.ID, &item.Name, &item.Emails); err != nil {
|
||
return nil, err
|
||
}
|
||
out = append(out, item)
|
||
}
|
||
return out, rows.Err()
|
||
}
|
||
|
||
func (s *Service) CreateGroup(ctx context.Context, userID int64, group NotificationGroup) (NotificationGroup, error) {
|
||
group.Name = strings.TrimSpace(group.Name)
|
||
group.Emails = normalizeEmails(group.Emails)
|
||
if group.Name == "" {
|
||
return NotificationGroup{}, fmt.Errorf("group name required")
|
||
}
|
||
if len(group.Emails) == 0 {
|
||
return NotificationGroup{}, fmt.Errorf("emails required")
|
||
}
|
||
err := s.pool.QueryRow(ctx, `
|
||
INSERT INTO notification_groups (user_id, name, emails, created_at, updated_at)
|
||
VALUES ($1, $2, $3, now(), now())
|
||
RETURNING id, name, emails`, userID, group.Name, group.Emails,
|
||
).Scan(&group.ID, &group.Name, &group.Emails)
|
||
if err != nil {
|
||
return NotificationGroup{}, err
|
||
}
|
||
return group, nil
|
||
}
|
||
|
||
func (s *Service) UpdateGroup(ctx context.Context, userID, id int64, group NotificationGroup) (NotificationGroup, error) {
|
||
group.Name = strings.TrimSpace(group.Name)
|
||
group.Emails = normalizeEmails(group.Emails)
|
||
if group.Name == "" {
|
||
return NotificationGroup{}, fmt.Errorf("group name required")
|
||
}
|
||
if len(group.Emails) == 0 {
|
||
return NotificationGroup{}, fmt.Errorf("emails required")
|
||
}
|
||
err := s.pool.QueryRow(ctx, `
|
||
UPDATE notification_groups
|
||
SET name = $3, emails = $4, updated_at = now()
|
||
WHERE id = $1 AND user_id = $2
|
||
RETURNING id, name, emails`, id, userID, group.Name, group.Emails,
|
||
).Scan(&group.ID, &group.Name, &group.Emails)
|
||
if err != nil {
|
||
return NotificationGroup{}, err
|
||
}
|
||
return group, nil
|
||
}
|
||
|
||
func (s *Service) DeleteGroup(ctx context.Context, userID, id int64) error {
|
||
res, err := s.pool.Exec(ctx, `DELETE FROM notification_groups WHERE id = $1 AND user_id = $2`, id, userID)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if res.RowsAffected() == 0 {
|
||
return pgx.ErrNoRows
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (s *Service) ListDLQ(ctx context.Context, userID int64, page, pageSize int) ([]DLQItem, error) {
|
||
if page < 1 {
|
||
page = 1
|
||
}
|
||
if pageSize <= 0 || pageSize > 200 {
|
||
pageSize = 20
|
||
}
|
||
offset := (page - 1) * pageSize
|
||
rows, err := s.pool.Query(ctx, `
|
||
SELECT j.job_id, j.template_id, j.to_emails, d.reason, d.failed_at, j.retry_count, j.trace_id, COALESCE(j.last_error_code, '')
|
||
FROM notification_dlq d
|
||
JOIN notification_jobs j ON j.job_id = d.job_id
|
||
WHERE j.user_id = $1
|
||
ORDER BY d.failed_at DESC
|
||
LIMIT $2 OFFSET $3`, userID, pageSize, offset,
|
||
)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer rows.Close()
|
||
out := make([]DLQItem, 0)
|
||
for rows.Next() {
|
||
var item DLQItem
|
||
var failedAt time.Time
|
||
if err := rows.Scan(&item.JobID, &item.TemplateID, &item.ToEmails, &item.Reason, &failedAt, &item.RetryCount, &item.TraceID, &item.ErrorCode); err != nil {
|
||
return nil, err
|
||
}
|
||
item.FailedAt = failedAt.UTC().Format(time.RFC3339)
|
||
out = append(out, item)
|
||
}
|
||
return out, rows.Err()
|
||
}
|
||
|
||
func (s *Service) GetMetrics(ctx context.Context, from, to time.Time) (Metrics, error) {
|
||
if from.IsZero() {
|
||
from = time.Now().Add(-24 * time.Hour)
|
||
}
|
||
if to.IsZero() {
|
||
to = time.Now()
|
||
}
|
||
out := Metrics{From: from.UTC().Format(time.RFC3339), To: to.UTC().Format(time.RFC3339)}
|
||
|
||
_ = s.pool.QueryRow(ctx, `SELECT count(*) FROM notification_jobs WHERE status = 'sent' AND sent_at BETWEEN $1 AND $2`, from, to).Scan(&out.SentCount)
|
||
_ = s.pool.QueryRow(ctx, `SELECT count(*) FROM notification_jobs WHERE status IN ('failed','dead') AND updated_at BETWEEN $1 AND $2`, from, to).Scan(&out.FailedCount)
|
||
_ = s.pool.QueryRow(ctx, `SELECT COALESCE(avg(retry_count), 0) FROM notification_jobs WHERE created_at BETWEEN $1 AND $2`, from, to).Scan(&out.AvgRetryCount)
|
||
_ = s.pool.QueryRow(ctx, `SELECT COALESCE(percentile_cont(0.95) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM (sent_at - scheduled_at))*1000), 0) FROM notification_jobs WHERE status='sent' AND sent_at BETWEEN $1 AND $2`, from, to).Scan(&out.P95LatencyMS)
|
||
_ = s.pool.QueryRow(ctx, `SELECT count(*) FROM notification_jobs WHERE status='pending'`).Scan(&out.PendingBacklog)
|
||
_ = s.pool.QueryRow(ctx, `SELECT count(*) FROM notification_dlq WHERE failed_at >= now() - interval '10 minutes'`).Scan(&out.DLQInLast10Mins)
|
||
|
||
total := out.SentCount + out.FailedCount
|
||
if total > 0 {
|
||
out.DeliveryRate = float64(out.SentCount) / float64(total)
|
||
}
|
||
|
||
rows, err := s.pool.Query(ctx, `
|
||
SELECT COALESCE(last_error_code,''), count(*)
|
||
FROM notification_jobs
|
||
WHERE status IN ('failed','dead') AND updated_at BETWEEN $1 AND $2
|
||
GROUP BY COALESCE(last_error_code,'')
|
||
ORDER BY count(*) DESC
|
||
LIMIT 10`, from, to,
|
||
)
|
||
if err == nil {
|
||
defer rows.Close()
|
||
for rows.Next() {
|
||
var item FailureReason
|
||
if err := rows.Scan(&item.Code, &item.Count); err == nil {
|
||
out.FailureReasons = append(out.FailureReasons, item)
|
||
}
|
||
}
|
||
}
|
||
return out, nil
|
||
}
|
||
|
||
type TaskSnapshot struct {
|
||
ID int64
|
||
UserID int64
|
||
Title string
|
||
Description string
|
||
Status string
|
||
DueAt string
|
||
Priority int
|
||
Tags []string
|
||
NotifyGroupIDs []int64
|
||
}
|
||
|
||
func (s *Service) PublishTaskEvent(ctx context.Context, eventType string, task TaskSnapshot) error {
|
||
eventID := randomID("evt")
|
||
traceID := randomID("trace")
|
||
if eventType == "task.due_soon" {
|
||
eventID = fmt.Sprintf("task-due-soon:%d:%s", task.ID, task.DueAt)
|
||
traceID = eventID
|
||
}
|
||
if eventType == "task.overdue" {
|
||
eventID = fmt.Sprintf("task-overdue:%d:%s", task.ID, task.DueAt)
|
||
traceID = eventID
|
||
}
|
||
payload := map[string]any{
|
||
"task_id": task.ID,
|
||
"title": task.Title,
|
||
"description": task.Description,
|
||
"status": task.Status,
|
||
"due_at": task.DueAt,
|
||
"priority": task.Priority,
|
||
"tags": task.Tags,
|
||
"notify_group_ids": task.NotifyGroupIDs,
|
||
}
|
||
inserted, err := s.insertEvent(ctx, eventID, eventType, task.UserID, traceID, payload)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if !inserted {
|
||
return nil
|
||
}
|
||
return s.evaluateAndCreateJob(ctx, eventID, eventType, traceID, task.UserID, payload)
|
||
}
|
||
|
||
func (s *Service) PublishDailySummaryEvent(ctx context.Context, userID int64, localDate string, payload map[string]any) error {
|
||
eventID := fmt.Sprintf("daily-summary:%d:%s", userID, localDate)
|
||
traceID := eventID
|
||
inserted, err := s.insertEvent(ctx, eventID, "task.daily_summary", userID, traceID, payload)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if !inserted {
|
||
return nil
|
||
}
|
||
return s.evaluateAndCreateJob(ctx, eventID, "task.daily_summary", traceID, userID, payload)
|
||
}
|
||
|
||
func (s *Service) insertEvent(ctx context.Context, eventID, eventType string, userID int64, traceID string, payload map[string]any) (bool, error) {
|
||
data, _ := json.Marshal(payload)
|
||
res, err := s.pool.Exec(ctx, `
|
||
INSERT INTO notification_events (event_id, event_type, user_id, occurred_at, payload, trace_id, created_at)
|
||
VALUES ($1, $2, $3, now(), $4, $5, now())
|
||
ON CONFLICT (event_id) DO NOTHING`, eventID, eventType, userID, data, traceID,
|
||
)
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
return res.RowsAffected() > 0, nil
|
||
}
|
||
|
||
func (s *Service) evaluateAndCreateJob(ctx context.Context, eventID, eventType, traceID string, userID int64, payload map[string]any) error {
|
||
prefs, err := s.GetPrefs(ctx, userID)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if !prefs.Subscribed {
|
||
return nil
|
||
}
|
||
|
||
templateID := mapTemplateID(eventType)
|
||
if templateID == "" {
|
||
return nil
|
||
}
|
||
|
||
locale := normalizeLocale(prefs.Locale)
|
||
tz := prefs.Timezone
|
||
if tz == "" {
|
||
tz = "Asia/Shanghai"
|
||
}
|
||
|
||
var recipients []string
|
||
if eventType == "task.daily_summary" {
|
||
email, err := s.getUserEmail(ctx, userID)
|
||
if err != nil {
|
||
return nil
|
||
}
|
||
recipients = []string{email}
|
||
} else {
|
||
recipients, err = s.resolveRecipientsByPayload(ctx, userID, payload)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
}
|
||
recipients = normalizeEmails(recipients)
|
||
if len(recipients) == 0 {
|
||
return nil
|
||
}
|
||
|
||
now := time.Now().UTC()
|
||
scheduledAt := now
|
||
if eventType == "task.daily_summary" {
|
||
scheduledAt = s.nextDailyTime(now, prefs)
|
||
} else {
|
||
scheduledAt = applyDND(now, prefs)
|
||
}
|
||
if scheduledAt.Before(now) {
|
||
scheduledAt = now
|
||
}
|
||
|
||
params := map[string]any{
|
||
"event_type": eventType,
|
||
"locale": locale,
|
||
"timezone": tz,
|
||
"payload": payload,
|
||
}
|
||
|
||
jobID := randomID("job")
|
||
idem := idempotencyKey(eventID, templateID, recipients)
|
||
paramsJSON, _ := json.Marshal(params)
|
||
|
||
_, err = s.pool.Exec(ctx, `
|
||
INSERT INTO notification_jobs (
|
||
job_id, event_id, trace_id, user_id, channel, to_emails, template_id, params,
|
||
idempotency_key, status, scheduled_at, available_at, retry_count, max_retry, created_at, updated_at
|
||
)
|
||
VALUES ($1, $2, $3, $4, 'email', $5, $6, $7, $8, 'pending', $9, $9, 0, $10, now(), now())
|
||
ON CONFLICT (idempotency_key) DO NOTHING`,
|
||
jobID, eventID, traceID, userID, recipients, templateID, paramsJSON, idem, scheduledAt, s.cfg.MaxRetry,
|
||
)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (s *Service) resolveRecipientsByPayload(ctx context.Context, userID int64, payload map[string]any) ([]string, error) {
|
||
rawIDs, ok := payload["notify_group_ids"]
|
||
if !ok {
|
||
return nil, nil
|
||
}
|
||
groupIDs := parseAnyInt64Slice(rawIDs)
|
||
if len(groupIDs) == 0 {
|
||
return nil, nil
|
||
}
|
||
rows, err := s.pool.Query(ctx, `SELECT emails FROM notification_groups WHERE user_id = $1 AND id = ANY($2)`, userID, groupIDs)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer rows.Close()
|
||
emails := make([]string, 0)
|
||
for rows.Next() {
|
||
var list []string
|
||
if err := rows.Scan(&list); err != nil {
|
||
return nil, err
|
||
}
|
||
emails = append(emails, list...)
|
||
}
|
||
return normalizeEmails(emails), rows.Err()
|
||
}
|
||
|
||
func (s *Service) getUserEmail(ctx context.Context, userID int64) (string, error) {
|
||
var email string
|
||
err := s.pool.QueryRow(ctx, `SELECT email FROM users WHERE id = $1`, userID).Scan(&email)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
return strings.TrimSpace(strings.ToLower(email)), nil
|
||
}
|
||
|
||
func (s *Service) ensureUserPrefs(ctx context.Context, userID int64) error {
|
||
_, err := s.pool.Exec(ctx, `
|
||
INSERT INTO user_notification_prefs (
|
||
user_id, subscribed, locale, timezone, daily_summary_enabled, daily_summary_time, updated_at
|
||
)
|
||
VALUES ($1, true, 'zh', 'Asia/Shanghai', true, '09:30:00', now())
|
||
ON CONFLICT (user_id) DO NOTHING`, userID,
|
||
)
|
||
return err
|
||
}
|
||
|
||
func (s *Service) runDueScheduler(ctx context.Context) {
|
||
ticker := time.NewTicker(s.cfg.SchedulerTick)
|
||
defer ticker.Stop()
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case <-ticker.C:
|
||
s.scheduleDueEvents(ctx)
|
||
}
|
||
}
|
||
}
|
||
|
||
func (s *Service) scheduleDueEvents(ctx context.Context) {
|
||
rows, err := s.pool.Query(ctx, `
|
||
SELECT id, user_id, title, description, status, due_at, priority, tags, notify_group_ids
|
||
FROM tasks
|
||
WHERE status <> 'done' AND due_at IS NOT NULL AND due_at <> ''`,
|
||
)
|
||
if err != nil {
|
||
log.Printf("notification due scheduler query failed: %v", err)
|
||
return
|
||
}
|
||
defer rows.Close()
|
||
now := time.Now().UTC()
|
||
for rows.Next() {
|
||
var task TaskSnapshot
|
||
if err := rows.Scan(&task.ID, &task.UserID, &task.Title, &task.Description, &task.Status, &task.DueAt, &task.Priority, &task.Tags, &task.NotifyGroupIDs); err != nil {
|
||
continue
|
||
}
|
||
dueAt, err := time.Parse(time.RFC3339, task.DueAt)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
if dueAt.After(now) && dueAt.Before(now.Add(s.cfg.DueSoonWindow)) {
|
||
if s.hasExistingTaskEvent(ctx, task.ID, "task.due_soon", task.DueAt) {
|
||
continue
|
||
}
|
||
_ = s.PublishTaskEvent(ctx, "task.due_soon", task)
|
||
}
|
||
if dueAt.Before(now) {
|
||
if s.hasExistingTaskEvent(ctx, task.ID, "task.overdue", task.DueAt) {
|
||
continue
|
||
}
|
||
_ = s.PublishTaskEvent(ctx, "task.overdue", task)
|
||
}
|
||
}
|
||
}
|
||
|
||
func (s *Service) hasExistingTaskEvent(ctx context.Context, taskID int64, eventType, dueAt string) bool {
|
||
var exists bool
|
||
err := s.pool.QueryRow(ctx, `
|
||
SELECT EXISTS (
|
||
SELECT 1
|
||
FROM notification_events
|
||
WHERE event_type = $1
|
||
AND payload->>'task_id' = $2
|
||
AND payload->>'due_at' = $3
|
||
)`,
|
||
eventType,
|
||
strconv.FormatInt(taskID, 10),
|
||
dueAt,
|
||
).Scan(&exists)
|
||
if err != nil {
|
||
return false
|
||
}
|
||
return exists
|
||
}
|
||
|
||
func (s *Service) runDailySummaryScheduler(ctx context.Context) {
|
||
ticker := time.NewTicker(s.cfg.SchedulerTick)
|
||
defer ticker.Stop()
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case <-ticker.C:
|
||
s.scheduleDailySummary(ctx)
|
||
}
|
||
}
|
||
}
|
||
|
||
func (s *Service) scheduleDailySummary(ctx context.Context) {
|
||
rows, err := s.pool.Query(ctx, `
|
||
SELECT user_id,
|
||
COALESCE(timezone, 'Asia/Shanghai'),
|
||
COALESCE(locale, 'zh'),
|
||
COALESCE(to_char(daily_summary_time, 'HH24:MI'), '09:30'),
|
||
subscribed,
|
||
daily_summary_enabled,
|
||
COALESCE(to_char(dnd_start, 'HH24:MI'), ''),
|
||
COALESCE(to_char(dnd_end, 'HH24:MI'), '')
|
||
FROM user_notification_prefs
|
||
WHERE daily_summary_enabled = true`,
|
||
)
|
||
if err != nil {
|
||
log.Printf("daily summary query failed: %v", err)
|
||
return
|
||
}
|
||
defer rows.Close()
|
||
nowUTC := time.Now().UTC()
|
||
for rows.Next() {
|
||
var userID int64
|
||
var timezone, locale, summaryAt, dndStart, dndEnd string
|
||
var subscribed, enabled bool
|
||
if err := rows.Scan(&userID, &timezone, &locale, &summaryAt, &subscribed, &enabled, &dndStart, &dndEnd); err != nil {
|
||
continue
|
||
}
|
||
if !subscribed || !enabled {
|
||
continue
|
||
}
|
||
loc, err := time.LoadLocation(timezone)
|
||
if err != nil {
|
||
loc = time.FixedZone("UTC", 0)
|
||
}
|
||
localNow := nowUTC.In(loc)
|
||
if localNow.Format("15:04") != summaryAt {
|
||
continue
|
||
}
|
||
openCount, overdueCount := s.summaryCounts(ctx, userID, nowUTC)
|
||
payload := map[string]any{
|
||
"open_count": openCount,
|
||
"overdue_count": overdueCount,
|
||
"summary_date": localNow.Format("2006-01-02"),
|
||
"timezone": timezone,
|
||
"locale": locale,
|
||
"dnd_start": dndStart,
|
||
"dnd_end": dndEnd,
|
||
}
|
||
_ = s.PublishDailySummaryEvent(ctx, userID, localNow.Format("2006-01-02"), payload)
|
||
}
|
||
}
|
||
|
||
func (s *Service) summaryCounts(ctx context.Context, userID int64, now time.Time) (int64, int64) {
|
||
var openCount int64
|
||
_ = s.pool.QueryRow(ctx, `SELECT count(*) FROM tasks WHERE user_id = $1 AND status <> 'done'`, userID).Scan(&openCount)
|
||
|
||
var overdueCount int64
|
||
_ = s.pool.QueryRow(ctx, `
|
||
SELECT count(*)
|
||
FROM tasks
|
||
WHERE user_id = $1
|
||
AND status <> 'done'
|
||
AND due_at IS NOT NULL
|
||
AND due_at <> ''
|
||
AND due_at::timestamptz < $2`, userID, now,
|
||
).Scan(&overdueCount)
|
||
return openCount, overdueCount
|
||
}
|
||
|
||
func (s *Service) runJobDispatcher(ctx context.Context) {
|
||
if s.writer == nil {
|
||
log.Println("notification dispatcher disabled: kafka writer not configured")
|
||
return
|
||
}
|
||
ticker := time.NewTicker(time.Second)
|
||
defer ticker.Stop()
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case <-ticker.C:
|
||
s.dispatchPendingJobs(ctx)
|
||
}
|
||
}
|
||
}
|
||
|
||
type jobRecord struct {
|
||
JobID string
|
||
TraceID string
|
||
Channel string
|
||
ToEmails []string
|
||
TemplateID string
|
||
Params map[string]any
|
||
RetryCount int
|
||
IdempotencyKey string
|
||
ScheduledAt time.Time
|
||
}
|
||
|
||
func (s *Service) dispatchPendingJobs(ctx context.Context) {
|
||
jobs, err := s.reservePendingJobs(ctx, s.cfg.DispatchBatch, s.cfg.DispatchLease)
|
||
if err != nil || len(jobs) == 0 {
|
||
return
|
||
}
|
||
messages := make([]kafka.Message, 0, len(jobs))
|
||
for _, job := range jobs {
|
||
data, _ := json.Marshal(map[string]any{
|
||
"job_id": job.JobID,
|
||
"trace_id": job.TraceID,
|
||
"channel": job.Channel,
|
||
"to": job.ToEmails,
|
||
"template_id": job.TemplateID,
|
||
"params": job.Params,
|
||
"retry_count": job.RetryCount,
|
||
"idempotency_key": job.IdempotencyKey,
|
||
"scheduled_at": job.ScheduledAt.UTC().Format(time.RFC3339),
|
||
})
|
||
messages = append(messages, kafka.Message{Key: []byte(job.JobID), Value: data})
|
||
}
|
||
writeCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||
defer cancel()
|
||
if err := s.writer.WriteMessages(writeCtx, messages...); err != nil {
|
||
_ = s.resetReservedJobs(ctx, jobs)
|
||
log.Printf("notification dispatcher publish failed: %v", err)
|
||
}
|
||
}
|
||
|
||
func (s *Service) reservePendingJobs(ctx context.Context, batch int, lease time.Duration) ([]jobRecord, error) {
|
||
tx, err := s.pool.Begin(ctx)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer tx.Rollback(ctx)
|
||
rows, err := tx.Query(ctx, `
|
||
WITH picked AS (
|
||
SELECT job_id
|
||
FROM notification_jobs
|
||
WHERE status = 'pending' AND available_at <= now()
|
||
ORDER BY available_at ASC
|
||
LIMIT $1
|
||
FOR UPDATE SKIP LOCKED
|
||
)
|
||
UPDATE notification_jobs nj
|
||
SET available_at = now() + $2::interval,
|
||
updated_at = now()
|
||
FROM picked
|
||
WHERE nj.job_id = picked.job_id
|
||
RETURNING nj.job_id, nj.trace_id, nj.channel, nj.to_emails, nj.template_id, nj.params, nj.retry_count, nj.idempotency_key, nj.scheduled_at`,
|
||
batch, fmt.Sprintf("%d seconds", int(lease.Seconds())),
|
||
)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer rows.Close()
|
||
jobs := make([]jobRecord, 0)
|
||
for rows.Next() {
|
||
var item jobRecord
|
||
var paramsBytes []byte
|
||
if err := rows.Scan(&item.JobID, &item.TraceID, &item.Channel, &item.ToEmails, &item.TemplateID, ¶msBytes, &item.RetryCount, &item.IdempotencyKey, &item.ScheduledAt); err != nil {
|
||
return nil, err
|
||
}
|
||
_ = json.Unmarshal(paramsBytes, &item.Params)
|
||
jobs = append(jobs, item)
|
||
}
|
||
if err := rows.Err(); err != nil {
|
||
return nil, err
|
||
}
|
||
if err := tx.Commit(ctx); err != nil {
|
||
return nil, err
|
||
}
|
||
return jobs, nil
|
||
}
|
||
|
||
func (s *Service) resetReservedJobs(ctx context.Context, jobs []jobRecord) error {
|
||
ids := make([]string, 0, len(jobs))
|
||
for _, job := range jobs {
|
||
ids = append(ids, job.JobID)
|
||
}
|
||
if len(ids) == 0 {
|
||
return nil
|
||
}
|
||
_, err := s.pool.Exec(ctx, `UPDATE notification_jobs SET available_at = now(), updated_at = now() WHERE job_id = ANY($1)`, ids)
|
||
return err
|
||
}
|
||
|
||
func (s *Service) runWorker(ctx context.Context) {
|
||
if s.reader == nil {
|
||
log.Println("notification worker disabled: kafka reader not configured")
|
||
return
|
||
}
|
||
for {
|
||
msg, err := s.reader.FetchMessage(ctx)
|
||
if err != nil {
|
||
if ctx.Err() != nil {
|
||
return
|
||
}
|
||
log.Printf("notification worker fetch failed: %v", err)
|
||
continue
|
||
}
|
||
if err := s.handleMessage(ctx, msg); err != nil {
|
||
log.Printf("notification worker handle failed: %v", err)
|
||
continue
|
||
}
|
||
if err := s.reader.CommitMessages(ctx, msg); err != nil {
|
||
log.Printf("notification worker commit failed: %v", err)
|
||
}
|
||
}
|
||
}
|
||
|
||
func (s *Service) handleMessage(ctx context.Context, msg kafka.Message) error {
|
||
var payload struct {
|
||
JobID string `json:"job_id"`
|
||
}
|
||
if err := json.Unmarshal(msg.Value, &payload); err != nil {
|
||
return nil
|
||
}
|
||
if payload.JobID == "" {
|
||
return nil
|
||
}
|
||
job, err := s.loadJob(ctx, payload.JobID)
|
||
if err != nil {
|
||
if errors.Is(err, pgx.ErrNoRows) {
|
||
return nil
|
||
}
|
||
return err
|
||
}
|
||
if job.Status == statusSent || job.Status == statusDead {
|
||
return nil
|
||
}
|
||
if s.isDuplicateSent(ctx, job.IdempotencyKey, job.JobID) {
|
||
_, _ = s.pool.Exec(ctx, `UPDATE notification_jobs SET status='sent', sent_at=now(), updated_at=now() WHERE job_id = $1`, job.JobID)
|
||
return nil
|
||
}
|
||
|
||
start := time.Now()
|
||
subject, body, err := s.renderTemplate(job.TemplateID, job.Params)
|
||
if err != nil {
|
||
s.failJob(ctx, job, "TEMPLATE_RENDER", err.Error(), false)
|
||
return nil
|
||
}
|
||
sendErr := s.email.Send(job.ToEmails, subject, body)
|
||
latency := int(time.Since(start).Milliseconds())
|
||
if sendErr == nil {
|
||
_, _ = s.pool.Exec(ctx, `UPDATE notification_jobs SET status='sent', sent_at=now(), updated_at=now(), last_error_code=NULL, last_error_message=NULL WHERE job_id = $1`, job.JobID)
|
||
_, _ = s.pool.Exec(ctx, `INSERT INTO notification_attempts (job_id, attempt_no, success, latency_ms, created_at) VALUES ($1, $2, true, $3, now())`, job.JobID, job.RetryCount+1, latency)
|
||
log.Printf("notification sent trace_id=%s job_id=%s status=sent latency_ms=%d", job.TraceID, job.JobID, latency)
|
||
return nil
|
||
}
|
||
|
||
retryable := isRetryableError(sendErr.Code)
|
||
s.failJob(ctx, job, sendErr.Code, sendErr.Error(), retryable)
|
||
_, _ = s.pool.Exec(ctx, `INSERT INTO notification_attempts (job_id, attempt_no, success, error_code, error_message, latency_ms, created_at) VALUES ($1, $2, false, $3, $4, $5, now())`,
|
||
job.JobID, job.RetryCount+1, sendErr.Code, truncate(sendErr.Error(), 400), latency)
|
||
log.Printf("notification failed trace_id=%s job_id=%s status=%s error_code=%s latency_ms=%d", job.TraceID, job.JobID, statusFailed, sendErr.Code, latency)
|
||
return nil
|
||
}
|
||
|
||
type jobRow struct {
|
||
JobID string
|
||
Status string
|
||
TraceID string
|
||
ToEmails []string
|
||
TemplateID string
|
||
Params map[string]any
|
||
RetryCount int
|
||
MaxRetry int
|
||
IdempotencyKey string
|
||
}
|
||
|
||
func (s *Service) loadJob(ctx context.Context, jobID string) (jobRow, error) {
|
||
var out jobRow
|
||
var paramsBytes []byte
|
||
err := s.pool.QueryRow(ctx, `
|
||
SELECT job_id, status, trace_id, to_emails, template_id, params, retry_count, max_retry, idempotency_key
|
||
FROM notification_jobs
|
||
WHERE job_id = $1`, jobID,
|
||
).Scan(&out.JobID, &out.Status, &out.TraceID, &out.ToEmails, &out.TemplateID, ¶msBytes, &out.RetryCount, &out.MaxRetry, &out.IdempotencyKey)
|
||
if err != nil {
|
||
return jobRow{}, err
|
||
}
|
||
_ = json.Unmarshal(paramsBytes, &out.Params)
|
||
return out, nil
|
||
}
|
||
|
||
func (s *Service) isDuplicateSent(ctx context.Context, idempotencyKey, jobID string) bool {
|
||
var count int
|
||
_ = s.pool.QueryRow(ctx, `SELECT count(*) FROM notification_jobs WHERE idempotency_key = $1 AND status = 'sent' AND job_id <> $2`, idempotencyKey, jobID).Scan(&count)
|
||
return count > 0
|
||
}
|
||
|
||
func (s *Service) failJob(ctx context.Context, job jobRow, code, message string, retryable bool) {
|
||
if retryable && job.RetryCount < job.MaxRetry {
|
||
nextRetry := job.RetryCount + 1
|
||
backoff := s.retryDelay(nextRetry)
|
||
_, _ = s.pool.Exec(ctx, `
|
||
UPDATE notification_jobs
|
||
SET status = 'pending',
|
||
retry_count = $2,
|
||
available_at = now() + $3::interval,
|
||
last_error_code = $4,
|
||
last_error_message = $5,
|
||
updated_at = now()
|
||
WHERE job_id = $1`,
|
||
job.JobID,
|
||
nextRetry,
|
||
fmt.Sprintf("%d seconds", int(backoff.Seconds())),
|
||
code,
|
||
truncate(message, 400),
|
||
)
|
||
return
|
||
}
|
||
_, _ = s.pool.Exec(ctx, `
|
||
UPDATE notification_jobs
|
||
SET status='dead',
|
||
last_error_code=$2,
|
||
last_error_message=$3,
|
||
updated_at=now()
|
||
WHERE job_id=$1`, job.JobID, code, truncate(message, 400),
|
||
)
|
||
snapshot, _ := json.Marshal(job)
|
||
_, _ = s.pool.Exec(ctx, `
|
||
INSERT INTO notification_dlq (job_id, reason, failed_at, snapshot)
|
||
VALUES ($1, $2, now(), $3)
|
||
ON CONFLICT (job_id) DO UPDATE SET reason = excluded.reason, failed_at = excluded.failed_at, snapshot = excluded.snapshot`,
|
||
job.JobID,
|
||
fmt.Sprintf("%s: %s", code, truncate(message, 200)),
|
||
snapshot,
|
||
)
|
||
}
|
||
|
||
func (s *Service) retryDelay(retryCount int) time.Duration {
|
||
x := math.Pow(2, float64(retryCount))
|
||
d := time.Duration(x) * s.cfg.BackoffBase
|
||
maxBackoff := 30 * time.Minute
|
||
if d > maxBackoff {
|
||
return maxBackoff
|
||
}
|
||
return d
|
||
}
|
||
|
||
func (s *Service) runAlertChecker(ctx context.Context) {
|
||
ticker := time.NewTicker(time.Minute)
|
||
defer ticker.Stop()
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case <-ticker.C:
|
||
from := time.Now().Add(-10 * time.Minute)
|
||
metrics, err := s.GetMetrics(ctx, from, time.Now())
|
||
if err != nil {
|
||
continue
|
||
}
|
||
failureRate := 0.0
|
||
total := metrics.SentCount + metrics.FailedCount
|
||
if total > 0 {
|
||
failureRate = float64(metrics.FailedCount) / float64(total)
|
||
}
|
||
if failureRate > s.cfg.AlertFailureThreshold {
|
||
log.Printf("[ALERT] notification failure rate spike: %.2f", failureRate)
|
||
}
|
||
if metrics.PendingBacklog > int64(s.cfg.AlertPendingThreshold) {
|
||
log.Printf("[ALERT] notification pending backlog: %d", metrics.PendingBacklog)
|
||
}
|
||
if metrics.DLQInLast10Mins > int64(s.cfg.AlertDLQThreshold) {
|
||
log.Printf("[ALERT] notification DLQ growth: %d in last 10m", metrics.DLQInLast10Mins)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
func (s *Service) renderTemplate(templateID string, params map[string]any) (string, string, error) {
|
||
locale := normalizeLocale(anyString(params["locale"]))
|
||
payload, _ := params["payload"].(map[string]any)
|
||
if payload == nil {
|
||
payload = map[string]any{}
|
||
}
|
||
var subjectTpl, bodyTpl string
|
||
switch templateID {
|
||
case "task_created_notice":
|
||
subjectTpl = mapLocale(locale, "新任务创建:{{.title}}", "Task created: {{.title}}")
|
||
bodyTpl = mapLocale(locale, "任务 {{.title}} 已创建,优先级 P{{.priority}},截止 {{.due_at}}", "Task {{.title}} created, priority P{{.priority}}, due {{.due_at}}")
|
||
case "task_updated_notice":
|
||
subjectTpl = mapLocale(locale, "任务更新:{{.title}}", "Task updated: {{.title}}")
|
||
bodyTpl = mapLocale(locale, "任务 {{.title}} 已更新,当前状态 {{.status}}", "Task {{.title}} updated, current status {{.status}}")
|
||
case "task_status_changed_notice":
|
||
subjectTpl = mapLocale(locale, "任务状态变更:{{.title}}", "Task status changed: {{.title}}")
|
||
bodyTpl = mapLocale(locale, "任务 {{.title}} 状态已变更为 {{.status}}", "Task {{.title}} status changed to {{.status}}")
|
||
case "task_due_soon":
|
||
subjectTpl = mapLocale(locale, "任务即将到期:{{.title}}", "Task due soon: {{.title}}")
|
||
bodyTpl = mapLocale(locale, "任务 {{.title}} 将在 {{.due_at}} 到期", "Task {{.title}} is due at {{.due_at}}")
|
||
case "task_overdue":
|
||
subjectTpl = mapLocale(locale, "任务已逾期:{{.title}}", "Task overdue: {{.title}}")
|
||
bodyTpl = mapLocale(locale, "任务 {{.title}} 已逾期,截止时间 {{.due_at}}", "Task {{.title}} is overdue, due at {{.due_at}}")
|
||
case "daily_summary":
|
||
subjectTpl = mapLocale(locale, "每日摘要({{.summary_date}})", "Daily summary ({{.summary_date}})")
|
||
bodyTpl = mapLocale(locale, "未完成任务 {{.open_count}},逾期任务 {{.overdue_count}}", "Open tasks {{.open_count}}, overdue tasks {{.overdue_count}}")
|
||
default:
|
||
return "", "", fmt.Errorf("template not found")
|
||
}
|
||
render := func(tpl string) (string, error) {
|
||
t, err := template.New("msg").Parse(tpl)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
var b strings.Builder
|
||
if err := t.Execute(&b, payload); err != nil {
|
||
return "", err
|
||
}
|
||
return b.String(), nil
|
||
}
|
||
subject, err := render(subjectTpl)
|
||
if err != nil {
|
||
return "", "", err
|
||
}
|
||
body, err := render(bodyTpl)
|
||
if err != nil {
|
||
return "", "", err
|
||
}
|
||
return subject, body, nil
|
||
}
|
||
|
||
func mapTemplateID(eventType string) string {
|
||
switch eventType {
|
||
case "task.created":
|
||
return "task_created_notice"
|
||
case "task.updated":
|
||
return "task_updated_notice"
|
||
case "task.status_changed":
|
||
return "task_status_changed_notice"
|
||
case "task.due_soon":
|
||
return "task_due_soon"
|
||
case "task.overdue":
|
||
return "task_overdue"
|
||
case "task.daily_summary":
|
||
return "daily_summary"
|
||
default:
|
||
return ""
|
||
}
|
||
}
|
||
|
||
func applyDND(now time.Time, prefs UserNotificationPrefs) time.Time {
|
||
if prefs.DNDStart == "" || prefs.DNDEnd == "" {
|
||
return now
|
||
}
|
||
loc, err := time.LoadLocation(prefs.Timezone)
|
||
if err != nil {
|
||
loc = time.UTC
|
||
}
|
||
localNow := now.In(loc)
|
||
start, ok1 := parseClock(localNow, prefs.DNDStart)
|
||
end, ok2 := parseClock(localNow, prefs.DNDEnd)
|
||
if !ok1 || !ok2 {
|
||
return now
|
||
}
|
||
if start.Equal(end) {
|
||
return now
|
||
}
|
||
if end.After(start) {
|
||
if !localNow.Before(start) && localNow.Before(end) {
|
||
return end.UTC()
|
||
}
|
||
return now
|
||
}
|
||
if !localNow.Before(start) {
|
||
return end.Add(24 * time.Hour).UTC()
|
||
}
|
||
if localNow.Before(end) {
|
||
return end.UTC()
|
||
}
|
||
return now
|
||
}
|
||
|
||
func (s *Service) nextDailyTime(now time.Time, prefs UserNotificationPrefs) time.Time {
|
||
loc, err := time.LoadLocation(prefs.Timezone)
|
||
if err != nil {
|
||
loc = time.UTC
|
||
}
|
||
localNow := now.In(loc)
|
||
target, ok := parseClock(localNow, prefs.DailySummaryTime)
|
||
if !ok {
|
||
target = time.Date(localNow.Year(), localNow.Month(), localNow.Day(), 9, 30, 0, 0, loc)
|
||
}
|
||
if localNow.After(target) {
|
||
target = target.Add(24 * time.Hour)
|
||
}
|
||
shifted := applyDND(target.UTC(), prefs)
|
||
return shifted
|
||
}
|
||
|
||
func parseClock(base time.Time, hhmm string) (time.Time, bool) {
|
||
parts := strings.Split(hhmm, ":")
|
||
if len(parts) != 2 {
|
||
return time.Time{}, false
|
||
}
|
||
h, err1 := strconv.Atoi(parts[0])
|
||
m, err2 := strconv.Atoi(parts[1])
|
||
if err1 != nil || err2 != nil || h < 0 || h > 23 || m < 0 || m > 59 {
|
||
return time.Time{}, false
|
||
}
|
||
return time.Date(base.Year(), base.Month(), base.Day(), h, m, 0, 0, base.Location()), true
|
||
}
|
||
|
||
func isValidHHMM(v string) bool {
|
||
_, ok := parseClock(time.Now(), v)
|
||
return ok
|
||
}
|
||
|
||
func normalizeLocale(v string) string {
|
||
v = strings.TrimSpace(strings.ToLower(v))
|
||
if v != "en" {
|
||
return "zh"
|
||
}
|
||
return v
|
||
}
|
||
|
||
func normalizeEmails(input []string) []string {
|
||
m := map[string]struct{}{}
|
||
out := make([]string, 0, len(input))
|
||
for _, e := range input {
|
||
e = strings.TrimSpace(strings.ToLower(e))
|
||
if e == "" {
|
||
continue
|
||
}
|
||
if _, err := mail.ParseAddress(e); err != nil {
|
||
continue
|
||
}
|
||
if _, ok := m[e]; ok {
|
||
continue
|
||
}
|
||
m[e] = struct{}{}
|
||
out = append(out, e)
|
||
}
|
||
sort.Strings(out)
|
||
return out
|
||
}
|
||
|
||
func parseAnyInt64Slice(v any) []int64 {
|
||
out := make([]int64, 0)
|
||
switch items := v.(type) {
|
||
case []any:
|
||
for _, item := range items {
|
||
switch n := item.(type) {
|
||
case float64:
|
||
out = append(out, int64(n))
|
||
case int64:
|
||
out = append(out, n)
|
||
case int:
|
||
out = append(out, int64(n))
|
||
}
|
||
}
|
||
case []int64:
|
||
out = append(out, items...)
|
||
}
|
||
return dedupeInt64(out)
|
||
}
|
||
|
||
func dedupeInt64(in []int64) []int64 {
|
||
if len(in) == 0 {
|
||
return in
|
||
}
|
||
m := make(map[int64]struct{}, len(in))
|
||
out := make([]int64, 0, len(in))
|
||
for _, id := range in {
|
||
if id <= 0 {
|
||
continue
|
||
}
|
||
if _, ok := m[id]; ok {
|
||
continue
|
||
}
|
||
m[id] = struct{}{}
|
||
out = append(out, id)
|
||
}
|
||
sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
|
||
return out
|
||
}
|
||
|
||
func idempotencyKey(eventID, templateID string, recipients []string) string {
|
||
h := sha1.New()
|
||
_, _ = h.Write([]byte(eventID))
|
||
_, _ = h.Write([]byte("|" + templateID + "|" + strings.Join(recipients, ",")))
|
||
return hex.EncodeToString(h.Sum(nil))
|
||
}
|
||
|
||
func randomID(prefix string) string {
|
||
buf := make([]byte, 12)
|
||
if _, err := rand.Read(buf); err != nil {
|
||
n := time.Now().UnixNano()
|
||
return fmt.Sprintf("%s-%d", prefix, n)
|
||
}
|
||
return fmt.Sprintf("%s-%s", prefix, hex.EncodeToString(buf))
|
||
}
|
||
|
||
func isRetryableError(code string) bool {
|
||
switch code {
|
||
case "SMTP_TIMEOUT", "SMTP_RATE_LIMIT", "SMTP_TEMP_NETWORK", "SMTP_4XX", "SMTP_SEND_FAILED":
|
||
return true
|
||
default:
|
||
return false
|
||
}
|
||
}
|
||
|
||
func truncate(v string, n int) string {
|
||
if len(v) <= n {
|
||
return v
|
||
}
|
||
return v[:n]
|
||
}
|
||
|
||
func mapLocale(locale, zh, en string) string {
|
||
if locale == "en" {
|
||
return en
|
||
}
|
||
return zh
|
||
}
|
||
|
||
func anyString(v any) string {
|
||
if s, ok := v.(string); ok {
|
||
return s
|
||
}
|
||
return ""
|
||
}
|
||
|
||
type SendError struct {
|
||
Code string
|
||
Err error
|
||
}
|
||
|
||
func (e *SendError) Error() string {
|
||
if e == nil || e.Err == nil {
|
||
return ""
|
||
}
|
||
return e.Err.Error()
|
||
}
|
||
|
||
type EmailAdapter struct {
|
||
cfg Config
|
||
}
|
||
|
||
func NewEmailAdapter(cfg Config) *EmailAdapter {
|
||
return &EmailAdapter{cfg: cfg}
|
||
}
|
||
|
||
func (a *EmailAdapter) Send(to []string, subject, body string) *SendError {
|
||
if len(to) == 0 {
|
||
return &SendError{Code: "SMTP_INVALID_ADDRESS", Err: fmt.Errorf("empty recipient")}
|
||
}
|
||
for _, item := range to {
|
||
if _, err := mail.ParseAddress(item); err != nil {
|
||
return &SendError{Code: "SMTP_INVALID_ADDRESS", Err: fmt.Errorf("invalid recipient %q", item)}
|
||
}
|
||
}
|
||
if a.cfg.SMTPHost == "" || a.cfg.SMTPUser == "" || a.cfg.SMTPPass == "" || a.cfg.SMTPFrom == "" {
|
||
return &SendError{Code: "SMTP_CONFIG", Err: fmt.Errorf("smtp config incomplete")}
|
||
}
|
||
addr := net.JoinHostPort(a.cfg.SMTPHost, a.cfg.SMTPPort)
|
||
msg := buildMail(a.cfg.SMTPFrom, to, subject, body)
|
||
|
||
conn, err := net.DialTimeout("tcp", addr, 8*time.Second)
|
||
if err != nil {
|
||
return &SendError{Code: "SMTP_TEMP_NETWORK", Err: err}
|
||
}
|
||
defer conn.Close()
|
||
|
||
if a.cfg.SMTPUseTLS {
|
||
tlsConn := tls.Client(conn, &tls.Config{ServerName: a.cfg.SMTPHost})
|
||
if err := tlsConn.Handshake(); err != nil {
|
||
return &SendError{Code: "SMTP_TIMEOUT", Err: err}
|
||
}
|
||
conn = tlsConn
|
||
}
|
||
|
||
client, err := smtp.NewClient(conn, a.cfg.SMTPHost)
|
||
if err != nil {
|
||
return &SendError{Code: "SMTP_SEND_FAILED", Err: err}
|
||
}
|
||
defer client.Close()
|
||
|
||
if err := client.Auth(smtp.PlainAuth("", a.cfg.SMTPUser, a.cfg.SMTPPass, a.cfg.SMTPHost)); err != nil {
|
||
return &SendError{Code: "SMTP_SEND_FAILED", Err: err}
|
||
}
|
||
if err := client.Mail(a.cfg.SMTPFrom); err != nil {
|
||
return classifySMTPError(err)
|
||
}
|
||
for _, recipient := range to {
|
||
if err := client.Rcpt(recipient); err != nil {
|
||
return classifySMTPError(err)
|
||
}
|
||
}
|
||
writer, err := client.Data()
|
||
if err != nil {
|
||
return classifySMTPError(err)
|
||
}
|
||
if _, err := writer.Write([]byte(msg)); err != nil {
|
||
_ = writer.Close()
|
||
return classifySMTPError(err)
|
||
}
|
||
if err := writer.Close(); err != nil {
|
||
return classifySMTPError(err)
|
||
}
|
||
if err := client.Quit(); err != nil {
|
||
return classifySMTPError(err)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func buildMail(from string, to []string, subject, body string) string {
|
||
headers := []string{
|
||
"From: " + from,
|
||
"To: " + strings.Join(to, ", "),
|
||
"Subject: " + mimeEncodeHeader(subject),
|
||
"MIME-Version: 1.0",
|
||
"Content-Type: text/plain; charset=UTF-8",
|
||
"",
|
||
body,
|
||
}
|
||
return strings.Join(headers, "\r\n")
|
||
}
|
||
|
||
func mimeEncodeHeader(v string) string {
|
||
if isASCII(v) {
|
||
return v
|
||
}
|
||
return "=?UTF-8?B?" + b64Encode(v) + "?="
|
||
}
|
||
|
||
func isASCII(v string) bool {
|
||
for i := 0; i < len(v); i++ {
|
||
if v[i] > 127 {
|
||
return false
|
||
}
|
||
}
|
||
return true
|
||
}
|
||
|
||
func b64Encode(v string) string {
|
||
return strings.TrimRight(base64Encoding.EncodeToString([]byte(v)), "=")
|
||
}
|
||
|
||
var base64Encoding = base64NoPadding{}
|
||
|
||
type base64NoPadding struct{}
|
||
|
||
func (base64NoPadding) EncodeToString(src []byte) string {
|
||
const table = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
|
||
if len(src) == 0 {
|
||
return ""
|
||
}
|
||
var out strings.Builder
|
||
for i := 0; i < len(src); i += 3 {
|
||
var b0, b1, b2 byte
|
||
b0 = src[i]
|
||
if i+1 < len(src) {
|
||
b1 = src[i+1]
|
||
}
|
||
if i+2 < len(src) {
|
||
b2 = src[i+2]
|
||
}
|
||
out.WriteByte(table[b0>>2])
|
||
out.WriteByte(table[((b0&0x03)<<4)|(b1>>4)])
|
||
if i+1 < len(src) {
|
||
out.WriteByte(table[((b1&0x0f)<<2)|(b2>>6)])
|
||
}
|
||
if i+2 < len(src) {
|
||
out.WriteByte(table[b2&0x3f])
|
||
}
|
||
}
|
||
return out.String()
|
||
}
|
||
|
||
func classifySMTPError(err error) *SendError {
|
||
if err == nil {
|
||
return nil
|
||
}
|
||
msg := strings.ToLower(err.Error())
|
||
if strings.Contains(msg, "timeout") || strings.Contains(msg, "i/o timeout") {
|
||
return &SendError{Code: "SMTP_TIMEOUT", Err: err}
|
||
}
|
||
if strings.Contains(msg, " 4") || strings.HasPrefix(msg, "4") {
|
||
return &SendError{Code: "SMTP_4XX", Err: err}
|
||
}
|
||
if strings.Contains(msg, "rate") || strings.Contains(msg, "limit") {
|
||
return &SendError{Code: "SMTP_RATE_LIMIT", Err: err}
|
||
}
|
||
if strings.Contains(msg, "invalid") || strings.Contains(msg, "mailbox unavailable") || strings.Contains(msg, "user unknown") {
|
||
return &SendError{Code: "SMTP_INVALID_ADDRESS", Err: err}
|
||
}
|
||
var netErr net.Error
|
||
if errors.As(err, &netErr) {
|
||
return &SendError{Code: "SMTP_TEMP_NETWORK", Err: err}
|
||
}
|
||
return &SendError{Code: "SMTP_SEND_FAILED", Err: err}
|
||
}
|
||
|
||
func splitCSV(value string) []string {
|
||
value = strings.TrimSpace(value)
|
||
if value == "" {
|
||
return nil
|
||
}
|
||
items := strings.Split(value, ",")
|
||
out := make([]string, 0, len(items))
|
||
for _, item := range items {
|
||
item = strings.TrimSpace(item)
|
||
if item != "" {
|
||
out = append(out, item)
|
||
}
|
||
}
|
||
return out
|
||
}
|
||
|
||
func parseBoolEnv(name string, fallback bool) bool {
|
||
v := strings.TrimSpace(os.Getenv(name))
|
||
if v == "" {
|
||
return fallback
|
||
}
|
||
b, err := strconv.ParseBool(v)
|
||
if err != nil {
|
||
return fallback
|
||
}
|
||
return b
|
||
}
|
||
|
||
func parseIntEnv(name string, fallback int) int {
|
||
v := strings.TrimSpace(os.Getenv(name))
|
||
if v == "" {
|
||
return fallback
|
||
}
|
||
n, err := strconv.Atoi(v)
|
||
if err != nil {
|
||
return fallback
|
||
}
|
||
return n
|
||
}
|
||
|
||
func getenv(name, fallback string) string {
|
||
v := strings.TrimSpace(os.Getenv(name))
|
||
if v == "" {
|
||
return fallback
|
||
}
|
||
return v
|
||
}
|
||
|
||
func parseRangeTime(fromStr, toStr string) (time.Time, time.Time, error) {
|
||
var from, to time.Time
|
||
var err error
|
||
if strings.TrimSpace(fromStr) != "" {
|
||
from, err = time.Parse(time.RFC3339, fromStr)
|
||
if err != nil {
|
||
return time.Time{}, time.Time{}, err
|
||
}
|
||
}
|
||
if strings.TrimSpace(toStr) != "" {
|
||
to, err = time.Parse(time.RFC3339, toStr)
|
||
if err != nil {
|
||
return time.Time{}, time.Time{}, err
|
||
}
|
||
}
|
||
return from, to, nil
|
||
}
|
||
|
||
func NullStringFromString(v string) sql.NullString {
|
||
v = strings.TrimSpace(v)
|
||
if v == "" {
|
||
return sql.NullString{}
|
||
}
|
||
return sql.NullString{String: v, Valid: true}
|
||
}
|