# Scheduler with Leader Election

The scheduler has been integrated with leader election to ensure that **scheduled tasks only run on the leader instance**.

## Overview

When multiple instances of the backend are running (e.g., behind a load balancer), you don't want scheduled tasks running on every instance. That would cause:

- ❌ Duplicate task executions
- ❌ Database conflicts
- ❌ Wasted resources
- ❌ Race conditions

With leader election integration:

- ✅ Tasks only execute on the **leader instance**
- ✅ Automatic failover if the leader crashes
- ✅ No duplicate executions
- ✅ Safe for multi-instance deployments

## How It Works

```
┌─────────────────────────────────────────────────────────┐
│                     Load Balancer                       │
└─────────────────┬───────────────┬───────────────────────┘
                  │               │
        ┌─────────▼────┐   ┌──────▼──────┐   ┌──────────────┐
        │  Instance 1  │   │ Instance 2  │   │  Instance 3  │
        │ (LEADER) 👑  │   │ (Follower)  │   │  (Follower)  │
        │              │   │             │   │              │
        │ Scheduler ✅ │   │ Scheduler ⏸️ │   │ Scheduler ⏸️ │
        │ Runs tasks   │   │ Skips tasks │   │ Skips tasks  │
        └──────────────┘   └─────────────┘   └──────────────┘
```

### Execution Flow

1. **All instances** have the scheduler running with registered tasks
2. **All instances** have cron triggers firing at scheduled times
3. **Only the leader** actually executes the task logic
4. **Followers** skip execution (logged at DEBUG level)

Example logs:

**Leader Instance:**

```
2025-01-12T10:00:00.000Z INFO 👑 Leader executing scheduled task task=CleanupOldRecords instance_id=instance-1
2025-01-12T10:00:05.123Z INFO ✅ Task completed successfully task=CleanupOldRecords
```

**Follower Instances:**

```
2025-01-12T10:00:00.000Z DEBUG Skipping task execution - not the leader task=CleanupOldRecords instance_id=instance-2
```

## Usage

### 1. Create a Scheduled Task

```go
package tasks

import (
	"context"

	"go.uber.org/zap"

	"codeberg.org/mapleopentech/monorepo/cloud/maplefile-backend/internal/interface/scheduler"
)

type CleanupTask struct {
	logger *zap.Logger
	// ... other dependencies
}

func NewCleanupTask(logger *zap.Logger) scheduler.Task {
	return &CleanupTask{
		logger: logger.Named("CleanupTask"),
	}
}

func (t *CleanupTask) Name() string {
	return "CleanupOldRecords"
}

func (t *CleanupTask) Schedule() string {
	// Cron format: every day at 2 AM
	return "0 2 * * *"
}

func (t *CleanupTask) Execute(ctx context.Context) error {
	t.logger.Info("Starting cleanup of old records")

	// Your task logic here.
	// This will ONLY run on the leader instance.

	t.logger.Info("Cleanup completed")
	return nil
}
```

### 2. Register Tasks with the Scheduler

The scheduler is already wired through Google Wire. To register tasks, you would typically do this during application startup:

```go
// In app/app.go, or wherever you initialize your app.

func (app *Application) Start() error {
	// ... existing startup code ...

	// Register scheduled tasks.
	if app.scheduler != nil {
		// Create and register tasks.
		cleanupTask := tasks.NewCleanupTask(app.logger)
		if err := app.scheduler.RegisterTask(cleanupTask); err != nil {
			return fmt.Errorf("failed to register cleanup task: %w", err)
		}

		metricsTask := tasks.NewMetricsAggregationTask(app.logger)
		if err := app.scheduler.RegisterTask(metricsTask); err != nil {
			return fmt.Errorf("failed to register metrics task: %w", err)
		}

		// Start the scheduler.
		if err := app.scheduler.Start(); err != nil {
			return fmt.Errorf("failed to start scheduler: %w", err)
		}
	}

	// ... rest of startup code ...
}
```

### 3. Graceful Shutdown

```go
func (app *Application) Stop() error {
	// ... other shutdown code ...

	if app.scheduler != nil {
		if err := app.scheduler.Stop(); err != nil {
			app.logger.Error("Failed to stop scheduler", zap.Error(err))
		}
	}

	// ... rest of shutdown code ...
}
```

## Cron Schedule Format

The scheduler uses the standard five-field cron format:

```
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday)
│ │ │ │ │
* * * * *
```

### Common Examples

```go
"* * * * *"    // Every minute
"0 * * * *"    // Every hour (on the hour)
"0 0 * * *"    // Every day at midnight
"0 2 * * *"    // Every day at 2:00 AM
"0 */6 * * *"  // Every 6 hours
"0 0 * * 0"    // Every Sunday at midnight
"0 0 1 * *"    // First day of every month at midnight
"0 9 * * 1-5"  // Weekdays at 9:00 AM
"*/5 * * * *"  // Every 5 minutes
"0 0,12 * * *" // Twice a day (midnight and noon)
```

## Example Tasks

### Daily Cleanup Task

```go
type DailyCleanupTask struct {
	logger *zap.Logger
	repo   *Repository
}

func (t *DailyCleanupTask) Name() string {
	return "DailyCleanup"
}

func (t *DailyCleanupTask) Schedule() string {
	return "0 3 * * *" // 3 AM every day
}

func (t *DailyCleanupTask) Execute(ctx context.Context) error {
	t.logger.Info("Running daily cleanup")

	// Delete records older than 30 days.
	cutoffDate := time.Now().AddDate(0, 0, -30)
	if err := t.repo.DeleteOlderThan(ctx, cutoffDate); err != nil {
		return fmt.Errorf("cleanup failed: %w", err)
	}

	return nil
}
```

### Hourly Metrics Task

```go
type MetricsAggregationTask struct {
	logger  *zap.Logger
	metrics *MetricsService
}

func (t *MetricsAggregationTask) Name() string {
	return "HourlyMetrics"
}

func (t *MetricsAggregationTask) Schedule() string {
	return "0 * * * *" // Every hour
}

func (t *MetricsAggregationTask) Execute(ctx context.Context) error {
	t.logger.Info("Aggregating hourly metrics")

	if err := t.metrics.AggregateAndSend(ctx); err != nil {
		return fmt.Errorf("metrics aggregation failed: %w", err)
	}

	return nil
}
```

### Cache Warming Task

```go
type CacheWarmingTask struct {
	logger *zap.Logger
	cache  *CacheService
}

func (t *CacheWarmingTask) Name() string {
	return "CacheWarming"
}

func (t *CacheWarmingTask) Schedule() string {
	return "*/30 * * * *" // Every 30 minutes
}

func (t *CacheWarmingTask) Execute(ctx context.Context) error {
	t.logger.Info("Warming application cache")

	if err := t.cache.WarmFrequentlyAccessedData(ctx); err != nil {
		return fmt.Errorf("cache warming failed: %w", err)
	}

	return nil
}
```

## Testing

### Local Testing with Multiple Instances

```bash
# Terminal 1 (will become leader)
LEADER_ELECTION_INSTANCE_ID=instance-1 ./maplefile-backend

# Terminal 2 (follower)
LEADER_ELECTION_INSTANCE_ID=instance-2 ./maplefile-backend

# Terminal 3 (follower)
LEADER_ELECTION_INSTANCE_ID=instance-3 ./maplefile-backend
```

Watch the logs:

- **Only instance-1** (the leader) will execute tasks
- **instance-2 and instance-3** will skip task execution

Kill instance-1 and watch:

- Either instance-2 or instance-3 becomes the new leader
- The new leader starts executing tasks
- The remaining follower continues to skip

### Testing Task Execution

Create a test task that runs every minute:

```go
type TestTask struct {
	logger *zap.Logger
}

func (t *TestTask) Name() string {
	return "TestTask"
}

func (t *TestTask) Schedule() string {
	return "* * * * *" // Every minute
}

func (t *TestTask) Execute(ctx context.Context) error {
	t.logger.Info("TEST TASK EXECUTED - I am the leader!")
	return nil
}
```

This makes it easy to see which instance is executing tasks.

## Configuration

### Enable/Disable Leader Election

Leader election for the scheduler is controlled by the `LEADER_ELECTION_ENABLED` environment variable:

```bash
# With leader election (default)
LEADER_ELECTION_ENABLED=true

# Without leader election (all instances run tasks - NOT RECOMMENDED for production)
LEADER_ELECTION_ENABLED=false
```

### Behavior Matrix

| Leader Election | Instances | Task Execution |
|-----------------|-----------|----------------|
| Enabled | Single | Tasks run on that instance ✅ |
| Enabled | Multiple | Tasks run ONLY on the leader ✅ |
| Disabled | Single | Tasks run on that instance ✅ |
| Disabled | Multiple | Tasks run on ALL instances ⚠️ |
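
The last column of the matrix reduces to a single predicate. This sketch uses an illustrative function name (`willExecuteTasks`); it computes the same value as the `will_execute_tasks` field in the health-check example later in this document:

```go
package main

import "fmt"

// willExecuteTasks mirrors the behavior matrix: tasks run when leader
// election is disabled, or when this instance currently holds leadership.
func willExecuteTasks(electionEnabled, isLeader bool) bool {
	return !electionEnabled || isLeader
}

func main() {
	fmt.Println(willExecuteTasks(true, true))   // true: leader with election enabled
	fmt.Println(willExecuteTasks(true, false))  // false: follower with election enabled
	fmt.Println(willExecuteTasks(false, false)) // true: election disabled, every instance runs
}
```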

## Best Practices

1. **Always enable leader election in production** when running multiple instances
2. **Keep tasks idempotent** - if a task accidentally executes twice, it shouldn't cause problems
3. **Handle task failures gracefully** - the scheduler logs errors but keeps running
4. **Don't run long tasks** - tasks block the scheduler thread
5. **Use context** - respect context cancellation for graceful shutdown
6. **Log appropriately** - use structured logging to track task execution
7. **Test failover** - verify that a new leader takes over task execution

## Monitoring

### Check Scheduler Status

You can see which instance is executing tasks by checking the logs:

```bash
# Leader logs
grep "Leader executing" logs/app.log

# Follower logs (DEBUG level)
grep "Skipping task execution" logs/app.log
```

### Health Check

You could add a health check endpoint to expose scheduler status:

```go
func (h *HealthHandler) SchedulerHealth(w http.ResponseWriter, r *http.Request) {
	tasks := h.scheduler.GetRegisteredTasks()

	response := map[string]interface{}{
		"registered_tasks":        tasks,
		"leader_election_enabled": h.config.LeaderElection.Enabled,
		"is_leader":               h.leaderElection.IsLeader(),
		"will_execute_tasks":      !h.config.LeaderElection.Enabled || h.leaderElection.IsLeader(),
	}

	json.NewEncoder(w).Encode(response)
}
```

## Troubleshooting

### Tasks not running on any instance

1. Check that leader election is working: `grep "Became the leader" logs/app.log`
2. Check that tasks are registered: look for "Registering scheduled task" in the logs
3. Check that the scheduler started: look for "Scheduler started successfully"

### Tasks running on multiple instances

1. Check that `LEADER_ELECTION_ENABLED=true` is set on all instances
2. Check that all instances connect to the same Redis
3. Check network connectivity between the instances and Redis

### Tasks not running after leader failure

1. Check `LEADER_ELECTION_LOCK_TTL` - it should be under 30s for fast failover
2. Check `LEADER_ELECTION_RETRY_INTERVAL` - followers should retry frequently
3. Check the new leader's logs for "Became the leader"
4. Verify the new leader executes tasks after election

## Related Documentation

- [Leader Election Package](../../../pkg/leaderelection/README.md)
- [Leader Election Examples](../../../pkg/leaderelection/EXAMPLE.md)