Kotlin Flows in Android

Aron
5 min readAug 3, 2022

Reactive programming, where data is pushed not pulled.

Fetch water from the source

Panda wants to fetch water from the reservoir for his day-to-day activities. He has to walk daily to and fro. One day he finds that water is not there after reaching the reservoir. Then he realizes his current way of fetching water is in-efficient and comes up with a brilliant solution

A pipe to fetch water

He builds a pipe from the reservoir to his place, now he can collect water when required by just opening the tap. (This pipe is called Flow) :)

Benefits of LiveData

All subscribers(new and old subscribers) would always get the latest data.

Always check if the subscriber is active before publishing data.

Survive activity recreation i.e subscribers would always get data on moving from inactive state to active state.

Then why Flows when we already have LiveData

  • It is part of the Android framework, so it makes sense to use them in your UI layer, but when going down to domain and data layers using android dependencies does not make sense.
  • Lack of control over the execution context
  • Lots of Boiler Plate Codes especially while using Transformations

What are Flows

There are 3 components to a flow Producer, Intermediary(optional), Consumer

We can use transformers to change or modify our data like map lambda, each lambda creates a flow and returns it

A flow can emit multiple values sequentially unlike suspend functions which are used for one-shot operations. Flow is declarative/cold. It can only be executed on collection and can have only one subscriber. Any new subscriber would create a new execution of flow{..}.i.e Every time you say collect, a new pipe is created and the producer block is executed.

pipe infrastructure :)

There are hot flows as well (SharedFlow and StateFlow ). (cold: Stops emission when any collector is not active. hot: it remains in memory as long as the flow is collected or as long as any other references to it exist from a garbage collection root.)

Analogy for hot stream : live video feed, all viewers will get to see exact same timeline of video being played.

SharedFlow and StateFlow are HOT flows as they propagates items to multiple consumers. StateFlow is a special shareFlow that maintain the current state(by .value ).

Collecting from the hot flow doesn’t trigger any producer code (code block that updates the value of shared or stateFlow).

We can convert any cold flow (flow{..}) to hot flow by two intermediate operators.
stateIn
shareIn

Lets look at respective functions and try to understand

fun <T> Flow<T>.stateIn(
scope: CoroutineScope,
started: SharingStarted,
initialValue: T
): StateFlow<T> (source)
fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T> (source)

SharedFlow and StateFlow : Which one to choose ?

I need to have latest value at any point of time using .value : StateFlow.

More than latest value using replay : SharedFlow.

When we don't need to display data in UI, we should stop collecting the data to avoid wastage of resources, just like our Panda who should close his tap when he is not using water. Flows by nature are not lifeCycle aware, unlike LiveData, which makes sense as it’s not a part of android component but a type from kotlin language.

repeatOnLifecycle is a suspend function that takes a Lifecycle.State as a parameter which is used to automatically create and launch a new coroutine with the block passed to it when the lifecycle reaches that state, and cancel the ongoing coroutine that’s executing the block when the lifecycle falls below the state.

You can also use the Flow.flowWithLifecycle operator when you have only one flow to collect. This API uses the repeatOnLifecycle API under the hood

Difference between using and not using the repeatOnLifecycle API

Channels support one-to-one communication between coroutines, and every value that is sent to the channel is received once. Channels are used to handle events that must be processed exactly once. This happens in a design with a type of event that usually has a single subscriber, but intermittently (at startup or during some kind of reconfiguration) there are no subscribers at all, and there is a requirement that all posted events must be retained until a subscriber appears.

Why kotlin flows when we already have RxJava/RxKotlin

Backpressure: offers a whole page of the documentation to deal with backpressure, but flow by default supports it because of it suspending nature

Simplicity : flow { emit(“whatever”) }, you don’t have to think if you have to use just , create, defer or whichever of the multiple operators they have.

Operators: In Rx we have operators to work with synchronous and asynchronous operations. For instance, map is for synchronous operations, on the other hand, flatMap is for asynchronous ones. Because of the fact that all flow operators accept a suspend function, all of them are prepared for asynchronous operations

Lifetime: In Rx, the lifetime it is managed by a Disposable and whenever you want to remove the subscription you have to clear that disposable or add it to a CompositeDisposable to clear it together with other subscriptions.

val disposable = someObservable.subscribe()
disposable.dispose()

// or...

val compositeDisposable = CompositeDisposable()
compositeDisposable.add(disposable)
compositeDisposable.dispose()

Flows have to be launched in a Job inside a CoroutineScope and the Job instance is returned where we launch the coroutine.Usually, the coroutine has to be launched inside a scope that is managed by the framework to be cleaned up.

// In an Android ViewModel you will use the viewModelScope:
viewModelScope.launch { someFlow.collect { /* do whatever */ } }

// In a Fragment we have the lifecycleScope :
lifecycleScope.launch { someFlow.collect { /* do whatever */ } }

// Or with the launchIn method:
someFlow.launchIn(viewModelScope)

References

--

--