This commit is contained in:
Manav Rathi
2024-07-03 10:22:11 +05:30
parent 0814fd42d1
commit 08a23d8733

View File

@@ -6,6 +6,25 @@ import log from "@/next/log";
import { fileLogID } from "../../utils/file";
import { pullFaceEmbeddings, putFaceIndex } from "./embedding";
import { indexFaces } from "./index-face";
import { wait } from "@/utils/promise";
/**
* The MLWorker state machine.
*
* ext. event state then state
* ------------- --------------- --------------
* sync -> "pull" -> "idle"
* upload -> "liveq" -> "idle"
* idleTimeout -> "backfillq" -> "idle"
*
* where
*
* - "pull": pulling embeddings from remote
* - "liveq": indexing items that are being uploaded
* - "backfillq": indexing unindexed items otherwise
* - "idle": in between state transitions
*/
type MLWorkerState = "idle" | "pull" | "liveq" | "backfillq";
/**
* Run operations related to machine learning (e.g. indexing) in a Web Worker.
@@ -15,16 +34,73 @@ import { indexFaces } from "./index-face";
* tasks that might degrade interactivity.
*/
export class MLWorker {
private state: MLWorkerState = "idle";
private isSyncing = false;
private shouldSync = false;
private liveQ: EnteFile[] = [];
private idleTimeout: ReturnType<typeof setTimeout> | undefined;
/**
* Pull embeddings from remote, and start backfilling if needed.
*
* This function enqueues the pull and returns immediately without waiting
* for the pull to complete.
*/
async sync() {
if (this.isSyncing) return;
this.isSyncing = true;
sync() {
this.shouldSync = true;
if (this.idleTimeout) {
clearTimeout(this.idleTimeout);
this.idleTimeout = undefined;
this.tick();
} else {
// this.tick will get run when the current task finishes.
}
}
private tick() {
// Schedule a new macrotask (by using setTimeout) instead of scheduling
// a new microtask (by directly resolving the promise). This is likely
// unnecessary; I'm doing this as a partially out of superstition aiming
// to to give GC a chance to run if needed and generally ease execution
// / memory pressure.
const next = () => setTimeout(() => this.tick(), 0);
// If we've been asked to sync, do that irrespective of anything else.
if (this.shouldSync) {
this.shouldSync = false;
this.state = "pull";
void this.pull().then(next);
return;
}
// Otherwise see if there is something in the live queue.
if (this.liveQ.length > 0) {
this.state = "liveq";
void this.liveq().then(next);
return;
}
// // Otherwise check to see if there is something to backfill.
// const { indexableCount } = await indexedAndIndexableCounts();
// if (indexableCount > 0) {
// this.state = "backfillq";
// void this.liveq().then(() => this.tick());
// return;
// }
// if (this.isSyncing) return;
// this.isSyncing = true;
// this.isSyncing = false;
// void this.next();
}
async pull() {
await pullFaceEmbeddings();
this.isSyncing = false;
}
async liveq() {
console.log("liveq");
await wait(0);
}
}