39

Asynchronous Programming in Vertx: Callbacks, Futures, and Coroutines

 4 years ago
source link: https://www.tuicool.com/articles/hit/qqyeMrA
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

I want to show three paradigms of asynchronous programming: callbacks, futures, and coroutines. I will provide a simple web application example using Kotlin and the Vertx framework.

Let's assume we are writing an application that receives a string in an HTTP request, searches by this string for a URL in the DB, fetches the URL content, and then sends it back to the client.

Vertx was created as an asynchronous framework for high-load applications, using Netty, new I/O, and event bus.

As it is common in Vertx, one Verticle (something like Actor, if you know Akka) receives a request, sends the received string to an event bus and then to another verticle —  BusinessVerticle , which is responsible for the fetching itself.

object Main {
    @JvmStatic
    fun main(args: Array<String>) {
        val vertx =  Vertx.vertx()
        vertx.deployVerticle(HttpVerticle())
        vertx.deployVerticle(BusinessVerticle())
    }
}
@JvmStatic
fun main(args: Array<String>) {
        val vertx =  Vertx.vertx()
        vertx.deployVerticle(HttpVerticle())
        vertx.deployVerticle(BusinessVerticle())
    }
}

class HttpVerticle : AbstractVerticle() {

    @Throws(Exception::class)
    override fun start(startFuture: Future<Void>) {
        val router = createRouter()

        vertx.createHttpServer()
            .requestHandler(router)
            .listen(8080) { result ->
                if (result.succeeded()) {
                    startFuture.complete()
                } else {
                    startFuture.fail(result.cause())
                }
            }
    }

    private fun createRouter(): Router = Router.router(vertx).apply {
        get("/").handler(handlerRoot)
    }


    private val handlerRoot = Handler<RoutingContext> { rc ->
        vertx.eventBus().send("my.addr", rc.request().getParam("id") ?: "") 
               { resp: AsyncResult<Message<String>> ->
                    if (resp.succeeded()) {
                        rc.response().end(resp.result().body())
                    } else {
                       rc.fail(500)
                   }
        }
    }

}

In the standard Vertx API, you perform all asynchronous flows via callbacks, so the initial implementation of BusinessVerticle looks like this:

class BusinessVerticle : AbstractVerticle() {


    private lateinit var dbclient: JDBCClient
    private lateinit var webclient: WebClient

    override fun start() {
        vertx.eventBus().consumer<String>("my.addr") { message ->
            handleMessage(message)
        }
        dbclient = JDBCClient.createShared(
            vertx, JsonObject()
                .put("url", "jdbc:postgresql://localhost:5432/payroll")
                .put("driver_class", "org.postgresql.Driver")
                .put("user", "vala")
                .put("password", "vala")
                .put("max_pool_size", 30)
        )

        val options = WebClientOptions()
            .setUserAgent("My-App/1.2.3")

        options.isKeepAlive = false
        webclient = WebClient.create(vertx, options)
    }

    private fun handleMessage(message: Message<String>) {
        dbclient.getConnection { res ->
            if (res.succeeded()) {

                val connection = res.result()

                connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 ->
                    if (res2.succeeded()) {
                        try {
                            val url = res2.result().rows[0].getString("url").removePrefix("http://")
                            webclient
                                .get(url,"/")
                                .send { ar ->
                                    if (ar.succeeded()) {
                                        val response = ar.result()
                                        message.reply(response.bodyAsString())
                                    } else {
                                        message.fail(500, ar.cause().message)
                                    }
                                }

                        } catch (e: Exception) {
                            message.fail(500, e.message)
                        }
                    } else {
                        message.fail(500, res2.cause().message)
                    }
                }
            } else {
                message.fail(500, res.cause().message)
            }
        }
    }

}

And this looks bad. Callbacks and error handling are done in several places.

Let's improve the situation by the extraction of each callback to a separate method:

class BusinessVerticle: AbstractVerticle() {


    private lateinit var dbclient: JDBCClient
    private lateinit var webclient: WebClient

   override fun start() {
        vertx.eventBus().consumer<String>("my.addr") { message ->
            handleMessage(message)
        }
        dbclient = JDBCClient.createShared(
            vertx, JsonObject()
                .put("url", "jdbc:postgresql://localhost:5432/payroll")
                .put("driver_class", "org.postgresql.Driver")
                .put("user", "vala")
                .put("password", "vala")
                .put("max_pool_size", 30)
        )

        val options = WebClientOptions()
            .setUserAgent("My-App/1.2.3")

        options.isKeepAlive = false
        webclient = WebClient.create(vertx, options)
    }

    private fun handleMessage(message: Message<String>) {
        dbclient.getConnection { res ->
            handleConnectionCallback(res, message)
        }
    }

    private fun handleConnectionCallback(
        res: AsyncResult<SQLConnection>,
        message: Message<String>
    ) {
        if (res.succeeded()) {

            val connection = res.result()

            connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 ->
                handleQueryCallBack(res2, message)
            }
        } else {
            message.fail(500, res.cause().message)
        }
    }

    private fun handleQueryCallBack(
        res2: AsyncResult<ResultSet>,
        message: Message<String>
    ) {
        if (res2.succeeded()) {
            try {
                val url = res2.result().rows[0].getString("url").removePrefix("http://")
                webclient
                    .get(url, "/")
                    .send { ar ->
                        handleHttpCallback(ar, message)
                    }

            } catch (e: Exception) {
                message.fail(500, e.message)
            }
        } else {
            message.fail(500, res2.cause().message)
        }
    }

    private fun handleHttpCallback(
        ar: AsyncResult<HttpResponse<Buffer>>,
        message: Message<String>
    ) {
        if (ar.succeeded()) {
            // Obtain response
            val response = ar.result()
            message.reply(response.bodyAsString())
        } else {
            message.fail(500, ar.cause().message)
        }
    }

}

Better, but still bad.

It's just not readable code. The message object, which should pass to all methods, creates a separate error handling in each callback.

Let's rewrite it using Future s. The great advantage of using Future is the ability to chain them using   Future.compose() .

First, let's translate standard Vertx methods, which receive callbacks to methods and returns Future . We will use "extension methods" — a Kotlin feature that allows us to add a method to an existing class.

fun JDBCClient.getConnectionF(): Future<SQLConnection> {
    val f = Future.future<SQLConnection>()
    getConnection { res ->
        if (res.succeeded()) {
            val connection = res.result()
            f.complete(connection)
        } else {
            f.fail(res.cause())
        }
    }
    return f
}

fun SQLConnection.queryF(query:String): Future<ResultSet> {
    val f = Future.future<ResultSet>()
    query(query) { res ->
        if (res.succeeded()) {
            val resultSet = res.result()
            f.complete(resultSet)
        } else {
            f.fail(res.cause())
        }
    }
    return f
}

fun <T,M> HttpRequest<T>.sendF(): Future<HttpResponse<M>> {
    val f = Future.future<HttpResponse<M>>()
    send() { res ->
        if (res.succeeded()) {
            val response = res.result()
            f.complete(response)
        } else {
            f.fail(res.cause())
        }
    }
    return f
}

And then, the BusinessVerticle.handleMessage method will be transformed to:

private fun handleMessage(message: Message<String>) {
        val content = getContent(message)

        content.setHandler{res->
            if (res.succeeded()) {
                // Obtain response
                val response = res.result()
                message.reply(response)
            } else {
                message.fail(500, res.cause().message)
            }
        }

    }

    private fun getContent(message: Message<String>): Future<String> {
        val connection = dbclient.getConnectionF()
        val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") }
        val url = resultSet.map { it.rows[0].getString("url").removePrefix("http://") }
        val httpResponse = url.compose { webclient.get(it, "/").sendF() }
        val content = httpResponse.map { it.bodyAsString() }
        return content
    }

Looks good.

Simple, readable code. And we also have error handling in one place. If required, we can add different error handling for different exceptions and/or extract it to a separate method.

But what if we need, under some condition, to stop the chain of   Future.compose() ?

For example, if there are no records in the DB, we want to return a response "No records" with HTTP code 200 instead of an error and code 500?

The only way to do it is to throw a special exception and make special handling for such an exception:

class NoContentException(message:String):Exception(message)

 private fun getContent(message: Message<String>): Future<String> {
        val connection = dbclient.getConnectionF()
        val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") }
        val url = resultSet.map {
            if (it.numRows<1)
                throw NoContentException("No records")
            it.rows[0].getString("url").removePrefix("http://")
        }
        val httpResponse = url.compose { webclient.get(it, "/").sendF() }
        val content = httpResponse.map { it.bodyAsString() }
        return content
    }

    private fun handleMessage(message: Message<String>) {
        val content = getContent(message)

        content.setHandler{res->
            if (res.succeeded()) {
                // Obtain response
                val response = res.result()
                message.reply(response)
            } else {
                if (res.cause() is NoContentException)
                    message.reply(res.cause().message)
                else
                    message.fail(500, res.cause().message)
            }
        }
    }

It works well, but it does not look so good — we are using an exception for flow control, and if there are a lot of "special cases" in the flow, the code will be much less readable.

So, let's try to do the same thing with Kotlin coroutines. There are a lot of articles about Kotlin coroutines, so I'm not going to explain here what they are and how they work.

In the last versions of Vertx, you can include automatically generated coroutine-friendly versions of all callback methods.

You should add libraries:  'vertx-lang-kotlin-coroutines'  and 'vertx-lang-kotlin' . You will get:  JDBCClient.getConnectionAwait() ,   SQLConnection.queryAwait() , and others.

If we use those methods, our message handling method will become simpler:

private suspend fun handleMessage(message: Message<String>) {
        try {
            val content = getContent(message)
            message.reply(content)
        } catch(e:Exception){
            message.fail(500, e.message)
        }

    }

    private suspend fun getContent(message: Message<String>): String {
        val connection = dbclient.getConnectionAwait()
        val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'")
        val url =  resultSet.rows[0].getString("url").removePrefix("http://")
        val httpResponse = webclient.get(url, "/").sendAwait()
        val content = httpResponse.bodyAsString()
        return content
    }

Additionally, you should change the event bus message subscribe code:

vertx.eventBus().consumer<String>("my.addr") { message ->
           GlobalScope.launch(vertx.dispatcher()) {  handleMessage(message)}
        }

What happens here?

All those await methods call code in an asynchronous way. Then, they wait for a result, and while they wait, a thread is switching and running another coroutine.

If we will look to implementation those await methods, we will see something very similar to our homemade implementation for   Future s:

suspend fun SQLClient.getConnectionAwait(): SQLConnection {
  return awaitResult {
    this.getConnection(it)
  }
}

suspend fun <T> awaitResult(block: (h: Handler<AsyncResult<T>>) -> Unit): T {
  val asyncResult = awaitEvent(block)
  if (asyncResult.succeeded()) return asyncResult.result()
  else throw asyncResult.cause()
}

suspend fun <T> awaitEvent(block: (h: Handler<T>) -> Unit): T {
  return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
    try {
      block.invoke(Handler { t ->
        cont.resume(t)
      })
    } catch (e: Exception) {
      cont.resumeWithException(e)
    }
  }
}

But here, we get a normal code — String as a return type (and not Future < String >   ) and  try / catch   instead of the ugly callback with  AsyncResult .

If we need to stop flow in the middle, we can do it in a natural way, without exceptions:

private suspend fun getContent(message: Message<String>): String {
        val connection = dbclient.getConnectionAwait()
        val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'")
        if (resultSet.numRows<1)
            return "No records"
        val url =  resultSet.rows[0].getString("url").removePrefix("http://")
        val httpResponse = webclient.get(url, "/").sendAwait()
        val content = httpResponse.bodyAsString()
        return content
    }

IMHO, it is just beautiful!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK