[server] Retry for size fetch & parallelize check (#5155)
## Description ## Tests
This commit is contained in:
@@ -243,13 +243,14 @@ func main() {
|
||||
)
|
||||
|
||||
usageController := &controller.UsageController{
|
||||
BillingCtrl: billingController,
|
||||
StorageBonusCtrl: storageBonusCtrl,
|
||||
UserCacheCtrl: userCacheCtrl,
|
||||
UsageRepo: usageRepo,
|
||||
UserRepo: userRepo,
|
||||
FamilyRepo: familyRepo,
|
||||
FileRepo: fileRepo,
|
||||
BillingCtrl: billingController,
|
||||
StorageBonusCtrl: storageBonusCtrl,
|
||||
UserCacheCtrl: userCacheCtrl,
|
||||
UsageRepo: usageRepo,
|
||||
UserRepo: userRepo,
|
||||
FamilyRepo: familyRepo,
|
||||
FileRepo: fileRepo,
|
||||
UploadResultCache: make(map[int64]bool),
|
||||
}
|
||||
|
||||
accessCtrl := access.NewAccessController(collectionRepo, fileRepo)
|
||||
|
||||
@@ -24,6 +24,12 @@ var ErrIncorrectTOTP = errors.New("incorrect TOTP")
|
||||
// ErrNotFound is returned when the requested resource was not found
|
||||
var ErrNotFound = errors.New("not found")
|
||||
|
||||
var ErrCollectionDeleted = &ApiError{
|
||||
Code: "COLLECTION_DELETED",
|
||||
Message: "",
|
||||
HttpStatusCode: http.StatusNotFound,
|
||||
}
|
||||
|
||||
var ErrFileLimitReached = errors.New("file limit reached")
|
||||
|
||||
// ErrBadRequest is returned when a bad request is encountered
|
||||
@@ -153,6 +159,12 @@ var ErrNotFoundError = ApiError{
|
||||
HttpStatusCode: http.StatusNotFound,
|
||||
}
|
||||
|
||||
var ErrObjSizeFetchFailed = &ApiError{
|
||||
Code: "OBJECT_SIZE_FETCH_FAILED",
|
||||
Message: "",
|
||||
HttpStatusCode: http.StatusServiceUnavailable,
|
||||
}
|
||||
|
||||
var ErrUserNotFound = &ApiError{
|
||||
Code: "USER_NOT_FOUND",
|
||||
Message: "User is either deleted or not found",
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
gTime "time"
|
||||
|
||||
"github.com/ente-io/museum/pkg/controller/discord"
|
||||
"github.com/ente-io/museum/pkg/utils/network"
|
||||
@@ -96,7 +97,7 @@ func (c *FileController) validateFileCreateOrUpdateReq(userID int64, file ente.F
|
||||
return stacktrace.Propagate(ente.ErrPermissionDenied, "collection doesn't belong to user")
|
||||
}
|
||||
if collection.IsDeleted {
|
||||
return stacktrace.Propagate(ente.ErrNotFound, "collection has been deleted")
|
||||
return stacktrace.Propagate(ente.ErrCollectionDeleted, "collection has been deleted")
|
||||
}
|
||||
if file.OwnerID != userID {
|
||||
return stacktrace.Propagate(ente.ErrPermissionDenied, "file ownerID doesn't match with userID")
|
||||
@@ -106,20 +107,43 @@ func (c *FileController) validateFileCreateOrUpdateReq(userID int64, file ente.F
|
||||
return nil
|
||||
}
|
||||
|
||||
type sizeResult struct {
|
||||
size int64
|
||||
err error
|
||||
}
|
||||
|
||||
// Create adds an entry for a file in the respective tables
|
||||
func (c *FileController) Create(ctx *gin.Context, userID int64, file ente.File, userAgent string, app ente.App) (ente.File, error) {
|
||||
fileChan := make(chan sizeResult)
|
||||
thumbChan := make(chan sizeResult)
|
||||
go func() {
|
||||
size, err := c.sizeOf(file.File.ObjectKey)
|
||||
fileChan <- sizeResult{size, err}
|
||||
}()
|
||||
go func() {
|
||||
size, err := c.sizeOf(file.Thumbnail.ObjectKey)
|
||||
thumbChan <- sizeResult{size, err}
|
||||
}()
|
||||
err := c.validateFileCreateOrUpdateReq(userID, file)
|
||||
if err != nil {
|
||||
return file, stacktrace.Propagate(err, "")
|
||||
}
|
||||
// Receive results from both operations
|
||||
fileResult := <-fileChan
|
||||
thumbResult := <-thumbChan
|
||||
|
||||
hotDC := c.S3Config.GetHotDataCenter()
|
||||
// sizeOf will do also HEAD check to ensure that the object exists in the
|
||||
// current hot DC
|
||||
fileSize, err := c.sizeOf(file.File.ObjectKey)
|
||||
if err != nil {
|
||||
|
||||
if fileResult.err != nil {
|
||||
log.Error("Could not find size of file: " + file.File.ObjectKey)
|
||||
return file, stacktrace.Propagate(err, "")
|
||||
return file, stacktrace.Propagate(ente.ErrObjSizeFetchFailed, fileResult.err.Error())
|
||||
}
|
||||
if thumbResult.err != nil {
|
||||
log.Error("Could not find size of thumbnail: " + file.Thumbnail.ObjectKey)
|
||||
return file, stacktrace.Propagate(ente.ErrObjSizeFetchFailed, thumbResult.err.Error())
|
||||
}
|
||||
fileSize := fileResult.size
|
||||
thumbnailSize := thumbResult.size
|
||||
if fileSize > MaxFileSize {
|
||||
return file, stacktrace.Propagate(ente.ErrFileTooLarge, "")
|
||||
}
|
||||
@@ -127,7 +151,6 @@ func (c *FileController) Create(ctx *gin.Context, userID int64, file ente.File,
|
||||
return file, stacktrace.Propagate(ente.ErrBadRequest, "mismatch in file size")
|
||||
}
|
||||
file.File.Size = fileSize
|
||||
thumbnailSize, err := c.sizeOf(file.Thumbnail.ObjectKey)
|
||||
if err != nil {
|
||||
log.Error("Could not find size of thumbnail: " + file.Thumbnail.ObjectKey)
|
||||
return file, stacktrace.Propagate(err, "")
|
||||
@@ -806,14 +829,23 @@ func (c *FileController) getPreSignedURLForDC(objectKey string, dc string) (stri
|
||||
|
||||
func (c *FileController) sizeOf(objectKey string) (int64, error) {
|
||||
s3Client := c.S3Config.GetHotS3Client()
|
||||
head, err := s3Client.HeadObject(&s3.HeadObjectInput{
|
||||
Key: &objectKey,
|
||||
Bucket: c.S3Config.GetHotBucket(),
|
||||
})
|
||||
if err != nil {
|
||||
return -1, stacktrace.Propagate(err, "")
|
||||
bucket := c.S3Config.GetHotBucket()
|
||||
var head *s3.HeadObjectOutput
|
||||
var err error
|
||||
// Retry twice with a delay of 500ms and 1000ms
|
||||
for i := 0; i < 3; i++ {
|
||||
head, err = s3Client.HeadObject(&s3.HeadObjectInput{
|
||||
Key: &objectKey,
|
||||
Bucket: bucket,
|
||||
})
|
||||
if err == nil {
|
||||
return *head.ContentLength, nil
|
||||
}
|
||||
if i < 2 {
|
||||
gTime.Sleep(gTime.Duration(500*(i+1)) * gTime.Millisecond)
|
||||
}
|
||||
}
|
||||
return *head.ContentLength, nil
|
||||
return -1, stacktrace.Propagate(err, "")
|
||||
}
|
||||
|
||||
func (c *FileController) onDuplicateObjectDetected(ctx *gin.Context, file ente.File, existing ente.File, hotDC string) (ente.File, error) {
|
||||
|
||||
@@ -142,7 +142,7 @@ func (c *Controller) GetFileData(ctx *gin.Context, req fileData.GetFileData) (*f
|
||||
return nil, stacktrace.Propagate(err, "")
|
||||
}
|
||||
if len(doRows) == 0 || doRows[0].IsDeleted {
|
||||
return nil, stacktrace.Propagate(ente.ErrNotFound, "")
|
||||
return nil, stacktrace.Propagate(&ente.ErrNotFoundError, "")
|
||||
}
|
||||
ctxLogger := log.WithFields(log.Fields{
|
||||
"objectKey": doRows[0].S3FileMetadataObjectKey(),
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/ente-io/museum/ente"
|
||||
bonus "github.com/ente-io/museum/ente/storagebonus"
|
||||
@@ -15,20 +16,47 @@ import (
|
||||
|
||||
// UsageController exposes functions which can be used to check around storage
|
||||
type UsageController struct {
|
||||
BillingCtrl *BillingController
|
||||
StorageBonusCtrl *storagebonus.Controller
|
||||
UserCacheCtrl *usercache.Controller
|
||||
UsageRepo *repo.UsageRepository
|
||||
UserRepo *repo.UserRepository
|
||||
FamilyRepo *repo.FamilyRepository
|
||||
FileRepo *repo.FileRepository
|
||||
mu sync.Mutex
|
||||
BillingCtrl *BillingController
|
||||
StorageBonusCtrl *storagebonus.Controller
|
||||
UserCacheCtrl *usercache.Controller
|
||||
UsageRepo *repo.UsageRepository
|
||||
UserRepo *repo.UserRepository
|
||||
FamilyRepo *repo.FamilyRepository
|
||||
FileRepo *repo.FileRepository
|
||||
UploadResultCache map[int64]bool
|
||||
}
|
||||
|
||||
const MaxLockerFiles = 10000
|
||||
const hundredMBInBytes = 100 * 1024 * 1024
|
||||
|
||||
// CanUploadFile returns error if the file of given size (with StorageOverflowAboveSubscriptionLimit buffer) can be
|
||||
// uploaded or not. If size is not passed, it validates if current usage is less than subscription storage.
|
||||
func (c *UsageController) CanUploadFile(ctx context.Context, userID int64, size *int64, app ente.App) error {
|
||||
// check if size is nil or less than 100 MB
|
||||
if app != ente.Locker && (size == nil || *size < hundredMBInBytes) {
|
||||
c.mu.Lock()
|
||||
canUpload, ok := c.UploadResultCache[userID]
|
||||
c.mu.Unlock()
|
||||
if ok && canUpload {
|
||||
go func() {
|
||||
_ = c.checkAndUpdateCache(ctx, userID, size, app)
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return c.checkAndUpdateCache(ctx, userID, size, app)
|
||||
}
|
||||
|
||||
func (c *UsageController) checkAndUpdateCache(ctx context.Context, userID int64, size *int64, app ente.App) error {
|
||||
err := c.canUploadFile(ctx, userID, size, app)
|
||||
c.mu.Lock()
|
||||
c.UploadResultCache[userID] = err == nil
|
||||
c.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *UsageController) canUploadFile(ctx context.Context, userID int64, size *int64, app ente.App) error {
|
||||
// If app is Locker, limit to MaxLockerFiles files
|
||||
if app == ente.Locker {
|
||||
// Get file count
|
||||
@@ -113,7 +141,6 @@ func (c *UsageController) CanUploadFile(ctx context.Context, userID int64, size
|
||||
|
||||
// Get particular member's storage and check if the file size is larger than the size of the storage allocated
|
||||
// to the Member and fail if its too large.
|
||||
|
||||
if subscriptionAdminID != userID && memberStorageLimit != nil {
|
||||
memberUsage, memberUsageErr := c.UsageRepo.GetUsage(userID)
|
||||
if memberUsageErr != nil {
|
||||
|
||||
Reference in New Issue
Block a user