// codeberg.org/mapleopentech/monorepo/cloud/maplefile-backend/pkg/observability/health.go
|
|
package observability
|
|
|
|
import (
	"context"
	"encoding/json"
	"net"
	"net/http"
	"sync"
	"time"

	"github.com/gocql/gocql"
	"go.uber.org/zap"

	"codeberg.org/mapleopentech/monorepo/cloud/maplefile-backend/pkg/storage/cache/twotiercache"
	"codeberg.org/mapleopentech/monorepo/cloud/maplefile-backend/pkg/storage/object/s3"
)
|
|
|
|
// HealthStatus is the coarse verdict reported for one component, and for the
// service as a whole once the per-component verdicts are aggregated.
type HealthStatus string

// The HealthStatus values produced by the checks in this package. Being plain
// strings, they appear verbatim in serialized responses.
const (
	// HealthStatusHealthy: the component passed its probe.
	HealthStatusHealthy HealthStatus = "healthy"
	// HealthStatusUnhealthy: the component failed its probe.
	HealthStatusUnhealthy HealthStatus = "unhealthy"
	// HealthStatusDegraded: the component answered, but not entirely correctly.
	HealthStatusDegraded HealthStatus = "degraded"
)
|
|
|
|
// HealthCheckResult represents the result of a health check
|
|
type HealthCheckResult struct {
|
|
Status HealthStatus `json:"status"`
|
|
Message string `json:"message,omitempty"`
|
|
Timestamp time.Time `json:"timestamp"`
|
|
Duration string `json:"duration,omitempty"`
|
|
Component string `json:"component"`
|
|
Details interface{} `json:"details,omitempty"`
|
|
}
|
|
|
|
// HealthResponse represents the overall health response
|
|
type HealthResponse struct {
|
|
Status HealthStatus `json:"status"`
|
|
Timestamp time.Time `json:"timestamp"`
|
|
Services map[string]HealthCheckResult `json:"services"`
|
|
Version string `json:"version"`
|
|
Uptime string `json:"uptime"`
|
|
}
|
|
|
|
// HealthChecker manages health checks for various components
|
|
type HealthChecker struct {
|
|
checks map[string]HealthCheck
|
|
mu sync.RWMutex
|
|
logger *zap.Logger
|
|
startTime time.Time
|
|
}
|
|
|
|
// HealthCheck represents a health check function
|
|
type HealthCheck func(ctx context.Context) HealthCheckResult
|
|
|
|
// NewHealthChecker creates a new health checker
|
|
func NewHealthChecker(logger *zap.Logger) *HealthChecker {
|
|
return &HealthChecker{
|
|
checks: make(map[string]HealthCheck),
|
|
logger: logger,
|
|
startTime: time.Now(),
|
|
}
|
|
}
|
|
|
|
// RegisterCheck registers a health check for a service
|
|
func (hc *HealthChecker) RegisterCheck(name string, check HealthCheck) {
|
|
hc.mu.Lock()
|
|
defer hc.mu.Unlock()
|
|
hc.checks[name] = check
|
|
}
|
|
|
|
// CheckHealth performs all registered health checks
|
|
func (hc *HealthChecker) CheckHealth(ctx context.Context) HealthResponse {
|
|
hc.mu.RLock()
|
|
checks := make(map[string]HealthCheck, len(hc.checks))
|
|
for name, check := range hc.checks {
|
|
checks[name] = check
|
|
}
|
|
hc.mu.RUnlock()
|
|
|
|
results := make(map[string]HealthCheckResult)
|
|
overallStatus := HealthStatusHealthy
|
|
|
|
// Run health checks with timeout
|
|
checkCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
|
defer cancel()
|
|
|
|
for name, check := range checks {
|
|
start := time.Now()
|
|
result := check(checkCtx)
|
|
result.Duration = time.Since(start).String()
|
|
|
|
results[name] = result
|
|
|
|
// Determine overall status
|
|
if result.Status == HealthStatusUnhealthy {
|
|
overallStatus = HealthStatusUnhealthy
|
|
} else if result.Status == HealthStatusDegraded && overallStatus == HealthStatusHealthy {
|
|
overallStatus = HealthStatusDegraded
|
|
}
|
|
}
|
|
|
|
return HealthResponse{
|
|
Status: overallStatus,
|
|
Timestamp: time.Now(),
|
|
Services: results,
|
|
Version: "1.0.0", // Could be injected
|
|
Uptime: time.Since(hc.startTime).String(),
|
|
}
|
|
}
|
|
|
|
// HealthHandler creates an HTTP handler for health checks
|
|
func (hc *HealthChecker) HealthHandler() http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodGet {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
ctx := r.Context()
|
|
health := hc.CheckHealth(ctx)
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
|
// Set appropriate status code
|
|
switch health.Status {
|
|
case HealthStatusHealthy:
|
|
w.WriteHeader(http.StatusOK)
|
|
case HealthStatusDegraded:
|
|
w.WriteHeader(http.StatusOK) // 200 but degraded
|
|
case HealthStatusUnhealthy:
|
|
w.WriteHeader(http.StatusServiceUnavailable)
|
|
}
|
|
|
|
if err := json.NewEncoder(w).Encode(health); err != nil {
|
|
hc.logger.Error("Failed to encode health response", zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
|
|
// ReadinessHandler creates a simple readiness probe
|
|
func (hc *HealthChecker) ReadinessHandler() http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodGet {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
ctx := r.Context()
|
|
health := hc.CheckHealth(ctx)
|
|
|
|
// For readiness, we're more strict - any unhealthy component means not ready
|
|
if health.Status == HealthStatusUnhealthy {
|
|
w.WriteHeader(http.StatusServiceUnavailable)
|
|
w.Write([]byte("NOT READY"))
|
|
return
|
|
}
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
w.Write([]byte("READY"))
|
|
}
|
|
}
|
|
|
|
// LivenessHandler creates a simple liveness probe
|
|
func (hc *HealthChecker) LivenessHandler() http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodGet {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
// For liveness, we just check if the service can respond
|
|
w.WriteHeader(http.StatusOK)
|
|
w.Write([]byte("ALIVE"))
|
|
}
|
|
}
|
|
|
|
// CassandraHealthCheck creates a health check for Cassandra database connectivity
|
|
func CassandraHealthCheck(session *gocql.Session, logger *zap.Logger) HealthCheck {
|
|
return func(ctx context.Context) HealthCheckResult {
|
|
start := time.Now()
|
|
|
|
// Check if session is nil
|
|
if session == nil {
|
|
return HealthCheckResult{
|
|
Status: HealthStatusUnhealthy,
|
|
Message: "Cassandra session is nil",
|
|
Timestamp: time.Now(),
|
|
Component: "cassandra",
|
|
Details: map[string]interface{}{"error": "session_nil"},
|
|
}
|
|
}
|
|
|
|
// Try to execute a simple query with context
|
|
var result string
|
|
query := session.Query("SELECT uuid() FROM system.local")
|
|
|
|
// Create a channel to handle the query execution
|
|
done := make(chan error, 1)
|
|
go func() {
|
|
done <- query.Scan(&result)
|
|
}()
|
|
|
|
// Wait for either completion or context cancellation
|
|
select {
|
|
case err := <-done:
|
|
duration := time.Since(start)
|
|
|
|
if err != nil {
|
|
logger.Warn("Cassandra health check failed",
|
|
zap.Error(err),
|
|
zap.Duration("duration", duration))
|
|
|
|
return HealthCheckResult{
|
|
Status: HealthStatusUnhealthy,
|
|
Message: "Cassandra query failed: " + err.Error(),
|
|
Timestamp: time.Now(),
|
|
Component: "cassandra",
|
|
Details: map[string]interface{}{
|
|
"error": err.Error(),
|
|
"duration": duration.String(),
|
|
},
|
|
}
|
|
}
|
|
|
|
return HealthCheckResult{
|
|
Status: HealthStatusHealthy,
|
|
Message: "Cassandra connection healthy",
|
|
Timestamp: time.Now(),
|
|
Component: "cassandra",
|
|
Details: map[string]interface{}{
|
|
"query_result": result,
|
|
"duration": duration.String(),
|
|
},
|
|
}
|
|
|
|
case <-ctx.Done():
|
|
return HealthCheckResult{
|
|
Status: HealthStatusUnhealthy,
|
|
Message: "Cassandra health check timed out",
|
|
Timestamp: time.Now(),
|
|
Component: "cassandra",
|
|
Details: map[string]interface{}{
|
|
"error": "timeout",
|
|
"duration": time.Since(start).String(),
|
|
},
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// TwoTierCacheHealthCheck creates a health check for the two-tier cache system
|
|
func TwoTierCacheHealthCheck(cache twotiercache.TwoTierCacher, logger *zap.Logger) HealthCheck {
|
|
return func(ctx context.Context) HealthCheckResult {
|
|
start := time.Now()
|
|
|
|
if cache == nil {
|
|
return HealthCheckResult{
|
|
Status: HealthStatusUnhealthy,
|
|
Message: "Cache instance is nil",
|
|
Timestamp: time.Now(),
|
|
Component: "two_tier_cache",
|
|
Details: map[string]interface{}{"error": "cache_nil"},
|
|
}
|
|
}
|
|
|
|
// Test cache functionality with a health check key
|
|
healthKey := "health_check_" + time.Now().Format("20060102150405")
|
|
testValue := []byte("health_check_value")
|
|
|
|
// Test Set operation
|
|
if err := cache.Set(ctx, healthKey, testValue); err != nil {
|
|
duration := time.Since(start)
|
|
logger.Warn("Cache health check SET failed",
|
|
zap.Error(err),
|
|
zap.Duration("duration", duration))
|
|
|
|
return HealthCheckResult{
|
|
Status: HealthStatusUnhealthy,
|
|
Message: "Cache SET operation failed: " + err.Error(),
|
|
Timestamp: time.Now(),
|
|
Component: "two_tier_cache",
|
|
Details: map[string]interface{}{
|
|
"error": err.Error(),
|
|
"operation": "set",
|
|
"duration": duration.String(),
|
|
},
|
|
}
|
|
}
|
|
|
|
// Test Get operation
|
|
retrievedValue, err := cache.Get(ctx, healthKey)
|
|
if err != nil {
|
|
duration := time.Since(start)
|
|
logger.Warn("Cache health check GET failed",
|
|
zap.Error(err),
|
|
zap.Duration("duration", duration))
|
|
|
|
return HealthCheckResult{
|
|
Status: HealthStatusUnhealthy,
|
|
Message: "Cache GET operation failed: " + err.Error(),
|
|
Timestamp: time.Now(),
|
|
Component: "two_tier_cache",
|
|
Details: map[string]interface{}{
|
|
"error": err.Error(),
|
|
"operation": "get",
|
|
"duration": duration.String(),
|
|
},
|
|
}
|
|
}
|
|
|
|
// Verify the value
|
|
if string(retrievedValue) != string(testValue) {
|
|
duration := time.Since(start)
|
|
return HealthCheckResult{
|
|
Status: HealthStatusDegraded,
|
|
Message: "Cache value mismatch",
|
|
Timestamp: time.Now(),
|
|
Component: "two_tier_cache",
|
|
Details: map[string]interface{}{
|
|
"expected": string(testValue),
|
|
"actual": string(retrievedValue),
|
|
"duration": duration.String(),
|
|
},
|
|
}
|
|
}
|
|
|
|
// Clean up test key
|
|
_ = cache.Delete(ctx, healthKey)
|
|
|
|
duration := time.Since(start)
|
|
return HealthCheckResult{
|
|
Status: HealthStatusHealthy,
|
|
Message: "Two-tier cache healthy",
|
|
Timestamp: time.Now(),
|
|
Component: "two_tier_cache",
|
|
Details: map[string]interface{}{
|
|
"operations_tested": []string{"set", "get", "delete"},
|
|
"duration": duration.String(),
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
// S3HealthCheck creates a health check for S3 object storage
|
|
func S3HealthCheck(s3Storage s3.S3ObjectStorage, logger *zap.Logger) HealthCheck {
|
|
return func(ctx context.Context) HealthCheckResult {
|
|
start := time.Now()
|
|
|
|
if s3Storage == nil {
|
|
return HealthCheckResult{
|
|
Status: HealthStatusUnhealthy,
|
|
Message: "S3 storage instance is nil",
|
|
Timestamp: time.Now(),
|
|
Component: "s3_storage",
|
|
Details: map[string]interface{}{"error": "storage_nil"},
|
|
}
|
|
}
|
|
|
|
// Test basic S3 connectivity by listing objects (lightweight operation)
|
|
_, err := s3Storage.ListAllObjects(ctx)
|
|
duration := time.Since(start)
|
|
|
|
if err != nil {
|
|
logger.Warn("S3 health check failed",
|
|
zap.Error(err),
|
|
zap.Duration("duration", duration))
|
|
|
|
return HealthCheckResult{
|
|
Status: HealthStatusUnhealthy,
|
|
Message: "S3 connectivity failed: " + err.Error(),
|
|
Timestamp: time.Now(),
|
|
Component: "s3_storage",
|
|
Details: map[string]interface{}{
|
|
"error": err.Error(),
|
|
"operation": "list_objects",
|
|
"duration": duration.String(),
|
|
},
|
|
}
|
|
}
|
|
|
|
return HealthCheckResult{
|
|
Status: HealthStatusHealthy,
|
|
Message: "S3 storage healthy",
|
|
Timestamp: time.Now(),
|
|
Component: "s3_storage",
|
|
Details: map[string]interface{}{
|
|
"operation": "list_objects",
|
|
"duration": duration.String(),
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
// RegisterRealHealthChecks registers health checks for actual infrastructure components
|
|
// Note: This function was previously used with Uber FX. It can be called directly
|
|
// or wired through Google Wire if needed.
|
|
func RegisterRealHealthChecks(
|
|
hc *HealthChecker,
|
|
logger *zap.Logger,
|
|
cassandraSession *gocql.Session,
|
|
cache twotiercache.TwoTierCacher,
|
|
s3Storage s3.S3ObjectStorage,
|
|
) {
|
|
// Register Cassandra health check
|
|
hc.RegisterCheck("cassandra", CassandraHealthCheck(cassandraSession, logger))
|
|
|
|
// Register two-tier cache health check
|
|
hc.RegisterCheck("cache", TwoTierCacheHealthCheck(cache, logger))
|
|
|
|
// Register S3 storage health check
|
|
hc.RegisterCheck("s3_storage", S3HealthCheck(s3Storage, logger))
|
|
|
|
logger.Info("Real infrastructure health checks registered",
|
|
zap.Strings("components", []string{"cassandra", "cache", "s3_storage"}))
|
|
}
|
|
|
|
// StartObservabilityServer starts the observability HTTP server on a separate port
|
|
// Note: This function was previously integrated with Uber FX lifecycle.
|
|
// It should now be called manually or integrated with Google Wire if needed.
|
|
func StartObservabilityServer(
|
|
hc *HealthChecker,
|
|
ms *MetricsServer,
|
|
logger *zap.Logger,
|
|
) (*http.Server, error) {
|
|
mux := http.NewServeMux()
|
|
|
|
// Health endpoints
|
|
mux.HandleFunc("/health", hc.HealthHandler())
|
|
mux.HandleFunc("/health/ready", hc.ReadinessHandler())
|
|
mux.HandleFunc("/health/live", hc.LivenessHandler())
|
|
|
|
// Metrics endpoint
|
|
mux.Handle("/metrics", ms.Handler())
|
|
|
|
server := &http.Server{
|
|
Addr: ":8080", // Separate port for observability
|
|
Handler: mux,
|
|
ReadTimeout: 30 * time.Second,
|
|
WriteTimeout: 30 * time.Second,
|
|
IdleTimeout: 60 * time.Second,
|
|
}
|
|
|
|
go func() {
|
|
logger.Info("Starting observability server on :8080")
|
|
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
logger.Error("Observability server failed", zap.Error(err))
|
|
}
|
|
}()
|
|
|
|
return server, nil
|
|
}
|