monorepo/cloud/maplefile-backend/internal/interface/scheduler/scheduler.go

179 lines
4.6 KiB
Go
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package scheduler
import (
"context"
"sync"
"github.com/robfig/cron/v3"
"go.uber.org/zap"
"codeberg.org/mapleopentech/monorepo/cloud/maplefile-backend/config"
"codeberg.org/mapleopentech/monorepo/cloud/maplefile-backend/pkg/leaderelection"
)
// Task represents a scheduled task
type Task interface {
Name() string
Schedule() string
Execute(ctx context.Context) error
}
// Scheduler manages all scheduled tasks
// Tasks are only executed if this instance is the leader (when leader election is enabled)
type Scheduler struct {
config *config.Config
logger *zap.Logger
cron *cron.Cron
tasks []Task
mu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
leaderElection leaderelection.LeaderElection // Leader election instance (can be nil if disabled)
}
// ProvideScheduler creates a new Scheduler instance for Wire DI
func ProvideScheduler(
cfg *config.Config,
logger *zap.Logger,
leaderElection leaderelection.LeaderElection,
) *Scheduler {
ctx, cancel := context.WithCancel(context.Background())
logger = logger.Named("Scheduler")
return &Scheduler{
config: cfg,
logger: logger,
cron: cron.New(),
tasks: make([]Task, 0),
ctx: ctx,
cancel: cancel,
leaderElection: leaderElection,
}
}
// RegisterTask registers a task to be scheduled
func (s *Scheduler) RegisterTask(task Task) error {
s.mu.Lock()
defer s.mu.Unlock()
s.logger.Info("Registering scheduled task",
zap.String("task", task.Name()),
zap.String("schedule", task.Schedule()))
// Add task to scheduler
_, err := s.cron.AddFunc(task.Schedule(), func() {
s.executeTask(task)
})
if err != nil {
s.logger.Error("Failed to register task",
zap.String("task", task.Name()),
zap.Error(err))
return err
}
s.tasks = append(s.tasks, task)
s.logger.Info("✅ Task registered successfully",
zap.String("task", task.Name()))
return nil
}
// executeTask executes a task with error handling and logging
// Tasks are only executed if this instance is the leader (when leader election is enabled)
func (s *Scheduler) executeTask(task Task) {
// Check if leader election is enabled
if s.config.LeaderElection.Enabled && s.leaderElection != nil {
// Only execute if this instance is the leader
if !s.leaderElection.IsLeader() {
s.logger.Debug("Skipping task execution - not the leader",
zap.String("task", task.Name()),
zap.String("instance_id", s.leaderElection.GetInstanceID()))
return
}
// Log that leader is executing the task
s.logger.Info("👑 Leader executing scheduled task",
zap.String("task", task.Name()),
zap.String("instance_id", s.leaderElection.GetInstanceID()))
} else {
// Leader election disabled, execute normally
s.logger.Info("Executing scheduled task",
zap.String("task", task.Name()))
}
// Create a context for this execution
ctx := s.ctx
// Execute the task
if err := task.Execute(ctx); err != nil {
s.logger.Error("Task execution failed",
zap.String("task", task.Name()),
zap.Error(err))
return
}
s.logger.Info("✅ Task completed successfully",
zap.String("task", task.Name()))
}
// Start starts the scheduler
func (s *Scheduler) Start() error {
s.mu.RLock()
taskCount := len(s.tasks)
s.mu.RUnlock()
// Log leader election status
if s.config.LeaderElection.Enabled && s.leaderElection != nil {
s.logger.Info("🕐 Starting scheduler with leader election",
zap.Int("registered_tasks", taskCount),
zap.Bool("leader_election_enabled", true),
zap.String("instance_id", s.leaderElection.GetInstanceID()))
s.logger.Info(" Tasks will ONLY execute on the leader instance")
} else {
s.logger.Info("🕐 Starting scheduler without leader election",
zap.Int("registered_tasks", taskCount),
zap.Bool("leader_election_enabled", false))
s.logger.Warn("⚠️ Leader election is disabled - tasks will run on ALL instances")
}
if taskCount == 0 {
s.logger.Warn("No tasks registered, scheduler will run but do nothing")
}
s.cron.Start()
s.logger.Info("✅ Scheduler started successfully")
return nil
}
// Stop stops the scheduler gracefully
func (s *Scheduler) Stop() error {
s.logger.Info("Stopping scheduler...")
// Cancel all running tasks
s.cancel()
// Stop the cron scheduler
ctx := s.cron.Stop()
<-ctx.Done()
s.logger.Info("✅ Scheduler stopped successfully")
return nil
}
// GetRegisteredTasks returns a list of registered task names
func (s *Scheduler) GetRegisteredTasks() []string {
s.mu.RLock()
defer s.mu.RUnlock()
taskNames := make([]string, len(s.tasks))
for i, task := range s.tasks {
taskNames[i] = task.Name()
}
return taskNames
}