// File Path: monorepo/cloud/maplepress-backend/pkg/storage/database/cassandra/cassandra.go package database import ( "fmt" "strings" "time" "codeberg.org/mapleopentech/monorepo/cloud/maplepress-backend/config" "github.com/gocql/gocql" "go.uber.org/zap" ) // gocqlLogger wraps zap logger to filter out noisy gocql warnings type gocqlLogger struct { logger *zap.Logger } // Print implements gocql's Logger interface func (l *gocqlLogger) Print(v ...interface{}) { msg := fmt.Sprint(v...) // Filter out noisy "invalid peer" warnings from Cassandra gossip // These are harmless and occur due to Docker networking if strings.Contains(msg, "Found invalid peer") { return } // Log other messages at debug level l.logger.Debug(msg) } // Printf implements gocql's Logger interface func (l *gocqlLogger) Printf(format string, v ...interface{}) { msg := fmt.Sprintf(format, v...) // Filter out noisy "invalid peer" warnings from Cassandra gossip if strings.Contains(msg, "Found invalid peer") { return } // Log other messages at debug level l.logger.Debug(msg) } // Println implements gocql's Logger interface func (l *gocqlLogger) Println(v ...interface{}) { msg := fmt.Sprintln(v...) // Filter out noisy "invalid peer" warnings from Cassandra gossip if strings.Contains(msg, "Found invalid peer") { return } // Log other messages at debug level l.logger.Debug(msg) } // ProvideCassandraSession creates a new Cassandra session func ProvideCassandraSession(cfg *config.Config, logger *zap.Logger) (*gocql.Session, error) { logger.Info("⏳ Connecting to Cassandra...", zap.Strings("hosts", cfg.Database.Hosts), zap.String("keyspace", cfg.Database.Keyspace)) // Create cluster configuration cluster := gocql.NewCluster(cfg.Database.Hosts...) cluster.Keyspace = cfg.Database.Keyspace cluster.Consistency = parseConsistency(cfg.Database.Consistency) cluster.ProtoVersion = 4 cluster.ConnectTimeout = 10 * time.Second cluster.Timeout = 10 * time.Second cluster.NumConns = 2 // Set custom logger to filter out noisy warnings cluster.Logger = &gocqlLogger{logger: logger.Named("gocql")} // Retry policy cluster.RetryPolicy = &gocql.ExponentialBackoffRetryPolicy{ NumRetries: 3, Min: 1 * time.Second, Max: 10 * time.Second, } // Create session session, err := cluster.CreateSession() if err != nil { return nil, fmt.Errorf("failed to connect to Cassandra: %w", err) } logger.Info("✓ Cassandra connected", zap.String("consistency", cfg.Database.Consistency), zap.Int("connections", cluster.NumConns)) return session, nil } // parseConsistency converts string consistency level to gocql.Consistency func parseConsistency(consistency string) gocql.Consistency { switch consistency { case "ANY": return gocql.Any case "ONE": return gocql.One case "TWO": return gocql.Two case "THREE": return gocql.Three case "QUORUM": return gocql.Quorum case "ALL": return gocql.All case "LOCAL_QUORUM": return gocql.LocalQuorum case "EACH_QUORUM": return gocql.EachQuorum case "LOCAL_ONE": return gocql.LocalOne default: return gocql.Quorum // Default to QUORUM } }