[mob][photos] Isolate base mvp

This commit is contained in:
laurenspriem
2024-09-02 15:44:24 +02:00
parent da316fdcfd
commit 163c8161fc
2 changed files with 231 additions and 0 deletions

View File

@@ -0,0 +1,70 @@
import "dart:io" show File;
import 'dart:typed_data' show Uint8List;
import "package:logging/logging.dart";
import "package:photos/models/ml/face/box.dart";
import "package:photos/services/machine_learning/ml_model.dart";
import "package:photos/services/machine_learning/semantic_search/clip/clip_text_encoder.dart";
import "package:photos/services/machine_learning/semantic_search/clip/clip_text_tokenizer.dart";
import "package:photos/utils/image_ml_util.dart";
enum IsolateOperation {
/// [MLComputer]
generateFaceThumbnails,
/// [MLComputer]
loadModel,
/// [MLComputer]
initializeClipTokenizer,
/// [MLComputer]
runClipText,
/// [MLComputer]
testLogging,
}
/// WARNING: Only return primitives unless you know the method is only going
/// to be used on regular isolates as opposed to DartUI and Flutter isolates
/// https://api.flutter.dev/flutter/dart-isolate/SendPort/send.html
Future<dynamic> isolateFunction(
IsolateOperation function,
Map<String, dynamic> args,
) async {
switch (function) {
// Cases for MLComputer
case IsolateOperation.generateFaceThumbnails:
final imagePath = args['imagePath'] as String;
final Uint8List imageData = await File(imagePath).readAsBytes();
final faceBoxesJson = args['faceBoxesList'] as List<Map<String, dynamic>>;
final List<FaceBox> faceBoxes =
faceBoxesJson.map((json) => FaceBox.fromJson(json)).toList();
final List<Uint8List> results = await generateFaceThumbnailsUsingCanvas(
imageData,
faceBoxes,
);
return List.from(results);
case IsolateOperation.loadModel:
final modelName = args['modelName'] as String;
final modelPath = args['modelPath'] as String;
final int address = await MlModel.loadModel(
modelName,
modelPath,
);
return address;
case IsolateOperation.initializeClipTokenizer:
final vocabPath = args["vocabPath"] as String;
await ClipTextTokenizer.instance.init(vocabPath);
return true;
case IsolateOperation.runClipText:
//TODO:lau check logging here
final textEmbedding = await ClipTextEncoder.predict(args);
return List<double>.from(textEmbedding, growable: false);
case IsolateOperation.testLogging:
final logger = Logger('XXX MLComputerTestLogging');
logger.info("XXX logging from isolate is working!!!");
throw Exception("XXX logging from isolate testing exception handling");
return true;
}
}

View File

@@ -0,0 +1,161 @@
import 'dart:async';
import 'dart:isolate';
import "package:dart_ui_isolate/dart_ui_isolate.dart";
import "package:flutter/foundation.dart" show kDebugMode;
import "package:logging/logging.dart";
import "package:photos/core/error-reporting/isolate_logging.dart";
import "package:photos/services/isolate_functions.dart";
import "package:synchronized/synchronized.dart";
abstract class SuperIsolate {
static final Logger abstractLogger = Logger('SuperIsolate');
Logger get logger;
Timer? _inactivityTimer;
final Duration _inactivityDuration = const Duration(seconds: 120);
int _activeTasks = 0;
final _initLock = Lock();
final _functionLock = Lock();
bool get isDartUiIsolate;
bool get automaticDispose;
String get isolateName;
late dynamic _isolate;
late ReceivePort _receivePort;
late SendPort _mainSendPort;
bool get isIsolateSpawned => _isIsolateSpawned;
bool _isIsolateSpawned = false;
Future<void> _initIsolate() async {
return _initLock.synchronized(() async {
if (_isIsolateSpawned) return;
_receivePort = ReceivePort();
try {
_isolate = isDartUiIsolate
? await DartUiIsolate.spawn(
_isolateMain,
_receivePort.sendPort,
)
: await Isolate.spawn(
_isolateMain,
_receivePort.sendPort,
debugName: isolateName,
);
_mainSendPort = await _receivePort.first as SendPort;
if (automaticDispose) _resetInactivityTimer();
logger.info('initIsolate done');
_isIsolateSpawned = true;
} catch (e) {
logger.severe('Could not spawn isolate', e);
_isIsolateSpawned = false;
}
});
}
@pragma('vm:entry-point')
static void _isolateMain(SendPort mainSendPort) async {
Logger.root.level = kDebugMode ? Level.ALL : Level.INFO;
final IsolateLogger isolateLogger = IsolateLogger();
Logger.root.onRecord.listen(isolateLogger.onLogRecordInIsolate);
final receivePort = ReceivePort();
mainSendPort.send(receivePort.sendPort);
receivePort.listen((message) async {
final functionIndex = message[0] as int;
final function = IsolateOperation.values[functionIndex];
final args = message[1] as Map<String, dynamic>;
final sendPort = message[2] as SendPort;
late final Object data;
try {
data = await isolateFunction(function, args);
} catch (e, stackTrace) {
data = {
'error': e.toString(),
'stackTrace': stackTrace.toString(),
};
}
final logs = List<String>.from(isolateLogger.getLogStringsAndClear());
sendPort.send({"data": data, "logs": logs});
});
}
/// The common method to run any operation in the isolate. It sends the [message] to [_isolateMain] and waits for the result.
Future<dynamic> _runInIsolate(
(int, Map<String, dynamic>) message,
) async {
await _initIsolate();
return _functionLock.synchronized(() async {
if (automaticDispose) _resetInactivityTimer();
if (_postLockStop()) {
return null;
}
final completer = Completer<dynamic>();
final answerPort = ReceivePort();
_activeTasks++;
_mainSendPort.send([message.$1, message.$2, answerPort.sendPort]);
answerPort.listen((receivedMessage) {
final logs = receivedMessage['logs'] as List<String>;
IsolateLogger.handLogStringsToMainLogger(logs);
final data = receivedMessage['data'];
if (data is Map && data.containsKey('error')) {
// Handle the error
final errorMessage = data['error'];
final errorStackTrace = data['stackTrace'];
final exception = Exception(errorMessage);
final stackTrace = StackTrace.fromString(errorStackTrace);
completer.completeError(exception, stackTrace);
} else {
completer.complete(data);
}
});
_activeTasks--;
return completer.future;
});
}
bool _postLockStop() => false;
/// Resets a timer that kills the isolate after a certain amount of inactivity.
///
/// Should be called after initialization (e.g. inside `init()`) and after every call to isolate (e.g. inside `_runInIsolate()`)
void _resetInactivityTimer() {
_inactivityTimer?.cancel();
_inactivityTimer = Timer(_inactivityDuration, () {
if (_activeTasks > 0) {
logger.info('Tasks are still running. Delaying isolate disposal.');
// Optionally, reschedule the timer to check again later.
_resetInactivityTimer();
} else {
logger.info(
'Clustering Isolate has been inactive for ${_inactivityDuration.inSeconds} seconds with no tasks running. Killing isolate.',
);
_disposeIsolate();
}
});
}
Future<void> _onDispose();
void _disposeIsolate() async {
if (!_isIsolateSpawned) return;
logger.info('Disposing isolate');
await _onDispose();
_isIsolateSpawned = false;
_isolate.kill();
_receivePort.close();
_inactivityTimer?.cancel();
}
}