199 lines
6.2 KiB
Go
199 lines
6.2 KiB
Go
package database
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/gocql/gocql"
|
|
"github.com/golang-migrate/migrate/v4"
|
|
_ "github.com/golang-migrate/migrate/v4/database/cassandra"
|
|
_ "github.com/golang-migrate/migrate/v4/source/file"
|
|
"go.uber.org/zap"
|
|
|
|
"codeberg.org/mapleopentech/monorepo/cloud/maplepress-backend/config"
|
|
)
|
|
|
|
// silentGocqlLogger filters out noisy "invalid peer" warnings from gocql
|
|
type silentGocqlLogger struct{}
|
|
|
|
func (l *silentGocqlLogger) Print(v ...interface{}) {
|
|
// Silently discard all gocql logs including "invalid peer" warnings
|
|
}
|
|
|
|
func (l *silentGocqlLogger) Printf(format string, v ...interface{}) {
|
|
// Silently discard all gocql logs including "invalid peer" warnings
|
|
}
|
|
|
|
func (l *silentGocqlLogger) Println(v ...interface{}) {
|
|
// Silently discard all gocql logs including "invalid peer" warnings
|
|
}
|
|
|
|
// Migrator handles database schema migrations
|
|
// This encapsulates all migration logic and makes it testable
|
|
type Migrator struct {
|
|
config *config.Config
|
|
logger *zap.Logger
|
|
}
|
|
|
|
// NewMigrator creates a new migration manager
|
|
func NewMigrator(cfg *config.Config, logger *zap.Logger) *Migrator {
|
|
if logger == nil {
|
|
// Create a no-op logger if none provided (for backward compatibility)
|
|
logger = zap.NewNop()
|
|
}
|
|
return &Migrator{
|
|
config: cfg,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// Up runs all pending migrations with dirty state recovery
|
|
func (m *Migrator) Up() error {
|
|
// Ensure keyspace exists before running migrations
|
|
m.logger.Debug("Ensuring keyspace exists...")
|
|
if err := m.ensureKeyspaceExists(); err != nil {
|
|
return fmt.Errorf("failed to ensure keyspace exists: %w", err)
|
|
}
|
|
|
|
m.logger.Debug("Creating migrator...")
|
|
migrateInstance, err := m.createMigrate()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create migrator: %w", err)
|
|
}
|
|
defer migrateInstance.Close()
|
|
|
|
m.logger.Debug("Checking migration version...")
|
|
version, dirty, err := migrateInstance.Version()
|
|
if err != nil && err != migrate.ErrNilVersion {
|
|
return fmt.Errorf("failed to get migration version: %w", err)
|
|
}
|
|
|
|
if dirty {
|
|
m.logger.Warn("Database is in dirty state, attempting to force clean state",
|
|
zap.Uint("version", uint(version)))
|
|
if err := migrateInstance.Force(int(version)); err != nil {
|
|
return fmt.Errorf("failed to force clean migration state: %w", err)
|
|
}
|
|
}
|
|
|
|
// Run migrations
|
|
if err := migrateInstance.Up(); err != nil && err != migrate.ErrNoChange {
|
|
return fmt.Errorf("failed to run migrations: %w", err)
|
|
}
|
|
|
|
// Get final version
|
|
finalVersion, _, err := migrateInstance.Version()
|
|
if err != nil && err != migrate.ErrNilVersion {
|
|
m.logger.Warn("Could not get final migration version", zap.Error(err))
|
|
} else if err != migrate.ErrNilVersion {
|
|
m.logger.Debug("Database migrations completed successfully",
|
|
zap.Uint("version", uint(finalVersion)))
|
|
} else {
|
|
m.logger.Debug("Database migrations completed successfully (no migrations applied)")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Down rolls back the last migration
|
|
// Useful for development and rollback scenarios
|
|
func (m *Migrator) Down() error {
|
|
migrateInstance, err := m.createMigrate()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create migrator: %w", err)
|
|
}
|
|
defer migrateInstance.Close()
|
|
|
|
if err := migrateInstance.Steps(-1); err != nil {
|
|
return fmt.Errorf("failed to rollback migration: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Version returns the current migration version
|
|
func (m *Migrator) Version() (uint, bool, error) {
|
|
migrateInstance, err := m.createMigrate()
|
|
if err != nil {
|
|
return 0, false, fmt.Errorf("failed to create migrator: %w", err)
|
|
}
|
|
defer migrateInstance.Close()
|
|
|
|
return migrateInstance.Version()
|
|
}
|
|
|
|
// ForceVersion forces the migration version (useful for fixing dirty states)
|
|
func (m *Migrator) ForceVersion(version int) error {
|
|
migrateInstance, err := m.createMigrate()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create migrator: %w", err)
|
|
}
|
|
defer migrateInstance.Close()
|
|
|
|
if err := migrateInstance.Force(version); err != nil {
|
|
return fmt.Errorf("failed to force version %d: %w", version, err)
|
|
}
|
|
|
|
m.logger.Info("Successfully forced migration version", zap.Int("version", version))
|
|
return nil
|
|
}
|
|
|
|
// createMigrate creates a migrate instance with proper configuration
|
|
func (m *Migrator) createMigrate() (*migrate.Migrate, error) {
|
|
// Set global gocql logger to suppress "invalid peer" warnings
|
|
// This affects the internal gocql connections used by golang-migrate
|
|
gocql.Logger = &silentGocqlLogger{}
|
|
|
|
// Build Cassandra connection string
|
|
// Format: cassandra://host:port/keyspace?consistency=level
|
|
databaseURL := fmt.Sprintf("cassandra://%s/%s?consistency=%s",
|
|
m.config.Database.Hosts[0], // Use first host for migrations
|
|
m.config.Database.Keyspace,
|
|
m.config.Database.Consistency,
|
|
)
|
|
|
|
// Create migrate instance
|
|
migrateInstance, err := migrate.New(m.config.Database.MigrationsPath, databaseURL)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize migrate: %w", err)
|
|
}
|
|
|
|
return migrateInstance, nil
|
|
}
|
|
|
|
// ensureKeyspaceExists creates the keyspace if it doesn't exist
|
|
// This must be done before running migrations since golang-migrate requires the keyspace to exist
|
|
func (m *Migrator) ensureKeyspaceExists() error {
|
|
// Create cluster configuration without keyspace
|
|
cluster := gocql.NewCluster(m.config.Database.Hosts...)
|
|
cluster.Port = 9042
|
|
cluster.Consistency = gocql.Quorum
|
|
cluster.ProtoVersion = 4
|
|
|
|
// Suppress noisy "invalid peer" warnings from gocql
|
|
// Use a minimal logger that discards these harmless Docker networking warnings
|
|
cluster.Logger = &silentGocqlLogger{}
|
|
|
|
// Create session to system keyspace
|
|
session, err := cluster.CreateSession()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to connect to Cassandra: %w", err)
|
|
}
|
|
defer session.Close()
|
|
|
|
// Create keyspace if it doesn't exist
|
|
replicationFactor := m.config.Database.Replication
|
|
createKeyspaceQuery := fmt.Sprintf(`
|
|
CREATE KEYSPACE IF NOT EXISTS %s
|
|
WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': %d}
|
|
AND durable_writes = true
|
|
`, m.config.Database.Keyspace, replicationFactor)
|
|
|
|
m.logger.Debug("Creating keyspace if it doesn't exist",
|
|
zap.String("keyspace", m.config.Database.Keyspace))
|
|
if err := session.Query(createKeyspaceQuery).Exec(); err != nil {
|
|
return fmt.Errorf("failed to create keyspace: %w", err)
|
|
}
|
|
|
|
m.logger.Debug("Keyspace is ready", zap.String("keyspace", m.config.Database.Keyspace))
|
|
return nil
|
|
}
|