225 lines
7.1 KiB
Go
225 lines
7.1 KiB
Go
package sync
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"codeberg.org/mapleopentech/monorepo/cloud/maplefile-backend/pkg/maplefile/client"
|
|
collectionDomain "codeberg.org/mapleopentech/monorepo/native/desktop/maplefile/internal/domain/collection"
|
|
"codeberg.org/mapleopentech/monorepo/native/desktop/maplefile/internal/domain/syncstate"
|
|
)
|
|
|
|
// CollectionSyncService defines the interface for collection synchronization
|
|
type CollectionSyncService interface {
|
|
Execute(ctx context.Context, input *SyncInput) (*SyncResult, error)
|
|
}
|
|
|
|
type collectionSyncService struct {
|
|
logger *zap.Logger
|
|
apiClient *client.Client
|
|
repoProvider RepositoryProvider
|
|
}
|
|
|
|
// ProvideCollectionSyncService creates a new collection sync service for Wire
|
|
func ProvideCollectionSyncService(
|
|
logger *zap.Logger,
|
|
apiClient *client.Client,
|
|
repoProvider RepositoryProvider,
|
|
) CollectionSyncService {
|
|
return &collectionSyncService{
|
|
logger: logger.Named("CollectionSyncService"),
|
|
apiClient: apiClient,
|
|
repoProvider: repoProvider,
|
|
}
|
|
}
|
|
|
|
// getCollectionRepo returns the collection repository, or an error if not initialized
|
|
func (s *collectionSyncService) getCollectionRepo() (collectionDomain.Repository, error) {
|
|
if !s.repoProvider.IsInitialized() {
|
|
return nil, fmt.Errorf("storage not initialized - user must be logged in")
|
|
}
|
|
repo := s.repoProvider.GetCollectionRepository()
|
|
if repo == nil {
|
|
return nil, fmt.Errorf("collection repository not available")
|
|
}
|
|
return repo, nil
|
|
}
|
|
|
|
// getSyncStateRepo returns the sync state repository, or an error if not initialized
|
|
func (s *collectionSyncService) getSyncStateRepo() (syncstate.Repository, error) {
|
|
if !s.repoProvider.IsInitialized() {
|
|
return nil, fmt.Errorf("storage not initialized - user must be logged in")
|
|
}
|
|
repo := s.repoProvider.GetSyncStateRepository()
|
|
if repo == nil {
|
|
return nil, fmt.Errorf("sync state repository not available")
|
|
}
|
|
return repo, nil
|
|
}
|
|
|
|
// Execute synchronizes collections from the cloud to local storage
|
|
func (s *collectionSyncService) Execute(ctx context.Context, input *SyncInput) (*SyncResult, error) {
|
|
s.logger.Info("Starting collection synchronization")
|
|
|
|
// Get repositories (will fail if user not logged in)
|
|
syncStateRepo, err := s.getSyncStateRepo()
|
|
if err != nil {
|
|
s.logger.Error("Cannot sync - storage not initialized", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
// Set defaults
|
|
if input == nil {
|
|
input = &SyncInput{}
|
|
}
|
|
if input.BatchSize <= 0 {
|
|
input.BatchSize = DefaultBatchSize
|
|
}
|
|
if input.MaxBatches <= 0 {
|
|
input.MaxBatches = DefaultMaxBatches
|
|
}
|
|
|
|
// Get current sync state
|
|
state, err := syncStateRepo.Get()
|
|
if err != nil {
|
|
s.logger.Error("Failed to get sync state", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
result := &SyncResult{}
|
|
batchCount := 0
|
|
|
|
// Sync loop - fetch and process batches until done or max reached
|
|
for batchCount < input.MaxBatches {
|
|
// Prepare API request
|
|
syncInput := &client.SyncInput{
|
|
Cursor: state.CollectionCursor,
|
|
Limit: input.BatchSize,
|
|
}
|
|
|
|
// Fetch batch from cloud
|
|
resp, err := s.apiClient.SyncCollections(ctx, syncInput)
|
|
if err != nil {
|
|
s.logger.Error("Failed to fetch collections from cloud", zap.Error(err))
|
|
result.Errors = append(result.Errors, "failed to fetch collections: "+err.Error())
|
|
break
|
|
}
|
|
|
|
// Process each collection in the batch
|
|
for _, cloudCol := range resp.Collections {
|
|
if err := s.processCollection(ctx, cloudCol, input.Password, result); err != nil {
|
|
s.logger.Error("Failed to process collection",
|
|
zap.String("id", cloudCol.ID),
|
|
zap.Error(err))
|
|
result.Errors = append(result.Errors, "failed to process collection "+cloudCol.ID+": "+err.Error())
|
|
}
|
|
result.CollectionsProcessed++
|
|
}
|
|
|
|
// Update sync state with new cursor
|
|
state.UpdateCollectionSync(resp.NextCursor, resp.HasMore)
|
|
if err := syncStateRepo.Save(state); err != nil {
|
|
s.logger.Error("Failed to save sync state", zap.Error(err))
|
|
result.Errors = append(result.Errors, "failed to save sync state: "+err.Error())
|
|
}
|
|
|
|
batchCount++
|
|
|
|
// Check if we're done
|
|
if !resp.HasMore {
|
|
s.logger.Info("Collection sync completed - no more items")
|
|
break
|
|
}
|
|
}
|
|
|
|
s.logger.Info("Collection sync finished",
|
|
zap.Int("processed", result.CollectionsProcessed),
|
|
zap.Int("added", result.CollectionsAdded),
|
|
zap.Int("updated", result.CollectionsUpdated),
|
|
zap.Int("deleted", result.CollectionsDeleted),
|
|
zap.Int("errors", len(result.Errors)))
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// processCollection handles a single collection from the cloud
|
|
// Note: ctx and password are reserved for future use (on-demand content decryption)
|
|
func (s *collectionSyncService) processCollection(_ context.Context, cloudCol *client.Collection, _ string, result *SyncResult) error {
|
|
// Get collection repository
|
|
collectionRepo, err := s.getCollectionRepo()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Check if collection exists locally
|
|
localCol, err := collectionRepo.Get(cloudCol.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Handle deleted collections
|
|
if cloudCol.State == collectionDomain.StateDeleted {
|
|
if localCol != nil {
|
|
if err := collectionRepo.Delete(cloudCol.ID); err != nil {
|
|
return err
|
|
}
|
|
result.CollectionsDeleted++
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// The collection name comes from the API already decrypted for owned collections.
|
|
// For shared collections, it would need decryption using the key chain.
|
|
// For now, we use the name as-is from the API response.
|
|
collectionName := cloudCol.Name
|
|
|
|
// Create or update local collection
|
|
if localCol == nil {
|
|
// Create new local collection
|
|
newCol := s.mapCloudToLocal(cloudCol, collectionName)
|
|
if err := collectionRepo.Create(newCol); err != nil {
|
|
return err
|
|
}
|
|
result.CollectionsAdded++
|
|
} else {
|
|
// Update existing collection
|
|
updatedCol := s.mapCloudToLocal(cloudCol, collectionName)
|
|
updatedCol.SyncStatus = localCol.SyncStatus // Preserve local sync status
|
|
updatedCol.LastSyncedAt = time.Now()
|
|
if err := collectionRepo.Update(updatedCol); err != nil {
|
|
return err
|
|
}
|
|
result.CollectionsUpdated++
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// mapCloudToLocal converts a cloud collection to local domain model
|
|
func (s *collectionSyncService) mapCloudToLocal(cloudCol *client.Collection, decryptedName string) *collectionDomain.Collection {
|
|
return &collectionDomain.Collection{
|
|
ID: cloudCol.ID,
|
|
ParentID: cloudCol.ParentID,
|
|
OwnerID: cloudCol.UserID,
|
|
EncryptedCollectionKey: cloudCol.EncryptedCollectionKey.Ciphertext,
|
|
Nonce: cloudCol.EncryptedCollectionKey.Nonce,
|
|
Name: decryptedName,
|
|
Description: cloudCol.Description,
|
|
CustomIcon: cloudCol.CustomIcon, // Custom icon (emoji or "icon:<id>")
|
|
TotalFiles: cloudCol.TotalFiles,
|
|
TotalSizeInBytes: cloudCol.TotalSizeInBytes,
|
|
PermissionLevel: cloudCol.PermissionLevel,
|
|
IsOwner: cloudCol.IsOwner,
|
|
OwnerName: cloudCol.OwnerName,
|
|
OwnerEmail: cloudCol.OwnerEmail,
|
|
SyncStatus: collectionDomain.SyncStatusCloudOnly,
|
|
LastSyncedAt: time.Now(),
|
|
State: cloudCol.State,
|
|
CreatedAt: cloudCol.CreatedAt,
|
|
ModifiedAt: cloudCol.ModifiedAt,
|
|
}
|
|
}
|
|
|