
Building data pipelines in Kotlin, using Akka and Kafka

source link: https://www.kotlindevelopment.com/data-pipelines-kotlin-akka-kafka/

The purpose of this post is three-fold:

  • to evangelize Kotlin for enterprise use-cases;
  • to raise awareness about Akka and the ecosystem around it among Java and Kotlin developers;
  • to give credit to the JVM, making it possible to mix&match various technologies.

Context

Kotlin had a pretty busy year in 2017. Google announced official support for the language on Android. It was selected as a candidate for programming language of the year. According to Stack Overflow, Kotlin was growing so quickly that it "had to be truncated in the plot" when they compiled their statistics. More and more server frameworks are adding support for Kotlin, e.g. Spring or Vert.x. And JetBrains is working on Kotlin multiplatform, promising that we can compile the same code to multiple target platforms: JVM, JS, and Native (e.g. iOS).

So it seems like Kotlin is the perfect language for building your Next Big Thing with. 🙂

But you might have existing projects running in production, and you don't want to reimplement everything from scratch just because Kotlin is awesome. Well, there is one particular use-case that I want to cover today, which I find very well suited for experimentation: writing integration code between your existing software components. Chances are that you have a bunch of services that expose APIs over HTTP, and that you also use message queues, relational databases, etc. Let's say you want to build an analytics component that polls one of your services, writes data into a message queue for later analysis, then, after post-processing, updates statistics in a SQL database.

Introducing Kotlin GitHub monitor

For demonstration purposes, I've built an app that polls GitHub's events API, and monitors the whole Kotlin organization so I can keep track of what's going on with the development of the language.

The app has the following functionality:

  • polls GitHub and watches for new events;
  • writes all events into a Kafka topic for later use;
  • reads events from Kafka and filters out PushEvents;
  • updates a Postgres database with: who pushed changes, when, into which repository.

I intentionally tried to keep the code very simple, omitting a few things that you would probably implement for production use, like error recovery, retries, metrics, etc.

Stack

As a foundation, I used the Akka toolkit, more specifically Akka Streams and a companion project called Alpakka. The former is a general-purpose stream processing library (implementing the Reactive Streams specification), and the latter is a collection of connectors and tools for building data integrations between various technologies (e.g. AWS S3, MongoDB, or Google Cloud Pub/Sub).

Akka originates from the Scala ecosystem (and most of its source code is written in Scala), but it has a full-fledged Java API, making it possible to use it from Kotlin seamlessly. Usually, a lot of things can be expressed much more simply with the Scala API than with the Java API, but here's the twist: Kotlin has very nice language features and some syntactic sugar that make the two APIs almost identical in terms of ergonomics.

This post is not intended to be a "getting started" guide for Akka Streams, so I won't explain how Akka Streams works in detail, but you don't have to fully understand what happens under the hood to be able to follow the code. There is basically one thing you have to remember: data flows from Sources to Sinks. If you are familiar with RxJava, a Source roughly translates to an Observable and a Sink to a specialized Observer.
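
To make that vocabulary concrete, here is a minimal, self-contained sketch (not part of the GitHub monitor, just an illustration) where a few numbers flow from a Source through a map stage into a Sink that prints them:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.javadsl.Sink
import akka.stream.javadsl.Source

fun helloStreams() {
  val system = ActorSystem.create()
  val materializer = ActorMaterializer.create(system)

  // nothing happens until runWith materializes the blueprint
  Source.range(1, 5)
    .map { n -> "element $n" }
    .runWith(Sink.foreach { line -> println(line) }, materializer)
}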

Let's write code!

The entry point of our application is a regular Java main function:

fun main(vararg args: String) {
  val system = ActorSystem.create()
  val materializer = ActorMaterializer.create(system)

  val gitHubClient = GitHubClient(system, materializer)
  val eventsProducer = EventsProducer(system, materializer)
  val eventsConsumer = EventsConsumer(system)
  val pushEventProcessor = PushEventProcessor(materializer)

  eventsProducer.write(gitHubClient.events())
  pushEventProcessor.run(eventsConsumer.read())
}

Let's break it down:

  • lines 2-3: basically boilerplate for using Akka and Akka Streams (pretty much everything you do in Akka requires an ActorSystem, and an ActorMaterializer is required to run your streams, more on this a bit later);
  • line 5: the instance used to poll the GitHub events API;
  • line 6: the instance used to write events into Kafka;
  • line 7: the instance used to read events from Kafka;
  • line 8: the instance used to filter PushEvents and update the database;
  • lines 10-11: put things in motion.

Now that we have a high-level overview of the moving pieces, let's look inside them one by one.

GitHubClient

While communicating with the GitHub Events API, there are a few things we have to watch out for. The API is optimized for polling, but GitHub requires its users to follow a couple of simple rules:

  • do not fetch data unnecessarily (it uses the HTTP ETag header mechanism and returns 304 Not Modified in case nothing has changed)
  • do not poll the API too frequently (it uses a special header X-Poll-Interval to signal the allowed rate for the user)

This means that, after an initial request, we have to keep track of the previous request/response pair before executing a new one, send the appropriate ETag header, and respect the poll interval. Akka Streams has an operator for such recursive use-cases, called Source.unfoldAsync.

fun poll(): Source<GitHubResponse, NotUsed> =
  Source.unfoldAsync(GitHubRequest(), { request ->
    executeWithDelay(request).thenApply { response ->
      val nextRequest = GitHubRequest(eTagOpt = response.eTagOpt, delayOpt = response.pollIntervalOpt)
      Optional.of(Pair.create(nextRequest, response))
    }
  })
  • line 2: creates the initial GitHubRequest (with no ETag header and no poll interval; the GitHubRequest/GitHubResponse types are sketched below);
  • line 3: executes the request;
  • line 4: creates the subsequent request based on the response headers returned for the previous request;
  • line 5: returns a non-empty Optional containing the next request to execute and the current response, signaling to unfoldAsync that we haven't finished (had we returned Optional.empty(), the Source would have completed).
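
The GitHubRequest and GitHubResponse types are not shown in the post; judging by how they are used here, they are simple value holders that might look roughly like this (the exact shape is an assumption):

import com.fasterxml.jackson.databind.JsonNode
import java.util.Optional

// assumed shape, derived from how the fields are used in poll() and executeWithDelay()
data class GitHubRequest(
  val eTagOpt: Optional<String> = Optional.empty(), // ETag of the previous response, if any
  val delayOpt: Optional<Long> = Optional.empty()   // seconds to wait, from X-Poll-Interval
)

data class GitHubResponse(
  val eTagOpt: Optional<String>,         // ETag header of this response
  val pollIntervalOpt: Optional<Long>,   // X-Poll-Interval header, in seconds
  val nodesOpt: Optional<List<JsonNode>> // parsed event array; empty for 304 Not Modified
)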

And the code for executing each request:

fun executeWithDelay(request: GitHubRequest): CompletionStage<GitHubResponse> {
  fun execute(request: GitHubRequest): CompletionStage<GitHubResponse> {
    val httpRequest = mapToHttpRequest(request)
    logger.debug("executing request: $httpRequest")
    return client.singleRequest(httpRequest).thenCompose { response -> mapToGitHubResponse(response) }
  }
  return Source
    .single(request)
    .delay(FiniteDuration(request.delayOpt.orElse(lastPollInterval), TimeUnit.SECONDS), DelayOverflowStrategy.backpressure())
    .mapAsync(1, ::execute)
    .runWith(Sink.head(), materializer)
}
  • lines 2-6: a helper function for logging and executing a request;
  • line 9: introduces a delay before moving on to the helper execute method;
  • line 11: here we have an interesting concept, specific to Akka Streams:
    • when you define Sinks and Sources, what you are actually doing is building blueprints;
    • these blueprints don't do anything on their own, they just describe what should eventually happen to your data;
    • in order to actually set these blueprints in motion, you have to "materialize" them, in this case with the runWith method;
    • Sink.head() means that we take the first element as the materialized value of the stream;
    • because streams are asynchronous, Akka exposes the materialized value as a Java 8 CompletionStage.

Each time we receive a response from GitHub, we parse it and send individual events downstream.

fun events(): Source<JsonNode, NotUsed> =
  poll().flatMapConcat { response ->
    response.nodesOpt
      .map { nodes -> Source.from(nodes) }
      .orElse(Source.empty())
  }
  • line 3: the response may or may not contain an array of events (e.g. in case of an HTTP 304 there are no events); if it does, we create a new Source from them.

The rest of the code is just boilerplate for processing the request/response and extracting header values. Now that we have the infrastructure for communicating with GitHub, let's move on to writing/reading events to Kafka.
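
As an illustration, mapping a GitHubRequest to an Akka HTTP request and pulling the interesting headers out of the response could look roughly like the sketch below. This is a simplified stand-in, not the article's exact implementation: the events URL and the extractHeaders name are assumptions, and parsing the JSON body is omitted.

import akka.http.javadsl.model.HttpRequest
import akka.http.javadsl.model.HttpResponse
import akka.http.javadsl.model.headers.RawHeader
import java.util.Optional

// sketch: build the next request, sending the previous ETag as If-None-Match
fun mapToHttpRequest(request: GitHubRequest): HttpRequest {
  val base = HttpRequest.create("https://api.github.com/orgs/Kotlin/events") // assumed endpoint
  return request.eTagOpt
    .map { eTag -> base.addHeader(RawHeader.create("If-None-Match", eTag)) }
    .orElse(base)
}

// sketch: extract the ETag and X-Poll-Interval header values from a response
fun extractHeaders(response: HttpResponse): Pair<Optional<String>, Optional<Long>> {
  val eTagOpt = response.getHeader("ETag").map { header -> header.value() }
  val pollIntervalOpt = response.getHeader("X-Poll-Interval").map { header -> header.value().toLong() }
  return Pair(eTagOpt, pollIntervalOpt)
}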

EventsProducer and EventsConsumer

Here comes the power of Akka Streams and Alpakka. There is a module called Akka Streams Kafka, which greatly reduces the amount of code we have to write to integrate with Kafka. Publishing events into a Kafka topic looks like the following.

fun write(events: Source<JsonNode, NotUsed>): CompletionStage<Done> =
  events
    .map { node -> ProducerRecord<ByteArray, String>("kotlin-events", objectMapper.writeValueAsString(node)) }
    .runWith(Producer.plainSink(settings), materializer)
  • line 3: maps a GitHub event into a Kafka ProducerRecord (serializing the JsonNode as a String);
  • line 4: connects the Source to a special-purpose Sink defined in Akka Streams Kafka, called Producer.plainSink, which takes care of communicating with Kafka (the settings value it receives is sketched below).
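
The settings value passed to Producer.plainSink is not shown above; a minimal sketch of how it could be constructed (the bootstrap server address is an assumption):

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer

// matches the ProducerRecord<ByteArray, String> used above; broker address is an assumption
fun producerSettings(system: ActorSystem): ProducerSettings<ByteArray, String> =
  ProducerSettings.create(system, ByteArraySerializer(), StringSerializer())
    .withBootstrapServers("localhost:9092")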

The other way around, reading from Kafka, is also super simple.

fun read(): Source<JsonNode, NotUsed> =
  Consumer.plainSource(settings, Subscriptions.assignmentWithOffset(TopicPartition("kotlin-events", 0), 0L))
    .map { record -> objectMapper.readTree(record.value()) }
    .mapMaterializedValue { NotUsed.getInstance() }
  • line 2: Consumer.plainSource is a custom Source, also defined in Akka Streams Kafka, which takes care of connecting to a topic and reading records from Kafka (its settings value is sketched below);
  • line 3: maps each record back to a JsonNode, so we can work with them as if they had come from our GitHubClient.
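
The consumer side needs a matching settings object; a minimal sketch (the bootstrap server and group id values are assumptions):

import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.serialization.StringDeserializer

// mirrors the producer settings above; broker address and group id are assumptions
fun consumerSettings(system: ActorSystem): ConsumerSettings<ByteArray, String> =
  ConsumerSettings.create(system, ByteArrayDeserializer(), StringDeserializer())
    .withBootstrapServers("localhost:9092")
    .withGroupId("kotlin-events-consumer")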

At this point, we have a copy of GitHub's events feed for github.com/Kotlin stored in Kafka, so we can time travel and run different analytics jobs on our local dataset.

PushEventProcessor

According to our specification, we want to filter out PushEvents from the stream and update a Postgres database with the results. Alpakka has a package for interacting with SQL databases called Slick (JDBC) Connector. There is one downside of using the Slick Connector with the Java API: we have to write all the SQL code with String interpolation (Slick has a powerful Scala API that can be used to write type-safe queries and other things). Using Kotlin's raw string support can help though.

First, we need to be able to create a new table (if it doesn't exist).

fun createTableIfNotExists(): Source<Int, NotUsed> {
  val ddl =
    """
      |CREATE TABLE IF NOT EXISTS kotlin_push_events(
      |  id         BIGINT    NOT NULL,
      |  name       VARCHAR   NOT NULL,
      |  timestamp  TIMESTAMP NOT NULL,
      |  repository VARCHAR   NOT NULL,
      |  branch     VARCHAR   NOT NULL,
      |  commits    INTEGER   NOT NULL
      |);
      |CREATE UNIQUE INDEX IF NOT EXISTS id_index ON kotlin_push_events (id);
    """.trimMargin()
  return Slick.source(session, ddl, { _ -> 0 })
}
  • line 14: Slick.source can be used to execute SQL code via JDBC and process its results:
    • the third argument we pass to Slick.source is a function with the signature (SlickRow) -> T, where T will be the generic type of the source returned: Source<T, NotUsed>;
    • in case of e.g. a SELECT statement, we could use this function to map rows in the result set to a custom data class (see the sketch below).
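
For instance, a SELECT over the table created above could be mapped onto a small data class like this (the data class and query are illustrative and not part of the original app; session is the same SlickSession used in the other snippets):

import akka.NotUsed
import akka.stream.alpakka.slick.javadsl.Slick
import akka.stream.javadsl.Source

// illustrative result type, not part of the original app
data class RepositoryActivity(val repository: String, val commits: Int)

fun repositoryActivity(): Source<RepositoryActivity, NotUsed> =
  Slick.source(
    session,
    "SELECT repository, commits FROM kotlin_push_events",
    { row -> RepositoryActivity(row.nextString(), row.nextInt()) }
  )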

Similarly, the function to update the database looks like this.

fun Source<PushEvent, NotUsed>.updateDatabase(): CompletionStage<Done> =
  createTableIfNotExists().flatMapConcat { this }
    .runWith(Slick.sink<PushEvent>(session, 20, { event ->
      """
        |INSERT INTO kotlin_push_events(id, name, timestamp, repository, branch, commits)
        |VALUES (
        |  ${event.id},
        |  '${event.actor.login}',
        |  '${Timestamp.valueOf(event.created_at)}',
        |  '${event.repo.name}',
        |  '${event.payload.ref}',
        |  ${event.payload.distinct_size}
        |)
        |ON CONFLICT DO NOTHING
      """.trimMargin()
    }), materializer)
  • line 1: please note that I defined updateDatabase as an extension method on the type Source<PushEvent, NotUsed> (you'll see why);
  • line 2: creates the table, then switches back to the original Source;
  • line 3: Slick.sink takes each item flowing through the stream and executes the provided SQL statement, in this case an INSERT (the PushEvent type is sketched below).
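
The PushEvent type (and its nested parts) is also not shown in the post; based on the fields referenced in the INSERT statement, and the fact that it's produced via Jackson's convertValue, it might look roughly like this (the annotations, the created_at type, and the assumption of a Kotlin- and java.time-aware ObjectMapper are all guesses):

import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import java.time.LocalDateTime

// assumed shape: only the fields used in the INSERT above; GitHub's JSON has many more
@JsonIgnoreProperties(ignoreUnknown = true)
data class Actor(val login: String)

@JsonIgnoreProperties(ignoreUnknown = true)
data class Repo(val name: String)

@JsonIgnoreProperties(ignoreUnknown = true)
data class Payload(val ref: String, val distinct_size: Int)

@JsonIgnoreProperties(ignoreUnknown = true)
data class PushEvent(
  val id: Long,
  val actor: Actor,
  val repo: Repo,
  val payload: Payload,
  val created_at: LocalDateTime // assumes the ObjectMapper can handle java.time types
)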

We are almost done; what's left is filtering and mapping from JsonNode to PushEvent, and composing the methods together.

fun Source<JsonNode, NotUsed>.filterPushEvents(): Source<PushEvent, NotUsed> =
  filter { node -> node["type"].asText() == "PushEvent" }
    .map { node -> objectMapper.convertValue(node, PushEvent::class.java) }
  • line 1: filterPushEvents is also defined as an extension method.

And finally, all the functions composed together look like this.

fun run(events: Source<JsonNode, NotUsed>): CompletionStage<Done> =
  events
    .filterPushEvents()
    .updateDatabase()

This is why we've used the extension methods above: so we can describe transformations like this, simply chained together. That's it: after running the app for a while (gradle app:run), we can see the activity around the different Kotlin repositories. You can find the complete source code on GitHub.

A very nice property of using Akka Streams and Alpakka is that it makes it really easy to migrate/reuse your code, e.g. in case you later want to store data in Cassandra instead of Postgres. All you would have to do is define a different Sink with CassandraSink.create. Or if the GitHub events were dumped into a file in AWS S3 instead of being published to Kafka, all you would have to do is create a Source with S3Client.download(bucket, key). The current list of available connectors is located here, and the list is growing.

Summary

Akka is a very powerful technology that helps you write complex (and correct) code more easily. Using Kotlin makes the code much more readable, and I think there is huge potential here for either the community or Lightbend (the company behind Akka) to create even more idiomatic APIs and wrappers to use with Kotlin. This is one of the benefits of the JVM platform: code written in different languages, e.g. Java, Scala, and Kotlin, can happily co-exist in a single runtime. I really think that 2018 will be an even more exciting year for the Kotlin community and that its adoption will grow further, not just on mobile, but for other use-cases like server-side development, data science, machine learning, etc. If you've enjoyed this post, keep an eye on our blog where we will cover more of these topics.

