This commit is contained in:
Manav Rathi
2025-05-06 14:27:50 +05:30
parent c087e419d5
commit 9db8324ffd
3 changed files with 117 additions and 42 deletions

View File

@@ -7,9 +7,15 @@ import { type PublicAlbumsCredentials } from "ente-base/http";
import log from "ente-base/log";
import { fileLogID, type EnteFile } from "ente-media/file";
import { FileType } from "ente-media/file-type";
import {
getAllLocalFiles,
uniqueFilesByID,
} from "ente-new/photos/services/files";
import { settingsSnapshot } from "ente-new/photos/services/settings";
import { gunzip, gzip } from "ente-new/photos/utils/gzip";
import { randomSample } from "ente-utils/array";
import { ensurePrecondition } from "ente-utils/ensure";
import { wait } from "ente-utils/promise";
import { z } from "zod";
import {
initiateGenerateHLS,
@@ -43,9 +49,12 @@ interface VideoProcessingQueueItem {
* the current client. If present, this serves as an optimization allowing
* us to directly read the file off the user's file system.
*/
timestampedUploadItem: TimestampedFileSystemUploadItem | undefined;
timestampedUploadItem?: TimestampedFileSystemUploadItem;
}
/** Initial duration (ms) to sleep when there is nothing to process: 10 s. */
const idleWaitInitial = 10_000;
/** Cap (ms) on the idle sleep: 640 s, i.e. the initial wait doubled 6 times. */
const idleWaitMax = idleWaitInitial * 64;
/**
* Internal in-memory state shared by the functions in this module.
*
@@ -71,6 +80,13 @@ class VideoState {
* See: [Note: Exiting idle wait of processing loop].
*/
resolveTick: (() => void) | undefined;
/**
* The time to sleep if nothing is pending.
*
* Goes from {@link idleWaitInitial} to {@link idleWaitMax} in doublings.
* Reset back to {@link idleWaitInitial} in case of any activity.
*/
idleWait = idleWaitInitial;
}
/**
@@ -349,8 +365,8 @@ export const processVideoNewUpload = (
processableUploadItem: ProcessableUploadItem,
) => {
// TODO(HLS):
if (!isVideoProcessingEnabled()) return;
if (!isDesktop) return;
if (!isVideoProcessingEnabled()) return;
if (file.metadata.fileType !== FileType.video) return;
if (processableUploadItem instanceof File) {
// While the types don't guarantee it, we really shouldn't be getting
@@ -436,24 +452,83 @@ export const isVideoProcessingEnabled = () =>
* batches, and the externally triggered processing of live uploads.
*/
/**
 * Drain the live queue of just-uploaded videos, then work through the
 * backfill, then idle-wait with exponential backoff.
 *
 * NOTE(review): This span is a rendered diff with the +/- markers stripped.
 * The `while (true)` loop immediately below is the *removed* (pre-commit)
 * body, and the code from the `if (!(isDesktop && ...))` guard onwards is the
 * *added* (post-commit) body; the two cannot both belong to the real function
 * and the braces here do not balance. Confirm against the post-commit file
 * before treating this text as source.
 */
const processQueue = async () => {
// --- removed (pre-commit) body begins ---
while (true) {
const item = _state.liveQueue.shift();
if (!item) break;
try {
await processQueueItem(item);
} catch (e) {
log.error("Video processing failed", e);
// Ignore this unprocessable item. Currently this function only runs
// post upload, so this item will later get processed as part of the
// backfill.
//
// TODO(HLS): When processing the backfill itself, we'll need a way
// to mark this item as failed.
// --- added (post-commit) body begins ---
// Defensive guard: the processor should only run on desktop with video
// processing enabled.
if (!(isDesktop && isVideoProcessingEnabled())) {
assertionFailed(); /* we shouldn't have come here */
return;
}
// Lazily-fetched backfill batch; refetched once exhausted.
let bq: typeof _state.liveQueue | undefined;
while (isVideoProcessingEnabled()) {
log.debug(() => ["gen-hls-iter", []]);
// Live (freshly uploaded) items take priority over the backfill.
const item =
_state.liveQueue.shift() ?? (bq ??= await backfillQueue()).shift();
if (item) {
try {
await processQueueItem(item);
} catch (e) {
log.error("Video processing failed", e);
} finally {
// TODO(HLS): This needs to be more granular in case of errors.
// NOTE(review): the item is marked processed even when
// processQueueItem threw, so a failed item is never retried —
// the TODO above appears to acknowledge this.
await markProcessedVideoFileID(item.file.id);
}
// Reset the idle wait on any activity.
_state.idleWait = idleWaitInitial;
} else {
// Replenish the backfill queue if possible.
bq = await backfillQueue();
if (!bq.length) {
// There are no more items in either the live queue or backlog.
// Go to sleep (for increasingly longer durations, capped at a
// maximum).
const idleWait = _state.idleWait;
// Exponential backoff: double the wait, bounded by idleWaitMax.
_state.idleWait = Math.min(idleWait * 2, idleWaitMax);
// `tick` allows the sleep to be interrupted when there is
// potential activity.
// NOTE(review): `_state.tick` is not visible in this chunk (only
// `resolveTick` is); presumably a promise resolved by it — confirm.
if (!_state.tick) assertionFailed();
const tick = _state.tick!;
log.debug(() => ["gen-hls", { idleWait }]);
await Promise.race([tick, wait(idleWait)]);
}
}
}
// Allow a subsequent trigger to start a fresh processor instance.
_state.queueProcessor = undefined;
};
/**
 * Return the next batch of videos that need to be processed.
 *
 * Reads all local files, retains the unique video files that have not yet
 * been marked as processed, and returns a random sample of up to 50 of them
 * wrapped as {@link VideoProcessingQueueItem}s.
 *
 * If there is nothing pending, returns an empty array.
 */
const backfillQueue = async (): Promise<VideoProcessingQueueItem[]> => {
    // Upper bound on the number of items handed out per backfill iteration.
    const batchSize = 50;
    const allCollectionFiles = await getAllLocalFiles();
    const processedVideoFileIDs = await savedProcessedVideoFileIDs();
    // A file may be part of multiple collections; consider each only once.
    // Use strict equality (===) for consistency with the fileType check in
    // processVideoNewUpload.
    const videoFiles = uniqueFilesByID(
        allCollectionFiles.filter(
            (f) => f.metadata.fileType === FileType.video,
        ),
    );
    const pendingVideoFiles = videoFiles.filter(
        (f) => !processedVideoFileIDs.has(f.id),
    );
    // NOTE(review): the batch is a *random* sample, not oldest-first —
    // presumably to spread work across the backlog; confirm intent.
    const batch = randomSample(pendingVideoFiles, batchSize);
    return batch.map((file) => ({ file }));
};
// In-memory record of already processed video file IDs.
// TODO(HLS): Store this in DB.
const _processedVideoFileIDs: number[] = [];

/**
 * Return the set of IDs of video files that have already been processed.
 *
 * The returned {@link Set} is freshly constructed on each call, so callers
 * may hold on to (but should not expect live updates in) the result.
 */
const savedProcessedVideoFileIDs = async () => {
    // TODO(HLS): make async
    await wait(0);
    const ids = new Set<number>();
    for (const id of _processedVideoFileIDs) ids.add(id);
    return ids;
};
/**
 * Record that the video file with the given {@link fileID} has been
 * processed.
 *
 * Duplicate marks are tolerated: readers dedupe by constructing a Set.
 */
const markProcessedVideoFileID = async (fileID: number) => {
    // TODO(HLS): make async
    await wait(0);
    _processedVideoFileIDs.push(fileID);
};
/**
* Generate and upload a streamable variant of the given {@link EnteFile}.
*

View File

@@ -1,7 +1,7 @@
import { assertionFailed } from "ente-base/assert";
import log from "ente-base/log";
import type { EnteFile } from "ente-media/file";
import { shuffled } from "ente-utils/array";
import { randomSample } from "ente-utils/array";
import { getLocalFiles } from "../files";
import {
savedCGroups,
@@ -585,32 +585,6 @@ export const _suggestionsAndChoicesForPerson = async (
return { choices, suggestions };
};
/**
 * Return a random sample of {@link n} elements from the given {@link items}.
 *
 * Functionally this is equivalent to `shuffled(items).slice(0, n)`, except it
 * tries to be a bit faster for long arrays when we need only a small sample
 * from it. In a few tests, this indeed makes a substantial difference.
 *
 * Note: when `items.length <= n` the original array itself (not a copy) is
 * returned.
 */
const randomSample = <T>(items: T[], n: number) => {
    if (items.length <= n) return items;
    if (n === 0) return [];
    if (n > items.length / 3) {
        // Avoid using the random sampling without replacement method if a
        // significant proportion of the original items are needed, otherwise we
        // might run into long retry loop at the tail end (hitting the same
        // indexes again and again).
        return shuffled(items).slice(0, n);
    }
    // Sampling without replacement: keep drawing random indexes until we have
    // n distinct ones. Cheap because n is small relative to items.length here.
    const ix = new Set<number>();
    while (ix.size < n) {
        ix.add(Math.floor(Math.random() * items.length));
    }
    return [...ix].map((i) => items[i]!);
};
/**
* A map specifying the changes to make when the user presses the save button on
* the people suggestions dialog.

View File

@@ -14,6 +14,32 @@ export const shuffled = <T>(xs: T[]): T[] =>
.sort()
.map(([, x]) => x) as T[];
/**
 * Return a random sample of {@link n} elements from the given {@link items}.
 *
 * Functionally this is equivalent to `shuffled(items).slice(0, n)`, except it
 * tries to be a bit faster for long arrays when we need only a small sample
 * from it. In a few tests, this indeed makes a substantial difference.
 *
 * Note: when `items.length <= n` the original array itself (not a copy) is
 * returned.
 */
export const randomSample = <T>(items: T[], n: number) => {
    if (items.length <= n) return items;
    if (n === 0) return [];
    if (n > items.length / 3) {
        // Avoid using the random sampling without replacement method if a
        // significant proportion of the original items are needed, otherwise we
        // might run into long retry loop at the tail end (hitting the same
        // indexes again and again).
        return shuffled(items).slice(0, n);
    }
    // Sampling without replacement: keep drawing random indexes until we have
    // n distinct ones. Cheap because n is small relative to items.length here.
    const ix = new Set<number>();
    while (ix.size < n) {
        ix.add(Math.floor(Math.random() * items.length));
    }
    return [...ix].map((i) => items[i]!);
};
/**
* Return the first non-empty string from the given list of strings.
*