129 lines
3.5 KiB
Go
129 lines
3.5 KiB
Go
package scheduler
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/robfig/cron/v3"
|
|
"go.uber.org/zap"
|
|
|
|
"codeberg.org/mapleopentech/monorepo/cloud/maplepress-backend/config"
|
|
"codeberg.org/mapleopentech/monorepo/cloud/maplepress-backend/pkg/leaderelection"
|
|
siteusecase "codeberg.org/mapleopentech/monorepo/cloud/maplepress-backend/internal/usecase/site"
|
|
)
|
|
|
|
// QuotaResetScheduler handles scheduled usage resets for billing cycles
|
|
type QuotaResetScheduler struct {
|
|
cron *cron.Cron
|
|
resetUsageUC *siteusecase.ResetMonthlyUsageUseCase
|
|
leaderElection leaderelection.LeaderElection
|
|
logger *zap.Logger
|
|
enabled bool
|
|
schedulePattern string
|
|
}
|
|
|
|
// cronLogger is a simple adapter for cron to use zap logger
|
|
type cronLogger struct {
|
|
logger *zap.Logger
|
|
}
|
|
|
|
func (l *cronLogger) Info(msg string, keysAndValues ...interface{}) {
|
|
l.logger.Sugar().Infow(msg, keysAndValues...)
|
|
}
|
|
|
|
func (l *cronLogger) Error(err error, msg string, keysAndValues ...interface{}) {
|
|
l.logger.Sugar().Errorw(msg, append(keysAndValues, "error", err)...)
|
|
}
|
|
|
|
// ProvideQuotaResetScheduler creates a new QuotaResetScheduler from config
|
|
func ProvideQuotaResetScheduler(
|
|
cfg *config.Config,
|
|
resetUsageUC *siteusecase.ResetMonthlyUsageUseCase,
|
|
leaderElection leaderelection.LeaderElection,
|
|
logger *zap.Logger,
|
|
) *QuotaResetScheduler {
|
|
enabled := cfg.Scheduler.QuotaResetEnabled
|
|
schedulePattern := cfg.Scheduler.QuotaResetSchedule
|
|
|
|
// Create cron with logger
|
|
cronLog := &cronLogger{logger: logger.Named("cron")}
|
|
c := cron.New(
|
|
cron.WithLogger(cronLog),
|
|
cron.WithChain(
|
|
cron.Recover(cronLog),
|
|
),
|
|
)
|
|
|
|
return &QuotaResetScheduler{
|
|
cron: c,
|
|
resetUsageUC: resetUsageUC,
|
|
leaderElection: leaderElection,
|
|
logger: logger.Named("usage-reset-scheduler"),
|
|
enabled: enabled,
|
|
schedulePattern: schedulePattern,
|
|
}
|
|
}
|
|
|
|
// Start starts the quota reset scheduler
|
|
func (s *QuotaResetScheduler) Start() error {
|
|
if !s.enabled {
|
|
s.logger.Info("quota reset scheduler is disabled")
|
|
return nil
|
|
}
|
|
|
|
s.logger.Info("starting quota reset scheduler",
|
|
zap.String("schedule", s.schedulePattern))
|
|
|
|
// Schedule the quota reset job
|
|
_, err := s.cron.AddFunc(s.schedulePattern, s.resetQuotas)
|
|
if err != nil {
|
|
s.logger.Error("failed to schedule quota reset job", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Start the cron scheduler
|
|
s.cron.Start()
|
|
|
|
s.logger.Info("quota reset scheduler started successfully")
|
|
return nil
|
|
}
|
|
|
|
// Stop stops the quota reset scheduler
|
|
func (s *QuotaResetScheduler) Stop() {
|
|
if !s.enabled {
|
|
return
|
|
}
|
|
|
|
s.logger.Info("stopping quota reset scheduler")
|
|
ctx := s.cron.Stop()
|
|
<-ctx.Done()
|
|
s.logger.Info("quota reset scheduler stopped")
|
|
}
|
|
|
|
// resetQuotas is the cron job function that resets monthly usage counters
|
|
func (s *QuotaResetScheduler) resetQuotas() {
|
|
// Only execute if this instance is the leader
|
|
if !s.leaderElection.IsLeader() {
|
|
s.logger.Debug("skipping quota reset - not the leader instance",
|
|
zap.String("instance_id", s.leaderElection.GetInstanceID()))
|
|
return
|
|
}
|
|
|
|
s.logger.Info("executing scheduled usage reset as leader instance",
|
|
zap.String("instance_id", s.leaderElection.GetInstanceID()))
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
|
defer cancel()
|
|
|
|
output, err := s.resetUsageUC.Execute(ctx)
|
|
if err != nil {
|
|
s.logger.Error("usage reset failed", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
s.logger.Info("usage reset completed",
|
|
zap.Int("processed_sites", output.ProcessedSites),
|
|
zap.Int("reset_count", output.ResetCount),
|
|
zap.Int("failed_count", output.FailedCount),
|
|
zap.Time("processed_at", output.ProcessedAt))
|
|
}
|