0

Design of Kotlin Coroutines

 1 year ago
source link: https://proandroiddev.com/design-of-kotlin-coroutines-879bd35e0f34
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

THE BEHIND THE SCENES

Design of Kotlin Coroutines

What does the coroutine creation process look like?

1*RVzAeizLr51jtdbNaBDVnA.jpeg

Most of us use coroutines, but who knows what the coroutine creation process looks like? The structure of the blog post is below:

  1. Definitions
  2. CPS — Continuation Passing Style
  3. Kotlin coroutine principle
    3.1 Coroutine construction
    — 3.1.1 launch()
    — 3.1.2 start()
    — 3.1.3 invoke()
    — 3.1.4 startCoroutineCancellable()
    — 3.1.5 resumeWithCancellable()
    — 3.1.6 resumeWith()
    — 3.1.7 invokeSuspend()
    — 3.1.8 Summary of coroutine construction
    3.2 Bytecode analyses

1. Definitions

What is a Coroutine?

A coroutine is an instance of suspendable computation. It is conceptually similar to a thread, in the sense that it takes a block of code to run that works concurrently with the rest of the code. However, a coroutine is not bound to any particular thread. It may suspend its execution in one thread and resume in another one.

In asynchronous programs, tasks are executed in parallel on separate threads without waiting for other tasks to complete. Improper use of multithreading can lead to high CPU usage or increased CPU cycles and can drastically reduce the performance of your application and therefore threads are an expensive resource. Coroutines are a lightweight alternative to threads.

What is suspend function?

Suspend function is a function that could be started, paused, and resume. One of the most important points to remember about the suspend functions is that they are only allowed to be called from a coroutine or another suspend function.

1*vo7XdbX5O2WrT7yMNPWsNQ.jpeg

When a coroutine is suspended, that thread is free for other coroutines. The continuation of the coroutine doesn’t have to be on the same thread. Here we conclude that we can simultaneously run many coroutines with a small number of threads. You will see below how it all works.

Suspending vs Non-suspending/regular function:

  • A suspension function has zero or more suspension points, while a regular function has zero suspension points. A suspension point represents a statement in its body that can pause the execution of a function to resume at a later time
  • Non-suspended functions cannot directly call suspending functions, because they do not support suspension points
  • Suspending functions can call non-suspended functions because they have 0 suspension points

This is a simple suspend function:

As we said above, we cannot call a suspended function inside a regular function. Let’s decompile the functionA() to see what happens.

Go to Tools->Kotlin->Show Kotlin Bytecode.

1*VuWjEjoDEAD-68O12tEp4A.jpeg

You're probably wondering where the Continuation argument comes from and what it means. Now we will explain where it comes from, and later we will show what it is. Each suspend function goes through a CPS-Continuation Passing Style transformation.

We can observe the suspension of the coroutine through the following example. Let’s imagine that we are playing a game and we want to pause (suspend) it and later continue playing where we left off. In that case, we archive the game, and when we want to continue, we can restore from the archive the last paused point. When the process is suspended, it will return Continuation.

2. CPS — Continuation Passing Style

In the CPS transformation suspend function with n parameters (p_1, p_2, p_3,… p_n) and result type T, they get an additional parameter p_n+1 which is of type Continuation<T> and the return type is Any?. The return type is changed to Any? because:

  • If function returns a result, we get T
  • If the function suspend, it return signal value COROUTINE_SUSPENDED which means that is a suspended state.
1*-3IF7S5Sng-Z1q4-Y60L4A.jpeg

3. Kotlin coroutine principle

Let’s go through the coroutine process with an example, understand how the coroutine is created and explain the flow.

In the example above we have two suspended functions. The getRandomNum() function takes a random number in the range from 1 to 1000, passes it to another suspended function getSqrt() that calculates its root.

Let’s set a breakpoint at line 6 and run the code in debug mode to pick up the stack traces of what happened before executing the body of the coroutine. With this we want to see what the coroutine creation process looks like.

1*ewTP61lN6BGZePrCPW43Qg.jpeg

This is a stack trace:

Execution order is:

1*WHjUyEwSIklIbjK_F0E2bA.jpeg

3.1 Coroutine construction

3.1.1 launch()

1*ca_HZEcMbHWHfaTAaaQKLQ.jpeg

Launches a new coroutine without blocking the current thread and returns a reference to the coroutine as a Job.

CoroutineScope

Defines a scope for new coroutines. Every coroutine builder (like launch, async, etc.) is an extension on CoroutineScope and inherits its coroutineContext to automatically propagate all its elements and cancellation.

CoroutineScope is an interface with only one property coroutineContext.

Note: In our example we used launch coroutine builder and we will explain creating a coroutine through it. Other coroutine builders are quite intuitive to this example.

The launch coroutine builder has three parameters:

  • context — additional to CoroutineScope.coroutineContext context of the coroutine
  • start —coroutine start option. The default value is CoroutineStart.DEFAULT
  • block — the coroutine code which will be invoked in the context of the provided scope. The compiler generates an inner class for the suspended lambda function at compile time that extends the SuspendLambda and implements Function2:

That means that Kotlin generates an SuspendLambda anonymous inner class for each coroutine at compile time. This is coroutine body class. Two methods are implemented internally:

  • invokeSuspend()contains code in the body of our coroutine, which internally handles the value of the state. The most important state change logic in the coroutine, which is called the state machine, is contained here.
  • create()— method receives a Continuation object, then creates and returns an object of the coroutine body class.

Coroutine state machine

Kotlin implements suspendable functions as state machines, since such implementation does not require specific runtime support. This dictates the explicit suspend marking (function colouring) of Kotlin coroutines: the compiler has to know which function can potentially suspend, to turn it into a state machine.

The states from the state machine correspond to the suspension points. In our example:

1*b2Y4_huMCvcsrJI1Ivw6zw.jpeg

There are three states for this block of code:

  • L0: until suspension point #1
  • L1: until suspension point #2
  • L2: until end
1*3eX6d8Bd7LPUJxYaFd6pWg.jpeg

The pseudo-code of the generated state machine with these three states looks like this:

Continuation state-machine implementation pseudo-code

The logic of the coroutine is encapsulated in the invokeSuspend method, which we mentioned earlier.
SuspendLambda the inheritance relationship is:

1*Wscb3gP0K5PcgqKEz0Vdjg.jpeg

Continuation

Interface representing a continuation after a suspension point that returns a value of type T.

Continuations are so important because they allow the continuation of the coroutine. Every suspending function is associated with a generated Continuation subtype, which handles the suspension implementation.

  • context — the context of the coroutine that corresponds to this continuation
  • resumeWith() — is used to propagate the results in between suspension points. It is called with the result (or exception) of the last suspension point and resumes the coroutine execution

BaseContinuationImpl

The key source code of BaseContinuationImpl is:

invokeSuspend() — are an abstract method which is implemented in coroutine body class created in compile time.

The implementation of the resumeWith() method always calls the invokeSuspend() method.

ContinuationImpl

ContinuationImpl inherits BaseContinuationImpl. Its function is to generate an DispatchedContinuation object using the interceptor, which is also a Continuation. We can talk more about this in section 3.2.4.

Let’s continue to analyze the body of the launch() function.

1*iWEGaCGp1APeDF-XxZooWg.png

newCoroutineContext()

newCoroutineContext — Creates a context for a new coroutine. It installs Dispatchers.Default when no other dispatcher or ContinuationInterceptor is specified and adds optional support for debugging facilities (when turned on) and copyable-thread-local facilities on JVM.

The newCoroutineContext is an extension function on CoroutineScope. Its function is to merge the inherited context (scope context) from the CoroutineScope with the context of the passed parameter and return the new context.

1*WvIDWwfBTREHu3ENANlMvg.jpeg

Let’s briefly go over CoroutineContext.

CoroutineContext

CoroutineContext is an immutable indexed union set of Element instances like CoroutineName, CoroutineId, CoroutineExceptionHandler, ContinuationIntercepter, CoroutineDispatcher, Job. Every element in this set has a unique Key.

Imagine that we want to control which thread or thread pool will run our coroutine. Depending on whether we want to run a task on the main thread, or whether the tasks are CPU or IO bound, we will use dispatchers.

Dispatchers are thread schedulers provided in the coroutine, used to switch threads and specify the threads that the coroutine runs on. There are four types of schedulers in Dispatchers:

All these dispatchers are CoroutineDispatcher. We said above that CoroutineDispatcher is one of the elements of CoroutineContext, which means that these dispatchers are elements of CoroutineContext.

We said that CoroutineContext is like collection and that collection contains different types of Element. We can create a new context by adding/removing an element, or by merging two existing contexts. The plus operator works as an extension of Set.plus and returns the combination of two contexts with the elements on the right side of plus replacing the elements with the same key on the left.

Context without any element can be created as an instance of EmptyCoroutineContext.

1*sAYxX03FsB7thm3QiNSfWA.jpeg

Example#1Example#2Example#3

1*xJhm7Le6Ve6au6glw5yidw.jpeg

A CoroutineContext is never overridden, but merged with an existing one.
Now that we've learned a few things about CoroutineContext, we can go back to where we left off, which is newCoroutineContext in the body of the launch builder function.

Let’s define different contexts to make it easier for us to understand:

  • scope contextthe context defined in CoroutineScope
  • passed context — the builder function receives a CoroutineContext instance in its first parameter
  • parent context— The suspending block parameter in the builder function has a CoroutineScope receiver, which itself also provides a CoroutineContext. This context is not a new coroutine context!
1*aiPCcv88MVuN9-kVtBWViw.jpeg
  • The new coroutine creates its own child Job instance (using a job from this context as its parent) and defines its coroutine context(child context)as a parent context plus its Job. We will see later in more detail how we concluded this.

After defining a new coroutine context (parent context), we can proceed with creating a new coroutine.

1*iWEGaCGp1APeDF-XxZooWg.png

StandaloneCoroutine

We use new context (parent context) for creating coroutine. For start parameter default value is CoroutineStart.DEFAULT. In this case, we create StandaloneCoroutine (inhered from AbstractCoroutine) with return type is a Job. StandaloneCoroutine is a coroutine object.

Note: If we set that start is lazy we will have LazyStandaloneCoroutine. LazyStandaloneCoroutine inhered from StandaloneCoroutine, and StandaloneCoroutine inhered from AbstractCoroutine.

Only the handleJobException method has been overridden in the StandaloneCoroutine class to handle exceptions not handled by the parent coroutine. The start method called here is a method of the parent class AbstractCoroutine.

AbstractCoroutine class implements the JobSupport class and the Job, Continuation and CoroutineScope interfaces. AbstractCoroutine class is mainly responsible for Coroutine recovery and result return.

1*TqaiW-GBonjPIb3HFAQSeQ.jpeg

JobSupport

JobSupport is the specific implementation of Job. AbstractCoroutine can be used as a Job to control the lifecycle of the coroutine, it can implement the Continuation interface and it can also be used as a Continuation.

The context of AbstractCoroutine is the context we passed through the parameter (parentContext) plus the current coroutine, and since we know that AbstractCoroutine is both Job and CoroutineScope, we know that the context of our coroutine contains a Job element. That context is the coroutine context (child context).

1*mVuiEpHvwxsxJ0b6INDv9Q.jpeg

Second step in stack trace is coroutine.start(start, coroutine, block).

3.2.2. start()

1*Wxts56_bxvx_rkfIRQzP0A.jpeg

Starts this coroutine with the given code block and start strategy. This function shall be invoked at most once on this coroutine.

The AbstractCoroutine#start() method calls the start() method. CoroutineStart is an enum class, and the invoke() method is internally overridden. In that case, the start() method calls the CoroutineStart.invoke() method.

3.2.3. invoke()

1*PfuX_I4YAWfwsv0_Z8KrRg.jpeg

Defines start options for coroutines builders. It is used in start parameter of launch, async, and other coroutine builder functions.

CoroutineStart is an enumeration class with four types:

  • DEFAULT — immediately schedules coroutine for execution according to its context
  • LAZY — starts coroutine lazily, only when it is needed
  • ATOMIC — atomically (in a non-cancellable way) schedules coroutine for execution according to its context
  • UNDISPATCHED — immediately executes coroutine until its first suspension point in the current thread

Here, DEFAULT is used as an example.

3.2.4. startCoroutineCancellable()

1*IxhD8H4rzx3cuu0X06p6Zg.jpeg

Use this function to start coroutine in a cancellable way, so that it can be cancelled while waiting to be dispatched.

runSafely() — runs given block and completes completion with its exception if it occurs. Rationale: startCoroutineCancellable is invoked when we are about to run coroutine asynchronously in its own dispatcher. Thus if dispatcher throws an exception during coroutine start, coroutine never completes, so we should treat dispatcher exception as its cause and resume completion.

The startCoroutineCancellable implementation is a chained call. Let's go through that chain:

  1. createCoroutineUnintercepted() is extended function called throught coroutine body and coroutine body are compiled into a subclass of SuspendLambda (coroutine body class), so this is the BaseContinuationImpl.

The create() method creates an instance of the coroutine body class and here we get an instance of the coroutine class.

2. intercepted() —intercepts this continuation with ContinuationInterceptor.

this is an example of a coroutine body class that inherits ContinuationImpl.

If intercepted is null, intercept the coroutine body class via the interceptor specified in the context and return the wrapped coroutine body class object.

context[ContinuationInterceptor] — gets the scheduler from the collection, and calls the interceptContinuation().
The interceptContinuation() method is used to wrap the coroutine body Continuation into a DispatchedContinuation.

DispatchedContinuation

DispatchedContinuation represents the Continuation object of the coroutine body and holds the thread scheduler. Its function is to use the thread scheduler to schedule the coroutine body to the specified thread for execution.

Notice that it takes a dispatcher and a continuation in the constructor, and it implements both Continuation<T> and DispatchedTask<T>.

3. resumeCancellableWith() — an extension method of Continuation

If this is not an intercepted and wrapped object of the coroutine’s body class, resumeWith(result) will be called.
Otherwise, if this is the intercepted and wrapped DispatchedContinuation class object, then the resumeCancellableWith(result, onCancellation) function is called.

If you look at the CoroutineDispatcher source code, the return value of dispatcher.isDispatchNeeded() is always true, only Dispatchers.Unconfined will override this to false.
If dispatcher.isDispatchNeeded() returns false, we call resumeWith() method directly from the coroutine body class.

dispatcher.dispatch(context, this) is actually equivalent to distributing the execution process of the code to the Default thread pool. The second parameter is Runnable, we pass this here, because DispatchedContinuation indirectly implements the Runnable interface.

Dispatchers.Default is DefaultScheduler and DefaultScheduler is a singleton class, so only one instance can be created and used everywhere.
DefaultScheduler is a subclass of SchedulerCoroutineDispatcher.

Dispatchers.Default#dispatch() calls the dispatch() method of SchedulerCoroutineDispatcher, which calls coroutineScheduler.dispatch().

CoroutineScheduler

CoroutineScheduler is a thread pool implemented in Kotlin that provides threads on which coroutines can be run, which means it generates them.
CoroutineScheduler is a subclass of Executor and its execute() method is also forwarded to dispatch().

Inside the dispatch() method we see the following:

  • createTask() — We create a Task from the passed block of type Runnable which is actually a DispatchedContinuation.
  • currentWorker() gets the currently executing thread. The worker is an internal class of CoroutineScheduler.
  • currentWorker.submitToLocalQueue()adds the task to the local queue of the Worker thread and waits for execution.

Worker

Worker is the thread of Kotlin coroutine. The implementation of Worker inherits Thread, which is essentially an encapsulation of Java threads. We conclude that the Worker is a thread.

Let’s analyze how the Worker performs the task.

The Worker will override the Thread run() method and then the runWorker() method will be invoked. In a while loop, it will always try to fetch the Task from the local Worker queue. If there is a task that needs to be executed, executeTask(task) will be called to execute it.

Let’s see what the executeTask(task) method does:

Inside the runSafely() method we call task.run(). Task is a Runnable and Runnable#run() actually means that our coroutine task is actually running.

DispatchedContinuation inherits from the DispatchedTask class, DispatchedTask inherits SchedulerTask and SchedulerTask implements the Runnable interface. We see that DispatchedTask finally implements the Runable interface, so let’s look at the run() implementation of DispatchedTask.

In the run() method, the original coroutine class Continuation object is obtained through DispatchedContinuation.
resumeWithStackTrace, resumeWithException, and resume are extension methods that trigger resumeWith() method.

3.2.5. resumeWith

1*t1cvbpjMSAalpOp8t1-IXw.jpeg

The final implementation of resumeWith() is in the BaseContinuationImpl class:

Let’s pay attention to the invokeSuspend() function.

3.2.6. invokeSuspend

1*HLjeocfvjdvBkTLTq7vETQ.jpeg

The coroutine body will be executed sequentially through the state machine until the suspend function is called. When we call it, the function will return the COROUTINE_SUSPEND flag and it will directly return to exit the loop as well as the coroutine body. In that case we will not get a thread blocking.
When a function needs to be suspended, the state machine will store the previous result as a continuationin member variable. When the suspend function resumes, the Continuation resumeWith() method is called, and then invokeSuspend() is called. That way, the remaining code from the coroutine body will also continue to execute.

1*plj4er7LEFdVnzRj-2_kXQ.jpeg

3.1.8. Summary of coroutine creation

  1. The coroutine body (suspended lambda function) is compiled into an inner class at compile time that extends SuspendLambda and implements Function2. The specific inheritance chain is
    SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation
  2. CoroutineScope#launch() creates a coroutine, according to the default launch mode CoroutineStart.DEFAULT, creates a coroutine object StandaloneCoroutine and launches StandaloneCoroutine#start(start, coroutine, block)
  3. StandaloneCoroutine is a subclass of the AbstractCoroutine class, and the implementation of StandaloneCoroutine#start() is found in AbstractCoroutine (AbstractCoroutine#start()). AbstractCoroutine#start() triggers CoroutineStart#invoke()
  4. Since in our example the dispatcher is Dispatchers.Default, we call the startCoroutineCancellable() method of the coroutine body to the processing logic CoroutineStart#invoke()
  5. startCoroutineCancellable() is a chain call:
    createCoroutineUnintercepted().intercepted().resumeCancellableWith()
  6. createCoroutineUnintercepted() creates a coroutine body class object.
  7. intercepted() uses an interceptor/scheduler to wrap the coroutine body class object into DispatchedContinuation. DispatchedContinuation represents a Continuation object of the coroutine body class and contains the scheduler.
  8. Since the dispatcher is is Dispatchers.Default and the isDispatchNeeded() function returned true , DispatchedContinuation#resumeCancellableWith() use the thread scheduler to run dispatcher#dispatch(context, this) for scheduling.
  9. Dispatchers.Default#dispatch() calls the dispatch() method of SchedulerCoroutineDispatcher, which calls CoroutineScheduler#dispatch()
  10. CoroutineScheduler allocates a thread Worker and triggers the run() of DispatchedContinuation (DispatchedTask#run()) in the Worker#run() method.
  11. The run method triggers resumeWith() method. The execution of the coroutine body is actually a call to the resumeWith() method.
1*j6NPAyhojgjP6u6FFkcuhg.jpeg

3.2 Bytecode analysis

Let’s analyze the bytecodes of our example
.Go to Tools->Kotlin->Show Kotlin Bytecode. Than click on Decompile to generate the corresponding decompiled java code.

Let’s look at the invokeSuspend function:

  1. lines 15,16 — We check if var10000 is equal to COROUTINE_SUSPENDED. If so, that means we don’t have an available result and need to suspend and wait until an available result is returned. In our case, the getRandomNum() method will return COROUTINE_SUSPENDED. Before this we set the label to 1.

Why did the getRandomNum() method return COROUTINE_SUSPENDED?

In line 13, delay() is called. Let’s see its implementation.

Returning suspendCancellableCoroutine is COROUTINE_SUSPENDED. It needs to suspend and wait for the result to return. It can be seen that the logic of delay here is similar to the Handle mechanism (Handler.postDelayed). After the execution is completed, continuation.resume() -> BaseContinuationImpl.resumeWith()-> SuspendLambda.invokeSuspend() will be called to restore.

When the execution of the getRandomNum() method is complete and the available result is returned, the invokeSuspend() method is called. At this point label = 1. First we call the throwOnFailure() method which throws an exception if the result is a failure. If the result is successful, we assign the result to var10000, and then break executes the following logic (lines 32-39).

2. lines 15,16 — Executing the getSqrt() method is analogous to executing the getRandomNum() method. At this point label = 2.
When the execution of the getRandomNum() method is complete and the available result is returned, the invokeSuspend() method is called again. We call the throwOnFailure() method again which throws an exception if the result is a failure. If the result is successful, we assign the result to var10000, then execute break label17 and go to line 101 to execute the rest of the logic.

3. When the execution of the getRandomNum() method is complete and the available result is returned, the invokeSuspend() method is called again. We call the throwOnFailure() method again which throws an exception if the result is a failure. If the result is successful, we assign the result to var10000, then execute break label17 and go to line 42 to execute the rest of the logic.

1*QJf2_Xbb6fIycaz8C00ZMA.jpeg

Now you can explore other coroutine builders and see exactly what the differences are. If you have any doubts, you can write to me on LinkedIn.

I hope you liked this article. Stay tuned for more!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK