
5

Kafka客户端--Go版本
source link: https://jjmeg.github.io/posts/kafka-golang-client/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Kafka客户端--Go版本
2020.5.6
974
2 分钟
「 Kafka 」
分布式消息平台,用Scala实现,本项目使用Go语言实现简单的消息生产和消费。
?欢迎评论?
Kafka 客户端 Golang 版
project feature weakness
Shopify/sarama 最受欢迎 集群式消费难实现,不支持Context
bsm/sarama-cluster 基于sarama补充集群式消费 不支持Context
confluentinc/confluent-kafka-go
依赖C语言库,不支持Context
lovoo/goka 依赖于sarama 不支持Context
segmentio/kafka-go 同时支持集群模式,易与软件交互 未正式发布,支持Context
下面将基于 Shopify/sarama 实现简单的消费者及生产者。
基本数据结构
Reporter 生产者:包含一个Producer及一个logger用于日志记录
Subscriber 消费者:包含一个Consumer及logger 具体实现
package reporter
import (
"github.com/Shopify/sarama"
"github.com/sirupsen/logrus"
"../error_const"
)
// kafka server 端配置
type KafkaCfg struct {
Host string `json:"host"`
Topic string `json:"topic"`
}
// 消息生产者数据结构
type Reporter struct {
Producer sarama.SyncProducer
logger *logrus.Logger
}
// 构造函数
func NewReporter(cfg *KafkaCfg, log *logrus.Logger) *Reporter {
reporter := &Reporter{
logger: log,
}
reporter.setProducer(cfg)
return reporter
}
func (reporter *Reporter) setProducer(cfg *KafkaCfg) {
var broker = []string{cfg.Host}
config := sarama.NewConfig()
config.Producer.Return.Successes = true
// new 一个Producer
producer, err := sarama.NewSyncProducer(broker, config)
if err != nil {
reporter.logger.Errorf(error_const.InitProducerError, err)
}
reporter.Producer = producer
}
func (reporter *Reporter) DoReport(topic string, msg []byte) {
reporter.do(topic, msg)
}
func (reporter *Reporter) do(topic string, msg []byte) {
kafkaMsg := generateProducerMessage(topic, msg)
_, _, err := reporter.Producer.SendMessage(kafkaMsg)
if err != nil {
reporter.logger.Errorf(error_const.ReportKafkaMsgError, err, string(msg))
}
reporter.logger.Infof(error_const.ReportKafkaMsgSuccess, string(msg))
}
func generateProducerMessage(topic string, message []byte) *sarama.ProducerMessage {
return &sarama.ProducerMessage{
Topic: topic,
Partition: -1,
Value: sarama.StringEncoder(message),
}
}
package subscriber
import (
"github.com/Shopify/sarama"
"github.com/sirupsen/logrus"
"../error_const"
"../reporter"
)
type Subscriber struct {
Consumer sarama.Consumer
logger *logrus.Logger
}
func NewSubscriber(cfg *reporter.KafkaCfg, log *logrus.Logger) *Subscriber {
subscriber := &Subscriber{
logger: log,
}
subscriber.setConsumer(cfg)
return subscriber
}
func (subscriber *Subscriber) setConsumer(cfg *reporter.KafkaCfg) {
consumer, err := sarama.NewConsumer([]string{cfg.Host}, nil)
if err != nil {
panic(err)
}
subscriber.Consumer = consumer
}
func (subscriber *Subscriber) Consume(topic string, ch chan string) {
defer func() {
if err := subscriber.Consumer.Close(); err != nil {
subscriber.logger.Errorf(error_const.SubcriberCloseConsumerError, err)
}
}()
// 获取所有 partition
partitionList, err := subscriber.Consumer.Partitions(topic)
if err != nil {
subscriber.logger.Errorf(error_const.SubScriberGetPartitionsError, err, topic)
}
// 遍历所有 partition 获取最新 offset 上的消息
for _, partition := range partitionList {
pc, _ := subscriber.Consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
// 启动一个 goroutine 用于持续监听消息
go func(pc sarama.PartitionConsumer) {
for message := range pc.Messages() {
subscriber.logger.Infof(messageReceived(message))
ch <- messageReceived(message)
}
}(pc)
}
}
func messageReceived(message *sarama.ConsumerMessage) string {
return string(message.Value)
}
package main
import (
"encoding/json"
"fmt"
"os"
"github.com/sirupsen/logrus"
"./reporter"
"./subscriber"
)
var (
config = `{
"host": "localhost:9092",
"topic": "kafka_test"
}`
)
func main() {
var cfg reporter.KafkaCfg
json.Unmarshal([]byte(config), &cfg)
log := &logrus.Logger{Out: os.Stdout}
//reporter
producer := reporter.NewReporter(&cfg, log)
//subscriber
consumer := subscriber.NewSubscriber(&cfg, log)
message := "Hello Kafka World."
ch := make(chan string)
consumer.Consume(cfg.Topic, ch)
producer.DoReport(cfg.Topic, []byte(message))
select {
case msg := <-ch:
fmt.Println("Got msg: ", msg)
break
}
}
为避免漏消费消息,可指定 group,Kafka 确保每个 partition 只能同一个 group 中的同一个 consumer 消费,若要重复消费,更换 group。
项目代码:kafka-client-golang,项目结构:
.
├── README.md
├── error_const
│ └── error_const.go
├── main.go
├── reporter
│ └── report.go
└── subscriber
└── subscribe.go
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK