package leaderelection import ( "context" "encoding/json" "fmt" "math/rand" "os" "sync" "time" "github.com/redis/go-redis/v9" "go.uber.org/zap" ) // redisLeaderElection implements LeaderElection using Redis. type redisLeaderElection struct { config *Config redis redis.UniversalClient logger *zap.Logger instanceID string hostname string isLeader bool leaderMutex sync.RWMutex becomeLeaderCbs []func() loseLeadershipCbs []func() callbackMutex sync.RWMutex stopChan chan struct{} stoppedChan chan struct{} leaderStartTime time.Time lastHeartbeat time.Time lastHeartbeatMutex sync.RWMutex } // NewRedisLeaderElection creates a new Redis-based leader election instance. func NewRedisLeaderElection( config *Config, redisClient redis.UniversalClient, logger *zap.Logger, ) (LeaderElection, error) { logger = logger.Named("LeaderElection") // Validate configuration if err := config.Validate(); err != nil { return nil, fmt.Errorf("invalid configuration: %w", err) } // Generate instance ID if not provided instanceID := config.InstanceID if instanceID == "" { hostname, err := os.Hostname() if err != nil { hostname = "unknown" } // Add random suffix to make it unique instanceID = fmt.Sprintf("%s-%d", hostname, rand.Intn(100000)) logger.Info("Generated instance ID", zap.String("instance_id", instanceID)) } // Get hostname if not provided hostname := config.Hostname if hostname == "" { h, err := os.Hostname() if err != nil { hostname = "unknown" } else { hostname = h } } return &redisLeaderElection{ config: config, redis: redisClient, logger: logger, instanceID: instanceID, hostname: hostname, isLeader: false, becomeLeaderCbs: make([]func(), 0), loseLeadershipCbs: make([]func(), 0), stopChan: make(chan struct{}), stoppedChan: make(chan struct{}), }, nil } // Start begins participating in leader election. func (le *redisLeaderElection) Start(ctx context.Context) error { le.logger.Info("Starting leader election", zap.String("instance_id", le.instanceID), zap.String("hostname", le.hostname), zap.Duration("lock_ttl", le.config.LockTTL), zap.Duration("heartbeat_interval", le.config.HeartbeatInterval), ) defer close(le.stoppedChan) // Main election loop ticker := time.NewTicker(le.config.RetryInterval) defer ticker.Stop() // Try to become leader immediately on startup le.tryBecomeLeader(ctx) for { select { case <-ctx.Done(): le.logger.Info("Context cancelled, stopping leader election") le.releaseLeadership(context.Background()) return ctx.Err() case <-le.stopChan: le.logger.Info("Stop signal received, stopping leader election") le.releaseLeadership(context.Background()) return nil case <-ticker.C: if le.IsLeader() { // If we're the leader, send heartbeat if err := le.sendHeartbeat(ctx); err != nil { le.logger.Error("Failed to send heartbeat, lost leadership", zap.Error(err)) le.setLeaderStatus(false) le.executeCallbacks(le.loseLeadershipCbs) } } else { // If we're not the leader, try to become leader le.tryBecomeLeader(ctx) } } } } // tryBecomeLeader attempts to acquire leadership. func (le *redisLeaderElection) tryBecomeLeader(ctx context.Context) { // Try to set the leader key with NX (only if not exists) and EX (expiry) success, err := le.redis.SetNX(ctx, le.config.RedisKeyName, le.instanceID, le.config.LockTTL).Result() if err != nil { le.logger.Error("Failed to attempt leader election", zap.Error(err)) return } if success { // We became the leader! le.logger.Info("🎉 Became the leader!", zap.String("instance_id", le.instanceID)) le.leaderStartTime = time.Now() le.setLeaderStatus(true) le.updateLeaderInfo(ctx) le.executeCallbacks(le.becomeLeaderCbs) } else { // Someone else is the leader if !le.IsLeader() { // Only log if we weren't already aware currentLeader, _ := le.GetLeaderID() le.logger.Debug("Another instance is the leader", zap.String("leader_id", currentLeader)) } } } // sendHeartbeat renews the leader lock. func (le *redisLeaderElection) sendHeartbeat(ctx context.Context) error { // Verify we still hold the lock currentValue, err := le.redis.Get(ctx, le.config.RedisKeyName).Result() if err != nil { return fmt.Errorf("failed to get current lock value: %w", err) } if currentValue != le.instanceID { return fmt.Errorf("lock held by different instance: %s", currentValue) } // Renew the lock err = le.redis.Expire(ctx, le.config.RedisKeyName, le.config.LockTTL).Err() if err != nil { return fmt.Errorf("failed to renew lock: %w", err) } // Update heartbeat time le.setLastHeartbeat(time.Now()) // Update leader info le.updateLeaderInfo(ctx) le.logger.Debug("Heartbeat sent", zap.String("instance_id", le.instanceID)) return nil } // updateLeaderInfo updates the leader information in Redis. func (le *redisLeaderElection) updateLeaderInfo(ctx context.Context) { info := &LeaderInfo{ InstanceID: le.instanceID, Hostname: le.hostname, StartedAt: le.leaderStartTime, LastHeartbeat: le.getLastHeartbeat(), } data, err := json.Marshal(info) if err != nil { le.logger.Error("Failed to marshal leader info", zap.Error(err)) return } // Set with same TTL as lock err = le.redis.Set(ctx, le.config.RedisInfoKeyName, data, le.config.LockTTL).Err() if err != nil { le.logger.Error("Failed to update leader info", zap.Error(err)) } } // releaseLeadership voluntarily releases leadership. func (le *redisLeaderElection) releaseLeadership(ctx context.Context) { if !le.IsLeader() { return } le.logger.Info("Releasing leadership voluntarily", zap.String("instance_id", le.instanceID)) // Only delete if we're still the owner script := ` if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) else return 0 end ` _, err := le.redis.Eval(ctx, script, []string{le.config.RedisKeyName}, le.instanceID).Result() if err != nil { le.logger.Error("Failed to release leadership", zap.Error(err)) } // Delete leader info le.redis.Del(ctx, le.config.RedisInfoKeyName) le.setLeaderStatus(false) le.executeCallbacks(le.loseLeadershipCbs) } // IsLeader returns true if this instance is the leader. func (le *redisLeaderElection) IsLeader() bool { le.leaderMutex.RLock() defer le.leaderMutex.RUnlock() return le.isLeader } // GetLeaderID returns the ID of the current leader. func (le *redisLeaderElection) GetLeaderID() (string, error) { ctx := context.Background() leaderID, err := le.redis.Get(ctx, le.config.RedisKeyName).Result() if err == redis.Nil { return "", nil } if err != nil { return "", fmt.Errorf("failed to get leader ID: %w", err) } return leaderID, nil } // GetLeaderInfo returns information about the current leader. func (le *redisLeaderElection) GetLeaderInfo() (*LeaderInfo, error) { ctx := context.Background() data, err := le.redis.Get(ctx, le.config.RedisInfoKeyName).Result() if err == redis.Nil { return nil, nil } if err != nil { return nil, fmt.Errorf("failed to get leader info: %w", err) } var info LeaderInfo if err := json.Unmarshal([]byte(data), &info); err != nil { return nil, fmt.Errorf("failed to unmarshal leader info: %w", err) } return &info, nil } // OnBecomeLeader registers a callback for when this instance becomes leader. func (le *redisLeaderElection) OnBecomeLeader(callback func()) { le.callbackMutex.Lock() defer le.callbackMutex.Unlock() le.becomeLeaderCbs = append(le.becomeLeaderCbs, callback) } // OnLoseLeadership registers a callback for when this instance loses leadership. func (le *redisLeaderElection) OnLoseLeadership(callback func()) { le.callbackMutex.Lock() defer le.callbackMutex.Unlock() le.loseLeadershipCbs = append(le.loseLeadershipCbs, callback) } // Stop gracefully stops leader election. func (le *redisLeaderElection) Stop() error { le.logger.Info("Stopping leader election") close(le.stopChan) // Wait for the election loop to finish (with timeout) select { case <-le.stoppedChan: le.logger.Info("Leader election stopped successfully") return nil case <-time.After(5 * time.Second): le.logger.Warn("Timeout waiting for leader election to stop") return fmt.Errorf("timeout waiting for leader election to stop") } } // GetInstanceID returns this instance's unique identifier. func (le *redisLeaderElection) GetInstanceID() string { return le.instanceID } // setLeaderStatus updates the leader status (thread-safe). func (le *redisLeaderElection) setLeaderStatus(isLeader bool) { le.leaderMutex.Lock() defer le.leaderMutex.Unlock() le.isLeader = isLeader } // setLastHeartbeat updates the last heartbeat time (thread-safe). func (le *redisLeaderElection) setLastHeartbeat(t time.Time) { le.lastHeartbeatMutex.Lock() defer le.lastHeartbeatMutex.Unlock() le.lastHeartbeat = t } // getLastHeartbeat gets the last heartbeat time (thread-safe). func (le *redisLeaderElection) getLastHeartbeat() time.Time { le.lastHeartbeatMutex.RLock() defer le.lastHeartbeatMutex.RUnlock() return le.lastHeartbeat } // executeCallbacks executes a list of callbacks in separate goroutines. func (le *redisLeaderElection) executeCallbacks(callbacks []func()) { le.callbackMutex.RLock() defer le.callbackMutex.RUnlock() for _, callback := range callbacks { go func(cb func()) { defer func() { if r := recover(); r != nil { le.logger.Error("Panic in leader election callback", zap.Any("panic", r)) } }() cb() }(callback) } }