Store bucketID for temp objects

This commit is contained in:
Neeraj Gupta
2024-08-06 17:01:06 +05:30
parent 84fa8f343b
commit 98a6bf9164
6 changed files with 68 additions and 57 deletions

View File

@@ -202,7 +202,7 @@ type TempObject struct {
ObjectKey string
IsMultipart bool
UploadID string
DataCenter string
BucketId string
}
// DuplicateFiles represents duplicate files

View File

@@ -1,3 +1,4 @@
ALTER TABLE temp_objects ADD COLUMN IF NOT EXISTS bucket_id s3region;
ALTER TYPE OBJECT_TYPE ADD VALUE 'derivedMeta';
ALTER TYPE s3region ADD VALUE 'b5';
-- Create the derived table

View File

@@ -122,54 +122,6 @@ func (h *FileHandler) GetUploadURLs(c *gin.Context) {
})
}
func (h *FileHandler) GetVideoUploadURL(c *gin.Context) {
enteApp := auth.GetApp(c)
userID, fileID := getUserAndFileIDs(c)
urls, err := h.Controller.GetVideoUploadUrl(c, userID, fileID, enteApp)
if err != nil {
handler.Error(c, stacktrace.Propagate(err, ""))
return
}
c.JSON(http.StatusOK, urls)
}
func (h *FileHandler) GetVideoPreviewUrl(c *gin.Context) {
userID, fileID := getUserAndFileIDs(c)
url, err := h.Controller.GetPreviewUrl(c, userID, fileID)
if err != nil {
handler.Error(c, stacktrace.Propagate(err, ""))
return
}
c.JSON(http.StatusOK, gin.H{
"url": url,
})
}
func (h *FileHandler) ReportVideoPlayList(c *gin.Context) {
var request ente.InsertOrUpdateEmbeddingRequest
if err := c.ShouldBindJSON(&request); err != nil {
handler.Error(c,
stacktrace.Propagate(ente.ErrBadRequest, fmt.Sprintf("Request binding failed %s", err)))
return
}
err := h.Controller.ReportVideoPreview(c, request)
if err != nil {
handler.Error(c, stacktrace.Propagate(err, ""))
return
}
c.Status(http.StatusOK)
}
func (h *FileHandler) GetVideoPlaylist(c *gin.Context) {
fileID, _ := strconv.ParseInt(c.Param("fileID"), 10, 64)
response, err := h.Controller.GetPlaylist(c, fileID)
if err != nil {
handler.Error(c, stacktrace.Propagate(err, ""))
return
}
c.JSON(http.StatusOK, response)
}
// GetMultipartUploadURLs returns an array of PartUpload PresignedURLs
func (h *FileHandler) GetMultipartUploadURLs(c *gin.Context) {
enteApp := auth.GetApp(c)

View File

@@ -1,11 +1,15 @@
package api
import (
"fmt"
"github.com/ente-io/museum/ente"
fileData "github.com/ente-io/museum/ente/filedata"
"github.com/ente-io/museum/pkg/utils/auth"
"github.com/ente-io/museum/pkg/utils/handler"
"github.com/ente-io/stacktrace"
"github.com/gin-gonic/gin"
"net/http"
"strconv"
)
func (f *FileHandler) PutFileData(ctx *gin.Context) {
@@ -45,3 +49,51 @@ func (f *FileHandler) GetFilesData(ctx *gin.Context) {
}
ctx.JSON(http.StatusOK, resp)
}
func (h *FileHandler) GetVideoUploadURL(c *gin.Context) {
enteApp := auth.GetApp(c)
userID, fileID := getUserAndFileIDs(c)
urls, err := h.Controller.GetVideoUploadUrl(c, userID, fileID, enteApp)
if err != nil {
handler.Error(c, stacktrace.Propagate(err, ""))
return
}
c.JSON(http.StatusOK, urls)
}
func (h *FileHandler) GetVideoPreviewUrl(c *gin.Context) {
userID, fileID := getUserAndFileIDs(c)
url, err := h.Controller.GetPreviewUrl(c, userID, fileID)
if err != nil {
handler.Error(c, stacktrace.Propagate(err, ""))
return
}
c.JSON(http.StatusOK, gin.H{
"url": url,
})
}
func (h *FileHandler) ReportVideoPlayList(c *gin.Context) {
var request ente.InsertOrUpdateEmbeddingRequest
if err := c.ShouldBindJSON(&request); err != nil {
handler.Error(c,
stacktrace.Propagate(ente.ErrBadRequest, fmt.Sprintf("Request binding failed %s", err)))
return
}
err := h.Controller.ReportVideoPreview(c, request)
if err != nil {
handler.Error(c, stacktrace.Propagate(err, ""))
return
}
c.Status(http.StatusOK)
}
func (h *FileHandler) GetVideoPlaylist(c *gin.Context) {
fileID, _ := strconv.ParseInt(c.Param("fileID"), 10, 64)
response, err := h.Controller.GetPlaylist(c, fileID)
if err != nil {
handler.Error(c, stacktrace.Propagate(err, ""))
return
}
c.JSON(http.StatusOK, response)
}

View File

@@ -166,8 +166,10 @@ func (c *ObjectCleanupController) removeUnreportedObjects() int {
func (c *ObjectCleanupController) removeUnreportedObject(tx *sql.Tx, t ente.TempObject) error {
// TODO: object_cleanup
// This should use the DC from TempObject (once we start persisting it)
// dc := t.DataCenter
dc := c.S3Config.GetHotDataCenter()
dc := t.BucketId
if dc == "" {
dc = c.S3Config.GetHotDataCenter()
}
logger := log.WithFields(log.Fields{
"task": "remove-unreported-objects",
@@ -232,7 +234,7 @@ func (c *ObjectCleanupController) addCleanupEntryForObjectKey(objectKey string,
err := c.Repo.AddTempObject(ente.TempObject{
ObjectKey: objectKey,
IsMultipart: false,
DataCenter: dc,
BucketId: dc,
}, expirationTime)
return stacktrace.Propagate(err, "")
}
@@ -247,7 +249,7 @@ func (c *ObjectCleanupController) AddMultipartTempObjectKey(objectKey string, up
ObjectKey: objectKey,
IsMultipart: true,
UploadID: uploadID,
DataCenter: dc,
BucketId: dc,
}, expiry)
return stacktrace.Propagate(err, "")
}

View File

@@ -25,8 +25,8 @@ type ObjectCleanupRepository struct {
func (repo *ObjectCleanupRepository) AddTempObject(tempObject ente.TempObject, expirationTime int64) error {
var err error
if tempObject.IsMultipart {
_, err = repo.DB.Exec(`INSERT INTO temp_objects(object_key, expiration_time,upload_id,is_multipart)
VALUES($1, $2, $3, $4)`, tempObject.ObjectKey, expirationTime, tempObject.UploadID, tempObject.IsMultipart)
_, err = repo.DB.Exec(`INSERT INTO temp_objects(object_key, expiration_time,upload_id,is_multipart, bucket_id)
VALUES($1, $2, $3, $4)`, tempObject.ObjectKey, expirationTime, tempObject.UploadID, tempObject.IsMultipart, tempObject.BucketId)
} else {
_, err = repo.DB.Exec(`INSERT INTO temp_objects(object_key, expiration_time)
VALUES($1, $2)`, tempObject.ObjectKey, expirationTime)
@@ -62,7 +62,7 @@ func (repo *ObjectCleanupRepository) GetAndLockExpiredObjects() (*sql.Tx, []ente
}
rows, err := tx.Query(`
SELECT object_key, is_multipart, upload_id FROM temp_objects
SELECT object_key, is_multipart, upload_id, bucket_id FROM temp_objects
WHERE expiration_time <= $1
LIMIT 1000
FOR UPDATE SKIP LOCKED
@@ -83,7 +83,8 @@ func (repo *ObjectCleanupRepository) GetAndLockExpiredObjects() (*sql.Tx, []ente
for rows.Next() {
var tempObject ente.TempObject
var uploadID sql.NullString
err := rows.Scan(&tempObject.ObjectKey, &tempObject.IsMultipart, &uploadID)
var bucketID sql.NullString
err := rows.Scan(&tempObject.ObjectKey, &tempObject.IsMultipart, &uploadID, &bucketID)
if err != nil {
rollback()
return nil, nil, stacktrace.Propagate(err, "")
@@ -91,6 +92,9 @@ func (repo *ObjectCleanupRepository) GetAndLockExpiredObjects() (*sql.Tx, []ente
if tempObject.IsMultipart {
tempObject.UploadID = uploadID.String
}
if bucketID.Valid {
tempObject.BucketId = bucketID.String
}
tempObjects = append(tempObjects, tempObject)
}
return tx, tempObjects, nil