// Package main wires together the todo HTTP API: a Postgres-backed task
// store, the IAM service, an optional Kafka task-event emitter and the
// notification service, all exposed through a gin router on :8080.
package main

import (
	"context"
	"encoding/json"
	"errors"
	"log"
	"net/http"
	"os"
	"strconv"
	"strings"
	"time"

	"wolves.top/todo/internal/iam"
	"wolves.top/todo/internal/notification"

	"github.com/gin-gonic/gin"
	"github.com/jackc/pgconn"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/segmentio/kafka-go"
)

// Task is the JSON representation of a row in the tasks table. DueAt is kept
// as an opaque string because the due_at column is declared as TEXT.
type Task struct {
	ID             int64     `json:"id"`
	Title          string    `json:"title"`
	Description    string    `json:"description"`
	Status         string    `json:"status"`
	DueAt          string    `json:"due_at"`
	Priority       int       `json:"priority"`
	Tags           []string  `json:"tags"`
	NotifyGroupIDs []int64   `json:"notify_group_ids"`
	CreatedAt      time.Time `json:"created_at"`
	UpdatedAt      time.Time `json:"updated_at"`
}

// postgresStore persists tasks through a pgx connection pool.
type postgresStore struct {
	pool *pgxpool.Pool
}

// taskEmitter publishes task lifecycle events to a Kafka topic.
type taskEmitter struct {
	writer *kafka.Writer
	topic  string
}

var (
	errAlreadyExists = errors.New("already exists")
)

// newPostgresStore opens a pool for url, verifies connectivity with a ping
// and applies the schema. The pool is closed again if either step fails.
func newPostgresStore(ctx context.Context, url string) (*postgresStore, error) {
	pool, err := pgxpool.New(ctx, url)
	if err != nil {
		return nil, err
	}
	if err := pool.Ping(ctx); err != nil {
		pool.Close()
		return nil, err
	}
	store := &postgresStore{pool: pool}
	if err := store.initSchema(ctx); err != nil {
		pool.Close()
		return nil, err
	}
	return store, nil
}

// initSchema executes the idempotent DDL statements (CREATE ... IF NOT
// EXISTS / ADD COLUMN IF NOT EXISTS) one by one and stops at the first error.
func (s *postgresStore) initSchema(ctx context.Context) error {
	statements := []string{
		`CREATE TABLE IF NOT EXISTS users (
			id BIGSERIAL PRIMARY KEY,
			email TEXT UNIQUE NOT NULL,
			password_hash TEXT NOT NULL,
			created_at TIMESTAMPTZ NOT NULL DEFAULT now()
		)`,
		`CREATE TABLE IF NOT EXISTS tasks (
			id BIGSERIAL PRIMARY KEY,
			user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
			title TEXT NOT NULL,
			description TEXT,
			status TEXT NOT NULL,
			due_at TEXT,
			priority INT NOT NULL DEFAULT 0,
			tags TEXT[] NOT NULL DEFAULT '{}',
			notify_group_ids BIGINT[] NOT NULL DEFAULT '{}',
			created_at TIMESTAMPTZ NOT NULL,
			updated_at TIMESTAMPTZ NOT NULL
		)`,
		`ALTER TABLE tasks ADD COLUMN IF NOT EXISTS notify_group_ids BIGINT[] NOT NULL DEFAULT '{}'`,
		`CREATE TABLE IF NOT EXISTS user_notification_prefs (
			user_id BIGINT PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
			subscribed BOOLEAN NOT NULL DEFAULT true,
			dnd_start TIME NULL,
			dnd_end TIME NULL,
			locale TEXT NOT NULL DEFAULT 'zh',
			timezone TEXT NOT NULL DEFAULT 'Asia/Shanghai',
			daily_summary_enabled BOOLEAN NOT NULL DEFAULT true,
			daily_summary_time TIME NOT NULL DEFAULT '09:30:00',
			updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
		)`,
		`CREATE TABLE IF NOT EXISTS notification_groups (
			id BIGSERIAL PRIMARY KEY,
			user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
			name TEXT NOT NULL,
			emails TEXT[] NOT NULL DEFAULT '{}',
			created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
			updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
			UNIQUE (user_id, name)
		)`,
		`CREATE TABLE IF NOT EXISTS task_notification_groups (
			task_id BIGINT NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
			group_id BIGINT NOT NULL REFERENCES notification_groups(id) ON DELETE CASCADE,
			PRIMARY KEY (task_id, group_id)
		)`,
		`CREATE TABLE IF NOT EXISTS notification_events (
			event_id TEXT PRIMARY KEY,
			event_type TEXT NOT NULL,
			user_id BIGINT NOT NULL,
			occurred_at TIMESTAMPTZ NOT NULL,
			payload JSONB NOT NULL,
			trace_id TEXT NOT NULL,
			created_at TIMESTAMPTZ NOT NULL DEFAULT now()
		)`,
		`CREATE TABLE IF NOT EXISTS notification_jobs (
			job_id TEXT PRIMARY KEY,
			event_id TEXT NOT NULL REFERENCES notification_events(event_id),
			trace_id TEXT NOT NULL,
			user_id BIGINT NOT NULL,
			channel TEXT NOT NULL DEFAULT 'email',
			to_emails TEXT[] NOT NULL,
			template_id TEXT NOT NULL,
			params JSONB NOT NULL,
			idempotency_key TEXT NOT NULL UNIQUE,
			status TEXT NOT NULL,
			scheduled_at TIMESTAMPTZ NOT NULL,
			available_at TIMESTAMPTZ NOT NULL,
			retry_count INT NOT NULL DEFAULT 0,
			max_retry INT NOT NULL DEFAULT 5,
			last_error_code TEXT NULL,
			last_error_message TEXT NULL,
			sent_at TIMESTAMPTZ NULL,
			created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
			updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
		)`,
		`CREATE INDEX IF NOT EXISTS idx_notification_jobs_status_available ON notification_jobs(status, available_at)`,
		`CREATE INDEX IF NOT EXISTS idx_notification_jobs_scheduled ON notification_jobs(scheduled_at)`,
		`CREATE TABLE IF NOT EXISTS notification_attempts (
			id BIGSERIAL PRIMARY KEY,
			job_id TEXT NOT NULL REFERENCES notification_jobs(job_id) ON DELETE CASCADE,
			attempt_no INT NOT NULL,
			success BOOLEAN NOT NULL,
			error_code TEXT NULL,
			error_message TEXT NULL,
			latency_ms INT NULL,
			created_at TIMESTAMPTZ NOT NULL DEFAULT now()
		)`,
		`CREATE TABLE IF NOT EXISTS notification_dlq (
			job_id TEXT PRIMARY KEY REFERENCES notification_jobs(job_id),
			reason TEXT NOT NULL,
			failed_at TIMESTAMPTZ NOT NULL,
			snapshot JSONB NOT NULL
		)`,
	}
	for _, stmt := range statements {
		if _, err := s.pool.Exec(ctx, stmt); err != nil {
			return err
		}
	}
	return nil
}

// List returns every task owned by userID, newest id first. The result is a
// non-nil slice so that it serialises as a JSON array even when empty.
func (s *postgresStore) List(ctx context.Context, userID int64) ([]Task, error) {
	// description and due_at are nullable columns; COALESCE keeps the scan
	// into plain Go strings from failing on NULL.
	rows, err := s.pool.Query(ctx, `
		SELECT id, title, COALESCE(description, ''), status, COALESCE(due_at, ''),
		       priority, tags, notify_group_ids, created_at, updated_at
		FROM tasks
		WHERE user_id = $1
		ORDER BY id DESC`, userID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	result := []Task{}
	for rows.Next() {
		var task Task
		var tags []string
		var notifyGroupIDs []int64
		if err := rows.Scan(&task.ID, &task.Title, &task.Description, &task.Status, &task.DueAt,
			&task.Priority, &tags, &notifyGroupIDs, &task.CreatedAt, &task.UpdatedAt); err != nil {
			return nil, err
		}
		task.Tags = tags
		task.NotifyGroupIDs = notifyGroupIDs
		result = append(result, task)
	}
	return result, rows.Err()
}

// Get loads the task with the given id if it belongs to userID. The error
// from Scan is returned unchanged, so a missing row surfaces as pgx.ErrNoRows.
func (s *postgresStore) Get(ctx context.Context, userID, id int64) (Task, error) {
	var task Task
	var tags []string
	var notifyGroupIDs []int64
	err := s.pool.QueryRow(ctx, `
		SELECT id, title, COALESCE(description, ''), status, COALESCE(due_at, ''),
		       priority, tags, notify_group_ids, created_at, updated_at
		FROM tasks
		WHERE user_id = $1 AND id = $2`, userID, id).
		Scan(&task.ID, &task.Title, &task.Description, &task.Status, &task.DueAt,
			&task.Priority, &tags, &notifyGroupIDs, &task.CreatedAt, &task.UpdatedAt)
	if err != nil {
		return Task{}, err
	}
	task.Tags = tags
	task.NotifyGroupIDs = notifyGroupIDs
	return task, nil
}

// Create inserts a new task for userID and links it to its notification
// groups. An empty status defaults to "todo". The insert and the group links
// are written in one transaction so a failure leaves no partial task behind.
func (s *postgresStore) Create(ctx context.Context, userID int64, input Task) (Task, error) {
	if input.Status == "" {
		input.Status = "todo"
	}
	// pgx encodes a nil slice as SQL NULL, which the NOT NULL array columns
	// reject; normalise missing values to empty arrays.
	tags := input.Tags
	if tags == nil {
		tags = []string{}
	}
	groupIDs := dedupeInt64(input.NotifyGroupIDs)
	if groupIDs == nil {
		groupIDs = []int64{}
	}

	tx, err := s.pool.Begin(ctx)
	if err != nil {
		return Task{}, err
	}
	// Rollback after a successful Commit only reports that the transaction is
	// already closed, so its error is intentionally ignored.
	defer func() { _ = tx.Rollback(ctx) }()

	err = tx.QueryRow(ctx, `
		INSERT INTO tasks (user_id, title, description, status, due_at, priority, tags, notify_group_ids, created_at, updated_at)
		VALUES ($1, $2, $3, $4, $5, $6, $7, $8, now(), now())
		RETURNING id, created_at, updated_at`,
		userID, input.Title, input.Description, input.Status, input.DueAt, input.Priority, tags, groupIDs,
	).Scan(&input.ID, &input.CreatedAt, &input.UpdatedAt)
	if err != nil {
		return Task{}, err
	}
	if err := replaceTaskNotificationGroups(ctx, tx, userID, input.ID, groupIDs); err != nil {
		return Task{}, err
	}
	if err := tx.Commit(ctx); err != nil {
		return Task{}, err
	}

	input.Tags = tags
	input.NotifyGroupIDs = groupIDs
	return input, nil
}

// Update applies the non-zero fields of input to the stored task and returns
// the merged result. Zero values ("" / 0 / nil) mean "leave unchanged", so a
// field cannot be reset to its zero value through this method. A missing task
// surfaces as pgx.ErrNoRows.
func (s *postgresStore) Update(ctx context.Context, userID, id int64, input Task) (Task, error) {
	existing, err := s.Get(ctx, userID, id)
	if err != nil {
		return Task{}, err
	}
	if input.Title != "" {
		existing.Title = input.Title
	}
	if input.Description != "" {
		existing.Description = input.Description
	}
	if input.Status != "" {
		existing.Status = input.Status
	}
	if input.DueAt != "" {
		existing.DueAt = input.DueAt
	}
	if input.Priority != 0 {
		existing.Priority = input.Priority
	}
	if input.Tags != nil {
		existing.Tags = input.Tags
	}
	if input.NotifyGroupIDs != nil {
		existing.NotifyGroupIDs = dedupeInt64(input.NotifyGroupIDs)
	}
	// Never send NULL to the NOT NULL array columns (nil slices encode as NULL).
	if existing.Tags == nil {
		existing.Tags = []string{}
	}
	if existing.NotifyGroupIDs == nil {
		existing.NotifyGroupIDs = []int64{}
	}

	tx, err := s.pool.Begin(ctx)
	if err != nil {
		return Task{}, err
	}
	// See Create: the rollback error after Commit is not actionable.
	defer func() { _ = tx.Rollback(ctx) }()

	var updatedAt time.Time
	err = tx.QueryRow(ctx, `
		UPDATE tasks
		SET title = $1, description = $2, status = $3, due_at = $4, priority = $5,
		    tags = $6, notify_group_ids = $7, updated_at = now()
		WHERE id = $8 AND user_id = $9
		RETURNING updated_at`,
		existing.Title, existing.Description, existing.Status, existing.DueAt, existing.Priority,
		existing.Tags, existing.NotifyGroupIDs, id, userID,
	).Scan(&updatedAt)
	if err != nil {
		return Task{}, err
	}
	if err := replaceTaskNotificationGroups(ctx, tx, userID, id, existing.NotifyGroupIDs); err != nil {
		return Task{}, err
	}
	if err := tx.Commit(ctx); err != nil {
		return Task{}, err
	}

	existing.UpdatedAt = updatedAt
	return existing, nil
}

// syncTaskNotificationGroups replaces the task_notification_groups rows of
// taskID with the subset of groupIDs owned by userID, in its own transaction.
// It does not check that taskID itself belongs to userID.
func (s *postgresStore) syncTaskNotificationGroups(ctx context.Context, userID, taskID int64, groupIDs []int64) error {
	tx, err := s.pool.Begin(ctx)
	if err != nil {
		return err
	}
	// The rollback error after a successful Commit is not actionable.
	defer func() { _ = tx.Rollback(ctx) }()

	if err := replaceTaskNotificationGroups(ctx, tx, userID, taskID, groupIDs); err != nil {
		return err
	}
	return tx.Commit(ctx)
}

// replaceTaskNotificationGroups deletes all group links of taskID and inserts
// one link per id in groupIDs that names a notification group owned by userID.
// IDs that are non-positive, duplicated or owned by someone else are skipped.
// The caller owns tx and is responsible for committing or rolling back.
func replaceTaskNotificationGroups(ctx context.Context, tx pgx.Tx, userID, taskID int64, groupIDs []int64) error {
	groupIDs = dedupeInt64(groupIDs)
	if _, err := tx.Exec(ctx, `DELETE FROM task_notification_groups WHERE task_id = $1`, taskID); err != nil {
		return err
	}
	if len(groupIDs) == 0 {
		return nil
	}

	rows, err := tx.Query(ctx, `SELECT id FROM notification_groups WHERE user_id = $1 AND id = ANY($2)`, userID, groupIDs)
	if err != nil {
		return err
	}
	valid := make([]int64, 0, len(groupIDs))
	for rows.Next() {
		var id int64
		if err := rows.Scan(&id); err != nil {
			rows.Close()
			return err
		}
		valid = append(valid, id)
	}
	// The result set must be closed before the transaction's connection can
	// run the inserts below.
	rows.Close()
	if err := rows.Err(); err != nil {
		return err
	}

	for _, groupID := range valid {
		if _, err := tx.Exec(ctx, `INSERT INTO task_notification_groups (task_id, group_id) VALUES ($1, $2) ON CONFLICT DO NOTHING`, taskID, groupID); err != nil {
			return err
		}
	}
	return nil
}

// Delete removes the task with the given id if it belongs to userID and
// returns pgx.ErrNoRows when nothing was deleted.
func (s *postgresStore) Delete(ctx context.Context, userID, id int64) error {
	result, err := s.pool.Exec(ctx, `DELETE FROM tasks WHERE id = $1 AND user_id = $2`, id, userID)
	if err != nil {
		return err
	}
	if result.RowsAffected() == 0 {
		return pgx.ErrNoRows
	}
	return nil
}

// Close releases the connection pool, if there is one.
func (s *postgresStore) Close() {
	if s.pool != nil {
		s.pool.Close()
	}
}

// Pool exposes the underlying connection pool so other services can share it.
func (s *postgresStore) Pool() *pgxpool.Pool {
	return s.pool
}

// newTaskEmitter builds a Kafka-backed emitter for topic. It returns nil when
// no brokers are configured; every taskEmitter method tolerates a nil receiver.
func newTaskEmitter(brokers []string, topic string) *taskEmitter {
	if len(brokers) == 0 {
		return nil
	}
	writer := &kafka.Writer{
		Addr:     kafka.TCP(brokers...),
		Topic:    topic,
		Balancer: &kafka.LeastBytes{},
	}
	return &taskEmitter{writer: writer, topic: topic}
}

// Emit publishes a JSON event describing task, keyed by the task id, with a
// two second write timeout. It is best-effort: failures are logged and never
// returned, and a nil emitter is a no-op.
func (e *taskEmitter) Emit(ctx context.Context, eventType string, task Task, userID int64) {
	if e == nil || e.writer == nil {
		return
	}
	payload := map[string]any{
		"type":             eventType,
		"task_id":          task.ID,
		"user_id":          userID,
		"status":           task.Status,
		"priority":         task.Priority,
		"notify_group_ids": task.NotifyGroupIDs,
		"at":               time.Now().UTC().Format(time.RFC3339),
	}
	data, err := json.Marshal(payload)
	if err != nil {
		log.Printf("kafka payload encoding failed: %v", err)
		return
	}
	writeCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()
	if err := e.writer.WriteMessages(writeCtx, kafka.Message{Key: []byte(strconv.FormatInt(task.ID, 10)), Value: data}); err != nil {
		log.Printf("kafka write failed: %v", err)
	}
}

// Close shuts down the Kafka writer and logs any error. A nil emitter is a no-op.
func (e *taskEmitter) Close() {
	if e == nil || e.writer == nil {
		return
	}
	if err := e.writer.Close(); err != nil {
		log.Printf("kafka close failed: %v", err)
	}
}

// isUniqueViolation reports whether err carries SQLSTATE 23505
// (unique_violation). It recognises the legacy pgconn error type as well as
// any error in the chain exposing SQLState(), which is how errors produced by
// the pgx v5 pool used in this file identify themselves.
func isUniqueViolation(err error) bool {
	const uniqueViolation = "23505"

	var legacyErr *pgconn.PgError
	if errors.As(err, &legacyErr) {
		return legacyErr.Code == uniqueViolation
	}
	var stateErr interface{ SQLState() string }
	if errors.As(err, &stateErr) {
		return stateErr.SQLState() == uniqueViolation
	}
	return false
}

// main builds the dependencies, registers the HTTP routes and serves on :8080.
func main() {
	store, iamSvc, emitter, notifySvc := buildDependencies()
	defer store.Close()
	defer iamSvc.Close()
	defer emitter.Close()
	defer notifySvc.Close()

	appCtx, cancel := context.WithCancel(context.Background())
	defer cancel()
	notifySvc.Start(appCtx)

	gin.SetMode(gin.DebugMode)
	router := gin.Default()
	router.RedirectTrailingSlash = false
	router.RedirectFixedPath = false
	router.Use(corsMiddleware())

	router.GET("/api/health", func(c *gin.Context) {
		c.JSON(http.StatusOK, gin.H{"status": "ok"})
	})

	cfg := iam.LoadConfig()
	iam.NewHandler(iamSvc, cfg).RegisterRoutes(router)

	// Everything under /api/v1 goes through the IAM access middleware; the
	// handlers read the caller's id from iam.ContextUserIDKey.
	api := router.Group("/api/v1")
	api.Use(iamSvc.RequireAccess())

	tasks := api.Group("/tasks")
	{
		tasks.GET("", func(c *gin.Context) {
			userID := c.GetInt64(iam.ContextUserIDKey)
			items, err := store.List(c.Request.Context(), userID)
			if err != nil {
				c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load tasks"})
				return
			}
			c.JSON(http.StatusOK, items)
		})

		tasks.POST("", func(c *gin.Context) {
			var input Task
			if err := c.ShouldBindJSON(&input); err != nil {
				c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
				return
			}
			userID := c.GetInt64(iam.ContextUserIDKey)
			created, err := store.Create(c.Request.Context(), userID, input)
			if err != nil {
				c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to save task"})
				return
			}
			emitter.Emit(c.Request.Context(), "task.created", created, userID)
			// Best-effort: a notification failure must not fail the request.
			_ = notifySvc.PublishTaskEvent(c.Request.Context(), "task.created", toTaskSnapshot(userID, created))
			c.JSON(http.StatusCreated, created)
		})

		tasks.GET(":id", func(c *gin.Context) {
			id, err := parseID(c.Param("id"))
			if err != nil {
				c.JSON(http.StatusBadRequest, gin.H{"error": "invalid id"})
				return
			}
			userID := c.GetInt64(iam.ContextUserIDKey)
			item, err := store.Get(c.Request.Context(), userID, id)
			if err != nil {
				if errors.Is(err, pgx.ErrNoRows) {
					c.JSON(http.StatusNotFound, gin.H{"error": "not found"})
					return
				}
				c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load task"})
				return
			}
			c.JSON(http.StatusOK, item)
		})

		tasks.PUT(":id", func(c *gin.Context) {
			id, err := parseID(c.Param("id"))
			if err != nil {
				c.JSON(http.StatusBadRequest, gin.H{"error": "invalid id"})
				return
			}
			var input Task
			if err := c.ShouldBindJSON(&input); err != nil {
				c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
				return
			}
			userID := c.GetInt64(iam.ContextUserIDKey)
			// The previous state is needed to tell a status change from a
			// plain update when choosing the event type below.
			before, err := store.Get(c.Request.Context(), userID, id)
			if err != nil {
				if errors.Is(err, pgx.ErrNoRows) {
					c.JSON(http.StatusNotFound, gin.H{"error": "not found"})
					return
				}
				c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load task"})
				return
			}
			updated, err := store.Update(c.Request.Context(), userID, id, input)
			if err != nil {
				if errors.Is(err, pgx.ErrNoRows) {
					c.JSON(http.StatusNotFound, gin.H{"error": "not found"})
					return
				}
				c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update task"})
				return
			}
			eventType := "task.updated"
			if before.Status != updated.Status {
				eventType = "task.status_changed"
			}
			emitter.Emit(c.Request.Context(), eventType, updated, userID)
			// Best-effort: a notification failure must not fail the request.
			_ = notifySvc.PublishTaskEvent(c.Request.Context(), eventType, toTaskSnapshot(userID, updated))
			c.JSON(http.StatusOK, updated)
		})

		tasks.DELETE(":id", func(c *gin.Context) {
			id, err := parseID(c.Param("id"))
			if err != nil {
				c.JSON(http.StatusBadRequest, gin.H{"error": "invalid id"})
				return
			}
			userID := c.GetInt64(iam.ContextUserIDKey)
			if err := store.Delete(c.Request.Context(), userID, id); err != nil {
				if errors.Is(err, pgx.ErrNoRows) {
					c.JSON(http.StatusNotFound, gin.H{"error": "not found"})
					return
				}
				c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete task"})
				return
			}
			emitter.Emit(c.Request.Context(), "task.deleted", Task{ID: id}, userID)
			c.Status(http.StatusNoContent)
		})
	}

	notifications := api.Group("/notifications")
	{
		notifications.GET("/prefs", func(c *gin.Context) {
			userID := c.GetInt64(iam.ContextUserIDKey)
			prefs, err := notifySvc.GetPrefs(c.Request.Context(), userID)
			if err != nil {
				c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load prefs"})
				return
			}
			c.JSON(http.StatusOK, prefs)
		})

		notifications.PUT("/prefs", func(c *gin.Context) {
			var input notification.UserNotificationPrefs
			if err := c.ShouldBindJSON(&input); err != nil {
				c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
				return
			}
			userID := c.GetInt64(iam.ContextUserIDKey)
			prefs, err := notifySvc.UpdatePrefs(c.Request.Context(), userID, input)
			if err != nil {
				c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
				return
			}
			c.JSON(http.StatusOK, prefs)
		})

		notifications.GET("/groups", func(c *gin.Context) {
			userID := c.GetInt64(iam.ContextUserIDKey)
			groups, err := notifySvc.ListGroups(c.Request.Context(), userID)
			if err != nil {
				c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list groups"})
				return
			}
			c.JSON(http.StatusOK, groups)
		})

		notifications.POST("/groups", func(c *gin.Context) {
			var input notification.NotificationGroup
			if err := c.ShouldBindJSON(&input); err != nil {
				c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
				return
			}
			userID := c.GetInt64(iam.ContextUserIDKey)
			group, err := notifySvc.CreateGroup(c.Request.Context(), userID, input)
			if err != nil {
				status := http.StatusBadRequest
				if isUniqueViolation(err) {
					status = http.StatusConflict
				}
				c.JSON(status, gin.H{"error": err.Error()})
				return
			}
			c.JSON(http.StatusCreated, group)
		})

		notifications.PUT("/groups/:id", func(c *gin.Context) {
			id, err := parseID(c.Param("id"))
			if err != nil {
				c.JSON(http.StatusBadRequest, gin.H{"error": "invalid id"})
				return
			}
			var input notification.NotificationGroup
			if err := c.ShouldBindJSON(&input); err != nil {
				c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
				return
			}
			userID := c.GetInt64(iam.ContextUserIDKey)
			group, err := notifySvc.UpdateGroup(c.Request.Context(), userID, id, input)
			if err != nil {
				if errors.Is(err, pgx.ErrNoRows) {
					c.JSON(http.StatusNotFound, gin.H{"error": "not found"})
					return
				}
				status := http.StatusBadRequest
				if isUniqueViolation(err) {
					status = http.StatusConflict
				}
				c.JSON(status, gin.H{"error": err.Error()})
				return
			}
			c.JSON(http.StatusOK, group)
		})

		notifications.DELETE("/groups/:id", func(c *gin.Context) {
			id, err := parseID(c.Param("id"))
			if err != nil {
				c.JSON(http.StatusBadRequest, gin.H{"error": "invalid id"})
				return
			}
			userID := c.GetInt64(iam.ContextUserIDKey)
			if err := notifySvc.DeleteGroup(c.Request.Context(), userID, id); err != nil {
				if errors.Is(err, pgx.ErrNoRows) {
					c.JSON(http.StatusNotFound, gin.H{"error": "not found"})
					return
				}
				c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete"})
				return
			}
			c.Status(http.StatusNoContent)
		})

		notifications.GET("/dlq", func(c *gin.Context) {
			// Malformed or non-positive paging values fall back to the defaults
			// instead of reaching the service as 0.
			page := positiveQueryInt(c, "page", 1)
			pageSize := positiveQueryInt(c, "page_size", 20)
			userID := c.GetInt64(iam.ContextUserIDKey)
			items, err := notifySvc.ListDLQ(c.Request.Context(), userID, page, pageSize)
			if err != nil {
				c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load dlq"})
				return
			}
			c.JSON(http.StatusOK, items)
		})

		notifications.GET("/metrics", func(c *gin.Context) {
			from, to, err := parseMetricsRange(c.Query("from"), c.Query("to"))
			if err != nil {
				c.JSON(http.StatusBadRequest, gin.H{"error": "invalid from/to"})
				return
			}
			metrics, err := notifySvc.GetMetrics(c.Request.Context(), from, to)
			if err != nil {
				c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load metrics"})
				return
			}
			c.JSON(http.StatusOK, metrics)
		})
	}

	if err := router.Run(":8080"); err != nil {
		panic(err)
	}
}

// positiveQueryInt reads query parameter key as an integer and returns
// fallback when it is missing, malformed or smaller than 1.
func positiveQueryInt(c *gin.Context, key string, fallback int) int {
	value, err := strconv.Atoi(c.Query(key))
	if err != nil || value < 1 {
		return fallback
	}
	return value
}

// buildDependencies constructs the store, IAM service, Kafka emitter and
// notification service from environment variables (DATABASE_URL,
// KAFKA_BROKERS, KAFKA_TOPIC plus whatever the iam and notification config
// loaders read). Store and IAM start-up share a five second deadline and any
// failure there terminates the process. The emitter is nil when KAFKA_BROKERS
// is empty.
func buildDependencies() (*postgresStore, *iam.Service, *taskEmitter, *notification.Service) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	dbURL := strings.TrimSpace(os.Getenv("DATABASE_URL"))
	if dbURL == "" {
		dbURL = "postgres://todo:todo@localhost:5432/todo?sslmode=disable"
	}
	store, err := newPostgresStore(ctx, dbURL)
	if err != nil {
		log.Fatalf("postgres connection failed: %v", err)
	}

	iamCfg := iam.LoadConfig()
	iamSvc, err := iam.NewService(ctx, store.Pool(), iamCfg)
	if err != nil {
		log.Fatalf("iam initialization failed: %v", err)
	}

	brokers := splitCSV(os.Getenv("KAFKA_BROKERS"))
	topic := strings.TrimSpace(os.Getenv("KAFKA_TOPIC"))
	if topic == "" {
		topic = "todo.tasks"
	}
	emitter := newTaskEmitter(brokers, topic)

	notifyCfg := notification.LoadConfigFromEnv()
	notifySvc := notification.NewService(store.Pool(), notifyCfg)
	return store, iamSvc, emitter, notifySvc
}

// toTaskSnapshot copies the task fields the notification service consumes
// and attaches the owning user id.
func toTaskSnapshot(userID int64, task Task) notification.TaskSnapshot {
	return notification.TaskSnapshot{
		ID:             task.ID,
		UserID:         userID,
		Title:          task.Title,
		Description:    task.Description,
		Status:         task.Status,
		DueAt:          task.DueAt,
		Priority:       task.Priority,
		Tags:           task.Tags,
		NotifyGroupIDs: task.NotifyGroupIDs,
	}
}

// parseMetricsRange parses the optional RFC 3339 bounds of a metrics query.
// A blank bound yields the zero time.Time; surrounding whitespace is ignored.
// On a parse error both returned times are zero.
func parseMetricsRange(fromStr, toStr string) (time.Time, time.Time, error) {
	var from, to time.Time
	var err error
	// Parse the trimmed value: the emptiness check already ignores padding,
	// so the parser should too.
	if fromStr = strings.TrimSpace(fromStr); fromStr != "" {
		from, err = time.Parse(time.RFC3339, fromStr)
		if err != nil {
			return time.Time{}, time.Time{}, err
		}
	}
	if toStr = strings.TrimSpace(toStr); toStr != "" {
		to, err = time.Parse(time.RFC3339, toStr)
		if err != nil {
			return time.Time{}, time.Time{}, err
		}
	}
	return from, to, nil
}

// parseID parses value as a base-10 signed 64-bit integer.
func parseID(value string) (int64, error) {
	return strconv.ParseInt(value, 10, 64)
}

// splitCSV splits a comma-separated list, trims each item and drops empty
// ones. A blank input yields nil.
func splitCSV(value string) []string {
	if strings.TrimSpace(value) == "" {
		return nil
	}
	parts := strings.Split(value, ",")
	result := make([]string, 0, len(parts))
	for _, part := range parts {
		item := strings.TrimSpace(part)
		if item != "" {
			result = append(result, item)
		}
	}
	return result
}

// dedupeInt64 returns the positive ids of in with duplicates removed,
// preserving first-seen order. An empty or nil input is returned unchanged.
func dedupeInt64(in []int64) []int64 {
	if len(in) == 0 {
		return in
	}
	seen := map[int64]struct{}{}
	out := make([]int64, 0, len(in))
	for _, id := range in {
		if id <= 0 {
			continue
		}
		if _, ok := seen[id]; ok {
			continue
		}
		seen[id] = struct{}{}
		out = append(out, id)
	}
	return out
}

// corsMiddleware sets the CORS response headers and answers OPTIONS
// preflight requests with 204 without calling later handlers.
//
// NOTE(review): any Origin header is echoed back together with
// Access-Control-Allow-Credentials: true, so every origin is allowed to make
// credentialed requests. Consider restricting this to an allow-list.
func corsMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		origin := strings.TrimSpace(c.GetHeader("Origin"))
		if origin != "" {
			c.Writer.Header().Set("Access-Control-Allow-Origin", origin)
			c.Writer.Header().Set("Vary", "Origin")
			c.Writer.Header().Set("Access-Control-Allow-Credentials", "true")
		} else {
			c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
		}
		c.Writer.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
		c.Writer.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type")
		if c.Request.Method == http.MethodOptions {
			c.AbortWithStatus(http.StatusNoContent)
			return
		}
		c.Next()
	}
}