[server] Read embeddings from different bucket & logging improvements (#5180)
## Description ## Tests
This commit is contained in:
@@ -17,6 +17,7 @@ import (
|
||||
"github.com/ente-io/museum/pkg/utils/network"
|
||||
"github.com/ente-io/museum/pkg/utils/s3config"
|
||||
"github.com/ente-io/stacktrace"
|
||||
"github.com/gin-contrib/requestid"
|
||||
"github.com/gin-gonic/gin"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"sync"
|
||||
@@ -143,7 +144,13 @@ func (c *Controller) GetFileData(ctx *gin.Context, req fileData.GetFileData) (*f
|
||||
if len(doRows) == 0 || doRows[0].IsDeleted {
|
||||
return nil, stacktrace.Propagate(ente.ErrNotFound, "")
|
||||
}
|
||||
s3MetaObject, err := c.fetchS3FileMetadata(context.Background(), doRows[0], doRows[0].LatestBucket)
|
||||
ctxLogger := log.WithFields(log.Fields{
|
||||
"objectKey": doRows[0].S3FileMetadataObjectKey(),
|
||||
"latest_bucket": doRows[0].LatestBucket,
|
||||
"req_id": requestid.Get(ctx),
|
||||
"file_id": req.FileID,
|
||||
})
|
||||
s3MetaObject, err := c.fetchS3FileMetadata(context.Background(), doRows[0], ctxLogger)
|
||||
if err != nil {
|
||||
return nil, stacktrace.Propagate(err, "")
|
||||
}
|
||||
@@ -180,7 +187,7 @@ func (c *Controller) GetFilesData(ctx *gin.Context, req fileData.GetFilesData) (
|
||||
}
|
||||
pendingIndexFileIds := array.FindMissingElementsInSecondList(req.FileIDs, dbFileIds)
|
||||
// Fetch missing doRows in parallel
|
||||
s3MetaFetchResults, err := c.getS3FileMetadataParallel(activeRows)
|
||||
s3MetaFetchResults, err := c.getS3FileMetadataParallel(ctx, activeRows)
|
||||
if err != nil {
|
||||
return nil, stacktrace.Propagate(err, "")
|
||||
}
|
||||
@@ -207,7 +214,7 @@ func (c *Controller) GetFilesData(ctx *gin.Context, req fileData.GetFilesData) (
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *Controller) getS3FileMetadataParallel(dbRows []fileData.Row) ([]bulkS3MetaFetchResult, error) {
|
||||
func (c *Controller) getS3FileMetadataParallel(ctx *gin.Context, dbRows []fileData.Row) ([]bulkS3MetaFetchResult, error) {
|
||||
var wg sync.WaitGroup
|
||||
embeddingObjects := make([]bulkS3MetaFetchResult, len(dbRows))
|
||||
for i := range dbRows {
|
||||
@@ -217,17 +224,17 @@ func (c *Controller) getS3FileMetadataParallel(dbRows []fileData.Row) ([]bulkS3M
|
||||
go func(i int, row fileData.Row) {
|
||||
defer wg.Done()
|
||||
defer func() { <-globalFileFetchSemaphore }() // Release back to global semaphore
|
||||
dc := row.LatestBucket
|
||||
// :todo:neeraj make it configurable
|
||||
// treat b6 as preferred bucket for reading
|
||||
if dc == "b5" {
|
||||
if array.StringInList("b6", row.ReplicatedBuckets) {
|
||||
dc = "b6"
|
||||
}
|
||||
}
|
||||
s3FileMetadata, err := c.fetchS3FileMetadata(context.Background(), row, dc)
|
||||
|
||||
ctxLogger := log.WithFields(log.Fields{
|
||||
"objectKey": row.S3FileMetadataObjectKey(),
|
||||
"req_id": requestid.Get(ctx),
|
||||
"latest_bucket": row.LatestBucket,
|
||||
"file_id": row.FileID,
|
||||
})
|
||||
|
||||
s3FileMetadata, err := c.fetchS3FileMetadata(context.Background(), row, ctxLogger)
|
||||
if err != nil {
|
||||
log.WithField("bucket", dc).
|
||||
ctxLogger.
|
||||
Error("error fetching object: "+row.S3FileMetadataObjectKey(), err)
|
||||
embeddingObjects[i] = bulkS3MetaFetchResult{
|
||||
err: err,
|
||||
@@ -246,10 +253,18 @@ func (c *Controller) getS3FileMetadataParallel(dbRows []fileData.Row) ([]bulkS3M
|
||||
return embeddingObjects, nil
|
||||
}
|
||||
|
||||
func (c *Controller) fetchS3FileMetadata(ctx context.Context, row fileData.Row, dc string) (*fileData.S3FileMetadata, error) {
|
||||
func (c *Controller) fetchS3FileMetadata(ctx context.Context, row fileData.Row, ctxLogger *log.Entry) (*fileData.S3FileMetadata, error) {
|
||||
dc := row.LatestBucket
|
||||
// :todo:neeraj make it configurable to
|
||||
// specify preferred dc to read from
|
||||
// and fallback logic to read from different bucket when we fail to read from preferred dc
|
||||
if dc == "b6" {
|
||||
if array.StringInList("b5", row.ReplicatedBuckets) {
|
||||
dc = "b5"
|
||||
}
|
||||
}
|
||||
opt := _defaultFetchConfig
|
||||
objectKey := row.S3FileMetadataObjectKey()
|
||||
ctxLogger := log.WithField("objectKey", objectKey).WithField("dc", row.LatestBucket)
|
||||
totalAttempts := opt.RetryCount + 1
|
||||
timeout := opt.InitialTimeout
|
||||
for i := 0; i < totalAttempts; i++ {
|
||||
@@ -269,13 +284,13 @@ func (c *Controller) fetchS3FileMetadata(ctx context.Context, row fileData.Row,
|
||||
cancel() // Ensure cancel is called to release resources
|
||||
if err == nil {
|
||||
if i > 0 {
|
||||
ctxLogger.Infof("Fetched object after %d attempts", i)
|
||||
ctxLogger.WithField("dc", dc).Infof("Fetched object after %d attempts", i)
|
||||
}
|
||||
return &obj, nil
|
||||
}
|
||||
// Check if the error is due to context timeout or cancellation
|
||||
if err == nil && fetchCtx.Err() != nil {
|
||||
ctxLogger.Error("Fetch timed out or cancelled: ", fetchCtx.Err())
|
||||
ctxLogger.WithField("dc", dc).Error("Fetch timed out or cancelled: ", fetchCtx.Err())
|
||||
} else {
|
||||
// check if the error is due to object not found
|
||||
if s3Err, ok := err.(awserr.RequestFailure); ok {
|
||||
@@ -283,7 +298,7 @@ func (c *Controller) fetchS3FileMetadata(ctx context.Context, row fileData.Row,
|
||||
return nil, stacktrace.Propagate(errors.New("object not found"), "")
|
||||
}
|
||||
}
|
||||
ctxLogger.Error("Failed to fetch object: ", err)
|
||||
ctxLogger.WithField("dc", dc).Error("Failed to fetch object: ", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ func (c *Controller) StartReplication() error {
|
||||
|
||||
workerCount := viper.GetInt("replication.file-data.worker-count")
|
||||
if workerCount == 0 {
|
||||
workerCount = 6
|
||||
workerCount = 10
|
||||
}
|
||||
err := c.createTemporaryStorage()
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user