From e0e1be01a35cc87889b240a09e84c039f0cadd91 Mon Sep 17 00:00:00 2001 From: AdrianLxM Date: Sat, 27 Feb 2021 01:21:03 +0100 Subject: [PATCH] Retry for Observables --- .../androidaps/utils/rx/RxExtensions.kt | 21 ++- .../androidaps/utils/rx/RxExtensionsTest.kt | 132 ++++++++++++++++++ .../androidaps/utils/rx/TestSchedulerRule.kt | 31 ++++ 3 files changed, 183 insertions(+), 1 deletion(-) create mode 100644 core/src/test/java/info/nightscout/androidaps/utils/rx/RxExtensionsTest.kt create mode 100644 core/src/test/java/info/nightscout/androidaps/utils/rx/TestSchedulerRule.kt diff --git a/core/src/main/java/info/nightscout/androidaps/utils/rx/RxExtensions.kt b/core/src/main/java/info/nightscout/androidaps/utils/rx/RxExtensions.kt index 78e2db0e7a..aef2b17e18 100644 --- a/core/src/main/java/info/nightscout/androidaps/utils/rx/RxExtensions.kt +++ b/core/src/main/java/info/nightscout/androidaps/utils/rx/RxExtensions.kt @@ -2,6 +2,7 @@ package info.nightscout.androidaps.utils.rx import io.reactivex.Completable import io.reactivex.Flowable +import io.reactivex.Observable import io.reactivex.Single import java.util.concurrent.TimeUnit import kotlin.math.pow @@ -40,4 +41,22 @@ fun Completable.retryExponentialBackoff(retries: Int, time: Long, timeUnit: Time ).flatMap { retryCount: Int -> Flowable.timer(time * 2.toDouble().pow(retryCount.toDouble()).toLong(), timeUnit) } - } \ No newline at end of file + } + +inline fun Observable.retryWithBackoff( + retries: Int, + delay: Long, + timeUnit: TimeUnit, + delayFactor: Double = 1.0 +): Observable = this.retryWhen { + it.zipWith(Observable.range(0, retries + 1), { throwable: Throwable, count: Int -> + if (count >= retries) { + throw throwable + } else { + count + } + }).flatMap { retryCount: Int -> + val actualDelay = (timeUnit.toMillis(delay) * delayFactor.pow(retryCount.toDouble())).toLong() + Observable.timer(actualDelay, TimeUnit.MILLISECONDS) + } +} \ No newline at end of file diff --git a/core/src/test/java/info/nightscout/androidaps/utils/rx/RxExtensionsTest.kt b/core/src/test/java/info/nightscout/androidaps/utils/rx/RxExtensionsTest.kt new file mode 100644 index 0000000000..6e1ec2c7dc --- /dev/null +++ b/core/src/test/java/info/nightscout/androidaps/utils/rx/RxExtensionsTest.kt @@ -0,0 +1,132 @@ +package info.nightscout.androidaps.utils.rx + +import io.reactivex.Observable +import io.reactivex.observers.TestObserver +import io.reactivex.schedulers.TestScheduler +import org.junit.Assert.assertEquals +import org.junit.Rule +import org.junit.Test +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger + +class RxExtensionsTest { + + private val testScheduler = TestScheduler() + + @get:Rule + val schedulerRule = TestSchedulerRule(testScheduler) + + @Test + fun `fail after 4 retries`() { + val atomicInteger = AtomicInteger() + val testObservable: TestObserver = succeedOnObservable(atomicInteger, 5) + .retryWithBackoff(4, 1, TimeUnit.SECONDS) + .test() + assertEquals(1, atomicInteger.get()) // 1st failure + testObservable.assertNotComplete() + testObservable.assertNotTerminated() + testObservable.assertNever(1) + + testScheduler.advanceTimeBy(3, TimeUnit.SECONDS) // 2nd, 3rd, 4th failure + assertEquals(4, atomicInteger.get()) + testObservable.assertNotComplete() + testObservable.assertNotTerminated() + testObservable.assertNever(1) + + testScheduler.advanceTimeBy(1, TimeUnit.SECONDS) // 5th failure on 4th retry + assertEquals(5, atomicInteger.get()) + testObservable.assertError(RuntimeException::class.java) + testObservable.assertNever(1) + } + + @Test + fun `succeed after 4 retries`() { + val atomicInteger = AtomicInteger() + val testObservable: TestObserver = succeedOnObservable(atomicInteger, 4) + .retryWithBackoff(4, 1, TimeUnit.SECONDS) + .test() + assertEquals(1, atomicInteger.get()) // 1st failure + testObservable.assertNotComplete() + testObservable.assertNotTerminated() + testObservable.assertNever(1) + + testScheduler.advanceTimeBy(3, TimeUnit.SECONDS) // 2nd, 3rd, 4th failure + assertEquals(4, atomicInteger.get()) + testObservable.assertNotComplete() + testObservable.assertNotTerminated() + testObservable.assertNever(1) + + testScheduler.advanceTimeBy(1, TimeUnit.SECONDS) // 5th is a charm + assertEquals(5, atomicInteger.get()) + testObservable.assertValue(1) + } + + @Test + fun `succeed after 4 retries with delay factor`() { + val atomicInteger = AtomicInteger() + val testObservable: TestObserver = succeedOnObservable(atomicInteger, 4) + .retryWithBackoff(4, 1, TimeUnit.SECONDS, delayFactor = 1.2) + .test() + assertEquals(1, atomicInteger.get()) // 1st failure + testObservable.assertNotComplete() + testObservable.assertNotTerminated() + testObservable.assertNever(1) + + testScheduler.advanceTimeBy(999, TimeUnit.MILLISECONDS) + assertEquals(1, atomicInteger.get()) + testObservable.assertNotComplete() + testObservable.assertNotTerminated() + testObservable.assertNever(1) + + testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS) //1st retry after 1 second + assertEquals(2, atomicInteger.get()) + testObservable.assertNotComplete() + testObservable.assertNotTerminated() + testObservable.assertNever(1) + + testScheduler.advanceTimeBy(1199, TimeUnit.MILLISECONDS) + assertEquals(2, atomicInteger.get()) + testObservable.assertNotComplete() + testObservable.assertNotTerminated() + testObservable.assertNever(1) + + testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS) //2nd retry after 1.2 seconds more + assertEquals(3, atomicInteger.get()) + testObservable.assertNotComplete() + testObservable.assertNotTerminated() + testObservable.assertNever(1) + + testScheduler.advanceTimeBy(1439, TimeUnit.MILLISECONDS) + assertEquals(3, atomicInteger.get()) + testObservable.assertNotComplete() + testObservable.assertNotTerminated() + testObservable.assertNever(1) + + testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS) //3rd retry after 1.44 seconds more + assertEquals(4, atomicInteger.get()) + testObservable.assertNotComplete() + testObservable.assertNotTerminated() + testObservable.assertNever(1) + + testScheduler.advanceTimeBy(1726, TimeUnit.MILLISECONDS) + assertEquals(4, atomicInteger.get()) + testObservable.assertNotComplete() + testObservable.assertNotTerminated() + testObservable.assertNever(1) + + //4th retry = 5th try is a charm after 1.728 seconds more - rounding error by 1 millisecond!! + testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS) + assertEquals(5, atomicInteger.get()) + testObservable.assertValue(1) + } + + private fun succeedOnObservable(atomicInteger: AtomicInteger, initialFailures: Int): Observable = + Observable.defer { + if (atomicInteger.incrementAndGet() == initialFailures + 1) { + Observable.just(1) + } else { + Observable.error(RuntimeException()) + } + } + +} \ No newline at end of file diff --git a/core/src/test/java/info/nightscout/androidaps/utils/rx/TestSchedulerRule.kt b/core/src/test/java/info/nightscout/androidaps/utils/rx/TestSchedulerRule.kt new file mode 100644 index 0000000000..258c89fc91 --- /dev/null +++ b/core/src/test/java/info/nightscout/androidaps/utils/rx/TestSchedulerRule.kt @@ -0,0 +1,31 @@ +package info.nightscout.androidaps.utils.rx + +import io.reactivex.Scheduler +import io.reactivex.android.plugins.RxAndroidPlugins +import io.reactivex.plugins.RxJavaPlugins +import org.junit.rules.TestRule +import org.junit.runner.Description +import org.junit.runners.model.Statement + +class TestSchedulerRule(val scheduler: Scheduler) : TestRule { + + override fun apply(base: Statement, description: Description) = + object : Statement() { + override fun evaluate() { + RxAndroidPlugins.reset() + RxAndroidPlugins.setInitMainThreadSchedulerHandler { scheduler } + RxJavaPlugins.reset() + RxJavaPlugins.setIoSchedulerHandler { scheduler } + RxJavaPlugins.setNewThreadSchedulerHandler { scheduler } + RxJavaPlugins.setComputationSchedulerHandler { scheduler } + + try { + base.evaluate() + } finally { + RxJavaPlugins.reset() + RxAndroidPlugins.reset() + } + + } + } +} \ No newline at end of file