

A Quick Demo: Kafka to Flink to Cassandra
source link: https://blog.knoldus.com/a-quick-demo-kafka-to-flink-to-cassandra/

Reading Time: 3 minutes
Hi Folks!! In this blog, we are going to learn how we can integrate Flink with Kafka and Cassandra to build a simple streaming data pipeline.
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
Kafka is a scalable, high-performance, low-latency platform that allows reading and writing streams of data, much like a messaging system.
Cassandra is a distributed, wide-column NoSQL data store.
Minimum Requirements and Installations
To start the application, you will need Kafka and Cassandra installed locally on your machine. The minimum requirements for the application are:
Java 1.8+, Scala 2.12.2, Flink 1.9.0, sbt 1.3.12, Kafka 2.3.0, Cassandra 3.10.
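For reference, a minimal build.sbt header matching these versions might look like the following (the project name is illustrative):
name := "kafka-flink-cassandra-demo"
version := "0.1"
scalaVersion := "2.12.2"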
Dependencies
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-connector-kafka" % "1.9.0",
  "org.apache.flink" %% "flink-streaming-scala" % "1.9.0",
  "org.json4s" %% "json4s-native" % "3.6.10",
  // cassandra
  "org.apache.flink" %% "flink-connector-cassandra" % "1.9.0"
)
Connecting to Kafka and reading streams.
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.json4s.native.JsonMethods

implicit lazy val formats: org.json4s.Formats = org.json4s.DefaultFormats

val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

// Open a Kafka connection and stream the car data from the topic.
val properties: Properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "testKafka")

val kafkaConsumer = new FlinkKafkaConsumer[String]("car.create", new SimpleStringSchema(), properties)

// Parse each JSON string into the Car case class using json4s.
val carDataStream = streamExecutionEnvironment
  .addSource(kafkaConsumer)
  .flatMap(raw => JsonMethods.parse(raw).toOption)
  .map(_.extract[Car])
In the above code snippet, we read JSON data from the Kafka topic “car.create”, which contains information about cars. Each message is deserialized as a JSON string using SimpleStringSchema and then parsed into a Scala case class using json4s. The Car model looks like below:
case class Car(
  Name: String,
  Miles_per_Gallon: Option[Double],
  Cylinders: Option[Long],
  Displacement: Option[Double],
  Horsepower: Option[Long],
  Weight_in_lbs: Option[Long],
  Acceleration: Option[Double],
  Year: String,
  Origin: String)
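As a quick standalone illustration of this parsing step (outside Flink, assuming the Car case class above and the json4s-native dependency), a single message can be parsed like this:
import org.json4s._
import org.json4s.native.JsonMethods

object ParseCarExample extends App {
  implicit val formats: Formats = DefaultFormats

  val raw = """{"Name":"saab 99e","Cylinders":4,"Horsepower":95,
              |"Miles_per_Gallon":25,"Displacement":104,"Weight_in_lbs":2375,
              |"Acceleration":17.5,"Year":"1970-01-01","Origin":"Europe"}""".stripMargin

  // parse returns a JValue; extract maps it onto the Car case class.
  val car: Car = JsonMethods.parse(raw).extract[Car]
  println(car.Name)       // saab 99e
  println(car.Horsepower) // Some(95)
}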
By reading the JSON data from the Kafka topic through the Flink streaming engine, we get a DataStream[Car] as a result. You can apply transformations to the Car DataStream, as sketched below, and then sink the resulting DataStream to the Cassandra database.
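For example, a minimal sketch of such a transformation (the 100-horsepower threshold is purely illustrative) could be:
// Keep only cars whose (known) horsepower exceeds 100.
val powerfulCars: DataStream[Car] =
  carDataStream.filter(_.Horsepower.exists(_ > 100))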
Writing Flink DataStream to CassandraDB.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra.CassandraSink

// Create the car data to sink into Cassandra: map each Car to a tuple matching
// the target table's columns. Option[Long] is boxed to java.lang.Long so that
// missing values become null.
val sinkCarDataStream = carDataStream.map(car =>
  (car.Name, car.Cylinders.map(Long.box).orNull, car.Horsepower.map(Long.box).orNull))

// Open a Cassandra connection and sink the car data into Cassandra.
CassandraSink.addSink(sinkCarDataStream)
  .setHost("127.0.0.1")
  .setQuery("""INSERT INTO example.car("Name", "Cylinders", "Horsepower") values (?, ?, ?);""")
  .build()

// Nothing runs until the job is submitted.
streamExecutionEnvironment.execute("kafka-to-flink-to-cassandra")
We are all set with our handy code. You can find the complete source code here.
Now let's start the Kafka and Cassandra services locally to test it.
Running Cassandra:
Go to the Cassandra bin directory and run the below command to start the Cassandra server:
./cassandra
Then, go inside the Cassandra shell by running:
./cqlsh
In the shell, run the below commands to create the keyspace example and the table car. The quoted identifiers keep the same case as in the INSERT query above, and bigint matches the Long fields of the Car case class:
CREATE KEYSPACE IF NOT EXISTS example
WITH REPLICATION = {
  'class' : 'SimpleStrategy', 'replication_factor' : 1
};

CREATE TABLE IF NOT EXISTS example.car(
  "Name" text PRIMARY KEY,
  "Cylinders" bigint,
  "Horsepower" bigint
);
Running Kafka:
Go inside your Kafka directory:
- Start ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
- Start Kafka server:
bin/kafka-server-start.sh config/server.properties
- Create Kafka Topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic car.create
- Start Kafka Producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic car.create
Running Flink application
Go inside the project, open a terminal, and run the below commands:
sbt clean compile
sbt run
Produce some sample messages to the Kafka topic car.create:
{"Name":"saab 99e", "Miles_per_Gallon":25, "Cylinders":4, "Displacement":104, "Horsepower":95, "Weight_in_lbs":2375, "Acceleration":17.5, "Year":"1970-01-01", "Origin":"Europe"}
{"Name":"amc gremlin", "Miles_per_Gallon":21, "Cylinders":6, "Displacement":199, "Horsepower":90, "Weight_in_lbs":2648, "Acceleration":15, "Year":"1970-01-01", "Origin":"USA"}
{"Name":"chevy c20", "Miles_per_Gallon":10, "Cylinders":8, "Displacement":307, "Horsepower":200, "Weight_in_lbs":4376, "Acceleration":15, "Year":"1970-01-01", "Origin":"USA"}
Result
Go to the Cassandra shell and run the below command:
select * from example.car;
You will see the name, number of cylinders, and horsepower of each car streamed from Kafka into the Cassandra database.
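Given the three sample messages produced above, the result should look roughly like this (row order may differ, since Cassandra returns rows in token order):
 Name        | Cylinders | Horsepower
-------------+-----------+------------
 saab 99e    |         4 |         95
 amc gremlin |         6 |         90
 chevy c20   |         8 |        200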
Thanks for reading. Stay connected for more blogs.