Merge remote-tracking branch 'origin/main' into improve-bg-processing
This commit is contained in:
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/ente-io/stacktrace"
|
||||
"github.com/gin-gonic/gin"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func (c *Controller) InsertVideoPreview(ctx *gin.Context, req *filedata.VidPreviewRequest) error {
|
||||
@@ -40,6 +41,7 @@ func (c *Controller) InsertVideoPreview(ctx *gin.Context, req *filedata.VidPrevi
|
||||
WithField("objectKey", objectKey).
|
||||
WithField("fileID", req.FileID).
|
||||
WithField("type", ente.PreviewVideo)
|
||||
|
||||
size, uploadErr := c.uploadObject(obj, objectKey, bucketID)
|
||||
if uploadErr != nil {
|
||||
logger.WithError(uploadErr).Error("upload failed")
|
||||
@@ -57,10 +59,49 @@ func (c *Controller) InsertVideoPreview(ctx *gin.Context, req *filedata.VidPrevi
|
||||
}
|
||||
dbInsertErr := c.Repo.InsertOrUpdatePreviewData(context.Background(), row, fileObjectKey)
|
||||
if dbInsertErr != nil {
|
||||
logger.WithError(dbInsertErr).Error("insert or update failed")
|
||||
if strings.Contains(dbInsertErr.Error(), "failed to remove object from tempObjects") {
|
||||
isDuplicate, checkErr := c._checkIfDuplicateRequest(ctx, row, fileObjectKey)
|
||||
if checkErr != nil {
|
||||
logger.WithError(checkErr).Error("failed to check for duplicate request")
|
||||
// continue with existing dbInsertErr
|
||||
}
|
||||
if isDuplicate {
|
||||
logger.Info("duplicate put request detected, ignoring")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return stacktrace.Propagate(dbInsertErr, "failed to insert or update preview data")
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
func (c *Controller) _checkIfDuplicateRequest(ctx *gin.Context, row filedata.Row, fileObjectKey string) (bool, error) {
|
||||
exists, err := c.Repo.ObjectCleanupRepo.DoesTempObjectExist(ctx, fileObjectKey, row.LatestBucket)
|
||||
if err != nil {
|
||||
return false, stacktrace.Propagate(err, "failed to check if duplicate request")
|
||||
}
|
||||
if exists {
|
||||
return false, nil
|
||||
}
|
||||
data, dataErr := c.Repo.GetFilesData(ctx, row.Type, []int64{row.FileID})
|
||||
if dataErr != nil {
|
||||
return false, stacktrace.Propagate(dataErr, "failed to get files data")
|
||||
}
|
||||
if len(data) == 0 {
|
||||
return false, nil
|
||||
}
|
||||
if len(data) > 1 {
|
||||
return false, stacktrace.NewError("multiple rows found for fileID %d", row.FileID)
|
||||
}
|
||||
if data[0].LatestBucket == row.LatestBucket &&
|
||||
data[0].ObjectID != nil && *data[0].ObjectID == *row.ObjectID &&
|
||||
data[0].ObjectSize != nil && *data[0].ObjectSize == *row.ObjectSize {
|
||||
log.WithField("fileID", row.FileID).WithField("objectID", row.ObjectID).
|
||||
Info("duplicate put request detected")
|
||||
return true, nil
|
||||
} else {
|
||||
log.WithField("fileID", row.FileID).WithField("objectID", row.ObjectID).
|
||||
Info("duplicate put request not detected, existing data does not match")
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
@@ -57,6 +57,19 @@ func (repo *ObjectCleanupRepository) RemoveTempObjectFromDC(ctx context.Context,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (repo *ObjectCleanupRepository) DoesTempObjectExist(ctx context.Context, objectKey string, uploadID string) (bool, error) {
|
||||
var exists bool
|
||||
query := `SELECT EXISTS(SELECT 1 FROM temp_objects WHERE object_key = $1 AND upload_id = $2)`
|
||||
err := repo.DB.QueryRowContext(ctx, query, objectKey, uploadID).Scan(&exists)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return false, nil
|
||||
}
|
||||
return false, stacktrace.Propagate(err, "failed to check if temp object exists")
|
||||
}
|
||||
return exists, nil
|
||||
}
|
||||
|
||||
// GetExpiredObjects returns the list of object keys that have expired
|
||||
func (repo *ObjectCleanupRepository) GetAndLockExpiredObjects() (*sql.Tx, []ente.TempObject, error) {
|
||||
tx, err := repo.DB.Begin()
|
||||
|
||||
Reference in New Issue
Block a user