[sever][replication] Avoid long running txn during replication (#2669)

## Description
Going forward, we pick a row for replication, and immediately mark
replication attempt. Once that's done, we try to replicate the object,
and update it's status without any transaction.

The initial locking and marking the replication attempt should ideally
ensure that no other worker will pick up the row for replication for
next 24hours.

## Tests
This commit is contained in:
Neeraj Gupta
2024-08-12 16:09:41 +05:30
committed by GitHub
3 changed files with 39 additions and 67 deletions

View File

@@ -222,7 +222,7 @@ func (c *ReplicationController3) replicate(i int) {
// objects left to replicate currently.
func (c *ReplicationController3) tryReplicate() error {
// Fetch an object to replicate
tx, copies, err := c.ObjectCopiesRepo.GetAndLockUnreplicatedObject()
copies, err := c.ObjectCopiesRepo.GetAndLockUnreplicatedObject()
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
log.Errorf("Could not fetch an object to replicate: %s", err)
@@ -237,44 +237,16 @@ func (c *ReplicationController3) tryReplicate() error {
"object_key": objectKey,
})
commit := func(err error) error {
// We don't rollback the transaction even in the case of errors, and
// instead try to commit it after setting the last_attempt timestamp.
//
// This avoids the replication getting stuck in a loop trying (and
// failing) to replicate the same object. The error would still need to
// be resolved, but at least the replication would meanwhile move
// forward, ignoring this row.
done := func(err error) error {
if err != nil {
logger.Error(err)
}
aerr := c.ObjectCopiesRepo.RegisterReplicationAttempt(tx, objectKey)
if aerr != nil {
aerr = stacktrace.Propagate(aerr, "Failed to mark replication attempt")
logger.Error(aerr)
}
cerr := tx.Commit()
if cerr != nil {
cerr = stacktrace.Propagate(err, "Failed to commit transaction")
logger.Error(cerr)
}
if err == nil {
err = aerr
}
if err == nil {
err = cerr
}
if err == nil {
logger.Info("Replication attempt succeeded")
} else {
logger.Info("Replication attempt failed")
logger.WithError(err).Info("Replication attempt failed")
}
return err
}
@@ -282,30 +254,30 @@ func (c *ReplicationController3) tryReplicate() error {
if copies.B2 == nil {
err := errors.New("expected B2 copy to be in place before we start replication")
return commit(stacktrace.Propagate(err, "Sanity check failed"))
return done(stacktrace.Propagate(err, "Sanity check failed"))
}
if !copies.WantWasabi && !copies.WantSCW {
err := errors.New("expected at least one of want_wasabi and want_scw to be true when trying to replicate")
return commit(stacktrace.Propagate(err, "Sanity check failed"))
return done(stacktrace.Propagate(err, "Sanity check failed"))
}
ob, err := c.ObjectRepo.GetObjectState(tx, objectKey)
ob, err := c.ObjectRepo.GetObjectState(objectKey)
if err != nil {
return commit(stacktrace.Propagate(err, "Failed to fetch file's deleted status"))
return done(stacktrace.Propagate(err, "Failed to fetch file's deleted status"))
}
if ob.IsFileDeleted || ob.IsUserDeleted {
// Update the object_copies to mark this object as not requiring further
// replication. The row in object_copies will get deleted when the next
// scheduled object deletion runs.
err = c.ObjectCopiesRepo.UnmarkFromReplication(tx, objectKey)
err = c.ObjectCopiesRepo.UnmarkFromReplication(objectKey)
if err != nil {
return commit(stacktrace.Propagate(err, "Failed to mark an object not requiring further replication"))
return done(stacktrace.Propagate(err, "Failed to mark an object not requiring further replication"))
}
logger.Infof("Skipping replication for deleted object (isFileDeleted = %v, isUserDeleted = %v)",
ob.IsFileDeleted, ob.IsUserDeleted)
return commit(nil)
return done(nil)
}
err = ensureSufficientSpace(ob.Size)
@@ -316,19 +288,19 @@ func (c *ReplicationController3) tryReplicate() error {
//
// Log this error though, so that it gets noticed if it happens too
// frequently (the instance might need a bigger disk).
return commit(stacktrace.Propagate(err, ""))
return done(stacktrace.Propagate(err, ""))
}
filePath, file, err := c.createTemporaryFile(objectKey)
if err != nil {
return commit(stacktrace.Propagate(err, "Failed to create temporary file"))
return done(stacktrace.Propagate(err, "Failed to create temporary file"))
}
defer os.Remove(filePath)
defer file.Close()
size, err := c.downloadFromB2ViaWorker(objectKey, file, logger)
if err != nil {
return commit(stacktrace.Propagate(err, "Failed to download object from B2"))
return done(stacktrace.Propagate(err, "Failed to download object from B2"))
}
logger.Infof("Downloaded %d bytes to %s", size, filePath)
@@ -343,21 +315,20 @@ func (c *ReplicationController3) tryReplicate() error {
if copies.WantWasabi && copies.Wasabi == nil {
werr := c.replicateFile(in, c.wasabiDest, func() error {
return c.ObjectCopiesRepo.MarkObjectReplicatedWasabi(tx, objectKey)
return c.ObjectCopiesRepo.MarkObjectReplicatedWasabi(objectKey)
})
err = werr
}
if copies.WantSCW && copies.SCW == nil {
serr := c.replicateFile(in, c.scwDest, func() error {
return c.ObjectCopiesRepo.MarkObjectReplicatedScaleway(tx, objectKey)
return c.ObjectCopiesRepo.MarkObjectReplicatedScaleway(objectKey)
})
if err == nil {
err = serr
}
}
return commit(err)
return done(err)
}
// Return an error if we risk running out of disk space if we try to download

View File

@@ -200,8 +200,8 @@ func (repo *ObjectRepository) DoesObjectOrTempObjectExist(objectKey string) (boo
//
// Unknown objects (i.e. objectKeys for which there are no entries) are
// considered as deleted.
func (repo *ObjectRepository) GetObjectState(tx *sql.Tx, objectKey string) (ObjectState ente.ObjectState, err error) {
row := tx.QueryRow(`
func (repo *ObjectRepository) GetObjectState(objectKey string) (ObjectState ente.ObjectState, err error) {
row := repo.DB.QueryRow(`
SELECT ok.is_deleted, u.encrypted_email IS NULL AS is_user_deleted, ok.size
FROM object_keys ok
JOIN files f ON ok.file_id = f.file_id

View File

@@ -18,17 +18,14 @@ type ObjectCopiesRepository struct {
}
// GetAndLockUnreplicatedObject gets an object which is not yet replicated to
// all the replicas. It also starts a transaction to keep the row corresponding
// to that object in the database locked.
// all the replicas. It also registers a replication to keep the row corresponding
// to that object to be blocked for 24h before next replication attemp.
//
// Both tx and objectCopies are guaranteed to be nil if error is not nil.
//
// If the returned transaction is not `nil`, it must be either `Rollback`ed or
// `Commit`ed.
func (repo *ObjectCopiesRepository) GetAndLockUnreplicatedObject() (*sql.Tx, *ente.ObjectCopies, error) {
// ObjectCopies is guaranteed to be nil if error is not nil.
func (repo *ObjectCopiesRepository) GetAndLockUnreplicatedObject() (*ente.ObjectCopies, error) {
tx, err := repo.DB.Begin()
if err != nil {
return nil, nil, stacktrace.Propagate(err, "")
return nil, stacktrace.Propagate(err, "")
}
rollback := func() {
@@ -64,15 +61,19 @@ func (repo *ObjectCopiesRepository) GetAndLockUnreplicatedObject() (*sql.Tx, *en
if err != nil && errors.Is(err, sql.ErrNoRows) {
commit()
return nil, nil, err
return nil, err
}
if err != nil {
rollback()
return nil, nil, stacktrace.Propagate(err, "")
return nil, stacktrace.Propagate(err, "")
}
return tx, &r, nil
err = repo.RegisterReplicationAttempt(tx, r.ObjectKey)
if err != nil {
rollback()
return nil, stacktrace.Propagate(err, "failed to register replication attempt")
}
return &r, nil
}
// CreateNewB2Object creates a new entry for objectKey and marks it as having
@@ -139,8 +140,8 @@ func (repo *ObjectCopiesRepository) ResetNeedsScalewayReplication(objectKey stri
// UnmarkFromReplication clears the want_* flags so that this objectKey is
// marked as not requiring further replication.
func (repo *ObjectCopiesRepository) UnmarkFromReplication(tx *sql.Tx, objectKey string) error {
_, err := tx.Exec(`
func (repo *ObjectCopiesRepository) UnmarkFromReplication(objectKey string) error {
_, err := repo.DB.Exec(`
UPDATE object_copies
SET want_b2 = false, want_wasabi = false, want_scw = false
WHERE object_key = $1
@@ -150,24 +151,24 @@ func (repo *ObjectCopiesRepository) UnmarkFromReplication(tx *sql.Tx, objectKey
// MarkObjectReplicatedB2 sets the time when `objectKey` was replicated to
// Wasabi to the current timestamp.
func (repo *ObjectCopiesRepository) MarkObjectReplicatedWasabi(tx *sql.Tx, objectKey string) error {
func (repo *ObjectCopiesRepository) MarkObjectReplicatedWasabi(objectKey string) error {
return repo.markObjectReplicated(`
UPDATE object_copies SET wasabi = now_utc_micro_seconds()
WHERE object_key = $1
`, tx, objectKey)
`, objectKey)
}
// MarkObjectReplicatedScaleway sets the time when `objectKey` was replicated to
// Wasabi to the current timestamp.
func (repo *ObjectCopiesRepository) MarkObjectReplicatedScaleway(tx *sql.Tx, objectKey string) error {
func (repo *ObjectCopiesRepository) MarkObjectReplicatedScaleway(objectKey string) error {
return repo.markObjectReplicated(`
UPDATE object_copies SET scw = now_utc_micro_seconds()
WHERE object_key = $1
`, tx, objectKey)
`, objectKey)
}
func (repo *ObjectCopiesRepository) markObjectReplicated(query string, tx *sql.Tx, objectKey string) error {
result, err := tx.Exec(query, objectKey)
func (repo *ObjectCopiesRepository) markObjectReplicated(query string, objectKey string) error {
result, err := repo.DB.Exec(query, objectKey)
if err != nil {
return stacktrace.Propagate(err, "")
}