Coroutine channels

A Channel provides non-blocking communication between coroutines through two interfaces: SendChannel for the sending side and ReceiveChannel for the receiving side.

Conceptually, a channel is similar to Java's java.util.concurrent.BlockingQueue, but it has suspending operations instead of blocking ones and can be closed.

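Unlike in the reactive pattern, where a stream can be subscribed to again, a channel is single-use: each element is received once, and after the elements have been consumed the channel cannot be reused.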
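
A minimal sketch of this one-shot behaviour, assuming kotlinx.coroutines 1.5 or later (for receiveCatching()). The function name consumeTwice is hypothetical and exists only for this illustration. It drains a channel once and then tries to read from it again:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: drains the channel once, then attempts a second read.
// Returns the consumed elements and whether the second read found the channel closed.
fun consumeTwice(): Pair<List<Int>, Boolean> = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (x in 1..3) channel.send(x)
        channel.close() // no more elements are coming
    }

    val firstPass = mutableListOf<Int>()
    channel.consumeEach { firstPass += it } // receives 1, 2, 3

    // The elements are gone: a second read reports a closed channel
    // instead of replaying them.
    val secondRead = channel.receiveCatching()
    firstPass to secondRead.isClosed
}
```

Calling consumeTwice() should return ([1, 2, 3], true): the second read does not see the elements again.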

ReceiveChannel has several useful methods and extensions:

  • receive() - retrieves and removes an element from the channel if it is not empty, suspends the caller while the channel is empty, and throws ClosedReceiveChannelException if the channel is closed and has no elements left.
  • consumeEach() - performs the given action for each received element and cancels the channel after the block has executed. To iterate over the channel without consuming it, use a regular for loop instead.
  • consume() - runs the given block and always invokes cancel on the channel afterwards, so the channel is consumed even if the block fails.
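
For example, one coroutine sends five squares while the main coroutine receives them:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        // this might be a heavy CPU-consuming computation or async logic,
        // we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
}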
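
The closed-channel case of receive() can be sketched as follows. This is an illustration only: the function name receiveAfterClose and the buffer capacity of 1 are assumptions chosen so that send() does not suspend without a receiver.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.runBlocking

// Hypothetical helper: shows what receive() does before and after the
// channel is closed and drained.
fun receiveAfterClose(): String = runBlocking {
    val channel = Channel<Int>(capacity = 1) // buffered, so send() returns immediately here
    channel.send(42)
    channel.close()

    val first = channel.receive() // the buffered element is still delivered after close()
    try {
        channel.receive() // closed and empty: this call throws
        "received $first, then another element"
    } catch (e: ClosedReceiveChannelException) {
        "received $first, then closed"
    }
}
```

Calling receiveAfterClose() should return "received 42, then closed". When an exception is not wanted, receiveCatching() returns a result object that can be inspected instead.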

Unlike a queue, a channel can be closed to indicate that no more elements are coming. On the receiver side it is convenient to use a regular for loop to receive elements from the channel.

val channel = Channel<Int>()
launch {
    for (x in 1..5) channel.send(x * x)
    channel.close() // we're done sending
}

// the loop finishes only after the channel is closed and all elements are received
for (y in channel) println(y)

The produce() function launches a new coroutine that acts as a producer. The returned ReceiveChannel can be used to receive the elements this coroutine produces.

The channel is closed when the coroutine completes. The coroutine context is inherited from this CoroutineScope. Additional context elements can be specified with the context argument.

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) send(x * x)
}

fun main() = runBlocking {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
}
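
The context argument mentioned above can be sketched like this. The function names produceNames and collectNames, and the coroutine name "squares-producer", are hypothetical. The example assumes a kotlinx.coroutines version in which produce() requires opting in to ExperimentalCoroutinesApi:

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical producer: adds a CoroutineName element on top of the
// context inherited from the calling CoroutineScope.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.produceNames(): ReceiveChannel<String> =
    produce(CoroutineName("squares-producer")) {
        // The producer's context is the scope's context plus the extra element.
        val name = coroutineContext[CoroutineName]?.name
        for (x in 1..3) send("$name: ${x * x}")
    } // the channel is closed when this block completes

// Hypothetical helper: collects everything the producer sends.
fun collectNames(): List<String> = runBlocking {
    val received = mutableListOf<String>()
    for (line in produceNames()) received += line // ends once the channel is closed
    received
}
```

Calling collectNames() should return the three strings "squares-producer: 1", "squares-producer: 4" and "squares-producer: 9".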

The actor() function launches a new coroutine that acts as a consumer. The returned SendChannel can be used to send messages to this coroutine.

val c = actor<Int> {
    // initialize actor's state
    for (msg in channel) {
        // process message here
    }
}
// send messages to the actor
c.send(25)
...
// stop the actor when it is no longer needed
c.close()
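
A fuller sketch of the same idea: an actor that keeps a running total. The message types CounterMsg, Add and GetTotal and the functions counterActor and sumWithActor are hypothetical names invented for this example. Note that actor() is available on the JVM and is annotated with ObsoleteCoroutinesApi in kotlinx.coroutines, hence the opt-in:

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.runBlocking

// Hypothetical message types for this sketch.
sealed class CounterMsg
class Add(val amount: Int) : CounterMsg()
class GetTotal(val reply: CompletableDeferred<Int>) : CounterMsg()

@OptIn(ObsoleteCoroutinesApi::class)
fun CoroutineScope.counterActor(): SendChannel<CounterMsg> = actor {
    var total = 0 // actor state, touched only by this coroutine
    for (msg in channel) {
        when (msg) {
            is Add -> total += msg.amount
            is GetTotal -> msg.reply.complete(total)
        }
    }
}

// Hypothetical helper: sends every value to the actor, then asks for the total.
fun sumWithActor(values: List<Int>): Int = runBlocking {
    val counter = counterActor()
    for (v in values) counter.send(Add(v))
    val reply = CompletableDeferred<Int>()
    counter.send(GetTotal(reply))
    val total = reply.await()
    counter.close() // ends the actor's for loop so runBlocking can return
    total
}
```

Calling sumWithActor(listOf(1, 2, 3)) should return 6. Because messages are processed one at a time inside the actor, the total needs no additional synchronization.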

You can build a pipeline of channels, where one coroutine receives data from a channel, transforms it, and sends the results to another channel for the next stage to consume.

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}

fun main() = runBlocking {
    val numbers = produceNumbers() // produces integers from 1 onwards
    val squares = square(numbers) // squares the integers
    repeat(5) {
        println(squares.receive()) // print the first five
    }

    println("Done!") // we are done
    coroutineContext.cancelChildren() // cancel the child coroutines
}