[desktop] ML new combined format - Part 2/2 (#2484)

Continues (and, except for minor touchups, completes)
https://github.com/ente-io/ente/pull/2460
This commit is contained in:
Manav Rathi
2024-07-18 15:31:48 +05:30
committed by GitHub
10 changed files with 684 additions and 799 deletions

View File

@@ -15,6 +15,30 @@ import {
} from "types/entity";
import { getLatestVersionEntities } from "utils/entity";
/**
* The maximum number of items to fetch in a single diff request.
*
* [Note: Limit of returned items in /diff requests]
*
* The various GET /diff API methods, which tell the client what all has changed
* since a timestamp (provided by the client) take a limit parameter.
*
* These diff API calls return all items whose updated at is greater
* (non-inclusive) than the timestamp we provide. So there is no mechanism for
* pagination of items which have the same exact updated at.
*
* Conceptually, it may happen that there are more items than the limit we've
* provided, but there are practical safeguards.
*
* For file diff, the limit is advisory, and remote may return less, equal or
* more items than the provided limit. The scenario where it returns more is
* when more files than the limit have the same updated at. Theoretically it
* would make the diff response unbounded, however in practice file
* modifications themselves are all batched. Even if the user were to select all
the files in their library and update them all in one go in the UI, their
* client app is required to use batched API calls to make those updates, and
* each of those batches would get distinct updated at.
*/
const DIFF_LIMIT = 500;
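To make the note above concrete, here is a minimal sketch of the paging loop it describes. It is illustrative only: fetchEntityDiffPage, entitySyncTime, saveEntity and saveEntitySyncTime are hypothetical helpers, not functions from this diff.

const pullEntityDiff = async (type: EntityType) => {
    let sinceTime = await entitySyncTime(type);
    while (true) {
        // Remote returns entities whose updatedAt is strictly greater than
        // the sinceTime we pass, so advancing sinceTime as we save pages
        // through the entire diff.
        const diff = await fetchEntityDiffPage(type, sinceTime, DIFF_LIMIT);
        if (diff.length == 0) break;
        for (const entity of diff) {
            sinceTime = Math.max(sinceTime, entity.updatedAt);
            await saveEntity(type, entity);
        }
        await saveEntitySyncTime(type, sinceTime);
    }
};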
const ENTITY_TABLES: Record<EntityType, string> = {

View File

@@ -1,4 +1,3 @@
import type { EnteFile } from "@/new/photos/types/file";
import type { Electron } from "@/next/types/ipc";
import type { ImageBitmapAndData } from "./blob";
import { clipIndexes } from "./db";
@@ -59,14 +58,8 @@ export const clipIndexingVersion = 1;
* itself, is the same across clients - web and mobile.
*/
export interface CLIPIndex {
/** The ID of the {@link EnteFile} whose index this is. */
fileID: number;
/** An integral version number of the indexing algorithm / pipeline. */
version: number;
/** The UA for the client which generated this embedding. */
client: string;
/**
* The CLIP embedding itself.
* The CLIP embedding.
*
* This is an array of 512 floating point values that represent the
* embedding of the image in the same space where we'll embed the text so
@@ -75,6 +68,18 @@ export interface CLIPIndex {
embedding: number[];
}
export type RemoteCLIPIndex = CLIPIndex & {
/** An integral version number of the indexing algorithm / pipeline. */
version: number;
/** The UA for the client which generated this embedding. */
client: string;
};
export type LocalCLIPIndex = CLIPIndex & {
/** The ID of the {@link EnteFile} whose index this is. */
fileID: number;
};
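As an aside on how these split types compose: the remote and local variants are the common CLIPIndex core plus destination specific envelopes. A minimal sketch (these helper names are ours, not part of this diff):

const toRemoteCLIPIndex = (
    index: CLIPIndex,
    version: number,
    client: string,
): RemoteCLIPIndex => ({ ...index, version, client });

const toLocalCLIPIndex = (
    index: CLIPIndex,
    fileID: number,
): LocalCLIPIndex => ({ ...index, fileID });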
/**
* Compute the CLIP embedding of a given file.
*
@@ -89,33 +94,19 @@ export interface CLIPIndex {
* that it can be saved locally and also uploaded to the user's remote storage
* for use on their other devices).
*
* @param enteFile The {@link EnteFile} to index.
*
* @param uploadItem If we're called during the upload process, then this will
* be set to the {@link UploadItem} that was uploaded. This way, we can directly
* use the on-disk file instead of needing to download the original from remote.
*
* @param electron The {@link MLWorkerElectron} instance that allows us to call
* our Node.js layer for various functionality.
*
* @param userAgent The UA of the client that is doing the indexing (us).
* our Node.js layer to run the ONNX inference.
*/
export const indexCLIP = async (
enteFile: EnteFile,
image: ImageBitmapAndData,
electron: MLWorkerElectron,
userAgent: string,
): Promise<CLIPIndex> => {
const { data: imageData } = image;
const fileID = enteFile.id;
return {
fileID,
version: clipIndexingVersion,
client: userAgent,
embedding: await computeEmbedding(imageData, electron),
};
};
): Promise<CLIPIndex> => ({
embedding: await computeEmbedding(image.data, electron),
});
const computeEmbedding = async (
imageData: ImageData,
@@ -184,10 +175,10 @@ const normalized = (embedding: Float32Array) => {
* native code running in our desktop app (the embedding happens in the native
* layer).
*
* It return a list of files that should be shown in the search results. The
* actual return type is a map from fileIDs to the scores they got (higher is
* better). This map will only contains entries whose score was above our
* minimum threshold.
* It returns file (IDs) that should be shown in the search results. They're
* returned as a map from fileIDs to the scores they got (higher is better).
* This map will only contain entries whose score was above our minimum
* threshold.
*
* The result can also be `undefined`, which indicates that the download for the
* ML model is still in progress (trying again later should succeed).

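To illustrate the shape of that result, a hedged sketch of the scoring described above (the function name and threshold value are hypothetical): since both the image and text embeddings are normalized, cosine similarity reduces to a dot product, and only entries above the minimum score are kept.

const clipMatches = (
    textEmbedding: number[],
    indexes: LocalCLIPIndex[],
    minScore = 0.2, // hypothetical threshold, for illustration only
): Map<number, number> => {
    // Both vectors are normalized, so the dot product is the cosine similarity.
    const dot = (a: number[], b: number[]) =>
        a.reduce((sum, v, i) => sum + v * (b[i] ?? 0), 0);
    return new Map(
        indexes
            .map((i) => [i.fileID, dot(textEmbedding, i.embedding)] as const)
            .filter(([, score]) => score > minScore),
    );
};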
View File

@@ -56,7 +56,7 @@ export const saveFaceCrops = async (
const cache = await blobCache("face-crops");
return Promise.all(
faceIndex.faceEmbedding.faces.map(({ faceID, detection }) =>
faceIndex.faces.map(({ faceID, detection }) =>
extractFaceCrop(imageBitmap, detection.box).then((b) =>
cache.put(faceID, b),
),

View File

@@ -1,9 +1,9 @@
import { removeKV } from "@/next/kv";
import log from "@/next/log";
import localForage from "@ente/shared/storage/localForage";
import { deleteDB, openDB, type DBSchema } from "idb";
import type { CLIPIndex } from "./clip";
import type { EmbeddingModel } from "./embedding";
import type { FaceIndex } from "./face";
import type { LocalCLIPIndex } from "./clip";
import type { LocalFaceIndex } from "./face";
/**
* ML DB schema.
@@ -15,11 +15,11 @@ import type { FaceIndex } from "./face";
* required), this is synced with the list of files that the current client
* knows about locally.
*
* - "face-index": Contains {@link FaceIndex} objects, either indexed locally or
* fetched from remote storage.
* - "face-index": Contains {@link LocalFaceIndex} objects, either indexed
* locally or fetched from remote.
*
* - "clip-index": Contains {@link CLIPIndex} objects, either indexed locally or
* fetched from remote storage.
* - "clip-index": Contains {@link LocalCLIPIndex} objects, either indexed
* locally or fetched from remote.
*
* All the stores are keyed by {@link fileID}. The "file-status" contains
* book-keeping about the indexing process (whether or not a file needs
@@ -27,7 +27,7 @@ import type { FaceIndex } from "./face";
* the actual indexing results.
*
* In tandem, these serve as the underlying storage for the functions exposed by
* this file.
* the ML database.
*/
interface MLDBSchema extends DBSchema {
"file-status": {
@@ -37,11 +37,11 @@ interface MLDBSchema extends DBSchema {
};
"face-index": {
key: number;
value: FaceIndex;
value: LocalFaceIndex;
};
"clip-index": {
key: number;
value: CLIPIndex;
value: LocalCLIPIndex;
};
}
@@ -60,24 +60,11 @@ interface FileStatus {
*
* - "failed" - Indexing was attempted but failed.
*
* There can arise situations in which a file has one, but not all, indexes.
* e.g. it may have a "face-index" but "clip-index" might've not yet
* happened (or failed). In such cases, the status of the file will be
* "indexable": it transitions to "indexed" only after all indexes have been
* computed or fetched.
*
* If you have't heard the word "index" to the point of zoning out, we also
* If you haven't heard the word "index" to the point of zoning out, we also
* have a (IndexedDB) "index" on the status field to allow us to efficiently
* select or count {@link fileIDs} that fall into various buckets.
*/
status: "indexable" | "indexed" | "failed";
/**
* A list of embeddings that we still need to compute for the file.
*
* This is guaranteed to be empty if status is "indexed", and will have at
* least one entry otherwise.
*/
pending: EmbeddingModel[];
/**
* The number of times attempts to index this file failed.
*
@@ -151,6 +138,17 @@ const deleteLegacyDB = () => {
localForage.removeItem("onnx-clip-embedding_sync_time"),
localForage.removeItem("file-ml-clip-face-embedding_sync_time"),
]);
// Delete keys for the legacy diff based sync.
//
// This code was added July 2024 (v1.7.3-beta). These keys were never
// enabled outside of the nightly builds, so this cleanup is not a hard
// need. Either way, it can be removed at some point when most clients have
// migrated (tag: Migration).
void Promise.all([
removeKV("embeddingSyncTime:onnx-clip"),
removeKV("embeddingSyncTime:file-ml-clip-face"),
]);
};
/**
@@ -183,36 +181,34 @@ export const clearMLDB = async () => {
};
/**
* Save the given {@link faceIndex} locally.
*
* @param faceIndex A {@link FaceIndex} representing the faces that we detected
* (and their corresponding embeddings) in a particular file.
*
* This function adds a new entry for the face index, overwriting any existing
* ones (No merging is performed, the existing entry is unconditionally
* overwritten). The file status is also updated to remove face from the pending
* embeddings. If there are no other pending embeddings, the status changes to
* Save the given {@link faceIndex} and {@link clipIndex}, and mark the file as
* "indexed".
*
* This function adds a new entry for the indexes, overwriting any existing ones
* (no merging is performed; the existing entry is unconditionally overwritten).
* The file status is also updated to "indexed", and the failure count is reset
* to 0.
*/
export const saveFaceIndex = async (faceIndex: FaceIndex) => {
export const saveIndexes = async (
faceIndex: LocalFaceIndex,
clipIndex: LocalCLIPIndex,
) => {
const { fileID } = faceIndex;
const db = await mlDB();
const tx = db.transaction(["file-status", "face-index"], "readwrite");
const statusStore = tx.objectStore("file-status");
const indexStore = tx.objectStore("face-index");
const fileStatus =
(await statusStore.get(IDBKeyRange.only(fileID))) ??
newFileStatus(fileID);
fileStatus.pending = fileStatus.pending.filter(
(v) => v != "file-ml-clip-face",
const tx = db.transaction(
["file-status", "face-index", "clip-index"],
"readwrite",
);
if (fileStatus.pending.length == 0) fileStatus.status = "indexed";
await Promise.all([
statusStore.put(fileStatus),
indexStore.put(faceIndex),
tx.objectStore("file-status").put({
fileID,
status: "indexed",
failureCount: 0,
}),
tx.objectStore("face-index").put(faceIndex),
tx.objectStore("clip-index").put(clipIndex),
tx.done,
]);
};
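For context, a sketch of the intended call pattern (the imageBitmapAndData helper name is illustrative): compute (or reuse) both indexes first, then persist them together through this single transaction.

const image = await imageBitmapAndData(imageBlob); // illustrative helper
const fileID = enteFile.id;
const faceIndex = await indexFaces(enteFile, image, electron);
const clipIndex = await indexCLIP(image, electron);
await saveIndexes({ fileID, ...faceIndex }, { fileID, ...clipIndex });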
@@ -224,45 +220,9 @@ export const saveFaceIndex = async (faceIndex: FaceIndex) => {
const newFileStatus = (fileID: number): FileStatus => ({
fileID,
status: "indexable",
// TODO-ML: clip-test
// pending: ["file-ml-clip-face", "onnx-clip"],
pending: ["file-ml-clip-face"],
failureCount: 0,
});
/**
* Save the given {@link clipIndex} locally.
*
* @param clipIndex A {@link CLIPIndex} containing the CLIP embedding for a
* particular file.
*
* This function adds a new entry for the CLIP index, overwriting any existing
* ones (No merging is performed, the existing entry is unconditionally
* overwritten). The file status is also updated to remove CLIP from the pending
* embeddings. If there are no other pending embeddings, the status changes to
* "indexed".
*/
export const saveCLIPIndex = async (clipIndex: CLIPIndex) => {
const { fileID } = clipIndex;
const db = await mlDB();
const tx = db.transaction(["file-status", "clip-index"], "readwrite");
const statusStore = tx.objectStore("file-status");
const indexStore = tx.objectStore("clip-index");
const fileStatus =
(await statusStore.get(IDBKeyRange.only(fileID))) ??
newFileStatus(fileID);
fileStatus.pending = fileStatus.pending.filter((v) => v != "onnx-clip");
if (fileStatus.pending.length == 0) fileStatus.status = "indexed";
await Promise.all([
statusStore.put(fileStatus),
indexStore.put(clipIndex),
tx.done,
]);
};
/**
* Return the {@link FaceIndex}, if any, for {@link fileID}.
*/
@@ -317,9 +277,9 @@ export const addFileEntry = async (fileID: number) => {
* DB are removed from ML DB (including any indexes).
*
* - Files that are not present locally but are in the trash are retained in ML
* DB if their status is "indexed"; otherwise they too are removed. This
* special case is to prevent churn (re-indexing) if the user moves some files
* to trash but then later restores them before they get permanently deleted.
* DB if their status is "indexed"; otherwise they too are removed. This is to
* prevent churn; otherwise they'd get unnecessarily reindexed if the
* user restores them from trash before permanent deletion.
*/
export const updateAssumingLocalFiles = async (
localFileIDs: number[],

View File

@@ -1,20 +1,15 @@
import {
getAllLocalFiles,
getLocalTrashedFiles,
} from "@/new/photos/services/files";
import type { EnteFile } from "@/new/photos/types/file";
import {
decryptFileMetadata,
encryptFileMetadata,
} from "@/new/shared/crypto/ente";
import { authenticatedRequestHeaders, ensureOk } from "@/next/http";
import { getKV, setKV } from "@/next/kv";
import log from "@/next/log";
import { apiURL } from "@/next/origins";
import { nullToUndefined } from "@/utils/transform";
import { z } from "zod";
import { clipIndexingVersion, type CLIPIndex } from "./clip";
import { saveCLIPIndex, saveFaceIndex } from "./db";
import { faceIndexingVersion, type FaceIndex } from "./face";
import { type RemoteCLIPIndex } from "./clip";
import { type RemoteFaceIndex } from "./face";
/**
* The embeddings that we (the current client) knows how to handle.
@@ -23,34 +18,38 @@ import { faceIndexingVersion, type FaceIndex } from "./face";
* encrypted embeddings from remote. However, we should be prepared to receive a
* {@link RemoteEmbedding} with a model value different from these.
*
* It is vernacularly called a model, but strictly speaking this is not the
* model, but the embedding produced by a particular model with a particular set
* of pre-/post- processing steps and related hyperparameters. It is better
* thought of as an "type" of embedding produced or consumed by the client.
* [Note: Embedding/model vs derived data]
*
* [Note: Handling versioning of embeddings]
* Historically, this has been called an "embedding" or a "model" in the API
* terminology. However, it is more like derived data.
*
* The embeddings themselves have a version embedded in them, so it is possible
* for us to make backward compatible updates to the indexing process on newer
* clients (There is also a top level version field too but that is not used).
* It started off being called a "model", but strictly speaking it was not just
* the model, but the embedding produced by a particular ML model when used with
* a particular set of pre-/post- processing steps and related hyperparameters.
* It is better thought of as a "type" of embedding produced or consumed by the
* client, e.g. the "face" embedding, or the "clip" embedding.
*
* If we bump the version of same model (say when indexing on a newer client),
* the assumption will be that older client will be able to consume the
* response. e.g. Say if we improve blur detection, older client should just
* consume embeddings with a newer version and not try to index the file again
* locally.
* Even the word embedding is a synecdoche, since it might have other data. For
* example, for faces, it is not just the face embedding, but also the detection
* regions, landmarks etc: what we've come to refer to as the "face index" in
* our client code terminology.
*
* If we get an embedding with version that is older than the version the client
* supports, then the client should ignore it. This way, the file will get
* reindexed locally an embedding with a newer version will get put to remote.
* Later on, to avoid the proliferation of small files (one per embedding), we
* combined all these embeddings into a single "embedding", which is a map of
* the form:
*
* In the case where the changes are not backward compatible and can only be
* consumed by clients with the relevant scaffolding, then we change this
* "model" (i.e "type") field to create a new universe of embeddings.
* {
* "face": ... the face indexing result ...
* "clip": ... the CLIP indexing result ...
* "exif": ... the Exif extracted from the file ...
* ... more in the future ...
* }
*
* Thus, now this is best thought of as a tag for a particular format of encoding
* all the derived data associated with a file.
*/
export type EmbeddingModel =
| "onnx-clip" /* CLIP embeddings */
| "file-ml-clip-face" /* Face embeddings */;
// TODO-ML: Fix name to "combined" before release
type EmbeddingModel = "onnx-clip" /* Combined format */;
const RemoteEmbedding = z.object({
/** The ID of the file whose embedding this is. */
@@ -73,182 +72,237 @@ const RemoteEmbedding = z.object({
* the crypto layer.
*/
decryptionHeader: z.string(),
/** Last time (epoch ms) this embedding was updated. */
updatedAt: z.number(),
});
type RemoteEmbedding = z.infer<typeof RemoteEmbedding>;
/**
* Fetch new or updated embeddings from remote and save them locally.
*
* @param model The {@link EmbeddingModel} for which to pull embeddings. For
* each model, this function maintains the last sync time in local storage so
* subsequent fetches only pull what's new.
*
* @param save A function that is called to save the embedding. The save process
* can be model specific, so this provides us a hook to reuse the surrounding
* pull mechanisms while varying the save itself. This function will be passed
* the decrypted embedding string. If it throws, then we'll log about but
* otherwise ignore the embedding under consideration.
*
* This function should be called only after we have synced files with remote.
* See: [Note: Ignoring embeddings for unknown files].
*
* @returns true if at least one embedding was pulled, false otherwise.
*/
const pullEmbeddings = async (
model: EmbeddingModel,
save: (decryptedEmbedding: string) => Promise<void>,
) => {
// Include files from trash, otherwise they'll get unnecessarily reindexed
// if the user restores them from trash before permanent deletion.
const localFiles = (await getAllLocalFiles()).concat(
await getLocalTrashedFiles(),
);
// [Note: Ignoring embeddings for unknown files]
//
// We need the file to decrypt the embedding. This is easily ensured by
// running the embedding sync after we have synced our local files with
// remote.
//
// Still, it might happen that we come across an embedding for which we
// don't have the corresponding file locally. We can put them in two
// buckets:
//
// 1. Known case: In rare cases we might get a diff entry for an embedding
// corresponding to a file which has been deleted (but whose embedding
// is enqueued for deletion). Client should expect such a scenario, but
// all they have to do is just ignore such embeddings.
//
// 2. Other unknown cases: Even if somehow we end up with an embedding for
// a existent file which we don't have locally, it is fine because the
// current client will just regenerate the embedding if the file really
// exists and gets locally found later. There would be a bit of
// duplicate work, but that's fine as long as there isn't a systematic
// scenario where this happens.
const localFilesByID = new Map(localFiles.map((f) => [f.id, f]));
export type RawRemoteDerivedData = Record<string, unknown>;
let didPull = false;
let sinceTime = await embeddingSyncTime(model);
// TODO: eslint has fixed this spurious warning, but we're not on the latest
// version yet, so add a disable.
// https://github.com/eslint/eslint/pull/18286
/* eslint-disable no-constant-condition */
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
while (true) {
const remoteEmbeddings = await getEmbeddingsDiff(model, sinceTime);
if (remoteEmbeddings.length == 0) break;
let count = 0;
for (const remoteEmbedding of remoteEmbeddings) {
sinceTime = Math.max(sinceTime, remoteEmbedding.updatedAt);
try {
const file = localFilesByID.get(remoteEmbedding.fileID);
if (!file) continue;
await save(
await decryptFileMetadata(
remoteEmbedding.encryptedEmbedding,
remoteEmbedding.decryptionHeader,
file.key,
),
);
didPull = true;
count++;
} catch (e) {
log.warn(`Ignoring unparseable ${model} embedding`, e);
}
export type ParsedRemoteDerivedData = Partial<{
face: RemoteFaceIndex;
clip: RemoteCLIPIndex;
}>;
/**
* The decrypted payload of a {@link RemoteEmbedding} for the "combined"
* {@link EmbeddingModel}.
*
* [Note: Preserve unknown derived data fields]
*
* The remote derived data can contain arbitrary keys at the top level apart from
* the ones that the current client knows about. We need to preserve these
* verbatim when we use {@link putDerivedData}.
*
* Thus we return two separate results from {@link fetchDerivedData}:
*
* - {@link RawRemoteDerivedData}: The original, unmodified JSON.
*
* - {@link ParsedRemoteDerivedData}: The particular fields that the current
* client knows about, parsed according to their expected structure.
*
* When integrating this information into our local state, we use the parsed
* version. And if we need to update the state on remote (e.g. if the current
* client notices an embedding type that was missing), then we use the original
* JSON as the base.
*/
export interface RemoteDerivedData {
raw: RawRemoteDerivedData;
parsed: ParsedRemoteDerivedData | undefined;
}
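A minimal sketch of the preservation flow this note describes (updateDerivedData is our illustrative wrapper): spread the raw JSON first so unknown keys survive, then overwrite the keys we own.

const updateDerivedData = async (
    enteFile: EnteFile,
    existing: RemoteDerivedData | undefined,
    face: RemoteFaceIndex,
    clip: RemoteCLIPIndex,
) => {
    // Unknown top level keys in `raw` are carried over verbatim.
    const updated: RawRemoteDerivedData = { ...existing?.raw, face, clip };
    await putDerivedData(enteFile, updated);
};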
/**
* Zod schema for the {@link RemoteFaceIndex} type.
*
* [Note: Duplicated Zod schema and TypeScript type]
*
* Usually we define a Zod schema, and then infer the corresponding TypeScript
* type for it using `z.infer`. This works great except that the docstrings
* don't show up: Docstrings get added to the Zod schema, but usually the code
* using the parsed data will reference the TypeScript type, and the docstrings
* added to the fields in the Zod schema won't show up.
*
* We usually live with this infelicity since the alternative is code
* duplication: Defining a TypeScript type (putting the docstrings therein)
* _and_ also a corresponding Zod schema. The duplication is needed because it
* is not possible to go the other way (TypeScript type => Zod schema).
*
* However, in some cases, when the TypeScript type under consideration is
* used pervasively in code, having a standalone TypeScript type with attached
* docstrings is worth the code duplication.
*
* Note that this'll just be syntactic duplication - if the two definitions get
* out of sync in the shape of the types they represent, the TypeScript compiler
* will flag it for us.
*/
const RemoteFaceIndex = z.object({
version: z.number(),
client: z.string(),
width: z.number(),
height: z.number(),
faces: z.array(
z.object({
faceID: z.string(),
detection: z.object({
box: z.object({
x: z.number(),
y: z.number(),
width: z.number(),
height: z.number(),
}),
landmarks: z.array(
z.object({
x: z.number(),
y: z.number(),
}),
),
}),
score: z.number(),
blur: z.number(),
embedding: z.array(z.number()),
}),
),
});
/**
* Zod schema for the {@link RemoteCLIPIndex} type.
*
* See: [Note: Duplicated Zod schema and TypeScript type]
*/
const RemoteCLIPIndex = z.object({
version: z.number(),
client: z.string(),
embedding: z.array(z.number()),
});
/**
* Zod schema for the {@link RawRemoteDerivedData} type.
*/
const RawRemoteDerivedData = z.object({}).passthrough();
/**
* Zod schema for the {@link ParsedRemoteDerivedData} type.
*/
const ParsedRemoteDerivedData = z.object({
face: RemoteFaceIndex.nullish().transform(nullToUndefined),
clip: RemoteCLIPIndex.nullish().transform(nullToUndefined),
});
/**
* Fetch derived data for the given files from remote.
*
* @param filesByID A map containing the files whose derived data we want to
* fetch. Each entry is keyed by the file's ID, and the value is the file.
*
* @returns a map containing the (decrypted) derived data for each file for
* which remote returned the corresponding embedding. Each entry in the map is
* keyed by the file's ID, and each value is a {@link RemoteDerivedData} that
* contains both the original JSON, and parsed representation of embeddings that
* we know about.
*/
export const fetchDerivedData = async (
filesByID: Map<number, EnteFile>,
): Promise<Map<number, RemoteDerivedData>> => {
// TODO-ML: Fix name to "combined" before release
const remoteEmbeddings = await fetchEmbeddings("onnx-clip", [
...filesByID.keys(),
]);
const result = new Map<number, RemoteDerivedData>();
for (const remoteEmbedding of remoteEmbeddings) {
const { fileID } = remoteEmbedding;
const file = filesByID.get(fileID);
if (!file) {
log.warn(`Ignoring derived data for unknown fileID ${fileID}`);
continue;
}
try {
const decryptedBytes = await decryptFileMetadata(
remoteEmbedding.encryptedEmbedding,
remoteEmbedding.decryptionHeader,
file.key,
);
const jsonString = await gunzip(decryptedBytes);
result.set(fileID, remoteDerivedDataFromJSONString(jsonString));
} catch (e) {
// This shouldn't happen. Likely some client has uploaded a
// corrupted embedding. Ignore it so that it gets reindexed and
// uploaded correctly again.
log.warn(`Ignoring unparseable embedding for ${fileID}`, e);
}
await saveEmbeddingSyncTime(sinceTime, model);
log.info(`Fetched ${count} ${model} embeddings`);
}
return didPull;
log.debug(() => `Fetched ${result.size} combined embeddings`);
return result;
};
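Illustrative usage (the surrounding variables are ours):

const filesByID = new Map(files.map((f) => [f.id, f]));
const derivedByID = await fetchDerivedData(filesByID);
for (const [id, derived] of derivedByID) {
    // `parsed` is undefined if the payload didn't match the known schema.
    if (derived.parsed?.clip) log.debug(() => `${id} has a CLIP embedding`);
}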
const remoteDerivedDataFromJSONString = (jsonString: string) => {
const raw = RawRemoteDerivedData.parse(JSON.parse(jsonString));
const parseResult = ParsedRemoteDerivedData.safeParse(raw);
// This code is included in apps/photos, which currently does not have the
// TypeScript strict mode enabled, which causes a spurious tsc failure.
//
// eslint-disable-next-line @typescript-eslint/ban-ts-comment, @typescript-eslint/prefer-ts-expect-error
// @ts-ignore
const parsed = parseResult.success
? (parseResult.data as ParsedRemoteDerivedData)
: undefined;
return { raw, parsed };
};
/**
* The updatedAt of the most recent {@link RemoteEmbedding} for {@link model}
* we've retrieved from remote.
* Fetch {@link model} embeddings for the given list of files.
*
* Returns 0 if there is no such embedding.
* @param model The {@link EmbeddingModel} which we want.
*
* This value is persisted to local storage. To update it, use
* {@link saveEmbeddingSyncTime}.
* @param fileIDs The ids of the files for which we want the embeddings.
*
* @returns a list of {@link RemoteEmbedding} for the files which had embeddings
* (and that remote was able to successfully retrieve). The order of this list
* is arbitrary, and the caller should use the {@link fileID} present within the
* {@link RemoteEmbedding} to associate an item in the result back to a file
* instead of relying on the order or count of items in the result.
*/
const embeddingSyncTime = async (model: EmbeddingModel) =>
parseInt((await getKV("embeddingSyncTime:" + model)) ?? "0");
/** Sibling of {@link embeddingSyncTime}. */
const saveEmbeddingSyncTime = async (t: number, model: EmbeddingModel) =>
setKV("embeddingSyncTime:" + model, `${t}`);
/**
* The maximum number of items to fetch in a single GET /embeddings/diff
*
* [Note: Limit of returned items in /diff requests]
*
* The various GET /diff API methods, which tell the client what all has changed
* since a timestamp (provided by the client) take a limit parameter.
*
* These diff API calls return all items whose updated at is greater
* (non-inclusive) than the timestamp we provide. So there is no mechanism for
* pagination of items which have the same exact updated at. Conceptually, it
* may happen that there are more items than the limit we've provided.
*
* The behaviour of this limit is different for file diff and embeddings diff.
*
* - For file diff, the limit is advisory, and remote may return less, equal
* or more items than the provided limit. The scenario where it returns more
* is when more files than the limit have the same updated at. Theoretically
* it would make the diff response unbounded, however in practice file
* modifications themselves are all batched. Even if the user selects all
* the files in their library and updates them all in one go in the UI,
* their client app must use batched API calls to make those updates, and
* each of those batches would get distinct updated at.
*
* - For embeddings diff, there are no bulk updates and this limit is enforced
* as a maximum. While theoretically it is possible for an arbitrary number
* of files to have the same updated at, in practice it is not possible with
* the current set of APIs where clients PUT individual embeddings (the
* updated at is a server timestamp). And even if somehow a large number of
* files get the same updated at and thus get truncated in the response, it
* won't lead to any data loss, the client which requested that particular
* truncated diff will just regenerate them.
*/
const diffLimit = 500;
/**
* GET embeddings for the given model that have been updated {@link sinceTime}.
*
* This fetches the next {@link diffLimit} embeddings whose {@link updatedAt} is
* greater than the given {@link sinceTime} (non-inclusive).
*
* @param model The {@link EmbeddingModel} whose diff we wish for.
*
* @param sinceTime The updatedAt of the last embedding we've synced (epoch ms).
* Pass 0 to fetch everything from the beginning.
*
* @returns an array of {@link RemoteEmbedding}. The returned array is limited
* to a maximum count of {@link diffLimit}.
*
* > See [Note: Limit of returned items in /diff requests].
*/
const getEmbeddingsDiff = async (
const fetchEmbeddings = async (
model: EmbeddingModel,
sinceTime: number,
fileIDs: number[],
): Promise<RemoteEmbedding[]> => {
const params = new URLSearchParams({
model,
sinceTime: `${sinceTime}`,
limit: `${diffLimit}`,
});
const url = await apiURL("/embeddings/diff");
const res = await fetch(`${url}?${params.toString()}`, {
const res = await fetch(await apiURL("/embeddings/files"), {
method: "POST",
headers: await authenticatedRequestHeaders(),
body: JSON.stringify({
model,
fileIDs,
}),
});
ensureOk(res);
return z.object({ diff: z.array(RemoteEmbedding) }).parse(await res.json())
.diff;
return z
.object({ embeddings: z.array(RemoteEmbedding) })
.parse(await res.json()).embeddings;
};
/**
* Update the derived data stored for given {@link enteFile} on remote.
*
* This allows other clients to directly pull the derived data instead of
* needing to re-index.
*
* The data on remote will be replaced unconditionally, and it is up to the
* client (us) to ensure that we preserve the parts of the pre-existing derived
* data (if any) that we did not understand or touch.
*
* See: [Note: Preserve unknown derived data fields].
*/
export const putDerivedData = async (
enteFile: EnteFile,
derivedData: RawRemoteDerivedData,
) =>
// TODO-ML: Fix name to "combined" before release
putEmbedding(
enteFile,
"onnx-clip",
await gzip(JSON.stringify(derivedData)),
);
/**
* Upload an embedding to remote.
*
@@ -259,16 +313,14 @@ const getEmbeddingsDiff = async (
*
* @param model The {@link EmbeddingModel} which we are uploading.
*
* @param embedding String representation of the embedding. The exact contents
* of the embedding are model specific (usually this is the JSON string).
* @param embedding The binary data of the embedding. The exact contents of the
* embedding are {@link model} specific.
*/
export const putEmbedding = async (
const putEmbedding = async (
enteFile: EnteFile,
model: EmbeddingModel,
embedding: string,
embedding: Uint8Array,
) => {
log.debug(() => ["Uploading embedding", { model, embedding }]);
const { encryptedMetadataB64, decryptionHeaderB64 } =
await encryptFileMetadata(embedding, enteFile.key);
@@ -285,174 +337,35 @@ export const putEmbedding = async (
ensureOk(res);
};
// MARK: - Face
// MARK: - GZIP
/**
* Fetch new or updated face embeddings from remote and save them locally.
* Compress the given {@link string} using "gzip" and return the resultant
* bytes. See {@link gunzip} for the reverse operation.
*
* It takes no parameters since it saves the last sync time in local storage.
*
* This function should be called only after we have synced files with remote.
* See: [Note: Ignoring embeddings for unknown files].
*
* @returns true if at least one embedding was pulled, false otherwise.
* This is syntactic sugar to deal with the string/blob/stream/bytes
* conversions, but it should not be taken as an abstraction layer. If your code
can directly use a ReadableStream, then the data -> stream -> data round
* trip is unnecessary.
*/
export const pullFaceEmbeddings = () =>
pullEmbeddings("file-ml-clip-face", (jsonString: string) =>
// eslint-disable-next-line @typescript-eslint/prefer-ts-expect-error, @typescript-eslint/ban-ts-comment
// @ts-ignore TODO: There is no error here, but this file is imported by
// one of our packages that doesn't have strict mode enabled yet,
// causing a spurious error to be emitted in that context.
saveFaceIndexIfNewer(FaceIndex.parse(JSON.parse(jsonString))),
);
/**
* Save the given {@link faceIndex} locally if it is newer than the one we have.
*
* This is a variant of {@link saveFaceIndex} that performs version checking as
* described in [Note: Handling versioning of embeddings].
*/
const saveFaceIndexIfNewer = async (index: FaceIndex) => {
const version = index.faceEmbedding.version;
if (version < faceIndexingVersion) {
log.info(
`Ignoring remote face index with version ${version} older than what our indexer can produce (${faceIndexingVersion})`,
);
return;
}
return saveFaceIndex(index);
const gzip = async (string: string) => {
const compressedStream = new Blob([string])
.stream()
// This code only runs on the desktop app currently, so we can rely on
// the existence of newer web features like the CompressionStream APIs.
.pipeThrough(new CompressionStream("gzip"));
return new Uint8Array(await new Response(compressedStream).arrayBuffer());
};
/**
* Zod schemas for the {@link FaceIndex} types.
*
* [Note: Duplicated between Zod schemas and TS type]
*
* Usually we define a Zod schema, and then infer the corresponding TypeScript
* type for it using `z.infer`. This works great except now the docstrings don't
* show up: The doc strings get added to the Zod schema, but usually the code
* using the parsed data will reference the TypeScript type, and the docstrings
* added to the fields in the Zod schema won't show up.
*
* We usually live with this infelicity, since the alternative is code
* duplication: Define the TypeScript type (putting the docstrings therein)
* _and_ also a corresponding Zod schema. The duplication happens because it is
* not possible to go the other way (TS type => Zod schema).
*
* However, in some cases having when the TypeScript type under consideration is
* used pervasively in code, having a standalone TypeScript type with attached
* docstrings is worth the code duplication.
*
* Note that this'll just be syntactic duplication - if the two definitions get
* out of sync in the shape of the types they represent, the TypeScript compiler
* will flag it for us.
* Decompress the given "gzip" compressed {@link data} and return the resultant
* string. See {@link gzip} for the reverse operation.
*/
const FaceIndex = z
.object({
fileID: z.number(),
width: z.number(),
height: z.number(),
faceEmbedding: z
.object({
version: z.number(),
client: z.string(),
faces: z.array(
z
.object({
faceID: z.string(),
detection: z
.object({
box: z
.object({
x: z.number(),
y: z.number(),
width: z.number(),
height: z.number(),
})
.passthrough(),
landmarks: z.array(
z
.object({
x: z.number(),
y: z.number(),
})
.passthrough(),
),
})
.passthrough(),
score: z.number(),
blur: z.number(),
embedding: z.array(z.number()),
})
.passthrough(),
),
})
.passthrough(),
})
// Retain fields we might not (currently) understand.
.passthrough();
/**
* Save the face index for the given {@link enteFile} on remote so that other
* clients can directly pull it instead of needing to reindex.
*/
export const putFaceIndex = async (enteFile: EnteFile, faceIndex: FaceIndex) =>
putEmbedding(enteFile, "file-ml-clip-face", JSON.stringify(faceIndex));
// MARK: - CLIP
/**
* Fetch new or updated CLIP embeddings from remote and save them locally.
*
* See {@link pullFaceEmbeddings} for a sibling function with more comprehensive
* documentation.
*
* @returns true if at least one embedding was pulled, false otherwise.
*/
export const pullCLIPEmbeddings = () =>
pullEmbeddings("onnx-clip", (jsonString: string) =>
// eslint-disable-next-line @typescript-eslint/prefer-ts-expect-error, @typescript-eslint/ban-ts-comment
// @ts-ignore TODO: There is no error here, but this file is imported by
// one of our packages that doesn't have strict mode enabled yet,
// causing a spurious error to be emitted in that context.
saveCLIPIndexIfNewer(CLIPIndex.parse(JSON.parse(jsonString))),
);
/**
* Save the given {@link clipIndex} locally if it is newer than the one we have.
*
* This is a variant of {@link saveCLIPIndex} that performs version checking as
* described in [Note: Handling versioning of embeddings].
*/
const saveCLIPIndexIfNewer = async (index: CLIPIndex) => {
const version = index.version;
if (version < clipIndexingVersion) {
log.info(
`Ignoring remote CLIP index with version ${version} older than what our indexer can produce (${clipIndexingVersion})`,
);
return;
}
return saveCLIPIndex(index);
const gunzip = async (data: Uint8Array) => {
const decompressedStream = new Blob([data])
.stream()
// This code only runs on the desktop app currently, so we can rely on
// the existence of newer web features like the CompressionStream APIs.
.pipeThrough(new DecompressionStream("gzip"));
return new Response(decompressedStream).text();
};
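A quick roundtrip sanity check of the pair (illustrative only):

const original = JSON.stringify({ clip: { embedding: [0.1, 0.2] } });
const compressed = await gzip(original);
const restored = await gunzip(compressed);
console.assert(restored === original);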
/**
* Zod schemas for the {@link CLIPIndex} types.
*
* See: [Note: Duplicated between Zod schemas and TS type]
*/
const CLIPIndex = z
.object({
fileID: z.number(),
version: z.number(),
client: z.string(),
embedding: z.array(z.number()),
})
// Retain fields we might not (currently) understand.
.passthrough();
/**
* Save the CLIP index for the given {@link enteFile} on remote so that other
* clients can directly pull it instead of needing to reindex.
*/
export const putCLIPIndex = async (enteFile: EnteFile, clipIndex: CLIPIndex) =>
putEmbedding(enteFile, "onnx-clip", JSON.stringify(clipIndex));

View File

@@ -8,7 +8,6 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */
import type { EnteFile } from "@/new/photos/types/file";
import log from "@/next/log";
import { Matrix } from "ml-matrix";
import { getSimilarityTransformation } from "similarity-transformation";
import {
@@ -19,7 +18,6 @@ import {
type Matrix as TransformationMatrix,
} from "transformation-matrix";
import type { ImageBitmapAndData } from "./blob";
import { saveFaceCrops } from "./crop";
import {
grayscaleIntMatrixFromNormalized2List,
pixelRGBBilinear,
@@ -36,30 +34,85 @@ export const faceIndexingVersion = 1;
/**
* The faces in a file (and an embedding for each of them).
*
* This interface describes the format of both local and remote face data.
* This interface describes the format of face-related data (aka "face index")
* for a file. This is the common subset that is present in both the fields that
* are persisted on remote ({@link RemoteFaceIndex}) and locally
* ({@link LocalFaceIndex}).
*
* - Local face detections and embeddings (collectively called as the face
* index) are generated by the current client when uploading a file (or when
* noticing a file which doesn't yet have a face index), stored in the local
* IndexedDB ("ml/db") and also uploaded (E2EE) to remote.
* - Face detections and embeddings (collectively called the face index) are
* generated by the current client when uploading a file, or when noticing a
* file which doesn't yet have a face index. They are then uploaded (E2EE) to
* remote, and the relevant bits also saved locally in the ML DB for
* subsequent lookup or clustering.
*
* - Remote embeddings are fetched by subsequent clients to avoid them having to
* reindex (indexing faces is a costly operation, esp for mobile clients).
* - This face index is then fetched by subsequent clients to avoid them having
* to reindex (indexing faces is a costly operation, esp. for mobile clients).
*
* In both these scenarios (whether generated locally or fetched from remote),
* we end up with an face index described by this {@link FaceIndex} interface.
*
* It has a top level envelope with information about the file (in particular
* the primary key {@link fileID}), an inner envelope {@link faceEmbedding} with
* metadata about the indexing, and an array of {@link faces} each containing
* the result of a face detection and an embedding for that detected face.
*
* The word embedding is used to refer two things: The last one (faceEmbedding >
* faces > embedding) is the "actual" embedding, but sometimes we colloquially
* refer to the inner envelope (the "faceEmbedding") also an embedding since a
* file can have other types of embedding (envelopes), e.g. a "clipEmbedding".
* we end up with the data described by this {@link FaceIndex} interface. We
* modify it slightly, adding envelopes, when saving it locally or uploading
* it to remote - those variations are described by the {@link LocalFaceIndex}
* and {@link RemoteFaceIndex} types respectively.
*/
export interface FaceIndex {
/**
* The width (in px) of the image (file).
*
* Having the image dimensions here is useful since the coordinates inside
* the {@link Face}s are all normalized (0 to 1) to the width and height of
* the image.
*/
width: number;
/**
* The height (in px) of the image (file).
*/
height: number;
/**
* The list of faces (and their embeddings) detected in the file.
*
* Each of the items is a {@link Face}, containing the result of a face
* detection, and an embedding for that detected face.
*/
faces: Face[];
}
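Since the coordinates inside each {@link Face} are normalized, consumers use this envelope to get back to pixel space. A sketch (toPixelBox is our name; Box is the detection box type defined later in this file):

const toPixelBox = (box: Box, { width, height }: FaceIndex) => ({
    x: box.x * width,
    y: box.y * height,
    width: box.width * width,
    height: box.height * height,
});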
export type RemoteFaceIndex = FaceIndex & {
/**
* An integral version number of the indexing algorithm / pipeline.
*
* [Note: Embedding versions]
*
* Embeddings have an associated version so it is possible for us to make
* backward compatible updates to the indexing process on newer clients.
*
* Clients agree out of band what a particular version means, and guarantee
* that an embedding with a particular version will be the same (to epsilon
* cosine similarity) irrespective of the client that indexed the file.
*
* If we bump the version of the same model (say when indexing on a newer
* client), we will do it in a manner such that older clients will be able to
* consume the response. The schema should not change in non-additive
* ways. For example, if we improve blur detection, older clients
* should just consume embeddings with a newer version and not try to index
* the file again locally.
*
* When fetching from remote, if we get an embedding with a version that is
* older than the version the client supports, then the client should ignore
* it. This way, the file will get reindexed locally and an embedding with a
* newer version will also get saved to remote.
*
* In the case where the changes are not backward compatible and can only be
* consumed by clients after making code changes, then we will introduce a new
* subtype (top level key) in the derived data.
*/
version: number;
/**
* The UA for the client which generated this embedding.
*/
client: string;
};
export type LocalFaceIndex = FaceIndex & {
/**
* The ID of the {@link EnteFile} whose index this is.
*
@@ -69,36 +122,7 @@ export interface FaceIndex {
* user will get a file entry with a fileID unique to them).
*/
fileID: number;
/**
* The width (in px) of the image (file).
*/
width: number;
/**
* The height (in px) of the image (file).
*/
height: number;
/**
* The "face embedding" for the file.
*
* This is an envelope that contains a list of indexed faces and metadata
* about the indexing.
*/
faceEmbedding: {
/**
* An integral version number of the indexing algorithm / pipeline.
*
* Clients agree out of band what a particular version means. The
* guarantee is that an embedding with a particular version will be the
* same (to negligible floating point epsilons) irrespective of the
* client that indexed the file.
*/
version: number;
/** The UA for the client which generated this embedding. */
client: string;
/** The list of faces (and their embeddings) detected in the file. */
faces: Face[];
};
}
};
/**
* A face detected in a file, and an embedding for this detected face.
@@ -199,63 +223,31 @@ export interface Box {
* This function is the entry point to the face indexing pipeline. The file goes
* through various stages:
*
* 1. Downloading the original if needed.
* 2. Detect faces using ONNX/YOLO
* 3. Align the face rectangles, compute blur.
* 4. Compute embeddings using ONNX/MFNT for the detected face (crop).
* 1. Detect faces using ONNX/YOLO
* 2. Align the face rectangles, compute blur.
* 3. Compute embeddings using ONNX/MFNT for the detected face (crop).
*
* Once all of it is done, it returns the face rectangles and embeddings so that
* they can be saved locally (for offline use), and also uploaded to the user's
* remote storage so that their other devices can download them instead of
* needing to reindex.
*
* As an optimization, it also saves the face crops of the detected faces to the
* local cache (they can be regenerated independently too by using
* {@link regenerateFaceCrops}).
*
* @param enteFile The {@link EnteFile} to index.
*
* @param image The file's contents.
*
* @param electron The {@link MLWorkerElectron} instance that allows us to call
* our Node.js layer for various functionality.
*
* @param userAgent The UA of the client that is doing the indexing (us).
* our Node.js layer to run the ONNX inference.
*/
export const indexFaces = async (
enteFile: EnteFile,
image: ImageBitmapAndData,
{ data: imageData }: ImageBitmapAndData,
electron: MLWorkerElectron,
userAgent: string,
): Promise<FaceIndex> => {
const { bitmap: imageBitmap, data: imageData } = image;
const { width, height } = imageBitmap;
const fileID = enteFile.id;
const faceIndex = {
fileID,
width,
height,
faceEmbedding: {
version: faceIndexingVersion,
client: userAgent,
faces: await indexFaces_(fileID, imageData, electron),
},
};
// This step, saving face crops, is not part of the indexing pipeline;
// we just do it here since we have already have the ImageBitmap at
// hand. Ignore errors that happen during this since it does not impact
// the generated face index.
try {
await saveFaceCrops(imageBitmap, faceIndex);
} catch (e) {
log.error(`Failed to save face crops for file ${fileID}`, e);
}
return faceIndex;
};
): Promise<FaceIndex> => ({
width: imageData.width,
height: imageData.height,
faces: await indexFaces_(enteFile.id, imageData, electron),
});
const indexFaces_ = async (
fileID: number,

View File

@@ -363,8 +363,7 @@ const setInterimScheduledStatus = () => {
let nSyncedFiles = 0,
nTotalFiles = 0;
if (_mlStatusSnapshot && _mlStatusSnapshot.phase != "disabled") {
nSyncedFiles = _mlStatusSnapshot.nSyncedFiles;
nTotalFiles = _mlStatusSnapshot.nTotalFiles;
({ nSyncedFiles, nTotalFiles } = _mlStatusSnapshot);
}
setMLStatusSnapshot({ phase: "scheduled", nSyncedFiles, nTotalFiles });
};
@@ -379,7 +378,7 @@ export const unidentifiedFaceIDs = async (
enteFile: EnteFile,
): Promise<string[]> => {
const index = await faceIndex(enteFile.id);
return index?.faceEmbedding.faces.map((f) => f.faceID) ?? [];
return index?.faces.map((f) => f.faceID) ?? [];
};
/**
@@ -393,7 +392,7 @@ export const regenerateFaceCropsIfNeeded = async (enteFile: EnteFile) => {
const index = await faceIndex(enteFile.id);
if (!index) return false;
const faceIDs = index.faceEmbedding.faces.map((f) => f.faceID);
const faceIDs = index.faces.map((f) => f.faceID);
const cache = await blobCache("face-crops");
for (const id of faceIDs) {
if (!(await cache.has(id))) {

View File

@@ -16,24 +16,32 @@ import {
renderableBlob,
type ImageBitmapAndData,
} from "./blob";
import { indexCLIP, type CLIPIndex } from "./clip";
import { clipIndexingVersion, indexCLIP, type CLIPIndex } from "./clip";
import { saveFaceCrops } from "./crop";
import {
indexableFileIDs,
markIndexingFailed,
saveCLIPIndex,
saveFaceIndex,
saveIndexes,
updateAssumingLocalFiles,
} from "./db";
import { pullFaceEmbeddings, putCLIPIndex, putFaceIndex } from "./embedding";
import { indexFaces, type FaceIndex } from "./face";
import {
fetchDerivedData,
putDerivedData,
type RemoteDerivedData,
} from "./embedding";
import { faceIndexingVersion, indexFaces, type FaceIndex } from "./face";
import type { MLWorkerDelegate, MLWorkerElectron } from "./worker-types";
const idleDurationStart = 5; /* 5 seconds */
const idleDurationMax = 16 * 60; /* 16 minutes */
interface IndexableItem {
/** The {@link EnteFile} to index (potentially). */
enteFile: EnteFile;
/** If the file was uploaded from the current client, then its contents. */
uploadItem: UploadItem | undefined;
/** The existing derived data on remote corresponding to this file. */
remoteDerivedData: RemoteDerivedData | undefined;
}
/**
@@ -47,24 +55,22 @@ interface IndexableItem {
*
* ext. event state then state
* ------------- --------------- --------------
* sync -> "pull" -> "idle"
* sync -> "backfillq" -> "idle"
* upload -> "liveq" -> "idle"
* idleTimeout -> "backfillq" -> "idle"
*
* where:
*
* - "pull": pulling embeddings from remote
* - "liveq": indexing items that are being uploaded
* - "backfillq": indexing unindexed items otherwise
* - "idle": in between state transitions
* - "liveq": indexing items that are being uploaded,
* - "backfillq": fetching remote embeddings of unindexed items, and then
* indexing them if needed,
* - "idle": in between state transitions.
*/
export class MLWorker {
private electron: MLWorkerElectron | undefined;
private delegate: MLWorkerDelegate | undefined;
private userAgent: string | undefined;
private state: "idle" | "pull" | "indexing" = "idle";
private shouldPull = false;
private havePulledAtLeastOnce = false;
private state: "idle" | "indexing" = "idle";
private liveQ: IndexableItem[] = [];
private idleTimeout: ReturnType<typeof setTimeout> | undefined;
private idleDuration = idleDurationStart; /* unit: seconds */
@@ -73,11 +79,11 @@ export class MLWorker {
* Initialize a new {@link MLWorker}.
*
* This is conceptually the constructor, however it is easier to have this
* as a separate function to avoid confounding the comlink types too much.
* as a separate function to avoid complicating the comlink types further.
*
* @param electron The {@link MLWorkerElectron} that allows the worker to
* use the functionality provided by our Node.js layer when running in the
* context of our desktop app
* context of our desktop app.
*
* @param delegate The {@link MLWorkerDelegate} the worker can use to inform
* the main thread of interesting events.
@@ -93,17 +99,14 @@ export class MLWorker {
}
/**
* Pull embeddings from remote, and start backfilling if needed.
* Start backfilling if needed.
*
* This function enqueues the pull and returns immediately without waiting
* for the pull to complete.
*
* While it only triggers a pull, once the pull is done it also checks for
* pending items to backfill. So it implicitly also triggers a backfill
* (which is why call it a less-precise sync instead of pull).
* This function enqueues a backfill attempt and returns immediately without
* waiting for it to complete. During a backfill, it will first attempt to
* fetch embeddings for files which don't have that data locally. If we
* fetch and find what we need, we save it locally. Otherwise we index them.
*/
sync() {
this.shouldPull = true;
this.wakeUp();
}
@@ -139,7 +142,10 @@ export class MLWorker {
// live queue is just an optimization: if a file doesn't get indexed via
// the live queue, it'll later get indexed anyway when we backfill.
if (this.liveQ.length < 200) {
this.liveQ.push({ enteFile, uploadItem });
// The file is just being uploaded, and so will not have any
// pre-existing derived data on remote.
const remoteDerivedData = undefined;
this.liveQ.push({ enteFile, uploadItem, remoteDerivedData });
this.wakeUp();
} else {
log.debug(() => "Ignoring upload item since liveQ is full");
@@ -158,7 +164,6 @@ export class MLWorker {
"ml/tick",
{
state: this.state,
shouldSync: this.shouldPull,
liveQ: this.liveQ,
idleDuration: this.idleDuration,
},
@@ -166,46 +171,12 @@ export class MLWorker {
const scheduleTick = () => void setTimeout(() => this.tick(), 0);
// If we've been asked to sync, do that irrespective of anything else.
if (this.shouldPull) {
// Allow this flag to be reset while we're busy pulling (triggering
// another pull when we tick next).
this.shouldPull = false;
this.state = "pull";
try {
const didPull = await pull();
// Mark that we completed once attempt at pulling successfully
// (irrespective of whether or not that got us some data).
this.havePulledAtLeastOnce = true;
// Reset the idle duration if we did pull something.
if (didPull) this.idleDuration = idleDurationStart;
} catch (e) {
log.error("Failed to pull embeddings", e);
}
// Tick again, even if we got an error.
//
// While the backfillQ won't be processed until at least a pull has
// happened once (`havePulledAtLeastOnce`), the liveQ can still be
// processed since these are new files without remote embeddings.
scheduleTick();
return;
}
const liveQ = this.liveQ;
this.liveQ = [];
this.state = "indexing";
// Use the liveQ if present, otherwise get the next batch to backfill,
// but only if we've pulled once from remote successfully (otherwise we
// might end up reindexing files that were already indexed on remote but
// which we didn't know about since pull failed, say, for transient
// network issues).
const items =
liveQ.length > 0
? liveQ
: this.havePulledAtLeastOnce
? await this.backfillQ()
: [];
// Use the liveQ if present, otherwise get the next batch to backfill.
const items = liveQ.length > 0 ? liveQ : await this.backfillQ();
const allSuccess = await indexNextBatch(
items,
@@ -239,40 +210,25 @@ export class MLWorker {
/** Return the next batch of items to backfill (if any). */
async backfillQ() {
const userID = ensure(await getKVN("userID"));
return syncWithLocalFilesAndGetFilesToIndex(userID, 200).then((fs) =>
fs.map((f) => ({ enteFile: f, uploadItem: undefined })),
// Find files that our local DB thinks need syncing.
const filesByID = await syncWithLocalFilesAndGetFilesToIndex(
userID,
200,
);
if (!filesByID.size) return [];
// Fetch their existing derived data (if any).
const derivedDataByID = await fetchDerivedData(filesByID);
// Return files after annotating them with their existing derived data.
return Array.from(filesByID, ([id, file]) => ({
enteFile: file,
uploadItem: undefined,
remoteDerivedData: derivedDataByID.get(id),
}));
}
}
expose(MLWorker);
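For orientation, a hedged sketch of how the main thread might drive this comlink-exposed class (the init method name follows the docs above; the worker path, and the exact wiring in the app, are assumptions):

import { wrap } from "comlink";

const worker = new Worker(new URL("ml/worker", import.meta.url)); // path illustrative
const RemoteMLWorker = wrap<typeof MLWorker>(worker);
// comlink lets us construct the exposed class remotely.
const mlWorker = await new RemoteMLWorker();
await mlWorker.init(electron, delegate); // placeholders for the real instances
mlWorker.sync(); // enqueue a backfill attempt; returns immediately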
/**
* Pull embeddings from remote.
*
* Return true atleast one embedding was pulled.
*/
const pull = async () => {
const res = await Promise.allSettled([
pullFaceEmbeddings(),
// TODO-ML: clip-test
// pullCLIPEmbeddings(),
]);
for (const r of res) {
switch (r.status) {
case "fulfilled":
// Return true if any pulled something.
if (r.value) return true;
break;
case "rejected":
// Throw if any failed.
throw r.reason;
}
}
// Return false if neither pulled anything.
return false;
};
/**
* Find out files which need to be indexed. Then index the next batch of them.
*
@@ -301,9 +257,9 @@ const indexNextBatch = async (
// Index, keeping track if any of the items failed.
let allSuccess = true;
for (const { enteFile, uploadItem } of items) {
for (const item of items) {
try {
await index(enteFile, uploadItem, electron, userAgent);
await index(item, electron, userAgent);
delegate?.workerDidProcessFile();
// Possibly unnecessary, but let us drain the microtask queue.
await wait(0);
@@ -330,7 +286,7 @@ const indexNextBatch = async (
const syncWithLocalFilesAndGetFilesToIndex = async (
userID: number,
count: number,
): Promise<EnteFile[]> => {
): Promise<Map<number, EnteFile>> => {
const isIndexable = (f: EnteFile) => f.ownerID == userID;
const localFiles = await getAllLocalFiles();
@@ -346,24 +302,14 @@ const syncWithLocalFilesAndGetFilesToIndex = async (
);
const fileIDsToIndex = await indexableFileIDs(count);
return fileIDsToIndex.map((id) => ensure(localFilesByID.get(id)));
return new Map(
fileIDsToIndex.map((id) => [id, ensure(localFilesByID.get(id))]),
);
};
/**
* Index the file, persist the results locally, and put them on remote.
*
* @param enteFile The {@link EnteFile} to index.
*
* @param uploadItem If the file is one which is being uploaded from the current
* client, then we will also have access to the file's content. In such cases,
* passing a web {@link File} object will directly use that its data when
* indexing. Otherwise (when this is not provided), the file's contents will be
* downloaded and decrypted from remote.
*
* @param userAgent The UA of the client that is doing the indexing (us).
*
* ---
*
* [Note: ML indexing does more ML]
*
* Nominally, and primarily, indexing a file involves computing its various ML
@@ -371,17 +317,97 @@ const syncWithLocalFilesAndGetFilesToIndex = async (
* the original file in memory, it is a great time to also compute other derived
* data related to the file (instead of re-downloading it again).
*
* So this index function also does things that are not related to ML:
* extracting and updating Exif.
* So this function also does things that are not related to ML and/or indexing:
*
* - Extracting and updating Exif.
* - Saving face crops.
*
* ---
*
* [Note: Transient and permanent indexing failures]
*
* We mark indexing for a file as having failed only if there is a good chance
* that the indexing failed because of some inherent issue with that particular
* file, and not if there were generic failures (like when trying to save the
* indexes to remote).
*
 * When we mark it as failed, a flag is persisted corresponding to this
 * file in the ML DB so that it won't get reindexed in future runs. These are
 * thus considered permanent failures.
*
* > We might retry these in future versions if we identify reasons for indexing
* > to fail (it ideally shouldn't) and rectify them.
*
* On the other hand, saving the face index to remote might fail for transient
* issues (network issues, or remote having hiccups). We don't mark a file as
* failed permanently in such cases, so that it gets retried at some point.
* These are considered as transient failures.
*
 * However, it is very hard to pre-emptively enumerate all possible failure
 * modes, and there is the possibility of some non-transient failure getting
 * classified as a transient failure and causing the client to try and index the
 * same file again and again, when in fact there is an issue specific to that
 * file which is preventing the index from being saved. What exactly? We don't
 * know, but the possibility exists.
*
* To reduce the chances of this happening, we treat HTTP 4xx responses as
* permanent failures too - there are no known cases where a client retrying a
* 4xx response would work, and there are expected (but rare) cases where a
* client might get a non-retriable 4xx (e.g. if the file has over ~700 faces,
* then remote will return a 413 Request Entity Too Large).
*/
const index = async (
enteFile: EnteFile,
uploadItem: UploadItem | undefined,
{ enteFile, uploadItem, remoteDerivedData }: IndexableItem,
electron: MLWorkerElectron,
userAgent: string,
) => {
const f = fileLogID(enteFile);
const startTime = Date.now();
const fileID = enteFile.id;
// Massage the existing data (if any) that we got from remote to the form
// that the rest of this function operates on.
//
    // Discard any existing data that was made by an older indexing pipeline.
// See: [Note: Embedding versions]
const existingRemoteFaceIndex = remoteDerivedData?.parsed?.face;
const existingRemoteCLIPIndex = remoteDerivedData?.parsed?.clip;
let existingFaceIndex: FaceIndex | undefined;
if (
existingRemoteFaceIndex &&
existingRemoteFaceIndex.version >= faceIndexingVersion
) {
const { width, height, faces } = existingRemoteFaceIndex;
existingFaceIndex = { width, height, faces };
}
let existingCLIPIndex: CLIPIndex | undefined;
if (
existingRemoteCLIPIndex &&
existingRemoteCLIPIndex.version >= clipIndexingVersion
) {
const { embedding } = existingRemoteCLIPIndex;
existingCLIPIndex = { embedding };
}
// See if we already have all the derived data fields that we need. If so,
// just update our local db and return.
if (existingFaceIndex && existingCLIPIndex) {
try {
await saveIndexes(
{ fileID, ...existingFaceIndex },
{ fileID, ...existingCLIPIndex },
);
} catch (e) {
log.error(`Failed to save indexes data for ${f}`, e);
throw e;
}
return;
}
// There is at least one derived data type that still needs to be indexed.
const imageBlob = await renderableBlob(enteFile, uploadItem, electron);
@@ -392,116 +418,98 @@ const index = async (
        // If we cannot get the raw image data for the file, then retrying
        // won't help. It'd only make sense to retry later if we modify
        // `renderableBlob` to do something different for this type of file.
//
// See: [Note: Transient and permanent indexing failures]
log.error(`Failed to get image data for indexing ${f}`, e);
await markIndexingFailed(enteFile.id);
throw e;
}
const res = await Promise.allSettled([
_indexFace(f, enteFile, image, electron, userAgent),
// TODO-ML: clip-test
// _indexCLIP(f, enteFile, image, electron, userAgent),
]);
image.bitmap.close();
try {
let faceIndex: FaceIndex;
let clipIndex: CLIPIndex;
const msg: string[] = [];
for (const r of res) {
if (r.status == "rejected") throw r.reason;
else msg.push(r.value);
const startTime = Date.now();
try {
[faceIndex, clipIndex] = await Promise.all([
existingFaceIndex ?? indexFaces(enteFile, image, electron),
existingCLIPIndex ?? indexCLIP(image, electron),
]);
} catch (e) {
// See: [Note: Transient and permanent indexing failures]
log.error(`Failed to index ${f}`, e);
await markIndexingFailed(enteFile.id);
throw e;
}
log.debug(() => {
const ms = Date.now() - startTime;
const msg = [];
if (!existingFaceIndex) msg.push(`${faceIndex.faces.length} faces`);
if (!existingCLIPIndex) msg.push("clip");
return `Indexed ${msg.join(" and ")} in ${f} (${ms} ms)`;
});
const remoteFaceIndex = existingRemoteFaceIndex ?? {
version: faceIndexingVersion,
client: userAgent,
...faceIndex,
};
const remoteCLIPIndex = existingRemoteCLIPIndex ?? {
version: clipIndexingVersion,
client: userAgent,
...clipIndex,
};
// Perform an "upsert" by using the existing raw data we got from the
// remote as the base, and inserting or overwriting any newly indexed
// parts. See: [Note: Preserve unknown derived data fields].
const existingRawDerivedData = remoteDerivedData?.raw ?? {};
const rawDerivedData = {
...existingRawDerivedData,
face: remoteFaceIndex,
clip: remoteCLIPIndex,
};
log.debug(() => ["Uploading derived data", rawDerivedData]);
try {
await putDerivedData(enteFile, rawDerivedData);
} catch (e) {
// See: [Note: Transient and permanent indexing failures]
log.error(`Failed to put derived data for ${f}`, e);
if (isHTTP4xxError(e)) await markIndexingFailed(enteFile.id);
throw e;
}
try {
await saveIndexes(
{ fileID, ...faceIndex },
{ fileID, ...clipIndex },
);
} catch (e) {
// Not sure if DB failures should be considered permanent or
// transient. There isn't a known case where writing to the local
// indexedDB would fail.
log.error(`Failed to save indexes for ${f}`, e);
throw e;
}
        // This step, saving face crops, is conceptually not part of the
        // indexing pipeline; we just do it here since we already have the
        // ImageBitmap at hand. Ignore errors that happen during this step
        // since they do not impact the generated face index.
if (!existingFaceIndex) {
try {
await saveFaceCrops(image.bitmap, faceIndex);
} catch (e) {
log.error(`Failed to save face crops for ${f}`, e);
}
}
} finally {
image.bitmap.close();
}
log.debug(() => {
const ms = Date.now() - startTime;
return `Indexed ${msg.join(" and ")} in ${f} (${ms} ms)`;
});
};
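// A condensed sketch of the flow described in [Note: Transient and
// permanent indexing failures]. The two step functions are hypothetical
// stand-ins for the corresponding parts of the real `index` above;
// markIndexingFailed and isHTTP4xxError are the same ones used there.
const indexWithClassifiedFailures = async (
    item: IndexableItem,
    computeIndexes: (item: IndexableItem) => Promise<void>,
    uploadIndexes: (item: IndexableItem) => Promise<void>,
) => {
    const fileID = item.enteFile.id;
    try {
        // Failures here likely indicate an inherent issue with this
        // particular file, so mark it as permanently failed.
        await computeIndexes(item);
    } catch (e) {
        await markIndexingFailed(fileID);
        throw e;
    }
    try {
        // Failures here are usually transient (network, remote hiccups),
        // except 4xx responses, which won't succeed on retry either.
        await uploadIndexes(item);
    } catch (e) {
        if (isHTTP4xxError(e)) await markIndexingFailed(fileID);
        throw e;
    }
};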
const _indexFace = async (
f: string,
enteFile: EnteFile,
image: ImageBitmapAndData,
electron: MLWorkerElectron,
userAgent: string,
) => {
let faceIndex: FaceIndex;
try {
faceIndex = await indexFaces(enteFile, image, electron, userAgent);
} catch (e) {
log.error(`Failed to index faces in ${f}`, e);
await markIndexingFailed(enteFile.id);
throw e;
}
// [Note: Transient and permanent indexing failures]
//
// Generally speaking, we mark indexing for a file as having failed only if
// the indexing itself failed, not if there were subsequent failures (like
// when trying to put the result to remote or save it to the local face DB).
//
    // When we mark it as failed, a flag is persisted corresponding to this
    // file in the ML DB so that it won't get reindexed in future runs. These are
    // thus considered permanent failures.
//
// > We might retry in future versions if we identify reasons for indexing
// > to fail (it shouldn't) and rectify them.
//
// On the other hand, saving the face index to remote might fail for
// transient issues (network issues, or remote having hiccups). We don't
// mark a file as failed permanently in such cases, so that it gets retried
// at some point. These are considered as transient failures.
//
// However, this opens the possibility of some non-transient failure getting
// classified as a transient failure and causing the client to try and index
    // the same file again and again, when in fact there is an issue specific to
// that file which is preventing the index from being saved. What exactly?
// We don't know, but the possibility exists.
//
// To reduce the chances of this happening, we treat HTTP 4xx responses as
// permanent failures too - there are no known cases where a client retrying
// a 4xx response would work, and there are known (but rare) cases where a
// client might get a 4xx (e.g. if the file has over ~700 faces, then remote
// will return a 413 Request Entity Too Large).
try {
await putFaceIndex(enteFile, faceIndex);
await saveFaceIndex(faceIndex);
} catch (e) {
log.error(`Failed to put/save face index for ${f}`, e);
if (isHTTP4xxError(e)) await markIndexingFailed(enteFile.id);
throw e;
}
// A message for debug printing.
return `${faceIndex.faceEmbedding.faces.length} faces`;
};
// TODO-ML: clip-test export
export const _indexCLIP = async (
f: string,
enteFile: EnteFile,
image: ImageBitmapAndData,
electron: MLWorkerElectron,
userAgent: string,
) => {
let clipIndex: CLIPIndex;
try {
clipIndex = await indexCLIP(enteFile, image, electron, userAgent);
} catch (e) {
log.error(`Failed to index CLIP in ${f}`, e);
await markIndexingFailed(enteFile.id);
throw e;
}
// See: [Note: Transient and permanent indexing failures]
try {
await putCLIPIndex(enteFile, clipIndex);
await saveCLIPIndex(clipIndex);
} catch (e) {
log.error(`Failed to put/save CLIP index for ${f}`, e);
if (isHTTP4xxError(e)) await markIndexingFailed(enteFile.id);
throw e;
}
// A message for debug printing.
return "clip";
};

View File

@@ -9,9 +9,9 @@
import * as libsodium from "@ente/shared/crypto/internal/libsodium";
/**
* Encrypt arbitrary metadata associated with a file using its key.
* Encrypt arbitrary metadata associated with a file using the file's key.
*
* @param metadata The metadata (string) to encrypt.
* @param metadata The metadata (bytes) to encrypt.
*
* @param keyB64 Base64 encoded string containing the encryption key (this'll
* generally be the file's key).
@@ -20,16 +20,10 @@ import * as libsodium from "@ente/shared/crypto/internal/libsodium";
* decryption header.
*/
export const encryptFileMetadata = async (
metadata: string,
metadata: Uint8Array,
keyB64: string,
): Promise<{ encryptedMetadataB64: string; decryptionHeaderB64: string }> => {
const encoder = new TextEncoder();
const encodedMetadata = encoder.encode(metadata);
const { file } = await libsodium.encryptChaChaOneShot(
encodedMetadata,
keyB64,
);
) => {
const { file } = await libsodium.encryptChaChaOneShot(metadata, keyB64);
return {
encryptedMetadataB64: await libsodium.toB64(file.encryptedData),
decryptionHeaderB64: file.decryptionHeader,
@@ -37,7 +31,7 @@ export const encryptFileMetadata = async (
};
/**
* Decrypt arbitrary metadata associated with a file using the its key.
* Decrypt arbitrary metadata associated with a file using the file's key.
*
* @param encryptedMetadataB64 Base64 encoded string containing the encrypted
* data.
@@ -48,18 +42,15 @@ export const encryptFileMetadata = async (
* @param keyB64 Base64 encoded string containing the encryption key (this'll
* generally be the file's key).
*
* @returns The decrypted utf-8 string.
* @returns The decrypted metadata bytes.
*/
export const decryptFileMetadata = async (
encryptedMetadataB64: string,
decryptionHeaderB64: string,
keyB64: string,
) => {
const metadataBytes = await libsodium.decryptChaChaOneShot(
) =>
libsodium.decryptChaChaOneShot(
await libsodium.fromB64(encryptedMetadataB64),
await libsodium.fromB64(decryptionHeaderB64),
keyB64,
);
const textDecoder = new TextDecoder();
return textDecoder.decode(metadataBytes);
};
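// With the string handling moved out of these helpers, a caller dealing in
// JSON now does the encoding and decoding itself. An illustrative round
// trip (jsonValue and fileKeyB64 are placeholders, and this is assumed to
// run inside an async function):
const bytes = new TextEncoder().encode(JSON.stringify(jsonValue));
const { encryptedMetadataB64, decryptionHeaderB64 } =
    await encryptFileMetadata(bytes, fileKeyB64);
const decryptedBytes = await decryptFileMetadata(
    encryptedMetadataB64,
    decryptionHeaderB64,
    fileKeyB64,
);
const roundTripped = JSON.parse(new TextDecoder().decode(decryptedBytes));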

View File

@@ -143,6 +143,13 @@ export const openBlobCache = async (
* To convert from a Uint8Array/ArrayBuffer/Blob to a ReadableStream
*
* new Response(array).body
* OR
* new Blob([...]).stream()
*
* To convert from a stream to a string (or JSON)
*
* await new Response(stream).text()
* await new Response(stream).json()
*
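 * For example, a quick round trip through the above conversions:
 *
 *     const bytes = new TextEncoder().encode('{"x": 1}');
 *     const stream = new Blob([bytes]).stream();
 *     const json = await new Response(stream).json(); // { x: 1 }
 *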
* Refs:
* - https://github.com/yigitunallar/arraybuffer-vs-blob