diff --git a/web/apps/photos/src/services/entityService.ts b/web/apps/photos/src/services/entityService.ts
index ea95da6a62..8a8f5322a7 100644
--- a/web/apps/photos/src/services/entityService.ts
+++ b/web/apps/photos/src/services/entityService.ts
@@ -15,6 +15,30 @@ import {
 } from "types/entity";
 import { getLatestVersionEntities } from "utils/entity";
 
+/**
+ * The maximum number of items to fetch in a single diff.
+ *
+ * [Note: Limit of returned items in /diff requests]
+ *
+ * The various GET /diff API methods, which tell the client what all has changed
+ * since a timestamp (provided by the client), take a limit parameter.
+ *
+ * These diff API calls return all items whose updated at is greater
+ * (non-inclusive) than the timestamp we provide. So there is no mechanism for
+ * pagination of items which have the same exact updated at.
+ *
+ * Conceptually, it may happen that there are more items than the limit we've
+ * provided, but there are practical safeguards.
+ *
+ * For file diff, the limit is advisory, and remote may return fewer, equal, or
+ * more items than the provided limit. The scenario where it returns more is
+ * when more files than the limit have the same updated at. Theoretically that
+ * would make the diff response unbounded, however in practice file
+ * modifications themselves are all batched. Even if the user were to select all
+ * the files in their library and update them all in one go in the UI, their
+ * client app is required to use batched API calls to make those updates, and
+ * each of those batches would get a distinct updated at.
+ */
 const DIFF_LIMIT = 500;
 
 const ENTITY_TABLES: Record<EntityType, string> = {
diff --git a/web/packages/new/photos/services/ml/clip.ts b/web/packages/new/photos/services/ml/clip.ts
index fab1147c37..e0bf6eecd4 100644
--- a/web/packages/new/photos/services/ml/clip.ts
+++ b/web/packages/new/photos/services/ml/clip.ts
@@ -1,4 +1,3 @@
-import type { EnteFile } from "@/new/photos/types/file";
 import type { Electron } from "@/next/types/ipc";
 import type { ImageBitmapAndData } from "./blob";
 import { clipIndexes } from "./db";
@@ -59,14 +58,8 @@ export const clipIndexingVersion = 1;
  * itself, is the same across clients - web and mobile.
  */
 export interface CLIPIndex {
-    /** The ID of the {@link EnteFile} whose index this is. */
-    fileID: number;
-    /** An integral version number of the indexing algorithm / pipeline. */
-    version: number;
-    /** The UA for the client which generated this embedding. */
-    client: string;
     /**
-     * The CLIP embedding itself.
+     * The CLIP embedding.
      *
      * This is an array of 512 floating point values that represent the
      * embedding of the image in the same space where we'll embed the text so
@@ -75,6 +68,18 @@ export interface CLIPIndex {
     embedding: number[];
 }
 
+export type RemoteCLIPIndex = CLIPIndex & {
+    /** An integral version number of the indexing algorithm / pipeline. */
+    version: number;
+    /** The UA for the client which generated this embedding. */
+    client: string;
+};
+
+export type LocalCLIPIndex = CLIPIndex & {
+    /** The ID of the {@link EnteFile} whose index this is. */
+    fileID: number;
+};
+
 /**
  * Compute the CLIP embedding of a given file.
  *
@@ -89,33 +94,19 @@ export interface CLIPIndex {
  * that it can be saved locally and also uploaded to the user's remote storage
  * for use on their other devices).
  *
- * @param enteFile The {@link EnteFile} to index.
- *
 * @param uploadItem If we're called during the upload process, then this will
 * be set to the {@link UploadItem} that was uploaded. This way, we can
 * directly use the on-disk file instead of needing to download the original
 * from remote.
 *
 * @param electron The {@link MLWorkerElectron} instance that allows us to call
- * our Node.js layer for various functionality.
- *
- * @param userAgent The UA of the client that is doing the indexing (us).
+ * our Node.js layer to run the ONNX inference.
 */
 export const indexCLIP = async (
-    enteFile: EnteFile,
     image: ImageBitmapAndData,
     electron: MLWorkerElectron,
-    userAgent: string,
-): Promise<CLIPIndex> => {
-    const { data: imageData } = image;
-    const fileID = enteFile.id;
-
-    return {
-        fileID,
-        version: clipIndexingVersion,
-        client: userAgent,
-        embedding: await computeEmbedding(imageData, electron),
-    };
-};
+): Promise<CLIPIndex> => ({
+    embedding: await computeEmbedding(image.data, electron),
+});
 
 const computeEmbedding = async (
     imageData: ImageData,
@@ -184,10 +175,10 @@ const normalized = (embedding: Float32Array) => {
 * native code running in our desktop app (the embedding happens in the native
 * layer).
 *
- * It return a list of files that should be shown in the search results. The
- * actual return type is a map from fileIDs to the scores they got (higher is
- * better). This map will only contains entries whose score was above our
- * minimum threshold.
+ * It returns the files (as IDs) that should be shown in the search results.
+ * They're returned as a map from fileIDs to the scores they got (higher is
+ * better). This map will only contain entries whose score was above our
+ * minimum threshold.
 *
 * The result can also be `undefined`, which indicates that the download for the
 * ML model is still in progress (trying again later should succeed).
diff --git a/web/packages/new/photos/services/ml/crop.ts b/web/packages/new/photos/services/ml/crop.ts
index ed13cdff52..5a626e0d50 100644
--- a/web/packages/new/photos/services/ml/crop.ts
+++ b/web/packages/new/photos/services/ml/crop.ts
@@ -56,7 +56,7 @@ export const saveFaceCrops = async (
     const cache = await blobCache("face-crops");
 
     return Promise.all(
-        faceIndex.faceEmbedding.faces.map(({ faceID, detection }) =>
+        faceIndex.faces.map(({ faceID, detection }) =>
             extractFaceCrop(imageBitmap, detection.box).then((b) =>
                 cache.put(faceID, b),
             ),
diff --git a/web/packages/new/photos/services/ml/db.ts b/web/packages/new/photos/services/ml/db.ts
index 51ca2f749b..7bc4857b76 100644
--- a/web/packages/new/photos/services/ml/db.ts
+++ b/web/packages/new/photos/services/ml/db.ts
@@ -1,9 +1,9 @@
+import { removeKV } from "@/next/kv";
 import log from "@/next/log";
 import localForage from "@ente/shared/storage/localForage";
 import { deleteDB, openDB, type DBSchema } from "idb";
-import type { CLIPIndex } from "./clip";
-import type { EmbeddingModel } from "./embedding";
-import type { FaceIndex } from "./face";
+import type { LocalCLIPIndex } from "./clip";
+import type { LocalFaceIndex } from "./face";
 
 /**
  * ML DB schema.
@@ -15,11 +15,11 @@ import type { FaceIndex } from "./face";
 * required), this is synced with the list of files that the current client
 * knows about locally.
 *
- * - "face-index": Contains {@link FaceIndex} objects, either indexed locally or
- *   fetched from remote storage.
+ * - "face-index": Contains {@link LocalFaceIndex} objects, either indexed
+ *   locally or fetched from remote.
 *
- * - "clip-index": Contains {@link CLIPIndex} objects, either indexed locally or
- *   fetched from remote storage.
+ * - "clip-index": Contains {@link LocalCLIPIndex} objects, either indexed
+ *   locally or fetched from remote.
 *
 * All the stores are keyed by {@link fileID}. The "file-status" contains
 * book-keeping about the indexing process (whether or not a file needs
@@ -27,7 +27,7 @@ import type { LocalFaceIndex } from "./face";
 * the actual indexing results.
 *
 * In tandem, these serve as the underlying storage for the functions exposed by
- * this file.
+ * the ML database.
 */
 interface MLDBSchema extends DBSchema {
     "file-status": {
         key: number;
@@ -37,11 +37,11 @@ interface MLDBSchema extends DBSchema {
     };
     "face-index": {
         key: number;
-        value: FaceIndex;
+        value: LocalFaceIndex;
     };
     "clip-index": {
         key: number;
-        value: CLIPIndex;
+        value: LocalCLIPIndex;
     };
 }
@@ -60,24 +60,11 @@ interface FileStatus {
      *
      * - "failed" - Indexing was attempted but failed.
      *
-     * There can arise situations in which a file has one, but not all, indexes.
-     * e.g. it may have a "face-index" but "clip-index" might've not yet
-     * happened (or failed). In such cases, the status of the file will be
-     * "indexable": it transitions to "indexed" only after all indexes have been
-     * computed or fetched.
-     *
-     * If you have't heard the word "index" to the point of zoning out, we also
+     * If you haven't heard the word "index" to the point of zoning out, we also
      * have a (IndexedDB) "index" on the status field to allow us to efficiently
      * select or count {@link fileIDs} that fall into various buckets.
      */
     status: "indexable" | "indexed" | "failed";
-    /**
-     * A list of embeddings that we still need to compute for the file.
-     *
-     * This is guaranteed to be empty if status is "indexed", and will have at
-     * least one entry otherwise.
-     */
-    pending: EmbeddingModel[];
     /**
      * The number of times attempts to index this file failed.
      *
@@ -151,6 +138,17 @@ const deleteLegacyDB = () => {
         localForage.removeItem("onnx-clip-embedding_sync_time"),
         localForage.removeItem("file-ml-clip-face-embedding_sync_time"),
     ]);
+
+    // Delete keys for the legacy diff based sync.
+    //
+    // This code was added July 2024 (v1.7.3-beta). These keys were never
+    // enabled outside of the nightly builds, so this cleanup is not a hard
+    // need. Either way, it can be removed at some point when most clients have
+    // migrated (tag: Migration).
+    void Promise.all([
+        removeKV("embeddingSyncTime:onnx-clip"),
+        removeKV("embeddingSyncTime:file-ml-clip-face"),
+    ]);
 };
 
 /**
@@ -183,36 +181,34 @@ export const clearMLDB = async () => {
 };
 
 /**
- * Save the given {@link faceIndex} locally.
- *
- * @param faceIndex A {@link FaceIndex} representing the faces that we detected
- * (and their corresponding embeddings) in a particular file.
- *
- * This function adds a new entry for the face index, overwriting any existing
- * ones (No merging is performed, the existing entry is unconditionally
- * overwritten). The file status is also updated to remove face from the pending
- * embeddings. If there are no other pending embeddings, the status changes to
+ * Save the given {@link faceIndex} and {@link clipIndex}, and mark the file as
  * "indexed".
+ *
+ * This function adds a new entry for the indexes, overwriting any existing ones
+ * (no merging is performed, the existing entry is unconditionally overwritten).
+ * The file status is also updated to "indexed", and the failure count is reset
+ * to 0.
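+ *
+ * For illustration, a call site looks like this (a sketch; the field values
+ * are elided, and the shapes are those of {@link LocalFaceIndex} and
+ * {@link LocalCLIPIndex}):
+ *
+ *     await saveIndexes(
+ *         { fileID, width, height, faces },
+ *         { fileID, embedding },
+ *     );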
 */
-export const saveFaceIndex = async (faceIndex: FaceIndex) => {
+export const saveIndexes = async (
+    faceIndex: LocalFaceIndex,
+    clipIndex: LocalCLIPIndex,
+) => {
     const { fileID } = faceIndex;
 
     const db = await mlDB();
-    const tx = db.transaction(["file-status", "face-index"], "readwrite");
-    const statusStore = tx.objectStore("file-status");
-    const indexStore = tx.objectStore("face-index");
-
-    const fileStatus =
-        (await statusStore.get(IDBKeyRange.only(fileID))) ??
-        newFileStatus(fileID);
-    fileStatus.pending = fileStatus.pending.filter(
-        (v) => v != "file-ml-clip-face",
+    const tx = db.transaction(
+        ["file-status", "face-index", "clip-index"],
+        "readwrite",
     );
-    if (fileStatus.pending.length == 0) fileStatus.status = "indexed";
 
     await Promise.all([
-        statusStore.put(fileStatus),
-        indexStore.put(faceIndex),
+        tx.objectStore("file-status").put({
+            fileID,
+            status: "indexed",
+            failureCount: 0,
+        }),
+        tx.objectStore("face-index").put(faceIndex),
+        tx.objectStore("clip-index").put(clipIndex),
         tx.done,
     ]);
 };
@@ -224,45 +220,9 @@ export const saveFaceIndex = async (faceIndex: FaceIndex) => {
 const newFileStatus = (fileID: number): FileStatus => ({
     fileID,
     status: "indexable",
-    // TODO-ML: clip-test
-    // pending: ["file-ml-clip-face", "onnx-clip"],
-    pending: ["file-ml-clip-face"],
     failureCount: 0,
 });
 
-/**
- * Save the given {@link clipIndex} locally.
- *
- * @param clipIndex A {@link CLIPIndex} containing the CLIP embedding for a
- * particular file.
- *
- * This function adds a new entry for the CLIP index, overwriting any existing
- * ones (No merging is performed, the existing entry is unconditionally
- * overwritten). The file status is also updated to remove CLIP from the pending
- * embeddings. If there are no other pending embeddings, the status changes to
- * "indexed".
- */
-export const saveCLIPIndex = async (clipIndex: CLIPIndex) => {
-    const { fileID } = clipIndex;
-
-    const db = await mlDB();
-    const tx = db.transaction(["file-status", "clip-index"], "readwrite");
-    const statusStore = tx.objectStore("file-status");
-    const indexStore = tx.objectStore("clip-index");
-
-    const fileStatus =
-        (await statusStore.get(IDBKeyRange.only(fileID))) ??
-        newFileStatus(fileID);
-    fileStatus.pending = fileStatus.pending.filter((v) => v != "onnx-clip");
-    if (fileStatus.pending.length == 0) fileStatus.status = "indexed";
-
-    await Promise.all([
-        statusStore.put(fileStatus),
-        indexStore.put(clipIndex),
-        tx.done,
-    ]);
-};
-
 /**
  * Return the {@link FaceIndex}, if any, for {@link fileID}.
 */
@@ -317,9 +277,9 @@ export const addFileEntry = async (fileID: number) => {
 *   DB are removed from ML DB (including any indexes).
 *
 * - Files that are not present locally but are in the trash are retained in ML
- *   DB if their status is "indexed"; otherwise they too are removed. This
- *   special case is to prevent churn (re-indexing) if the user moves some files
- *   to trash but then later restores them before they get permanently deleted.
+ *   DB if their status is "indexed"; otherwise they too are removed. This is
+ *   to prevent churn: otherwise they'd get unnecessarily reindexed if the user
+ *   restores them from trash before permanent deletion.
 */
 export const updateAssumingLocalFiles = async (
     localFileIDs: number[],
diff --git a/web/packages/new/photos/services/ml/embedding.ts b/web/packages/new/photos/services/ml/embedding.ts
index a6884bc17d..4e452a4c61 100644
--- a/web/packages/new/photos/services/ml/embedding.ts
+++ b/web/packages/new/photos/services/ml/embedding.ts
@@ -1,20 +1,15 @@
-import {
-    getAllLocalFiles,
-    getLocalTrashedFiles,
-} from "@/new/photos/services/files";
 import type { EnteFile } from "@/new/photos/types/file";
 import {
     decryptFileMetadata,
     encryptFileMetadata,
 } from "@/new/shared/crypto/ente";
 import { authenticatedRequestHeaders, ensureOk } from "@/next/http";
-import { getKV, setKV } from "@/next/kv";
 import log from "@/next/log";
 import { apiURL } from "@/next/origins";
+import { nullToUndefined } from "@/utils/transform";
 import { z } from "zod";
-import { clipIndexingVersion, type CLIPIndex } from "./clip";
-import { saveCLIPIndex, saveFaceIndex } from "./db";
-import { faceIndexingVersion, type FaceIndex } from "./face";
+import { type RemoteCLIPIndex } from "./clip";
+import { type RemoteFaceIndex } from "./face";
 
 /**
 * The embeddings that we (the current client) know how to handle.
@@ -23,34 +18,38 @@
 * encrypted embeddings from remote. However, we should be prepared to receive a
 * {@link RemoteEmbedding} with a model value different from these.
 *
- * It is vernacularly called a model, but strictly speaking this is not the
- * model, but the embedding produced by a particular model with a particular set
- * of pre-/post- processing steps and related hyperparameters. It is better
- * thought of as an "type" of embedding produced or consumed by the client.
+ * [Note: Embedding/model vs derived data]
 *
- * [Note: Handling versioning of embeddings]
+ * Historically, this has been called an "embedding" or a "model" in the API
+ * terminology. However, it is more like derived data.
 *
- * The embeddings themselves have a version embedded in them, so it is possible
- * for us to make backward compatible updates to the indexing process on newer
- * clients (There is also a top level version field too but that is not used).
+ * It started off being called a "model", but strictly speaking it was not just
+ * the model, but the embedding produced by a particular ML model when used with
+ * a particular set of pre-/post- processing steps and related hyperparameters.
+ * It is better thought of as a "type" of embedding produced or consumed by the
+ * client, e.g. the "face" embedding, or the "clip" embedding.
 *
- * If we bump the version of same model (say when indexing on a newer client),
- * the assumption will be that older client will be able to consume the
- * response. e.g. Say if we improve blur detection, older client should just
- * consume embeddings with a newer version and not try to index the file again
- * locally.
+ * Even the word embedding is a synecdoche, since it might have other data. For
+ * example, for faces, it is not just the face embedding, but also the detection
+ * regions, landmarks etc: what we've come to refer to as the "face index" in
+ * our client code terminology.
 *
- * If we get an embedding with version that is older than the version the client
- * supports, then the client should ignore it. This way, the file will get
- * reindexed locally an embedding with a newer version will get put to remote.
+ * Later on, to avoid the proliferation of small files (one per embedding), we
+ * combined all these embeddings into a single "embedding", which is a map of
+ * the form:
 *
- * In the case where the changes are not backward compatible and can only be
- * consumed by clients with the relevant scaffolding, then we change this
- * "model" (i.e "type") field to create a new universe of embeddings.
+ *     {
+ *         "face": ... the face indexing result ...
+ *         "clip": ... the CLIP indexing result ...
+ *         "exif": ... the Exif extracted from the file ...
+ *         ... more in the future ...
+ *     }
+ *
+ * Thus, now this is best thought of as a tag for a particular format of
+ * encoding all the derived data associated with a file.
 */
-export type EmbeddingModel =
-    | "onnx-clip" /* CLIP embeddings */
-    | "file-ml-clip-face" /* Face embeddings */;
+// TODO-ML: Fix name to "combined" before release
+type EmbeddingModel = "onnx-clip" /* Combined format */;
 
 const RemoteEmbedding = z.object({
     /** The ID of the file whose embedding this is. */
     fileID: z.number(),
@@ -73,182 +72,237 @@ const RemoteEmbedding = z.object({
     * the crypto layer.
     */
    decryptionHeader: z.string(),
-    /** Last time (epoch ms) this embedding was updated. */
-    updatedAt: z.number(),
 });
 
 type RemoteEmbedding = z.infer<typeof RemoteEmbedding>;
 
-/**
- * Fetch new or updated embeddings from remote and save them locally.
- *
- * @param model The {@link EmbeddingModel} for which to pull embeddings. For
- * each model, this function maintains the last sync time in local storage so
- * subsequent fetches only pull what's new.
- *
- * @param save A function that is called to save the embedding. The save process
- * can be model specific, so this provides us a hook to reuse the surrounding
- * pull mechanisms while varying the save itself. This function will be passed
- * the decrypted embedding string. If it throws, then we'll log about but
- * otherwise ignore the embedding under consideration.
- *
- * This function should be called only after we have synced files with remote.
- * See: [Note: Ignoring embeddings for unknown files].
- *
- * @returns true if at least one embedding was pulled, false otherwise.
- */
-const pullEmbeddings = async (
-    model: EmbeddingModel,
-    save: (decryptedEmbedding: string) => Promise<void>,
-) => {
-    // Include files from trash, otherwise they'll get unnecessarily reindexed
-    // if the user restores them from trash before permanent deletion.
-    const localFiles = (await getAllLocalFiles()).concat(
-        await getLocalTrashedFiles(),
-    );
-    // [Note: Ignoring embeddings for unknown files]
-    //
-    // We need the file to decrypt the embedding. This is easily ensured by
-    // running the embedding sync after we have synced our local files with
-    // remote.
-    //
-    // Still, it might happen that we come across an embedding for which we
-    // don't have the corresponding file locally. We can put them in two
-    // buckets:
-    //
-    // 1. Known case: In rare cases we might get a diff entry for an embedding
-    //    corresponding to a file which has been deleted (but whose embedding
-    //    is enqueued for deletion). Client should expect such a scenario, but
-    //    all they have to do is just ignore such embeddings.
-    //
-    // 2. Other unknown cases: Even if somehow we end up with an embedding for
-    //    a existent file which we don't have locally, it is fine because the
-    //    current client will just regenerate the embedding if the file really
-    //    exists and gets locally found later. There would be a bit of
-    //    duplicate work, but that's fine as long as there isn't a systematic
-    //    scenario where this happens.
-    const localFilesByID = new Map(localFiles.map((f) => [f.id, f]));
+export type RawRemoteDerivedData = Record<string, unknown>;
 
-    let didPull = false;
-    let sinceTime = await embeddingSyncTime(model);
-    // TODO: eslint has fixed this spurious warning, but we're not on the latest
-    // version yet, so add a disable.
-    // https://github.com/eslint/eslint/pull/18286
-    /* eslint-disable no-constant-condition */
-    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
-    while (true) {
-        const remoteEmbeddings = await getEmbeddingsDiff(model, sinceTime);
-        if (remoteEmbeddings.length == 0) break;
-        let count = 0;
-        for (const remoteEmbedding of remoteEmbeddings) {
-            sinceTime = Math.max(sinceTime, remoteEmbedding.updatedAt);
-            try {
-                const file = localFilesByID.get(remoteEmbedding.fileID);
-                if (!file) continue;
-                await save(
-                    await decryptFileMetadata(
-                        remoteEmbedding.encryptedEmbedding,
-                        remoteEmbedding.decryptionHeader,
-                        file.key,
-                    ),
-                );
-                didPull = true;
-                count++;
-            } catch (e) {
-                log.warn(`Ignoring unparseable ${model} embedding`, e);
-            }
+export type ParsedRemoteDerivedData = Partial<{
+    face: RemoteFaceIndex;
+    clip: RemoteCLIPIndex;
+}>;
+
+/**
+ * The decrypted payload of a {@link RemoteEmbedding} for the "combined"
+ * {@link EmbeddingModel}.
+ *
+ * [Note: Preserve unknown derived data fields]
+ *
+ * The remote derived data can contain arbitrary keys at the top level apart
+ * from the ones that the current client knows about. We need to preserve these
+ * verbatim when we use {@link putDerivedData}.
+ *
+ * Thus we return two separate results from {@link fetchDerivedData}:
+ *
+ * - {@link RawRemoteDerivedData}: The original, unmodified JSON.
+ *
+ * - {@link ParsedRemoteDerivedData}: The particular fields that the current
+ *   client knows about, parsed according to their expected structure.
+ *
+ * When integrating this information into our local state, we use the parsed
+ * version. And if we need to update the state on remote (e.g. if the current
+ * client notices an embedding type that was missing), then we use the original
+ * JSON as the base.
+ */
+export interface RemoteDerivedData {
+    raw: RawRemoteDerivedData;
+    parsed: ParsedRemoteDerivedData | undefined;
+}
+
+/**
+ * Zod schema for the {@link RemoteFaceIndex} type.
+ *
+ * [Note: Duplicated Zod schema and TypeScript type]
+ *
+ * Usually we define a Zod schema, and then infer the corresponding TypeScript
+ * type for it using `z.infer`. This works great except that the docstrings
+ * don't show up: Docstrings get added to the Zod schema, but usually the code
+ * using the parsed data will reference the TypeScript type, and the docstrings
+ * added to the fields in the Zod schema won't show up.
+ *
+ * We usually live with this infelicity since the alternative is code
+ * duplication: Defining a TypeScript type (putting the docstrings therein)
+ * _and_ also a corresponding Zod schema. The duplication is needed because it
+ * is not possible to go the other way (TypeScript type => Zod schema).
+ *
+ * However, in some cases, when the TypeScript type under consideration is used
+ * pervasively in code, having a standalone TypeScript type with attached
+ * docstrings is worth the code duplication.
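+ *
+ * As a (hypothetical) minimal illustration of the duplication being talked
+ * about, the same shape written twice:
+ *
+ *     interface Point { x: number; y: number }
+ *     const Point = z.object({ x: z.number(), y: z.number() });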
+ *
+ * Note that this'll just be syntactic duplication - if the two definitions get
+ * out of sync in the shape of the types they represent, the TypeScript compiler
+ * will flag it for us.
+ */
+const RemoteFaceIndex = z.object({
+    version: z.number(),
+    client: z.string(),
+    width: z.number(),
+    height: z.number(),
+    faces: z.array(
+        z.object({
+            faceID: z.string(),
+            detection: z.object({
+                box: z.object({
+                    x: z.number(),
+                    y: z.number(),
+                    width: z.number(),
+                    height: z.number(),
+                }),
+                landmarks: z.array(
+                    z.object({
+                        x: z.number(),
+                        y: z.number(),
+                    }),
+                ),
+            }),
+            score: z.number(),
+            blur: z.number(),
+            embedding: z.array(z.number()),
+        }),
+    ),
+});
+
+/**
+ * Zod schema for the {@link RemoteCLIPIndex} type.
+ *
+ * See: [Note: Duplicated Zod schema and TypeScript type]
+ */
+const RemoteCLIPIndex = z.object({
+    version: z.number(),
+    client: z.string(),
+    embedding: z.array(z.number()),
+});
+
+/**
+ * Zod schema for the {@link RawRemoteDerivedData} type.
+ */
+const RawRemoteDerivedData = z.object({}).passthrough();
+
+/**
+ * Zod schema for the {@link ParsedRemoteDerivedData} type.
+ */
+const ParsedRemoteDerivedData = z.object({
+    face: RemoteFaceIndex.nullish().transform(nullToUndefined),
+    clip: RemoteCLIPIndex.nullish().transform(nullToUndefined),
+});
+
+/**
+ * Fetch derived data for the given files from remote.
+ *
+ * @param filesByID A map containing the files whose derived data we want to
+ * fetch. Each entry is keyed by the file's ID, and the value is the file.
+ *
+ * @returns a map containing the (decrypted) derived data for each file for
+ * which remote returned the corresponding embedding. Each entry in the map is
+ * keyed by the file's ID, and each value is a {@link RemoteDerivedData} that
+ * contains both the original JSON, and the parsed representation of embeddings
+ * that we know about.
+ */
+export const fetchDerivedData = async (
+    filesByID: Map<number, EnteFile>,
+): Promise<Map<number, RemoteDerivedData>> => {
+    // TODO-ML: Fix name to "combined" before release
+    const remoteEmbeddings = await fetchEmbeddings("onnx-clip", [
+        ...filesByID.keys(),
+    ]);
+
+    const result = new Map<number, RemoteDerivedData>();
+    for (const remoteEmbedding of remoteEmbeddings) {
+        const { fileID } = remoteEmbedding;
+        const file = filesByID.get(fileID);
+        if (!file) {
+            log.warn(`Ignoring derived data for unknown fileID ${fileID}`);
+            continue;
+        }
+
+        try {
+            const decryptedBytes = await decryptFileMetadata(
+                remoteEmbedding.encryptedEmbedding,
+                remoteEmbedding.decryptionHeader,
+                file.key,
+            );
+            const jsonString = await gunzip(decryptedBytes);
+            result.set(fileID, remoteDerivedDataFromJSONString(jsonString));
+        } catch (e) {
+            // This shouldn't happen. Likely some client has uploaded a
+            // corrupted embedding. Ignore it so that it gets reindexed and
+            // uploaded correctly again.
+            log.warn(`Ignoring unparseable embedding for ${fileID}`, e);
        }
     }
-        await saveEmbeddingSyncTime(sinceTime, model);
-        log.info(`Fetched ${count} ${model} embeddings`);
-    }
-    return didPull;
+    log.debug(() => `Fetched ${result.size} combined embeddings`);
+    return result;
+};
+
+const remoteDerivedDataFromJSONString = (jsonString: string) => {
+    const raw = RawRemoteDerivedData.parse(JSON.parse(jsonString));
+    const parseResult = ParsedRemoteDerivedData.safeParse(raw);
+    // This code is included in apps/photos, which currently does not have the
+    // TypeScript strict mode enabled, which causes a spurious tsc failure.
+ // + // eslint-disable-next-line @typescript-eslint/ban-ts-comment, @typescript-eslint/prefer-ts-expect-error + // @ts-ignore + const parsed = parseResult.success + ? (parseResult.data as ParsedRemoteDerivedData) + : undefined; + return { raw, parsed }; }; /** - * The updatedAt of the most recent {@link RemoteEmbedding} for {@link model} - * we've retrieved from remote. + * Fetch {@link model} embeddings for the given list of files. * - * Returns 0 if there is no such embedding. + * @param model The {@link EmbeddingModel} which we want. * - * This value is persisted to local storage. To update it, use - * {@link saveEmbeddingSyncTime}. + * @param fileIDs The ids of the files for which we want the embeddings. + * + * @returns a list of {@link RemoteEmbedding} for the files which had embeddings + * (and thatt remote was able to successfully retrieve). The order of this list + * is arbitrary, and the caller should use the {@link fileID} present within the + * {@link RemoteEmbedding} to associate an item in the result back to a file + * instead of relying on the order or count of items in the result. */ -const embeddingSyncTime = async (model: EmbeddingModel) => - parseInt((await getKV("embeddingSyncTime:" + model)) ?? "0"); - -/** Sibling of {@link embeddingSyncTime}. */ -const saveEmbeddingSyncTime = async (t: number, model: EmbeddingModel) => - setKV("embeddingSyncTime:" + model, `${t}`); - -/** - * The maximum number of items to fetch in a single GET /embeddings/diff - * - * [Note: Limit of returned items in /diff requests] - * - * The various GET /diff API methods, which tell the client what all has changed - * since a timestamp (provided by the client) take a limit parameter. - * - * These diff API calls return all items whose updated at is greater - * (non-inclusive) than the timestamp we provide. So there is no mechanism for - * pagination of items which have the same exact updated at. Conceptually, it - * may happen that there are more items than the limit we've provided. - * - * The behaviour of this limit is different for file diff and embeddings diff. - * - * - For file diff, the limit is advisory, and remote may return less, equal - * or more items than the provided limit. The scenario where it returns more - * is when more files than the limit have the same updated at. Theoretically - * it would make the diff response unbounded, however in practice file - * modifications themselves are all batched. Even if the user selects all - * the files in their library and updates them all in one go in the UI, - * their client app must use batched API calls to make those updates, and - * each of those batches would get distinct updated at. - * - * - For embeddings diff, there are no bulk updates and this limit is enforced - * as a maximum. While theoretically it is possible for an arbitrary number - * of files to have the same updated at, in practice it is not possible with - * the current set of APIs where clients PUT individual embeddings (the - * updated at is a server timestamp). And even if somehow a large number of - * files get the same updated at and thus get truncated in the response, it - * won't lead to any data loss, the client which requested that particular - * truncated diff will just regenerate them. - */ -const diffLimit = 500; - -/** - * GET embeddings for the given model that have been updated {@link sinceTime}. - * - * This fetches the next {@link diffLimit} embeddings whose {@link updatedAt} is - * greater than the given {@link sinceTime} (non-inclusive). 
- * - * @param model The {@link EmbeddingModel} whose diff we wish for. - * - * @param sinceTime The updatedAt of the last embedding we've synced (epoch ms). - * Pass 0 to fetch everything from the beginning. - * - * @returns an array of {@link RemoteEmbedding}. The returned array is limited - * to a maximum count of {@link diffLimit}. - * - * > See [Note: Limit of returned items in /diff requests]. - */ -const getEmbeddingsDiff = async ( +const fetchEmbeddings = async ( model: EmbeddingModel, - sinceTime: number, + fileIDs: number[], ): Promise => { - const params = new URLSearchParams({ - model, - sinceTime: `${sinceTime}`, - limit: `${diffLimit}`, - }); - const url = await apiURL("/embeddings/diff"); - const res = await fetch(`${url}?${params.toString()}`, { + const res = await fetch(await apiURL("/embeddings/files"), { + method: "POST", headers: await authenticatedRequestHeaders(), + body: JSON.stringify({ + model, + fileIDs, + }), }); ensureOk(res); - return z.object({ diff: z.array(RemoteEmbedding) }).parse(await res.json()) - .diff; + return z + .object({ embeddings: z.array(RemoteEmbedding) }) + .parse(await res.json()).embeddings; }; +/** + * Update the derived data stored for given {@link enteFile} on remote. + * + * This allows other clients to directly pull the derived data instead of + * needing to re-index. + * + * The data on remote will be replaced unconditionally, and it is up to the + * client (us) to ensure that we preserve the parts of the pre-existing derived + * data (if any) that we did not understand or touch. + * + * See: [Note: Preserve unknown derived data fields]. + */ +export const putDerivedData = async ( + enteFile: EnteFile, + derivedData: RawRemoteDerivedData, +) => + // TODO-ML: Fix name to "combined" before release + putEmbedding( + enteFile, + "onnx-clip", + await gzip(JSON.stringify(derivedData)), + ); + /** * Upload an embedding to remote. * @@ -259,16 +313,14 @@ const getEmbeddingsDiff = async ( * * @param model The {@link EmbeddingModel} which we are uploading. * - * @param embedding String representation of the embedding. The exact contents - * of the embedding are model specific (usually this is the JSON string). + * @param embedding The binary data the embedding. The exact contents of the + * embedding are {@link model} specific. */ -export const putEmbedding = async ( +const putEmbedding = async ( enteFile: EnteFile, model: EmbeddingModel, - embedding: string, + embedding: Uint8Array, ) => { - log.debug(() => ["Uploading embedding", { model, embedding }]); - const { encryptedMetadataB64, decryptionHeaderB64 } = await encryptFileMetadata(embedding, enteFile.key); @@ -285,174 +337,35 @@ export const putEmbedding = async ( ensureOk(res); }; -// MARK: - Face +// MARK: - GZIP /** - * Fetch new or updated face embeddings from remote and save them locally. + * Compress the given {@link string} using "gzip" and return the resultant + * bytes. See {@link gunzip} for the reverse operation. * - * It takes no parameters since it saves the last sync time in local storage. - * - * This function should be called only after we have synced files with remote. - * See: [Note: Ignoring embeddings for unknown files]. - * - * @returns true if at least one embedding was pulled, false otherwise. + * This is syntactic sugar to deal with the string/blob/stream/bytes + * conversions, but it should not be taken as an abstraction layer. If your code + * can directly use a ReadableStream, then then data -> stream -> data round + * trip is unnecessary. 
 */
-export const pullFaceEmbeddings = () =>
-    pullEmbeddings("file-ml-clip-face", (jsonString: string) =>
-        // eslint-disable-next-line @typescript-eslint/prefer-ts-expect-error, @typescript-eslint/ban-ts-comment
-        // @ts-ignore TODO: There is no error here, but this file is imported by
-        // one of our packages that doesn't have strict mode enabled yet,
-        // causing a spurious error to be emitted in that context.
-        saveFaceIndexIfNewer(FaceIndex.parse(JSON.parse(jsonString))),
-    );
-
-/**
- * Save the given {@link faceIndex} locally if it is newer than the one we have.
- *
- * This is a variant of {@link saveFaceIndex} that performs version checking as
- * described in [Note: Handling versioning of embeddings].
- */
-const saveFaceIndexIfNewer = async (index: FaceIndex) => {
-    const version = index.faceEmbedding.version;
-    if (version < faceIndexingVersion) {
-        log.info(
-            `Ignoring remote face index with version ${version} older than what our indexer can produce (${faceIndexingVersion})`,
-        );
-        return;
-    }
-    return saveFaceIndex(index);
+const gzip = async (string: string) => {
+    const compressedStream = new Blob([string])
+        .stream()
+        // This code only runs on the desktop app currently, so we can rely on
+        // the existence of new web features like the CompressionStream APIs.
+        .pipeThrough(new CompressionStream("gzip"));
+    return new Uint8Array(await new Response(compressedStream).arrayBuffer());
 };
 
 /**
- * Zod schemas for the {@link FaceIndex} types.
- *
- * [Note: Duplicated between Zod schemas and TS type]
- *
- * Usually we define a Zod schema, and then infer the corresponding TypeScript
- * type for it using `z.infer`. This works great except now the docstrings don't
- * show up: The doc strings get added to the Zod schema, but usually the code
- * using the parsed data will reference the TypeScript type, and the docstrings
- * added to the fields in the Zod schema won't show up.
- *
- * We usually live with this infelicity, since the alternative is code
- * duplication: Define the TypeScript type (putting the docstrings therein)
- * _and_ also a corresponding Zod schema. The duplication happens because it is
- * not possible to go the other way (TS type => Zod schema).
- *
- * However, in some cases having when the TypeScript type under consideration is
- * used pervasively in code, having a standalone TypeScript type with attached
- * docstrings is worth the code duplication.
- *
- * Note that this'll just be syntactic duplication - if the two definitions get
- * out of sync in the shape of the types they represent, the TypeScript compiler
- * will flag it for us.
+ * Decompress the given "gzip" compressed {@link data} and return the resultant
+ * string. See {@link gzip} for the reverse operation.
 */
-const FaceIndex = z
-    .object({
-        fileID: z.number(),
-        width: z.number(),
-        height: z.number(),
-        faceEmbedding: z
-            .object({
-                version: z.number(),
-                client: z.string(),
-                faces: z.array(
-                    z
-                        .object({
-                            faceID: z.string(),
-                            detection: z
-                                .object({
-                                    box: z
-                                        .object({
-                                            x: z.number(),
-                                            y: z.number(),
-                                            width: z.number(),
-                                            height: z.number(),
-                                        })
-                                        .passthrough(),
-                                    landmarks: z.array(
-                                        z
-                                            .object({
-                                                x: z.number(),
-                                                y: z.number(),
-                                            })
-                                            .passthrough(),
-                                    ),
-                                })
-                                .passthrough(),
-                            score: z.number(),
-                            blur: z.number(),
-                            embedding: z.array(z.number()),
-                        })
-                        .passthrough(),
-                ),
-            })
-            .passthrough(),
-    })
-    // Retain fields we might not (currently) understand.
-    .passthrough();
-
-/**
- * Save the face index for the given {@link enteFile} on remote so that other
- * clients can directly pull it instead of needing to reindex.
- */
-export const putFaceIndex = async (enteFile: EnteFile, faceIndex: FaceIndex) =>
-    putEmbedding(enteFile, "file-ml-clip-face", JSON.stringify(faceIndex));
-
-// MARK: - CLIP
-
-/**
- * Fetch new or updated CLIP embeddings from remote and save them locally.
- *
- * See {@link pullFaceEmbeddings} for a sibling function with more comprehensive
- * documentation.
- *
- * @returns true if at least one embedding was pulled, false otherwise.
- */
-export const pullCLIPEmbeddings = () =>
-    pullEmbeddings("onnx-clip", (jsonString: string) =>
-        // eslint-disable-next-line @typescript-eslint/prefer-ts-expect-error, @typescript-eslint/ban-ts-comment
-        // @ts-ignore TODO: There is no error here, but this file is imported by
-        // one of our packages that doesn't have strict mode enabled yet,
-        // causing a spurious error to be emitted in that context.
-        saveCLIPIndexIfNewer(CLIPIndex.parse(JSON.parse(jsonString))),
-    );
-
-/**
- * Save the given {@link clipIndex} locally if it is newer than the one we have.
- *
- * This is a variant of {@link saveCLIPIndex} that performs version checking as
- * described in [Note: Handling versioning of embeddings].
- */
-const saveCLIPIndexIfNewer = async (index: CLIPIndex) => {
-    const version = index.version;
-    if (version < clipIndexingVersion) {
-        log.info(
-            `Ignoring remote CLIP index with version ${version} older than what our indexer can produce (${clipIndexingVersion})`,
-        );
-        return;
-    }
-    return saveCLIPIndex(index);
+const gunzip = async (data: Uint8Array) => {
+    const decompressedStream = new Blob([data])
+        .stream()
+        // This code only runs on the desktop app currently, so we can rely on
+        // the existence of new web features like the CompressionStream APIs.
+        .pipeThrough(new DecompressionStream("gzip"));
+    return new Response(decompressedStream).text();
 };
-
-/**
- * Zod schemas for the {@link CLIPIndex} types.
- *
- * See: [Note: Duplicated between Zod schemas and TS type]
- */
-const CLIPIndex = z
-    .object({
-        fileID: z.number(),
-        version: z.number(),
-        client: z.string(),
-        embedding: z.array(z.number()),
-    })
-    // Retain fields we might not (currently) understand.
-    .passthrough();
-
-/**
- * Save the CLIP index for the given {@link enteFile} on remote so that other
- * clients can directly pull it instead of needing to reindex.
- */
-export const putCLIPIndex = async (enteFile: EnteFile, clipIndex: CLIPIndex) =>
-    putEmbedding(enteFile, "onnx-clip", JSON.stringify(clipIndex));
diff --git a/web/packages/new/photos/services/ml/face.ts b/web/packages/new/photos/services/ml/face.ts
index 2fb953e6d7..910970d3b9 100644
--- a/web/packages/new/photos/services/ml/face.ts
+++ b/web/packages/new/photos/services/ml/face.ts
@@ -8,7 +8,6 @@
 /* eslint-disable @typescript-eslint/no-non-null-assertion */
 
 import type { EnteFile } from "@/new/photos/types/file";
-import log from "@/next/log";
 import { Matrix } from "ml-matrix";
 import { getSimilarityTransformation } from "similarity-transformation";
 import {
@@ -19,7 +18,6 @@ import {
     type Matrix as TransformationMatrix,
 } from "transformation-matrix";
 import type { ImageBitmapAndData } from "./blob";
-import { saveFaceCrops } from "./crop";
 import {
     grayscaleIntMatrixFromNormalized2List,
     pixelRGBBilinear,
@@ -36,30 +34,85 @@ export const faceIndexingVersion = 1;
 
 /**
 * The faces in a file (and an embedding for each of them).
 *
- * This interface describes the format of both local and remote face data.
+ * This interface describes the format of face related data (aka "face index")
+ * for a file. This is the common subset that is present in both the fields
+ * that are persisted on remote ({@link RemoteFaceIndex}) and locally
+ * ({@link LocalFaceIndex}).
 *
- * - Local face detections and embeddings (collectively called as the face
- *   index) are generated by the current client when uploading a file (or when
- *   noticing a file which doesn't yet have a face index), stored in the local
- *   IndexedDB ("ml/db") and also uploaded (E2EE) to remote.
+ * - Face detections and embeddings (collectively called the face index) are
+ *   generated by the current client when uploading a file, or when noticing a
+ *   file which doesn't yet have a face index. They are then uploaded (E2EE) to
+ *   remote, and the relevant bits also saved locally in the ML DB for
+ *   subsequent lookup or clustering.
 *
- * - Remote embeddings are fetched by subsequent clients to avoid them having to
- *   reindex (indexing faces is a costly operation, esp for mobile clients).
+ * - This face index is then fetched by subsequent clients to avoid them having
+ *   to reindex (indexing faces is a costly operation, esp. for mobile clients).
 *
 * In both these scenarios (whether generated locally or fetched from remote),
- * we end up with an face index described by this {@link FaceIndex} interface.
- *
- * It has a top level envelope with information about the file (in particular
- * the primary key {@link fileID}), an inner envelope {@link faceEmbedding} with
- * metadata about the indexing, and an array of {@link faces} each containing
- * the result of a face detection and an embedding for that detected face.
- *
- * The word embedding is used to refer two things: The last one (faceEmbedding >
- * faces > embedding) is the "actual" embedding, but sometimes we colloquially
- * refer to the inner envelope (the "faceEmbedding") also an embedding since a
- * file can have other types of embedding (envelopes), e.g. a "clipEmbedding".
+ * we end up with some data described by this {@link FaceIndex} interface. We
+ * modify it slightly, adding envelopes, when saving it locally or uploading it
+ * to remote - those variations are described by the {@link LocalFaceIndex}
+ * and {@link RemoteFaceIndex} types respectively.
 */
 export interface FaceIndex {
+    /**
+     * The width (in px) of the image (file).
+     *
+     * Having the image dimensions here is useful since the coordinates inside
+     * the {@link Face}s are all normalized (0 to 1) to the width and height of
+     * the image.
+     */
+    width: number;
+    /**
+     * The height (in px) of the image (file).
+     */
+    height: number;
+    /**
+     * The list of faces (and their embeddings) detected in the file.
+     *
+     * Each of the items is a {@link Face}, containing the result of a face
+     * detection, and an embedding for that detected face.
+     */
+    faces: Face[];
+}
+
+export type RemoteFaceIndex = FaceIndex & {
+    /**
+     * An integral version number of the indexing algorithm / pipeline.
+     *
+     * [Note: Embedding versions]
+     *
+     * Embeddings have an associated version so it is possible for us to make
+     * backward compatible updates to the indexing process on newer clients.
+     *
+     * Clients agree out of band what a particular version means, and guarantee
+     * that an embedding with a particular version will be the same (to epsilon
+     * cosine similarity) irrespective of the client that indexed the file.
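+     *
+     * For example (a sketch of what consuming clients do; this mirrors the
+     * version checks in worker.ts):
+     *
+     *     if (remoteFaceIndex.version >= faceIndexingVersion) {
+     *         // Safe to use the remote index as is.
+     *     } else {
+     *         // Ignore it; reindex locally and upload the newer version.
+     *     }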
+     *
+     * If we bump the version of the same model (say when indexing on a newer
+     * client), we will do it in a manner such that older clients will be able
+     * to consume the response. The schema should not change in non-additive
+     * ways. For example, say we improve blur detection: older clients should
+     * just consume embeddings with a newer version and not try to index the
+     * file again locally.
+     *
+     * When fetching from remote, if we get an embedding with a version that is
+     * older than the version the client supports, then the client should
+     * ignore it. This way, the file will get reindexed locally and an
+     * embedding with a newer version will also get saved to remote.
+     *
+     * In the case where the changes are not backward compatible and can only
+     * be consumed by clients by making code changes, then we will introduce a
+     * new subtype (top level key) in the derived data.
+     */
+    version: number;
+    /**
+     * The UA for the client which generated this embedding.
+     */
+    client: string;
+};
+
+export type LocalFaceIndex = FaceIndex & {
     /**
      * The ID of the {@link EnteFile} whose index this is.
     *
@@ -69,36 +122,7 @@ export interface FaceIndex {
     * user will get a file entry with a fileID unique to them).
     */
    fileID: number;
-    /**
-     * The width (in px) of the image (file).
-     */
-    width: number;
-    /**
-     * The height (in px) of the image (file).
-     */
-    height: number;
-    /**
-     * The "face embedding" for the file.
-     *
-     * This is an envelope that contains a list of indexed faces and metadata
-     * about the indexing.
-     */
-    faceEmbedding: {
-        /**
-         * An integral version number of the indexing algorithm / pipeline.
-         *
-         * Clients agree out of band what a particular version means. The
-         * guarantee is that an embedding with a particular version will be the
-         * same (to negligible floating point epsilons) irrespective of the
-         * client that indexed the file.
-         */
-        version: number;
-        /** The UA for the client which generated this embedding. */
-        client: string;
-        /** The list of faces (and their embeddings) detected in the file. */
-        faces: Face[];
-    };
-}
+};
 
 /**
 * A face detected in a file, and an embedding for this detected face.
@@ -199,63 +223,31 @@ export interface Box {
 * This function is the entry point to the face indexing pipeline. The file goes
 * through various stages:
 *
- * 1. Downloading the original if needed.
- * 2. Detect faces using ONNX/YOLO
- * 3. Align the face rectangles, compute blur.
- * 4. Compute embeddings using ONNX/MFNT for the detected face (crop).
+ * 1. Detect faces using ONNX/YOLO.
+ * 2. Align the face rectangles, compute blur.
+ * 3. Compute embeddings using ONNX/MFNT for the detected face (crop).
 *
 * Once all of it is done, it returns the face rectangles and embeddings so that
 * they can be saved locally (for offline use), and also uploaded to the user's
 * remote storage so that their other devices can download them instead of
 * needing to reindex.
 *
- * As an optimization, it also saves the face crops of the detected faces to the
- * local cache (they can be regenerated independently too by using
- * {@link regenerateFaceCrops}).
- *
 * @param enteFile The {@link EnteFile} to index.
 *
 * @param image The file's contents.
 *
 * @param electron The {@link MLWorkerElectron} instance that allows us to call
- * our Node.js layer for various functionality.
- *
- * @param userAgent The UA of the client that is doing the indexing (us).
+ * our Node.js layer to run the ONNX inference.
 */
 export const indexFaces = async (
     enteFile: EnteFile,
-    image: ImageBitmapAndData,
+    { data: imageData }: ImageBitmapAndData,
     electron: MLWorkerElectron,
-    userAgent: string,
-): Promise<FaceIndex> => {
-    const { bitmap: imageBitmap, data: imageData } = image;
-
-    const { width, height } = imageBitmap;
-    const fileID = enteFile.id;
-
-    const faceIndex = {
-        fileID,
-        width,
-        height,
-        faceEmbedding: {
-            version: faceIndexingVersion,
-            client: userAgent,
-            faces: await indexFaces_(fileID, imageData, electron),
-        },
-    };
-
-    // This step, saving face crops, is not part of the indexing pipeline;
-    // we just do it here since we have already have the ImageBitmap at
-    // hand. Ignore errors that happen during this since it does not impact
-    // the generated face index.
-    try {
-        await saveFaceCrops(imageBitmap, faceIndex);
-    } catch (e) {
-        log.error(`Failed to save face crops for file ${fileID}`, e);
-    }
-
-    return faceIndex;
-};
+): Promise<FaceIndex> => ({
+    width: imageData.width,
+    height: imageData.height,
+    faces: await indexFaces_(enteFile.id, imageData, electron),
+});
 
 const indexFaces_ = async (
     fileID: number,
diff --git a/web/packages/new/photos/services/ml/index.ts b/web/packages/new/photos/services/ml/index.ts
index fee1f0d2eb..8a28ba7bb7 100644
--- a/web/packages/new/photos/services/ml/index.ts
+++ b/web/packages/new/photos/services/ml/index.ts
@@ -363,8 +363,7 @@ const setInterimScheduledStatus = () => {
     let nSyncedFiles = 0,
         nTotalFiles = 0;
     if (_mlStatusSnapshot && _mlStatusSnapshot.phase != "disabled") {
-        nSyncedFiles = _mlStatusSnapshot.nSyncedFiles;
-        nTotalFiles = _mlStatusSnapshot.nTotalFiles;
+        ({ nSyncedFiles, nTotalFiles } = _mlStatusSnapshot);
     }
     setMLStatusSnapshot({ phase: "scheduled", nSyncedFiles, nTotalFiles });
 };
@@ -379,7 +378,7 @@ export const unidentifiedFaceIDs = async (
     enteFile: EnteFile,
 ): Promise<string[]> => {
     const index = await faceIndex(enteFile.id);
-    return index?.faceEmbedding.faces.map((f) => f.faceID) ?? [];
+    return index?.faces.map((f) => f.faceID) ?? [];
 };
 
 /**
@@ -393,7 +392,7 @@ export const regenerateFaceCropsIfNeeded = async (enteFile: EnteFile) => {
     const index = await faceIndex(enteFile.id);
     if (!index) return false;
 
-    const faceIDs = index.faceEmbedding.faces.map((f) => f.faceID);
+    const faceIDs = index.faces.map((f) => f.faceID);
     const cache = await blobCache("face-crops");
     for (const id of faceIDs) {
         if (!(await cache.has(id))) {
diff --git a/web/packages/new/photos/services/ml/worker.ts b/web/packages/new/photos/services/ml/worker.ts
index c7471d9e91..7b4e70a54d 100644
--- a/web/packages/new/photos/services/ml/worker.ts
+++ b/web/packages/new/photos/services/ml/worker.ts
@@ -16,24 +16,32 @@ import {
     renderableBlob,
     type ImageBitmapAndData,
 } from "./blob";
-import { indexCLIP, type CLIPIndex } from "./clip";
+import { clipIndexingVersion, indexCLIP, type CLIPIndex } from "./clip";
+import { saveFaceCrops } from "./crop";
 import {
     indexableFileIDs,
     markIndexingFailed,
-    saveCLIPIndex,
-    saveFaceIndex,
+    saveIndexes,
     updateAssumingLocalFiles,
 } from "./db";
-import { pullFaceEmbeddings, putCLIPIndex, putFaceIndex } from "./embedding";
-import { indexFaces, type FaceIndex } from "./face";
+import {
+    fetchDerivedData,
+    putDerivedData,
+    type RemoteDerivedData,
+} from "./embedding";
+import { faceIndexingVersion, indexFaces, type FaceIndex } from "./face";
 import type { MLWorkerDelegate, MLWorkerElectron } from "./worker-types";
 
 const idleDurationStart = 5; /* 5 seconds */
 const idleDurationMax = 16 * 60; /* 16 minutes */
 
 interface IndexableItem {
+    /** The {@link EnteFile} to index (potentially). */
     enteFile: EnteFile;
+    /** If the file was uploaded from the current client, then its contents. */
     uploadItem: UploadItem | undefined;
+    /** The existing derived data on remote corresponding to this file. */
+    remoteDerivedData: RemoteDerivedData | undefined;
 }
 
 /**
@@ -47,24 +55,22 @@ interface IndexableItem {
 *
 *     ext. event     state           then state
 *     -------------  --------------- --------------
- *     sync           -> "pull"       -> "idle"
+ *     sync           -> "backfillq"  -> "idle"
 *     upload         -> "liveq"      -> "idle"
 *     idleTimeout    -> "backfillq"  -> "idle"
 *
 * where:
 *
- * - "pull": pulling embeddings from remote
- * - "liveq": indexing items that are being uploaded
- * - "backfillq": indexing unindexed items otherwise
- * - "idle": in between state transitions
+ * - "liveq": indexing items that are being uploaded,
+ * - "backfillq": fetching remote embeddings of unindexed items, and then
+ *   indexing them if needed,
+ * - "idle": in between state transitions.
 */
 export class MLWorker {
     private electron: MLWorkerElectron | undefined;
     private delegate: MLWorkerDelegate | undefined;
     private userAgent: string | undefined;
-    private state: "idle" | "pull" | "indexing" = "idle";
-    private shouldPull = false;
-    private havePulledAtLeastOnce = false;
+    private state: "idle" | "indexing" = "idle";
     private liveQ: IndexableItem[] = [];
     private idleTimeout: ReturnType<typeof setTimeout> | undefined;
     private idleDuration = idleDurationStart; /* unit: seconds */
 
     /**
     * Initialize a new {@link MLWorker}.
     *
     * This is conceptually the constructor, however it is easier to have this
-     * as a separate function to avoid confounding the comlink types too much.
+     * as a separate function to avoid complicating the comlink types further.
     *
     * @param electron The {@link MLWorkerElectron} that allows the worker to
     * use the functionality provided by our Node.js layer when running in the
-     * context of our desktop app
+     * context of our desktop app.
     *
     * @param delegate The {@link MLWorkerDelegate} the worker can use to inform
     * the main thread of interesting events.
     */
@@ -93,17 +99,14 @@ export class MLWorker {
     }
 
     /**
-     * Pull embeddings from remote, and start backfilling if needed.
+     * Start backfilling if needed.
     *
-     * This function enqueues the pull and returns immediately without waiting
-     * for the pull to complete.
-     *
-     * While it only triggers a pull, once the pull is done it also checks for
-     * pending items to backfill. So it implicitly also triggers a backfill
-     * (which is why call it a less-precise sync instead of pull).
+     * This function enqueues a backfill attempt and returns immediately
+     * without waiting for it to complete. During a backfill, we first attempt
+     * to fetch embeddings for files which don't have that data locally. If the
+     * fetch gets us what we need, we save it locally; otherwise we index the
+     * files ourselves.
     */
    sync() {
-        this.shouldPull = true;
        this.wakeUp();
    }
@@ -139,7 +142,10 @@ export class MLWorker {
        // live queue is just an optimization: if a file doesn't get indexed via
        // the live queue, it'll later get indexed anyway when we backfill.
        if (this.liveQ.length < 200) {
-            this.liveQ.push({ enteFile, uploadItem });
+            // The file is just being uploaded, and so will not have any
+            // pre-existing derived data on remote.
+            const remoteDerivedData = undefined;
+            this.liveQ.push({ enteFile, uploadItem, remoteDerivedData });
            this.wakeUp();
        } else {
            log.debug(() => "Ignoring upload item since liveQ is full");
        }
@@ -158,7 +164,6 @@ export class MLWorker {
            "ml/tick",
            {
                state: this.state,
-                shouldSync: this.shouldPull,
                liveQ: this.liveQ,
                idleDuration: this.idleDuration,
            },
@@ -166,46 +171,12 @@ export class MLWorker {
 
        const scheduleTick = () => void setTimeout(() => this.tick(), 0);
 
-        // If we've been asked to sync, do that irrespective of anything else.
-        if (this.shouldPull) {
-            // Allow this flag to be reset while we're busy pulling (triggering
-            // another pull when we tick next).
-            this.shouldPull = false;
-            this.state = "pull";
-            try {
-                const didPull = await pull();
-                // Mark that we completed once attempt at pulling successfully
-                // (irrespective of whether or not that got us some data).
-                this.havePulledAtLeastOnce = true;
-                // Reset the idle duration if we did pull something.
-                if (didPull) this.idleDuration = idleDurationStart;
-            } catch (e) {
-                log.error("Failed to pull embeddings", e);
-            }
-            // Tick again, even if we got an error.
-            //
-            // While the backfillQ won't be processed until at least a pull has
-            // happened once (`havePulledAtLeastOnce`), the liveQ can still be
-            // processed since these are new files without remote embeddings.
-            scheduleTick();
-            return;
-        }
-
        const liveQ = this.liveQ;
        this.liveQ = [];
        this.state = "indexing";
 
-        // Use the liveQ if present, otherwise get the next batch to backfill,
-        // but only if we've pulled once from remote successfully (otherwise we
-        // might end up reindexing files that were already indexed on remote but
-        // which we didn't know about since pull failed, say, for transient
-        // network issues).
-        const items =
-            liveQ.length > 0
-                ? liveQ
-                : this.havePulledAtLeastOnce
-                  ? await this.backfillQ()
-                  : [];
+        // Use the liveQ if present, otherwise get the next batch to backfill.
+        const items = liveQ.length > 0 ? liveQ : await this.backfillQ();
 
        const allSuccess = await indexNextBatch(
            items,
@@ -239,40 +210,25 @@ export class MLWorker {
 
    /** Return the next batch of items to backfill (if any). */
*/ async backfillQ() { const userID = ensure(await getKVN("userID")); - return syncWithLocalFilesAndGetFilesToIndex(userID, 200).then((fs) => - fs.map((f) => ({ enteFile: f, uploadItem: undefined })), + // Find files that our local DB thinks need syncing. + const filesByID = await syncWithLocalFilesAndGetFilesToIndex( + userID, + 200, ); + if (!filesByID.size) return []; + // Fetch their existing derived data (if any). + const derivedDataByID = await fetchDerivedData(filesByID); + // Return files after annotating them with their existing derived data. + return Array.from(filesByID, ([id, file]) => ({ + enteFile: file, + uploadItem: undefined, + remoteDerivedData: derivedDataByID.get(id), + })); } } expose(MLWorker); -/** - * Pull embeddings from remote. - * - * Return true atleast one embedding was pulled. - */ -const pull = async () => { - const res = await Promise.allSettled([ - pullFaceEmbeddings(), - // TODO-ML: clip-test - // pullCLIPEmbeddings(), - ]); - for (const r of res) { - switch (r.status) { - case "fulfilled": - // Return true if any pulled something. - if (r.value) return true; - break; - case "rejected": - // Throw if any failed. - throw r.reason; - } - } - // Return false if neither pulled anything. - return false; -}; - /** * Find out files which need to be indexed. Then index the next batch of them. * @@ -301,9 +257,9 @@ const indexNextBatch = async ( // Index, keeping track if any of the items failed. let allSuccess = true; - for (const { enteFile, uploadItem } of items) { + for (const item of items) { try { - await index(enteFile, uploadItem, electron, userAgent); + await index(item, electron, userAgent); delegate?.workerDidProcessFile(); // Possibly unnecessary, but let us drain the microtask queue. await wait(0); @@ -330,7 +286,7 @@ const indexNextBatch = async ( const syncWithLocalFilesAndGetFilesToIndex = async ( userID: number, count: number, -): Promise<EnteFile[]> => { +): Promise<Map<number, EnteFile>> => { const isIndexable = (f: EnteFile) => f.ownerID == userID; const localFiles = await getAllLocalFiles(); @@ -346,24 +302,14 @@ const syncWithLocalFilesAndGetFilesToIndex = async ( ); const fileIDsToIndex = await indexableFileIDs(count); - return fileIDsToIndex.map((id) => ensure(localFilesByID.get(id))); + return new Map( + fileIDsToIndex.map((id) => [id, ensure(localFilesByID.get(id))]), + ); }; /** * Index a file, persist the results locally, and put them on remote. * - * @param enteFile The {@link EnteFile} to index. - * - * @param uploadItem If the file is one which is being uploaded from the current - * client, then we will also have access to the file's content. In such cases, - * passing a web {@link File} object will directly use that its data when - * indexing. Otherwise (when this is not provided), the file's contents will be - * downloaded and decrypted from remote. - * - * @param userAgent The UA of the client that is doing the indexing (us). - * - * --- - * * [Note: ML indexing does more ML] * * Nominally, and primarily, indexing a file involves computing its various ML * embeddings. But since this requires having the original file in memory, it is * a great time to also compute other derived data related to the file (instead * of re-downloading it again). * - * So this index function also does things that are not related to ML: - * extracting and updating Exif. + * So this function also does things that are not related to ML and/or indexing: + * + * - Extracting and updating Exif. + * - Saving face crops.
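The `backfillQ` body above performs a small join: each candidate file is paired with whatever derived data remote already has for it. A standalone sketch of that shape, with the types simplified and `annotateWithDerivedData` a hypothetical name:

    // Pair each file with its (possibly missing) remote derived data.
    const annotateWithDerivedData = <F, D>(
        filesByID: Map<number, F>,
        derivedDataByID: Map<number, D>,
    ) =>
        Array.from(filesByID, ([id, enteFile]) => ({
            enteFile,
            uploadItem: undefined,
            // undefined when remote has no derived data for this file yet.
            remoteDerivedData: derivedDataByID.get(id),
        }));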
+ * + * --- + * + * [Note: Transient and permanent indexing failures] + * + * We mark indexing for a file as having failed only if there is a good chance + * that the indexing failed because of some inherent issue with that particular + * file, and not if there were generic failures (like when trying to save the + * indexes to remote). + * + * When we mark it as failed, then a flag is persisted corresponding to this + * file in the ML DB so that it won't get reindexed in future runs. These are + * thus considered permanent failures. + * + * > We might retry these in future versions if we identify reasons for indexing + * > to fail (it ideally shouldn't) and rectify them. + * + * On the other hand, saving the face index to remote might fail for transient + * issues (network issues, or remote having hiccups). We don't mark a file as + * failed permanently in such cases, so that it gets retried at some point. + * These are considered transient failures. + * + * However, it is very hard to pre-emptively enumerate all possible failure + * modes, and there is the possibility of some non-transient failure getting + * classified as a transient failure and causing the client to try and index the + * same file again and again, when in fact there is an issue specific to that + * file which is preventing the index from being saved. What exactly? We don't + * know, but the possibility exists. + * + * To reduce the chances of this happening, we treat HTTP 4xx responses as + * permanent failures too - there are no known cases where a client retrying a + * 4xx response would work, and there are expected (but rare) cases where a + * client might get a non-retriable 4xx (e.g. if the file has over ~700 faces, + * then remote will return a 413 Request Entity Too Large). */ const index = async ( - enteFile: EnteFile, - uploadItem: UploadItem | undefined, + { enteFile, uploadItem, remoteDerivedData }: IndexableItem, electron: MLWorkerElectron, userAgent: string, ) => { const f = fileLogID(enteFile); - const startTime = Date.now(); + const fileID = enteFile.id; + + // Massage the existing data (if any) that we got from remote into the form + // that the rest of this function operates on. + // + // Discard any existing data that was made by an older indexing pipeline. + // See: [Note: Embedding versions] + + const existingRemoteFaceIndex = remoteDerivedData?.parsed?.face; + const existingRemoteCLIPIndex = remoteDerivedData?.parsed?.clip; + + let existingFaceIndex: FaceIndex | undefined; + if ( + existingRemoteFaceIndex && + existingRemoteFaceIndex.version >= faceIndexingVersion + ) { + const { width, height, faces } = existingRemoteFaceIndex; + existingFaceIndex = { width, height, faces }; + } + + let existingCLIPIndex: CLIPIndex | undefined; + if ( + existingRemoteCLIPIndex && + existingRemoteCLIPIndex.version >= clipIndexingVersion + ) { + const { embedding } = existingRemoteCLIPIndex; + existingCLIPIndex = { embedding }; + } + + // See if we already have all the derived data fields that we need. If so, + // just update our local DB and return. + + if (existingFaceIndex && existingCLIPIndex) { + try { + await saveIndexes( + { fileID, ...existingFaceIndex }, + { fileID, ...existingCLIPIndex }, + ); + } catch (e) { + log.error(`Failed to save indexes for ${f}`, e); + throw e; + } + return; + } + + // There is at least one derived data type that still needs to be indexed.
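The two `if` blocks above apply the same gate: remote data produced by an older pipeline version is discarded, so the file gets reindexed with the current version. A generic sketch of that check; the helper name is hypothetical:

    // Accept a remote index only if it was produced by a pipeline at least as
    // new as ours; otherwise treat it as absent so that we reindex.
    const usableIndex = <T extends { version: number }>(
        remote: T | undefined,
        currentVersion: number,
    ): T | undefined =>
        remote && remote.version >= currentVersion ? remote : undefined;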
const imageBlob = await renderableBlob(enteFile, uploadItem, electron); @@ -392,116 +418,98 @@ const index = async ( // If we cannot get the raw image data for the file, then retrying again // won't help. It'd only make sense to retry later if we modify // `renderableBlob` to do something different for this type of file. + // + // See: [Note: Transient and permanent indexing failures] log.error(`Failed to get image data for indexing ${f}`, e); await markIndexingFailed(enteFile.id); throw e; } - const res = await Promise.allSettled([ - _indexFace(f, enteFile, image, electron, userAgent), - // TODO-ML: clip-test - // _indexCLIP(f, enteFile, image, electron, userAgent), - ]); - image.bitmap.close(); + try { + let faceIndex: FaceIndex; + let clipIndex: CLIPIndex; - const msg: string[] = []; - for (const r of res) { - if (r.status == "rejected") throw r.reason; - else msg.push(r.value); + const startTime = Date.now(); + + try { + [faceIndex, clipIndex] = await Promise.all([ + existingFaceIndex ?? indexFaces(enteFile, image, electron), + existingCLIPIndex ?? indexCLIP(image, electron), + ]); + } catch (e) { + // See: [Note: Transient and permanent indexing failures] + log.error(`Failed to index ${f}`, e); + await markIndexingFailed(enteFile.id); + throw e; + } + + log.debug(() => { + const ms = Date.now() - startTime; + const msg: string[] = []; + if (!existingFaceIndex) msg.push(`${faceIndex.faces.length} faces`); + if (!existingCLIPIndex) msg.push("clip"); + return `Indexed ${msg.join(" and ")} in ${f} (${ms} ms)`; + }); + + const remoteFaceIndex = existingRemoteFaceIndex ?? { + version: faceIndexingVersion, + client: userAgent, + ...faceIndex, + }; + + const remoteCLIPIndex = existingRemoteCLIPIndex ?? { + version: clipIndexingVersion, + client: userAgent, + ...clipIndex, + }; + + // Perform an "upsert" by using the existing raw data we got from the + // remote as the base, and inserting or overwriting any newly indexed + // parts. See: [Note: Preserve unknown derived data fields]. + + const existingRawDerivedData = remoteDerivedData?.raw ?? {}; + const rawDerivedData = { + ...existingRawDerivedData, + face: remoteFaceIndex, + clip: remoteCLIPIndex, + }; + + log.debug(() => ["Uploading derived data", rawDerivedData]); + + try { + await putDerivedData(enteFile, rawDerivedData); + } catch (e) { + // See: [Note: Transient and permanent indexing failures] + log.error(`Failed to put derived data for ${f}`, e); + if (isHTTP4xxError(e)) await markIndexingFailed(enteFile.id); + throw e; + } + + try { + await saveIndexes( + { fileID, ...faceIndex }, + { fileID, ...clipIndex }, + ); + } catch (e) { + // Not sure if DB failures should be considered permanent or + // transient. There isn't a known case where writing to the local + // IndexedDB would fail. + log.error(`Failed to save indexes for ${f}`, e); + throw e; + } + + // This step, saving face crops, is conceptually not part of the + // indexing pipeline; we just do it here since we already have the + // ImageBitmap at hand. Ignore errors that happen during this step since + // they do not impact the generated face index.
+ if (!existingFaceIndex) { + try { + await saveFaceCrops(image.bitmap, faceIndex); + } catch (e) { + log.error(`Failed to save face crops for ${f}`, e); + } + } + } finally { + image.bitmap.close(); } - - log.debug(() => { - const ms = Date.now() - startTime; - return `Indexed ${msg.join(" and ")} in ${f} (${ms} ms)`; - }); -}; - -const _indexFace = async ( - f: string, - enteFile: EnteFile, - image: ImageBitmapAndData, - electron: MLWorkerElectron, - userAgent: string, -) => { - let faceIndex: FaceIndex; - try { - faceIndex = await indexFaces(enteFile, image, electron, userAgent); - } catch (e) { - log.error(`Failed to index faces in ${f}`, e); - await markIndexingFailed(enteFile.id); - throw e; - } - - // [Note: Transient and permanent indexing failures] - // - // Generally speaking, we mark indexing for a file as having failed only if - // the indexing itself failed, not if there were subsequent failures (like - // when trying to put the result to remote or save it to the local face DB). - // - // When we mark it as failed, then a flag is persisted corresponding to this - // file in the ML DB so that it won't get reindexed in future runs. This are - // thus considered as permanent failures. - // - // > We might retry in future versions if we identify reasons for indexing - // > to fail (it shouldn't) and rectify them. - // - // On the other hand, saving the face index to remote might fail for - // transient issues (network issues, or remote having hiccups). We don't - // mark a file as failed permanently in such cases, so that it gets retried - // at some point. These are considered as transient failures. - // - // However, this opens the possibility of some non-transient failure getting - // classified as a transient failure and causing the client to try and index - // the same file again and again, when in fact there is a issue specific to - // that file which is preventing the index from being saved. What exactly? - // We don't know, but the possibility exists. - // - // To reduce the chances of this happening, we treat HTTP 4xx responses as - // permanent failures too - there are no known cases where a client retrying - // a 4xx response would work, and there are known (but rare) cases where a - // client might get a 4xx (e.g. if the file has over ~700 faces, then remote - // will return a 413 Request Entity Too Large). - - try { - await putFaceIndex(enteFile, faceIndex); - await saveFaceIndex(faceIndex); - } catch (e) { - log.error(`Failed to put/save face index for ${f}`, e); - if (isHTTP4xxError(e)) await markIndexingFailed(enteFile.id); - throw e; - } - - // A message for debug printing. - return `${faceIndex.faceEmbedding.faces.length} faces`; -}; - -// TODO-ML: clip-test export -export const _indexCLIP = async ( - f: string, - enteFile: EnteFile, - image: ImageBitmapAndData, - electron: MLWorkerElectron, - userAgent: string, -) => { - let clipIndex: CLIPIndex; - try { - clipIndex = await indexCLIP(enteFile, image, electron, userAgent); - } catch (e) { - log.error(`Failed to index CLIP in ${f}`, e); - await markIndexingFailed(enteFile.id); - throw e; - } - - // See: [Note: Transient and permanent indexing failures] - try { - await putCLIPIndex(enteFile, clipIndex); - await saveCLIPIndex(clipIndex); - } catch (e) { - log.error(`Failed to put/save CLIP index for ${f}`, e); - if (isHTTP4xxError(e)) await markIndexingFailed(enteFile.id); - throw e; - } - - // A message for debug printing.
- return "clip"; }; diff --git a/web/packages/new/shared/crypto/ente.ts b/web/packages/new/shared/crypto/ente.ts index 5e95e77627..dd230d6a15 100644 --- a/web/packages/new/shared/crypto/ente.ts +++ b/web/packages/new/shared/crypto/ente.ts @@ -9,9 +9,9 @@ import * as libsodium from "@ente/shared/crypto/internal/libsodium"; /** - * Encrypt arbitrary metadata associated with a file using its key. + * Encrypt arbitrary metadata associated with a file using the file's key. * - * @param metadata The metadata (string) to encrypt. + * @param metadata The metadata (bytes) to encrypt. * * @param keyB64 Base64 encoded string containing the encryption key (this'll * generally be the file's key). @@ -20,16 +20,10 @@ import * as libsodium from "@ente/shared/crypto/internal/libsodium"; * decryption header. */ export const encryptFileMetadata = async ( - metadata: string, + metadata: Uint8Array, keyB64: string, -): Promise<{ encryptedMetadataB64: string; decryptionHeaderB64: string }> => { - const encoder = new TextEncoder(); - const encodedMetadata = encoder.encode(metadata); - - const { file } = await libsodium.encryptChaChaOneShot( - encodedMetadata, - keyB64, - ); +) => { + const { file } = await libsodium.encryptChaChaOneShot(metadata, keyB64); return { encryptedMetadataB64: await libsodium.toB64(file.encryptedData), decryptionHeaderB64: file.decryptionHeader, @@ -37,7 +31,7 @@ export const encryptFileMetadata = async ( }; /** - * Decrypt arbitrary metadata associated with a file using the its key. + * Decrypt arbitrary metadata associated with a file using the file's key. * * @param encryptedMetadataB64 Base64 encoded string containing the encrypted * data. @@ -48,18 +42,15 @@ export const encryptFileMetadata = async ( * @param keyB64 Base64 encoded string containing the encryption key (this'll * generally be the file's key). * - * @returns The decrypted utf-8 string. + * @returns The decrypted metadata bytes. */ export const decryptFileMetadata = async ( encryptedMetadataB64: string, decryptionHeaderB64: string, keyB64: string, -) => { - const metadataBytes = await libsodium.decryptChaChaOneShot( +) => + libsodium.decryptChaChaOneShot( await libsodium.fromB64(encryptedMetadataB64), await libsodium.fromB64(decryptionHeaderB64), keyB64, ); - const textDecoder = new TextDecoder(); - return textDecoder.decode(metadataBytes); -}; diff --git a/web/packages/next/blob-cache.ts b/web/packages/next/blob-cache.ts index 750f3effd8..dae10f7725 100644 --- a/web/packages/next/blob-cache.ts +++ b/web/packages/next/blob-cache.ts @@ -143,6 +143,13 @@ export const openBlobCache = async ( * To convert from a Uint8Array/ArrayBuffer/Blob to a ReadableStream * * new Response(array).body + * OR + * new Blob([...]).stream() + * + * To convert from a stream to a string (or JSON) + * + * await new Response(stream).text() + * await new Response(stream).json() * * Refs: * - https://github.com/yigitunallar/arraybuffer-vs-blob