[desktop] HLS gen - WIP - Part x/x (#5721)
Stream generation works during live uploads (behind a dev feature flag).
This commit is contained in:
@@ -43,6 +43,11 @@ export default ts.config(
|
||||
//
|
||||
// See: [Note: non-null-assertions have better stack trace]
|
||||
"@typescript-eslint/no-non-null-assertion": "off",
|
||||
// Allow `while(true)` etc.
|
||||
"@typescript-eslint/no-unnecessary-condition": [
|
||||
"error",
|
||||
{ allowConstantLoopConditions: true },
|
||||
],
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
@@ -7,7 +7,7 @@ import log from "../log";
|
||||
import { execAsync } from "../utils/electron";
|
||||
import {
|
||||
deleteTempFileIgnoringErrors,
|
||||
makeFileForDataOrPathOrZipItem,
|
||||
makeFileForDataOrStreamOrPathOrZipItem,
|
||||
makeTempFilePath,
|
||||
} from "../utils/temp";
|
||||
|
||||
@@ -52,7 +52,7 @@ export const ffmpegExec = async (
|
||||
path: inputFilePath,
|
||||
isFileTemporary: isInputFileTemporary,
|
||||
writeToTemporaryFile: writeToTemporaryInputFile,
|
||||
} = await makeFileForDataOrPathOrZipItem(dataOrPathOrZipItem);
|
||||
} = await makeFileForDataOrStreamOrPathOrZipItem(dataOrPathOrZipItem);
|
||||
|
||||
const outputFilePath = await makeTempFilePath(outputFileExtension);
|
||||
try {
|
||||
@@ -136,6 +136,7 @@ export interface FFmpegGenerateHLSPlaylistAndSegmentsResult {
|
||||
playlistPath: string;
|
||||
videoPath: string;
|
||||
dimensions: { width: number; height: number };
|
||||
videoSize: number;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -330,6 +331,7 @@ export const ffmpegGenerateHLSPlaylistAndSegments = async (
|
||||
].flat();
|
||||
|
||||
let dimensions: ReturnType<typeof detectVideoDimensions>;
|
||||
let videoSize: number;
|
||||
|
||||
try {
|
||||
// Write the key and the keyInfo to their desired paths.
|
||||
@@ -346,6 +348,10 @@ export const ffmpegGenerateHLSPlaylistAndSegments = async (
|
||||
// Determine the dimensions of the generated video from the stderr
|
||||
// output produced by ffmpeg during the conversion.
|
||||
dimensions = detectVideoDimensions(conversionStderr);
|
||||
|
||||
// Find the size of the generated video segments by reading the size of
|
||||
// the generated .ts file.
|
||||
videoSize = await fs.stat(videoPath).then((st) => st.size);
|
||||
} catch (e) {
|
||||
log.error("HLS generation failed", e);
|
||||
await Promise.all([
|
||||
@@ -362,7 +368,7 @@ export const ffmpegGenerateHLSPlaylistAndSegments = async (
|
||||
]);
|
||||
}
|
||||
|
||||
return { playlistPath, videoPath, dimensions };
|
||||
return { playlistPath, videoPath, dimensions, videoSize };
|
||||
};
|
||||
|
||||
/**
|
||||
|
||||
@@ -6,7 +6,7 @@ import { type ZipItem } from "../../types/ipc";
|
||||
import { execAsync, isDev } from "../utils/electron";
|
||||
import {
|
||||
deleteTempFileIgnoringErrors,
|
||||
makeFileForDataOrPathOrZipItem,
|
||||
makeFileForDataOrStreamOrPathOrZipItem,
|
||||
makeTempFilePath,
|
||||
} from "../utils/temp";
|
||||
|
||||
@@ -69,7 +69,7 @@ export const generateImageThumbnail = async (
|
||||
path: inputFilePath,
|
||||
isFileTemporary: isInputFileTemporary,
|
||||
writeToTemporaryFile: writeToTemporaryInputFile,
|
||||
} = await makeFileForDataOrPathOrZipItem(dataOrPathOrZipItem);
|
||||
} = await makeFileForDataOrStreamOrPathOrZipItem(dataOrPathOrZipItem);
|
||||
|
||||
const outputFilePath = await makeTempFilePath("jpeg");
|
||||
|
||||
|
||||
@@ -3,8 +3,9 @@
|
||||
*/
|
||||
import { net, protocol } from "electron/main";
|
||||
import { randomUUID } from "node:crypto";
|
||||
import fs_ from "node:fs";
|
||||
import fs from "node:fs/promises";
|
||||
import { Writable } from "node:stream";
|
||||
import { Readable, Writable } from "node:stream";
|
||||
import { pathToFileURL } from "node:url";
|
||||
import log from "./log";
|
||||
import {
|
||||
@@ -13,10 +14,12 @@ import {
|
||||
type FFmpegGenerateHLSPlaylistAndSegmentsResult,
|
||||
} from "./services/ffmpeg";
|
||||
import { markClosableZip, openZip } from "./services/zip";
|
||||
import { wait } from "./utils/common";
|
||||
import { writeStream } from "./utils/stream";
|
||||
import {
|
||||
deleteTempFile,
|
||||
deleteTempFileIgnoringErrors,
|
||||
makeFileForDataOrStreamOrPathOrZipItem,
|
||||
makeTempFilePath,
|
||||
} from "./utils/temp";
|
||||
|
||||
@@ -78,7 +81,7 @@ const handleStreamRequest = async (request: Request): Promise<Response> => {
|
||||
case "convert-to-mp4":
|
||||
return handleConvertToMP4Write(request);
|
||||
case "generate-hls":
|
||||
return handleGenerateHLSWrite(request);
|
||||
return handleGenerateHLSWrite(request, searchParams);
|
||||
default:
|
||||
return new Response(`Unknown op ${op}`, {
|
||||
status: 404,
|
||||
@@ -246,7 +249,7 @@ const handleConvertToMP4Write = async (request: Request) => {
|
||||
|
||||
const token = randomUUID();
|
||||
pendingVideoResults.set(token, outputTempFilePath);
|
||||
return new Response(JSON.stringify([token]), { status: 200 });
|
||||
return new Response(token, { status: 200 });
|
||||
};
|
||||
|
||||
const handleVideoRead = async (token: string) => {
|
||||
@@ -276,30 +279,137 @@ const handleVideoDone = async (token: string) => {
|
||||
*
|
||||
* The difference here is that we the conversion generates two streams - one for
|
||||
* the HLS playlist itself, and one for the file containing the encrypted and
|
||||
* transcoded video chunks. So instead of returning a single token, we return a
|
||||
* JSON array containing two tokens so that the renderer can read them off
|
||||
* separately.
|
||||
* transcoded video chunks. The video stream we write to the objectUploadURL
|
||||
* (provided via {@link params}), and then we return a JSON object containing
|
||||
* the token for the playlist, and other metadata for use by the renderer.
|
||||
*/
|
||||
const handleGenerateHLSWrite = async (request: Request) => {
|
||||
const inputTempFilePath = await makeTempFilePath();
|
||||
await writeStream(inputTempFilePath, request.body!);
|
||||
const handleGenerateHLSWrite = async (
|
||||
request: Request,
|
||||
params: URLSearchParams,
|
||||
) => {
|
||||
const objectUploadURL = params.get("objectUploadURL");
|
||||
if (!objectUploadURL) throw new Error("Missing objectUploadURL");
|
||||
|
||||
const outputFilePathPrefix = await makeTempFilePath();
|
||||
let paths: FFmpegGenerateHLSPlaylistAndSegmentsResult;
|
||||
try {
|
||||
paths = await ffmpegGenerateHLSPlaylistAndSegments(
|
||||
inputTempFilePath,
|
||||
outputFilePathPrefix,
|
||||
);
|
||||
} finally {
|
||||
await deleteTempFileIgnoringErrors(inputTempFilePath);
|
||||
let inputItem: Parameters<typeof makeFileForDataOrStreamOrPathOrZipItem>[0];
|
||||
const path = params.get("path");
|
||||
if (path) {
|
||||
inputItem = path;
|
||||
} else {
|
||||
const zipPath = params.get("zipPath");
|
||||
const entryName = params.get("entryName");
|
||||
if (zipPath && entryName) {
|
||||
inputItem = [zipPath, entryName];
|
||||
} else {
|
||||
const body = request.body;
|
||||
if (!body) throw new Error("Missing body");
|
||||
inputItem = body;
|
||||
}
|
||||
}
|
||||
|
||||
const playlistToken = randomUUID();
|
||||
const videoToken = randomUUID();
|
||||
pendingVideoResults.set(playlistToken, paths.playlistPath);
|
||||
pendingVideoResults.set(videoToken, paths.videoPath);
|
||||
return new Response(JSON.stringify([playlistToken, videoToken]), {
|
||||
status: 200,
|
||||
});
|
||||
const {
|
||||
path: inputFilePath,
|
||||
isFileTemporary: isInputFileTemporary,
|
||||
writeToTemporaryFile: writeToTemporaryInputFile,
|
||||
} = await makeFileForDataOrStreamOrPathOrZipItem(inputItem);
|
||||
|
||||
const outputFilePathPrefix = await makeTempFilePath();
|
||||
let result: FFmpegGenerateHLSPlaylistAndSegmentsResult;
|
||||
try {
|
||||
await writeToTemporaryInputFile();
|
||||
|
||||
result = await ffmpegGenerateHLSPlaylistAndSegments(
|
||||
inputFilePath,
|
||||
outputFilePathPrefix,
|
||||
);
|
||||
|
||||
const { playlistPath, videoPath } = result;
|
||||
try {
|
||||
await uploadVideoSegments(videoPath, objectUploadURL);
|
||||
|
||||
const playlistToken = randomUUID();
|
||||
pendingVideoResults.set(playlistToken, playlistPath);
|
||||
|
||||
const { dimensions, videoSize } = result;
|
||||
return new Response(
|
||||
JSON.stringify({ playlistToken, dimensions, videoSize }),
|
||||
{ status: 200 },
|
||||
);
|
||||
} catch (e) {
|
||||
await deleteTempFileIgnoringErrors(playlistPath);
|
||||
throw e;
|
||||
} finally {
|
||||
await deleteTempFileIgnoringErrors(videoPath);
|
||||
}
|
||||
} finally {
|
||||
if (isInputFileTemporary)
|
||||
await deleteTempFileIgnoringErrors(inputFilePath);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Upload the file at the given {@link videoFilePath} to the provided presigned
|
||||
* {@link objectUploadURL} using a HTTP PUT request.
|
||||
*
|
||||
* In case on non-HTTP-4xx errors, retry up to 3 times with exponential backoff.
|
||||
*
|
||||
* See: [Note: Upload HLS video segment from node side].
|
||||
*
|
||||
* ---
|
||||
*
|
||||
* This is an inlined but bespoke reimplementation of `retryEnsuringHTTPOkOr4xx`
|
||||
* from `web/packages/base/http.ts` (we don't have the rest of the scaffolding
|
||||
* used by that function, which is why it is inlined bespoked).
|
||||
*
|
||||
* It handles the specific use case of uploading videos since generating the HLS
|
||||
* stream is a fairly expensive operation, so a retry to discount transient
|
||||
* network issues is called for. There are only 2 retries for a total of 3
|
||||
* attempts, and the retry gaps are more spaced out.
|
||||
*/
|
||||
export const uploadVideoSegments = async (
|
||||
videoFilePath: string,
|
||||
objectUploadURL: string,
|
||||
) => {
|
||||
const waitTimeBeforeNextTry = [5000, 20000];
|
||||
|
||||
while (true) {
|
||||
let abort = false;
|
||||
try {
|
||||
const nodeStream = fs_.createReadStream(videoFilePath);
|
||||
const webStream = Readable.toWeb(nodeStream);
|
||||
|
||||
const res = await net.fetch(objectUploadURL, {
|
||||
method: "PUT",
|
||||
// The duplex option is required since we're passing a stream.
|
||||
//
|
||||
// @ts-expect-error TypeScript's libdom.d.ts does not include
|
||||
// the "duplex" parameter, e.g. see
|
||||
// https://github.com/node-fetch/node-fetch/issues/1769.
|
||||
duplex: "half",
|
||||
body: webStream,
|
||||
});
|
||||
|
||||
if (res.ok) {
|
||||
// Success.
|
||||
return;
|
||||
}
|
||||
if (res.status >= 400 && res.status < 500) {
|
||||
// HTTP 4xx.
|
||||
abort = true;
|
||||
}
|
||||
throw new Error(
|
||||
`Failed to upload generated HLS video: HTTP ${res.status} ${res.statusText}`,
|
||||
);
|
||||
} catch (e) {
|
||||
if (abort) {
|
||||
throw e;
|
||||
}
|
||||
const t = waitTimeBeforeNextTry.shift();
|
||||
if (!t) {
|
||||
throw e;
|
||||
} else {
|
||||
log.warn("Will retry potentially transient request failure", e);
|
||||
}
|
||||
await wait(t);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -10,6 +10,8 @@
|
||||
*
|
||||
* This function is a promisified `setTimeout`. It returns a promise that
|
||||
* resolves after {@link ms} milliseconds.
|
||||
*
|
||||
* Duplicated from `web/packages/utils/promise.ts`.
|
||||
*/
|
||||
export const wait = (ms: number) =>
|
||||
new Promise((resolve) => setTimeout(resolve, ms));
|
||||
|
||||
@@ -5,6 +5,7 @@ import path from "node:path";
|
||||
import type { ZipItem } from "../../types/ipc";
|
||||
import log from "../log";
|
||||
import { markClosableZip, openZip } from "../services/zip";
|
||||
import { writeStream } from "./stream";
|
||||
|
||||
/**
|
||||
* Our very own directory within the system temp directory. Go crazy, but
|
||||
@@ -79,7 +80,7 @@ export const deleteTempFileIgnoringErrors = async (tempFilePath: string) => {
|
||||
}
|
||||
};
|
||||
|
||||
/** The result of {@link makeFileForDataOrPathOrZipItem}. */
|
||||
/** The result of {@link makeFileForDataOrStreamOrPathOrZipItem}. */
|
||||
interface FileForDataOrPathOrZipItem {
|
||||
/**
|
||||
* The path to the file (possibly temporary).
|
||||
@@ -104,14 +105,14 @@ interface FileForDataOrPathOrZipItem {
|
||||
/**
|
||||
* Return the path to a file, a boolean indicating if this is a temporary path
|
||||
* that needs to be deleted after processing, and a function to write the given
|
||||
* {@link dataOrPathOrZipItem} into that temporary file if needed.
|
||||
* {@link item} into that temporary file if needed.
|
||||
*
|
||||
* @param dataOrPathOrZipItem The contents of the file, or the path to an
|
||||
* existing file, or a (path to a zip file, name of an entry within that zip
|
||||
* file) tuple.
|
||||
* @param item The contents of the file (bytes), or a {@link ReadableStream}
|
||||
* with the contents of the file, or the path to an existing file, or a (path to
|
||||
* a zip file, name of an entry within that zip file) tuple.
|
||||
*/
|
||||
export const makeFileForDataOrPathOrZipItem = async (
|
||||
dataOrPathOrZipItem: Uint8Array | string | ZipItem,
|
||||
export const makeFileForDataOrStreamOrPathOrZipItem = async (
|
||||
item: Uint8Array | ReadableStream | string | ZipItem,
|
||||
): Promise<FileForDataOrPathOrZipItem> => {
|
||||
let path: string;
|
||||
let isFileTemporary: boolean;
|
||||
@@ -119,18 +120,19 @@ export const makeFileForDataOrPathOrZipItem = async (
|
||||
/* no-op */
|
||||
};
|
||||
|
||||
if (typeof dataOrPathOrZipItem == "string") {
|
||||
path = dataOrPathOrZipItem;
|
||||
if (typeof item == "string") {
|
||||
path = item;
|
||||
isFileTemporary = false;
|
||||
} else {
|
||||
path = await makeTempFilePath();
|
||||
isFileTemporary = true;
|
||||
if (dataOrPathOrZipItem instanceof Uint8Array) {
|
||||
writeToTemporaryFile = () =>
|
||||
fs.writeFile(path, dataOrPathOrZipItem);
|
||||
if (item instanceof Uint8Array) {
|
||||
writeToTemporaryFile = () => fs.writeFile(path, item);
|
||||
} else if (item instanceof ReadableStream) {
|
||||
writeToTemporaryFile = () => writeStream(path, item);
|
||||
} else {
|
||||
writeToTemporaryFile = async () => {
|
||||
const [zipPath, entryName] = dataOrPathOrZipItem;
|
||||
const [zipPath, entryName] = item;
|
||||
const zip = openZip(zipPath);
|
||||
try {
|
||||
await zip.extract(entryName, path);
|
||||
|
||||
@@ -2,6 +2,7 @@ import {
|
||||
authenticatedPublicAlbumsRequestHeaders,
|
||||
authenticatedRequestHeaders,
|
||||
ensureOk,
|
||||
retryAsyncOperation,
|
||||
type PublicAlbumsCredentials,
|
||||
} from "ente-base/http";
|
||||
import log from "ente-base/log";
|
||||
@@ -10,7 +11,6 @@ import { EnteFile } from "ente-media/file";
|
||||
import { CustomError, handleUploadError } from "ente-shared/error";
|
||||
import HTTPService from "ente-shared/network/HTTPService";
|
||||
import { getToken } from "ente-shared/storage/localStorage/helpers";
|
||||
import { retryAsyncOperation } from "ente-utils/promise";
|
||||
import { z } from "zod";
|
||||
import { MultipartUploadURLs, UploadFile } from "./upload-service";
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { retryAsyncOperation } from "ente-utils/promise";
|
||||
import { wait } from "ente-utils/promise";
|
||||
import { z } from "zod";
|
||||
import { clientPackageName } from "./app";
|
||||
import { ensureAuthToken } from "./local-user";
|
||||
@@ -164,6 +164,43 @@ export const isMuseumHTTPError = async (
|
||||
return false;
|
||||
};
|
||||
|
||||
/**
|
||||
* Retry a async operation like a HTTP request 3 (+ 1 original) times with
|
||||
* exponential backoff.
|
||||
*
|
||||
* @param op A function that performs the operation, returning the promise for
|
||||
* its completion.
|
||||
*
|
||||
* @param abortIfNeeded An optional function that is called with the
|
||||
* corresponding error whenever {@link op} rejects. It should throw the error if
|
||||
* the retries should immediately be aborted.
|
||||
*
|
||||
* @returns A promise that fulfills with to the result of a first successfully
|
||||
* fulfilled promise of the 4 (1 + 3) attempts, or rejects with the error
|
||||
* obtained either when {@link abortIfNeeded} throws, or with the error from the
|
||||
* last attempt otherwise.
|
||||
*/
|
||||
export const retryAsyncOperation = async <T>(
|
||||
op: () => Promise<T>,
|
||||
abortIfNeeded?: (error: unknown) => void,
|
||||
): Promise<T> => {
|
||||
const waitTimeBeforeNextTry = [2000, 5000, 10000];
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
return await op();
|
||||
} catch (e) {
|
||||
if (abortIfNeeded) {
|
||||
abortIfNeeded(e);
|
||||
}
|
||||
const t = waitTimeBeforeNextTry.shift();
|
||||
if (!t) throw e;
|
||||
log.warn("Will retry potentially transient request failure", e);
|
||||
await wait(t);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* A helper function to adapt {@link retryAsyncOperation} for HTTP fetches.
|
||||
*
|
||||
@@ -184,7 +221,6 @@ export const retryEnsuringHTTPOk = (request: () => Promise<Response>) =>
|
||||
*
|
||||
* This is similar to {@link retryEnsuringHTTPOk}, except it stops retrying if
|
||||
* remote responds with a 4xx HTTP status.
|
||||
|
||||
*/
|
||||
export const retryEnsuringHTTPOkOr4xx = (request: () => Promise<Response>) =>
|
||||
retryAsyncOperation(
|
||||
|
||||
@@ -1103,10 +1103,10 @@ export class FileViewerPhotoSwipe {
|
||||
* navigation stops.
|
||||
*
|
||||
* So as a special case, we keep using arrow keys for navigation for the
|
||||
* first 1s when the user lands on a slide.
|
||||
* first 700 milliseconds when the user lands on a slide.
|
||||
*/
|
||||
const isUserLikelyNavigatingBetweenSlides = () =>
|
||||
Date.now() - lastSlideChangeEpochMilli < 1000; /* ms */
|
||||
Date.now() - lastSlideChangeEpochMilli < 700; /* ms */
|
||||
|
||||
const handleSeekBackOrPreviousSlide = () => {
|
||||
const video = videoVideoEl;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { isDesktop } from "ente-base/app";
|
||||
import log from "ente-base/log";
|
||||
import { workerBridge } from "ente-base/worker/worker-bridge";
|
||||
import type { EnteFile } from "ente-media/file";
|
||||
import { FileType } from "ente-media/file-type";
|
||||
import { isHEICExtension, needsJPEGConversion } from "ente-media/formats";
|
||||
import { heicToJPEG } from "ente-media/heic-convert";
|
||||
@@ -175,11 +176,20 @@ const testHEICDataURL =
|
||||
* A special case if for FileType.livePhoto on Linux in the desktop app, where
|
||||
* the conversion always happens to workaround the audio only playback in that
|
||||
* specific scenario.
|
||||
*
|
||||
* @param file The {@link EnteFile} with which this video is associated.
|
||||
*
|
||||
* @param videoFileName The name of the video.
|
||||
*
|
||||
* @param videoBlob The contents of the video.
|
||||
*
|
||||
* @returns An object URL that can be used to playback the provided
|
||||
* {@link videoBlob}.
|
||||
*/
|
||||
export const playableVideoURL = async (
|
||||
fileName: string,
|
||||
file: EnteFile,
|
||||
videoFileName: string,
|
||||
videoBlob: Blob,
|
||||
opts?: { fileType?: FileType },
|
||||
): Promise<string> => {
|
||||
const videoObjectURL = URL.createObjectURL(videoBlob);
|
||||
const isPlayable = await isPlaybackPossible(videoObjectURL);
|
||||
@@ -207,7 +217,7 @@ export const playableVideoURL = async (
|
||||
// conversion is fast in the desktop app.
|
||||
if (
|
||||
isDesktop &&
|
||||
opts?.fileType == FileType.livePhoto &&
|
||||
file.metadata.fileType == FileType.livePhoto &&
|
||||
// eslint-disable-next-line @typescript-eslint/no-deprecated
|
||||
navigator.platform.startsWith("Linux")
|
||||
) {
|
||||
@@ -228,13 +238,13 @@ export const playableVideoURL = async (
|
||||
|
||||
if (shouldConvert) {
|
||||
try {
|
||||
log.info(`Converting ${fileName} to mp4`);
|
||||
log.info(`Converting ${videoFileName} to mp4`);
|
||||
const convertedBlob = await convertToMP4(videoBlob);
|
||||
return URL.createObjectURL(
|
||||
new Blob([convertedBlob], { type: "video/mp4" }),
|
||||
);
|
||||
} catch (e) {
|
||||
log.error(`Video conversion failed for ${fileName}`, e);
|
||||
log.error(`Video conversion failed for ${videoFileName}`, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -630,16 +630,16 @@ const createRenderableSourceURLs = async (
|
||||
|
||||
const videoURL = () =>
|
||||
playableVideoURL(
|
||||
file,
|
||||
livePhoto.videoFileName,
|
||||
new Blob([livePhoto.videoData]),
|
||||
{ fileType },
|
||||
);
|
||||
|
||||
return { type: "livePhoto", imageURL, originalImageBlob, videoURL };
|
||||
}
|
||||
|
||||
case FileType.video: {
|
||||
const videoURL = await playableVideoURL(fileName, fileBlob);
|
||||
const videoURL = await playableVideoURL(file, fileName, fileBlob);
|
||||
return { type: "video", videoURL };
|
||||
}
|
||||
|
||||
|
||||
@@ -7,9 +7,9 @@ import {
|
||||
type UploadItem,
|
||||
} from "ente-gallery/services/upload";
|
||||
import {
|
||||
initiateConvertToMP4,
|
||||
readVideoStream,
|
||||
videoStreamDone,
|
||||
writeVideoStream,
|
||||
} from "ente-gallery/utils/native-stream";
|
||||
import {
|
||||
parseMetadataDate,
|
||||
@@ -266,8 +266,7 @@ export const convertToMP4 = async (blob: Blob): Promise<Blob | Uint8Array> => {
|
||||
};
|
||||
|
||||
const convertToMP4Native = async (electron: Electron, blob: Blob) => {
|
||||
const tokens = await writeVideoStream(electron, "convert-to-mp4", blob);
|
||||
const token = tokens[0]!;
|
||||
const token = await initiateConvertToMP4(electron, blob);
|
||||
const mp4Blob = await readVideoStream(electron, token).then((res) =>
|
||||
res.blob(),
|
||||
);
|
||||
|
||||
@@ -1,21 +1,18 @@
|
||||
import { decryptBlob } from "ente-base/crypto";
|
||||
import type { EncryptedBlob } from "ente-base/crypto/types";
|
||||
import {
|
||||
retryEnsuringHTTPOkOr4xx,
|
||||
type PublicAlbumsCredentials,
|
||||
} from "ente-base/http";
|
||||
import { type PublicAlbumsCredentials } from "ente-base/http";
|
||||
import log from "ente-base/log";
|
||||
import type { Electron } from "ente-base/types/ipc";
|
||||
import type { EnteFile } from "ente-media/file";
|
||||
import { fileLogID, type EnteFile } from "ente-media/file";
|
||||
import { FileType } from "ente-media/file-type";
|
||||
import { settingsSnapshot } from "ente-new/photos/services/settings";
|
||||
import { gunzip, gzip } from "ente-new/photos/utils/gzip";
|
||||
import { ensurePrecondition } from "ente-utils/ensure";
|
||||
import { z } from "zod";
|
||||
import {
|
||||
initiateGenerateHLS,
|
||||
readVideoStream,
|
||||
videoStreamDone,
|
||||
writeVideoStream,
|
||||
} from "../utils/native-stream";
|
||||
import { downloadManager } from "./download";
|
||||
import {
|
||||
@@ -33,9 +30,12 @@ interface VideoProcessingQueueItem {
|
||||
*/
|
||||
file: EnteFile;
|
||||
/**
|
||||
* The contents of the {@link file} as the newly uploaded {@link UploadItem}.
|
||||
* The {@link UploadItem} if available for the newly uploaded {@link file}.
|
||||
*
|
||||
* If present, this serves as an optimization allowing us to directly read
|
||||
* the file off the user's filesystem.
|
||||
*/
|
||||
uploadItem: UploadItem;
|
||||
uploadItem: UploadItem | undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -399,104 +399,50 @@ const processQueueItem = async (
|
||||
) => {
|
||||
log.debug(() => ["gen-hls", { file, uploadItem }]);
|
||||
|
||||
const fileBlob = await fetchOriginalVideoBlob(file, uploadItem);
|
||||
const sourceVideo = uploadItem ?? (await downloadManager.fileStream(file));
|
||||
|
||||
// TODO(HLS):
|
||||
const tokens = await writeVideoStream(electron, "generate-hls", fileBlob!);
|
||||
const [playlistToken, videoToken] = [tokens[0]!, tokens[1]!];
|
||||
// [Note: Upload HLS video segment from node side]
|
||||
//
|
||||
// The generated video can be huge (multi-GB), too large to read it into
|
||||
// memory as an arrayBuffer.
|
||||
//
|
||||
// One option was to chain the video stream response (from the node side)
|
||||
// directly into a fetch request to `objectUploadURL`, however that requires
|
||||
// HTTP/2 (our servers support it, but self hosters' might not). Also that
|
||||
// approach won't work with retries on transient failures unless we
|
||||
// duplicate the stream beforehand, which invalidates the point of
|
||||
// streaming.
|
||||
//
|
||||
// So instead we provide the presigned upload URL to the node side so that
|
||||
// it can directly upload the generated video segments.
|
||||
const { objectID, url: objectUploadURL } =
|
||||
await getFilePreviewDataUploadURL(file);
|
||||
|
||||
log.info(`Generate HLS for ${fileLogID(file)} | start`);
|
||||
|
||||
const { playlistToken, dimensions, videoSize } = await initiateGenerateHLS(
|
||||
electron,
|
||||
sourceVideo!,
|
||||
objectUploadURL,
|
||||
);
|
||||
|
||||
try {
|
||||
const playlistBlob = await readVideoStream(
|
||||
electron,
|
||||
playlistToken,
|
||||
).then((res) => res.blob());
|
||||
|
||||
const { objectID, url: objectUploadURL } =
|
||||
await getFilePreviewDataUploadURL(file);
|
||||
|
||||
// TOOD(HLS): Move this to the node side.
|
||||
const videoBlob = await readVideoStream(electron, videoToken).then(
|
||||
(res) => res.blob(),
|
||||
);
|
||||
|
||||
const objectSize = videoBlob.size;
|
||||
|
||||
// Video conversion is non-trivial effort, and the video segment file
|
||||
// we're uploading is potentially very large, so add retries to avoid
|
||||
// the need to redo everything in case of transient errors.
|
||||
await retryEnsuringHTTPOkOr4xx(() =>
|
||||
fetch(objectUploadURL, { method: "PUT", body: videoBlob }),
|
||||
const playlist = await readVideoStream(electron, playlistToken).then(
|
||||
(res) => res.text(),
|
||||
);
|
||||
|
||||
const playlistData = await encodePlaylistJSON({
|
||||
type: "hls_video",
|
||||
playlist: await playlistBlob.text(),
|
||||
// TODO(HLS): Critical, fix this before any use.
|
||||
width: 1280,
|
||||
// TODO(HLS): Critical, fix this before any use.
|
||||
height: 720,
|
||||
size: objectSize,
|
||||
playlist,
|
||||
...dimensions,
|
||||
size: videoSize,
|
||||
});
|
||||
|
||||
await putVideoData(file, playlistData, objectID, objectSize);
|
||||
await putVideoData(file, playlistData, objectID, videoSize);
|
||||
|
||||
log.info(`Generate HLS for ${fileLogID(file)} | done`);
|
||||
} finally {
|
||||
await Promise.all([
|
||||
videoStreamDone(electron, playlistToken),
|
||||
videoStreamDone(electron, videoToken),
|
||||
]);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Return a blob containing the contents of the given video file.
|
||||
*
|
||||
* The blob is either constructed using the given {@link uploadItem} if present,
|
||||
* otherwise it is downloaded from remote.
|
||||
*
|
||||
* @param file An {@link EnteFile} of type {@link FileType.video}.
|
||||
*
|
||||
* @param uploadItem If we're called during the upload process, then this will
|
||||
* be set to the {@link UploadItem} that was uploaded. This way, we can directly
|
||||
* use the on-disk file instead of needing to download the original from remote.
|
||||
*/
|
||||
const fetchOriginalVideoBlob = async (
|
||||
file: EnteFile,
|
||||
uploadItem: UploadItem | undefined,
|
||||
): Promise<Blob | ReadableStream | null> =>
|
||||
uploadItem
|
||||
? fetchOriginalVideoUploadItemBlob(file, uploadItem)
|
||||
: await downloadManager.fileStream(file);
|
||||
|
||||
const fetchOriginalVideoUploadItemBlob = (
|
||||
_: EnteFile,
|
||||
uploadItem: UploadItem,
|
||||
) => {
|
||||
// TODO(HLS): Commented below is the implementation that the eventual
|
||||
// desktop only conversion would need to handle - the conversion logic would
|
||||
// need to move to the desktop side to allow it to handle large videos.
|
||||
//
|
||||
// Meanwhile during development, we assume we're on the happy web-only cases
|
||||
// (dragging and dropping a file). All this code is behind a development
|
||||
// feature flag, so it is not going to impact end users.
|
||||
|
||||
if (typeof uploadItem == "string" || Array.isArray(uploadItem)) {
|
||||
throw new Error("Not implemented");
|
||||
// const { response, lastModifiedMs } = await readStream(
|
||||
// ensureElectron(),
|
||||
// uploadItem,
|
||||
// );
|
||||
// const path = typeof uploadItem == "string" ? uploadItem : uploadItem[1];
|
||||
// // This function will not be called for videos, and for images
|
||||
// // it is reasonable to read the entire stream into memory here.
|
||||
// return new File([await response.arrayBuffer()], basename(path), {
|
||||
// lastModified: lastModifiedMs,
|
||||
// });
|
||||
} else {
|
||||
if (uploadItem instanceof File) {
|
||||
return uploadItem;
|
||||
} else {
|
||||
return uploadItem.file;
|
||||
}
|
||||
await Promise.all([videoStreamDone(electron, playlistToken)]);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
|
||||
import type { Electron, ElectronMLWorker, ZipItem } from "ente-base/types/ipc";
|
||||
import { z } from "zod";
|
||||
import type { UploadItem } from "../services/upload";
|
||||
|
||||
/**
|
||||
* Stream the given file or zip entry from the user's local file system.
|
||||
@@ -119,57 +120,125 @@ export const writeStream = async (
|
||||
};
|
||||
|
||||
/**
|
||||
* One of the predefined operations to perform when invoking
|
||||
* {@link writeVideoStream} or {@link readVideoStream}.
|
||||
* Initate a conversion to MP4, streaming the video contents to the node side.
|
||||
*
|
||||
* - "convert-to-mp4" (See: [Note: Convert to MP4])
|
||||
* This is a variant of {@link writeStream} tailored for the conversion to MP4.
|
||||
*
|
||||
* - "generate-hls" (See: [Note: Preview variant of videos])
|
||||
* @param _ An {@link Electron} instance, witness to the fact that we're running
|
||||
* in the context of the desktop app. It is otherwise not used.
|
||||
*
|
||||
* @param video A {@link Blob} containing the video to convert.
|
||||
*
|
||||
* @returns a token that can then be passed to {@link readVideoStream} to
|
||||
* retrieve the converted MP4 file. This three step sequence (write/read/done)
|
||||
* can then be ended by using {@link videoStreamDone}).
|
||||
*
|
||||
* See: [Note: Convert to MP4].
|
||||
*/
|
||||
type VideoStreamOp = "convert-to-mp4" | "generate-hls";
|
||||
export const initiateConvertToMP4 = async (
|
||||
_: Electron,
|
||||
video: Blob,
|
||||
): Promise<string> => {
|
||||
const url = "stream://video?op=convert-to-mp4";
|
||||
const res = await fetch(url, { method: "POST", body: video });
|
||||
if (!res.ok)
|
||||
throw new Error(`Failed to write stream to ${url}: HTTP ${res.status}`);
|
||||
return res.text();
|
||||
};
|
||||
|
||||
const GenerateHLSResult = z.object({
|
||||
/**
|
||||
* A token that can be used to passed to {@link readVideoStream} to retrieve
|
||||
* the generated HLS playlist.
|
||||
*/
|
||||
playlistToken: z.string(),
|
||||
/**
|
||||
* The dimensions (width and height in pixels) of the generated video stream.
|
||||
*/
|
||||
dimensions: z.object({ width: z.number(), height: z.number() }),
|
||||
/**
|
||||
* The size (in bytes) of the file containing the encrypted video segments.
|
||||
*/
|
||||
videoSize: z.number(),
|
||||
});
|
||||
|
||||
export type GenerateHLSResult = z.infer<typeof GenerateHLSResult>;
|
||||
|
||||
/**
|
||||
* Variant of {@link writeStream} tailored for video processing operations.
|
||||
* Initate the generation of a HLS stream, streaming the source video contents
|
||||
* to the node side.
|
||||
*
|
||||
* @param op The operation to perform on this video (the result can then be
|
||||
* later read back in via {@link readVideoStream}, and the sequence ended by
|
||||
* using {@link videoStreamDone}).
|
||||
* This is a variant of {@link writeStream} tailored for the HLS generation. It
|
||||
* is similar to {@link initiateConvertToMP4}, but also supports streaming
|
||||
* {@link UploadItem}s and {@link ReadableStream}s.
|
||||
*
|
||||
* @param video The video to convert, as a {@link Blob} or a
|
||||
* {@link ReadableStream}.
|
||||
* @param _ An {@link Electron} instance, witness to the fact that we're running
|
||||
* in the context of the desktop app. It is otherwise not used.
|
||||
*
|
||||
* @returns an array of token that can then be passed to {@link readVideoStream}
|
||||
* to read back the processed video. The count (and semantics) of the tokens are
|
||||
* dependent on the operation:
|
||||
* @param video The video to convert.
|
||||
*
|
||||
* - "convert-to-mp4" returns a single token (which can be used to retrieve the
|
||||
* converted MP4 file).
|
||||
* - If we're called during the upload process, then this will be set to the
|
||||
* {@link UploadItem} that was uploaded. This way, we can directly use the
|
||||
* on-disk file instead of needing to download the original from remote.
|
||||
*
|
||||
* - "generate-hls" returns two tokens, first one that can be used to retrieve
|
||||
* the generated HLS playlist, and the second one that can be used to retrieve
|
||||
* the video (segments).
|
||||
* - Otherwise it should be a {@link ReadableStream} of the video contents.
|
||||
*
|
||||
* @param objectUploadURL A presigned URL where the video segments should be
|
||||
* uploaded to.
|
||||
*
|
||||
* @returns a token that can be used to retrieve the generated HLS playlist, and
|
||||
* metadata about the generated video (its byte size and dimensions). See {@link
|
||||
* GenerateHLSResult.
|
||||
*
|
||||
* See: [Note: Preview variant of videos].
|
||||
*/
|
||||
export const writeVideoStream = async (
|
||||
export const initiateGenerateHLS = async (
|
||||
_: Electron,
|
||||
op: VideoStreamOp,
|
||||
video: Blob | ReadableStream,
|
||||
): Promise<string[]> => {
|
||||
const url = `stream://video?op=${op}`;
|
||||
video: UploadItem | ReadableStream,
|
||||
objectUploadURL: string,
|
||||
): Promise<GenerateHLSResult> => {
|
||||
const params = new URLSearchParams({ op: "generate-hls", objectUploadURL });
|
||||
|
||||
const req = new Request(url, {
|
||||
let body: ReadableStream | null;
|
||||
if (video instanceof ReadableStream) {
|
||||
body = video;
|
||||
} else {
|
||||
// video is an UploadItem
|
||||
body = null;
|
||||
if (typeof video == "string") {
|
||||
// Path to a regular file on the user's filesystem.
|
||||
params.set("path", video);
|
||||
} else if (Array.isArray(video)) {
|
||||
// Path within a zip file on the user's filesystem.
|
||||
const [zipPath, entryName] = video;
|
||||
params.set("zipPath", zipPath);
|
||||
params.set("entryName", entryName);
|
||||
} else if (video instanceof File) {
|
||||
// A drag and dropped file, but without a path. This is a browser
|
||||
// specific case which shouldn't happen when we're running in the
|
||||
// desktop app. Bail.
|
||||
throw new Error("Unexpected file without path");
|
||||
} else {
|
||||
// A File with a path. Use the path.
|
||||
params.set("path", video.path);
|
||||
}
|
||||
}
|
||||
|
||||
const url = `stream://video?${params.toString()}`;
|
||||
const res = await fetch(url, {
|
||||
method: "POST",
|
||||
body: video,
|
||||
// The duplex option is required when body is a stream.
|
||||
//
|
||||
// @ts-expect-error TypeScript's libdom.d.ts does not include the
|
||||
// "duplex" parameter, e.g. see
|
||||
// https://github.com/node-fetch/node-fetch/issues/1769.
|
||||
duplex: "half",
|
||||
body,
|
||||
});
|
||||
|
||||
const res = await fetch(req);
|
||||
if (!res.ok)
|
||||
throw new Error(`Failed to write stream to ${url}: HTTP ${res.status}`);
|
||||
|
||||
return z.array(z.string()).parse(await res.json());
|
||||
return GenerateHLSResult.parse(await res.json());
|
||||
};
|
||||
|
||||
/**
|
||||
|
||||
@@ -113,42 +113,6 @@ export const withTimeout = async <T>(
|
||||
return Promise.race([promiseAndCancelTimeout(), rejectOnTimeout]);
|
||||
};
|
||||
|
||||
/**
|
||||
* Retry a async operation like a HTTP request 3 (+ 1 original) times with
|
||||
* exponential backoff.
|
||||
*
|
||||
* @param op A function that performs the operation, returning the promise for
|
||||
* its completion.
|
||||
*
|
||||
* @param abortIfNeeded An optional function that is called with the
|
||||
* corresponding error whenever {@link op} rejects. It should throw the error if
|
||||
* the retries should immediately be aborted.
|
||||
*
|
||||
* @returns A promise that fulfills with to the result of a first successfully
|
||||
* fulfilled promise of the 4 (1 + 3) attempts, or rejects with the error
|
||||
* obtained either when {@link abortIfNeeded} throws, or with the error from the
|
||||
* last attempt otherwise.
|
||||
*/
|
||||
export const retryAsyncOperation = async <T>(
|
||||
op: () => Promise<T>,
|
||||
abortIfNeeded?: (error: unknown) => void,
|
||||
): Promise<T> => {
|
||||
const waitTimeBeforeNextTry = [2000, 5000, 10000];
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
return await op();
|
||||
} catch (e) {
|
||||
if (abortIfNeeded) {
|
||||
abortIfNeeded(e);
|
||||
}
|
||||
const t = waitTimeBeforeNextTry.shift();
|
||||
if (!t) throw e;
|
||||
await wait(t);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* A promise queue to serialize execution of bunch of promises.
|
||||
*
|
||||
|
||||
Reference in New Issue
Block a user