Segmental upload

This commit is contained in:
TebbeUbben 2020-09-04 23:11:11 +02:00
parent 354e77e733
commit 71b13917c4
3 changed files with 24 additions and 6 deletions

View file

@ -2069,9 +2069,13 @@ public class DatabaseHelper extends OrmLiteSqliteOpenHelper {
} }
} }
public List<OHQueueItem> getAllOHQueueItems() { public List<OHQueueItem> getAllOHQueueItems(Long maxEntries) {
try { try {
return getDaoOpenHumansQueue().queryForAll(); return getDaoOpenHumansQueue()
.queryBuilder()
.orderBy("id", true)
.limit(maxEntries)
.query();
} catch (SQLException e) { } catch (SQLException e) {
aapsLogger.error("Unhandled exception", e); aapsLogger.error("Unhandled exception", e);
} }

View file

@ -42,7 +42,7 @@ class OHUploadWorker(context: Context, workerParameters: WorkerParameters)
val isConnectedToWifi = wifiManager?.isWifiEnabled ?: false && wifiManager?.connectionInfo?.networkId != -1 val isConnectedToWifi = wifiManager?.isWifiEnabled ?: false && wifiManager?.connectionInfo?.networkId != -1
if (!wifiOnly || (wifiOnly && isConnectedToWifi)) { if (!wifiOnly || (wifiOnly && isConnectedToWifi)) {
setForegroundAsync(createForegroundInfo()) setForegroundAsync(createForegroundInfo())
openHumansUploader.uploadData() openHumansUploader.uploadDataSegmentally()
.andThen(Single.just(Result.success())) .andThen(Single.just(Result.success()))
.onErrorResumeNext { Single.just(Result.retry()) } .onErrorResumeNext { Single.just(Result.retry()) }
} else { } else {

View file

@ -80,6 +80,7 @@ class OpenHumansUploader @Inject constructor(
private const val SUCCESS_NOTIFICATION_ID = 3124 private const val SUCCESS_NOTIFICATION_ID = 3124
private const val SIGNED_OUT_NOTIFICATION_ID = 3125 private const val SIGNED_OUT_NOTIFICATION_ID = 3125
const val UPLOAD_NOTIFICATION_ID = 3126 const val UPLOAD_NOTIFICATION_ID = 3126
private const val UPLOAD_SEGMENT_SIZE = 10000L
} }
private val openHumansAPI = OpenHumansAPI(OPEN_HUMANS_URL, CLIENT_ID, CLIENT_SECRET, REDIRECT_URL) private val openHumansAPI = OpenHumansAPI(OPEN_HUMANS_URL, CLIENT_ID, CLIENT_SECRET, REDIRECT_URL)
@ -440,7 +441,20 @@ class OpenHumansUploader @Inject constructor(
notificationManager.notify(FAILURE_NOTIFICATION_ID, notification) notificationManager.notify(FAILURE_NOTIFICATION_ID, notification)
} }
fun uploadData(): Completable = gatherData() fun uploadDataSegmentally(): Completable =
uploadData(UPLOAD_SEGMENT_SIZE)
.repeatUntil { MainApp.getDbHelper().ohQueueSize == 0L }
.doOnSubscribe {
aapsLogger.info(LTag.OHUPLOADER, "Starting segmental upload")
}
.doOnComplete {
aapsLogger.info(LTag.OHUPLOADER, "Segmental upload successful")
}
.doOnError {
aapsLogger.error(LTag.OHUPLOADER, "Segmental upload erroneous", it)
}
fun uploadData(maxEntries: Long?): Completable = gatherData(maxEntries)
.flatMap { data -> refreshAccessTokensIfNeeded().map { accessToken -> accessToken to data } } .flatMap { data -> refreshAccessTokensIfNeeded().map { accessToken -> accessToken to data } }
.flatMap { uploadFile(it.first, it.second).andThen(Single.just(it.second)) } .flatMap { uploadFile(it.first, it.second).andThen(Single.just(it.second)) }
.flatMapCompletable { .flatMapCompletable {
@ -481,8 +495,8 @@ class OpenHumansUploader @Inject constructor(
} }
} }
private fun gatherData() = Single.defer { private fun gatherData(maxEntries: Long?) = Single.defer {
val items = MainApp.getDbHelper().allOHQueueItems val items = MainApp.getDbHelper().getAllOHQueueItems(maxEntries)
val baos = ByteArrayOutputStream() val baos = ByteArrayOutputStream()
val zos = ZipOutputStream(baos) val zos = ZipOutputStream(baos)
val tags = mutableListOf<String>() val tags = mutableListOf<String>()