monorepo/cloud/maplepress-backend/pkg/distributedmutex/distributedmutex.go

138 lines
4.5 KiB
Go

package distributedmutex
import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/bsm/redislock"
	"github.com/redis/go-redis/v9"
	"go.uber.org/zap"
)
// Adapter abstracts distributed mutex operations addressed by a string key,
// so callers do not depend on a concrete locking backend.
// CWE-755: every method returns an error so that exceptional conditions can
// be handled by the caller instead of being silently dropped.
type Adapter interface {
	// Acquire obtains the lock identified by key, or returns an error if it
	// could not be obtained.
	Acquire(ctx context.Context, key string) error
	// Acquiref behaves like Acquire, with the key produced from format and
	// its arguments (the implementation in this file uses fmt.Sprintf).
	Acquiref(ctx context.Context, format string, a ...any) error
	// Release gives up the lock identified by key, or returns an error if
	// the release failed.
	Release(ctx context.Context, key string) error
	// Releasef behaves like Release, with the key produced from format and
	// its arguments (the implementation in this file uses fmt.Sprintf).
	Releasef(ctx context.Context, format string, a ...any) error
}
// distributedMutexAdapter is the Adapter implementation returned by
// NewAdapter. It obtains locks through a redislock client and remembers the
// resulting lock handles per key so they can be released later.
type distributedMutexAdapter struct {
	// logger is the named zap logger used for every message emitted by the adapter.
	logger *zap.Logger
	// redis is the client handed to NewAdapter; the locker below is built from it.
	redis redis.UniversalClient
	// locker is the redislock client used to obtain locks.
	locker *redislock.Client
	// lockInstances maps a lock key to the handle returned when that key was
	// obtained, so Release can look the handle up by key.
	lockInstances map[string]*redislock.Lock
	mutex *sync.Mutex // Guards lockInstances, because Go maps are not safe for concurrent use
}
// NewAdapter builds the default, Redis-backed distributed mutex adapter.
//
// The supplied logger is scoped under the name "distributed-mutex" and the
// redislock client is created from redisClient. The returned adapter starts
// with an empty table of held locks.
func NewAdapter(logger *zap.Logger, redisClient redis.UniversalClient) Adapter {
	scoped := logger.Named("distributed-mutex")

	adapter := &distributedMutexAdapter{
		logger:        scoped,
		redis:         redisClient,
		locker:        redislock.New(redisClient),
		lockInstances: make(map[string]*redislock.Lock),
		mutex:         new(sync.Mutex),
	}

	scoped.Info("✓ Distributed mutex initialized (Redis-backed)")
	return adapter
}
// Acquire obtains the distributed lock identified by key, blocking the calling
// goroutine while the lock is being retried.
//
// The lock is requested with a one-minute TTL and retried every 250ms, for up
// to 20 retries. On success the lock handle is stored under key so that
// Release can find it later.
//
// It returns an error wrapping redislock.ErrNotObtained when the retries are
// exhausted, and an error wrapping the underlying cause for any other failure.
// CWE-755: failures are returned to the caller instead of being silently dropped.
func (a *distributedMutexAdapter) Acquire(ctx context.Context, key string) error {
	// Named once so the retry strategy, the log fields and the error text
	// cannot drift apart.
	const (
		lockTTL       = time.Minute
		retryInterval = 250 * time.Millisecond
		maxRetries    = 20
	)

	startDT := time.Now()
	a.logger.Debug("acquiring lock", zap.String("key", key))

	// Obtain the lock, retrying on a fixed interval up to maxRetries times.
	backoff := redislock.LimitRetry(redislock.LinearBackoff(retryInterval), maxRetries)
	lock, err := a.locker.Obtain(ctx, key, lockTTL, &redislock.Options{
		RetryStrategy: backoff,
	})
	switch {
	case errors.Is(err, redislock.ErrNotObtained):
		// errors.Is (rather than ==) still matches if the sentinel is wrapped.
		nowDT := time.Now()
		diff := nowDT.Sub(startDT)
		a.logger.Error("could not obtain lock after retries",
			zap.String("key", key),
			zap.Time("start_dt", startDT),
			zap.Time("now_dt", nowDT),
			zap.Duration("duration", diff),
			zap.Int("max_retries", maxRetries))
		return fmt.Errorf("could not obtain lock after %d retries (waited %s): %w", maxRetries, diff, err)
	case err != nil:
		a.logger.Error("failed obtaining lock",
			zap.String("key", key),
			zap.Error(err))
		return fmt.Errorf("failed to obtain lock: %w", err)
	}

	// Go maps are not safe for concurrent use, so access to lockInstances is
	// serialized through the adapter's mutex.
	a.mutex.Lock()
	defer a.mutex.Unlock()
	if a.lockInstances == nil {
		// Create the table on demand rather than dropping the handle: a
		// dropped handle would leave the lock held until its TTL expires,
		// with no way for Release to find it.
		a.lockInstances = make(map[string]*redislock.Lock)
	}
	a.lockInstances[key] = lock

	a.logger.Debug("lock acquired", zap.String("key", key))
	return nil
}
// Acquiref formats a lock key from format and args with fmt.Sprintf and then
// delegates to Acquire, returning whatever Acquire returns.
// CWE-755: the error from Acquire is propagated to the caller.
func (a *distributedMutexAdapter) Acquiref(ctx context.Context, format string, args ...any) error {
	return a.Acquire(ctx, fmt.Sprintf(format, args...))
}
// Release gives up the lock previously stored under key.
//
// The handle is removed from the adapter's table before the release call is
// made. If no handle is stored for key, a warning is logged and nil is
// returned, since the lock may already have been released. If the release
// call itself fails, the error is logged and returned wrapped.
// CWE-755: release failures are returned to the caller instead of being dropped.
func (a *distributedMutexAdapter) Release(ctx context.Context, key string) error {
	a.logger.Debug("releasing lock", zap.String("key", key))

	// Go maps are not safe for concurrent use, so the lookup and removal are
	// done while holding the adapter's mutex. The mutex is dropped before the
	// release call is made.
	a.mutex.Lock()
	held, found := a.lockInstances[key]
	delete(a.lockInstances, key) // no-op when the key is absent
	a.mutex.Unlock()

	if !found {
		// Not treated as an error: the lock may already have been released.
		a.logger.Warn("lock not found for release", zap.String("key", key))
		return nil
	}

	err := held.Release(ctx)
	if err != nil {
		a.logger.Error("failed to release lock",
			zap.String("key", key),
			zap.Error(err))
		return fmt.Errorf("failed to release lock: %w", err)
	}

	a.logger.Debug("lock released", zap.String("key", key))
	return nil
}
// Releasef formats a lock key from format and args with fmt.Sprintf and then
// delegates to Release, returning whatever Release returns.
// CWE-755: the error from Release is propagated to the caller.
func (a *distributedMutexAdapter) Releasef(ctx context.Context, format string, args ...any) error {
	return a.Release(ctx, fmt.Sprintf(format, args...))
}