Building data pipelines in Kotlin, using Akka and Kafka
source link: https://www.kotlindevelopment.com/data-pipelines-kotlin-akka-kafka/
The purpose of this post is three-fold:
- to evangelize Kotlin for enterprise use-cases;
- to raise awareness about Akka and the ecosystem around it among Java and Kotlin developers;
- to give credit to the JVM for making it possible to mix and match various technologies.
Context
Kotlin had a pretty busy year in 2017. Google announced official support for the language on Android. It got selected as a candidate for the programming language of the year. According to StackOverflow, Kotlin was growing so quickly, it "had to be truncated in the plot", while they created statistics. More and more server frameworks are adding support for Kotlin, e.g. Spring or Vert.x. And JetBrains is also working on Kotlin multiplatform, promising that we can compile the same code to multiple target platforms: JVM, JS, & Native (e.g. iOS).
So it seems like Kotlin is the perfect language for building your Next Big Thing with. 🙂
But you might have existing projects running in production, and you don't want to reimplement everything from scratch, just because Kotlin is awesome. Well, there is one particular use-case that I want to cover today, which I find very well suited for experimentation: writing integration code between your existing software components. Chances are that you have a bunch of services that expose some APIs over HTTP, also that you use message queues, relational databases, etc. Let's say you want to build an analytics component that polls one of your services, writes data into a message queue for later analysis, then, after post-processing, updates statistics in a SQL database.
Introducing Kotlin GitHub monitor
For demonstration purposes, I've built an app that polls GitHub's events API, and monitors the whole Kotlin organization so I can keep track of what's going on with the development of the language.
The app has the following functionality:
- polls GitHub and watches for new events;
- writes all events into a Kafka topic for later use;
- reads events from Kafka and keeps only the PushEvents;
- updates a Postgres database with: who pushed changes, when, into which repository.
I intentionally tried to keep the code very simple, omitting a few things that you would probably implement for production use, like recovery from errors, retries, metrics, etc.
Stack
As a foundation, I used the Akka toolkit, more specifically Akka Streams and a companion project called Alpakka. The former is a general-purpose stream processing library (implementing the Reactive Streams specification), and the latter is a collection of connectors and tools to build data integration between various technologies (e.g. AWS S3, MongoDB, or Google Cloud Pub/Sub).
Akka originates from the Scala ecosystem (also, most of its source code is written in Scala), but has a full-fledged Java API, making it possible to use it from Kotlin seamlessly. Usually, a lot of things can be expressed much more simply when you use the Scala API vs the Java API, but here's the catch: Kotlin has very nice language features and some syntactic sugar, making the two APIs almost identical in terms of ergonomics.
This post is not intended to be a "getting started guide for Akka Streams", so I won't explain how Akka Streams works in detail, but you don't have to fully understand what happens under the hood to be able to follow the code. There are basically two concepts that you have to remember, Source and Sink: data moves from Sources to Sinks. If you are familiar with RxJava, Source roughly translates to Observable and Sink to a specialized Observer.
Let's write code!
The entry point of our application is a regular Java main function:
fun main(vararg args: String) {
    val system = ActorSystem.create()
    val materializer = ActorMaterializer.create(system)

    val gitHubClient = GitHubClient(system, materializer)
    val eventsProducer = EventsProducer(system, materializer)
    val eventsConsumer = EventsConsumer(system)
    val pushEventProcessor = PushEventProcessor(materializer)

    eventsProducer.write(gitHubClient.events())
    pushEventProcessor.run(eventsConsumer.read())
}
Let's break it down:
- lines 2-3: basically boilerplate for using Akka and Akka Streams (pretty much everything you do in Akka requires an ActorSystem, and ActorMaterializer is something required to run your streams, more on this a bit later);
- line 5: the instance used to poll the GitHub events API;
- line 6: the instance used to write events into Kafka;
- line 7: the instance used to read events from Kafka;
- line 8: the instance used to filter PushEvents and update the database;
- lines 10-11: put things in motion.
Now that we have a high-level overview of the moving pieces, let's look inside them one by one.
GitHubClient
While communicating with the GitHub Events API, there are a few things we have to watch out for. The API is optimized for polling, but GitHub requires its users to follow a couple of simple rules:
- do not fetch data unnecessarily (it uses the HTTP ETag header mechanism and returns 304 Not Modified in case nothing has changed);
- do not poll the API too frequently (it uses a special header, X-Poll-Interval, to signal the allowed rate for the user).
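The two rules can be sketched without Akka. The snippet below is only an illustration, not the article's GitHubClient: it uses the JDK's java.net.http request builder instead of Akka HTTP, and the endpoint URL, the function names, and the 60-second fallback are assumptions made for this example.

```kotlin
import java.net.URI
import java.net.http.HttpRequest

// Assumed endpoint for this sketch; the article does not show the URL it polls.
const val EVENTS_URL = "https://api.github.com/orgs/Kotlin/events"

// Builds a GET request. When the ETag of the previous response is known, it is
// sent back as If-None-Match so the server can answer 304 Not Modified.
fun buildEventsRequest(previousETag: String?): HttpRequest {
    val builder = HttpRequest.newBuilder(URI.create(EVENTS_URL)).GET()
    if (previousETag != null) {
        builder.header("If-None-Match", previousETag)
    }
    return builder.build()
}

// Parses the X-Poll-Interval header value (seconds). The fallback used when
// the header is missing or malformed is an assumption of this sketch.
fun pollIntervalSeconds(headerValue: String?, fallback: Long = 60): Long =
    headerValue?.trim()?.toLongOrNull() ?: fallback
```

The first call would be buildEventsRequest(null); every later call passes the ETag taken from the previous response and waits pollIntervalSeconds(...) seconds before sending.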
This means that after the initial request, we have to keep track of the previous request-response pair before executing a new request, so that we can send the appropriate ETag header and respect the poll interval. Akka Streams has an operator for such recursive use-cases, called Source.unfoldAsync.
fun poll(): Source<GitHubResponse, NotUsed> =
    Source.unfoldAsync(GitHubRequest(), { request ->
        executeWithDelay(request).thenApply { response ->
            val nextRequest = GitHubRequest(eTagOpt = response.eTagOpt, delayOpt = response.pollIntervalOpt)
            Optional.of(Pair.create(nextRequest, response))
        }
    })
- line 2: creates the initial GitHubRequest (with no ETag header and poll interval);
- line 3: executes the request;
- line 4: creates a subsequent request based on the response headers returned for the initial request;
- line 5: returns a non-empty Optional with the current response and the next request to execute, signaling to unfoldAsync that we haven't finished (if we had returned Optional.empty(), the Source would have stopped).
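To make the unfold contract concrete, here is a synchronous analogue written with plain Kotlin sequences. It is a sketch under simplifying assumptions, not Akka's implementation: Request, Response, fakeServer, and pollFake are hypothetical stand-ins, and there is no delay or asynchrony.

```kotlin
import java.util.Optional

// Simplified, hypothetical stand-ins for the article's GitHubRequest / GitHubResponse.
data class Request(val eTag: String? = null)
data class Response(val eTag: String, val modified: Boolean)

// Synchronous analogue of unfold: call `step` with the current state; each call
// returns the next state plus one element to emit, or empty to stop.
fun <S, E> unfold(initial: S, step: (S) -> Optional<Pair<S, E>>): Sequence<E> = sequence {
    var state = initial
    while (true) {
        val next = step(state)
        if (!next.isPresent) break
        state = next.get().first
        yield(next.get().second)
    }
}

// Hypothetical server: reports "modified" only when the client's ETag is stale.
fun fakeServer(request: Request, currentETag: String): Response =
    Response(eTag = currentETag, modified = request.eTag != currentETag)

// Each response supplies the ETag for the next request, as in poll().
fun pollFake(currentETag: String): Sequence<Response> =
    unfold(Request()) { request ->
        val response = fakeServer(request, currentETag)
        Optional.of(Request(eTag = response.eTag) to response)
    }
```

Taking the first three responses of pollFake("v1") yields one modified response followed by two unmodified ones, because the second and third requests already carry the current ETag.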
And the code for executing each request:
fun executeWithDelay(request: GitHubRequest): CompletionStage<GitHubResponse> {
    fun execute(request: GitHubRequest): CompletionStage<GitHubResponse> {
        val httpRequest = mapToHttpRequest(request)
        logger.debug("executing request: $httpRequest")
        return client.singleRequest(httpRequest).thenCompose { response -> mapToGitHubResponse(response) }
    }
    return Source
        .single(request)
        .delay(FiniteDuration(request.delayOpt.orElse(lastPollInterval), TimeUnit.SECONDS), DelayOverflowStrategy.backpressure())
        .mapAsync(1, ::execute)
        .runWith(Sink.head(), materializer)
}
- lines 2-6: helper function for logging and executing a request;
- line 9: introduces a delay before moving forward to the helper execute method;
- line 11: here we have an interesting concept, specific to Akka Streams:
  - when you define Sinks and Sources, what you are actually doing is building blueprints;
  - these blueprints don't really do anything interesting on their own, they just define what should eventually happen with your data;
  - in order to actually set these blueprints in motion, you have to "materialize" them, in this case with the runWith method;
  - Sink.head() means that we take the first item as the materialized value of the stream;
  - because streams are asynchronous, Akka exposes the materialized value as a Java 8 CompletionStage.
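The blueprint idea has a loose analogue in Kotlin's own lazy sequences, sketched below. This is only an analogy and not how Akka materializes streams: sequences are synchronous and return a plain value rather than a CompletionStage, and the names blueprint and runHead are made up for this example.

```kotlin
// Defining the pipeline only describes the steps; nothing is executed here.
// The log lets us observe when the work actually happens.
fun blueprint(log: MutableList<String>): Sequence<Int> =
    sequenceOf(1, 2, 3).map { n ->
        log.add("processing $n")
        n * 10
    }

// The terminal operation plays a role similar to runWith(Sink.head(), ...):
// it runs the pipeline and returns its first element.
fun runHead(pipeline: Sequence<Int>): Int = pipeline.first()
```

Calling blueprint(log) leaves the log empty; only runHead(...) triggers processing, and it touches just the first element.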
Each time we receive a response from GitHub, we parse it and send individual events downstream.
fun events(): Source<JsonNode, NotUsed> =
    poll().flatMapConcat { response ->
        response.nodesOpt
            .map { nodes -> Source.from(nodes) }
            .orElse(Source.empty())
    }
- line 3: the response may or may not contain an array of events (e.g. in case of an HTTP 304 there are no events); if it does, we create a new Source from them.
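The same flattening step can be tried without Akka. In the sketch below, PolledResponse and flattenEvents are hypothetical names, events are plain strings instead of JsonNodes, and Kotlin's Sequence.flatMap stands in for flatMapConcat.

```kotlin
import java.util.Optional

// Hypothetical stand-in for GitHubResponse: the event list is absent
// when the server answered 304 Not Modified.
data class PolledResponse(val nodesOpt: Optional<List<String>>)

// Turns a stream of responses into a stream of individual events,
// contributing nothing for responses that carry no events.
fun flattenEvents(responses: Sequence<PolledResponse>): Sequence<String> =
    responses.flatMap { response ->
        response.nodesOpt
            .map { nodes -> nodes.asSequence() }
            .orElse(emptySequence<String>())
    }
```

A response with an empty Optional simply disappears from the output, which is why downstream stages never have to deal with the 304 case.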
The rest of the code is just boilerplate for processing the request/response and extracting header values. Now that we have the infrastructure for communicating with GitHub, let's move on to writing/reading events to Kafka.
EventsProducer and EventsConsumer
Here comes the power of Akka Streams and Alpakka. There is a module called Akka Streams Kafka, which greatly reduces the amount of code that we have to write for integrating with Kafka. Publishing events into a Kafka topic looks like the following.
fun write(events: Source<JsonNode, NotUsed>): CompletionStage<Done> =
    events
        .map { node -> ProducerRecord<ByteArray, String>("kotlin-events", objectMapper.writeValueAsString(node)) }
        .runWith(Producer.plainSink(settings), materializer)
- line 3: maps a GitHub event into a Kafka ProducerRecord (serializing JsonNode as a String);
- line 4: connects the Source to a special-purpose Sink defined in Akka Streams Kafka, called Producer.plainSink, which takes care of communicating with Kafka.
The other way around, reading from Kafka, is also super simple.
fun read(): Source<JsonNode, NotUsed> =
    Consumer.plainSource(settings, Subscriptions.assignmentWithOffset(TopicPartition("kotlin-events", 0), 0L))
        .map { record -> objectMapper.readTree(record.value()) }
        .mapMaterializedValue { NotUsed.getInstance() }
- line 2: Consumer.plainSource is a custom Source also defined in Akka Streams Kafka, which takes care of connecting to a topic and reading records from Kafka;
- line 3: maps a record back to a JsonNode, so basically we can work with them as if they had come from our GitHubClient.
At this point, we have a copy of GitHub's events feed for github.com/Kotlin stored in Kafka, so we can time travel and run different analytics jobs on our local dataset.
PushEventProcessor
According to our specification, we want to select only the PushEvents from the stream and update a Postgres database with the results. Alpakka has a package for interacting with SQL databases called Slick (JDBC) Connector. There is one downside of using the Slick Connector with the Java API: we have to write all the SQL code with String interpolation (Slick has a powerful Scala API that can be used to write type-safe queries and other things). Using Kotlin's raw string support can help though.
First, we need to be able to create a new table (if it doesn't exist).
fun createTableIfNotExists(): Source<Int, NotUsed> {
    val ddl =
        """
        |CREATE TABLE IF NOT EXISTS kotlin_push_events(
        |  id BIGINT NOT NULL,
        |  name VARCHAR NOT NULL,
        |  timestamp TIMESTAMP NOT NULL,
        |  repository VARCHAR NOT NULL,
        |  branch VARCHAR NOT NULL,
        |  commits INTEGER NOT NULL
        |);
        |CREATE UNIQUE INDEX IF NOT EXISTS id_index ON kotlin_push_events (id);
        """.trimMargin()
    return Slick.source(session, ddl, { _ -> 0 })
}
- line 14: Slick.source can be used to execute SQL code via JDBC and process its results:
  - the third function that we pass to Slick.source has the signature of (SlickRow) -> T, where T will be the generic type of the source returned: Source<T, NotUsed>;
  - in case of e.g. a SELECT statement, we could use this function to map rows in the result set to a custom data class.
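For readers new to raw strings, the following standalone sketch shows what trimMargin() produces. The query and the function name selectRecentPushes are invented for illustration; only the table and column names come from the DDL above.

```kotlin
// Raw strings keep line breaks and need no escaping. trimMargin() removes the
// leading whitespace up to and including the '|' marker on every line, and
// drops the blank first and last lines.
fun selectRecentPushes(limit: Int): String =
    """
    |SELECT name, repository
    |FROM kotlin_push_events
    |ORDER BY timestamp DESC
    |LIMIT $limit
    """.trimMargin()
```

selectRecentPushes(5) returns a four-line statement starting with "SELECT name, repository" and ending with "LIMIT 5". Interpolating an Int like this is harmless, but interpolating arbitrary strings into SQL opens the door to injection, so untrusted values deserve extra care.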
Similarly, the function to update the database looks like this.
fun Source<PushEvent, NotUsed>.updateDatabase(): CompletionStage<Done> =
    createTableIfNotExists().flatMapConcat { this }
        .runWith(Slick.sink<PushEvent>(session, 20, { event ->
            """
            |INSERT INTO kotlin_push_events(id, name, timestamp, repository, branch, commits)
            |VALUES (
            |  ${event.id},
            |  '${event.actor.login}',
            |  '${Timestamp.valueOf(event.created_at)}',
            |  '${event.repo.name}',
            |  '${event.payload.ref}',
            |  ${event.payload.distinct_size}
            |)
            |ON CONFLICT DO NOTHING
            """.trimMargin()
        }), materializer)
- line 1: please note that I defined updateDatabase as an extension method on the type Source<PushEvent, NotUsed> (you'll see why);
- line 2: creates the table, then switches back to the original Source;
- line 3: Slick.sink takes each item flowing through the stream and executes the provided SQL statement, in this case an INSERT.
We are almost done. What's left is filtering and mapping from JsonNode to PushEvent, and composing the methods together.
fun Source<JsonNode, NotUsed>.filterPushEvents(): Source<PushEvent, NotUsed> =
    filter { node -> node["type"].asText() == "PushEvent" }
        .map { node -> objectMapper.convertValue(node, PushEvent::class.java) }
- line 1: filterPushEvents is also defined as an extension method.
And finally, all the functions composed together look like this.
fun run(events: Source<JsonNode, NotUsed>): CompletionStage<Done> =
    events
        .filterPushEvents()
        .updateDatabase()
This is why we've used the extension methods above: they let us describe transformations like this, simply chained together. That's it. After running the app for a while (gradle app:run), we can see the activities around different Kotlin repositories. You can find the complete source code on GitHub.
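The chaining style is not specific to Akka's Source. As a rough sketch, the same shape can be reproduced on Kotlin sequences; Event, Push, countByRepo, and pushCounts are hypothetical names for this example, events are plain maps instead of JsonNodes, and an in-memory count replaces the database update.

```kotlin
// Simplified stand-in: an event is a map of fields instead of a JsonNode.
typealias Event = Map<String, String>

data class Push(val actor: String, val repo: String)

// Keeps only push events and maps them to a typed value.
fun Sequence<Event>.filterPushEvents(): Sequence<Push> =
    filter { event -> event["type"] == "PushEvent" }
        .map { event -> Push(event.getValue("actor"), event.getValue("repo")) }

// Terminal step of the sketch: counts pushes per repository.
fun Sequence<Push>.countByRepo(): Map<String, Int> =
    groupingBy { push -> push.repo }.eachCount()

// Extension functions let the steps read top to bottom as one chain.
fun pushCounts(events: Sequence<Event>): Map<String, Int> =
    events
        .filterPushEvents()
        .countByRepo()
```

Without extension functions the same pipeline would have to be written inside out, as countByRepo(filterPushEvents(events)), which gets harder to read with every added step.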
A very nice property of using Akka Streams and Alpakka is that it makes it really easy to migrate/reuse your code, e.g. in case you want to store data in Cassandra later on instead of Postgres. All you would have to do is define a different Sink with CassandraSink.create. Or if GitHub events were dumped in a file located in AWS S3 instead of published to Kafka, all you would have to do is create a Source with S3Client.download(bucket, key). The current list of available connectors can be found in the Alpakka documentation, and the list is growing.
Summary
Akka is a very powerful technology that helps you write complex (and correct) code more easily. Using Kotlin makes the code much more readable, and I think there is huge potential here for either the community or Lightbend (the company behind Akka) to create even more idiomatic APIs and wrappers to use with Kotlin. This is one of the benefits of the JVM platform: it enables code written in different languages, e.g. Java, Scala and Kotlin, to happily co-exist in a single runtime. I really think that 2018 will be an even more exciting year for the Kotlin community and that its adoption will grow further, not just on mobile, but for other use-cases like server-side development, data science, machine learning, etc. If you've enjoyed this post, keep an eye on our blog, where we will cover more of these topics.