NSCv3: move WS to service

This commit is contained in:
Milos Kozak 2023-10-14 12:42:52 +02:00
parent 44b8b30637
commit ec8493405a
7 changed files with 411 additions and 298 deletions

View file

@ -8,6 +8,10 @@
android:name=".nsclient.services.NSClientService"
android:enabled="true"
android:exported="false" />
<service
android:name=".nsclientV3.services.NSClientV3Service"
android:enabled="true"
android:exported="false" />
<activity
android:name=".openhumans.ui.OHLoginActivity"
android:launchMode="singleTop"

View file

@ -16,6 +16,7 @@ import app.aaps.plugins.sync.nsclient.workers.NSClientAddAckWorker
import app.aaps.plugins.sync.nsclient.workers.NSClientAddUpdateWorker
import app.aaps.plugins.sync.nsclient.workers.NSClientMbgWorker
import app.aaps.plugins.sync.nsclient.workers.NSClientUpdateRemoveAckWorker
import app.aaps.plugins.sync.nsclientV3.services.NSClientV3Service
import app.aaps.plugins.sync.nsclientV3.workers.DataSyncWorker
import app.aaps.plugins.sync.nsclientV3.workers.LoadBgWorker
import app.aaps.plugins.sync.nsclientV3.workers.LoadDeviceStatusWorker
@ -48,6 +49,7 @@ abstract class SyncModule {
@ContributesAndroidInjector abstract fun contributesNSClientFragment(): NSClientFragment
@ContributesAndroidInjector abstract fun contributesNSClientService(): NSClientService
@ContributesAndroidInjector abstract fun contributesNSClientV3Service(): NSClientV3Service
@ContributesAndroidInjector abstract fun contributesNSClientWorker(): NSClientAddUpdateWorker
@ContributesAndroidInjector abstract fun contributesNSClientAddAckWorker(): NSClientAddAckWorker
@ContributesAndroidInjector abstract fun contributesNSClientUpdateRemoveAckWorker(): NSClientUpdateRemoveAckWorker

View file

@ -114,7 +114,7 @@ class NSClientPlugin @Inject constructor(
}
override fun onStop() {
context.applicationContext.unbindService(mConnection)
if (nsClientService != null) context.unbindService(mConnection)
disposable.clear()
super.onStop()
}
@ -233,7 +233,7 @@ class NSClientPlugin @Inject constructor(
is DataSyncSelector.PairProfileSwitch -> dataPair.value.interfaceIDs.nightscoutId
is DataSyncSelector.PairEffectiveProfileSwitch -> dataPair.value.interfaceIDs.nightscoutId
is DataSyncSelector.PairOfflineEvent -> dataPair.value.interfaceIDs.nightscoutId
else -> throw IllegalStateException()
else -> error("Unsupported type")
}
when (dataPair) {
is DataSyncSelector.PairBolus -> dataPair.value.toJson(false, dateUtil)

View file

@ -68,7 +68,8 @@ import java.net.URISyntaxException
import java.util.*
import javax.inject.Inject
@Suppress("SpellCheckingInspection") class NSClientService : DaggerService() {
@Suppress("SpellCheckingInspection")
class NSClientService : DaggerService() {
@Inject lateinit var injector: HasAndroidInjector
@Inject lateinit var aapsLogger: AAPSLogger

View file

@ -1,8 +1,12 @@
package app.aaps.plugins.sync.nsclientV3
import android.content.ComponentName
import android.content.Context
import android.content.Intent
import android.content.ServiceConnection
import android.os.Handler
import android.os.HandlerThread
import android.os.IBinder
import android.os.SystemClock
import androidx.preference.PreferenceFragmentCompat
import androidx.preference.PreferenceScreen
@ -17,7 +21,6 @@ import app.aaps.core.interfaces.configuration.Constants
import app.aaps.core.interfaces.db.PersistenceLayer
import app.aaps.core.interfaces.logging.AAPSLogger
import app.aaps.core.interfaces.logging.LTag
import app.aaps.core.interfaces.notifications.Notification
import app.aaps.core.interfaces.nsclient.NSAlarm
import app.aaps.core.interfaces.nsclient.StoreDataForDb
import app.aaps.core.interfaces.plugin.PluginBase
@ -29,7 +32,6 @@ import app.aaps.core.interfaces.rx.AapsSchedulers
import app.aaps.core.interfaces.rx.bus.RxBus
import app.aaps.core.interfaces.rx.events.EventAppExit
import app.aaps.core.interfaces.rx.events.EventDeviceStatusChange
import app.aaps.core.interfaces.rx.events.EventDismissNotification
import app.aaps.core.interfaces.rx.events.EventNSClientNewLog
import app.aaps.core.interfaces.rx.events.EventNewHistoryData
import app.aaps.core.interfaces.rx.events.EventOfflineChange
@ -44,29 +46,21 @@ import app.aaps.core.interfaces.source.NSClientSource
import app.aaps.core.interfaces.sync.DataSyncSelector
import app.aaps.core.interfaces.sync.NsClient
import app.aaps.core.interfaces.sync.Sync
import app.aaps.core.interfaces.ui.UiInteraction
import app.aaps.core.interfaces.utils.DateUtil
import app.aaps.core.interfaces.utils.DecimalFormatter
import app.aaps.core.interfaces.utils.T
import app.aaps.core.interfaces.utils.fabric.FabricPrivacy
import app.aaps.core.nssdk.NSAndroidClientImpl
import app.aaps.core.nssdk.interfaces.NSAndroidClient
import app.aaps.core.nssdk.mapper.toNSDeviceStatus
import app.aaps.core.nssdk.mapper.toNSFood
import app.aaps.core.nssdk.mapper.toNSSgvV3
import app.aaps.core.nssdk.mapper.toNSTreatment
import app.aaps.core.nssdk.remotemodel.LastModified
import app.aaps.database.ValueWrapper
import app.aaps.database.entities.interfaces.TraceableDBEntry
import app.aaps.plugins.sync.R
import app.aaps.plugins.sync.nsShared.NSAlarmObject
import app.aaps.plugins.sync.nsShared.NSClientFragment
import app.aaps.plugins.sync.nsShared.NsIncomingDataProcessor
import app.aaps.plugins.sync.nsShared.events.EventConnectivityOptionChanged
import app.aaps.plugins.sync.nsShared.events.EventNSClientUpdateGuiData
import app.aaps.plugins.sync.nsShared.events.EventNSClientUpdateGuiStatus
import app.aaps.plugins.sync.nsclient.ReceiverDelegate
import app.aaps.plugins.sync.nsclient.data.NSDeviceStatusHandler
import app.aaps.plugins.sync.nsclientV3.extensions.toNSBolus
import app.aaps.plugins.sync.nsclientV3.extensions.toNSBolusWizard
import app.aaps.plugins.sync.nsclientV3.extensions.toNSCarbs
@ -80,6 +74,7 @@ import app.aaps.plugins.sync.nsclientV3.extensions.toNSSvgV3
import app.aaps.plugins.sync.nsclientV3.extensions.toNSTemporaryBasal
import app.aaps.plugins.sync.nsclientV3.extensions.toNSTemporaryTarget
import app.aaps.plugins.sync.nsclientV3.extensions.toNSTherapyEvent
import app.aaps.plugins.sync.nsclientV3.services.NSClientV3Service
import app.aaps.plugins.sync.nsclientV3.workers.DataSyncWorker
import app.aaps.plugins.sync.nsclientV3.workers.LoadBgWorker
import app.aaps.plugins.sync.nsclientV3.workers.LoadDeviceStatusWorker
@ -93,19 +88,11 @@ import com.google.gson.GsonBuilder
import dagger.android.HasAndroidInjector
import io.reactivex.rxjava3.disposables.CompositeDisposable
import io.reactivex.rxjava3.kotlin.plusAssign
import io.socket.client.Ack
import io.socket.client.IO
import io.socket.client.Socket
import io.socket.emitter.Emitter
import kotlinx.serialization.json.Json
import org.json.JSONArray
import org.json.JSONObject
import java.net.URISyntaxException
import java.security.InvalidParameterException
import javax.inject.Inject
import javax.inject.Singleton
@Suppress("SpellCheckingInspection")
@OpenForTesting
@Singleton
class NSClientV3Plugin @Inject constructor(
@ -120,12 +107,9 @@ class NSClientV3Plugin @Inject constructor(
private val receiverDelegate: ReceiverDelegate,
private val config: Config,
private val dateUtil: DateUtil,
private val uiInteraction: UiInteraction,
private val dataSyncSelectorV3: DataSyncSelectorV3,
private val persistenceLayer: PersistenceLayer,
private val nsDeviceStatusHandler: NSDeviceStatusHandler,
private val nsClientSource: NSClientSource,
private val nsIncomingDataProcessor: NsIncomingDataProcessor,
private val storeDataForDb: StoreDataForDb,
private val decimalFormatter: DecimalFormatter
) : NsClient, Sync, PluginBase(
@ -156,28 +140,43 @@ class NSClientV3Plugin @Inject constructor(
override val status
get() =
when {
sp.getBoolean(R.string.key_ns_paused, false) -> rh.gs(app.aaps.core.ui.R.string.paused)
isAllowed.not() -> blockingReason
sp.getBoolean(app.aaps.core.utils.R.string.key_ns_use_ws, true) && wsConnected -> "WS: " + rh.gs(app.aaps.core.interfaces.R.string.connected)
sp.getBoolean(app.aaps.core.utils.R.string.key_ns_use_ws, true) && !wsConnected -> "WS: " + rh.gs(R.string.not_connected)
lastOperationError != null -> rh.gs(app.aaps.core.ui.R.string.error)
nsAndroidClient?.lastStatus == null -> rh.gs(R.string.not_connected)
workIsRunning() -> rh.gs(R.string.working)
nsAndroidClient?.lastStatus?.apiPermissions?.isFull() == true -> rh.gs(app.aaps.core.interfaces.R.string.connected)
nsAndroidClient?.lastStatus?.apiPermissions?.isRead() == true -> rh.gs(R.string.read_only)
else -> rh.gs(app.aaps.core.ui.R.string.unknown)
sp.getBoolean(R.string.key_ns_paused, false) -> rh.gs(app.aaps.core.ui.R.string.paused)
isAllowed.not() -> blockingReason
sp.getBoolean(app.aaps.core.utils.R.string.key_ns_use_ws, true) && nsClientV3Service?.wsConnected == true -> "WS: " + rh.gs(app.aaps.core.interfaces.R.string.connected)
sp.getBoolean(app.aaps.core.utils.R.string.key_ns_use_ws, true) && nsClientV3Service?.wsConnected == false -> "WS: " + rh.gs(R.string.not_connected)
lastOperationError != null -> rh.gs(app.aaps.core.ui.R.string.error)
nsAndroidClient?.lastStatus == null -> rh.gs(R.string.not_connected)
workIsRunning() -> rh.gs(R.string.working)
nsAndroidClient?.lastStatus?.apiPermissions?.isFull() == true -> rh.gs(app.aaps.core.interfaces.R.string.connected)
nsAndroidClient?.lastStatus?.apiPermissions?.isRead() == true -> rh.gs(R.string.read_only)
else -> rh.gs(app.aaps.core.ui.R.string.unknown)
}
var lastOperationError: String? = null
internal var nsAndroidClient: NSAndroidClient? = null
internal var nsClientV3Service: NSClientV3Service? = null
private val isAllowed get() = receiverDelegate.allowed
private val blockingReason get() = receiverDelegate.blockingReason
internal val isAllowed get() = receiverDelegate.allowed
internal val blockingReason get() = receiverDelegate.blockingReason
val maxAge = T.days(100).msecs()
internal var newestDataOnServer: LastModified? = null // timestamp of last modification for every collection provided by server
internal var lastLoadedSrvModified = LastModified(LastModified.Collections()) // max srvLastModified timestamp of last fetched data for every collection
internal var firstLoadContinueTimestamp = LastModified(LastModified.Collections()) // timestamp of last fetched data for every collection during initial load
internal var initialLoadFinished = false
private val serviceConnection: ServiceConnection = object : ServiceConnection {
override fun onServiceDisconnected(name: ComponentName) {
aapsLogger.debug(LTag.NSCLIENT, "Service is disconnected")
nsClientV3Service = null
}
override fun onServiceConnected(name: ComponentName, service: IBinder) {
aapsLogger.debug(LTag.NSCLIENT, "Service is connected")
val localBinder = service as NSClientV3Service.LocalBinder
nsClientV3Service = localBinder.serviceInstance
}
}
override fun onStart() {
super.onStart()
@ -192,15 +191,20 @@ class NSClientV3Plugin @Inject constructor(
setClient("START")
receiverDelegate.grabReceiversState()
disposable += rxBus
.toObservable(EventAppExit::class.java)
.observeOn(aapsSchedulers.io)
.subscribe({ if (nsClientV3Service != null) context.unbindService(serviceConnection) }, fabricPrivacy::logException)
disposable += rxBus
.toObservable(EventConnectivityOptionChanged::class.java)
.observeOn(aapsSchedulers.io)
.subscribe({ ev ->
rxBus.send(EventNSClientNewLog("● CONNECTIVITY", ev.blockingReason))
assert(nsClientV3Service != null)
if (ev.connected) {
when {
isAllowed && storageSocket == null -> setClient("CONNECTIVITY") // socket must be created
!isAllowed && storageSocket != null -> shutdownWebsockets()
isAllowed && nsClientV3Service?.storageSocket == null -> setClient("CONNECTIVITY") // socket must be created
!isAllowed && nsClientV3Service?.storageSocket != null -> shutdownWebsockets()
}
if (isAllowed) executeLoop("CONNECTIVITY", forceNew = false)
}
@ -348,267 +352,22 @@ class NSClientV3Plugin @Inject constructor(
rxBus.send(EventSWSyncStatus(status))
}
/**********************
WS code
**********************/
private var storageSocket: Socket? = null
private var alarmSocket: Socket? = null
internal var wsConnected = false
internal var initialLoadFinished = false
private fun initializeWebSockets(reason: String) {
if (sp.getBoolean(app.aaps.core.utils.R.string.key_ns_use_ws, true)) {
context.bindService(Intent(context, NSClientV3Service::class.java), serviceConnection, Context.BIND_AUTO_CREATE)
while (nsClientV3Service == null) {
aapsLogger.debug(LTag.NSCLIENT, "Waiting for service start")
SystemClock.sleep(100)
}
nsClientV3Service?.initializeWebSockets(reason)
}
}
private fun shutdownWebsockets() {
storageSocket?.on(Socket.EVENT_CONNECT, onConnectStorage)
storageSocket?.on(Socket.EVENT_DISCONNECT, onDisconnectStorage)
storageSocket?.on("create", onDataCreateUpdate)
storageSocket?.on("update", onDataCreateUpdate)
storageSocket?.on("delete", onDataDelete)
storageSocket?.disconnect()
alarmSocket?.on(Socket.EVENT_CONNECT, onConnectAlarms)
alarmSocket?.on(Socket.EVENT_DISCONNECT, onDisconnectAlarm)
alarmSocket?.on("announcement", onAnnouncement)
alarmSocket?.on("alarm", onAlarm)
alarmSocket?.on("urgent_alarm", onUrgentAlarm)
alarmSocket?.on("clear_alarm", onClearAlarm)
alarmSocket?.disconnect()
wsConnected = false
storageSocket = null
alarmSocket = null
if (nsClientV3Service != null) context.unbindService(serviceConnection)
nsClientV3Service?.shutdownWebsockets()
}
private fun initializeWebSockets(reason: String) {
if (!sp.getBoolean(app.aaps.core.utils.R.string.key_ns_use_ws, true)) return
if (sp.getString(app.aaps.core.utils.R.string.key_nsclientinternal_url, "").isEmpty()) return
val urlStorage = sp.getString(app.aaps.core.utils.R.string.key_nsclientinternal_url, "").lowercase().replace(Regex("/$"), "") + "/storage"
val urlAlarm = sp.getString(app.aaps.core.utils.R.string.key_nsclientinternal_url, "").lowercase().replace(Regex("/$"), "") + "/alarm"
if (!isAllowed) {
rxBus.send(EventNSClientNewLog("● WS", blockingReason))
} else if (sp.getBoolean(R.string.key_ns_paused, false)) {
rxBus.send(EventNSClientNewLog("● WS", "paused"))
} else {
try {
// java io.client doesn't support multiplexing. create 2 sockets
storageSocket = IO.socket(urlStorage).also { socket ->
socket.on(Socket.EVENT_CONNECT, onConnectStorage)
socket.on(Socket.EVENT_DISCONNECT, onDisconnectStorage)
rxBus.send(EventNSClientNewLog("► WS", "do connect storage $reason"))
socket.connect()
socket.on("create", onDataCreateUpdate)
socket.on("update", onDataCreateUpdate)
socket.on("delete", onDataDelete)
}
if (sp.getBoolean(app.aaps.core.utils.R.string.key_ns_announcements, config.NSCLIENT) ||
sp.getBoolean(app.aaps.core.utils.R.string.key_ns_alarms, config.NSCLIENT)
)
alarmSocket = IO.socket(urlAlarm).also { socket ->
socket.on(Socket.EVENT_CONNECT, onConnectAlarms)
socket.on(Socket.EVENT_DISCONNECT, onDisconnectAlarm)
rxBus.send(EventNSClientNewLog("► WS", "do connect alarm $reason"))
socket.connect()
socket.on("announcement", onAnnouncement)
socket.on("alarm", onAlarm)
socket.on("urgent_alarm", onUrgentAlarm)
socket.on("clear_alarm", onClearAlarm)
}
} catch (e: URISyntaxException) {
rxBus.send(EventNSClientNewLog("● WS", "Wrong URL syntax"))
} catch (e: RuntimeException) {
rxBus.send(EventNSClientNewLog("● WS", "RuntimeException"))
}
}
}
private val onConnectStorage = Emitter.Listener {
val socketId = storageSocket?.id() ?: "NULL"
rxBus.send(EventNSClientNewLog("◄ WS", "connected storage ID: $socketId"))
if (storageSocket != null) {
val authMessage = JSONObject().also {
it.put("accessToken", sp.getString(R.string.key_ns_client_token, ""))
it.put("collections", JSONArray(arrayOf("devicestatus", "entries", "profile", "treatments", "foods", "settings")))
}
rxBus.send(EventNSClientNewLog("► WS", "requesting auth for storage"))
storageSocket?.emit("subscribe", authMessage, Ack { args ->
val response = args[0] as JSONObject
wsConnected = if (response.optBoolean("success")) {
rxBus.send(EventNSClientNewLog("◄ WS", "Subscribed for: ${response.optString("collections")}"))
// during disconnection updated data is not received
// thus run non WS load to get missing data
executeLoop("WS_CONNECT", forceNew = false)
true
} else {
rxBus.send(EventNSClientNewLog("◄ WS", "Auth failed"))
false
}
rxBus.send(EventNSClientUpdateGuiStatus())
})
}
}
private val onConnectAlarms = Emitter.Listener {
val socket = alarmSocket
val socketId = socket?.id() ?: "NULL"
rxBus.send(EventNSClientNewLog("◄ WS", "connected alarms ID: $socketId"))
if (socket != null) {
val authMessage = JSONObject().also {
it.put("accessToken", sp.getString(R.string.key_ns_client_token, ""))
}
rxBus.send(EventNSClientNewLog("► WS", "requesting auth for alarms"))
socket.emit("subscribe", authMessage, Ack { args ->
val response = args[0] as JSONObject
if (response.optBoolean("success")) rxBus.send(EventNSClientNewLog("◄ WS", response.optString("message")))
else rxBus.send(EventNSClientNewLog("◄ WS", "Auth failed"))
})
}
}
private val onDisconnectStorage = Emitter.Listener { args ->
aapsLogger.debug(LTag.NSCLIENT, "disconnect storage reason: ${args[0]}")
rxBus.send(EventNSClientNewLog("◄ WS", "disconnect storage event"))
wsConnected = false
initialLoadFinished = false
rxBus.send(EventNSClientUpdateGuiStatus())
}
private val onDisconnectAlarm = Emitter.Listener { args ->
aapsLogger.debug(LTag.NSCLIENT, "disconnect alarm reason: ${args[0]}")
rxBus.send(EventNSClientNewLog("◄ WS", "disconnect alarm event"))
}
private val onDataCreateUpdate = Emitter.Listener { args ->
val response = args[0] as JSONObject
aapsLogger.debug(LTag.NSCLIENT, "onDataCreateUpdate: $response")
val collection = response.getString("colName")
val docJson = response.getJSONObject("doc")
val docString = response.getString("doc")
rxBus.send(EventNSClientNewLog("◄ WS CREATE/UPDATE", "$collection <i>$docString</i>"))
val srvModified = docJson.getLong("srvModified")
lastLoadedSrvModified.set(collection, srvModified)
storeLastLoadedSrvModified()
when (collection) {
"devicestatus" -> docString.toNSDeviceStatus().let { nsDeviceStatusHandler.handleNewData(arrayOf(it)) }
"entries" -> docString.toNSSgvV3()?.let {
nsIncomingDataProcessor.processSgvs(listOf(it))
storeDataForDb.storeGlucoseValuesToDb()
}
"profile" ->
nsIncomingDataProcessor.processProfile(docJson)
"treatments" -> docString.toNSTreatment()?.let {
nsIncomingDataProcessor.processTreatments(listOf(it))
storeDataForDb.storeTreatmentsToDb()
}
"foods" -> docString.toNSFood()?.let {
nsIncomingDataProcessor.processFood(listOf(it))
storeDataForDb.storeFoodsToDb()
}
"settings" -> {}
}
}
private val onDataDelete = Emitter.Listener { args ->
val response = args[0] as JSONObject
aapsLogger.debug(LTag.NSCLIENT, "onDataDelete: $response")
val collection = response.optString("colName") ?: return@Listener
val identifier = response.optString("identifier") ?: return@Listener
rxBus.send(EventNSClientNewLog("◄ WS DELETE", "$collection $identifier"))
if (collection == "treatments") {
storeDataForDb.deleteTreatment.add(identifier)
storeDataForDb.updateDeletedTreatmentsInDb()
}
if (collection == "entries") {
storeDataForDb.deleteGlucoseValue.add(identifier)
storeDataForDb.updateDeletedGlucoseValuesInDb()
}
}
private val onAnnouncement = Emitter.Listener { args ->
/*
{
"level":0,
"title":"Announcement",
"message":"test",
"plugin":{"name":"treatmentnotify","label":"Treatment Notifications","pluginType":"notification","enabled":true},
"group":"Announcement",
"isAnnouncement":true,
"key":"9ac46ad9a1dcda79dd87dae418fce0e7955c68da"
}
*/
val data = args[0] as JSONObject
rxBus.send(EventNSClientNewLog("◄ ANNOUNCEMENT", data.optString("message")))
aapsLogger.debug(LTag.NSCLIENT, data.toString())
if (sp.getBoolean(app.aaps.core.utils.R.string.key_ns_announcements, config.NSCLIENT))
uiInteraction.addNotificationWithAction(injector, NSAlarmObject(data))
}
private val onAlarm = Emitter.Listener { args ->
/*
{
"level":1,
"title":"Warning HIGH",
"message":"BG Now: 5 -0.2 → mmol\/L\nRaw BG: 4.8 mmol\/L Čistý\nBG 15m: 4.8 mmol\/L\nIOB: -0.02U\nCOB: 0g",
"eventName":"high",
"plugin":{"name":"simplealarms","label":"Simple Alarms","pluginType":"notification","enabled":true},
"pushoverSound":"climb",
"debug":{"lastSGV":5,"thresholds":{"bgHigh":180,"bgTargetTop":75,"bgTargetBottom":72,"bgLow":70}},
"group":"default",
"key":"simplealarms_1"
}
*/
val data = args[0] as JSONObject
rxBus.send(EventNSClientNewLog("◄ ALARM", data.optString("message")))
aapsLogger.debug(LTag.NSCLIENT, data.toString())
if (sp.getBoolean(app.aaps.core.utils.R.string.key_ns_alarms, config.NSCLIENT)) {
val snoozedTo = sp.getLong(rh.gs(app.aaps.core.utils.R.string.key_snoozed_to) + data.optString("level"), 0L)
if (snoozedTo == 0L || System.currentTimeMillis() > snoozedTo)
uiInteraction.addNotificationWithAction(injector, NSAlarmObject(data))
}
}
private val onUrgentAlarm = Emitter.Listener { args: Array<Any> ->
val data = args[0] as JSONObject
rxBus.send(EventNSClientNewLog("◄ URGENT ALARM", data.optString("message")))
aapsLogger.debug(LTag.NSCLIENT, data.toString())
if (sp.getBoolean(app.aaps.core.utils.R.string.key_ns_alarms, config.NSCLIENT)) {
val snoozedTo = sp.getLong(rh.gs(app.aaps.core.utils.R.string.key_snoozed_to) + data.optString("level"), 0L)
if (snoozedTo == 0L || System.currentTimeMillis() > snoozedTo)
uiInteraction.addNotificationWithAction(injector, NSAlarmObject(data))
}
}
private val onClearAlarm = Emitter.Listener { args ->
/*
{
"clear":true,
"title":"All Clear",
"message":"default - Urgent was ack'd",
"group":"default"
}
*/
val data = args[0] as JSONObject
rxBus.send(EventNSClientNewLog("◄ CLEARALARM", data.optString("title")))
aapsLogger.debug(LTag.NSCLIENT, data.toString())
rxBus.send(EventDismissNotification(Notification.NS_ALARM))
rxBus.send(EventDismissNotification(Notification.NS_URGENT_ALARM))
}
override fun handleClearAlarm(originalAlarm: NSAlarm, silenceTimeInMilliseconds: Long) {
if (!isEnabled()) return
if (!sp.getBoolean(R.string.key_ns_upload, true)) {
aapsLogger.debug(LTag.NSCLIENT, "Upload disabled. Message dropped")
return
}
alarmSocket?.emit("ack", originalAlarm.level(), originalAlarm.group(), silenceTimeInMilliseconds)
rxBus.send(EventNSClientNewLog("► ALARMACK ", "${originalAlarm.level()} ${originalAlarm.group()} $silenceTimeInMilliseconds"))
}
/**********************
WS code end
**********************/
override fun resend(reason: String) {
// If WS is enabled, download is triggered by changes in NS. Thus uploadOnly
// Exception is after reset to full sync (initialLoadFinished == false), where
@ -652,6 +411,15 @@ class NSClientV3Plugin @Inject constructor(
dataSyncSelectorV3.resetToNextFullSync()
}
override fun handleClearAlarm(originalAlarm: NSAlarm, silenceTimeInMilliseconds: Long) {
if (!isEnabled()) return
if (!sp.getBoolean(R.string.key_ns_upload, true)) {
aapsLogger.debug(LTag.NSCLIENT, "Upload disabled. Message dropped")
return
}
nsClientV3Service?.handleClearAlarm(originalAlarm, silenceTimeInMilliseconds)
}
override suspend fun nsAdd(collection: String, dataPair: DataSyncSelector.DataPair, progress: String, profile: Profile?): Boolean =
dbOperation(collection, dataPair, progress, Operation.CREATE, profile)
@ -714,6 +482,7 @@ class NSClientV3Plugin @Inject constructor(
}
} catch (e: Exception) {
aapsLogger.error(LTag.NSCLIENT, "Upload exception", e)
return false
}
return true
}
@ -944,7 +713,7 @@ class NSClientV3Plugin @Inject constructor(
sp.putString(R.string.key_ns_client_v3_last_modified, Json.encodeToString(LastModified.serializer(), lastLoadedSrvModified))
}
private fun executeLoop(origin: String, forceNew: Boolean) {
internal fun executeLoop(origin: String, forceNew: Boolean) {
if (sp.getBoolean(app.aaps.core.utils.R.string.key_ns_use_ws, true) && initialLoadFinished) return
if (sp.getBoolean(R.string.key_ns_paused, false)) {
rxBus.send(EventNSClientNewLog("● RUN", "paused $origin"))

View file

@ -0,0 +1,337 @@
package app.aaps.plugins.sync.nsclientV3.services
import android.annotation.SuppressLint
import android.content.Context
import android.content.Intent
import android.os.Binder
import android.os.Handler
import android.os.HandlerThread
import android.os.IBinder
import android.os.PowerManager
import app.aaps.core.interfaces.configuration.Config
import app.aaps.core.interfaces.logging.AAPSLogger
import app.aaps.core.interfaces.logging.LTag
import app.aaps.core.interfaces.notifications.Notification
import app.aaps.core.interfaces.nsclient.NSAlarm
import app.aaps.core.interfaces.nsclient.StoreDataForDb
import app.aaps.core.interfaces.resources.ResourceHelper
import app.aaps.core.interfaces.rx.bus.RxBus
import app.aaps.core.interfaces.rx.events.*
import app.aaps.core.interfaces.sharedPreferences.SP
import app.aaps.core.interfaces.ui.UiInteraction
import app.aaps.core.interfaces.utils.fabric.FabricPrivacy
import app.aaps.core.nssdk.mapper.toNSDeviceStatus
import app.aaps.core.nssdk.mapper.toNSFood
import app.aaps.core.nssdk.mapper.toNSSgvV3
import app.aaps.core.nssdk.mapper.toNSTreatment
import app.aaps.plugins.sync.R
import app.aaps.plugins.sync.nsShared.NSAlarmObject
import app.aaps.plugins.sync.nsShared.NsIncomingDataProcessor
import app.aaps.plugins.sync.nsShared.events.EventNSClientUpdateGuiStatus
import app.aaps.plugins.sync.nsclient.data.NSDeviceStatusHandler
import app.aaps.plugins.sync.nsclientV3.NSClientV3Plugin
import dagger.android.DaggerService
import dagger.android.HasAndroidInjector
import io.reactivex.rxjava3.disposables.CompositeDisposable
import io.socket.client.Ack
import io.socket.client.IO
import io.socket.client.Socket
import io.socket.emitter.Emitter
import org.json.JSONArray
import org.json.JSONObject
import java.net.URISyntaxException
import java.util.*
import javax.inject.Inject
@Suppress("SpellCheckingInspection")
class NSClientV3Service : DaggerService() {
@Inject lateinit var injector: HasAndroidInjector
@Inject lateinit var aapsLogger: AAPSLogger
@Inject lateinit var rxBus: RxBus
@Inject lateinit var rh: ResourceHelper
@Inject lateinit var sp: SP
@Inject lateinit var fabricPrivacy: FabricPrivacy
@Inject lateinit var nsClientV3Plugin: NSClientV3Plugin
@Inject lateinit var config: Config
@Inject lateinit var nsIncomingDataProcessor: NsIncomingDataProcessor
@Inject lateinit var storeDataForDb: StoreDataForDb
@Inject lateinit var uiInteraction: UiInteraction
@Inject lateinit var nsDeviceStatusHandler: NSDeviceStatusHandler
private val disposable = CompositeDisposable()
private var wakeLock: PowerManager.WakeLock? = null
private val binder: IBinder = LocalBinder()
private val handler = Handler(HandlerThread(this::class.simpleName + "Handler").also { it.start() }.looper)
@SuppressLint("WakelockTimeout")
override fun onCreate() {
super.onCreate()
wakeLock = (getSystemService(Context.POWER_SERVICE) as PowerManager).newWakeLock(PowerManager.PARTIAL_WAKE_LOCK, "AndroidAPS:NSClientService")
wakeLock?.acquire()
}
override fun onDestroy() {
super.onDestroy()
disposable.clear()
if (wakeLock?.isHeld == true) wakeLock?.release()
}
inner class LocalBinder : Binder() {
val serviceInstance: NSClientV3Service
get() = this@NSClientV3Service
}
override fun onBind(intent: Intent): IBinder = binder
override fun onStartCommand(intent: Intent, flags: Int, startId: Int): Int = START_STICKY
internal var storageSocket: Socket? = null
private var alarmSocket: Socket? = null
internal var wsConnected = false
internal fun shutdownWebsockets() {
storageSocket?.on(Socket.EVENT_CONNECT, onConnectStorage)
storageSocket?.on(Socket.EVENT_DISCONNECT, onDisconnectStorage)
storageSocket?.on("create", onDataCreateUpdate)
storageSocket?.on("update", onDataCreateUpdate)
storageSocket?.on("delete", onDataDelete)
storageSocket?.disconnect()
alarmSocket?.on(Socket.EVENT_CONNECT, onConnectAlarms)
alarmSocket?.on(Socket.EVENT_DISCONNECT, onDisconnectAlarm)
alarmSocket?.on("announcement", onAnnouncement)
alarmSocket?.on("alarm", onAlarm)
alarmSocket?.on("urgent_alarm", onUrgentAlarm)
alarmSocket?.on("clear_alarm", onClearAlarm)
alarmSocket?.disconnect()
wsConnected = false
storageSocket = null
alarmSocket = null
}
internal fun initializeWebSockets(reason: String) {
if (sp.getString(app.aaps.core.utils.R.string.key_nsclientinternal_url, "").isEmpty()) return
val urlStorage = sp.getString(app.aaps.core.utils.R.string.key_nsclientinternal_url, "").lowercase().replace(Regex("/$"), "") + "/storage"
val urlAlarm = sp.getString(app.aaps.core.utils.R.string.key_nsclientinternal_url, "").lowercase().replace(Regex("/$"), "") + "/alarm"
if (!nsClientV3Plugin.isAllowed) {
rxBus.send(EventNSClientNewLog("● WS", nsClientV3Plugin.blockingReason))
} else if (sp.getBoolean(R.string.key_ns_paused, false)) {
rxBus.send(EventNSClientNewLog("● WS", "paused"))
} else {
try {
// java io.client doesn't support multiplexing. create 2 sockets
storageSocket = IO.socket(urlStorage).also { socket ->
socket.on(Socket.EVENT_CONNECT, onConnectStorage)
socket.on(Socket.EVENT_DISCONNECT, onDisconnectStorage)
rxBus.send(EventNSClientNewLog("► WS", "do connect storage $reason"))
socket.connect()
socket.on("create", onDataCreateUpdate)
socket.on("update", onDataCreateUpdate)
socket.on("delete", onDataDelete)
}
if (sp.getBoolean(app.aaps.core.utils.R.string.key_ns_announcements, config.NSCLIENT) ||
sp.getBoolean(app.aaps.core.utils.R.string.key_ns_alarms, config.NSCLIENT)
)
alarmSocket = IO.socket(urlAlarm).also { socket ->
socket.on(Socket.EVENT_CONNECT, onConnectAlarms)
socket.on(Socket.EVENT_DISCONNECT, onDisconnectAlarm)
rxBus.send(EventNSClientNewLog("► WS", "do connect alarm $reason"))
socket.connect()
socket.on("announcement", onAnnouncement)
socket.on("alarm", onAlarm)
socket.on("urgent_alarm", onUrgentAlarm)
socket.on("clear_alarm", onClearAlarm)
}
} catch (e: URISyntaxException) {
rxBus.send(EventNSClientNewLog("● WS", "Wrong URL syntax"))
} catch (e: RuntimeException) {
rxBus.send(EventNSClientNewLog("● WS", "RuntimeException"))
}
}
}
private val onConnectStorage = Emitter.Listener {
val socketId = storageSocket?.id() ?: "NULL"
rxBus.send(EventNSClientNewLog("◄ WS", "connected storage ID: $socketId"))
if (storageSocket != null) {
val authMessage = JSONObject().also {
it.put("accessToken", sp.getString(R.string.key_ns_client_token, ""))
it.put("collections", JSONArray(arrayOf("devicestatus", "entries", "profile", "treatments", "foods", "settings")))
}
rxBus.send(EventNSClientNewLog("► WS", "requesting auth for storage"))
storageSocket?.emit("subscribe", authMessage, Ack { args ->
val response = args[0] as JSONObject
wsConnected = if (response.optBoolean("success")) {
rxBus.send(EventNSClientNewLog("◄ WS", "Subscribed for: ${response.optString("collections")}"))
// during disconnection updated data is not received
// thus run non WS load to get missing data
nsClientV3Plugin.executeLoop("WS_CONNECT", forceNew = false)
true
} else {
rxBus.send(EventNSClientNewLog("◄ WS", "Auth failed"))
false
}
rxBus.send(EventNSClientUpdateGuiStatus())
})
}
}
private val onConnectAlarms = Emitter.Listener {
val socket = alarmSocket
val socketId = socket?.id() ?: "NULL"
rxBus.send(EventNSClientNewLog("◄ WS", "connected alarms ID: $socketId"))
if (socket != null) {
val authMessage = JSONObject().also {
it.put("accessToken", sp.getString(R.string.key_ns_client_token, ""))
}
rxBus.send(EventNSClientNewLog("► WS", "requesting auth for alarms"))
socket.emit("subscribe", authMessage, Ack { args ->
val response = args[0] as JSONObject
if (response.optBoolean("success")) rxBus.send(EventNSClientNewLog("◄ WS", response.optString("message")))
else rxBus.send(EventNSClientNewLog("◄ WS", "Auth failed"))
})
}
}
private val onDisconnectStorage = Emitter.Listener { args ->
aapsLogger.debug(LTag.NSCLIENT, "disconnect storage reason: ${args[0]}")
rxBus.send(EventNSClientNewLog("◄ WS", "disconnect storage event"))
wsConnected = false
nsClientV3Plugin.initialLoadFinished = false
rxBus.send(EventNSClientUpdateGuiStatus())
}
private val onDisconnectAlarm = Emitter.Listener { args ->
aapsLogger.debug(LTag.NSCLIENT, "disconnect alarm reason: ${args[0]}")
rxBus.send(EventNSClientNewLog("◄ WS", "disconnect alarm event"))
}
private val onDataCreateUpdate = Emitter.Listener { args ->
val response = args[0] as JSONObject
aapsLogger.debug(LTag.NSCLIENT, "onDataCreateUpdate: $response")
val collection = response.getString("colName")
val docJson = response.getJSONObject("doc")
val docString = response.getString("doc")
rxBus.send(EventNSClientNewLog("◄ WS CREATE/UPDATE", "$collection <i>$docString</i>"))
val srvModified = docJson.getLong("srvModified")
nsClientV3Plugin.lastLoadedSrvModified.set(collection, srvModified)
nsClientV3Plugin.storeLastLoadedSrvModified()
when (collection) {
"devicestatus" -> docString.toNSDeviceStatus().let { nsDeviceStatusHandler.handleNewData(arrayOf(it)) }
"entries" -> docString.toNSSgvV3()?.let {
nsIncomingDataProcessor.processSgvs(listOf(it))
storeDataForDb.storeGlucoseValuesToDb()
}
"profile" ->
nsIncomingDataProcessor.processProfile(docJson)
"treatments" -> docString.toNSTreatment()?.let {
nsIncomingDataProcessor.processTreatments(listOf(it))
storeDataForDb.storeTreatmentsToDb()
}
"foods" -> docString.toNSFood()?.let {
nsIncomingDataProcessor.processFood(listOf(it))
storeDataForDb.storeFoodsToDb()
}
"settings" -> {}
}
}
private val onDataDelete = Emitter.Listener { args ->
val response = args[0] as JSONObject
aapsLogger.debug(LTag.NSCLIENT, "onDataDelete: $response")
val collection = response.optString("colName") ?: return@Listener
val identifier = response.optString("identifier") ?: return@Listener
rxBus.send(EventNSClientNewLog("◄ WS DELETE", "$collection $identifier"))
if (collection == "treatments") {
storeDataForDb.deleteTreatment.add(identifier)
storeDataForDb.updateDeletedTreatmentsInDb()
}
if (collection == "entries") {
storeDataForDb.deleteGlucoseValue.add(identifier)
storeDataForDb.updateDeletedGlucoseValuesInDb()
}
}
private val onAnnouncement = Emitter.Listener { args ->
/*
{
"level":0,
"title":"Announcement",
"message":"test",
"plugin":{"name":"treatmentnotify","label":"Treatment Notifications","pluginType":"notification","enabled":true},
"group":"Announcement",
"isAnnouncement":true,
"key":"9ac46ad9a1dcda79dd87dae418fce0e7955c68da"
}
*/
val data = args[0] as JSONObject
rxBus.send(EventNSClientNewLog("◄ ANNOUNCEMENT", data.optString("message")))
aapsLogger.debug(LTag.NSCLIENT, data.toString())
if (sp.getBoolean(app.aaps.core.utils.R.string.key_ns_announcements, config.NSCLIENT))
uiInteraction.addNotificationWithAction(injector, NSAlarmObject(data))
}
private val onAlarm = Emitter.Listener { args ->
/*
{
"level":1,
"title":"Warning HIGH",
"message":"BG Now: 5 -0.2 → mmol\/L\nRaw BG: 4.8 mmol\/L Čistý\nBG 15m: 4.8 mmol\/L\nIOB: -0.02U\nCOB: 0g",
"eventName":"high",
"plugin":{"name":"simplealarms","label":"Simple Alarms","pluginType":"notification","enabled":true},
"pushoverSound":"climb",
"debug":{"lastSGV":5,"thresholds":{"bgHigh":180,"bgTargetTop":75,"bgTargetBottom":72,"bgLow":70}},
"group":"default",
"key":"simplealarms_1"
}
*/
val data = args[0] as JSONObject
rxBus.send(EventNSClientNewLog("◄ ALARM", data.optString("message")))
aapsLogger.debug(LTag.NSCLIENT, data.toString())
if (sp.getBoolean(app.aaps.core.utils.R.string.key_ns_alarms, config.NSCLIENT)) {
val snoozedTo = sp.getLong(rh.gs(app.aaps.core.utils.R.string.key_snoozed_to) + data.optString("level"), 0L)
if (snoozedTo == 0L || System.currentTimeMillis() > snoozedTo)
uiInteraction.addNotificationWithAction(injector, NSAlarmObject(data))
}
}
private val onUrgentAlarm = Emitter.Listener { args: Array<Any> ->
val data = args[0] as JSONObject
rxBus.send(EventNSClientNewLog("◄ URGENT ALARM", data.optString("message")))
aapsLogger.debug(LTag.NSCLIENT, data.toString())
if (sp.getBoolean(app.aaps.core.utils.R.string.key_ns_alarms, config.NSCLIENT)) {
val snoozedTo = sp.getLong(rh.gs(app.aaps.core.utils.R.string.key_snoozed_to) + data.optString("level"), 0L)
if (snoozedTo == 0L || System.currentTimeMillis() > snoozedTo)
uiInteraction.addNotificationWithAction(injector, NSAlarmObject(data))
}
}
private val onClearAlarm = Emitter.Listener { args ->
/*
{
"clear":true,
"title":"All Clear",
"message":"default - Urgent was ack'd",
"group":"default"
}
*/
val data = args[0] as JSONObject
rxBus.send(EventNSClientNewLog("◄ CLEARALARM", data.optString("title")))
aapsLogger.debug(LTag.NSCLIENT, data.toString())
rxBus.send(EventDismissNotification(Notification.NS_ALARM))
rxBus.send(EventDismissNotification(Notification.NS_URGENT_ALARM))
}
fun handleClearAlarm(originalAlarm: NSAlarm, silenceTimeInMilliseconds: Long) {
alarmSocket?.emit("ack", originalAlarm.level(), originalAlarm.group(), silenceTimeInMilliseconds)
rxBus.send(EventNSClientNewLog("► ALARMACK ", "${originalAlarm.level()} ${originalAlarm.group()} $silenceTimeInMilliseconds"))
}
}

View file

@ -23,14 +23,14 @@ class DataSyncWorker(
@Inject lateinit var nsClientV3Plugin: NSClientV3Plugin
override suspend fun doWorkAndLog(): Result {
if (activePlugin.activeNsClient?.hasWritePermission == true || nsClientV3Plugin.wsConnected) {
if (activePlugin.activeNsClient?.hasWritePermission == true || nsClientV3Plugin.nsClientV3Service?.wsConnected == true) {
rxBus.send(EventNSClientNewLog("► UPL", "Start"))
dataSyncSelectorV3.doUpload()
rxBus.send(EventNSClientNewLog("► UPL", "End"))
} else {
if (activePlugin.activeNsClient?.hasWritePermission == true)
rxBus.send(EventNSClientNewLog("► ERROR", "No write permission"))
else if (nsClientV3Plugin.wsConnected)
else if (nsClientV3Plugin.nsClientV3Service?.wsConnected == true)
rxBus.send(EventNSClientNewLog("► ERROR", "Not connected"))
// refresh token
nsClientV3Plugin.scheduleIrregularExecution(refreshToken = true)