diff --git a/server/pkg/controller/filedata/controller.go b/server/pkg/controller/filedata/controller.go index c428aaebae..fb795c01c0 100644 --- a/server/pkg/controller/filedata/controller.go +++ b/server/pkg/controller/filedata/controller.go @@ -44,18 +44,17 @@ type bulkS3MetaFetchResult struct { } type Controller struct { - Repo *fileDataRepo.Repository - AccessCtrl access.Controller - ObjectCleanupController *controller.ObjectCleanupController - S3Config *s3config.S3Config - QueueRepo *repo.QueueRepository - TaskLockingRepo *repo.TaskLockRepository - FileRepo *repo.FileRepository - CollectionRepo *repo.CollectionRepository - HostName string - cleanupCronRunning bool - derivedStorageDataCenter string - downloadManagerCache map[string]*s3manager.Downloader + Repo *fileDataRepo.Repository + AccessCtrl access.Controller + ObjectCleanupController *controller.ObjectCleanupController + S3Config *s3config.S3Config + QueueRepo *repo.QueueRepository + TaskLockingRepo *repo.TaskLockRepository + FileRepo *repo.FileRepository + CollectionRepo *repo.CollectionRepository + HostName string + cleanupCronRunning bool + downloadManagerCache map[string]*s3manager.Downloader } func New(repo *fileDataRepo.Repository, @@ -67,24 +66,23 @@ func New(repo *fileDataRepo.Repository, fileRepo *repo.FileRepository, collectionRepo *repo.CollectionRepository, hostName string) *Controller { - embeddingDcs := []string{s3Config.GetHotBackblazeDC(), s3Config.GetHotWasabiDC(), s3Config.GetWasabiDerivedDC(), s3Config.GetDerivedStorageDataCenter()} + embeddingDcs := []string{s3Config.GetHotBackblazeDC(), s3Config.GetHotWasabiDC(), s3Config.GetWasabiDerivedDC(), s3Config.GetDerivedStorageDataCenter(), "b5"} cache := make(map[string]*s3manager.Downloader, len(embeddingDcs)) for i := range embeddingDcs { s3Client := s3Config.GetS3Client(embeddingDcs[i]) cache[embeddingDcs[i]] = s3manager.NewDownloaderWithClient(&s3Client) } return &Controller{ - Repo: repo, - AccessCtrl: accessCtrl, - ObjectCleanupController: objectCleanupController, - S3Config: s3Config, - QueueRepo: queueRepo, - TaskLockingRepo: taskLockingRepo, - FileRepo: fileRepo, - CollectionRepo: collectionRepo, - HostName: hostName, - derivedStorageDataCenter: s3Config.GetDerivedStorageDataCenter(), - downloadManagerCache: cache, + Repo: repo, + AccessCtrl: accessCtrl, + ObjectCleanupController: objectCleanupController, + S3Config: s3Config, + QueueRepo: queueRepo, + TaskLockingRepo: taskLockingRepo, + FileRepo: fileRepo, + CollectionRepo: collectionRepo, + HostName: hostName, + downloadManagerCache: cache, } } @@ -105,7 +103,8 @@ func (c *Controller) InsertOrUpdate(ctx *gin.Context, req *fileData.PutFileDataR DecryptionHeader: *req.DecryptionHeader, Client: network.GetClientInfo(ctx), } - size, uploadErr := c.uploadObject(obj, objectKey, c.derivedStorageDataCenter) + bucketID := c.S3Config.GetBucketID(req.Type) + size, uploadErr := c.uploadObject(obj, objectKey, bucketID) if uploadErr != nil { log.Error(uploadErr) return stacktrace.Propagate(uploadErr, "") @@ -115,7 +114,7 @@ func (c *Controller) InsertOrUpdate(ctx *gin.Context, req *fileData.PutFileDataR Type: req.Type, UserID: fileOwnerID, Size: size, - LatestBucket: c.derivedStorageDataCenter, + LatestBucket: bucketID, } err = c.Repo.InsertOrUpdate(ctx, row) if err != nil { @@ -146,7 +145,7 @@ func (c *Controller) GetFilesData(ctx *gin.Context, req fileData.GetFilesData) ( } pendingIndexFileIds := array.FindMissingElementsInSecondList(req.FileIDs, dbFileIds) // Fetch missing doRows in parallel - s3MetaFetchResults, err := c.getS3FileMetadataParallel(activeRows, c.derivedStorageDataCenter) + s3MetaFetchResults, err := c.getS3FileMetadataParallel(activeRows) if err != nil { return nil, stacktrace.Propagate(err, "") } @@ -173,7 +172,7 @@ func (c *Controller) GetFilesData(ctx *gin.Context, req fileData.GetFilesData) ( }, nil } -func (c *Controller) getS3FileMetadataParallel(dbRows []fileData.Row, dc string) ([]bulkS3MetaFetchResult, error) { +func (c *Controller) getS3FileMetadataParallel(dbRows []fileData.Row) ([]bulkS3MetaFetchResult, error) { var wg sync.WaitGroup embeddingObjects := make([]bulkS3MetaFetchResult, len(dbRows)) for i, _ := range dbRows { @@ -183,6 +182,7 @@ func (c *Controller) getS3FileMetadataParallel(dbRows []fileData.Row, dc string) go func(i int, row fileData.Row) { defer wg.Done() defer func() { <-globalFileFetchSemaphore }() // Release back to global semaphore + dc := row.LatestBucket s3FileMetadata, err := c.fetchS3FileMetadata(context.Background(), row, dc) if err != nil { log.WithField("bucket", row.LatestBucket).