# Scheduler with Leader Election

The scheduler has been integrated with leader election to ensure that **scheduled tasks only run on the leader instance**.

## Overview

When multiple instances of the backend are running (e.g., behind a load balancer), you don't want scheduled tasks running on every instance. This would cause:

- ❌ Duplicate task executions
- ❌ Database conflicts
- ❌ Wasted resources
- ❌ Race conditions

With leader election integration:

- ✅ Tasks execute only on the **leader instance**
- ✅ Automatic failover if the leader crashes
- ✅ No duplicate executions
- ✅ Safe for multi-instance deployments

## How It Works

```
┌─────────────────────────────────────────────────────────┐
│                    Load Balancer                        │
└─────────────────┬───────────────┬───────────────────────┘
                  │               │
        ┌─────────▼────┐  ┌───────▼─────┐  ┌──────────────┐
        │  Instance 1  │  │  Instance 2 │  │  Instance 3  │
        │  (LEADER) 👑 │  │  (Follower) │  │  (Follower)  │
        │              │  │             │  │              │
        │ Scheduler ✅ │  │ Scheduler ⏸️ │  │ Scheduler ⏸️ │
        │ Runs tasks   │  │ Skips tasks │  │ Skips tasks  │
        └──────────────┘  └─────────────┘  └──────────────┘
```

### Execution Flow

1. **All instances** run the scheduler with registered tasks
2. **All instances** have cron triggers firing at the scheduled times
3. **Only the leader** actually executes the task logic
4. **Followers** skip execution (logged at DEBUG level)

Example logs:

**Leader instance:**

```
2025-01-12T10:00:00.000Z INFO 👑 Leader executing scheduled task task=CleanupOldRecords instance_id=instance-1
2025-01-12T10:00:05.123Z INFO ✅ Task completed successfully task=CleanupOldRecords
```

**Follower instances:**

```
2025-01-12T10:00:00.000Z DEBUG Skipping task execution - not the leader task=CleanupOldRecords instance_id=instance-2
```

## Usage

### 1. Create a Scheduled Task

```go
package tasks

import (
	"context"

	"go.uber.org/zap"

	"codeberg.org/mapleopentech/monorepo/cloud/maplefile-backend/internal/interface/scheduler"
)

type CleanupTask struct {
	logger *zap.Logger
	// ...
	// other dependencies
}

func NewCleanupTask(logger *zap.Logger) scheduler.Task {
	return &CleanupTask{
		logger: logger.Named("CleanupTask"),
	}
}

func (t *CleanupTask) Name() string {
	return "CleanupOldRecords"
}

func (t *CleanupTask) Schedule() string {
	// Cron format: every day at 2 AM
	return "0 2 * * *"
}

func (t *CleanupTask) Execute(ctx context.Context) error {
	t.logger.Info("Starting cleanup of old records")

	// Your task logic here
	// This will ONLY run on the leader instance

	t.logger.Info("Cleanup completed")
	return nil
}
```

### 2. Register Tasks with the Scheduler

The scheduler is already wired through Google Wire. Tasks are typically registered during application startup:

```go
// In app/app.go, or wherever you initialize your app
func (app *Application) Start() error {
	// ... existing startup code ...

	// Register scheduled tasks
	if app.scheduler != nil {
		// Create and register tasks
		cleanupTask := tasks.NewCleanupTask(app.logger)
		if err := app.scheduler.RegisterTask(cleanupTask); err != nil {
			return fmt.Errorf("failed to register cleanup task: %w", err)
		}

		metricsTask := tasks.NewMetricsAggregationTask(app.logger)
		if err := app.scheduler.RegisterTask(metricsTask); err != nil {
			return fmt.Errorf("failed to register metrics task: %w", err)
		}

		// Start the scheduler
		if err := app.scheduler.Start(); err != nil {
			return fmt.Errorf("failed to start scheduler: %w", err)
		}
	}

	// ... rest of startup code ...
}
```

### 3. Graceful Shutdown

```go
func (app *Application) Stop() error {
	// ... other shutdown code ...

	if app.scheduler != nil {
		if err := app.scheduler.Stop(); err != nil {
			app.logger.Error("Failed to stop scheduler", zap.Error(err))
		}
	}

	// ... rest of shutdown code ...
}
```

## Cron Schedule Format

The scheduler uses the standard five-field cron format:

```
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday)
│ │ │ │ │
* * * * *
```

### Common Examples

```go
"* * * * *"    // Every minute
"0 * * * *"    // Every hour (on the hour)
"0 0 * * *"    // Every day at midnight
"0 2 * * *"    // Every day at 2:00 AM
"0 */6 * * *"  // Every 6 hours
"0 0 * * 0"    // Every Sunday at midnight
"0 0 1 * *"    // First day of every month at midnight
"0 9 * * 1-5"  // Weekdays at 9:00 AM
"*/5 * * * *"  // Every 5 minutes
"0 0,12 * * *" // Twice a day (midnight and noon)
```

## Example Tasks

### Daily Cleanup Task

```go
type DailyCleanupTask struct {
	logger *zap.Logger
	repo   *Repository
}

func (t *DailyCleanupTask) Name() string {
	return "DailyCleanup"
}

func (t *DailyCleanupTask) Schedule() string {
	return "0 3 * * *" // 3 AM every day
}

func (t *DailyCleanupTask) Execute(ctx context.Context) error {
	t.logger.Info("Running daily cleanup")

	// Delete old records
	cutoffDate := time.Now().AddDate(0, 0, -30) // 30 days ago
	if err := t.repo.DeleteOlderThan(ctx, cutoffDate); err != nil {
		return fmt.Errorf("cleanup failed: %w", err)
	}

	return nil
}
```

### Hourly Metrics Task

```go
type MetricsAggregationTask struct {
	logger  *zap.Logger
	metrics *MetricsService
}

func (t *MetricsAggregationTask) Name() string {
	return "HourlyMetrics"
}

func (t *MetricsAggregationTask) Schedule() string {
	return "0 * * * *" // Every hour
}

func (t *MetricsAggregationTask) Execute(ctx context.Context) error {
	t.logger.Info("Aggregating hourly metrics")

	if err := t.metrics.AggregateAndSend(ctx); err != nil {
		return fmt.Errorf("metrics aggregation failed: %w", err)
	}

	return nil
}
```

### Cache Warming Task

```go
type CacheWarmingTask struct {
	logger *zap.Logger
	cache  *CacheService
}

func (t *CacheWarmingTask) Name() string {
	return "CacheWarming"
}

func (t *CacheWarmingTask) Schedule() string {
	return "*/30 * * * *" // Every 30 minutes
}

func (t *CacheWarmingTask) Execute(ctx context.Context) error {
	t.logger.Info("Warming application cache")

	if err := t.cache.WarmFrequentlyAccessedData(ctx); err != nil {
		return fmt.Errorf("cache warming failed: %w", err)
	}

	return nil
}
```

## Testing

### Local Testing with Multiple Instances

```bash
# Terminal 1 (will become leader)
LEADER_ELECTION_INSTANCE_ID=instance-1 ./maplefile-backend

# Terminal 2 (follower)
LEADER_ELECTION_INSTANCE_ID=instance-2 ./maplefile-backend

# Terminal 3 (follower)
LEADER_ELECTION_INSTANCE_ID=instance-3 ./maplefile-backend
```

Watch the logs:

- **Only instance-1** (the leader) will execute tasks
- **instance-2 and instance-3** will skip task execution

Kill instance-1 and watch:

- Either instance-2 or instance-3 becomes the new leader
- The new leader starts executing tasks
- The remaining follower continues to skip

### Testing Task Execution

Create a test task that runs every minute:

```go
type TestTask struct {
	logger *zap.Logger
}

func (t *TestTask) Name() string {
	return "TestTask"
}

func (t *TestTask) Schedule() string {
	return "* * * * *" // Every minute
}

func (t *TestTask) Execute(ctx context.Context) error {
	t.logger.Info("TEST TASK EXECUTED - I am the leader!")
	return nil
}
```

This makes it easy to see which instance is executing tasks.
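The leader gate applied on every cron trigger (fire everywhere, execute only on the leader) can be sketched as a small standalone program. The `staticElector` and `runIfLeader` names below are hypothetical illustrations; the real scheduler and `leaderelection` packages may expose different types:

```go
package main

import "fmt"

// staticElector is a stand-in for the real leader-election client;
// the actual IsLeader check lives in pkg/leaderelection.
type staticElector struct{ leader bool }

func (e staticElector) IsLeader() bool { return e.leader }

// runIfLeader sketches the gate the scheduler applies on every cron
// trigger: triggers fire on all instances, but only the leader runs
// the task body. It reports whether the task actually executed.
func runIfLeader(e interface{ IsLeader() bool }, task func() error) (bool, error) {
	if !e.IsLeader() {
		// Followers skip silently (logged at DEBUG level in the scheduler).
		return false, nil
	}
	return true, task()
}

func main() {
	task := func() error {
		fmt.Println("task executed")
		return nil
	}

	ran, _ := runIfLeader(staticElector{leader: true}, task)
	fmt.Println("leader ran:", ran) // leader ran: true

	ran, _ = runIfLeader(staticElector{leader: false}, task)
	fmt.Println("follower ran:", ran) // follower ran: false
}
```

Because the gate sits around `Execute` rather than around trigger registration, failover needs no re-registration: the moment a follower wins the election, its already-firing triggers start passing the gate.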
## Configuration

### Enable/Disable Leader Election

Leader election for the scheduler is controlled by the `LEADER_ELECTION_ENABLED` environment variable:

```bash
# With leader election (default)
LEADER_ELECTION_ENABLED=true

# Without leader election (all instances run tasks - NOT RECOMMENDED for production)
LEADER_ELECTION_ENABLED=false
```

### Behavior Matrix

| Leader Election | Instances | Task Execution |
|----------------|-----------|----------------|
| Enabled | Single | Tasks run on that instance ✅ |
| Enabled | Multiple | Tasks run ONLY on the leader ✅ |
| Disabled | Single | Tasks run on that instance ✅ |
| Disabled | Multiple | Tasks run on ALL instances ⚠️ |

## Best Practices

1. **Always enable leader election in production** when running multiple instances
2. **Keep tasks idempotent** - if a task is accidentally executed twice, it shouldn't cause problems
3. **Handle task failures gracefully** - the scheduler logs errors but continues running
4. **Avoid long-running tasks** - tasks block the scheduler thread
5. **Use context** - respect context cancellation for graceful shutdown
6. **Log appropriately** - use structured logging to track task execution
7. **Test failover** - verify that a new leader takes over task execution

## Monitoring

### Check Scheduler Status

You can check which instance is executing tasks by looking at the logs:

```bash
# Leader logs
grep "Leader executing" logs/app.log

# Follower logs (DEBUG level)
grep "Skipping task execution" logs/app.log
```

### Health Check

You could add a health check endpoint to expose scheduler status:

```go
func (h *HealthHandler) SchedulerHealth(w http.ResponseWriter, r *http.Request) {
	tasks := h.scheduler.GetRegisteredTasks()

	response := map[string]interface{}{
		"registered_tasks":        tasks,
		"leader_election_enabled": h.config.LeaderElection.Enabled,
		"is_leader":               h.leaderElection.IsLeader(),
		"will_execute_tasks":      !h.config.LeaderElection.Enabled || h.leaderElection.IsLeader(),
	}

	json.NewEncoder(w).Encode(response)
}
```

## Troubleshooting

### Tasks not running on any instance

1. Check that leader election is working: `grep "Became the leader" logs/app.log`
2. Check that tasks are registered: look for "Registering scheduled task" in the logs
3. Check that the scheduler started: look for "Scheduler started successfully"

### Tasks running on multiple instances

1. Check that `LEADER_ELECTION_ENABLED=true` on all instances
2. Check that all instances connect to the same Redis
3. Check network connectivity between the instances and Redis

### Tasks not running after leader failure

1. Check `LEADER_ELECTION_LOCK_TTL` - it should be under 30s for fast failover
2. Check `LEADER_ELECTION_RETRY_INTERVAL` - followers should retry frequently
3. Check the new leader's logs for "Became the leader"
4. Verify the new leader executes tasks after election

## Related Documentation

- [Leader Election Package](../../../pkg/leaderelection/README.md)
- [Leader Election Examples](../../../pkg/leaderelection/EXAMPLE.md)