monorepo/cloud/maplefile-backend/pkg/distributedmutex/distributelocker.go

220 lines
6.4 KiB
Go

// File Path: monorepo/cloud/maplefile-backend/pkg/distributedmutex/distributedmutex.go
package distributedmutex
import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/bsm/redislock"
	"github.com/redis/go-redis/v9"
	"go.uber.org/zap"
)
// Adapter provides interface for abstracting distributedmutex generation.
//
// Blocking calls (Acquire/Release and their printf-style variants) report
// nothing to the caller; failures are only logged by the implementation.
// The non-blocking calls (TryAcquire, Extend, IsOwner) return results and
// are intended for leader-election style usage.
type Adapter interface {
	// Blocking acquire - waits until lock is obtained or timeout.
	Acquire(ctx context.Context, key string)
	// Acquiref is Acquire with the key built via fmt.Sprintf(format, a...).
	Acquiref(ctx context.Context, format string, a ...any)
	// Release releases a lock previously obtained through Acquire/Acquiref.
	Release(ctx context.Context, key string)
	// Releasef is Release with the key built via fmt.Sprintf(format, a...).
	Releasef(ctx context.Context, format string, a ...any)
	// Non-blocking operations for leader election.

	// TryAcquire attempts to acquire a lock without blocking.
	// Returns true if lock was acquired, false if already held by someone else.
	TryAcquire(ctx context.Context, key string, ttl time.Duration) (bool, error)

	// Extend renews the TTL of an existing lock.
	// Returns error if the lock is not owned by this instance.
	Extend(ctx context.Context, key string, ttl time.Duration) error

	// IsOwner checks if this instance owns the given lock.
	IsOwner(ctx context.Context, key string) (bool, error)
}
// distributedLockerAdapter is the redislock-backed implementation of Adapter.
// All map access MUST hold Mutex: Go maps are not safe for concurrent use.
type distributedLockerAdapter struct {
	Logger *zap.Logger
	Redis  redis.UniversalClient
	// Locker is the redislock client used to obtain/refresh distributed locks.
	Locker *redislock.Client
	// LockInstances tracks, per key, the lock handle this process currently
	// holds; entries are needed later to Release/Refresh the lock. Guarded by Mutex.
	LockInstances map[string]*redislock.Lock
	Mutex         *sync.Mutex // Add a mutex for synchronization with goroutines
}
// NewAdapter constructor that returns the default DistributedLocker generator.
//
// It wraps the supplied Redis client with a redislock client and initializes
// the per-process lock-instance registry plus the mutex that guards it.
func NewAdapter(loggerp *zap.Logger, redisClient redis.UniversalClient) Adapter {
	loggerp = loggerp.Named("DistributedMutex")
	loggerp.Debug("distributed mutex starting and connecting...")

	// Create a new lock client.
	locker := redislock.New(redisClient)

	loggerp.Debug("distributed mutex initialized")
	return distributedLockerAdapter{
		Logger: loggerp,
		Redis:  redisClient,
		Locker: locker,
		// A size hint of 0 is a no-op; plain make is the idiomatic form.
		LockInstances: make(map[string]*redislock.Lock),
		Mutex:         &sync.Mutex{}, // Guards LockInstances.
	}
}
// Acquire function blocks the current thread if the lock key is currently locked.
//
// It retries with a linear 250ms backoff up to 20 times (~5s worst case) and
// obtains the lock with a 1-minute TTL. Failures are logged but not returned,
// per the Adapter interface, so callers cannot distinguish success from failure.
func (a distributedLockerAdapter) Acquire(ctx context.Context, k string) {
	startDT := time.Now()
	a.Logger.Debug(fmt.Sprintf("locking for key: %v", k))

	// Retry every 250ms, for up-to 20x.
	backoff := redislock.LimitRetry(redislock.LinearBackoff(250*time.Millisecond), 20)

	// Obtain lock with retry.
	lock, err := a.Locker.Obtain(ctx, k, time.Minute, &redislock.Options{
		RetryStrategy: backoff,
	})
	// errors.Is instead of ==: sentinel errors may arrive wrapped, in which
	// case direct equality would miss ErrNotObtained and mislabel the log.
	if errors.Is(err, redislock.ErrNotObtained) {
		nowDT := time.Now()
		diff := nowDT.Sub(startDT)
		a.Logger.Error("could not obtain lock",
			zap.String("key", k),
			zap.Time("start_dt", startDT),
			zap.Time("now_dt", nowDT),
			zap.Float64("duration_in_minutes", diff.Minutes()))
		return
	}
	if err != nil {
		a.Logger.Error("failed obtaining lock",
			zap.String("key", k),
			zap.Error(err))
		return
	}

	// DEVELOPERS NOTE:
	// The `map` datastructure in Golang is not concurrently safe, therefore we
	// need to use mutex to coordinate access of our `LockInstances` map
	// resource between all the goroutines.
	a.Mutex.Lock()
	defer a.Mutex.Unlock()
	if a.LockInstances != nil { // Defensive code.
		// NOTE(review): if an entry already exists for k it is silently
		// overwritten and the previous Redis lock lingers until its TTL expires.
		a.LockInstances[k] = lock
	}
}
// Acquiref function blocks the current thread if the lock key is currently locked.
// The key is derived from the printf-style format string and its arguments.
func (u distributedLockerAdapter) Acquiref(ctx context.Context, format string, args ...any) {
	u.Acquire(ctx, fmt.Sprintf(format, args...))
}
// Release releases the lock previously acquired for key k by this instance.
//
// Fixes over the previous version: the LockInstances map is now read under
// a.Mutex (it was read unsynchronized, racing with Acquire/TryAcquire writes),
// the entry is deleted so the map does not grow without bound, and the
// redislock Release error is logged instead of silently discarded.
func (a distributedLockerAdapter) Release(ctx context.Context, k string) {
	a.Logger.Debug(fmt.Sprintf("unlocking for key: %v", k))

	// Look up and remove the handle atomically under the mutex; the actual
	// Redis call happens outside the critical section.
	a.Mutex.Lock()
	lockInstance, ok := a.LockInstances[k]
	if ok {
		delete(a.LockInstances, k)
	}
	a.Mutex.Unlock()

	if !ok {
		a.Logger.Error("could not find lock instance to unlock", zap.String("key", k))
		return
	}
	if err := lockInstance.Release(ctx); err != nil {
		a.Logger.Error("failed releasing lock",
			zap.String("key", k),
			zap.Error(err))
	}
}
// Releasef releases the lock whose key is derived from the printf-style
// format string and its arguments.
// See also: https://github.com/bsm/redislock/blob/main/README.md
func (u distributedLockerAdapter) Releasef(ctx context.Context, format string, args ...any) {
	u.Release(ctx, fmt.Sprintf(format, args...))
}
// TryAcquire attempts to acquire a lock without blocking.
// Returns true if lock was acquired, false if already held by someone else.
// A (false, nil) result means contention; (false, err) means a real failure.
func (a distributedLockerAdapter) TryAcquire(ctx context.Context, k string, ttl time.Duration) (bool, error) {
	a.Logger.Debug(fmt.Sprintf("trying to acquire lock for key: %v with ttl: %v", k, ttl))

	// Try to obtain lock without retries (non-blocking).
	lock, err := a.Locker.Obtain(ctx, k, ttl, &redislock.Options{
		RetryStrategy: redislock.NoRetry(),
	})
	// errors.Is instead of ==: a wrapped ErrNotObtained would otherwise be
	// reported as a hard error rather than plain contention.
	if errors.Is(err, redislock.ErrNotObtained) {
		// Lock is held by someone else.
		a.Logger.Debug("lock not obtained, already held by another instance",
			zap.String("key", k))
		return false, nil
	}
	if err != nil {
		// Actual error occurred.
		a.Logger.Error("failed trying to obtain lock",
			zap.String("key", k),
			zap.Error(err))
		return false, err
	}

	// Successfully acquired lock; record the handle so Extend/Release/IsOwner
	// can find it. Map writes must hold the mutex.
	a.Mutex.Lock()
	defer a.Mutex.Unlock()
	if a.LockInstances != nil {
		a.LockInstances[k] = lock
	}
	a.Logger.Debug("successfully acquired lock",
		zap.String("key", k),
		zap.Duration("ttl", ttl))
	return true, nil
}
// Extend renews the TTL of an existing lock.
// Returns error if the lock is not owned by this instance.
func (a distributedLockerAdapter) Extend(ctx context.Context, k string, ttl time.Duration) error {
	a.Logger.Debug(fmt.Sprintf("extending lock for key: %v with ttl: %v", k, ttl))

	a.Mutex.Lock()
	lockInstance, ok := a.LockInstances[k]
	a.Mutex.Unlock()
	if !ok {
		// errors.New, not fmt.Errorf: the message has no format verbs.
		err := errors.New("lock not found in instances map")
		a.Logger.Error("cannot extend lock, not owned by this instance",
			zap.String("key", k),
			zap.Error(err))
		return err
	}

	// Extend the lock TTL.
	if err := lockInstance.Refresh(ctx, ttl, nil); err != nil {
		// If the lock has already expired (or was taken by someone else),
		// drop the stale handle so subsequent Extend/IsOwner calls fail fast.
		if errors.Is(err, redislock.ErrNotObtained) {
			a.Mutex.Lock()
			delete(a.LockInstances, k)
			a.Mutex.Unlock()
		}
		a.Logger.Error("failed to extend lock",
			zap.String("key", k),
			zap.Error(err))
		return err
	}
	a.Logger.Debug("successfully extended lock",
		zap.String("key", k),
		zap.Duration("ttl", ttl))
	return nil
}
// IsOwner checks if this instance owns the given lock.
//
// BUG FIX: the previous version checked lockInstance.Metadata(), but redislock
// metadata is a client-side string set only via the WithMetadata option at
// Obtain time — never used by this adapter — so it was always "" and IsOwner
// always returned false even while the lock was held. Ownership is now
// verified against Redis via the lock's remaining TTL.
func (a distributedLockerAdapter) IsOwner(ctx context.Context, k string) (bool, error) {
	a.Mutex.Lock()
	lockInstance, ok := a.LockInstances[k]
	a.Mutex.Unlock()
	if !ok {
		// Not in our instances map.
		return false, nil
	}

	// TTL queries Redis and returns a positive duration only while the lock
	// is still held by this token; 0 means expired or taken by someone else.
	ttl, err := lockInstance.TTL(ctx)
	if err != nil {
		a.Logger.Error("failed checking lock ownership",
			zap.String("key", k),
			zap.Error(err))
		return false, err
	}
	return ttl > 0, nil
}