package main import ( "context" "encoding/json" "errors" "log" "net/http" "os" "strconv" "strings" "time" "wolves.top/todo/internal/iam" "github.com/gin-gonic/gin" "github.com/jackc/pgconn" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/segmentio/kafka-go" ) type Task struct { ID int64 `json:"id"` Title string `json:"title"` Description string `json:"description"` Status string `json:"status"` DueAt string `json:"due_at"` Priority int `json:"priority"` Tags []string `json:"tags"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` } type postgresStore struct { pool *pgxpool.Pool } type taskEmitter struct { writer *kafka.Writer topic string } var ( errAlreadyExists = errors.New("already exists") ) func newPostgresStore(ctx context.Context, url string) (*postgresStore, error) { pool, err := pgxpool.New(ctx, url) if err != nil { return nil, err } if err := pool.Ping(ctx); err != nil { pool.Close() return nil, err } store := &postgresStore{pool: pool} if err := store.initSchema(ctx); err != nil { pool.Close() return nil, err } return store, nil } func (s *postgresStore) initSchema(ctx context.Context) error { statements := []string{ `CREATE TABLE IF NOT EXISTS users ( id BIGSERIAL PRIMARY KEY, email TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT now() )`, `CREATE TABLE IF NOT EXISTS tasks ( id BIGSERIAL PRIMARY KEY, user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE, title TEXT NOT NULL, description TEXT, status TEXT NOT NULL, due_at TEXT, priority INT NOT NULL DEFAULT 0, tags TEXT[] NOT NULL DEFAULT '{}', created_at TIMESTAMPTZ NOT NULL, updated_at TIMESTAMPTZ NOT NULL )`, } for _, stmt := range statements { if _, err := s.pool.Exec(ctx, stmt); err != nil { return err } } return nil } func (s *postgresStore) List(ctx context.Context, userID int64) ([]Task, error) { rows, err := s.pool.Query(ctx, `SELECT id, title, description, status, due_at, priority, tags, created_at, updated_at FROM tasks WHERE user_id = $1 ORDER BY id DESC`, userID) if err != nil { return nil, err } defer rows.Close() result := []Task{} for rows.Next() { var task Task var tags []string if err := rows.Scan(&task.ID, &task.Title, &task.Description, &task.Status, &task.DueAt, &task.Priority, &tags, &task.CreatedAt, &task.UpdatedAt); err != nil { return nil, err } task.Tags = tags result = append(result, task) } return result, rows.Err() } func (s *postgresStore) Get(ctx context.Context, userID, id int64) (Task, error) { var task Task var tags []string err := s.pool.QueryRow(ctx, `SELECT id, title, description, status, due_at, priority, tags, created_at, updated_at FROM tasks WHERE user_id = $1 AND id = $2`, userID, id). Scan(&task.ID, &task.Title, &task.Description, &task.Status, &task.DueAt, &task.Priority, &tags, &task.CreatedAt, &task.UpdatedAt) if err != nil { return Task{}, err } task.Tags = tags return task, nil } func (s *postgresStore) Create(ctx context.Context, userID int64, input Task) (Task, error) { if input.Status == "" { input.Status = "todo" } var tags []string if input.Tags != nil { tags = input.Tags } err := s.pool.QueryRow(ctx, ` INSERT INTO tasks (user_id, title, description, status, due_at, priority, tags, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7, now(), now()) RETURNING id, created_at, updated_at`, userID, input.Title, input.Description, input.Status, input.DueAt, input.Priority, tags, ).Scan(&input.ID, &input.CreatedAt, &input.UpdatedAt) if err != nil { return Task{}, err } return input, nil } func (s *postgresStore) Update(ctx context.Context, userID, id int64, input Task) (Task, error) { existing, err := s.Get(ctx, userID, id) if err != nil { return Task{}, err } if input.Title != "" { existing.Title = input.Title } if input.Description != "" { existing.Description = input.Description } if input.Status != "" { existing.Status = input.Status } if input.DueAt != "" { existing.DueAt = input.DueAt } if input.Priority != 0 { existing.Priority = input.Priority } if input.Tags != nil { existing.Tags = input.Tags } var updatedAt time.Time err = s.pool.QueryRow(ctx, ` UPDATE tasks SET title = $1, description = $2, status = $3, due_at = $4, priority = $5, tags = $6, updated_at = now() WHERE id = $7 AND user_id = $8 RETURNING updated_at`, existing.Title, existing.Description, existing.Status, existing.DueAt, existing.Priority, existing.Tags, id, userID, ).Scan(&updatedAt) if err != nil { return Task{}, err } existing.UpdatedAt = updatedAt return existing, nil } func (s *postgresStore) Delete(ctx context.Context, userID, id int64) error { result, err := s.pool.Exec(ctx, `DELETE FROM tasks WHERE id = $1 AND user_id = $2`, id, userID) if err != nil { return err } if result.RowsAffected() == 0 { return pgx.ErrNoRows } return nil } func (s *postgresStore) Close() { if s.pool != nil { s.pool.Close() } } func (s *postgresStore) Pool() *pgxpool.Pool { return s.pool } func newTaskEmitter(brokers []string, topic string) *taskEmitter { if len(brokers) == 0 { return nil } writer := &kafka.Writer{ Addr: kafka.TCP(brokers...), Topic: topic, Balancer: &kafka.LeastBytes{}, } return &taskEmitter{writer: writer, topic: topic} } func (e *taskEmitter) Emit(ctx context.Context, eventType string, task Task, userID int64) { if e == nil || e.writer == nil { return } payload := map[string]any{ "type": eventType, "task_id": task.ID, "user_id": userID, "status": task.Status, "priority": task.Priority, "at": time.Now().UTC().Format(time.RFC3339), } data, err := json.Marshal(payload) if err != nil { return } writeCtx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() if err := e.writer.WriteMessages(writeCtx, kafka.Message{Key: []byte(strconv.FormatInt(task.ID, 10)), Value: data}); err != nil { log.Printf("kafka write failed: %v", err) } } func (e *taskEmitter) Close() { if e == nil || e.writer == nil { return } if err := e.writer.Close(); err != nil { log.Printf("kafka close failed: %v", err) } } func isUniqueViolation(err error) bool { var pgErr *pgconn.PgError if errors.As(err, &pgErr) && pgErr.Code == "23505" { return true } return false } func main() { store, iamSvc, emitter := buildDependencies() defer store.Close() defer iamSvc.Close() defer emitter.Close() gin.SetMode(gin.DebugMode) router := gin.Default() router.RedirectTrailingSlash = false router.RedirectFixedPath = false router.Use(corsMiddleware()) router.GET("/api/health", func(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"status": "ok"}) }) cfg := iam.LoadConfig() iam.NewHandler(iamSvc, cfg).RegisterRoutes(router) api := router.Group("/api/v1") api.Use(iamSvc.RequireAccess()) tasks := api.Group("/tasks") { tasks.GET("", func(c *gin.Context) { userID := c.GetInt64(iam.ContextUserIDKey) items, err := store.List(c.Request.Context(), userID) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load tasks"}) return } c.JSON(http.StatusOK, items) }) tasks.POST("", func(c *gin.Context) { var input Task if err := c.ShouldBindJSON(&input); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } userID := c.GetInt64(iam.ContextUserIDKey) created, err := store.Create(c.Request.Context(), userID, input) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to save task"}) return } emitter.Emit(c.Request.Context(), "task.created", created, userID) c.JSON(http.StatusCreated, created) }) tasks.GET(":id", func(c *gin.Context) { id, err := parseID(c.Param("id")) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "invalid id"}) return } userID := c.GetInt64(iam.ContextUserIDKey) item, err := store.Get(c.Request.Context(), userID, id) if err != nil { if errors.Is(err, pgx.ErrNoRows) { c.JSON(http.StatusNotFound, gin.H{"error": "not found"}) return } c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load task"}) return } c.JSON(http.StatusOK, item) }) tasks.PUT(":id", func(c *gin.Context) { id, err := parseID(c.Param("id")) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "invalid id"}) return } var input Task if err := c.ShouldBindJSON(&input); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } userID := c.GetInt64(iam.ContextUserIDKey) updated, err := store.Update(c.Request.Context(), userID, id, input) if err != nil { if errors.Is(err, pgx.ErrNoRows) { c.JSON(http.StatusNotFound, gin.H{"error": "not found"}) return } c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update task"}) return } emitter.Emit(c.Request.Context(), "task.updated", updated, userID) c.JSON(http.StatusOK, updated) }) tasks.DELETE(":id", func(c *gin.Context) { id, err := parseID(c.Param("id")) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "invalid id"}) return } userID := c.GetInt64(iam.ContextUserIDKey) if err := store.Delete(c.Request.Context(), userID, id); err != nil { if errors.Is(err, pgx.ErrNoRows) { c.JSON(http.StatusNotFound, gin.H{"error": "not found"}) return } c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete task"}) return } emitter.Emit(c.Request.Context(), "task.deleted", Task{ID: id}, userID) c.Status(http.StatusNoContent) }) } if err := router.Run(":8080"); err != nil { panic(err) } } func buildDependencies() (*postgresStore, *iam.Service, *taskEmitter) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() dbURL := strings.TrimSpace(os.Getenv("DATABASE_URL")) if dbURL == "" { dbURL = "postgres://todo:todo@localhost:5432/todo?sslmode=disable" } store, err := newPostgresStore(ctx, dbURL) if err != nil { log.Fatalf("postgres connection failed: %v", err) } iamCfg := iam.LoadConfig() iamSvc, err := iam.NewService(ctx, store.Pool(), iamCfg) if err != nil { log.Fatalf("iam initialization failed: %v", err) } brokers := splitCSV(os.Getenv("KAFKA_BROKERS")) topic := strings.TrimSpace(os.Getenv("KAFKA_TOPIC")) if topic == "" { topic = "todo.tasks" } emitter := newTaskEmitter(brokers, topic) return store, iamSvc, emitter } func parseID(value string) (int64, error) { return strconv.ParseInt(value, 10, 64) } func splitCSV(value string) []string { if strings.TrimSpace(value) == "" { return nil } parts := strings.Split(value, ",") result := make([]string, 0, len(parts)) for _, part := range parts { item := strings.TrimSpace(part) if item != "" { result = append(result, item) } } return result } func corsMiddleware() gin.HandlerFunc { return func(c *gin.Context) { origin := strings.TrimSpace(c.GetHeader("Origin")) if origin != "" { c.Writer.Header().Set("Access-Control-Allow-Origin", origin) c.Writer.Header().Set("Vary", "Origin") c.Writer.Header().Set("Access-Control-Allow-Credentials", "true") } else { c.Writer.Header().Set("Access-Control-Allow-Origin", "*") } c.Writer.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") c.Writer.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type") if c.Request.Method == http.MethodOptions { c.AbortWithStatus(http.StatusNoContent) return } c.Next() } }