From d0ef4f234db3a18e396dfdde70608d0b99153381 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Wed, 26 Feb 2025 14:04:28 +0530 Subject: [PATCH 1/4] [server] Prefer reading from b5 --- server/pkg/controller/filedata/controller.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/pkg/controller/filedata/controller.go b/server/pkg/controller/filedata/controller.go index 6f7c6d858f..6b68e26187 100644 --- a/server/pkg/controller/filedata/controller.go +++ b/server/pkg/controller/filedata/controller.go @@ -219,10 +219,10 @@ func (c *Controller) getS3FileMetadataParallel(dbRows []fileData.Row) ([]bulkS3M defer func() { <-globalFileFetchSemaphore }() // Release back to global semaphore dc := row.LatestBucket // :todo:neeraj make it configurable - // treat b6 as preferred bucket for reading - if dc == "b5" { - if array.StringInList("b6", row.ReplicatedBuckets) { - dc = "b6" + // treat b5 as preferred bucket for reading + if dc == "b6" { + if array.StringInList("b5", row.ReplicatedBuckets) { + dc = "b5" } } s3FileMetadata, err := c.fetchS3FileMetadata(context.Background(), row, dc) From b68870693557f1eede1023ba1c5a4b766cb12ab6 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Wed, 26 Feb 2025 14:20:28 +0530 Subject: [PATCH 2/4] [server] Enhance logging for metadata failure --- server/pkg/controller/filedata/controller.go | 50 +++++++++++++------- 1 file changed, 32 insertions(+), 18 deletions(-) diff --git a/server/pkg/controller/filedata/controller.go b/server/pkg/controller/filedata/controller.go index 6b68e26187..fe1b217d5e 100644 --- a/server/pkg/controller/filedata/controller.go +++ b/server/pkg/controller/filedata/controller.go @@ -17,6 +17,7 @@ import ( "github.com/ente-io/museum/pkg/utils/network" "github.com/ente-io/museum/pkg/utils/s3config" "github.com/ente-io/stacktrace" + "github.com/gin-contrib/requestid" "github.com/gin-gonic/gin" log 
"github.com/sirupsen/logrus" "sync" @@ -143,7 +144,13 @@ func (c *Controller) GetFileData(ctx *gin.Context, req fileData.GetFileData) (*f if len(doRows) == 0 || doRows[0].IsDeleted { return nil, stacktrace.Propagate(ente.ErrNotFound, "") } - s3MetaObject, err := c.fetchS3FileMetadata(context.Background(), doRows[0], doRows[0].LatestBucket) + ctxLogger := log.WithFields(log.Fields{ + "objectKey": doRows[0].S3FileMetadataObjectKey(), + "latest_bucket": doRows[0].LatestBucket, + "req_id": requestid.Get(ctx), + "file_id": req.FileID, + }) + s3MetaObject, err := c.fetchS3FileMetadata(context.Background(), doRows[0], ctxLogger) if err != nil { return nil, stacktrace.Propagate(err, "") } @@ -180,7 +187,7 @@ func (c *Controller) GetFilesData(ctx *gin.Context, req fileData.GetFilesData) ( } pendingIndexFileIds := array.FindMissingElementsInSecondList(req.FileIDs, dbFileIds) // Fetch missing doRows in parallel - s3MetaFetchResults, err := c.getS3FileMetadataParallel(activeRows) + s3MetaFetchResults, err := c.getS3FileMetadataParallel(ctx, activeRows) if err != nil { return nil, stacktrace.Propagate(err, "") } @@ -207,7 +214,7 @@ func (c *Controller) GetFilesData(ctx *gin.Context, req fileData.GetFilesData) ( }, nil } -func (c *Controller) getS3FileMetadataParallel(dbRows []fileData.Row) ([]bulkS3MetaFetchResult, error) { +func (c *Controller) getS3FileMetadataParallel(ctx *gin.Context, dbRows []fileData.Row) ([]bulkS3MetaFetchResult, error) { var wg sync.WaitGroup embeddingObjects := make([]bulkS3MetaFetchResult, len(dbRows)) for i := range dbRows { @@ -217,17 +224,17 @@ func (c *Controller) getS3FileMetadataParallel(dbRows []fileData.Row) ([]bulkS3M go func(i int, row fileData.Row) { defer wg.Done() defer func() { <-globalFileFetchSemaphore }() // Release back to global semaphore - dc := row.LatestBucket - // :todo:neeraj make it configurable - // treat b5 as preferred bucket for reading - if dc == "b6" { - if array.StringInList("b5", row.ReplicatedBuckets) { - dc = "b5" - 
} - } - s3FileMetadata, err := c.fetchS3FileMetadata(context.Background(), row, dc) + + ctxLogger := log.WithFields(log.Fields{ + "objectKey": row.S3FileMetadataObjectKey(), + "req_id": requestid.Get(ctx), + "latest_bucket": row.LatestBucket, + "file_id": row.FileID, + }) + + s3FileMetadata, err := c.fetchS3FileMetadata(context.Background(), row, ctxLogger) if err != nil { - log.WithField("bucket", dc). + ctxLogger. Error("error fetching object: "+row.S3FileMetadataObjectKey(), err) embeddingObjects[i] = bulkS3MetaFetchResult{ err: err, @@ -246,10 +253,17 @@ func (c *Controller) getS3FileMetadataParallel(dbRows []fileData.Row) ([]bulkS3M return embeddingObjects, nil } -func (c *Controller) fetchS3FileMetadata(ctx context.Context, row fileData.Row, dc string) (*fileData.S3FileMetadata, error) { +func (c *Controller) fetchS3FileMetadata(ctx context.Context, row fileData.Row, ctxLogger *log.Entry) (*fileData.S3FileMetadata, error) { + dc := row.LatestBucket + // :todo:neeraj make it configurable + // treat b5 as preferred bucket for reading + if dc == "b6" { + if array.StringInList("b5", row.ReplicatedBuckets) { + dc = "b5" + } + } opt := _defaultFetchConfig objectKey := row.S3FileMetadataObjectKey() - ctxLogger := log.WithField("objectKey", objectKey).WithField("dc", row.LatestBucket) totalAttempts := opt.RetryCount + 1 timeout := opt.InitialTimeout for i := 0; i < totalAttempts; i++ { @@ -269,13 +283,13 @@ func (c *Controller) fetchS3FileMetadata(ctx context.Context, row fileData.Row, cancel() // Ensure cancel is called to release resources if err == nil { if i > 0 { - ctxLogger.Infof("Fetched object after %d attempts", i) + ctxLogger.WithField("dc", dc).Infof("Fetched object after %d attempts", i) } return &obj, nil } // Check if the error is due to context timeout or cancellation if err == nil && fetchCtx.Err() != nil { - ctxLogger.Error("Fetch timed out or cancelled: ", fetchCtx.Err()) + ctxLogger.WithField("dc", dc).Error("Fetch timed out or cancelled: ", 
fetchCtx.Err()) } else { // check if the error is due to object not found if s3Err, ok := err.(awserr.RequestFailure); ok { @@ -283,7 +297,7 @@ func (c *Controller) fetchS3FileMetadata(ctx context.Context, row fileData.Row, return nil, stacktrace.Propagate(errors.New("object not found"), "") } } - ctxLogger.Error("Failed to fetch object: ", err) + ctxLogger.WithField("dc", dc).Error("Failed to fetch object: ", err) } } } From d494bb8d38275a725549ba29545e9e025a774318 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Wed, 26 Feb 2025 14:25:45 +0530 Subject: [PATCH 3/4] [server] doc --- server/pkg/controller/filedata/controller.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/pkg/controller/filedata/controller.go b/server/pkg/controller/filedata/controller.go index fe1b217d5e..eca74f7aa0 100644 --- a/server/pkg/controller/filedata/controller.go +++ b/server/pkg/controller/filedata/controller.go @@ -255,8 +255,9 @@ func (c *Controller) getS3FileMetadataParallel(ctx *gin.Context, dbRows []fileDa func (c *Controller) fetchS3FileMetadata(ctx context.Context, row fileData.Row, ctxLogger *log.Entry) (*fileData.S3FileMetadata, error) { dc := row.LatestBucket - // :todo:neeraj make it configurable - // treat b5 as preferred bucket for reading + // :todo:neeraj make it configurable to + // specify preferred dc to read from + // and fallback logic to read from different bucket when we fail to read from preferred dc if dc == "b6" { if array.StringInList("b5", row.ReplicatedBuckets) { dc = "b5" From 98ebf98d1794c20c4b15d42f1e346d4156b7a58c Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Wed, 26 Feb 2025 14:32:17 +0530 Subject: [PATCH 4/4] [server] Bump default worker for filedata replication --- server/pkg/controller/filedata/replicate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/pkg/controller/filedata/replicate.go 
b/server/pkg/controller/filedata/replicate.go index a17323282b..2f4a7a8b8e 100644 --- a/server/pkg/controller/filedata/replicate.go +++ b/server/pkg/controller/filedata/replicate.go @@ -29,7 +29,7 @@ func (c *Controller) StartReplication() error { workerCount := viper.GetInt("replication.file-data.worker-count") if workerCount == 0 { - workerCount = 6 + workerCount = 10 } err := c.createTemporaryStorage() if err != nil {