From 08a23d87334c5dc7f284fe89bc39cc681f645320 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Wed, 3 Jul 2024 10:22:11 +0530 Subject: [PATCH] Sketch --- web/packages/new/photos/services/ml/worker.ts | 84 ++++++++++++++++++- 1 file changed, 80 insertions(+), 4 deletions(-) diff --git a/web/packages/new/photos/services/ml/worker.ts b/web/packages/new/photos/services/ml/worker.ts index 3fe960773a..a854bfa6e3 100644 --- a/web/packages/new/photos/services/ml/worker.ts +++ b/web/packages/new/photos/services/ml/worker.ts @@ -6,6 +6,25 @@ import log from "@/next/log"; import { fileLogID } from "../../utils/file"; import { pullFaceEmbeddings, putFaceIndex } from "./embedding"; import { indexFaces } from "./index-face"; +import { wait } from "@/utils/promise"; + +/** + * The MLWorker state machine. + * + * ext. event state then state + * ------------- --------------- -------------- + * sync -> "pull" -> "idle" + * upload -> "liveq" -> "idle" + * idleTimeout -> "backfillq" -> "idle" + * + * where + * + * - "pull": pulling embeddings from remote + * - "liveq": indexing items that are being uploaded + * - "backfillq": indexing unindexed items otherwise + * - "idle": in between state transitions + */ +type MLWorkerState = "idle" | "pull" | "liveq" | "backfillq"; /** * Run operations related to machine learning (e.g. indexing) in a Web Worker. @@ -15,16 +34,73 @@ import { indexFaces } from "./index-face"; * tasks that might degrade interactivity. */ export class MLWorker { + private state: MLWorkerState = "idle"; private isSyncing = false; + private shouldSync = false; + private liveQ: EnteFile[] = []; + private idleTimeout: ReturnType | undefined; /** * Pull embeddings from remote, and start backfilling if needed. + * + * This function enqueues the pull and returns immediately without waiting + * for the pull to complete. */ - async sync() { - if (this.isSyncing) return; - this.isSyncing = true; + sync() { + this.shouldSync = true; + if (this.idleTimeout) { + clearTimeout(this.idleTimeout); + this.idleTimeout = undefined; + this.tick(); + } else { + // this.tick will get run when the current task finishes. + } + } + + private tick() { + // Schedule a new macrotask (by using setTimeout) instead of scheduling + // a new microtask (by directly resolving the promise). This is likely + // unnecessary; I'm doing this as a partially out of superstition aiming + // to to give GC a chance to run if needed and generally ease execution + // / memory pressure. + const next = () => setTimeout(() => this.tick(), 0); + + // If we've been asked to sync, do that irrespective of anything else. + if (this.shouldSync) { + this.shouldSync = false; + this.state = "pull"; + void this.pull().then(next); + return; + } + + // Otherwise see if there is something in the live queue. + if (this.liveQ.length > 0) { + this.state = "liveq"; + void this.liveq().then(next); + return; + } + + // // Otherwise check to see if there is something to backfill. + // const { indexableCount } = await indexedAndIndexableCounts(); + // if (indexableCount > 0) { + // this.state = "backfillq"; + // void this.liveq().then(() => this.tick()); + // return; + // } + + // if (this.isSyncing) return; + // this.isSyncing = true; + // this.isSyncing = false; + // void this.next(); + } + + async pull() { await pullFaceEmbeddings(); - this.isSyncing = false; + } + + async liveq() { + console.log("liveq"); + await wait(0); } }