30

高性能消息中间件 NSQ 解析-应用实践

 2 years ago
source link: http://blueskykong.com/2021/03/14/nsq-2/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

高性能消息中间件 NSQ 解析-应用实践

Nsq 是用 Go 语言开发的轻量级的分布式消息队列,适合小型项目使用、用来学习消息队列实现原理,对于学习 Go channel的原理和用法,以及如何用 Go 语言来写分布式是一个很不错的入门项目。

我们在上一篇文章整体介绍了 nsq 的组成以及各个模块的功能,本文将会带领大家一起实践 nsq 的安装,并基于 nsq 提供的 API 进行实践。

在官网(https://nsq.io/overview/quick_start.html) 下载对应的二进制可执行文件。

# 启动nsqlookupd
$ nsqlookupd
# 启动 nsqd
$ nsqd --lookupd-tcp-address=127.0.0.1:4160
# 启动 nsqadmin
$ nsqadmin --lookupd-http-address=127.0.0.1:4161

# 创建topic,发送消息
$ curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'
# 启动nsq_to_file
$ nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
# 发布消息到 nsqd
$ curl -d 'hello world 2' 'http://127.0.0.1:4151/pub?topic=test'
$ curl -d 'hello world 3' 'http://127.0.0.1:4151/pub?topic=test'

在本地按照上述步骤就可以跑起来了。

创建生产者

安装好 nsq 的几个服务之后,我们来实现基于 nsq 的生产和消费示例。首先是创建生产者:

package main

import (
"fmt"
"log"
"time"

"github.com/nsqio/go-nsq"
)

func main() {
config := nsq.NewConfig()
p, err := nsq.NewProducer("127.0.0.1:4150", config)

if err != nil {
log.Panic(err)
}

for i := 0; i < 1000; i++ {
msg := fmt.Sprintf("num-%d", i)
log.Println("Pub:" + msg)
err = p.Publish("testTopic", []byte(msg))
if err != nil {
log.Panic(err)
}
time.Sleep(time.Second * 1)
}

p.Stop()
}

生产者的逻辑比较简单,基于 nsq 官方提供的 github.com/nsqio/go-nsq包,通过调用,循环写 1000 个字符+数字,即 num-n 的形式,通过 p.Publish 发送到消息队列中,等待消费。

接着,我们创建消费者:consumer.go 来消费刚刚生产的消息。

package main

import (
"log"
"sync"

"github.com/nsqio/go-nsq"
)

func main() {
wg := &sync.WaitGroup{}
wg.Add(1000)

config := nsq.NewConfig()
c, _ := nsq.NewConsumer("testTopic", "ch", config)
c.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
log.Printf("Got a message: %s", message.Body)
wg.Done()
return nil
}))

// 1.直连nsqd
// err := c.ConnectToNSQD("127.0.0.1:4150")

// 2.通过 nsqlookupd 服务发现
err := c.ConnectToNSQLookupd("127.0.0.1:4161")
if err != nil {
log.Panic(err)
}
wg.Wait()
}

可通过两种方式与 nsqd 连接:

  • 直连 nsqd,适用于单机(standalone)版;
  • 通过 nsqlookupd 服务发现,适用于集群(cluster)版;

消费消息的动作,主要逻辑就是打印出来,实际业务中需要进行其他处理。

依次启动生产者和消费者的服务,可以分别看到如下的输出结果:

$go run producer.go

2020/12/28 20:29:51 Pub:num-0
2020/12/28 20:29:51 INF 1 (127.0.0.1:4150) connecting to nsqd
2020/12/28 20:29:52 Pub:num-1
2020/12/28 20:29:53 Pub:num-2
2020/12/28 20:29:54 Pub:num-3
2020/12/28 20:29:55 Pub:num-4
2020/12/28 20:29:56 Pub:num-5
2020/12/28 20:29:57 Pub:num-6
2020/12/28 20:29:58 Pub:num-7
2020/12/28 20:29:59 Pub:num-8
2020/12/28 20:30:00 Pub:num-9
2020/12/28 20:30:01 Pub:num-10

$ go run consumer.go

2020/12/28 20:30:08 INF 1 [testTopic/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=testTopic
2020/12/28 20:30:08 INF 1 [testTopic/ch] (10.236.92.208:4150) connecting to nsqd
2020/12/28 20:30:08 Got a message: num-0
2020/12/28 20:30:08 Got a message: num-1
2020/12/28 20:30:08 Got a message: num-2
2020/12/28 20:30:08 Got a message: num-3
2020/12/28 20:30:08 Got a message: num-4
2020/12/28 20:30:08 Got a message: num-5
2020/12/28 20:30:08 Got a message: num-6
2020/12/28 20:30:08 Got a message: num-7
2020/12/28 20:30:08 Got a message: num-8
2020/12/28 20:30:08 Got a message: num-9
2020/12/28 20:30:08 Got a message: num-10

通过如上的示例,我们已经成功地实现 NSQ 的应用。下面我们将解析 NSQ 的几个核心部分。

本文主要介绍 nsq 的安装使用,下载好可执行文件之后,依次启动 nsqlookupd、nsqd、nsqadmin 几个服务。接着我们基于官方提供的客户端 API 包实现了生产消费模型的案例。通过简单的案例,我们能够对 nsq 的安装和基本使用有一个了解。

下一篇文章,将会具体分析 nsq 实现的细节。

高性能消息中间件 NSQ 解析-整体介绍

微服务架构中使用 ELK 进行日志采集以及统一处理

没有 try-catch,该如何处理 Go 错误异常?

订阅最新文章,欢迎关注我的公众号


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK