5

Kafka客户端--Go版本

 3 years ago
source link: https://jjmeg.github.io/posts/kafka-golang-client/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Kafka客户端--Go版本

2020.5.6 974 2 分钟
「 Kafka 」 分布式消息平台,用Scala实现,本项目使用Go语言实现简单的消息生产和消费。

Kafka 客户端 Golang 版

project feature weakness

Shopify/sarama 最受欢迎 集群式消费难实现,不支持Context

bsm/sarama-cluster 基于sarama补充集群式消费 不支持Context

confluentinc/confluent-kafka-go

依赖C语言库,不支持Context

lovoo/goka 依赖于sarama 不支持Context

segmentio/kafka-go 同时支持集群模式,易与软件交互 未正式发布,支持Context


下面将基于 Shopify/sarama 实现简单的消费者及生产者。

基本数据结构

  • Reporter 生产者:包含一个Producer及一个logger用于日志记录

  • Subscriber 消费者:包含一个Consumer及logger 具体实现

package reporter

import (
	"github.com/Shopify/sarama"
	"github.com/sirupsen/logrus"

	"../error_const"
)

// kafka server 端配置
type KafkaCfg struct {
	Host  string `json:"host"`
	Topic string `json:"topic"`
}

// 消息生产者数据结构
type Reporter struct {
	Producer sarama.SyncProducer
	logger   *logrus.Logger
}

// 构造函数
func NewReporter(cfg *KafkaCfg, log *logrus.Logger) *Reporter {
	reporter := &Reporter{
		logger: log,
	}

	reporter.setProducer(cfg)
	return reporter
}

func (reporter *Reporter) setProducer(cfg *KafkaCfg) {
	var broker = []string{cfg.Host}

	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	// new 一个Producer
	producer, err := sarama.NewSyncProducer(broker, config)
	if err != nil {
		reporter.logger.Errorf(error_const.InitProducerError, err)
	}

	reporter.Producer = producer
}

func (reporter *Reporter) DoReport(topic string, msg []byte) {
	reporter.do(topic, msg)
}

func (reporter *Reporter) do(topic string, msg []byte) {
	kafkaMsg := generateProducerMessage(topic, msg)
	_, _, err := reporter.Producer.SendMessage(kafkaMsg)
	if err != nil {
		reporter.logger.Errorf(error_const.ReportKafkaMsgError, err, string(msg))
	}
	reporter.logger.Infof(error_const.ReportKafkaMsgSuccess, string(msg))
}

func generateProducerMessage(topic string, message []byte) *sarama.ProducerMessage {
	return &sarama.ProducerMessage{
		Topic:     topic,
		Partition: -1,
		Value:     sarama.StringEncoder(message),
	}
}
package subscriber

import (
	"github.com/Shopify/sarama"
	"github.com/sirupsen/logrus"

	"../error_const"
	"../reporter"
)

type Subscriber struct {
	Consumer sarama.Consumer
	logger   *logrus.Logger
}

func NewSubscriber(cfg *reporter.KafkaCfg, log *logrus.Logger) *Subscriber {
	subscriber := &Subscriber{
		logger: log,
	}
	subscriber.setConsumer(cfg)
	return subscriber
}

func (subscriber *Subscriber) setConsumer(cfg *reporter.KafkaCfg) {
	consumer, err := sarama.NewConsumer([]string{cfg.Host}, nil)
	if err != nil {
		panic(err)
	}
	subscriber.Consumer = consumer
}

func (subscriber *Subscriber) Consume(topic string, ch chan string) {
	defer func() {
		if err := subscriber.Consumer.Close(); err != nil {
			subscriber.logger.Errorf(error_const.SubcriberCloseConsumerError, err)
		}
	}()

	// 获取所有 partition
	partitionList, err := subscriber.Consumer.Partitions(topic)
	if err != nil {
		subscriber.logger.Errorf(error_const.SubScriberGetPartitionsError, err, topic)
	}

	// 遍历所有 partition 获取最新 offset 上的消息
	for _, partition := range partitionList {
		pc, _ := subscriber.Consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
  	// 启动一个 goroutine 用于持续监听消息
		go func(pc sarama.PartitionConsumer) {
			for message := range pc.Messages() {
				subscriber.logger.Infof(messageReceived(message))
				ch <- messageReceived(message)
			}
		}(pc)
	}
}

func messageReceived(message *sarama.ConsumerMessage) string {
	return string(message.Value)
}
package main

import (
	"encoding/json"
	"fmt"
	"os"

	"github.com/sirupsen/logrus"

	"./reporter"
	"./subscriber"
)

var (
	config = `{
	"host": "localhost:9092",
	"topic": "kafka_test"
  }`
)

func main() {
	var cfg reporter.KafkaCfg
	json.Unmarshal([]byte(config), &cfg)
	log := &logrus.Logger{Out: os.Stdout}

	//reporter
	producer := reporter.NewReporter(&cfg, log)
	//subscriber
	consumer := subscriber.NewSubscriber(&cfg, log)
	message := "Hello Kafka World."

	ch := make(chan string)
	consumer.Consume(cfg.Topic, ch)
	producer.DoReport(cfg.Topic, []byte(message))

	select {
	case msg := <-ch:
		fmt.Println("Got msg: ", msg)
		break
	}
}

为避免漏消费消息,可指定 group,Kafka 确保每个 partition 只能同一个 group 中的同一个 consumer 消费,若要重复消费,更换 group。

项目代码:kafka-client-golang,项目结构:

.
├── README.md
├── error_const
│   └── error_const.go
├── main.go
├── reporter
│   └── report.go
└── subscriber
    └── subscribe.go
?欢迎评论?

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK