From 18442e25fc15a770d22692f2cf224fd579ef7e69 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Thu, 8 May 2025 14:53:05 +0530 Subject: [PATCH] Move upload to utility process Attempt to further lighten the load on the main thread --- desktop/src/main/services/ffmpeg-worker.ts | 109 +++++++++++++++++-- desktop/src/main/stream.ts | 117 ++------------------- 2 files changed, 112 insertions(+), 114 deletions(-) diff --git a/desktop/src/main/services/ffmpeg-worker.ts b/desktop/src/main/services/ffmpeg-worker.ts index a6dba0c7a2..5e1084d2ed 100644 --- a/desktop/src/main/services/ffmpeg-worker.ts +++ b/desktop/src/main/services/ffmpeg-worker.ts @@ -7,11 +7,14 @@ import { expose } from "comlink"; import pathToFfmpeg from "ffmpeg-static"; import { randomBytes } from "node:crypto"; +import fs_ from "node:fs"; import fs from "node:fs/promises"; import path, { basename } from "node:path"; +import { Readable } from "node:stream"; import type { FFmpegCommand } from "../../types/ipc"; import log from "../log-worker"; import { messagePortMainEndpoint } from "../utils/comlink"; +import { wait } from "../utils/common"; import { execAsyncWorker } from "../utils/exec-worker"; /* Ditto in the web app's code (used by the Wasm FFmpeg invocation). */ @@ -40,6 +43,7 @@ export interface FFmpegUtilityProcess { ffmpegGenerateHLSPlaylistAndSegments: ( inputFilePath: string, outputPathPrefix: string, + outputUploadURL: string, ) => Promise; } @@ -177,7 +181,6 @@ const ffmpegConvertToMP4 = async ( export interface FFmpegGenerateHLSPlaylistAndSegmentsResult { playlistPath: string; - videoPath: string; dimensions: { width: number; height: number }; videoSize: number; } @@ -216,6 +219,7 @@ export interface FFmpegGenerateHLSPlaylistAndSegmentsResult { const ffmpegGenerateHLSPlaylistAndSegments = async ( inputFilePath: string, outputPathPrefix: string, + outputUploadURL: string, ): Promise => { const { isH264, isBT709, bitrate } = await detectVideoCharacteristics(inputFilePath); @@ -476,23 +480,23 @@ const ffmpegGenerateHLSPlaylistAndSegments = async ( // Find the size of the generated video segments by reading the size of // the generated .ts file. videoSize = await fs.stat(videoPath).then((st) => st.size); + + await uploadVideoSegments(videoPath, videoSize, outputUploadURL); } catch (e) { log.error("HLS generation failed", e); - await Promise.all([ - deletePathIgnoringErrors(playlistPath), - deletePathIgnoringErrors(videoPath), - ]); + await Promise.all([deletePathIgnoringErrors(playlistPath)]); throw e; } finally { await Promise.all([ deletePathIgnoringErrors(keyInfoPath), deletePathIgnoringErrors(keyPath), + deletePathIgnoringErrors(videoPath), // ffmpeg writes a /path/output.ts.tmp, clear it out too. deletePathIgnoringErrors(videoPath + ".tmp"), ]); } - return { playlistPath, videoPath, dimensions, videoSize }; + return { playlistPath, dimensions, videoSize }; }; /** @@ -727,3 +731,96 @@ const pseudoFFProbeVideo = async (inputFilePath: string) => { return stderr; }; + +/** + * Upload the file at the given {@link videoFilePath} to the provided presigned + * {@link objectUploadURL} using a HTTP PUT request. + * + * In case on non-HTTP-4xx errors, retry up to 3 times with exponential backoff. + * + * See: [Note: Upload HLS video segment from node side]. + * + * @param videoFilePath The path to the file on the user's file system to + * upload. + * + * @param videoSize The size in bytes of the file at {@link videoFilePath}. + * + * @param objectUploadURL A pre-signed URL to upload the file. + * + * --- + * + * This is an inlined but bespoke reimplementation of `retryEnsuringHTTPOkOr4xx` + * from `web/packages/base/http.ts` + * + * - We don't have the rest of the scaffolding used by that function, which is + * why it is intially inlined bespoked. + * + * - It handles the specific use case of uploading videos since generating the + * HLS stream is a fairly expensive operation, so a retry to discount + * transient network issues is called for. There are only 2 retries for a + * total of 3 attempts, and the retry gaps are more spaced out. + * + * - Later it was discovered that net.fetch is much slower than node's native + * fetch, so this implementation has further diverged. + * + * - This also moved to a utility process, where we also have a more restricted + * ability to import electron API. + */ +const uploadVideoSegments = async ( + videoFilePath: string, + videoSize: number, + objectUploadURL: string, +) => { + const waitTimeBeforeNextTry = [5000, 20000]; + + while (true) { + let abort = false; + try { + const nodeStream = fs_.createReadStream(videoFilePath); + const webStream = Readable.toWeb(nodeStream); + + // net.fetch is 40-50x slower than the native fetch for this + // particular PUT request. This is easily reproducible - replace + // `fetch` with `net.fetch`, then even on localhost the PUT requests + // start taking a minute or so, while they take second(s) with + // node's native fetch. + const res = await fetch(objectUploadURL, { + method: "PUT", + // net.fetch apparently deduces and inserts a content-length, + // because when we use the node native fetch then we need to + // provide it explicitly. + headers: { "Content-Length": `${videoSize}` }, + // The duplex option is required since we're passing a stream. + // + // @ts-expect-error TypeScript's libdom.d.ts does not include + // the "duplex" parameter, e.g. see + // https://github.com/node-fetch/node-fetch/issues/1769. + duplex: "half", + body: webStream, + }); + + if (res.ok) { + // Success. + return; + } + if (res.status >= 400 && res.status < 500) { + // HTTP 4xx. + abort = true; + } + throw new Error( + `Failed to upload generated HLS video: HTTP ${res.status} ${res.statusText}`, + ); + } catch (e) { + if (abort) { + throw e; + } + const t = waitTimeBeforeNextTry.shift(); + if (!t) { + throw e; + } else { + log.warn("Will retry potentially transient request failure", e); + } + await wait(t); + } + } +}; diff --git a/desktop/src/main/stream.ts b/desktop/src/main/stream.ts index f66946bc7c..85d572870f 100644 --- a/desktop/src/main/stream.ts +++ b/desktop/src/main/stream.ts @@ -3,15 +3,13 @@ */ import { net, protocol } from "electron/main"; import { randomUUID } from "node:crypto"; -import fs_ from "node:fs"; import fs from "node:fs/promises"; -import { Readable, Writable } from "node:stream"; +import { Writable } from "node:stream"; import { pathToFileURL } from "node:url"; import log from "./log"; import { ffmpegUtilityProcess } from "./services/ffmpeg"; import { type FFmpegGenerateHLSPlaylistAndSegmentsResult } from "./services/ffmpeg-worker"; import { markClosableZip, openZip } from "./services/zip"; -import { wait } from "./utils/common"; import { writeStream } from "./utils/stream"; import { deleteTempFile, @@ -326,6 +324,7 @@ const handleGenerateHLSWrite = async ( result = await worker.ffmpegGenerateHLSPlaylistAndSegments( inputFilePath, outputFilePathPrefix, + objectUploadURL, ); if (!result) { @@ -333,115 +332,17 @@ const handleGenerateHLSWrite = async ( return new Response(null, { status: 204 }); } - const { playlistPath, videoPath, videoSize, dimensions } = result; - try { - await uploadVideoSegments(videoPath, videoSize, objectUploadURL); + const { playlistPath, videoSize, dimensions } = result; - const playlistToken = randomUUID(); - pendingVideoResults.set(playlistToken, playlistPath); + const playlistToken = randomUUID(); + pendingVideoResults.set(playlistToken, playlistPath); - return new Response( - JSON.stringify({ playlistToken, dimensions, videoSize }), - { status: 200 }, - ); - } catch (e) { - await deleteTempFileIgnoringErrors(playlistPath); - throw e; - } finally { - await deleteTempFileIgnoringErrors(videoPath); - } + return new Response( + JSON.stringify({ playlistToken, videoSize, dimensions }), + { status: 200 }, + ); } finally { if (isInputFileTemporary) await deleteTempFileIgnoringErrors(inputFilePath); } }; - -/** - * Upload the file at the given {@link videoFilePath} to the provided presigned - * {@link objectUploadURL} using a HTTP PUT request. - * - * In case on non-HTTP-4xx errors, retry up to 3 times with exponential backoff. - * - * See: [Note: Upload HLS video segment from node side]. - * - * @param videoFilePath The path to the file on the user's file system to - * upload. - * - * @param videoSize The size in bytes of the file at {@link videoFilePath}. - * - * @param objectUploadURL A pre-signed URL to upload the file. - * - * --- - * - * This is an inlined but bespoke reimplementation of `retryEnsuringHTTPOkOr4xx` - * from `web/packages/base/http.ts` - * - * - We don't have the rest of the scaffolding used by that function, which is - * why it is intially inlined bespoked. - * - * - It handles the specific use case of uploading videos since generating the - * HLS stream is a fairly expensive operation, so a retry to discount - * transient network issues is called for. There are only 2 retries for a - * total of 3 attempts, and the retry gaps are more spaced out. - * - * - Later it was discovered that net.fetch is much slower than node's native - * fetch, so this implementation has further diverged. - */ -export const uploadVideoSegments = async ( - videoFilePath: string, - videoSize: number, - objectUploadURL: string, -) => { - const waitTimeBeforeNextTry = [5000, 20000]; - - while (true) { - let abort = false; - try { - const nodeStream = fs_.createReadStream(videoFilePath); - const webStream = Readable.toWeb(nodeStream); - - // net.fetch is 40-50x slower than the native fetch for this - // particular PUT request. This is easily reproducible (replace - // `fetch` with `net.fetch`, then even on localhost the PUT requests - // start taking a minute or so; with node's native fetch, it is - // second(s)). - const res = await fetch(objectUploadURL, { - method: "PUT", - // net.fetch apparently deduces and inserts a content-length, - // because when we use the node native fetch then we need to - // provide it explicitly. - headers: { "Content-Length": `${videoSize}` }, - // The duplex option is required since we're passing a stream. - // - // @ts-expect-error TypeScript's libdom.d.ts does not include - // the "duplex" parameter, e.g. see - // https://github.com/node-fetch/node-fetch/issues/1769. - duplex: "half", - body: webStream, - }); - - if (res.ok) { - // Success. - return; - } - if (res.status >= 400 && res.status < 500) { - // HTTP 4xx. - abort = true; - } - throw new Error( - `Failed to upload generated HLS video: HTTP ${res.status} ${res.statusText}`, - ); - } catch (e) { - if (abort) { - throw e; - } - const t = waitTimeBeforeNextTry.shift(); - if (!t) { - throw e; - } else { - log.warn("Will retry potentially transient request failure", e); - } - await wait(t); - } - } -};