[server] Implement file data deletion using existing table as queue
This commit is contained in:
@@ -2,96 +2,160 @@ package filedata
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/ente-io/museum/ente/filedata"
|
||||
fileDataRepo "github.com/ente-io/museum/pkg/repo/filedata"
|
||||
enteTime "github.com/ente-io/museum/pkg/utils/time"
|
||||
"github.com/sirupsen/logrus"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"time"
|
||||
)
|
||||
|
||||
// StartDataDeletion clears associated file data from the object store
|
||||
func (c *Controller) StartDataDeletion() {
|
||||
log.Info("Cleaning up deleted file data")
|
||||
// todo: start goroutine workers to delete data
|
||||
|
||||
go c.startDeleteWorkers(5)
|
||||
}
|
||||
|
||||
func (c *Controller) DeleteFileData(fileID int64) error {
|
||||
ownerID, err := c.FileRepo.GetOwnerID(fileID)
|
||||
func (c *Controller) startDeleteWorkers(n int) {
|
||||
log.Infof("Starting %d delete workers for fileData", n)
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
go c.delete(i)
|
||||
// Stagger the workers
|
||||
time.Sleep(time.Duration(2*i+1) * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// Entry point for the delete worker (goroutine)
|
||||
//
|
||||
// i is an arbitrary index of the current routine.
|
||||
func (c *Controller) delete(i int) {
|
||||
// This is just
|
||||
//
|
||||
// while (true) { delete() }
|
||||
//
|
||||
// but with an extra sleep for a bit if nothing got deleted - both when
|
||||
// something's wrong, or there's nothing to do.
|
||||
for {
|
||||
err := c.tryDelete()
|
||||
if err != nil {
|
||||
// Sleep in proportion to the (arbitrary) index to space out the
|
||||
// workers further.
|
||||
time.Sleep(time.Duration(i+1) * time.Minute)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) tryDelete() error {
|
||||
newLockTime := enteTime.MicrosecondsAfterMinutes(10)
|
||||
row, err := c.Repo.GetPendingSyncDataAndExtendLock(context.Background(), newLockTime, true)
|
||||
if err != nil {
|
||||
if !errors.Is(err, sql.ErrNoRows) {
|
||||
log.Errorf("Could not fetch row for deletion: %s", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
err = c.DeleteFileRow(*row)
|
||||
if err != nil {
|
||||
log.Errorf("Could not delete file data: %s", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (c *Controller) DeleteFileData(fileID int64) error {
|
||||
rows, err := c.Repo.GetFileData(context.Background(), fileID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for i := range rows {
|
||||
fileDataRow := rows[i]
|
||||
ctxLogger := log.WithField("file_id", fileDataRow.DeleteFromBuckets).WithField("type", fileDataRow.Type).WithField("user_id", fileDataRow.UserID)
|
||||
objectKeys := filedata.AllObjects(fileID, ownerID, fileDataRow.Type)
|
||||
bucketColumnMap := make(map[string]string)
|
||||
bucketColumnMap, err = getMapOfbucketItToColumn(fileDataRow)
|
||||
err = c.DeleteFileRow(fileDataRow)
|
||||
if err != nil {
|
||||
ctxLogger.WithError(err).Error("Failed to get bucketColumnMap")
|
||||
return err
|
||||
}
|
||||
// Delete objects and remove buckets
|
||||
for bucketID, columnName := range bucketColumnMap {
|
||||
for _, objectKey := range objectKeys {
|
||||
err := c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKey, bucketID)
|
||||
if err != nil {
|
||||
ctxLogger.WithError(err).WithFields(logrus.Fields{
|
||||
"bucketID": bucketID,
|
||||
"column": columnName,
|
||||
"objectKey": objectKey,
|
||||
}).Error("Failed to delete object from datacenter")
|
||||
return err
|
||||
}
|
||||
}
|
||||
dbErr := c.Repo.RemoveBucket(fileDataRow, bucketID, columnName)
|
||||
if dbErr != nil {
|
||||
ctxLogger.WithError(dbErr).WithFields(logrus.Fields{
|
||||
"bucketID": bucketID,
|
||||
"column": columnName,
|
||||
}).Error("Failed to remove bucket from db")
|
||||
return dbErr
|
||||
|
||||
}
|
||||
}
|
||||
// Delete from Latest bucket
|
||||
for k := range objectKeys {
|
||||
err = c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKeys[k], fileDataRow.LatestBucket)
|
||||
if err != nil {
|
||||
ctxLogger.WithError(err).Error("Failed to delete object from datacenter")
|
||||
return err
|
||||
}
|
||||
}
|
||||
dbErr := c.Repo.DeleteFileData(context.Background(), fileDataRow)
|
||||
if dbErr != nil {
|
||||
ctxLogger.WithError(dbErr).Error("Failed to remove from db")
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getMapOfbucketItToColumn(row filedata.Row) (map[string]string, error) {
|
||||
func (c *Controller) DeleteFileRow(fileDataRow filedata.Row) error {
|
||||
if !fileDataRow.IsDeleted {
|
||||
return fmt.Errorf("file %d is not marked as deleted", fileDataRow.FileID)
|
||||
}
|
||||
fileID := fileDataRow.FileID
|
||||
ownerID, err := c.FileRepo.GetOwnerID(fileID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if fileDataRow.UserID != ownerID {
|
||||
// this should never happen
|
||||
panic(fmt.Sprintf("file %d does not belong to user %d", fileID, ownerID))
|
||||
}
|
||||
ctxLogger := log.WithField("file_id", fileDataRow.DeleteFromBuckets).WithField("type", fileDataRow.Type).WithField("user_id", fileDataRow.UserID)
|
||||
objectKeys := filedata.AllObjects(fileID, ownerID, fileDataRow.Type)
|
||||
bucketColumnMap := make(map[string]string)
|
||||
bucketColumnMap, err = getMapOfBucketItToColumn(fileDataRow)
|
||||
if err != nil {
|
||||
ctxLogger.WithError(err).Error("Failed to get bucketColumnMap")
|
||||
return err
|
||||
}
|
||||
// Delete objects and remove buckets
|
||||
for bucketID, columnName := range bucketColumnMap {
|
||||
for _, objectKey := range objectKeys {
|
||||
err := c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKey, bucketID)
|
||||
if err != nil {
|
||||
ctxLogger.WithError(err).WithFields(logrus.Fields{
|
||||
"bucketID": bucketID,
|
||||
"column": columnName,
|
||||
"objectKey": objectKey,
|
||||
}).Error("Failed to delete object from datacenter")
|
||||
return err
|
||||
}
|
||||
}
|
||||
dbErr := c.Repo.RemoveBucket(fileDataRow, bucketID, columnName)
|
||||
if dbErr != nil {
|
||||
ctxLogger.WithError(dbErr).WithFields(logrus.Fields{
|
||||
"bucketID": bucketID,
|
||||
"column": columnName,
|
||||
}).Error("Failed to remove bucket from db")
|
||||
return dbErr
|
||||
|
||||
}
|
||||
}
|
||||
// Delete from Latest bucket
|
||||
for k := range objectKeys {
|
||||
err = c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKeys[k], fileDataRow.LatestBucket)
|
||||
if err != nil {
|
||||
ctxLogger.WithError(err).Error("Failed to delete object from datacenter")
|
||||
return err
|
||||
}
|
||||
}
|
||||
dbErr := c.Repo.DeleteFileData(context.Background(), fileDataRow)
|
||||
if dbErr != nil {
|
||||
ctxLogger.WithError(dbErr).Error("Failed to remove from db")
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getMapOfBucketItToColumn(row filedata.Row) (map[string]string, error) {
|
||||
bucketColumnMap := make(map[string]string)
|
||||
for _, bucketID := range row.DeleteFromBuckets {
|
||||
if existingColumn, exists := bucketColumnMap[bucketID]; exists {
|
||||
return nil, fmt.Errorf("Duplicate DeleteFromBuckets ID found: %s in column %s", bucketID, existingColumn)
|
||||
return nil, fmt.Errorf("duplicate DeleteFromBuckets ID found: %s in column %s", bucketID, existingColumn)
|
||||
}
|
||||
bucketColumnMap[bucketID] = fileDataRepo.DeletionColumn
|
||||
}
|
||||
for _, bucketID := range row.ReplicatedBuckets {
|
||||
if existingColumn, exists := bucketColumnMap[bucketID]; exists {
|
||||
return nil, fmt.Errorf("Duplicate ReplicatedBuckets ID found: %s in column %s", bucketID, existingColumn)
|
||||
return nil, fmt.Errorf("duplicate ReplicatedBuckets ID found: %s in column %s", bucketID, existingColumn)
|
||||
}
|
||||
bucketColumnMap[bucketID] = fileDataRepo.ReplicationColumn
|
||||
}
|
||||
for _, bucketID := range row.InflightReplicas {
|
||||
if existingColumn, exists := bucketColumnMap[bucketID]; exists {
|
||||
return nil, fmt.Errorf("Duplicate InFlightBucketID found: %s in column %s", bucketID, existingColumn)
|
||||
return nil, fmt.Errorf("duplicate InFlightBucketID found: %s in column %s", bucketID, existingColumn)
|
||||
}
|
||||
bucketColumnMap[bucketID] = fileDataRepo.InflightRepColumn
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/ente-io/museum/ente/filedata"
|
||||
"github.com/ente-io/stacktrace"
|
||||
"github.com/lib/pq"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Repository defines the methods for inserting, updating, and retrieving file data.
|
||||
@@ -157,6 +158,42 @@ func (r *Repository) MoveBetweenBuckets(row filedata.Row, bucketID string, sourc
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetPendingSyncDataAndExtendLock in a transaction gets single file data row that has been deleted and pending sync is true and sync_lock_till is less than now_utc_micro_seconds() and extends the lock till newSyncLockTime
|
||||
// This is used to lock the file data row for deletion and extend
|
||||
func (r *Repository) GetPendingSyncDataAndExtendLock(ctx context.Context, newSyncLockTime int64, forDeletion bool) (*filedata.Row, error) {
|
||||
// ensure newSyncLockTime is in the future
|
||||
if newSyncLockTime < time.Now().Add(5*time.Minute).UnixMicro() {
|
||||
return nil, stacktrace.NewError("newSyncLockTime should be at least 5min in the future")
|
||||
}
|
||||
tx, err := r.DB.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return nil, stacktrace.Propagate(err, "")
|
||||
}
|
||||
defer tx.Rollback()
|
||||
row := tx.QueryRow(`SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets, inflight_rep_buckets, pending_sync, is_deleted, sync_locked_till, created_at, updated_at
|
||||
FROM file_data
|
||||
where pending_sync = true and is_deleted = $1 and sync_locked_till < now_utc_micro_seconds()
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED`, forDeletion)
|
||||
var fileData filedata.Row
|
||||
err = row.Scan(&fileData.FileID, &fileData.UserID, &fileData.Type, &fileData.Size, &fileData.LatestBucket, pq.Array(&fileData.ReplicatedBuckets), pq.Array(&fileData.DeleteFromBuckets), pq.Array(&fileData.InflightReplicas), &fileData.PendingSync, &fileData.IsDeleted, &fileData.SyncLockedTill, &fileData.CreatedAt, &fileData.UpdatedAt)
|
||||
if err != nil {
|
||||
return nil, stacktrace.Propagate(err, "")
|
||||
}
|
||||
if fileData.SyncLockedTill > newSyncLockTime {
|
||||
return nil, stacktrace.NewError(fmt.Sprintf("newSyncLockTime (%d) is less than existing SyncLockedTill(%d), newSync", newSyncLockTime, fileData.SyncLockedTill))
|
||||
}
|
||||
_, err = tx.Exec(`UPDATE file_data SET sync_locked_till = $1 WHERE file_id = $2 AND data_type = $3 AND user_id = $4`, newSyncLockTime, fileData.FileID, string(fileData.Type), fileData.UserID)
|
||||
if err != nil {
|
||||
return nil, stacktrace.Propagate(err, "")
|
||||
}
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
return nil, stacktrace.Propagate(err, "")
|
||||
}
|
||||
return &fileData, nil
|
||||
}
|
||||
|
||||
func (r *Repository) DeleteFileData(ctx context.Context, row filedata.Row) error {
|
||||
query := `
|
||||
DELETE FROM file_data
|
||||
|
||||
Reference in New Issue
Block a user