// Package cache provides a Cassandra-backed key/value cache whose entries
// carry an application-level expiry timestamp.
package cache

import (
	"context"
	"errors"
	"time"

	"github.com/gocql/gocql"
	"go.uber.org/zap"
)

const (
	// cassandraDefaultExpiry is the lifetime Set gives to an entry.
	cassandraDefaultExpiry = 24 * time.Hour

	// cassandraPurgeBatchSize caps how many DELETE statements PurgeExpired
	// places in a single batch, so that a large backlog of expired keys
	// cannot produce one oversized batch.
	cassandraPurgeBatchSize = 100

	cassandraSelectQuery        = `SELECT value, expires_at FROM cache WHERE key = ?`
	cassandraInsertQuery        = `INSERT INTO cache (key, expires_at, value) VALUES (?, ?, ?)`
	cassandraDeleteQuery        = `DELETE FROM cache WHERE key = ?`
	cassandraSelectExpiredQuery = `SELECT key FROM cache WHERE expires_at < ? ALLOW FILTERING`
)

// CassandraCacher defines the interface for Cassandra cache operations.
type CassandraCacher interface {
	// Shutdown releases the cache layer. The underlying session is left open.
	Shutdown(ctx context.Context)
	// Get returns the value stored under key, or (nil, nil) when the key is
	// absent or its entry has expired.
	Get(ctx context.Context, key string) ([]byte, error)
	// Set stores val under key with the default expiry of 24 hours.
	Set(ctx context.Context, key string, val []byte) error
	// SetWithExpiry stores val under key, expiring after the given duration.
	SetWithExpiry(ctx context.Context, key string, val []byte, expiry time.Duration) error
	// Delete removes the entry stored under key.
	Delete(ctx context.Context, key string) error
	// PurgeExpired removes every entry whose expiry timestamp is in the past.
	PurgeExpired(ctx context.Context) error
}

// cassandraCache is the gocql-backed implementation of CassandraCacher.
type cassandraCache struct {
	session *gocql.Session
	logger  *zap.Logger
}

// Compile-time check that cassandraCache satisfies CassandraCacher.
var _ CassandraCacher = (*cassandraCache)(nil)

// NewCassandraCache creates a new Cassandra cache instance that issues its
// queries on session and logs under the "cassandra-cache" name.
//
// A nil logger is replaced with a no-op logger instead of panicking.
func NewCassandraCache(session *gocql.Session, logger *zap.Logger) CassandraCacher {
	if logger == nil {
		logger = zap.NewNop()
	}
	logger = logger.Named("cassandra-cache")
	logger.Info("✓ Cassandra cache layer initialized")

	return &cassandraCache{
		session: session,
		logger:  logger,
	}
}

// Shutdown logs that the cache is shutting down. It deliberately does not
// close the session, which is managed by the database layer.
func (c *cassandraCache) Shutdown(ctx context.Context) {
	c.logger.Info("shutting down Cassandra cache")
}

// Get looks up key and returns its value.
//
// A missing key is not an error: Get returns (nil, nil). An entry whose
// expires_at is in the past is treated the same way, and Get additionally
// makes a best-effort attempt to delete it. Any other query failure is
// returned unchanged.
func (c *cassandraCache) Get(ctx context.Context, key string) ([]byte, error) {
	var (
		value     []byte
		expiresAt time.Time
	)

	err := c.session.Query(cassandraSelectQuery, key).
		WithContext(ctx).
		Consistency(gocql.LocalQuorum).
		Scan(&value, &expiresAt)
	if errors.Is(err, gocql.ErrNotFound) {
		// Key doesn't exist - this is a cache miss, not an error.
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	// Expiry is enforced in application code rather than by the database.
	if time.Now().After(expiresAt) {
		// Cleanup is best-effort: the caller gets a miss either way, so a
		// failed delete is logged rather than returned.
		// NOTE(review): this delete is unconditional, so it could presumably
		// remove a value written concurrently after the read above - confirm
		// whether that matters for callers.
		if delErr := c.Delete(ctx, key); delErr != nil {
			c.logger.Warn("failed to delete expired cache entry",
				zap.String("key", key),
				zap.Error(delErr),
			)
		}
		return nil, nil
	}

	return value, nil
}

// Set stores val under key with the default 24 hour expiry.
func (c *cassandraCache) Set(ctx context.Context, key string, val []byte) error {
	return c.SetWithExpiry(ctx, key, val, cassandraDefaultExpiry)
}

// SetWithExpiry stores val under key with expires_at set to now plus expiry.
//
// A zero or negative expiry writes an entry whose expires_at is not in the
// future, so a later Get reports it as a miss.
func (c *cassandraCache) SetWithExpiry(ctx context.Context, key string, val []byte, expiry time.Duration) error {
	expiresAt := time.Now().Add(expiry)

	return c.session.Query(cassandraInsertQuery, key, expiresAt, val).
		WithContext(ctx).
		Consistency(gocql.LocalQuorum).
		Exec()
}

// Delete removes the entry stored under key.
func (c *cassandraCache) Delete(ctx context.Context, key string) error {
	return c.session.Query(cassandraDeleteQuery, key).
		WithContext(ctx).
		Consistency(gocql.LocalQuorum).
		Exec()
}

// PurgeExpired deletes every entry whose expires_at is earlier than now.
//
// It first collects all expired keys, then deletes them in logged batches of
// at most cassandraPurgeBatchSize statements. It returns the first error from
// the scan or from a batch; batches executed before a failure stay applied.
func (c *cassandraCache) PurgeExpired(ctx context.Context) error {
	// The statement needs ALLOW FILTERING, so it is not a plain indexed lookup.
	// NOTE(review): presumably this scans the table - confirm the schema and
	// the cost before running it frequently on a large table.
	iter := c.session.Query(cassandraSelectExpiredQuery, time.Now()).
		WithContext(ctx).
		Iter()

	var (
		expiredKeys []string
		key         string
	)
	for iter.Scan(&key) {
		expiredKeys = append(expiredKeys, key)
	}
	if err := iter.Close(); err != nil {
		return err
	}

	// Delete in bounded chunks instead of one batch holding every key.
	for start := 0; start < len(expiredKeys); start += cassandraPurgeBatchSize {
		end := start + cassandraPurgeBatchSize
		if end > len(expiredKeys) {
			end = len(expiredKeys)
		}

		batch := c.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
		for _, expiredKey := range expiredKeys[start:end] {
			batch.Query(cassandraDeleteQuery, expiredKey)
		}
		if err := c.session.ExecuteBatch(batch); err != nil {
			return err
		}
	}

	return nil
}