From 272d17615ec44087998cfc20c7f18e6c382c3ff0 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Thu, 8 Aug 2024 13:03:34 +0530 Subject: [PATCH] [server] Remove fileData cleanup via queue --- server/cmd/museum/main.go | 13 +- server/pkg/controller/filedata/delete.go | 135 +++++++++----------- server/pkg/controller/filedata/replicate.go | 6 + server/pkg/repo/object.go | 7 +- server/pkg/repo/queue.go | 2 - 5 files changed, 72 insertions(+), 91 deletions(-) create mode 100644 server/pkg/controller/filedata/replicate.go diff --git a/server/cmd/museum/main.go b/server/cmd/museum/main.go index 10b5a7fc6c..8c9c88f304 100644 --- a/server/cmd/museum/main.go +++ b/server/cmd/museum/main.go @@ -706,11 +706,11 @@ func main() { publicAPI.GET("/offers/black-friday", offerHandler.GetBlackFridayOffers) setKnownAPIs(server.Routes()) - setupAndStartBackgroundJobs(objectCleanupController, replicationController3) + setupAndStartBackgroundJobs(objectCleanupController, replicationController3, fileDataCtrl) setupAndStartCrons( userAuthRepo, publicCollectionRepo, twoFactorRepo, passkeysRepo, fileController, taskLockingRepo, emailNotificationCtrl, trashController, pushController, objectController, dataCleanupController, storageBonusCtrl, - embeddingController, fileDataCtrl, healthCheckHandler, kexCtrl, castDb) + embeddingController, healthCheckHandler, kexCtrl, castDb) // Create a new collector, the name will be used as a label on the metrics collector := sqlstats.NewStatsCollector("prod_db", db) @@ -816,6 +816,7 @@ func setupDatabase() *sql.DB { func setupAndStartBackgroundJobs( objectCleanupController *controller.ObjectCleanupController, replicationController3 *controller.ReplicationController3, + fileDataCtrl *filedata.Controller, ) { isReplicationEnabled := viper.GetBool("replication.enabled") if isReplicationEnabled { @@ -823,9 +824,14 @@ func setupAndStartBackgroundJobs( if err != nil { log.Warnf("Could not start replication v3: %s", err) } + err = fileDataCtrl.StartReplication() + if err != nil { + log.Warnf("Could not start fileData replication: %s", err) + } } else { log.Info("Skipping Replication as replication is disabled") } + fileDataCtrl.StartDataDeletion() // Start data deletion for file data; objectCleanupController.StartRemovingUnreportedObjects() objectCleanupController.StartClearingOrphanObjects() @@ -839,7 +845,6 @@ func setupAndStartCrons(userAuthRepo *repo.UserAuthRepository, publicCollectionR dataCleanupCtrl *dataCleanupCtrl.DeleteUserCleanupController, storageBonusCtrl *storagebonus.Controller, embeddingCtrl *embeddingCtrl.Controller, - fileDataCtrl *filedata.Controller, healthCheckHandler *api.HealthCheckHandler, kexCtrl *kexCtrl.Controller, castDb castRepo.Repository) { @@ -883,10 +888,8 @@ func setupAndStartCrons(userAuthRepo *repo.UserAuthRepository, publicCollectionR schedule(c, "@every 2m", func() { fileController.CleanupDeletedFiles() }) - fileDataCtrl.CleanUpDeletedFileData() schedule(c, "@every 101s", func() { embeddingCtrl.CleanupDeletedEmbeddings() - fileDataCtrl.CleanUpDeletedFileData() }) schedule(c, "@every 10m", func() { diff --git a/server/pkg/controller/filedata/delete.go b/server/pkg/controller/filedata/delete.go index 1673e7c0fc..799844d06b 100644 --- a/server/pkg/controller/filedata/delete.go +++ b/server/pkg/controller/filedata/delete.go @@ -4,93 +4,58 @@ import ( "context" "fmt" "github.com/ente-io/museum/ente/filedata" - "github.com/ente-io/museum/pkg/repo" fileDataRepo "github.com/ente-io/museum/pkg/repo/filedata" - 
"github.com/ente-io/museum/pkg/utils/time" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" - "strconv" ) -// CleanUpDeletedFileData clears associated file data from the object store -func (c *Controller) CleanUpDeletedFileData() { +// StartDataDeletion clears associated file data from the object store +func (c *Controller) StartDataDeletion() { log.Info("Cleaning up deleted file data") - if c.cleanupCronRunning { - log.Info("Skipping CleanUpDeletedFileData cron run as another instance is still running") - return - } - c.cleanupCronRunning = true - defer func() { - c.cleanupCronRunning = false - }() - items, err := c.QueueRepo.GetItemsReadyForDeletion(repo.DeleteFileDataQueue, 200) - if err != nil { - log.WithError(err).Error("Failed to fetch items from queue") - return - } - for _, i := range items { - c.deleteFileData(i) - } + // todo: start goroutine workers to delete data + } -func (c *Controller) deleteFileData(qItem repo.QueueItem) { - lockName := fmt.Sprintf("FileDataDelete:%s", qItem.Item) - lockStatus, err := c.TaskLockingRepo.AcquireLock(lockName, time.MicrosecondsAfterHours(1), c.HostName) - ctxLogger := log.WithField("item", qItem.Item).WithField("queue_id", qItem.Id) - if err != nil || !lockStatus { - ctxLogger.Warn("unable to acquire lock") - return - } - defer func() { - err = c.TaskLockingRepo.ReleaseLock(lockName) - if err != nil { - ctxLogger.Errorf("Error while releasing lock %s", err) - } - }() - ctxLogger.Debug("Deleting all file data") - fileID, _ := strconv.ParseInt(qItem.Item, 10, 64) +func (c *Controller) DeleteFileData(fileID int64) error { ownerID, err := c.FileRepo.GetOwnerID(fileID) if err != nil { - ctxLogger.WithError(err).Error("Failed to fetch ownerID") - return + return err } rows, err := c.Repo.GetFileData(context.Background(), fileID) if err != nil { - ctxLogger.WithError(err).Error("Failed to fetch datacenters") - return + return err } for i := range rows { fileDataRow := rows[i] + ctxLogger := log.WithField("file_id", fileDataRow.DeleteFromBuckets).WithField("type", fileDataRow.Type).WithField("user_id", fileDataRow.UserID) objectKeys := filedata.AllObjects(fileID, ownerID, fileDataRow.Type) - // Delete from delete/stale buckets - for j := range fileDataRow.DeleteFromBuckets { - bucketID := fileDataRow.DeleteFromBuckets[j] - for k := range objectKeys { - err = c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKeys[k], bucketID) - if err != nil { - ctxLogger.WithError(err).Error("Failed to delete object from datacenter") - return - } - } - dbErr := c.Repo.RemoveBucket(fileDataRow, bucketID, fileDataRepo.DeletionColumn) - if dbErr != nil { - ctxLogger.WithError(dbErr).Error("Failed to remove from db") - return - } + bucketColumnMap := make(map[string]string) + bucketColumnMap, err = getMapOfbucketItToColumn(fileDataRow) + if err != nil { + ctxLogger.WithError(err).Error("Failed to get bucketColumnMap") + return err } - // Delete from replicated buckets - for j := range fileDataRow.ReplicatedBuckets { - bucketID := fileDataRow.ReplicatedBuckets[j] - for k := range objectKeys { - err = c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKeys[k], bucketID) + // Delete objects and remove buckets + for bucketID, columnName := range bucketColumnMap { + for _, objectKey := range objectKeys { + err := c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKey, bucketID) if err != nil { - ctxLogger.WithError(err).Error("Failed to delete object from datacenter") - return + ctxLogger.WithError(err).WithFields(logrus.Fields{ + 
"bucketID": bucketID, + "column": columnName, + "objectKey": objectKey, + }).Error("Failed to delete object from datacenter") + return err } } - dbErr := c.Repo.RemoveBucket(fileDataRow, bucketID, fileDataRepo.ReplicationColumn) + dbErr := c.Repo.RemoveBucket(fileDataRow, bucketID, columnName) if dbErr != nil { - ctxLogger.WithError(dbErr).Error("Failed to remove from db") - return + ctxLogger.WithError(dbErr).WithFields(logrus.Fields{ + "bucketID": bucketID, + "column": columnName, + }).Error("Failed to remove bucket from db") + return dbErr + } } // Delete from Latest bucket @@ -98,23 +63,37 @@ func (c *Controller) deleteFileData(qItem repo.QueueItem) { err = c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKeys[k], fileDataRow.LatestBucket) if err != nil { ctxLogger.WithError(err).Error("Failed to delete object from datacenter") - return + return err } } dbErr := c.Repo.DeleteFileData(context.Background(), fileDataRow) if dbErr != nil { ctxLogger.WithError(dbErr).Error("Failed to remove from db") - return + return err } } - if err != nil { - ctxLogger.WithError(err).Error("Failed delete data") - return - } - err = c.QueueRepo.DeleteItem(repo.DeleteFileDataQueue, qItem.Item) - if err != nil { - ctxLogger.WithError(err).Error("Failed to remove item from the queue") - return - } - ctxLogger.Info("Successfully deleted all file data") + return nil +} + +func getMapOfbucketItToColumn(row filedata.Row) (map[string]string, error) { + bucketColumnMap := make(map[string]string) + for _, bucketID := range row.DeleteFromBuckets { + if existingColumn, exists := bucketColumnMap[bucketID]; exists { + return nil, fmt.Errorf("Duplicate DeleteFromBuckets ID found: %s in column %s", bucketID, existingColumn) + } + bucketColumnMap[bucketID] = fileDataRepo.DeletionColumn + } + for _, bucketID := range row.ReplicatedBuckets { + if existingColumn, exists := bucketColumnMap[bucketID]; exists { + return nil, fmt.Errorf("Duplicate ReplicatedBuckets ID found: %s in column %s", bucketID, existingColumn) + } + bucketColumnMap[bucketID] = fileDataRepo.ReplicationColumn + } + for _, bucketID := range row.InflightReplicas { + if existingColumn, exists := bucketColumnMap[bucketID]; exists { + return nil, fmt.Errorf("Duplicate InFlightBucketID found: %s in column %s", bucketID, existingColumn) + } + bucketColumnMap[bucketID] = fileDataRepo.InflightRepColumn + } + return bucketColumnMap, nil } diff --git a/server/pkg/controller/filedata/replicate.go b/server/pkg/controller/filedata/replicate.go new file mode 100644 index 0000000000..7baa814199 --- /dev/null +++ b/server/pkg/controller/filedata/replicate.go @@ -0,0 +1,6 @@ +package filedata + +func (c *Controller) StartReplication() error { + // todo: implement replication logic + return nil +} diff --git a/server/pkg/repo/object.go b/server/pkg/repo/object.go index fc02e2d25c..e5e7e49996 100644 --- a/server/pkg/repo/object.go +++ b/server/pkg/repo/object.go @@ -148,7 +148,7 @@ func (repo *ObjectRepository) MarkObjectsAsDeletedForFileIDs(ctx context.Context for _, fileID := range fileIDs { embeddingsToBeDeleted = append(embeddingsToBeDeleted, strconv.FormatInt(fileID, 10)) } - _, err = tx.ExecContext(ctx, `UPDATE file_data SET is_deleted = TRUE WHERE file_id = ANY($1)`, pq.Array(fileIDs)) + _, err = tx.ExecContext(ctx, `UPDATE file_data SET is_deleted = TRUE, pending_sync = TRUE WHERE file_id = ANY($1)`, pq.Array(fileIDs)) if err != nil { return nil, stacktrace.Propagate(err, "") } @@ -158,11 +158,6 @@ func (repo *ObjectRepository) 
MarkObjectsAsDeletedForFileIDs(ctx context.Context return nil, stacktrace.Propagate(err, "") } - err = repo.QueueRepo.AddItems(ctx, tx, DeleteFileDataQueue, embeddingsToBeDeleted) - if err != nil { - return nil, stacktrace.Propagate(err, "") - } - _, err = tx.ExecContext(ctx, `UPDATE object_keys SET is_deleted = TRUE WHERE file_id = ANY($1)`, pq.Array(fileIDs)) if err != nil { return nil, stacktrace.Propagate(err, "") diff --git a/server/pkg/repo/queue.go b/server/pkg/repo/queue.go index e4800aea9c..49544dbc8c 100644 --- a/server/pkg/repo/queue.go +++ b/server/pkg/repo/queue.go @@ -23,7 +23,6 @@ var itemDeletionDelayInMinMap = map[string]int64{ DropFileEncMedataQueue: -1 * 24 * 60, // -ve value to ensure attributes are immediately removed DeleteObjectQueue: 45 * 24 * 60, // 45 days in minutes DeleteEmbeddingsQueue: -1 * 24 * 60, // -ve value to ensure embeddings are immediately removed - DeleteFileDataQueue: -1 * 24 * 60, // -ve value to ensure file-data is immediately removed TrashCollectionQueueV3: -1 * 24 * 60, // -ve value to ensure collections are immediately marked as trashed TrashEmptyQueue: -1 * 24 * 60, // -ve value to ensure empty trash request are processed in next cron run RemoveComplianceHoldQueue: -1 * 24 * 60, // -ve value to ensure compliance hold is removed in next cron run @@ -33,7 +32,6 @@ const ( DropFileEncMedataQueue string = "dropFileEncMetata" DeleteObjectQueue string = "deleteObject" DeleteEmbeddingsQueue string = "deleteEmbedding" - DeleteFileDataQueue string = "deleteFileData" OutdatedObjectsQueue string = "outdatedObject" // Deprecated: Keeping it till we clean up items from the queue DB. TrashCollectionQueue string = "trashCollection"
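
Note on the deletion TODO: the new StartDataDeletion entry point is wired into setupAndStartBackgroundJobs, but its body is still a placeholder ("start goroutine workers to delete data"). Below is a minimal sketch of what such a worker could look like. It assumes a hypothetical repository helper, GetPendingDeletionFileIDs, returning file IDs whose file_data rows have is_deleted = TRUE and pending_sync = TRUE (the flags this patch sets in MarkObjectsAsDeletedForFileIDs); neither that helper nor the worker shape is part of this change.

    package filedata

    import (
        "context"
        "time"

        log "github.com/sirupsen/logrus"
    )

    // startDeleteWorkers is a hypothetical sketch only. GetPendingDeletionFileIDs
    // does not exist in this patch; DeleteFileData and the pending_sync flag do.
    func (c *Controller) startDeleteWorkers(workerCount int) {
        for i := 0; i < workerCount; i++ {
            go func() {
                for {
                    // Assumed helper: file IDs whose file_data rows have
                    // is_deleted = TRUE AND pending_sync = TRUE, i.e. the rows
                    // flagged by MarkObjectsAsDeletedForFileIDs in this patch.
                    fileIDs, err := c.Repo.GetPendingDeletionFileIDs(context.Background(), 200)
                    if err != nil || len(fileIDs) == 0 {
                        time.Sleep(time.Minute) // back off when idle or on error
                        continue
                    }
                    for _, fileID := range fileIDs {
                        // DeleteFileData already removes the objects from every bucket
                        // and deletes the matching file_data rows, so no separate
                        // acknowledgement step is sketched here.
                        if err := c.DeleteFileData(fileID); err != nil {
                            log.WithError(err).WithField("file_id", fileID).Error("failed to delete file data")
                        }
                    }
                }
            }()
        }
    }

A small fixed pool of workers roughly preserves the throughput of the removed cron (batches of 200 queue items) without the queue and per-item task-lock plumbing this patch deletes.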
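
Note on the bucket/column map: getMapOfbucketItToColumn consolidates the three bucket lists on a file_data row into a single bucketID-to-column map so deletion can treat every copy uniformly. A small illustrative call, with a Row carrying one bucket in each state (the bucket IDs are made up and the remaining Row fields are left at their zero values):

    package filedata

    import (
        "fmt"

        "github.com/ente-io/museum/ente/filedata"
    )

    // Illustrative only; bucket IDs are invented and other Row fields are zero.
    func exampleBucketColumnMap() {
        row := filedata.Row{
            DeleteFromBuckets: []string{"b5"}, // stale copy, maps to fileDataRepo.DeletionColumn
            ReplicatedBuckets: []string{"b2"}, // existing replica, maps to fileDataRepo.ReplicationColumn
            InflightReplicas:  []string{"b3"}, // replication in progress, maps to fileDataRepo.InflightRepColumn
        }
        m, err := getMapOfbucketItToColumn(row)
        // Expect three entries and a nil error; a bucket ID that appears in more
        // than one list would return an error instead.
        fmt.Println(m, err)
    }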
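
Note on the replication TODO: StartReplication in the new replicate.go is likewise a stub ("implement replication logic"), started from setupAndStartBackgroundJobs when replication.enabled is set. A speculative sketch of the loop it might grow into; GetRowsPendingReplication, pickReplicaBucket, copyObject and MarkReplicated are invented names, row.FileID is assumed to exist on filedata.Row, and only filedata.AllObjects, the other Row fields used below, and the fileDataRepo column constants appear in the diff.

    package filedata

    import (
        "context"
        "time"

        "github.com/ente-io/museum/ente/filedata"
        log "github.com/sirupsen/logrus"
    )

    // replicateLoop is a speculative sketch only; the helpers it calls are
    // placeholders, not part of this patch.
    func (c *Controller) replicateLoop() {
        for {
            // Assumed helper: rows whose replication is still outstanding.
            rows, err := c.Repo.GetRowsPendingReplication(context.Background(), 200)
            if err != nil || len(rows) == 0 {
                time.Sleep(time.Minute)
                continue
            }
            for _, row := range rows {
                // Assumed helper: pick a destination bucket that has no copy yet.
                dstBucket := c.pickReplicaBucket(row)
                copied := true
                for _, objectKey := range filedata.AllObjects(row.FileID, row.UserID, row.Type) {
                    // Assumed helper: server-side copy from the latest bucket to the replica.
                    if err := c.copyObject(objectKey, row.LatestBucket, dstBucket); err != nil {
                        log.WithError(err).WithField("object_key", objectKey).Error("replication copy failed")
                        copied = false
                        break
                    }
                }
                if !copied {
                    continue
                }
                // Assumed helper: move dstBucket from fileDataRepo.InflightRepColumn
                // to fileDataRepo.ReplicationColumn for this row.
                if err := c.Repo.MarkReplicated(context.Background(), row, dstBucket); err != nil {
                    log.WithError(err).Error("failed to record replica")
                }
            }
        }
    }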