Coroutines

The coroutine API is oficial Kotlin library that provides way to perform tasks asynchronously. It works on any platform, not only JVM.

val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) // to execute in background
val mainScope = MainScope() // to execute in main thread

scope.launch {
    // ...
    mainScope.launch {
        // ... 
    }
}

Coroutine is lightweight task that can be suspend (paused) and resume executing. Thus coroutine does not block current thread. For example, you can run many coroutines (100k) on single Java thread. If you try do it with threads only you may get out-of-memory error.

// will print all dots within 5 seconds.
fun main() = runBlocking {
    repeat(100_000) { // launch a lot of coroutines
        launch {
            delay(5000L)
            print(".")
        }
    }
}

Scope helps to define a lifecycle of coroutine. For example, using Android built-in scopes, you can safely run the coroutine in a Fragment. When user closes fragment before completion, the coroutine will be canceled.

Coroutine dispatcher determines which thread or threads used to execute coroutines. On JVM platform you can convert the executor to the dispatcher and vice versa.

Coroutine context is a set of various elements like scopes, Job instance and dispatcher.

The Job class represents the coroutine itself. You can cancel or await execution.

The suspend keyword specifies, that function can spawn new coroutines for long-time tasks. Such functions can be called only within scope or from other suspendable functions.

dependencies

For using coroutines you need add dependencies to your project

// In order to work with Main dispatcher, the 
// following artifacts should be added to project:
//   - kotlinx-coroutines-android for Android Main thread dispatcher
//   - kotlinx-coroutines-javafx for JavaFx Application thread dispatcher
//   - kotlinx-coroutines-swing for Swing EDT dispatcher
//   - kotlinx-coroutines-test  for testing purpose

dependencies {
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.3"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.3"
    testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.3"
 // ...   
}

scopes

The CoroutineScope interface defines a scope for new coroutines.

Every coroutine builder, like launch and async, is an extension on CoroutineScope and inherits its coroutineContext to automatically propagate all its elements and cancellation.

CoroutineScope example
val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

fun async(){
    scope.launch {
        // ...
    }
}

The best ways to obtain a standalone instance of the scope are CoroutineScope() and MainScope() factory functions. Additional context elements can be appended to the scope using the plus operator.

MainScope example

coroutine builders

function description
CoroutineScope.launch(ctx, start, block)

Launches a new coroutine without blocking the current thread and returns a reference to the coroutine as a Job. The coroutine is cancelled when the resulting job is cancelled.

CoroutineScope.async(ctx, start, block)

Creates a coroutine and returns its future result as an implementation of Deferred. The running coroutine is cancelled when the resulting deferred is cancelled.

By defualt, it cancels the parent job or outer scope on failure to enforce structured concurrency paradigm. To change that behaviour, supervising parent (SupervisorJob or supervisorScope) can be used.

runBlocking(ctx, block) Runs a new coroutine and blocks the current thread interruptibly until its completion. This function should not be used from a coroutine. It is designed to bridge regular blocking code to libraries that are written in suspending style, to be used in main functions and in tests.

By default EmptyCoroutineContext is used as coroutine context.

If context does not have any dispatcher, Dispatchers.Default is used.

CoroutineStart enum has following values:

  • DEFAULT - immediately schedules the coroutine for execution according to its context
  • LAZY - starts the coroutine lazily, only when it is needed
  • ATOMIC - this is similar to DEFAULT, but the coroutine cannot be cancelled before it starts executing.
  • UNDISPATCHED - immediately executes the coroutine until its first suspension point in the current thread similarly to the coroutine being started using Dispatchers.Unconfined. However, when the coroutine is resumed from suspension it is dispatched according to the CoroutineDispatcher in its context.
async builder example
runBlocking {
 // val value1 : Deferred<Int> = ...
    val value1 = async(context = Dispatchers.IO) { 
        // .. return value
        }
    val value2 = async{ 
        // .. return value
        }
  println("Results:\n${value1.await()}\n${value2.await()}")
}
launch builder example

dispatchers

val mainDispatcher =  try {
    Dispatchers.Main.apply { isDispatchNeeded(this) }
} catch (e: IllegalStateException) { // emulate main thread
    Executors.newFixedThreadPool(1) {
        Thread(it, "Main thread emulated")
    }.asCoroutineDispatcher()
}
class description
CoroutineDispatcher Base class for coroutine dispatchers.
ExecutorCoroutineDispatcher A bridge between coroutine-based API and asynchronous API that requires an instance of the Executor.
Dispatchers Holds default dispatchers:
  • Dispatchers.Default - used by all standard builders when no dispatcher specified. It is backed by a shared pool of threads on JVM. By default, the maximal level of parallelism used by this dispatcher is equal to the number of CPU cores, but is at least two.
  • Dispatchers.Main - dispatcher that is confined to the Main thread operating with UI objects. Access to this dispatche may throw IllegalStateException if no main thread dispatchers are present in the classpath (you must add appropriate dependency for Android, JavaFX and Swing).
  • Dispatchers.IO - dispatcher for offloading blocking IO tasks to a shared pool of threads
  • Dispatchers.Unconfined - A coroutine dispatcher that is not confined to any specific thread. It executes the initial continuation of a coroutine in the current call-frame and lets the coroutine resume in whatever thread that is used by the corresponding suspending function, without mandating any specific threading policy. Nested coroutines launched in this dispatcher form an event-loop to avoid stack overflows.

chanels

Channel is a non-blocking primitive for communication between a sender (via SendChannel) and a receiver (via ReceiveChannel). Conceptually, a channel is similar to Java's java.util.concurrent.BlockingQueue, but it has suspending operations instead of blocking ones and can be closed.

val channel = Channel<Int>()
launch {
    // this might be heavy CPU-consuming computation or async logic, 
    // we'll just send five squares
    for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }

Unlike a queue, a channel can be closed to indicate that no more elements are coming. On the receiver side it is convenient to use a regular for loop to receive elements from the channel.

val channel = Channel<Int>()
launch {
    for (x in 1..5) channel.send(x * x)
    channel.close() // we're done sending
}

for (y in channel) println(y)

There is a convenient coroutine builder named produce to create producer, and an extension function consumeEach(), that replaces a for loop on the consumer side.

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) send(x * x)
}

fun main() = runBlocking {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
}

synchronization

You can use thread-safe data types like AtomicInteger class.

Couroutine API provides alternative to the Java synchronized or ReentrantLock which are thread-blocking.

The Mutex class allows to delimit a critical section. Unlike ReentrantLock, the Mutex.lock() is a suspending function. It does not block a thread. Unlike ReentrantLock, mutex is non-reentrant.

change thread

The withContext() function allows you to change the thread in which the coroutine will executed. After the function completes, the coroutine will continue execution in the previous thread.

In other words, this function does not create new coroutine, but changes thread where current coroutine is executed.

runBlocking {
    println("Result:")
    
    // launch new coroutine
    launch(Dispatchers.Default) {
        println("launch coroutine in thread ${Thread.currentThread().name}")

        withContext(mainDispatcher){
            println("jump to thread ${Thread.currentThread().name}")
        }

        println("jump back to thread ${Thread.currentThread().name}")
     }
}
Result: launch coroutine in thread DefaultDispatcher-worker-2 @coroutine#2 jump to thread Main thread emulated @coroutine#2 jump back to thread DefaultDispatcher-worker-2 @coroutine#2