Note: the Channels artifact is DEPRECATED, and will not be present in future RainbowCake versions. To get continuous updates from lower layers, prefer using coroutine Flows. This documentation is kept available as a reference of the implementation present in past releases.
The ChannelViewModel
base class provides the ability to observe updates from channels in a safe and concise way, in addition to providing state handling and coroutine Job execution.
This API, as the coroutine Channel API itself, should be regarded as experimental.
To discuss the API provided, some terms need to be defined first.
Channel: A coroutine Channel
emitting a stream of events. In this use case, they are used to enable the ability of lower layers of the architecture (interactors, data sources) to push events and data to the ViewModel without these events having to be polled for.
Observer: The callbacks that are hooked up to a channel and receive updates (new elements) from it as they’re emitted.
Observation An observation is a collective name for a channel and the callbacks connected to it via the observe
methods. There’s always a one-to-one relation between observations and channels.
The most important provided by ChannelViewModel
is observe
, an extension on the ReceiveChannel<T>
type. The observe
method will not return until the observation is over, so it should almost always be used within a non-blocking execute
call.
class LocationViewModel @Inject constructor(
private val locationPresenter: LocationPresenter
) : ChannelViewModel<LocationViewState>(LocationViewState()) {
init {
executeNonBlocking {
val updates = locationPresenter.getLocationUpdates()
updates.observe { location ->
viewState = viewState.copy(
currentLocation = location
)
}
}
}
}
When an observation is cancelled, the ViewModel stops receiving updates from its channel and the channel itself is cancelled as well.
This means that only a single observe
call should ever be made to a channel. Subsequently, any classes in lower layers should always return new channel instances when one is requested from them, as they can not be reused due to the way they are tied to the observations.
Cancellation also happens the other way around: if a channel is cancelled on the supplier’s end, in the lower layer, the observation will be cleaned up as well (after running the appropriate callbacks of observe
).
The trailing lambda seen in the code above receives the elements that the Channel emits. There are two additional callbacks that can be specified to handle the observation ending:
updates.observe(
onCancelled = { Timber.d("Channel was cancelled") },
onClosed = { Timber.d("Channel was closed") }
) { location ->
viewState = viewState.copy(
currentLocation = location
)
}
onClosed
: A callback for when the channel is closed. This may be because the data that was being supplied has ran out, because an exception happened somewhere along the way, or because the ViewModel was cleared and therefore cancelled the observation.onCancelled
: Similar to onClosed
, but will only be called if the channel has terminated exceptionally due to an Exception being thrown in the lower layers. When this happens, onClosed
will still be invoked, but this callback runs first.Starting an observation in an init
block is the simplest way to do it. The channel will be requested when the ViewModel is created, observed while it exists, and closed when the ViewModel is cleared. It may also be required to observe a channel more dynamically, starting and stopping the observation based on various events (e.g. user interaction) during the lifetime of a ViewModel.
For these use cases, an optional key
can be provided to the observe
method. Keyed observations are unique, two observations with the same key
can not exist at the same time.
The replaceExisting
parameter of observe
can be used to indicate whether to replace the existing observation for the given key
with the one being created now, if it exists. If this parameter is false and the key
is already taken, the observe
call is a no-op and terminates silently. The channel that was to be observed will be cancelled.
class LocationViewModel @Inject constructor(
private val locationPresenter: LocationPresenter
) : ChannelViewModel<LocationViewState>(LocationViewState()) {
fun startLocationUpdates() = executeNonBlocking {
val updates = locationPresenter.getLocationUpdates()
updates.observe(
key = "location_key",
replaceExisting = true
) { location ->
viewState = viewState.copy(
currentLocation = location
)
}
}
}
Note that this is wasteful because getting hold of the channel might have already activated callbacks and other mechanisms in underlying data sources, which will have to be shut down.
If this activation and shutdown has significant costs in a specific use case, the isObserving
should be used to check if there’s an active observation for the given key
first. A removeObserver
method is also available to cancel an ongoing observation with a given key
.
fun startLocationUpdates() = executeNonBlocking {
if (isObserving("location_key")) {
return@executeNonBlocking
}
val updates = locationPresenter.getLocationUpdates()
updates.observe( ... )
}
// This method doesn't need to start a coroutine,
// as removing an observer is synchronous
fun stopLocationUpdates() {
removeObserver("location_key")
}
The channels that are observed in the ViewModel have to be created and supplied with data in the data layer. Depending on the API that provides it and the use case at hand, this channel creation will fall into one of two categories.
A “single-engine” source will be activated just once, when the first channel is requested from it. For any additional requests, it only needs to create a channel. Whatever callback API it activated or data source it connected to will supply all created channels on its own. The source will be deactivated if it no longer has channels to supply with data.
An example of this could be a GPS data source, which will only activate the Fused Location API once, and when it receives an update from it, send it to all created channels.
To manage the collection of active channels, a ChannelCollection
interface and a default implementation of it (SynchronizedChannelCollection
) are provided. While extending this default implementation might be an issue for callback-based APIs where a callback already needs to extend a given callback base class, it can be easily delegated to instead:
class MultiChannelLocationCallback(
channelCollection: ChannelCollection<Location> = SynchronizedChannelCollection()
) : LocationCallback(), ChannelCollection<Location> by channelCollection {
override fun onLocationResult(locationResult: LocationResult?) {
locationResult ?: return
forEachChannel { it.offer(locationResult.lastLocation) }
}
// ...
}
All channels can be supplied with the data from the callback using the forEachChannel
method.
The data source implementation can use this callback the following way:
produceInIOContext
function, a simple wrapper around produce
that just sets the context to be used.resultChannel
) is created, and added to the callback collection.produceInIOContext
reads the updates from resultChannel
, and transfers them to its own channel, now in the appropriate coroutine context.class SingleEngineGpsDataSource @Inject constructor(private val context: Context) {
private val callback = MultiChannelLocationCallback()
fun getLocationEvents(): ReceiveChannel<Location> = produceInIOContext {
val resultChannel = Channel<Location>(CONFLATED)
val request = LocationRequest().apply { ... }
val firstChannelAdded = callback.addChannel(resultChannel)
if (firstChannelAdded) {
startLocationUpdates(request, callback)
}
try {
for (location in resultChannel) {
send(location)
}
} finally {
val anyChannelsRemaining = callback.removeChannel(resultChannel)
if (!anyChannelsRemaining) {
stopLocationUpdates(callback)
}
}
}
private fun startLocationUpdates(
request: LocationRequest,
callback: LocationCallback
) {
LocationServices.getFusedLocationProviderClient(context)
.requestLocationUpdates(request, callback, Looper.getMainLooper())
}
private fun stopLocationUpdates(
callback: LocationCallback
) {
LocationServices.getFusedLocationProviderClient(context)
.removeLocationUpdates(callback)
}
}
The second type is a “multi-engine” source, which sounds more complicated, but is actually simpler to implement. Every time a channel is requested from this source, it will start fetching data and supplying the channel. When the channel is cancelled, the mechanism supplying it is also shut down.
For this example, we’ll use the Bluetooth API to scan for devices with various UUID filters. This will require the registration of a new callback for each call to our data source. Since our callback will only serve a single channel, it will receive just this channel as its parameter:
class ChannelScanCallback(private val channel: SendChannel<BluetoothDevice>)
: ScanCallback() {
override fun onScanFailed(errorCode: Int) {
channel.close()
}
override fun onScanResult(callbackType: Int, result: ScanResult) {
channel.offer(result.device)
}
}
The data source implementation is very similar to before, but without the code that was required to manage the channel collection.
scan
is created by the produceInIOContext
function.resultChannel
) is created to be passed into the callback, which is created for each call to scan
, instead of at the class level.resultChannel
is read from in a loop, the data in it is forwarded into the returned channel.class MultiEngineBluetoothDataSource @Inject constructor(
private val scanner: BluetoothLeScanner
) {
suspend fun scan(uuid: UUID): ReceiveChannel<BluetoothDevice>
= produceInIOContext(capacity = 100) {
val resultChannel = Channel<BluetoothDevice>(capacity = 100)
val scanCallback = ChannelScanCallback(resultChannel)
val settings = ScanSettings.Builder()
.setScanMode(ScanSettings.SCAN_MODE_LOW_LATENCY)
.build()
val filter = ScanFilter.Builder()
.setServiceUuid(ParcelUuid(uuid))
.build()
scanner.startScan(listOf(filter), settings, scanCallback)
try {
for (device in resultChannel) {
send(device)
}
} finally {
scanner.stopScan(scanCallback)
}
}
}