Merge pull request #9 from 0pen-dash/adrian/rx-retry
Retry for Observables
This commit is contained in:
commit
ae8e944c3f
3 changed files with 183 additions and 1 deletions
|
@ -2,6 +2,7 @@ package info.nightscout.androidaps.utils.rx
|
|||
|
||||
import io.reactivex.Completable
|
||||
import io.reactivex.Flowable
|
||||
import io.reactivex.Observable
|
||||
import io.reactivex.Single
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.math.pow
|
||||
|
@ -40,4 +41,22 @@ fun Completable.retryExponentialBackoff(retries: Int, time: Long, timeUnit: Time
|
|||
).flatMap { retryCount: Int ->
|
||||
Flowable.timer(time * 2.toDouble().pow(retryCount.toDouble()).toLong(), timeUnit)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
inline fun <reified T> Observable<T>.retryWithBackoff(
|
||||
retries: Int,
|
||||
delay: Long,
|
||||
timeUnit: TimeUnit,
|
||||
delayFactor: Double = 1.0
|
||||
): Observable<T> = this.retryWhen {
|
||||
it.zipWith(Observable.range(0, retries + 1), { throwable: Throwable, count: Int ->
|
||||
if (count >= retries) {
|
||||
throw throwable
|
||||
} else {
|
||||
count
|
||||
}
|
||||
}).flatMap { retryCount: Int ->
|
||||
val actualDelay = (timeUnit.toMillis(delay) * delayFactor.pow(retryCount.toDouble())).toLong()
|
||||
Observable.timer(actualDelay, TimeUnit.MILLISECONDS)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,132 @@
|
|||
package info.nightscout.androidaps.utils.rx
|
||||
|
||||
import io.reactivex.Observable
|
||||
import io.reactivex.observers.TestObserver
|
||||
import io.reactivex.schedulers.TestScheduler
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
class RxExtensionsTest {
|
||||
|
||||
private val testScheduler = TestScheduler()
|
||||
|
||||
@get:Rule
|
||||
val schedulerRule = RxSchedulerRule(testScheduler)
|
||||
|
||||
@Test
|
||||
fun `fail after 4 retries`() {
|
||||
val atomicInteger = AtomicInteger()
|
||||
val testObservable: TestObserver<Int> = succeedOnObservable(atomicInteger, 5)
|
||||
.retryWithBackoff(4, 1, TimeUnit.SECONDS)
|
||||
.test()
|
||||
assertEquals(1, atomicInteger.get()) // 1st failure
|
||||
testObservable.assertNotComplete()
|
||||
testObservable.assertNotTerminated()
|
||||
testObservable.assertNever(1)
|
||||
|
||||
testScheduler.advanceTimeBy(3, TimeUnit.SECONDS) // 2nd, 3rd, 4th failure
|
||||
assertEquals(4, atomicInteger.get())
|
||||
testObservable.assertNotComplete()
|
||||
testObservable.assertNotTerminated()
|
||||
testObservable.assertNever(1)
|
||||
|
||||
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS) // 5th failure on 4th retry
|
||||
assertEquals(5, atomicInteger.get())
|
||||
testObservable.assertError(RuntimeException::class.java)
|
||||
testObservable.assertNever(1)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `succeed after 4 retries`() {
|
||||
val atomicInteger = AtomicInteger()
|
||||
val testObservable: TestObserver<Int> = succeedOnObservable(atomicInteger, 4)
|
||||
.retryWithBackoff(4, 1, TimeUnit.SECONDS)
|
||||
.test()
|
||||
assertEquals(1, atomicInteger.get()) // 1st failure
|
||||
testObservable.assertNotComplete()
|
||||
testObservable.assertNotTerminated()
|
||||
testObservable.assertNever(1)
|
||||
|
||||
testScheduler.advanceTimeBy(3, TimeUnit.SECONDS) // 2nd, 3rd, 4th failure
|
||||
assertEquals(4, atomicInteger.get())
|
||||
testObservable.assertNotComplete()
|
||||
testObservable.assertNotTerminated()
|
||||
testObservable.assertNever(1)
|
||||
|
||||
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS) // 5th is a charm
|
||||
assertEquals(5, atomicInteger.get())
|
||||
testObservable.assertValue(1)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `succeed after 4 retries with delay factor`() {
|
||||
val atomicInteger = AtomicInteger()
|
||||
val testObservable: TestObserver<Int> = succeedOnObservable(atomicInteger, 4)
|
||||
.retryWithBackoff(4, 1, TimeUnit.SECONDS, delayFactor = 1.2)
|
||||
.test()
|
||||
assertEquals(1, atomicInteger.get()) // 1st failure
|
||||
testObservable.assertNotComplete()
|
||||
testObservable.assertNotTerminated()
|
||||
testObservable.assertNever(1)
|
||||
|
||||
testScheduler.advanceTimeBy(999, TimeUnit.MILLISECONDS)
|
||||
assertEquals(1, atomicInteger.get())
|
||||
testObservable.assertNotComplete()
|
||||
testObservable.assertNotTerminated()
|
||||
testObservable.assertNever(1)
|
||||
|
||||
testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS) //1st retry after 1 second
|
||||
assertEquals(2, atomicInteger.get())
|
||||
testObservable.assertNotComplete()
|
||||
testObservable.assertNotTerminated()
|
||||
testObservable.assertNever(1)
|
||||
|
||||
testScheduler.advanceTimeBy(1199, TimeUnit.MILLISECONDS)
|
||||
assertEquals(2, atomicInteger.get())
|
||||
testObservable.assertNotComplete()
|
||||
testObservable.assertNotTerminated()
|
||||
testObservable.assertNever(1)
|
||||
|
||||
testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS) //2nd retry after 1.2 seconds more
|
||||
assertEquals(3, atomicInteger.get())
|
||||
testObservable.assertNotComplete()
|
||||
testObservable.assertNotTerminated()
|
||||
testObservable.assertNever(1)
|
||||
|
||||
testScheduler.advanceTimeBy(1439, TimeUnit.MILLISECONDS)
|
||||
assertEquals(3, atomicInteger.get())
|
||||
testObservable.assertNotComplete()
|
||||
testObservable.assertNotTerminated()
|
||||
testObservable.assertNever(1)
|
||||
|
||||
testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS) //3rd retry after 1.44 seconds more
|
||||
assertEquals(4, atomicInteger.get())
|
||||
testObservable.assertNotComplete()
|
||||
testObservable.assertNotTerminated()
|
||||
testObservable.assertNever(1)
|
||||
|
||||
testScheduler.advanceTimeBy(1726, TimeUnit.MILLISECONDS)
|
||||
assertEquals(4, atomicInteger.get())
|
||||
testObservable.assertNotComplete()
|
||||
testObservable.assertNotTerminated()
|
||||
testObservable.assertNever(1)
|
||||
|
||||
//4th retry = 5th try is a charm after 1.728 seconds more - rounding error by 1 millisecond!!
|
||||
testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS)
|
||||
assertEquals(5, atomicInteger.get())
|
||||
testObservable.assertValue(1)
|
||||
}
|
||||
|
||||
private fun succeedOnObservable(atomicInteger: AtomicInteger, initialFailures: Int): Observable<Int> =
|
||||
Observable.defer {
|
||||
if (atomicInteger.incrementAndGet() == initialFailures + 1) {
|
||||
Observable.just(1)
|
||||
} else {
|
||||
Observable.error(RuntimeException())
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
package info.nightscout.androidaps.utils.rx
|
||||
|
||||
import io.reactivex.Scheduler
|
||||
import io.reactivex.android.plugins.RxAndroidPlugins
|
||||
import io.reactivex.plugins.RxJavaPlugins
|
||||
import org.junit.rules.TestRule
|
||||
import org.junit.runner.Description
|
||||
import org.junit.runners.model.Statement
|
||||
|
||||
class RxSchedulerRule(val scheduler: Scheduler) : TestRule {
|
||||
|
||||
override fun apply(base: Statement, description: Description) =
|
||||
object : Statement() {
|
||||
override fun evaluate() {
|
||||
RxAndroidPlugins.reset()
|
||||
RxAndroidPlugins.setInitMainThreadSchedulerHandler { scheduler }
|
||||
RxJavaPlugins.reset()
|
||||
RxJavaPlugins.setIoSchedulerHandler { scheduler }
|
||||
RxJavaPlugins.setNewThreadSchedulerHandler { scheduler }
|
||||
RxJavaPlugins.setComputationSchedulerHandler { scheduler }
|
||||
|
||||
try {
|
||||
base.evaluate()
|
||||
} finally {
|
||||
RxJavaPlugins.reset()
|
||||
RxAndroidPlugins.reset()
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue