From c4103f91364bf4cbe87a31e0c2ad82ac212ac1a3 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Thu, 1 Aug 2024 09:49:54 +0530 Subject: [PATCH 01/10] Restore the pull scaffolding Partially reverts 61b98a99646fc064ba54f51495e0acc3b6c4e5c0 --- web/packages/new/photos/services/ml/worker.ts | 68 ++++++++++++++++--- 1 file changed, 58 insertions(+), 10 deletions(-) diff --git a/web/packages/new/photos/services/ml/worker.ts b/web/packages/new/photos/services/ml/worker.ts index 37d6dc259e..970fcac02d 100644 --- a/web/packages/new/photos/services/ml/worker.ts +++ b/web/packages/new/photos/services/ml/worker.ts @@ -64,15 +64,16 @@ interface IndexableItem { * * ext. event state then state * ------------- --------------- -------------- + * sync -> "pull" -> "idle" * sync -> "backfillq" -> "idle" * upload -> "liveq" -> "idle" * idleTimeout -> "backfillq" -> "idle" * * where: * + * - "pull": pull existing embeddings from remote. * - "liveq": indexing items that are being uploaded, - * - "backfillq": fetching remote embeddings of unindexed items, and then - * indexing them if needed, + * - "backfillq": index unindexed items otherwise. * - "idle": in between state transitions. * * In addition, MLWorker can also be invoked for interactive tasks: in @@ -81,7 +82,9 @@ interface IndexableItem { export class MLWorker { private electron: ElectronMLWorker | undefined; private delegate: MLWorkerDelegate | undefined; - private state: "idle" | "indexing" = "idle"; + private state: "idle" | "pull" | "indexing" = "idle"; + private shouldPull = false; + private havePulledAtLeastOnce = false; private liveQ: IndexableItem[] = []; private idleTimeout: ReturnType | undefined; private idleDuration = idleDurationStart; /* unit: seconds */ @@ -127,14 +130,23 @@ export class MLWorker { } /** - * Start backfilling if needed. + * Pull embeddings from remote, and start backfilling if needed. * - * This function enqueues a backfill attempt and returns immediately without - * waiting for it complete. During a backfill, it will first attempt to - * fetch embeddings for files which don't have that data locally. If we - * fetch and find what we need, we save it locally. Otherwise we index them. + * This function enqueues a pull and returns immediately without waiting for + * the pull to complete. + * + * Once the pull is done, it then schedules a backfill. So calling this also + * implicitly triggers a backfill (which is why we call it a less-precise + * "sync" instead of "pull"). + * + * During a backfill we will first attempt to fetch embeddings for files + * which don't have that data locally. If we fetch and find what we need, we + * save it locally. Otherwise we index them. This the pull upfront is not + * necessary, but it helps a new client get up to speed faster since it can + * fetch all existing embeddings first before getting down to the indexing. */ sync() { + this.shouldPull = true; this.wakeUp(); } @@ -200,18 +212,49 @@ export class MLWorker { { state: this.state, liveQ: this.liveQ, + shouldPull: this.shouldPull, idleDuration: this.idleDuration, }, ]); const scheduleTick = () => void setTimeout(() => this.tick(), 0); + // If we've been asked to pull, do that first (before indexing). + if (this.shouldPull) { + // Allow this flag to be reset while we're pulling (triggering + // another pull when we tick next). + this.shouldPull = false; + this.state = "pull"; + try { + const didPull = await pull(); + // Mark that we completed one attempt at pulling successfully + // (irrespective of whether or not that got us some data). + this.havePulledAtLeastOnce = true; + // Reset the idle duration if we did pull something. + if (didPull) this.idleDuration = idleDurationStart; + } catch (e) { + log.error("Failed to pull embeddings", e); + } + // Tick again, even if we got an error. + // + // While the backfillQ won't be processed until at least a pull has + // happened once (`havePulledAtLeastOnce`), the liveQ can still be + // processed since these are new files without remote embeddings. + scheduleTick(); + return; + } + const liveQ = this.liveQ; this.liveQ = []; this.state = "indexing"; - // Use the liveQ if present, otherwise get the next batch to backfill. - const items = liveQ.length > 0 ? liveQ : await this.backfillQ(); + // Use the liveQ if present, otherwise get the next batch to backfill, + // but only after we've pulled once from remote successfully. + const items = liveQ.length + ? liveQ + : this.havePulledAtLeastOnce + ? await this.backfillQ() + : []; const allSuccess = await indexNextBatch( items, @@ -263,6 +306,11 @@ export class MLWorker { expose(MLWorker); +// eslint-disable-next-line @typescript-eslint/require-await +const pull = async () => { + return ""; +}; + /** * Find out files which need to be indexed. Then index the next batch of them. * From c369db9453808d48b90cfbd6b32e2216cfaa15b2 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Thu, 1 Aug 2024 10:12:40 +0530 Subject: [PATCH 02/10] Impl handler for /embeddings/indexed-files https://github.com/ente-io/ente/pull/2511/ --- .../new/photos/services/ml/embedding.ts | 51 ++++++++++++++++++- web/packages/new/photos/services/ml/worker.ts | 5 ++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/web/packages/new/photos/services/ml/embedding.ts b/web/packages/new/photos/services/ml/embedding.ts index 6da4e765d1..16d3adaec9 100644 --- a/web/packages/new/photos/services/ml/embedding.ts +++ b/web/packages/new/photos/services/ml/embedding.ts @@ -235,7 +235,7 @@ const remoteDerivedDataFromJSONString = (jsonString: string) => { * @param fileIDs The ids of the files for which we want the embeddings. * * @returns a list of {@link RemoteEmbedding} for the files which had embeddings - * (and thatt remote was able to successfully retrieve). The order of this list + * (and that remote was able to successfully retrieve). The order of this list * is arbitrary, and the caller should use the {@link fileID} present within the * {@link RemoteEmbedding} to associate an item in the result back to a file * instead of relying on the order or count of items in the result. @@ -308,3 +308,52 @@ const putEmbedding = async ( }); ensureOk(res); }; + +/** A single entry in the response of {@link getIndexedFiles}. */ +const IndexedFile = z.object({ + fileID: z.number(), + updatedAt: z.number(), +}); + +type IndexedFile = z.infer; + +/** + * Fetch the file ids for {@link model} embeddings that have been created or + * updated since the given {@link sinceTime}. + * + * This allows a client to perform a quick "diff" and get the list of files that + * has changed since the last time it checked. It can then fetch those + * corresponding embeddings using the regular fetch API, this speeding up the + * initial sync on a new client. + * + * @param model The {@link EmbeddingModel} which we want. + * + * @param sinceTime Epoch milliseconds. Ask remote to provide us embeddings + * whose {@link updatedAt} is more than the given value. + * + * @param limit The maximum number of files to provide in the response. + * + * @returns a list of {@link RemoteEmbedding} for the files which had embeddings + * (and thatt remote was able to successfully retrieve). The order of this list + * is arbitrary, and the caller should use the {@link fileID} present within the + * {@link RemoteEmbedding} to associate an item in the result back to a file + * instead of relying on the order or count of items in the result. + */ +const getIndexedFiles = async ( + model: EmbeddingModel, + sinceTime: number, + limit: number, +): Promise => { + const params = new URLSearchParams({ + model, + sinceTime: sinceTime.toString(), + limit: limit.toString(), + }); + const url = await apiURL("/embeddings/indexed-files"); + const res = await fetch(`${url}?${params.toString()}`, { + headers: await authenticatedRequestHeaders(), + }); + ensureOk(res); + return z.object({ diff: z.array(IndexedFile) }).parse(await res.json()) + .diff; +}; diff --git a/web/packages/new/photos/services/ml/worker.ts b/web/packages/new/photos/services/ml/worker.ts index 970fcac02d..8ebe207aac 100644 --- a/web/packages/new/photos/services/ml/worker.ts +++ b/web/packages/new/photos/services/ml/worker.ts @@ -306,6 +306,11 @@ export class MLWorker { expose(MLWorker); +/** + * Pull embeddings from remote. + * + * Return true atleast one embedding was pulled. + */ // eslint-disable-next-line @typescript-eslint/require-await const pull = async () => { return ""; From 586d8f86f71ad91bded3461b55e826f65ca66968 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Thu, 1 Aug 2024 10:32:47 +0530 Subject: [PATCH 03/10] Up --- .../new/photos/services/ml/embedding.ts | 50 +++++++++++++------ 1 file changed, 34 insertions(+), 16 deletions(-) diff --git a/web/packages/new/photos/services/ml/embedding.ts b/web/packages/new/photos/services/ml/embedding.ts index 16d3adaec9..1bb54e7669 100644 --- a/web/packages/new/photos/services/ml/embedding.ts +++ b/web/packages/new/photos/services/ml/embedding.ts @@ -309,6 +309,37 @@ const putEmbedding = async ( ensureOk(res); }; +/** + * Fetch new {@link model} embeddings since the given {@link sinceTime}. + * + * This allows a client to perform a quick "diff" and get embeddings that has + * changed (created or updated) since the last time it checked. By fetching + * these all upfront instead of doing them one by one during the indexing, we + * can speed up the initial sync of existing embeddings on a new client. + * + * @param model The {@link EmbeddingModel} which we want. + * + * @param sinceTime Epoch milliseconds. We use this to ask remote to provide us + * embeddings whose {@link updatedAt} is more than the given value. If not + * specified, then we'll start from the beginning. + * + * @param limit The maximum number of files to provide in the response. + * + * @returns a list of {@link RemoteEmbedding}, and the latest {@link updatedAt} + * from amongst all embeddings that were fetched. The caller should persist that + * and use it in subsequent calls to {@link pullEmbeddings} to resume pulling + * from the current checkpoint. + * + * Returns undefined if nothing more is left to pull. + */ +const pullEmbeddings = async ( + model: EmbeddingModel, + sinceTime: number | undefined, + limit: number, +) => { + getIndexedFiles(model) +}; + /** A single entry in the response of {@link getIndexedFiles}. */ const IndexedFile = z.object({ fileID: z.number(), @@ -321,23 +352,10 @@ type IndexedFile = z.infer; * Fetch the file ids for {@link model} embeddings that have been created or * updated since the given {@link sinceTime}. * - * This allows a client to perform a quick "diff" and get the list of files that - * has changed since the last time it checked. It can then fetch those - * corresponding embeddings using the regular fetch API, this speeding up the - * initial sync on a new client. + * See {@link pullEmbeddings} for details about the parameters. * - * @param model The {@link EmbeddingModel} which we want. - * - * @param sinceTime Epoch milliseconds. Ask remote to provide us embeddings - * whose {@link updatedAt} is more than the given value. - * - * @param limit The maximum number of files to provide in the response. - * - * @returns a list of {@link RemoteEmbedding} for the files which had embeddings - * (and thatt remote was able to successfully retrieve). The order of this list - * is arbitrary, and the caller should use the {@link fileID} present within the - * {@link RemoteEmbedding} to associate an item in the result back to a file - * instead of relying on the order or count of items in the result. + * @returns an array of file ids, each with an associated timestamp when the + * embedding for that file was last changed. */ const getIndexedFiles = async ( model: EmbeddingModel, From f869447c7deb83e991cc57e7ed4238c20f8ed639 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Thu, 1 Aug 2024 10:41:31 +0530 Subject: [PATCH 04/10] File IDs --- .../new/photos/services/ml/embedding.ts | 42 ++++++++++++------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/web/packages/new/photos/services/ml/embedding.ts b/web/packages/new/photos/services/ml/embedding.ts index 1bb54e7669..0588340db7 100644 --- a/web/packages/new/photos/services/ml/embedding.ts +++ b/web/packages/new/photos/services/ml/embedding.ts @@ -309,26 +309,39 @@ const putEmbedding = async ( ensureOk(res); }; +interface PullEmbeddingsResult { + /** + * Derived data indexed by the file id whose data this is. + */ + items: Map; + /** + * The latest {@link updatedAt} epoch milliseconds from all the derived data + * in {@link items}. + */ + latestUpdatedAt: number; +} + /** - * Fetch new {@link model} embeddings since the given {@link sinceTime}. + * Fetch derived data that has been created or updated since the given + * {@link sinceTime}. * * This allows a client to perform a quick "diff" and get embeddings that has - * changed (created or updated) since the last time it checked. By fetching - * these all upfront instead of doing them one by one during the indexing, we - * can speed up the initial sync of existing embeddings on a new client. - * - * @param model The {@link EmbeddingModel} which we want. + * changed since the last time it checked. By fetching these all upfront instead + * of doing them one by one during the indexing, we can speed up the initial + * sync of existing embeddings on a new client. * * @param sinceTime Epoch milliseconds. We use this to ask remote to provide us - * embeddings whose {@link updatedAt} is more than the given value. If not + * derived data whose {@link updatedAt} is more than the given value. If not * specified, then we'll start from the beginning. * - * @param limit The maximum number of files to provide in the response. + * @param limit An advisory limit on the number of items to return. * - * @returns a list of {@link RemoteEmbedding}, and the latest {@link updatedAt} - * from amongst all embeddings that were fetched. The caller should persist that - * and use it in subsequent calls to {@link pullEmbeddings} to resume pulling - * from the current checkpoint. + * @returns a map of {@link RemoteDerivedData} indexed by the id of the file + * whose derived data it is, and the latest {@link updatedAt} from amongst all + * the data that was fetched. + * + * The caller should persist the returned timestamp for use in subsequent calls + * to {@link pullEmbeddings} to resume pulling from the current checkpoint. * * Returns undefined if nothing more is left to pull. */ @@ -336,8 +349,9 @@ const pullEmbeddings = async ( model: EmbeddingModel, sinceTime: number | undefined, limit: number, -) => { - getIndexedFiles(model) +): Promise => { + return undefined; + // getIndexedFiles(model) }; /** A single entry in the response of {@link getIndexedFiles}. */ From 523af2600a9b3f551bc097dcae324a04933cfe8c Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Thu, 1 Aug 2024 10:49:45 +0530 Subject: [PATCH 05/10] pull wip --- .../new/photos/services/ml/embedding.ts | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/web/packages/new/photos/services/ml/embedding.ts b/web/packages/new/photos/services/ml/embedding.ts index 0588340db7..3e077c1b08 100644 --- a/web/packages/new/photos/services/ml/embedding.ts +++ b/web/packages/new/photos/services/ml/embedding.ts @@ -350,7 +350,31 @@ const pullEmbeddings = async ( sinceTime: number | undefined, limit: number, ): Promise => { - return undefined; + // If since time is not provided, start at 0 (the beginning). + let latestUpdatedAt = sinceTime ?? 0; + + // See if anything changed since then. + const indexedFiles = await getIndexedFiles( + "derived", + latestUpdatedAt, + limit, + ); + + // Nope. Nothing more is left to do. + if (!indexedFiles.length) return undefined; + + // Find the latest from amongst the given updatedAt. This'll serve as our + // checkpoint for the next pull. + latestUpdatedAt = indexedFiles.reduce( + (max, { updatedAt }) => Math.max(max, updatedAt), + latestUpdatedAt, + ); + + // Fetch the embeddings for these guys. In rare cases, remote might return a + // partial response, but that will not have any lasting impact since we + // anyways refetch the derived data before attempting indexing. + const items = await fetchDerivedData() + // getIndexedFiles(model) }; From 5a362b5d4558a2a8be98ca08248257ea3d19cfd3 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Thu, 1 Aug 2024 11:06:41 +0530 Subject: [PATCH 06/10] Move wip --- .../new/photos/services/ml/embedding.ts | 92 ++++--------------- web/packages/new/photos/services/ml/worker.ts | 40 +++++++- 2 files changed, 55 insertions(+), 77 deletions(-) diff --git a/web/packages/new/photos/services/ml/embedding.ts b/web/packages/new/photos/services/ml/embedding.ts index 3e077c1b08..11df71bc84 100644 --- a/web/packages/new/photos/services/ml/embedding.ts +++ b/web/packages/new/photos/services/ml/embedding.ts @@ -309,75 +309,6 @@ const putEmbedding = async ( ensureOk(res); }; -interface PullEmbeddingsResult { - /** - * Derived data indexed by the file id whose data this is. - */ - items: Map; - /** - * The latest {@link updatedAt} epoch milliseconds from all the derived data - * in {@link items}. - */ - latestUpdatedAt: number; -} - -/** - * Fetch derived data that has been created or updated since the given - * {@link sinceTime}. - * - * This allows a client to perform a quick "diff" and get embeddings that has - * changed since the last time it checked. By fetching these all upfront instead - * of doing them one by one during the indexing, we can speed up the initial - * sync of existing embeddings on a new client. - * - * @param sinceTime Epoch milliseconds. We use this to ask remote to provide us - * derived data whose {@link updatedAt} is more than the given value. If not - * specified, then we'll start from the beginning. - * - * @param limit An advisory limit on the number of items to return. - * - * @returns a map of {@link RemoteDerivedData} indexed by the id of the file - * whose derived data it is, and the latest {@link updatedAt} from amongst all - * the data that was fetched. - * - * The caller should persist the returned timestamp for use in subsequent calls - * to {@link pullEmbeddings} to resume pulling from the current checkpoint. - * - * Returns undefined if nothing more is left to pull. - */ -const pullEmbeddings = async ( - model: EmbeddingModel, - sinceTime: number | undefined, - limit: number, -): Promise => { - // If since time is not provided, start at 0 (the beginning). - let latestUpdatedAt = sinceTime ?? 0; - - // See if anything changed since then. - const indexedFiles = await getIndexedFiles( - "derived", - latestUpdatedAt, - limit, - ); - - // Nope. Nothing more is left to do. - if (!indexedFiles.length) return undefined; - - // Find the latest from amongst the given updatedAt. This'll serve as our - // checkpoint for the next pull. - latestUpdatedAt = indexedFiles.reduce( - (max, { updatedAt }) => Math.max(max, updatedAt), - latestUpdatedAt, - ); - - // Fetch the embeddings for these guys. In rare cases, remote might return a - // partial response, but that will not have any lasting impact since we - // anyways refetch the derived data before attempting indexing. - const items = await fetchDerivedData() - - // getIndexedFiles(model) -}; - /** A single entry in the response of {@link getIndexedFiles}. */ const IndexedFile = z.object({ fileID: z.number(), @@ -387,21 +318,32 @@ const IndexedFile = z.object({ type IndexedFile = z.infer; /** - * Fetch the file ids for {@link model} embeddings that have been created or + * Fetch the file ids whose {@link model} derived data has been created or * updated since the given {@link sinceTime}. * - * See {@link pullEmbeddings} for details about the parameters. + * This allows a client to perform a quick "diff" and first fetch all derived + * data that has changed since the last time it checked. By fetching these all + * upfront instead of doing them one by one during the indexing, we can speed up + * the initial sync of existing embeddings on a new client. + * + * @param sinceTime Epoch milliseconds. We use this to ask remote to provide us + * derived data whose {@link updatedAt} is more than the given value. If not + * specified, then we'll start from the beginning. + * + * @param limit An advisory limit on the number of items to return. * * @returns an array of file ids, each with an associated timestamp when the - * embedding for that file was last changed. + * derived data for that file was last changed. + * + * The caller should persist the latest amongst these timestamps and use it in + * subsequent calls to resume pulling from the current checkpoint. */ -const getIndexedFiles = async ( - model: EmbeddingModel, +export const getIndexedDerivedDataFiles = async ( sinceTime: number, limit: number, ): Promise => { const params = new URLSearchParams({ - model, + model: "derived", sinceTime: sinceTime.toString(), limit: limit.toString(), }); diff --git a/web/packages/new/photos/services/ml/worker.ts b/web/packages/new/photos/services/ml/worker.ts index 8ebe207aac..20b017c50f 100644 --- a/web/packages/new/photos/services/ml/worker.ts +++ b/web/packages/new/photos/services/ml/worker.ts @@ -1,6 +1,6 @@ import { clientPackageName } from "@/base/app"; import { isHTTP4xxError } from "@/base/http"; -import { getKVN } from "@/base/kv"; +import { getKVN, setKV } from "@/base/kv"; import { ensureAuthToken } from "@/base/local-user"; import log from "@/base/log"; import type { ElectronMLWorker } from "@/base/types/ipc"; @@ -34,6 +34,7 @@ import { } from "./db"; import { fetchDerivedData, + getIndexedDerivedDataFiles, putDerivedData, type RawRemoteDerivedData, type RemoteDerivedData, @@ -311,8 +312,43 @@ expose(MLWorker); * * Return true atleast one embedding was pulled. */ -// eslint-disable-next-line @typescript-eslint/require-await const pull = async () => { + // If we've never pulled before, start at the beginning (0). + return pullSince((await latestDerivedDataUpdatedAt()) ?? 0); +}; + +const latestDerivedDataUpdatedAt = () => getKVN("latestDerivedDataUpdatedAt"); + +const setLatestDerivedDataUpdatedAt = (n: number) => + setKV("latestDerivedDataUpdatedAt", n); + +const pullSince = async (sinceTime: number) => { + // See if anything has changed since `sinceTime`. + const indexedFiles = await getIndexedDerivedDataFiles(sinceTime, 200); + + // Nope. Nothing more is left to do. + if (!indexedFiles.length) return undefined; + + // Find the latest from amongst all the updatedAt we got back. This'll serve + // as our checkpoint for the next pull. + const latestUpdatedAt = indexedFiles.reduce( + (max, { updatedAt }) => Math.max(max, updatedAt), + sinceTime, + ); + + // Fetch the embeddings for the files which changed. + // + // In rare cases, remote might return a partial response, but that will not + // have any lasting impact since we anyways refetch the derived data before + // attempting indexing. + + + + + const items = await fetchDerivedData(); + + // getIndexedFiles(model) + return ""; }; From eed991a7b2197c38ddd70e8a793362cd75dc3a31 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Thu, 1 Aug 2024 11:22:10 +0530 Subject: [PATCH 07/10] Construct the scaffolding --- web/packages/new/photos/services/ml/worker.ts | 36 ++++++++++++++----- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/web/packages/new/photos/services/ml/worker.ts b/web/packages/new/photos/services/ml/worker.ts index 20b017c50f..79124e34cc 100644 --- a/web/packages/new/photos/services/ml/worker.ts +++ b/web/packages/new/photos/services/ml/worker.ts @@ -314,7 +314,11 @@ expose(MLWorker); */ const pull = async () => { // If we've never pulled before, start at the beginning (0). - return pullSince((await latestDerivedDataUpdatedAt()) ?? 0); + const sinceTime = (await latestDerivedDataUpdatedAt()) ?? 0; + // Start fetching, starting the fetched count at 0. + const fetchedCount = await pullSince(sinceTime, 0); + // Return true if something got fetched. + return fetchedCount > 0; }; const latestDerivedDataUpdatedAt = () => getKVN("latestDerivedDataUpdatedAt"); @@ -322,12 +326,12 @@ const latestDerivedDataUpdatedAt = () => getKVN("latestDerivedDataUpdatedAt"); const setLatestDerivedDataUpdatedAt = (n: number) => setKV("latestDerivedDataUpdatedAt", n); -const pullSince = async (sinceTime: number) => { +const pullSince = async (sinceTime: number, fetchedCount: number) => { // See if anything has changed since `sinceTime`. const indexedFiles = await getIndexedDerivedDataFiles(sinceTime, 200); - // Nope. Nothing more is left to do. - if (!indexedFiles.length) return undefined; + // Nothing more is left. Return the previous fetch count we got. + if (!indexedFiles.length) return fetchedCount; // Find the latest from amongst all the updatedAt we got back. This'll serve // as our checkpoint for the next pull. @@ -342,14 +346,30 @@ const pullSince = async (sinceTime: number) => { // have any lasting impact since we anyways refetch the derived data before // attempting indexing. - + const localFiles = await getAllLocalFiles(); + const localFilesByID = new Map(localFiles.map((f) => [f.id, f])); + const filesByID = new Map( + indexedFiles + .map(({ fileID }) => localFilesByID.get(fileID)) + .filter((x) => x !== undefined) + .map((f) => [f.id, f]), + ); - const items = await fetchDerivedData(); + const items = await fetchDerivedData(filesByID); - // getIndexedFiles(model) + // TODO: Save items - return ""; + // Save the checkpoint. + await setLatestDerivedDataUpdatedAt(latestUpdatedAt); + + // Fetch subsequent items. As a safety valve, ensure we don't get into an + // infinite loop by checking that the sinceTime has advanced. + + if (latestUpdatedAt == sinceTime) + throw new Error(`Since time ${sinceTime} did not advance after a pull`); + + return pullSince(latestUpdatedAt, fetchedCount + items.size); }; /** From 97bbf4811fbea83462732fee3ff75bdd8f8575d0 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Thu, 1 Aug 2024 11:45:25 +0530 Subject: [PATCH 08/10] Save --- web/packages/new/photos/services/ml/worker.ts | 70 ++++++++++++++++++- 1 file changed, 69 insertions(+), 1 deletion(-) diff --git a/web/packages/new/photos/services/ml/worker.ts b/web/packages/new/photos/services/ml/worker.ts index 79124e34cc..035f296346 100644 --- a/web/packages/new/photos/services/ml/worker.ts +++ b/web/packages/new/photos/services/ml/worker.ts @@ -358,7 +358,22 @@ const pullSince = async (sinceTime: number, fetchedCount: number) => { const items = await fetchDerivedData(filesByID); - // TODO: Save items + const save = async ([id, data]: [number, RemoteDerivedData]) => { + try { + await saveDerivedData(id, data); + } catch (e) { + // Ignore errors during saving individual items, let the rest of the + // pull proceed. Failures will not have a lasting impact since the + // file will anyways get revisited as part of a backfill. + log.warn( + `Ignoring error when saving pulled derived data for file id ${id}`, + e, + ); + } + }; + + // Save items. + await Promise.all([...items.entries()].map(save)); // Save the checkpoint. await setLatestDerivedDataUpdatedAt(latestUpdatedAt); @@ -372,6 +387,57 @@ const pullSince = async (sinceTime: number, fetchedCount: number) => { return pullSince(latestUpdatedAt, fetchedCount + items.size); }; +/** + * Save the given {@link remoteDerivedData} for {@link fileID}. + * + * This as subset of the save sequence during {@link index}. This one is meant + * to be used during a {@link pull}. + */ +const saveDerivedData = async ( + fileID: number, + remoteDerivedData: RemoteDerivedData, +) => { + // Discard any existing data that is made by an older indexing pipelines. + // See: [Note: Embedding versions] + + const existingRemoteFaceIndex = remoteDerivedData.parsed?.face; + const existingRemoteCLIPIndex = remoteDerivedData.parsed?.clip; + + let existingFaceIndex: FaceIndex | undefined; + if ( + existingRemoteFaceIndex && + existingRemoteFaceIndex.version >= faceIndexingVersion + ) { + const { width, height, faces } = existingRemoteFaceIndex; + existingFaceIndex = { width, height, faces }; + } + + let existingCLIPIndex: CLIPIndex | undefined; + if ( + existingRemoteCLIPIndex && + existingRemoteCLIPIndex.version >= clipIndexingVersion + ) { + const { embedding } = existingRemoteCLIPIndex; + existingCLIPIndex = { embedding }; + } + + // If we have all the required embedding types, then save them, marking a + // file as indexed. + // + // In particular, this means that there might be files which we've marked + // indexed but still don't have the optional derived data types like exif. + // This is fine, we wish to compute the optional type of derived data when + // we can, but by themselves they're not reason enough for us to download + // and index the original. + + if (existingFaceIndex && existingCLIPIndex) { + await saveIndexes( + { fileID, ...existingFaceIndex }, + { fileID, ...existingCLIPIndex }, + ); + } +}; + /** * Find out files which need to be indexed. Then index the next batch of them. * @@ -545,6 +611,8 @@ const index = async ( existingRemoteFaceIndex && existingRemoteFaceIndex.version >= faceIndexingVersion ) { + // Destructure the data we got from remote so that we only retain the + // fields we're interested in the object that gets put into indexed db. const { width, height, faces } = existingRemoteFaceIndex; existingFaceIndex = { width, height, faces }; } From 940c647d507e8cd8448a13e909ee2e92b4814a53 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Thu, 1 Aug 2024 12:18:34 +0530 Subject: [PATCH 09/10] Prevent multiple ticks from being enqueued Noticed multiple ticks when uploading an item, which brought back focus into the app and caused wakeUp also to get triggered because of sync. Not sure if this was the issue, but felt like a potential one. --- web/packages/new/photos/services/ml/worker.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/web/packages/new/photos/services/ml/worker.ts b/web/packages/new/photos/services/ml/worker.ts index 035f296346..ac577d2263 100644 --- a/web/packages/new/photos/services/ml/worker.ts +++ b/web/packages/new/photos/services/ml/worker.ts @@ -83,7 +83,7 @@ interface IndexableItem { export class MLWorker { private electron: ElectronMLWorker | undefined; private delegate: MLWorkerDelegate | undefined; - private state: "idle" | "pull" | "indexing" = "idle"; + private state: "idle" | "waking" | "pull" | "indexing" = "idle"; private shouldPull = false; private havePulledAtLeastOnce = false; private liveQ: IndexableItem[] = []; @@ -154,9 +154,13 @@ export class MLWorker { /** Invoked in response to external events. */ private wakeUp() { if (this.state == "idle") { - // Currently paused. Get back to work. + // We are currently paused. Get back to work. if (this.idleTimeout) clearTimeout(this.idleTimeout); this.idleTimeout = undefined; + // Change state so that multiple calls to `wakeUp` don't cause + // multiple calls to `tick`. + this.state = "waking"; + // Enqueue a tick. void this.tick(); } else { // In the middle of a task. Do nothing, `this.tick` will From 985de0a5cecb32113db566aa58894634c887c7e0 Mon Sep 17 00:00:00 2001 From: Manav Rathi Date: Thu, 1 Aug 2024 12:26:02 +0530 Subject: [PATCH 10/10] Fix the actual issue described in 940c647d507e8cd8448a13e909ee2e92b4814a53 --- web/packages/new/photos/services/ml/worker.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/web/packages/new/photos/services/ml/worker.ts b/web/packages/new/photos/services/ml/worker.ts index ac577d2263..aecf7b45d3 100644 --- a/web/packages/new/photos/services/ml/worker.ts +++ b/web/packages/new/photos/services/ml/worker.ts @@ -500,6 +500,9 @@ const indexNextBatch = async ( await wait(0); } + // Wait for the pending tasks to drain out. + await Promise.all(tasks); + // Return true if nothing failed. return allSuccess; };