diff --git a/server/pkg/controller/replication3.go b/server/pkg/controller/replication3.go index 4fad173ea2..61062c0e0a 100644 --- a/server/pkg/controller/replication3.go +++ b/server/pkg/controller/replication3.go @@ -222,7 +222,7 @@ func (c *ReplicationController3) replicate(i int) { // objects left to replicate currently. func (c *ReplicationController3) tryReplicate() error { // Fetch an object to replicate - tx, copies, err := c.ObjectCopiesRepo.GetAndLockUnreplicatedObject() + copies, err := c.ObjectCopiesRepo.GetAndLockUnreplicatedObject() if err != nil { if !errors.Is(err, sql.ErrNoRows) { log.Errorf("Could not fetch an object to replicate: %s", err) @@ -237,44 +237,16 @@ func (c *ReplicationController3) tryReplicate() error { "object_key": objectKey, }) - commit := func(err error) error { - // We don't rollback the transaction even in the case of errors, and - // instead try to commit it after setting the last_attempt timestamp. - // - // This avoids the replication getting stuck in a loop trying (and - // failing) to replicate the same object. The error would still need to - // be resolved, but at least the replication would meanwhile move - // forward, ignoring this row. 
- + done := func(err error) error { if err != nil { logger.Error(err) } - aerr := c.ObjectCopiesRepo.RegisterReplicationAttempt(tx, objectKey) - if aerr != nil { - aerr = stacktrace.Propagate(aerr, "Failed to mark replication attempt") - logger.Error(aerr) - } - - cerr := tx.Commit() - if cerr != nil { - cerr = stacktrace.Propagate(err, "Failed to commit transaction") - logger.Error(cerr) - } - - if err == nil { - err = aerr - } - if err == nil { - err = cerr - } - if err == nil { logger.Info("Replication attempt succeeded") } else { - logger.Info("Replication attempt failed") + logger.WithError(err).Info("Replication attempt failed") } - return err } @@ -282,30 +254,30 @@ func (c *ReplicationController3) tryReplicate() error { if copies.B2 == nil { err := errors.New("expected B2 copy to be in place before we start replication") - return commit(stacktrace.Propagate(err, "Sanity check failed")) + return done(stacktrace.Propagate(err, "Sanity check failed")) } if !copies.WantWasabi && !copies.WantSCW { err := errors.New("expected at least one of want_wasabi and want_scw to be true when trying to replicate") - return commit(stacktrace.Propagate(err, "Sanity check failed")) + return done(stacktrace.Propagate(err, "Sanity check failed")) } - ob, err := c.ObjectRepo.GetObjectState(tx, objectKey) + ob, err := c.ObjectRepo.GetObjectState(objectKey) if err != nil { - return commit(stacktrace.Propagate(err, "Failed to fetch file's deleted status")) + return done(stacktrace.Propagate(err, "Failed to fetch file's deleted status")) } if ob.IsFileDeleted || ob.IsUserDeleted { // Update the object_copies to mark this object as not requiring further // replication. The row in object_copies will get deleted when the next // scheduled object deletion runs. 
- err = c.ObjectCopiesRepo.UnmarkFromReplication(tx, objectKey) + err = c.ObjectCopiesRepo.UnmarkFromReplication(objectKey) if err != nil { - return commit(stacktrace.Propagate(err, "Failed to mark an object not requiring further replication")) + return done(stacktrace.Propagate(err, "Failed to mark an object not requiring further replication")) } logger.Infof("Skipping replication for deleted object (isFileDeleted = %v, isUserDeleted = %v)", ob.IsFileDeleted, ob.IsUserDeleted) - return commit(nil) + return done(nil) } err = ensureSufficientSpace(ob.Size) @@ -316,19 +288,19 @@ func (c *ReplicationController3) tryReplicate() error { // // Log this error though, so that it gets noticed if it happens too // frequently (the instance might need a bigger disk). - return commit(stacktrace.Propagate(err, "")) + return done(stacktrace.Propagate(err, "")) } filePath, file, err := c.createTemporaryFile(objectKey) if err != nil { - return commit(stacktrace.Propagate(err, "Failed to create temporary file")) + return done(stacktrace.Propagate(err, "Failed to create temporary file")) } defer os.Remove(filePath) defer file.Close() size, err := c.downloadFromB2ViaWorker(objectKey, file, logger) if err != nil { - return commit(stacktrace.Propagate(err, "Failed to download object from B2")) + return done(stacktrace.Propagate(err, "Failed to download object from B2")) } logger.Infof("Downloaded %d bytes to %s", size, filePath) @@ -343,21 +315,20 @@ func (c *ReplicationController3) tryReplicate() error { if copies.WantWasabi && copies.Wasabi == nil { werr := c.replicateFile(in, c.wasabiDest, func() error { - return c.ObjectCopiesRepo.MarkObjectReplicatedWasabi(tx, objectKey) + return c.ObjectCopiesRepo.MarkObjectReplicatedWasabi(objectKey) }) err = werr } if copies.WantSCW && copies.SCW == nil { serr := c.replicateFile(in, c.scwDest, func() error { - return c.ObjectCopiesRepo.MarkObjectReplicatedScaleway(tx, objectKey) + return c.ObjectCopiesRepo.MarkObjectReplicatedScaleway(objectKey) 
}) if err == nil { err = serr } } - - return commit(err) + return done(err) } // Return an error if we risk running out of disk space if we try to download diff --git a/server/pkg/repo/object.go b/server/pkg/repo/object.go index 052278402d..380a68126d 100644 --- a/server/pkg/repo/object.go +++ b/server/pkg/repo/object.go @@ -200,8 +200,8 @@ func (repo *ObjectRepository) DoesObjectOrTempObjectExist(objectKey string) (boo // // Unknown objects (i.e. objectKeys for which there are no entries) are // considered as deleted. -func (repo *ObjectRepository) GetObjectState(tx *sql.Tx, objectKey string) (ObjectState ente.ObjectState, err error) { - row := tx.QueryRow(` +func (repo *ObjectRepository) GetObjectState(objectKey string) (ObjectState ente.ObjectState, err error) { + row := repo.DB.QueryRow(` SELECT ok.is_deleted, u.encrypted_email IS NULL AS is_user_deleted, ok.size FROM object_keys ok JOIN files f ON ok.file_id = f.file_id diff --git a/server/pkg/repo/object_copies.go b/server/pkg/repo/object_copies.go index 7e1bb58f04..8d7be84571 100644 --- a/server/pkg/repo/object_copies.go +++ b/server/pkg/repo/object_copies.go @@ -18,17 +18,14 @@ type ObjectCopiesRepository struct { } // GetAndLockUnreplicatedObject gets an object which is not yet replicated to -// all the replicas. It also starts a transaction to keep the row corresponding -// to that object in the database locked. +// all the replicas. It also registers a replication attempt so that the row +// corresponding to that object is blocked for 24h before the next replication attempt. // -// Both tx and objectCopies are guaranteed to be nil if error is not nil. -// -// If the returned transaction is not `nil`, it must be either `Rollback`ed or -// `Commit`ed. -func (repo *ObjectCopiesRepository) GetAndLockUnreplicatedObject() (*sql.Tx, *ente.ObjectCopies, error) { +// ObjectCopies is guaranteed to be nil if error is not nil. 
+func (repo *ObjectCopiesRepository) GetAndLockUnreplicatedObject() (*ente.ObjectCopies, error) { tx, err := repo.DB.Begin() if err != nil { - return nil, nil, stacktrace.Propagate(err, "") + return nil, stacktrace.Propagate(err, "") } rollback := func() { @@ -64,15 +61,21 @@ func (repo *ObjectCopiesRepository) GetAndLockUnreplicatedObject() (*sql.Tx, *en if err != nil && errors.Is(err, sql.ErrNoRows) { commit() - return nil, nil, err + return nil, err } if err != nil { rollback() - return nil, nil, stacktrace.Propagate(err, "") + return nil, stacktrace.Propagate(err, "") } - - return tx, &r, nil + err = repo.RegisterReplicationAttempt(tx, r.ObjectKey) + if err != nil { + rollback() + return nil, stacktrace.Propagate(err, "failed to register replication attempt") + } + // End the transaction here: tx is no longer returned to the caller. + commit() + return &r, nil } // CreateNewB2Object creates a new entry for objectKey and marks it as having @@ -139,8 +142,8 @@ func (repo *ObjectCopiesRepository) ResetNeedsScalewayReplication(objectKey stri // UnmarkFromReplication clears the want_* flags so that this objectKey is // marked as not requiring further replication. -func (repo *ObjectCopiesRepository) UnmarkFromReplication(tx *sql.Tx, objectKey string) error { - _, err := tx.Exec(` +func (repo *ObjectCopiesRepository) UnmarkFromReplication(objectKey string) error { + _, err := repo.DB.Exec(` UPDATE object_copies SET want_b2 = false, want_wasabi = false, want_scw = false WHERE object_key = $1 @@ -150,24 +153,24 @@ func (repo *ObjectCopiesRepository) UnmarkFromReplication(tx *sql.Tx, objectKey // MarkObjectReplicatedB2 sets the time when `objectKey` was replicated to // Wasabi to the current timestamp. 
-func (repo *ObjectCopiesRepository) MarkObjectReplicatedWasabi(tx *sql.Tx, objectKey string) error { +func (repo *ObjectCopiesRepository) MarkObjectReplicatedWasabi(objectKey string) error { return repo.markObjectReplicated(` UPDATE object_copies SET wasabi = now_utc_micro_seconds() WHERE object_key = $1 - `, tx, objectKey) + `, objectKey) } // MarkObjectReplicatedScaleway sets the time when `objectKey` was replicated to // Wasabi to the current timestamp. -func (repo *ObjectCopiesRepository) MarkObjectReplicatedScaleway(tx *sql.Tx, objectKey string) error { +func (repo *ObjectCopiesRepository) MarkObjectReplicatedScaleway(objectKey string) error { return repo.markObjectReplicated(` UPDATE object_copies SET scw = now_utc_micro_seconds() WHERE object_key = $1 - `, tx, objectKey) + `, objectKey) } -func (repo *ObjectCopiesRepository) markObjectReplicated(query string, tx *sql.Tx, objectKey string) error { - result, err := tx.Exec(query, objectKey) +func (repo *ObjectCopiesRepository) markObjectReplicated(query string, objectKey string) error { + result, err := repo.DB.Exec(query, objectKey) if err != nil { return stacktrace.Propagate(err, "") }