Intertwine
This commit is contained in:
@@ -1,8 +1,4 @@
|
||||
import {
|
||||
indexedAndIndexableCounts,
|
||||
markIndexingFailed,
|
||||
saveFaceIndex,
|
||||
} from "@/new/photos/services/ml/db";
|
||||
import { markIndexingFailed, saveFaceIndex } from "@/new/photos/services/ml/db";
|
||||
import type { FaceIndex } from "@/new/photos/services/ml/types";
|
||||
import type { EnteFile } from "@/new/photos/types/file";
|
||||
import log from "@/next/log";
|
||||
@@ -86,67 +82,74 @@ export class MLWorker {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when a file is uploaded from the current client.
|
||||
*
|
||||
* This is a great opportunity to index since we already have the in-memory
|
||||
* representation of the file's contents with us and won't need to download
|
||||
* the file from remote.
|
||||
*/
|
||||
didUpload(file: EnteFile) {
|
||||
// Add the recently uploaded file to the live indexing queue.
|
||||
//
|
||||
// Limit the queue to some maximum so that we don't keep growing
|
||||
// indefinitely (and cause memory pressure) if the speed of uploads is
|
||||
// exceeding the speed of indexing.
|
||||
//
|
||||
// In general, we can be sloppy with the items in the live queue (as
|
||||
// long as we're not systematically ignoring it). This is because the
|
||||
// live queue is just an optimization: if a file doesn't get indexed via
|
||||
// the live queue, it'll later get indexed anyway when we backfill.
|
||||
if (this.liveQ.length < 50) {
|
||||
this.liveQ.push(file);
|
||||
this.wakeUp();
|
||||
} else {
|
||||
log.debug(() => "Ignoring liveQ item since liveQ is full");
|
||||
}
|
||||
}
|
||||
|
||||
private async tick() {
|
||||
// Schedule a new macrotask (by using setTimeout) instead of scheduling
|
||||
// a new microtask (by directly resolving the promise). This is likely
|
||||
// unnecessary; I'm doing this partially out of superstition, aiming to
|
||||
// to give GC a chance to run if needed, and also generally ease
|
||||
// execution and memory pressure.
|
||||
const next = () => void setTimeout(() => this.tick(), 0);
|
||||
|
||||
// If we've been asked to sync, do that irrespective of anything else.
|
||||
if (this.shouldSync) {
|
||||
this.shouldSync = false;
|
||||
// Reset the idle duration too.
|
||||
// Reset the idle duration as we start pulling.
|
||||
this.idleDuration = idleDurationStart;
|
||||
await this.pull();
|
||||
// Call tick again once the pull is done.
|
||||
void this.pull().then(next);
|
||||
// Return without waiting for the pull.
|
||||
return;
|
||||
}
|
||||
|
||||
const liveQ = this.liveQ;
|
||||
this.liveQ = [];
|
||||
const allSuccess = await indexNextBatch(ensure(this.userAgent), liveQ);
|
||||
if (allSuccess) {
|
||||
// Everything is running smoothly. Reset the idle duration.
|
||||
this.idleDuration = idleDurationStart;
|
||||
// And tick again.
|
||||
next();
|
||||
return;
|
||||
}
|
||||
|
||||
// Otherwise see if there is something in the live queue.
|
||||
if (this.liveQ.length > 0) {
|
||||
this.idleDuration = idleDurationStart;
|
||||
void this.liveq().then(next);
|
||||
return;
|
||||
}
|
||||
|
||||
// Otherwise check to see if there is something to backfill.
|
||||
const { indexableCount } = await indexedAndIndexableCounts();
|
||||
if (indexableCount > 0) {
|
||||
const allSuccess = await backfill(ensure(this.userAgent));
|
||||
if (allSuccess) {
|
||||
// Everything is running smoothly. Reset the idle duration.
|
||||
this.idleDuration = idleDurationStart;
|
||||
// And tick again.
|
||||
next();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// We come here in two scenarios:
|
||||
// We come here in three scenarios - either there is nothing left to do,
|
||||
// or we cannot currently do it (e.g. user is offline), or we
|
||||
// encountered failures during indexing.
|
||||
//
|
||||
// 1. Nothing to do.
|
||||
// Failures are not really expected, so something unexpected might be
|
||||
// going on, or remote might be having issues.
|
||||
//
|
||||
// 2. We encountered failures during backfill. Failures are not really
|
||||
// expected, so something unexpected might be going on, or remote
|
||||
// might be having issues.
|
||||
//
|
||||
// In both cases, we pause for exponentially longer durations of time
|
||||
// So in all cases, we pause for exponentially longer durations of time
|
||||
// (limited to some maximum).
|
||||
|
||||
this.idleDuration = Math.min(this.idleDuration * 2, idleDurationMax);
|
||||
this.idleTimeout = setTimeout(next, this.idleDuration * 1000);
|
||||
}
|
||||
|
||||
async pull() {
|
||||
private async pull() {
|
||||
await pullFaceEmbeddings();
|
||||
}
|
||||
|
||||
async liveq() {
|
||||
console.log("liveq");
|
||||
await wait(0);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO-ML: Temorarily disable
|
||||
@@ -154,16 +157,34 @@ export class MLWorker {
|
||||
|
||||
/**
|
||||
* Find out files which need to be indexed. Then index the next batch of them.
|
||||
*
|
||||
* Returns `false` to indicate that either an error occurred, or there are no
|
||||
* more files to process, or that we cannot currently process files.
|
||||
*
|
||||
* Which means that when it returns true, all is well and there are more
|
||||
* things pending to process, so we should chug along at full speed.
|
||||
*/
|
||||
const backfill = async (userAgent: string) => {
|
||||
const indexNextBatch = async (userAgent: string, liveQ: EnteFile[]) => {
|
||||
if (!self.navigator.onLine) {
|
||||
log.info("Skipping ML indexing since we are not online");
|
||||
return false;
|
||||
}
|
||||
|
||||
const userID = ensure(await getKVN("userID"));
|
||||
|
||||
const files = await syncWithLocalFilesAndGetFilesToIndex(userID, 200);
|
||||
const files =
|
||||
liveQ.length > 0
|
||||
? liveQ
|
||||
: await syncWithLocalFilesAndGetFilesToIndex(userID, 200);
|
||||
if (files.length == 0) return false;
|
||||
|
||||
let allSuccess = true;
|
||||
for (const file of files) {
|
||||
try {
|
||||
await index(file, undefined, userAgent);
|
||||
// Let the event loop run so that other events (like didUpload) can
|
||||
// be acknowledged and noted down.
|
||||
await wait(0);
|
||||
} catch {
|
||||
allSuccess = false;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user