[desktop] Streaming IPC 🚅 - Proof of concept (#1452)

This commit is contained in:
Manav Rathi
2024-04-15 16:05:23 +05:30
committed by GitHub
14 changed files with 319 additions and 145 deletions

View File

@@ -9,7 +9,7 @@
* https://www.electronjs.org/docs/latest/tutorial/process-model#the-main-process
*/
import { nativeImage } from "electron";
import { app, BrowserWindow, Menu, Tray } from "electron/main";
import { app, BrowserWindow, Menu, protocol, Tray } from "electron/main";
import serveNextAt from "next-electron-server";
import { existsSync } from "node:fs";
import fs from "node:fs/promises";
@@ -27,6 +27,7 @@ import { setupAutoUpdater } from "./main/services/app-update";
import autoLauncher from "./main/services/autoLauncher";
import { initWatcher } from "./main/services/chokidar";
import { userPreferences } from "./main/stores/user-preferences";
import { registerStreamProtocol } from "./main/stream";
import { isDev } from "./main/util";
/**
@@ -58,6 +59,21 @@ export const allowWindowClose = (): void => {
shouldAllowWindowClose = true;
};
/**
* Log a standard startup banner.
*
* This helps us identify app starts and other environment details in the logs.
*/
const logStartupBanner = () => {
const version = isDev ? "dev" : app.getVersion();
log.info(`Starting ente-photos-desktop ${version}`);
const platform = process.platform;
const osRelease = os.release();
const systemVersion = process.getSystemVersion();
log.info("Running on", { platform, osRelease, systemVersion });
};
/**
* next-electron-server allows up to directly use the output of `next build` in
* production mode and `next dev` in development mode, whilst keeping the rest
@@ -74,18 +90,57 @@ export const allowWindowClose = (): void => {
const setupRendererServer = () => serveNextAt(rendererURL);
/**
* Log a standard startup banner.
* Register privileged schemes.
*
* This helps us identify app starts and other environment details in the logs.
* We have two privileged schemes:
*
* 1. "ente", used for serving our web app (@see {@link setupRendererServer}).
*
* 2. "stream", used for streaming IPC (@see {@link registerStreamProtocol}).
*
* Both of these need some privileges, however, the documentation for Electron's
* [registerSchemesAsPrivileged](https://www.electronjs.org/docs/latest/api/protocol)
* says:
*
* > This method ... can be called only once.
*
* The library we use for the "ente" scheme, next-electron-server, already calls
* it once when we invoke {@link setupRendererServer}.
*
* In practice calling it multiple times just causes the values to be
* overwritten, and the last call wins. So we don't need to modify
* next-electron-server to prevent it from calling registerSchemesAsPrivileged.
* Instead, we (a) repeat what next-electron-server had done here, and (b)
* ensure that we're called after {@link setupRendererServer}.
*/
const logStartupBanner = () => {
const version = isDev ? "dev" : app.getVersion();
log.info(`Starting ente-photos-desktop ${version}`);
const registerPrivilegedSchemes = () => {
protocol.registerSchemesAsPrivileged([
{
// Taken verbatim from next-electron-server's code (index.js)
scheme: "ente",
privileges: {
standard: true,
secure: true,
allowServiceWorkers: true,
supportFetchAPI: true,
corsEnabled: true,
},
},
{
scheme: "stream",
privileges: {
// TODO(MR): Remove the commented bits if we don't end up
// needing them by the time the IPC refactoring is done.
const platform = process.platform;
const osRelease = os.release();
const systemVersion = process.getSystemVersion();
log.info("Running on", { platform, osRelease, systemVersion });
// Prevent the insecure origin issues when fetching this
// secure: true,
// Allow the web fetch API in the renderer to use this scheme.
supportFetchAPI: true,
// Allow it to be used with video tags.
// stream: true,
},
},
]);
};
/**
@@ -251,8 +306,10 @@ const main = () => {
let mainWindow: BrowserWindow | undefined;
initLogging();
setupRendererServer();
logStartupBanner();
// The order of the next two calls is important
setupRendererServer();
registerPrivilegedSchemes();
increaseDiskCache();
app.on("second-instance", () => {
@@ -269,11 +326,11 @@ const main = () => {
// Note that some Electron APIs can only be used after this event occurs.
app.on("ready", async () => {
mainWindow = await createMainWindow();
const watcher = initWatcher(mainWindow);
setupTrayItem(mainWindow);
Menu.setApplicationMenu(await createApplicationMenu(mainWindow));
setupTrayItem(mainWindow);
attachIPCHandlers();
attachFSWatchIPCHandlers(watcher);
attachFSWatchIPCHandlers(initWatcher(mainWindow));
registerStreamProtocol();
if (!isDev) setupAutoUpdater(mainWindow);
handleDownloads(mainWindow);
handleExternalLinks(mainWindow);

View File

@@ -1,9 +1,9 @@
/**
* @file file system related functions exposed over the context bridge.
*/
import { createWriteStream, existsSync } from "node:fs";
import { existsSync } from "node:fs";
import fs from "node:fs/promises";
import { Readable } from "node:stream";
import { writeStream } from "./stream";
export const fsExists = (path: string) => existsSync(path);
@@ -17,79 +17,16 @@ export const fsRmdir = (path: string) => fs.rmdir(path);
export const fsRm = (path: string) => fs.rm(path);
/**
* Write a (web) ReadableStream to a file at the given {@link filePath}.
*
* The returned promise resolves when the write completes.
*
* @param filePath The local filesystem path where the file should be written.
* @param readableStream A [web
* ReadableStream](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream)
*/
export const writeStream = (filePath: string, readableStream: ReadableStream) =>
writeNodeStream(filePath, convertWebReadableStreamToNode(readableStream));
export const fsReadTextFile = async (filePath: string) =>
fs.readFile(filePath, "utf-8");
/**
* Convert a Web ReadableStream into a Node.js ReadableStream
*
* This can be used to, for example, write a ReadableStream obtained via
* `net.fetch` into a file using the Node.js `fs` APIs
*/
const convertWebReadableStreamToNode = (readableStream: ReadableStream) => {
const reader = readableStream.getReader();
const rs = new Readable();
rs._read = async () => {
try {
const result = await reader.read();
if (!result.done) {
rs.push(Buffer.from(result.value));
} else {
rs.push(null);
return;
}
} catch (e) {
rs.emit("error", e);
}
};
return rs;
};
const writeNodeStream = async (
filePath: string,
fileStream: NodeJS.ReadableStream,
) => {
const writeable = createWriteStream(filePath);
fileStream.on("error", (error) => {
writeable.destroy(error); // Close the writable stream with an error
});
fileStream.pipe(writeable);
await new Promise((resolve, reject) => {
writeable.on("finish", resolve);
writeable.on("error", async (e: unknown) => {
if (existsSync(filePath)) {
await fs.unlink(filePath);
}
reject(e);
});
});
};
export const fsWriteFile = (path: string, contents: string) =>
fs.writeFile(path, contents);
/* TODO: Audit below this */
export const saveStreamToDisk = writeStream;
export const saveFileToDisk = (path: string, contents: string) =>
fs.writeFile(path, contents);
export const readTextFile = async (filePath: string) =>
fs.readFile(filePath, "utf-8");
export const isFolder = async (dirPath: string) => {
if (!existsSync(dirPath)) return false;
const stats = await fs.stat(dirPath);

View File

@@ -20,12 +20,12 @@ import {
import {
fsExists,
fsMkdirIfNeeded,
fsReadTextFile,
fsRename,
fsRm,
fsRmdir,
fsWriteFile,
isFolder,
readTextFile,
saveFileToDisk,
saveStreamToDisk,
} from "./fs";
import { logToDisk } from "./log";
@@ -113,6 +113,26 @@ export const attachIPCHandlers = () => {
ipcMain.on("skipAppUpdate", (_, version) => skipAppUpdate(version));
// - FS
ipcMain.handle("fsExists", (_, path) => fsExists(path));
ipcMain.handle("fsRename", (_, oldPath: string, newPath: string) =>
fsRename(oldPath, newPath),
);
ipcMain.handle("fsMkdirIfNeeded", (_, dirPath) => fsMkdirIfNeeded(dirPath));
ipcMain.handle("fsRmdir", (_, path: string) => fsRmdir(path));
ipcMain.handle("fsRm", (_, path: string) => fsRm(path));
ipcMain.handle("fsReadTextFile", (_, path: string) => fsReadTextFile(path));
ipcMain.handle("fsWriteFile", (_, path: string, contents: string) =>
fsWriteFile(path, contents),
);
// - Conversion
ipcMain.handle("convertToJPEG", (_, fileData, filename) =>
@@ -164,20 +184,6 @@ export const attachIPCHandlers = () => {
ipcMain.handle("showUploadZipDialog", () => showUploadZipDialog());
// - FS
ipcMain.handle("fsExists", (_, path) => fsExists(path));
ipcMain.handle("fsRename", (_, oldPath: string, newPath: string) =>
fsRename(oldPath, newPath),
);
ipcMain.handle("fsMkdirIfNeeded", (_, dirPath) => fsMkdirIfNeeded(dirPath));
ipcMain.handle("fsRmdir", (_, path: string) => fsRmdir(path));
ipcMain.handle("fsRm", (_, path: string) => fsRm(path));
// - FS Legacy
ipcMain.handle(
@@ -186,12 +192,6 @@ export const attachIPCHandlers = () => {
saveStreamToDisk(path, fileStream),
);
ipcMain.handle("saveFileToDisk", (_, path: string, contents: string) =>
saveFileToDisk(path, contents),
);
ipcMain.handle("readTextFile", (_, path: string) => readTextFile(path));
ipcMain.handle("isFolder", (_, dirPath: string) => isFolder(dirPath));
// - Upload

View File

@@ -2,7 +2,7 @@ import pathToFfmpeg from "ffmpeg-static";
import { existsSync } from "node:fs";
import fs from "node:fs/promises";
import { ElectronFile } from "../../types/ipc";
import { writeStream } from "../fs";
import { writeStream } from "../stream";
import log from "../log";
import { generateTempFilePath, getTempDirPath } from "../temp";
import { execAsync } from "../util";

View File

@@ -2,7 +2,7 @@ import { existsSync } from "fs";
import fs from "node:fs/promises";
import path from "path";
import { CustomErrors, ElectronFile } from "../../types/ipc";
import { writeStream } from "../fs";
import { writeStream } from "../stream";
import log from "../log";
import { isPlatform } from "../platform";
import { generateTempFilePath } from "../temp";

View File

@@ -11,7 +11,7 @@ import fs from "node:fs/promises";
import * as ort from "onnxruntime-node";
import Tokenizer from "../../thirdparty/clip-bpe-ts/mod";
import { CustomErrors } from "../../types/ipc";
import { writeStream } from "../fs";
import { writeStream } from "../stream";
import log from "../log";
import { generateTempFilePath } from "../temp";
import { deleteTempFile } from "./ffmpeg";

View File

@@ -15,7 +15,7 @@ import { existsSync } from "fs";
import fs from "node:fs/promises";
import path from "node:path";
import * as ort from "onnxruntime-node";
import { writeStream } from "../fs";
import { writeStream } from "../stream";
import log from "../log";
/**

114
desktop/src/main/stream.ts Normal file
View File

@@ -0,0 +1,114 @@
/**
* @file stream data to-from renderer using a custom protocol handler.
*/
import { protocol } from "electron/main";
import { createWriteStream, existsSync } from "node:fs";
import fs from "node:fs/promises";
import { Readable } from "node:stream";
import log from "./log";
/**
* Register a protocol handler that we use for streaming large files between the
* main process (node) and the renderer process (browser) layer.
*
* [Note: IPC streams]
*
* When running without node integration, there is no direct way to pass streams
* across IPC. And passing the entire contents of the file is not feasible for
* large video files because of the memory pressure the copying would entail.
*
* As an alternative, we register a custom protocol handler that can provided a
* bi-directional stream. The renderer can stream data to the node side by
* streaming the request. The node side can stream to the renderer side by
* streaming the response.
*
* See also: [Note: Transferring large amount of data over IPC]
*
* Depends on {@link registerPrivilegedSchemes}.
*/
export const registerStreamProtocol = () => {
protocol.handle("stream", async (request: Request) => {
const url = request.url;
const { host, pathname } = new URL(url);
switch (host) {
/* stream://write//path/to/file */
/* -host/pathname----- */
case "write":
try {
await writeStream(pathname, request.body);
return new Response("", { status: 200 });
} catch (e) {
log.error(`Failed to write stream for ${url}`, e);
return new Response(
`Failed to write stream: ${e.message}`,
{ status: 500 },
);
}
default:
return new Response("", { status: 404 });
}
});
};
/**
* Write a (web) ReadableStream to a file at the given {@link filePath}.
*
* The returned promise resolves when the write completes.
*
* @param filePath The local filesystem path where the file should be written.
* @param readableStream A [web
* ReadableStream](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream)
*/
export const writeStream = (filePath: string, readableStream: ReadableStream) =>
writeNodeStream(filePath, convertWebReadableStreamToNode(readableStream));
/**
* Convert a Web ReadableStream into a Node.js ReadableStream
*
* This can be used to, for example, write a ReadableStream obtained via
* `net.fetch` into a file using the Node.js `fs` APIs
*/
const convertWebReadableStreamToNode = (readableStream: ReadableStream) => {
const reader = readableStream.getReader();
const rs = new Readable();
rs._read = async () => {
try {
const result = await reader.read();
if (!result.done) {
rs.push(Buffer.from(result.value));
} else {
rs.push(null);
return;
}
} catch (e) {
rs.emit("error", e);
}
};
return rs;
};
const writeNodeStream = async (
filePath: string,
fileStream: NodeJS.ReadableStream,
) => {
const writeable = createWriteStream(filePath);
fileStream.on("error", (error) => {
writeable.destroy(error); // Close the writable stream with an error
});
fileStream.pipe(writeable);
await new Promise((resolve, reject) => {
writeable.on("finish", resolve);
writeable.on("error", async (e: unknown) => {
if (existsSync(filePath)) {
await fs.unlink(filePath);
}
reject(e);
});
});
};

View File

@@ -96,6 +96,8 @@ const skipAppUpdate = (version: string) => {
ipcRenderer.send("skipAppUpdate", version);
};
// - FS
const fsExists = (path: string): Promise<boolean> =>
ipcRenderer.invoke("fsExists", path);
@@ -110,6 +112,12 @@ const fsRmdir = (path: string): Promise<void> =>
const fsRm = (path: string): Promise<void> => ipcRenderer.invoke("fsRm", path);
const fsReadTextFile = (path: string): Promise<string> =>
ipcRenderer.invoke("fsReadTextFile", path);
const fsWriteFile = (path: string, contents: string): Promise<void> =>
ipcRenderer.invoke("fsWriteFile", path, contents);
// - AUDIT below this
// - Conversion
@@ -234,12 +242,6 @@ const saveStreamToDisk = (
fileStream: ReadableStream,
): Promise<void> => ipcRenderer.invoke("saveStreamToDisk", path, fileStream);
const saveFileToDisk = (path: string, contents: string): Promise<void> =>
ipcRenderer.invoke("saveFileToDisk", path, contents);
const readTextFile = (path: string): Promise<string> =>
ipcRenderer.invoke("readTextFile", path);
const isFolder = (dirPath: string): Promise<boolean> =>
ipcRenderer.invoke("isFolder", dirPath);
@@ -298,7 +300,8 @@ const getDirFiles = (dirPath: string): Promise<ElectronFile[]> =>
// https://www.electronjs.org/docs/latest/api/context-bridge#methods
//
// The copy itself is relatively fast, but the problem with transfering large
// amounts of data is potentially running out of memory during the copy.
// amounts of data is potentially running out of memory during the copy. For an
// alternative, see [Note: IPC streams].
contextBridge.exposeInMainWorld("electron", {
// - General
appVersion,
@@ -316,6 +319,17 @@ contextBridge.exposeInMainWorld("electron", {
updateOnNextRestart,
skipAppUpdate,
// - FS
fs: {
exists: fsExists,
rename: fsRename,
mkdirIfNeeded: fsMkdirIfNeeded,
rmdir: fsRmdir,
rm: fsRm,
readTextFile: fsReadTextFile,
writeFile: fsWriteFile,
},
// - Conversion
convertToJPEG,
generateImageThumbnail,
@@ -341,20 +355,9 @@ contextBridge.exposeInMainWorld("electron", {
updateWatchMappingSyncedFiles,
updateWatchMappingIgnoredFiles,
// - FS
fs: {
exists: fsExists,
rename: fsRename,
mkdirIfNeeded: fsMkdirIfNeeded,
rmdir: fsRmdir,
rm: fsRm,
},
// - FS legacy
// TODO: Move these into fs + document + rename if needed
saveStreamToDisk,
saveFileToDisk,
readTextFile,
isFolder,
// - Upload

View File

@@ -7,7 +7,6 @@ import { CustomError } from "@ente/shared/error";
import { Events, eventBus } from "@ente/shared/events";
import { Remote } from "comlink";
import { FILE_TYPE } from "constants/file";
import isElectron from "is-electron";
import { EnteFile } from "types/file";
import {
generateStreamFromArrayBuffer,
@@ -89,11 +88,12 @@ class DownloadManagerImpl {
e,
);
}
try {
if (isElectron()) this.fileCache = await openCache("files");
} catch (e) {
log.error("Failed to open file cache, will continue without it", e);
}
// TODO (MR): Revisit full file caching cf disk space usage
// try {
// if (isElectron()) this.fileCache = await openCache("files");
// } catch (e) {
// log.error("Failed to open file cache, will continue without it", e);
// }
this.cryptoWorker = await ComlinkCryptoWorker.getInstance();
this.ready = true;
eventBus.on(Events.LOGOUT, this.logoutHandler.bind(this), this);

View File

@@ -1,4 +1,5 @@
import { ensureElectron } from "@/next/electron";
import { isDevBuild } from "@/next/env";
import log from "@/next/log";
import { CustomError } from "@ente/shared/error";
import { Events, eventBus } from "@ente/shared/events";
@@ -884,7 +885,7 @@ class ExportService {
try {
const exportRecord = await this.getExportRecord(folder);
const newRecord: ExportRecord = { ...exportRecord, ...newData };
await ensureElectron().saveFileToDisk(
await ensureElectron().fs.writeFile(
`${folder}/${exportRecordFileName}`,
JSON.stringify(newRecord, null, 2),
);
@@ -907,8 +908,7 @@ class ExportService {
if (!(await fs.exists(exportRecordJSONPath))) {
return this.createEmptyExportRecord(exportRecordJSONPath);
}
const recordFile =
await electron.readTextFile(exportRecordJSONPath);
const recordFile = await fs.readTextFile(exportRecordJSONPath);
try {
return JSON.parse(recordFile);
} catch (e) {
@@ -993,6 +993,46 @@ class ExportService {
fileExportName,
file,
);
// TODO(MR): Productionalize
if (isDevBuild) {
const testStream = new ReadableStream({
async start(controller) {
await sleep(1000);
controller.enqueue("This ");
await sleep(1000);
controller.enqueue("is ");
await sleep(1000);
controller.enqueue("a ");
await sleep(1000);
controller.enqueue("test");
controller.close();
},
}).pipeThrough(new TextEncoderStream());
console.log({ a: "will send req", updatedFileStream });
// The duplex parameter needs to be set to 'half' when
// streaming requests.
//
// Currently browsers, and specifically in our case,
// since this code runs only within our desktop
// (Electron) app, Chromium, don't support 'full' duplex
// mode (i.e. streaming both the request and the
// response).
//
// https://developer.chrome.com/docs/capabilities/web-apis/fetch-streaming-requests
//
// In another twist, the TypeScript libdom.d.ts does not
// include the "duplex" parameter, so we need to cast to
// get TypeScript to let this code through. e.g. see
// https://github.com/node-fetch/node-fetch/issues/1769
const req = new Request("stream://write/tmp/foo.txt", {
method: "POST",
// body: updatedFileStream,
body: testStream,
duplex: "half",
} as unknown as RequestInit);
const res = await fetch(req);
console.log({ a: "got res", res });
}
await electron.saveStreamToDisk(
`${collectionExportPath}/${fileExportName}`,
updatedFileStream,
@@ -1077,7 +1117,7 @@ class ExportService {
fileExportName: string,
file: EnteFile,
) {
await ensureElectron().saveFileToDisk(
await ensureElectron().fs.writeFile(
getFileMetadataExportPath(collectionExportPath, fileExportName),
getGoogleLikeMetadataFile(fileExportName, file),
);
@@ -1106,7 +1146,7 @@ class ExportService {
private createEmptyExportRecord = async (exportRecordJSONPath: string) => {
const exportRecord: ExportRecord = NULL_EXPORT_RECORD;
await ensureElectron().saveFileToDisk(
await ensureElectron().fs.writeFile(
exportRecordJSONPath,
JSON.stringify(exportRecord, null, 2),
);

View File

@@ -59,11 +59,21 @@ const nextConfig = {
GIT_SHA: gitSHA(),
},
// https://dev.to/marcinwosinek/how-to-add-resolve-fallback-to-webpack-5-in-nextjs-10-i6j
// Customize the webpack configuration used by Next.js
webpack: (config, { isServer }) => {
// https://dev.to/marcinwosinek/how-to-add-resolve-fallback-to-webpack-5-in-nextjs-10-i6j
if (!isServer) {
config.resolve.fallback.fs = false;
}
// Suppress the warning "Critical dependency: require function is used
// in a way in which dependencies cannot be statically extracted" when
// import heic-convert.
//
// Upstream issue, which currently doesn't have a workaround.
// https://github.com/catdad-experiments/libheif-js/issues/23
config.ignoreWarnings = [{ module: /libheif-js/ }];
return config;
},
};

View File

@@ -188,6 +188,17 @@ export interface Electron {
* Delete the file at {@link path}.
*/
rm: (path: string) => Promise<void>;
/** Read the string contents of a file at {@link path}. */
readTextFile: (path: string) => Promise<string>;
/**
* Write a string to a file, replacing the file if it already exists.
*
* @param path The path of the file.
* @param contents The string contents to write.
*/
writeFile: (path: string, contents: string) => Promise<void>;
};
/*
@@ -304,8 +315,6 @@ export interface Electron {
path: string,
fileStream: ReadableStream,
) => Promise<void>;
saveFileToDisk: (path: string, contents: string) => Promise<void>;
readTextFile: (path: string) => Promise<string>;
isFolder: (dirPath: string) => Promise<boolean>;
// - Upload

View File

@@ -1,7 +1,11 @@
export async function sleep(time: number) {
await new Promise((resolve) => {
setTimeout(() => resolve(null), time);
});
/**
* Wait for {@link ms} milliseconds
*
* This function is a promisified `setTimeout`. It returns a promise that
* resolves after {@link ms} milliseconds.
*/
export async function sleep(ms: number) {
await new Promise((resolve) => setTimeout(resolve, ms));
}
export function downloadAsFile(filename: string, content: string) {