Move upload to utility process
Attempt to further lighten the load on the main thread
This commit is contained in:
@@ -7,11 +7,14 @@
|
||||
import { expose } from "comlink";
|
||||
import pathToFfmpeg from "ffmpeg-static";
|
||||
import { randomBytes } from "node:crypto";
|
||||
import fs_ from "node:fs";
|
||||
import fs from "node:fs/promises";
|
||||
import path, { basename } from "node:path";
|
||||
import { Readable } from "node:stream";
|
||||
import type { FFmpegCommand } from "../../types/ipc";
|
||||
import log from "../log-worker";
|
||||
import { messagePortMainEndpoint } from "../utils/comlink";
|
||||
import { wait } from "../utils/common";
|
||||
import { execAsyncWorker } from "../utils/exec-worker";
|
||||
|
||||
/* Ditto in the web app's code (used by the Wasm FFmpeg invocation). */
|
||||
@@ -40,6 +43,7 @@ export interface FFmpegUtilityProcess {
|
||||
ffmpegGenerateHLSPlaylistAndSegments: (
|
||||
inputFilePath: string,
|
||||
outputPathPrefix: string,
|
||||
outputUploadURL: string,
|
||||
) => Promise<FFmpegGenerateHLSPlaylistAndSegmentsResult | undefined>;
|
||||
}
|
||||
|
||||
@@ -177,7 +181,6 @@ const ffmpegConvertToMP4 = async (
|
||||
|
||||
export interface FFmpegGenerateHLSPlaylistAndSegmentsResult {
|
||||
playlistPath: string;
|
||||
videoPath: string;
|
||||
dimensions: { width: number; height: number };
|
||||
videoSize: number;
|
||||
}
|
||||
@@ -216,6 +219,7 @@ export interface FFmpegGenerateHLSPlaylistAndSegmentsResult {
|
||||
const ffmpegGenerateHLSPlaylistAndSegments = async (
|
||||
inputFilePath: string,
|
||||
outputPathPrefix: string,
|
||||
outputUploadURL: string,
|
||||
): Promise<FFmpegGenerateHLSPlaylistAndSegmentsResult | undefined> => {
|
||||
const { isH264, isBT709, bitrate } =
|
||||
await detectVideoCharacteristics(inputFilePath);
|
||||
@@ -476,23 +480,23 @@ const ffmpegGenerateHLSPlaylistAndSegments = async (
|
||||
// Find the size of the generated video segments by reading the size of
|
||||
// the generated .ts file.
|
||||
videoSize = await fs.stat(videoPath).then((st) => st.size);
|
||||
|
||||
await uploadVideoSegments(videoPath, videoSize, outputUploadURL);
|
||||
} catch (e) {
|
||||
log.error("HLS generation failed", e);
|
||||
await Promise.all([
|
||||
deletePathIgnoringErrors(playlistPath),
|
||||
deletePathIgnoringErrors(videoPath),
|
||||
]);
|
||||
await Promise.all([deletePathIgnoringErrors(playlistPath)]);
|
||||
throw e;
|
||||
} finally {
|
||||
await Promise.all([
|
||||
deletePathIgnoringErrors(keyInfoPath),
|
||||
deletePathIgnoringErrors(keyPath),
|
||||
deletePathIgnoringErrors(videoPath),
|
||||
// ffmpeg writes a /path/output.ts.tmp, clear it out too.
|
||||
deletePathIgnoringErrors(videoPath + ".tmp"),
|
||||
]);
|
||||
}
|
||||
|
||||
return { playlistPath, videoPath, dimensions, videoSize };
|
||||
return { playlistPath, dimensions, videoSize };
|
||||
};
|
||||
|
||||
/**
|
||||
@@ -727,3 +731,96 @@ const pseudoFFProbeVideo = async (inputFilePath: string) => {
|
||||
|
||||
return stderr;
|
||||
};
|
||||
|
||||
/**
|
||||
* Upload the file at the given {@link videoFilePath} to the provided presigned
|
||||
* {@link objectUploadURL} using a HTTP PUT request.
|
||||
*
|
||||
* In case on non-HTTP-4xx errors, retry up to 3 times with exponential backoff.
|
||||
*
|
||||
* See: [Note: Upload HLS video segment from node side].
|
||||
*
|
||||
* @param videoFilePath The path to the file on the user's file system to
|
||||
* upload.
|
||||
*
|
||||
* @param videoSize The size in bytes of the file at {@link videoFilePath}.
|
||||
*
|
||||
* @param objectUploadURL A pre-signed URL to upload the file.
|
||||
*
|
||||
* ---
|
||||
*
|
||||
* This is an inlined but bespoke reimplementation of `retryEnsuringHTTPOkOr4xx`
|
||||
* from `web/packages/base/http.ts`
|
||||
*
|
||||
* - We don't have the rest of the scaffolding used by that function, which is
|
||||
* why it is intially inlined bespoked.
|
||||
*
|
||||
* - It handles the specific use case of uploading videos since generating the
|
||||
* HLS stream is a fairly expensive operation, so a retry to discount
|
||||
* transient network issues is called for. There are only 2 retries for a
|
||||
* total of 3 attempts, and the retry gaps are more spaced out.
|
||||
*
|
||||
* - Later it was discovered that net.fetch is much slower than node's native
|
||||
* fetch, so this implementation has further diverged.
|
||||
*
|
||||
* - This also moved to a utility process, where we also have a more restricted
|
||||
* ability to import electron API.
|
||||
*/
|
||||
const uploadVideoSegments = async (
|
||||
videoFilePath: string,
|
||||
videoSize: number,
|
||||
objectUploadURL: string,
|
||||
) => {
|
||||
const waitTimeBeforeNextTry = [5000, 20000];
|
||||
|
||||
while (true) {
|
||||
let abort = false;
|
||||
try {
|
||||
const nodeStream = fs_.createReadStream(videoFilePath);
|
||||
const webStream = Readable.toWeb(nodeStream);
|
||||
|
||||
// net.fetch is 40-50x slower than the native fetch for this
|
||||
// particular PUT request. This is easily reproducible - replace
|
||||
// `fetch` with `net.fetch`, then even on localhost the PUT requests
|
||||
// start taking a minute or so, while they take second(s) with
|
||||
// node's native fetch.
|
||||
const res = await fetch(objectUploadURL, {
|
||||
method: "PUT",
|
||||
// net.fetch apparently deduces and inserts a content-length,
|
||||
// because when we use the node native fetch then we need to
|
||||
// provide it explicitly.
|
||||
headers: { "Content-Length": `${videoSize}` },
|
||||
// The duplex option is required since we're passing a stream.
|
||||
//
|
||||
// @ts-expect-error TypeScript's libdom.d.ts does not include
|
||||
// the "duplex" parameter, e.g. see
|
||||
// https://github.com/node-fetch/node-fetch/issues/1769.
|
||||
duplex: "half",
|
||||
body: webStream,
|
||||
});
|
||||
|
||||
if (res.ok) {
|
||||
// Success.
|
||||
return;
|
||||
}
|
||||
if (res.status >= 400 && res.status < 500) {
|
||||
// HTTP 4xx.
|
||||
abort = true;
|
||||
}
|
||||
throw new Error(
|
||||
`Failed to upload generated HLS video: HTTP ${res.status} ${res.statusText}`,
|
||||
);
|
||||
} catch (e) {
|
||||
if (abort) {
|
||||
throw e;
|
||||
}
|
||||
const t = waitTimeBeforeNextTry.shift();
|
||||
if (!t) {
|
||||
throw e;
|
||||
} else {
|
||||
log.warn("Will retry potentially transient request failure", e);
|
||||
}
|
||||
await wait(t);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -3,15 +3,13 @@
|
||||
*/
|
||||
import { net, protocol } from "electron/main";
|
||||
import { randomUUID } from "node:crypto";
|
||||
import fs_ from "node:fs";
|
||||
import fs from "node:fs/promises";
|
||||
import { Readable, Writable } from "node:stream";
|
||||
import { Writable } from "node:stream";
|
||||
import { pathToFileURL } from "node:url";
|
||||
import log from "./log";
|
||||
import { ffmpegUtilityProcess } from "./services/ffmpeg";
|
||||
import { type FFmpegGenerateHLSPlaylistAndSegmentsResult } from "./services/ffmpeg-worker";
|
||||
import { markClosableZip, openZip } from "./services/zip";
|
||||
import { wait } from "./utils/common";
|
||||
import { writeStream } from "./utils/stream";
|
||||
import {
|
||||
deleteTempFile,
|
||||
@@ -326,6 +324,7 @@ const handleGenerateHLSWrite = async (
|
||||
result = await worker.ffmpegGenerateHLSPlaylistAndSegments(
|
||||
inputFilePath,
|
||||
outputFilePathPrefix,
|
||||
objectUploadURL,
|
||||
);
|
||||
|
||||
if (!result) {
|
||||
@@ -333,115 +332,17 @@ const handleGenerateHLSWrite = async (
|
||||
return new Response(null, { status: 204 });
|
||||
}
|
||||
|
||||
const { playlistPath, videoPath, videoSize, dimensions } = result;
|
||||
try {
|
||||
await uploadVideoSegments(videoPath, videoSize, objectUploadURL);
|
||||
const { playlistPath, videoSize, dimensions } = result;
|
||||
|
||||
const playlistToken = randomUUID();
|
||||
pendingVideoResults.set(playlistToken, playlistPath);
|
||||
const playlistToken = randomUUID();
|
||||
pendingVideoResults.set(playlistToken, playlistPath);
|
||||
|
||||
return new Response(
|
||||
JSON.stringify({ playlistToken, dimensions, videoSize }),
|
||||
{ status: 200 },
|
||||
);
|
||||
} catch (e) {
|
||||
await deleteTempFileIgnoringErrors(playlistPath);
|
||||
throw e;
|
||||
} finally {
|
||||
await deleteTempFileIgnoringErrors(videoPath);
|
||||
}
|
||||
return new Response(
|
||||
JSON.stringify({ playlistToken, videoSize, dimensions }),
|
||||
{ status: 200 },
|
||||
);
|
||||
} finally {
|
||||
if (isInputFileTemporary)
|
||||
await deleteTempFileIgnoringErrors(inputFilePath);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Upload the file at the given {@link videoFilePath} to the provided presigned
|
||||
* {@link objectUploadURL} using a HTTP PUT request.
|
||||
*
|
||||
* In case on non-HTTP-4xx errors, retry up to 3 times with exponential backoff.
|
||||
*
|
||||
* See: [Note: Upload HLS video segment from node side].
|
||||
*
|
||||
* @param videoFilePath The path to the file on the user's file system to
|
||||
* upload.
|
||||
*
|
||||
* @param videoSize The size in bytes of the file at {@link videoFilePath}.
|
||||
*
|
||||
* @param objectUploadURL A pre-signed URL to upload the file.
|
||||
*
|
||||
* ---
|
||||
*
|
||||
* This is an inlined but bespoke reimplementation of `retryEnsuringHTTPOkOr4xx`
|
||||
* from `web/packages/base/http.ts`
|
||||
*
|
||||
* - We don't have the rest of the scaffolding used by that function, which is
|
||||
* why it is intially inlined bespoked.
|
||||
*
|
||||
* - It handles the specific use case of uploading videos since generating the
|
||||
* HLS stream is a fairly expensive operation, so a retry to discount
|
||||
* transient network issues is called for. There are only 2 retries for a
|
||||
* total of 3 attempts, and the retry gaps are more spaced out.
|
||||
*
|
||||
* - Later it was discovered that net.fetch is much slower than node's native
|
||||
* fetch, so this implementation has further diverged.
|
||||
*/
|
||||
export const uploadVideoSegments = async (
|
||||
videoFilePath: string,
|
||||
videoSize: number,
|
||||
objectUploadURL: string,
|
||||
) => {
|
||||
const waitTimeBeforeNextTry = [5000, 20000];
|
||||
|
||||
while (true) {
|
||||
let abort = false;
|
||||
try {
|
||||
const nodeStream = fs_.createReadStream(videoFilePath);
|
||||
const webStream = Readable.toWeb(nodeStream);
|
||||
|
||||
// net.fetch is 40-50x slower than the native fetch for this
|
||||
// particular PUT request. This is easily reproducible (replace
|
||||
// `fetch` with `net.fetch`, then even on localhost the PUT requests
|
||||
// start taking a minute or so; with node's native fetch, it is
|
||||
// second(s)).
|
||||
const res = await fetch(objectUploadURL, {
|
||||
method: "PUT",
|
||||
// net.fetch apparently deduces and inserts a content-length,
|
||||
// because when we use the node native fetch then we need to
|
||||
// provide it explicitly.
|
||||
headers: { "Content-Length": `${videoSize}` },
|
||||
// The duplex option is required since we're passing a stream.
|
||||
//
|
||||
// @ts-expect-error TypeScript's libdom.d.ts does not include
|
||||
// the "duplex" parameter, e.g. see
|
||||
// https://github.com/node-fetch/node-fetch/issues/1769.
|
||||
duplex: "half",
|
||||
body: webStream,
|
||||
});
|
||||
|
||||
if (res.ok) {
|
||||
// Success.
|
||||
return;
|
||||
}
|
||||
if (res.status >= 400 && res.status < 500) {
|
||||
// HTTP 4xx.
|
||||
abort = true;
|
||||
}
|
||||
throw new Error(
|
||||
`Failed to upload generated HLS video: HTTP ${res.status} ${res.statusText}`,
|
||||
);
|
||||
} catch (e) {
|
||||
if (abort) {
|
||||
throw e;
|
||||
}
|
||||
const t = waitTimeBeforeNextTry.shift();
|
||||
if (!t) {
|
||||
throw e;
|
||||
} else {
|
||||
log.warn("Will retry potentially transient request failure", e);
|
||||
}
|
||||
await wait(t);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user