- Implemented bolus cancellation

- Improved bolus progress report
- Replaced delayed callbacks in OmnipodManager with RxJava
This commit is contained in:
Bart Sopers 2019-12-05 22:11:11 +01:00
parent 342765c5a2
commit ea5ff4cc3a
6 changed files with 204 additions and 113 deletions

View file

@ -8,8 +8,6 @@ import org.slf4j.LoggerFactory;
import java.util.EnumSet;
import java.util.TimeZone;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import info.nightscout.androidaps.logging.L;
@ -47,10 +45,14 @@ import info.nightscout.androidaps.plugins.pump.omnipod.exception.NonceOutOfSyncE
import info.nightscout.androidaps.plugins.pump.omnipod.exception.OmnipodException;
import info.nightscout.androidaps.plugins.pump.omnipod.util.OmnipodConst;
import info.nightscout.androidaps.utils.SP;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.subjects.SingleSubject;
import static info.nightscout.androidaps.plugins.pump.omnipod.util.OmnipodConst.AVERAGE_BOLUS_COMMAND_COMMUNICATION_DURATION;
public class OmnipodManager {
private static final int ACTION_VERIFICATION_TRIES = 3;
@ -61,6 +63,9 @@ public class OmnipodManager {
private final PodStateChangedHandler podStateChangedHandler;
protected PodSessionState podState;
private ActiveBolusData activeBolusData;
private final Object bolusDataLock = new Object();
public OmnipodManager(OmnipodCommunicationService communicationService, PodSessionState podState,
PodStateChangedHandler podStateChangedHandler) {
if (communicationService == null) {
@ -78,27 +83,26 @@ public class OmnipodManager {
this(communicationService, podState, null);
}
// After priming should have been finished, the pod state is verified.
// The result of that verification is passed to the SetupActionResultHandler
public void pairAndPrime(SetupActionResultHandler resultHandler) {
public synchronized Single<SetupActionResult> pairAndPrime() {
if (podState == null) {
podState = communicationService.executeAction(
new PairAction(new PairService(), podStateChangedHandler));
}
if (podState.getSetupProgress().isBefore(SetupProgress.PRIMING_FINISHED)) {
communicationService.executeAction(new PrimeAction(new PrimeService(), podState));
executeDelayed(() -> verifySetupAction(statusResponse -> PrimeAction.updatePrimingStatus(podState, statusResponse), //
SetupProgress.PRIMING_FINISHED, resultHandler), //
calculateBolusDuration(OmnipodConst.POD_PRIME_BOLUS_UNITS, OmnipodConst.POD_PRIMING_DELIVERY_RATE));
} else {
if (!podState.getSetupProgress().isBefore(SetupProgress.PRIMING_FINISHED)) {
throw new IllegalSetupProgressException(SetupProgress.ADDRESS_ASSIGNED, podState.getSetupProgress());
}
communicationService.executeAction(new PrimeAction(new PrimeService(), podState));
long delayInSeconds = calculateBolusDuration(OmnipodConst.POD_PRIME_BOLUS_UNITS, OmnipodConst.POD_PRIMING_DELIVERY_RATE).getStandardSeconds();
return Single.timer(delayInSeconds, TimeUnit.SECONDS) //
.map(o -> verifySetupAction(statusResponse ->
PrimeAction.updatePrimingStatus(podState, statusResponse), SetupProgress.PRIMING_FINISHED)) //
.observeOn(AndroidSchedulers.mainThread());
}
// After inserting the cannula should have been finished, the pod state is verified.
// The result of that verification is passed to the SetupActionResultHandler
public void insertCannula(BasalSchedule basalSchedule, SetupActionResultHandler resultHandler) {
public synchronized Single<SetupActionResult> insertCannula(BasalSchedule basalSchedule) {
if (podState == null || podState.getSetupProgress().isBefore(SetupProgress.PRIMING_FINISHED)) {
throw new IllegalSetupProgressException(SetupProgress.PRIMING_FINISHED, podState == null ? null : podState.getSetupProgress());
} else if (podState.getSetupProgress().isAfter(SetupProgress.CANNULA_INSERTING)) {
@ -107,12 +111,14 @@ public class OmnipodManager {
communicationService.executeAction(new InsertCannulaAction(new InsertCannulaService(), podState, basalSchedule));
executeDelayed(() -> verifySetupAction(statusResponse -> InsertCannulaAction.updateCannulaInsertionStatus(podState, statusResponse), //
SetupProgress.COMPLETED, resultHandler),
calculateBolusDuration(OmnipodConst.POD_CANNULA_INSERTION_BOLUS_UNITS, OmnipodConst.POD_CANNULA_INSERTION_DELIVERY_RATE));
long delayInSeconds = calculateBolusDuration(OmnipodConst.POD_CANNULA_INSERTION_BOLUS_UNITS, OmnipodConst.POD_CANNULA_INSERTION_DELIVERY_RATE).getStandardSeconds();
return Single.timer(delayInSeconds, TimeUnit.SECONDS) //
.map(o -> verifySetupAction(statusResponse ->
InsertCannulaAction.updateCannulaInsertionStatus(podState, statusResponse), SetupProgress.COMPLETED)) //
.observeOn(AndroidSchedulers.mainThread());
}
public StatusResponse getPodStatus() {
public synchronized StatusResponse getPodStatus() {
if (podState == null) {
throw new IllegalSetupProgressException(SetupProgress.PRIMING_FINISHED, null);
}
@ -120,26 +126,26 @@ public class OmnipodManager {
return communicationService.executeAction(new GetStatusAction(podState));
}
public PodInfoResponse getPodInfo(PodInfoType podInfoType) {
public synchronized PodInfoResponse getPodInfo(PodInfoType podInfoType) {
assertReadyForDelivery();
return communicationService.executeAction(new GetPodInfoAction(podState, podInfoType));
}
public void acknowledgeAlerts() {
public synchronized void acknowledgeAlerts() {
assertReadyForDelivery();
communicationService.executeAction(new AcknowledgeAlertsAction(podState, podState.getActiveAlerts()));
}
public void setBasalSchedule(BasalSchedule schedule) {
public synchronized void setBasalSchedule(BasalSchedule schedule) {
assertReadyForDelivery();
communicationService.executeAction(new SetBasalScheduleAction(podState, schedule,
false, podState.getScheduleOffset(), true));
}
public void setTemporaryBasal(TempBasalPair tempBasalPair) {
public synchronized void setTemporaryBasal(TempBasalPair tempBasalPair) {
assertReadyForDelivery();
communicationService.executeAction(new SetTempBasalAction(new SetTempBasalService(),
@ -147,88 +153,97 @@ public class OmnipodManager {
true, true));
}
public void cancelTemporaryBasal() {
public synchronized void cancelTemporaryBasal() {
assertReadyForDelivery();
communicationService.executeAction(new CancelDeliveryAction(podState, DeliveryType.TEMP_BASAL, true));
}
public Single<StatusResponse> bolus(Double units, BolusProgressIndicationConsumer progressIndicationConsumer) {
// Returns a SingleSubject that returns when the bolus has finished.
// When a bolus is cancelled, it will return after cancellation and report the estimated units delivered
public synchronized SingleSubject<BolusResult> bolus(Double units, BolusProgressIndicationConsumer progressIndicationConsumer) {
assertReadyForDelivery();
try {
communicationService.executeAction(new BolusAction(podState, units, true, true));
} catch (Exception ex) {
if (isCertainFailure(ex)) {
throw ex;
} else {
CommandVerificationResult verificationResult = verifyCommand();
switch (verificationResult) {
case CERTAIN_FAILURE:
if (ex instanceof OmnipodException) {
((OmnipodException) ex).setCertainFailure(true);
throw ex;
} else {
OmnipodException newException = new CommunicationException(CommunicationException.Type.UNEXPECTED_EXCEPTION, ex);
newException.setCertainFailure(true);
throw newException;
}
case UNCERTAIN_FAILURE:
throw ex;
case SUCCESS:
// Ignore original exception
break;
}
}
}
executeAndVerify(() -> communicationService.executeAction(new BolusAction(podState, units, true, true)));
DateTime startDate = DateTime.now().minus(AVERAGE_BOLUS_COMMAND_COMMUNICATION_DURATION);
CompositeDisposable disposables = new CompositeDisposable();
Duration bolusDuration = calculateBolusDuration(units, OmnipodConst.POD_BOLUS_DELIVERY_RATE);
Duration estimatedRemainingBolusDuration = bolusDuration.minus(AVERAGE_BOLUS_COMMAND_COMMUNICATION_DURATION);
if (progressIndicationConsumer != null) {
int numberOfProgressReports = 20;
long progressReportInterval = bolusDuration.getMillis() / numberOfProgressReports;
long progressReportInterval = estimatedRemainingBolusDuration.getMillis() / numberOfProgressReports;
disposables.add(Flowable.intervalRange(0, numberOfProgressReports, 0, progressReportInterval, TimeUnit.MILLISECONDS) //
disposables.add(Flowable.intervalRange(1, numberOfProgressReports, 0, progressReportInterval, TimeUnit.MILLISECONDS) //
.subscribe(count -> {
// TODO needs improvement
// take (average) radio communication time into account
double factor = (double)count / numberOfProgressReports;
// Round estimated unites delivered to pod pulse size 0.05
int roundingDivisor = (int) (1 / OmnipodConst.POD_PULSE_SIZE);
double estimatedUnitsDelivered = Math.round(factor * units * roundingDivisor) / roundingDivisor;
progressIndicationConsumer.accept(estimatedUnitsDelivered, (int) (factor * 100));
int percentage = (int) ((double) count / numberOfProgressReports * 100);
double estimatedUnitsDelivered = activeBolusData == null ? 0 : activeBolusData.estimateUnitsDelivered();
progressIndicationConsumer.accept(estimatedUnitsDelivered, percentage);
}));
}
return Single.create(emitter -> {
executeDelayed(() -> {
try {
StatusResponse statusResponse = getPodStatus();
if (statusResponse.getDeliveryStatus().isBolusing()) {
emitter.onError(new IllegalDeliveryStatusException(DeliveryStatus.NORMAL, statusResponse.getDeliveryStatus()));
} else {
emitter.onSuccess(statusResponse);
SingleSubject<BolusResult> bolusCompletionSubject = SingleSubject.create();
disposables.add(Completable.complete()
.delay(estimatedRemainingBolusDuration.getStandardSeconds(), TimeUnit.SECONDS)
.doOnComplete(() -> {
synchronized (bolusDataLock) {
for (int i = 0; i < ACTION_VERIFICATION_TRIES; i++) {
try {
// Retrieve a status response in order to update the pod state
StatusResponse statusResponse = getPodStatus();
if (statusResponse.getDeliveryStatus().isBolusing()) {
throw new IllegalDeliveryStatusException(DeliveryStatus.NORMAL, statusResponse.getDeliveryStatus());
} else {
break;
}
} catch (Exception ex) {
if (isLoggingEnabled()) {
LOG.debug("Ignoring exception in bolus completion verfication", ex);
}
}
}
if (activeBolusData != null) {
activeBolusData.bolusCompletionSubject.onSuccess(new BolusResult(units));
activeBolusData = null;
}
}
} catch (Exception ex) {
emitter.onError(ex);
}
}, bolusDuration);
});
})
.subscribe());
synchronized (bolusDataLock) {
activeBolusData = new ActiveBolusData(units, startDate, bolusCompletionSubject, disposables);
}
return bolusCompletionSubject;
}
public void cancelBolus() {
public synchronized void cancelBolus() {
assertReadyForDelivery();
communicationService.executeAction(new CancelDeliveryAction(podState, DeliveryType.BOLUS, true));
synchronized (bolusDataLock) {
if (activeBolusData == null) {
throw new IllegalDeliveryStatusException(DeliveryStatus.BOLUS_IN_PROGRESS, podState.getLastDeliveryStatus());
}
executeAndVerify(() -> communicationService.executeAction(new CancelDeliveryAction(podState, DeliveryType.BOLUS, true)));
activeBolusData.getDisposables().dispose();
activeBolusData.getBolusCompletionSubject().onSuccess(new BolusResult(activeBolusData.estimateUnitsDelivered()));
activeBolusData = null;
}
}
public void suspendDelivery() {
public synchronized void suspendDelivery() {
assertReadyForDelivery();
communicationService.executeAction(new CancelDeliveryAction(podState, EnumSet.allOf(DeliveryType.class), true));
}
public void resumeDelivery() {
public synchronized void resumeDelivery() {
assertReadyForDelivery();
communicationService.executeAction(new SetBasalScheduleAction(podState, podState.getBasalSchedule(),
@ -236,7 +251,7 @@ public class OmnipodManager {
}
// If this command fails, it it possible that delivery has been suspended
public void setTime() {
public synchronized void setTime() {
assertReadyForDelivery();
// Suspend delivery
@ -251,7 +266,7 @@ public class OmnipodManager {
true, podState.getScheduleOffset(), true));
}
public void deactivatePod() {
public synchronized void deactivatePod() {
if (podState == null) {
throw new IllegalSetupProgressException(SetupProgress.ADDRESS_ASSIGNED, null);
}
@ -285,9 +300,33 @@ public class OmnipodManager {
return podState == null ? "null" : podState.toString();
}
private void executeDelayed(Runnable r, Duration timeout) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.schedule(r, timeout.getMillis(), TimeUnit.MILLISECONDS);
// Only works for commands with nonce resyncable message blocks
private void executeAndVerify(Runnable runnable) {
try {
runnable.run();
} catch (Exception ex) {
if (isCertainFailure(ex)) {
throw ex;
} else {
CommandVerificationResult verificationResult = verifyCommand();
switch (verificationResult) {
case CERTAIN_FAILURE:
if (ex instanceof OmnipodException) {
((OmnipodException) ex).setCertainFailure(true);
throw ex;
} else {
OmnipodException newException = new CommunicationException(CommunicationException.Type.UNEXPECTED_EXCEPTION, ex);
newException.setCertainFailure(true);
throw newException;
}
case UNCERTAIN_FAILURE:
throw ex;
case SUCCESS:
// Ignore original exception
break;
}
}
}
}
private void assertReadyForDelivery() {
@ -296,7 +335,7 @@ public class OmnipodManager {
}
}
private void verifySetupAction(StatusResponseHandler setupActionResponseHandler, SetupProgress expectedSetupProgress, SetupActionResultHandler resultHandler) {
private SetupActionResult verifySetupAction(StatusResponseHandler setupActionResponseHandler, SetupProgress expectedSetupProgress) {
SetupActionResult result = null;
for (int i = 0; ACTION_VERIFICATION_TRIES > i; i++) {
try {
@ -316,9 +355,7 @@ public class OmnipodManager {
.exception(ex);
}
}
if (resultHandler != null) {
resultHandler.handle(result);
}
return result;
}
// Only works for commands which contain nonce resyncable message blocks
@ -353,21 +390,85 @@ public class OmnipodManager {
return L.isEnabled(L.PUMP);
}
public static Duration calculateBolusDuration(double units) {
return calculateBolusDuration(units, OmnipodConst.POD_BOLUS_DELIVERY_RATE);
}
private static Duration calculateBolusDuration(double units, double deliveryRate) {
return Duration.standardSeconds((long) Math.ceil(units / deliveryRate));
}
public static Duration calculateBolusDuration(double units) {
return calculateBolusDuration(units, OmnipodConst.POD_BOLUS_DELIVERY_RATE);
}
public static boolean isCertainFailure(Exception ex) {
return ex instanceof OmnipodException && ((OmnipodException) ex).isCertainFailure();
}
public static class BolusResult {
private final double unitsDelivered;
public BolusResult(double unitsDelivered) {
this.unitsDelivered = unitsDelivered;
}
public double getUnitsDelivered() {
return unitsDelivered;
}
}
private enum CommandVerificationResult {
SUCCESS,
CERTAIN_FAILURE,
UNCERTAIN_FAILURE
}
// TODO replace with Consumer when our min API level >= 24
@FunctionalInterface
private interface StatusResponseHandler {
void handle(StatusResponse statusResponse);
}
private static class ActiveBolusData {
private final double units;
private volatile DateTime startDate;
private volatile SingleSubject<BolusResult> bolusCompletionSubject;
private volatile CompositeDisposable disposables;
private ActiveBolusData(double units, DateTime startDate, SingleSubject<BolusResult> bolusCompletionSubject, CompositeDisposable disposables) {
this.units = units;
this.startDate = startDate;
this.bolusCompletionSubject = bolusCompletionSubject;
this.disposables = disposables;
}
public double getUnits() {
return units;
}
public DateTime getStartDate() {
return startDate;
}
public CompositeDisposable getDisposables() {
return disposables;
}
public SingleSubject<BolusResult> getBolusCompletionSubject() {
return bolusCompletionSubject;
}
public void setBolusCompletionSubject(SingleSubject<BolusResult> bolusCompletionSubject) {
this.bolusCompletionSubject = bolusCompletionSubject;
}
public double estimateUnitsDelivered() {
// TODO this needs improvement
// take (average) radio communication time into account
long elapsedMillis = new Duration(startDate, DateTime.now()).getMillis();
long totalDurationMillis = (long) (units / OmnipodConst.POD_BOLUS_DELIVERY_RATE * 1000);
double factor = (double) elapsedMillis / totalDurationMillis;
double estimatedUnits = Math.min(1D, factor) * units;
int roundingDivisor = (int) (1 / OmnipodConst.POD_PULSE_SIZE);
return (double) Math.round(estimatedUnits * roundingDivisor) / roundingDivisor;
}
}
}

View file

@ -1,7 +0,0 @@
package info.nightscout.androidaps.plugins.pump.omnipod.comm;
// TODO replace with Consumer when our min API level >= 24
@FunctionalInterface
public interface SetupActionResultHandler {
void handle(SetupActionResult result);
}

View file

@ -1,9 +0,0 @@
package info.nightscout.androidaps.plugins.pump.omnipod.comm;
import info.nightscout.androidaps.plugins.pump.omnipod.comm.message.response.StatusResponse;
// TODO replace with Consumer when our min API level >= 24
@FunctionalInterface
public interface StatusResponseHandler {
void handle(StatusResponse statusResponse);
}

View file

@ -48,6 +48,7 @@ import info.nightscout.androidaps.plugins.pump.omnipod.exception.PodFaultExcepti
import info.nightscout.androidaps.plugins.pump.omnipod.exception.PodReturnedErrorResponseException;
import info.nightscout.androidaps.plugins.pump.omnipod.util.OmnipodUtil;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
public class AapsOmnipodManager implements OmnipodCommunicationManagerInterface {
private static final Logger LOG = LoggerFactory.getLogger(L.PUMP);
@ -73,7 +74,7 @@ public class AapsOmnipodManager implements OmnipodCommunicationManagerInterface
public PumpEnactResult initPod(PodInitActionType podInitActionType, PodInitReceiver podInitReceiver, Profile profile) {
if (PodInitActionType.PairAndPrimeWizardStep.equals(podInitActionType)) {
try {
delegate.pairAndPrime(res -> //
Disposable disposable = delegate.pairAndPrime().subscribe(res -> //
handleSetupActionResult(podInitActionType, podInitReceiver, res));
return new PumpEnactResult().success(true).enacted(true);
} catch (Exception ex) {
@ -83,7 +84,7 @@ public class AapsOmnipodManager implements OmnipodCommunicationManagerInterface
}
} else if (PodInitActionType.FillCannulaSetBasalProfileWizardStep.equals(podInitActionType)) {
try {
delegate.insertCannula(mapProfileToBasalSchedule(profile), res -> //
Disposable disposable = delegate.insertCannula(mapProfileToBasalSchedule(profile)).subscribe(res -> //
handleSetupActionResult(podInitActionType, podInitReceiver, res));
return new PumpEnactResult().success(true).enacted(true);
} catch (Exception ex) {
@ -150,7 +151,7 @@ public class AapsOmnipodManager implements OmnipodCommunicationManagerInterface
@Override
public PumpEnactResult setBolus(Double units) {
try {
Single<StatusResponse> responseObserver = delegate.bolus(units,
Single<OmnipodManager.BolusResult> responseObserver = delegate.bolus(units,
(estimatedUnitsDelivered, percentage) -> {
EventOverviewBolusProgress progressUpdateEvent = EventOverviewBolusProgress.INSTANCE;
progressUpdateEvent.setStatus(getStringResource(R.string.bolusdelivering, units));
@ -160,16 +161,19 @@ public class AapsOmnipodManager implements OmnipodCommunicationManagerInterface
// At this point, we know that the bolus command has been succesfully sent
double unitsDelivered = units;
try {
// Wait for the bolus to finish
StatusResponse statusResponse = responseObserver.blockingGet();
OmnipodManager.BolusResult bolusResult = responseObserver.blockingGet();
unitsDelivered = bolusResult.getUnitsDelivered();
} catch (Exception ex) {
if (loggingEnabled()) {
LOG.debug("Ignoring failed status response for bolus completion verification", ex);
}
}
return new PumpEnactResult().success(true).enacted(true);
return new PumpEnactResult().success(true).enacted(true).bolusDelivered(unitsDelivered);
} catch (Exception ex) {
// Sending the command failed
String comment = handleAndTranslateException(ex);
@ -179,7 +183,7 @@ public class AapsOmnipodManager implements OmnipodCommunicationManagerInterface
// TODO notify user about uncertain failure
// we don't know if the bolus failed, so for safety reasons, we choose to register the bolus as succesful.
// TODO also manually sleep until the bolus should have been finished here (after notifying the user)
return new PumpEnactResult().success(true).enacted(true).comment(comment);
return new PumpEnactResult().success(true).enacted(true).comment(comment).bolusDelivered(units);
}
}
}

View file

@ -31,7 +31,7 @@ public class OmnipodUIComm {
}
public synchronized OmnipodUITask executeCommand(OmnipodCommandType commandType, Object... parameters) {
public OmnipodUITask executeCommand(OmnipodCommandType commandType, Object... parameters) {
if (isLogEnabled())
LOG.warn("Execute Command: " + commandType.name());

View file

@ -35,6 +35,8 @@ public class OmnipodConst {
public static final Duration MAX_TEMP_BASAL_DURATION = Duration.standardHours(12);
public static final int DEFAULT_ADDRESS = 0xffffffff;
public static final Duration AVERAGE_BOLUS_COMMAND_COMMUNICATION_DURATION = Duration.standardSeconds(2);
public static final Duration SERVICE_DURATION = Duration.standardHours(80);
public static final Duration EXPIRATION_ADVISORY_WINDOW = Duration.standardHours(2);
public static final Duration END_OF_SERVICE_IMMINENT_WINDOW = Duration.standardHours(1);