From f422e30a8ecd1c9788040e2eeb95bbba83c0d5dd Mon Sep 17 00:00:00 2001 From: laurenspriem Date: Sat, 27 Apr 2024 11:13:52 +0530 Subject: [PATCH] [mob][photos] Migrate fully to sqlite async for faces, removing sqflite fully --- mobile/lib/face/db.dart | 567 ++++++++++++++++++++++------------------ 1 file changed, 313 insertions(+), 254 deletions(-) diff --git a/mobile/lib/face/db.dart b/mobile/lib/face/db.dart index f54b3c6ef4..05450b9687 100644 --- a/mobile/lib/face/db.dart +++ b/mobile/lib/face/db.dart @@ -1,5 +1,4 @@ import 'dart:async'; -import "dart:io" show Directory; import "dart:math"; import "package:collection/collection.dart"; @@ -14,7 +13,7 @@ import "package:photos/face/model/face.dart"; import "package:photos/models/file/file.dart"; import "package:photos/services/machine_learning/face_ml/face_clustering/face_info_for_clustering.dart"; import 'package:photos/services/machine_learning/face_ml/face_filtering/face_filtering_constants.dart'; -import 'package:sqflite/sqflite.dart'; +// import 'package:sqflite/sqflite.dart'; import 'package:sqlite_async/sqlite_async.dart' as sqlite_async; /// Stores all data for the ML-related features. The database can be accessed by `MlDataDB.instance.database`. @@ -33,75 +32,83 @@ class FaceMLDataDB { static final FaceMLDataDB instance = FaceMLDataDB._privateConstructor(); // only have a single app-wide reference to the database - static Future? _dbFuture; static Future? _sqliteAsyncDBFuture; - Future get database async { - _dbFuture ??= _initDatabase(); - return _dbFuture!; - } - - Future get sqliteAsyncDB async { + Future get asyncDB async { _sqliteAsyncDBFuture ??= _initSqliteAsyncDatabase(); return _sqliteAsyncDBFuture!; } - Future _initDatabase() async { + Future _initSqliteAsyncDatabase() async { final documentsDirectory = await getApplicationDocumentsDirectory(); - final String databaseDirectory = - join(documentsDirectory.path, _databaseName); - return await openDatabase( - databaseDirectory, - version: _databaseVersion, - onCreate: _onCreate, - ); - } - - Future _initSqliteAsyncDatabase() async { - final Directory documentsDirectory = - await getApplicationDocumentsDirectory(); final String databaseDirectory = join(documentsDirectory.path, _databaseName); _logger.info("Opening sqlite_async access: DB path " + databaseDirectory); - return sqlite_async.SqliteDatabase(path: databaseDirectory, maxReaders: 1); + final asyncDBConnection = + sqlite_async.SqliteDatabase(path: databaseDirectory, maxReaders: 2); + await _onCreate(asyncDBConnection); + return asyncDBConnection; } - Future _onCreate(Database db, int version) async { - await db.execute(createFacesTable); - await db.execute(createFaceClustersTable); - await db.execute(createClusterPersonTable); - await db.execute(createClusterSummaryTable); - await db.execute(createNotPersonFeedbackTable); - await db.execute(fcClusterIDIndex); + Future _onCreate(sqlite_async.SqliteDatabase asyncDBConnection) async { + final migrations = sqlite_async.SqliteMigrations() + ..add( + sqlite_async.SqliteMigration(_databaseVersion, (tx) async { + await tx.execute(createFacesTable); + await tx.execute(createFaceClustersTable); + await tx.execute(createClusterPersonTable); + await tx.execute(createClusterSummaryTable); + await tx.execute(createNotPersonFeedbackTable); + await tx.execute(fcClusterIDIndex); + }), + ); + await migrations.migrate(asyncDBConnection); } // bulkInsertFaces inserts the faces in the database in batches of 1000. // This is done to avoid the error "too many SQL variables" when inserting // a large number of faces. Future bulkInsertFaces(List faces) async { - final db = await instance.database; + final db = await instance.asyncDB; const batchSize = 500; final numBatches = (faces.length / batchSize).ceil(); for (int i = 0; i < numBatches; i++) { final start = i * batchSize; final end = min((i + 1) * batchSize, faces.length); final batch = faces.sublist(start, end); - final batchInsert = db.batch(); - for (final face in batch) { - batchInsert.insert( - facesTable, - mapRemoteToFaceDB(face), - conflictAlgorithm: ConflictAlgorithm.ignore, - ); - } - await batchInsert.commit(noResult: true); + + const String sql = ''' + INSERT INTO $facesTable ( + $fileIDColumn, $faceIDColumn, $faceDetectionColumn, $faceEmbeddingBlob, $faceScore, $faceBlur, $isSideways, $imageHeight, $imageWidth, $mlVersionColumn + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT($faceIDColumn, $mlVersionColumn) DO NOTHING + '''; + final parameterSets = batch.map((face) { + final map = mapRemoteToFaceDB(face); + return [ + map[fileIDColumn], + map[faceIDColumn], + map[faceDetectionColumn], + map[faceEmbeddingBlob], + map[faceScore], + map[faceBlur], + map[isSideways], + map[imageHeight], + map[imageWidth], + map[mlVersionColumn], + ]; + }).toList(); + + await db.writeTransaction((tx) async { + await tx.executeBatch(sql, parameterSets); + }); } } Future updateFaceIdToClusterId( Map faceIDToClusterID, ) async { - final db = await instance.database; + final db = await instance.asyncDB; const batchSize = 500; final numBatches = (faceIDToClusterID.length / batchSize).ceil(); for (int i = 0; i < numBatches; i++) { @@ -109,24 +116,21 @@ class FaceMLDataDB { final end = min((i + 1) * batchSize, faceIDToClusterID.length); final batch = faceIDToClusterID.entries.toList().sublist(start, end); - final batchUpdate = db.batch(); - - for (final entry in batch) { - final faceID = entry.key; - final clusterID = entry.value; - batchUpdate.insert( - faceClustersTable, - {fcClusterID: clusterID, fcFaceId: faceID}, - conflictAlgorithm: ConflictAlgorithm.replace, - ); - } - await batchUpdate.commit(noResult: true); + const String sql = ''' + INSERT INTO $faceClustersTable ($fcFaceId, $fcClusterID) + VALUES (?, ?) + ON CONFLICT($fcFaceId) DO UPDATE SET $fcClusterID = excluded.$fcClusterID + '''; + final parameterSets = batch.map((e) => [e.key, e.value]).toList(); + await db.writeTransaction((tx) async { + await tx.executeBatch(sql, parameterSets); + }); } } /// Returns a map of fileID to the indexed ML version Future> getIndexedFileIds() async { - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; final List> maps = await db.getAll( 'SELECT $fileIDColumn, $mlVersionColumn FROM $facesTable', ); @@ -138,7 +142,7 @@ class FaceMLDataDB { } Future getIndexedFileCount() async { - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; final List> maps = await db.getAll( 'SELECT COUNT(DISTINCT $fileIDColumn) as count FROM $facesTable', ); @@ -146,7 +150,7 @@ class FaceMLDataDB { } Future> clusterIdToFaceCount() async { - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; final List> maps = await db.getAll( 'SELECT $fcClusterID, COUNT(*) as count FROM $faceClustersTable where $fcClusterID IS NOT NULL GROUP BY $fcClusterID ', ); @@ -158,7 +162,7 @@ class FaceMLDataDB { } Future> getPersonIgnoredClusters(String personID) async { - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; // find out clusterIds that are assigned to other persons using the clusters table final List> maps = await db.getAll( 'SELECT $clusterIDColumn FROM $clusterPersonTable WHERE $personIdColumn != ? AND $personIdColumn IS NOT NULL', @@ -176,7 +180,7 @@ class FaceMLDataDB { } Future> getPersonClusterIDs(String personID) async { - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; final List> maps = await db.getAll( 'SELECT $clusterIDColumn FROM $clusterPersonTable WHERE $personIdColumn = ?', [personID], @@ -185,19 +189,22 @@ class FaceMLDataDB { } Future clearTable() async { - final db = await instance.database; - await db.delete(facesTable); - await db.delete(clusterPersonTable); - await db.delete(clusterSummaryTable); - await db.delete(personTable); - await db.delete(notPersonFeedback); + final db = await instance.asyncDB; + + await db.writeTransaction((tx) async { + await tx.execute(deleteFacesTable); + await tx.execute(dropClusterPersonTable); + await tx.execute(dropClusterSummaryTable); + await tx.execute(deletePersonTable); + await tx.execute(dropNotPersonFeedbackTable); + }); } Future> getFaceEmbeddingsForCluster( int clusterID, { int? limit, }) async { - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; final List> maps = await db.getAll( 'SELECT $faceEmbeddingBlob FROM $facesTable WHERE $faceIDColumn in (SELECT $fcFaceId from $faceClustersTable where $fcClusterID = ?) ${limit != null ? 'LIMIT $limit' : ''}', [clusterID], @@ -209,7 +216,7 @@ class FaceMLDataDB { Iterable clusterIDs, { int? limit, }) async { - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; final Map> result = {}; final selectQuery = ''' @@ -238,7 +245,7 @@ class FaceMLDataDB { int? clusterID, }) async { // read person from db - final db = await instance.database; + final db = await instance.asyncDB; if (personID != null) { final List fileId = [recentFileID]; int? avatarFileId; @@ -248,15 +255,18 @@ class FaceMLDataDB { fileId.add(avatarFileId); } } - final cluterRows = await db.query( - clusterPersonTable, - columns: [clusterIDColumn], - where: '$personIdColumn = ?', - whereArgs: [personID], + const String queryClusterID = ''' + SELECT $clusterIDColumn + FROM $clusterPersonTable + WHERE $personIdColumn = ? + '''; + final clusterRows = await db.getAll( + queryClusterID, + [personID], ); final clusterIDs = - cluterRows.map((e) => e[clusterIDColumn] as int).toList(); - final List> faceMaps = await db.rawQuery( + clusterRows.map((e) => e[clusterIDColumn] as int).toList(); + final List> faceMaps = await db.getAll( 'SELECT * FROM $facesTable where ' '$faceIDColumn in (SELECT $fcFaceId from $faceClustersTable where $fcClusterID IN (${clusterIDs.join(",")}))' 'AND $fileIDColumn in (${fileId.join(",")}) AND $faceScore > $kMinimumQualityFaceScore ORDER BY $faceScore DESC', @@ -274,11 +284,14 @@ class FaceMLDataDB { } } if (clusterID != null) { - final List> faceMaps = await db.query( - faceClustersTable, - columns: [fcFaceId], - where: '$fcClusterID = ?', - whereArgs: [clusterID], + const String queryFaceID = ''' + SELECT $fcFaceId + FROM $faceClustersTable + WHERE $fcClusterID = ? + '''; + final List> faceMaps = await db.getAll( + queryFaceID, + [clusterID], ); final List? faces = await getFacesForGivenFileID(recentFileID); if (faces != null) { @@ -297,22 +310,14 @@ class FaceMLDataDB { } Future?> getFacesForGivenFileID(int fileUploadID) async { - final db = await instance.database; - final List> maps = await db.query( - facesTable, - columns: [ - faceIDColumn, - fileIDColumn, - faceEmbeddingBlob, - faceScore, - faceDetectionColumn, - faceBlur, - imageHeight, - imageWidth, - mlVersionColumn, - ], - where: '$fileIDColumn = ?', - whereArgs: [fileUploadID], + final db = await instance.asyncDB; + const String query = ''' + SELECT * FROM $facesTable + WHERE $fileIDColumn = ? + '''; + final List> maps = await db.getAll( + query, + [fileUploadID], ); if (maps.isEmpty) { return null; @@ -321,7 +326,7 @@ class FaceMLDataDB { } Future getFaceForFaceID(String faceID) async { - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; final result = await db.getAll( 'SELECT * FROM $facesTable where $faceIDColumn = ?', [faceID], @@ -335,7 +340,7 @@ class FaceMLDataDB { Future>> getClusterToFaceIDs( Set clusterIDs, ) async { - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; final Map> result = {}; final List> maps = await db.getAll( 'SELECT $fcClusterID, $fcFaceId FROM $faceClustersTable WHERE $fcClusterID IN (${clusterIDs.join(",")})', @@ -349,7 +354,7 @@ class FaceMLDataDB { } Future getClusterIDForFaceID(String faceID) async { - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; final List> maps = await db.getAll( 'SELECT $fcClusterID FROM $faceClustersTable WHERE $fcFaceId = ?', [faceID], @@ -361,7 +366,7 @@ class FaceMLDataDB { } Future>> getAllClusterIdToFaceIDs() async { - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; final Map> result = {}; final List> maps = await db.getAll( 'SELECT $fcClusterID, $fcFaceId FROM $faceClustersTable', @@ -375,7 +380,7 @@ class FaceMLDataDB { } Future> getFaceIDsForCluster(int clusterID) async { - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; final List> maps = await db.getAll( 'SELECT $fcFaceId FROM $faceClustersTable ' 'WHERE $faceClustersTable.$fcClusterID = ?', @@ -385,7 +390,7 @@ class FaceMLDataDB { } Future> getFaceIDsForPerson(String personID) async { - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; final faceIdsResult = await db.getAll( 'SELECT $fcFaceId FROM $faceClustersTable LEFT JOIN $clusterPersonTable ' 'ON $faceClustersTable.$fcClusterID = $clusterPersonTable.$clusterIDColumn ' @@ -396,7 +401,7 @@ class FaceMLDataDB { } Future> getBlurValuesForCluster(int clusterID) async { - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; const String query = ''' SELECT $facesTable.$faceBlur FROM $facesTable @@ -418,7 +423,7 @@ class FaceMLDataDB { Future> getFaceIDsToBlurValues( int maxBlurValue, ) async { - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; final List> maps = await db.getAll( 'SELECT $faceIDColumn, $faceBlur FROM $facesTable WHERE $faceBlur < $maxBlurValue AND $faceBlur > 1 ORDER BY $faceBlur ASC', ); @@ -432,8 +437,8 @@ class FaceMLDataDB { Future> getFaceIdsToClusterIds( Iterable faceIds, ) async { - final db = await instance.database; - final List> maps = await db.rawQuery( + final db = await instance.asyncDB; + final List> maps = await db.getAll( 'SELECT $fcFaceId, $fcClusterID FROM $faceClustersTable where $fcFaceId IN (${faceIds.map((id) => "'$id'").join(",")})', ); final Map result = {}; @@ -445,8 +450,8 @@ class FaceMLDataDB { Future>> getFileIdToClusterIds() async { final Map> result = {}; - final db = await instance.database; - final List> maps = await db.rawQuery( + final db = await instance.asyncDB; + final List> maps = await db.getAll( 'SELECT $fcClusterID, $fcFaceId FROM $faceClustersTable', ); @@ -463,36 +468,33 @@ class FaceMLDataDB { Future forceUpdateClusterIds( Map faceIDToClusterID, ) async { - final db = await instance.database; + final db = await instance.asyncDB; - // Start a batch - final batch = db.batch(); - - for (final map in faceIDToClusterID.entries) { - final faceID = map.key; - final clusterID = map.value; - batch.insert( - faceClustersTable, - {fcFaceId: faceID, fcClusterID: clusterID}, - conflictAlgorithm: ConflictAlgorithm.replace, - ); - } - // Commit the batch - await batch.commit(noResult: true); + const String sql = ''' + INSERT INTO $faceClustersTable ($fcFaceId, $fcClusterID) + VALUES (?, ?) + ON CONFLICT($fcFaceId) DO UPDATE SET $fcClusterID = excluded.$fcClusterID + '''; + final parameterSets = + faceIDToClusterID.entries.map((e) => [e.key, e.value]).toList(); + await db.writeTransaction((tx) async { + await tx.executeBatch(sql, parameterSets); + }); } Future removePerson(String personID) async { - final db = await instance.database; - await db.delete( - clusterPersonTable, - where: '$personIdColumn = ?', - whereArgs: [personID], - ); - await db.delete( - notPersonFeedback, - where: '$personIdColumn = ?', - whereArgs: [personID], - ); + final db = await instance.asyncDB; + + await db.writeTransaction((tx) async { + await tx.execute( + 'DELETE FROM $clusterPersonTable WHERE $personIdColumn = ?', + [personID], + ); + await tx.execute( + 'DELETE FROM $notPersonFeedback WHERE $personIdColumn = ?', + [personID], + ); + }); } Future> getFaceInfoForClustering({ @@ -506,7 +508,7 @@ class FaceMLDataDB { w.logAndReset( 'reading as float offset: $offset, maxFaces: $maxFaces, batchSize: $batchSize', ); - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; final Set result = {}; while (true) { @@ -561,7 +563,7 @@ class FaceMLDataDB { w.logAndReset( 'reading as float offset: $offset, maxFaces: $maxFaces, batchSize: $batchSize', ); - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; final Map result = {}; while (true) { @@ -605,7 +607,7 @@ class FaceMLDataDB { List fileIDs, ) async { _logger.info('reading face embeddings for ${fileIDs.length} files'); - final db = await instance.database; + final db = await instance.asyncDB; // Define the batch size const batchSize = 10000; @@ -614,15 +616,23 @@ class FaceMLDataDB { final Map result = {}; while (true) { // Query a batch of rows - final List> maps = await db.query( - facesTable, - columns: [faceIDColumn, faceEmbeddingBlob], - where: - '$faceScore > $kMinimumQualityFaceScore AND $faceBlur > $kLaplacianHardThreshold AND $fileIDColumn IN (${fileIDs.join(",")})', - limit: batchSize, - offset: offset, - orderBy: '$faceIDColumn DESC', - ); + + final List> maps = await db.getAll(''' + SELECT $faceIDColumn, $faceEmbeddingBlob + FROM $facesTable + WHERE $faceScore > $kMinimumQualityFaceScore AND $faceBlur > $kLaplacianHardThreshold AND $fileIDColumn IN (${fileIDs.join(",")}) + ORDER BY $faceIDColumn DESC + LIMIT $batchSize OFFSET $offset + '''); + // final List> maps = await db.query( + // facesTable, + // columns: [faceIDColumn, faceEmbeddingBlob], + // where: + // '$faceScore > $kMinimumQualityFaceScore AND $faceBlur > $kLaplacianHardThreshold AND $fileIDColumn IN (${fileIDs.join(",")})', + // limit: batchSize, + // offset: offset, + // orderBy: '$faceIDColumn DESC', + // ); // Break the loop if no more rows if (maps.isEmpty) { break; @@ -644,7 +654,7 @@ class FaceMLDataDB { Iterable faceIDs, ) async { _logger.info('reading face embeddings for ${faceIDs.length} faces'); - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; // Define the batch size const batchSize = 10000; @@ -681,7 +691,7 @@ class FaceMLDataDB { Future getTotalFaceCount({ double minFaceScore = kMinimumQualityFaceScore, }) async { - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; final List> maps = await db.getAll( 'SELECT COUNT(*) as count FROM $facesTable WHERE $faceScore > $minFaceScore AND $faceBlur > $kLaplacianHardThreshold', ); @@ -689,7 +699,7 @@ class FaceMLDataDB { } Future getClusteredToTotalFacesRatio() async { - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; final List> totalFacesMaps = await db.getAll( 'SELECT COUNT(*) as count FROM $facesTable WHERE $faceScore > $kMinimumQualityFaceScore AND $faceBlur > $kLaplacianHardThreshold', @@ -707,103 +717,111 @@ class FaceMLDataDB { Future getBlurryFaceCount([ int blurThreshold = kLaplacianHardThreshold, ]) async { - final db = await instance.database; - final List> maps = await db.rawQuery( + final db = await instance.asyncDB; + final List> maps = await db.getAll( 'SELECT COUNT(*) as count FROM $facesTable WHERE $faceBlur <= $blurThreshold AND $faceScore > $kMinimumQualityFaceScore', ); return maps.first['count'] as int; } Future resetClusterIDs() async { - final db = await instance.database; - await db.execute(dropFaceClustersTable); - await db.execute(createFaceClustersTable); - await db.execute(fcClusterIDIndex); + try { + final db = await instance.asyncDB; + + await db.writeTransaction((tx) async { + await tx.execute(dropFaceClustersTable); + await tx.execute(createFaceClustersTable); + await tx.execute(fcClusterIDIndex); + }); + } catch (e, s) { + _logger.severe('Error resetting clusterIDs', e, s); + } } Future assignClusterToPerson({ required String personID, required int clusterID, }) async { - final db = await instance.database; - await db.insert( - clusterPersonTable, - { - personIdColumn: personID, - clusterIDColumn: clusterID, - }, - ); + final db = await instance.asyncDB; + + const String sql = ''' + INSERT INTO $clusterPersonTable ($personIdColumn, $clusterIDColumn) VALUES (?, ?) ON CONFLICT($personIdColumn, $clusterIDColumn) DO NOTHING + '''; + await db.execute(sql, [personID, clusterID]); } Future bulkAssignClusterToPersonID( Map clusterToPersonID, ) async { - final db = await instance.database; - final batch = db.batch(); - for (final entry in clusterToPersonID.entries) { - final clusterID = entry.key; - final personID = entry.value; - batch.insert( - clusterPersonTable, - { - personIdColumn: personID, - clusterIDColumn: clusterID, - }, - conflictAlgorithm: ConflictAlgorithm.replace, - ); - } - await batch.commit(noResult: true); + final db = await instance.asyncDB; + + const String sql = ''' + INSERT INTO $clusterPersonTable ($personIdColumn, $clusterIDColumn) VALUES (?, ?) ON CONFLICT($personIdColumn, $clusterIDColumn) DO NOTHING + '''; + final parameterSets = + clusterToPersonID.entries.map((e) => [e.value, e.key]).toList(); + await db.writeTransaction((tx) async { + await tx.executeBatch(sql, parameterSets); + }); + // final batch = db.batch(); + // for (final entry in clusterToPersonID.entries) { + // final clusterID = entry.key; + // final personID = entry.value; + // batch.insert( + // clusterPersonTable, + // { + // personIdColumn: personID, + // clusterIDColumn: clusterID, + // }, + // conflictAlgorithm: ConflictAlgorithm.replace, + // ); + // } + // await batch.commit(noResult: true); } Future captureNotPersonFeedback({ required String personID, required int clusterID, }) async { - final db = await instance.database; - await db.insert( - notPersonFeedback, - { - personIdColumn: personID, - clusterIDColumn: clusterID, - }, - ); + final db = await instance.asyncDB; + + const String sql = ''' + INSERT INTO $notPersonFeedback ($personIdColumn, $clusterIDColumn) VALUES (?, ?) ON CONFLICT($personIdColumn, $clusterIDColumn) DO NOTHING + '''; + await db.execute(sql, [personID, clusterID]); } Future bulkCaptureNotPersonFeedback( Map clusterToPersonID, ) async { - final db = await instance.database; - final batch = db.batch(); - for (final entry in clusterToPersonID.entries) { - final clusterID = entry.key; - final personID = entry.value; - batch.insert( - notPersonFeedback, - { - personIdColumn: personID, - clusterIDColumn: clusterID, - }, - conflictAlgorithm: ConflictAlgorithm.replace, - ); - } - await batch.commit(noResult: true); + final db = await instance.asyncDB; + + const String sql = ''' + INSERT INTO $notPersonFeedback ($personIdColumn, $clusterIDColumn) VALUES (?, ?) ON CONFLICT($personIdColumn, $clusterIDColumn) DO NOTHING + '''; + final parameterSets = + clusterToPersonID.entries.map((e) => [e.value, e.key]).toList(); + + await db.writeTransaction((tx) async { + await tx.executeBatch(sql, parameterSets); + }); } - Future removeClusterToPerson({ + Future removeClusterToPerson({ required String personID, required int clusterID, }) async { - final db = await instance.database; - return db.delete( - clusterPersonTable, - where: '$personIdColumn = ? AND $clusterIDColumn = ?', - whereArgs: [personID, clusterID], - ); + final db = await instance.asyncDB; + + const String sql = ''' + DELETE FROM $clusterPersonTable WHERE $personIdColumn = ? AND $clusterIDColumn = ? + '''; + await db.execute(sql, [personID, clusterID]); } // for a given personID, return a map of clusterID to fileIDs using join query Future>> getFileIdToClusterIDSet(String personID) { - final db = instance.sqliteAsyncDB; + final db = instance.asyncDB; return db.then((db) async { final List> maps = await db.getAll( 'SELECT $faceClustersTable.$fcClusterID, $fcFaceId FROM $faceClustersTable ' @@ -826,7 +844,7 @@ class FaceMLDataDB { Future>> getFileIdToClusterIDSetForCluster( Set clusterIDs, ) { - final db = instance.sqliteAsyncDB; + final db = instance.asyncDB; return db.then((db) async { final List> maps = await db.getAll( 'SELECT $fcClusterID, $fcFaceId FROM $faceClustersTable ' @@ -844,37 +862,61 @@ class FaceMLDataDB { } Future clusterSummaryUpdate(Map summary) async { - final db = await instance.database; - var batch = db.batch(); + final db = await instance.asyncDB; + + const String sql = ''' + INSERT INTO $clusterSummaryTable ($clusterIDColumn, $avgColumn, $countColumn) VALUES (?, ?, ?) ON CONFLICT($clusterIDColumn) DO UPDATE SET $avgColumn = excluded.$avgColumn, $countColumn = excluded.$countColumn + '''; + final List> parameterSets = []; int batchCounter = 0; for (final entry in summary.entries) { if (batchCounter == 400) { - await batch.commit(noResult: true); - batch = db.batch(); + await db.writeTransaction((tx) async { + await tx.executeBatch(sql, parameterSets); + }); batchCounter = 0; + parameterSets.clear(); } - final int cluserID = entry.key; + final int clusterID = entry.key; final int count = entry.value.$2; final Uint8List avg = entry.value.$1; - batch.insert( - clusterSummaryTable, - { - clusterIDColumn: cluserID, - avgColumn: avg, - countColumn: count, - }, - conflictAlgorithm: ConflictAlgorithm.replace, - ); + parameterSets.add([clusterID, avg, count]); batchCounter++; } - await batch.commit(noResult: true); + await db.writeTransaction((tx) async { + await tx.executeBatch(sql, parameterSets); + }); + + // var batch = db.batch(); + // int batchCounter = 0; + // for (final entry in summary.entries) { + // if (batchCounter == 400) { + // await batch.commit(noResult: true); + // batch = db.batch(); + // batchCounter = 0; + // } + // final int cluserID = entry.key; + // final int count = entry.value.$2; + // final Uint8List avg = entry.value.$1; + // batch.insert( + // clusterSummaryTable, + // { + // clusterIDColumn: cluserID, + // avgColumn: avg, + // countColumn: count, + // }, + // conflictAlgorithm: ConflictAlgorithm.replace, + // ); + // batchCounter++; + // } + // await batch.commit(noResult: true); } /// Returns a map of clusterID to (avg embedding, count) Future> getAllClusterSummary([ int? minClusterSize, ]) async { - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; final Map result = {}; final rows = await db.getAll( 'SELECT * FROM $clusterSummaryTable${minClusterSize != null ? ' WHERE $countColumn >= $minClusterSize' : ''}', @@ -891,7 +933,7 @@ class FaceMLDataDB { Future> getClusterToClusterSummary( Iterable clusterIDs, ) async { - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; final Map result = {}; final rows = await db.getAll( 'SELECT * FROM $clusterSummaryTable WHERE $clusterIDColumn IN (${clusterIDs.join(",")})', @@ -906,7 +948,7 @@ class FaceMLDataDB { } Future> getClusterIDToPersonID() async { - final db = await instance.sqliteAsyncDB; + final db = await instance.asyncDB; final List> maps = await db.getAll( 'SELECT $personIdColumn, $clusterIDColumn FROM $clusterPersonTable', ); @@ -919,43 +961,60 @@ class FaceMLDataDB { /// WARNING: This will delete ALL data in the database! Only use this for debug/testing purposes! Future dropClustersAndPersonTable({bool faces = false}) async { - final db = await instance.database; - if (faces) { - await db.execute(deleteFacesTable); - await db.execute(createFacesTable); - await db.execute(dropFaceClustersTable); - await db.execute(createFaceClustersTable); - await db.execute(fcClusterIDIndex); - } - await db.execute(deletePersonTable); - await db.execute(dropClusterPersonTable); - await db.execute(dropClusterSummaryTable); - await db.execute(dropNotPersonFeedbackTable); + try { + final db = await instance.asyncDB; + if (faces) { + await db.writeTransaction((tx) async { + await tx.execute(deleteFacesTable); + await tx.execute(createFacesTable); + await tx.execute(dropFaceClustersTable); + await tx.execute(createFaceClustersTable); + await tx.execute(fcClusterIDIndex); + }); + } - await db.execute(createClusterPersonTable); - await db.execute(createNotPersonFeedbackTable); - await db.execute(createClusterSummaryTable); + await db.writeTransaction((tx) async { + await tx.execute(deletePersonTable); + await tx.execute(dropClusterPersonTable); + await tx.execute(dropClusterSummaryTable); + await tx.execute(dropNotPersonFeedbackTable); + }); + + await db.writeTransaction((tx) async { + await tx.execute(createClusterPersonTable); + await tx.execute(createNotPersonFeedbackTable); + await tx.execute(createClusterSummaryTable); + }); + } catch (e, s) { + _logger.severe('Error dropping clusters and person table', e, s); + } } /// WARNING: This will delete ALL data in the database! Only use this for debug/testing purposes! Future dropFeedbackTables() async { - final db = await instance.database; + try { + final db = await instance.asyncDB; - await db.execute(deletePersonTable); - await db.execute(dropClusterPersonTable); - await db.execute(dropNotPersonFeedbackTable); - await db.execute(dropClusterSummaryTable); - await db.execute(createClusterPersonTable); - await db.execute(createNotPersonFeedbackTable); - await db.execute(createClusterSummaryTable); + await db.writeTransaction((tx) async { + await tx.execute(deletePersonTable); + await tx.execute(dropClusterPersonTable); + await tx.execute(dropNotPersonFeedbackTable); + await tx.execute(dropClusterSummaryTable); + await tx.execute(createClusterPersonTable); + await tx.execute(createNotPersonFeedbackTable); + await tx.execute(createClusterSummaryTable); + }); + } catch (e) { + _logger.severe('Error dropping feedback tables', e); + } } Future removeFilesFromPerson( List files, String personID, ) async { - final db = await instance.database; - final faceIdsResult = await db.rawQuery( + final db = await instance.asyncDB; + final faceIdsResult = await db.getAll( 'SELECT $fcFaceId FROM $faceClustersTable LEFT JOIN $clusterPersonTable ' 'ON $faceClustersTable.$fcClusterID = $clusterPersonTable.$clusterIDColumn ' 'WHERE $clusterPersonTable.$personIdColumn = ?', @@ -981,8 +1040,8 @@ class FaceMLDataDB { List files, int clusterID, ) async { - final db = await instance.database; - final faceIdsResult = await db.rawQuery( + final db = await instance.asyncDB; + final faceIdsResult = await db.getAll( 'SELECT $fcFaceId FROM $faceClustersTable ' 'WHERE $faceClustersTable.$fcClusterID = ?', [clusterID],