[server] Support for storing preview files (#4226)

## Description
- This change introduces the concept of an associated object for a file.
- Adds new columns for object_id, object_nonce, and object_size.

Depending on the data_type, certain columns will be nil.
The existing size column reflects the total size for that particular
type. For vid_preview, it is the size of the playlist plus the size of
the preview video.
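
Below is a minimal sketch (not part of this change) of the vid_preview size arithmetic described above; the function name is illustrative:

```go
// playlistSize recovers the encrypted playlist's size for a vid_preview
// row: the size column stores playlist + video, while object_size stores
// only the encrypted video object's size.
func playlistSize(totalSize, videoObjectSize int64) int64 {
	return totalSize - videoObjectSize
}
```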



## Tests

- [x] Replication
- [x] Test deletion post-replication
Neeraj Gupta committed (via GitHub) on 2024-12-19 10:06:15 +05:30
19 changed files with 505 additions and 130 deletions

server/.gitignore

@@ -8,3 +8,4 @@ tmp/**
museum.yaml
bin/**
data/
__debug_bin*


@@ -166,7 +166,7 @@ func main() {
fileRepo := &repo.FileRepository{DB: db, S3Config: s3Config, QueueRepo: queueRepo,
ObjectRepo: objectRepo, ObjectCleanupRepo: objectCleanupRepo,
ObjectCopiesRepo: objectCopiesRepo, UsageRepo: usageRepo}
fileDataRepo := &fileDataRepo.Repository{DB: db}
fileDataRepo := &fileDataRepo.Repository{DB: db, ObjectCleanupRepo: objectCleanupRepo}
familyRepo := &repo.FamilyRepository{DB: db}
trashRepo := &repo.TrashRepository{DB: db, ObjectRepo: objectRepo, FileRepo: fileRepo, QueueRepo: queueRepo}
publicCollectionRepo := repo.NewPublicCollectionRepository(db, viper.GetString("apps.public-albums"))
@@ -428,6 +428,7 @@ func main() {
privateAPI.GET("/files/preview/v2/:fileID", fileHandler.GetThumbnail)
privateAPI.PUT("/files/data", fileHandler.PutFileData)
privateAPI.PUT("/files/video-data", fileHandler.PutVideoData)
privateAPI.POST("/files/data/status-diff", fileHandler.FileDataStatusDiff)
privateAPI.POST("/files/data/fetch", fileHandler.GetFilesData)
privateAPI.GET("/files/data/fetch", fileHandler.GetFileData)


@@ -30,6 +30,14 @@ func NewID(prefix string) (*string, error) {
return &result, nil
}
func MustNewID(prefix string) string {
id, err := NewID(prefix)
if err != nil {
panic(err)
}
return *id
}
func ServerReqID() string {
// Generate a nanoid with a custom alphabet and length of 22
id, err := NewID("ser")


@@ -2,9 +2,25 @@ package filedata
import (
"fmt"
"github.com/ente-io/museum/ente"
)
/*
We store three types of derived data for a file; their information is stored in the file_data table.
Each derived data type can have multiple objects, and each object is stored in the S3 bucket.
1) MLData: derived data from the file that is used for machine learning purposes. There's only
one object, of type S3FileMetadata.
2) PreviewVideo: derived data from the file that is used for previewing the video. This contains two objects:
2.1) One object of type S3FileMetadata that contains the encrypted HLS playlist.
2.2) A second object that contains the encrypted video. The objectKey for this object is derived via the ObjectKey function. The original size column in file_data
contains the sum of the S3FileMetadata object size and the video object size. The video object's size is stored in the ObjectSize column.
3) PreviewImage: derived data from the file that is used for previewing the image. This contains just one object.
The objectKey for this object is derived via the ObjectKey function. We also store the nonce of the object in the ObjectNonce column.
ObjectNonce is not stored for the PreviewVideo type, as the HLS playlist contains a random key that is used only once to encrypt the video with the default nonce.
*/
type Entity struct {
FileID int64 `json:"fileID"`
Type ente.ObjectType `json:"type"`
@@ -13,16 +29,18 @@ type Entity struct {
}
type FDDiffRequest struct {
LastUpdatedAt *int64 `form:"lastUpdated" binding:"required"`
LastUpdatedAt *int64 `form:"lastUpdatedAt"`
}
type FDStatus struct {
FileID int64 `json:"fileID" binding:"required"`
UserID int64 `json:"userID" binding:"required"`
Type ente.ObjectType `json:"type" binding:"required"`
IsDeleted bool `json:"isDeleted" binding:"required"`
Size int64 `json:"size" binding:"required"`
UpdatedAt int64 `json:"updatedAt" binding:"required"`
FileID int64 `json:"fileID" binding:"required"`
UserID int64 `json:"userID" binding:"required"`
Type ente.ObjectType `json:"type" binding:"required"`
IsDeleted bool `json:"isDeleted" binding:"required"`
ObjectID *string `json:"objectID"`
ObjectNonce *string `json:"objectNonce"`
Size int64 `json:"size" binding:"required"`
UpdatedAt int64 `json:"updatedAt" binding:"required"`
}
// GetFilesData should only be used for getting the preview video playlist and derived metadata.
@@ -74,6 +92,7 @@ type S3FileMetadata struct {
type GetPreviewURLRequest struct {
FileID int64 `form:"fileID" binding:"required"`
Type ente.ObjectType `form:"type" binding:"required"`
Suffix *string `form:"suffix"`
}
func (g *GetPreviewURLRequest) Validate() error {
@@ -88,6 +107,11 @@ type PreviewUploadUrlRequest struct {
Type ente.ObjectType `form:"type" binding:"required"`
}
type PreviewUploadUrl struct {
ObjectID string `json:"objectID" binding:"required"`
Url string `json:"url" binding:"required"`
}
func (g *PreviewUploadUrlRequest) Validate() error {
if g.Type != ente.PreviewVideo && g.Type != ente.PreviewImage {
return ente.NewBadRequestWithMessage(fmt.Sprintf("unsupported object type %s", g.Type))
@@ -101,8 +125,16 @@ type Row struct {
UserID int64
Type ente.ObjectType
// If a file type has multiple objects, then the size is the sum of all the objects' sizes.
Size int64
LatestBucket string
Size int64
LatestBucket string
ObjectID *string
// For the HLS video object there is no object nonce; all relevant data
// is stored in the metadata object, which primarily contains the playlist.
ObjectNonce *string
// Size of the object stored in the S3 bucket.
// For HLS video, this is the size of the encrypted video.
// The playlist size can be calculated as Size - ObjectSize.
ObjectSize *int64
ReplicatedBuckets []string
DeleteFromBuckets []string
InflightReplicas []string
@@ -115,21 +147,16 @@ type Row struct {
// S3FileMetadataObjectKey returns the object key for the metadata stored in the S3 bucket.
func (r *Row) S3FileMetadataObjectKey() string {
if r.Type == ente.MlData {
return derivedMetaPath(r.FileID, r.UserID)
}
if r.Type == ente.PreviewVideo {
return previewVideoPlaylist(r.FileID, r.UserID)
if r.Type == ente.MlData || r.Type == ente.PreviewVideo {
return ObjectMetadataKey(r.FileID, r.UserID, r.Type, r.ObjectID)
}
panic(fmt.Sprintf("S3FileMetadata should not be written for %s type", r.Type))
}
// GetS3FileObjectKey returns the object key for the file data stored in the S3 bucket.
func (r *Row) GetS3FileObjectKey() string {
if r.Type == ente.PreviewVideo {
return previewVideoPath(r.FileID, r.UserID)
} else if r.Type == ente.PreviewImage {
return previewImagePath(r.FileID, r.UserID)
if r.Type == ente.PreviewVideo || r.Type == ente.PreviewImage {
return ObjectKey(r.FileID, r.UserID, r.Type, r.ObjectID)
}
panic(fmt.Sprintf("unsupported object type %s", r.Type))
}


@@ -2,7 +2,9 @@ package filedata
import (
"fmt"
"github.com/ente-io/museum/ente"
"github.com/ente-io/museum/ente/base"
)
// BasePrefix returns the base prefix for all objects related to a file. To check if the file data is deleted,
@@ -13,39 +15,48 @@ func BasePrefix(fileID int64, ownerID int64) string {
func AllObjects(fileID int64, ownerID int64, oType ente.ObjectType) []string {
switch oType {
case ente.PreviewVideo:
return []string{previewVideoPath(fileID, ownerID), previewVideoPlaylist(fileID, ownerID)}
case ente.MlData:
return []string{derivedMetaPath(fileID, ownerID)}
case ente.PreviewImage:
return []string{previewImagePath(fileID, ownerID)}
default:
// panic: the given object type is not supported
panic(fmt.Sprintf("object type %s is not supported", oType))
}
}
func PreviewUrl(fileID int64, ownerID int64, oType ente.ObjectType) string {
func ObjectKey(fileID int64, ownerID int64, oType ente.ObjectType, id *string) string {
switch oType {
case ente.PreviewVideo:
return previewVideoPath(fileID, ownerID)
return fmt.Sprintf("%s%s/%s", BasePrefix(fileID, ownerID), string(oType), *id)
case ente.PreviewImage:
return previewImagePath(fileID, ownerID)
return fmt.Sprintf("%s%s/%s", BasePrefix(fileID, ownerID), string(oType), *id)
default:
panic(fmt.Sprintf("object type %s is not supported", oType))
}
}
func previewVideoPath(fileID int64, ownerID int64) string {
return fmt.Sprintf("%s%s", BasePrefix(fileID, ownerID), string(ente.PreviewVideo))
func ObjectMetadataKey(fileID int64, ownerID int64, oType ente.ObjectType, id *string) string {
switch oType {
case ente.PreviewVideo:
return fmt.Sprintf("%s_playlist", ObjectKey(fileID, ownerID, oType, id))
case ente.MlData:
return fmt.Sprintf("%s%s", BasePrefix(fileID, ownerID), string(oType))
default:
panic(fmt.Sprintf("ObjectMetadata not supported for type %s", string(oType)))
}
}
func previewVideoPlaylist(fileID int64, ownerID int64) string {
return fmt.Sprintf("%s%s", previewVideoPath(fileID, ownerID), "_playlist.m3u8")
func DeletePrefix(fileID int64, ownerID int64, oType ente.ObjectType) string {
return fmt.Sprintf("%s%s/", BasePrefix(fileID, ownerID), string(oType))
}
func previewImagePath(fileID int64, ownerID int64) string {
return fmt.Sprintf("%s%s", BasePrefix(fileID, ownerID), string(ente.PreviewImage))
func NewUploadID(oType ente.ObjectType) string {
if oType == ente.PreviewVideo {
return base.MustNewID("pv")
} else if oType == ente.PreviewImage {
return base.MustNewID("pi")
}
panic(fmt.Sprintf("object type %s is not supported", oType))
}
func derivedMetaPath(fileID int64, ownerID int64) string {
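
For orientation, a hedged sketch (not part of this diff) of the key shapes the helpers above produce. `<base>` stands for the BasePrefix output, whose exact format is not shown in this excerpt, and the type strings are assumed to match the enum values added in the migration below:

```go
// Hypothetical key layouts, assuming ente.PreviewVideo maps to "vid_preview":
//
//	ObjectKey(fileID, ownerID, ente.PreviewVideo, &id)         // "<base>vid_preview/<id>"
//	ObjectMetadataKey(fileID, ownerID, ente.PreviewVideo, &id) // "<base>vid_preview/<id>_playlist"
//	ObjectMetadataKey(fileID, ownerID, ente.MlData, nil)       // "<base><mldata type string>"
//	DeletePrefix(fileID, ownerID, ente.PreviewVideo)           // "<base>vid_preview/"
```

Because the playlist key is just the video object key plus a `_playlist` suffix, both objects fall under the same DeletePrefix, which is what lets deletion clean up by prefix later in this diff.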


@@ -10,33 +10,17 @@ type PutFileDataRequest struct {
Type ente.ObjectType `json:"type" binding:"required"`
EncryptedData *string `json:"encryptedData,omitempty"`
DecryptionHeader *string `json:"decryptionHeader,omitempty"`
// ObjectKey is the key of the object in the S3 bucket. This is needed while putting the object in the S3 bucket.
ObjectKey *string `json:"objectKey,omitempty"`
// size of the object that is being uploaded. This helps in checking the size of the object that is being uploaded.
ObjectSize *int64 `json:"objectSize,omitempty"`
Version *int `json:"version,omitempty"`
Version *int `json:"version,omitempty"`
}
func (r PutFileDataRequest) isEncDataPresent() bool {
return r.EncryptedData != nil && r.DecryptionHeader != nil && *r.EncryptedData != "" && *r.DecryptionHeader != ""
}
func (r PutFileDataRequest) isObjectDataPresent() bool {
return r.ObjectKey != nil && *r.ObjectKey != "" && r.ObjectSize != nil && *r.ObjectSize > 0
}
func (r PutFileDataRequest) Validate() error {
switch r.Type {
case ente.PreviewVideo:
if !r.isEncDataPresent() || !r.isObjectDataPresent() {
return ente.NewBadRequestWithMessage("object and metadata are required")
}
case ente.PreviewImage:
if !r.isObjectDataPresent() || r.isEncDataPresent() {
return ente.NewBadRequestWithMessage("object (only) data is required for preview image")
}
case ente.MlData:
if !r.isEncDataPresent() || r.isObjectDataPresent() {
if !r.isEncDataPresent() {
return ente.NewBadRequestWithMessage("encryptedData and decryptionHeader (only) are required for derived meta")
}
default:
@@ -44,23 +28,3 @@ func (r PutFileDataRequest) Validate() error {
}
return nil
}
func (r PutFileDataRequest) S3FileMetadataObjectKey(ownerID int64) string {
if r.Type == ente.MlData {
return derivedMetaPath(r.FileID, ownerID)
}
if r.Type == ente.PreviewVideo {
return previewVideoPlaylist(r.FileID, ownerID)
}
panic(fmt.Sprintf("S3FileMetadata should not be written for %s type", r.Type))
}
func (r PutFileDataRequest) S3FileObjectKey(ownerID int64) string {
if r.Type == ente.PreviewVideo {
return previewVideoPath(r.FileID, ownerID)
}
if r.Type == ente.PreviewImage {
return previewImagePath(r.FileID, ownerID)
}
panic(fmt.Sprintf("S3FileObjectKey should not be written for %s type", r.Type))
}


@@ -0,0 +1,19 @@
package filedata
import "github.com/ente-io/museum/ente"
type VidPreviewRequest struct {
FileID int64 `json:"fileID" binding:"required"`
ObjectID string `json:"objectID" binding:"required"`
ObjectSize int64 `json:"objectSize" binding:"required"`
Playlist string `json:"playlist" binding:"required"`
PlayListHeader string `json:"playlistHeader" binding:"required"`
Version *int `json:"version"`
}
func (r VidPreviewRequest) Validate() error {
if r.Playlist == "" || r.PlayListHeader == "" {
return ente.NewBadRequestWithMessage("playlist and playListHeader are required for preview video")
}
return nil
}
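
A hedged usage sketch (not part of this diff) of the request body a client would send to PUT /files/video-data; all values are placeholders:

```go
req := filedata.VidPreviewRequest{
	FileID:         1234,                       // hypothetical file ID
	ObjectID:       "pv_...",                   // objectID returned by the preview upload URL endpoint
	ObjectSize:     2345678,                    // size of the already-uploaded encrypted video object
	Playlist:       "<encrypted HLS playlist>", // encrypted playlist payload
	PlayListHeader: "<decryption header>",
}
```

Version may be omitted; the handler defaults it to 1.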


@@ -0,0 +1,4 @@
ALTER TABLE file_data
DROP COLUMN obj_id,
DROP COLUMN obj_nonce,
DROP COLUMN obj_size;


@@ -0,0 +1,6 @@
ALTER TYPE OBJECT_TYPE ADD VALUE 'vid_preview';
ALTER TYPE OBJECT_TYPE ADD VALUE 'img_preview';
ALTER TABLE file_data
ADD COLUMN obj_id TEXT,
ADD COLUMN obj_nonce TEXT,
ADD COLUMN obj_size INTEGER;
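
A hedged summary (not part of this migration) of how the new columns are populated per data_type, following the package comment earlier in this diff:

```go
// type (data_type)              obj_id  obj_nonce  obj_size
// MlData                        nil     nil        nil   // single S3 metadata object
// PreviewVideo ('vid_preview')  set     nil        set   // obj_size = encrypted video; size = playlist + video
// PreviewImage ('img_preview')  set     set        ?     // nonce is stored; obj_size handling is not shown in this diff
```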


@@ -25,7 +25,7 @@ func (h *FileHandler) PutFileData(ctx *gin.Context) {
version := 1
reqInt.Version = &version
}
err := h.FileDataCtrl.InsertOrUpdate(ctx, &req)
err := h.FileDataCtrl.InsertOrUpdateMetadata(ctx, &req)
if err != nil {
handler.Error(ctx, err)
@@ -33,6 +33,28 @@ func (h *FileHandler) PutFileData(ctx *gin.Context) {
}
ctx.JSON(http.StatusOK, gin.H{})
}
func (h *FileHandler) PutVideoData(ctx *gin.Context) {
var req fileData.VidPreviewRequest
if err := ctx.ShouldBindJSON(&req); err != nil {
handler.Error(ctx, ente.NewBadRequestWithMessage(err.Error()))
return
}
if err := req.Validate(); err != nil {
ctx.JSON(http.StatusBadRequest, err)
return
}
reqInt := &req
if reqInt.Version == nil {
version := 1
reqInt.Version = &version
}
err := h.FileDataCtrl.InsertVideoPreview(ctx, &req)
if err != nil {
handler.Error(ctx, err)
return
}
ctx.JSON(http.StatusOK, gin.H{})
}
func (h *FileHandler) GetFilesData(ctx *gin.Context) {
var req fileData.GetFilesData
@@ -72,7 +94,7 @@ func (h *FileHandler) FileDataStatusDiff(ctx *gin.Context) {
func (h *FileHandler) GetFileData(ctx *gin.Context) {
var req fileData.GetFileData
if err := ctx.ShouldBindJSON(&req); err != nil {
if err := ctx.ShouldBindQuery(&req); err != nil {
ctx.JSON(http.StatusBadRequest, ente.NewBadRequestWithMessage(err.Error()))
return
}
@@ -88,23 +110,21 @@ func (h *FileHandler) GetFileData(ctx *gin.Context) {
func (h *FileHandler) GetPreviewUploadURL(c *gin.Context) {
var request fileData.PreviewUploadUrlRequest
if err := c.ShouldBindJSON(&request); err != nil {
if err := c.ShouldBindQuery(&request); err != nil {
handler.Error(c, stacktrace.Propagate(ente.ErrBadRequest, fmt.Sprintf("Request binding failed %s", err)))
return
}
url, err := h.FileDataCtrl.PreviewUploadURL(c, request)
resp, err := h.FileDataCtrl.PreviewUploadURL(c, request)
if err != nil {
handler.Error(c, stacktrace.Propagate(err, ""))
return
}
c.JSON(http.StatusOK, gin.H{
"url": url,
})
c.JSON(http.StatusOK, resp)
}
func (h *FileHandler) GetPreviewURL(c *gin.Context) {
var request fileData.GetPreviewURLRequest
if err := c.ShouldBindJSON(&request); err != nil {
if err := c.ShouldBindQuery(&request); err != nil {
handler.Error(c, stacktrace.Propagate(ente.ErrBadRequest, fmt.Sprintf("Request binding failed %s", err)))
return
}


@@ -19,7 +19,6 @@ import (
"github.com/ente-io/stacktrace"
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
"strings"
"sync"
gTime "time"
)
@@ -49,7 +48,8 @@ type Controller struct {
CollectionRepo *repo.CollectionRepository
downloadManagerCache map[string]*s3manager.Downloader
// for downloading objects from s3 for replication
workerURL string
workerURL string
tempStorage string
}
func New(repo *fileDataRepo.Repository,
@@ -57,7 +57,8 @@ func New(repo *fileDataRepo.Repository,
objectCleanupController *controller.ObjectCleanupController,
s3Config *s3config.S3Config,
fileRepo *repo.FileRepository,
collectionRepo *repo.CollectionRepository) *Controller {
collectionRepo *repo.CollectionRepository,
) *Controller {
embeddingDcs := []string{s3Config.GetHotBackblazeDC(), s3Config.GetHotWasabiDC(), s3Config.GetWasabiDerivedDC(), s3Config.GetDerivedStorageDataCenter(), "b5"}
cache := make(map[string]*s3manager.Downloader, len(embeddingDcs))
for i := range embeddingDcs {
@@ -75,7 +76,7 @@ func New(repo *fileDataRepo.Repository,
}
}
func (c *Controller) InsertOrUpdate(ctx *gin.Context, req *fileData.PutFileDataRequest) error {
func (c *Controller) InsertOrUpdateMetadata(ctx *gin.Context, req *fileData.PutFileDataRequest) error {
if err := req.Validate(); err != nil {
return stacktrace.Propagate(err, "validation failed")
}
@@ -84,22 +85,12 @@ func (c *Controller) InsertOrUpdate(ctx *gin.Context, req *fileData.PutFileDataR
if err != nil {
return stacktrace.Propagate(err, "")
}
if req.Type != ente.MlData && req.Type != ente.PreviewVideo {
if req.Type != ente.MlData {
return stacktrace.Propagate(ente.NewBadRequestWithMessage("unsupported object type "+string(req.Type)), "")
}
fileOwnerID := userID
bucketID := c.S3Config.GetBucketID(req.Type)
if req.Type == ente.PreviewVideo {
fileObjectKey := req.S3FileObjectKey(fileOwnerID)
if !strings.Contains(*req.ObjectKey, fileObjectKey) {
return stacktrace.Propagate(ente.NewBadRequestWithMessage("objectKey should contain the file object key"), "")
}
err = c.copyObject(*req.ObjectKey, fileObjectKey, bucketID)
if err != nil {
return err
}
}
objectKey := req.S3FileMetadataObjectKey(fileOwnerID)
objectKey := fileData.ObjectMetadataKey(req.FileID, fileOwnerID, req.Type, nil)
obj := fileData.S3FileMetadata{
Version: *req.Version,
EncryptedData: *req.EncryptedData,


@@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"github.com/ente-io/museum/ente"
"github.com/ente-io/museum/ente/filedata"
fileDataRepo "github.com/ente-io/museum/pkg/repo/filedata"
enteTime "github.com/ente-io/museum/pkg/utils/time"
@@ -57,7 +58,14 @@ func (c *Controller) tryDelete() error {
}
return err
}
err = c.deleteFileRow(*row)
if row.Type == ente.MlData {
err = c.deleteFileRow(*row)
} else if row.Type == ente.PreviewVideo {
err = c.deleteFileRowV2(*row)
} else {
log.Warningf("Unsupported object type for deletion: %s", row.Type)
return nil
}
if err != nil {
log.Errorf("Could not delete file data: %s", err)
return err
@@ -79,6 +87,9 @@ func (c *Controller) deleteFileRow(fileDataRow filedata.Row) error {
panic(fmt.Sprintf("file %d does not belong to user %d", fileID, ownerID))
}
ctxLogger := log.WithField("file_id", fileDataRow.FileID).WithField("type", fileDataRow.Type).WithField("user_id", fileDataRow.UserID)
if fileDataRow.Type != ente.MlData {
panic(fmt.Sprintf("unsupported object type for filedata deletion %s", fileDataRow.Type))
}
objectKeys := filedata.AllObjects(fileID, ownerID, fileDataRow.Type)
bucketColumnMap, err := getMapOfBucketItToColumn(fileDataRow)
if err != nil {
@@ -88,14 +99,14 @@ func (c *Controller) deleteFileRow(fileDataRow filedata.Row) error {
// Delete objects and remove buckets
for bucketID, columnName := range bucketColumnMap {
for _, objectKey := range objectKeys {
err := c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKey, bucketID)
if err != nil {
ctxLogger.WithError(err).WithFields(log.Fields{
delErr := c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKey, bucketID)
if delErr != nil {
ctxLogger.WithError(delErr).WithFields(log.Fields{
"bucketID": bucketID,
"column": columnName,
"objectKey": objectKey,
}).Error("Failed to delete object from datacenter")
return err
return delErr
}
}
dbErr := c.Repo.RemoveBucket(fileDataRow, bucketID, columnName)
@@ -124,6 +135,67 @@ func (c *Controller) deleteFileRow(fileDataRow filedata.Row) error {
return nil
}
func (c *Controller) deleteFileRowV2(fileDataRow filedata.Row) error {
if !fileDataRow.IsDeleted {
return fmt.Errorf("file %d is not marked as deleted", fileDataRow.FileID)
}
fileID := fileDataRow.FileID
ownerID, err := c.FileRepo.GetOwnerID(fileID)
if err != nil {
return err
}
if fileDataRow.UserID != ownerID {
// this should never happen
panic(fmt.Sprintf("file %d does not belong to user %d", fileID, ownerID))
}
ctxLogger := log.WithField("file_id", fileDataRow.FileID).WithField("type", fileDataRow.Type).WithField("user_id", fileDataRow.UserID)
if fileDataRow.Type != ente.PreviewVideo {
panic(fmt.Sprintf("unsupported object type for filedata deletion %s", fileDataRow.Type))
}
delPrefix := filedata.DeletePrefix(fileID, ownerID, fileDataRow.Type)
bucketColumnMap, err := getMapOfBucketItToColumn(fileDataRow)
if err != nil {
ctxLogger.WithError(err).Error("Failed to get bucketColumnMap")
return err
}
// Delete objects and remove buckets
for bucketID, columnName := range bucketColumnMap {
delErr := c.ObjectCleanupController.DeleteAllObjectsWithPrefix(delPrefix, bucketID)
if delErr != nil {
ctxLogger.WithError(delErr).WithFields(log.Fields{
"bucketID": bucketID,
"column": columnName,
"delPrefix": delPrefix,
}).Error("Failed to deleteAllObjectsWithPrefix from datacenter")
return delErr
}
dbErr := c.Repo.RemoveBucket(fileDataRow, bucketID, columnName)
if dbErr != nil {
ctxLogger.WithError(dbErr).WithFields(log.Fields{
"bucketID": bucketID,
"column": columnName,
}).Error("Failed to remove bucket from db")
return dbErr
}
}
// Delete from Latest bucket
err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(delPrefix, fileDataRow.LatestBucket)
if err != nil {
ctxLogger.WithError(err).Error("Failed to delete object from datacenter")
return err
}
dbErr := c.Repo.DeleteFileData(context.Background(), fileDataRow)
if dbErr != nil {
ctxLogger.WithError(dbErr).Error("Failed to remove from db")
return dbErr
}
return nil
}
func getMapOfBucketItToColumn(row filedata.Row) (map[string]string, error) {
bucketColumnMap := make(map[string]string)
for _, bucketID := range row.DeleteFromBuckets {


@@ -1,7 +1,6 @@
package filedata
import (
"fmt"
"github.com/ente-io/museum/ente"
"github.com/ente-io/museum/ente/filedata"
"github.com/ente-io/museum/pkg/utils/auth"
@@ -31,7 +30,7 @@ func (c *Controller) GetPreviewUrl(ctx *gin.Context, request filedata.GetPreview
return &enteUrl.URL, nil
}
func (c *Controller) PreviewUploadURL(ctx *gin.Context, request filedata.PreviewUploadUrlRequest) (*string, error) {
func (c *Controller) PreviewUploadURL(ctx *gin.Context, request filedata.PreviewUploadUrlRequest) (*filedata.PreviewUploadUrl, error) {
if err := request.Validate(); err != nil {
return nil, err
}
@@ -43,12 +42,16 @@ func (c *Controller) PreviewUploadURL(ctx *gin.Context, request filedata.Preview
if err != nil {
return nil, stacktrace.Propagate(err, "")
}
id := filedata.NewUploadID(request.Type)
// note: instead of the final url, give a temp url for upload purpose.
uploadUrl := fmt.Sprintf("%s_temp_upload", filedata.PreviewUrl(request.FileID, fileOwnerID, request.Type))
objectKey := filedata.ObjectKey(request.FileID, fileOwnerID, request.Type, &id)
bucketID := c.S3Config.GetBucketID(request.Type)
enteUrl, err := c.getUploadURL(bucketID, uploadUrl)
enteUrl, err := c.getUploadURL(bucketID, objectKey)
if err != nil {
return nil, stacktrace.Propagate(err, "")
}
return &enteUrl.URL, nil
return &filedata.PreviewUploadUrl{
ObjectID: id,
Url: enteUrl.URL,
}, nil
}


@@ -5,8 +5,10 @@ import (
"database/sql"
"errors"
"fmt"
"github.com/ente-io/museum/ente"
"github.com/ente-io/museum/ente/filedata"
fileDataRepo "github.com/ente-io/museum/pkg/repo/filedata"
"github.com/ente-io/museum/pkg/utils/file"
enteTime "github.com/ente-io/museum/pkg/utils/time"
"github.com/ente-io/stacktrace"
log "github.com/sirupsen/logrus"
@@ -29,9 +31,37 @@ func (c *Controller) StartReplication() error {
if workerCount == 0 {
workerCount = 6
}
err := c.createTemporaryStorage()
if err != nil {
return stacktrace.Propagate(err, "Failed to create temporary storage")
}
go c.startWorkers(workerCount)
return nil
}
func (c *Controller) createTemporaryStorage() error {
tempStorage := viper.GetString("replication.file-data.tmp-storage")
if tempStorage == "" {
tempStorage = "tmp/replication-file-data"
}
log.Infof("Temporary storage for replication v3 is: %s", tempStorage)
err := file.DeleteAllFilesInDirectory(tempStorage)
if err != nil {
return stacktrace.Propagate(err, "Failed to deleting old files from %s", tempStorage)
}
err = file.MakeDirectoryIfNotExists(tempStorage)
if err != nil {
return stacktrace.Propagate(err, "Failed to create temporary storage %s", tempStorage)
}
c.tempStorage = tempStorage
return nil
}
func (c *Controller) startWorkers(n int) {
log.Infof("Starting %d workers for replication v3", n)
@@ -67,6 +97,7 @@ func (c *Controller) tryReplicate() error {
}
return err
}
err = c.replicateRowData(ctx, *row)
if err != nil {
log.WithFields(log.Fields{
@@ -98,10 +129,26 @@ func (c *Controller) replicateRowData(ctx context.Context, row filedata.Row) err
if err != nil {
return stacktrace.Propagate(err, "error fetching metadata object "+row.S3FileMetadataObjectKey())
}
for bucketID := range wantInBucketIDs {
for key := range wantInBucketIDs {
bucketID := key
if regErr := c.Repo.RegisterReplicationAttempt(ctx, row, bucketID); regErr != nil {
return stacktrace.Propagate(regErr, "could not register replication attempt")
}
if err := c.uploadAndVerify(ctx, row, s3FileMetadata, bucketID); err != nil {
return stacktrace.Propagate(err, "error uploading and verifying metadata object")
}
if row.Type == ente.PreviewVideo {
req := ReplicateObjectReq{
ObjectKey: row.GetS3FileObjectKey(),
SrcBucketID: row.LatestBucket,
DestBucketID: bucketID,
ObjectSize: *row.ObjectSize,
}
if err := c.replicateObject(ctx, &req); err != nil {
return stacktrace.Propagate(err, "error replicating video objects")
}
}
return c.Repo.MoveBetweenBuckets(row, bucketID, fileDataRepo.InflightRepColumn, fileDataRepo.ReplicationColumn)
}
} else {
log.Infof("No replication pending for file %d and type %s", row.FileID, string(row.Type))
@@ -110,15 +157,17 @@ func (c *Controller) replicateRowData(ctx context.Context, row filedata.Row) err
}
func (c *Controller) uploadAndVerify(ctx context.Context, row filedata.Row, s3FileMetadata filedata.S3FileMetadata, dstBucketID string) error {
if err := c.Repo.RegisterReplicationAttempt(ctx, row, dstBucketID); err != nil {
return stacktrace.Propagate(err, "could not register replication attempt")
}
metadataSize, err := c.uploadObject(s3FileMetadata, row.S3FileMetadataObjectKey(), dstBucketID)
if err != nil {
return err
}
if metadataSize != row.Size {
expectedSize := row.Size
if row.ObjectSize != nil {
expectedSize = expectedSize - *row.ObjectSize
}
if metadataSize != expectedSize {
return fmt.Errorf("uploaded metadata size %d does not match expected size %d", metadataSize, row.Size)
}
return c.Repo.MoveBetweenBuckets(row, dstBucketID, fileDataRepo.InflightRepColumn, fileDataRepo.ReplicationColumn)
return nil
}


@@ -10,8 +10,11 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/ente-io/museum/ente"
fileData "github.com/ente-io/museum/ente/filedata"
"github.com/ente-io/museum/pkg/utils/file"
"github.com/ente-io/stacktrace"
log "github.com/sirupsen/logrus"
"io"
"os"
stime "time"
)
@@ -92,21 +95,73 @@ func (c *Controller) uploadObject(obj fileData.S3FileMetadata, objectKey string,
return int64(len(embeddingObj)), nil
}
// copyObject copies the object from srcObjectKey to destObjectKey in the same bucket and returns the object size
func (c *Controller) copyObject(srcObjectKey string, destObjectKey string, bucketID string) error {
bucket := c.S3Config.GetBucket(bucketID)
func (c *Controller) verifySize(bucketID string, objectKey string, expectedSize int64) error {
s3Client := c.S3Config.GetS3Client(bucketID)
copySource := fmt.Sprintf("%s/%s", *bucket, srcObjectKey)
copyInput := &s3.CopyObjectInput{
Bucket: bucket,
CopySource: &copySource,
Key: aws.String(destObjectKey),
bucket := c.S3Config.GetBucket(bucketID)
res, err := s3Client.HeadObject(&s3.HeadObjectInput{
Bucket: bucket,
Key: &objectKey,
})
if err != nil {
return stacktrace.Propagate(err, "Fetching object info from bucket %s failed", *bucket)
}
_, err := s3Client.CopyObject(copyInput)
if err != nil {
return fmt.Errorf("failed to copy (%s) from %s to %s: %v", bucketID, srcObjectKey, destObjectKey, err)
if *res.ContentLength != expectedSize {
err = fmt.Errorf("size of the uploaded file (%d) does not match the expected size (%d) in bucket %s",
*res.ContentLength, expectedSize, *bucket)
//c.notifyDiscord(fmt.Sprint(err))
return stacktrace.Propagate(err, "")
}
return nil
}
type ReplicateObjectReq struct {
ObjectKey string
SrcBucketID string
DestBucketID string
ObjectSize int64
}
// replicateObject copies the object referenced by req from the source bucket to the destination bucket, verifying its size on both ends.
func (c *Controller) replicateObject(ctx context.Context, req *ReplicateObjectReq) error {
if err := file.EnsureSufficientSpace(req.ObjectSize); err != nil {
return stacktrace.Propagate(err, "")
}
filePath, file, err := file.CreateTemporaryFile(c.tempStorage, req.ObjectKey)
if err != nil {
return stacktrace.Propagate(err, "Failed to create temporary file")
}
defer os.Remove(filePath)
defer file.Close()
//s3Client := c.S3Config.GetS3Client(req.SrcBucketID)
bucket := c.S3Config.GetBucket(req.SrcBucketID)
downloader := c.downloadManagerCache[req.SrcBucketID]
_, err = downloader.DownloadWithContext(ctx, file, &s3.GetObjectInput{
Bucket: bucket,
Key: &req.ObjectKey,
})
if err != nil {
return stacktrace.Propagate(err, "Failed to download object from bucket %s", req.SrcBucketID)
}
if err := c.verifySize(req.SrcBucketID, req.ObjectKey, req.ObjectSize); err != nil {
return stacktrace.Propagate(err, "")
}
dstClient := c.S3Config.GetS3Client(req.DestBucketID)
uploader := s3manager.NewUploaderWithClient(&dstClient)
file.Seek(0, io.SeekStart)
up := s3manager.UploadInput{
Bucket: c.S3Config.GetBucket(req.DestBucketID),
Key: aws.String(req.ObjectKey),
Body: file,
}
result, err := uploader.Upload(&up)
if err != nil {
return stacktrace.Propagate(err, "Failed to upload object to bucket %s", req.DestBucketID)
}
log.Infof("Uploaded to bucket %s", result.Location)
// verify the size of the uploaded object
if err := c.verifySize(req.DestBucketID, req.ObjectKey, req.ObjectSize); err != nil {
return stacktrace.Propagate(err, "")
}
log.Infof("Copied (%s) from %s to %s", bucketID, srcObjectKey, destObjectKey)
return nil
}


@@ -0,0 +1,67 @@
package filedata
import (
"context"
"github.com/ente-io/museum/ente"
"github.com/ente-io/museum/ente/filedata"
"github.com/ente-io/museum/pkg/utils/auth"
"github.com/ente-io/museum/pkg/utils/network"
"github.com/ente-io/stacktrace"
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
)
func (c *Controller) InsertVideoPreview(ctx *gin.Context, req *filedata.VidPreviewRequest) error {
if err := req.Validate(); err != nil {
return stacktrace.Propagate(err, "validation failed")
}
userID := auth.GetUserID(ctx.Request.Header)
err := c._validatePermission(ctx, req.FileID, userID)
if err != nil {
return stacktrace.Propagate(err, "")
}
fileOwnerID := userID
bucketID := c.S3Config.GetBucketID(ente.PreviewVideo)
fileObjectKey := filedata.ObjectKey(req.FileID, fileOwnerID, ente.PreviewVideo, &req.ObjectID)
objectKey := filedata.ObjectMetadataKey(req.FileID, fileOwnerID, ente.PreviewVideo, &req.ObjectID)
if sizeErr := c.verifySize(bucketID, fileObjectKey, req.ObjectSize); sizeErr != nil {
return stacktrace.Propagate(sizeErr, "failed to validate size")
}
// Note: the upload and insert operations currently run synchronously; the goroutine below is disabled.
//go func() {
obj := filedata.S3FileMetadata{
Version: *req.Version,
EncryptedData: req.Playlist,
DecryptionHeader: req.PlayListHeader,
Client: network.GetClientInfo(ctx),
}
logger := log.
WithField("objectKey", objectKey).
WithField("fileID", req.FileID).
WithField("type", ente.PreviewVideo)
size, uploadErr := c.uploadObject(obj, objectKey, bucketID)
if uploadErr != nil {
logger.WithError(uploadErr).Error("upload failed")
return nil
}
row := filedata.Row{
FileID: req.FileID,
Type: ente.PreviewVideo,
UserID: fileOwnerID,
Size: size + req.ObjectSize,
LatestBucket: bucketID,
ObjectID: &req.ObjectID,
ObjectNonce: nil,
ObjectSize: &req.ObjectSize,
}
dbInsertErr := c.Repo.InsertOrUpdatePreviewData(context.Background(), row, fileObjectKey)
if dbInsertErr != nil {
logger.WithError(dbInsertErr).Error("insert or update failed")
return nil
}
//}()
return nil
}
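
Pieced together from the code above, a hedged sketch of the end-to-end preview-video flow (the upload-URL route path is not shown in this excerpt):

```go
// 1. The client requests an upload URL (GetPreviewUploadURL), and
//    PreviewUploadURL returns { objectID, url }.
// 2. The client uploads the encrypted video bytes directly to S3 via that URL.
// 3. The client calls PUT /files/video-data; InsertVideoPreview verifies the
//    uploaded object's size, stores the encrypted playlist as an S3 metadata
//    object, and upserts the file_data row (clearing the temp-object entry).
```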


@@ -6,6 +6,7 @@ import (
"fmt"
"github.com/ente-io/museum/ente"
"github.com/ente-io/museum/ente/filedata"
"github.com/ente-io/museum/pkg/repo"
"github.com/ente-io/museum/pkg/utils/array"
"github.com/ente-io/stacktrace"
"github.com/lib/pq"
@@ -14,7 +15,8 @@ import (
// Repository defines the methods for inserting, updating, and retrieving file data.
type Repository struct {
DB *sql.DB
DB *sql.DB
ObjectCleanupRepo *repo.ObjectCleanupRepository
}
const (
@@ -56,8 +58,52 @@ func (r *Repository) InsertOrUpdate(ctx context.Context, data filedata.Row) erro
return nil
}
func (r *Repository) InsertOrUpdatePreviewData(ctx context.Context, data filedata.Row, previewObject string) error {
tx, err := r.DB.BeginTx(ctx, nil)
if err != nil {
return stacktrace.Propagate(err, "")
}
query := `
INSERT INTO file_data
(file_id, user_id, data_type, size, latest_bucket, obj_id, obj_nonce, obj_size )
VALUES
($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (file_id, data_type)
DO UPDATE SET
size = EXCLUDED.size,
delete_from_buckets = array(
SELECT DISTINCT elem FROM unnest(
array_append(
array_cat(array_cat(file_data.replicated_buckets, file_data.delete_from_buckets), file_data.inflight_rep_buckets),
CASE WHEN file_data.latest_bucket != EXCLUDED.latest_bucket THEN file_data.latest_bucket END
)
) AS elem
WHERE elem IS NOT NULL AND elem != EXCLUDED.latest_bucket
),
replicated_buckets = ARRAY[]::s3region[],
pending_sync = true,
latest_bucket = EXCLUDED.latest_bucket,
obj_id = EXCLUDED.obj_id,
obj_nonce = EXCLUDED.obj_nonce,
obj_size = EXCLUDED.obj_size,
updated_at = now_utc_micro_seconds()
WHERE file_data.is_deleted = false`
_, err = tx.ExecContext(ctx, query,
data.FileID, data.UserID, string(data.Type), data.Size, data.LatestBucket, *data.ObjectID, data.ObjectNonce, data.ObjectSize)
if err != nil {
tx.Rollback()
return stacktrace.Propagate(err, "failed to insert file data")
}
err = r.ObjectCleanupRepo.RemoveTempObjectFromDC(ctx, tx, previewObject, data.LatestBucket)
if err != nil {
tx.Rollback()
return stacktrace.Propagate(err, "failed to remove object from tempObjects")
}
return tx.Commit()
}
func (r *Repository) GetFilesData(ctx context.Context, oType ente.ObjectType, fileIDs []int64) ([]filedata.Row, error) {
rows, err := r.DB.QueryContext(ctx, `SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets, inflight_rep_buckets, pending_sync, is_deleted, sync_locked_till, created_at, updated_at
rows, err := r.DB.QueryContext(ctx, `SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets, inflight_rep_buckets, pending_sync, is_deleted, sync_locked_till, created_at, updated_at, obj_id, obj_nonce, obj_size
FROM file_data
WHERE data_type = $1 AND file_id = ANY($2)`, string(oType), pq.Array(fileIDs))
if err != nil {
@@ -67,7 +113,7 @@ func (r *Repository) GetFilesData(ctx context.Context, oType ente.ObjectType, fi
}
func (r *Repository) GetFileData(ctx context.Context, fileIDs int64) ([]filedata.Row, error) {
rows, err := r.DB.QueryContext(ctx, `SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets,inflight_rep_buckets, pending_sync, is_deleted, sync_locked_till, created_at, updated_at
rows, err := r.DB.QueryContext(ctx, `SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets,inflight_rep_buckets, pending_sync, is_deleted, sync_locked_till, created_at, updated_at, obj_id, obj_nonce, obj_size
FROM file_data
WHERE file_id = $1`, fileIDs)
if err != nil {
@@ -127,7 +173,7 @@ func (r *Repository) RemoveBucket(row filedata.Row, bucketID string, columnName
}
func (r *Repository) GetFDForUser(ctx context.Context, userID int64, lastUpdatedAt int64, limit int64) ([]filedata.FDStatus, error) {
rows, err := r.DB.QueryContext(ctx, `SELECT file_id, user_id, data_type, size, is_deleted, updated_at
rows, err := r.DB.QueryContext(ctx, `SELECT file_id, user_id, data_type, size, is_deleted, obj_id, obj_nonce, updated_at
FROM file_data
WHERE user_id = $1 AND updated_at > $2 ORDER BY updated_at
LIMIT $3`, userID, lastUpdatedAt, limit)
@@ -137,7 +183,7 @@ func (r *Repository) GetFDForUser(ctx context.Context, userID int64, lastUpdated
var fdStatuses []filedata.FDStatus
for rows.Next() {
var status filedata.FDStatus
scanErr := rows.Scan(&status.FileID, &status.UserID, &status.Type, &status.Size, &status.IsDeleted, &status.UpdatedAt)
scanErr := rows.Scan(&status.FileID, &status.UserID, &status.Type, &status.Size, &status.IsDeleted, &status.ObjectID, &status.ObjectNonce, &status.UpdatedAt)
if scanErr != nil {
return nil, stacktrace.Propagate(scanErr, "")
}
@@ -194,13 +240,13 @@ func (r *Repository) GetPendingSyncDataAndExtendLock(ctx context.Context, newSyn
return nil, stacktrace.Propagate(err, "")
}
defer tx.Rollback()
row := tx.QueryRow(`SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets, inflight_rep_buckets, pending_sync, is_deleted, sync_locked_till, created_at, updated_at
row := tx.QueryRow(`SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets, inflight_rep_buckets, pending_sync, is_deleted, sync_locked_till, created_at, updated_at, obj_id, obj_nonce, obj_size
FROM file_data
where pending_sync = true and is_deleted = $1 and sync_locked_till < now_utc_micro_seconds()
LIMIT 1
FOR UPDATE SKIP LOCKED`, forDeletion)
var fileData filedata.Row
err = row.Scan(&fileData.FileID, &fileData.UserID, &fileData.Type, &fileData.Size, &fileData.LatestBucket, pq.Array(&fileData.ReplicatedBuckets), pq.Array(&fileData.DeleteFromBuckets), pq.Array(&fileData.InflightReplicas), &fileData.PendingSync, &fileData.IsDeleted, &fileData.SyncLockedTill, &fileData.CreatedAt, &fileData.UpdatedAt)
err = row.Scan(&fileData.FileID, &fileData.UserID, &fileData.Type, &fileData.Size, &fileData.LatestBucket, pq.Array(&fileData.ReplicatedBuckets), pq.Array(&fileData.DeleteFromBuckets), pq.Array(&fileData.InflightReplicas), &fileData.PendingSync, &fileData.IsDeleted, &fileData.SyncLockedTill, &fileData.CreatedAt, &fileData.UpdatedAt, &fileData.ObjectID, &fileData.ObjectNonce, &fileData.ObjectSize)
if err != nil {
return nil, stacktrace.Propagate(err, "")
}
@@ -273,7 +319,7 @@ func convertRowsToFilesData(rows *sql.Rows) ([]filedata.Row, error) {
var filesData []filedata.Row
for rows.Next() {
var fileData filedata.Row
err := rows.Scan(&fileData.FileID, &fileData.UserID, &fileData.Type, &fileData.Size, &fileData.LatestBucket, pq.Array(&fileData.ReplicatedBuckets), pq.Array(&fileData.DeleteFromBuckets), pq.Array(&fileData.InflightReplicas), &fileData.PendingSync, &fileData.IsDeleted, &fileData.SyncLockedTill, &fileData.CreatedAt, &fileData.UpdatedAt)
err := rows.Scan(&fileData.FileID, &fileData.UserID, &fileData.Type, &fileData.Size, &fileData.LatestBucket, pq.Array(&fileData.ReplicatedBuckets), pq.Array(&fileData.DeleteFromBuckets), pq.Array(&fileData.InflightReplicas), &fileData.PendingSync, &fileData.IsDeleted, &fileData.SyncLockedTill, &fileData.CreatedAt, &fileData.UpdatedAt, &fileData.ObjectID, &fileData.ObjectNonce, &fileData.ObjectSize)
if err != nil {
return nil, stacktrace.Propagate(err, "")
}
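
To unpack the upsert in InsertOrUpdatePreviewData above: on conflict, every previously known bucket (replicated, pending-delete, inflight, plus the old latest bucket when it changed) is folded into delete_from_buckets, the new latest bucket is excluded, and replicated_buckets is reset. A hedged walk-through with hypothetical bucket IDs:

```go
// Before: latest_bucket = "b1", replicated_buckets = {"b2"},
//         delete_from_buckets = {}, inflight_rep_buckets = {"b3"}.
// A re-upload lands in a different bucket, so EXCLUDED.latest_bucket = "b4".
// After:  latest_bucket = "b4", replicated_buckets = {},
//         delete_from_buckets = {"b1", "b2", "b3"} (order not guaranteed),
//         pending_sync = true, and the obj_* columns take the new values.
```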


@@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"errors"
"fmt"
"github.com/ente-io/stacktrace"
log "github.com/sirupsen/logrus"
@@ -40,6 +41,22 @@ func (repo *ObjectCleanupRepository) RemoveTempObjectKey(ctx context.Context, tx
return stacktrace.Propagate(err, "")
}
// RemoveTempObjectFromDC deletes the temp_objects entry for the given objectKey and bucket, and errors unless exactly one row was affected
func (repo *ObjectCleanupRepository) RemoveTempObjectFromDC(ctx context.Context, tx *sql.Tx, objectKey string, dc string) error {
res, err := tx.ExecContext(ctx, `DELETE FROM temp_objects WHERE object_key = $1 and bucket_id = $2`, objectKey, dc)
if err != nil {
return stacktrace.Propagate(err, "")
}
rowsAffected, err := res.RowsAffected()
if err != nil {
return stacktrace.Propagate(err, "")
}
if rowsAffected != 1 {
return stacktrace.Propagate(fmt.Errorf("only one row should be affected not %d", rowsAffected), "")
}
return nil
}
// GetExpiredObjects returns the list of object keys that have expired
func (repo *ObjectCleanupRepository) GetAndLockExpiredObjects() (*sql.Tx, []ente.TempObject, error) {
tx, err := repo.DB.Begin()


@@ -3,6 +3,7 @@ package file
import (
"fmt"
"os"
"strings"
"syscall"
"github.com/ente-io/stacktrace"
@@ -53,6 +54,19 @@ func EnsureSufficientSpace(size int64) error {
return nil
}
// CreateTemporaryFile creates a file and returns both the path to the
// file and the handle to it.
// The caller must Close() the returned file if it is not nil.
func CreateTemporaryFile(tempStorage string, tempFileName string) (string, *os.File, error) {
fileName := strings.ReplaceAll(tempFileName, "/", "_")
filePath := tempStorage + "/" + fileName
f, err := os.Create(filePath)
if err != nil {
return "", nil, stacktrace.Propagate(err, "Could not create temporary file at '%s' to download object", filePath)
}
return filePath, f, nil
}
func GetLockNameForObject(objectKey string) string {
return fmt.Sprintf("Object:%s", objectKey)
}