
Self-Learning Kafka Streams with Scala - #1 - Knoldus Blogs

 3 years ago
source link: https://blog.knoldus.com/self-learning-kafka-streams-scala-1/



A few days ago, I needed to perform a stateful operation on streaming data, so I started looking for possible solutions. I found several, built on technologies such as Spark Structured Streaming, Apache Flink, and Kafka Streams.

All of them solved my problem, but I chose Kafka Streams because it met most of my requirements. I then started reading its documentation and running its examples. But as soon as I started learning it, I hit a major roadblock: “Kafka Streams does not provide a Scala API!” I was shocked to learn that.

I expected Kafka Streams to have a Scala API because I am building my application in Scala, and a native API would have made it easy to include in my application. That did not turn out to be the case. On top of that, when I searched for Scala examples, I found only a handful.

So, I decided to learn it on my own and my first step was to build a “Hello World!” program using Kafka Streams and Scala, like this:

package com.knoldus.kafka.examples

import java.util.Properties

import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.KStreamBuilder

/**
  * Copyright Knoldus Software LLP, 2017. All rights reserved.
  */
object StreamApplication {

  def main(args: Array[String]): Unit = {

    // Application id, broker address, and default String serdes for keys and values.
    val config = {
      val properties = new Properties()
      properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application")
      properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      properties
    }

    // Read every record from SourceTopic and forward it unchanged to SinkTopic.
    // Explicit type parameters keep Scala from inferring KStream[Nothing, Nothing].
    val builder = new KStreamBuilder()
    val sourceStream = builder.stream[String, String]("SourceTopic")
    sourceStream.to("SinkTopic")

    // Build the streams client from the topology and start processing.
    val streams = new KafkaStreams(builder, config)
    streams.start()
  }
}

Before running this example, we need to start a Kafka server; the Kafka quick start guide explains how. After that, send some messages to the Kafka topic “SourceTopic” and start a Kafka consumer for the Kafka topic “SinkTopic”.

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic SourceTopic
hello world!

Now, run the example and you will see that the Kafka consumer on the topic “SinkTopic” receives the message.

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic SinkTopic
hello world!

This means that now we are able to send messages from one Kafka topic to another via Kafka Streams.
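
A note for readers on newer Kafka releases: KStreamBuilder was deprecated in Kafka 1.0 and removed in 2.0, so the program above will not compile against them. A minimal sketch of the same pipe using its replacement, StreamsBuilder, might look like the following. It assumes a kafka-streams 1.0 or later dependency on the classpath. The names PipeApplication, buildConfig and buildTopology are hypothetical, introduced only for illustration, and the shutdown hook is an addition that the original program does not have.

```scala
import java.util.Properties

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig, Topology}

// Sketch only: assumes kafka-streams 1.0 or later.
// PipeApplication, buildConfig and buildTopology are illustrative names.
object PipeApplication {

  // Same settings as the original example: application id, broker address,
  // and String serdes as the defaults for keys and values.
  def buildConfig(appId: String, bootstrapServers: String): Properties = {
    val properties = new Properties()
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId)
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    properties
  }

  // Forward every record from the source topic to the sink topic unchanged.
  def buildTopology(source: String, sink: String): Topology = {
    val builder = new StreamsBuilder()
    builder.stream[String, String](source).to(sink)
    builder.build()
  }

  def main(args: Array[String]): Unit = {
    val topology = buildTopology("SourceTopic", "SinkTopic")
    val config = buildConfig("stream-application", "localhost:9092")
    val streams = new KafkaStreams(topology, config)
    streams.start()
    // Addition to the original: close the streams client cleanly on JVM shutdown.
    sys.addShutdownHook(streams.close())
  }
}
```

The main difference is that StreamsBuilder produces a Topology through build(), and KafkaStreams takes that Topology instead of the builder itself. Splitting the configuration and the topology into separate functions is only a convenience here, so that each can be inspected without starting the application.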

So, this was my first step in learning Kafka Streams with Scala. I know that it is not much, and I still need to explore more of Kafka Streams, such as transformations, joins, and aggregations, which I will write about in future posts. So, stay tuned 🙂

The complete code can be downloaded from GitHub.

Please feel free to suggest or comment!





