Manav Rathi
2024-07-16 15:17:51 +05:30
parent f6715ad9c2
commit 0dc06c430a
2 changed files with 99 additions and 236 deletions

View File

@@ -15,6 +15,30 @@ import {
} from "types/entity";
import { getLatestVersionEntities } from "utils/entity";
/**
* The maximum number of items to fetch in a single diff request.
*
* [Note: Limit of returned items in /diff requests]
*
* The various GET /diff API methods, which tell the client what all has changed
* since a timestamp (provided by the client), take a limit parameter.
*
* These diff API calls return all items whose updated at is strictly greater
* than the timestamp we provide, so there is no mechanism for paginating items
* that have the exact same updated at.
*
* Conceptually, it may happen that there are more items than the limit we've
* provided, but there are practical safeguards.
*
* For file diff, the limit is advisory, and remote may return fewer, equal, or
* more items than the provided limit. The scenario where it returns more is
* when more files than the limit have the same updated at. Theoretically this
* would make the diff response unbounded, however in practice file
* modifications themselves are all batched. Even if the user were to select all
* the files in their library and update them all in one go in the UI, their
* client app is required to use batched API calls to make those updates, and
* each of those batches would get a distinct updated at.
*/
const DIFF_LIMIT = 500;
const ENTITY_TABLES: Record<EntityType, string> = {
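For illustration, here is a minimal sketch of the pull loop that such a limit implies; entityDiff, saveEntities, and the sync time helpers below are hypothetical stand-ins, not functions from this module:

const pullEntities = async (type: EntityType) => {
    // Hypothetical helper that returns the persisted sync time (0 if none).
    let sinceTime = await entitySyncTime(type);
    while (true) {
        const entities = await entityDiff(type, sinceTime, DIFF_LIMIT);
        // Since the limit is advisory and remote may return fewer or more
        // items than it, the only safe termination condition is an empty
        // batch, not a batch smaller than DIFF_LIMIT.
        if (entities.length == 0) break;
        await saveEntities(entities); // hypothetical helper
        sinceTime = entities.reduce((t, e) => Math.max(t, e.updatedAt), sinceTime);
        await saveEntitySyncTime(type, sinceTime); // hypothetical helper
    }
};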

View File

@@ -8,21 +8,11 @@ import {
encryptFileMetadata,
} from "@/new/shared/crypto/ente";
import { authenticatedRequestHeaders, ensureOk } from "@/next/http";
import { getKV, setKV } from "@/next/kv";
import log from "@/next/log";
import { apiURL } from "@/next/origins";
import { z } from "zod";
import {
clipIndexingVersion,
type CLIPIndex,
type RemoteCLIPIndex,
} from "./clip";
import { saveCLIPIndex, saveFaceIndex } from "./db";
import {
faceIndexingVersion,
type FaceIndex,
type RemoteFaceIndex,
} from "./face";
import { type RemoteCLIPIndex } from "./clip";
import { type RemoteFaceIndex } from "./face";
/**
* The embeddings that we (the current client) know how to handle.
@@ -112,6 +102,23 @@ const RemoteEmbedding = z.object({
type RemoteEmbedding = z.infer<typeof RemoteEmbedding>;
/**
* The decrypted payload of a {@link RemoteEmbedding} for the "combined"
* {@link EmbeddingModel}.
*
* [Note: Preserve unknown derived data fields]
*
* There is one entry for each of the embedding types that the current client
* knows about. However, there might be other fields apart from the known ones
* at the top level, and we need to ensure that we preserve them verbatim when
* trying to use {@link putDerivedData} with a {@link RemoteDerivedData} obtained
* from remote as the base, with locally indexed additions.
*/
export type RemoteDerivedData = Record<string, unknown> & {
face: RemoteFaceIndex;
clip: RemoteCLIPIndex;
};
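For example, when constructing the payload to upload, a client might spread the remote object first so that unknown top level fields survive the round trip (a sketch; existingRemoteDerivedData, faceIndex, and clipIndex are hypothetical local values):

const newDerivedData: RemoteDerivedData = {
    // Spread the copy obtained from remote (if any) first, so that fields
    // we don't know about get carried over verbatim.
    ...(existingRemoteDerivedData ?? {}),
    // Then overlay the entries that we (re)computed locally.
    face: faceIndex,
    clip: clipIndex,
};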
/**
* Fetch new or updated embeddings from remote and save them locally.
*
@@ -130,7 +137,7 @@ type RemoteEmbedding = z.infer<typeof RemoteEmbedding>;
*
* @returns true if at least one embedding was pulled, false otherwise.
*/
const pullEmbeddings = async (
export const getDerivedData = async (
model: EmbeddingModel,
save: (decryptedEmbedding: string) => Promise<void>,
) => {
@@ -197,57 +204,6 @@ const pullEmbeddings = async (
return didPull;
};
/**
* The updatedAt of the most recent {@link RemoteEmbedding} for {@link model}
* we've retrieved from remote.
*
* Returns 0 if there is no such embedding.
*
* This value is persisted to local storage. To update it, use
* {@link saveEmbeddingSyncTime}.
*/
const embeddingSyncTime = async (model: EmbeddingModel) =>
parseInt((await getKV("embeddingSyncTime:" + model)) ?? "0");
/** Sibling of {@link embeddingSyncTime}. */
const saveEmbeddingSyncTime = async (t: number, model: EmbeddingModel) =>
setKV("embeddingSyncTime:" + model, `${t}`);
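A hypothetical round trip illustrating the persistence format (the time is stored as a stringified integer in the KV store, keyed by the model):

// Persist a sync time for the "onnx-clip" model, then read it back.
await saveEmbeddingSyncTime(1721112000000000, "onnx-clip");
const t = await embeddingSyncTime("onnx-clip"); // => 1721112000000000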
/**
* The maximum number of items to fetch in a single GET /embeddings/diff request.
*
* [Note: Limit of returned items in /diff requests]
*
* The various GET /diff API methods, which tell the client what all has changed
* since a timestamp (provided by the client), take a limit parameter.
*
* These diff API calls return all items whose updated at is strictly greater
* than the timestamp we provide, so there is no mechanism for paginating items
* that have the exact same updated at. Conceptually, then, there may be more
* items than the limit we've provided.
*
* The behaviour of this limit is different for file diff and embeddings diff.
*
* - For file diff, the limit is advisory, and remote may return fewer, equal,
* or more items than the provided limit. The scenario where it returns more
* is when more files than the limit have the same updated at. Theoretically
* it would make the diff response unbounded, however in practice file
* modifications themselves are all batched. Even if the user selects all
* the files in their library and updates them all in one go in the UI,
* their client app must use batched API calls to make those updates, and
* each of those batches would get a distinct updated at.
*
* - For embeddings diff, there are no bulk updates and this limit is enforced
* as a maximum. While theoretically it is possible for an arbitrary number
* of files to have the same updated at, in practice it is not possible with
* the current set of APIs where clients PUT individual embeddings (the
* updated at is a server timestamp). And even if somehow a large number of
* files get the same updated at and thus get truncated in the response, it
* won't lead to any data loss; the client which requested that particular
* truncated diff will just regenerate them.
*/
const diffLimit = 500;
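In other words, here is a condensed sketch of the loop that pullEmbeddings above implements (decryption and parsing of individual embeddings elided); since this limit is enforced as a maximum, a batch smaller than diffLimit means we have caught up:

while (true) {
    const sinceTime = await embeddingSyncTime(model);
    const embeddings = await getEmbeddingsDiff(model, sinceTime);
    if (embeddings.length == 0) break;
    // ... decrypt and save each embedding here ...
    await saveEmbeddingSyncTime(
        embeddings.reduce((t, e) => Math.max(t, e.updatedAt), sinceTime),
        model,
    );
    if (embeddings.length < diffLimit) break;
}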
/**
* GET embeddings for the given model that have been updated since {@link sinceTime}.
*
@@ -282,6 +238,23 @@ const getEmbeddingsDiff = async (
.diff;
};
/**
* Update the combined derived data stored for the given {@link enteFile} on remote.
* This allows other clients to directly pull the derived data instead of
* needing to re-index.
*
* The data on remote will be replaced unconditionally, and it is up to the
* client (us) to ensure that we preserve the parts of the pre-existing derived
* data (if any) that we did not understand or touch.
*
* See: [Note: Preserve unknown derived data fields].
*/
export const putDerivedData = async (
enteFile: EnteFile,
derivedData: RemoteDerivedData,
) =>
putEmbedding(enteFile, "combined", await gzip(JSON.stringify(derivedData)));
/**
* Upload an embedding to remote.
*
@@ -316,117 +289,6 @@ const putEmbedding = async (
ensureOk(res);
};
/**
* Upload an embedding to remote.
*
* This function will save or update the given embedding as the latest embedding
* associated with the given {@link enteFile} for {@link model}.
*
* @param enteFile The {@link EnteFile} to which this embedding relates.
*
* @param model The {@link EmbeddingModel} which we are uploading.
*
* @param embedding String representation of the embedding. The exact contents
* of the embedding are model specific (usually this is the JSON string).
*/
const putEmbeddingString = async (
enteFile: EnteFile,
model: EmbeddingModel,
embedding: string,
) => putEmbedding(enteFile, model, new TextEncoder().encode(embedding));
// MARK: - Combined
/**
* The decrypted payload of a {@link RemoteEmbedding} for the "combined"
* {@link EmbeddingModel}.
*
* [Note: Preserve unknown derived data fields]
*
* There is one entry for each of the embedding types that the current client
* knows about. However, there might be other fields apart from the known ones
* at the top level, and we need to ensure that we preserve them verbatim when
* trying to use {@link putDerivedData} with a {@link RemoteDerivedData} obtained
* from remote as the base, with locally indexed additions.
*/
export type RemoteDerivedData = Record<string, unknown> & {
face: RemoteFaceIndex;
clip: RemoteCLIPIndex;
};
/**
* Update the combined derived data stored for the given {@link enteFile} on remote.
* This allows other clients to directly pull the derived data instead of
* needing to re-index.
*
* The data on remote will be replaced unconditionally, and it is up to the
* client (us) to ensure that we preserve the parts of the pre-existing derived
* data (if any) that we did not understand or touch.
*
* See: [Note: Preserve unknown derived data fields].
*/
export const putDerivedData = async (
enteFile: EnteFile,
derivedData: RemoteDerivedData,
) =>
putEmbedding(enteFile, "combined", await gzip(JSON.stringify(derivedData)));
/**
* Compress the given {@link string} using "gzip" and return the resultant
* bytes.
*
* This is syntactic sugar to deal with the string/blob/stream/bytes
* conversions, but it should not be taken as an abstraction layer. If your code
* can directly use a ReadableStream, then the data -> stream -> data round
* trip is unnecessary.
*/
const gzip = async (string: string) => {
const compressedStream = new Blob([string])
.stream()
// This code only runs on the desktop app currently, so we can rely on
// the existence of the CompressionStream API.
.pipeThrough(new CompressionStream("gzip"));
return new Uint8Array(await new Response(compressedStream).arrayBuffer());
};
// MARK: - Face
/**
* Fetch new or updated face embeddings from remote and save them locally.
*
* It takes no parameters since it saves the last sync time in local storage.
*
* This function should be called only after we have synced files with remote.
* See: [Note: Ignoring embeddings for unknown files].
*
* @returns true if at least one embedding was pulled, false otherwise.
*/
export const pullFaceEmbeddings = () =>
pullEmbeddings("file-ml-clip-face", (jsonString: string) =>
// eslint-disable-next-line @typescript-eslint/prefer-ts-expect-error, @typescript-eslint/ban-ts-comment
// @ts-ignore TODO: There is no error here, but this file is imported by
// one of our packages that doesn't have strict mode enabled yet,
// causing a spurious error to be emitted in that context.
saveFaceIndexIfNewer(FaceIndex.parse(JSON.parse(jsonString))),
);
/**
* Save the given {@link faceIndex} locally if it is newer than the one we have.
*
* This is a variant of {@link saveFaceIndex} that performs version checking as
* described in [Note: Handling versioning of embeddings].
*/
const saveFaceIndexIfNewer = async (index: FaceIndex) => {
const version = index.faceEmbedding.version;
if (version < faceIndexingVersion) {
log.info(
`Ignoring remote face index with version ${version} older than what our indexer can produce (${faceIndexingVersion})`,
);
return;
}
return saveFaceIndex(index);
};
/**
* Zod schemas for the {@link RemoteFaceIndex} type.
*
@@ -454,6 +316,8 @@ const saveFaceIndexIfNewer = async (index: FaceIndex) => {
const RemoteFaceIndex = z.object({
version: z.number(),
client: z.string(),
width: z.number(),
height: z.number(),
faces: z.array(
z.object({
faceID: z.string(),
@@ -479,70 +343,45 @@ const RemoteFaceIndex = z.object({
});
/**
* Save the face index for the given {@link enteFile} on remote so that other
* clients can directly pull it instead of needing to reindex.
*/
export const putFaceIndex = async (enteFile: EnteFile, faceIndex: FaceIndex) =>
putEmbeddingString(
enteFile,
"file-ml-clip-face",
JSON.stringify(faceIndex),
);
// MARK: - CLIP
/**
* Fetch new or updated CLIP embeddings from remote and save them locally.
*
* See {@link pullFaceEmbeddings} for a sibling function with more comprehensive
* documentation.
*
* @returns true if at least one embedding was pulled, false otherwise.
*/
export const pullCLIPEmbeddings = () =>
pullEmbeddings("onnx-clip", (jsonString: string) =>
// eslint-disable-next-line @typescript-eslint/prefer-ts-expect-error, @typescript-eslint/ban-ts-comment
// @ts-ignore TODO: There is no error here, but this file is imported by
// one of our packages that doesn't have strict mode enabled yet,
// causing a spurious error to be emitted in that context.
saveCLIPIndexIfNewer(CLIPIndex.parse(JSON.parse(jsonString))),
);
/**
* Save the given {@link clipIndex} locally if it is newer than the one we have.
*
* This is a variant of {@link saveCLIPIndex} that performs version checking as
* described in [Note: Handling versioning of embeddings].
*/
const saveCLIPIndexIfNewer = async (index: CLIPIndex) => {
const version = index.version;
if (version < clipIndexingVersion) {
log.info(
`Ignoring remote CLIP index with version ${version} older than what our indexer can produce (${clipIndexingVersion})`,
);
return;
}
return saveCLIPIndex(index);
};
/**
* Zod schemas for the {@link CLIPIndex} types.
* Zod schemas for the {@link RemoteCLIPIndex} types.
*
* See: [Note: Duplicated Zod schema and TypeScript type]
*/
const CLIPIndex = z
.object({
fileID: z.number(),
version: z.number(),
client: z.string(),
embedding: z.array(z.number()),
})
// Retain fields we might not (currently) understand.
.passthrough();
const RemoteCLIPIndex = z.object({
version: z.number(),
client: z.string(),
embedding: z.array(z.number()),
});
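With both schemas in hand, a decrypted "combined" payload could be validated along these lines (a sketch, not necessarily the parsing code used elsewhere in this module; the loose z.record pass is what keeps the unknown top level fields):

const parseRemoteDerivedData = (jsonString: string): RemoteDerivedData => {
    // First parse the full object loosely so that unknown top level fields
    // survive...
    const data = z.record(z.unknown()).parse(JSON.parse(jsonString));
    // ...then strictly validate the parts that we know how to interpret.
    return {
        ...data,
        face: RemoteFaceIndex.parse(data.face),
        clip: RemoteCLIPIndex.parse(data.clip),
    };
};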
// MARK: - GZIP
/**
* Save the CLIP index for the given {@link enteFile} on remote so that other
* clients can directly pull it instead of needing to reindex.
* Compress the given {@link string} using "gzip" and return the resultant
* bytes. See {@link gunzip} for the reverse operation.
*
* This is syntactic sugar to deal with the string/blob/stream/bytes
* conversions, but it should not be taken as an abstraction layer. If your code
* can directly use a ReadableStream, then the data -> stream -> data round
* trip is unnecessary.
*/
export const putCLIPIndex = async (enteFile: EnteFile, clipIndex: CLIPIndex) =>
putEmbeddingString(enteFile, "onnx-clip", JSON.stringify(clipIndex));
const gzip = async (string: string) => {
const compressedStream = new Blob([string])
.stream()
// This code only runs on the desktop app currently, so we can rely on
// the existence of newer web features like the CompressionStream API.
.pipeThrough(new CompressionStream("gzip"));
return new Uint8Array(await new Response(compressedStream).arrayBuffer());
};
/**
* Decompress the given "gzip" compressed {@link data} and return the resultant
* string. See {@link gzip} for the reverse operation.
*/
const gunzip = async (data: Uint8Array) => {
const decompressedStream = new Blob([data])
.stream()
// This code only runs on the desktop app currently, so we can rely on
// the existence of newer web features like the DecompressionStream API.
.pipeThrough(new DecompressionStream("gzip"));
return new Response(decompressedStream).text();
};
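A quick hypothetical sanity check of the pair:

// Compressing and then decompressing should be the identity on strings.
const original = "hello, world";
console.assert((await gunzip(await gzip(original))) === original);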