Moving from RxJava to Coroutines – ITNEXT

 4 years ago
source link: https://itnext.io/moving-from-rxjava-to-coroutines-4e1c34419111?gi=f9f6b674d010

Moving from RxJava to Coroutines

Seeing a pull request like the one below used to terrify me, because of the horrific things I expected to find in it. Well, that was last year, because Coroutines are now here to help us write clean, robust and scalable code, just like RxJava did.

Image for post

In this post I will talk about the steps I followed to move from RxJava to Coroutines on NewsSync, a pet project I recently published on GitHub.

The App

First, I will give a small introduction on how the app is architected.

The data layer is composed of Repositories that are responsible for fetching and mapping the data from the data sources. The sources are a Retrofit service and a Room database that stores the data fetched from the network. Both sources always return a reactive type, such as Flowable (in order to listen for database changes) or Completable.

UseCases hold the business logic: they are responsible for communicating with the repositories, for threading, and for returning the appropriate responses to the presentation layer.

And lastly, the presentation layer consists of ViewModels that request the data from the UseCases and map it to LiveData State variables via ReactiveStreams.

Starting from Data Sources

My initial thought was to start applying changes from the lower layer of the application, the data layer and the data sources.

Fortunately, Room supports Coroutines out of the box starting from version 2.1.0 (you can read more about it here), and for Retrofit we can apply the Coroutines adapter created by Jake Wharton. So I was good to go.

Suspending function

Reading about Coroutines, I realised that the real magic happens due to suspend functions.

Suspend functions are functions that can suspend the execution of a coroutine without blocking the thread it is running on, and then resume with the rest of the function once the suspended work has finished.
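
To make the "without blocking the thread" part concrete, here is a minimal sketch that is not taken from NewsSync. It assumes the kotlinx.coroutines library is on the classpath, and fetchGreeting and runDemo are hypothetical names. Two coroutines share the single thread owned by runBlocking, and the second one gets to run while the first is suspended.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a slow network or database call.
suspend fun fetchGreeting(): String {
    delay(100) // suspends the coroutine; the underlying thread stays free
    return "hello"
}

// Runs two coroutines on the single thread owned by runBlocking and
// records the order in which things happen.
fun runDemo(): List<String> {
    val events = mutableListOf<String>()
    runBlocking {
        launch {
            events += "fetch started"
            events += "fetched ${fetchGreeting()}"
        }
        launch {
            events += "other work ran while fetch was suspended"
        }
    }
    // runBlocking returns only after both child coroutines have finished.
    return events
}

fun main() {
    runDemo().forEach(::println)
}
```

If the first coroutine blocked the thread instead of suspending, the second one could not record its event until the fetch had finished.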

What I did was to change the repositories' functions from regular functions to suspend functions.

fun appendStories(q: String?): Flowable<List<Story>>

suspend fun appendStories(q: String?): List<Story>

Now we can run asynchronous tasks, like network and database requests, and return the actual response from the call.

So instead of returning a Flowable from our network request

override fun appendStories(q: String?): Flowable<List<Story>> {
    return networkSource.searchArticles(q)
        .doOnNext { data ->
            localSource.insertStories(data.articles).subscribe()
        }
}

We can get the actual response, store it immediately on our database and return it.

override suspend fun appendStories(q: String?): List<Story> {
    val data = networkSource.searchArticles(q).await()
    localSource.insertStories(data.articles)
    return data.articles
}

Here, await() waits for the completion of a call that returns a Deferred, without blocking the thread. When the call completes, await() returns its value and the rest of the function resumes.
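
The same behaviour can be tried outside the app. The sketch below is only an approximation: it assumes kotlinx.coroutines is available, and it uses async as a hypothetical stand-in for the Retrofit adapter, which is what hands back the Deferred in the real project.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for networkSource.searchArticles(q): it returns a
// Deferred immediately and produces the value a little later.
fun CoroutineScope.searchArticles(q: String): Deferred<List<String>> = async {
    delay(50) // simulated network latency
    listOf("$q: first story", "$q: second story")
}

suspend fun appendStories(q: String): List<String> = coroutineScope {
    val deferred = searchArticles(q) // returns at once, nothing is awaited yet
    val articles = deferred.await()  // suspends here until the value is ready
    articles                         // reached only after await() has resumed
}

fun main() = runBlocking<Unit> {
    println(appendStories("kotlin"))
}
```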

Error Handling

The problem with the above code is that if one of the sources throws an Exception, the app will crash. In RxJava we could handle possible Exceptions in the onError callback, but that's not possible here. One possible solution is to wrap each of our calls in a try-catch block, but that would make our code ugly and mostly unreadable.

To solve this we can wrap the response of each call in a sealed class called Result, which has a Success and an Error state.

sealed class Result<out T> {

    data class Success<out T>(val data: T) : Result<T>()
    data class Error(val exception: Exception) : Result<Nothing>()

    companion object {
        fun <T> success(data: T): Result<T> =
            Success(data)

        fun error(exception: Exception): Result<Nothing> =
            Error(exception)
    }
}

To do this I used higher-order functions that take as a parameter either a suspend function or a function returning a Deferred, and return a Result.

suspend fun <T> requestAwait(
    call: () -> Deferred<T>
): Result<T> {
    return try {
        val result = call().await()
        Result.success(result)
    } catch (exception: Exception) {
        Timber.e(exception)
        Result.error(exception)
    }
}

suspend fun <T> request(
    call: suspend () -> T
): Result<T> {
    return try {
        Result.success(call())
    } catch (exception: Exception) {
        Timber.e(exception)
        Result.error(exception)
    }
}
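
To show what these helpers give the caller, here is a small self-contained sketch. It repeats a trimmed copy of Result and request (without the Timber logging) so that it compiles on its own, and it assumes kotlinx.coroutines for runBlocking. The loadTitles and describe functions are hypothetical and exist only for this example.

```kotlin
import kotlinx.coroutines.runBlocking

// Trimmed copy of the Result class, repeated so the sketch compiles on its own.
sealed class Result<out T> {
    data class Success<out T>(val data: T) : Result<T>()
    data class Error(val exception: Exception) : Result<Nothing>()
}

// The request helper, minus the Timber logging.
suspend fun <T> request(call: suspend () -> T): Result<T> =
    try {
        Result.Success(call())
    } catch (exception: Exception) {
        Result.Error(exception)
    }

// Hypothetical data source: fails for a blank query, succeeds otherwise.
suspend fun loadTitles(q: String): List<String> {
    require(q.isNotBlank()) { "query must not be blank" }
    return listOf("$q: first story", "$q: second story")
}

// Callers branch on the two states instead of writing try-catch themselves.
fun describe(result: Result<List<String>>): String = when (result) {
    is Result.Success -> "loaded ${result.data.size} stories"
    is Result.Error -> "failed: ${result.exception.message}"
}

fun main() = runBlocking<Unit> {
    println(describe(request { loadTitles("kotlin") }))
    println(describe(request { loadTitles(" ") }))
}
```

Because Result is sealed, the when expression has to cover both states, so a caller cannot forget the error case.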

So the repository's suspend function finally looks like this

override suspend fun appendStories(q: String?): Result<List<Story>> {
    val response = requestAwait { networkSource.searchArticles(q) }
    return when (response) {
        is Result.Success -> {
            val articles = response.data.articles
            request { localSource.insertStories(articles) }
            Result.success(articles)
        }
        is Result.Error -> response
    }
}

Threading

One of the key features of RxJava is how easy it is to specify the thread you want to run on, using the subscribeOn and observeOn operators.

By default, Coroutines use a Dispatcher called Default, which is backed by a shared pool of threads on the JVM. Coroutines allow us to change that Dispatcher, and one of the ways to do it is the withContext function.

withContext is a suspend function that lets us change the context used to run a block of coroutine code, and therefore the thread it runs on.
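
The thread switch can be observed on a plain JVM. The following is a rough sketch that assumes kotlinx.coroutines; ThreadTrace and traceThreads are hypothetical names, and Dispatchers.Default is used because Dispatchers.Main needs a UI platform such as Android to exist.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical holder for the threads seen at three points of the coroutine.
data class ThreadTrace(val before: Thread, val inside: Thread, val after: Thread)

fun traceThreads(): ThreadTrace = runBlocking {
    // runBlocking runs the coroutine on the thread that called it.
    val before = Thread.currentThread()
    // The block runs on a worker thread of the Default dispatcher.
    val inside = withContext(Dispatchers.Default) { Thread.currentThread() }
    // Once withContext returns, the coroutine is back in its original context.
    val after = Thread.currentThread()
    ThreadTrace(before, inside, after)
}

fun main() {
    val trace = traceThreads()
    println("before: ${trace.before.name}")
    println("inside: ${trace.inside.name}")
    println("after:  ${trace.after.name}")
}
```

Unlike the subscribeOn and observeOn pair, there is no separate call to hop back: the code after withContext resumes in the context it came from.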

So instead of this

fun requestStories(q: String?): Flowable<List<Story>> {
    return storiesRepository.searchStories(q)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
}

the UseCase's code now looks like this

suspend fun requestStories(q: String?): Result<List<Story>> {
    return withContext(Dispatchers.Main) {
        return@withContext storiesRepository.searchStories(q)
    }
}

Here Dispatchers.Main is the dispatcher that confines the coroutine to the main thread. The function is marked suspend as well because searchStories is a suspend function, and a suspend function can only be called from a coroutine or from another suspend function.

Moving to Presentation Layer

The last step in moving away from RxJava was to remove it from the ViewModels. The ViewModels were requesting data from the UseCases, which was mapped to a sealed class State and then transformed to LiveData via ReactiveStreams, as you can see below

val stories: LiveData<State<List<Story>>> by lazy {
    searchUseCase
        .requestStories(q)
        .toState()
        .toLiveData()
}

Since I wanted to remove RxJava, there was no need for ReactiveStreams and the transformation to LiveData. So I decided to use a regular postValue to update the LiveData variable, in order to keep it as simple as possible.

private val _stories = MutableLiveData<State<List<Story>>>()

val stories: LiveData<State<List<Story>>>
    get() = _stories

fun getStories() = viewModelScope.launch {
    _stories.postValue(State.loading())
    _stories.postValue(searchUseCase.requestStories(q).toState())
}

viewModelScope is a CoroutineScope tied to the ViewModel's lifecycle, which gets canceled when the ViewModel is cleared. The reason I use it here is that, as I mentioned before, suspend functions can only be called from a coroutine or another suspend function. So viewModelScope launches a coroutine in order to call the suspend functions.
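
The cancellation behaviour can be imitated without Android. This is only a sketch under assumptions: StoriesHolder is a hypothetical stand-in for a ViewModel, it owns a plain CoroutineScope instead of the real viewModelScope, and it uses Dispatchers.Default because the main dispatcher is not available on a plain JVM. It also assumes kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a ViewModel that owns its own coroutine scope.
class StoriesHolder {
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    @Volatile
    var state: String = "idle"
        private set

    fun getStories(): Job = scope.launch {
        state = "loading"
        delay(10_000) // simulated slow request
        state = "loaded" // never reached if the scope is cancelled first
    }

    // Plays the role of ViewModel.onCleared(): cancels everything in the scope.
    fun onCleared() {
        scope.cancel()
    }
}

fun main() = runBlocking<Unit> {
    val holder = StoriesHolder()
    val job = holder.getStories()
    holder.onCleared()
    job.join() // returns promptly because the job was cancelled
    println("cancelled=${job.isCancelled}, state=${holder.state}")
}
```

Tying the scope to the owner means a request that is still in flight cannot update state after the owner is gone.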

If you're interested, you can find the repository's Coroutines version here, and you can see the commit which migrated the app from RxJava to Coroutines here.

Testing

In the next part we will see the changes needed for the project's unit tests to pass again after the migration.

