109 lines
3.3 KiB
Go
109 lines
3.3 KiB
Go
package cache
|
|
|
|
import (
	"context"
	"errors"
	"time"

	"github.com/gocql/gocql"
	"go.uber.org/zap"
)
|
|
|
|
// CassandraCacher is the set of cache operations offered by the
// Cassandra-backed cache layer in this package.
type CassandraCacher interface {
	// Shutdown signals that the cache layer is no longer needed.
	Shutdown(ctx context.Context)

	// Get returns the bytes stored under key.
	Get(ctx context.Context, key string) ([]byte, error)

	// Set stores val under key with the implementation's default expiry.
	Set(ctx context.Context, key string, val []byte) error

	// SetWithExpiry stores val under key; expiry is how long the entry
	// should remain valid, measured from the time of the call.
	SetWithExpiry(ctx context.Context, key string, val []byte, expiry time.Duration) error

	// Delete removes the entry stored under key.
	Delete(ctx context.Context, key string) error

	// PurgeExpired removes entries whose expiry time has already passed.
	PurgeExpired(ctx context.Context) error
}
|
|
|
|
type cassandraCache struct {
|
|
session *gocql.Session
|
|
logger *zap.Logger
|
|
}
|
|
|
|
// NewCassandraCache creates a new Cassandra cache instance
|
|
func NewCassandraCache(session *gocql.Session, logger *zap.Logger) CassandraCacher {
|
|
logger = logger.Named("cassandra-cache")
|
|
logger.Info("✓ Cassandra cache layer initialized")
|
|
return &cassandraCache{
|
|
session: session,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
func (c *cassandraCache) Shutdown(ctx context.Context) {
|
|
c.logger.Info("shutting down Cassandra cache")
|
|
// Note: Don't close the session here as it's managed by the database layer
|
|
}
|
|
|
|
func (c *cassandraCache) Get(ctx context.Context, key string) ([]byte, error) {
|
|
var value []byte
|
|
var expiresAt time.Time
|
|
|
|
query := `SELECT value, expires_at FROM cache WHERE key = ?`
|
|
err := c.session.Query(query, key).WithContext(ctx).Consistency(gocql.LocalQuorum).Scan(&value, &expiresAt)
|
|
|
|
if err == gocql.ErrNotFound {
|
|
// Key doesn't exist - this is not an error
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Check if expired in application code
|
|
if time.Now().After(expiresAt) {
|
|
// Entry is expired, delete it and return nil
|
|
_ = c.Delete(ctx, key) // Clean up expired entry
|
|
return nil, nil
|
|
}
|
|
|
|
return value, nil
|
|
}
|
|
|
|
func (c *cassandraCache) Set(ctx context.Context, key string, val []byte) error {
|
|
expiresAt := time.Now().Add(24 * time.Hour) // Default 24 hour expiry
|
|
query := `INSERT INTO cache (key, expires_at, value) VALUES (?, ?, ?)`
|
|
return c.session.Query(query, key, expiresAt, val).WithContext(ctx).Consistency(gocql.LocalQuorum).Exec()
|
|
}
|
|
|
|
func (c *cassandraCache) SetWithExpiry(ctx context.Context, key string, val []byte, expiry time.Duration) error {
|
|
expiresAt := time.Now().Add(expiry)
|
|
query := `INSERT INTO cache (key, expires_at, value) VALUES (?, ?, ?)`
|
|
return c.session.Query(query, key, expiresAt, val).WithContext(ctx).Consistency(gocql.LocalQuorum).Exec()
|
|
}
|
|
|
|
func (c *cassandraCache) Delete(ctx context.Context, key string) error {
|
|
query := `DELETE FROM cache WHERE key = ?`
|
|
return c.session.Query(query, key).WithContext(ctx).Consistency(gocql.LocalQuorum).Exec()
|
|
}
|
|
|
|
func (c *cassandraCache) PurgeExpired(ctx context.Context) error {
|
|
now := time.Now()
|
|
|
|
// Thanks to the index on expires_at, this query is efficient
|
|
iter := c.session.Query(`SELECT key FROM cache WHERE expires_at < ? ALLOW FILTERING`, now).WithContext(ctx).Iter()
|
|
|
|
var expiredKeys []string
|
|
var key string
|
|
for iter.Scan(&key) {
|
|
expiredKeys = append(expiredKeys, key)
|
|
}
|
|
|
|
if err := iter.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Delete expired keys in batch
|
|
if len(expiredKeys) > 0 {
|
|
batch := c.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
|
|
for _, expiredKey := range expiredKeys {
|
|
batch.Query(`DELETE FROM cache WHERE key = ?`, expiredKey)
|
|
}
|
|
return c.session.ExecuteBatch(batch)
|
|
}
|
|
|
|
return nil
|
|
}
|