355 lines
9.6 KiB
Go
355 lines
9.6 KiB
Go
package leaderelection
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"math/rand"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// redisLeaderElection implements LeaderElection using Redis.
// Leadership is held via a Redis key (SET NX + TTL) that the leader must
// periodically renew; losing the key means losing leadership.
type redisLeaderElection struct {
	config *Config
	redis redis.UniversalClient
	logger *zap.Logger
	// instanceID uniquely identifies this participant; it is the value
	// stored in the Redis lock key.
	instanceID string
	hostname string
	// isLeader is the local view of leadership, guarded by leaderMutex.
	isLeader bool
	leaderMutex sync.RWMutex
	// Callbacks fired on leadership transitions; guarded by callbackMutex.
	becomeLeaderCbs []func()
	loseLeadershipCbs []func()
	callbackMutex sync.RWMutex
	// stopChan signals the election loop to exit; stoppedChan is closed by
	// Start when the loop has fully finished.
	stopChan chan struct{}
	stoppedChan chan struct{}
	// leaderStartTime is set when leadership is acquired; written and read
	// only from the election goroutine.
	leaderStartTime time.Time
	// lastHeartbeat records the most recent successful lock renewal,
	// guarded by lastHeartbeatMutex.
	lastHeartbeat time.Time
	lastHeartbeatMutex sync.RWMutex
}
|
|
|
|
// NewRedisLeaderElection creates a new Redis-based leader election instance.
|
|
func NewRedisLeaderElection(
|
|
config *Config,
|
|
redisClient redis.UniversalClient,
|
|
logger *zap.Logger,
|
|
) (LeaderElection, error) {
|
|
logger = logger.Named("LeaderElection")
|
|
|
|
// Validate configuration
|
|
if err := config.Validate(); err != nil {
|
|
return nil, fmt.Errorf("invalid configuration: %w", err)
|
|
}
|
|
|
|
// Generate instance ID if not provided
|
|
instanceID := config.InstanceID
|
|
if instanceID == "" {
|
|
hostname, err := os.Hostname()
|
|
if err != nil {
|
|
hostname = "unknown"
|
|
}
|
|
// Add random suffix to make it unique
|
|
instanceID = fmt.Sprintf("%s-%d", hostname, rand.Intn(100000))
|
|
logger.Info("Generated instance ID", zap.String("instance_id", instanceID))
|
|
}
|
|
|
|
// Get hostname if not provided
|
|
hostname := config.Hostname
|
|
if hostname == "" {
|
|
h, err := os.Hostname()
|
|
if err != nil {
|
|
hostname = "unknown"
|
|
} else {
|
|
hostname = h
|
|
}
|
|
}
|
|
|
|
return &redisLeaderElection{
|
|
config: config,
|
|
redis: redisClient,
|
|
logger: logger,
|
|
instanceID: instanceID,
|
|
hostname: hostname,
|
|
isLeader: false,
|
|
becomeLeaderCbs: make([]func(), 0),
|
|
loseLeadershipCbs: make([]func(), 0),
|
|
stopChan: make(chan struct{}),
|
|
stoppedChan: make(chan struct{}),
|
|
}, nil
|
|
}
|
|
|
|
// Start begins participating in leader election.
|
|
func (le *redisLeaderElection) Start(ctx context.Context) error {
|
|
le.logger.Info("Starting leader election",
|
|
zap.String("instance_id", le.instanceID),
|
|
zap.String("hostname", le.hostname),
|
|
zap.Duration("lock_ttl", le.config.LockTTL),
|
|
zap.Duration("heartbeat_interval", le.config.HeartbeatInterval),
|
|
)
|
|
|
|
defer close(le.stoppedChan)
|
|
|
|
// Main election loop
|
|
ticker := time.NewTicker(le.config.RetryInterval)
|
|
defer ticker.Stop()
|
|
|
|
// Try to become leader immediately on startup
|
|
le.tryBecomeLeader(ctx)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
le.logger.Info("Context cancelled, stopping leader election")
|
|
le.releaseLeadership(context.Background())
|
|
return ctx.Err()
|
|
|
|
case <-le.stopChan:
|
|
le.logger.Info("Stop signal received, stopping leader election")
|
|
le.releaseLeadership(context.Background())
|
|
return nil
|
|
|
|
case <-ticker.C:
|
|
if le.IsLeader() {
|
|
// If we're the leader, send heartbeat
|
|
if err := le.sendHeartbeat(ctx); err != nil {
|
|
le.logger.Error("Failed to send heartbeat, lost leadership",
|
|
zap.Error(err))
|
|
le.setLeaderStatus(false)
|
|
le.executeCallbacks(le.loseLeadershipCbs)
|
|
}
|
|
} else {
|
|
// If we're not the leader, try to become leader
|
|
le.tryBecomeLeader(ctx)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// tryBecomeLeader attempts to acquire leadership.
|
|
func (le *redisLeaderElection) tryBecomeLeader(ctx context.Context) {
|
|
// Try to set the leader key with NX (only if not exists) and EX (expiry)
|
|
success, err := le.redis.SetNX(ctx, le.config.RedisKeyName, le.instanceID, le.config.LockTTL).Result()
|
|
if err != nil {
|
|
le.logger.Error("Failed to attempt leader election",
|
|
zap.Error(err))
|
|
return
|
|
}
|
|
|
|
if success {
|
|
// We became the leader!
|
|
le.logger.Info("🎉 Became the leader!",
|
|
zap.String("instance_id", le.instanceID))
|
|
|
|
le.leaderStartTime = time.Now()
|
|
le.setLeaderStatus(true)
|
|
le.updateLeaderInfo(ctx)
|
|
le.executeCallbacks(le.becomeLeaderCbs)
|
|
} else {
|
|
// Someone else is the leader
|
|
if !le.IsLeader() {
|
|
// Only log if we weren't already aware
|
|
currentLeader, _ := le.GetLeaderID()
|
|
le.logger.Debug("Another instance is the leader",
|
|
zap.String("leader_id", currentLeader))
|
|
}
|
|
}
|
|
}
|
|
|
|
// sendHeartbeat renews the leader lock.
|
|
func (le *redisLeaderElection) sendHeartbeat(ctx context.Context) error {
|
|
// Verify we still hold the lock
|
|
currentValue, err := le.redis.Get(ctx, le.config.RedisKeyName).Result()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get current lock value: %w", err)
|
|
}
|
|
|
|
if currentValue != le.instanceID {
|
|
return fmt.Errorf("lock held by different instance: %s", currentValue)
|
|
}
|
|
|
|
// Renew the lock
|
|
err = le.redis.Expire(ctx, le.config.RedisKeyName, le.config.LockTTL).Err()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to renew lock: %w", err)
|
|
}
|
|
|
|
// Update heartbeat time
|
|
le.setLastHeartbeat(time.Now())
|
|
|
|
// Update leader info
|
|
le.updateLeaderInfo(ctx)
|
|
|
|
le.logger.Debug("Heartbeat sent",
|
|
zap.String("instance_id", le.instanceID))
|
|
|
|
return nil
|
|
}
|
|
|
|
// updateLeaderInfo updates the leader information in Redis.
|
|
func (le *redisLeaderElection) updateLeaderInfo(ctx context.Context) {
|
|
info := &LeaderInfo{
|
|
InstanceID: le.instanceID,
|
|
Hostname: le.hostname,
|
|
StartedAt: le.leaderStartTime,
|
|
LastHeartbeat: le.getLastHeartbeat(),
|
|
}
|
|
|
|
data, err := json.Marshal(info)
|
|
if err != nil {
|
|
le.logger.Error("Failed to marshal leader info", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
// Set with same TTL as lock
|
|
err = le.redis.Set(ctx, le.config.RedisInfoKeyName, data, le.config.LockTTL).Err()
|
|
if err != nil {
|
|
le.logger.Error("Failed to update leader info", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
// releaseLeadership voluntarily releases leadership.
|
|
func (le *redisLeaderElection) releaseLeadership(ctx context.Context) {
|
|
if !le.IsLeader() {
|
|
return
|
|
}
|
|
|
|
le.logger.Info("Releasing leadership voluntarily",
|
|
zap.String("instance_id", le.instanceID))
|
|
|
|
// Only delete if we're still the owner
|
|
script := `
|
|
if redis.call("GET", KEYS[1]) == ARGV[1] then
|
|
return redis.call("DEL", KEYS[1])
|
|
else
|
|
return 0
|
|
end
|
|
`
|
|
|
|
_, err := le.redis.Eval(ctx, script, []string{le.config.RedisKeyName}, le.instanceID).Result()
|
|
if err != nil {
|
|
le.logger.Error("Failed to release leadership", zap.Error(err))
|
|
}
|
|
|
|
// Delete leader info
|
|
le.redis.Del(ctx, le.config.RedisInfoKeyName)
|
|
|
|
le.setLeaderStatus(false)
|
|
le.executeCallbacks(le.loseLeadershipCbs)
|
|
}
|
|
|
|
// IsLeader returns true if this instance is the leader.
|
|
func (le *redisLeaderElection) IsLeader() bool {
|
|
le.leaderMutex.RLock()
|
|
defer le.leaderMutex.RUnlock()
|
|
return le.isLeader
|
|
}
|
|
|
|
// GetLeaderID returns the ID of the current leader.
|
|
func (le *redisLeaderElection) GetLeaderID() (string, error) {
|
|
ctx := context.Background()
|
|
leaderID, err := le.redis.Get(ctx, le.config.RedisKeyName).Result()
|
|
if err == redis.Nil {
|
|
return "", nil
|
|
}
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to get leader ID: %w", err)
|
|
}
|
|
return leaderID, nil
|
|
}
|
|
|
|
// GetLeaderInfo returns information about the current leader.
|
|
func (le *redisLeaderElection) GetLeaderInfo() (*LeaderInfo, error) {
|
|
ctx := context.Background()
|
|
data, err := le.redis.Get(ctx, le.config.RedisInfoKeyName).Result()
|
|
if err == redis.Nil {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get leader info: %w", err)
|
|
}
|
|
|
|
var info LeaderInfo
|
|
if err := json.Unmarshal([]byte(data), &info); err != nil {
|
|
return nil, fmt.Errorf("failed to unmarshal leader info: %w", err)
|
|
}
|
|
|
|
return &info, nil
|
|
}
|
|
|
|
// OnBecomeLeader registers a callback for when this instance becomes leader.
|
|
func (le *redisLeaderElection) OnBecomeLeader(callback func()) {
|
|
le.callbackMutex.Lock()
|
|
defer le.callbackMutex.Unlock()
|
|
le.becomeLeaderCbs = append(le.becomeLeaderCbs, callback)
|
|
}
|
|
|
|
// OnLoseLeadership registers a callback for when this instance loses leadership.
|
|
func (le *redisLeaderElection) OnLoseLeadership(callback func()) {
|
|
le.callbackMutex.Lock()
|
|
defer le.callbackMutex.Unlock()
|
|
le.loseLeadershipCbs = append(le.loseLeadershipCbs, callback)
|
|
}
|
|
|
|
// Stop gracefully stops leader election.
|
|
func (le *redisLeaderElection) Stop() error {
|
|
le.logger.Info("Stopping leader election")
|
|
close(le.stopChan)
|
|
|
|
// Wait for the election loop to finish (with timeout)
|
|
select {
|
|
case <-le.stoppedChan:
|
|
le.logger.Info("Leader election stopped successfully")
|
|
return nil
|
|
case <-time.After(5 * time.Second):
|
|
le.logger.Warn("Timeout waiting for leader election to stop")
|
|
return fmt.Errorf("timeout waiting for leader election to stop")
|
|
}
|
|
}
|
|
|
|
// GetInstanceID returns this instance's unique identifier.
|
|
func (le *redisLeaderElection) GetInstanceID() string {
|
|
return le.instanceID
|
|
}
|
|
|
|
// setLeaderStatus updates the leader status (thread-safe).
|
|
func (le *redisLeaderElection) setLeaderStatus(isLeader bool) {
|
|
le.leaderMutex.Lock()
|
|
defer le.leaderMutex.Unlock()
|
|
le.isLeader = isLeader
|
|
}
|
|
|
|
// setLastHeartbeat updates the last heartbeat time (thread-safe).
|
|
func (le *redisLeaderElection) setLastHeartbeat(t time.Time) {
|
|
le.lastHeartbeatMutex.Lock()
|
|
defer le.lastHeartbeatMutex.Unlock()
|
|
le.lastHeartbeat = t
|
|
}
|
|
|
|
// getLastHeartbeat gets the last heartbeat time (thread-safe).
|
|
func (le *redisLeaderElection) getLastHeartbeat() time.Time {
|
|
le.lastHeartbeatMutex.RLock()
|
|
defer le.lastHeartbeatMutex.RUnlock()
|
|
return le.lastHeartbeat
|
|
}
|
|
|
|
// executeCallbacks executes a list of callbacks in separate goroutines.
|
|
func (le *redisLeaderElection) executeCallbacks(callbacks []func()) {
|
|
le.callbackMutex.RLock()
|
|
defer le.callbackMutex.RUnlock()
|
|
|
|
for _, callback := range callbacks {
|
|
go func(cb func()) {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
le.logger.Error("Panic in leader election callback",
|
|
zap.Any("panic", r))
|
|
}
|
|
}()
|
|
cb()
|
|
}(callback)
|
|
}
|
|
}
|