Asynchronous flow

Flows allow you to process multiple values that are emitted asynchronously.

cold flows

Cold flows are similar to sequences — the code inside a flow builder does not run until the flow is collected. There are following ways to create flow:

  • flow() - builder method that allows to emit (i.e. publish) specified values via emit() method.
  • flowOf() - defines a flow emitting a fixed set of values.
  • asFlow() - extension function for collections and sequences, that converts to flow.

The emit() method publishes value that can be transformed by the intermediate operators and consumed by terminal operators. This method is not thread-safe and should not be invoked concurrently.

fun simple(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms 
        simple().collect { value -> println(value) } 
    }
    println("Done")
}

Intermediate operators transform a flow values as you would with collections and sequences. The important difference to sequences is that blocks of code inside these operators can call suspending functions. These operators are cold.

  • map() - returns a flow containing the results of applying the given transform function to each value of the original flow.
  • filter(p) - returns a flow containing only values of the original flow that match the given predicate.

The transform() operator allows to emit arbitrary values an arbitrary number of times.

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .transform { request ->
            emit("Making request $request") 
            emit(performRequest(request)) 
        }
        .collect { response -> println(response) }
}

Size-limiting intermediate operators cancel the execution of the flow when the corresponding limit is reached. Cancellation in coroutines is always performed by throwing an exception, so that all the resource-management functions operate normally in case of cancellation.

  • take() - returns a flow that contains first count elements. When count elements are consumed, the original flow is cancelled.
fun numbers(): Flow<Int> = flow {
    try {                          
        emit(1); emit(2) 
        println("This line will not execute")
        emit(3)    
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers() 
        .take(2) // take only the first two
        .collect { value -> println(value) }
} 

Terminal operators on flows are suspending functions that start a collection of the flow.

  • collect() - collects the given flow but ignores all emitted values. Usually used with onEach(), onCompletion() and catch() operators to process all emitted values and handle an exception that might occur in the upstream flow or during processing.
  • collectIndexed() - collects the given flow with a provided action that takes the index of an element (zero-based) and the element.
  • first(predicate) - returns the first element emitted by the flow matching the given predicate and then cancels flow's collection. Throws NoSuchElementException if the flow has not contained elements matching the predicate.
  • reduce() - accumulates value starting with the first element and applying operation to current accumulator value and each element.
  • fold() - accumulates value starting with initial value and applying operation current accumulator value and each element.
  • toList() - collects given flow into a destination.
  • toSet() - collects given flow into a destination

hot flows

You can think of hot flows as an implementation of reactive programming in Kotlin, where flow is publisher and collecter is subscriber.

You can think of hot flows as an implementation of producer-consumer pattern, where flow is producer and collecter is consumer.

Kotlin provides following hot flows:

  • SharedFlow - keeps a specific number of the most recent values in its replay cache. Every new collector first gets the values from the replay cache and then gets new emitted values.
  • MutableSharedFlow is a SharedFlow that also provides the abilities
    • to emit a value
    • to tryEmit without suspension if possible
    • to track the subscriptionCount property
    • to resetReplayCache
  • StateFlow - special shared flow, that keeps only the last emitted value.
  • MutableStateFlow - a mutable StateFlow that provides a setter for the value property. So you can emit new value, just assigning it to the value property.
    Remember, that setting a value that is equal to the previous one does nothing.
class EventBus {
    private val _events = MutableSharedFlow<Event>() // private mutable shared flow
    val events = _events.asSharedFlow() // publicly exposed as read-only shared flow

    suspend fun produceEvent(event: Event) {
        _events.emit(event) // suspends until all subscribers receive it
    }
}

class CounterModel {
    private val _counter = MutableStateFlow(0) // private mutable state flow
    val counter = _counter.asStateFlow() // publicly exposed as read-only state flow

    fun inc() {
        _counter.value += 1
    }
}

To observe values, you can use terminal operations of flow like in cold flows. Because hot flows never complete, you need observe in new coroutine.

// observe counter in separate coroutine.
launch {
    counterModel.counter.collect {
        println("counter is $it")
    }
}

Kotlin flow API doesn't allow to unregister the collector directly. So you need to collect the flow from the appropriate coroutine scope. Then the collector will be canceled when the scope is cancelled.

Using hot flow in Android

Conceptually shared flow is similar to BroadcastChannel and is designed to completely replace it. It has the following important differences:

  • SharedFlow is simpler, because it does not have to implement all the Channel APIs, which allows for faster and simpler implementation.
  • SharedFlow supports configurable replay and buffer overflow strategy.
  • SharedFlow has a clear separation into a read-only SharedFlow interface and a MutableSharedFlow.
  • SharedFlow cannot be closed like BroadcastChannel and can never represent a failure. All errors and completion signals should be explicitly materialized if needed.

In the same way, state flow is similar to ConflatedBroadcastChannel and is designed to completely replace it.

cold flows vs hot flows

There are difference hot flow from cold flow:

  • hot flow may have multiple collectors (i.e. consumers or subscribers)
  • hot flow exists independently of its collector
  • collecting from the hot flow doesn't trigger any producer code

Any cold flow can be converted to a shared flow using the shareIn operator.