For real - 1

This commit is contained in:
Manav Rathi
2024-07-30 20:22:09 +05:30
parent 65cfcc27a8
commit 7baacc6a77
4 changed files with 166 additions and 171 deletions

View File

@@ -1,125 +0,0 @@
// See [Note: Using Electron APIs in UtilityProcess] about what we can and
// cannot import.
import { ensure, wait } from "../utils/common";
/**
* We cannot do
*
* import log from "../log";
*
* because that requires the Electron APIs that are not available to a utility
* process (See: [Note: Using Electron APIs in UtilityProcess]). But even if
* that were to work, logging will still be problematic since we'd try opening
* the log file from two different Node.js processes (this one, and the main
* one), and I didn't find any indication in the electron-log repository that
* the log file's integrity would be maintained in such cases.
*
* So instead we create this proxy log object that uses `process.parentPort` to
* transport the logs over to the main process.
*/
const log = {
/**
* Unlike the real {@link log.error}, this accepts only the first string
* argument, not the second optional error one.
*/
errorString: (s: string) => mainProcess("log.errorString", s),
info: (...ms: unknown[]) => mainProcess("log.info", ms),
/**
* Unlike the real {@link log.debug}, this is (a) eagerly evaluated, and (b)
* accepts only strings.
*/
debugString: (s: string) => mainProcess("log.debugString", s),
};
/**
* Send a message to the main process using a barebones RPC protocol.
*/
const mainProcess = (method: string, param: unknown) =>
process.parentPort.postMessage({ method, p: param });
log.debugString(
`Started ML worker process with args ${process.argv.join(" ")}`,
);
process.parentPort.once("message", (e) => {
parseInitData(e.data);
const port = ensure(e.ports[0]);
port.on("message", (request) => {
void handleMessageFromRenderer(request.data).then((response) =>
port.postMessage(response),
);
});
port.start();
});
/**
* We cannot access Electron's {@link app} object within a utility process, so
* we pass the value of `app.getPath("userData")` during initialization, and it
* can be subsequently retrieved from here.
*/
let _userDataPath: string | undefined;
/** Equivalent to app.getPath("userData") */
const userDataPath = () => ensure(_userDataPath);
const parseInitData = (data: unknown) => {
if (
data &&
typeof data == "object" &&
"userDataPateh" in data &&
"userDataPath" in data &&
typeof data.userDataPath == "string"
) {
_userDataPath = data.userDataPath;
} else {
log.errorString("Unparseable initialization data");
}
};
/**
* Our hand-rolled RPC handler and router - the Node.js utility process end.
*
* Sibling of the electronMLWorker function (in `ml/worker.ts`) in the web code.
*
* [Note: Node.js ML worker RPC protocol]
*
* - Each RPC call (i.e. request message) has a "method" (string), "id"
* (number) and "p" (arbitrary param).
*
* - Each RPC result (i.e. response message) has an "id" (number) that is the
* same as the "id" for the request which it corresponds to.
*
* - If the RPC call was a success, then the response messege will have an
* "result" (arbitrary result) property. Otherwise it will have a "error"
* (string) property describing what went wrong.
*/
const handleMessageFromRenderer = async (m: unknown) => {
if (m && typeof m == "object" && "method" in m && "id" in m && "p" in m) {
const id = m.id;
const p = m.p;
try {
switch (m.method) {
case "foo":
if (p && typeof p == "string")
return { id, result: await foo(p) };
break;
}
} catch (e) {
return { id, error: e instanceof Error ? e.message : String(e) };
}
return { id, error: "Unknown message" };
}
// We don't even have an "id", so at least log it lest the renderer also
// ignore the "id"-less response.
log.info("Ignoring unknown message", m);
return { error: "Unknown message" };
};
const foo = async (a: string) => {
log.info("got message foo with argument", a, userDataPath());
await wait(0);
return a.length;
};

View File

@@ -5,15 +5,132 @@
* for various tasks are not shipped with the app but are downloaded on demand.
*/
// See [Note: Using Electron APIs in UtilityProcess] about what we can and
// cannot import.
import Tokenizer from "clip-bpe-js";
import { app, net } from "electron/main";
import { net } from "electron/main";
import { existsSync } from "fs";
import fs from "node:fs/promises";
import path from "node:path";
import * as ort from "onnxruntime-node";
import log from "../log";
import { writeStream } from "../stream";
import { ensure, wait } from "../utils/common";
import { writeStream } from "../utils/stream";
/**
* We cannot do
*
* import log from "../log";
*
* because that requires the Electron APIs that are not available to a utility
* process (See: [Note: Using Electron APIs in UtilityProcess]). But even if
* that were to work, logging will still be problematic since we'd try opening
* the log file from two different Node.js processes (this one, and the main
* one), and I didn't find any indication in the electron-log repository that
* the log file's integrity would be maintained in such cases.
*
* So instead we create this proxy log object that uses `process.parentPort` to
* transport the logs over to the main process.
*/
const log = {
/**
* Unlike the real {@link log.error}, this accepts only the first string
* argument, not the second optional error one.
*/
errorString: (s: string) => mainProcess("log.errorString", s),
info: (...ms: unknown[]) => mainProcess("log.info", ms),
/**
* Unlike the real {@link log.debug}, this is (a) eagerly evaluated, and (b)
* accepts only strings.
*/
debugString: (s: string) => mainProcess("log.debugString", s),
};
/**
* Send a message to the main process using a barebones RPC protocol.
*/
const mainProcess = (method: string, param: unknown) =>
process.parentPort.postMessage({ method, p: param });
log.debugString(
`Started ML worker process with args ${process.argv.join(" ")}`,
);
process.parentPort.once("message", (e) => {
parseInitData(e.data);
const port = ensure(e.ports[0]);
port.on("message", (request) => {
void handleMessageFromRenderer(request.data).then((response) =>
port.postMessage(response),
);
});
port.start();
});
/**
* We cannot access Electron's {@link app} object within a utility process, so
* we pass the value of `app.getPath("userData")` during initialization, and it
* can be subsequently retrieved from here.
*/
let _userDataPath: string | undefined;
/** Equivalent to app.getPath("userData") */
const userDataPath = () => ensure(_userDataPath);
const parseInitData = (data: unknown) => {
if (
data &&
typeof data == "object" &&
"userDataPateh" in data &&
"userDataPath" in data &&
typeof data.userDataPath == "string"
) {
_userDataPath = data.userDataPath;
} else {
log.errorString("Unparseable initialization data");
}
};
/**
* Our hand-rolled RPC handler and router - the Node.js utility process end.
*
* Sibling of the electronMLWorker function (in `ml/worker.ts`) in the web code.
*
* [Note: Node.js ML worker RPC protocol]
*
* - Each RPC call (i.e. request message) has a "method" (string), "id"
* (number) and "p" (arbitrary param).
*
* - Each RPC result (i.e. response message) has an "id" (number) that is the
* same as the "id" for the request which it corresponds to.
*
* - If the RPC call was a success, then the response messege will have an
* "result" (arbitrary result) property. Otherwise it will have a "error"
* (string) property describing what went wrong.
*/
const handleMessageFromRenderer = async (m: unknown) => {
if (m && typeof m == "object" && "method" in m && "id" in m && "p" in m) {
const id = m.id;
const p = m.p;
try {
switch (m.method) {
case "foo":
if (p && typeof p == "string")
return { id, result: await foo(p) };
break;
}
} catch (e) {
return { id, error: e instanceof Error ? e.message : String(e) };
}
return { id, error: "Unknown message" };
}
// We don't even have an "id", so at least log it lest the renderer also
// ignore the "id"-less response.
log.info("Ignoring unknown message", m);
return { error: "Unknown message" };
};
/**
* Return a function that can be used to trigger a download of the specified
@@ -79,7 +196,7 @@ const modelPathDownloadingIfNeeded = async (
} else {
const size = (await fs.stat(modelPath)).size;
if (size !== expectedByteSize) {
log.error(
log.errorString(
`The size ${size} of model ${modelName} does not match the expected size, downloading again`,
);
await downloadModel(modelPath, modelName);
@@ -91,7 +208,7 @@ const modelPathDownloadingIfNeeded = async (
/** Return the path where the given {@link modelName} is meant to be saved */
const modelSavePath = (modelName: string) =>
path.join(app.getPath("userData"), "models", modelName);
path.join(userDataPath(), "models", modelName);
const downloadModel = async (saveLocation: string, name: string) => {
// `mkdir -p` the directory where we want to save the model.
@@ -138,7 +255,7 @@ export const computeCLIPImageEmbedding = async (input: Float32Array) => {
input: new ort.Tensor("float32", input, [1, 3, 224, 224]),
};
const results = await session.run(feeds);
log.debug(() => `ONNX/CLIP image embedding took ${Date.now() - t} ms`);
log.debugString(`ONNX/CLIP image embedding took ${Date.now() - t} ms`);
/* Need these model specific casts to type the result */
return ensure(results.output).data as Float32Array;
};
@@ -184,7 +301,7 @@ export const computeCLIPTextEmbeddingIfAvailable = async (text: string) => {
};
const results = await session.run(feeds);
log.debug(() => `ONNX/CLIP text embedding took ${Date.now() - t} ms`);
log.debugString(`ONNX/CLIP text embedding took ${Date.now() - t} ms`);
return ensure(results.output).data as Float32Array;
};
@@ -203,7 +320,7 @@ export const detectFaces = async (input: Float32Array) => {
input: new ort.Tensor("float32", input, [1, 3, 640, 640]),
};
const results = await session.run(feeds);
log.debug(() => `ONNX/YOLO face detection took ${Date.now() - t} ms`);
log.debugString(`ONNX/YOLO face detection took ${Date.now() - t} ms`);
return ensure(results.output).data;
};
@@ -228,7 +345,7 @@ export const computeFaceEmbeddings = async (input: Float32Array) => {
const t = Date.now();
const feeds = { img_inputs: inputTensor };
const results = await session.run(feeds);
log.debug(() => `ONNX/MFNT face embedding took ${Date.now() - t} ms`);
log.debugString(`ONNX/MFNT face embedding took ${Date.now() - t} ms`);
/* Need these model specific casts to extract and type the result */
return (results.embeddings as unknown as Record<string, unknown>)
.cpuData as Float32Array;

View File

@@ -3,7 +3,6 @@
*/
import { net, protocol } from "electron/main";
import { randomUUID } from "node:crypto";
import { createWriteStream, existsSync } from "node:fs";
import fs from "node:fs/promises";
import { Readable } from "node:stream";
import { ReadableStream } from "node:stream/web";
@@ -12,6 +11,7 @@ import log from "./log";
import { ffmpegConvertToMP4 } from "./services/ffmpeg";
import { markClosableZip, openZip } from "./services/zip";
import { ensure } from "./utils/common";
import { writeStream } from "./utils/stream";
import {
deleteTempFile,
deleteTempFileIgnoringErrors,
@@ -160,42 +160,6 @@ const handleWrite = async (path: string, request: Request) => {
return new Response("", { status: 200 });
};
/**
* Write a (web) ReadableStream to a file at the given {@link filePath}.
*
* The returned promise resolves when the write completes.
*
* @param filePath The local file system path where the file should be written.
*
* @param readableStream A web
* [ReadableStream](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream).
*
*/
export const writeStream = (
filePath: string,
readableStream: unknown /*ReadableStream*/, // @ts-expect-error [Note: Node and web stream type mismatch]
) => writeNodeStream(filePath, Readable.fromWeb(readableStream));
const writeNodeStream = async (filePath: string, fileStream: Readable) => {
const writeable = createWriteStream(filePath);
fileStream.on("error", (err) => {
writeable.destroy(err); // Close the writable stream with an error
});
fileStream.pipe(writeable);
await new Promise((resolve, reject) => {
writeable.on("finish", resolve);
writeable.on("error", (err) => {
if (existsSync(filePath)) {
void fs.unlink(filePath);
}
reject(err);
});
});
};
/**
* A map from token to file paths for convert-to-mp4 requests that we have
* received.

View File

@@ -0,0 +1,39 @@
import { createWriteStream, existsSync } from "node:fs";
import fs from "node:fs/promises";
import { Readable } from "node:stream";
/**
* Write a (web) ReadableStream to a file at the given {@link filePath}.
*
* The returned promise resolves when the write completes.
*
* @param filePath The local file system path where the file should be written.
*
* @param readableStream A web
* [ReadableStream](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream).
*
*/
export const writeStream = (
filePath: string,
readableStream: unknown /*ReadableStream*/, // @ts-expect-error [Note: Node and web stream type mismatch]
) => writeNodeStream(filePath, Readable.fromWeb(readableStream));
const writeNodeStream = async (filePath: string, fileStream: Readable) => {
const writeable = createWriteStream(filePath);
fileStream.on("error", (err) => {
writeable.destroy(err); // Close the writable stream with an error
});
fileStream.pipe(writeable);
await new Promise((resolve, reject) => {
writeable.on("finish", resolve);
writeable.on("error", (err) => {
if (existsSync(filePath)) {
void fs.unlink(filePath);
}
reject(err);
});
});
};