// Package leaderelection provides leader election backed by a distributed
// mutex: holding the Redis-based lock means being the leader, and periodic
// lock extension acts as the leader heartbeat.
package leaderelection

import (
	"context"
	"encoding/json"
	"fmt"
	"math/rand"
	"os"
	"sync"
	"time"

	"github.com/redis/go-redis/v9"
	"go.uber.org/zap"

	"codeberg.org/mapleopentech/monorepo/cloud/maplefile-backend/pkg/distributedmutex"
)

// mutexLeaderElection implements LeaderElection using distributedmutex.
type mutexLeaderElection struct {
	config *Config
	mutex  distributedmutex.Adapter
	redis  redis.UniversalClient
	logger *zap.Logger

	// Identity of this participant.
	instanceID string
	hostname   string

	// Leadership flag, guarded by leaderMutex.
	isLeader    bool
	leaderMutex sync.RWMutex

	// Callback registries, guarded by callbackMutex.
	becomeLeaderCbs   []func()
	loseLeadershipCbs []func()
	callbackMutex     sync.RWMutex

	// stopChan signals the Start loop to exit; stoppedChan is closed once it has.
	stopChan    chan struct{}
	stoppedChan chan struct{}

	// Timing metadata for the current leadership term.
	leaderStartTime    time.Time
	lastHeartbeat      time.Time
	lastHeartbeatMutex sync.RWMutex
}

// osHostname returns the OS hostname, falling back to "unknown" when the
// lookup fails, so callers never have to handle the error themselves.
func osHostname() string {
	h, err := os.Hostname()
	if err != nil {
		return "unknown"
	}
	return h
}

// NewMutexLeaderElection creates a new distributed mutex-based leader election
// instance. When config.InstanceID is empty, an ID is derived from the OS
// hostname plus a random suffix; when config.Hostname is empty, the OS
// hostname is used directly.
func NewMutexLeaderElection(
	config *Config,
	mutex distributedmutex.Adapter,
	redisClient redis.UniversalClient,
	logger *zap.Logger,
) (LeaderElection, error) {
	logger = logger.Named("LeaderElection")

	// Validate configuration
	if err := config.Validate(); err != nil {
		return nil, fmt.Errorf("invalid configuration: %w", err)
	}

	// Generate instance ID if not provided. The random suffix keeps multiple
	// processes on the same host distinct.
	instanceID := config.InstanceID
	if instanceID == "" {
		instanceID = fmt.Sprintf("%s-%d", osHostname(), rand.Intn(100000))
		logger.Info("Generated instance ID", zap.String("instance_id", instanceID))
	}

	// Get hostname if not provided
	hostname := config.Hostname
	if hostname == "" {
		hostname = osHostname()
	}

	return &mutexLeaderElection{
		config:            config,
		mutex:             mutex,
		redis:             redisClient,
		logger:            logger,
		instanceID:        instanceID,
		hostname:          hostname,
		isLeader:          false,
		becomeLeaderCbs:   make([]func(), 0),
		loseLeadershipCbs: make([]func(), 0),
		stopChan:          make(chan struct{}),
		stoppedChan:       make(chan struct{}),
	}, nil
}

// Start begins participating in leader election.
func (le *mutexLeaderElection) Start(ctx context.Context) error { le.logger.Info("Starting leader election", zap.String("instance_id", le.instanceID), zap.String("hostname", le.hostname), zap.Duration("lock_ttl", le.config.LockTTL), zap.Duration("heartbeat_interval", le.config.HeartbeatInterval), ) defer close(le.stoppedChan) // Main election loop ticker := time.NewTicker(le.config.RetryInterval) defer ticker.Stop() // Try to become leader immediately on startup le.tryBecomeLeader(ctx) for { select { case <-ctx.Done(): le.logger.Info("Context cancelled, stopping leader election") le.releaseLeadership(context.Background()) return ctx.Err() case <-le.stopChan: le.logger.Info("Stop signal received, stopping leader election") le.releaseLeadership(context.Background()) return nil case <-ticker.C: if le.IsLeader() { // If we're the leader, send heartbeat if err := le.sendHeartbeat(ctx); err != nil { le.logger.Error("Failed to send heartbeat, lost leadership", zap.Error(err)) le.setLeaderStatus(false) le.executeCallbacks(le.loseLeadershipCbs) } } else { // If we're not the leader, try to become leader le.tryBecomeLeader(ctx) } } } } // tryBecomeLeader attempts to acquire leadership using distributed mutex. func (le *mutexLeaderElection) tryBecomeLeader(ctx context.Context) { // Try to acquire the lock (non-blocking) acquired, err := le.mutex.TryAcquire(ctx, le.config.RedisKeyName, le.config.LockTTL) if err != nil { le.logger.Error("Failed to attempt leader election", zap.Error(err)) return } if acquired { // We became the leader! 
le.logger.Info("🎉 Became the leader!", zap.String("instance_id", le.instanceID)) le.leaderStartTime = time.Now() le.setLeaderStatus(true) le.updateLeaderInfo(ctx) le.executeCallbacks(le.becomeLeaderCbs) } else { // Someone else is the leader if !le.IsLeader() { // Only log if we weren't already aware currentLeader, _ := le.GetLeaderID() le.logger.Debug("Another instance is the leader", zap.String("leader_id", currentLeader)) } } } // sendHeartbeat renews the leader lock using distributed mutex. func (le *mutexLeaderElection) sendHeartbeat(ctx context.Context) error { // Extend the lock TTL err := le.mutex.Extend(ctx, le.config.RedisKeyName, le.config.LockTTL) if err != nil { return fmt.Errorf("failed to extend lock: %w", err) } // Update heartbeat time le.setLastHeartbeat(time.Now()) // Update leader info le.updateLeaderInfo(ctx) le.logger.Debug("Heartbeat sent", zap.String("instance_id", le.instanceID)) return nil } // updateLeaderInfo updates the leader information in Redis. func (le *mutexLeaderElection) updateLeaderInfo(ctx context.Context) { info := &LeaderInfo{ InstanceID: le.instanceID, Hostname: le.hostname, StartedAt: le.leaderStartTime, LastHeartbeat: le.getLastHeartbeat(), } data, err := json.Marshal(info) if err != nil { le.logger.Error("Failed to marshal leader info", zap.Error(err)) return } // Set with same TTL as lock err = le.redis.Set(ctx, le.config.RedisInfoKeyName, data, le.config.LockTTL).Err() if err != nil { le.logger.Error("Failed to update leader info", zap.Error(err)) } } // releaseLeadership voluntarily releases leadership. 
func (le *mutexLeaderElection) releaseLeadership(ctx context.Context) { if !le.IsLeader() { return } le.logger.Info("Releasing leadership voluntarily", zap.String("instance_id", le.instanceID)) // Release the lock using distributed mutex le.mutex.Release(ctx, le.config.RedisKeyName) // Delete leader info le.redis.Del(ctx, le.config.RedisInfoKeyName) le.setLeaderStatus(false) le.executeCallbacks(le.loseLeadershipCbs) } // IsLeader returns true if this instance is the leader. func (le *mutexLeaderElection) IsLeader() bool { le.leaderMutex.RLock() defer le.leaderMutex.RUnlock() return le.isLeader } // GetLeaderID returns the ID of the current leader. func (le *mutexLeaderElection) GetLeaderID() (string, error) { ctx := context.Background() // Check if we own the lock isOwner, err := le.mutex.IsOwner(ctx, le.config.RedisKeyName) if err != nil { return "", fmt.Errorf("failed to check lock ownership: %w", err) } if isOwner { return le.instanceID, nil } // We don't own it, try to get from Redis leaderID, err := le.redis.Get(ctx, le.config.RedisKeyName).Result() if err == redis.Nil { return "", nil } if err != nil { return "", fmt.Errorf("failed to get leader ID: %w", err) } return leaderID, nil } // GetLeaderInfo returns information about the current leader. func (le *mutexLeaderElection) GetLeaderInfo() (*LeaderInfo, error) { ctx := context.Background() data, err := le.redis.Get(ctx, le.config.RedisInfoKeyName).Result() if err == redis.Nil { return nil, nil } if err != nil { return nil, fmt.Errorf("failed to get leader info: %w", err) } var info LeaderInfo if err := json.Unmarshal([]byte(data), &info); err != nil { return nil, fmt.Errorf("failed to unmarshal leader info: %w", err) } return &info, nil } // OnBecomeLeader registers a callback for when this instance becomes leader. 
func (le *mutexLeaderElection) OnBecomeLeader(callback func()) { le.callbackMutex.Lock() defer le.callbackMutex.Unlock() le.becomeLeaderCbs = append(le.becomeLeaderCbs, callback) } // OnLoseLeadership registers a callback for when this instance loses leadership. func (le *mutexLeaderElection) OnLoseLeadership(callback func()) { le.callbackMutex.Lock() defer le.callbackMutex.Unlock() le.loseLeadershipCbs = append(le.loseLeadershipCbs, callback) } // Stop gracefully stops leader election. func (le *mutexLeaderElection) Stop() error { le.logger.Info("Stopping leader election") close(le.stopChan) // Wait for the election loop to finish (with timeout) select { case <-le.stoppedChan: le.logger.Info("Leader election stopped successfully") return nil case <-time.After(5 * time.Second): le.logger.Warn("Timeout waiting for leader election to stop") return fmt.Errorf("timeout waiting for leader election to stop") } } // GetInstanceID returns this instance's unique identifier. func (le *mutexLeaderElection) GetInstanceID() string { return le.instanceID } // setLeaderStatus updates the leader status (thread-safe). func (le *mutexLeaderElection) setLeaderStatus(isLeader bool) { le.leaderMutex.Lock() defer le.leaderMutex.Unlock() le.isLeader = isLeader } // setLastHeartbeat updates the last heartbeat time (thread-safe). func (le *mutexLeaderElection) setLastHeartbeat(t time.Time) { le.lastHeartbeatMutex.Lock() defer le.lastHeartbeatMutex.Unlock() le.lastHeartbeat = t } // getLastHeartbeat gets the last heartbeat time (thread-safe). func (le *mutexLeaderElection) getLastHeartbeat() time.Time { le.lastHeartbeatMutex.RLock() defer le.lastHeartbeatMutex.RUnlock() return le.lastHeartbeat } // executeCallbacks executes a list of callbacks in separate goroutines. 
func (le *mutexLeaderElection) executeCallbacks(callbacks []func()) { le.callbackMutex.RLock() defer le.callbackMutex.RUnlock() for _, callback := range callbacks { go func(cb func()) { defer func() { if r := recover(); r != nil { le.logger.Error("Panic in leader election callback", zap.Any("panic", r)) } }() cb() }(callback) } }