From 7e97aaa7fca9f381988e4db58f3b4a2896694e2d Mon Sep 17 00:00:00 2001 From: wolves Date: Sun, 1 Mar 2026 22:28:51 +0800 Subject: [PATCH] feat(notification): add scheduled email pipeline with prefs/groups and DLQ UI --- cmd/server/main.go | 387 +++++- docker-compose.yml | 14 + internal/notification/service.go | 1569 ++++++++++++++++++++++ web/src/App.vue | 2 + web/src/components/TodoEditorModal.vue | 21 +- web/src/i18n.ts | 70 +- web/src/router.ts | 12 + web/src/services/api.ts | 41 +- web/src/types.ts | 28 + web/src/views/NotificationDlqView.vue | 74 + web/src/views/NotificationGroupsView.vue | 134 ++ web/src/views/TodoView.vue | 66 +- web/src/views/UserSettingsView.vue | 86 +- 13 files changed, 2416 insertions(+), 88 deletions(-) create mode 100644 internal/notification/service.go create mode 100644 web/src/views/NotificationDlqView.vue create mode 100644 web/src/views/NotificationGroupsView.vue diff --git a/cmd/server/main.go b/cmd/server/main.go index c104bfb..ebc2ebd 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -12,6 +12,7 @@ import ( "time" "wolves.top/todo/internal/iam" + "wolves.top/todo/internal/notification" "github.com/gin-gonic/gin" "github.com/jackc/pgconn" @@ -21,15 +22,16 @@ import ( ) type Task struct { - ID int64 `json:"id"` - Title string `json:"title"` - Description string `json:"description"` - Status string `json:"status"` - DueAt string `json:"due_at"` - Priority int `json:"priority"` - Tags []string `json:"tags"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + ID int64 `json:"id"` + Title string `json:"title"` + Description string `json:"description"` + Status string `json:"status"` + DueAt string `json:"due_at"` + Priority int `json:"priority"` + Tags []string `json:"tags"` + NotifyGroupIDs []int64 `json:"notify_group_ids"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` } type postgresStore struct { @@ -79,9 +81,85 @@ func (s *postgresStore) 
initSchema(ctx context.Context) error { due_at TEXT, priority INT NOT NULL DEFAULT 0, tags TEXT[] NOT NULL DEFAULT '{}', + notify_group_ids BIGINT[] NOT NULL DEFAULT '{}', created_at TIMESTAMPTZ NOT NULL, updated_at TIMESTAMPTZ NOT NULL )`, + `ALTER TABLE tasks ADD COLUMN IF NOT EXISTS notify_group_ids BIGINT[] NOT NULL DEFAULT '{}'`, + + `CREATE TABLE IF NOT EXISTS user_notification_prefs ( + user_id BIGINT PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE, + subscribed BOOLEAN NOT NULL DEFAULT true, + dnd_start TIME NULL, + dnd_end TIME NULL, + locale TEXT NOT NULL DEFAULT 'zh', + timezone TEXT NOT NULL DEFAULT 'Asia/Shanghai', + daily_summary_enabled BOOLEAN NOT NULL DEFAULT true, + daily_summary_time TIME NOT NULL DEFAULT '09:30:00', + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() + )`, + `CREATE TABLE IF NOT EXISTS notification_groups ( + id BIGSERIAL PRIMARY KEY, + user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + name TEXT NOT NULL, + emails TEXT[] NOT NULL DEFAULT '{}', + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE (user_id, name) + )`, + `CREATE TABLE IF NOT EXISTS task_notification_groups ( + task_id BIGINT NOT NULL REFERENCES tasks(id) ON DELETE CASCADE, + group_id BIGINT NOT NULL REFERENCES notification_groups(id) ON DELETE CASCADE, + PRIMARY KEY (task_id, group_id) + )`, + `CREATE TABLE IF NOT EXISTS notification_events ( + event_id TEXT PRIMARY KEY, + event_type TEXT NOT NULL, + user_id BIGINT NOT NULL, + occurred_at TIMESTAMPTZ NOT NULL, + payload JSONB NOT NULL, + trace_id TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() + )`, + `CREATE TABLE IF NOT EXISTS notification_jobs ( + job_id TEXT PRIMARY KEY, + event_id TEXT NOT NULL REFERENCES notification_events(event_id), + trace_id TEXT NOT NULL, + user_id BIGINT NOT NULL, + channel TEXT NOT NULL DEFAULT 'email', + to_emails TEXT[] NOT NULL, + template_id TEXT NOT NULL, + params JSONB NOT NULL, + idempotency_key 
TEXT NOT NULL UNIQUE, + status TEXT NOT NULL, + scheduled_at TIMESTAMPTZ NOT NULL, + available_at TIMESTAMPTZ NOT NULL, + retry_count INT NOT NULL DEFAULT 0, + max_retry INT NOT NULL DEFAULT 5, + last_error_code TEXT NULL, + last_error_message TEXT NULL, + sent_at TIMESTAMPTZ NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() + )`, + `CREATE INDEX IF NOT EXISTS idx_notification_jobs_status_available ON notification_jobs(status, available_at)`, + `CREATE INDEX IF NOT EXISTS idx_notification_jobs_scheduled ON notification_jobs(scheduled_at)`, + `CREATE TABLE IF NOT EXISTS notification_attempts ( + id BIGSERIAL PRIMARY KEY, + job_id TEXT NOT NULL REFERENCES notification_jobs(job_id) ON DELETE CASCADE, + attempt_no INT NOT NULL, + success BOOLEAN NOT NULL, + error_code TEXT NULL, + error_message TEXT NULL, + latency_ms INT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() + )`, + `CREATE TABLE IF NOT EXISTS notification_dlq ( + job_id TEXT PRIMARY KEY REFERENCES notification_jobs(job_id), + reason TEXT NOT NULL, + failed_at TIMESTAMPTZ NOT NULL, + snapshot JSONB NOT NULL + )`, } for _, stmt := range statements { if _, err := s.pool.Exec(ctx, stmt); err != nil { @@ -92,7 +170,7 @@ func (s *postgresStore) initSchema(ctx context.Context) error { } func (s *postgresStore) List(ctx context.Context, userID int64) ([]Task, error) { - rows, err := s.pool.Query(ctx, `SELECT id, title, description, status, due_at, priority, tags, created_at, updated_at FROM tasks WHERE user_id = $1 ORDER BY id DESC`, userID) + rows, err := s.pool.Query(ctx, `SELECT id, title, description, status, due_at, priority, tags, notify_group_ids, created_at, updated_at FROM tasks WHERE user_id = $1 ORDER BY id DESC`, userID) if err != nil { return nil, err } @@ -101,10 +179,12 @@ func (s *postgresStore) List(ctx context.Context, userID int64) ([]Task, error) for rows.Next() { var task Task var tags []string - if err := rows.Scan(&task.ID, 
&task.Title, &task.Description, &task.Status, &task.DueAt, &task.Priority, &tags, &task.CreatedAt, &task.UpdatedAt); err != nil { + var notifyGroupIDs []int64 + if err := rows.Scan(&task.ID, &task.Title, &task.Description, &task.Status, &task.DueAt, &task.Priority, &tags, ¬ifyGroupIDs, &task.CreatedAt, &task.UpdatedAt); err != nil { return nil, err } task.Tags = tags + task.NotifyGroupIDs = notifyGroupIDs result = append(result, task) } return result, rows.Err() @@ -113,12 +193,14 @@ func (s *postgresStore) List(ctx context.Context, userID int64) ([]Task, error) func (s *postgresStore) Get(ctx context.Context, userID, id int64) (Task, error) { var task Task var tags []string - err := s.pool.QueryRow(ctx, `SELECT id, title, description, status, due_at, priority, tags, created_at, updated_at FROM tasks WHERE user_id = $1 AND id = $2`, userID, id). - Scan(&task.ID, &task.Title, &task.Description, &task.Status, &task.DueAt, &task.Priority, &tags, &task.CreatedAt, &task.UpdatedAt) + var notifyGroupIDs []int64 + err := s.pool.QueryRow(ctx, `SELECT id, title, description, status, due_at, priority, tags, notify_group_ids, created_at, updated_at FROM tasks WHERE user_id = $1 AND id = $2`, userID, id). 
+ Scan(&task.ID, &task.Title, &task.Description, &task.Status, &task.DueAt, &task.Priority, &tags, ¬ifyGroupIDs, &task.CreatedAt, &task.UpdatedAt) if err != nil { return Task{}, err } task.Tags = tags + task.NotifyGroupIDs = notifyGroupIDs return task, nil } @@ -130,15 +212,20 @@ func (s *postgresStore) Create(ctx context.Context, userID int64, input Task) (T if input.Tags != nil { tags = input.Tags } + groupIDs := dedupeInt64(input.NotifyGroupIDs) err := s.pool.QueryRow(ctx, ` - INSERT INTO tasks (user_id, title, description, status, due_at, priority, tags, created_at, updated_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, now(), now()) + INSERT INTO tasks (user_id, title, description, status, due_at, priority, tags, notify_group_ids, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, now(), now()) RETURNING id, created_at, updated_at`, - userID, input.Title, input.Description, input.Status, input.DueAt, input.Priority, tags, + userID, input.Title, input.Description, input.Status, input.DueAt, input.Priority, tags, groupIDs, ).Scan(&input.ID, &input.CreatedAt, &input.UpdatedAt) if err != nil { return Task{}, err } + input.NotifyGroupIDs = groupIDs + if err := s.syncTaskNotificationGroups(ctx, userID, input.ID, groupIDs); err != nil { + return Task{}, err + } return input, nil } @@ -165,21 +252,65 @@ func (s *postgresStore) Update(ctx context.Context, userID, id int64, input Task if input.Tags != nil { existing.Tags = input.Tags } + if input.NotifyGroupIDs != nil { + existing.NotifyGroupIDs = dedupeInt64(input.NotifyGroupIDs) + } var updatedAt time.Time err = s.pool.QueryRow(ctx, ` UPDATE tasks - SET title = $1, description = $2, status = $3, due_at = $4, priority = $5, tags = $6, updated_at = now() - WHERE id = $7 AND user_id = $8 + SET title = $1, description = $2, status = $3, due_at = $4, priority = $5, tags = $6, notify_group_ids = $7, updated_at = now() + WHERE id = $8 AND user_id = $9 RETURNING updated_at`, - existing.Title, existing.Description, 
existing.Status, existing.DueAt, existing.Priority, existing.Tags, id, userID, + existing.Title, existing.Description, existing.Status, existing.DueAt, existing.Priority, existing.Tags, existing.NotifyGroupIDs, id, userID, ).Scan(&updatedAt) if err != nil { return Task{}, err } + if err := s.syncTaskNotificationGroups(ctx, userID, id, existing.NotifyGroupIDs); err != nil { + return Task{}, err + } existing.UpdatedAt = updatedAt return existing, nil } +func (s *postgresStore) syncTaskNotificationGroups(ctx context.Context, userID, taskID int64, groupIDs []int64) error { + groupIDs = dedupeInt64(groupIDs) + tx, err := s.pool.Begin(ctx) + if err != nil { + return err + } + defer tx.Rollback(ctx) + + if _, err := tx.Exec(ctx, `DELETE FROM task_notification_groups WHERE task_id = $1`, taskID); err != nil { + return err + } + if len(groupIDs) == 0 { + return tx.Commit(ctx) + } + + rows, err := tx.Query(ctx, `SELECT id FROM notification_groups WHERE user_id = $1 AND id = ANY($2)`, userID, groupIDs) + if err != nil { + return err + } + valid := make([]int64, 0, len(groupIDs)) + for rows.Next() { + var id int64 + if err := rows.Scan(&id); err != nil { + rows.Close() + return err + } + valid = append(valid, id) + } + rows.Close() + + for _, groupID := range valid { + if _, err := tx.Exec(ctx, `INSERT INTO task_notification_groups (task_id, group_id) VALUES ($1, $2) ON CONFLICT DO NOTHING`, taskID, groupID); err != nil { + return err + } + } + return tx.Commit(ctx) +} + func (s *postgresStore) Delete(ctx context.Context, userID, id int64) error { result, err := s.pool.Exec(ctx, `DELETE FROM tasks WHERE id = $1 AND user_id = $2`, id, userID) if err != nil { @@ -218,12 +349,13 @@ func (e *taskEmitter) Emit(ctx context.Context, eventType string, task Task, use return } payload := map[string]any{ - "type": eventType, - "task_id": task.ID, - "user_id": userID, - "status": task.Status, - "priority": task.Priority, - "at": time.Now().UTC().Format(time.RFC3339), + "type": eventType, 
+ "task_id": task.ID, + "user_id": userID, + "status": task.Status, + "priority": task.Priority, + "notify_group_ids": task.NotifyGroupIDs, + "at": time.Now().UTC().Format(time.RFC3339), } data, err := json.Marshal(payload) if err != nil { @@ -254,10 +386,15 @@ func isUniqueViolation(err error) bool { } func main() { - store, iamSvc, emitter := buildDependencies() + store, iamSvc, emitter, notifySvc := buildDependencies() defer store.Close() defer iamSvc.Close() defer emitter.Close() + defer notifySvc.Close() + + appCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + notifySvc.Start(appCtx) gin.SetMode(gin.DebugMode) router := gin.Default() @@ -299,6 +436,7 @@ func main() { return } emitter.Emit(c.Request.Context(), "task.created", created, userID) + _ = notifySvc.PublishTaskEvent(c.Request.Context(), "task.created", toTaskSnapshot(userID, created)) c.JSON(http.StatusCreated, created) }) tasks.GET(":id", func(c *gin.Context) { @@ -331,6 +469,15 @@ func main() { return } userID := c.GetInt64(iam.ContextUserIDKey) + before, err := store.Get(c.Request.Context(), userID, id) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + c.JSON(http.StatusNotFound, gin.H{"error": "not found"}) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load task"}) + return + } updated, err := store.Update(c.Request.Context(), userID, id, input) if err != nil { if errors.Is(err, pgx.ErrNoRows) { @@ -340,7 +487,12 @@ func main() { c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update task"}) return } - emitter.Emit(c.Request.Context(), "task.updated", updated, userID) + eventType := "task.updated" + if before.Status != updated.Status { + eventType = "task.status_changed" + } + emitter.Emit(c.Request.Context(), eventType, updated, userID) + _ = notifySvc.PublishTaskEvent(c.Request.Context(), eventType, toTaskSnapshot(userID, updated)) c.JSON(http.StatusOK, updated) }) tasks.DELETE(":id", func(c *gin.Context) { @@ 
-363,12 +515,134 @@ func main() { }) } + notifications := api.Group("/notifications") + { + notifications.GET("/prefs", func(c *gin.Context) { + userID := c.GetInt64(iam.ContextUserIDKey) + prefs, err := notifySvc.GetPrefs(c.Request.Context(), userID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load prefs"}) + return + } + c.JSON(http.StatusOK, prefs) + }) + notifications.PUT("/prefs", func(c *gin.Context) { + var input notification.UserNotificationPrefs + if err := c.ShouldBindJSON(&input); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + userID := c.GetInt64(iam.ContextUserIDKey) + prefs, err := notifySvc.UpdatePrefs(c.Request.Context(), userID, input) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, prefs) + }) + notifications.GET("/groups", func(c *gin.Context) { + userID := c.GetInt64(iam.ContextUserIDKey) + groups, err := notifySvc.ListGroups(c.Request.Context(), userID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list groups"}) + return + } + c.JSON(http.StatusOK, groups) + }) + notifications.POST("/groups", func(c *gin.Context) { + var input notification.NotificationGroup + if err := c.ShouldBindJSON(&input); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + userID := c.GetInt64(iam.ContextUserIDKey) + group, err := notifySvc.CreateGroup(c.Request.Context(), userID, input) + if err != nil { + status := http.StatusBadRequest + if isUniqueViolation(err) { + status = http.StatusConflict + } + c.JSON(status, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusCreated, group) + }) + notifications.PUT("/groups/:id", func(c *gin.Context) { + id, err := parseID(c.Param("id")) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid id"}) + return + } + var input notification.NotificationGroup + if err := 
c.ShouldBindJSON(&input); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + userID := c.GetInt64(iam.ContextUserIDKey) + group, err := notifySvc.UpdateGroup(c.Request.Context(), userID, id, input) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + c.JSON(http.StatusNotFound, gin.H{"error": "not found"}) + return + } + status := http.StatusBadRequest + if isUniqueViolation(err) { + status = http.StatusConflict + } + c.JSON(status, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, group) + }) + notifications.DELETE("/groups/:id", func(c *gin.Context) { + id, err := parseID(c.Param("id")) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid id"}) + return + } + userID := c.GetInt64(iam.ContextUserIDKey) + if err := notifySvc.DeleteGroup(c.Request.Context(), userID, id); err != nil { + if errors.Is(err, pgx.ErrNoRows) { + c.JSON(http.StatusNotFound, gin.H{"error": "not found"}) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete"}) + return + } + c.Status(http.StatusNoContent) + }) + notifications.GET("/dlq", func(c *gin.Context) { + page, _ := strconv.Atoi(c.DefaultQuery("page", "1")) + pageSize, _ := strconv.Atoi(c.DefaultQuery("page_size", "20")) + userID := c.GetInt64(iam.ContextUserIDKey) + items, err := notifySvc.ListDLQ(c.Request.Context(), userID, page, pageSize) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load dlq"}) + return + } + c.JSON(http.StatusOK, items) + }) + notifications.GET("/metrics", func(c *gin.Context) { + from, to, err := parseMetricsRange(c.Query("from"), c.Query("to")) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid from/to"}) + return + } + metrics, err := notifySvc.GetMetrics(c.Request.Context(), from, to) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load metrics"}) + return + } + c.JSON(http.StatusOK, metrics) + }) + } 
+ if err := router.Run(":8080"); err != nil { panic(err) } } -func buildDependencies() (*postgresStore, *iam.Service, *taskEmitter) { +func buildDependencies() (*postgresStore, *iam.Service, *taskEmitter, *notification.Service) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -392,7 +666,43 @@ func buildDependencies() (*postgresStore, *iam.Service, *taskEmitter) { topic = "todo.tasks" } emitter := newTaskEmitter(brokers, topic) - return store, iamSvc, emitter + + notifyCfg := notification.LoadConfigFromEnv() + notifySvc := notification.NewService(store.Pool(), notifyCfg) + + return store, iamSvc, emitter, notifySvc +} + +func toTaskSnapshot(userID int64, task Task) notification.TaskSnapshot { + return notification.TaskSnapshot{ + ID: task.ID, + UserID: userID, + Title: task.Title, + Description: task.Description, + Status: task.Status, + DueAt: task.DueAt, + Priority: task.Priority, + Tags: task.Tags, + NotifyGroupIDs: task.NotifyGroupIDs, + } +} + +func parseMetricsRange(fromStr, toStr string) (time.Time, time.Time, error) { + var from, to time.Time + var err error + if strings.TrimSpace(fromStr) != "" { + from, err = time.Parse(time.RFC3339, fromStr) + if err != nil { + return time.Time{}, time.Time{}, err + } + } + if strings.TrimSpace(toStr) != "" { + to, err = time.Parse(time.RFC3339, toStr) + if err != nil { + return time.Time{}, time.Time{}, err + } + } + return from, to, nil } func parseID(value string) (int64, error) { @@ -414,6 +724,25 @@ func splitCSV(value string) []string { return result } +func dedupeInt64(in []int64) []int64 { + if len(in) == 0 { + return in + } + seen := map[int64]struct{}{} + out := make([]int64, 0, len(in)) + for _, id := range in { + if id <= 0 { + continue + } + if _, ok := seen[id]; ok { + continue + } + seen[id] = struct{}{} + out = append(out, id) + } + return out +} + func corsMiddleware() gin.HandlerFunc { return func(c *gin.Context) { origin := 
strings.TrimSpace(c.GetHeader("Origin")) diff --git a/docker-compose.yml b/docker-compose.yml index f901d15..bd5c0ab 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -71,6 +71,20 @@ services: REDIS_ADDR: redis:6379 KAFKA_BROKERS: kafka:9092 KAFKA_TOPIC: todo.tasks + NOTIFY_ENABLED: "true" + NOTIFY_TOPIC: notification.jobs + NOTIFY_DISPATCH_BATCH: "200" + NOTIFY_SCHED_TICK_SECONDS: "60" + NOTIFY_MAX_RETRY: "5" + NOTIFY_BACKOFF_BASE_SECONDS: "30" + SMTP_HOST: smtp.qq.com + SMTP_PORT: "465" + SMTP_USER: ${SMTP_USER} + SMTP_PASS: ${SMTP_PASS} + SMTP_FROM: ${SMTP_FROM} + SMTP_USE_TLS: "true" + GOPROXY: https://goproxy.cn,direct + GOSUMDB: "off" AUTH_SECRET: dev-secret-change-me depends_on: - postgres diff --git a/internal/notification/service.go b/internal/notification/service.go new file mode 100644 index 0000000..04524f2 --- /dev/null +++ b/internal/notification/service.go @@ -0,0 +1,1569 @@ +package notification + +import ( + "context" + "crypto/rand" + "crypto/sha1" + "crypto/tls" + "database/sql" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "log" + "math" + "net" + "net/mail" + "net/smtp" + "os" + "sort" + "strconv" + "strings" + "sync" + "text/template" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/segmentio/kafka-go" +) + +const ( + statusPending = "pending" + statusSent = "sent" + statusFailed = "failed" + statusDead = "dead" +) + +type Config struct { + Enabled bool + KafkaBrokers []string + NotifyTopic string + DispatchBatch int + SchedulerTick time.Duration + MaxRetry int + BackoffBase time.Duration + DispatchLease time.Duration + DueSoonWindow time.Duration + OverdueThrottle time.Duration + AlertFailureThreshold float64 + AlertPendingThreshold int + AlertDLQThreshold int + ConsumerGroup string + SMTPHost string + SMTPPort string + SMTPUser string + SMTPPass string + SMTPFrom string + SMTPUseTLS bool + DailySummaryDefaultLoc string +} + +func LoadConfigFromEnv() Config { + cfg = 
Config{ + Enabled: parseBoolEnv("NOTIFY_ENABLED", true), + KafkaBrokers: splitCSV(os.Getenv("KAFKA_BROKERS")), + NotifyTopic: getenv("NOTIFY_TOPIC", "notification.jobs"), + DispatchBatch: parseIntEnv("NOTIFY_DISPATCH_BATCH", 200), + SchedulerTick: time.Duration(parseIntEnv("NOTIFY_SCHED_TICK_SECONDS", 60)) * time.Second, + MaxRetry: parseIntEnv("NOTIFY_MAX_RETRY", 5), + BackoffBase: time.Duration(parseIntEnv("NOTIFY_BACKOFF_BASE_SECONDS", 30)) * time.Second, + DispatchLease: 10 * time.Second, + DueSoonWindow: 60 * time.Minute, + OverdueThrottle: 30 * time.Minute, + AlertFailureThreshold: 0.2, + AlertPendingThreshold: 1000, + AlertDLQThreshold: 50, + ConsumerGroup: getenv("NOTIFY_CONSUMER_GROUP", "supertodo-notification-worker"), + SMTPHost: strings.TrimSpace(os.Getenv("SMTP_HOST")), + SMTPPort: getenv("SMTP_PORT", "465"), + SMTPUser: strings.TrimSpace(os.Getenv("SMTP_USER")), + SMTPPass: strings.TrimSpace(os.Getenv("SMTP_PASS")), + SMTPFrom: strings.TrimSpace(os.Getenv("SMTP_FROM")), + SMTPUseTLS: parseBoolEnv("SMTP_USE_TLS", true), + DailySummaryDefaultLoc: getenv("NOTIFY_DEFAULT_LOCALE", "zh"), + } + if cfg.SMTPFrom == "" { + cfg.SMTPFrom = cfg.SMTPUser + } + if cfg.DispatchBatch <= 0 { + cfg.DispatchBatch = 200 + } + if cfg.SchedulerTick <= 0 { + cfg.SchedulerTick = time.Minute + } + if cfg.MaxRetry <= 0 { + cfg.MaxRetry = 5 + } + if cfg.BackoffBase <= 0 { + cfg.BackoffBase = 30 * time.Second + } + return cfg +} + +type Service struct { + pool *pgxpool.Pool + cfg Config + writer *kafka.Writer + reader *kafka.Reader + email *EmailAdapter + + wg sync.WaitGroup +} + +func NewService(pool *pgxpool.Pool, cfg Config) *Service { + s := &Service{pool: pool, cfg: cfg} + if cfg.Enabled && len(cfg.KafkaBrokers) > 0 { + s.writer = &kafka.Writer{ + Addr: kafka.TCP(cfg.KafkaBrokers...), + Topic: cfg.NotifyTopic, + Balancer: &kafka.LeastBytes{}, + } + s.reader = kafka.NewReader(kafka.ReaderConfig{ + Brokers: cfg.KafkaBrokers, + Topic: cfg.NotifyTopic, + GroupID: 
cfg.ConsumerGroup, + MinBytes: 1, + MaxBytes: 10e6, + }) + } + s.email = NewEmailAdapter(cfg) + return s +} + +func (s *Service) Close() { + if s.writer != nil { + _ = s.writer.Close() + } + if s.reader != nil { + _ = s.reader.Close() + } + s.wg.Wait() +} + +func (s *Service) Start(ctx context.Context) { + if !s.cfg.Enabled { + log.Println("notification service disabled") + return + } + + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.runDueScheduler(ctx) + }() + + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.runDailySummaryScheduler(ctx) + }() + + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.runJobDispatcher(ctx) + }() + + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.runWorker(ctx) + }() + + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.runAlertChecker(ctx) + }() +} + +type UserNotificationPrefs struct { + Subscribed bool `json:"subscribed"` + DNDStart string `json:"dnd_start"` + DNDEnd string `json:"dnd_end"` + Locale string `json:"locale"` + Timezone string `json:"timezone"` + DailySummaryEnabled bool `json:"daily_summary_enabled"` + DailySummaryTime string `json:"daily_summary_time"` +} + +type NotificationGroup struct { + ID int64 `json:"id"` + Name string `json:"name"` + Emails []string `json:"emails"` +} + +type DLQItem struct { + JobID string `json:"job_id"` + TemplateID string `json:"template_id"` + ToEmails []string `json:"to_emails"` + Reason string `json:"reason"` + FailedAt string `json:"failed_at"` + RetryCount int `json:"retry_count"` + TraceID string `json:"trace_id"` + ErrorCode string `json:"error_code"` +} + +type FailureReason struct { + Code string `json:"code"` + Count int64 `json:"count"` +} + +type Metrics struct { + From string `json:"from"` + To string `json:"to"` + SentCount int64 `json:"sent_count"` + FailedCount int64 `json:"failed_count"` + DeliveryRate float64 `json:"delivery_rate"` + AvgRetryCount float64 `json:"avg_retry_count"` + P95LatencyMS float64 `json:"p95_latency_ms"` + PendingBacklog int64 
`json:"pending_backlog"` + FailureReasons []FailureReason `json:"failure_reasons"` + DLQInLast10Mins int64 `json:"dlq_last_10m"` +} + +func (s *Service) GetPrefs(ctx context.Context, userID int64) (UserNotificationPrefs, error) { + if err := s.ensureUserPrefs(ctx, userID); err != nil { + return UserNotificationPrefs{}, err + } + var prefs UserNotificationPrefs + err := s.pool.QueryRow(ctx, ` + SELECT subscribed, + COALESCE(to_char(dnd_start, 'HH24:MI'), ''), + COALESCE(to_char(dnd_end, 'HH24:MI'), ''), + locale, + timezone, + daily_summary_enabled, + COALESCE(to_char(daily_summary_time, 'HH24:MI'), '09:30') + FROM user_notification_prefs + WHERE user_id = $1`, userID, + ).Scan( + &prefs.Subscribed, + &prefs.DNDStart, + &prefs.DNDEnd, + &prefs.Locale, + &prefs.Timezone, + &prefs.DailySummaryEnabled, + &prefs.DailySummaryTime, + ) + if err != nil { + return UserNotificationPrefs{}, err + } + if prefs.Locale == "" { + prefs.Locale = "zh" + } + if prefs.Timezone == "" { + prefs.Timezone = "Asia/Shanghai" + } + if prefs.DailySummaryTime == "" { + prefs.DailySummaryTime = "09:30" + } + return prefs, nil +} + +func (s *Service) UpdatePrefs(ctx context.Context, userID int64, input UserNotificationPrefs) (UserNotificationPrefs, error) { + if err := s.ensureUserPrefs(ctx, userID); err != nil { + return UserNotificationPrefs{}, err + } + input.Locale = normalizeLocale(input.Locale) + input.Timezone = strings.TrimSpace(input.Timezone) + if input.Timezone == "" { + input.Timezone = "Asia/Shanghai" + } + if _, err := time.LoadLocation(input.Timezone); err != nil { + return UserNotificationPrefs{}, fmt.Errorf("invalid timezone") + } + if input.DailySummaryTime == "" { + input.DailySummaryTime = "09:30" + } + if !isValidHHMM(input.DailySummaryTime) { + return UserNotificationPrefs{}, fmt.Errorf("invalid daily_summary_time") + } + if input.DNDStart != "" && !isValidHHMM(input.DNDStart) { + return UserNotificationPrefs{}, fmt.Errorf("invalid dnd_start") + } + if input.DNDEnd != "" 
&& !isValidHHMM(input.DNDEnd) { + return UserNotificationPrefs{}, fmt.Errorf("invalid dnd_end") + } + + _, err := s.pool.Exec(ctx, ` + UPDATE user_notification_prefs + SET subscribed = $2, + dnd_start = NULLIF($3, '')::time, + dnd_end = NULLIF($4, '')::time, + locale = $5, + timezone = $6, + daily_summary_enabled = $7, + daily_summary_time = NULLIF($8, '')::time, + updated_at = now() + WHERE user_id = $1`, + userID, + input.Subscribed, + input.DNDStart, + input.DNDEnd, + input.Locale, + input.Timezone, + input.DailySummaryEnabled, + input.DailySummaryTime, + ) + if err != nil { + return UserNotificationPrefs{}, err + } + return s.GetPrefs(ctx, userID) +} + +func (s *Service) ListGroups(ctx context.Context, userID int64) ([]NotificationGroup, error) { + rows, err := s.pool.Query(ctx, `SELECT id, name, emails FROM notification_groups WHERE user_id = $1 ORDER BY id DESC`, userID) + if err != nil { + return nil, err + } + defer rows.Close() + out := make([]NotificationGroup, 0) + for rows.Next() { + var item NotificationGroup + if err := rows.Scan(&item.ID, &item.Name, &item.Emails); err != nil { + return nil, err + } + out = append(out, item) + } + return out, rows.Err() +} + +func (s *Service) CreateGroup(ctx context.Context, userID int64, group NotificationGroup) (NotificationGroup, error) { + group.Name = strings.TrimSpace(group.Name) + group.Emails = normalizeEmails(group.Emails) + if group.Name == "" { + return NotificationGroup{}, fmt.Errorf("group name required") + } + if len(group.Emails) == 0 { + return NotificationGroup{}, fmt.Errorf("emails required") + } + err := s.pool.QueryRow(ctx, ` + INSERT INTO notification_groups (user_id, name, emails, created_at, updated_at) + VALUES ($1, $2, $3, now(), now()) + RETURNING id, name, emails`, userID, group.Name, group.Emails, + ).Scan(&group.ID, &group.Name, &group.Emails) + if err != nil { + return NotificationGroup{}, err + } + return group, nil +} + +func (s *Service) UpdateGroup(ctx context.Context, userID, id 
int64, group NotificationGroup) (NotificationGroup, error) { + group.Name = strings.TrimSpace(group.Name) + group.Emails = normalizeEmails(group.Emails) + if group.Name == "" { + return NotificationGroup{}, fmt.Errorf("group name required") + } + if len(group.Emails) == 0 { + return NotificationGroup{}, fmt.Errorf("emails required") + } + err := s.pool.QueryRow(ctx, ` + UPDATE notification_groups + SET name = $3, emails = $4, updated_at = now() + WHERE id = $1 AND user_id = $2 + RETURNING id, name, emails`, id, userID, group.Name, group.Emails, + ).Scan(&group.ID, &group.Name, &group.Emails) + if err != nil { + return NotificationGroup{}, err + } + return group, nil +} + +func (s *Service) DeleteGroup(ctx context.Context, userID, id int64) error { + res, err := s.pool.Exec(ctx, `DELETE FROM notification_groups WHERE id = $1 AND user_id = $2`, id, userID) + if err != nil { + return err + } + if res.RowsAffected() == 0 { + return pgx.ErrNoRows + } + return nil +} + +func (s *Service) ListDLQ(ctx context.Context, userID int64, page, pageSize int) ([]DLQItem, error) { + if page < 1 { + page = 1 + } + if pageSize <= 0 || pageSize > 200 { + pageSize = 20 + } + offset := (page - 1) * pageSize + rows, err := s.pool.Query(ctx, ` + SELECT j.job_id, j.template_id, j.to_emails, d.reason, d.failed_at, j.retry_count, j.trace_id, COALESCE(j.last_error_code, '') + FROM notification_dlq d + JOIN notification_jobs j ON j.job_id = d.job_id + WHERE j.user_id = $1 + ORDER BY d.failed_at DESC + LIMIT $2 OFFSET $3`, userID, pageSize, offset, + ) + if err != nil { + return nil, err + } + defer rows.Close() + out := make([]DLQItem, 0) + for rows.Next() { + var item DLQItem + var failedAt time.Time + if err := rows.Scan(&item.JobID, &item.TemplateID, &item.ToEmails, &item.Reason, &failedAt, &item.RetryCount, &item.TraceID, &item.ErrorCode); err != nil { + return nil, err + } + item.FailedAt = failedAt.UTC().Format(time.RFC3339) + out = append(out, item) + } + return out, rows.Err() +} + 
+func (s *Service) GetMetrics(ctx context.Context, from, to time.Time) (Metrics, error) { + if from.IsZero() { + from = time.Now().Add(-24 * time.Hour) + } + if to.IsZero() { + to = time.Now() + } + out := Metrics{From: from.UTC().Format(time.RFC3339), To: to.UTC().Format(time.RFC3339)} + + _ = s.pool.QueryRow(ctx, `SELECT count(*) FROM notification_jobs WHERE status = 'sent' AND sent_at BETWEEN $1 AND $2`, from, to).Scan(&out.SentCount) + _ = s.pool.QueryRow(ctx, `SELECT count(*) FROM notification_jobs WHERE status IN ('failed','dead') AND updated_at BETWEEN $1 AND $2`, from, to).Scan(&out.FailedCount) + _ = s.pool.QueryRow(ctx, `SELECT COALESCE(avg(retry_count), 0) FROM notification_jobs WHERE created_at BETWEEN $1 AND $2`, from, to).Scan(&out.AvgRetryCount) + _ = s.pool.QueryRow(ctx, `SELECT COALESCE(percentile_cont(0.95) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM (sent_at - scheduled_at))*1000), 0) FROM notification_jobs WHERE status='sent' AND sent_at BETWEEN $1 AND $2`, from, to).Scan(&out.P95LatencyMS) + _ = s.pool.QueryRow(ctx, `SELECT count(*) FROM notification_jobs WHERE status='pending'`).Scan(&out.PendingBacklog) + _ = s.pool.QueryRow(ctx, `SELECT count(*) FROM notification_dlq WHERE failed_at >= now() - interval '10 minutes'`).Scan(&out.DLQInLast10Mins) + + total := out.SentCount + out.FailedCount + if total > 0 { + out.DeliveryRate = float64(out.SentCount) / float64(total) + } + + rows, err := s.pool.Query(ctx, ` + SELECT COALESCE(last_error_code,''), count(*) + FROM notification_jobs + WHERE status IN ('failed','dead') AND updated_at BETWEEN $1 AND $2 + GROUP BY COALESCE(last_error_code,'') + ORDER BY count(*) DESC + LIMIT 10`, from, to, + ) + if err == nil { + defer rows.Close() + for rows.Next() { + var item FailureReason + if err := rows.Scan(&item.Code, &item.Count); err == nil { + out.FailureReasons = append(out.FailureReasons, item) + } + } + } + return out, nil +} + +type TaskSnapshot struct { + ID int64 + UserID int64 + Title string + 
Description string + Status string + DueAt string + Priority int + Tags []string + NotifyGroupIDs []int64 +} + +func (s *Service) PublishTaskEvent(ctx context.Context, eventType string, task TaskSnapshot) error { + eventID := randomID("evt") + traceID := randomID("trace") + if eventType == "task.due_soon" { + eventID = fmt.Sprintf("task-due-soon:%d:%s", task.ID, task.DueAt) + traceID = eventID + } + if eventType == "task.overdue" { + bucket := time.Now().UTC().Unix() / int64(s.cfg.OverdueThrottle/time.Second) + eventID = fmt.Sprintf("task-overdue:%d:%d", task.ID, bucket) + traceID = eventID + } + payload := map[string]any{ + "task_id": task.ID, + "title": task.Title, + "description": task.Description, + "status": task.Status, + "due_at": task.DueAt, + "priority": task.Priority, + "tags": task.Tags, + "notify_group_ids": task.NotifyGroupIDs, + } + inserted, err := s.insertEvent(ctx, eventID, eventType, task.UserID, traceID, payload) + if err != nil { + return err + } + if !inserted { + return nil + } + return s.evaluateAndCreateJob(ctx, eventID, eventType, traceID, task.UserID, payload) +} + +func (s *Service) PublishDailySummaryEvent(ctx context.Context, userID int64, localDate string, payload map[string]any) error { + eventID := fmt.Sprintf("daily-summary:%d:%s", userID, localDate) + traceID := eventID + inserted, err := s.insertEvent(ctx, eventID, "task.daily_summary", userID, traceID, payload) + if err != nil { + return err + } + if !inserted { + return nil + } + return s.evaluateAndCreateJob(ctx, eventID, "task.daily_summary", traceID, userID, payload) +} + +func (s *Service) insertEvent(ctx context.Context, eventID, eventType string, userID int64, traceID string, payload map[string]any) (bool, error) { + data, _ := json.Marshal(payload) + res, err := s.pool.Exec(ctx, ` + INSERT INTO notification_events (event_id, event_type, user_id, occurred_at, payload, trace_id, created_at) + VALUES ($1, $2, $3, now(), $4, $5, now()) + ON CONFLICT (event_id) DO NOTHING`, 
eventID, eventType, userID, data, traceID, + ) + if err != nil { + return false, err + } + return res.RowsAffected() > 0, nil +} + +func (s *Service) evaluateAndCreateJob(ctx context.Context, eventID, eventType, traceID string, userID int64, payload map[string]any) error { + prefs, err := s.GetPrefs(ctx, userID) + if err != nil { + return err + } + if !prefs.Subscribed { + return nil + } + + templateID := mapTemplateID(eventType) + if templateID == "" { + return nil + } + + locale := normalizeLocale(prefs.Locale) + tz := prefs.Timezone + if tz == "" { + tz = "Asia/Shanghai" + } + + var recipients []string + if eventType == "task.daily_summary" { + email, err := s.getUserEmail(ctx, userID) + if err != nil { + return nil + } + recipients = []string{email} + } else { + recipients, err = s.resolveRecipientsByPayload(ctx, userID, payload) + if err != nil { + return err + } + } + recipients = normalizeEmails(recipients) + if len(recipients) == 0 { + return nil + } + + now := time.Now().UTC() + scheduledAt := now + if eventType == "task.daily_summary" { + scheduledAt = s.nextDailyTime(now, prefs) + } else { + scheduledAt = applyDND(now, prefs) + } + if scheduledAt.Before(now) { + scheduledAt = now + } + + params := map[string]any{ + "event_type": eventType, + "locale": locale, + "timezone": tz, + "payload": payload, + } + + jobID := randomID("job") + idem := idempotencyKey(eventID, templateID, recipients) + paramsJSON, _ := json.Marshal(params) + + _, err = s.pool.Exec(ctx, ` + INSERT INTO notification_jobs ( + job_id, event_id, trace_id, user_id, channel, to_emails, template_id, params, + idempotency_key, status, scheduled_at, available_at, retry_count, max_retry, created_at, updated_at + ) + VALUES ($1, $2, $3, $4, 'email', $5, $6, $7, $8, 'pending', $9, $9, 0, $10, now(), now()) + ON CONFLICT (idempotency_key) DO NOTHING`, + jobID, eventID, traceID, userID, recipients, templateID, paramsJSON, idem, scheduledAt, s.cfg.MaxRetry, + ) + if err != nil { + return err + } + 
return nil +} + +func (s *Service) resolveRecipientsByPayload(ctx context.Context, userID int64, payload map[string]any) ([]string, error) { + rawIDs, ok := payload["notify_group_ids"] + if !ok { + return nil, nil + } + groupIDs := parseAnyInt64Slice(rawIDs) + if len(groupIDs) == 0 { + return nil, nil + } + rows, err := s.pool.Query(ctx, `SELECT emails FROM notification_groups WHERE user_id = $1 AND id = ANY($2)`, userID, groupIDs) + if err != nil { + return nil, err + } + defer rows.Close() + emails := make([]string, 0) + for rows.Next() { + var list []string + if err := rows.Scan(&list); err != nil { + return nil, err + } + emails = append(emails, list...) + } + return normalizeEmails(emails), rows.Err() +} + +func (s *Service) getUserEmail(ctx context.Context, userID int64) (string, error) { + var email string + err := s.pool.QueryRow(ctx, `SELECT email FROM users WHERE id = $1`, userID).Scan(&email) + if err != nil { + return "", err + } + return strings.TrimSpace(strings.ToLower(email)), nil +} + +func (s *Service) ensureUserPrefs(ctx context.Context, userID int64) error { + _, err := s.pool.Exec(ctx, ` + INSERT INTO user_notification_prefs ( + user_id, subscribed, locale, timezone, daily_summary_enabled, daily_summary_time, updated_at + ) + VALUES ($1, true, 'zh', 'Asia/Shanghai', true, '09:30:00', now()) + ON CONFLICT (user_id) DO NOTHING`, userID, + ) + return err +} + +func (s *Service) runDueScheduler(ctx context.Context) { + ticker := time.NewTicker(s.cfg.SchedulerTick) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.scheduleDueEvents(ctx) + } + } +} + +func (s *Service) scheduleDueEvents(ctx context.Context) { + rows, err := s.pool.Query(ctx, ` + SELECT id, user_id, title, description, status, due_at, priority, tags, notify_group_ids + FROM tasks + WHERE status <> 'done' AND due_at IS NOT NULL AND due_at <> ''`, + ) + if err != nil { + log.Printf("notification due scheduler query failed: %v", err) + return + 
} + defer rows.Close() + now := time.Now().UTC() + for rows.Next() { + var task TaskSnapshot + if err := rows.Scan(&task.ID, &task.UserID, &task.Title, &task.Description, &task.Status, &task.DueAt, &task.Priority, &task.Tags, &task.NotifyGroupIDs); err != nil { + continue + } + dueAt, err := time.Parse(time.RFC3339, task.DueAt) + if err != nil { + continue + } + if dueAt.After(now) && dueAt.Before(now.Add(s.cfg.DueSoonWindow)) { + _ = s.PublishTaskEvent(ctx, "task.due_soon", task) + } + if dueAt.Before(now) { + _ = s.PublishTaskEvent(ctx, "task.overdue", task) + } + } +} + +func (s *Service) runDailySummaryScheduler(ctx context.Context) { + ticker := time.NewTicker(s.cfg.SchedulerTick) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.scheduleDailySummary(ctx) + } + } +} + +func (s *Service) scheduleDailySummary(ctx context.Context) { + rows, err := s.pool.Query(ctx, ` + SELECT user_id, + COALESCE(timezone, 'Asia/Shanghai'), + COALESCE(locale, 'zh'), + COALESCE(to_char(daily_summary_time, 'HH24:MI'), '09:30'), + subscribed, + daily_summary_enabled, + COALESCE(to_char(dnd_start, 'HH24:MI'), ''), + COALESCE(to_char(dnd_end, 'HH24:MI'), '') + FROM user_notification_prefs + WHERE daily_summary_enabled = true`, + ) + if err != nil { + log.Printf("daily summary query failed: %v", err) + return + } + defer rows.Close() + nowUTC := time.Now().UTC() + for rows.Next() { + var userID int64 + var timezone, locale, summaryAt, dndStart, dndEnd string + var subscribed, enabled bool + if err := rows.Scan(&userID, &timezone, &locale, &summaryAt, &subscribed, &enabled, &dndStart, &dndEnd); err != nil { + continue + } + if !subscribed || !enabled { + continue + } + loc, err := time.LoadLocation(timezone) + if err != nil { + loc = time.FixedZone("UTC", 0) + } + localNow := nowUTC.In(loc) + if localNow.Format("15:04") != summaryAt { + continue + } + openCount, overdueCount := s.summaryCounts(ctx, userID, nowUTC) + payload := map[string]any{ + 
"open_count": openCount, + "overdue_count": overdueCount, + "summary_date": localNow.Format("2006-01-02"), + "timezone": timezone, + "locale": locale, + "dnd_start": dndStart, + "dnd_end": dndEnd, + } + _ = s.PublishDailySummaryEvent(ctx, userID, localNow.Format("2006-01-02"), payload) + } +} + +func (s *Service) summaryCounts(ctx context.Context, userID int64, now time.Time) (int64, int64) { + var openCount int64 + _ = s.pool.QueryRow(ctx, `SELECT count(*) FROM tasks WHERE user_id = $1 AND status <> 'done'`, userID).Scan(&openCount) + + var overdueCount int64 + _ = s.pool.QueryRow(ctx, ` + SELECT count(*) + FROM tasks + WHERE user_id = $1 + AND status <> 'done' + AND due_at IS NOT NULL + AND due_at <> '' + AND due_at::timestamptz < $2`, userID, now, + ).Scan(&overdueCount) + return openCount, overdueCount +} + +func (s *Service) runJobDispatcher(ctx context.Context) { + if s.writer == nil { + log.Println("notification dispatcher disabled: kafka writer not configured") + return + } + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.dispatchPendingJobs(ctx) + } + } +} + +type jobRecord struct { + JobID string + TraceID string + Channel string + ToEmails []string + TemplateID string + Params map[string]any + RetryCount int + IdempotencyKey string + ScheduledAt time.Time +} + +func (s *Service) dispatchPendingJobs(ctx context.Context) { + jobs, err := s.reservePendingJobs(ctx, s.cfg.DispatchBatch, s.cfg.DispatchLease) + if err != nil || len(jobs) == 0 { + return + } + messages := make([]kafka.Message, 0, len(jobs)) + for _, job := range jobs { + data, _ := json.Marshal(map[string]any{ + "job_id": job.JobID, + "trace_id": job.TraceID, + "channel": job.Channel, + "to": job.ToEmails, + "template_id": job.TemplateID, + "params": job.Params, + "retry_count": job.RetryCount, + "idempotency_key": job.IdempotencyKey, + "scheduled_at": job.ScheduledAt.UTC().Format(time.RFC3339), + }) + messages 
= append(messages, kafka.Message{Key: []byte(job.JobID), Value: data}) + } + writeCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err := s.writer.WriteMessages(writeCtx, messages...); err != nil { + _ = s.resetReservedJobs(ctx, jobs) + log.Printf("notification dispatcher publish failed: %v", err) + } +} + +func (s *Service) reservePendingJobs(ctx context.Context, batch int, lease time.Duration) ([]jobRecord, error) { + tx, err := s.pool.Begin(ctx) + if err != nil { + return nil, err + } + defer tx.Rollback(ctx) + rows, err := tx.Query(ctx, ` + WITH picked AS ( + SELECT job_id + FROM notification_jobs + WHERE status = 'pending' AND available_at <= now() + ORDER BY available_at ASC + LIMIT $1 + FOR UPDATE SKIP LOCKED + ) + UPDATE notification_jobs nj + SET available_at = now() + $2::interval, + updated_at = now() + FROM picked + WHERE nj.job_id = picked.job_id + RETURNING nj.job_id, nj.trace_id, nj.channel, nj.to_emails, nj.template_id, nj.params, nj.retry_count, nj.idempotency_key, nj.scheduled_at`, + batch, fmt.Sprintf("%d seconds", int(lease.Seconds())), + ) + if err != nil { + return nil, err + } + defer rows.Close() + jobs := make([]jobRecord, 0) + for rows.Next() { + var item jobRecord + var paramsBytes []byte + if err := rows.Scan(&item.JobID, &item.TraceID, &item.Channel, &item.ToEmails, &item.TemplateID, ¶msBytes, &item.RetryCount, &item.IdempotencyKey, &item.ScheduledAt); err != nil { + return nil, err + } + _ = json.Unmarshal(paramsBytes, &item.Params) + jobs = append(jobs, item) + } + if err := rows.Err(); err != nil { + return nil, err + } + if err := tx.Commit(ctx); err != nil { + return nil, err + } + return jobs, nil +} + +func (s *Service) resetReservedJobs(ctx context.Context, jobs []jobRecord) error { + ids := make([]string, 0, len(jobs)) + for _, job := range jobs { + ids = append(ids, job.JobID) + } + if len(ids) == 0 { + return nil + } + _, err := s.pool.Exec(ctx, `UPDATE notification_jobs SET available_at = now(), 
updated_at = now() WHERE job_id = ANY($1)`, ids) + return err +} + +func (s *Service) runWorker(ctx context.Context) { + if s.reader == nil { + log.Println("notification worker disabled: kafka reader not configured") + return + } + for { + msg, err := s.reader.FetchMessage(ctx) + if err != nil { + if ctx.Err() != nil { + return + } + log.Printf("notification worker fetch failed: %v", err) + continue + } + if err := s.handleMessage(ctx, msg); err != nil { + log.Printf("notification worker handle failed: %v", err) + continue + } + if err := s.reader.CommitMessages(ctx, msg); err != nil { + log.Printf("notification worker commit failed: %v", err) + } + } +} + +func (s *Service) handleMessage(ctx context.Context, msg kafka.Message) error { + var payload struct { + JobID string `json:"job_id"` + } + if err := json.Unmarshal(msg.Value, &payload); err != nil { + return nil + } + if payload.JobID == "" { + return nil + } + job, err := s.loadJob(ctx, payload.JobID) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return nil + } + return err + } + if job.Status == statusSent || job.Status == statusDead { + return nil + } + if s.isDuplicateSent(ctx, job.IdempotencyKey, job.JobID) { + _, _ = s.pool.Exec(ctx, `UPDATE notification_jobs SET status='sent', sent_at=now(), updated_at=now() WHERE job_id = $1`, job.JobID) + return nil + } + + start := time.Now() + subject, body, err := s.renderTemplate(job.TemplateID, job.Params) + if err != nil { + s.failJob(ctx, job, "TEMPLATE_RENDER", err.Error(), false) + return nil + } + sendErr := s.email.Send(job.ToEmails, subject, body) + latency := int(time.Since(start).Milliseconds()) + if sendErr == nil { + _, _ = s.pool.Exec(ctx, `UPDATE notification_jobs SET status='sent', sent_at=now(), updated_at=now(), last_error_code=NULL, last_error_message=NULL WHERE job_id = $1`, job.JobID) + _, _ = s.pool.Exec(ctx, `INSERT INTO notification_attempts (job_id, attempt_no, success, latency_ms, created_at) VALUES ($1, $2, true, $3, now())`, 
job.JobID, job.RetryCount+1, latency) + log.Printf("notification sent trace_id=%s job_id=%s status=sent latency_ms=%d", job.TraceID, job.JobID, latency) + return nil + } + + retryable := isRetryableError(sendErr.Code) + s.failJob(ctx, job, sendErr.Code, sendErr.Error(), retryable) + _, _ = s.pool.Exec(ctx, `INSERT INTO notification_attempts (job_id, attempt_no, success, error_code, error_message, latency_ms, created_at) VALUES ($1, $2, false, $3, $4, $5, now())`, + job.JobID, job.RetryCount+1, sendErr.Code, truncate(sendErr.Error(), 400), latency) + log.Printf("notification failed trace_id=%s job_id=%s status=%s error_code=%s latency_ms=%d", job.TraceID, job.JobID, statusFailed, sendErr.Code, latency) + return nil +} + +type jobRow struct { + JobID string + Status string + TraceID string + ToEmails []string + TemplateID string + Params map[string]any + RetryCount int + MaxRetry int + IdempotencyKey string +} + +func (s *Service) loadJob(ctx context.Context, jobID string) (jobRow, error) { + var out jobRow + var paramsBytes []byte + err := s.pool.QueryRow(ctx, ` + SELECT job_id, status, trace_id, to_emails, template_id, params, retry_count, max_retry, idempotency_key + FROM notification_jobs + WHERE job_id = $1`, jobID, + ).Scan(&out.JobID, &out.Status, &out.TraceID, &out.ToEmails, &out.TemplateID, ¶msBytes, &out.RetryCount, &out.MaxRetry, &out.IdempotencyKey) + if err != nil { + return jobRow{}, err + } + _ = json.Unmarshal(paramsBytes, &out.Params) + return out, nil +} + +func (s *Service) isDuplicateSent(ctx context.Context, idempotencyKey, jobID string) bool { + var count int + _ = s.pool.QueryRow(ctx, `SELECT count(*) FROM notification_jobs WHERE idempotency_key = $1 AND status = 'sent' AND job_id <> $2`, idempotencyKey, jobID).Scan(&count) + return count > 0 +} + +func (s *Service) failJob(ctx context.Context, job jobRow, code, message string, retryable bool) { + if retryable && job.RetryCount < job.MaxRetry { + nextRetry := job.RetryCount + 1 + backoff := 
s.retryDelay(nextRetry) + _, _ = s.pool.Exec(ctx, ` + UPDATE notification_jobs + SET status = 'pending', + retry_count = $2, + available_at = now() + $3::interval, + last_error_code = $4, + last_error_message = $5, + updated_at = now() + WHERE job_id = $1`, + job.JobID, + nextRetry, + fmt.Sprintf("%d seconds", int(backoff.Seconds())), + code, + truncate(message, 400), + ) + return + } + _, _ = s.pool.Exec(ctx, ` + UPDATE notification_jobs + SET status='dead', + last_error_code=$2, + last_error_message=$3, + updated_at=now() + WHERE job_id=$1`, job.JobID, code, truncate(message, 400), + ) + snapshot, _ := json.Marshal(job) + _, _ = s.pool.Exec(ctx, ` + INSERT INTO notification_dlq (job_id, reason, failed_at, snapshot) + VALUES ($1, $2, now(), $3) + ON CONFLICT (job_id) DO UPDATE SET reason = excluded.reason, failed_at = excluded.failed_at, snapshot = excluded.snapshot`, + job.JobID, + fmt.Sprintf("%s: %s", code, truncate(message, 200)), + snapshot, + ) +} + +func (s *Service) retryDelay(retryCount int) time.Duration { + x := math.Pow(2, float64(retryCount)) + d := time.Duration(x) * s.cfg.BackoffBase + maxBackoff := 30 * time.Minute + if d > maxBackoff { + return maxBackoff + } + return d +} + +func (s *Service) runAlertChecker(ctx context.Context) { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + from := time.Now().Add(-10 * time.Minute) + metrics, err := s.GetMetrics(ctx, from, time.Now()) + if err != nil { + continue + } + failureRate := 0.0 + total := metrics.SentCount + metrics.FailedCount + if total > 0 { + failureRate = float64(metrics.FailedCount) / float64(total) + } + if failureRate > s.cfg.AlertFailureThreshold { + log.Printf("[ALERT] notification failure rate spike: %.2f", failureRate) + } + if metrics.PendingBacklog > int64(s.cfg.AlertPendingThreshold) { + log.Printf("[ALERT] notification pending backlog: %d", metrics.PendingBacklog) + } + if metrics.DLQInLast10Mins > 
int64(s.cfg.AlertDLQThreshold) { + log.Printf("[ALERT] notification DLQ growth: %d in last 10m", metrics.DLQInLast10Mins) + } + } + } +} + +func (s *Service) renderTemplate(templateID string, params map[string]any) (string, string, error) { + locale := normalizeLocale(anyString(params["locale"])) + payload, _ := params["payload"].(map[string]any) + if payload == nil { + payload = map[string]any{} + } + var subjectTpl, bodyTpl string + switch templateID { + case "task_created_notice": + subjectTpl = mapLocale(locale, "新任务创建:{{.title}}", "Task created: {{.title}}") + bodyTpl = mapLocale(locale, "任务 {{.title}} 已创建,优先级 P{{.priority}},截止 {{.due_at}}", "Task {{.title}} created, priority P{{.priority}}, due {{.due_at}}") + case "task_updated_notice": + subjectTpl = mapLocale(locale, "任务更新:{{.title}}", "Task updated: {{.title}}") + bodyTpl = mapLocale(locale, "任务 {{.title}} 已更新,当前状态 {{.status}}", "Task {{.title}} updated, current status {{.status}}") + case "task_status_changed_notice": + subjectTpl = mapLocale(locale, "任务状态变更:{{.title}}", "Task status changed: {{.title}}") + bodyTpl = mapLocale(locale, "任务 {{.title}} 状态已变更为 {{.status}}", "Task {{.title}} status changed to {{.status}}") + case "task_due_soon": + subjectTpl = mapLocale(locale, "任务即将到期:{{.title}}", "Task due soon: {{.title}}") + bodyTpl = mapLocale(locale, "任务 {{.title}} 将在 {{.due_at}} 到期", "Task {{.title}} is due at {{.due_at}}") + case "task_overdue": + subjectTpl = mapLocale(locale, "任务已逾期:{{.title}}", "Task overdue: {{.title}}") + bodyTpl = mapLocale(locale, "任务 {{.title}} 已逾期,截止时间 {{.due_at}}", "Task {{.title}} is overdue, due at {{.due_at}}") + case "daily_summary": + subjectTpl = mapLocale(locale, "每日摘要({{.summary_date}})", "Daily summary ({{.summary_date}})") + bodyTpl = mapLocale(locale, "未完成任务 {{.open_count}},逾期任务 {{.overdue_count}}", "Open tasks {{.open_count}}, overdue tasks {{.overdue_count}}") + default: + return "", "", fmt.Errorf("template not found") + } + render := func(tpl string) (string, 
error) { + t, err := template.New("msg").Parse(tpl) + if err != nil { + return "", err + } + var b strings.Builder + if err := t.Execute(&b, payload); err != nil { + return "", err + } + return b.String(), nil + } + subject, err := render(subjectTpl) + if err != nil { + return "", "", err + } + body, err := render(bodyTpl) + if err != nil { + return "", "", err + } + return subject, body, nil +} + +func mapTemplateID(eventType string) string { + switch eventType { + case "task.created": + return "task_created_notice" + case "task.updated": + return "task_updated_notice" + case "task.status_changed": + return "task_status_changed_notice" + case "task.due_soon": + return "task_due_soon" + case "task.overdue": + return "task_overdue" + case "task.daily_summary": + return "daily_summary" + default: + return "" + } +} + +func applyDND(now time.Time, prefs UserNotificationPrefs) time.Time { + if prefs.DNDStart == "" || prefs.DNDEnd == "" { + return now + } + loc, err := time.LoadLocation(prefs.Timezone) + if err != nil { + loc = time.UTC + } + localNow := now.In(loc) + start, ok1 := parseClock(localNow, prefs.DNDStart) + end, ok2 := parseClock(localNow, prefs.DNDEnd) + if !ok1 || !ok2 { + return now + } + if start.Equal(end) { + return now + } + if end.After(start) { + if !localNow.Before(start) && localNow.Before(end) { + return end.UTC() + } + return now + } + if !localNow.Before(start) { + return end.Add(24 * time.Hour).UTC() + } + if localNow.Before(end) { + return end.UTC() + } + return now +} + +func (s *Service) nextDailyTime(now time.Time, prefs UserNotificationPrefs) time.Time { + loc, err := time.LoadLocation(prefs.Timezone) + if err != nil { + loc = time.UTC + } + localNow := now.In(loc) + target, ok := parseClock(localNow, prefs.DailySummaryTime) + if !ok { + target = time.Date(localNow.Year(), localNow.Month(), localNow.Day(), 9, 30, 0, 0, loc) + } + if localNow.After(target) { + target = target.Add(24 * time.Hour) + } + shifted := applyDND(target.UTC(), 
prefs) + return shifted +} + +func parseClock(base time.Time, hhmm string) (time.Time, bool) { + parts := strings.Split(hhmm, ":") + if len(parts) != 2 { + return time.Time{}, false + } + h, err1 := strconv.Atoi(parts[0]) + m, err2 := strconv.Atoi(parts[1]) + if err1 != nil || err2 != nil || h < 0 || h > 23 || m < 0 || m > 59 { + return time.Time{}, false + } + return time.Date(base.Year(), base.Month(), base.Day(), h, m, 0, 0, base.Location()), true +} + +func isValidHHMM(v string) bool { + _, ok := parseClock(time.Now(), v) + return ok +} + +func normalizeLocale(v string) string { + v = strings.TrimSpace(strings.ToLower(v)) + if v != "en" { + return "zh" + } + return v +} + +func normalizeEmails(input []string) []string { + m := map[string]struct{}{} + out := make([]string, 0, len(input)) + for _, e := range input { + e = strings.TrimSpace(strings.ToLower(e)) + if e == "" { + continue + } + if _, err := mail.ParseAddress(e); err != nil { + continue + } + if _, ok := m[e]; ok { + continue + } + m[e] = struct{}{} + out = append(out, e) + } + sort.Strings(out) + return out +} + +func parseAnyInt64Slice(v any) []int64 { + out := make([]int64, 0) + switch items := v.(type) { + case []any: + for _, item := range items { + switch n := item.(type) { + case float64: + out = append(out, int64(n)) + case int64: + out = append(out, n) + case int: + out = append(out, int64(n)) + } + } + case []int64: + out = append(out, items...) 
+ } + return dedupeInt64(out) +} + +func dedupeInt64(in []int64) []int64 { + if len(in) == 0 { + return in + } + m := make(map[int64]struct{}, len(in)) + out := make([]int64, 0, len(in)) + for _, id := range in { + if id <= 0 { + continue + } + if _, ok := m[id]; ok { + continue + } + m[id] = struct{}{} + out = append(out, id) + } + sort.Slice(out, func(i, j int) bool { return out[i] < out[j] }) + return out +} + +func idempotencyKey(eventID, templateID string, recipients []string) string { + h := sha1.New() + _, _ = h.Write([]byte(eventID)) + _, _ = h.Write([]byte("|" + templateID + "|" + strings.Join(recipients, ","))) + return hex.EncodeToString(h.Sum(nil)) +} + +func randomID(prefix string) string { + buf := make([]byte, 12) + if _, err := rand.Read(buf); err != nil { + n := time.Now().UnixNano() + return fmt.Sprintf("%s-%d", prefix, n) + } + return fmt.Sprintf("%s-%s", prefix, hex.EncodeToString(buf)) +} + +func isRetryableError(code string) bool { + switch code { + case "SMTP_TIMEOUT", "SMTP_RATE_LIMIT", "SMTP_TEMP_NETWORK", "SMTP_4XX", "SMTP_SEND_FAILED": + return true + default: + return false + } +} + +func truncate(v string, n int) string { + if len(v) <= n { + return v + } + return v[:n] +} + +func mapLocale(locale, zh, en string) string { + if locale == "en" { + return en + } + return zh +} + +func anyString(v any) string { + if s, ok := v.(string); ok { + return s + } + return "" +} + +type SendError struct { + Code string + Err error +} + +func (e *SendError) Error() string { + if e == nil || e.Err == nil { + return "" + } + return e.Err.Error() +} + +type EmailAdapter struct { + cfg Config +} + +func NewEmailAdapter(cfg Config) *EmailAdapter { + return &EmailAdapter{cfg: cfg} +} + +func (a *EmailAdapter) Send(to []string, subject, body string) *SendError { + if len(to) == 0 { + return &SendError{Code: "SMTP_INVALID_ADDRESS", Err: fmt.Errorf("empty recipient")} + } + for _, item := range to { + if _, err := mail.ParseAddress(item); err != nil { + 
return &SendError{Code: "SMTP_INVALID_ADDRESS", Err: fmt.Errorf("invalid recipient %q", item)} + } + } + if a.cfg.SMTPHost == "" || a.cfg.SMTPUser == "" || a.cfg.SMTPPass == "" || a.cfg.SMTPFrom == "" { + return &SendError{Code: "SMTP_CONFIG", Err: fmt.Errorf("smtp config incomplete")} + } + addr := net.JoinHostPort(a.cfg.SMTPHost, a.cfg.SMTPPort) + msg := buildMail(a.cfg.SMTPFrom, to, subject, body) + + conn, err := net.DialTimeout("tcp", addr, 8*time.Second) + if err != nil { + return &SendError{Code: "SMTP_TEMP_NETWORK", Err: err} + } + defer conn.Close() + + if a.cfg.SMTPUseTLS { + tlsConn := tls.Client(conn, &tls.Config{ServerName: a.cfg.SMTPHost}) + if err := tlsConn.Handshake(); err != nil { + return &SendError{Code: "SMTP_TIMEOUT", Err: err} + } + conn = tlsConn + } + + client, err := smtp.NewClient(conn, a.cfg.SMTPHost) + if err != nil { + return &SendError{Code: "SMTP_SEND_FAILED", Err: err} + } + defer client.Close() + + if err := client.Auth(smtp.PlainAuth("", a.cfg.SMTPUser, a.cfg.SMTPPass, a.cfg.SMTPHost)); err != nil { + return &SendError{Code: "SMTP_SEND_FAILED", Err: err} + } + if err := client.Mail(a.cfg.SMTPFrom); err != nil { + return classifySMTPError(err) + } + for _, recipient := range to { + if err := client.Rcpt(recipient); err != nil { + return classifySMTPError(err) + } + } + writer, err := client.Data() + if err != nil { + return classifySMTPError(err) + } + if _, err := writer.Write([]byte(msg)); err != nil { + _ = writer.Close() + return classifySMTPError(err) + } + if err := writer.Close(); err != nil { + return classifySMTPError(err) + } + if err := client.Quit(); err != nil { + return classifySMTPError(err) + } + return nil +} + +func buildMail(from string, to []string, subject, body string) string { + headers := []string{ + "From: " + from, + "To: " + strings.Join(to, ", "), + "Subject: " + mimeEncodeHeader(subject), + "MIME-Version: 1.0", + "Content-Type: text/plain; charset=UTF-8", + "", + body, + } + return strings.Join(headers, 
"\r\n") +} + +func mimeEncodeHeader(v string) string { + if isASCII(v) { + return v + } + return "=?UTF-8?B?" + b64Encode(v) + "?=" +} + +func isASCII(v string) bool { + for i := 0; i < len(v); i++ { + if v[i] > 127 { + return false + } + } + return true +} + +func b64Encode(v string) string { + return strings.TrimRight(base64Encoding.EncodeToString([]byte(v)), "=") +} + +var base64Encoding = base64NoPadding{} + +type base64NoPadding struct{} + +func (base64NoPadding) EncodeToString(src []byte) string { + const table = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/" + if len(src) == 0 { + return "" + } + var out strings.Builder + for i := 0; i < len(src); i += 3 { + var b0, b1, b2 byte + b0 = src[i] + if i+1 < len(src) { + b1 = src[i+1] + } + if i+2 < len(src) { + b2 = src[i+2] + } + out.WriteByte(table[b0>>2]) + out.WriteByte(table[((b0&0x03)<<4)|(b1>>4)]) + if i+1 < len(src) { + out.WriteByte(table[((b1&0x0f)<<2)|(b2>>6)]) + } + if i+2 < len(src) { + out.WriteByte(table[b2&0x3f]) + } + } + return out.String() +} + +func classifySMTPError(err error) *SendError { + if err == nil { + return nil + } + msg := strings.ToLower(err.Error()) + if strings.Contains(msg, "timeout") || strings.Contains(msg, "i/o timeout") { + return &SendError{Code: "SMTP_TIMEOUT", Err: err} + } + if strings.Contains(msg, " 4") || strings.HasPrefix(msg, "4") { + return &SendError{Code: "SMTP_4XX", Err: err} + } + if strings.Contains(msg, "rate") || strings.Contains(msg, "limit") { + return &SendError{Code: "SMTP_RATE_LIMIT", Err: err} + } + if strings.Contains(msg, "invalid") || strings.Contains(msg, "mailbox unavailable") || strings.Contains(msg, "user unknown") { + return &SendError{Code: "SMTP_INVALID_ADDRESS", Err: err} + } + var netErr net.Error + if errors.As(err, &netErr) { + return &SendError{Code: "SMTP_TEMP_NETWORK", Err: err} + } + return &SendError{Code: "SMTP_SEND_FAILED", Err: err} +} + +func splitCSV(value string) []string { + value = strings.TrimSpace(value) 
+ if value == "" { + return nil + } + items := strings.Split(value, ",") + out := make([]string, 0, len(items)) + for _, item := range items { + item = strings.TrimSpace(item) + if item != "" { + out = append(out, item) + } + } + return out +} + +func parseBoolEnv(name string, fallback bool) bool { + v := strings.TrimSpace(os.Getenv(name)) + if v == "" { + return fallback + } + b, err := strconv.ParseBool(v) + if err != nil { + return fallback + } + return b +} + +func parseIntEnv(name string, fallback int) int { + v := strings.TrimSpace(os.Getenv(name)) + if v == "" { + return fallback + } + n, err := strconv.Atoi(v) + if err != nil { + return fallback + } + return n +} + +func getenv(name, fallback string) string { + v := strings.TrimSpace(os.Getenv(name)) + if v == "" { + return fallback + } + return v +} + +func parseRangeTime(fromStr, toStr string) (time.Time, time.Time, error) { + var from, to time.Time + var err error + if strings.TrimSpace(fromStr) != "" { + from, err = time.Parse(time.RFC3339, fromStr) + if err != nil { + return time.Time{}, time.Time{}, err + } + } + if strings.TrimSpace(toStr) != "" { + to, err = time.Parse(time.RFC3339, toStr) + if err != nil { + return time.Time{}, time.Time{}, err + } + } + return from, to, nil +} + +func NullStringFromString(v string) sql.NullString { + v = strings.TrimSpace(v) + if v == "" { + return sql.NullString{} + } + return sql.NullString{String: v, Valid: true} +} diff --git a/web/src/App.vue b/web/src/App.vue index 9e8df4b..94d3926 100644 --- a/web/src/App.vue +++ b/web/src/App.vue @@ -33,6 +33,8 @@ async function onLogout() { {{ t('nav_team') }} {{ t('nav_user_settings') }} {{ t('nav_team_settings') }} + {{ t('nav_notification_groups') }} + {{ t('nav_notification_dlq') }}