package main import ( "context" "crypto/hmac" "crypto/sha256" "encoding/base64" "encoding/json" "errors" "log" "net/http" "os" "strconv" "strings" "time" "github.com/gin-gonic/gin" "github.com/jackc/pgconn" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/redis/go-redis/v9" "github.com/segmentio/kafka-go" "golang.org/x/crypto/bcrypt" ) type Task struct { ID int64 `json:"id"` Title string `json:"title"` Description string `json:"description"` Status string `json:"status"` DueAt string `json:"due_at"` Priority int `json:"priority"` Tags []string `json:"tags"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` } type User struct { ID int64 `json:"id"` Email string `json:"email"` } type tokenManager struct { secret []byte ttl time.Duration } type postgresStore struct { pool *pgxpool.Pool } type tokenCache struct { client *redis.Client prefix string } type taskEmitter struct { writer *kafka.Writer topic string } func newTokenManager(secret string, ttl time.Duration) *tokenManager { return &tokenManager{ secret: []byte(secret), ttl: ttl, } } func (t *tokenManager) Generate(user User) (string, error) { payload := struct { UserID int64 `json:"uid"` Exp int64 `json:"exp"` }{ UserID: user.ID, Exp: time.Now().Add(t.ttl).Unix(), } raw, err := json.Marshal(payload) if err != nil { return "", err } encoded := base64.RawURLEncoding.EncodeToString(raw) sig := t.sign(encoded) return encoded + "." + sig, nil } func (t *tokenManager) Validate(token string) (int64, bool) { parts := strings.Split(token, ".") if len(parts) != 2 { return 0, false } payload, sig := parts[0], parts[1] if !t.verify(payload, sig) { return 0, false } raw, err := base64.RawURLEncoding.DecodeString(payload) if err != nil { return 0, false } var data struct { UserID int64 `json:"uid"` Exp int64 `json:"exp"` } if err := json.Unmarshal(raw, &data); err != nil { return 0, false } if data.UserID == 0 || time.Now().Unix() > data.Exp { return 0, false } return data.UserID, true } func (t *tokenManager) sign(payload string) string { mac := hmac.New(sha256.New, t.secret) mac.Write([]byte(payload)) return base64.RawURLEncoding.EncodeToString(mac.Sum(nil)) } func (t *tokenManager) verify(payload, signature string) bool { expected := t.sign(payload) return hmac.Equal([]byte(signature), []byte(expected)) } func newPostgresStore(ctx context.Context, url string) (*postgresStore, error) { pool, err := pgxpool.New(ctx, url) if err != nil { return nil, err } if err := pool.Ping(ctx); err != nil { pool.Close() return nil, err } store := &postgresStore{pool: pool} if err := store.initSchema(ctx); err != nil { pool.Close() return nil, err } return store, nil } func (s *postgresStore) initSchema(ctx context.Context) error { statements := []string{ `CREATE TABLE IF NOT EXISTS users ( id BIGSERIAL PRIMARY KEY, email TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT now() )`, `CREATE TABLE IF NOT EXISTS tasks ( id BIGSERIAL PRIMARY KEY, user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE, title TEXT NOT NULL, description TEXT, status TEXT NOT NULL, due_at TEXT, priority INT NOT NULL DEFAULT 0, tags TEXT[] NOT NULL DEFAULT '{}', created_at TIMESTAMPTZ NOT NULL, updated_at TIMESTAMPTZ NOT NULL )`, } for _, stmt := range statements { if _, err := s.pool.Exec(ctx, stmt); err != nil { return err } } return nil } func (s *postgresStore) Register(ctx context.Context, email, password string) (User, error) { hashed, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost) if err != nil { return User{}, err } var id int64 err = s.pool.QueryRow(ctx, `INSERT INTO users (email, password_hash) VALUES ($1, $2) RETURNING id`, email, string(hashed)).Scan(&id) if err != nil { if isUniqueViolation(err) { return User{}, errAlreadyExists } return User{}, err } return User{ID: id, Email: email}, nil } func (s *postgresStore) Login(ctx context.Context, email, password string) (User, error) { var user User var hash string err := s.pool.QueryRow(ctx, `SELECT id, email, password_hash FROM users WHERE email = $1`, email).Scan(&user.ID, &user.Email, &hash) if err != nil { if errors.Is(err, pgx.ErrNoRows) { return User{}, errInvalidCredentials } return User{}, err } if err := bcrypt.CompareHashAndPassword([]byte(hash), []byte(password)); err != nil { return User{}, errInvalidCredentials } return user, nil } func (s *postgresStore) List(ctx context.Context, userID int64) ([]Task, error) { rows, err := s.pool.Query(ctx, `SELECT id, title, description, status, due_at, priority, tags, created_at, updated_at FROM tasks WHERE user_id = $1 ORDER BY id DESC`, userID) if err != nil { return nil, err } defer rows.Close() result := []Task{} for rows.Next() { var task Task var tags []string if err := rows.Scan(&task.ID, &task.Title, &task.Description, &task.Status, &task.DueAt, &task.Priority, &tags, &task.CreatedAt, &task.UpdatedAt); err != nil { return nil, err } task.Tags = tags result = append(result, task) } return result, rows.Err() } func (s *postgresStore) Get(ctx context.Context, userID, id int64) (Task, error) { var task Task var tags []string err := s.pool.QueryRow(ctx, `SELECT id, title, description, status, due_at, priority, tags, created_at, updated_at FROM tasks WHERE user_id = $1 AND id = $2`, userID, id). Scan(&task.ID, &task.Title, &task.Description, &task.Status, &task.DueAt, &task.Priority, &tags, &task.CreatedAt, &task.UpdatedAt) if err != nil { return Task{}, err } task.Tags = tags return task, nil } func (s *postgresStore) Create(ctx context.Context, userID int64, input Task) (Task, error) { if input.Status == "" { input.Status = "todo" } var tags []string if input.Tags != nil { tags = input.Tags } err := s.pool.QueryRow(ctx, ` INSERT INTO tasks (user_id, title, description, status, due_at, priority, tags, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7, now(), now()) RETURNING id, created_at, updated_at`, userID, input.Title, input.Description, input.Status, input.DueAt, input.Priority, tags, ).Scan(&input.ID, &input.CreatedAt, &input.UpdatedAt) if err != nil { return Task{}, err } return input, nil } func (s *postgresStore) Update(ctx context.Context, userID, id int64, input Task) (Task, error) { existing, err := s.Get(ctx, userID, id) if err != nil { return Task{}, err } if input.Title != "" { existing.Title = input.Title } if input.Description != "" { existing.Description = input.Description } if input.Status != "" { existing.Status = input.Status } if input.DueAt != "" { existing.DueAt = input.DueAt } if input.Priority != 0 { existing.Priority = input.Priority } if input.Tags != nil { existing.Tags = input.Tags } var updatedAt time.Time err = s.pool.QueryRow(ctx, ` UPDATE tasks SET title = $1, description = $2, status = $3, due_at = $4, priority = $5, tags = $6, updated_at = now() WHERE id = $7 AND user_id = $8 RETURNING updated_at`, existing.Title, existing.Description, existing.Status, existing.DueAt, existing.Priority, existing.Tags, id, userID, ).Scan(&updatedAt) if err != nil { return Task{}, err } existing.UpdatedAt = updatedAt return existing, nil } func (s *postgresStore) Delete(ctx context.Context, userID, id int64) error { result, err := s.pool.Exec(ctx, `DELETE FROM tasks WHERE id = $1 AND user_id = $2`, id, userID) if err != nil { return err } if result.RowsAffected() == 0 { return pgx.ErrNoRows } return nil } func (s *postgresStore) Close() { if s.pool != nil { s.pool.Close() } } func newTokenCache(addr, password string, db int) (*tokenCache, error) { if strings.TrimSpace(addr) == "" { return nil, nil } client := redis.NewClient(&redis.Options{ Addr: addr, Password: password, DB: db, }) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() if err := client.Ping(ctx).Err(); err != nil { return nil, err } return &tokenCache{client: client, prefix: "auth:token:"}, nil } func (c *tokenCache) Save(ctx context.Context, token string, ttl time.Duration) error { return c.client.Set(ctx, c.prefix+token, "1", ttl).Err() } func (c *tokenCache) Delete(ctx context.Context, token string) error { return c.client.Del(ctx, c.prefix+token).Err() } func (c *tokenCache) Exists(ctx context.Context, token string) (bool, error) { count, err := c.client.Exists(ctx, c.prefix+token).Result() if err != nil { return false, err } return count == 1, nil } func newTaskEmitter(brokers []string, topic string) *taskEmitter { if len(brokers) == 0 { return nil } writer := &kafka.Writer{ Addr: kafka.TCP(brokers...), Topic: topic, Balancer: &kafka.LeastBytes{}, } return &taskEmitter{writer: writer, topic: topic} } func (e *taskEmitter) Emit(ctx context.Context, eventType string, task Task, userID int64) { if e == nil || e.writer == nil { return } payload := map[string]any{ "type": eventType, "task_id": task.ID, "user_id": userID, "status": task.Status, "priority": task.Priority, "at": time.Now().UTC().Format(time.RFC3339), } data, err := json.Marshal(payload) if err != nil { return } writeCtx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() if err := e.writer.WriteMessages(writeCtx, kafka.Message{ Key: []byte(strconv.FormatInt(task.ID, 10)), Value: data, }); err != nil { log.Printf("kafka write failed: %v", err) } } func (e *taskEmitter) Close() { if e == nil || e.writer == nil { return } if err := e.writer.Close(); err != nil { log.Printf("kafka close failed: %v", err) } } var ( errAlreadyExists = errors.New("already exists") errInvalidCredentials = errors.New("invalid credentials") ) func isUniqueViolation(err error) bool { var pgErr *pgconn.PgError if errors.As(err, &pgErr) && pgErr.Code == "23505" { return true } return false } func main() { store, tokens, cache, emitter := buildDependencies() defer store.Close() if cache != nil { defer cache.client.Close() } defer emitter.Close() gin.SetMode(gin.DebugMode) router := gin.Default() router.RedirectTrailingSlash = false router.RedirectFixedPath = false router.Use(func(c *gin.Context) { c.Writer.Header().Set("Access-Control-Allow-Origin", "*") c.Writer.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") c.Writer.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type") if c.Request.Method == http.MethodOptions { c.AbortWithStatus(http.StatusNoContent) return } c.Next() }) router.GET("/api/health", func(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"status": "ok"}) }) auth := router.Group("/api/v1/auth") { auth.POST("/register", func(c *gin.Context) { var input struct { Email string `json:"email"` Password string `json:"password"` } if err := c.ShouldBindJSON(&input); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } input.Email = strings.TrimSpace(input.Email) if input.Email == "" || input.Password == "" { c.JSON(http.StatusBadRequest, gin.H{"error": "email and password required"}) return } user, err := store.Register(c.Request.Context(), input.Email, input.Password) if err != nil { if errors.Is(err, errAlreadyExists) { c.JSON(http.StatusConflict, gin.H{"error": "user already exists"}) return } c.JSON(http.StatusInternalServerError, gin.H{"error": "registration failed"}) return } c.JSON(http.StatusCreated, gin.H{"id": user.ID, "email": user.Email}) }) auth.POST("/login", func(c *gin.Context) { var input struct { Email string `json:"email"` Password string `json:"password"` } if err := c.ShouldBindJSON(&input); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } input.Email = strings.TrimSpace(input.Email) if input.Email == "" || input.Password == "" { c.JSON(http.StatusBadRequest, gin.H{"error": "email and password required"}) return } user, err := store.Login(c.Request.Context(), input.Email, input.Password) if err != nil { if errors.Is(err, errInvalidCredentials) { c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid credentials"}) return } c.JSON(http.StatusInternalServerError, gin.H{"error": "login failed"}) return } token, err := tokens.Generate(user) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "token generation failed"}) return } if cache != nil { if err := cache.Save(c.Request.Context(), token, tokens.ttl); err != nil { c.JSON(http.StatusServiceUnavailable, gin.H{"error": "token cache unavailable"}) return } } c.JSON(http.StatusOK, gin.H{"token": token}) }) auth.POST("/logout", func(c *gin.Context) { token := extractBearerToken(c) if token == "" { c.JSON(http.StatusUnauthorized, gin.H{"error": "missing authorization"}) return } if _, ok := tokens.Validate(token); !ok { c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid token"}) return } if cache != nil { if err := cache.Delete(c.Request.Context(), token); err != nil { c.JSON(http.StatusServiceUnavailable, gin.H{"error": "token cache unavailable"}) return } } c.Status(http.StatusNoContent) }) } api := router.Group("/api/v1") api.Use(func(c *gin.Context) { token := extractBearerToken(c) if token == "" { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "missing authorization"}) return } userID, ok := tokens.Validate(token) if !ok { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "invalid token"}) return } if cache != nil { exists, err := cache.Exists(c.Request.Context(), token) if err != nil { c.AbortWithStatusJSON(http.StatusServiceUnavailable, gin.H{"error": "token cache unavailable"}) return } if !exists { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "invalid token"}) return } } c.Set("user_id", userID) c.Next() }) tasks := api.Group("/tasks") { tasks.GET("", func(c *gin.Context) { userID := c.GetInt64("user_id") items, err := store.List(c.Request.Context(), userID) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load tasks"}) return } c.JSON(http.StatusOK, items) }) tasks.POST("", func(c *gin.Context) { var input Task if err := c.ShouldBindJSON(&input); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } userID := c.GetInt64("user_id") created, err := store.Create(c.Request.Context(), userID, input) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create task"}) return } emitter.Emit(c.Request.Context(), "task.created", created, userID) c.JSON(http.StatusCreated, created) }) tasks.GET(":id", func(c *gin.Context) { id, err := parseID(c.Param("id")) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "invalid id"}) return } userID := c.GetInt64("user_id") task, err := store.Get(c.Request.Context(), userID, id) if err != nil { if errors.Is(err, pgx.ErrNoRows) { c.JSON(http.StatusNotFound, gin.H{"error": "not found"}) return } c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load task"}) return } c.JSON(http.StatusOK, task) }) tasks.PUT(":id", func(c *gin.Context) { id, err := parseID(c.Param("id")) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "invalid id"}) return } var input Task if err := c.ShouldBindJSON(&input); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } userID := c.GetInt64("user_id") updated, err := store.Update(c.Request.Context(), userID, id, input) if err != nil { if errors.Is(err, pgx.ErrNoRows) { c.JSON(http.StatusNotFound, gin.H{"error": "not found"}) return } c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update task"}) return } emitter.Emit(c.Request.Context(), "task.updated", updated, userID) c.JSON(http.StatusOK, updated) }) tasks.DELETE(":id", func(c *gin.Context) { id, err := parseID(c.Param("id")) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "invalid id"}) return } userID := c.GetInt64("user_id") if err := store.Delete(c.Request.Context(), userID, id); err != nil { if errors.Is(err, pgx.ErrNoRows) { c.JSON(http.StatusNotFound, gin.H{"error": "not found"}) return } c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete task"}) return } emitter.Emit(c.Request.Context(), "task.deleted", Task{ID: id}, userID) c.Status(http.StatusNoContent) }) } if err := router.Run(":8080"); err != nil { panic(err) } } func buildDependencies() (*postgresStore, *tokenManager, *tokenCache, *taskEmitter) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() dbURL := strings.TrimSpace(os.Getenv("DATABASE_URL")) if dbURL == "" { dbURL = "postgres://todo:todo@localhost:5432/todo?sslmode=disable" } store, err := newPostgresStore(ctx, dbURL) if err != nil { log.Fatalf("postgres connection failed: %v", err) } secret := strings.TrimSpace(os.Getenv("AUTH_SECRET")) if secret == "" { secret = "dev-secret-change-me" } tokens := newTokenManager(secret, 24*time.Hour) redisAddr := strings.TrimSpace(os.Getenv("REDIS_ADDR")) redisPassword := os.Getenv("REDIS_PASSWORD") redisDB := parseEnvInt("REDIS_DB", 0) cache, err := newTokenCache(redisAddr, redisPassword, redisDB) if err != nil { log.Fatalf("redis connection failed: %v", err) } brokers := splitCSV(os.Getenv("KAFKA_BROKERS")) topic := strings.TrimSpace(os.Getenv("KAFKA_TOPIC")) if topic == "" { topic = "todo.tasks" } emitter := newTaskEmitter(brokers, topic) return store, tokens, cache, emitter } func parseID(value string) (int64, error) { return strconv.ParseInt(value, 10, 64) } func extractBearerToken(c *gin.Context) string { authHeader := strings.TrimSpace(c.GetHeader("Authorization")) if strings.HasPrefix(authHeader, "Bearer ") { return strings.TrimSpace(authHeader[7:]) } return authHeader } func parseEnvInt(key string, fallback int) int { value := strings.TrimSpace(os.Getenv(key)) if value == "" { return fallback } parsed, err := strconv.Atoi(value) if err != nil { return fallback } return parsed } func splitCSV(value string) []string { if strings.TrimSpace(value) == "" { return nil } parts := strings.Split(value, ",") result := make([]string, 0, len(parts)) for _, part := range parts { item := strings.TrimSpace(part) if item != "" { result = append(result, item) } } return result }