package repo

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/gocql/gocql"
	"go.uber.org/zap"

	"codeberg.org/mapleopentech/monorepo/cloud/maplepress-backend/internal/domain/site"
)

// siteRepository is the Cassandra-backed implementation of site.Repository.
//
// A site is denormalized across four tables, each keyed for one access path:
//
//   - maplepress.sites_by_id     (tenant_id, id)             full record
//   - maplepress.sites_by_tenant (tenant_id, created_at, id) list view
//   - maplepress.sites_by_domain (domain)                    full record
//   - maplepress.sites_by_apikey (api_key_hash)              auth subset
//
// Writes go through logged batches so the tables are changed together.
type siteRepository struct {
	session *gocql.Session
	logger  *zap.Logger
}

// NewSiteRepository creates a new site repository that uses session for all
// queries and logs under the "site-repo" name.
func NewSiteRepository(session *gocql.Session, logger *zap.Logger) site.Repository {
	return &siteRepository{
		session: session,
		logger:  logger.Named("site-repo"),
	}
}

// Create inserts a site into all 4 Cassandra tables using a logged batch.
//
// It returns site.ErrDomainAlreadyExists when DomainExists reports that the
// domain is already registered.
//
// NOTE: the existence check and the batch are two separate operations, so two
// concurrent Create calls for the same domain can both pass the check.
func (r *siteRepository) Create(ctx context.Context, s *site.Site) error {
	// Check if domain already exists.
	exists, err := r.DomainExists(ctx, s.Domain)
	if err != nil {
		return fmt.Errorf("failed to check domain existence: %w", err)
	}
	if exists {
		return site.ErrDomainAlreadyExists
	}

	batch := r.session.NewBatch(gocql.LoggedBatch)

	// 1. Insert into sites_by_id (primary table).
	batch.Query(`
		INSERT INTO maplepress.sites_by_id (
			tenant_id, id, site_url, domain, api_key_hash, api_key_prefix, api_key_last_four,
			status, is_verified, verification_token, search_index_name,
			total_pages_indexed, last_indexed_at, plugin_version,
			storage_used_bytes, search_requests_count, monthly_pages_indexed, last_reset_at,
			language, timezone, notes, created_at, updated_at,
			created_from_ip_address, created_from_ip_timestamp,
			modified_from_ip_address, modified_from_ip_timestamp
		) VALUES (
			?, ?, ?, ?, ?, ?, ?, ?, ?,
			?, ?, ?, ?, ?, ?, ?, ?, ?,
			?, ?, ?, ?, ?, ?, ?, ?, ?
		)
	`,
		s.TenantID, s.ID, s.SiteURL, s.Domain, s.APIKeyHash, s.APIKeyPrefix, s.APIKeyLastFour,
		s.Status, s.IsVerified, s.VerificationToken, s.SearchIndexName,
		s.TotalPagesIndexed, s.LastIndexedAt, s.PluginVersion,
		s.StorageUsedBytes, s.SearchRequestsCount, s.MonthlyPagesIndexed, s.LastResetAt,
		s.Language, s.Timezone, s.Notes, s.CreatedAt, s.UpdatedAt,
		s.CreatedFromIPAddress, s.CreatedFromIPTimestamp,
		s.ModifiedFromIPAddress, s.ModifiedFromIPTimestamp,
	)

	// 2. Insert into sites_by_tenant (list view).
	batch.Query(`
		INSERT INTO maplepress.sites_by_tenant (
			tenant_id, created_at, id, domain, status, is_verified
		) VALUES (?, ?, ?, ?, ?, ?)
	`,
		s.TenantID, s.CreatedAt, s.ID, s.Domain, s.Status, s.IsVerified,
	)

	// 3. Insert into sites_by_domain (domain lookup & uniqueness).
	batch.Query(`
		INSERT INTO maplepress.sites_by_domain (
			domain, tenant_id, id, site_url, api_key_hash, api_key_prefix, api_key_last_four,
			status, is_verified, verification_token, search_index_name,
			total_pages_indexed, last_indexed_at, plugin_version,
			storage_used_bytes, search_requests_count, monthly_pages_indexed, last_reset_at,
			language, timezone, notes, created_at, updated_at,
			created_from_ip_address, created_from_ip_timestamp,
			modified_from_ip_address, modified_from_ip_timestamp
		) VALUES (
			?, ?, ?, ?, ?, ?, ?, ?, ?,
			?, ?, ?, ?, ?, ?, ?, ?, ?,
			?, ?, ?, ?, ?, ?, ?, ?, ?
		)
	`,
		s.Domain, s.TenantID, s.ID, s.SiteURL, s.APIKeyHash, s.APIKeyPrefix, s.APIKeyLastFour,
		s.Status, s.IsVerified, s.VerificationToken, s.SearchIndexName,
		s.TotalPagesIndexed, s.LastIndexedAt, s.PluginVersion,
		s.StorageUsedBytes, s.SearchRequestsCount, s.MonthlyPagesIndexed, s.LastResetAt,
		s.Language, s.Timezone, s.Notes, s.CreatedAt, s.UpdatedAt,
		s.CreatedFromIPAddress, s.CreatedFromIPTimestamp,
		s.ModifiedFromIPAddress, s.ModifiedFromIPTimestamp,
	)

	// 4. Insert into sites_by_apikey (authentication table).
	batch.Query(`
		INSERT INTO maplepress.sites_by_apikey (
			api_key_hash, tenant_id, id, domain, site_url,
			api_key_prefix, api_key_last_four,
			status, is_verified, search_index_name, created_at, updated_at
		) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
	`,
		s.APIKeyHash, s.TenantID, s.ID, s.Domain, s.SiteURL,
		s.APIKeyPrefix, s.APIKeyLastFour,
		s.Status, s.IsVerified, s.SearchIndexName, s.CreatedAt, s.UpdatedAt,
	)

	// Execute batch.
	if err := r.session.ExecuteBatch(batch.WithContext(ctx)); err != nil {
		r.logger.Error("failed to create site",
			zap.Error(err),
			zap.String("domain", s.Domain))
		return fmt.Errorf("failed to create site: %w", err)
	}

	r.logger.Info("site created successfully",
		zap.String("site_id", s.ID.String()),
		zap.String("domain", s.Domain),
		zap.String("tenant_id", s.TenantID.String()))

	return nil
}

// GetByID retrieves the full site record from sites_by_id by tenant_id and
// site_id. It returns site.ErrNotFound when no row matches.
func (r *siteRepository) GetByID(ctx context.Context, tenantID, siteID gocql.UUID) (*site.Site, error) {
	var s site.Site

	query := `
		SELECT tenant_id, id, site_url, domain, api_key_hash, api_key_prefix, api_key_last_four,
			status, is_verified, verification_token, search_index_name,
			total_pages_indexed, last_indexed_at, plugin_version,
			storage_used_bytes, search_requests_count, monthly_pages_indexed, last_reset_at,
			language, timezone, notes, created_at, updated_at,
			created_from_ip_address, created_from_ip_timestamp,
			modified_from_ip_address, modified_from_ip_timestamp
		FROM maplepress.sites_by_id
		WHERE tenant_id = ? AND id = ?
	`

	err := r.session.Query(query, tenantID, siteID).
		WithContext(ctx).
		Scan(
			&s.TenantID, &s.ID, &s.SiteURL, &s.Domain, &s.APIKeyHash, &s.APIKeyPrefix, &s.APIKeyLastFour,
			&s.Status, &s.IsVerified, &s.VerificationToken, &s.SearchIndexName,
			&s.TotalPagesIndexed, &s.LastIndexedAt, &s.PluginVersion,
			&s.StorageUsedBytes, &s.SearchRequestsCount, &s.MonthlyPagesIndexed, &s.LastResetAt,
			&s.Language, &s.Timezone, &s.Notes, &s.CreatedAt, &s.UpdatedAt,
			&s.CreatedFromIPAddress, &s.CreatedFromIPTimestamp,
			&s.ModifiedFromIPAddress, &s.ModifiedFromIPTimestamp,
		)
	// errors.Is keeps matching even if the driver error is ever wrapped.
	if errors.Is(err, gocql.ErrNotFound) {
		return nil, site.ErrNotFound
	}
	if err != nil {
		r.logger.Error("failed to get site by id", zap.Error(err))
		return nil, fmt.Errorf("failed to get site: %w", err)
	}

	return &s, nil
}

// GetByDomain retrieves the full site record from sites_by_domain by domain.
// It returns site.ErrNotFound when no row matches.
func (r *siteRepository) GetByDomain(ctx context.Context, domain string) (*site.Site, error) {
	var s site.Site

	query := `
		SELECT domain, tenant_id, id, site_url, api_key_hash, api_key_prefix, api_key_last_four,
			status, is_verified, verification_token, search_index_name,
			total_pages_indexed, last_indexed_at, plugin_version,
			storage_used_bytes, search_requests_count, monthly_pages_indexed, last_reset_at,
			language, timezone, notes, created_at, updated_at,
			created_from_ip_address, created_from_ip_timestamp,
			modified_from_ip_address, modified_from_ip_timestamp
		FROM maplepress.sites_by_domain
		WHERE domain = ?
	`

	err := r.session.Query(query, domain).
		WithContext(ctx).
		Scan(
			&s.Domain, &s.TenantID, &s.ID, &s.SiteURL, &s.APIKeyHash, &s.APIKeyPrefix, &s.APIKeyLastFour,
			&s.Status, &s.IsVerified, &s.VerificationToken, &s.SearchIndexName,
			&s.TotalPagesIndexed, &s.LastIndexedAt, &s.PluginVersion,
			&s.StorageUsedBytes, &s.SearchRequestsCount, &s.MonthlyPagesIndexed, &s.LastResetAt,
			&s.Language, &s.Timezone, &s.Notes, &s.CreatedAt, &s.UpdatedAt,
			&s.CreatedFromIPAddress, &s.CreatedFromIPTimestamp,
			&s.ModifiedFromIPAddress, &s.ModifiedFromIPTimestamp,
		)
	if errors.Is(err, gocql.ErrNotFound) {
		return nil, site.ErrNotFound
	}
	if err != nil {
		r.logger.Error("failed to get site by domain",
			zap.Error(err),
			zap.String("domain", domain))
		return nil, fmt.Errorf("failed to get site by domain: %w", err)
	}

	return &s, nil
}

// GetByAPIKeyHash retrieves a site by API key hash (optimized for
// authentication). It returns site.ErrInvalidAPIKey when no row matches.
//
// Only the columns stored in sites_by_apikey are populated on the result; use
// GetByID when the full site record is needed.
func (r *siteRepository) GetByAPIKeyHash(ctx context.Context, apiKeyHash string) (*site.Site, error) {
	var s site.Site

	query := `
		SELECT api_key_hash, tenant_id, id, domain, site_url,
			api_key_prefix, api_key_last_four,
			status, is_verified, search_index_name, created_at, updated_at
		FROM maplepress.sites_by_apikey
		WHERE api_key_hash = ?
	`

	err := r.session.Query(query, apiKeyHash).
		WithContext(ctx).
		Scan(
			&s.APIKeyHash, &s.TenantID, &s.ID, &s.Domain, &s.SiteURL,
			&s.APIKeyPrefix, &s.APIKeyLastFour,
			&s.Status, &s.IsVerified, &s.SearchIndexName, &s.CreatedAt, &s.UpdatedAt,
		)
	if errors.Is(err, gocql.ErrNotFound) {
		return nil, site.ErrInvalidAPIKey
	}
	if err != nil {
		r.logger.Error("failed to get site by api key", zap.Error(err))
		return nil, fmt.Errorf("failed to get site by api key: %w", err)
	}

	// Note: This returns partial data (optimized for auth).
	// Caller should use GetByID for full site details if needed.
	return &s, nil
}

// ListByTenant retrieves one page of sites for a tenant from sites_by_tenant.
//
// pageSize and pageState are passed straight to the driver; the returned byte
// slice is the driver's page state for the next call. Only the list-view
// columns (tenant_id, created_at, id, domain, status, is_verified) are
// populated on each returned site.
func (r *siteRepository) ListByTenant(ctx context.Context, tenantID gocql.UUID, pageSize int, pageState []byte) ([]*site.Site, []byte, error) {
	query := `
		SELECT tenant_id, created_at, id, domain, status, is_verified
		FROM maplepress.sites_by_tenant
		WHERE tenant_id = ?
	`

	iter := r.session.Query(query, tenantID).
		WithContext(ctx).
		PageSize(pageSize).
		PageState(pageState).
		Iter()

	var sites []*site.Site
	for {
		// Scan every row into its own value so no state can carry over
		// from, or be shared with, a previously scanned row.
		s := new(site.Site)
		if !iter.Scan(&s.TenantID, &s.CreatedAt, &s.ID, &s.Domain, &s.Status, &s.IsVerified) {
			break
		}
		sites = append(sites, s)
	}

	// The page state is read before Close, as in the original flow.
	newPageState := iter.PageState()

	if err := iter.Close(); err != nil {
		r.logger.Error("failed to list sites by tenant", zap.Error(err))
		return nil, nil, fmt.Errorf("failed to list sites: %w", err)
	}

	return sites, newPageState, nil
}

// Update updates a site in all 4 Cassandra tables using a logged batch and
// sets s.UpdatedAt to the current time as a side effect.
//
// The rows are addressed by s.TenantID, s.ID, s.CreatedAt, s.Domain and
// s.APIKeyHash, so those values must still identify the existing rows. To
// change the API key hash use UpdateAPIKey, which replaces the sites_by_apikey
// row. search_index_name is not written by this method.
func (r *siteRepository) Update(ctx context.Context, s *site.Site) error {
	s.UpdatedAt = time.Now()

	batch := r.session.NewBatch(gocql.LoggedBatch)

	// sites_by_id
	batch.Query(`
		UPDATE maplepress.sites_by_id SET
			site_url = ?, api_key_hash = ?, api_key_prefix = ?, api_key_last_four = ?,
			status = ?, is_verified = ?, verification_token = ?,
			total_pages_indexed = ?, last_indexed_at = ?, plugin_version = ?,
			storage_used_bytes = ?, search_requests_count = ?, monthly_pages_indexed = ?, last_reset_at = ?,
			language = ?, timezone = ?, notes = ?, updated_at = ?,
			modified_from_ip_address = ?, modified_from_ip_timestamp = ?
		WHERE tenant_id = ? AND id = ?
	`,
		s.SiteURL, s.APIKeyHash, s.APIKeyPrefix, s.APIKeyLastFour,
		s.Status, s.IsVerified, s.VerificationToken,
		s.TotalPagesIndexed, s.LastIndexedAt, s.PluginVersion,
		s.StorageUsedBytes, s.SearchRequestsCount, s.MonthlyPagesIndexed, s.LastResetAt,
		s.Language, s.Timezone, s.Notes, s.UpdatedAt,
		s.ModifiedFromIPAddress, s.ModifiedFromIPTimestamp,
		s.TenantID, s.ID,
	)

	// sites_by_tenant
	batch.Query(`
		UPDATE maplepress.sites_by_tenant SET
			status = ?, is_verified = ?
		WHERE tenant_id = ? AND created_at = ? AND id = ?
	`,
		s.Status, s.IsVerified,
		s.TenantID, s.CreatedAt, s.ID,
	)

	// sites_by_domain
	batch.Query(`
		UPDATE maplepress.sites_by_domain SET
			site_url = ?, api_key_hash = ?, api_key_prefix = ?, api_key_last_four = ?,
			status = ?, is_verified = ?, verification_token = ?,
			total_pages_indexed = ?, last_indexed_at = ?, plugin_version = ?,
			storage_used_bytes = ?, search_requests_count = ?, monthly_pages_indexed = ?, last_reset_at = ?,
			language = ?, timezone = ?, notes = ?, updated_at = ?,
			modified_from_ip_address = ?, modified_from_ip_timestamp = ?
		WHERE domain = ?
	`,
		s.SiteURL, s.APIKeyHash, s.APIKeyPrefix, s.APIKeyLastFour,
		s.Status, s.IsVerified, s.VerificationToken,
		s.TotalPagesIndexed, s.LastIndexedAt, s.PluginVersion,
		s.StorageUsedBytes, s.SearchRequestsCount, s.MonthlyPagesIndexed, s.LastResetAt,
		s.Language, s.Timezone, s.Notes, s.UpdatedAt,
		s.ModifiedFromIPAddress, s.ModifiedFromIPTimestamp,
		s.Domain,
	)

	// sites_by_apikey
	batch.Query(`
		UPDATE maplepress.sites_by_apikey SET
			site_url = ?, api_key_prefix = ?, api_key_last_four = ?,
			status = ?, is_verified = ?, updated_at = ?
		WHERE api_key_hash = ?
	`,
		s.SiteURL, s.APIKeyPrefix, s.APIKeyLastFour,
		s.Status, s.IsVerified, s.UpdatedAt,
		s.APIKeyHash,
	)

	if err := r.session.ExecuteBatch(batch.WithContext(ctx)); err != nil {
		r.logger.Error("failed to update site", zap.Error(err))
		return fmt.Errorf("failed to update site: %w", err)
	}

	r.logger.Info("site updated successfully",
		zap.String("site_id", s.ID.String()),
		zap.String("domain", s.Domain))

	return nil
}

// UpdateAPIKey updates the API key for a site in all Cassandra tables that
// store key data and sets s.UpdatedAt to the current time as a side effect.
//
// api_key_hash is the key of sites_by_apikey and cannot be updated in place,
// so the row for oldAPIKeyHash is deleted and a row for s.APIKeyHash is
// inserted in the same batch. When oldAPIKeyHash equals s.APIKeyHash the
// delete is skipped and the insert simply overwrites the existing row.
func (r *siteRepository) UpdateAPIKey(ctx context.Context, s *site.Site, oldAPIKeyHash string) error {
	s.UpdatedAt = time.Now()

	batch := r.session.NewBatch(gocql.LoggedBatch)

	// Update sites_by_id.
	batch.Query(`
		UPDATE maplepress.sites_by_id SET
			api_key_hash = ?, api_key_prefix = ?, api_key_last_four = ?,
			updated_at = ?,
			modified_from_ip_address = ?, modified_from_ip_timestamp = ?
		WHERE tenant_id = ? AND id = ?
	`,
		s.APIKeyHash, s.APIKeyPrefix, s.APIKeyLastFour,
		s.UpdatedAt,
		s.ModifiedFromIPAddress, s.ModifiedFromIPTimestamp,
		s.TenantID, s.ID,
	)

	// sites_by_tenant doesn't store API key info, no update needed.

	// Update sites_by_domain.
	batch.Query(`
		UPDATE maplepress.sites_by_domain SET
			api_key_hash = ?, api_key_prefix = ?, api_key_last_four = ?,
			updated_at = ?,
			modified_from_ip_address = ?, modified_from_ip_timestamp = ?
		WHERE domain = ?
	`,
		s.APIKeyHash, s.APIKeyPrefix, s.APIKeyLastFour,
		s.UpdatedAt,
		s.ModifiedFromIPAddress, s.ModifiedFromIPTimestamp,
		s.Domain,
	)

	// sites_by_apikey: DELETE old entry (can't update primary key).
	//
	// Skip the delete when the hash is unchanged: statements in one batch
	// share a write timestamp and Cassandra resolves a delete/insert tie on
	// the same row in favour of the delete, which would drop the auth row.
	if oldAPIKeyHash != s.APIKeyHash {
		batch.Query(`
			DELETE FROM maplepress.sites_by_apikey
			WHERE api_key_hash = ?
		`, oldAPIKeyHash)
	}

	// sites_by_apikey: INSERT new entry with new API key hash.
	batch.Query(`
		INSERT INTO maplepress.sites_by_apikey (
			api_key_hash, tenant_id, id, domain, site_url,
			api_key_prefix, api_key_last_four,
			status, is_verified, search_index_name, created_at, updated_at
		) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
	`,
		s.APIKeyHash, s.TenantID, s.ID, s.Domain, s.SiteURL,
		s.APIKeyPrefix, s.APIKeyLastFour,
		s.Status, s.IsVerified, s.SearchIndexName, s.CreatedAt, s.UpdatedAt,
	)

	if err := r.session.ExecuteBatch(batch.WithContext(ctx)); err != nil {
		r.logger.Error("failed to update site API key", zap.Error(err))
		return fmt.Errorf("failed to update site API key: %w", err)
	}

	r.logger.Info("site API key updated successfully",
		zap.String("site_id", s.ID.String()),
		zap.String("domain", s.Domain),
		zap.String("new_key_prefix", s.APIKeyPrefix),
		zap.String("new_key_last_four", s.APIKeyLastFour))

	return nil
}

// UpdateUsage updates only usage tracking fields (optimized for frequent
// updates) and sets s.UpdatedAt to the current time as a side effect.
//
// Only sites_by_id and sites_by_domain are written; the other two tables do
// not carry the usage columns touched here.
func (r *siteRepository) UpdateUsage(ctx context.Context, s *site.Site) error {
	s.UpdatedAt = time.Now()

	batch := r.session.NewBatch(gocql.LoggedBatch)

	// Only update usage tracking fields in relevant tables.
	batch.Query(`
		UPDATE maplepress.sites_by_id SET
			total_pages_indexed = ?, monthly_pages_indexed = ?,
			storage_used_bytes = ?, search_requests_count = ?,
			last_reset_at = ?, updated_at = ?
		WHERE tenant_id = ? AND id = ?
	`,
		s.TotalPagesIndexed, s.MonthlyPagesIndexed,
		s.StorageUsedBytes, s.SearchRequestsCount,
		s.LastResetAt, s.UpdatedAt,
		s.TenantID, s.ID,
	)

	batch.Query(`
		UPDATE maplepress.sites_by_domain SET
			total_pages_indexed = ?, monthly_pages_indexed = ?,
			storage_used_bytes = ?, search_requests_count = ?,
			last_reset_at = ?, updated_at = ?
		WHERE domain = ?
	`,
		s.TotalPagesIndexed, s.MonthlyPagesIndexed,
		s.StorageUsedBytes, s.SearchRequestsCount,
		s.LastResetAt, s.UpdatedAt,
		s.Domain,
	)

	if err := r.session.ExecuteBatch(batch.WithContext(ctx)); err != nil {
		r.logger.Error("failed to update usage", zap.Error(err))
		return fmt.Errorf("failed to update usage: %w", err)
	}

	return nil
}

// Delete removes a site from all 4 Cassandra tables using a logged batch.
//
// The site is first loaded with GetByID to learn the created_at, domain and
// api_key_hash values that key the other tables; any error from that lookup
// (including site.ErrNotFound) is returned unchanged.
func (r *siteRepository) Delete(ctx context.Context, tenantID, siteID gocql.UUID) error {
	// First get the site to retrieve domain and api_key_hash.
	s, err := r.GetByID(ctx, tenantID, siteID)
	if err != nil {
		return err
	}

	batch := r.session.NewBatch(gocql.LoggedBatch)

	// Delete from all 4 tables.
	batch.Query(`DELETE FROM maplepress.sites_by_id WHERE tenant_id = ? AND id = ?`, tenantID, siteID)
	batch.Query(`DELETE FROM maplepress.sites_by_tenant WHERE tenant_id = ? AND created_at = ? AND id = ?`, tenantID, s.CreatedAt, siteID)
	batch.Query(`DELETE FROM maplepress.sites_by_domain WHERE domain = ?`, s.Domain)
	batch.Query(`DELETE FROM maplepress.sites_by_apikey WHERE api_key_hash = ?`, s.APIKeyHash)

	if err := r.session.ExecuteBatch(batch.WithContext(ctx)); err != nil {
		r.logger.Error("failed to delete site", zap.Error(err))
		return fmt.Errorf("failed to delete site: %w", err)
	}

	r.logger.Info("site deleted successfully",
		zap.String("site_id", siteID.String()),
		zap.String("domain", s.Domain))

	return nil
}

// DomainExists checks if a domain is already registered by counting the rows
// in sites_by_domain for that domain.
func (r *siteRepository) DomainExists(ctx context.Context, domain string) (bool, error) {
	var count int

	query := `SELECT COUNT(*) FROM maplepress.sites_by_domain WHERE domain = ?`

	err := r.session.Query(query, domain).
		WithContext(ctx).
		Scan(&count)
	if err != nil {
		r.logger.Error("failed to check domain existence", zap.Error(err))
		return false, fmt.Errorf("failed to check domain: %w", err)
	}

	return count > 0, nil
}

// GetAllSitesForUsageReset retrieves one page of all sites for the monthly
// usage counter reset (admin task only).
//
// WARNING: This reads across the whole sites_by_id table with ALLOW FILTERING
// and should only be used for scheduled administrative tasks.
//
// pageSize and pageState are passed straight to the driver; the returned byte
// slice is the driver's page state for the next call.
func (r *siteRepository) GetAllSitesForUsageReset(ctx context.Context, pageSize int, pageState []byte) ([]*site.Site, []byte, error) {
	query := `
		SELECT tenant_id, id, site_url, domain, api_key_hash, api_key_prefix, api_key_last_four,
			status, is_verified, verification_token, search_index_name,
			total_pages_indexed, last_indexed_at, plugin_version,
			storage_used_bytes, search_requests_count, monthly_pages_indexed, last_reset_at,
			language, timezone, notes, created_at, updated_at,
			created_from_ip_address, created_from_ip_timestamp,
			modified_from_ip_address, modified_from_ip_timestamp
		FROM maplepress.sites_by_id
		ALLOW FILTERING
	`

	iter := r.session.Query(query).
		WithContext(ctx).
		PageSize(pageSize).
		PageState(pageState).
		Iter()

	var sites []*site.Site
	for {
		// Scan every row into its own value so no state can carry over
		// from, or be shared with, a previously scanned row.
		s := new(site.Site)
		if !iter.Scan(
			&s.TenantID, &s.ID, &s.SiteURL, &s.Domain, &s.APIKeyHash, &s.APIKeyPrefix, &s.APIKeyLastFour,
			&s.Status, &s.IsVerified, &s.VerificationToken, &s.SearchIndexName,
			&s.TotalPagesIndexed, &s.LastIndexedAt, &s.PluginVersion,
			&s.StorageUsedBytes, &s.SearchRequestsCount, &s.MonthlyPagesIndexed, &s.LastResetAt,
			&s.Language, &s.Timezone, &s.Notes, &s.CreatedAt, &s.UpdatedAt,
			&s.CreatedFromIPAddress, &s.CreatedFromIPTimestamp,
			&s.ModifiedFromIPAddress, &s.ModifiedFromIPTimestamp,
		) {
			break
		}
		sites = append(sites, s)
	}

	// The page state is read before Close, as in the original flow.
	nextPageState := iter.PageState()

	if err := iter.Close(); err != nil {
		r.logger.Error("failed to get all sites for usage reset", zap.Error(err))
		return nil, nil, fmt.Errorf("failed to get sites: %w", err)
	}

	r.logger.Info("retrieved sites for usage reset",
		zap.Int("count", len(sites)),
		zap.Bool("has_more", len(nextPageState) > 0))

	return sites, nextPageState, nil
}