// Package leaderelection provides leader election backed by Redis and a
// distributed mutex.
package leaderelection
|
|
|
|
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"math/rand"
	"os"
	"sync"
	"time"

	"github.com/redis/go-redis/v9"
	"go.uber.org/zap"

	"codeberg.org/mapleopentech/monorepo/cloud/maplefile-backend/pkg/distributedmutex"
)
|
|
|
|
// mutexLeaderElection implements LeaderElection using distributedmutex.
type mutexLeaderElection struct {
	config             *Config                  // election settings (keys, TTLs, intervals); validated in the constructor
	mutex              distributedmutex.Adapter // distributed lock used as the election primitive
	redis              redis.UniversalClient    // stores the advisory LeaderInfo record
	logger             *zap.Logger              // named "LeaderElection" in the constructor
	instanceID         string                   // unique ID for this process; generated when not configured
	hostname           string                   // reported in LeaderInfo; falls back to "unknown"
	isLeader           bool                     // current leadership flag; guarded by leaderMutex
	leaderMutex        sync.RWMutex             // guards isLeader
	becomeLeaderCbs    []func()                 // fired (in goroutines) on gaining leadership; guarded by callbackMutex
	loseLeadershipCbs  []func()                 // fired (in goroutines) on losing leadership; guarded by callbackMutex
	callbackMutex      sync.RWMutex             // guards both callback slices
	stopChan           chan struct{}            // closed by Stop to end the Start loop
	stoppedChan        chan struct{}            // closed by Start when its loop has exited
	leaderStartTime    time.Time                // when this instance last became leader (written only in tryBecomeLeader)
	lastHeartbeat      time.Time                // last successful lock renewal; guarded by lastHeartbeatMutex
	lastHeartbeatMutex sync.RWMutex             // guards lastHeartbeat
}
|
|
|
|
// NewMutexLeaderElection creates a new distributed mutex-based leader election instance.
|
|
func NewMutexLeaderElection(
|
|
config *Config,
|
|
mutex distributedmutex.Adapter,
|
|
redisClient redis.UniversalClient,
|
|
logger *zap.Logger,
|
|
) (LeaderElection, error) {
|
|
logger = logger.Named("LeaderElection")
|
|
|
|
// Validate configuration
|
|
if err := config.Validate(); err != nil {
|
|
return nil, fmt.Errorf("invalid configuration: %w", err)
|
|
}
|
|
|
|
// Generate instance ID if not provided
|
|
instanceID := config.InstanceID
|
|
if instanceID == "" {
|
|
hostname, err := os.Hostname()
|
|
if err != nil {
|
|
hostname = "unknown"
|
|
}
|
|
// Add random suffix to make it unique
|
|
instanceID = fmt.Sprintf("%s-%d", hostname, rand.Intn(100000))
|
|
logger.Info("Generated instance ID", zap.String("instance_id", instanceID))
|
|
}
|
|
|
|
// Get hostname if not provided
|
|
hostname := config.Hostname
|
|
if hostname == "" {
|
|
h, err := os.Hostname()
|
|
if err != nil {
|
|
hostname = "unknown"
|
|
} else {
|
|
hostname = h
|
|
}
|
|
}
|
|
|
|
return &mutexLeaderElection{
|
|
config: config,
|
|
mutex: mutex,
|
|
redis: redisClient,
|
|
logger: logger,
|
|
instanceID: instanceID,
|
|
hostname: hostname,
|
|
isLeader: false,
|
|
becomeLeaderCbs: make([]func(), 0),
|
|
loseLeadershipCbs: make([]func(), 0),
|
|
stopChan: make(chan struct{}),
|
|
stoppedChan: make(chan struct{}),
|
|
}, nil
|
|
}
|
|
|
|
// Start begins participating in leader election.
|
|
func (le *mutexLeaderElection) Start(ctx context.Context) error {
|
|
le.logger.Info("Starting leader election",
|
|
zap.String("instance_id", le.instanceID),
|
|
zap.String("hostname", le.hostname),
|
|
zap.Duration("lock_ttl", le.config.LockTTL),
|
|
zap.Duration("heartbeat_interval", le.config.HeartbeatInterval),
|
|
)
|
|
|
|
defer close(le.stoppedChan)
|
|
|
|
// Main election loop
|
|
ticker := time.NewTicker(le.config.RetryInterval)
|
|
defer ticker.Stop()
|
|
|
|
// Try to become leader immediately on startup
|
|
le.tryBecomeLeader(ctx)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
le.logger.Info("Context cancelled, stopping leader election")
|
|
le.releaseLeadership(context.Background())
|
|
return ctx.Err()
|
|
|
|
case <-le.stopChan:
|
|
le.logger.Info("Stop signal received, stopping leader election")
|
|
le.releaseLeadership(context.Background())
|
|
return nil
|
|
|
|
case <-ticker.C:
|
|
if le.IsLeader() {
|
|
// If we're the leader, send heartbeat
|
|
if err := le.sendHeartbeat(ctx); err != nil {
|
|
le.logger.Error("Failed to send heartbeat, lost leadership",
|
|
zap.Error(err))
|
|
le.setLeaderStatus(false)
|
|
le.executeCallbacks(le.loseLeadershipCbs)
|
|
}
|
|
} else {
|
|
// If we're not the leader, try to become leader
|
|
le.tryBecomeLeader(ctx)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// tryBecomeLeader attempts to acquire leadership using distributed mutex.
|
|
func (le *mutexLeaderElection) tryBecomeLeader(ctx context.Context) {
|
|
// Try to acquire the lock (non-blocking)
|
|
acquired, err := le.mutex.TryAcquire(ctx, le.config.RedisKeyName, le.config.LockTTL)
|
|
if err != nil {
|
|
le.logger.Error("Failed to attempt leader election",
|
|
zap.Error(err))
|
|
return
|
|
}
|
|
|
|
if acquired {
|
|
// We became the leader!
|
|
le.logger.Info("🎉 Became the leader!",
|
|
zap.String("instance_id", le.instanceID))
|
|
|
|
le.leaderStartTime = time.Now()
|
|
le.setLeaderStatus(true)
|
|
le.updateLeaderInfo(ctx)
|
|
le.executeCallbacks(le.becomeLeaderCbs)
|
|
} else {
|
|
// Someone else is the leader
|
|
if !le.IsLeader() {
|
|
// Only log if we weren't already aware
|
|
currentLeader, _ := le.GetLeaderID()
|
|
le.logger.Debug("Another instance is the leader",
|
|
zap.String("leader_id", currentLeader))
|
|
}
|
|
}
|
|
}
|
|
|
|
// sendHeartbeat renews the leader lock using distributed mutex.
|
|
func (le *mutexLeaderElection) sendHeartbeat(ctx context.Context) error {
|
|
// Extend the lock TTL
|
|
err := le.mutex.Extend(ctx, le.config.RedisKeyName, le.config.LockTTL)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to extend lock: %w", err)
|
|
}
|
|
|
|
// Update heartbeat time
|
|
le.setLastHeartbeat(time.Now())
|
|
|
|
// Update leader info
|
|
le.updateLeaderInfo(ctx)
|
|
|
|
le.logger.Debug("Heartbeat sent",
|
|
zap.String("instance_id", le.instanceID))
|
|
|
|
return nil
|
|
}
|
|
|
|
// updateLeaderInfo updates the leader information in Redis.
|
|
func (le *mutexLeaderElection) updateLeaderInfo(ctx context.Context) {
|
|
info := &LeaderInfo{
|
|
InstanceID: le.instanceID,
|
|
Hostname: le.hostname,
|
|
StartedAt: le.leaderStartTime,
|
|
LastHeartbeat: le.getLastHeartbeat(),
|
|
}
|
|
|
|
data, err := json.Marshal(info)
|
|
if err != nil {
|
|
le.logger.Error("Failed to marshal leader info", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
// Set with same TTL as lock
|
|
err = le.redis.Set(ctx, le.config.RedisInfoKeyName, data, le.config.LockTTL).Err()
|
|
if err != nil {
|
|
le.logger.Error("Failed to update leader info", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
// releaseLeadership voluntarily releases leadership.
|
|
func (le *mutexLeaderElection) releaseLeadership(ctx context.Context) {
|
|
if !le.IsLeader() {
|
|
return
|
|
}
|
|
|
|
le.logger.Info("Releasing leadership voluntarily",
|
|
zap.String("instance_id", le.instanceID))
|
|
|
|
// Release the lock using distributed mutex
|
|
le.mutex.Release(ctx, le.config.RedisKeyName)
|
|
|
|
// Delete leader info
|
|
le.redis.Del(ctx, le.config.RedisInfoKeyName)
|
|
|
|
le.setLeaderStatus(false)
|
|
le.executeCallbacks(le.loseLeadershipCbs)
|
|
}
|
|
|
|
// IsLeader returns true if this instance is the leader.
|
|
func (le *mutexLeaderElection) IsLeader() bool {
|
|
le.leaderMutex.RLock()
|
|
defer le.leaderMutex.RUnlock()
|
|
return le.isLeader
|
|
}
|
|
|
|
// GetLeaderID returns the ID of the current leader.
|
|
func (le *mutexLeaderElection) GetLeaderID() (string, error) {
|
|
ctx := context.Background()
|
|
|
|
// Check if we own the lock
|
|
isOwner, err := le.mutex.IsOwner(ctx, le.config.RedisKeyName)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to check lock ownership: %w", err)
|
|
}
|
|
|
|
if isOwner {
|
|
return le.instanceID, nil
|
|
}
|
|
|
|
// We don't own it, try to get from Redis
|
|
leaderID, err := le.redis.Get(ctx, le.config.RedisKeyName).Result()
|
|
if err == redis.Nil {
|
|
return "", nil
|
|
}
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to get leader ID: %w", err)
|
|
}
|
|
return leaderID, nil
|
|
}
|
|
|
|
// GetLeaderInfo returns information about the current leader.
|
|
func (le *mutexLeaderElection) GetLeaderInfo() (*LeaderInfo, error) {
|
|
ctx := context.Background()
|
|
data, err := le.redis.Get(ctx, le.config.RedisInfoKeyName).Result()
|
|
if err == redis.Nil {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get leader info: %w", err)
|
|
}
|
|
|
|
var info LeaderInfo
|
|
if err := json.Unmarshal([]byte(data), &info); err != nil {
|
|
return nil, fmt.Errorf("failed to unmarshal leader info: %w", err)
|
|
}
|
|
|
|
return &info, nil
|
|
}
|
|
|
|
// OnBecomeLeader registers a callback for when this instance becomes leader.
|
|
func (le *mutexLeaderElection) OnBecomeLeader(callback func()) {
|
|
le.callbackMutex.Lock()
|
|
defer le.callbackMutex.Unlock()
|
|
le.becomeLeaderCbs = append(le.becomeLeaderCbs, callback)
|
|
}
|
|
|
|
// OnLoseLeadership registers a callback for when this instance loses leadership.
|
|
func (le *mutexLeaderElection) OnLoseLeadership(callback func()) {
|
|
le.callbackMutex.Lock()
|
|
defer le.callbackMutex.Unlock()
|
|
le.loseLeadershipCbs = append(le.loseLeadershipCbs, callback)
|
|
}
|
|
|
|
// Stop gracefully stops leader election.
|
|
func (le *mutexLeaderElection) Stop() error {
|
|
le.logger.Info("Stopping leader election")
|
|
close(le.stopChan)
|
|
|
|
// Wait for the election loop to finish (with timeout)
|
|
select {
|
|
case <-le.stoppedChan:
|
|
le.logger.Info("Leader election stopped successfully")
|
|
return nil
|
|
case <-time.After(5 * time.Second):
|
|
le.logger.Warn("Timeout waiting for leader election to stop")
|
|
return fmt.Errorf("timeout waiting for leader election to stop")
|
|
}
|
|
}
|
|
|
|
// GetInstanceID returns this instance's unique identifier.
|
|
func (le *mutexLeaderElection) GetInstanceID() string {
|
|
return le.instanceID
|
|
}
|
|
|
|
// setLeaderStatus updates the leader status (thread-safe).
|
|
func (le *mutexLeaderElection) setLeaderStatus(isLeader bool) {
|
|
le.leaderMutex.Lock()
|
|
defer le.leaderMutex.Unlock()
|
|
le.isLeader = isLeader
|
|
}
|
|
|
|
// setLastHeartbeat updates the last heartbeat time (thread-safe).
|
|
func (le *mutexLeaderElection) setLastHeartbeat(t time.Time) {
|
|
le.lastHeartbeatMutex.Lock()
|
|
defer le.lastHeartbeatMutex.Unlock()
|
|
le.lastHeartbeat = t
|
|
}
|
|
|
|
// getLastHeartbeat gets the last heartbeat time (thread-safe).
|
|
func (le *mutexLeaderElection) getLastHeartbeat() time.Time {
|
|
le.lastHeartbeatMutex.RLock()
|
|
defer le.lastHeartbeatMutex.RUnlock()
|
|
return le.lastHeartbeat
|
|
}
|
|
|
|
// executeCallbacks executes a list of callbacks in separate goroutines.
|
|
func (le *mutexLeaderElection) executeCallbacks(callbacks []func()) {
|
|
le.callbackMutex.RLock()
|
|
defer le.callbackMutex.RUnlock()
|
|
|
|
for _, callback := range callbacks {
|
|
go func(cb func()) {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
le.logger.Error("Panic in leader election callback",
|
|
zap.Any("panic", r))
|
|
}
|
|
}()
|
|
cb()
|
|
}(callback)
|
|
}
|
|
}
|