Scheduler with Leader Election
The scheduler has been integrated with leader election to ensure that scheduled tasks only run on the leader instance.
Overview
When multiple instances of the backend are running (e.g., behind a load balancer), you don't want scheduled tasks running on every instance. This would cause:
- ❌ Duplicate task executions
- ❌ Database conflicts
- ❌ Wasted resources
- ❌ Race conditions
With leader election integration:
- ✅ Tasks only execute on the leader instance
- ✅ Automatic failover if leader crashes
- ✅ No duplicate executions
- ✅ Safe for multi-instance deployments
How It Works
┌─────────────────────────────────────────────────────────┐
│                      Load Balancer                      │
└─────────┬───────────────────┬──────────────────┬────────┘
          │                   │                  │
   ┌──────▼───────┐    ┌──────▼──────┐    ┌──────▼──────┐
   │  Instance 1  │    │ Instance 2  │    │ Instance 3  │
   │ (LEADER) 👑  │    │ (Follower)  │    │ (Follower)  │
   │              │    │             │    │             │
   │ Scheduler ✅ │    │ Scheduler ⏸️ │    │ Scheduler ⏸️ │
   │ Runs tasks   │    │ Skips tasks │    │ Skips tasks │
   └──────────────┘    └─────────────┘    └─────────────┘
Execution Flow
- All instances have the scheduler running with registered tasks
- All instances have cron triggers firing at scheduled times
- Only the leader actually executes the task logic
- Followers skip execution (logged at DEBUG level)
Example logs:
Leader Instance:
2025-01-12T10:00:00.000Z INFO 👑 Leader executing scheduled task task=CleanupOldRecords instance_id=instance-1
2025-01-12T10:00:05.123Z INFO ✅ Task completed successfully task=CleanupOldRecords
Follower Instances:
2025-01-12T10:00:00.000Z DEBUG Skipping task execution - not the leader task=CleanupOldRecords instance_id=instance-2
Usage
1. Create a Scheduled Task
package tasks
import (
"context"
"go.uber.org/zap"
"codeberg.org/mapleopentech/monorepo/cloud/maplefile-backend/internal/interface/scheduler"
)
type CleanupTask struct {
logger *zap.Logger
// ... other dependencies
}
func NewCleanupTask(logger *zap.Logger) scheduler.Task {
return &CleanupTask{
logger: logger.Named("CleanupTask"),
}
}
func (t *CleanupTask) Name() string {
return "CleanupOldRecords"
}
func (t *CleanupTask) Schedule() string {
// Cron format: every day at 2 AM
return "0 2 * * *"
}
func (t *CleanupTask) Execute(ctx context.Context) error {
t.logger.Info("Starting cleanup of old records")
// Your task logic here
// This will ONLY run on the leader instance
t.logger.Info("Cleanup completed")
return nil
}
2. Register Tasks with the Scheduler
The scheduler is already wired through Google Wire. To register tasks, you would typically do this in your application startup:
// In app/app.go or wherever you initialize your app
func (app *Application) Start() error {
// ... existing startup code ...
// Register scheduled tasks
if app.scheduler != nil {
// Create and register tasks
cleanupTask := tasks.NewCleanupTask(app.logger)
if err := app.scheduler.RegisterTask(cleanupTask); err != nil {
return fmt.Errorf("failed to register cleanup task: %w", err)
}
metricsTask := tasks.NewMetricsAggregationTask(app.logger)
if err := app.scheduler.RegisterTask(metricsTask); err != nil {
return fmt.Errorf("failed to register metrics task: %w", err)
}
// Start the scheduler
if err := app.scheduler.Start(); err != nil {
return fmt.Errorf("failed to start scheduler: %w", err)
}
}
// ... rest of startup code ...
return nil
}
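As the number of tasks grows, the repeated register-and-check blocks can be collapsed into a loop. A minimal sketch, assuming only that the scheduler exposes RegisterTask(Task) error as used above; registerAll, taskRegistrar, and the fake registry are hypothetical names for illustration. The fake rejects duplicate names purely to exercise the error path; whether the real scheduler does so is not stated in this document.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type Task interface {
	Name() string
	Schedule() string
	Execute(ctx context.Context) error
}

// taskRegistrar is the one scheduler method this helper needs.
type taskRegistrar interface {
	RegisterTask(t Task) error
}

// registerAll registers tasks in order and stops at the first failure,
// wrapping the error with the offending task's name.
func registerAll(r taskRegistrar, tasks ...Task) error {
	for _, t := range tasks {
		if err := r.RegisterTask(t); err != nil {
			return fmt.Errorf("failed to register task %q: %w", t.Name(), err)
		}
	}
	return nil
}

// fakeRegistry is a test double that rejects duplicate task names.
type fakeRegistry struct{ names map[string]bool }

func (f *fakeRegistry) RegisterTask(t Task) error {
	if f.names[t.Name()] {
		return errors.New("duplicate task name")
	}
	f.names[t.Name()] = true
	return nil
}

// namedTask is a trivial Task used only to drive the example.
type namedTask string

func (n namedTask) Name() string                  { return string(n) }
func (n namedTask) Schedule() string              { return "0 * * * *" }
func (n namedTask) Execute(context.Context) error { return nil }

func main() {
	reg := &fakeRegistry{names: map[string]bool{}}
	err := registerAll(reg,
		namedTask("CleanupOldRecords"),
		namedTask("HourlyMetrics"),
		namedTask("HourlyMetrics"), // duplicate, rejected by the fake
	)
	fmt.Println(err)
}
```

In Start() this would replace the individual blocks with a single registerAll(app.scheduler, cleanupTask, metricsTask) call, provided the real scheduler satisfies the interface.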
3. Graceful Shutdown
func (app *Application) Stop() error {
// ... other shutdown code ...
if app.scheduler != nil {
if err := app.scheduler.Stop(); err != nil {
app.logger.Error("Failed to stop scheduler", zap.Error(err))
}
}
// ... rest of shutdown code ...
return nil
}
Cron Schedule Format
The scheduler uses standard cron format:
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday)
│ │ │ │ │
│ │ │ │ │
* * * * *
Common Examples
"* * * * *" // Every minute
"0 * * * *" // Every hour (on the hour)
"0 0 * * *" // Every day at midnight
"0 2 * * *" // Every day at 2:00 AM
"0 */6 * * *" // Every 6 hours
"0 0 * * 0" // Every Sunday at midnight
"0 0 1 * *" // First day of every month at midnight
"0 9 * * 1-5" // Weekdays at 9:00 AM
"*/5 * * * *" // Every 5 minutes
"0 0,12 * * *" // Twice a day (midnight and noon)
Example Tasks
Daily Cleanup Task
type DailyCleanupTask struct {
logger *zap.Logger
repo *Repository
}
func (t *DailyCleanupTask) Name() string {
return "DailyCleanup"
}
func (t *DailyCleanupTask) Schedule() string {
return "0 3 * * *" // 3 AM every day
}
func (t *DailyCleanupTask) Execute(ctx context.Context) error {
t.logger.Info("Running daily cleanup")
// Delete old records
cutoffDate := time.Now().AddDate(0, 0, -30) // 30 days ago
if err := t.repo.DeleteOlderThan(ctx, cutoffDate); err != nil {
return fmt.Errorf("cleanup failed: %w", err)
}
return nil
}
Hourly Metrics Task
type MetricsAggregationTask struct {
logger *zap.Logger
metrics *MetricsService
}
func (t *MetricsAggregationTask) Name() string {
return "HourlyMetrics"
}
func (t *MetricsAggregationTask) Schedule() string {
return "0 * * * *" // Every hour
}
func (t *MetricsAggregationTask) Execute(ctx context.Context) error {
t.logger.Info("Aggregating hourly metrics")
if err := t.metrics.AggregateAndSend(ctx); err != nil {
return fmt.Errorf("metrics aggregation failed: %w", err)
}
return nil
}
Cache Warming Task
type CacheWarmingTask struct {
logger *zap.Logger
cache *CacheService
}
func (t *CacheWarmingTask) Name() string {
return "CacheWarming"
}
func (t *CacheWarmingTask) Schedule() string {
return "*/30 * * * *" // Every 30 minutes
}
func (t *CacheWarmingTask) Execute(ctx context.Context) error {
t.logger.Info("Warming application cache")
if err := t.cache.WarmFrequentlyAccessedData(ctx); err != nil {
return fmt.Errorf("cache warming failed: %w", err)
}
return nil
}
Testing
Local Testing with Multiple Instances
# Terminal 1 (will become leader)
LEADER_ELECTION_INSTANCE_ID=instance-1 ./maplefile-backend
# Terminal 2 (follower)
LEADER_ELECTION_INSTANCE_ID=instance-2 ./maplefile-backend
# Terminal 3 (follower)
LEADER_ELECTION_INSTANCE_ID=instance-3 ./maplefile-backend
Watch the logs:
- Only instance-1 (leader) will execute tasks
- instance-2 and instance-3 will skip task execution
Kill instance-1 and watch:
- Either instance-2 or instance-3 becomes the new leader
- The new leader starts executing tasks
- The remaining follower continues to skip
Testing Task Execution
Create a test task that runs every minute:
type TestTask struct {
logger *zap.Logger
}
func (t *TestTask) Name() string {
return "TestTask"
}
func (t *TestTask) Schedule() string {
return "* * * * *" // Every minute
}
func (t *TestTask) Execute(ctx context.Context) error {
t.logger.Info("TEST TASK EXECUTED - I am the leader!")
return nil
}
This makes it easy to see which instance is executing tasks.
Configuration
Enable/Disable Leader Election
Leader election for the scheduler is controlled by the LEADER_ELECTION_ENABLED environment variable:
# With leader election (default)
LEADER_ELECTION_ENABLED=true
# Without leader election (all instances run tasks - NOT RECOMMENDED for production)
LEADER_ELECTION_ENABLED=false
Behavior Matrix
| Leader Election | Instances | Task Execution |
|---|---|---|
| Enabled | Single | Tasks run on that instance ✅ |
| Enabled | Multiple | Tasks run ONLY on leader ✅ |
| Disabled | Single | Tasks run on that instance ✅ |
| Disabled | Multiple | Tasks run on ALL instances ⚠️ |
Best Practices
- Always enable leader election in production when running multiple instances
- Keep tasks idempotent - if a task is accidentally executed twice, it shouldn't cause problems
- Handle task failures gracefully - the scheduler will log errors but continue running
- Don't run long tasks - tasks block the scheduler thread
- Use context - respect context cancellation for graceful shutdown
- Log appropriately - use structured logging to track task execution
- Test failover - verify new leader takes over task execution
Monitoring
Check Scheduler Status
You can check which instance is executing tasks by looking at the logs:
# Leader logs
grep "Leader executing" logs/app.log
# Follower logs (DEBUG level)
grep "Skipping task execution" logs/app.log
Health Check
You could add a health check endpoint to expose scheduler status:
func (h *HealthHandler) SchedulerHealth(w http.ResponseWriter, r *http.Request) {
tasks := h.scheduler.GetRegisteredTasks()
isLeader := h.leaderElection.IsLeader() // read once so both fields agree
response := map[string]interface{}{
"registered_tasks": tasks,
"leader_election_enabled": h.config.LeaderElection.Enabled,
"is_leader": isLeader,
"will_execute_tasks": !h.config.LeaderElection.Enabled || isLeader,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
Troubleshooting
Tasks not running on any instance
- Check leader election is working: grep "Became the leader" logs/app.log
- Check tasks are registered: look for "Registering scheduled task" in the logs
- Check the scheduler started: look for "Scheduler started successfully"
Tasks running on multiple instances
- Check LEADER_ELECTION_ENABLED=true in all instances
- Check all instances connect to the same Redis
- Check network connectivity between instances and Redis
Tasks not running after leader failure
- Check LEADER_ELECTION_LOCK_TTL: it should be under 30s for fast failover
- Check LEADER_ELECTION_RETRY_INTERVAL: followers should retry frequently
- Check the new leader's logs for "Became the leader"
- Verify new leader executes tasks after election