[server] Enable replication for vid_preview

Author: Neeraj Gupta
Date:   2024-12-02 05:49:38 +05:30
parent 32f8075acf
commit 9650eb3ff6
2 changed files with 21 additions and 8 deletions

@@ -97,10 +97,7 @@ func (c *Controller) tryReplicate() error {
 		}
 		return err
 	}
-	if row.Type == ente.PreviewVideo {
-		log.Infof("Skipping replication for preview video %d", row.FileID)
-		return nil
-	}
 	err = c.replicateRowData(ctx, *row)
 	if err != nil {
 		log.WithFields(log.Fields{
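The four removed lines were the whole special case: until now tryReplicate logged and returned nil for preview videos, so their rows never reached replicateRowData. A tiny self-contained sketch of the behavioral flip (identifiers are stand-ins, not the real ente types):

package main

import "fmt"

type objectType string

const previewVideo objectType = "vid_preview"

// shouldReplicate models this hunk: the old code skipped preview videos,
// the new code lets every row proceed to replication.
func shouldReplicate(t objectType, skipPreview bool) bool {
	return !(skipPreview && t == previewVideo)
}

func main() {
	fmt.Println(shouldReplicate(previewVideo, true))  // false: behavior before this commit
	fmt.Println(shouldReplicate(previewVideo, false)) // true: behavior after it
}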
@@ -140,6 +137,17 @@ func (c *Controller) replicateRowData(ctx context.Context, row filedata.Row) err
 			if err := c.uploadAndVerify(ctx, row, s3FileMetadata, bucketID); err != nil {
 				return stacktrace.Propagate(err, "error uploading and verifying metadata object")
 			}
+			if row.Type == ente.PreviewVideo {
+				req := ReplicateObjectReq{
+					ObjectKey:    row.GetS3FileObjectKey(),
+					SrcBucketID:  row.LatestBucket,
+					DestBucketID: bucketID,
+					ObjectSize:   *row.ObjectSize,
+				}
+				if err := c.replicateObject(ctx, &req); err != nil {
+					return stacktrace.Propagate(err, "error replicating video objects")
+				}
+			}
 			return c.Repo.MoveBetweenBuckets(row, bucketID, fileDataRepo.InflightRepColumn, fileDataRepo.ReplicationColumn)
 		}
 	} else {
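A preview video, unlike the other data types, has a standalone video object in addition to its metadata object, so the added branch also copies that object from the row's latest bucket to the destination bucket before the row is moved out of the inflight column. A self-contained sketch of how the request is assembled (the struct shape is taken from the diff; field types and the nil guard are assumptions):

package main

import "fmt"

// Shapes inferred from the diff; field types are assumptions.
type replicateObjectReq struct {
	ObjectKey    string
	SrcBucketID  string
	DestBucketID string
	ObjectSize   int64
}

type row struct {
	LatestBucket string
	ObjectSize   *int64 // nullable: only set for preview videos
}

// objectKey stands in for row.GetS3FileObjectKey() in the real code.
func buildReq(r row, objectKey, destBucket string) (replicateObjectReq, error) {
	if r.ObjectSize == nil {
		// Worth noting: the diff dereferences *row.ObjectSize directly,
		// which assumes preview-video rows always carry obj_size.
		return replicateObjectReq{}, fmt.Errorf("missing object size")
	}
	return replicateObjectReq{
		ObjectKey:    objectKey,
		SrcBucketID:  r.LatestBucket,
		DestBucketID: destBucket,
		ObjectSize:   *r.ObjectSize,
	}, nil
}

func main() {
	size := int64(1 << 20)
	req, err := buildReq(row{LatestBucket: "b1", ObjectSize: &size}, "users/42/preview", "b2")
	fmt.Println(req, err)
}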
@@ -150,10 +158,15 @@ func (c *Controller) replicateRowData(ctx context.Context, row filedata.Row) err
 func (c *Controller) uploadAndVerify(ctx context.Context, row filedata.Row, s3FileMetadata filedata.S3FileMetadata, dstBucketID string) error {
 	metadataSize, err := c.uploadObject(s3FileMetadata, row.S3FileMetadataObjectKey(), dstBucketID)
 	if err != nil {
 		return err
 	}
-	if metadataSize != row.Size {
+	expectedSize := row.Size
+	if row.ObjectSize != nil {
+		expectedSize = expectedSize - *row.ObjectSize
+	}
+	if metadataSize != expectedSize {
 		return fmt.Errorf("uploaded metadata size %d does not match expected size %d", metadataSize, row.Size)
 	}
 	return nil
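The adjusted check encodes the same fact from the other side: row.Size for a preview video covers the metadata object plus the video object, so the metadata upload alone should come to row.Size minus *row.ObjectSize; for every other type ObjectSize is nil and the comparison is unchanged. (Note the unchanged error message still prints row.Size rather than the adjusted expectedSize.) A runnable sketch of that accounting with illustrative numbers:

package main

import "fmt"

// Mirrors the shape the diff implies: ObjectSize is a pointer because
// obj_size is nullable and only set for preview videos.
type row struct {
	Size       int64  // total bytes tracked for the row
	ObjectSize *int64 // size of the standalone video object, nil otherwise
}

func expectedMetadataSize(r row) int64 {
	expected := r.Size
	if r.ObjectSize != nil {
		expected -= *r.ObjectSize // the metadata object excludes the video object
	}
	return expected
}

func main() {
	video := int64(1 << 20) // hypothetical 1 MiB video object
	r := row{Size: (1 << 20) + 4096, ObjectSize: &video}
	fmt.Println(expectedMetadataSize(r)) // 4096: the playlist/metadata alone
}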

@@ -67,7 +67,7 @@ func (r *Repository) InsertOrUpdatePreviewData(ctx context.Context, data filedat
 	INSERT INTO file_data
 		(file_id, user_id, data_type, size, latest_bucket, obj_id, obj_nonce, obj_size)
 	VALUES
-		($1, $2, $3, $4, $5, $6, $7)
+		($1, $2, $3, $4, $5, $6, $7, $8)
 	ON CONFLICT (file_id, data_type)
 	DO UPDATE SET
 		size = EXCLUDED.size,
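This one-character fix matters because the column list has named eight columns all along while the old VALUES clause supplied only seven expressions; Postgres rejects such a statement with "INSERT has more target columns than expressions", so obj_size could never have been written by this path. A hedged reconstruction of the call shape (a simplified library-style fragment; the real InsertOrUpdatePreviewData takes a filedata value and updates more columns on conflict):

package repo

import (
	"context"
	"database/sql"
)

// insertPreviewData is a hypothetical, simplified stand-in for
// InsertOrUpdatePreviewData; argument order mirrors the column list.
func insertPreviewData(ctx context.Context, db *sql.DB,
	fileID, userID int64, dataType string, size int64,
	latestBucket, objID, objNonce string, objSize int64) error {
	const q = `
	INSERT INTO file_data
		(file_id, user_id, data_type, size, latest_bucket, obj_id, obj_nonce, obj_size)
	VALUES
		($1, $2, $3, $4, $5, $6, $7, $8)
	ON CONFLICT (file_id, data_type) DO UPDATE SET size = EXCLUDED.size`
	// Eight placeholders now match the eight target columns; with $1..$7 the
	// statement itself was invalid, independent of the bound values.
	_, err := db.ExecContext(ctx, q,
		fileID, userID, dataType, size, latestBucket, objID, objNonce, objSize)
	return err
}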
@@ -240,13 +240,13 @@ func (r *Repository) GetPendingSyncDataAndExtendLock(ctx context.Context, newSyn
 		return nil, stacktrace.Propagate(err, "")
 	}
 	defer tx.Rollback()
-	row := tx.QueryRow(`SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets, inflight_rep_buckets, pending_sync, is_deleted, sync_locked_till, created_at, updated_at
+	row := tx.QueryRow(`SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets, inflight_rep_buckets, pending_sync, is_deleted, sync_locked_till, created_at, updated_at, obj_id, obj_nonce, obj_size
 	FROM file_data
 	where pending_sync = true and is_deleted = $1 and sync_locked_till < now_utc_micro_seconds()
 	LIMIT 1
 	FOR UPDATE SKIP LOCKED`, forDeletion)
 	var fileData filedata.Row
-	err = row.Scan(&fileData.FileID, &fileData.UserID, &fileData.Type, &fileData.Size, &fileData.LatestBucket, pq.Array(&fileData.ReplicatedBuckets), pq.Array(&fileData.DeleteFromBuckets), pq.Array(&fileData.InflightReplicas), &fileData.PendingSync, &fileData.IsDeleted, &fileData.SyncLockedTill, &fileData.CreatedAt, &fileData.UpdatedAt)
+	err = row.Scan(&fileData.FileID, &fileData.UserID, &fileData.Type, &fileData.Size, &fileData.LatestBucket, pq.Array(&fileData.ReplicatedBuckets), pq.Array(&fileData.DeleteFromBuckets), pq.Array(&fileData.InflightReplicas), &fileData.PendingSync, &fileData.IsDeleted, &fileData.SyncLockedTill, &fileData.CreatedAt, &fileData.UpdatedAt, &fileData.ObjectID, &fileData.ObjectNonce, &fileData.ObjectSize)
 	if err != nil {
 		return nil, stacktrace.Propagate(err, "")
 	}
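With obj_id, obj_nonce, and obj_size added to both the SELECT and the Scan, pending-sync rows now carry what the controller's new preview-video branch needs. All three columns are nullable (only preview videos set them), which is presumably why the corresponding Row fields are pointers; database/sql stores NULL as nil through a pointer destination. A self-contained illustration (the table subset, field types, and DSN are placeholders):

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/lib/pq"
)

// Subset of the fields the diff scans; pointer types accept SQL NULL.
type fileDataRow struct {
	FileID      int64
	ObjectID    *string
	ObjectNonce *string
	ObjectSize  *int64
}

func main() {
	db, err := sql.Open("postgres", "postgres://localhost/ente?sslmode=disable") // placeholder DSN
	if err != nil {
		panic(err)
	}
	defer db.Close()

	var r fileDataRow
	// For rows without a standalone object (i.e. non-preview-video types),
	// obj_id/obj_nonce/obj_size come back NULL and the pointers stay nil.
	err = db.QueryRow(`SELECT file_id, obj_id, obj_nonce, obj_size FROM file_data LIMIT 1`).
		Scan(&r.FileID, &r.ObjectID, &r.ObjectNonce, &r.ObjectSize)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", r)
}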