diff --git a/server/pkg/controller/filedata/controller.go b/server/pkg/controller/filedata/controller.go
index 6f7c6d858f..eca74f7aa0 100644
--- a/server/pkg/controller/filedata/controller.go
+++ b/server/pkg/controller/filedata/controller.go
@@ -17,6 +17,7 @@ import (
 	"github.com/ente-io/museum/pkg/utils/network"
 	"github.com/ente-io/museum/pkg/utils/s3config"
 	"github.com/ente-io/stacktrace"
+	"github.com/gin-contrib/requestid"
 	"github.com/gin-gonic/gin"
 	log "github.com/sirupsen/logrus"
 	"sync"
@@ -143,7 +144,13 @@ func (c *Controller) GetFileData(ctx *gin.Context, req fileData.GetFileData) (*f
 	if len(doRows) == 0 || doRows[0].IsDeleted {
 		return nil, stacktrace.Propagate(ente.ErrNotFound, "")
 	}
-	s3MetaObject, err := c.fetchS3FileMetadata(context.Background(), doRows[0], doRows[0].LatestBucket)
+	ctxLogger := log.WithFields(log.Fields{
+		"objectKey":     doRows[0].S3FileMetadataObjectKey(),
+		"latest_bucket": doRows[0].LatestBucket,
+		"req_id":        requestid.Get(ctx),
+		"file_id":       req.FileID,
+	})
+	s3MetaObject, err := c.fetchS3FileMetadata(context.Background(), doRows[0], ctxLogger)
 	if err != nil {
 		return nil, stacktrace.Propagate(err, "")
 	}
@@ -180,7 +187,7 @@ func (c *Controller) GetFilesData(ctx *gin.Context, req fileData.GetFilesData) (
 	}
 	pendingIndexFileIds := array.FindMissingElementsInSecondList(req.FileIDs, dbFileIds)
 	// Fetch missing doRows in parallel
-	s3MetaFetchResults, err := c.getS3FileMetadataParallel(activeRows)
+	s3MetaFetchResults, err := c.getS3FileMetadataParallel(ctx, activeRows)
 	if err != nil {
 		return nil, stacktrace.Propagate(err, "")
 	}
@@ -207,7 +214,7 @@ func (c *Controller) GetFilesData(ctx *gin.Context, req fileData.GetFilesData) (
 	}, nil
 }
 
-func (c *Controller) getS3FileMetadataParallel(dbRows []fileData.Row) ([]bulkS3MetaFetchResult, error) {
+func (c *Controller) getS3FileMetadataParallel(ctx *gin.Context, dbRows []fileData.Row) ([]bulkS3MetaFetchResult, error) {
 	var wg sync.WaitGroup
 	embeddingObjects := make([]bulkS3MetaFetchResult, len(dbRows))
 	for i := range dbRows {
@@ -217,17 +224,17 @@ func (c *Controller) getS3FileMetadataParallel(dbRows []fileData.Row) ([]bulkS3M
 		go func(i int, row fileData.Row) {
 			defer wg.Done()
 			defer func() { <-globalFileFetchSemaphore }() // Release back to global semaphore
-			dc := row.LatestBucket
-			// :todo:neeraj make it configurable
-			// treat b6 as preferred bucket for reading
-			if dc == "b5" {
-				if array.StringInList("b6", row.ReplicatedBuckets) {
-					dc = "b6"
-				}
-			}
-			s3FileMetadata, err := c.fetchS3FileMetadata(context.Background(), row, dc)
+
+			ctxLogger := log.WithFields(log.Fields{
+				"objectKey":     row.S3FileMetadataObjectKey(),
+				"req_id":        requestid.Get(ctx),
+				"latest_bucket": row.LatestBucket,
+				"file_id":       row.FileID,
+			})
+
+			s3FileMetadata, err := c.fetchS3FileMetadata(context.Background(), row, ctxLogger)
 			if err != nil {
-				log.WithField("bucket", dc).
+				ctxLogger.
 					Error("error fetching object: "+row.S3FileMetadataObjectKey(), err)
 				embeddingObjects[i] = bulkS3MetaFetchResult{
 					err: err,
@@ -246,10 +253,18 @@ func (c *Controller) getS3FileMetadataParallel(dbRows []fileData.Row) ([]bulkS3M
 	return embeddingObjects, nil
 }
 
-func (c *Controller) fetchS3FileMetadata(ctx context.Context, row fileData.Row, dc string) (*fileData.S3FileMetadata, error) {
+func (c *Controller) fetchS3FileMetadata(ctx context.Context, row fileData.Row, ctxLogger *log.Entry) (*fileData.S3FileMetadata, error) {
+	dc := row.LatestBucket
+	// :todo:neeraj make the preferred read DC configurable,
+	// and add fallback logic to read from a different bucket
+	// when the read from the preferred DC fails
+	if dc == "b6" {
+		if array.StringInList("b5", row.ReplicatedBuckets) {
+			dc = "b5"
+		}
+	}
 	opt := _defaultFetchConfig
 	objectKey := row.S3FileMetadataObjectKey()
-	ctxLogger := log.WithField("objectKey", objectKey).WithField("dc", row.LatestBucket)
 	totalAttempts := opt.RetryCount + 1
 	timeout := opt.InitialTimeout
 	for i := 0; i < totalAttempts; i++ {
@@ -269,13 +284,13 @@ func (c *Controller) fetchS3FileMetadata(ctx context.Context, row fileData.Row,
 		cancel() // Ensure cancel is called to release resources
 		if err == nil {
 			if i > 0 {
-				ctxLogger.Infof("Fetched object after %d attempts", i)
+				ctxLogger.WithField("dc", dc).Infof("Fetched object after %d attempts", i)
 			}
 			return &obj, nil
 		}
 		// Check if the error is due to context timeout or cancellation
 		if err == nil && fetchCtx.Err() != nil {
-			ctxLogger.Error("Fetch timed out or cancelled: ", fetchCtx.Err())
+			ctxLogger.WithField("dc", dc).Error("Fetch timed out or cancelled: ", fetchCtx.Err())
 		} else {
 			// check if the error is due to object not found
 			if s3Err, ok := err.(awserr.RequestFailure); ok {
@@ -283,7 +298,7 @@ func (c *Controller) fetchS3FileMetadata(ctx context.Context, row fileData.Row,
 					return nil, stacktrace.Propagate(errors.New("object not found"), "")
 				}
 			}
-			ctxLogger.Error("Failed to fetch object: ", err)
+			ctxLogger.WithField("dc", dc).Error("Failed to fetch object: ", err)
 		}
 	}
 }
diff --git a/server/pkg/controller/filedata/replicate.go b/server/pkg/controller/filedata/replicate.go
index a17323282b..2f4a7a8b8e 100644
--- a/server/pkg/controller/filedata/replicate.go
+++ b/server/pkg/controller/filedata/replicate.go
@@ -29,7 +29,7 @@ func (c *Controller) StartReplication() error {
 	workerCount := viper.GetInt("replication.file-data.worker-count")
 	if workerCount == 0 {
-		workerCount = 6
+		workerCount = 10
 	}
 
 	err := c.createTemporaryStorage()
 	if err != nil {