Face cooldown (#2300)
## Description - Remove cooldown timer in FaceMlService - Only load face ML models when actually doing indexing - Small refactor of FaceMlService ## Tests Tested in debug mode on my pixel phone.
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
import "dart:async";
|
||||
import "dart:developer" as dev show log;
|
||||
import "dart:io" show File, Platform;
|
||||
import "dart:io" show File;
|
||||
import "dart:isolate";
|
||||
import "dart:math" show min;
|
||||
import "dart:typed_data" show Uint8List, Float32List, ByteData;
|
||||
@@ -40,7 +40,6 @@ import 'package:photos/services/machine_learning/face_ml/face_ml_result.dart';
|
||||
import "package:photos/services/machine_learning/face_ml/person/person_service.dart";
|
||||
import 'package:photos/services/machine_learning/file_ml/file_ml.dart';
|
||||
import 'package:photos/services/machine_learning/file_ml/remote_fileml_service.dart';
|
||||
import "package:photos/services/machine_learning/machine_learning_controller.dart";
|
||||
import 'package:photos/services/machine_learning/ml_exceptions.dart';
|
||||
import "package:photos/services/search_service.dart";
|
||||
import "package:photos/utils/file_util.dart";
|
||||
@@ -50,7 +49,7 @@ import "package:photos/utils/network_util.dart";
|
||||
import "package:photos/utils/thumbnail_util.dart";
|
||||
import "package:synchronized/synchronized.dart";
|
||||
|
||||
enum FileDataForML { thumbnailData, fileData, compressedFileData }
|
||||
enum FileDataForML { thumbnailData, fileData }
|
||||
|
||||
enum FaceMlOperation { analyzeImage }
|
||||
|
||||
@@ -66,26 +65,29 @@ class FaceMlService {
|
||||
Timer? _inactivityTimer;
|
||||
final Duration _inactivityDuration = const Duration(seconds: 120);
|
||||
int _activeTasks = 0;
|
||||
final _initLockIsolate = Lock();
|
||||
late DartUiIsolate _isolate;
|
||||
late ReceivePort _receivePort = ReceivePort();
|
||||
late SendPort _mainSendPort;
|
||||
|
||||
bool _isIsolateSpawned = false;
|
||||
|
||||
// Singleton pattern
|
||||
FaceMlService._privateConstructor();
|
||||
static final instance = FaceMlService._privateConstructor();
|
||||
factory FaceMlService() => instance;
|
||||
|
||||
final _initLock = Lock();
|
||||
final _initModelLock = Lock();
|
||||
final _functionLock = Lock();
|
||||
final _initIsolateLock = Lock();
|
||||
|
||||
final _computer = Computer.shared();
|
||||
|
||||
bool isInitialized = false;
|
||||
bool _isInitialized = false;
|
||||
bool _isModelsInitialized = false;
|
||||
bool _isIsolateSpawned = false;
|
||||
|
||||
late String client;
|
||||
|
||||
bool get isInitialized => _isInitialized;
|
||||
|
||||
bool get showClusteringIsHappening => _showClusteringIsHappening;
|
||||
|
||||
bool debugIndexingDisabled = false;
|
||||
@@ -96,252 +98,57 @@ class FaceMlService {
|
||||
bool _shouldSyncPeople = false;
|
||||
bool _isSyncing = false;
|
||||
|
||||
final int _fileDownloadLimit = 10;
|
||||
final int _embeddingFetchLimit = 200;
|
||||
final int _kForceClusteringFaceCount = 8000;
|
||||
final int _kcooldownLimit = 300;
|
||||
static const Duration _kCooldownDuration = Duration(minutes: 3);
|
||||
static const int _fileDownloadLimit = 10;
|
||||
static const _embeddingFetchLimit = 200;
|
||||
static const _kForceClusteringFaceCount = 8000;
|
||||
|
||||
/// Only call this function once at app startup, after that you can directly call [runAllFaceML]
|
||||
Future<void> init() async {
|
||||
if (LocalSettings.instance.isFaceIndexingEnabled == false) {
|
||||
if (LocalSettings.instance.isFaceIndexingEnabled == false ||
|
||||
_isInitialized) {
|
||||
return;
|
||||
}
|
||||
return _initLock.synchronized(() async {
|
||||
if (isInitialized) {
|
||||
_logger.info("init called");
|
||||
|
||||
// Listen on MachineLearningController
|
||||
Bus.instance.on<MachineLearningControlEvent>().listen((event) {
|
||||
if (LocalSettings.instance.isFaceIndexingEnabled == false) {
|
||||
return;
|
||||
}
|
||||
_logger.info("init called");
|
||||
_logStatus();
|
||||
await _computer.compute(initOrtEnv);
|
||||
try {
|
||||
await FaceDetectionService.instance.init();
|
||||
} catch (e, s) {
|
||||
_logger.severe("Could not initialize yolo onnx", e, s);
|
||||
}
|
||||
try {
|
||||
await FaceEmbeddingService.instance.init();
|
||||
} catch (e, s) {
|
||||
_logger.severe("Could not initialize mobilefacenet", e, s);
|
||||
}
|
||||
|
||||
// Get client name
|
||||
final packageInfo = await PackageInfo.fromPlatform();
|
||||
client = "${packageInfo.packageName}/${packageInfo.version}";
|
||||
_logger.info("client: $client");
|
||||
|
||||
isInitialized = true;
|
||||
_mlControllerStatus = !Platform.isAndroid;
|
||||
|
||||
/// hooking FaceML into [MachineLearningController]
|
||||
Bus.instance.on<MachineLearningControlEvent>().listen((event) {
|
||||
if (LocalSettings.instance.isFaceIndexingEnabled == false) {
|
||||
return;
|
||||
}
|
||||
_mlControllerStatus = event.shouldRun;
|
||||
if (_mlControllerStatus) {
|
||||
if (_shouldPauseIndexingAndClustering) {
|
||||
_shouldPauseIndexingAndClustering = false;
|
||||
_logger.info(
|
||||
"MLController allowed running ML, faces indexing undoing previous pause",
|
||||
);
|
||||
} else {
|
||||
_logger.info(
|
||||
"MLController allowed running ML, faces indexing starting",
|
||||
);
|
||||
}
|
||||
unawaited(indexAndClusterAll());
|
||||
_mlControllerStatus = event.shouldRun;
|
||||
if (_mlControllerStatus) {
|
||||
if (_shouldPauseIndexingAndClustering) {
|
||||
_shouldPauseIndexingAndClustering = false;
|
||||
_logger.info(
|
||||
"MLController allowed running ML, faces indexing undoing previous pause",
|
||||
);
|
||||
} else {
|
||||
_logger.info(
|
||||
"MLController stopped running ML, faces indexing will be paused (unless it's fetching embeddings)",
|
||||
"MLController allowed running ML, faces indexing starting",
|
||||
);
|
||||
pauseIndexingAndClustering();
|
||||
}
|
||||
});
|
||||
if (Platform.isIOS &&
|
||||
MachineLearningController.instance.isDeviceHealthy) {
|
||||
_logger.info("Starting face indexing and clustering on iOS from init");
|
||||
unawaited(indexAndClusterAll());
|
||||
unawaited(runAllFaceML());
|
||||
} else {
|
||||
_logger.info(
|
||||
"MLController stopped running ML, faces indexing will be paused (unless it's fetching embeddings)",
|
||||
);
|
||||
pauseIndexingAndClustering();
|
||||
}
|
||||
|
||||
_listenIndexOnDiffSync();
|
||||
_listenOnPeopleChangedSync();
|
||||
|
||||
_logger.info('init done');
|
||||
});
|
||||
}
|
||||
|
||||
static void initOrtEnv() async {
|
||||
OrtEnv.instance.init();
|
||||
}
|
||||
|
||||
void _listenIndexOnDiffSync() {
|
||||
// Listen on DiffSync
|
||||
Bus.instance.on<DiffSyncCompleteEvent>().listen((event) async {
|
||||
unawaited(sync());
|
||||
});
|
||||
}
|
||||
|
||||
void _listenOnPeopleChangedSync() {
|
||||
// Listne on PeopleChanged
|
||||
Bus.instance.on<PeopleChangedEvent>().listen((event) {
|
||||
if (event.type == PeopleEventType.syncDone) return;
|
||||
_shouldSyncPeople = true;
|
||||
});
|
||||
}
|
||||
|
||||
Future<void> ensureInitialized() async {
|
||||
if (!isInitialized) {
|
||||
await init();
|
||||
}
|
||||
}
|
||||
|
||||
Future<void> release() async {
|
||||
return _initLock.synchronized(() async {
|
||||
_logger.info("dispose called");
|
||||
if (!isInitialized) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await FaceDetectionService.instance.release();
|
||||
} catch (e, s) {
|
||||
_logger.severe("Could not dispose yolo onnx", e, s);
|
||||
}
|
||||
try {
|
||||
await FaceEmbeddingService.instance.release();
|
||||
} catch (e, s) {
|
||||
_logger.severe("Could not dispose mobilefacenet", e, s);
|
||||
}
|
||||
OrtEnv.instance.release();
|
||||
isInitialized = false;
|
||||
});
|
||||
}
|
||||
|
||||
Future<void> _initIsolate() async {
|
||||
return _initLockIsolate.synchronized(() async {
|
||||
if (_isIsolateSpawned) return;
|
||||
_logger.info("initIsolate called");
|
||||
|
||||
_receivePort = ReceivePort();
|
||||
|
||||
try {
|
||||
_isolate = await DartUiIsolate.spawn(
|
||||
_isolateMain,
|
||||
_receivePort.sendPort,
|
||||
);
|
||||
_mainSendPort = await _receivePort.first as SendPort;
|
||||
_isIsolateSpawned = true;
|
||||
|
||||
_resetInactivityTimer();
|
||||
} catch (e) {
|
||||
_logger.severe('Could not spawn isolate', e);
|
||||
_isIsolateSpawned = false;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Future<void> _ensureSpawnedIsolate() async {
|
||||
if (!_isIsolateSpawned) {
|
||||
await _initIsolate();
|
||||
}
|
||||
}
|
||||
|
||||
/// The main execution function of the isolate.
|
||||
@pragma('vm:entry-point')
|
||||
static void _isolateMain(SendPort mainSendPort) async {
|
||||
final receivePort = ReceivePort();
|
||||
mainSendPort.send(receivePort.sendPort);
|
||||
|
||||
receivePort.listen((message) async {
|
||||
final functionIndex = message[0] as int;
|
||||
final function = FaceMlOperation.values[functionIndex];
|
||||
final args = message[1] as Map<String, dynamic>;
|
||||
final sendPort = message[2] as SendPort;
|
||||
|
||||
try {
|
||||
switch (function) {
|
||||
case FaceMlOperation.analyzeImage:
|
||||
final time = DateTime.now();
|
||||
final FaceMlResult result =
|
||||
await FaceMlService.analyzeImageSync(args);
|
||||
dev.log(
|
||||
"`analyzeImageSync` function executed in ${DateTime.now().difference(time).inMilliseconds} ms",
|
||||
);
|
||||
sendPort.send(result.toJsonString());
|
||||
break;
|
||||
}
|
||||
} catch (e, stackTrace) {
|
||||
dev.log(
|
||||
"[SEVERE] Error in FaceML isolate: $e",
|
||||
error: e,
|
||||
stackTrace: stackTrace,
|
||||
);
|
||||
sendPort
|
||||
.send({'error': e.toString(), 'stackTrace': stackTrace.toString()});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// The common method to run any operation in the isolate. It sends the [message] to [_isolateMain] and waits for the result.
|
||||
Future<dynamic> _runInIsolate(
|
||||
(FaceMlOperation, Map<String, dynamic>) message,
|
||||
) async {
|
||||
await _ensureSpawnedIsolate();
|
||||
return _functionLock.synchronized(() async {
|
||||
_resetInactivityTimer();
|
||||
|
||||
if (_shouldPauseIndexingAndClustering) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final completer = Completer<dynamic>();
|
||||
final answerPort = ReceivePort();
|
||||
|
||||
_activeTasks++;
|
||||
_mainSendPort.send([message.$1.index, message.$2, answerPort.sendPort]);
|
||||
|
||||
answerPort.listen((receivedMessage) {
|
||||
if (receivedMessage is Map && receivedMessage.containsKey('error')) {
|
||||
// Handle the error
|
||||
final errorMessage = receivedMessage['error'];
|
||||
final errorStackTrace = receivedMessage['stackTrace'];
|
||||
final exception = Exception(errorMessage);
|
||||
final stackTrace = StackTrace.fromString(errorStackTrace);
|
||||
completer.completeError(exception, stackTrace);
|
||||
} else {
|
||||
completer.complete(receivedMessage);
|
||||
}
|
||||
});
|
||||
_activeTasks--;
|
||||
|
||||
return completer.future;
|
||||
});
|
||||
}
|
||||
|
||||
/// Resets a timer that kills the isolate after a certain amount of inactivity.
|
||||
///
|
||||
/// Should be called after initialization (e.g. inside `init()`) and after every call to isolate (e.g. inside `_runInIsolate()`)
|
||||
void _resetInactivityTimer() {
|
||||
_inactivityTimer?.cancel();
|
||||
_inactivityTimer = Timer(_inactivityDuration, () {
|
||||
if (_activeTasks > 0) {
|
||||
_logger.info('Tasks are still running. Delaying isolate disposal.');
|
||||
// Optionally, reschedule the timer to check again later.
|
||||
_resetInactivityTimer();
|
||||
} else {
|
||||
_logger.info(
|
||||
'Clustering Isolate has been inactive for ${_inactivityDuration.inSeconds} seconds with no tasks running. Killing isolate.',
|
||||
);
|
||||
_disposeIsolate();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void _disposeIsolate() async {
|
||||
if (!_isIsolateSpawned) return;
|
||||
await release();
|
||||
|
||||
_isIsolateSpawned = false;
|
||||
_isolate.kill();
|
||||
_receivePort.close();
|
||||
_inactivityTimer?.cancel();
|
||||
_isInitialized = true;
|
||||
_logger.info('init done');
|
||||
}
|
||||
|
||||
Future<void> sync({bool forceSync = true}) async {
|
||||
@@ -357,8 +164,11 @@ class FaceMlService {
|
||||
_isSyncing = false;
|
||||
}
|
||||
|
||||
Future<void> indexAndClusterAll() async {
|
||||
if (_cannotRunMLFunction()) return;
|
||||
Future<void> runAllFaceML({bool force = false}) async {
|
||||
if (force) {
|
||||
_mlControllerStatus = true;
|
||||
}
|
||||
if (_cannotRunMLFunction() && !force) return;
|
||||
|
||||
await sync(forceSync: _shouldSyncPeople);
|
||||
|
||||
@@ -399,13 +209,8 @@ class FaceMlService {
|
||||
await SearchService.instance.getAllFiles();
|
||||
w?.log('getAllFiles');
|
||||
|
||||
// Make sure the image conversion isolate is spawned
|
||||
// await ImageMlIsolate.instance.ensureSpawned();
|
||||
await ensureInitialized();
|
||||
|
||||
int fileAnalyzedCount = 0;
|
||||
int fileSkippedCount = 0;
|
||||
int cooldownCount = 0;
|
||||
final stopwatch = Stopwatch()..start();
|
||||
final List<EnteFile> filesWithLocalID = <EnteFile>[];
|
||||
final List<EnteFile> filesWithoutLocalID = <EnteFile>[];
|
||||
@@ -540,6 +345,7 @@ class FaceMlService {
|
||||
fileSkippedCount++;
|
||||
continue;
|
||||
}
|
||||
await _ensureReadyForInference();
|
||||
futures.add(processImage(enteFile));
|
||||
}
|
||||
final awaitedFutures = await Future.wait(futures);
|
||||
@@ -548,22 +354,12 @@ class FaceMlService {
|
||||
(previousValue, element) => previousValue + (element ? 1 : 0),
|
||||
);
|
||||
fileAnalyzedCount += sumFutures;
|
||||
|
||||
if (fileAnalyzedCount > _kcooldownLimit) {
|
||||
_logger.info(
|
||||
'Reached ${cooldownCount * _kcooldownLimit + fileAnalyzedCount} indexed files, cooling down to prevent OS from killing the app',
|
||||
);
|
||||
cooldownCount++;
|
||||
fileAnalyzedCount -= _kcooldownLimit;
|
||||
await Future.delayed(_kCooldownDuration);
|
||||
_logger.info('cooldown done, continuing indexing');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
stopwatch.stop();
|
||||
_logger.info(
|
||||
"`indexAllImages()` finished. Fetched $fetchedCount and analyzed ${cooldownCount * _kcooldownLimit + fileAnalyzedCount} images, in ${stopwatch.elapsed.inSeconds} seconds (avg of ${stopwatch.elapsed.inSeconds / fileAnalyzedCount} seconds per image, skipped $fileSkippedCount images, $cooldownCount cooldowns)",
|
||||
"`indexAllImages()` finished. Fetched $fetchedCount and analyzed $fileAnalyzedCount images, in ${stopwatch.elapsed.inSeconds} seconds (avg of ${stopwatch.elapsed.inSeconds / fileAnalyzedCount} seconds per image, skipped $fileSkippedCount images)",
|
||||
);
|
||||
_logStatus();
|
||||
} catch (e, s) {
|
||||
@@ -745,48 +541,6 @@ class FaceMlService {
|
||||
}
|
||||
}
|
||||
|
||||
bool _shouldDiscardRemoteEmbedding(FileMl fileMl) {
|
||||
if (fileMl.faceEmbedding.version < faceMlVersion) {
|
||||
debugPrint("Discarding remote embedding for fileID ${fileMl.fileID} "
|
||||
"because version is ${fileMl.faceEmbedding.version} and we need $faceMlVersion");
|
||||
return true;
|
||||
}
|
||||
// are all landmarks equal?
|
||||
bool allLandmarksEqual = true;
|
||||
if (fileMl.faceEmbedding.faces.isEmpty) {
|
||||
debugPrint("No face for ${fileMl.fileID}");
|
||||
allLandmarksEqual = false;
|
||||
}
|
||||
for (final face in fileMl.faceEmbedding.faces) {
|
||||
if (face.detection.landmarks.isEmpty) {
|
||||
allLandmarksEqual = false;
|
||||
break;
|
||||
}
|
||||
if (face.detection.landmarks
|
||||
.any((landmark) => landmark.x != landmark.y)) {
|
||||
allLandmarksEqual = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (allLandmarksEqual) {
|
||||
debugPrint("Discarding remote embedding for fileID ${fileMl.fileID} "
|
||||
"because landmarks are equal");
|
||||
debugPrint(
|
||||
fileMl.faceEmbedding.faces
|
||||
.map((e) => e.detection.landmarks.toString())
|
||||
.toList()
|
||||
.toString(),
|
||||
);
|
||||
return true;
|
||||
}
|
||||
if (fileMl.width == null || fileMl.height == null) {
|
||||
debugPrint("Discarding remote embedding for fileID ${fileMl.fileID} "
|
||||
"because width is null");
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
Future<bool> processImage(EnteFile enteFile) async {
|
||||
_logger.info(
|
||||
"`processImage` start processing image with uploadedFileID: ${enteFile.uploadedFileID}",
|
||||
@@ -899,10 +653,188 @@ class FaceMlService {
|
||||
}
|
||||
}
|
||||
|
||||
/// Analyzes the given image data by running the full pipeline for faces, using [analyzeImageSync] in the isolate.
|
||||
Future<void> _initModels() async {
|
||||
return _initModelLock.synchronized(() async {
|
||||
if (_isModelsInitialized) return;
|
||||
_logger.info('initModels called');
|
||||
|
||||
// Get client name
|
||||
final packageInfo = await PackageInfo.fromPlatform();
|
||||
client = "${packageInfo.packageName}/${packageInfo.version}";
|
||||
_logger.info("client: $client");
|
||||
|
||||
// Initialize models
|
||||
await _computer.compute(() => OrtEnv.instance.init());
|
||||
try {
|
||||
await FaceDetectionService.instance.init();
|
||||
} catch (e, s) {
|
||||
_logger.severe("Could not initialize yolo onnx", e, s);
|
||||
}
|
||||
try {
|
||||
await FaceEmbeddingService.instance.init();
|
||||
} catch (e, s) {
|
||||
_logger.severe("Could not initialize mobilefacenet", e, s);
|
||||
}
|
||||
_isModelsInitialized = true;
|
||||
_logger.info('initModels done');
|
||||
_logStatus();
|
||||
});
|
||||
}
|
||||
|
||||
Future<void> _releaseModels() async {
|
||||
return _initModelLock.synchronized(() async {
|
||||
_logger.info("dispose called");
|
||||
if (!_isModelsInitialized) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await FaceDetectionService.instance.release();
|
||||
} catch (e, s) {
|
||||
_logger.severe("Could not dispose yolo onnx", e, s);
|
||||
}
|
||||
try {
|
||||
await FaceEmbeddingService.instance.release();
|
||||
} catch (e, s) {
|
||||
_logger.severe("Could not dispose mobilefacenet", e, s);
|
||||
}
|
||||
OrtEnv.instance.release();
|
||||
_isModelsInitialized = false;
|
||||
});
|
||||
}
|
||||
|
||||
Future<void> _initIsolate() async {
|
||||
return _initIsolateLock.synchronized(() async {
|
||||
if (_isIsolateSpawned) return;
|
||||
_logger.info("initIsolate called");
|
||||
|
||||
_receivePort = ReceivePort();
|
||||
|
||||
try {
|
||||
_isolate = await DartUiIsolate.spawn(
|
||||
_isolateMain,
|
||||
_receivePort.sendPort,
|
||||
);
|
||||
_mainSendPort = await _receivePort.first as SendPort;
|
||||
_isIsolateSpawned = true;
|
||||
|
||||
_resetInactivityTimer();
|
||||
_logger.info('initIsolate done');
|
||||
} catch (e) {
|
||||
_logger.severe('Could not spawn isolate', e);
|
||||
_isIsolateSpawned = false;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Future<void> _ensureReadyForInference() async {
|
||||
await _initModels();
|
||||
await _initIsolate();
|
||||
}
|
||||
|
||||
/// The main execution function of the isolate.
|
||||
@pragma('vm:entry-point')
|
||||
static void _isolateMain(SendPort mainSendPort) async {
|
||||
final receivePort = ReceivePort();
|
||||
mainSendPort.send(receivePort.sendPort);
|
||||
|
||||
receivePort.listen((message) async {
|
||||
final functionIndex = message[0] as int;
|
||||
final function = FaceMlOperation.values[functionIndex];
|
||||
final args = message[1] as Map<String, dynamic>;
|
||||
final sendPort = message[2] as SendPort;
|
||||
|
||||
try {
|
||||
switch (function) {
|
||||
case FaceMlOperation.analyzeImage:
|
||||
final time = DateTime.now();
|
||||
final FaceMlResult result =
|
||||
await FaceMlService._analyzeImageSync(args);
|
||||
dev.log(
|
||||
"`analyzeImageSync` function executed in ${DateTime.now().difference(time).inMilliseconds} ms",
|
||||
);
|
||||
sendPort.send(result.toJsonString());
|
||||
break;
|
||||
}
|
||||
} catch (e, stackTrace) {
|
||||
dev.log(
|
||||
"[SEVERE] Error in FaceML isolate: $e",
|
||||
error: e,
|
||||
stackTrace: stackTrace,
|
||||
);
|
||||
sendPort
|
||||
.send({'error': e.toString(), 'stackTrace': stackTrace.toString()});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// The common method to run any operation in the isolate. It sends the [message] to [_isolateMain] and waits for the result.
|
||||
Future<dynamic> _runInIsolate(
|
||||
(FaceMlOperation, Map<String, dynamic>) message,
|
||||
) async {
|
||||
await _initIsolate();
|
||||
return _functionLock.synchronized(() async {
|
||||
_resetInactivityTimer();
|
||||
|
||||
if (_shouldPauseIndexingAndClustering) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final completer = Completer<dynamic>();
|
||||
final answerPort = ReceivePort();
|
||||
|
||||
_activeTasks++;
|
||||
_mainSendPort.send([message.$1.index, message.$2, answerPort.sendPort]);
|
||||
|
||||
answerPort.listen((receivedMessage) {
|
||||
if (receivedMessage is Map && receivedMessage.containsKey('error')) {
|
||||
// Handle the error
|
||||
final errorMessage = receivedMessage['error'];
|
||||
final errorStackTrace = receivedMessage['stackTrace'];
|
||||
final exception = Exception(errorMessage);
|
||||
final stackTrace = StackTrace.fromString(errorStackTrace);
|
||||
completer.completeError(exception, stackTrace);
|
||||
} else {
|
||||
completer.complete(receivedMessage);
|
||||
}
|
||||
});
|
||||
_activeTasks--;
|
||||
|
||||
return completer.future;
|
||||
});
|
||||
}
|
||||
|
||||
/// Resets a timer that kills the isolate after a certain amount of inactivity.
|
||||
///
|
||||
/// Should be called after initialization (e.g. inside `init()`) and after every call to isolate (e.g. inside `_runInIsolate()`)
|
||||
void _resetInactivityTimer() {
|
||||
_inactivityTimer?.cancel();
|
||||
_inactivityTimer = Timer(_inactivityDuration, () {
|
||||
if (_activeTasks > 0) {
|
||||
_logger.info('Tasks are still running. Delaying isolate disposal.');
|
||||
// Optionally, reschedule the timer to check again later.
|
||||
_resetInactivityTimer();
|
||||
} else {
|
||||
_logger.info(
|
||||
'Clustering Isolate has been inactive for ${_inactivityDuration.inSeconds} seconds with no tasks running. Killing isolate.',
|
||||
);
|
||||
_dispose();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void _dispose() async {
|
||||
if (!_isIsolateSpawned) return;
|
||||
_logger.info('Disposing isolate and models');
|
||||
await _releaseModels();
|
||||
_isIsolateSpawned = false;
|
||||
_isolate.kill();
|
||||
_receivePort.close();
|
||||
_inactivityTimer?.cancel();
|
||||
}
|
||||
|
||||
/// Analyzes the given image data by running the full pipeline for faces, using [_analyzeImageSync] in the isolate.
|
||||
Future<FaceMlResult?> _analyzeImageInSingleIsolate(EnteFile enteFile) async {
|
||||
_checkEnteFileForID(enteFile);
|
||||
await ensureInitialized();
|
||||
|
||||
final String? filePath =
|
||||
await _getImagePathForML(enteFile, typeOfData: FileDataForML.fileData);
|
||||
@@ -961,7 +893,7 @@ class FaceMlService {
|
||||
return result;
|
||||
}
|
||||
|
||||
static Future<FaceMlResult> analyzeImageSync(Map args) async {
|
||||
static Future<FaceMlResult> _analyzeImageSync(Map args) async {
|
||||
try {
|
||||
final int enteFileID = args["enteFileID"] as int;
|
||||
final String imagePath = args["filePath"] as String;
|
||||
@@ -986,7 +918,7 @@ class FaceMlService {
|
||||
|
||||
// Get the faces
|
||||
final List<FaceDetectionRelative> faceDetectionResult =
|
||||
await FaceMlService.detectFacesSync(
|
||||
await FaceMlService._detectFacesSync(
|
||||
image,
|
||||
imgByteData,
|
||||
faceDetectionAddress,
|
||||
@@ -1009,7 +941,7 @@ class FaceMlService {
|
||||
stopwatch.reset();
|
||||
// Align the faces
|
||||
final Float32List faceAlignmentResult =
|
||||
await FaceMlService.alignFacesSync(
|
||||
await FaceMlService._alignFacesSync(
|
||||
image,
|
||||
imgByteData,
|
||||
faceDetectionResult,
|
||||
@@ -1021,7 +953,7 @@ class FaceMlService {
|
||||
|
||||
stopwatch.reset();
|
||||
// Get the embeddings of the faces
|
||||
final embeddings = await FaceMlService.embedFacesSync(
|
||||
final embeddings = await FaceMlService._embedFacesSync(
|
||||
faceAlignmentResult,
|
||||
faceEmbeddingAddress,
|
||||
resultBuilder: resultBuilder,
|
||||
@@ -1103,13 +1035,6 @@ class FaceMlService {
|
||||
"Getting thumbnail data for uploadedFileID ${enteFile.uploadedFileID} took ${stopwatch.elapsedMilliseconds} ms",
|
||||
);
|
||||
break;
|
||||
|
||||
case FileDataForML.compressedFileData:
|
||||
_logger.warning(
|
||||
"Getting compressed file data for uploadedFileID ${enteFile.uploadedFileID} is not implemented yet",
|
||||
);
|
||||
imagePath = null;
|
||||
break;
|
||||
}
|
||||
|
||||
return imagePath;
|
||||
@@ -1120,7 +1045,7 @@ class FaceMlService {
|
||||
/// `imageData`: The image data to analyze.
|
||||
///
|
||||
/// Returns a list of face detection results.
|
||||
static Future<List<FaceDetectionRelative>> detectFacesSync(
|
||||
static Future<List<FaceDetectionRelative>> _detectFacesSync(
|
||||
Image image,
|
||||
ByteData imageByteData,
|
||||
int interpreterAddress, {
|
||||
@@ -1155,7 +1080,7 @@ class FaceMlService {
|
||||
/// `faces`: The face detection results in a list of [FaceDetectionAbsolute] for the faces to align.
|
||||
///
|
||||
/// Returns a list of the aligned faces as image data.
|
||||
static Future<Float32List> alignFacesSync(
|
||||
static Future<Float32List> _alignFacesSync(
|
||||
Image image,
|
||||
ByteData imageByteData,
|
||||
List<FaceDetectionRelative> faces, {
|
||||
@@ -1188,7 +1113,7 @@ class FaceMlService {
|
||||
}
|
||||
}
|
||||
|
||||
static Future<List<List<double>>> embedFacesSync(
|
||||
static Future<List<List<double>>> _embedFacesSync(
|
||||
Float32List facesList,
|
||||
int interpreterAddress, {
|
||||
FaceMlResult? resultBuilder,
|
||||
@@ -1212,6 +1137,48 @@ class FaceMlService {
|
||||
}
|
||||
}
|
||||
|
||||
bool _shouldDiscardRemoteEmbedding(FileMl fileMl) {
|
||||
if (fileMl.faceEmbedding.version < faceMlVersion) {
|
||||
debugPrint("Discarding remote embedding for fileID ${fileMl.fileID} "
|
||||
"because version is ${fileMl.faceEmbedding.version} and we need $faceMlVersion");
|
||||
return true;
|
||||
}
|
||||
// are all landmarks equal?
|
||||
bool allLandmarksEqual = true;
|
||||
if (fileMl.faceEmbedding.faces.isEmpty) {
|
||||
debugPrint("No face for ${fileMl.fileID}");
|
||||
allLandmarksEqual = false;
|
||||
}
|
||||
for (final face in fileMl.faceEmbedding.faces) {
|
||||
if (face.detection.landmarks.isEmpty) {
|
||||
allLandmarksEqual = false;
|
||||
break;
|
||||
}
|
||||
if (face.detection.landmarks
|
||||
.any((landmark) => landmark.x != landmark.y)) {
|
||||
allLandmarksEqual = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (allLandmarksEqual) {
|
||||
debugPrint("Discarding remote embedding for fileID ${fileMl.fileID} "
|
||||
"because landmarks are equal");
|
||||
debugPrint(
|
||||
fileMl.faceEmbedding.faces
|
||||
.map((e) => e.detection.landmarks.toString())
|
||||
.toList()
|
||||
.toString(),
|
||||
);
|
||||
return true;
|
||||
}
|
||||
if (fileMl.width == null || fileMl.height == null) {
|
||||
debugPrint("Discarding remote embedding for fileID ${fileMl.fileID} "
|
||||
"because width is null");
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/// Checks if the ente file to be analyzed actually can be analyzed: it must be uploaded and in the correct format.
|
||||
void _checkEnteFileForID(EnteFile enteFile) {
|
||||
if (_skipAnalysisEnteFile(enteFile, <int, int>{})) {
|
||||
|
||||
@@ -149,7 +149,7 @@ class _FaceDebugSectionWidgetState extends State<FaceDebugSectionWidget> {
|
||||
onTap: () async {
|
||||
try {
|
||||
FaceMlService.instance.debugIndexingDisabled = false;
|
||||
unawaited(FaceMlService.instance.indexAndClusterAll());
|
||||
unawaited(FaceMlService.instance.runAllFaceML());
|
||||
} catch (e, s) {
|
||||
_logger.warning('indexAndClusterAll failed ', e, s);
|
||||
await showGenericErrorDialog(context: context, error: e);
|
||||
|
||||
@@ -223,7 +223,8 @@ class _MachineLearningSettingsPageState
|
||||
final isEnabled =
|
||||
await LocalSettings.instance.toggleFaceIndexing();
|
||||
if (isEnabled) {
|
||||
unawaited(FaceMlService.instance.ensureInitialized());
|
||||
await FaceMlService.instance.init();
|
||||
unawaited(FaceMlService.instance.runAllFaceML(force: true));
|
||||
} else {
|
||||
FaceMlService.instance.pauseIndexingAndClustering();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user