Asynchronous flow
Flows are cold streams similar to sequences — the code inside a flow builder does not run until the flow is collected. There are following ways to create flow:
- flow() - builder method that allows to emit (i.e. publish) specified values via emit() method.
- flowOf() - defines a flow emitting a fixed set of values.
- asFlow() - extension function for collections and sequences, that converts to flow.
The emit() method publishes value that can be transformed by the intermediate operators and consumed by terminal operators. This method is not thread-safe and should not be invoked concurrently.
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // Timeout after 250ms
simple().collect { value -> println(value) }
}
println("Done")
}
Intermediate operators transform a flow values as you would with collections and sequences. The important difference to sequences is that blocks of code inside these operators can call suspending functions. These operators are cold.
- map() - returns a flow containing the results of applying the given transform function to each value of the original flow.
- filter(p) - returns a flow containing only values of the original flow that match the given predicate.
The transform() operator allows to emit arbitrary values an arbitrary number of times.
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
}
Size-limiting intermediate operators cancel the execution of the flow when the corresponding limit is reached. Cancellation in coroutines is always performed by throwing an exception, so that all the resource-management functions operate normally in case of cancellation.
- take() - returns a flow that contains first count elements. When count elements are consumed, the original flow is cancelled.
fun numbers(): Flow<Int> = flow {
try {
emit(1); emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // take only the first two
.collect { value -> println(value) }
}
Terminal operators on flows are suspending functions that start a collection of the flow.
- collect() - collects the given flow but ignores all emitted values. Usually used with onEach(), onCompletion() and catch() operators to process all emitted values and handle an exception that might occur in the upstream flow or during processing.
- collectIndexed() - collects the given flow with a provided action that takes the index of an element (zero-based) and the element.
- first(predicate) - returns the first element emitted by the flow matching the given predicate and then cancels flow's collection. Throws NoSuchElementException if the flow has not contained elements matching the predicate.
- reduce() - accumulates value starting with the first element and applying operation to current accumulator value and each element.
- fold() - accumulates value starting with initial value and applying operation current accumulator value and each element.
- toList() - collects given flow into a destination.
- toSet() - collects given flow into a destination