feat(notification): add scheduled email pipeline with prefs/groups and DLQ UI
This commit is contained in:
@@ -12,6 +12,7 @@ import (
|
||||
"time"
|
||||
|
||||
"wolves.top/todo/internal/iam"
|
||||
"wolves.top/todo/internal/notification"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/jackc/pgconn"
|
||||
@@ -21,15 +22,16 @@ import (
|
||||
)
|
||||
|
||||
type Task struct {
|
||||
ID int64 `json:"id"`
|
||||
Title string `json:"title"`
|
||||
Description string `json:"description"`
|
||||
Status string `json:"status"`
|
||||
DueAt string `json:"due_at"`
|
||||
Priority int `json:"priority"`
|
||||
Tags []string `json:"tags"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
ID int64 `json:"id"`
|
||||
Title string `json:"title"`
|
||||
Description string `json:"description"`
|
||||
Status string `json:"status"`
|
||||
DueAt string `json:"due_at"`
|
||||
Priority int `json:"priority"`
|
||||
Tags []string `json:"tags"`
|
||||
NotifyGroupIDs []int64 `json:"notify_group_ids"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
}
|
||||
|
||||
type postgresStore struct {
|
||||
@@ -79,9 +81,85 @@ func (s *postgresStore) initSchema(ctx context.Context) error {
|
||||
due_at TEXT,
|
||||
priority INT NOT NULL DEFAULT 0,
|
||||
tags TEXT[] NOT NULL DEFAULT '{}',
|
||||
notify_group_ids BIGINT[] NOT NULL DEFAULT '{}',
|
||||
created_at TIMESTAMPTZ NOT NULL,
|
||||
updated_at TIMESTAMPTZ NOT NULL
|
||||
)`,
|
||||
`ALTER TABLE tasks ADD COLUMN IF NOT EXISTS notify_group_ids BIGINT[] NOT NULL DEFAULT '{}'`,
|
||||
|
||||
`CREATE TABLE IF NOT EXISTS user_notification_prefs (
|
||||
user_id BIGINT PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
|
||||
subscribed BOOLEAN NOT NULL DEFAULT true,
|
||||
dnd_start TIME NULL,
|
||||
dnd_end TIME NULL,
|
||||
locale TEXT NOT NULL DEFAULT 'zh',
|
||||
timezone TEXT NOT NULL DEFAULT 'Asia/Shanghai',
|
||||
daily_summary_enabled BOOLEAN NOT NULL DEFAULT true,
|
||||
daily_summary_time TIME NOT NULL DEFAULT '09:30:00',
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
)`,
|
||||
`CREATE TABLE IF NOT EXISTS notification_groups (
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
|
||||
name TEXT NOT NULL,
|
||||
emails TEXT[] NOT NULL DEFAULT '{}',
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
UNIQUE (user_id, name)
|
||||
)`,
|
||||
`CREATE TABLE IF NOT EXISTS task_notification_groups (
|
||||
task_id BIGINT NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
|
||||
group_id BIGINT NOT NULL REFERENCES notification_groups(id) ON DELETE CASCADE,
|
||||
PRIMARY KEY (task_id, group_id)
|
||||
)`,
|
||||
`CREATE TABLE IF NOT EXISTS notification_events (
|
||||
event_id TEXT PRIMARY KEY,
|
||||
event_type TEXT NOT NULL,
|
||||
user_id BIGINT NOT NULL,
|
||||
occurred_at TIMESTAMPTZ NOT NULL,
|
||||
payload JSONB NOT NULL,
|
||||
trace_id TEXT NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
)`,
|
||||
`CREATE TABLE IF NOT EXISTS notification_jobs (
|
||||
job_id TEXT PRIMARY KEY,
|
||||
event_id TEXT NOT NULL REFERENCES notification_events(event_id),
|
||||
trace_id TEXT NOT NULL,
|
||||
user_id BIGINT NOT NULL,
|
||||
channel TEXT NOT NULL DEFAULT 'email',
|
||||
to_emails TEXT[] NOT NULL,
|
||||
template_id TEXT NOT NULL,
|
||||
params JSONB NOT NULL,
|
||||
idempotency_key TEXT NOT NULL UNIQUE,
|
||||
status TEXT NOT NULL,
|
||||
scheduled_at TIMESTAMPTZ NOT NULL,
|
||||
available_at TIMESTAMPTZ NOT NULL,
|
||||
retry_count INT NOT NULL DEFAULT 0,
|
||||
max_retry INT NOT NULL DEFAULT 5,
|
||||
last_error_code TEXT NULL,
|
||||
last_error_message TEXT NULL,
|
||||
sent_at TIMESTAMPTZ NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
)`,
|
||||
`CREATE INDEX IF NOT EXISTS idx_notification_jobs_status_available ON notification_jobs(status, available_at)`,
|
||||
`CREATE INDEX IF NOT EXISTS idx_notification_jobs_scheduled ON notification_jobs(scheduled_at)`,
|
||||
`CREATE TABLE IF NOT EXISTS notification_attempts (
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
job_id TEXT NOT NULL REFERENCES notification_jobs(job_id) ON DELETE CASCADE,
|
||||
attempt_no INT NOT NULL,
|
||||
success BOOLEAN NOT NULL,
|
||||
error_code TEXT NULL,
|
||||
error_message TEXT NULL,
|
||||
latency_ms INT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
)`,
|
||||
`CREATE TABLE IF NOT EXISTS notification_dlq (
|
||||
job_id TEXT PRIMARY KEY REFERENCES notification_jobs(job_id),
|
||||
reason TEXT NOT NULL,
|
||||
failed_at TIMESTAMPTZ NOT NULL,
|
||||
snapshot JSONB NOT NULL
|
||||
)`,
|
||||
}
|
||||
for _, stmt := range statements {
|
||||
if _, err := s.pool.Exec(ctx, stmt); err != nil {
|
||||
@@ -92,7 +170,7 @@ func (s *postgresStore) initSchema(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (s *postgresStore) List(ctx context.Context, userID int64) ([]Task, error) {
|
||||
rows, err := s.pool.Query(ctx, `SELECT id, title, description, status, due_at, priority, tags, created_at, updated_at FROM tasks WHERE user_id = $1 ORDER BY id DESC`, userID)
|
||||
rows, err := s.pool.Query(ctx, `SELECT id, title, description, status, due_at, priority, tags, notify_group_ids, created_at, updated_at FROM tasks WHERE user_id = $1 ORDER BY id DESC`, userID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -101,10 +179,12 @@ func (s *postgresStore) List(ctx context.Context, userID int64) ([]Task, error)
|
||||
for rows.Next() {
|
||||
var task Task
|
||||
var tags []string
|
||||
if err := rows.Scan(&task.ID, &task.Title, &task.Description, &task.Status, &task.DueAt, &task.Priority, &tags, &task.CreatedAt, &task.UpdatedAt); err != nil {
|
||||
var notifyGroupIDs []int64
|
||||
if err := rows.Scan(&task.ID, &task.Title, &task.Description, &task.Status, &task.DueAt, &task.Priority, &tags, ¬ifyGroupIDs, &task.CreatedAt, &task.UpdatedAt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
task.Tags = tags
|
||||
task.NotifyGroupIDs = notifyGroupIDs
|
||||
result = append(result, task)
|
||||
}
|
||||
return result, rows.Err()
|
||||
@@ -113,12 +193,14 @@ func (s *postgresStore) List(ctx context.Context, userID int64) ([]Task, error)
|
||||
func (s *postgresStore) Get(ctx context.Context, userID, id int64) (Task, error) {
|
||||
var task Task
|
||||
var tags []string
|
||||
err := s.pool.QueryRow(ctx, `SELECT id, title, description, status, due_at, priority, tags, created_at, updated_at FROM tasks WHERE user_id = $1 AND id = $2`, userID, id).
|
||||
Scan(&task.ID, &task.Title, &task.Description, &task.Status, &task.DueAt, &task.Priority, &tags, &task.CreatedAt, &task.UpdatedAt)
|
||||
var notifyGroupIDs []int64
|
||||
err := s.pool.QueryRow(ctx, `SELECT id, title, description, status, due_at, priority, tags, notify_group_ids, created_at, updated_at FROM tasks WHERE user_id = $1 AND id = $2`, userID, id).
|
||||
Scan(&task.ID, &task.Title, &task.Description, &task.Status, &task.DueAt, &task.Priority, &tags, ¬ifyGroupIDs, &task.CreatedAt, &task.UpdatedAt)
|
||||
if err != nil {
|
||||
return Task{}, err
|
||||
}
|
||||
task.Tags = tags
|
||||
task.NotifyGroupIDs = notifyGroupIDs
|
||||
return task, nil
|
||||
}
|
||||
|
||||
@@ -130,15 +212,20 @@ func (s *postgresStore) Create(ctx context.Context, userID int64, input Task) (T
|
||||
if input.Tags != nil {
|
||||
tags = input.Tags
|
||||
}
|
||||
groupIDs := dedupeInt64(input.NotifyGroupIDs)
|
||||
err := s.pool.QueryRow(ctx, `
|
||||
INSERT INTO tasks (user_id, title, description, status, due_at, priority, tags, created_at, updated_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, now(), now())
|
||||
INSERT INTO tasks (user_id, title, description, status, due_at, priority, tags, notify_group_ids, created_at, updated_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, now(), now())
|
||||
RETURNING id, created_at, updated_at`,
|
||||
userID, input.Title, input.Description, input.Status, input.DueAt, input.Priority, tags,
|
||||
userID, input.Title, input.Description, input.Status, input.DueAt, input.Priority, tags, groupIDs,
|
||||
).Scan(&input.ID, &input.CreatedAt, &input.UpdatedAt)
|
||||
if err != nil {
|
||||
return Task{}, err
|
||||
}
|
||||
input.NotifyGroupIDs = groupIDs
|
||||
if err := s.syncTaskNotificationGroups(ctx, userID, input.ID, groupIDs); err != nil {
|
||||
return Task{}, err
|
||||
}
|
||||
return input, nil
|
||||
}
|
||||
|
||||
@@ -165,21 +252,65 @@ func (s *postgresStore) Update(ctx context.Context, userID, id int64, input Task
|
||||
if input.Tags != nil {
|
||||
existing.Tags = input.Tags
|
||||
}
|
||||
if input.NotifyGroupIDs != nil {
|
||||
existing.NotifyGroupIDs = dedupeInt64(input.NotifyGroupIDs)
|
||||
}
|
||||
var updatedAt time.Time
|
||||
err = s.pool.QueryRow(ctx, `
|
||||
UPDATE tasks
|
||||
SET title = $1, description = $2, status = $3, due_at = $4, priority = $5, tags = $6, updated_at = now()
|
||||
WHERE id = $7 AND user_id = $8
|
||||
SET title = $1, description = $2, status = $3, due_at = $4, priority = $5, tags = $6, notify_group_ids = $7, updated_at = now()
|
||||
WHERE id = $8 AND user_id = $9
|
||||
RETURNING updated_at`,
|
||||
existing.Title, existing.Description, existing.Status, existing.DueAt, existing.Priority, existing.Tags, id, userID,
|
||||
existing.Title, existing.Description, existing.Status, existing.DueAt, existing.Priority, existing.Tags, existing.NotifyGroupIDs, id, userID,
|
||||
).Scan(&updatedAt)
|
||||
if err != nil {
|
||||
return Task{}, err
|
||||
}
|
||||
if err := s.syncTaskNotificationGroups(ctx, userID, id, existing.NotifyGroupIDs); err != nil {
|
||||
return Task{}, err
|
||||
}
|
||||
existing.UpdatedAt = updatedAt
|
||||
return existing, nil
|
||||
}
|
||||
|
||||
func (s *postgresStore) syncTaskNotificationGroups(ctx context.Context, userID, taskID int64, groupIDs []int64) error {
|
||||
groupIDs = dedupeInt64(groupIDs)
|
||||
tx, err := s.pool.Begin(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback(ctx)
|
||||
|
||||
if _, err := tx.Exec(ctx, `DELETE FROM task_notification_groups WHERE task_id = $1`, taskID); err != nil {
|
||||
return err
|
||||
}
|
||||
if len(groupIDs) == 0 {
|
||||
return tx.Commit(ctx)
|
||||
}
|
||||
|
||||
rows, err := tx.Query(ctx, `SELECT id FROM notification_groups WHERE user_id = $1 AND id = ANY($2)`, userID, groupIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
valid := make([]int64, 0, len(groupIDs))
|
||||
for rows.Next() {
|
||||
var id int64
|
||||
if err := rows.Scan(&id); err != nil {
|
||||
rows.Close()
|
||||
return err
|
||||
}
|
||||
valid = append(valid, id)
|
||||
}
|
||||
rows.Close()
|
||||
|
||||
for _, groupID := range valid {
|
||||
if _, err := tx.Exec(ctx, `INSERT INTO task_notification_groups (task_id, group_id) VALUES ($1, $2) ON CONFLICT DO NOTHING`, taskID, groupID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return tx.Commit(ctx)
|
||||
}
|
||||
|
||||
func (s *postgresStore) Delete(ctx context.Context, userID, id int64) error {
|
||||
result, err := s.pool.Exec(ctx, `DELETE FROM tasks WHERE id = $1 AND user_id = $2`, id, userID)
|
||||
if err != nil {
|
||||
@@ -218,12 +349,13 @@ func (e *taskEmitter) Emit(ctx context.Context, eventType string, task Task, use
|
||||
return
|
||||
}
|
||||
payload := map[string]any{
|
||||
"type": eventType,
|
||||
"task_id": task.ID,
|
||||
"user_id": userID,
|
||||
"status": task.Status,
|
||||
"priority": task.Priority,
|
||||
"at": time.Now().UTC().Format(time.RFC3339),
|
||||
"type": eventType,
|
||||
"task_id": task.ID,
|
||||
"user_id": userID,
|
||||
"status": task.Status,
|
||||
"priority": task.Priority,
|
||||
"notify_group_ids": task.NotifyGroupIDs,
|
||||
"at": time.Now().UTC().Format(time.RFC3339),
|
||||
}
|
||||
data, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
@@ -254,10 +386,15 @@ func isUniqueViolation(err error) bool {
|
||||
}
|
||||
|
||||
func main() {
|
||||
store, iamSvc, emitter := buildDependencies()
|
||||
store, iamSvc, emitter, notifySvc := buildDependencies()
|
||||
defer store.Close()
|
||||
defer iamSvc.Close()
|
||||
defer emitter.Close()
|
||||
defer notifySvc.Close()
|
||||
|
||||
appCtx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
notifySvc.Start(appCtx)
|
||||
|
||||
gin.SetMode(gin.DebugMode)
|
||||
router := gin.Default()
|
||||
@@ -299,6 +436,7 @@ func main() {
|
||||
return
|
||||
}
|
||||
emitter.Emit(c.Request.Context(), "task.created", created, userID)
|
||||
_ = notifySvc.PublishTaskEvent(c.Request.Context(), "task.created", toTaskSnapshot(userID, created))
|
||||
c.JSON(http.StatusCreated, created)
|
||||
})
|
||||
tasks.GET(":id", func(c *gin.Context) {
|
||||
@@ -331,6 +469,15 @@ func main() {
|
||||
return
|
||||
}
|
||||
userID := c.GetInt64(iam.ContextUserIDKey)
|
||||
before, err := store.Get(c.Request.Context(), userID, id)
|
||||
if err != nil {
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "not found"})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load task"})
|
||||
return
|
||||
}
|
||||
updated, err := store.Update(c.Request.Context(), userID, id, input)
|
||||
if err != nil {
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
@@ -340,7 +487,12 @@ func main() {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update task"})
|
||||
return
|
||||
}
|
||||
emitter.Emit(c.Request.Context(), "task.updated", updated, userID)
|
||||
eventType := "task.updated"
|
||||
if before.Status != updated.Status {
|
||||
eventType = "task.status_changed"
|
||||
}
|
||||
emitter.Emit(c.Request.Context(), eventType, updated, userID)
|
||||
_ = notifySvc.PublishTaskEvent(c.Request.Context(), eventType, toTaskSnapshot(userID, updated))
|
||||
c.JSON(http.StatusOK, updated)
|
||||
})
|
||||
tasks.DELETE(":id", func(c *gin.Context) {
|
||||
@@ -363,12 +515,134 @@ func main() {
|
||||
})
|
||||
}
|
||||
|
||||
notifications := api.Group("/notifications")
|
||||
{
|
||||
notifications.GET("/prefs", func(c *gin.Context) {
|
||||
userID := c.GetInt64(iam.ContextUserIDKey)
|
||||
prefs, err := notifySvc.GetPrefs(c.Request.Context(), userID)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load prefs"})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, prefs)
|
||||
})
|
||||
notifications.PUT("/prefs", func(c *gin.Context) {
|
||||
var input notification.UserNotificationPrefs
|
||||
if err := c.ShouldBindJSON(&input); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
userID := c.GetInt64(iam.ContextUserIDKey)
|
||||
prefs, err := notifySvc.UpdatePrefs(c.Request.Context(), userID, input)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, prefs)
|
||||
})
|
||||
notifications.GET("/groups", func(c *gin.Context) {
|
||||
userID := c.GetInt64(iam.ContextUserIDKey)
|
||||
groups, err := notifySvc.ListGroups(c.Request.Context(), userID)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list groups"})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, groups)
|
||||
})
|
||||
notifications.POST("/groups", func(c *gin.Context) {
|
||||
var input notification.NotificationGroup
|
||||
if err := c.ShouldBindJSON(&input); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
userID := c.GetInt64(iam.ContextUserIDKey)
|
||||
group, err := notifySvc.CreateGroup(c.Request.Context(), userID, input)
|
||||
if err != nil {
|
||||
status := http.StatusBadRequest
|
||||
if isUniqueViolation(err) {
|
||||
status = http.StatusConflict
|
||||
}
|
||||
c.JSON(status, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusCreated, group)
|
||||
})
|
||||
notifications.PUT("/groups/:id", func(c *gin.Context) {
|
||||
id, err := parseID(c.Param("id"))
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid id"})
|
||||
return
|
||||
}
|
||||
var input notification.NotificationGroup
|
||||
if err := c.ShouldBindJSON(&input); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
userID := c.GetInt64(iam.ContextUserIDKey)
|
||||
group, err := notifySvc.UpdateGroup(c.Request.Context(), userID, id, input)
|
||||
if err != nil {
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "not found"})
|
||||
return
|
||||
}
|
||||
status := http.StatusBadRequest
|
||||
if isUniqueViolation(err) {
|
||||
status = http.StatusConflict
|
||||
}
|
||||
c.JSON(status, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, group)
|
||||
})
|
||||
notifications.DELETE("/groups/:id", func(c *gin.Context) {
|
||||
id, err := parseID(c.Param("id"))
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid id"})
|
||||
return
|
||||
}
|
||||
userID := c.GetInt64(iam.ContextUserIDKey)
|
||||
if err := notifySvc.DeleteGroup(c.Request.Context(), userID, id); err != nil {
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "not found"})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete"})
|
||||
return
|
||||
}
|
||||
c.Status(http.StatusNoContent)
|
||||
})
|
||||
notifications.GET("/dlq", func(c *gin.Context) {
|
||||
page, _ := strconv.Atoi(c.DefaultQuery("page", "1"))
|
||||
pageSize, _ := strconv.Atoi(c.DefaultQuery("page_size", "20"))
|
||||
userID := c.GetInt64(iam.ContextUserIDKey)
|
||||
items, err := notifySvc.ListDLQ(c.Request.Context(), userID, page, pageSize)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load dlq"})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, items)
|
||||
})
|
||||
notifications.GET("/metrics", func(c *gin.Context) {
|
||||
from, to, err := parseMetricsRange(c.Query("from"), c.Query("to"))
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid from/to"})
|
||||
return
|
||||
}
|
||||
metrics, err := notifySvc.GetMetrics(c.Request.Context(), from, to)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load metrics"})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, metrics)
|
||||
})
|
||||
}
|
||||
|
||||
if err := router.Run(":8080"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func buildDependencies() (*postgresStore, *iam.Service, *taskEmitter) {
|
||||
func buildDependencies() (*postgresStore, *iam.Service, *taskEmitter, *notification.Service) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
@@ -392,7 +666,43 @@ func buildDependencies() (*postgresStore, *iam.Service, *taskEmitter) {
|
||||
topic = "todo.tasks"
|
||||
}
|
||||
emitter := newTaskEmitter(brokers, topic)
|
||||
return store, iamSvc, emitter
|
||||
|
||||
notifyCfg := notification.LoadConfigFromEnv()
|
||||
notifySvc := notification.NewService(store.Pool(), notifyCfg)
|
||||
|
||||
return store, iamSvc, emitter, notifySvc
|
||||
}
|
||||
|
||||
func toTaskSnapshot(userID int64, task Task) notification.TaskSnapshot {
|
||||
return notification.TaskSnapshot{
|
||||
ID: task.ID,
|
||||
UserID: userID,
|
||||
Title: task.Title,
|
||||
Description: task.Description,
|
||||
Status: task.Status,
|
||||
DueAt: task.DueAt,
|
||||
Priority: task.Priority,
|
||||
Tags: task.Tags,
|
||||
NotifyGroupIDs: task.NotifyGroupIDs,
|
||||
}
|
||||
}
|
||||
|
||||
func parseMetricsRange(fromStr, toStr string) (time.Time, time.Time, error) {
|
||||
var from, to time.Time
|
||||
var err error
|
||||
if strings.TrimSpace(fromStr) != "" {
|
||||
from, err = time.Parse(time.RFC3339, fromStr)
|
||||
if err != nil {
|
||||
return time.Time{}, time.Time{}, err
|
||||
}
|
||||
}
|
||||
if strings.TrimSpace(toStr) != "" {
|
||||
to, err = time.Parse(time.RFC3339, toStr)
|
||||
if err != nil {
|
||||
return time.Time{}, time.Time{}, err
|
||||
}
|
||||
}
|
||||
return from, to, nil
|
||||
}
|
||||
|
||||
func parseID(value string) (int64, error) {
|
||||
@@ -414,6 +724,25 @@ func splitCSV(value string) []string {
|
||||
return result
|
||||
}
|
||||
|
||||
func dedupeInt64(in []int64) []int64 {
|
||||
if len(in) == 0 {
|
||||
return in
|
||||
}
|
||||
seen := map[int64]struct{}{}
|
||||
out := make([]int64, 0, len(in))
|
||||
for _, id := range in {
|
||||
if id <= 0 {
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[id]; ok {
|
||||
continue
|
||||
}
|
||||
seen[id] = struct{}{}
|
||||
out = append(out, id)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func corsMiddleware() gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
origin := strings.TrimSpace(c.GetHeader("Origin"))
|
||||
|
||||
Reference in New Issue
Block a user