diff --git a/server/pkg/controller/filedata/video.go b/server/pkg/controller/filedata/video.go index 8fba680e44..b0d1b4a8de 100644 --- a/server/pkg/controller/filedata/video.go +++ b/server/pkg/controller/filedata/video.go @@ -9,6 +9,7 @@ import ( "github.com/ente-io/stacktrace" "github.com/gin-gonic/gin" log "github.com/sirupsen/logrus" + "strings" ) func (c *Controller) InsertVideoPreview(ctx *gin.Context, req *filedata.VidPreviewRequest) error { @@ -40,6 +41,7 @@ func (c *Controller) InsertVideoPreview(ctx *gin.Context, req *filedata.VidPrevi WithField("objectKey", objectKey). WithField("fileID", req.FileID). WithField("type", ente.PreviewVideo) + size, uploadErr := c.uploadObject(obj, objectKey, bucketID) if uploadErr != nil { logger.WithError(uploadErr).Error("upload failed") @@ -57,10 +59,49 @@ func (c *Controller) InsertVideoPreview(ctx *gin.Context, req *filedata.VidPrevi } dbInsertErr := c.Repo.InsertOrUpdatePreviewData(context.Background(), row, fileObjectKey) if dbInsertErr != nil { - logger.WithError(dbInsertErr).Error("insert or update failed") + if strings.Contains(dbInsertErr.Error(), "failed to remove object from tempObjects") { + isDuplicate, checkErr := c._checkIfDuplicateRequest(ctx, row, fileObjectKey) + if checkErr != nil { + logger.WithError(checkErr).Error("failed to check for duplicate request") + // continue with existing dbInsertErr + } + if isDuplicate { + logger.Info("duplicate put request detected, ignoring") + return nil + } + } return stacktrace.Propagate(dbInsertErr, "failed to insert or update preview data") } - return nil - +} + +func (c *Controller) _checkIfDuplicateRequest(ctx *gin.Context, row filedata.Row, fileObjectKey string) (bool, error) { + exists, err := c.Repo.ObjectCleanupRepo.DoesTempObjectExist(ctx, fileObjectKey, row.LatestBucket) + if err != nil { + return false, stacktrace.Propagate(err, "failed to check if duplicate request") + } + if exists { + return false, nil + } + data, dataErr := c.Repo.GetFilesData(ctx, row.Type, []int64{row.FileID}) + if dataErr != nil { + return false, stacktrace.Propagate(dataErr, "failed to get files data") + } + if len(data) == 0 { + return false, nil + } + if len(data) > 1 { + return false, stacktrace.NewError("multiple rows found for fileID %d", row.FileID) + } + if data[0].LatestBucket == row.LatestBucket && + data[0].ObjectID != nil && *data[0].ObjectID == *row.ObjectID && + data[0].ObjectSize != nil && *data[0].ObjectSize == *row.ObjectSize { + log.WithField("fileID", row.FileID).WithField("objectID", row.ObjectID). + Info("duplicate put request detected") + return true, nil + } else { + log.WithField("fileID", row.FileID).WithField("objectID", row.ObjectID). + Info("duplicate put request not detected, existing data does not match") + } + return false, nil } diff --git a/server/pkg/repo/object_cleanup.go b/server/pkg/repo/object_cleanup.go index e568007963..772f75d4c6 100644 --- a/server/pkg/repo/object_cleanup.go +++ b/server/pkg/repo/object_cleanup.go @@ -57,6 +57,19 @@ func (repo *ObjectCleanupRepository) RemoveTempObjectFromDC(ctx context.Context, return nil } +func (repo *ObjectCleanupRepository) DoesTempObjectExist(ctx context.Context, objectKey string, uploadID string) (bool, error) { + var exists bool + query := `SELECT EXISTS(SELECT 1 FROM temp_objects WHERE object_key = $1 AND upload_id = $2)` + err := repo.DB.QueryRowContext(ctx, query, objectKey, uploadID).Scan(&exists) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return false, nil + } + return false, stacktrace.Propagate(err, "failed to check if temp object exists") + } + return exists, nil +} + // GetExpiredObjects returns the list of object keys that have expired func (repo *ObjectCleanupRepository) GetAndLockExpiredObjects() (*sql.Tx, []ente.TempObject, error) { tx, err := repo.DB.Begin()