[server] Remove fileData cleanup via queue

This commit is contained in:
Neeraj Gupta
2024-08-08 13:03:34 +05:30
parent 50f6fd7440
commit 272d17615e
5 changed files with 72 additions and 91 deletions

View File

@@ -706,11 +706,11 @@ func main() {
publicAPI.GET("/offers/black-friday", offerHandler.GetBlackFridayOffers)
setKnownAPIs(server.Routes())
setupAndStartBackgroundJobs(objectCleanupController, replicationController3)
setupAndStartBackgroundJobs(objectCleanupController, replicationController3, fileDataCtrl)
setupAndStartCrons(
userAuthRepo, publicCollectionRepo, twoFactorRepo, passkeysRepo, fileController, taskLockingRepo, emailNotificationCtrl,
trashController, pushController, objectController, dataCleanupController, storageBonusCtrl,
embeddingController, fileDataCtrl, healthCheckHandler, kexCtrl, castDb)
embeddingController, healthCheckHandler, kexCtrl, castDb)
// Create a new collector, the name will be used as a label on the metrics
collector := sqlstats.NewStatsCollector("prod_db", db)
@@ -816,6 +816,7 @@ func setupDatabase() *sql.DB {
func setupAndStartBackgroundJobs(
objectCleanupController *controller.ObjectCleanupController,
replicationController3 *controller.ReplicationController3,
fileDataCtrl *filedata.Controller,
) {
isReplicationEnabled := viper.GetBool("replication.enabled")
if isReplicationEnabled {
@@ -823,9 +824,14 @@ func setupAndStartBackgroundJobs(
if err != nil {
log.Warnf("Could not start replication v3: %s", err)
}
err = fileDataCtrl.StartReplication()
if err != nil {
log.Warnf("Could not start fileData replication: %s", err)
}
} else {
log.Info("Skipping Replication as replication is disabled")
}
fileDataCtrl.StartDataDeletion() // Start data deletion for file data.
objectCleanupController.StartRemovingUnreportedObjects()
objectCleanupController.StartClearingOrphanObjects()
@@ -839,7 +845,6 @@ func setupAndStartCrons(userAuthRepo *repo.UserAuthRepository, publicCollectionR
dataCleanupCtrl *dataCleanupCtrl.DeleteUserCleanupController,
storageBonusCtrl *storagebonus.Controller,
embeddingCtrl *embeddingCtrl.Controller,
fileDataCtrl *filedata.Controller,
healthCheckHandler *api.HealthCheckHandler,
kexCtrl *kexCtrl.Controller,
castDb castRepo.Repository) {
@@ -883,10 +888,8 @@ func setupAndStartCrons(userAuthRepo *repo.UserAuthRepository, publicCollectionR
schedule(c, "@every 2m", func() {
fileController.CleanupDeletedFiles()
})
fileDataCtrl.CleanUpDeletedFileData()
schedule(c, "@every 101s", func() {
embeddingCtrl.CleanupDeletedEmbeddings()
fileDataCtrl.CleanUpDeletedFileData()
})
schedule(c, "@every 10m", func() {

View File

@@ -4,93 +4,58 @@ import (
"context"
"fmt"
"github.com/ente-io/museum/ente/filedata"
"github.com/ente-io/museum/pkg/repo"
fileDataRepo "github.com/ente-io/museum/pkg/repo/filedata"
"github.com/ente-io/museum/pkg/utils/time"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
"strconv"
)
// CleanUpDeletedFileData clears associated file data from the object store
func (c *Controller) CleanUpDeletedFileData() {
// StartDataDeletion clears associated file data from the object store
func (c *Controller) StartDataDeletion() {
log.Info("Cleaning up deleted file data")
if c.cleanupCronRunning {
log.Info("Skipping CleanUpDeletedFileData cron run as another instance is still running")
return
}
c.cleanupCronRunning = true
defer func() {
c.cleanupCronRunning = false
}()
items, err := c.QueueRepo.GetItemsReadyForDeletion(repo.DeleteFileDataQueue, 200)
if err != nil {
log.WithError(err).Error("Failed to fetch items from queue")
return
}
for _, i := range items {
c.deleteFileData(i)
}
// TODO: start goroutine workers to delete data
}
func (c *Controller) deleteFileData(qItem repo.QueueItem) {
lockName := fmt.Sprintf("FileDataDelete:%s", qItem.Item)
lockStatus, err := c.TaskLockingRepo.AcquireLock(lockName, time.MicrosecondsAfterHours(1), c.HostName)
ctxLogger := log.WithField("item", qItem.Item).WithField("queue_id", qItem.Id)
if err != nil || !lockStatus {
ctxLogger.Warn("unable to acquire lock")
return
}
defer func() {
err = c.TaskLockingRepo.ReleaseLock(lockName)
if err != nil {
ctxLogger.Errorf("Error while releasing lock %s", err)
}
}()
ctxLogger.Debug("Deleting all file data")
fileID, _ := strconv.ParseInt(qItem.Item, 10, 64)
func (c *Controller) DeleteFileData(fileID int64) error {
ownerID, err := c.FileRepo.GetOwnerID(fileID)
if err != nil {
ctxLogger.WithError(err).Error("Failed to fetch ownerID")
return
return err
}
rows, err := c.Repo.GetFileData(context.Background(), fileID)
if err != nil {
ctxLogger.WithError(err).Error("Failed to fetch datacenters")
return
return err
}
for i := range rows {
fileDataRow := rows[i]
ctxLogger := log.WithField("file_id", fileDataRow.DeleteFromBuckets).WithField("type", fileDataRow.Type).WithField("user_id", fileDataRow.UserID)
objectKeys := filedata.AllObjects(fileID, ownerID, fileDataRow.Type)
// Delete from delete/stale buckets
for j := range fileDataRow.DeleteFromBuckets {
bucketID := fileDataRow.DeleteFromBuckets[j]
for k := range objectKeys {
err = c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKeys[k], bucketID)
if err != nil {
ctxLogger.WithError(err).Error("Failed to delete object from datacenter")
return
}
}
dbErr := c.Repo.RemoveBucket(fileDataRow, bucketID, fileDataRepo.DeletionColumn)
if dbErr != nil {
ctxLogger.WithError(dbErr).Error("Failed to remove from db")
return
}
bucketColumnMap := make(map[string]string)
bucketColumnMap, err = getMapOfbucketItToColumn(fileDataRow)
if err != nil {
ctxLogger.WithError(err).Error("Failed to get bucketColumnMap")
return err
}
// Delete from replicated buckets
for j := range fileDataRow.ReplicatedBuckets {
bucketID := fileDataRow.ReplicatedBuckets[j]
for k := range objectKeys {
err = c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKeys[k], bucketID)
// Delete objects and remove buckets
for bucketID, columnName := range bucketColumnMap {
for _, objectKey := range objectKeys {
err := c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKey, bucketID)
if err != nil {
ctxLogger.WithError(err).Error("Failed to delete object from datacenter")
return
ctxLogger.WithError(err).WithFields(logrus.Fields{
"bucketID": bucketID,
"column": columnName,
"objectKey": objectKey,
}).Error("Failed to delete object from datacenter")
return err
}
}
dbErr := c.Repo.RemoveBucket(fileDataRow, bucketID, fileDataRepo.ReplicationColumn)
dbErr := c.Repo.RemoveBucket(fileDataRow, bucketID, columnName)
if dbErr != nil {
ctxLogger.WithError(dbErr).Error("Failed to remove from db")
return
ctxLogger.WithError(dbErr).WithFields(logrus.Fields{
"bucketID": bucketID,
"column": columnName,
}).Error("Failed to remove bucket from db")
return dbErr
}
}
// Delete from Latest bucket
@@ -98,23 +63,37 @@ func (c *Controller) deleteFileData(qItem repo.QueueItem) {
err = c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKeys[k], fileDataRow.LatestBucket)
if err != nil {
ctxLogger.WithError(err).Error("Failed to delete object from datacenter")
return
return err
}
}
dbErr := c.Repo.DeleteFileData(context.Background(), fileDataRow)
if dbErr != nil {
ctxLogger.WithError(dbErr).Error("Failed to remove from db")
return
return err
}
}
if err != nil {
ctxLogger.WithError(err).Error("Failed delete data")
return
}
err = c.QueueRepo.DeleteItem(repo.DeleteFileDataQueue, qItem.Item)
if err != nil {
ctxLogger.WithError(err).Error("Failed to remove item from the queue")
return
}
ctxLogger.Info("Successfully deleted all file data")
return nil
}
func getMapOfbucketItToColumn(row filedata.Row) (map[string]string, error) {
bucketColumnMap := make(map[string]string)
for _, bucketID := range row.DeleteFromBuckets {
if existingColumn, exists := bucketColumnMap[bucketID]; exists {
return nil, fmt.Errorf("Duplicate DeleteFromBuckets ID found: %s in column %s", bucketID, existingColumn)
}
bucketColumnMap[bucketID] = fileDataRepo.DeletionColumn
}
for _, bucketID := range row.ReplicatedBuckets {
if existingColumn, exists := bucketColumnMap[bucketID]; exists {
return nil, fmt.Errorf("Duplicate ReplicatedBuckets ID found: %s in column %s", bucketID, existingColumn)
}
bucketColumnMap[bucketID] = fileDataRepo.ReplicationColumn
}
for _, bucketID := range row.InflightReplicas {
if existingColumn, exists := bucketColumnMap[bucketID]; exists {
return nil, fmt.Errorf("Duplicate InFlightBucketID found: %s in column %s", bucketID, existingColumn)
}
bucketColumnMap[bucketID] = fileDataRepo.InflightRepColumn
}
return bucketColumnMap, nil
}

View File

@@ -0,0 +1,6 @@
package filedata
func (c *Controller) StartReplication() error {
// TODO: implement replication logic
return nil
}

View File

@@ -148,7 +148,7 @@ func (repo *ObjectRepository) MarkObjectsAsDeletedForFileIDs(ctx context.Context
for _, fileID := range fileIDs {
embeddingsToBeDeleted = append(embeddingsToBeDeleted, strconv.FormatInt(fileID, 10))
}
_, err = tx.ExecContext(ctx, `UPDATE file_data SET is_deleted = TRUE WHERE file_id = ANY($1)`, pq.Array(fileIDs))
_, err = tx.ExecContext(ctx, `UPDATE file_data SET is_deleted = TRUE, pending_sync = TRUE WHERE file_id = ANY($1)`, pq.Array(fileIDs))
if err != nil {
return nil, stacktrace.Propagate(err, "")
}
@@ -158,11 +158,6 @@ func (repo *ObjectRepository) MarkObjectsAsDeletedForFileIDs(ctx context.Context
return nil, stacktrace.Propagate(err, "")
}
err = repo.QueueRepo.AddItems(ctx, tx, DeleteFileDataQueue, embeddingsToBeDeleted)
if err != nil {
return nil, stacktrace.Propagate(err, "")
}
_, err = tx.ExecContext(ctx, `UPDATE object_keys SET is_deleted = TRUE WHERE file_id = ANY($1)`, pq.Array(fileIDs))
if err != nil {
return nil, stacktrace.Propagate(err, "")

View File

@@ -23,7 +23,6 @@ var itemDeletionDelayInMinMap = map[string]int64{
DropFileEncMedataQueue: -1 * 24 * 60, // -ve value to ensure attributes are immediately removed
DeleteObjectQueue: 45 * 24 * 60, // 45 days in minutes
DeleteEmbeddingsQueue: -1 * 24 * 60, // -ve value to ensure embeddings are immediately removed
DeleteFileDataQueue: -1 * 24 * 60, // -ve value to ensure file-data is immediately removed
TrashCollectionQueueV3: -1 * 24 * 60, // -ve value to ensure collections are immediately marked as trashed
TrashEmptyQueue: -1 * 24 * 60, // -ve value to ensure empty trash request are processed in next cron run
RemoveComplianceHoldQueue: -1 * 24 * 60, // -ve value to ensure compliance hold is removed in next cron run
@@ -33,7 +32,6 @@ const (
DropFileEncMedataQueue string = "dropFileEncMetata"
DeleteObjectQueue string = "deleteObject"
DeleteEmbeddingsQueue string = "deleteEmbedding"
DeleteFileDataQueue string = "deleteFileData"
OutdatedObjectsQueue string = "outdatedObject"
// Deprecated: Keeping it till we clean up items from the queue DB.
TrashCollectionQueue string = "trashCollection"