From 98a6bf91648e052ae8e33b327cf2675b317e0511 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Tue, 6 Aug 2024 17:01:06 +0530 Subject: [PATCH] Store bucketID for temp objects --- server/ente/file.go | 2 +- server/migrations/89_file_data_table.up.sql | 1 + server/pkg/api/file.go | 48 ------------------- server/pkg/api/file_data.go | 52 +++++++++++++++++++++ server/pkg/controller/object_cleanup.go | 10 ++-- server/pkg/repo/object_cleanup.go | 12 +++-- 6 files changed, 68 insertions(+), 57 deletions(-) diff --git a/server/ente/file.go b/server/ente/file.go index 8a554b6b1b..1e00c7f258 100644 --- a/server/ente/file.go +++ b/server/ente/file.go @@ -202,7 +202,7 @@ type TempObject struct { ObjectKey string IsMultipart bool UploadID string - DataCenter string + BucketId string } // DuplicateFiles represents duplicate files diff --git a/server/migrations/89_file_data_table.up.sql b/server/migrations/89_file_data_table.up.sql index 3012cea296..49c9704ee6 100644 --- a/server/migrations/89_file_data_table.up.sql +++ b/server/migrations/89_file_data_table.up.sql @@ -1,3 +1,4 @@ +ALTER TABLE temp_objects ADD COLUMN IF NOT EXISTS bucket_id s3region; ALTER TYPE OBJECT_TYPE ADD VALUE 'derivedMeta'; ALTER TYPE s3region ADD VALUE 'b5'; -- Create the derived table diff --git a/server/pkg/api/file.go b/server/pkg/api/file.go index 2d00b8eb53..2e15ade325 100644 --- a/server/pkg/api/file.go +++ b/server/pkg/api/file.go @@ -122,54 +122,6 @@ func (h *FileHandler) GetUploadURLs(c *gin.Context) { }) } -func (h *FileHandler) GetVideoUploadURL(c *gin.Context) { - enteApp := auth.GetApp(c) - userID, fileID := getUserAndFileIDs(c) - urls, err := h.Controller.GetVideoUploadUrl(c, userID, fileID, enteApp) - if err != nil { - handler.Error(c, stacktrace.Propagate(err, "")) - return - } - c.JSON(http.StatusOK, urls) -} - -func (h *FileHandler) GetVideoPreviewUrl(c *gin.Context) { - userID, fileID := getUserAndFileIDs(c) - url, err := h.Controller.GetPreviewUrl(c, userID, fileID) - if err != nil { - handler.Error(c, stacktrace.Propagate(err, "")) - return - } - c.JSON(http.StatusOK, gin.H{ - "url": url, - }) -} - -func (h *FileHandler) ReportVideoPlayList(c *gin.Context) { - var request ente.InsertOrUpdateEmbeddingRequest - if err := c.ShouldBindJSON(&request); err != nil { - handler.Error(c, - stacktrace.Propagate(ente.ErrBadRequest, fmt.Sprintf("Request binding failed %s", err))) - return - } - err := h.Controller.ReportVideoPreview(c, request) - if err != nil { - handler.Error(c, stacktrace.Propagate(err, "")) - return - } - c.Status(http.StatusOK) -} - -func (h *FileHandler) GetVideoPlaylist(c *gin.Context) { - fileID, _ := strconv.ParseInt(c.Param("fileID"), 10, 64) - response, err := h.Controller.GetPlaylist(c, fileID) - if err != nil { - handler.Error(c, stacktrace.Propagate(err, "")) - return - } - c.JSON(http.StatusOK, response) -} - // GetMultipartUploadURLs returns an array of PartUpload PresignedURLs func (h *FileHandler) GetMultipartUploadURLs(c *gin.Context) { enteApp := auth.GetApp(c) diff --git a/server/pkg/api/file_data.go b/server/pkg/api/file_data.go index 73ddeb6f27..50015c3d4d 100644 --- a/server/pkg/api/file_data.go +++ b/server/pkg/api/file_data.go @@ -1,11 +1,15 @@ package api import ( + "fmt" "github.com/ente-io/museum/ente" fileData "github.com/ente-io/museum/ente/filedata" + "github.com/ente-io/museum/pkg/utils/auth" "github.com/ente-io/museum/pkg/utils/handler" + "github.com/ente-io/stacktrace" "github.com/gin-gonic/gin" "net/http" + "strconv" ) func (f *FileHandler) PutFileData(ctx *gin.Context) { @@ -45,3 +49,51 @@ func (f *FileHandler) GetFilesData(ctx *gin.Context) { } ctx.JSON(http.StatusOK, resp) } + +func (h *FileHandler) GetVideoUploadURL(c *gin.Context) { + enteApp := auth.GetApp(c) + userID, fileID := getUserAndFileIDs(c) + urls, err := h.Controller.GetVideoUploadUrl(c, userID, fileID, enteApp) + if err != nil { + handler.Error(c, stacktrace.Propagate(err, "")) + return + } + c.JSON(http.StatusOK, urls) +} + +func (h *FileHandler) GetVideoPreviewUrl(c *gin.Context) { + userID, fileID := getUserAndFileIDs(c) + url, err := h.Controller.GetPreviewUrl(c, userID, fileID) + if err != nil { + handler.Error(c, stacktrace.Propagate(err, "")) + return + } + c.JSON(http.StatusOK, gin.H{ + "url": url, + }) +} + +func (h *FileHandler) ReportVideoPlayList(c *gin.Context) { + var request ente.InsertOrUpdateEmbeddingRequest + if err := c.ShouldBindJSON(&request); err != nil { + handler.Error(c, + stacktrace.Propagate(ente.ErrBadRequest, fmt.Sprintf("Request binding failed %s", err))) + return + } + err := h.Controller.ReportVideoPreview(c, request) + if err != nil { + handler.Error(c, stacktrace.Propagate(err, "")) + return + } + c.Status(http.StatusOK) +} + +func (h *FileHandler) GetVideoPlaylist(c *gin.Context) { + fileID, _ := strconv.ParseInt(c.Param("fileID"), 10, 64) + response, err := h.Controller.GetPlaylist(c, fileID) + if err != nil { + handler.Error(c, stacktrace.Propagate(err, "")) + return + } + c.JSON(http.StatusOK, response) +} diff --git a/server/pkg/controller/object_cleanup.go b/server/pkg/controller/object_cleanup.go index 91426cb56c..9a6a6055ce 100644 --- a/server/pkg/controller/object_cleanup.go +++ b/server/pkg/controller/object_cleanup.go @@ -166,8 +166,10 @@ func (c *ObjectCleanupController) removeUnreportedObjects() int { func (c *ObjectCleanupController) removeUnreportedObject(tx *sql.Tx, t ente.TempObject) error { // TODO: object_cleanup // This should use the DC from TempObject (once we start persisting it) - // dc := t.DataCenter - dc := c.S3Config.GetHotDataCenter() + dc := t.BucketId + if dc == "" { + dc = c.S3Config.GetHotDataCenter() + } logger := log.WithFields(log.Fields{ "task": "remove-unreported-objects", @@ -232,7 +234,7 @@ func (c *ObjectCleanupController) addCleanupEntryForObjectKey(objectKey string, err := c.Repo.AddTempObject(ente.TempObject{ ObjectKey: objectKey, IsMultipart: false, - DataCenter: dc, + BucketId: dc, }, expirationTime) return stacktrace.Propagate(err, "") } @@ -247,7 +249,7 @@ func (c *ObjectCleanupController) AddMultipartTempObjectKey(objectKey string, up ObjectKey: objectKey, IsMultipart: true, UploadID: uploadID, - DataCenter: dc, + BucketId: dc, }, expiry) return stacktrace.Propagate(err, "") } diff --git a/server/pkg/repo/object_cleanup.go b/server/pkg/repo/object_cleanup.go index 7074121381..b78910d052 100644 --- a/server/pkg/repo/object_cleanup.go +++ b/server/pkg/repo/object_cleanup.go @@ -25,8 +25,8 @@ type ObjectCleanupRepository struct { func (repo *ObjectCleanupRepository) AddTempObject(tempObject ente.TempObject, expirationTime int64) error { var err error if tempObject.IsMultipart { - _, err = repo.DB.Exec(`INSERT INTO temp_objects(object_key, expiration_time,upload_id,is_multipart) - VALUES($1, $2, $3, $4)`, tempObject.ObjectKey, expirationTime, tempObject.UploadID, tempObject.IsMultipart) + _, err = repo.DB.Exec(`INSERT INTO temp_objects(object_key, expiration_time,upload_id,is_multipart, bucket_id) + VALUES($1, $2, $3, $4)`, tempObject.ObjectKey, expirationTime, tempObject.UploadID, tempObject.IsMultipart, tempObject.BucketId) } else { _, err = repo.DB.Exec(`INSERT INTO temp_objects(object_key, expiration_time) VALUES($1, $2)`, tempObject.ObjectKey, expirationTime) @@ -62,7 +62,7 @@ func (repo *ObjectCleanupRepository) GetAndLockExpiredObjects() (*sql.Tx, []ente } rows, err := tx.Query(` - SELECT object_key, is_multipart, upload_id FROM temp_objects + SELECT object_key, is_multipart, upload_id, bucket_id FROM temp_objects WHERE expiration_time <= $1 LIMIT 1000 FOR UPDATE SKIP LOCKED @@ -83,7 +83,8 @@ func (repo *ObjectCleanupRepository) GetAndLockExpiredObjects() (*sql.Tx, []ente for rows.Next() { var tempObject ente.TempObject var uploadID sql.NullString - err := rows.Scan(&tempObject.ObjectKey, &tempObject.IsMultipart, &uploadID) + var bucketID sql.NullString + err := rows.Scan(&tempObject.ObjectKey, &tempObject.IsMultipart, &uploadID, &bucketID) if err != nil { rollback() return nil, nil, stacktrace.Propagate(err, "") @@ -91,6 +92,9 @@ func (repo *ObjectCleanupRepository) GetAndLockExpiredObjects() (*sql.Tx, []ente if tempObject.IsMultipart { tempObject.UploadID = uploadID.String } + if bucketID.Valid { + tempObject.BucketId = bucketID.String + } tempObjects = append(tempObjects, tempObject) } return tx, tempObjects, nil