diff --git a/server/ente/filedata/filedata.go b/server/ente/filedata/filedata.go
index 20ff30aea1..2ddc373f16 100644
--- a/server/ente/filedata/filedata.go
+++ b/server/ente/filedata/filedata.go
@@ -43,7 +43,7 @@ type Row struct {
 	InflightReplicas []string
 	PendingSync      bool
 	IsDeleted        bool
-	LastSyncTime     int64
+	SyncLockedTill   int64
 	CreatedAt        int64
 	UpdatedAt        int64
 }
diff --git a/server/migrations/89_file_data_table.down.sql b/server/migrations/89_file_data_table.down.sql
index 024a1da670..bdf2a8717b 100644
--- a/server/migrations/89_file_data_table.down.sql
+++ b/server/migrations/89_file_data_table.down.sql
@@ -1,5 +1,6 @@
 DROP INDEX IF EXISTS idx_file_data_user_type_deleted;
+DROP INDEX IF EXISTS idx_file_data_pending_sync_locked_till;
 
 DROP TABLE IF EXISTS file_data;
 
diff --git a/server/migrations/89_file_data_table.up.sql b/server/migrations/89_file_data_table.up.sql
index 36d66587a6..c0feab6e64 100644
--- a/server/migrations/89_file_data_table.up.sql
+++ b/server/migrations/89_file_data_table.up.sql
@@ -1,40 +1,45 @@
-ALTER TABLE temp_objects ADD COLUMN IF NOT EXISTS bucket_id s3region;
+ALTER TABLE temp_objects
+    ADD COLUMN IF NOT EXISTS bucket_id s3region;
 ALTER TYPE OBJECT_TYPE ADD VALUE 'derivedMeta';
 ALTER TYPE s3region ADD VALUE 'b5';
 
--- Create the derived table
+-- Create the file_data table
 CREATE TABLE IF NOT EXISTS file_data
 (
-    file_id            BIGINT NOT NULL,
-    user_id            BIGINT NOT NULL,
-    data_type          OBJECT_TYPE NOT NULL,
-    size               BIGINT NOT NULL,
-    latest_bucket      s3region NOT NULL,
-    replicated_buckets s3region[] NOT NULL DEFAULT '{}',
+    file_id              BIGINT      NOT NULL,
+    user_id              BIGINT      NOT NULL,
+    data_type            OBJECT_TYPE NOT NULL,
+    size                 BIGINT      NOT NULL,
+    latest_bucket        s3region    NOT NULL,
+    replicated_buckets   s3region[]  NOT NULL DEFAULT '{}',
     -- following field contains the list of buckets from which we need to delete the data, as the given data_type will no longer be persisted in that DC
-    delete_from_buckets s3region[] NOT NULL DEFAULT '{}',
-    inflight_rep_buckets s3region[] NOT NULL DEFAULT '{}',
-    pending_sync BOOLEAN NOT NULL DEFAULT false,
-    is_deleted BOOLEAN NOT NULL DEFAULT false,
-    last_sync_time BIGINT NOT NULL DEFAULT 0,
-    created_at BIGINT NOT NULL DEFAULT now_utc_micro_seconds(),
-    updated_at BIGINT NOT NULL DEFAULT now_utc_micro_seconds(),
+    delete_from_buckets  s3region[]  NOT NULL DEFAULT '{}',
+    inflight_rep_buckets s3region[]  NOT NULL DEFAULT '{}',
+    is_deleted           BOOLEAN     NOT NULL DEFAULT false,
+    pending_sync         BOOLEAN     NOT NULL DEFAULT false,
+    sync_locked_till     BIGINT      NOT NULL DEFAULT 0,
+    created_at           BIGINT      NOT NULL DEFAULT now_utc_micro_seconds(),
+    updated_at           BIGINT      NOT NULL DEFAULT now_utc_micro_seconds(),
     PRIMARY KEY (file_id, data_type)
 );
 
 -- Add index for user_id and data_type for efficient querying
 CREATE INDEX idx_file_data_user_type_deleted ON file_data (user_id, data_type, is_deleted) INCLUDE (file_id, size);
+CREATE INDEX idx_file_data_pending_sync_locked_till ON file_data (is_deleted, sync_locked_till) WHERE pending_sync = true;
 
 CREATE OR REPLACE FUNCTION ensure_no_common_entries()
-    RETURNS TRIGGER AS $$
+    RETURNS TRIGGER AS
+$$
 DECLARE
-    all_buckets s3region[];
+    all_buckets       s3region[];
     duplicate_buckets s3region[];
 BEGIN
     -- Combine all bucket IDs into a single array
-    all_buckets := ARRAY[NEW.latest_bucket] || NEW.replicated_buckets || NEW.delete_from_buckets || NEW.inflight_rep_buckets;
+    all_buckets := ARRAY [NEW.latest_bucket] || NEW.replicated_buckets || NEW.delete_from_buckets ||
+                   NEW.inflight_rep_buckets;
     -- Find duplicate bucket IDs
-    SELECT ARRAY_AGG(DISTINCT bucket) INTO duplicate_buckets
+    SELECT ARRAY_AGG(DISTINCT bucket)
+    INTO duplicate_buckets
     FROM unnest(all_buckets) bucket
     GROUP BY bucket
     HAVING COUNT(*) > 1;
@@ -50,6 +55,8 @@ END;
 $$ LANGUAGE plpgsql;
 
 CREATE TRIGGER check_no_common_entries
-    BEFORE INSERT OR UPDATE ON file_data
-    FOR EACH ROW EXECUTE FUNCTION ensure_no_common_entries();
+    BEFORE INSERT OR UPDATE
+    ON file_data
+    FOR EACH ROW
+EXECUTE FUNCTION ensure_no_common_entries();
 
diff --git a/server/pkg/repo/filedata/repository.go b/server/pkg/repo/filedata/repository.go
index 75c409a6c6..21208697bd 100644
--- a/server/pkg/repo/filedata/repository.go
+++ b/server/pkg/repo/filedata/repository.go
@@ -52,7 +52,7 @@ func (r *Repository) InsertOrUpdate(ctx context.Context, data filedata.Row) erro
 }
 
 func (r *Repository) GetFilesData(ctx context.Context, oType ente.ObjectType, fileIDs []int64) ([]filedata.Row, error) {
-	rows, err := r.DB.QueryContext(ctx, `SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets, inflight_rep_buckets, pending_sync, is_deleted, last_sync_time, created_at, updated_at
+	rows, err := r.DB.QueryContext(ctx, `SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets, inflight_rep_buckets, pending_sync, is_deleted, sync_locked_till, created_at, updated_at
 					FROM file_data
 					WHERE data_type = $1 AND file_id = ANY($2)`, string(oType), pq.Array(fileIDs))
 	if err != nil {
@@ -62,7 +62,7 @@ func (r *Repository) GetFilesData(ctx context.Context, oType ente.ObjectType, fi
 }
 
 func (r *Repository) GetFileData(ctx context.Context, fileIDs int64) ([]filedata.Row, error) {
-	rows, err := r.DB.QueryContext(ctx, `SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets,inflight_rep_buckets, pending_sync, is_deleted, last_sync_time, created_at, updated_at
+	rows, err := r.DB.QueryContext(ctx, `SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets,inflight_rep_buckets, pending_sync, is_deleted, sync_locked_till, created_at, updated_at
 					FROM file_data
 					WHERE file_id = $1`, fileIDs)
 	if err != nil {
@@ -96,17 +96,17 @@ func (r *Repository) AddBucket(row filedata.Row, bucketID string, columnName str
 func (r *Repository) RemoveBucket(row filedata.Row, bucketID string, columnName string) error {
 	query := fmt.Sprintf(`
-    UPDATE file_data
-    SET %s = array(
-        SELECT DISTINCT elem FROM unnest(
-            array_remove(
-                file_data.%s,
-                $1
-            )
-        ) AS elem
-        WHERE elem IS NOT NULL
-    )
-    WHERE file_id = $2 AND data_type = $3 and user_id = $4`, columnName, columnName)
+	UPDATE file_data
+	SET %s = array(
+		SELECT DISTINCT elem FROM unnest(
+			array_remove(
+				file_data.%s,
+				$1
+			)
+		) AS elem
+		WHERE elem IS NOT NULL
+	)
+	WHERE file_id = $2 AND data_type = $3 and user_id = $4`, columnName, columnName)
 	result, err := r.DB.Exec(query, bucketID, row.FileID, string(row.Type), row.UserID)
 	if err != nil {
 		return stacktrace.Propagate(err, "failed to remove bucket from "+columnName)
 	}
@@ -173,14 +173,13 @@ WHERE file_id = $1 AND data_type = $2 AND latest_bucket = $3 AND user_id = $4 AN
 		return stacktrace.NewError("file data not deleted")
 	}
 	return nil
-
 }
 
 func convertRowsToFilesData(rows *sql.Rows) ([]filedata.Row, error) {
 	var filesData []filedata.Row
 	for rows.Next() {
 		var fileData filedata.Row
-		err := rows.Scan(&fileData.FileID, &fileData.UserID, &fileData.Type, &fileData.Size, &fileData.LatestBucket, pq.Array(&fileData.ReplicatedBuckets), pq.Array(&fileData.DeleteFromBuckets), pq.Array(&fileData.InflightReplicas), &fileData.PendingSync, &fileData.IsDeleted, &fileData.LastSyncTime, &fileData.CreatedAt, &fileData.UpdatedAt)
+		err := rows.Scan(&fileData.FileID, &fileData.UserID, &fileData.Type, &fileData.Size, &fileData.LatestBucket, pq.Array(&fileData.ReplicatedBuckets), pq.Array(&fileData.DeleteFromBuckets), pq.Array(&fileData.InflightReplicas), &fileData.PendingSync, &fileData.IsDeleted, &fileData.SyncLockedTill, &fileData.CreatedAt, &fileData.UpdatedAt)
 		if err != nil {
 			return nil, stacktrace.Propagate(err, "")
 		}
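
Note (not part of the patch): renaming last_sync_time to sync_locked_till,
together with the new partial index idx_file_data_pending_sync_locked_till,
suggests a lease-style claim scheme: a worker picks pending rows whose lock
has expired and bumps sync_locked_till into the future so that concurrent
workers skip them. Below is a minimal sketch of such a claim query, written
as a repository method in the style of repository.go (reusing its existing
imports and the convertRowsToFilesData helper); the method name
GetPendingSyncDataAndExtendLock and the lockDuration/limit parameters are
assumptions for illustration, not something this diff defines.

	func (r *Repository) GetPendingSyncDataAndExtendLock(ctx context.Context, lockDuration int64, limit int) ([]filedata.Row, error) {
		// Claim up to `limit` pending rows whose lock has expired by pushing
		// sync_locked_till into the future; the inner WHERE clause matches the
		// partial index idx_file_data_pending_sync_locked_till exactly
		// (pending_sync = true, is_deleted equality, sync_locked_till range).
		rows, err := r.DB.QueryContext(ctx, `
		UPDATE file_data
		SET sync_locked_till = now_utc_micro_seconds() + $1
		WHERE (file_id, data_type) IN (SELECT file_id, data_type
		                               FROM file_data
		                               WHERE pending_sync = true
		                                 AND is_deleted = false
		                                 AND sync_locked_till < now_utc_micro_seconds()
		                               LIMIT $2 FOR UPDATE SKIP LOCKED)
		RETURNING file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets, inflight_rep_buckets, pending_sync, is_deleted, sync_locked_till, created_at, updated_at`,
			lockDuration, limit)
		if err != nil {
			return nil, stacktrace.Propagate(err, "failed to claim pending file_data rows")
		}
		defer rows.Close()
		return convertRowsToFilesData(rows)
	}

FOR UPDATE SKIP LOCKED keeps concurrent workers from blocking on each other's
in-flight claims, while the sync_locked_till timestamp makes a claim expire on
its own if a worker dies mid-sync.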
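The ensure_no_common_entries trigger enforces that a bucket ID appears in at
most one of latest_bucket, replicated_buckets, delete_from_buckets, and
inflight_rep_buckets; the part of the function that presumably RAISEs when
duplicate_buckets is non-empty falls between the two hunks and is not shown
here. A hypothetical snippet exercising the trigger, assuming db is an
*sql.DB connected to the same database (the IDs, size, and the 'b5' bucket
value are placeholders):

	func mustRejectDuplicateBuckets(db *sql.DB) error {
		// 'b5' is used both as latest_bucket and inside replicated_buckets,
		// so check_no_common_entries should abort the insert.
		_, err := db.Exec(`
		INSERT INTO file_data (file_id, user_id, data_type, size, latest_bucket, replicated_buckets)
		VALUES (1, 1, 'derivedMeta', 0, 'b5', ARRAY['b5']::s3region[])`)
		if err == nil {
			return stacktrace.NewError("expected check_no_common_entries to reject the insert")
		}
		return nil
	}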