monorepo/cloud/maplepress-backend/pkg/transaction/saga.go

516 lines
18 KiB
Go

package transaction
import (
"context"
"go.uber.org/zap"
)
// Package transaction provides a SAGA pattern implementation for managing distributed transactions.
//
// # What is SAGA Pattern?
//
// SAGA is a pattern for managing distributed transactions through a sequence of local transactions,
// each with a corresponding compensating transaction that undoes its effects if a later step fails.
//
// # When to Use SAGA
//
// Use SAGA when you have multiple database operations that need to succeed or fail together,
// but you can't use traditional ACID transactions (e.g., with Cassandra, distributed services,
// or operations across multiple bounded contexts).
//
// # Key Concepts
//
// - Forward Transaction: A database write operation (e.g., CreateTenant)
// - Compensating Transaction: An undo operation (e.g., DeleteTenant)
// - LIFO Execution: Compensations execute in reverse order (Last In, First Out)
//
// # Example Usage: User Registration Flow
//
// Problem: When registering a user, we create a tenant, then create a user.
// If user creation fails, the tenant becomes orphaned in the database.
//
// Solution: Use SAGA to automatically delete the tenant if user creation fails.
//
//	func (s *RegisterService) Register(ctx context.Context, input *RegisterInput) (*RegisterResponse, error) {
//		// Step 1: Create SAGA instance
//		saga := transaction.NewSaga("user-registration", s.logger)
//
//		// Step 2: Validate input (no DB writes, no compensation needed)
//		if err := s.validateInputUC.Execute(input); err != nil {
//			return nil, err
//		}
//
//		// Step 3: Create tenant (FIRST DB WRITE - register compensation)
//		tenantOutput, err := s.createTenantUC.Execute(ctx, input)
//		if err != nil {
//			return nil, err // No rollback needed - tenant creation failed
//		}
//
//		// Register compensation: if anything fails later, delete this tenant
//		saga.AddCompensation(func(ctx context.Context) error {
//			s.logger.Warn("compensating: deleting tenant",
//				zap.String("tenant_id", tenantOutput.ID))
//			return s.deleteTenantUC.Execute(ctx, tenantOutput.ID)
//		})
//
//		// Step 4: Create user (SECOND DB WRITE)
//		userOutput, err := s.createUserUC.Execute(ctx, tenantOutput.ID, input)
//		if err != nil {
//			s.logger.Error("user creation failed - rolling back tenant",
//				zap.Error(err))
//
//			// Execute SAGA rollback - this will delete the tenant
//			saga.Rollback(ctx)
//
//			return nil, err
//		}
//
//		// Success! Both tenant and user created, no rollback needed
//		return &RegisterResponse{
//			TenantID: tenantOutput.ID,
//			UserID:   userOutput.ID,
//		}, nil
//	}
//
// # Example Usage: Multi-Step Saga
//
// For operations with many steps, register multiple compensations:
//
//	func (uc *ComplexOperationUseCase) Execute(ctx context.Context) error {
//		saga := transaction.NewSaga("complex-operation", uc.logger)
//
//		// Step 1: Create resource A
//		resourceA, err := uc.createResourceA(ctx)
//		if err != nil {
//			return err
//		}
//		saga.AddCompensation(func(ctx context.Context) error {
//			return uc.deleteResourceA(ctx, resourceA.ID)
//		})
//
//		// Step 2: Create resource B
//		resourceB, err := uc.createResourceB(ctx)
//		if err != nil {
//			saga.Rollback(ctx) // Deletes A
//			return err
//		}
//		saga.AddCompensation(func(ctx context.Context) error {
//			return uc.deleteResourceB(ctx, resourceB.ID)
//		})
//
//		// Step 3: Create resource C
//		resourceC, err := uc.createResourceC(ctx)
//		if err != nil {
//			saga.Rollback(ctx) // Deletes B, then A (LIFO order)
//			return err
//		}
//		saga.AddCompensation(func(ctx context.Context) error {
//			return uc.deleteResourceC(ctx, resourceC.ID)
//		})
//
//		// All steps succeeded - no rollback needed
//		return nil
//	}
//
// # Important Notes for Junior Developers
//
// 1. LIFO Order: Compensations execute in REVERSE order of registration
// If you create: Tenant → User → Email
// Rollback deletes: Email → User → Tenant
//
// 2. Idempotency: Compensating operations should be idempotent (safe to call multiple times)
// Your DeleteTenant should not error if tenant is already deleted
//
// 3. Failures Continue: If one compensation fails, others still execute
// This ensures maximum cleanup even if some operations fail
//
// 4. Logging: All operations are logged with emoji icons (🔴 for errors, 🟡 for warnings)
// Monitor logs for "saga rollback had failures" - indicates manual intervention needed
//
// 5. When NOT to Use SAGA:
// - Single database operation (no need for compensation)
// - Read-only operations (no state changes to rollback)
// - Operations where compensation isn't possible (e.g., sending an email can't be unsent)
//
// 6. Testing: Always test your rollback scenarios!
// Mock the second operation to fail and verify the first is rolled back
//
// # Common Pitfalls to Avoid
//
// - DON'T register compensations before the operation succeeds
// - DON'T forget to call saga.Rollback(ctx) when an operation fails
// - DON'T assume compensations will always succeed (they might fail too)
// - DON'T use SAGA for operations that can use database transactions
// - DO make your compensating operations idempotent
// - DO log all compensation failures for investigation
//
// # See Also
//
// For real-world examples, see:
// - internal/service/gateway/register.go (user registration with SAGA)
// - internal/usecase/tenant/delete.go (compensating transaction example)
// - internal/usecase/user/delete.go (compensating transaction example)
// Compensator is the signature of an "undo" function registered with a Saga.
//
// Each compensator reverses one forward operation that has already succeeded.
// For example:
//   - Forward operation: CreateTenant
//   - Compensator: DeleteTenant
//
// A compensator:
//   - Receives the context that the caller passed to Saga.Rollback
//     (use it for cancellation/timeouts).
//   - Returns a non-nil error when the compensation fails.
//   - Should be idempotent (safe to call multiple times).
//   - Should clean up exactly the resources created by its forward operation.
//
// Example:
//
//	// Forward operation: create the tenant.
//	tenantID := "tenant-123"
//	err := tenantRepo.Create(ctx, tenant)
//
//	// Compensator: delete that tenant again.
//	compensator := func(ctx context.Context) error {
//		return tenantRepo.Delete(ctx, tenantID)
//	}
//
//	saga.AddCompensation(compensator)
type Compensator func(ctx context.Context) error
// Saga coordinates a multi-step workflow by remembering how to undo each step.
//
// Every step that performs a database write registers a Compensator after it
// succeeds. When a later step fails, Rollback runs the registered compensators
// in reverse order (LIFO) to undo the earlier changes.
//
// # How it Works
//
//  1. Create a Saga instance with NewSaga.
//  2. Execute your operations in sequence.
//  3. After each successful write, call AddCompensation with the undo operation.
//  4. If any operation fails, call Rollback to undo all previous changes.
//  5. If all operations succeed, do nothing; the compensators are never called.
//
// # Thread Safety
//
// Saga is NOT thread-safe: its methods read and append to the compensators
// slice without any locking. Do not share a single Saga across goroutines;
// each workflow execution should create its own instance.
//
// # Construction
//
// Create a Saga with NewSaga. The methods call the logger field without a nil
// check, so a zero-value Saga (whose logger is nil) should not be used.
//
// # Fields
//
//   - name: Human-readable workflow name (e.g., "user-registration")
//   - compensators: Stack of undo functions, executed in LIFO order
//   - logger: Structured logger for tracking saga execution and failures
type Saga struct {
	name         string        // Workflow name; NewSaga also attaches it to the logger as "saga_name"
	compensators []Compensator // Undo functions in registration order; Rollback walks them last-to-first
	logger       *zap.Logger   // Logger used for every saga event (registration, rollback, failures)
}
// NewSaga creates a Saga for a single workflow execution.
//
// The name should be a descriptive identifier for the workflow (for example
// "user-registration", "order-processing" or "account-setup"). The returned
// Saga logs through a child of logger that is named "saga" and carries the
// name as the "saga_name" field, so every entry can be traced to its workflow.
//
// # Parameters
//
//   - name: descriptive name of the workflow (used for logging).
//   - logger: base zap logger. A nil logger is replaced with zap.NewNop(), so
//     the Saga still works but emits no log output.
//
// # Returns
//
// A new Saga with no compensations registered.
//
// # Example
//
//	func (uc *RegisterUseCase) Execute(ctx context.Context, input *Input) error {
//		// Create a new saga for this registration workflow.
//		saga := transaction.NewSaga("user-registration", uc.logger)
//
//		// ... use saga for your operations ...
//	}
//
// # Important
//
// Each workflow execution should create its own Saga instance. Do NOT reuse a
// Saga across multiple workflow executions: nothing in this file removes a
// compensation once it has been registered.
func NewSaga(name string, logger *zap.Logger) *Saga {
	// Named/With on a nil *zap.Logger would dereference nil and panic before
	// the saga is even built. Fall back to a no-op logger instead, so callers
	// without a logger (tests, tools) still get a usable Saga.
	if logger == nil {
		logger = zap.NewNop()
	}

	return &Saga{
		name:         name,
		compensators: make([]Compensator, 0),
		logger:       logger.Named("saga").With(zap.String("saga_name", name)),
	}
}
// AddCompensation registers a compensating transaction for rollback.
//
// Call this method IMMEDIATELY AFTER a successful database write operation
// to register the corresponding undo operation.
//
// A nil compensate is ignored (and logged at warn level): calling a nil
// function during Rollback would panic in the middle of cleanup.
//
// # Execution Order: LIFO (Last In, First Out)
//
// Compensations are executed in REVERSE order of registration during rollback.
// This ensures proper cleanup order:
//   - If you create: Tenant → User → Subscription
//   - Rollback deletes: Subscription → User → Tenant
//
// # Parameters
//
//   - compensate: a function that undoes the operation (e.g., DeleteTenant).
//
// # When to Call
//
//	// ✅ CORRECT: Register compensation AFTER the operation succeeds
//	tenantOutput, err := uc.createTenantUC.Execute(ctx, input)
//	if err != nil {
//		return nil, err // Operation failed - no compensation needed
//	}
//	// Operation succeeded - NOW register the undo operation
//	saga.AddCompensation(func(ctx context.Context) error {
//		return uc.deleteTenantUC.Execute(ctx, tenantOutput.ID)
//	})
//
//	// ❌ WRONG: Don't register compensation BEFORE the operation
//	saga.AddCompensation(func(ctx context.Context) error {
//		return uc.deleteTenantUC.Execute(ctx, tenantOutput.ID)
//	})
//	tenantOutput, err := uc.createTenantUC.Execute(ctx, input) // Might fail!
//
// # Example: Basic Usage
//
//	// Step 1: Create tenant
//	tenant, err := uc.createTenantUC.Execute(ctx, input)
//	if err != nil {
//		return nil, err
//	}
//
//	// Step 2: Register compensation for tenant
//	saga.AddCompensation(func(ctx context.Context) error {
//		uc.logger.Warn("rolling back: deleting tenant",
//			zap.String("tenant_id", tenant.ID))
//		return uc.deleteTenantUC.Execute(ctx, tenant.ID)
//	})
//
// # Example: Capturing Variables in Closures
//
// A compensator runs later, during Rollback, and sees whatever value its
// captured variables hold at that moment. Capture variables that are never
// reassigned after the closure is created.
//
//	// ✅ CORRECT: itemID is declared inside the loop body, so every
//	// iteration (and therefore every closure) has its own variable.
//	for _, item := range items {
//		created, err := uc.createItem(ctx, item)
//		if err != nil {
//			saga.Rollback(ctx)
//			return err
//		}
//
//		itemID := created.ID
//		saga.AddCompensation(func(ctx context.Context) error {
//			return uc.deleteItem(ctx, itemID)
//		})
//	}
//
//	// ❌ WRONG: lastID is declared once outside the loop and reassigned on
//	// every iteration, so at rollback time ALL compensators see the ID of
//	// the final item and the earlier items are never deleted.
//	var lastID string
//	for _, item := range items {
//		created, err := uc.createItem(ctx, item)
//		if err != nil {
//			saga.Rollback(ctx)
//			return err
//		}
//
//		lastID = created.ID
//		saga.AddCompensation(func(ctx context.Context) error {
//			return uc.deleteItem(ctx, lastID)
//		})
//	}
//
// # Tips for Writing Good Compensators
//
//  1. Make them idempotent (safe to call multiple times).
//  2. Log what you're compensating for easier debugging.
//  3. Capture all necessary IDs before the closure.
//  4. Handle "not found" errors gracefully (resource may already be deleted).
//  5. Return an error if compensation truly fails (it is logged but doesn't
//     stop other compensations).
func (s *Saga) AddCompensation(compensate Compensator) {
	// Reject nil here, where the mistake is made, rather than letting it
	// surface as a nil-function panic while Rollback is undoing other steps.
	if compensate == nil {
		s.logger.Warn("ignoring nil compensation",
			zap.Int("total_compensations", len(s.compensators)))
		return
	}

	s.compensators = append(s.compensators, compensate)

	s.logger.Debug("compensation registered",
		zap.Int("total_compensations", len(s.compensators)))
}
// Rollback executes all registered compensating transactions in reverse order (LIFO).
//
// Call this method when any operation in your workflow fails AFTER you've started
// registering compensations. It undoes the previously successful operations by
// running their compensators, most recently registered first.
//
// # When to Call
//
//	tenant, err := uc.createTenantUC.Execute(ctx, input)
//	if err != nil {
//		return nil, err // No compensations registered yet - no rollback needed
//	}
//	saga.AddCompensation(func(ctx context.Context) error {
//		return uc.deleteTenantUC.Execute(ctx, tenant.ID)
//	})
//
//	user, err := uc.createUserUC.Execute(ctx, tenant.ID, input)
//	if err != nil {
//		// Compensations ARE registered - MUST call rollback!
//		saga.Rollback(ctx)
//		return nil, err
//	}
//
// # Execution Behavior
//
//  1. LIFO Order: compensations execute in REVERSE order of registration.
//     If you registered [DeleteTenant, DeleteUser, DeleteSubscription],
//     Rollback executes DeleteSubscription → DeleteUser → DeleteTenant.
//
//  2. Best Effort: a compensation that returns an error, panics, or is nil is
//     logged at error level and counted as a failure; the remaining
//     compensations still execute. This maximizes cleanup even if some
//     operations fail.
//
//  3. No Panic, No Error: panics raised by a compensator are recovered, and
//     Rollback has no return value. Compensation failures are reported only
//     through the log.
//
// # Example: Basic Rollback
//
//	func (uc *RegisterUseCase) Execute(ctx context.Context, input *Input) error {
//		saga := transaction.NewSaga("user-registration", uc.logger)
//
//		// Step 1: Create tenant
//		tenant, err := uc.createTenantUC.Execute(ctx, input)
//		if err != nil {
//			return err // No rollback needed
//		}
//		saga.AddCompensation(func(ctx context.Context) error {
//			return uc.deleteTenantUC.Execute(ctx, tenant.ID)
//		})
//
//		// Step 2: Create user
//		user, err := uc.createUserUC.Execute(ctx, tenant.ID, input)
//		if err != nil {
//			uc.logger.Error("user creation failed", zap.Error(err))
//			saga.Rollback(ctx) // ← Deletes tenant
//			return err
//		}
//
//		// Both operations succeeded - no rollback needed
//		return nil
//	}
//
// # Log Output Example
//
// The entries below are the ones emitted by this method. Any emoji markers
// (🟡 for warnings, 🔴 for errors) are not added here; they presumably come
// from the project's logger configuration.
//
// Successful rollback:
//
//	WARN  executing saga rollback   {"saga_name": "user-registration", "compensation_count": 1}
//	INFO  executing compensation    {"step": 1, "index": 0}
//	INFO  compensation succeeded    {"step": 1, "index": 0}
//	WARN  saga rollback completed   {"total_compensations": 1, "successes": 1, "failures": 0}
//
// Failed compensation:
//
//	WARN  executing saga rollback   {"compensation_count": 1}
//	INFO  executing compensation    {"step": 1, "index": 0}
//	ERROR compensation failed       {"step": 1, "index": 0, "error": "..."}
//	WARN  saga rollback completed   {"total_compensations": 1, "successes": 0, "failures": 1}
//	ERROR saga rollback had failures - manual intervention may be required {"failed_compensations": 1}
//
// # Important Notes
//
//  1. Always call Rollback if you've registered ANY compensations and a later
//     step fails.
//  2. Calling Rollback with nothing registered is harmless: it logs
//     "no compensations to execute" and returns.
//  3. Registered compensations are not cleared, so calling Rollback again
//     re-runs every one of them. That is only safe when all compensators are
//     idempotent.
//  4. Monitor logs for "saga rollback had failures" - it indicates manual
//     cleanup is needed.
//  5. ctx is handed to every compensator unchanged. If it is already cancelled
//     or past its deadline (often the very reason the forward step failed),
//     the compensators receive that cancelled context and will likely fail.
//     Pass a context that is still live when the cleanup must go through.
//
// # Parameters
//
//   - ctx: context for cancellation/timeout (passed to each compensating function).
//
// # What Gets Logged
//
//   - Start of rollback (warn level)
//   - Each compensation execution attempt (info level)
//   - Success (info level) or failure/panic (error level) of each compensation
//   - Summary of rollback results (warn level)
//   - Alert if any compensations failed (error level)
func (s *Saga) Rollback(ctx context.Context) {
	total := len(s.compensators)
	if total == 0 {
		s.logger.Info("no compensations to execute")
		return
	}

	s.logger.Warn("executing saga rollback",
		zap.Int("compensation_count", total))

	successCount := 0
	failureCount := 0

	// Walk the stack from the most recently registered compensator down to
	// the first one (LIFO - Last In, First Out).
	for i := total - 1; i >= 0; i-- {
		// step is the 1-based position in the rollback sequence; i is the
		// position in registration order.
		step := total - i

		s.logger.Info("executing compensation",
			zap.Int("step", step),
			zap.Int("index", i))

		// A failure never stops the loop: the remaining compensations still run.
		if s.runCompensation(ctx, s.compensators[i], step, i) {
			successCount++
		} else {
			failureCount++
		}
	}

	s.logger.Warn("saga rollback completed",
		zap.Int("total_compensations", total),
		zap.Int("successes", successCount),
		zap.Int("failures", failureCount))

	// Any failure means some forward operation was not undone. Emit a separate
	// error-level entry so the operations team can alert on it.
	if failureCount > 0 {
		s.logger.Error("saga rollback had failures - manual intervention may be required",
			zap.Int("failed_compensations", failureCount))
	}
}

// runCompensation runs one compensator, logs its outcome, and reports whether
// it succeeded. A nil compensator, a returned error, and a panic all count as
// failure, so a single broken step cannot abort the rest of the rollback.
func (s *Saga) runCompensation(ctx context.Context, compensate Compensator, step, index int) (ok bool) {
	// recover only works directly inside a deferred function, and the named
	// result lets that function turn a panic into a reported failure.
	defer func() {
		if r := recover(); r != nil {
			ok = false
			s.logger.Error("compensation panicked",
				zap.Int("step", step),
				zap.Int("index", index),
				zap.Any("panic", r),
				zap.Stack("stack"))
		}
	}()

	if compensate == nil {
		s.logger.Error("compensation failed: compensator is nil",
			zap.Int("step", step),
			zap.Int("index", index))
		return false
	}

	if err := compensate(ctx); err != nil {
		s.logger.Error("compensation failed",
			zap.Int("step", step),
			zap.Int("index", index),
			zap.Error(err))
		return false
	}

	s.logger.Info("compensation succeeded",
		zap.Int("step", step),
		zap.Int("index", index))
	return true
}
// MustRollback runs the saga rollback. It is a thin alias for Rollback: the
// body only delegates to it, so the two methods behave identically.
//
// # When to Use
//
// Use this method when you want the call site to spell out that the rollback
// is critical and must be executed. There is no functional difference from
// calling Rollback directly.
//
// # Example
//
//	user, err := uc.createUserUC.Execute(ctx, tenant.ID, input)
//	if err != nil {
//		// Make it explicit that rollback is critical
//		saga.MustRollback(ctx)
//		return nil, err
//	}
//
// # Note for Junior Developers
//
// In Go, a "Must" prefix usually means "panics on failure" (as in
// regexp.MustCompile). This method is an exception: it does NOT panic when
// compensations fail. Failures are logged for manual intervention and the
// method returns normally.
//
// Here "Must" means that YOU must call it once compensations are registered,
// not that the rollback itself must succeed.
//
// If you need to react to compensation failures in code (for example by
// panicking), this API does not expose them; you would have to track the
// outcome inside your compensators or add custom logic.
func (s *Saga) MustRollback(ctx context.Context) {
	// Delegate unchanged; Rollback does all the work and all the logging.
	s.Rollback(ctx)
}