This commit is contained in:
Neeraj Gupta
2024-08-05 17:46:00 +05:30
parent 744d6bc6ea
commit 543aa6b9cf

View File

@@ -44,18 +44,17 @@ type bulkS3MetaFetchResult struct {
}
type Controller struct {
Repo *fileDataRepo.Repository
AccessCtrl access.Controller
ObjectCleanupController *controller.ObjectCleanupController
S3Config *s3config.S3Config
QueueRepo *repo.QueueRepository
TaskLockingRepo *repo.TaskLockRepository
FileRepo *repo.FileRepository
CollectionRepo *repo.CollectionRepository
HostName string
cleanupCronRunning bool
derivedStorageDataCenter string
downloadManagerCache map[string]*s3manager.Downloader
Repo *fileDataRepo.Repository
AccessCtrl access.Controller
ObjectCleanupController *controller.ObjectCleanupController
S3Config *s3config.S3Config
QueueRepo *repo.QueueRepository
TaskLockingRepo *repo.TaskLockRepository
FileRepo *repo.FileRepository
CollectionRepo *repo.CollectionRepository
HostName string
cleanupCronRunning bool
downloadManagerCache map[string]*s3manager.Downloader
}
func New(repo *fileDataRepo.Repository,
@@ -67,24 +66,23 @@ func New(repo *fileDataRepo.Repository,
fileRepo *repo.FileRepository,
collectionRepo *repo.CollectionRepository,
hostName string) *Controller {
embeddingDcs := []string{s3Config.GetHotBackblazeDC(), s3Config.GetHotWasabiDC(), s3Config.GetWasabiDerivedDC(), s3Config.GetDerivedStorageDataCenter()}
embeddingDcs := []string{s3Config.GetHotBackblazeDC(), s3Config.GetHotWasabiDC(), s3Config.GetWasabiDerivedDC(), s3Config.GetDerivedStorageDataCenter(), "b5"}
cache := make(map[string]*s3manager.Downloader, len(embeddingDcs))
for i := range embeddingDcs {
s3Client := s3Config.GetS3Client(embeddingDcs[i])
cache[embeddingDcs[i]] = s3manager.NewDownloaderWithClient(&s3Client)
}
return &Controller{
Repo: repo,
AccessCtrl: accessCtrl,
ObjectCleanupController: objectCleanupController,
S3Config: s3Config,
QueueRepo: queueRepo,
TaskLockingRepo: taskLockingRepo,
FileRepo: fileRepo,
CollectionRepo: collectionRepo,
HostName: hostName,
derivedStorageDataCenter: s3Config.GetDerivedStorageDataCenter(),
downloadManagerCache: cache,
Repo: repo,
AccessCtrl: accessCtrl,
ObjectCleanupController: objectCleanupController,
S3Config: s3Config,
QueueRepo: queueRepo,
TaskLockingRepo: taskLockingRepo,
FileRepo: fileRepo,
CollectionRepo: collectionRepo,
HostName: hostName,
downloadManagerCache: cache,
}
}
@@ -105,7 +103,8 @@ func (c *Controller) InsertOrUpdate(ctx *gin.Context, req *fileData.PutFileDataR
DecryptionHeader: *req.DecryptionHeader,
Client: network.GetClientInfo(ctx),
}
size, uploadErr := c.uploadObject(obj, objectKey, c.derivedStorageDataCenter)
bucketID := c.S3Config.GetBucketID(req.Type)
size, uploadErr := c.uploadObject(obj, objectKey, bucketID)
if uploadErr != nil {
log.Error(uploadErr)
return stacktrace.Propagate(uploadErr, "")
@@ -115,7 +114,7 @@ func (c *Controller) InsertOrUpdate(ctx *gin.Context, req *fileData.PutFileDataR
Type: req.Type,
UserID: fileOwnerID,
Size: size,
LatestBucket: c.derivedStorageDataCenter,
LatestBucket: bucketID,
}
err = c.Repo.InsertOrUpdate(ctx, row)
if err != nil {
@@ -146,7 +145,7 @@ func (c *Controller) GetFilesData(ctx *gin.Context, req fileData.GetFilesData) (
}
pendingIndexFileIds := array.FindMissingElementsInSecondList(req.FileIDs, dbFileIds)
// Fetch missing doRows in parallel
s3MetaFetchResults, err := c.getS3FileMetadataParallel(activeRows, c.derivedStorageDataCenter)
s3MetaFetchResults, err := c.getS3FileMetadataParallel(activeRows)
if err != nil {
return nil, stacktrace.Propagate(err, "")
}
@@ -173,7 +172,7 @@ func (c *Controller) GetFilesData(ctx *gin.Context, req fileData.GetFilesData) (
}, nil
}
func (c *Controller) getS3FileMetadataParallel(dbRows []fileData.Row, dc string) ([]bulkS3MetaFetchResult, error) {
func (c *Controller) getS3FileMetadataParallel(dbRows []fileData.Row) ([]bulkS3MetaFetchResult, error) {
var wg sync.WaitGroup
embeddingObjects := make([]bulkS3MetaFetchResult, len(dbRows))
for i, _ := range dbRows {
@@ -183,6 +182,7 @@ func (c *Controller) getS3FileMetadataParallel(dbRows []fileData.Row, dc string)
go func(i int, row fileData.Row) {
defer wg.Done()
defer func() { <-globalFileFetchSemaphore }() // Release back to global semaphore
dc := row.LatestBucket
s3FileMetadata, err := c.fetchS3FileMetadata(context.Background(), row, dc)
if err != nil {
log.WithField("bucket", row.LatestBucket).