From 132ddd3648f381e11440444e8b8d7aacdab64f95 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Thu, 23 May 2024 09:48:52 +0530 Subject: [PATCH 1/3] Rework the video chunk decryptor stream logic When running on Ubuntu 24 arm64 in the desktop app (didn't test on web0, trying to open certain videos fails with: > [rndr] [error] Failed to process file stream: TypeError: Failed to execute 'enqueue' on 'ReadableStreamDefaultController': Cannot enqueue a chunk into a closed readable stream While not specifically fixing that issue, I'm first rewriting this to use the more normal (recommended?) approach of implementing a pull instead of doing everything in start. Maybe that fixes the issue, otherwise at least one less ghost for me to worry about. --- .../photos/src/services/download/index.ts | 122 ++++++++---------- 1 file changed, 55 insertions(+), 67 deletions(-) diff --git a/web/apps/photos/src/services/download/index.ts b/web/apps/photos/src/services/download/index.ts index d0be660c9f..8c8a1dd95c 100644 --- a/web/apps/photos/src/services/download/index.ts +++ b/web/apps/photos/src/services/download/index.ts @@ -318,76 +318,64 @@ class DownloadManagerImpl { const contentLength = +res.headers.get("Content-Length") ?? 0; let downloadedBytes = 0; + const decryptionHeader = await this.cryptoWorker.fromB64( + file.file.decryptionHeader, + ); + const fileKey = await this.cryptoWorker.fromB64(file.key); + const { pullState, decryptionChunkSize } = + await this.cryptoWorker.initChunkDecryption( + decryptionHeader, + fileKey, + ); + + let leftoverBytes = new Uint8Array(); + const stream = new ReadableStream({ - start: async (controller) => { - try { - const decryptionHeader = await this.cryptoWorker.fromB64( - file.file.decryptionHeader, - ); - const fileKey = await this.cryptoWorker.fromB64(file.key); - const { pullState, decryptionChunkSize } = - await this.cryptoWorker.initChunkDecryption( - decryptionHeader, - fileKey, + pull: async (controller) => { + // done is a boolean and value is an Uint8Array. When done is + // true value will be empty, otherwise present. + const { done, value } = await reader.read(); + + let data: Uint8Array; + if (done) { + data = leftoverBytes; + } else { + downloadedBytes += value.length; + onDownloadProgress({ + loaded: downloadedBytes, + total: contentLength, + }); + + data = new Uint8Array(leftoverBytes.length + value.length); + data.set(new Uint8Array(leftoverBytes), 0); + data.set(new Uint8Array(value), leftoverBytes.length); + } + + // data.length might be a multiple of decryptionChunkSize, and + // we might need multiple iterations to drain it all. + + while (data && data.length >= decryptionChunkSize) { + const { decryptedData } = + await this.cryptoWorker.decryptFileChunk( + data.slice(0, decryptionChunkSize), + pullState, ); + controller.enqueue(decryptedData); + data = data.slice(decryptionChunkSize); + } - let data = new Uint8Array(); - let more = true; - while (more) { - more = false; - - // "done" is a Boolean and value a "Uint8Array" - const { done, value } = await reader.read(); - - // Is there more data to read? - if (!done) { - downloadedBytes += value.length; - onDownloadProgress({ - loaded: downloadedBytes, - total: contentLength, - }); - - const buffer = new Uint8Array( - data.length + value.length, - ); - buffer.set(new Uint8Array(data), 0); - buffer.set(new Uint8Array(value), data.length); - - // Note that buffer.length might be a multiple of - // decryptionChunkSize. We let these accumulate, and - // drain it all with a nested while loop when done. - - if (buffer.length > decryptionChunkSize) { - const { decryptedData } = - await this.cryptoWorker.decryptFileChunk( - buffer.slice(0, decryptionChunkSize), - pullState, - ); - controller.enqueue(decryptedData); - data = buffer.slice(decryptionChunkSize); - } else { - data = buffer; - } - more = true; - } else { - while (data && data.length) { - const { decryptedData } = - await this.cryptoWorker.decryptFileChunk( - data.slice(0, decryptionChunkSize), - pullState, - ); - controller.enqueue(decryptedData); - data = - data.length > decryptionChunkSize - ? data.slice(decryptionChunkSize) - : undefined; - } - controller.close(); - } - } - } catch (e) { - log.error("Failed to process file stream", e); - controller.error(e); + if (done) { + // Send off the last one, no more bytes are going to come. + const { decryptedData } = + await this.cryptoWorker.decryptFileChunk( + data, + pullState, + ); + controller.enqueue(decryptedData); + controller.close(); + } else { + // Save it for the next pull. + leftoverBytes = data; } }, }); From 8a2117f9d430b1afe6565c743ea5606425948492 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Thu, 23 May 2024 10:05:14 +0530 Subject: [PATCH 2/3] Chunk --- .../photos/src/services/download/index.ts | 97 ++++++++++--------- 1 file changed, 53 insertions(+), 44 deletions(-) diff --git a/web/apps/photos/src/services/download/index.ts b/web/apps/photos/src/services/download/index.ts index 8c8a1dd95c..ee7e91fb5d 100644 --- a/web/apps/photos/src/services/download/index.ts +++ b/web/apps/photos/src/services/download/index.ts @@ -330,57 +330,66 @@ class DownloadManagerImpl { let leftoverBytes = new Uint8Array(); - const stream = new ReadableStream({ + return new ReadableStream({ pull: async (controller) => { - // done is a boolean and value is an Uint8Array. When done is - // true value will be empty, otherwise present. - const { done, value } = await reader.read(); + // Each time pull is called, we want to enqueue at least once. + let didEnqueue = false; + do { + // done is a boolean and value is an Uint8Array. When done + // is true value will be empty. + const { done, value } = await reader.read(); - let data: Uint8Array; - if (done) { - data = leftoverBytes; - } else { - downloadedBytes += value.length; - onDownloadProgress({ - loaded: downloadedBytes, - total: contentLength, - }); + let data: Uint8Array; + if (done) { + data = leftoverBytes; + } else { + downloadedBytes += value.length; + onDownloadProgress({ + loaded: downloadedBytes, + total: contentLength, + }); - data = new Uint8Array(leftoverBytes.length + value.length); - data.set(new Uint8Array(leftoverBytes), 0); - data.set(new Uint8Array(value), leftoverBytes.length); - } - - // data.length might be a multiple of decryptionChunkSize, and - // we might need multiple iterations to drain it all. - - while (data && data.length >= decryptionChunkSize) { - const { decryptedData } = - await this.cryptoWorker.decryptFileChunk( - data.slice(0, decryptionChunkSize), - pullState, + data = new Uint8Array( + leftoverBytes.length + value.length, ); - controller.enqueue(decryptedData); - data = data.slice(decryptionChunkSize); - } + data.set(new Uint8Array(leftoverBytes), 0); + data.set(new Uint8Array(value), leftoverBytes.length); + } - if (done) { - // Send off the last one, no more bytes are going to come. - const { decryptedData } = - await this.cryptoWorker.decryptFileChunk( - data, - pullState, - ); - controller.enqueue(decryptedData); - controller.close(); - } else { - // Save it for the next pull. - leftoverBytes = data; - } + // data.length might be a multiple of decryptionChunkSize, + // and we might need multiple iterations to drain it all. + while (data.length >= decryptionChunkSize) { + const { decryptedData } = + await this.cryptoWorker.decryptFileChunk( + data.slice(0, decryptionChunkSize), + pullState, + ); + controller.enqueue(decryptedData); + didEnqueue = true; + data = data.slice(decryptionChunkSize); + } + + if (done) { + // Send off the remaining bytes without waiting for a + // full chunk, no more bytes are going to come. + if (data.length) { + const { decryptedData } = + await this.cryptoWorker.decryptFileChunk( + data, + pullState, + ); + controller.enqueue(decryptedData); + } + // Don't loop again even if we didn't enqueue. + didEnqueue = true; + controller.close(); + } else { + // Save it for the next pull. + leftoverBytes = data; + } + } while (!didEnqueue); }, }); - - return stream; } trackDownloadProgress = (fileID: number, fileSize: number) => { From 3b89471b875b0e3dcae9a99557436c46fee52a97 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Thu, 23 May 2024 10:14:18 +0530 Subject: [PATCH 3/3] Use a standard converter --- web/apps/photos/src/services/download/index.ts | 4 ++-- web/apps/photos/src/services/export/index.ts | 7 ++----- web/apps/photos/src/utils/file/index.ts | 13 ++----------- web/packages/next/blob-cache.ts | 4 ++++ 4 files changed, 10 insertions(+), 18 deletions(-) diff --git a/web/apps/photos/src/services/download/index.ts b/web/apps/photos/src/services/download/index.ts index ee7e91fb5d..eb979af875 100644 --- a/web/apps/photos/src/services/download/index.ts +++ b/web/apps/photos/src/services/download/index.ts @@ -11,7 +11,7 @@ import { Remote } from "comlink"; import isElectron from "is-electron"; import * as ffmpeg from "services/ffmpeg"; import { EnteFile } from "types/file"; -import { generateStreamFromArrayBuffer, getRenderableImage } from "utils/file"; +import { getRenderableImage } from "utils/file"; import { PhotosDownloadClient } from "./clients/photos"; import { PublicAlbumsDownloadClient } from "./clients/publicAlbums"; @@ -289,7 +289,7 @@ class DownloadManagerImpl { await this.cryptoWorker.fromB64(file.file.decryptionHeader), file.key, ); - return generateStreamFromArrayBuffer(decrypted); + return new Response(decrypted).body; } catch (e) { if (e.message === CustomError.PROCESSING_FAILED) { log.error( diff --git a/web/apps/photos/src/services/export/index.ts b/web/apps/photos/src/services/export/index.ts index 3a68837e75..16472b3b4f 100644 --- a/web/apps/photos/src/services/export/index.ts +++ b/web/apps/photos/src/services/export/index.ts @@ -29,7 +29,6 @@ import { getNonEmptyPersonalCollections, } from "utils/collection"; import { - generateStreamFromArrayBuffer, getPersonalFiles, getUpdatedEXIFFileForDownload, mergeMetadata, @@ -1026,7 +1025,6 @@ class ExportService { videoExportName, ); - const imageStream = generateStreamFromArrayBuffer(livePhoto.imageData); await this.saveMetadataFile( collectionExportPath, imageExportName, @@ -1035,10 +1033,9 @@ class ExportService { await writeStream( electron, `${collectionExportPath}/${imageExportName}`, - imageStream, + new Response(livePhoto.imageData).body, ); - const videoStream = generateStreamFromArrayBuffer(livePhoto.videoData); await this.saveMetadataFile( collectionExportPath, videoExportName, @@ -1048,7 +1045,7 @@ class ExportService { await writeStream( electron, `${collectionExportPath}/${videoExportName}`, - videoStream, + new Response(livePhoto.videoData).body, ); } catch (e) { await fs.rm(`${collectionExportPath}/${imageExportName}`); diff --git a/web/apps/photos/src/utils/file/index.ts b/web/apps/photos/src/utils/file/index.ts index af5c06e8e0..f2f9932dd8 100644 --- a/web/apps/photos/src/utils/file/index.ts +++ b/web/apps/photos/src/utils/file/index.ts @@ -262,15 +262,6 @@ export async function decryptFile( } } -export function generateStreamFromArrayBuffer(data: Uint8Array) { - return new ReadableStream({ - async start(controller: ReadableStreamDefaultController) { - controller.enqueue(data); - controller.close(); - }, - }); -} - /** * The returned blob.type is filled in, whenever possible, with the MIME type of * the data that we're dealing with. @@ -649,7 +640,7 @@ async function downloadFileDesktop( imageFileName, fs.exists, ); - const imageStream = generateStreamFromArrayBuffer(imageData); + const imageStream = new Response(imageData).body; await writeStream( electron, `${downloadDir}/${imageExportName}`, @@ -661,7 +652,7 @@ async function downloadFileDesktop( videoFileName, fs.exists, ); - const videoStream = generateStreamFromArrayBuffer(videoData); + const videoStream = new Response(videoData).body; await writeStream( electron, `${downloadDir}/${videoExportName}`, diff --git a/web/packages/next/blob-cache.ts b/web/packages/next/blob-cache.ts index 7223d0fdc1..0db9464521 100644 --- a/web/packages/next/blob-cache.ts +++ b/web/packages/next/blob-cache.ts @@ -136,6 +136,10 @@ export const openBlobCache = async ( * * new Blob([arrayBuffer, andOrAnyArray, andOrstring]) * + * To convert from a Uint8Array/ArrayBuffer/Blob to a ReadableStream + * + * new Response(array).body + * * Refs: * - https://github.com/yigitunallar/arraybuffer-vs-blob * - https://stackoverflow.com/questions/11821096/what-is-the-difference-between-an-arraybuffer-and-a-blob