Retry for Observables

This commit is contained in:
AdrianLxM 2021-02-27 01:21:03 +01:00
parent 5b128e6def
commit e0e1be01a3
3 changed files with 183 additions and 1 deletions

View file

@ -2,6 +2,7 @@ package info.nightscout.androidaps.utils.rx
import io.reactivex.Completable
import io.reactivex.Flowable
import io.reactivex.Observable
import io.reactivex.Single
import java.util.concurrent.TimeUnit
import kotlin.math.pow
@ -40,4 +41,22 @@ fun Completable.retryExponentialBackoff(retries: Int, time: Long, timeUnit: Time
).flatMap { retryCount: Int ->
Flowable.timer(time * 2.toDouble().pow(retryCount.toDouble()).toLong(), timeUnit)
}
}
}
inline fun <reified T> Observable<T>.retryWithBackoff(
retries: Int,
delay: Long,
timeUnit: TimeUnit,
delayFactor: Double = 1.0
): Observable<T> = this.retryWhen {
it.zipWith(Observable.range(0, retries + 1), { throwable: Throwable, count: Int ->
if (count >= retries) {
throw throwable
} else {
count
}
}).flatMap { retryCount: Int ->
val actualDelay = (timeUnit.toMillis(delay) * delayFactor.pow(retryCount.toDouble())).toLong()
Observable.timer(actualDelay, TimeUnit.MILLISECONDS)
}
}

View file

@ -0,0 +1,132 @@
package info.nightscout.androidaps.utils.rx
import io.reactivex.Observable
import io.reactivex.observers.TestObserver
import io.reactivex.schedulers.TestScheduler
import org.junit.Assert.assertEquals
import org.junit.Rule
import org.junit.Test
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
class RxExtensionsTest {
private val testScheduler = TestScheduler()
@get:Rule
val schedulerRule = TestSchedulerRule(testScheduler)
@Test
fun `fail after 4 retries`() {
val atomicInteger = AtomicInteger()
val testObservable: TestObserver<Int> = succeedOnObservable(atomicInteger, 5)
.retryWithBackoff(4, 1, TimeUnit.SECONDS)
.test()
assertEquals(1, atomicInteger.get()) // 1st failure
testObservable.assertNotComplete()
testObservable.assertNotTerminated()
testObservable.assertNever(1)
testScheduler.advanceTimeBy(3, TimeUnit.SECONDS) // 2nd, 3rd, 4th failure
assertEquals(4, atomicInteger.get())
testObservable.assertNotComplete()
testObservable.assertNotTerminated()
testObservable.assertNever(1)
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS) // 5th failure on 4th retry
assertEquals(5, atomicInteger.get())
testObservable.assertError(RuntimeException::class.java)
testObservable.assertNever(1)
}
@Test
fun `succeed after 4 retries`() {
val atomicInteger = AtomicInteger()
val testObservable: TestObserver<Int> = succeedOnObservable(atomicInteger, 4)
.retryWithBackoff(4, 1, TimeUnit.SECONDS)
.test()
assertEquals(1, atomicInteger.get()) // 1st failure
testObservable.assertNotComplete()
testObservable.assertNotTerminated()
testObservable.assertNever(1)
testScheduler.advanceTimeBy(3, TimeUnit.SECONDS) // 2nd, 3rd, 4th failure
assertEquals(4, atomicInteger.get())
testObservable.assertNotComplete()
testObservable.assertNotTerminated()
testObservable.assertNever(1)
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS) // 5th is a charm
assertEquals(5, atomicInteger.get())
testObservable.assertValue(1)
}
@Test
fun `succeed after 4 retries with delay factor`() {
val atomicInteger = AtomicInteger()
val testObservable: TestObserver<Int> = succeedOnObservable(atomicInteger, 4)
.retryWithBackoff(4, 1, TimeUnit.SECONDS, delayFactor = 1.2)
.test()
assertEquals(1, atomicInteger.get()) // 1st failure
testObservable.assertNotComplete()
testObservable.assertNotTerminated()
testObservable.assertNever(1)
testScheduler.advanceTimeBy(999, TimeUnit.MILLISECONDS)
assertEquals(1, atomicInteger.get())
testObservable.assertNotComplete()
testObservable.assertNotTerminated()
testObservable.assertNever(1)
testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS) //1st retry after 1 second
assertEquals(2, atomicInteger.get())
testObservable.assertNotComplete()
testObservable.assertNotTerminated()
testObservable.assertNever(1)
testScheduler.advanceTimeBy(1199, TimeUnit.MILLISECONDS)
assertEquals(2, atomicInteger.get())
testObservable.assertNotComplete()
testObservable.assertNotTerminated()
testObservable.assertNever(1)
testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS) //2nd retry after 1.2 seconds more
assertEquals(3, atomicInteger.get())
testObservable.assertNotComplete()
testObservable.assertNotTerminated()
testObservable.assertNever(1)
testScheduler.advanceTimeBy(1439, TimeUnit.MILLISECONDS)
assertEquals(3, atomicInteger.get())
testObservable.assertNotComplete()
testObservable.assertNotTerminated()
testObservable.assertNever(1)
testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS) //3rd retry after 1.44 seconds more
assertEquals(4, atomicInteger.get())
testObservable.assertNotComplete()
testObservable.assertNotTerminated()
testObservable.assertNever(1)
testScheduler.advanceTimeBy(1726, TimeUnit.MILLISECONDS)
assertEquals(4, atomicInteger.get())
testObservable.assertNotComplete()
testObservable.assertNotTerminated()
testObservable.assertNever(1)
//4th retry = 5th try is a charm after 1.728 seconds more - rounding error by 1 millisecond!!
testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS)
assertEquals(5, atomicInteger.get())
testObservable.assertValue(1)
}
private fun succeedOnObservable(atomicInteger: AtomicInteger, initialFailures: Int): Observable<Int> =
Observable.defer {
if (atomicInteger.incrementAndGet() == initialFailures + 1) {
Observable.just(1)
} else {
Observable.error(RuntimeException())
}
}
}

View file

@ -0,0 +1,31 @@
package info.nightscout.androidaps.utils.rx
import io.reactivex.Scheduler
import io.reactivex.android.plugins.RxAndroidPlugins
import io.reactivex.plugins.RxJavaPlugins
import org.junit.rules.TestRule
import org.junit.runner.Description
import org.junit.runners.model.Statement
class TestSchedulerRule(val scheduler: Scheduler) : TestRule {
override fun apply(base: Statement, description: Description) =
object : Statement() {
override fun evaluate() {
RxAndroidPlugins.reset()
RxAndroidPlugins.setInitMainThreadSchedulerHandler { scheduler }
RxJavaPlugins.reset()
RxJavaPlugins.setIoSchedulerHandler { scheduler }
RxJavaPlugins.setNewThreadSchedulerHandler { scheduler }
RxJavaPlugins.setComputationSchedulerHandler { scheduler }
try {
base.evaluate()
} finally {
RxJavaPlugins.reset()
RxAndroidPlugins.reset()
}
}
}
}