
Kafka Producer and Consumer In Golang

 1 year ago
source link: https://manoj-gupta.github.io/kafka/Kafka-Producer-Consumer-Golang/

Apache Kafka is a distributed publish-subscribe messaging system that is designed to be fast, scalable, and durable. Kafka stores streams of records (messages) in topics. Each record consists of a key, a value, and a timestamp. Producers write data to topics and consumers read from topics.
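
To make the record and topic vocabulary concrete, here is a toy in-memory model in Go. It is only an illustration: the `Record` and `Topic` types and their methods are hypothetical names invented for this sketch, it has a single partition, and it has nothing to do with how a real broker stores data.

```go
package main

import (
	"fmt"
	"time"
)

// Record mirrors the three parts of a Kafka record named above.
type Record struct {
	Key       []byte
	Value     []byte
	Timestamp time.Time
}

// Topic is a toy, single-partition, in-memory log (hypothetical type,
// not part of any Kafka client library).
type Topic struct {
	records []Record
}

// Append plays the producer role: it adds a record to the end of the log
// and returns the offset at which it was stored.
func (t *Topic) Append(key, value string) int {
	t.records = append(t.records, Record{
		Key:       []byte(key),
		Value:     []byte(value),
		Timestamp: time.Now(),
	})
	return len(t.records) - 1
}

// ReadFrom plays the consumer role: it returns every record at or after
// the given offset without removing anything from the log.
func (t *Topic) ReadFrom(offset int) []Record {
	if offset < 0 || offset >= len(t.records) {
		return nil
	}
	return t.records[offset:]
}

func main() {
	var topic Topic
	topic.Append("0", "first")
	topic.Append("1", "second")

	// Reading from offset 0, so the slice index equals the offset.
	for i, r := range topic.ReadFrom(0) {
		fmt.Printf("offset=%d key=%s value=%s\n", i, r.Key, r.Value)
	}
}
```

The point of the model is that reading does not delete anything: a consumer only tracks an offset into the log, which is why several consumers can read the same topic independently.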

Running Kafka Cluster

For demonstration, a Kafka cluster is needed. I installed it on an x86 machine running Ubuntu 22.04.1 LTS.

Install Java

First, install Java, which is required to run the Kafka executables. Visit the official download page to download the Debian package, or use the command below to download it from the command line.

$ wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.deb

Once the Debian package is downloaded, install it using the command below:

sudo apt install ./jdk-17_linux-x64_bin.deb 

The above command installs Oracle Java 17 on your system. All the files are placed under the /usr/lib/jvm directory. Now register the newly installed Java as an alternative on your system.

sudo update-alternatives --install /usr/bin/java java /usr/lib/jvm/jdk-17/bin/java 100

Install Kafka

  • Download the Kafka binary from the official download page.
  • Extract it to a directory using the command tar -xvf kafka_2.12-2.5.0.tgz. This directory is referred to as KAFKA_DIR from here on.
  • Check that the installation works by running bin/kafka-topics.sh
  • Add the path to the PATH environment variable by editing the .bashrc file

Running

Start Zookeeper

Zookeeper is a distributed key-value store commonly used to store server state for coordination. Kafka requires it in order to run.

To start Zookeeper, issue the following command from KAFKA_DIR:

bin/zookeeper-server-start.sh config/zookeeper.properties

Start Kafka brokers

Kafka brokers are where our data is stored and distributed. As with Zookeeper, two files are used to start and configure the broker servers:

  • bin/kafka-server-start.sh: starts the broker server.
  • config/server.properties: provides the default broker configuration.

Let’s start three brokers to form a distributed system. The configuration file has several options, but three properties must be unique for each server:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# The address the socket server listens on. It will get the value returned from 
listeners=PLAINTEXT://:9092

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

Copy the config/server.properties file to create one file for each of the three server instances:

cp config/server.properties config/server.1.properties
cp config/server.properties config/server.2.properties
cp config/server.properties config/server.3.properties

Change the properties mentioned above in each file so that they are all unique.

server.1.properties

broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1

server.2.properties

broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2

server.3.properties

broker.id=3
listeners=PLAINTEXT://:9095
log.dirs=/tmp/kafka-logs-3
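
The three files follow a simple pattern: the port is 9092 plus the broker id, and the log directory ends in the broker id. As a minimal sketch of that pattern, the Go program below prints the override lines for each broker so they can be pasted into the corresponding file. The `brokerOverrides` helper is a hypothetical name for this illustration, and the numbering convention is the one used in this article, not something Kafka requires.

```go
package main

import "fmt"

// brokerOverrides returns the three per-broker settings from the text for
// a given broker id, assuming this article's convention of port 9092+id
// and log directory /tmp/kafka-logs-<id>.
func brokerOverrides(id int) string {
	return fmt.Sprintf(
		"broker.id=%d\nlisteners=PLAINTEXT://:%d\nlog.dirs=/tmp/kafka-logs-%d\n",
		id, 9092+id, id,
	)
}

func main() {
	// Print one block of overrides per broker.
	for id := 1; id <= 3; id++ {
		fmt.Printf("# server.%d.properties\n%s\n", id, brokerOverrides(id))
	}
}
```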

Create the required log directories.

mkdir /tmp/kafka-logs-1 /tmp/kafka-logs-2 /tmp/kafka-logs-3

Start the broker instances using the commands below, each in a different terminal session.

bin/kafka-server-start.sh config/server.1.properties
bin/kafka-server-start.sh config/server.2.properties
bin/kafka-server-start.sh config/server.3.properties

You should see a startup message when the brokers start successfully.

[2022-09-08 15:11:10,987] INFO [KafkaServer id=3] started (kafka.server.KafkaServer)

Create sample topic

Before adding data to Kafka, we need to create a topic to which the data will be written. To do this, run the command:

$ bin/kafka-topics.sh --create --topic sample-kafka-topic --bootstrap-server localhost:9093 --partitions 3 --replication-factor 2

Once you create the topic, you should see a confirmation message:

Created topic sample-kafka-topic.

You can verify it using the command:

bin/kafka-topics.sh --list --bootstrap-server localhost:9093

It should return the topic just created:

sample-kafka-topic

Implementation

Kafka Producer

The following function writes a sample message to the Kafka cluster every second.

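package kafkapc

import (
	"context"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	// client library assumed from the kafka.NewWriter / kafka.NewReader API used below
	"github.com/segmentio/kafka-go"
)

// the topic and broker addresses
const (
	topic          = "sample-kafka-topic" // must match the topic created above
	broker1Address = "localhost:9093"
	broker2Address = "localhost:9094"
	broker3Address = "localhost:9095"
)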

func Produce(ctx context.Context) {
	count := 0

	l := log.New(os.Stdout, "Kafka producer: ", 0)
	// initialize the writer with the broker addresses and the topic
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{broker1Address, broker2Address, broker3Address},
		Topic:   topic,
		Logger:  l, // assign the logger to the writer
	})

	for {
		// each message is a key/value pair; the writer's Balancer picks the
		// partition (a key-hashing balancer would route by key)
		err := w.WriteMessages(ctx, kafka.Message{
			Key:   []byte(strconv.Itoa(count)),
			Value: []byte("Sample message" + strconv.Itoa(count)),
		})

		if err != nil {
			panic("Error: write message " + err.Error())
		}

		// log a confirmation once the message is written
		fmt.Println("written: ", count)
		count++

		// sleep for a second
		time.Sleep(time.Second)
	}
}
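
A note on keys and partitions: a key only decides the partition when the writer is configured to hash keys. The output further down shows consecutive messages landing on partitions 0, 1, 2, 0, which looks like round-robin distribution rather than key hashing. To illustrate what key-based routing means, here is a minimal sketch that hashes the key and takes the remainder modulo the partition count. The `partitionFor` helper and the choice of FNV-1a are assumptions made for this illustration, not the client library's code.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a message key to one of numPartitions partitions by
// hashing the key with FNV-1a and taking the remainder.
func partitionFor(key []byte, numPartitions int) int {
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	// The same key always maps to the same partition.
	for _, key := range []string{"0", "1", "2", "0"} {
		fmt.Printf("key=%s -> partition %d\n", key, partitionFor([]byte(key), 3))
	}
}
```

The useful property is determinism: all messages with the same key land on the same partition, which preserves their relative order for a consumer of that partition.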

Kafka Consumer

The following function consumes messages from the Kafka cluster.

func Consume(ctx context.Context) {
	l := log.New(os.Stdout, "Kafka consumer: ", 0)

	// GroupID identifies the consumer group, so that its consumers do not receive duplicate messages
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{broker1Address, broker2Address, broker3Address},
		Topic:   topic,
		GroupID: "sample-group",
		Logger:  l, // assign the logger to the reader
	})

	for {
		// the `ReadMessage` blocks until we receive the next event
		msg, err := r.ReadMessage(ctx)
		if err != nil {
			panic("could not read message " + err.Error())
		}
		// after receiving the message, log its value
		fmt.Println("received: ", string(msg.Value))
	}
}

The following function calls the functions above to demonstrate the functionality.

func main() {
	ctx := context.Background()
	// produce messages in a new goroutine, as
	// both the producer and consumer functions are blocking
	go kafkapc.Produce(ctx)
	kafkapc.Consume(ctx)
}

Running this code produces the following output:

Kafka producer: writing 1 messages to sample-kafka-topic (partition: 0)
written:  0
Kafka consumer: committed offsets for group sample-group: 
	topic: sample-kafka-topic
		partition 0: 1
received:  Sample message0
Kafka producer: writing 1 messages to sample-kafka-topic (partition: 1)
Kafka consumer: committed offsets for group sample-group: 
	topic: sample-kafka-topic
		partition 1: 1
received:  Sample message1
written:  1
Kafka producer: writing 1 messages to sample-kafka-topic (partition: 2)
Kafka consumer: committed offsets for group sample-group: 
	topic: sample-kafka-topic
		partition 2: 1
received:  Sample message2
written:  2
Kafka producer: writing 1 messages to sample-kafka-topic (partition: 0)
written:  3
Kafka consumer: committed offsets for group sample-group: 
	topic: sample-kafka-topic
		partition 0: 2
received:  Sample message3
....
....

To tune the Kafka client, you can use the options listed here.

