From 86ad432d5b153b4f69a7f65f5baacd2e44b48b4e Mon Sep 17 00:00:00 2001
From: Neeraj Gupta <254676+ua741@users.noreply.github.com>
Date: Thu, 8 Aug 2024 14:16:40 +0530
Subject: [PATCH] [server] Implement file data deleted using existing table as queue

---
 server/pkg/controller/filedata/delete.go | 174 ++++++++++++++++-------
 server/pkg/repo/filedata/repository.go   |  41 +++++
 2 files changed, 165 insertions(+), 50 deletions(-)

diff --git a/server/pkg/controller/filedata/delete.go b/server/pkg/controller/filedata/delete.go
index 799844d06b..d6d07cd24d 100644
--- a/server/pkg/controller/filedata/delete.go
+++ b/server/pkg/controller/filedata/delete.go
@@ -2,96 +2,170 @@ package filedata
 
 import (
 	"context"
+	"database/sql"
+	"errors"
 	"fmt"
 	"github.com/ente-io/museum/ente/filedata"
 	fileDataRepo "github.com/ente-io/museum/pkg/repo/filedata"
+	enteTime "github.com/ente-io/museum/pkg/utils/time"
 	"github.com/sirupsen/logrus"
 	log "github.com/sirupsen/logrus"
+	"time"
 )
 
 // StartDataDeletion clears associated file data from the object store
 func (c *Controller) StartDataDeletion() {
-	log.Info("Cleaning up deleted file data")
-	// todo: start goroutine workers to delete data
-
+	go c.startDeleteWorkers(5)
 }
 
-func (c *Controller) DeleteFileData(fileID int64) error {
-	ownerID, err := c.FileRepo.GetOwnerID(fileID)
+// startDeleteWorkers launches n delete workers, pausing between launches so
+// that their start times are staggered.
+func (c *Controller) startDeleteWorkers(n int) {
+	log.Infof("Starting %d delete workers for fileData", n)
+
+	for i := 0; i < n; i++ {
+		go c.delete(i)
+		// Stagger the workers
+		time.Sleep(time.Duration(2*i+1) * time.Second)
+	}
+}
+
+// Entry point for the delete worker (goroutine)
+//
+// i is an arbitrary index of the current routine.
+func (c *Controller) delete(i int) {
+	// This is just
+	//
+	//   while (true) { delete() }
+	//
+	// but with an extra sleep for a bit if nothing got deleted - both when
+	// something's wrong, or there's nothing to do.
+	for {
+		err := c.tryDelete()
+		if err != nil {
+			// Sleep in proportion to the (arbitrary) index to space out the
+			// workers further.
+			time.Sleep(time.Duration(i+1) * time.Minute)
+		}
+	}
+}
+
+// tryDelete fetches one pending deleted file data row, extending its sync lock
+// to enteTime.MicrosecondsAfterMinutes(10), and deletes it via DeleteFileRow.
+func (c *Controller) tryDelete() error {
+	newLockTime := enteTime.MicrosecondsAfterMinutes(10)
+	row, err := c.Repo.GetPendingSyncDataAndExtendLock(context.Background(), newLockTime, true)
 	if err != nil {
+		if !errors.Is(err, sql.ErrNoRows) {
+			log.Errorf("Could not fetch row for deletion: %s", err)
+		}
 		return err
 	}
+	err = c.DeleteFileRow(*row)
+	if err != nil {
+		log.Errorf("Could not delete file data: %s", err)
+		return err
+	}
+	return nil
+}
+
+// DeleteFileData deletes every file data row stored for fileID, stopping at
+// the first row that fails.
+func (c *Controller) DeleteFileData(fileID int64) error {
 	rows, err := c.Repo.GetFileData(context.Background(), fileID)
 	if err != nil {
 		return err
 	}
 	for i := range rows {
 		fileDataRow := rows[i]
-		ctxLogger := log.WithField("file_id", fileDataRow.DeleteFromBuckets).WithField("type", fileDataRow.Type).WithField("user_id", fileDataRow.UserID)
-		objectKeys := filedata.AllObjects(fileID, ownerID, fileDataRow.Type)
-		bucketColumnMap := make(map[string]string)
-		bucketColumnMap, err = getMapOfbucketItToColumn(fileDataRow)
+		err = c.DeleteFileRow(fileDataRow)
 		if err != nil {
-			ctxLogger.WithError(err).Error("Failed to get bucketColumnMap")
-			return err
-		}
-		// Delete objects and remove buckets
-		for bucketID, columnName := range bucketColumnMap {
-			for _, objectKey := range objectKeys {
-				err := c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKey, bucketID)
-				if err != nil {
-					ctxLogger.WithError(err).WithFields(logrus.Fields{
-						"bucketID":  bucketID,
-						"column":    columnName,
-						"objectKey": objectKey,
-					}).Error("Failed to delete object from datacenter")
-					return err
-				}
-			}
-			dbErr := c.Repo.RemoveBucket(fileDataRow, bucketID, columnName)
-			if dbErr != nil {
-				ctxLogger.WithError(dbErr).WithFields(logrus.Fields{
-					"bucketID": bucketID,
-					"column":   columnName,
-				}).Error("Failed to remove bucket from db")
-				return dbErr
-
-			}
-		}
-		// Delete from Latest bucket
-		for k := range objectKeys {
-			err = c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKeys[k], fileDataRow.LatestBucket)
-			if err != nil {
-				ctxLogger.WithError(err).Error("Failed to delete object from datacenter")
-				return err
-			}
-		}
-		dbErr := c.Repo.DeleteFileData(context.Background(), fileDataRow)
-		if dbErr != nil {
-			ctxLogger.WithError(dbErr).Error("Failed to remove from db")
 			return err
 		}
 	}
 	return nil
 }
 
-func getMapOfbucketItToColumn(row filedata.Row) (map[string]string, error) {
+// DeleteFileRow removes the objects of a deleted file data row from every
+// bucket recorded on the row and then deletes the row from the database. It
+// returns an error if the row is not marked as deleted, and panics if the
+// row's user is not the owner of the file.
+func (c *Controller) DeleteFileRow(fileDataRow filedata.Row) error {
+	if !fileDataRow.IsDeleted {
+		return fmt.Errorf("file %d is not marked as deleted", fileDataRow.FileID)
+	}
+	fileID := fileDataRow.FileID
+	ownerID, err := c.FileRepo.GetOwnerID(fileID)
+	if err != nil {
+		return err
+	}
+	if fileDataRow.UserID != ownerID {
+		// this should never happen
+		panic(fmt.Sprintf("file data for file %d belongs to user %d, but the file is owned by user %d", fileID, fileDataRow.UserID, ownerID))
+	}
+	ctxLogger := log.WithField("file_id", fileID).WithField("type", fileDataRow.Type).WithField("user_id", fileDataRow.UserID)
+	objectKeys := filedata.AllObjects(fileID, ownerID, fileDataRow.Type)
+	bucketColumnMap, err := getMapOfBucketItToColumn(fileDataRow)
+	if err != nil {
+		ctxLogger.WithError(err).Error("Failed to get bucketColumnMap")
+		return err
+	}
+	// Delete objects and remove buckets
+	for bucketID, columnName := range bucketColumnMap {
+		for _, objectKey := range objectKeys {
+			err := c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKey, bucketID)
+			if err != nil {
+				ctxLogger.WithError(err).WithFields(logrus.Fields{
+					"bucketID":  bucketID,
+					"column":    columnName,
+					"objectKey": objectKey,
+				}).Error("Failed to delete object from datacenter")
+				return err
+			}
+		}
+		dbErr := c.Repo.RemoveBucket(fileDataRow, bucketID, columnName)
+		if dbErr != nil {
+			ctxLogger.WithError(dbErr).WithFields(logrus.Fields{
+				"bucketID": bucketID,
+				"column":   columnName,
+			}).Error("Failed to remove bucket from db")
+			return dbErr
+		}
+	}
+	// Delete from Latest bucket
+	for k := range objectKeys {
+		err = c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKeys[k], fileDataRow.LatestBucket)
+		if err != nil {
+			ctxLogger.WithError(err).Error("Failed to delete object from datacenter")
+			return err
+		}
+	}
+	dbErr := c.Repo.DeleteFileData(context.Background(), fileDataRow)
+	if dbErr != nil {
+		ctxLogger.WithError(dbErr).Error("Failed to remove from db")
+		// err is nil at this point, so the database error is returned explicitly.
+		return dbErr
+	}
+	return nil
+}
+
+func getMapOfBucketItToColumn(row filedata.Row) (map[string]string, error) {
 	bucketColumnMap := make(map[string]string)
 	for _, bucketID := range row.DeleteFromBuckets {
 		if existingColumn, exists := bucketColumnMap[bucketID]; exists {
-			return nil, fmt.Errorf("Duplicate DeleteFromBuckets ID found: %s in column %s", bucketID, existingColumn)
+			return nil, fmt.Errorf("duplicate DeleteFromBuckets ID found: %s in column %s", bucketID, existingColumn)
 		}
 		bucketColumnMap[bucketID] = fileDataRepo.DeletionColumn
 	}
 	for _, bucketID := range row.ReplicatedBuckets {
 		if existingColumn, exists := bucketColumnMap[bucketID]; exists {
-			return nil, fmt.Errorf("Duplicate ReplicatedBuckets ID found: %s in column %s", bucketID, existingColumn)
+			return nil, fmt.Errorf("duplicate ReplicatedBuckets ID found: %s in column %s", bucketID, existingColumn)
 		}
 		bucketColumnMap[bucketID] = fileDataRepo.ReplicationColumn
 	}
 	for _, bucketID := range row.InflightReplicas {
 		if existingColumn, exists := bucketColumnMap[bucketID]; exists {
-			return nil, fmt.Errorf("Duplicate InFlightBucketID found: %s in column %s", bucketID, existingColumn)
+			return nil, fmt.Errorf("duplicate InFlightBucketID found: %s in column %s", bucketID, existingColumn)
 		}
 		bucketColumnMap[bucketID] = fileDataRepo.InflightRepColumn
 	}
diff --git a/server/pkg/repo/filedata/repository.go b/server/pkg/repo/filedata/repository.go
index 21208697bd..2ecabbb8b3 100644
--- a/server/pkg/repo/filedata/repository.go
+++ b/server/pkg/repo/filedata/repository.go
@@ -8,6 +8,7 @@ import (
 	"github.com/ente-io/museum/ente/filedata"
 	"github.com/ente-io/stacktrace"
 	"github.com/lib/pq"
+	"time"
 )
 
 // Repository defines the methods for inserting, updating, and retrieving file data.
@@ -157,6 +158,46 @@ func (r *Repository) MoveBetweenBuckets(row filedata.Row, bucketID string, sourc
 	return nil
 }
 
+// GetPendingSyncDataAndExtendLock selects, inside a transaction, a single
+// file data row with pending_sync = true, is_deleted = forDeletion and
+// sync_locked_till earlier than now_utc_micro_seconds(), and sets its
+// sync_locked_till to newSyncLockTime, which must be at least five minutes in
+// the future. When no row qualifies, Scan's sql.ErrNoRows is returned wrapped.
+func (r *Repository) GetPendingSyncDataAndExtendLock(ctx context.Context, newSyncLockTime int64, forDeletion bool) (*filedata.Row, error) {
+	// ensure newSyncLockTime is in the future
+	if newSyncLockTime < time.Now().Add(5*time.Minute).UnixMicro() {
+		return nil, stacktrace.NewError("newSyncLockTime should be at least 5min in the future")
+	}
+	tx, err := r.DB.BeginTx(ctx, nil)
+	if err != nil {
+		return nil, stacktrace.Propagate(err, "")
+	}
+	// Rollback reports sql.ErrTxDone once Commit has succeeded, so its result is ignored.
+	defer tx.Rollback()
+	row := tx.QueryRowContext(ctx, `SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets, inflight_rep_buckets, pending_sync, is_deleted, sync_locked_till, created_at, updated_at
+		FROM file_data
+		where pending_sync = true and is_deleted = $1 and sync_locked_till < now_utc_micro_seconds()
+		LIMIT 1
+		FOR UPDATE SKIP LOCKED`, forDeletion)
+	var fileData filedata.Row
+	err = row.Scan(&fileData.FileID, &fileData.UserID, &fileData.Type, &fileData.Size, &fileData.LatestBucket, pq.Array(&fileData.ReplicatedBuckets), pq.Array(&fileData.DeleteFromBuckets), pq.Array(&fileData.InflightReplicas), &fileData.PendingSync, &fileData.IsDeleted, &fileData.SyncLockedTill, &fileData.CreatedAt, &fileData.UpdatedAt)
+	if err != nil {
+		return nil, stacktrace.Propagate(err, "")
+	}
+	if fileData.SyncLockedTill > newSyncLockTime {
+		return nil, stacktrace.NewError(fmt.Sprintf("newSyncLockTime (%d) is less than existing SyncLockedTill (%d)", newSyncLockTime, fileData.SyncLockedTill))
+	}
+	_, err = tx.ExecContext(ctx, `UPDATE file_data SET sync_locked_till = $1 WHERE file_id = $2 AND data_type = $3 AND user_id = $4`, newSyncLockTime, fileData.FileID, string(fileData.Type), fileData.UserID)
+	if err != nil {
+		return nil, stacktrace.Propagate(err, "")
+	}
+	err = tx.Commit()
+	if err != nil {
+		return nil, stacktrace.Propagate(err, "")
+	}
+	return &fileData, nil
+}
+
 func (r *Repository) DeleteFileData(ctx context.Context, row filedata.Row) error {
 	query := `
 		DELETE FROM file_data