From 71d6aed1aa865c84afa5f53fed76ea34c6134ed9 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Wed, 7 May 2025 10:30:10 +0530 Subject: [PATCH 01/13] web side 1 --- web/packages/base/types/ipc.ts | 85 ++++++++++--------- web/packages/gallery/services/ffmpeg/index.ts | 39 ++++++--- .../gallery/services/upload/thumbnail.ts | 2 +- 3 files changed, 75 insertions(+), 51 deletions(-) diff --git a/web/packages/base/types/ipc.ts b/web/packages/base/types/ipc.ts index 25f5ce2126..b1c5ff57e8 100644 --- a/web/packages/base/types/ipc.ts +++ b/web/packages/base/types/ipc.ts @@ -334,43 +334,7 @@ export interface Electron { maxSize: number, ) => Promise; - /** - * Execute a FFmpeg {@link command} on the given - * {@link dataOrPathOrZipItem}. - * - * This executes the command using a FFmpeg executable we bundle with our - * desktop app. We also have a Wasm FFmpeg implementation that we use when - * running on the web, which has a sibling function with the same - * parameters. See [Note:FFmpeg in Electron]. - * - * @param command An array of strings, each representing one positional - * parameter in the command to execute. Placeholders for the input, output - * and ffmpeg's own path are replaced before executing the command - * (respectively {@link inputPathPlaceholder}, - * {@link outputPathPlaceholder}, {@link ffmpegPathPlaceholder}). - * - * @param dataOrPathOrZipItem The bytes of the input file, or the path to - * the input file on the user's local disk, or the path to a zip file on the - * user's disk and the name of an entry in it. In all three cases, the data - * gets serialized to a temporary file, and then that path gets substituted - * in the FFmpeg {@link command} in lieu of {@link inputPathPlaceholder}. - * - * @param outputFileExtension The extension (without the dot, e.g. "jpeg") - * to use for the output file that we ask FFmpeg to create in - * {@param command}. While this file will eventually get deleted, and we'll - * just return its contents, for some FFmpeg command the extension matters - * (e.g. conversion to a JPEG fails if the extension is arbitrary). - * - * @returns The contents of the output file produced by the ffmpeg command - * (specified as {@link outputPathPlaceholder} in {@link command}). - */ - ffmpegExec: ( - command: FFmpegCommand, - dataOrPathOrZipItem: Uint8Array | string | ZipItem, - outputFileExtension: string, - ) => Promise; - - // - ML + // - Utility process /** * Trigger the creation of a new utility process of the given {@link type}, @@ -385,6 +349,8 @@ export interface Electron { * value of {@link type}. Thus, att the other end of that port will be an * object that conforms to: * + * - {@link ElectronFFmpegWorker} interface, when type is "ffmpeg". + * * - {@link ElectronMLWorker} interface, when type is "ml". * * For more details about the IPC flow, see: [Note: ML IPC]. @@ -596,7 +562,50 @@ export interface Electron { clearPendingUploads: () => Promise; } -export type UtilityProcessType = "ml"; +export type UtilityProcessType = "ffmpeg" | "ml"; + +/** + * The shape of the object exposed by the Node.js utility process listening on + * the other side message port that the web layer obtains by doing + * {@link triggerCreateUtilityProcess} with type "ffmpeg". + */ +export interface ElectronFFmpegWorker { + /** + * Execute a FFmpeg {@link command} on the given + * {@link dataOrPathOrZipItem}. + * + * This executes the command using a FFmpeg executable we bundle with our + * desktop app. We also have a Wasm FFmpeg implementation that we use when + * running on the web, which has a sibling function with the same + * parameters. See [Note:FFmpeg in Electron]. + * + * @param command An array of strings, each representing one positional + * parameter in the command to execute. Placeholders for the input, output + * and ffmpeg's own path are replaced before executing the command + * (respectively {@link inputPathPlaceholder}, + * {@link outputPathPlaceholder}, {@link ffmpegPathPlaceholder}). + * + * @param dataOrPathOrZipItem The bytes of the input file, or the path to + * the input file on the user's local disk, or the path to a zip file on the + * user's disk and the name of an entry in it. In all three cases, the data + * gets serialized to a temporary file, and then that path gets substituted + * in the FFmpeg {@link command} in lieu of {@link inputPathPlaceholder}. + * + * @param outputFileExtension The extension (without the dot, e.g. "jpeg") + * to use for the output file that we ask FFmpeg to create in + * {@param command}. While this file will eventually get deleted, and we'll + * just return its contents, for some FFmpeg command the extension matters + * (e.g. conversion to a JPEG fails if the extension is arbitrary). + * + * @returns The contents of the output file produced by the ffmpeg command + * (specified as {@link outputPathPlaceholder} in {@link command}). + */ + ffmpegExec: ( + command: FFmpegCommand, + dataOrPathOrZipItem: Uint8Array | string | ZipItem, + outputFileExtension: string, + ) => Promise; +} /** * The shape of the object exposed by the Node.js utility process listening on diff --git a/web/packages/gallery/services/ffmpeg/index.ts b/web/packages/gallery/services/ffmpeg/index.ts index 1dd4f7f4b7..d2f6af4756 100644 --- a/web/packages/gallery/services/ffmpeg/index.ts +++ b/web/packages/gallery/services/ffmpeg/index.ts @@ -1,6 +1,6 @@ import { ensureElectron } from "ente-base/electron"; import log from "ente-base/log"; -import type { Electron } from "ente-base/types/ipc"; +import type { Electron, ElectronFFmpegWorker } from "ente-base/types/ipc"; import { toDataOrPathOrZipEntry, type FileSystemUploadItem, @@ -16,6 +16,7 @@ import { type ParsedMetadata, } from "ente-media/file-metadata"; import { settingsSnapshot } from "ente-new/photos/services/settings"; +import { createUtilityProcess } from "../../utils/native-worker"; import { ffmpegPathPlaceholder, inputPathPlaceholder, @@ -23,6 +24,21 @@ import { } from "./constants"; import { ffmpegExecWeb } from "./web"; +let _electronFFmpegWorker: Promise | undefined; + +/** + * Handle to the on-demand lazily created utility process in the Node.js layer + * that exposes an {@link ElectronFFmpegWorker} interface. + */ +const electronFFmpegWorker = () => + (_electronFFmpegWorker ??= createElectronFFmpegWorker()); + +const createElectronFFmpegWorker = () => + createUtilityProcess( + ensureElectron(), + "ffmpeg", + ) as unknown as Promise; + /** * Generate a thumbnail for the given video using a Wasm FFmpeg running in a web * worker. @@ -76,14 +92,15 @@ const _generateVideoThumbnail = async ( * See also {@link generateVideoThumbnailNative}. */ export const generateVideoThumbnailNative = async ( - electron: Electron, fsUploadItem: FileSystemUploadItem, ) => - _generateVideoThumbnail((seekTime: number) => - electron.ffmpegExec( - makeGenThumbnailCommand(seekTime), - toDataOrPathOrZipEntry(fsUploadItem), - "jpeg", + electronFFmpegWorker().then((electronFW) => + _generateVideoThumbnail((seekTime: number) => + electronFW.ffmpegExec( + makeGenThumbnailCommand(seekTime), + toDataOrPathOrZipEntry(fsUploadItem), + "jpeg", + ), ), ); @@ -144,11 +161,9 @@ export const extractVideoMetadata = async ( return parseFFmpegExtractedMetadata( uploadItem instanceof File ? await ffmpegExecWeb(command, uploadItem, "txt") - : await ensureElectron().ffmpegExec( - command, - toDataOrPathOrZipEntry(uploadItem), - "txt", - ), + : await ( + await electronFFmpegWorker() + ).ffmpegExec(command, toDataOrPathOrZipEntry(uploadItem), "txt"), ); }; diff --git a/web/packages/gallery/services/upload/thumbnail.ts b/web/packages/gallery/services/upload/thumbnail.ts index 49c8b78135..6cb9696de4 100644 --- a/web/packages/gallery/services/upload/thumbnail.ts +++ b/web/packages/gallery/services/upload/thumbnail.ts @@ -200,7 +200,7 @@ export const generateThumbnailNative = async ( maxThumbnailDimension, maxThumbnailSize, ) - : ffmpeg.generateVideoThumbnailNative(electron, fsUploadItem); + : ffmpeg.generateVideoThumbnailNative(fsUploadItem); /** * A fallback, black, thumbnail for use in cases where thumbnail generation From adf1379b9e9de0b5e09da15683d0cc3ac0f8588b Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Wed, 7 May 2025 10:44:43 +0530 Subject: [PATCH 02/13] node 1 --- desktop/src/main/ipc.ts | 12 --- .../services/{ffmpeg.ts => ffmpeg-worker.ts} | 39 ++++++-- desktop/src/main/services/workers.ts | 95 +++++++++++++++++-- desktop/src/main/stream.ts | 14 +-- desktop/src/types/ipc.ts | 2 +- web/packages/gallery/services/ffmpeg/index.ts | 9 +- web/packages/gallery/services/video.ts | 4 +- web/packages/gallery/utils/native-stream.ts | 26 +++-- 8 files changed, 152 insertions(+), 49 deletions(-) rename desktop/src/main/services/{ffmpeg.ts => ffmpeg-worker.ts} (96%) diff --git a/desktop/src/main/ipc.ts b/desktop/src/main/ipc.ts index b74b19ec6c..74f7821dcf 100644 --- a/desktop/src/main/ipc.ts +++ b/desktop/src/main/ipc.ts @@ -13,7 +13,6 @@ import type { BrowserWindow } from "electron"; import { ipcMain } from "electron/main"; import type { CollectionMapping, - FFmpegCommand, FolderWatch, PendingUploads, UtilityProcessType, @@ -32,7 +31,6 @@ import { openLogDirectory, selectDirectory, } from "./services/dir"; -import { ffmpegExec } from "./services/ffmpeg"; import { fsExists, fsFindFiles, @@ -188,16 +186,6 @@ export const attachIPCHandlers = () => { ) => generateImageThumbnail(dataOrPathOrZipItem, maxDimension, maxSize), ); - ipcMain.handle( - "ffmpegExec", - ( - _, - command: FFmpegCommand, - dataOrPathOrZipItem: Uint8Array | string | ZipItem, - outputFileExtension: string, - ) => ffmpegExec(command, dataOrPathOrZipItem, outputFileExtension), - ); - // - Upload ipcMain.handle("listZipItems", (_, zipPath: string) => diff --git a/desktop/src/main/services/ffmpeg.ts b/desktop/src/main/services/ffmpeg-worker.ts similarity index 96% rename from desktop/src/main/services/ffmpeg.ts rename to desktop/src/main/services/ffmpeg-worker.ts index b5587b6ba1..ab9f6910be 100644 --- a/desktop/src/main/services/ffmpeg.ts +++ b/desktop/src/main/services/ffmpeg-worker.ts @@ -1,9 +1,17 @@ +/** + * @file ffmpeg invocations. This code runs in a utility process. + */ + +// See [Note: Using Electron APIs in UtilityProcess] about what we can and +// cannot import. +import { expose } from "comlink"; import pathToFfmpeg from "ffmpeg-static"; import { randomBytes } from "node:crypto"; import fs from "node:fs/promises"; import path, { basename } from "node:path"; import type { FFmpegCommand, ZipItem } from "../../types/ipc"; -import log from "../log"; +import log from "../log-worker"; +import { messagePortMainEndpoint } from "../utils/comlink"; import { execAsync } from "../utils/electron"; import { deleteTempFileIgnoringErrors, @@ -16,6 +24,21 @@ const ffmpegPathPlaceholder = "FFMPEG"; const inputPathPlaceholder = "INPUT"; const outputPathPlaceholder = "OUTPUT"; +log.debugString("Started ffmpeg utility process"); + +process.parentPort.once("message", (e) => { + // Expose an instance of `ElectronFFmpegWorker & ElectronFFmpegWorkerNode` + // on the port we got from our parent. + expose( + { + ffmpegExec, + ffmpegConvertToMP4, + ffmpegGenerateHLSPlaylistAndSegments, + }, + messagePortMainEndpoint(e.ports[0]!), + ); +}); + /** * Run a FFmpeg command * @@ -43,7 +66,7 @@ const outputPathPlaceholder = "OUTPUT"; * * But I'm not sure if our code is supposed to be able to use it, and how. */ -export const ffmpegExec = async ( +const ffmpegExec = async ( command: FFmpegCommand, dataOrPathOrZipItem: Uint8Array | string | ZipItem, outputFileExtension: string, @@ -63,7 +86,9 @@ export const ffmpegExec = async ( resolvedCommand = command; } else { const isHDR = await isHDRVideo(inputFilePath); - log.debug(() => [basename(inputFilePath), { isHDR }]); + log.debugString( + `${basename(inputFilePath)} ${JSON.stringify({ isHDR })}`, + ); resolvedCommand = isHDR ? command.hdr : command.default; } @@ -123,7 +148,7 @@ const ffmpegBinaryPath = () => { * @param outputFilePath The path to a file on the user's local file system where * we should write the converted MP4 video. */ -export const ffmpegConvertToMP4 = async ( +const ffmpegConvertToMP4 = async ( inputFilePath: string, outputFilePath: string, ): Promise => { @@ -179,14 +204,16 @@ export interface FFmpegGenerateHLSPlaylistAndSegmentsResult { * If the video is such that it doesn't require stream generation, then this * function returns `undefined`. */ -export const ffmpegGenerateHLSPlaylistAndSegments = async ( +const ffmpegGenerateHLSPlaylistAndSegments = async ( inputFilePath: string, outputPathPrefix: string, ): Promise => { const { isH264, isBT709, bitrate } = await detectVideoCharacteristics(inputFilePath); - log.debug(() => [basename(inputFilePath), { isH264, isBT709, bitrate }]); + log.debugString( + `${basename(inputFilePath)} ${JSON.stringify({ isH264, isBT709, bitrate })}`, + ); // If the video is smaller than 10 MB, and already H.264 (the codec we are // going to use for the conversion), then a streaming variant is not much diff --git a/desktop/src/main/services/workers.ts b/desktop/src/main/services/workers.ts index cb6923bc62..c9977f4ed1 100644 --- a/desktop/src/main/services/workers.ts +++ b/desktop/src/main/services/workers.ts @@ -12,12 +12,22 @@ import { app, utilityProcess } from "electron/main"; import path from "node:path"; import type { UtilityProcessType } from "../../types/ipc"; import log, { processUtilityProcessLogMessage } from "../log"; +import type { FFmpegGenerateHLSPlaylistAndSegmentsResult } from "./ffmpeg-worker"; /** The active ML utility process, if any. */ -let _child: UtilityProcess | undefined; +let _childML: UtilityProcess | undefined; + +/** The active ffmpeg utility process, if any. */ +let _childFFmpeg: UtilityProcess | undefined; /** - * Create a new ML utility process, terminating the older ones (if any). + * Create a new utility process of the given {@link type}, terminating the older + * ones (if any). + * + * The following note explains the reasoning why utility processes were used for + * the first workload (ML) that was handled this way. Similar reasoning applies + * to subsequent workloads (ffmpeg) that have been offloaded to utility + * processes to avoid stutter in the UI. * * [Note: ML IPC] * @@ -77,18 +87,20 @@ export const triggerCreateUtilityProcess = ( window: BrowserWindow, ) => { switch (type) { - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition case "ml": triggerCreateMLUtilityProcess(window); break; + case "ffmpeg": + triggerCreateFFmpegUtilityProcess(window); + break; } }; export const triggerCreateMLUtilityProcess = (window: BrowserWindow) => { - if (_child) { + if (_childML) { log.debug(() => "Terminating previous ML utility process"); - _child.kill(); - _child = undefined; + _childML.kill(); + _childML = undefined; } const { port1, port2 } = new MessageChannelMain(); @@ -99,9 +111,9 @@ export const triggerCreateMLUtilityProcess = (window: BrowserWindow) => { window.webContents.postMessage("utilityProcessPort/ml", undefined, [port2]); - handleMessagesFromUtilityProcess(child); + handleMessagesFromMLUtilityProcess(child); - _child = child; + _childML = child; }; /** @@ -127,7 +139,7 @@ export const triggerCreateMLUtilityProcess = (window: BrowserWindow) => { * - When we need to communicate from the utility process to the main process, * we use the `parentPort` in the utility process. */ -const handleMessagesFromUtilityProcess = (child: UtilityProcess) => { +const handleMessagesFromMLUtilityProcess = (child: UtilityProcess) => { child.on("message", (m: unknown) => { if (processUtilityProcessLogMessage("[ml-worker]", m)) { return; @@ -135,3 +147,68 @@ const handleMessagesFromUtilityProcess = (child: UtilityProcess) => { log.info("Ignoring unknown message from ML utility process", m); }); }; + +export const triggerCreateFFmpegUtilityProcess = (window: BrowserWindow) => { + if (_childFFmpeg) { + log.debug(() => "Terminating previous ffmpeg utility process"); + _childFFmpeg.kill(); + _childFFmpeg = undefined; + } + + const { port1, port2 } = new MessageChannelMain(); + + const child = utilityProcess.fork(path.join(__dirname, "ffmpeg-worker.js")); + // TODO + const userDataPath = app.getPath("userData"); + child.postMessage({ userDataPath }, [port1]); + + window.webContents.postMessage("utilityProcessPort/ffmpeg", undefined, [ + port2, + ]); + + handleMessagesFromFFmpegUtilityProcess(child); + + _childFFmpeg = child; +}; + +const handleMessagesFromFFmpegUtilityProcess = (child: UtilityProcess) => { + child.on("message", (m: unknown) => { + if (processUtilityProcessLogMessage("[ffmpeg-worker]", m)) { + return; + } + log.info("Ignoring unknown message from ffmpeg utility process", m); + }); +}; + +/** + * The port exposed by _childFFmpeg (i.e., by the utility process running + * `ffmpeg-worker.ts`) provides an interface that conforms to + * {@link ElectronFFmpegWorker} (meant for use by the web layer), and in + * addition provides other "private" function meant for use by (the node layer). + * + * This interface lists the functions exposed for use by the node layer. + */ +export interface ElectronFFmpegWorkerNode { + ffmpegConvertToMP4: ( + inputFilePath: string, + outputFilePath: string, + ) => Promise; + + ffmpegGenerateHLSPlaylistAndSegments: ( + inputFilePath: string, + outputPathPrefix: string, + ) => Promise; +} + +/** + * Return a handle to the already running ffmpeg utility process. + * + * This assumes that the web layer has already initiated the utility process by + * invoking `triggerCreateUtilityProcess("ffmpeg")`, otherwise this function + * will throw. + */ +export const electronFFmpegWorkerNodeIfRunning = () => { + const child = _childFFmpeg; + if (!child) throw new Error("ffmpeg utility process has not been started"); + return child as unknown as ElectronFFmpegWorkerNode; +}; diff --git a/desktop/src/main/stream.ts b/desktop/src/main/stream.ts index 01d591042f..04332b5894 100644 --- a/desktop/src/main/stream.ts +++ b/desktop/src/main/stream.ts @@ -8,11 +8,8 @@ import fs from "node:fs/promises"; import { Readable, Writable } from "node:stream"; import { pathToFileURL } from "node:url"; import log from "./log"; -import { - ffmpegConvertToMP4, - ffmpegGenerateHLSPlaylistAndSegments, - type FFmpegGenerateHLSPlaylistAndSegmentsResult, -} from "./services/ffmpeg"; +import { type FFmpegGenerateHLSPlaylistAndSegmentsResult } from "./services/ffmpeg-worker"; +import { electronFFmpegWorkerNodeIfRunning } from "./services/workers"; import { markClosableZip, openZip } from "./services/zip"; import { wait } from "./utils/common"; import { writeStream } from "./utils/stream"; @@ -238,8 +235,9 @@ const handleConvertToMP4Write = async (request: Request) => { await writeStream(inputTempFilePath, request.body!); const outputTempFilePath = await makeTempFilePath("mp4"); + const worker = electronFFmpegWorkerNodeIfRunning(); try { - await ffmpegConvertToMP4(inputTempFilePath, outputTempFilePath); + await worker.ffmpegConvertToMP4(inputTempFilePath, outputTempFilePath); } catch (e) { log.error("Conversion to MP4 failed", e); await deleteTempFileIgnoringErrors(outputTempFilePath); @@ -311,6 +309,8 @@ const handleGenerateHLSWrite = async ( } } + const worker = electronFFmpegWorkerNodeIfRunning(); + const { path: inputFilePath, isFileTemporary: isInputFileTemporary, @@ -322,7 +322,7 @@ const handleGenerateHLSWrite = async ( try { await writeToTemporaryInputFile(); - result = await ffmpegGenerateHLSPlaylistAndSegments( + result = await worker.ffmpegGenerateHLSPlaylistAndSegments( inputFilePath, outputFilePathPrefix, ); diff --git a/desktop/src/types/ipc.ts b/desktop/src/types/ipc.ts index db1d009f8b..23f57af554 100644 --- a/desktop/src/types/ipc.ts +++ b/desktop/src/types/ipc.ts @@ -5,7 +5,7 @@ * See [Note: types.ts <-> preload.ts <-> ipc.ts] */ -export type UtilityProcessType = "ml"; +export type UtilityProcessType = "ml" | "ffmpeg"; export interface AppUpdate { autoUpdatable: boolean; diff --git a/web/packages/gallery/services/ffmpeg/index.ts b/web/packages/gallery/services/ffmpeg/index.ts index d2f6af4756..8833249555 100644 --- a/web/packages/gallery/services/ffmpeg/index.ts +++ b/web/packages/gallery/services/ffmpeg/index.ts @@ -30,7 +30,7 @@ let _electronFFmpegWorker: Promise | undefined; * Handle to the on-demand lazily created utility process in the Node.js layer * that exposes an {@link ElectronFFmpegWorker} interface. */ -const electronFFmpegWorker = () => +export const getElectronFFmpegWorker = () => (_electronFFmpegWorker ??= createElectronFFmpegWorker()); const createElectronFFmpegWorker = () => @@ -94,7 +94,7 @@ const _generateVideoThumbnail = async ( export const generateVideoThumbnailNative = async ( fsUploadItem: FileSystemUploadItem, ) => - electronFFmpegWorker().then((electronFW) => + getElectronFFmpegWorker().then((electronFW) => _generateVideoThumbnail((seekTime: number) => electronFW.ffmpegExec( makeGenThumbnailCommand(seekTime), @@ -162,7 +162,7 @@ export const extractVideoMetadata = async ( uploadItem instanceof File ? await ffmpegExecWeb(command, uploadItem, "txt") : await ( - await electronFFmpegWorker() + await getElectronFFmpegWorker() ).ffmpegExec(command, toDataOrPathOrZipEntry(uploadItem), "txt"), ); }; @@ -312,7 +312,8 @@ export const convertToMP4 = async (blob: Blob): Promise => { }; const convertToMP4Native = async (electron: Electron, blob: Blob) => { - const token = await initiateConvertToMP4(electron, blob); + const electronFFmpegWorker = await getElectronFFmpegWorker(); + const token = await initiateConvertToMP4(electronFFmpegWorker, blob); const mp4Blob = await readVideoStream(electron, token).then((res) => res.blob(), ); diff --git a/web/packages/gallery/services/video.ts b/web/packages/gallery/services/video.ts index 8dd553aeed..1a12ba63c5 100644 --- a/web/packages/gallery/services/video.ts +++ b/web/packages/gallery/services/video.ts @@ -23,6 +23,7 @@ import { videoStreamDone, } from "../utils/native-stream"; import { downloadManager } from "./download"; +import { getElectronFFmpegWorker } from "./ffmpeg"; import { fetchFileData, fetchFilePreviewData, @@ -553,6 +554,7 @@ const processQueueItem = async ({ timestampedUploadItem, }: VideoProcessingQueueItem) => { const electron = ensureElectron(); + const electronFFmpegWorker = await getElectronFFmpegWorker(); log.debug(() => ["gen-hls", { file, timestampedUploadItem }]); @@ -585,7 +587,7 @@ const processQueueItem = async ({ log.info(`Generate HLS for ${fileLogID(file)} | start`); const res = await initiateGenerateHLS( - electron, + electronFFmpegWorker, sourceVideo!, objectUploadURL, ); diff --git a/web/packages/gallery/utils/native-stream.ts b/web/packages/gallery/utils/native-stream.ts index 43e4523224..f2eff42107 100644 --- a/web/packages/gallery/utils/native-stream.ts +++ b/web/packages/gallery/utils/native-stream.ts @@ -6,7 +6,12 @@ * See: [Note: IPC streams]. */ -import type { Electron, ElectronMLWorker, ZipItem } from "ente-base/types/ipc"; +import type { + Electron, + ElectronFFmpegWorker, + ElectronMLWorker, + ZipItem, +} from "ente-base/types/ipc"; import { z } from "zod"; import type { FileSystemUploadItem } from "../services/upload"; @@ -124,8 +129,9 @@ export const writeStream = async ( * * This is a variant of {@link writeStream} tailored for the conversion to MP4. * - * @param _ An {@link Electron} instance, witness to the fact that we're running - * in the context of the desktop app. It is otherwise not used. + * @param _ An {@link ElectronFFmpegWorker} instance, witness to the fact that + * we're running in the context of the desktop app, and that an ffmpeg utility + * process has been initialized. It is otherwise not used. * * @param video A {@link Blob} containing the video to convert. * @@ -136,7 +142,7 @@ export const writeStream = async ( * See: [Note: Convert to MP4]. */ export const initiateConvertToMP4 = async ( - _: Electron, + _: ElectronFFmpegWorker, video: Blob, ): Promise => { const url = "stream://video?op=convert-to-mp4"; @@ -172,14 +178,16 @@ export type GenerateHLSResult = z.infer; * is similar to {@link initiateConvertToMP4}, but also supports streaming * {@link FileSystemUploadItem}s and {@link ReadableStream}s. * - * @param _ An {@link Electron} instance, witness to the fact that we're running - * in the context of the desktop app. It is otherwise not used. + * @param _ An {@link ElectronFFmpegWorker} instance, witness to the fact that + * we're running in the context of the desktop app, and that an ffmpeg utility + * process has been initialized. It is otherwise not used. * * @param video The video to convert. * * - If we're called during the upload process, then this will be set to the - * {@link FileSystemUploadItem} that was uploaded. This way, we can directly use - * the on-disk file instead of needing to download the original from remote. + * {@link FileSystemUploadItem} that was uploaded. This way, we can directly + * use the on-disk file instead of needing to download the original from + * remote. * * - Otherwise it should be a {@link ReadableStream} of the video contents. * @@ -197,7 +205,7 @@ export type GenerateHLSResult = z.infer; * See: [Note: Preview variant of videos]. */ export const initiateGenerateHLS = async ( - _: Electron, + _: ElectronFFmpegWorker, video: FileSystemUploadItem | ReadableStream, objectUploadURL: string, ): Promise => { From ae925a240e46e618b4c6d55037957f11eab53d78 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Wed, 7 May 2025 11:58:13 +0530 Subject: [PATCH 03/13] fix --- desktop/src/main/services/ffmpeg-worker.ts | 10 +++++----- desktop/src/main/utils/exec-worker.ts | 23 ++++++++++++++++++++++ 2 files changed, 28 insertions(+), 5 deletions(-) create mode 100644 desktop/src/main/utils/exec-worker.ts diff --git a/desktop/src/main/services/ffmpeg-worker.ts b/desktop/src/main/services/ffmpeg-worker.ts index ab9f6910be..d977d41853 100644 --- a/desktop/src/main/services/ffmpeg-worker.ts +++ b/desktop/src/main/services/ffmpeg-worker.ts @@ -12,7 +12,7 @@ import path, { basename } from "node:path"; import type { FFmpegCommand, ZipItem } from "../../types/ipc"; import log from "../log-worker"; import { messagePortMainEndpoint } from "../utils/comlink"; -import { execAsync } from "../utils/electron"; +import { execAsyncWorker } from "../utils/exec-worker"; import { deleteTempFileIgnoringErrors, makeFileForDataOrStreamOrPathOrZipItem, @@ -98,7 +98,7 @@ const ffmpegExec = async ( outputFilePath, ); - await execAsync(cmd); + await execAsyncWorker(cmd); return await fs.readFile(outputFilePath); } finally { @@ -163,7 +163,7 @@ const ffmpegConvertToMP4 = async ( const cmd = substitutePlaceholders(command, inputFilePath, outputFilePath); - await execAsync(cmd); + await execAsyncWorker(cmd); }; export interface FFmpegGenerateHLSPlaylistAndSegmentsResult { @@ -458,7 +458,7 @@ const ffmpegGenerateHLSPlaylistAndSegments = async ( // Run the ffmpeg command to generate the HLS playlist and segments. // // Note: Depending on the size of the input file, this may take long! - const { stderr: conversionStderr } = await execAsync(command); + const { stderr: conversionStderr } = await execAsyncWorker(command); // Determine the dimensions of the generated video from the stderr // output produced by ffmpeg during the conversion. @@ -699,7 +699,7 @@ const pseudoFFProbeVideo = async (inputFilePath: string) => { const cmd = substitutePlaceholders(command, inputFilePath, /* NA */ ""); - const { stderr } = await execAsync(cmd); + const { stderr } = await execAsyncWorker(cmd); return stderr; }; diff --git a/desktop/src/main/utils/exec-worker.ts b/desktop/src/main/utils/exec-worker.ts new file mode 100644 index 0000000000..02ac116a6d --- /dev/null +++ b/desktop/src/main/utils/exec-worker.ts @@ -0,0 +1,23 @@ +import shellescape from "any-shell-escape"; +import { exec } from "node:child_process"; +import { promisify } from "node:util"; +import log from "../log-worker"; + +/** + * Run a shell command asynchronously (utility process edition). + * + * This is an almost verbatim copy of {@link execAsync} from `electron.ts`, + * except it is meant to be usable from a utility process where only a subset of + * imports are available. See [Note: Using Electron APIs in UtilityProcess]. + */ +export const execAsyncWorker = async (command: string | string[]) => { + const escapedCommand = Array.isArray(command) + ? shellescape(command) + : command; + const startTime = Date.now(); + const result = await execAsync_(escapedCommand); + log.debugString(`${escapedCommand} (${Date.now() - startTime} ms)`); + return result; +}; + +const execAsync_ = promisify(exec); From bd8fc08b7cea966942a4e9eaff4b2a0d54876f1a Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Wed, 7 May 2025 12:36:55 +0530 Subject: [PATCH 04/13] rework 1 temp files will need to be handled on main process --- desktop/src/main/services/ffmpeg-worker.ts | 30 ++++- desktop/src/main/services/ffmpeg.ts | 14 +++ desktop/src/main/services/workers.ts | 118 +++++++----------- desktop/src/types/ipc.ts | 2 +- web/packages/base/types/ipc.ts | 85 ++++++------- web/packages/gallery/services/ffmpeg/index.ts | 42 ++----- .../gallery/services/upload/thumbnail.ts | 2 +- web/packages/gallery/services/video.ts | 4 +- web/packages/gallery/utils/native-stream.ts | 21 ++-- 9 files changed, 151 insertions(+), 167 deletions(-) create mode 100644 desktop/src/main/services/ffmpeg.ts diff --git a/desktop/src/main/services/ffmpeg-worker.ts b/desktop/src/main/services/ffmpeg-worker.ts index d977d41853..f19e0e9d86 100644 --- a/desktop/src/main/services/ffmpeg-worker.ts +++ b/desktop/src/main/services/ffmpeg-worker.ts @@ -24,17 +24,41 @@ const ffmpegPathPlaceholder = "FFMPEG"; const inputPathPlaceholder = "INPUT"; const outputPathPlaceholder = "OUTPUT"; +/** + * The interface of the object exposed by `ffmpeg-worker.ts` on the message port + * pair that the main process creates to communicate with it. + * + * @see {@link ffmpegUtilityProcessPort}. + */ +export interface FFmpegUtilityProcess { + ffmpegExec: ( + command: FFmpegCommand, + dataOrPathOrZipItem: Uint8Array | string | ZipItem, + outputFileExtension: string, + ) => Promise; + + ffmpegConvertToMP4: ( + inputFilePath: string, + outputFilePath: string, + ) => Promise; + + ffmpegGenerateHLSPlaylistAndSegments: ( + inputFilePath: string, + outputPathPrefix: string, + ) => Promise; +} + log.debugString("Started ffmpeg utility process"); process.parentPort.once("message", (e) => { - // Expose an instance of `ElectronFFmpegWorker & ElectronFFmpegWorkerNode` - // on the port we got from our parent. + // Expose an instance of `FFmpegUtilityProcess` on the port we got from our + // parent. expose( { ffmpegExec, ffmpegConvertToMP4, ffmpegGenerateHLSPlaylistAndSegments, - }, + } satisfies FFmpegUtilityProcess, messagePortMainEndpoint(e.ports[0]!), ); }); diff --git a/desktop/src/main/services/ffmpeg.ts b/desktop/src/main/services/ffmpeg.ts new file mode 100644 index 0000000000..3b66bb2913 --- /dev/null +++ b/desktop/src/main/services/ffmpeg.ts @@ -0,0 +1,14 @@ +/** + * @file A bridge to the ffmpeg utility process. This code runs in the main + * process. + */ + +import type { FFmpegUtilityProcess } from "./ffmpeg-worker"; +import { ffmpegUtilityProcessPort } from "./workers"; + +/** + * Return a handle to the ffmpeg utility process, starting it if needed. + */ +export const ffmpegUtilityProcess = () => { + return ffmpegUtilityProcessPort() as unknown as FFmpegUtilityProcess; +}; diff --git a/desktop/src/main/services/workers.ts b/desktop/src/main/services/workers.ts index c9977f4ed1..d10f5527b5 100644 --- a/desktop/src/main/services/workers.ts +++ b/desktop/src/main/services/workers.ts @@ -8,26 +8,28 @@ import { type BrowserWindow, type UtilityProcess, } from "electron"; -import { app, utilityProcess } from "electron/main"; +import { app, utilityProcess, type MessagePortMain } from "electron/main"; import path from "node:path"; import type { UtilityProcessType } from "../../types/ipc"; import log, { processUtilityProcessLogMessage } from "../log"; -import type { FFmpegGenerateHLSPlaylistAndSegmentsResult } from "./ffmpeg-worker"; /** The active ML utility process, if any. */ -let _childML: UtilityProcess | undefined; +let _utilityProcessML: UtilityProcess | undefined; -/** The active ffmpeg utility process, if any. */ -let _childFFmpeg: UtilityProcess | undefined; +/** + * A {@link MessagePort} that can be used to communicate with + * the active ffmpeg utility process (if any). + */ +let _utilityProcessFFmpegPort: MessagePortMain | undefined; /** * Create a new utility process of the given {@link type}, terminating the older * ones (if any). * - * The following note explains the reasoning why utility processes were used for - * the first workload (ML) that was handled this way. Similar reasoning applies - * to subsequent workloads (ffmpeg) that have been offloaded to utility - * processes to avoid stutter in the UI. + * Currently the only type is "ml". The following note explains the reasoning + * why utility processes were used for the first workload (ML) that was handled + * this way. Similar reasoning applies to subsequent workloads (ffmpeg) that + * have been offloaded to utility processes to avoid stutter in the UI. * * [Note: ML IPC] * @@ -85,22 +87,13 @@ let _childFFmpeg: UtilityProcess | undefined; export const triggerCreateUtilityProcess = ( type: UtilityProcessType, window: BrowserWindow, -) => { - switch (type) { - case "ml": - triggerCreateMLUtilityProcess(window); - break; - case "ffmpeg": - triggerCreateFFmpegUtilityProcess(window); - break; - } -}; +) => triggerCreateMLUtilityProcess(window); export const triggerCreateMLUtilityProcess = (window: BrowserWindow) => { - if (_childML) { + if (_utilityProcessML) { log.debug(() => "Terminating previous ML utility process"); - _childML.kill(); - _childML = undefined; + _utilityProcessML.kill(); + _utilityProcessML = undefined; } const { port1, port2 } = new MessageChannelMain(); @@ -113,7 +106,7 @@ export const triggerCreateMLUtilityProcess = (window: BrowserWindow) => { handleMessagesFromMLUtilityProcess(child); - _childML = child; + _utilityProcessML = child; }; /** @@ -148,27 +141,45 @@ const handleMessagesFromMLUtilityProcess = (child: UtilityProcess) => { }); }; -export const triggerCreateFFmpegUtilityProcess = (window: BrowserWindow) => { - if (_childFFmpeg) { - log.debug(() => "Terminating previous ffmpeg utility process"); - _childFFmpeg.kill(); - _childFFmpeg = undefined; - } +/** + * A port that can be used to communicate with the ffmpeg utility process. If + * there is no ffmpeg utility process, a new one is created on demand. + * + * See [Note: ML IPC] for a general outline of why utility processes are needed + * (tl;dr; to avoid stutter on the UI). + * + * In the case of ffmpeg, the IPC flow is a bit different: the utility process + * is not exposed to the web layer, and is internal to the node layer. The + * reason for this difference is that we need to create temporary files etc, and + * doing it a utility process requires access to the `app` module which are not + * accessible (See: [Note: Using Electron APIs in UtilityProcess]). + * + * There could've been possible reasonable workarounds, but the architecture + * we've adopted of three layers: + * + * Renderer (web) <-> Node.js main <-> Node.js ffmpeg utility process + * + * The temporary file creation etc is handled in the Node.js main process, and + * paths to the files are forwarded to the ffmpeg utility process to act on. + * + * @returns a port that can be used to communiate with the utility process. The + * utility process is expected to expose an object that conforms to the + * {@link ElectronFFmpegWorkerNode} interface on this port. + */ +export const ffmpegUtilityProcessPort = () => { + if (_utilityProcessFFmpegPort) return _utilityProcessFFmpegPort; const { port1, port2 } = new MessageChannelMain(); const child = utilityProcess.fork(path.join(__dirname, "ffmpeg-worker.js")); - // TODO - const userDataPath = app.getPath("userData"); - child.postMessage({ userDataPath }, [port1]); - - window.webContents.postMessage("utilityProcessPort/ffmpeg", undefined, [ - port2, - ]); + // Send a handle to the port + child.postMessage({}, [port1]); handleMessagesFromFFmpegUtilityProcess(child); - _childFFmpeg = child; + _utilityProcessFFmpegPort = port2; + + return _utilityProcessFFmpegPort; }; const handleMessagesFromFFmpegUtilityProcess = (child: UtilityProcess) => { @@ -179,36 +190,3 @@ const handleMessagesFromFFmpegUtilityProcess = (child: UtilityProcess) => { log.info("Ignoring unknown message from ffmpeg utility process", m); }); }; - -/** - * The port exposed by _childFFmpeg (i.e., by the utility process running - * `ffmpeg-worker.ts`) provides an interface that conforms to - * {@link ElectronFFmpegWorker} (meant for use by the web layer), and in - * addition provides other "private" function meant for use by (the node layer). - * - * This interface lists the functions exposed for use by the node layer. - */ -export interface ElectronFFmpegWorkerNode { - ffmpegConvertToMP4: ( - inputFilePath: string, - outputFilePath: string, - ) => Promise; - - ffmpegGenerateHLSPlaylistAndSegments: ( - inputFilePath: string, - outputPathPrefix: string, - ) => Promise; -} - -/** - * Return a handle to the already running ffmpeg utility process. - * - * This assumes that the web layer has already initiated the utility process by - * invoking `triggerCreateUtilityProcess("ffmpeg")`, otherwise this function - * will throw. - */ -export const electronFFmpegWorkerNodeIfRunning = () => { - const child = _childFFmpeg; - if (!child) throw new Error("ffmpeg utility process has not been started"); - return child as unknown as ElectronFFmpegWorkerNode; -}; diff --git a/desktop/src/types/ipc.ts b/desktop/src/types/ipc.ts index 23f57af554..db1d009f8b 100644 --- a/desktop/src/types/ipc.ts +++ b/desktop/src/types/ipc.ts @@ -5,7 +5,7 @@ * See [Note: types.ts <-> preload.ts <-> ipc.ts] */ -export type UtilityProcessType = "ml" | "ffmpeg"; +export type UtilityProcessType = "ml"; export interface AppUpdate { autoUpdatable: boolean; diff --git a/web/packages/base/types/ipc.ts b/web/packages/base/types/ipc.ts index b1c5ff57e8..d0c588bea3 100644 --- a/web/packages/base/types/ipc.ts +++ b/web/packages/base/types/ipc.ts @@ -334,6 +334,44 @@ export interface Electron { maxSize: number, ) => Promise; + // - FFmpeg + + /** + * Execute a FFmpeg {@link command} on the given + * {@link dataOrPathOrZipItem}. + * + * This executes the command using a FFmpeg executable we bundle with our + * desktop app. We also have a Wasm FFmpeg implementation that we use when + * running on the web, which has a sibling function with the same + * parameters. See [Note:FFmpeg in Electron]. + * + * @param command An array of strings, each representing one positional + * parameter in the command to execute. Placeholders for the input, output + * and ffmpeg's own path are replaced before executing the command + * (respectively {@link inputPathPlaceholder}, + * {@link outputPathPlaceholder}, {@link ffmpegPathPlaceholder}). + * + * @param dataOrPathOrZipItem The bytes of the input file, or the path to + * the input file on the user's local disk, or the path to a zip file on the + * user's disk and the name of an entry in it. In all three cases, the data + * gets serialized to a temporary file, and then that path gets substituted + * in the FFmpeg {@link command} in lieu of {@link inputPathPlaceholder}. + * + * @param outputFileExtension The extension (without the dot, e.g. "jpeg") + * to use for the output file that we ask FFmpeg to create in + * {@param command}. While this file will eventually get deleted, and we'll + * just return its contents, for some FFmpeg command the extension matters + * (e.g. conversion to a JPEG fails if the extension is arbitrary). + * + * @returns The contents of the output file produced by the ffmpeg command + * (specified as {@link outputPathPlaceholder} in {@link command}). + */ + ffmpegExec: ( + command: FFmpegCommand, + dataOrPathOrZipItem: Uint8Array | string | ZipItem, + outputFileExtension: string, + ) => Promise; + // - Utility process /** @@ -349,8 +387,6 @@ export interface Electron { * value of {@link type}. Thus, att the other end of that port will be an * object that conforms to: * - * - {@link ElectronFFmpegWorker} interface, when type is "ffmpeg". - * * - {@link ElectronMLWorker} interface, when type is "ml". * * For more details about the IPC flow, see: [Note: ML IPC]. @@ -562,50 +598,7 @@ export interface Electron { clearPendingUploads: () => Promise; } -export type UtilityProcessType = "ffmpeg" | "ml"; - -/** - * The shape of the object exposed by the Node.js utility process listening on - * the other side message port that the web layer obtains by doing - * {@link triggerCreateUtilityProcess} with type "ffmpeg". - */ -export interface ElectronFFmpegWorker { - /** - * Execute a FFmpeg {@link command} on the given - * {@link dataOrPathOrZipItem}. - * - * This executes the command using a FFmpeg executable we bundle with our - * desktop app. We also have a Wasm FFmpeg implementation that we use when - * running on the web, which has a sibling function with the same - * parameters. See [Note:FFmpeg in Electron]. - * - * @param command An array of strings, each representing one positional - * parameter in the command to execute. Placeholders for the input, output - * and ffmpeg's own path are replaced before executing the command - * (respectively {@link inputPathPlaceholder}, - * {@link outputPathPlaceholder}, {@link ffmpegPathPlaceholder}). - * - * @param dataOrPathOrZipItem The bytes of the input file, or the path to - * the input file on the user's local disk, or the path to a zip file on the - * user's disk and the name of an entry in it. In all three cases, the data - * gets serialized to a temporary file, and then that path gets substituted - * in the FFmpeg {@link command} in lieu of {@link inputPathPlaceholder}. - * - * @param outputFileExtension The extension (without the dot, e.g. "jpeg") - * to use for the output file that we ask FFmpeg to create in - * {@param command}. While this file will eventually get deleted, and we'll - * just return its contents, for some FFmpeg command the extension matters - * (e.g. conversion to a JPEG fails if the extension is arbitrary). - * - * @returns The contents of the output file produced by the ffmpeg command - * (specified as {@link outputPathPlaceholder} in {@link command}). - */ - ffmpegExec: ( - command: FFmpegCommand, - dataOrPathOrZipItem: Uint8Array | string | ZipItem, - outputFileExtension: string, - ) => Promise; -} +export type UtilityProcessType = "ml"; /** * The shape of the object exposed by the Node.js utility process listening on diff --git a/web/packages/gallery/services/ffmpeg/index.ts b/web/packages/gallery/services/ffmpeg/index.ts index 8833249555..1dd4f7f4b7 100644 --- a/web/packages/gallery/services/ffmpeg/index.ts +++ b/web/packages/gallery/services/ffmpeg/index.ts @@ -1,6 +1,6 @@ import { ensureElectron } from "ente-base/electron"; import log from "ente-base/log"; -import type { Electron, ElectronFFmpegWorker } from "ente-base/types/ipc"; +import type { Electron } from "ente-base/types/ipc"; import { toDataOrPathOrZipEntry, type FileSystemUploadItem, @@ -16,7 +16,6 @@ import { type ParsedMetadata, } from "ente-media/file-metadata"; import { settingsSnapshot } from "ente-new/photos/services/settings"; -import { createUtilityProcess } from "../../utils/native-worker"; import { ffmpegPathPlaceholder, inputPathPlaceholder, @@ -24,21 +23,6 @@ import { } from "./constants"; import { ffmpegExecWeb } from "./web"; -let _electronFFmpegWorker: Promise | undefined; - -/** - * Handle to the on-demand lazily created utility process in the Node.js layer - * that exposes an {@link ElectronFFmpegWorker} interface. - */ -export const getElectronFFmpegWorker = () => - (_electronFFmpegWorker ??= createElectronFFmpegWorker()); - -const createElectronFFmpegWorker = () => - createUtilityProcess( - ensureElectron(), - "ffmpeg", - ) as unknown as Promise; - /** * Generate a thumbnail for the given video using a Wasm FFmpeg running in a web * worker. @@ -92,15 +76,14 @@ const _generateVideoThumbnail = async ( * See also {@link generateVideoThumbnailNative}. */ export const generateVideoThumbnailNative = async ( + electron: Electron, fsUploadItem: FileSystemUploadItem, ) => - getElectronFFmpegWorker().then((electronFW) => - _generateVideoThumbnail((seekTime: number) => - electronFW.ffmpegExec( - makeGenThumbnailCommand(seekTime), - toDataOrPathOrZipEntry(fsUploadItem), - "jpeg", - ), + _generateVideoThumbnail((seekTime: number) => + electron.ffmpegExec( + makeGenThumbnailCommand(seekTime), + toDataOrPathOrZipEntry(fsUploadItem), + "jpeg", ), ); @@ -161,9 +144,11 @@ export const extractVideoMetadata = async ( return parseFFmpegExtractedMetadata( uploadItem instanceof File ? await ffmpegExecWeb(command, uploadItem, "txt") - : await ( - await getElectronFFmpegWorker() - ).ffmpegExec(command, toDataOrPathOrZipEntry(uploadItem), "txt"), + : await ensureElectron().ffmpegExec( + command, + toDataOrPathOrZipEntry(uploadItem), + "txt", + ), ); }; @@ -312,8 +297,7 @@ export const convertToMP4 = async (blob: Blob): Promise => { }; const convertToMP4Native = async (electron: Electron, blob: Blob) => { - const electronFFmpegWorker = await getElectronFFmpegWorker(); - const token = await initiateConvertToMP4(electronFFmpegWorker, blob); + const token = await initiateConvertToMP4(electron, blob); const mp4Blob = await readVideoStream(electron, token).then((res) => res.blob(), ); diff --git a/web/packages/gallery/services/upload/thumbnail.ts b/web/packages/gallery/services/upload/thumbnail.ts index 6cb9696de4..49c8b78135 100644 --- a/web/packages/gallery/services/upload/thumbnail.ts +++ b/web/packages/gallery/services/upload/thumbnail.ts @@ -200,7 +200,7 @@ export const generateThumbnailNative = async ( maxThumbnailDimension, maxThumbnailSize, ) - : ffmpeg.generateVideoThumbnailNative(fsUploadItem); + : ffmpeg.generateVideoThumbnailNative(electron, fsUploadItem); /** * A fallback, black, thumbnail for use in cases where thumbnail generation diff --git a/web/packages/gallery/services/video.ts b/web/packages/gallery/services/video.ts index 1a12ba63c5..8dd553aeed 100644 --- a/web/packages/gallery/services/video.ts +++ b/web/packages/gallery/services/video.ts @@ -23,7 +23,6 @@ import { videoStreamDone, } from "../utils/native-stream"; import { downloadManager } from "./download"; -import { getElectronFFmpegWorker } from "./ffmpeg"; import { fetchFileData, fetchFilePreviewData, @@ -554,7 +553,6 @@ const processQueueItem = async ({ timestampedUploadItem, }: VideoProcessingQueueItem) => { const electron = ensureElectron(); - const electronFFmpegWorker = await getElectronFFmpegWorker(); log.debug(() => ["gen-hls", { file, timestampedUploadItem }]); @@ -587,7 +585,7 @@ const processQueueItem = async ({ log.info(`Generate HLS for ${fileLogID(file)} | start`); const res = await initiateGenerateHLS( - electronFFmpegWorker, + electron, sourceVideo!, objectUploadURL, ); diff --git a/web/packages/gallery/utils/native-stream.ts b/web/packages/gallery/utils/native-stream.ts index f2eff42107..41d39064ef 100644 --- a/web/packages/gallery/utils/native-stream.ts +++ b/web/packages/gallery/utils/native-stream.ts @@ -6,12 +6,7 @@ * See: [Note: IPC streams]. */ -import type { - Electron, - ElectronFFmpegWorker, - ElectronMLWorker, - ZipItem, -} from "ente-base/types/ipc"; +import type { Electron, ElectronMLWorker, ZipItem } from "ente-base/types/ipc"; import { z } from "zod"; import type { FileSystemUploadItem } from "../services/upload"; @@ -129,9 +124,8 @@ export const writeStream = async ( * * This is a variant of {@link writeStream} tailored for the conversion to MP4. * - * @param _ An {@link ElectronFFmpegWorker} instance, witness to the fact that - * we're running in the context of the desktop app, and that an ffmpeg utility - * process has been initialized. It is otherwise not used. + * @param _ An {@link Electron} instance, witness to the fact that we're running + * in the context of the desktop app. It is otherwise not used. * * @param video A {@link Blob} containing the video to convert. * @@ -142,7 +136,7 @@ export const writeStream = async ( * See: [Note: Convert to MP4]. */ export const initiateConvertToMP4 = async ( - _: ElectronFFmpegWorker, + _: Electron, video: Blob, ): Promise => { const url = "stream://video?op=convert-to-mp4"; @@ -178,9 +172,8 @@ export type GenerateHLSResult = z.infer; * is similar to {@link initiateConvertToMP4}, but also supports streaming * {@link FileSystemUploadItem}s and {@link ReadableStream}s. * - * @param _ An {@link ElectronFFmpegWorker} instance, witness to the fact that - * we're running in the context of the desktop app, and that an ffmpeg utility - * process has been initialized. It is otherwise not used. + * @param _ An {@link Electron} instance, witness to the fact that we're running + * in the context of the desktop app. It is otherwise not used. * * @param video The video to convert. * @@ -205,7 +198,7 @@ export type GenerateHLSResult = z.infer; * See: [Note: Preview variant of videos]. */ export const initiateGenerateHLS = async ( - _: ElectronFFmpegWorker, + _: Electron, video: FileSystemUploadItem | ReadableStream, objectUploadURL: string, ): Promise => { From 3513b51477dab5909ceb48e03df4e54e2f0cab13 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Wed, 7 May 2025 13:23:01 +0530 Subject: [PATCH 05/13] trampoline --- desktop/src/main/ipc.ts | 12 +++ desktop/src/main/services/ffmpeg-worker.ts | 93 ++++++++++------------ desktop/src/main/services/ffmpeg.ts | 36 +++++++++ desktop/src/main/stream.ts | 6 +- 4 files changed, 94 insertions(+), 53 deletions(-) diff --git a/desktop/src/main/ipc.ts b/desktop/src/main/ipc.ts index 74f7821dcf..b74b19ec6c 100644 --- a/desktop/src/main/ipc.ts +++ b/desktop/src/main/ipc.ts @@ -13,6 +13,7 @@ import type { BrowserWindow } from "electron"; import { ipcMain } from "electron/main"; import type { CollectionMapping, + FFmpegCommand, FolderWatch, PendingUploads, UtilityProcessType, @@ -31,6 +32,7 @@ import { openLogDirectory, selectDirectory, } from "./services/dir"; +import { ffmpegExec } from "./services/ffmpeg"; import { fsExists, fsFindFiles, @@ -186,6 +188,16 @@ export const attachIPCHandlers = () => { ) => generateImageThumbnail(dataOrPathOrZipItem, maxDimension, maxSize), ); + ipcMain.handle( + "ffmpegExec", + ( + _, + command: FFmpegCommand, + dataOrPathOrZipItem: Uint8Array | string | ZipItem, + outputFileExtension: string, + ) => ffmpegExec(command, dataOrPathOrZipItem, outputFileExtension), + ); + // - Upload ipcMain.handle("listZipItems", (_, zipPath: string) => diff --git a/desktop/src/main/services/ffmpeg-worker.ts b/desktop/src/main/services/ffmpeg-worker.ts index f19e0e9d86..e3f99f230d 100644 --- a/desktop/src/main/services/ffmpeg-worker.ts +++ b/desktop/src/main/services/ffmpeg-worker.ts @@ -9,15 +9,10 @@ import pathToFfmpeg from "ffmpeg-static"; import { randomBytes } from "node:crypto"; import fs from "node:fs/promises"; import path, { basename } from "node:path"; -import type { FFmpegCommand, ZipItem } from "../../types/ipc"; +import type { FFmpegCommand } from "../../types/ipc"; import log from "../log-worker"; import { messagePortMainEndpoint } from "../utils/comlink"; import { execAsyncWorker } from "../utils/exec-worker"; -import { - deleteTempFileIgnoringErrors, - makeFileForDataOrStreamOrPathOrZipItem, - makeTempFilePath, -} from "../utils/temp"; /* Ditto in the web app's code (used by the Wasm FFmpeg invocation). */ const ffmpegPathPlaceholder = "FFMPEG"; @@ -33,9 +28,9 @@ const outputPathPlaceholder = "OUTPUT"; export interface FFmpegUtilityProcess { ffmpegExec: ( command: FFmpegCommand, - dataOrPathOrZipItem: Uint8Array | string | ZipItem, - outputFileExtension: string, - ) => Promise; + inputFilePath: string, + outputFilePath: string, + ) => Promise; ffmpegConvertToMP4: ( inputFilePath: string, @@ -92,44 +87,27 @@ process.parentPort.once("message", (e) => { */ const ffmpegExec = async ( command: FFmpegCommand, - dataOrPathOrZipItem: Uint8Array | string | ZipItem, - outputFileExtension: string, -): Promise => { - const { - path: inputFilePath, - isFileTemporary: isInputFileTemporary, - writeToTemporaryFile: writeToTemporaryInputFile, - } = await makeFileForDataOrStreamOrPathOrZipItem(dataOrPathOrZipItem); - - const outputFilePath = await makeTempFilePath(outputFileExtension); - try { - await writeToTemporaryInputFile(); - - let resolvedCommand: string[]; - if (Array.isArray(command)) { - resolvedCommand = command; - } else { - const isHDR = await isHDRVideo(inputFilePath); - log.debugString( - `${basename(inputFilePath)} ${JSON.stringify({ isHDR })}`, - ); - resolvedCommand = isHDR ? command.hdr : command.default; - } - - const cmd = substitutePlaceholders( - resolvedCommand, - inputFilePath, - outputFilePath, + inputFilePath: string, + outputFilePath: string, +): Promise => { + let resolvedCommand: string[]; + if (Array.isArray(command)) { + resolvedCommand = command; + } else { + const isHDR = await isHDRVideo(inputFilePath); + log.debugString( + `${basename(inputFilePath)} ${JSON.stringify({ isHDR })}`, ); - - await execAsyncWorker(cmd); - - return await fs.readFile(outputFilePath); - } finally { - if (isInputFileTemporary) - await deleteTempFileIgnoringErrors(inputFilePath); - await deleteTempFileIgnoringErrors(outputFilePath); + resolvedCommand = isHDR ? command.hdr : command.default; } + + const cmd = substitutePlaceholders( + resolvedCommand, + inputFilePath, + outputFilePath, + ); + + await execAsyncWorker(cmd); }; const substitutePlaceholders = ( @@ -494,22 +472,37 @@ const ffmpegGenerateHLSPlaylistAndSegments = async ( } catch (e) { log.error("HLS generation failed", e); await Promise.all([ - deleteTempFileIgnoringErrors(playlistPath), - deleteTempFileIgnoringErrors(videoPath), + deletePathIgnoringErrors(playlistPath), + deletePathIgnoringErrors(videoPath), ]); throw e; } finally { await Promise.all([ - deleteTempFileIgnoringErrors(keyInfoPath), - deleteTempFileIgnoringErrors(keyPath), + deletePathIgnoringErrors(keyInfoPath), + deletePathIgnoringErrors(keyPath), // ffmpeg writes a /path/output.ts.tmp, clear it out too. - deleteTempFileIgnoringErrors(videoPath + ".tmp"), + deletePathIgnoringErrors(videoPath + ".tmp"), ]); } return { playlistPath, videoPath, dimensions, videoSize }; }; +/** + * A variant of {@link deletePathIgnoringErrors} (which we can't directly use in + * the utility process). It unconditionally removes the item at the provided + * path; in particular, this will not raise any errors if there is no item at + * the given path (as may be expected to happen when we run during catch + * handlers). + */ +const deletePathIgnoringErrors = async (tempFilePath: string) => { + try { + await fs.rm(tempFilePath, { force: true }); + } catch (e) { + log.error(`Could not delete item at path ${tempFilePath}`, e); + } +}; + /** * A regex that matches the first line of the form * diff --git a/desktop/src/main/services/ffmpeg.ts b/desktop/src/main/services/ffmpeg.ts index 3b66bb2913..4567d7b704 100644 --- a/desktop/src/main/services/ffmpeg.ts +++ b/desktop/src/main/services/ffmpeg.ts @@ -3,6 +3,13 @@ * process. */ +import fs from "node:fs/promises"; +import type { FFmpegCommand, ZipItem } from "../../types/ipc"; +import { + deleteTempFileIgnoringErrors, + makeFileForDataOrStreamOrPathOrZipItem, + makeTempFilePath, +} from "../utils/temp"; import type { FFmpegUtilityProcess } from "./ffmpeg-worker"; import { ffmpegUtilityProcessPort } from "./workers"; @@ -12,3 +19,32 @@ import { ffmpegUtilityProcessPort } from "./workers"; export const ffmpegUtilityProcess = () => { return ffmpegUtilityProcessPort() as unknown as FFmpegUtilityProcess; }; + +export const ffmpegExec = async ( + command: FFmpegCommand, + dataOrPathOrZipItem: Uint8Array | string | ZipItem, + outputFileExtension: string, +): Promise => { + const { + path: inputFilePath, + isFileTemporary: isInputFileTemporary, + writeToTemporaryFile: writeToTemporaryInputFile, + } = await makeFileForDataOrStreamOrPathOrZipItem(dataOrPathOrZipItem); + + const outputFilePath = await makeTempFilePath(outputFileExtension); + try { + await writeToTemporaryInputFile(); + + await ffmpegUtilityProcess().ffmpegExec( + command, + inputFilePath, + outputFilePath, + ); + + return await fs.readFile(outputFilePath); + } finally { + if (isInputFileTemporary) + await deleteTempFileIgnoringErrors(inputFilePath); + await deleteTempFileIgnoringErrors(outputFilePath); + } +}; diff --git a/desktop/src/main/stream.ts b/desktop/src/main/stream.ts index 04332b5894..0f0a37bd93 100644 --- a/desktop/src/main/stream.ts +++ b/desktop/src/main/stream.ts @@ -8,8 +8,8 @@ import fs from "node:fs/promises"; import { Readable, Writable } from "node:stream"; import { pathToFileURL } from "node:url"; import log from "./log"; +import { ffmpegUtilityProcess } from "./services/ffmpeg"; import { type FFmpegGenerateHLSPlaylistAndSegmentsResult } from "./services/ffmpeg-worker"; -import { electronFFmpegWorkerNodeIfRunning } from "./services/workers"; import { markClosableZip, openZip } from "./services/zip"; import { wait } from "./utils/common"; import { writeStream } from "./utils/stream"; @@ -235,7 +235,7 @@ const handleConvertToMP4Write = async (request: Request) => { await writeStream(inputTempFilePath, request.body!); const outputTempFilePath = await makeTempFilePath("mp4"); - const worker = electronFFmpegWorkerNodeIfRunning(); + const worker = ffmpegUtilityProcess(); try { await worker.ffmpegConvertToMP4(inputTempFilePath, outputTempFilePath); } catch (e) { @@ -309,7 +309,7 @@ const handleGenerateHLSWrite = async ( } } - const worker = electronFFmpegWorkerNodeIfRunning(); + const worker = ffmpegUtilityProcess(); const { path: inputFilePath, From f47a6f7b423e2f2914bd4486601e77945e79b36c Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Wed, 7 May 2025 13:43:46 +0530 Subject: [PATCH 06/13] rendezvous --- desktop/src/main/services/ffmpeg-worker.ts | 7 ++++++ desktop/src/main/services/ffmpeg.ts | 10 +++----- desktop/src/main/services/workers.ts | 29 ++++++++++++++-------- desktop/src/main/stream.ts | 5 ++-- 4 files changed, 32 insertions(+), 19 deletions(-) diff --git a/desktop/src/main/services/ffmpeg-worker.ts b/desktop/src/main/services/ffmpeg-worker.ts index e3f99f230d..1bf3069add 100644 --- a/desktop/src/main/services/ffmpeg-worker.ts +++ b/desktop/src/main/services/ffmpeg-worker.ts @@ -56,8 +56,15 @@ process.parentPort.once("message", (e) => { } satisfies FFmpegUtilityProcess, messagePortMainEndpoint(e.ports[0]!), ); + mainProcess("ack", undefined); }); +/** + * Send a message to the main process using a barebones RPC protocol. + */ +const mainProcess = (method: string, param: unknown) => + process.parentPort.postMessage({ method, p: param }); + /** * Run a FFmpeg command * diff --git a/desktop/src/main/services/ffmpeg.ts b/desktop/src/main/services/ffmpeg.ts index 4567d7b704..49f13a5a53 100644 --- a/desktop/src/main/services/ffmpeg.ts +++ b/desktop/src/main/services/ffmpeg.ts @@ -17,7 +17,7 @@ import { ffmpegUtilityProcessPort } from "./workers"; * Return a handle to the ffmpeg utility process, starting it if needed. */ export const ffmpegUtilityProcess = () => { - return ffmpegUtilityProcessPort() as unknown as FFmpegUtilityProcess; + return ffmpegUtilityProcessPort() as unknown as Promise; }; export const ffmpegExec = async ( @@ -25,6 +25,8 @@ export const ffmpegExec = async ( dataOrPathOrZipItem: Uint8Array | string | ZipItem, outputFileExtension: string, ): Promise => { + const worker = await ffmpegUtilityProcess(); + const { path: inputFilePath, isFileTemporary: isInputFileTemporary, @@ -35,11 +37,7 @@ export const ffmpegExec = async ( try { await writeToTemporaryInputFile(); - await ffmpegUtilityProcess().ffmpegExec( - command, - inputFilePath, - outputFilePath, - ); + await worker.ffmpegExec(command, inputFilePath, outputFilePath); return await fs.readFile(outputFilePath); } finally { diff --git a/desktop/src/main/services/workers.ts b/desktop/src/main/services/workers.ts index d10f5527b5..6c109ca401 100644 --- a/desktop/src/main/services/workers.ts +++ b/desktop/src/main/services/workers.ts @@ -17,10 +17,10 @@ import log, { processUtilityProcessLogMessage } from "../log"; let _utilityProcessML: UtilityProcess | undefined; /** - * A {@link MessagePort} that can be used to communicate with - * the active ffmpeg utility process (if any). + * A promise to a {@link MessagePort} that can be used to communicate with the + * active ffmpeg utility process (if any). */ -let _utilityProcessFFmpegPort: MessagePortMain | undefined; +let _utilityProcessFFmpegPort: Promise | undefined; /** * Create a new utility process of the given {@link type}, terminating the older @@ -175,18 +175,25 @@ export const ffmpegUtilityProcessPort = () => { // Send a handle to the port child.postMessage({}, [port1]); - handleMessagesFromFFmpegUtilityProcess(child); - - _utilityProcessFFmpegPort = port2; - - return _utilityProcessFFmpegPort; -}; - -const handleMessagesFromFFmpegUtilityProcess = (child: UtilityProcess) => { child.on("message", (m: unknown) => { + if (m && typeof m == "object" && "method" in m) { + switch (m.method) { + case "ack": + resolvePortPromise!(port2); + return; + } + } + if (processUtilityProcessLogMessage("[ffmpeg-worker]", m)) { return; } + log.info("Ignoring unknown message from ffmpeg utility process", m); }); + + let resolvePortPromise: ((port: MessagePortMain) => void) | undefined; + _utilityProcessFFmpegPort = new Promise((r) => (resolvePortPromise = r)); + + // Resolve with the other end of the port we sent to the utility process. + return _utilityProcessFFmpegPort; }; diff --git a/desktop/src/main/stream.ts b/desktop/src/main/stream.ts index 0f0a37bd93..f66946bc7c 100644 --- a/desktop/src/main/stream.ts +++ b/desktop/src/main/stream.ts @@ -231,11 +231,12 @@ export const clearPendingVideoResults = () => pendingVideoResults.clear(); * See also: [Note: IPC streams] */ const handleConvertToMP4Write = async (request: Request) => { + const worker = await ffmpegUtilityProcess(); + const inputTempFilePath = await makeTempFilePath(); await writeStream(inputTempFilePath, request.body!); const outputTempFilePath = await makeTempFilePath("mp4"); - const worker = ffmpegUtilityProcess(); try { await worker.ffmpegConvertToMP4(inputTempFilePath, outputTempFilePath); } catch (e) { @@ -309,7 +310,7 @@ const handleGenerateHLSWrite = async ( } } - const worker = ffmpegUtilityProcess(); + const worker = await ffmpegUtilityProcess(); const { path: inputFilePath, From 2f670e316b78f85b8118f444828ac42321d53a2e Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Wed, 7 May 2025 13:53:51 +0530 Subject: [PATCH 07/13] conv --- desktop/src/main/services/ffmpeg.ts | 10 +++++--- desktop/src/main/services/workers.ts | 37 +++++++++++++++------------- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/desktop/src/main/services/ffmpeg.ts b/desktop/src/main/services/ffmpeg.ts index 49f13a5a53..89218cdf37 100644 --- a/desktop/src/main/services/ffmpeg.ts +++ b/desktop/src/main/services/ffmpeg.ts @@ -3,6 +3,7 @@ * process. */ +import { wrap } from "comlink"; import fs from "node:fs/promises"; import type { FFmpegCommand, ZipItem } from "../../types/ipc"; import { @@ -11,14 +12,15 @@ import { makeTempFilePath, } from "../utils/temp"; import type { FFmpegUtilityProcess } from "./ffmpeg-worker"; -import { ffmpegUtilityProcessPort } from "./workers"; +import { ffmpegUtilityProcessEndpoint } from "./workers"; /** * Return a handle to the ffmpeg utility process, starting it if needed. */ -export const ffmpegUtilityProcess = () => { - return ffmpegUtilityProcessPort() as unknown as Promise; -}; +export const ffmpegUtilityProcess = () => + ffmpegUtilityProcessEndpoint().then((port) => + wrap(port), + ); export const ffmpegExec = async ( command: FFmpegCommand, diff --git a/desktop/src/main/services/workers.ts b/desktop/src/main/services/workers.ts index 6c109ca401..ff8d1736c6 100644 --- a/desktop/src/main/services/workers.ts +++ b/desktop/src/main/services/workers.ts @@ -3,24 +3,26 @@ * utility processes that we create. */ +import type { Endpoint } from "comlink"; import { MessageChannelMain, type BrowserWindow, type UtilityProcess, } from "electron"; -import { app, utilityProcess, type MessagePortMain } from "electron/main"; +import { app, utilityProcess } from "electron/main"; import path from "node:path"; import type { UtilityProcessType } from "../../types/ipc"; import log, { processUtilityProcessLogMessage } from "../log"; +import { messagePortMainEndpoint } from "../utils/comlink"; /** The active ML utility process, if any. */ let _utilityProcessML: UtilityProcess | undefined; /** - * A promise to a {@link MessagePort} that can be used to communicate with the - * active ffmpeg utility process (if any). + * A promise to a comlink {@link Endpoint} that can be used to communicate with + * the active ffmpeg utility process (if any). */ -let _utilityProcessFFmpegPort: Promise | undefined; +let _utilityProcessFFmpegEndpoint: Promise | undefined; /** * Create a new utility process of the given {@link type}, terminating the older @@ -142,8 +144,9 @@ const handleMessagesFromMLUtilityProcess = (child: UtilityProcess) => { }; /** - * A port that can be used to communicate with the ffmpeg utility process. If - * there is no ffmpeg utility process, a new one is created on demand. + * A comlink endpoint that can be used to communicate with the ffmpeg utility + * process. If there is no ffmpeg utility process, a new one is created on + * demand. * * See [Note: ML IPC] for a general outline of why utility processes are needed * (tl;dr; to avoid stutter on the UI). @@ -162,24 +165,24 @@ const handleMessagesFromMLUtilityProcess = (child: UtilityProcess) => { * The temporary file creation etc is handled in the Node.js main process, and * paths to the files are forwarded to the ffmpeg utility process to act on. * - * @returns a port that can be used to communiate with the utility process. The - * utility process is expected to expose an object that conforms to the - * {@link ElectronFFmpegWorkerNode} interface on this port. + * @returns an endpoint that can be used to communicate with the utility + * process. The utility process is expected to expose an object that conforms to + * the {@link ElectronFFmpegWorkerNode} interface on this port. */ -export const ffmpegUtilityProcessPort = () => { - if (_utilityProcessFFmpegPort) return _utilityProcessFFmpegPort; +export const ffmpegUtilityProcessEndpoint = () => { + if (_utilityProcessFFmpegEndpoint) return _utilityProcessFFmpegEndpoint; const { port1, port2 } = new MessageChannelMain(); const child = utilityProcess.fork(path.join(__dirname, "ffmpeg-worker.js")); - // Send a handle to the port + // Send a handle to the port (one end of the message channel). child.postMessage({}, [port1]); child.on("message", (m: unknown) => { if (m && typeof m == "object" && "method" in m) { switch (m.method) { case "ack": - resolvePortPromise!(port2); + resolveEndpoint!(messagePortMainEndpoint(port2)); return; } } @@ -191,9 +194,9 @@ export const ffmpegUtilityProcessPort = () => { log.info("Ignoring unknown message from ffmpeg utility process", m); }); - let resolvePortPromise: ((port: MessagePortMain) => void) | undefined; - _utilityProcessFFmpegPort = new Promise((r) => (resolvePortPromise = r)); + let resolveEndpoint: ((port: Endpoint) => void) | undefined; + _utilityProcessFFmpegEndpoint = new Promise((r) => (resolveEndpoint = r)); - // Resolve with the other end of the port we sent to the utility process. - return _utilityProcessFFmpegPort; + // Resolve with the other end of the message channel. + return _utilityProcessFFmpegEndpoint; }; From c2c7ac8b2399c5db73ae22205281d47f7c95fbb3 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Wed, 7 May 2025 16:43:45 +0530 Subject: [PATCH 08/13] Tweak --- desktop/src/main/services/ffmpeg-worker.ts | 2 +- desktop/src/main/services/ffmpeg.ts | 5 ++++ desktop/src/main/services/workers.ts | 27 +++++++++++++--------- web/packages/base/types/ipc.ts | 2 -- 4 files changed, 22 insertions(+), 14 deletions(-) diff --git a/desktop/src/main/services/ffmpeg-worker.ts b/desktop/src/main/services/ffmpeg-worker.ts index 1bf3069add..a6dba0c7a2 100644 --- a/desktop/src/main/services/ffmpeg-worker.ts +++ b/desktop/src/main/services/ffmpeg-worker.ts @@ -23,7 +23,7 @@ const outputPathPlaceholder = "OUTPUT"; * The interface of the object exposed by `ffmpeg-worker.ts` on the message port * pair that the main process creates to communicate with it. * - * @see {@link ffmpegUtilityProcessPort}. + * @see {@link ffmpegUtilityProcessEndpoint}. */ export interface FFmpegUtilityProcess { ffmpegExec: ( diff --git a/desktop/src/main/services/ffmpeg.ts b/desktop/src/main/services/ffmpeg.ts index 89218cdf37..1b0e623faa 100644 --- a/desktop/src/main/services/ffmpeg.ts +++ b/desktop/src/main/services/ffmpeg.ts @@ -22,6 +22,11 @@ export const ffmpegUtilityProcess = () => wrap(port), ); +/** + * Implement the IPC "ffmpegExec" contract, writing the input and output to + * temporary files as needed, and then forward to the {@link ffmpegExec} running + * in the utility process. + */ export const ffmpegExec = async ( command: FFmpegCommand, dataOrPathOrZipItem: Uint8Array | string | ZipItem, diff --git a/desktop/src/main/services/workers.ts b/desktop/src/main/services/workers.ts index ff8d1736c6..e44b72cd3b 100644 --- a/desktop/src/main/services/workers.ts +++ b/desktop/src/main/services/workers.ts @@ -31,7 +31,8 @@ let _utilityProcessFFmpegEndpoint: Promise | undefined; * Currently the only type is "ml". The following note explains the reasoning * why utility processes were used for the first workload (ML) that was handled * this way. Similar reasoning applies to subsequent workloads (ffmpeg) that - * have been offloaded to utility processes to avoid stutter in the UI. + * have been offloaded to utility processes in a slightly different manner to + * avoid stutter in the UI. * * [Note: ML IPC] * @@ -167,22 +168,28 @@ const handleMessagesFromMLUtilityProcess = (child: UtilityProcess) => { * * @returns an endpoint that can be used to communicate with the utility * process. The utility process is expected to expose an object that conforms to - * the {@link ElectronFFmpegWorkerNode} interface on this port. + * the {@link ElectronFFmpegWorkerNode} interface on this endpoint. */ -export const ffmpegUtilityProcessEndpoint = () => { - if (_utilityProcessFFmpegEndpoint) return _utilityProcessFFmpegEndpoint; +export const ffmpegUtilityProcessEndpoint = () => + (_utilityProcessFFmpegEndpoint ??= createFFmpegUtilityProcessEndpoint()); + +const createFFmpegUtilityProcessEndpoint = () => { + // Promise.withResolvers is currently in the node available to us. + let resolve: ((endpoint: Endpoint) => void) | undefined; + const promise = new Promise((r) => (resolve = r)); const { port1, port2 } = new MessageChannelMain(); const child = utilityProcess.fork(path.join(__dirname, "ffmpeg-worker.js")); - // Send a handle to the port (one end of the message channel). + // Send a handle to the port (one end of the message channel) to the utility + // process. The utility process will reply with an "ack" when it get it. child.postMessage({}, [port1]); child.on("message", (m: unknown) => { if (m && typeof m == "object" && "method" in m) { switch (m.method) { case "ack": - resolveEndpoint!(messagePortMainEndpoint(port2)); + resolve!(messagePortMainEndpoint(port2)); return; } } @@ -194,9 +201,7 @@ export const ffmpegUtilityProcessEndpoint = () => { log.info("Ignoring unknown message from ffmpeg utility process", m); }); - let resolveEndpoint: ((port: Endpoint) => void) | undefined; - _utilityProcessFFmpegEndpoint = new Promise((r) => (resolveEndpoint = r)); - - // Resolve with the other end of the message channel. - return _utilityProcessFFmpegEndpoint; + // Resolve with the other end of the message channel (once we get an "ack" + // from the utility process). + return promise; }; diff --git a/web/packages/base/types/ipc.ts b/web/packages/base/types/ipc.ts index d0c588bea3..083cc81037 100644 --- a/web/packages/base/types/ipc.ts +++ b/web/packages/base/types/ipc.ts @@ -334,8 +334,6 @@ export interface Electron { maxSize: number, ) => Promise; - // - FFmpeg - /** * Execute a FFmpeg {@link command} on the given * {@link dataOrPathOrZipItem}. From 6730c0c682b31f75e0a80011237acfe4b08ddb41 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Wed, 7 May 2025 17:48:44 +0530 Subject: [PATCH 09/13] take 1 - fix sporadic unhandled exception With extra logging, this is it: adapter postMessage { id: '12c1c688f6f7b4-4e5fd44d3b9bc-f746fb2a0beda-1b0c26213b6237', type: 'RELEASE' } undefined [error] Unhandled rejection: TypeError: transferables must be an array of MessagePorts TypeError: transferables must be an array of MessagePorts at MessagePortMain.postMessage (node:electron/js2c/browser_init:2:111057) at Object.postMessage (...desktop/app/main/utils/comlink.js:21:16) at .../desktop/node_modules/comlink/dist/umd/comlink.js:353:16 at new Promise () at requestResponseMessage (../desktop/node_modules/comlink/dist/umd/comlink.js:347:16) at releaseEndpoint (.../desktop/node_modules/comlink/dist/umd/comlink.js:199:16) at proxyFinalizers (.../desktop/node_modules/comlink/dist/umd/comlink.js:211:17) --- desktop/src/main/utils/comlink.ts | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/desktop/src/main/utils/comlink.ts b/desktop/src/main/utils/comlink.ts index d2006e795b..7d3752d013 100644 --- a/desktop/src/main/utils/comlink.ts +++ b/desktop/src/main/utils/comlink.ts @@ -19,7 +19,14 @@ export const messagePortMainEndpoint = (mp: MessagePortMain): Endpoint => { const listeners = new WeakMap(); return { postMessage: (message, transfer) => { - mp.postMessage(message, transfer as unknown as MessagePortMain[]); + if (transfer) { + mp.postMessage( + message, + transfer as unknown as MessagePortMain[], + ); + } else { + mp.postMessage(message); + } }, addEventListener: (_, eh) => { const l: EL = (data) => From d35f898b70f1088438564a50efd0279623be9296 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Wed, 7 May 2025 18:10:02 +0530 Subject: [PATCH 10/13] better way of dealing with undefs Electron postMessage wants an empty array and balks at undefined --- desktop/src/main/utils/comlink.ts | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/desktop/src/main/utils/comlink.ts b/desktop/src/main/utils/comlink.ts index 7d3752d013..f0edd758af 100644 --- a/desktop/src/main/utils/comlink.ts +++ b/desktop/src/main/utils/comlink.ts @@ -19,14 +19,7 @@ export const messagePortMainEndpoint = (mp: MessagePortMain): Endpoint => { const listeners = new WeakMap(); return { postMessage: (message, transfer) => { - if (transfer) { - mp.postMessage( - message, - transfer as unknown as MessagePortMain[], - ); - } else { - mp.postMessage(message); - } + mp.postMessage(message, (transfer ?? []) as MessagePortMain[]); }, addEventListener: (_, eh) => { const l: EL = (data) => From 18442e25fc15a770d22692f2cf224fd579ef7e69 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Thu, 8 May 2025 14:53:05 +0530 Subject: [PATCH 11/13] Move upload to utility process Attempt to further lighten the load on the main thread --- desktop/src/main/services/ffmpeg-worker.ts | 109 +++++++++++++++++-- desktop/src/main/stream.ts | 117 ++------------------- 2 files changed, 112 insertions(+), 114 deletions(-) diff --git a/desktop/src/main/services/ffmpeg-worker.ts b/desktop/src/main/services/ffmpeg-worker.ts index a6dba0c7a2..5e1084d2ed 100644 --- a/desktop/src/main/services/ffmpeg-worker.ts +++ b/desktop/src/main/services/ffmpeg-worker.ts @@ -7,11 +7,14 @@ import { expose } from "comlink"; import pathToFfmpeg from "ffmpeg-static"; import { randomBytes } from "node:crypto"; +import fs_ from "node:fs"; import fs from "node:fs/promises"; import path, { basename } from "node:path"; +import { Readable } from "node:stream"; import type { FFmpegCommand } from "../../types/ipc"; import log from "../log-worker"; import { messagePortMainEndpoint } from "../utils/comlink"; +import { wait } from "../utils/common"; import { execAsyncWorker } from "../utils/exec-worker"; /* Ditto in the web app's code (used by the Wasm FFmpeg invocation). */ @@ -40,6 +43,7 @@ export interface FFmpegUtilityProcess { ffmpegGenerateHLSPlaylistAndSegments: ( inputFilePath: string, outputPathPrefix: string, + outputUploadURL: string, ) => Promise; } @@ -177,7 +181,6 @@ const ffmpegConvertToMP4 = async ( export interface FFmpegGenerateHLSPlaylistAndSegmentsResult { playlistPath: string; - videoPath: string; dimensions: { width: number; height: number }; videoSize: number; } @@ -216,6 +219,7 @@ export interface FFmpegGenerateHLSPlaylistAndSegmentsResult { const ffmpegGenerateHLSPlaylistAndSegments = async ( inputFilePath: string, outputPathPrefix: string, + outputUploadURL: string, ): Promise => { const { isH264, isBT709, bitrate } = await detectVideoCharacteristics(inputFilePath); @@ -476,23 +480,23 @@ const ffmpegGenerateHLSPlaylistAndSegments = async ( // Find the size of the generated video segments by reading the size of // the generated .ts file. videoSize = await fs.stat(videoPath).then((st) => st.size); + + await uploadVideoSegments(videoPath, videoSize, outputUploadURL); } catch (e) { log.error("HLS generation failed", e); - await Promise.all([ - deletePathIgnoringErrors(playlistPath), - deletePathIgnoringErrors(videoPath), - ]); + await Promise.all([deletePathIgnoringErrors(playlistPath)]); throw e; } finally { await Promise.all([ deletePathIgnoringErrors(keyInfoPath), deletePathIgnoringErrors(keyPath), + deletePathIgnoringErrors(videoPath), // ffmpeg writes a /path/output.ts.tmp, clear it out too. deletePathIgnoringErrors(videoPath + ".tmp"), ]); } - return { playlistPath, videoPath, dimensions, videoSize }; + return { playlistPath, dimensions, videoSize }; }; /** @@ -727,3 +731,96 @@ const pseudoFFProbeVideo = async (inputFilePath: string) => { return stderr; }; + +/** + * Upload the file at the given {@link videoFilePath} to the provided presigned + * {@link objectUploadURL} using a HTTP PUT request. + * + * In case on non-HTTP-4xx errors, retry up to 3 times with exponential backoff. + * + * See: [Note: Upload HLS video segment from node side]. + * + * @param videoFilePath The path to the file on the user's file system to + * upload. + * + * @param videoSize The size in bytes of the file at {@link videoFilePath}. + * + * @param objectUploadURL A pre-signed URL to upload the file. + * + * --- + * + * This is an inlined but bespoke reimplementation of `retryEnsuringHTTPOkOr4xx` + * from `web/packages/base/http.ts` + * + * - We don't have the rest of the scaffolding used by that function, which is + * why it is intially inlined bespoked. + * + * - It handles the specific use case of uploading videos since generating the + * HLS stream is a fairly expensive operation, so a retry to discount + * transient network issues is called for. There are only 2 retries for a + * total of 3 attempts, and the retry gaps are more spaced out. + * + * - Later it was discovered that net.fetch is much slower than node's native + * fetch, so this implementation has further diverged. + * + * - This also moved to a utility process, where we also have a more restricted + * ability to import electron API. + */ +const uploadVideoSegments = async ( + videoFilePath: string, + videoSize: number, + objectUploadURL: string, +) => { + const waitTimeBeforeNextTry = [5000, 20000]; + + while (true) { + let abort = false; + try { + const nodeStream = fs_.createReadStream(videoFilePath); + const webStream = Readable.toWeb(nodeStream); + + // net.fetch is 40-50x slower than the native fetch for this + // particular PUT request. This is easily reproducible - replace + // `fetch` with `net.fetch`, then even on localhost the PUT requests + // start taking a minute or so, while they take second(s) with + // node's native fetch. + const res = await fetch(objectUploadURL, { + method: "PUT", + // net.fetch apparently deduces and inserts a content-length, + // because when we use the node native fetch then we need to + // provide it explicitly. + headers: { "Content-Length": `${videoSize}` }, + // The duplex option is required since we're passing a stream. + // + // @ts-expect-error TypeScript's libdom.d.ts does not include + // the "duplex" parameter, e.g. see + // https://github.com/node-fetch/node-fetch/issues/1769. + duplex: "half", + body: webStream, + }); + + if (res.ok) { + // Success. + return; + } + if (res.status >= 400 && res.status < 500) { + // HTTP 4xx. + abort = true; + } + throw new Error( + `Failed to upload generated HLS video: HTTP ${res.status} ${res.statusText}`, + ); + } catch (e) { + if (abort) { + throw e; + } + const t = waitTimeBeforeNextTry.shift(); + if (!t) { + throw e; + } else { + log.warn("Will retry potentially transient request failure", e); + } + await wait(t); + } + } +}; diff --git a/desktop/src/main/stream.ts b/desktop/src/main/stream.ts index f66946bc7c..85d572870f 100644 --- a/desktop/src/main/stream.ts +++ b/desktop/src/main/stream.ts @@ -3,15 +3,13 @@ */ import { net, protocol } from "electron/main"; import { randomUUID } from "node:crypto"; -import fs_ from "node:fs"; import fs from "node:fs/promises"; -import { Readable, Writable } from "node:stream"; +import { Writable } from "node:stream"; import { pathToFileURL } from "node:url"; import log from "./log"; import { ffmpegUtilityProcess } from "./services/ffmpeg"; import { type FFmpegGenerateHLSPlaylistAndSegmentsResult } from "./services/ffmpeg-worker"; import { markClosableZip, openZip } from "./services/zip"; -import { wait } from "./utils/common"; import { writeStream } from "./utils/stream"; import { deleteTempFile, @@ -326,6 +324,7 @@ const handleGenerateHLSWrite = async ( result = await worker.ffmpegGenerateHLSPlaylistAndSegments( inputFilePath, outputFilePathPrefix, + objectUploadURL, ); if (!result) { @@ -333,115 +332,17 @@ const handleGenerateHLSWrite = async ( return new Response(null, { status: 204 }); } - const { playlistPath, videoPath, videoSize, dimensions } = result; - try { - await uploadVideoSegments(videoPath, videoSize, objectUploadURL); + const { playlistPath, videoSize, dimensions } = result; - const playlistToken = randomUUID(); - pendingVideoResults.set(playlistToken, playlistPath); + const playlistToken = randomUUID(); + pendingVideoResults.set(playlistToken, playlistPath); - return new Response( - JSON.stringify({ playlistToken, dimensions, videoSize }), - { status: 200 }, - ); - } catch (e) { - await deleteTempFileIgnoringErrors(playlistPath); - throw e; - } finally { - await deleteTempFileIgnoringErrors(videoPath); - } + return new Response( + JSON.stringify({ playlistToken, videoSize, dimensions }), + { status: 200 }, + ); } finally { if (isInputFileTemporary) await deleteTempFileIgnoringErrors(inputFilePath); } }; - -/** - * Upload the file at the given {@link videoFilePath} to the provided presigned - * {@link objectUploadURL} using a HTTP PUT request. - * - * In case on non-HTTP-4xx errors, retry up to 3 times with exponential backoff. - * - * See: [Note: Upload HLS video segment from node side]. - * - * @param videoFilePath The path to the file on the user's file system to - * upload. - * - * @param videoSize The size in bytes of the file at {@link videoFilePath}. - * - * @param objectUploadURL A pre-signed URL to upload the file. - * - * --- - * - * This is an inlined but bespoke reimplementation of `retryEnsuringHTTPOkOr4xx` - * from `web/packages/base/http.ts` - * - * - We don't have the rest of the scaffolding used by that function, which is - * why it is intially inlined bespoked. - * - * - It handles the specific use case of uploading videos since generating the - * HLS stream is a fairly expensive operation, so a retry to discount - * transient network issues is called for. There are only 2 retries for a - * total of 3 attempts, and the retry gaps are more spaced out. - * - * - Later it was discovered that net.fetch is much slower than node's native - * fetch, so this implementation has further diverged. - */ -export const uploadVideoSegments = async ( - videoFilePath: string, - videoSize: number, - objectUploadURL: string, -) => { - const waitTimeBeforeNextTry = [5000, 20000]; - - while (true) { - let abort = false; - try { - const nodeStream = fs_.createReadStream(videoFilePath); - const webStream = Readable.toWeb(nodeStream); - - // net.fetch is 40-50x slower than the native fetch for this - // particular PUT request. This is easily reproducible (replace - // `fetch` with `net.fetch`, then even on localhost the PUT requests - // start taking a minute or so; with node's native fetch, it is - // second(s)). - const res = await fetch(objectUploadURL, { - method: "PUT", - // net.fetch apparently deduces and inserts a content-length, - // because when we use the node native fetch then we need to - // provide it explicitly. - headers: { "Content-Length": `${videoSize}` }, - // The duplex option is required since we're passing a stream. - // - // @ts-expect-error TypeScript's libdom.d.ts does not include - // the "duplex" parameter, e.g. see - // https://github.com/node-fetch/node-fetch/issues/1769. - duplex: "half", - body: webStream, - }); - - if (res.ok) { - // Success. - return; - } - if (res.status >= 400 && res.status < 500) { - // HTTP 4xx. - abort = true; - } - throw new Error( - `Failed to upload generated HLS video: HTTP ${res.status} ${res.statusText}`, - ); - } catch (e) { - if (abort) { - throw e; - } - const t = waitTimeBeforeNextTry.shift(); - if (!t) { - throw e; - } else { - log.warn("Will retry potentially transient request failure", e); - } - await wait(t); - } - } -}; From 44199093f0395f89e6fa1a1a0acc0ab617d109ff Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Thu, 8 May 2025 15:02:07 +0530 Subject: [PATCH 12/13] Enable --- web/packages/gallery/services/ffmpeg/index.ts | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/web/packages/gallery/services/ffmpeg/index.ts b/web/packages/gallery/services/ffmpeg/index.ts index 1dd4f7f4b7..a182ee1400 100644 --- a/web/packages/gallery/services/ffmpeg/index.ts +++ b/web/packages/gallery/services/ffmpeg/index.ts @@ -15,7 +15,6 @@ import { parseMetadataDate, type ParsedMetadata, } from "ente-media/file-metadata"; -import { settingsSnapshot } from "ente-new/photos/services/settings"; import { ffmpegPathPlaceholder, inputPathPlaceholder, @@ -38,14 +37,7 @@ import { ffmpegExecWeb } from "./web"; */ export const generateVideoThumbnailWeb = async (blob: Blob) => _generateVideoThumbnail((seekTime: number) => - ffmpegExecWeb( - // TODO(HLS): Enable for all - settingsSnapshot().isInternalUser - ? makeGenThumbnailCommand(seekTime) - : _makeGenThumbnailCommand(seekTime, false), - blob, - "jpeg", - ), + ffmpegExecWeb(makeGenThumbnailCommand(seekTime), blob, "jpeg"), ); const _generateVideoThumbnail = async ( From d53281500b9d0d04df12cd578262024431367818 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Thu, 8 May 2025 15:05:56 +0530 Subject: [PATCH 13/13] prune --- desktop/src/main/services/ffmpeg-worker.ts | 9 ++------- web/packages/gallery/services/ffmpeg/web.ts | 1 - 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/desktop/src/main/services/ffmpeg-worker.ts b/desktop/src/main/services/ffmpeg-worker.ts index 5e1084d2ed..f6af11358c 100644 --- a/desktop/src/main/services/ffmpeg-worker.ts +++ b/desktop/src/main/services/ffmpeg-worker.ts @@ -9,7 +9,7 @@ import pathToFfmpeg from "ffmpeg-static"; import { randomBytes } from "node:crypto"; import fs_ from "node:fs"; import fs from "node:fs/promises"; -import path, { basename } from "node:path"; +import path from "node:path"; import { Readable } from "node:stream"; import type { FFmpegCommand } from "../../types/ipc"; import log from "../log-worker"; @@ -106,9 +106,6 @@ const ffmpegExec = async ( resolvedCommand = command; } else { const isHDR = await isHDRVideo(inputFilePath); - log.debugString( - `${basename(inputFilePath)} ${JSON.stringify({ isHDR })}`, - ); resolvedCommand = isHDR ? command.hdr : command.default; } @@ -224,9 +221,7 @@ const ffmpegGenerateHLSPlaylistAndSegments = async ( const { isH264, isBT709, bitrate } = await detectVideoCharacteristics(inputFilePath); - log.debugString( - `${basename(inputFilePath)} ${JSON.stringify({ isH264, isBT709, bitrate })}`, - ); + log.debugString(JSON.stringify({ isH264, isBT709, bitrate })); // If the video is smaller than 10 MB, and already H.264 (the codec we are // going to use for the conversion), then a streaming variant is not much diff --git a/web/packages/gallery/services/ffmpeg/web.ts b/web/packages/gallery/services/ffmpeg/web.ts index 4c63762b0a..09152e6e46 100644 --- a/web/packages/gallery/services/ffmpeg/web.ts +++ b/web/packages/gallery/services/ffmpeg/web.ts @@ -94,7 +94,6 @@ const ffmpegExec = async ( resolvedCommand = command; } else { const isHDR = await isHDRVideo(ffmpeg, inputPath); - log.debug(() => `[wasm] input file is ${isHDR ? "" : "not "}HDR`); resolvedCommand = isHDR ? command.hdr : command.default; }