5

Go 操作kafka包sarama

 2 years ago
source link: https://segmentfault.com/a/1190000040820548
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Go 操作kafka包sarama

sarama 是一个纯 Go 客户端库,用于处理 Apache Kafka(0.8 及更高版本)。它包括一个用于轻松生成和使用消息的高级 API,以及一个用于在高级 API 不足时控制线路上的字节的低级 API。
在github上stars上比较多(推荐)。

闲话少叙,上示例

package main

import (
    "context"
    "fmt"
    "github.com/Shopify/sarama"
    "log"
    "os"
    "os/signal"
    "sync"
)

type consumerGroupHandler struct {
}

func (consumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
    return nil
}
func (consumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", msg.Value, msg.Timestamp, msg.Topic)
        session.MarkMessage(msg, "")
    }
    return nil
}

//消费者组
func SaramaConsumerGroup() {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = false
    config.Version = sarama.V0_10_2_0                     // specify appropriate version
    config.Consumer.Offsets.Initial = sarama.OffsetOldest // 未找到组消费位移的时候从哪边开始消费

    group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
    if err != nil {
        panic(err)
    }
    defer func() { _ = group.Close() }()

    // Track errors
    go func() {
        for err := range group.Errors() {
            fmt.Println("ERROR", err)
        }
    }()
    fmt.Println("Consumed start")
    // Iterate over consumer sessions.
    ctx := context.Background()
    for {
        topics := []string{"my_topic"}
        handler := consumerGroupHandler{}

        // `Consume` should be called inside an infinite loop, when a
        // server-side rebalance happens, the consumer session will need to be
        // recreated to get the new claims
        err := group.Consume(ctx, topics, handler)
        if err != nil {
            panic(err)
        }
    }
}

//消费者
func SaramaConsumer() {
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, sarama.NewConfig())
    if err != nil {
        panic(err)
    }

    defer func() {
        if err := consumer.Close(); err != nil {
            log.Fatalln(err)
        }
    }()

    partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetNewest)
    if err != nil {
        panic(err)
    }

    defer func() {
        if err := partitionConsumer.Close(); err != nil {
            log.Fatalln(err)
        }
    }()

    // Trap SIGINT to trigger a shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    consumed := 0
ConsumerLoop:
    for {
        select {
        case msg := <-partitionConsumer.Messages():
            log.Printf("Consumed message offset %d\n", msg.Offset)
            consumed++
        case <-signals:
            break ConsumerLoop
        }
    }

    log.Printf("Consumed: %d\n", consumed)
}

//异步生产者Goroutines
func SyncProducer() {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(err)
    }

    // Trap SIGINT to trigger a graceful shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    var (
        wg                                  sync.WaitGroup
        enqueued, successes, producerErrors int
    )

    wg.Add(1)
    go func() {
        defer wg.Done()
        for range producer.Successes() {
            successes++
        }
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        for err := range producer.Errors() {
            log.Println(err)
            producerErrors++
        }
    }()

ProducerLoop:
    for {
        message := &sarama.ProducerMessage{Topic: "my_topic", Value: sarama.StringEncoder("testing 456")}
        select {
        case producer.Input() <- message:
            enqueued++

        case <-signals:
            producer.AsyncClose() // Trigger a shutdown of the producer.
            break ProducerLoop
        }
    }

    wg.Wait()

    log.Printf("Successfully produced: %d; errors: %d\n", successes, producerErrors)
}

//异步生产者Select
func SyncProducerSelect() {
    producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, nil)
    if err != nil {
        panic(err)
    }

    defer func() {
        if err := producer.Close(); err != nil {
            log.Fatalln(err)
        }
    }()

    // Trap SIGINT to trigger a shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    var enqueued, producerErrors int
ProducerLoop:
    for {
        select {
        case producer.Input() <- &sarama.ProducerMessage{Topic: "my_topic", Key: nil, Value: sarama.StringEncoder("testing 123")}:
            enqueued++
        case err := <-producer.Errors():
            log.Println("Failed to produce message", err)
            producerErrors++
        case <-signals:
            break ProducerLoop
        }
    }
    log.Printf("Enqueued: %d; errors: %d\n", enqueued, producerErrors)
}

//同步生产模式
func SaramaProducer() {
    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)
    if err != nil {
        log.Fatalln(err)
    }
    defer func() {
        if err := producer.Close(); err != nil {
            log.Fatalln(err)
        }
    }()

    msg := &sarama.ProducerMessage{Topic: "my_topic", Value: sarama.StringEncoder("testing 123")}
    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
        log.Printf("FAILED to send message: %s\n", err)
    } else {
        log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
    }

}

func main() {
    //生产者
    go SyncProducer()
    //go SaramaProducer()
    //go SyncProducerSelect()

    //消费者
    SaramaConsumerGroup()
    //SaramaConsumer()

}

links

https://pkg.go.dev/github.com/Shopify/sarama
https://github.com/Shopify/sarama

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK