// File Path: monorepo/cloud/maplefile-backend/pkg/distributedmutex/distributedmutex.go
|
|
package distributedmutex
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/bsm/redislock"
|
|
"github.com/redis/go-redis/v9"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Adapter abstracts a distributed mutex keyed by string, so callers can
// coordinate work across processes without depending on a concrete backend.
type Adapter interface {
	// Acquire blocks until the lock for key is obtained or the
	// implementation gives up waiting. It returns nothing, so a failed
	// attempt is not reported to the caller.
	Acquire(ctx context.Context, key string)

	// Acquiref behaves like Acquire, with the key built from format and a.
	Acquiref(ctx context.Context, format string, a ...any)

	// Release gives up the lock previously acquired for key.
	Release(ctx context.Context, key string)

	// Releasef behaves like Release, with the key built from format and a.
	Releasef(ctx context.Context, format string, a ...any)

	// Non-blocking operations for leader election.

	// TryAcquire attempts to acquire the lock for key without blocking and
	// asks for it to live for ttl. It returns true if the lock was acquired
	// and false if it is already held by someone else.
	TryAcquire(ctx context.Context, key string, ttl time.Duration) (bool, error)

	// Extend renews the TTL of an existing lock. It returns an error if the
	// lock is not owned by this instance.
	Extend(ctx context.Context, key string, ttl time.Duration) error

	// IsOwner reports whether this instance owns the lock for key.
	IsOwner(ctx context.Context, key string) (bool, error)
}
|
|
|
|
type distributedLockerAdapter struct {
|
|
Logger *zap.Logger
|
|
Redis redis.UniversalClient
|
|
Locker *redislock.Client
|
|
LockInstances map[string]*redislock.Lock
|
|
Mutex *sync.Mutex // Add a mutex for synchronization with goroutines
|
|
}
|
|
|
|
// NewAdapter constructor that returns the default DistributedLocker generator.
|
|
func NewAdapter(loggerp *zap.Logger, redisClient redis.UniversalClient) Adapter {
|
|
loggerp = loggerp.Named("DistributedMutex")
|
|
loggerp.Debug("distributed mutex starting and connecting...")
|
|
|
|
// Create a new lock client.
|
|
locker := redislock.New(redisClient)
|
|
|
|
loggerp.Debug("distributed mutex initialized")
|
|
|
|
return distributedLockerAdapter{
|
|
Logger: loggerp,
|
|
Redis: redisClient,
|
|
Locker: locker,
|
|
LockInstances: make(map[string]*redislock.Lock, 0),
|
|
Mutex: &sync.Mutex{}, // Initialize the mutex
|
|
}
|
|
}
|
|
|
|
// Acquire function blocks the current thread if the lock key is currently locked.
|
|
func (a distributedLockerAdapter) Acquire(ctx context.Context, k string) {
|
|
startDT := time.Now()
|
|
a.Logger.Debug(fmt.Sprintf("locking for key: %v", k))
|
|
|
|
// Retry every 250ms, for up-to 20x
|
|
backoff := redislock.LimitRetry(redislock.LinearBackoff(250*time.Millisecond), 20)
|
|
|
|
// Obtain lock with retry
|
|
lock, err := a.Locker.Obtain(ctx, k, time.Minute, &redislock.Options{
|
|
RetryStrategy: backoff,
|
|
})
|
|
if err == redislock.ErrNotObtained {
|
|
nowDT := time.Now()
|
|
diff := nowDT.Sub(startDT)
|
|
a.Logger.Error("could not obtain lock",
|
|
zap.String("key", k),
|
|
zap.Time("start_dt", startDT),
|
|
zap.Time("now_dt", nowDT),
|
|
zap.Any("duration_in_minutes", diff.Minutes()))
|
|
return
|
|
} else if err != nil {
|
|
a.Logger.Error("failed obtaining lock",
|
|
zap.String("key", k),
|
|
zap.Any("error", err),
|
|
)
|
|
return
|
|
}
|
|
|
|
// DEVELOPERS NOTE:
|
|
// The `map` datastructure in Golang is not concurrently safe, therefore we
|
|
// need to use mutex to coordinate access of our `LockInstances` map
|
|
// resource between all the goroutines.
|
|
a.Mutex.Lock()
|
|
defer a.Mutex.Unlock()
|
|
|
|
if a.LockInstances != nil { // Defensive code.
|
|
a.LockInstances[k] = lock
|
|
}
|
|
}
|
|
|
|
// Acquiref function blocks the current thread if the lock key is currently locked.
|
|
func (u distributedLockerAdapter) Acquiref(ctx context.Context, format string, a ...any) {
|
|
k := fmt.Sprintf(format, a...)
|
|
u.Acquire(ctx, k)
|
|
return
|
|
}
|
|
|
|
// Release function blocks the current thread if the lock key is currently locked.
|
|
func (a distributedLockerAdapter) Release(ctx context.Context, k string) {
|
|
a.Logger.Debug(fmt.Sprintf("unlocking for key: %v", k))
|
|
|
|
lockInstance, ok := a.LockInstances[k]
|
|
if ok {
|
|
defer lockInstance.Release(ctx)
|
|
} else {
|
|
a.Logger.Error("could not obtain to unlock", zap.String("key", k))
|
|
}
|
|
return
|
|
}
|
|
|
|
// Releasef
|
|
func (u distributedLockerAdapter) Releasef(ctx context.Context, format string, a ...any) {
|
|
k := fmt.Sprintf(format, a...) //TODO: https://github.com/bsm/redislock/blob/main/README.md
|
|
u.Release(ctx, k)
|
|
return
|
|
}
|
|
|
|
// TryAcquire attempts to acquire a lock without blocking.
|
|
// Returns true if lock was acquired, false if already held by someone else.
|
|
func (a distributedLockerAdapter) TryAcquire(ctx context.Context, k string, ttl time.Duration) (bool, error) {
|
|
a.Logger.Debug(fmt.Sprintf("trying to acquire lock for key: %v with ttl: %v", k, ttl))
|
|
|
|
// Try to obtain lock without retries (non-blocking)
|
|
lock, err := a.Locker.Obtain(ctx, k, ttl, &redislock.Options{
|
|
RetryStrategy: redislock.NoRetry(),
|
|
})
|
|
|
|
if err == redislock.ErrNotObtained {
|
|
// Lock is held by someone else
|
|
a.Logger.Debug("lock not obtained, already held by another instance",
|
|
zap.String("key", k))
|
|
return false, nil
|
|
}
|
|
|
|
if err != nil {
|
|
// Actual error occurred
|
|
a.Logger.Error("failed trying to obtain lock",
|
|
zap.String("key", k),
|
|
zap.Error(err))
|
|
return false, err
|
|
}
|
|
|
|
// Successfully acquired lock
|
|
a.Mutex.Lock()
|
|
defer a.Mutex.Unlock()
|
|
|
|
if a.LockInstances != nil {
|
|
a.LockInstances[k] = lock
|
|
}
|
|
|
|
a.Logger.Debug("successfully acquired lock",
|
|
zap.String("key", k),
|
|
zap.Duration("ttl", ttl))
|
|
|
|
return true, nil
|
|
}
|
|
|
|
// Extend renews the TTL of an existing lock.
|
|
// Returns error if the lock is not owned by this instance.
|
|
func (a distributedLockerAdapter) Extend(ctx context.Context, k string, ttl time.Duration) error {
|
|
a.Logger.Debug(fmt.Sprintf("extending lock for key: %v with ttl: %v", k, ttl))
|
|
|
|
a.Mutex.Lock()
|
|
lockInstance, ok := a.LockInstances[k]
|
|
a.Mutex.Unlock()
|
|
|
|
if !ok {
|
|
err := fmt.Errorf("lock not found in instances map")
|
|
a.Logger.Error("cannot extend lock, not owned by this instance",
|
|
zap.String("key", k),
|
|
zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Extend the lock TTL
|
|
err := lockInstance.Refresh(ctx, ttl, nil)
|
|
if err != nil {
|
|
a.Logger.Error("failed to extend lock",
|
|
zap.String("key", k),
|
|
zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
a.Logger.Debug("successfully extended lock",
|
|
zap.String("key", k),
|
|
zap.Duration("ttl", ttl))
|
|
|
|
return nil
|
|
}
|
|
|
|
// IsOwner checks if this instance owns the given lock.
|
|
func (a distributedLockerAdapter) IsOwner(ctx context.Context, k string) (bool, error) {
|
|
a.Mutex.Lock()
|
|
lockInstance, ok := a.LockInstances[k]
|
|
a.Mutex.Unlock()
|
|
|
|
if !ok {
|
|
// Not in our instances map
|
|
return false, nil
|
|
}
|
|
|
|
// Get the lock metadata to check if we still own it
|
|
metadata := lockInstance.Metadata()
|
|
|
|
// If metadata is empty, we don't own it
|
|
return metadata != "", nil
|
|
}
|