From 58e228b0dce2abe1f82ed131cd6a6c8ce0d3a8a6 Mon Sep 17 00:00:00 2001 From: wolves Date: Tue, 3 Mar 2026 01:00:00 +0800 Subject: [PATCH] fix(notification): dedupe due/overdue events to prevent resend on scan --- internal/notification/service.go | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/internal/notification/service.go b/internal/notification/service.go index 04524f2..ca2ef27 100644 --- a/internal/notification/service.go +++ b/internal/notification/service.go @@ -476,8 +476,7 @@ func (s *Service) PublishTaskEvent(ctx context.Context, eventType string, task T traceID = eventID } if eventType == "task.overdue" { - bucket := time.Now().UTC().Unix() / int64(s.cfg.OverdueThrottle/time.Second) - eventID = fmt.Sprintf("task-overdue:%d:%d", task.ID, bucket) + eventID = fmt.Sprintf("task-overdue:%d:%s", task.ID, task.DueAt) traceID = eventID } payload := map[string]any{ @@ -681,14 +680,40 @@ func (s *Service) scheduleDueEvents(ctx context.Context) { continue } if dueAt.After(now) && dueAt.Before(now.Add(s.cfg.DueSoonWindow)) { + if s.hasExistingTaskEvent(ctx, task.ID, "task.due_soon", task.DueAt) { + continue + } _ = s.PublishTaskEvent(ctx, "task.due_soon", task) } if dueAt.Before(now) { + if s.hasExistingTaskEvent(ctx, task.ID, "task.overdue", task.DueAt) { + continue + } _ = s.PublishTaskEvent(ctx, "task.overdue", task) } } } +func (s *Service) hasExistingTaskEvent(ctx context.Context, taskID int64, eventType, dueAt string) bool { + var exists bool + err := s.pool.QueryRow(ctx, ` + SELECT EXISTS ( + SELECT 1 + FROM notification_events + WHERE event_type = $1 + AND payload->>'task_id' = $2 + AND payload->>'due_at' = $3 + )`, + eventType, + strconv.FormatInt(taskID, 10), + dueAt, + ).Scan(&exists) + if err != nil { + return false + } + return exists +} + func (s *Service) runDailySummaryScheduler(ctx context.Context) { ticker := time.NewTicker(s.cfg.SchedulerTick) defer ticker.Stop()