From ffbd76b88b9b2d95dbe87f7650cadf365146961e Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Fri, 9 Aug 2024 12:40:52 +0530 Subject: [PATCH] [server] Support for replicating ml data --- server/cmd/museum/main.go | 5 + server/migrations/89_file_data_table.up.sql | 2 +- server/pkg/controller/filedata/controller.go | 11 +- server/pkg/controller/filedata/replicate.go | 115 ++++++++++++++++++- server/pkg/controller/replication3.go | 21 +--- server/pkg/repo/filedata/repository.go | 32 ++++++ server/pkg/utils/file/file.go | 18 +++ 7 files changed, 178 insertions(+), 26 deletions(-) diff --git a/server/cmd/museum/main.go b/server/cmd/museum/main.go index 8c9c88f304..f74b004421 100644 --- a/server/cmd/museum/main.go +++ b/server/cmd/museum/main.go @@ -831,6 +831,11 @@ func setupAndStartBackgroundJobs( } else { log.Info("Skipping Replication as replication is disabled") } + + err := fileDataCtrl.StartReplication() + if err != nil { + log.Warnf("Could not start fileData replication: %s", err) + } fileDataCtrl.StartDataDeletion() // Start data deletion for file data; objectCleanupController.StartRemovingUnreportedObjects() diff --git a/server/migrations/89_file_data_table.up.sql b/server/migrations/89_file_data_table.up.sql index c0feab6e64..72bfe0979d 100644 --- a/server/migrations/89_file_data_table.up.sql +++ b/server/migrations/89_file_data_table.up.sql @@ -15,7 +15,7 @@ CREATE TABLE IF NOT EXISTS file_data delete_from_buckets s3region[] NOT NULL DEFAULT '{}', inflight_rep_buckets s3region[] NOT NULL DEFAULT '{}', is_deleted BOOLEAN NOT NULL DEFAULT false, - pending_sync BOOLEAN NOT NULL DEFAULT false, + pending_sync BOOLEAN NOT NULL DEFAULT true, sync_locked_till BIGINT NOT NULL DEFAULT 0, created_at BIGINT NOT NULL DEFAULT now_utc_micro_seconds(), updated_at BIGINT NOT NULL DEFAULT now_utc_micro_seconds(), diff --git a/server/pkg/controller/filedata/controller.go b/server/pkg/controller/filedata/controller.go index 
fb795c01c0..afdd895854 100644 --- a/server/pkg/controller/filedata/controller.go +++ b/server/pkg/controller/filedata/controller.go @@ -55,6 +55,9 @@ type Controller struct { HostName string cleanupCronRunning bool downloadManagerCache map[string]*s3manager.Downloader + + // for downloading objects from s3 for replication + workerURL string } func New(repo *fileDataRepo.Repository, @@ -185,8 +188,8 @@ func (c *Controller) getS3FileMetadataParallel(dbRows []fileData.Row) ([]bulkS3M dc := row.LatestBucket s3FileMetadata, err := c.fetchS3FileMetadata(context.Background(), row, dc) if err != nil { - log.WithField("bucket", row.LatestBucket). - Error("error fetching embedding object: "+row.S3FileMetadataObjectKey(), err) + log.WithField("bucket", dc). + Error("error fetching object: "+row.S3FileMetadataObjectKey(), err) embeddingObjects[i] = bulkS3MetaFetchResult{ err: err, dbEntry: row, @@ -204,7 +207,7 @@ func (c *Controller) getS3FileMetadataParallel(dbRows []fileData.Row) ([]bulkS3M return embeddingObjects, nil } -func (c *Controller) fetchS3FileMetadata(ctx context.Context, row fileData.Row, _ string) (*fileData.S3FileMetadata, error) { +func (c *Controller) fetchS3FileMetadata(ctx context.Context, row fileData.Row, dc string) (*fileData.S3FileMetadata, error) { opt := _defaultFetchConfig objectKey := row.S3FileMetadataObjectKey() ctxLogger := log.WithField("objectKey", objectKey).WithField("dc", row.LatestBucket) @@ -223,7 +226,7 @@ func (c *Controller) fetchS3FileMetadata(ctx context.Context, row fileData.Row, cancel() return nil, stacktrace.Propagate(ctx.Err(), "") default: - obj, err := c.downloadObject(fetchCtx, objectKey, row.LatestBucket) + obj, err := c.downloadObject(fetchCtx, objectKey, dc) cancel() // Ensure cancel is called to release resources if err == nil { if i > 0 { diff --git a/server/pkg/controller/filedata/replicate.go b/server/pkg/controller/filedata/replicate.go index 7baa814199..1cf4488d70 100644 --- 
a/server/pkg/controller/filedata/replicate.go +++ b/server/pkg/controller/filedata/replicate.go @@ -1,6 +1,119 @@ package filedata +import ( + "context" + "database/sql" + "errors" + "fmt" + "github.com/ente-io/museum/ente/filedata" + fileDataRepo "github.com/ente-io/museum/pkg/repo/filedata" + enteTime "github.com/ente-io/museum/pkg/utils/time" + "github.com/ente-io/stacktrace" + log "github.com/sirupsen/logrus" + "github.com/spf13/viper" + "time" +) + +// StartReplication starts the replication process for file data. +// If replication.worker-url is not configured, objects are downloaded directly from S3 during replication. func (c *Controller) StartReplication() error { - // todo: implement replication logic + workerURL := viper.GetString("replication.worker-url") + if workerURL == "" { + log.Infof("replication.worker-url was not defined, file data will be downloaded directly during replication") + } else { + log.Infof("Worker URL to download objects for replication v3 is: %s", workerURL) + } + c.workerURL = workerURL + + workerCount := viper.GetInt("replication.file-data.worker-count") + if workerCount == 0 { + workerCount = 6 + } + go c.startWorkers(workerCount) return nil } +func (c *Controller) startWorkers(n int) { + log.Infof("Starting %d workers for replication v3", n) + + for i := 0; i < n; i++ { + go c.replicate(i) + // Stagger the workers + time.Sleep(time.Duration(2*i+1) * time.Second) + } +} + +// Entry point for the replication worker (goroutine) +// +// i is an arbitrary index of the current routine. +func (c *Controller) replicate(i int) { + for { + err := c.tryReplicate() + if err != nil { + // Sleep in proportion to the (arbitrary) index to space out the + // workers further. 
+ time.Sleep(time.Duration(i+1) * time.Minute) + } + } +} + +func (c *Controller) tryReplicate() error { + newLockTime := enteTime.MicrosecondsAfterMinutes(60) + ctx, cancelFun := context.WithTimeout(context.Background(), 50*time.Minute) + defer cancelFun() + row, err := c.Repo.GetPendingSyncDataAndExtendLock(ctx, newLockTime, false) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + log.Errorf("Could not fetch row for replication: %s", err) + } + return err + } + err = c.replicateRowData(ctx, *row) + if err != nil { + log.Errorf("Could not replicate file data: %s", err) + return err + } else { + // If the replication was completed without any errors, we can reset the lock time + return c.Repo.ResetSyncLock(ctx, *row, newLockTime) + } +} + +func (c *Controller) replicateRowData(ctx context.Context, row filedata.Row) error { + wantInBucketIDs := map[string]bool{} + wantInBucketIDs[c.S3Config.GetBucketID(row.Type)] = true + rep := c.S3Config.GetReplicatedBuckets(row.Type) + for _, bucket := range rep { + wantInBucketIDs[bucket] = true + } + delete(wantInBucketIDs, row.LatestBucket) + for _, bucket := range row.ReplicatedBuckets { + delete(wantInBucketIDs, bucket) + } + if len(wantInBucketIDs) > 0 { + s3FileMetadata, err := c.downloadObject(ctx, row.S3FileMetadataObjectKey(), row.LatestBucket) + if err != nil { + return stacktrace.Propagate(err, "error fetching metadata object "+row.S3FileMetadataObjectKey()) + } + for bucketID := range wantInBucketIDs { + if err := c.uploadAndVerify(ctx, row, s3FileMetadata, bucketID); err != nil { + return stacktrace.Propagate(err, "error uploading and verifying metadata object") + } + } + } else { + log.Infof("No replication pending for file %d and type %s", row.FileID, string(row.Type)) + } + return c.Repo.MarkReplicationAsDone(ctx, row) +} + +func (c *Controller) uploadAndVerify(ctx context.Context, row filedata.Row, s3FileMetadata filedata.S3FileMetadata, dstBucketID string) error { + if err := 
c.Repo.RegisterReplicationAttempt(ctx, row, dstBucketID); err != nil { + return stacktrace.Propagate(err, "could not register replication attempt") + } + metadataSize, err := c.uploadObject(s3FileMetadata, row.S3FileMetadataObjectKey(), dstBucketID) + if err != nil { + return err + } + if metadataSize != row.Size { + return fmt.Errorf("uploaded metadata size %d does not match expected size %d", metadataSize, row.Size) + } + return c.Repo.MoveBetweenBuckets(row, dstBucketID, fileDataRepo.InflightRepColumn, fileDataRepo.ReplicationColumn) +} diff --git a/server/pkg/controller/replication3.go b/server/pkg/controller/replication3.go index 4fad173ea2..6fe1c0f2bc 100644 --- a/server/pkg/controller/replication3.go +++ b/server/pkg/controller/replication3.go @@ -308,7 +308,7 @@ func (c *ReplicationController3) tryReplicate() error { return commit(nil) } - err = ensureSufficientSpace(ob.Size) + err = file.EnsureSufficientSpace(ob.Size) if err != nil { // We don't have free space right now, maybe because other big files are // being downloaded simultanously, but we might get space later, so mark @@ -360,25 +360,6 @@ func (c *ReplicationController3) tryReplicate() error { return commit(err) } -// Return an error if we risk running out of disk space if we try to download -// and write a file of size. -// -// This function keeps a buffer of 1 GB free space in its calculations. -func ensureSufficientSpace(size int64) error { - free, err := file.FreeSpace("/") - if err != nil { - return stacktrace.Propagate(err, "Failed to fetch free space") - } - - gb := uint64(1024) * 1024 * 1024 - need := uint64(size) + (2 * gb) - if free < need { - return fmt.Errorf("insufficient space on disk (need %d bytes, free %d bytes)", size, free) - } - - return nil -} - // Create a temporary file for storing objectKey. Return both the path to the // file, and the handle to the file. 
// diff --git a/server/pkg/repo/filedata/repository.go b/server/pkg/repo/filedata/repository.go index bdf184591e..0293b05f0f 100644 --- a/server/pkg/repo/filedata/repository.go +++ b/server/pkg/repo/filedata/repository.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/ente-io/museum/ente" "github.com/ente-io/museum/ente/filedata" + "github.com/ente-io/museum/pkg/utils/array" "github.com/ente-io/stacktrace" "github.com/lib/pq" "time" @@ -194,6 +195,37 @@ func (r *Repository) GetPendingSyncDataAndExtendLock(ctx context.Context, newSyn return &fileData, nil } +// MarkReplicationAsDone marks the pending_sync as false for the file data row +func (r *Repository) MarkReplicationAsDone(ctx context.Context, row filedata.Row) error { + query := `UPDATE file_data SET pending_sync = false WHERE is_deleted=false and file_id = $1 AND data_type = $2 AND user_id = $3` + _, err := r.DB.ExecContext(ctx, query, row.FileID, string(row.Type), row.UserID) + if err != nil { + return stacktrace.Propagate(err, "") + } + return nil +} + +func (r *Repository) RegisterReplicationAttempt(ctx context.Context, row filedata.Row, dstBucketID string) error { + if array.StringInList(dstBucketID, row.DeleteFromBuckets) { + return r.MoveBetweenBuckets(row, dstBucketID, DeletionColumn, InflightRepColumn) + } + if !array.StringInList(dstBucketID, row.InflightReplicas) { + return r.AddBucket(row, dstBucketID, InflightRepColumn) + } + return nil +} + +// ResetSyncLock resets the sync_locked_till to now_utc_micro_seconds() for the file data row only if pending_sync is false and +// the input syncLockedTill is equal to the existing sync_locked_till. 
This is used to reset the lock after the replication is done +func (r *Repository) ResetSyncLock(ctx context.Context, row filedata.Row, syncLockedTill int64) error { + query := `UPDATE file_data SET sync_locked_till = now_utc_micro_seconds() WHERE pending_sync = false and file_id = $1 AND data_type = $2 AND user_id = $3 AND sync_locked_till = $4` + _, err := r.DB.ExecContext(ctx, query, row.FileID, string(row.Type), row.UserID, syncLockedTill) + if err != nil { + return stacktrace.Propagate(err, "") + } + return nil +} + func (r *Repository) DeleteFileData(ctx context.Context, row filedata.Row) error { query := ` DELETE FROM file_data diff --git a/server/pkg/utils/file/file.go b/server/pkg/utils/file/file.go index db94347026..5e1872e59c 100644 --- a/server/pkg/utils/file/file.go +++ b/server/pkg/utils/file/file.go @@ -35,6 +35,24 @@ func FreeSpace(path string) (uint64, error) { return fs.Bfree * uint64(fs.Bsize), nil } +// EnsureSufficientSpace Return an error if we risk running out of disk space if we try to download +// and write a file of size. +// This function keeps a buffer of 2 GB free space in its calculations. +func EnsureSufficientSpace(size int64) error { + free, err := FreeSpace("/") + if err != nil { + return stacktrace.Propagate(err, "Failed to fetch free space") + } + + gb := uint64(1024) * 1024 * 1024 + need := uint64(size) + (2 * gb) + if free < need { + return fmt.Errorf("insufficient space on disk (need %d bytes, free %d bytes)", size, free) + } + + return nil +} + func GetLockNameForObject(objectKey string) string { return fmt.Sprintf("Object:%s", objectKey) }