diff --git a/desktop/src/main/services/ml-util-test.ts b/desktop/src/main/services/ml-util-test.ts deleted file mode 100644 index 54f4cd8e0f..0000000000 --- a/desktop/src/main/services/ml-util-test.ts +++ /dev/null @@ -1,125 +0,0 @@ -// See [Note: Using Electron APIs in UtilityProcess] about what we can and -// cannot import. - -import { ensure, wait } from "../utils/common"; - -/** - * We cannot do - * - * import log from "../log"; - * - * because that requires the Electron APIs that are not available to a utility - * process (See: [Note: Using Electron APIs in UtilityProcess]). But even if - * that were to work, logging will still be problematic since we'd try opening - * the log file from two different Node.js processes (this one, and the main - * one), and I didn't find any indication in the electron-log repository that - * the log file's integrity would be maintained in such cases. - * - * So instead we create this proxy log object that uses `process.parentPort` to - * transport the logs over to the main process. - */ -const log = { - /** - * Unlike the real {@link log.error}, this accepts only the first string - * argument, not the second optional error one. - */ - errorString: (s: string) => mainProcess("log.errorString", s), - info: (...ms: unknown[]) => mainProcess("log.info", ms), - /** - * Unlike the real {@link log.debug}, this is (a) eagerly evaluated, and (b) - * accepts only strings. - */ - debugString: (s: string) => mainProcess("log.debugString", s), -}; - -/** - * Send a message to the main process using a barebones RPC protocol. 
- */ -const mainProcess = (method: string, param: unknown) => - process.parentPort.postMessage({ method, p: param }); - -log.debugString( - `Started ML worker process with args ${process.argv.join(" ")}`, -); - -process.parentPort.once("message", (e) => { - parseInitData(e.data); - - const port = ensure(e.ports[0]); - port.on("message", (request) => { - void handleMessageFromRenderer(request.data).then((response) => - port.postMessage(response), - ); - }); - port.start(); -}); - -/** - * We cannot access Electron's {@link app} object within a utility process, so - * we pass the value of `app.getPath("userData")` during initialization, and it - * can be subsequently retrieved from here. - */ -let _userDataPath: string | undefined; - -/** Equivalent to app.getPath("userData") */ -const userDataPath = () => ensure(_userDataPath); - -const parseInitData = (data: unknown) => { - if ( - data && - typeof data == "object" && - "userDataPateh" in data && - "userDataPath" in data && - typeof data.userDataPath == "string" - ) { - _userDataPath = data.userDataPath; - } else { - log.errorString("Unparseable initialization data"); - } -}; - -/** - * Our hand-rolled RPC handler and router - the Node.js utility process end. - * - * Sibling of the electronMLWorker function (in `ml/worker.ts`) in the web code. - * - * [Note: Node.js ML worker RPC protocol] - * - * - Each RPC call (i.e. request message) has a "method" (string), "id" - * (number) and "p" (arbitrary param). - * - * - Each RPC result (i.e. response message) has an "id" (number) that is the - * same as the "id" for the request which it corresponds to. - * - * - If the RPC call was a success, then the response messege will have an - * "result" (arbitrary result) property. Otherwise it will have a "error" - * (string) property describing what went wrong. 
- */ -const handleMessageFromRenderer = async (m: unknown) => { - if (m && typeof m == "object" && "method" in m && "id" in m && "p" in m) { - const id = m.id; - const p = m.p; - try { - switch (m.method) { - case "foo": - if (p && typeof p == "string") - return { id, result: await foo(p) }; - break; - } - } catch (e) { - return { id, error: e instanceof Error ? e.message : String(e) }; - } - return { id, error: "Unknown message" }; - } - - // We don't even have an "id", so at least log it lest the renderer also - // ignore the "id"-less response. - log.info("Ignoring unknown message", m); - return { error: "Unknown message" }; -}; - -const foo = async (a: string) => { - log.info("got message foo with argument", a, userDataPath()); - await wait(0); - return a.length; -}; diff --git a/desktop/src/main/services/ml-utility.ts b/desktop/src/main/services/ml-worker.ts similarity index 63% rename from desktop/src/main/services/ml-utility.ts rename to desktop/src/main/services/ml-worker.ts index 79d39edea4..255dbd3f5b 100644 --- a/desktop/src/main/services/ml-utility.ts +++ b/desktop/src/main/services/ml-worker.ts @@ -5,15 +5,132 @@ * for various tasks are not shipped with the app but are downloaded on demand. */ +// See [Note: Using Electron APIs in UtilityProcess] about what we can and +// cannot import. + import Tokenizer from "clip-bpe-js"; -import { app, net } from "electron/main"; +import { net } from "electron/main"; import { existsSync } from "fs"; import fs from "node:fs/promises"; import path from "node:path"; import * as ort from "onnxruntime-node"; -import log from "../log"; -import { writeStream } from "../stream"; import { ensure, wait } from "../utils/common"; +import { writeStream } from "../utils/stream"; + +/** + * We cannot do + * + * import log from "../log"; + * + * because that requires the Electron APIs that are not available to a utility + * process (See: [Note: Using Electron APIs in UtilityProcess]). 
But even if + * that were to work, logging will still be problematic since we'd try opening + * the log file from two different Node.js processes (this one, and the main + * one), and I didn't find any indication in the electron-log repository that + * the log file's integrity would be maintained in such cases. + * + * So instead we create this proxy log object that uses `process.parentPort` to + * transport the logs over to the main process. + */ +const log = { + /** + * Unlike the real {@link log.error}, this accepts only the first string + * argument, not the second optional error one. + */ + errorString: (s: string) => mainProcess("log.errorString", s), + info: (...ms: unknown[]) => mainProcess("log.info", ms), + /** + * Unlike the real {@link log.debug}, this is (a) eagerly evaluated, and (b) + * accepts only strings. + */ + debugString: (s: string) => mainProcess("log.debugString", s), +}; + +/** + * Send a message to the main process using a barebones RPC protocol. + */ +const mainProcess = (method: string, param: unknown) => + process.parentPort.postMessage({ method, p: param }); + +log.debugString( + `Started ML worker process with args ${process.argv.join(" ")}`, +); + +process.parentPort.once("message", (e) => { + parseInitData(e.data); + + const port = ensure(e.ports[0]); + port.on("message", (request) => { + void handleMessageFromRenderer(request.data).then((response) => + port.postMessage(response), + ); + }); + port.start(); +}); + +/** + * We cannot access Electron's {@link app} object within a utility process, so + * we pass the value of `app.getPath("userData")` during initialization, and it + * can be subsequently retrieved from here. 
+ */
+let _userDataPath: string | undefined;
+
+/** Equivalent to app.getPath("userData") */
+const userDataPath = () => ensure(_userDataPath);
+
+/** Store the `userDataPath` string from the init message, else log an error. */
+const parseInitData = (data: unknown) => {
+    if (
+        data &&
+        typeof data == "object" &&
+        "userDataPath" in data &&
+        typeof data.userDataPath == "string"
+    ) {
+        _userDataPath = data.userDataPath;
+    } else {
+        log.errorString("Unparseable initialization data");
+    }
+};
+
+/**
+ * Our hand-rolled RPC handler and router - the Node.js utility process end.
+ *
+ * Sibling of the electronMLWorker function (in `ml/worker.ts`) in the web code.
+ *
+ * [Note: Node.js ML worker RPC protocol]
+ *
+ * - Each RPC call (i.e. request message) has a "method" (string), "id"
+ *   (number) and "p" (arbitrary param).
+ *
+ * - Each RPC result (i.e. response message) has an "id" (number) that is the
+ *   same as the "id" for the request which it corresponds to.
+ *
+ * - If the RPC call was a success, then the response message will have a
+ *   "result" (arbitrary result) property. Otherwise it will have an "error"
+ *   (string) property describing what went wrong.
+ */
+const handleMessageFromRenderer = async (m: unknown) => {
+    if (m && typeof m == "object" && "method" in m && "id" in m && "p" in m) {
+        const id = m.id;
+        const p = m.p;
+        try {
+            switch (m.method) {
+                case "foo":
+                    if (p && typeof p == "string")
+                        return { id, result: await foo(p) };
+                    break;
+            }
+        } catch (e) {
+            return { id, error: e instanceof Error ? e.message : String(e) };
+        }
+        return { id, error: "Unknown message" };
+    }
+
+    // We don't even have an "id", so at least log it lest the renderer also
+    // ignore the "id"-less response.
+ log.info("Ignoring unknown message", m); + return { error: "Unknown message" }; +}; /** * Return a function that can be used to trigger a download of the specified @@ -79,7 +196,7 @@ const modelPathDownloadingIfNeeded = async ( } else { const size = (await fs.stat(modelPath)).size; if (size !== expectedByteSize) { - log.error( + log.errorString( `The size ${size} of model ${modelName} does not match the expected size, downloading again`, ); await downloadModel(modelPath, modelName); @@ -91,7 +208,7 @@ const modelPathDownloadingIfNeeded = async ( /** Return the path where the given {@link modelName} is meant to be saved */ const modelSavePath = (modelName: string) => - path.join(app.getPath("userData"), "models", modelName); + path.join(userDataPath(), "models", modelName); const downloadModel = async (saveLocation: string, name: string) => { // `mkdir -p` the directory where we want to save the model. @@ -138,7 +255,7 @@ export const computeCLIPImageEmbedding = async (input: Float32Array) => { input: new ort.Tensor("float32", input, [1, 3, 224, 224]), }; const results = await session.run(feeds); - log.debug(() => `ONNX/CLIP image embedding took ${Date.now() - t} ms`); + log.debugString(`ONNX/CLIP image embedding took ${Date.now() - t} ms`); /* Need these model specific casts to type the result */ return ensure(results.output).data as Float32Array; }; @@ -184,7 +301,7 @@ export const computeCLIPTextEmbeddingIfAvailable = async (text: string) => { }; const results = await session.run(feeds); - log.debug(() => `ONNX/CLIP text embedding took ${Date.now() - t} ms`); + log.debugString(`ONNX/CLIP text embedding took ${Date.now() - t} ms`); return ensure(results.output).data as Float32Array; }; @@ -203,7 +320,7 @@ export const detectFaces = async (input: Float32Array) => { input: new ort.Tensor("float32", input, [1, 3, 640, 640]), }; const results = await session.run(feeds); - log.debug(() => `ONNX/YOLO face detection took ${Date.now() - t} ms`); + 
log.debugString(`ONNX/YOLO face detection took ${Date.now() - t} ms`); return ensure(results.output).data; }; @@ -228,7 +345,7 @@ export const computeFaceEmbeddings = async (input: Float32Array) => { const t = Date.now(); const feeds = { img_inputs: inputTensor }; const results = await session.run(feeds); - log.debug(() => `ONNX/MFNT face embedding took ${Date.now() - t} ms`); + log.debugString(`ONNX/MFNT face embedding took ${Date.now() - t} ms`); /* Need these model specific casts to extract and type the result */ return (results.embeddings as unknown as Record) .cpuData as Float32Array; diff --git a/desktop/src/main/stream.ts b/desktop/src/main/stream.ts index c86232fd64..d32eecc627 100644 --- a/desktop/src/main/stream.ts +++ b/desktop/src/main/stream.ts @@ -3,7 +3,6 @@ */ import { net, protocol } from "electron/main"; import { randomUUID } from "node:crypto"; -import { createWriteStream, existsSync } from "node:fs"; import fs from "node:fs/promises"; import { Readable } from "node:stream"; import { ReadableStream } from "node:stream/web"; @@ -12,6 +11,7 @@ import log from "./log"; import { ffmpegConvertToMP4 } from "./services/ffmpeg"; import { markClosableZip, openZip } from "./services/zip"; import { ensure } from "./utils/common"; +import { writeStream } from "./utils/stream"; import { deleteTempFile, deleteTempFileIgnoringErrors, @@ -160,42 +160,6 @@ const handleWrite = async (path: string, request: Request) => { return new Response("", { status: 200 }); }; -/** - * Write a (web) ReadableStream to a file at the given {@link filePath}. - * - * The returned promise resolves when the write completes. - * - * @param filePath The local file system path where the file should be written. - * - * @param readableStream A web - * [ReadableStream](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream). 
- * - */ -export const writeStream = ( - filePath: string, - readableStream: unknown /*ReadableStream*/, // @ts-expect-error [Note: Node and web stream type mismatch] -) => writeNodeStream(filePath, Readable.fromWeb(readableStream)); - -const writeNodeStream = async (filePath: string, fileStream: Readable) => { - const writeable = createWriteStream(filePath); - - fileStream.on("error", (err) => { - writeable.destroy(err); // Close the writable stream with an error - }); - - fileStream.pipe(writeable); - - await new Promise((resolve, reject) => { - writeable.on("finish", resolve); - writeable.on("error", (err) => { - if (existsSync(filePath)) { - void fs.unlink(filePath); - } - reject(err); - }); - }); -}; - /** * A map from token to file paths for convert-to-mp4 requests that we have * received. diff --git a/desktop/src/main/utils/stream.ts b/desktop/src/main/utils/stream.ts new file mode 100644 index 0000000000..f5a98de0f7 --- /dev/null +++ b/desktop/src/main/utils/stream.ts @@ -0,0 +1,39 @@ +import { createWriteStream, existsSync } from "node:fs"; +import fs from "node:fs/promises"; +import { Readable } from "node:stream"; + +/** + * Write a (web) ReadableStream to a file at the given {@link filePath}. + * + * The returned promise resolves when the write completes. + * + * @param filePath The local file system path where the file should be written. + * + * @param readableStream A web + * [ReadableStream](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream). 
+ * + */ +export const writeStream = ( + filePath: string, + readableStream: unknown /*ReadableStream*/, // @ts-expect-error [Note: Node and web stream type mismatch] +) => writeNodeStream(filePath, Readable.fromWeb(readableStream)); + +const writeNodeStream = async (filePath: string, fileStream: Readable) => { + const writeable = createWriteStream(filePath); + + fileStream.on("error", (err) => { + writeable.destroy(err); // Close the writable stream with an error + }); + + fileStream.pipe(writeable); + + await new Promise((resolve, reject) => { + writeable.on("finish", resolve); + writeable.on("error", (err) => { + if (existsSync(filePath)) { + void fs.unlink(filePath); + } + reject(err); + }); + }); +};