Live update

This commit is contained in:
Manav Rathi
2024-07-11 14:56:55 +05:30
parent 544a5a9ccc
commit 936e0470e6
7 changed files with 40 additions and 7 deletions

View File

@@ -7,7 +7,7 @@ import { renderableImageBlob } from "../../utils/file";
import { readStream } from "../../utils/native-stream";
import DownloadManager from "../download";
import type { UploadItem } from "../upload/types";
import type { MLWorkerElectron } from "./worker-electron";
import type { MLWorkerElectron } from "./worker-types";
export interface ImageBitmapAndData {
bitmap: ImageBitmap;

View File

@@ -4,7 +4,7 @@ import type { ImageBitmapAndData } from "./bitmap";
import { clipIndexes } from "./db";
import { pixelRGBBicubic } from "./image";
import { dotProduct, norm } from "./math";
import type { MLWorkerElectron } from "./worker-electron";
import type { MLWorkerElectron } from "./worker-types";
/**
* The version of the CLIP indexing pipeline implemented by the current client.

View File

@@ -26,7 +26,7 @@ import {
warpAffineFloat32List,
} from "./image";
import { clamp } from "./math";
import type { MLWorkerElectron } from "./worker-electron";
import type { MLWorkerElectron } from "./worker-types";
/**
* The version of the face indexing pipeline implemented by the current client.

View File

@@ -68,12 +68,17 @@ const createComlinkWorker = async () => {
computeFaceEmbeddings: electron.computeFaceEmbeddings,
computeCLIPImageEmbedding: electron.computeCLIPImageEmbedding,
};
const delegate = {
workerDidProcessFile,
};
const cw = new ComlinkWorker<typeof MLWorker>(
"ML",
new Worker(new URL("worker.ts", import.meta.url)),
);
await cw.remote.then((w) => w.init(proxy(mlWorkerElectron)));
await cw.remote.then((w) =>
w.init(proxy(mlWorkerElectron), proxy(delegate)),
);
return cw;
};
@@ -405,6 +410,8 @@ const setInterimScheduledStatus = () => {
setMLStatusSnapshot({ phase: "scheduled", nSyncedFiles, nTotalFiles });
};
const workerDidProcessFile = () => triggerStatusUpdate();
/**
* Return the IDs of all the faces in the given {@link enteFile} that are not
* associated with a person cluster.

View File

@@ -1,3 +1,8 @@
/**
* @file Type for the objects shared (as a Comlink proxy) by the main thread and
* the ML worker.
*/
/**
* A subset of {@link Electron} provided to the {@link MLWorker}.
*
@@ -12,3 +17,16 @@ export interface MLWorkerElectron {
computeFaceEmbeddings: (input: Float32Array) => Promise<Float32Array>;
computeCLIPImageEmbedding: (input: Float32Array) => Promise<Float32Array>;
}
/**
* Callbacks invoked by the worker at various points in the indexing pipeline to
* notify the main thread of events it might be interested in.
*/
export interface MLWorkerDelegate {
/**
* Called whenever a file is processed during indexing.
*
* It is called both when the indexing was successful or failed.
*/
workerDidProcessFile: () => void;
}

View File

@@ -22,7 +22,7 @@ import {
} from "./db";
import { pullFaceEmbeddings, putCLIPIndex, putFaceIndex } from "./embedding";
import { indexFaces, type FaceIndex } from "./face";
import type { MLWorkerElectron } from "./worker-electron";
import type { MLWorkerDelegate, MLWorkerElectron } from "./worker-types";
const idleDurationStart = 5; /* 5 seconds */
const idleDurationMax = 16 * 60; /* 16 minutes */
@@ -56,6 +56,7 @@ interface IndexableItem {
*/
export class MLWorker {
private electron: MLWorkerElectron | undefined;
private delegate: MLWorkerDelegate | undefined;
private userAgent: string | undefined;
private state: "idle" | "pull" | "indexing" = "idle";
private shouldPull = false;
@@ -73,9 +74,13 @@ export class MLWorker {
* @param electron The {@link MLWorkerElectron} that allows the worker to
* use the functionality provided by our Node.js layer when running in the
* context of our desktop app
*
* @param delegate The {@link MLWorkerDelegate} the worker can use to inform
* the main thread of interesting events.
*/
async init(electron: MLWorkerElectron) {
async init(electron: MLWorkerElectron, delegate?: MLWorkerDelegate) {
this.electron = electron;
this.delegate = delegate;
// Set the user agent that'll be set in the generated embeddings.
this.userAgent = `${clientPackageName}/${await electron.appVersion()}`;
// Initialize the downloadManager running in the web worker with the
@@ -202,6 +207,7 @@ export class MLWorker {
items,
ensure(this.electron),
ensure(this.userAgent),
this.delegate,
);
if (allSuccess) {
// Everything is running smoothly. Reset the idle duration.
@@ -276,6 +282,7 @@ const indexNextBatch = async (
items: IndexableItem[],
electron: MLWorkerElectron,
userAgent: string,
delegate: MLWorkerDelegate | undefined,
) => {
// Don't try to index if we wouldn't be able to upload them anyway. The
// liveQ has already been drained, but that's fine, it'll be rare that we
@@ -293,6 +300,7 @@ const indexNextBatch = async (
for (const { enteFile, uploadItem } of items) {
try {
await index(enteFile, uploadItem, electron, userAgent);
delegate?.workerDidProcessFile();
// Possibly unnecessary, but let us drain the microtask queue.
await wait(0);
} catch {

View File

@@ -7,7 +7,7 @@
*/
import type { Electron, ZipItem } from "@/next/types/ipc";
import type { MLWorkerElectron } from "../services/ml/worker-electron";
import type { MLWorkerElectron } from "../services/ml/worker-types";
/**
* Stream the given file or zip entry from the user's local file system.