[desktop] Allow live indexing during uploads (#2361)
Saves on an unnecessary download. With this final piece in place, the face indexing part is feature complete.
This commit is contained in:
@@ -149,7 +149,7 @@ export const WatchFolder: React.FC<WatchFolderProps> = ({ open, onClose }) => {
|
||||
};
|
||||
|
||||
const Title_ = styled("div")`
|
||||
padding: 32px 16px 16px 24px;
|
||||
padding: 16px 12px 16px 16px;
|
||||
`;
|
||||
|
||||
interface WatchList {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { FILE_TYPE } from "@/media/file-type";
|
||||
import { potentialFileTypeFromExtension } from "@/media/live-photo";
|
||||
import { getLocalFiles } from "@/new/photos/services/files";
|
||||
import { onUpload as onUploadML } from "@/new/photos/services/ml";
|
||||
import { indexNewUpload } from "@/new/photos/services/ml";
|
||||
import type { UploadItem } from "@/new/photos/services/upload/types";
|
||||
import {
|
||||
RANDOM_PERCENTAGE_PROGRESS_FOR_PUT,
|
||||
@@ -614,11 +614,11 @@ class UploadManager {
|
||||
UPLOAD_RESULT.UPLOADED_WITH_STATIC_THUMBNAIL,
|
||||
].includes(uploadResult)
|
||||
) {
|
||||
const uploadItem =
|
||||
uploadableItem.uploadItem ??
|
||||
uploadableItem.livePhotoAssets.image;
|
||||
try {
|
||||
let file: File | undefined;
|
||||
const uploadItem =
|
||||
uploadableItem.uploadItem ??
|
||||
uploadableItem.livePhotoAssets.image;
|
||||
if (uploadItem) {
|
||||
if (uploadItem instanceof File) {
|
||||
file = uploadItem;
|
||||
@@ -635,10 +635,17 @@ class UploadManager {
|
||||
enteFile: decryptedFile,
|
||||
localFile: file,
|
||||
});
|
||||
onUploadML(decryptedFile, file);
|
||||
} catch (e) {
|
||||
log.warn("Ignoring error in fileUploaded handlers", e);
|
||||
}
|
||||
if (
|
||||
uploadItem &&
|
||||
(uploadResult == UPLOAD_RESULT.UPLOADED ||
|
||||
uploadResult ==
|
||||
UPLOAD_RESULT.UPLOADED_WITH_STATIC_THUMBNAIL)
|
||||
) {
|
||||
indexNewUpload(decryptedFile, uploadItem);
|
||||
}
|
||||
this.updateExistingFiles(decryptedFile);
|
||||
}
|
||||
await this.watchFolderCallback(
|
||||
|
||||
@@ -9,7 +9,7 @@ import type {
|
||||
LivePhotoSourceURL,
|
||||
SourceURLs,
|
||||
} from "@/new/photos/types/file";
|
||||
import { getRenderableImage } from "@/new/photos/utils/file";
|
||||
import { renderableImageBlob } from "@/new/photos/utils/file";
|
||||
import { isDesktop } from "@/next/app";
|
||||
import { blobCache, type BlobCache } from "@/next/blob-cache";
|
||||
import log from "@/next/log";
|
||||
@@ -458,7 +458,7 @@ async function getRenderableFileURL(
|
||||
|
||||
switch (file.metadata.fileType) {
|
||||
case FILE_TYPE.IMAGE: {
|
||||
const convertedBlob = await getRenderableImage(
|
||||
const convertedBlob = await renderableImageBlob(
|
||||
file.metadata.title,
|
||||
fileBlob,
|
||||
);
|
||||
@@ -511,7 +511,7 @@ async function getRenderableLivePhotoURL(
|
||||
const getRenderableLivePhotoImageURL = async () => {
|
||||
try {
|
||||
const imageBlob = new Blob([livePhoto.imageData]);
|
||||
const convertedImageBlob = await getRenderableImage(
|
||||
const convertedImageBlob = await renderableImageBlob(
|
||||
livePhoto.imageFileName,
|
||||
imageBlob,
|
||||
);
|
||||
|
||||
107
web/packages/new/photos/services/ml/bitmap.ts
Normal file
107
web/packages/new/photos/services/ml/bitmap.ts
Normal file
@@ -0,0 +1,107 @@
|
||||
import { FILE_TYPE } from "@/media/file-type";
|
||||
import { decodeLivePhoto } from "@/media/live-photo";
|
||||
import { basename } from "@/next/file";
|
||||
import { ensure } from "@/utils/ensure";
|
||||
import type { EnteFile } from "../../types/file";
|
||||
import { renderableImageBlob } from "../../utils/file";
|
||||
import { readStream } from "../../utils/native-stream";
|
||||
import DownloadManager from "../download";
|
||||
import type { UploadItem } from "../upload/types";
|
||||
import type { MLWorkerElectron } from "./worker-electron";
|
||||
|
||||
/**
|
||||
* Return a {@link ImageBitmap} that downloads the source image corresponding to
|
||||
* {@link enteFile} from remote.
|
||||
*
|
||||
* - For images the original is used.
|
||||
* - For live photos the original image component is used.
|
||||
* - For videos the thumbnail is used.
|
||||
*/
|
||||
export const renderableImageBitmap = async (enteFile: EnteFile) => {
|
||||
const fileType = enteFile.metadata.fileType;
|
||||
let blob: Blob | undefined;
|
||||
if (fileType == FILE_TYPE.VIDEO) {
|
||||
const thumbnailData = await DownloadManager.getThumbnail(enteFile);
|
||||
blob = new Blob([ensure(thumbnailData)]);
|
||||
} else {
|
||||
blob = await fetchRenderableBlob(enteFile);
|
||||
}
|
||||
return createImageBitmap(ensure(blob));
|
||||
};
|
||||
|
||||
/**
|
||||
* Variant of {@link renderableImageBitmap} that uses the given
|
||||
* {@link uploadItem} to construct the image bitmap instead of downloading the
|
||||
* original from remote.
|
||||
*
|
||||
* For videos the thumbnail is still downloaded from remote.
|
||||
*/
|
||||
export const renderableUploadItemImageBitmap = async (
|
||||
enteFile: EnteFile,
|
||||
uploadItem: UploadItem,
|
||||
electron: MLWorkerElectron,
|
||||
) => {
|
||||
const fileType = enteFile.metadata.fileType;
|
||||
let blob: Blob | undefined;
|
||||
if (fileType == FILE_TYPE.VIDEO) {
|
||||
const thumbnailData = await DownloadManager.getThumbnail(enteFile);
|
||||
blob = new Blob([ensure(thumbnailData)]);
|
||||
} else {
|
||||
const file = await readNonVideoUploadItem(uploadItem, electron);
|
||||
blob = await renderableImageBlob(enteFile.metadata.title, file);
|
||||
}
|
||||
return createImageBitmap(ensure(blob));
|
||||
};
|
||||
|
||||
/**
|
||||
* Read the given {@link uploadItem} into an in-memory representation.
|
||||
*
|
||||
* See: [Note: Reading a UploadItem]
|
||||
*
|
||||
* @param uploadItem An {@link UploadItem} which we are trying to index. The
|
||||
* code calling us guarantees that this function will not be called for videos.
|
||||
*
|
||||
* @returns a web {@link File} that can be used to access the upload item's
|
||||
* contents.
|
||||
*/
|
||||
const readNonVideoUploadItem = async (
|
||||
uploadItem: UploadItem,
|
||||
electron: MLWorkerElectron,
|
||||
): Promise<File> => {
|
||||
if (typeof uploadItem == "string" || Array.isArray(uploadItem)) {
|
||||
const { response, lastModifiedMs } = await readStream(
|
||||
electron,
|
||||
uploadItem,
|
||||
);
|
||||
const path = typeof uploadItem == "string" ? uploadItem : uploadItem[1];
|
||||
// This function will not be called for videos, and for images
|
||||
// it is reasonable to read the entire stream into memory here.
|
||||
return new File([await response.arrayBuffer()], basename(path), {
|
||||
lastModified: lastModifiedMs,
|
||||
});
|
||||
} else {
|
||||
if (uploadItem instanceof File) {
|
||||
return uploadItem;
|
||||
} else {
|
||||
return uploadItem.file;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const fetchRenderableBlob = async (enteFile: EnteFile) => {
|
||||
const fileStream = await DownloadManager.getFile(enteFile);
|
||||
const fileBlob = await new Response(fileStream).blob();
|
||||
const fileType = enteFile.metadata.fileType;
|
||||
if (fileType == FILE_TYPE.IMAGE) {
|
||||
return renderableImageBlob(enteFile.metadata.title, fileBlob);
|
||||
} else if (fileType == FILE_TYPE.LIVE_PHOTO) {
|
||||
const { imageFileName, imageData } = await decodeLivePhoto(
|
||||
enteFile.metadata.title,
|
||||
fileBlob,
|
||||
);
|
||||
return renderableImageBlob(imageFileName, new Blob([imageData]));
|
||||
} else {
|
||||
// A layer above us should've already filtered these out.
|
||||
throw new Error(`Cannot index unsupported file type ${fileType}`);
|
||||
}
|
||||
};
|
||||
@@ -1,47 +0,0 @@
|
||||
import { FILE_TYPE } from "@/media/file-type";
|
||||
import { decodeLivePhoto } from "@/media/live-photo";
|
||||
import { ensure } from "@/utils/ensure";
|
||||
import type { EnteFile } from "../../types/file";
|
||||
import { getRenderableImage } from "../../utils/file";
|
||||
import DownloadManager from "../download";
|
||||
|
||||
/**
|
||||
* Return a "renderable" image blob, using {@link file} if present otherwise
|
||||
* downloading the source image corresponding to {@link enteFile} from remote.
|
||||
*
|
||||
* For videos their thumbnail is used.
|
||||
*/
|
||||
export const renderableImageBlob = async (
|
||||
enteFile: EnteFile,
|
||||
file?: File | undefined,
|
||||
) => {
|
||||
const fileType = enteFile.metadata.fileType;
|
||||
if (fileType == FILE_TYPE.VIDEO) {
|
||||
const thumbnailData = await DownloadManager.getThumbnail(enteFile);
|
||||
return new Blob([ensure(thumbnailData)]);
|
||||
} else {
|
||||
return ensure(
|
||||
file
|
||||
? await getRenderableImage(enteFile.metadata.title, file)
|
||||
: await fetchRenderableBlob(enteFile),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
const fetchRenderableBlob = async (enteFile: EnteFile) => {
|
||||
const fileStream = await DownloadManager.getFile(enteFile);
|
||||
const fileBlob = await new Response(fileStream).blob();
|
||||
const fileType = enteFile.metadata.fileType;
|
||||
if (fileType == FILE_TYPE.IMAGE) {
|
||||
return getRenderableImage(enteFile.metadata.title, fileBlob);
|
||||
} else if (fileType == FILE_TYPE.LIVE_PHOTO) {
|
||||
const { imageFileName, imageData } = await decodeLivePhoto(
|
||||
enteFile.metadata.title,
|
||||
fileBlob,
|
||||
);
|
||||
return getRenderableImage(imageFileName, new Blob([imageData]));
|
||||
} else {
|
||||
// A layer above us should've already filtered these out.
|
||||
throw new Error(`Cannot index unsupported file type ${fileType}`);
|
||||
}
|
||||
};
|
||||
@@ -1,7 +1,7 @@
|
||||
import { blobCache } from "@/next/blob-cache";
|
||||
import { ensure } from "@/utils/ensure";
|
||||
import type { EnteFile } from "../../types/file";
|
||||
import { renderableImageBlob } from "./blob";
|
||||
import { renderableImageBitmap } from "./bitmap";
|
||||
import { type Box, type FaceIndex } from "./face";
|
||||
import { clamp } from "./image";
|
||||
|
||||
@@ -26,8 +26,7 @@ export const regenerateFaceCrops = async (
|
||||
enteFile: EnteFile,
|
||||
faceIndex: FaceIndex,
|
||||
) => {
|
||||
const imageBitmap =
|
||||
await renderableImageBlob(enteFile).then(createImageBitmap);
|
||||
const imageBitmap = await renderableImageBitmap(enteFile);
|
||||
|
||||
try {
|
||||
await saveFaceCrops(imageBitmap, faceIndex);
|
||||
|
||||
@@ -9,7 +9,6 @@
|
||||
|
||||
import type { EnteFile } from "@/new/photos/types/file";
|
||||
import log from "@/next/log";
|
||||
import { workerBridge } from "@/next/worker/worker-bridge";
|
||||
import { ensure } from "@/utils/ensure";
|
||||
import { Matrix } from "ml-matrix";
|
||||
import { getSimilarityTransformation } from "similarity-transformation";
|
||||
@@ -20,7 +19,11 @@ import {
|
||||
translate,
|
||||
type Matrix as TransformationMatrix,
|
||||
} from "transformation-matrix";
|
||||
import { renderableImageBlob } from "./blob";
|
||||
import type { UploadItem } from "../upload/types";
|
||||
import {
|
||||
renderableImageBitmap,
|
||||
renderableUploadItemImageBitmap,
|
||||
} from "./bitmap";
|
||||
import { saveFaceCrops } from "./crop";
|
||||
import {
|
||||
clamp,
|
||||
@@ -28,6 +31,7 @@ import {
|
||||
pixelRGBBilinear,
|
||||
warpAffineFloat32List,
|
||||
} from "./image";
|
||||
import type { MLWorkerElectron } from "./worker-electron";
|
||||
|
||||
/**
|
||||
* The version of the face indexing pipeline implemented by the current client.
|
||||
@@ -212,20 +216,24 @@ export interface Box {
|
||||
*
|
||||
* @param enteFile The {@link EnteFile} to index.
|
||||
*
|
||||
* @param file The contents of {@link enteFile} as a web {@link File}, if
|
||||
* available. These are used when they are provided, otherwise the file is
|
||||
* downloaded and decrypted from remote.
|
||||
* @param uploadItem If we're called during the upload process, then this will
|
||||
* be set to the {@link UploadItem} that was uploaded. This way, we can directly
|
||||
* use the on-disk file instead of needing to download the original from remote.
|
||||
*
|
||||
* @param electron The {@link MLWorkerElectron} instance that allows us to call
|
||||
* our Node.js layer for various functionality.
|
||||
*
|
||||
* @param userAgent The UA of the client that is doing the indexing (us).
|
||||
*/
|
||||
export const indexFaces = async (
|
||||
enteFile: EnteFile,
|
||||
file: File | undefined,
|
||||
uploadItem: UploadItem | undefined,
|
||||
electron: MLWorkerElectron,
|
||||
userAgent: string,
|
||||
) => {
|
||||
const imageBitmap = await renderableImageBlob(enteFile, file).then(
|
||||
createImageBitmap,
|
||||
);
|
||||
): Promise<FaceIndex> => {
|
||||
const imageBitmap = uploadItem
|
||||
? await renderableUploadItemImageBitmap(enteFile, uploadItem, electron)
|
||||
: await renderableImageBitmap(enteFile);
|
||||
const { width, height } = imageBitmap;
|
||||
const fileID = enteFile.id;
|
||||
|
||||
@@ -237,7 +245,7 @@ export const indexFaces = async (
|
||||
faceEmbedding: {
|
||||
version: faceIndexingVersion,
|
||||
client: userAgent,
|
||||
faces: await indexFacesInBitmap(fileID, imageBitmap),
|
||||
faces: await indexFacesInBitmap(fileID, imageBitmap, electron),
|
||||
},
|
||||
};
|
||||
// This step, saving face crops, is not part of the indexing pipeline;
|
||||
@@ -258,11 +266,12 @@ export const indexFaces = async (
|
||||
const indexFacesInBitmap = async (
|
||||
fileID: number,
|
||||
imageBitmap: ImageBitmap,
|
||||
electron: MLWorkerElectron,
|
||||
): Promise<Face[]> => {
|
||||
const { width, height } = imageBitmap;
|
||||
const imageDimensions = { width, height };
|
||||
|
||||
const yoloFaceDetections = await detectFaces(imageBitmap);
|
||||
const yoloFaceDetections = await detectFaces(imageBitmap, electron);
|
||||
const partialResult = yoloFaceDetections.map(
|
||||
({ box, landmarks, score }) => {
|
||||
const faceID = makeFaceID(fileID, box, imageDimensions);
|
||||
@@ -271,19 +280,16 @@ const indexFacesInBitmap = async (
|
||||
},
|
||||
);
|
||||
|
||||
const alignments: FaceAlignment[] = [];
|
||||
|
||||
for (const { detection } of partialResult) {
|
||||
const alignment = computeFaceAlignment(detection);
|
||||
alignments.push(alignment);
|
||||
}
|
||||
const alignments = partialResult.map(({ detection }) =>
|
||||
computeFaceAlignment(detection),
|
||||
);
|
||||
|
||||
const alignedFacesData = convertToMobileFaceNetInput(
|
||||
imageBitmap,
|
||||
alignments,
|
||||
);
|
||||
|
||||
const embeddings = await computeEmbeddings(alignedFacesData);
|
||||
const embeddings = await computeEmbeddings(alignedFacesData, electron);
|
||||
const blurs = detectBlur(
|
||||
alignedFacesData,
|
||||
partialResult.map((f) => f.detection),
|
||||
@@ -305,6 +311,7 @@ const indexFacesInBitmap = async (
|
||||
*/
|
||||
const detectFaces = async (
|
||||
imageBitmap: ImageBitmap,
|
||||
electron: MLWorkerElectron,
|
||||
): Promise<YOLOFaceDetection[]> => {
|
||||
const rect = ({ width, height }: Dimensions) => ({
|
||||
x: 0,
|
||||
@@ -315,7 +322,7 @@ const detectFaces = async (
|
||||
|
||||
const { yoloInput, yoloSize } =
|
||||
convertToYOLOInputFloat32ChannelsFirst(imageBitmap);
|
||||
const yoloOutput = await workerBridge.detectFaces(yoloInput);
|
||||
const yoloOutput = await electron.detectFaces(yoloInput);
|
||||
const faces = filterExtractDetectionsFromYOLOOutput(yoloOutput);
|
||||
const faceDetections = transformYOLOFaceDetections(
|
||||
faces,
|
||||
@@ -873,8 +880,9 @@ const mobileFaceNetEmbeddingSize = 192;
|
||||
*/
|
||||
const computeEmbeddings = async (
|
||||
faceData: Float32Array,
|
||||
electron: MLWorkerElectron,
|
||||
): Promise<Float32Array[]> => {
|
||||
const outputData = await workerBridge.computeFaceEmbeddings(faceData);
|
||||
const outputData = await electron.computeFaceEmbeddings(faceData);
|
||||
|
||||
const embeddingSize = mobileFaceNetEmbeddingSize;
|
||||
const embeddings = new Array<Float32Array>(
|
||||
|
||||
@@ -8,11 +8,13 @@ import {
|
||||
isInternalUser,
|
||||
} from "@/new/photos/services/feature-flags";
|
||||
import type { EnteFile } from "@/new/photos/types/file";
|
||||
import { clientPackageName, isDesktop } from "@/next/app";
|
||||
import { isDesktop } from "@/next/app";
|
||||
import { blobCache } from "@/next/blob-cache";
|
||||
import { ensureElectron } from "@/next/electron";
|
||||
import log from "@/next/log";
|
||||
import { ComlinkWorker } from "@/next/worker/comlink-worker";
|
||||
import { proxy } from "comlink";
|
||||
import type { UploadItem } from "../upload/types";
|
||||
import { regenerateFaceCrops } from "./crop";
|
||||
import { clearFaceDB, faceIndex, indexableAndIndexedCounts } from "./db";
|
||||
import { MLWorker } from "./worker";
|
||||
@@ -40,18 +42,21 @@ const worker = async () => {
|
||||
};
|
||||
|
||||
const createComlinkWorker = async () => {
|
||||
const electron = ensureElectron();
|
||||
const mlWorkerElectron = {
|
||||
appVersion: electron.appVersion,
|
||||
detectFaces: electron.detectFaces,
|
||||
computeFaceEmbeddings: electron.computeFaceEmbeddings,
|
||||
};
|
||||
|
||||
const cw = new ComlinkWorker<typeof MLWorker>(
|
||||
"ml",
|
||||
"ML",
|
||||
new Worker(new URL("worker.ts", import.meta.url)),
|
||||
);
|
||||
const ua = await getUserAgent();
|
||||
await cw.remote.then((w) => w.init(ua));
|
||||
await cw.remote.then((w) => w.init(proxy(mlWorkerElectron)));
|
||||
return cw;
|
||||
};
|
||||
|
||||
const getUserAgent = async () =>
|
||||
`${clientPackageName}/${await ensureElectron().appVersion()}`;
|
||||
|
||||
/**
|
||||
* Terminate {@link worker} (if any).
|
||||
*
|
||||
@@ -161,20 +166,25 @@ export const triggerMLSync = () => {
|
||||
};
|
||||
|
||||
/**
|
||||
* Called by the uploader when it uploads a new file from this client.
|
||||
* Run indexing on a file which was uploaded from this client.
|
||||
*
|
||||
* This function is called by the uploader when it uploads a new file from this
|
||||
* client, giving us the opportunity to index it live. This is only an
|
||||
* optimization - if we don't index it now it'll anyways get indexed later as
|
||||
* part of the batch jobs, but that might require downloading the file's
|
||||
* contents again.
|
||||
*
|
||||
* @param enteFile The {@link EnteFile} that got uploaded.
|
||||
*
|
||||
* @param file When available, the web {@link File} object representing the
|
||||
* contents of the file that got uploaded.
|
||||
* @param uploadItem The item that was uploaded. This can be used to get at the
|
||||
* contents of the file that got uploaded. In case of live photos, this is the
|
||||
* image part of the live photo that was uploaded.
|
||||
*/
|
||||
export const onUpload = (enteFile: EnteFile, file: File | undefined) => {
|
||||
export const indexNewUpload = (enteFile: EnteFile, uploadItem: UploadItem) => {
|
||||
if (!_isMLEnabled) return;
|
||||
if (enteFile.metadata.fileType !== FILE_TYPE.IMAGE) return;
|
||||
log.debug(() => ({ t: "ml-liveq", enteFile, file }));
|
||||
// TODO-ML: 1. Use this file!
|
||||
// TODO-ML: 2. Handle cases when File is something else (e.g. on desktop).
|
||||
void worker().then((w) => w.onUpload(enteFile));
|
||||
log.debug(() => ({ t: "ml/liveq", enteFile, uploadItem }));
|
||||
void worker().then((w) => w.onUpload(enteFile, uploadItem));
|
||||
};
|
||||
|
||||
export interface FaceIndexingStatus {
|
||||
|
||||
13
web/packages/new/photos/services/ml/worker-electron.ts
Normal file
13
web/packages/new/photos/services/ml/worker-electron.ts
Normal file
@@ -0,0 +1,13 @@
|
||||
/**
|
||||
* A subset of {@link Electron} provided to the {@link MLWorker}.
|
||||
*
|
||||
* `globalThis.electron` does not exist in the execution context of web workers.
|
||||
* So instead, we manually provide a proxy object of type
|
||||
* {@link MLWorkerElectron} that exposes a subset of the functions from
|
||||
* {@link Electron} that are needed by the code running in the ML web worker.
|
||||
*/
|
||||
export interface MLWorkerElectron {
|
||||
appVersion: () => Promise<string>;
|
||||
detectFaces: (input: Float32Array) => Promise<Float32Array>;
|
||||
computeFaceEmbeddings: (input: Float32Array) => Promise<Float32Array>;
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
import type { EnteFile } from "@/new/photos/types/file";
|
||||
import { fileLogID } from "@/new/photos/utils/file";
|
||||
import { clientPackageName } from "@/next/app";
|
||||
import { getKVN } from "@/next/kv";
|
||||
import { ensureAuthToken } from "@/next/local-user";
|
||||
import log from "@/next/log";
|
||||
@@ -8,6 +9,7 @@ import { wait } from "@/utils/promise";
|
||||
import { expose } from "comlink";
|
||||
import downloadManager from "../download";
|
||||
import { getAllLocalFiles, getLocalTrashedFiles } from "../files";
|
||||
import type { UploadItem } from "../upload/types";
|
||||
import {
|
||||
indexableFileIDs,
|
||||
markIndexingFailed,
|
||||
@@ -16,10 +18,17 @@ import {
|
||||
} from "./db";
|
||||
import { pullFaceEmbeddings, putFaceIndex } from "./embedding";
|
||||
import { type FaceIndex, indexFaces } from "./face";
|
||||
import type { MLWorkerElectron } from "./worker-electron";
|
||||
|
||||
const idleDurationStart = 5; /* 5 seconds */
|
||||
const idleDurationMax = 16 * 60; /* 16 minutes */
|
||||
|
||||
/** An entry in the liveQ maintained by the worker */
|
||||
interface LiveQItem {
|
||||
enteFile: EnteFile;
|
||||
uploadItem: UploadItem;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run operations related to machine learning (e.g. indexing) in a Web Worker.
|
||||
*
|
||||
@@ -43,9 +52,10 @@ const idleDurationMax = 16 * 60; /* 16 minutes */
|
||||
* - "idle": in between state transitions
|
||||
*/
|
||||
export class MLWorker {
|
||||
private electron: MLWorkerElectron | undefined;
|
||||
private userAgent: string | undefined;
|
||||
private shouldSync = false;
|
||||
private liveQ: EnteFile[] = [];
|
||||
private liveQ: LiveQItem[] = [];
|
||||
private state: "idle" | "pull" | "indexing" = "idle";
|
||||
private idleTimeout: ReturnType<typeof setTimeout> | undefined;
|
||||
private idleDuration = idleDurationStart; /* unit: seconds */
|
||||
@@ -56,11 +66,14 @@ export class MLWorker {
|
||||
* This is conceptually the constructor, however it is easier to have this
|
||||
* as a separate function to avoid confounding the comlink types too much.
|
||||
*
|
||||
* @param userAgent The user agent string to use as the client field in the
|
||||
* embeddings generated during indexing by this client.
|
||||
* @param electron The {@link MLWorkerElectron} that allows the worker to
|
||||
* use the functionality provided by our Node.js layer when running in the
|
||||
* context of our desktop app
|
||||
*/
|
||||
async init(userAgent: string) {
|
||||
this.userAgent = userAgent;
|
||||
async init(electron: MLWorkerElectron) {
|
||||
this.electron = electron;
|
||||
// Set the user agent that'll be set in the generated embeddings.
|
||||
this.userAgent = `${clientPackageName}/${await electron.appVersion()}`;
|
||||
// Initialize the downloadManager running in the web worker with the
|
||||
// user's token. It'll be used to download files to index if needed.
|
||||
await downloadManager.init(await ensureAuthToken());
|
||||
@@ -101,7 +114,7 @@ export class MLWorker {
|
||||
* representation of the file's contents with us and won't need to download
|
||||
* the file from remote.
|
||||
*/
|
||||
onUpload(file: EnteFile) {
|
||||
onUpload(enteFile: EnteFile, uploadItem: UploadItem) {
|
||||
// Add the recently uploaded file to the live indexing queue.
|
||||
//
|
||||
// Limit the queue to some maximum so that we don't keep growing
|
||||
@@ -112,11 +125,11 @@ export class MLWorker {
|
||||
// long as we're not systematically ignoring it). This is because the
|
||||
// live queue is just an optimization: if a file doesn't get indexed via
|
||||
// the live queue, it'll later get indexed anyway when we backfill.
|
||||
if (this.liveQ.length < 50) {
|
||||
this.liveQ.push(file);
|
||||
if (this.liveQ.length < 200) {
|
||||
this.liveQ.push({ enteFile, uploadItem });
|
||||
this.wakeUp();
|
||||
} else {
|
||||
log.debug(() => "Ignoring live item since liveQ is full");
|
||||
log.debug(() => "Ignoring upload item since liveQ is full");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -129,7 +142,7 @@ export class MLWorker {
|
||||
|
||||
private async tick() {
|
||||
log.debug(() => ({
|
||||
t: "ml-tick",
|
||||
t: "ml/tick",
|
||||
state: this.state,
|
||||
shouldSync: this.shouldSync,
|
||||
liveQ: this.liveQ,
|
||||
@@ -155,7 +168,11 @@ export class MLWorker {
|
||||
const liveQ = this.liveQ;
|
||||
this.liveQ = [];
|
||||
this.state = "indexing";
|
||||
const allSuccess = await indexNextBatch(ensure(this.userAgent), liveQ);
|
||||
const allSuccess = await indexNextBatch(
|
||||
liveQ,
|
||||
ensure(this.electron),
|
||||
ensure(this.userAgent),
|
||||
);
|
||||
if (allSuccess) {
|
||||
// Everything is running smoothly. Reset the idle duration.
|
||||
this.idleDuration = idleDurationStart;
|
||||
@@ -196,7 +213,11 @@ const pull = pullFaceEmbeddings;
|
||||
* Which means that when it returns true, all is well and there are more
|
||||
* things pending to process, so we should chug along at full speed.
|
||||
*/
|
||||
const indexNextBatch = async (userAgent: string, liveQ: EnteFile[]) => {
|
||||
const indexNextBatch = async (
|
||||
liveQ: LiveQItem[],
|
||||
electron: MLWorkerElectron,
|
||||
userAgent: string,
|
||||
) => {
|
||||
if (!self.navigator.onLine) {
|
||||
log.info("Skipping ML indexing since we are not online");
|
||||
return false;
|
||||
@@ -204,16 +225,23 @@ const indexNextBatch = async (userAgent: string, liveQ: EnteFile[]) => {
|
||||
|
||||
const userID = ensure(await getKVN("userID"));
|
||||
|
||||
const files =
|
||||
// Use the liveQ if present, otherwise get the next batch to backfill.
|
||||
const items =
|
||||
liveQ.length > 0
|
||||
? liveQ
|
||||
: await syncWithLocalFilesAndGetFilesToIndex(userID, 200);
|
||||
if (files.length == 0) return false;
|
||||
: await syncWithLocalFilesAndGetFilesToIndex(userID, 200).then(
|
||||
(fs) =>
|
||||
fs.map((f) => ({ enteFile: f, uploadItem: undefined })),
|
||||
);
|
||||
|
||||
// Nothing to do.
|
||||
if (items.length == 0) return false;
|
||||
|
||||
// Index, keeping track if any of the items failed.
|
||||
let allSuccess = true;
|
||||
for (const file of files) {
|
||||
for (const { enteFile, uploadItem } of items) {
|
||||
try {
|
||||
await index(file, undefined, userAgent);
|
||||
await index(enteFile, uploadItem, electron, userAgent);
|
||||
// Possibly unnecessary, but let us drain the microtask queue.
|
||||
await wait(0);
|
||||
} catch {
|
||||
@@ -221,6 +249,7 @@ const indexNextBatch = async (userAgent: string, liveQ: EnteFile[]) => {
|
||||
}
|
||||
}
|
||||
|
||||
// Return true if nothing failed.
|
||||
return allSuccess;
|
||||
};
|
||||
|
||||
@@ -263,7 +292,7 @@ const syncWithLocalFilesAndGetFilesToIndex = async (
|
||||
*
|
||||
* @param enteFile The {@link EnteFile} to index.
|
||||
*
|
||||
* @param file If the file is one which is being uploaded from the current
|
||||
* @param uploadItem If the file is one which is being uploaded from the current
|
||||
* client, then we will also have access to the file's content. In such
|
||||
* cases, pass a web {@link File} object to use that its data directly for
|
||||
* face indexing. If this is not provided, then the file's contents will be
|
||||
@@ -273,7 +302,8 @@ const syncWithLocalFilesAndGetFilesToIndex = async (
|
||||
*/
|
||||
export const index = async (
|
||||
enteFile: EnteFile,
|
||||
file: File | undefined,
|
||||
uploadItem: UploadItem | undefined,
|
||||
electron: MLWorkerElectron,
|
||||
userAgent: string,
|
||||
) => {
|
||||
const f = fileLogID(enteFile);
|
||||
@@ -281,7 +311,7 @@ export const index = async (
|
||||
|
||||
let faceIndex: FaceIndex;
|
||||
try {
|
||||
faceIndex = await indexFaces(enteFile, file, userAgent);
|
||||
faceIndex = await indexFaces(enteFile, uploadItem, electron, userAgent);
|
||||
} catch (e) {
|
||||
// Mark indexing as having failed only if the indexing itself
|
||||
// failed, not if there were subsequent failures (like when trying
|
||||
|
||||
@@ -63,10 +63,38 @@ export function mergeMetadata(files: EnteFile[]): EnteFile[] {
|
||||
}
|
||||
|
||||
/**
|
||||
* The returned blob.type is filled in, whenever possible, with the MIME type of
|
||||
* Return a new {@link Blob} containing data in a format that the browser
|
||||
* (likely) knows how to render (in an img tag, or on the canvas).
|
||||
*
|
||||
* The type of the returned blob is set, whenever possible, to the MIME type of
|
||||
* the data that we're dealing with.
|
||||
*
|
||||
* @param fileName The name of the file whose data is {@link imageBlob}.
|
||||
*
|
||||
* @param imageBlob A {@link Blob} containing the contents of an image file.
|
||||
*
|
||||
* The logic used by this function is:
|
||||
*
|
||||
* 1. Try to detect the MIME type of the file from its contents and/or name.
|
||||
*
|
||||
* 2. If this detected type is one of the types that we know that the browser
|
||||
* doesn't know how to render, continue. Otherwise return the imageBlob that
|
||||
* was passed in (after setting its MIME type).
|
||||
*
|
||||
* 3. If we're running in our desktop app and this MIME type is something our
|
||||
* desktop app can natively convert to a JPEG (using ffmpeg), do that and
|
||||
* return the resultant JPEG blob.
|
||||
*
|
||||
* 4. If this is an HEIC file, use our (WASM) HEIC converter and return the
|
||||
* resultant JPEG blob.
|
||||
*
|
||||
* 5. Otherwise (or if any error occurs in the aforementioned steps), return
|
||||
* `undefined`.
|
||||
*/
|
||||
export const getRenderableImage = async (fileName: string, imageBlob: Blob) => {
|
||||
export const renderableImageBlob = async (
|
||||
fileName: string,
|
||||
imageBlob: Blob,
|
||||
) => {
|
||||
try {
|
||||
const tempFile = new File([imageBlob], fileName);
|
||||
const fileTypeInfo = await detectFileTypeInfo(tempFile);
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
*/
|
||||
|
||||
import type { Electron, ZipItem } from "@/next/types/ipc";
|
||||
import type { MLWorkerElectron } from "../services/ml/worker-electron";
|
||||
|
||||
/**
|
||||
* Stream the given file or zip entry from the user's local file system.
|
||||
@@ -16,7 +17,8 @@ import type { Electron, ZipItem } from "@/next/types/ipc";
|
||||
* See: [Note: IPC streams].
|
||||
*
|
||||
* To avoid accidentally invoking it in a non-desktop app context, it requires
|
||||
* the {@link Electron} object as a parameter (even though it doesn't use it).
|
||||
* the {@link Electron} (or a functionally similar) object as a parameter (even
|
||||
* though it doesn't use it).
|
||||
*
|
||||
* @param pathOrZipItem Either the path on the file on the user's local file
|
||||
* system whose contents we want to stream. Or a tuple containing the path to a
|
||||
@@ -34,7 +36,7 @@ import type { Electron, ZipItem } from "@/next/types/ipc";
|
||||
* reading, expressed as epoch milliseconds.
|
||||
*/
|
||||
export const readStream = async (
|
||||
_: Electron,
|
||||
_: Electron | MLWorkerElectron,
|
||||
pathOrZipItem: string | ZipItem,
|
||||
): Promise<{ response: Response; size: number; lastModifiedMs: number }> => {
|
||||
let url: URL;
|
||||
|
||||
@@ -59,8 +59,10 @@ export const fileNameFromComponents = (components: FileNameComponents) =>
|
||||
*/
|
||||
export const basename = (path: string) => {
|
||||
const pathComponents = path.split("/");
|
||||
for (let i = pathComponents.length - 1; i >= 0; i--)
|
||||
if (pathComponents[i] !== "") return pathComponents[i];
|
||||
for (let i = pathComponents.length - 1; i >= 0; i--) {
|
||||
const component = pathComponents[i];
|
||||
if (component && component.length > 0) return component;
|
||||
}
|
||||
return path;
|
||||
};
|
||||
|
||||
|
||||
@@ -30,6 +30,8 @@ export class HTTPError extends Error {
|
||||
constructor(url: string, res: Response) {
|
||||
super(`Failed to fetch ${url}: HTTP ${res.status}`);
|
||||
|
||||
// Cargo culted from
|
||||
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Error#custom_error_types
|
||||
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
|
||||
if (Error.captureStackTrace) Error.captureStackTrace(this, HTTPError);
|
||||
|
||||
|
||||
@@ -3,9 +3,34 @@ import log, { logToDisk } from "@/next/log";
|
||||
import { expose, wrap, type Remote } from "comlink";
|
||||
import { ensureLocalUser } from "../local-user";
|
||||
|
||||
/**
|
||||
* A minimal wrapper for a web {@link Worker}, proxying a class of type T.
|
||||
*
|
||||
* `comlink` is a library that simplies working with web workers by
|
||||
* transparently proxying objects across the boundary instead of us needing to
|
||||
* work directly with the raw postMessage interface.
|
||||
*
|
||||
* This class is a thin wrapper over a common usage pattern of comlink. It takes
|
||||
* a web worker ({@link Worker}) that is expected to have {@link expose}-ed a
|
||||
* class of type T. It then makes available the main thread handle to this class
|
||||
* as the {@link remote} property.
|
||||
*
|
||||
* It also exposes an object of type {@link WorkerBridge} _to_ the code running
|
||||
* inside the web worker.
|
||||
*
|
||||
* It all gets a bit confusing sometimes, so here is a legend of the parties
|
||||
* involved:
|
||||
*
|
||||
* - ComlinkWorker (wraps the web worker)
|
||||
* - Web `Worker` (exposes class T)
|
||||
* - ComlinkWorker.remote (the exposed class T running inside the web worker)
|
||||
*/
|
||||
export class ComlinkWorker<T extends new () => InstanceType<T>> {
|
||||
/** The class (T) exposed by the web worker */
|
||||
public remote: Promise<Remote<InstanceType<T>>>;
|
||||
/** The web worker */
|
||||
private worker: Worker;
|
||||
/** An arbitrary name associated with this ComlinkWorker for debugging. */
|
||||
private name: string;
|
||||
|
||||
constructor(name: string, worker: Worker) {
|
||||
@@ -17,7 +42,7 @@ export class ComlinkWorker<T extends new () => InstanceType<T>> {
|
||||
`Got error event from worker: ${JSON.stringify({ event, name })}`,
|
||||
);
|
||||
};
|
||||
log.debug(() => `Initiated web worker ${name}`);
|
||||
log.debug(() => `Created ${name} web worker`);
|
||||
const comlink = wrap<T>(worker);
|
||||
this.remote = new comlink() as Promise<Remote<InstanceType<T>>>;
|
||||
expose(workerBridge, worker);
|
||||
@@ -25,30 +50,27 @@ export class ComlinkWorker<T extends new () => InstanceType<T>> {
|
||||
|
||||
public terminate() {
|
||||
this.worker.terminate();
|
||||
log.debug(() => `Terminated web worker ${this.name}`);
|
||||
log.debug(() => `Terminated ${this.name} web worker`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A set of utility functions that we expose to all workers that we create.
|
||||
* A set of utility functions that we expose to all web workers that we create.
|
||||
*
|
||||
* Inside the worker's code, this can be accessed by using the sibling
|
||||
* `workerBridge` object after importing it from `worker-bridge.ts`.
|
||||
*
|
||||
* Not all workers need access to all these functions, and this can indeed be
|
||||
* done in a more fine-grained, per-worker, manner if needed. For now, since it
|
||||
* is a motley bunch, we just inject them all.
|
||||
* is a motley bunch, we just inject them all to all workers.
|
||||
*/
|
||||
const workerBridge = {
|
||||
// Needed: generally (presumably)
|
||||
// Needed by all workers (likely, not necessarily).
|
||||
logToDisk,
|
||||
// Needed by ML worker
|
||||
// Needed by MLWorker.
|
||||
getAuthToken: () => ensureLocalUser().token,
|
||||
convertToJPEG: (imageData: Uint8Array) =>
|
||||
ensureElectron().convertToJPEG(imageData),
|
||||
detectFaces: (input: Float32Array) => ensureElectron().detectFaces(input),
|
||||
computeFaceEmbeddings: (input: Float32Array) =>
|
||||
ensureElectron().computeFaceEmbeddings(input),
|
||||
};
|
||||
|
||||
export type WorkerBridge = typeof workerBridge;
|
||||
|
||||
Reference in New Issue
Block a user